for queue management

This commit is contained in:
Tim 2020-06-25 08:38:07 +01:00
parent 085375875b
commit a96a96a4d4
No known key found for this signature in database
GPG Key ID: 3D454289AF831A6D
5 changed files with 34 additions and 25 deletions

View File

@ -17,4 +17,4 @@
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
##########################################################################

View File

@ -22,17 +22,12 @@ import json
import random
import requests
import string
import time
from test_api.apitest.constants import BookNames
from test_api.apitest.logger import print_text
from test_api.apitest.task import Task
def human_delay(delay: int = 2) -> None:
time.sleep(delay)
def clear_live_controller(rtc: object) -> None:
print_text('Clear_live_controllers')
ret = requests.post(rtc.base_url + 'controller/clear/live')
@ -134,10 +129,11 @@ def load_service_sequential(rtc: object) -> None:
if item['plugin'] == 'video':
pass
else:
rtc.tasks.append(Task(rtc, service_item_show, poll_m=1, poll_o=2, ag=item))
rtc.tasks.append(Task(rtc, play_live_item, poll_m=1, poll_o=2, ag=item))
rtc.tasks.append(Task(rtc, clear_live_controller, poll_m=1, poll_o=1))
rtc.tasks.append(Task(rtc, new_service, poll_m=1, poll_o=1))
rtc.pending.append(Task(rtc, service_item_show, poll_m=1, poll_o=2, ag=item))
# rtc.tasks.append(Task(rtc, play_live_item, poll_m=1, poll_o=2, ag=item))
#rtc.tasks.append(Task(rtc, clear_live_controller, poll_m=1, poll_o=1))
#rtc.tasks.append(Task(rtc, new_service, poll_m=1, poll_o=1))
#self.load_service_random()
@ -148,22 +144,22 @@ def service_item_show(rtc: object, item: dict) -> None:
print_text(f'test_service_song {title} {id}')
ret = requests.post(rtc.base_url + 'service/show', json=dict(uid=id))
assert ret.status_code == 204, ret.status_code
rtc.tasks.append(Task(rtc, play_live_item, poll_m=1, poll_o=2, ag=item))
def play_live_item(rtc: object, item: dict) -> None:
plugin = item['plugin']
print_text(f'test_live_item - {plugin}')
print_text(f'Play_live_item - {plugin}')
ret = requests.get(rtc.base_url + 'controller/live-item')
assert ret.status_code == 200, f'{ret.status_code} returned from live_item'
i = 0
for _ in json.loads(ret.text):
rtc.tasks.append(Task(rtc, controller_item_show, poll_m=1, poll_o=2, ag=str(i)))
rtc.tasks.append(Task(rtc, controller_item_show, delay=0.1, poll_m=1, poll_o=2, ag=str(i)))
i += 1
assert ret.status_code == 204, f'{ret.status_code} returned from show'
assert ret.status_code == 200, f'{ret.status_code} returned from show'
def controller_item_show(rtc: object, id: str) -> None:
print_text('Controller_item_show')
ret = requests.post(rtc.base_url + 'controller/show', json=dict(uid=int(id)))
ret = requests.post(rtc.base_url + 'controller/show', json=dict(id=int(id)))
assert ret.status_code == 204, ret.status_code

View File

@ -31,7 +31,7 @@ def print_error(text: str):
def print_ok(text: str):
print(Fore.GREEN + '[*] = ' + text)
print(Fore.GREEN + '[_] = ' + text)
def print_warn(text: str):

View File

@ -20,10 +20,11 @@
##########################################################################
import threading
import websocket
import time
from collections import deque
from websocket import create_connection
from test_api.apitest.callbacks import * #noqa E403
from test_api.apitest.callbacks import * # noqa E403
from test_api.apitest.logger import print_text, print_error, print_ok, print_info
# from test_api.apitest.logger import print_debug #n
from test_api.apitest.task import Task
@ -40,6 +41,7 @@ class RunTestsController(object):
self.reserved_result_stage = bytes(reserved_result_stage, 'utf-8')
self.received = False
self.tasks = deque()
self.pending = deque()
self.base_url = f'http://{self.address}:{self.http_port}/api/v2/'
def connect(self) -> None:
@ -68,10 +70,10 @@ class RunTestsController(object):
self.received = True
def on_open(self) -> None:
print_info("opened")
print_info("Socket Listener Opened")
def on_close(self) -> None:
print_info("closed")
print_info("Socket Listener Closed")
def on_error(self, error: str) -> None:
print_error(f'WebSocket Error: {error}')
@ -142,6 +144,10 @@ class RunTestsController(object):
while len(self.tasks) > 0:
task = self.tasks.popleft()
task.run()
# we have finished but have pending tasks to complete
if len(self.tasks) == 0 and len(self.pending) > 0:
pend = self.pending.popleft()
self.tasks.append(pend)
def marshal_media(self):
print_ok('Running media test script')
@ -186,10 +192,12 @@ class RunTestsController(object):
service_item_show(self, item)
def check_websocket_changes(self, manditary: int, optional: int) -> None:
while not self.received:
count = 0
while not self.received and count < 200:
time.sleep(0.1)
count += 1
self.compare_stage()
if manditary <= len(self.stage_diff) <= optional:
if manditary <= len(self.stage_diff) <= manditary + optional:
pass
else:
print(f'{manditary} stage field(s) must have changed and '

View File

@ -18,12 +18,16 @@
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
from test_api.apitest.callbacks import human_delay
import time
def human_delay(delay: int = 2) -> None:
time.sleep(delay)
class Task(object):
def __init__(self, caller: object, callback: object, delay: int = 0, poll_m: int = -1,
def __init__(self, caller: object, callback: object, delay: float = 0, poll_m: int = -1,
poll_o: int = -1, ag: str = None) -> None:
self.caller = caller
self.delay = delay
@ -37,7 +41,8 @@ class Task(object):
self.callback(self.caller, self.arg)
else:
self.callback(self.caller)
if self.delay > 0:
human_delay(self.delay)
if self.poll_m >= 0:
self.caller.check_websocket_changes(self.poll_m, self.poll_m)
else:
human_delay(self.delay)