PJLink2 update R

This commit is contained in:
Ken Roberts 2018-05-03 07:58:50 -07:00
parent 4b5e03a357
commit 991b2880bf
8 changed files with 259 additions and 133 deletions

View File

@ -199,6 +199,7 @@ class Settings(QtCore.QSettings):
'projector/db database': '',
'projector/enable': True,
'projector/connect on start': False,
'projector/connect when LKUP received': True, # PJLink v2: Projector sends LKUP command after it powers up
'projector/last directory import': None,
'projector/last directory export': None,
'projector/poll time': 20, # PJLink timeout is 30 seconds

View File

@ -37,7 +37,7 @@ from openlp.core.lib.ui import create_widget_action
from openlp.core.projectors import DialogSourceStyle
from openlp.core.projectors.constants import E_AUTHENTICATION, E_ERROR, E_NETWORK, E_NOT_CONNECTED, \
E_UNKNOWN_SOCKET_ERROR, S_CONNECTED, S_CONNECTING, S_COOLDOWN, S_INITIALIZE, S_NOT_CONNECTED, S_OFF, S_ON, \
S_STANDBY, S_WARMUP, STATUS_CODE, STATUS_MSG, QSOCKET_STATE
S_STANDBY, S_WARMUP, PJLINK_PORT, STATUS_CODE, STATUS_MSG, QSOCKET_STATE
from openlp.core.projectors.db import ProjectorDB
from openlp.core.projectors.editform import ProjectorEditForm
@ -294,6 +294,9 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
self.projectordb = projectordb
self.projector_list = []
self.source_select_form = None
# Dictionary of PJLinkUDP objects to listen for UDP broadcasts from PJLink 2+ projectors.
# Key is port number that projectors use
self.pjlink_udp = {}
def bootstrap_initialise(self):
"""
@ -307,12 +310,15 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
else:
log.debug('Using existing ProjectorDB() instance')
self.get_settings()
self.pjlink_udp = PJLinkUDP(self.projector_list)
def bootstrap_post_set_up(self):
"""
Post-initialize setups.
"""
# Default PJLink port UDP socket
log.debug('Creating PJLinkUDP listener for default port {port}'.format(port=PJLINK_PORT))
self.pjlink_udp = {PJLINK_PORT: PJLinkUDP(port=PJLINK_PORT)}
self.pjlink_udp[PJLINK_PORT].bind(PJLINK_PORT)
# Set 1.5 second delay before loading all projectors
if self.autostart:
log.debug('Delaying 1.5 seconds before loading all projectors')
@ -513,6 +519,14 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
projector.socket_timer.timeout.disconnect(projector.link.socket_abort)
except (AttributeError, TypeError):
pass
# Disconnect signals from projector being deleted
if self.pjlink_udp[projector.port]:
try:
self.pjlink_udp[projector.port].data_received.disconnect(projector.get_buffer)
except (AttributeError, TypeError):
pass
# Rebuild projector list
new_list = []
for item in self.projector_list:
if item.link.db_item.id == projector.link.db_item.id:
@ -726,6 +740,15 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
item.link.projectorAuthentication.connect(self.authentication_error)
item.link.projectorNoAuthentication.connect(self.no_authentication_error)
item.link.projectorUpdateIcons.connect(self.update_icons)
# Connect UDP signal to projector instances with same port
if item.link.port not in self.pjlink_udp:
log.debug('Adding new PJLinkUDP listener fo port {port}'.format(port=item.link.port))
self.pjlink_udp[item.link.port] = PJLinkUDP(port=item.link.port)
self.pjlink_udp[item.link.port].bind(item.link.port)
log.debug('Connecting PJLinkUDP port {port} signal to "{item}"'.format(port=item.link.port,
item=item.link.name))
self.pjlink_udp[item.link.port].data_received.connect(item.link.get_buffer)
self.projector_list.append(item)
if start:
item.link.connect_to_host()

View File

@ -54,6 +54,7 @@ from PyQt5 import QtCore, QtNetwork
from openlp.core.common import qmd5_hash
from openlp.core.common.i18n import translate
from openlp.core.common.settings import Settings
from openlp.core.projectors.constants import CONNECTION_ERRORS, PJLINK_CLASS, PJLINK_DEFAULT_CODES, PJLINK_ERRORS, \
PJLINK_ERST_DATA, PJLINK_ERST_STATUS, PJLINK_MAX_PACKET, PJLINK_PREFIX, PJLINK_PORT, PJLINK_POWR_STATUS, \
PJLINK_SUFFIX, PJLINK_VALID_CMD, PROJECTOR_STATE, STATUS_CODE, STATUS_MSG, QSOCKET_STATE, \
@ -78,25 +79,27 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
"""
Socket service for PJLink UDP socket.
"""
def __init__(self, projector_list, port=PJLINK_PORT):
data_received = QtCore.pyqtSignal(QtNetwork.QHostAddress, int, str, name='udp_data') # host, port, data
def __init__(self, port=PJLINK_PORT):
"""
Socket services for PJLink UDP packets.
Since all UDP packets from any projector will come into the same
port, process UDP packets here then route to the appropriate
projector instance as needed.
:param port: UDP port to listen on
"""
# Keep track of currently defined projectors so we can route
# inbound packets to the correct instance
super().__init__()
self.projector_list = projector_list
self.port = port
# Local defines
self.search_active = False
self.search_time = 30000 # 30 seconds for allowed time
self.search_timer = QtCore.QTimer()
self.readyRead.connect(self.get_datagram)
log.debug('(UDP) PJLinkUDP() Initialized')
log.debug('(UDP) PJLinkUDP() Initialized for port {port}'.format(port=self.port))
@QtCore.pyqtSlot()
def get_datagram(self):
@ -105,25 +108,23 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
"""
log.debug('(UDP) get_datagram() - Receiving data')
read_size = self.pendingDatagramSize()
if read_size < 0:
if -1 == read_size:
log.warning('(UDP) No data (-1)')
return
if read_size < 1:
elif 0 == read_size:
log.warning('(UDP) get_datagram() called when pending data size is 0')
return
data, peer_address, peer_port = self.readDatagram(self.pendingDatagramSize())
elif read_size > PJLINK_MAX_PACKET:
log.warning('(UDP) UDP Packet too large ({size} bytes)- ignoring'.format(size=read_size))
return
data_in, peer_host, peer_port = self.readDatagram(read_size)
data = data_in.decode('utf-8') if isinstance(data_in, bytes) else data_in
log.debug('(UDP) {size} bytes received from {adx} on port {port}'.format(size=len(data),
adx=peer_address,
port=peer_port))
adx=peer_host.toString(),
port=self.port))
log.debug('(UDP) packet "{data}"'.format(data=data))
# Send to appropriate instance to process packet
log.debug('(UDP) Checking projector list for ip {host} to process'.format(host=peer_address))
for projector in self.projector_list:
if peer_address == projector.ip:
# Dispatch packet to appropriate remote instance
log.debug('(UDP) Dispatching packet to {host}'.format(host=projector.entry.name))
return projector.get_data(buff=data, ip=peer_address, host=peer_address, port=peer_port)
log.warning('(UDP) Could not find projector with ip {ip} to process packet'.format(ip=peer_address))
log.debug('(UDP) Sending data_received signal to projectors')
self.data_received.emit(peer_host, self.localPort(), data)
return
def search_start(self):
@ -131,7 +132,6 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
Start search for projectors on local network
"""
self.search_active = True
self.ackn_list = {}
# TODO: Send SRCH packet here
self.search_timer.singleShot(self.search_time, self.search_stop)
@ -240,7 +240,7 @@ class PJLinkCommands(object):
for cmd in self.pjlink_functions:
self.pjlink_functions[cmd]["version"] = PJLINK_VALID_CMD[cmd]['default']
def process_command(self, cmd, data, *args, **kwargs):
def process_command(self, cmd, data):
"""
Verifies any return error code. Calls the appropriate command handler.
@ -272,25 +272,18 @@ class PJLinkCommands(object):
return self.change_status(status=E_AUTHENTICATION)
# Command checks already passed
log.debug('({ip}) Calling function for {cmd}'.format(ip=self.entry.name, cmd=cmd))
self.pjlink_functions[cmd]["method"](data=data, *args, **kwargs)
self.pjlink_functions[cmd]["method"](data=data)
def process_ackn(self, data, host, port):
def process_ackn(self, data):
"""
Process the ACKN command.
:param data: Data in packet
:param host: IP address of sending host
:param port: Port received on
"""
log.debug('({ip}) Processing ACKN packet'.format(ip=self.entry.name))
if host not in self.ackn_list:
log.debug('({ip}) Adding {host} to ACKN list'.format(ip=self.entry.name, host=host))
self.ackn_list[host] = {'data': data,
'port': port}
else:
log.warning('({ip}) Host {host} already replied - ignoring'.format(ip=self.entry.name, host=host))
# TODO: Have to rethink this one
pass
def process_avmt(self, data, *args, **kwargs):
def process_avmt(self, data):
"""
Process shutter and speaker status. See PJLink specification for format.
Update self.mute (audio) and self.shutter (video shutter).
@ -319,7 +312,7 @@ class PJLinkCommands(object):
self.projectorUpdateIcons.emit()
return
def process_clss(self, data, *args, **kwargs):
def process_clss(self, data):
"""
PJLink class that this projector supports. See PJLink specification for format.
Updates self.class.
@ -365,7 +358,7 @@ class PJLinkCommands(object):
return
def process_erst(self, data, *args, **kwargs):
def process_erst(self, data):
"""
Error status. See PJLink Specifications for format.
Updates self.projector_errors
@ -417,7 +410,7 @@ class PJLinkCommands(object):
PJLINK_ERST_STATUS[other]
return
def process_inf1(self, data, *args, **kwargs):
def process_inf1(self, data):
"""
Manufacturer name set in projector.
Updates self.manufacturer
@ -429,7 +422,7 @@ class PJLinkCommands(object):
data=self.manufacturer))
return
def process_inf2(self, data, *args, **kwargs):
def process_inf2(self, data):
"""
Projector Model set in projector.
Updates self.model.
@ -440,7 +433,7 @@ class PJLinkCommands(object):
log.debug('({ip}) Setting projector model to "{data}"'.format(ip=self.entry.name, data=self.model))
return
def process_info(self, data, *args, **kwargs):
def process_info(self, data):
"""
Any extra info set in projector.
Updates self.other_info.
@ -451,7 +444,7 @@ class PJLinkCommands(object):
log.debug('({ip}) Setting projector other_info to "{data}"'.format(ip=self.entry.name, data=self.other_info))
return
def process_inpt(self, data, *args, **kwargs):
def process_inpt(self, data):
"""
Current source input selected. See PJLink specification for format.
Update self.source
@ -473,7 +466,7 @@ class PJLinkCommands(object):
log.debug('({ip}) Setting data source to "{data}"'.format(ip=self.entry.name, data=self.source))
return
def process_inst(self, data, *args, **kwargs):
def process_inst(self, data):
"""
Available source inputs. See PJLink specification for format.
Updates self.source_available
@ -490,7 +483,7 @@ class PJLinkCommands(object):
data=self.source_available))
return
def process_lamp(self, data, *args, **kwargs):
def process_lamp(self, data):
"""
Lamp(s) status. See PJLink Specifications for format.
Data may have more than 1 lamp to process.
@ -516,18 +509,22 @@ class PJLinkCommands(object):
self.lamp = lamps
return
def process_lkup(self, data, host, port):
def process_lkup(self, data):
"""
Process reply indicating remote is available for connection
:param data: Data packet from remote
:param host: Remote IP address
:param port: Local port packet received on
"""
# TODO: Check if autoconnect is enabled and connect?
pass
log.debug('({ip}) Processing LKUP command'.format(ip=self.entry.name))
settings = Settings()
settings.beginGroup(self.settings_section)
autostart = settings.value('connect when LKUP received')
settings.endGroup()
del settings
if autostart:
self.connect_to_host()
def process_name(self, data, *args, **kwargs):
def process_name(self, data):
"""
Projector name set in projector.
Updates self.pjlink_name
@ -538,7 +535,7 @@ class PJLinkCommands(object):
log.debug('({ip}) Setting projector PJLink name to "{data}"'.format(ip=self.entry.name, data=self.pjlink_name))
return
def process_pjlink(self, data, *args, **kwargs):
def process_pjlink(self, data):
"""
Process initial socket connection to terminal.
@ -579,7 +576,7 @@ class PJLinkCommands(object):
# Since this is an initial connection, make it a priority just in case
return self.send_command(cmd="CLSS", salt=data_hash, priority=True)
def process_powr(self, data, *args, **kwargs):
def process_powr(self, data):
"""
Power status. See PJLink specification for format.
Update self.power with status. Update icons if change from previous setting.
@ -602,7 +599,7 @@ class PJLinkCommands(object):
log.warning('({ip}) Unknown power response: "{data}"'.format(ip=self.entry.name, data=data))
return
def process_rfil(self, data, *args, **kwargs):
def process_rfil(self, data):
"""
Process replacement filter type
"""
@ -613,7 +610,7 @@ class PJLinkCommands(object):
log.warning('({ip}) Saved model: "{old}"'.format(ip=self.entry.name, old=self.model_filter))
log.warning('({ip}) New model: "{new}"'.format(ip=self.entry.name, new=data))
def process_rlmp(self, data, *args, **kwargs):
def process_rlmp(self, data):
"""
Process replacement lamp type
"""
@ -624,7 +621,7 @@ class PJLinkCommands(object):
log.warning('({ip}) Saved lamp: "{old}"'.format(ip=self.entry.name, old=self.model_lamp))
log.warning('({ip}) New lamp: "{new}"'.format(ip=self.entry.name, new=data))
def process_snum(self, data, *args, **kwargs):
def process_snum(self, data):
"""
Serial number of projector.
@ -644,20 +641,18 @@ class PJLinkCommands(object):
log.warning('({ip}) NOT saving serial number'.format(ip=self.entry.name))
self.serial_no_received = data
def process_srch(self, data, host, port):
def process_srch(self, data):
"""
Process the SRCH command.
SRCH is processed by terminals so we ignore any packet.
:param data: Data in packet
:param host: IP address of sending host
:param port: Port received on
"""
log.warning('(UDP) SRCH packet received from {host} - ignoring'.format(host=host))
log.warning("({ip}) SRCH packet detected - ignoring".format(ip=self.entry.ip))
return
def process_sver(self, data, *args, **kwargs):
def process_sver(self, data):
"""
Software version of projector
"""
@ -705,14 +700,16 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
args=args,
kwargs=kwargs))
super().__init__()
self.settings_section = 'projector'
self.entry = projector
self.ip = self.entry.ip
self.qhost = QtNetwork.QHostAddress(self.ip)
self.location = self.entry.location
self.mac_adx = self.entry.mac_adx
self.name = self.entry.name
self.notes = self.entry.notes
self.pin = self.entry.pin
self.port = self.entry.port
self.port = int(self.entry.port)
self.pjlink_class = PJLINK_CLASS if self.entry.pjlink_class is None else self.entry.pjlink_class
self.ackn_list = {} # Replies from online projectors (Class 2 option)
self.db_update = False # Use to check if db needs to be updated prior to exiting
@ -928,19 +925,21 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
count=trash_count))
return
@QtCore.pyqtSlot(str, str)
def get_buffer(self, data, ip):
@QtCore.pyqtSlot(QtNetwork.QHostAddress, int, str, name='udp_data') # host, port, data
def get_buffer(self, host, port, data):
"""
Get data from somewhere other than TCP socket
:param host: QHostAddress of sender
:param port: Destination port
:param data: Data to process. buffer must be formatted as a proper PJLink packet.
:param ip: Destination IP for buffer.
"""
log.debug('({ip}) get_buffer(data="{buff}" ip="{ip_in}"'.format(ip=self.entry.name, buff=data, ip_in=ip))
if ip is None:
log.debug("({ip}) get_buffer() Don't know who data is for - exiting".format(ip=self.entry.name))
return
return self.get_data(buff=data, ip=ip)
if (port == int(self.port)) and (host.isEqual(self.qhost)):
log.debug('({ip}) Received data from {host}'.format(ip=self.entry.name, host=host.toString()))
log.debug('({ip}) get_buffer(data="{buff}")'.format(ip=self.entry.name, buff=data))
return self.get_data(buff=data)
else:
log.debug('({ip}) Ignoring data for {host} - not me'.format(ip=self.entry.name, host=host.toString()))
@QtCore.pyqtSlot()
def get_socket(self):
@ -960,20 +959,16 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
log.debug('({ip}) get_socket(): No data available (-1)'.format(ip=self.entry.name))
return self.receive_data_signal()
self.socket_timer.stop()
return self.get_data(buff=read, ip=self.ip)
return self.get_data(buff=read)
def get_data(self, buff, ip=None, *args, **kwargs):
def get_data(self, buff, *args, **kwargs):
"""
Process received data
:param buff: Data to process.
:param ip: (optional) Destination IP.
"""
# Since "self" is not available to options and the "ip" keyword is a "maybe I'll use in the future",
# set to default here
if ip is None:
ip = self.ip
log.debug('({ip}) get_data(ip="{ip_in}" buffer="{buff}"'.format(ip=self.entry.name, ip_in=ip, buff=buff))
log.debug('({ip}) get_data(buffer="{buff}"'.format(ip=self.entry.name, buff=buff))
ignore_class = 'ignore_class' in kwargs
# NOTE: Class2 has changed to some values being UTF-8
if isinstance(buff, bytes):
data_in = decode(buff, 'utf-8')
@ -990,7 +985,9 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
elif not data.startswith(PJLINK_PREFIX):
self._trash_buffer(msg='get_data(): Invalid packet - PJLink prefix missing')
return self.receive_data_signal()
elif data[6] != '=':
elif data[6] != '=' and data[8] != '=':
# data[6] = standard command packet
# data[8] = initial PJLink connection (after mangling)
self._trash_buffer(msg='get_data(): Invalid reply - Does not have "="')
return self.receive_data_signal()
log.debug('({ip}) get_data(): Checking new data "{data}"'.format(ip=self.entry.name, data=data))
@ -1020,16 +1017,16 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
return self.receive_data_signal()
'''
if cmd not in PJLINK_VALID_CMD:
self._trash_buffer('get_data(): Invalid packet - unknown command "{data}"'.format(ip=self.entry.name,
data=cmd))
self._trash_buffer('get_data(): Invalid packet - unknown command "{data}"'.format(data=cmd))
return self.receive_data_signal()
elif version not in PJLINK_VALID_CMD[cmd]['version']:
self._trash_buffer(msg='get_data() Command reply version does not match a valid command version')
return self.receive_data_signal()
elif int(self.pjlink_class) < int(version):
if not ignore_class:
log.warning('({ip}) get_data(): Projector returned class reply higher '
'than projector stated class'.format(ip=self.entry.name))
self.process_command(cmd, data, *args, **kwargs)
self.process_command(cmd, data)
return self.receive_data_signal()
@QtCore.pyqtSlot(QtNetwork.QAbstractSocket.SocketError)
@ -1107,11 +1104,18 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
"""
Socket interface to send data. If data=None, then check queue.
:param data: Immediate data to send
:param data: Immediate data to send (Optional)
:param utf8: Send as UTF-8 string otherwise send as ASCII string
"""
# Funny looking data check, but it's a quick check for data=None
log.debug('({ip}) _send_command(data="{data}")'.format(ip=self.entry.name, data=data.strip() if data else data))
if not data and not self.priority_queue and not self.send_queue:
log.debug('({ip}) _send_command(): Nothing to send - returning'.format(ip=self.entry.name))
return
log.debug('({ip}) _send_command(data="{data}")'.format(ip=self.entry.name,
data=data.strip() if data else data))
log.debug('({ip}) _send_command(): priority_queue: {queue}'.format(ip=self.entry.name,
queue=self.priority_queue))
log.debug('({ip}) _send_command(): send_queue: {queue}'.format(ip=self.entry.name,
queue=self.send_queue))
conn_state = STATUS_CODE[QSOCKET_STATE[self.state()]]
log.debug('({ip}) _send_command(): Connection status: {data}'.format(ip=self.entry.name,
data=conn_state))
@ -1149,9 +1153,9 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
self.waitForBytesWritten(2000) # 2 seconds should be enough
if sent == -1:
# Network error?
log.warning('({ip}) _send_command(): -1 received - disconnecting from host'.format(ip=self.entry.name))
self.change_status(E_NETWORK,
translate('OpenLP.PJLink', 'Error while sending data to projector'))
log.warning('({ip}) _send_command(): -1 received - disconnecting from host'.format(ip=self.entry.name))
self.disconnect_from_host()
def connect_to_host(self):
@ -1164,7 +1168,7 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
return
self.error_status = S_OK
self.change_status(S_CONNECTING)
self.connectToHost(self.ip, self.port if isinstance(self.port, int) else int(self.port))
self.connectToHost(self.ip, self.port)
@QtCore.pyqtSlot()
def disconnect_from_host(self, abort=False):
@ -1177,13 +1181,15 @@ class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
self.abort()
else:
log.warning('({ip}) disconnect_from_host(): Not connected'.format(ip=self.entry.name))
self.disconnectFromHost()
try:
self.readyRead.disconnect(self.get_socket)
except TypeError:
pass
log.debug('({ip}) disconnect_from_host() '
# Since we already know what's happening, just log it for reference.
log.debug('({ip}) disconnect_from_host(): Issue detected with '
'readyRead.disconnect'.format(ip=self.entry.name))
log.debug('({ip}) disconnect_from_host(): '
'Current status {data}'.format(ip=self.entry.name, data=self._get_status(self.status_connect)[0]))
self.disconnectFromHost()
if abort:
self.change_status(E_NOT_CONNECTED)
else:

View File

@ -89,6 +89,10 @@ class ProjectorTab(SettingsTab):
self.connect_box_layout.addRow(self.dialog_type_label, self.dialog_type_combo_box)
self.left_layout.addStretch()
self.dialog_type_combo_box.activated.connect(self.on_dialog_type_combo_box_changed)
# Connect on LKUP packet received (PJLink v2+ only)
self.connect_on_linkup = QtWidgets.QCheckBox(self.connect_box)
self.connect_on_linkup.setObjectName('connect_on_linkup')
self.connect_box_layout.addRow(self.connect_on_linkup)
def retranslateUi(self):
"""
@ -109,6 +113,8 @@ class ProjectorTab(SettingsTab):
translate('OpenLP.ProjectorTab', 'Tabbed dialog box'))
self.dialog_type_combo_box.setItemText(DialogSourceStyle.Single,
translate('OpenLP.ProjectorTab', 'Single dialog box'))
self.connect_on_linkup.setText(
translate('OpenLP.ProjectorTab', 'Connect to projector when LINKUP received (v2 only)'))
def load(self):
"""
@ -120,6 +126,7 @@ class ProjectorTab(SettingsTab):
self.socket_timeout_spin_box.setValue(settings.value('socket timeout'))
self.socket_poll_spin_box.setValue(settings.value('poll time'))
self.dialog_type_combo_box.setCurrentIndex(settings.value('source dialog type'))
self.connect_on_linkup.setChecked(settings.value('connect when LKUP received'))
settings.endGroup()
def save(self):
@ -132,6 +139,7 @@ class ProjectorTab(SettingsTab):
settings.setValue('socket timeout', self.socket_timeout_spin_box.value())
settings.setValue('poll time', self.socket_poll_spin_box.value())
settings.setValue('source dialog type', self.dialog_type_combo_box.currentIndex())
settings.setValue('connect when LKUP received', self.connect_on_linkup.isChecked())
settings.endGroup()
def on_dialog_type_combo_box_changed(self):

View File

@ -27,23 +27,13 @@ from unittest.mock import call, patch, MagicMock
import openlp.core.projectors.pjlink
from openlp.core.projectors.constants import \
E_NOT_CONNECTED, \
E_PARAMETER, \
E_UNKNOWN_SOCKET_ERROR, \
STATUS_CODE, \
STATUS_MSG, \
S_CONNECTED, \
S_CONNECTING, \
S_NOT_CONNECTED, \
S_OK, \
S_ON, \
E_NOT_CONNECTED, E_PARAMETER, E_UNKNOWN_SOCKET_ERROR, STATUS_CODE, STATUS_MSG, \
S_CONNECTED, S_CONNECTING, S_NOT_CONNECTED, S_OK, S_ON, \
QSOCKET_STATE
from openlp.core.projectors.db import Projector
from openlp.core.projectors.pjlink import PJLink
from tests.resources.projector.data import TEST1_DATA
pjlink_test = PJLink(Projector(**TEST1_DATA), no_poll=True)
class TestPJLinkBase(TestCase):
"""

