Merge branch 'queues' into 'master'

Queues

See merge request openlp/openlp_api_tester!3
This commit is contained in:
Tim Bentley 2020-06-26 13:57:22 +00:00
commit ddb8d1ac73
16 changed files with 603 additions and 180 deletions

3
.flake8 Normal file
View File

@ -0,0 +1,3 @@
[flake8]
max-line-length = 120
ignore = E402,W503,W504,D,F405

16
.gitignore vendored
View File

@ -1,5 +1,4 @@
*.*~ *.*~
*.dll
*.e4* *.e4*
*.kate-swp *.kate-swp
*.kdev4 *.kdev4
@ -12,7 +11,6 @@
*.rej *.rej
*.ropeproject *.ropeproject
*.~\?~ *.~\?~
*eric[1-9]project
.cache .cache
.coverage .coverage
.directory .directory
@ -26,26 +24,14 @@
.eggs .eggs
.venv .venv
.mypy_cache .mypy_cache
OpenLP.egg-info
\#*\# \#*\#
__pycache__ __pycache__/
build build
cover cover
coverage coverage
dist
env env
htmlcov htmlcov
list list
node_modules
openlp.cfg
openlp.pro
openlp/core/resources.py
openlp/core/resources.py.old
openlp/plugins/presentations/lib/vendor/Pyro4
openlp/plugins/presentations/lib/vendor/serpent.py
output
package-lock.json
tags tags
test test
openlp-test-projectordb.sqlite
*/test-results.xml */test-results.xml

View File

@ -4,3 +4,9 @@ lint-python:
script: script:
- pip install tox flake8 - pip install tox flake8
- flake8 - flake8
lint-yaml:
stage: test
image: python:latest
script:
- pip install yamllint
- yamllint *.yaml

13
alert.yaml Normal file
View File

@ -0,0 +1,13 @@
---
process_name: test process
step1:
max: 1
min: 1
name: clear_live_controller
step2:
delay: 1
name: clear_preview_controller
step3:
max: 1
min: 1
name: new_service

13
blank.yaml Normal file
View File

@ -0,0 +1,13 @@
---
process_name: test process
step1:
max: 1
min: 1
name: clear_live_controller
step2:
delay: 1
name: clear_preview_controller
step3:
max: 1
min: 1
name: new_service

86
full_run_rand.yaml Normal file
View File

@ -0,0 +1,86 @@
---
process_name: test process
step1:
max: 1
min: 1
name: clear_live_controller
step2:
delay: 1
name: clear_preview_controller
step3:
max: 1
min: 1
name: new_service
step4:
delay: 1
name: search_and_add
payload:
plugin: songs
step5:
delay: 1
name: search_and_add
payload:
plugin: songs
step6:
delay: 1
name: search_and_add
payload:
plugin: bibles
step7:
delay: 1
name: search_and_add
payload:
plugin: bibles
step8:
delay: 1
name: search_and_add
payload:
plugin: custom
step9:
delay: 1
name: search_and_add
payload:
plugin: images
step10:
delay: 1
name: search_and_add
payload:
plugin: songs
step11:
delay: 1
name: search_and_add
payload:
plugin: presentation
step12:
delay: 1
name: search_and_add
payload:
plugin: bibles
step13:
delay: 1
name: search_and_add
payload:
plugin: bibles
step14:
delay: 1
name: search_and_add
payload:
plugin: songs
step15:
delay: 1
name: search_and_add
payload:
plugin: custom
step16:
delay: 1
name: search_and_add
payload:
plugin: bibles
step17:
delay: 1
name: search_and_add
payload:
plugin: bibles
step18:
delay: 0.1
name: load_service_sequential

86
full_run_seq.yaml Normal file
View File

