forked from openlp/openlp
Bugfix 1593882 and 1593883 - projector authorization backport
- Fix exception when authenticated connection requested and pin is None - Fix pjlink authentication (use python hash instead of qt hash) - Fix md5_hash functions - Fix qmd5_hash functions - Added tests for bugfixes -------------------------------- lp:~alisonken1/openlp/2.4-bug-1593883-projector-authentication (revision 2637) [SUCCESS] https://ci.openlp.io/job/Branch-01-Pull/1630/ [SUCCESS] https://ci.openlp.io/job/Branch-02-Funct... bzr-revno: 2639 Fixes: https://launchpad.net/bugs/1593882, https://launchpad.net/bugs/1593883
This commit is contained in:
commit
ecdb379082
@ -219,7 +219,8 @@ def qmd5_hash(salt, data=None):
|
||||
log.debug('qmd5_hash(salt="%s"' % salt)
|
||||
hash_obj = QHash(QHash.Md5)
|
||||
hash_obj.addData(salt)
|
||||
hash_obj.addData(data)
|
||||
if data:
|
||||
hash_obj.addData(data)
|
||||
hash_value = hash_obj.result().toHex()
|
||||
log.debug('qmd5_hash() returning "%s"' % hash_value)
|
||||
return hash_value.data()
|
||||
|
@ -297,7 +297,11 @@ PJLINK_ERST_STATUS = {'0': ERROR_STRING[E_OK],
|
||||
PJLINK_POWR_STATUS = {'0': S_STANDBY,
|
||||
'1': S_ON,
|
||||
'2': S_COOLDOWN,
|
||||
'3': S_WARMUP}
|
||||
'3': S_WARMUP,
|
||||
S_STANDBY: '0',
|
||||
S_ON: '1',
|
||||
S_COOLDOWN: '2',
|
||||
S_WARMUP: '3'}
|
||||
|
||||
PJLINK_DEFAULT_SOURCES = {'1': translate('OpenLP.DB', 'RGB'),
|
||||
'2': translate('OpenLP.DB', 'Video'),
|
||||
|
@ -49,7 +49,7 @@ from codecs import decode
|
||||
from PyQt5.QtCore import pyqtSignal, pyqtSlot
|
||||
from PyQt5.QtNetwork import QAbstractSocket, QTcpSocket
|
||||
|
||||
from openlp.core.common import translate, qmd5_hash
|
||||
from openlp.core.common import translate, md5_hash
|
||||
from openlp.core.lib.projector.constants import *
|
||||
|
||||
# Shortcuts
|
||||
@ -58,7 +58,7 @@ SocketSTate = QAbstractSocket.SocketState
|
||||
|
||||
PJLINK_PREFIX = '%'
|
||||
PJLINK_CLASS = '1'
|
||||
PJLINK_HEADER = '%s%s' % (PJLINK_PREFIX, PJLINK_CLASS)
|
||||
PJLINK_HEADER = '{prefix}{linkclass}'.format(prefix=PJLINK_PREFIX, linkclass=PJLINK_CLASS)
|
||||
PJLINK_SUFFIX = CR
|
||||
|
||||
|
||||
@ -91,7 +91,7 @@ class PJLink1(QTcpSocket):
|
||||
:param poll_time: Time (in seconds) to poll connected projector
|
||||
:param socket_timeout: Time (in seconds) to abort the connection if no response
|
||||
"""
|
||||
log.debug('PJlink(args="%s" kwargs="%s")' % (args, kwargs))
|
||||
log.debug('PJlink(args={args} kwargs={kwargs})'.format(args=args, kwargs=kwargs))
|
||||
self.name = name
|
||||
self.ip = ip
|
||||
self.port = port
|
||||
@ -147,7 +147,7 @@ class PJLink1(QTcpSocket):
|
||||
"""
|
||||
Reset projector-specific information to default
|
||||
"""
|
||||
log.debug('(%s) reset_information() connect status is %s' % (self.ip, self.state()))
|
||||
log.debug('({ip}) reset_information() connect status is {state}'.format(ip=self.ip, state=self.state()))
|
||||
self.power = S_OFF
|
||||
self.pjlink_name = None
|
||||
self.manufacturer = None
|
||||
@ -160,8 +160,10 @@ class PJLink1(QTcpSocket):
|
||||
self.source = None
|
||||
self.other_info = None
|
||||
if hasattr(self, 'timer'):
|
||||
log.debug('({ip}): Calling timer.stop()'.format(ip=self.ip))
|
||||
self.timer.stop()
|
||||
if hasattr(self, 'socket_timer'):
|
||||
log.debug('({ip}): Calling socket_timer.stop()'.format(ip=self.ip))
|
||||
self.socket_timer.stop()
|
||||
self.send_queue = []
|
||||
self.send_busy = False
|
||||
@ -170,7 +172,7 @@ class PJLink1(QTcpSocket):
|
||||
"""
|
||||
Connects signals to methods when thread is started.
|
||||
"""
|
||||
log.debug('(%s) Thread starting' % self.ip)
|
||||
log.debug('({ip}) Thread starting'.format(ip=self.ip))
|
||||
self.i_am_running = True
|
||||
self.connected.connect(self.check_login)
|
||||
self.disconnected.connect(self.disconnect_from_host)
|
||||
@ -180,7 +182,7 @@ class PJLink1(QTcpSocket):
|
||||
"""
|
||||
Cleanups when thread is stopped.
|
||||
"""
|
||||
log.debug('(%s) Thread stopped' % self.ip)
|
||||
log.debug('({ip}) Thread stopped'.format(ip=self.ip))
|
||||
try:
|
||||
self.connected.disconnect(self.check_login)
|
||||
except TypeError:
|
||||
@ -206,7 +208,7 @@ class PJLink1(QTcpSocket):
|
||||
Aborts connection and closes socket in case of brain-dead projectors.
|
||||
Should normally be called by socket_timer().
|
||||
"""
|
||||
log.debug('(%s) socket_abort() - Killing connection' % self.ip)
|
||||
log.debug('({ip}) socket_abort() - Killing connection'.format(ip=self.ip))
|
||||
self.disconnect_from_host(abort=True)
|
||||
|
||||
def poll_loop(self):
|
||||
@ -216,7 +218,7 @@ class PJLink1(QTcpSocket):
|
||||
"""
|
||||
if self.state() != self.ConnectedState:
|
||||
return
|
||||
log.debug('(%s) Updating projector status' % self.ip)
|
||||
log.debug('({ip}) Updating projector status'.format(ip=self.ip))
|
||||
# Reset timer in case we were called from a set command
|
||||
if self.timer.interval() < self.poll_time:
|
||||
# Reset timer to 5 seconds
|
||||
@ -276,11 +278,17 @@ class PJLink1(QTcpSocket):
|
||||
self.status_connect = S_CONNECTED
|
||||
self.projector_status = status
|
||||
(status_code, status_message) = self._get_status(self.status_connect)
|
||||
log.debug('(%s) status_connect: %s: %s' % (self.ip, status_code, status_message if msg is None else msg))
|
||||
log.debug('({ip}) status_connect: {code}: "{message}"'.format(ip=self.ip,
|
||||
code=status_code,
|
||||
message=status_message if msg is None else msg))
|
||||
(status_code, status_message) = self._get_status(self.projector_status)
|
||||
log.debug('(%s) projector_status: %s: %s' % (self.ip, status_code, status_message if msg is None else msg))
|
||||
log.debug('({ip}) projector_status: {code}: "{message}"'.format(ip=self.ip,
|
||||
code=status_code,
|
||||
message=status_message if msg is None else msg))
|
||||
(status_code, status_message) = self._get_status(self.error_status)
|
||||
log.debug('(%s) error_status: %s: %s' % (self.ip, status_code, status_message if msg is None else msg))
|
||||
log.debug('({ip}) error_status: {code}: "{message}"'.format(ip=self.ip,
|
||||
code=status_code,
|
||||
message=status_message if msg is None else msg))
|
||||
self.changeStatus.emit(self.ip, status, message)
|
||||
|
||||
@pyqtSlot()
|
||||
@ -289,29 +297,31 @@ class PJLink1(QTcpSocket):
|
||||
Processes the initial connection and authentication (if needed).
|
||||
Starts poll timer if connection is established.
|
||||
|
||||
NOTE: Qt md5 hash function doesn't work with projector authentication. Use the python md5 hash function.
|
||||
|
||||
:param data: Optional data if called from another routine
|
||||
"""
|
||||
log.debug('(%s) check_login(data="%s")' % (self.ip, data))
|
||||
log.debug('({ip}) check_login(data="{data}")'.format(ip=self.ip, data=data))
|
||||
if data is None:
|
||||
# Reconnected setup?
|
||||
if not self.waitForReadyRead(2000):
|
||||
# Possible timeout issue
|
||||
log.error('(%s) Socket timeout waiting for login' % self.ip)
|
||||
log.error('({ip}) Socket timeout waiting for login'.format(ip=self.ip))
|
||||
self.change_status(E_SOCKET_TIMEOUT)
|
||||
return
|
||||
read = self.readLine(self.maxSize)
|
||||
dontcare = self.readLine(self.maxSize) # Clean out the trailing \r\n
|
||||
if read is None:
|
||||
log.warn('(%s) read is None - socket error?' % self.ip)
|
||||
log.warn('({ip}) read is None - socket error?'.format(ip=self.ip))
|
||||
return
|
||||
elif len(read) < 8:
|
||||
log.warn('(%s) Not enough data read)' % self.ip)
|
||||
log.warn('({ip}) Not enough data read)'.format(ip=self.ip))
|
||||
return
|
||||
data = decode(read, 'ascii')
|
||||
# Possibility of extraneous data on input when reading.
|
||||
# Clean out extraneous characters in buffer.
|
||||
dontcare = self.readLine(self.maxSize)
|
||||
log.debug('(%s) check_login() read "%s"' % (self.ip, data.strip()))
|
||||
log.debug('({ip}) check_login() read "{data}"'.format(ip=self.ip, data=data.strip()))
|
||||
# At this point, we should only have the initial login prompt with
|
||||
# possible authentication
|
||||
# PJLink initial login will be:
|
||||
@ -326,26 +336,35 @@ class PJLink1(QTcpSocket):
|
||||
else:
|
||||
# Process initial connection
|
||||
data_check = data.strip().split(' ')
|
||||
log.debug('(%s) data_check="%s"' % (self.ip, data_check))
|
||||
log.debug('({ip}) data_check="{data}"'.format(ip=self.ip, data=data_check))
|
||||
# Check for projector reporting an error
|
||||
if data_check[1].upper() == 'ERRA':
|
||||
# Authentication error
|
||||
self.disconnect_from_host()
|
||||
self.change_status(E_AUTHENTICATION)
|
||||
log.debug('(%s) emitting projectorAuthentication() signal' % self.name)
|
||||
log.debug('({ip}) emitting projectorAuthentication() signal'.format(ip=self.name))
|
||||
return
|
||||
elif data_check[1] == '0' and self.pin is not None:
|
||||
# Pin set and no authentication needed
|
||||
log.warning('({ip}) Regular connection but PIN set'.format(ip=self.name))
|
||||
self.disconnect_from_host()
|
||||
self.change_status(E_AUTHENTICATION)
|
||||
log.debug('(%s) emitting projectorNoAuthentication() signal' % self.name)
|
||||
log.debug('({ip}) Emitting projectorNoAuthentication() signal'.format(ip=self.name))
|
||||
self.projectorNoAuthentication.emit(self.name)
|
||||
return
|
||||
elif data_check[1] == '1':
|
||||
# Authenticated login with salt
|
||||
log.debug('(%s) Setting hash with salt="%s"' % (self.ip, data_check[2]))
|
||||
log.debug('(%s) pin="%s"' % (self.ip, self.pin))
|
||||
salt = qmd5_hash(salt=data_check[2].encode('ascii'), data=self.pin.encode('ascii'))
|
||||
if self.pin is None:
|
||||
log.warning('({ip}) Authenticated connection but no pin set'.format(ip=self.name))
|
||||
self.disconnect_from_host()
|
||||
self.change_status(E_AUTHENTICATION)
|
||||
log.debug('({ip}) Emitting projectorAuthentication() signal'.format(ip=self.name))
|
||||
self.projectorAuthentication.emit(self.name)
|
||||
return
|
||||
else:
|
||||
log.debug('({ip}) Setting hash with salt="{data}"'.format(ip=self.ip, data=data_check[2]))
|
||||
log.debug('({ip}) pin="{data}"'.format(ip=self.ip, data=self.pin))
|
||||
salt = md5_hash(salt=data_check[2].encode('ascii'), data=self.pin.encode('ascii'))
|
||||
else:
|
||||
salt = None
|
||||
# We're connected at this point, so go ahead and do regular I/O
|
||||
@ -355,7 +374,7 @@ class PJLink1(QTcpSocket):
|
||||
self.send_command(cmd='CLSS', salt=salt)
|
||||
self.waitForReadyRead()
|
||||
if (not self.no_poll) and (self.state() == self.ConnectedState):
|
||||
log.debug('(%s) Starting timer' % self.ip)
|
||||
log.debug('({ip}) Starting timer'.format(ip=self.ip))
|
||||
self.timer.setInterval(2000) # Set 2 seconds for initial information
|
||||
self.timer.start()
|
||||
|
||||
@ -364,15 +383,15 @@ class PJLink1(QTcpSocket):
|
||||
"""
|
||||
Socket interface to retrieve data.
|
||||
"""
|
||||
log.debug('(%s) get_data(): Reading data' % self.ip)
|
||||
log.debug('({ip}) get_data(): Reading data'.format(ip=self.ip))
|
||||
if self.state() != self.ConnectedState:
|
||||
log.debug('(%s) get_data(): Not connected - returning' % self.ip)
|
||||
log.debug('({ip}) get_data(): Not connected - returning'.format(ip=self.ip))
|
||||
self.send_busy = False
|
||||
return
|
||||
read = self.readLine(self.maxSize)
|
||||
if read == -1:
|
||||
# No data available
|
||||
log.debug('(%s) get_data(): No data available (-1)' % self.ip)
|
||||
log.debug('({ip}) get_data(): No data available (-1)'.format(ip=self.ip))
|
||||
self.send_busy = False
|
||||
self.projectorReceivedData.emit()
|
||||
return
|
||||
@ -382,11 +401,11 @@ class PJLink1(QTcpSocket):
|
||||
data = data_in.strip()
|
||||
if len(data) < 7:
|
||||
# Not enough data for a packet
|
||||
log.debug('(%s) get_data(): Packet length < 7: "%s"' % (self.ip, data))
|
||||
log.debug('({ip}) get_data(): Packet length < 7: "{data}"'.format(ip=self.ip, data=data))
|
||||
self.send_busy = False
|
||||
self.projectorReceivedData.emit()
|
||||
return
|
||||
log.debug('(%s) get_data(): Checking new data "%s"' % (self.ip, data))
|
||||
log.debug('({ip}) get_data(): Checking new data "{data}"'.format(ip=self.ip, data=data))
|
||||
if data.upper().startswith('PJLINK'):
|
||||
# Reconnected from remote host disconnect ?
|
||||
self.check_login(data)
|
||||
@ -394,7 +413,7 @@ class PJLink1(QTcpSocket):
|
||||
self.projectorReceivedData.emit()
|
||||
return
|
||||
elif '=' not in data:
|
||||
log.warn('(%s) get_data(): Invalid packet received' % self.ip)
|
||||
log.warn('({ip}) get_data(): Invalid packet received'.format(ip=self.ip))
|
||||
self.send_busy = False
|
||||
self.projectorReceivedData.emit()
|
||||
return
|
||||
@ -402,15 +421,15 @@ class PJLink1(QTcpSocket):
|
||||
try:
|
||||
(prefix, class_, cmd, data) = (data_split[0][0], data_split[0][1], data_split[0][2:], data_split[1])
|
||||
except ValueError as e:
|
||||
log.warn('(%s) get_data(): Invalid packet - expected header + command + data' % self.ip)
|
||||
log.warn('(%s) get_data(): Received data: "%s"' % (self.ip, read))
|
||||
log.warn('({ip}) get_data(): Invalid packet - expected header + command + data'.format(ip=self.ip))
|
||||
log.warn('({ip}) get_data(): Received data: "{data}"'.format(ip=self.ip, data=data_in.strip()))
|
||||
self.change_status(E_INVALID_DATA)
|
||||
self.send_busy = False
|
||||
self.projectorReceivedData.emit()
|
||||
return
|
||||
|
||||
if not (self.pjlink_class in PJLINK_VALID_CMD and cmd in PJLINK_VALID_CMD[self.pjlink_class]):
|
||||
log.warn('(%s) get_data(): Invalid packet - unknown command "%s"' % (self.ip, cmd))
|
||||
log.warn('({ip}) get_data(): Invalid packet - unknown command "{data}"'.format(ip=self.ip, data=cmd))
|
||||
self.send_busy = False
|
||||
self.projectorReceivedData.emit()
|
||||
return
|
||||
@ -424,7 +443,7 @@ class PJLink1(QTcpSocket):
|
||||
|
||||
:param err: Error code
|
||||
"""
|
||||
log.debug('(%s) get_error(err=%s): %s' % (self.ip, err, self.errorString()))
|
||||
log.debug('({ip}) get_error(err={error}): {data}'.format(ip=self.ip, error=err, data=self.errorString()))
|
||||
if err <= 18:
|
||||
# QSocket errors. Redefined in projector.constants so we don't mistake
|
||||
# them for system errors
|
||||
@ -453,32 +472,35 @@ class PJLink1(QTcpSocket):
|
||||
:param queue: Option to force add to queue rather than sending directly
|
||||
"""
|
||||
if self.state() != self.ConnectedState:
|
||||
log.warn('(%s) send_command(): Not connected - returning' % self.ip)
|
||||
log.warn('({ip}) send_command(): Not connected - returning'.format(ip=self.ip))
|
||||
self.send_queue = []
|
||||
return
|
||||
self.projectorNetwork.emit(S_NETWORK_SENDING)
|
||||
log.debug('(%s) send_command(): Building cmd="%s" opts="%s" %s' % (self.ip,
|
||||
cmd,
|
||||
opts,
|
||||
'' if salt is None else 'with hash'))
|
||||
if salt is None:
|
||||
out = '%s%s %s%s' % (PJLINK_HEADER, cmd, opts, CR)
|
||||
else:
|
||||
out = '%s%s%s %s%s' % (salt, PJLINK_HEADER, cmd, opts, CR)
|
||||
log.debug('({ip}) send_command(): Building cmd="{command}" opts="{data}"{salt}'.format(ip=self.ip,
|
||||
command=cmd,
|
||||
data=opts,
|
||||
salt='' if salt is None
|
||||
else ' with hash'))
|
||||
out = '{salt}{header}{command} {options}{suffix}'.format(salt="" if salt is None else salt,
|
||||
header=PJLINK_HEADER,
|
||||
command=cmd,
|
||||
options=opts,
|
||||
suffix=CR)
|
||||
if out in self.send_queue:
|
||||
# Already there, so don't add
|
||||
log.debug('(%s) send_command(out="%s") Already in queue - skipping' % (self.ip, out.strip()))
|
||||
log.debug('({ip}) send_command(out="{data}") Already in queue - skipping'.format(ip=self.ip,
|
||||
data=out.strip()))
|
||||
elif not queue and len(self.send_queue) == 0:
|
||||
# Nothing waiting to send, so just send it
|
||||
log.debug('(%s) send_command(out="%s") Sending data' % (self.ip, out.strip()))
|
||||
log.debug('({ip}) send_command(out="{data}") Sending data'.format(ip=self.ip, data=out.strip()))
|
||||
return self._send_command(data=out)
|
||||
else:
|
||||
log.debug('(%s) send_command(out="%s") adding to queue' % (self.ip, out.strip()))
|
||||
log.debug('({ip}) send_command(out="{data}") adding to queue'.format(ip=self.ip, data=out.strip()))
|
||||
self.send_queue.append(out)
|
||||
self.projectorReceivedData.emit()
|
||||
log.debug('(%s) send_command(): send_busy is %s' % (self.ip, self.send_busy))
|
||||
log.debug('({ip}) send_command(): send_busy is {data}'.format(ip=self.ip, data=self.send_busy))
|
||||
if not self.send_busy:
|
||||
log.debug('(%s) send_command() calling _send_string()')
|
||||
log.debug('({ip}) send_command() calling _send_string()'.format(ip=self.ip))
|
||||
self._send_command()
|
||||
|
||||
@pyqtSlot()
|
||||
@ -488,10 +510,10 @@ class PJLink1(QTcpSocket):
|
||||
|
||||
:param data: Immediate data to send
|
||||
"""
|
||||
log.debug('(%s) _send_string()' % self.ip)
|
||||
log.debug('(%s) _send_string(): Connection status: %s' % (self.ip, self.state()))
|
||||
log.debug('({ip}) _send_string()'.format(ip=self.ip))
|
||||
log.debug('({ip}) _send_string(): Connection status: {data}'.format(ip=self.ip, data=self.state()))
|
||||
if self.state() != self.ConnectedState:
|
||||
log.debug('(%s) _send_string() Not connected - abort' % self.ip)
|
||||
log.debug('({ip}) _send_string() Not connected - abort'.format(ip=self.ip))
|
||||
self.send_queue = []
|
||||
self.send_busy = False
|
||||
return
|
||||
@ -500,30 +522,26 @@ class PJLink1(QTcpSocket):
|
||||
return
|
||||
if data is not None:
|
||||
out = data
|
||||
log.debug('(%s) _send_string(data=%s)' % (self.ip, out.strip()))
|
||||
log.debug('({ip}) _send_string(data="{data}")'.format(ip=self.ip, data=out.strip()))
|
||||
elif len(self.send_queue) != 0:
|
||||
out = self.send_queue.pop(0)
|
||||
log.debug('(%s) _send_string(queued data=%s)' % (self.ip, out.strip()))
|
||||
log.debug('({ip}) _send_string(queued data="{data}"%s)'.format(ip=self.ip, data=out.strip()))
|
||||
else:
|
||||
# No data to send
|
||||
log.debug('(%s) _send_string(): No data to send' % self.ip)
|
||||
log.debug('({ip}) _send_string(): No data to send'.format(ip=self.ip))
|
||||
self.send_busy = False
|
||||
return
|
||||
self.send_busy = True
|
||||
log.debug('(%s) _send_string(): Sending "%s"' % (self.ip, out.strip()))
|
||||
log.debug('(%s) _send_string(): Queue = %s' % (self.ip, self.send_queue))
|
||||
log.debug('({ip}) _send_string(): Sending "{data}"'.format(ip=self.ip, data=out.strip()))
|
||||
log.debug('({ip}) _send_string(): Queue = {data}'.format(ip=self.ip, data=self.send_queue))
|
||||
self.socket_timer.start()
|
||||
try:
|
||||
self.projectorNetwork.emit(S_NETWORK_SENDING)
|
||||
sent = self.write(out.encode('ascii'))
|
||||
self.waitForBytesWritten(2000) # 2 seconds should be enough
|
||||
if sent == -1:
|
||||
# Network error?
|
||||
self.change_status(E_NETWORK,
|
||||
translate('OpenLP.PJLink1', 'Error while sending data to projector'))
|
||||
except SocketError as e:
|
||||
self.disconnect_from_host(abort=True)
|
||||
self.changeStatus(E_NETWORK, '%s : %s' % (e.error(), e.errorString()))
|
||||
self.projectorNetwork.emit(S_NETWORK_SENDING)
|
||||
sent = self.write(out.encode('ascii'))
|
||||
self.waitForBytesWritten(2000) # 2 seconds should be enough
|
||||
if sent == -1:
|
||||
# Network error?
|
||||
self.change_status(E_NETWORK,
|
||||
translate('OpenLP.PJLink1', 'Error while sending data to projector'))
|
||||
|
||||
def process_command(self, cmd, data):
|
||||
"""
|
||||
@ -532,19 +550,21 @@ class PJLink1(QTcpSocket):
|
||||
:param cmd: Command to process
|
||||
:param data: Data being processed
|
||||
"""
|
||||
log.debug('(%s) Processing command "%s"' % (self.ip, cmd))
|
||||
log.debug('({ip}) Processing command "{data}"'.format(ip=self.ip, data=cmd))
|
||||
if data in PJLINK_ERRORS:
|
||||
# Oops - projector error
|
||||
log.error('({ip}) Projector returned error "{data}"'.format(ip=self.ip, data=data))
|
||||
if data.upper() == 'ERRA':
|
||||
# Authentication error
|
||||
self.disconnect_from_host()
|
||||
self.change_status(E_AUTHENTICATION)
|
||||
log.debug('(%s) emitting projectorAuthentication() signal' % self.ip)
|
||||
log.debug('({ip}) emitting projectorAuthentication() signal'.format(ip=self.ip))
|
||||
self.projectorAuthentication.emit(self.name)
|
||||
elif data.upper() == 'ERR1':
|
||||
# Undefined command
|
||||
self.change_status(E_UNDEFINED, '%s "%s"' %
|
||||
(translate('OpenLP.PJLink1', 'Undefined command:'), cmd))
|
||||
self.change_status(E_UNDEFINED, '{error} "{data}"'.format(error=translate('OpenLP.PJLink1',
|
||||
'Undefined command:'),
|
||||
data=cmd))
|
||||
elif data.upper() == 'ERR2':
|
||||
# Invalid parameter
|
||||
self.change_status(E_PARAMETER)
|
||||
@ -559,7 +579,7 @@ class PJLink1(QTcpSocket):
|
||||
return
|
||||
# Command succeeded - no extra information
|
||||
elif data.upper() == 'OK':
|
||||
log.debug('(%s) Command returned OK' % self.ip)
|
||||
log.debug('({ip}) Command returned OK'.format(ip=self.ip))
|
||||
# A command returned successfully, recheck data
|
||||
self.send_busy = False
|
||||
self.projectorReceivedData.emit()
|
||||
@ -568,7 +588,7 @@ class PJLink1(QTcpSocket):
|
||||
if cmd in self.PJLINK1_FUNC:
|
||||
self.PJLINK1_FUNC[cmd](data)
|
||||
else:
|
||||
log.warn('(%s) Invalid command %s' % (self.ip, cmd))
|
||||
log.warn('({ip}) Invalid command {data}'.format(ip=self.ip, data=cmd))
|
||||
self.send_busy = False
|
||||
self.projectorReceivedData.emit()
|
||||
|
||||
@ -587,7 +607,7 @@ class PJLink1(QTcpSocket):
|
||||
fill = {'Hours': int(data_dict[0]), 'On': False if data_dict[1] == '0' else True}
|
||||
except ValueError:
|
||||
# In case of invalid entry
|
||||
log.warn('(%s) process_lamp(): Invalid data "%s"' % (self.ip, data))
|
||||
log.warn('({ip}) process_lamp(): Invalid data "{data}"'.format(ip=self.ip, data=data))
|
||||
return
|
||||
lamps.append(fill)
|
||||
data_dict.pop(0) # Remove lamp hours
|
||||
@ -614,7 +634,7 @@ class PJLink1(QTcpSocket):
|
||||
self.send_command('INST')
|
||||
else:
|
||||
# Log unknown status response
|
||||
log.warn('Unknown power response: %s' % data)
|
||||
log.warn('({ip}) Unknown power response: {data}'.format(ip=self.ip, data=data))
|
||||
return
|
||||
|
||||
def process_avmt(self, data):
|
||||
@ -639,7 +659,7 @@ class PJLink1(QTcpSocket):
|
||||
shutter = True
|
||||
mute = True
|
||||
else:
|
||||
log.warn('Unknown shutter response: %s' % data)
|
||||
log.warn('({ip}) Unknown shutter response: {data}'.format(ip=self.ip, data=data))
|
||||
update_icons = shutter != self.shutter
|
||||
update_icons = update_icons or mute != self.mute
|
||||
self.shutter = shutter
|
||||
@ -656,6 +676,7 @@ class PJLink1(QTcpSocket):
|
||||
:param data: Currently selected source
|
||||
"""
|
||||
self.source = data
|
||||
log.info('({ip}) Setting data source to "{data}"'.format(ip=self.ip, data=self.source))
|
||||
return
|
||||
|
||||
def process_clss(self, data):
|
||||
@ -674,7 +695,8 @@ class PJLink1(QTcpSocket):
|
||||
else:
|
||||
clss = data
|
||||
self.pjlink_class = clss
|
||||
log.debug('(%s) Setting pjlink_class for this projector to "%s"' % (self.ip, self.pjlink_class))
|
||||
log.debug('({ip}) Setting pjlink_class for this projector to "{data}"'.format(ip=self.ip,
|
||||
data=self.pjlink_class))
|
||||
return
|
||||
|
||||
def process_name(self, data):
|
||||
@ -685,6 +707,7 @@ class PJLink1(QTcpSocket):
|
||||
:param data: Projector name
|
||||
"""
|
||||
self.pjlink_name = data
|
||||
log.debug('({ip}) Setting projector PJLink name to "{data}"'.format(ip=self.ip, data=self.pjlink_name))
|
||||
return
|
||||
|
||||
def process_inf1(self, data):
|
||||
@ -695,6 +718,7 @@ class PJLink1(QTcpSocket):
|
||||
:param data: Projector manufacturer
|
||||
"""
|
||||
self.manufacturer = data
|
||||
log.debug('({ip}) Setting projector manufacturer data to "{data}"'.format(ip=self.ip, data=self.manufacturer))
|
||||
return
|
||||
|
||||
def process_inf2(self, data):
|
||||
@ -705,6 +729,7 @@ class PJLink1(QTcpSocket):
|
||||
:param data: Model name
|
||||
"""
|
||||
self.model = data
|
||||
log.debug('({ip}) Setting projector model to "{data}"'.format(ip=self.ip, data=self.model))
|
||||
return
|
||||
|
||||
def process_info(self, data):
|
||||
@ -715,6 +740,7 @@ class PJLink1(QTcpSocket):
|
||||
:param data: Projector other info
|
||||
"""
|
||||
self.other_info = data
|
||||
log.debug('({ip}) Setting projector other_info to "{data}"'.format(ip=self.ip, data=self.other_info))
|
||||
return
|
||||
|
||||
def process_inst(self, data):
|
||||
@ -731,6 +757,8 @@ class PJLink1(QTcpSocket):
|
||||
sources.sort()
|
||||
self.source_available = sources
|
||||
self.projectorUpdateIcons.emit()
|
||||
log.debug('({ip}) Setting projector sources_available to "{data}"'.format(ip=self.ip,
|
||||
data=self.source_available))
|
||||
return
|
||||
|
||||
def process_erst(self, data):
|
||||
@ -780,7 +808,7 @@ class PJLink1(QTcpSocket):
|
||||
Initiate connection to projector.
|
||||
"""
|
||||
if self.state() == self.ConnectedState:
|
||||
log.warn('(%s) connect_to_host(): Already connected - returning' % self.ip)
|
||||
log.warn('({ip}) connect_to_host(): Already connected - returning'.format(ip=self.ip))
|
||||
return
|
||||
self.change_status(S_CONNECTING)
|
||||
self.connectToHost(self.ip, self.port if type(self.port) is int else int(self.port))
|
||||
@ -792,9 +820,9 @@ class PJLink1(QTcpSocket):
|
||||
"""
|
||||
if abort or self.state() != self.ConnectedState:
|
||||
if abort:
|
||||
log.warn('(%s) disconnect_from_host(): Aborting connection' % self.ip)
|
||||
log.warn('({ip}) disconnect_from_host(): Aborting connection'.format(ip=self.ip))
|
||||
else:
|
||||
log.warn('(%s) disconnect_from_host(): Not connected - returning' % self.ip)
|
||||
log.warn('({ip}) disconnect_from_host(): Not connected - returning'.format(ip=self.ip))
|
||||
self.reset_information()
|
||||
self.disconnectFromHost()
|
||||
try:
|
||||
@ -804,8 +832,8 @@ class PJLink1(QTcpSocket):
|
||||
if abort:
|
||||
self.change_status(E_NOT_CONNECTED)
|
||||
else:
|
||||
log.debug('(%s) disconnect_from_host() Current status %s' % (self.ip,
|
||||
self._get_status(self.status_connect)[0]))
|
||||
log.debug('({ip}) disconnect_from_host() '
|
||||
'Current status {data}'.format(ip=self.ip, data=self._get_status(self.status_connect)[0]))
|
||||
if self.status_connect != E_NOT_CONNECTED:
|
||||
self.change_status(S_NOT_CONNECTED)
|
||||
self.reset_information()
|
||||
@ -815,60 +843,70 @@ class PJLink1(QTcpSocket):
|
||||
"""
|
||||
Send command to retrieve available source inputs.
|
||||
"""
|
||||
log.debug('({ip}) Sending INST command'.format(ip=self.ip))
|
||||
return self.send_command(cmd='INST')
|
||||
|
||||
def get_error_status(self):
|
||||
"""
|
||||
Send command to retrieve currently known errors.
|
||||
"""
|
||||
log.debug('({ip}) Sending ERST command'.format(ip=self.ip))
|
||||
return self.send_command(cmd='ERST')
|
||||
|
||||
def get_input_source(self):
|
||||
"""
|
||||
Send command to retrieve currently selected source input.
|
||||
"""
|
||||
log.debug('({ip}) Sending INPT command'.format(ip=self.ip))
|
||||
return self.send_command(cmd='INPT')
|
||||
|
||||
def get_lamp_status(self):
|
||||
"""
|
||||
Send command to return the lap status.
|
||||
"""
|
||||
log.debug('({ip}) Sending LAMP command'.format(ip=self.ip))
|
||||
return self.send_command(cmd='LAMP')
|
||||
|
||||
def get_manufacturer(self):
|
||||
"""
|
||||
Send command to retrieve manufacturer name.
|
||||
"""
|
||||
log.debug('({ip}) Sending INF1 command'.format(ip=self.ip))
|
||||
return self.send_command(cmd='INF1')
|
||||
|
||||
def get_model(self):
|
||||
"""
|
||||
Send command to retrieve the model name.
|
||||
"""
|
||||
log.debug('({ip}) Sending INF2 command'.format(ip=self.ip))
|
||||
return self.send_command(cmd='INF2')
|
||||
|
||||
def get_name(self):
|
||||
"""
|
||||
Send command to retrieve name as set by end-user (if set).
|
||||
"""
|
||||
log.debug('({ip}) Sending NAME command'.format(ip=self.ip))
|
||||
return self.send_command(cmd='NAME')
|
||||
|
||||
def get_other_info(self):
|
||||
"""
|
||||
Send command to retrieve extra info set by manufacturer.
|
||||
"""
|
||||
log.debug('({ip}) Sending INFO command'.format(ip=self.ip))
|
||||
return self.send_command(cmd='INFO')
|
||||
|
||||
def get_power_status(self):
|
||||
"""
|
||||
Send command to retrieve power status.
|
||||
"""
|
||||
log.debug('({ip}) Sending POWR command'.format(ip=self.ip))
|
||||
return self.send_command(cmd='POWR')
|
||||
|
||||
def get_shutter_status(self):
|
||||
"""
|
||||
Send command to retrieve shutter status.
|
||||
"""
|
||||
log.debug('({ip}) Sending AVMT command'.format(ip=self.ip))
|
||||
return self.send_command(cmd='AVMT')
|
||||
|
||||
def set_input_source(self, src=None):
|
||||
@ -878,12 +916,12 @@ class PJLink1(QTcpSocket):
|
||||
|
||||
:param src: Video source to select in projector
|
||||
"""
|
||||
log.debug('(%s) set_input_source(src=%s)' % (self.ip, src))
|
||||
log.debug('({ip}) set_input_source(src="{data}")'.format(ip=self.ip, data=src))
|
||||
if self.source_available is None:
|
||||
return
|
||||
elif src not in self.source_available:
|
||||
return
|
||||
log.debug('(%s) Setting input source to %s' % (self.ip, src))
|
||||
log.debug('({ip}) Setting input source to "{data}"'.format(ip=self.ip, data=src))
|
||||
self.send_command(cmd='INPT', opts=src)
|
||||
self.poll_loop()
|
||||
|
||||
@ -891,6 +929,7 @@ class PJLink1(QTcpSocket):
|
||||
"""
|
||||
Send command to turn power to on.
|
||||
"""
|
||||
log.debug('({ip}) Setting POWR to 1 (on)'.format(ip=self.ip))
|
||||
self.send_command(cmd='POWR', opts='1')
|
||||
self.poll_loop()
|
||||
|
||||
@ -898,6 +937,7 @@ class PJLink1(QTcpSocket):
|
||||
"""
|
||||
Send command to turn power to standby.
|
||||
"""
|
||||
log.debug('({ip}) Setting POWR to 0 (standby)'.format(ip=self.ip))
|
||||
self.send_command(cmd='POWR', opts='0')
|
||||
self.poll_loop()
|
||||
|
||||
@ -905,6 +945,7 @@ class PJLink1(QTcpSocket):
|
||||
"""
|
||||
Send command to set shutter to closed position.
|
||||
"""
|
||||
log.debug('({ip}) Setting AVMT to 11 (shutter closed)'.format(ip=self.ip))
|
||||
self.send_command(cmd='AVMT', opts='11')
|
||||
self.poll_loop()
|
||||
|
||||
@ -912,5 +953,6 @@ class PJLink1(QTcpSocket):
|
||||
"""
|
||||
Send command to set shutter to open position.
|
||||
"""
|
||||
log.debug('({ip}) Setting AVMT to "10" (shutter open)'.format(ip=self.ip))
|
||||
self.send_command(cmd='AVMT', opts='10')
|
||||
self.poll_loop()
|
||||
|
@ -23,13 +23,12 @@
|
||||
Package to test the openlp.core.ui.projector.networkutils package.
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from unittest import TestCase
|
||||
|
||||
from openlp.core.common import verify_ip_address, md5_hash, qmd5_hash
|
||||
|
||||
from tests.resources.projector.data import TEST_PIN, TEST_SALT, TEST_HASH
|
||||
|
||||
salt = TEST_SALT
|
||||
pin = TEST_PIN
|
||||
test_hash = TEST_HASH
|
||||
@ -148,7 +147,7 @@ class testProjectorUtilities(TestCase):
|
||||
hash_ = qmd5_hash(salt=salt.encode('ascii'), data=pin.encode('ascii'))
|
||||
|
||||
# THEN: Validate return has is same
|
||||
self.assertEquals(hash_.decode('ascii'), test_hash, 'Qt-MD5 should have returned a good hash')
|
||||
self.assertEquals(hash_, test_hash.encode('ascii'), 'Qt-MD5 should have returned a good hash')
|
||||
|
||||
def test_qmd5_hash_bad(self):
|
||||
"""
|
||||
@ -158,7 +157,7 @@ class testProjectorUtilities(TestCase):
|
||||
hash_ = qmd5_hash(salt=pin.encode('ascii'), data=salt.encode('ascii'))
|
||||
|
||||
# THEN: return data is different
|
||||
self.assertNotEquals(hash_.decode('ascii'), test_hash, 'Qt-MD5 should have returned a bad hash')
|
||||
self.assertNotEquals(hash_, test_hash, 'Qt-MD5 should have returned a bad hash')
|
||||
|
||||
def test_md5_non_ascii_string(self):
|
||||
"""
|
||||
|
@ -26,13 +26,29 @@ Package to test the openlp.core.lib.projector.pjlink1 package.
|
||||
from unittest import TestCase
|
||||
|
||||
from openlp.core.lib.projector.pjlink1 import PJLink1
|
||||
from openlp.core.lib.projector.constants import E_PARAMETER, ERROR_STRING, S_OFF, S_STANDBY, S_WARMUP, S_ON, \
|
||||
S_COOLDOWN, PJLINK_POWR_STATUS
|
||||
|
||||
from tests.functional import patch
|
||||
from tests.resources.projector.data import TEST_PIN, TEST_SALT, TEST_CONNECT_AUTHENTICATE
|
||||
from tests.resources.projector.data import TEST_PIN, TEST_SALT, TEST_CONNECT_AUTHENTICATE, TEST_HASH
|
||||
|
||||
pjlink_test = PJLink1(name='test', ip='127.0.0.1', pin=TEST_PIN, no_poll=True)
|
||||
|
||||
|
||||
class DummyTimer(object):
|
||||
'''
|
||||
Dummy class to fake timers
|
||||
'''
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def start(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def stop(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
class TestPJLink(TestCase):
|
||||
"""
|
||||
Tests for the PJLink module
|
||||
@ -41,13 +57,10 @@ class TestPJLink(TestCase):
|
||||
@patch.object(pjlink_test, 'send_command')
|
||||
@patch.object(pjlink_test, 'waitForReadyRead')
|
||||
@patch('openlp.core.common.qmd5_hash')
|
||||
def authenticated_connection_call_test(self,
|
||||
mock_qmd5_hash,
|
||||
mock_waitForReadyRead,
|
||||
mock_send_command,
|
||||
def test_authenticated_connection_call(self, mock_qmd5_hash, mock_waitForReadyRead, mock_send_command,
|
||||
mock_readyRead):
|
||||
"""
|
||||
Fix for projector connect with PJLink authentication exception. Ticket 92187.
|
||||
Ticket 92187: Fix for projector connect with PJLink authentication exception.
|
||||
"""
|
||||
# GIVEN: Test object
|
||||
pjlink = pjlink_test
|
||||
@ -61,9 +74,23 @@ class TestPJLink(TestCase):
|
||||
self.assertTrue(mock_qmd5_hash.called_with(TEST_PIN,
|
||||
"Connection request should have been called with TEST_PIN"))
|
||||
|
||||
def non_standard_class_reply_test(self):
|
||||
def test_projector_class(self):
|
||||
"""
|
||||
bugfix 1550891 - CLSS request returns non-standard 'Class N' reply
|
||||
Test class version from projector
|
||||
"""
|
||||
# GIVEN: Test object
|
||||
pjlink = pjlink_test
|
||||
|
||||
# WHEN: Process class response
|
||||
pjlink.process_clss('1')
|
||||
|
||||
# THEN: Projector class should be set to 1
|
||||
self.assertEquals(pjlink.pjlink_class, '1',
|
||||
'Projector should have returned class=1')
|
||||
|
||||
def test_non_standard_class_reply(self):
|
||||
"""
|
||||
Bugfix 1550891: CLSS request returns non-standard 'Class N' reply
|
||||
"""
|
||||
# GIVEN: Test object
|
||||
pjlink = pjlink_test
|
||||
@ -74,3 +101,284 @@ class TestPJLink(TestCase):
|
||||
# THEN: Projector class should be set with proper value
|
||||
self.assertEquals(pjlink.pjlink_class, '1',
|
||||
'Non-standard class reply should have set proper class')
|
||||
|
||||
@patch.object(pjlink_test, 'change_status')
|
||||
def test_status_change(self, mock_change_status):
|
||||
"""
|
||||
Test process_command call with ERR2 (Parameter) status
|
||||
"""
|
||||
# GIVEN: Test object
|
||||
pjlink = pjlink_test
|
||||
|
||||
# WHEN: process_command is called with "ERR2" status from projector
|
||||
pjlink.process_command('POWR', 'ERR2')
|
||||
|
||||
# THEN: change_status should have called change_status with E_UNDEFINED
|
||||
# as first parameter
|
||||
mock_change_status.called_with(E_PARAMETER,
|
||||
'change_status should have been called with "{}"'.format(
|
||||
ERROR_STRING[E_PARAMETER]))
|
||||
|
||||
@patch.object(pjlink_test, 'process_inpt')
|
||||
def test_projector_return_ok(self, mock_process_inpt):
|
||||
"""
|
||||
Test projector calls process_inpt command when process_command is called with INPT option
|
||||
"""
|
||||
# GIVEN: Test object
|
||||
pjlink = pjlink_test
|
||||
|
||||
# WHEN: process_command is called with INST command and 31 input:
|
||||
pjlink.process_command('INPT', '31')
|
||||
|
||||
# THEN: process_inpt method should have been called with 31
|
||||
mock_process_inpt.called_with('31',
|
||||
"process_inpt should have been called with 31")
|
||||
|
||||
@patch.object(pjlink_test, 'projectorReceivedData')
|
||||
def test_projector_process_lamp(self, mock_projectorReceivedData):
|
||||
"""
|
||||
Test status lamp on/off and hours
|
||||
"""
|
||||
# GIVEN: Test object
|
||||
pjlink = pjlink_test
|
||||
|
||||
# WHEN: Call process_command with lamp data
|
||||
pjlink.process_command('LAMP', '22222 1')
|
||||
|
||||
# THEN: Lamp should have been set with status=ON and hours=22222
|
||||
self.assertEquals(pjlink.lamp[0]['On'], True,
|
||||
'Lamp power status should have been set to TRUE')
|
||||
self.assertEquals(pjlink.lamp[0]['Hours'], 22222,
|
||||
'Lamp hours should have been set to 22222')
|
||||
|
||||
@patch.object(pjlink_test, 'projectorReceivedData')
|
||||
def test_projector_process_multiple_lamp(self, mock_projectorReceivedData):
|
||||
"""
|
||||
Test status multiple lamp on/off and hours
|
||||
"""
|
||||
# GIVEN: Test object
|
||||
pjlink = pjlink_test
|
||||
|
||||
# WHEN: Call process_command with lamp data
|
||||
pjlink.process_command('LAMP', '11111 1 22222 0 33333 1')
|
||||
|
||||
# THEN: Lamp should have been set with proper lamp status
|
||||
self.assertEquals(len(pjlink.lamp), 3,
|
||||
'Projector should have 3 lamps specified')
|
||||
self.assertEquals(pjlink.lamp[0]['On'], True,
|
||||
'Lamp 1 power status should have been set to TRUE')
|
||||
self.assertEquals(pjlink.lamp[0]['Hours'], 11111,
|
||||
'Lamp 1 hours should have been set to 11111')
|
||||
self.assertEquals(pjlink.lamp[1]['On'], False,
|
||||
'Lamp 2 power status should have been set to FALSE')
|
||||
self.assertEquals(pjlink.lamp[1]['Hours'], 22222,
|
||||
'Lamp 2 hours should have been set to 22222')
|
||||
self.assertEquals(pjlink.lamp[2]['On'], True,
|
||||
'Lamp 3 power status should have been set to TRUE')
|
||||
self.assertEquals(pjlink.lamp[2]['Hours'], 33333,
|
||||
'Lamp 3 hours should have been set to 33333')
|
||||
|
||||
@patch.object(pjlink_test, 'projectorReceivedData')
|
||||
def test_projector_process_power_on(self, mock_projectorReceivedData):
|
||||
"""
|
||||
Test status power to ON
|
||||
"""
|
||||
# GIVEN: Test object and preset
|
||||
pjlink = pjlink_test
|
||||
pjlink.power = S_STANDBY
|
||||
|
||||
# WHEN: Call process_command with turn power on command
|
||||
pjlink.process_command('POWR', PJLINK_POWR_STATUS[S_ON])
|
||||
|
||||
# THEN: Power should be set to ON
|
||||
self.assertEquals(pjlink.power, S_ON, 'Power should have been set to ON')
|
||||
|
||||
@patch.object(pjlink_test, 'projectorReceivedData')
|
||||
def test_projector_process_power_off(self, mock_projectorReceivedData):
|
||||
"""
|
||||
Test status power to STANDBY
|
||||
"""
|
||||
# GIVEN: Test object and preset
|
||||
pjlink = pjlink_test
|
||||
pjlink.power = S_ON
|
||||
|
||||
# WHEN: Call process_command with turn power on command
|
||||
pjlink.process_command('POWR', PJLINK_POWR_STATUS[S_STANDBY])
|
||||
|
||||
# THEN: Power should be set to STANDBY
|
||||
self.assertEquals(pjlink.power, S_STANDBY, 'Power should have been set to STANDBY')
|
||||
|
||||
@patch.object(pjlink_test, 'projectorUpdateIcons')
|
||||
def test_projector_process_avmt_closed_unmuted(self, mock_projectorReceivedData):
|
||||
"""
|
||||
Test avmt status shutter closed and audio muted
|
||||
"""
|
||||
# GIVEN: Test object
|
||||
pjlink = pjlink_test
|
||||
pjlink.shutter = False
|
||||
pjlink.mute = True
|
||||
|
||||
# WHEN: Called with setting shutter closed and mute off
|
||||
pjlink.process_avmt('11')
|
||||
|
||||
# THEN: Shutter should be True and mute should be False
|
||||
self.assertTrue(pjlink.shutter, 'Shutter should have been set to closed')
|
||||
self.assertFalse(pjlink.mute, 'Audio should be off')
|
||||
|
||||
@patch.object(pjlink_test, 'projectorUpdateIcons')
|
||||
def test_projector_process_avmt_open_muted(self, mock_projectorReceivedData):
|
||||
"""
|
||||
Test avmt status shutter open and mute on
|
||||
"""
|
||||
# GIVEN: Test object
|
||||
pjlink = pjlink_test
|
||||
pjlink.shutter = True
|
||||
pjlink.mute = False
|
||||
|
||||
# WHEN: Called with setting shutter closed and mute on
|
||||
pjlink.process_avmt('21')
|
||||
|
||||
# THEN: Shutter should be closed and mute should be True
|
||||
self.assertFalse(pjlink.shutter, 'Shutter should have been set to closed')
|
||||
self.assertTrue(pjlink.mute, 'Audio should be off')
|
||||
|
||||
@patch.object(pjlink_test, 'projectorUpdateIcons')
|
||||
def test_projector_process_avmt_open_unmuted(self, mock_projectorReceivedData):
|
||||
"""
|
||||
Test avmt status shutter open and mute off off
|
||||
"""
|
||||
# GIVEN: Test object
|
||||
pjlink = pjlink_test
|
||||
pjlink.shutter = True
|
||||
pjlink.mute = True
|
||||
|
||||
# WHEN: Called with setting shutter to closed and mute on
|
||||
pjlink.process_avmt('30')
|
||||
|
||||
# THEN: Shutter should be closed and mute should be True
|
||||
self.assertFalse(pjlink.shutter, 'Shutter should have been set to open')
|
||||
self.assertFalse(pjlink.mute, 'Audio should be on')
|
||||
|
||||
@patch.object(pjlink_test, 'projectorUpdateIcons')
|
||||
def test_projector_process_avmt_closed_muted(self, mock_projectorReceivedData):
|
||||
"""
|
||||
Test avmt status shutter closed and mute off
|
||||
"""
|
||||
# GIVEN: Test object
|
||||
pjlink = pjlink_test
|
||||
pjlink.shutter = False
|
||||
pjlink.mute = False
|
||||
|
||||
# WHEN: Called with setting shutter to closed and mute on
|
||||
pjlink.process_avmt('31')
|
||||
|
||||
# THEN: Shutter should be closed and mute should be True
|
||||
self.assertTrue(pjlink.shutter, 'Shutter should have been set to closed')
|
||||
self.assertTrue(pjlink.mute, 'Audio should be on')
|
||||
|
||||
def test_projector_process_input(self):
|
||||
"""
|
||||
Test input source status shows current input
|
||||
"""
|
||||
# GIVEN: Test object
|
||||
pjlink = pjlink_test
|
||||
pjlink.source = '0'
|
||||
|
||||
# WHEN: Called with input source
|
||||
pjlink.process_inpt('1')
|
||||
|
||||
# THEN: Input selected should reflect current input
|
||||
self.assertEquals(pjlink.source, '1', 'Input source should be set to "1"')
|
||||
|
||||
def test_projector_reset_information(self):
|
||||
"""
|
||||
Test reset_information() resets all information and stops timers
|
||||
"""
|
||||
# GIVEN: Test object and test data
|
||||
pjlink = pjlink_test
|
||||
pjlink.power = S_ON
|
||||
pjlink.pjlink_name = 'OPENLPTEST'
|
||||
pjlink.manufacturer = 'PJLINK'
|
||||
pjlink.model = '1'
|
||||
pjlink.shutter = True
|
||||
pjlink.mute = True
|
||||
pjlink.lamp = True
|
||||
pjlink.fan = True
|
||||
pjlink.source_available = True
|
||||
pjlink.other_info = 'ANOTHER TEST'
|
||||
pjlink.send_queue = True
|
||||
pjlink.send_busy = True
|
||||
pjlink.timer = DummyTimer()
|
||||
pjlink.socket_timer = DummyTimer()
|
||||
|
||||
# WHEN: reset_information() is called
|
||||
with patch.object(pjlink.timer, 'stop') as mock_timer:
|
||||
with patch.object(pjlink.socket_timer, 'stop') as mock_socket_timer:
|
||||
pjlink.reset_information()
|
||||
|
||||
# THEN: All information should be reset and timers stopped
|
||||
self.assertEquals(pjlink.power, S_OFF, 'Projector power should be OFF')
|
||||
self.assertIsNone(pjlink.pjlink_name, 'Projector pjlink_name should be None')
|
||||
self.assertIsNone(pjlink.manufacturer, 'Projector manufacturer should be None')
|
||||
self.assertIsNone(pjlink.model, 'Projector model should be None')
|
||||
self.assertIsNone(pjlink.shutter, 'Projector shutter should be None')
|
||||
self.assertIsNone(pjlink.mute, 'Projector shuttter should be None')
|
||||
self.assertIsNone(pjlink.lamp, 'Projector lamp should be None')
|
||||
self.assertIsNone(pjlink.fan, 'Projector fan should be None')
|
||||
self.assertIsNone(pjlink.source_available, 'Projector source_available should be None')
|
||||
self.assertIsNone(pjlink.source, 'Projector source should be None')
|
||||
self.assertIsNone(pjlink.other_info, 'Projector other_info should be None')
|
||||
self.assertEquals(pjlink.send_queue, [], 'Projector send_queue should be an empty list')
|
||||
self.assertFalse(pjlink.send_busy, 'Projector send_busy should be False')
|
||||
self.assertTrue(mock_timer.called, 'Projector timer.stop() should have been called')
|
||||
self.assertTrue(mock_socket_timer.called, 'Projector socket_timer.stop() should have been called')
|
||||
|
||||
@patch.object(pjlink_test, 'send_command')
|
||||
@patch.object(pjlink_test, 'waitForReadyRead')
|
||||
@patch.object(pjlink_test, 'projectorAuthentication')
|
||||
@patch.object(pjlink_test, 'timer')
|
||||
@patch.object(pjlink_test, 'socket_timer')
|
||||
def test_bug_1593882_no_pin_authenticated_connection(self, mock_socket_timer,
|
||||
mock_timer,
|
||||
mock_authentication,
|
||||
mock_ready_read,
|
||||
mock_send_command):
|
||||
"""
|
||||
Test bug 1593882 no pin and authenticated request exception
|
||||
"""
|
||||
# GIVEN: Test object and mocks
|
||||
pjlink = pjlink_test
|
||||
pjlink.pin = None
|
||||
mock_ready_read.return_value = True
|
||||
|
||||
# WHEN: call with authentication request and pin not set
|
||||
pjlink.check_login(data=TEST_CONNECT_AUTHENTICATE)
|
||||
|
||||
# THEN: No Authentication signal should have been sent
|
||||
mock_authentication.emit.assert_called_with(pjlink.name)
|
||||
|
||||
@patch.object(pjlink_test, 'waitForReadyRead')
|
||||
@patch.object(pjlink_test, 'state')
|
||||
@patch.object(pjlink_test, '_send_command')
|
||||
@patch.object(pjlink_test, 'timer')
|
||||
@patch.object(pjlink_test, 'socket_timer')
|
||||
def test_bug_1593883_pjlink_authentication(self, mock_socket_timer,
|
||||
mock_timer,
|
||||
mock_send_command,
|
||||
mock_state,
|
||||
mock_waitForReadyRead):
|
||||
"""
|
||||
Test bugfix 1593883 pjlink authentication
|
||||
"""
|
||||
# GIVEN: Test object and data
|
||||
pjlink = pjlink_test
|
||||
pjlink.pin = TEST_PIN
|
||||
mock_state.return_value = pjlink.ConnectedState
|
||||
mock_waitForReadyRead.return_value = True
|
||||
|
||||
# WHEN: Athenticated connection is called
|
||||
pjlink.check_login(data=TEST_CONNECT_AUTHENTICATE)
|
||||
|
||||
# THEN: send_command should have the proper authentication
|
||||
self.assertEquals("{test}".format(test=mock_send_command.call_args),
|
||||
"call(data='{hash}%1CLSS ?\\r')".format(hash=TEST_HASH))
|
||||
|
Loading…
Reference in New Issue
Block a user