Refactor threads to use new openlp.core.threading module

This commit is contained in:
Raoul Snyman 2017-12-20 07:17:07 -07:00
parent 9bbd3fa72d
commit 2fa88b17db
10 changed files with 117 additions and 106 deletions

View File

@ -39,28 +39,21 @@ from openlp.core.api.http import application
from openlp.core.api.poll import Poller from openlp.core.api.poll import Poller
from openlp.core.common.applocation import AppLocation from openlp.core.common.applocation import AppLocation
from openlp.core.common.i18n import UiStrings from openlp.core.common.i18n import UiStrings
from openlp.core.common.i18n import translate
from openlp.core.common.mixins import LogMixin, RegistryProperties from openlp.core.common.mixins import LogMixin, RegistryProperties
from openlp.core.common.path import create_paths from openlp.core.common.path import create_paths
from openlp.core.common.registry import Registry, RegistryBase from openlp.core.common.registry import Registry, RegistryBase
from openlp.core.common.settings import Settings from openlp.core.common.settings import Settings
from openlp.core.common.i18n import translate from openlp.core.threading import ThreadWorker, run_thread
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class HttpWorker(QtCore.QObject): class HttpWorker(ThreadWorker):
""" """
A special Qt thread class to allow the HTTP server to run at the same time as the UI. A special Qt thread class to allow the HTTP server to run at the same time as the UI.
""" """
def __init__(self): def start(self):
"""
Constructor for the thread class.
:param server: The http server class.
"""
super(HttpWorker, self).__init__()
def run(self):
""" """
Run the thread. Run the thread.
""" """
@ -71,9 +64,7 @@ class HttpWorker(QtCore.QObject):
serve(application, host=address, port=port) serve(application, host=address, port=port)
except OSError: except OSError:
log.exception('An error occurred when serving the application.') log.exception('An error occurred when serving the application.')
self.quit.emit()
def stop(self):
pass
class HttpServer(RegistryBase, RegistryProperties, LogMixin): class HttpServer(RegistryBase, RegistryProperties, LogMixin):
@ -86,11 +77,8 @@ class HttpServer(RegistryBase, RegistryProperties, LogMixin):
""" """
super(HttpServer, self).__init__(parent) super(HttpServer, self).__init__(parent)
if Registry().get_flag('no_web_server'): if Registry().get_flag('no_web_server'):
self.worker = HttpWorker() worker = HttpWorker()
self.thread = QtCore.QThread() run_thread(worker, 'http_server')
self.worker.moveToThread(self.thread)
self.thread.started.connect(self.worker.run)
self.thread.start()
Registry().register_function('download_website', self.first_time) Registry().register_function('download_website', self.first_time)
Registry().register_function('get_website_version', self.website_version) Registry().register_function('get_website_version', self.website_version)
Registry().set_flag('website_version', '0.0') Registry().set_flag('website_version', '0.0')

View File

@ -34,11 +34,12 @@ from PyQt5 import QtCore
from openlp.core.common.mixins import LogMixin, RegistryProperties from openlp.core.common.mixins import LogMixin, RegistryProperties
from openlp.core.common.registry import Registry from openlp.core.common.registry import Registry
from openlp.core.common.settings import Settings from openlp.core.common.settings import Settings
from openlp.core.threading import ThreadWorker, run_thread
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class WebSocketWorker(QtCore.QObject): class WebSocketWorker(ThreadWorker):
""" """
A special Qt thread class to allow the WebSockets server to run at the same time as the UI. A special Qt thread class to allow the WebSockets server to run at the same time as the UI.
""" """
@ -49,15 +50,20 @@ class WebSocketWorker(QtCore.QObject):
:param server: The http server class. :param server: The http server class.
""" """
self.ws_server = server self.ws_server = server
super(WebSocketWorker, self).__init__() super().__init__()
def run(self): def start(self):
""" """
Run the thread. Run the worker.
""" """
self.ws_server.start_server() self.ws_server.start_server()
self.quit.emit()
@QtCore.pyqtSlot()
def stop(self): def stop(self):
"""
Stop the websocket server
"""
self.ws_server.stop = True self.ws_server.stop = True
@ -72,11 +78,8 @@ class WebSocketServer(RegistryProperties, LogMixin):
super(WebSocketServer, self).__init__() super(WebSocketServer, self).__init__()
if Registry().get_flag('no_web_server'): if Registry().get_flag('no_web_server'):
self.settings_section = 'api' self.settings_section = 'api'
self.worker = WebSocketWorker(self) worker = WebSocketWorker(self)
self.thread = QtCore.QThread() run_thread(worker, 'websocket_server')
self.worker.moveToThread(self.thread)
self.thread.started.connect(self.worker.run)
self.thread.start()
def start_server(self): def start_server(self):
""" """

