From 3e9073275affa8dcf93b381939f585bda701a5b0 Mon Sep 17 00:00:00 2001 From: Raoul Snyman Date: Thu, 4 Jan 2018 22:32:12 -0700 Subject: [PATCH] Fix an issue with versions of websockets > 4.0 --- openlp/core/api/websockets.py | 147 ++++++++++++++-------------------- 1 file changed, 59 insertions(+), 88 deletions(-) diff --git a/openlp/core/api/websockets.py b/openlp/core/api/websockets.py index 95c50a0b7..785f3c8ff 100644 --- a/openlp/core/api/websockets.py +++ b/openlp/core/api/websockets.py @@ -28,7 +28,7 @@ import json import logging import time -import websockets +from websockets import serve from openlp.core.common.mixins import LogMixin, RegistryProperties from openlp.core.common.registry import Registry @@ -38,31 +38,76 @@ from openlp.core.threading import ThreadWorker, run_thread log = logging.getLogger(__name__) -class WebSocketWorker(ThreadWorker): +async def handle_websocket(request, path): + """ + Handle web socket requests and return the poll information. + Check ever 0.2 seconds to get the latest position and send if changed. + Only gets triggered when 1st client attaches + + :param request: request from client + :param path: determines the endpoints supported + :return: + """ + log.debug('WebSocket handler registered with client') + previous_poll = None + previous_main_poll = None + poller = Registry().get('poller') + if path == '/state': + while True: + current_poll = poller.poll() + if current_poll != previous_poll: + await request.send(json.dumps(current_poll).encode()) + previous_poll = current_poll + await asyncio.sleep(0.2) + elif path == '/live_changed': + while True: + main_poll = poller.main_poll() + if main_poll != previous_main_poll: + await request.send(main_poll) + previous_main_poll = main_poll + await asyncio.sleep(0.2) + + +class WebSocketWorker(ThreadWorker, RegistryProperties, LogMixin): """ A special Qt thread class to allow the WebSockets server to run at the same time as the UI. """ - def __init__(self, server): - """ - Constructor for the thread class. - - :param server: The http server class. - """ - self.ws_server = server - super().__init__() - def start(self): """ Run the worker. """ - self.ws_server.start_server() + address = Settings().value('api/ip address') + port = Settings().value('api/websocket port') + # Start the event loop + event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(event_loop) + # Create the websocker server + loop = 1 + self.server = None + while not self.server: + try: + self.server = serve(handle_websocket, address, port) + log.debug('WebSocket server started on {addr}:{port}'.format(addr=address, port=port)) + except Exception as e: + log.exception('Failed to start WebSocket server') + loop += 1 + time.sleep(0.1) + if not self.server and loop > 3: + log.error('Unable to start WebSocket server {addr}:{port}, giving up'.format(addr=address, port=port)) + if self.server: + # If the websocket server exists, start listening + event_loop.run_until_complete(self.server) + event_loop.run_forever() self.quit.emit() def stop(self): """ Stop the websocket server """ - self.ws_server.stop_server() + if hasattr(self.server, 'ws_server'): + self.server.ws_server.close() + elif hasattr(self.server, 'server'): + self.server.server.close() class WebSocketServer(RegistryProperties, LogMixin): @@ -75,79 +120,5 @@ class WebSocketServer(RegistryProperties, LogMixin): """ super(WebSocketServer, self).__init__() if Registry().get_flag('no_web_server'): - self.settings_section = 'api' - worker = WebSocketWorker(self) + worker = WebSocketWorker() run_thread(worker, 'websocket_server') - - def start_server(self): - """ - Start the correct server and save the handler - """ - address = Settings().value(self.settings_section + '/ip address') - port = Settings().value(self.settings_section + '/websocket port') - self.start_websocket_instance(address, port) - # If web socket server start listening - if hasattr(self, 'ws_server') and self.ws_server: - event_loop = asyncio.new_event_loop() - asyncio.set_event_loop(event_loop) - event_loop.run_until_complete(self.ws_server) - event_loop.run_forever() - else: - log.debug('Failed to start ws server on port {port}'.format(port=port)) - - def stop_server(self): - """ - Stop the websocket server - """ - if hasattr(self.ws_server, 'ws_server'): - self.ws_server.ws_server.close() - elif hasattr(self.ws_server, 'server'): - self.ws_server.server.close() - - def start_websocket_instance(self, address, port): - """ - Start the server - - :param address: The server address - :param port: The run port - """ - loop = 1 - while loop < 4: - try: - self.ws_server = websockets.serve(self.handle_websocket, address, port) - log.debug("Web Socket Server started for class {address} {port}".format(address=address, port=port)) - break - except Exception as e: - log.exception('Failed to start ws server {why}'.format(why=e)) - loop += 1 - time.sleep(0.1) - - @staticmethod - async def handle_websocket(request, path): - """ - Handle web socket requests and return the poll information. - Check ever 0.2 seconds to get the latest position and send if changed. - Only gets triggered when 1st client attaches - - :param request: request from client - :param path: determines the endpoints supported - :return: - """ - log.debug("web socket handler registered with client") - previous_poll = None - previous_main_poll = None - poller = Registry().get('poller') - if path == '/state': - while True: - current_poll = poller.poll() - if current_poll != previous_poll: - await request.send(json.dumps(current_poll).encode()) - previous_poll = current_poll - await asyncio.sleep(0.2) - elif path == '/live_changed': - while True: - main_poll = poller.main_poll() - if main_poll != previous_main_poll: - await request.send(main_poll) - previous_main_poll = main_poll - await asyncio.sleep(0.2)