@ -0,0 +1,86 @@
---
process_name: test process
step1:
max: 1
min: 1
name: clear_live_controller
step2:
delay: 1
name: clear_preview_controller
step3:
max: 1
min: 1
name: new_service
step4:
delay: 1
name: search_and_add
payload:
plugin: songs
step5:
delay: 1
name: search_and_add
payload:
plugin: songs
step6:
delay: 1
name: search_and_add
payload:
plugin: bibles
step7:
delay: 1
name: search_and_add
payload:
plugin: bibles
step8:
delay: 1
name: search_and_add
payload:
plugin: custom
step9:
delay: 1
name: search_and_add
payload:
plugin: images
step10:
delay: 1
name: search_and_add
payload:
plugin: songs
step11:
delay: 1
name: search_and_add
payload:
plugin: presentation
step12:
delay: 1
name: search_and_add
payload:
plugin: bibles
step13:
delay: 1
name: search_and_add
payload:
plugin: bibles
step14:
delay: 1
name: search_and_add
payload:
plugin: songs
step15:
delay: 1
name: search_and_add
payload:
plugin: custom
step16:
delay: 1
name: search_and_add
payload:
plugin: bibles
step17:
delay: 1
name: search_and_add
payload:
plugin: bibles
step18:
delay: 0.1
name: load_service_random

43
media.yaml Normal file
View File

@ -0,0 +1,43 @@
---
process_name: test process
step1:
max: 1
min: 1
name: clear_live_controller
step2:
delay: 1
name: clear_preview_controller
step3:
max: 1
min: 1
name: new_service
step4:
delay: 5
name: media_play
step5:
delay: 1
name: clear_preview_controller
step6:
max: 1
min: 1
name: new_service
step7:
delay: 1
name: search_and_add
payload:
plugin: media
step8:
delay: 5
name: media_play
step9:
delay: 5
name: media_pause
step10:
delay: 5
name: media_pause
step11:
delay: 5
name: media_play
step12:
delay: 5
name: media_stop

26
test.yaml Normal file
View File

@ -0,0 +1,26 @@
---
process_name: test process
step1:
max: 1
min: 1
name: clear_live_controller
step2:
delay: 1
name: clear_preview_controller
step3:
max: 1
min: 1
name: new_service
step4:
delay: 1
name: search_and_add
payload:
plugin: songs
step5:
delay: 1
name: search_and_add
payload:
plugin: songs
step6:
delay: 0.1
name: load_service_sequential

24
test2.py Normal file
View File

@ -0,0 +1,24 @@
import yaml
# ...
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('rargs', nargs='*', default=[])
a = parser.parse_args()
with open(a.rargs[0], 'r') as file:
# The FullLoader parameter handles the conversion from YAML
# scalar values to Python the dictionary format
xx = yaml.load(file)
print(yaml.dump(xx))
for x in xx:
print(x)
print(xx[x])
c = xx[x]
print(c)
if isinstance(c, str):
pass
else:
try:
print(c['delay'])
except KeyError:
pass

View File

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2020 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################

View File

