Move url size

This commit is contained in:
Tim Bentley 2016-12-20 21:20:54 +00:00
parent 739f083286
commit 4008ed008f
5 changed files with 6 additions and 437 deletions

View File

@ -1,182 +0,0 @@
# -*- coding: utf-8 -*-
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
###############################################################################
# OpenLP - Open Source Lyrics Projection #
# --------------------------------------------------------------------------- #
# Copyright (c) 2008-2016 OpenLP Developers #
# --------------------------------------------------------------------------- #
# This program is free software; you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by the Free #
# Software Foundation; version 2 of the License. #
# #
# This program is distributed in the hope that it will be useful, but WITHOUT #
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
# more details. #
# #
# You should have received a copy of the GNU General Public License along #
# with this program; if not, write to the Free Software Foundation, Inc., 59 #
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
###############################################################################
"""
The :mod:`openlp.core.utils` module provides the utility libraries for OpenLP.
"""
import logging
import socket
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from http.client import HTTPException
from random import randint
from openlp.core.common import Registry
log = logging.getLogger(__name__ + '.__init__')
USER_AGENTS = {
'win32': [
'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.101 Safari/537.36',
'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.101 Safari/537.36',
'Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1500.71 Safari/537.36'
],
'darwin': [
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_3) AppleWebKit/537.31 (KHTML, like Gecko) '
'Chrome/26.0.1410.43 Safari/537.31',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/536.11 (KHTML, like Gecko) '
'Chrome/20.0.1132.57 Safari/536.11',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_8) AppleWebKit/536.11 (KHTML, like Gecko) '
'Chrome/20.0.1132.47 Safari/536.11',
],
'linux2': [
'Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.22 (KHTML, like Gecko) Ubuntu Chromium/25.0.1364.160 '
'Chrome/25.0.1364.160 Safari/537.22',
'Mozilla/5.0 (X11; CrOS armv7l 2913.260.0) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.99 '
'Safari/537.11',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.27 (KHTML, like Gecko) Chrome/26.0.1389.0 Safari/537.27'
],
'default': [
'Mozilla/5.0 (X11; NetBSD amd64; rv:18.0) Gecko/20130120 Firefox/18.0'
]
}
CONNECTION_TIMEOUT = 30
CONNECTION_RETRIES = 2
class HTTPRedirectHandlerFixed(urllib.request.HTTPRedirectHandler):
"""
Special HTTPRedirectHandler used to work around http://bugs.python.org/issue22248
(Redirecting to urls with special chars)
"""
def redirect_request(self, req, fp, code, msg, headers, new_url):
#
"""
Test if the new_url can be decoded to ascii
:param req:
:param fp:
:param code:
:param msg:
:param headers:
:param new_url:
:return:
"""
try:
new_url.encode('latin1').decode('ascii')
fixed_url = new_url
except Exception:
# The url could not be decoded to ascii, so we do some url encoding
fixed_url = urllib.parse.quote(new_url.encode('latin1').decode('utf-8', 'replace'), safe='/:')
return super(HTTPRedirectHandlerFixed, self).redirect_request(req, fp, code, msg, headers, fixed_url)
def _get_user_agent():
"""
Return a user agent customised for the platform the user is on.
"""
browser_list = USER_AGENTS.get(sys.platform, None)
if not browser_list:
browser_list = USER_AGENTS['default']
random_index = randint(0, len(browser_list) - 1)
return browser_list[random_index]
def get_web_page(url, header=None, update_openlp=False):
"""
Attempts to download the webpage at url and returns that page or None.
:param url: The URL to be downloaded.
:param header: An optional HTTP header to pass in the request to the web server.
:param update_openlp: Tells OpenLP to update itself if the page is successfully downloaded.
Defaults to False.
"""
# TODO: Add proxy usage. Get proxy info from OpenLP settings, add to a
# proxy_handler, build into an opener and install the opener into urllib2.
# http://docs.python.org/library/urllib2.html
if not url:
return None
# This is needed to work around http://bugs.python.org/issue22248 and https://bugs.launchpad.net/openlp/+bug/1251437
opener = urllib.request.build_opener(HTTPRedirectHandlerFixed())
urllib.request.install_opener(opener)
req = urllib.request.Request(url)
if not header or header[0].lower() != 'user-agent':
user_agent = _get_user_agent()
req.add_header('User-Agent', user_agent)
if header:
req.add_header(header[0], header[1])
log.debug('Downloading URL = %s' % url)
retries = 0
while retries <= CONNECTION_RETRIES:
retries += 1
time.sleep(0.1)
try:
page = urllib.request.urlopen(req, timeout=CONNECTION_TIMEOUT)
log.debug('Downloaded page {text}'.format(text=page.geturl()))
break
except urllib.error.URLError as err:
log.exception('URLError on {text}'.format(text=url))
log.exception('URLError: {text}'.format(text=err.reason))
page = None
if retries > CONNECTION_RETRIES:
raise
except socket.timeout:
log.exception('Socket timeout: {text}'.format(text=url))
page = None
if retries > CONNECTION_RETRIES:
raise
except socket.gaierror:
log.exception('Socket gaierror: {text}'.format(text=url))
page = None
if retries > CONNECTION_RETRIES:
raise
except ConnectionRefusedError:
log.exception('ConnectionRefused: {text}'.format(text=url))
page = None
if retries > CONNECTION_RETRIES:
raise
break
except ConnectionError:
log.exception('Connection error: {text}'.format(text=url))
page = None
if retries > CONNECTION_RETRIES:
raise
except HTTPException:
log.exception('HTTPException error: {text}'.format(text=url))
page = None
if retries > CONNECTION_RETRIES:
raise
except:
# Don't know what's happening, so reraise the original
raise
if update_openlp:
Registry().get('application').process_events()
if not page:
log.exception('{text} could not be downloaded'.format(text=url))
return None
log.debug(page)
return page
__all__ = ['get_web_page']

