From d6c6ef59b6f788e46998b2952ad5cbf243bba75e Mon Sep 17 00:00:00 2001 From: Ken Roberts Date: Wed, 9 Feb 2022 17:49:23 +0000 Subject: [PATCH] Update tests for projectors.pjlinkcommands.process_pjlink() --- openlp/core/projectors/pjlinkcommands.py | 23 +- .../{messages => commands}/__init__.py | 0 .../{messages => commands}/test_clss.py | 0 .../{messages => commands}/test_misc.py | 0 .../projectors/commands/test_pjlink.py | 303 ++++++++++++++++++ .../{messages => commands}/test_routing.py | 0 .../projectors/pjlink/test_get_data.py | 17 +- .../projectors/test_projector_commands_03.py | 239 -------------- 8 files changed, 320 insertions(+), 262 deletions(-) rename tests/openlp_core/projectors/{messages => commands}/__init__.py (100%) rename tests/openlp_core/projectors/{messages => commands}/test_clss.py (100%) rename tests/openlp_core/projectors/{messages => commands}/test_misc.py (100%) create mode 100644 tests/openlp_core/projectors/commands/test_pjlink.py rename tests/openlp_core/projectors/{messages => commands}/test_routing.py (100%) delete mode 100644 tests/openlp_core/projectors/test_projector_commands_03.py diff --git a/openlp/core/projectors/pjlinkcommands.py b/openlp/core/projectors/pjlinkcommands.py index 8bd3ab787..554ad5b05 100644 --- a/openlp/core/projectors/pjlinkcommands.py +++ b/openlp/core/projectors/pjlinkcommands.py @@ -382,41 +382,40 @@ def process_pjlink(projector, data): :param projector: Projector instance :param data: Initial packet with authentication scheme """ - log.debug('({ip}) Processing PJLINK command'.format(ip=projector.entry.name)) + log.debug(f'({projector.entry.name}) Processing PJLINK command') chk = data.split(' ') - if len(chk[0]) != 1: + if (len(chk[0]) != 1) or (chk[0] not in ('0', '1')): # Invalid - after splitting, first field should be 1 character, either '0' or '1' only - log.error('({ip}) Invalid initial authentication scheme - aborting'.format(ip=projector.entry.name)) + log.error(f'({projector.entry.name}) Invalid initial authentication scheme - aborting') return E_AUTHENTICATION elif chk[0] == '0': # Normal connection no authentication if len(chk) > 1: # Invalid data - there should be nothing after a normal authentication scheme - log.error('({ip}) Normal connection with extra information - aborting'.format(ip=projector.entry.name)) + log.error(f'({projector.entry.name}) Normal connection with extra information - aborting') return E_NO_AUTHENTICATION elif projector.pin: - log.error('({ip}) Normal connection but PIN set - aborting'.format(ip=projector.entry.name)) + log.error(f'({projector.entry.name}) Normal connection but PIN set - aborting') return E_NO_AUTHENTICATION - log.debug('({ip}) PJLINK: Returning S_CONNECT'.format(ip=projector.entry.name)) + log.debug(f'({projector.entry.name}) PJLINK: Returning S_CONNECT') return S_CONNECT elif chk[0] == '1': if len(chk) < 2: # Not enough information for authenticated connection - log.error('({ip}) Authenticated connection but not enough info - aborting'.format(ip=projector.entry.name)) + log.error(f'({projector.entry.name}) Authenticated connection but not enough info - aborting') return E_NO_AUTHENTICATION elif len(chk[-1]) != PJLINK_TOKEN_SIZE: # Bad token - incorrect size - log.error('({ip}) Authentication token invalid (size) - aborting'.format(ip=projector.entry.name)) + log.error(f'({projector.entry.name}) Authentication token invalid (size) - aborting') return E_NO_AUTHENTICATION elif not all(c in string.hexdigits for c in chk[-1]): # Bad token - not hexadecimal - log.error('({ip}) Authentication token invalid (not a hexadecimal number) ' - '- aborting'.format(ip=projector.entry.name)) + log.error(f'({projector.entry.name}) Authentication token invalid (not a hexadecimal number) - aborting') return E_NO_AUTHENTICATION elif not projector.pin: - log.error('({ip}) Authenticate connection but no PIN - aborting'.format(ip=projector.entry.name)) + log.error(f'({projector.entry.name}) Authenticate connection but no PIN - aborting') return E_NO_AUTHENTICATION - log.debug('({ip}) PJLINK: Returning S_AUTHENTICATE'.format(ip=projector.entry.name)) + log.debug(f'({projector.entry.name}) PJLINK: Returning S_AUTHENTICATE') return S_AUTHENTICATE diff --git a/tests/openlp_core/projectors/messages/__init__.py b/tests/openlp_core/projectors/commands/__init__.py similarity index 100% rename from tests/openlp_core/projectors/messages/__init__.py rename to tests/openlp_core/projectors/commands/__init__.py diff --git a/tests/openlp_core/projectors/messages/test_clss.py b/tests/openlp_core/projectors/commands/test_clss.py similarity index 100% rename from tests/openlp_core/projectors/messages/test_clss.py rename to tests/openlp_core/projectors/commands/test_clss.py diff --git a/tests/openlp_core/projectors/messages/test_misc.py b/tests/openlp_core/projectors/commands/test_misc.py similarity index 100% rename from tests/openlp_core/projectors/messages/test_misc.py rename to tests/openlp_core/projectors/commands/test_misc.py diff --git a/tests/openlp_core/projectors/commands/test_pjlink.py b/tests/openlp_core/projectors/commands/test_pjlink.py new file mode 100644 index 000000000..c44806755 --- /dev/null +++ b/tests/openlp_core/projectors/commands/test_pjlink.py @@ -0,0 +1,303 @@ +# -*- coding: utf-8 -*- + +########################################################################## +# OpenLP - Open Source Lyrics Projection # +# ---------------------------------------------------------------------- # +# Copyright (c) 2008-2022 OpenLP Developers # +# ---------------------------------------------------------------------- # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +########################################################################## +""" +Test process_pjlink method +""" + +import logging +import openlp.core.projectors.pjlinkcommands +import pytest + +from openlp.core.projectors.pjlinkcommands import process_pjlink +from openlp.core.projectors.constants import E_AUTHENTICATION, E_NO_AUTHENTICATION, \ + S_AUTHENTICATE, S_CONNECT + +from tests.resources.projector.data import TEST_PIN, TEST_SALT + +test_module = openlp.core.projectors.pjlinkcommands.__name__ + + +class FakeProjector(object): + """ + Helper test class + """ + def __init__(self, port=4352, name="Faker"): + self.entry = self + self.name = name + self.pin = None + self.port = port + + +@pytest.fixture +def fake_pjlink(): + """ + Helper since we don't need a full-blown PJLink() instance + """ + dumb_projector = FakeProjector() + yield dumb_projector + del(dumb_projector) + + +def test_normal_no_authentication_type(fake_pjlink, caplog): + """ + Test login prompt with not enough parameters + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + t_data = '' + logs = [(f'{test_module}', logging.DEBUG, + f'({fake_pjlink.entry.name}) Processing PJLINK command'), + (f'{test_module}', logging.ERROR, + f'({fake_pjlink.entry.name}) Invalid initial authentication scheme - aborting') + ] + + # WHEN: Calling function + caplog.clear() + t_chk = process_pjlink(projector=fake_pjlink, data=t_data) + + # THEN: Error returned and log entries + assert t_chk == E_AUTHENTICATION, 'Should have returned E_AUTHENTICATION' + assert caplog.record_tuples == logs, 'Invalid log entries' + + +def test_normal_invalid_value(fake_pjlink, caplog): + """ + Test login prompt with parameter neither 0 (no authentication) or 1 (authenticate) + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + t_data = '2' + logs = [(f'{test_module}', logging.DEBUG, + f'({fake_pjlink.entry.name}) Processing PJLINK command'), + (f'{test_module}', logging.ERROR, + f'({fake_pjlink.entry.name}) Invalid initial authentication scheme - aborting') + ] + + # WHEN: Calling function + caplog.clear() + t_chk = process_pjlink(projector=fake_pjlink, data=t_data) + + # THEN: Error returned and log entries + assert t_chk == E_AUTHENTICATION, 'Should have returned E_AUTHENTICATION' + assert caplog.record_tuples == logs, 'Invalid log entries' + + +def test_normal_extra_data(fake_pjlink, caplog): + """ + Test login prompt with no authentication but extra data + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + t_data = f'0 {TEST_SALT}' + logs = [(f'{test_module}', logging.DEBUG, + f'({fake_pjlink.entry.name}) Processing PJLINK command'), + (f'{test_module}', logging.ERROR, + f'({fake_pjlink.entry.name}) Normal connection with extra information - aborting') + ] + + # WHEN: Calling function + caplog.clear() + t_chk = process_pjlink(projector=fake_pjlink, data=t_data) + + # THEN: Error returned and log entries + assert t_chk == E_NO_AUTHENTICATION, 'Should have returned E_NO_AUTHENTICATION' + assert caplog.record_tuples == logs, 'Invalid log entries' + + +def test_normal_with_pin(fake_pjlink, caplog): + """ + Test login prompt with no authentication but pin set + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + t_data = '0' + logs = [(f'{test_module}', logging.DEBUG, + f'({fake_pjlink.entry.name}) Processing PJLINK command'), + (f'{test_module}', logging.ERROR, + f'({fake_pjlink.entry.name}) Normal connection but PIN set - aborting') + ] + fake_pjlink.pin = TEST_SALT + + # WHEN: Calling function + caplog.clear() + t_chk = process_pjlink(projector=fake_pjlink, data=t_data) + + # THEN: Error returned and log entries + assert t_chk == E_NO_AUTHENTICATION, 'Should have returned E_NO_AUTHENTICATION' + assert caplog.record_tuples == logs, 'Invalid log entries' + + +def test_normal_login(fake_pjlink, caplog): + """ + Test login prompt with no authentication + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + t_data = '0' + logs = [(f'{test_module}', logging.DEBUG, + f'({fake_pjlink.entry.name}) Processing PJLINK command'), + (f'{test_module}', logging.DEBUG, + f'({fake_pjlink.entry.name}) PJLINK: Returning S_CONNECT') + ] + + # WHEN: Calling function + caplog.clear() + t_chk = process_pjlink(projector=fake_pjlink, data=t_data) + + # THEN: Error returned and log entries + assert t_chk == S_CONNECT, 'Should have returned S_CONNECT' + assert caplog.record_tuples == logs, 'Invalid log entries' + + +def test_authenticate_no_salt(fake_pjlink, caplog): + """ + Test authenticate login prompt with no salt + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + t_data = '1' + logs = [(f'{test_module}', logging.DEBUG, + f'({fake_pjlink.entry.name}) Processing PJLINK command'), + (f'{test_module}', logging.ERROR, + f'({fake_pjlink.entry.name}) Authenticated connection but not enough info - aborting') + ] + + # WHEN: Calling function + caplog.clear() + t_chk = process_pjlink(projector=fake_pjlink, data=t_data) + + # THEN: Error returned and log entries + assert t_chk == E_NO_AUTHENTICATION, 'Should have returned E_NO_AUTHENTICATION' + assert caplog.record_tuples == logs, 'Invalid log entries' + + +def test_authenticate_short_salt(fake_pjlink, caplog): + """ + Test authenticate login prompt with salt length too short + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + t_data = f'1 {TEST_SALT[:-1]}' + logs = [(f'{test_module}', logging.DEBUG, + f'({fake_pjlink.entry.name}) Processing PJLINK command'), + (f'{test_module}', logging.ERROR, + f'({fake_pjlink.entry.name}) Authentication token invalid (size) - aborting') + ] + + # WHEN: Calling function + caplog.clear() + t_chk = process_pjlink(projector=fake_pjlink, data=t_data) + + # THEN: Error returned and log entries + assert t_chk == E_NO_AUTHENTICATION, 'Should have returned E_NO_AUTHENTICATION' + assert caplog.record_tuples == logs, 'Invalid log entries' + + +def test_authenticate_long_salt(fake_pjlink, caplog): + """ + Test authenticate login prompt with salt length too long + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + t_data = f'1 {TEST_SALT}Z' + logs = [(f'{test_module}', logging.DEBUG, + f'({fake_pjlink.entry.name}) Processing PJLINK command'), + (f'{test_module}', logging.ERROR, + f'({fake_pjlink.entry.name}) Authentication token invalid (size) - aborting') + ] + + # WHEN: Calling function + caplog.clear() + t_chk = process_pjlink(projector=fake_pjlink, data=t_data) + + # THEN: Error returned and log entries + assert t_chk == E_NO_AUTHENTICATION, 'Should have returned E_NO_AUTHENTICATION' + assert caplog.record_tuples == logs, 'Invalid log entries' + + +def test_authenticate_invalid_salt(fake_pjlink, caplog): + """ + Test authenticate login prompt with salt not a hexadecimal number + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + t_data = '1 1a2b3c4g' + print(t_data) + logs = [(f'{test_module}', logging.DEBUG, + f'({fake_pjlink.entry.name}) Processing PJLINK command'), + (f'{test_module}', logging.ERROR, + f'({fake_pjlink.entry.name}) Authentication token invalid (not a hexadecimal number) - aborting') + ] + + # WHEN: Calling function + caplog.clear() + t_chk = process_pjlink(projector=fake_pjlink, data=t_data) + + # THEN: Error returned and log entries + assert t_chk == E_NO_AUTHENTICATION, 'Should have returned E_NO_AUTHENTICATION' + assert caplog.record_tuples == logs, 'Invalid log entries' + + +def test_authenticate_no_pin(fake_pjlink, caplog): + """ + Test authenticate login prompt with no PIN set + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + t_data = f'1 {TEST_SALT}' + print(t_data) + logs = [(f'{test_module}', logging.DEBUG, + f'({fake_pjlink.entry.name}) Processing PJLINK command'), + (f'{test_module}', logging.ERROR, + f'({fake_pjlink.entry.name}) Authenticate connection but no PIN - aborting') + ] + + # WHEN: Calling function + caplog.clear() + t_chk = process_pjlink(projector=fake_pjlink, data=t_data) + + # THEN: Error returned and log entries + assert t_chk == E_NO_AUTHENTICATION, 'Should have returned E_NO_AUTHENTICATION' + assert caplog.record_tuples == logs, 'Invalid log entries' + + +def test_authenticate_login(fake_pjlink, caplog): + """ + Test authenticate login prompt with PIN set + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + t_data = f'1 {TEST_SALT}' + print(t_data) + logs = [(f'{test_module}', logging.DEBUG, + f'({fake_pjlink.entry.name}) Processing PJLINK command'), + (f'{test_module}', logging.DEBUG, + f'({fake_pjlink.entry.name}) PJLINK: Returning S_AUTHENTICATE') + ] + fake_pjlink.pin = TEST_PIN + + # WHEN: Calling function + caplog.clear() + t_chk = process_pjlink(projector=fake_pjlink, data=t_data) + + # THEN: Error returned and log entries + assert t_chk == S_AUTHENTICATE, 'Should have returned S_AUTHENTICATE' + assert caplog.record_tuples == logs, 'Invalid log entries' diff --git a/tests/openlp_core/projectors/messages/test_routing.py b/tests/openlp_core/projectors/commands/test_routing.py similarity index 100% rename from tests/openlp_core/projectors/messages/test_routing.py rename to tests/openlp_core/projectors/commands/test_routing.py diff --git a/tests/openlp_core/projectors/pjlink/test_get_data.py b/tests/openlp_core/projectors/pjlink/test_get_data.py index 1da09f23b..e8c280419 100644 --- a/tests/openlp_core/projectors/pjlink/test_get_data.py +++ b/tests/openlp_core/projectors/pjlink/test_get_data.py @@ -29,6 +29,7 @@ import openlp.core.projectors.pjlink from unittest.mock import DEFAULT, patch from openlp.core.projectors.constants import PJLINK_MAX_PACKET, PJLINK_PREFIX, \ E_AUTHENTICATION, S_AUTHENTICATE, S_CONNECT, S_CONNECTED, S_DATA_OK +from tests.resources.projector.data import TEST_HASH, TEST_PIN, TEST_SALT test_module = openlp.core.projectors.pjlink.__name__ test_qmd5 = openlp.core.common.__name__ @@ -414,15 +415,9 @@ def test_s_authenticate(pjlink, caplog): - Reply "%1CLSS=1" ''' # GIVEN: Initial setup - # t_pin = "JBMIAProjectorLink" - # t_salt = "498e4a67" - # t_hash = "5d8409bc1c3fa39749434aa3a5c38682" - t_salt = '498e4a67' - t_hash = '5d8409bc1c3fa39749434aa3a5c38682' - t_pin = "JBMIAProjectorLink" t_cmd = "PJLINK" t_ver = "1" - t_data = f"1 {t_salt}" + t_data = f"1 {TEST_SALT}" t_buff = f"{PJLINK_PREFIX}{t_ver}{t_cmd}={t_data}" logs = [(f"{test_module}", logging.DEBUG, f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'), @@ -437,9 +432,9 @@ def test_s_authenticate(pjlink, caplog): (f'{test_module}', logging.DEBUG, f'({pjlink.entry.name}) Connecting with pin'), (f'{test_qmd5}.__init__', logging.DEBUG, - f'qmd5_hash(salt="b\'{t_salt}\'"'), + f'qmd5_hash(salt="b\'{TEST_SALT}\'"'), (f'{test_qmd5}.__init__', logging.DEBUG, - f'qmd5_hash() returning "b\'{t_hash}\'"') + f'qmd5_hash() returning "b\'{TEST_HASH}\'"') ] with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \ patch.multiple(pjlink, @@ -450,7 +445,7 @@ def test_s_authenticate(pjlink, caplog): readyRead=DEFAULT, get_socket=DEFAULT) as mock_pjlink: mock_command.return_value = S_AUTHENTICATE - pjlink.pin = t_pin + pjlink.pin = TEST_PIN # WHEN: get_data called with OK caplog.set_level(logging.DEBUG) @@ -461,7 +456,7 @@ def test_s_authenticate(pjlink, caplog): assert caplog.record_tuples == logs, "Invalid log entries" mock_pjlink['receive_data_signal'].assert_called_once() mock_pjlink['_trash_buffer'].assert_not_called() - mock_pjlink['send_command'].assert_called_with(cmd='CLSS', salt=t_hash, priority=True) + mock_pjlink['send_command'].assert_called_with(cmd='CLSS', salt=TEST_HASH, priority=True) mock_pjlink['change_status'].assert_called_with(S_CONNECTED) mock_pjlink['readyRead'].connect.assert_called_once_with(mock_pjlink['get_socket']) mock_command.assert_called_with(pjlink, t_cmd, t_data) diff --git a/tests/openlp_core/projectors/test_projector_commands_03.py b/tests/openlp_core/projectors/test_projector_commands_03.py deleted file mode 100644 index 11ccf652c..000000000 --- a/tests/openlp_core/projectors/test_projector_commands_03.py +++ /dev/null @@ -1,239 +0,0 @@ -# -*- coding: utf-8 -*- - -########################################################################## -# OpenLP - Open Source Lyrics Projection # -# ---------------------------------------------------------------------- # -# Copyright (c) 2008-2022 OpenLP Developers # -# ---------------------------------------------------------------------- # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see . # -########################################################################## -""" -Package to test the openlp.core.projectors.pjlink commands package. -""" -from unittest.mock import call, patch - -import openlp.core.projectors.pjlink -from openlp.core.projectors.constants import E_NO_AUTHENTICATION, STATUS_CODE, S_AUTHENTICATE, S_CONNECT -from openlp.core.projectors.pjlinkcommands import process_command -from tests.resources.projector.data import TEST_PIN, TEST_SALT - - -@patch.object(openlp.core.projectors.pjlinkcommands, 'log') -def test_process_pjlink_authenticate(mock_log, pjlink): - """ - Test initial connection prompt with authentication - """ - # GIVEN: Initial mocks and data - log_error_calls = [] - log_warning_calls = [] - log_debug_calls = [call('({ip}) Processing command "PJLINK" with data "1 {data}"'.format(ip=pjlink.name, - data=TEST_SALT)), - call('({ip}) Calling function for PJLINK'.format(ip=pjlink.name)), - call('({ip}) Processing PJLINK command'.format(ip=pjlink.name)), - call('({ip}) PJLINK: Returning {data}'.format(ip=pjlink.name, - data=STATUS_CODE[S_AUTHENTICATE]))] - - pjlink.pin = TEST_PIN - - # WHEN: process_pjlink called with no authentication required - chk = process_command(projector=pjlink, cmd='PJLINK', data='1 {salt}'.format(salt=TEST_SALT)) - - # THEN: proper processing should have occured - mock_log.error.assert_has_calls(log_error_calls) - mock_log.warning.assert_has_calls(log_warning_calls) - mock_log.debug.assert_has_calls(log_debug_calls) - assert chk == S_AUTHENTICATE, 'Should have returned {data}'.format(data=STATUS_CODE[S_AUTHENTICATE]) - - -@patch.object(openlp.core.projectors.pjlinkcommands, 'log') -def test_process_pjlink_authenticate_pin_not_set_error(mock_log, pjlink): - """ - Test initial connection prompt with authentication and no pin set - """ - # GIVEN: Initial mocks and data - log_error_calls = [call('({ip}) Authenticate connection but no PIN - aborting'.format(ip=pjlink.name))] - log_warning_calls = [] - log_debug_calls = [call('({ip}) Processing command "PJLINK" with data "1 {data}"'.format(ip=pjlink.name, - data=TEST_SALT)), - call('({ip}) Calling function for PJLINK'.format(ip=pjlink.name)), - call('({ip}) Processing PJLINK command'.format(ip=pjlink.name))] - - pjlink.pin = None - - # WHEN: process_pjlink called with no authentication required - chk = process_command(projector=pjlink, cmd='PJLINK', data='1 {salt}'.format(salt=TEST_SALT)) - - # THEN: proper processing should have occured - mock_log.error.assert_has_calls(log_error_calls) - mock_log.warning.assert_has_calls(log_warning_calls) - mock_log.debug.assert_has_calls(log_debug_calls) - assert chk == E_NO_AUTHENTICATION, \ - 'Should have returned {data}'.format(data=STATUS_CODE[E_NO_AUTHENTICATION]) - - -@patch.object(openlp.core.projectors.pjlinkcommands, 'log') -def test_process_pjlink_authenticate_token_invalid(mock_log, pjlink): - """ - Test initial connection prompt with authentication and bad token - """ - # GIVEN: Initial mocks and data - bad_token = 'abcdefgh' - log_error_calls = [call('({ip}) Authentication token invalid (not a hexadecimal number) - ' - 'aborting'.format(ip=pjlink.name))] - log_warning_calls = [] - log_debug_calls = [call('({ip}) Processing command "PJLINK" with data ' - '"1 {data}"'.format(ip=pjlink.name, data=bad_token)), - call('({ip}) Calling function for PJLINK'.format(ip=pjlink.name)), - call('({ip}) Processing PJLINK command'.format(ip=pjlink.name))] - pjlink.pin = TEST_SALT - - # WHEN: process_pjlink called with bad token - chk = process_command(projector=pjlink, cmd='PJLINK', data='1 {data}'.format(data=bad_token)) - - # THEN: proper processing should have occured - mock_log.error.assert_has_calls(log_error_calls) - mock_log.warning.assert_has_calls(log_warning_calls) - mock_log.debug.assert_has_calls(log_debug_calls) - assert chk == E_NO_AUTHENTICATION, \ - 'Should have returned {data}'.format(data=STATUS_CODE[E_NO_AUTHENTICATION]) - - -@patch.object(openlp.core.projectors.pjlinkcommands, 'log') -def test_process_pjlink_authenticate_token_length(mock_log, pjlink): - """ - Test initial connection prompt with authentication and bad token - """ - # GIVEN: Initial mocks and data - bad_token = '1234abcde' # Length should be 8, this is 9 - log_error_calls = [call('({ip}) Authentication token invalid (size) - ' - 'aborting'.format(ip=pjlink.name))] - log_warning_calls = [] - log_debug_calls = [call('({ip}) Processing command "PJLINK" with data ' - '"1 {data}"'.format(ip=pjlink.name, data=bad_token)), - call('({ip}) Calling function for PJLINK'.format(ip=pjlink.name)), - call('({ip}) Processing PJLINK command'.format(ip=pjlink.name))] - pjlink.pin = TEST_SALT - - # WHEN: process_pjlink called with bad token - chk = process_command(projector=pjlink, cmd='PJLINK', data='1 {data}'.format(data=bad_token)) - - # THEN: proper processing should have occured - mock_log.error.assert_has_calls(log_error_calls) - mock_log.warning.assert_has_calls(log_warning_calls) - mock_log.debug.assert_has_calls(log_debug_calls) - assert chk == E_NO_AUTHENTICATION, \ - 'Should have returned {data}'.format(data=STATUS_CODE[E_NO_AUTHENTICATION]) - - -@patch.object(openlp.core.projectors.pjlinkcommands, 'log') -def test_process_pjlink_authenticate_token_missing(mock_log, pjlink): - """ - Test initial connection prompt with authentication and missing token - """ - # GIVEN: Initial mocks and data - log_error_calls = [call('({ip}) Authenticated connection but not enough info - ' - 'aborting'.format(ip=pjlink.name))] - log_warning_calls = [] - log_debug_calls = [call('({ip}) Processing command "PJLINK" with data "1"'.format(ip=pjlink.name)), - call('({ip}) Calling function for PJLINK'.format(ip=pjlink.name)), - call('({ip}) Processing PJLINK command'.format(ip=pjlink.name))] - - pjlink.pin = TEST_SALT - - # WHEN: process_pjlink called with bad token - chk = process_command(projector=pjlink, cmd='PJLINK', data='1') - - # THEN: proper processing should have occured - mock_log.error.assert_has_calls(log_error_calls) - mock_log.warning.assert_has_calls(log_warning_calls) - mock_log.debug.assert_has_calls(log_debug_calls) - assert chk == E_NO_AUTHENTICATION, \ - 'Should have returned {data}'.format(data=STATUS_CODE[E_NO_AUTHENTICATION]) - - -@patch.object(openlp.core.projectors.pjlinkcommands, 'log') -def test_process_pjlink_normal(mock_log, pjlink): - """ - Test processing PJLINK initial prompt - """ - # GIVEN: Mocks and data - log_error_calls = [] - log_warning_calls = [] - log_debug_calls = [call('({ip}) Processing command "PJLINK" with data "0"'.format(ip=pjlink.name)), - call('({ip}) Calling function for PJLINK'.format(ip=pjlink.name)), - call('({ip}) Processing PJLINK command'.format(ip=pjlink.name)), - call('({ip}) PJLINK: Returning {data}'.format(ip=pjlink.name, - data=STATUS_CODE[S_CONNECT]))] - - pjlink.pin = None - - # WHEN: process_pjlink called with no authentication required - chk = process_command(projector=pjlink, cmd='PJLINK', data="0") - - # THEN: proper processing should have occured - mock_log.error.assert_has_calls(log_error_calls) - mock_log.warning.assert_has_calls(log_warning_calls) - mock_log.debug.assert_has_calls(log_debug_calls) - assert chk == S_CONNECT, 'Should have returned {data}'.format(data=STATUS_CODE[S_CONNECT]) - - -@patch.object(openlp.core.projectors.pjlinkcommands, 'log') -def test_process_pjlink_normal_pin_set_error(mock_log, pjlink): - """ - Test process_pjlinnk called with no authentication but pin is set - """ - # GIVEN: Initial mocks and data - log_error_calls = [call('({ip}) Normal connection but PIN set - ' - 'aborting'.format(ip=pjlink.name))] - log_warning_calls = [] - log_debug_calls = [call('({ip}) Processing command "PJLINK" with data "0"'.format(ip=pjlink.name)), - call('({ip}) Calling function for PJLINK'.format(ip=pjlink.name)), - call('({ip}) Processing PJLINK command'.format(ip=pjlink.name))] - pjlink.pin = TEST_PIN - - # WHEN: process_pjlink called with invalid authentication scheme - chk = process_command(projector=pjlink, cmd='PJLINK', data='0') - - # THEN: Proper calls should be made - mock_log.error.assert_has_calls(log_error_calls) - mock_log.warning.assert_has_calls(log_warning_calls) - mock_log.debug.assert_has_calls(log_debug_calls) - assert chk == E_NO_AUTHENTICATION, \ - 'Should have returned {data}'.format(data=STATUS_CODE[E_NO_AUTHENTICATION]) - - -@patch.object(openlp.core.projectors.pjlinkcommands, 'log') -def test_process_pjlink_normal_with_token(mock_log, pjlink): - """ - Test process_pjlinnk called with no authentication but pin is set - """ - # GIVEN: Initial mocks and data - log_error_calls = [call('({ip}) Normal connection with extra information - ' - 'aborting'.format(ip=pjlink.name))] - log_warning_calls = [] - log_debug_calls = [call('({ip}) Processing command "PJLINK" with data ' - '"0 {data}"'.format(ip=pjlink.name, data=TEST_SALT)), - call('({ip}) Calling function for PJLINK'.format(ip=pjlink.name)), - call('({ip}) Processing PJLINK command'.format(ip=pjlink.name))] - pjlink.pin = TEST_PIN - - # WHEN: process_pjlink called with invalid authentication scheme - chk = process_command(projector=pjlink, cmd='PJLINK', data='0 {data}'.format(data=TEST_SALT)) - - # THEN: Proper calls should be made - mock_log.error.assert_has_calls(log_error_calls) - mock_log.warning.assert_has_calls(log_warning_calls) - mock_log.debug.assert_has_calls(log_debug_calls) - assert chk == E_NO_AUTHENTICATION, \ - 'Should have returned {data}'.format(data=STATUS_CODE[E_NO_AUTHENTICATION])