PJLink2-M updates

This commit is contained in:
Ken Roberts 2017-12-03 16:24:47 -08:00
parent c0fb7bc3aa
commit 159056f06f
9 changed files with 449 additions and 193 deletions

View File

@ -25,8 +25,6 @@
Initialization for the openlp.core.projectors modules.
"""
from openlp.core.projectors.constants import PJLINK_PORT, ERROR_MSG, ERROR_STRING
class DialogSourceStyle(object):
"""

View File

@ -144,6 +144,24 @@ PJLINK_VALID_CMD = {
}
}
# QAbstractSocketState enums converted to string
S_QSOCKET_STATE = {
0: 'QSocketState - UnconnectedState',
1: 'QSocketState - HostLookupState',
2: 'QSocketState - ConnectingState',
3: 'QSocketState - ConnectedState',
4: 'QSocketState - BoundState',
5: 'QSocketState - ListeningState (internal use only)',
6: 'QSocketState - ClosingState',
'UnconnectedState': 0,
'HostLookupState': 1,
'ConnectingState': 2,
'ConnectedState': 3,
'BoundState': 4,
'ListeningState': 5,
'ClosingState': 6,
}
# Error and status codes
S_OK = E_OK = 0 # E_OK included since I sometimes forget
# Error codes. Start at 200 so we don't duplicate system error codes.

View File

@ -415,7 +415,7 @@ class ProjectorDB(Manager):
for key in projector.source_available:
item = self.get_object_filtered(ProjectorSource,
and_(ProjectorSource.code == key,
ProjectorSource.projector_id == projector.dbid))
ProjectorSource.projector_id == projector.id))
if item is None:
source_dict[key] = PJLINK_DEFAULT_CODES[key]
else:

View File

