mirror of
https://gitlab.com/openlp/openlp_api_tester.git
synced 2024-12-25 11:14:08 +00:00
199 lines
7.8 KiB
Python
199 lines
7.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
##########################################################################
|
|
# OpenLP - Open Source Lyrics Projection #
|
|
# ---------------------------------------------------------------------- #
|
|
# Copyright (c) 2008-2020 OpenLP Developers #
|
|
# ---------------------------------------------------------------------- #
|
|
# This program is free software: you can redistribute it and/or modify #
|
|
# it under the terms of the GNU General Public License as published by #
|
|
# the Free Software Foundation, either version 3 of the License, or #
|
|
# (at your option) any later version. #
|
|
# #
|
|
# This program is distributed in the hope that it will be useful, #
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
|
# GNU General Public License for more details. #
|
|
# #
|
|
# You should have received a copy of the GNU General Public License #
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
|
##########################################################################
|
|
import argparse
|
|
import threading
|
|
import websocket
|
|
import yaml
|
|
|
|
from collections import deque
|
|
from websocket import create_connection
|
|
|
|
from test_api.apitest.logger import print_error, print_ok, print_info, print_debug
|
|
|
|
from test_api.apitest.callbacks import * # noqa E403
|
|
|
|
|
|
from test_api.apitest.task import Task
|
|
from test_api.apitest.task import human_delay
|
|
|
|
|
|
class RunTestsController(object):
|
|
|
|
def __init__(self, address, http_port: str, ws_port: str):
|
|
self.address = address
|
|
self.http_port = http_port
|
|
self.ws_port = ws_port
|
|
reserved_result_stage = {'results': ''}
|
|
reserved_result_stage = json.dumps(reserved_result_stage)
|
|
self.reserved_result_stage = bytes(reserved_result_stage, 'utf-8')
|
|
self.received = False
|
|
self.tasks = deque()
|
|
self.pending = deque()
|
|
self.ws_data = deque()
|
|
self.base_url = f'http://{self.address}:{self.http_port}/api/v2/'
|
|
|
|
def connect(self) -> None:
|
|
print_info("Starting thread")
|
|
self.ws = websocket.WebSocketApp(f'ws://{self.address}:{self.ws_port}',
|
|
on_message=self.on_message,
|
|
on_close=self.on_close,
|
|
on_open=self.on_open,
|
|
on_error=self.on_error,
|
|
)
|
|
self.wst = threading.Thread(target=lambda: self.ws.run_forever())
|
|
self.wst.daemon = True
|
|
self.wst.start()
|
|
|
|
conn_timeout = 5
|
|
while (not self.ws.sock or not self.ws.sock.connected) and conn_timeout:
|
|
human_delay(1)
|
|
conn_timeout -= 1
|
|
if not conn_timeout:
|
|
print_error("Could not connect to WS! Exiting.")
|
|
|
|
def on_message(self, message: str) -> None:
|
|
print_debug(f"Message returned: {message}")
|
|
if not self.received:
|
|
self.reserved_result_stage = message
|
|
else:
|
|
self.ws_data.append(message)
|
|
|
|
@staticmethod
|
|
def on_open() -> None:
|
|
print_info("Socket Listener Opened")
|
|
|
|
@staticmethod
|
|
def on_close() -> None:
|
|
print_info("Socket Listener Closed")
|
|
|
|
@staticmethod
|
|
def on_error(error: str) -> None:
|
|
print_error(f'WebSocket Error: {error}')
|
|
|
|
def load_and_check_sockets(self, first_run: bool = False) -> bool:
|
|
"""
|
|
Method to make connect to websockets
|
|
:param first_run:
|
|
:return:
|
|
"""
|
|
ws = create_connection(f'ws://{self.address}:{self.ws_port}', ping_interval=None)
|
|
if first_run:
|
|
print_text('Establishing Connection to State')
|
|
self.result_stage = ws.recv()
|
|
good = True
|
|
if self.result_stage:
|
|
if first_run:
|
|
print_ok('Connected to state... Good To test')
|
|
else:
|
|
if not self.reserved_result_stage:
|
|
self.reserved_result_stage = self.result_stage
|
|
else:
|
|
good = False
|
|
print_error('Not Connected to state ... you have issues')
|
|
ws.close()
|
|
return good
|
|
|
|
def read_command_file(self) -> None:
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('rargs', nargs='*', default=[])
|
|
a = parser.parse_args()
|
|
with open(a.rargs[0], 'r') as file:
|
|
commands = yaml.load(file, Loader=yaml.FullLoader)
|
|
for step in commands:
|
|
if step == 'process_name':
|
|
print_info(f'Processing file {commands[step]}')
|
|
continue
|
|
self.tasks.append(Task(self, commands[step]))
|
|
self.process_queue()
|
|
|
|
def compare_stage(self):
|
|
"""
|
|
:return:
|
|
"""
|
|
self.stage_diff = {}
|
|
reserved = json.loads(self.reserved_result_stage.decode('utf-8'))['results']
|
|
current = json.loads(self.result_stage.decode('utf-8'))['results']
|
|
# compare_strings(reserved, current)
|
|
if len(reserved) > 0:
|
|
assert len(reserved) == len(current)
|
|
for a, _ in current.items():
|
|
# compare_strings(reserved[a], current[a])
|
|
if reserved[a] != current[a]:
|
|
self.stage_diff[a] = {'before': reserved[a], 'after': current[a]}
|
|
self.reserved_result_stage = self.result_stage
|
|
|
|
def process_queue(self):
|
|
while len(self.tasks) > 0:
|
|
task = self.tasks.popleft()
|
|
task.run()
|
|
# we have finished but have pending tasks to complete
|
|
if len(self.tasks) == 0 and len(self.pending) > 0:
|
|
pend = self.pending.popleft()
|
|
self.tasks.append(pend)
|
|
|
|
def load_service_sequential(self) -> None:
|
|
print_text('Load_and_process_service_sequential - old')
|
|
items = requests.get(f'http://{self.address}:{self.http_port}/api/v2/service/items')
|
|
service = json.loads(items.text)
|
|
# test sequentially
|
|
for item in service:
|
|
if item['plugin'] == 'video':
|
|
pass
|
|
else:
|
|
service_item_show(self, item)
|
|
|
|
def load_service_random(self) -> None:
|
|
print_text('Load_and process_service_random - old')
|
|
items = requests.get(f'http://{self.address}:{self.http_port}/api/v2/service/items')
|
|
service = json.loads(items.text)
|
|
limit = len(service)
|
|
random_service = [random.randint(1, limit) for itr in range(limit)]
|
|
print_error(str(random_service))
|
|
for pos in random_service:
|
|
item = service[pos - 1]
|
|
service_item_show(self, item)
|
|
|
|
def check_websocket_changes(self, mandatory: int, optional: int) -> None:
|
|
while len(self.ws_data) > 0:
|
|
message = self.ws_data.popleft()
|
|
self.result_stage = message
|
|
self.compare_stage()
|
|
if mandatory <= len(self.stage_diff) <= mandatory + optional:
|
|
pass
|
|
else:
|
|
print(f'{mandatory} stage field(s) must have changed and '
|
|
f'{optional} may change changed- {str(self.stage_diff)}')
|
|
|
|
|
|
def compare_strings(a, b):
|
|
import difflib
|
|
print('{} => {}'.format(a, b))
|
|
if isinstance(a, str):
|
|
for i, s in enumerate(difflib.ndiff(a, b)):
|
|
if s[0] == ' ':
|
|
continue
|
|
elif s[0] == '-':
|
|
print(u'Delete "{}" from position {}'.format(s[-1], i))
|
|
elif s[0] == '+':
|
|
print(u'Add "{}" to position {}'.format(s[-1], i))
|
|
else:
|
|
if a != b:
|
|
print('{} => {} are different'.format(a, b))
|