PJLink2 Update T and pep8

This commit is contained in:
Ken Roberts 2018-10-19 21:33:32 -07:00
parent f325f8d6da
commit a3f2030349
10 changed files with 566 additions and 133 deletions

View File

@ -60,7 +60,6 @@ def get_local_ip4():
:returns: Dict of interfaces :returns: Dict of interfaces
""" """
# Get the local IPv4 active address(es) that are NOT localhost (lo or '127.0.0.1')
log.debug('Getting local IPv4 interface(es) information') log.debug('Getting local IPv4 interface(es) information')
my_ip4 = {} my_ip4 = {}
for iface in QNetworkInterface.allInterfaces(): for iface in QNetworkInterface.allInterfaces():
@ -70,8 +69,6 @@ def get_local_ip4():
log.debug('Checking address(es) protocol') log.debug('Checking address(es) protocol')
for address in iface.addressEntries(): for address in iface.addressEntries():
ip = address.ip() ip = address.ip()
# NOTE: Next line will skip if interface is localhost - keep for now until we decide about it later
# if (ip.protocol() == QAbstractSocket.IPv4Protocol) and (ip != QHostAddress.LocalHost):
log.debug('Checking for protocol == IPv4Protocol') log.debug('Checking for protocol == IPv4Protocol')
if ip.protocol() == QAbstractSocket.IPv4Protocol: if ip.protocol() == QAbstractSocket.IPv4Protocol:
log.debug('Getting interface information') log.debug('Getting interface information')
@ -83,12 +80,13 @@ def get_local_ip4():
ip.toIPv4Address()).toString() ip.toIPv4Address()).toString()
} }
log.debug('Adding {iface} to active list'.format(iface=iface.name())) log.debug('Adding {iface} to active list'.format(iface=iface.name()))
if len(my_ip4) == 0:
log.warning('No active IPv4 network interfaces detected')
return my_ip4
if 'localhost' in my_ip4: if 'localhost' in my_ip4:
log.debug('Renaming windows localhost to lo') log.debug('Renaming windows localhost to lo')
my_ip4['lo'] = my_ip4['localhost'] my_ip4['lo'] = my_ip4['localhost']
my_ip4.pop('localhost') my_ip4.pop('localhost')
if len(my_ip4) == 0:
log.warning('No active IPv4 network interfaces detected')
if len(my_ip4) == 1: if len(my_ip4) == 1:
if 'lo' in my_ip4: if 'lo' in my_ip4:
# No active interfaces - so leave localhost in there # No active interfaces - so leave localhost in there

View File

@ -217,7 +217,8 @@ class Settings(QtCore.QSettings):
'projector/last directory export': None, 'projector/last directory export': None,
'projector/poll time': 20, # PJLink timeout is 30 seconds 'projector/poll time': 20, # PJLink timeout is 30 seconds
'projector/socket timeout': 5, # 5 second socket timeout 'projector/socket timeout': 5, # 5 second socket timeout
'projector/source dialog type': 0 # Source select dialog box type 'projector/source dialog type': 0, # Source select dialog box type
'projector/udp broadcast listen': False # Enable/disable listening for PJLink 2 UDP broadcast packets
} }
__file_path__ = '' __file_path__ = ''
# Settings upgrades prior to 3.0 # Settings upgrades prior to 3.0

View File