View File

@ -35,13 +35,14 @@ from openlp.core.common.registry import Registry
from openlp.core.common.settings import Settings from openlp.core.common.settings import Settings
from openlp.core.display.screens import ScreenList from openlp.core.display.screens import ScreenList
from openlp.core.lib import resize_image, image_to_byte from openlp.core.lib import resize_image, image_to_byte
from openlp.core.threading import ThreadWorker, run_thread
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class ImageThread(QtCore.QThread): class ImageWorker(ThreadWorker):
""" """
A special Qt thread class to speed up the display of images. This is threaded so it loads the frames and generates A thread worker class to speed up the display of images. This is threaded so it loads the frames and generates
byte stream in background. byte stream in background.
""" """
def __init__(self, manager): def __init__(self, manager):
@ -51,14 +52,15 @@ class ImageThread(QtCore.QThread):
``manager`` ``manager``
The image manager. The image manager.
""" """
super(ImageThread, self).__init__(None) super().__init__()
self.image_manager = manager self.image_manager = manager
def run(self): def start(self):
""" """
Run the thread. Run the thread.
""" """
self.image_manager.process() self.image_manager.process()
self.quit.emit()
class Priority(object): class Priority(object):
@ -179,7 +181,6 @@ class ImageManager(QtCore.QObject):
self.width = current_screen['size'].width() self.width = current_screen['size'].width()
self.height = current_screen['size'].height() self.height = current_screen['size'].height()
self._cache = {} self._cache = {}
self.image_thread = ImageThread(self)
self._conversion_queue = PriorityQueue() self._conversion_queue = PriorityQueue()
self.stop_manager = False self.stop_manager = False
Registry().register_function('images_regenerate', self.process_updates) Registry().register_function('images_regenerate', self.process_updates)
@ -230,9 +231,13 @@ class ImageManager(QtCore.QObject):
""" """
Flush the queue to updated any data to update Flush the queue to updated any data to update
""" """
# We want only one thread. try:
if not self.image_thread.isRunning(): worker = ImageWorker(self)
self.image_thread.start() run_thread(worker, 'image_manager')
except KeyError:
# run_thread() will throw a KeyError if this thread already exists, so ignore it so that we don't
# try to start another thread when one is already running
pass
def get_image(self, path, source, width=-1, height=-1): def get_image(self, path, source, width=-1, height=-1):
""" """

View File

@ -27,6 +27,19 @@ from PyQt5 import QtCore
from openlp.core.common.registry import Registry from openlp.core.common.registry import Registry
class ThreadWorker(QtCore.QObject):
"""
The :class:`~openlp.core.threading.ThreadWorker` class provides a base class for all worker objects
"""
quit = QtCore.pyqtSignal()
def start(self):
"""
The start method is how the worker runs. Basically, put your code here.
"""
raise NotImplementedError('Your base class needs to override this method and run self.quit.emit() at the end.')
def run_thread(worker, thread_name, can_start=True): def run_thread(worker, thread_name, can_start=True):
""" """
Create a thread and assign a worker to it. This removes a lot of boilerplate code from the codebase. Create a thread and assign a worker to it. This removes a lot of boilerplate code from the codebase.
@ -58,6 +71,27 @@ def run_thread(worker, thread_name, can_start=True):
thread.start() thread.start()
def get_thread_worker(thread_name):
"""
Get the worker by the thread name
:param str thread_name: The name of the thread
:returns ThreadWorker: The worker for this thread name
"""
return Registry().get('main_window').threads.get(thread_name)
def is_thread_finished(thread_name):
"""
Check if a thread is finished running.
:param str thread_name: The name of the thread
:returns bool: True if the thread is finished, False if it is still running
"""
main_window = Registry().get('main_window')
return thread_name not in main_window.threads or main_window.threads[thread_name]['thread'].isFinished()
def make_remove_thread(thread_name): def make_remove_thread(thread_name):
""" """
Create a function to remove the thread once the thread is finished. Create a function to remove the thread once the thread is finished.

View File

