Fix an issue with versions of websockets > 4.0

This commit is contained in:
Raoul Snyman 2018-01-04 22:32:12 -07:00
parent 2d009f1884
commit 3e9073275a

View File

@ -28,7 +28,7 @@ import json
import logging import logging
import time import time
import websockets from websockets import serve
from openlp.core.common.mixins import LogMixin, RegistryProperties from openlp.core.common.mixins import LogMixin, RegistryProperties
from openlp.core.common.registry import Registry from openlp.core.common.registry import Registry
@ -38,31 +38,76 @@ from openlp.core.threading import ThreadWorker, run_thread
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class WebSocketWorker(ThreadWorker): async def handle_websocket(request, path):
"""
Handle web socket requests and return the poll information.
Check ever 0.2 seconds to get the latest position and send if changed.
Only gets triggered when 1st client attaches
:param request: request from client
:param path: determines the endpoints supported
:return:
"""
log.debug('WebSocket handler registered with client')
previous_poll = None
previous_main_poll = None
poller = Registry().get('poller')
if path == '/state':
while True:
current_poll = poller.poll()
if current_poll != previous_poll:
await request.send(json.dumps(current_poll).encode())
previous_poll = current_poll
await asyncio.sleep(0.2)
elif path == '/live_changed':
while True:
main_poll = poller.main_poll()
if main_poll != previous_main_poll:
await request.send(main_poll)
previous_main_poll = main_poll
await asyncio.sleep(0.2)
class WebSocketWorker(ThreadWorker, RegistryProperties, LogMixin):
""" """
A special Qt thread class to allow the WebSockets server to run at the same time as the UI. A special Qt thread class to allow the WebSockets server to run at the same time as the UI.
""" """
def __init__(self, server):
"""
Constructor for the thread class.
:param server: The http server class.
"""
self.ws_server = server
super().__init__()
def start(self): def start(self):
""" """
Run the worker. Run the worker.
""" """
self.ws_server.start_server() address = Settings().value('api/ip address')
port = Settings().value('api/websocket port')
# Start the event loop
event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(event_loop)
# Create the websocker server
loop = 1
self.server = None
while not self.server:
try:
self.server = serve(handle_websocket, address, port)
log.debug('WebSocket server started on {addr}:{port}'.format(addr=address, port=port))
except Exception as e:
log.exception('Failed to start WebSocket server')
loop += 1
time.sleep(0.1)
if not self.server and loop > 3:
log.error('Unable to start WebSocket server {addr}:{port}, giving up'.format(addr=address, port=port))
if self.server:
# If the websocket server exists, start listening
event_loop.run_until_complete(self.server)
event_loop.run_forever()
self.quit.emit() self.quit.emit()
def stop(self): def stop(self):
""" """
Stop the websocket server Stop the websocket server
""" """
self.ws_server.stop_server() if hasattr(self.server, 'ws_server'):
self.server.ws_server.close()
elif hasattr(self.server, 'server'):
self.server.server.close()
class WebSocketServer(RegistryProperties, LogMixin): class WebSocketServer(RegistryProperties, LogMixin):
@ -75,79 +120,5 @@ class WebSocketServer(RegistryProperties, LogMixin):
""" """
super(WebSocketServer, self).__init__() super(WebSocketServer, self).__init__()
if Registry().get_flag('no_web_server'): if Registry().get_flag('no_web_server'):
self.settings_section = 'api' worker = WebSocketWorker()
worker = WebSocketWorker(self)
run_thread(worker, 'websocket_server') run_thread(worker, 'websocket_server')
def start_server(self):
"""
Start the correct server and save the handler
"""
address = Settings().value(self.settings_section + '/ip address')
port = Settings().value(self.settings_section + '/websocket port')
self.start_websocket_instance(address, port)
# If web socket server start listening
if hasattr(self, 'ws_server') and self.ws_server:
event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(event_loop)
event_loop.run_until_complete(self.ws_server)
event_loop.run_forever()
else:
log.debug('Failed to start ws server on port {port}'.format(port=port))
def stop_server(self):
"""
Stop the websocket server
"""
if hasattr(self.ws_server, 'ws_server'):
self.ws_server.ws_server.close()
elif hasattr(self.ws_server, 'server'):
self.ws_server.server.close()
def start_websocket_instance(self, address, port):
"""
Start the server
:param address: The server address
:param port: The run port
"""
loop = 1
while loop < 4:
try:
self.ws_server = websockets.serve(self.handle_websocket, address, port)
log.debug("Web Socket Server started for class {address} {port}".format(address=address, port=port))
break
except Exception as e:
log.exception('Failed to start ws server {why}'.format(why=e))
loop += 1
time.sleep(0.1)
@staticmethod
async def handle_websocket(request, path):
"""
Handle web socket requests and return the poll information.
Check ever 0.2 seconds to get the latest position and send if changed.
Only gets triggered when 1st client attaches
:param request: request from client
:param path: determines the endpoints supported
:return:
"""
log.debug("web socket handler registered with client")
previous_poll = None
previous_main_poll = None
poller = Registry().get('poller')
if path == '/state':
while True:
current_poll = poller.poll()
if current_poll != previous_poll:
await request.send(json.dumps(current_poll).encode())
previous_poll = current_poll
await asyncio.sleep(0.2)
elif path == '/live_changed':
while True:
main_poll = poller.main_poll()
if main_poll != previous_main_poll:
await request.send(main_poll)
previous_main_poll = main_poll
await asyncio.sleep(0.2)