Fix bug #1742910 by moving the threads to the application object instead of the main window object.

Add this to your merge proposal:
--------------------------------------------------------------------------------
lp:~raoul-snyman/openlp/bug-1742910 (revision 2810)
https://ci.openlp.io/job/Branch-01-Pull/2418/                          [SUCCESS]
https://ci.openlp.io/job/Branch-02a-Linux-Tests/2319/                  [SUCCESS]
https://ci.openlp.io/job/Branch-02b-macOS-Tests/114/                ...

bzr-revno: 2808
This commit is contained in:
Raoul Snyman 2018-01-13 08:17:04 +00:00 committed by Tim Bentley
commit 6263e6654a
10 changed files with 322 additions and 183 deletions

View File

@ -63,8 +63,8 @@ class OpenLP(QtWidgets.QApplication, LogMixin):
The core application class. This class inherits from Qt's QApplication The core application class. This class inherits from Qt's QApplication
class in order to provide the core of the application. class in order to provide the core of the application.
""" """
args = [] args = []
worker_threads = {}
def exec(self): def exec(self):
""" """

View File

@ -26,9 +26,9 @@ from contextlib import suppress
from openlp.core.common import is_win from openlp.core.common import is_win
if is_win(): if is_win():
from pathlib import WindowsPath as PathVariant from pathlib import WindowsPath as PathVariant # pragma: nocover
else: else:
from pathlib import PosixPath as PathVariant from pathlib import PosixPath as PathVariant # pragma: nocover
log = logging.getLogger(__name__) log = logging.getLogger(__name__)

View File

@ -50,12 +50,12 @@ def run_thread(worker, thread_name, can_start=True):
""" """
if not thread_name: if not thread_name:
raise ValueError('A thread_name is required when calling the "run_thread" function') raise ValueError('A thread_name is required when calling the "run_thread" function')
main_window = Registry().get('main_window') application = Registry().get('application')
if thread_name in main_window.threads: if thread_name in application.worker_threads:
raise KeyError('A thread with the name "{}" has already been created, please use another'.format(thread_name)) raise KeyError('A thread with the name "{}" has already been created, please use another'.format(thread_name))
# Create the thread and add the thread and the worker to the parent # Create the thread and add the thread and the worker to the parent
thread = QtCore.QThread() thread = QtCore.QThread()
main_window.threads[thread_name] = { application.worker_threads[thread_name] = {
'thread': thread, 'thread': thread,
'worker': worker 'worker': worker
} }
@ -78,7 +78,10 @@ def get_thread_worker(thread_name):
:param str thread_name: The name of the thread :param str thread_name: The name of the thread
:returns: The worker for this thread name :returns: The worker for this thread name
""" """
return Registry().get('main_window').threads.get(thread_name) thread_info = Registry().get('application').worker_threads.get(thread_name)
if not thread_info:
raise KeyError('No thread named "{}" exists'.format(thread_name))
return thread_info.get('worker')
def is_thread_finished(thread_name): def is_thread_finished(thread_name):
@ -88,8 +91,8 @@ def is_thread_finished(thread_name):
:param str thread_name: The name of the thread :param str thread_name: The name of the thread
:returns: True if the thread is finished, False if it is still running :returns: True if the thread is finished, False if it is still running
""" """
main_window = Registry().get('main_window') app = Registry().get('application')
return thread_name not in main_window.threads or main_window.threads[thread_name]['thread'].isFinished() return thread_name not in app.worker_threads or app.worker_threads[thread_name]['thread'].isFinished()
def make_remove_thread(thread_name): def make_remove_thread(thread_name):
@ -99,13 +102,14 @@ def make_remove_thread(thread_name):
:param str thread_name: The name of the thread which should be removed from the thread registry. :param str thread_name: The name of the thread which should be removed from the thread registry.
:returns: A function which will remove the thread from the thread registry. :returns: A function which will remove the thread from the thread registry.
""" """
def remove_thread():
def remove_thread(): # pragma: nocover
""" """
Stop and remove a registered thread Stop and remove a registered thread
:param str thread_name: The name of the thread to stop and remove :param str thread_name: The name of the thread to stop and remove
""" """
main_window = Registry().get('main_window') application = Registry().get('application')
if thread_name in main_window.threads: if thread_name in application.worker_threads:
del main_window.threads[thread_name] del application.worker_threads[thread_name]
return remove_thread return remove_thread

View File