@ -36,20 +36,21 @@ from PyQt5 import QtCore, QtWidgets
from openlp.core.common import clean_button_text, trace_error_handler from openlp.core.common import clean_button_text, trace_error_handler
from openlp.core.common.applocation import AppLocation from openlp.core.common.applocation import AppLocation
from openlp.core.common.httputils import get_web_page, get_url_file_size, url_get_file, CONNECTION_TIMEOUT
from openlp.core.common.i18n import translate from openlp.core.common.i18n import translate
from openlp.core.common.path import Path, create_paths
from openlp.core.common.mixins import RegistryProperties from openlp.core.common.mixins import RegistryProperties
from openlp.core.common.path import Path, create_paths
from openlp.core.common.registry import Registry from openlp.core.common.registry import Registry
from openlp.core.common.settings import Settings from openlp.core.common.settings import Settings
from openlp.core.lib import PluginStatus, build_icon from openlp.core.lib import PluginStatus, build_icon
from openlp.core.lib.ui import critical_error_message_box from openlp.core.lib.ui import critical_error_message_box
from openlp.core.common.httputils import get_web_page, get_url_file_size, url_get_file, CONNECTION_TIMEOUT from openlp.core.threading import ThreadWorker, run_thread, get_thread_worker, is_thread_finished
from .firsttimewizard import UiFirstTimeWizard, FirstTimePage from openlp.core.ui.firsttimewizard import UiFirstTimeWizard, FirstTimePage
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class ThemeScreenshotWorker(QtCore.QObject): class ThemeScreenshotWorker(ThreadWorker):
""" """
This thread downloads a theme's screenshot This thread downloads a theme's screenshot
""" """
@ -67,11 +68,11 @@ class ThemeScreenshotWorker(QtCore.QObject):
self.sha256 = sha256 self.sha256 = sha256
self.screenshot = screenshot self.screenshot = screenshot
socket.setdefaulttimeout(CONNECTION_TIMEOUT) socket.setdefaulttimeout(CONNECTION_TIMEOUT)
super(ThemeScreenshotWorker, self).__init__() super().__init__()
def run(self): def start(self):
""" """
Overridden method to run the thread. Run the worker
""" """
if self.was_download_cancelled: if self.was_download_cancelled:
return return
@ -80,9 +81,10 @@ class ThemeScreenshotWorker(QtCore.QObject):
os.path.join(gettempdir(), 'openlp', self.screenshot)) os.path.join(gettempdir(), 'openlp', self.screenshot))
# Signal that the screenshot has been downloaded # Signal that the screenshot has been downloaded
self.screenshot_downloaded.emit(self.title, self.filename, self.sha256) self.screenshot_downloaded.emit(self.title, self.filename, self.sha256)
except: except: # noqa
log.exception('Unable to download screenshot') log.exception('Unable to download screenshot')
finally: finally:
self.quit.emit()
self.finished.emit() self.finished.emit()
@QtCore.pyqtSlot(bool) @QtCore.pyqtSlot(bool)
@ -145,12 +147,13 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
return FirstTimePage.Progress return FirstTimePage.Progress
elif self.currentId() == FirstTimePage.Themes: elif self.currentId() == FirstTimePage.Themes:
self.application.set_busy_cursor() self.application.set_busy_cursor()
while not all([thread.isFinished() for thread in self.theme_screenshot_threads]): while not all([is_thread_finished(thread_name) for thread_name in self.theme_screenshot_threads]):
time.sleep(0.1) time.sleep(0.1)
self.application.process_events() self.application.process_events()
# Build the screenshot icons, as this can not be done in the thread. # Build the screenshot icons, as this can not be done in the thread.
self._build_theme_screenshots() self._build_theme_screenshots()
self.application.set_normal_cursor() self.application.set_normal_cursor()
self.theme_screenshot_threads = []
return FirstTimePage.Defaults return FirstTimePage.Defaults
else: else:
return self.get_next_page_id() return self.get_next_page_id()
@ -171,7 +174,6 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
self.screens = screens self.screens = screens
self.was_cancelled = False self.was_cancelled = False
self.theme_screenshot_threads = [] self.theme_screenshot_threads = []
self.theme_screenshot_workers = []
self.has_run_wizard = False self.has_run_wizard = False
def _download_index(self): def _download_index(self):
@ -256,14 +258,10 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
sha256 = self.config.get('theme_{theme}'.format(theme=theme), 'sha256', fallback='') sha256 = self.config.get('theme_{theme}'.format(theme=theme), 'sha256', fallback='')
screenshot = self.config.get('theme_{theme}'.format(theme=theme), 'screenshot') screenshot = self.config.get('theme_{theme}'.format(theme=theme), 'screenshot')
worker = ThemeScreenshotWorker(self.themes_url, title, filename, sha256, screenshot) worker = ThemeScreenshotWorker(self.themes_url, title, filename, sha256, screenshot)
self.theme_screenshot_workers.append(worker)
worker.screenshot_downloaded.connect(self.on_screenshot_downloaded) worker.screenshot_downloaded.connect(self.on_screenshot_downloaded)
thread = QtCore.QThread(self) thread_name = 'theme_screenshot_{title}'.format(title=title)
self.theme_screenshot_threads.append(thread) run_thread(worker, thread_name)
thread.started.connect(worker.run) self.theme_screenshot_threads.append(thread_name)
worker.finished.connect(thread.quit)
worker.moveToThread(thread)
thread.start()
self.application.process_events() self.application.process_events()
def set_defaults(self): def set_defaults(self):
@ -353,9 +351,11 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
Process the triggering of the cancel button. Process the triggering of the cancel button.
""" """
self.was_cancelled = True self.was_cancelled = True
if self.theme_screenshot_workers: if self.theme_screenshot_threads:
for worker in self.theme_screenshot_workers: for thread_name in self.theme_screenshot_threads:
worker.set_download_canceled(True) worker = get_thread_worker(thread_name)
if worker:
worker.set_download_canceled(True)
# Was the thread created. # Was the thread created.
if self.theme_screenshot_threads: if self.theme_screenshot_threads:
while any([thread.isRunning() for thread in self.theme_screenshot_threads]): while any([thread.isRunning() for thread in self.theme_screenshot_threads]):

