Updates to add socket queue and timing changes

This commit is contained in:
Tim 2020-10-04 18:11:30 +01:00
parent bb0ad4c36d
commit 3f232728a1
No known key found for this signature in database
GPG Key ID: 3D454289AF831A6D
8 changed files with 81 additions and 148 deletions

View File

@ -1,5 +1,5 @@
---
process_name: test process
process_name: play media
step1:
max: 1
min: 1
@ -8,36 +8,16 @@ step2:
delay: 1
name: clear_preview_controller
step3:
max: 1
min: 1
name: new_service
step4:
delay: 5
name: media_play
step5:
delay: 1
name: clear_preview_controller
step6:
max: 1
min: 1
name: new_service
step7:
delay: 1
delay: 10
name: search_and_add
payload:
plugin: media
step8:
delay: 5
name: media_play
step9:
delay: 5
step4:
delay: 0.1
name: load_service_sequential
step5:
delay: 30
name: flush_pending
step6:
delay: 0.2
name: media_pause
step10:
delay: 5
name: media_pause
step11:
delay: 5
name: media_play
step12:
delay: 5
name: media_stop

5
pause.yaml Normal file
View File

@ -0,0 +1,5 @@
---
process_name: Pause Media
step1:
delay: 0.2
name: media_pause

5
play.yaml Normal file
View File

@ -0,0 +1,5 @@
---
process_name: Play Media
step1:
delay: 0.2
name: media_play

5
stop.yaml Normal file
View File

@ -0,0 +1,5 @@
---
process_name: Stop Media
step1:
delay: 0.2
name: media_stop

View File

@ -20,7 +20,7 @@ step5:
delay: 1
name: search_and_add
payload:
plugin: songs
plugin: bibles
step6:
delay: 0.1
name: load_service_sequential

View File

@ -1,24 +0,0 @@
import yaml
# ...
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('rargs', nargs='*', default=[])
a = parser.parse_args()
with open(a.rargs[0], 'r') as file:
# The FullLoader parameter handles the conversion from YAML
# scalar values to Python the dictionary format
xx = yaml.load(file)
print(yaml.dump(xx))
for x in xx:
print(x)
print(xx[x])
c = xx[x]
print(c)
if isinstance(c, str):
pass
else:
try:
print(c['delay'])
except KeyError:
pass

View File