@ -477,7 +477,6 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow, RegistryProperties):
""" """
super(MainWindow, self).__init__() super(MainWindow, self).__init__()
Registry().register('main_window', self) Registry().register('main_window', self)
self.threads = {}
self.clipboard = self.application.clipboard() self.clipboard = self.application.clipboard()
self.arguments = ''.join(self.application.args) self.arguments = ''.join(self.application.args)
# Set up settings sections for the main application (not for use by plugins). # Set up settings sections for the main application (not for use by plugins).
@ -557,11 +556,11 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow, RegistryProperties):
wait_dialog.setAutoClose(False) wait_dialog.setAutoClose(False)
wait_dialog.setCancelButton(None) wait_dialog.setCancelButton(None)
wait_dialog.show() wait_dialog.show()
for thread_name in self.threads.keys(): for thread_name in self.application.worker_threads.keys():
log.debug('Waiting for thread %s', thread_name) log.debug('Waiting for thread %s', thread_name)
self.application.processEvents() self.application.processEvents()
thread = self.threads[thread_name]['thread'] thread = self.application.worker_threads[thread_name]['thread']
worker = self.threads[thread_name]['worker'] worker = self.application.worker_threads[thread_name]['worker']
try: try:
if worker and hasattr(worker, 'stop'): if worker and hasattr(worker, 'stop'):
# If the worker has a stop method, run it # If the worker has a stop method, run it

View File

@ -22,38 +22,40 @@
""" """
Functional tests to test the API Error Class. Functional tests to test the API Error Class.
""" """
from openlp.core.api.http.errors import HttpError, NotFound, ServerError
from unittest import TestCase
from openlp.core.api.http.errors import NotFound, ServerError
class TestApiError(TestCase): def test_http_error():
""" """
A test suite to test out the Error in the API code Test the HTTPError class
""" """
def test_not_found(self): # GIVEN: An HTTPError class
# WHEN: An instance is created
error = HttpError(400, 'Access Denied')
# THEN: The to_response() method should return the correct information
assert error.to_response() == ('Access Denied', 400), 'to_response() should have returned the correct info'
def test_not_found():
""" """
Test the Not Found error displays the correct information Test the Not Found error displays the correct information
""" """
# GIVEN: # GIVEN: A NotFound class
# WHEN: I raise an exception # WHEN: An instance is created
with self.assertRaises(Exception) as context: error = NotFound()
raise NotFound()
# THEN: we get an error and a status # THEN: The to_response() method should return the correct information
assert 'Not Found' == context.exception.message, 'A Not Found exception should be thrown' assert error.to_response() == ('Not Found', 404), 'to_response() should have returned the correct info'
assert 404 == context.exception.status, 'A 404 status should be thrown'
def test_server_error(self):
def test_server_error():
""" """
Test the server error displays the correct information Test the server error displays the correct information
""" """
# GIVEN: # GIVEN: A ServerError class
# WHEN: I raise an exception # WHEN: An instance of the class is created
with self.assertRaises(Exception) as context: error = ServerError()
raise ServerError()
# THEN: we get an error and a status # THEN: The to_response() method should return the correct information
assert'Server Error' == context.exception.message, 'A Not Found exception should be thrown' assert error.to_response() == ('Server Error', 500), 'to_response() should have returned the correct info'
assert 500 == context.exception.status, 'A 500 status should be thrown'

View File

