PJLink2 update Q

This commit is contained in:
Ken Roberts 2018-04-19 23:04:43 -07:00
parent 1ce964f243
commit 604581c71a
10 changed files with 433 additions and 497 deletions

View File

@ -154,110 +154,137 @@ PROJECTOR_STATE = [
S_INFO
]
# NOTE: Changed format to account for some commands are both class 1 and 2
# NOTE: Changed format to account for some commands are both class 1 and 2.
# Make sure the sequence of 'version' is lowest-to-highest.
PJLINK_VALID_CMD = {
'ACKN': {'version': ['2', ],
'ACKN': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants',
'Acknowledge a PJLink SRCH command - returns MAC address.')
},
'AVMT': {'version': ['1', ],
'AVMT': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants',
'Blank/unblank video and/or mute audio.')
},
'CLSS': {'version': ['1', ],
'CLSS': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants',
'Query projector PJLink class support.')
},
'ERST': {'version': ['1', '2'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants',
'Query error status from projector. '
'Returns fan/lamp/temp/cover/filter/other error status.')
},
'FILT': {'version': ['2', ],
'FILT': {'version': ['2'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants',
'Query number of hours on filter.')
},
'FREZ': {'version': ['2', ],
'FREZ': {'version': ['2'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants',
'Freeze or unfreeze current image being projected.')
},
'INF1': {'version': ['1', ],
'INF1': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants',
'Query projector manufacturer name.')
},
'INF2': {'version': ['1', ],
'INF2': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants',
'Query projector product name.')
},
'INFO': {'version': ['1', ],
'INFO': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants',
'Query projector for other information set by manufacturer.')
},
'INNM': {'version': ['2', ],
'INNM': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants',
'Query specified input source name')
},
'INPT': {'version': ['1', ],
'INPT': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants',
'Switch to specified video source.')
},
'INST': {'version': ['1', ],
'INST': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants',
'Query available input sources.')
},
'IRES': {'version:': ['2', ],
'IRES': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants',
'Query current input resolution.')
},
'LAMP': {'version': ['1', ],
'LAMP': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants',
'Query lamp time and on/off status. Multiple lamps supported.')
},
'LKUP': {'version': ['2', ],
'LKUP': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants',
'UDP Status - Projector is now available on network. Includes MAC address.')
},
'MVOL': {'version': ['2', ],
'MVOL': {'version': ['2'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants',
'Adjust microphone volume by 1 step.')
},
'NAME': {'version': ['1', ],
'NAME': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants',
'Query customer-set projector name.')
},
'PJLINK': {'version': ['1', ],
'PJLINK': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants',
'Initial connection with authentication/no authentication request.')
},
'POWR': {'version': ['1', ],
'POWR': {'version': ['1'],
'default': '1',
'description': translate('OpenLP.PJLinkConstants',
'Turn lamp on or off/standby.')
},
'RFIL': {'version': ['2', ],
'RFIL': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants',
'Query replacement air filter model number.')
},
'RLMP': {'version': ['2', ],
'RLMP': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants',
'Query replacement lamp model number.')
},
'RRES': {'version': ['2', ],
'RRES': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants',
'Query recommended resolution.')
},
'SNUM': {'version': ['2', ],
'SNUM': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants',
'Query projector serial number.')
},
'SRCH': {'version': ['2', ],
'SRCH': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants',
'UDP broadcast search request for available projectors. Reply is ACKN.')
},
'SVER': {'version': ['2', ],
'SVER': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants',
'Query projector software version number.')
},
'SVOL': {'version': ['2', ],
'SVOL': {'version': ['2'],
'default': '2',
'description': translate('OpenLP.PJLinkConstants',
'Adjust speaker volume by 1 step.')
}

View File

@ -58,10 +58,15 @@ class Ui_ProjectorEditForm(object):
# IP Address
self.ip_label = QtWidgets.QLabel(edit_projector_dialog)
self.ip_label.setObjectName('projector_edit_ip_label')
self.ip_text = QtWidgets.QLineEdit(edit_projector_dialog)
self.ip_text.setObjectName('projector_edit_ip_text')
self.ip_text_edit = QtWidgets.QLineEdit(edit_projector_dialog)
self.ip_text_edit.setObjectName('projector_edit_ip_text')
self.ip_text_label = QtWidgets.QLabel(edit_projector_dialog)
self.ip_text_label.setObjectName('projector_show_ip_text')
self.dialog_layout.addWidget(self.ip_label, 0, 0)
self.dialog_layout.addWidget(self.ip_text, 0, 1)
# For new projector, use edit widget
self.dialog_layout.addWidget(self.ip_text_edit, 0, 1)
# For edit projector, use show widget
self.dialog_layout.addWidget(self.ip_text_label, 0, 1)
# Port number
self.port_label = QtWidgets.QLabel(edit_projector_dialog)
self.port_label.setObjectName('projector_edit_ip_label')
@ -111,8 +116,8 @@ class Ui_ProjectorEditForm(object):
title = translate('OpenLP.ProjectorEditForm', 'Edit Projector')
edit_projector_dialog.setWindowTitle(title)
self.ip_label.setText(translate('OpenLP.ProjectorEditForm', 'IP Address'))
self.ip_text.setText(self.projector.ip)
self.ip_text.setFocus()
self.ip_text_edit.setText(self.projector.ip)
self.ip_text_label.setText(self.projector.ip)
self.port_label.setText(translate('OpenLP.ProjectorEditForm', 'Port Number'))
self.port_text.setText(str(self.projector.port))
self.pin_label.setText(translate('OpenLP.ProjectorEditForm', 'PIN'))
@ -131,7 +136,7 @@ class ProjectorEditForm(QtWidgets.QDialog, Ui_ProjectorEditForm):
Class to add or edit a projector entry in the database.
Fields that are editable:
ip = Column(String(100))
ip = Column(String(100)) (Only edit for new projector)
port = Column(String(8))
pin = Column(String(20))
name = Column(String(20))
@ -154,9 +159,16 @@ class ProjectorEditForm(QtWidgets.QDialog, Ui_ProjectorEditForm):
if projector is None:
self.projector = Projector()
self.new_projector = True
self.ip_text_edit.setVisible(True)
self.ip_text_edit.setFocus()
self.ip_text_label.setVisible(False)
else:
self.projector = projector
self.new_projector = False
self.ip_text_edit.setVisible(False)
self.ip_text_label.setVisible(True)
# Since it's already defined, IP address is unchangeable, so focus on port number
self.port_text.setFocus()
self.retranslateUi(self)
reply = QtWidgets.QDialog.exec(self)
return reply
@ -187,30 +199,32 @@ class ProjectorEditForm(QtWidgets.QDialog, Ui_ProjectorEditForm):
record=record.id)))
valid = False
return
adx = self.ip_text.text()
valid = verify_ip_address(adx)
if valid:
ip = self.projectordb.get_projector_by_ip(adx)
if ip is None:
valid = True
self.new_projector = True
elif ip.id != self.projector.id:
if self.new_projector:
# Only validate a new entry - otherwise it's been previously verified
adx = self.ip_text_edit.text()
valid = verify_ip_address(adx)
if valid:
# With a valid IP - check if it's already in database so we don't duplicate
ip = self.projectordb.get_projector_by_ip(adx)
if ip is None:
valid = True
self.new_projector = True
elif ip.id != self.projector.id:
QtWidgets.QMessageBox.warning(self,
translate('OpenLP.ProjectorWizard', 'Duplicate IP Address'),
translate('OpenLP.ProjectorWizard',
'IP address "{ip}"<br />is already in the database '
'as ID {data}.<br /><br />Please Enter a different '
'IP address.'.format(ip=adx, data=ip.id)))
return
else:
QtWidgets.QMessageBox.warning(self,
translate('OpenLP.ProjectorWizard', 'Duplicate IP Address'),
translate('OpenLP.ProjectorWizard', 'Invalid IP Address'),
translate('OpenLP.ProjectorWizard',
'IP address "{ip}"<br />is already in the database '
'as ID {data}.<br /><br />Please Enter a different '
'IP address.'.format(ip=adx, data=ip.id)))
'IP address "{ip}"<br>is not a valid IP address.'
'<br /><br />Please enter a valid IP address.'.format(ip=adx)))
valid = False
return
else:
QtWidgets.QMessageBox.warning(self,
translate('OpenLP.ProjectorWizard', 'Invalid IP Address'),
translate('OpenLP.ProjectorWizard',
'IP address "{ip}"<br>is not a valid IP address.'
'<br /><br />Please enter a valid IP address.'.format(ip=adx)))
valid = False
return
port = int(self.port_text.text())
if port < 1000 or port > 32767:
QtWidgets.QMessageBox.warning(self,
@ -223,7 +237,8 @@ class ProjectorEditForm(QtWidgets.QDialog, Ui_ProjectorEditForm):
'Default PJLink port is {port}'.format(port=PJLINK_PORT)))
valid = False
if valid:
self.projector.ip = self.ip_text.text()
if self.new_projector:
self.projector.ip = self.ip_text_edit.text()
self.projector.pin = self.pin_text.text()
self.projector.port = int(self.port_text.text())
self.projector.name = self.name_text.text()

