Update WebSocket code to run on Thread and test from Thread.

This commit is contained in:
Tim 2020-06-20 11:18:05 +01:00
parent b616f41b4e
commit 3281f241fa
No known key found for this signature in database
GPG Key ID: 3D454289AF831A6D
3 changed files with 53 additions and 70 deletions

View File

@ -23,11 +23,13 @@ import requests
import string
import time
import random
import threading
import websocket
from websocket import create_connection
from test_api.apitest.constants import BookNames
from test_api.apitest.logger import print_text, print_error, print_ok
from test_api.apitest.logger import print_text, print_error, print_ok, print_info
class RunTestsController(object):
@ -37,7 +39,41 @@ class RunTestsController(object):
self.http_port = http_port
self.ws_port = ws_port
self.reserved_result_stage = None
self.reserved_result_live = None
self.received = False
def connect(self) -> None:
print_info("Starting thread")
self.ws = websocket.WebSocketApp(f'ws://{self.address}:{self.ws_port}',
on_message=self.on_message,
on_close=self.on_close,
on_open=self.on_open,
on_error=self.on_error,
)
self.wst = threading.Thread(target=lambda: self.ws.run_forever())
self.wst.daemon = True
self.wst.start()
conn_timeout = 5
while (not self.ws.sock or not self.ws.sock.connected) and conn_timeout:
human_delay(1)
conn_timeout -= 1
if not conn_timeout:
print_error("Could not connect to WS! Exiting.")
def on_message(self, message: str) -> None:
print_info("Message returned")
self.result_stage = message
self.compare_stage()
self.received = True
def on_open(self) -> None:
print_info("opened")
def on_close(self) -> None:
print_info("closed")
def on_error(self, error: str) -> None:
print_error(f'WebSocket Error: {error}')
def load_and_check_sockets(self, first_run: bool = False) -> bool:
ws = create_connection(f'ws://{self.address}:{self.ws_port}')
@ -61,16 +97,18 @@ class RunTestsController(object):
"""
:return:
"""
self.stage_diff = {}
reserved = json.loads(self.reserved_result_stage.decode('utf-8'))['results']
current = json.loads(self.result_stage.decode('utf-8'))['results']
# compare_strings(reserved, current)
assert len(reserved) == len(current)
for a, _ in current.items():
# compare_strings(reserved[a], current[a])
if reserved[a] != current[a]:
self.stage_diff[a] = {'before': reserved[a], 'after': current[a]}
self.reserved_result_stage = self.result_stage
if self.received:
self.stage_diff = {}
reserved = json.loads(self.reserved_result_stage.decode('utf-8'))['results']
current = json.loads(self.result_stage.decode('utf-8'))['results']
self.received = False
# compare_strings(reserved, current)
assert len(reserved) == len(current)
for a, _ in current.items():
# compare_strings(reserved[a], current[a])
if reserved[a] != current[a]:
self.stage_diff[a] = {'before': reserved[a], 'after': current[a]}
self.reserved_result_stage = self.result_stage
def marshal_full(self):
print_ok('Running full test script')
@ -295,8 +333,8 @@ def live_item(rtc: RunTestsController, plugin: str) -> None:
def check_websocket_changes(rtc: RunTestsController, manditary: int, optional: int) -> None:
rtc.load_and_check_sockets()
rtc.compare_stage()
while not rtc.received:
time.sleep(0.1)
if manditary <= len(rtc.stage_diff) <= optional:
pass
else:

View File

@ -41,6 +41,7 @@ def start() -> None:
print_ok(f'OpenLP is running network Address {op_address}')
print_ok(f'OpenLP is running on port (ws) {op_ws_port}')
rtc = RunTestsController(op_address, op_http_port, op_ws_port)
rtc.connect()
rtc.marshal_full()
#nrtc.marshal_media()
print_text('Finished running tests')

56
x1.py
View File

@ -1,56 +0,0 @@
import websocket
import ssl
import sys
from time import sleep
import threading
class MyWebSocket():
def connect(self, wsURL="wss://www.bitmex.com/realtime?subscribe=quote:XBTUSD"):
print("Starting thread")
ssl_defaults = ssl.get_default_verify_paths()
sslopt_ca_certs = {"ca_certs": ssl_defaults.cafile}
self.ws = websocket.WebSocketApp(wsURL,
on_message=self.__on_message,
on_close=self.__on_close,
on_open=self.__on_open,
on_error=self.__on_error,
)
self.wst = threading.Thread(target=lambda: self.ws.run_forever(sslopt=sslopt_ca_certs))
self.wst.daemon = True
self.wst.start()
conn_timeout = 5
while (not self.ws.sock or not self.ws.sock.connected) and conn_timeout:
sleep(1)
conn_timeout -= 1
if not conn_timeout:
print("Could not connect to WS! Exiting.")
sys.exit(1)
def __on_message(self, message):
print(message)
def __on_open(self):
print("opened")
def __on_close(self):
print("closed")
def __on_error(self, error):
print("error")
def run(self):
self.connect()
while True:
print("hello mum")
sleep(1)
websocket.enableTrace(True)
mysocket = MyWebSocket()
mysocket.run()
#mysocket.connect()
#while True:#
# sleep(1)
print("the end")