|
|
|
@ -31,6 +31,9 @@ from test_api.apitest.logger import print_text, print_error, print_ok, print_inf
|
|
|
|
|
# from test_api.apitest.logger import print_debug #n
|
|
|
|
|
from test_api.apitest.task import Task
|
|
|
|
|
|
|
|
|
|
from test_api.apitest.logger import print_text, print_error, print_ok, print_info
|
|
|
|
|
# from test_api.apitest.logger import print_debug
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class RunTestsController(object):
|
|
|
|
|
|
|
|
|
@ -66,7 +69,15 @@ class RunTestsController(object):
|
|
|
|
|
print_error("Could not connect to WS! Exiting.")
|
|
|
|
|
|
|
|
|
|
def on_message(self, message: str) -> None:
|
|
|
|
|
<<<<<<< HEAD
|
|
|
|
|
# print_debug("Message returned")
|
|
|
|
|
=======
|
|
|
|
|
<<<<<<< HEAD
|
|
|
|
|
print_info("Message returned")
|
|
|
|
|
=======
|
|
|
|
|
# print_debug("Message returned")
|
|
|
|
|
>>>>>>> de2f630de03530dae811b691f625e645b76450e4
|
|
|
|
|
>>>>>>> master
|
|
|
|
|
self.result_stage = message
|
|
|
|
|
self.compare_stage()
|
|
|
|
|
self.received = True
|
|
|
|
@ -130,6 +141,7 @@ class RunTestsController(object):
|
|
|
|
|
self.stage_diff[a] = {'before': reserved[a], 'after': current[a]}
|
|
|
|
|
self.reserved_result_stage = self.result_stage
|
|
|
|
|
|
|
|
|
|
<<<<<<< HEAD
|
|
|
|
|
def process_queue(self):
|
|
|
|
|
while len(self.tasks) > 0:
|
|
|
|
|
task = self.tasks.popleft()
|
|
|
|
@ -138,6 +150,59 @@ class RunTestsController(object):
|
|
|
|
|
if len(self.tasks) == 0 and len(self.pending) > 0:
|
|
|
|
|
pend = self.pending.popleft()
|
|
|
|
|
self.tasks.append(pend)
|
|
|
|
|
=======
|
|
|
|
|
def marshal_full(self):
|
|
|
|
|
print_ok('Running full test script')
|
|
|
|
|
if self.load_and_check_sockets(True):
|
|
|
|
|
clear_controllers(self)
|
|
|
|
|
new_service(self)
|
|
|
|
|
search_and_add(self, 'songs', 5)
|
|
|
|
|
search_and_add(self, 'bibles', 5)
|
|
|
|
|
search_and_add(self, 'images', 1)
|
|
|
|
|
search_and_add(self, 'presentations', 1)
|
|
|
|
|
search_and_add(self, 'custom', 3)
|
|
|
|
|
self.load_service_sequential()
|
|
|
|
|
clear_controllers(self)
|
|
|
|
|
self.load_service_random()
|
|
|
|
|
|
|
|
|
|
def marshal_media(self):
|
|
|
|
|
print_ok('Running media test script')
|
|
|
|
|
if self.load_and_check_sockets(True):
|
|
|
|
|
clear_controllers(self)
|
|
|
|
|
# search_and_live(self, 'songs', 1)
|
|
|
|
|
# media_play(self)
|
|
|
|
|
# human_delay()
|
|
|
|
|
# clear_controllers(self)
|
|
|
|
|
search_and_live(self, 'media', 1)
|
|
|
|
|
media_play(self)
|
|
|
|
|
human_delay(5)
|
|
|
|
|
media_pause(self)
|
|
|
|
|
human_delay(5)
|
|
|
|
|
media_play(self)
|
|
|
|
|
human_delay(5)
|
|
|
|
|
media_stop(self)
|
|
|
|
|
|
|
|
|
|
def marshal_blank(self):
|
|
|
|
|
print_ok('Running blank test script')
|
|
|
|
|
if self.load_and_check_sockets(True):
|
|
|
|
|
new_service(self)
|
|
|
|
|
|
|
|
|
|
def marshal_alert(self):
|
|
|
|
|
print_ok('Running alert test script')
|
|
|
|
|
if self.load_and_check_sockets(True):
|
|
|
|
|
new_service(self)
|
|
|
|
|
|
|
|
|
|
def load_service_sequential(self) -> None:
|
|
|
|
|
print_text('Load_and_process_service_sequential')
|
|
|
|
|
items = requests.get(f'http://{self.address}:{self.http_port}/api/v2/service/items')
|
|
|
|
|
service = json.loads(items.text)
|
|
|
|
|
# test sequentially
|
|
|
|
|
for item in service:
|
|
|
|
|
if item['plugin'] == 'video':
|
|
|
|
|
pass
|
|
|
|
|
else:
|
|
|
|
|
service_item_show(self, item)
|
|
|
|
|
>>>>>>> master
|
|
|
|
|
|
|
|
|
|
def load_service_random(self) -> None:
|
|
|
|
|
print_text('Load_and process_service_random')
|
|
|
|
@ -183,6 +248,117 @@ def human_delay(delay: int = 2) -> None:
|
|
|
|
|
time.sleep(delay)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
<<<<<<< HEAD
|
|
|
|
|
=======
|
|
|
|
|
def media_play(rtc: RunTestsController) -> None:
|
|
|
|
|
print_text('Media_play')
|
|
|
|
|
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
|
|
|
|
|
ret = requests.post(base_url + 'media/play')
|
|
|
|
|
assert ret.status_code == 400, f'{ret.status_code} returned'
|
|
|
|
|
human_delay()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def media_pause(rtc: RunTestsController) -> None:
|
|
|
|
|
print_text('Media_pause')
|
|
|
|
|
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
|
|
|
|
|
ret = requests.post(base_url + 'media/pause')
|
|
|
|
|
assert ret.status_code == 204, f'{ret.status_code} returned'
|
|
|
|
|
human_delay()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def media_stop(rtc: RunTestsController) -> None:
|
|
|
|
|
print_text('Media_stop')
|
|
|
|
|
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
|
|
|
|
|
ret = requests.post(base_url + 'media/stop')
|
|
|
|
|
assert ret.status_code == 204, f'{ret.status_code} returned'
|
|
|
|
|
human_delay()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def new_service(rtc: RunTestsController) -> None:
|
|
|
|
|
print_text('New_service')
|
|
|
|
|
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
|
|
|
|
|
ret = requests.get(base_url + 'service/new')
|
|
|
|
|
assert ret.status_code == 204, f'{ret.status_code} returned'
|
|
|
|
|
human_delay()
|
|
|
|
|
check_websocket_changes(rtc, 1, 1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def clear_controllers(rtc: RunTestsController) -> None:
|
|
|
|
|
print_text('Clear_controllers')
|
|
|
|
|
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
|
|
|
|
|
ret = requests.post(base_url + 'controller/clear/live')
|
|
|
|
|
assert ret.status_code == 204, f'{ret.status_code} returned from clear live'
|
|
|
|
|
check_websocket_changes(rtc, 1, 1)
|
|
|
|
|
# Preview clear should not impact the WebSockets.
|
|
|
|
|
ret = requests.post(base_url + 'controller/clear/preview')
|
|
|
|
|
assert ret.status_code == 204, f'{ret.status_code} returned from clear preview'
|
|
|
|
|
check_websocket_changes(rtc, 0, 0)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def search_and_live(rtc: RunTestsController, plugin: str, count: int) -> None:
|
|
|
|
|
print_text(f'Search_and_live for {plugin}')
|
|
|
|
|
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/plugins/{plugin}/'
|
|
|
|
|
while True:
|
|
|
|
|
let = random.choice(string.ascii_letters)
|
|
|
|
|
ret = requests.get(base_url + f'search?text={let}')
|
|
|
|
|
if ret.status_code == 200 and len(json.loads(ret.text)) > 0:
|
|
|
|
|
break
|
|
|
|
|
assert ret.status_code == 200, f'{ret.status_code} returned from search'
|
|
|
|
|
items = json.loads(ret.text)
|
|
|
|
|
limit = len(items) - 1
|
|
|
|
|
if limit >= 0:
|
|
|
|
|
random_service = [1]
|
|
|
|
|
if limit > 0:
|
|
|
|
|
random_service = [random.randint(1, limit) for itr in range(count)]
|
|
|
|
|
for pos in random_service:
|
|
|
|
|
item = items[pos - 1]
|
|
|
|
|
ret = requests.post(base_url + 'live', json=dict(id=item[0]))
|
|
|
|
|
assert ret.status_code == 204, f'{ret.status_code} returned from add'
|
|
|
|
|
human_delay()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def search_and_add(rtc: RunTestsController, plugin: str, count: int) -> None:
|
|
|
|
|
print_text(f'Search_and_add for {plugin}')
|
|
|
|
|
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/plugins/{plugin}/'
|
|
|
|
|
if plugin == 'bibles':
|
|
|
|
|
for i in range(1, count):
|
|
|
|
|
bk_id = random.randint(1, len(BookNames) - 1)
|
|
|
|
|
bk = BookNames[bk_id]
|
|
|
|
|
ch = random.randint(1, 10)
|
|
|
|
|
vse = random.randint(1, 10)
|
|
|
|
|
let = f'{bk} {ch}:1-{vse}'
|
|
|
|
|
ret = requests.get(base_url + f'search?text={let}')
|
|
|
|
|
assert ret.status_code == 200, f'{ret.status_code} returned from searcg'
|
|
|
|
|
items = json.loads(ret.text)
|
|
|
|
|
if items:
|
|
|
|
|
ret = requests.post(base_url + 'add', json=dict(id=items[0][0]))
|
|
|
|
|
assert ret.status_code == 204, f'{ret.status_code} returned from add'
|
|
|
|
|
human_delay()
|
|
|
|
|
else:
|
|
|
|
|
if plugin == 'media':
|
|
|
|
|
while True:
|
|
|
|
|
let = random.choice(string.ascii_letters)
|
|
|
|
|
ret = requests.get(base_url + f'search?text={let}')
|
|
|
|
|
if ret.status_code == 200:
|
|
|
|
|
break
|
|
|
|
|
else:
|
|
|
|
|
let = random.choice(string.ascii_letters)
|
|
|
|
|
ret = requests.get(base_url + f'search?text={let}')
|
|
|
|
|
assert ret.status_code == 200, f'{ret.status_code} returned from search'
|
|
|
|
|
items = json.loads(ret.text)
|
|
|
|
|
limit = len(items) - 1
|
|
|
|
|
if limit >= 0:
|
|
|
|
|
random_service = [1]
|
|
|
|
|
if limit > 0:
|
|
|
|
|
random_service = [random.randint(1, limit) for itr in range(count)]
|
|
|
|
|
for pos in random_service:
|
|
|
|
|
item = items[pos - 1]
|
|
|
|
|
ret = requests.post(base_url + 'add', json=dict(id=item[0]))
|
|
|
|
|
assert ret.status_code == 204, f'{ret.status_code} returned from add'
|
|
|
|
|
human_delay()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
>>>>>>> master
|
|
|
|
|
def service_item_show(rtc: RunTestsController, item: dict) -> None:
|
|
|
|
|
print_text('Service_item_show')
|
|
|
|
|
title = item['title']
|
|
|
|
|