forked from openlp/openlp
Refactor web remote deployment, plus other fixes and enhancements
- Refactor deployment away from the web server thread - Refactor the web remote settings tab - Provide a way for threads to show an error message to the user - Fix an issue where multiple zeroconf services were being run instead of a single instance on multiple addresses - Refactored the DownloadProcess dialog - Fix the tests
This commit is contained in:
parent
4a667c77fc
commit
73507884fb
@ -21,11 +21,16 @@
|
||||
"""
|
||||
Download and "install" the remote web client
|
||||
"""
|
||||
import json
|
||||
import logging
|
||||
from zipfile import ZipFile
|
||||
from PyQt5 import QtWidgets
|
||||
|
||||
from openlp.core.common.applocation import AppLocation
|
||||
from openlp.core.common.httputils import download_file, get_url_file_size, get_web_page
|
||||
from openlp.core.common.httputils import download_file, get_web_page, get_openlp_user_agent
|
||||
|
||||
REMOTE_URL = 'https://get.openlp.org/remote/'
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def deploy_zipfile(app_root_path, zip_name):
|
||||
@ -42,28 +47,32 @@ def deploy_zipfile(app_root_path, zip_name):
|
||||
web_zip.extractall(app_root_path)
|
||||
|
||||
|
||||
def download_sha256():
|
||||
def download_version_info():
|
||||
"""
|
||||
Download the config file to extract the sha256 and version number
|
||||
Download the version information file
|
||||
"""
|
||||
user_agent = 'OpenLP/' + QtWidgets.QApplication.applicationVersion()
|
||||
try:
|
||||
web_config = get_web_page('https://get.openlp.org/webclient/download.cfg', headers={'User-Agent': user_agent})
|
||||
file_contents = get_web_page(REMOTE_URL + 'version.json', headers={'User-Agent': get_openlp_user_agent()})
|
||||
except ConnectionError:
|
||||
return False
|
||||
if not web_config:
|
||||
if not file_contents:
|
||||
return None
|
||||
file_bits = web_config.split()
|
||||
return file_bits[0], file_bits[2]
|
||||
return json.loads(file_contents)
|
||||
|
||||
|
||||
def download_and_check(callback=None):
|
||||
"""
|
||||
Download the web site and deploy it.
|
||||
"""
|
||||
sha256, version = download_sha256()
|
||||
file_size = get_url_file_size('https://get.openlp.org/webclient/site.zip')
|
||||
version_info = download_version_info()
|
||||
if not version_info:
|
||||
log.warning('Unable to access the version information, abandoning download')
|
||||
# Show the user an error message
|
||||
return None
|
||||
file_size = version_info['latest']['size']
|
||||
callback.setRange(0, file_size)
|
||||
if download_file(callback, 'https://get.openlp.org/webclient/site.zip',
|
||||
AppLocation.get_section_data_path('remotes') / 'site.zip'):
|
||||
deploy_zipfile(AppLocation.get_section_data_path('remotes'), 'site.zip')
|
||||
if download_file(callback, REMOTE_URL + '{version}/{filename}'.format(**version_info['latest']),
|
||||
AppLocation.get_section_data_path('remotes') / 'remote.zip'):
|
||||
deploy_zipfile(AppLocation.get_section_data_path('remotes'), 'remote.zip')
|
||||
return version_info['latest']['version']
|
||||
return None
|
||||
|
@ -23,16 +23,12 @@ The :mod:`http` module contains the API web server. This is a lightweight web se
|
||||
with OpenLP. It uses JSON to communicate with the remotes.
|
||||
"""
|
||||
import logging
|
||||
import time
|
||||
from secrets import token_hex
|
||||
|
||||
from PyQt5 import QtCore, QtWidgets
|
||||
from waitress.server import create_server
|
||||
|
||||
from openlp.core.api.deploy import download_and_check, download_sha256
|
||||
from openlp.core.api.poll import Poller
|
||||
from openlp.core.common.applocation import AppLocation
|
||||
from openlp.core.common.i18n import UiStrings, translate
|
||||
from openlp.core.common.mixins import LogMixin, RegistryProperties
|
||||
from openlp.core.common.path import create_paths
|
||||
from openlp.core.common.registry import Registry, RegistryBase
|
||||
@ -86,9 +82,6 @@ class HttpServer(RegistryBase, RegistryProperties, LogMixin):
|
||||
if not Registry().get_flag('no_web_server'):
|
||||
worker = HttpWorker()
|
||||
run_thread(worker, 'http_server')
|
||||
Registry().register_function('download_website', self.first_time)
|
||||
Registry().register_function('get_website_version', self.website_version)
|
||||
Registry().set_flag('website_version', '0.0')
|
||||
|
||||
def bootstrap_post_set_up(self):
|
||||
"""
|
||||
@ -97,66 +90,3 @@ class HttpServer(RegistryBase, RegistryProperties, LogMixin):
|
||||
create_paths(AppLocation.get_section_data_path('remotes'))
|
||||
self.poller = Poller()
|
||||
Registry().register('poller', self.poller)
|
||||
|
||||
def first_time(self):
|
||||
"""
|
||||
Import web site code if active
|
||||
"""
|
||||
self.application.process_events()
|
||||
progress = DownloadProgressDialog(self)
|
||||
progress.forceShow()
|
||||
self.application.process_events()
|
||||
time.sleep(1)
|
||||
download_and_check(progress)
|
||||
self.application.process_events()
|
||||
time.sleep(1)
|
||||
progress.close()
|
||||
self.application.process_events()
|
||||
self.settings.setValue('remotes/download version', self.version)
|
||||
|
||||
def website_version(self):
|
||||
"""
|
||||
Download and save the website version and sha256
|
||||
:return: None
|
||||
"""
|
||||
sha256, self.version = download_sha256()
|
||||
Registry().set_flag('website_sha256', sha256)
|
||||
Registry().set_flag('website_version', self.version)
|
||||
|
||||
|
||||
class DownloadProgressDialog(QtWidgets.QProgressDialog):
|
||||
"""
|
||||
Local class to handle download display based and supporting httputils:get_web_page
|
||||
"""
|
||||
def __init__(self, parent):
|
||||
super(DownloadProgressDialog, self).__init__(parent.main_window)
|
||||
self.parent = parent
|
||||
self.setWindowModality(QtCore.Qt.WindowModal)
|
||||
self.setWindowTitle(translate('RemotePlugin', 'Importing Website'))
|
||||
self.setLabelText(UiStrings().StartingImport)
|
||||
self.setCancelButton(None)
|
||||
self.setRange(0, 1)
|
||||
self.setMinimumDuration(0)
|
||||
self.was_cancelled = False
|
||||
self.previous_size = 0
|
||||
|
||||
def update_progress(self, count, block_size):
|
||||
"""
|
||||
Calculate and display the download progress.
|
||||
"""
|
||||
increment = (count * block_size) - self.previous_size
|
||||
self._increment_progress_bar(None, increment)
|
||||
self.previous_size = count * block_size
|
||||
|
||||
def _increment_progress_bar(self, status_text, increment=1):
|
||||
"""
|
||||
Update the wizard progress page.
|
||||
|
||||
:param status_text: Current status information to display.
|
||||
:param increment: The value to increment the progress bar by.
|
||||
"""
|
||||
if status_text:
|
||||
self.setText(status_text)
|
||||
if increment > 0:
|
||||
self.setValue(self.value() + increment)
|
||||
self.parent.application.process_events()
|
||||
|
@ -21,13 +21,17 @@
|
||||
"""
|
||||
The :mod:`~openlp.core.api.tab` module contains the settings tab for the API
|
||||
"""
|
||||
from time import sleep
|
||||
|
||||
from PyQt5 import QtCore, QtGui, QtWidgets
|
||||
|
||||
from openlp.core.api.deploy import download_and_check, download_version_info
|
||||
from openlp.core.common import get_network_interfaces
|
||||
from openlp.core.common.i18n import UiStrings, translate
|
||||
from openlp.core.common.i18n import translate
|
||||
from openlp.core.common.registry import Registry
|
||||
from openlp.core.lib.settingstab import SettingsTab
|
||||
from openlp.core.ui.icons import UiIcons
|
||||
from openlp.core.widgets.dialogs import DownloadProgressDialog
|
||||
|
||||
|
||||
ZERO_URL = '0.0.0.0'
|
||||
@ -39,7 +43,8 @@ class ApiTab(SettingsTab):
|
||||
"""
|
||||
def __init__(self, parent):
|
||||
self.icon_path = UiIcons().remote
|
||||
advanced_translated = translate('OpenLP.AdvancedTab', 'Advanced')
|
||||
advanced_translated = translate('OpenLP.APITab', 'API')
|
||||
self.master_version = None
|
||||
super(ApiTab, self).__init__(parent, 'api', advanced_translated)
|
||||
|
||||
def setup_ui(self):
|
||||
@ -115,23 +120,30 @@ class ApiTab(SettingsTab):
|
||||
self.password.setObjectName('password')
|
||||
self.user_login_layout.addRow(self.password_label, self.password)
|
||||
self.left_layout.addWidget(self.user_login_group_box)
|
||||
self.update_site_group_box = QtWidgets.QGroupBox(self.left_column)
|
||||
self.update_site_group_box.setCheckable(True)
|
||||
self.update_site_group_box.setChecked(False)
|
||||
self.update_site_group_box.setObjectName('update_site_group_box')
|
||||
self.update_site_layout = QtWidgets.QFormLayout(self.update_site_group_box)
|
||||
self.update_site_layout.setObjectName('update_site_layout')
|
||||
self.current_version_label = QtWidgets.QLabel(self.update_site_group_box)
|
||||
self.web_remote_group_box = QtWidgets.QGroupBox(self.left_column)
|
||||
self.web_remote_group_box.setObjectName('web_remote_group_box')
|
||||
self.web_remote_layout = QtWidgets.QGridLayout(self.web_remote_group_box)
|
||||
self.web_remote_layout.setObjectName('web_remote_layout')
|
||||
self.current_version_label = QtWidgets.QLabel(self.web_remote_group_box)
|
||||
self.web_remote_layout.addWidget(self.current_version_label, 0, 0)
|
||||
self.current_version_label.setObjectName('current_version_label')
|
||||
self.current_version_value = QtWidgets.QLabel(self.update_site_group_box)
|
||||
self.current_version_value = QtWidgets.QLabel(self.web_remote_group_box)
|
||||
self.current_version_value.setObjectName('current_version_value')
|
||||
self.update_site_layout.addRow(self.current_version_label, self.current_version_value)
|
||||
self.master_version_label = QtWidgets.QLabel(self.update_site_group_box)
|
||||
self.web_remote_layout.addWidget(self.current_version_value, 0, 1)
|
||||
self.upgrade_button = QtWidgets.QPushButton(self.web_remote_group_box)
|
||||
self.upgrade_button.setEnabled(False)
|
||||
self.upgrade_button.setObjectName('upgrade_button')
|
||||
self.web_remote_layout.addWidget(self.upgrade_button, 0, 2)
|
||||
self.master_version_label = QtWidgets.QLabel(self.web_remote_group_box)
|
||||
self.master_version_label.setObjectName('master_version_label')
|
||||
self.master_version_value = QtWidgets.QLabel(self.update_site_group_box)
|
||||
self.web_remote_layout.addWidget(self.master_version_label, 1, 0)
|
||||
self.master_version_value = QtWidgets.QLabel(self.web_remote_group_box)
|
||||
self.master_version_value.setObjectName('master_version_value')
|
||||
self.update_site_layout.addRow(self.master_version_label, self.master_version_value)
|
||||
self.left_layout.addWidget(self.update_site_group_box)
|
||||
self.web_remote_layout.addWidget(self.master_version_value, 1, 1)
|
||||
self.check_version_button = QtWidgets.QPushButton(self.web_remote_group_box)
|
||||
self.check_version_button.setObjectName('check_version_button')
|
||||
self.web_remote_layout.addWidget(self.check_version_button, 1, 2)
|
||||
self.left_layout.addWidget(self.web_remote_group_box)
|
||||
self.app_group_box = QtWidgets.QGroupBox(self.right_column)
|
||||
self.app_group_box.setObjectName('app_group_box')
|
||||
self.right_layout.addWidget(self.app_group_box)
|
||||
@ -152,11 +164,13 @@ class ApiTab(SettingsTab):
|
||||
self.twelve_hour_check_box.stateChanged.connect(self.on_twelve_hour_check_box_changed)
|
||||
self.thumbnails_check_box.stateChanged.connect(self.on_thumbnails_check_box_changed)
|
||||
self.address_edit.textChanged.connect(self.set_urls)
|
||||
self.upgrade_button.clicked.connect(self.on_upgrade_button_clicked)
|
||||
self.check_version_button.clicked.connect(self.on_check_version_button_clicked)
|
||||
|
||||
def retranslate_ui(self):
|
||||
self.tab_title_visible = translate('RemotePlugin.RemoteTab', 'Remote Interface')
|
||||
self.server_settings_group_box.setTitle(translate('RemotePlugin.RemoteTab', 'Server Settings'))
|
||||
self.address_label.setText(translate('RemotePlugin.RemoteTab', 'Serve on IP address:'))
|
||||
self.address_label.setText(translate('RemotePlugin.RemoteTab', 'IP address:'))
|
||||
self.port_label.setText(translate('RemotePlugin.RemoteTab', 'Port number:'))
|
||||
self.remote_url_label.setText(translate('RemotePlugin.RemoteTab', 'Remote URL:'))
|
||||
self.stage_url_label.setText(translate('RemotePlugin.RemoteTab', 'Stage view URL:'))
|
||||
@ -171,12 +185,14 @@ class ApiTab(SettingsTab):
|
||||
'Scan the QR code or click <a href="{qr}">download</a> to download an app for your mobile device'
|
||||
).format(qr='https://openlp.org/#mobile-app-downloads'))
|
||||
self.user_login_group_box.setTitle(translate('RemotePlugin.RemoteTab', 'User Authentication'))
|
||||
self.aa = UiStrings()
|
||||
self.update_site_group_box.setTitle(UiStrings().WebDownloadText)
|
||||
self.web_remote_group_box.setTitle(translate('RemotePlugin.RemoteTab', 'Web Remote'))
|
||||
self.check_version_button.setText(translate('RemotePlugin.RemoteTab', 'Check for Updates'))
|
||||
self.upgrade_button.setText(translate('RemotePlugin.RemoteTab', 'Upgrade'))
|
||||
self.user_id_label.setText(translate('RemotePlugin.RemoteTab', 'User id:'))
|
||||
self.password_label.setText(translate('RemotePlugin.RemoteTab', 'Password:'))
|
||||
self.current_version_label.setText(translate('RemotePlugin.RemoteTab', 'Current Version number:'))
|
||||
self.master_version_label.setText(translate('RemotePlugin.RemoteTab', 'Latest Version number:'))
|
||||
self.current_version_label.setText(translate('RemotePlugin.RemoteTab', 'Current version:'))
|
||||
self.master_version_label.setText(translate('RemotePlugin.RemoteTab', 'Latest version:'))
|
||||
self.unknown_version = translate('RemotePlugin.RemoteTab', '(unknown)')
|
||||
|
||||
def set_urls(self):
|
||||
"""
|
||||
@ -206,6 +222,13 @@ class ApiTab(SettingsTab):
|
||||
break
|
||||
return ip_address
|
||||
|
||||
def can_enable_upgrade_button(self):
|
||||
"""
|
||||
Do a couple checks to set the upgrade button state
|
||||
"""
|
||||
return self.master_version_value.text() != self.unknown_version and \
|
||||
self.master_version_value.text() != self.current_version_value.text()
|
||||
|
||||
def load(self):
|
||||
"""
|
||||
Load the configuration and update the server configuration if necessary
|
||||
@ -219,10 +242,9 @@ class ApiTab(SettingsTab):
|
||||
self.user_login_group_box.setChecked(self.settings.value(self.settings_section + '/authentication enabled'))
|
||||
self.user_id.setText(self.settings.value(self.settings_section + '/user id'))
|
||||
self.password.setText(self.settings.value(self.settings_section + '/password'))
|
||||
self.current_version_value.setText(self.settings.value('remotes/download version'))
|
||||
self.master_version_value.setText(Registry().get_flag('website_version'))
|
||||
if self.master_version_value.text() == self.current_version_value.text():
|
||||
self.update_site_group_box.setEnabled(False)
|
||||
self.current_version_value.setText(self.settings.value(self.settings_section + '/download version'))
|
||||
self.master_version_value.setText(self.master_version or self.unknown_version)
|
||||
self.upgrade_button.setEnabled(self.can_enable_upgrade_button())
|
||||
self.set_urls()
|
||||
|
||||
def save(self):
|
||||
@ -237,8 +259,6 @@ class ApiTab(SettingsTab):
|
||||
self.settings.setValue(self.settings_section + '/authentication enabled', self.user_login_group_box.isChecked())
|
||||
self.settings.setValue(self.settings_section + '/user id', self.user_id.text())
|
||||
self.settings.setValue(self.settings_section + '/password', self.password.text())
|
||||
if self.update_site_group_box.isChecked():
|
||||
self.settings_form.register_post_process('download_website')
|
||||
|
||||
def on_twelve_hour_check_box_changed(self, check_state):
|
||||
"""
|
||||
@ -257,3 +277,39 @@ class ApiTab(SettingsTab):
|
||||
# we have a set value convert to True/False
|
||||
if check_state == QtCore.Qt.Checked:
|
||||
self.thumbnails = True
|
||||
|
||||
def on_check_version_button_clicked(self):
|
||||
"""
|
||||
Check for the latest version on the server
|
||||
"""
|
||||
app = Registry().get('application')
|
||||
app.set_busy_cursor()
|
||||
app.process_events()
|
||||
version_info = download_version_info()
|
||||
app.process_events()
|
||||
self.master_version_value.setText(version_info['latest']['version'])
|
||||
self.upgrade_button.setEnabled(self.can_enable_upgrade_button())
|
||||
app.process_events()
|
||||
app.set_normal_cursor()
|
||||
app.process_events()
|
||||
if self.can_enable_upgrade_button():
|
||||
Registry().get('main_window').information_message('New version available!',
|
||||
'There\'s a new version of the web remote available.')
|
||||
|
||||
def on_upgrade_button_clicked(self):
|
||||
"""
|
||||
Download/upgrade the web remote
|
||||
"""
|
||||
app = Registry().get('application')
|
||||
progress = DownloadProgressDialog(self, app)
|
||||
progress.show()
|
||||
app.process_events()
|
||||
sleep(0.5)
|
||||
downloaded_version = download_and_check(progress)
|
||||
app.process_events()
|
||||
sleep(0.5)
|
||||
progress.close()
|
||||
app.process_events()
|
||||
self.current_version_value.setText(downloaded_version)
|
||||
self.settings.setValue(self.settings_section + '/download version', downloaded_version)
|
||||
self.upgrade_button.setEnabled(self.can_enable_upgrade_button())
|
||||
|
@ -25,28 +25,41 @@ RESTful API for devices on the network to discover.
|
||||
import socket
|
||||
from time import sleep
|
||||
|
||||
from zeroconf import ServiceInfo, Zeroconf
|
||||
from zeroconf import ServiceInfo, Zeroconf, Error, NonUniqueNameException
|
||||
|
||||
from openlp.core.common import get_network_interfaces
|
||||
from openlp.core.common.i18n import UiStrings
|
||||
from openlp.core.common.registry import Registry
|
||||
from openlp.core.threading import ThreadWorker, run_thread
|
||||
|
||||
|
||||
def _get_error_message(exc):
|
||||
"""
|
||||
Zeroconf doesn't have error messages, so we have to make up our own
|
||||
"""
|
||||
error_message = UiStrings().ZeroconfErrorIntro + '\n\n'
|
||||
if isinstance(exc, NonUniqueNameException):
|
||||
error_message += UiStrings().ZeroconfNonUniqueError
|
||||
else:
|
||||
error_message += UiStrings().ZeroconfGenericError
|
||||
return error_message
|
||||
|
||||
|
||||
class ZeroconfWorker(ThreadWorker):
|
||||
"""
|
||||
This thread worker runs a Zeroconf service
|
||||
"""
|
||||
address = None
|
||||
ip_address = None
|
||||
http_port = 4316
|
||||
ws_port = 4317
|
||||
_can_run = False
|
||||
|
||||
def __init__(self, ip_address, http_port=4316, ws_port=4317):
|
||||
def __init__(self, addresses, http_port=4316, ws_port=4317):
|
||||
"""
|
||||
Create the worker for the Zeroconf service
|
||||
"""
|
||||
super().__init__()
|
||||
self.address = socket.inet_aton(ip_address)
|
||||
self.addresses = addresses
|
||||
self.http_port = http_port
|
||||
self.ws_port = ws_port
|
||||
|
||||
@ -61,20 +74,24 @@ class ZeroconfWorker(ThreadWorker):
|
||||
"""
|
||||
Start the service
|
||||
"""
|
||||
addresses = [socket.inet_aton(addr) for addr in self.addresses]
|
||||
http_info = ServiceInfo('_http._tcp.local.', 'OpenLP._http._tcp.local.',
|
||||
address=self.address, port=self.http_port, properties={})
|
||||
addresses=addresses, port=self.http_port, properties={})
|
||||
ws_info = ServiceInfo('_ws._tcp.local.', 'OpenLP._ws._tcp.local.',
|
||||
address=self.address, port=self.ws_port, properties={})
|
||||
addresses=addresses, port=self.ws_port, properties={})
|
||||
zc = Zeroconf()
|
||||
zc.register_service(http_info)
|
||||
zc.register_service(ws_info)
|
||||
self._can_run = True
|
||||
while self.can_run():
|
||||
sleep(0.1)
|
||||
zc.unregister_service(http_info)
|
||||
zc.unregister_service(ws_info)
|
||||
zc.close()
|
||||
self.quit.emit()
|
||||
try:
|
||||
zc.register_service(http_info)
|
||||
zc.register_service(ws_info)
|
||||
self._can_run = True
|
||||
while self.can_run():
|
||||
sleep(0.1)
|
||||
except Error as e:
|
||||
self.error.emit('Cannot start Zeroconf service', _get_error_message(e))
|
||||
finally:
|
||||
zc.unregister_all_services()
|
||||
zc.close()
|
||||
self.quit.emit()
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
@ -92,6 +109,5 @@ def start_zeroconf():
|
||||
return
|
||||
http_port = Registry().get('settings').value('api/port')
|
||||
ws_port = Registry().get('settings').value('api/websocket port')
|
||||
for name, interface in get_network_interfaces().items():
|
||||
worker = ZeroconfWorker(interface['ip'], http_port, ws_port)
|
||||
run_thread(worker, 'api_zeroconf_{name}'.format(name=name))
|
||||
worker = ZeroconfWorker([iface['ip'] for iface in get_network_interfaces().values()], http_port, ws_port)
|
||||
run_thread(worker, 'api_zeroconf')
|
||||
|
@ -102,9 +102,9 @@ def get_proxy_settings(mode=None):
|
||||
return {'http': http_value, 'https': https_value}
|
||||
|
||||
|
||||
def get_user_agent():
|
||||
def get_random_user_agent():
|
||||
"""
|
||||
Return a user agent customised for the platform the user is on.
|
||||
Return a random user agent customised for the platform the user is on.
|
||||
"""
|
||||
browser_list = USER_AGENTS.get(sys.platform, None)
|
||||
if not browser_list:
|
||||
@ -113,6 +113,13 @@ def get_user_agent():
|
||||
return browser_list[random_index]
|
||||
|
||||
|
||||
def get_openlp_user_agent():
|
||||
"""
|
||||
Return the OpenLP user agent
|
||||
"""
|
||||
return 'OpenLP/' + Registry().get('application-qt').applicationVersion()
|
||||
|
||||
|
||||
def get_web_page(url, headers=None, update_openlp=False, proxy=None):
|
||||
"""
|
||||
Attempts to download the webpage at url and returns that page or None.
|
||||
@ -128,7 +135,7 @@ def get_web_page(url, headers=None, update_openlp=False, proxy=None):
|
||||
if not headers:
|
||||
headers = {}
|
||||
if 'user-agent' not in [key.lower() for key in headers.keys()]:
|
||||
headers['User-Agent'] = get_user_agent()
|
||||
headers['User-Agent'] = get_random_user_agent()
|
||||
if not isinstance(proxy, dict):
|
||||
proxy = get_proxy_settings(mode=proxy)
|
||||
log.debug('Downloading URL = %s' % url)
|
||||
@ -207,7 +214,7 @@ def download_file(update_object, url, file_path, sha256=None, proxy=None):
|
||||
hasher = hashlib.sha256()
|
||||
# Download until finished or canceled.
|
||||
for chunk in response.iter_content(chunk_size=block_size):
|
||||
if hasattr(update_object, 'was_cancelled') and update_object.was_cancelled:
|
||||
if hasattr(update_object, 'is_cancelled') and update_object.is_cancelled:
|
||||
break
|
||||
saved_file.write(chunk)
|
||||
if sha256:
|
||||
@ -233,7 +240,7 @@ def download_file(update_object, url, file_path, sha256=None, proxy=None):
|
||||
retries += 1
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
if hasattr(update_object, 'was_cancelled') and update_object.was_cancelled and file_path.exists():
|
||||
if hasattr(update_object, 'is_cancelled') and update_object.is_cancelled and file_path.exists():
|
||||
file_path.unlink()
|
||||
return True
|
||||
|
||||
@ -251,21 +258,21 @@ class DownloadWorker(ThreadWorker):
|
||||
"""
|
||||
self._base_url = base_url
|
||||
self._file_name = file_name
|
||||
self.was_cancelled = False
|
||||
self.is_cancelled = False
|
||||
super().__init__()
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Download the url to the temporary directory
|
||||
"""
|
||||
if self.was_cancelled:
|
||||
if self.is_cancelled:
|
||||
self.quit.emit()
|
||||
return
|
||||
try:
|
||||
dest_path = Path(gettempdir()) / 'openlp' / self._file_name
|
||||
url = '{url}{name}'.format(url=self._base_url, name=self._file_name)
|
||||
is_success = download_file(self, url, dest_path)
|
||||
if is_success and not self.was_cancelled:
|
||||
if is_success and not self.is_cancelled:
|
||||
self.download_succeeded.emit(dest_path)
|
||||
else:
|
||||
self.download_failed.emit()
|
||||
@ -280,4 +287,4 @@ class DownloadWorker(ThreadWorker):
|
||||
"""
|
||||
A slot to allow the download to be cancelled from outside of the thread
|
||||
"""
|
||||
self.was_cancelled = True
|
||||
self.is_cancelled = True
|
||||
|
@ -452,6 +452,10 @@ class UiStrings(metaclass=Singleton):
|
||||
self.ViewMode = translate('OpenLP.Ui', 'View Mode')
|
||||
self.Video = translate('OpenLP.Ui', 'Video')
|
||||
self.WebDownloadText = translate('OpenLP.Ui', 'Web Interface, Download and Install latest Version')
|
||||
self.ZeroconfErrorIntro = translate('OpenLP.Ui', 'There was a problem avertising OpenLP\'s remote '
|
||||
'interface on the network:')
|
||||
self.ZeroconfGenericError = translate('OpenLP.Ui', 'An unknown error occurred')
|
||||
self.ZeroconfNonUniqueError = translate('OpenLP.Ui', 'OpenLP already seems to be advertising itself')
|
||||
book_chapter = translate('OpenLP.Ui', 'Book Chapter')
|
||||
chapter = translate('OpenLP.Ui', 'Chapter')
|
||||
verse = translate('OpenLP.Ui', 'Verse')
|
||||
|
@ -194,6 +194,7 @@ class Settings(QtCore.QSettings):
|
||||
'api/authentication enabled': False,
|
||||
'api/ip address': '0.0.0.0',
|
||||
'api/thumbnails': True,
|
||||
'api/download version': '0.0',
|
||||
'bibles/db type': 'sqlite',
|
||||
'bibles/db username': '',
|
||||
'bibles/db password': '',
|
||||
@ -271,7 +272,6 @@ class Settings(QtCore.QSettings):
|
||||
'media/vlc arguments': '',
|
||||
'media/live volume': 50,
|
||||
'media/preview volume': 0,
|
||||
'remotes/download version': '0.0',
|
||||
'players/background color': '#000000',
|
||||
'planningcenter/status': PluginStatus.Inactive,
|
||||
'planningcenter/application_id': '',
|
||||
|
@ -32,6 +32,7 @@ class ThreadWorker(QtCore.QObject, LogMixin):
|
||||
The :class:`~openlp.core.threading.ThreadWorker` class provides a base class for all worker objects
|
||||
"""
|
||||
quit = QtCore.pyqtSignal()
|
||||
error = QtCore.pyqtSignal(str, str)
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
@ -51,6 +52,7 @@ def run_thread(worker, thread_name, can_start=True):
|
||||
if not thread_name:
|
||||
raise ValueError('A thread_name is required when calling the "run_thread" function')
|
||||
application = Registry().get('application')
|
||||
main_window = Registry().get('main_window')
|
||||
if thread_name in application.worker_threads:
|
||||
raise KeyError('A thread with the name "{}" has already been created, please use another'.format(thread_name))
|
||||
# Create the thread and add the thread and the worker to the parent
|
||||
@ -65,6 +67,7 @@ def run_thread(worker, thread_name, can_start=True):
|
||||
thread.started.connect(worker.start)
|
||||
worker.quit.connect(thread.quit)
|
||||
worker.quit.connect(worker.deleteLater)
|
||||
worker.error.connect(main_window.error_message)
|
||||
thread.finished.connect(thread.deleteLater)
|
||||
thread.finished.connect(make_remove_thread(thread_name))
|
||||
if can_start:
|
||||
|
@ -19,9 +19,10 @@
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||
##########################################################################
|
||||
""" Patch the QFileDialog so it accepts and returns Path objects"""
|
||||
from PyQt5 import QtWidgets
|
||||
from PyQt5 import QtCore, QtWidgets
|
||||
|
||||
from openlp.core.common.path import path_to_str, replace_params, str_to_path
|
||||
from openlp.core.common.i18n import UiStrings, translate
|
||||
|
||||
|
||||
class FileDialog(QtWidgets.QFileDialog):
|
||||
@ -107,3 +108,42 @@ class FileDialog(QtWidgets.QFileDialog):
|
||||
# getSaveFileName returns a tuple. The first item represents the path as a str. The string is empty if the user
|
||||
# cancels the dialog.
|
||||
return str_to_path(file_name), selected_filter
|
||||
|
||||
|
||||
class DownloadProgressDialog(QtWidgets.QProgressDialog):
|
||||
"""
|
||||
Local class to handle download display based and supporting httputils:get_web_page
|
||||
"""
|
||||
def __init__(self, parent, app):
|
||||
super(DownloadProgressDialog, self).__init__(parent)
|
||||
self.parent = parent
|
||||
self.app = app
|
||||
self.setWindowModality(QtCore.Qt.WindowModal)
|
||||
self.setWindowTitle(translate('OpenLP.RemotePlugin', 'Importing Website'))
|
||||
self.setLabelText(UiStrings().StartingImport)
|
||||
self.setCancelButton(None)
|
||||
self.setRange(0, 1)
|
||||
self.setMinimumDuration(0)
|
||||
self.was_cancelled = False
|
||||
self.previous_size = 0
|
||||
|
||||
def update_progress(self, count, block_size):
|
||||
"""
|
||||
Calculate and display the download progress.
|
||||
"""
|
||||
increment = (count * block_size) - self.previous_size
|
||||
self._increment_progress_bar(None, increment)
|
||||
self.previous_size = count * block_size
|
||||
|
||||
def _increment_progress_bar(self, status_text, increment=1):
|
||||
"""
|
||||
Update the wizard progress page.
|
||||
|
||||
:param status_text: Current status information to display.
|
||||
:param increment: The value to increment the progress bar by.
|
||||
"""
|
||||
if status_text:
|
||||
self.setText(status_text)
|
||||
if increment > 0:
|
||||
self.setValue(self.value() + increment)
|
||||
self.app.process_events()
|
||||
|
@ -18,130 +18,142 @@
|
||||
# You should have received a copy of the GNU General Public License #
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||
##########################################################################
|
||||
import os
|
||||
import shutil
|
||||
import json
|
||||
from pathlib import Path
|
||||
from tempfile import mkdtemp
|
||||
from unittest import TestCase, skip
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from openlp.core.api.deploy import deploy_zipfile, download_and_check, download_sha256
|
||||
from openlp.core.api.deploy import REMOTE_URL, deploy_zipfile, download_version_info, download_and_check
|
||||
|
||||
|
||||
CONFIG_FILE = '2c266badff1e3d140664c50fd1460a2b332b24d5ad8c267fa62e506b5eb6d894 deploy/site.zip\n2017_06_27'
|
||||
CONFIG_FILE = '{"latest": {"version": "0.1", "filename": "remote-0.1.zip", "sha256": "", "size": 854039}}'
|
||||
CONFIG_DICT = json.loads(CONFIG_FILE)
|
||||
|
||||
|
||||
class TestRemoteDeploy(TestCase):
|
||||
@patch('openlp.core.api.deploy.ZipFile')
|
||||
def test_deploy_zipfile(MockZipFile):
|
||||
"""
|
||||
Test the Remote plugin deploy functions
|
||||
Remote Deploy tests - test the dummy zip file is processed correctly
|
||||
"""
|
||||
# GIVEN: A new downloaded zip file
|
||||
mocked_zipfile = MagicMock()
|
||||
MockZipFile.return_value = mocked_zipfile
|
||||
root_path = Path('/') / 'tmp' / 'remotes'
|
||||
|
||||
def setUp(self):
|
||||
"""
|
||||
Setup for tests
|
||||
"""
|
||||
self.app_root_path = Path(mkdtemp())
|
||||
# WHEN: deploy_zipfile() is called
|
||||
deploy_zipfile(root_path, 'site.zip')
|
||||
|
||||
def tearDown(self):
|
||||
"""
|
||||
Clean up after tests
|
||||
"""
|
||||
shutil.rmtree(self.app_root_path)
|
||||
# THEN: the zip file should have been extracted to the right location
|
||||
MockZipFile.assert_called_once_with(Path('/tmp/remotes/site.zip'))
|
||||
mocked_zipfile.extractall.assert_called_once_with(Path('/tmp/remotes'))
|
||||
|
||||
@patch('openlp.core.api.deploy.ZipFile')
|
||||
def test_deploy_zipfile(self, MockZipFile):
|
||||
"""
|
||||
Remote Deploy tests - test the dummy zip file is processed correctly
|
||||
"""
|
||||
# GIVEN: A new downloaded zip file
|
||||
mocked_zipfile = MagicMock()
|
||||
MockZipFile.return_value = mocked_zipfile
|
||||
root_path_str = '{sep}tmp{sep}remotes'.format(sep=os.sep)
|
||||
root_path = Path(root_path_str)
|
||||
|
||||
# WHEN: deploy_zipfile() is called
|
||||
deploy_zipfile(root_path, 'site.zip')
|
||||
@patch('openlp.core.api.deploy.get_openlp_user_agent')
|
||||
@patch('openlp.core.api.deploy.get_web_page')
|
||||
def test_download_version_info_connection_error(mocked_get_web_page, mocked_get_openlp_user_agent):
|
||||
"""
|
||||
Test that if a ConnectionError occurs while downloading a sha256 False is returned
|
||||
"""
|
||||
# GIVEN: A bunch of mocks
|
||||
mocked_get_web_page.side_effect = ConnectionError()
|
||||
mocked_get_openlp_user_agent.return_value = 'OpenLP'
|
||||
|
||||
# THEN: the zip file should have been extracted to the right location
|
||||
MockZipFile.assert_called_once_with(Path('/tmp/remotes/site.zip'))
|
||||
mocked_zipfile.extractall.assert_called_once_with(Path('/tmp/remotes'))
|
||||
# WHEN: download_sha256() is called
|
||||
result = download_version_info()
|
||||
|
||||
@skip('Broken and being refactored')
|
||||
@patch('openlp.core.api.deploy.Registry')
|
||||
@patch('openlp.core.api.deploy.get_web_page')
|
||||
def test_download_sha256_connection_error(self, mocked_get_web_page, MockRegistry):
|
||||
"""
|
||||
Test that if a ConnectionError occurs while downloading a sha256 False is returned
|
||||
"""
|
||||
# GIVEN: A bunch of mocks
|
||||
MockRegistry.return_value.get.return_value.applicationVersion.return_value = '1.0'
|
||||
mocked_get_web_page.side_effect = ConnectionError()
|
||||
# THEN: The result should be False
|
||||
assert result is False, 'download_version_info() should return False when encountering ConnectionError'
|
||||
|
||||
# WHEN: download_sha256() is called
|
||||
result = download_sha256()
|
||||
|
||||
# THEN: The result should be False
|
||||
assert result is False, 'download_sha256() should return False when encountering ConnectionError'
|
||||
@patch('openlp.core.api.deploy.get_openlp_user_agent')
|
||||
@patch('openlp.core.api.deploy.get_web_page')
|
||||
def test_download_version_info_empty_file(mocked_get_web_page, mocked_get_openlp_user_agent):
|
||||
"""
|
||||
Test that if there's no config when downloading a sha256 None is returned
|
||||
"""
|
||||
# GIVEN: A bunch of mocks
|
||||
mocked_get_web_page.return_value = None
|
||||
mocked_get_openlp_user_agent.return_value = 'OpenLP'
|
||||
|
||||
@skip('Broken and being refactored')
|
||||
@patch('openlp.core.api.deploy.Registry')
|
||||
@patch('openlp.core.api.deploy.get_web_page')
|
||||
def test_download_sha256_no_config(self, mocked_get_web_page, MockRegistry):
|
||||
"""
|
||||
Test that if there's no config when downloading a sha256 None is returned
|
||||
"""
|
||||
# GIVEN: A bunch of mocks
|
||||
MockRegistry.return_value.get.return_value.applicationVersion.return_value = '1.0'
|
||||
mocked_get_web_page.return_value = None
|
||||
# WHEN: download_sha256() is called
|
||||
result = download_version_info()
|
||||
|
||||
# WHEN: download_sha256() is called
|
||||
result = download_sha256()
|
||||
# THEN: The result should be Nonw
|
||||
assert result is None, 'download_version_info() should return None when there is a problem downloading the page'
|
||||
|
||||
# THEN: The result should be Nonw
|
||||
assert result is None, 'download_sha256() should return None when there is a problem downloading the page'
|
||||
|
||||
@skip('Broken and being refactored')
|
||||
@patch('openlp.core.api.deploy.Registry')
|
||||
@patch('openlp.core.api.deploy.get_web_page')
|
||||
def test_download_sha256(self, mocked_get_web_page, MockRegistry):
|
||||
"""
|
||||
Test that the sha256 and the version are returned
|
||||
"""
|
||||
# GIVEN: A bunch of mocks
|
||||
MockRegistry.return_value.get.return_value.applicationVersion.return_value = '1.0'
|
||||
mocked_get_web_page.return_value = CONFIG_FILE
|
||||
@patch('openlp.core.api.deploy.get_openlp_user_agent')
|
||||
@patch('openlp.core.api.deploy.get_web_page')
|
||||
def test_download_version_info(mocked_get_web_page, mocked_get_openlp_user_agent):
|
||||
"""
|
||||
Test that the sha256 and the version are returned
|
||||
"""
|
||||
# GIVEN: A bunch of mocks
|
||||
mocked_get_web_page.return_value = CONFIG_FILE
|
||||
mocked_get_openlp_user_agent.return_value = 'OpenLP'
|
||||
|
||||
# WHEN: download_sha256() is called
|
||||
result = download_sha256()
|
||||
# WHEN: download_sha256() is called
|
||||
result = download_version_info()
|
||||
|
||||
# THEN: The result should be Nonw
|
||||
assert result == ('2c266badff1e3d140664c50fd1460a2b332b24d5ad8c267fa62e506b5eb6d894', '2017_06_27'), \
|
||||
'download_sha256() should return a tuple of sha256 and version'
|
||||
# THEN: The result should be Nonw
|
||||
assert result == CONFIG_DICT, 'download_version_info() should return a dictionary of version information'
|
||||
|
||||
@skip('Broken and being refactored')
|
||||
@patch('openlp.core.api.deploy.Registry')
|
||||
@patch('openlp.core.api.deploy.download_sha256')
|
||||
@patch('openlp.core.api.deploy.get_url_file_size')
|
||||
@patch('openlp.core.api.deploy.download_file')
|
||||
@patch('openlp.core.api.deploy.AppLocation.get_section_data_path')
|
||||
@patch('openlp.core.api.deploy.deploy_zipfile')
|
||||
def test_download_and_check(self, mocked_deploy_zipfile, mocked_get_data_path, mocked_download_file,
|
||||
mocked_get_url_file_size, mocked_download_sha256, MockRegistry):
|
||||
# GIVEN: A bunch of mocks
|
||||
mocked_get_data_path.return_value = Path('/tmp/remotes')
|
||||
mocked_download_file.return_value = True
|
||||
mocked_get_url_file_size.return_value = 5
|
||||
mocked_download_sha256.return_value = ('asdfgh', '0.1')
|
||||
MockRegistry.return_value.get.return_value.applicationVersion.return_value = '1.0'
|
||||
mocked_callback = MagicMock()
|
||||
|
||||
# WHEN: download_and_check() is called
|
||||
download_and_check(mocked_callback)
|
||||
@patch('openlp.core.api.deploy.log.warning')
|
||||
@patch('openlp.core.api.deploy.download_version_info')
|
||||
def test_download_and_check_log_warning(mocked_download_version_info, mocked_warning):
|
||||
"""
|
||||
Test that when the version info fails, a warning is logged
|
||||
"""
|
||||
# GIVEN: A few mocks, and a version info of None
|
||||
mocked_download_version_info.return_value = None
|
||||
|
||||
# THEN: The correct things should have been done
|
||||
mocked_download_sha256.assert_called_once_with()
|
||||
mocked_get_url_file_size.assert_called_once_with('https://get.openlp.org/webclient/site.zip')
|
||||
mocked_callback.setRange.assert_called_once_with(0, 5)
|
||||
mocked_download_file.assert_called_once_with(mocked_callback, 'https://get.openlp.org/webclient/site.zip',
|
||||
Path('/tmp/remotes/site.zip'), sha256='asdfgh')
|
||||
mocked_deploy_zipfile.assert_called_once_with(Path('/tmp/remotes'), 'site.zip')
|
||||
# WHEN: download_and_check is run
|
||||
result = download_and_check(None)
|
||||
|
||||
# THEN: None is returned and a warning is logged
|
||||
assert result is None, 'The result should be None'
|
||||
mocked_warning.assert_called_once_with('Unable to access the version information, abandoning download')
|
||||
|
||||
|
||||
@patch('openlp.core.api.deploy.AppLocation.get_section_data_path')
|
||||
@patch('openlp.core.api.deploy.download_file')
|
||||
@patch('openlp.core.api.deploy.download_version_info')
|
||||
def test_download_and_check_download_fails(mocked_download_version_info, mocked_download_file,
|
||||
mocked_get_section_data_path):
|
||||
"""
|
||||
Test that when the version info fails, a warning is logged
|
||||
"""
|
||||
# GIVEN: A few mocks
|
||||
mocked_callback = MagicMock()
|
||||
mocked_download_version_info.return_value = CONFIG_DICT
|
||||
mocked_download_file.return_value = False
|
||||
mocked_get_section_data_path.return_value = Path('.')
|
||||
|
||||
# WHEN: download_and_check is run
|
||||
result = download_and_check(mocked_callback)
|
||||
|
||||
# THEN: None is returned and a warning is logged
|
||||
assert result is None, 'The result should be None'
|
||||
|
||||
|
||||
@patch('openlp.core.api.deploy.AppLocation.get_section_data_path')
|
||||
@patch('openlp.core.api.deploy.deploy_zipfile')
|
||||
@patch('openlp.core.api.deploy.download_file')
|
||||
@patch('openlp.core.api.deploy.download_version_info')
|
||||
def test_download_and_check(mocked_download_version_info, mocked_download_file, mocked_deploy_zipfile,
|
||||
mocked_get_section_data_path):
|
||||
# GIVEN: A bunch of mocks
|
||||
mocked_callback = MagicMock()
|
||||
mocked_download_version_info.return_value = CONFIG_DICT
|
||||
mocked_download_file.return_value = True
|
||||
mocked_remote_path = Path('/') / 'tmp' / 'remotes'
|
||||
mocked_remote_zip = mocked_remote_path / 'remote.zip'
|
||||
mocked_get_section_data_path.return_value = mocked_remote_path
|
||||
|
||||
# WHEN: download_and_check() is called
|
||||
result = download_and_check(mocked_callback)
|
||||
|
||||
# THEN: The correct things should have been done
|
||||
assert result == CONFIG_DICT['latest']['version'], 'The correct version is returned'
|
||||
mocked_download_file.assert_called_once_with(mocked_callback, REMOTE_URL + '0.1/remote-0.1.zip', mocked_remote_zip)
|
||||
mocked_deploy_zipfile.assert_called_once_with(mocked_remote_path, 'remote.zip')
|
||||
|
@ -28,7 +28,7 @@ from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from openlp.core.common.httputils import ProxyMode, download_file, get_proxy_settings, get_url_file_size, \
|
||||
get_user_agent, get_web_page
|
||||
get_random_user_agent, get_web_page
|
||||
|
||||
|
||||
@pytest.yield_fixture
|
||||
@ -39,7 +39,7 @@ def temp_file(settings):
|
||||
os.remove(tmp_file)
|
||||
|
||||
|
||||
def test_get_user_agent_linux():
|
||||
def test_get_random_user_agent_linux():
|
||||
"""
|
||||
Test that getting a user agent on Linux returns a user agent suitable for Linux
|
||||
"""
|
||||
@ -48,15 +48,15 @@ def test_get_user_agent_linux():
|
||||
# GIVEN: The system is Linux
|
||||
mocked_sys.platform = 'linux2'
|
||||
|
||||
# WHEN: We call get_user_agent()
|
||||
user_agent = get_user_agent()
|
||||
# WHEN: We call get_random_user_agent()
|
||||
user_agent = get_random_user_agent()
|
||||
|
||||
# THEN: The user agent is a Linux (or ChromeOS) user agent
|
||||
result = 'Linux' in user_agent or 'CrOS' in user_agent
|
||||
assert result is True, 'The user agent should be a valid Linux user agent'
|
||||
|
||||
|
||||
def test_get_user_agent_windows():
|
||||
def test_get_random_user_agent_windows():
|
||||
"""
|
||||
Test that getting a user agent on Windows returns a user agent suitable for Windows
|
||||
"""
|
||||
@ -65,14 +65,14 @@ def test_get_user_agent_windows():
|
||||
# GIVEN: The system is Windows
|
||||
mocked_sys.platform = 'win32'
|
||||
|
||||
# WHEN: We call get_user_agent()
|
||||
user_agent = get_user_agent()
|
||||
# WHEN: We call get_random_user_agent()
|
||||
user_agent = get_random_user_agent()
|
||||
|
||||
# THEN: The user agent is a Linux (or ChromeOS) user agent
|
||||
assert 'Windows' in user_agent, 'The user agent should be a valid Windows user agent'
|
||||
|
||||
|
||||
def test_get_user_agent_macos():
|
||||
def test_get_random_user_agent_macos():
|
||||
"""
|
||||
Test that getting a user agent on OS X returns a user agent suitable for OS X
|
||||
"""
|
||||
@ -81,14 +81,14 @@ def test_get_user_agent_macos():
|
||||
# GIVEN: The system is macOS
|
||||
mocked_sys.platform = 'darwin'
|
||||
|
||||
# WHEN: We call get_user_agent()
|
||||
user_agent = get_user_agent()
|
||||
# WHEN: We call get_random_user_agent()
|
||||
user_agent = get_random_user_agent()
|
||||
|
||||
# THEN: The user agent is a Linux (or ChromeOS) user agent
|
||||
assert 'Mac OS X' in user_agent, 'The user agent should be a valid OS X user agent'
|
||||
|
||||
|
||||
def test_get_user_agent_default():
|
||||
def test_get_random_user_agent_default():
|
||||
"""
|
||||
Test that getting a user agent on a non-Linux/Windows/OS X platform returns the default user agent
|
||||
"""
|
||||
@ -97,8 +97,8 @@ def test_get_user_agent_default():
|
||||
# GIVEN: The system is something else
|
||||
mocked_sys.platform = 'freebsd'
|
||||
|
||||
# WHEN: We call get_user_agent()
|
||||
user_agent = get_user_agent()
|
||||
# WHEN: We call get_random_user_agent()
|
||||
user_agent = get_random_user_agent()
|
||||
|
||||
# THEN: The user agent is a Linux (or ChromeOS) user agent
|
||||
assert 'NetBSD'in user_agent, 'The user agent should be the default user agent'
|
||||
@ -119,15 +119,15 @@ def test_get_web_page_no_url():
|
||||
|
||||
|
||||
@patch('openlp.core.common.httputils.requests')
|
||||
@patch('openlp.core.common.httputils.get_user_agent')
|
||||
@patch('openlp.core.common.httputils.get_random_user_agent')
|
||||
@patch('openlp.core.common.httputils.Registry')
|
||||
def test_get_web_page(MockRegistry, mocked_get_user_agent, mocked_requests):
|
||||
def test_get_web_page(MockRegistry, mocked_get_random_user_agent, mocked_requests):
|
||||
"""
|
||||
Test that the get_web_page method works correctly
|
||||
"""
|
||||
# GIVEN: Mocked out objects and a fake URL
|
||||
mocked_requests.get.return_value = MagicMock(text='text')
|
||||
mocked_get_user_agent.return_value = 'user_agent'
|
||||
mocked_get_random_user_agent.return_value = 'user_agent'
|
||||
fake_url = 'this://is.a.fake/url'
|
||||
|
||||
# WHEN: The get_web_page() method is called
|
||||
@ -136,20 +136,20 @@ def test_get_web_page(MockRegistry, mocked_get_user_agent, mocked_requests):
|
||||
# THEN: The correct methods are called with the correct arguments and a web page is returned
|
||||
mocked_requests.get.assert_called_once_with(fake_url, headers={'User-Agent': 'user_agent'},
|
||||
proxies=None, timeout=30.0)
|
||||
mocked_get_user_agent.assert_called_once_with()
|
||||
mocked_get_random_user_agent.assert_called_once_with()
|
||||
assert MockRegistry.call_count == 1, 'The Registry() object should have been called once'
|
||||
assert returned_page == 'text', 'The returned page should be the mock object'
|
||||
|
||||
|
||||
@patch('openlp.core.common.httputils.requests')
|
||||
@patch('openlp.core.common.httputils.get_user_agent')
|
||||
def test_get_web_page_with_header(mocked_get_user_agent, mocked_requests, settings):
|
||||
@patch('openlp.core.common.httputils.get_random_user_agent')
|
||||
def test_get_web_page_with_header(mocked_get_random_user_agent, mocked_requests, settings):
|
||||
"""
|
||||
Test that adding a header to the call to get_web_page() adds the header to the request
|
||||
"""
|
||||
# GIVEN: Mocked out objects, a fake URL and a fake header
|
||||
mocked_requests.get.return_value = MagicMock(text='text')
|
||||
mocked_get_user_agent.return_value = 'user_agent'
|
||||
mocked_get_random_user_agent.return_value = 'user_agent'
|
||||
fake_url = 'this://is.a.fake/url'
|
||||
fake_headers = {'Fake-Header': 'fake value'}
|
||||
|
||||
@ -161,13 +161,13 @@ def test_get_web_page_with_header(mocked_get_user_agent, mocked_requests, settin
|
||||
expected_headers.update({'User-Agent': 'user_agent'})
|
||||
mocked_requests.get.assert_called_once_with(fake_url, headers=expected_headers,
|
||||
proxies=None, timeout=30.0)
|
||||
mocked_get_user_agent.assert_called_with()
|
||||
mocked_get_random_user_agent.assert_called_with()
|
||||
assert returned_page == 'text', 'The returned page should be the mock object'
|
||||
|
||||
|
||||
@patch('openlp.core.common.httputils.requests')
|
||||
@patch('openlp.core.common.httputils.get_user_agent')
|
||||
def test_get_web_page_with_user_agent_in_headers(mocked_get_user_agent, mocked_requests, settings):
|
||||
@patch('openlp.core.common.httputils.get_random_user_agent')
|
||||
def test_get_web_page_with_user_agent_in_headers(mocked_get_random_user_agent, mocked_requests, settings):
|
||||
"""
|
||||
Test that adding a user agent in the header when calling get_web_page() adds that user agent to the request
|
||||
"""
|
||||
@ -182,20 +182,20 @@ def test_get_web_page_with_user_agent_in_headers(mocked_get_user_agent, mocked_r
|
||||
# THEN: The correct methods are called with the correct arguments and a web page is returned
|
||||
mocked_requests.get.assert_called_once_with(fake_url, headers=user_agent_headers,
|
||||
proxies=None, timeout=30.0)
|
||||
assert mocked_get_user_agent.call_count == 0, 'get_user_agent() should not have been called'
|
||||
assert mocked_get_random_user_agent.call_count == 0, 'get_random_user_agent() should not have been called'
|
||||
assert returned_page == 'text', 'The returned page should be "test"'
|
||||
|
||||
|
||||
@patch('openlp.core.common.httputils.requests')
|
||||
@patch('openlp.core.common.httputils.get_user_agent')
|
||||
@patch('openlp.core.common.httputils.get_random_user_agent')
|
||||
@patch('openlp.core.common.httputils.Registry')
|
||||
def test_get_web_page_update_openlp(MockRegistry, mocked_get_user_agent, mocked_requests):
|
||||
def test_get_web_page_update_openlp(MockRegistry, mocked_get_random_user_agent, mocked_requests):
|
||||
"""
|
||||
Test that passing "update_openlp" as true to get_web_page calls Registry().get('app').process_events()
|
||||
"""
|
||||
# GIVEN: Mocked out objects, a fake URL
|
||||
mocked_requests.get.return_value = MagicMock(text='text')
|
||||
mocked_get_user_agent.return_value = 'user_agent'
|
||||
mocked_get_random_user_agent.return_value = 'user_agent'
|
||||
mocked_registry_object = MagicMock()
|
||||
mocked_application_object = MagicMock()
|
||||
mocked_registry_object.get.return_value = mocked_application_object
|
||||
@ -208,7 +208,7 @@ def test_get_web_page_update_openlp(MockRegistry, mocked_get_user_agent, mocked_
|
||||
# THEN: The correct methods are called with the correct arguments and a web page is returned
|
||||
mocked_requests.get.assert_called_once_with(fake_url, headers={'User-Agent': 'user_agent'},
|
||||
proxies=None, timeout=30.0)
|
||||
mocked_get_user_agent.assert_called_once_with()
|
||||
mocked_get_random_user_agent.assert_called_once_with()
|
||||
mocked_registry_object.get.assert_called_with('application')
|
||||
mocked_application_object.process_events.assert_called_with()
|
||||
assert returned_page == 'text', 'The returned page should be the mock object'
|
||||
|
@ -40,8 +40,10 @@ def mocked_set_icon(mock_settings):
|
||||
q_thread_patcher = patch('openlp.core.ui.firsttimeform.QtCore.QThread').start()
|
||||
mocked_app = MagicMock()
|
||||
mocked_app.worker_threads = {}
|
||||
mocked_main_window = MagicMock()
|
||||
Registry().remove('application')
|
||||
Registry().register('application', mocked_app)
|
||||
Registry().register('main_window', mocked_main_window)
|
||||
yield set_icon_patcher
|
||||
move_to_thread_patcher.stop()
|
||||
set_icon_patcher.stop()
|
||||
|
@ -23,18 +23,14 @@ from unittest.mock import MagicMock, call, patch
|
||||
from openlp.core.api.zeroconf import ZeroconfWorker, start_zeroconf
|
||||
|
||||
|
||||
@patch('openlp.core.api.zeroconf.socket.inet_aton')
|
||||
def test_zeroconf_worker_constructor(mocked_inet_aton):
|
||||
def test_zeroconf_worker_constructor():
|
||||
"""Test creating the Zeroconf worker object"""
|
||||
# GIVEN: A ZeroconfWorker class and a mocked inet_aton
|
||||
mocked_inet_aton.return_value = 'processed_ip'
|
||||
|
||||
# GIVEN: A ZeroconfWorker class
|
||||
# WHEN: An instance of the ZeroconfWorker is created
|
||||
worker = ZeroconfWorker('127.0.0.1', 8000, 8001)
|
||||
worker = ZeroconfWorker(['127.0.0.1'], 8000, 8001)
|
||||
|
||||
# THEN: The inet_aton function should have been called and the attrs should be set
|
||||
mocked_inet_aton.assert_called_once_with('127.0.0.1')
|
||||
assert worker.address == 'processed_ip'
|
||||
assert worker.addresses == ['127.0.0.1']
|
||||
assert worker.http_port == 8000
|
||||
assert worker.ws_port == 8001
|
||||
|
||||
@ -49,7 +45,7 @@ def test_zeroconf_worker_start(MockedZeroconf, MockedServiceInfo):
|
||||
mocked_zc = MagicMock()
|
||||
MockedServiceInfo.side_effect = [mocked_http_info, mocked_ws_info]
|
||||
MockedZeroconf.return_value = mocked_zc
|
||||
worker = ZeroconfWorker('127.0.0.1', 8000, 8001)
|
||||
worker = ZeroconfWorker(['127.0.0.1'], 8000, 8001)
|
||||
|
||||
# WHEN: The start() method is called
|
||||
with patch.object(worker, 'can_run') as mocked_can_run:
|
||||
@ -58,20 +54,22 @@ def test_zeroconf_worker_start(MockedZeroconf, MockedServiceInfo):
|
||||
|
||||
# THEN: The correct calls are made
|
||||
assert MockedServiceInfo.call_args_list == [
|
||||
call('_http._tcp.local.', 'OpenLP._http._tcp.local.', address=b'\x7f\x00\x00\x01', port=8000, properties={}),
|
||||
call('_ws._tcp.local.', 'OpenLP._ws._tcp.local.', address=b'\x7f\x00\x00\x01', port=8001, properties={})
|
||||
call('_http._tcp.local.', 'OpenLP._http._tcp.local.', addresses=[b'\x7f\x00\x00\x01'], port=8000,
|
||||
properties={}),
|
||||
call('_ws._tcp.local.', 'OpenLP._ws._tcp.local.', addresses=[b'\x7f\x00\x00\x01'], port=8001,
|
||||
properties={})
|
||||
]
|
||||
assert MockedZeroconf.call_count == 1
|
||||
assert mocked_zc.register_service.call_args_list == [call(mocked_http_info), call(mocked_ws_info)]
|
||||
assert mocked_can_run.call_count == 2
|
||||
assert mocked_zc.unregister_service.call_args_list == [call(mocked_http_info), call(mocked_ws_info)]
|
||||
assert mocked_zc.close.call_count == 1
|
||||
mocked_zc.unregister_all_services.assert_called_once_with()
|
||||
mocked_zc.close.assert_called_once_with()
|
||||
|
||||
|
||||
def test_zeroconf_worker_stop():
|
||||
"""Test that the ZeroconfWorker.stop() method correctly stops the service"""
|
||||
# GIVEN: A worker object with _can_run set to True
|
||||
worker = ZeroconfWorker('127.0.0.1', 8000, 8001)
|
||||
worker = ZeroconfWorker(['127.0.0.1'], 8000, 8001)
|
||||
worker._can_run = True
|
||||
|
||||
# WHEN: stop() is called
|
||||
@ -106,4 +104,4 @@ def test_start_zeroconf(mocked_run_thread, MockedZeroconfWorker, MockedRegistry,
|
||||
start_zeroconf()
|
||||
|
||||
# THEN: A worker is added to the list of threads
|
||||
mocked_run_thread.assert_called_once_with(mocked_worker, 'api_zeroconf_eth0')
|
||||
mocked_run_thread.assert_called_once_with(mocked_worker, 'api_zeroconf')
|
||||
|
Loading…
Reference in New Issue
Block a user