mirror of
https://gitlab.com/openlp/openlp_api_tester.git
synced 2024-12-25 11:14:08 +00:00
342 lines
13 KiB
Python
342 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
##########################################################################
|
|
# OpenLP - Open Source Lyrics Projection #
|
|
# ---------------------------------------------------------------------- #
|
|
# Copyright (c) 2008-2020 OpenLP Developers #
|
|
# ---------------------------------------------------------------------- #
|
|
# This program is free software: you can redistribute it and/or modify #
|
|
# it under the terms of the GNU General Public License as published by #
|
|
# the Free Software Foundation, either version 3 of the License, or #
|
|
# (at your option) any later version. #
|
|
# #
|
|
# This program is distributed in the hope that it will be useful, #
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
|
# GNU General Public License for more details. #
|
|
# #
|
|
# You should have received a copy of the GNU General Public License #
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
|
##########################################################################
|
|
import json
|
|
import requests
|
|
import string
|
|
import time
|
|
import random
|
|
import threading
|
|
import websocket
|
|
|
|
from websocket import create_connection
|
|
from test_api.apitest.constants import BookNames
|
|
|
|
from test_api.apitest.logger import print_text, print_error, print_ok, print_info
|
|
|
|
|
|
class RunTestsController(object):
|
|
|
|
def __init__(self, address, http_port: str, ws_port: str):
|
|
self.address = address
|
|
self.http_port = http_port
|
|
self.ws_port = ws_port
|
|
self.reserved_result_stage = None
|
|
self.received = False
|
|
|
|
def connect(self) -> None:
|
|
print_info("Starting thread")
|
|
self.ws = websocket.WebSocketApp(f'ws://{self.address}:{self.ws_port}',
|
|
on_message=self.on_message,
|
|
on_close=self.on_close,
|
|
on_open=self.on_open,
|
|
on_error=self.on_error,
|
|
)
|
|
self.wst = threading.Thread(target=lambda: self.ws.run_forever())
|
|
self.wst.daemon = True
|
|
self.wst.start()
|
|
|
|
conn_timeout = 5
|
|
while (not self.ws.sock or not self.ws.sock.connected) and conn_timeout:
|
|
human_delay(1)
|
|
conn_timeout -= 1
|
|
if not conn_timeout:
|
|
print_error("Could not connect to WS! Exiting.")
|
|
|
|
def on_message(self, message: str) -> None:
|
|
print_info("Message returned")
|
|
self.result_stage = message
|
|
self.compare_stage()
|
|
self.received = True
|
|
|
|
def on_open(self) -> None:
|
|
print_info("opened")
|
|
|
|
def on_close(self) -> None:
|
|
print_info("closed")
|
|
|
|
def on_error(self, error: str) -> None:
|
|
print_error(f'WebSocket Error: {error}')
|
|
|
|
def load_and_check_sockets(self, first_run: bool = False) -> bool:
|
|
ws = create_connection(f'ws://{self.address}:{self.ws_port}')
|
|
if first_run:
|
|
print_text('Establishing Connection to State')
|
|
self.result_stage = ws.recv()
|
|
good = True
|
|
if self.result_stage:
|
|
if first_run:
|
|
print_ok('Connected to state... Good To test')
|
|
else:
|
|
if not self.reserved_result_stage:
|
|
self.reserved_result_stage = self.result_stage
|
|
else:
|
|
good = False
|
|
print_error('Not Connected to state ... you have issues')
|
|
ws.close()
|
|
return good
|
|
|
|
def compare_stage(self):
|
|
"""
|
|
:return:
|
|
"""
|
|
if self.received:
|
|
self.stage_diff = {}
|
|
reserved = json.loads(self.reserved_result_stage.decode('utf-8'))['results']
|
|
current = json.loads(self.result_stage.decode('utf-8'))['results']
|
|
self.received = False
|
|
# compare_strings(reserved, current)
|
|
assert len(reserved) == len(current)
|
|
for a, _ in current.items():
|
|
# compare_strings(reserved[a], current[a])
|
|
if reserved[a] != current[a]:
|
|
self.stage_diff[a] = {'before': reserved[a], 'after': current[a]}
|
|
self.reserved_result_stage = self.result_stage
|
|
|
|
def marshal_full(self):
|
|
print_ok('Running full test script')
|
|
if self.load_and_check_sockets(True):
|
|
clear_controllers(self)
|
|
new_service(self)
|
|
search_and_add(self, 'songs', 5)
|
|
search_and_add(self, 'bibles', 5)
|
|
search_and_add(self, 'images', 1)
|
|
search_and_add(self, 'presentations', 1)
|
|
search_and_add(self, 'custom', 3)
|
|
self.load_service_sequential()
|
|
clear_controllers(self)
|
|
self.load_service_random()
|
|
|
|
def marshal_media(self):
|
|
print_ok('Running media test script')
|
|
if self.load_and_check_sockets(True):
|
|
clear_controllers(self)
|
|
#search_and_live(self, 'songs', 1)
|
|
#media_play(self)
|
|
#human_delay()
|
|
#clear_controllers(self)
|
|
search_and_live(self, 'media', 1)
|
|
media_play(self)
|
|
human_delay(5)
|
|
media_pause(self)
|
|
human_delay(5)
|
|
media_play(self)
|
|
human_delay(5)
|
|
media_stop(self)
|
|
|
|
def marshal_blank(self):
|
|
print_ok('Running blank test script')
|
|
if self.load_and_check_sockets(True):
|
|
new_service(self)
|
|
|
|
def marshal_alert(self):
|
|
print_ok('Running alert test script')
|
|
if self.load_and_check_sockets(True):
|
|
new_service(self)
|
|
|
|
def load_service_sequential(self) -> None:
|
|
print_text('Load_and_process_service_sequential')
|
|
items = requests.get(f'http://{self.address}:{self.http_port}/api/v2/service/items')
|
|
service = json.loads(items.text)
|
|
# test sequentially
|
|
for item in service:
|
|
if item['plugin'] == 'video':
|
|
pass
|
|
else:
|
|
service_item_show(self, item)
|
|
|
|
def load_service_random(self) -> None:
|
|
print_text('Load_and process_service_random')
|
|
items = requests.get(f'http://{self.address}:{self.http_port}/api/v2/service/items')
|
|
service = json.loads(items.text)
|
|
limit = len(service)
|
|
random_service = [random.randint(1, limit) for itr in range(limit)]
|
|
print_error(str(random_service))
|
|
for pos in random_service:
|
|
item = service[pos - 1]
|
|
service_item_show(self, item)
|
|
|
|
|
|
def compare_strings(a, b):
|
|
import difflib
|
|
print('{} => {}'.format(a, b))
|
|
if isinstance(a, str):
|
|
for i, s in enumerate(difflib.ndiff(a, b)):
|
|
if s[0] == ' ':
|
|
continue
|
|
elif s[0] == '-':
|
|
print(u'Delete "{}" from position {}'.format(s[-1], i))
|
|
elif s[0] == '+':
|
|
print(u'Add "{}" to position {}'.format(s[-1], i))
|
|
else:
|
|
if a != b:
|
|
print('{} => {} are different'.format(a, b))
|
|
|
|
|
|
def human_delay(delay: int = 2) -> None:
|
|
time.sleep(delay)
|
|
|
|
|
|
def media_play(rtc: RunTestsController) -> None:
|
|
print_text('Media_play')
|
|
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
|
|
ret = requests.post(base_url + 'media/play')
|
|
assert ret.status_code == 400, f'{ret.status_code} returned'
|
|
human_delay()
|
|
|
|
|
|
def media_pause(rtc: RunTestsController) -> None:
|
|
print_text('Media_pause')
|
|
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
|
|
ret = requests.post(base_url + 'media/pause')
|
|
assert ret.status_code == 204, f'{ret.status_code} returned'
|
|
human_delay()
|
|
|
|
|
|
def media_stop(rtc: RunTestsController) -> None:
|
|
print_text('Media_stop')
|
|
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
|
|
ret = requests.post(base_url + 'media/stop')
|
|
assert ret.status_code == 204, f'{ret.status_code} returned'
|
|
human_delay()
|
|
|
|
|
|
def new_service(rtc: RunTestsController) -> None:
|
|
print_text('New_service')
|
|
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
|
|
ret = requests.get(base_url + 'service/new')
|
|
assert ret.status_code == 204, f'{ret.status_code} returned'
|
|
human_delay()
|
|
check_websocket_changes(rtc, 1, 1)
|
|
|
|
|
|
def clear_controllers(rtc: RunTestsController) -> None:
|
|
print_text('Clear_controllers')
|
|
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
|
|
ret = requests.post(base_url + 'controller/clear/live')
|
|
assert ret.status_code == 204, f'{ret.status_code} returned from clear live'
|
|
check_websocket_changes(rtc, 1, 1)
|
|
# Preview clear should not impact the WebSockets.
|
|
ret = requests.post(base_url + 'controller/clear/preview')
|
|
assert ret.status_code == 204, f'{ret.status_code} returned from clear preview'
|
|
check_websocket_changes(rtc, 0, 0)
|
|
|
|
|
|
def search_and_live(rtc: RunTestsController, plugin: str, count: int) -> None:
|
|
print_text(f'Search_and_live for {plugin}')
|
|
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/plugins/{plugin}/'
|
|
while True:
|
|
let = random.choice(string.ascii_letters)
|
|
ret = requests.get(base_url + f'search?text={let}')
|
|
if ret.status_code == 200 and len(json.loads(ret.text)) > 0:
|
|
break
|
|
assert ret.status_code == 200, f'{ret.status_code} returned from search'
|
|
items = json.loads(ret.text)
|
|
limit = len(items) - 1
|
|
if limit >= 0:
|
|
random_service = [1]
|
|
if limit > 0:
|
|
random_service = [random.randint(1, limit) for itr in range(count)]
|
|
for pos in random_service:
|
|
item = items[pos - 1]
|
|
ret = requests.post(base_url + f'live', json=dict(id=item[0]))
|
|
assert ret.status_code == 204, f'{ret.status_code} returned from add'
|
|
human_delay()
|
|
|
|
|
|
def search_and_add(rtc: RunTestsController, plugin: str, count: int) -> None:
|
|
print_text(f'Search_and_add for {plugin}')
|
|
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/plugins/{plugin}/'
|
|
if plugin == 'bibles':
|
|
for i in range(1, count):
|
|
bk_id = random.randint(1, len(BookNames) - 1)
|
|
bk = BookNames[bk_id]
|
|
ch = random.randint(1, 10)
|
|
vse = random.randint(1, 10)
|
|
let = f'{bk} {ch}:1-{vse}'
|
|
ret = requests.get(base_url + f'search?text={let}')
|
|
assert ret.status_code == 200, f'{ret.status_code} returned from searcg'
|
|
items = json.loads(ret.text)
|
|
if items:
|
|
ret = requests.post(base_url + f'add', json=dict(id=items[0][0]))
|
|
assert ret.status_code == 204, f'{ret.status_code} returned from add'
|
|
human_delay()
|
|
else:
|
|
if plugin == 'media':
|
|
while True:
|
|
let = random.choice(string.ascii_letters)
|
|
ret = requests.get(base_url + f'search?text={let}')
|
|
if ret.status_code == 200:
|
|
break
|
|
else:
|
|
let = random.choice(string.ascii_letters)
|
|
ret = requests.get(base_url + f'search?text={let}')
|
|
assert ret.status_code == 200, f'{ret.status_code} returned from search'
|
|
items = json.loads(ret.text)
|
|
limit = len(items) - 1
|
|
if limit >= 0:
|
|
random_service = [1]
|
|
if limit > 0:
|
|
random_service = [random.randint(1, limit) for itr in range(count)]
|
|
for pos in random_service:
|
|
item = items[pos - 1]
|
|
ret = requests.post(base_url + f'add', json=dict(id=item[0]))
|
|
assert ret.status_code == 204, f'{ret.status_code} returned from add'
|
|
human_delay()
|
|
|
|
|
|
def service_item_show(rtc: RunTestsController, item: dict) -> None:
|
|
print_text('Service_item_show')
|
|
title = item['title']
|
|
id = item['id']
|
|
print_text(f'test_service_song {title} {id}')
|
|
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
|
|
ret = requests.post(base_url + 'service/show', json=dict(uid=id))
|
|
human_delay()
|
|
assert ret.status_code == 204, ret.status_code
|
|
check_websocket_changes(rtc, 1, 2)
|
|
live_item(rtc, item['plugin'])
|
|
|
|
|
|
def live_item(rtc: RunTestsController, plugin: str) -> None:
|
|
print_text(f'test_live_item - {plugin}')
|
|
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
|
|
ret = requests.get(base_url + 'controller/live-item')
|
|
assert ret.status_code == 200
|
|
i = 0
|
|
for _ in json.loads(ret.text):
|
|
ret = requests.post(base_url + 'controller/show', json=dict(id=i))
|
|
i += 1
|
|
human_delay()
|
|
assert ret.status_code == 204
|
|
if plugin in {'image'}:
|
|
check_websocket_changes(rtc, 0, 0)
|
|
else:
|
|
check_websocket_changes(rtc, 1, 2)
|
|
|
|
|
|
def check_websocket_changes(rtc: RunTestsController, manditary: int, optional: int) -> None:
|
|
while not rtc.received:
|
|
time.sleep(0.1)
|
|
if manditary <= len(rtc.stage_diff) <= optional:
|
|
pass
|
|
else:
|
|
print(f'{manditary} stage field(s) must have changed and {optional} may change changed- {str(rtc.stage_diff)}')
|