View File

@ -0,0 +1,101 @@
# -*- coding: utf-8 -*-
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
###############################################################################
# OpenLP - Open Source Lyrics Projection #
# --------------------------------------------------------------------------- #
# Copyright (c) 2008-2015 OpenLP Developers #
# --------------------------------------------------------------------------- #
# This program is free software; you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by the Free #
# Software Foundation; version 2 of the License. #
# #
# This program is distributed in the hope that it will be useful, but WITHOUT #
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
# more details. #
# #
# You should have received a copy of the GNU General Public License along #
# with this program; if not, write to the Free Software Foundation, Inc., 59 #
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
###############################################################################
"""
Package to test the openlp.core.projectors.pjlink base package.
"""
from unittest import TestCase
from unittest.mock import call, patch
import openlp.core.projectors.pjlink
from openlp.core.projectors.constants import S_NOT_CONNECTED
from openlp.core.projectors.db import Projector
from openlp.core.projectors.pjlink import PJLink
from tests.resources.projector.data import TEST1_DATA
class TestPJLinkBase(TestCase):
"""
Tests for the PJLink module
"""
@patch.object(openlp.core.projectors.pjlink.PJLink, 'state')
@patch.object(openlp.core.projectors.pjlink.PJLink, 'reset_information')
@patch.object(openlp.core.projectors.pjlink.PJLink, '_send_command')
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_send_command_no_data(self, mock_log, mock_send_command, mock_reset, mock_state):
"""
Test _send_command with no data to send
"""
# GIVEN: Test object
log_warning_calls = [call('({ip}) send_command(): Not connected - returning'.format(ip=TEST1_DATA['name']))]
log_debug_calls = [call('PJlink(projector="< Projector(id="None", ip="111.111.111.111", port="1111", '
'mac_adx="11:11:11:11:11:11", pin="1111", name="___TEST_ONE___", '
'location="location one", notes="notes one", pjlink_name="None", '
'pjlink_class="None", manufacturer="None", model="None", '
'serial_no="Serial Number 1", other="None", sources="None", source_list="[]", '
'model_filter="Filter type 1", model_lamp="Lamp type 1", '
'sw_version="Version 1") >", args="()" kwargs="{\'no_poll\': True}")'),
call('PJlinkCommands(args=() kwargs={})')]
mock_state.return_value = S_NOT_CONNECTED
pjlink = PJLink(Projector(**TEST1_DATA), no_poll=True)
pjlink.send_queue = []
pjlink.priority_queue = []
# WHEN: _send_command called with no data and queue's empty
pjlink.send_command(cmd='DONTCARE')
# THEN:
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warning.assert_has_calls(log_warning_calls)
assert mock_reset.called is True
assert mock_reset.called is True
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_local_send_command_no_data(self, mock_log):
"""
Test _send_command with no data to send
"""
# GIVEN: Test object
log_debug_calls = [call('PJlink(projector="< Projector(id="None", ip="111.111.111.111", port="1111", '
'mac_adx="11:11:11:11:11:11", pin="1111", name="___TEST_ONE___", '
'location="location one", notes="notes one", pjlink_name="None", '
'pjlink_class="None", manufacturer="None", model="None", '
'serial_no="Serial Number 1", other="None", sources="None", source_list="[]", '
'model_filter="Filter type 1", model_lamp="Lamp type 1", '
'sw_version="Version 1") >", args="()" kwargs="{\'no_poll\': True}")'),
call('PJlinkCommands(args=() kwargs={})'),
call('(___TEST_ONE___) reset_information() connect status is S_NOT_CONNECTED'),
call('(___TEST_ONE___) _send_command(): Nothing to send - returning')]
pjlink = PJLink(Projector(**TEST1_DATA), no_poll=True)
pjlink.send_queue = []
pjlink.priority_queue = []
# WHEN: _send_command called with no data and queue's emtpy
# Patch here since pjlink does not have socket_timer until after instantiation
with patch.object(pjlink, 'socket_timer') as mock_timer:
pjlink._send_command(data=None, utf8=False)
# THEN:
mock_log.debug.assert_has_calls(log_debug_calls)
assert mock_timer.called is False