@ -179,6 +179,7 @@ class ProjectorEditForm(QtWidgets.QDialog, Ui_ProjectorEditForm):
Validate input before accepting input. Validate input before accepting input.
""" """
log.debug('accept_me() signal received') log.debug('accept_me() signal received')
valid = True
if len(self.name_text.text().strip()) < 1: if len(self.name_text.text().strip()) < 1:
QtWidgets.QMessageBox.warning(self, QtWidgets.QMessageBox.warning(self,
translate('OpenLP.ProjectorEdit', 'Name Not Set'), translate('OpenLP.ProjectorEdit', 'Name Not Set'),

View File

@ -32,13 +32,13 @@ from PyQt5 import QtCore, QtGui, QtWidgets
from openlp.core.common.i18n import translate from openlp.core.common.i18n import translate
from openlp.core.ui.icons import UiIcons from openlp.core.ui.icons import UiIcons
from openlp.core.common.mixins import LogMixin, RegistryProperties from openlp.core.common.mixins import LogMixin, RegistryProperties
from openlp.core.common.registry import RegistryBase from openlp.core.common.registry import Registry, RegistryBase
from openlp.core.common.settings import Settings from openlp.core.common.settings import Settings
from openlp.core.lib.ui import create_widget_action from openlp.core.lib.ui import create_widget_action
from openlp.core.projectors import DialogSourceStyle from openlp.core.projectors import DialogSourceStyle
from openlp.core.projectors.constants import E_AUTHENTICATION, E_ERROR, E_NETWORK, E_NOT_CONNECTED, \ from openlp.core.projectors.constants import E_AUTHENTICATION, E_ERROR, E_NETWORK, E_NOT_CONNECTED, \
E_SOCKET_TIMEOUT, E_UNKNOWN_SOCKET_ERROR, S_CONNECTED, S_CONNECTING, S_COOLDOWN, S_INITIALIZE, \ E_SOCKET_TIMEOUT, E_UNKNOWN_SOCKET_ERROR, S_CONNECTED, S_CONNECTING, S_COOLDOWN, S_INITIALIZE, \
S_NOT_CONNECTED, S_OFF, S_ON, S_STANDBY, S_WARMUP, PJLINK_PORT, STATUS_CODE, STATUS_MSG, QSOCKET_STATE S_NOT_CONNECTED, S_OFF, S_ON, S_STANDBY, S_WARMUP, STATUS_CODE, STATUS_MSG, QSOCKET_STATE
from openlp.core.projectors.db import ProjectorDB from openlp.core.projectors.db import ProjectorDB
from openlp.core.projectors.editform import ProjectorEditForm from openlp.core.projectors.editform import ProjectorEditForm
@ -297,7 +297,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
self.projector_list = [] self.projector_list = []
self.source_select_form = None self.source_select_form = None
# Dictionary of PJLinkUDP objects to listen for UDP broadcasts from PJLink 2+ projectors. # Dictionary of PJLinkUDP objects to listen for UDP broadcasts from PJLink 2+ projectors.
# Key is port number that projectors use # Key is port number
self.pjlink_udp = {} self.pjlink_udp = {}
# Dict for matching projector status to display icon # Dict for matching projector status to display icon
self.status_icons = { self.status_icons = {
@ -335,10 +335,6 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
""" """
Post-initialize setups. Post-initialize setups.
""" """
# Default PJLink port UDP socket
log.debug('Creating PJLinkUDP listener for default port {port}'.format(port=PJLINK_PORT))
self.pjlink_udp = {PJLINK_PORT: PJLinkUDP(port=PJLINK_PORT)}
self.pjlink_udp[PJLINK_PORT].bind(PJLINK_PORT)
# Set 1.5 second delay before loading all projectors # Set 1.5 second delay before loading all projectors
if self.autostart: if self.autostart:
log.debug('Delaying 1.5 seconds before loading all projectors') log.debug('Delaying 1.5 seconds before loading all projectors')
@ -351,6 +347,36 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
self.projector_form.editProjector.connect(self.edit_projector_from_wizard) self.projector_form.editProjector.connect(self.edit_projector_from_wizard)
self.projector_list_widget.itemSelectionChanged.connect(self.update_icons) self.projector_list_widget.itemSelectionChanged.connect(self.update_icons)
def udp_listen_add(self, port):
"""
Add UDP broadcast listener
"""
if port in self.pjlink_udp:
log.warning('UDP Listener for port {port} already added - skipping'.format(port=port))
else:
log.debug('Adding UDP listener on port {port}'.format(port=port))
self.pjlink_udp[port] = PJLinkUDP(port=port)
Registry().execute('udp_broadcast_add', port=port, callback=self.pjlink_udp[port].check_settings)
def udp_listen_delete(self, port):
"""
Remove a UDP broadcast listener
"""
log.debug('Checking for UDP port {port} listener deletion'.format(port=port))
if port not in self.pjlink_udp:
log.warn('UDP listener for port {port} not there - skipping delete'.format(port=port))
return
keep_port = False
for item in self.projector_list:
if port == item.link.port:
keep_port = True
if keep_port:
log.warn('UDP listener for port {port} needed for other projectors - skipping delete'.format(port=port))
return
Registry().execute('udp_broadcast_remove', port=port)
del self.pjlink_udp[port]
log.debug('UDP listener for port {port} deleted'.format(port=port))
def get_settings(self): def get_settings(self):
""" """
Retrieve the saved settings Retrieve the saved settings
@ -518,25 +544,22 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
except (AttributeError, TypeError): except (AttributeError, TypeError):
pass pass
try: try:
projector.poll_timer.stop() projector.link.poll_timer.stop()
projector.poll_timer.timeout.disconnect(projector.link.poll_loop) projector.link.poll_timer.timeout.disconnect(projector.link.poll_loop)
except (AttributeError, TypeError): except (AttributeError, TypeError):
pass pass
try: try:
projector.socket_timer.stop() projector.link.socket_timer.stop()
projector.socket_timer.timeout.disconnect(projector.link.socket_abort) projector.link.socket_timer.timeout.disconnect(projector.link.socket_abort)
except (AttributeError, TypeError):
pass
# Disconnect signals from projector being deleted
try:
self.pjlink_udp[projector.link.port].data_received.disconnect(projector.link.get_buffer)
except (AttributeError, TypeError): except (AttributeError, TypeError):
pass pass
old_port = projector.link.port
# Rebuild projector list # Rebuild projector list
new_list = [] new_list = []
for item in self.projector_list: for item in self.projector_list:
if item.link.db_item.id == projector.link.db_item.id: if item.link.db_item.id == projector.link.db_item.id:
log.debug('Removing projector "{item}"'.format(item=item.link.name))
continue continue
new_list.append(item) new_list.append(item)
self.projector_list = new_list self.projector_list = new_list
@ -546,6 +569,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
log.warning('Delete projector {item} failed'.format(item=projector.db_item)) log.warning('Delete projector {item} failed'.format(item=projector.db_item))
for item in self.projector_list: for item in self.projector_list:
log.debug('New projector list - item: {ip} {name}'.format(ip=item.link.ip, name=item.link.name)) log.debug('New projector list - item: {ip} {name}'.format(ip=item.link.ip, name=item.link.name))
self.udp_listen_delete(old_port)
def on_disconnect_projector(self, opt=None): def on_disconnect_projector(self, opt=None):
""" """
@ -748,15 +772,8 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
item.link.projectorAuthentication.connect(self.authentication_error) item.link.projectorAuthentication.connect(self.authentication_error)
item.link.projectorNoAuthentication.connect(self.no_authentication_error) item.link.projectorNoAuthentication.connect(self.no_authentication_error)
item.link.projectorUpdateIcons.connect(self.update_icons) item.link.projectorUpdateIcons.connect(self.update_icons)
# Connect UDP signal to projector instances with same port # Add UDP listener for new projector port
if item.link.port not in self.pjlink_udp: self.udp_listen_add(item.link.port)
log.debug('Adding new PJLinkUDP listener fo port {port}'.format(port=item.link.port))
self.pjlink_udp[item.link.port] = PJLinkUDP(port=item.link.port)
self.pjlink_udp[item.link.port].bind(item.link.port)
log.debug('Connecting PJLinkUDP port {port} signal to "{item}"'.format(port=item.link.port,
item=item.link.name))
self.pjlink_udp[item.link.port].data_received.connect(item.link.get_buffer)
self.projector_list.append(item) self.projector_list.append(item)
if start: if start:
item.link.connect_to_host() item.link.connect_to_host()
@ -783,13 +800,25 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
:param projector: Projector() instance of projector with updated information :param projector: Projector() instance of projector with updated information
""" """
log.debug('edit_projector_from_wizard(ip={ip})'.format(ip=projector.ip)) log.debug('edit_projector_from_wizard(ip={ip})'.format(ip=projector.ip))
old_port = self.old_projector.link.port
old_ip = self.old_projector.link.ip
self.old_projector.link.name = projector.name self.old_projector.link.name = projector.name
self.old_projector.link.ip = projector.ip self.old_projector.link.ip = projector.ip
self.old_projector.link.pin = None if projector.pin == '' else projector.pin self.old_projector.link.pin = None if projector.pin == '' else projector.pin
self.old_projector.link.port = projector.port
self.old_projector.link.location = projector.location self.old_projector.link.location = projector.location
self.old_projector.link.notes = projector.notes self.old_projector.link.notes = projector.notes
self.old_projector.widget.setText(projector.name) self.old_projector.widget.setText(projector.name)
self.old_projector.link.port = int(projector.port)
# Update projector list items
for item in self.projector_list:
if item.link.ip == old_ip:
item.link.port = int(projector.port)
# NOTE: This assumes (!) we are using IP addresses as keys
break
# Update UDP listeners before setting old_projector.port
if old_port != projector.port:
self.udp_listen_delete(old_port)
self.udp_listen_add(int(projector.port))
def _load_projectors(self): def _load_projectors(self):
"""' """'

View File

@ -98,32 +98,51 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
self.search_active = False self.search_active = False
self.search_time = 30000 # 30 seconds for allowed time self.search_time = 30000 # 30 seconds for allowed time
self.search_timer = QtCore.QTimer() self.search_timer = QtCore.QTimer()
self.udp_broadcast_listen_setting = False
log.debug('(UDP:{port}) PJLinkUDP() Initialized'.format(port=self.port))
if Settings().value('projector/udp broadcast listen'):
self.udp_start()
def udp_start(self):
"""
Start listening on UDP port
"""
log.debug('(UDP:{port}) Start called'.format(port=self.port))
self.readyRead.connect(self.get_datagram) self.readyRead.connect(self.get_datagram)
log.debug('(UDP) PJLinkUDP() Initialized for port {port}'.format(port=self.port)) self.check_settings(checked=Settings().value('projector/udp broadcast listen'))
def udp_stop(self):
"""
Stop listening on UDP port
"""
log.debug('(UDP:{port}) Stopping listener'.format(port=self.port))
self.close()
self.readyRead.disconnect(self.get_datagram)
@QtCore.pyqtSlot() @QtCore.pyqtSlot()
def get_datagram(self): def get_datagram(self):
""" """
Retrieve packet and basic checks Retrieve packet and basic checks
""" """
log.debug('(UDP) get_datagram() - Receiving data') log.debug('(UDP:{port}) get_datagram() - Receiving data'.format(port=self.port))
read_size = self.pendingDatagramSize() read_size = self.pendingDatagramSize()
if -1 == read_size: if -1 == read_size:
log.warning('(UDP) No data (-1)') log.warning('(UDP:{port}) No data (-1)'.format(port=self.port))
return return
elif 0 == read_size: elif 0 == read_size:
log.warning('(UDP) get_datagram() called when pending data size is 0') log.warning('(UDP:{port}) get_datagram() called when pending data size is 0'.format(port=self.port))
return return
elif read_size > PJLINK_MAX_PACKET: elif read_size > PJLINK_MAX_PACKET:
log.warning('(UDP) UDP Packet too large ({size} bytes)- ignoring'.format(size=read_size)) log.warning('(UDP:{port}) UDP Packet too large ({size} bytes)- ignoring'.format(size=read_size,
port=self.port))
return return
data_in, peer_host, peer_port = self.readDatagram(read_size) data_in, peer_host, peer_port = self.readDatagram(read_size)
data = data_in.decode('utf-8') if isinstance(data_in, bytes) else data_in data = data_in.decode('utf-8') if isinstance(data_in, bytes) else data_in
log.debug('(UDP) {size} bytes received from {adx} on port {port}'.format(size=len(data), log.debug('(UDP:{port}) {size} bytes received from {adx}'.format(size=len(data),
adx=peer_host.toString(), adx=peer_host.toString(),
port=self.port)) port=self.port))
log.debug('(UDP) packet "{data}"'.format(data=data)) log.debug('(UDP:{port}) packet "{data}"'.format(data=data, port=self.port))
log.debug('(UDP) Sending data_received signal to projectors') log.debug('(UDP:{port}) Sending data_received signal to projectors'.format(port=self.port))
self.data_received.emit(peer_host, self.localPort(), data) self.data_received.emit(peer_host, self.localPort(), data)
return return
@ -143,6 +162,25 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
self.search_active = False self.search_active = False
self.search_timer.stop() self.search_timer.stop()
def check_settings(self, checked):
"""
Update UDP listening state based on settings change.
NOTE: This method is called by projector settings tab and setup/removed by ProjectorManager
"""
if self.udp_broadcast_listen_setting == checked:
log.debug('(UDP:{port}) No change to status - skipping'.format(port=self.port))
return
self.udp_broadcast_listen_setting = checked
if self.udp_broadcast_listen_setting:
if self.state() == self.ListeningState:
log.debug('(UDP:{port}) Already listening - skipping')
return
self.bind(self.port)
log.debug('(UDP:{port}) Listening'.format(port=self.port))
else:
# Close socket
self.udp_stop()
class PJLinkCommands(object): class PJLinkCommands(object):
""" """

View File

@ -27,6 +27,7 @@ import logging
from PyQt5 import QtWidgets from PyQt5 import QtWidgets
from openlp.core.common.i18n import UiStrings, translate from openlp.core.common.i18n import UiStrings, translate
from openlp.core.common.registry import Registry
from openlp.core.common.settings import Settings from openlp.core.common.settings import Settings
from openlp.core.lib.settingstab import SettingsTab from openlp.core.lib.settingstab import SettingsTab
from openlp.core.ui.icons import UiIcons from openlp.core.ui.icons import UiIcons
@ -47,8 +48,11 @@ class ProjectorTab(SettingsTab):
:param parent: Parent widget :param parent: Parent widget
""" """
self.icon_path = UiIcons().projector self.icon_path = UiIcons().projector
self.udp_listeners = {} # Key on port number
projector_translated = translate('OpenLP.ProjectorTab', 'Projector') projector_translated = translate('OpenLP.ProjectorTab', 'Projector')
super(ProjectorTab, self).__init__(parent, 'Projector', projector_translated) super(ProjectorTab, self).__init__(parent, 'Projector', projector_translated)
Registry().register_function('udp_broadcast_add', self.add_udp_listener)
Registry().register_function('udp_broadcast_remove', self.remove_udp_listener)
def setupUi(self): def setupUi(self):
""" """
@ -90,6 +94,10 @@ class ProjectorTab(SettingsTab):
self.connect_box_layout.addRow(self.dialog_type_label, self.dialog_type_combo_box) self.connect_box_layout.addRow(self.dialog_type_label, self.dialog_type_combo_box)
self.left_layout.addStretch() self.left_layout.addStretch()
self.dialog_type_combo_box.activated.connect(self.on_dialog_type_combo_box_changed) self.dialog_type_combo_box.activated.connect(self.on_dialog_type_combo_box_changed)
# Enable/disable listening on UDP ports for PJLink2 broadcasts
self.udp_broadcast_listen = QtWidgets.QCheckBox(self.connect_box)
self.udp_broadcast_listen.setObjectName('udp_broadcast_listen')
self.connect_box_layout.addRow(self.udp_broadcast_listen)
# Connect on LKUP packet received (PJLink v2+ only) # Connect on LKUP packet received (PJLink v2+ only)
self.connect_on_linkup = QtWidgets.QCheckBox(self.connect_box) self.connect_on_linkup = QtWidgets.QCheckBox(self.connect_box)
self.connect_on_linkup.setObjectName('connect_on_linkup') self.connect_on_linkup.setObjectName('connect_on_linkup')
@ -116,6 +124,9 @@ class ProjectorTab(SettingsTab):
translate('OpenLP.ProjectorTab', 'Single dialog box')) translate('OpenLP.ProjectorTab', 'Single dialog box'))
self.connect_on_linkup.setText( self.connect_on_linkup.setText(
translate('OpenLP.ProjectorTab', 'Connect to projector when LINKUP received (v2 only)')) translate('OpenLP.ProjectorTab', 'Connect to projector when LINKUP received (v2 only)'))
self.udp_broadcast_listen.setText(
translate('OpenLP.ProjectorTab', 'Enable listening for PJLink2 broadcast messages'))
log.debug('PJLink settings tab initialized')
def load(self): def load(self):
""" """
@ -125,6 +136,7 @@ class ProjectorTab(SettingsTab):
self.socket_timeout_spin_box.setValue(Settings().value('projector/socket timeout')) self.socket_timeout_spin_box.setValue(Settings().value('projector/socket timeout'))
self.socket_poll_spin_box.setValue(Settings().value('projector/poll time')) self.socket_poll_spin_box.setValue(Settings().value('projector/poll time'))
self.dialog_type_combo_box.setCurrentIndex(Settings().value('projector/source dialog type')) self.dialog_type_combo_box.setCurrentIndex(Settings().value('projector/source dialog type'))
self.udp_broadcast_listen.setChecked(Settings().value('projector/udp broadcast listen'))
self.connect_on_linkup.setChecked(Settings().value('projector/connect when LKUP received')) self.connect_on_linkup.setChecked(Settings().value('projector/connect when LKUP received'))
def save(self): def save(self):
@ -136,6 +148,41 @@ class ProjectorTab(SettingsTab):
Settings().setValue('projector/poll time', self.socket_poll_spin_box.value()) Settings().setValue('projector/poll time', self.socket_poll_spin_box.value())
Settings().setValue('projector/source dialog type', self.dialog_type_combo_box.currentIndex()) Settings().setValue('projector/source dialog type', self.dialog_type_combo_box.currentIndex())
Settings().setValue('projector/connect when LKUP received', self.connect_on_linkup.isChecked()) Settings().setValue('projector/connect when LKUP received', self.connect_on_linkup.isChecked())
Settings().setValue('projector/udp broadcast listen', self.udp_broadcast_listen.isChecked())
self.call_udp_listener()
def on_dialog_type_combo_box_changed(self): def on_dialog_type_combo_box_changed(self):
self.dialog_type = self.dialog_type_combo_box.currentIndex() self.dialog_type = self.dialog_type_combo_box.currentIndex()
def add_udp_listener(self, port, callback):
"""
Add new UDP listener to list
"""
if port in self.udp_listeners:
log.warning('Port {port} already in list - not adding'.format(port=port))
return
self.udp_listeners[port] = callback
log.debug('PJLinkSettings: new callback list: {port}'.format(port=self.udp_listeners.keys()))
def remove_udp_listener(self, port):
"""
Remove UDP listener from list
"""
if port not in self.udp_listeners:
log.warning('Port {port} not in list - ignoring'.format(port=port))
return
# Turn off listener before deleting
self.udp_listeners[port](checked=False)
del self.udp_listeners[port]
log.debug('PJLinkSettings: new callback list: {port}'.format(port=self.udp_listeners.keys()))
def call_udp_listener(self):
"""
Call listeners to update UDP listen setting
"""
if len(self.udp_listeners) < 1:
log.warning('PJLinkSettings: No callers - returning')
return
log.debug('PJLinkSettings: Calling UDP listeners')
for call in self.udp_listeners:
self.udp_listeners[call](checked=self.udp_broadcast_listen.isChecked())

View File

@ -1,78 +0,0 @@
# -*- coding: utf-8 -*-
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
###############################################################################
# OpenLP - Open Source Lyrics Projection #
# --------------------------------------------------------------------------- #
# Copyright (c) 2008-2017 OpenLP Developers #
# --------------------------------------------------------------------------- #
# This program is free software; you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by the Free #
# Software Foundation; version 2 of the License. #
# #
# This program is distributed in the hope that it will be useful, but WITHOUT #
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
# more details. #
# #
# You should have received a copy of the GNU General Public License along #
# with this program; if not, write to the Free Software Foundation, Inc., 59 #
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
###############################################################################
"""
Functional tests to test calls for network interfaces.
"""
from unittest import TestCase
from unittest.mock import MagicMock, call, patch
import openlp.core.common
from openlp.core.common import get_local_ip4
from tests.helpers.testmixin import TestMixin
lo_address_attrs = {'isValid.return_value': True,
'flags.return_value': True,
'InterfaceFlags.return_value': True,
'name.return_value': 'lo',
'broadcast.toString.return_value': '127.0.0.255',
'netmask.toString.return_value': '255.0.0.0',
'prefixLength.return_value': 8,
'ip.protocol.return_value': True}
class TestInterfaces(TestCase, TestMixin):
"""
A test suite to test out functions/methods that use network interface(s).
"""
def setUp(self):
"""
Create an instance and a few example actions.
"""
self.build_settings()
self.ip4_lo_address = MagicMock()
self.ip4_lo_address.configure_mock(**lo_address_attrs)
def tearDown(self):
"""
Clean up
"""
self.destroy_settings()
@patch.object(openlp.core.common, 'log')
def test_ip4_no_interfaces(self, mock_log):
"""
Test no interfaces available
"""
# GIVEN: Test environment
call_warning = [call('No active IPv4 network interfaces detected')]
with patch('openlp.core.common.QNetworkInterface') as mock_newtork_interface:
mock_newtork_interface.allInterfaces.return_value = []
# WHEN: get_local_ip4 is called
ifaces = get_local_ip4()
# THEN: There should not be any interfaces detected
assert not ifaces, 'There should have been no active interfaces'
mock_log.warning.assert_has_calls(call_warning)

View File

@ -0,0 +1,224 @@
# -*- coding: utf-8 -*-
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
###############################################################################
# OpenLP - Open Source Lyrics Projection #
# --------------------------------------------------------------------------- #
# Copyright (c) 2008-2017 OpenLP Developers #
# --------------------------------------------------------------------------- #
# This program is free software; you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by the Free #
# Software Foundation; version 2 of the License. #
# #
# This program is distributed in the hope that it will be useful, but WITHOUT #
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
# more details. #
# #
# You should have received a copy of the GNU General Public License along #
# with this program; if not, write to the Free Software Foundation, Inc., 59 #
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
###############################################################################
"""
Functional tests to test calls for network interfaces.
"""
from unittest import TestCase
from unittest.mock import call, patch
from PyQt5.QtCore import QObject
from PyQt5.QtNetwork import QHostAddress, QNetworkAddressEntry, QNetworkInterface
import openlp.core.common
from openlp.core.common import get_local_ip4
from tests.helpers.testmixin import TestMixin
class FakeIP4InterfaceEntry(QObject):
"""
Class to face an interface for testing purposes
"""
def __init__(self, name='lo'):
self.my_name = name
if name in ['localhost', 'lo']:
self.my_ip = QNetworkAddressEntry()
self.my_ip.setBroadcast(QHostAddress('255.0.0.0'))
self.my_ip.setIp(QHostAddress('127.0.0.2'))
self.my_ip.setPrefixLength(8)
self.fake_data = {'lo': {'ip': '127.0.0.2',
'broadcast': '255.0.0.0',
'netmask': '255.0.0.0',
'prefix': 8,
'localnet': '127.0.0.0'}}
else:
# Define a fake real address
self.my_ip = QNetworkAddressEntry()
self.my_ip.setBroadcast(QHostAddress('255.255.255.0'))
self.my_ip.setIp(QHostAddress('127.254.0.2'))
self.my_ip.setPrefixLength(24)
self.fake_data = {self.my_name: {'ip': '127.254.0.2',
'broadcast': '255.255.255.0',
'netmask': '255.255.255.0',
'prefix': 24,
'localnet': '127.254.0.0'}}
def addressEntries(self):
"""
Return fake IP address
"""
return [self.my_ip]
def flags(self):
"""
Return a QFlags enum with IsUp and IsRunning
"""
return (QNetworkInterface.IsUp | QNetworkInterface.IsRunning)
def name(self):
return self.my_name
def isValid(self):
return True
class TestInterfaces(TestCase, TestMixin):
"""
A test suite to test out functions/methods that use network interface(s).
"""
def setUp(self):
"""
Create an instance and a few example actions.
"""
self.build_settings()
if not hasattr(self, 'fake_lo'):
# Since these shouldn't change, only need to instantiate them the first time
self.fake_lo = FakeIP4InterfaceEntry()
self.fake_localhost = FakeIP4InterfaceEntry(name='localhost')
self.fake_address = FakeIP4InterfaceEntry(name='eth25')
def tearDown(self):
"""
Clean up
"""
self.destroy_settings()
@patch.object(openlp.core.common, 'log')
def test_ip4_no_interfaces(self, mock_log):
"""
Test no interfaces available
"""
# GIVEN: Test environment
call_debug = [call('Getting local IPv4 interface(es) information')]
call_warning = [call('No active IPv4 network interfaces detected')]
# WHEN: get_local_ip4 is called
with patch('openlp.core.common.QNetworkInterface') as mock_network_interface:
mock_network_interface.allInterfaces.return_value = []
ifaces = get_local_ip4()
# THEN: There should not be any interfaces detected
mock_log.debug.assert_has_calls(call_debug)
mock_log.warning.assert_has_calls(call_warning)
assert not ifaces, 'There should have been no active interfaces listed'
@patch.object(openlp.core.common, 'log')
def test_ip4_lo(self, mock_log):
"""
Test get_local_ip4 returns proper dictionary with 'lo'
"""
# GIVEN: Test environment
call_debug = [call('Getting local IPv4 interface(es) information'),
call('Checking for isValid and flags == IsUP | IsRunning'),
call('Checking address(es) protocol'),
call('Checking for protocol == IPv4Protocol'),
call('Getting interface information'),
call('Adding lo to active list')]
call_warning = [call('No active IPv4 interfaces found except localhost')]
# WHEN: get_local_ip4 is called
with patch('openlp.core.common.QNetworkInterface') as mock_network_interface:
mock_network_interface.allInterfaces.return_value = [self.fake_lo]
ifaces = get_local_ip4()
# THEN: There should be a fake 'lo' interface
mock_log.debug.assert_has_calls(call_debug)
mock_log.warning.assert_has_calls(call_warning)
assert ifaces == self.fake_lo.fake_data, "There should have been an 'lo' interface listed"
@patch.object(openlp.core.common, 'log')
def test_ip4_localhost(self, mock_log):
"""
Test get_local_ip4 returns proper dictionary with 'lo' if interface is 'localhost'
"""
# GIVEN: Test environment
call_debug = [call('Getting local IPv4 interface(es) information'),
call('Checking for isValid and flags == IsUP | IsRunning'),
call('Checking address(es) protocol'),
call('Checking for protocol == IPv4Protocol'),
call('Getting interface information'),
call('Adding localhost to active list'),
call('Renaming windows localhost to lo')]
call_warning = [call('No active IPv4 interfaces found except localhost')]
# WHEN: get_local_ip4 is called
with patch('openlp.core.common.QNetworkInterface') as mock_network_interface:
mock_network_interface.allInterfaces.return_value = [self.fake_localhost]
ifaces = get_local_ip4()
# THEN: There should be a fake 'lo' interface
mock_log.debug.assert_has_calls(call_debug)
mock_log.warning.assert_has_calls(call_warning)
assert ifaces == self.fake_lo.fake_data, "There should have been an 'lo' interface listed"
@patch.object(openlp.core.common, 'log')
def test_ip4_eth25(self, mock_log):
"""
Test get_local_ip4 returns proper dictionary with 'eth25'
"""
# GIVEN: Test environment
call_debug = [call('Getting local IPv4 interface(es) information'),
call('Checking for isValid and flags == IsUP | IsRunning'),
call('Checking address(es) protocol'),
call('Checking for protocol == IPv4Protocol'),
call('Getting interface information'),
call('Adding eth25 to active list')]
call_warning = []
# WHEN: get_local_ip4 is called
with patch('openlp.core.common.QNetworkInterface') as mock_network_interface:
mock_network_interface.allInterfaces.return_value = [self.fake_address]
ifaces = get_local_ip4()
# THEN: There should be a fake 'eth25' interface
mock_log.debug.assert_has_calls(call_debug)
mock_log.warning.assert_has_calls(call_warning)
assert ifaces == self.fake_address.fake_data
@patch.object(openlp.core.common, 'log')
def test_ip4_lo_eth25(self, mock_log):
"""
Test get_local_ip4 returns proper dictionary with 'eth25'
"""
# GIVEN: Test environment
call_debug = [call('Getting local IPv4 interface(es) information'),
call('Checking for isValid and flags == IsUP | IsRunning'),
call('Checking address(es) protocol'),
call('Checking for protocol == IPv4Protocol'),
call('Getting interface information'),
call('Adding lo to active list'),
call('Checking for isValid and flags == IsUP | IsRunning'),
call('Checking address(es) protocol'),
call('Checking for protocol == IPv4Protocol'),
call('Getting interface information'),
call('Adding eth25 to active list'),
call('Found at least one IPv4 interface, removing localhost')]
call_warning = []
# WHEN: get_local_ip4 is called
with patch('openlp.core.common.QNetworkInterface') as mock_network_interface:
mock_network_interface.allInterfaces.return_value = [self.fake_lo, self.fake_address]
ifaces = get_local_ip4()
# THEN: There should be a fake 'eth25' interface
mock_log.debug.assert_has_calls(call_debug)
mock_log.warning.assert_has_calls(call_warning)
assert ifaces == self.fake_address.fake_data, "There should have been only 'eth25' interface listed"

View File

@ -4,7 +4,7 @@
############################################################################### ###############################################################################
# OpenLP - Open Source Lyrics Projection # # OpenLP - Open Source Lyrics Projection #
# --------------------------------------------------------------------------- # # --------------------------------------------------------------------------- #
# Copyright (c) 2008-2017 OpenLP Developers # # Copyright (c) 2008-2018 OpenLP Developers #
# --------------------------------------------------------------------------- # # --------------------------------------------------------------------------- #
# This program is free software; you can redistribute it and/or modify it # # This program is free software; you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by the Free # # under the terms of the GNU General Public License as published by the Free #
@ -20,5 +20,5 @@
# Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Temple Place, Suite 330, Boston, MA 02111-1307 USA #
############################################################################### ###############################################################################
""" """
Module-level functions for the projector test suite :mod tests/openlp_core/projectors: Tests for projector code
""" """

View File

@ -28,16 +28,45 @@ from unittest import TestCase
from unittest.mock import call, patch from unittest.mock import call, patch
import openlp.core.projectors.pjlink import openlp.core.projectors.pjlink
from openlp.core.common.registry import Registry
from openlp.core.projectors.constants import PJLINK_PORT from openlp.core.projectors.constants import PJLINK_PORT
from openlp.core.projectors.tab import ProjectorTab
from openlp.core.projectors.pjlink import PJLinkUDP from openlp.core.projectors.pjlink import PJLinkUDP
from tests.helpers.testmixin import TestMixin
from tests.resources.projector.data import TEST1_DATA from tests.resources.projector.data import TEST1_DATA
class TestPJLinkBase(TestCase): class TestPJLinkBase(TestCase, TestMixin):
""" """
Tests for the PJLinkUDP class Tests for the PJLinkUDP class
""" """
def setUp(self):
"""
Create the UI and setup necessary options
"""
self.setup_application()
self.build_settings()
Registry.create()
"""
with patch('openlp.core.projectors.db.init_url') as mocked_init_url:
if os.path.exists(TEST_DB):
os.unlink(TEST_DB)
mocked_init_url.return_value = 'sqlite:///%s' % TEST_DB
self.projectordb = ProjectorDB()
if not hasattr(self, 'projector_manager'):
self.projector_manager = ProjectorManager(projectordb=self.projectordb)
"""
def tearDown(self):
"""
Remove test database.
Delete all the C++ objects at the end so that we don't have a segfault.
"""
# self.projectordb.session.close()
self.destroy_settings()
# del self.projector_manager
@patch.object(openlp.core.projectors.pjlink, 'log') @patch.object(openlp.core.projectors.pjlink, 'log')
def test_get_datagram_data_negative_zero_length(self, mock_log): def test_get_datagram_data_negative_zero_length(self, mock_log):
""" """
@ -45,9 +74,9 @@ class TestPJLinkBase(TestCase):
""" """
# GIVEN: Test setup # GIVEN: Test setup
pjlink_udp = PJLinkUDP() pjlink_udp = PJLinkUDP()
log_warning_calls = [call('(UDP) No data (-1)')] log_warning_calls = [call('(UDP:4352) No data (-1)')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized for port 4352'), log_debug_calls = [call('(UDP:4352) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data')] call('(UDP:4352) get_datagram() - Receiving data')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \ with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read: patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = -1 mock_datagram.return_value = -1
@ -67,9 +96,9 @@ class TestPJLinkBase(TestCase):
""" """
# GIVEN: Test setup # GIVEN: Test setup
pjlink_udp = PJLinkUDP() pjlink_udp = PJLinkUDP()
log_warning_calls = [call('(UDP) get_datagram() called when pending data size is 0')] log_warning_calls = [call('(UDP:4352) get_datagram() called when pending data size is 0')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized for port 4352'), log_debug_calls = [call('(UDP:4352) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data')] call('(UDP:4352) get_datagram() - Receiving data')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \ with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
patch.object(pjlink_udp, 'readDatagram') as mock_read: patch.object(pjlink_udp, 'readDatagram') as mock_read:
mock_datagram.return_value = 0 mock_datagram.return_value = 0
@ -89,9 +118,9 @@ class TestPJLinkBase(TestCase):
""" """
# GIVEN: Test setup # GIVEN: Test setup
pjlink_udp = PJLinkUDP() pjlink_udp = PJLinkUDP()
log_warning_calls = [call('(UDP) get_datagram() called when pending data size is 0')] log_warning_calls = [call('(UDP:4352) get_datagram() called when pending data size is 0')]
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized for port 4352'), log_debug_calls = [call('(UDP:4352) PJLinkUDP() Initialized'),
call('(UDP) get_datagram() - Receiving data')] call('(UDP:4352) get_datagram() - Receiving data')]
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram: with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram:
mock_datagram.return_value = 0 mock_datagram.return_value = 0
@ -101,3 +130,147 @@ class TestPJLinkBase(TestCase):
# THEN: Log entries should be made and method returns # THEN: Log entries should be made and method returns
mock_log.warning.assert_has_calls(log_warning_calls) mock_log.warning.assert_has_calls(log_warning_calls)
mock_log.debug.assert_has_calls(log_debug_calls) mock_log.debug.assert_has_calls(log_debug_calls)
@patch.object(openlp.core.projectors.tab, 'log')
def test_pjlinksettings_add_udp_listener(self, mock_log):
"""
Test adding UDP listners to PJLink Settings tab
"""
# GIVEN: Initial setup
log_debug_calls = [call('PJLink settings tab initialized'),
call('PJLinkSettings: new callback list: dict_keys([4352])')]
log_warning_calls = []
pjlink_udp = PJLinkUDP()
settings_tab = ProjectorTab(parent=None)
# WHEN: add_udp_listener is called with single port
settings_tab.add_udp_listener(port=pjlink_udp.port, callback=pjlink_udp.check_settings)
# THEN: settings tab should have one entry
assert len(settings_tab.udp_listeners) == 1
assert pjlink_udp.port in settings_tab.udp_listeners
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warning.assert_has_calls(log_warning_calls)
@patch.object(openlp.core.projectors.tab, 'log')
def test_pjlinksettings_add_udp_listener_multiple_same(self, mock_log):
"""
Test adding second UDP listner with same port to PJLink Settings tab
"""
# GIVEN: Initial setup
log_debug_calls = [call('PJLink settings tab initialized'),
call('PJLinkSettings: new callback list: dict_keys([4352])')]
log_warning_calls = [call('Port 4352 already in list - not adding')]
pjlink_udp = PJLinkUDP()
settings_tab = ProjectorTab(parent=None)
settings_tab.add_udp_listener(port=pjlink_udp.port, callback=pjlink_udp.check_settings)
# WHEN: add_udp_listener is called with second instance same port
settings_tab.add_udp_listener(port=pjlink_udp.port, callback=pjlink_udp.check_settings)
# THEN: settings tab should have one entry
assert len(settings_tab.udp_listeners) == 1
assert pjlink_udp.port in settings_tab.udp_listeners
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warning.assert_has_calls(log_warning_calls)
@patch.object(openlp.core.projectors.tab, 'log')
def test_pjlinksettings_add_udp_listener_multiple_different(self, mock_log):
"""
Test adding second UDP listner with different port to PJLink Settings tab
"""
# GIVEN: Initial setup
log_debug_calls = [call('PJLink settings tab initialized'),
call('PJLinkSettings: new callback list: dict_keys([4352])')]
log_warning_calls = []
settings_tab = ProjectorTab(parent=None)
pjlink_udp1 = PJLinkUDP(port=4352)
settings_tab.add_udp_listener(port=pjlink_udp1.port, callback=pjlink_udp1.check_settings)
# WHEN: add_udp_listener is called with second instance different port
pjlink_udp2 = PJLinkUDP(port=4353)
settings_tab.add_udp_listener(port=pjlink_udp2.port, callback=pjlink_udp2.check_settings)
# THEN: settings tab should have two entry
assert len(settings_tab.udp_listeners) == 2
assert pjlink_udp1.port in settings_tab.udp_listeners
assert pjlink_udp2.port in settings_tab.udp_listeners
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warning.assert_has_calls(log_warning_calls)
@patch.object(openlp.core.projectors.tab, 'log')
def test_pjlinksettings_remove_udp_listener(self, mock_log):
"""
Test removing UDP listners to PJLink Settings tab
"""
# GIVEN: Initial setup
log_debug_calls = [call('PJLink settings tab initialized'),
call('PJLinkSettings: new callback list: dict_keys([4352])'),
call('PJLinkSettings: new callback list: dict_keys([])')]
log_warning_calls = []
pjlink_udp = PJLinkUDP()
settings_tab = ProjectorTab(parent=None)
settings_tab.add_udp_listener(port=pjlink_udp.port, callback=pjlink_udp.check_settings)
# WHEN: remove_udp_listener is called with single port
settings_tab.remove_udp_listener(port=pjlink_udp.port)
# THEN: settings tab should have one entry
assert len(settings_tab.udp_listeners) == 0
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warning.assert_has_calls(log_warning_calls)
@patch.object(openlp.core.projectors.tab, 'log')
def test_pjlinksettings_remove_udp_listener_multiple_different(self, mock_log):
"""
Test adding second UDP listner with different port to PJLink Settings tab
"""
# GIVEN: Initial setup
log_debug_calls = [call('PJLink settings tab initialized'),
call('PJLinkSettings: new callback list: dict_keys([4352])')]
log_warning_calls = []
settings_tab = ProjectorTab(parent=None)
pjlink_udp1 = PJLinkUDP(port=4352)
settings_tab.add_udp_listener(port=pjlink_udp1.port, callback=pjlink_udp1.check_settings)
pjlink_udp2 = PJLinkUDP(port=4353)
settings_tab.add_udp_listener(port=pjlink_udp2.port, callback=pjlink_udp2.check_settings)
# WHEN: remove_udp_listener called for one port
settings_tab.remove_udp_listener(port=4353)
# THEN: settings tab should have one entry
assert len(settings_tab.udp_listeners) == 1
assert pjlink_udp1.port in settings_tab.udp_listeners
assert pjlink_udp2.port not in settings_tab.udp_listeners
mock_log.debug.assert_has_calls(log_debug_calls)
mock_log.warning.assert_has_calls(log_warning_calls)
@patch.object(PJLinkUDP, 'check_settings')
@patch.object(openlp.core.projectors.pjlink, 'log')
@patch.object(openlp.core.projectors.tab, 'log')
def test_pjlinksettings_call_udp_listener(self, mock_tab_log, mock_pjlink_log, mock_check_settings):
"""
Test calling UDP listners in PJLink Settings tab
"""
# GIVEN: Initial setup
tab_debug_calls = [call('PJLink settings tab initialized'),
call('PJLinkSettings: new callback list: dict_keys([4352])'),
call('PJLinkSettings: Calling UDP listeners')]
pjlink_debug_calls = [call.debug('(UDP:4352) PJLinkUDP() Initialized')]
pjlink_udp = PJLinkUDP()
settings_tab = ProjectorTab(parent=None)
settings_tab.add_udp_listener(port=pjlink_udp.port, callback=pjlink_udp.check_settings)
# WHEN: calling UDP listener via registry
settings_tab.call_udp_listener()
# THEN: settings tab should have one entry
assert len(settings_tab.udp_listeners) == 1
mock_check_settings.assert_called()
mock_tab_log.debug.assert_has_calls(tab_debug_calls)
mock_pjlink_log.assert_has_calls(pjlink_debug_calls)