From 2dfce1ad3292f0cf7970df71274edeaf366965d3 Mon Sep 17 00:00:00 2001 From: Ken Roberts Date: Sat, 5 Feb 2022 16:02:48 +0000 Subject: [PATCH] Issue #985 - TypeErr in PJLink.get_data() fix --- openlp/core/projectors/pjlink.py | 37 +- .../projectors/pjlink/test_get_data.py | 512 ++++++++++++++++++ .../test_projector_pjlink_cmd_routing.py | 115 ---- 3 files changed, 530 insertions(+), 134 deletions(-) create mode 100644 tests/openlp_core/projectors/pjlink/test_get_data.py delete mode 100644 tests/openlp_core/projectors/test_projector_pjlink_cmd_routing.py diff --git a/openlp/core/projectors/pjlink.py b/openlp/core/projectors/pjlink.py index 20448a50b..c61236292 100644 --- a/openlp/core/projectors/pjlink.py +++ b/openlp/core/projectors/pjlink.py @@ -545,8 +545,9 @@ class PJLink(QtNetwork.QTcpSocket): :param buff: Data to process. """ - log.debug('({ip}) get_data(buffer="{buff}"'.format(ip=self.entry.name, buff=buff)) - ignore_class = 'ignore_class' in kwargs + log.debug(f'({self.entry.name}) get_data(buffer="{buff}"') + ignore_class = False if 'ignore_class' not in kwargs else kwargs['ignore_class'] + log.debug(f'({self.entry.name}) Setting ignore_class to "{ignore_class}"') # NOTE: Class2 has changed to some values being UTF-8 data_in = decode(buff, 'utf-8') if isinstance(buff, bytes) else buff data = data_in.strip() @@ -556,7 +557,7 @@ class PJLink(QtNetwork.QTcpSocket): self._trash_buffer(msg='get_data(): Invalid packet - length') return elif len(data) > self.max_size: - self._trash_buffer(msg='get_data(): Invalid packet - too long ({length} bytes)'.format(length=len(data))) + self._trash_buffer(msg=f'get_data(): Invalid packet - too long ({len(data)} bytes)') return elif not data.startswith(PJLINK_PREFIX): self._trash_buffer(msg='get_data(): Invalid packet - PJLink prefix missing') @@ -566,10 +567,9 @@ class PJLink(QtNetwork.QTcpSocket): # data[8] = initial PJLink connection (after mangling) self._trash_buffer(msg='get_data(): Invalid reply - Does not have "="') return - log.debug('({ip}) get_data(): Checking new data "{data}"'.format(ip=self.entry.name, data=data)) + log.debug(f'({self.entry.name}) get_data(): Checking new data "{data}"') header, data = data.split('=') - log.debug('({ip}) get_data() header="{header}" data="{data}"'.format(ip=self.entry.name, - header=header, data=data)) + log.debug(f'({self.entry.name}) get_data() header="{header}" data="{data}"') # At this point, the header should contain: # "PVCCCC" # Where: @@ -577,20 +577,18 @@ class PJLink(QtNetwork.QTcpSocket): # V = PJLink class or version # C = PJLink command version, cmd = header[1], header[2:].upper() - log.debug('({ip}) get_data() version="{version}" cmd="{cmd}" data="{data}"'.format(ip=self.entry.name, - version=version, - cmd=cmd, - data=data)) + log.debug(f'({self.entry.name}) get_data() version="{version}" cmd="{cmd}" data="{data}"') if cmd not in PJLINK_VALID_CMD: - self._trash_buffer('get_data(): Invalid packet - unknown command "{data}"'.format(data=cmd)) + self._trash_buffer(msg=f'get_data(): Invalid packet - unknown command "{cmd}"') return elif version not in PJLINK_VALID_CMD[cmd]['version']: self._trash_buffer(msg='get_data() Command reply version does not match a valid command version') return elif int(self.pjlink_class) < int(version): + log.warning(f'({self.entry.name}) get_data(): pjlink_class={self.pjlink_class} packet={version}') if not ignore_class: - log.warning('({ip}) get_data(): Projector returned class reply higher ' - 'than projector stated class'.format(ip=self.entry.name)) + log.warning(f'({self.entry.name}) get_data(): Projector returned class reply higher ' + 'than projector stated class') return chk = process_command(self, cmd, data) @@ -599,26 +597,27 @@ class PJLink(QtNetwork.QTcpSocket): return # PJLink initial connection checks elif chk == S_DATA_OK: - # Previous command returned OK - log.debug('({ip}) OK returned - resending command'.format(ip=self.entry.name)) + # Previous command returned OK - resend command to retrieve status + log.debug(f'({self.entry.name}) OK returned - resending command') self.send_command(cmd=cmd, priority=True) elif chk == S_CONNECT: # Normal connection - log.debug('({ip}) Connecting normal'.format(ip=self.entry.name)) + log.debug(f'({self.entry.name}) Connecting normal') self.change_status(S_CONNECTED) self.send_command(cmd='CLSS', priority=True) self.readyRead.connect(self.get_socket) elif chk == S_AUTHENTICATE: # Connection with pin - log.debug('({ip}) Connecting with pin'.format(ip=self.entry.name)) - data_hash = str(qmd5_hash(salt=chk[1].encode('utf-8'), data=self.pin.encode('utf-8')), + log.debug(f'({self.entry.name}) Connecting with pin') + data_hash = str(qmd5_hash(salt=data.split()[1].encode('utf-8'), + data=self.pin.encode('utf-8')), encoding='ascii') self.change_status(S_CONNECTED) self.readyRead.connect(self.get_socket) self.send_command(cmd='CLSS', salt=data_hash, priority=True) elif chk == E_AUTHENTICATION: # Projector did not like our pin - log.warning('({ip}) Failed authentication - disconnecting'.format(ip=self.entry.name)) + log.warning(f'({self.entry.name}) Failed authentication - disconnecting') self.disconnect_from_host() self.projectorAuthentication.emit(self.entry.name) self.change_status(status=E_AUTHENTICATION) diff --git a/tests/openlp_core/projectors/pjlink/test_get_data.py b/tests/openlp_core/projectors/pjlink/test_get_data.py new file mode 100644 index 000000000..1da09f23b --- /dev/null +++ b/tests/openlp_core/projectors/pjlink/test_get_data.py @@ -0,0 +1,512 @@ +# -*- coding: utf-8 -*- + +########################################################################## +# OpenLP - Open Source Lyrics Projection # +# ---------------------------------------------------------------------- # +# Copyright (c) 2008-2022 OpenLP Developers # +# ---------------------------------------------------------------------- # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +########################################################################## +""" +Test PJLink.get_data 01 +""" + +import logging +import openlp.core.common +import openlp.core.projectors.pjlink + +from unittest.mock import DEFAULT, patch +from openlp.core.projectors.constants import PJLINK_MAX_PACKET, PJLINK_PREFIX, \ + E_AUTHENTICATION, S_AUTHENTICATE, S_CONNECT, S_CONNECTED, S_DATA_OK + +test_module = openlp.core.projectors.pjlink.__name__ +test_qmd5 = openlp.core.common.__name__ + + +def test_buff_short(pjlink, caplog): + """ + Test method with invalid short buffer + """ + # GIVEN: Initial setup + t_buff = "short" + t_err = 'get_data(): Invalid packet - length' + logs = [(f"{test_module}", logging.DEBUG, + f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) Setting ignore_class to "False"')] + + with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \ + patch.multiple(pjlink, + _trash_buffer=DEFAULT, + receive_data_signal=DEFAULT) as mock_pjlink: + # WHEN: get_data called + caplog.set_level(logging.DEBUG) + t_check = pjlink.get_data(buff=t_buff) + + # THEN: t_check should be None and log entry made + assert t_check is None, "Invalid return code" + assert caplog.record_tuples == logs, "Invalid log entries" + mock_pjlink['_trash_buffer'].assert_called_once_with(msg=t_err) + mock_pjlink['receive_data_signal'].assert_called_once() + mock_command.assert_not_called() + + +def test_buff_long(pjlink, caplog): + """ + Test method with invalid long buffer + """ + # GIVEN: Initial setup + t_buff = "X" * (PJLINK_MAX_PACKET + 10) + t_err = f'get_data(): Invalid packet - too long ({len(t_buff)} bytes)' + logs = [(f"{test_module}", logging.DEBUG, + f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) Setting ignore_class to "False"')] + + with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \ + patch.multiple(pjlink, + _trash_buffer=DEFAULT, + receive_data_signal=DEFAULT) as mock_pjlink: + # WHEN: get_data called + caplog.set_level(logging.DEBUG) + t_check = pjlink.get_data(buff=t_buff) + + # THEN: t_check should be None and log entry made + assert t_check is None, "Invalid return code" + assert caplog.record_tuples == logs, "Invalid log entries" + mock_pjlink['_trash_buffer'].assert_called_once_with(msg=t_err) + mock_pjlink['receive_data_signal'].assert_called_once() + mock_command.assert_not_called() + + +def test_invalid_prefix(pjlink, caplog): + """ + Test method with invalid PJLink prefix character + """ + # GIVEN: Initial setup + t_buff = "#1CLSS=OK" + t_err = 'get_data(): Invalid packet - PJLink prefix missing' + logs = [(f"{test_module}", logging.DEBUG, + f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) Setting ignore_class to "False"')] + + with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \ + patch.multiple(pjlink, + _trash_buffer=DEFAULT, + receive_data_signal=DEFAULT) as mock_pjlink: + # WHEN: get_data called + caplog.set_level(logging.DEBUG) + t_check = pjlink.get_data(buff=t_buff) + + # THEN: t_check should be None and log entry made + assert t_check is None, "Invalid return code" + assert caplog.record_tuples == logs, "Invalid log entries" + mock_pjlink['_trash_buffer'].assert_called_once_with(msg=t_err) + mock_pjlink['receive_data_signal'].assert_called_once() + mock_command.assert_not_called() + + +def test_missing_equal(pjlink, caplog): + """ + Test method with missing command/data separator + """ + # GIVEN: Initial setup + t_buff = f"{PJLINK_PREFIX}1CLSS OK" + t_err = 'get_data(): Invalid reply - Does not have "="' + logs = [(f"{test_module}", logging.DEBUG, + f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) Setting ignore_class to "False"')] + + with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \ + patch.multiple(pjlink, + _trash_buffer=DEFAULT, + receive_data_signal=DEFAULT) as mock_pjlink: + + # WHEN: get_data called + caplog.set_level(logging.DEBUG) + t_check = pjlink.get_data(buff=t_buff) + + # THEN: t_check should be None and log entry made + assert t_check is None, "Invalid return code" + assert caplog.record_tuples == logs, "Invalid log entries" + mock_pjlink['_trash_buffer'].assert_called_once_with(msg=t_err) + mock_pjlink['receive_data_signal'].assert_called_once() + mock_command.assert_not_called() + + +def test_invalid_command(pjlink, caplog): + """ + Test method with invalid command + """ + # GIVEN: Initial setup + t_cmd = "CLASSS" + t_ver = "1" + t_data = "1" + t_buff = f"{PJLINK_PREFIX}{t_ver}{t_cmd}={t_data}" + t_err = f'get_data(): Invalid packet - unknown command "{t_cmd}"' + logs = [(f"{test_module}", logging.DEBUG, + f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) Setting ignore_class to "False"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data(): Checking new data "{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data() header="{PJLINK_PREFIX}{t_ver}{t_cmd}" data="{t_data}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data() version="{t_ver}" cmd="{t_cmd}" data="{t_data}"') + ] + with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \ + patch.multiple(pjlink, + _trash_buffer=DEFAULT, + receive_data_signal=DEFAULT) as mock_pjlink: + + # WHEN: get_data called + caplog.set_level(logging.DEBUG) + t_check = pjlink.get_data(buff=t_buff) + + # THEN: t_check should be None and log entry made + assert t_check is None, "Invalid return code" + assert caplog.record_tuples == logs, "Invalid log entries" + mock_pjlink['_trash_buffer'].assert_called_once_with(msg=t_err) + mock_pjlink['receive_data_signal'].assert_called_once() + mock_command.assert_not_called() + + +def test_mismatch_command_versions(pjlink, caplog): + """ + Test method with pjlink.pjlink_class < cmd.class + """ + # GIVEN: Initial setup + t_cmd = "CLSS" + t_ver = "2" + t_data = "1" + t_buff = f"{PJLINK_PREFIX}{t_ver}{t_cmd}={t_data}" + t_err = 'get_data() Command reply version does not match a valid command version' + logs = [(f"{test_module}", logging.DEBUG, + f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) Setting ignore_class to "False"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data(): Checking new data "{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data() header="{PJLINK_PREFIX}{t_ver}{t_cmd}" data="{t_data}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data() version="{t_ver}" cmd="{t_cmd}" data="{t_data}"') + ] + + with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \ + patch.multiple(pjlink, + _trash_buffer=DEFAULT, + receive_data_signal=DEFAULT) as mock_pjlink: + + # WHEN: get_data called + caplog.set_level(logging.DEBUG) + t_check = pjlink.get_data(buff=t_buff) + + # THEN: t_check should be None and log entry made + assert t_check is None, "Invalid return code" + assert caplog.record_tuples == logs, "Invalid log entries" + mock_pjlink['receive_data_signal'].assert_called_once() + mock_pjlink['_trash_buffer'].assert_called_once_with(msg=t_err) + mock_command.assert_not_called() + + +def test_mismatch_class_versions(pjlink, caplog): + """ + Test method with pjlink.pjlink_class < cmd.class + """ + # GIVEN: Initial setup + t_cmd = "FILT" + t_ver = "2" + t_data = "2" + t_buff = f"{PJLINK_PREFIX}{t_ver}{t_cmd}={t_data}" + logs = [(f"{test_module}", logging.DEBUG, + f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) Setting ignore_class to "False"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data(): Checking new data "{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data() header="{PJLINK_PREFIX}{t_ver}{t_cmd}" data="{t_data}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data() version="{t_ver}" cmd="{t_cmd}" data="{t_data}"'), + (f'{test_module}', logging.WARNING, + f'({pjlink.entry.name}) get_data(): pjlink_class={pjlink.pjlink_class} packet={t_ver}'), + (f'{test_module}', logging.WARNING, + f'({pjlink.entry.name}) get_data(): Projector returned class reply higher than projector stated class') + ] + with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \ + patch.multiple(pjlink, + _trash_buffer=DEFAULT, + receive_data_signal=DEFAULT) as mock_pjlink: + pjlink.pjlink_class = 1 # Set class to 1 and call with cmd class = 2 + + # WHEN: get_data called + caplog.set_level(logging.DEBUG) + t_check = pjlink.get_data(buff=t_buff) + + # THEN: t_check should be None and log entry made + assert t_check is None, "Invalid return code" + assert caplog.record_tuples == logs, "Invalid log entries" + mock_pjlink['receive_data_signal'].assert_called_once() + mock_command.assert_not_called() + + +def test_ignore_class_versions(pjlink, caplog): + """ + Test method with pjlink.pjlink_class < cmd.class + """ + # GIVEN: Initial setup + t_cmd = "FILT" + t_ver = "2" + t_data = "2" + t_buff = f"{PJLINK_PREFIX}{t_ver}{t_cmd}={t_data}" + logs = [(f"{test_module}", logging.DEBUG, + f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) Setting ignore_class to "True"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data(): Checking new data "{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data() header="{PJLINK_PREFIX}{t_ver}{t_cmd}" data="{t_data}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data() version="{t_ver}" cmd="{t_cmd}" data="{t_data}"'), + (f'{test_module}', logging.WARNING, + f'({pjlink.entry.name}) get_data(): pjlink_class={pjlink.pjlink_class} packet={t_ver}') + ] + with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \ + patch.multiple(pjlink, + _trash_buffer=DEFAULT, + receive_data_signal=DEFAULT) as mock_pjlink: + pjlink.pjlink_class = 1 # Set class to 1 and call with cmd class = 2 + mock_command.return_value = None + + # WHEN: get_data called with ignore_class=True + caplog.set_level(logging.DEBUG) + t_check = pjlink.get_data(buff=t_buff, ignore_class=True) + + # THEN: t_check should be None and log entry made + assert t_check is None, "Invalid return code" + assert caplog.record_tuples == logs, "Invalid log entries" + mock_pjlink['receive_data_signal'].assert_called_once() + mock_command.assert_called_with(pjlink, t_cmd, t_data) + + +def test_s_data_ok(pjlink, caplog): + """ + Test projector returns "OK" + """ + t_cmd = "INPT" + t_ver = "1" + t_data = "OK" + t_buff = f"{PJLINK_PREFIX}{t_ver}{t_cmd}={t_data}" + logs = [(f"{test_module}", logging.DEBUG, + f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) Setting ignore_class to "False"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data(): Checking new data "{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data() header="{PJLINK_PREFIX}{t_ver}{t_cmd}" data="{t_data}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data() version="{t_ver}" cmd="{t_cmd}" data="{t_data}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) OK returned - resending command') + ] + with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \ + patch.multiple(pjlink, + _trash_buffer=DEFAULT, + receive_data_signal=DEFAULT, + send_command=DEFAULT) as mock_pjlink: + mock_command.return_value = S_DATA_OK + + # WHEN: get_data called with OK + caplog.set_level(logging.DEBUG) + t_check = pjlink.get_data(buff=t_buff) + + # THEN: t_check should be None and log entry made + assert t_check is None, "Invalid return code" + assert caplog.record_tuples == logs, "Invalid log entries" + mock_pjlink['receive_data_signal'].assert_called_once() + mock_pjlink['_trash_buffer'].assert_not_called() + mock_pjlink['send_command'].assert_called_with(cmd=t_cmd, priority=True) + mock_command.assert_called_with(pjlink, t_cmd, t_data) + + +def test_s_connect(pjlink, caplog): + """ + Test projector connect with no authenticate + """ + t_cmd = "PJLINK" + t_ver = "1" + t_data = "0" + t_buff = f"{PJLINK_PREFIX}{t_ver}{t_cmd}={t_data}" + logs = [(f"{test_module}", logging.DEBUG, + f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) Setting ignore_class to "False"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data(): Checking new data "{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data() header="{PJLINK_PREFIX}{t_ver}{t_cmd}" data="{t_data}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data() version="{t_ver}" cmd="{t_cmd}" data="{t_data}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) Connecting normal') + ] + with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \ + patch.multiple(pjlink, + _trash_buffer=DEFAULT, + receive_data_signal=DEFAULT, + send_command=DEFAULT, + change_status=DEFAULT, + readyRead=DEFAULT, + get_socket=DEFAULT) as mock_pjlink: + mock_command.return_value = S_CONNECT + + # WHEN: get_data called with OK + caplog.set_level(logging.DEBUG) + t_check = pjlink.get_data(buff=t_buff) + + # THEN: t_check should be None and log entry made + assert t_check is None, "Invalid return code" + assert caplog.record_tuples == logs, "Invalid log entries" + mock_pjlink['receive_data_signal'].assert_called_once() + mock_pjlink['_trash_buffer'].assert_not_called() + mock_pjlink['send_command'].assert_called_with(cmd='CLSS', priority=True) + mock_pjlink['change_status'].assert_called_with(S_CONNECTED) + mock_pjlink['readyRead'].connect.assert_called_once_with(mock_pjlink['get_socket']) + mock_command.assert_called_with(pjlink, t_cmd, t_data) + + +def test_s_authenticate(pjlink, caplog): + """ + Test projector connect with no authenticate + """ + ''' + Projector sends "PJLINK 1 " + Combine salt+pin + Send first command "" + Projector will either: + - Not reply (socket timeout) indicating invalid hash + - Reply with command reply + Example in documentation: + - Send "" + - Received "PJLINK 1 498e4a67" + - Prefix = "%" + - Salt = "498e4a67" + - Pin = "JBMIAProjectorLink" + - Hash = "5d8409bc1c3fa39749434aa3a5c38682" + - Send "5d8409bc1c3fa39749434aa3a5c38682%1CLSS?" + - Reply "%1CLSS=1" + ''' + # GIVEN: Initial setup + # t_pin = "JBMIAProjectorLink" + # t_salt = "498e4a67" + # t_hash = "5d8409bc1c3fa39749434aa3a5c38682" + t_salt = '498e4a67' + t_hash = '5d8409bc1c3fa39749434aa3a5c38682' + t_pin = "JBMIAProjectorLink" + t_cmd = "PJLINK" + t_ver = "1" + t_data = f"1 {t_salt}" + t_buff = f"{PJLINK_PREFIX}{t_ver}{t_cmd}={t_data}" + logs = [(f"{test_module}", logging.DEBUG, + f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) Setting ignore_class to "False"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data(): Checking new data "{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data() header="{PJLINK_PREFIX}{t_ver}{t_cmd}" data="{t_data}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data() version="{t_ver}" cmd="{t_cmd}" data="{t_data}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) Connecting with pin'), + (f'{test_qmd5}.__init__', logging.DEBUG, + f'qmd5_hash(salt="b\'{t_salt}\'"'), + (f'{test_qmd5}.__init__', logging.DEBUG, + f'qmd5_hash() returning "b\'{t_hash}\'"') + ] + with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \ + patch.multiple(pjlink, + _trash_buffer=DEFAULT, + receive_data_signal=DEFAULT, + send_command=DEFAULT, + change_status=DEFAULT, + readyRead=DEFAULT, + get_socket=DEFAULT) as mock_pjlink: + mock_command.return_value = S_AUTHENTICATE + pjlink.pin = t_pin + + # WHEN: get_data called with OK + caplog.set_level(logging.DEBUG) + t_check = pjlink.get_data(buff=t_buff) + + # THEN: t_check should be None and log entry made + assert t_check is None, "Invalid return code" + assert caplog.record_tuples == logs, "Invalid log entries" + mock_pjlink['receive_data_signal'].assert_called_once() + mock_pjlink['_trash_buffer'].assert_not_called() + mock_pjlink['send_command'].assert_called_with(cmd='CLSS', salt=t_hash, priority=True) + mock_pjlink['change_status'].assert_called_with(S_CONNECTED) + mock_pjlink['readyRead'].connect.assert_called_once_with(mock_pjlink['get_socket']) + mock_command.assert_called_with(pjlink, t_cmd, t_data) + + +def test_e_authenticate(pjlink, caplog): + """ + Test projector connect with invalid pin + """ + t_salt = '498e4a67' + t_cmd = "PJLINK" + t_ver = "1" + t_data = f"1 {t_salt}" + t_buff = f"{PJLINK_PREFIX}{t_ver}{t_cmd}={t_data}" + logs = [(f"{test_module}", logging.DEBUG, + f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) Setting ignore_class to "False"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data(): Checking new data "{t_buff}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data() header="{PJLINK_PREFIX}{t_ver}{t_cmd}" data="{t_data}"'), + (f'{test_module}', logging.DEBUG, + f'({pjlink.entry.name}) get_data() version="{t_ver}" cmd="{t_cmd}" data="{t_data}"'), + (f'{test_module}', logging.WARNING, + f'({pjlink.entry.name}) Failed authentication - disconnecting') + ] + with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \ + patch.multiple(pjlink, + _trash_buffer=DEFAULT, + receive_data_signal=DEFAULT, + disconnect_from_host=DEFAULT, + change_status=DEFAULT, + projectorAuthentication=DEFAULT) as mock_pjlink: + mock_command.return_value = E_AUTHENTICATION + + # WHEN: get_data called with OK + caplog.set_level(logging.DEBUG) + t_check = pjlink.get_data(buff=t_buff) + + # THEN: t_check should be None and log entry made + assert t_check is None, "Invalid return code" + assert caplog.record_tuples == logs, "Invalid log entries" + mock_pjlink['receive_data_signal'].assert_called_once() + mock_pjlink['_trash_buffer'].assert_not_called() + mock_pjlink['change_status'].assert_called_with(status=E_AUTHENTICATION) + mock_pjlink['projectorAuthentication'].emit.assert_called_with(pjlink.entry.name) + mock_command.assert_called_with(pjlink, t_cmd, t_data) diff --git a/tests/openlp_core/projectors/test_projector_pjlink_cmd_routing.py b/tests/openlp_core/projectors/test_projector_pjlink_cmd_routing.py deleted file mode 100644 index 28f60724f..000000000 --- a/tests/openlp_core/projectors/test_projector_pjlink_cmd_routing.py +++ /dev/null @@ -1,115 +0,0 @@ -# -*- coding: utf-8 -*- - -########################################################################## -# OpenLP - Open Source Lyrics Projection # -# ---------------------------------------------------------------------- # -# Copyright (c) 2008-2021 OpenLP Developers # -# ---------------------------------------------------------------------- # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see . # -########################################################################## -""" -Package to test the openlp.core.projectors.pjlink command routing. -""" -from unittest.mock import call, patch - -import openlp.core.projectors.pjlink -from openlp.core.projectors.constants import PJLINK_PREFIX - - -@patch.object(openlp.core.projectors.pjlinkcommands, 'process_command') -@patch.object(openlp.core.projectors.pjlink, 'log') -def test_projector_get_data_invalid_version(mock_log, mock_process_cmd, pjlink): - """ - Test projector received valid command invalid version - """ - # GIVEN: Test object - log_warning_text = [call('({ip}) _send_command(): Nothing to send - returning'.format(ip=pjlink.name)), - call('({ip}) get_data() Command reply version does not match ' - 'a valid command version'.format(ip=pjlink.name))] - log_debug_text = [call('({ip}) get_data(buffer="{pre}XCLSS=X"'.format(ip=pjlink.name, pre=PJLINK_PREFIX)), - call('({ip}) get_data(): Checking new data "{pre}XCLSS=X"'.format(ip=pjlink.name, - pre=PJLINK_PREFIX)), - call('({ip}) get_data() header="{pre}XCLSS" data="X"'.format(ip=pjlink.name, - pre=PJLINK_PREFIX)), - call('({ip}) get_data() version="X" cmd="CLSS" data="X"'.format(ip=pjlink.name)), - call('({ip}) Cleaning buffer - msg = "get_data() Command reply version does ' - 'not match a valid command version"'.format(ip=pjlink.name)), - call('({ip}) Finished cleaning buffer - 0 bytes dropped'.format(ip=pjlink.name))] - # WHEN: get_data called with an unknown command - pjlink.get_data(buff='{prefix}XCLSS=X'.format(prefix=PJLINK_PREFIX)) - - # THEN: Appropriate log entries should have been made and methods called/not called - mock_log.warning.assert_has_calls(log_warning_text) - mock_log.debug.assert_has_calls(log_debug_text) - assert (mock_process_cmd.call_count == 0), 'process_command should not have been called' - - -@patch.object(openlp.core.projectors.pjlinkcommands, 'process_command') -@patch.object(openlp.core.projectors.pjlink, 'log') -def test_projector_get_data_unknown_command(mock_log, mock_process_cmd, pjlink): - """ - Test projector receiving invalid command - """ - # GIVEN: Test object - log_warning_text = [call('({ip}) _send_command(): Nothing to send - ' - 'returning'.format(ip=pjlink.name)), - call('({ip}) get_data(): Invalid packet - ' - 'unknown command "UNKN"'.format(ip=pjlink.name))] - log_debug_text = [call('({ip}) get_data(buffer="{pre}1UNKN=Huh?"'.format(ip=pjlink.name, - pre=PJLINK_PREFIX)), - call('({ip}) get_data(): Checking new data "{pre}1UNKN=Huh?"'.format(ip=pjlink.name, - pre=PJLINK_PREFIX)), - call('({ip}) get_data() header="{pre}1UNKN" data="Huh?"'.format(ip=pjlink.name, - pre=PJLINK_PREFIX)), - call('({ip}) get_data() version="1" cmd="UNKN" data="Huh?"'.format(ip=pjlink.name)), - call('({ip}) Cleaning buffer - msg = "get_data(): Invalid packet - ' - 'unknown command "UNKN""'.format(ip=pjlink.name)), - call('({ip}) Finished cleaning buffer - 0 bytes dropped'.format(ip=pjlink.name))] - - # WHEN: get_data called with an unknown command - pjlink.get_data(buff='{prefix}1UNKN=Huh?'.format(prefix=PJLINK_PREFIX)) - - # THEN: Appropriate log entries should have been made and methods called/not called - mock_log.warning.assert_has_calls(log_warning_text) - mock_log.debug.assert_has_calls(log_debug_text) - assert (mock_process_cmd.call_count == 0), 'process_command should not have been called' - - -@patch.object(openlp.core.projectors.pjlinkcommands, 'process_command') -@patch.object(openlp.core.projectors.pjlink, 'log') -def test_projector_get_data_version_mismatch(mock_log, mock_process_cmd, pjlink): - """ - Test projector received valid command with command version higher than projector - """ - # GIVEN: Test object - log_warning_text = [call('({ip}) _send_command(): Nothing to send - returning'.format(ip=pjlink.name)), - call('({ip}) get_data(): Projector returned class reply higher than projector ' - 'stated class'.format(ip=pjlink.name))] - - log_debug_text = [call('({ip}) get_data(buffer="{pre}2ACKN=X"'.format(ip=pjlink.name, - pre=PJLINK_PREFIX)), - call('({ip}) get_data(): Checking new data "{pre}2ACKN=X"'.format(ip=pjlink.name, - pre=PJLINK_PREFIX)), - call('({ip}) get_data() header="{pre}2ACKN" data="X"'.format(ip=pjlink.name, - pre=PJLINK_PREFIX)), - call('({ip}) get_data() version="2" cmd="ACKN" data="X"'.format(ip=pjlink.name))] - pjlink.pjlink_class = '1' - - # WHEN: get_data called with an unknown command - pjlink.get_data(buff='{prefix}2ACKN=X'.format(prefix=PJLINK_PREFIX)) - - # THEN: Appropriate log entries should have been made and methods called/not called - mock_log.warning.assert_has_calls(log_warning_text) - mock_log.debug.assert_has_calls(log_debug_text) - assert (mock_process_cmd.call_count == 0), 'process_command should not have been called'