forked from openlp/openlp
- manager: Remove unused signal disconnect projectorNetwork.disconnect()
- Change PJLinkUDP.pjlink_udp_commands to dict with link to processing methods - Add test_projector_pjlink_udp.test_process_ackn_duplicate - Add test_projector_pjlink_udp.test_process_ackn_multiple - Add test_projector_pjlink_udp.test_process_ackn_single - Add test_projector_pjlink_udp.test_process_srch - Add PJLinkUDP.get_datagram method - Add PJLinkUDP._trash_udp_buffer method - Add PJLinkUDP.process_ackn method - Add ... bzr-revno: 2813
This commit is contained in:
commit
8e5fb217d3
@ -24,6 +24,7 @@ The :mod:`~openlp.core.api.tab` module contains the settings tab for the API
|
||||
"""
|
||||
from PyQt5 import QtCore, QtGui, QtNetwork, QtWidgets
|
||||
|
||||
from openlp.core.common import get_local_ip4
|
||||
from openlp.core.common.i18n import UiStrings, translate
|
||||
from openlp.core.common.registry import Registry
|
||||
from openlp.core.common.settings import Settings
|
||||
@ -219,17 +220,12 @@ class ApiTab(SettingsTab):
|
||||
else: return ip_address
|
||||
"""
|
||||
if ip_address == ZERO_URL:
|
||||
interfaces = QtNetwork.QNetworkInterface.allInterfaces()
|
||||
for interface in interfaces:
|
||||
if not interface.isValid():
|
||||
continue
|
||||
if not (interface.flags() & (QtNetwork.QNetworkInterface.IsUp | QtNetwork.QNetworkInterface.IsRunning)):
|
||||
continue
|
||||
for address in interface.addressEntries():
|
||||
ip = address.ip()
|
||||
if ip.protocol() == QtNetwork.QAbstractSocket.IPv4Protocol and \
|
||||
ip != QtNetwork.QHostAddress.LocalHost:
|
||||
return ip.toString()
|
||||
# In case we have more than one interface
|
||||
ifaces = get_local_ip4()
|
||||
for key in iter(ifaces):
|
||||
ip_address = ifaces.get(key)['ip']
|
||||
# We only want the first interface returned
|
||||
break
|
||||
return ip_address
|
||||
|
||||
def load(self):
|
||||
|
@ -36,6 +36,7 @@ from subprocess import check_output, CalledProcessError, STDOUT
|
||||
|
||||
from PyQt5 import QtGui
|
||||
from PyQt5.QtCore import QCryptographicHash as QHash
|
||||
from PyQt5.QtNetwork import QAbstractSocket, QHostAddress, QNetworkInterface
|
||||
from chardet.universaldetector import UniversalDetector
|
||||
|
||||
log = logging.getLogger(__name__ + '.__init__')
|
||||
@ -52,6 +53,44 @@ NEW_LINE_REGEX = re.compile(r' ?(\r\n?|\n) ?')
|
||||
WHITESPACE_REGEX = re.compile(r'[ \t]+')
|
||||
|
||||
|
||||
def get_local_ip4():
|
||||
"""
|
||||
Creates a dictionary of local IPv4 interfaces on local machine.
|
||||
If no active interfaces available, returns a dict of localhost IPv4 information
|
||||
|
||||
:returns: Dict of interfaces
|
||||
"""
|
||||
# Get the local IPv4 active address(es) that are NOT localhost (lo or '127.0.0.1')
|
||||
log.debug('Getting local IPv4 interface(es) information')
|
||||
MY_IP4 = {}
|
||||
for iface in QNetworkInterface.allInterfaces():
|
||||
if not iface.isValid() or not (iface.flags() & (QNetworkInterface.IsUp | QNetworkInterface.IsRunning)):
|
||||
continue
|
||||
for address in iface.addressEntries():
|
||||
ip = address.ip()
|
||||
# NOTE: Next line will skip if interface is localhost - keep for now until we decide about it later
|
||||
# if (ip.protocol() == QAbstractSocket.IPv4Protocol) and (ip != QHostAddress.LocalHost):
|
||||
if (ip.protocol() == QAbstractSocket.IPv4Protocol):
|
||||
MY_IP4[iface.name()] = {'ip': ip.toString(),
|
||||
'broadcast': address.broadcast().toString(),
|
||||
'netmask': address.netmask().toString(),
|
||||
'prefix': address.prefixLength(),
|
||||
'localnet': QHostAddress(address.netmask().toIPv4Address() &
|
||||
ip.toIPv4Address()).toString()
|
||||
}
|
||||
log.debug('Adding {iface} to active list'.format(iface=iface.name()))
|
||||
if len(MY_IP4) == 1:
|
||||
if 'lo' in MY_IP4:
|
||||
# No active interfaces - so leave localhost in there
|
||||
log.warning('No active IPv4 interfaces found except localhost')
|
||||
else:
|
||||
# Since we have a valid IP4 interface, remove localhost
|
||||
log.debug('Found at least one IPv4 interface, removing localhost')
|
||||
MY_IP4.pop('lo')
|
||||
|
||||
return MY_IP4
|
||||
|
||||
|
||||
def trace_error_handler(logger):
|
||||
"""
|
||||
Log the calling path of an exception
|
||||
|
@ -308,7 +308,6 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
|
||||
self.settings_section = 'projector'
|
||||
self.projectordb = projectordb
|
||||
self.projector_list = []
|
||||
self.pjlink_udp = PJLinkUDP(self.projector_list)
|
||||
self.source_select_form = None
|
||||
|
||||
def bootstrap_initialise(self):
|
||||
@ -323,6 +322,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
|
||||
else:
|
||||
log.debug('Using existing ProjectorDB() instance')
|
||||
self.get_settings()
|
||||
self.pjlink_udp = PJLinkUDP(self.projector_list)
|
||||
|
||||
def bootstrap_post_set_up(self):
|
||||
"""
|
||||
@ -344,6 +344,7 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
|
||||
"""
|
||||
Retrieve the saved settings
|
||||
"""
|
||||
log.debug('Updating ProjectorManager settings')
|
||||
settings = Settings()
|
||||
settings.beginGroup(self.settings_section)
|
||||
self.autostart = settings.value('connect on start')
|
||||
@ -501,10 +502,6 @@ class ProjectorManager(QtWidgets.QWidget, RegistryBase, UiProjectorManager, LogM
|
||||
ans = msg.exec()
|
||||
if ans == msg.Cancel:
|
||||
return
|
||||
try:
|
||||
projector.link.projectorNetwork.disconnect(self.update_status)
|
||||
except (AttributeError, TypeError):
|
||||
pass
|
||||
try:
|
||||
projector.link.changeStatus.disconnect(self.update_status)
|
||||
except (AttributeError, TypeError):
|
||||
|
@ -64,7 +64,7 @@ from openlp.core.projectors.constants import CONNECTION_ERRORS, PJLINK_CLASS, PJ
|
||||
log = logging.getLogger(__name__)
|
||||
log.debug('pjlink loaded')
|
||||
|
||||
__all__ = ['PJLink']
|
||||
__all__ = ['PJLink', 'PJLinkUDP']
|
||||
|
||||
# Shortcuts
|
||||
SocketError = QtNetwork.QAbstractSocket.SocketError
|
||||
@ -79,22 +79,145 @@ class PJLinkUDP(QtNetwork.QUdpSocket):
|
||||
"""
|
||||
Socket service for PJLink UDP socket.
|
||||
"""
|
||||
# New commands available in PJLink Class 2
|
||||
pjlink_udp_commands = [
|
||||
'ACKN', # Class 2 (cmd is SRCH)
|
||||
'ERST', # Class 1/2
|
||||
'INPT', # Class 1/2
|
||||
'LKUP', # Class 2 (reply only - no cmd)
|
||||
'POWR', # Class 1/2
|
||||
'SRCH' # Class 2 (reply is ACKN)
|
||||
]
|
||||
|
||||
def __init__(self, projector_list, port=PJLINK_PORT):
|
||||
"""
|
||||
Initialize socket
|
||||
Socket services for PJLink UDP packets.
|
||||
|
||||
Since all UDP packets from any projector will come into the same
|
||||
port, process UDP packets here then route to the appropriate
|
||||
projector instance as needed.
|
||||
"""
|
||||
# Keep track of currently defined projectors so we can route
|
||||
# inbound packets to the correct instance
|
||||
super().__init__()
|
||||
self.projector_list = projector_list
|
||||
self.port = port
|
||||
# Local defines
|
||||
self.ackn_list = {} # Replies from online projetors
|
||||
self.search_active = False
|
||||
self.search_time = 30000 # 30 seconds for allowed time
|
||||
self.search_timer = QtCore.QTimer()
|
||||
# New commands available in PJLink Class 2
|
||||
# ACKN/SRCH is processed here since it's used to find available projectors
|
||||
# Other commands are processed by the individual projector instances
|
||||
self.pjlink_udp_functions = {
|
||||
'ACKN': self.process_ackn, # Class 2, command is 'SRCH'
|
||||
'ERST': None, # Class 1/2
|
||||
'INPT': None, # Class 1/2
|
||||
'LKUP': None, # Class 2 (reply only - no cmd)
|
||||
'POWR': None, # Class 1/2
|
||||
'SRCH': self.process_srch # Class 2 (reply is ACKN)
|
||||
}
|
||||
|
||||
self.readyRead.connect(self.get_datagram)
|
||||
log.debug('(UDP) PJLinkUDP() Initialized')
|
||||
|
||||
@QtCore.pyqtSlot()
|
||||
def get_datagram(self):
|
||||
"""
|
||||
Retrieve packet and basic checks
|
||||
"""
|
||||
log.debug('(UDP) get_datagram() - Receiving data')
|
||||
read = self.pendingDatagramSize()
|
||||
if read < 0:
|
||||
log.warn('(UDP) No data (-1)')
|
||||
return
|
||||
if read < 1:
|
||||
log.warn('(UDP) get_datagram() called when pending data size is 0')
|
||||
return
|
||||
data, peer_address, peer_port = self.readDatagram(self.pendingDatagramSize())
|
||||
log.debug('(UDP) {size} bytes received from {adx} on port {port}'.format(size=len(data),
|
||||
adx=peer_address,
|
||||
port=peer_port))
|
||||
log.debug('(UDP) packet "{data}"'.format(data=data))
|
||||
if len(data) < 0:
|
||||
log.warn('(UDP) No data (-1)')
|
||||
return
|
||||
elif len(data) < 8:
|
||||
# Minimum packet is '%2CCCC='
|
||||
log.warn('(UDP) Invalid packet - not enough data')
|
||||
return
|
||||
elif data is None:
|
||||
log.warn('(UDP) No data (None)')
|
||||
return
|
||||
elif len(data) > PJLINK_MAX_PACKET:
|
||||
log.warn('(UDP) Invalid packet - length too long')
|
||||
return
|
||||
elif not data.startswith(PJLINK_PREFIX):
|
||||
log.warn('(UDP) Invalid packet - does not start with PJLINK_PREFIX')
|
||||
return
|
||||
elif data[1] != '2':
|
||||
log.warn('(UDP) Invalid packet - missing/invalid PJLink class version')
|
||||
return
|
||||
elif data[6] != '=':
|
||||
log.warn('(UDP) Invalid packet - separator missing')
|
||||
return
|
||||
# First two characters are header information we don't need at this time
|
||||
cmd, data = data[2:].split('=')
|
||||
if cmd not in self.pjlink_udp_functions:
|
||||
log.warn('(UDP) Invalid packet - not a valid PJLink UDP reply')
|
||||
return
|
||||
if self.pjlink_udp_functions[cmd] is not None:
|
||||
log.debug('(UDP) Processing {cmd} with "{data}"'.format(cmd=cmd, data=data))
|
||||
return self.pjlink_udp_functions[cmd](data=data, host=peer_address, port=peer_port)
|
||||
else:
|
||||
log.debug('(UDP) Checking projector list for ip {host} to process'.format(host=peer_address))
|
||||
for projector in self.projector_list:
|
||||
if peer_address == projector.ip:
|
||||
if cmd not in projector.pjlink_functions:
|
||||
log.error('(UDP) Could not find method to process '
|
||||
'"{cmd}" in {host}'.format(cmd=cmd, host=projector.ip))
|
||||
return
|
||||
log.debug('(UDP) Calling "{cmd}" in {host}'.format(cmd=cmd, host=projector.ip))
|
||||
return projector.pjlink_functions[cmd](data=data)
|
||||
log.warn('(UDP) Could not find projector with ip {ip} to process packet'.format(ip=peer_address))
|
||||
return
|
||||
|
||||
def process_ackn(self, data, host, port):
|
||||
"""
|
||||
Process the ACKN command.
|
||||
|
||||
:param data: Data in packet
|
||||
:param host: IP address of sending host
|
||||
:param port: Port received on
|
||||
"""
|
||||
log.debug('(UDP) Processing ACKN packet')
|
||||
if host not in self.ackn_list:
|
||||
log.debug('(UDP) Adding {host} to ACKN list'.format(host=host))
|
||||
self.ackn_list[host] = {'data': data,
|
||||
'port': port}
|
||||
else:
|
||||
log.warn('(UDP) Host {host} already replied - ignoring'.format(host=host))
|
||||
|
||||
def process_srch(self, data, host, port):
|
||||
"""
|
||||
Process the SRCH command.
|
||||
|
||||
SRCH is processed by terminals so we ignore any packet.
|
||||
|
||||
:param data: Data in packet
|
||||
:param host: IP address of sending host
|
||||
:param port: Port received on
|
||||
"""
|
||||
log.debug('(UDP) SRCH packet received - ignoring')
|
||||
return
|
||||
|
||||
def search_start(self):
|
||||
"""
|
||||
Start search for projectors on local network
|
||||
"""
|
||||
self.search_active = True
|
||||
self.ackn_list = {}
|
||||
# TODO: Send SRCH packet here
|
||||
self.search_timer.singleShot(self.search_time, self.search_stop)
|
||||
|
||||
@QtCore.pyqtSlot()
|
||||
def search_stop(self):
|
||||
"""
|
||||
Stop search
|
||||
"""
|
||||
self.search_active = False
|
||||
self.search_timer.stop()
|
||||
|
||||
|
||||
class PJLinkCommands(object):
|
||||
@ -257,7 +380,8 @@ class PJLinkCommands(object):
|
||||
else:
|
||||
clss = data
|
||||
self.pjlink_class = clss
|
||||
log.debug('({ip}) Setting pjlink_class for this projector to "{data}"'.format(ip=self.entry.name,
|
||||
log.debug('({ip}) Setting pjlink_class for this projector '
|
||||
'to "{data}"'.format(ip=self.entry.name,
|
||||
data=self.pjlink_class))
|
||||
# Since we call this one on first connect, setup polling from here
|
||||
if not self.no_poll:
|
||||
@ -276,7 +400,8 @@ class PJLinkCommands(object):
|
||||
"""
|
||||
if len(data) != PJLINK_ERST_DATA['DATA_LENGTH']:
|
||||
count = PJLINK_ERST_DATA['DATA_LENGTH']
|
||||
log.warning('({ip}) Invalid error status response "{data}": length != {count}'.format(ip=self.entry.name,
|
||||
log.warning('({ip}) Invalid error status response "{data}": '
|
||||
'length != {count}'.format(ip=self.entry.name,
|
||||
data=data,
|
||||
count=count))
|
||||
return
|
||||
@ -557,7 +682,7 @@ class PJLinkCommands(object):
|
||||
|
||||
class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
|
||||
"""
|
||||
Socket service for PJLink TCP socket.
|
||||
Socket services for PJLink TCP packets.
|
||||
"""
|
||||
# Signals sent by this module
|
||||
changeStatus = QtCore.pyqtSignal(str, int, str)
|
||||
|
@ -29,6 +29,7 @@ from unittest.mock import patch
|
||||
from PyQt5 import QtWidgets
|
||||
|
||||
from openlp.core.api.tab import ApiTab
|
||||
from openlp.core.common import get_local_ip4
|
||||
from openlp.core.common.registry import Registry
|
||||
from openlp.core.common.settings import Settings
|
||||
from tests.helpers.testmixin import TestMixin
|
||||
@ -63,6 +64,7 @@ class TestApiTab(TestCase, TestMixin):
|
||||
Registry().create()
|
||||
Registry().set_flag('website_version', '00-00-0000')
|
||||
self.form = ApiTab(self.parent)
|
||||
self.my_ip4_list = get_local_ip4()
|
||||
|
||||
def tearDown(self):
|
||||
"""
|
||||
@ -76,11 +78,18 @@ class TestApiTab(TestCase, TestMixin):
|
||||
"""
|
||||
Test the get_ip_address function with ZERO_URL
|
||||
"""
|
||||
# GIVEN: list of local IP addresses for this machine
|
||||
ip4_list = []
|
||||
for ip4 in iter(self.my_ip4_list):
|
||||
ip4_list.append(self.my_ip4_list.get(ip4)['ip'])
|
||||
|
||||
# WHEN: the default ip address is given
|
||||
ip_address = self.form.get_ip_address(ZERO_URL)
|
||||
|
||||
# THEN: the default ip address will be returned
|
||||
assert re.match('\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', ip_address), \
|
||||
'The return value should be a valid ip address'
|
||||
assert ip_address in ip4_list, 'The return address should be in the list of local IP addresses'
|
||||
|
||||
def test_get_ip_address_with_ip(self):
|
||||
"""
|
||||
@ -88,8 +97,10 @@ class TestApiTab(TestCase, TestMixin):
|
||||
"""
|
||||
# GIVEN: An ip address
|
||||
given_ip = '192.168.1.1'
|
||||
|
||||
# WHEN: the default ip address is given
|
||||
ip_address = self.form.get_ip_address(given_ip)
|
||||
|
||||
# THEN: the default ip address will be returned
|
||||
assert ip_address == given_ip, 'The return value should be %s' % given_ip
|
||||
|
||||
|
26
tests/openlp_core/__init__.py
Normal file
26
tests/openlp_core/__init__.py
Normal file
@ -0,0 +1,26 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
|
||||
|
||||
###############################################################################
|
||||
# OpenLP - Open Source Lyrics Projection #
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Copyright (c) 2008-2018 OpenLP Developers #
|
||||
# --------------------------------------------------------------------------- #
|
||||
# This program is free software; you can redistribute it and/or modify it #
|
||||
# under the terms of the GNU General Public License as published by the Free #
|
||||
# Software Foundation; version 2 of the License. #
|
||||
# #
|
||||
# This program is distributed in the hope that it will be useful, but WITHOUT #
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
|
||||
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
|
||||
# more details. #
|
||||
# #
|
||||
# You should have received a copy of the GNU General Public License along #
|
||||
# with this program; if not, write to the Free Software Foundation, Inc., 59 #
|
||||
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
|
||||
###############################################################################
|
||||
"""
|
||||
:mod: `tests.openlp_core` module
|
||||
|
||||
Tests modules/files for module openlp.core
|
||||
"""
|
@ -20,5 +20,5 @@
|
||||
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
|
||||
###############################################################################
|
||||
"""
|
||||
Module-level functions for the functional test suite
|
||||
Module-level functions for the projector test suite
|
||||
"""
|
360
tests/openlp_core/projectors/test_projector_pjlink_udp.py
Normal file
360
tests/openlp_core/projectors/test_projector_pjlink_udp.py
Normal file
@ -0,0 +1,360 @@
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
|
||||
|
||||
###############################################################################
|
||||
# OpenLP - Open Source Lyrics Projection #
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Copyright (c) 2008-2018 OpenLP Developers #
|
||||
# --------------------------------------------------------------------------- #
|
||||
# This program is free software; you can redistribute it and/or modify it #
|
||||
# under the terms of the GNU General Public License as published by the Free #
|
||||
# Software Foundation; version 2 of the License. #
|
||||
# #
|
||||
# This program is distributed in the hope that it will be useful, but WITHOUT #
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
|
||||
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
|
||||
# more details. #
|
||||
# #
|
||||
# You should have received a copy of the GNU General Public License along #
|
||||
# with this program; if not, write to the Free Software Foundation, Inc., 59 #
|
||||
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
|
||||
###############################################################################
|
||||
"""
|
||||
Package to test the PJLink UDP functions
|
||||
"""
|
||||
|
||||
from unittest import TestCase
|
||||
from unittest.mock import call, patch
|
||||
|
||||
import openlp.core.projectors.pjlink
|
||||
from openlp.core.projectors.constants import PJLINK_MAX_PACKET, PJLINK_PORT, PJLINK_PREFIX
|
||||
|
||||
from openlp.core.projectors.db import Projector
|
||||
from openlp.core.projectors.pjlink import PJLinkUDP
|
||||
from tests.resources.projector.data import TEST1_DATA, TEST2_DATA
|
||||
|
||||
|
||||
class TestPJLinkBase(TestCase):
|
||||
"""
|
||||
Tests for the PJLinkUDP class
|
||||
"""
|
||||
def setUp(self):
|
||||
"""
|
||||
Setup generic test conditions
|
||||
"""
|
||||
self.test_list = [Projector(**TEST1_DATA), Projector(**TEST2_DATA)]
|
||||
|
||||
def tearDown(self):
|
||||
"""
|
||||
Close generic test condidtions
|
||||
"""
|
||||
self.test_list = None
|
||||
|
||||
@patch.object(openlp.core.projectors.pjlink, 'log')
|
||||
def test_get_datagram_data_invalid_class(self, mock_log):
|
||||
"""
|
||||
Test get_datagram with invalid class number
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
|
||||
log_warn_calls = [call('(UDP) Invalid packet - missing/invalid PJLink class version')]
|
||||
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
|
||||
call('(UDP) get_datagram() - Receiving data'),
|
||||
call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
|
||||
call('(UDP) packet "%1ACKN=11:11:11:11:11:11"')]
|
||||
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
|
||||
patch.object(pjlink_udp, 'readDatagram') as mock_read:
|
||||
mock_datagram.return_value = 24
|
||||
mock_read.return_value = ('{prefix}1ACKN={mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']),
|
||||
TEST1_DATA['ip'], PJLINK_PORT)
|
||||
|
||||
# WHEN: get_datagram called with 0 bytes ready
|
||||
pjlink_udp.get_datagram()
|
||||
|
||||
# THEN: Log entries should be made and method returns
|
||||
mock_log.debug.assert_has_calls(log_debug_calls)
|
||||
mock_log.warn.assert_has_calls(log_warn_calls)
|
||||
|
||||
@patch.object(openlp.core.projectors.pjlink, 'log')
|
||||
def test_get_datagram_data_invalid_command(self, mock_log):
|
||||
"""
|
||||
Test get_datagram with invalid PJLink UDP command
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
|
||||
log_warn_calls = [call('(UDP) Invalid packet - not a valid PJLink UDP reply')]
|
||||
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
|
||||
call('(UDP) get_datagram() - Receiving data'),
|
||||
call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
|
||||
call('(UDP) packet "%2DUMB=11:11:11:11:11:11"')]
|
||||
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
|
||||
patch.object(pjlink_udp, 'readDatagram') as mock_read:
|
||||
mock_datagram.return_value = 24
|
||||
mock_read.return_value = ('{prefix}2DUMB={mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']),
|
||||
TEST1_DATA['ip'], PJLINK_PORT)
|
||||
|
||||
# WHEN: get_datagram called with 0 bytes ready
|
||||
pjlink_udp.get_datagram()
|
||||
|
||||
# THEN: Log entries should be made and method returns
|
||||
mock_log.debug.assert_has_calls(log_debug_calls)
|
||||
mock_log.warn.assert_has_calls(log_warn_calls)
|
||||
|
||||
@patch.object(openlp.core.projectors.pjlink, 'log')
|
||||
def test_get_datagram_data_invalid_prefix(self, mock_log):
|
||||
"""
|
||||
Test get_datagram when prefix != PJLINK_PREFIX
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
|
||||
log_warn_calls = [call('(UDP) Invalid packet - does not start with PJLINK_PREFIX')]
|
||||
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
|
||||
call('(UDP) get_datagram() - Receiving data'),
|
||||
call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
|
||||
call('(UDP) packet "$2ACKN=11:11:11:11:11:11"')]
|
||||
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
|
||||
patch.object(pjlink_udp, 'readDatagram') as mock_read:
|
||||
mock_datagram.return_value = 24
|
||||
mock_read.return_value = ('{prefix}2ACKN={mac}'.format(prefix='$', mac=TEST1_DATA['mac_adx']),
|
||||
TEST1_DATA['ip'], PJLINK_PORT)
|
||||
|
||||
# WHEN: get_datagram called with 0 bytes ready
|
||||
pjlink_udp.get_datagram()
|
||||
|
||||
# THEN: Log entries should be made and method returns
|
||||
mock_log.debug.assert_has_calls(log_debug_calls)
|
||||
mock_log.warn.assert_has_calls(log_warn_calls)
|
||||
|
||||
@patch.object(openlp.core.projectors.pjlink, 'log')
|
||||
def test_get_datagram_data_invalid_separator(self, mock_log):
|
||||
"""
|
||||
Test get_datagram when separator not equal to =
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
|
||||
log_warn_calls = [call('(UDP) Invalid packet - separator missing')]
|
||||
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
|
||||
call('(UDP) get_datagram() - Receiving data'),
|
||||
call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
|
||||
call('(UDP) packet "%2ACKN 11:11:11:11:11:11"')]
|
||||
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
|
||||
patch.object(pjlink_udp, 'readDatagram') as mock_read:
|
||||
mock_datagram.return_value = 24
|
||||
mock_read.return_value = ('{prefix}2ACKN {mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']),
|
||||
TEST1_DATA['ip'], PJLINK_PORT)
|
||||
|
||||
# WHEN: get_datagram called with 0 bytes ready
|
||||
pjlink_udp.get_datagram()
|
||||
|
||||
# THEN: Log entries should be made and method returns
|
||||
mock_log.debug.assert_has_calls(log_debug_calls)
|
||||
mock_log.warn.assert_has_calls(log_warn_calls)
|
||||
|
||||
@patch.object(openlp.core.projectors.pjlink, 'log')
|
||||
def test_get_datagram_data_long(self, mock_log):
|
||||
"""
|
||||
Test get_datagram when datagram > PJLINK_MAX_PACKET
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
|
||||
log_warn_calls = [call('(UDP) Invalid packet - length too long')]
|
||||
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
|
||||
call('(UDP) get_datagram() - Receiving data'),
|
||||
call('(UDP) 143 bytes received from 111.111.111.111 on port 4352'),
|
||||
call('(UDP) packet "%2ACKN={long}"'.format(long='X' * PJLINK_MAX_PACKET))]
|
||||
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
|
||||
patch.object(pjlink_udp, 'readDatagram') as mock_read:
|
||||
mock_datagram.return_value = PJLINK_MAX_PACKET + 7
|
||||
mock_read.return_value = ('{prefix}2ACKN={long}'.format(prefix=PJLINK_PREFIX,
|
||||
long='X' * PJLINK_MAX_PACKET),
|
||||
TEST1_DATA['ip'], PJLINK_PORT)
|
||||
|
||||
# WHEN: get_datagram called with 0 bytes ready
|
||||
pjlink_udp.get_datagram()
|
||||
|
||||
# THEN: Log entries should be made and method returns
|
||||
mock_log.debug.assert_has_calls(log_debug_calls)
|
||||
mock_log.warn.assert_has_calls(log_warn_calls)
|
||||
|
||||
@patch.object(openlp.core.projectors.pjlink, 'log')
|
||||
def test_get_datagram_data_negative_zero_length(self, mock_log):
|
||||
"""
|
||||
Test get_datagram when pendingDatagramSize = 0
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
|
||||
log_warn_calls = [call('(UDP) No data (-1)')]
|
||||
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
|
||||
call('(UDP) get_datagram() - Receiving data')]
|
||||
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
|
||||
patch.object(pjlink_udp, 'readDatagram') as mock_read:
|
||||
mock_datagram.return_value = -1
|
||||
mock_read.return_value = ('', TEST1_DATA['ip'], PJLINK_PORT)
|
||||
|
||||
# WHEN: get_datagram called with 0 bytes ready
|
||||
pjlink_udp.get_datagram()
|
||||
|
||||
# THEN: Log entries should be made and method returns
|
||||
mock_log.warn.assert_has_calls(log_warn_calls)
|
||||
mock_log.debug.assert_has_calls(log_debug_calls)
|
||||
|
||||
@patch.object(openlp.core.projectors.pjlink, 'log')
|
||||
def test_get_datagram_data_no_data(self, mock_log):
|
||||
"""
|
||||
Test get_datagram when data length = 0
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
|
||||
log_warn_calls = [call('(UDP) Invalid packet - not enough data')]
|
||||
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
|
||||
call('(UDP) get_datagram() - Receiving data')]
|
||||
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
|
||||
patch.object(pjlink_udp, 'readDatagram') as mock_read:
|
||||
mock_datagram.return_value = 1
|
||||
mock_read.return_value = ('', TEST1_DATA['ip'], PJLINK_PORT)
|
||||
|
||||
# WHEN: get_datagram called with 0 bytes ready
|
||||
pjlink_udp.get_datagram()
|
||||
|
||||
# THEN: Log entries should be made and method returns
|
||||
mock_log.warn.assert_has_calls(log_warn_calls)
|
||||
mock_log.debug.assert_has_calls(log_debug_calls)
|
||||
|
||||
@patch.object(openlp.core.projectors.pjlink, 'log')
|
||||
def test_get_datagram_data_short(self, mock_log):
|
||||
"""
|
||||
Test get_datagram when data length < 8
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
|
||||
log_warn_calls = [call('(UDP) Invalid packet - not enough data')]
|
||||
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
|
||||
call('(UDP) get_datagram() - Receiving data')]
|
||||
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
|
||||
patch.object(pjlink_udp, 'readDatagram') as mock_read:
|
||||
mock_datagram.return_value = 6
|
||||
mock_read.return_value = ('{prefix}2ACKN'.format(prefix=PJLINK_PREFIX), TEST1_DATA['ip'], PJLINK_PORT)
|
||||
|
||||
# WHEN: get_datagram called with 0 bytes ready
|
||||
pjlink_udp.get_datagram()
|
||||
|
||||
# THEN: Log entries should be made and method returns
|
||||
mock_log.warn.assert_has_calls(log_warn_calls)
|
||||
mock_log.debug.assert_has_calls(log_debug_calls)
|
||||
|
||||
@patch.object(openlp.core.projectors.pjlink, 'log')
|
||||
def test_get_datagram_pending_zero_length(self, mock_log):
|
||||
"""
|
||||
Test get_datagram when pendingDatagramSize = 0
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
|
||||
log_warn_calls = [call('(UDP) get_datagram() called when pending data size is 0')]
|
||||
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
|
||||
call('(UDP) get_datagram() - Receiving data')]
|
||||
with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram:
|
||||
mock_datagram.return_value = 0
|
||||
|
||||
# WHEN: get_datagram called with 0 bytes ready
|
||||
pjlink_udp.get_datagram()
|
||||
|
||||
# THEN: Log entries should be made and method returns
|
||||
mock_log.warn.assert_has_calls(log_warn_calls)
|
||||
mock_log.debug.assert_has_calls(log_debug_calls)
|
||||
|
||||
@patch.object(openlp.core.projectors.pjlink, 'log')
|
||||
def test_process_ackn_duplicate(self, mock_log):
|
||||
"""
|
||||
Test process_ackn method with multiple calls with same data
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
|
||||
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT}}
|
||||
log_warn_calls = [call('(UDP) Host {host} already replied - ignoring'.format(host=TEST1_DATA['ip']))]
|
||||
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
|
||||
call('(UDP) Processing ACKN packet'),
|
||||
call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip'])),
|
||||
call('(UDP) Processing ACKN packet')]
|
||||
|
||||
# WHEN: process_ackn called twice with same data
|
||||
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
|
||||
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
|
||||
|
||||
# THEN: pjlink_udp.ack_list should equal test_list
|
||||
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
|
||||
if pjlink_udp.ackn_list != check_list:
|
||||
# Check this way so we can print differences to stdout
|
||||
print('\nackn_list: ', pjlink_udp.ackn_list)
|
||||
print('test_list: ', check_list)
|
||||
assert pjlink_udp.ackn_list == check_list
|
||||
mock_log.debug.assert_has_calls(log_debug_calls)
|
||||
mock_log.warn.assert_has_calls(log_warn_calls)
|
||||
|
||||
@patch.object(openlp.core.projectors.pjlink, 'log')
|
||||
def test_process_ackn_multiple(self, mock_log):
|
||||
"""
|
||||
Test process_ackn method with multiple calls
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
|
||||
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT},
|
||||
TEST2_DATA['ip']: {'data': TEST2_DATA['mac_adx'], 'port': PJLINK_PORT}}
|
||||
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
|
||||
call('(UDP) Processing ACKN packet'),
|
||||
call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip'])),
|
||||
call('(UDP) Processing ACKN packet'),
|
||||
call('(UDP) Adding {host} to ACKN list'.format(host=TEST2_DATA['ip']))]
|
||||
|
||||
# WHEN: process_ackn called twice with different data
|
||||
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
|
||||
pjlink_udp.process_ackn(data=TEST2_DATA['mac_adx'], host=TEST2_DATA['ip'], port=PJLINK_PORT)
|
||||
|
||||
# THEN: pjlink_udp.ack_list should equal test_list
|
||||
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
|
||||
if pjlink_udp.ackn_list != check_list:
|
||||
# Check this way so we can print differences to stdout
|
||||
print('\nackn_list: ', pjlink_udp.ackn_list)
|
||||
print('test_list: ', check_list)
|
||||
assert pjlink_udp.ackn_list == check_list
|
||||
mock_log.debug.assert_has_calls(log_debug_calls)
|
||||
|
||||
@patch.object(openlp.core.projectors.pjlink, 'log')
|
||||
def test_process_ackn_single(self, mock_log):
|
||||
"""
|
||||
Test process_ackn method with single call
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
|
||||
check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT}}
|
||||
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
|
||||
call('(UDP) Processing ACKN packet'),
|
||||
call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip']))]
|
||||
|
||||
# WHEN: process_ackn called twice with different data
|
||||
pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
|
||||
|
||||
# THEN: pjlink_udp.ack_list should equal test_list
|
||||
# NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
|
||||
if pjlink_udp.ackn_list != check_list:
|
||||
# Check this way so we can print differences to stdout
|
||||
print('\nackn_list: ', pjlink_udp.ackn_list)
|
||||
print('test_list: ', check_list)
|
||||
assert pjlink_udp.ackn_list == check_list
|
||||
mock_log.debug.assert_has_calls(log_debug_calls)
|
||||
|
||||
@patch.object(openlp.core.projectors.pjlink, 'log')
|
||||
def test_process_srch(self, mock_log):
|
||||
"""
|
||||
Test process_srch method
|
||||
"""
|
||||
# GIVEN: Test setup
|
||||
pjlink_udp = PJLinkUDP(projector_list=self.test_list)
|
||||
log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
|
||||
call('(UDP) SRCH packet received - ignoring')]
|
||||
|
||||
# WHEN: process_srch called
|
||||
pjlink_udp.process_srch(data=None, host=None, port=None)
|
||||
|
||||
# THEN: debug log entry should be entered
|
||||
mock_log.debug.assert_has_calls(log_debug_calls)
|
@ -125,7 +125,6 @@ class ProjectorSourceFormTest(TestCase, TestMixin):
|
||||
select_form = SourceSelectSingle(parent=None, projectordb=self.projectordb)
|
||||
select_form.edit = True
|
||||
select_form.exec(projector=self.projector)
|
||||
projector = select_form.projector
|
||||
|
||||
# THEN: Verify all 4 buttons are available
|
||||
assert len(select_form.button_box.buttons()) == 4, \
|
||||
@ -144,7 +143,6 @@ class ProjectorSourceFormTest(TestCase, TestMixin):
|
||||
select_form = SourceSelectSingle(parent=None, projectordb=self.projectordb)
|
||||
select_form.edit = False
|
||||
select_form.exec(projector=self.projector)
|
||||
projector = select_form.projector
|
||||
|
||||
# THEN: Verify only 2 buttons are available
|
||||
assert len(select_form.button_box.buttons()) == 2, \
|
@ -45,7 +45,8 @@ TEST1_DATA = dict(ip='111.111.111.111',
|
||||
serial_no='Serial Number 1',
|
||||
sw_version='Version 1',
|
||||
model_filter='Filter type 1',
|
||||
model_lamp='Lamp type 1')
|
||||
model_lamp='Lamp type 1',
|
||||
mac_adx='11:11:11:11:11:11')
|
||||
|
||||
TEST2_DATA = dict(ip='222.222.222.222',
|
||||
port='2222',
|
||||
@ -56,7 +57,8 @@ TEST2_DATA = dict(ip='222.222.222.222',
|
||||
serial_no='Serial Number 2',
|
||||
sw_version='Version 2',
|
||||
model_filter='Filter type 2',
|
||||
model_lamp='Lamp type 2')
|
||||
model_lamp='Lamp type 2',
|
||||
mac_adx='22:22:22:22:22:22')
|
||||
|
||||
TEST3_DATA = dict(ip='333.333.333.333',
|
||||
port='3333',
|
||||
@ -67,7 +69,8 @@ TEST3_DATA = dict(ip='333.333.333.333',
|
||||
serial_no='Serial Number 3',
|
||||
sw_version='Version 3',
|
||||
model_filter='Filter type 3',
|
||||
model_lamp='Lamp type 3')
|
||||
model_lamp='Lamp type 3',
|
||||
mac_adx='33:33:33:33:33:33')
|
||||
|
||||
TEST_VIDEO_CODES = {
|
||||
'11': 'RGB 1',
|
||||
|
Loading…
Reference in New Issue
Block a user