Merge branch 'pjlink-tests' into 'master'

Pjlink tests 2022-02-23

See merge request openlp/openlp!426
This commit is contained in:
Tim Bentley 2022-02-24 19:02:28 +00:00
commit 8d6657c47e
5 changed files with 400 additions and 187 deletions

View File

@ -101,7 +101,7 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
self.search_timer = QtCore.QTimer() self.search_timer = QtCore.QTimer()
self.udp_broadcast_listen_setting = False self.udp_broadcast_listen_setting = False
self.settings = Registry().get('settings') self.settings = Registry().get('settings')
log.debug('(UDP:{port}) PJLinkUDP() Initialized'.format(port=self.port)) log.debug(f'(UDP:{self.port}) PJLinkUDP() Initialized')
if self.settings.value('projector/udp broadcast listen'): if self.settings.value('projector/udp broadcast listen'):
self.udp_start() self.udp_start()
@ -109,7 +109,7 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
""" """
Start listening on UDP port Start listening on UDP port
""" """
log.debug('(UDP:{port}) Start called'.format(port=self.port)) log.debug(f'(UDP:{self.port}) Start called')
self.readyRead.connect(self.get_datagram) self.readyRead.connect(self.get_datagram)
self.check_settings(checked=self.settings.value('projector/udp broadcast listen')) self.check_settings(checked=self.settings.value('projector/udp broadcast listen'))
@ -117,7 +117,7 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
""" """
Stop listening on UDP port Stop listening on UDP port
""" """
log.debug('(UDP:{port}) Stopping listener'.format(port=self.port)) log.debug(f'(UDP:{self.port}) Stopping listener')
self.close() self.close()
self.readyRead.disconnect(self.get_datagram) self.readyRead.disconnect(self.get_datagram)
@ -126,25 +126,22 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
""" """
Retrieve packet and basic checks Retrieve packet and basic checks
""" """
log.debug('(UDP:{port}) get_datagram() - Receiving data'.format(port=self.port)) log.debug(f'(UDP:{self.port}) get_datagram() - Receiving data')
read_size = self.pendingDatagramSize() read_size = self.pendingDatagramSize()
if -1 == read_size: if -1 == read_size:
log.warning('(UDP:{port}) No data (-1)'.format(port=self.port)) log.warning(f'(UDP:{self.port}) No data (-1)')
return return
elif 0 == read_size: elif 0 == read_size:
log.warning('(UDP:{port}) get_datagram() called when pending data size is 0'.format(port=self.port)) log.warning(f'(UDP:{self.port}) get_datagram() called when pending data size is 0')
return return
elif read_size > PJLINK_MAX_PACKET: elif read_size > PJLINK_MAX_PACKET:
log.warning('(UDP:{port}) UDP Packet too large ({size} bytes)- ignoring'.format(size=read_size, log.warning(f'(UDP:{self.port}) UDP Packet too large ({read_size} bytes)- ignoring')
port=self.port))
return return
data_in, peer_host, peer_port = self.readDatagram(read_size) data_in, peer_host, peer_port = self.readDatagram(read_size)
data = data_in.decode('utf-8') if isinstance(data_in, bytes) else data_in data = data_in.decode('utf-8') if isinstance(data_in, bytes) else data_in
log.debug('(UDP:{port}) {size} bytes received from {adx}'.format(size=len(data), log.debug(f'(UDP:{self.port}) {len(data)} bytes received from {peer_host.toString()}')
adx=peer_host.toString(), log.debug(f'(UDP:{self.port}) packet "{data}"')
port=self.port)) log.debug(f'(UDP:{self.port}) Sending data_received signal to projectors')
log.debug('(UDP:{port}) packet "{data}"'.format(data=data, port=self.port))
log.debug('(UDP:{port}) Sending data_received signal to projectors'.format(port=self.port))
self.data_received.emit(peer_host, self.localPort(), data) self.data_received.emit(peer_host, self.localPort(), data)
return return
@ -170,15 +167,15 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
NOTE: This method is called by projector settings tab and setup/removed by ProjectorManager NOTE: This method is called by projector settings tab and setup/removed by ProjectorManager
""" """
if self.udp_broadcast_listen_setting == checked: if self.udp_broadcast_listen_setting == checked:
log.debug('(UDP:{port}) No change to status - skipping'.format(port=self.port)) log.debug(f'(UDP:{self.port}) No change to status - skipping')
return return
self.udp_broadcast_listen_setting = checked self.udp_broadcast_listen_setting = checked
if self.udp_broadcast_listen_setting: if self.udp_broadcast_listen_setting:
if self.state() == self.ListeningState: if self.state() == self.ListeningState:
log.debug('(UDP:{port}) Already listening - skipping') log.debug(f'(UDP:{self.port}) Already listening - skipping')
return return
self.bind(self.port) self.bind(self.port)
log.debug('(UDP:{port}) Listening'.format(port=self.port)) log.debug(f'(UDP:{self.port}) Listening')
else: else:
# Close socket # Close socket
self.udp_stop() self.udp_stop()
@ -207,9 +204,7 @@ class PJLink(QtNetwork.QTcpSocket):
:param poll_time: Time (in seconds) to poll connected projector :param poll_time: Time (in seconds) to poll connected projector
:param socket_timeout: Time (in seconds) to abort the connection if no response :param socket_timeout: Time (in seconds) to abort the connection if no response
""" """
log.debug('PJLink(projector="{projector}", args="{args}" kwargs="{kwargs}")'.format(projector=projector, log.debug(f'PJLink(projector="{projector}", args="{args}" kwargs="{kwargs}")')
args=args,
kwargs=kwargs))
super().__init__() super().__init__()
self.settings_section = 'projector' self.settings_section = 'projector'
self.entry = projector self.entry = projector
@ -266,8 +261,7 @@ class PJLink(QtNetwork.QTcpSocket):
Initialize instance variables. Also used to reset projector-specific information to default. Initialize instance variables. Also used to reset projector-specific information to default.
""" """
conn_state = STATUS_CODE[QSOCKET_STATE[self.state()]] conn_state = STATUS_CODE[QSOCKET_STATE[self.state()]]
log.debug('({ip}) reset_information() connect status is {state}'.format(ip=self.entry.name, log.debug(f'({self.entry.name}) reset_information() connect status is {conn_state}')
state=conn_state))
self.fan = None # ERST self.fan = None # ERST
self.filter_time = None # FILT self.filter_time = None # FILT
self.lamp = None # LAMP self.lamp = None # LAMP
@ -291,13 +285,13 @@ class PJLink(QtNetwork.QTcpSocket):
self.source = None # INPT self.source = None # INPT
# These should be part of PJLink() class, but set here for convenience # These should be part of PJLink() class, but set here for convenience
if hasattr(self, 'poll_timer'): if hasattr(self, 'poll_timer'):
log.debug('({ip}): Calling poll_timer.stop()'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}): Calling poll_timer.stop()')
self.poll_timer.stop() self.poll_timer.stop()
if hasattr(self, 'socket_timer'): if hasattr(self, 'socket_timer'):
log.debug('({ip}): Calling socket_timer.stop()'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}): Calling socket_timer.stop()')
self.socket_timer.stop() self.socket_timer.stop()
if hasattr(self, 'status_timer'): if hasattr(self, 'status_timer'):
log.debug('({ip}): Calling status_timer.stop()'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}): Calling status_timer.stop()')
self.status_timer.stop() self.status_timer.stop()
self.status_timer_checks = {} self.status_timer_checks = {}
self.send_busy = False self.send_busy = False
@ -309,7 +303,7 @@ class PJLink(QtNetwork.QTcpSocket):
Aborts connection and closes socket in case of brain-dead projectors. Aborts connection and closes socket in case of brain-dead projectors.
Should normally be called by socket_timer(). Should normally be called by socket_timer().
""" """
log.debug('({ip}) socket_abort() - Killing connection'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) socket_abort() - Killing connection')
self.disconnect_from_host(abort=True) self.disconnect_from_host(abort=True)
def poll_loop(self): def poll_loop(self):
@ -318,11 +312,11 @@ class PJLink(QtNetwork.QTcpSocket):
Normally called by timer(). Normally called by timer().
""" """
if QSOCKET_STATE[self.state()] != S_CONNECTED: if QSOCKET_STATE[self.state()] != S_CONNECTED:
log.warning('({ip}) poll_loop(): Not connected - returning'.format(ip=self.entry.name)) log.warning(f'({self.entry.name}) poll_loop(): Not connected - returning')
# Stop timer just in case it's missed elsewhere # Stop timer just in case it's missed elsewhere
self.poll_timer.stop() self.poll_timer.stop()
return return
log.debug('({ip}) poll_loop(): Updating projector status'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) poll_loop(): Updating projector status')
# The following commands do not change, so only check them once # The following commands do not change, so only check them once
# Call them first in case other functions rely on something here # Call them first in case other functions rely on something here
if self.power == S_ON and self.source_available is None: if self.power == S_ON and self.source_available is None:
@ -379,13 +373,10 @@ class PJLink(QtNetwork.QTcpSocket):
:param msg: Optional message :param msg: Optional message
""" """
if status in STATUS_CODE: if status in STATUS_CODE:
log.debug('({ip}) Changing status to {status} ' log.debug(f'({self.entry.name}) Changing status to {STATUS_CODE[status]} '
'"{msg}"'.format(ip=self.entry.name, f'"{STATUS_MSG[status] if msg is None else msg}"')
status=STATUS_CODE[status],
msg=msg if msg is not None else STATUS_MSG[status]))
else: else:
log.warning('({ip}) Unknown status change code: {code}'.format(ip=self.entry.name, log.warning(f'({self.entry.name}) Unknown status change code: {status}')
code=status))
return return
if status in CONNECTION_ERRORS: if status in CONNECTION_ERRORS:
# Connection state error affects both socket and projector # Connection state error affects both socket and projector
@ -403,26 +394,20 @@ class PJLink(QtNetwork.QTcpSocket):
# These log entries are for troubleshooting only # These log entries are for troubleshooting only
(status_code, status_message) = self._get_status(self.status_connect) (status_code, status_message) = self._get_status(self.status_connect)
log.debug('({ip}) status_connect: {code}: "{message}"'.format(ip=self.entry.name, log.debug(f'({self.entry.name}) status_connect: {status_code}: "{status_message if msg is None else msg}"')
code=status_code,
message=status_message if msg is None else msg))
(status_code, status_message) = self._get_status(self.projector_status) (status_code, status_message) = self._get_status(self.projector_status)
log.debug('({ip}) projector_status: {code}: "{message}"'.format(ip=self.entry.name, log.debug(f'({self.entry.name}) projector_status: {status_code}: "{status_message if msg is None else msg}"')
code=status_code,
message=status_message if msg is None else msg))
(status_code, status_message) = self._get_status(self.error_status) (status_code, status_message) = self._get_status(self.error_status)
log.debug('({ip}) error_status: {code}: "{message}"'.format(ip=self.entry.name, log.debug(f'({self.entry.name}) error_status: {status_code}: "{status_message if msg is None else msg}"')
code=status_code,
message=status_message if msg is None else msg))
# Now that we logged extra information for debugging, broadcast the original change/message # Now that we logged extra information for debugging, broadcast the original change/message
# Check for connection errors first # Check for connection errors first
if self.error_status != S_OK: if self.error_status != S_OK:
log.debug('({ip}) Signalling error code'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Signalling error code')
code, message = self._get_status(self.error_status) code, message = self._get_status(self.error_status)
status = self.error_status status = self.error_status
else: else:
log.debug('({ip}) Signalling status code'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Signalling status code')
code, message = self._get_status(status) code, message = self._get_status(status)
if msg is not None: if msg is not None:
message = msg message = msg
@ -440,27 +425,27 @@ class PJLink(QtNetwork.QTcpSocket):
:param data: Optional data if called from another routine :param data: Optional data if called from another routine
""" """
log.debug('({ip}) check_login(data="{data}")'.format(ip=self.entry.name, data=data)) log.debug(f'({self.entry.name}) check_login(data="{data}")')
if data is None: if data is None:
# Reconnected setup? # Reconnected setup?
if not self.waitForReadyRead(2000): if not self.waitForReadyRead(2000):
# Possible timeout issue # Possible timeout issue
log.error('({ip}) Socket timeout waiting for login'.format(ip=self.entry.name)) log.error(f'({self.entry.name}) Socket timeout waiting for login')
self.change_status(E_SOCKET_TIMEOUT) self.change_status(E_SOCKET_TIMEOUT)
return return
read = self.readLine(self.max_size) read = self.readLine(self.max_size)
self.readLine(self.max_size) # Clean out any trailing whitespace self.readLine(self.max_size) # Clean out any trailing whitespace
if read is None: if read is None:
log.warning('({ip}) read is None - socket error?'.format(ip=self.entry.name)) log.warning(f'({self.entry.name}) read is None - socket error?')
return return
elif len(read) < 8: elif len(read) < 8:
log.warning('({ip}) Not enough data read - skipping'.format(ip=self.entry.name)) log.warning(f'({self.entry.name}) Not enough data read - skipping')
return return
data = decode(read, 'utf-8') data = decode(read, 'utf-8')
# Possibility of extraneous data on input when reading. # Possibility of extraneous data on input when reading.
# Clean out extraneous characters in buffer. # Clean out extraneous characters in buffer.
self.read(1024) self.read(1024)
log.debug('({ip}) check_login() read "{data}"'.format(ip=self.entry.name, data=data.strip())) log.debug(f'({self.entry.name}) check_login() read "{data.strip()}"')
# At this point, we should only have the initial login prompt with # At this point, we should only have the initial login prompt with
# possible authentication # possible authentication
# PJLink initial login will be: # PJLink initial login will be:
@ -468,30 +453,26 @@ class PJLink(QtNetwork.QTcpSocket):
# 'PJLink 1 XXXXXX' Authenticated login - extra processing required. # 'PJLink 1 XXXXXX' Authenticated login - extra processing required.
if not data.startswith('PJLINK'): if not data.startswith('PJLINK'):
# Invalid initial packet - close socket # Invalid initial packet - close socket
log.error('({ip}) Invalid initial packet received - closing socket'.format(ip=self.entry.name)) log.error(f'({self.entry.name}) Invalid initial packet received - closing socket')
return self.disconnect_from_host() return self.disconnect_from_host()
# Convert the initial login prompt with the expected PJLink normal command format for processing # Convert the initial login prompt with the expected PJLink normal command format for processing
log.debug('({ip}) check_login(): Formatting initial connection prompt ' log.debug(f'({self.entry.name}) check_login(): Formatting initial connection prompt to PJLink packet')
'to PJLink packet'.format(ip=self.entry.name)) return self.get_data(f'{PJLINK_PREFIX}1{data.replace(" ", "=", 1).encode("utf-8")}')
return self.get_data('{start}{clss}{data}'.format(start=PJLINK_PREFIX,
clss='1',
data=data.replace(' ', '=', 1)).encode('utf-8'))
def _trash_buffer(self, msg=None): def _trash_buffer(self, msg=None):
""" """
Clean out extraneous stuff in the buffer. Clean out extraneous stuff in the buffer.
""" """
log.debug('({ip}) Cleaning buffer - msg = "{message}"'.format(ip=self.entry.name, message=msg)) log.debug(f'({self.entry.name}) Cleaning buffer - msg = "{msg}"')
if msg is None: if msg is None:
msg = 'Invalid packet' msg = 'Invalid packet'
log.warning('({ip}) {message}'.format(ip=self.entry.name, message=msg)) log.warning(f'({self.entry.name}) {msg}')
self.send_busy = False self.send_busy = False
trash_count = 0 trash_count = 0
while self.bytesAvailable() > 0: while self.bytesAvailable() > 0:
trash = self.read(self.max_size) trash = self.read(self.max_size)
trash_count += len(trash) trash_count += len(trash)
log.debug('({ip}) Finished cleaning buffer - {count} bytes dropped'.format(ip=self.entry.name, log.debug(f'({self.entry.name}) Finished cleaning buffer - {trash_count} bytes dropped')
count=trash_count))
return return
@QtCore.pyqtSlot(QtNetwork.QHostAddress, int, str, name='udp_data') # host, port, data @QtCore.pyqtSlot(QtNetwork.QHostAddress, int, str, name='udp_data') # host, port, data
@ -504,20 +485,20 @@ class PJLink(QtNetwork.QTcpSocket):
:param data: Data to process. buffer must be formatted as a proper PJLink packet. :param data: Data to process. buffer must be formatted as a proper PJLink packet.
""" """
if (port == int(self.port)) and (host.isEqual(self.qhost)): if (port == int(self.port)) and (host.isEqual(self.qhost)):
log.debug('({ip}) Received data from {host}'.format(ip=self.entry.name, host=host.toString())) log.debug(f'({self.entry.name}) Received data from {host.toString()}')
log.debug('({ip}) get_buffer(data="{buff}")'.format(ip=self.entry.name, buff=data)) log.debug(f'({self.entry.name}) get_buffer(data="{data}")')
return self.get_data(buff=data) return self.get_data(buff=data)
else: else:
log.debug('({ip}) Ignoring data for {host} - not me'.format(ip=self.entry.name, host=host.toString())) log.debug(f'({self.entry.name}) Ignoring data for {host.toString()} - not me')
@QtCore.pyqtSlot() @QtCore.pyqtSlot()
def get_socket(self): def get_socket(self):
""" """
Get data from TCP socket. Get data from TCP socket.
""" """
log.debug('({ip}) get_socket(): Reading data'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) get_socket(): Reading data')
if QSOCKET_STATE[self.state()] != S_CONNECTED: if QSOCKET_STATE[self.state()] != S_CONNECTED:
log.debug('({ip}) get_socket(): Not connected - returning'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) get_socket(): Not connected - returning')
self.send_busy = False self.send_busy = False
return return
# Although we have a packet length limit, go ahead and use a larger buffer # Although we have a packet length limit, go ahead and use a larger buffer
@ -526,16 +507,16 @@ class PJLink(QtNetwork.QTcpSocket):
data = self.readLine(1024) data = self.readLine(1024)
data = data.strip() data = data.strip()
if not data: if not data:
log.warning('({ip}) get_socket(): Ignoring empty packet'.format(ip=self.entry.name)) log.warning(f'({self.entry.name}) get_socket(): Ignoring empty packet')
if self.bytesAvailable() < 1: if self.bytesAvailable() < 1:
break break
self.socket_timer.stop() self.socket_timer.stop()
if data: if data:
log.debug('({ip}) get_socket(): "{buff}"'.format(ip=self.entry.name, buff=data)) log.debug(f'({self.entry.name}) get_socket(): "{data}"')
if data == -1: if data == -1:
# No data available # No data available
log.debug('({ip}) get_socket(): No data available (-1)'.format(ip=self.entry.name)) log.debug(f'({self.entry.namee}) get_socket(): No data available (-1)')
return return
return self.get_data(buff=data) return self.get_data(buff=data)
@ -632,9 +613,7 @@ class PJLink(QtNetwork.QTcpSocket):
:param err: Error code :param err: Error code
""" """
log.debug('({ip}) get_error(err={error}): {data}'.format(ip=self.entry.name, log.debug(f'({self.entry.name}) get_error(err={err}): {self.errorString()}')
error=err,
data=self.errorString()))
if err <= 18: if err <= 18:
# QSocket errors. Redefined in projector.constants so we don't mistake # QSocket errors. Redefined in projector.constants so we don't mistake
# them for system errors # them for system errors
@ -662,40 +641,32 @@ class PJLink(QtNetwork.QTcpSocket):
:param salt: Optional salt for md5 hash initial authentication :param salt: Optional salt for md5 hash initial authentication
:param priority: Option to send packet now rather than queue it up :param priority: Option to send packet now rather than queue it up
""" """
log.debug('({ip}) send_command(cmd="{cmd}" opts="{opts}" salt="{salt}" ' log.debug(f'({self.entry.name}) send_command(cmd="{cmd}" opts="{opts}" salt="{salt}" priority={priority}')
'priority={pri}'.format(ip=self.entry.name, cmd=cmd, opts=opts, salt=salt, pri=priority))
if QSOCKET_STATE[self.state()] != S_CONNECTED: if QSOCKET_STATE[self.state()] != S_CONNECTED:
log.warning('({ip}) send_command(): Not connected - returning'.format(ip=self.entry.name)) log.warning(f'({self.entry.name}) send_command(): Not connected - returning')
return self.reset_information() return self.reset_information()
if cmd not in PJLINK_VALID_CMD: if cmd not in PJLINK_VALID_CMD:
log.error('({ip}) send_command(): Invalid command requested - ignoring.'.format(ip=self.entry.name)) log.error(f'({self.entry.name}) send_command(): Invalid command requested - ignoring.')
if self.priority_queue or self.send_queue: if self.priority_queue or self.send_queue:
# Just in case there's already something to send # Just in case there's already something to send
return self._send_command() return self._send_command()
return return
log.debug('({ip}) send_command(): Building cmd="{command}" opts="{data}" ' log.debug(f'({self.entry.name}) send_command(): Building cmd="{cmd}" opts="{opts}" '
'{salt}'.format(ip=self.entry.name, f'{"" if salt is None else "with hash"}')
command=cmd,
data=opts,
salt='' if salt is None else 'with hash'))
# Until we absolutely have to start doing version checks, use the default # Until we absolutely have to start doing version checks, use the default
# for PJLink class # for PJLink class
header = PJLINK_HEADER.format(linkclass=PJLINK_VALID_CMD[cmd]['default']) header = PJLINK_HEADER.format(linkclass=PJLINK_VALID_CMD[cmd]['default'])
out = '{salt}{header}{command} {options}{suffix}'.format(salt="" if salt is None else salt, out = f'{"" if salt is None else salt}{header}{cmd} {opts}{PJLINK_SUFFIX}'
header=header,
command=cmd,
options=opts,
suffix=PJLINK_SUFFIX)
if out in self.priority_queue: if out in self.priority_queue:
log.warning('({ip}) send_command(): Already in priority queue - skipping'.format(ip=self.entry.name)) log.warning(f'({self.entry.name}) send_command(): Already in priority queue - skipping')
elif out in self.send_queue: elif out in self.send_queue:
log.warning('({ip}) send_command(): Already in normal queue - skipping'.format(ip=self.entry.name)) log.warning(f'({self.entry.name}) send_command(): Already in normal queue - skipping')
else: else:
if priority: if priority:
log.debug('({ip}) send_command(): Adding to priority queue'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) send_command(): Adding to priority queue')
self.priority_queue.append(out) self.priority_queue.append(out)
else: else:
log.debug('({ip}) send_command(): Adding to normal queue'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) send_command(): Adding to normal queue')
self.send_queue.append(out) self.send_queue.append(out)
if self.priority_queue or self.send_queue: if self.priority_queue or self.send_queue:
# May be some initial connection setup so make sure we send data # May be some initial connection setup so make sure we send data
@ -710,64 +681,59 @@ class PJLink(QtNetwork.QTcpSocket):
:param utf8: Send as UTF-8 string otherwise send as ASCII string :param utf8: Send as UTF-8 string otherwise send as ASCII string
""" """
if not data and not self.priority_queue and not self.send_queue: if not data and not self.priority_queue and not self.send_queue:
log.warning('({ip}) _send_command(): Nothing to send - returning'.format(ip=self.entry.name)) log.warning(f'({self.entry.name}) _send_command(): Nothing to send - returning')
self.send_busy = False self.send_busy = False
return return
log.debug('({ip}) _send_command(data="{data}")'.format(ip=self.entry.name, log.debug(f'({self.entry.name}) _send_command(data="{data if not data else data.strip()}")')
data=data.strip() if data else data)) log.debug(f'({self.entry.name}) _send_command(): priority_queue: {self.priority_queue}')
log.debug('({ip}) _send_command(): priority_queue: {queue}'.format(ip=self.entry.name, log.debug(f'({self.entry.name}) _send_command(): send_queue: {self.send_queue}')
queue=self.priority_queue))
log.debug('({ip}) _send_command(): send_queue: {queue}'.format(ip=self.entry.name,
queue=self.send_queue))
conn_state = STATUS_CODE[QSOCKET_STATE[self.state()]] conn_state = STATUS_CODE[QSOCKET_STATE[self.state()]]
log.debug('({ip}) _send_command(): Connection status: {data}'.format(ip=self.entry.name, log.debug(f'({self.entry.name}) _send_command(): Connection status: {conn_state}')
data=conn_state))
if QSOCKET_STATE[self.state()] != S_CONNECTED: if QSOCKET_STATE[self.state()] != S_CONNECTED:
log.warning('({ip}) _send_command() Not connected - abort'.format(ip=self.entry.name)) log.warning(f'({self.entry.name}) _send_command() Not connected - abort')
self.send_busy = False self.send_busy = False
return self.disconnect_from_host() return self.disconnect_from_host()
if data and data not in self.priority_queue: if data and data not in self.priority_queue:
log.debug('({ip}) _send_command(): Priority packet - adding to priority queue'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) _send_command(): Priority packet - adding to priority queue')
self.priority_queue.append(data) self.priority_queue.append(data)
if self.send_busy: if self.send_busy:
# Still waiting for response from last command sent # Still waiting for response from last command sent
log.debug('({ip}) _send_command(): Still busy, returning'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) _send_command(): Still busy, returning')
log.debug('({ip}) _send_command(): Priority queue = {data}'.format(ip=self.entry.name, log.debug(f'({self.entry.name}) _send_command(): Priority queue = {self.priority_queue}')
data=self.priority_queue)) log.debug(f'({self.entry.name}) _send_command(): Normal queue = {self.send_queue}')
log.debug('({ip}) _send_command(): Normal queue = {data}'.format(ip=self.entry.name, data=self.send_queue))
return return
if not self.priority_queue and not self.send_queue: if not self.priority_queue and not self.send_queue:
# No data to send # No data to send
log.warning('({ip}) _send_command(): No data to send'.format(ip=self.entry.name)) log.warning(f'({self.entry.name}) _send_command(): No data to send')
self.send_busy = False self.send_busy = False
return return
elif self.priority_queue: elif self.priority_queue:
out = self.priority_queue.pop(0) out = self.priority_queue.pop(0)
log.debug('({ip}) _send_command(): Getting priority queued packet'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) _send_command(): Getting priority queued packet')
elif self.send_queue: elif self.send_queue:
out = self.send_queue.pop(0) out = self.send_queue.pop(0)
log.debug('({ip}) _send_command(): Getting normal queued packet'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) _send_command(): Getting normal queued packet')
self.send_busy = True self.send_busy = True
log.debug('({ip}) _send_command(): Sending "{data}"'.format(ip=self.entry.name, data=out.strip())) log.debug(f'({self.entry.name}) _send_command(): Sending "{out.strip()}"')
self.socket_timer.start() self.socket_timer.start()
sent = self.write(out.encode('{string_encoding}'.format(string_encoding='utf-8' if utf8 else 'ascii'))) sent = self.write(out.encode(f'{"utf-8" if utf8 else "ascii"}'))
self.waitForBytesWritten(2000) # 2 seconds should be enough self.waitForBytesWritten(2000) # 2 seconds should be enough
if sent == -1: if sent == -1:
# Network error? # Network error?
self.change_status(E_NETWORK, self.change_status(E_NETWORK,
translate('OpenLP.PJLink', 'Error while sending data to projector')) translate('OpenLP.PJLink', 'Error while sending data to projector'))
log.warning('({ip}) _send_command(): -1 received - disconnecting from host'.format(ip=self.entry.name)) log.warning(f'({self.entry.name}) _send_command(): -1 received - disconnecting from host')
self.disconnect_from_host() self.disconnect_from_host()
def connect_to_host(self): def connect_to_host(self):
""" """
Initiate connection to projector. Initiate connection to projector.
""" """
log.debug('({ip}) connect_to_host(): Starting connection'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) connect_to_host(): Starting connection')
if QSOCKET_STATE[self.state()] == S_CONNECTED: if QSOCKET_STATE[self.state()] == S_CONNECTED:
log.warning('({ip}) connect_to_host(): Already connected - returning'.format(ip=self.entry.name)) log.warning(f'({self.entry.name}) connect_to_host(): Already connected - returning')
return return
self.error_status = S_OK self.error_status = S_OK
self.change_status(S_CONNECTING) self.change_status(S_CONNECTING)
@ -780,18 +746,17 @@ class PJLink(QtNetwork.QTcpSocket):
""" """
if abort or QSOCKET_STATE[self.state()] != S_NOT_CONNECTED: if abort or QSOCKET_STATE[self.state()] != S_NOT_CONNECTED:
if abort: if abort:
log.warning('({ip}) disconnect_from_host(): Aborting connection'.format(ip=self.entry.name)) log.warning(f'({self.entry.name}) disconnect_from_host(): Aborting connection')
self.abort() self.abort()
else: else:
log.warning('({ip}) disconnect_from_host(): Not connected'.format(ip=self.entry.name)) log.warning(f'({self.entry.name}) disconnect_from_host(): Not connected')
try: try:
self.readyRead.disconnect(self.get_socket) self.readyRead.disconnect(self.get_socket)
except TypeError: except TypeError:
# Since we already know what's happening, just log it for reference. # Since we already know what's happening, just log it for reference.
log.debug('({ip}) disconnect_from_host(): Issue detected with ' log.debug(f'({self.entry.name}) disconnect_from_host(): Issue detected with readyRead.disconnect')
'readyRead.disconnect'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) disconnect_from_host(): '
log.debug('({ip}) disconnect_from_host(): ' f'Current status {self._get_status(self.status_connect)[0]}')
'Current status {data}'.format(ip=self.entry.name, data=self._get_status(self.status_connect)[0]))
self.disconnectFromHost() self.disconnectFromHost()
if abort: if abort:
self.change_status(E_NOT_CONNECTED) self.change_status(E_NOT_CONNECTED)
@ -803,63 +768,63 @@ class PJLink(QtNetwork.QTcpSocket):
""" """
Send command to retrieve shutter status. Send command to retrieve shutter status.
""" """
log.debug('({ip}) Sending AVMT command'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Sending AVMT command')
return self.send_command(cmd='AVMT', priority=priority) return self.send_command(cmd='AVMT', priority=priority)
def get_available_inputs(self): def get_available_inputs(self):
""" """
Send command to retrieve available source inputs. Send command to retrieve available source inputs.
""" """
log.debug('({ip}) Sending INST command'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Sending INST command')
return self.send_command(cmd='INST') return self.send_command(cmd='INST')
def get_error_status(self): def get_error_status(self):
""" """
Send command to retrieve currently known errors. Send command to retrieve currently known errors.
""" """
log.debug('({ip}) Sending ERST command'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Sending ERST command')
return self.send_command(cmd='ERST') return self.send_command(cmd='ERST')
def get_input_source(self): def get_input_source(self):
""" """
Send command to retrieve currently selected source input. Send command to retrieve currently selected source input.
""" """
log.debug('({ip}) Sending INPT command'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Sending INPT command')
return self.send_command(cmd='INPT') return self.send_command(cmd='INPT')
def get_lamp_status(self): def get_lamp_status(self):
""" """
Send command to return the lap status. Send command to return the lap status.
""" """
log.debug('({ip}) Sending LAMP command'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Sending LAMP command')
return self.send_command(cmd='LAMP') return self.send_command(cmd='LAMP')
def get_manufacturer(self): def get_manufacturer(self):
""" """
Send command to retrieve manufacturer name. Send command to retrieve manufacturer name.
""" """
log.debug('({ip}) Sending INF1 command'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Sending INF1 command')
return self.send_command(cmd='INF1') return self.send_command(cmd='INF1')
def get_model(self): def get_model(self):
""" """
Send command to retrieve the model name. Send command to retrieve the model name.
""" """
log.debug('({ip}) Sending INF2 command'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Sending INF2 command')
return self.send_command(cmd='INF2') return self.send_command(cmd='INF2')
def get_name(self): def get_name(self):
""" """
Send command to retrieve name as set by end-user (if set). Send command to retrieve name as set by end-user (if set).
""" """
log.debug('({ip}) Sending NAME command'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Sending NAME command')
return self.send_command(cmd='NAME') return self.send_command(cmd='NAME')
def get_other_info(self): def get_other_info(self):
""" """
Send command to retrieve extra info set by manufacturer. Send command to retrieve extra info set by manufacturer.
""" """
log.debug('({ip}) Sending INFO command'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Sending INFO command')
return self.send_command(cmd='INFO') return self.send_command(cmd='INFO')
def get_power_status(self, priority=False): def get_power_status(self, priority=False):
@ -868,14 +833,14 @@ class PJLink(QtNetwork.QTcpSocket):
:param priority: (OPTIONAL) Send in priority queue :param priority: (OPTIONAL) Send in priority queue
""" """
log.debug('({ip}) Sending POWR command'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Sending POWR command')
return self.send_command(cmd='POWR', priority=priority) return self.send_command(cmd='POWR', priority=priority)
def set_audio_mute(self, priority=False): def set_audio_mute(self, priority=False):
""" """
Send command to set audio to muted Send command to set audio to muted
""" """
log.debug('({ip}) Setting AVMT to 21 (audio mute)'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Setting AVMT to 21 (audio mute)')
self.send_command(cmd='AVMT', opts='21', priority=True) self.send_command(cmd='AVMT', opts='21', priority=True)
self.status_timer_add(cmd='AVMT', callback=self.get_av_mute_status) self.status_timer_add(cmd='AVMT', callback=self.get_av_mute_status)
self.poll_loop() self.poll_loop()
@ -884,7 +849,7 @@ class PJLink(QtNetwork.QTcpSocket):
""" """
Send command to set audio to normal Send command to set audio to normal
""" """
log.debug('({ip}) Setting AVMT to 20 (audio normal)'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Setting AVMT to 20 (audio normal)')
self.send_command(cmd='AVMT', opts='20', priority=True) self.send_command(cmd='AVMT', opts='20', priority=True)
self.status_timer_add(cmd='AVMT', callback=self.get_av_mute_status) self.status_timer_add(cmd='AVMT', callback=self.get_av_mute_status)
self.poll_loop() self.poll_loop()
@ -896,12 +861,12 @@ class PJLink(QtNetwork.QTcpSocket):
:param src: Video source to select in projector :param src: Video source to select in projector
""" """
log.debug('({ip}) set_input_source(src="{data}")'.format(ip=self.entry.name, data=src)) log.debug(f'({self.entry.name}) set_input_source(src="{src}")')
if self.source_available is None: if self.source_available is None:
return return
elif src not in self.source_available: elif src not in self.source_available:
return return
log.debug('({ip}) Setting input source to "{data}"'.format(ip=self.entry.name, data=src)) log.debug(f'({self.entry.name}) Setting input source to "{src}"')
self.send_command(cmd='INPT', opts=src, priority=True) self.send_command(cmd='INPT', opts=src, priority=True)
self.poll_loop() self.poll_loop()
@ -909,7 +874,7 @@ class PJLink(QtNetwork.QTcpSocket):
""" """
Send command to turn power to on. Send command to turn power to on.
""" """
log.debug('({ip}) Setting POWR to 1 (on)'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Setting POWR to 1 (on)')
self.send_command(cmd='POWR', opts='1', priority=True) self.send_command(cmd='POWR', opts='1', priority=True)
self.status_timer_add(cmd='POWR', callback=self.get_power_status) self.status_timer_add(cmd='POWR', callback=self.get_power_status)
self.poll_loop() self.poll_loop()
@ -918,7 +883,7 @@ class PJLink(QtNetwork.QTcpSocket):
""" """
Send command to turn power to standby. Send command to turn power to standby.
""" """
log.debug('({ip}) Setting POWR to 0 (standby)'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Setting POWR to 0 (standby)')
self.send_command(cmd='POWR', opts='0', priority=True) self.send_command(cmd='POWR', opts='0', priority=True)
self.status_timer_add(cmd='POWR', callback=self.get_power_status) self.status_timer_add(cmd='POWR', callback=self.get_power_status)
self.poll_loop() self.poll_loop()
@ -927,7 +892,7 @@ class PJLink(QtNetwork.QTcpSocket):
""" """
Send command to set shutter to closed position. Send command to set shutter to closed position.
""" """
log.debug('({ip}) Setting AVMT to 11 (shutter closed)'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Setting AVMT to 11 (shutter closed)')
self.send_command(cmd='AVMT', opts='11', priority=True) self.send_command(cmd='AVMT', opts='11', priority=True)
self.status_timer_add('AVMT', self.get_av_mute_status) self.status_timer_add('AVMT', self.get_av_mute_status)
self.poll_loop() self.poll_loop()
@ -936,7 +901,7 @@ class PJLink(QtNetwork.QTcpSocket):
""" """
Send command to set shutter to open position. Send command to set shutter to open position.
""" """
log.debug('({ip}) Setting AVMT to "10" (shutter open)'.format(ip=self.entry.name)) log.debug(f'({self.entry.name}) Setting AVMT to "10" (shutter open)')
self.send_command(cmd='AVMT', opts='10', priority=True) self.send_command(cmd='AVMT', opts='10', priority=True)
self.status_timer_add('AVMT', self.get_av_mute_status) self.status_timer_add('AVMT', self.get_av_mute_status)
self.poll_loop() self.poll_loop()
@ -949,9 +914,9 @@ class PJLink(QtNetwork.QTcpSocket):
:param callback: Method to call :param callback: Method to call
""" """
if cmd in self.status_timer_checks: if cmd in self.status_timer_checks:
log.warning('({ip}) "{cmd}" already in checks - returning'.format(ip=self.entry.name, cmd=cmd)) log.warning(f'({self.entry.name}) "{cmd}" already in checks - returning')
return return
log.debug('({ip}) Adding "{cmd}" callback for status timer'.format(ip=self.entry.name, cmd=cmd)) log.debug(f'({self.entry.name}) Adding "{cmd}" callback for status timer')
self.status_timer_checks[cmd] = callback self.status_timer_checks[cmd] = callback
if not self.status_timer.isActive(): if not self.status_timer.isActive():
self.status_timer.start() self.status_timer.start()
@ -964,9 +929,9 @@ class PJLink(QtNetwork.QTcpSocket):
:param callback: Method to call :param callback: Method to call
""" """
if cmd not in self.status_timer_checks: if cmd not in self.status_timer_checks:
log.warning('({ip}) "{cmd}" not listed in status timer - returning'.format(ip=self.entry.name, cmd=cmd)) log.warning(f'({self.entry.name}) "{cmd}" not listed in status timer - returning')
return return
log.debug('({ip}) Removing "{cmd}" from status timer'.format(ip=self.entry.name, cmd=cmd)) log.debug(f'({self.entry.name}) Removing "{cmd}" from status timer')
self.status_timer_checks.pop(cmd) self.status_timer_checks.pop(cmd)
if not self.status_timer_checks: if not self.status_timer_checks:
self.status_timer.stop() self.status_timer.stop()
@ -976,12 +941,11 @@ class PJLink(QtNetwork.QTcpSocket):
Call methods defined in status_timer_checks for updates Call methods defined in status_timer_checks for updates
""" """
if not self.status_timer_checks: if not self.status_timer_checks:
log.warning('({ip}) status_timer_update() called when no callbacks - ' log.warning(f'({self.entry.name}) status_timer_update() called when no callbacks - Race condition?')
'Race condition?'.format(ip=self.entry.name))
self.status_timer.stop() self.status_timer.stop()
return return
for cmd, callback in self.status_timer_checks.items(): for cmd, callback in self.status_timer_checks.items():
log.debug('({ip}) Status update call for {cmd}'.format(ip=self.entry.name, cmd=cmd)) log.debug(f'({self.entry.name}) Status update call for {cmd}')
callback(priority=True) callback(priority=True)
def receive_data_signal(self): def receive_data_signal(self):

View File

@ -0,0 +1,293 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2022 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
"""
Test PJLink._get_status()
"""
import logging
import openlp.core.projectors.pjlink
from PyQt5 import QtNetwork
from unittest.mock import DEFAULT, MagicMock, patch
from openlp.core.projectors.constants import QSOCKET_STATE, STATUS_CODE, STATUS_MSG, \
E_GENERAL, S_CONNECTED, S_NOT_CONNECTED, S_CONNECTING, S_OK, \
PJLINK_PORT
from tests.resources.projector.data import TEST2_DATA
test_module = openlp.core.projectors.pjlink.__name__
def test_connect_to_host_connected(pjlink, caplog):
"""
Test connect_to_host returns when already connected
"""
# GIVEN: Test setup
caplog.set_level(logging.DEBUG)
logs = [(test_module, logging.DEBUG, f'({pjlink.entry.name}) connect_to_host(): Starting connection'),
(test_module, logging.WARNING, f'({pjlink.entry.name}) connect_to_host(): Already connected - returning')
]
mock_state = MagicMock()
mock_state.return_value = QSOCKET_STATE[S_CONNECTED]
# Set error_status to something not normally used for this test
pjlink.error_status = E_GENERAL
with patch.multiple(pjlink,
state=mock_state,
change_status=DEFAULT,
connectToHost=DEFAULT) as mock_pjlink:
# WHEN: Called
caplog.clear()
pjlink.connect_to_host()
# THEN: Appropriate entries and calls
assert caplog.record_tuples == logs, 'Invalid log entries'
assert pjlink.error_status == E_GENERAL, 'Error status should not have change'
mock_pjlink['change_status'].assert_not_called()
mock_pjlink['connectToHost']. assert_not_called()
def test_connect_to_host_not_connected(pjlink, caplog):
"""
Test connect_to_host calls appropriate methods to connect
"""
# GIVEN: Test setup
caplog.set_level(logging.DEBUG)
logs = [(test_module, logging.DEBUG, f'({pjlink.entry.name}) connect_to_host(): Starting connection'),
]
mock_state = MagicMock()
mock_state.return_value = QSOCKET_STATE[S_NOT_CONNECTED]
pjlink.error_status = E_GENERAL
with patch.multiple(pjlink,
state=mock_state,
change_status=DEFAULT,
connectToHost=DEFAULT) as mock_pjlink:
# WHEN: Called
caplog.clear()
pjlink.connect_to_host()
# THEN: Appropriate entries and calls
assert caplog.record_tuples == logs, 'Invalid log entries'
assert pjlink.error_status == S_OK, 'Error status should have changed to S_OK'
mock_pjlink['change_status'].assert_called_with(S_CONNECTING)
mock_pjlink['connectToHost'].assert_called_with(pjlink.ip, pjlink.port)
def test_get_buffer_me(pjlink, caplog):
"""
Test get_buffer() calls get_data()
"""
# NOTE: Verify pjlink.qhost == host.isEqual() works as expected
# NOTE: May have to fix get_buffer() on this test
# GIVEN: Test setup
t_host = pjlink.qhost
t_port = pjlink.port
t_data = "Test me with a spoon"
caplog.set_level(logging.DEBUG)
logs = [(test_module, logging.DEBUG, f'({pjlink.entry.name}) Received data from {t_host.toString()}'),
(test_module, logging.DEBUG, f'({pjlink.entry.name}) get_buffer(data="{t_data}")')
]
with patch.object(pjlink, 'get_data') as mock_data:
# WHEN: Called
pjlink.get_buffer(host=t_host, port=t_port, data=t_data)
# THEN: Appropriate logs and calls
assert caplog.record_tuples == logs, 'Invalid log entries'
mock_data.assert_called_with(buff=t_data)
def test_get_buffer_not_me(pjlink, caplog):
"""
Test get_buffer() returns without calls
"""
# GIVEN: Test setup
t_host = QtNetwork.QHostAddress(TEST2_DATA['ip'])
t_port = pjlink.port
t_data = "Test me with a spoon"
caplog.set_level(logging.DEBUG)
logs = [(test_module, logging.DEBUG, f'({pjlink.entry.name}) Ignoring data for {t_host.toString()} - not me')]
with patch.object(pjlink, 'get_data') as mock_data:
# WHEN: Called
pjlink.get_buffer(host=t_host, port=t_port, data=t_data)
# THEN: Appropriate logs and calls
assert caplog.record_tuples == logs, 'Invalid log entries'
mock_data.assert_not_called()
def test_get_buffer_wrong_port(pjlink, caplog):
"""
Test get_buffer() returns without calls
"""
# GIVEN: Test setup
t_host = pjlink.qhost
t_port = PJLINK_PORT
t_data = "Test me with a spoon"
caplog.set_level(logging.DEBUG)
logs = [(test_module, logging.DEBUG, f'({pjlink.entry.name}) Ignoring data for {t_host.toString()} - not me')]
with patch.object(pjlink, 'get_data') as mock_data:
# WHEN: Called
pjlink.get_buffer(host=t_host, port=t_port, data=t_data)
# THEN: Appropriate logs and calls
assert caplog.record_tuples == logs, 'Invalid log entries'
mock_data.assert_not_called()
def test_get_status_invalid_string(pjlink):
"""
Test get_status returns invalid when given a string
"""
# GIVEN: Test setup
t_status = "String"
# WHEN: Called with a string
t_code, t_msg = pjlink._get_status(status=t_status)
# THEN: Appropriate return values
assert t_code == -1, 'Should have returned code -1'
assert t_msg is None, 'Should have returned message None'
def test_get_status_invalid_string_digit(pjlink):
"""
Test get_status returns invalid when given a digit in a string
"""
# GIVEN: Test setup
t_status = "1"
# WHEN: Called with a string
t_code, t_msg = pjlink._get_status(status=t_status)
# THEN: Appropriate return values
assert t_code == -1, 'Should have returned code -1'
assert t_msg is None, 'Should have returned message None'
def test_get_status_invalid_code(pjlink):
"""
Test get_status returns invalid when given an invalid code
"""
# GIVEN: Test setup
t_status = E_GENERAL - 1
# WHEN: Called with a string
t_code, t_msg = pjlink._get_status(status=t_status)
# THEN: Appropriate return values
assert t_code is None, 'hould have returned code None'
assert t_msg is None, 'Should have returned message None'
def test_get_status_valid(pjlink):
"""
Test get_status returns valid status message
"""
# GIVEN: Test setup
t_status = E_GENERAL
# WHEN: Called with a string
t_code, t_msg = pjlink._get_status(status=t_status)
# THEN: Appropriate return values
assert t_code == STATUS_CODE[E_GENERAL], f'Should have returned "{STATUS_CODE[t_status]}"'
assert t_msg == STATUS_MSG[E_GENERAL], f'Should have returned "{STATUS_MSG[t_status]}"'
def test_receive_data_signal(pjlink):
"""
Test PJLink.receive_data_signal sets and sends valid signal
"""
# GIVEN: Test setup
pjlink.send_busy = True
with patch.object(pjlink, 'projectorReceivedData') as mock_receive:
# WHEN: Called
pjlink.receive_data_signal()
# THEN: Appropriate calls and settings
assert pjlink.send_busy is False, 'Did not clear send_busy'
mock_receive.emit.assert_called_once()
def test_status_timer_update_two_callbacks(pjlink, caplog):
"""
Test status_timer_update calls status_timer.stop when no updates listed
"""
# GIVEN: Test setup
t_cb1 = MagicMock()
t_cb2 = MagicMock()
pjlink.status_timer_checks = {'ONE': t_cb1,
'TWO': t_cb2}
caplog.set_level(logging.DEBUG)
logs = [(test_module, logging.DEBUG, f'({pjlink.entry.name}) Status update call for ONE'),
(test_module, logging.DEBUG, f'({pjlink.entry.name}) Status update call for TWO')]
with patch.object(pjlink, 'status_timer') as mock_timer:
# WHEN: Called
caplog.clear()
pjlink.status_timer_update()
# THEN: Returns with timer stop called
assert caplog.record_tuples == logs, 'Invalid log entries'
mock_timer.stop.assert_not_called()
t_cb1.assert_called_once_with(priority=True)
t_cb2.assert_called_once_with(priority=True)
def test_status_timer_update_empty(pjlink, caplog):
"""
Test status_timer_update calls status_timer.stop when no updates listed
"""
# GIVEN: Test setup
pjlink.status_timer_checks = {}
caplog.set_level(logging.DEBUG)
logs = [(test_module, logging.WARNING,
f'({pjlink.entry.name}) status_timer_update() called when no callbacks - Race condition?')]
with patch.object(pjlink, 'status_timer') as mock_timer:
# WHEN: Called
caplog.clear()
pjlink.status_timer_update()
# THEN: Returns with timer stop called
assert caplog.record_tuples == logs, 'Invalid log entries'
mock_timer.stop.assert_called_once()

View File

@ -26,7 +26,7 @@ from unittest.mock import MagicMock, call, patch
import openlp.core.projectors.pjlink import openlp.core.projectors.pjlink
from openlp.core.projectors.pjlinkcommands import process_command from openlp.core.projectors.pjlinkcommands import process_command
from openlp.core.projectors.constants import E_NOT_CONNECTED, E_PARAMETER, E_UNKNOWN_SOCKET_ERROR, QSOCKET_STATE, \ from openlp.core.projectors.constants import E_NOT_CONNECTED, E_PARAMETER, E_UNKNOWN_SOCKET_ERROR, QSOCKET_STATE, \
S_CONNECTED, S_CONNECTING, S_NOT_CONNECTED, S_OK, S_ON, STATUS_CODE, STATUS_MSG S_CONNECTED, S_CONNECTING, S_OK, S_ON, STATUS_CODE, STATUS_MSG
@patch.object(openlp.core.projectors.pjlink.PJLink, 'changeStatus') @patch.object(openlp.core.projectors.pjlink.PJLink, 'changeStatus')
@ -433,47 +433,3 @@ def test_projector_get_power_status(mock_log, mock_send_command, pjlink):
# THEN: log data and send_command should have been called # THEN: log data and send_command should have been called
mock_log.debug.assert_has_calls(log_debug_calls) mock_log.debug.assert_has_calls(log_debug_calls)
mock_send_command.assert_called_once_with(cmd=test_data, priority=False) mock_send_command.assert_called_once_with(cmd=test_data, priority=False)
def test_projector_get_status_invalid(pjlink):
"""
Test to check returned information for error code
"""
# GIVEN: Test object
test_string = 'NaN test'
# WHEN: get_status called
code, message = pjlink._get_status(status=test_string)
# THEN: Proper data should have been returned
assert code == -1, 'Should have returned -1 as a bad status check'
assert message is None, 'Invalid code type should have returned None for message'
def test_projector_get_status_valid(pjlink):
"""
Test to check returned information for status codes
"""
# GIVEN: Test object
test_message = 'Not Connected'
# WHEN: get_status called
code, message = pjlink._get_status(status=S_NOT_CONNECTED)
# THEN: Proper strings should have been returned
assert code == 'S_NOT_CONNECTED', 'Code returned should have been the same code that was sent'
assert message == test_message, 'Description of code should have been returned'
def test_projector_get_status_unknown(pjlink):
"""
Test to check returned information for unknown code
"""
# GIVEN: Test object
# WHEN: get_status called
code, message = pjlink._get_status(status=9999)
# THEN: Proper strings should have been returned
assert code is None, 'Code returned should have been the same code that was sent'
assert message is None, 'Should have returned None as message'