View File

@ -1071,8 +1071,8 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow, RegistryProperties):
:param save_settings: Switch to prevent saving settings. Defaults to **True**. :param save_settings: Switch to prevent saving settings. Defaults to **True**.
""" """
self.image_manager.stop_manager = True self.image_manager.stop_manager = True
while self.image_manager.image_thread.isRunning(): # while self.image_manager.image_thread.isRunning():
time.sleep(0.1) # time.sleep(0.1)
if save_settings: if save_settings:
if Settings().value('advanced/save current plugin'): if Settings().value('advanced/save current plugin'):
Settings().setValue('advanced/current media plugin', self.media_tool_box.currentIndex()) Settings().setValue('advanced/current media plugin', self.media_tool_box.currentIndex())

View File

@ -31,6 +31,7 @@ from PyQt5 import QtCore, QtMultimedia, QtMultimediaWidgets
from openlp.core.common.i18n import translate from openlp.core.common.i18n import translate
from openlp.core.ui.media import MediaState from openlp.core.ui.media import MediaState
from openlp.core.ui.media.mediaplayer import MediaPlayer from openlp.core.ui.media.mediaplayer import MediaPlayer
from openlp.core.threading import ThreadWorker, run_thread, is_thread_finished
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -294,39 +295,38 @@ class SystemPlayer(MediaPlayer):
:param path: Path to file to be checked :param path: Path to file to be checked
:return: True if file can be played otherwise False :return: True if file can be played otherwise False
""" """
thread = QtCore.QThread()
check_media_worker = CheckMediaWorker(path) check_media_worker = CheckMediaWorker(path)
check_media_worker.setVolume(0) check_media_worker.setVolume(0)
check_media_worker.moveToThread(thread) run_thread(check_media_worker, 'check_media')
check_media_worker.finished.connect(thread.quit) while not is_thread_finished('check_media'):
thread.started.connect(check_media_worker.play)
thread.start()
while thread.isRunning():
self.application.processEvents() self.application.processEvents()
return check_media_worker.result return check_media_worker.result
class CheckMediaWorker(QtMultimedia.QMediaPlayer): class CheckMediaWorker(QtMultimedia.QMediaPlayer, ThreadWorker):
""" """
Class used to check if a media file is playable Class used to check if a media file is playable
""" """
finished = QtCore.pyqtSignal()
def __init__(self, path): def __init__(self, path):
super(CheckMediaWorker, self).__init__(None, QtMultimedia.QMediaPlayer.VideoSurface) super(CheckMediaWorker, self).__init__(None, QtMultimedia.QMediaPlayer.VideoSurface)
self.result = None self.path = path
def start(self):
"""
Start the thread worker
"""
self.result = None
self.error.connect(functools.partial(self.signals, 'error')) self.error.connect(functools.partial(self.signals, 'error'))
self.mediaStatusChanged.connect(functools.partial(self.signals, 'media')) self.mediaStatusChanged.connect(functools.partial(self.signals, 'media'))
self.setMedia(QtMultimedia.QMediaContent(QtCore.QUrl.fromLocalFile(self.path)))
self.setMedia(QtMultimedia.QMediaContent(QtCore.QUrl.fromLocalFile(path))) self.play()
def signals(self, origin, status): def signals(self, origin, status):
if origin == 'media' and status == self.BufferedMedia: if origin == 'media' and status == self.BufferedMedia:
self.result = True self.result = True
self.stop() self.stop()
self.finished.emit() self.quit.emit()
elif origin == 'error' and status != self.NoError: elif origin == 'error' and status != self.NoError:
self.result = False self.result = False
self.stop() self.stop()
self.finished.emit() self.quit.emit()