@ -58,8 +58,7 @@ from openlp.core.projectors.constants import CONNECTION_ERRORS, CR, ERROR_MSG, E
E_AUTHENTICATION, E_CONNECTION_REFUSED, E_GENERAL, E_INVALID_DATA, E_NETWORK, E_NOT_CONNECTED, E_OK, \
E_PARAMETER, E_PROJECTOR, E_SOCKET_TIMEOUT, E_UNAVAILABLE, E_UNDEFINED, PJLINK_ERRORS, PJLINK_ERST_DATA, \
PJLINK_ERST_STATUS, PJLINK_MAX_PACKET, PJLINK_PORT, PJLINK_POWR_STATUS, PJLINK_VALID_CMD, \
STATUS_STRING, S_CONNECTED, S_CONNECTING, S_INFO, S_NETWORK_RECEIVED, S_NETWORK_SENDING, \
S_NOT_CONNECTED, S_OFF, S_OK, S_ON, S_STATUS
STATUS_STRING, S_CONNECTED, S_CONNECTING, S_INFO, S_NOT_CONNECTED, S_OFF, S_OK, S_ON, S_QSOCKET_STATE, S_STATUS
log = logging.getLogger(__name__)
log.debug('pjlink loaded')
@ -123,7 +122,8 @@ class PJLinkCommands(object):
'INST': self.process_inst,
'LAMP': self.process_lamp,
'NAME': self.process_name,
'PJLINK': self.check_login,
'PJLINK': self.process_pjlink,
# 'PJLINK': self.check_login,
'POWR': self.process_powr,
'SNUM': self.process_snum,
'SVER': self.process_sver,
@ -135,7 +135,8 @@ class PJLinkCommands(object):
"""
Initialize instance variables. Also used to reset projector-specific information to default.
"""
log.debug('({ip}) reset_information() connect status is {state}'.format(ip=self.ip, state=self.state()))
log.debug('({ip}) reset_information() connect status is {state}'.format(ip=self.ip,
state=S_QSOCKET_STATE[self.state()]))
self.fan = None # ERST
self.filter_time = None # FILT
self.lamp = None # LAMP
@ -165,6 +166,7 @@ class PJLinkCommands(object):
self.socket_timer.stop()
self.send_busy = False
self.send_queue = []
self.priority_queue = []
def process_command(self, cmd, data):
"""
@ -176,18 +178,19 @@ class PJLinkCommands(object):
log.debug('({ip}) Processing command "{cmd}" with data "{data}"'.format(ip=self.ip,
cmd=cmd,
data=data))
# Check if we have a future command not available yet
_cmd = cmd.upper()
# cmd should already be in uppercase, but data may be in mixed-case.
# Due to some replies should stay as mixed-case, validate using separate uppercase check
_data = data.upper()
if _cmd not in PJLINK_VALID_CMD:
# Check if we have a future command not available yet
if cmd not in PJLINK_VALID_CMD:
log.error("({ip}) Ignoring command='{cmd}' (Invalid/Unknown)".format(ip=self.ip, cmd=cmd))
return
elif _data == 'OK':
log.debug('({ip}) Command "{cmd}" returned OK'.format(ip=self.ip, cmd=cmd))
# A command returned successfully, no further processing needed
return
elif _cmd not in self.pjlink_functions:
log.warning("({ip}) Unable to process command='{cmd}' (Future option)".format(ip=self.ip, cmd=cmd))
log.debug("({ip}) Command '{cmd}' returned OK".format(ip=self.ip, cmd=cmd))
# A command returned successfully, so do a query on command to verify status
return self.send_command(cmd=cmd)
elif cmd not in self.pjlink_functions:
log.warning("({ip}) Unable to process command='{cmd}' (Future option?)".format(ip=self.ip, cmd=cmd))
return
elif _data in PJLINK_ERRORS:
# Oops - projector error
@ -211,12 +214,10 @@ class PJLinkCommands(object):
elif _data == PJLINK_ERRORS[E_PROJECTOR]:
# Projector/display error
self.change_status(E_PROJECTOR)
self.receive_data_signal()
return
# Command checks already passed
log.debug('({ip}) Calling function for {cmd}'.format(ip=self.ip, cmd=cmd))
self.receive_data_signal()
self.pjlink_functions[_cmd](data)
self.pjlink_functions[cmd](data)
def process_avmt(self, data):
"""
@ -430,6 +431,51 @@ class PJLinkCommands(object):
log.debug('({ip}) Setting projector PJLink name to "{data}"'.format(ip=self.ip, data=self.pjlink_name))
return
def process_pjlink(self, data):
"""
Process initial socket connection to terminal.
:param data: Initial packet with authentication scheme
"""
log.debug("({ip}) Processing PJLINK command".format(ip=self.ip))
chk = data.split(" ")
if len(chk[0]) != 1:
# Invalid - after splitting, first field should be 1 character, either '0' or '1' only
log.error("({ip}) Invalid initial authentication scheme - aborting".format(ip=self.ip))
return self.disconnect_from_host()
elif chk[0] == '0':
# Normal connection no authentication
if len(chk) > 1:
# Invalid data - there should be nothing after a normal authentication scheme
log.error("({ip}) Normal connection with extra information - aborting".format(ip=self.ip))
return self.disconnect_from_host()
elif self.pin:
log.error("({ip}) Normal connection but PIN set - aborting".format(ip=self.ip))
return self.disconnect_from_host()
else:
data_hash = None
elif chk[0] == '1':
if len(chk) < 2:
# Not enough information for authenticated connection
log.error("({ip}) Authenticated connection but not enough info - aborting".format(ip=self.ip))
return self.disconnect_from_host()
elif not self.pin:
log.error("({ip}) Authenticate connection but no PIN - aborting".format(ip=self.ip))
return self.disconnect_from_host()
else:
data_hash = str(qmd5_hash(salt=chk[1].encode('utf-8'), data=self.pin.encode('utf-8')),
encoding='ascii')
# Passed basic checks, so start connection
self.readyRead.connect(self.get_socket)
if not self.no_poll:
log.debug('({ip}) process_pjlink(): Starting timer'.format(ip=self.ip))
self.timer.setInterval(2000) # Set 2 seconds for initial information
self.timer.start()
self.change_status(S_CONNECTED)
log.debug("({ip}) process_pjlink(): Sending 'CLSS' initial command'".format(ip=self.ip))
# Since this is an initial connection, make it a priority just in case
return self.send_command(cmd="CLSS", salt=data_hash, priority=True)
def process_powr(self, data):
"""
Power status. See PJLink specification for format.
@ -573,6 +619,7 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
self.widget = None # QListBox entry
self.timer = None # Timer that calls the poll_loop
self.send_queue = []
self.priority_queue = []
self.send_busy = False
# Socket timer for some possible brain-dead projectors or network cable pulled
self.socket_timer = None
@ -586,6 +633,7 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
self.connected.connect(self.check_login)
self.disconnected.connect(self.disconnect_from_host)
self.error.connect(self.get_error)
self.projectorReceivedData.connect(self._send_command)
def thread_stopped(self):
"""
@ -608,6 +656,11 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
self.projectorReceivedData.disconnect(self._send_command)
except TypeError:
pass
try:
self.readyRead.connect(self.get_socket) # Set in process_pjlink
except TypeError:
pass
self.disconnect_from_host()
self.deleteLater()
self.i_am_running = False
@ -625,10 +678,10 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
Retrieve information from projector that changes.
Normally called by timer().
"""
if self.state() != self.ConnectedState:
if self.state() != S_QSOCKET_STATE['ConnectedState']:
log.warning("({ip}) poll_loop(): Not connected - returning".format(ip=self.ip))
return
log.debug('({ip}) Updating projector status'.format(ip=self.ip))
log.debug('({ip}) poll_loop(): Updating projector status'.format(ip=self.ip))
# Reset timer in case we were called from a set command
if self.timer.interval() < self.poll_time:
# Reset timer to 5 seconds
@ -640,28 +693,28 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
if self.pjlink_class == '2':
check_list.extend(['FILT', 'FREZ'])
for command in check_list:
self.send_command(command, queue=True)
self.send_command(command)
# The following commands do not change, so only check them once
if self.power == S_ON and self.source_available is None:
self.send_command('INST', queue=True)
self.send_command('INST')
if self.other_info is None:
self.send_command('INFO', queue=True)
self.send_command('INFO')
if self.manufacturer is None:
self.send_command('INF1', queue=True)
self.send_command('INF1')
if self.model is None:
self.send_command('INF2', queue=True)
self.send_command('INF2')
if self.pjlink_name is None:
self.send_command('NAME', queue=True)
self.send_command('NAME')
if self.pjlink_class == '2':
# Class 2 specific checks
if self.serial_no is None:
self.send_command('SNUM', queue=True)
self.send_command('SNUM')
if self.sw_version is None:
self.send_command('SVER', queue=True)
self.send_command('SVER')
if self.model_filter is None:
self.send_command('RFIL', queue=True)
self.send_command('RFIL')
if self.model_lamp is None:
self.send_command('RLMP', queue=True)
self.send_command('RLMP')
def _get_status(self, status):
"""
@ -713,14 +766,12 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
code=status_code,
message=status_message if msg is None else msg))
self.changeStatus.emit(self.ip, status, message)
self.projectorUpdateIcons.emit()
@QtCore.pyqtSlot()
def check_login(self, data=None):
"""
Processes the initial connection and authentication (if needed).
Starts poll timer if connection is established.
NOTE: Qt md5 hash function doesn't work with projector authentication. Use the python md5 hash function.
Processes the initial connection and convert to a PJLink packet if valid initial connection
:param data: Optional data if called from another routine
"""
@ -733,12 +784,12 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
self.change_status(E_SOCKET_TIMEOUT)
return
read = self.readLine(self.max_size)
self.readLine(self.max_size) # Clean out the trailing \r\n
self.readLine(self.max_size) # Clean out any trailing whitespace
if read is None:
log.warning('({ip}) read is None - socket error?'.format(ip=self.ip))
return
elif len(read) < 8:
log.warning('({ip}) Not enough data read)'.format(ip=self.ip))
log.warning('({ip}) Not enough data read - skipping'.format(ip=self.ip))
return
data = decode(read, 'utf-8')
# Possibility of extraneous data on input when reading.
@ -750,9 +801,16 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
# PJLink initial login will be:
# 'PJLink 0' - Unauthenticated login - no extra steps required.
# 'PJLink 1 XXXXXX' Authenticated login - extra processing required.
if not data.upper().startswith('PJLINK'):
# Invalid response
if not data.startswith('PJLINK'):
# Invalid initial packet - close socket
log.error("({ip}) Invalid initial packet received - closing socket".format(ip=self.ip))
return self.disconnect_from_host()
log.debug("({ip}) check_login(): Formatting initial connection prompt to PJLink packet".format(ip=self.ip))
return self.get_data("{start}{clss}{data}".format(start=PJLINK_PREFIX,
clss="1",
data=data.replace(" ", "=", 1)).encode('utf-8'))
# TODO: The below is replaced by process_pjlink() - remove when working properly
"""
if '=' in data:
# Processing a login reply
data_check = data.strip().split('=')
@ -801,6 +859,7 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
log.debug('({ip}) Starting timer'.format(ip=self.ip))
self.timer.setInterval(2000) # Set 2 seconds for initial information
self.timer.start()
"""
def _trash_buffer(self, msg=None):
"""
@ -848,32 +907,43 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
log.debug('({ip}) get_socket(): No data available (-1)'.format(ip=self.ip))
return self.receive_data_signal()
self.socket_timer.stop()
return self.get_data(buff=read, ip=self.ip)
self.get_data(buff=read, ip=self.ip)
return self.receive_data_signal()
def get_data(self, buff, ip):
def get_data(self, buff, ip=None):
"""
Process received data
:param buff: Data to process.
:param ip: (optional) Destination IP.
"""
ip = self.ip if ip is None else ip
log.debug("({ip}) get_data(ip='{ip_in}' buffer='{buff}'".format(ip=self.ip, ip_in=ip, buff=buff))
# NOTE: Class2 has changed to some values being UTF-8
data_in = decode(buff, 'utf-8')
data = data_in.strip()
if (len(data) < 7) or (not data.startswith(PJLINK_PREFIX)):
return self._trash_buffer(msg='get_data(): Invalid packet - length or prefix')
# Initial packet checks
if (len(data) < 7):
return self._trash_buffer(msg="get_data(): Invalid packet - length")
elif len(data) > self.max_size:
return self._trash_buffer(msg='get_data(): Invalid packet - too long')
return self._trash_buffer(msg="get_data(): Invalid packet - too long")
elif not data.startswith(PJLINK_PREFIX):
return self._trash_buffer(msg="get_data(): Invalid packet - PJLink prefix missing")
elif '=' not in data:
return self._trash_buffer(msg='get_data(): Invalid packet does not have equal')
log.debug('({ip}) get_data(): Checking new data "{data}"'.format(ip=self.ip, data=data))
return self._trash_buffer(msg="get_data(): Invalid packet - Does not have '='")
log.debug("({ip}) get_data(): Checking new data '{data}'".format(ip=self.ip, data=data))
header, data = data.split('=')
# At this point, the header should contain:
# "PVCCCC"
# Where:
# P = PJLINK_PREFIX
# V = PJLink class or version
# C = PJLink command
try:
version, cmd = header[1], header[2:]
version, cmd = header[1], header[2:].upper()
except ValueError as e:
self.change_status(E_INVALID_DATA)
log.warning('({ip}) get_data(): Received data: "{data}"'.format(ip=self.ip, data=data_in.strip()))
log.warning('({ip}) get_data(): Received data: "{data}"'.format(ip=self.ip, data=data_in))
return self._trash_buffer('get_data(): Expected header + command + data')
if cmd not in PJLINK_VALID_CMD:
log.warning('({ip}) get_data(): Invalid packet - unknown command "{data}"'.format(ip=self.ip, data=cmd))
@ -881,6 +951,7 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
if int(self.pjlink_class) < int(version):
log.warning('({ip}) get_data(): Projector returned class reply higher '
'than projector stated class'.format(ip=self.ip))
self.send_busy = False
return self.process_command(cmd, data)
@QtCore.pyqtSlot(QtNetwork.QAbstractSocket.SocketError)
@ -910,19 +981,18 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
self.reset_information()
return
def send_command(self, cmd, opts='?', salt=None, queue=False):
def send_command(self, cmd, opts='?', salt=None, priority=False):
"""
Add command to output queue if not already in queue.
:param cmd: Command to send
:param opts: Command option (if any) - defaults to '?' (get information)
:param salt: Optional salt for md5 hash initial authentication
:param queue: Option to force add to queue rather than sending directly
:param priority: Option to send packet now rather than queue it up
"""
if self.state() != self.ConnectedState:
log.warning('({ip}) send_command(): Not connected - returning'.format(ip=self.ip))
self.send_queue = []
return
return self.reset_information()
if cmd not in PJLINK_VALID_CMD:
log.error('({ip}) send_command(): Invalid command requested - ignoring.'.format(ip=self.ip))
return
@ -939,28 +1009,26 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
header = PJLINK_HEADER.format(linkclass=cmd_ver[0])
else:
# NOTE: Once we get to version 3 then think about looping
log.error('({ip}): send_command(): PJLink class check issue? aborting'.format(ip=self.ip))
log.error('({ip}): send_command(): PJLink class check issue? Aborting'.format(ip=self.ip))
return
out = '{salt}{header}{command} {options}{suffix}'.format(salt="" if salt is None else salt,
header=header,
command=cmd,
options=opts,
suffix=CR)
if out in self.send_queue:
# Already there, so don't add
log.debug('({ip}) send_command(out="{data}") Already in queue - skipping'.format(ip=self.ip,
data=out.strip()))
elif not queue and len(self.send_queue) == 0:
# Nothing waiting to send, so just send it
log.debug('({ip}) send_command(out="{data}") Sending data'.format(ip=self.ip, data=out.strip()))
return self._send_command(data=out)
if out in self.priority_queue:
log.debug("({ip}) send_command(): Already in priority queue - skipping".format(ip=self.ip))
elif out in self.send_queue:
log.debug("({ip}) send_command(): Already in normal queue - skipping".format(ip=self.ip))
else:
log.debug('({ip}) send_command(out="{data}") adding to queue'.format(ip=self.ip, data=out.strip()))
self.send_queue.append(out)
self.projectorReceivedData.emit()
log.debug('({ip}) send_command(): send_busy is {data}'.format(ip=self.ip, data=self.send_busy))
if not self.send_busy:
log.debug('({ip}) send_command() calling _send_string()'.format(ip=self.ip))
if priority:
log.debug("({ip}) send_command(): Adding to priority queue".format(ip=self.ip))
self.priority_queue.append(out)
else:
log.debug("({ip}) send_command(): Adding to normal queue".format(ip=self.ip))
self.send_queue.append(out)
if self.priority_queue or self.send_queue:
# May be some initial connection setup so make sure we send data
self._send_command()
@QtCore.pyqtSlot()
@ -971,43 +1039,53 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
:param data: Immediate data to send
:param utf8: Send as UTF-8 string otherwise send as ASCII string
"""
log.debug('({ip}) _send_string()'.format(ip=self.ip))
log.debug('({ip}) _send_string(): Connection status: {data}'.format(ip=self.ip, data=self.state()))
# Funny looking data check, but it's a quick check for data=None
log.debug("({ip}) _send_command(data='{data}')".format(ip=self.ip, data=data.strip() if data else data))
log.debug('({ip}) _send_command(): Connection status: {data}'.format(ip=self.ip,
data=S_QSOCKET_STATE[self.state()]))
if self.state() != self.ConnectedState:
log.debug('({ip}) _send_string() Not connected - abort'.format(ip=self.ip))
self.send_queue = []
log.debug('({ip}) _send_command() Not connected - abort'.format(ip=self.ip))
self.send_busy = False
return
return self.disconnect_from_host()
if data and data not in self.priority_queue:
log.debug("({ip}) _send_command(): Priority packet - adding to priority queue".format(ip=self.ip))
self.priority_queue.append(data)
if self.send_busy:
# Still waiting for response from last command sent
log.debug("({ip}) _send_command(): Still busy, returning".format(ip=self.ip))
log.debug('({ip}) _send_command(): Priority queue = {data}'.format(ip=self.ip, data=self.priority_queue))
log.debug('({ip}) _send_command(): Normal queue = {data}'.format(ip=self.ip, data=self.send_queue))
return
if data is not None:
out = data
log.debug('({ip}) _send_string(data="{data}")'.format(ip=self.ip, data=out.strip()))
if len(self.priority_queue) != 0:
out = self.priority_queue.pop(0)
log.debug("({ip}) _send_command(): Getting priority queued packet".format(ip=self.ip))
elif len(self.send_queue) != 0:
out = self.send_queue.pop(0)
log.debug('({ip}) _send_string(queued data="{data}"%s)'.format(ip=self.ip, data=out.strip()))
log.debug('({ip}) _send_command(): Getting normal queued packet'.format(ip=self.ip))
else:
# No data to send
log.debug('({ip}) _send_string(): No data to send'.format(ip=self.ip))
log.debug('({ip}) _send_command(): No data to send'.format(ip=self.ip))
self.send_busy = False
return
self.send_busy = True
log.debug('({ip}) _send_string(): Sending "{data}"'.format(ip=self.ip, data=out.strip()))
log.debug('({ip}) _send_string(): Queue = {data}'.format(ip=self.ip, data=self.send_queue))
log.debug('({ip}) _send_command(): Sending "{data}"'.format(ip=self.ip, data=out.strip()))
self.socket_timer.start()
sent = self.write(out.encode('{string_encoding}'.format(string_encoding='utf-8' if utf8 else 'ascii')))
self.waitForBytesWritten(2000) # 2 seconds should be enough
if sent == -1:
# Network error?
log.warning("({ip}) _send_command(): -1 received".format(ip=self.ip))
log.warning("({ip}) _send_command(): -1 received - disconnecting from host".format(ip=self.ip))
self.change_status(E_NETWORK,
translate('OpenLP.PJLink', 'Error while sending data to projector'))
self.disconnect_from_host()
def connect_to_host(self):
"""
Initiate connection to projector.
"""
log.debug("{ip}) connect_to_host(): Starting connection".format(ip=self.ip))
if self.state() == self.ConnectedState:
log.warning('({ip}) connect_to_host(): Already connected - returning'.format(ip=self.ip))
return
@ -1023,22 +1101,19 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
if abort:
log.warning('({ip}) disconnect_from_host(): Aborting connection'.format(ip=self.ip))
else:
log.warning('({ip}) disconnect_from_host(): Not connected - returning'.format(ip=self.ip))
self.reset_information()
log.warning('({ip}) disconnect_from_host(): Not connected'.format(ip=self.ip))
self.disconnectFromHost()
try:
self.readyRead.disconnect(self.get_socket)
except TypeError:
pass
log.debug('({ip}) disconnect_from_host() '
'Current status {data}'.format(ip=self.ip, data=self._get_status(self.status_connect)[0]))
if abort:
self.change_status(E_NOT_CONNECTED)
else:
log.debug('({ip}) disconnect_from_host() '
'Current status {data}'.format(ip=self.ip, data=self._get_status(self.status_connect)[0]))
if self.status_connect != E_NOT_CONNECTED:
self.change_status(S_NOT_CONNECTED)
self.change_status(S_NOT_CONNECTED)
self.reset_information()
self.projectorUpdateIcons.emit()
def get_av_mute_status(self):
"""

View File

@ -23,12 +23,11 @@
Package to test the openlp.core.projectors.pjlink base package.
"""
from unittest import TestCase
from unittest.mock import patch
from openlp.core.projectors.db import Projector
from openlp.core.projectors.pjlink import PJLink
from tests.resources.projector.data import TEST_PIN, TEST_CONNECT_AUTHENTICATE, TEST_HASH, TEST1_DATA
from tests.resources.projector.data import TEST1_DATA
class TestPJLinkBugs(TestCase):
@ -80,43 +79,17 @@ class TestPJLinkBugs(TestCase):
"""
Test bug 1593882 no pin and authenticated request exception
"""
# GIVEN: Test object and mocks
mock_socket_timer = patch.object(self.pjlink_test, 'socket_timer').start()
mock_timer = patch.object(self.pjlink_test, 'timer').start()
mock_authentication = patch.object(self.pjlink_test, 'projectorAuthentication').start()
mock_ready_read = patch.object(self.pjlink_test, 'waitForReadyRead').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
pjlink = self.pjlink_test
pjlink.pin = None
mock_ready_read.return_value = True
# WHEN: call with authentication request and pin not set
pjlink.check_login(data=TEST_CONNECT_AUTHENTICATE)
# THEN: 'No Authentication' signal should have been sent
mock_authentication.emit.assert_called_with(pjlink.ip)
# Test now part of test_projector_pjlink_commands_02
# Keeping here for bug reference
pass
def test_bug_1593883_pjlink_authentication(self):
"""
Test bugfix 1593883 pjlink authentication
Test bugfix 1593883 pjlink authentication and ticket 92187
"""
# GIVEN: Test object and data
mock_socket_timer = patch.object(self.pjlink_test, 'socket_timer').start()
mock_timer = patch.object(self.pjlink_test, 'timer').start()
mock_send_command = patch.object(self.pjlink_test, 'write').start()
mock_state = patch.object(self.pjlink_test, 'state').start()
mock_waitForReadyRead = patch.object(self.pjlink_test, 'waitForReadyRead').start()
pjlink = self.pjlink_test
pjlink.pin = TEST_PIN
mock_state.return_value = pjlink.ConnectedState
mock_waitForReadyRead.return_value = True
# WHEN: Athenticated connection is called
pjlink.check_login(data=TEST_CONNECT_AUTHENTICATE)
# THEN: send_command should have the proper authentication
self.assertEqual("{test}".format(test=mock_send_command.call_args),
"call(b'{hash}%1CLSS ?\\r')".format(hash=TEST_HASH))
# Test now part of test_projector_pjlink_commands_02
# Keeping here for bug reference
pass
def test_bug_1734275_process_lamp_nonstandard_reply(self):
"""

View File

@ -25,11 +25,11 @@ Package to test the openlp.core.projectors.pjlink base package.
from unittest import TestCase
from unittest.mock import call, patch, MagicMock
from openlp.core.projectors.constants import E_PARAMETER, ERROR_STRING, S_ON, S_CONNECTED
from openlp.core.projectors.constants import E_PARAMETER, ERROR_STRING, S_ON, S_CONNECTED, S_QSOCKET_STATE
from openlp.core.projectors.db import Projector
from openlp.core.projectors.pjlink import PJLink
from tests.resources.projector.data import TEST_PIN, TEST_SALT, TEST_CONNECT_AUTHENTICATE, TEST1_DATA
from tests.resources.projector.data import TEST1_DATA
pjlink_test = PJLink(Projector(**TEST1_DATA), no_poll=True)
@ -38,29 +38,17 @@ class TestPJLinkBase(TestCase):
"""
Tests for the PJLink module
"""
@patch.object(pjlink_test, 'readyRead')
@patch.object(pjlink_test, 'send_command')
@patch.object(pjlink_test, 'waitForReadyRead')
@patch('openlp.core.common.qmd5_hash')
def test_authenticated_connection_call(self,
mock_qmd5_hash,
mock_waitForReadyRead,
mock_send_command,
mock_readyRead):
"""
Ticket 92187: Fix for projector connect with PJLink authentication exception.
"""
# GIVEN: Test object
pjlink = pjlink_test
def setUp(self):
'''
TestPJLinkCommands part 2 initialization
'''
self.pjlink_test = PJLink(Projector(**TEST1_DATA), no_poll=True)
# WHEN: Calling check_login with authentication request:
pjlink.check_login(data=TEST_CONNECT_AUTHENTICATE)
# THEN: Should have called qmd5_hash
self.assertTrue(mock_qmd5_hash.called_with(TEST_SALT,
"Connection request should have been called with TEST_SALT"))
self.assertTrue(mock_qmd5_hash.called_with(TEST_PIN,
"Connection request should have been called with TEST_PIN"))
def tearDown(self):
'''
TestPJLinkCommands part 2 cleanups
'''
self.pjlink_test = None
@patch.object(pjlink_test, 'change_status')
def test_status_change(self, mock_change_status):
@ -110,18 +98,18 @@ class TestPJLinkBase(TestCase):
# THEN: poll_loop should exit without calling any other method
self.assertFalse(pjlink.timer.called, 'Should have returned without calling any other method')
@patch.object(pjlink_test, 'send_command')
def test_poll_loop_start(self, mock_send_command):
def test_poll_loop_start(self):
"""
Test PJLink.poll_loop makes correct calls
"""
# GIVEN: test object and test data
pjlink = pjlink_test
pjlink.state = MagicMock()
pjlink.timer = MagicMock()
pjlink.timer.interval = MagicMock()
pjlink.timer.setInterval = MagicMock()
pjlink.timer.start = MagicMock()
# GIVEN: Mocks and test data
mock_state = patch.object(self.pjlink_test, 'state').start()
mock_state.return_value = S_QSOCKET_STATE['ConnectedState']
mock_timer = patch.object(self.pjlink_test, 'timer').start()
mock_timer.interval.return_value = 10
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
pjlink = self.pjlink_test
pjlink.poll_time = 20
pjlink.power = S_ON
pjlink.source_available = None
@ -130,19 +118,17 @@ class TestPJLinkBase(TestCase):
pjlink.model = None
pjlink.pjlink_name = None
pjlink.ConnectedState = S_CONNECTED
pjlink.timer.interval.return_value = 10
pjlink.state.return_value = S_CONNECTED
call_list = [
call('POWR', queue=True),
call('ERST', queue=True),
call('LAMP', queue=True),
call('AVMT', queue=True),
call('INPT', queue=True),
call('INST', queue=True),
call('INFO', queue=True),
call('INF1', queue=True),
call('INF2', queue=True),
call('NAME', queue=True),
call('POWR'),
call('ERST'),
call('LAMP'),
call('AVMT'),
call('INPT'),
call('INST'),
call('INFO'),
call('INF1'),
call('INF2'),
call('NAME'),
]
# WHEN: PJLink.poll_loop is called
@ -150,8 +136,8 @@ class TestPJLinkBase(TestCase):
# THEN: proper calls were made to retrieve projector data
# First, call to update the timer with the next interval
self.assertTrue(pjlink.timer.setInterval.called, 'Should have updated the timer')
self.assertTrue(mock_timer.setInterval.called)
# Next, should have called the timer to start
self.assertTrue(pjlink.timer.start.called, 'Should have started the timer')
self.assertTrue(mock_timer.start.called, 'Should have started the timer')
# Finally, should have called send_command with a list of projetctor status checks
mock_send_command.assert_has_calls(call_list, 'Should have queued projector queries')

View File

@ -46,6 +46,18 @@ class TestPJLinkRouting(TestCase):
"""
Tests for the PJLink module command routing
"""
def setUp(self):
'''
TestPJLinkCommands part 2 initialization
'''
self.pjlink_test = PJLink(Projector(**TEST1_DATA), no_poll=True)
def tearDown(self):
'''
TestPJLinkCommands part 2 cleanups
'''
self.pjlink_test = None
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_command_call_clss(self, mock_log):
"""
@ -163,21 +175,20 @@ class TestPJLinkRouting(TestCase):
mock_change_status.assert_called_once_with(E_AUTHENTICATION)
mock_log.error.assert_called_with(log_text)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_command_future(self, mock_log):
def test_process_command_future(self):
"""
Test command valid but no method to process yet
"""
# GIVEN: Test object
pjlink = pjlink_test
log_text = "(127.0.0.1) Unable to process command='CLSS' (Future option)"
mock_log.reset_mock()
# Remove a valid command so we can test valid command but not available yet
pjlink.pjlink_functions.pop('CLSS')
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, 'log').start()
mock_functions = patch.object(self.pjlink_test, 'pjlink_functions').start()
mock_functions.return_value = []
pjlink = self.pjlink_test
log_text = "(111.111.111.111) Unable to process command='CLSS' (Future option?)"
# WHEN: process_command called with an unknown command
with patch.object(pjlink, 'pjlink_functions') as mock_functions:
pjlink.process_command(cmd='CLSS', data='DONT CARE')
pjlink.process_command(cmd='CLSS', data='DONT CARE')
# THEN: Error should be logged and no command called
self.assertFalse(mock_functions.called, 'Should not have gotten to the end of the method')
@ -202,23 +213,20 @@ class TestPJLinkRouting(TestCase):
self.assertFalse(mock_functions.called, 'Should not have gotten to the end of the method')
mock_log.error.assert_called_once_with(log_text)
@patch.object(pjlink_test, 'pjlink_functions')
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_command_ok(self, mock_log, mock_functions):
def test_process_command_ok(self):
"""
Test command returned success
"""
# GIVEN: Test object
pjlink = pjlink_test
mock_functions.reset_mock()
mock_log.reset_mock()
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, 'log').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
# WHEN: process_command called with an unknown command
pjlink.process_command(cmd='CLSS', data='OK')
log_text = '(127.0.0.1) Command "CLSS" returned OK'
pjlink = self.pjlink_test
log_text = "(111.111.111.111) Command 'POWR' returned OK"
# THEN: Error should be logged and no command called
self.assertFalse(mock_functions.called, 'Should not have gotten to the end of the method')
self.assertEqual(mock_log.debug.call_count, 2, 'log.debug() should have been called twice')
# Although we called it twice, only the last log entry is saved
# WHEN: process_command called with a command that returns OK
pjlink.process_command(cmd='POWR', data='OK')
# THEN: Appropriate calls should have been made
mock_log.debug.assert_called_with(log_text)
mock_send_command.assert_called_once_with(cmd='POWR')

