forked from openlp/openlp
More cleanups
This commit is contained in:
parent
b868ecc57d
commit
539ed55777
@ -134,11 +134,12 @@ log = logging.getLogger(__name__)
|
||||
class HttpRouter(object):
|
||||
"""
|
||||
This code is called by the HttpServer upon a request and it processes it based on the routing table.
|
||||
This code is stateless so need
|
||||
This code is stateless and is created on each request.
|
||||
Some variables may look incorrect but this extends BaseHTTPRequestHandler.
|
||||
"""
|
||||
def initialise(self):
|
||||
"""
|
||||
Initialise the router stack and any other varables.
|
||||
Initialise the router stack and any other variables.
|
||||
"""
|
||||
authcode = "%s:%s" % (Settings().value('remotes/user id'), Settings().value('remotes/password'))
|
||||
try:
|
||||
@ -168,7 +169,43 @@ class HttpRouter(object):
|
||||
self.translate()
|
||||
self.html_dir = os.path.join(AppLocation.get_directory(AppLocation.PluginsDir), 'remotes', 'html')
|
||||
|
||||
def do_post_processor(self):
|
||||
"""
|
||||
Handle the POST amd GET requests placed on the server.
|
||||
"""
|
||||
if self.path == '/favicon.ico':
|
||||
return
|
||||
if not hasattr(self, 'auth'):
|
||||
self.initialise()
|
||||
function, args = self.process_http_request(self.path)
|
||||
if not function:
|
||||
self.do_http_error()
|
||||
return
|
||||
self.authorised = self.headers['Authorization'] is None
|
||||
if function['secure'] and Settings().value(self.settings_section + '/authentication enabled'):
|
||||
if self.headers['Authorization'] is None:
|
||||
self.do_authorisation()
|
||||
self.wfile.write(bytes('no auth header received', 'UTF-8'))
|
||||
elif self.headers['Authorization'] == 'Basic %s' % self.auth:
|
||||
self.do_http_success()
|
||||
self.call_function(function, *args)
|
||||
else:
|
||||
self.do_authorisation()
|
||||
self.wfile.write(bytes(self.headers['Authorization'], 'UTF-8'))
|
||||
self.wfile.write(bytes(' not authenticated', 'UTF-8'))
|
||||
else:
|
||||
self.call_function(function, *args)
|
||||
|
||||
def call_function(self, function, *args):
|
||||
"""
|
||||
Invoke the route function passing the relevant values
|
||||
|
||||
``function``
|
||||
The function to be calledL.
|
||||
|
||||
``*args``
|
||||
Any passed data.
|
||||
"""
|
||||
response = function['function'](*args)
|
||||
if response:
|
||||
self.wfile.write(response)
|
||||
@ -200,22 +237,34 @@ class HttpRouter(object):
|
||||
return None, None
|
||||
|
||||
def do_http_success(self):
|
||||
"""
|
||||
Create a success http header.
|
||||
"""
|
||||
self.send_response(200)
|
||||
self.send_header('Content-type', 'text/html')
|
||||
self.end_headers()
|
||||
|
||||
def do_http_error(self):
|
||||
"""
|
||||
Create a error http header.
|
||||
"""
|
||||
self.send_response(404)
|
||||
self.send_header('Content-type', 'text/html')
|
||||
self.end_headers()
|
||||
|
||||
def do_authorisation(self):
|
||||
"""
|
||||
Create a needs authorisation http header.
|
||||
"""
|
||||
self.send_response(401)
|
||||
self.send_header('WWW-Authenticate', 'Basic realm=\"Test\"')
|
||||
self.send_header('Content-type', 'text/html')
|
||||
self.end_headers()
|
||||
|
||||
def do_not_found(self):
|
||||
"""
|
||||
Create a not found http header.
|
||||
"""
|
||||
self.send_response(404)
|
||||
self.send_header('Content-type', 'text/html')
|
||||
self.end_headers()
|
||||
|
@ -64,34 +64,13 @@ class CustomHandler(BaseHTTPRequestHandler, HttpRouter):
|
||||
"""
|
||||
Present pages / data and invoke URL level user authentication.
|
||||
"""
|
||||
self.do_GET()
|
||||
self.do_post_processor()
|
||||
|
||||
def do_GET(self):
|
||||
"""
|
||||
Present pages / data and invoke URL level user authentication.
|
||||
"""
|
||||
if self.path == '/favicon.ico':
|
||||
return
|
||||
if not hasattr(self, 'auth'):
|
||||
self.initialise()
|
||||
function, args = self.process_http_request(self.path)
|
||||
if not function:
|
||||
self.do_http_error()
|
||||
return
|
||||
self.authorised = self.headers['Authorization'] is None
|
||||
if function['secure'] and Settings().value(self.settings_section + '/authentication enabled'):
|
||||
if self.headers['Authorization'] is None:
|
||||
self.do_authorisation()
|
||||
self.wfile.write(bytes('no auth header received', 'UTF-8'))
|
||||
elif self.headers['Authorization'] == 'Basic %s' % self.auth:
|
||||
self.do_http_success()
|
||||
self.call_function(function, *args)
|
||||
else:
|
||||
self.do_authorisation()
|
||||
self.wfile.write(bytes(self.headers['Authorization'], 'UTF-8'))
|
||||
self.wfile.write(bytes(' not authenticated', 'UTF-8'))
|
||||
else:
|
||||
self.call_function(function, *args)
|
||||
self.do_post_processor()
|
||||
|
||||
|
||||
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
|
||||
@ -148,7 +127,7 @@ class OpenLPServer():
|
||||
"""
|
||||
Stop the server
|
||||
"""
|
||||
self.httpd.socket.close()
|
||||
self.http_thread.exit(0)
|
||||
self.httpd = None
|
||||
log.debug('Stopped the server.')
|
||||
|
||||
|
@ -8,7 +8,7 @@ from tempfile import mkstemp
|
||||
from mock import MagicMock
|
||||
|
||||
from openlp.core.lib import Settings
|
||||
from openlp.plugins.remotes.lib.httpserver import HttpRouter, fetch_password, make_sha_hash
|
||||
from openlp.plugins.remotes.lib.httpserver import HttpRouter
|
||||
from PyQt4 import QtGui
|
||||
|
||||
__default_settings__ = {
|
||||
@ -44,40 +44,22 @@ class TestRouter(TestCase):
|
||||
del self.application
|
||||
os.unlink(self.ini_file)
|
||||
|
||||
def fetch_password_unknown_test(self):
|
||||
def password_encrypter_test(self):
|
||||
"""
|
||||
Test the fetch password code with an unknown userid
|
||||
Test hash userid and password function
|
||||
"""
|
||||
# GIVEN: A default configuration
|
||||
# WHEN: called with the defined userid
|
||||
password = fetch_password('itwinkle')
|
||||
Settings().setValue('remotes/user id', 'openlp')
|
||||
Settings().setValue('remotes/password', 'password')
|
||||
|
||||
# THEN: the function should return None
|
||||
self.assertEqual(password, None, 'The result for fetch_password should be None')
|
||||
|
||||
def fetch_password_known_test(self):
|
||||
"""
|
||||
Test the fetch password code with the defined userid
|
||||
"""
|
||||
# GIVEN: A default configuration
|
||||
# WHEN: called with the defined userid
|
||||
password = fetch_password('openlp')
|
||||
required_password = make_sha_hash('password')
|
||||
router = HttpRouter()
|
||||
router.initialise()
|
||||
test_value = 'b3BlbmxwOnBhc3N3b3Jk'
|
||||
print(router.auth)
|
||||
|
||||
# THEN: the function should return the correct password
|
||||
self.assertEqual(password, required_password, 'The result for fetch_password should be the defined password')
|
||||
|
||||
def sha_password_encrypter_test(self):
|
||||
"""
|
||||
Test hash password function
|
||||
"""
|
||||
# GIVEN: A default configuration
|
||||
# WHEN: called with the defined userid
|
||||
required_password = make_sha_hash('password')
|
||||
test_value = '5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8'
|
||||
|
||||
# THEN: the function should return the correct password
|
||||
self.assertEqual(required_password, test_value,
|
||||
self.assertEqual(router.auth, test_value,
|
||||
'The result for make_sha_hash should return the correct encrypted password')
|
||||
|
||||
def process_http_request_test(self):
|
||||
@ -85,15 +67,18 @@ class TestRouter(TestCase):
|
||||
Test the router control functionality
|
||||
"""
|
||||
# GIVEN: A testing set of Routes
|
||||
router = HttpRouter()
|
||||
mocked_function = MagicMock()
|
||||
test_route = [
|
||||
(r'^/stage/api/poll$', mocked_function),
|
||||
(r'^/stage/api/poll$', {'function': mocked_function, 'secure': False}),
|
||||
]
|
||||
self.router.routes = test_route
|
||||
router.routes = test_route
|
||||
|
||||
# WHEN: called with a poll route
|
||||
self.router.process_http_request('/stage/api/poll', None)
|
||||
function, args = router.process_http_request('/stage/api/poll', None)
|
||||
|
||||
# THEN: the function should have been called only once
|
||||
assert mocked_function.call_count == 1, \
|
||||
'The mocked function should have been matched and called once.'
|
||||
assert function['function'] == mocked_function, \
|
||||
'The mocked function should match defined value.'
|
||||
assert function['secure'] == False, \
|
||||
'The mocked function should not require any security.'
|
@ -1,136 +0,0 @@
|
||||
"""
|
||||
This module contains tests for the lib submodule of the Remotes plugin.
|
||||
"""
|
||||
import os
|
||||
|
||||
from unittest import TestCase
|
||||
from tempfile import mkstemp
|
||||
from mock import MagicMock
|
||||
import urllib.request, urllib.error, urllib.parse
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from openlp.core.lib import Settings
|
||||
from openlp.plugins.remotes.lib.httpserver import HttpServer
|
||||
from PyQt4 import QtGui
|
||||
|
||||
__default_settings__ = {
|
||||
'remotes/twelve hour': True,
|
||||
'remotes/port': 4316,
|
||||
'remotes/https port': 4317,
|
||||
'remotes/https enabled': False,
|
||||
'remotes/user id': 'openlp',
|
||||
'remotes/password': 'password',
|
||||
'remotes/authentication enabled': False,
|
||||
'remotes/ip address': '0.0.0.0'
|
||||
}
|
||||
|
||||
|
||||
class TestRouter(TestCase):
|
||||
"""
|
||||
Test the functions in the :mod:`lib` module.
|
||||
"""
|
||||
def setUp(self):
|
||||
"""
|
||||
Create the UI
|
||||
"""
|
||||
fd, self.ini_file = mkstemp('.ini')
|
||||
Settings().set_filename(self.ini_file)
|
||||
self.application = QtGui.QApplication.instance()
|
||||
Settings().extend_default_settings(__default_settings__)
|
||||
self.server = HttpServer()
|
||||
|
||||
def tearDown(self):
|
||||
"""
|
||||
Delete all the C++ objects at the end so that we don't have a segfault
|
||||
"""
|
||||
del self.application
|
||||
os.unlink(self.ini_file)
|
||||
self.server.close()
|
||||
|
||||
def start_server(self):
|
||||
"""
|
||||
Common function to start server then mock out the router. CherryPy crashes if you mock before you start
|
||||
"""
|
||||
self.server.start_server()
|
||||
self.server.router = MagicMock()
|
||||
self.server.router.process_http_request = process_http_request
|
||||
|
||||
def start_default_server_test(self):
|
||||
"""
|
||||
Test the default server serves the correct initial page
|
||||
"""
|
||||
# GIVEN: A default configuration
|
||||
Settings().setValue('remotes/authentication enabled', False)
|
||||
self.start_server()
|
||||
|
||||
# WHEN: called the route location
|
||||
code, page = call_remote_server('http://localhost:4316')
|
||||
|
||||
# THEN: default title will be returned
|
||||
self.assertEqual(BeautifulSoup(page).title.text, 'OpenLP 2.1 Remote',
|
||||
'The default menu should be returned')
|
||||
|
||||
def start_authenticating_server_test(self):
|
||||
"""
|
||||
Test the default server serves the correctly with authentication
|
||||
"""
|
||||
# GIVEN: A default authorised configuration
|
||||
Settings().setValue('remotes/authentication enabled', True)
|
||||
self.start_server()
|
||||
|
||||
# WHEN: called the route location with no user details
|
||||
code, page = call_remote_server('http://localhost:4316')
|
||||
|
||||
# THEN: then server will ask for details
|
||||
self.assertEqual(code, 401, 'The basic authorisation request should be returned')
|
||||
|
||||
# WHEN: called the route location with user details
|
||||
code, page = call_remote_server('http://localhost:4316', 'openlp', 'password')
|
||||
|
||||
# THEN: default title will be returned
|
||||
self.assertEqual(BeautifulSoup(page).title.text, 'OpenLP 2.1 Remote',
|
||||
'The default menu should be returned')
|
||||
|
||||
# WHEN: called the route location with incorrect user details
|
||||
code, page = call_remote_server('http://localhost:4316', 'itwinkle', 'password')
|
||||
|
||||
# THEN: then server will ask for details
|
||||
self.assertEqual(code, 401, 'The basic authorisation request should be returned')
|
||||
|
||||
|
||||
def call_remote_server(url, username=None, password=None):
|
||||
"""
|
||||
Helper function
|
||||
|
||||
``username``
|
||||
The username.
|
||||
|
||||
``password``
|
||||
The password.
|
||||
"""
|
||||
if username:
|
||||
passman = urllib.request.HTTPPasswordMgrWithDefaultRealm()
|
||||
passman.add_password(None, url, username, password)
|
||||
authhandler = urllib.request.HTTPBasicAuthHandler(passman)
|
||||
opener = urllib.request.build_opener(authhandler)
|
||||
urllib.request.install_opener(opener)
|
||||
try:
|
||||
page = urllib.request.urlopen(url)
|
||||
return 0, page.read()
|
||||
except urllib.error.HTTPError as e:
|
||||
return e.code, ''
|
||||
|
||||
|
||||
def process_http_request(url_path, *args):
|
||||
"""
|
||||
Override function to make the Mock work but does nothing.
|
||||
|
||||
``Url_path``
|
||||
The url_path.
|
||||
|
||||
``*args``
|
||||
Some args.
|
||||
"""
|
||||
return None
|
||||
|
Loading…
Reference in New Issue
Block a user