View File

@ -35,7 +35,7 @@ from PyQt5 import QtCore
from openlp.core.common.applocation import AppLocation from openlp.core.common.applocation import AppLocation
from openlp.core.common.settings import Settings from openlp.core.common.settings import Settings
from openlp.core.threading import run_thread from openlp.core.threading import ThreadWorker, run_thread
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -44,14 +44,13 @@ CONNECTION_TIMEOUT = 30
CONNECTION_RETRIES = 2 CONNECTION_RETRIES = 2
class VersionWorker(QtCore.QObject): class VersionWorker(ThreadWorker):
""" """
A worker class to fetch the version of OpenLP from the website. This is run from within a thread so that it A worker class to fetch the version of OpenLP from the website. This is run from within a thread so that it
doesn't affect the loading time of OpenLP. doesn't affect the loading time of OpenLP.
""" """
new_version = QtCore.pyqtSignal(dict) new_version = QtCore.pyqtSignal(dict)
no_internet = QtCore.pyqtSignal() no_internet = QtCore.pyqtSignal()
quit = QtCore.pyqtSignal()
def __init__(self, last_check_date, current_version): def __init__(self, last_check_date, current_version):
""" """
@ -112,18 +111,18 @@ def update_check_date():
Settings().setValue('core/last version test', date.today().strftime('%Y-%m-%d')) Settings().setValue('core/last version test', date.today().strftime('%Y-%m-%d'))
def check_for_update(parent): def check_for_update(main_window):
""" """
Run a thread to download and check the version of OpenLP Run a thread to download and check the version of OpenLP
:param MainWindow parent: The parent object for the thread. Usually the OpenLP main window. :param MainWindow main_window: The OpenLP main window.
""" """
last_check_date = Settings().value('core/last version test') last_check_date = Settings().value('core/last version test')
if date.today().strftime('%Y-%m-%d') <= last_check_date: if date.today().strftime('%Y-%m-%d') <= last_check_date:
log.debug('Version check skipped, last checked today') log.debug('Version check skipped, last checked today')
return return
worker = VersionWorker(last_check_date, get_version()) worker = VersionWorker(last_check_date, get_version())
worker.new_version.connect(parent.on_new_version) worker.new_version.connect(main_window.on_new_version)
worker.quit.connect(update_check_date) worker.quit.connect(update_check_date)
# TODO: Use this to figure out if there's an Internet connection? # TODO: Use this to figure out if there's an Internet connection?
# worker.no_internet.connect(parent.on_no_internet) # worker.no_internet.connect(parent.on_no_internet)

View File

@ -30,21 +30,20 @@ from PyQt5 import QtCore, QtWidgets
from openlp.core.common.i18n import translate from openlp.core.common.i18n import translate
from openlp.core.common.mixins import RegistryProperties from openlp.core.common.mixins import RegistryProperties
from openlp.core.common.settings import Settings from openlp.core.common.settings import Settings
from openlp.core.threading import run_thread from openlp.core.threading import ThreadWorker, run_thread
from openlp.plugins.songs.forms.songselectdialog import Ui_SongSelectDialog from openlp.plugins.songs.forms.songselectdialog import Ui_SongSelectDialog
from openlp.plugins.songs.lib.songselect import SongSelectImport from openlp.plugins.songs.lib.songselect import SongSelectImport
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class SearchWorker(QtCore.QObject): class SearchWorker(ThreadWorker):
""" """
Run the actual SongSelect search, and notify the GUI when we find each song. Run the actual SongSelect search, and notify the GUI when we find each song.
""" """
show_info = QtCore.pyqtSignal(str, str) show_info = QtCore.pyqtSignal(str, str)
found_song = QtCore.pyqtSignal(dict) found_song = QtCore.pyqtSignal(dict)
finished = QtCore.pyqtSignal() finished = QtCore.pyqtSignal()
quit = QtCore.pyqtSignal()
def __init__(self, importer, search_text): def __init__(self, importer, search_text):
super().__init__() super().__init__()

