mirror of
https://gitlab.com/openlp/openlp_api_tester.git
synced 2024-12-25 11:14:08 +00:00
190 lines
7.8 KiB
Python
190 lines
7.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
##########################################################################
|
|
# OpenLP - Open Source Lyrics Projection #
|
|
# ---------------------------------------------------------------------- #
|
|
# Copyright (c) 2008-2021 OpenLP Developers #
|
|
# ---------------------------------------------------------------------- #
|
|
# This program is free software: you can redistribute it and/or modify #
|
|
# it under the terms of the GNU General Public License as published by #
|
|
# the Free Software Foundation, either version 3 of the License, or #
|
|
# (at your option) any later version. #
|
|
# #
|
|
# This program is distributed in the hope that it will be useful, #
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
|
# GNU General Public License for more details. #
|
|
# #
|
|
# You should have received a copy of the GNU General Public License #
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
|
##########################################################################
|
|
import json
|
|
import random
|
|
import requests
|
|
import string
|
|
|
|
from test_api.apitest.constants import BookNames
|
|
from test_api.apitest.logger import print_text
|
|
from test_api.apitest.task import Task
|
|
|
|
|
|
def clear_live_controller(rtc: object) -> None:
    """
    POST to ``controller/clear/live`` and check that the server answers 204.

    :param rtc: Test context object; only its ``base_url`` attribute is read here.
    :raises AssertionError: If the response status code is not 204.
    """
    print_text('Clear_live_controllers')
    endpoint = rtc.base_url + 'controller/clear/live'
    response = requests.post(endpoint)
    assert response.status_code == 204, f'{response.status_code} returned from clear live'
|
|
|
|
|
|
def clear_preview_controller(rtc: object) -> None:
    """
    POST to ``controller/clear/preview`` and check that the server answers 204.

    :param rtc: Test context object; only its ``base_url`` attribute is read here.
    :raises AssertionError: If the response status code is not 204.
    """
    print_text('Clear_preview_controllers')
    endpoint = rtc.base_url + 'controller/clear/preview'
    # Preview clear should not impact the WebSockets.
    response = requests.post(endpoint)
    assert response.status_code == 204, f'{response.status_code} returned from clear preview'
|
|
|
|
|
|
def new_service(rtc: object) -> None:
    """
    Issue a GET on ``service/new`` and check that the server answers 204.

    :param rtc: Test context object; only its ``base_url`` attribute is read here.
    :raises AssertionError: If the response status code is not 204.
    """
    print_text('New_service')
    endpoint = rtc.base_url + 'service/new'
    response = requests.get(endpoint)
    assert response.status_code == 204, f'{response.status_code} returned'
|
|
|
|
|
|
def search_and_add(rtc: object, payload: dict) -> None:
    """
    Search a plugin and add one of the hits through the plugin's ``add`` endpoint.

    For the ``bibles`` plugin a random reference of the form ``<book> <chapter>:1-<verse>`` is searched and the
    first hit (if any) is added. For every other plugin a random ASCII letter is searched and a randomly chosen
    hit is added; the ``media`` plugin keeps searching until a non-empty result comes back.

    :param rtc: Test context object; only its ``base_url`` attribute is read here.
    :param payload: Task payload; ``payload['plugin']`` names the plugin to search.
    :raises AssertionError: If the search does not return 200 or the add does not return 204.
    """
    plugin = payload['plugin']
    print_text(f'Search_and_add for {plugin}')
    search_url = rtc.base_url + f'plugins/{plugin}/search?text='
    add_url = rtc.base_url + f'plugins/{plugin}/add'

    if plugin == 'bibles':
        bk_id = random.randint(1, len(BookNames) - 1)
        book = BookNames[bk_id]
        chapter = random.randint(1, 10)
        last_verse = random.randint(1, 10)
        reference = f'{book} {chapter}:1-{last_verse}'
        ret = requests.get(search_url + reference)
        assert ret.status_code == 200, f'{ret.status_code} returned from search'
        items = json.loads(ret.text)
        # A random reference may not exist; only add when the search found something.
        if items:
            ret = requests.post(add_url, json=dict(id=items[0][0]))
            assert ret.status_code == 204, f'{ret.status_code} returned from add'
        return

    if plugin == 'media':
        # Retry with a fresh letter until the search succeeds with a non-empty body.
        # NOTE(review): this never terminates if no search can match - confirm that is acceptable.
        while True:
            ret = requests.get(search_url + random.choice(string.ascii_letters))
            if ret.status_code == 200 and ret.text != '[]\n':
                break
    else:
        ret = requests.get(search_url + random.choice(string.ascii_letters))
    assert ret.status_code == 200, f'{ret.status_code} returned from search'

    items = json.loads(ret.text)
    if items:
        # random.choice draws uniformly from every hit; the previous
        # ``randint(1, limit) - 1`` index could never select the last one.
        item = random.choice(items)
        ret = requests.post(add_url, json=dict(id=item[0]))
        assert ret.status_code == 204, f'{ret.status_code} returned from add'
