Refactor the code to use a task queue to simplify things
This commit is contained in:
parent
f9c603eeea
commit
3900613cb1
|
@ -0,0 +1,67 @@
|
|||
import json
|
||||
import random
|
||||
import requests
|
||||
import string
|
||||
import time
|
||||
|
||||
from test_api.apitest.constants import BookNames
|
||||
from test_api.apitest.logger import print_text, print_error, print_ok, print_info
|
||||
|
||||
|
||||
def human_delay(delay: int = 2) -> None:
|
||||
time.sleep(delay)
|
||||
|
||||
|
||||
def clear_live_controller(rtc: object) -> None:
|
||||
print_text('Clear_live_controllers')
|
||||
ret = requests.post(rtc.base_url + 'controller/clear/live')
|
||||
assert ret.status_code == 204, f'{ret.status_code} returned from clear live'
|
||||
|
||||
|
||||
def clear_preview_controller(rtc: object) -> None:
|
||||
print_text('Clear_preview_controllers')
|
||||
# Preview clear should not impact the WebSockets.
|
||||
ret = requests.post(rtc.base_url + 'controller/clear/preview')
|
||||
assert ret.status_code == 204, f'{ret.status_code} returned from clear preview'
|
||||
|
||||
|
||||
def new_service(rtc: object) -> None:
|
||||
print_text('New_service')
|
||||
ret = requests.get(rtc.base_url + 'service/new')
|
||||
assert ret.status_code == 204, f'{ret.status_code} returned'
|
||||
|
||||
|
||||
def search_and_add1(rtc: object, plugin: str) -> None:
|
||||
print_text(f'Search_and_add for {plugin}')
|
||||
if plugin == 'bibles':
|
||||
bk_id = random.randint(1, len(BookNames) - 1)
|
||||
bk = BookNames[bk_id]
|
||||
ch = random.randint(1, 10)
|
||||
vse = random.randint(1, 10)
|
||||
let = f'{bk} {ch}:1-{vse}'
|
||||
ret = requests.get(rtc.base_url + f'plugins/{plugin}/search?text={let}')
|
||||
assert ret.status_code == 200, f'{ret.status_code} returned from search'
|
||||
items = json.loads(ret.text)
|
||||
if items:
|
||||
ret = requests.post(rtc.base_url + f'plugins/{plugin}/add', json=dict(id=items[0][0]))
|
||||
assert ret.status_code == 204, f'{ret.status_code} returned from add'
|
||||
else:
|
||||
#if plugin == 'media':
|
||||
# while True:
|
||||
# let = random.choice(string.ascii_letters)
|
||||
# ret = requests.get(base_url + f'search?text={let}')
|
||||
# if ret.status_code == 200:
|
||||
# break
|
||||
#else:
|
||||
let = random.choice(string.ascii_letters)
|
||||
ret = requests.get(rtc.base_url + f'plugins/{plugin}/search?text={let}')
|
||||
assert ret.status_code == 200, f'{ret.status_code} returned from search'
|
||||
items = json.loads(ret.text)
|
||||
limit = len(items) - 1
|
||||
if limit >= 0:
|
||||
random_item = 1
|
||||
if limit > 0:
|
||||
random_item = random.randint(1, limit) - 1
|
||||
item = items[random_item - 1]
|
||||
ret = requests.post(rtc.base_url + f'plugins/{plugin}/add', json=dict(id=item[0]))
|
||||
assert ret.status_code == 204, f'{ret.status_code} returned from add'
|
|
@ -18,18 +18,16 @@
|
|||
# You should have received a copy of the GNU General Public License #
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||
##########################################################################
|
||||
import json
|
||||
import requests
|
||||
import string
|
||||
import time
|
||||
import random
|
||||
import threading
|
||||
import websocket
|
||||
|
||||
from collections import deque
|
||||
from websocket import create_connection
|
||||
from test_api.apitest.constants import BookNames
|
||||
from test_api.apitest.callbacks import *
|
||||
from test_api.apitest.logger import print_text, print_error, print_ok, print_info
|
||||
# from test_api.apitest.logger import print_debug
|
||||
from test_api.apitest.task import Task
|
||||
|
||||
|
||||
class RunTestsController(object):
|
||||
|
@ -38,8 +36,12 @@ class RunTestsController(object):
|
|||
self.address = address
|
||||
self.http_port = http_port
|
||||
self.ws_port = ws_port
|
||||
self.reserved_result_stage = None
|
||||
reserved_result_stage = {'results': ''}
|
||||
reserved_result_stage = json.dumps(reserved_result_stage)
|
||||
self.reserved_result_stage = bytes(reserved_result_stage, 'utf-8')
|
||||
self.received = False
|
||||
self.tasks = deque()
|
||||
self.base_url = f'http://{self.address}:{self.http_port}/api/v2/'
|
||||
|
||||
def connect(self) -> None:
|
||||
print_info("Starting thread")
|
||||
|
@ -103,31 +105,51 @@ class RunTestsController(object):
|
|||
current = json.loads(self.result_stage.decode('utf-8'))['results']
|
||||
self.received = False
|
||||
# compare_strings(reserved, current)
|
||||
assert len(reserved) == len(current)
|
||||
for a, _ in current.items():
|
||||
# compare_strings(reserved[a], current[a])
|
||||
if reserved[a] != current[a]:
|
||||
self.stage_diff[a] = {'before': reserved[a], 'after': current[a]}
|
||||
if len(reserved) > 0:
|
||||
assert len(reserved) == len(current)
|
||||
for a, _ in current.items():
|
||||
# compare_strings(reserved[a], current[a])
|
||||
if reserved[a] != current[a]:
|
||||
self.stage_diff[a] = {'before': reserved[a], 'after': current[a]}
|
||||
self.reserved_result_stage = self.result_stage
|
||||
|
||||
def marshal_full(self):
|
||||
print_ok('Running full test script')
|
||||
if self.load_and_check_sockets(True):
|
||||
clear_controllers(self)
|
||||
new_service(self)
|
||||
search_and_add(self, 'songs', 5)
|
||||
search_and_add(self, 'bibles', 5)
|
||||
search_and_add(self, 'images', 1)
|
||||
search_and_add(self, 'presentations', 1)
|
||||
search_and_add(self, 'custom', 3)
|
||||
self.load_service_sequential()
|
||||
clear_controllers(self)
|
||||
self.load_service_random()
|
||||
self.tasks.append(Task(self, clear_live_controller, poll_m=1, poll_o=1))
|
||||
self.tasks.append(Task(self, clear_preview_controller, delay=1))
|
||||
self.tasks.append(Task(self, new_service, poll_m=1, poll_o=1))
|
||||
self.tasks.append(Task(self, search_and_add1, delay=1, ag='songs'))
|
||||
self.tasks.append(Task(self, search_and_add1, delay=1, ag='songs'))
|
||||
self.tasks.append(Task(self, search_and_add1, delay=1, ag='bibles'))
|
||||
self.tasks.append(Task(self, search_and_add1, delay=1, ag='bibles'))
|
||||
self.tasks.append(Task(self, search_and_add1, delay=1, ag='custom'))
|
||||
self.tasks.append(Task(self, search_and_add1, delay=1, ag='images'))
|
||||
self.tasks.append(Task(self, search_and_add1, delay=1, ag='songs'))
|
||||
self.tasks.append(Task(self, search_and_add1, delay=1, ag='presentations'))
|
||||
self.tasks.append(Task(self, search_and_add1, delay=1, ag='bibles'))
|
||||
self.tasks.append(Task(self, search_and_add1, delay=1, ag='bibles'))
|
||||
self.tasks.append(Task(self, search_and_add1, delay=1, ag='songs'))
|
||||
self.tasks.append(Task(self, search_and_add1, delay=1, ag='custom'))
|
||||
self.tasks.append(Task(self, search_and_add1, delay=1, ag='bibles'))
|
||||
self.tasks.append(Task(self, search_and_add1, delay=1, ag='bibles'))
|
||||
self.tasks.append(Task(self, search_and_add1, delay=1, ag='custom'))
|
||||
self.tasks.append(Task(self, search_and_add1, delay=1, ag='songs'))
|
||||
self.tasks.append(Task(self, search_and_add1, delay=1, ag='bibles'))
|
||||
#self.load_service_sequential()
|
||||
#clear_controllers(self)
|
||||
#self.load_service_random()
|
||||
self.process_queue()
|
||||
|
||||
def process_queue(self):
|
||||
while len(self.tasks) > 0:
|
||||
task = self.tasks.popleft()
|
||||
task.run()
|
||||
|
||||
def marshal_media(self):
|
||||
print_ok('Running media test script')
|
||||
if self.load_and_check_sockets(True):
|
||||
clear_controllers(self)
|
||||
#clear_controllers(self)
|
||||
# search_and_live(self, 'songs', 1)
|
||||
# media_play(self)
|
||||
# human_delay()
|
||||
|
@ -173,6 +195,16 @@ class RunTestsController(object):
|
|||
item = service[pos - 1]
|
||||
service_item_show(self, item)
|
||||
|
||||
def check_websocket_changes(self, manditary: int, optional: int) -> None:
|
||||
while not self.received:
|
||||
time.sleep(0.1)
|
||||
self.compare_stage()
|
||||
if manditary <= len(self.stage_diff) <= optional:
|
||||
pass
|
||||
else:
|
||||
print(f'{manditary} stage field(s) must have changed and '
|
||||
f'{optional} may change changed- {str(self.stage_diff)}')
|
||||
|
||||
|
||||
def compare_strings(a, b):
|
||||
import difflib
|
||||
|
@ -190,6 +222,7 @@ def compare_strings(a, b):
|
|||
print('{} => {} are different'.format(a, b))
|
||||
|
||||
|
||||
|
||||
def human_delay(delay: int = 2) -> None:
|
||||
time.sleep(delay)
|
||||
|
||||
|
@ -218,27 +251,6 @@ def media_stop(rtc: RunTestsController) -> None:
|
|||
human_delay()
|
||||
|
||||
|
||||
def new_service(rtc: RunTestsController) -> None:
|
||||
print_text('New_service')
|
||||
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
|
||||
ret = requests.get(base_url + 'service/new')
|
||||
assert ret.status_code == 204, f'{ret.status_code} returned'
|
||||
human_delay()
|
||||
check_websocket_changes(rtc, 1, 1)
|
||||
|
||||
|
||||
def clear_controllers(rtc: RunTestsController) -> None:
|
||||
print_text('Clear_controllers')
|
||||
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
|
||||
ret = requests.post(base_url + 'controller/clear/live')
|
||||
assert ret.status_code == 204, f'{ret.status_code} returned from clear live'
|
||||
check_websocket_changes(rtc, 1, 1)
|
||||
# Preview clear should not impact the WebSockets.
|
||||
ret = requests.post(base_url + 'controller/clear/preview')
|
||||
assert ret.status_code == 204, f'{ret.status_code} returned from clear preview'
|
||||
check_websocket_changes(rtc, 0, 0)
|
||||
|
||||
|
||||
def search_and_live(rtc: RunTestsController, plugin: str, count: int) -> None:
|
||||
print_text(f'Search_and_live for {plugin}')
|
||||
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/plugins/{plugin}/'
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
from test_api.apitest.callbacks import human_delay
|
||||
|
||||
|
||||
class Task(object):
|
||||
|
||||
def __init__(self, caller: object, callback: object, delay: int = 0, poll_m: int = -1,
|
||||
poll_o: int = -1, ag: str = None) -> None:
|
||||
self.caller = caller
|
||||
self.delay = delay
|
||||
self.poll_m = poll_m
|
||||
self.poll_o = poll_o
|
||||
self.__setattr__("callback", callback)
|
||||
self.arg = ag
|
||||
|
||||
def run(self):
|
||||
if self.arg:
|
||||
self.callback(self.caller, self.arg)
|
||||
else:
|
||||
self.callback(self.caller)
|
||||
if self.poll_m >= 0:
|
||||
self.caller.check_websocket_changes(self.poll_m, self.poll_m)
|
||||
else:
|
||||
human_delay(self.delay)
|
Loading…
Reference in New Issue