View File

@ -35,24 +35,9 @@ from openlp.core.common.registry import RegistryBase
from openlp.core.common.settings import Settings
from openlp.core.lib.ui import create_widget_action
from openlp.core.projectors import DialogSourceStyle
from openlp.core.projectors.constants import \
E_AUTHENTICATION, \
E_ERROR, \
E_NETWORK, \
E_NOT_CONNECTED, \
E_UNKNOWN_SOCKET_ERROR, \
S_CONNECTED, \
S_CONNECTING, \
S_COOLDOWN, \
S_INITIALIZE, \
S_NOT_CONNECTED, \
S_OFF, \
S_ON, \
S_STANDBY, \
S_WARMUP, \
STATUS_CODE, \
STATUS_MSG, \
QSOCKET_STATE
from openlp.core.projectors.constants import E_AUTHENTICATION, E_ERROR, E_NETWORK, E_NOT_CONNECTED, \
E_UNKNOWN_SOCKET_ERROR, S_CONNECTED, S_CONNECTING, S_COOLDOWN, S_INITIALIZE, S_NOT_CONNECTED, S_OFF, S_ON, \
S_STANDBY, S_WARMUP, STATUS_CODE, STATUS_MSG, QSOCKET_STATE
from openlp.core.projectors.db import ProjectorDB
from openlp.core.projectors.editform import ProjectorEditForm

View File

