Migrate more services

This commit is contained in:
Tim 2020-06-24 16:24:40 +01:00
parent 3900613cb1
commit a333370532
No known key found for this signature in database
GPG Key ID: 3D454289AF831A6D
3 changed files with 137 additions and 134 deletions

View File

View File

@ -1,3 +1,23 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2020 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
import json
import random
import requests
@ -31,7 +51,7 @@ def new_service(rtc: object) -> None:
assert ret.status_code == 204, f'{ret.status_code} returned'
def search_and_add1(rtc: object, plugin: str) -> None:
def search_and_add(rtc: object, plugin: str) -> None:
print_text(f'Search_and_add for {plugin}')
if plugin == 'bibles':
bk_id = random.randint(1, len(BookNames) - 1)
@ -46,22 +66,88 @@ def search_and_add1(rtc: object, plugin: str) -> None:
ret = requests.post(rtc.base_url + f'plugins/{plugin}/add', json=dict(id=items[0][0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
else:
#if plugin == 'media':
# while True:
# let = random.choice(string.ascii_letters)
# ret = requests.get(base_url + f'search?text={let}')
# if ret.status_code == 200:
# break
#else:
let = random.choice(string.ascii_letters)
ret = requests.get(rtc.base_url + f'plugins/{plugin}/search?text={let}')
if plugin == 'media':
while True:
let = random.choice(string.ascii_letters)
ret = requests.get(rtc.base_url + f'plugins/{plugin}/search?text={let}')
if ret.status_code == 200:
break
else:
let = random.choice(string.ascii_letters)
ret = requests.get(rtc.base_url + f'plugins/{plugin}/search?text={let}')
assert ret.status_code == 200, f'{ret.status_code} returned from search'
items = json.loads(ret.text)
limit = len(items) - 1
if limit >= 0:
random_item = 1
random_item = 0
if limit > 0:
random_item = random.randint(1, limit) - 1
item = items[random_item - 1]
item = items[random_item]
ret = requests.post(rtc.base_url + f'plugins/{plugin}/add', json=dict(id=item[0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
def search_and_live(rtc: object, plugin: str) -> None:
print_text(f'Search_and_live for {plugin}')
while True:
let = random.choice(string.ascii_letters)
ret = requests.get(rtc.base_url + f'plugins/{plugin}/search?text={let}')
if ret.status_code == 200 and len(json.loads(ret.text)) > 0:
break
assert ret.status_code == 200, f'{ret.status_code} returned from search'
items = json.loads(ret.text)
limit = len(items) - 1
if limit >= 0:
random_service = 0
if limit > 0:
random_service = random.randint(1, limit) - 1
item = items[random_service]
ret = requests.post(rtc.base_url + f'plugins/{plugin}/live', json=dict(id=item[0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
def media_play(rtc: object) -> None:
print_text('Media_play')
ret = requests.post(rtc.base_url + 'media/play')
assert ret.status_code == 400, f'{ret.status_code} returned'
def media_pause(rtc: object) -> None:
print_text('Media_pause')
ret = requests.post(rtc.base_url + 'media/pause')
assert ret.status_code == 204, f'{ret.status_code} returned'
def media_stop(rtc: object) -> None:
print_text('Media_stop')
ret = requests.post(rtc.base_url + 'media/stop')
assert ret.status_code == 204, f'{ret.status_code} returned'
def service_item_show(rtc: object, item: dict) -> None:
print_text('Service_item_show')
title = item['title']
id = item['id']
print_text(f'test_service_song {title} {id}')
ret = requests.post(rtc.base_url + 'service/show', json=dict(uid=id))
assert ret.status_code == 204, ret.status_code
check_websocket_changes(rtc, 1, 2)
live_item(rtc, item['plugin'])
def live_item(rtc: object, plugin: str) -> None:
print_text(f'test_live_item - {plugin}')
ret = requests.get(rtc.base_url + 'controller/live-item')
assert ret.status_code == 200, f'{ret.status_code} returned from live_item'
i = 0
for _ in json.loads(ret.text):
ret = requests.post(rtc.base_url + 'controller/show', json=dict(id=i))
i += 1
human_delay()
assert ret.status_code == 204, f'{ret.status_code} returned from show'
if plugin in {'image'}:
check_websocket_changes(rtc, 0, 0)
else:
check_websocket_changes(rtc, 1, 2)

View File

@ -23,7 +23,6 @@ import websocket
from collections import deque
from websocket import create_connection
from test_api.apitest.constants import BookNames
from test_api.apitest.callbacks import *
from test_api.apitest.logger import print_text, print_error, print_ok, print_info
# from test_api.apitest.logger import print_debug
@ -119,25 +118,27 @@ class RunTestsController(object):
self.tasks.append(Task(self, clear_live_controller, poll_m=1, poll_o=1))
self.tasks.append(Task(self, clear_preview_controller, delay=1))
self.tasks.append(Task(self, new_service, poll_m=1, poll_o=1))
self.tasks.append(Task(self, search_and_add1, delay=1, ag='songs'))
self.tasks.append(Task(self, search_and_add1, delay=1, ag='songs'))
self.tasks.append(Task(self, search_and_add1, delay=1, ag='bibles'))
self.tasks.append(Task(self, search_and_add1, delay=1, ag='bibles'))
self.tasks.append(Task(self, search_and_add1, delay=1, ag='custom'))
self.tasks.append(Task(self, search_and_add1, delay=1, ag='images'))
self.tasks.append(Task(self, search_and_add1, delay=1, ag='songs'))
self.tasks.append(Task(self, search_and_add1, delay=1, ag='presentations'))
self.tasks.append(Task(self, search_and_add1, delay=1, ag='bibles'))
self.tasks.append(Task(self, search_and_add1, delay=1, ag='bibles'))
self.tasks.append(Task(self, search_and_add1, delay=1, ag='songs'))
self.tasks.append(Task(self, search_and_add1, delay=1, ag='custom'))
self.tasks.append(Task(self, search_and_add1, delay=1, ag='bibles'))
self.tasks.append(Task(self, search_and_add1, delay=1, ag='bibles'))
self.tasks.append(Task(self, search_and_add1, delay=1, ag='custom'))
self.tasks.append(Task(self, search_and_add1, delay=1, ag='songs'))
self.tasks.append(Task(self, search_and_add1, delay=1, ag='bibles'))
self.tasks.append(Task(self, search_and_add, delay=1, ag='songs'))
self.tasks.append(Task(self, search_and_add, delay=1, ag='songs'))
self.tasks.append(Task(self, search_and_add, delay=1, ag='bibles'))
self.tasks.append(Task(self, search_and_add, delay=1, ag='bibles'))
self.tasks.append(Task(self, search_and_add, delay=1, ag='custom'))
self.tasks.append(Task(self, search_and_add, delay=1, ag='images'))
self.tasks.append(Task(self, search_and_add, delay=1, ag='songs'))
self.tasks.append(Task(self, search_and_add, delay=1, ag='presentations'))
self.tasks.append(Task(self, search_and_add, delay=1, ag='bibles'))
self.tasks.append(Task(self, search_and_add, delay=1, ag='bibles'))
self.tasks.append(Task(self, search_and_add, delay=1, ag='songs'))
self.tasks.append(Task(self, search_and_add, delay=1, ag='custom'))
self.tasks.append(Task(self, search_and_add, delay=1, ag='bibles'))
self.tasks.append(Task(self, search_and_add, delay=1, ag='bibles'))
self.tasks.append(Task(self, search_and_add, delay=1, ag='custom'))
self.tasks.append(Task(self, search_and_add, delay=1, ag='songs'))
self.tasks.append(Task(self, search_and_add, delay=1, ag='bibles'))
#self.load_service_sequential()
#clear_controllers(self)
self.tasks.append(Task(self, clear_live_controller, poll_m=1, poll_o=1))
self.tasks.append(Task(self, clear_preview_controller, delay=1))
self.tasks.append(Task(self, new_service, poll_m=1, poll_o=1))
#self.load_service_random()
self.process_queue()
@ -149,29 +150,33 @@ class RunTestsController(object):
def marshal_media(self):
print_ok('Running media test script')
if self.load_and_check_sockets(True):
#clear_controllers(self)
# search_and_live(self, 'songs', 1)
# media_play(self)
# human_delay()
# clear_controllers(self)
search_and_live(self, 'media', 1)
media_play(self)
human_delay(5)
media_pause(self)
human_delay(5)
media_play(self)
human_delay(5)
media_stop(self)
self.tasks.append(Task(self, clear_live_controller, poll_m=1, poll_o=1))
self.tasks.append(Task(self, clear_preview_controller, delay=1))
self.tasks.append(Task(self, new_service, poll_m=1, poll_o=1))
self.tasks.append(Task(self, search_and_live, delay=1, ag='songs'))
self.tasks.append(Task(self, media_play, delay=2))
self.tasks.append(Task(self, clear_preview_controller, delay=1))
self.tasks.append(Task(self, new_service, poll_m=1, poll_o=1))
self.tasks.append(Task(self, search_and_live, delay=1, ag='media'))
self.tasks.append(Task(self, media_play, delay=5))
self.tasks.append(Task(self, media_pause, delay=5))
self.tasks.append(Task(self, media_pause, delay=5))
self.tasks.append(Task(self, media_play, delay=5))
self.tasks.append(Task(self, media_stop, delay=1))
def marshal_blank(self):
print_ok('Running blank test script')
if self.load_and_check_sockets(True):
new_service(self)
self.tasks.append(Task(self, clear_live_controller, poll_m=1, poll_o=1))
self.tasks.append(Task(self, clear_preview_controller, delay=1))
self.tasks.append(Task(self, new_service, poll_m=1, poll_o=1))
def marshal_alert(self):
print_ok('Running alert test script')
if self.load_and_check_sockets(True):
new_service(self)
self.tasks.append(Task(self, clear_live_controller, poll_m=1, poll_o=1))
self.tasks.append(Task(self, clear_preview_controller, delay=1))
self.tasks.append(Task(self, new_service, poll_m=1, poll_o=1))
def load_service_sequential(self) -> None:
print_text('Load_and_process_service_sequential')
@ -222,98 +227,10 @@ def compare_strings(a, b):
print('{} => {} are different'.format(a, b))
def human_delay(delay: int = 2) -> None:
time.sleep(delay)
def media_play(rtc: RunTestsController) -> None:
print_text('Media_play')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
ret = requests.post(base_url + 'media/play')
assert ret.status_code == 400, f'{ret.status_code} returned'
human_delay()
def media_pause(rtc: RunTestsController) -> None:
print_text('Media_pause')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
ret = requests.post(base_url + 'media/pause')
assert ret.status_code == 204, f'{ret.status_code} returned'
human_delay()
def media_stop(rtc: RunTestsController) -> None:
print_text('Media_stop')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
ret = requests.post(base_url + 'media/stop')
assert ret.status_code == 204, f'{ret.status_code} returned'
human_delay()
def search_and_live(rtc: RunTestsController, plugin: str, count: int) -> None:
print_text(f'Search_and_live for {plugin}')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/plugins/{plugin}/'
while True:
let = random.choice(string.ascii_letters)
ret = requests.get(base_url + f'search?text={let}')
if ret.status_code == 200 and len(json.loads(ret.text)) > 0:
break
assert ret.status_code == 200, f'{ret.status_code} returned from search'
items = json.loads(ret.text)
limit = len(items) - 1
if limit >= 0:
random_service = [1]
if limit > 0:
random_service = [random.randint(1, limit) for itr in range(count)]
for pos in random_service:
item = items[pos - 1]
ret = requests.post(base_url + 'live', json=dict(id=item[0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
human_delay()
def search_and_add(rtc: RunTestsController, plugin: str, count: int) -> None:
print_text(f'Search_and_add for {plugin}')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/plugins/{plugin}/'
if plugin == 'bibles':
for i in range(1, count):
bk_id = random.randint(1, len(BookNames) - 1)
bk = BookNames[bk_id]
ch = random.randint(1, 10)
vse = random.randint(1, 10)
let = f'{bk} {ch}:1-{vse}'
ret = requests.get(base_url + f'search?text={let}')
assert ret.status_code == 200, f'{ret.status_code} returned from searcg'
items = json.loads(ret.text)
if items:
ret = requests.post(base_url + 'add', json=dict(id=items[0][0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
human_delay()
else:
if plugin == 'media':
while True:
let = random.choice(string.ascii_letters)
ret = requests.get(base_url + f'search?text={let}')
if ret.status_code == 200:
break
else:
let = random.choice(string.ascii_letters)
ret = requests.get(base_url + f'search?text={let}')
assert ret.status_code == 200, f'{ret.status_code} returned from search'
items = json.loads(ret.text)
limit = len(items) - 1
if limit >= 0:
random_service = [1]
if limit > 0:
random_service = [random.randint(1, limit) for itr in range(count)]
for pos in random_service:
item = items[pos - 1]
ret = requests.post(base_url + 'add', json=dict(id=item[0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
human_delay()
def service_item_show(rtc: RunTestsController, item: dict) -> None:
print_text('Service_item_show')
title = item['title']