This commit is contained in:
Tim Bentley 2018-04-20 18:35:57 +01:00
commit 50511dc31d
12 changed files with 466 additions and 506 deletions

View File

@ -44,7 +44,7 @@ log = logging.getLogger(__name__ + '.__init__')
FIRST_CAMEL_REGEX = re.compile('(.)([A-Z][a-z]+)') FIRST_CAMEL_REGEX = re.compile('(.)([A-Z][a-z]+)')
SECOND_CAMEL_REGEX = re.compile('([a-z0-9])([A-Z])') SECOND_CAMEL_REGEX = re.compile('([a-z0-9])([A-Z])')
CONTROL_CHARS = re.compile(r'[\x00-\x1F\x7F-\x9F]') CONTROL_CHARS = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F]')
INVALID_FILE_CHARS = re.compile(r'[\\/:\*\?"<>\|\+\[\]%]') INVALID_FILE_CHARS = re.compile(r'[\\/:\*\?"<>\|\+\[\]%]')
IMAGES_FILTER = None IMAGES_FILTER = None
REPLACMENT_CHARS_MAP = str.maketrans({'\u2018': '\'', '\u2019': '\'', '\u201c': '"', '\u201d': '"', '\u2026': '...', REPLACMENT_CHARS_MAP = str.maketrans({'\u2018': '\'', '\u2019': '\'', '\u201c': '"', '\u201d': '"', '\u2026': '...',
@ -471,15 +471,15 @@ def get_file_encoding(file_path):
log.exception('Error detecting file encoding') log.exception('Error detecting file encoding')
def normalize_str(irreg_str): def normalize_str(irregular_string):
""" """
Normalize the supplied string. Remove unicode control chars and tidy up white space. Normalize the supplied string. Remove unicode control chars and tidy up white space.
:param str irreg_str: The string to normalize. :param str irregular_string: The string to normalize.
:return: The normalized string :return: The normalized string
:rtype: str :rtype: str
""" """
irreg_str = irreg_str.translate(REPLACMENT_CHARS_MAP) irregular_string = irregular_string.translate(REPLACMENT_CHARS_MAP)
irreg_str = CONTROL_CHARS.sub('', irreg_str) irregular_string = CONTROL_CHARS.sub('', irregular_string)
irreg_str = NEW_LINE_REGEX.sub('\n', irreg_str) irregular_string = NEW_LINE_REGEX.sub('\n', irregular_string)
return WHITESPACE_REGEX.sub(' ', irreg_str) return WHITESPACE_REGEX.sub(' ', irregular_string)

View File

@ -154,110 +154,137 @@ PROJECTOR_STATE = [
S_INFO S_INFO
] ]
# NOTE: Changed format to account for some commands are both class 1 and 2 # NOTE: Changed format to account for some commands are both class 1 and 2.
# Make sure the sequence of 'version' is lowest-to-highest.
PJLINK_VALID_CMD = { PJLINK_VALID_CMD = {
'ACKN': {'version': ['2', ], 'ACKN': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Acknowledge a PJLink SRCH command - returns MAC address.') 'Acknowledge a PJLink SRCH command - returns MAC address.')
}, },
'AVMT': {'version': ['1', ], 'AVMT': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Blank/unblank video and/or mute audio.') 'Blank/unblank video and/or mute audio.')
}, },
'CLSS': {'version': ['1', ], 'CLSS': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Query projector PJLink class support.') 'Query projector PJLink class support.')
}, },
'ERST': {'version': ['1', '2'], 'ERST': {'version': ['1', '2'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Query error status from projector. ' 'Query error status from projector. '
'Returns fan/lamp/temp/cover/filter/other error status.') 'Returns fan/lamp/temp/cover/filter/other error status.')
}, },
'FILT': {'version': ['2', ], 'FILT': {'version': ['2'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Query number of hours on filter.') 'Query number of hours on filter.')
}, },
'FREZ': {'version': ['2', ], 'FREZ': {'version': ['2'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Freeze or unfreeze current image being projected.') 'Freeze or unfreeze current image being projected.')
}, },
'INF1': {'version': ['1', ], 'INF1': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Query projector manufacturer name.') 'Query projector manufacturer name.')
}, },
'INF2': {'version': ['1', ], 'INF2': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Query projector product name.') 'Query projector product name.')
}, },
'INFO': {'version': ['1', ], 'INFO': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Query projector for other information set by manufacturer.') 'Query projector for other information set by manufacturer.')
}, },
'INNM': {'version': ['2', ], 'INNM': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Query specified input source name') 'Query specified input source name')
}, },
'INPT': {'version': ['1', ], 'INPT': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Switch to specified video source.') 'Switch to specified video source.')
}, },
'INST': {'version': ['1', ], 'INST': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Query available input sources.') 'Query available input sources.')
}, },
'IRES': {'version:': ['2', ], 'IRES': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Query current input resolution.') 'Query current input resolution.')
}, },
'LAMP': {'version': ['1', ], 'LAMP': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Query lamp time and on/off status. Multiple lamps supported.') 'Query lamp time and on/off status. Multiple lamps supported.')
}, },
'LKUP': {'version': ['2', ], 'LKUP': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'UDP Status - Projector is now available on network. Includes MAC address.') 'UDP Status - Projector is now available on network. Includes MAC address.')
}, },
'MVOL': {'version': ['2', ], 'MVOL': {'version': ['2'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Adjust microphone volume by 1 step.') 'Adjust microphone volume by 1 step.')
}, },
'NAME': {'version': ['1', ], 'NAME': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Query customer-set projector name.') 'Query customer-set projector name.')
}, },
'PJLINK': {'version': ['1', ], 'PJLINK': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Initial connection with authentication/no authentication request.') 'Initial connection with authentication/no authentication request.')
}, },
'POWR': {'version': ['1', ], 'POWR': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Turn lamp on or off/standby.') 'Turn lamp on or off/standby.')
}, },
'RFIL': {'version': ['2', ], 'RFIL': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Query replacement air filter model number.') 'Query replacement air filter model number.')
}, },
'RLMP': {'version': ['2', ], 'RLMP': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Query replacement lamp model number.') 'Query replacement lamp model number.')
}, },
'RRES': {'version': ['2', ], 'RRES': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Query recommended resolution.') 'Query recommended resolution.')
}, },
'SNUM': {'version': ['2', ], 'SNUM': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Query projector serial number.') 'Query projector serial number.')
}, },
'SRCH': {'version': ['2', ], 'SRCH': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'UDP broadcast search request for available projectors. Reply is ACKN.') 'UDP broadcast search request for available projectors. Reply is ACKN.')
}, },
'SVER': {'version': ['2', ], 'SVER': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Query projector software version number.') 'Query projector software version number.')
}, },
'SVOL': {'version': ['2', ], 'SVOL': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants', 'description': translate('OpenLP.PJLinkConstants',
'Adjust speaker volume by 1 step.') 'Adjust speaker volume by 1 step.')
} }

View File

