diff --git a/openlp/core/api/tab.py b/openlp/core/api/tab.py index eaa021800..840fb0d0b 100644 --- a/openlp/core/api/tab.py +++ b/openlp/core/api/tab.py @@ -24,6 +24,7 @@ The :mod:`~openlp.core.api.tab` module contains the settings tab for the API """ from PyQt5 import QtCore, QtGui, QtNetwork, QtWidgets +from openlp.core.common import get_local_ip4 from openlp.core.common.i18n import UiStrings, translate from openlp.core.common.registry import Registry from openlp.core.common.settings import Settings @@ -219,17 +220,12 @@ class ApiTab(SettingsTab): else: return ip_address """ if ip_address == ZERO_URL: - interfaces = QtNetwork.QNetworkInterface.allInterfaces() - for interface in interfaces: - if not interface.isValid(): - continue - if not (interface.flags() & (QtNetwork.QNetworkInterface.IsUp | QtNetwork.QNetworkInterface.IsRunning)): - continue - for address in interface.addressEntries(): - ip = address.ip() - if ip.protocol() == QtNetwork.QAbstractSocket.IPv4Protocol and \ - ip != QtNetwork.QHostAddress.LocalHost: - return ip.toString() + # In case we have more than one interface + ifaces = get_local_ip4() + for key in iter(ifaces): + ip_address = ifaces.get(key)['ip'] + # We only want the first interface returned + break return ip_address def load(self): diff --git a/openlp/core/common/__init__.py b/openlp/core/common/__init__.py index e4c806f22..d1f62fb8b 100644 --- a/openlp/core/common/__init__.py +++ b/openlp/core/common/__init__.py @@ -36,6 +36,7 @@ from subprocess import check_output, CalledProcessError, STDOUT from PyQt5 import QtGui from PyQt5.QtCore import QCryptographicHash as QHash +from PyQt5.QtNetwork import QAbstractSocket, QHostAddress, QNetworkInterface from chardet.universaldetector import UniversalDetector log = logging.getLogger(__name__ + '.__init__') @@ -52,6 +53,44 @@ NEW_LINE_REGEX = re.compile(r' ?(\r\n?|\n) ?') WHITESPACE_REGEX = re.compile(r'[ \t]+') +def get_local_ip4(): + """ + Creates a dictionary of local IPv4 interfaces on local machine. + If no active interfaces available, returns a dict of localhost IPv4 information + + :returns: Dict of interfaces + """ + # Get the local IPv4 active address(es) that are NOT localhost (lo or '127.0.0.1') + log.debug('Getting local IPv4 interface(es) information') + MY_IP4 = {} + for iface in QNetworkInterface.allInterfaces(): + if not iface.isValid() or not (iface.flags() & (QNetworkInterface.IsUp | QNetworkInterface.IsRunning)): + continue + for address in iface.addressEntries(): + ip = address.ip() + # NOTE: Next line will skip if interface is localhost - keep for now until we decide about it later + # if (ip.protocol() == QAbstractSocket.IPv4Protocol) and (ip != QHostAddress.LocalHost): + if (ip.protocol() == QAbstractSocket.IPv4Protocol): + MY_IP4[iface.name()] = {'ip': ip.toString(), + 'broadcast': address.broadcast().toString(), + 'netmask': address.netmask().toString(), + 'prefix': address.prefixLength(), + 'localnet': QHostAddress(address.netmask().toIPv4Address() & + ip.toIPv4Address()).toString() + } + log.debug('Adding {iface} to active list'.format(iface=iface.name())) + if len(MY_IP4) == 1: + if 'lo' in MY_IP4: + # No active interfaces - so leave localhost in there + log.warning('No active IPv4 interfaces found except localhost') + else: + # Since we have a valid IP4 interface, remove localhost + log.debug('Found at least one IPv4 interface, removing localhost') + MY_IP4.pop('lo') + + return MY_IP4 + + def trace_error_handler(logger): """ Log the calling path of an exception diff --git a/openlp/core/projectors/manager.py b/openlp/core/projectors/manager.py index 3bc7373fa..d676a0d85 100644 --- a/openlp/core/projectors/manager.py +++ b/openlp/core/projectors/manager.py @@ -308,7 +308,6 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM self.settings_section = 'projector' self.projectordb = projectordb self.projector_list = [] - self.pjlink_udp = PJLinkUDP(self.projector_list) self.source_select_form = None def bootstrap_initialise(self): @@ -323,6 +322,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM else: log.debug('Using existing ProjectorDB() instance') self.get_settings() + self.pjlink_udp = PJLinkUDP(self.projector_list) def bootstrap_post_set_up(self): """ @@ -344,6 +344,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM """ Retrieve the saved settings """ + log.debug('Updating ProjectorManager settings') settings = Settings() settings.beginGroup(self.settings_section) self.autostart = settings.value('connect on start') @@ -501,10 +502,6 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM ans = msg.exec() if ans == msg.Cancel: return - try: - projector.link.projectorNetwork.disconnect(self.update_status) - except (AttributeError, TypeError): - pass try: projector.link.changeStatus.disconnect(self.update_status) except (AttributeError, TypeError): diff --git a/openlp/core/projectors/pjlink.py b/openlp/core/projectors/pjlink.py index 3517fb1e4..706d4b5aa 100644 --- a/openlp/core/projectors/pjlink.py +++ b/openlp/core/projectors/pjlink.py @@ -64,7 +64,7 @@ from openlp.core.projectors.constants import CONNECTION_ERRORS, PJLINK_CLASS, PJ log = logging.getLogger(__name__) log.debug('pjlink loaded') -__all__ = ['PJLink'] +__all__ = ['PJLink', 'PJLinkUDP'] # Shortcuts SocketError = QtNetwork.QAbstractSocket.SocketError @@ -79,22 +79,145 @@ class PJLinkUDP(QtNetwork.QUdpSocket): """ Socket service for PJLink UDP socket. """ - # New commands available in PJLink Class 2 - pjlink_udp_commands = [ - 'ACKN', # Class 2 (cmd is SRCH) - 'ERST', # Class 1/2 - 'INPT', # Class 1/2 - 'LKUP', # Class 2 (reply only - no cmd) - 'POWR', # Class 1/2 - 'SRCH' # Class 2 (reply is ACKN) - ] - def __init__(self, projector_list, port=PJLINK_PORT): """ - Initialize socket + Socket services for PJLink UDP packets. + + Since all UDP packets from any projector will come into the same + port, process UDP packets here then route to the appropriate + projector instance as needed. """ + # Keep track of currently defined projectors so we can route + # inbound packets to the correct instance + super().__init__() self.projector_list = projector_list self.port = port + # Local defines + self.ackn_list = {} # Replies from online projetors + self.search_active = False + self.search_time = 30000 # 30 seconds for allowed time + self.search_timer = QtCore.QTimer() + # New commands available in PJLink Class 2 + # ACKN/SRCH is processed here since it's used to find available projectors + # Other commands are processed by the individual projector instances + self.pjlink_udp_functions = { + 'ACKN': self.process_ackn, # Class 2, command is 'SRCH' + 'ERST': None, # Class 1/2 + 'INPT': None, # Class 1/2 + 'LKUP': None, # Class 2 (reply only - no cmd) + 'POWR': None, # Class 1/2 + 'SRCH': self.process_srch # Class 2 (reply is ACKN) + } + + self.readyRead.connect(self.get_datagram) + log.debug('(UDP) PJLinkUDP() Initialized') + + @QtCore.pyqtSlot() + def get_datagram(self): + """ + Retrieve packet and basic checks + """ + log.debug('(UDP) get_datagram() - Receiving data') + read = self.pendingDatagramSize() + if read < 0: + log.warn('(UDP) No data (-1)') + return + if read < 1: + log.warn('(UDP) get_datagram() called when pending data size is 0') + return + data, peer_address, peer_port = self.readDatagram(self.pendingDatagramSize()) + log.debug('(UDP) {size} bytes received from {adx} on port {port}'.format(size=len(data), + adx=peer_address, + port=peer_port)) + log.debug('(UDP) packet "{data}"'.format(data=data)) + if len(data) < 0: + log.warn('(UDP) No data (-1)') + return + elif len(data) < 8: + # Minimum packet is '%2CCCC=' + log.warn('(UDP) Invalid packet - not enough data') + return + elif data is None: + log.warn('(UDP) No data (None)') + return + elif len(data) > PJLINK_MAX_PACKET: + log.warn('(UDP) Invalid packet - length too long') + return + elif not data.startswith(PJLINK_PREFIX): + log.warn('(UDP) Invalid packet - does not start with PJLINK_PREFIX') + return + elif data[1] != '2': + log.warn('(UDP) Invalid packet - missing/invalid PJLink class version') + return + elif data[6] != '=': + log.warn('(UDP) Invalid packet - separator missing') + return + # First two characters are header information we don't need at this time + cmd, data = data[2:].split('=') + if cmd not in self.pjlink_udp_functions: + log.warn('(UDP) Invalid packet - not a valid PJLink UDP reply') + return + if self.pjlink_udp_functions[cmd] is not None: + log.debug('(UDP) Processing {cmd} with "{data}"'.format(cmd=cmd, data=data)) + return self.pjlink_udp_functions[cmd](data=data, host=peer_address, port=peer_port) + else: + log.debug('(UDP) Checking projector list for ip {host} to process'.format(host=peer_address)) + for projector in self.projector_list: + if peer_address == projector.ip: + if cmd not in projector.pjlink_functions: + log.error('(UDP) Could not find method to process ' + '"{cmd}" in {host}'.format(cmd=cmd, host=projector.ip)) + return + log.debug('(UDP) Calling "{cmd}" in {host}'.format(cmd=cmd, host=projector.ip)) + return projector.pjlink_functions[cmd](data=data) + log.warn('(UDP) Could not find projector with ip {ip} to process packet'.format(ip=peer_address)) + return + + def process_ackn(self, data, host, port): + """ + Process the ACKN command. + + :param data: Data in packet + :param host: IP address of sending host + :param port: Port received on + """ + log.debug('(UDP) Processing ACKN packet') + if host not in self.ackn_list: + log.debug('(UDP) Adding {host} to ACKN list'.format(host=host)) + self.ackn_list[host] = {'data': data, + 'port': port} + else: + log.warn('(UDP) Host {host} already replied - ignoring'.format(host=host)) + + def process_srch(self, data, host, port): + """ + Process the SRCH command. + + SRCH is processed by terminals so we ignore any packet. + + :param data: Data in packet + :param host: IP address of sending host + :param port: Port received on + """ + log.debug('(UDP) SRCH packet received - ignoring') + return + + def search_start(self): + """ + Start search for projectors on local network + """ + self.search_active = True + self.ackn_list = {} + # TODO: Send SRCH packet here + self.search_timer.singleShot(self.search_time, self.search_stop) + + @QtCore.pyqtSlot() + def search_stop(self): + """ + Stop search + """ + self.search_active = False + self.search_timer.stop() class PJLinkCommands(object): @@ -257,8 +380,9 @@ class PJLinkCommands(object): else: clss = data self.pjlink_class = clss - log.debug('({ip}) Setting pjlink_class for this projector to "{data}"'.format(ip=self.entry.name, - data=self.pjlink_class)) + log.debug('({ip}) Setting pjlink_class for this projector ' + 'to "{data}"'.format(ip=self.entry.name, + data=self.pjlink_class)) # Since we call this one on first connect, setup polling from here if not self.no_poll: log.debug('({ip}) process_pjlink(): Starting timer'.format(ip=self.entry.name)) @@ -276,9 +400,10 @@ class PJLinkCommands(object): """ if len(data) != PJLINK_ERST_DATA['DATA_LENGTH']: count = PJLINK_ERST_DATA['DATA_LENGTH'] - log.warning('({ip}) Invalid error status response "{data}": length != {count}'.format(ip=self.entry.name, - data=data, - count=count)) + log.warning('({ip}) Invalid error status response "{data}": ' + 'length != {count}'.format(ip=self.entry.name, + data=data, + count=count)) return try: datacheck = int(data) @@ -557,7 +682,7 @@ class PJLinkCommands(object): class PJLink(QtNetwork.QTcpSocket, PJLinkCommands): """ - Socket service for PJLink TCP socket. + Socket services for PJLink TCP packets. """ # Signals sent by this module changeStatus = QtCore.pyqtSignal(str, int, str) diff --git a/tests/functional/openlp_core/api/test_tab.py b/tests/functional/openlp_core/api/test_tab.py index 0de0b6d94..cf8b4d0b0 100644 --- a/tests/functional/openlp_core/api/test_tab.py +++ b/tests/functional/openlp_core/api/test_tab.py @@ -29,6 +29,7 @@ from unittest.mock import patch from PyQt5 import QtWidgets from openlp.core.api.tab import ApiTab +from openlp.core.common import get_local_ip4 from openlp.core.common.registry import Registry from openlp.core.common.settings import Settings from tests.helpers.testmixin import TestMixin @@ -63,6 +64,7 @@ class TestApiTab(TestCase, TestMixin): Registry().create() Registry().set_flag('website_version', '00-00-0000') self.form = ApiTab(self.parent) + self.my_ip4_list = get_local_ip4() def tearDown(self): """ @@ -76,11 +78,18 @@ class TestApiTab(TestCase, TestMixin): """ Test the get_ip_address function with ZERO_URL """ + # GIVEN: list of local IP addresses for this machine + ip4_list = [] + for ip4 in iter(self.my_ip4_list): + ip4_list.append(self.my_ip4_list.get(ip4)['ip']) + # WHEN: the default ip address is given ip_address = self.form.get_ip_address(ZERO_URL) + # THEN: the default ip address will be returned assert re.match('\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', ip_address), \ 'The return value should be a valid ip address' + assert ip_address in ip4_list, 'The return address should be in the list of local IP addresses' def test_get_ip_address_with_ip(self): """ @@ -88,8 +97,10 @@ class TestApiTab(TestCase, TestMixin): """ # GIVEN: An ip address given_ip = '192.168.1.1' + # WHEN: the default ip address is given ip_address = self.form.get_ip_address(given_ip) + # THEN: the default ip address will be returned assert ip_address == given_ip, 'The return value should be %s' % given_ip diff --git a/tests/openlp_core/__init__.py b/tests/openlp_core/__init__.py new file mode 100644 index 000000000..cf242cd7f --- /dev/null +++ b/tests/openlp_core/__init__.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- +# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4 + +############################################################################### +# OpenLP - Open Source Lyrics Projection # +# --------------------------------------------------------------------------- # +# Copyright (c) 2008-2018 OpenLP Developers # +# --------------------------------------------------------------------------- # +# This program is free software; you can redistribute it and/or modify it # +# under the terms of the GNU General Public License as published by the Free # +# Software Foundation; version 2 of the License. # +# # +# This program is distributed in the hope that it will be useful, but WITHOUT # +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for # +# more details. # +# # +# You should have received a copy of the GNU General Public License along # +# with this program; if not, write to the Free Software Foundation, Inc., 59 # +# Temple Place, Suite 330, Boston, MA 02111-1307 USA # +############################################################################### +""" +:mod: `tests.openlp_core` module + +Tests modules/files for module openlp.core +""" diff --git a/tests/functional/openlp_core/projectors/__init__.py b/tests/openlp_core/projectors/__init__.py similarity index 96% rename from tests/functional/openlp_core/projectors/__init__.py rename to tests/openlp_core/projectors/__init__.py index 7efaa18af..0a1a5ca7e 100644 --- a/tests/functional/openlp_core/projectors/__init__.py +++ b/tests/openlp_core/projectors/__init__.py @@ -20,5 +20,5 @@ # Temple Place, Suite 330, Boston, MA 02111-1307 USA # ############################################################################### """ -Module-level functions for the functional test suite +Module-level functions for the projector test suite """ diff --git a/tests/functional/openlp_core/projectors/test_projector_bugfixes_01.py b/tests/openlp_core/projectors/test_projector_bugfixes_01.py similarity index 100% rename from tests/functional/openlp_core/projectors/test_projector_bugfixes_01.py rename to tests/openlp_core/projectors/test_projector_bugfixes_01.py diff --git a/tests/functional/openlp_core/projectors/test_projector_constants.py b/tests/openlp_core/projectors/test_projector_constants.py similarity index 100% rename from tests/functional/openlp_core/projectors/test_projector_constants.py rename to tests/openlp_core/projectors/test_projector_constants.py diff --git a/tests/functional/openlp_core/projectors/test_projector_db.py b/tests/openlp_core/projectors/test_projector_db.py similarity index 100% rename from tests/functional/openlp_core/projectors/test_projector_db.py rename to tests/openlp_core/projectors/test_projector_db.py diff --git a/tests/functional/openlp_core/projectors/test_projector_pjlink_base.py b/tests/openlp_core/projectors/test_projector_pjlink_base.py similarity index 100% rename from tests/functional/openlp_core/projectors/test_projector_pjlink_base.py rename to tests/openlp_core/projectors/test_projector_pjlink_base.py diff --git a/tests/functional/openlp_core/projectors/test_projector_pjlink_cmd_routing.py b/tests/openlp_core/projectors/test_projector_pjlink_cmd_routing.py similarity index 100% rename from tests/functional/openlp_core/projectors/test_projector_pjlink_cmd_routing.py rename to tests/openlp_core/projectors/test_projector_pjlink_cmd_routing.py diff --git a/tests/functional/openlp_core/projectors/test_projector_pjlink_commands_01.py b/tests/openlp_core/projectors/test_projector_pjlink_commands_01.py similarity index 100% rename from tests/functional/openlp_core/projectors/test_projector_pjlink_commands_01.py rename to tests/openlp_core/projectors/test_projector_pjlink_commands_01.py diff --git a/tests/functional/openlp_core/projectors/test_projector_pjlink_commands_02.py b/tests/openlp_core/projectors/test_projector_pjlink_commands_02.py similarity index 100% rename from tests/functional/openlp_core/projectors/test_projector_pjlink_commands_02.py rename to tests/openlp_core/projectors/test_projector_pjlink_commands_02.py diff --git a/tests/openlp_core/projectors/test_projector_pjlink_udp.py b/tests/openlp_core/projectors/test_projector_pjlink_udp.py new file mode 100644 index 000000000..ad4f7709e --- /dev/null +++ b/tests/openlp_core/projectors/test_projector_pjlink_udp.py @@ -0,0 +1,360 @@ + +# -*- coding: utf-8 -*- +# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4 + +############################################################################### +# OpenLP - Open Source Lyrics Projection # +# --------------------------------------------------------------------------- # +# Copyright (c) 2008-2018 OpenLP Developers # +# --------------------------------------------------------------------------- # +# This program is free software; you can redistribute it and/or modify it # +# under the terms of the GNU General Public License as published by the Free # +# Software Foundation; version 2 of the License. # +# # +# This program is distributed in the hope that it will be useful, but WITHOUT # +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for # +# more details. # +# # +# You should have received a copy of the GNU General Public License along # +# with this program; if not, write to the Free Software Foundation, Inc., 59 # +# Temple Place, Suite 330, Boston, MA 02111-1307 USA # +############################################################################### +""" +Package to test the PJLink UDP functions +""" + +from unittest import TestCase +from unittest.mock import call, patch + +import openlp.core.projectors.pjlink +from openlp.core.projectors.constants import PJLINK_MAX_PACKET, PJLINK_PORT, PJLINK_PREFIX + +from openlp.core.projectors.db import Projector +from openlp.core.projectors.pjlink import PJLinkUDP +from tests.resources.projector.data import TEST1_DATA, TEST2_DATA + + +class TestPJLinkBase(TestCase): + """ + Tests for the PJLinkUDP class + """ + def setUp(self): + """ + Setup generic test conditions + """ + self.test_list = [Projector(**TEST1_DATA), Projector(**TEST2_DATA)] + + def tearDown(self): + """ + Close generic test condidtions + """ + self.test_list = None + + @patch.object(openlp.core.projectors.pjlink, 'log') + def test_get_datagram_data_invalid_class(self, mock_log): + """ + Test get_datagram with invalid class number + """ + # GIVEN: Test setup + pjlink_udp = PJLinkUDP(projector_list=self.test_list) + log_warn_calls = [call('(UDP) Invalid packet - missing/invalid PJLink class version')] + log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'), + call('(UDP) get_datagram() - Receiving data'), + call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'), + call('(UDP) packet "%1ACKN=11:11:11:11:11:11"')] + with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \ + patch.object(pjlink_udp, 'readDatagram') as mock_read: + mock_datagram.return_value = 24 + mock_read.return_value = ('{prefix}1ACKN={mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']), + TEST1_DATA['ip'], PJLINK_PORT) + + # WHEN: get_datagram called with 0 bytes ready + pjlink_udp.get_datagram() + + # THEN: Log entries should be made and method returns + mock_log.debug.assert_has_calls(log_debug_calls) + mock_log.warn.assert_has_calls(log_warn_calls) + + @patch.object(openlp.core.projectors.pjlink, 'log') + def test_get_datagram_data_invalid_command(self, mock_log): + """ + Test get_datagram with invalid PJLink UDP command + """ + # GIVEN: Test setup + pjlink_udp = PJLinkUDP(projector_list=self.test_list) + log_warn_calls = [call('(UDP) Invalid packet - not a valid PJLink UDP reply')] + log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'), + call('(UDP) get_datagram() - Receiving data'), + call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'), + call('(UDP) packet "%2DUMB=11:11:11:11:11:11"')] + with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \ + patch.object(pjlink_udp, 'readDatagram') as mock_read: + mock_datagram.return_value = 24 + mock_read.return_value = ('{prefix}2DUMB={mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']), + TEST1_DATA['ip'], PJLINK_PORT) + + # WHEN: get_datagram called with 0 bytes ready + pjlink_udp.get_datagram() + + # THEN: Log entries should be made and method returns + mock_log.debug.assert_has_calls(log_debug_calls) + mock_log.warn.assert_has_calls(log_warn_calls) + + @patch.object(openlp.core.projectors.pjlink, 'log') + def test_get_datagram_data_invalid_prefix(self, mock_log): + """ + Test get_datagram when prefix != PJLINK_PREFIX + """ + # GIVEN: Test setup + pjlink_udp = PJLinkUDP(projector_list=self.test_list) + log_warn_calls = [call('(UDP) Invalid packet - does not start with PJLINK_PREFIX')] + log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'), + call('(UDP) get_datagram() - Receiving data'), + call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'), + call('(UDP) packet "$2ACKN=11:11:11:11:11:11"')] + with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \ + patch.object(pjlink_udp, 'readDatagram') as mock_read: + mock_datagram.return_value = 24 + mock_read.return_value = ('{prefix}2ACKN={mac}'.format(prefix='$', mac=TEST1_DATA['mac_adx']), + TEST1_DATA['ip'], PJLINK_PORT) + + # WHEN: get_datagram called with 0 bytes ready + pjlink_udp.get_datagram() + + # THEN: Log entries should be made and method returns + mock_log.debug.assert_has_calls(log_debug_calls) + mock_log.warn.assert_has_calls(log_warn_calls) + + @patch.object(openlp.core.projectors.pjlink, 'log') + def test_get_datagram_data_invalid_separator(self, mock_log): + """ + Test get_datagram when separator not equal to = + """ + # GIVEN: Test setup + pjlink_udp = PJLinkUDP(projector_list=self.test_list) + log_warn_calls = [call('(UDP) Invalid packet - separator missing')] + log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'), + call('(UDP) get_datagram() - Receiving data'), + call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'), + call('(UDP) packet "%2ACKN 11:11:11:11:11:11"')] + with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \ + patch.object(pjlink_udp, 'readDatagram') as mock_read: + mock_datagram.return_value = 24 + mock_read.return_value = ('{prefix}2ACKN {mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']), + TEST1_DATA['ip'], PJLINK_PORT) + + # WHEN: get_datagram called with 0 bytes ready + pjlink_udp.get_datagram() + + # THEN: Log entries should be made and method returns + mock_log.debug.assert_has_calls(log_debug_calls) + mock_log.warn.assert_has_calls(log_warn_calls) + + @patch.object(openlp.core.projectors.pjlink, 'log') + def test_get_datagram_data_long(self, mock_log): + """ + Test get_datagram when datagram > PJLINK_MAX_PACKET + """ + # GIVEN: Test setup + pjlink_udp = PJLinkUDP(projector_list=self.test_list) + log_warn_calls = [call('(UDP) Invalid packet - length too long')] + log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'), + call('(UDP) get_datagram() - Receiving data'), + call('(UDP) 143 bytes received from 111.111.111.111 on port 4352'), + call('(UDP) packet "%2ACKN={long}"'.format(long='X' * PJLINK_MAX_PACKET))] + with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \ + patch.object(pjlink_udp, 'readDatagram') as mock_read: + mock_datagram.return_value = PJLINK_MAX_PACKET + 7 + mock_read.return_value = ('{prefix}2ACKN={long}'.format(prefix=PJLINK_PREFIX, + long='X' * PJLINK_MAX_PACKET), + TEST1_DATA['ip'], PJLINK_PORT) + + # WHEN: get_datagram called with 0 bytes ready + pjlink_udp.get_datagram() + + # THEN: Log entries should be made and method returns + mock_log.debug.assert_has_calls(log_debug_calls) + mock_log.warn.assert_has_calls(log_warn_calls) + + @patch.object(openlp.core.projectors.pjlink, 'log') + def test_get_datagram_data_negative_zero_length(self, mock_log): + """ + Test get_datagram when pendingDatagramSize = 0 + """ + # GIVEN: Test setup + pjlink_udp = PJLinkUDP(projector_list=self.test_list) + log_warn_calls = [call('(UDP) No data (-1)')] + log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'), + call('(UDP) get_datagram() - Receiving data')] + with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \ + patch.object(pjlink_udp, 'readDatagram') as mock_read: + mock_datagram.return_value = -1 + mock_read.return_value = ('', TEST1_DATA['ip'], PJLINK_PORT) + + # WHEN: get_datagram called with 0 bytes ready + pjlink_udp.get_datagram() + + # THEN: Log entries should be made and method returns + mock_log.warn.assert_has_calls(log_warn_calls) + mock_log.debug.assert_has_calls(log_debug_calls) + + @patch.object(openlp.core.projectors.pjlink, 'log') + def test_get_datagram_data_no_data(self, mock_log): + """ + Test get_datagram when data length = 0 + """ + # GIVEN: Test setup + pjlink_udp = PJLinkUDP(projector_list=self.test_list) + log_warn_calls = [call('(UDP) Invalid packet - not enough data')] + log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'), + call('(UDP) get_datagram() - Receiving data')] + with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \ + patch.object(pjlink_udp, 'readDatagram') as mock_read: + mock_datagram.return_value = 1 + mock_read.return_value = ('', TEST1_DATA['ip'], PJLINK_PORT) + + # WHEN: get_datagram called with 0 bytes ready + pjlink_udp.get_datagram() + + # THEN: Log entries should be made and method returns + mock_log.warn.assert_has_calls(log_warn_calls) + mock_log.debug.assert_has_calls(log_debug_calls) + + @patch.object(openlp.core.projectors.pjlink, 'log') + def test_get_datagram_data_short(self, mock_log): + """ + Test get_datagram when data length < 8 + """ + # GIVEN: Test setup + pjlink_udp = PJLinkUDP(projector_list=self.test_list) + log_warn_calls = [call('(UDP) Invalid packet - not enough data')] + log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'), + call('(UDP) get_datagram() - Receiving data')] + with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \ + patch.object(pjlink_udp, 'readDatagram') as mock_read: + mock_datagram.return_value = 6 + mock_read.return_value = ('{prefix}2ACKN'.format(prefix=PJLINK_PREFIX), TEST1_DATA['ip'], PJLINK_PORT) + + # WHEN: get_datagram called with 0 bytes ready + pjlink_udp.get_datagram() + + # THEN: Log entries should be made and method returns + mock_log.warn.assert_has_calls(log_warn_calls) + mock_log.debug.assert_has_calls(log_debug_calls) + + @patch.object(openlp.core.projectors.pjlink, 'log') + def test_get_datagram_pending_zero_length(self, mock_log): + """ + Test get_datagram when pendingDatagramSize = 0 + """ + # GIVEN: Test setup + pjlink_udp = PJLinkUDP(projector_list=self.test_list) + log_warn_calls = [call('(UDP) get_datagram() called when pending data size is 0')] + log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'), + call('(UDP) get_datagram() - Receiving data')] + with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram: + mock_datagram.return_value = 0 + + # WHEN: get_datagram called with 0 bytes ready + pjlink_udp.get_datagram() + + # THEN: Log entries should be made and method returns + mock_log.warn.assert_has_calls(log_warn_calls) + mock_log.debug.assert_has_calls(log_debug_calls) + + @patch.object(openlp.core.projectors.pjlink, 'log') + def test_process_ackn_duplicate(self, mock_log): + """ + Test process_ackn method with multiple calls with same data + """ + # GIVEN: Test setup + pjlink_udp = PJLinkUDP(projector_list=self.test_list) + check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT}} + log_warn_calls = [call('(UDP) Host {host} already replied - ignoring'.format(host=TEST1_DATA['ip']))] + log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'), + call('(UDP) Processing ACKN packet'), + call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip'])), + call('(UDP) Processing ACKN packet')] + + # WHEN: process_ackn called twice with same data + pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT) + pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT) + + # THEN: pjlink_udp.ack_list should equal test_list + # NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function? + if pjlink_udp.ackn_list != check_list: + # Check this way so we can print differences to stdout + print('\nackn_list: ', pjlink_udp.ackn_list) + print('test_list: ', check_list) + assert pjlink_udp.ackn_list == check_list + mock_log.debug.assert_has_calls(log_debug_calls) + mock_log.warn.assert_has_calls(log_warn_calls) + + @patch.object(openlp.core.projectors.pjlink, 'log') + def test_process_ackn_multiple(self, mock_log): + """ + Test process_ackn method with multiple calls + """ + # GIVEN: Test setup + pjlink_udp = PJLinkUDP(projector_list=self.test_list) + check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT}, + TEST2_DATA['ip']: {'data': TEST2_DATA['mac_adx'], 'port': PJLINK_PORT}} + log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'), + call('(UDP) Processing ACKN packet'), + call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip'])), + call('(UDP) Processing ACKN packet'), + call('(UDP) Adding {host} to ACKN list'.format(host=TEST2_DATA['ip']))] + + # WHEN: process_ackn called twice with different data + pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT) + pjlink_udp.process_ackn(data=TEST2_DATA['mac_adx'], host=TEST2_DATA['ip'], port=PJLINK_PORT) + + # THEN: pjlink_udp.ack_list should equal test_list + # NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function? + if pjlink_udp.ackn_list != check_list: + # Check this way so we can print differences to stdout + print('\nackn_list: ', pjlink_udp.ackn_list) + print('test_list: ', check_list) + assert pjlink_udp.ackn_list == check_list + mock_log.debug.assert_has_calls(log_debug_calls) + + @patch.object(openlp.core.projectors.pjlink, 'log') + def test_process_ackn_single(self, mock_log): + """ + Test process_ackn method with single call + """ + # GIVEN: Test setup + pjlink_udp = PJLinkUDP(projector_list=self.test_list) + check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT}} + log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'), + call('(UDP) Processing ACKN packet'), + call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip']))] + + # WHEN: process_ackn called twice with different data + pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT) + + # THEN: pjlink_udp.ack_list should equal test_list + # NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function? + if pjlink_udp.ackn_list != check_list: + # Check this way so we can print differences to stdout + print('\nackn_list: ', pjlink_udp.ackn_list) + print('test_list: ', check_list) + assert pjlink_udp.ackn_list == check_list + mock_log.debug.assert_has_calls(log_debug_calls) + + @patch.object(openlp.core.projectors.pjlink, 'log') + def test_process_srch(self, mock_log): + """ + Test process_srch method + """ + # GIVEN: Test setup + pjlink_udp = PJLinkUDP(projector_list=self.test_list) + log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'), + call('(UDP) SRCH packet received - ignoring')] + + # WHEN: process_srch called + pjlink_udp.process_srch(data=None, host=None, port=None) + + # THEN: debug log entry should be entered + mock_log.debug.assert_has_calls(log_debug_calls) diff --git a/tests/functional/openlp_core/common/test_projector_utilities.py b/tests/openlp_core/projectors/test_projector_utilities.py similarity index 100% rename from tests/functional/openlp_core/common/test_projector_utilities.py rename to tests/openlp_core/projectors/test_projector_utilities.py diff --git a/tests/interfaces/openlp_core/ui/test_projectoreditform.py b/tests/openlp_core/projectors/test_projectoreditform.py similarity index 100% rename from tests/interfaces/openlp_core/ui/test_projectoreditform.py rename to tests/openlp_core/projectors/test_projectoreditform.py diff --git a/tests/interfaces/openlp_core/ui/test_projectormanager.py b/tests/openlp_core/projectors/test_projectormanager.py similarity index 100% rename from tests/interfaces/openlp_core/ui/test_projectormanager.py rename to tests/openlp_core/projectors/test_projectormanager.py diff --git a/tests/interfaces/openlp_core/ui/test_projectorsourceform.py b/tests/openlp_core/projectors/test_projectorsourceform.py similarity index 98% rename from tests/interfaces/openlp_core/ui/test_projectorsourceform.py rename to tests/openlp_core/projectors/test_projectorsourceform.py index 5ddf5641a..65c3eea99 100644 --- a/tests/interfaces/openlp_core/ui/test_projectorsourceform.py +++ b/tests/openlp_core/projectors/test_projectorsourceform.py @@ -125,7 +125,6 @@ class ProjectorSourceFormTest(TestCase, TestMixin): select_form = SourceSelectSingle(parent=None, projectordb=self.projectordb) select_form.edit = True select_form.exec(projector=self.projector) - projector = select_form.projector # THEN: Verify all 4 buttons are available assert len(select_form.button_box.buttons()) == 4, \ @@ -144,7 +143,6 @@ class ProjectorSourceFormTest(TestCase, TestMixin): select_form = SourceSelectSingle(parent=None, projectordb=self.projectordb) select_form.edit = False select_form.exec(projector=self.projector) - projector = select_form.projector # THEN: Verify only 2 buttons are available assert len(select_form.button_box.buttons()) == 2, \ diff --git a/tests/resources/projector/data.py b/tests/resources/projector/data.py index acfb51df9..c3a161b16 100644 --- a/tests/resources/projector/data.py +++ b/tests/resources/projector/data.py @@ -45,7 +45,8 @@ TEST1_DATA = dict(ip='111.111.111.111', serial_no='Serial Number 1', sw_version='Version 1', model_filter='Filter type 1', - model_lamp='Lamp type 1') + model_lamp='Lamp type 1', + mac_adx='11:11:11:11:11:11') TEST2_DATA = dict(ip='222.222.222.222', port='2222', @@ -56,7 +57,8 @@ TEST2_DATA = dict(ip='222.222.222.222', serial_no='Serial Number 2', sw_version='Version 2', model_filter='Filter type 2', - model_lamp='Lamp type 2') + model_lamp='Lamp type 2', + mac_adx='22:22:22:22:22:22') TEST3_DATA = dict(ip='333.333.333.333', port='3333', @@ -67,7 +69,8 @@ TEST3_DATA = dict(ip='333.333.333.333', serial_no='Serial Number 3', sw_version='Version 3', model_filter='Filter type 3', - model_lamp='Lamp type 3') + model_lamp='Lamp type 3', + mac_adx='33:33:33:33:33:33') TEST_VIDEO_CODES = { '11': 'RGB 1',