View File

@ -49,13 +49,23 @@ class TestPJLinkRouting(TestCase):
pjlink.pjlink_functions = MagicMock()
log_warning_text = [call('({ip}) get_data(): Invalid packet - '
'unknown command "UNKN"'.format(ip=pjlink.name))]
log_debug_text = [call('(___TEST_ONE___) get_data(ip="111.111.111.111" buffer="%1UNKN=Huh?"'),
log_debug_text = [call('PJlink(projector="< Projector(id="None", ip="111.111.111.111", port="1111", '
'mac_adx="11:11:11:11:11:11", pin="1111", name="___TEST_ONE___", '
'location="location one", notes="notes one", pjlink_name="None", '
'pjlink_class="None", manufacturer="None", model="None", serial_no="Serial Number 1", '
'other="None", sources="None", source_list="[]", model_filter="Filter type 1", '
'model_lamp="Lamp type 1", sw_version="Version 1") >", '
'args="()" kwargs="{\'no_poll\': True}")'),
call('PJlinkCommands(args=() kwargs={})'),
call('(___TEST_ONE___) reset_information() connect status is S_NOT_CONNECTED'),
call('(___TEST_ONE___) get_data(buffer="%1UNKN=Huh?"'),
call('(___TEST_ONE___) get_data(): Checking new data "%1UNKN=Huh?"'),
call('(___TEST_ONE___) get_data() header="%1UNKN" data="Huh?"'),
call('(___TEST_ONE___) get_data() version="1" cmd="UNKN"'),
call('(___TEST_ONE___) Cleaning buffer - msg = "get_data(): '
'Invalid packet - unknown command "UNKN""'),
call('(___TEST_ONE___) Finished cleaning buffer - 0 bytes dropped')]
call('(___TEST_ONE___) Cleaning buffer - msg = "get_data(): Invalid packet - '
'unknown command "UNKN""'),
call('(___TEST_ONE___) Finished cleaning buffer - 0 bytes dropped'),
call('(___TEST_ONE___) _send_command(): Nothing to send - returning')]
# WHEN: get_data called with an unknown command
pjlink.get_data(buff='{prefix}1UNKN=Huh?'.format(prefix=PJLINK_PREFIX))

