# -*- coding: utf-8 -*-

##########################################################################
# OpenLP - Open Source Lyrics Projection                                 #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2020 OpenLP Developers                              #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify   #
# it under the terms of the GNU General Public License as published by   #
# the Free Software Foundation, either version 3 of the License, or      #
# (at your option) any later version.                                    #
#                                                                        #
# This program is distributed in the hope that it will be useful,        #
# but WITHOUT ANY WARRANTY; without even the implied warranty of         #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the           #
# GNU General Public License for more details.                           #
#                                                                        #
# You should have received a copy of the GNU General Public License      #
# along with this program. If not, see <https://www.gnu.org/licenses/>.  #
##########################################################################
"""
Controller used by the API test runner: it reads a YAML command file, queues
the resulting tasks and checks the state published on the WebSocket port.
"""
import argparse
import json
import random
import threading
from collections import deque

import websocket
import yaml
from websocket import create_connection

from test_api.apitest.logger import print_error, print_ok, print_info, print_debug
# NOTE(review): ``requests``, ``print_text`` and ``service_item_show`` are not
# imported explicitly anywhere in this file; presumably they are provided by
# the wildcard import below - confirm against test_api.apitest.callbacks.
from test_api.apitest.callbacks import *  # noqa E403
from test_api.apitest.task import Task
from test_api.apitest.task import human_delay


