Issue #985 - TypeErr in PJLink.get_data() fix

This commit is contained in:
Ken Roberts 2022-02-05 16:02:48 +00:00 committed by Raoul Snyman
parent 87159e6e28
commit 2dfce1ad32
3 changed files with 530 additions and 134 deletions

View File

@ -545,8 +545,9 @@ class PJLink(QtNetwork.QTcpSocket):
:param buff: Data to process. :param buff: Data to process.
""" """
log.debug('({ip}) get_data(buffer="{buff}"'.format(ip=self.entry.name, buff=buff)) log.debug(f'({self.entry.name}) get_data(buffer="{buff}"')
ignore_class = 'ignore_class' in kwargs ignore_class = False if 'ignore_class' not in kwargs else kwargs['ignore_class']
log.debug(f'({self.entry.name}) Setting ignore_class to "{ignore_class}"')
# NOTE: Class2 has changed to some values being UTF-8 # NOTE: Class2 has changed to some values being UTF-8
data_in = decode(buff, 'utf-8') if isinstance(buff, bytes) else buff data_in = decode(buff, 'utf-8') if isinstance(buff, bytes) else buff
data = data_in.strip() data = data_in.strip()
@ -556,7 +557,7 @@ class PJLink(QtNetwork.QTcpSocket):
self._trash_buffer(msg='get_data(): Invalid packet - length') self._trash_buffer(msg='get_data(): Invalid packet - length')
return return
elif len(data) > self.max_size: elif len(data) > self.max_size:
self._trash_buffer(msg='get_data(): Invalid packet - too long ({length} bytes)'.format(length=len(data))) self._trash_buffer(msg=f'get_data(): Invalid packet - too long ({len(data)} bytes)')
return return
elif not data.startswith(PJLINK_PREFIX): elif not data.startswith(PJLINK_PREFIX):
self._trash_buffer(msg='get_data(): Invalid packet - PJLink prefix missing') self._trash_buffer(msg='get_data(): Invalid packet - PJLink prefix missing')
@ -566,10 +567,9 @@ class PJLink(QtNetwork.QTcpSocket):
# data[8] = initial PJLink connection (after mangling) # data[8] = initial PJLink connection (after mangling)
self._trash_buffer(msg='get_data(): Invalid reply - Does not have "="') self._trash_buffer(msg='get_data(): Invalid reply - Does not have "="')
return return
log.debug('({ip}) get_data(): Checking new data "{data}"'.format(ip=self.entry.name, data=data)) log.debug(f'({self.entry.name}) get_data(): Checking new data "{data}"')
header, data = data.split('=') header, data = data.split('=')
log.debug('({ip}) get_data() header="{header}" data="{data}"'.format(ip=self.entry.name, log.debug(f'({self.entry.name}) get_data() header="{header}" data="{data}"')
header=header, data=data))
# At this point, the header should contain: # At this point, the header should contain:
# "PVCCCC" # "PVCCCC"
# Where: # Where:
@ -577,20 +577,18 @@ class PJLink(QtNetwork.QTcpSocket):
# V = PJLink class or version # V = PJLink class or version
# C = PJLink command # C = PJLink command
version, cmd = header[1], header[2:].upper() version, cmd = header[1], header[2:].upper()
log.debug('({ip}) get_data() version="{version}" cmd="{cmd}" data="{data}"'.format(ip=self.entry.name, log.debug(f'({self.entry.name}) get_data() version="{version}" cmd="{cmd}" data="{data}"')
version=version,
cmd=cmd,
data=data))
if cmd not in PJLINK_VALID_CMD: if cmd not in PJLINK_VALID_CMD:
self._trash_buffer('get_data(): Invalid packet - unknown command "{data}"'.format(data=cmd)) self._trash_buffer(msg=f'get_data(): Invalid packet - unknown command "{cmd}"')
return return
elif version not in PJLINK_VALID_CMD[cmd]['version']: elif version not in PJLINK_VALID_CMD[cmd]['version']:
self._trash_buffer(msg='get_data() Command reply version does not match a valid command version') self._trash_buffer(msg='get_data() Command reply version does not match a valid command version')
return return
elif int(self.pjlink_class) < int(version): elif int(self.pjlink_class) < int(version):
log.warning(f'({self.entry.name}) get_data(): pjlink_class={self.pjlink_class} packet={version}')
if not ignore_class: if not ignore_class:
log.warning('({ip}) get_data(): Projector returned class reply higher ' log.warning(f'({self.entry.name}) get_data(): Projector returned class reply higher '
'than projector stated class'.format(ip=self.entry.name)) 'than projector stated class')
return return
chk = process_command(self, cmd, data) chk = process_command(self, cmd, data)
@ -599,26 +597,27 @@ class PJLink(QtNetwork.QTcpSocket):
return return
# PJLink initial connection checks # PJLink initial connection checks
elif chk == S_DATA_OK: elif chk == S_DATA_OK:
# Previous command returned OK # Previous command returned OK - resend command to retrieve status
log.debug('({ip}) OK returned - resending command'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) OK returned - resending command')
self.send_command(cmd=cmd, priority=True) self.send_command(cmd=cmd, priority=True)
elif chk == S_CONNECT: elif chk == S_CONNECT:
# Normal connection # Normal connection
log.debug('({ip}) Connecting normal'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Connecting normal')
self.change_status(S_CONNECTED) self.change_status(S_CONNECTED)
self.send_command(cmd='CLSS', priority=True) self.send_command(cmd='CLSS', priority=True)
self.readyRead.connect(self.get_socket) self.readyRead.connect(self.get_socket)
elif chk == S_AUTHENTICATE: elif chk == S_AUTHENTICATE:
# Connection with pin # Connection with pin
log.debug('({ip}) Connecting with pin'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Connecting with pin')
data_hash = str(qmd5_hash(salt=chk[1].encode('utf-8'), data=self.pin.encode('utf-8')), data_hash = str(qmd5_hash(salt=data.split()[1].encode('utf-8'),
data=self.pin.encode('utf-8')),
encoding='ascii') encoding='ascii')
self.change_status(S_CONNECTED) self.change_status(S_CONNECTED)
self.readyRead.connect(self.get_socket) self.readyRead.connect(self.get_socket)
self.send_command(cmd='CLSS', salt=data_hash, priority=True) self.send_command(cmd='CLSS', salt=data_hash, priority=True)
elif chk == E_AUTHENTICATION: elif chk == E_AUTHENTICATION:
# Projector did not like our pin # Projector did not like our pin
log.warning('({ip}) Failed authentication - disconnecting'.format(ip=self.entry.name)) log.warning(f'({self.entry.name}) Failed authentication - disconnecting')
self.disconnect_from_host() self.disconnect_from_host()
self.projectorAuthentication.emit(self.entry.name) self.projectorAuthentication.emit(self.entry.name)
self.change_status(status=E_AUTHENTICATION) self.change_status(status=E_AUTHENTICATION)

View File

@ -0,0 +1,512 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2022 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
"""
Test PJLink.get_data 01
"""
import logging
import openlp.core.common
import openlp.core.projectors.pjlink
from unittest.mock import DEFAULT, patch
from openlp.core.projectors.constants import PJLINK_MAX_PACKET, PJLINK_PREFIX, \
E_AUTHENTICATION, S_AUTHENTICATE, S_CONNECT, S_CONNECTED, S_DATA_OK
test_module = openlp.core.projectors.pjlink.__name__
test_qmd5 = openlp.core.common.__name__
def test_buff_short(pjlink, caplog):
"""
Test method with invalid short buffer
"""
# GIVEN: Initial setup
t_buff = "short"
t_err = 'get_data(): Invalid packet - length'
logs = [(f"{test_module}", logging.DEBUG,
f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) Setting ignore_class to "False"')]
with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \
patch.multiple(pjlink,
_trash_buffer=DEFAULT,
receive_data_signal=DEFAULT) as mock_pjlink:
# WHEN: get_data called
caplog.set_level(logging.DEBUG)
t_check = pjlink.get_data(buff=t_buff)
# THEN: t_check should be None and log entry made
assert t_check is None, "Invalid return code"
assert caplog.record_tuples == logs, "Invalid log entries"
mock_pjlink['_trash_buffer'].assert_called_once_with(msg=t_err)
mock_pjlink['receive_data_signal'].assert_called_once()
mock_command.assert_not_called()
def test_buff_long(pjlink, caplog):
"""
Test method with invalid long buffer
"""
# GIVEN: Initial setup
t_buff = "X" * (PJLINK_MAX_PACKET + 10)
t_err = f'get_data(): Invalid packet - too long ({len(t_buff)} bytes)'
logs = [(f"{test_module}", logging.DEBUG,
f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) Setting ignore_class to "False"')]
with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \
patch.multiple(pjlink,
_trash_buffer=DEFAULT,
receive_data_signal=DEFAULT) as mock_pjlink:
# WHEN: get_data called
caplog.set_level(logging.DEBUG)
t_check = pjlink.get_data(buff=t_buff)
# THEN: t_check should be None and log entry made
assert t_check is None, "Invalid return code"
assert caplog.record_tuples == logs, "Invalid log entries"
mock_pjlink['_trash_buffer'].assert_called_once_with(msg=t_err)
mock_pjlink['receive_data_signal'].assert_called_once()
mock_command.assert_not_called()
def test_invalid_prefix(pjlink, caplog):
"""
Test method with invalid PJLink prefix character
"""
# GIVEN: Initial setup
t_buff = "#1CLSS=OK"
t_err = 'get_data(): Invalid packet - PJLink prefix missing'
logs = [(f"{test_module}", logging.DEBUG,
f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) Setting ignore_class to "False"')]
with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \
patch.multiple(pjlink,
_trash_buffer=DEFAULT,
receive_data_signal=DEFAULT) as mock_pjlink:
# WHEN: get_data called
caplog.set_level(logging.DEBUG)
t_check = pjlink.get_data(buff=t_buff)
# THEN: t_check should be None and log entry made
assert t_check is None, "Invalid return code"
assert caplog.record_tuples == logs, "Invalid log entries"
mock_pjlink['_trash_buffer'].assert_called_once_with(msg=t_err)
mock_pjlink['receive_data_signal'].assert_called_once()
mock_command.assert_not_called()
def test_missing_equal(pjlink, caplog):
"""
Test method with missing command/data separator
"""
# GIVEN: Initial setup
t_buff = f"{PJLINK_PREFIX}1CLSS OK"
t_err = 'get_data(): Invalid reply - Does not have "="'
logs = [(f"{test_module}", logging.DEBUG,
f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) Setting ignore_class to "False"')]
with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \
patch.multiple(pjlink,
_trash_buffer=DEFAULT,
receive_data_signal=DEFAULT) as mock_pjlink:
# WHEN: get_data called
caplog.set_level(logging.DEBUG)
t_check = pjlink.get_data(buff=t_buff)
# THEN: t_check should be None and log entry made
assert t_check is None, "Invalid return code"
assert caplog.record_tuples == logs, "Invalid log entries"
mock_pjlink['_trash_buffer'].assert_called_once_with(msg=t_err)
mock_pjlink['receive_data_signal'].assert_called_once()
mock_command.assert_not_called()
def test_invalid_command(pjlink, caplog):
"""
Test method with invalid command
"""
# GIVEN: Initial setup
t_cmd = "CLASSS"
t_ver = "1"
t_data = "1"
t_buff = f"{PJLINK_PREFIX}{t_ver}{t_cmd}={t_data}"
t_err = f'get_data(): Invalid packet - unknown command "{t_cmd}"'
logs = [(f"{test_module}", logging.DEBUG,
f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) Setting ignore_class to "False"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data(): Checking new data "{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data() header="{PJLINK_PREFIX}{t_ver}{t_cmd}" data="{t_data}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data() version="{t_ver}" cmd="{t_cmd}" data="{t_data}"')
]
with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \
patch.multiple(pjlink,
_trash_buffer=DEFAULT,
receive_data_signal=DEFAULT) as mock_pjlink:
# WHEN: get_data called
caplog.set_level(logging.DEBUG)
t_check = pjlink.get_data(buff=t_buff)
# THEN: t_check should be None and log entry made
assert t_check is None, "Invalid return code"
assert caplog.record_tuples == logs, "Invalid log entries"
mock_pjlink['_trash_buffer'].assert_called_once_with(msg=t_err)
mock_pjlink['receive_data_signal'].assert_called_once()
mock_command.assert_not_called()
def test_mismatch_command_versions(pjlink, caplog):
"""
Test method with pjlink.pjlink_class < cmd.class
"""
# GIVEN: Initial setup
t_cmd = "CLSS"
t_ver = "2"
t_data = "1"
t_buff = f"{PJLINK_PREFIX}{t_ver}{t_cmd}={t_data}"
t_err = 'get_data() Command reply version does not match a valid command version'
logs = [(f"{test_module}", logging.DEBUG,
f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) Setting ignore_class to "False"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data(): Checking new data "{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data() header="{PJLINK_PREFIX}{t_ver}{t_cmd}" data="{t_data}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data() version="{t_ver}" cmd="{t_cmd}" data="{t_data}"')
]
with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \
patch.multiple(pjlink,
_trash_buffer=DEFAULT,
receive_data_signal=DEFAULT) as mock_pjlink:
# WHEN: get_data called
caplog.set_level(logging.DEBUG)
t_check = pjlink.get_data(buff=t_buff)
# THEN: t_check should be None and log entry made
assert t_check is None, "Invalid return code"
assert caplog.record_tuples == logs, "Invalid log entries"
mock_pjlink['receive_data_signal'].assert_called_once()
mock_pjlink['_trash_buffer'].assert_called_once_with(msg=t_err)
mock_command.assert_not_called()
def test_mismatch_class_versions(pjlink, caplog):
"""
Test method with pjlink.pjlink_class < cmd.class
"""
# GIVEN: Initial setup
t_cmd = "FILT"
t_ver = "2"
t_data = "2"
t_buff = f"{PJLINK_PREFIX}{t_ver}{t_cmd}={t_data}"
logs = [(f"{test_module}", logging.DEBUG,
f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) Setting ignore_class to "False"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data(): Checking new data "{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data() header="{PJLINK_PREFIX}{t_ver}{t_cmd}" data="{t_data}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data() version="{t_ver}" cmd="{t_cmd}" data="{t_data}"'),
(f'{test_module}', logging.WARNING,
f'({pjlink.entry.name}) get_data(): pjlink_class={pjlink.pjlink_class} packet={t_ver}'),
(f'{test_module}', logging.WARNING,
f'({pjlink.entry.name}) get_data(): Projector returned class reply higher than projector stated class')
]
with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \
patch.multiple(pjlink,
_trash_buffer=DEFAULT,
receive_data_signal=DEFAULT) as mock_pjlink:
pjlink.pjlink_class = 1 # Set class to 1 and call with cmd class = 2
# WHEN: get_data called
caplog.set_level(logging.DEBUG)
t_check = pjlink.get_data(buff=t_buff)
# THEN: t_check should be None and log entry made
assert t_check is None, "Invalid return code"
assert caplog.record_tuples == logs, "Invalid log entries"
mock_pjlink['receive_data_signal'].assert_called_once()
mock_command.assert_not_called()
def test_ignore_class_versions(pjlink, caplog):
"""
Test method with pjlink.pjlink_class < cmd.class
"""
# GIVEN: Initial setup
t_cmd = "FILT"
t_ver = "2"
t_data = "2"
t_buff = f"{PJLINK_PREFIX}{t_ver}{t_cmd}={t_data}"
logs = [(f"{test_module}", logging.DEBUG,
f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) Setting ignore_class to "True"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data(): Checking new data "{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data() header="{PJLINK_PREFIX}{t_ver}{t_cmd}" data="{t_data}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data() version="{t_ver}" cmd="{t_cmd}" data="{t_data}"'),
(f'{test_module}', logging.WARNING,
f'({pjlink.entry.name}) get_data(): pjlink_class={pjlink.pjlink_class} packet={t_ver}')
]
with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \
patch.multiple(pjlink,
_trash_buffer=DEFAULT,
receive_data_signal=DEFAULT) as mock_pjlink:
pjlink.pjlink_class = 1 # Set class to 1 and call with cmd class = 2
mock_command.return_value = None
# WHEN: get_data called with ignore_class=True
caplog.set_level(logging.DEBUG)
t_check = pjlink.get_data(buff=t_buff, ignore_class=True)
# THEN: t_check should be None and log entry made
assert t_check is None, "Invalid return code"
assert caplog.record_tuples == logs, "Invalid log entries"
mock_pjlink['receive_data_signal'].assert_called_once()
mock_command.assert_called_with(pjlink, t_cmd, t_data)
def test_s_data_ok(pjlink, caplog):
"""
Test projector returns "OK"
"""
t_cmd = "INPT"
t_ver = "1"
t_data = "OK"
t_buff = f"{PJLINK_PREFIX}{t_ver}{t_cmd}={t_data}"
logs = [(f"{test_module}", logging.DEBUG,
f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) Setting ignore_class to "False"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data(): Checking new data "{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data() header="{PJLINK_PREFIX}{t_ver}{t_cmd}" data="{t_data}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data() version="{t_ver}" cmd="{t_cmd}" data="{t_data}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) OK returned - resending command')
]
with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \
patch.multiple(pjlink,
_trash_buffer=DEFAULT,
receive_data_signal=DEFAULT,
send_command=DEFAULT) as mock_pjlink:
mock_command.return_value = S_DATA_OK
# WHEN: get_data called with OK
caplog.set_level(logging.DEBUG)
t_check = pjlink.get_data(buff=t_buff)
# THEN: t_check should be None and log entry made
assert t_check is None, "Invalid return code"
assert caplog.record_tuples == logs, "Invalid log entries"
mock_pjlink['receive_data_signal'].assert_called_once()
mock_pjlink['_trash_buffer'].assert_not_called()
mock_pjlink['send_command'].assert_called_with(cmd=t_cmd, priority=True)
mock_command.assert_called_with(pjlink, t_cmd, t_data)
def test_s_connect(pjlink, caplog):
"""
Test projector connect with no authenticate
"""
t_cmd = "PJLINK"
t_ver = "1"
t_data = "0"
t_buff = f"{PJLINK_PREFIX}{t_ver}{t_cmd}={t_data}"
logs = [(f"{test_module}", logging.DEBUG,
f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) Setting ignore_class to "False"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data(): Checking new data "{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data() header="{PJLINK_PREFIX}{t_ver}{t_cmd}" data="{t_data}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data() version="{t_ver}" cmd="{t_cmd}" data="{t_data}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) Connecting normal')
]
with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \
patch.multiple(pjlink,
_trash_buffer=DEFAULT,
receive_data_signal=DEFAULT,
send_command=DEFAULT,
change_status=DEFAULT,
readyRead=DEFAULT,
get_socket=DEFAULT) as mock_pjlink:
mock_command.return_value = S_CONNECT
# WHEN: get_data called with OK
caplog.set_level(logging.DEBUG)
t_check = pjlink.get_data(buff=t_buff)
# THEN: t_check should be None and log entry made
assert t_check is None, "Invalid return code"
assert caplog.record_tuples == logs, "Invalid log entries"
mock_pjlink['receive_data_signal'].assert_called_once()
mock_pjlink['_trash_buffer'].assert_not_called()
mock_pjlink['send_command'].assert_called_with(cmd='CLSS', priority=True)
mock_pjlink['change_status'].assert_called_with(S_CONNECTED)
mock_pjlink['readyRead'].connect.assert_called_once_with(mock_pjlink['get_socket'])
mock_command.assert_called_with(pjlink, t_cmd, t_data)
def test_s_authenticate(pjlink, caplog):
"""
Test projector connect with no authenticate
"""
'''
Projector sends "PJLINK 1 <salt>"
Combine salt+pin
Send first command "<hash><PJLINK_PREFIX><CLASS><CMD><sp><OPTS>"
Projector will either:
- Not reply (socket timeout) indicating invalid hash
- Reply with command reply
Example in documentation:
- Send "<cr>"
- Received "PJLINK 1 498e4a67"
- Prefix = "%"
- Salt = "498e4a67"
- Pin = "JBMIAProjectorLink"
- Hash = "5d8409bc1c3fa39749434aa3a5c38682"
- Send "5d8409bc1c3fa39749434aa3a5c38682%1CLSS<sp>?"
- Reply "%1CLSS=1"
'''
# GIVEN: Initial setup
# t_pin = "JBMIAProjectorLink"
# t_salt = "498e4a67"
# t_hash = "5d8409bc1c3fa39749434aa3a5c38682"
t_salt = '498e4a67'
t_hash = '5d8409bc1c3fa39749434aa3a5c38682'
t_pin = "JBMIAProjectorLink"
t_cmd = "PJLINK"
t_ver = "1"
t_data = f"1 {t_salt}"
t_buff = f"{PJLINK_PREFIX}{t_ver}{t_cmd}={t_data}"
logs = [(f"{test_module}", logging.DEBUG,
f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) Setting ignore_class to "False"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data(): Checking new data "{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data() header="{PJLINK_PREFIX}{t_ver}{t_cmd}" data="{t_data}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data() version="{t_ver}" cmd="{t_cmd}" data="{t_data}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) Connecting with pin'),
(f'{test_qmd5}.__init__', logging.DEBUG,
f'qmd5_hash(salt="b\'{t_salt}\'"'),
(f'{test_qmd5}.__init__', logging.DEBUG,
f'qmd5_hash() returning "b\'{t_hash}\'"')
]
with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \
patch.multiple(pjlink,
_trash_buffer=DEFAULT,
receive_data_signal=DEFAULT,
send_command=DEFAULT,
change_status=DEFAULT,
readyRead=DEFAULT,
get_socket=DEFAULT) as mock_pjlink:
mock_command.return_value = S_AUTHENTICATE
pjlink.pin = t_pin
# WHEN: get_data called with OK
caplog.set_level(logging.DEBUG)
t_check = pjlink.get_data(buff=t_buff)
# THEN: t_check should be None and log entry made
assert t_check is None, "Invalid return code"
assert caplog.record_tuples == logs, "Invalid log entries"
mock_pjlink['receive_data_signal'].assert_called_once()
mock_pjlink['_trash_buffer'].assert_not_called()
mock_pjlink['send_command'].assert_called_with(cmd='CLSS', salt=t_hash, priority=True)
mock_pjlink['change_status'].assert_called_with(S_CONNECTED)
mock_pjlink['readyRead'].connect.assert_called_once_with(mock_pjlink['get_socket'])
mock_command.assert_called_with(pjlink, t_cmd, t_data)
def test_e_authenticate(pjlink, caplog):
"""
Test projector connect with invalid pin
"""
t_salt = '498e4a67'
t_cmd = "PJLINK"
t_ver = "1"
t_data = f"1 {t_salt}"
t_buff = f"{PJLINK_PREFIX}{t_ver}{t_cmd}={t_data}"
logs = [(f"{test_module}", logging.DEBUG,
f'({pjlink.entry.name}) get_data(buffer="{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) Setting ignore_class to "False"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data(): Checking new data "{t_buff}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data() header="{PJLINK_PREFIX}{t_ver}{t_cmd}" data="{t_data}"'),
(f'{test_module}', logging.DEBUG,
f'({pjlink.entry.name}) get_data() version="{t_ver}" cmd="{t_cmd}" data="{t_data}"'),
(f'{test_module}', logging.WARNING,
f'({pjlink.entry.name}) Failed authentication - disconnecting')
]
with patch.object(openlp.core.projectors.pjlink, "process_command") as mock_command, \
patch.multiple(pjlink,
_trash_buffer=DEFAULT,
receive_data_signal=DEFAULT,
disconnect_from_host=DEFAULT,
change_status=DEFAULT,
projectorAuthentication=DEFAULT) as mock_pjlink:
mock_command.return_value = E_AUTHENTICATION
# WHEN: get_data called with OK
caplog.set_level(logging.DEBUG)
t_check = pjlink.get_data(buff=t_buff)
# THEN: t_check should be None and log entry made
assert t_check is None, "Invalid return code"
assert caplog.record_tuples == logs, "Invalid log entries"
mock_pjlink['receive_data_signal'].assert_called_once()
mock_pjlink['_trash_buffer'].assert_not_called()
mock_pjlink['change_status'].assert_called_with(status=E_AUTHENTICATION)
mock_pjlink['projectorAuthentication'].emit.assert_called_with(pjlink.entry.name)
mock_command.assert_called_with(pjlink, t_cmd, t_data)

View File

@ -1,115 +0,0 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2021 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
"""
Package to test the openlp.core.projectors.pjlink command routing.
"""
from unittest.mock import call, patch
import openlp.core.projectors.pjlink
from openlp.core.projectors.constants import PJLINK_PREFIX
@patch.object(openlp.core.projectors.pjlinkcommands, 'process_command')
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_projector_get_data_invalid_version(mock_log, mock_process_cmd, pjlink):
"""
Test projector received valid command invalid version
"""
# GIVEN: Test object
log_warning_text = [call('({ip}) _send_command(): Nothing to send - returning'.format(ip=pjlink.name)),
call('({ip}) get_data() Command reply version does not match '
'a valid command version'.format(ip=pjlink.name))]
log_debug_text = [call('({ip}) get_data(buffer="{pre}XCLSS=X"'.format(ip=pjlink.name, pre=PJLINK_PREFIX)),
call('({ip}) get_data(): Checking new data "{pre}XCLSS=X"'.format(ip=pjlink.name,
pre=PJLINK_PREFIX)),
call('({ip}) get_data() header="{pre}XCLSS" data="X"'.format(ip=pjlink.name,
pre=PJLINK_PREFIX)),
call('({ip}) get_data() version="X" cmd="CLSS" data="X"'.format(ip=pjlink.name)),
call('({ip}) Cleaning buffer - msg = "get_data() Command reply version does '
'not match a valid command version"'.format(ip=pjlink.name)),
call('({ip}) Finished cleaning buffer - 0 bytes dropped'.format(ip=pjlink.name))]
# WHEN: get_data called with an unknown command
pjlink.get_data(buff='{prefix}XCLSS=X'.format(prefix=PJLINK_PREFIX))
# THEN: Appropriate log entries should have been made and methods called/not called
mock_log.warning.assert_has_calls(log_warning_text)
mock_log.debug.assert_has_calls(log_debug_text)
assert (mock_process_cmd.call_count == 0), 'process_command should not have been called'
@patch.object(openlp.core.projectors.pjlinkcommands, 'process_command')
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_projector_get_data_unknown_command(mock_log, mock_process_cmd, pjlink):
"""
Test projector receiving invalid command
"""
# GIVEN: Test object
log_warning_text = [call('({ip}) _send_command(): Nothing to send - '
'returning'.format(ip=pjlink.name)),
call('({ip}) get_data(): Invalid packet - '
'unknown command "UNKN"'.format(ip=pjlink.name))]
log_debug_text = [call('({ip}) get_data(buffer="{pre}1UNKN=Huh?"'.format(ip=pjlink.name,
pre=PJLINK_PREFIX)),
call('({ip}) get_data(): Checking new data "{pre}1UNKN=Huh?"'.format(ip=pjlink.name,
pre=PJLINK_PREFIX)),
call('({ip}) get_data() header="{pre}1UNKN" data="Huh?"'.format(ip=pjlink.name,
pre=PJLINK_PREFIX)),
call('({ip}) get_data() version="1" cmd="UNKN" data="Huh?"'.format(ip=pjlink.name)),
call('({ip}) Cleaning buffer - msg = "get_data(): Invalid packet - '
'unknown command "UNKN""'.format(ip=pjlink.name)),
call('({ip}) Finished cleaning buffer - 0 bytes dropped'.format(ip=pjlink.name))]
# WHEN: get_data called with an unknown command
pjlink.get_data(buff='{prefix}1UNKN=Huh?'.format(prefix=PJLINK_PREFIX))
# THEN: Appropriate log entries should have been made and methods called/not called
mock_log.warning.assert_has_calls(log_warning_text)
mock_log.debug.assert_has_calls(log_debug_text)
assert (mock_process_cmd.call_count == 0), 'process_command should not have been called'
@patch.object(openlp.core.projectors.pjlinkcommands, 'process_command')
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_projector_get_data_version_mismatch(mock_log, mock_process_cmd, pjlink):
"""
Test projector received valid command with command version higher than projector
"""
# GIVEN: Test object
log_warning_text = [call('({ip}) _send_command(): Nothing to send - returning'.format(ip=pjlink.name)),
call('({ip}) get_data(): Projector returned class reply higher than projector '
'stated class'.format(ip=pjlink.name))]
log_debug_text = [call('({ip}) get_data(buffer="{pre}2ACKN=X"'.format(ip=pjlink.name,
pre=PJLINK_PREFIX)),
call('({ip}) get_data(): Checking new data "{pre}2ACKN=X"'.format(ip=pjlink.name,
pre=PJLINK_PREFIX)),
call('({ip}) get_data() header="{pre}2ACKN" data="X"'.format(ip=pjlink.name,
pre=PJLINK_PREFIX)),
call('({ip}) get_data() version="2" cmd="ACKN" data="X"'.format(ip=pjlink.name))]
pjlink.pjlink_class = '1'
# WHEN: get_data called with an unknown command
pjlink.get_data(buff='{prefix}2ACKN=X'.format(prefix=PJLINK_PREFIX))
# THEN: Appropriate log entries should have been made and methods called/not called
mock_log.warning.assert_has_calls(log_warning_text)
mock_log.debug.assert_has_calls(log_debug_text)
assert (mock_process_cmd.call_count == 0), 'process_command should not have been called'