From 95e70465de6737ddd4377b871874187bdb67c25f Mon Sep 17 00:00:00 2001 From: Tim Bentley Date: Tue, 20 Dec 2016 21:21:17 +0000 Subject: [PATCH] Add files --- openlp/core/common/httputils.py | 203 ++++++++++++++ .../openlp_core_common/test_httputils.py | 247 ++++++++++++++++++ 2 files changed, 450 insertions(+) create mode 100644 openlp/core/common/httputils.py create mode 100644 tests/functional/openlp_core_common/test_httputils.py diff --git a/openlp/core/common/httputils.py b/openlp/core/common/httputils.py new file mode 100644 index 000000000..cab3ffb49 --- /dev/null +++ b/openlp/core/common/httputils.py @@ -0,0 +1,203 @@ +# -*- coding: utf-8 -*- +# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4 + +############################################################################### +# OpenLP - Open Source Lyrics Projection # +# --------------------------------------------------------------------------- # +# Copyright (c) 2008-2016 OpenLP Developers # +# --------------------------------------------------------------------------- # +# This program is free software; you can redistribute it and/or modify it # +# under the terms of the GNU General Public License as published by the Free # +# Software Foundation; version 2 of the License. # +# # +# This program is distributed in the hope that it will be useful, but WITHOUT # +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for # +# more details. # +# # +# You should have received a copy of the GNU General Public License along # +# with this program; if not, write to the Free Software Foundation, Inc., 59 # +# Temple Place, Suite 330, Boston, MA 02111-1307 USA # +############################################################################### +""" +The :mod:`openlp.core.utils` module provides the utility libraries for OpenLP. +""" +import logging +import socket +import sys +import time +import urllib.error +import urllib.parse +import urllib.request +from http.client import HTTPException +from random import randint + +from openlp.core.common import Registry + +log = logging.getLogger(__name__ + '.__init__') + +USER_AGENTS = { + 'win32': [ + 'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.101 Safari/537.36', + 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.101 Safari/537.36', + 'Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1500.71 Safari/537.36' + ], + 'darwin': [ + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_3) AppleWebKit/537.31 (KHTML, like Gecko) ' + 'Chrome/26.0.1410.43 Safari/537.31', + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/536.11 (KHTML, like Gecko) ' + 'Chrome/20.0.1132.57 Safari/536.11', + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_8) AppleWebKit/536.11 (KHTML, like Gecko) ' + 'Chrome/20.0.1132.47 Safari/536.11', + ], + 'linux2': [ + 'Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.22 (KHTML, like Gecko) Ubuntu Chromium/25.0.1364.160 ' + 'Chrome/25.0.1364.160 Safari/537.22', + 'Mozilla/5.0 (X11; CrOS armv7l 2913.260.0) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.99 ' + 'Safari/537.11', + 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.27 (KHTML, like Gecko) Chrome/26.0.1389.0 Safari/537.27' + ], + 'default': [ + 'Mozilla/5.0 (X11; NetBSD amd64; rv:18.0) Gecko/20130120 Firefox/18.0' + ] +} +CONNECTION_TIMEOUT = 30 +CONNECTION_RETRIES = 2 + + +class HTTPRedirectHandlerFixed(urllib.request.HTTPRedirectHandler): + """ + Special HTTPRedirectHandler used to work around http://bugs.python.org/issue22248 + (Redirecting to urls with special chars) + """ + def redirect_request(self, req, fp, code, msg, headers, new_url): + # + """ + Test if the new_url can be decoded to ascii + + :param req: + :param fp: + :param code: + :param msg: + :param headers: + :param new_url: + :return: + """ + try: + new_url.encode('latin1').decode('ascii') + fixed_url = new_url + except Exception: + # The url could not be decoded to ascii, so we do some url encoding + fixed_url = urllib.parse.quote(new_url.encode('latin1').decode('utf-8', 'replace'), safe='/:') + return super(HTTPRedirectHandlerFixed, self).redirect_request(req, fp, code, msg, headers, fixed_url) + + +def get_user_agent(): + """ + Return a user agent customised for the platform the user is on. + """ + browser_list = USER_AGENTS.get(sys.platform, None) + if not browser_list: + browser_list = USER_AGENTS['default'] + random_index = randint(0, len(browser_list) - 1) + return browser_list[random_index] + + +def get_web_page(url, header=None, update_openlp=False): + """ + Attempts to download the webpage at url and returns that page or None. + + :param url: The URL to be downloaded. + :param header: An optional HTTP header to pass in the request to the web server. + :param update_openlp: Tells OpenLP to update itself if the page is successfully downloaded. + Defaults to False. + """ + # TODO: Add proxy usage. Get proxy info from OpenLP settings, add to a + # proxy_handler, build into an opener and install the opener into urllib2. + # http://docs.python.org/library/urllib2.html + if not url: + return None + # This is needed to work around http://bugs.python.org/issue22248 and https://bugs.launchpad.net/openlp/+bug/1251437 + opener = urllib.request.build_opener(HTTPRedirectHandlerFixed()) + urllib.request.install_opener(opener) + req = urllib.request.Request(url) + if not header or header[0].lower() != 'user-agent': + user_agent = get_user_agent() + req.add_header('User-Agent', user_agent) + if header: + req.add_header(header[0], header[1]) + log.debug('Downloading URL = %s' % url) + retries = 0 + while retries <= CONNECTION_RETRIES: + retries += 1 + time.sleep(0.1) + try: + page = urllib.request.urlopen(req, timeout=CONNECTION_TIMEOUT) + log.debug('Downloaded page {text}'.format(text=page.geturl())) + break + except urllib.error.URLError as err: + log.exception('URLError on {text}'.format(text=url)) + log.exception('URLError: {text}'.format(text=err.reason)) + page = None + if retries > CONNECTION_RETRIES: + raise + except socket.timeout: + log.exception('Socket timeout: {text}'.format(text=url)) + page = None + if retries > CONNECTION_RETRIES: + raise + except socket.gaierror: + log.exception('Socket gaierror: {text}'.format(text=url)) + page = None + if retries > CONNECTION_RETRIES: + raise + except ConnectionRefusedError: + log.exception('ConnectionRefused: {text}'.format(text=url)) + page = None + if retries > CONNECTION_RETRIES: + raise + break + except ConnectionError: + log.exception('Connection error: {text}'.format(text=url)) + page = None + if retries > CONNECTION_RETRIES: + raise + except HTTPException: + log.exception('HTTPException error: {text}'.format(text=url)) + page = None + if retries > CONNECTION_RETRIES: + raise + except: + # Don't know what's happening, so reraise the original + raise + if update_openlp: + Registry().get('application').process_events() + if not page: + log.exception('{text} could not be downloaded'.format(text=url)) + return None + log.debug(page) + return page + + +def get_url_file_size(url): + """ + Get the size of a file. + + :param url: The URL of the file we want to download. + """ + retries = 0 + while True: + try: + site = urllib.request.urlopen(url, timeout=CONNECTION_TIMEOUT) + meta = site.info() + return int(meta.get("Content-Length")) + except urllib.error.URLError: + if retries > CONNECTION_RETRIES: + raise + else: + retries += 1 + time.sleep(0.1) + continue + + +__all__ = ['get_web_page'] diff --git a/tests/functional/openlp_core_common/test_httputils.py b/tests/functional/openlp_core_common/test_httputils.py new file mode 100644 index 000000000..7a8d478c5 --- /dev/null +++ b/tests/functional/openlp_core_common/test_httputils.py @@ -0,0 +1,247 @@ +# -*- coding: utf-8 -*- +# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4 + +############################################################################### +# OpenLP - Open Source Lyrics Projection # +# --------------------------------------------------------------------------- # +# Copyright (c) 2008-2016 OpenLP Developers # +# --------------------------------------------------------------------------- # +# This program is free software; you can redistribute it and/or modify it # +# under the terms of the GNU General Public License as published by the Free # +# Software Foundation; version 2 of the License. # +# # +# This program is distributed in the hope that it will be useful, but WITHOUT # +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for # +# more details. # +# # +# You should have received a copy of the GNU General Public License along # +# with this program; if not, write to the Free Software Foundation, Inc., 59 # +# Temple Place, Suite 330, Boston, MA 02111-1307 USA # +############################################################################### +""" +Functional tests to test the AppLocation class and related methods. +""" +from unittest import TestCase + +from openlp.core.common.httputils import get_user_agent, get_web_page, get_url_file_size + +from tests.functional import MagicMock, patch + + +class TestHttpUtils(TestCase): + """ + A test suite to test out various methods around the AppLocation class. + """ + def test_get_user_agent_linux(self): + """ + Test that getting a user agent on Linux returns a user agent suitable for Linux + """ + with patch('openlp.core.common.httputils.sys') as mocked_sys: + + # GIVEN: The system is Linux + mocked_sys.platform = 'linux2' + + # WHEN: We call get_user_agent() + user_agent = get_user_agent() + + # THEN: The user agent is a Linux (or ChromeOS) user agent + result = 'Linux' in user_agent or 'CrOS' in user_agent + self.assertTrue(result, 'The user agent should be a valid Linux user agent') + + def test_get_user_agent_windows(self): + """ + Test that getting a user agent on Windows returns a user agent suitable for Windows + """ + with patch('openlp.core.common.httputils.sys') as mocked_sys: + + # GIVEN: The system is Linux + mocked_sys.platform = 'win32' + + # WHEN: We call get_user_agent() + user_agent = get_user_agent() + + # THEN: The user agent is a Linux (or ChromeOS) user agent + self.assertIn('Windows', user_agent, 'The user agent should be a valid Windows user agent') + + def test_get_user_agent_macos(self): + """ + Test that getting a user agent on OS X returns a user agent suitable for OS X + """ + with patch('openlp.core.common.httputils.sys') as mocked_sys: + + # GIVEN: The system is Linux + mocked_sys.platform = 'darwin' + + # WHEN: We call get_user_agent() + user_agent = get_user_agent() + + # THEN: The user agent is a Linux (or ChromeOS) user agent + self.assertIn('Mac OS X', user_agent, 'The user agent should be a valid OS X user agent') + + def test_get_user_agent_default(self): + """ + Test that getting a user agent on a non-Linux/Windows/OS X platform returns the default user agent + """ + with patch('openlp.core.common.httputils.sys') as mocked_sys: + + # GIVEN: The system is Linux + mocked_sys.platform = 'freebsd' + + # WHEN: We call get_user_agent() + user_agent = get_user_agent() + + # THEN: The user agent is a Linux (or ChromeOS) user agent + self.assertIn('NetBSD', user_agent, 'The user agent should be the default user agent') + + def test_get_web_page_no_url(self): + """ + Test that sending a URL of None to the get_web_page method returns None + """ + # GIVEN: A None url + test_url = None + + # WHEN: We try to get the test URL + result = get_web_page(test_url) + + # THEN: None should be returned + self.assertIsNone(result, 'The return value of get_web_page should be None') + + def test_get_web_page(self): + """ + Test that the get_web_page method works correctly + """ + with patch('openlp.core.common.httputils.urllib.request.Request') as MockRequest, \ + patch('openlp.core.common.httputils.urllib.request.urlopen') as mock_urlopen, \ + patch('openlp.core.common.httputils.get_user_agent') as mock_get_user_agent, \ + patch('openlp.core.common.Registry') as MockRegistry: + # GIVEN: Mocked out objects and a fake URL + mocked_request_object = MagicMock() + MockRequest.return_value = mocked_request_object + mocked_page_object = MagicMock() + mock_urlopen.return_value = mocked_page_object + mock_get_user_agent.return_value = 'user_agent' + fake_url = 'this://is.a.fake/url' + + # WHEN: The get_web_page() method is called + returned_page = get_web_page(fake_url) + + # THEN: The correct methods are called with the correct arguments and a web page is returned + MockRequest.assert_called_with(fake_url) + mocked_request_object.add_header.assert_called_with('User-Agent', 'user_agent') + self.assertEqual(1, mocked_request_object.add_header.call_count, + 'There should only be 1 call to add_header') + mock_get_user_agent.assert_called_with() + mock_urlopen.assert_called_with(mocked_request_object, timeout=30) + mocked_page_object.geturl.assert_called_with() + self.assertEqual(0, MockRegistry.call_count, 'The Registry() object should have never been called') + self.assertEqual(mocked_page_object, returned_page, 'The returned page should be the mock object') + + def test_get_web_page_with_header(self): + """ + Test that adding a header to the call to get_web_page() adds the header to the request + """ + with patch('openlp.core.common.httputils.urllib.request.Request') as MockRequest, \ + patch('openlp.core.common.httputils.urllib.request.urlopen') as mock_urlopen, \ + patch('openlp.core.common.httputils.get_user_agent') as mock_get_user_agent: + # GIVEN: Mocked out objects, a fake URL and a fake header + mocked_request_object = MagicMock() + MockRequest.return_value = mocked_request_object + mocked_page_object = MagicMock() + mock_urlopen.return_value = mocked_page_object + mock_get_user_agent.return_value = 'user_agent' + fake_url = 'this://is.a.fake/url' + fake_header = ('Fake-Header', 'fake value') + + # WHEN: The get_web_page() method is called + returned_page = get_web_page(fake_url, header=fake_header) + + # THEN: The correct methods are called with the correct arguments and a web page is returned + MockRequest.assert_called_with(fake_url) + mocked_request_object.add_header.assert_called_with(fake_header[0], fake_header[1]) + self.assertEqual(2, mocked_request_object.add_header.call_count, + 'There should only be 2 calls to add_header') + mock_get_user_agent.assert_called_with() + mock_urlopen.assert_called_with(mocked_request_object, timeout=30) + mocked_page_object.geturl.assert_called_with() + self.assertEqual(mocked_page_object, returned_page, 'The returned page should be the mock object') + + def test_get_web_page_with_user_agent_in_headers(self): + """ + Test that adding a user agent in the header when calling get_web_page() adds that user agent to the request + """ + with patch('openlp.core.common.httputils.urllib.request.Request') as MockRequest, \ + patch('openlp.core.common.httputils.urllib.request.urlopen') as mock_urlopen, \ + patch('openlp.core.common.httputils.get_user_agent') as mock_get_user_agent: + # GIVEN: Mocked out objects, a fake URL and a fake header + mocked_request_object = MagicMock() + MockRequest.return_value = mocked_request_object + mocked_page_object = MagicMock() + mock_urlopen.return_value = mocked_page_object + fake_url = 'this://is.a.fake/url' + user_agent_header = ('User-Agent', 'OpenLP/2.2.0') + + # WHEN: The get_web_page() method is called + returned_page = get_web_page(fake_url, header=user_agent_header) + + # THEN: The correct methods are called with the correct arguments and a web page is returned + MockRequest.assert_called_with(fake_url) + mocked_request_object.add_header.assert_called_with(user_agent_header[0], user_agent_header[1]) + self.assertEqual(1, mocked_request_object.add_header.call_count, + 'There should only be 1 call to add_header') + self.assertEqual(0, mock_get_user_agent.call_count, 'get_user_agent should not have been called') + mock_urlopen.assert_called_with(mocked_request_object, timeout=30) + mocked_page_object.geturl.assert_called_with() + self.assertEqual(mocked_page_object, returned_page, 'The returned page should be the mock object') + + def test_get_web_page_update_openlp(self): + """ + Test that passing "update_openlp" as true to get_web_page calls Registry().get('app').process_events() + """ + with patch('openlp.core.common.httputils.urllib.request.Request') as MockRequest, \ + patch('openlp.core.common.httputils.urllib.request.urlopen') as mock_urlopen, \ + patch('openlp.core.common.httputils.get_user_agent') as mock_get_user_agent, \ + patch('openlp.core.common.httputils.Registry') as MockRegistry: + # GIVEN: Mocked out objects, a fake URL + mocked_request_object = MagicMock() + MockRequest.return_value = mocked_request_object + mocked_page_object = MagicMock() + mock_urlopen.return_value = mocked_page_object + mock_get_user_agent.return_value = 'user_agent' + mocked_registry_object = MagicMock() + mocked_application_object = MagicMock() + mocked_registry_object.get.return_value = mocked_application_object + MockRegistry.return_value = mocked_registry_object + fake_url = 'this://is.a.fake/url' + + # WHEN: The get_web_page() method is called + returned_page = get_web_page(fake_url, update_openlp=True) + + # THEN: The correct methods are called with the correct arguments and a web page is returned + MockRequest.assert_called_with(fake_url) + mocked_request_object.add_header.assert_called_with('User-Agent', 'user_agent') + self.assertEqual(1, mocked_request_object.add_header.call_count, + 'There should only be 1 call to add_header') + mock_urlopen.assert_called_with(mocked_request_object, timeout=30) + mocked_page_object.geturl.assert_called_with() + mocked_registry_object.get.assert_called_with('application') + mocked_application_object.process_events.assert_called_with() + self.assertEqual(mocked_page_object, returned_page, 'The returned page should be the mock object') + + def test_get_url_file_size(self): + """ + Test that passing "update_openlp" as true to get_web_page calls Registry().get('app').process_events() + """ + with patch('openlp.core.common.httputils.urllib.request.urlopen') as mock_urlopen, \ + patch('openlp.core.common.httputils.get_user_agent') as mock_get_user_agent: + # GIVEN: Mocked out objects, a fake URL + mocked_page_object = MagicMock() + mock_urlopen.return_value = mocked_page_object + mock_get_user_agent.return_value = 'user_agent' + fake_url = 'this://is.a.fake/url' + + # WHEN: The get_url_file_size() method is called + size = get_url_file_size(fake_url) + + # THEN: The correct methods are called with the correct arguments and a web page is returned + mock_urlopen.assert_called_with(fake_url, timeout=30)