Fix issue #1618 by ignoring the messages if the event loop is not running

This commit is contained in:
Raoul Snyman 2023-08-12 15:44:19 -07:00
parent 3593ae22b3
commit 62aadcf7b9
2 changed files with 153 additions and 32 deletions

View File

@ -23,16 +23,14 @@ The :mod:`websockets` module contains the websockets server. This is a server us
changes from within OpenLP. It uses JSON to communicate with the remotes.
"""
import asyncio
import dataclasses
from dataclasses import dataclass
import json
import logging
from typing import Optional, Union
import uuid
from dataclasses import asdict, dataclass
from typing import Optional, Union
from PyQt5 import QtCore
import time
from PyQt5 import QtCore
from websockets import serve
from openlp.core.common.mixins import LogMixin, RegistryProperties
@ -86,10 +84,11 @@ class WebSocketWorker(ThreadWorker, RegistryProperties, LogMixin):
log.debug('WebSocket server started on {addr}:{port}'.format(addr=address, port=port))
except Exception:
log.exception('Failed to start WebSocket server')
loop += 1
time.sleep(0.1)
if not self.server and loop > 3:
log.error('Unable to start WebSocket server {addr}:{port}, giving up'.format(addr=address, port=port))
break
loop += 1
if self.server:
# If the websocket server exists, start listening
try:
@ -184,6 +183,10 @@ class WebSocketWorker(ThreadWorker, RegistryProperties, LogMixin):
Inserts the state in each connection message queue
:param state: OpenLP State
"""
if not self.event_loop.is_running():
# Sometimes the event loop doesn't run when we call this method -- probably because it is shutting down
# See https://gitlab.com/openlp/openlp/-/issues/1618
return
for queue in self.state_queues.copy():
self.event_loop.call_soon_threadsafe(queue.put_nowait, state)
@ -192,8 +195,12 @@ class WebSocketWorker(ThreadWorker, RegistryProperties, LogMixin):
Inserts the message in each connection message queue
:param state: OpenLP State
"""
if not self.event_loop.is_running():
# Sometimes the event loop doesn't run when we call this method -- probably because it is shutting down
# See https://gitlab.com/openlp/openlp/-/issues/1618
return
for queue in self.message_queues.copy():
self.event_loop.call_soon_threadsafe(queue.put_nowait, dataclasses.asdict(message))
self.event_loop.call_soon_threadsafe(queue.put_nowait, asdict(message))
class WebSocketServer(RegistryBase, RegistryProperties, QtCore.QObject, LogMixin):
@ -261,8 +268,7 @@ def websocket_send_message(message: WebSocketMessage):
"""
Sends a message over websocket to all connected clients.
"""
ws: WebSocketServer = Registry().get("web_socket_server")
if ws:
if ws := Registry().get("web_socket_server"):
ws.send_message(message)
return True
return False

View File

@ -22,26 +22,27 @@
Functional tests to test the Http Server Class.
"""
import pytest
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, call, patch
from openlp.core.api.websocketspoll import WebSocketPoller
from openlp.core.api.websockets import WebSocketMessage, WebSocketWorker, WebSocketServer, websocket_send_message
from openlp.core.common.registry import Registry
from openlp.core.common.settings import Settings
@pytest.fixture
def poller(settings):
def poller(settings: Settings):
poll = WebSocketPoller()
yield poll
@pytest.fixture
def worker(settings):
def worker(settings: Settings):
worker = WebSocketWorker()
yield worker
def test_poller_get_state(poller, settings):
def test_poller_get_state(poller: WebSocketPoller, settings: Settings):
"""
Test the get_state function returns the correct JSON
"""
@ -71,7 +72,7 @@ def test_poller_get_state(poller, settings):
assert poll_json['results']['item'] == '23-34-45', 'The item return value should match 23-34-45'
def test_poller_event_attach(poller, settings):
def test_poller_event_attach(poller: WebSocketPoller, settings: Settings):
"""
Test the event attach of WebSocketPoller
"""
@ -95,7 +96,7 @@ def test_poller_event_attach(poller, settings):
slidecontroller_changed_connect.assert_called_once()
def test_poller_on_change_emit(poller, settings):
def test_poller_on_change_emit(poller: WebSocketPoller, settings: Settings):
"""
Test the change event emission of WebSocketPoller
"""
@ -112,7 +113,7 @@ def test_poller_on_change_emit(poller, settings):
poller_changed_emit.assert_called_once()
def test_poller_get_state_is_never_none(poller):
def test_poller_get_state_is_never_none(poller: WebSocketPoller):
"""
Test the get_state call never returns None
"""
@ -126,7 +127,7 @@ def test_poller_get_state_is_never_none(poller):
assert state is not None, 'get_state() return should not be None'
def test_send_message_works(settings):
def test_send_message_works(settings: Settings):
"""
Test the send_message_works really works
"""
@ -142,26 +143,38 @@ def test_send_message_works(settings):
server.worker.add_message_to_queues.assert_called_once_with(message)
def test_websocket_send_message_works(settings):
"""
Test the send_message_works really works
"""
def test_websocket_send_message_works(registry: Registry, settings: Settings):
"""Test the send_message_works really works"""
# GIVEN: A mocked WebSocketWorker and a message
server = WebSocketServer()
server.worker = MagicMock()
message = WebSocketMessage(plugin="core", key="test", value="test")
# WHEN: send_message is called
websocket_send_message(message)
result = websocket_send_message(message)
# THEN: Worker add_message_to_queues should be called
assert result is True
server.worker.add_message_to_queues.assert_called_once_with(message)
def test_websocket_send_message_fail(registry: Registry, settings: Settings):
"""Test the send_message_works returns False when there is no WebSocker server"""
# GIVEN: A message
message = WebSocketMessage(plugin="core", key="test", value="test")
# WHEN: send_message is called
result = websocket_send_message(message)
# THEN: The return value should be false
assert result is False
@patch('openlp.core.api.websockets.serve')
@patch('openlp.core.api.websockets.asyncio')
@patch('openlp.core.api.websockets.log')
def test_websocket_worker_start(mocked_log, mocked_asyncio, mocked_serve, worker, settings):
def test_websocket_worker_start(mocked_log: MagicMock, mocked_asyncio: MagicMock, mocked_serve: MagicMock,
worker: WebSocketWorker, settings: Settings):
"""
Test the start function of the worker
"""
@ -183,7 +196,8 @@ def test_websocket_worker_start(mocked_log, mocked_asyncio, mocked_serve, worker
@patch('openlp.core.api.websockets.serve')
@patch('openlp.core.api.websockets.asyncio')
@patch('openlp.core.api.websockets.log')
def test_websocket_worker_start_fail(mocked_log, mocked_asyncio, mocked_serve, worker, settings):
def test_websocket_worker_start_fail(mocked_log: MagicMock, mocked_asyncio: MagicMock, mocked_serve: MagicMock,
worker: WebSocketWorker, settings: Settings):
"""
Test the start function of the worker handles a error nicely
"""
@ -192,8 +206,10 @@ def test_websocket_worker_start_fail(mocked_log, mocked_asyncio, mocked_serve, w
event_loop = MagicMock()
mocked_asyncio.new_event_loop.return_value = event_loop
event_loop.run_until_complete.side_effect = Exception()
# WHEN: The start function is called
worker.start()
# THEN: An exception is logged but is handled and the event_loop is closed
mocked_serve.assert_called_once()
event_loop.run_until_complete.assert_called_once_with('server_thing')
@ -202,7 +218,30 @@ def test_websocket_worker_start_fail(mocked_log, mocked_asyncio, mocked_serve, w
event_loop.close.assert_called_once_with()
def test_websocket_server_bootstrap_post_set_up(settings):
@patch('openlp.core.api.websockets.serve')
@patch('openlp.core.api.websockets.asyncio')
@patch('openlp.core.api.websockets.log')
def test_websocket_worker_start_exception(mocked_log: MagicMock, mocked_asyncio: MagicMock, mocked_serve: MagicMock,
worker: WebSocketWorker, settings: Settings):
"""
Test the start function of the worker handles a error nicely
"""
# GIVEN: A mocked serve function and event loop. run_until_complete returns a error
mocked_serve.return_value = None
mocked_serve.side_effect = Exception('Test')
# WHEN: The start function is called
worker.start()
# THEN: An exception is logged but is handled and the event_loop is closed
assert worker.server is None
assert not worker.state_queues
assert not worker.message_queues
mocked_log.exception.assert_called_with('Failed to start WebSocket server')
mocked_log.error.assert_called_once_with('Unable to start WebSocket server 0.0.0.0:4317, giving up')
def test_websocket_server_bootstrap_post_set_up(settings: Settings):
"""
Test that the bootstrap_post_set_up() method calls the start method
"""
@ -220,7 +259,7 @@ def test_websocket_server_bootstrap_post_set_up(settings):
@patch('openlp.core.api.websockets.WebSocketWorker')
@patch('openlp.core.api.websockets.run_thread')
def test_websocket_server_start(mocked_run_thread, MockWebSocketWorker, registry):
def test_websocket_server_start(mocked_run_thread: MagicMock, MockWebSocketWorker: MagicMock, registry: Registry):
"""
Test the starting of the WebSockets Server with the disabled flag set off
"""
@ -238,7 +277,7 @@ def test_websocket_server_start(mocked_run_thread, MockWebSocketWorker, registry
@patch('openlp.core.api.websockets.WebSocketWorker')
@patch('openlp.core.api.websockets.run_thread')
def test_websocket_server_start_not_required(mocked_run_thread, MockWebSocketWorker, registry):
def test_websocket_server_start_not_required(mocked_run_thread, MockWebSocketWorker, registry: Registry):
"""
Test the starting of the WebSockets Server with the disabled flag set on
"""
@ -256,7 +295,7 @@ def test_websocket_server_start_not_required(mocked_run_thread, MockWebSocketWor
@patch('openlp.core.api.websockets.poller')
@patch('openlp.core.api.websockets.run_thread')
def test_websocket_server_connects_to_poller(mock_run_thread, mock_poller, settings):
def test_websocket_server_connects_to_poller(mock_run_thread: MagicMock, mock_poller: MagicMock, settings: Settings):
"""
Test if the websocket_server connects to WebSocketPoller
"""
@ -277,7 +316,8 @@ def test_websocket_server_connects_to_poller(mock_run_thread, mock_poller, setti
@patch('openlp.core.api.websockets.poller')
@patch('openlp.core.api.websockets.WebSocketWorker.add_state_to_queues')
@patch('openlp.core.api.websockets.run_thread')
def test_websocket_worker_register_connections(mock_run_thread, mock_add_state_to_queues, mock_poller, settings):
def test_websocket_worker_register_connections(mock_run_thread: MagicMock, mock_add_state_to_queues: MagicMock,
mock_poller: MagicMock, settings: Settings):
"""
Test if the websocket_server can receive poller signals
"""
@ -297,7 +337,7 @@ def test_websocket_worker_register_connections(mock_run_thread, mock_add_state_t
@patch('openlp.core.api.websockets.poller')
@patch('openlp.core.api.websockets.log')
def test_websocket_server_try_poller_hook_signals(mocked_log, mock_poller, settings):
def test_websocket_server_try_poller_hook_signals(mocked_log: MagicMock, mock_poller: MagicMock, settings: Settings):
"""
Test if the websocket_server invokes poller.hook_signals
"""
@ -315,7 +355,7 @@ def test_websocket_server_try_poller_hook_signals(mocked_log, mock_poller, setti
@patch('openlp.core.api.websockets.poller')
def test_websocket_server_close(mock_poller, settings):
def test_websocket_server_close(mock_poller: MagicMock, settings: Settings):
"""
Test that the websocket_server close method works correctly
"""
@ -338,7 +378,7 @@ def test_websocket_server_close(mock_poller, settings):
@patch('openlp.core.api.websockets.poller')
def test_websocket_server_close_when_disabled(mock_poller, registry, settings):
def test_websocket_server_close_when_disabled(mock_poller: MagicMock, registry: Registry, settings: Settings):
"""
Test if the websocket_server close method correctly skips teardown when disabled
"""
@ -355,3 +395,78 @@ def test_websocket_server_close_when_disabled(mock_poller, registry, settings):
# THEN: poller_changed should be connected with WebSocketServer and correct handler
assert mock_poller.poller_changed.disconnect.call_count == 0
assert mock_poller.unhook_signals.call_count == 0
def test_add_state_to_queues(worker: WebSocketWorker, settings: Settings):
"""Test that adding the state adds the state to each item in the queue"""
# GIVEN: A WebSocketWorker and some mocked methods
worker.event_loop = MagicMock(**{'is_running.return_value': True})
mocked_queue_1 = MagicMock(put_nowait='put_nowait1')
mocked_queue_2 = MagicMock(put_nowait='put_nowait2')
worker.state_queues = MagicMock(**{'copy.return_value': [mocked_queue_1, mocked_queue_2]})
# WHEN: add_state_to_queues is called
worker.add_state_to_queues({'results': {'service': 'service-id'}})
# THEN: The correct calls should have been made
assert worker.event_loop.call_soon_threadsafe.call_args_list == [
call('put_nowait1', {'results': {'service': 'service-id'}}),
call('put_nowait2', {'results': {'service': 'service-id'}})
]
def test_add_state_to_queues_no_loop(worker: WebSocketWorker, settings: Settings):
"""Test that adding the state when there's no event loop just exits early"""
# GIVEN: A WebSocketWorker and some mocked methods
worker.event_loop = MagicMock(**{'is_running.return_value': False})
# WHEN: add_state_to_queues is called
worker.add_state_to_queues({'results': {'service': 'service-id'}})
# THEN: Worker add_message_to_queues should be called
worker.event_loop.call_soon_threadsafe.assert_not_called()
def test_add_message_to_queues(worker: WebSocketWorker, settings: Settings):
"""Test that adding the message adds the message to each item in the queue"""
# GIVEN: A WebSocketWorker and some mocked methods
worker.event_loop = MagicMock(**{'is_running.return_value': True})
mocked_queue_1 = MagicMock(put_nowait='put_nowait1')
mocked_queue_2 = MagicMock(put_nowait='put_nowait2')
worker.message_queues = MagicMock(**{'copy.return_value': [mocked_queue_1, mocked_queue_2]})
message = WebSocketMessage(plugin="core", key="test", value="test")
# WHEN: add_state_to_queues is called
worker.add_message_to_queues(message)
# THEN: The correct calls should have been made
assert worker.event_loop.call_soon_threadsafe.call_args_list == [
call('put_nowait1', {'plugin': 'core', 'key': 'test', 'value': 'test'}),
call('put_nowait2', {'plugin': 'core', 'key': 'test', 'value': 'test'}),
]
def test_add_message_to_queues_no_loop(worker: WebSocketWorker, settings: Settings):
"""Test that adding the state when there's no event loop just exits early"""
# GIVEN: A WebSocketWorker and some mocked methods
worker.event_loop = MagicMock(**{'is_running.return_value': False})
message = WebSocketMessage(plugin="core", key="test", value="test")
# WHEN: add_state_to_queues is called
worker.add_message_to_queues(message)
# THEN: Worker add_message_to_queues should be called
worker.event_loop.call_soon_threadsafe.assert_not_called()
def test_worker_stop(worker: WebSocketWorker, settings: Settings):
"""Test that the worker stops"""
# GIVEN: A WebSocketWorker
worker.event_loop = MagicMock()
worker.event_loop.call_soon_threadsafe.side_effect = Exception('Test')
# WHEN: stop is called
worker.stop()
# THEN: No exception and the method should have been called
worker.event_loop.call_soon_threadsafe.assert_called_once()