Bye Bye CherryPy - Revised Http Server

bzr-revno: 2300
This commit is contained in:
Tim Bentley 2013-09-29 06:54:25 +01:00
commit 57464cfc6a
11 changed files with 761 additions and 814 deletions

View File

@ -75,12 +75,6 @@ try:
ICU_VERSION = 'OK'
except ImportError:
ICU_VERSION = '-'
try:
import cherrypy
CHERRYPY_VERSION = cherrypy.__version__
except ImportError:
CHERRYPY_VERSION = '-'
try:
WEBKIT_VERSION = QtWebKit.qWebKitVersion()
except AttributeError:
@ -140,7 +134,6 @@ class ExceptionForm(QtGui.QDialog, Ui_ExceptionDialog):
'Chardet: %s\n' % CHARDET_VERSION + \
'PyEnchant: %s\n' % ENCHANT_VERSION + \
'Mako: %s\n' % MAKO_VERSION + \
'CherryPy: %s\n' % CHERRYPY_VERSION + \
'pyICU: %s\n' % ICU_VERSION + \
'pyUNO bridge: %s\n' % self._pyuno_import() + \
'VLC: %s\n' % VLC_VERSION

View File

@ -40,6 +40,8 @@ window.OpenLP = {
// defeat Safari bug
targ = targ.parentNode;
}
var isSecure = false;
var isAuthorised = false;
return $(targ);
},
getSearchablePlugins: function () {
@ -147,11 +149,13 @@ window.OpenLP = {
},
pollServer: function () {
$.getJSON(
"/stage/poll",
"/api/poll",
function (data, status) {
var prevItem = OpenLP.currentItem;
OpenLP.currentSlide = data.results.slide;
OpenLP.currentItem = data.results.item;
OpenLP.isSecure = data.results.isSecure;
OpenLP.isAuthorised = data.results.isAuthorised;
if ($("#service-manager").is(":visible")) {
if (OpenLP.currentService != data.results.service) {
OpenLP.currentService = data.results.service;

View File

@ -26,7 +26,7 @@
window.OpenLP = {
loadService: function (event) {
$.getJSON(
"/stage/service/list",
"/api/service/list",
function (data, status) {
OpenLP.nextSong = "";
$("#notes").html("");
@ -46,7 +46,7 @@ window.OpenLP = {
},
loadSlides: function (event) {
$.getJSON(
"/stage/controller/live/text",
"/api/controller/live/text",
function (data, status) {
OpenLP.currentSlides = data.results.slides;
OpenLP.currentSlide = 0;
@ -137,7 +137,7 @@ window.OpenLP = {
},
pollServer: function () {
$.getJSON(
"/stage/poll",
"/api/poll",
function (data, status) {
OpenLP.updateClock(data);
if (OpenLP.currentItem != data.results.item ||

View File

@ -28,6 +28,7 @@
###############################################################################
from .remotetab import RemoteTab
from .httpserver import HttpServer
from .httprouter import HttpRouter
from .httpserver import OpenLPServer
__all__ = ['RemoteTab', 'HttpServer']
__all__ = ['RemoteTab', 'OpenLPServer', 'HttpRouter']

View File

@ -0,0 +1,638 @@
# -*- coding: utf-8 -*-
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
###############################################################################
# OpenLP - Open Source Lyrics Projection #
# --------------------------------------------------------------------------- #
# Copyright (c) 2008-2013 Raoul Snyman #
# Portions copyright (c) 2008-2013 Tim Bentley, Gerald Britton, Jonathan #
# Corwin, Samuel Findlay, Michael Gorven, Scott Guerrieri, Matthias Hub, #
# Meinert Jordan, Armin Köhler, Erik Lundin, Edwin Lunando, Brian T. Meyer. #
# Joshua Miller, Stevan Pettit, Andreas Preikschat, Mattias Põldaru, #
# Christian Richter, Philip Ridout, Simon Scudder, Jeffrey Smith, #
# Maikel Stuivenberg, Martin Thompson, Jon Tibble, Dave Warnock, #
# Frode Woldsund, Martin Zibricky, Patrick Zimmermann #
# --------------------------------------------------------------------------- #
# This program is free software; you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by the Free #
# Software Foundation; version 2 of the License. #
# #
# This program is distributed in the hope that it will be useful, but WITHOUT #
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
# more details. #
# #
# You should have received a copy of the GNU General Public License along #
# with this program; if not, write to the Free Software Foundation, Inc., 59 #
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
###############################################################################
"""
The :mod:`http` module contains the API web server. This is a lightweight web
server used by remotes to interact with OpenLP. It uses JSON to communicate with
the remotes.
*Routes:*
``/``
Go to the web interface.
``/stage``
Show the stage view.
``/files/{filename}``
Serve a static file.
``/stage/api/poll``
Poll to see if there are any changes. Returns a JSON-encoded dict of
any changes that occurred::
{"results": {"type": "controller"}}
Or, if there were no results, False::
{"results": False}
``/api/display/{hide|show}``
Blank or unblank the screen.
``/api/alert``
Sends an alert message to the alerts plugin. This method expects a
JSON-encoded dict like this::
{"request": {"text": "<your alert text>"}}
``/api/controller/{live|preview}/{action}``
Perform ``{action}`` on the live or preview controller. Valid actions
are:
``next``
Load the next slide.
``previous``
Load the previous slide.
``set``
Set a specific slide. Requires an id return in a JSON-encoded dict like
this::
{"request": {"id": 1}}
``first``
Load the first slide.
``last``
Load the last slide.
``text``
Fetches the text of the current song. The output is a JSON-encoded
dict which looks like this::
{"result": {"slides": ["...", "..."]}}
``/api/service/{action}``
Perform ``{action}`` on the service manager (e.g. go live). Data is
passed as a json-encoded ``data`` parameter. Valid actions are:
``next``
Load the next item in the service.
``previous``
Load the previews item in the service.
``set``
Set a specific item in the service. Requires an id returned in a
JSON-encoded dict like this::
{"request": {"id": 1}}
``list``
Request a list of items in the service. Returns a list of items in the
current service in a JSON-encoded dict like this::
{"results": {"items": [{...}, {...}]}}
"""
import base64
import json
import logging
import os
import re
import urllib.request
import urllib.error
from urllib.parse import urlparse, parse_qs
from mako.template import Template
from PyQt4 import QtCore
from openlp.core.lib import Registry, Settings, PluginStatus, StringContent, image_to_byte
from openlp.core.utils import AppLocation, translate
log = logging.getLogger(__name__)
class HttpRouter(object):
"""
This code is called by the HttpServer upon a request and it processes it based on the routing table.
This code is stateless and is created on each request.
Some variables may look incorrect but this extends BaseHTTPRequestHandler.
"""
def initialise(self):
"""
Initialise the router stack and any other variables.
"""
authcode = "%s:%s" % (Settings().value('remotes/user id'), Settings().value('remotes/password'))
try:
self.auth = base64.b64encode(authcode)
except TypeError:
self.auth = base64.b64encode(authcode.encode()).decode()
self.routes = [
('^/$', {'function': self.serve_file, 'secure': False}),
('^/(stage)$', {'function': self.serve_file, 'secure': False}),
('^/(main)$', {'function': self.serve_file, 'secure': False}),
(r'^/files/(.*)$', {'function': self.serve_file, 'secure': False}),
(r'^/api/poll$', {'function': self.poll, 'secure': False}),
(r'^/main/poll$', {'function': self.main_poll, 'secure': False}),
(r'^/main/image$', {'function': self.main_image, 'secure': False}),
(r'^/api/controller/(live|preview)/text$', {'function': self.controller_text, 'secure': False}),
(r'^/api/controller/(live|preview)/(.*)$', {'function': self.controller, 'secure': True}),
(r'^/api/service/list$', {'function': self.service_list, 'secure': False}),
(r'^/api/service/(.*)$', {'function': self.service, 'secure': True}),
(r'^/api/display/(hide|show|blank|theme|desktop)$', {'function': self.display, 'secure': True}),
(r'^/api/alert$', {'function': self.alert, 'secure': True}),
(r'^/api/plugin/(search)$', {'function': self.plugin_info, 'secure': False}),
(r'^/api/(.*)/search$', {'function': self.search, 'secure': False}),
(r'^/api/(.*)/live$', {'function': self.go_live, 'secure': True}),
(r'^/api/(.*)/add$', {'function': self.add_to_service, 'secure': True})
]
self.settings_section = 'remotes'
self.translate()
self.html_dir = os.path.join(AppLocation.get_directory(AppLocation.PluginsDir), 'remotes', 'html')
def do_post_processor(self):
"""
Handle the POST amd GET requests placed on the server.
"""
if self.path == '/favicon.ico':
return
if not hasattr(self, 'auth'):
self.initialise()
function, args = self.process_http_request(self.path)
if not function:
self.do_http_error()
return
self.authorised = self.headers['Authorization'] is None
if function['secure'] and Settings().value(self.settings_section + '/authentication enabled'):
if self.headers['Authorization'] is None:
self.do_authorisation()
self.wfile.write(bytes('no auth header received', 'UTF-8'))
elif self.headers['Authorization'] == 'Basic %s' % self.auth:
self.do_http_success()
self.call_function(function, *args)
else:
self.do_authorisation()
self.wfile.write(bytes(self.headers['Authorization'], 'UTF-8'))
self.wfile.write(bytes(' not authenticated', 'UTF-8'))
else:
self.call_function(function, *args)
def call_function(self, function, *args):
"""
Invoke the route function passing the relevant values
``function``
The function to be calledL.
``*args``
Any passed data.
"""
response = function['function'](*args)
if response:
self.wfile.write(response)
return
def process_http_request(self, url_path, *args):
"""
Common function to process HTTP requests
``url_path``
The requested URL.
``*args``
Any passed data.
"""
self.request_data = None
url_path_split = urlparse(url_path)
url_query = parse_qs(url_path_split.query)
if 'data' in url_query.keys():
self.request_data = url_query['data'][0]
for route, func in self.routes:
match = re.match(route, url_path_split.path)
if match:
log.debug('Route "%s" matched "%s"', route, url_path)
args = []
for param in match.groups():
args.append(param)
return func, args
return None, None
def do_http_success(self):
"""
Create a success http header.
"""
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
def do_json_header(self):
"""
Create a header for JSON messages
"""
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
def do_http_error(self):
"""
Create a error http header.
"""
self.send_response(404)
self.send_header('Content-type', 'text/html')
self.end_headers()
def do_authorisation(self):
"""
Create a needs authorisation http header.
"""
self.send_response(401)
self.send_header('WWW-Authenticate', 'Basic realm=\"Test\"')
self.send_header('Content-type', 'text/html')
self.end_headers()
def do_not_found(self):
"""
Create a not found http header.
"""
self.send_response(404)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(bytes('<html><body>Sorry, an error occurred </body></html>', 'UTF-8'))
def _get_service_items(self):
"""
Read the service item in use and return the data as a json object
"""
service_items = []
if self.live_controller.service_item:
current_unique_identifier = self.live_controller.service_item.unique_identifier
else:
current_unique_identifier = None
for item in self.service_manager.service_items:
service_item = item['service_item']
service_items.append({
'id': str(service_item.unique_identifier),
'title': str(service_item.get_display_title()),
'plugin': str(service_item.name),
'notes': str(service_item.notes),
'selected': (service_item.unique_identifier == current_unique_identifier)
})
return service_items
def translate(self):
"""
Translate various strings in the mobile app.
"""
self.template_vars = {
'app_title': translate('RemotePlugin.Mobile', 'OpenLP 2.1 Remote'),
'stage_title': translate('RemotePlugin.Mobile', 'OpenLP 2.1 Stage View'),
'live_title': translate('RemotePlugin.Mobile', 'OpenLP 2.1 Live View'),
'service_manager': translate('RemotePlugin.Mobile', 'Service Manager'),
'slide_controller': translate('RemotePlugin.Mobile', 'Slide Controller'),
'alerts': translate('RemotePlugin.Mobile', 'Alerts'),
'search': translate('RemotePlugin.Mobile', 'Search'),
'home': translate('RemotePlugin.Mobile', 'Home'),
'refresh': translate('RemotePlugin.Mobile', 'Refresh'),
'blank': translate('RemotePlugin.Mobile', 'Blank'),
'theme': translate('RemotePlugin.Mobile', 'Theme'),
'desktop': translate('RemotePlugin.Mobile', 'Desktop'),
'show': translate('RemotePlugin.Mobile', 'Show'),
'prev': translate('RemotePlugin.Mobile', 'Prev'),
'next': translate('RemotePlugin.Mobile', 'Next'),
'text': translate('RemotePlugin.Mobile', 'Text'),
'show_alert': translate('RemotePlugin.Mobile', 'Show Alert'),
'go_live': translate('RemotePlugin.Mobile', 'Go Live'),
'add_to_service': translate('RemotePlugin.Mobile', 'Add to Service'),
'add_and_go_to_service': translate('RemotePlugin.Mobile', 'Add &amp; Go to Service'),
'no_results': translate('RemotePlugin.Mobile', 'No Results'),
'options': translate('RemotePlugin.Mobile', 'Options'),
'service': translate('RemotePlugin.Mobile', 'Service'),
'slides': translate('RemotePlugin.Mobile', 'Slides')
}
def serve_file(self, file_name=None):
"""
Send a file to the socket. For now, just a subset of file types and must be top level inside the html folder.
If subfolders requested return 404, easier for security for the present.
Ultimately for i18n, this could first look for xx/file.html before falling back to file.html.
where xx is the language, e.g. 'en'
"""
log.debug('serve file request %s' % file_name)
if not file_name:
file_name = 'index.html'
elif file_name == 'stage':
file_name = 'stage.html'
elif file_name == 'main':
file_name = 'main.html'
path = os.path.normpath(os.path.join(self.html_dir, file_name))
if not path.startswith(self.html_dir):
return self.do_not_found()
ext = os.path.splitext(file_name)[1]
html = None
if ext == '.html':
self.send_header('Content-type', 'text/html')
variables = self.template_vars
html = Template(filename=path, input_encoding='utf-8', output_encoding='utf-8').render(**variables)
elif ext == '.css':
self.send_header('Content-type', 'text/css')
elif ext == '.js':
self.send_header('Content-type', 'application/javascript')
elif ext == '.jpg':
self.send_header('Content-type', 'image/jpeg')
elif ext == '.gif':
self.send_header('Content-type', 'image/gif')
elif ext == '.ico':
self.send_header('Content-type', 'image/x-icon')
elif ext == '.png':
self.send_header('Content-type', 'image/png')
else:
self.send_header('Content-type', 'text/plain')
file_handle = None
try:
if html:
content = html
else:
file_handle = open(path, 'rb')
log.debug('Opened %s' % path)
content = file_handle.read()
except IOError:
log.exception('Failed to open %s' % path)
return self.do_not_found()
finally:
if file_handle:
file_handle.close()
return content
def poll(self):
"""
Poll OpenLP to determine the current slide number and item name.
"""
result = {
'service': self.service_manager.service_id,
'slide': self.live_controller.selected_row or 0,
'item': self.live_controller.service_item.unique_identifier if self.live_controller.service_item else '',
'twelve': Settings().value('remotes/twelve hour'),
'blank': self.live_controller.blank_screen.isChecked(),
'theme': self.live_controller.theme_screen.isChecked(),
'display': self.live_controller.desktop_screen.isChecked(),
'version': 2,
'isSecure': Settings().value(self.settings_section + '/authentication enabled'),
'isAuthorised': self.authorised
}
self.do_json_header()
return json.dumps({'results': result}).encode()
def main_poll(self):
"""
Poll OpenLP to determine the current slide count.
"""
result = {
'slide_count': self.live_controller.slide_count
}
self.do_json_header()
return json.dumps({'results': result}).encode()
def main_image(self):
"""
Return the latest display image as a byte stream.
"""
result = {
'slide_image': 'data:image/png;base64,' + str(image_to_byte(self.live_controller.slide_image))
}
self.do_json_header()
return json.dumps({'results': result}).encode()
def display(self, action):
"""
Hide or show the display screen.
This is a cross Thread call and UI is updated so Events need to be used.
``action``
This is the action, either ``hide`` or ``show``.
"""
self.live_controller.emit(QtCore.SIGNAL('slidecontroller_toggle_display'), action)
self.do_json_header()
return json.dumps({'results': {'success': True}}).encode()
def alert(self):
"""
Send an alert.
"""
plugin = self.plugin_manager.get_plugin_by_name("alerts")
if plugin.status == PluginStatus.Active:
try:
text = json.loads(self.request_data)['request']['text']
except KeyError as ValueError:
return self.do_http_error()
text = urllib.parse.unquote(text)
self.alerts_manager.emit(QtCore.SIGNAL('alerts_text'), [text])
success = True
else:
success = False
self.do_json_header()
return json.dumps({'results': {'success': success}}).encode()
def controller_text(self, var):
"""
Perform an action on the slide controller.
"""
current_item = self.live_controller.service_item
data = []
if current_item:
for index, frame in enumerate(current_item.get_frames()):
item = {}
if current_item.is_text():
if frame['verseTag']:
item['tag'] = str(frame['verseTag'])
else:
item['tag'] = str(index + 1)
item['text'] = str(frame['text'])
item['html'] = str(frame['html'])
else:
item['tag'] = str(index + 1)
item['text'] = str(frame['title'])
item['html'] = str(frame['title'])
item['selected'] = (self.live_controller.selected_row == index)
data.append(item)
json_data = {'results': {'slides': data}}
if current_item:
json_data['results']['item'] = self.live_controller.service_item.unique_identifier
self.do_json_header()
return json.dumps(json_data).encode()
def controller(self, display_type, action):
"""
Perform an action on the slide controller.
``display_type``
This is the type of slide controller, either ``preview`` or ``live``.
``action``
The action to perform.
"""
event = 'slidecontroller_%s_%s' % (display_type, action)
if self.request_data:
try:
data = json.loads(self.request_data)['request']['id']
except KeyError as ValueError:
return self.do_http_error()
log.info(data)
# This slot expects an int within a list.
self.live_controller.emit(QtCore.SIGNAL(event), [data])
else:
self.live_controller.emit(QtCore.SIGNAL(event))
json_data = {'results': {'success': True}}
self.do_json_header()
return json.dumps(json_data).encode()
def service_list(self):
"""
Handles requests for service items in the service manager
``action``
The action to perform.
"""
self.do_json_header()
return json.dumps({'results': {'items': self._get_service_items()}}).encode()
def service(self, action):
"""
Handles requests for service items in the service manager
``action``
The action to perform.
"""
event = 'servicemanager_%s_item' % action
if self.request_data:
try:
data = json.loads(self.request_data)['request']['id']
except KeyError:
return self.do_http_error()
self.service_manager.emit(QtCore.SIGNAL(event), data)
else:
Registry().execute(event)
self.do_json_header()
return json.dumps({'results': {'success': True}}).encode()
def plugin_info(self, action):
"""
Return plugin related information, based on the action.
``action``
The action to perform. If *search* return a list of plugin names
which support search.
"""
if action == 'search':
searches = []
for plugin in self.plugin_manager.plugins:
if plugin.status == PluginStatus.Active and plugin.media_item and plugin.media_item.has_search:
searches.append([plugin.name, str(plugin.text_strings[StringContent.Name]['plural'])])
self.do_json_header()
return json.dumps({'results': {'items': searches}}).encode()
def search(self, plugin_name):
"""
Return a list of items that match the search text.
``plugin``
The plugin name to search in.
"""
try:
text = json.loads(self.request_data)['request']['text']
except KeyError as ValueError:
return self.do_http_error()
text = urllib.parse.unquote(text)
plugin = self.plugin_manager.get_plugin_by_name(plugin_name)
if plugin.status == PluginStatus.Active and plugin.media_item and plugin.media_item.has_search:
results = plugin.media_item.search(text, False)
else:
results = []
self.do_json_header()
return json.dumps({'results': {'items': results}}).encode()
def go_live(self, plugin_name):
"""
Go live on an item of type ``plugin``.
"""
try:
id = json.loads(self.request_data)['request']['id']
except KeyError as ValueError:
return self.do_http_error()
plugin = self.plugin_manager.get_plugin_by_name(plugin_name)
if plugin.status == PluginStatus.Active and plugin.media_item:
plugin.media_item.emit(QtCore.SIGNAL('%s_go_live' % plugin_name), [id, True])
return self.do_http_success()
def add_to_service(self, plugin_name):
"""
Add item of type ``plugin_name`` to the end of the service.
"""
try:
id = json.loads(self.request_data)['request']['id']
except KeyError as ValueError:
return self.do_http_error()
plugin = self.plugin_manager.get_plugin_by_name(plugin_name)
if plugin.status == PluginStatus.Active and plugin.media_item:
item_id = plugin.media_item.create_item_from_id(id)
plugin.media_item.emit(QtCore.SIGNAL('%s_add_to_service' % plugin_name), [item_id, True])
self.do_http_success()
def _get_service_manager(self):
"""
Adds the service manager to the class dynamically
"""
if not hasattr(self, '_service_manager'):
self._service_manager = Registry().get('service_manager')
return self._service_manager
service_manager = property(_get_service_manager)
def _get_live_controller(self):
"""
Adds the live controller to the class dynamically
"""
if not hasattr(self, '_live_controller'):
self._live_controller = Registry().get('live_controller')
return self._live_controller
live_controller = property(_get_live_controller)
def _get_plugin_manager(self):
"""
Adds the plugin manager to the class dynamically
"""
if not hasattr(self, '_plugin_manager'):
self._plugin_manager = Registry().get('plugin_manager')
return self._plugin_manager
plugin_manager = property(_get_plugin_manager)
def _get_alerts_manager(self):
"""
Adds the alerts manager to the class dynamically
"""
if not hasattr(self, '_alerts_manager'):
self._alerts_manager = Registry().get('alerts_manager')
return self._alerts_manager
alerts_manager = property(_get_alerts_manager)

View File

@ -31,661 +31,122 @@
The :mod:`http` module contains the API web server. This is a lightweight web
server used by remotes to interact with OpenLP. It uses JSON to communicate with
the remotes.
*Routes:*
``/``
Go to the web interface.
``/stage``
Show the stage view.
``/files/{filename}``
Serve a static file.
``/stage/api/poll``
Poll to see if there are any changes. Returns a JSON-encoded dict of
any changes that occurred::
{"results": {"type": "controller"}}
Or, if there were no results, False::
{"results": False}
``/api/display/{hide|show}``
Blank or unblank the screen.
``/api/alert``
Sends an alert message to the alerts plugin. This method expects a
JSON-encoded dict like this::
{"request": {"text": "<your alert text>"}}
``/api/controller/{live|preview}/{action}``
Perform ``{action}`` on the live or preview controller. Valid actions
are:
``next``
Load the next slide.
``previous``
Load the previous slide.
``set``
Set a specific slide. Requires an id return in a JSON-encoded dict like
this::
{"request": {"id": 1}}
``first``
Load the first slide.
``last``
Load the last slide.
``text``
Fetches the text of the current song. The output is a JSON-encoded
dict which looks like this::
{"result": {"slides": ["...", "..."]}}
``/api/service/{action}``
Perform ``{action}`` on the service manager (e.g. go live). Data is
passed as a json-encoded ``data`` parameter. Valid actions are:
``next``
Load the next item in the service.
``previous``
Load the previews item in the service.
``set``
Set a specific item in the service. Requires an id returned in a
JSON-encoded dict like this::
{"request": {"id": 1}}
``list``
Request a list of items in the service. Returns a list of items in the
current service in a JSON-encoded dict like this::
{"results": {"items": [{...}, {...}]}}
"""
import json
import logging
import ssl
import socket
import os
import re
import urllib.request, urllib.parse, urllib.error
import urllib.parse
import cherrypy
import logging
from urllib.parse import urlparse, parse_qs
from mako.template import Template
from PyQt4 import QtCore
from openlp.core.lib import Registry, Settings, PluginStatus, StringContent, image_to_byte
from openlp.core.utils import AppLocation, translate
from openlp.core.lib import Settings
from openlp.core.utils import AppLocation
from hashlib import sha1
from openlp.plugins.remotes.lib import HttpRouter
from socketserver import BaseServer, ThreadingMixIn
from http.server import BaseHTTPRequestHandler, HTTPServer
log = logging.getLogger(__name__)
def make_sha_hash(password):
class CustomHandler(BaseHTTPRequestHandler, HttpRouter):
"""
Create an encrypted password for the given password.
Stateless session handler to handle the HTTP request and process it.
This class handles just the overrides to the base methods and the logic to invoke the
methods within the HttpRouter class.
DO not try change the structure as this is as per the documentation.
"""
log.debug("make_sha_hash")
return sha1(password.encode()).hexdigest()
def do_POST(self):
"""
Present pages / data and invoke URL level user authentication.
"""
self.do_post_processor()
def do_GET(self):
"""
Present pages / data and invoke URL level user authentication.
"""
self.do_post_processor()
def fetch_password(username):
"""
Fetch the password for a provided user.
"""
log.debug("Fetch Password")
if username != Settings().value('remotes/user id'):
return None
return make_sha_hash(Settings().value('remotes/password'))
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
pass
class HttpServer(object):
class HttpThread(QtCore.QThread):
"""
Ability to control OpenLP via a web browser.
This class controls the Cherrypy server and configuration.
A special Qt thread class to allow the HTTP server to run at the same time as the UI.
"""
_cp_config = {
'tools.sessions.on': True,
'tools.auth.on': True
}
def __init__(self, server):
"""
Constructor for the thread class.
``server``
The http server class.
"""
super(HttpThread, self).__init__(None)
self.http_server = server
def run(self):
"""
Run the thread.
"""
self.http_server.start_server()
class OpenLPServer():
def __init__(self):
"""
Initialise the http server, and start the server.
Initialise the http server, and start the server of the correct type http / https
"""
log.debug('Initialise httpserver')
self.settings_section = 'remotes'
self.router = HttpRouter()
self.http_thread = HttpThread(self)
self.http_thread.start()
def start_server(self):
"""
Start the http server based on configuration.
"""
log.debug('Start CherryPy server')
# Define to security levels and inject the router code
self.root = self.Public()
self.root.files = self.Files()
self.root.stage = self.Stage()
self.root.main = self.Main()
self.root.router = self.router
self.root.files.router = self.router
self.root.stage.router = self.router
self.root.main.router = self.router
cherrypy.tree.mount(self.root, '/', config=self.define_config())
# Turn off the flood of access messages cause by poll
cherrypy.log.access_log.propagate = False
cherrypy.engine.start()
def define_config(self):
"""
Define the configuration of the server.
Start the correct server and save the handler
"""
address = Settings().value(self.settings_section + '/ip address')
if Settings().value(self.settings_section + '/https enabled'):
port = Settings().value(self.settings_section + '/https port')
address = Settings().value(self.settings_section + '/ip address')
local_data = AppLocation.get_directory(AppLocation.DataDir)
cherrypy.config.update({'server.socket_host': str(address),
'server.socket_port': port,
'server.ssl_certificate': os.path.join(local_data, 'remotes', 'openlp.crt'),
'server.ssl_private_key': os.path.join(local_data, 'remotes', 'openlp.key')})
self.httpd = HTTPSServer((address, port), CustomHandler)
log.debug('Started ssl httpd...')
else:
port = Settings().value(self.settings_section + '/port')
address = Settings().value(self.settings_section + '/ip address')
cherrypy.config.update({'server.socket_host': str(address)})
cherrypy.config.update({'server.socket_port': port})
cherrypy.config.update({'environment': 'embedded'})
cherrypy.config.update({'engine.autoreload_on': False})
directory_config = {'/': {'tools.staticdir.on': True,
'tools.staticdir.dir': self.router.html_dir,
'tools.basic_auth.on': Settings().value('remotes/authentication enabled'),
'tools.basic_auth.realm': 'OpenLP Remote Login',
'tools.basic_auth.users': fetch_password,
'tools.basic_auth.encrypt': make_sha_hash},
'/files': {'tools.staticdir.on': True,
'tools.staticdir.dir': self.router.html_dir,
'tools.basic_auth.on': False},
'/stage': {'tools.staticdir.on': True,
'tools.staticdir.dir': self.router.html_dir,
'tools.basic_auth.on': False},
'/main': {'tools.staticdir.on': True,
'tools.staticdir.dir': self.router.html_dir,
'tools.basic_auth.on': False}}
return directory_config
self.httpd = ThreadingHTTPServer((address, port), CustomHandler)
log.debug('Started non ssl httpd...')
self.httpd.serve_forever()
class Public(object):
def stop_server(self):
"""
Main access class with may have security enabled on it.
Stop the server
"""
@cherrypy.expose
def default(self, *args, **kwargs):
self.router.request_data = None
if isinstance(kwargs, dict):
self.router.request_data = kwargs.get('data', None)
url = urllib.parse.urlparse(cherrypy.url())
return self.router.process_http_request(url.path, *args)
class Files(object):
"""
Provides access to files and has no security available. These are read only accesses
"""
@cherrypy.expose
def default(self, *args, **kwargs):
url = urllib.parse.urlparse(cherrypy.url())
return self.router.process_http_request(url.path, *args)
class Stage(object):
"""
Stage view is read only so security is not relevant and would reduce it's usability
"""
@cherrypy.expose
def default(self, *args, **kwargs):
url = urllib.parse.urlparse(cherrypy.url())
return self.router.process_http_request(url.path, *args)
class Main(object):
"""
Main view is read only so security is not relevant and would reduce it's usability
"""
@cherrypy.expose
def default(self, *args, **kwargs):
url = urllib.parse.urlparse(cherrypy.url())
return self.router.process_http_request(url.path, *args)
def close(self):
"""
Close down the http server.
"""
log.debug('close http server')
cherrypy.engine.exit()
self.http_thread.exit(0)
self.httpd = None
log.debug('Stopped the server.')
class HttpRouter(object):
"""
This code is called by the HttpServer upon a request and it processes it based on the routing table.
"""
def __init__(self):
class HTTPSServer(HTTPServer):
def __init__(self, address, handler):
"""
Initialise the router
Initialise the secure handlers for the SSL server if required.s
"""
self.routes = [
('^/$', self.serve_file),
('^/(stage)$', self.serve_file),
('^/(main)$', self.serve_file),
(r'^/files/(.*)$', self.serve_file),
(r'^/api/poll$', self.poll),
(r'^/stage/poll$', self.poll),
(r'^/main/poll$', self.main_poll),
(r'^/main/image$', self.main_image),
(r'^/api/controller/(live|preview)/(.*)$', self.controller),
(r'^/stage/controller/(live|preview)/(.*)$', self.controller),
(r'^/api/service/(.*)$', self.service),
(r'^/stage/service/(.*)$', self.service),
(r'^/api/display/(hide|show|blank|theme|desktop)$', self.display),
(r'^/api/alert$', self.alert),
(r'^/api/plugin/(search)$', self.plugin_info),
(r'^/api/(.*)/search$', self.search),
(r'^/api/(.*)/live$', self.go_live),
(r'^/api/(.*)/add$', self.add_to_service)
]
self.translate()
self.html_dir = os.path.join(AppLocation.get_directory(AppLocation.PluginsDir), 'remotes', 'html')
BaseServer.__init__(self, address, handler)
local_data = AppLocation.get_directory(AppLocation.DataDir)
self.socket = ssl.SSLSocket(
sock=socket.socket(self.address_family, self.socket_type),
ssl_version=ssl.PROTOCOL_TLSv1,
certfile=os.path.join(local_data, 'remotes', 'openlp.crt'),
keyfile=os.path.join(local_data, 'remotes', 'openlp.key'),
server_side=True)
self.server_bind()
self.server_activate()
def process_http_request(self, url_path, *args):
"""
Common function to process HTTP requests
``url_path``
The requested URL.
``*args``
Any passed data.
"""
response = None
for route, func in self.routes:
match = re.match(route, url_path)
if match:
log.debug('Route "%s" matched "%s"', route, url_path)
args = []
for param in match.groups():
args.append(param)
response = func(*args)
break
if response:
return response
else:
log.debug('Path not found %s', url_path)
return self._http_not_found()
def _get_service_items(self):
"""
Read the service item in use and return the data as a json object
"""
service_items = []
if self.live_controller.service_item:
current_unique_identifier = self.live_controller.service_item.unique_identifier
else:
current_unique_identifier = None
for item in self.service_manager.service_items:
service_item = item['service_item']
service_items.append({
'id': str(service_item.unique_identifier),
'title': str(service_item.get_display_title()),
'plugin': str(service_item.name),
'notes': str(service_item.notes),
'selected': (service_item.unique_identifier == current_unique_identifier)
})
return service_items
def translate(self):
"""
Translate various strings in the mobile app.
"""
self.template_vars = {
'app_title': translate('RemotePlugin.Mobile', 'OpenLP 2.1 Remote'),
'stage_title': translate('RemotePlugin.Mobile', 'OpenLP 2.1 Stage View'),
'live_title': translate('RemotePlugin.Mobile', 'OpenLP 2.1 Live View'),
'service_manager': translate('RemotePlugin.Mobile', 'Service Manager'),
'slide_controller': translate('RemotePlugin.Mobile', 'Slide Controller'),
'alerts': translate('RemotePlugin.Mobile', 'Alerts'),
'search': translate('RemotePlugin.Mobile', 'Search'),
'home': translate('RemotePlugin.Mobile', 'Home'),
'refresh': translate('RemotePlugin.Mobile', 'Refresh'),
'blank': translate('RemotePlugin.Mobile', 'Blank'),
'theme': translate('RemotePlugin.Mobile', 'Theme'),
'desktop': translate('RemotePlugin.Mobile', 'Desktop'),
'show': translate('RemotePlugin.Mobile', 'Show'),
'prev': translate('RemotePlugin.Mobile', 'Prev'),
'next': translate('RemotePlugin.Mobile', 'Next'),
'text': translate('RemotePlugin.Mobile', 'Text'),
'show_alert': translate('RemotePlugin.Mobile', 'Show Alert'),
'go_live': translate('RemotePlugin.Mobile', 'Go Live'),
'add_to_service': translate('RemotePlugin.Mobile', 'Add to Service'),
'add_and_go_to_service': translate('RemotePlugin.Mobile', 'Add &amp; Go to Service'),
'no_results': translate('RemotePlugin.Mobile', 'No Results'),
'options': translate('RemotePlugin.Mobile', 'Options'),
'service': translate('RemotePlugin.Mobile', 'Service'),
'slides': translate('RemotePlugin.Mobile', 'Slides')
}
def serve_file(self, file_name=None):
"""
Send a file to the socket. For now, just a subset of file types and must be top level inside the html folder.
If subfolders requested return 404, easier for security for the present.
Ultimately for i18n, this could first look for xx/file.html before falling back to file.html.
where xx is the language, e.g. 'en'
"""
log.debug('serve file request %s' % file_name)
if not file_name:
file_name = 'index.html'
elif file_name == 'stage':
file_name = 'stage.html'
elif file_name == 'main':
file_name = 'main.html'
path = os.path.normpath(os.path.join(self.html_dir, file_name))
if not path.startswith(self.html_dir):
return self._http_not_found()
ext = os.path.splitext(file_name)[1]
html = None
if ext == '.html':
mimetype = 'text/html'
variables = self.template_vars
html = Template(filename=path, input_encoding='utf-8', output_encoding='utf-8').render(**variables)
elif ext == '.css':
mimetype = 'text/css'
elif ext == '.js':
mimetype = 'application/x-javascript'
elif ext == '.jpg':
mimetype = 'image/jpeg'
elif ext == '.gif':
mimetype = 'image/gif'
elif ext == '.png':
mimetype = 'image/png'
else:
mimetype = 'text/plain'
file_handle = None
try:
if html:
content = html
else:
file_handle = open(path, 'rb')
log.debug('Opened %s' % path)
content = file_handle.read()
except IOError:
log.exception('Failed to open %s' % path)
return self._http_not_found()
finally:
if file_handle:
file_handle.close()
cherrypy.response.headers['Content-Type'] = mimetype
return content
def poll(self):
"""
Poll OpenLP to determine the current slide number and item name.
"""
result = {
'service': self.service_manager.service_id,
'slide': self.live_controller.selected_row or 0,
'item': self.live_controller.service_item.unique_identifier if self.live_controller.service_item else '',
'twelve': Settings().value('remotes/twelve hour'),
'blank': self.live_controller.blank_screen.isChecked(),
'theme': self.live_controller.theme_screen.isChecked(),
'display': self.live_controller.desktop_screen.isChecked()
}
cherrypy.response.headers['Content-Type'] = 'application/json'
return json.dumps({'results': result}).encode()
def main_poll(self):
"""
Poll OpenLP to determine the current slide count.
"""
result = {
'slide_count': self.live_controller.slide_count
}
cherrypy.response.headers['Content-Type'] = 'application/json'
return json.dumps({'results': result}).encode()
def main_image(self):
"""
Return the latest display image as a byte stream.
"""
result = {
'slide_image': 'data:image/png;base64,' + str(image_to_byte(self.live_controller.slide_image))
}
cherrypy.response.headers['Content-Type'] = 'application/json'
return json.dumps({'results': result}).encode()
def display(self, action):
"""
Hide or show the display screen.
This is a cross Thread call and UI is updated so Events need to be used.
``action``
This is the action, either ``hide`` or ``show``.
"""
self.live_controller.emit(QtCore.SIGNAL('slidecontroller_toggle_display'), action)
cherrypy.response.headers['Content-Type'] = 'application/json'
return json.dumps({'results': {'success': True}}).encode()
def alert(self):
"""
Send an alert.
"""
plugin = self.plugin_manager.get_plugin_by_name("alerts")
if plugin.status == PluginStatus.Active:
try:
text = json.loads(self.request_data)['request']['text']
except KeyError as ValueError:
return self._http_bad_request()
text = urllib.parse.unquote(text)
self.alerts_manager.emit(QtCore.SIGNAL('alerts_text'), [text])
success = True
else:
success = False
cherrypy.response.headers['Content-Type'] = 'application/json'
return json.dumps({'results': {'success': success}}).encode()
def controller(self, display_type, action):
"""
Perform an action on the slide controller.
``display_type``
This is the type of slide controller, either ``preview`` or ``live``.
``action``
The action to perform.
"""
event = 'slidecontroller_%s_%s' % (display_type, action)
if action == 'text':
current_item = self.live_controller.service_item
data = []
if current_item:
for index, frame in enumerate(current_item.get_frames()):
item = {}
if current_item.is_text():
if frame['verseTag']:
item['tag'] = str(frame['verseTag'])
else:
item['tag'] = str(index + 1)
item['text'] = str(frame['text'])
item['html'] = str(frame['html'])
else:
item['tag'] = str(index + 1)
item['text'] = str(frame['title'])
item['html'] = str(frame['title'])
item['selected'] = (self.live_controller.selected_row == index)
data.append(item)
json_data = {'results': {'slides': data}}
if current_item:
json_data['results']['item'] = self.live_controller.service_item.unique_identifier
else:
if self.request_data:
try:
data = json.loads(self.request_data)['request']['id']
except KeyError as ValueError:
return self._http_bad_request()
log.info(data)
# This slot expects an int within a list.
self.live_controller.emit(QtCore.SIGNAL(event), [data])
else:
self.live_controller.emit(QtCore.SIGNAL(event))
json_data = {'results': {'success': True}}
cherrypy.response.headers['Content-Type'] = 'application/json'
return json.dumps(json_data).encode()
def service(self, action):
"""
Handles requests for service items in the service manager
``action``
The action to perform.
"""
event = 'servicemanager_%s' % action
if action == 'list':
cherrypy.response.headers['Content-Type'] = 'application/json'
return json.dumps({'results': {'items': self._get_service_items()}}).encode()
event += '_item'
if self.request_data:
try:
data = json.loads(self.request_data)['request']['id']
except KeyError:
return self._http_bad_request()
self.service_manager.emit(QtCore.SIGNAL(event), data)
else:
Registry().execute(event)
cherrypy.response.headers['Content-Type'] = 'application/json'
return json.dumps({'results': {'success': True}}).encode()
def plugin_info(self, action):
"""
Return plugin related information, based on the action.
``action``
The action to perform. If *search* return a list of plugin names
which support search.
"""
if action == 'search':
searches = []
for plugin in self.plugin_manager.plugins:
if plugin.status == PluginStatus.Active and plugin.media_item and plugin.media_item.has_search:
searches.append([plugin.name, str(plugin.text_strings[StringContent.Name]['plural'])])
cherrypy.response.headers['Content-Type'] = 'application/json'
return json.dumps({'results': {'items': searches}}).encode()
def search(self, plugin_name):
"""
Return a list of items that match the search text.
``plugin``
The plugin name to search in.
"""
try:
text = json.loads(self.request_data)['request']['text']
except KeyError as ValueError:
return self._http_bad_request()
text = urllib.parse.unquote(text)
plugin = self.plugin_manager.get_plugin_by_name(plugin_name)
if plugin.status == PluginStatus.Active and plugin.media_item and plugin.media_item.has_search:
results = plugin.media_item.search(text, False)
else:
results = []
cherrypy.response.headers['Content-Type'] = 'application/json'
return json.dumps({'results': {'items': results}}).encode()
def go_live(self, plugin_name):
"""
Go live on an item of type ``plugin``.
"""
try:
id = json.loads(self.request_data)['request']['id']
except KeyError as ValueError:
return self._http_bad_request()
plugin = self.plugin_manager.get_plugin_by_name(plugin_name)
if plugin.status == PluginStatus.Active and plugin.media_item:
plugin.media_item.emit(QtCore.SIGNAL('%s_go_live' % plugin_name), [id, True])
return self._http_success()
def add_to_service(self, plugin_name):
"""
Add item of type ``plugin_name`` to the end of the service.
"""
try:
id = json.loads(self.request_data)['request']['id']
except KeyError as ValueError:
return self._http_bad_request()
plugin = self.plugin_manager.get_plugin_by_name(plugin_name)
if plugin.status == PluginStatus.Active and plugin.media_item:
item_id = plugin.media_item.create_item_from_id(id)
plugin.media_item.emit(QtCore.SIGNAL('%s_add_to_service' % plugin_name), [item_id, True])
self._http_success()
def _http_success(self):
"""
Set the HTTP success return code.
"""
cherrypy.response.status = 200
def _http_bad_request(self):
"""
Set the HTTP bad response return code.
"""
cherrypy.response.status = 400
def _http_not_found(self):
"""
Set the HTTP not found return code.
"""
cherrypy.response.status = 404
cherrypy.response.body = [b'<html><body>Sorry, an error occurred </body></html>']
def _get_service_manager(self):
"""
Adds the service manager to the class dynamically
"""
if not hasattr(self, '_service_manager'):
self._service_manager = Registry().get('service_manager')
return self._service_manager
service_manager = property(_get_service_manager)
def _get_live_controller(self):
"""
Adds the live controller to the class dynamically
"""
if not hasattr(self, '_live_controller'):
self._live_controller = Registry().get('live_controller')
return self._live_controller
live_controller = property(_get_live_controller)
def _get_plugin_manager(self):
"""
Adds the plugin manager to the class dynamically
"""
if not hasattr(self, '_plugin_manager'):
self._plugin_manager = Registry().get('plugin_manager')
return self._plugin_manager
plugin_manager = property(_get_plugin_manager)
def _get_alerts_manager(self):
"""
Adds the alerts manager to the class dynamically
"""
if not hasattr(self, '_alerts_manager'):
self._alerts_manager = Registry().get('alerts_manager')
return self._alerts_manager
alerts_manager = property(_get_alerts_manager)

View File

@ -207,8 +207,8 @@ class RemoteTab(SettingsTab):
https_url_temp = https_url + 'stage'
self.stage_url.setText('<a href="%s">%s</a>' % (http_url_temp, http_url_temp))
self.stage_https_url.setText('<a href="%s">%s</a>' % (https_url_temp, https_url_temp))
http_url_temp = http_url + 'live'
https_url_temp = https_url + 'live'
http_url_temp = http_url + 'main'
https_url_temp = https_url + 'main'
self.live_url.setText('<a href="%s">%s</a>' % (http_url_temp, http_url_temp))
self.live_https_url.setText('<a href="%s">%s</a>' % (https_url_temp, https_url_temp))

View File

@ -28,11 +28,10 @@
###############################################################################
import logging
from PyQt4 import QtGui
import time
from openlp.core.lib import Plugin, StringContent, translate, build_icon
from openlp.plugins.remotes.lib import RemoteTab, HttpServer
from openlp.plugins.remotes.lib import RemoteTab, OpenLPServer
log = logging.getLogger(__name__)
@ -67,8 +66,7 @@ class RemotesPlugin(Plugin):
"""
log.debug('initialise')
super(RemotesPlugin, self).initialise()
self.server = HttpServer()
self.server.start_server()
self.server = OpenLPServer()
def finalise(self):
"""
@ -77,7 +75,7 @@ class RemotesPlugin(Plugin):
log.debug('finalise')
super(RemotesPlugin, self).finalise()
if self.server:
self.server.close()
self.server.stop_server()
self.server = None
def about(self):
@ -109,5 +107,6 @@ class RemotesPlugin(Plugin):
Called when Config is changed to restart the server on new address or port
"""
log.debug('remote config changed')
self.main_window.information_message(translate('RemotePlugin', 'Configuration Change'),
translate('RemotePlugin', 'OpenLP will need to be restarted for the Remote changes to become active.'))
self.finalise()
time.sleep(0.5)
self.initialise()

View File

@ -48,6 +48,7 @@ except ImportError:
IS_WIN = sys.platform.startswith('win')
VERS = {
'Python': '3.0',
'PyQt4': '4.6',
@ -84,7 +85,6 @@ MODULES = [
'enchant',
'bs4',
'mako',
'cherrypy',
'uno',
]
@ -98,6 +98,7 @@ OPTIONAL_MODULES = [
w = sys.stdout.write
def check_vers(version, required, text):
if not isinstance(version, str):
version = '.'.join(map(str, version))
@ -111,13 +112,16 @@ def check_vers(version, required, text):
w('FAIL' + os.linesep)
return False
def print_vers_fail(required, text):
print(' %s >= %s ... FAIL' % (text, required))
def verify_python():
if not check_vers(list(sys.version_info), VERS['Python'], text='Python'):
exit(1)
def verify_versions():
print('Verifying version of modules...')
try:
@ -138,6 +142,7 @@ def verify_versions():
except ImportError:
print_vers_fail(VERS['enchant'], 'enchant')
def check_module(mod, text='', indent=' '):
space = (30 - len(mod) - len(text)) * ' '
w(indent + '%s%s... ' % (mod, text) + space)
@ -148,6 +153,7 @@ def check_module(mod, text='', indent=' '):
w('FAIL')
w(os.linesep)
def verify_pyenchant():
w('Enchant (spell checker)... ')
try:
@ -160,6 +166,7 @@ def verify_pyenchant():
except ImportError:
w('FAIL' + os.linesep)
def verify_pyqt():
w('Qt4 image formats... ')
try:
@ -174,22 +181,19 @@ def verify_pyqt():
except ImportError:
w('FAIL' + os.linesep)
def main():
verify_python()
print('Checking for modules...')
for m in MODULES:
check_module(m)
print('Checking for optional modules...')
for m in OPTIONAL_MODULES:
check_module(m[0], text=m[1])
if IS_WIN:
print('Checking for Windows specific modules...')
for m in WIN32_MODULES:
check_module(m)
verify_versions()
verify_pyqt()
verify_pyenchant()

View File

@ -8,7 +8,7 @@ from tempfile import mkstemp
from mock import MagicMock
from openlp.core.lib import Settings
from openlp.plugins.remotes.lib.httpserver import HttpRouter, fetch_password, make_sha_hash
from openlp.plugins.remotes.lib.httpserver import HttpRouter
from PyQt4 import QtGui
__default_settings__ = {
@ -44,40 +44,22 @@ class TestRouter(TestCase):
del self.application
os.unlink(self.ini_file)
def fetch_password_unknown_test(self):
def password_encrypter_test(self):
"""
Test the fetch password code with an unknown userid
Test hash userid and password function
"""
# GIVEN: A default configuration
# WHEN: called with the defined userid
password = fetch_password('itwinkle')
Settings().setValue('remotes/user id', 'openlp')
Settings().setValue('remotes/password', 'password')
# THEN: the function should return None
self.assertEqual(password, None, 'The result for fetch_password should be None')
def fetch_password_known_test(self):
"""
Test the fetch password code with the defined userid
"""
# GIVEN: A default configuration
# WHEN: called with the defined userid
password = fetch_password('openlp')
required_password = make_sha_hash('password')
router = HttpRouter()
router.initialise()
test_value = 'b3BlbmxwOnBhc3N3b3Jk'
print(router.auth)
# THEN: the function should return the correct password
self.assertEqual(password, required_password, 'The result for fetch_password should be the defined password')
def sha_password_encrypter_test(self):
"""
Test hash password function
"""
# GIVEN: A default configuration
# WHEN: called with the defined userid
required_password = make_sha_hash('password')
test_value = '5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8'
# THEN: the function should return the correct password
self.assertEqual(required_password, test_value,
self.assertEqual(router.auth, test_value,
'The result for make_sha_hash should return the correct encrypted password')
def process_http_request_test(self):
@ -85,15 +67,18 @@ class TestRouter(TestCase):
Test the router control functionality
"""
# GIVEN: A testing set of Routes
router = HttpRouter()
mocked_function = MagicMock()
test_route = [
(r'^/stage/api/poll$', mocked_function),
(r'^/stage/api/poll$', {'function': mocked_function, 'secure': False}),
]
self.router.routes = test_route
router.routes = test_route
# WHEN: called with a poll route
self.router.process_http_request('/stage/api/poll', None)
function, args = router.process_http_request('/stage/api/poll', None)
# THEN: the function should have been called only once
assert mocked_function.call_count == 1, \
'The mocked function should have been matched and called once.'
assert function['function'] == mocked_function, \
'The mocked function should match defined value.'
assert function['secure'] == False, \
'The mocked function should not require any security.'

View File

@ -1,138 +0,0 @@
"""
This module contains tests for the lib submodule of the Remotes plugin.
"""
import os
from unittest import TestCase
from tempfile import mkstemp
from mock import MagicMock
import urllib.request, urllib.error, urllib.parse
import cherrypy
from bs4 import BeautifulSoup
from openlp.core.lib import Settings
from openlp.plugins.remotes.lib.httpserver import HttpServer
from PyQt4 import QtGui
__default_settings__ = {
'remotes/twelve hour': True,
'remotes/port': 4316,
'remotes/https port': 4317,
'remotes/https enabled': False,
'remotes/user id': 'openlp',
'remotes/password': 'password',
'remotes/authentication enabled': False,
'remotes/ip address': '0.0.0.0'
}
class TestRouter(TestCase):
"""
Test the functions in the :mod:`lib` module.
"""
def setUp(self):
"""
Create the UI
"""
fd, self.ini_file = mkstemp('.ini')
Settings().set_filename(self.ini_file)
self.application = QtGui.QApplication.instance()
Settings().extend_default_settings(__default_settings__)
self.server = HttpServer()
def tearDown(self):
"""
Delete all the C++ objects at the end so that we don't have a segfault
"""
del self.application
os.unlink(self.ini_file)
self.server.close()
def start_server(self):
"""
Common function to start server then mock out the router. CherryPy crashes if you mock before you start
"""
self.server.start_server()
self.server.router = MagicMock()
self.server.router.process_http_request = process_http_request
def start_default_server_test(self):
"""
Test the default server serves the correct initial page
"""
# GIVEN: A default configuration
Settings().setValue('remotes/authentication enabled', False)
self.start_server()
# WHEN: called the route location
code, page = call_remote_server('http://localhost:4316')
# THEN: default title will be returned
self.assertEqual(BeautifulSoup(page).title.text, 'OpenLP 2.1 Remote',
'The default menu should be returned')
def start_authenticating_server_test(self):
"""
Test the default server serves the correctly with authentication
"""
# GIVEN: A default authorised configuration
Settings().setValue('remotes/authentication enabled', True)
self.start_server()
# WHEN: called the route location with no user details
code, page = call_remote_server('http://localhost:4316')
# THEN: then server will ask for details
self.assertEqual(code, 401, 'The basic authorisation request should be returned')
# WHEN: called the route location with user details
code, page = call_remote_server('http://localhost:4316', 'openlp', 'password')
# THEN: default title will be returned
self.assertEqual(BeautifulSoup(page).title.text, 'OpenLP 2.1 Remote',
'The default menu should be returned')
# WHEN: called the route location with incorrect user details
code, page = call_remote_server('http://localhost:4316', 'itwinkle', 'password')
# THEN: then server will ask for details
self.assertEqual(code, 401, 'The basic authorisation request should be returned')
def call_remote_server(url, username=None, password=None):
"""
Helper function
``username``
The username.
``password``
The password.
"""
if username:
passman = urllib.request.HTTPPasswordMgrWithDefaultRealm()
passman.add_password(None, url, username, password)
authhandler = urllib.request.HTTPBasicAuthHandler(passman)
opener = urllib.request.build_opener(authhandler)
urllib.request.install_opener(opener)
try:
page = urllib.request.urlopen(url)
return 0, page.read()
except urllib.error.HTTPError as e:
return e.code, ''
def process_http_request(url_path, *args):
"""
Override function to make the Mock work but does nothing.
``Url_path``
The url_path.
``*args``
Some args.
"""
cherrypy.response.status = 200
return None