View File

@ -47,7 +47,7 @@ for pos in range(0, len(PJLINK_ERST_DATA)):
class TestPJLinkCommands(TestCase):
"""
Tests for the PJLink module
Tests for the PJLinkCommands class part 1
"""
@patch.object(pjlink_test, 'changeStatus')
@patch.object(openlp.core.projectors.pjlink, 'log')

View File

@ -0,0 +1,198 @@
# -*- coding: utf-8 -*-
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
###############################################################################
# OpenLP - Open Source Lyrics Projection #
# --------------------------------------------------------------------------- #
# Copyright (c) 2008-2015 OpenLP Developers #
# --------------------------------------------------------------------------- #
# This program is free software; you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by the Free #
# Software Foundation; version 2 of the License. #
# #
# This program is distributed in the hope that it will be useful, but WITHOUT #
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
# more details. #
# #
# You should have received a copy of the GNU General Public License along #
# with this program; if not, write to the Free Software Foundation, Inc., 59 #
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
###############################################################################
"""
Package to test the openlp.core.projectors.pjlink commands package.
"""
from unittest import TestCase
from unittest.mock import patch, call
import openlp.core.projectors.pjlink
from openlp.core.projectors.constants import S_CONNECTED
from openlp.core.projectors.db import Projector
from openlp.core.projectors.pjlink import PJLink
from tests.resources.projector.data import TEST_HASH, TEST_PIN, TEST_SALT, TEST1_DATA
class TestPJLinkCommands(TestCase):
"""
Tests for the PJLinkCommands class part 2
"""
def setUp(self):
'''
TestPJLinkCommands part 2 initialization
'''
self.pjlink_test = PJLink(Projector(**TEST1_DATA), no_poll=True)
def tearDown(self):
'''
TestPJLinkCommands part 2 cleanups
'''
self.pjlink_test = None
def test_process_pjlink_normal(self):
"""
Test initial connection prompt with no authentication
"""
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, "log").start()
mock_disconnect_from_host = patch.object(self.pjlink_test, 'disconnect_from_host').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
mock_readyRead = patch.object(self.pjlink_test, 'readyRead').start()
mock_change_status = patch.object(self.pjlink_test, 'change_status').start()
pjlink = self.pjlink_test
pjlink.pin = None
log_check = [call("({111.111.111.111}) process_pjlink(): Sending 'CLSS' initial command'"), ]
# WHEN: process_pjlink called with no authentication required
pjlink.process_pjlink(data="0")
# THEN: proper processing should have occured
mock_log.debug.has_calls(log_check)
mock_disconnect_from_host.assert_not_called()
mock_readyRead.connect.assert_called_once()
mock_change_status.assert_called_once_with(S_CONNECTED)
mock_send_command.assert_called_with(cmd='CLSS', priority=True, salt=None)
def test_process_pjlink_authenticate(self):
"""
Test initial connection prompt with authentication
"""
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, "log").start()
mock_disconnect_from_host = patch.object(self.pjlink_test, 'disconnect_from_host').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
mock_readyRead = patch.object(self.pjlink_test, 'readyRead').start()
mock_change_status = patch.object(self.pjlink_test, 'change_status').start()
pjlink = self.pjlink_test
pjlink.pin = TEST_PIN
log_check = [call("({111.111.111.111}) process_pjlink(): Sending 'CLSS' initial command'"), ]
# WHEN: process_pjlink called with no authentication required
pjlink.process_pjlink(data='1 {salt}'.format(salt=TEST_SALT))
# THEN: proper processing should have occured
mock_log.debug.has_calls(log_check)
mock_disconnect_from_host.assert_not_called()
mock_readyRead.connect.assert_called_once()
mock_change_status.assert_called_once_with(S_CONNECTED)
mock_send_command.assert_called_with(cmd='CLSS', priority=True, salt=TEST_HASH)
def test_process_pjlink_normal_pin_set_error(self):
"""
Test process_pjlinnk called with no authentication but pin is set
"""
# GIVEN: Initial mocks and data
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, 'log').start()
mock_disconnect_from_host = patch.object(self.pjlink_test, 'disconnect_from_host').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
pjlink = self.pjlink_test
pjlink.pin = TEST_PIN
log_check = [call('(111.111.111.111) Normal connection but PIN set - aborting'), ]
# WHEN: process_pjlink called with invalid authentication scheme
pjlink.process_pjlink(data='0')
# THEN: Proper calls should be made
mock_log.error.assert_has_calls(log_check)
mock_disconnect_from_host.assert_called_once()
mock_send_command.assert_not_called()
def test_process_pjlink_normal_with_salt_error(self):
"""
Test process_pjlinnk called with no authentication but pin is set
"""
# GIVEN: Initial mocks and data
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, 'log').start()
mock_disconnect_from_host = patch.object(self.pjlink_test, 'disconnect_from_host').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
pjlink = self.pjlink_test
pjlink.pin = TEST_PIN
log_check = [call('(111.111.111.111) Normal connection with extra information - aborting'), ]
# WHEN: process_pjlink called with invalid authentication scheme
pjlink.process_pjlink(data='0 {salt}'.format(salt=TEST_SALT))
# THEN: Proper calls should be made
mock_log.error.assert_has_calls(log_check)
mock_disconnect_from_host.assert_called_once()
mock_send_command.assert_not_called()
def test_process_pjlink_invalid_authentication_scheme_length_error(self):
"""
Test initial connection prompt with authentication scheme longer than 1 character
"""
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, 'log').start()
mock_disconnect_from_host = patch.object(self.pjlink_test, 'disconnect_from_host').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
pjlink = self.pjlink_test
log_check = [call('(111.111.111.111) Invalid initial authentication scheme - aborting'), ]
# WHEN: process_pjlink called with invalid authentication scheme
pjlink.process_pjlink(data='01')
# THEN: socket should be closed and invalid data logged
mock_log.error.assert_has_calls(log_check)
mock_disconnect_from_host.assert_called_once()
mock_send_command.assert_not_called()
def test_process_pjlink_invalid_authentication_data_length_error(self):
"""
Test initial connection prompt with authentication no salt
"""
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, 'log').start()
mock_disconnect_from_host = patch.object(self.pjlink_test, 'disconnect_from_host').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
log_check = [call('(111.111.111.111) Authenticated connection but not enough info - aborting'), ]
pjlink = self.pjlink_test
# WHEN: process_pjlink called with no salt
pjlink.process_pjlink(data='1')
# THEN: socket should be closed and invalid data logged
mock_log.error.assert_has_calls(log_check)
mock_disconnect_from_host.assert_called_once()
mock_send_command.assert_not_called()
def test_process_pjlink_authenticate_pin_not_set_error(self):
"""
Test process_pjlink authentication but pin not set
"""
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, 'log').start()
mock_disconnect_from_host = patch.object(self.pjlink_test, 'disconnect_from_host').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
log_check = [call('(111.111.111.111) Authenticate connection but no PIN - aborting'), ]
pjlink = self.pjlink_test
pjlink.pin = None
# WHEN: process_pjlink called with no salt
pjlink.process_pjlink(data='1 {salt}'.format(salt=TEST_SALT))
# THEN: socket should be closed and invalid data logged
mock_log.error.assert_has_calls(log_check)
mock_disconnect_from_host.assert_called_once()
mock_send_command.assert_not_called()