class RunTestsController(object):
    """
    Holds the connection details, the task queues and the last known
    WebSocket state for one test run.
    """

    def __init__(self, address, http_port: str, ws_port: str):
        """
        Store the connection details and create the empty work queues.

        :param address: Host name or IP address used to build the HTTP and WebSocket URLs
        :param http_port: Port used for the ``/api/v2/`` HTTP requests
        :param ws_port: Port used for the WebSocket connections
        """
        self.address = address
        self.http_port = http_port
        self.ws_port = ws_port
        # Baseline payload with an empty 'results' entry, stored as UTF-8 bytes.
        self.reserved_result_stage = json.dumps({'results': ''}).encode('utf-8')
        self.received = False
        self.tasks = deque()
        self.pending = deque()
        self.ws_data = deque()
        self.base_url = f'http://{self.address}:{self.http_port}/api/v2/'
        # These are assigned by connect(), load_and_check_sockets() and
        # compare_stage(); defining them here means reading them before those
        # calls does not raise AttributeError.
        self.ws = None
        self.wst = None
        self.result_stage = None
        self.stage_diff = {}

    def _is_connected(self) -> bool:
        """
        Report whether the listener created by connect() currently has a connected socket.

        :return: True when ``self.ws.sock`` exists and reports ``connected``
        """
        sock = self.ws.sock
        return bool(sock and sock.connected)

    def connect(self) -> None:
        """
        Start the WebSocket listener in a daemon thread and wait up to five
        one-second delays for it to connect.
        """
        print_info("Starting thread")
        self.ws = websocket.WebSocketApp(f'ws://{self.address}:{self.ws_port}',
                                         on_message=self.on_message,
                                         on_close=self.on_close,
                                         on_open=self.on_open,
                                         on_error=self.on_error,
                                         )
        self.wst = threading.Thread(target=self.ws.run_forever, daemon=True)
        self.wst.start()
        conn_timeout = 5
        while not self._is_connected() and conn_timeout:
            human_delay(1)
            conn_timeout -= 1
        # Test the socket itself rather than the counter: a connection made
        # during the final delay leaves the counter at zero but is a success.
        if not self._is_connected():
            print_error("Could not connect to WS! Exiting.")

    def on_message(self, message: str, *extra) -> None:
        """
        Handle a message from the listener.

        While ``self.received`` is false the message replaces the stored
        baseline, otherwise it is queued on ``self.ws_data``.
        NOTE(review): nothing in this file sets ``self.received`` to True;
        presumably a task does - verify against the callers.

        :param message: The payload, or the WebSocketApp object when the library passes it first
        :param extra: Remaining callback arguments; when present the last one is the payload
        """
        if extra:
            # Called as (app, message): the payload is the final argument.
            message = extra[-1]
        print_debug(f"Message returned: {message}")
        if not self.received:
            self.reserved_result_stage = message
        else:
            self.ws_data.append(message)

    @staticmethod
    def on_open(*_args) -> None:
        """
        Log that the listener opened.

        :param _args: Callback arguments supplied by the library; ignored
        """
        print_info("Socket Listener Opened")

    @staticmethod
    def on_close(*_args) -> None:
        """
        Log that the listener closed.

        :param _args: Callback arguments supplied by the library; ignored
        """
        print_info("Socket Listener Closed")

    @staticmethod
    def on_error(error: str, *extra) -> None:
        """
        Log an error reported by the listener.

        :param error: The error, or the WebSocketApp object when the library passes it first
        :param extra: Remaining callback arguments; when present the last one is the error
        """
        if extra:
            error = extra[-1]
        print_error(f'WebSocket Error: {error}')

    def load_and_check_sockets(self, first_run: bool = False) -> bool:
        """
        Open a short-lived WebSocket connection and read one message into ``self.result_stage``.

        :param first_run: When True, progress messages are printed and the baseline is left untouched
        :return: True when a non-empty message was received, otherwise False
        """
        ws = create_connection(f'ws://{self.address}:{self.ws_port}', ping_interval=None)
        try:
            if first_run:
                print_text('Establishing Connection to State')
            self.result_stage = ws.recv()
        finally:
            # Close the socket even when recv() raises.
            ws.close()
        if not self.result_stage:
            print_error('Not Connected to state ... you have issues')
            return False
        if first_run:
            print_ok('Connected to state... Good To test')
        elif not self.reserved_result_stage:
            self.reserved_result_stage = self.result_stage
        return True

    def read_command_file(self) -> None:
        """
        Load the YAML command file named by the first command line argument,
        queue a Task for every step and process the queue.

        The ``process_name`` entry is only reported; it does not create a task.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument('rargs', nargs='*', default=[])
        a = parser.parse_args()
        if not a.rargs:
            # Exit with a usage message instead of an IndexError below.
            parser.error('the path of a command file is required')
        with open(a.rargs[0], 'r') as file:
            # NOTE(review): FullLoader can construct some Python objects;
            # yaml.safe_load would be the safer choice if command files can
            # ever come from an untrusted source.
            commands = yaml.load(file, Loader=yaml.FullLoader)
        for step in commands:
            if step == 'process_name':
                print_info(f'Processing file {commands[step]}')
                continue
            self.tasks.append(Task(self, commands[step]))
        self.process_queue()

    @staticmethod
    def _stage_results(payload):
        """
        Return the ``results`` entry of a JSON payload.

        :param payload: JSON document as ``str`` or ``bytes``; json.loads accepts both
        :return: The value stored under ``results``
        """
        return json.loads(payload)['results']

    def compare_stage(self):
        """
        Compare the baseline payload with ``self.result_stage`` and record the
        differing fields in ``self.stage_diff`` as ``{'before': ..., 'after': ...}``.

        Nothing is compared while the baseline results are empty. Afterwards
        the current payload becomes the new baseline.
        NOTE(review): the original indentation was lost; the baseline update is
        assumed to run unconditionally at the end of the method.

        :return: None
        """
        self.stage_diff = {}
        reserved = self._stage_results(self.reserved_result_stage)
        current = self._stage_results(self.result_stage)
        # compare_strings(reserved, current) can be called here when debugging.
        if reserved:
            if len(reserved) != len(current):
                # Raised explicitly so the check is not stripped by ``python -O``.
                raise AssertionError(f'Stage field count changed from {len(reserved)} to {len(current)}')
            self.stage_diff = {
                key: {'before': reserved[key], 'after': value}
                for key, value in current.items()
                if reserved[key] != value
            }
        self.reserved_result_stage = self.result_stage

    def process_queue(self):
        """
        Run the queued tasks in order. Each time the task queue runs empty,
        one pending task (if any) is moved onto it, so pending tasks run last.
        """
        while self.tasks:
            task = self.tasks.popleft()
            task.run()
            # we have finished but have pending tasks to complete
            if not self.tasks and self.pending:
                self.tasks.append(self.pending.popleft())

    def _fetch_service_items(self):
        """
        Request ``service/items`` from the HTTP API and decode the JSON body.

        :return: The decoded response body
        """
        items = requests.get(f'http://{self.address}:{self.http_port}/api/v2/service/items')
        return json.loads(items.text)

    def load_service_sequential(self) -> None:
        """
        Show every service item in order, skipping items whose plugin is ``video``.
        """
        print_text('Load_and_process_service_sequential - old')
        # test sequentially
        for item in self._fetch_service_items():
            if item['plugin'] != 'video':
                service_item_show(self, item)

    def load_service_random(self) -> None:
        """
        Show as many randomly chosen service items as the service contains.

        Positions are drawn with replacement, so an item may be shown more
        than once and others not at all.
        """
        print_text('Load_and process_service_random - old')
        service = self._fetch_service_items()
        limit = len(service)
        # 1-based positions; converted to list indexes when used below.
        random_service = [random.randint(1, limit) for _ in range(limit)]
        print_error(str(random_service))
        for pos in random_service:
            service_item_show(self, service[pos - 1])

    def check_websocket_changes(self, mandatory: int, optional: int) -> None:
        """
        Drain ``self.ws_data``, compare each message with the baseline and
        report when the number of changed fields is outside the expected range.

        NOTE(review): the original indentation was lost; the range check is
        assumed to run once per queued message - confirm against upstream.

        :param mandatory: Number of stage fields that must have changed
        :param optional: Number of additional stage fields that may have changed
        """
        while self.ws_data:
            self.result_stage = self.ws_data.popleft()
            self.compare_stage()
            if not mandatory <= len(self.stage_diff) <= mandatory + optional:
                print(f'{mandatory} stage field(s) must have changed and '
                      f'{optional} may change changed- {str(self.stage_diff)}')


def compare_strings(a, b):
    """
    Debug helper which prints how ``a`` differs from ``b``.

    Two strings are compared character by character with difflib.ndiff; any
    other pair of values is only reported when the values are unequal.

    :param a: The original value
    :param b: The value to compare against
    """
    import difflib

    print('{} => {}'.format(a, b))
    # ndiff needs two sequences, so require both values to be strings.
    if isinstance(a, str) and isinstance(b, str):
        for i, s in enumerate(difflib.ndiff(a, b)):
            if s[0] == '-':
                print(u'Delete "{}" from position {}'.format(s[-1], i))
            elif s[0] == '+':
                print(u'Add "{}" to position {}'.format(s[-1], i))
    elif a != b:
        print('{} => {} are different'.format(a, b))