@ -21,11 +21,12 @@
############################################################################### ###############################################################################
from tempfile import mkdtemp from tempfile import mkdtemp
from unittest import TestCase from unittest import TestCase
from unittest.mock import MagicMock, patch
from openlp.core.api.deploy import deploy_zipfile from openlp.core.api.deploy import deploy_zipfile, download_sha256, download_and_check
from openlp.core.common.path import Path, copyfile from openlp.core.common.path import Path
TEST_PATH = (Path(__file__).parent / '..' / '..' / '..' / 'resources').resolve() CONFIG_FILE = '2c266badff1e3d140664c50fd1460a2b332b24d5ad8c267fa62e506b5eb6d894 deploy/site.zip\n2017_06_27'
class TestRemoteDeploy(TestCase): class TestRemoteDeploy(TestCase):
@ -45,17 +46,95 @@ class TestRemoteDeploy(TestCase):
""" """
self.app_root_path.rmtree() self.app_root_path.rmtree()
def test_deploy_zipfile(self): @patch('openlp.core.api.deploy.ZipFile')
def test_deploy_zipfile(self, MockZipFile):
""" """
Remote Deploy tests - test the dummy zip file is processed correctly Remote Deploy tests - test the dummy zip file is processed correctly
""" """
# GIVEN: A new downloaded zip file # GIVEN: A new downloaded zip file
zip_path = TEST_PATH / 'remotes' / 'site.zip' mocked_zipfile = MagicMock()
app_root_path = self.app_root_path / 'site.zip' MockZipFile.return_value = mocked_zipfile
copyfile(zip_path, app_root_path) root_path = Path('/tmp/remotes')
# WHEN: I process the zipfile # WHEN: deploy_zipfile() is called
deploy_zipfile(self.app_root_path, 'site.zip') deploy_zipfile(root_path, 'site.zip')
# THEN: test if www directory has been created # THEN: the zip file should have been extracted to the right location
assert (self.app_root_path / 'www').is_dir(), 'We should have a www directory' MockZipFile.assert_called_once_with('/tmp/remotes/site.zip')
mocked_zipfile.extractall.assert_called_once_with('/tmp/remotes')
@patch('openlp.core.api.deploy.Registry')
@patch('openlp.core.api.deploy.get_web_page')
def test_download_sha256_connection_error(self, mocked_get_web_page, MockRegistry):
"""
Test that if a ConnectionError occurs while downloading a sha256 False is returned
"""
# GIVEN: A bunch of mocks
MockRegistry.return_value.get.return_value.applicationVersion.return_value = '1.0'
mocked_get_web_page.side_effect = ConnectionError()
# WHEN: download_sha256() is called
result = download_sha256()
# THEN: The result should be False
assert result is False, 'download_sha256() should return False when encountering ConnectionError'
@patch('openlp.core.api.deploy.Registry')
@patch('openlp.core.api.deploy.get_web_page')
def test_download_sha256_no_config(self, mocked_get_web_page, MockRegistry):
"""
Test that if there's no config when downloading a sha256 None is returned
"""
# GIVEN: A bunch of mocks
MockRegistry.return_value.get.return_value.applicationVersion.return_value = '1.0'
mocked_get_web_page.return_value = None
# WHEN: download_sha256() is called
result = download_sha256()
# THEN: The result should be Nonw
assert result is None, 'download_sha256() should return None when there is a problem downloading the page'
@patch('openlp.core.api.deploy.Registry')
@patch('openlp.core.api.deploy.get_web_page')
def test_download_sha256(self, mocked_get_web_page, MockRegistry):
"""
Test that the sha256 and the version are returned
"""
# GIVEN: A bunch of mocks
MockRegistry.return_value.get.return_value.applicationVersion.return_value = '1.0'
mocked_get_web_page.return_value = CONFIG_FILE
# WHEN: download_sha256() is called
result = download_sha256()
# THEN: The result should be Nonw
assert result == ('2c266badff1e3d140664c50fd1460a2b332b24d5ad8c267fa62e506b5eb6d894', '2017_06_27'), \
'download_sha256() should return a tuple of sha256 and version'
@patch('openlp.core.api.deploy.Registry')
@patch('openlp.core.api.deploy.download_sha256')
@patch('openlp.core.api.deploy.get_url_file_size')
@patch('openlp.core.api.deploy.download_file')
@patch('openlp.core.api.deploy.AppLocation.get_section_data_path')
@patch('openlp.core.api.deploy.deploy_zipfile')
def test_download_and_check(self, mocked_deploy_zipfile, mocked_get_data_path, mocked_download_file,
mocked_get_url_file_size, mocked_download_sha256, MockRegistry):
# GIVEN: A bunch of mocks
mocked_get_data_path.return_value = Path('/tmp/remotes')
mocked_download_file.return_value = True
mocked_get_url_file_size.return_value = 5
mocked_download_sha256.return_value = ('asdfgh', '0.1')
MockRegistry.return_value.get.return_value.applicationVersion.return_value = '1.0'
mocked_callback = MagicMock()
# WHEN: download_and_check() is called
download_and_check(mocked_callback)
# THEN: The correct things should have been done
mocked_download_sha256.assert_called_once_with()
mocked_get_url_file_size.assert_called_once_with('https://get.openlp.org/webclient/site.zip')
mocked_callback.setRange.assert_called_once_with(0, 5)
mocked_download_file.assert_called_once_with(mocked_callback, 'https://get.openlp.org/webclient/site.zip',
Path('/tmp/remotes/site.zip'), sha256='asdfgh')
mocked_deploy_zipfile.assert_called_once_with(Path('/tmp/remotes'), 'site.zip')

