- Added pjlink.process_pjlink

- Split pjlink.check_login() to use process_pjlink()
- Added QAbstractSocket connect enum to constants
- Minor code cleanups for connection and command processing
- Updated packet queueing
- Fix get_object_filtered()
- Fix tests in test_projector_pjlink_base
- Fix tests in test_projector_pjlink_cmd_routing
- Added tests for process_pjlink method
- Updated test_projector_bugfixes_01
- Some OLP style cleanups

-------------------------------------------------------...

bzr-revno: 2795
This commit is contained in:
Ken Roberts 2017-12-09 14:47:23 +00:00 committed by Tim Bentley
commit 63aa4927a5
9 changed files with 499 additions and 240 deletions

View File

@ -25,8 +25,6 @@
Initialization for the openlp.core.projectors modules.
"""
from openlp.core.projectors.constants import PJLINK_PORT, ERROR_MSG, ERROR_STRING
class DialogSourceStyle(object):
"""

View File

@ -144,6 +144,24 @@ PJLINK_VALID_CMD = {
}
}
# QAbstractSocketState enums converted to string
S_QSOCKET_STATE = {
0: 'QSocketState - UnconnectedState',
1: 'QSocketState - HostLookupState',
2: 'QSocketState - ConnectingState',
3: 'QSocketState - ConnectedState',
4: 'QSocketState - BoundState',
5: 'QSocketState - ListeningState (internal use only)',
6: 'QSocketState - ClosingState',
'UnconnectedState': 0,
'HostLookupState': 1,
'ConnectingState': 2,
'ConnectedState': 3,
'BoundState': 4,
'ListeningState': 5,
'ClosingState': 6
}
# Error and status codes
S_OK = E_OK = 0 # E_OK included since I sometimes forget
# Error codes. Start at 200 so we don't duplicate system error codes.

View File

@ -415,7 +415,7 @@ class ProjectorDB(Manager):
for key in projector.source_available:
item = self.get_object_filtered(ProjectorSource,
and_(ProjectorSource.code == key,
ProjectorSource.projector_id == projector.dbid))
ProjectorSource.projector_id == projector.id))
if item is None:
source_dict[key] = PJLINK_DEFAULT_CODES[key]
else:

View File

