forked from openlp/openlp
Move some http code around to allow it to be reused in the remote updates.
lp:~trb143/openlp/httpfixes (revision 2718) [SUCCESS] https://ci.openlp.io/job/Branch-01-Pull/1888/ [SUCCESS] https://ci.openlp.io/job/Branch-02-Functional-Tests/1799/ [SUCCESS] https://ci.openlp.io/job/Branch-03-Interface-Tests/1738/ [SUCCESS] https://ci.openlp.io/job/Branch-04a-Windows_Functional_Tests/1474/ [SUCCESS] https://ci.openlp.io/job/Branch-04b-Windows_Interface_Tests/1064/ [SUCCESS] https://ci.openlp.io/jo... bzr-revno: 2713
This commit is contained in:
commit
ff0e8dce0c
@ -22,7 +22,9 @@
|
||||
"""
|
||||
The :mod:`openlp.core.utils` module provides the utility libraries for OpenLP.
|
||||
"""
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import sys
|
||||
import time
|
||||
@ -32,7 +34,7 @@ import urllib.request
|
||||
from http.client import HTTPException
|
||||
from random import randint
|
||||
|
||||
from openlp.core.common import Registry
|
||||
from openlp.core.common import Registry, trace_error_handler
|
||||
|
||||
log = logging.getLogger(__name__ + '.__init__')
|
||||
|
||||
@ -92,7 +94,7 @@ class HTTPRedirectHandlerFixed(urllib.request.HTTPRedirectHandler):
|
||||
return super(HTTPRedirectHandlerFixed, self).redirect_request(req, fp, code, msg, headers, fixed_url)
|
||||
|
||||
|
||||
def _get_user_agent():
|
||||
def get_user_agent():
|
||||
"""
|
||||
Return a user agent customised for the platform the user is on.
|
||||
"""
|
||||
@ -122,7 +124,7 @@ def get_web_page(url, header=None, update_openlp=False):
|
||||
urllib.request.install_opener(opener)
|
||||
req = urllib.request.Request(url)
|
||||
if not header or header[0].lower() != 'user-agent':
|
||||
user_agent = _get_user_agent()
|
||||
user_agent = get_user_agent()
|
||||
req.add_header('User-Agent', user_agent)
|
||||
if header:
|
||||
req.add_header(header[0], header[1])
|
||||
@ -179,4 +181,75 @@ def get_web_page(url, header=None, update_openlp=False):
|
||||
return page
|
||||
|
||||
|
||||
def get_url_file_size(url):
|
||||
"""
|
||||
Get the size of a file.
|
||||
|
||||
:param url: The URL of the file we want to download.
|
||||
"""
|
||||
retries = 0
|
||||
while True:
|
||||
try:
|
||||
site = urllib.request.urlopen(url, timeout=CONNECTION_TIMEOUT)
|
||||
meta = site.info()
|
||||
return int(meta.get("Content-Length"))
|
||||
except urllib.error.URLError:
|
||||
if retries > CONNECTION_RETRIES:
|
||||
raise
|
||||
else:
|
||||
retries += 1
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
|
||||
def url_get_file(callback, url, f_path, sha256=None):
|
||||
""""
|
||||
Download a file given a URL. The file is retrieved in chunks, giving the ability to cancel the download at any
|
||||
point. Returns False on download error.
|
||||
|
||||
:param callback: the class which needs to be updated
|
||||
:param url: URL to download
|
||||
:param f_path: Destination file
|
||||
:param sha256: The check sum value to be checked against the download value
|
||||
"""
|
||||
block_count = 0
|
||||
block_size = 4096
|
||||
retries = 0
|
||||
while True:
|
||||
try:
|
||||
filename = open(f_path, "wb")
|
||||
url_file = urllib.request.urlopen(url, timeout=CONNECTION_TIMEOUT)
|
||||
if sha256:
|
||||
hasher = hashlib.sha256()
|
||||
# Download until finished or canceled.
|
||||
while not callback.was_cancelled:
|
||||
data = url_file.read(block_size)
|
||||
if not data:
|
||||
break
|
||||
filename.write(data)
|
||||
if sha256:
|
||||
hasher.update(data)
|
||||
block_count += 1
|
||||
callback._download_progress(block_count, block_size)
|
||||
filename.close()
|
||||
if sha256 and hasher.hexdigest() != sha256:
|
||||
log.error('sha256 sums did not match for file: {file}'.format(file=f_path))
|
||||
os.remove(f_path)
|
||||
return False
|
||||
except (urllib.error.URLError, socket.timeout) as err:
|
||||
trace_error_handler(log)
|
||||
filename.close()
|
||||
os.remove(f_path)
|
||||
if retries > CONNECTION_RETRIES:
|
||||
return False
|
||||
else:
|
||||
retries += 1
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
break
|
||||
# Delete file if cancelled, it may be a partial file.
|
||||
if callback.was_cancelled:
|
||||
os.remove(f_path)
|
||||
return True
|
||||
|
||||
__all__ = ['get_web_page']
|
@ -22,7 +22,6 @@
|
||||
"""
|
||||
This module contains the first time wizard.
|
||||
"""
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
@ -39,7 +38,7 @@ from openlp.core.common import Registry, RegistryProperties, AppLocation, Settin
|
||||
translate, clean_button_text, trace_error_handler
|
||||
from openlp.core.lib import PluginStatus, build_icon
|
||||
from openlp.core.lib.ui import critical_error_message_box
|
||||
from openlp.core.lib.webpagereader import get_web_page, CONNECTION_RETRIES, CONNECTION_TIMEOUT
|
||||
from openlp.core.common.httputils import get_web_page, get_url_file_size, url_get_file, CONNECTION_TIMEOUT
|
||||
from .firsttimewizard import UiFirstTimeWizard, FirstTimePage
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@ -395,54 +394,6 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
|
||||
self.was_cancelled = True
|
||||
self.close()
|
||||
|
||||
def url_get_file(self, url, f_path, sha256=None):
|
||||
""""
|
||||
Download a file given a URL. The file is retrieved in chunks, giving the ability to cancel the download at any
|
||||
point. Returns False on download error.
|
||||
|
||||
:param url: URL to download
|
||||
:param f_path: Destination file
|
||||
"""
|
||||
block_count = 0
|
||||
block_size = 4096
|
||||
retries = 0
|
||||
while True:
|
||||
try:
|
||||
filename = open(f_path, "wb")
|
||||
url_file = urllib.request.urlopen(url, timeout=CONNECTION_TIMEOUT)
|
||||
if sha256:
|
||||
hasher = hashlib.sha256()
|
||||
# Download until finished or canceled.
|
||||
while not self.was_cancelled:
|
||||
data = url_file.read(block_size)
|
||||
if not data:
|
||||
break
|
||||
filename.write(data)
|
||||
if sha256:
|
||||
hasher.update(data)
|
||||
block_count += 1
|
||||
self._download_progress(block_count, block_size)
|
||||
filename.close()
|
||||
if sha256 and hasher.hexdigest() != sha256:
|
||||
log.error('sha256 sums did not match for file: {file}'.format(file=f_path))
|
||||
os.remove(f_path)
|
||||
return False
|
||||
except (urllib.error.URLError, socket.timeout) as err:
|
||||
trace_error_handler(log)
|
||||
filename.close()
|
||||
os.remove(f_path)
|
||||
if retries > CONNECTION_RETRIES:
|
||||
return False
|
||||
else:
|
||||
retries += 1
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
break
|
||||
# Delete file if cancelled, it may be a partial file.
|
||||
if self.was_cancelled:
|
||||
os.remove(f_path)
|
||||
return True
|
||||
|
||||
def _build_theme_screenshots(self):
|
||||
"""
|
||||
This method builds the theme screenshots' icons for all items in the ``self.themes_list_widget``.
|
||||
@ -455,26 +406,6 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
|
||||
if item:
|
||||
item.setIcon(build_icon(os.path.join(gettempdir(), 'openlp', screenshot)))
|
||||
|
||||
def _get_file_size(self, url):
|
||||
"""
|
||||
Get the size of a file.
|
||||
|
||||
:param url: The URL of the file we want to download.
|
||||
"""
|
||||
retries = 0
|
||||
while True:
|
||||
try:
|
||||
site = urllib.request.urlopen(url, timeout=CONNECTION_TIMEOUT)
|
||||
meta = site.info()
|
||||
return int(meta.get("Content-Length"))
|
||||
except urllib.error.URLError:
|
||||
if retries > CONNECTION_RETRIES:
|
||||
raise
|
||||
else:
|
||||
retries += 1
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
def _download_progress(self, count, block_size):
|
||||
"""
|
||||
Calculate and display the download progress.
|
||||
@ -510,7 +441,7 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
|
||||
item = self.songs_list_widget.item(i)
|
||||
if item.checkState() == QtCore.Qt.Checked:
|
||||
filename, sha256 = item.data(QtCore.Qt.UserRole)
|
||||
size = self._get_file_size('{path}{name}'.format(path=self.songs_url, name=filename))
|
||||
size = get_url_file_size('{path}{name}'.format(path=self.songs_url, name=filename))
|
||||
self.max_progress += size
|
||||
# Loop through the Bibles list and increase for each selected item
|
||||
iterator = QtWidgets.QTreeWidgetItemIterator(self.bibles_tree_widget)
|
||||
@ -519,7 +450,7 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
|
||||
item = iterator.value()
|
||||
if item.parent() and item.checkState(0) == QtCore.Qt.Checked:
|
||||
filename, sha256 = item.data(0, QtCore.Qt.UserRole)
|
||||
size = self._get_file_size('{path}{name}'.format(path=self.bibles_url, name=filename))
|
||||
size = get_url_file_size('{path}{name}'.format(path=self.bibles_url, name=filename))
|
||||
self.max_progress += size
|
||||
iterator += 1
|
||||
# Loop through the themes list and increase for each selected item
|
||||
@ -528,7 +459,7 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
|
||||
item = self.themes_list_widget.item(i)
|
||||
if item.checkState() == QtCore.Qt.Checked:
|
||||
filename, sha256 = item.data(QtCore.Qt.UserRole)
|
||||
size = self._get_file_size('{path}{name}'.format(path=self.themes_url, name=filename))
|
||||
size = get_url_file_size('{path}{name}'.format(path=self.themes_url, name=filename))
|
||||
self.max_progress += size
|
||||
except urllib.error.URLError:
|
||||
trace_error_handler(log)
|
||||
@ -636,8 +567,8 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
|
||||
self._increment_progress_bar(self.downloading.format(name=filename), 0)
|
||||
self.previous_size = 0
|
||||
destination = os.path.join(songs_destination, str(filename))
|
||||
if not self.url_get_file('{path}{name}'.format(path=self.songs_url, name=filename),
|
||||
destination, sha256):
|
||||
if not url_get_file(self, '{path}{name}'.format(path=self.songs_url, name=filename),
|
||||
destination, sha256):
|
||||
missed_files.append('Song: {name}'.format(name=filename))
|
||||
# Download Bibles
|
||||
bibles_iterator = QtWidgets.QTreeWidgetItemIterator(self.bibles_tree_widget)
|
||||
@ -648,9 +579,9 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
|
||||
# TODO: Tested at home
|
||||
self._increment_progress_bar(self.downloading.format(name=bible), 0)
|
||||
self.previous_size = 0
|
||||
if not self.url_get_file('{path}{name}'.format(path=self.bibles_url, name=bible),
|
||||
os.path.join(bibles_destination, bible),
|
||||
sha256):
|
||||
if not url_get_file(self, '{path}{name}'.format(path=self.bibles_url, name=bible),
|
||||
os.path.join(bibles_destination, bible),
|
||||
sha256):
|
||||
missed_files.append('Bible: {name}'.format(name=bible))
|
||||
bibles_iterator += 1
|
||||
# Download themes
|
||||
@ -661,9 +592,9 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
|
||||
# TODO: Tested at home
|
||||
self._increment_progress_bar(self.downloading.format(name=theme), 0)
|
||||
self.previous_size = 0
|
||||
if not self.url_get_file('{path}{name}'.format(path=self.themes_url, name=theme),
|
||||
os.path.join(themes_destination, theme),
|
||||
sha256):
|
||||
if not url_get_file(self, '{path}{name}'.format(path=self.themes_url, name=theme),
|
||||
os.path.join(themes_destination, theme),
|
||||
sha256):
|
||||
missed_files.append('Theme: {name}'.format(name=theme))
|
||||
if missed_files:
|
||||
file_list = ''
|
||||
|
@ -32,7 +32,7 @@ from bs4 import BeautifulSoup, NavigableString, Tag
|
||||
|
||||
from openlp.core.common import Registry, RegistryProperties, translate
|
||||
from openlp.core.lib.ui import critical_error_message_box
|
||||
from openlp.core.lib.webpagereader import get_web_page
|
||||
from openlp.core.common.httputils import get_web_page
|
||||
from openlp.plugins.bibles.lib import SearchResults
|
||||
from openlp.plugins.bibles.lib.bibleimport import BibleImport
|
||||
from openlp.plugins.bibles.lib.db import BibleDB, BiblesResourcesDB, Book
|
||||
|
@ -22,28 +22,40 @@
|
||||
"""
|
||||
Functional tests to test the AppLocation class and related methods.
|
||||
"""
|
||||
import os
|
||||
import tempfile
|
||||
import socket
|
||||
from unittest import TestCase
|
||||
|
||||
from openlp.core.lib.webpagereader import _get_user_agent, get_web_page
|
||||
from openlp.core.common.httputils import get_user_agent, get_web_page, get_url_file_size, url_get_file
|
||||
|
||||
from tests.functional import MagicMock, patch
|
||||
from tests.helpers.testmixin import TestMixin
|
||||
|
||||
|
||||
class TestUtils(TestCase):
|
||||
class TestHttpUtils(TestCase, TestMixin):
|
||||
|
||||
"""
|
||||
A test suite to test out various methods around the AppLocation class.
|
||||
A test suite to test out various http helper functions.
|
||||
"""
|
||||
def setUp(self):
|
||||
self.tempfile = os.path.join(tempfile.gettempdir(), 'testfile')
|
||||
|
||||
def tearDown(self):
|
||||
if os.path.isfile(self.tempfile):
|
||||
os.remove(self.tempfile)
|
||||
|
||||
def test_get_user_agent_linux(self):
|
||||
"""
|
||||
Test that getting a user agent on Linux returns a user agent suitable for Linux
|
||||
"""
|
||||
with patch('openlp.core.lib.webpagereader.sys') as mocked_sys:
|
||||
with patch('openlp.core.common.httputils.sys') as mocked_sys:
|
||||
|
||||
# GIVEN: The system is Linux
|
||||
mocked_sys.platform = 'linux2'
|
||||
|
||||
# WHEN: We call _get_user_agent()
|
||||
user_agent = _get_user_agent()
|
||||
# WHEN: We call get_user_agent()
|
||||
user_agent = get_user_agent()
|
||||
|
||||
# THEN: The user agent is a Linux (or ChromeOS) user agent
|
||||
result = 'Linux' in user_agent or 'CrOS' in user_agent
|
||||
@ -53,13 +65,13 @@ class TestUtils(TestCase):
|
||||
"""
|
||||
Test that getting a user agent on Windows returns a user agent suitable for Windows
|
||||
"""
|
||||
with patch('openlp.core.lib.webpagereader.sys') as mocked_sys:
|
||||
with patch('openlp.core.common.httputils.sys') as mocked_sys:
|
||||
|
||||
# GIVEN: The system is Linux
|
||||
mocked_sys.platform = 'win32'
|
||||
|
||||
# WHEN: We call _get_user_agent()
|
||||
user_agent = _get_user_agent()
|
||||
# WHEN: We call get_user_agent()
|
||||
user_agent = get_user_agent()
|
||||
|
||||
# THEN: The user agent is a Linux (or ChromeOS) user agent
|
||||
self.assertIn('Windows', user_agent, 'The user agent should be a valid Windows user agent')
|
||||
@ -68,13 +80,13 @@ class TestUtils(TestCase):
|
||||
"""
|
||||
Test that getting a user agent on OS X returns a user agent suitable for OS X
|
||||
"""
|
||||
with patch('openlp.core.lib.webpagereader.sys') as mocked_sys:
|
||||
with patch('openlp.core.common.httputils.sys') as mocked_sys:
|
||||
|
||||
# GIVEN: The system is Linux
|
||||
mocked_sys.platform = 'darwin'
|
||||
|
||||
# WHEN: We call _get_user_agent()
|
||||
user_agent = _get_user_agent()
|
||||
# WHEN: We call get_user_agent()
|
||||
user_agent = get_user_agent()
|
||||
|
||||
# THEN: The user agent is a Linux (or ChromeOS) user agent
|
||||
self.assertIn('Mac OS X', user_agent, 'The user agent should be a valid OS X user agent')
|
||||
@ -83,13 +95,13 @@ class TestUtils(TestCase):
|
||||
"""
|
||||
Test that getting a user agent on a non-Linux/Windows/OS X platform returns the default user agent
|
||||
"""
|
||||
with patch('openlp.core.lib.webpagereader.sys') as mocked_sys:
|
||||
with patch('openlp.core.common.httputils.sys') as mocked_sys:
|
||||
|
||||
# GIVEN: The system is Linux
|
||||
mocked_sys.platform = 'freebsd'
|
||||
|
||||
# WHEN: We call _get_user_agent()
|
||||
user_agent = _get_user_agent()
|
||||
# WHEN: We call get_user_agent()
|
||||
user_agent = get_user_agent()
|
||||
|
||||
# THEN: The user agent is a Linux (or ChromeOS) user agent
|
||||
self.assertIn('NetBSD', user_agent, 'The user agent should be the default user agent')
|
||||
@ -111,9 +123,9 @@ class TestUtils(TestCase):
|
||||
"""
|
||||
Test that the get_web_page method works correctly
|
||||
"""
|
||||
with patch('openlp.core.lib.webpagereader.urllib.request.Request') as MockRequest, \
|
||||
patch('openlp.core.lib.webpagereader.urllib.request.urlopen') as mock_urlopen, \
|
||||
patch('openlp.core.lib.webpagereader._get_user_agent') as mock_get_user_agent, \
|
||||
with patch('openlp.core.common.httputils.urllib.request.Request') as MockRequest, \
|
||||
patch('openlp.core.common.httputils.urllib.request.urlopen') as mock_urlopen, \
|
||||
patch('openlp.core.common.httputils.get_user_agent') as mock_get_user_agent, \
|
||||
patch('openlp.core.common.Registry') as MockRegistry:
|
||||
# GIVEN: Mocked out objects and a fake URL
|
||||
mocked_request_object = MagicMock()
|
||||
@ -141,9 +153,9 @@ class TestUtils(TestCase):
|
||||
"""
|
||||
Test that adding a header to the call to get_web_page() adds the header to the request
|
||||
"""
|
||||
with patch('openlp.core.lib.webpagereader.urllib.request.Request') as MockRequest, \
|
||||
patch('openlp.core.lib.webpagereader.urllib.request.urlopen') as mock_urlopen, \
|
||||
patch('openlp.core.lib.webpagereader._get_user_agent') as mock_get_user_agent:
|
||||
with patch('openlp.core.common.httputils.urllib.request.Request') as MockRequest, \
|
||||
patch('openlp.core.common.httputils.urllib.request.urlopen') as mock_urlopen, \
|
||||
patch('openlp.core.common.httputils.get_user_agent') as mock_get_user_agent:
|
||||
# GIVEN: Mocked out objects, a fake URL and a fake header
|
||||
mocked_request_object = MagicMock()
|
||||
MockRequest.return_value = mocked_request_object
|
||||
@ -170,9 +182,9 @@ class TestUtils(TestCase):
|
||||
"""
|
||||
Test that adding a user agent in the header when calling get_web_page() adds that user agent to the request
|
||||
"""
|
||||
with patch('openlp.core.lib.webpagereader.urllib.request.Request') as MockRequest, \
|
||||
patch('openlp.core.lib.webpagereader.urllib.request.urlopen') as mock_urlopen, \
|
||||
patch('openlp.core.lib.webpagereader._get_user_agent') as mock_get_user_agent:
|
||||
with patch('openlp.core.common.httputils.urllib.request.Request') as MockRequest, \
|
||||
patch('openlp.core.common.httputils.urllib.request.urlopen') as mock_urlopen, \
|
||||
patch('openlp.core.common.httputils.get_user_agent') as mock_get_user_agent:
|
||||
# GIVEN: Mocked out objects, a fake URL and a fake header
|
||||
mocked_request_object = MagicMock()
|
||||
MockRequest.return_value = mocked_request_object
|
||||
@ -189,7 +201,7 @@ class TestUtils(TestCase):
|
||||
mocked_request_object.add_header.assert_called_with(user_agent_header[0], user_agent_header[1])
|
||||
self.assertEqual(1, mocked_request_object.add_header.call_count,
|
||||
'There should only be 1 call to add_header')
|
||||
self.assertEqual(0, mock_get_user_agent.call_count, '_get_user_agent should not have been called')
|
||||
self.assertEqual(0, mock_get_user_agent.call_count, 'get_user_agent should not have been called')
|
||||
mock_urlopen.assert_called_with(mocked_request_object, timeout=30)
|
||||
mocked_page_object.geturl.assert_called_with()
|
||||
self.assertEqual(mocked_page_object, returned_page, 'The returned page should be the mock object')
|
||||
@ -198,10 +210,10 @@ class TestUtils(TestCase):
|
||||
"""
|
||||
Test that passing "update_openlp" as true to get_web_page calls Registry().get('app').process_events()
|
||||
"""
|
||||
with patch('openlp.core.lib.webpagereader.urllib.request.Request') as MockRequest, \
|
||||
patch('openlp.core.lib.webpagereader.urllib.request.urlopen') as mock_urlopen, \
|
||||
patch('openlp.core.lib.webpagereader._get_user_agent') as mock_get_user_agent, \
|
||||
patch('openlp.core.lib.webpagereader.Registry') as MockRegistry:
|
||||
with patch('openlp.core.common.httputils.urllib.request.Request') as MockRequest, \
|
||||
patch('openlp.core.common.httputils.urllib.request.urlopen') as mock_urlopen, \
|
||||
patch('openlp.core.common.httputils.get_user_agent') as mock_get_user_agent, \
|
||||
patch('openlp.core.common.httputils.Registry') as MockRegistry:
|
||||
# GIVEN: Mocked out objects, a fake URL
|
||||
mocked_request_object = MagicMock()
|
||||
MockRequest.return_value = mocked_request_object
|
||||
@ -227,3 +239,36 @@ class TestUtils(TestCase):
|
||||
mocked_registry_object.get.assert_called_with('application')
|
||||
mocked_application_object.process_events.assert_called_with()
|
||||
self.assertEqual(mocked_page_object, returned_page, 'The returned page should be the mock object')
|
||||
|
||||
def test_get_url_file_size(self):
|
||||
"""
|
||||
Test that passing "update_openlp" as true to get_web_page calls Registry().get('app').process_events()
|
||||
"""
|
||||
with patch('openlp.core.common.httputils.urllib.request.urlopen') as mock_urlopen, \
|
||||
patch('openlp.core.common.httputils.get_user_agent') as mock_get_user_agent:
|
||||
# GIVEN: Mocked out objects, a fake URL
|
||||
mocked_page_object = MagicMock()
|
||||
mock_urlopen.return_value = mocked_page_object
|
||||
mock_get_user_agent.return_value = 'user_agent'
|
||||
fake_url = 'this://is.a.fake/url'
|
||||
|
||||
# WHEN: The get_url_file_size() method is called
|
||||
size = get_url_file_size(fake_url)
|
||||
|
||||
# THEN: The correct methods are called with the correct arguments and a web page is returned
|
||||
mock_urlopen.assert_called_with(fake_url, timeout=30)
|
||||
|
||||
@patch('openlp.core.ui.firsttimeform.urllib.request.urlopen')
|
||||
def test_socket_timeout(self, mocked_urlopen):
|
||||
"""
|
||||
Test socket timeout gets caught
|
||||
"""
|
||||
# GIVEN: Mocked urlopen to fake a network disconnect in the middle of a download
|
||||
mocked_urlopen.side_effect = socket.timeout()
|
||||
|
||||
# WHEN: Attempt to retrieve a file
|
||||
url_get_file(MagicMock(), url='http://localhost/test', f_path=self.tempfile)
|
||||
|
||||
# THEN: socket.timeout should have been caught
|
||||
# NOTE: Test is if $tmpdir/tempfile is still there, then test fails since ftw deletes bad downloaded files
|
||||
self.assertFalse(os.path.exists(self.tempfile), 'FTW url_get_file should have caught socket.timeout')
|
@ -31,7 +31,7 @@ import urllib.parse
|
||||
from tests.functional import patch
|
||||
from tests.helpers.testmixin import TestMixin
|
||||
|
||||
from openlp.core.lib.webpagereader import CONNECTION_RETRIES, get_web_page
|
||||
from openlp.core.common.httputils import CONNECTION_RETRIES, get_web_page
|
||||
|
||||
|
||||
class TestFirstTimeWizard(TestMixin, TestCase):
|
||||
|
@ -23,7 +23,6 @@
|
||||
Package to test the openlp.core.ui.firsttimeform package.
|
||||
"""
|
||||
import os
|
||||
import socket
|
||||
import tempfile
|
||||
import urllib
|
||||
from unittest import TestCase
|
||||
@ -236,20 +235,3 @@ class TestFirstTimeForm(TestCase, TestMixin):
|
||||
# THEN: the critical_error_message_box should have been called
|
||||
self.assertEquals(mocked_message_box.mock_calls[1][1][0], 'Network Error 407',
|
||||
'first_time_form should have caught Network Error')
|
||||
|
||||
@patch('openlp.core.ui.firsttimeform.urllib.request.urlopen')
|
||||
def test_socket_timeout(self, mocked_urlopen):
|
||||
"""
|
||||
Test socket timeout gets caught
|
||||
"""
|
||||
# GIVEN: Mocked urlopen to fake a network disconnect in the middle of a download
|
||||
first_time_form = FirstTimeForm(None)
|
||||
first_time_form.initialize(MagicMock())
|
||||
mocked_urlopen.side_effect = socket.timeout()
|
||||
|
||||
# WHEN: Attempt to retrieve a file
|
||||
first_time_form.url_get_file(url='http://localhost/test', f_path=self.tempfile)
|
||||
|
||||
# THEN: socket.timeout should have been caught
|
||||
# NOTE: Test is if $tmpdir/tempfile is still there, then test fails since ftw deletes bad downloaded files
|
||||
self.assertFalse(os.path.exists(self.tempfile), 'FTW url_get_file should have caught socket.timeout')
|
||||
|
Loading…
Reference in New Issue
Block a user