View File

@ -27,7 +27,7 @@ from unittest import TestCase
from unittest.mock import ANY, MagicMock, patch from unittest.mock import ANY, MagicMock, patch
from openlp.core.common.path import Path, copy, copyfile, copytree, create_paths, path_to_str, replace_params, \ from openlp.core.common.path import Path, copy, copyfile, copytree, create_paths, path_to_str, replace_params, \
str_to_path, which str_to_path, which, files_to_paths
class TestShutil(TestCase): class TestShutil(TestCase):
@ -401,3 +401,16 @@ class TestPath(TestCase):
except: except:
# THEN: `create_paths` raises an exception # THEN: `create_paths` raises an exception
pass pass
def test_files_to_paths(self):
"""
Test the files_to_paths() method
"""
# GIVEN: A list of string filenames
test_files = ['/tmp/openlp/file1.txt', '/tmp/openlp/file2.txt']
# WHEN: files_to_paths is called
result = files_to_paths(test_files)
# THEN: The result should be a list of Paths
assert result == [Path('/tmp/openlp/file1.txt'), Path('/tmp/openlp/file2.txt')]

View File

@ -1,87 +0,0 @@
# -*- coding: utf-8 -*-
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
###############################################################################
# OpenLP - Open Source Lyrics Projection #
# --------------------------------------------------------------------------- #
# Copyright (c) 2008-2017 OpenLP Developers #
# --------------------------------------------------------------------------- #
# This program is free software; you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by the Free #
# Software Foundation; version 2 of the License. #
# #
# This program is distributed in the hope that it will be useful, but WITHOUT #
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
# more details. #
# #
# You should have received a copy of the GNU General Public License along #
# with this program; if not, write to the Free Software Foundation, Inc., 59 #
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
###############################################################################
"""
Package to test the openlp.core.lib.path package.
"""
import os
from unittest import TestCase
from openlp.core.common.path import Path, path_to_str, str_to_path
class TestPath(TestCase):
"""
Tests for the :mod:`openlp.core.lib.path` module
"""
def test_path_to_str_type_error(self):
"""
Test that `path_to_str` raises a type error when called with an invalid type
"""
# GIVEN: The `path_to_str` function
# WHEN: Calling `path_to_str` with an invalid Type
# THEN: A TypeError should have been raised
with self.assertRaises(TypeError):
path_to_str(str())
def test_path_to_str_none(self):
"""
Test that `path_to_str` correctly converts the path parameter when passed with None
"""
# GIVEN: The `path_to_str` function
# WHEN: Calling the `path_to_str` function with None
result = path_to_str(None)
# THEN: `path_to_str` should return an empty string
assert result == ''
def test_path_to_str_path_object(self):
"""
Test that `path_to_str` correctly converts the path parameter when passed a Path object
"""
# GIVEN: The `path_to_str` function
# WHEN: Calling the `path_to_str` function with a Path object
result = path_to_str(Path('test/path'))
# THEN: `path_to_str` should return a string representation of the Path object
assert result == os.path.join('test', 'path')
def test_str_to_path_type_error(self):
"""
Test that `str_to_path` raises a type error when called with an invalid type
"""
# GIVEN: The `str_to_path` function
# WHEN: Calling `str_to_path` with an invalid Type
# THEN: A TypeError should have been raised
with self.assertRaises(TypeError):
str_to_path(Path())
def test_str_to_path_empty_str(self):
"""
Test that `str_to_path` correctly converts the string parameter when passed with and empty string
"""
# GIVEN: The `str_to_path` function
# WHEN: Calling the `str_to_path` function with None
result = str_to_path('')
# THEN: `path_to_str` should return None
assert result is None

View File

