forked from openlp/openlp

Moving over to requests

commit 15c8023357
parent 2f8cdc81e0
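
Replace the urllib-based networking in the HTTP utilities with the requests
library: get_web_page(), get_url_file_size() and url_get_file() are rewritten
on top of requests, the HTTPRedirectHandlerFixed redirect workaround and the
ping() helper are removed, and callers and tests are updated to pass a headers
dict and to consume plain-text return values.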
@@ -52,7 +52,7 @@ class Poller(RegistryProperties):
             'isSecure': Settings().value('api/authentication enabled'),
             'isAuthorised': False,
             'chordNotation': Settings().value('songs/chord notation'),
-            'isStagedActive': self.is_stage_active(),
+            'isStageActive': self.is_stage_active(),
             'isLiveActive': self.is_live_active(),
             'isChordsActive': self.is_chords_active()
         }
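
For reference, a sketch of the poll payload a remote client now sees, with the
corrected isStageActive key (all values illustrative, not from the commit):

    # Hypothetical result of Poller.poll() after this rename.
    poll_state = {
        'isSecure': False,
        'isAuthorised': False,
        'chordNotation': 'english',
        'isStageActive': True,   # previously mis-spelled 'isStagedActive'
        'isLiveActive': True,
        'isChordsActive': False
    }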
@@ -25,17 +25,12 @@ The :mod:`openlp.core.utils` module provides the utility libraries for OpenLP.
 import hashlib
 import logging
 import os
-import platform
-import socket
 import sys
-import subprocess
 import time
-import urllib.error
-import urllib.parse
-import urllib.request
-from http.client import HTTPException
 from random import randint
 
+import requests
+
 from openlp.core.common import Registry, trace_error_handler
 
 log = logging.getLogger(__name__ + '.__init__')
@@ -69,33 +64,6 @@ CONNECTION_TIMEOUT = 30
 CONNECTION_RETRIES = 2
 
 
-class HTTPRedirectHandlerFixed(urllib.request.HTTPRedirectHandler):
-    """
-    Special HTTPRedirectHandler used to work around http://bugs.python.org/issue22248
-    (Redirecting to urls with special chars)
-    """
-    def redirect_request(self, req, fp, code, msg, headers, new_url):
-        #
-        """
-        Test if the new_url can be decoded to ascii
-
-        :param req:
-        :param fp:
-        :param code:
-        :param msg:
-        :param headers:
-        :param new_url:
-        :return:
-        """
-        try:
-            new_url.encode('latin1').decode('ascii')
-            fixed_url = new_url
-        except Exception:
-            # The url could not be decoded to ascii, so we do some url encoding
-            fixed_url = urllib.parse.quote(new_url.encode('latin1').decode('utf-8', 'replace'), safe='/:')
-        return super(HTTPRedirectHandlerFixed, self).redirect_request(req, fp, code, msg, headers, fixed_url)
-
-
 def get_user_agent():
     """
     Return a user agent customised for the platform the user is on.
@@ -107,7 +75,7 @@ def get_user_agent():
     return browser_list[random_index]
 
 
-def get_web_page(url, header=None, update_openlp=False):
+def get_web_page(url, headers=None, update_openlp=False, proxies=None):
     """
     Attempts to download the webpage at url and returns that page or None.
 
@@ -116,71 +84,37 @@ def get_web_page(url, header=None, update_openlp=False):
     :param update_openlp: Tells OpenLP to update itself if the page is successfully downloaded.
         Defaults to False.
     """
-    # TODO: Add proxy usage. Get proxy info from OpenLP settings, add to a
-    # proxy_handler, build into an opener and install the opener into urllib2.
-    # http://docs.python.org/library/urllib2.html
     if not url:
         return None
-    # This is needed to work around http://bugs.python.org/issue22248 and https://bugs.launchpad.net/openlp/+bug/1251437
-    opener = urllib.request.build_opener(HTTPRedirectHandlerFixed())
-    urllib.request.install_opener(opener)
-    req = urllib.request.Request(url)
-    if not header or header[0].lower() != 'user-agent':
-        user_agent = get_user_agent()
-        req.add_header('User-Agent', user_agent)
-    if header:
-        req.add_header(header[0], header[1])
+    if headers and 'user-agent' not in [key.lower() for key in headers.keys()]:
+        headers['User-Agent'] = get_user_agent()
     log.debug('Downloading URL = %s' % url)
     retries = 0
-    while retries <= CONNECTION_RETRIES:
-        retries += 1
-        time.sleep(0.1)
+    while retries < CONNECTION_RETRIES:
+        # Put this at the bottom
+        # retries += 1
+        # time.sleep(0.1)
         try:
-            page = urllib.request.urlopen(req, timeout=CONNECTION_TIMEOUT)
-            log.debug('Downloaded page {text}'.format(text=page.geturl()))
+            response = requests.get(url, headers=headers, proxies=proxies, timeout=float(CONNECTION_TIMEOUT))
+            log.debug('Downloaded page {url}'.format(url=response.url))
             break
-        except urllib.error.URLError as err:
-            log.exception('URLError on {text}'.format(text=url))
-            log.exception('URLError: {text}'.format(text=err.reason))
-            page = None
+        except IOError:
+            # For now, catch IOError. All requests errors inherit from IOError
+            log.exception('Unable to connect to {url}'.format(url=url))
+            response = None
             if retries > CONNECTION_RETRIES:
-                raise
-        except socket.timeout:
-            log.exception('Socket timeout: {text}'.format(text=url))
-            page = None
-            if retries > CONNECTION_RETRIES:
-                raise
-        except socket.gaierror:
-            log.exception('Socket gaierror: {text}'.format(text=url))
-            page = None
-            if retries > CONNECTION_RETRIES:
-                raise
-        except ConnectionRefusedError:
-            log.exception('ConnectionRefused: {text}'.format(text=url))
-            page = None
-            if retries > CONNECTION_RETRIES:
-                raise
-            break
-        except ConnectionError:
-            log.exception('Connection error: {text}'.format(text=url))
-            page = None
-            if retries > CONNECTION_RETRIES:
-                raise
-        except HTTPException:
-            log.exception('HTTPException error: {text}'.format(text=url))
-            page = None
-            if retries > CONNECTION_RETRIES:
-                raise
+                raise ConnectionError('Unable to connect to {url}, see log for details'.format(url=url))
         except:
             # Don't know what's happening, so reraise the original
+            log.exception('Unknown error when trying to connect to {url}'.format(url=url))
             raise
     if update_openlp:
         Registry().get('application').process_events()
-    if not page:
-        log.exception('{text} could not be downloaded'.format(text=url))
+    if not response or not response.text:
+        log.error('{url} could not be downloaded'.format(url=url))
         return None
-    log.debug(page)
-    return page
+    log.debug(response.text)
+    return response.text
 
 
 def get_url_file_size(url):
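
A minimal usage sketch of the reworked helper, assuming the names in the hunk
above; the user-agent string and proxy mapping are illustrative, not part of
the commit. Note that as committed the retry increment is commented out ("Put
this at the bottom") and not yet restored at the bottom of the loop, so
retries never advances past zero within this hunk.

    # Hypothetical caller: headers is now a plain dict and the return value is
    # the decoded body text (or None), not a response object needing .read().
    page_text = get_web_page('https://get.openlp.org/webclient/download.cfg',
                             headers={'User-Agent': 'OpenLP/2.5'},
                             proxies={'https': 'http://proxy.example.com:3128'})
    if page_text:
        print(page_text.splitlines()[0])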
@@ -192,19 +126,18 @@ def get_url_file_size(url):
     retries = 0
     while True:
         try:
-            site = urllib.request.urlopen(url, timeout=CONNECTION_TIMEOUT)
-            meta = site.info()
-            return int(meta.get("Content-Length"))
-        except urllib.error.URLError:
+            response = requests.head(url, timeout=float(CONNECTION_TIMEOUT), allow_redirects=True)
+            return int(response.headers['Content-Length'])
+        except IOError:
             if retries > CONNECTION_RETRIES:
-                raise
+                raise ConnectionError('Unable to download {url}'.format(url=url))
             else:
                 retries += 1
                 time.sleep(0.1)
                 continue
 
 
-def url_get_file(callback, url, f_path, sha256=None):
+def url_get_file(callback, url, file_path, sha256=None):
     """"
     Download a file given a URL. The file is retrieved in chunks, giving the ability to cancel the download at any
     point. Returns False on download error.
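
Worth noting: requests does not follow redirects on HEAD requests by default
(unlike GET), so the allow_redirects=True above is what makes the probe report
the Content-Length of the final target rather than of a redirect response. A
standalone sketch, with an illustrative URL and timeout:

    import requests

    # Probe a file's size without downloading the body; raises KeyError if the
    # server omits Content-Length, matching the behaviour in the hunk above.
    response = requests.head('https://example.com/files/songbook.zip',
                             timeout=30.0, allow_redirects=True)
    print(int(response.headers['Content-Length']))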
@@ -217,56 +150,42 @@ def url_get_file(callback, url, f_path, sha256=None):
     block_count = 0
     block_size = 4096
     retries = 0
-    log.debug("url_get_file: " + url)
-    while True:
+    log.debug('url_get_file: %s', url)
+    while retries < CONNECTION_RETRIES:
         try:
-            filename = open(f_path, "wb")
-            url_file = urllib.request.urlopen(url, timeout=CONNECTION_TIMEOUT)
-            if sha256:
-                hasher = hashlib.sha256()
-            # Download until finished or canceled.
-            while not callback.was_cancelled:
-                data = url_file.read(block_size)
-                if not data:
-                    break
-                filename.write(data)
+            with open(file_path, 'wb') as saved_file:
+                response = requests.get(url, timeout=float(CONNECTION_TIMEOUT), stream=True)
                 if sha256:
-                    hasher.update(data)
-                block_count += 1
-                callback._download_progress(block_count, block_size)
-            filename.close()
+                    hasher = hashlib.sha256()
+                # Download until finished or canceled.
+                for chunk in response.iter_content(chunk_size=block_size):
+                    if callback.was_cancelled:
+                        break
+                    saved_file.write(chunk)
+                    if sha256:
+                        hasher.update(chunk)
+                    block_count += 1
+                    callback._download_progress(block_count, block_size)
+                response.close()
             if sha256 and hasher.hexdigest() != sha256:
-                log.error('sha256 sums did not match for file: {file}'.format(file=f_path))
-                os.remove(f_path)
+                log.error('sha256 sums did not match for file %s, got %s, expected %s', file_path, hasher.hexdigest(),
+                          sha256)
+                os.remove(file_path)
                 return False
-        except (urllib.error.URLError, socket.timeout) as err:
+            break
+        except IOError:
             trace_error_handler(log)
-            filename.close()
-            os.remove(f_path)
+            os.remove(file_path)
             if retries > CONNECTION_RETRIES:
                 return False
             else:
                 retries += 1
                 time.sleep(0.1)
                 continue
-        break
     # Delete file if cancelled, it may be a partial file.
     if callback.was_cancelled:
-        os.remove(f_path)
+        os.remove(file_path)
     return True
 
 
-def ping(host):
-    """
-    Returns True if host responds to a ping request
-    """
-    # Ping parameters as function of OS
-    ping_str = "-n 1" if platform.system().lower() == "windows" else "-c 1"
-    args = "ping " + " " + ping_str + " " + host
-    need_sh = False if platform.system().lower() == "windows" else True
-
-    # Ping
-    return subprocess.call(args, shell=need_sh) == 0
-
-
 __all__ = ['get_web_page']
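
A sketch of a caller for the reworked url_get_file(), assuming only the
callback protocol visible in this hunk (a was_cancelled flag and a
_download_progress(block_count, block_size) hook); the class, URL and path
are illustrative:

    class DownloadTracker:
        """Hypothetical progress sink with the two members url_get_file uses."""
        was_cancelled = False

        def _download_progress(self, block_count, block_size):
            # Rough running total; the real OpenLP callback presumably drives
            # a progress bar from this hook.
            print('~{count} bytes so far'.format(count=block_count * block_size))

    success = url_get_file(DownloadTracker(), 'https://example.com/bibles/web.sqlite',
                           '/tmp/web.sqlite', sha256=None)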
@@ -181,22 +181,16 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
         self.application.process_events()
         try:
             web_config = get_web_page('{host}{name}'.format(host=self.web, name='download.cfg'),
-                                      header=('User-Agent', user_agent))
-        except (urllib.error.URLError, ConnectionError) as err:
-            msg = QtWidgets.QMessageBox()
-            title = translate('OpenLP.FirstTimeWizard', 'Network Error')
-            msg.setText('{title} {error}'.format(title=title,
-                                                 error=err.code if hasattr(err, 'code') else ''))
-            msg.setInformativeText(translate('OpenLP.FirstTimeWizard',
-                                             'There was a network error attempting to '
-                                             'connect to retrieve initial configuration information'))
-            msg.setStandardButtons(msg.Ok)
-            ans = msg.exec()
+                                      headers={'User-Agent': user_agent})
+        except ConnectionError:
+            QtWidgets.QMessageBox.critical(self, translate('OpenLP.FirstTimeWizard', 'Network Error'),
+                                           translate('OpenLP.FirstTimeWizard', 'There was a network error attempting '
+                                                     'to connect to retrieve initial configuration information'),
+                                           QtWidgets.QMessageBox.Ok)
             web_config = False
         if web_config:
-            files = web_config.read()
             try:
-                self.config.read_string(files.decode())
+                self.config.read_string(web_config)
                 self.web = self.config.get('general', 'base url')
                 self.songs_url = self.web + self.config.get('songs', 'directory') + '/'
                 self.bibles_url = self.web + self.config.get('bibles', 'directory') + '/'
@@ -139,7 +139,6 @@ def get_version():
     global APPLICATION_VERSION
     if APPLICATION_VERSION:
         return APPLICATION_VERSION
-    print(sys.argv)
     if '--dev-version' in sys.argv or '-d' in sys.argv:
         # NOTE: The following code is a duplicate of the code in setup.py. Any fix applied here should also be applied
         # there.
@@ -93,7 +93,7 @@ class BGExtract(RegistryProperties):
     NAME = 'BibleGateway'
 
     def __init__(self, proxy_url=None):
-        log.debug('BGExtract.init("{url}")'.format(url=proxy_url))
+        log.debug('BGExtract.init(proxy_url="{url}")'.format(url=proxy_url))
         self.proxy_url = proxy_url
         socket.setdefaulttimeout(30)
 
@@ -285,15 +285,15 @@ class BGExtract(RegistryProperties):
         log.debug('BGExtract.get_books_from_http("{version}")'.format(version=version))
         url_params = urllib.parse.urlencode({'action': 'getVersionInfo', 'vid': '{version}'.format(version=version)})
         reference_url = 'http://www.biblegateway.com/versions/?{url}#books'.format(url=url_params)
-        page = get_web_page(reference_url)
-        if not page:
+        page_source = get_web_page(reference_url)
+        if not page_source:
             send_error_message('download')
             return None
-        page_source = page.read()
-        try:
-            page_source = str(page_source, 'utf8')
-        except UnicodeDecodeError:
-            page_source = str(page_source, 'cp1251')
+        # TODO: Is this even necessary anymore?
+        # try:
+        #     page_source = str(page_source, 'utf8')
+        # except UnicodeDecodeError:
+        #     page_source = str(page_source, 'cp1251')
         try:
             soup = BeautifulSoup(page_source, 'lxml')
         except Exception:
@@ -759,7 +759,7 @@ class HTTPBible(BibleImport, RegistryProperties):
         return BiblesResourcesDB.get_verse_count(book_id, chapter)
 
 
-def get_soup_for_bible_ref(reference_url, header=None, pre_parse_regex=None, pre_parse_substitute=None):
+def get_soup_for_bible_ref(reference_url, headers=None, pre_parse_regex=None, pre_parse_substitute=None):
     """
     Gets a webpage and returns a parsed and optionally cleaned soup or None.
 
@@ -772,15 +772,15 @@ def get_soup_for_bible_ref(reference_url, header=None, pre_parse_regex=None, pre_parse_substitute=None):
     if not reference_url:
         return None
     try:
-        page = get_web_page(reference_url, header, True)
+        page_source = get_web_page(reference_url, headers, update_openlp=True)
     except Exception as e:
-        page = None
-    if not page:
+        log.exception('Unable to download Bible %s, unknown exception occurred', reference_url)
+        page_source = None
+    if not page_source:
         send_error_message('download')
         return None
-    page_source = page.read()
     if pre_parse_regex and pre_parse_substitute is not None:
-        page_source = re.sub(pre_parse_regex, pre_parse_substitute, page_source.decode())
+        page_source = re.sub(pre_parse_regex, pre_parse_substitute, page_source)
     soup = None
     try:
         soup = BeautifulSoup(page_source, 'lxml')
@@ -49,10 +49,10 @@ def download_sha256():
    user_agent = 'OpenLP/' + Registry().get('application').applicationVersion()
    try:
        web_config = get_web_page('{host}{name}'.format(host='https://get.openlp.org/webclient/', name='download.cfg'),
-                                 header=('User-Agent', user_agent))
+                                 headers={'User-Agent': user_agent})
    except (urllib.error.URLError, ConnectionError) as err:
        return False
-   file_bits = web_config.read().decode('utf-8').split()
+   file_bits = web_config.split()
    return file_bits[0], file_bits[2]
 
 
@@ -70,7 +70,7 @@ class TestWSServer(TestCase, TestMixin):
         """
         # GIVEN: A new httpserver
         # WHEN: I start the server
-        server = WebSocketServer()
+        WebSocketServer()
 
         # THEN: the api environment should have been created
         self.assertEquals(1, mock_qthread.call_count, 'The qthread should have been called once')
@@ -93,7 +93,7 @@ class TestWSServer(TestCase, TestMixin):
         """
         Test the poll function returns the correct JSON
         """
-        # WHEN: the system is configured with a set of data
+        # GIVEN: the system is configured with a set of data
         mocked_service_manager = MagicMock()
         mocked_service_manager.service_id = 21
         mocked_live_controller = MagicMock()
@@ -105,8 +105,15 @@ class TestWSServer(TestCase, TestMixin):
         mocked_live_controller.desktop_screen.isChecked.return_value = False
         Registry().register('live_controller', mocked_live_controller)
         Registry().register('service_manager', mocked_service_manager)
+        # WHEN: The poller polls
+        with patch.object(self.poll, 'is_stage_active') as mocked_is_stage_active, \
+                patch.object(self.poll, 'is_live_active') as mocked_is_live_active, \
+                patch.object(self.poll, 'is_chords_active') as mocked_is_chords_active:
+            mocked_is_stage_active.return_value = True
+            mocked_is_live_active.return_value = True
+            mocked_is_chords_active.return_value = True
+            poll_json = self.poll.poll()
         # THEN: the live json should be generated and match expected results
-        poll_json = self.poll.poll()
         self.assertTrue(poll_json['results']['blank'], 'The blank return value should be True')
         self.assertFalse(poll_json['results']['theme'], 'The theme return value should be False')
         self.assertFalse(poll_json['results']['display'], 'The display return value should be False')
@@ -28,7 +28,7 @@ import socket
 from unittest import TestCase
 from unittest.mock import MagicMock, patch
 
-from openlp.core.common.httputils import get_user_agent, get_web_page, get_url_file_size, url_get_file, ping
+from openlp.core.common.httputils import get_user_agent, get_web_page, get_url_file_size, url_get_file
 
 from tests.helpers.testmixin import TestMixin
 
@@ -253,7 +253,7 @@ class TestHttpUtils(TestCase, TestMixin):
         fake_url = 'this://is.a.fake/url'
 
         # WHEN: The get_url_file_size() method is called
-        size = get_url_file_size(fake_url)
+        get_url_file_size(fake_url)
 
         # THEN: The correct methods are called with the correct arguments and a web page is returned
         mock_urlopen.assert_called_with(fake_url, timeout=30)
@@ -272,29 +272,3 @@ class TestHttpUtils(TestCase, TestMixin):
         # THEN: socket.timeout should have been caught
         # NOTE: Test is if $tmpdir/tempfile is still there, then test fails since ftw deletes bad downloaded files
         self.assertFalse(os.path.exists(self.tempfile), 'FTW url_get_file should have caught socket.timeout')
-
-    def test_ping_valid(self):
-        """
-        Test ping for OpenLP
-        """
-        # GIVEN: a valid url to test
-        url = "openlp.io"
-
-        # WHEN: Attempt to check the url exists
-        url_found = ping(url)
-
-        # THEN: It should be found
-        self.assertTrue(url_found, 'OpenLP.io is not found')
-
-    def test_ping_invalid(self):
-        """
-        Test ping for OpenLP
-        """
-        # GIVEN: a valid url to test
-        url = "trb143.io"
-
-        # WHEN: Attempt to check the url exists
-        url_found = ping(url)
-
-        # THEN: It should be found
-        self.assertFalse(url_found, 'TRB143.io is found')