@ -57,8 +57,7 @@ from openlp.core.common.i18n import translate
from openlp.core.projectors.constants import CONNECTION_ERRORS, PJLINK_CLASS, PJLINK_DEFAULT_CODES, PJLINK_ERRORS, \
PJLINK_ERST_DATA, PJLINK_ERST_STATUS, PJLINK_MAX_PACKET, PJLINK_PREFIX, PJLINK_PORT, PJLINK_POWR_STATUS, \
PJLINK_SUFFIX, PJLINK_VALID_CMD, PROJECTOR_STATE, STATUS_CODE, STATUS_MSG, QSOCKET_STATE, \
E_AUTHENTICATION, E_CONNECTION_REFUSED, E_GENERAL, E_INVALID_DATA, E_NETWORK, E_NOT_CONNECTED, \
E_SOCKET_TIMEOUT, \
E_AUTHENTICATION, E_CONNECTION_REFUSED, E_GENERAL, E_NETWORK, E_NOT_CONNECTED, E_SOCKET_TIMEOUT, \
S_CONNECTED, S_CONNECTING, S_NOT_CONNECTED, S_OFF, S_OK, S_ON
log = logging.getLogger(__name__)
@ -93,22 +92,9 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
self.projector_list = projector_list
self.port = port
# Local defines
self.ackn_list = {} # Replies from online projetors
self.search_active = False
self.search_time = 30000 # 30 seconds for allowed time
self.search_timer = QtCore.QTimer()
# New commands available in PJLink Class 2
# ACKN/SRCH is processed here since it's used to find available projectors
# Other commands are processed by the individual projector instances
self.pjlink_udp_functions = {
'ACKN': self.process_ackn, # Class 2, command is 'SRCH'
'ERST': None, # Class 1/2
'INPT': None, # Class 1/2
'LKUP': None, # Class 2 (reply only - no cmd)
'POWR': None, # Class 1/2
'SRCH': self.process_srch # Class 2 (reply is ACKN)
}
self.readyRead.connect(self.get_datagram)
log.debug('(UDP) PJLinkUDP() Initialized')
@ -118,88 +104,26 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
Retrieve packet and basic checks
"""
log.debug('(UDP) get_datagram() - Receiving data')
read = self.pendingDatagramSize()
if read < 0:
log.warn('(UDP) No data (-1)')
read_size = self.pendingDatagramSize()
if read_size < 0:
log.warning('(UDP) No data (-1)')
return
if read < 1:
log.warn('(UDP) get_datagram() called when pending data size is 0')
if read_size < 1:
log.warning('(UDP) get_datagram() called when pending data size is 0')
return
data, peer_address, peer_port = self.readDatagram(self.pendingDatagramSize())
log.debug('(UDP) {size} bytes received from {adx} on port {port}'.format(size=len(data),
adx=peer_address,
port=peer_port))
log.debug('(UDP) packet "{data}"'.format(data=data))
if len(data) < 0:
log.warn('(UDP) No data (-1)')
return
elif len(data) < 8:
# Minimum packet is '%2CCCC='
log.warn('(UDP) Invalid packet - not enough data')
return
elif data is None:
log.warn('(UDP) No data (None)')
return
elif len(data) > PJLINK_MAX_PACKET:
log.warn('(UDP) Invalid packet - length too long')
return
elif not data.startswith(PJLINK_PREFIX):
log.warn('(UDP) Invalid packet - does not start with PJLINK_PREFIX')
return
elif data[1] != '2':
log.warn('(UDP) Invalid packet - missing/invalid PJLink class version')
return
elif data[6] != '=':
log.warn('(UDP) Invalid packet - separator missing')
return
# First two characters are header information we don't need at this time
cmd, data = data[2:].split('=')
if cmd not in self.pjlink_udp_functions:
log.warn('(UDP) Invalid packet - not a valid PJLink UDP reply')
return
if self.pjlink_udp_functions[cmd] is not None:
log.debug('(UDP) Processing {cmd} with "{data}"'.format(cmd=cmd, data=data))
return self.pjlink_udp_functions[cmd](data=data, host=peer_address, port=peer_port)
else:
log.debug('(UDP) Checking projector list for ip {host} to process'.format(host=peer_address))
for projector in self.projector_list:
if peer_address == projector.ip:
if cmd not in projector.pjlink_functions:
log.error('(UDP) Could not find method to process '
'"{cmd}" in {host}'.format(cmd=cmd, host=projector.ip))
return
log.debug('(UDP) Calling "{cmd}" in {host}'.format(cmd=cmd, host=projector.ip))
return projector.pjlink_functions[cmd](data=data)
log.warn('(UDP) Could not find projector with ip {ip} to process packet'.format(ip=peer_address))
return
def process_ackn(self, data, host, port):
"""
Process the ACKN command.
:param data: Data in packet
:param host: IP address of sending host
:param port: Port received on
"""
log.debug('(UDP) Processing ACKN packet')
if host not in self.ackn_list:
log.debug('(UDP) Adding {host} to ACKN list'.format(host=host))
self.ackn_list[host] = {'data': data,
'port': port}
else:
log.warn('(UDP) Host {host} already replied - ignoring'.format(host=host))
def process_srch(self, data, host, port):
"""
Process the SRCH command.
SRCH is processed by terminals so we ignore any packet.
:param data: Data in packet
:param host: IP address of sending host
:param port: Port received on
"""
log.debug('(UDP) SRCH packet received - ignoring')
# Send to appropriate instance to process packet
log.debug('(UDP) Checking projector list for ip {host} to process'.format(host=peer_address))
for projector in self.projector_list:
if peer_address == projector.ip:
# Dispatch packet to appropriate remote instance
log.debug('(UDP) Dispatching packet to {host}'.format(host=projector.entry.name))
return projector.get_data(buff=data, ip=peer_address, host=peer_address, port=peer_port)
log.warning('(UDP) Could not find projector with ip {ip} to process packet'.format(ip=peer_address))
return
def search_start(self):
@ -224,6 +148,8 @@ class PJLinkCommands(object):
"""
Process replies from PJLink projector.
"""
# List of IP addresses and mac addresses found via UDP search command
ackn_list = []
def __init__(self, *args, **kwargs):
"""
@ -231,24 +157,47 @@ class PJLinkCommands(object):
"""
log.debug('PJlinkCommands(args={args} kwargs={kwargs})'.format(args=args, kwargs=kwargs))
super().__init__()
# Map PJLink command to method
# Map PJLink command to method and include pjlink class version for this instance
# Default initial pjlink class version is '1'
self.pjlink_functions = {
'AVMT': self.process_avmt,
'CLSS': self.process_clss,
'ERST': self.process_erst,
'INFO': self.process_info,
'INF1': self.process_inf1,
'INF2': self.process_inf2,
'INPT': self.process_inpt,
'INST': self.process_inst,
'LAMP': self.process_lamp,
'NAME': self.process_name,
'PJLINK': self.process_pjlink,
'POWR': self.process_powr,
'SNUM': self.process_snum,
'SVER': self.process_sver,
'RFIL': self.process_rfil,
'RLMP': self.process_rlmp
'ACKN': {"method": self.process_ackn, # Class 2 (command is SRCH)
"version": "2"},
'AVMT': {"method": self.process_avmt,
"version": "1"},
'CLSS': {"method": self.process_clss,
"version": "1"},
'ERST': {"method": self.process_erst,
"version": "1"},
'INFO': {"method": self.process_info,
"version": "1"},
'INF1': {"method": self.process_inf1,
"version": "1"},
'INF2': {"method": self.process_inf2,
"version": "1"},
'INPT': {"method": self.process_inpt,
"version": "1"},
'INST': {"method": self.process_inst,
"version": "1"},
'LAMP': {"method": self.process_lamp,
"version": "1"},
'LKUP': {"method": self.process_lkup, # Class 2 (reply only - no cmd)
"version": "2"},
'NAME': {"method": self.process_name,
"version": "1"},
'PJLINK': {"method": self.process_pjlink,
"version": "1"},
'POWR': {"method": self.process_powr,
"version": "1"},
'SNUM': {"method": self.process_snum,
"version": "1"},
'SRCH': {"method": self.process_srch, # Class 2 (reply is ACKN)
"version": "2"},
'SVER': {"method": self.process_sver,
"version": "1"},
'RFIL': {"method": self.process_rfil,
"version": "1"},
'RLMP': {"method": self.process_rlmp,
"version": "1"}
}
def reset_information(self):
@ -287,8 +236,11 @@ class PJLinkCommands(object):
self.send_busy = False
self.send_queue = []
self.priority_queue = []
# Reset default version in command routing dict
for cmd in self.pjlink_functions:
self.pjlink_functions[cmd]["version"] = PJLINK_VALID_CMD[cmd]['default']
def process_command(self, cmd, data):
def process_command(self, cmd, data, *args, **kwargs):
"""
Verifies any return error code. Calls the appropriate command handler.
@ -320,9 +272,25 @@ class PJLinkCommands(object):
return self.change_status(status=E_AUTHENTICATION)
# Command checks already passed
log.debug('({ip}) Calling function for {cmd}'.format(ip=self.entry.name, cmd=cmd))
self.pjlink_functions[cmd](data=data)
self.pjlink_functions[cmd]["method"](data=data, *args, **kwargs)
def process_avmt(self, data):
def process_ackn(self, data, host, port):
"""
Process the ACKN command.
:param data: Data in packet
:param host: IP address of sending host
:param port: Port received on
"""
log.debug('({ip}) Processing ACKN packet'.format(ip=self.entry.name))
if host not in self.ackn_list:
log.debug('({ip}) Adding {host} to ACKN list'.format(ip=self.entry.name, host=host))
self.ackn_list[host] = {'data': data,
'port': port}
else:
log.warning('({ip}) Host {host} already replied - ignoring'.format(ip=self.entry.name, host=host))
def process_avmt(self, data, *args, **kwargs):
"""
Process shutter and speaker status. See PJLink specification for format.
Update self.mute (audio) and self.shutter (video shutter).
@ -351,7 +319,7 @@ class PJLinkCommands(object):
self.projectorUpdateIcons.emit()
return
def process_clss(self, data):
def process_clss(self, data, *args, **kwargs):
"""
PJLink class that this projector supports. See PJLink specification for format.
Updates self.class.
@ -367,12 +335,13 @@ class PJLinkCommands(object):
# Due to stupid projectors not following standards (Optoma, BenQ comes to mind),
# AND the different responses that can be received, the semi-permanent way to
# fix the class reply is to just remove all non-digit characters.
try:
clss = re.findall('\d', data)[0] # Should only be the first match
except IndexError:
chk = re.findall('\d', data)
if len(chk) < 1:
log.error('({ip}) No numbers found in class version reply "{data}" - '
'defaulting to class "1"'.format(ip=self.entry.name, data=data))
clss = '1'
else:
clss = chk[0] # Should only be the first match
elif not data.isdigit():
log.error('({ip}) NAN CLSS version reply "{data}" - '
'defaulting to class "1"'.format(ip=self.entry.name, data=data))
@ -383,6 +352,11 @@ class PJLinkCommands(object):
log.debug('({ip}) Setting pjlink_class for this projector '
'to "{data}"'.format(ip=self.entry.name,
data=self.pjlink_class))
# Update method class versions
for cmd in self.pjlink_functions:
if self.pjlink_class in PJLINK_VALID_CMD[cmd]['version']:
self.pjlink_functions[cmd]['version'] = self.pjlink_class
# Since we call this one on first connect, setup polling from here
if not self.no_poll:
log.debug('({ip}) process_pjlink(): Starting timer'.format(ip=self.entry.name))
@ -391,7 +365,7 @@ class PJLinkCommands(object):
return
def process_erst(self, data):
def process_erst(self, data, *args, **kwargs):
"""
Error status. See PJLink Specifications for format.
Updates self.projector_errors
@ -443,7 +417,7 @@ class PJLinkCommands(object):
PJLINK_ERST_STATUS[other]
return
def process_inf1(self, data):
def process_inf1(self, data, *args, **kwargs):
"""
Manufacturer name set in projector.
Updates self.manufacturer
@ -455,7 +429,7 @@ class PJLinkCommands(object):
data=self.manufacturer))
return
def process_inf2(self, data):
def process_inf2(self, data, *args, **kwargs):
"""
Projector Model set in projector.
Updates self.model.
@ -466,7 +440,7 @@ class PJLinkCommands(object):
log.debug('({ip}) Setting projector model to "{data}"'.format(ip=self.entry.name, data=self.model))
return
def process_info(self, data):
def process_info(self, data, *args, **kwargs):
"""
Any extra info set in projector.
Updates self.other_info.
@ -477,7 +451,7 @@ class PJLinkCommands(object):
log.debug('({ip}) Setting projector other_info to "{data}"'.format(ip=self.entry.name, data=self.other_info))
return
def process_inpt(self, data):
def process_inpt(self, data, *args, **kwargs):
"""
Current source input selected. See PJLink specification for format.
Update self.source
@ -499,7 +473,7 @@ class PJLinkCommands(object):
log.debug('({ip}) Setting data source to "{data}"'.format(ip=self.entry.name, data=self.source))
return
def process_inst(self, data):
def process_inst(self, data, *args, **kwargs):
"""
Available source inputs. See PJLink specification for format.
Updates self.source_available
@ -516,7 +490,7 @@ class PJLinkCommands(object):
data=self.source_available))
return
def process_lamp(self, data):
def process_lamp(self, data, *args, **kwargs):
"""
Lamp(s) status. See PJLink Specifications for format.
Data may have more than 1 lamp to process.
@ -542,7 +516,18 @@ class PJLinkCommands(object):
self.lamp = lamps
return
def process_name(self, data):
def process_lkup(self, data, host, port):
"""
Process reply indicating remote is available for connection
:param data: Data packet from remote
:param host: Remote IP address
:param port: Local port packet received on
"""
# TODO: Check if autoconnect is enabled and connect?
pass
def process_name(self, data, *args, **kwargs):
"""
Projector name set in projector.
Updates self.pjlink_name
@ -553,7 +538,7 @@ class PJLinkCommands(object):
log.debug('({ip}) Setting projector PJLink name to "{data}"'.format(ip=self.entry.name, data=self.pjlink_name))
return
def process_pjlink(self, data):
def process_pjlink(self, data, *args, **kwargs):
"""
Process initial socket connection to terminal.
@ -594,7 +579,7 @@ class PJLinkCommands(object):
# Since this is an initial connection, make it a priority just in case
return self.send_command(cmd="CLSS", salt=data_hash, priority=True)
def process_powr(self, data):
def process_powr(self, data, *args, **kwargs):
"""
Power status. See PJLink specification for format.
Update self.power with status. Update icons if change from previous setting.
@ -617,7 +602,7 @@ class PJLinkCommands(object):
log.warning('({ip}) Unknown power response: "{data}"'.format(ip=self.entry.name, data=data))
return
def process_rfil(self, data):
def process_rfil(self, data, *args, **kwargs):
"""
Process replacement filter type
"""
@ -628,7 +613,7 @@ class PJLinkCommands(object):
log.warning('({ip}) Saved model: "{old}"'.format(ip=self.entry.name, old=self.model_filter))
log.warning('({ip}) New model: "{new}"'.format(ip=self.entry.name, new=data))
def process_rlmp(self, data):
def process_rlmp(self, data, *args, **kwargs):
"""
Process replacement lamp type
"""
@ -639,7 +624,7 @@ class PJLinkCommands(object):
log.warning('({ip}) Saved lamp: "{old}"'.format(ip=self.entry.name, old=self.model_lamp))
log.warning('({ip}) New lamp: "{new}"'.format(ip=self.entry.name, new=data))
def process_snum(self, data):
def process_snum(self, data, *args, **kwargs):
"""
Serial number of projector.
@ -659,7 +644,20 @@ class PJLinkCommands(object):
log.warning('({ip}) NOT saving serial number'.format(ip=self.entry.name))
self.serial_no_received = data
def process_sver(self, data):
def process_srch(self, data, host, port):
"""
Process the SRCH command.
SRCH is processed by terminals so we ignore any packet.
:param data: Data in packet
:param host: IP address of sending host
:param port: Port received on
"""
log.warning('(UDP) SRCH packet received from {host} - ignoring'.format(host=host))
return
def process_sver(self, data, *args, **kwargs):
"""
Software version of projector
"""
@ -716,6 +714,7 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
self.pin = self.entry.pin
self.port = self.entry.port
self.pjlink_class = PJLINK_CLASS if self.entry.pjlink_class is None else self.entry.pjlink_class
self.ackn_list = {} # Replies from online projectors (Class 2 option)
self.db_update = False # Use to check if db needs to be updated prior to exiting
# Poll time 20 seconds unless called with something else
self.poll_time = 20000 if 'poll_time' not in kwargs else kwargs['poll_time'] * 1000
@ -916,7 +915,10 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
"""
Clean out extraneous stuff in the buffer.
"""
log.warning('({ip}) {message}'.format(ip=self.entry.name, message='Invalid packet' if msg is None else msg))
log.debug('({ip}) Cleaning buffer - msg = "{message}"'.format(ip=self.entry.name, message=msg))
if msg is None:
msg = 'Invalid packet'
log.warning('({ip}) {message}'.format(ip=self.entry.name, message=msg))
self.send_busy = False
trash_count = 0
while self.bytesAvailable() > 0:
@ -960,7 +962,7 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
self.socket_timer.stop()
return self.get_data(buff=read, ip=self.ip)
def get_data(self, buff, ip=None):
def get_data(self, buff, ip=None, *args, **kwargs):
"""
Process received data
@ -973,45 +975,61 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
ip = self.ip
log.debug('({ip}) get_data(ip="{ip_in}" buffer="{buff}"'.format(ip=self.entry.name, ip_in=ip, buff=buff))
# NOTE: Class2 has changed to some values being UTF-8
data_in = decode(buff, 'utf-8')
if isinstance(buff, bytes):
data_in = decode(buff, 'utf-8')
else:
data_in = buff
data = data_in.strip()
# Initial packet checks
if (len(data) < 7):
self._trash_buffer(msg='get_data(): Invalid packet - length')
return self.receive_data_signal()
elif len(data) > self.max_size:
self._trash_buffer(msg='get_data(): Invalid packet - too long')
self._trash_buffer(msg='get_data(): Invalid packet - too long ({length} bytes)'.format(length=len(data)))
return self.receive_data_signal()
elif not data.startswith(PJLINK_PREFIX):
self._trash_buffer(msg='get_data(): Invalid packet - PJLink prefix missing')
return self.receive_data_signal()
elif '=' not in data:
elif data[6] != '=':
self._trash_buffer(msg='get_data(): Invalid reply - Does not have "="')
return self.receive_data_signal()
log.debug('({ip}) get_data(): Checking new data "{data}"'.format(ip=self.entry.name, data=data))
header, data = data.split('=')
log.debug('({ip}) get_data() header="{header}" data="{data}"'.format(ip=self.entry.name,
header=header, data=data))
# At this point, the header should contain:
# "PVCCCC"
# Where:
# P = PJLINK_PREFIX
# V = PJLink class or version
# C = PJLink command
version, cmd = header[1], header[2:].upper()
log.debug('({ip}) get_data() version="{version}" cmd="{cmd}"'.format(ip=self.entry.name,
version=version, cmd=cmd))
# TODO: Below commented for now since it seems to cause issues with testing some invalid data.
# Revisit after more refactoring is finished.
'''
try:
version, cmd = header[1], header[2:].upper()
log.debug('({ip}) get_data() version="{version}" cmd="{cmd}"'.format(ip=self.entry.name,
version=version, cmd=cmd))
except ValueError as e:
self.change_status(E_INVALID_DATA)
log.warning('({ip}) get_data(): Received data: "{data}"'.format(ip=self.entry.name, data=data_in))
self._trash_buffer('get_data(): Expected header + command + data')
return self.receive_data_signal()
'''
if cmd not in PJLINK_VALID_CMD:
log.warning('({ip}) get_data(): Invalid packet - unknown command "{data}"'.format(ip=self.entry.name,
self._trash_buffer('get_data(): Invalid packet - unknown command "{data}"'.format(ip=self.entry.name,
data=cmd))
self._trash_buffer(msg='get_data(): Unknown command "{data}"'.format(data=cmd))
return self.receive_data_signal()
if int(self.pjlink_class) < int(version):
elif version not in PJLINK_VALID_CMD[cmd]['version']:
self._trash_buffer(msg='get_data() Command reply version does not match a valid command version')
return self.receive_data_signal()
elif int(self.pjlink_class) < int(version):
log.warning('({ip}) get_data(): Projector returned class reply higher '
'than projector stated class'.format(ip=self.entry.name))
self.process_command(cmd, data)
self.process_command(cmd, data, *args, **kwargs)
return self.receive_data_signal()
@QtCore.pyqtSlot(QtNetwork.QAbstractSocket.SocketError)
@ -1063,16 +1081,7 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
data=opts,
salt='' if salt is None
else ' with hash'))
cmd_ver = PJLINK_VALID_CMD[cmd]['version']
if self.pjlink_class in PJLINK_VALID_CMD[cmd]['version']:
header = PJLINK_HEADER.format(linkclass=self.pjlink_class)
elif len(cmd_ver) == 1 and (int(cmd_ver[0]) < int(self.pjlink_class)):
# Typically a class 1 only command
header = PJLINK_HEADER.format(linkclass=cmd_ver[0])
else:
# NOTE: Once we get to version 3 then think about looping
log.error('({ip}): send_command(): PJLink class check issue? Aborting'.format(ip=self.entry.name))
return
header = PJLINK_HEADER.format(linkclass=self.pjlink_functions[cmd]["version"])
out = '{salt}{header}{command} {options}{suffix}'.format(salt="" if salt is None else salt,
header=header,
command=cmd,

View File

@ -29,12 +29,15 @@ import os
import shutil
from tempfile import mkdtemp
from unittest import TestCase
from unittest.mock import patch
from unittest.mock import MagicMock, patch
from openlp.core.common.registry import Registry
from openlp.core.lib.db import upgrade_db
from openlp.core.projectors import upgrade
from openlp.core.projectors.constants import PJLINK_PORT
from openlp.core.projectors.db import Manufacturer, Model, Projector, ProjectorDB, ProjectorSource, Source
from openlp.core.ui.mainwindow import MainWindow
from tests.helpers.testmixin import TestMixin
from tests.resources.projector.data import TEST_DB_PJLINK1, TEST_DB, TEST1_DATA, TEST2_DATA, TEST3_DATA
from tests.utils.constants import TEST_RESOURCES_PATH
@ -122,7 +125,7 @@ class TestProjectorDBUpdate(TestCase):
assert updated_to_version == latest_version, 'The projector DB should have been upgrade to the latest version'
class TestProjectorDB(TestCase):
class TestProjectorDB(TestCase, TestMixin):
"""
Test case for ProjectorDB
"""
@ -131,6 +134,33 @@ class TestProjectorDB(TestCase):
"""
Set up anything necessary for all tests
"""
# Create a test app to keep from segfaulting
Registry.create()
self.registry = Registry()
self.setup_application()
# Mock cursor busy/normal methods.
self.app.set_busy_cursor = MagicMock()
self.app.set_normal_cursor = MagicMock()
self.app.args = []
Registry().register('application', self.app)
Registry().set_flag('no_web_server', True)
# Mock classes and methods used by mainwindow.
with patch('openlp.core.ui.mainwindow.SettingsForm'), \
patch('openlp.core.ui.mainwindow.ImageManager'), \
patch('openlp.core.ui.mainwindow.LiveController'), \
patch('openlp.core.ui.mainwindow.PreviewController'), \
patch('openlp.core.ui.mainwindow.OpenLPDockWidget'), \
patch('openlp.core.ui.mainwindow.QtWidgets.QToolBox'), \
patch('openlp.core.ui.mainwindow.QtWidgets.QMainWindow.addDockWidget'), \
patch('openlp.core.ui.mainwindow.ServiceManager'), \
patch('openlp.core.ui.mainwindow.ThemeManager'), \
patch('openlp.core.ui.mainwindow.ProjectorManager'), \
patch('openlp.core.ui.mainwindow.Renderer'), \
patch('openlp.core.ui.mainwindow.websockets.WebSocketServer'), \
patch('openlp.core.ui.mainwindow.server.HttpServer'):
self.main_window = MainWindow()
# Create a temporary database directory and database
self.tmp_folder = mkdtemp(prefix='openlp_')
tmpdb_url = 'sqlite:///{db}'.format(db=os.path.join(self.tmp_folder, TEST_DB))
mocked_init_url.return_value = tmpdb_url
@ -139,9 +169,12 @@ class TestProjectorDB(TestCase):
def tearDown(self):
"""
Clean up
Delete all the C++ objects at the end so that we don't have a segfault
"""
self.projector.session.close()
self.projector = None
del self.main_window
# Ignore errors since windows can have problems with locked files
shutil.rmtree(self.tmp_folder, ignore_errors=True)

View File

@ -39,30 +39,31 @@ class TestPJLinkRouting(TestCase):
"""
Tests for the PJLink module command routing
"""
def test_get_data_unknown_command(self):
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_data_unknown_command(self, mock_log):
"""
Test not a valid command
"""
# GIVEN: Test object
with patch.object(openlp.core.projectors.pjlink, 'log') as mock_log, \
patch.object(openlp.core.projectors.pjlink.PJLink, '_trash_buffer') as mock_buffer:
pjlink = PJLink(Projector(**TEST1_DATA), no_poll=True)
pjlink.pjlink_functions = MagicMock()
log_warning_text = [call('({ip}) get_data(): Invalid packet - '
'unknown command "UNKN"'.format(ip=pjlink.name))]
log_debug_text = [call('(___TEST_ONE___) get_data(ip="111.111.111.111" buffer="%1UNKN=Huh?"'),
call('(___TEST_ONE___) get_data(): Checking new data "%1UNKN=Huh?"'),
call('(___TEST_ONE___) get_data() header="%1UNKN" data="Huh?"'),
call('(___TEST_ONE___) get_data() version="1" cmd="UNKN"'),
call('(___TEST_ONE___) Cleaning buffer - msg = "get_data(): '
'Invalid packet - unknown command "UNKN""'),
call('(___TEST_ONE___) Finished cleaning buffer - 0 bytes dropped')]
pjlink = PJLink(Projector(**TEST1_DATA), no_poll=True)
pjlink.pjlink_functions = MagicMock()
log_warning_text = [call('({ip}) get_data(): Invalid packet - '
'unknown command "UNK"'.format(ip=pjlink.name))]
log_debug_text = [call('({ip}) get_data(ip="111.111.111.111" '
'buffer="b\'%1UNK=Huh?\'"'.format(ip=pjlink.name)),
call('({ip}) get_data(): Checking new data "%1UNK=Huh?"'.format(ip=pjlink.name))]
# WHEN: get_data called with an unknown command
pjlink.get_data(buff='{prefix}1UNKN=Huh?'.format(prefix=PJLINK_PREFIX))
# WHEN: get_data called with an unknown command
pjlink.get_data(buff='{prefix}1UNK=Huh?'.format(prefix=PJLINK_PREFIX).encode('utf-8'))
# THEN: Appropriate log entries should have been made and methods called/not called
mock_log.debug.assert_has_calls(log_debug_text)
mock_log.warning.assert_has_calls(log_warning_text)
assert pjlink.pjlink_functions.called is False, 'Should not have accessed pjlink_functions'
assert mock_buffer.called is True, 'Should have called _trash_buffer'
# THEN: Appropriate log entries should have been made and methods called/not called
mock_log.warning.assert_has_calls(log_warning_text)
mock_log.debug.assert_has_calls(log_debug_text)
assert pjlink.pjlink_functions.called is False, 'Should not have accessed pjlink_functions'
def test_process_command_call_clss(self):
"""
@ -219,7 +220,6 @@ class TestPJLinkRouting(TestCase):
"""
Test command returned success
"""
# GIVEN: Initial mocks and data
# GIVEN: Test object and mocks
with patch.object(openlp.core.projectors.pjlink, 'log') as mock_log, \
patch.object(openlp.core.projectors.pjlink.PJLink, 'send_command') as mock_send_command, \

View File

@ -22,14 +22,14 @@
"""
Package to test the openlp.core.projectors.pjlink commands package.
"""
from unittest import TestCase
from unittest import TestCase, skip
from unittest.mock import patch, call
import openlp.core.projectors.pjlink
from openlp.core.projectors.constants import S_CONNECTED, S_OFF, S_ON
from openlp.core.projectors.constants import PJLINK_PORT, S_CONNECTED, S_OFF, S_ON
from openlp.core.projectors.db import Projector
from openlp.core.projectors.pjlink import PJLink
from tests.resources.projector.data import TEST_HASH, TEST_PIN, TEST_SALT, TEST1_DATA
from openlp.core.projectors.pjlink import PJLink, PJLinkUDP
from tests.resources.projector.data import TEST_HASH, TEST_PIN, TEST_SALT, TEST1_DATA, TEST2_DATA
class TestPJLinkCommands(TestCase):
@ -235,3 +235,114 @@ class TestPJLinkCommands(TestCase):
mock_log.error.assert_has_calls(log_check)
assert 1 == mock_disconnect_from_host.call_count, 'Should have only been called once'
mock_send_command.assert_not_called()
@skip('Change to pjlink_udp.get_datagram() call')
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_ackn_duplicate(self, mock_log):
"""
Test process_ackn method with multiple calls with same data
"""
# TODO: Change this to call pjlink_udp.get_datagram() so ACKN can be processed properly
# GIVEN: Test setup
pjlink = PJLink(projector=self.test_list[0])
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT}}
log_warn_calls = [call('(___TEST_ONE___) Host {host} already replied - '
'ignoring'.format(host=TEST1_DATA['ip']))]
log_debug_calls = [call('PJlinkCommands(args=() kwargs={})'),
call('(___TEST_ONE___) reset_information() connect status is S_NOT_CONNECTED'),
call('(___TEST_ONE___) Processing ACKN packet'),
call('(___TEST_ONE___) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip'])),
call('(___TEST_ONE___) Processing ACKN packet')]
# WHEN: process_ackn called twice with same data
pjlink.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
pjlink.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
# THEN: pjlink_udp.ack_list should equal test_list
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
if pjlink.ackn_list != check_list:
# Check this way so we can print differences to stdout
print('\nackn_list: ', pjlink.ackn_list)
print('test_list: ', check_list, '\n')
assert pjlink.ackn_list == check_list
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warning.assert_has_calls(log_warn_calls)
@skip('Change to pjlink_udp.get_datagram() call')
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_ackn_multiple(self, mock_log):
"""
Test process_ackn method with multiple calls
"""
# TODO: Change this to call pjlink_udp.get_datagram() so ACKN can be processed properly
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT},
TEST2_DATA['ip']: {'data': TEST2_DATA['mac_adx'], 'port': PJLINK_PORT}}
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip'])),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST2_DATA['ip']))]
# WHEN: process_ackn called twice with different data
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
pjlink_udp.process_ackn(data=TEST2_DATA['mac_adx'], host=TEST2_DATA['ip'], port=PJLINK_PORT)
# THEN: pjlink_udp.ack_list should equal test_list
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
if pjlink_udp.ackn_list != check_list:
# Check this way so we can print differences to stdout
print('\nackn_list: ', pjlink_udp.ackn_list)
print('test_list: ', check_list)
assert pjlink_udp.ackn_list == check_list
mock_log.debug.assert_has_calls(log_debug_calls)
@skip('Change to pjlink_udp.get_datagram() call')
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_ackn_single(self, mock_log):
"""
Test process_ackn method with single call
"""
# TODO: Change this to call pjlink_udp.get_datagram() so ACKN can be processed properly
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT}}
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip']))]
# WHEN: process_ackn called twice with different data
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
# THEN: pjlink_udp.ack_list should equal test_list
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
if pjlink_udp.ackn_list != check_list:
# Check this way so we can print differences to stdout
print('\nackn_list: ', pjlink_udp.ackn_list)
print('test_list: ', check_list)
assert pjlink_udp.ackn_list == check_list
mock_log.debug.assert_has_calls(log_debug_calls)
@skip('Change to pjlink_udp.get_datagram() call')
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_srch(self, mock_log):
"""
Test process_srch method
"""
# TODO: Change this to call pjlink_udp.get_datagram() so ACKN can be processed properly
# GIVEN: Test setup
log_warn_calls = [call('(UDP) SRCH packet received from {ip} - ignoring'.format(ip=TEST1_DATA['ip'])), ]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'), ]
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
# WHEN: process_srch called
pjlink_udp.process_srch(data=None, host=TEST1_DATA['ip'], port=PJLINK_PORT)
# THEN: log entries should be entered
mock_log.warning.assert_has_calls(log_warn_calls)
mock_log.debug.assert_has_calls(log_debug_calls)

View File

@ -28,10 +28,10 @@ from unittest import TestCase
from unittest.mock import call, patch
import openlp.core.projectors.pjlink
from openlp.core.projectors.constants import PJLINK_MAX_PACKET, PJLINK_PORT, PJLINK_PREFIX
from openlp.core.projectors.constants import PJLINK_PORT
from openlp.core.projectors.db import Projector
from openlp.core.projectors.pjlink import PJLinkUDP
from openlp.core.projectors.pjlink import PJLinkUDP, PJLink
from tests.resources.projector.data import TEST1_DATA, TEST2_DATA
@ -43,7 +43,8 @@ class TestPJLinkBase(TestCase):
"""
Setup generic test conditions
"""
self.test_list = [Projector(**TEST1_DATA), Projector(**TEST2_DATA)]
self.test_list = [PJLink(projector=Projector(**TEST1_DATA)),
PJLink(projector=Projector(**TEST2_DATA))]
def tearDown(self):
"""
@ -51,132 +52,6 @@ class TestPJLinkBase(TestCase):
"""
self.test_list = None
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_invalid_class(self, mock_log):
"""
Test get_datagram with invalid class number
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - missing/invalid PJLink class version')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data'),
call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
call('(UDP) packet "%1ACKN=11:11:11:11:11:11"')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 24
mock_read.return_value = ('{prefix}1ACKN={mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']),
TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_invalid_command(self, mock_log):
"""
Test get_datagram with invalid PJLink UDP command
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - not a valid PJLink UDP reply')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data'),
call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
call('(UDP) packet "%2DUMB=11:11:11:11:11:11"')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 24
mock_read.return_value = ('{prefix}2DUMB={mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']),
TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_invalid_prefix(self, mock_log):
"""
Test get_datagram when prefix != PJLINK_PREFIX
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - does not start with PJLINK_PREFIX')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data'),
call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
call('(UDP) packet "$2ACKN=11:11:11:11:11:11"')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 24
mock_read.return_value = ('{prefix}2ACKN={mac}'.format(prefix='$', mac=TEST1_DATA['mac_adx']),
TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_invalid_separator(self, mock_log):
"""
Test get_datagram when separator not equal to =
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - separator missing')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data'),
call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
call('(UDP) packet "%2ACKN 11:11:11:11:11:11"')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 24
mock_read.return_value = ('{prefix}2ACKN {mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']),
TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_long(self, mock_log):
"""
Test get_datagram when datagram > PJLINK_MAX_PACKET
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - length too long')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data'),
call('(UDP) 143 bytes received from 111.111.111.111 on port 4352'),
call('(UDP) packet "%2ACKN={long}"'.format(long='X' * PJLINK_MAX_PACKET))]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = PJLINK_MAX_PACKET + 7
mock_read.return_value = ('{prefix}2ACKN={long}'.format(prefix=PJLINK_PREFIX,
long='X' * PJLINK_MAX_PACKET),
TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_negative_zero_length(self, mock_log):
"""
@ -196,7 +71,7 @@ class TestPJLinkBase(TestCase):
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.warn.assert_has_calls(log_warn_calls)
mock_log.warning.assert_has_calls(log_warn_calls)
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
@ -206,41 +81,18 @@ class TestPJLinkBase(TestCase):
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - not enough data')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data')]
log_warn_calls = [call('(UDP) get_datagram() called when pending data size is 0')]
log_debug_calls = [call('(UDP) get_datagram() - Receiving data')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 1
mock_datagram.return_value = 0
mock_read.return_value = ('', TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.warn.assert_has_calls(log_warn_calls)
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_short(self, mock_log):
"""
Test get_datagram when data length < 8
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - not enough data')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 6
mock_read.return_value = ('{prefix}2ACKN'.format(prefix=PJLINK_PREFIX), TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.warn.assert_has_calls(log_warn_calls)
mock_log.warning.assert_has_calls(log_warn_calls)
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
@ -260,101 +112,5 @@ class TestPJLinkBase(TestCase):
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.warn.assert_has_calls(log_warn_calls)
mock_log.warning.assert_has_calls(log_warn_calls)
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_ackn_duplicate(self, mock_log):
"""
Test process_ackn method with multiple calls with same data
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT}}
log_warn_calls = [call('(UDP) Host {host} already replied - ignoring'.format(host=TEST1_DATA['ip']))]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip'])),
call('(UDP) Processing ACKN packet')]
# WHEN: process_ackn called twice with same data
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
# THEN: pjlink_udp.ack_list should equal test_list
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
if pjlink_udp.ackn_list != check_list:
# Check this way so we can print differences to stdout
print('\nackn_list: ', pjlink_udp.ackn_list)
print('test_list: ', check_list)
assert pjlink_udp.ackn_list == check_list
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_ackn_multiple(self, mock_log):
"""
Test process_ackn method with multiple calls
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT},
TEST2_DATA['ip']: {'data': TEST2_DATA['mac_adx'], 'port': PJLINK_PORT}}
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip'])),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST2_DATA['ip']))]
# WHEN: process_ackn called twice with different data
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
pjlink_udp.process_ackn(data=TEST2_DATA['mac_adx'], host=TEST2_DATA['ip'], port=PJLINK_PORT)
# THEN: pjlink_udp.ack_list should equal test_list
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
if pjlink_udp.ackn_list != check_list:
# Check this way so we can print differences to stdout
print('\nackn_list: ', pjlink_udp.ackn_list)
print('test_list: ', check_list)
assert pjlink_udp.ackn_list == check_list
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_ackn_single(self, mock_log):
"""
Test process_ackn method with single call
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT}}
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip']))]
# WHEN: process_ackn called twice with different data
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
# THEN: pjlink_udp.ack_list should equal test_list
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
if pjlink_udp.ackn_list != check_list:
# Check this way so we can print differences to stdout
print('\nackn_list: ', pjlink_udp.ackn_list)
print('test_list: ', check_list)
assert pjlink_udp.ackn_list == check_list
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_srch(self, mock_log):
"""
Test process_srch method
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) SRCH packet received - ignoring')]
# WHEN: process_srch called
pjlink_udp.process_srch(data=None, host=None, port=None)
# THEN: debug log entry should be entered
mock_log.debug.assert_has_calls(log_debug_calls)