@ -0,0 +1,168 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2020 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
import json
import random
import requests
import string
from test_api.apitest.constants import BookNames
from test_api.apitest.logger import print_text
from test_api.apitest.task import Task
def clear_live_controller(rtc: object) -> None:
print_text('Clear_live_controllers')
ret = requests.post(rtc.base_url + 'controller/clear/live')
assert ret.status_code == 204, f'{ret.status_code} returned from clear live'
def clear_preview_controller(rtc: object) -> None:
print_text('Clear_preview_controllers')
# Preview clear should not impact the WebSockets.
ret = requests.post(rtc.base_url + 'controller/clear/preview')
assert ret.status_code == 204, f'{ret.status_code} returned from clear preview'
def new_service(rtc: object) -> None:
print_text('New_service')
ret = requests.get(rtc.base_url + 'service/new')
assert ret.status_code == 204, f'{ret.status_code} returned'
def search_and_add(rtc: object, payload: dict) -> None:
plugin = payload['plugin']
print_text(f'Search_and_add for {plugin}')
if plugin == 'bibles':
bk_id = random.randint(1, len(BookNames) - 1)
bk = BookNames[bk_id]
ch = random.randint(1, 10)
vse = random.randint(1, 10)
let = f'{bk} {ch}:1-{vse}'
ret = requests.get(rtc.base_url + f'plugins/{plugin}/search?text={let}')
assert ret.status_code == 200, f'{ret.status_code} returned from search'
items = json.loads(ret.text)
if items:
ret = requests.post(rtc.base_url + f'plugins/{plugin}/add', json=dict(id=items[0][0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
else:
if plugin == 'media':
while True:
let = random.choice(string.ascii_letters)
ret = requests.get(rtc.base_url + f'plugins/{plugin}/search?text={let}')
if ret.status_code == 200:
break
else:
let = random.choice(string.ascii_letters)
ret = requests.get(rtc.base_url + f'plugins/{plugin}/search?text={let}')
assert ret.status_code == 200, f'{ret.status_code} returned from search'
items = json.loads(ret.text)
limit = len(items) - 1
if limit >= 0:
random_item = 0
if limit > 0:
random_item = random.randint(1, limit) - 1
item = items[random_item]
ret = requests.post(rtc.base_url + f'plugins/{plugin}/add', json=dict(id=item[0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
def search_and_live(rtc: object, payload: dict) -> None:
plugin = payload['plugin']
print_text(f'Search_and_live for {plugin}')
while True:
let = random.choice(string.ascii_letters)
ret = requests.get(rtc.base_url + f'plugins/{plugin}/search?text={let}')
if ret.status_code == 200 and len(json.loads(ret.text)) > 0:
break
assert ret.status_code == 200, f'{ret.status_code} returned from search'
items = json.loads(ret.text)
limit = len(items) - 1
if limit >= 0:
random_service = 0
if limit > 0:
random_service = random.randint(1, limit) - 1
item = items[random_service]
ret = requests.post(rtc.base_url + f'plugins/{plugin}/live', json=dict(id=item[0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
def media_play(rtc: object) -> None:
print_text('Media_play')
ret = requests.post(rtc.base_url + 'media/play')
assert ret.status_code == 400, f'{ret.status_code} returned'
def media_pause(rtc: object) -> None:
print_text('Media_pause')
ret = requests.post(rtc.base_url + 'media/pause')
assert ret.status_code == 204, f'{ret.status_code} returned'
def media_stop(rtc: object) -> None:
print_text('Media_stop')
ret = requests.post(rtc.base_url + 'media/stop')
assert ret.status_code == 204, f'{ret.status_code} returned'
def load_service_sequential(rtc: object) -> None:
print_text('Load__service_sequential')
items = requests.get(rtc.base_url + 'service/items')
service = json.loads(items.text)
# test sequentially
for item in service:
if item['plugin'] == 'video':
pass
else:
pl = {'max': 1, 'min': 1, 'name': 'service_item_show', 'payload': {'item': item}}
rtc.pending.append(Task(rtc, pl))
def service_item_show(rtc: object, payload: dict) -> None:
item = payload['item']
print_text('Service_item_show')
title = item['title']
id = item['id']
print_text(f'test_service_song {title} {id}')
ret = requests.post(rtc.base_url + 'service/show', json=dict(uid=id))
assert ret.status_code == 204, ret.status_code
pl = {'max': 1, 'min': 2, 'name': 'play_live_item', 'payload': {'item': item}}
rtc.tasks.append(Task(rtc, pl))
def play_live_item(rtc: object, payload: dict) -> None:
item = payload['item']
plugin = item['plugin']
print_text(f'Play_live_item - {plugin}')
ret = requests.get(rtc.base_url + 'controller/live-item')
assert ret.status_code == 200, f'{ret.status_code} returned from live_item'
i = 0
for _ in json.loads(ret.text):
pl = {'max': 1, 'min': 2, 'delay': 0.2, 'name': 'controller_item_show', 'payload': {'id': i}}
rtc.tasks.append(Task(rtc, pl))
i += 1
assert ret.status_code == 200, f'{ret.status_code} returned from show'
def controller_item_show(rtc: object, payload: dict) -> None:
id = payload['id']
print_text('Controller_item_show')
ret = requests.post(rtc.base_url + 'controller/show', json=dict(id=int(id)))
assert ret.status_code == 204, ret.status_code

View File

@ -31,7 +31,7 @@ def print_error(text: str):
def print_ok(text: str): def print_ok(text: str):
print(Fore.GREEN + '[*] = ' + text) print(Fore.GREEN + '[_] = ' + text)
def print_warn(text: str): def print_warn(text: str):

View File

@ -18,18 +18,18 @@
# You should have received a copy of the GNU General Public License # # You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. # # along with this program. If not, see <https://www.gnu.org/licenses/>. #
########################################################################## ##########################################################################
import json import argparse
import requests
import string
import time
import random
import threading import threading
import websocket import websocket
import time
import yaml
from collections import deque
from websocket import create_connection from websocket import create_connection
from test_api.apitest.constants import BookNames from test_api.apitest.callbacks import * # noqa E403
from test_api.apitest.logger import print_text, print_error, print_ok, print_info from test_api.apitest.logger import print_text, print_error, print_ok, print_info
# from test_api.apitest.logger import print_debug # from test_api.apitest.logger import print_debug #n
from test_api.apitest.task import Task
class RunTestsController(object): class RunTestsController(object):
@ -38,8 +38,13 @@ class RunTestsController(object):
self.address = address self.address = address
self.http_port = http_port self.http_port = http_port
self.ws_port = ws_port self.ws_port = ws_port
self.reserved_result_stage = None reserved_result_stage = {'results': ''}
reserved_result_stage = json.dumps(reserved_result_stage)
self.reserved_result_stage = bytes(reserved_result_stage, 'utf-8')
self.received = False self.received = False
self.tasks = deque()
self.pending = deque()
self.base_url = f'http://{self.address}:{self.http_port}/api/v2/'
def connect(self) -> None: def connect(self) -> None:
print_info("Starting thread") print_info("Starting thread")
@ -67,16 +72,16 @@ class RunTestsController(object):
self.received = True self.received = True
def on_open(self) -> None: def on_open(self) -> None:
print_info("opened") print_info("Socket Listener Opened")
def on_close(self) -> None: def on_close(self) -> None:
print_info("closed") print_info("Socket Listener Closed")
def on_error(self, error: str) -> None: def on_error(self, error: str) -> None:
print_error(f'WebSocket Error: {error}') print_error(f'WebSocket Error: {error}')
def load_and_check_sockets(self, first_run: bool = False) -> bool: def load_and_check_sockets(self, first_run: bool = False) -> bool:
ws = create_connection(f'ws://{self.address}:{self.ws_port}') ws = create_connection(f'ws://{self.address}:{self.ws_port}', ping_interval=None)
if first_run: if first_run:
print_text('Establishing Connection to State') print_text('Establishing Connection to State')
self.result_stage = ws.recv() self.result_stage = ws.recv()
@ -93,6 +98,20 @@ class RunTestsController(object):
ws.close() ws.close()
return good return good
def read_command_file(self) -> None:
parser = argparse.ArgumentParser()
parser.add_argument('rargs', nargs='*', default=[])
a = parser.parse_args()
with open(a.rargs[0], 'r') as file:
commands = yaml.load(file, Loader=yaml.FullLoader)
for step in commands:
if step == 'process_name':
print_info(f'Processing file {commands[step]}')
continue
self.tasks.append(Task(self, commands[step]))
self.process_queue()
def compare_stage(self): def compare_stage(self):
""" """
:return: :return:
@ -103,6 +122,7 @@ class RunTestsController(object):
current = json.loads(self.result_stage.decode('utf-8'))['results'] current = json.loads(self.result_stage.decode('utf-8'))['results']
self.received = False self.received = False
# compare_strings(reserved, current) # compare_strings(reserved, current)
if len(reserved) > 0:
assert len(reserved) == len(current) assert len(reserved) == len(current)
for a, _ in current.items(): for a, _ in current.items():
# compare_strings(reserved[a], current[a]) # compare_strings(reserved[a], current[a])
@ -110,46 +130,14 @@ class RunTestsController(object):
self.stage_diff[a] = {'before': reserved[a], 'after': current[a]} self.stage_diff[a] = {'before': reserved[a], 'after': current[a]}
self.reserved_result_stage = self.result_stage self.reserved_result_stage = self.result_stage
def marshal_full(self): def process_queue(self):
print_ok('Running full test script') while len(self.tasks) > 0:
if self.load_and_check_sockets(True): task = self.tasks.popleft()
clear_controllers(self) task.run()
new_service(self) # we have finished but have pending tasks to complete
search_and_add(self, 'songs', 5) if len(self.tasks) == 0 and len(self.pending) > 0:
search_and_add(self, 'bibles', 5) pend = self.pending.popleft()
search_and_add(self, 'images', 1) self.tasks.append(pend)
search_and_add(self, 'presentations', 1)
search_and_add(self, 'custom', 3)
self.load_service_sequential()
clear_controllers(self)
self.load_service_random()
def marshal_media(self):
print_ok('Running media test script')
if self.load_and_check_sockets(True):
clear_controllers(self)
# search_and_live(self, 'songs', 1)
# media_play(self)
# human_delay()
# clear_controllers(self)
search_and_live(self, 'media', 1)
media_play(self)
human_delay(5)
media_pause(self)
human_delay(5)
media_play(self)
human_delay(5)
media_stop(self)
def marshal_blank(self):
print_ok('Running blank test script')
if self.load_and_check_sockets(True):
new_service(self)
def marshal_alert(self):
print_ok('Running alert test script')
if self.load_and_check_sockets(True):
new_service(self)
def load_service_sequential(self) -> None: def load_service_sequential(self) -> None:
print_text('Load_and_process_service_sequential') print_text('Load_and_process_service_sequential')
@ -173,6 +161,18 @@ class RunTestsController(object):
item = service[pos - 1] item = service[pos - 1]
service_item_show(self, item) service_item_show(self, item)
def check_websocket_changes(self, manditary: int, optional: int) -> None:
count = 0
while not self.received and count < 200:
time.sleep(0.1)
count += 1
self.compare_stage()
if manditary <= len(self.stage_diff) <= manditary + optional:
pass
else:
print(f'{manditary} stage field(s) must have changed and '
f'{optional} may change changed- {str(self.stage_diff)}')
def compare_strings(a, b): def compare_strings(a, b):
import difflib import difflib
@ -227,111 +227,6 @@ def new_service(rtc: RunTestsController) -> None:
check_websocket_changes(rtc, 1, 1) check_websocket_changes(rtc, 1, 1)
def clear_controllers(rtc: RunTestsController) -> None:
print_text('Clear_controllers')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
ret = requests.post(base_url + 'controller/clear/live')
assert ret.status_code == 204, f'{ret.status_code} returned from clear live'
check_websocket_changes(rtc, 1, 1)
# Preview clear should not impact the WebSockets.
ret = requests.post(base_url + 'controller/clear/preview')
assert ret.status_code == 204, f'{ret.status_code} returned from clear preview'
check_websocket_changes(rtc, 0, 0)
def search_and_live(rtc: RunTestsController, plugin: str, count: int) -> None:
print_text(f'Search_and_live for {plugin}')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/plugins/{plugin}/'
while True:
let = random.choice(string.ascii_letters)
ret = requests.get(base_url + f'search?text={let}')
if ret.status_code == 200 and len(json.loads(ret.text)) > 0:
break
assert ret.status_code == 200, f'{ret.status_code} returned from search'
items = json.loads(ret.text)
limit = len(items) - 1
if limit >= 0:
random_service = [1]
if limit > 0:
random_service = [random.randint(1, limit) for itr in range(count)]
for pos in random_service:
item = items[pos - 1]
ret = requests.post(base_url + 'live', json=dict(id=item[0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
human_delay()
def search_and_add(rtc: RunTestsController, plugin: str, count: int) -> None:
print_text(f'Search_and_add for {plugin}')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/plugins/{plugin}/'
if plugin == 'bibles':
for i in range(1, count):
bk_id = random.randint(1, len(BookNames) - 1)
bk = BookNames[bk_id]
ch = random.randint(1, 10)
vse = random.randint(1, 10)
let = f'{bk} {ch}:1-{vse}'
ret = requests.get(base_url + f'search?text={let}')
assert ret.status_code == 200, f'{ret.status_code} returned from searcg'
items = json.loads(ret.text)
if items:
ret = requests.post(base_url + 'add', json=dict(id=items[0][0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
human_delay()
else:
if plugin == 'media':
while True:
let = random.choice(string.ascii_letters)
ret = requests.get(base_url + f'search?text={let}')
if ret.status_code == 200:
break
else:
let = random.choice(string.ascii_letters)
ret = requests.get(base_url + f'search?text={let}')
assert ret.status_code == 200, f'{ret.status_code} returned from search'
items = json.loads(ret.text)
limit = len(items) - 1
if limit >= 0:
random_service = [1]
if limit > 0:
random_service = [random.randint(1, limit) for itr in range(count)]
for pos in random_service:
item = items[pos - 1]
ret = requests.post(base_url + 'add', json=dict(id=item[0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
human_delay()
def service_item_show(rtc: RunTestsController, item: dict) -> None:
print_text('Service_item_show')
title = item['title']
id = item['id']
print_text(f'test_service_song {title} {id}')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
ret = requests.post(base_url + 'service/show', json=dict(uid=id))
human_delay()
assert ret.status_code == 204, ret.status_code
check_websocket_changes(rtc, 1, 2)
live_item(rtc, item['plugin'])
def live_item(rtc: RunTestsController, plugin: str) -> None:
print_text(f'test_live_item - {plugin}')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
ret = requests.get(base_url + 'controller/live-item')
assert ret.status_code == 200, f'{ret.status_code} returned from live_item'
i = 0
for _ in json.loads(ret.text):
ret = requests.post(base_url + 'controller/show', json=dict(id=i))
i += 1
human_delay()
assert ret.status_code == 204, f'{ret.status_code} returned from show'
if plugin in {'image'}:
check_websocket_changes(rtc, 0, 0)
else:
check_websocket_changes(rtc, 1, 2)
def check_websocket_changes(rtc: RunTestsController, manditary: int, optional: int) -> None: def check_websocket_changes(rtc: RunTestsController, manditary: int, optional: int) -> None:
while not rtc.received: while not rtc.received:
time.sleep(0.1) time.sleep(0.1)

55
test_api/apitest/task.py Normal file
View File

@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2020 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
import importlib
import time
from test_api.apitest.callbacks import * # noqa E403
def human_delay(delay: int = 2) -> None:
time.sleep(delay)
class Task(object):
def __init__(self, caller: object, request: dict) -> None:
self.caller = caller
self.request = request
callback = self.request['name']
function_string = f'test_api.apitest.callbacks.{callback}'
mod_name, func_name = function_string.rsplit('.', 1)
mod = importlib.import_module(mod_name)
func = getattr(mod, func_name)
self.__setattr__("callback", func)
def run(self):
try:
payload = self.request['payload']
self.callback(self.caller, payload)
except KeyError:
self.callback(self.caller)
try:
human_delay(self.request['delay'])
except KeyError:
pass
try:
self.caller.check_websocket_changes(self.request['max'], self.request['min'])
except KeyError:
pass

View File

@ -42,9 +42,8 @@ def start() -> None:
print_ok(f'OpenLP is running on port (ws) {op_ws_port}') print_ok(f'OpenLP is running on port (ws) {op_ws_port}')
rtc = RunTestsController(op_address, op_http_port, op_ws_port) rtc = RunTestsController(op_address, op_http_port, op_ws_port)
rtc.connect() rtc.connect()
rtc.marshal_full() rtc.read_command_file()
# nrtc.marshal_media() print_ok('Finished running tests')
print_text('Finished running tests')
if __name__ == '__main__': if __name__ == '__main__':