View File

@ -39,7 +39,7 @@ from openlp.core.common import Registry, RegistryProperties, AppLocation, Settin
translate, clean_button_text, trace_error_handler
from openlp.core.lib import PluginStatus, build_icon
from openlp.core.lib.ui import critical_error_message_box
from openlp.core.lib.webpagereader import get_web_page, CONNECTION_RETRIES, CONNECTION_TIMEOUT
from openlp.core.common.httputils import get_web_page, get_url_file_size, CONNECTION_RETRIES, CONNECTION_TIMEOUT
from .firsttimewizard import UiFirstTimeWizard, FirstTimePage
log = logging.getLogger(__name__)
@ -455,26 +455,6 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
if item:
item.setIcon(build_icon(os.path.join(gettempdir(), 'openlp', screenshot)))
def _get_file_size(self, url):
"""
Get the size of a file.
:param url: The URL of the file we want to download.
"""
retries = 0
while True:
try:
site = urllib.request.urlopen(url, timeout=CONNECTION_TIMEOUT)
meta = site.info()
return int(meta.get("Content-Length"))
except urllib.error.URLError:
if retries > CONNECTION_RETRIES:
raise
else:
retries += 1
time.sleep(0.1)
continue
def _download_progress(self, count, block_size):
"""
Calculate and display the download progress.
@ -510,7 +490,7 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
item = self.songs_list_widget.item(i)
if item.checkState() == QtCore.Qt.Checked:
filename, sha256 = item.data(QtCore.Qt.UserRole)
size = self._get_file_size('{path}{name}'.format(path=self.songs_url, name=filename))
size = get_url_file_size('{path}{name}'.format(path=self.songs_url, name=filename))
self.max_progress += size
# Loop through the Bibles list and increase for each selected item
iterator = QtWidgets.QTreeWidgetItemIterator(self.bibles_tree_widget)
@ -519,7 +499,7 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
item = iterator.value()
if item.parent() and item.checkState(0) == QtCore.Qt.Checked:
filename, sha256 = item.data(0, QtCore.Qt.UserRole)
size = self._get_file_size('{path}{name}'.format(path=self.bibles_url, name=filename))
size = get_url_file_size('{path}{name}'.format(path=self.bibles_url, name=filename))
self.max_progress += size
iterator += 1
# Loop through the themes list and increase for each selected item
@ -528,7 +508,7 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
item = self.themes_list_widget.item(i)
if item.checkState() == QtCore.Qt.Checked:
filename, sha256 = item.data(QtCore.Qt.UserRole)
size = self._get_file_size('{path}{name}'.format(path=self.themes_url, name=filename))
size = get_url_file_size('{path}{name}'.format(path=self.themes_url, name=filename))
self.max_progress += size
except urllib.error.URLError:
trace_error_handler(log)

View File

@ -32,7 +32,7 @@ from bs4 import BeautifulSoup, NavigableString, Tag
from openlp.core.common import Registry, RegistryProperties, translate
from openlp.core.lib.ui import critical_error_message_box
from openlp.core.lib.webpagereader import get_web_page
from openlp.core.common.httputils import get_web_page
from openlp.plugins.bibles.lib import SearchResults
from openlp.plugins.bibles.lib.bibleimport import BibleImport
from openlp.plugins.bibles.lib.db import BibleDB, BiblesResourcesDB, Book

View File

@ -1,229 +0,0 @@
# -*- coding: utf-8 -*-
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
###############################################################################
# OpenLP - Open Source Lyrics Projection #
# --------------------------------------------------------------------------- #
# Copyright (c) 2008-2016 OpenLP Developers #
# --------------------------------------------------------------------------- #
# This program is free software; you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by the Free #
# Software Foundation; version 2 of the License. #
# #
# This program is distributed in the hope that it will be useful, but WITHOUT #
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
# more details. #
# #
# You should have received a copy of the GNU General Public License along #
# with this program; if not, write to the Free Software Foundation, Inc., 59 #
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
###############################################################################
"""
Functional tests to test the AppLocation class and related methods.
"""
from unittest import TestCase
from openlp.core.lib.webpagereader import _get_user_agent, get_web_page
from tests.functional import MagicMock, patch
class TestUtils(TestCase):
"""
A test suite to test out various methods around the AppLocation class.
"""
def test_get_user_agent_linux(self):
"""
Test that getting a user agent on Linux returns a user agent suitable for Linux
"""
with patch('openlp.core.lib.webpagereader.sys') as mocked_sys:
# GIVEN: The system is Linux
mocked_sys.platform = 'linux2'
# WHEN: We call _get_user_agent()
user_agent = _get_user_agent()
# THEN: The user agent is a Linux (or ChromeOS) user agent
result = 'Linux' in user_agent or 'CrOS' in user_agent
self.assertTrue(result, 'The user agent should be a valid Linux user agent')
def test_get_user_agent_windows(self):
"""
Test that getting a user agent on Windows returns a user agent suitable for Windows
"""
with patch('openlp.core.lib.webpagereader.sys') as mocked_sys:
# GIVEN: The system is Linux
mocked_sys.platform = 'win32'
# WHEN: We call _get_user_agent()
user_agent = _get_user_agent()
# THEN: The user agent is a Linux (or ChromeOS) user agent
self.assertIn('Windows', user_agent, 'The user agent should be a valid Windows user agent')
def test_get_user_agent_macos(self):
"""
Test that getting a user agent on OS X returns a user agent suitable for OS X
"""
with patch('openlp.core.lib.webpagereader.sys') as mocked_sys:
# GIVEN: The system is Linux
mocked_sys.platform = 'darwin'
# WHEN: We call _get_user_agent()
user_agent = _get_user_agent()
# THEN: The user agent is a Linux (or ChromeOS) user agent
self.assertIn('Mac OS X', user_agent, 'The user agent should be a valid OS X user agent')
def test_get_user_agent_default(self):
"""
Test that getting a user agent on a non-Linux/Windows/OS X platform returns the default user agent
"""
with patch('openlp.core.lib.webpagereader.sys') as mocked_sys:
# GIVEN: The system is Linux
mocked_sys.platform = 'freebsd'
# WHEN: We call _get_user_agent()
user_agent = _get_user_agent()
# THEN: The user agent is a Linux (or ChromeOS) user agent
self.assertIn('NetBSD', user_agent, 'The user agent should be the default user agent')
def test_get_web_page_no_url(self):
"""
Test that sending a URL of None to the get_web_page method returns None
"""
# GIVEN: A None url
test_url = None
# WHEN: We try to get the test URL
result = get_web_page(test_url)
# THEN: None should be returned
self.assertIsNone(result, 'The return value of get_web_page should be None')
def test_get_web_page(self):
"""
Test that the get_web_page method works correctly
"""
with patch('openlp.core.lib.webpagereader.urllib.request.Request') as MockRequest, \
patch('openlp.core.lib.webpagereader.urllib.request.urlopen') as mock_urlopen, \
patch('openlp.core.lib.webpagereader._get_user_agent') as mock_get_user_agent, \
patch('openlp.core.common.Registry') as MockRegistry:
# GIVEN: Mocked out objects and a fake URL
mocked_request_object = MagicMock()
MockRequest.return_value = mocked_request_object
mocked_page_object = MagicMock()
mock_urlopen.return_value = mocked_page_object
mock_get_user_agent.return_value = 'user_agent'
fake_url = 'this://is.a.fake/url'
# WHEN: The get_web_page() method is called
returned_page = get_web_page(fake_url)
# THEN: The correct methods are called with the correct arguments and a web page is returned
MockRequest.assert_called_with(fake_url)
mocked_request_object.add_header.assert_called_with('User-Agent', 'user_agent')
self.assertEqual(1, mocked_request_object.add_header.call_count,
'There should only be 1 call to add_header')
mock_get_user_agent.assert_called_with()
mock_urlopen.assert_called_with(mocked_request_object, timeout=30)
mocked_page_object.geturl.assert_called_with()
self.assertEqual(0, MockRegistry.call_count, 'The Registry() object should have never been called')
self.assertEqual(mocked_page_object, returned_page, 'The returned page should be the mock object')
def test_get_web_page_with_header(self):
"""
Test that adding a header to the call to get_web_page() adds the header to the request
"""
with patch('openlp.core.lib.webpagereader.urllib.request.Request') as MockRequest, \
patch('openlp.core.lib.webpagereader.urllib.request.urlopen') as mock_urlopen, \
patch('openlp.core.lib.webpagereader._get_user_agent') as mock_get_user_agent:
# GIVEN: Mocked out objects, a fake URL and a fake header
mocked_request_object = MagicMock()
MockRequest.return_value = mocked_request_object
mocked_page_object = MagicMock()
mock_urlopen.return_value = mocked_page_object
mock_get_user_agent.return_value = 'user_agent'
fake_url = 'this://is.a.fake/url'
fake_header = ('Fake-Header', 'fake value')
# WHEN: The get_web_page() method is called
returned_page = get_web_page(fake_url, header=fake_header)
# THEN: The correct methods are called with the correct arguments and a web page is returned
MockRequest.assert_called_with(fake_url)
mocked_request_object.add_header.assert_called_with(fake_header[0], fake_header[1])
self.assertEqual(2, mocked_request_object.add_header.call_count,
'There should only be 2 calls to add_header')
mock_get_user_agent.assert_called_with()
mock_urlopen.assert_called_with(mocked_request_object, timeout=30)
mocked_page_object.geturl.assert_called_with()
self.assertEqual(mocked_page_object, returned_page, 'The returned page should be the mock object')
def test_get_web_page_with_user_agent_in_headers(self):
"""
Test that adding a user agent in the header when calling get_web_page() adds that user agent to the request
"""
with patch('openlp.core.lib.webpagereader.urllib.request.Request') as MockRequest, \
patch('openlp.core.lib.webpagereader.urllib.request.urlopen') as mock_urlopen, \
patch('openlp.core.lib.webpagereader._get_user_agent') as mock_get_user_agent:
# GIVEN: Mocked out objects, a fake URL and a fake header
mocked_request_object = MagicMock()
MockRequest.return_value = mocked_request_object
mocked_page_object = MagicMock()
mock_urlopen.return_value = mocked_page_object
fake_url = 'this://is.a.fake/url'
user_agent_header = ('User-Agent', 'OpenLP/2.2.0')
# WHEN: The get_web_page() method is called
returned_page = get_web_page(fake_url, header=user_agent_header)
# THEN: The correct methods are called with the correct arguments and a web page is returned
MockRequest.assert_called_with(fake_url)
mocked_request_object.add_header.assert_called_with(user_agent_header[0], user_agent_header[1])
self.assertEqual(1, mocked_request_object.add_header.call_count,
'There should only be 1 call to add_header')
self.assertEqual(0, mock_get_user_agent.call_count, '_get_user_agent should not have been called')
mock_urlopen.assert_called_with(mocked_request_object, timeout=30)
mocked_page_object.geturl.assert_called_with()
self.assertEqual(mocked_page_object, returned_page, 'The returned page should be the mock object')
def test_get_web_page_update_openlp(self):
"""
Test that passing "update_openlp" as true to get_web_page calls Registry().get('app').process_events()
"""
with patch('openlp.core.lib.webpagereader.urllib.request.Request') as MockRequest, \
patch('openlp.core.lib.webpagereader.urllib.request.urlopen') as mock_urlopen, \
patch('openlp.core.lib.webpagereader._get_user_agent') as mock_get_user_agent, \
patch('openlp.core.lib.webpagereader.Registry') as MockRegistry:
# GIVEN: Mocked out objects, a fake URL
mocked_request_object = MagicMock()
MockRequest.return_value = mocked_request_object
mocked_page_object = MagicMock()
mock_urlopen.return_value = mocked_page_object
mock_get_user_agent.return_value = 'user_agent'
mocked_registry_object = MagicMock()
mocked_application_object = MagicMock()
mocked_registry_object.get.return_value = mocked_application_object
MockRegistry.return_value = mocked_registry_object
fake_url = 'this://is.a.fake/url'
# WHEN: The get_web_page() method is called
returned_page = get_web_page(fake_url, update_openlp=True)
# THEN: The correct methods are called with the correct arguments and a web page is returned
MockRequest.assert_called_with(fake_url)
mocked_request_object.add_header.assert_called_with('User-Agent', 'user_agent')
self.assertEqual(1, mocked_request_object.add_header.call_count,
'There should only be 1 call to add_header')
mock_urlopen.assert_called_with(mocked_request_object, timeout=30)
mocked_page_object.geturl.assert_called_with()
mocked_registry_object.get.assert_called_with('application')
mocked_application_object.process_events.assert_called_with()
self.assertEqual(mocked_page_object, returned_page, 'The returned page should be the mock object')

View File

@ -31,7 +31,7 @@ import urllib.parse
from tests.functional import patch
from tests.helpers.testmixin import TestMixin
from openlp.core.lib.webpagereader import CONNECTION_RETRIES, get_web_page
from openlp.core.common.httputils import CONNECTION_RETRIES, get_web_page
class TestFirstTimeWizard(TestMixin, TestCase):