Make Websocket Server and Poller to notify state change based on events

This commit is contained in:
Mateus Meyer Jiacomelli 2022-03-01 06:06:38 +00:00 committed by Raoul Snyman
parent 962f36967a
commit b4048e0d52
6 changed files with 260 additions and 112 deletions

View File

@ -25,6 +25,9 @@ changes from within OpenLP. It uses JSON to communicate with the remotes.
import asyncio
import json
import logging
import uuid
from PyQt5 import QtCore
import time
from websockets import serve
@ -44,63 +47,6 @@ ws_logger = logging.getLogger('websockets')
ws_logger.setLevel(logging.ERROR)
async def handle_websocket(websocket, path):
"""
Handle web socket requests and return the state information
Check every 0.2 seconds to get the latest position and send if it changed.
:param websocket: request from client
:param path: determines the endpoints supported - Not needed
"""
log.debug('WebSocket handle_websocket connection')
await register(websocket)
reply = poller.get_state()
if reply:
json_reply = json.dumps(reply).encode()
await websocket.send(json_reply)
while True:
try:
await notify_users()
await asyncio.wait_for(websocket.recv(), 0.2)
except asyncio.TimeoutError:
pass
except Exception:
await unregister(websocket)
break
async def register(websocket):
"""
Register Clients
:param websocket: The client details
:return:
"""
log.debug('WebSocket handler register')
USERS.add(websocket)
async def unregister(websocket):
"""
Unregister Clients
:param websocket: The client details
:return:
"""
log.debug('WebSocket handler unregister')
USERS.remove(websocket)
async def notify_users():
"""
Dispatch state to all registered users if we have any changes
:return:
"""
if USERS: # asyncio.wait doesn't accept an empty list
reply = poller.get_state_if_changed()
if reply:
json_reply = json.dumps(reply).encode()
await asyncio.wait([user.send(json_reply) for user in USERS])
class WebSocketWorker(ThreadWorker, RegistryProperties, LogMixin):
"""
A special Qt thread class to allow the WebSockets server to run at the same time as the UI.
@ -118,9 +64,10 @@ class WebSocketWorker(ThreadWorker, RegistryProperties, LogMixin):
# Create the websocker server
loop = 1
self.server = None
self.queues = set()
while not self.server:
try:
self.server = serve(handle_websocket, address, port)
self.server = serve(self.handle_websocket, address, port)
log.debug('WebSocket server started on {addr}:{port}'.format(addr=address, port=port))
except Exception:
log.exception('Failed to start WebSocket server')
@ -143,18 +90,121 @@ class WebSocketWorker(ThreadWorker, RegistryProperties, LogMixin):
"""
Stop the websocket server
"""
try:
self.event_loop.call_soon_threadsafe(self.event_loop.stop)
except BaseException:
pass
async def handle_websocket(self, websocket, path):
"""
Handle web socket requests and return the state information
Waits for information to come over queue
:param websocket: request from client
:param path: determines the endpoints supported - Not needed
"""
client_id = uuid.uuid4() if log.getEffectiveLevel() == logging.DEBUG else 0
log.debug(f'(client_id={client_id}) WebSocket handle_websocket connection')
queue = asyncio.Queue()
await self.register(websocket, client_id, queue)
try:
reply = poller.get_state()
if reply:
await self.send_reply(websocket, client_id, reply)
while True:
done, pending = await asyncio.wait(
[asyncio.create_task(queue.get()), asyncio.create_task(websocket.wait_closed())],
return_when=asyncio.FIRST_COMPLETED
)
for task in pending:
task.cancel()
# If there is a new item in client's queue, await_result will contain an item, if the connection is
# closed, it will be None.
await_result = done.pop().result()
if await_result is not None:
await self.send_reply(websocket, client_id, await_result)
else:
break
finally:
await self.unregister(websocket, client_id, queue)
async def register(self, websocket, client_id, queue):
"""
Register Clients
:param websocket: The client details
:param queue: The Command Queue
:return:
"""
log.debug(f'(client_id={client_id}) WebSocket handler register')
USERS.add(websocket)
self.queues.add(queue)
log.debug('WebSocket clients count: {client_count}'.format(client_count=len(USERS)))
async def unregister(self, websocket, client_id, queue):
"""
Unregister Clients
:param websocket: The client details
:return:
"""
USERS.remove(websocket)
self.queues.remove(queue)
log.debug(f'(client_id={client_id}) WebSocket handler unregister')
log.debug('WebSocket clients count: {client_count}'.format(client_count=len(USERS)))
async def send_reply(self, websocket, client_id, reply):
json_reply = json.dumps(reply).encode()
await websocket.send(json_reply)
log.debug(f'(client_id={client_id}) WebSocket send reply: {json_reply}')
def add_state_to_queues(self, state):
"""
Inserts the state in each connection message queue
:param state: OpenLP State
"""
for queue in self.queues.copy():
self.event_loop.call_soon_threadsafe(queue.put_nowait, state)
class WebSocketServer(RegistryProperties, LogMixin):
class WebSocketServer(RegistryProperties, QtCore.QObject, LogMixin):
"""
Wrapper round a server instance
"""
def __init__(self):
"""
Initialise and start the WebSockets server
Initialise the WebSockets server
"""
super(WebSocketServer, self).__init__()
if not Registry().get_flag('no_web_server'):
worker = WebSocketWorker()
run_thread(worker, 'websocket_server')
self.worker = None
def start(self):
"""
Starts the WebSockets server
"""
if self.worker is None and not Registry().get_flag('no_web_server'):
self.worker = WebSocketWorker()
run_thread(self.worker, 'websocket_server')
poller.poller_changed.connect(self.handle_poller_signal)
# Only hooking poller signals after all UI is available
Registry().register_function('bootstrap_completion', self.try_poller_hook_signals)
@QtCore.pyqtSlot()
def handle_poller_signal(self):
if self.worker is not None:
self.worker.add_state_to_queues(poller.get_state())
def try_poller_hook_signals(self):
try:
poller.hook_signals()
except Exception:
log.error('Failed to hook poller signals!')
def close(self):
"""
Closes the WebSocket server and detach associated signals
"""
try:
poller.poller_changed.disconnect(self.handle_poller_signal)
poller.unhook_signals()
self.worker.stop()
finally:
self.worker = None

View File

@ -18,21 +18,30 @@
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
from PyQt5 import QtCore
from openlp.core.common.mixins import RegistryProperties
class WebSocketPoller(RegistryProperties):
class WebSocketPoller(QtCore.QObject, RegistryProperties):
"""
Accessed by web sockets to get status type information from the application
"""
poller_changed = QtCore.pyqtSignal()
def __init__(self):
"""
Constructor for the web sockets poll builder class.
"""
super(WebSocketPoller, self).__init__()
self._previous = {}
self._state = None
def get_state(self):
if self._state is None:
self._state = self.create_state()
return self._state
def create_state(self):
return {'results': {
'counter': self.live_controller.slide_count if self.live_controller.slide_count else 0,
'service': self.service_manager.service_id,
@ -47,17 +56,20 @@ class WebSocketPoller(RegistryProperties):
'chordNotation': self.settings.value('songs/chord notation')
}}
def get_state_if_changed(self):
"""
Poll OpenLP to determine current state if it has changed.
def hook_signals(self):
self.live_controller.slidecontroller_changed.connect(self.on_signal_received)
self.service_manager.servicemanager_changed.connect(self.on_signal_received)
This must only be used by web sockets or else we could miss a state change.
def unhook_signals(self):
try:
self.live_controller.slidecontroller_changed.disconnect(self.on_signal_received)
self.service_manager.servicemanager_changed.disconnect(self.on_signal_received)
except Exception:
pass
:return: The current application state or None if unchanged since last call
"""
current = self.get_state()
if self._previous != current:
self._previous = current
return current
else:
return None
@QtCore.pyqtSlot(list)
@QtCore.pyqtSlot(str)
@QtCore.pyqtSlot()
def on_signal_received(self):
self._state = self.create_state()
self.poller_changed.emit()

View File

@ -481,6 +481,7 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow, LogMixin, RegistryPropert
self.settings.set_up_default_values()
self.about_form = AboutForm(self)
self.ws_server = WebSocketServer()
self.ws_server.start()
self.http_server = HttpServer(self)
start_zeroconf()
SettingsForm(self)
@ -1095,6 +1096,8 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow, LogMixin, RegistryPropert
# Check if we need to change the data directory
if self.new_data_path:
self.change_data_directory()
# Close down WebSocketServer
self.ws_server.close()
# Close down the display
self.live_controller.close_displays()
# Clean temporary files used by services

View File

@ -312,6 +312,7 @@ class ServiceManager(QtWidgets.QWidget, RegistryBase, Ui_ServiceManager, LogMixi
servicemanager_next_item = QtCore.pyqtSignal()
servicemanager_previous_item = QtCore.pyqtSignal()
servicemanager_new_file = QtCore.pyqtSignal()
servicemanager_changed = QtCore.pyqtSignal()
theme_update_service = QtCore.pyqtSignal()
def __init__(self, parent=None):
@ -380,6 +381,7 @@ class ServiceManager(QtWidgets.QWidget, RegistryBase, Ui_ServiceManager, LogMixi
else:
service_file = translate('OpenLP.ServiceManager', 'Untitled Service')
self.main_window.set_service_modified(modified, service_file)
self.servicemanager_changed.emit()
def is_modified(self):
"""
@ -530,6 +532,7 @@ class ServiceManager(QtWidgets.QWidget, RegistryBase, Ui_ServiceManager, LogMixi
self.settings.setValue('servicemanager/last file', None)
self.plugin_manager.new_service_created()
self.live_controller.slide_count = 0
self.servicemanager_changed.emit()
def create_basic_service(self):
"""
@ -1280,6 +1283,7 @@ class ServiceManager(QtWidgets.QWidget, RegistryBase, Ui_ServiceManager, LogMixi
self.service_items.remove(self.service_items[item])
self.repaint_service_list(item - 1, -1)
self.set_modified()
self.servicemanager_changed.emit()
def repaint_service_list(self, service_item, service_item_child):
"""
@ -1577,6 +1581,7 @@ class ServiceManager(QtWidgets.QWidget, RegistryBase, Ui_ServiceManager, LogMixi
child = row
self.application.set_busy_cursor()
if self.service_items[item]['service_item'].is_valid:
self.servicemanager_changed.emit()
self.live_controller.add_service_manager_item(self.service_items[item]['service_item'], child)
if self.settings.value('core/auto preview'):
item += 1

View File

@ -141,6 +141,8 @@ class SlideController(QtWidgets.QWidget, LogMixin, RegistryProperties):
user uses to control the displaying of verses/slides/etc on the screen.
"""
slidecontroller_changed = QtCore.pyqtSignal()
def __init__(self, *args, **kwargs):
"""
Set up the Slide Controller.
@ -1278,6 +1280,7 @@ class SlideController(QtWidgets.QWidget, LogMixin, RegistryProperties):
The output has changed so we need to poke to POLL Websocket.
"""
self.slide_count += 1
self.slidecontroller_changed.emit()
def display_maindisplay(self):
"""

View File

@ -45,12 +45,14 @@ def worker(settings):
@patch('openlp.core.api.websockets.run_thread')
def test_serverstart(mocked_run_thread, MockWebSocketWorker, registry):
"""
Test the starting of the WebSockets Server with the disabled flag set on
Test the starting of the WebSockets Server with the disabled flag set off
"""
# GIVEN: A new httpserver
# WHEN: I start the server
# GIVEN: A new WebSocketServer
Registry().set_flag('no_web_server', False)
WebSocketServer()
server = WebSocketServer()
# WHEN: I start the server
server.start()
# THEN: the api environment should have been created
assert mocked_run_thread.call_count == 1, 'The qthread should have been called once'
@ -61,14 +63,16 @@ def test_serverstart(mocked_run_thread, MockWebSocketWorker, registry):
@patch('openlp.core.api.websockets.run_thread')
def test_serverstart_not_required(mocked_run_thread, MockWebSocketWorker, registry):
"""
Test the starting of the WebSockets Server with the disabled flag set off
Test the starting of the WebSockets Server with the disabled flag set on
"""
# GIVEN: A new httpserver and the server is not required
# WHEN: I start the server
# GIVEN: A new WebSocketServer and the server is not required
Registry().set_flag('no_web_server', True)
WebSocketServer()
server = WebSocketServer()
# THEN: the api environment should have been created
# WHEN: I start the server
server.start()
# THEN: the api environment should have not been created
assert mocked_run_thread.call_count == 0, 'The qthread should not have been called'
assert MockWebSocketWorker.call_count == 0, 'The http thread should not have been called'
@ -103,31 +107,6 @@ def test_poller_get_state(poller, settings):
assert poll_json['results']['item'] == '23-34-45', 'The item return value should match 23-34-45'
def test_poller_get_state_if_changed(poller, settings):
"""
Test the get_state_if_changed function returns None if the state has not changed
"""
# GIVEN: the system is configured with a set of data
poller._previous = {}
mocked_service_manager = MagicMock()
mocked_service_manager.service_id = 21
mocked_live_controller = MagicMock()
mocked_live_controller.selected_row = 5
mocked_live_controller.service_item = MagicMock()
mocked_live_controller.service_item.unique_identifier = '23-34-45'
mocked_live_controller.blank_screen.isChecked.return_value = True
mocked_live_controller.theme_screen.isChecked.return_value = False
mocked_live_controller.desktop_screen.isChecked.return_value = False
Registry().register('live_controller', mocked_live_controller)
Registry().register('service_manager', mocked_service_manager)
# WHEN: The poller polls twice
poll_json = poller.get_state_if_changed()
poll_json2 = poller.get_state_if_changed()
# THEN: The get_state_if_changed function should return None on the second call because the state has not changed
assert poll_json is not None, 'The first get_state_if_changed function call should have not returned None'
assert poll_json2 is None, 'The second get_state_if_changed function should return None'
@patch('openlp.core.api.websockets.serve')
@patch('openlp.core.api.websockets.asyncio')
@patch('openlp.core.api.websockets.log')
@ -170,3 +149,99 @@ def test_worker_start_fail(mocked_log, mocked_asyncio, mocked_serve, worker, set
event_loop.run_forever.assert_not_called()
mocked_log.exception.assert_called_once()
event_loop.close.assert_called_once_with()
def test_poller_event_attach(poller, settings):
"""
Test the event attach of WebSocketPoller
"""
# GIVEN: A mocked live_controlled, a mocked slide_controller and mocked change signals
servicemanager_changed_connect = MagicMock()
service_manager = MagicMock()
service_manager.servicemanager_changed = MagicMock()
service_manager.servicemanager_changed.connect = servicemanager_changed_connect
poller._service_manager = service_manager
live_controller = MagicMock()
slidecontroller_changed_connect = MagicMock()
live_controller.slidecontroller_changed = MagicMock()
live_controller.slidecontroller_changed.connect = slidecontroller_changed_connect
poller._live_controller = live_controller
# WHEN: the hook_signals function is called
poller.hook_signals()
# THEN: service_manager and live_controller changed signals should be connected
servicemanager_changed_connect.assert_called_once()
slidecontroller_changed_connect.assert_called_once()
def test_poller_on_change_emit(poller, settings):
"""
Test the change event emission of WebSocketPoller
"""
# GIVEN: A mocked changed signal and create_state function
poller_changed_emit = MagicMock()
poller.create_state = MagicMock(return_value={})
poller.poller_changed = MagicMock()
poller.poller_changed.emit = poller_changed_emit
# WHEN: The on_signal_received function is called
poller.on_signal_received()
# THEN: poller_changed signal should be emitted once
poller_changed_emit.assert_called_once()
def test_poller_get_state_is_never_none(poller):
"""
Test the get_state call never returns None
"""
# GIVEN: A mocked poller create_state
poller.create_state = MagicMock(return_value={"key1": "2"})
# WHEN: poller.get_state is called
state = poller.get_state()
# THEN: state is not None
assert state is not None, 'get_state() return should not be None'
@patch('openlp.core.api.websockets.poller')
@patch('openlp.core.api.websockets.run_thread')
def test_websocket_server_connects_to_poller(mock_run_thread, mock_poller, settings):
"""
Test if the websocket_server connects to WebSocketPoller
"""
# GIVEN: A mocked poller signal and a server
Registry().set_flag('no_web_server', False)
mock_poller.poller_changed = MagicMock()
mock_poller.poller_changed.connect = MagicMock()
server = WebSocketServer()
server.handle_poller_signal = MagicMock()
# WHEN: WebSocketServer is started
server.start()
# THEN: poller_changed should be connected with WebSocketServer and correct handler
mock_poller.poller_changed.connect.assert_called_once_with(server.handle_poller_signal)
@patch('openlp.core.api.websockets.poller')
@patch('openlp.core.api.websockets.WebSocketWorker.add_state_to_queues')
@patch('openlp.core.api.websockets.run_thread')
def test_websocket_worker_register_connections(mock_run_thread, mock_add_state_to_queues, mock_poller, settings):
"""
Test if the websocket_server can receive poller signals
"""
# GIVEN: A mocked poller create_state, a mocked server and a mocked worker
mock_state = {"key1": "2"}
mock_poller.get_state.return_value = mock_state
Registry().set_flag('no_web_server', False)
server = WebSocketServer()
server.start()
# WHEN: server.handle_poller_signal is called
server.handle_poller_signal()
# THEN: WebSocketWorker state notify function should be called
mock_add_state_to_queues.assert_called_once_with(mock_state)