forked from openlp/openlp
Merge branch 'pjlink_issue_1033' into 'master'
Fix issue #1033 Closes #1033 See merge request openlp/openlp!443
This commit is contained in:
commit
3347e651aa
@ -444,7 +444,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
|
||||
:param item: Optional ProjectorItem() instance in case of direct call
|
||||
:param opt: (Deprecated)
|
||||
"""
|
||||
if item is not None:
|
||||
if hasattr(item, 'pjlink'):
|
||||
return item.pjlink.set_shutter_closed()
|
||||
for list_item in self.projector_list_widget.selectedItems():
|
||||
list_item.data(QtCore.Qt.UserRole).pjlink.set_shutter_closed()
|
||||
@ -470,7 +470,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
|
||||
:param item: (Optional) ProjectorItem() for direct call
|
||||
:param opt: (Deprecated)
|
||||
"""
|
||||
if item is not None:
|
||||
if hasattr(item, 'pjlink'):
|
||||
return item.pjlink.connect_to_host()
|
||||
else:
|
||||
for list_item in self.projector_list_widget.selectedItems():
|
||||
@ -547,7 +547,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
|
||||
:param item: (Optional) ProjectorItem() for direct call
|
||||
:param opt: (Deprecated)
|
||||
"""
|
||||
if item is not None:
|
||||
if hasattr(item, 'pjlink'):
|
||||
return item.pjlink.disconnect_from_host()
|
||||
else:
|
||||
for list_item in self.projector_list_widget.selectedItems():
|
||||
@ -575,7 +575,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
|
||||
:param item: (Optional) ProjectorItem() for direct call
|
||||
:param opt: (Deprecated)
|
||||
"""
|
||||
if item is not None:
|
||||
if hasattr(item, 'pjlink'):
|
||||
return item.pjlink.set_power_off()
|
||||
else:
|
||||
for list_item in self.projector_list_widget.selectedItems():
|
||||
@ -588,7 +588,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
|
||||
:param item: (Optional) ProjectorItem() for direct call
|
||||
:param opt: (Deprecated)
|
||||
"""
|
||||
if item is not None:
|
||||
if hasattr(item, 'pjlink'):
|
||||
return item.pjlink.set_power_on()
|
||||
else:
|
||||
for list_item in self.projector_list_widget.selectedItems():
|
||||
@ -601,7 +601,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
|
||||
:param item: (Optional) ProjectorItem() for direct call
|
||||
:param opt: (Deprecated)
|
||||
"""
|
||||
if item is not None:
|
||||
if hasattr(item, 'pjlink'):
|
||||
return item.pjlink.set_shutter_open()
|
||||
else:
|
||||
for list_item in self.projector_list_widget.selectedItems():
|
||||
|
@ -431,36 +431,39 @@ class PJLink(QtNetwork.QTcpSocket):
|
||||
log.debug(f'({self.entry.name}) check_login(data="{data}")')
|
||||
if data is None:
|
||||
# Reconnected setup?
|
||||
if not self.waitForReadyRead(2000):
|
||||
log.debug(f'({self.name}) check_login() Waiting for readyRead()')
|
||||
_chk = self.waitForReadyRead(2000) # 2 seconds should be plenty of time
|
||||
if not _chk:
|
||||
# Possible timeout issue
|
||||
log.error(f'({self.entry.name}) Socket timeout waiting for login')
|
||||
self.change_status(E_SOCKET_TIMEOUT)
|
||||
return
|
||||
read = self.readLine(self.max_size)
|
||||
self.readLine(self.max_size) # Clean out any trailing whitespace
|
||||
if read is None:
|
||||
log.debug(f'({self.name}) check_login() Checking for data')
|
||||
data = self.readLine(self.max_size)
|
||||
if data is None:
|
||||
log.warning(f'({self.entry.name}) read is None - socket error?')
|
||||
return
|
||||
elif len(read) < 8:
|
||||
elif len(data) < 8:
|
||||
log.warning(f'({self.entry.name}) Not enough data read - skipping')
|
||||
return
|
||||
data = decode(read, 'utf-8')
|
||||
# Possibility of extraneous data on input when reading.
|
||||
# Clean out extraneous characters in buffer.
|
||||
self.read(1024)
|
||||
_ = self.read(1024)
|
||||
log.debug(f'({self.entry.name}) check_login() read "{data.strip()}"')
|
||||
# At this point, we should only have the initial login prompt with
|
||||
# possible authentication
|
||||
# PJLink initial login will be:
|
||||
# 'PJLink 0' - Unauthenticated login - no extra steps required.
|
||||
# 'PJLink 1 XXXXXX' Authenticated login - extra processing required.
|
||||
if not data.startswith('PJLINK'):
|
||||
if type(data) is bytes:
|
||||
data = decode(data, 'utf-8')
|
||||
if not data.upper().startswith('PJLINK'):
|
||||
# Invalid initial packet - close socket
|
||||
log.error(f'({self.entry.name}) Invalid initial packet received - closing socket')
|
||||
return self.disconnect_from_host()
|
||||
# Convert the initial login prompt with the expected PJLink normal command format for processing
|
||||
log.debug(f'({self.entry.name}) check_login(): Formatting initial connection prompt to PJLink packet')
|
||||
return self.get_data(f'{PJLINK_PREFIX}1{data.replace(" ", "=", 1).encode("utf-8")}')
|
||||
return self.get_data(f'{PJLINK_PREFIX}1{data.replace(" ", "=", 1)}'.strip())
|
||||
|
||||
def _trash_buffer(self, msg=None):
|
||||
"""
|
||||
@ -533,8 +536,11 @@ class PJLink(QtNetwork.QTcpSocket):
|
||||
ignore_class = False if 'ignore_class' not in kwargs else kwargs['ignore_class']
|
||||
log.debug(f'({self.entry.name}) Setting ignore_class to "{ignore_class}"')
|
||||
# NOTE: Class2 has changed to some values being UTF-8
|
||||
data_in = decode(buff, 'utf-8') if isinstance(buff, bytes) else buff
|
||||
data = data_in.strip()
|
||||
if type(buff) is bytes:
|
||||
data = decode(buff, 'utf-8')
|
||||
else:
|
||||
data = buff
|
||||
data = data.strip()
|
||||
self.receive_data_signal()
|
||||
# Initial packet checks
|
||||
if len(data) < 7:
|
||||
|
272
tests/openlp_core/projectors/pjlink/test_check_login.py
Normal file
272
tests/openlp_core/projectors/pjlink/test_check_login.py
Normal file
@ -0,0 +1,272 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
##########################################################################
|
||||
# OpenLP - Open Source Lyrics Projection #
|
||||
# ---------------------------------------------------------------------- #
|
||||
# Copyright (c) 2008-2022 OpenLP Developers #
|
||||
# ---------------------------------------------------------------------- #
|
||||
# This program is free software: you can redistribute it and/or modify #
|
||||
# it under the terms of the GNU General Public License as published by #
|
||||
# the Free Software Foundation, either version 3 of the License, or #
|
||||
# (at your option) any later version. #
|
||||
# #
|
||||
# This program is distributed in the hope that it will be useful, #
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||
# GNU General Public License for more details. #
|
||||
# #
|
||||
# You should have received a copy of the GNU General Public License #
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||
##########################################################################
|
||||
"""
|
||||
Test PJLink.check_login
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
import openlp.core.projectors.pjlink
|
||||
|
||||
from unittest.mock import DEFAULT, patch
|
||||
from openlp.core.projectors.constants import PJLINK_PREFIX, E_SOCKET_TIMEOUT
|
||||
from tests.resources.projector.data import TEST_SALT
|
||||
test_module = openlp.core.projectors.pjlink.__name__
|
||||
|
||||
|
||||
def test_socket_timeout(pjlink, caplog):
|
||||
"""
|
||||
Test return when socket timeout
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
t_data = None
|
||||
t_readLine = None
|
||||
|
||||
caplog.set_level(logging.DEBUG)
|
||||
logs = [(test_module, logging.DEBUG, f'({pjlink.name}) check_login(data="{t_data}")'),
|
||||
(test_module, logging.DEBUG, f'({pjlink.name}) check_login() Waiting for readyRead()'),
|
||||
(test_module, logging.ERROR, f'({pjlink.name}) Socket timeout waiting for login'),
|
||||
]
|
||||
|
||||
with patch.multiple(pjlink,
|
||||
waitForReadyRead=DEFAULT,
|
||||
get_data=DEFAULT,
|
||||
change_status=DEFAULT,
|
||||
readLine=DEFAULT,
|
||||
read=DEFAULT,
|
||||
disconnect_from_host=DEFAULT,
|
||||
write=DEFAULT) as mock_pjlink:
|
||||
mock_pjlink['waitForReadyRead'].return_value = False
|
||||
mock_pjlink['readLine'].return_value = t_readLine
|
||||
|
||||
# WHEN: Called with no data
|
||||
caplog.clear()
|
||||
pjlink.check_login(data=None)
|
||||
|
||||
# THEN: Log entries and get_data not called
|
||||
assert caplog.record_tuples == logs, 'Invalid log entires'
|
||||
mock_pjlink['change_status'].assert_called_with(E_SOCKET_TIMEOUT)
|
||||
mock_pjlink['get_data'].assert_not_called()
|
||||
mock_pjlink['read'].assert_not_called()
|
||||
mock_pjlink['disconnect_from_host'].assert_not_called()
|
||||
|
||||
|
||||
def test_readLine_no_data(pjlink, caplog):
|
||||
"""
|
||||
Test return when readLine data is None
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
t_data = None
|
||||
t_readLine = None
|
||||
|
||||
caplog.set_level(logging.DEBUG)
|
||||
logs = [(test_module, logging.DEBUG, f'({pjlink.name}) check_login(data="{t_data}")'),
|
||||
(test_module, logging.DEBUG, f'({pjlink.name}) check_login() Waiting for readyRead()'),
|
||||
(test_module, logging.DEBUG, f'({pjlink.name}) check_login() Checking for data'),
|
||||
(test_module, logging.WARNING, f'({pjlink.name}) read is None - socket error?'),
|
||||
]
|
||||
|
||||
with patch.multiple(pjlink,
|
||||
waitForReadyRead=DEFAULT,
|
||||
get_data=DEFAULT,
|
||||
change_status=DEFAULT,
|
||||
readLine=DEFAULT,
|
||||
read=DEFAULT,
|
||||
disconnect_from_host=DEFAULT,
|
||||
write=DEFAULT) as mock_pjlink:
|
||||
mock_pjlink['waitForReadyRead'].return_value = True
|
||||
mock_pjlink['readLine'].return_value = t_readLine
|
||||
|
||||
# WHEN: Called with no data
|
||||
caplog.clear()
|
||||
pjlink.check_login(data=None)
|
||||
|
||||
# THEN: Log entries and get_data not called
|
||||
assert caplog.record_tuples == logs, 'Invalid log entires'
|
||||
mock_pjlink['change_status'].assert_not_called()
|
||||
mock_pjlink['get_data'].assert_not_called()
|
||||
mock_pjlink['read'].assert_not_called()
|
||||
mock_pjlink['disconnect_from_host'].assert_not_called()
|
||||
|
||||
|
||||
def test_readLine_short_data(pjlink, caplog):
|
||||
"""
|
||||
Test return when readLine data < minimum packet size
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
t_data = None
|
||||
t_readLine = 'PJLink'
|
||||
|
||||
caplog.set_level(logging.DEBUG)
|
||||
logs = [(test_module, logging.DEBUG, f'({pjlink.name}) check_login(data="{t_data}")'),
|
||||
(test_module, logging.DEBUG, f'({pjlink.name}) check_login() Waiting for readyRead()'),
|
||||
(test_module, logging.DEBUG, f'({pjlink.name}) check_login() Checking for data'),
|
||||
(test_module, logging.WARNING, f'({pjlink.name}) Not enough data read - skipping'),
|
||||
]
|
||||
|
||||
with patch.multiple(pjlink,
|
||||
waitForReadyRead=DEFAULT,
|
||||
get_data=DEFAULT,
|
||||
change_status=DEFAULT,
|
||||
readLine=DEFAULT,
|
||||
read=DEFAULT,
|
||||
disconnect_from_host=DEFAULT,
|
||||
write=DEFAULT) as mock_pjlink:
|
||||
mock_pjlink['waitForReadyRead'].return_value = True
|
||||
mock_pjlink['readLine'].return_value = t_readLine
|
||||
|
||||
# WHEN: Called with no data
|
||||
caplog.clear()
|
||||
pjlink.check_login(data=None)
|
||||
|
||||
# THEN: Log entries and get_data not called
|
||||
assert caplog.record_tuples == logs, 'Invalid log entires'
|
||||
mock_pjlink['change_status'].assert_not_called()
|
||||
mock_pjlink['get_data'].assert_not_called()
|
||||
mock_pjlink['read'].assert_not_called()
|
||||
mock_pjlink['disconnect_from_host'].assert_not_called()
|
||||
|
||||
|
||||
def test_readLine_invalid_data(pjlink, caplog):
|
||||
"""
|
||||
Test return when readLine data < minimum packet size
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
t_data = None
|
||||
t_readLine = 'SOMETHING 0'
|
||||
|
||||
caplog.set_level(logging.DEBUG)
|
||||
logs = [(test_module, logging.DEBUG, f'({pjlink.name}) check_login(data="{t_data}")'),
|
||||
(test_module, logging.DEBUG, f'({pjlink.name}) check_login() Waiting for readyRead()'),
|
||||
(test_module, logging.DEBUG, f'({pjlink.name}) check_login() Checking for data'),
|
||||
(test_module, logging.DEBUG, f'({pjlink.name}) check_login() read "{t_readLine}"'),
|
||||
(test_module, logging.ERROR, f'({pjlink.name}) Invalid initial packet received - closing socket')
|
||||
]
|
||||
|
||||
with patch.multiple(pjlink,
|
||||
waitForReadyRead=DEFAULT,
|
||||
get_data=DEFAULT,
|
||||
change_status=DEFAULT,
|
||||
readLine=DEFAULT,
|
||||
read=DEFAULT,
|
||||
disconnect_from_host=DEFAULT,
|
||||
write=DEFAULT) as mock_pjlink:
|
||||
mock_pjlink['waitForReadyRead'].return_value = True
|
||||
mock_pjlink['readLine'].return_value = t_readLine
|
||||
|
||||
# WHEN: Called with no data
|
||||
caplog.clear()
|
||||
pjlink.check_login(data=None)
|
||||
|
||||
print(caplog.record_tuples)
|
||||
|
||||
# THEN: Log entries and get_data not called
|
||||
assert caplog.record_tuples == logs, 'Invalid log entires'
|
||||
mock_pjlink['change_status'].assert_not_called()
|
||||
mock_pjlink['get_data'].assert_not_called()
|
||||
mock_pjlink['read'].assert_called_once()
|
||||
mock_pjlink['disconnect_from_host'].assert_called_once()
|
||||
|
||||
|
||||
def test_readLine_no_authentication(pjlink, caplog):
|
||||
"""
|
||||
Test return when readLine data < minimum packet size
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
t_data = None
|
||||
t_readLine = 'PJLink 0'
|
||||
t_return = f'{PJLINK_PREFIX}1PJLink=0'
|
||||
|
||||
caplog.set_level(logging.DEBUG)
|
||||
logs = [(test_module, logging.DEBUG, f'({pjlink.name}) check_login(data="{t_data}")'),
|
||||
(test_module, logging.DEBUG, f'({pjlink.name}) check_login() Waiting for readyRead()'),
|
||||
(test_module, logging.DEBUG, f'({pjlink.name}) check_login() Checking for data'),
|
||||
(test_module, logging.DEBUG, f'({pjlink.name}) check_login() read "{t_readLine}"'),
|
||||
(test_module, logging.DEBUG,
|
||||
f'({pjlink.name}) check_login(): Formatting initial connection prompt to PJLink packet')
|
||||
]
|
||||
|
||||
with patch.multiple(pjlink,
|
||||
waitForReadyRead=DEFAULT,
|
||||
get_data=DEFAULT,
|
||||
change_status=DEFAULT,
|
||||
readLine=DEFAULT,
|
||||
read=DEFAULT,
|
||||
disconnect_from_host=DEFAULT,
|
||||
write=DEFAULT) as mock_pjlink:
|
||||
mock_pjlink['waitForReadyRead'].return_value = True
|
||||
mock_pjlink['readLine'].return_value = t_readLine
|
||||
|
||||
# WHEN: Called with no data
|
||||
caplog.clear()
|
||||
pjlink.check_login(data=None)
|
||||
|
||||
print(caplog.record_tuples)
|
||||
|
||||
# THEN: Log entries and get_data not called
|
||||
assert caplog.record_tuples == logs, 'Invalid log entires'
|
||||
mock_pjlink['change_status'].assert_not_called()
|
||||
mock_pjlink['read'].assert_called_once()
|
||||
mock_pjlink['disconnect_from_host'].assert_not_called()
|
||||
mock_pjlink['get_data'].assert_called_with(t_return)
|
||||
|
||||
|
||||
def test_readLine_with_authentication(pjlink, caplog):
|
||||
"""
|
||||
Test return when readLine data < minimum packet size
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
t_data = None
|
||||
t_readLine = f'PJLink 1 {TEST_SALT}'
|
||||
t_return = f'{PJLINK_PREFIX}1PJLink=1 {TEST_SALT}'
|
||||
|
||||
caplog.set_level(logging.DEBUG)
|
||||
logs = [(test_module, logging.DEBUG, f'({pjlink.name}) check_login(data="{t_data}")'),
|
||||
(test_module, logging.DEBUG, f'({pjlink.name}) check_login() Waiting for readyRead()'),
|
||||
(test_module, logging.DEBUG, f'({pjlink.name}) check_login() Checking for data'),
|
||||
(test_module, logging.DEBUG, f'({pjlink.name}) check_login() read "{t_readLine}"'),
|
||||
(test_module, logging.DEBUG,
|
||||
f'({pjlink.name}) check_login(): Formatting initial connection prompt to PJLink packet')
|
||||
]
|
||||
|
||||
with patch.multiple(pjlink,
|
||||
waitForReadyRead=DEFAULT,
|
||||
get_data=DEFAULT,
|
||||
change_status=DEFAULT,
|
||||
readLine=DEFAULT,
|
||||
read=DEFAULT,
|
||||
disconnect_from_host=DEFAULT,
|
||||
write=DEFAULT) as mock_pjlink:
|
||||
mock_pjlink['waitForReadyRead'].return_value = True
|
||||
mock_pjlink['readLine'].return_value = t_readLine
|
||||
|
||||
# WHEN: Called with no data
|
||||
caplog.clear()
|
||||
pjlink.check_login(data=None)
|
||||
|
||||
print(caplog.record_tuples)
|
||||
|
||||
# THEN: Log entries and get_data not called
|
||||
assert caplog.record_tuples == logs, 'Invalid log entires'
|
||||
mock_pjlink['change_status'].assert_not_called()
|
||||
mock_pjlink['read'].assert_called_once()
|
||||
mock_pjlink['disconnect_from_host'].assert_not_called()
|
||||
mock_pjlink['get_data'].assert_called_with(t_return)
|
Loading…
Reference in New Issue
Block a user