Pjlink tests 2022-02-23

This commit is contained in:
Ken Roberts 2022-02-24 19:02:28 +00:00 committed by Tim Bentley
parent 9d598773ef
commit 99d9398783
5 changed files with 400 additions and 187 deletions

View File

@ -101,7 +101,7 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
self.search_timer = QtCore.QTimer()
self.udp_broadcast_listen_setting = False
self.settings = Registry().get('settings')
log.debug('(UDP:{port}) PJLinkUDP() Initialized'.format(port=self.port))
log.debug(f'(UDP:{self.port}) PJLinkUDP() Initialized')
if self.settings.value('projector/udp broadcast listen'):
self.udp_start()
@ -109,7 +109,7 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
"""
Start listening on UDP port
"""
log.debug('(UDP:{port}) Start called'.format(port=self.port))
log.debug(f'(UDP:{self.port}) Start called')
self.readyRead.connect(self.get_datagram)
self.check_settings(checked=self.settings.value('projector/udp broadcast listen'))
@ -117,7 +117,7 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
"""
Stop listening on UDP port
"""
log.debug('(UDP:{port}) Stopping listener'.format(port=self.port))
log.debug(f'(UDP:{self.port}) Stopping listener')
self.close()
self.readyRead.disconnect(self.get_datagram)
@ -126,25 +126,22 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
"""
Retrieve packet and basic checks
"""
log.debug('(UDP:{port}) get_datagram() - Receiving data'.format(port=self.port))
log.debug(f'(UDP:{self.port}) get_datagram() - Receiving data')
read_size = self.pendingDatagramSize()
if -1 == read_size:
log.warning('(UDP:{port}) No data (-1)'.format(port=self.port))
log.warning(f'(UDP:{self.port}) No data (-1)')
return
elif 0 == read_size:
log.warning('(UDP:{port}) get_datagram() called when pending data size is 0'.format(port=self.port))
log.warning(f'(UDP:{self.port}) get_datagram() called when pending data size is 0')
return
elif read_size > PJLINK_MAX_PACKET:
log.warning('(UDP:{port}) UDP Packet too large ({size} bytes)- ignoring'.format(size=read_size,
port=self.port))
log.warning(f'(UDP:{self.port}) UDP Packet too large ({read_size} bytes)- ignoring')
return
data_in, peer_host, peer_port = self.readDatagram(read_size)
data = data_in.decode('utf-8') if isinstance(data_in, bytes) else data_in
log.debug('(UDP:{port}) {size} bytes received from {adx}'.format(size=len(data),
adx=peer_host.toString(),
port=self.port))
log.debug('(UDP:{port}) packet "{data}"'.format(data=data, port=self.port))
log.debug('(UDP:{port}) Sending data_received signal to projectors'.format(port=self.port))
log.debug(f'(UDP:{self.port}) {len(data)} bytes received from {peer_host.toString()}')
log.debug(f'(UDP:{self.port}) packet "{data}"')
log.debug(f'(UDP:{self.port}) Sending data_received signal to projectors')
self.data_received.emit(peer_host, self.localPort(), data)
return
@ -170,15 +167,15 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
NOTE: This method is called by projector settings tab and setup/removed by ProjectorManager
"""
if self.udp_broadcast_listen_setting == checked:
log.debug('(UDP:{port}) No change to status - skipping'.format(port=self.port))
log.debug(f'(UDP:{self.port}) No change to status - skipping')
return
self.udp_broadcast_listen_setting = checked
if self.udp_broadcast_listen_setting:
if self.state() == self.ListeningState:
log.debug('(UDP:{port}) Already listening - skipping')
log.debug(f'(UDP:{self.port}) Already listening - skipping')
return
self.bind(self.port)
log.debug('(UDP:{port}) Listening'.format(port=self.port))
log.debug(f'(UDP:{self.port}) Listening')
else:
# Close socket
self.udp_stop()
@ -207,9 +204,7 @@ class PJLink(QtNetwork.QTcpSocket):
:param poll_time: Time (in seconds) to poll connected projector
:param socket_timeout: Time (in seconds) to abort the connection if no response
"""
log.debug('PJLink(projector="{projector}", args="{args}" kwargs="{kwargs}")'.format(projector=projector,
args=args,
kwargs=kwargs))
log.debug(f'PJLink(projector="{projector}", args="{args}" kwargs="{kwargs}")')
super().__init__()
self.settings_section = 'projector'
self.entry = projector
@ -266,8 +261,7 @@ class PJLink(QtNetwork.QTcpSocket):
Initialize instance variables. Also used to reset projector-specific information to default.
"""
conn_state = STATUS_CODE[QSOCKET_STATE[self.state()]]
log.debug('({ip}) reset_information() connect status is {state}'.format(ip=self.entry.name,
state=conn_state))
log.debug(f'({self.entry.name}) reset_information() connect status is {conn_state}')
self.fan = None # ERST
self.filter_time = None # FILT
self.lamp = None # LAMP
@ -291,13 +285,13 @@ class PJLink(QtNetwork.QTcpSocket):
self.source = None # INPT
# These should be part of PJLink() class, but set here for convenience
if hasattr(self, 'poll_timer'):
log.debug('({ip}): Calling poll_timer.stop()'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}): Calling poll_timer.stop()')
self.poll_timer.stop()
if hasattr(self, 'socket_timer'):
log.debug('({ip}): Calling socket_timer.stop()'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}): Calling socket_timer.stop()')
self.socket_timer.stop()
if hasattr(self, 'status_timer'):
log.debug('({ip}): Calling status_timer.stop()'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}): Calling status_timer.stop()')
self.status_timer.stop()
self.status_timer_checks = {}
self.send_busy = False
@ -309,7 +303,7 @@ class PJLink(QtNetwork.QTcpSocket):
Aborts connection and closes socket in case of brain-dead projectors.
Should normally be called by socket_timer().
"""
log.debug('({ip}) socket_abort() - Killing connection'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) socket_abort() - Killing connection')
self.disconnect_from_host(abort=True)
def poll_loop(self):
@ -318,11 +312,11 @@ class PJLink(QtNetwork.QTcpSocket):
Normally called by timer().
"""
if QSOCKET_STATE[self.state()] != S_CONNECTED:
log.warning('({ip}) poll_loop(): Not connected - returning'.format(ip=self.entry.name))
log.warning(f'({self.entry.name}) poll_loop(): Not connected - returning')
# Stop timer just in case it's missed elsewhere
self.poll_timer.stop()
return
log.debug('({ip}) poll_loop(): Updating projector status'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) poll_loop(): Updating projector status')
# The following commands do not change, so only check them once
# Call them first in case other functions rely on something here
if self.power == S_ON and self.source_available is None:
@ -379,13 +373,10 @@ class PJLink(QtNetwork.QTcpSocket):
:param msg: Optional message
"""
if status in STATUS_CODE:
log.debug('({ip}) Changing status to {status} '
'"{msg}"'.format(ip=self.entry.name,
status=STATUS_CODE[status],
msg=msg if msg is not None else STATUS_MSG[status]))
log.debug(f'({self.entry.name}) Changing status to {STATUS_CODE[status]} '
f'"{STATUS_MSG[status] if msg is None else msg}"')
else:
log.warning('({ip}) Unknown status change code: {code}'.format(ip=self.entry.name,
code=status))
log.warning(f'({self.entry.name}) Unknown status change code: {status}')
return
if status in CONNECTION_ERRORS:
# Connection state error affects both socket and projector
@ -403,26 +394,20 @@ class PJLink(QtNetwork.QTcpSocket):
# These log entries are for troubleshooting only
(status_code, status_message) = self._get_status(self.status_connect)
log.debug('({ip}) status_connect: {code}: "{message}"'.format(ip=self.entry.name,
code=status_code,
message=status_message if msg is None else msg))
log.debug(f'({self.entry.name}) status_connect: {status_code}: "{status_message if msg is None else msg}"')
(status_code, status_message) = self._get_status(self.projector_status)
log.debug('({ip}) projector_status: {code}: "{message}"'.format(ip=self.entry.name,
code=status_code,
message=status_message if msg is None else msg))
log.debug(f'({self.entry.name}) projector_status: {status_code}: "{status_message if msg is None else msg}"')
(status_code, status_message) = self._get_status(self.error_status)
log.debug('({ip}) error_status: {code}: "{message}"'.format(ip=self.entry.name,
code=status_code,
message=status_message if msg is None else msg))
log.debug(f'({self.entry.name}) error_status: {status_code}: "{status_message if msg is None else msg}"')
# Now that we logged extra information for debugging, broadcast the original change/message
# Check for connection errors first
if self.error_status != S_OK:
log.debug('({ip}) Signalling error code'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Signalling error code')
code, message = self._get_status(self.error_status)
status = self.error_status
else:
log.debug('({ip}) Signalling status code'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Signalling status code')
code, message = self._get_status(status)
if msg is not None:
message = msg
@ -440,27 +425,27 @@ class PJLink(QtNetwork.QTcpSocket):
:param data: Optional data if called from another routine
"""
log.debug('({ip}) check_login(data="{data}")'.format(ip=self.entry.name, data=data))
log.debug(f'({self.entry.name}) check_login(data="{data}")')
if data is None:
# Reconnected setup?
if not self.waitForReadyRead(2000):
# Possible timeout issue
log.error('({ip}) Socket timeout waiting for login'.format(ip=self.entry.name))
log.error(f'({self.entry.name}) Socket timeout waiting for login')
self.change_status(E_SOCKET_TIMEOUT)
return
read = self.readLine(self.max_size)
self.readLine(self.max_size) # Clean out any trailing whitespace
if read is None:
log.warning('({ip}) read is None - socket error?'.format(ip=self.entry.name))
log.warning(f'({self.entry.name}) read is None - socket error?')
return
elif len(read) < 8:
log.warning('({ip}) Not enough data read - skipping'.format(ip=self.entry.name))
log.warning(f'({self.entry.name}) Not enough data read - skipping')
return
data = decode(read, 'utf-8')
# Possibility of extraneous data on input when reading.
# Clean out extraneous characters in buffer.
self.read(1024)
log.debug('({ip}) check_login() read "{data}"'.format(ip=self.entry.name, data=data.strip()))
log.debug(f'({self.entry.name}) check_login() read "{data.strip()}"')
# At this point, we should only have the initial login prompt with
# possible authentication
# PJLink initial login will be:
@ -468,30 +453,26 @@ class PJLink(QtNetwork.QTcpSocket):
# 'PJLink 1 XXXXXX' Authenticated login - extra processing required.
if not data.startswith('PJLINK'):
# Invalid initial packet - close socket
log.error('({ip}) Invalid initial packet received - closing socket'.format(ip=self.entry.name))
log.error(f'({self.entry.name}) Invalid initial packet received - closing socket')
return self.disconnect_from_host()
# Convert the initial login prompt with the expected PJLink normal command format for processing
log.debug('({ip}) check_login(): Formatting initial connection prompt '
'to PJLink packet'.format(ip=self.entry.name))
return self.get_data('{start}{clss}{data}'.format(start=PJLINK_PREFIX,
clss='1',
data=data.replace(' ', '=', 1)).encode('utf-8'))
log.debug(f'({self.entry.name}) check_login(): Formatting initial connection prompt to PJLink packet')
return self.get_data(f'{PJLINK_PREFIX}1{data.replace(" ", "=", 1).encode("utf-8")}')
def _trash_buffer(self, msg=None):
"""
Clean out extraneous stuff in the buffer.
"""
log.debug('({ip}) Cleaning buffer - msg = "{message}"'.format(ip=self.entry.name, message=msg))
log.debug(f'({self.entry.name}) Cleaning buffer - msg = "{msg}"')
if msg is None:
msg = 'Invalid packet'
log.warning('({ip}) {message}'.format(ip=self.entry.name, message=msg))
log.warning(f'({self.entry.name}) {msg}')
self.send_busy = False
trash_count = 0
while self.bytesAvailable() > 0:
trash = self.read(self.max_size)
trash_count += len(trash)
log.debug('({ip}) Finished cleaning buffer - {count} bytes dropped'.format(ip=self.entry.name,
count=trash_count))
log.debug(f'({self.entry.name}) Finished cleaning buffer - {trash_count} bytes dropped')
return
@QtCore.pyqtSlot(QtNetwork.QHostAddress, int, str, name='udp_data') # host, port, data
@ -504,20 +485,20 @@ class PJLink(QtNetwork.QTcpSocket):
:param data: Data to process. buffer must be formatted as a proper PJLink packet.
"""
if (port == int(self.port)) and (host.isEqual(self.qhost)):
log.debug('({ip}) Received data from {host}'.format(ip=self.entry.name, host=host.toString()))
log.debug('({ip}) get_buffer(data="{buff}")'.format(ip=self.entry.name, buff=data))
log.debug(f'({self.entry.name}) Received data from {host.toString()}')
log.debug(f'({self.entry.name}) get_buffer(data="{data}")')
return self.get_data(buff=data)
else:
log.debug('({ip}) Ignoring data for {host} - not me'.format(ip=self.entry.name, host=host.toString()))
log.debug(f'({self.entry.name}) Ignoring data for {host.toString()} - not me')
@QtCore.pyqtSlot()
def get_socket(self):
"""
Get data from TCP socket.
"""
log.debug('({ip}) get_socket(): Reading data'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) get_socket(): Reading data')
if QSOCKET_STATE[self.state()] != S_CONNECTED:
log.debug('({ip}) get_socket(): Not connected - returning'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) get_socket(): Not connected - returning')
self.send_busy = False
return
# Although we have a packet length limit, go ahead and use a larger buffer
@ -526,16 +507,16 @@ class PJLink(QtNetwork.QTcpSocket):
data = self.readLine(1024)
data = data.strip()
if not data:
log.warning('({ip}) get_socket(): Ignoring empty packet'.format(ip=self.entry.name))
log.warning(f'({self.entry.name}) get_socket(): Ignoring empty packet')
if self.bytesAvailable() < 1:
break
self.socket_timer.stop()
if data:
log.debug('({ip}) get_socket(): "{buff}"'.format(ip=self.entry.name, buff=data))
log.debug(f'({self.entry.name}) get_socket(): "{data}"')
if data == -1:
# No data available
log.debug('({ip}) get_socket(): No data available (-1)'.format(ip=self.entry.name))
log.debug(f'({self.entry.namee}) get_socket(): No data available (-1)')
return
return self.get_data(buff=data)
@ -632,9 +613,7 @@ class PJLink(QtNetwork.QTcpSocket):
:param err: Error code
"""
log.debug('({ip}) get_error(err={error}): {data}'.format(ip=self.entry.name,
error=err,
data=self.errorString()))
log.debug(f'({self.entry.name}) get_error(err={err}): {self.errorString()}')
if err <= 18:
# QSocket errors. Redefined in projector.constants so we don't mistake
# them for system errors
@ -662,40 +641,32 @@ class PJLink(QtNetwork.QTcpSocket):
:param salt: Optional salt for md5 hash initial authentication
:param priority: Option to send packet now rather than queue it up
"""
log.debug('({ip}) send_command(cmd="{cmd}" opts="{opts}" salt="{salt}" '
'priority={pri}'.format(ip=self.entry.name, cmd=cmd, opts=opts, salt=salt, pri=priority))
log.debug(f'({self.entry.name}) send_command(cmd="{cmd}" opts="{opts}" salt="{salt}" priority={priority}')
if QSOCKET_STATE[self.state()] != S_CONNECTED:
log.warning('({ip}) send_command(): Not connected - returning'.format(ip=self.entry.name))
log.warning(f'({self.entry.name}) send_command(): Not connected - returning')
return self.reset_information()
if cmd not in PJLINK_VALID_CMD:
log.error('({ip}) send_command(): Invalid command requested - ignoring.'.format(ip=self.entry.name))
log.error(f'({self.entry.name}) send_command(): Invalid command requested - ignoring.')
if self.priority_queue or self.send_queue:
# Just in case there's already something to send
return self._send_command()
return
log.debug('({ip}) send_command(): Building cmd="{command}" opts="{data}" '
'{salt}'.format(ip=self.entry.name,
command=cmd,
data=opts,
salt='' if salt is None else 'with hash'))
log.debug(f'({self.entry.name}) send_command(): Building cmd="{cmd}" opts="{opts}" '
f'{"" if salt is None else "with hash"}')
# Until we absolutely have to start doing version checks, use the default
# for PJLink class
header = PJLINK_HEADER.format(linkclass=PJLINK_VALID_CMD[cmd]['default'])
out = '{salt}{header}{command} {options}{suffix}'.format(salt="" if salt is None else salt,
header=header,
command=cmd,
options=opts,
suffix=PJLINK_SUFFIX)
out = f'{"" if salt is None else salt}{header}{cmd} {opts}{PJLINK_SUFFIX}'
if out in self.priority_queue:
log.warning('({ip}) send_command(): Already in priority queue - skipping'.format(ip=self.entry.name))
log.warning(f'({self.entry.name}) send_command(): Already in priority queue - skipping')
elif out in self.send_queue:
log.warning('({ip}) send_command(): Already in normal queue - skipping'.format(ip=self.entry.name))
log.warning(f'({self.entry.name}) send_command(): Already in normal queue - skipping')
else:
if priority:
log.debug('({ip}) send_command(): Adding to priority queue'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) send_command(): Adding to priority queue')
self.priority_queue.append(out)
else:
log.debug('({ip}) send_command(): Adding to normal queue'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) send_command(): Adding to normal queue')
self.send_queue.append(out)
if self.priority_queue or self.send_queue:
# May be some initial connection setup so make sure we send data
@ -710,64 +681,59 @@ class PJLink(QtNetwork.QTcpSocket):
:param utf8: Send as UTF-8 string otherwise send as ASCII string
"""
if not data and not self.priority_queue and not self.send_queue:
log.warning('({ip}) _send_command(): Nothing to send - returning'.format(ip=self.entry.name))
log.warning(f'({self.entry.name}) _send_command(): Nothing to send - returning')
self.send_busy = False
return
log.debug('({ip}) _send_command(data="{data}")'.format(ip=self.entry.name,
data=data.strip() if data else data))
log.debug('({ip}) _send_command(): priority_queue: {queue}'.format(ip=self.entry.name,
queue=self.priority_queue))
log.debug('({ip}) _send_command(): send_queue: {queue}'.format(ip=self.entry.name,
queue=self.send_queue))
log.debug(f'({self.entry.name}) _send_command(data="{data if not data else data.strip()}")')
log.debug(f'({self.entry.name}) _send_command(): priority_queue: {self.priority_queue}')
log.debug(f'({self.entry.name}) _send_command(): send_queue: {self.send_queue}')
conn_state = STATUS_CODE[QSOCKET_STATE[self.state()]]
log.debug('({ip}) _send_command(): Connection status: {data}'.format(ip=self.entry.name,
data=conn_state))
log.debug(f'({self.entry.name}) _send_command(): Connection status: {conn_state}')
if QSOCKET_STATE[self.state()] != S_CONNECTED:
log.warning('({ip}) _send_command() Not connected - abort'.format(ip=self.entry.name))
log.warning(f'({self.entry.name}) _send_command() Not connected - abort')
self.send_busy = False
return self.disconnect_from_host()
if data and data not in self.priority_queue:
log.debug('({ip}) _send_command(): Priority packet - adding to priority queue'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) _send_command(): Priority packet - adding to priority queue')
self.priority_queue.append(data)
if self.send_busy:
# Still waiting for response from last command sent
log.debug('({ip}) _send_command(): Still busy, returning'.format(ip=self.entry.name))
log.debug('({ip}) _send_command(): Priority queue = {data}'.format(ip=self.entry.name,
data=self.priority_queue))
log.debug('({ip}) _send_command(): Normal queue = {data}'.format(ip=self.entry.name, data=self.send_queue))
log.debug(f'({self.entry.name}) _send_command(): Still busy, returning')
log.debug(f'({self.entry.name}) _send_command(): Priority queue = {self.priority_queue}')
log.debug(f'({self.entry.name}) _send_command(): Normal queue = {self.send_queue}')
return
if not self.priority_queue and not self.send_queue:
# No data to send
log.warning('({ip}) _send_command(): No data to send'.format(ip=self.entry.name))
log.warning(f'({self.entry.name}) _send_command(): No data to send')
self.send_busy = False
return
elif self.priority_queue:
out = self.priority_queue.pop(0)
log.debug('({ip}) _send_command(): Getting priority queued packet'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) _send_command(): Getting priority queued packet')
elif self.send_queue:
out = self.send_queue.pop(0)
log.debug('({ip}) _send_command(): Getting normal queued packet'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) _send_command(): Getting normal queued packet')
self.send_busy = True
log.debug('({ip}) _send_command(): Sending "{data}"'.format(ip=self.entry.name, data=out.strip()))
log.debug(f'({self.entry.name}) _send_command(): Sending "{out.strip()}"')
self.socket_timer.start()
sent = self.write(out.encode('{string_encoding}'.format(string_encoding='utf-8' if utf8 else 'ascii')))
sent = self.write(out.encode(f'{"utf-8" if utf8 else "ascii"}'))
self.waitForBytesWritten(2000) # 2 seconds should be enough
if sent == -1:
# Network error?
self.change_status(E_NETWORK,
translate('OpenLP.PJLink', 'Error while sending data to projector'))
log.warning('({ip}) _send_command(): -1 received - disconnecting from host'.format(ip=self.entry.name))
log.warning(f'({self.entry.name}) _send_command(): -1 received - disconnecting from host')
self.disconnect_from_host()
def connect_to_host(self):
"""
Initiate connection to projector.
"""
log.debug('({ip}) connect_to_host(): Starting connection'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) connect_to_host(): Starting connection')
if QSOCKET_STATE[self.state()] == S_CONNECTED:
log.warning('({ip}) connect_to_host(): Already connected - returning'.format(ip=self.entry.name))
log.warning(f'({self.entry.name}) connect_to_host(): Already connected - returning')
return
self.error_status = S_OK
self.change_status(S_CONNECTING)
@ -780,18 +746,17 @@ class PJLink(QtNetwork.QTcpSocket):
"""
if abort or QSOCKET_STATE[self.state()] != S_NOT_CONNECTED:
if abort:
log.warning('({ip}) disconnect_from_host(): Aborting connection'.format(ip=self.entry.name))
log.warning(f'({self.entry.name}) disconnect_from_host(): Aborting connection')
self.abort()
else:
log.warning('({ip}) disconnect_from_host(): Not connected'.format(ip=self.entry.name))
log.warning(f'({self.entry.name}) disconnect_from_host(): Not connected')
try:
self.readyRead.disconnect(self.get_socket)
except TypeError:
# Since we already know what's happening, just log it for reference.
log.debug('({ip}) disconnect_from_host(): Issue detected with '
'readyRead.disconnect'.format(ip=self.entry.name))
log.debug('({ip}) disconnect_from_host(): '
'Current status {data}'.format(ip=self.entry.name, data=self._get_status(self.status_connect)[0]))
log.debug(f'({self.entry.name}) disconnect_from_host(): Issue detected with readyRead.disconnect')
log.debug(f'({self.entry.name}) disconnect_from_host(): '
f'Current status {self._get_status(self.status_connect)[0]}')
self.disconnectFromHost()
if abort:
self.change_status(E_NOT_CONNECTED)
@ -803,63 +768,63 @@ class PJLink(QtNetwork.QTcpSocket):
"""
Send command to retrieve shutter status.
"""
log.debug('({ip}) Sending AVMT command'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Sending AVMT command')
return self.send_command(cmd='AVMT', priority=priority)
def get_available_inputs(self):
"""
Send command to retrieve available source inputs.
"""
log.debug('({ip}) Sending INST command'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Sending INST command')
return self.send_command(cmd='INST')
def get_error_status(self):
"""
Send command to retrieve currently known errors.
"""
log.debug('({ip}) Sending ERST command'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Sending ERST command')
return self.send_command(cmd='ERST')
def get_input_source(self):
"""
Send command to retrieve currently selected source input.
"""
log.debug('({ip}) Sending INPT command'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Sending INPT command')
return self.send_command(cmd='INPT')
def get_lamp_status(self):
"""
Send command to return the lap status.
"""
log.debug('({ip}) Sending LAMP command'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Sending LAMP command')
return self.send_command(cmd='LAMP')
def get_manufacturer(self):
"""
Send command to retrieve manufacturer name.
"""
log.debug('({ip}) Sending INF1 command'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Sending INF1 command')
return self.send_command(cmd='INF1')
def get_model(self):
"""
Send command to retrieve the model name.
"""
log.debug('({ip}) Sending INF2 command'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Sending INF2 command')
return self.send_command(cmd='INF2')
def get_name(self):
"""
Send command to retrieve name as set by end-user (if set).
"""
log.debug('({ip}) Sending NAME command'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Sending NAME command')
return self.send_command(cmd='NAME')
def get_other_info(self):
"""
Send command to retrieve extra info set by manufacturer.
"""
log.debug('({ip}) Sending INFO command'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Sending INFO command')
return self.send_command(cmd='INFO')
def get_power_status(self, priority=False):
@ -868,14 +833,14 @@ class PJLink(QtNetwork.QTcpSocket):
:param priority: (OPTIONAL) Send in priority queue
"""
log.debug('({ip}) Sending POWR command'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Sending POWR command')
return self.send_command(cmd='POWR', priority=priority)
def set_audio_mute(self, priority=False):
"""
Send command to set audio to muted
"""
log.debug('({ip}) Setting AVMT to 21 (audio mute)'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Setting AVMT to 21 (audio mute)')
self.send_command(cmd='AVMT', opts='21', priority=True)
self.status_timer_add(cmd='AVMT', callback=self.get_av_mute_status)
self.poll_loop()
@ -884,7 +849,7 @@ class PJLink(QtNetwork.QTcpSocket):
"""
Send command to set audio to normal
"""
log.debug('({ip}) Setting AVMT to 20 (audio normal)'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Setting AVMT to 20 (audio normal)')
self.send_command(cmd='AVMT', opts='20', priority=True)
self.status_timer_add(cmd='AVMT', callback=self.get_av_mute_status)
self.poll_loop()
@ -896,12 +861,12 @@ class PJLink(QtNetwork.QTcpSocket):
:param src: Video source to select in projector
"""
log.debug('({ip}) set_input_source(src="{data}")'.format(ip=self.entry.name, data=src))
log.debug(f'({self.entry.name}) set_input_source(src="{src}")')
if self.source_available is None:
return
elif src not in self.source_available:
return
log.debug('({ip}) Setting input source to "{data}"'.format(ip=self.entry.name, data=src))
log.debug(f'({self.entry.name}) Setting input source to "{src}"')
self.send_command(cmd='INPT', opts=src, priority=True)
self.poll_loop()
@ -909,7 +874,7 @@ class PJLink(QtNetwork.QTcpSocket):
"""
Send command to turn power to on.
"""
log.debug('({ip}) Setting POWR to 1 (on)'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Setting POWR to 1 (on)')
self.send_command(cmd='POWR', opts='1', priority=True)
self.status_timer_add(cmd='POWR', callback=self.get_power_status)
self.poll_loop()
@ -918,7 +883,7 @@ class PJLink(QtNetwork.QTcpSocket):
"""
Send command to turn power to standby.
"""
log.debug('({ip}) Setting POWR to 0 (standby)'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Setting POWR to 0 (standby)')
self.send_command(cmd='POWR', opts='0', priority=True)
self.status_timer_add(cmd='POWR', callback=self.get_power_status)
self.poll_loop()
@ -927,7 +892,7 @@ class PJLink(QtNetwork.QTcpSocket):
"""
Send command to set shutter to closed position.
"""
log.debug('({ip}) Setting AVMT to 11 (shutter closed)'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Setting AVMT to 11 (shutter closed)')
self.send_command(cmd='AVMT', opts='11', priority=True)
self.status_timer_add('AVMT', self.get_av_mute_status)
self.poll_loop()
@ -936,7 +901,7 @@ class PJLink(QtNetwork.QTcpSocket):
"""
Send command to set shutter to open position.
"""
log.debug('({ip}) Setting AVMT to "10" (shutter open)'.format(ip=self.entry.name))
log.debug(f'({self.entry.name}) Setting AVMT to "10" (shutter open)')
self.send_command(cmd='AVMT', opts='10', priority=True)
self.status_timer_add('AVMT', self.get_av_mute_status)
self.poll_loop()
@ -949,9 +914,9 @@ class PJLink(QtNetwork.QTcpSocket):
:param callback: Method to call
"""
if cmd in self.status_timer_checks:
log.warning('({ip}) "{cmd}" already in checks - returning'.format(ip=self.entry.name, cmd=cmd))
log.warning(f'({self.entry.name}) "{cmd}" already in checks - returning')
return
log.debug('({ip}) Adding "{cmd}" callback for status timer'.format(ip=self.entry.name, cmd=cmd))
log.debug(f'({self.entry.name}) Adding "{cmd}" callback for status timer')
self.status_timer_checks[cmd] = callback
if not self.status_timer.isActive():
self.status_timer.start()
@ -964,9 +929,9 @@ class PJLink(QtNetwork.QTcpSocket):
:param callback: Method to call
"""
if cmd not in self.status_timer_checks:
log.warning('({ip}) "{cmd}" not listed in status timer - returning'.format(ip=self.entry.name, cmd=cmd))
log.warning(f'({self.entry.name}) "{cmd}" not listed in status timer - returning')
return
log.debug('({ip}) Removing "{cmd}" from status timer'.format(ip=self.entry.name, cmd=cmd))
log.debug(f'({self.entry.name}) Removing "{cmd}" from status timer')
self.status_timer_checks.pop(cmd)
if not self.status_timer_checks:
self.status_timer.stop()
@ -976,12 +941,11 @@ class PJLink(QtNetwork.QTcpSocket):
Call methods defined in status_timer_checks for updates
"""
if not self.status_timer_checks:
log.warning('({ip}) status_timer_update() called when no callbacks - '
'Race condition?'.format(ip=self.entry.name))
log.warning(f'({self.entry.name}) status_timer_update() called when no callbacks - Race condition?')
self.status_timer.stop()
return
for cmd, callback in self.status_timer_checks.items():
log.debug('({ip}) Status update call for {cmd}'.format(ip=self.entry.name, cmd=cmd))
log.debug(f'({self.entry.name}) Status update call for {cmd}')
callback(priority=True)
def receive_data_signal(self):

View File

@ -0,0 +1,293 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2022 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
"""
Test PJLink._get_status()
"""
import logging
import openlp.core.projectors.pjlink
from PyQt5 import QtNetwork
from unittest.mock import DEFAULT, MagicMock, patch
from openlp.core.projectors.constants import QSOCKET_STATE, STATUS_CODE, STATUS_MSG, \
E_GENERAL, S_CONNECTED, S_NOT_CONNECTED, S_CONNECTING, S_OK, \
PJLINK_PORT
from tests.resources.projector.data import TEST2_DATA
test_module = openlp.core.projectors.pjlink.__name__
def test_connect_to_host_connected(pjlink, caplog):
"""
Test connect_to_host returns when already connected
"""
# GIVEN: Test setup
caplog.set_level(logging.DEBUG)
logs = [(test_module, logging.DEBUG, f'({pjlink.entry.name}) connect_to_host(): Starting connection'),
(test_module, logging.WARNING, f'({pjlink.entry.name}) connect_to_host(): Already connected - returning')
]
mock_state = MagicMock()
mock_state.return_value = QSOCKET_STATE[S_CONNECTED]
# Set error_status to something not normally used for this test
pjlink.error_status = E_GENERAL
with patch.multiple(pjlink,
state=mock_state,
change_status=DEFAULT,
connectToHost=DEFAULT) as mock_pjlink:
# WHEN: Called
caplog.clear()
pjlink.connect_to_host()
# THEN: Appropriate entries and calls
assert caplog.record_tuples == logs, 'Invalid log entries'
assert pjlink.error_status == E_GENERAL, 'Error status should not have change'
mock_pjlink['change_status'].assert_not_called()
mock_pjlink['connectToHost']. assert_not_called()
def test_connect_to_host_not_connected(pjlink, caplog):
"""
Test connect_to_host calls appropriate methods to connect
"""
# GIVEN: Test setup
caplog.set_level(logging.DEBUG)
logs = [(test_module, logging.DEBUG, f'({pjlink.entry.name}) connect_to_host(): Starting connection'),
]
mock_state = MagicMock()
mock_state.return_value = QSOCKET_STATE[S_NOT_CONNECTED]
pjlink.error_status = E_GENERAL
with patch.multiple(pjlink,
state=mock_state,
change_status=DEFAULT,
connectToHost=DEFAULT) as mock_pjlink:
# WHEN: Called
caplog.clear()
pjlink.connect_to_host()
# THEN: Appropriate entries and calls
assert caplog.record_tuples == logs, 'Invalid log entries'
assert pjlink.error_status == S_OK, 'Error status should have changed to S_OK'
mock_pjlink['change_status'].assert_called_with(S_CONNECTING)
mock_pjlink['connectToHost'].assert_called_with(pjlink.ip, pjlink.port)
def test_get_buffer_me(pjlink, caplog):
"""
Test get_buffer() calls get_data()
"""
# NOTE: Verify pjlink.qhost == host.isEqual() works as expected
# NOTE: May have to fix get_buffer() on this test
# GIVEN: Test setup
t_host = pjlink.qhost
t_port = pjlink.port
t_data = "Test me with a spoon"
caplog.set_level(logging.DEBUG)
logs = [(test_module, logging.DEBUG, f'({pjlink.entry.name}) Received data from {t_host.toString()}'),
(test_module, logging.DEBUG, f'({pjlink.entry.name}) get_buffer(data="{t_data}")')
]
with patch.object(pjlink, 'get_data') as mock_data:
# WHEN: Called
pjlink.get_buffer(host=t_host, port=t_port, data=t_data)
# THEN: Appropriate logs and calls
assert caplog.record_tuples == logs, 'Invalid log entries'
mock_data.assert_called_with(buff=t_data)
def test_get_buffer_not_me(pjlink, caplog):
"""
Test get_buffer() returns without calls
"""
# GIVEN: Test setup
t_host = QtNetwork.QHostAddress(TEST2_DATA['ip'])
t_port = pjlink.port
t_data = "Test me with a spoon"
caplog.set_level(logging.DEBUG)
logs = [(test_module, logging.DEBUG, f'({pjlink.entry.name}) Ignoring data for {t_host.toString()} - not me')]
with patch.object(pjlink, 'get_data') as mock_data:
# WHEN: Called
pjlink.get_buffer(host=t_host, port=t_port, data=t_data)
# THEN: Appropriate logs and calls
assert caplog.record_tuples == logs, 'Invalid log entries'
mock_data.assert_not_called()
def test_get_buffer_wrong_port(pjlink, caplog):
"""
Test get_buffer() returns without calls
"""
# GIVEN: Test setup
t_host = pjlink.qhost
t_port = PJLINK_PORT
t_data = "Test me with a spoon"
caplog.set_level(logging.DEBUG)
logs = [(test_module, logging.DEBUG, f'({pjlink.entry.name}) Ignoring data for {t_host.toString()} - not me')]
with patch.object(pjlink, 'get_data') as mock_data:
# WHEN: Called
pjlink.get_buffer(host=t_host, port=t_port, data=t_data)
# THEN: Appropriate logs and calls
assert caplog.record_tuples == logs, 'Invalid log entries'
mock_data.assert_not_called()
def test_get_status_invalid_string(pjlink):
"""
Test get_status returns invalid when given a string
"""
# GIVEN: Test setup
t_status = "String"
# WHEN: Called with a string
t_code, t_msg = pjlink._get_status(status=t_status)
# THEN: Appropriate return values
assert t_code == -1, 'Should have returned code -1'
assert t_msg is None, 'Should have returned message None'
def test_get_status_invalid_string_digit(pjlink):
"""
Test get_status returns invalid when given a digit in a string
"""
# GIVEN: Test setup
t_status = "1"
# WHEN: Called with a string
t_code, t_msg = pjlink._get_status(status=t_status)
# THEN: Appropriate return values
assert t_code == -1, 'Should have returned code -1'
assert t_msg is None, 'Should have returned message None'
def test_get_status_invalid_code(pjlink):
"""
Test get_status returns invalid when given an invalid code
"""
# GIVEN: Test setup
t_status = E_GENERAL - 1
# WHEN: Called with a string
t_code, t_msg = pjlink._get_status(status=t_status)
# THEN: Appropriate return values
assert t_code is None, 'hould have returned code None'
assert t_msg is None, 'Should have returned message None'
def test_get_status_valid(pjlink):
"""
Test get_status returns valid status message
"""
# GIVEN: Test setup
t_status = E_GENERAL
# WHEN: Called with a string
t_code, t_msg = pjlink._get_status(status=t_status)
# THEN: Appropriate return values
assert t_code == STATUS_CODE[E_GENERAL], f'Should have returned "{STATUS_CODE[t_status]}"'
assert t_msg == STATUS_MSG[E_GENERAL], f'Should have returned "{STATUS_MSG[t_status]}"'
def test_receive_data_signal(pjlink):
"""
Test PJLink.receive_data_signal sets and sends valid signal
"""
# GIVEN: Test setup
pjlink.send_busy = True
with patch.object(pjlink, 'projectorReceivedData') as mock_receive:
# WHEN: Called
pjlink.receive_data_signal()
# THEN: Appropriate calls and settings
assert pjlink.send_busy is False, 'Did not clear send_busy'
mock_receive.emit.assert_called_once()
def test_status_timer_update_two_callbacks(pjlink, caplog):
"""
Test status_timer_update calls status_timer.stop when no updates listed
"""
# GIVEN: Test setup
t_cb1 = MagicMock()
t_cb2 = MagicMock()
pjlink.status_timer_checks = {'ONE': t_cb1,
'TWO': t_cb2}
caplog.set_level(logging.DEBUG)
logs = [(test_module, logging.DEBUG, f'({pjlink.entry.name}) Status update call for ONE'),
(test_module, logging.DEBUG, f'({pjlink.entry.name}) Status update call for TWO')]
with patch.object(pjlink, 'status_timer') as mock_timer:
# WHEN: Called
caplog.clear()
pjlink.status_timer_update()
# THEN: Returns with timer stop called
assert caplog.record_tuples == logs, 'Invalid log entries'
mock_timer.stop.assert_not_called()
t_cb1.assert_called_once_with(priority=True)
t_cb2.assert_called_once_with(priority=True)
def test_status_timer_update_empty(pjlink, caplog):
"""
Test status_timer_update calls status_timer.stop when no updates listed
"""
# GIVEN: Test setup
pjlink.status_timer_checks = {}
caplog.set_level(logging.DEBUG)
logs = [(test_module, logging.WARNING,
f'({pjlink.entry.name}) status_timer_update() called when no callbacks - Race condition?')]
with patch.object(pjlink, 'status_timer') as mock_timer:
# WHEN: Called
caplog.clear()
pjlink.status_timer_update()
# THEN: Returns with timer stop called
assert caplog.record_tuples == logs, 'Invalid log entries'
mock_timer.stop.assert_called_once()

View File

@ -26,7 +26,7 @@ from unittest.mock import MagicMock, call, patch
import openlp.core.projectors.pjlink
from openlp.core.projectors.pjlinkcommands import process_command
from openlp.core.projectors.constants import E_NOT_CONNECTED, E_PARAMETER, E_UNKNOWN_SOCKET_ERROR, QSOCKET_STATE, \
S_CONNECTED, S_CONNECTING, S_NOT_CONNECTED, S_OK, S_ON, STATUS_CODE, STATUS_MSG
S_CONNECTED, S_CONNECTING, S_OK, S_ON, STATUS_CODE, STATUS_MSG
@patch.object(openlp.core.projectors.pjlink.PJLink, 'changeStatus')
@ -433,47 +433,3 @@ def test_projector_get_power_status(mock_log, mock_send_command, pjlink):
# THEN: log data and send_command should have been called
mock_log.debug.assert_has_calls(log_debug_calls)
mock_send_command.assert_called_once_with(cmd=test_data, priority=False)
def test_projector_get_status_invalid(pjlink):
"""
Test to check returned information for error code
"""
# GIVEN: Test object
test_string = 'NaN test'
# WHEN: get_status called
code, message = pjlink._get_status(status=test_string)
# THEN: Proper data should have been returned
assert code == -1, 'Should have returned -1 as a bad status check'
assert message is None, 'Invalid code type should have returned None for message'
def test_projector_get_status_valid(pjlink):
"""
Test to check returned information for status codes
"""
# GIVEN: Test object
test_message = 'Not Connected'
# WHEN: get_status called
code, message = pjlink._get_status(status=S_NOT_CONNECTED)
# THEN: Proper strings should have been returned
assert code == 'S_NOT_CONNECTED', 'Code returned should have been the same code that was sent'
assert message == test_message, 'Description of code should have been returned'
def test_projector_get_status_unknown(pjlink):
"""
Test to check returned information for unknown code
"""
# GIVEN: Test object
# WHEN: get_status called
code, message = pjlink._get_status(status=9999)
# THEN: Proper strings should have been returned
assert code is None, 'Code returned should have been the same code that was sent'
assert message is None, 'Should have returned None as message'