Fix merge issues

This commit is contained in:
Tim 2020-06-26 14:50:29 +01:00
parent b71a1912a3
commit dc62c18d46
No known key found for this signature in database
GPG Key ID: 3D454289AF831A6D
2 changed files with 0 additions and 168 deletions

View File

@ -31,9 +31,6 @@ from test_api.apitest.logger import print_text, print_error, print_ok, print_inf
# from test_api.apitest.logger import print_debug #n
from test_api.apitest.task import Task
from test_api.apitest.logger import print_text, print_error, print_ok, print_info
# from test_api.apitest.logger import print_debug
class RunTestsController(object):
@ -69,15 +66,7 @@ class RunTestsController(object):
print_error("Could not connect to WS! Exiting.")
def on_message(self, message: str) -> None:
<<<<<<< HEAD
# print_debug("Message returned")
=======
<<<<<<< HEAD
print_info("Message returned")
=======
# print_debug("Message returned")
>>>>>>> de2f630de03530dae811b691f625e645b76450e4
>>>>>>> master
self.result_stage = message
self.compare_stage()
self.received = True
@ -141,7 +130,6 @@ class RunTestsController(object):
self.stage_diff[a] = {'before': reserved[a], 'after': current[a]}
self.reserved_result_stage = self.result_stage
<<<<<<< HEAD
def process_queue(self):
while len(self.tasks) > 0:
task = self.tasks.popleft()
@ -150,47 +138,6 @@ class RunTestsController(object):
if len(self.tasks) == 0 and len(self.pending) > 0:
pend = self.pending.popleft()
self.tasks.append(pend)
=======
def marshal_full(self):
print_ok('Running full test script')
if self.load_and_check_sockets(True):
clear_controllers(self)
new_service(self)
search_and_add(self, 'songs', 5)
search_and_add(self, 'bibles', 5)
search_and_add(self, 'images', 1)
search_and_add(self, 'presentations', 1)
search_and_add(self, 'custom', 3)
self.load_service_sequential()
clear_controllers(self)
self.load_service_random()
def marshal_media(self):
print_ok('Running media test script')
if self.load_and_check_sockets(True):
clear_controllers(self)
# search_and_live(self, 'songs', 1)
# media_play(self)
# human_delay()
# clear_controllers(self)
search_and_live(self, 'media', 1)
media_play(self)
human_delay(5)
media_pause(self)
human_delay(5)
media_play(self)
human_delay(5)
media_stop(self)
def marshal_blank(self):
print_ok('Running blank test script')
if self.load_and_check_sockets(True):
new_service(self)
def marshal_alert(self):
print_ok('Running alert test script')
if self.load_and_check_sockets(True):
new_service(self)
def load_service_sequential(self) -> None:
print_text('Load_and_process_service_sequential')
@ -202,7 +149,6 @@ class RunTestsController(object):
pass
else:
service_item_show(self, item)
>>>>>>> master
def load_service_random(self) -> None:
print_text('Load_and process_service_random')
@ -248,8 +194,6 @@ def human_delay(delay: int = 2) -> None:
time.sleep(delay)
<<<<<<< HEAD
=======
def media_play(rtc: RunTestsController) -> None:
print_text('Media_play')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
@ -283,112 +227,6 @@ def new_service(rtc: RunTestsController) -> None:
check_websocket_changes(rtc, 1, 1)
def clear_controllers(rtc: RunTestsController) -> None:
print_text('Clear_controllers')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
ret = requests.post(base_url + 'controller/clear/live')
assert ret.status_code == 204, f'{ret.status_code} returned from clear live'
check_websocket_changes(rtc, 1, 1)
# Preview clear should not impact the WebSockets.
ret = requests.post(base_url + 'controller/clear/preview')
assert ret.status_code == 204, f'{ret.status_code} returned from clear preview'
check_websocket_changes(rtc, 0, 0)
def search_and_live(rtc: RunTestsController, plugin: str, count: int) -> None:
print_text(f'Search_and_live for {plugin}')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/plugins/{plugin}/'
while True:
let = random.choice(string.ascii_letters)
ret = requests.get(base_url + f'search?text={let}')
if ret.status_code == 200 and len(json.loads(ret.text)) > 0:
break
assert ret.status_code == 200, f'{ret.status_code} returned from search'
items = json.loads(ret.text)
limit = len(items) - 1
if limit >= 0:
random_service = [1]
if limit > 0:
random_service = [random.randint(1, limit) for itr in range(count)]
for pos in random_service:
item = items[pos - 1]
ret = requests.post(base_url + 'live', json=dict(id=item[0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
human_delay()
def search_and_add(rtc: RunTestsController, plugin: str, count: int) -> None:
print_text(f'Search_and_add for {plugin}')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/plugins/{plugin}/'
if plugin == 'bibles':
for i in range(1, count):
bk_id = random.randint(1, len(BookNames) - 1)
bk = BookNames[bk_id]
ch = random.randint(1, 10)
vse = random.randint(1, 10)
let = f'{bk} {ch}:1-{vse}'
ret = requests.get(base_url + f'search?text={let}')
assert ret.status_code == 200, f'{ret.status_code} returned from searcg'
items = json.loads(ret.text)
if items:
ret = requests.post(base_url + 'add', json=dict(id=items[0][0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
human_delay()
else:
if plugin == 'media':
while True:
let = random.choice(string.ascii_letters)
ret = requests.get(base_url + f'search?text={let}')
if ret.status_code == 200:
break
else:
let = random.choice(string.ascii_letters)
ret = requests.get(base_url + f'search?text={let}')
assert ret.status_code == 200, f'{ret.status_code} returned from search'
items = json.loads(ret.text)
limit = len(items) - 1
if limit >= 0:
random_service = [1]
if limit > 0:
random_service = [random.randint(1, limit) for itr in range(count)]
for pos in random_service:
item = items[pos - 1]
ret = requests.post(base_url + 'add', json=dict(id=item[0]))
assert ret.status_code == 204, f'{ret.status_code} returned from add'
human_delay()
>>>>>>> master
def service_item_show(rtc: RunTestsController, item: dict) -> None:
print_text('Service_item_show')
title = item['title']
id = item['id']
print_text(f'test_service_song {title} {id}')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
ret = requests.post(base_url + 'service/show', json=dict(uid=id))
human_delay()
assert ret.status_code == 204, ret.status_code
check_websocket_changes(rtc, 1, 2)
live_item(rtc, item['plugin'])
def live_item(rtc: RunTestsController, plugin: str) -> None:
print_text(f'test_live_item - {plugin}')
base_url = f'http://{rtc.address}:{rtc.http_port}/api/v2/'
ret = requests.get(base_url + 'controller/live-item')
assert ret.status_code == 200, f'{ret.status_code} returned from live_item'
i = 0
for _ in json.loads(ret.text):
ret = requests.post(base_url + 'controller/show', json=dict(id=i))
i += 1
human_delay()
assert ret.status_code == 204, f'{ret.status_code} returned from show'
if plugin in {'image'}:
check_websocket_changes(rtc, 0, 0)
else:
check_websocket_changes(rtc, 1, 2)
def check_websocket_changes(rtc: RunTestsController, manditary: int, optional: int) -> None:
while not rtc.received:
time.sleep(0.1)

View File

@ -42,14 +42,8 @@ def start() -> None:
print_ok(f'OpenLP is running on port (ws) {op_ws_port}')
rtc = RunTestsController(op_address, op_http_port, op_ws_port)
rtc.connect()
<<<<<<< HEAD
rtc.read_command_file()
print_ok('Finished running tests')
=======
rtc.marshal_full()
# nrtc.marshal_media()
print_text('Finished running tests')
>>>>>>> master
if __name__ == '__main__':