@ -67,7 +67,7 @@ def search_and_add(rtc: object, payload: dict) -> None:
while True:
let = random.choice(string.ascii_letters)
ret = requests.get(rtc.base_url + f'plugins/{plugin}/search?text={let}')
if ret.status_code == 200:
if ret.status_code == 200 and ret.text is not '[]\n':
break
else:
let = random.choice(string.ascii_letters)
@ -104,6 +104,12 @@ def search_and_live(rtc: object, payload: dict) -> None:
assert ret.status_code == 204, f'{ret.status_code} returned from add'
def flush_pending(rtc: object) -> None:
print_text('Flush pending to start of tasks')
pend = rtc.pending.popleft()
rtc.tasks.insert(0, pend)
def media_play(rtc: object) -> None:
print_text('Media_play')
ret = requests.post(rtc.base_url + 'media/play')
@ -113,38 +119,36 @@ def media_play(rtc: object) -> None:
def media_pause(rtc: object) -> None:
print_text('Media_pause')
ret = requests.post(rtc.base_url + 'media/pause')
assert ret.status_code == 204, f'{ret.status_code} returned'
assert ret.status_code == 400, f'{ret.status_code} returned'
def media_stop(rtc: object) -> None:
print_text('Media_stop')
ret = requests.post(rtc.base_url + 'media/stop')
assert ret.status_code == 204, f'{ret.status_code} returned'
assert ret.status_code == 400, f'{ret.status_code} returned'
def load_service_sequential(rtc: object) -> None:
print_text('Load__service_sequential')
print_text('Load_service_sequential')
items = requests.get(rtc.base_url + 'service/items')
service = json.loads(items.text)
# test sequentially
for item in service:
if item['plugin'] == 'video':
pass
else:
pl = {'max': 1, 'min': 1, 'name': 'service_item_show', 'payload': {'item': item}}
rtc.pending.append(Task(rtc, pl))
pl = {'max': 1, 'min': 1, 'name': 'service_item_show', 'payload': {'item': item}}
rtc.pending.append(Task(rtc, pl))
def service_item_show(rtc: object, payload: dict) -> None:
item = payload['item']
print_text('Service_item_show')
item = payload['item']
title = item['title']
id = item['id']
print_text(f'test_service_song {title} {id}')
print_text(f'test_service_item {title} {id}')
ret = requests.post(rtc.base_url + 'service/show', json=dict(uid=id))
assert ret.status_code == 204, ret.status_code
pl = {'max': 1, 'min': 2, 'name': 'play_live_item', 'payload': {'item': item}}
rtc.tasks.append(Task(rtc, pl))
if not item['plugin'] == 'media':
pl = {'max': 1, 'min': 2, 'delay': 0.5, 'name': 'play_live_item', 'payload': {'item': item}}
rtc.tasks.append(Task(rtc, pl))
def play_live_item(rtc: object, payload: dict) -> None:
@ -155,7 +159,7 @@ def play_live_item(rtc: object, payload: dict) -> None:
assert ret.status_code == 200, f'{ret.status_code} returned from live_item'
i = 0
for _ in json.loads(ret.text):
pl = {'max': 1, 'min': 2, 'delay': 0.2, 'name': 'controller_item_show', 'payload': {'id': i}}
pl = {'max': 1, 'min': 2, 'delay': 0.5, 'name': 'controller_item_show', 'payload': {'id': i}}
rtc.tasks.append(Task(rtc, pl))
i += 1
assert ret.status_code == 200, f'{ret.status_code} returned from show'

View File

