PJLink2 Update P

This commit is contained in:
Ken Roberts 2018-02-11 03:42:13 -08:00
parent d6087813ae
commit f1996d2cb7
20 changed files with 572 additions and 40 deletions

View File

@ -24,6 +24,7 @@ The :mod:`~openlp.core.api.tab` module contains the settings tab for the API
"""
from PyQt5 import QtCore, QtGui, QtNetwork, QtWidgets
from openlp.core.common import MY_IP4
from openlp.core.common.i18n import UiStrings, translate
from openlp.core.common.registry import Registry
from openlp.core.common.settings import Settings
@ -219,17 +220,11 @@ class ApiTab(SettingsTab):
else: return ip_address
"""
if ip_address == ZERO_URL:
interfaces = QtNetwork.QNetworkInterface.allInterfaces()
for interface in interfaces:
if not interface.isValid():
continue
if not (interface.flags() & (QtNetwork.QNetworkInterface.IsUp | QtNetwork.QNetworkInterface.IsRunning)):
continue
for address in interface.addressEntries():
ip = address.ip()
if ip.protocol() == QtNetwork.QAbstractSocket.IPv4Protocol and \
ip != QtNetwork.QHostAddress.LocalHost:
return ip.toString()
# In case we have more than one interface
for key in iter(MY_IP4):
ip_address = MY_IP4.get(key)['ip']
# We only want the first interface returned
break
return ip_address
def load(self):

View File