View File

@ -462,8 +462,8 @@ class TestSystemPlayer(TestCase):
self.assertEqual(expected_info, result) self.assertEqual(expected_info, result)
@patch('openlp.core.ui.media.systemplayer.CheckMediaWorker') @patch('openlp.core.ui.media.systemplayer.CheckMediaWorker')
@patch('openlp.core.ui.media.systemplayer.QtCore.QThread') @patch('openlp.core.ui.media.systemplayer.run_thread')
def test_check_media(self, MockQThread, MockCheckMediaWorker): def test_check_media(self, mocked_run_thread, MockCheckMediaWorker):
""" """
Test the check_media() method of the SystemPlayer Test the check_media() method of the SystemPlayer
""" """
@ -473,29 +473,12 @@ class TestSystemPlayer(TestCase):
Registry().create() Registry().create()
Registry().register('application', mocked_application) Registry().register('application', mocked_application)
player = SystemPlayer(self) player = SystemPlayer(self)
mocked_thread = MagicMock()
mocked_thread.isRunning.side_effect = [True, False]
mocked_thread.quit = 'quit' # actually supposed to be a slot, but it's all mocked out anyway
MockQThread.return_value = mocked_thread
mocked_check_media_worker = MagicMock()
mocked_check_media_worker.play = 'play'
mocked_check_media_worker.result = True
MockCheckMediaWorker.return_value = mocked_check_media_worker
# WHEN: check_media() is called with a valid media file # WHEN: check_media() is called with a valid media file
result = player.check_media(valid_file) player.check_media(valid_file)
# THEN: It should return True # THEN: It should return True
MockQThread.assert_called_once_with() assert False, 'Fix this test'
MockCheckMediaWorker.assert_called_once_with(valid_file)
mocked_check_media_worker.setVolume.assert_called_once_with(0)
mocked_check_media_worker.moveToThread.assert_called_once_with(mocked_thread)
mocked_check_media_worker.finished.connect.assert_called_once_with('quit')
mocked_thread.started.connect.assert_called_once_with('play')
mocked_thread.start.assert_called_once_with()
self.assertEqual(2, mocked_thread.isRunning.call_count)
mocked_application.processEvents.assert_called_once_with()
self.assertTrue(result)
class TestCheckMediaWorker(TestCase): class TestCheckMediaWorker(TestCase):
@ -524,12 +507,12 @@ class TestCheckMediaWorker(TestCase):
# WHEN: signals() is called with media and BufferedMedia # WHEN: signals() is called with media and BufferedMedia
with patch.object(worker, 'stop') as mocked_stop, \ with patch.object(worker, 'stop') as mocked_stop, \
patch.object(worker, 'finished') as mocked_finished: patch.object(worker, 'quit') as mocked_quit:
worker.signals('media', worker.BufferedMedia) worker.signals('media', worker.BufferedMedia)
# THEN: The worker should exit and the result should be True # THEN: The worker should exit and the result should be True
mocked_stop.assert_called_once_with() mocked_stop.assert_called_once_with()
mocked_finished.emit.assert_called_once_with() mocked_quit.emit.assert_called_once_with()
self.assertTrue(worker.result) self.assertTrue(worker.result)
def test_signals_error(self): def test_signals_error(self):
@ -541,10 +524,10 @@ class TestCheckMediaWorker(TestCase):
# WHEN: signals() is called with error and BufferedMedia # WHEN: signals() is called with error and BufferedMedia
with patch.object(worker, 'stop') as mocked_stop, \ with patch.object(worker, 'stop') as mocked_stop, \
patch.object(worker, 'finished') as mocked_finished: patch.object(worker, 'quit') as mocked_quit:
worker.signals('error', None) worker.signals('error', None)
# THEN: The worker should exit and the result should be True # THEN: The worker should exit and the result should be True
mocked_stop.assert_called_once_with() mocked_stop.assert_called_once_with()
mocked_finished.emit.assert_called_once_with() mocked_quit.emit.assert_called_once_with()
self.assertFalse(worker.result) self.assertFalse(worker.result)