@ -21,19 +21,19 @@
import argparse
import threading
import websocket
import time
import yaml
from collections import deque
from websocket import create_connection
from test_api.apitest.logger import print_text, print_error, print_ok, print_info
# from test_api.apitest.logger import print_debug
from test_api.apitest.logger import print_error, print_ok, print_info
from test_api.apitest.logger import print_debug
from test_api.apitest.callbacks import * # noqa E403
from test_api.apitest.task import Task
from test_api.apitest.task import human_delay
class RunTestsController(object):
@ -48,6 +48,7 @@ class RunTestsController(object):
self.received = False
self.tasks = deque()
self.pending = deque()
self.ws_data = deque()
self.base_url = f'http://{self.address}:{self.http_port}/api/v2/'
def connect(self) -> None:
@ -70,10 +71,11 @@ class RunTestsController(object):
print_error("Could not connect to WS! Exiting.")
def on_message(self, message: str) -> None:
# print_debug("Message returned")
self.result_stage = message
self.compare_stage()
self.received = True
print_debug(f"Message returned: {message}")
if not self.received:
self.reserved_result_stage = message
else:
self.ws_data.append(message)
def on_open(self) -> None:
print_info("Socket Listener Opened")
@ -85,6 +87,11 @@ class RunTestsController(object):
print_error(f'WebSocket Error: {error}')
def load_and_check_sockets(self, first_run: bool = False) -> bool:
"""
Method to make connect to webspockets
:param first_run:
:return:
"""
ws = create_connection(f'ws://{self.address}:{self.ws_port}', ping_interval=None)
if first_run:
print_text('Establishing Connection to State')
@ -120,19 +127,17 @@ class RunTestsController(object):
"""
:return:
"""
if self.received:
self.stage_diff = {}
reserved = json.loads(self.reserved_result_stage.decode('utf-8'))['results']
current = json.loads(self.result_stage.decode('utf-8'))['results']
self.received = False
# compare_strings(reserved, current)
if len(reserved) > 0:
assert len(reserved) == len(current)
for a, _ in current.items():
# compare_strings(reserved[a], current[a])
if reserved[a] != current[a]:
self.stage_diff[a] = {'before': reserved[a], 'after': current[a]}
self.reserved_result_stage = self.result_stage
self.stage_diff = {}
reserved = json.loads(self.reserved_result_stage.decode('utf-8'))['results']
current = json.loads(self.result_stage.decode('utf-8'))['results']
# compare_strings(reserved, current)
if len(reserved) > 0:
assert len(reserved) == len(current)
for a, _ in current.items():
# compare_strings(reserved[a], current[a])
if reserved[a] != current[a]:
self.stage_diff[a] = {'before': reserved[a], 'after': current[a]}
self.reserved_result_stage = self.result_stage
def process_queue(self):
while len(self.tasks) > 0:
@ -144,7 +149,7 @@ class RunTestsController(object):
self.tasks.append(pend)
def load_service_sequential(self) -> None:
print_text('Load_and_process_service_sequential')
print_text('Load_and_process_service_sequential - old')
items = requests.get(f'http://{self.address}:{self.http_port}/api/v2/service/items')
service = json.loads(items.text)
# test sequentially
@ -155,7 +160,7 @@ class RunTestsController(object):
service_item_show(self, item)
def load_service_random(self) -> None:
print_text('Load_and process_service_random')
print_text('Load_and process_service_random - old')
items = requests.get(f'http://{self.address}:{self.http_port}/api/v2/service/items')
service = json.loads(items.text)
limit = len(service)
@ -165,17 +170,16 @@ class RunTestsController(object):
item = service[pos - 1]
service_item_show(self, item)
def check_websocket_changes(self, manditary: int, optional: int) -> None:
count = 0
while not self.received and count < 200:
time.sleep(0.1)
count += 1
self.compare_stage()
if manditary <= len(self.stage_diff) <= manditary + optional:
pass
else:
print(f'{manditary} stage field(s) must have changed and '
f'{optional} may change changed- {str(self.stage_diff)}')
def check_websocket_changes(self, manditory: int, optional: int) -> None:
while len(self.ws_data) > 0:
message = self.ws_data.popleft()
self.result_stage = message
self.compare_stage()
if manditory <= len(self.stage_diff) <= manditory + optional:
pass
else:
print(f'{manditory} stage field(s) must have changed and '
f'{optional} may change changed- {str(self.stage_diff)}')
def compare_strings(a, b):
@ -192,49 +196,3 @@ def compare_strings(a, b):
else:
if a != b:
print('{} => {} are different'.format(a, b))
def human_delay(delay: int = 2) -> None:
time.sleep(delay)
def media_play(rtc: RunTestsController) -> None:
print_text('Media_play')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
ret = requests.post(base_url + 'media/play')
assert ret.status_code == 400, f'{ret.status_code} returned'
human_delay()
def media_pause(rtc: RunTestsController) -> None:
print_text('Media_pause')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
ret = requests.post(base_url + 'media/pause')
assert ret.status_code == 204, f'{ret.status_code} returned'
human_delay()
def media_stop(rtc: RunTestsController) -> None:
print_text('Media_stop')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
ret = requests.post(base_url + 'media/stop')
assert ret.status_code == 204, f'{ret.status_code} returned'
human_delay()
def new_service(rtc: RunTestsController) -> None:
print_text('New_service')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
ret = requests.get(base_url + 'service/new')
assert ret.status_code == 204, f'{ret.status_code} returned'
human_delay()
check_websocket_changes(rtc, 1, 1)
def check_websocket_changes(rtc: RunTestsController, manditary: int, optional: int) -> None:
while not rtc.received:
time.sleep(0.1)
if manditary <= len(rtc.stage_diff) <= optional:
pass
else:
print(f'{manditary} stage field(s) must have changed and {optional} may change changed- {str(rtc.stage_diff)}')