@ -23,14 +23,14 @@
Package to test the openlp.core.lib.ui package. Package to test the openlp.core.lib.ui package.
""" """
from unittest import TestCase from unittest import TestCase
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch, call
from PyQt5 import QtCore, QtGui, QtWidgets from PyQt5 import QtCore, QtGui, QtWidgets
from openlp.core.common.i18n import UiStrings, translate from openlp.core.common.i18n import UiStrings, translate
from openlp.core.lib.ui import add_welcome_page, create_button_box, create_horizontal_adjusting_combo_box, \ from openlp.core.lib.ui import add_welcome_page, create_button_box, create_horizontal_adjusting_combo_box, \
create_button, create_action, create_valign_selection_widgets, find_and_set_in_combo_box, create_widget_action, \ create_button, create_action, create_valign_selection_widgets, find_and_set_in_combo_box, create_widget_action, \
set_case_insensitive_completer set_case_insensitive_completer, critical_error_message_box
class TestUi(TestCase): class TestUi(TestCase):
@ -80,6 +80,34 @@ class TestUi(TestCase):
assert 1 == len(btnbox.buttons()) assert 1 == len(btnbox.buttons())
assert QtWidgets.QDialogButtonBox.HelpRole, btnbox.buttonRole(btnbox.buttons()[0]) assert QtWidgets.QDialogButtonBox.HelpRole, btnbox.buttonRole(btnbox.buttons()[0])
@patch('openlp.core.lib.ui.Registry')
def test_critical_error_message_box(self, MockRegistry):
"""
Test the critical_error_message_box() function
"""
# GIVEN: A mocked Registry
# WHEN: critical_error_message_box() is called
critical_error_message_box('Error', 'This is an error')
# THEN: The error_message() method on the main window should be called
MockRegistry.return_value.get.return_value.error_message.assert_called_once_with('Error', 'This is an error')
@patch('openlp.core.lib.ui.QtWidgets.QMessageBox.critical')
def test_critical_error_question(self, mocked_critical):
"""
Test the critical_error_message_box() function
"""
# GIVEN: A mocked critical() method and some other mocks
mocked_parent = MagicMock()
# WHEN: critical_error_message_box() is called
critical_error_message_box(None, 'This is a question', mocked_parent, True)
# THEN: The error_message() method on the main window should be called
mocked_critical.assert_called_once_with(mocked_parent, 'Error', 'This is a question',
QtWidgets.QMessageBox.StandardButtons(QtWidgets.QMessageBox.Yes |
QtWidgets.QMessageBox.No))
def test_create_horizontal_adjusting_combo_box(self): def test_create_horizontal_adjusting_combo_box(self):
""" """
Test creating a horizontal adjusting combo box Test creating a horizontal adjusting combo box
@ -92,65 +120,64 @@ class TestUi(TestCase):
# THEN: We should get a ComboBox # THEN: We should get a ComboBox
assert isinstance(combo, QtWidgets.QComboBox) assert isinstance(combo, QtWidgets.QComboBox)
assert 'combo1' == combo.objectName() assert combo.objectName() == 'combo1'
assert QtWidgets.QComboBox.AdjustToMinimumContentsLength == combo.sizeAdjustPolicy() assert QtWidgets.QComboBox.AdjustToMinimumContentsLength == combo.sizeAdjustPolicy()
def test_create_button(self): @patch('openlp.core.lib.ui.log')
def test_create_button(self, mocked_log):
""" """
Test creating a button Test creating a button
""" """
# GIVEN: A dialog # GIVEN: A dialog
dialog = QtWidgets.QDialog() dialog = QtWidgets.QDialog()
# WHEN: We create the button
btn = create_button(dialog, 'my_btn')
# THEN: We should get a button with a name
assert isinstance(btn, QtWidgets.QPushButton)
assert 'my_btn' == btn.objectName()
assert btn.isEnabled() is True
# WHEN: We create a button with some attributes # WHEN: We create a button with some attributes
btn = create_button(dialog, 'my_btn', text='Hello', tooltip='How are you?', enabled=False) btn = create_button(dialog, 'my_btn', text='Hello', tooltip='How are you?', enabled=False, role='test', test=1)
# THEN: We should get a button with those attributes # THEN: We should get a button with those attributes
assert isinstance(btn, QtWidgets.QPushButton) assert isinstance(btn, QtWidgets.QPushButton)
assert 'Hello' == btn.text() assert btn.objectName() == 'my_btn'
assert 'How are you?' == btn.toolTip() assert btn.text() == 'Hello'
assert btn.toolTip() == 'How are you?'
assert btn.isEnabled() is False assert btn.isEnabled() is False
assert mocked_log.warning.call_args_list == [call('The role "test" is not defined in create_push_button().'),
call('Parameter test was not consumed in create_button().')]
def test_create_tool_button(self):
"""
Test creating a toolbutton
"""
# GIVEN: A dialog
dialog = QtWidgets.QDialog()
# WHEN: We create a toolbutton # WHEN: We create a toolbutton
btn = create_button(dialog, 'my_btn', btn_class='toolbutton') btn = create_button(dialog, 'my_btn', btn_class='toolbutton')
# THEN: We should get a toolbutton # THEN: We should get a toolbutton
assert isinstance(btn, QtWidgets.QToolButton) assert isinstance(btn, QtWidgets.QToolButton)
assert 'my_btn' == btn.objectName() assert btn.objectName() == 'my_btn'
assert btn.isEnabled() is True assert btn.isEnabled() is True
def test_create_action(self): @patch('openlp.core.lib.ui.log')
def test_create_action(self, mocked_log):
""" """
Test creating an action Test creating an action
""" """
# GIVEN: A dialog # GIVEN: A dialog
dialog = QtWidgets.QDialog() dialog = QtWidgets.QDialog()
# WHEN: We create an action
action = create_action(dialog, 'my_action')
# THEN: We should get a QAction
assert isinstance(action, QtWidgets.QAction)
assert 'my_action' == action.objectName()
# WHEN: We create an action with some properties # WHEN: We create an action with some properties
action = create_action(dialog, 'my_action', text='my text', icon=':/wizards/wizard_firsttime.bmp', action = create_action(dialog, 'my_action', text='my text', icon=':/wizards/wizard_firsttime.bmp',
tooltip='my tooltip', statustip='my statustip') tooltip='my tooltip', statustip='my statustip', test=1)
# THEN: These properties should be set # THEN: These properties should be set
assert isinstance(action, QtWidgets.QAction) assert isinstance(action, QtWidgets.QAction)
assert 'my text' == action.text() assert action.objectName() == 'my_action'
assert action.text() == 'my text'
assert isinstance(action.icon(), QtGui.QIcon) assert isinstance(action.icon(), QtGui.QIcon)
assert 'my tooltip' == action.toolTip() assert action.toolTip() == 'my tooltip'
assert 'my statustip' == action.statusTip() assert action.statusTip() == 'my statustip'
mocked_log.warning.assert_called_once_with('Parameter test was not consumed in create_action().')
def test_create_action_on_mac_osx(self): def test_create_action_on_mac_osx(self):
""" """