View File

@ -30,37 +30,23 @@ from unittest.mock import call, patch
import openlp.core.projectors.pjlink
from openlp.core.projectors.constants import PJLINK_PORT
from openlp.core.projectors.db import Projector
from openlp.core.projectors.pjlink import PJLinkUDP, PJLink
from tests.resources.projector.data import TEST1_DATA, TEST2_DATA
from openlp.core.projectors.pjlink import PJLinkUDP
from tests.resources.projector.data import TEST1_DATA
class TestPJLinkBase(TestCase):
"""
Tests for the PJLinkUDP class
"""
def setUp(self):
"""
Setup generic test conditions
"""
self.test_list = [PJLink(projector=Projector(**TEST1_DATA)),
PJLink(projector=Projector(**TEST2_DATA))]
def tearDown(self):
"""
Close generic test condidtions
"""
self.test_list = None
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_negative_zero_length(self, mock_log):
"""
Test get_datagram when pendingDatagramSize = 0
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) No data (-1)')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
pjlink_udp = PJLinkUDP()
log_warning_calls = [call('(UDP) No data (-1)')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized for port 4352'),
call('(UDP) get_datagram() - Receiving data')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
@ -71,7 +57,7 @@ class TestPJLinkBase(TestCase):
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.warning.assert_has_calls(log_warn_calls)
mock_log.warning.assert_has_calls(log_warning_calls)
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
@ -80,9 +66,10 @@ class TestPJLinkBase(TestCase):
Test get_datagram when data length = 0
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) get_datagram() called when pending data size is 0')]
log_debug_calls = [call('(UDP) get_datagram() - Receiving data')]
pjlink_udp = PJLinkUDP()
log_warning_calls = [call('(UDP) get_datagram() called when pending data size is 0')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized for port 4352'),
call('(UDP) get_datagram() - Receiving data')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 0
@ -92,7 +79,7 @@ class TestPJLinkBase(TestCase):
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.warning.assert_has_calls(log_warn_calls)
mock_log.warning.assert_has_calls(log_warning_calls)
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
@ -101,9 +88,9 @@ class TestPJLinkBase(TestCase):
Test get_datagram when pendingDatagramSize = 0
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) get_datagram() called when pending data size is 0')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
pjlink_udp = PJLinkUDP()
log_warning_calls = [call('(UDP) get_datagram() called when pending data size is 0')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized for port 4352'),
call('(UDP) get_datagram() - Receiving data')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram:
mock_datagram.return_value = 0
@ -112,5 +99,5 @@ class TestPJLinkBase(TestCase):
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.warning.assert_has_calls(log_warn_calls)
mock_log.warning.assert_has_calls(log_warning_calls)
mock_log.debug.assert_has_calls(log_debug_calls)