@ -58,10 +58,15 @@ class Ui_ProjectorEditForm(object):
# IP Address # IP Address
self.ip_label = QtWidgets.QLabel(edit_projector_dialog) self.ip_label = QtWidgets.QLabel(edit_projector_dialog)
self.ip_label.setObjectName('projector_edit_ip_label') self.ip_label.setObjectName('projector_edit_ip_label')
self.ip_text = QtWidgets.QLineEdit(edit_projector_dialog) self.ip_text_edit = QtWidgets.QLineEdit(edit_projector_dialog)
self.ip_text.setObjectName('projector_edit_ip_text') self.ip_text_edit.setObjectName('projector_edit_ip_text')
self.ip_text_label = QtWidgets.QLabel(edit_projector_dialog)
self.ip_text_label.setObjectName('projector_show_ip_text')
self.dialog_layout.addWidget(self.ip_label, 0, 0) self.dialog_layout.addWidget(self.ip_label, 0, 0)
self.dialog_layout.addWidget(self.ip_text, 0, 1) # For new projector, use edit widget
self.dialog_layout.addWidget(self.ip_text_edit, 0, 1)
# For edit projector, use show widget
self.dialog_layout.addWidget(self.ip_text_label, 0, 1)
# Port number # Port number
self.port_label = QtWidgets.QLabel(edit_projector_dialog) self.port_label = QtWidgets.QLabel(edit_projector_dialog)
self.port_label.setObjectName('projector_edit_ip_label') self.port_label.setObjectName('projector_edit_ip_label')
@ -111,8 +116,8 @@ class Ui_ProjectorEditForm(object):
title = translate('OpenLP.ProjectorEditForm', 'Edit Projector') title = translate('OpenLP.ProjectorEditForm', 'Edit Projector')
edit_projector_dialog.setWindowTitle(title) edit_projector_dialog.setWindowTitle(title)
self.ip_label.setText(translate('OpenLP.ProjectorEditForm', 'IP Address')) self.ip_label.setText(translate('OpenLP.ProjectorEditForm', 'IP Address'))
self.ip_text.setText(self.projector.ip) self.ip_text_edit.setText(self.projector.ip)
self.ip_text.setFocus() self.ip_text_label.setText(self.projector.ip)
self.port_label.setText(translate('OpenLP.ProjectorEditForm', 'Port Number')) self.port_label.setText(translate('OpenLP.ProjectorEditForm', 'Port Number'))
self.port_text.setText(str(self.projector.port)) self.port_text.setText(str(self.projector.port))
self.pin_label.setText(translate('OpenLP.ProjectorEditForm', 'PIN')) self.pin_label.setText(translate('OpenLP.ProjectorEditForm', 'PIN'))
@ -131,7 +136,7 @@ class ProjectorEditForm(QtWidgets.QDialog, Ui_ProjectorEditForm):
Class to add or edit a projector entry in the database. Class to add or edit a projector entry in the database.
Fields that are editable: Fields that are editable:
ip = Column(String(100)) ip = Column(String(100)) (Only edit for new projector)
port = Column(String(8)) port = Column(String(8))
pin = Column(String(20)) pin = Column(String(20))
name = Column(String(20)) name = Column(String(20))
@ -154,9 +159,16 @@ class ProjectorEditForm(QtWidgets.QDialog, Ui_ProjectorEditForm):
if projector is None: if projector is None:
self.projector = Projector() self.projector = Projector()
self.new_projector = True self.new_projector = True
self.ip_text_edit.setVisible(True)
self.ip_text_edit.setFocus()
self.ip_text_label.setVisible(False)
else: else:
self.projector = projector self.projector = projector
self.new_projector = False self.new_projector = False
self.ip_text_edit.setVisible(False)
self.ip_text_label.setVisible(True)
# Since it's already defined, IP address is unchangeable, so focus on port number
self.port_text.setFocus()
self.retranslateUi(self) self.retranslateUi(self)
reply = QtWidgets.QDialog.exec(self) reply = QtWidgets.QDialog.exec(self)
return reply return reply
@ -187,9 +199,12 @@ class ProjectorEditForm(QtWidgets.QDialog, Ui_ProjectorEditForm):
record=record.id))) record=record.id)))
valid = False valid = False
return return
adx = self.ip_text.text() if self.new_projector:
# Only validate a new entry - otherwise it's been previously verified
adx = self.ip_text_edit.text()
valid = verify_ip_address(adx) valid = verify_ip_address(adx)
if valid: if valid:
# With a valid IP - check if it's already in database so we don't duplicate
ip = self.projectordb.get_projector_by_ip(adx) ip = self.projectordb.get_projector_by_ip(adx)
if ip is None: if ip is None:
valid = True valid = True
@ -201,7 +216,6 @@ class ProjectorEditForm(QtWidgets.QDialog, Ui_ProjectorEditForm):
'IP address "{ip}"<br />is already in the database ' 'IP address "{ip}"<br />is already in the database '
'as ID {data}.<br /><br />Please Enter a different ' 'as ID {data}.<br /><br />Please Enter a different '
'IP address.'.format(ip=adx, data=ip.id))) 'IP address.'.format(ip=adx, data=ip.id)))
valid = False
return return
else: else:
QtWidgets.QMessageBox.warning(self, QtWidgets.QMessageBox.warning(self,
@ -223,7 +237,8 @@ class ProjectorEditForm(QtWidgets.QDialog, Ui_ProjectorEditForm):
'Default PJLink port is {port}'.format(port=PJLINK_PORT))) 'Default PJLink port is {port}'.format(port=PJLINK_PORT)))
valid = False valid = False
if valid: if valid:
self.projector.ip = self.ip_text.text() if self.new_projector:
self.projector.ip = self.ip_text_edit.text()
self.projector.pin = self.pin_text.text() self.projector.pin = self.pin_text.text()
self.projector.port = int(self.port_text.text()) self.projector.port = int(self.port_text.text())
self.projector.name = self.name_text.text() self.projector.name = self.name_text.text()

View File

@ -36,24 +36,9 @@ from openlp.core.common.registry import RegistryBase
from openlp.core.common.settings import Settings from openlp.core.common.settings import Settings
from openlp.core.lib.ui import create_widget_action from openlp.core.lib.ui import create_widget_action
from openlp.core.projectors import DialogSourceStyle from openlp.core.projectors import DialogSourceStyle
from openlp.core.projectors.constants import \ from openlp.core.projectors.constants import E_AUTHENTICATION, E_ERROR, E_NETWORK, E_NOT_CONNECTED, \
E_AUTHENTICATION, \ E_UNKNOWN_SOCKET_ERROR, S_CONNECTED, S_CONNECTING, S_COOLDOWN, S_INITIALIZE, S_NOT_CONNECTED, S_OFF, S_ON, \
E_ERROR, \ S_STANDBY, S_WARMUP, STATUS_CODE, STATUS_MSG, QSOCKET_STATE
E_NETWORK, \
E_NOT_CONNECTED, \
E_UNKNOWN_SOCKET_ERROR, \
S_CONNECTED, \
S_CONNECTING, \
S_COOLDOWN, \
S_INITIALIZE, \
S_NOT_CONNECTED, \
S_OFF, \
S_ON, \
S_STANDBY, \
S_WARMUP, \
STATUS_CODE, \
STATUS_MSG, \
QSOCKET_STATE
from openlp.core.projectors.db import ProjectorDB from openlp.core.projectors.db import ProjectorDB
from openlp.core.projectors.editform import ProjectorEditForm from openlp.core.projectors.editform import ProjectorEditForm

View File

