forked from openlp/openlp
Add files
This commit is contained in:
parent
4008ed008f
commit
95e70465de
203
openlp/core/common/httputils.py
Normal file
203
openlp/core/common/httputils.py
Normal file
@ -0,0 +1,203 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
|
||||
|
||||
###############################################################################
|
||||
# OpenLP - Open Source Lyrics Projection #
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Copyright (c) 2008-2016 OpenLP Developers #
|
||||
# --------------------------------------------------------------------------- #
|
||||
# This program is free software; you can redistribute it and/or modify it #
|
||||
# under the terms of the GNU General Public License as published by the Free #
|
||||
# Software Foundation; version 2 of the License. #
|
||||
# #
|
||||
# This program is distributed in the hope that it will be useful, but WITHOUT #
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
|
||||
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
|
||||
# more details. #
|
||||
# #
|
||||
# You should have received a copy of the GNU General Public License along #
|
||||
# with this program; if not, write to the Free Software Foundation, Inc., 59 #
|
||||
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
|
||||
###############################################################################
|
||||
"""
|
||||
The :mod:`openlp.core.utils` module provides the utility libraries for OpenLP.
|
||||
"""
|
||||
import logging
|
||||
import socket
|
||||
import sys
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from http.client import HTTPException
|
||||
from random import randint
|
||||
|
||||
from openlp.core.common import Registry
|
||||
|
||||
log = logging.getLogger(__name__ + '.__init__')
|
||||
|
||||
USER_AGENTS = {
|
||||
'win32': [
|
||||
'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.101 Safari/537.36',
|
||||
'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.101 Safari/537.36',
|
||||
'Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1500.71 Safari/537.36'
|
||||
],
|
||||
'darwin': [
|
||||
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_3) AppleWebKit/537.31 (KHTML, like Gecko) '
|
||||
'Chrome/26.0.1410.43 Safari/537.31',
|
||||
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/536.11 (KHTML, like Gecko) '
|
||||
'Chrome/20.0.1132.57 Safari/536.11',
|
||||
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_8) AppleWebKit/536.11 (KHTML, like Gecko) '
|
||||
'Chrome/20.0.1132.47 Safari/536.11',
|
||||
],
|
||||
'linux2': [
|
||||
'Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.22 (KHTML, like Gecko) Ubuntu Chromium/25.0.1364.160 '
|
||||
'Chrome/25.0.1364.160 Safari/537.22',
|
||||
'Mozilla/5.0 (X11; CrOS armv7l 2913.260.0) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.99 '
|
||||
'Safari/537.11',
|
||||
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.27 (KHTML, like Gecko) Chrome/26.0.1389.0 Safari/537.27'
|
||||
],
|
||||
'default': [
|
||||
'Mozilla/5.0 (X11; NetBSD amd64; rv:18.0) Gecko/20130120 Firefox/18.0'
|
||||
]
|
||||
}
|
||||
CONNECTION_TIMEOUT = 30
|
||||
CONNECTION_RETRIES = 2
|
||||
|
||||
|
||||
class HTTPRedirectHandlerFixed(urllib.request.HTTPRedirectHandler):
|
||||
"""
|
||||
Special HTTPRedirectHandler used to work around http://bugs.python.org/issue22248
|
||||
(Redirecting to urls with special chars)
|
||||
"""
|
||||
def redirect_request(self, req, fp, code, msg, headers, new_url):
|
||||
#
|
||||
"""
|
||||
Test if the new_url can be decoded to ascii
|
||||
|
||||
:param req:
|
||||
:param fp:
|
||||
:param code:
|
||||
:param msg:
|
||||
:param headers:
|
||||
:param new_url:
|
||||
:return:
|
||||
"""
|
||||
try:
|
||||
new_url.encode('latin1').decode('ascii')
|
||||
fixed_url = new_url
|
||||
except Exception:
|
||||
# The url could not be decoded to ascii, so we do some url encoding
|
||||
fixed_url = urllib.parse.quote(new_url.encode('latin1').decode('utf-8', 'replace'), safe='/:')
|
||||
return super(HTTPRedirectHandlerFixed, self).redirect_request(req, fp, code, msg, headers, fixed_url)
|
||||
|
||||
|
||||
def get_user_agent():
|
||||
"""
|
||||
Return a user agent customised for the platform the user is on.
|
||||
"""
|
||||
browser_list = USER_AGENTS.get(sys.platform, None)
|
||||
if not browser_list:
|
||||
browser_list = USER_AGENTS['default']
|
||||
random_index = randint(0, len(browser_list) - 1)
|
||||
return browser_list[random_index]
|
||||
|
||||
|
||||
def get_web_page(url, header=None, update_openlp=False):
|
||||
"""
|
||||
Attempts to download the webpage at url and returns that page or None.
|
||||
|
||||
:param url: The URL to be downloaded.
|
||||
:param header: An optional HTTP header to pass in the request to the web server.
|
||||
:param update_openlp: Tells OpenLP to update itself if the page is successfully downloaded.
|
||||
Defaults to False.
|
||||
"""
|
||||
# TODO: Add proxy usage. Get proxy info from OpenLP settings, add to a
|
||||
# proxy_handler, build into an opener and install the opener into urllib2.
|
||||
# http://docs.python.org/library/urllib2.html
|
||||
if not url:
|
||||
return None
|
||||
# This is needed to work around http://bugs.python.org/issue22248 and https://bugs.launchpad.net/openlp/+bug/1251437
|
||||
opener = urllib.request.build_opener(HTTPRedirectHandlerFixed())
|
||||
urllib.request.install_opener(opener)
|
||||
req = urllib.request.Request(url)
|
||||
if not header or header[0].lower() != 'user-agent':
|
||||
user_agent = get_user_agent()
|
||||
req.add_header('User-Agent', user_agent)
|
||||
if header:
|
||||
req.add_header(header[0], header[1])
|
||||
log.debug('Downloading URL = %s' % url)
|
||||
retries = 0
|
||||
while retries <= CONNECTION_RETRIES:
|
||||
retries += 1
|
||||
time.sleep(0.1)
|
||||
try:
|
||||
page = urllib.request.urlopen(req, timeout=CONNECTION_TIMEOUT)
|
||||
log.debug('Downloaded page {text}'.format(text=page.geturl()))
|
||||
break
|
||||
except urllib.error.URLError as err:
|
||||
log.exception('URLError on {text}'.format(text=url))
|
||||
log.exception('URLError: {text}'.format(text=err.reason))
|
||||
page = None
|
||||
if retries > CONNECTION_RETRIES:
|
||||
raise
|
||||
except socket.timeout:
|
||||
log.exception('Socket timeout: {text}'.format(text=url))
|
||||
page = None
|
||||
if retries > CONNECTION_RETRIES:
|
||||
raise
|
||||
except socket.gaierror:
|
||||
log.exception('Socket gaierror: {text}'.format(text=url))
|
||||
page = None
|
||||
if retries > CONNECTION_RETRIES:
|
||||
raise
|
||||
except ConnectionRefusedError:
|
||||
log.exception('ConnectionRefused: {text}'.format(text=url))
|
||||
page = None
|
||||
if retries > CONNECTION_RETRIES:
|
||||
raise
|
||||
break
|
||||
except ConnectionError:
|
||||
log.exception('Connection error: {text}'.format(text=url))
|
||||
page = None
|
||||
if retries > CONNECTION_RETRIES:
|
||||
raise
|
||||
except HTTPException:
|
||||
log.exception('HTTPException error: {text}'.format(text=url))
|
||||
page = None
|
||||
if retries > CONNECTION_RETRIES:
|
||||
raise
|
||||
except:
|
||||
# Don't know what's happening, so reraise the original
|
||||
raise
|
||||
if update_openlp:
|
||||
Registry().get('application').process_events()
|
||||
if not page:
|
||||
log.exception('{text} could not be downloaded'.format(text=url))
|
||||
return None
|
||||
log.debug(page)
|
||||
return page
|
||||
|
||||
|
||||
def get_url_file_size(url):
|
||||
"""
|
||||
Get the size of a file.
|
||||
|
||||
:param url: The URL of the file we want to download.
|
||||
"""
|
||||
retries = 0
|
||||
while True:
|
||||
try:
|
||||
site = urllib.request.urlopen(url, timeout=CONNECTION_TIMEOUT)
|
||||
meta = site.info()
|
||||
return int(meta.get("Content-Length"))
|
||||
except urllib.error.URLError:
|
||||
if retries > CONNECTION_RETRIES:
|
||||
raise
|
||||
else:
|
||||
retries += 1
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
|
||||
__all__ = ['get_web_page']
|
247
tests/functional/openlp_core_common/test_httputils.py
Normal file
247
tests/functional/openlp_core_common/test_httputils.py
Normal file
@ -0,0 +1,247 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
|
||||
|
||||
###############################################################################
|
||||
# OpenLP - Open Source Lyrics Projection #
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Copyright (c) 2008-2016 OpenLP Developers #
|
||||
# --------------------------------------------------------------------------- #
|
||||
# This program is free software; you can redistribute it and/or modify it #
|
||||
# under the terms of the GNU General Public License as published by the Free #
|
||||
# Software Foundation; version 2 of the License. #
|
||||
# #
|
||||
# This program is distributed in the hope that it will be useful, but WITHOUT #
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
|
||||
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
|
||||
# more details. #
|
||||
# #
|
||||
# You should have received a copy of the GNU General Public License along #
|
||||
# with this program; if not, write to the Free Software Foundation, Inc., 59 #
|
||||
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
|
||||
###############################################################################
|
||||
"""
|
||||
Functional tests to test the AppLocation class and related methods.
|
||||
"""
|
||||
from unittest import TestCase
|
||||
|
||||
from openlp.core.common.httputils import get_user_agent, get_web_page, get_url_file_size
|
||||
|
||||
from tests.functional import MagicMock, patch
|
||||
|
||||
|
||||
class TestHttpUtils(TestCase):
|
||||
"""
|
||||
A test suite to test out various methods around the AppLocation class.
|
||||
"""
|
||||
def test_get_user_agent_linux(self):
|
||||
"""
|
||||
Test that getting a user agent on Linux returns a user agent suitable for Linux
|
||||
"""
|
||||
with patch('openlp.core.common.httputils.sys') as mocked_sys:
|
||||
|
||||
# GIVEN: The system is Linux
|
||||
mocked_sys.platform = 'linux2'
|
||||
|
||||
# WHEN: We call get_user_agent()
|
||||
user_agent = get_user_agent()
|
||||
|
||||
# THEN: The user agent is a Linux (or ChromeOS) user agent
|
||||
result = 'Linux' in user_agent or 'CrOS' in user_agent
|
||||
self.assertTrue(result, 'The user agent should be a valid Linux user agent')
|
||||
|
||||
def test_get_user_agent_windows(self):
|
||||
"""
|
||||
Test that getting a user agent on Windows returns a user agent suitable for Windows
|
||||
"""
|
||||
with patch('openlp.core.common.httputils.sys') as mocked_sys:
|
||||
|
||||
# GIVEN: The system is Linux
|
||||
mocked_sys.platform = 'win32'
|
||||
|
||||
# WHEN: We call get_user_agent()
|
||||
user_agent = get_user_agent()
|
||||
|
||||
# THEN: The user agent is a Linux (or ChromeOS) user agent
|
||||
self.assertIn('Windows', user_agent, 'The user agent should be a valid Windows user agent')
|
||||
|
||||
def test_get_user_agent_macos(self):
|
||||
"""
|
||||
Test that getting a user agent on OS X returns a user agent suitable for OS X
|
||||
"""
|
||||
with patch('openlp.core.common.httputils.sys') as mocked_sys:
|
||||
|
||||
# GIVEN: The system is Linux
|
||||
mocked_sys.platform = 'darwin'
|
||||
|
||||
# WHEN: We call get_user_agent()
|
||||
user_agent = get_user_agent()
|
||||
|
||||
# THEN: The user agent is a Linux (or ChromeOS) user agent
|
||||
self.assertIn('Mac OS X', user_agent, 'The user agent should be a valid OS X user agent')
|
||||
|
||||
def test_get_user_agent_default(self):
|
||||
"""
|
||||
Test that getting a user agent on a non-Linux/Windows/OS X platform returns the default user agent
|
||||
"""
|
||||
with patch('openlp.core.common.httputils.sys') as mocked_sys:
|
||||
|
||||
# GIVEN: The system is Linux
|
||||
mocked_sys.platform = 'freebsd'
|
||||
|
||||
# WHEN: We call get_user_agent()
|
||||
user_agent = get_user_agent()
|
||||
|
||||
# THEN: The user agent is a Linux (or ChromeOS) user agent
|
||||
self.assertIn('NetBSD', user_agent, 'The user agent should be the default user agent')
|
||||
|
||||
def test_get_web_page_no_url(self):
|
||||
"""
|
||||
Test that sending a URL of None to the get_web_page method returns None
|
||||
"""
|
||||
# GIVEN: A None url
|
||||
test_url = None
|
||||
|
||||
# WHEN: We try to get the test URL
|
||||
result = get_web_page(test_url)
|
||||
|
||||
# THEN: None should be returned
|
||||
self.assertIsNone(result, 'The return value of get_web_page should be None')
|
||||
|
||||
def test_get_web_page(self):
|
||||
"""
|
||||
Test that the get_web_page method works correctly
|
||||
"""
|
||||
with patch('openlp.core.common.httputils.urllib.request.Request') as MockRequest, \
|
||||
patch('openlp.core.common.httputils.urllib.request.urlopen') as mock_urlopen, \
|
||||
patch('openlp.core.common.httputils.get_user_agent') as mock_get_user_agent, \
|
||||
patch('openlp.core.common.Registry') as MockRegistry:
|
||||
# GIVEN: Mocked out objects and a fake URL
|
||||
mocked_request_object = MagicMock()
|
||||
MockRequest.return_value = mocked_request_object
|
||||
mocked_page_object = MagicMock()
|
||||
mock_urlopen.return_value = mocked_page_object
|
||||
mock_get_user_agent.return_value = 'user_agent'
|
||||
fake_url = 'this://is.a.fake/url'
|
||||
|
||||
# WHEN: The get_web_page() method is called
|
||||
returned_page = get_web_page(fake_url)
|
||||
|
||||
# THEN: The correct methods are called with the correct arguments and a web page is returned
|
||||
MockRequest.assert_called_with(fake_url)
|
||||
mocked_request_object.add_header.assert_called_with('User-Agent', 'user_agent')
|
||||
self.assertEqual(1, mocked_request_object.add_header.call_count,
|
||||
'There should only be 1 call to add_header')
|
||||
mock_get_user_agent.assert_called_with()
|
||||
mock_urlopen.assert_called_with(mocked_request_object, timeout=30)
|
||||
mocked_page_object.geturl.assert_called_with()
|
||||
self.assertEqual(0, MockRegistry.call_count, 'The Registry() object should have never been called')
|
||||
self.assertEqual(mocked_page_object, returned_page, 'The returned page should be the mock object')
|
||||
|
||||
def test_get_web_page_with_header(self):
|
||||
"""
|
||||
Test that adding a header to the call to get_web_page() adds the header to the request
|
||||
"""
|
||||
with patch('openlp.core.common.httputils.urllib.request.Request') as MockRequest, \
|
||||
patch('openlp.core.common.httputils.urllib.request.urlopen') as mock_urlopen, \
|
||||
patch('openlp.core.common.httputils.get_user_agent') as mock_get_user_agent:
|
||||
# GIVEN: Mocked out objects, a fake URL and a fake header
|
||||
mocked_request_object = MagicMock()
|
||||
MockRequest.return_value = mocked_request_object
|
||||
mocked_page_object = MagicMock()
|
||||
mock_urlopen.return_value = mocked_page_object
|
||||
mock_get_user_agent.return_value = 'user_agent'
|
||||
fake_url = 'this://is.a.fake/url'
|
||||
fake_header = ('Fake-Header', 'fake value')
|
||||
|
||||
# WHEN: The get_web_page() method is called
|
||||
returned_page = get_web_page(fake_url, header=fake_header)
|
||||
|
||||
# THEN: The correct methods are called with the correct arguments and a web page is returned
|
||||
MockRequest.assert_called_with(fake_url)
|
||||
mocked_request_object.add_header.assert_called_with(fake_header[0], fake_header[1])
|
||||
self.assertEqual(2, mocked_request_object.add_header.call_count,
|
||||
'There should only be 2 calls to add_header')
|
||||
mock_get_user_agent.assert_called_with()
|
||||
mock_urlopen.assert_called_with(mocked_request_object, timeout=30)
|
||||
mocked_page_object.geturl.assert_called_with()
|
||||
self.assertEqual(mocked_page_object, returned_page, 'The returned page should be the mock object')
|
||||
|
||||
def test_get_web_page_with_user_agent_in_headers(self):
|
||||
"""
|
||||
Test that adding a user agent in the header when calling get_web_page() adds that user agent to the request
|
||||
"""
|
||||
with patch('openlp.core.common.httputils.urllib.request.Request') as MockRequest, \
|
||||
patch('openlp.core.common.httputils.urllib.request.urlopen') as mock_urlopen, \
|
||||
patch('openlp.core.common.httputils.get_user_agent') as mock_get_user_agent:
|
||||
# GIVEN: Mocked out objects, a fake URL and a fake header
|
||||
mocked_request_object = MagicMock()
|
||||
MockRequest.return_value = mocked_request_object
|
||||
mocked_page_object = MagicMock()
|
||||
mock_urlopen.return_value = mocked_page_object
|
||||
fake_url = 'this://is.a.fake/url'
|
||||
user_agent_header = ('User-Agent', 'OpenLP/2.2.0')
|
||||
|
||||
# WHEN: The get_web_page() method is called
|
||||
returned_page = get_web_page(fake_url, header=user_agent_header)
|
||||
|
||||
# THEN: The correct methods are called with the correct arguments and a web page is returned
|
||||
MockRequest.assert_called_with(fake_url)
|
||||
mocked_request_object.add_header.assert_called_with(user_agent_header[0], user_agent_header[1])
|
||||
self.assertEqual(1, mocked_request_object.add_header.call_count,
|
||||
'There should only be 1 call to add_header')
|
||||
self.assertEqual(0, mock_get_user_agent.call_count, 'get_user_agent should not have been called')
|
||||
mock_urlopen.assert_called_with(mocked_request_object, timeout=30)
|
||||
mocked_page_object.geturl.assert_called_with()
|
||||
self.assertEqual(mocked_page_object, returned_page, 'The returned page should be the mock object')
|
||||
|
||||
def test_get_web_page_update_openlp(self):
|
||||
"""
|
||||
Test that passing "update_openlp" as true to get_web_page calls Registry().get('app').process_events()
|
||||
"""
|
||||
with patch('openlp.core.common.httputils.urllib.request.Request') as MockRequest, \
|
||||
patch('openlp.core.common.httputils.urllib.request.urlopen') as mock_urlopen, \
|
||||
patch('openlp.core.common.httputils.get_user_agent') as mock_get_user_agent, \
|
||||
patch('openlp.core.common.httputils.Registry') as MockRegistry:
|
||||
# GIVEN: Mocked out objects, a fake URL
|
||||
mocked_request_object = MagicMock()
|
||||
MockRequest.return_value = mocked_request_object
|
||||
mocked_page_object = MagicMock()
|
||||
mock_urlopen.return_value = mocked_page_object
|
||||
mock_get_user_agent.return_value = 'user_agent'
|
||||
mocked_registry_object = MagicMock()
|
||||
mocked_application_object = MagicMock()
|
||||
mocked_registry_object.get.return_value = mocked_application_object
|
||||
MockRegistry.return_value = mocked_registry_object
|
||||
fake_url = 'this://is.a.fake/url'
|
||||
|
||||
# WHEN: The get_web_page() method is called
|
||||
returned_page = get_web_page(fake_url, update_openlp=True)
|
||||
|
||||
# THEN: The correct methods are called with the correct arguments and a web page is returned
|
||||
MockRequest.assert_called_with(fake_url)
|
||||
mocked_request_object.add_header.assert_called_with('User-Agent', 'user_agent')
|
||||
self.assertEqual(1, mocked_request_object.add_header.call_count,
|
||||
'There should only be 1 call to add_header')
|
||||
mock_urlopen.assert_called_with(mocked_request_object, timeout=30)
|
||||
mocked_page_object.geturl.assert_called_with()
|
||||
mocked_registry_object.get.assert_called_with('application')
|
||||
mocked_application_object.process_events.assert_called_with()
|
||||
self.assertEqual(mocked_page_object, returned_page, 'The returned page should be the mock object')
|
||||
|
||||
def test_get_url_file_size(self):
|
||||
"""
|
||||
Test that passing "update_openlp" as true to get_web_page calls Registry().get('app').process_events()
|
||||
"""
|
||||
with patch('openlp.core.common.httputils.urllib.request.urlopen') as mock_urlopen, \
|
||||
patch('openlp.core.common.httputils.get_user_agent') as mock_get_user_agent:
|
||||
# GIVEN: Mocked out objects, a fake URL
|
||||
mocked_page_object = MagicMock()
|
||||
mock_urlopen.return_value = mocked_page_object
|
||||
mock_get_user_agent.return_value = 'user_agent'
|
||||
fake_url = 'this://is.a.fake/url'
|
||||
|
||||
# WHEN: The get_url_file_size() method is called
|
||||
size = get_url_file_size(fake_url)
|
||||
|
||||
# THEN: The correct methods are called with the correct arguments and a web page is returned
|
||||
mock_urlopen.assert_called_with(fake_url, timeout=30)
|
Loading…
Reference in New Issue
Block a user