|
|
|
|
|
|
def search_and_live(rtc: object, payload: dict) -> None:
    """
    Search a plugin with random letters until something is found, then send one hit to ``live``.

    :param rtc: Test context object; only its ``base_url`` attribute is read here.
    :param payload: Task payload; ``payload['plugin']`` names the plugin to search.
    :raises AssertionError: If the ``live`` request does not return 204.
    """
    plugin = payload['plugin']
    print_text(f'Search_and_live for {plugin}')

    # Keep searching until a 200 response carries at least one hit. The body is parsed once and reused.
    # NOTE(review): this never terminates if no search can match - confirm that is acceptable.
    while True:
        letter = random.choice(string.ascii_letters)
        ret = requests.get(rtc.base_url + f'plugins/{plugin}/search?text={letter}')
        if ret.status_code != 200:
            continue
        items = json.loads(ret.text)
        if items:
            break

    # random.choice draws uniformly from every hit; the previous
    # ``randint(1, limit) - 1`` index could never select the last one.
    item = random.choice(items)
    ret = requests.post(rtc.base_url + f'plugins/{plugin}/live', json=dict(id=item[0]))
    assert ret.status_code == 204, f'{ret.status_code} returned from live'
|
|
|
|
|
|
def flush_pending(rtc: object) -> None:
    """
    Take the left-most entry off ``rtc.pending`` and put it at the front of ``rtc.tasks``.

    :param rtc: Test context object; its ``pending`` and ``tasks`` attributes are used here.
    """
    print_text('Flush pending to start of tasks')
    next_task = rtc.pending.popleft()
    rtc.tasks.insert(0, next_task)
|
|
|
|
|
|
def media_play(rtc: object) -> None:
    """
    POST to ``media/play`` and check that the server answers 400.

    :param rtc: Test context object; only its ``base_url`` attribute is read here.
    :raises AssertionError: If the response status code is not 400.
    """
    print_text('Media_play')
    endpoint = rtc.base_url + 'media/play'
    response = requests.post(endpoint)
    assert response.status_code == 400, f'{response.status_code} returned'
|
|
|
|
|
|
def media_pause(rtc: object) -> None:
    """
    POST to ``media/pause`` and check that the server answers 400.

    :param rtc: Test context object; only its ``base_url`` attribute is read here.
    :raises AssertionError: If the response status code is not 400.
    """
    print_text('Media_pause')
    endpoint = rtc.base_url + 'media/pause'
    response = requests.post(endpoint)
    assert response.status_code == 400, f'{response.status_code} returned'
|
|
|
|
|
|
def media_stop(rtc: object) -> None:
    """
    POST to ``media/stop`` and check that the server answers 400.

    :param rtc: Test context object; only its ``base_url`` attribute is read here.
    :raises AssertionError: If the response status code is not 400.
    """
    print_text('Media_stop')
    endpoint = rtc.base_url + 'media/stop'
    response = requests.post(endpoint)
    assert response.status_code == 400, f'{response.status_code} returned'
|
|
|
|
|
|
def load_service_sequential(rtc: object) -> None:
    """
    Queue one ``service_item_show`` task per service item, in the order the server lists them.

    The items are fetched from ``service/items`` and each resulting task is appended to ``rtc.pending``.

    :param rtc: Test context object; its ``base_url`` and ``pending`` attributes are used here.
    """
    print_text('Load_service_sequential')
    response = requests.get(rtc.base_url + 'service/items')
    # Walk the service in listed order so the items are shown sequentially.
    for service_item in json.loads(response.text):
        task_spec = dict(max=1, min=1, name='service_item_show', payload=dict(item=service_item))
        rtc.pending.append(Task(rtc, task_spec))
