forked from openlp/openlp
Merge branch 'websocket-poll-by-events' into 'master'
Make Websocket Server and Poller to notify state change based on events See merge request openlp/openlp!340
This commit is contained in:
commit
290252ce51
@ -25,6 +25,9 @@ changes from within OpenLP. It uses JSON to communicate with the remotes.
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
from PyQt5 import QtCore
|
||||
import time
|
||||
|
||||
from websockets import serve
|
||||
@ -44,63 +47,6 @@ ws_logger = logging.getLogger('websockets')
|
||||
ws_logger.setLevel(logging.ERROR)
|
||||
|
||||
|
||||
async def handle_websocket(websocket, path):
|
||||
"""
|
||||
Handle web socket requests and return the state information
|
||||
Check every 0.2 seconds to get the latest position and send if it changed.
|
||||
|
||||
:param websocket: request from client
|
||||
:param path: determines the endpoints supported - Not needed
|
||||
"""
|
||||
log.debug('WebSocket handle_websocket connection')
|
||||
await register(websocket)
|
||||
reply = poller.get_state()
|
||||
if reply:
|
||||
json_reply = json.dumps(reply).encode()
|
||||
await websocket.send(json_reply)
|
||||
while True:
|
||||
try:
|
||||
await notify_users()
|
||||
await asyncio.wait_for(websocket.recv(), 0.2)
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
except Exception:
|
||||
await unregister(websocket)
|
||||
break
|
||||
|
||||
|
||||
async def register(websocket):
|
||||
"""
|
||||
Register Clients
|
||||
:param websocket: The client details
|
||||
:return:
|
||||
"""
|
||||
log.debug('WebSocket handler register')
|
||||
USERS.add(websocket)
|
||||
|
||||
|
||||
async def unregister(websocket):
|
||||
"""
|
||||
Unregister Clients
|
||||
:param websocket: The client details
|
||||
:return:
|
||||
"""
|
||||
log.debug('WebSocket handler unregister')
|
||||
USERS.remove(websocket)
|
||||
|
||||
|
||||
async def notify_users():
|
||||
"""
|
||||
Dispatch state to all registered users if we have any changes
|
||||
:return:
|
||||
"""
|
||||
if USERS: # asyncio.wait doesn't accept an empty list
|
||||
reply = poller.get_state_if_changed()
|
||||
if reply:
|
||||
json_reply = json.dumps(reply).encode()
|
||||
await asyncio.wait([user.send(json_reply) for user in USERS])
|
||||
|
||||
|
||||
class WebSocketWorker(ThreadWorker, RegistryProperties, LogMixin):
|
||||
"""
|
||||
A special Qt thread class to allow the WebSockets server to run at the same time as the UI.
|
||||
@ -118,9 +64,10 @@ class WebSocketWorker(ThreadWorker, RegistryProperties, LogMixin):
|
||||
# Create the websocker server
|
||||
loop = 1
|
||||
self.server = None
|
||||
self.queues = set()
|
||||
while not self.server:
|
||||
try:
|
||||
self.server = serve(handle_websocket, address, port)
|
||||
self.server = serve(self.handle_websocket, address, port)
|
||||
log.debug('WebSocket server started on {addr}:{port}'.format(addr=address, port=port))
|
||||
except Exception:
|
||||
log.exception('Failed to start WebSocket server')
|
||||
@ -143,18 +90,121 @@ class WebSocketWorker(ThreadWorker, RegistryProperties, LogMixin):
|
||||
"""
|
||||
Stop the websocket server
|
||||
"""
|
||||
self.event_loop.call_soon_threadsafe(self.event_loop.stop)
|
||||
try:
|
||||
self.event_loop.call_soon_threadsafe(self.event_loop.stop)
|
||||
except BaseException:
|
||||
pass
|
||||
|
||||
async def handle_websocket(self, websocket, path):
|
||||
"""
|
||||
Handle web socket requests and return the state information
|
||||
Waits for information to come over queue
|
||||
|
||||
:param websocket: request from client
|
||||
:param path: determines the endpoints supported - Not needed
|
||||
"""
|
||||
client_id = uuid.uuid4() if log.getEffectiveLevel() == logging.DEBUG else 0
|
||||
log.debug(f'(client_id={client_id}) WebSocket handle_websocket connection')
|
||||
queue = asyncio.Queue()
|
||||
await self.register(websocket, client_id, queue)
|
||||
try:
|
||||
reply = poller.get_state()
|
||||
if reply:
|
||||
await self.send_reply(websocket, client_id, reply)
|
||||
while True:
|
||||
done, pending = await asyncio.wait(
|
||||
[asyncio.create_task(queue.get()), asyncio.create_task(websocket.wait_closed())],
|
||||
return_when=asyncio.FIRST_COMPLETED
|
||||
)
|
||||
for task in pending:
|
||||
task.cancel()
|
||||
# If there is a new item in client's queue, await_result will contain an item, if the connection is
|
||||
# closed, it will be None.
|
||||
await_result = done.pop().result()
|
||||
if await_result is not None:
|
||||
await self.send_reply(websocket, client_id, await_result)
|
||||
else:
|
||||
break
|
||||
finally:
|
||||
await self.unregister(websocket, client_id, queue)
|
||||
|
||||
async def register(self, websocket, client_id, queue):
|
||||
"""
|
||||
Register Clients
|
||||
:param websocket: The client details
|
||||
:param queue: The Command Queue
|
||||
:return:
|
||||
"""
|
||||
log.debug(f'(client_id={client_id}) WebSocket handler register')
|
||||
USERS.add(websocket)
|
||||
self.queues.add(queue)
|
||||
log.debug('WebSocket clients count: {client_count}'.format(client_count=len(USERS)))
|
||||
|
||||
async def unregister(self, websocket, client_id, queue):
|
||||
"""
|
||||
Unregister Clients
|
||||
:param websocket: The client details
|
||||
:return:
|
||||
"""
|
||||
USERS.remove(websocket)
|
||||
self.queues.remove(queue)
|
||||
log.debug(f'(client_id={client_id}) WebSocket handler unregister')
|
||||
log.debug('WebSocket clients count: {client_count}'.format(client_count=len(USERS)))
|
||||
|
||||
async def send_reply(self, websocket, client_id, reply):
|
||||
json_reply = json.dumps(reply).encode()
|
||||
await websocket.send(json_reply)
|
||||
log.debug(f'(client_id={client_id}) WebSocket send reply: {json_reply}')
|
||||
|
||||
def add_state_to_queues(self, state):
|
||||
"""
|
||||
Inserts the state in each connection message queue
|
||||
:param state: OpenLP State
|
||||
"""
|
||||
for queue in self.queues.copy():
|
||||
self.event_loop.call_soon_threadsafe(queue.put_nowait, state)
|
||||
|
||||
|
||||
class WebSocketServer(RegistryProperties, LogMixin):
|
||||
class WebSocketServer(RegistryProperties, QtCore.QObject, LogMixin):
|
||||
"""
|
||||
Wrapper round a server instance
|
||||
"""
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialise and start the WebSockets server
|
||||
Initialise the WebSockets server
|
||||
"""
|
||||
super(WebSocketServer, self).__init__()
|
||||
if not Registry().get_flag('no_web_server'):
|
||||
worker = WebSocketWorker()
|
||||
run_thread(worker, 'websocket_server')
|
||||
self.worker = None
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Starts the WebSockets server
|
||||
"""
|
||||
if self.worker is None and not Registry().get_flag('no_web_server'):
|
||||
self.worker = WebSocketWorker()
|
||||
run_thread(self.worker, 'websocket_server')
|
||||
poller.poller_changed.connect(self.handle_poller_signal)
|
||||
# Only hooking poller signals after all UI is available
|
||||
Registry().register_function('bootstrap_completion', self.try_poller_hook_signals)
|
||||
|
||||
@QtCore.pyqtSlot()
|
||||
def handle_poller_signal(self):
|
||||
if self.worker is not None:
|
||||
self.worker.add_state_to_queues(poller.get_state())
|
||||
|
||||
def try_poller_hook_signals(self):
|
||||
try:
|
||||
poller.hook_signals()
|
||||
except Exception:
|
||||
log.error('Failed to hook poller signals!')
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
Closes the WebSocket server and detach associated signals
|
||||
"""
|
||||
try:
|
||||
poller.poller_changed.disconnect(self.handle_poller_signal)
|
||||
poller.unhook_signals()
|
||||
self.worker.stop()
|
||||
finally:
|
||||
self.worker = None
|
||||
|
@ -18,21 +18,30 @@
|
||||
# You should have received a copy of the GNU General Public License #
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||
##########################################################################
|
||||
from PyQt5 import QtCore
|
||||
from openlp.core.common.mixins import RegistryProperties
|
||||
|
||||
|
||||
class WebSocketPoller(RegistryProperties):
|
||||
class WebSocketPoller(QtCore.QObject, RegistryProperties):
|
||||
"""
|
||||
Accessed by web sockets to get status type information from the application
|
||||
"""
|
||||
|
||||
poller_changed = QtCore.pyqtSignal()
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Constructor for the web sockets poll builder class.
|
||||
"""
|
||||
super(WebSocketPoller, self).__init__()
|
||||
self._previous = {}
|
||||
self._state = None
|
||||
|
||||
def get_state(self):
|
||||
if self._state is None:
|
||||
self._state = self.create_state()
|
||||
return self._state
|
||||
|
||||
def create_state(self):
|
||||
return {'results': {
|
||||
'counter': self.live_controller.slide_count if self.live_controller.slide_count else 0,
|
||||
'service': self.service_manager.service_id,
|
||||
@ -47,17 +56,20 @@ class WebSocketPoller(RegistryProperties):
|
||||
'chordNotation': self.settings.value('songs/chord notation')
|
||||
}}
|
||||
|
||||
def get_state_if_changed(self):
|
||||
"""
|
||||
Poll OpenLP to determine current state if it has changed.
|
||||
def hook_signals(self):
|
||||
self.live_controller.slidecontroller_changed.connect(self.on_signal_received)
|
||||
self.service_manager.servicemanager_changed.connect(self.on_signal_received)
|
||||
|
||||
This must only be used by web sockets or else we could miss a state change.
|
||||
def unhook_signals(self):
|
||||
try:
|
||||
self.live_controller.slidecontroller_changed.disconnect(self.on_signal_received)
|
||||
self.service_manager.servicemanager_changed.disconnect(self.on_signal_received)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
:return: The current application state or None if unchanged since last call
|
||||
"""
|
||||
current = self.get_state()
|
||||
if self._previous != current:
|
||||
self._previous = current
|
||||
return current
|
||||
else:
|
||||
return None
|
||||
@QtCore.pyqtSlot(list)
|
||||
@QtCore.pyqtSlot(str)
|
||||
@QtCore.pyqtSlot()
|
||||
def on_signal_received(self):
|
||||
self._state = self.create_state()
|
||||
self.poller_changed.emit()
|
||||
|
@ -481,6 +481,7 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow, LogMixin, RegistryPropert
|
||||
self.settings.set_up_default_values()
|
||||
self.about_form = AboutForm(self)
|
||||
self.ws_server = WebSocketServer()
|
||||
self.ws_server.start()
|
||||
self.http_server = HttpServer(self)
|
||||
start_zeroconf()
|
||||
SettingsForm(self)
|
||||
@ -1095,6 +1096,8 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow, LogMixin, RegistryPropert
|
||||
# Check if we need to change the data directory
|
||||
if self.new_data_path:
|
||||
self.change_data_directory()
|
||||
# Close down WebSocketServer
|
||||
self.ws_server.close()
|
||||
# Close down the display
|
||||
self.live_controller.close_displays()
|
||||
# Clean temporary files used by services
|
||||
|
@ -312,6 +312,7 @@ class ServiceManager(QtWidgets.QWidget, RegistryBase, Ui_ServiceManager, LogMixi
|
||||
servicemanager_next_item = QtCore.pyqtSignal()
|
||||
servicemanager_previous_item = QtCore.pyqtSignal()
|
||||
servicemanager_new_file = QtCore.pyqtSignal()
|
||||
servicemanager_changed = QtCore.pyqtSignal()
|
||||
theme_update_service = QtCore.pyqtSignal()
|
||||
|
||||
def __init__(self, parent=None):
|
||||
@ -392,6 +393,7 @@ class ServiceManager(QtWidgets.QWidget, RegistryBase, Ui_ServiceManager, LogMixi
|
||||
else:
|
||||
service_file = translate('OpenLP.ServiceManager', 'Untitled Service')
|
||||
self.main_window.set_service_modified(modified, service_file)
|
||||
self.servicemanager_changed.emit()
|
||||
|
||||
def is_modified(self):
|
||||
"""
|
||||
@ -542,6 +544,7 @@ class ServiceManager(QtWidgets.QWidget, RegistryBase, Ui_ServiceManager, LogMixi
|
||||
self.settings.setValue('servicemanager/last file', None)
|
||||
self.plugin_manager.new_service_created()
|
||||
self.live_controller.slide_count = 0
|
||||
self.servicemanager_changed.emit()
|
||||
|
||||
def create_basic_service(self):
|
||||
"""
|
||||
@ -1293,6 +1296,7 @@ class ServiceManager(QtWidgets.QWidget, RegistryBase, Ui_ServiceManager, LogMixi
|
||||
self.service_items.remove(self.service_items[item])
|
||||
self.repaint_service_list(item - 1, -1)
|
||||
self.set_modified()
|
||||
self.servicemanager_changed.emit()
|
||||
|
||||
def repaint_service_list(self, service_item, service_item_child):
|
||||
"""
|
||||
@ -1590,6 +1594,7 @@ class ServiceManager(QtWidgets.QWidget, RegistryBase, Ui_ServiceManager, LogMixi
|
||||
child = row
|
||||
self.application.set_busy_cursor()
|
||||
if self.service_items[item]['service_item'].is_valid:
|
||||
self.servicemanager_changed.emit()
|
||||
self.live_controller.add_service_manager_item(self.service_items[item]['service_item'], child)
|
||||
if self.settings.value('core/auto preview'):
|
||||
item += 1
|
||||
|
@ -141,6 +141,8 @@ class SlideController(QtWidgets.QWidget, LogMixin, RegistryProperties):
|
||||
user uses to control the displaying of verses/slides/etc on the screen.
|
||||
"""
|
||||
|
||||
slidecontroller_changed = QtCore.pyqtSignal()
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""
|
||||
Set up the Slide Controller.
|
||||
@ -1278,6 +1280,7 @@ class SlideController(QtWidgets.QWidget, LogMixin, RegistryProperties):
|
||||
The output has changed so we need to poke to POLL Websocket.
|
||||
"""
|
||||
self.slide_count += 1
|
||||
self.slidecontroller_changed.emit()
|
||||
|
||||
def display_maindisplay(self):
|
||||
"""
|
||||
|
@ -45,12 +45,14 @@ def worker(settings):
|
||||
@patch('openlp.core.api.websockets.run_thread')
|
||||
def test_serverstart(mocked_run_thread, MockWebSocketWorker, registry):
|
||||
"""
|
||||
Test the starting of the WebSockets Server with the disabled flag set on
|
||||
Test the starting of the WebSockets Server with the disabled flag set off
|
||||
"""
|
||||
# GIVEN: A new httpserver
|
||||
# WHEN: I start the server
|
||||
# GIVEN: A new WebSocketServer
|
||||
Registry().set_flag('no_web_server', False)
|
||||
WebSocketServer()
|
||||
server = WebSocketServer()
|
||||
|
||||
# WHEN: I start the server
|
||||
server.start()
|
||||
|
||||
# THEN: the api environment should have been created
|
||||
assert mocked_run_thread.call_count == 1, 'The qthread should have been called once'
|
||||
@ -61,14 +63,16 @@ def test_serverstart(mocked_run_thread, MockWebSocketWorker, registry):
|
||||
@patch('openlp.core.api.websockets.run_thread')
|
||||
def test_serverstart_not_required(mocked_run_thread, MockWebSocketWorker, registry):
|
||||
"""
|
||||
Test the starting of the WebSockets Server with the disabled flag set off
|
||||
Test the starting of the WebSockets Server with the disabled flag set on
|
||||
"""
|
||||
# GIVEN: A new httpserver and the server is not required
|
||||
# WHEN: I start the server
|
||||
# GIVEN: A new WebSocketServer and the server is not required
|
||||
Registry().set_flag('no_web_server', True)
|
||||
WebSocketServer()
|
||||
server = WebSocketServer()
|
||||
|
||||
# THEN: the api environment should have been created
|
||||
# WHEN: I start the server
|
||||
server.start()
|
||||
|
||||
# THEN: the api environment should have not been created
|
||||
assert mocked_run_thread.call_count == 0, 'The qthread should not have been called'
|
||||
assert MockWebSocketWorker.call_count == 0, 'The http thread should not have been called'
|
||||
|
||||
@ -103,31 +107,6 @@ def test_poller_get_state(poller, settings):
|
||||
assert poll_json['results']['item'] == '23-34-45', 'The item return value should match 23-34-45'
|
||||
|
||||
|
||||
def test_poller_get_state_if_changed(poller, settings):
|
||||
"""
|
||||
Test the get_state_if_changed function returns None if the state has not changed
|
||||
"""
|
||||
# GIVEN: the system is configured with a set of data
|
||||
poller._previous = {}
|
||||
mocked_service_manager = MagicMock()
|
||||
mocked_service_manager.service_id = 21
|
||||
mocked_live_controller = MagicMock()
|
||||
mocked_live_controller.selected_row = 5
|
||||
mocked_live_controller.service_item = MagicMock()
|
||||
mocked_live_controller.service_item.unique_identifier = '23-34-45'
|
||||
mocked_live_controller.blank_screen.isChecked.return_value = True
|
||||
mocked_live_controller.theme_screen.isChecked.return_value = False
|
||||
mocked_live_controller.desktop_screen.isChecked.return_value = False
|
||||
Registry().register('live_controller', mocked_live_controller)
|
||||
Registry().register('service_manager', mocked_service_manager)
|
||||
# WHEN: The poller polls twice
|
||||
poll_json = poller.get_state_if_changed()
|
||||
poll_json2 = poller.get_state_if_changed()
|
||||
# THEN: The get_state_if_changed function should return None on the second call because the state has not changed
|
||||
assert poll_json is not None, 'The first get_state_if_changed function call should have not returned None'
|
||||
assert poll_json2 is None, 'The second get_state_if_changed function should return None'
|
||||
|
||||
|
||||
@patch('openlp.core.api.websockets.serve')
|
||||
@patch('openlp.core.api.websockets.asyncio')
|
||||
@patch('openlp.core.api.websockets.log')
|
||||
@ -170,3 +149,99 @@ def test_worker_start_fail(mocked_log, mocked_asyncio, mocked_serve, worker, set
|
||||
event_loop.run_forever.assert_not_called()
|
||||
mocked_log.exception.assert_called_once()
|
||||
event_loop.close.assert_called_once_with()
|
||||
|
||||
|
||||
def test_poller_event_attach(poller, settings):
|
||||
"""
|
||||
Test the event attach of WebSocketPoller
|
||||
"""
|
||||
# GIVEN: A mocked live_controlled, a mocked slide_controller and mocked change signals
|
||||
servicemanager_changed_connect = MagicMock()
|
||||
service_manager = MagicMock()
|
||||
service_manager.servicemanager_changed = MagicMock()
|
||||
service_manager.servicemanager_changed.connect = servicemanager_changed_connect
|
||||
poller._service_manager = service_manager
|
||||
live_controller = MagicMock()
|
||||
slidecontroller_changed_connect = MagicMock()
|
||||
live_controller.slidecontroller_changed = MagicMock()
|
||||
live_controller.slidecontroller_changed.connect = slidecontroller_changed_connect
|
||||
poller._live_controller = live_controller
|
||||
|
||||
# WHEN: the hook_signals function is called
|
||||
poller.hook_signals()
|
||||
|
||||
# THEN: service_manager and live_controller changed signals should be connected
|
||||
servicemanager_changed_connect.assert_called_once()
|
||||
slidecontroller_changed_connect.assert_called_once()
|
||||
|
||||
|
||||
def test_poller_on_change_emit(poller, settings):
|
||||
"""
|
||||
Test the change event emission of WebSocketPoller
|
||||
"""
|
||||
# GIVEN: A mocked changed signal and create_state function
|
||||
poller_changed_emit = MagicMock()
|
||||
poller.create_state = MagicMock(return_value={})
|
||||
poller.poller_changed = MagicMock()
|
||||
poller.poller_changed.emit = poller_changed_emit
|
||||
|
||||
# WHEN: The on_signal_received function is called
|
||||
poller.on_signal_received()
|
||||
|
||||
# THEN: poller_changed signal should be emitted once
|
||||
poller_changed_emit.assert_called_once()
|
||||
|
||||
|
||||
def test_poller_get_state_is_never_none(poller):
|
||||
"""
|
||||
Test the get_state call never returns None
|
||||
"""
|
||||
# GIVEN: A mocked poller create_state
|
||||
poller.create_state = MagicMock(return_value={"key1": "2"})
|
||||
|
||||
# WHEN: poller.get_state is called
|
||||
state = poller.get_state()
|
||||
|
||||
# THEN: state is not None
|
||||
assert state is not None, 'get_state() return should not be None'
|
||||
|
||||
|
||||
@patch('openlp.core.api.websockets.poller')
|
||||
@patch('openlp.core.api.websockets.run_thread')
|
||||
def test_websocket_server_connects_to_poller(mock_run_thread, mock_poller, settings):
|
||||
"""
|
||||
Test if the websocket_server connects to WebSocketPoller
|
||||
"""
|
||||
# GIVEN: A mocked poller signal and a server
|
||||
Registry().set_flag('no_web_server', False)
|
||||
mock_poller.poller_changed = MagicMock()
|
||||
mock_poller.poller_changed.connect = MagicMock()
|
||||
server = WebSocketServer()
|
||||
server.handle_poller_signal = MagicMock()
|
||||
|
||||
# WHEN: WebSocketServer is started
|
||||
server.start()
|
||||
|
||||
# THEN: poller_changed should be connected with WebSocketServer and correct handler
|
||||
mock_poller.poller_changed.connect.assert_called_once_with(server.handle_poller_signal)
|
||||
|
||||
|
||||
@patch('openlp.core.api.websockets.poller')
|
||||
@patch('openlp.core.api.websockets.WebSocketWorker.add_state_to_queues')
|
||||
@patch('openlp.core.api.websockets.run_thread')
|
||||
def test_websocket_worker_register_connections(mock_run_thread, mock_add_state_to_queues, mock_poller, settings):
|
||||
"""
|
||||
Test if the websocket_server can receive poller signals
|
||||
"""
|
||||
# GIVEN: A mocked poller create_state, a mocked server and a mocked worker
|
||||
mock_state = {"key1": "2"}
|
||||
mock_poller.get_state.return_value = mock_state
|
||||
Registry().set_flag('no_web_server', False)
|
||||
server = WebSocketServer()
|
||||
server.start()
|
||||
|
||||
# WHEN: server.handle_poller_signal is called
|
||||
server.handle_poller_signal()
|
||||
|
||||
# THEN: WebSocketWorker state notify function should be called
|
||||
mock_add_state_to_queues.assert_called_once_with(mock_state)
|
||||
|
Loading…
Reference in New Issue
Block a user