openlp_api_tester/test_api/apitest/runner.py

196 lines
7.8 KiB
Python
Raw Normal View History

2020-06-20 07:53:44 +00:00
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2020 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
2020-06-26 13:57:22 +00:00
import argparse
import json
import random
import threading
from collections import deque

import websocket
import yaml
from websocket import create_connection

from test_api.apitest.logger import print_error, print_ok, print_info, print_debug
from test_api.apitest.callbacks import *  # noqa E403
from test_api.apitest.task import Task
from test_api.apitest.task import human_delay
2020-06-20 07:53:44 +00:00
class RunTestsController(object):
    """
    Drive API test tasks against a running instance over HTTP and a websocket.

    The controller queues tasks read from a YAML command file, collects the state messages
    pushed over the websocket and compares successive state snapshots to find out which
    fields of their ``results`` mapping changed.
    """

    def __init__(self, address, http_port: str, ws_port: str):
        """
        :param address: Host name or IP address used to build the HTTP and websocket URLs.
        :param http_port: Port used for the HTTP API base URL.
        :param ws_port: Port used for the websocket URL.
        """
        self.address = address
        self.http_port = http_port
        self.ws_port = ws_port
        # Placeholder baseline with empty results; compare_stage() skips the diff while the
        # baseline results are empty.
        self.reserved_result_stage = json.dumps({'results': ''}).encode('utf-8')
        # Latest snapshot and the diff produced by the last compare_stage() call.
        self.result_stage = None
        self.stage_diff = {}
        self.received = False
        self.tasks = deque()
        self.pending = deque()
        self.ws_data = deque()
        self.base_url = f'http://{self.address}:{self.http_port}/api/v2/'
        # Set by connect(): the websocket application and the thread running it.
        self.ws = None
        self.wst = None

    def _is_connected(self) -> bool:
        """Return True when the listener created by connect() has a connected socket."""
        sock = self.ws.sock if self.ws else None
        return bool(sock and sock.connected)

    def connect(self) -> None:
        """
        Start the websocket listener on a daemon thread and wait for it to connect.

        The connection state is polled up to five times with ``human_delay(1)`` between polls.
        An error is logged if the socket is still not connected afterwards.
        """
        print_info("Starting thread")
        # Depending on the websocket-client release, callbacks are invoked with or without the
        # WebSocketApp as leading argument (and on_close may also receive a status code and
        # message). Adapt here and forward only what the handlers accept.
        self.ws = websocket.WebSocketApp(
            f'ws://{self.address}:{self.ws_port}',
            on_message=lambda *args: self.on_message(args[-1]),
            on_close=lambda *args: self.on_close(),
            on_open=lambda *args: self.on_open(),
            on_error=lambda *args: self.on_error(args[-1]),
        )
        self.wst = threading.Thread(target=self.ws.run_forever)
        self.wst.daemon = True
        self.wst.start()
        attempts_left = 5
        while not self._is_connected() and attempts_left:
            human_delay(1)
            attempts_left -= 1
        # Decide on the real connection state, not on the counter: the socket may have
        # connected during the final delay.
        if not self._is_connected():
            print_error("Could not connect to WS! Exiting.")

    def on_message(self, message: str) -> None:
        """
        Handle a message pushed over the websocket.

        While ``self.received`` is False the message replaces the baseline snapshot; afterwards
        messages are queued in ``self.ws_data`` for check_websocket_changes().

        :param message: The raw message payload.
        """
        # NOTE(review): nothing in this class sets self.received to True; presumably a task
        # toggles it through its controller reference - confirm.
        print_debug(f"Message returned: {message}")
        if not self.received:
            self.reserved_result_stage = message
        else:
            self.ws_data.append(message)

    def on_open(self) -> None:
        """Log that the websocket listener has opened."""
        print_info("Socket Listener Opened")

    def on_close(self) -> None:
        """Log that the websocket listener has closed."""
        print_info("Socket Listener Closed")

    def on_error(self, error: str) -> None:
        """
        Log an error reported by the websocket listener.

        :param error: The error reported by the websocket library.
        """
        print_error(f'WebSocket Error: {error}')

    def load_and_check_sockets(self, first_run: bool = False) -> bool:
        """
        Open a short-lived websocket connection and read one state message from it.

        :param first_run: When True, log progress messages. When False, a received message
            becomes the baseline snapshot if no baseline is stored yet.
        :return: True if a non-empty message was received, False otherwise.
        """
        ws = create_connection(f'ws://{self.address}:{self.ws_port}', ping_interval=None)
        try:
            if first_run:
                print_info('Establishing Connection to State')
            self.result_stage = ws.recv()
        finally:
            # Close the socket even when recv() raises.
            ws.close()
        if not self.result_stage:
            print_error('Not Connected to state ... you have issues')
            return False
        if first_run:
            print_ok('Connected to state... Good To test')
        elif not self.reserved_result_stage:
            self.reserved_result_stage = self.result_stage
        return True

    def read_command_file(self) -> None:
        """
        Load the YAML command file named by the first command line argument and run it.

        Every top-level entry except ``process_name`` is wrapped in a Task and queued; the
        queue is then processed. Exits with a usage error if no file argument was given.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument('rargs', nargs='*', default=[])
        args = parser.parse_args()
        if not args.rargs:
            parser.error('a command file must be given as the first argument')
        with open(args.rargs[0], 'r') as file:
            # NOTE(review): only feed trusted files here; yaml.safe_load would be the safer
            # choice if the command files need nothing beyond plain YAML types.
            commands = yaml.load(file, Loader=yaml.FullLoader)
        # An empty file loads as None; treat it as "no steps".
        for step, definition in (commands or {}).items():
            if step == 'process_name':
                print_info(f'Processing file {definition}')
                continue
            self.tasks.append(Task(self, definition))
        # Run everything once the whole file has been queued.
        self.process_queue()

    def compare_stage(self):
        """
        Diff the newest state snapshot against the stored baseline snapshot.

        Fills ``self.stage_diff`` with ``{field: {'before': old, 'after': new}}`` for every
        field of the ``results`` mapping whose value changed, then makes the newest snapshot
        the baseline. The diff is skipped while the baseline results are empty.

        :raises AssertionError: If both snapshots have results but a different number of fields.
        """
        self.stage_diff = {}
        # json.loads accepts str as well as bytes, so snapshots work whether the websocket
        # delivered them as text or as binary data.
        reserved = json.loads(self.reserved_result_stage)['results']
        current = json.loads(self.result_stage)['results']
        if len(reserved) > 0:
            # Explicit raise instead of ``assert`` so the check survives ``python -O``.
            if len(reserved) != len(current):
                raise AssertionError(
                    f'state field count changed from {len(reserved)} to {len(current)}')
            for key in current:
                if reserved[key] != current[key]:
                    self.stage_diff[key] = {'before': reserved[key], 'after': current[key]}
        self.reserved_result_stage = self.result_stage

    def process_queue(self):
        """
        Run queued tasks in order until both the task queue and the pending queue are empty.

        Whenever the task queue runs dry, one pending task is moved onto it.
        """
        while self.tasks:
            task = self.tasks.popleft()
            task.run()
            # we have finished but have pending tasks to complete
            if not self.tasks and self.pending:
                self.tasks.append(self.pending.popleft())

    def load_service_sequential(self) -> None:
        """Fetch the service items over HTTP and show each non-video item in order."""
        print_info('Load_and_process_service_sequential - old')
        # NOTE(review): `requests` and `service_item_show` are not imported explicitly in this
        # module; presumably they come from the callbacks star import - confirm.
        items = requests.get(f'{self.base_url}service/items')
        service = json.loads(items.text)
        # test sequentially
        for item in service:
            if item['plugin'] != 'video':
                service_item_show(self, item)

    def load_service_random(self) -> None:
        """Fetch the service items over HTTP and show randomly picked items, one pick per item."""
        print_info('Load_and process_service_random - old')
        # NOTE(review): `requests` and `service_item_show` are not imported explicitly in this
        # module; presumably they come from the callbacks star import - confirm.
        items = requests.get(f'{self.base_url}service/items')
        service = json.loads(items.text)
        limit = len(service)
        # 1-based positions, picked with replacement: items may repeat or be skipped.
        random_service = [random.randint(1, limit) for _ in range(limit)]
        print_error(str(random_service))
        for pos in random_service:
            service_item_show(self, service[pos - 1])

    def check_websocket_changes(self, manditory: int, optional: int) -> None:
        """
        Drain the queued websocket messages and check how many state fields each one changed.

        :param manditory: Number of fields that must have changed.
        :param optional: Number of additional fields that are allowed to change.
        """
        while self.ws_data:
            self.result_stage = self.ws_data.popleft()
            self.compare_stage()
            changed = len(self.stage_diff)
            if not manditory <= changed <= manditory + optional:
                print(f'{manditory} stage field(s) must have changed and '
                      f'{optional} may change changed- {str(self.stage_diff)}')
2020-06-26 13:57:22 +00:00
2020-06-20 07:53:44 +00:00
def compare_strings(a, b):
    """
    Print a human-readable comparison of two values.

    When both values are strings a character-level diff is printed, one line per deleted or
    added character. The reported position is the index within the diff output, not an index
    into either string. For any other combination a single line is printed if the values
    are not equal.

    :param a: The value before the change.
    :param b: The value after the change.
    """
    import difflib
    print('{} => {}'.format(a, b))
    # Only diff character-wise when both sides are strings; difflib.ndiff raises on a
    # non-sequence such as None.
    if isinstance(a, str) and isinstance(b, str):
        for position, entry in enumerate(difflib.ndiff(a, b)):
            marker = entry[0]
            if marker == '-':
                print('Delete "{}" from position {}'.format(entry[-1], position))
            elif marker == '+':
                print('Add "{}" to position {}'.format(entry[-1], position))
    elif a != b:
        print('{} => {} are different'.format(a, b))