|
|
|
|
|
|
def load_service_random(rtc: object) -> None:
    """
    Queue ``service_item_show`` tasks for randomly chosen service items.

    The items are fetched from ``service/items`` and as many tasks as the service has items are appended to
    ``rtc.pending``. Each task's item is drawn independently, so an item may be queued more than once and
    another not at all. Nothing is queued for an empty service.

    :param rtc: Test context object; its ``base_url`` and ``pending`` attributes are used here.
    """
    print_text('Load_service_random')
    response = requests.get(rtc.base_url + 'service/items')
    service = json.loads(response.text)
    # The payload must carry the service item itself, not a random integer:
    # service_item_show reads the item's 'title', 'id' and 'plugin' keys.
    for _ in range(len(service)):
        pl = {'max': 1, 'min': 1, 'name': 'service_item_show', 'payload': {'item': random.choice(service)}}
        rtc.pending.append(Task(rtc, pl))
|
|
|
|
|
|
def service_item_show(rtc: object, payload: dict) -> None:
    """
    Show a service item through ``service/show`` and, unless it is media, queue a ``play_live_items`` task.

    :param rtc: Test context object; its ``base_url`` and ``tasks`` attributes are used here.
    :param payload: Task payload; ``payload['item']`` must provide ``'title'``, ``'id'`` and ``'plugin'``.
    :raises AssertionError: If the show request does not return 204.
    """
    print_text('Service_item_show')
    service_item = payload['item']
    title = service_item['title']
    item_id = service_item['id']
    print_text(f'test_service_item {title} {item_id}')
    response = requests.post(rtc.base_url + 'service/show', json={'uid': item_id})
    assert response.status_code == 204, response.status_code
    if service_item['plugin'] != 'media':
        follow_up = dict(max=1, min=2, delay=0.5, name='play_live_items')
        rtc.tasks.append(Task(rtc, follow_up))
|
|
|
|
|
|
def play_live_items(rtc: object) -> None:
    """
    Queue one ``controller_item_show`` task for each entry returned by ``controller/live-items``.

    Tasks carry the zero-based position of the entry as ``payload['id']`` and are appended to ``rtc.tasks``.

    :param rtc: Test context object; its ``base_url`` and ``tasks`` attributes are used here.
    :raises AssertionError: If the live-items request does not return 200.
    """
    print_text('Play_live_items')
    response = requests.get(rtc.base_url + 'controller/live-items')
    assert response.status_code == 200, f'{response.status_code} returned from live_item'
    # Only the position of each entry is needed, not its content.
    for position, _ in enumerate(json.loads(response.text)):
        task_spec = dict(max=1, min=2, delay=0.5, name='controller_item_show', payload=dict(id=position))
        rtc.tasks.append(Task(rtc, task_spec))
    assert response.status_code == 200, f'{response.status_code} returned from show'
|
|
|
|
|
|
def play_live_item(rtc: object, payload: dict) -> None:
    """
    Fetch ``controller/live-item`` and log the decoded JSON body.

    :param rtc: Test context object; only its ``base_url`` attribute is read here.
    :param payload: Task payload; not used by this callback.
    :raises AssertionError: If the request does not return 200.
    """
    print_text('Play_live_item')
    response = requests.get(rtc.base_url + 'controller/live-item')
    assert response.status_code == 200, f'{response.status_code} returned from live_item'
    live_item = json.loads(response.text)
    print_text(str(live_item))
|
|
|
|
|
|
def controller_item_show(rtc: object, payload: dict) -> None:
    """
    POST ``payload['id']`` (coerced to ``int``) to ``controller/show`` and check for a 204 reply.

    :param rtc: Test context object; only its ``base_url`` attribute is read here.
    :param payload: Task payload; ``payload['id']`` is sent as the ``id`` field of the JSON body.
    :raises AssertionError: If the response status code is not 204.
    """
    target_id = payload['id']
    print_text('Controller_item_show')
    body = {'id': int(target_id)}
    response = requests.post(rtc.base_url + 'controller/show', json=body)
    assert response.status_code == 204, response.status_code
|