@ -36,6 +36,7 @@ from subprocess import check_output, CalledProcessError, STDOUT
from PyQt5 import QtGui
from PyQt5.QtCore import QCryptographicHash as QHash
from PyQt5.QtNetwork import QAbstractSocket, QHostAddress, QNetworkInterface
from chardet.universaldetector import UniversalDetector
log = logging.getLogger(__name__ + '.__init__')
@ -51,6 +52,27 @@ REPLACMENT_CHARS_MAP = str.maketrans({'\u2018': '\'', '\u2019': '\'', '\u201c':
NEW_LINE_REGEX = re.compile(r' ?(\r\n?|\n) ?')
WHITESPACE_REGEX = re.compile(r'[ \t]+')
# Get the local IPv4 active address(es) that are NOT localhost (lo or '127.0.0.1')
log.debug('Getting local IPv4 interface(es) information')
MY_IP4 = {}
for iface in QNetworkInterface.allInterfaces():
if not iface.isValid() or not (iface.flags() & (QNetworkInterface.IsUp | QNetworkInterface.IsRunning)):
continue
for address in iface.addressEntries():
ip = address.ip()
if (ip.protocol() == QAbstractSocket.IPv4Protocol) and (ip != QHostAddress.LocalHost):
MY_IP4[iface.name()] = {'ip': ip.toString(),
'broadcast': address.broadcast().toString(),
'netmask': address.netmask().toString(),
'prefix': address.prefixLength(),
'localnet': QHostAddress(address.netmask().toIPv4Address() &
ip.toIPv4Address()).toString()
}
log.debug('Adding {iface} to active list'.format(iface=iface.name()))
if not MY_IP4:
log.warning('No active IPv4 interfaces found')
def trace_error_handler(logger):
"""

View File

@ -308,7 +308,6 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
self.settings_section = 'projector'
self.projectordb = projectordb
self.projector_list = []
self.pjlink_udp = PJLinkUDP(self.projector_list)
self.source_select_form = None
def bootstrap_initialise(self):
@ -323,6 +322,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
else:
log.debug('Using existing ProjectorDB() instance')
self.get_settings()
self.pjlink_udp = PJLinkUDP(self.projector_list)
def bootstrap_post_set_up(self):
"""
@ -344,6 +344,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
"""
Retrieve the saved settings
"""
log.debug('Updating ProjectorManager settings')
settings = Settings()
settings.beginGroup(self.settings_section)
self.autostart = settings.value('connect on start')
@ -501,10 +502,6 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
ans = msg.exec()
if ans == msg.Cancel:
return
try:
projector.link.projectorNetwork.disconnect(self.update_status)
except (AttributeError, TypeError):
pass
try:
projector.link.changeStatus.disconnect(self.update_status)
except (AttributeError, TypeError):

View File

@ -64,7 +64,7 @@ from openlp.core.projectors.constants import CONNECTION_ERRORS, PJLINK_CLASS, PJ
log = logging.getLogger(__name__)
log.debug('pjlink loaded')
__all__ = ['PJLink']
__all__ = ['PJLink', 'PJLinkUDP']
# Shortcuts
SocketError = QtNetwork.QAbstractSocket.SocketError
@ -79,22 +79,145 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
"""
Socket service for PJLink UDP socket.
"""
# New commands available in PJLink Class 2
pjlink_udp_commands = [
'ACKN', # Class 2 (cmd is SRCH)
'ERST', # Class 1/2
'INPT', # Class 1/2
'LKUP', # Class 2 (reply only - no cmd)
'POWR', # Class 1/2
'SRCH' # Class 2 (reply is ACKN)
]
def __init__(self, projector_list, port=PJLINK_PORT):
"""
Initialize socket
Socket services for PJLink UDP packets.
Since all UDP packets from any projector will come into the same
port, process UDP packets here then route to the appropriate
projector instance as needed.
"""
# Keep track of currently defined projectors so we can route
# inbound packets to the correct instance
super().__init__()
self.projector_list = projector_list
self.port = port
# Local defines
self.ackn_list = {} # Replies from online projetors
self.search_active = False
self.search_time = 30000 # 30 seconds for allowed time
self.search_timer = QtCore.QTimer()
# New commands available in PJLink Class 2
# ACKN/SRCH is processed here since it's used to find available projectors
# Other commands are processed by the individual projector instances
self.pjlink_udp_functions = {
'ACKN': self.process_ackn, # Class 2, command is 'SRCH'
'ERST': None, # Class 1/2
'INPT': None, # Class 1/2
'LKUP': None, # Class 2 (reply only - no cmd)
'POWR': None, # Class 1/2
'SRCH': self.process_srch # Class 2 (reply is ACKN)
}
self.readyRead.connect(self.get_datagram)
log.debug('(UDP) PJLinkUDP() Initialized')
@QtCore.pyqtSlot()
def get_datagram(self):
"""
Retrieve packet and basic checks
"""
log.debug('(UDP) get_datagram() - Receiving data')
read = self.pendingDatagramSize()
if read < 0:
log.warn('(UDP) No data (-1)')
return
if read < 1:
log.warn('(UDP) get_datagram() called when pending data size is 0')
return
data, peer_address, peer_port = self.readDatagram(self.pendingDatagramSize())
log.debug('(UDP) {size} bytes received from {adx} on port {port}'.format(size=len(data),
adx=peer_address,
port=peer_port))
log.debug('(UDP) packet "{data}"'.format(data=data))
if len(data) < 0:
log.warn('(UDP) No data (-1)')
return
elif len(data) < 8:
# Minimum packet is '%2CCCC='
log.warn('(UDP) Invalid packet - not enough data')
return
elif data is None:
log.warn('(UDP) No data (None)')
return
elif len(data) > PJLINK_MAX_PACKET:
log.warn('(UDP) Invalid packet - length too long')
return
elif not data.startswith(PJLINK_PREFIX):
log.warn('(UDP) Invalid packet - does not start with PJLINK_PREFIX')
return
elif data[1] != '2':
log.warn('(UDP) Invalid packet - missing/invalid PJLink class version')
return
elif data[6] != '=':
log.warn('(UDP) Invalid packet - separator missing')
return
# First two characters are header information we don't need at this time
cmd, data = data[2:].split('=')
if cmd not in self.pjlink_udp_functions:
log.warn('(UDP) Invalid packet - not a valid PJLink UDP reply')
return
if self.pjlink_udp_functions[cmd] is not None:
log.debug('(UDP) Processing {cmd} with "{data}"'.format(cmd=cmd, data=data))
return self.pjlink_udp_functions[cmd](data=data, host=peer_address, port=peer_port)
else:
log.debug('(UDP) Checking projector list for ip {host} to process'.format(host=peer_address))
for projector in self.projector_list:
if peer_address == projector.ip:
if cmd not in projector.pjlink_functions:
log.error('(UDP) Could not find method to process '
'"{cmd}" in {host}'.format(cmd=cmd, host=projector.ip))
return
log.debug('(UDP) Calling "{cmd}" in {host}'.format(cmd=cmd, host=projector.ip))
return projector.pjlink_functions[cmd](data=data)
log.warn('(UDP) Could not find projector with ip {ip} to process packet'.format(ip=peer_address))
return
def process_ackn(self, data, host, port):
"""
Process the ACKN command.
:param data: Data in packet
:param host: IP address of sending host
:param port: Port received on
"""
log.debug('(UDP) Processing ACKN packet')
if host not in self.ackn_list:
log.debug('(UDP) Adding {host} to ACKN list'.format(host=host))
self.ackn_list[host] = {'data': data,
'port': port}
else:
log.warn('(UDP) Host {host} already replied - ignoring'.format(host=host))
def process_srch(self, data, host, port):
"""
Process the SRCH command.
SRCH is processed by terminals so we ignore any packet.
:param data: Data in packet
:param host: IP address of sending host
:param port: Port received on
"""
log.debug('(UDP) SRCH packet received - ignoring')
return
def search_start(self):
"""
Start search for projectors on local network
"""
self.search_active = True
self.ackn_list = {}
# TODO: Send SRCH packet here
self.search_timer.singleShot(self.search_time, self.search_stop)
@QtCore.pyqtSlot()
def search_stop(self):
"""
Stop search
"""
self.search_active = False
self.search_timer.stop()
class PJLinkCommands(object):
@ -257,8 +380,9 @@ class PJLinkCommands(object):
else:
clss = data
self.pjlink_class = clss
log.debug('({ip}) Setting pjlink_class for this projector to "{data}"'.format(ip=self.entry.name,
data=self.pjlink_class))
log.debug('({ip}) Setting pjlink_class for this projector '
'to "{data}"'.format(ip=self.entry.name,
data=self.pjlink_class))
# Since we call this one on first connect, setup polling from here
if not self.no_poll:
log.debug('({ip}) process_pjlink(): Starting timer'.format(ip=self.entry.name))
@ -276,9 +400,10 @@ class PJLinkCommands(object):
"""
if len(data) != PJLINK_ERST_DATA['DATA_LENGTH']:
count = PJLINK_ERST_DATA['DATA_LENGTH']
log.warning('({ip}) Invalid error status response "{data}": length != {count}'.format(ip=self.entry.name,
data=data,
count=count))
log.warning('({ip}) Invalid error status response "{data}": '
'length != {count}'.format(ip=self.entry.name,
data=data,
count=count))
return
try:
datacheck = int(data)
@ -557,7 +682,7 @@ class PJLinkCommands(object):
class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
"""
Socket service for PJLink TCP socket.
Socket services for PJLink TCP packets.
"""
# Signals sent by this module
changeStatus = QtCore.pyqtSignal(str, int, str)

View File

@ -29,6 +29,7 @@ from unittest.mock import patch
from PyQt5 import QtWidgets
from openlp.core.api.tab import ApiTab
from openlp.core.common import MY_IP4
from openlp.core.common.registry import Registry
from openlp.core.common.settings import Settings
from tests.helpers.testmixin import TestMixin
@ -72,15 +73,18 @@ class TestApiTab(TestCase, TestMixin):
del self.form
self.destroy_settings()
@patch.dict(MY_IP4, {'test': {'ip': '127.0.0.1'}}, clear=True)
def test_get_ip_address_default(self):
"""
Test the get_ip_address function with ZERO_URL
"""
# WHEN: the default ip address is given
ip_address = self.form.get_ip_address(ZERO_URL)
# THEN: the default ip address will be returned
assert re.match('\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', ip_address), \
'The return value should be a valid ip address'
assert ip_address == '127.0.0.1', 'The return address should match the test address'
def test_get_ip_address_with_ip(self):
"""
@ -88,8 +92,10 @@ class TestApiTab(TestCase, TestMixin):
"""
# GIVEN: An ip address
given_ip = '192.168.1.1'
# WHEN: the default ip address is given
ip_address = self.form.get_ip_address(given_ip)
# THEN: the default ip address will be returned
assert ip_address == given_ip, 'The return value should be %s' % given_ip

View File

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
###############################################################################
# OpenLP - Open Source Lyrics Projection #
# --------------------------------------------------------------------------- #
# Copyright (c) 2008-2018 OpenLP Developers #
# --------------------------------------------------------------------------- #
# This program is free software; you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by the Free #
# Software Foundation; version 2 of the License. #
# #
# This program is distributed in the hope that it will be useful, but WITHOUT #
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
# more details. #
# #
# You should have received a copy of the GNU General Public License along #
# with this program; if not, write to the Free Software Foundation, Inc., 59 #
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
###############################################################################
"""
:mod: `tests.openlp_core` module
Tests modules/files for module openlp.core
"""

View File

@ -20,5 +20,5 @@
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
###############################################################################
"""
Module-level functions for the functional test suite
Module-level functions for the projector test suite
"""

View File

@ -0,0 +1,360 @@
# -*- coding: utf-8 -*-
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
###############################################################################
# OpenLP - Open Source Lyrics Projection #
# --------------------------------------------------------------------------- #
# Copyright (c) 2008-2018 OpenLP Developers #
# --------------------------------------------------------------------------- #
# This program is free software; you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by the Free #
# Software Foundation; version 2 of the License. #
# #
# This program is distributed in the hope that it will be useful, but WITHOUT #
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
# more details. #
# #
# You should have received a copy of the GNU General Public License along #
# with this program; if not, write to the Free Software Foundation, Inc., 59 #
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
###############################################################################
"""
Package to test the PJLink UDP functions
"""
from unittest import TestCase
from unittest.mock import call, patch
import openlp.core.projectors.pjlink
from openlp.core.projectors.constants import PJLINK_MAX_PACKET, PJLINK_PORT, PJLINK_PREFIX
from openlp.core.projectors.db import Projector
from openlp.core.projectors.pjlink import PJLinkUDP
from tests.resources.projector.data import TEST1_DATA, TEST2_DATA
class TestPJLinkBase(TestCase):
"""
Tests for the PJLinkUDP class
"""
def setUp(self):
"""
Setup generic test conditions
"""
self.test_list = [Projector(**TEST1_DATA), Projector(**TEST2_DATA)]
def tearDown(self):
"""
Close generic test condidtions
"""
self.test_list = None
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_invalid_class(self, mock_log):
"""
Test get_datagram with invalid class number
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - missing/invalid PJLink class version')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data'),
call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
call('(UDP) packet "%1ACKN=11:11:11:11:11:11"')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 24
mock_read.return_value = ('{prefix}1ACKN={mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']),
TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_invalid_command(self, mock_log):
"""
Test get_datagram with invalid PJLink UDP command
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - not a valid PJLink UDP reply')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data'),
call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
call('(UDP) packet "%2DUMB=11:11:11:11:11:11"')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 24
mock_read.return_value = ('{prefix}2DUMB={mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']),
TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_invalid_prefix(self, mock_log):
"""
Test get_datagram when prefix != PJLINK_PREFIX
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - does not start with PJLINK_PREFIX')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data'),
call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
call('(UDP) packet "$2ACKN=11:11:11:11:11:11"')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 24
mock_read.return_value = ('{prefix}2ACKN={mac}'.format(prefix='$', mac=TEST1_DATA['mac_adx']),
TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_invalid_separator(self, mock_log):
"""
Test get_datagram when separator not equal to =
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - separator missing')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data'),
call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
call('(UDP) packet "%2ACKN 11:11:11:11:11:11"')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 24
mock_read.return_value = ('{prefix}2ACKN {mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']),
TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_long(self, mock_log):
"""
Test get_datagram when datagram > PJLINK_MAX_PACKET
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - length too long')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data'),
call('(UDP) 143 bytes received from 111.111.111.111 on port 4352'),
call('(UDP) packet "%2ACKN={long}"'.format(long='X' * PJLINK_MAX_PACKET))]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = PJLINK_MAX_PACKET + 7
mock_read.return_value = ('{prefix}2ACKN={long}'.format(prefix=PJLINK_PREFIX,
long='X' * PJLINK_MAX_PACKET),
TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_negative_zero_length(self, mock_log):
"""
Test get_datagram when pendingDatagramSize = 0
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) No data (-1)')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = -1
mock_read.return_value = ('', TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.warn.assert_has_calls(log_warn_calls)
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_no_data(self, mock_log):
"""
Test get_datagram when data length = 0
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - not enough data')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 1
mock_read.return_value = ('', TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.warn.assert_has_calls(log_warn_calls)
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_short(self, mock_log):
"""
Test get_datagram when data length < 8
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) Invalid packet - not enough data')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 6
mock_read.return_value = ('{prefix}2ACKN'.format(prefix=PJLINK_PREFIX), TEST1_DATA['ip'], PJLINK_PORT)
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.warn.assert_has_calls(log_warn_calls)
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_pending_zero_length(self, mock_log):
"""
Test get_datagram when pendingDatagramSize = 0
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_warn_calls = [call('(UDP) get_datagram() called when pending data size is 0')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram:
mock_datagram.return_value = 0
# WHEN: get_datagram called with 0 bytes ready
pjlink_udp.get_datagram()
# THEN: Log entries should be made and method returns
mock_log.warn.assert_has_calls(log_warn_calls)
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_ackn_duplicate(self, mock_log):
"""
Test process_ackn method with multiple calls with same data
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT}}
log_warn_calls = [call('(UDP) Host {host} already replied - ignoring'.format(host=TEST1_DATA['ip']))]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip'])),
call('(UDP) Processing ACKN packet')]
# WHEN: process_ackn called twice with same data
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
# THEN: pjlink_udp.ack_list should equal test_list
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
if pjlink_udp.ackn_list != check_list:
# Check this way so we can print differences to stdout
print('\nackn_list: ', pjlink_udp.ackn_list)
print('test_list: ', check_list)
assert pjlink_udp.ackn_list == check_list
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warn.assert_has_calls(log_warn_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_ackn_multiple(self, mock_log):
"""
Test process_ackn method with multiple calls
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT},
TEST2_DATA['ip']: {'data': TEST2_DATA['mac_adx'], 'port': PJLINK_PORT}}
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip'])),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST2_DATA['ip']))]
# WHEN: process_ackn called twice with different data
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
pjlink_udp.process_ackn(data=TEST2_DATA['mac_adx'], host=TEST2_DATA['ip'], port=PJLINK_PORT)
# THEN: pjlink_udp.ack_list should equal test_list
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
if pjlink_udp.ackn_list != check_list:
# Check this way so we can print differences to stdout
print('\nackn_list: ', pjlink_udp.ackn_list)
print('test_list: ', check_list)
assert pjlink_udp.ackn_list == check_list
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_ackn_single(self, mock_log):
"""
Test process_ackn method with single call
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT}}
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) Processing ACKN packet'),
call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip']))]
# WHEN: process_ackn called twice with different data
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
# THEN: pjlink_udp.ack_list should equal test_list
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
if pjlink_udp.ackn_list != check_list:
# Check this way so we can print differences to stdout
print('\nackn_list: ', pjlink_udp.ackn_list)
print('test_list: ', check_list)
assert pjlink_udp.ackn_list == check_list
mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.pjlink, 'log')
def test_process_srch(self, mock_log):
"""
Test process_srch method
"""
# GIVEN: Test setup
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
call('(UDP) SRCH packet received - ignoring')]
# WHEN: process_srch called
pjlink_udp.process_srch(data=None, host=None, port=None)
# THEN: debug log entry should be entered
mock_log.debug.assert_has_calls(log_debug_calls)

View File

@ -125,7 +125,6 @@ class ProjectorSourceFormTest(TestCase, TestMixin):
select_form = SourceSelectSingle(parent=None, projectordb=self.projectordb)
select_form.edit = True
select_form.exec(projector=self.projector)
projector = select_form.projector
# THEN: Verify all 4 buttons are available
assert len(select_form.button_box.buttons()) == 4, \
@ -144,7 +143,6 @@ class ProjectorSourceFormTest(TestCase, TestMixin):
select_form = SourceSelectSingle(parent=None, projectordb=self.projectordb)
select_form.edit = False
select_form.exec(projector=self.projector)
projector = select_form.projector
# THEN: Verify only 2 buttons are available
assert len(select_form.button_box.buttons()) == 2, \

View File

@ -45,7 +45,8 @@ TEST1_DATA = dict(ip='111.111.111.111',
serial_no='Serial Number 1',
sw_version='Version 1',
model_filter='Filter type 1',
model_lamp='Lamp type 1')
model_lamp='Lamp type 1',
mac_adx='11:11:11:11:11:11')
TEST2_DATA = dict(ip='222.222.222.222',
port='2222',
@ -56,7 +57,8 @@ TEST2_DATA = dict(ip='222.222.222.222',
serial_no='Serial Number 2',
sw_version='Version 2',
model_filter='Filter type 2',
model_lamp='Lamp type 2')
model_lamp='Lamp type 2',
mac_adx='22:22:22:22:22:22')
TEST3_DATA = dict(ip='333.333.333.333',
port='3333',
@ -67,7 +69,8 @@ TEST3_DATA = dict(ip='333.333.333.333',
serial_no='Serial Number 3',
sw_version='Version 3',
model_filter='Filter type 3',
model_lamp='Lamp type 3')
model_lamp='Lamp type 3',
mac_adx='33:33:33:33:33:33')
TEST_VIDEO_CODES = {
'11': 'RGB 1',