openlp_api_tester/test_api/apitest/callbacks.py

190 lines
7.8 KiB
Python

# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2021 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
import json
import random
import requests
import string
from test_api.apitest.constants import BookNames
from test_api.apitest.logger import print_text
from test_api.apitest.task import Task
def clear_live_controller(rtc: object) -> None:
print_text('Clear_live_controllers')
ret = requests.post(rtc.base_url + 'controller/clear/live')
assert ret.status_code == 204, f'{ret.status_code} returned from clear live'
def clear_preview_controller(rtc: object) -> None:
print_text('Clear_preview_controllers')
# Preview clear should not impact the WebSockets.
ret = requests.post(rtc.base_url + 'controller/clear/preview')
assert ret.status_code == 204, f'{ret.status_code} returned from clear preview'
def new_service(rtc: object) -> None:
print_text('New_service')
ret = requests.get(rtc.base_url + 'service/new')
assert ret.status_code == 204, f'{ret.status_code} returned'
def search_and_add(rtc: object, payload: dict) -> None:
plugin = payload['plugin']
print_text(f'Search_and_add for {plugin}')
if plugin == 'bibles':
bk_id = random.randint(1, len(BookNames) - 1)
bk = BookNames[bk_id]
ch = random.randint(1, 10)
vse = random.randint(1, 10)
let = f'{bk} {ch}:1-{vse}'
ret = requests.get(rtc.base_url + f'plugins/{plugin}/search?text={let}')
assert ret.status_code == 200, f'{ret.status_code} returned from search'
items = json.loads(ret.text)
if items:
ret = requests.post(rtc.base_url + f'plugins/{plugin}/add', json=dict(id=items[0][0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
else:
if plugin == 'media':
while True:
let = random.choice(string.ascii_letters)
ret = requests.get(rtc.base_url + f'plugins/{plugin}/search?text={let}')
if ret.status_code == 200 and ret.text != '[]\n':
break
else:
let = random.choice(string.ascii_letters)
ret = requests.get(rtc.base_url + f'plugins/{plugin}/search?text={let}')
assert ret.status_code == 200, f'{ret.status_code} returned from search'
items = json.loads(ret.text)
limit = len(items) - 1
if limit >= 0:
random_item = 0
if limit > 0:
random_item = random.randint(1, limit) - 1
item = items[random_item]
ret = requests.post(rtc.base_url + f'plugins/{plugin}/add', json=dict(id=item[0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
def search_and_live(rtc: object, payload: dict) -> None:
plugin = payload['plugin']
print_text(f'Search_and_live for {plugin}')
while True:
let = random.choice(string.ascii_letters)
ret = requests.get(rtc.base_url + f'plugins/{plugin}/search?text={let}')
if ret.status_code == 200 and len(json.loads(ret.text)) > 0:
break
assert ret.status_code == 200, f'{ret.status_code} returned from search'
items = json.loads(ret.text)
limit = len(items) - 1
if limit >= 0:
random_service = 0
if limit > 0:
random_service = random.randint(1, limit) - 1
item = items[random_service]
ret = requests.post(rtc.base_url + f'plugins/{plugin}/live', json=dict(id=item[0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
def flush_pending(rtc: object) -> None:
print_text('Flush pending to start of tasks')
pend = rtc.pending.popleft()
rtc.tasks.insert(0, pend)
def media_play(rtc: object) -> None:
print_text('Media_play')
ret = requests.post(rtc.base_url + 'media/play')
assert ret.status_code == 400, f'{ret.status_code} returned'
def media_pause(rtc: object) -> None:
print_text('Media_pause')
ret = requests.post(rtc.base_url + 'media/pause')
assert ret.status_code == 400, f'{ret.status_code} returned'
def media_stop(rtc: object) -> None:
print_text('Media_stop')
ret = requests.post(rtc.base_url + 'media/stop')
assert ret.status_code == 400, f'{ret.status_code} returned'
def load_service_sequential(rtc: object) -> None:
print_text('Load_service_sequential')
items = requests.get(rtc.base_url + 'service/items')
service = json.loads(items.text)
# test sequentially
for item in service:
pl = {'max': 1, 'min': 1, 'name': 'service_item_show', 'payload': {'item': item}}
rtc.pending.append(Task(rtc, pl))
def load_service_random(rtc: object) -> None:
print_text('Load_service_sequential')
items = requests.get(rtc.base_url + 'service/items')
service = json.loads(items.text)
limit = len(service)
random_service = [random.randint(1, limit) for itr in range(limit)]
# test sequentially
for item in random_service:
pl = {'max': 1, 'min': 1, 'name': 'service_item_show', 'payload': {'item': item}}
rtc.pending.append(Task(rtc, pl))
def service_item_show(rtc: object, payload: dict) -> None:
print_text('Service_item_show')
item = payload['item']
title = item['title']
id = item['id']
print_text(f'test_service_item {title} {id}')
ret = requests.post(rtc.base_url + 'service/show', json=dict(uid=id))
assert ret.status_code == 204, ret.status_code
if not item['plugin'] == 'media':
pl = {'max': 1, 'min': 2, 'delay': 0.5, 'name': 'play_live_items'}
rtc.tasks.append(Task(rtc, pl))
def play_live_items(rtc: object) -> None:
print_text('Play_live_items')
ret = requests.get(rtc.base_url + 'controller/live-items')
assert ret.status_code == 200, f'{ret.status_code} returned from live_item'
i = 0
for _ in json.loads(ret.text):
pl = {'max': 1, 'min': 2, 'delay': 0.5, 'name': 'controller_item_show', 'payload': {'id': i}}
rtc.tasks.append(Task(rtc, pl))
i += 1
assert ret.status_code == 200, f'{ret.status_code} returned from show'
def play_live_item(rtc: object, payload: dict) -> None:
print_text('Play_live_item')
ret = requests.get(rtc.base_url + 'controller/live-item')
assert ret.status_code == 200, f'{ret.status_code} returned from live_item'
aa = json.loads(ret.text)
print_text(str(aa))
def controller_item_show(rtc: object, payload: dict) -> None:
id = payload['id']
print_text('Controller_item_show')
ret = requests.post(rtc.base_url + 'controller/show', json=dict(id=int(id)))
assert ret.status_code == 204, ret.status_code