@ -57,8 +57,7 @@ from openlp.core.common.i18n import translate
from openlp.core.projectors.constants import CONNECTION_ERRORS, PJLINK_CLASS, PJLINK_DEFAULT_CODES, PJLINK_ERRORS, \ from openlp.core.projectors.constants import CONNECTION_ERRORS, PJLINK_CLASS, PJLINK_DEFAULT_CODES, PJLINK_ERRORS, \
PJLINK_ERST_DATA, PJLINK_ERST_STATUS, PJLINK_MAX_PACKET, PJLINK_PREFIX, PJLINK_PORT, PJLINK_POWR_STATUS, \ PJLINK_ERST_DATA, PJLINK_ERST_STATUS, PJLINK_MAX_PACKET, PJLINK_PREFIX, PJLINK_PORT, PJLINK_POWR_STATUS, \
PJLINK_SUFFIX, PJLINK_VALID_CMD, PROJECTOR_STATE, STATUS_CODE, STATUS_MSG, QSOCKET_STATE, \ PJLINK_SUFFIX, PJLINK_VALID_CMD, PROJECTOR_STATE, STATUS_CODE, STATUS_MSG, QSOCKET_STATE, \
E_AUTHENTICATION, E_CONNECTION_REFUSED, E_GENERAL, E_INVALID_DATA, E_NETWORK, E_NOT_CONNECTED, \ E_AUTHENTICATION, E_CONNECTION_REFUSED, E_GENERAL, E_NETWORK, E_NOT_CONNECTED, E_SOCKET_TIMEOUT, \
E_SOCKET_TIMEOUT, \
S_CONNECTED, S_CONNECTING, S_NOT_CONNECTED, S_OFF, S_OK, S_ON S_CONNECTED, S_CONNECTING, S_NOT_CONNECTED, S_OFF, S_OK, S_ON
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -93,22 +92,9 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
self.projector_list = projector_list self.projector_list = projector_list
self.port = port self.port = port
# Local defines # Local defines
self.ackn_list = {} # Replies from online projetors
self.search_active = False self.search_active = False
self.search_time = 30000 # 30 seconds for allowed time self.search_time = 30000 # 30 seconds for allowed time
self.search_timer = QtCore.QTimer() self.search_timer = QtCore.QTimer()
# New commands available in PJLink Class 2
# ACKN/SRCH is processed here since it's used to find available projectors
# Other commands are processed by the individual projector instances
self.pjlink_udp_functions = {
'ACKN': self.process_ackn, # Class 2, command is 'SRCH'
'ERST': None, # Class 1/2
'INPT': None, # Class 1/2
'LKUP': None, # Class 2 (reply only - no cmd)
'POWR': None, # Class 1/2
'SRCH': self.process_srch # Class 2 (reply is ACKN)
}
self.readyRead.connect(self.get_datagram) self.readyRead.connect(self.get_datagram)
log.debug('(UDP) PJLinkUDP() Initialized') log.debug('(UDP) PJLinkUDP() Initialized')
@ -118,88 +104,26 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
Retrieve packet and basic checks Retrieve packet and basic checks
""" """
log.debug('(UDP) get_datagram() - Receiving data') log.debug('(UDP) get_datagram() - Receiving data')
read = self.pendingDatagramSize() read_size = self.pendingDatagramSize()
if read < 0: if read_size < 0:
log.warn('(UDP) No data (-1)') log.warning('(UDP) No data (-1)')
return return
if read < 1: if read_size < 1:
log.warn('(UDP) get_datagram() called when pending data size is 0') log.warning('(UDP) get_datagram() called when pending data size is 0')
return return
data, peer_address, peer_port = self.readDatagram(self.pendingDatagramSize()) data, peer_address, peer_port = self.readDatagram(self.pendingDatagramSize())
log.debug('(UDP) {size} bytes received from {adx} on port {port}'.format(size=len(data), log.debug('(UDP) {size} bytes received from {adx} on port {port}'.format(size=len(data),
adx=peer_address, adx=peer_address,
port=peer_port)) port=peer_port))
log.debug('(UDP) packet "{data}"'.format(data=data)) log.debug('(UDP) packet "{data}"'.format(data=data))
if len(data) < 0: # Send to appropriate instance to process packet
log.warn('(UDP) No data (-1)')
return
elif len(data) < 8:
# Minimum packet is '%2CCCC='
log.warn('(UDP) Invalid packet - not enough data')
return
elif data is None:
log.warn('(UDP) No data (None)')
return
elif len(data) > PJLINK_MAX_PACKET:
log.warn('(UDP) Invalid packet - length too long')
return
elif not data.startswith(PJLINK_PREFIX):
log.warn('(UDP) Invalid packet - does not start with PJLINK_PREFIX')
return
elif data[1] != '2':
log.warn('(UDP) Invalid packet - missing/invalid PJLink class version')
return
elif data[6] != '=':
log.warn('(UDP) Invalid packet - separator missing')
return
# First two characters are header information we don't need at this time
cmd, data = data[2:].split('=')
if cmd not in self.pjlink_udp_functions:
log.warn('(UDP) Invalid packet - not a valid PJLink UDP reply')
return
if self.pjlink_udp_functions[cmd] is not None:
log.debug('(UDP) Processing {cmd} with "{data}"'.format(cmd=cmd, data=data))
return self.pjlink_udp_functions[cmd](data=data, host=peer_address, port=peer_port)
else:
log.debug('(UDP) Checking projector list for ip {host} to process'.format(host=peer_address)) log.debug('(UDP) Checking projector list for ip {host} to process'.format(host=peer_address))
for projector in self.projector_list: for projector in self.projector_list:
if peer_address == projector.ip: if peer_address == projector.ip:
if cmd not in projector.pjlink_functions: # Dispatch packet to appropriate remote instance
log.error('(UDP) Could not find method to process ' log.debug('(UDP) Dispatching packet to {host}'.format(host=projector.entry.name))
'"{cmd}" in {host}'.format(cmd=cmd, host=projector.ip)) return projector.get_data(buff=data, ip=peer_address, host=peer_address, port=peer_port)
return log.warning('(UDP) Could not find projector with ip {ip} to process packet'.format(ip=peer_address))
log.debug('(UDP) Calling "{cmd}" in {host}'.format(cmd=cmd, host=projector.ip))
return projector.pjlink_functions[cmd](data=data)
log.warn('(UDP) Could not find projector with ip {ip} to process packet'.format(ip=peer_address))
return
def process_ackn(self, data, host, port):
"""
Process the ACKN command.
:param data: Data in packet
:param host: IP address of sending host
:param port: Port received on
"""
log.debug('(UDP) Processing ACKN packet')
if host not in self.ackn_list:
log.debug('(UDP) Adding {host} to ACKN list'.format(host=host))
self.ackn_list[host] = {'data': data,
'port': port}
else:
log.warn('(UDP) Host {host} already replied - ignoring'.format(host=host))
def process_srch(self, data, host, port):
"""
Process the SRCH command.
SRCH is processed by terminals so we ignore any packet.
:param data: Data in packet
:param host: IP address of sending host
:param port: Port received on
"""
log.debug('(UDP) SRCH packet received - ignoring')
return return
def search_start(self): def search_start(self):
@ -224,6 +148,8 @@ class PJLinkCommands(object):
""" """
Process replies from PJLink projector. Process replies from PJLink projector.
""" """
# List of IP addresses and mac addresses found via UDP search command
ackn_list = []
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
""" """
@ -231,24 +157,47 @@ class PJLinkCommands(object):
""" """
log.debug('PJlinkCommands(args={args} kwargs={kwargs})'.format(args=args, kwargs=kwargs)) log.debug('PJlinkCommands(args={args} kwargs={kwargs})'.format(args=args, kwargs=kwargs))
super().__init__() super().__init__()
# Map PJLink command to method # Map PJLink command to method and include pjlink class version for this instance
# Default initial pjlink class version is '1'
self.pjlink_functions = { self.pjlink_functions = {
'AVMT': self.process_avmt, 'ACKN': {"method": self.process_ackn, # Class 2 (command is SRCH)
'CLSS': self.process_clss, "version": "2"},
'ERST': self.process_erst, 'AVMT': {"method": self.process_avmt,
'INFO': self.process_info, "version": "1"},
'INF1': self.process_inf1, 'CLSS': {"method": self.process_clss,
'INF2': self.process_inf2, "version": "1"},
'INPT': self.process_inpt, 'ERST': {"method": self.process_erst,
'INST': self.process_inst, "version": "1"},
'LAMP': self.process_lamp, 'INFO': {"method": self.process_info,
'NAME': self.process_name, "version": "1"},
'PJLINK': self.process_pjlink, 'INF1': {"method": self.process_inf1,
'POWR': self.process_powr, "version": "1"},
'SNUM': self.process_snum, 'INF2': {"method": self.process_inf2,
'SVER': self.process_sver, "version": "1"},
'RFIL': self.process_rfil, 'INPT': {"method": self.process_inpt,
'RLMP': self.process_rlmp "version": "1"},
'INST': {"method": self.process_inst,
"version": "1"},
'LAMP': {"method": self.process_lamp,
"version": "1"},
'LKUP': {"method": self.process_lkup, # Class 2 (reply only - no cmd)
"version": "2"},
'NAME': {"method": self.process_name,
"version": "1"},
'PJLINK': {"method": self.process_pjlink,
"version": "1"},
'POWR': {"method": self.process_powr,
"version": "1"},
'SNUM': {"method": self.process_snum,
"version": "1"},
'SRCH': {"method": self.process_srch, # Class 2 (reply is ACKN)
"version": "2"},
'SVER': {"method": self.process_sver,
"version": "1"},
'RFIL': {"method": self.process_rfil,
"version": "1"},
'RLMP': {"method": self.process_rlmp,
"version": "1"}
} }
def reset_information(self): def reset_information(self):
@ -287,8 +236,11 @@ class PJLinkCommands(object):
self.send_busy = False self.send_busy = False
self.send_queue = [] self.send_queue = []
self.priority_queue = [] self.priority_queue = []
# Reset default version in command routing dict
for cmd in self.pjlink_functions:
self.pjlink_functions[cmd]["version"] = PJLINK_VALID_CMD[cmd]['default']
def process_command(self, cmd, data): def process_command(self, cmd, data, *args, **kwargs):
""" """
Verifies any return error code. Calls the appropriate command handler. Verifies any return error code. Calls the appropriate command handler.
@ -320,9 +272,25 @@ class PJLinkCommands(object):
return self.change_status(status=E_AUTHENTICATION) return self.change_status(status=E_AUTHENTICATION)
# Command checks already passed # Command checks already passed
log.debug('({ip}) Calling function for {cmd}'.format(ip=self.entry.name, cmd=cmd)) log.debug('({ip}) Calling function for {cmd}'.format(ip=self.entry.name, cmd=cmd))
self.pjlink_functions[cmd](data=data) self.pjlink_functions[cmd]["method"](data=data, *args, **kwargs)
def process_avmt(self, data): def process_ackn(self, data, host, port):
"""
Process the ACKN command.
:param data: Data in packet
:param host: IP address of sending host
:param port: Port received on
"""
log.debug('({ip}) Processing ACKN packet'.format(ip=self.entry.name))
if host not in self.ackn_list:
log.debug('({ip}) Adding {host} to ACKN list'.format(ip=self.entry.name, host=host))
self.ackn_list[host] = {'data': data,
'port': port}
else:
log.warning('({ip}) Host {host} already replied - ignoring'.format(ip=self.entry.name, host=host))
def process_avmt(self, data, *args, **kwargs):
""" """
Process shutter and speaker status. See PJLink specification for format. Process shutter and speaker status. See PJLink specification for format.
Update self.mute (audio) and self.shutter (video shutter). Update self.mute (audio) and self.shutter (video shutter).
@ -351,7 +319,7 @@ class PJLinkCommands(object):
self.projectorUpdateIcons.emit() self.projectorUpdateIcons.emit()
return return
def process_clss(self, data): def process_clss(self, data, *args, **kwargs):
""" """
PJLink class that this projector supports. See PJLink specification for format. PJLink class that this projector supports. See PJLink specification for format.
Updates self.class. Updates self.class.
@ -367,12 +335,13 @@ class PJLinkCommands(object):
# Due to stupid projectors not following standards (Optoma, BenQ comes to mind), # Due to stupid projectors not following standards (Optoma, BenQ comes to mind),
# AND the different responses that can be received, the semi-permanent way to # AND the different responses that can be received, the semi-permanent way to
# fix the class reply is to just remove all non-digit characters. # fix the class reply is to just remove all non-digit characters.
try: chk = re.findall('\d', data)
clss = re.findall('\d', data)[0] # Should only be the first match if len(chk) < 1:
except IndexError:
log.error('({ip}) No numbers found in class version reply "{data}" - ' log.error('({ip}) No numbers found in class version reply "{data}" - '
'defaulting to class "1"'.format(ip=self.entry.name, data=data)) 'defaulting to class "1"'.format(ip=self.entry.name, data=data))
clss = '1' clss = '1'
else:
clss = chk[0] # Should only be the first match
elif not data.isdigit(): elif not data.isdigit():
log.error('({ip}) NAN CLSS version reply "{data}" - ' log.error('({ip}) NAN CLSS version reply "{data}" - '
'defaulting to class "1"'.format(ip=self.entry.name, data=data)) 'defaulting to class "1"'.format(ip=self.entry.name, data=data))
@ -383,6 +352,11 @@ class PJLinkCommands(object):
log.debug('({ip}) Setting pjlink_class for this projector ' log.debug('({ip}) Setting pjlink_class for this projector '
'to "{data}"'.format(ip=self.entry.name, 'to "{data}"'.format(ip=self.entry.name,
data=self.pjlink_class)) data=self.pjlink_class))
# Update method class versions
for cmd in self.pjlink_functions:
if self.pjlink_class in PJLINK_VALID_CMD[cmd]['version']:
self.pjlink_functions[cmd]['version'] = self.pjlink_class
# Since we call this one on first connect, setup polling from here # Since we call this one on first connect, setup polling from here
if not self.no_poll: if not self.no_poll:
log.debug('({ip}) process_pjlink(): Starting timer'.format(ip=self.entry.name)) log.debug('({ip}) process_pjlink(): Starting timer'.format(ip=self.entry.name))
@ -391,7 +365,7 @@ class PJLinkCommands(object):
return return
def process_erst(self, data): def process_erst(self, data, *args, **kwargs):
""" """
Error status. See PJLink Specifications for format. Error status. See PJLink Specifications for format.
Updates self.projector_errors Updates self.projector_errors
@ -443,7 +417,7 @@ class PJLinkCommands(object):
PJLINK_ERST_STATUS[other] PJLINK_ERST_STATUS[other]
return return
def process_inf1(self, data): def process_inf1(self, data, *args, **kwargs):
""" """
Manufacturer name set in projector. Manufacturer name set in projector.
Updates self.manufacturer Updates self.manufacturer
@ -455,7 +429,7 @@ class PJLinkCommands(object):
data=self.manufacturer)) data=self.manufacturer))
return return
def process_inf2(self, data): def process_inf2(self, data, *args, **kwargs):
""" """
Projector Model set in projector. Projector Model set in projector.
Updates self.model. Updates self.model.
@ -466,7 +440,7 @@ class PJLinkCommands(object):
log.debug('({ip}) Setting projector model to "{data}"'.format(ip=self.entry.name, data=self.model)) log.debug('({ip}) Setting projector model to "{data}"'.format(ip=self.entry.name, data=self.model))
return return
def process_info(self, data): def process_info(self, data, *args, **kwargs):
""" """
Any extra info set in projector. Any extra info set in projector.
Updates self.other_info. Updates self.other_info.
@ -477,7 +451,7 @@ class PJLinkCommands(object):
log.debug('({ip}) Setting projector other_info to "{data}"'.format(ip=self.entry.name, data=self.other_info)) log.debug('({ip}) Setting projector other_info to "{data}"'.format(ip=self.entry.name, data=self.other_info))
return return
def process_inpt(self, data): def process_inpt(self, data, *args, **kwargs):
""" """
Current source input selected. See PJLink specification for format. Current source input selected. See PJLink specification for format.
Update self.source Update self.source
@ -499,7 +473,7 @@ class PJLinkCommands(object):
log.debug('({ip}) Setting data source to "{data}"'.format(ip=self.entry.name, data=self.source)) log.debug('({ip}) Setting data source to "{data}"'.format(ip=self.entry.name, data=self.source))
return return
def process_inst(self, data): def process_inst(self, data, *args, **kwargs):
""" """
Available source inputs. See PJLink specification for format. Available source inputs. See PJLink specification for format.
Updates self.source_available Updates self.source_available
@ -516,7 +490,7 @@ class PJLinkCommands(object):
data=self.source_available)) data=self.source_available))
return return
def process_lamp(self, data): def process_lamp(self, data, *args, **kwargs):
""" """
Lamp(s) status. See PJLink Specifications for format. Lamp(s) status. See PJLink Specifications for format.
Data may have more than 1 lamp to process. Data may have more than 1 lamp to process.
@ -542,7 +516,18 @@ class PJLinkCommands(object):
self.lamp = lamps self.lamp = lamps
return return
def process_name(self, data): def process_lkup(self, data, host, port):
"""
Process reply indicating remote is available for connection
:param data: Data packet from remote
:param host: Remote IP address
:param port: Local port packet received on
"""
# TODO: Check if autoconnect is enabled and connect?
pass
def process_name(self, data, *args, **kwargs):
""" """
Projector name set in projector. Projector name set in projector.
Updates self.pjlink_name Updates self.pjlink_name
@ -553,7 +538,7 @@ class PJLinkCommands(object):
log.debug('({ip}) Setting projector PJLink name to "{data}"'.format(ip=self.entry.name, data=self.pjlink_name)) log.debug('({ip}) Setting projector PJLink name to "{data}"'.format(ip=self.entry.name, data=self.pjlink_name))
return return
def process_pjlink(self, data): def process_pjlink(self, data, *args, **kwargs):
""" """
Process initial socket connection to terminal. Process initial socket connection to terminal.
@ -594,7 +579,7 @@ class PJLinkCommands(object):
# Since this is an initial connection, make it a priority just in case # Since this is an initial connection, make it a priority just in case
return self.send_command(cmd="CLSS", salt=data_hash, priority=True) return self.send_command(cmd="CLSS", salt=data_hash, priority=True)
def process_powr(self, data): def process_powr(self, data, *args, **kwargs):
""" """
Power status. See PJLink specification for format. Power status. See PJLink specification for format.
Update self.power with status. Update icons if change from previous setting. Update self.power with status. Update icons if change from previous setting.
@ -617,7 +602,7 @@ class PJLinkCommands(object):
log.warning('({ip}) Unknown power response: "{data}"'.format(ip=self.entry.name, data=data)) log.warning('({ip}) Unknown power response: "{data}"'.format(ip=self.entry.name, data=data))
return return
def process_rfil(self, data): def process_rfil(self, data, *args, **kwargs):
""" """
Process replacement filter type Process replacement filter type
""" """
@ -628,7 +613,7 @@ class PJLinkCommands(object):
log.warning('({ip}) Saved model: "{old}"'.format(ip=self.entry.name, old=self.model_filter)) log.warning('({ip}) Saved model: "{old}"'.format(ip=self.entry.name, old=self.model_filter))
log.warning('({ip}) New model: "{new}"'.format(ip=self.entry.name, new=data)) log.warning('({ip}) New model: "{new}"'.format(ip=self.entry.name, new=data))
def process_rlmp(self, data): def process_rlmp(self, data, *args, **kwargs):
""" """
Process replacement lamp type Process replacement lamp type
""" """
@ -639,7 +624,7 @@ class PJLinkCommands(object):
log.warning('({ip}) Saved lamp: "{old}"'.format(ip=self.entry.name, old=self.model_lamp)) log.warning('({ip}) Saved lamp: "{old}"'.format(ip=self.entry.name, old=self.model_lamp))
log.warning('({ip}) New lamp: "{new}"'.format(ip=self.entry.name, new=data)) log.warning('({ip}) New lamp: "{new}"'.format(ip=self.entry.name, new=data))
def process_snum(self, data): def process_snum(self, data, *args, **kwargs):
""" """
Serial number of projector. Serial number of projector.
@ -659,7 +644,20 @@ class PJLinkCommands(object):
log.warning('({ip}) NOT saving serial number'.format(ip=self.entry.name)) log.warning('({ip}) NOT saving serial number'.format(ip=self.entry.name))
self.serial_no_received = data self.serial_no_received = data
def process_sver(self, data): def process_srch(self, data, host, port):
"""
Process the SRCH command.
SRCH is processed by terminals so we ignore any packet.
:param data: Data in packet
:param host: IP address of sending host
:param port: Port received on
"""
log.warning('(UDP) SRCH packet received from {host} - ignoring'.format(host=host))
return
def process_sver(self, data, *args, **kwargs):
""" """
Software version of projector Software version of projector
""" """
@ -716,6 +714,7 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
self.pin = self.entry.pin self.pin = self.entry.pin
self.port = self.entry.port self.port = self.entry.port
self.pjlink_class = PJLINK_CLASS if self.entry.pjlink_class is None else self.entry.pjlink_class self.pjlink_class = PJLINK_CLASS if self.entry.pjlink_class is None else self.entry.pjlink_class
self.ackn_list = {} # Replies from online projectors (Class 2 option)
self.db_update = False # Use to check if db needs to be updated prior to exiting self.db_update = False # Use to check if db needs to be updated prior to exiting
# Poll time 20 seconds unless called with something else # Poll time 20 seconds unless called with something else
self.poll_time = 20000 if 'poll_time' not in kwargs else kwargs['poll_time'] * 1000 self.poll_time = 20000 if 'poll_time' not in kwargs else kwargs['poll_time'] * 1000
@ -916,7 +915,10 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
""" """
Clean out extraneous stuff in the buffer. Clean out extraneous stuff in the buffer.
""" """
log.warning('({ip}) {message}'.format(ip=self.entry.name, message='Invalid packet' if msg is None else msg)) log.debug('({ip}) Cleaning buffer - msg = "{message}"'.format(ip=self.entry.name, message=msg))
if msg is None:
msg = 'Invalid packet'
log.warning('({ip}) {message}'.format(ip=self.entry.name, message=msg))
self.send_busy = False self.send_busy = False
trash_count = 0 trash_count = 0
while self.bytesAvailable() > 0: while self.bytesAvailable() > 0:
@ -960,7 +962,7 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
self.socket_timer.stop() self.socket_timer.stop()
return self.get_data(buff=read, ip=self.ip) return self.get_data(buff=read, ip=self.ip)
def get_data(self, buff, ip=None): def get_data(self, buff, ip=None, *args, **kwargs):
""" """
Process received data Process received data
@ -973,45 +975,61 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
ip = self.ip ip = self.ip
log.debug('({ip}) get_data(ip="{ip_in}" buffer="{buff}"'.format(ip=self.entry.name, ip_in=ip, buff=buff)) log.debug('({ip}) get_data(ip="{ip_in}" buffer="{buff}"'.format(ip=self.entry.name, ip_in=ip, buff=buff))
# NOTE: Class2 has changed to some values being UTF-8 # NOTE: Class2 has changed to some values being UTF-8
if isinstance(buff, bytes):
data_in = decode(buff, 'utf-8') data_in = decode(buff, 'utf-8')
else:
data_in = buff
data = data_in.strip() data = data_in.strip()
# Initial packet checks # Initial packet checks
if (len(data) < 7): if (len(data) < 7):
self._trash_buffer(msg='get_data(): Invalid packet - length') self._trash_buffer(msg='get_data(): Invalid packet - length')
return self.receive_data_signal() return self.receive_data_signal()
elif len(data) > self.max_size: elif len(data) > self.max_size:
self._trash_buffer(msg='get_data(): Invalid packet - too long') self._trash_buffer(msg='get_data(): Invalid packet - too long ({length} bytes)'.format(length=len(data)))
return self.receive_data_signal() return self.receive_data_signal()
elif not data.startswith(PJLINK_PREFIX): elif not data.startswith(PJLINK_PREFIX):
self._trash_buffer(msg='get_data(): Invalid packet - PJLink prefix missing') self._trash_buffer(msg='get_data(): Invalid packet - PJLink prefix missing')
return self.receive_data_signal() return self.receive_data_signal()
elif '=' not in data: elif data[6] != '=':
self._trash_buffer(msg='get_data(): Invalid reply - Does not have "="') self._trash_buffer(msg='get_data(): Invalid reply - Does not have "="')
return self.receive_data_signal() return self.receive_data_signal()
log.debug('({ip}) get_data(): Checking new data "{data}"'.format(ip=self.entry.name, data=data)) log.debug('({ip}) get_data(): Checking new data "{data}"'.format(ip=self.entry.name, data=data))
header, data = data.split('=') header, data = data.split('=')
log.debug('({ip}) get_data() header="{header}" data="{data}"'.format(ip=self.entry.name,
header=header, data=data))
# At this point, the header should contain: # At this point, the header should contain:
# "PVCCCC" # "PVCCCC"
# Where: # Where:
# P = PJLINK_PREFIX # P = PJLINK_PREFIX
# V = PJLink class or version # V = PJLink class or version
# C = PJLink command # C = PJLink command
version, cmd = header[1], header[2:].upper()
log.debug('({ip}) get_data() version="{version}" cmd="{cmd}"'.format(ip=self.entry.name,
version=version, cmd=cmd))
# TODO: Below commented for now since it seems to cause issues with testing some invalid data.
# Revisit after more refactoring is finished.
'''
try: try:
version, cmd = header[1], header[2:].upper() version, cmd = header[1], header[2:].upper()
log.debug('({ip}) get_data() version="{version}" cmd="{cmd}"'.format(ip=self.entry.name,
version=version, cmd=cmd))
except ValueError as e: except ValueError as e:
self.change_status(E_INVALID_DATA) self.change_status(E_INVALID_DATA)
log.warning('({ip}) get_data(): Received data: "{data}"'.format(ip=self.entry.name, data=data_in)) log.warning('({ip}) get_data(): Received data: "{data}"'.format(ip=self.entry.name, data=data_in))
self._trash_buffer('get_data(): Expected header + command + data') self._trash_buffer('get_data(): Expected header + command + data')
return self.receive_data_signal() return self.receive_data_signal()
'''
if cmd not in PJLINK_VALID_CMD: if cmd not in PJLINK_VALID_CMD:
log.warning('({ip}) get_data(): Invalid packet - unknown command "{data}"'.format(ip=self.entry.name, self._trash_buffer('get_data(): Invalid packet - unknown command "{data}"'.format(ip=self.entry.name,
data=cmd)) data=cmd))
self._trash_buffer(msg='get_data(): Unknown command "{data}"'.format(data=cmd))
return self.receive_data_signal() return self.receive_data_signal()
if int(self.pjlink_class) < int(version): elif version not in PJLINK_VALID_CMD[cmd]['version']:
self._trash_buffer(msg='get_data() Command reply version does not match a valid command version')
return self.receive_data_signal()
elif int(self.pjlink_class) < int(version):
log.warning('({ip}) get_data(): Projector returned class reply higher ' log.warning('({ip}) get_data(): Projector returned class reply higher '
'than projector stated class'.format(ip=self.entry.name)) 'than projector stated class'.format(ip=self.entry.name))
self.process_command(cmd, data) self.process_command(cmd, data, *args, **kwargs)
return self.receive_data_signal() return self.receive_data_signal()
@QtCore.pyqtSlot(QtNetwork.QAbstractSocket.SocketError) @QtCore.pyqtSlot(QtNetwork.QAbstractSocket.SocketError)
@ -1063,16 +1081,7 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
data=opts, data=opts,
salt='' if salt is None salt='' if salt is None
else ' with hash')) else ' with hash'))
cmd_ver = PJLINK_VALID_CMD[cmd]['version'] header = PJLINK_HEADER.format(linkclass=self.pjlink_functions[cmd]["version"])
if self.pjlink_class in PJLINK_VALID_CMD[cmd]['version']:
header = PJLINK_HEADER.format(linkclass=self.pjlink_class)
elif len(cmd_ver) == 1 and (int(cmd_ver[0]) < int(self.pjlink_class)):
# Typically a class 1 only command
header = PJLINK_HEADER.format(linkclass=cmd_ver[0])
else:
# NOTE: Once we get to version 3 then think about looping
log.error('({ip}): send_command(): PJLink class check issue? Aborting'.format(ip=self.entry.name))
return
out = '{salt}{header}{command} {options}{suffix}'.format(salt="" if salt is None else salt, out = '{salt}{header}{command} {options}{suffix}'.format(salt="" if salt is None else salt,
header=header, header=header,
command=cmd, command=cmd,

View File

@ -25,8 +25,8 @@ Functional tests to test the AppLocation class and related methods.
from unittest import TestCase from unittest import TestCase
from unittest.mock import MagicMock, call, patch from unittest.mock import MagicMock, call, patch
from openlp.core.common import clean_button_text, de_hump, extension_loader, is_macosx, is_linux, is_win, \ from openlp.core.common import clean_button_text, de_hump, extension_loader, is_macosx, is_linux, \
path_to_module, trace_error_handler is_win, normalize_str, path_to_module, trace_error_handler
from openlp.core.common.path import Path from openlp.core.common.path import Path
@ -211,6 +211,30 @@ class TestCommonFunctions(TestCase):
assert is_win() is False, 'is_win() should return False' assert is_win() is False, 'is_win() should return False'
assert is_macosx() is False, 'is_macosx() should return False' assert is_macosx() is False, 'is_macosx() should return False'
def test_normalize_str_leaves_newlines(self):
# GIVEN: a string containing newlines
string = 'something\nelse'
# WHEN: normalize is called
normalized_string = normalize_str(string)
# THEN: string is unchanged
assert normalized_string == string
def test_normalize_str_removes_null_byte(self):
# GIVEN: a string containing a null byte
string = 'somet\x00hing'
# WHEN: normalize is called
normalized_string = normalize_str(string)
# THEN: nullbyte is removed
assert normalized_string == 'something'
def test_normalize_str_replaces_crlf_with_lf(self):
# GIVEN: a string containing crlf
string = 'something\r\nelse'
# WHEN: normalize is called
normalized_string = normalize_str(string)
# THEN: crlf is replaced with lf
assert normalized_string == 'something\nelse'
def test_clean_button_text(self): def test_clean_button_text(self):
""" """
Test the clean_button_text() function. Test the clean_button_text() function.

View File

@ -29,12 +29,15 @@ import os
import shutil import shutil
from tempfile import mkdtemp from tempfile import mkdtemp
from unittest import TestCase from unittest import TestCase
from unittest.mock import patch from unittest.mock import MagicMock, patch
from openlp.core.common.registry import Registry
from openlp.core.lib.db import upgrade_db from openlp.core.lib.db import upgrade_db
from openlp.core.projectors import upgrade from openlp.core.projectors import upgrade
from openlp.core.projectors.constants import PJLINK_PORT from openlp.core.projectors.constants import PJLINK_PORT
from openlp.core.projectors.db import Manufacturer, Model, Projector, ProjectorDB, ProjectorSource, Source from openlp.core.projectors.db import Manufacturer, Model, Projector, ProjectorDB, ProjectorSource, Source
from openlp.core.ui.mainwindow import MainWindow
from tests.helpers.testmixin import TestMixin
from tests.resources.projector.data import TEST_DB_PJLINK1, TEST_DB, TEST1_DATA, TEST2_DATA, TEST3_DATA from tests.resources.projector.data import TEST_DB_PJLINK1, TEST_DB, TEST1_DATA, TEST2_DATA, TEST3_DATA
from tests.utils.constants import TEST_RESOURCES_PATH from tests.utils.constants import TEST_RESOURCES_PATH
@ -122,7 +125,7 @@ class TestProjectorDBUpdate(TestCase):
assert updated_to_version == latest_version, 'The projector DB should have been upgrade to the latest version' assert updated_to_version == latest_version, 'The projector DB should have been upgrade to the latest version'
class TestProjectorDB(TestCase): class TestProjectorDB(TestCase, TestMixin):
""" """
Test case for ProjectorDB Test case for ProjectorDB
""" """
@ -131,6 +134,33 @@ class TestProjectorDB(TestCase):
""" """
Set up anything necessary for all tests Set up anything necessary for all tests
""" """
# Create a test app to keep from segfaulting
Registry.create()
self.registry = Registry()
self.setup_application()
# Mock cursor busy/normal methods.
self.app.set_busy_cursor = MagicMock()
self.app.set_normal_cursor = MagicMock()
self.app.args = []
Registry().register('application', self.app)
Registry().set_flag('no_web_server', True)
# Mock classes and methods used by mainwindow.
with patch('openlp.core.ui.mainwindow.SettingsForm'), \
patch('openlp.core.ui.mainwindow.ImageManager'), \
patch('openlp.core.ui.mainwindow.LiveController'), \
patch('openlp.core.ui.mainwindow.PreviewController'), \
patch('openlp.core.ui.mainwindow.OpenLPDockWidget'), \
patch('openlp.core.ui.mainwindow.QtWidgets.QToolBox'), \
patch('openlp.core.ui.mainwindow.QtWidgets.QMainWindow.addDockWidget'), \
patch('openlp.core.ui.mainwindow.ServiceManager'), \
patch('openlp.core.ui.mainwindow.ThemeManager'), \
patch('openlp.core.ui.mainwindow.ProjectorManager'), \
patch('openlp.core.ui.mainwindow.Renderer'), \
patch('openlp.core.ui.mainwindow.websockets.WebSocketServer'), \
patch('openlp.core.ui.mainwindow.server.HttpServer'):
self.main_window = MainWindow()
# Create a temporary database directory and database
self.tmp_folder = mkdtemp(prefix='openlp_') self.tmp_folder = mkdtemp(prefix='openlp_')
tmpdb_url = 'sqlite:///{db}'.format(db=os.path.join(self.tmp_folder, TEST_DB)) tmpdb_url = 'sqlite:///{db}'.format(db=os.path.join(self.tmp_folder, TEST_DB))
mocked_init_url.return_value = tmpdb_url mocked_init_url.return_value = tmpdb_url
@ -139,9 +169,12 @@ class TestProjectorDB(TestCase):
def tearDown(self): def tearDown(self):
""" """
Clean up Clean up
Delete all the C++ objects at the end so that we don't have a segfault
""" """
self.projector.session.close() self.projector.session.close()
self.projector = None self.projector = None
del self.main_window
# Ignore errors since windows can have problems with locked files # Ignore errors since windows can have problems with locked files
shutil.rmtree(self.tmp_folder, ignore_errors=True) shutil.rmtree(self.tmp_folder, ignore_errors=True)

View File

@ -39,30 +39,31 @@ class TestPJLinkRouting(TestCase):
""" """
Tests for the PJLink module command routing Tests for the PJLink module command routing
""" """
def test_get_data_unknown_command(self): @patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_data_unknown_command(self, mock_log):
""" """
Test not a valid command Test not a valid command
""" """
# GIVEN: Test object # GIVEN: Test object
with patch.object(openlp.core.projectors.pjlink, 'log') as mock_log, \
patch.object(openlp.core.projectors.pjlink.PJLink, '_trash_buffer') as mock_buffer:
pjlink = PJLink(Projector(**TEST1_DATA), no_poll=True) pjlink = PJLink(Projector(**TEST1_DATA), no_poll=True)
pjlink.pjlink_functions = MagicMock() pjlink.pjlink_functions = MagicMock()
log_warning_text = [call('({ip}) get_data(): Invalid packet - ' log_warning_text = [call('({ip}) get_data(): Invalid packet - '
'unknown command "UNK"'.format(ip=pjlink.name))] 'unknown command "UNKN"'.format(ip=pjlink.name))]
log_debug_text = [call('({ip}) get_data(ip="111.111.111.111" ' log_debug_text = [call('(___TEST_ONE___) get_data(ip="111.111.111.111" buffer="%1UNKN=Huh?"'),
'buffer="b\'%1UNK=Huh?\'"'.format(ip=pjlink.name)), call('(___TEST_ONE___) get_data(): Checking new data "%1UNKN=Huh?"'),
call('({ip}) get_data(): Checking new data "%1UNK=Huh?"'.format(ip=pjlink.name))] call('(___TEST_ONE___) get_data() header="%1UNKN" data="Huh?"'),
call('(___TEST_ONE___) get_data() version="1" cmd="UNKN"'),
call('(___TEST_ONE___) Cleaning buffer - msg = "get_data(): '
'Invalid packet - unknown command "UNKN""'),
call('(___TEST_ONE___) Finished cleaning buffer - 0 bytes dropped')]
# WHEN: get_data called with an unknown command # WHEN: get_data called with an unknown command
pjlink.get_data(buff='{prefix}1UNK=Huh?'.format(prefix=PJLINK_PREFIX).encode('utf-8')) pjlink.get_data(buff='{prefix}1UNKN=Huh?'.format(prefix=PJLINK_PREFIX))
# THEN: Appropriate log entries should have been made and methods called/not called # THEN: Appropriate log entries should have been made and methods called/not called
mock_log.debug.assert_has_calls(log_debug_text)
mock_log.warning.assert_has_calls(log_warning_text) mock_log.warning.assert_has_calls(log_warning_text)
mock_log.debug.assert_has_calls(log_debug_text)
assert pjlink.pjlink_functions.called is False, 'Should not have accessed pjlink_functions' assert pjlink.pjlink_functions.called is False, 'Should not have accessed pjlink_functions'
assert mock_buffer.called is True, 'Should have called _trash_buffer'
def test_process_command_call_clss(self): def test_process_command_call_clss(self):
""" """
@ -219,7 +220,6 @@ class TestPJLinkRouting(TestCase):
""" """
Test command returned success Test command returned success
""" """
# GIVEN: Initial mocks and data
# GIVEN: Test object and mocks # GIVEN: Test object and mocks
with patch.object(openlp.core.projectors.pjlink, 'log') as mock_log, \ with patch.object(openlp.core.projectors.pjlink, 'log') as mock_log, \
patch.object(openlp.core.projectors.pjlink.PJLink, 'send_command') as mock_send_command, \ patch.object(openlp.core.projectors.pjlink.PJLink, 'send_command') as mock_send_command, \

View File

@ -22,14 +22,14 @@
""" """
Package to test the openlp.core.projectors.pjlink commands package. Package to test the openlp.core.projectors.pjlink commands package.
""" """
from unittest import TestCase from unittest import TestCase, skip
from unittest.mock import patch, call from unittest.mock import patch, call
import openlp.core.projectors.pjlink import openlp.core.projectors.pjlink
from openlp.core.projectors.constants import S_CONNECTED, S_OFF, S_ON from openlp.core.projectors.constants import PJLINK_PORT, S_CONNECTED, S_OFF, S_ON
from openlp.core.projectors.db import Projector from openlp.core.projectors.db import Projector
from openlp.core.projectors.pjlink import PJLink from openlp.core.projectors.pjlink import PJLink, PJLinkUDP
from tests.resources.projector.data import TEST_HASH, TEST_PIN, TEST_SALT, TEST1_DATA from tests.resources.projector.data import TEST_HASH, TEST_PIN, TEST_SALT, TEST1_DATA, TEST2_DATA
class TestPJLinkCommands(TestCase): class TestPJLinkCommands(TestCase):
@ -235,3 +235,114 @@ class TestPJLinkCommands(TestCase):
mock_log.error.assert_has_calls(log_check) mock_log.error.assert_has_calls(log_check)
assert 1 == mock_disconnect_from_host.call_count, 'Should have only been called once' assert 1 == mock_disconnect_from_host.call_count, 'Should have only been called once'
mock_send_command.assert_not_called() mock_send_command.assert_not_called()
@skip('Change to pjlink_udp.get_datagram() call')
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_ackn_duplicate(self, mock_log):
"""
Test process_ackn method with multiple calls with same data
"""
# TODO: Change this to call pjlink_udp.get_datagram() so ACKN can be processed properly
# GIVEN: Test setup
pjlink = PJLink(projector=self.test_list[0])
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT}}
log_warn_calls = [call('(___TEST_ONE___) Host {host} already replied - '
'ignoring'.format(host=TEST1_DATA['ip']))]
log_debug_calls = [call('PJlinkCommands(args=() kwargs={})'),
call('(___TEST_ONE___) reset_information() connect status is S_NOT_CONNECTED'),
call('(___TEST_ONE___) Processing ACKN packet'),
call('(___TEST_ONE___) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip'])),
call('(___TEST_ONE___) Processing ACKN packet')]
# WHEN: process_ackn called twice with same data
pjlink.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
pjlink.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
# THEN: pjlink_udp.ack_list should equal test_list
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
if pjlink.ackn_list != check_list:
# Check this way so we can print differences to stdout
print('\nackn_list: ', pjlink.ackn_list)
print('test_list: ', check_list, '\n')
assert pjlink.ackn_list == check_list
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warning.assert_has_calls(log_warn_calls)
@skip('Change to pjlink_udp.get_datagram() call')
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_ackn_multiple(self, mock_log):
"""
Test process_ackn method with multiple calls
"""
# TODO: Change this to call pjlink_udp.get_datagram() so ACKN can be processed properly
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT},
TEST2_DATA['ip']: {'data': TEST2_DATA['mac_adx'], 'port': PJLINK_PORT}}
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip'])),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST2_DATA['ip']))]
# WHEN: process_ackn called twice with different data
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
pjlink_udp.process_ackn(data=TEST2_DATA['mac_adx'], host=TEST2_DATA['ip'], port=PJLINK_PORT)
# THEN: pjlink_udp.ack_list should equal test_list
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
if pjlink_udp.ackn_list != check_list:
# Check this way so we can print differences to stdout
print('\nackn_list: ', pjlink_udp.ackn_list)
print('test_list: ', check_list)
assert pjlink_udp.ackn_list == check_list
mock_log.debug.assert_has_calls(log_debug_calls)
@skip('Change to pjlink_udp.get_datagram() call')
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_ackn_single(self, mock_log):
"""
Test process_ackn method with single call
"""
# TODO: Change this to call pjlink_udp.get_datagram() so ACKN can be processed properly
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT}}
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip']))]
# WHEN: process_ackn called twice with different data
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
# THEN: pjlink_udp.ack_list should equal test_list
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
if pjlink_udp.ackn_list != check_list:
# Check this way so we can print differences to stdout
print('\nackn_list: ', pjlink_udp.ackn_list)
print('test_list: ', check_list)
assert pjlink_udp.ackn_list == check_list
mock_log.debug.assert_has_calls(log_debug_calls)
@skip('Change to pjlink_udp.get_datagram() call')
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_srch(self, mock_log):
"""
Test process_srch method
"""
# TODO: Change this to call pjlink_udp.get_datagram() so ACKN can be processed properly
# GIVEN: Test setup
log_warn_calls = [call('(UDP) SRCH packet received from {ip} - ignoring'.format(ip=TEST1_DATA['ip'])), ]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'), ]
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
# WHEN: process_srch called
pjlink_udp.process_srch(data=None, host=TEST1_DATA['ip'], port=PJLINK_PORT)
# THEN: log entries should be entered
mock_log.warning.assert_has_calls(log_warn_calls)
mock_log.debug.assert_has_calls(log_debug_calls)

