From b4048e0d5277d5c0a4fd44a1e3566ee2b6dbbe89 Mon Sep 17 00:00:00 2001 From: Mateus Meyer Jiacomelli Date: Tue, 1 Mar 2022 06:06:38 +0000 Subject: [PATCH] Make Websocket Server and Poller to notify state change based on events --- openlp/core/api/websockets.py | 178 +++++++++++++++-------- openlp/core/api/websocketspoll.py | 40 +++-- openlp/core/ui/mainwindow.py | 3 + openlp/core/ui/servicemanager.py | 5 + openlp/core/ui/slidecontroller.py | 3 + tests/openlp_core/api/test_websockets.py | 143 +++++++++++++----- 6 files changed, 260 insertions(+), 112 deletions(-) diff --git a/openlp/core/api/websockets.py b/openlp/core/api/websockets.py index 94fcb848b..34d545179 100644 --- a/openlp/core/api/websockets.py +++ b/openlp/core/api/websockets.py @@ -25,6 +25,9 @@ changes from within OpenLP. It uses JSON to communicate with the remotes. import asyncio import json import logging +import uuid + +from PyQt5 import QtCore import time from websockets import serve @@ -44,63 +47,6 @@ ws_logger = logging.getLogger('websockets') ws_logger.setLevel(logging.ERROR) -async def handle_websocket(websocket, path): - """ - Handle web socket requests and return the state information - Check every 0.2 seconds to get the latest position and send if it changed. - - :param websocket: request from client - :param path: determines the endpoints supported - Not needed - """ - log.debug('WebSocket handle_websocket connection') - await register(websocket) - reply = poller.get_state() - if reply: - json_reply = json.dumps(reply).encode() - await websocket.send(json_reply) - while True: - try: - await notify_users() - await asyncio.wait_for(websocket.recv(), 0.2) - except asyncio.TimeoutError: - pass - except Exception: - await unregister(websocket) - break - - -async def register(websocket): - """ - Register Clients - :param websocket: The client details - :return: - """ - log.debug('WebSocket handler register') - USERS.add(websocket) - - -async def unregister(websocket): - """ - Unregister Clients - :param websocket: The client details - :return: - """ - log.debug('WebSocket handler unregister') - USERS.remove(websocket) - - -async def notify_users(): - """ - Dispatch state to all registered users if we have any changes - :return: - """ - if USERS: # asyncio.wait doesn't accept an empty list - reply = poller.get_state_if_changed() - if reply: - json_reply = json.dumps(reply).encode() - await asyncio.wait([user.send(json_reply) for user in USERS]) - - class WebSocketWorker(ThreadWorker, RegistryProperties, LogMixin): """ A special Qt thread class to allow the WebSockets server to run at the same time as the UI. @@ -118,9 +64,10 @@ class WebSocketWorker(ThreadWorker, RegistryProperties, LogMixin): # Create the websocker server loop = 1 self.server = None + self.queues = set() while not self.server: try: - self.server = serve(handle_websocket, address, port) + self.server = serve(self.handle_websocket, address, port) log.debug('WebSocket server started on {addr}:{port}'.format(addr=address, port=port)) except Exception: log.exception('Failed to start WebSocket server') @@ -143,18 +90,121 @@ class WebSocketWorker(ThreadWorker, RegistryProperties, LogMixin): """ Stop the websocket server """ - self.event_loop.call_soon_threadsafe(self.event_loop.stop) + try: + self.event_loop.call_soon_threadsafe(self.event_loop.stop) + except BaseException: + pass + + async def handle_websocket(self, websocket, path): + """ + Handle web socket requests and return the state information + Waits for information to come over queue + + :param websocket: request from client + :param path: determines the endpoints supported - Not needed + """ + client_id = uuid.uuid4() if log.getEffectiveLevel() == logging.DEBUG else 0 + log.debug(f'(client_id={client_id}) WebSocket handle_websocket connection') + queue = asyncio.Queue() + await self.register(websocket, client_id, queue) + try: + reply = poller.get_state() + if reply: + await self.send_reply(websocket, client_id, reply) + while True: + done, pending = await asyncio.wait( + [asyncio.create_task(queue.get()), asyncio.create_task(websocket.wait_closed())], + return_when=asyncio.FIRST_COMPLETED + ) + for task in pending: + task.cancel() + # If there is a new item in client's queue, await_result will contain an item, if the connection is + # closed, it will be None. + await_result = done.pop().result() + if await_result is not None: + await self.send_reply(websocket, client_id, await_result) + else: + break + finally: + await self.unregister(websocket, client_id, queue) + + async def register(self, websocket, client_id, queue): + """ + Register Clients + :param websocket: The client details + :param queue: The Command Queue + :return: + """ + log.debug(f'(client_id={client_id}) WebSocket handler register') + USERS.add(websocket) + self.queues.add(queue) + log.debug('WebSocket clients count: {client_count}'.format(client_count=len(USERS))) + + async def unregister(self, websocket, client_id, queue): + """ + Unregister Clients + :param websocket: The client details + :return: + """ + USERS.remove(websocket) + self.queues.remove(queue) + log.debug(f'(client_id={client_id}) WebSocket handler unregister') + log.debug('WebSocket clients count: {client_count}'.format(client_count=len(USERS))) + + async def send_reply(self, websocket, client_id, reply): + json_reply = json.dumps(reply).encode() + await websocket.send(json_reply) + log.debug(f'(client_id={client_id}) WebSocket send reply: {json_reply}') + + def add_state_to_queues(self, state): + """ + Inserts the state in each connection message queue + :param state: OpenLP State + """ + for queue in self.queues.copy(): + self.event_loop.call_soon_threadsafe(queue.put_nowait, state) -class WebSocketServer(RegistryProperties, LogMixin): +class WebSocketServer(RegistryProperties, QtCore.QObject, LogMixin): """ Wrapper round a server instance """ def __init__(self): """ - Initialise and start the WebSockets server + Initialise the WebSockets server """ super(WebSocketServer, self).__init__() - if not Registry().get_flag('no_web_server'): - worker = WebSocketWorker() - run_thread(worker, 'websocket_server') + self.worker = None + + def start(self): + """ + Starts the WebSockets server + """ + if self.worker is None and not Registry().get_flag('no_web_server'): + self.worker = WebSocketWorker() + run_thread(self.worker, 'websocket_server') + poller.poller_changed.connect(self.handle_poller_signal) + # Only hooking poller signals after all UI is available + Registry().register_function('bootstrap_completion', self.try_poller_hook_signals) + + @QtCore.pyqtSlot() + def handle_poller_signal(self): + if self.worker is not None: + self.worker.add_state_to_queues(poller.get_state()) + + def try_poller_hook_signals(self): + try: + poller.hook_signals() + except Exception: + log.error('Failed to hook poller signals!') + + def close(self): + """ + Closes the WebSocket server and detach associated signals + """ + try: + poller.poller_changed.disconnect(self.handle_poller_signal) + poller.unhook_signals() + self.worker.stop() + finally: + self.worker = None diff --git a/openlp/core/api/websocketspoll.py b/openlp/core/api/websocketspoll.py index cf3fb8bb9..3570760de 100644 --- a/openlp/core/api/websocketspoll.py +++ b/openlp/core/api/websocketspoll.py @@ -18,21 +18,30 @@ # You should have received a copy of the GNU General Public License # # along with this program. If not, see . # ########################################################################## +from PyQt5 import QtCore from openlp.core.common.mixins import RegistryProperties -class WebSocketPoller(RegistryProperties): +class WebSocketPoller(QtCore.QObject, RegistryProperties): """ Accessed by web sockets to get status type information from the application """ + + poller_changed = QtCore.pyqtSignal() + def __init__(self): """ Constructor for the web sockets poll builder class. """ super(WebSocketPoller, self).__init__() - self._previous = {} + self._state = None def get_state(self): + if self._state is None: + self._state = self.create_state() + return self._state + + def create_state(self): return {'results': { 'counter': self.live_controller.slide_count if self.live_controller.slide_count else 0, 'service': self.service_manager.service_id, @@ -47,17 +56,20 @@ class WebSocketPoller(RegistryProperties): 'chordNotation': self.settings.value('songs/chord notation') }} - def get_state_if_changed(self): - """ - Poll OpenLP to determine current state if it has changed. + def hook_signals(self): + self.live_controller.slidecontroller_changed.connect(self.on_signal_received) + self.service_manager.servicemanager_changed.connect(self.on_signal_received) - This must only be used by web sockets or else we could miss a state change. + def unhook_signals(self): + try: + self.live_controller.slidecontroller_changed.disconnect(self.on_signal_received) + self.service_manager.servicemanager_changed.disconnect(self.on_signal_received) + except Exception: + pass - :return: The current application state or None if unchanged since last call - """ - current = self.get_state() - if self._previous != current: - self._previous = current - return current - else: - return None + @QtCore.pyqtSlot(list) + @QtCore.pyqtSlot(str) + @QtCore.pyqtSlot() + def on_signal_received(self): + self._state = self.create_state() + self.poller_changed.emit() diff --git a/openlp/core/ui/mainwindow.py b/openlp/core/ui/mainwindow.py index 7c53e2f4c..59a7a6fcf 100644 --- a/openlp/core/ui/mainwindow.py +++ b/openlp/core/ui/mainwindow.py @@ -481,6 +481,7 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow, LogMixin, RegistryPropert self.settings.set_up_default_values() self.about_form = AboutForm(self) self.ws_server = WebSocketServer() + self.ws_server.start() self.http_server = HttpServer(self) start_zeroconf() SettingsForm(self) @@ -1095,6 +1096,8 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow, LogMixin, RegistryPropert # Check if we need to change the data directory if self.new_data_path: self.change_data_directory() + # Close down WebSocketServer + self.ws_server.close() # Close down the display self.live_controller.close_displays() # Clean temporary files used by services diff --git a/openlp/core/ui/servicemanager.py b/openlp/core/ui/servicemanager.py index 4d2eb7f81..3e71b0f1a 100644 --- a/openlp/core/ui/servicemanager.py +++ b/openlp/core/ui/servicemanager.py @@ -312,6 +312,7 @@ class ServiceManager(QtWidgets.QWidget, RegistryBase, Ui_ServiceManager, LogMixi servicemanager_next_item = QtCore.pyqtSignal() servicemanager_previous_item = QtCore.pyqtSignal() servicemanager_new_file = QtCore.pyqtSignal() + servicemanager_changed = QtCore.pyqtSignal() theme_update_service = QtCore.pyqtSignal() def __init__(self, parent=None): @@ -380,6 +381,7 @@ class ServiceManager(QtWidgets.QWidget, RegistryBase, Ui_ServiceManager, LogMixi else: service_file = translate('OpenLP.ServiceManager', 'Untitled Service') self.main_window.set_service_modified(modified, service_file) + self.servicemanager_changed.emit() def is_modified(self): """ @@ -530,6 +532,7 @@ class ServiceManager(QtWidgets.QWidget, RegistryBase, Ui_ServiceManager, LogMixi self.settings.setValue('servicemanager/last file', None) self.plugin_manager.new_service_created() self.live_controller.slide_count = 0 + self.servicemanager_changed.emit() def create_basic_service(self): """ @@ -1280,6 +1283,7 @@ class ServiceManager(QtWidgets.QWidget, RegistryBase, Ui_ServiceManager, LogMixi self.service_items.remove(self.service_items[item]) self.repaint_service_list(item - 1, -1) self.set_modified() + self.servicemanager_changed.emit() def repaint_service_list(self, service_item, service_item_child): """ @@ -1577,6 +1581,7 @@ class ServiceManager(QtWidgets.QWidget, RegistryBase, Ui_ServiceManager, LogMixi child = row self.application.set_busy_cursor() if self.service_items[item]['service_item'].is_valid: + self.servicemanager_changed.emit() self.live_controller.add_service_manager_item(self.service_items[item]['service_item'], child) if self.settings.value('core/auto preview'): item += 1 diff --git a/openlp/core/ui/slidecontroller.py b/openlp/core/ui/slidecontroller.py index 3d300827e..41008ba20 100644 --- a/openlp/core/ui/slidecontroller.py +++ b/openlp/core/ui/slidecontroller.py @@ -141,6 +141,8 @@ class SlideController(QtWidgets.QWidget, LogMixin, RegistryProperties): user uses to control the displaying of verses/slides/etc on the screen. """ + slidecontroller_changed = QtCore.pyqtSignal() + def __init__(self, *args, **kwargs): """ Set up the Slide Controller. @@ -1278,6 +1280,7 @@ class SlideController(QtWidgets.QWidget, LogMixin, RegistryProperties): The output has changed so we need to poke to POLL Websocket. """ self.slide_count += 1 + self.slidecontroller_changed.emit() def display_maindisplay(self): """ diff --git a/tests/openlp_core/api/test_websockets.py b/tests/openlp_core/api/test_websockets.py index 02f0ad624..bb1665909 100644 --- a/tests/openlp_core/api/test_websockets.py +++ b/tests/openlp_core/api/test_websockets.py @@ -45,12 +45,14 @@ def worker(settings): @patch('openlp.core.api.websockets.run_thread') def test_serverstart(mocked_run_thread, MockWebSocketWorker, registry): """ - Test the starting of the WebSockets Server with the disabled flag set on + Test the starting of the WebSockets Server with the disabled flag set off """ - # GIVEN: A new httpserver - # WHEN: I start the server + # GIVEN: A new WebSocketServer Registry().set_flag('no_web_server', False) - WebSocketServer() + server = WebSocketServer() + + # WHEN: I start the server + server.start() # THEN: the api environment should have been created assert mocked_run_thread.call_count == 1, 'The qthread should have been called once' @@ -61,14 +63,16 @@ def test_serverstart(mocked_run_thread, MockWebSocketWorker, registry): @patch('openlp.core.api.websockets.run_thread') def test_serverstart_not_required(mocked_run_thread, MockWebSocketWorker, registry): """ - Test the starting of the WebSockets Server with the disabled flag set off + Test the starting of the WebSockets Server with the disabled flag set on """ - # GIVEN: A new httpserver and the server is not required - # WHEN: I start the server + # GIVEN: A new WebSocketServer and the server is not required Registry().set_flag('no_web_server', True) - WebSocketServer() + server = WebSocketServer() - # THEN: the api environment should have been created + # WHEN: I start the server + server.start() + + # THEN: the api environment should have not been created assert mocked_run_thread.call_count == 0, 'The qthread should not have been called' assert MockWebSocketWorker.call_count == 0, 'The http thread should not have been called' @@ -103,31 +107,6 @@ def test_poller_get_state(poller, settings): assert poll_json['results']['item'] == '23-34-45', 'The item return value should match 23-34-45' -def test_poller_get_state_if_changed(poller, settings): - """ - Test the get_state_if_changed function returns None if the state has not changed - """ - # GIVEN: the system is configured with a set of data - poller._previous = {} - mocked_service_manager = MagicMock() - mocked_service_manager.service_id = 21 - mocked_live_controller = MagicMock() - mocked_live_controller.selected_row = 5 - mocked_live_controller.service_item = MagicMock() - mocked_live_controller.service_item.unique_identifier = '23-34-45' - mocked_live_controller.blank_screen.isChecked.return_value = True - mocked_live_controller.theme_screen.isChecked.return_value = False - mocked_live_controller.desktop_screen.isChecked.return_value = False - Registry().register('live_controller', mocked_live_controller) - Registry().register('service_manager', mocked_service_manager) - # WHEN: The poller polls twice - poll_json = poller.get_state_if_changed() - poll_json2 = poller.get_state_if_changed() - # THEN: The get_state_if_changed function should return None on the second call because the state has not changed - assert poll_json is not None, 'The first get_state_if_changed function call should have not returned None' - assert poll_json2 is None, 'The second get_state_if_changed function should return None' - - @patch('openlp.core.api.websockets.serve') @patch('openlp.core.api.websockets.asyncio') @patch('openlp.core.api.websockets.log') @@ -170,3 +149,99 @@ def test_worker_start_fail(mocked_log, mocked_asyncio, mocked_serve, worker, set event_loop.run_forever.assert_not_called() mocked_log.exception.assert_called_once() event_loop.close.assert_called_once_with() + + +def test_poller_event_attach(poller, settings): + """ + Test the event attach of WebSocketPoller + """ + # GIVEN: A mocked live_controlled, a mocked slide_controller and mocked change signals + servicemanager_changed_connect = MagicMock() + service_manager = MagicMock() + service_manager.servicemanager_changed = MagicMock() + service_manager.servicemanager_changed.connect = servicemanager_changed_connect + poller._service_manager = service_manager + live_controller = MagicMock() + slidecontroller_changed_connect = MagicMock() + live_controller.slidecontroller_changed = MagicMock() + live_controller.slidecontroller_changed.connect = slidecontroller_changed_connect + poller._live_controller = live_controller + + # WHEN: the hook_signals function is called + poller.hook_signals() + + # THEN: service_manager and live_controller changed signals should be connected + servicemanager_changed_connect.assert_called_once() + slidecontroller_changed_connect.assert_called_once() + + +def test_poller_on_change_emit(poller, settings): + """ + Test the change event emission of WebSocketPoller + """ + # GIVEN: A mocked changed signal and create_state function + poller_changed_emit = MagicMock() + poller.create_state = MagicMock(return_value={}) + poller.poller_changed = MagicMock() + poller.poller_changed.emit = poller_changed_emit + + # WHEN: The on_signal_received function is called + poller.on_signal_received() + + # THEN: poller_changed signal should be emitted once + poller_changed_emit.assert_called_once() + + +def test_poller_get_state_is_never_none(poller): + """ + Test the get_state call never returns None + """ + # GIVEN: A mocked poller create_state + poller.create_state = MagicMock(return_value={"key1": "2"}) + + # WHEN: poller.get_state is called + state = poller.get_state() + + # THEN: state is not None + assert state is not None, 'get_state() return should not be None' + + +@patch('openlp.core.api.websockets.poller') +@patch('openlp.core.api.websockets.run_thread') +def test_websocket_server_connects_to_poller(mock_run_thread, mock_poller, settings): + """ + Test if the websocket_server connects to WebSocketPoller + """ + # GIVEN: A mocked poller signal and a server + Registry().set_flag('no_web_server', False) + mock_poller.poller_changed = MagicMock() + mock_poller.poller_changed.connect = MagicMock() + server = WebSocketServer() + server.handle_poller_signal = MagicMock() + + # WHEN: WebSocketServer is started + server.start() + + # THEN: poller_changed should be connected with WebSocketServer and correct handler + mock_poller.poller_changed.connect.assert_called_once_with(server.handle_poller_signal) + + +@patch('openlp.core.api.websockets.poller') +@patch('openlp.core.api.websockets.WebSocketWorker.add_state_to_queues') +@patch('openlp.core.api.websockets.run_thread') +def test_websocket_worker_register_connections(mock_run_thread, mock_add_state_to_queues, mock_poller, settings): + """ + Test if the websocket_server can receive poller signals + """ + # GIVEN: A mocked poller create_state, a mocked server and a mocked worker + mock_state = {"key1": "2"} + mock_poller.get_state.return_value = mock_state + Registry().set_flag('no_web_server', False) + server = WebSocketServer() + server.start() + + # WHEN: server.handle_poller_signal is called + server.handle_poller_signal() + + # THEN: WebSocketWorker state notify function should be called + mock_add_state_to_queues.assert_called_once_with(mock_state)