# -*- coding: utf-8 -*- ########################################################################## # OpenLP - Open Source Lyrics Projection # # ---------------------------------------------------------------------- # # Copyright (c) 2008-2020 OpenLP Developers # # ---------------------------------------------------------------------- # # This program is free software: you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation, either version 3 of the License, or # # (at your option) any later version. # # # # This program is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # # GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # # along with this program. If not, see . # ########################################################################## import json import requests import string import time import random import threading import websocket from websocket import create_connection from test_api.apitest.constants import BookNames <<<<<<< HEAD from test_api.apitest.logger import print_text, print_error, print_ok, print_info ======= from test_api.apitest.logger import print_text, print_error, print_ok, print_info # from test_api.apitest.logger import print_debug >>>>>>> de2f630de03530dae811b691f625e645b76450e4 class RunTestsController(object): def __init__(self, address, http_port: str, ws_port: str): self.address = address self.http_port = http_port self.ws_port = ws_port self.reserved_result_stage = None self.received = False def connect(self) -> None: print_info("Starting thread") self.ws = websocket.WebSocketApp(f'ws://{self.address}:{self.ws_port}', on_message=self.on_message, on_close=self.on_close, on_open=self.on_open, on_error=self.on_error, ) self.wst = threading.Thread(target=lambda: self.ws.run_forever()) self.wst.daemon = True self.wst.start() conn_timeout = 5 while (not self.ws.sock or not self.ws.sock.connected) and conn_timeout: human_delay(1) conn_timeout -= 1 if not conn_timeout: print_error("Could not connect to WS! Exiting.") def on_message(self, message: str) -> None: <<<<<<< HEAD print_info("Message returned") ======= # print_debug("Message returned") >>>>>>> de2f630de03530dae811b691f625e645b76450e4 self.result_stage = message self.compare_stage() self.received = True def on_open(self) -> None: print_info("opened") def on_close(self) -> None: print_info("closed") def on_error(self, error: str) -> None: print_error(f'WebSocket Error: {error}') def load_and_check_sockets(self, first_run: bool = False) -> bool: ws = create_connection(f'ws://{self.address}:{self.ws_port}') if first_run: print_text('Establishing Connection to State') self.result_stage = ws.recv() good = True if self.result_stage: if first_run: print_ok('Connected to state... Good To test') else: if not self.reserved_result_stage: self.reserved_result_stage = self.result_stage else: good = False print_error('Not Connected to state ... you have issues') ws.close() return good def compare_stage(self): """ :return: """ if self.received: self.stage_diff = {} reserved = json.loads(self.reserved_result_stage.decode('utf-8'))['results'] current = json.loads(self.result_stage.decode('utf-8'))['results'] self.received = False # compare_strings(reserved, current) assert len(reserved) == len(current) for a, _ in current.items(): # compare_strings(reserved[a], current[a]) if reserved[a] != current[a]: self.stage_diff[a] = {'before': reserved[a], 'after': current[a]} self.reserved_result_stage = self.result_stage def marshal_full(self): print_ok('Running full test script') if self.load_and_check_sockets(True): clear_controllers(self) new_service(self) search_and_add(self, 'songs', 5) search_and_add(self, 'bibles', 5) search_and_add(self, 'images', 1) search_and_add(self, 'presentations', 1) search_and_add(self, 'custom', 3) self.load_service_sequential() clear_controllers(self) self.load_service_random() def marshal_media(self): print_ok('Running media test script') if self.load_and_check_sockets(True): clear_controllers(self) # search_and_live(self, 'songs', 1) # media_play(self) # human_delay() # clear_controllers(self) search_and_live(self, 'media', 1) media_play(self) human_delay(5) media_pause(self) human_delay(5) media_play(self) human_delay(5) media_stop(self) def marshal_blank(self): print_ok('Running blank test script') if self.load_and_check_sockets(True): new_service(self) def marshal_alert(self): print_ok('Running alert test script') if self.load_and_check_sockets(True): new_service(self) def load_service_sequential(self) -> None: print_text('Load_and_process_service_sequential') items = requests.get(f'http://{self.address}:{self.http_port}/api/v2/service/items') service = json.loads(items.text) # test sequentially for item in service: if item['plugin'] == 'video': pass else: service_item_show(self, item) def load_service_random(self) -> None: print_text('Load_and process_service_random') items = requests.get(f'http://{self.address}:{self.http_port}/api/v2/service/items') service = json.loads(items.text) limit = len(service) random_service = [random.randint(1, limit) for itr in range(limit)] print_error(str(random_service)) for pos in random_service: item = service[pos - 1] service_item_show(self, item) def compare_strings(a, b): import difflib print('{} => {}'.format(a, b)) if isinstance(a, str): for i, s in enumerate(difflib.ndiff(a, b)): if s[0] == ' ': continue elif s[0] == '-': print(u'Delete "{}" from position {}'.format(s[-1], i)) elif s[0] == '+': print(u'Add "{}" to position {}'.format(s[-1], i)) else: if a != b: print('{} => {} are different'.format(a, b)) def human_delay(delay: int = 2) -> None: time.sleep(delay) def media_play(rtc: RunTestsController) -> None: print_text('Media_play') base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/' ret = requests.post(base_url + 'media/play') assert ret.status_code == 400, f'{ret.status_code} returned' human_delay() def media_pause(rtc: RunTestsController) -> None: print_text('Media_pause') base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/' ret = requests.post(base_url + 'media/pause') assert ret.status_code == 204, f'{ret.status_code} returned' human_delay() def media_stop(rtc: RunTestsController) -> None: print_text('Media_stop') base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/' ret = requests.post(base_url + 'media/stop') assert ret.status_code == 204, f'{ret.status_code} returned' human_delay() def new_service(rtc: RunTestsController) -> None: print_text('New_service') base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/' ret = requests.get(base_url + 'service/new') assert ret.status_code == 204, f'{ret.status_code} returned' human_delay() check_websocket_changes(rtc, 1, 1) def clear_controllers(rtc: RunTestsController) -> None: print_text('Clear_controllers') base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/' ret = requests.post(base_url + 'controller/clear/live') assert ret.status_code == 204, f'{ret.status_code} returned from clear live' check_websocket_changes(rtc, 1, 1) # Preview clear should not impact the WebSockets. ret = requests.post(base_url + 'controller/clear/preview') assert ret.status_code == 204, f'{ret.status_code} returned from clear preview' check_websocket_changes(rtc, 0, 0) def search_and_live(rtc: RunTestsController, plugin: str, count: int) -> None: print_text(f'Search_and_live for {plugin}') base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/plugins/{plugin}/' while True: let = random.choice(string.ascii_letters) ret = requests.get(base_url + f'search?text={let}') if ret.status_code == 200 and len(json.loads(ret.text)) > 0: break assert ret.status_code == 200, f'{ret.status_code} returned from search' items = json.loads(ret.text) limit = len(items) - 1 if limit >= 0: random_service = [1] if limit > 0: random_service = [random.randint(1, limit) for itr in range(count)] for pos in random_service: item = items[pos - 1] ret = requests.post(base_url + 'live', json=dict(id=item[0])) assert ret.status_code == 204, f'{ret.status_code} returned from add' human_delay() def search_and_add(rtc: RunTestsController, plugin: str, count: int) -> None: print_text(f'Search_and_add for {plugin}') base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/plugins/{plugin}/' if plugin == 'bibles': for i in range(1, count): bk_id = random.randint(1, len(BookNames) - 1) bk = BookNames[bk_id] ch = random.randint(1, 10) vse = random.randint(1, 10) let = f'{bk} {ch}:1-{vse}' ret = requests.get(base_url + f'search?text={let}') assert ret.status_code == 200, f'{ret.status_code} returned from searcg' items = json.loads(ret.text) if items: ret = requests.post(base_url + 'add', json=dict(id=items[0][0])) assert ret.status_code == 204, f'{ret.status_code} returned from add' human_delay() else: if plugin == 'media': while True: let = random.choice(string.ascii_letters) ret = requests.get(base_url + f'search?text={let}') if ret.status_code == 200: break else: let = random.choice(string.ascii_letters) ret = requests.get(base_url + f'search?text={let}') assert ret.status_code == 200, f'{ret.status_code} returned from search' items = json.loads(ret.text) limit = len(items) - 1 if limit >= 0: random_service = [1] if limit > 0: random_service = [random.randint(1, limit) for itr in range(count)] for pos in random_service: item = items[pos - 1] ret = requests.post(base_url + 'add', json=dict(id=item[0])) assert ret.status_code == 204, f'{ret.status_code} returned from add' human_delay() def service_item_show(rtc: RunTestsController, item: dict) -> None: print_text('Service_item_show') title = item['title'] id = item['id'] print_text(f'test_service_song {title} {id}') base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/' ret = requests.post(base_url + 'service/show', json=dict(uid=id)) human_delay() assert ret.status_code == 204, ret.status_code check_websocket_changes(rtc, 1, 2) live_item(rtc, item['plugin']) def live_item(rtc: RunTestsController, plugin: str) -> None: print_text(f'test_live_item - {plugin}') base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/' ret = requests.get(base_url + 'controller/live-item') assert ret.status_code == 200, f'{ret.status_code} returned from live_item' i = 0 for _ in json.loads(ret.text): ret = requests.post(base_url + 'controller/show', json=dict(id=i)) i += 1 human_delay() assert ret.status_code == 204, f'{ret.status_code} returned from show' if plugin in {'image'}: check_websocket_changes(rtc, 0, 0) else: check_websocket_changes(rtc, 1, 2) def check_websocket_changes(rtc: RunTestsController, manditary: int, optional: int) -> None: while not rtc.received: time.sleep(0.1) if manditary <= len(rtc.stage_diff) <= optional: pass else: print(f'{manditary} stage field(s) must have changed and {optional} may change changed- {str(rtc.stage_diff)}')