View File

@ -28,10 +28,10 @@ from unittest import TestCase
from unittest.mock import call, patch from unittest.mock import call, patch
import openlp.core.projectors.pjlink import openlp.core.projectors.pjlink
from openlp.core.projectors.constants import PJLINK_MAX_PACKET, PJLINK_PORT, PJLINK_PREFIX from openlp.core.projectors.constants import PJLINK_PORT
from openlp.core.projectors.db import Projector from openlp.core.projectors.db import Projector
from openlp.core.projectors.pjlink import PJLinkUDP from openlp.core.projectors.pjlink import PJLinkUDP, PJLink
from tests.resources.projector.data import TEST1_DATA, TEST2_DATA from tests.resources.projector.data import TEST1_DATA, TEST2_DATA
@ -43,7 +43,8 @@ class TestPJLinkBase(TestCase):
""" """
Setup generic test conditions Setup generic test conditions
""" """
self.test_list = [Projector(**TEST1_DATA), Projector(**TEST2_DATA)] self.test_list = [PJLink(projector=Projector(**TEST1_DATA)),
PJLink(projector=Projector(**TEST2_DATA))]
def tearDown(self): def tearDown(self):
""" """
@ -51,132 +52,6 @@ class TestPJLinkBase(TestCase):
""" """
self.test_list = None self.test_list = None
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_invalid_class(self, mock_log):
"""
Test get_datagram with invalid class number
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - missing/invalid PJLink class version')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data'),
call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
call('(UDP) packet "%1ACKN=11:11:11:11:11:11"')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 24
mock_read.return_value = ('{prefix}1ACKN={mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']),
TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_invalid_command(self, mock_log):
"""
Test get_datagram with invalid PJLink UDP command
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - not a valid PJLink UDP reply')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data'),
call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
call('(UDP) packet "%2DUMB=11:11:11:11:11:11"')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 24
mock_read.return_value = ('{prefix}2DUMB={mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']),
TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_invalid_prefix(self, mock_log):
"""
Test get_datagram when prefix != PJLINK_PREFIX
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - does not start with PJLINK_PREFIX')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data'),
call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
call('(UDP) packet "$2ACKN=11:11:11:11:11:11"')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 24
mock_read.return_value = ('{prefix}2ACKN={mac}'.format(prefix='$', mac=TEST1_DATA['mac_adx']),
TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_invalid_separator(self, mock_log):
"""
Test get_datagram when separator not equal to =
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - separator missing')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data'),
call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
call('(UDP) packet "%2ACKN 11:11:11:11:11:11"')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 24
mock_read.return_value = ('{prefix}2ACKN {mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']),
TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_long(self, mock_log):
"""
Test get_datagram when datagram > PJLINK_MAX_PACKET
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - length too long')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data'),
call('(UDP) 143 bytes received from 111.111.111.111 on port 4352'),
call('(UDP) packet "%2ACKN={long}"'.format(long='X' * PJLINK_MAX_PACKET))]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = PJLINK_MAX_PACKET + 7
mock_read.return_value = ('{prefix}2ACKN={long}'.format(prefix=PJLINK_PREFIX,
long='X' * PJLINK_MAX_PACKET),
TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log') @patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_negative_zero_length(self, mock_log): def test_get_datagram_data_negative_zero_length(self, mock_log):
""" """
@ -196,7 +71,7 @@ class TestPJLinkBase(TestCase):
pjlink_udp.get_datagram() pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns # THEN: Log entries should be made and method returns
mock_log.warn.assert_has_calls(log_warn_calls) mock_log.warning.assert_has_calls(log_warn_calls)
mock_log.debug.assert_has_calls(log_debug_calls) mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log') @patch.object(openlp.core.projectors.pjlink, 'log')
@ -206,41 +81,18 @@ class TestPJLinkBase(TestCase):
""" """
# GIVEN: Test setup # GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list) pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - not enough data')] log_warn_calls = [call('(UDP) get_datagram() called when pending data size is 0')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'), log_debug_calls = [call('(UDP) get_datagram() - Receiving data')]
call('(UDP) get_datagram() - Receiving data')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \ with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read: patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 1 mock_datagram.return_value = 0
mock_read.return_value = ('', TEST1_DATA['ip'], PJLINK_PORT) mock_read.return_value = ('', TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready # WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram() pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns # THEN: Log entries should be made and method returns
mock_log.warn.assert_has_calls(log_warn_calls) mock_log.warning.assert_has_calls(log_warn_calls)
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_short(self, mock_log):
"""
Test get_datagram when data length < 8
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - not enough data')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 6
mock_read.return_value = ('{prefix}2ACKN'.format(prefix=PJLINK_PREFIX), TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.warn.assert_has_calls(log_warn_calls)
mock_log.debug.assert_has_calls(log_debug_calls) mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log') @patch.object(openlp.core.projectors.pjlink, 'log')
@ -260,101 +112,5 @@ class TestPJLinkBase(TestCase):
pjlink_udp.get_datagram() pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns # THEN: Log entries should be made and method returns
mock_log.warn.assert_has_calls(log_warn_calls) mock_log.warning.assert_has_calls(log_warn_calls)
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_ackn_duplicate(self, mock_log):
"""
Test process_ackn method with multiple calls with same data
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT}}
log_warn_calls = [call('(UDP) Host {host} already replied - ignoring'.format(host=TEST1_DATA['ip']))]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip'])),
call('(UDP) Processing ACKN packet')]
# WHEN: process_ackn called twice with same data
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
# THEN: pjlink_udp.ack_list should equal test_list
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
if pjlink_udp.ackn_list != check_list:
# Check this way so we can print differences to stdout
print('\nackn_list: ', pjlink_udp.ackn_list)
print('test_list: ', check_list)
assert pjlink_udp.ackn_list == check_list
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_ackn_multiple(self, mock_log):
"""
Test process_ackn method with multiple calls
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT},
TEST2_DATA['ip']: {'data': TEST2_DATA['mac_adx'], 'port': PJLINK_PORT}}
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip'])),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST2_DATA['ip']))]
# WHEN: process_ackn called twice with different data
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
pjlink_udp.process_ackn(data=TEST2_DATA['mac_adx'], host=TEST2_DATA['ip'], port=PJLINK_PORT)
# THEN: pjlink_udp.ack_list should equal test_list
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
if pjlink_udp.ackn_list != check_list:
# Check this way so we can print differences to stdout
print('\nackn_list: ', pjlink_udp.ackn_list)
print('test_list: ', check_list)
assert pjlink_udp.ackn_list == check_list
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_ackn_single(self, mock_log):
"""
Test process_ackn method with single call
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT}}
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip']))]
# WHEN: process_ackn called twice with different data
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
# THEN: pjlink_udp.ack_list should equal test_list
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
if pjlink_udp.ackn_list != check_list:
# Check this way so we can print differences to stdout
print('\nackn_list: ', pjlink_udp.ackn_list)
print('test_list: ', check_list)
assert pjlink_udp.ackn_list == check_list
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_srch(self, mock_log):
"""
Test process_srch method
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) SRCH packet received - ignoring')]
# WHEN: process_srch called
pjlink_udp.process_srch(data=None, host=None, port=None)
# THEN: debug log entry should be entered
mock_log.debug.assert_has_calls(log_debug_calls) mock_log.debug.assert_has_calls(log_debug_calls)