forked from openlp/openlp
Merge trunk updates
This commit is contained in:
commit
9cdf4faa5b
@ -24,7 +24,7 @@ The :mod:`~openlp.core.api.tab` module contains the settings tab for the API
|
||||
"""
|
||||
from PyQt5 import QtCore, QtGui, QtWidgets
|
||||
|
||||
from openlp.core.common import get_local_ip4
|
||||
from openlp.core.common import get_network_interfaces
|
||||
from openlp.core.common.i18n import UiStrings, translate
|
||||
from openlp.core.common.registry import Registry
|
||||
from openlp.core.common.settings import Settings
|
||||
@ -194,8 +194,7 @@ class ApiTab(SettingsTab):
|
||||
http_url_temp = http_url + 'main'
|
||||
self.live_url.setText('<a href="{url}">{url}</a>'.format(url=http_url_temp))
|
||||
|
||||
@staticmethod
|
||||
def get_ip_address(ip_address):
|
||||
def get_ip_address(self, ip_address):
|
||||
"""
|
||||
returns the IP address in dependency of the passed address
|
||||
ip_address == 0.0.0.0: return the IP address of the first valid interface
|
||||
@ -203,9 +202,8 @@ class ApiTab(SettingsTab):
|
||||
"""
|
||||
if ip_address == ZERO_URL:
|
||||
# In case we have more than one interface
|
||||
ifaces = get_local_ip4()
|
||||
for key in iter(ifaces):
|
||||
ip_address = ifaces.get(key)['ip']
|
||||
for _, interface in get_network_interfaces().items():
|
||||
ip_address = interface['ip']
|
||||
# We only want the first interface returned
|
||||
break
|
||||
return ip_address
|
||||
|
99
openlp/core/api/zeroconf.py
Normal file
99
openlp/core/api/zeroconf.py
Normal file
@ -0,0 +1,99 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
|
||||
|
||||
##########################################################################
|
||||
# OpenLP - Open Source Lyrics Projection #
|
||||
# ---------------------------------------------------------------------- #
|
||||
# Copyright (c) 2008-2019 OpenLP Developers #
|
||||
# ---------------------------------------------------------------------- #
|
||||
# This program is free software: you can redistribute it and/or modify #
|
||||
# it under the terms of the GNU General Public License as published by #
|
||||
# the Free Software Foundation, either version 3 of the License, or #
|
||||
# (at your option) any later version. #
|
||||
# #
|
||||
# This program is distributed in the hope that it will be useful, #
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||
# GNU General Public License for more details. #
|
||||
# #
|
||||
# You should have received a copy of the GNU General Public License #
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||
##########################################################################
|
||||
"""
|
||||
The :mod:`~openlp.core.api.zeroconf` module runs a Zerconf server so that OpenLP can advertise the
|
||||
RESTful API for devices on the network to discover.
|
||||
"""
|
||||
import socket
|
||||
from time import sleep
|
||||
|
||||
from zeroconf import ServiceInfo, Zeroconf
|
||||
|
||||
from openlp.core.common import get_network_interfaces
|
||||
from openlp.core.common.registry import Registry
|
||||
from openlp.core.common.settings import Settings
|
||||
from openlp.core.threading import ThreadWorker, run_thread
|
||||
|
||||
|
||||
class ZeroconfWorker(ThreadWorker):
|
||||
"""
|
||||
This thread worker runs a Zeroconf service
|
||||
"""
|
||||
address = None
|
||||
http_port = 4316
|
||||
ws_port = 4317
|
||||
_can_run = False
|
||||
|
||||
def __init__(self, ip_address, http_port=4316, ws_port=4317):
|
||||
"""
|
||||
Create the worker for the Zeroconf service
|
||||
"""
|
||||
super().__init__()
|
||||
self.address = socket.inet_aton(ip_address)
|
||||
self.http_port = http_port
|
||||
self.ws_port = ws_port
|
||||
|
||||
def can_run(self):
|
||||
"""
|
||||
Check if the worker can continue to run. This is mostly so that we can override this method
|
||||
and test the class.
|
||||
"""
|
||||
return self._can_run
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Start the service
|
||||
"""
|
||||
http_info = ServiceInfo('_http._tcp.local.', 'OpenLP._http._tcp.local.',
|
||||
address=self.address, port=self.http_port, properties={})
|
||||
ws_info = ServiceInfo('_ws._tcp.local.', 'OpenLP._ws._tcp.local.',
|
||||
address=self.address, port=self.ws_port, properties={})
|
||||
zc = Zeroconf()
|
||||
zc.register_service(http_info)
|
||||
zc.register_service(ws_info)
|
||||
self._can_run = True
|
||||
while self.can_run():
|
||||
sleep(0.1)
|
||||
zc.unregister_service(http_info)
|
||||
zc.unregister_service(ws_info)
|
||||
zc.close()
|
||||
self.quit.emit()
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
Stop the service
|
||||
"""
|
||||
self._can_run = False
|
||||
|
||||
|
||||
def start_zeroconf():
|
||||
"""
|
||||
Start the Zeroconf service
|
||||
"""
|
||||
# When we're running tests, just skip this set up if this flag is set
|
||||
if Registry().get_flag('no_web_server'):
|
||||
return
|
||||
http_port = Settings().value('api/port')
|
||||
ws_port = Settings().value('api/websocket port')
|
||||
for name, interface in get_network_interfaces().items():
|
||||
worker = ZeroconfWorker(interface['ip'], http_port, ws_port)
|
||||
run_thread(worker, 'api_zeroconf_{name}'.format(name=name))
|
@ -391,7 +391,11 @@ def main():
|
||||
vlc_lib = 'libvlc.dylib'
|
||||
elif is_win():
|
||||
vlc_lib = 'libvlc.dll'
|
||||
os.environ['PYTHON_VLC_LIB_PATH'] = str(AppLocation.get_directory(AppLocation.AppDir) / vlc_lib)
|
||||
# Path to libvlc
|
||||
os.environ['PYTHON_VLC_LIB_PATH'] = str(AppLocation.get_directory(AppLocation.AppDir) / 'vlc' / vlc_lib)
|
||||
log.debug('VLC Path: {}'.format(os.environ['PYTHON_VLC_LIB_PATH']))
|
||||
# Path to VLC directory containing VLC's "plugins" directory
|
||||
os.environ['PYTHON_VLC_MODULE_PATH'] = str(AppLocation.get_directory(AppLocation.AppDir) / 'vlc')
|
||||
log.debug('VLC Path: {}'.format(os.environ['PYTHON_VLC_LIB_PATH']))
|
||||
# Initialise the Registry
|
||||
Registry.create()
|
||||
|
@ -51,9 +51,10 @@ REPLACMENT_CHARS_MAP = str.maketrans({'\u2018': '\'', '\u2019': '\'', '\u201c':
|
||||
'\u2013': '-', '\u2014': '-', '\v': '\n\n', '\f': '\n\n'})
|
||||
NEW_LINE_REGEX = re.compile(r' ?(\r\n?|\n) ?')
|
||||
WHITESPACE_REGEX = re.compile(r'[ \t]+')
|
||||
INTERFACE_FILTER = re.compile('lo|loopback|docker|tun', re.IGNORECASE)
|
||||
|
||||
|
||||
def get_local_ip4():
|
||||
def get_network_interfaces():
|
||||
"""
|
||||
Creates a dictionary of local IPv4 interfaces on local machine.
|
||||
If no active interfaces available, returns a dict of localhost IPv4 information
|
||||
@ -61,43 +62,33 @@ def get_local_ip4():
|
||||
:returns: Dict of interfaces
|
||||
"""
|
||||
log.debug('Getting local IPv4 interface(es) information')
|
||||
my_ip4 = {}
|
||||
for iface in QNetworkInterface.allInterfaces():
|
||||
interfaces = {}
|
||||
for interface in QNetworkInterface.allInterfaces():
|
||||
interface_name = interface.name()
|
||||
if INTERFACE_FILTER.search(interface_name):
|
||||
log.debug('Filtering out interfaces we don\'t care about: {name}'.format(name=interface_name))
|
||||
continue
|
||||
log.debug('Checking for isValid and flags == IsUP | IsRunning')
|
||||
if not iface.isValid() or not (iface.flags() & (QNetworkInterface.IsUp | QNetworkInterface.IsRunning)):
|
||||
if not interface.isValid() or not (interface.flags() & (QNetworkInterface.IsUp | QNetworkInterface.IsRunning)):
|
||||
continue
|
||||
log.debug('Checking address(es) protocol')
|
||||
for address in iface.addressEntries():
|
||||
for address in interface.addressEntries():
|
||||
ip = address.ip()
|
||||
log.debug('Checking for protocol == IPv4Protocol')
|
||||
if ip.protocol() == QAbstractSocket.IPv4Protocol:
|
||||
log.debug('Getting interface information')
|
||||
my_ip4[iface.name()] = {'ip': ip.toString(),
|
||||
'broadcast': address.broadcast().toString(),
|
||||
'netmask': address.netmask().toString(),
|
||||
'prefix': address.prefixLength(),
|
||||
'localnet': QHostAddress(address.netmask().toIPv4Address() &
|
||||
ip.toIPv4Address()).toString()
|
||||
}
|
||||
log.debug('Adding {iface} to active list'.format(iface=iface.name()))
|
||||
if len(my_ip4) == 0:
|
||||
interfaces[interface_name] = {
|
||||
'ip': ip.toString(),
|
||||
'broadcast': address.broadcast().toString(),
|
||||
'netmask': address.netmask().toString(),
|
||||
'prefix': address.prefixLength(),
|
||||
'localnet': QHostAddress(address.netmask().toIPv4Address() &
|
||||
ip.toIPv4Address()).toString()
|
||||
}
|
||||
log.debug('Adding {interface} to active list'.format(interface=interface.name()))
|
||||
if len(interfaces) == 0:
|
||||
log.warning('No active IPv4 network interfaces detected')
|
||||
return my_ip4
|
||||
if 'localhost' in my_ip4:
|
||||
log.debug('Renaming windows localhost to lo')
|
||||
my_ip4['lo'] = my_ip4['localhost']
|
||||
my_ip4.pop('localhost')
|
||||
if len(my_ip4) == 1:
|
||||
if 'lo' in my_ip4:
|
||||
# No active interfaces - so leave localhost in there
|
||||
log.warning('No active IPv4 interfaces found except localhost')
|
||||
else:
|
||||
# Since we have a valid IP4 interface, remove localhost
|
||||
if 'lo' in my_ip4:
|
||||
log.debug('Found at least one IPv4 interface, removing localhost')
|
||||
my_ip4.pop('lo')
|
||||
|
||||
return my_ip4
|
||||
return interfaces
|
||||
|
||||
|
||||
def trace_error_handler(logger):
|
||||
|
@ -33,8 +33,9 @@ from tempfile import gettempdir
|
||||
from PyQt5 import QtCore, QtGui, QtWidgets
|
||||
|
||||
from openlp.core.state import State
|
||||
from openlp.core.api import websockets
|
||||
from openlp.core.api.http import server
|
||||
from openlp.core.api.websockets import WebSocketServer
|
||||
from openlp.core.api.http.server import HttpServer
|
||||
from openlp.core.api.zeroconf import start_zeroconf
|
||||
from openlp.core.common import add_actions, is_macosx, is_win
|
||||
from openlp.core.common.actions import ActionList, CategoryOrder
|
||||
from openlp.core.common.applocation import AppLocation
|
||||
@ -495,8 +496,9 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow, LogMixin, RegistryPropert
|
||||
self.copy_data = False
|
||||
Settings().set_up_default_values()
|
||||
self.about_form = AboutForm(self)
|
||||
self.ws_server = websockets.WebSocketServer()
|
||||
self.http_server = server.HttpServer(self)
|
||||
self.ws_server = WebSocketServer()
|
||||
self.http_server = HttpServer(self)
|
||||
start_zeroconf()
|
||||
SettingsForm(self)
|
||||
self.formatting_tag_form = FormattingTagForm(self)
|
||||
self.shortcut_form = ShortcutListForm(self)
|
||||
|
@ -246,6 +246,9 @@ class PresentationMediaItem(MediaManagerItem):
|
||||
:rtype: None
|
||||
"""
|
||||
for cidx in self.controllers:
|
||||
if not self.controllers[cidx].enabled():
|
||||
# skip presentation controllers that are not enabled
|
||||
continue
|
||||
file_ext = file_path.suffix[1:]
|
||||
if file_ext in self.controllers[cidx].supports or file_ext in self.controllers[cidx].also_supports:
|
||||
doc = self.controllers[cidx].add_document(file_path)
|
||||
|
@ -23,6 +23,7 @@
|
||||
"""
|
||||
The entrypoint for OpenLP
|
||||
"""
|
||||
import atexit
|
||||
import faulthandler
|
||||
import logging
|
||||
import multiprocessing
|
||||
@ -36,18 +37,33 @@ from openlp.core.common.applocation import AppLocation
|
||||
from openlp.core.common.path import create_paths
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
error_log_file = None
|
||||
|
||||
|
||||
def tear_down_fault_handling():
|
||||
"""
|
||||
When Python exits, close the file we were using for the faulthandler
|
||||
"""
|
||||
global error_log_file
|
||||
error_log_file.close()
|
||||
|
||||
|
||||
def set_up_fault_handling():
|
||||
"""
|
||||
Set up the Python fault handler
|
||||
"""
|
||||
global error_log_file
|
||||
# Create the cache directory if it doesn't exist, and enable the fault handler to log to an error log file
|
||||
try:
|
||||
create_paths(AppLocation.get_directory(AppLocation.CacheDir))
|
||||
faulthandler.enable((AppLocation.get_directory(AppLocation.CacheDir) / 'error.log').open('wb'))
|
||||
error_log_file = (AppLocation.get_directory(AppLocation.CacheDir) / 'error.log').open('wb')
|
||||
atexit.register(tear_down_fault_handling)
|
||||
faulthandler.enable(error_log_file)
|
||||
except OSError:
|
||||
log.exception('An exception occurred when enabling the fault handler')
|
||||
atexit.unregister(tear_down_fault_handling)
|
||||
if error_log_file:
|
||||
error_log_file.close()
|
||||
|
||||
|
||||
def start():
|
||||
|
@ -18,7 +18,7 @@ environment:
|
||||
|
||||
install:
|
||||
# Install dependencies from pypi
|
||||
- "%PYTHON%\\python.exe -m pip install sqlalchemy alembic appdirs chardet beautifulsoup4 lxml Mako mysql-connector-python pytest mock pyodbc psycopg2 pypiwin32 websockets asyncio waitress six webob requests QtAwesome PyQt5 PyQtWebEngine pymediainfo PyMuPDF QDarkStyle python-vlc Pyro4"
|
||||
- "%PYTHON%\\python.exe -m pip install sqlalchemy alembic appdirs chardet beautifulsoup4 lxml Mako mysql-connector-python pytest mock pyodbc psycopg2 pypiwin32 websockets asyncio waitress six webob requests QtAwesome PyQt5 PyQtWebEngine pymediainfo PyMuPDF QDarkStyle python-vlc Pyro4 zeroconf"
|
||||
|
||||
build: off
|
||||
|
||||
|
@ -90,7 +90,8 @@ MODULES = [
|
||||
'requests',
|
||||
'qtawesome',
|
||||
'pymediainfo',
|
||||
'vlc'
|
||||
'vlc',
|
||||
'zeroconf'
|
||||
]
|
||||
|
||||
|
||||
|
3
setup.py
3
setup.py
@ -185,7 +185,8 @@ using a computer and a data projector.""",
|
||||
'SQLAlchemy >= 0.5',
|
||||
'waitress',
|
||||
'WebOb',
|
||||
'websockets'
|
||||
'websockets',
|
||||
'zeroconf'
|
||||
],
|
||||
extras_require={
|
||||
'agpl-pdf': ['PyMuPDF'],
|
||||
|
@ -28,7 +28,7 @@ from unittest import TestCase
|
||||
from PyQt5 import QtWidgets
|
||||
|
||||
from openlp.core.api.tab import ApiTab
|
||||
from openlp.core.common import get_local_ip4
|
||||
from openlp.core.common import get_network_interfaces
|
||||
from openlp.core.common.registry import Registry
|
||||
from openlp.core.common.settings import Settings
|
||||
from tests.helpers.testmixin import TestMixin
|
||||
@ -62,7 +62,7 @@ class TestApiTab(TestCase, TestMixin):
|
||||
Registry().create()
|
||||
Registry().set_flag('website_version', '00-00-0000')
|
||||
self.form = ApiTab(self.parent)
|
||||
self.my_ip4_list = get_local_ip4()
|
||||
self.interfaces = get_network_interfaces()
|
||||
|
||||
def tearDown(self):
|
||||
"""
|
||||
@ -77,9 +77,9 @@ class TestApiTab(TestCase, TestMixin):
|
||||
Test the get_ip_address function with ZERO_URL
|
||||
"""
|
||||
# GIVEN: list of local IP addresses for this machine
|
||||
ip4_list = []
|
||||
for ip4 in iter(self.my_ip4_list):
|
||||
ip4_list.append(self.my_ip4_list.get(ip4)['ip'])
|
||||
ip_addresses = []
|
||||
for _, interface in self.interfaces.items():
|
||||
ip_addresses.append(interface['ip'])
|
||||
|
||||
# WHEN: the default ip address is given
|
||||
ip_address = self.form.get_ip_address(ZERO_URL)
|
||||
@ -87,7 +87,7 @@ class TestApiTab(TestCase, TestMixin):
|
||||
# THEN: the default ip address will be returned
|
||||
assert re.match(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', ip_address), \
|
||||
'The return value should be a valid ip address'
|
||||
assert ip_address in ip4_list, 'The return address should be in the list of local IP addresses'
|
||||
assert ip_address in ip_addresses, 'The return address should be in the list of local IP addresses'
|
||||
|
||||
def test_get_ip_address_with_ip(self):
|
||||
"""
|
||||
|
@ -31,7 +31,7 @@ from unittest.mock import patch
|
||||
from openlp.core.common.json import JSONMixin, OpenLPJSONDecoder, OpenLPJSONEncoder, PathSerializer, _registered_classes
|
||||
|
||||
|
||||
class TestClassBase(object):
|
||||
class BaseTestClass(object):
|
||||
"""
|
||||
Simple class to avoid repetition
|
||||
"""
|
||||
@ -81,7 +81,7 @@ class TestJSONMixin(TestCase):
|
||||
Test that an instance of a JSONMixin subclass is properly serialized to a JSON string
|
||||
"""
|
||||
# GIVEN: A instance of a subclass of the JSONMixin class
|
||||
class TestClass(TestClassBase, JSONMixin):
|
||||
class TestClass(BaseTestClass, JSONMixin):
|
||||
_json_keys = ['a', 'b']
|
||||
|
||||
instance = TestClass(a=1, c=2)
|
||||
@ -97,7 +97,7 @@ class TestJSONMixin(TestCase):
|
||||
Test that an instance of a JSONMixin subclass is properly deserialized from a JSON string
|
||||
"""
|
||||
# GIVEN: A subclass of the JSONMixin class
|
||||
class TestClass(TestClassBase, JSONMixin):
|
||||
class TestClass(BaseTestClass, JSONMixin):
|
||||
_json_keys = ['a', 'b']
|
||||
|
||||
# WHEN: Deserializing a JSON representation of the TestClass
|
||||
@ -115,7 +115,7 @@ class TestJSONMixin(TestCase):
|
||||
Test that an instance of a JSONMixin subclass is properly serialized to a JSON string when using a custom name
|
||||
"""
|
||||
# GIVEN: A instance of a subclass of the JSONMixin class with a custom name
|
||||
class TestClass(TestClassBase, JSONMixin, register_names=('AltName', )):
|
||||
class TestClass(BaseTestClass, JSONMixin, register_names=('AltName', )):
|
||||
_json_keys = ['a', 'b']
|
||||
_name = 'AltName'
|
||||
_version = 2
|
||||
@ -134,7 +134,7 @@ class TestJSONMixin(TestCase):
|
||||
name
|
||||
"""
|
||||
# GIVEN: A instance of a subclass of the JSONMixin class with a custom name
|
||||
class TestClass(TestClassBase, JSONMixin, register_names=('AltName', )):
|
||||
class TestClass(BaseTestClass, JSONMixin, register_names=('AltName', )):
|
||||
_json_keys = ['a', 'b']
|
||||
_name = 'AltName'
|
||||
_version = 2
|
||||
|
@ -24,7 +24,7 @@ This module contains tests for the lib submodule of the Presentations plugin.
|
||||
"""
|
||||
from pathlib import Path
|
||||
from unittest import TestCase
|
||||
from unittest.mock import MagicMock, call, patch
|
||||
from unittest.mock import MagicMock, PropertyMock, call, patch
|
||||
|
||||
from openlp.core.common.registry import Registry
|
||||
from openlp.plugins.presentations.lib.mediaitem import PresentationMediaItem
|
||||
@ -94,17 +94,23 @@ class TestMediaItem(TestCase, TestMixin):
|
||||
Test that the clean_up_thumbnails method works as expected when files exists.
|
||||
"""
|
||||
# GIVEN: A mocked controller, and mocked os.path.getmtime
|
||||
mocked_controller = MagicMock()
|
||||
mocked_disabled_controller = MagicMock()
|
||||
mocked_disabled_controller.enabled.return_value = False
|
||||
mocked_disabled_supports = PropertyMock()
|
||||
type(mocked_disabled_controller).supports = mocked_disabled_supports
|
||||
mocked_enabled_controller = MagicMock()
|
||||
mocked_enabled_controller.enabled.return_value = True
|
||||
mocked_doc = MagicMock(**{'get_thumbnail_path.return_value': Path()})
|
||||
mocked_controller.add_document.return_value = mocked_doc
|
||||
mocked_controller.supports = ['tmp']
|
||||
mocked_enabled_controller.add_document.return_value = mocked_doc
|
||||
mocked_enabled_controller.supports = ['tmp']
|
||||
self.media_item.controllers = {
|
||||
'Mocked': mocked_controller
|
||||
'Enabled': mocked_enabled_controller,
|
||||
'Disabled': mocked_disabled_controller
|
||||
}
|
||||
|
||||
thmub_path = MagicMock(st_mtime=100)
|
||||
thumb_path = MagicMock(st_mtime=100)
|
||||
file_path = MagicMock(st_mtime=400)
|
||||
with patch.object(Path, 'stat', side_effect=[thmub_path, file_path]), \
|
||||
with patch.object(Path, 'stat', side_effect=[thumb_path, file_path]), \
|
||||
patch.object(Path, 'exists', return_value=True):
|
||||
presentation_file = Path('file.tmp')
|
||||
|
||||
@ -114,6 +120,7 @@ class TestMediaItem(TestCase, TestMixin):
|
||||
# THEN: doc.presentation_deleted should have been called since the thumbnails mtime will be greater than
|
||||
# the presentation_file's mtime.
|
||||
mocked_doc.assert_has_calls([call.get_thumbnail_path(1, True), call.presentation_deleted()], True)
|
||||
assert mocked_disabled_supports.call_count == 0
|
||||
|
||||
def test_clean_up_thumbnails_missing_file(self):
|
||||
"""
|
||||
|
@ -62,9 +62,10 @@ class TestMainWindow(TestCase, TestMixin):
|
||||
patch('openlp.core.ui.mainwindow.ServiceManager'), \
|
||||
patch('openlp.core.ui.mainwindow.ThemeManager'), \
|
||||
patch('openlp.core.ui.mainwindow.ProjectorManager'), \
|
||||
patch('openlp.core.ui.mainwindow.websockets.WebSocketServer'), \
|
||||
patch('openlp.core.ui.mainwindow.PluginForm'), \
|
||||
patch('openlp.core.ui.mainwindow.server.HttpServer'):
|
||||
patch('openlp.core.ui.mainwindow.HttpServer'), \
|
||||
patch('openlp.core.ui.mainwindow.WebSocketServer'), \
|
||||
patch('openlp.core.ui.mainwindow.start_zeroconf'), \
|
||||
patch('openlp.core.ui.mainwindow.PluginForm'):
|
||||
self.main_window = MainWindow()
|
||||
|
||||
def tearDown(self):
|
||||
|
112
tests/openlp_core/api/test_zeroconf.py
Normal file
112
tests/openlp_core/api/test_zeroconf.py
Normal file
@ -0,0 +1,112 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
|
||||
|
||||
##########################################################################
|
||||
# OpenLP - Open Source Lyrics Projection #
|
||||
# ---------------------------------------------------------------------- #
|
||||
# Copyright (c) 2008-2019 OpenLP Developers #
|
||||
# ---------------------------------------------------------------------- #
|
||||
# This program is free software: you can redistribute it and/or modify #
|
||||
# it under the terms of the GNU General Public License as published by #
|
||||
# the Free Software Foundation, either version 3 of the License, or #
|
||||
# (at your option) any later version. #
|
||||
# #
|
||||
# This program is distributed in the hope that it will be useful, #
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||
# GNU General Public License for more details. #
|
||||
# #
|
||||
# You should have received a copy of the GNU General Public License #
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||
##########################################################################
|
||||
from unittest.mock import MagicMock, call, patch
|
||||
|
||||
from openlp.core.api.zeroconf import ZeroconfWorker, start_zeroconf
|
||||
|
||||
|
||||
@patch('openlp.core.api.zeroconf.socket.inet_aton')
|
||||
def test_zeroconf_worker_constructor(mocked_inet_aton):
|
||||
"""Test creating the Zeroconf worker object"""
|
||||
# GIVEN: A ZeroconfWorker class and a mocked inet_aton
|
||||
mocked_inet_aton.return_value = 'processed_ip'
|
||||
|
||||
# WHEN: An instance of the ZeroconfWorker is created
|
||||
worker = ZeroconfWorker('127.0.0.1', 8000, 8001)
|
||||
|
||||
# THEN: The inet_aton function should have been called and the attrs should be set
|
||||
mocked_inet_aton.assert_called_once_with('127.0.0.1')
|
||||
assert worker.address == 'processed_ip'
|
||||
assert worker.http_port == 8000
|
||||
assert worker.ws_port == 8001
|
||||
|
||||
|
||||
@patch('openlp.core.api.zeroconf.ServiceInfo')
|
||||
@patch('openlp.core.api.zeroconf.Zeroconf')
|
||||
def test_zeroconf_worker_start(MockedZeroconf, MockedServiceInfo):
|
||||
"""Test the start() method of ZeroconfWorker"""
|
||||
# GIVEN: A few mocks and a ZeroconfWorker instance with a mocked can_run method
|
||||
mocked_http_info = MagicMock()
|
||||
mocked_ws_info = MagicMock()
|
||||
mocked_zc = MagicMock()
|
||||
MockedServiceInfo.side_effect = [mocked_http_info, mocked_ws_info]
|
||||
MockedZeroconf.return_value = mocked_zc
|
||||
worker = ZeroconfWorker('127.0.0.1', 8000, 8001)
|
||||
|
||||
# WHEN: The start() method is called
|
||||
with patch.object(worker, 'can_run') as mocked_can_run:
|
||||
mocked_can_run.side_effect = [True, False]
|
||||
worker.start()
|
||||
|
||||
# THEN: The correct calls are made
|
||||
assert MockedServiceInfo.call_args_list == [
|
||||
call('_http._tcp.local.', 'OpenLP._http._tcp.local.', address=b'\x7f\x00\x00\x01', port=8000, properties={}),
|
||||
call('_ws._tcp.local.', 'OpenLP._ws._tcp.local.', address=b'\x7f\x00\x00\x01', port=8001, properties={})
|
||||
]
|
||||
assert MockedZeroconf.call_count == 1
|
||||
assert mocked_zc.register_service.call_args_list == [call(mocked_http_info), call(mocked_ws_info)]
|
||||
assert mocked_can_run.call_count == 2
|
||||
assert mocked_zc.unregister_service.call_args_list == [call(mocked_http_info), call(mocked_ws_info)]
|
||||
assert mocked_zc.close.call_count == 1
|
||||
|
||||
|
||||
def test_zeroconf_worker_stop():
|
||||
"""Test that the ZeroconfWorker.stop() method correctly stops the service"""
|
||||
# GIVEN: A worker object with _can_run set to True
|
||||
worker = ZeroconfWorker('127.0.0.1', 8000, 8001)
|
||||
worker._can_run = True
|
||||
|
||||
# WHEN: stop() is called
|
||||
worker.stop()
|
||||
|
||||
# THEN: _can_run should be False
|
||||
assert worker._can_run is False
|
||||
|
||||
|
||||
@patch('openlp.core.api.zeroconf.get_network_interfaces')
|
||||
@patch('openlp.core.api.zeroconf.Registry')
|
||||
@patch('openlp.core.api.zeroconf.Settings')
|
||||
@patch('openlp.core.api.zeroconf.ZeroconfWorker')
|
||||
@patch('openlp.core.api.zeroconf.run_thread')
|
||||
def test_start_zeroconf(mocked_run_thread, MockedZeroconfWorker, MockedSettings, MockedRegistry,
|
||||
mocked_get_network_interfaces):
|
||||
"""Test the start_zeroconf() function"""
|
||||
# GIVEN: A whole bunch of stuff that's mocked out
|
||||
mocked_get_network_interfaces.return_value = {
|
||||
'eth0': {
|
||||
'ip': '192.168.1.191',
|
||||
'broadcast': '192.168.1.255',
|
||||
'netmask': '255.255.255.0',
|
||||
'prefix': 24,
|
||||
'localnet': '192.168.1.0'
|
||||
}
|
||||
}
|
||||
MockedRegistry.return_value.get_flag.return_value = False
|
||||
MockedSettings.return_value.value.side_effect = [8000, 8001]
|
||||
mocked_worker = MagicMock()
|
||||
MockedZeroconfWorker.return_value = mocked_worker
|
||||
|
||||
# WHEN: start_zeroconf() is called
|
||||
start_zeroconf()
|
||||
|
||||
# THEN: A worker is added to the list of threads
|
||||
mocked_run_thread.assert_called_once_with(mocked_worker, 'api_zeroconf_eth0')
|
@ -28,7 +28,7 @@ from PyQt5.QtCore import QObject
|
||||
from PyQt5.QtNetwork import QHostAddress, QNetworkAddressEntry, QNetworkInterface
|
||||
|
||||
import openlp.core.common
|
||||
from openlp.core.common import get_local_ip4
|
||||
from openlp.core.common import get_network_interfaces
|
||||
from tests.helpers.testmixin import TestMixin
|
||||
|
||||
|
||||
@ -101,7 +101,7 @@ class TestInterfaces(TestCase, TestMixin):
|
||||
self.destroy_settings()
|
||||
|
||||
@patch.object(openlp.core.common, 'log')
|
||||
def test_ip4_no_interfaces(self, mock_log):
|
||||
def test_network_interfaces_no_interfaces(self, mock_log):
|
||||
"""
|
||||
Test no interfaces available
|
||||
"""
|
||||
@ -109,115 +109,101 @@ class TestInterfaces(TestCase, TestMixin):
|
||||
call_debug = [call('Getting local IPv4 interface(es) information')]
|
||||
call_warning = [call('No active IPv4 network interfaces detected')]
|
||||
|
||||
# WHEN: get_local_ip4 is called
|
||||
# WHEN: get_network_interfaces() is called
|
||||
with patch('openlp.core.common.QNetworkInterface') as mock_network_interface:
|
||||
mock_network_interface.allInterfaces.return_value = []
|
||||
ifaces = get_local_ip4()
|
||||
interfaces = get_network_interfaces()
|
||||
|
||||
# THEN: There should not be any interfaces detected
|
||||
mock_log.debug.assert_has_calls(call_debug)
|
||||
mock_log.warning.assert_has_calls(call_warning)
|
||||
assert not ifaces, 'There should have been no active interfaces listed'
|
||||
assert not interfaces, 'There should have been no active interfaces listed'
|
||||
|
||||
@patch.object(openlp.core.common, 'log')
|
||||
def test_ip4_lo(self, mock_log):
|
||||
def test_network_interfaces_lo(self, mock_log):
|
||||
"""
|
||||
Test get_local_ip4 returns proper dictionary with 'lo'
|
||||
Test get_network_interfaces() returns an empty dictionary if "lo" is the only network interface
|
||||
"""
|
||||
# GIVEN: Test environment
|
||||
call_debug = [call('Getting local IPv4 interface(es) information'),
|
||||
call('Checking for isValid and flags == IsUP | IsRunning'),
|
||||
call('Checking address(es) protocol'),
|
||||
call('Checking for protocol == IPv4Protocol'),
|
||||
call('Getting interface information'),
|
||||
call('Adding lo to active list')]
|
||||
call_warning = [call('No active IPv4 interfaces found except localhost')]
|
||||
call_debug = [
|
||||
call('Getting local IPv4 interface(es) information'),
|
||||
call("Filtering out interfaces we don't care about: lo")
|
||||
]
|
||||
|
||||
# WHEN: get_local_ip4 is called
|
||||
# WHEN: get_network_interfaces() is called
|
||||
with patch('openlp.core.common.QNetworkInterface') as mock_network_interface:
|
||||
mock_network_interface.allInterfaces.return_value = [self.fake_lo]
|
||||
ifaces = get_local_ip4()
|
||||
interfaces = get_network_interfaces()
|
||||
|
||||
# THEN: There should be a fake 'lo' interface
|
||||
# THEN: There should be no interfaces
|
||||
mock_log.debug.assert_has_calls(call_debug)
|
||||
mock_log.warning.assert_has_calls(call_warning)
|
||||
assert ifaces == self.fake_lo.fake_data, "There should have been an 'lo' interface listed"
|
||||
assert interfaces == {}, 'There should be no interfaces listed'
|
||||
|
||||
@patch.object(openlp.core.common, 'log')
|
||||
def test_ip4_localhost(self, mock_log):
|
||||
def test_network_interfaces_localhost(self, mock_log):
|
||||
"""
|
||||
Test get_local_ip4 returns proper dictionary with 'lo' if interface is 'localhost'
|
||||
Test get_network_interfaces() returns an empty dictionary if "localhost" is the only network interface
|
||||
"""
|
||||
# GIVEN: Test environment
|
||||
call_debug = [call('Getting local IPv4 interface(es) information'),
|
||||
call('Checking for isValid and flags == IsUP | IsRunning'),
|
||||
call('Checking address(es) protocol'),
|
||||
call('Checking for protocol == IPv4Protocol'),
|
||||
call('Getting interface information'),
|
||||
call('Adding localhost to active list'),
|
||||
call('Renaming windows localhost to lo')]
|
||||
call_warning = [call('No active IPv4 interfaces found except localhost')]
|
||||
call_debug = [
|
||||
call('Getting local IPv4 interface(es) information'),
|
||||
call("Filtering out interfaces we don't care about: localhost")
|
||||
]
|
||||
|
||||
# WHEN: get_local_ip4 is called
|
||||
# WHEN: get_network_interfaces() is called
|
||||
with patch('openlp.core.common.QNetworkInterface') as mock_network_interface:
|
||||
mock_network_interface.allInterfaces.return_value = [self.fake_localhost]
|
||||
ifaces = get_local_ip4()
|
||||
interfaces = get_network_interfaces()
|
||||
|
||||
# THEN: There should be a fake 'lo' interface
|
||||
# THEN: There should be no interfaces
|
||||
mock_log.debug.assert_has_calls(call_debug)
|
||||
mock_log.warning.assert_has_calls(call_warning)
|
||||
assert ifaces == self.fake_lo.fake_data, "There should have been an 'lo' interface listed"
|
||||
assert interfaces == {}, 'There should be no interfaces listed'
|
||||
|
||||
@patch.object(openlp.core.common, 'log')
|
||||
def test_ip4_eth25(self, mock_log):
|
||||
def test_network_interfaces_eth25(self, mock_log):
|
||||
"""
|
||||
Test get_local_ip4 returns proper dictionary with 'eth25'
|
||||
Test get_network_interfaces() returns proper dictionary with 'eth25'
|
||||
"""
|
||||
# GIVEN: Test environment
|
||||
call_debug = [call('Getting local IPv4 interface(es) information'),
|
||||
call('Checking for isValid and flags == IsUP | IsRunning'),
|
||||
call('Checking address(es) protocol'),
|
||||
call('Checking for protocol == IPv4Protocol'),
|
||||
call('Getting interface information'),
|
||||
call('Adding eth25 to active list')]
|
||||
call_warning = []
|
||||
call_debug = [
|
||||
call('Getting local IPv4 interface(es) information'),
|
||||
call('Checking for isValid and flags == IsUP | IsRunning'),
|
||||
call('Checking address(es) protocol'),
|
||||
call('Checking for protocol == IPv4Protocol'),
|
||||
call('Getting interface information'),
|
||||
call('Adding eth25 to active list')
|
||||
]
|
||||
|
||||
# WHEN: get_local_ip4 is called
|
||||
# WHEN: get_network_interfaces() is called
|
||||
with patch('openlp.core.common.QNetworkInterface') as mock_network_interface:
|
||||
mock_network_interface.allInterfaces.return_value = [self.fake_address]
|
||||
ifaces = get_local_ip4()
|
||||
interfaces = get_network_interfaces()
|
||||
|
||||
# THEN: There should be a fake 'eth25' interface
|
||||
mock_log.debug.assert_has_calls(call_debug)
|
||||
mock_log.warning.assert_has_calls(call_warning)
|
||||
assert ifaces == self.fake_address.fake_data
|
||||
assert interfaces == self.fake_address.fake_data
|
||||
|
||||
@patch.object(openlp.core.common, 'log')
|
||||
def test_ip4_lo_eth25(self, mock_log):
|
||||
def test_network_interfaces_lo_eth25(self, mock_log):
|
||||
"""
|
||||
Test get_local_ip4 returns proper dictionary with 'eth25'
|
||||
Test get_network_interfaces() returns proper dictionary with 'eth25'
|
||||
"""
|
||||
# GIVEN: Test environment
|
||||
call_debug = [call('Getting local IPv4 interface(es) information'),
|
||||
call('Checking for isValid and flags == IsUP | IsRunning'),
|
||||
call('Checking address(es) protocol'),
|
||||
call('Checking for protocol == IPv4Protocol'),
|
||||
call('Getting interface information'),
|
||||
call('Adding lo to active list'),
|
||||
call('Checking for isValid and flags == IsUP | IsRunning'),
|
||||
call('Checking address(es) protocol'),
|
||||
call('Checking for protocol == IPv4Protocol'),
|
||||
call('Getting interface information'),
|
||||
call('Adding eth25 to active list'),
|
||||
call('Found at least one IPv4 interface, removing localhost')]
|
||||
call_warning = []
|
||||
call_debug = [
|
||||
call('Getting local IPv4 interface(es) information'),
|
||||
call("Filtering out interfaces we don't care about: lo"),
|
||||
call('Checking for isValid and flags == IsUP | IsRunning'),
|
||||
call('Checking address(es) protocol'),
|
||||
call('Checking for protocol == IPv4Protocol'),
|
||||
call('Getting interface information'),
|
||||
call('Adding eth25 to active list')
|
||||
]
|
||||
|
||||
# WHEN: get_local_ip4 is called
|
||||
# WHEN: get_network_interfaces() is called
|
||||
with patch('openlp.core.common.QNetworkInterface') as mock_network_interface:
|
||||
mock_network_interface.allInterfaces.return_value = [self.fake_lo, self.fake_address]
|
||||
ifaces = get_local_ip4()
|
||||
interfaces = get_network_interfaces()
|
||||
|
||||
# THEN: There should be a fake 'eth25' interface
|
||||
mock_log.debug.assert_has_calls(call_debug)
|
||||
mock_log.warning.assert_has_calls(call_warning)
|
||||
assert ifaces == self.fake_address.fake_data, "There should have been only 'eth25' interface listed"
|
||||
assert interfaces == self.fake_address.fake_data, "There should have been only 'eth25' interface listed"
|
||||
|
@ -153,8 +153,9 @@ class TestProjectorDB(TestCase, TestMixin):
|
||||
patch('openlp.core.ui.mainwindow.ServiceManager'), \
|
||||
patch('openlp.core.ui.mainwindow.ThemeManager'), \
|
||||
patch('openlp.core.ui.mainwindow.ProjectorManager'), \
|
||||
patch('openlp.core.ui.mainwindow.websockets.WebSocketServer'), \
|
||||
patch('openlp.core.ui.mainwindow.server.HttpServer'), \
|
||||
patch('openlp.core.ui.mainwindow.WebSocketServer'), \
|
||||
patch('openlp.core.ui.mainwindow.HttpServer'), \
|
||||
patch('openlp.core.ui.mainwindow.start_zeroconf'), \
|
||||
patch('openlp.core.state.State.list_plugins') as mock_plugins:
|
||||
mock_plugins.return_value = []
|
||||
self.main_window = MainWindow()
|
||||
|
Loading…
Reference in New Issue
Block a user