View File

@ -22,9 +22,10 @@
""" """
Package to test the openlp.core.threading package. Package to test the openlp.core.threading package.
""" """
from inspect import isfunction
from unittest.mock import MagicMock, call, patch from unittest.mock import MagicMock, call, patch
from openlp.core.version import run_thread from openlp.core.threading import ThreadWorker, run_thread, get_thread_worker, is_thread_finished, make_remove_thread
def test_run_thread_no_name(): def test_run_thread_no_name():
@ -47,9 +48,9 @@ def test_run_thread_exists(MockRegistry):
Test that trying to run a thread with a name that already exists will throw a KeyError Test that trying to run a thread with a name that already exists will throw a KeyError
""" """
# GIVEN: A mocked registry with a main window object # GIVEN: A mocked registry with a main window object
mocked_main_window = MagicMock() mocked_application = MagicMock()
mocked_main_window.threads = {'test_thread': MagicMock()} mocked_application.worker_threads = {'test_thread': MagicMock()}
MockRegistry.return_value.get.return_value = mocked_main_window MockRegistry.return_value.get.return_value = mocked_application
# WHEN: run_thread() is called # WHEN: run_thread() is called
try: try:
@ -66,18 +67,19 @@ def test_run_thread(MockRegistry, MockQThread):
Test that running a thread works correctly Test that running a thread works correctly
""" """
# GIVEN: A mocked registry with a main window object # GIVEN: A mocked registry with a main window object
mocked_main_window = MagicMock() mocked_application = MagicMock()
mocked_main_window.threads = {} mocked_application.worker_threads = {}
MockRegistry.return_value.get.return_value = mocked_main_window MockRegistry.return_value.get.return_value = mocked_application
# WHEN: run_thread() is called # WHEN: run_thread() is called
run_thread(MagicMock(), 'test_thread') run_thread(MagicMock(), 'test_thread')
# THEN: The thread should be in the threads list and the correct methods should have been called # THEN: The thread should be in the threads list and the correct methods should have been called
assert len(mocked_main_window.threads.keys()) == 1, 'There should be 1 item in the list of threads' assert len(mocked_application.worker_threads.keys()) == 1, 'There should be 1 item in the list of threads'
assert list(mocked_main_window.threads.keys()) == ['test_thread'], 'The test_thread item should be in the list' assert list(mocked_application.worker_threads.keys()) == ['test_thread'], \
mocked_worker = mocked_main_window.threads['test_thread']['worker'] 'The test_thread item should be in the list'
mocked_thread = mocked_main_window.threads['test_thread']['thread'] mocked_worker = mocked_application.worker_threads['test_thread']['worker']
mocked_thread = mocked_application.worker_threads['test_thread']['thread']
mocked_worker.moveToThread.assert_called_once_with(mocked_thread) mocked_worker.moveToThread.assert_called_once_with(mocked_thread)
mocked_thread.started.connect.assert_called_once_with(mocked_worker.start) mocked_thread.started.connect.assert_called_once_with(mocked_worker.start)
expected_quit_calls = [call(mocked_thread.quit), call(mocked_worker.deleteLater)] expected_quit_calls = [call(mocked_thread.quit), call(mocked_worker.deleteLater)]
@ -87,3 +89,103 @@ def test_run_thread(MockRegistry, MockQThread):
'The threads finished signal should be connected to its deleteLater slot' 'The threads finished signal should be connected to its deleteLater slot'
assert mocked_thread.finished.connect.call_count == 2, 'The signal should have been connected twice' assert mocked_thread.finished.connect.call_count == 2, 'The signal should have been connected twice'
mocked_thread.start.assert_called_once_with() mocked_thread.start.assert_called_once_with()
def test_thread_worker():
"""
Test that creating a thread worker object and calling start throws and NotImplementedError
"""
# GIVEN: A ThreadWorker class
worker = ThreadWorker()
try:
# WHEN: calling start()
worker.start()
assert False, 'A NotImplementedError should have been thrown'
except NotImplementedError:
# A NotImplementedError should be thrown
pass
except Exception:
assert False, 'A NotImplementedError should have been thrown'
@patch('openlp.core.threading.Registry')
def test_get_thread_worker(MockRegistry):
"""
Test that calling the get_thread_worker() function returns the correct worker
"""
# GIVEN: A mocked thread worker
mocked_worker = MagicMock()
MockRegistry.return_value.get.return_value.worker_threads = {'test_thread': {'worker': mocked_worker}}
# WHEN: get_thread_worker() is called
worker = get_thread_worker('test_thread')
# THEN: The mocked worker is returned
assert worker is mocked_worker, 'The mocked worker should have been returned'
@patch('openlp.core.threading.Registry')
def test_get_thread_worker_mising(MockRegistry):
"""
Test that calling the get_thread_worker() function raises a KeyError if it does not exist
"""
# GIVEN: A mocked thread worker
MockRegistry.return_value.get.return_value.worker_threads = {}
try:
# WHEN: get_thread_worker() is called
get_thread_worker('test_thread')
assert False, 'A KeyError should have been raised'
except KeyError:
# THEN: The mocked worker is returned
pass
except Exception:
assert False, 'A KeyError should have been raised'
@patch('openlp.core.threading.Registry')
def test_is_thread_finished(MockRegistry):
"""
Test the is_thread_finished() function
"""
# GIVEN: A mock thread and worker
mocked_thread = MagicMock()
mocked_thread.isFinished.return_value = False
MockRegistry.return_value.get.return_value.worker_threads = {'test': {'thread': mocked_thread}}
# WHEN: is_thread_finished() is called
result = is_thread_finished('test')
# THEN: The result should be correct
assert result is False, 'is_thread_finished should have returned False'
@patch('openlp.core.threading.Registry')
def test_is_thread_finished_missing(MockRegistry):
"""
Test that calling the is_thread_finished() function returns True if the thread doesn't exist
"""
# GIVEN: A mocked thread worker
MockRegistry.return_value.get.return_value.worker_threads = {}
# WHEN: get_thread_worker() is called
result = is_thread_finished('test_thread')
# THEN: The result should be correct
assert result is True, 'is_thread_finished should return True when a thread is missing'
def test_make_remove_thread():
"""
Test the make_remove_thread() function
"""
# GIVEN: A thread name
thread_name = 'test_thread'
# WHEN: make_remove_thread() is called
rm_func = make_remove_thread(thread_name)
# THEN: The result should be a function
assert isfunction(rm_func), 'make_remove_thread should return a function'
assert rm_func.__name__ == 'remove_thread'