From a3f203034926e00c131f1c2118096d1dad8c10e0 Mon Sep 17 00:00:00 2001 From: Ken Roberts Date: Fri, 19 Oct 2018 21:33:32 -0700 Subject: [PATCH] PJLink2 Update T and pep8 --- openlp/core/common/__init__.py | 8 +- openlp/core/common/settings.py | 3 +- openlp/core/projectors/editform.py | 1 + openlp/core/projectors/manager.py | 81 +++++-- openlp/core/projectors/pjlink.py | 58 ++++- openlp/core/projectors/tab.py | 47 ++++ .../common/test_network_interfaces.py | 78 ------ .../common/test_network_interfaces.py | 224 ++++++++++++++++++ tests/openlp_core/projectors/__init__.py | 4 +- .../projectors/test_projector_pjlink_udp.py | 195 ++++++++++++++- 10 files changed, 566 insertions(+), 133 deletions(-) delete mode 100644 tests/functional/openlp_core/common/test_network_interfaces.py create mode 100644 tests/openlp_core/common/test_network_interfaces.py diff --git a/openlp/core/common/__init__.py b/openlp/core/common/__init__.py index 0148bce99..a7215e394 100644 --- a/openlp/core/common/__init__.py +++ b/openlp/core/common/__init__.py @@ -60,7 +60,6 @@ def get_local_ip4(): :returns: Dict of interfaces """ - # Get the local IPv4 active address(es) that are NOT localhost (lo or '127.0.0.1') log.debug('Getting local IPv4 interface(es) information') my_ip4 = {} for iface in QNetworkInterface.allInterfaces(): @@ -70,8 +69,6 @@ def get_local_ip4(): log.debug('Checking address(es) protocol') for address in iface.addressEntries(): ip = address.ip() - # NOTE: Next line will skip if interface is localhost - keep for now until we decide about it later - # if (ip.protocol() == QAbstractSocket.IPv4Protocol) and (ip != QHostAddress.LocalHost): log.debug('Checking for protocol == IPv4Protocol') if ip.protocol() == QAbstractSocket.IPv4Protocol: log.debug('Getting interface information') @@ -83,12 +80,13 @@ def get_local_ip4(): ip.toIPv4Address()).toString() } log.debug('Adding {iface} to active list'.format(iface=iface.name())) + if len(my_ip4) == 0: + log.warning('No active IPv4 network interfaces detected') + return my_ip4 if 'localhost' in my_ip4: log.debug('Renaming windows localhost to lo') my_ip4['lo'] = my_ip4['localhost'] my_ip4.pop('localhost') - if len(my_ip4) == 0: - log.warning('No active IPv4 network interfaces detected') if len(my_ip4) == 1: if 'lo' in my_ip4: # No active interfaces - so leave localhost in there diff --git a/openlp/core/common/settings.py b/openlp/core/common/settings.py index 74d686b09..8ac41375c 100644 --- a/openlp/core/common/settings.py +++ b/openlp/core/common/settings.py @@ -217,7 +217,8 @@ class Settings(QtCore.QSettings): 'projector/last directory export': None, 'projector/poll time': 20, # PJLink timeout is 30 seconds 'projector/socket timeout': 5, # 5 second socket timeout - 'projector/source dialog type': 0 # Source select dialog box type + 'projector/source dialog type': 0, # Source select dialog box type + 'projector/udp broadcast listen': False # Enable/disable listening for PJLink 2 UDP broadcast packets } __file_path__ = '' # Settings upgrades prior to 3.0 diff --git a/openlp/core/projectors/editform.py b/openlp/core/projectors/editform.py index ef88605ef..04ad668ca 100644 --- a/openlp/core/projectors/editform.py +++ b/openlp/core/projectors/editform.py @@ -179,6 +179,7 @@ class ProjectorEditForm(QtWidgets.QDialog, Ui_ProjectorEditForm): Validate input before accepting input. """ log.debug('accept_me() signal received') + valid = True if len(self.name_text.text().strip()) < 1: QtWidgets.QMessageBox.warning(self, translate('OpenLP.ProjectorEdit', 'Name Not Set'), diff --git a/openlp/core/projectors/manager.py b/openlp/core/projectors/manager.py index c2575c60a..add7cd4c0 100644 --- a/openlp/core/projectors/manager.py +++ b/openlp/core/projectors/manager.py @@ -32,13 +32,13 @@ from PyQt5 import QtCore, QtGui, QtWidgets from openlp.core.common.i18n import translate from openlp.core.ui.icons import UiIcons from openlp.core.common.mixins import LogMixin, RegistryProperties -from openlp.core.common.registry import RegistryBase +from openlp.core.common.registry import Registry, RegistryBase from openlp.core.common.settings import Settings from openlp.core.lib.ui import create_widget_action from openlp.core.projectors import DialogSourceStyle from openlp.core.projectors.constants import E_AUTHENTICATION, E_ERROR, E_NETWORK, E_NOT_CONNECTED, \ E_SOCKET_TIMEOUT, E_UNKNOWN_SOCKET_ERROR, S_CONNECTED, S_CONNECTING, S_COOLDOWN, S_INITIALIZE, \ - S_NOT_CONNECTED, S_OFF, S_ON, S_STANDBY, S_WARMUP, PJLINK_PORT, STATUS_CODE, STATUS_MSG, QSOCKET_STATE + S_NOT_CONNECTED, S_OFF, S_ON, S_STANDBY, S_WARMUP, STATUS_CODE, STATUS_MSG, QSOCKET_STATE from openlp.core.projectors.db import ProjectorDB from openlp.core.projectors.editform import ProjectorEditForm @@ -297,7 +297,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM self.projector_list = [] self.source_select_form = None # Dictionary of PJLinkUDP objects to listen for UDP broadcasts from PJLink 2+ projectors. - # Key is port number that projectors use + # Key is port number self.pjlink_udp = {} # Dict for matching projector status to display icon self.status_icons = { @@ -335,10 +335,6 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM """ Post-initialize setups. """ - # Default PJLink port UDP socket - log.debug('Creating PJLinkUDP listener for default port {port}'.format(port=PJLINK_PORT)) - self.pjlink_udp = {PJLINK_PORT: PJLinkUDP(port=PJLINK_PORT)} - self.pjlink_udp[PJLINK_PORT].bind(PJLINK_PORT) # Set 1.5 second delay before loading all projectors if self.autostart: log.debug('Delaying 1.5 seconds before loading all projectors') @@ -351,6 +347,36 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM self.projector_form.editProjector.connect(self.edit_projector_from_wizard) self.projector_list_widget.itemSelectionChanged.connect(self.update_icons) + def udp_listen_add(self, port): + """ + Add UDP broadcast listener + """ + if port in self.pjlink_udp: + log.warning('UDP Listener for port {port} already added - skipping'.format(port=port)) + else: + log.debug('Adding UDP listener on port {port}'.format(port=port)) + self.pjlink_udp[port] = PJLinkUDP(port=port) + Registry().execute('udp_broadcast_add', port=port, callback=self.pjlink_udp[port].check_settings) + + def udp_listen_delete(self, port): + """ + Remove a UDP broadcast listener + """ + log.debug('Checking for UDP port {port} listener deletion'.format(port=port)) + if port not in self.pjlink_udp: + log.warn('UDP listener for port {port} not there - skipping delete'.format(port=port)) + return + keep_port = False + for item in self.projector_list: + if port == item.link.port: + keep_port = True + if keep_port: + log.warn('UDP listener for port {port} needed for other projectors - skipping delete'.format(port=port)) + return + Registry().execute('udp_broadcast_remove', port=port) + del self.pjlink_udp[port] + log.debug('UDP listener for port {port} deleted'.format(port=port)) + def get_settings(self): """ Retrieve the saved settings @@ -518,25 +544,22 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM except (AttributeError, TypeError): pass try: - projector.poll_timer.stop() - projector.poll_timer.timeout.disconnect(projector.link.poll_loop) + projector.link.poll_timer.stop() + projector.link.poll_timer.timeout.disconnect(projector.link.poll_loop) except (AttributeError, TypeError): pass try: - projector.socket_timer.stop() - projector.socket_timer.timeout.disconnect(projector.link.socket_abort) - except (AttributeError, TypeError): - pass - # Disconnect signals from projector being deleted - try: - self.pjlink_udp[projector.link.port].data_received.disconnect(projector.link.get_buffer) + projector.link.socket_timer.stop() + projector.link.socket_timer.timeout.disconnect(projector.link.socket_abort) except (AttributeError, TypeError): pass + old_port = projector.link.port # Rebuild projector list new_list = [] for item in self.projector_list: if item.link.db_item.id == projector.link.db_item.id: + log.debug('Removing projector "{item}"'.format(item=item.link.name)) continue new_list.append(item) self.projector_list = new_list @@ -546,6 +569,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM log.warning('Delete projector {item} failed'.format(item=projector.db_item)) for item in self.projector_list: log.debug('New projector list - item: {ip} {name}'.format(ip=item.link.ip, name=item.link.name)) + self.udp_listen_delete(old_port) def on_disconnect_projector(self, opt=None): """ @@ -748,15 +772,8 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM item.link.projectorAuthentication.connect(self.authentication_error) item.link.projectorNoAuthentication.connect(self.no_authentication_error) item.link.projectorUpdateIcons.connect(self.update_icons) - # Connect UDP signal to projector instances with same port - if item.link.port not in self.pjlink_udp: - log.debug('Adding new PJLinkUDP listener fo port {port}'.format(port=item.link.port)) - self.pjlink_udp[item.link.port] = PJLinkUDP(port=item.link.port) - self.pjlink_udp[item.link.port].bind(item.link.port) - log.debug('Connecting PJLinkUDP port {port} signal to "{item}"'.format(port=item.link.port, - item=item.link.name)) - self.pjlink_udp[item.link.port].data_received.connect(item.link.get_buffer) - + # Add UDP listener for new projector port + self.udp_listen_add(item.link.port) self.projector_list.append(item) if start: item.link.connect_to_host() @@ -783,13 +800,25 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM :param projector: Projector() instance of projector with updated information """ log.debug('edit_projector_from_wizard(ip={ip})'.format(ip=projector.ip)) + old_port = self.old_projector.link.port + old_ip = self.old_projector.link.ip self.old_projector.link.name = projector.name self.old_projector.link.ip = projector.ip self.old_projector.link.pin = None if projector.pin == '' else projector.pin - self.old_projector.link.port = projector.port self.old_projector.link.location = projector.location self.old_projector.link.notes = projector.notes self.old_projector.widget.setText(projector.name) + self.old_projector.link.port = int(projector.port) + # Update projector list items + for item in self.projector_list: + if item.link.ip == old_ip: + item.link.port = int(projector.port) + # NOTE: This assumes (!) we are using IP addresses as keys + break + # Update UDP listeners before setting old_projector.port + if old_port != projector.port: + self.udp_listen_delete(old_port) + self.udp_listen_add(int(projector.port)) def _load_projectors(self): """' diff --git a/openlp/core/projectors/pjlink.py b/openlp/core/projectors/pjlink.py index 3ea1bb414..cd178bb48 100644 --- a/openlp/core/projectors/pjlink.py +++ b/openlp/core/projectors/pjlink.py @@ -98,32 +98,51 @@ class PJLinkUDP(QtNetwork.QUdpSocket): self.search_active = False self.search_time = 30000 # 30 seconds for allowed time self.search_timer = QtCore.QTimer() + self.udp_broadcast_listen_setting = False + log.debug('(UDP:{port}) PJLinkUDP() Initialized'.format(port=self.port)) + if Settings().value('projector/udp broadcast listen'): + self.udp_start() + + def udp_start(self): + """ + Start listening on UDP port + """ + log.debug('(UDP:{port}) Start called'.format(port=self.port)) self.readyRead.connect(self.get_datagram) - log.debug('(UDP) PJLinkUDP() Initialized for port {port}'.format(port=self.port)) + self.check_settings(checked=Settings().value('projector/udp broadcast listen')) + + def udp_stop(self): + """ + Stop listening on UDP port + """ + log.debug('(UDP:{port}) Stopping listener'.format(port=self.port)) + self.close() + self.readyRead.disconnect(self.get_datagram) @QtCore.pyqtSlot() def get_datagram(self): """ Retrieve packet and basic checks """ - log.debug('(UDP) get_datagram() - Receiving data') + log.debug('(UDP:{port}) get_datagram() - Receiving data'.format(port=self.port)) read_size = self.pendingDatagramSize() if -1 == read_size: - log.warning('(UDP) No data (-1)') + log.warning('(UDP:{port}) No data (-1)'.format(port=self.port)) return elif 0 == read_size: - log.warning('(UDP) get_datagram() called when pending data size is 0') + log.warning('(UDP:{port}) get_datagram() called when pending data size is 0'.format(port=self.port)) return elif read_size > PJLINK_MAX_PACKET: - log.warning('(UDP) UDP Packet too large ({size} bytes)- ignoring'.format(size=read_size)) + log.warning('(UDP:{port}) UDP Packet too large ({size} bytes)- ignoring'.format(size=read_size, + port=self.port)) return data_in, peer_host, peer_port = self.readDatagram(read_size) data = data_in.decode('utf-8') if isinstance(data_in, bytes) else data_in - log.debug('(UDP) {size} bytes received from {adx} on port {port}'.format(size=len(data), - adx=peer_host.toString(), - port=self.port)) - log.debug('(UDP) packet "{data}"'.format(data=data)) - log.debug('(UDP) Sending data_received signal to projectors') + log.debug('(UDP:{port}) {size} bytes received from {adx}'.format(size=len(data), + adx=peer_host.toString(), + port=self.port)) + log.debug('(UDP:{port}) packet "{data}"'.format(data=data, port=self.port)) + log.debug('(UDP:{port}) Sending data_received signal to projectors'.format(port=self.port)) self.data_received.emit(peer_host, self.localPort(), data) return @@ -143,6 +162,25 @@ class PJLinkUDP(QtNetwork.QUdpSocket): self.search_active = False self.search_timer.stop() + def check_settings(self, checked): + """ + Update UDP listening state based on settings change. + NOTE: This method is called by projector settings tab and setup/removed by ProjectorManager + """ + if self.udp_broadcast_listen_setting == checked: + log.debug('(UDP:{port}) No change to status - skipping'.format(port=self.port)) + return + self.udp_broadcast_listen_setting = checked + if self.udp_broadcast_listen_setting: + if self.state() == self.ListeningState: + log.debug('(UDP:{port}) Already listening - skipping') + return + self.bind(self.port) + log.debug('(UDP:{port}) Listening'.format(port=self.port)) + else: + # Close socket + self.udp_stop() + class PJLinkCommands(object): """ diff --git a/openlp/core/projectors/tab.py b/openlp/core/projectors/tab.py index 2407bad66..fa76e1c25 100644 --- a/openlp/core/projectors/tab.py +++ b/openlp/core/projectors/tab.py @@ -27,6 +27,7 @@ import logging from PyQt5 import QtWidgets from openlp.core.common.i18n import UiStrings, translate +from openlp.core.common.registry import Registry from openlp.core.common.settings import Settings from openlp.core.lib.settingstab import SettingsTab from openlp.core.ui.icons import UiIcons @@ -47,8 +48,11 @@ class ProjectorTab(SettingsTab): :param parent: Parent widget """ self.icon_path = UiIcons().projector + self.udp_listeners = {} # Key on port number projector_translated = translate('OpenLP.ProjectorTab', 'Projector') super(ProjectorTab, self).__init__(parent, 'Projector', projector_translated) + Registry().register_function('udp_broadcast_add', self.add_udp_listener) + Registry().register_function('udp_broadcast_remove', self.remove_udp_listener) def setupUi(self): """ @@ -90,6 +94,10 @@ class ProjectorTab(SettingsTab): self.connect_box_layout.addRow(self.dialog_type_label, self.dialog_type_combo_box) self.left_layout.addStretch() self.dialog_type_combo_box.activated.connect(self.on_dialog_type_combo_box_changed) + # Enable/disable listening on UDP ports for PJLink2 broadcasts + self.udp_broadcast_listen = QtWidgets.QCheckBox(self.connect_box) + self.udp_broadcast_listen.setObjectName('udp_broadcast_listen') + self.connect_box_layout.addRow(self.udp_broadcast_listen) # Connect on LKUP packet received (PJLink v2+ only) self.connect_on_linkup = QtWidgets.QCheckBox(self.connect_box) self.connect_on_linkup.setObjectName('connect_on_linkup') @@ -116,6 +124,9 @@ class ProjectorTab(SettingsTab): translate('OpenLP.ProjectorTab', 'Single dialog box')) self.connect_on_linkup.setText( translate('OpenLP.ProjectorTab', 'Connect to projector when LINKUP received (v2 only)')) + self.udp_broadcast_listen.setText( + translate('OpenLP.ProjectorTab', 'Enable listening for PJLink2 broadcast messages')) + log.debug('PJLink settings tab initialized') def load(self): """ @@ -125,6 +136,7 @@ class ProjectorTab(SettingsTab): self.socket_timeout_spin_box.setValue(Settings().value('projector/socket timeout')) self.socket_poll_spin_box.setValue(Settings().value('projector/poll time')) self.dialog_type_combo_box.setCurrentIndex(Settings().value('projector/source dialog type')) + self.udp_broadcast_listen.setChecked(Settings().value('projector/udp broadcast listen')) self.connect_on_linkup.setChecked(Settings().value('projector/connect when LKUP received')) def save(self): @@ -136,6 +148,41 @@ class ProjectorTab(SettingsTab): Settings().setValue('projector/poll time', self.socket_poll_spin_box.value()) Settings().setValue('projector/source dialog type', self.dialog_type_combo_box.currentIndex()) Settings().setValue('projector/connect when LKUP received', self.connect_on_linkup.isChecked()) + Settings().setValue('projector/udp broadcast listen', self.udp_broadcast_listen.isChecked()) + self.call_udp_listener() def on_dialog_type_combo_box_changed(self): self.dialog_type = self.dialog_type_combo_box.currentIndex() + + def add_udp_listener(self, port, callback): + """ + Add new UDP listener to list + """ + if port in self.udp_listeners: + log.warning('Port {port} already in list - not adding'.format(port=port)) + return + self.udp_listeners[port] = callback + log.debug('PJLinkSettings: new callback list: {port}'.format(port=self.udp_listeners.keys())) + + def remove_udp_listener(self, port): + """ + Remove UDP listener from list + """ + if port not in self.udp_listeners: + log.warning('Port {port} not in list - ignoring'.format(port=port)) + return + # Turn off listener before deleting + self.udp_listeners[port](checked=False) + del self.udp_listeners[port] + log.debug('PJLinkSettings: new callback list: {port}'.format(port=self.udp_listeners.keys())) + + def call_udp_listener(self): + """ + Call listeners to update UDP listen setting + """ + if len(self.udp_listeners) < 1: + log.warning('PJLinkSettings: No callers - returning') + return + log.debug('PJLinkSettings: Calling UDP listeners') + for call in self.udp_listeners: + self.udp_listeners[call](checked=self.udp_broadcast_listen.isChecked()) diff --git a/tests/functional/openlp_core/common/test_network_interfaces.py b/tests/functional/openlp_core/common/test_network_interfaces.py deleted file mode 100644 index d1547bd0a..000000000 --- a/tests/functional/openlp_core/common/test_network_interfaces.py +++ /dev/null @@ -1,78 +0,0 @@ -# -*- coding: utf-8 -*- -# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4 - -############################################################################### -# OpenLP - Open Source Lyrics Projection # -# --------------------------------------------------------------------------- # -# Copyright (c) 2008-2017 OpenLP Developers # -# --------------------------------------------------------------------------- # -# This program is free software; you can redistribute it and/or modify it # -# under the terms of the GNU General Public License as published by the Free # -# Software Foundation; version 2 of the License. # -# # -# This program is distributed in the hope that it will be useful, but WITHOUT # -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # -# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for # -# more details. # -# # -# You should have received a copy of the GNU General Public License along # -# with this program; if not, write to the Free Software Foundation, Inc., 59 # -# Temple Place, Suite 330, Boston, MA 02111-1307 USA # -############################################################################### -""" -Functional tests to test calls for network interfaces. -""" -from unittest import TestCase -from unittest.mock import MagicMock, call, patch - -import openlp.core.common -from openlp.core.common import get_local_ip4 - -from tests.helpers.testmixin import TestMixin - -lo_address_attrs = {'isValid.return_value': True, - 'flags.return_value': True, - 'InterfaceFlags.return_value': True, - 'name.return_value': 'lo', - 'broadcast.toString.return_value': '127.0.0.255', - 'netmask.toString.return_value': '255.0.0.0', - 'prefixLength.return_value': 8, - 'ip.protocol.return_value': True} - - -class TestInterfaces(TestCase, TestMixin): - """ - A test suite to test out functions/methods that use network interface(s). - """ - def setUp(self): - """ - Create an instance and a few example actions. - """ - self.build_settings() - - self.ip4_lo_address = MagicMock() - self.ip4_lo_address.configure_mock(**lo_address_attrs) - - def tearDown(self): - """ - Clean up - """ - self.destroy_settings() - - @patch.object(openlp.core.common, 'log') - def test_ip4_no_interfaces(self, mock_log): - """ - Test no interfaces available - """ - # GIVEN: Test environment - call_warning = [call('No active IPv4 network interfaces detected')] - - with patch('openlp.core.common.QNetworkInterface') as mock_newtork_interface: - mock_newtork_interface.allInterfaces.return_value = [] - - # WHEN: get_local_ip4 is called - ifaces = get_local_ip4() - - # THEN: There should not be any interfaces detected - assert not ifaces, 'There should have been no active interfaces' - mock_log.warning.assert_has_calls(call_warning) diff --git a/tests/openlp_core/common/test_network_interfaces.py b/tests/openlp_core/common/test_network_interfaces.py new file mode 100644 index 000000000..7a049b234 --- /dev/null +++ b/tests/openlp_core/common/test_network_interfaces.py @@ -0,0 +1,224 @@ +# -*- coding: utf-8 -*- +# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4 + +############################################################################### +# OpenLP - Open Source Lyrics Projection # +# --------------------------------------------------------------------------- # +# Copyright (c) 2008-2017 OpenLP Developers # +# --------------------------------------------------------------------------- # +# This program is free software; you can redistribute it and/or modify it # +# under the terms of the GNU General Public License as published by the Free # +# Software Foundation; version 2 of the License. # +# # +# This program is distributed in the hope that it will be useful, but WITHOUT # +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for # +# more details. # +# # +# You should have received a copy of the GNU General Public License along # +# with this program; if not, write to the Free Software Foundation, Inc., 59 # +# Temple Place, Suite 330, Boston, MA 02111-1307 USA # +############################################################################### +""" +Functional tests to test calls for network interfaces. +""" +from unittest import TestCase +from unittest.mock import call, patch +from PyQt5.QtCore import QObject +from PyQt5.QtNetwork import QHostAddress, QNetworkAddressEntry, QNetworkInterface + +import openlp.core.common +from openlp.core.common import get_local_ip4 + +from tests.helpers.testmixin import TestMixin + + +class FakeIP4InterfaceEntry(QObject): + """ + Class to face an interface for testing purposes + """ + def __init__(self, name='lo'): + self.my_name = name + if name in ['localhost', 'lo']: + self.my_ip = QNetworkAddressEntry() + self.my_ip.setBroadcast(QHostAddress('255.0.0.0')) + self.my_ip.setIp(QHostAddress('127.0.0.2')) + self.my_ip.setPrefixLength(8) + self.fake_data = {'lo': {'ip': '127.0.0.2', + 'broadcast': '255.0.0.0', + 'netmask': '255.0.0.0', + 'prefix': 8, + 'localnet': '127.0.0.0'}} + else: + # Define a fake real address + self.my_ip = QNetworkAddressEntry() + self.my_ip.setBroadcast(QHostAddress('255.255.255.0')) + self.my_ip.setIp(QHostAddress('127.254.0.2')) + self.my_ip.setPrefixLength(24) + self.fake_data = {self.my_name: {'ip': '127.254.0.2', + 'broadcast': '255.255.255.0', + 'netmask': '255.255.255.0', + 'prefix': 24, + 'localnet': '127.254.0.0'}} + + def addressEntries(self): + """ + Return fake IP address + """ + return [self.my_ip] + + def flags(self): + """ + Return a QFlags enum with IsUp and IsRunning + """ + return (QNetworkInterface.IsUp | QNetworkInterface.IsRunning) + + def name(self): + return self.my_name + + def isValid(self): + return True + + +class TestInterfaces(TestCase, TestMixin): + """ + A test suite to test out functions/methods that use network interface(s). + """ + def setUp(self): + """ + Create an instance and a few example actions. + """ + self.build_settings() + if not hasattr(self, 'fake_lo'): + # Since these shouldn't change, only need to instantiate them the first time + self.fake_lo = FakeIP4InterfaceEntry() + self.fake_localhost = FakeIP4InterfaceEntry(name='localhost') + self.fake_address = FakeIP4InterfaceEntry(name='eth25') + + def tearDown(self): + """ + Clean up + """ + self.destroy_settings() + + @patch.object(openlp.core.common, 'log') + def test_ip4_no_interfaces(self, mock_log): + """ + Test no interfaces available + """ + # GIVEN: Test environment + call_debug = [call('Getting local IPv4 interface(es) information')] + call_warning = [call('No active IPv4 network interfaces detected')] + + # WHEN: get_local_ip4 is called + with patch('openlp.core.common.QNetworkInterface') as mock_network_interface: + mock_network_interface.allInterfaces.return_value = [] + ifaces = get_local_ip4() + + # THEN: There should not be any interfaces detected + mock_log.debug.assert_has_calls(call_debug) + mock_log.warning.assert_has_calls(call_warning) + assert not ifaces, 'There should have been no active interfaces listed' + + @patch.object(openlp.core.common, 'log') + def test_ip4_lo(self, mock_log): + """ + Test get_local_ip4 returns proper dictionary with 'lo' + """ + # GIVEN: Test environment + call_debug = [call('Getting local IPv4 interface(es) information'), + call('Checking for isValid and flags == IsUP | IsRunning'), + call('Checking address(es) protocol'), + call('Checking for protocol == IPv4Protocol'), + call('Getting interface information'), + call('Adding lo to active list')] + call_warning = [call('No active IPv4 interfaces found except localhost')] + + # WHEN: get_local_ip4 is called + with patch('openlp.core.common.QNetworkInterface') as mock_network_interface: + mock_network_interface.allInterfaces.return_value = [self.fake_lo] + ifaces = get_local_ip4() + + # THEN: There should be a fake 'lo' interface + mock_log.debug.assert_has_calls(call_debug) + mock_log.warning.assert_has_calls(call_warning) + assert ifaces == self.fake_lo.fake_data, "There should have been an 'lo' interface listed" + + @patch.object(openlp.core.common, 'log') + def test_ip4_localhost(self, mock_log): + """ + Test get_local_ip4 returns proper dictionary with 'lo' if interface is 'localhost' + """ + # GIVEN: Test environment + call_debug = [call('Getting local IPv4 interface(es) information'), + call('Checking for isValid and flags == IsUP | IsRunning'), + call('Checking address(es) protocol'), + call('Checking for protocol == IPv4Protocol'), + call('Getting interface information'), + call('Adding localhost to active list'), + call('Renaming windows localhost to lo')] + call_warning = [call('No active IPv4 interfaces found except localhost')] + + # WHEN: get_local_ip4 is called + with patch('openlp.core.common.QNetworkInterface') as mock_network_interface: + mock_network_interface.allInterfaces.return_value = [self.fake_localhost] + ifaces = get_local_ip4() + + # THEN: There should be a fake 'lo' interface + mock_log.debug.assert_has_calls(call_debug) + mock_log.warning.assert_has_calls(call_warning) + assert ifaces == self.fake_lo.fake_data, "There should have been an 'lo' interface listed" + + @patch.object(openlp.core.common, 'log') + def test_ip4_eth25(self, mock_log): + """ + Test get_local_ip4 returns proper dictionary with 'eth25' + """ + # GIVEN: Test environment + call_debug = [call('Getting local IPv4 interface(es) information'), + call('Checking for isValid and flags == IsUP | IsRunning'), + call('Checking address(es) protocol'), + call('Checking for protocol == IPv4Protocol'), + call('Getting interface information'), + call('Adding eth25 to active list')] + call_warning = [] + + # WHEN: get_local_ip4 is called + with patch('openlp.core.common.QNetworkInterface') as mock_network_interface: + mock_network_interface.allInterfaces.return_value = [self.fake_address] + ifaces = get_local_ip4() + + # THEN: There should be a fake 'eth25' interface + mock_log.debug.assert_has_calls(call_debug) + mock_log.warning.assert_has_calls(call_warning) + assert ifaces == self.fake_address.fake_data + + @patch.object(openlp.core.common, 'log') + def test_ip4_lo_eth25(self, mock_log): + """ + Test get_local_ip4 returns proper dictionary with 'eth25' + """ + # GIVEN: Test environment + call_debug = [call('Getting local IPv4 interface(es) information'), + call('Checking for isValid and flags == IsUP | IsRunning'), + call('Checking address(es) protocol'), + call('Checking for protocol == IPv4Protocol'), + call('Getting interface information'), + call('Adding lo to active list'), + call('Checking for isValid and flags == IsUP | IsRunning'), + call('Checking address(es) protocol'), + call('Checking for protocol == IPv4Protocol'), + call('Getting interface information'), + call('Adding eth25 to active list'), + call('Found at least one IPv4 interface, removing localhost')] + call_warning = [] + + # WHEN: get_local_ip4 is called + with patch('openlp.core.common.QNetworkInterface') as mock_network_interface: + mock_network_interface.allInterfaces.return_value = [self.fake_lo, self.fake_address] + ifaces = get_local_ip4() + + # THEN: There should be a fake 'eth25' interface + mock_log.debug.assert_has_calls(call_debug) + mock_log.warning.assert_has_calls(call_warning) + assert ifaces == self.fake_address.fake_data, "There should have been only 'eth25' interface listed" diff --git a/tests/openlp_core/projectors/__init__.py b/tests/openlp_core/projectors/__init__.py index 0a1a5ca7e..64536c41d 100644 --- a/tests/openlp_core/projectors/__init__.py +++ b/tests/openlp_core/projectors/__init__.py @@ -4,7 +4,7 @@ ############################################################################### # OpenLP - Open Source Lyrics Projection # # --------------------------------------------------------------------------- # -# Copyright (c) 2008-2017 OpenLP Developers # +# Copyright (c) 2008-2018 OpenLP Developers # # --------------------------------------------------------------------------- # # This program is free software; you can redistribute it and/or modify it # # under the terms of the GNU General Public License as published by the Free # @@ -20,5 +20,5 @@ # Temple Place, Suite 330, Boston, MA 02111-1307 USA # ############################################################################### """ -Module-level functions for the projector test suite +:mod tests/openlp_core/projectors: Tests for projector code """ diff --git a/tests/openlp_core/projectors/test_projector_pjlink_udp.py b/tests/openlp_core/projectors/test_projector_pjlink_udp.py index 45c91347f..564e13128 100644 --- a/tests/openlp_core/projectors/test_projector_pjlink_udp.py +++ b/tests/openlp_core/projectors/test_projector_pjlink_udp.py @@ -28,16 +28,45 @@ from unittest import TestCase from unittest.mock import call, patch import openlp.core.projectors.pjlink +from openlp.core.common.registry import Registry from openlp.core.projectors.constants import PJLINK_PORT - +from openlp.core.projectors.tab import ProjectorTab from openlp.core.projectors.pjlink import PJLinkUDP + +from tests.helpers.testmixin import TestMixin from tests.resources.projector.data import TEST1_DATA -class TestPJLinkBase(TestCase): +class TestPJLinkBase(TestCase, TestMixin): """ Tests for the PJLinkUDP class """ + def setUp(self): + """ + Create the UI and setup necessary options + """ + self.setup_application() + self.build_settings() + Registry.create() + """ + with patch('openlp.core.projectors.db.init_url') as mocked_init_url: + if os.path.exists(TEST_DB): + os.unlink(TEST_DB) + mocked_init_url.return_value = 'sqlite:///%s' % TEST_DB + self.projectordb = ProjectorDB() + if not hasattr(self, 'projector_manager'): + self.projector_manager = ProjectorManager(projectordb=self.projectordb) + """ + + def tearDown(self): + """ + Remove test database. + Delete all the C++ objects at the end so that we don't have a segfault. + """ + # self.projectordb.session.close() + self.destroy_settings() + # del self.projector_manager + @patch.object(openlp.core.projectors.pjlink, 'log') def test_get_datagram_data_negative_zero_length(self, mock_log): """ @@ -45,9 +74,9 @@ class TestPJLinkBase(TestCase): """ # GIVEN: Test setup pjlink_udp = PJLinkUDP() - log_warning_calls = [call('(UDP) No data (-1)')] - log_debug_calls = [call('(UDP) PJLinkUDP() Initialized for port 4352'), - call('(UDP) get_datagram() - Receiving data')] + log_warning_calls = [call('(UDP:4352) No data (-1)')] + log_debug_calls = [call('(UDP:4352) PJLinkUDP() Initialized'), + call('(UDP:4352) get_datagram() - Receiving data')] with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \ patch.object(pjlink_udp, 'readDatagram') as mock_read: mock_datagram.return_value = -1 @@ -67,9 +96,9 @@ class TestPJLinkBase(TestCase): """ # GIVEN: Test setup pjlink_udp = PJLinkUDP() - log_warning_calls = [call('(UDP) get_datagram() called when pending data size is 0')] - log_debug_calls = [call('(UDP) PJLinkUDP() Initialized for port 4352'), - call('(UDP) get_datagram() - Receiving data')] + log_warning_calls = [call('(UDP:4352) get_datagram() called when pending data size is 0')] + log_debug_calls = [call('(UDP:4352) PJLinkUDP() Initialized'), + call('(UDP:4352) get_datagram() - Receiving data')] with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \ patch.object(pjlink_udp, 'readDatagram') as mock_read: mock_datagram.return_value = 0 @@ -89,9 +118,9 @@ class TestPJLinkBase(TestCase): """ # GIVEN: Test setup pjlink_udp = PJLinkUDP() - log_warning_calls = [call('(UDP) get_datagram() called when pending data size is 0')] - log_debug_calls = [call('(UDP) PJLinkUDP() Initialized for port 4352'), - call('(UDP) get_datagram() - Receiving data')] + log_warning_calls = [call('(UDP:4352) get_datagram() called when pending data size is 0')] + log_debug_calls = [call('(UDP:4352) PJLinkUDP() Initialized'), + call('(UDP:4352) get_datagram() - Receiving data')] with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram: mock_datagram.return_value = 0 @@ -101,3 +130,147 @@ class TestPJLinkBase(TestCase): # THEN: Log entries should be made and method returns mock_log.warning.assert_has_calls(log_warning_calls) mock_log.debug.assert_has_calls(log_debug_calls) + + @patch.object(openlp.core.projectors.tab, 'log') + def test_pjlinksettings_add_udp_listener(self, mock_log): + """ + Test adding UDP listners to PJLink Settings tab + """ + # GIVEN: Initial setup + log_debug_calls = [call('PJLink settings tab initialized'), + call('PJLinkSettings: new callback list: dict_keys([4352])')] + log_warning_calls = [] + + pjlink_udp = PJLinkUDP() + settings_tab = ProjectorTab(parent=None) + + # WHEN: add_udp_listener is called with single port + settings_tab.add_udp_listener(port=pjlink_udp.port, callback=pjlink_udp.check_settings) + + # THEN: settings tab should have one entry + assert len(settings_tab.udp_listeners) == 1 + assert pjlink_udp.port in settings_tab.udp_listeners + mock_log.debug.assert_has_calls(log_debug_calls) + mock_log.warning.assert_has_calls(log_warning_calls) + + @patch.object(openlp.core.projectors.tab, 'log') + def test_pjlinksettings_add_udp_listener_multiple_same(self, mock_log): + """ + Test adding second UDP listner with same port to PJLink Settings tab + """ + # GIVEN: Initial setup + log_debug_calls = [call('PJLink settings tab initialized'), + call('PJLinkSettings: new callback list: dict_keys([4352])')] + log_warning_calls = [call('Port 4352 already in list - not adding')] + pjlink_udp = PJLinkUDP() + settings_tab = ProjectorTab(parent=None) + settings_tab.add_udp_listener(port=pjlink_udp.port, callback=pjlink_udp.check_settings) + + # WHEN: add_udp_listener is called with second instance same port + settings_tab.add_udp_listener(port=pjlink_udp.port, callback=pjlink_udp.check_settings) + + # THEN: settings tab should have one entry + assert len(settings_tab.udp_listeners) == 1 + assert pjlink_udp.port in settings_tab.udp_listeners + mock_log.debug.assert_has_calls(log_debug_calls) + mock_log.warning.assert_has_calls(log_warning_calls) + + @patch.object(openlp.core.projectors.tab, 'log') + def test_pjlinksettings_add_udp_listener_multiple_different(self, mock_log): + """ + Test adding second UDP listner with different port to PJLink Settings tab + """ + # GIVEN: Initial setup + log_debug_calls = [call('PJLink settings tab initialized'), + call('PJLinkSettings: new callback list: dict_keys([4352])')] + log_warning_calls = [] + + settings_tab = ProjectorTab(parent=None) + pjlink_udp1 = PJLinkUDP(port=4352) + settings_tab.add_udp_listener(port=pjlink_udp1.port, callback=pjlink_udp1.check_settings) + + # WHEN: add_udp_listener is called with second instance different port + pjlink_udp2 = PJLinkUDP(port=4353) + settings_tab.add_udp_listener(port=pjlink_udp2.port, callback=pjlink_udp2.check_settings) + + # THEN: settings tab should have two entry + assert len(settings_tab.udp_listeners) == 2 + assert pjlink_udp1.port in settings_tab.udp_listeners + assert pjlink_udp2.port in settings_tab.udp_listeners + mock_log.debug.assert_has_calls(log_debug_calls) + mock_log.warning.assert_has_calls(log_warning_calls) + + @patch.object(openlp.core.projectors.tab, 'log') + def test_pjlinksettings_remove_udp_listener(self, mock_log): + """ + Test removing UDP listners to PJLink Settings tab + """ + # GIVEN: Initial setup + log_debug_calls = [call('PJLink settings tab initialized'), + call('PJLinkSettings: new callback list: dict_keys([4352])'), + call('PJLinkSettings: new callback list: dict_keys([])')] + log_warning_calls = [] + + pjlink_udp = PJLinkUDP() + settings_tab = ProjectorTab(parent=None) + settings_tab.add_udp_listener(port=pjlink_udp.port, callback=pjlink_udp.check_settings) + + # WHEN: remove_udp_listener is called with single port + settings_tab.remove_udp_listener(port=pjlink_udp.port) + + # THEN: settings tab should have one entry + assert len(settings_tab.udp_listeners) == 0 + mock_log.debug.assert_has_calls(log_debug_calls) + mock_log.warning.assert_has_calls(log_warning_calls) + + @patch.object(openlp.core.projectors.tab, 'log') + def test_pjlinksettings_remove_udp_listener_multiple_different(self, mock_log): + """ + Test adding second UDP listner with different port to PJLink Settings tab + """ + # GIVEN: Initial setup + log_debug_calls = [call('PJLink settings tab initialized'), + call('PJLinkSettings: new callback list: dict_keys([4352])')] + log_warning_calls = [] + + settings_tab = ProjectorTab(parent=None) + pjlink_udp1 = PJLinkUDP(port=4352) + settings_tab.add_udp_listener(port=pjlink_udp1.port, callback=pjlink_udp1.check_settings) + pjlink_udp2 = PJLinkUDP(port=4353) + settings_tab.add_udp_listener(port=pjlink_udp2.port, callback=pjlink_udp2.check_settings) + + # WHEN: remove_udp_listener called for one port + settings_tab.remove_udp_listener(port=4353) + + # THEN: settings tab should have one entry + assert len(settings_tab.udp_listeners) == 1 + assert pjlink_udp1.port in settings_tab.udp_listeners + assert pjlink_udp2.port not in settings_tab.udp_listeners + mock_log.debug.assert_has_calls(log_debug_calls) + mock_log.warning.assert_has_calls(log_warning_calls) + + @patch.object(PJLinkUDP, 'check_settings') + @patch.object(openlp.core.projectors.pjlink, 'log') + @patch.object(openlp.core.projectors.tab, 'log') + def test_pjlinksettings_call_udp_listener(self, mock_tab_log, mock_pjlink_log, mock_check_settings): + """ + Test calling UDP listners in PJLink Settings tab + """ + # GIVEN: Initial setup + tab_debug_calls = [call('PJLink settings tab initialized'), + call('PJLinkSettings: new callback list: dict_keys([4352])'), + call('PJLinkSettings: Calling UDP listeners')] + pjlink_debug_calls = [call.debug('(UDP:4352) PJLinkUDP() Initialized')] + + pjlink_udp = PJLinkUDP() + settings_tab = ProjectorTab(parent=None) + settings_tab.add_udp_listener(port=pjlink_udp.port, callback=pjlink_udp.check_settings) + + # WHEN: calling UDP listener via registry + settings_tab.call_udp_listener() + + # THEN: settings tab should have one entry + assert len(settings_tab.udp_listeners) == 1 + mock_check_settings.assert_called() + mock_tab_log.debug.assert_has_calls(tab_debug_calls) + mock_pjlink_log.assert_has_calls(pjlink_debug_calls)