@ -58,8 +58,7 @@ from openlp.core.projectors.constants import CONNECTION_ERRORS, CR, ERROR_MSG, E
E_AUTHENTICATION, E_CONNECTION_REFUSED, E_GENERAL, E_INVALID_DATA, E_NETWORK, E_NOT_CONNECTED, E_OK, \
E_PARAMETER, E_PROJECTOR, E_SOCKET_TIMEOUT, E_UNAVAILABLE, E_UNDEFINED, PJLINK_ERRORS, PJLINK_ERST_DATA, \
PJLINK_ERST_STATUS, PJLINK_MAX_PACKET, PJLINK_PORT, PJLINK_POWR_STATUS, PJLINK_VALID_CMD, \
STATUS_STRING, S_CONNECTED, S_CONNECTING, S_INFO, S_NETWORK_RECEIVED, S_NETWORK_SENDING, \
S_NOT_CONNECTED, S_OFF, S_OK, S_ON, S_STATUS
STATUS_STRING, S_CONNECTED, S_CONNECTING, S_INFO, S_NOT_CONNECTED, S_OFF, S_OK, S_ON, S_QSOCKET_STATE, S_STATUS
log = logging.getLogger(__name__)
log.debug('pjlink loaded')
@ -111,7 +110,7 @@ class PJLinkCommands(object):
"""
log.debug('PJlinkCommands(args={args} kwargs={kwargs})'.format(args=args, kwargs=kwargs))
super().__init__()
# Map command to function
# Map PJLink command to method
self.pjlink_functions = {
'AVMT': self.process_avmt,
'CLSS': self.process_clss,
@ -123,7 +122,9 @@ class PJLinkCommands(object):
'INST': self.process_inst,
'LAMP': self.process_lamp,
'NAME': self.process_name,
'PJLINK': self.check_login,
'PJLINK': self.process_pjlink,
# TODO: Part of check_login refactor - remove when done
# 'PJLINK': self.check_login,
'POWR': self.process_powr,
'SNUM': self.process_snum,
'SVER': self.process_sver,
@ -135,7 +136,8 @@ class PJLinkCommands(object):
"""
Initialize instance variables. Also used to reset projector-specific information to default.
"""
log.debug('({ip}) reset_information() connect status is {state}'.format(ip=self.ip, state=self.state()))
log.debug('({ip}) reset_information() connect status is {state}'.format(ip=self.ip,
state=S_QSOCKET_STATE[self.state()]))
self.fan = None # ERST
self.filter_time = None # FILT
self.lamp = None # LAMP
@ -165,6 +167,7 @@ class PJLinkCommands(object):
self.socket_timer.stop()
self.send_busy = False
self.send_queue = []
self.priority_queue = []
def process_command(self, cmd, data):
"""
@ -176,18 +179,19 @@ class PJLinkCommands(object):
log.debug('({ip}) Processing command "{cmd}" with data "{data}"'.format(ip=self.ip,
cmd=cmd,
data=data))
# Check if we have a future command not available yet
_cmd = cmd.upper()
# cmd should already be in uppercase, but data may be in mixed-case.
# Due to some replies should stay as mixed-case, validate using separate uppercase check
_data = data.upper()
if _cmd not in PJLINK_VALID_CMD:
log.error("({ip}) Ignoring command='{cmd}' (Invalid/Unknown)".format(ip=self.ip, cmd=cmd))
# Check if we have a future command not available yet
if cmd not in PJLINK_VALID_CMD:
log.error('({ip}) Ignoring command="{cmd}" (Invalid/Unknown)'.format(ip=self.ip, cmd=cmd))
return
elif _data == 'OK':
log.debug('({ip}) Command "{cmd}" returned OK'.format(ip=self.ip, cmd=cmd))
# A command returned successfully, no further processing needed
return
elif _cmd not in self.pjlink_functions:
log.warning("({ip}) Unable to process command='{cmd}' (Future option)".format(ip=self.ip, cmd=cmd))
# A command returned successfully, so do a query on command to verify status
return self.send_command(cmd=cmd)
elif cmd not in self.pjlink_functions:
log.warning('({ip}) Unable to process command="{cmd}" (Future option?)'.format(ip=self.ip, cmd=cmd))
return
elif _data in PJLINK_ERRORS:
# Oops - projector error
@ -211,12 +215,10 @@ class PJLinkCommands(object):
elif _data == PJLINK_ERRORS[E_PROJECTOR]:
# Projector/display error
self.change_status(E_PROJECTOR)
self.receive_data_signal()
return
# Command checks already passed
log.debug('({ip}) Calling function for {cmd}'.format(ip=self.ip, cmd=cmd))
self.receive_data_signal()
self.pjlink_functions[_cmd](data)
self.pjlink_functions[cmd](data)
def process_avmt(self, data):
"""
@ -259,19 +261,19 @@ class PJLinkCommands(object):
# : Received: '%1CLSS=Class 1' (Optoma)
# : Received: '%1CLSS=Version1' (BenQ)
if len(data) > 1:
log.warning("({ip}) Non-standard CLSS reply: '{data}'".format(ip=self.ip, data=data))
log.warning('({ip}) Non-standard CLSS reply: "{data}"'.format(ip=self.ip, data=data))
# Due to stupid projectors not following standards (Optoma, BenQ comes to mind),
# AND the different responses that can be received, the semi-permanent way to
# fix the class reply is to just remove all non-digit characters.
try:
clss = re.findall('\d', data)[0] # Should only be the first match
except IndexError:
log.error("({ip}) No numbers found in class version reply '{data}' - "
"defaulting to class '1'".format(ip=self.ip, data=data))
log.error('({ip}) No numbers found in class version reply "{data}" - '
'defaulting to class "1"'.format(ip=self.ip, data=data))
clss = '1'
elif not data.isdigit():
log.error("({ip}) NAN clss version reply '{data}' - "
"defaulting to class '1'".format(ip=self.ip, data=data))
log.error('({ip}) NAN CLSS version reply "{data}" - '
'defaulting to class "1"'.format(ip=self.ip, data=data))
clss = '1'
else:
clss = data
@ -289,7 +291,7 @@ class PJLinkCommands(object):
"""
if len(data) != PJLINK_ERST_DATA['DATA_LENGTH']:
count = PJLINK_ERST_DATA['DATA_LENGTH']
log.warning("{ip}) Invalid error status response '{data}': length != {count}".format(ip=self.ip,
log.warning('{ip}) Invalid error status response "{data}": length != {count}'.format(ip=self.ip,
data=data,
count=count))
return
@ -297,7 +299,7 @@ class PJLinkCommands(object):
datacheck = int(data)
except ValueError:
# Bad data - ignore
log.warning("({ip}) Invalid error status response '{data}'".format(ip=self.ip, data=data))
log.warning('({ip}) Invalid error status response "{data}"'.format(ip=self.ip, data=data))
return
if datacheck == 0:
self.projector_errors = None
@ -430,6 +432,51 @@ class PJLinkCommands(object):
log.debug('({ip}) Setting projector PJLink name to "{data}"'.format(ip=self.ip, data=self.pjlink_name))
return
def process_pjlink(self, data):
"""
Process initial socket connection to terminal.
:param data: Initial packet with authentication scheme
"""
log.debug('({ip}) Processing PJLINK command'.format(ip=self.ip))
chk = data.split(' ')
if len(chk[0]) != 1:
# Invalid - after splitting, first field should be 1 character, either '0' or '1' only
log.error('({ip}) Invalid initial authentication scheme - aborting'.format(ip=self.ip))
return self.disconnect_from_host()
elif chk[0] == '0':
# Normal connection no authentication
if len(chk) > 1:
# Invalid data - there should be nothing after a normal authentication scheme
log.error('({ip}) Normal connection with extra information - aborting'.format(ip=self.ip))
return self.disconnect_from_host()
elif self.pin:
log.error('({ip}) Normal connection but PIN set - aborting'.format(ip=self.ip))
return self.disconnect_from_host()
else:
data_hash = None
elif chk[0] == '1':
if len(chk) < 2:
# Not enough information for authenticated connection
log.error('({ip}) Authenticated connection but not enough info - aborting'.format(ip=self.ip))
return self.disconnect_from_host()
elif not self.pin:
log.error('({ip}) Authenticate connection but no PIN - aborting'.format(ip=self.ip))
return self.disconnect_from_host()
else:
data_hash = str(qmd5_hash(salt=chk[1].encode('utf-8'), data=self.pin.encode('utf-8')),
encoding='ascii')
# Passed basic checks, so start connection
self.readyRead.connect(self.get_socket)
if not self.no_poll:
log.debug('({ip}) process_pjlink(): Starting timer'.format(ip=self.ip))
self.timer.setInterval(2000) # Set 2 seconds for initial information
self.timer.start()
self.change_status(S_CONNECTED)
log.debug('({ip}) process_pjlink(): Sending "CLSS" initial command'.format(ip=self.ip))
# Since this is an initial connection, make it a priority just in case
return self.send_command(cmd="CLSS", salt=data_hash, priority=True)
def process_powr(self, data):
"""
Power status. See PJLink specification for format.
@ -450,7 +497,7 @@ class PJLinkCommands(object):
self.send_command('INST')
else:
# Log unknown status response
log.warning('({ip}) Unknown power response: {data}'.format(ip=self.ip, data=data))
log.warning('({ip}) Unknown power response: "{data}"'.format(ip=self.ip, data=data))
return
def process_rfil(self, data):
@ -460,9 +507,9 @@ class PJLinkCommands(object):
if self.model_filter is None:
self.model_filter = data
else:
log.warning("({ip}) Filter model already set".format(ip=self.ip))
log.warning("({ip}) Saved model: '{old}'".format(ip=self.ip, old=self.model_filter))
log.warning("({ip}) New model: '{new}'".format(ip=self.ip, new=data))
log.warning('({ip}) Filter model already set'.format(ip=self.ip))
log.warning('({ip}) Saved model: "{old}"'.format(ip=self.ip, old=self.model_filter))
log.warning('({ip}) New model: "{new}"'.format(ip=self.ip, new=data))
def process_rlmp(self, data):
"""
@ -471,9 +518,9 @@ class PJLinkCommands(object):
if self.model_lamp is None:
self.model_lamp = data
else:
log.warning("({ip}) Lamp model already set".format(ip=self.ip))
log.warning("({ip}) Saved lamp: '{old}'".format(ip=self.ip, old=self.model_lamp))
log.warning("({ip}) New lamp: '{new}'".format(ip=self.ip, new=data))
log.warning('({ip}) Lamp model already set'.format(ip=self.ip))
log.warning('({ip}) Saved lamp: "{old}"'.format(ip=self.ip, old=self.model_lamp))
log.warning('({ip}) New lamp: "{new}"'.format(ip=self.ip, new=data))
def process_snum(self, data):
"""
@ -482,16 +529,16 @@ class PJLinkCommands(object):
:param data: Serial number from projector.
"""
if self.serial_no is None:
log.debug("({ip}) Setting projector serial number to '{data}'".format(ip=self.ip, data=data))
log.debug('({ip}) Setting projector serial number to "{data}"'.format(ip=self.ip, data=data))
self.serial_no = data
self.db_update = False
else:
# Compare serial numbers and see if we got the same projector
if self.serial_no != data:
log.warning("({ip}) Projector serial number does not match saved serial number".format(ip=self.ip))
log.warning("({ip}) Saved: '{old}'".format(ip=self.ip, old=self.serial_no))
log.warning("({ip}) Received: '{new}'".format(ip=self.ip, new=data))
log.warning("({ip}) NOT saving serial number".format(ip=self.ip))
log.warning('({ip}) Projector serial number does not match saved serial number'.format(ip=self.ip))
log.warning('({ip}) Saved: "{old}"'.format(ip=self.ip, old=self.serial_no))
log.warning('({ip}) Received: "{new}"'.format(ip=self.ip, new=data))
log.warning('({ip}) NOT saving serial number'.format(ip=self.ip))
self.serial_no_received = data
def process_sver(self, data):
@ -500,20 +547,20 @@ class PJLinkCommands(object):
"""
if len(data) > 32:
# Defined in specs max version is 32 characters
log.warning("Invalid software version - too long")
log.warning('Invalid software version - too long')
return
elif self.sw_version is None:
log.debug("({ip}) Setting projector software version to '{data}'".format(ip=self.ip, data=data))
log.debug('({ip}) Setting projector software version to "{data}"'.format(ip=self.ip, data=data))
self.sw_version = data
self.db_update = True
else:
# Compare software version and see if we got the same projector
if self.serial_no != data:
log.warning("({ip}) Projector software version does not match saved "
"software version".format(ip=self.ip))
log.warning("({ip}) Saved: '{old}'".format(ip=self.ip, old=self.sw_version))
log.warning("({ip}) Received: '{new}'".format(ip=self.ip, new=data))
log.warning("({ip}) Saving new serial number as sw_version_received".format(ip=self.ip))
log.warning('({ip}) Projector software version does not match saved '
'software version'.format(ip=self.ip))
log.warning('({ip}) Saved: "{old}"'.format(ip=self.ip, old=self.sw_version))
log.warning('({ip}) Received: "{new}"'.format(ip=self.ip, new=data))
log.warning('({ip}) Saving new serial number as sw_version_received'.format(ip=self.ip))
self.sw_version_received = data
@ -540,7 +587,7 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
:param poll_time: Time (in seconds) to poll connected projector
:param socket_timeout: Time (in seconds) to abort the connection if no response
"""
log.debug('PJlink(projector={projector}, args={args} kwargs={kwargs})'.format(projector=projector,
log.debug('PJlink(projector="{projector}", args="{args}" kwargs="{kwargs}")'.format(projector=projector,
args=args,
kwargs=kwargs))
super().__init__()
@ -573,6 +620,7 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
self.widget = None # QListBox entry
self.timer = None # Timer that calls the poll_loop
self.send_queue = []
self.priority_queue = []
self.send_busy = False
# Socket timer for some possible brain-dead projectors or network cable pulled
self.socket_timer = None
@ -586,6 +634,7 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
self.connected.connect(self.check_login)
self.disconnected.connect(self.disconnect_from_host)
self.error.connect(self.get_error)
self.projectorReceivedData.connect(self._send_command)
def thread_stopped(self):
"""
@ -608,6 +657,10 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
self.projectorReceivedData.disconnect(self._send_command)
except TypeError:
pass
try:
self.readyRead.disconnect(self.get_socket) # Set in process_pjlink
except TypeError:
pass
self.disconnect_from_host()
self.deleteLater()
self.i_am_running = False
@ -625,10 +678,10 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
Retrieve information from projector that changes.
Normally called by timer().
"""
if self.state() != self.ConnectedState:
log.warning("({ip}) poll_loop(): Not connected - returning".format(ip=self.ip))
if self.state() != S_QSOCKET_STATE['ConnectedState']:
log.warning('({ip}) poll_loop(): Not connected - returning'.format(ip=self.ip))
return
log.debug('({ip}) Updating projector status'.format(ip=self.ip))
log.debug('({ip}) poll_loop(): Updating projector status'.format(ip=self.ip))
# Reset timer in case we were called from a set command
if self.timer.interval() < self.poll_time:
# Reset timer to 5 seconds
@ -640,28 +693,28 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
if self.pjlink_class == '2':
check_list.extend(['FILT', 'FREZ'])
for command in check_list:
self.send_command(command, queue=True)
self.send_command(command)
# The following commands do not change, so only check them once
if self.power == S_ON and self.source_available is None:
self.send_command('INST', queue=True)
self.send_command('INST')
if self.other_info is None:
self.send_command('INFO', queue=True)
self.send_command('INFO')
if self.manufacturer is None:
self.send_command('INF1', queue=True)
self.send_command('INF1')
if self.model is None:
self.send_command('INF2', queue=True)
self.send_command('INF2')
if self.pjlink_name is None:
self.send_command('NAME', queue=True)
self.send_command('NAME')
if self.pjlink_class == '2':
# Class 2 specific checks
if self.serial_no is None:
self.send_command('SNUM', queue=True)
self.send_command('SNUM')
if self.sw_version is None:
self.send_command('SVER', queue=True)
self.send_command('SVER')
if self.model_filter is None:
self.send_command('RFIL', queue=True)
self.send_command('RFIL')
if self.model_lamp is None:
self.send_command('RLMP', queue=True)
self.send_command('RLMP')
def _get_status(self, status):
"""
@ -713,14 +766,12 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
code=status_code,
message=status_message if msg is None else msg))
self.changeStatus.emit(self.ip, status, message)
self.projectorUpdateIcons.emit()
@QtCore.pyqtSlot()
def check_login(self, data=None):
"""
Processes the initial connection and authentication (if needed).
Starts poll timer if connection is established.
NOTE: Qt md5 hash function doesn't work with projector authentication. Use the python md5 hash function.
Processes the initial connection and convert to a PJLink packet if valid initial connection
:param data: Optional data if called from another routine
"""
@ -733,12 +784,12 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
self.change_status(E_SOCKET_TIMEOUT)
return
read = self.readLine(self.max_size)
self.readLine(self.max_size) # Clean out the trailing \r\n
self.readLine(self.max_size) # Clean out any trailing whitespace
if read is None:
log.warning('({ip}) read is None - socket error?'.format(ip=self.ip))
return
elif len(read) < 8:
log.warning('({ip}) Not enough data read)'.format(ip=self.ip))
log.warning('({ip}) Not enough data read - skipping'.format(ip=self.ip))
return
data = decode(read, 'utf-8')
# Possibility of extraneous data on input when reading.
@ -750,9 +801,16 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
# PJLink initial login will be:
# 'PJLink 0' - Unauthenticated login - no extra steps required.
# 'PJLink 1 XXXXXX' Authenticated login - extra processing required.
if not data.upper().startswith('PJLINK'):
# Invalid response
if not data.startswith('PJLINK'):
# Invalid initial packet - close socket
log.error('({ip}) Invalid initial packet received - closing socket'.format(ip=self.ip))
return self.disconnect_from_host()
log.debug('({ip}) check_login(): Formatting initial connection prompt to PJLink packet'.format(ip=self.ip))
return self.get_data('{start}{clss}{data}'.format(start=PJLINK_PREFIX,
clss='1',
data=data.replace(' ', '=', 1)).encode('utf-8'))
# TODO: The below is replaced by process_pjlink() - remove when working properly
"""
if '=' in data:
# Processing a login reply
data_check = data.strip().split('=')
@ -801,18 +859,19 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
log.debug('({ip}) Starting timer'.format(ip=self.ip))
self.timer.setInterval(2000) # Set 2 seconds for initial information
self.timer.start()
"""
def _trash_buffer(self, msg=None):
"""
Clean out extraneous stuff in the buffer.
"""
log.warning("({ip}) {message}".format(ip=self.ip, message='Invalid packet' if msg is None else msg))
log.warning('({ip}) {message}'.format(ip=self.ip, message='Invalid packet' if msg is None else msg))
self.send_busy = False
trash_count = 0
while self.bytesAvailable() > 0:
trash = self.read(self.max_size)
trash_count += len(trash)
log.debug("({ip}) Finished cleaning buffer - {count} bytes dropped".format(ip=self.ip,
log.debug('({ip}) Finished cleaning buffer - {count} bytes dropped'.format(ip=self.ip,
count=trash_count))
return
@ -824,7 +883,7 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
:param data: Data to process. buffer must be formatted as a proper PJLink packet.
:param ip: Destination IP for buffer.
"""
log.debug("({ip}) get_buffer(data='{buff}' ip='{ip_in}'".format(ip=self.ip, buff=data, ip_in=ip))
log.debug('({ip}) get_buffer(data="{buff}" ip="{ip_in}"'.format(ip=self.ip, buff=data, ip_in=ip))
if ip is None:
log.debug("({ip}) get_buffer() Don't know who data is for - exiting".format(ip=self.ip))
return
@ -842,38 +901,52 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
return
# Although we have a packet length limit, go ahead and use a larger buffer
read = self.readLine(1024)
log.debug("({ip}) get_socket(): '{buff}'".format(ip=self.ip, buff=read))
log.debug('({ip}) get_socket(): "{buff}"'.format(ip=self.ip, buff=read))
if read == -1:
# No data available
log.debug('({ip}) get_socket(): No data available (-1)'.format(ip=self.ip))
return self.receive_data_signal()
self.socket_timer.stop()
return self.get_data(buff=read, ip=self.ip)
self.get_data(buff=read, ip=self.ip)
return self.receive_data_signal()
def get_data(self, buff, ip):
def get_data(self, buff, ip=None):
"""
Process received data
:param buff: Data to process.
:param ip: (optional) Destination IP.
"""
log.debug("({ip}) get_data(ip='{ip_in}' buffer='{buff}'".format(ip=self.ip, ip_in=ip, buff=buff))
# Since "self" is not available to options and the "ip" keyword is a "maybe I'll use in the future",
# set to default here
if ip is None:
ip = self.ip
log.debug('({ip}) get_data(ip="{ip_in}" buffer="{buff}"'.format(ip=self.ip, ip_in=ip, buff=buff))
# NOTE: Class2 has changed to some values being UTF-8
data_in = decode(buff, 'utf-8')
data = data_in.strip()
if (len(data) < 7) or (not data.startswith(PJLINK_PREFIX)):
return self._trash_buffer(msg='get_data(): Invalid packet - length or prefix')
# Initial packet checks
if (len(data) < 7):
return self._trash_buffer(msg='get_data(): Invalid packet - length')
elif len(data) > self.max_size:
return self._trash_buffer(msg='get_data(): Invalid packet - too long')
elif not data.startswith(PJLINK_PREFIX):
return self._trash_buffer(msg='get_data(): Invalid packet - PJLink prefix missing')
elif '=' not in data:
return self._trash_buffer(msg='get_data(): Invalid packet does not have equal')
return self._trash_buffer(msg='get_data(): Invalid reply - Does not have "="')
log.debug('({ip}) get_data(): Checking new data "{data}"'.format(ip=self.ip, data=data))
header, data = data.split('=')
# At this point, the header should contain:
# "PVCCCC"
# Where:
# P = PJLINK_PREFIX
# V = PJLink class or version
# C = PJLink command
try:
version, cmd = header[1], header[2:]
version, cmd = header[1], header[2:].upper()
except ValueError as e:
self.change_status(E_INVALID_DATA)
log.warning('({ip}) get_data(): Received data: "{data}"'.format(ip=self.ip, data=data_in.strip()))
log.warning('({ip}) get_data(): Received data: "{data}"'.format(ip=self.ip, data=data_in))
return self._trash_buffer('get_data(): Expected header + command + data')
if cmd not in PJLINK_VALID_CMD:
log.warning('({ip}) get_data(): Invalid packet - unknown command "{data}"'.format(ip=self.ip, data=cmd))
@ -881,6 +954,7 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
if int(self.pjlink_class) < int(version):
log.warning('({ip}) get_data(): Projector returned class reply higher '
'than projector stated class'.format(ip=self.ip))
self.send_busy = False
return self.process_command(cmd, data)
@QtCore.pyqtSlot(QtNetwork.QAbstractSocket.SocketError)
@ -910,19 +984,18 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
self.reset_information()
return
def send_command(self, cmd, opts='?', salt=None, queue=False):
def send_command(self, cmd, opts='?', salt=None, priority=False):
"""
Add command to output queue if not already in queue.
:param cmd: Command to send
:param opts: Command option (if any) - defaults to '?' (get information)
:param salt: Optional salt for md5 hash initial authentication
:param queue: Option to force add to queue rather than sending directly
:param priority: Option to send packet now rather than queue it up
"""
if self.state() != self.ConnectedState:
log.warning('({ip}) send_command(): Not connected - returning'.format(ip=self.ip))
self.send_queue = []
return
return self.reset_information()
if cmd not in PJLINK_VALID_CMD:
log.error('({ip}) send_command(): Invalid command requested - ignoring.'.format(ip=self.ip))
return
@ -939,28 +1012,26 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
header = PJLINK_HEADER.format(linkclass=cmd_ver[0])
else:
# NOTE: Once we get to version 3 then think about looping
log.error('({ip}): send_command(): PJLink class check issue? aborting'.format(ip=self.ip))
log.error('({ip}): send_command(): PJLink class check issue? Aborting'.format(ip=self.ip))
return
out = '{salt}{header}{command} {options}{suffix}'.format(salt="" if salt is None else salt,
header=header,
command=cmd,
options=opts,
suffix=CR)
if out in self.send_queue:
# Already there, so don't add
log.debug('({ip}) send_command(out="{data}") Already in queue - skipping'.format(ip=self.ip,
data=out.strip()))
elif not queue and len(self.send_queue) == 0:
# Nothing waiting to send, so just send it
log.debug('({ip}) send_command(out="{data}") Sending data'.format(ip=self.ip, data=out.strip()))
return self._send_command(data=out)
if out in self.priority_queue:
log.debug('({ip}) send_command(): Already in priority queue - skipping'.format(ip=self.ip))
elif out in self.send_queue:
log.debug('({ip}) send_command(): Already in normal queue - skipping'.format(ip=self.ip))
else:
log.debug('({ip}) send_command(out="{data}") adding to queue'.format(ip=self.ip, data=out.strip()))
if priority:
log.debug('({ip}) send_command(): Adding to priority queue'.format(ip=self.ip))
self.priority_queue.append(out)
else:
log.debug('({ip}) send_command(): Adding to normal queue'.format(ip=self.ip))
self.send_queue.append(out)
self.projectorReceivedData.emit()
log.debug('({ip}) send_command(): send_busy is {data}'.format(ip=self.ip, data=self.send_busy))
if not self.send_busy:
log.debug('({ip}) send_command() calling _send_string()'.format(ip=self.ip))
if self.priority_queue or self.send_queue:
# May be some initial connection setup so make sure we send data
self._send_command()
@QtCore.pyqtSlot()
@ -971,43 +1042,53 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
:param data: Immediate data to send
:param utf8: Send as UTF-8 string otherwise send as ASCII string
"""
log.debug('({ip}) _send_string()'.format(ip=self.ip))
log.debug('({ip}) _send_string(): Connection status: {data}'.format(ip=self.ip, data=self.state()))
# Funny looking data check, but it's a quick check for data=None
log.debug('({ip}) _send_command(data="{data}")'.format(ip=self.ip, data=data.strip() if data else data))
log.debug('({ip}) _send_command(): Connection status: {data}'.format(ip=self.ip,
data=S_QSOCKET_STATE[self.state()]))
if self.state() != self.ConnectedState:
log.debug('({ip}) _send_string() Not connected - abort'.format(ip=self.ip))
self.send_queue = []
log.debug('({ip}) _send_command() Not connected - abort'.format(ip=self.ip))
self.send_busy = False
return
return self.disconnect_from_host()
if data and data not in self.priority_queue:
log.debug('({ip}) _send_command(): Priority packet - adding to priority queue'.format(ip=self.ip))
self.priority_queue.append(data)
if self.send_busy:
# Still waiting for response from last command sent
log.debug('({ip}) _send_command(): Still busy, returning'.format(ip=self.ip))
log.debug('({ip}) _send_command(): Priority queue = {data}'.format(ip=self.ip, data=self.priority_queue))
log.debug('({ip}) _send_command(): Normal queue = {data}'.format(ip=self.ip, data=self.send_queue))
return
if data is not None:
out = data
log.debug('({ip}) _send_string(data="{data}")'.format(ip=self.ip, data=out.strip()))
if len(self.priority_queue) != 0:
out = self.priority_queue.pop(0)
log.debug('({ip}) _send_command(): Getting priority queued packet'.format(ip=self.ip))
elif len(self.send_queue) != 0:
out = self.send_queue.pop(0)
log.debug('({ip}) _send_string(queued data="{data}"%s)'.format(ip=self.ip, data=out.strip()))
log.debug('({ip}) _send_command(): Getting normal queued packet'.format(ip=self.ip))
else:
# No data to send
log.debug('({ip}) _send_string(): No data to send'.format(ip=self.ip))
log.debug('({ip}) _send_command(): No data to send'.format(ip=self.ip))
self.send_busy = False
return
self.send_busy = True
log.debug('({ip}) _send_string(): Sending "{data}"'.format(ip=self.ip, data=out.strip()))
log.debug('({ip}) _send_string(): Queue = {data}'.format(ip=self.ip, data=self.send_queue))
log.debug('({ip}) _send_command(): Sending "{data}"'.format(ip=self.ip, data=out.strip()))
self.socket_timer.start()
sent = self.write(out.encode('{string_encoding}'.format(string_encoding='utf-8' if utf8 else 'ascii')))
self.waitForBytesWritten(2000) # 2 seconds should be enough
if sent == -1:
# Network error?
log.warning("({ip}) _send_command(): -1 received".format(ip=self.ip))
log.warning('({ip}) _send_command(): -1 received - disconnecting from host'.format(ip=self.ip))
self.change_status(E_NETWORK,
translate('OpenLP.PJLink', 'Error while sending data to projector'))
self.disconnect_from_host()
def connect_to_host(self):
"""
Initiate connection to projector.
"""
log.debug('{ip}) connect_to_host(): Starting connection'.format(ip=self.ip))
if self.state() == self.ConnectedState:
log.warning('({ip}) connect_to_host(): Already connected - returning'.format(ip=self.ip))
return
@ -1023,22 +1104,19 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
if abort:
log.warning('({ip}) disconnect_from_host(): Aborting connection'.format(ip=self.ip))
else:
log.warning('({ip}) disconnect_from_host(): Not connected - returning'.format(ip=self.ip))
self.reset_information()
log.warning('({ip}) disconnect_from_host(): Not connected'.format(ip=self.ip))
self.disconnectFromHost()
try:
self.readyRead.disconnect(self.get_socket)
except TypeError:
pass
log.debug('({ip}) disconnect_from_host() '
'Current status {data}'.format(ip=self.ip, data=self._get_status(self.status_connect)[0]))
if abort:
self.change_status(E_NOT_CONNECTED)
else:
log.debug('({ip}) disconnect_from_host() '
'Current status {data}'.format(ip=self.ip, data=self._get_status(self.status_connect)[0]))
if self.status_connect != E_NOT_CONNECTED:
self.change_status(S_NOT_CONNECTED)
self.reset_information()
self.projectorUpdateIcons.emit()
def get_av_mute_status(self):
"""

View File

@ -23,12 +23,11 @@
Package to test the openlp.core.projectors.pjlink base package.
"""
from unittest import TestCase
from unittest.mock import patch
from openlp.core.projectors.db import Projector
from openlp.core.projectors.pjlink import PJLink
from tests.resources.projector.data import TEST_PIN, TEST_CONNECT_AUTHENTICATE, TEST_HASH, TEST1_DATA
from tests.resources.projector.data import TEST1_DATA
class TestPJLinkBugs(TestCase):
@ -80,43 +79,17 @@ class TestPJLinkBugs(TestCase):
"""
Test bug 1593882 no pin and authenticated request exception
"""
# GIVEN: Test object and mocks
mock_socket_timer = patch.object(self.pjlink_test, 'socket_timer').start()
mock_timer = patch.object(self.pjlink_test, 'timer').start()
mock_authentication = patch.object(self.pjlink_test, 'projectorAuthentication').start()
mock_ready_read = patch.object(self.pjlink_test, 'waitForReadyRead').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
pjlink = self.pjlink_test
pjlink.pin = None
mock_ready_read.return_value = True
# WHEN: call with authentication request and pin not set
pjlink.check_login(data=TEST_CONNECT_AUTHENTICATE)
# THEN: 'No Authentication' signal should have been sent
mock_authentication.emit.assert_called_with(pjlink.ip)
# Test now part of test_projector_pjlink_commands_02
# Keeping here for bug reference
pass
def test_bug_1593883_pjlink_authentication(self):
"""
Test bugfix 1593883 pjlink authentication
Test bugfix 1593883 pjlink authentication and ticket 92187
"""
# GIVEN: Test object and data
mock_socket_timer = patch.object(self.pjlink_test, 'socket_timer').start()
mock_timer = patch.object(self.pjlink_test, 'timer').start()
mock_send_command = patch.object(self.pjlink_test, 'write').start()
mock_state = patch.object(self.pjlink_test, 'state').start()
mock_waitForReadyRead = patch.object(self.pjlink_test, 'waitForReadyRead').start()
pjlink = self.pjlink_test
pjlink.pin = TEST_PIN
mock_state.return_value = pjlink.ConnectedState
mock_waitForReadyRead.return_value = True
# WHEN: Athenticated connection is called
pjlink.check_login(data=TEST_CONNECT_AUTHENTICATE)
# THEN: send_command should have the proper authentication
self.assertEqual("{test}".format(test=mock_send_command.call_args),
"call(b'{hash}%1CLSS ?\\r')".format(hash=TEST_HASH))
# Test now part of test_projector_pjlink_commands_02
# Keeping here for bug reference
pass
def test_bug_1734275_process_lamp_nonstandard_reply(self):
"""

View File

@ -25,11 +25,11 @@ Package to test the openlp.core.projectors.pjlink base package.
from unittest import TestCase
from unittest.mock import call, patch, MagicMock
from openlp.core.projectors.constants import E_PARAMETER, ERROR_STRING, S_ON, S_CONNECTED
from openlp.core.projectors.constants import E_PARAMETER, ERROR_STRING, S_ON, S_CONNECTED, S_QSOCKET_STATE
from openlp.core.projectors.db import Projector
from openlp.core.projectors.pjlink import PJLink
from tests.resources.projector.data import TEST_PIN, TEST_SALT, TEST_CONNECT_AUTHENTICATE, TEST1_DATA
from tests.resources.projector.data import TEST1_DATA
pjlink_test = PJLink(Projector(**TEST1_DATA), no_poll=True)
@ -38,29 +38,17 @@ class TestPJLinkBase(TestCase):
"""
Tests for the PJLink module
"""
@patch.object(pjlink_test, 'readyRead')
@patch.object(pjlink_test, 'send_command')
@patch.object(pjlink_test, 'waitForReadyRead')
@patch('openlp.core.common.qmd5_hash')
def test_authenticated_connection_call(self,
mock_qmd5_hash,
mock_waitForReadyRead,
mock_send_command,
mock_readyRead):
"""
Ticket 92187: Fix for projector connect with PJLink authentication exception.
"""
# GIVEN: Test object
pjlink = pjlink_test
def setUp(self):
'''
TestPJLinkCommands part 2 initialization
'''
self.pjlink_test = PJLink(Projector(**TEST1_DATA), no_poll=True)
# WHEN: Calling check_login with authentication request:
pjlink.check_login(data=TEST_CONNECT_AUTHENTICATE)
# THEN: Should have called qmd5_hash
self.assertTrue(mock_qmd5_hash.called_with(TEST_SALT,
"Connection request should have been called with TEST_SALT"))
self.assertTrue(mock_qmd5_hash.called_with(TEST_PIN,
"Connection request should have been called with TEST_PIN"))
def tearDown(self):
'''
TestPJLinkCommands part 2 cleanups
'''
self.pjlink_test = None
@patch.object(pjlink_test, 'change_status')
def test_status_change(self, mock_change_status):
@ -110,18 +98,18 @@ class TestPJLinkBase(TestCase):
# THEN: poll_loop should exit without calling any other method
self.assertFalse(pjlink.timer.called, 'Should have returned without calling any other method')
@patch.object(pjlink_test, 'send_command')
def test_poll_loop_start(self, mock_send_command):
def test_poll_loop_start(self):
"""
Test PJLink.poll_loop makes correct calls
"""
# GIVEN: test object and test data
pjlink = pjlink_test
pjlink.state = MagicMock()
pjlink.timer = MagicMock()
pjlink.timer.interval = MagicMock()
pjlink.timer.setInterval = MagicMock()
pjlink.timer.start = MagicMock()
# GIVEN: Mocks and test data
mock_state = patch.object(self.pjlink_test, 'state').start()
mock_state.return_value = S_QSOCKET_STATE['ConnectedState']
mock_timer = patch.object(self.pjlink_test, 'timer').start()
mock_timer.interval.return_value = 10
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
pjlink = self.pjlink_test
pjlink.poll_time = 20
pjlink.power = S_ON
pjlink.source_available = None
@ -130,19 +118,17 @@ class TestPJLinkBase(TestCase):
pjlink.model = None
pjlink.pjlink_name = None
pjlink.ConnectedState = S_CONNECTED
pjlink.timer.interval.return_value = 10
pjlink.state.return_value = S_CONNECTED
call_list = [
call('POWR', queue=True),
call('ERST', queue=True),
call('LAMP', queue=True),
call('AVMT', queue=True),
call('INPT', queue=True),
call('INST', queue=True),
call('INFO', queue=True),
call('INF1', queue=True),
call('INF2', queue=True),
call('NAME', queue=True),
call('POWR'),
call('ERST'),
call('LAMP'),
call('AVMT'),
call('INPT'),
call('INST'),
call('INFO'),
call('INF1'),
call('INF2'),
call('NAME'),
]
# WHEN: PJLink.poll_loop is called
@ -150,8 +136,8 @@ class TestPJLinkBase(TestCase):
# THEN: proper calls were made to retrieve projector data
# First, call to update the timer with the next interval
self.assertTrue(pjlink.timer.setInterval.called, 'Should have updated the timer')
self.assertTrue(mock_timer.setInterval.called)
# Next, should have called the timer to start
self.assertTrue(pjlink.timer.start.called, 'Should have started the timer')
self.assertTrue(mock_timer.start.called, 'Should have started the timer')
# Finally, should have called send_command with a list of projetctor status checks
mock_send_command.assert_has_calls(call_list, 'Should have queued projector queries')

View File

@ -46,6 +46,18 @@ class TestPJLinkRouting(TestCase):
"""
Tests for the PJLink module command routing
"""
def setUp(self):
'''
TestPJLinkCommands part 2 initialization
'''
self.pjlink_test = PJLink(Projector(**TEST1_DATA), no_poll=True)
def tearDown(self):
'''
TestPJLinkCommands part 2 cleanups
'''
self.pjlink_test = None
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_command_call_clss(self, mock_log):
"""
@ -163,20 +175,19 @@ class TestPJLinkRouting(TestCase):
mock_change_status.assert_called_once_with(E_AUTHENTICATION)
mock_log.error.assert_called_with(log_text)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_command_future(self, mock_log):
def test_process_command_future(self):
"""
Test command valid but no method to process yet
"""
# GIVEN: Test object
pjlink = pjlink_test
log_text = "(127.0.0.1) Unable to process command='CLSS' (Future option)"
mock_log.reset_mock()
# Remove a valid command so we can test valid command but not available yet
pjlink.pjlink_functions.pop('CLSS')
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, 'log').start()
mock_functions = patch.object(self.pjlink_test, 'pjlink_functions').start()
mock_functions.return_value = []
pjlink = self.pjlink_test
log_text = '(111.111.111.111) Unable to process command="CLSS" (Future option?)'
# WHEN: process_command called with an unknown command
with patch.object(pjlink, 'pjlink_functions') as mock_functions:
pjlink.process_command(cmd='CLSS', data='DONT CARE')
# THEN: Error should be logged and no command called
@ -196,29 +207,26 @@ class TestPJLinkRouting(TestCase):
# WHEN: process_command called with an unknown command
pjlink.process_command(cmd='Unknown', data='Dont Care')
log_text = "(127.0.0.1) Ignoring command='Unknown' (Invalid/Unknown)"
log_text = '(127.0.0.1) Ignoring command="Unknown" (Invalid/Unknown)'
# THEN: Error should be logged and no command called
self.assertFalse(mock_functions.called, 'Should not have gotten to the end of the method')
mock_log.error.assert_called_once_with(log_text)
@patch.object(pjlink_test, 'pjlink_functions')
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_command_ok(self, mock_log, mock_functions):
def test_process_command_ok(self):
"""
Test command returned success
"""
# GIVEN: Test object
pjlink = pjlink_test
mock_functions.reset_mock()
mock_log.reset_mock()
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, 'log').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
# WHEN: process_command called with an unknown command
pjlink.process_command(cmd='CLSS', data='OK')
log_text = '(127.0.0.1) Command "CLSS" returned OK'
pjlink = self.pjlink_test
log_text = '(111.111.111.111) Command "POWR" returned OK'
# THEN: Error should be logged and no command called
self.assertFalse(mock_functions.called, 'Should not have gotten to the end of the method')
self.assertEqual(mock_log.debug.call_count, 2, 'log.debug() should have been called twice')
# Although we called it twice, only the last log entry is saved
# WHEN: process_command called with a command that returns OK
pjlink.process_command(cmd='POWR', data='OK')
# THEN: Appropriate calls should have been made
mock_log.debug.assert_called_with(log_text)
mock_send_command.assert_called_once_with(cmd='POWR')

View File

@ -47,7 +47,7 @@ for pos in range(0, len(PJLINK_ERST_DATA)):
class TestPJLinkCommands(TestCase):
"""
Tests for the PJLink module
Tests for the PJLinkCommands class part 1
"""
@patch.object(pjlink_test, 'changeStatus')
@patch.object(openlp.core.projectors.pjlink, 'log')
@ -580,7 +580,7 @@ class TestPJLinkCommands(TestCase):
# WHEN: Process invalid reply
pjlink.process_clss('Z')
log_text = "(127.0.0.1) NAN clss version reply 'Z' - defaulting to class '1'"
log_text = '(127.0.0.1) NAN CLSS version reply "Z" - defaulting to class "1"'
# THEN: Projector class should be set with default value
self.assertEqual(pjlink.pjlink_class, '1',
@ -597,7 +597,7 @@ class TestPJLinkCommands(TestCase):
# WHEN: Process invalid reply
pjlink.process_clss('Invalid')
log_text = "(127.0.0.1) No numbers found in class version reply 'Invalid' - defaulting to class '1'"
log_text = '(127.0.0.1) No numbers found in class version reply "Invalid" - defaulting to class "1"'
# THEN: Projector class should be set with default value
self.assertEqual(pjlink.pjlink_class, '1',
@ -627,7 +627,7 @@ class TestPJLinkCommands(TestCase):
# GIVEN: Test object
pjlink = pjlink_test
pjlink.projector_errors = None
log_text = "127.0.0.1) Invalid error status response '11111111': length != 6"
log_text = '127.0.0.1) Invalid error status response "11111111": length != 6'
# WHEN: process_erst called with invalid data (too many values
pjlink.process_erst('11111111')
@ -645,7 +645,7 @@ class TestPJLinkCommands(TestCase):
# GIVEN: Test object
pjlink = pjlink_test
pjlink.projector_errors = None
log_text = "(127.0.0.1) Invalid error status response '1111Z1'"
log_text = '(127.0.0.1) Invalid error status response "1111Z1"'
# WHEN: process_erst called with invalid data (too many values
pjlink.process_erst('1111Z1')
@ -671,7 +671,7 @@ class TestPJLinkCommands(TestCase):
# THEN: PJLink instance errors should match chk_value
for chk in pjlink.projector_errors:
self.assertEqual(pjlink.projector_errors[chk], chk_string,
"projector_errors['{chk}'] should have been set to {err}".format(chk=chk,
'projector_errors["{chk}"] should have been set to "{err}"'.format(chk=chk,
err=chk_string))
def test_projector_process_erst_all_error(self):
@ -690,7 +690,7 @@ class TestPJLinkCommands(TestCase):
# THEN: PJLink instance errors should match chk_value
for chk in pjlink.projector_errors:
self.assertEqual(pjlink.projector_errors[chk], chk_string,
"projector_errors['{chk}'] should have been set to {err}".format(chk=chk,
'projector_errors["{chk}"] should have been set to "{err}"'.format(chk=chk,
err=chk_string))
def test_projector_process_erst_warn_cover_only(self):
@ -744,9 +744,9 @@ class TestPJLinkCommands(TestCase):
pjlink = pjlink_test
pjlink.source_available = []
test_data = '21 10 30 31 11 20'
test_saved = ['10', '11', '20', '21', '30', '31']
log_data = '(127.0.0.1) Setting projector sources_available to ' \
'"[\'10\', \'11\', \'20\', \'21\', \'30\', \'31\']"'
test_saved = ["10", "11", "20", "21", "30", "31"]
log_data = "(127.0.0.1) Setting projector sources_available to " \
"\"['10', '11', '20', '21', '30', '31']\""
mock_UpdateIcons.reset_mock()
mock_log.reset_mock()
@ -1021,7 +1021,7 @@ class TestPJLinkCommands(TestCase):
pjlink.sw_version = None
pjlink.sw_version_received = None
test_data = 'Test 1 Subtest 1'
test_log = "(127.0.0.1) Setting projector software version to 'Test 1 Subtest 1'"
test_log = '(127.0.0.1) Setting projector software version to "Test 1 Subtest 1"'
mock_log.reset_mock()
# WHEN: process_sver called with invalid data

View File

@ -0,0 +1,198 @@
# -*- coding: utf-8 -*-
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
###############################################################################
# OpenLP - Open Source Lyrics Projection #
# --------------------------------------------------------------------------- #
# Copyright (c) 2008-2015 OpenLP Developers #
# --------------------------------------------------------------------------- #
# This program is free software; you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by the Free #
# Software Foundation; version 2 of the License. #
# #
# This program is distributed in the hope that it will be useful, but WITHOUT #
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
# more details. #
# #
# You should have received a copy of the GNU General Public License along #
# with this program; if not, write to the Free Software Foundation, Inc., 59 #
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
###############################################################################
"""
Package to test the openlp.core.projectors.pjlink commands package.
"""
from unittest import TestCase
from unittest.mock import patch, call
import openlp.core.projectors.pjlink
from openlp.core.projectors.constants import S_CONNECTED
from openlp.core.projectors.db import Projector
from openlp.core.projectors.pjlink import PJLink
from tests.resources.projector.data import TEST_HASH, TEST_PIN, TEST_SALT, TEST1_DATA
class TestPJLinkCommands(TestCase):
"""
Tests for the PJLinkCommands class part 2
"""
def setUp(self):
'''
TestPJLinkCommands part 2 initialization
'''
self.pjlink_test = PJLink(Projector(**TEST1_DATA), no_poll=True)
def tearDown(self):
'''
TestPJLinkCommands part 2 cleanups
'''
self.pjlink_test = None
def test_process_pjlink_normal(self):
"""
Test initial connection prompt with no authentication
"""
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, "log").start()
mock_disconnect_from_host = patch.object(self.pjlink_test, 'disconnect_from_host').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
mock_readyRead = patch.object(self.pjlink_test, 'readyRead').start()
mock_change_status = patch.object(self.pjlink_test, 'change_status').start()
pjlink = self.pjlink_test
pjlink.pin = None
log_check = [call("({111.111.111.111}) process_pjlink(): Sending 'CLSS' initial command'"), ]
# WHEN: process_pjlink called with no authentication required
pjlink.process_pjlink(data="0")
# THEN: proper processing should have occured
mock_log.debug.has_calls(log_check)
mock_disconnect_from_host.assert_not_called()
self.assertEqual(mock_readyRead.connect.call_count, 1, 'Should have only been called once')
mock_change_status.assert_called_once_with(S_CONNECTED)
mock_send_command.assert_called_with(cmd='CLSS', priority=True, salt=None)
def test_process_pjlink_authenticate(self):
"""
Test initial connection prompt with authentication
"""
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, "log").start()
mock_disconnect_from_host = patch.object(self.pjlink_test, 'disconnect_from_host').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
mock_readyRead = patch.object(self.pjlink_test, 'readyRead').start()
mock_change_status = patch.object(self.pjlink_test, 'change_status').start()
pjlink = self.pjlink_test
pjlink.pin = TEST_PIN
log_check = [call("({111.111.111.111}) process_pjlink(): Sending 'CLSS' initial command'"), ]
# WHEN: process_pjlink called with no authentication required
pjlink.process_pjlink(data='1 {salt}'.format(salt=TEST_SALT))
# THEN: proper processing should have occured
mock_log.debug.has_calls(log_check)
mock_disconnect_from_host.assert_not_called()
self.assertEqual(mock_readyRead.connect.call_count, 1, 'Should have only been called once')
mock_change_status.assert_called_once_with(S_CONNECTED)
mock_send_command.assert_called_with(cmd='CLSS', priority=True, salt=TEST_HASH)
def test_process_pjlink_normal_pin_set_error(self):
"""
Test process_pjlinnk called with no authentication but pin is set
"""
# GIVEN: Initial mocks and data
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, 'log').start()
mock_disconnect_from_host = patch.object(self.pjlink_test, 'disconnect_from_host').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
pjlink = self.pjlink_test
pjlink.pin = TEST_PIN
log_check = [call('(111.111.111.111) Normal connection but PIN set - aborting'), ]
# WHEN: process_pjlink called with invalid authentication scheme
pjlink.process_pjlink(data='0')
# THEN: Proper calls should be made
mock_log.error.assert_has_calls(log_check)
self.assertEqual(mock_disconnect_from_host.call_count, 1, 'Should have only been called once')
mock_send_command.assert_not_called()
def test_process_pjlink_normal_with_salt_error(self):
"""
Test process_pjlinnk called with no authentication but pin is set
"""
# GIVEN: Initial mocks and data
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, 'log').start()
mock_disconnect_from_host = patch.object(self.pjlink_test, 'disconnect_from_host').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
pjlink = self.pjlink_test
pjlink.pin = TEST_PIN
log_check = [call('(111.111.111.111) Normal connection with extra information - aborting'), ]
# WHEN: process_pjlink called with invalid authentication scheme
pjlink.process_pjlink(data='0 {salt}'.format(salt=TEST_SALT))
# THEN: Proper calls should be made
mock_log.error.assert_has_calls(log_check)
self.assertEqual(mock_disconnect_from_host.call_count, 1, 'Should have only been called once')
mock_send_command.assert_not_called()
def test_process_pjlink_invalid_authentication_scheme_length_error(self):
"""
Test initial connection prompt with authentication scheme longer than 1 character
"""
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, 'log').start()
mock_disconnect_from_host = patch.object(self.pjlink_test, 'disconnect_from_host').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
pjlink = self.pjlink_test
log_check = [call('(111.111.111.111) Invalid initial authentication scheme - aborting'), ]
# WHEN: process_pjlink called with invalid authentication scheme
pjlink.process_pjlink(data='01')
# THEN: socket should be closed and invalid data logged
mock_log.error.assert_has_calls(log_check)
self.assertEqual(mock_disconnect_from_host.call_count, 1, 'Should have only been called once')
mock_send_command.assert_not_called()
def test_process_pjlink_invalid_authentication_data_length_error(self):
"""
Test initial connection prompt with authentication no salt
"""
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, 'log').start()
mock_disconnect_from_host = patch.object(self.pjlink_test, 'disconnect_from_host').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
log_check = [call('(111.111.111.111) Authenticated connection but not enough info - aborting'), ]
pjlink = self.pjlink_test
# WHEN: process_pjlink called with no salt
pjlink.process_pjlink(data='1')
# THEN: socket should be closed and invalid data logged
mock_log.error.assert_has_calls(log_check)
self.assertEqual(mock_disconnect_from_host.call_count, 1, 'Should have only been called once')
mock_send_command.assert_not_called()
def test_process_pjlink_authenticate_pin_not_set_error(self):
"""
Test process_pjlink authentication but pin not set
"""
# GIVEN: Initial mocks and data
mock_log = patch.object(openlp.core.projectors.pjlink, 'log').start()
mock_disconnect_from_host = patch.object(self.pjlink_test, 'disconnect_from_host').start()
mock_send_command = patch.object(self.pjlink_test, 'send_command').start()
log_check = [call('(111.111.111.111) Authenticate connection but no PIN - aborting'), ]
pjlink = self.pjlink_test
pjlink.pin = None
# WHEN: process_pjlink called with no salt
pjlink.process_pjlink(data='1 {salt}'.format(salt=TEST_SALT))
# THEN: socket should be closed and invalid data logged
mock_log.error.assert_has_calls(log_check)
self.assertEqual(mock_disconnect_from_host.call_count, 1, 'Should have only been called once')
mock_send_command.assert_not_called()