diff --git a/openlp/core/projectors/manager.py b/openlp/core/projectors/manager.py index 04ece3c9e..6227490a2 100644 --- a/openlp/core/projectors/manager.py +++ b/openlp/core/projectors/manager.py @@ -444,7 +444,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM :param item: Optional ProjectorItem() instance in case of direct call :param opt: (Deprecated) """ - if item is not None: + if hasattr(item, 'pjlink'): return item.pjlink.set_shutter_closed() for list_item in self.projector_list_widget.selectedItems(): list_item.data(QtCore.Qt.UserRole).pjlink.set_shutter_closed() @@ -470,7 +470,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM :param item: (Optional) ProjectorItem() for direct call :param opt: (Deprecated) """ - if item is not None: + if hasattr(item, 'pjlink'): return item.pjlink.connect_to_host() else: for list_item in self.projector_list_widget.selectedItems(): @@ -547,7 +547,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM :param item: (Optional) ProjectorItem() for direct call :param opt: (Deprecated) """ - if item is not None: + if hasattr(item, 'pjlink'): return item.pjlink.disconnect_from_host() else: for list_item in self.projector_list_widget.selectedItems(): @@ -575,7 +575,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM :param item: (Optional) ProjectorItem() for direct call :param opt: (Deprecated) """ - if item is not None: + if hasattr(item, 'pjlink'): return item.pjlink.set_power_off() else: for list_item in self.projector_list_widget.selectedItems(): @@ -588,7 +588,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM :param item: (Optional) ProjectorItem() for direct call :param opt: (Deprecated) """ - if item is not None: + if hasattr(item, 'pjlink'): return item.pjlink.set_power_on() else: for list_item in self.projector_list_widget.selectedItems(): @@ -601,7 +601,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM :param item: (Optional) ProjectorItem() for direct call :param opt: (Deprecated) """ - if item is not None: + if hasattr(item, 'pjlink'): return item.pjlink.set_shutter_open() else: for list_item in self.projector_list_widget.selectedItems(): diff --git a/openlp/core/projectors/pjlink.py b/openlp/core/projectors/pjlink.py index 779722194..10029437e 100644 --- a/openlp/core/projectors/pjlink.py +++ b/openlp/core/projectors/pjlink.py @@ -431,36 +431,39 @@ class PJLink(QtNetwork.QTcpSocket): log.debug(f'({self.entry.name}) check_login(data="{data}")') if data is None: # Reconnected setup? - if not self.waitForReadyRead(2000): + log.debug(f'({self.name}) check_login() Waiting for readyRead()') + _chk = self.waitForReadyRead(2000) # 2 seconds should be plenty of time + if not _chk: # Possible timeout issue log.error(f'({self.entry.name}) Socket timeout waiting for login') self.change_status(E_SOCKET_TIMEOUT) return - read = self.readLine(self.max_size) - self.readLine(self.max_size) # Clean out any trailing whitespace - if read is None: + log.debug(f'({self.name}) check_login() Checking for data') + data = self.readLine(self.max_size) + if data is None: log.warning(f'({self.entry.name}) read is None - socket error?') return - elif len(read) < 8: + elif len(data) < 8: log.warning(f'({self.entry.name}) Not enough data read - skipping') return - data = decode(read, 'utf-8') # Possibility of extraneous data on input when reading. # Clean out extraneous characters in buffer. - self.read(1024) + _ = self.read(1024) log.debug(f'({self.entry.name}) check_login() read "{data.strip()}"') # At this point, we should only have the initial login prompt with # possible authentication # PJLink initial login will be: # 'PJLink 0' - Unauthenticated login - no extra steps required. # 'PJLink 1 XXXXXX' Authenticated login - extra processing required. - if not data.startswith('PJLINK'): + if type(data) is bytes: + data = decode(data, 'utf-8') + if not data.upper().startswith('PJLINK'): # Invalid initial packet - close socket log.error(f'({self.entry.name}) Invalid initial packet received - closing socket') return self.disconnect_from_host() # Convert the initial login prompt with the expected PJLink normal command format for processing log.debug(f'({self.entry.name}) check_login(): Formatting initial connection prompt to PJLink packet') - return self.get_data(f'{PJLINK_PREFIX}1{data.replace(" ", "=", 1).encode("utf-8")}') + return self.get_data(f'{PJLINK_PREFIX}1{data.replace(" ", "=", 1)}'.strip()) def _trash_buffer(self, msg=None): """ @@ -533,8 +536,11 @@ class PJLink(QtNetwork.QTcpSocket): ignore_class = False if 'ignore_class' not in kwargs else kwargs['ignore_class'] log.debug(f'({self.entry.name}) Setting ignore_class to "{ignore_class}"') # NOTE: Class2 has changed to some values being UTF-8 - data_in = decode(buff, 'utf-8') if isinstance(buff, bytes) else buff - data = data_in.strip() + if type(buff) is bytes: + data = decode(buff, 'utf-8') + else: + data = buff + data = data.strip() self.receive_data_signal() # Initial packet checks if len(data) < 7: diff --git a/tests/openlp_core/projectors/pjlink/test_check_login.py b/tests/openlp_core/projectors/pjlink/test_check_login.py new file mode 100644 index 000000000..20a095964 --- /dev/null +++ b/tests/openlp_core/projectors/pjlink/test_check_login.py @@ -0,0 +1,272 @@ +# -*- coding: utf-8 -*- + +########################################################################## +# OpenLP - Open Source Lyrics Projection # +# ---------------------------------------------------------------------- # +# Copyright (c) 2008-2022 OpenLP Developers # +# ---------------------------------------------------------------------- # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +########################################################################## +""" +Test PJLink.check_login +""" + +import logging + +import openlp.core.projectors.pjlink + +from unittest.mock import DEFAULT, patch +from openlp.core.projectors.constants import PJLINK_PREFIX, E_SOCKET_TIMEOUT +from tests.resources.projector.data import TEST_SALT +test_module = openlp.core.projectors.pjlink.__name__ + + +def test_socket_timeout(pjlink, caplog): + """ + Test return when socket timeout + """ + # GIVEN: Test setup + t_data = None + t_readLine = None + + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, f'({pjlink.name}) check_login(data="{t_data}")'), + (test_module, logging.DEBUG, f'({pjlink.name}) check_login() Waiting for readyRead()'), + (test_module, logging.ERROR, f'({pjlink.name}) Socket timeout waiting for login'), + ] + + with patch.multiple(pjlink, + waitForReadyRead=DEFAULT, + get_data=DEFAULT, + change_status=DEFAULT, + readLine=DEFAULT, + read=DEFAULT, + disconnect_from_host=DEFAULT, + write=DEFAULT) as mock_pjlink: + mock_pjlink['waitForReadyRead'].return_value = False + mock_pjlink['readLine'].return_value = t_readLine + + # WHEN: Called with no data + caplog.clear() + pjlink.check_login(data=None) + + # THEN: Log entries and get_data not called + assert caplog.record_tuples == logs, 'Invalid log entires' + mock_pjlink['change_status'].assert_called_with(E_SOCKET_TIMEOUT) + mock_pjlink['get_data'].assert_not_called() + mock_pjlink['read'].assert_not_called() + mock_pjlink['disconnect_from_host'].assert_not_called() + + +def test_readLine_no_data(pjlink, caplog): + """ + Test return when readLine data is None + """ + # GIVEN: Test setup + t_data = None + t_readLine = None + + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, f'({pjlink.name}) check_login(data="{t_data}")'), + (test_module, logging.DEBUG, f'({pjlink.name}) check_login() Waiting for readyRead()'), + (test_module, logging.DEBUG, f'({pjlink.name}) check_login() Checking for data'), + (test_module, logging.WARNING, f'({pjlink.name}) read is None - socket error?'), + ] + + with patch.multiple(pjlink, + waitForReadyRead=DEFAULT, + get_data=DEFAULT, + change_status=DEFAULT, + readLine=DEFAULT, + read=DEFAULT, + disconnect_from_host=DEFAULT, + write=DEFAULT) as mock_pjlink: + mock_pjlink['waitForReadyRead'].return_value = True + mock_pjlink['readLine'].return_value = t_readLine + + # WHEN: Called with no data + caplog.clear() + pjlink.check_login(data=None) + + # THEN: Log entries and get_data not called + assert caplog.record_tuples == logs, 'Invalid log entires' + mock_pjlink['change_status'].assert_not_called() + mock_pjlink['get_data'].assert_not_called() + mock_pjlink['read'].assert_not_called() + mock_pjlink['disconnect_from_host'].assert_not_called() + + +def test_readLine_short_data(pjlink, caplog): + """ + Test return when readLine data < minimum packet size + """ + # GIVEN: Test setup + t_data = None + t_readLine = 'PJLink' + + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, f'({pjlink.name}) check_login(data="{t_data}")'), + (test_module, logging.DEBUG, f'({pjlink.name}) check_login() Waiting for readyRead()'), + (test_module, logging.DEBUG, f'({pjlink.name}) check_login() Checking for data'), + (test_module, logging.WARNING, f'({pjlink.name}) Not enough data read - skipping'), + ] + + with patch.multiple(pjlink, + waitForReadyRead=DEFAULT, + get_data=DEFAULT, + change_status=DEFAULT, + readLine=DEFAULT, + read=DEFAULT, + disconnect_from_host=DEFAULT, + write=DEFAULT) as mock_pjlink: + mock_pjlink['waitForReadyRead'].return_value = True + mock_pjlink['readLine'].return_value = t_readLine + + # WHEN: Called with no data + caplog.clear() + pjlink.check_login(data=None) + + # THEN: Log entries and get_data not called + assert caplog.record_tuples == logs, 'Invalid log entires' + mock_pjlink['change_status'].assert_not_called() + mock_pjlink['get_data'].assert_not_called() + mock_pjlink['read'].assert_not_called() + mock_pjlink['disconnect_from_host'].assert_not_called() + + +def test_readLine_invalid_data(pjlink, caplog): + """ + Test return when readLine data < minimum packet size + """ + # GIVEN: Test setup + t_data = None + t_readLine = 'SOMETHING 0' + + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, f'({pjlink.name}) check_login(data="{t_data}")'), + (test_module, logging.DEBUG, f'({pjlink.name}) check_login() Waiting for readyRead()'), + (test_module, logging.DEBUG, f'({pjlink.name}) check_login() Checking for data'), + (test_module, logging.DEBUG, f'({pjlink.name}) check_login() read "{t_readLine}"'), + (test_module, logging.ERROR, f'({pjlink.name}) Invalid initial packet received - closing socket') + ] + + with patch.multiple(pjlink, + waitForReadyRead=DEFAULT, + get_data=DEFAULT, + change_status=DEFAULT, + readLine=DEFAULT, + read=DEFAULT, + disconnect_from_host=DEFAULT, + write=DEFAULT) as mock_pjlink: + mock_pjlink['waitForReadyRead'].return_value = True + mock_pjlink['readLine'].return_value = t_readLine + + # WHEN: Called with no data + caplog.clear() + pjlink.check_login(data=None) + + print(caplog.record_tuples) + + # THEN: Log entries and get_data not called + assert caplog.record_tuples == logs, 'Invalid log entires' + mock_pjlink['change_status'].assert_not_called() + mock_pjlink['get_data'].assert_not_called() + mock_pjlink['read'].assert_called_once() + mock_pjlink['disconnect_from_host'].assert_called_once() + + +def test_readLine_no_authentication(pjlink, caplog): + """ + Test return when readLine data < minimum packet size + """ + # GIVEN: Test setup + t_data = None + t_readLine = 'PJLink 0' + t_return = f'{PJLINK_PREFIX}1PJLink=0' + + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, f'({pjlink.name}) check_login(data="{t_data}")'), + (test_module, logging.DEBUG, f'({pjlink.name}) check_login() Waiting for readyRead()'), + (test_module, logging.DEBUG, f'({pjlink.name}) check_login() Checking for data'), + (test_module, logging.DEBUG, f'({pjlink.name}) check_login() read "{t_readLine}"'), + (test_module, logging.DEBUG, + f'({pjlink.name}) check_login(): Formatting initial connection prompt to PJLink packet') + ] + + with patch.multiple(pjlink, + waitForReadyRead=DEFAULT, + get_data=DEFAULT, + change_status=DEFAULT, + readLine=DEFAULT, + read=DEFAULT, + disconnect_from_host=DEFAULT, + write=DEFAULT) as mock_pjlink: + mock_pjlink['waitForReadyRead'].return_value = True + mock_pjlink['readLine'].return_value = t_readLine + + # WHEN: Called with no data + caplog.clear() + pjlink.check_login(data=None) + + print(caplog.record_tuples) + + # THEN: Log entries and get_data not called + assert caplog.record_tuples == logs, 'Invalid log entires' + mock_pjlink['change_status'].assert_not_called() + mock_pjlink['read'].assert_called_once() + mock_pjlink['disconnect_from_host'].assert_not_called() + mock_pjlink['get_data'].assert_called_with(t_return) + + +def test_readLine_with_authentication(pjlink, caplog): + """ + Test return when readLine data < minimum packet size + """ + # GIVEN: Test setup + t_data = None + t_readLine = f'PJLink 1 {TEST_SALT}' + t_return = f'{PJLINK_PREFIX}1PJLink=1 {TEST_SALT}' + + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, f'({pjlink.name}) check_login(data="{t_data}")'), + (test_module, logging.DEBUG, f'({pjlink.name}) check_login() Waiting for readyRead()'), + (test_module, logging.DEBUG, f'({pjlink.name}) check_login() Checking for data'), + (test_module, logging.DEBUG, f'({pjlink.name}) check_login() read "{t_readLine}"'), + (test_module, logging.DEBUG, + f'({pjlink.name}) check_login(): Formatting initial connection prompt to PJLink packet') + ] + + with patch.multiple(pjlink, + waitForReadyRead=DEFAULT, + get_data=DEFAULT, + change_status=DEFAULT, + readLine=DEFAULT, + read=DEFAULT, + disconnect_from_host=DEFAULT, + write=DEFAULT) as mock_pjlink: + mock_pjlink['waitForReadyRead'].return_value = True + mock_pjlink['readLine'].return_value = t_readLine + + # WHEN: Called with no data + caplog.clear() + pjlink.check_login(data=None) + + print(caplog.record_tuples) + + # THEN: Log entries and get_data not called + assert caplog.record_tuples == logs, 'Invalid log entires' + mock_pjlink['change_status'].assert_not_called() + mock_pjlink['read'].assert_called_once() + mock_pjlink['disconnect_from_host'].assert_not_called() + mock_pjlink['get_data'].assert_called_with(t_return)