This commit is contained in:
Tim Bentley 2016-01-08 17:09:29 +00:00
commit fd3a85a68c
11 changed files with 121 additions and 168 deletions

View File

@ -55,7 +55,7 @@ def init_db(url, auto_flush=True, auto_commit=False, base=None):
metadata = MetaData(bind=engine)
else:
base.metadata.bind = engine
metadata = None
metadata = base.metadata
session = scoped_session(sessionmaker(autoflush=auto_flush, autocommit=auto_commit, bind=engine))
return session, metadata
@ -227,13 +227,12 @@ class Manager(object):
"""
self.is_dirty = False
self.session = None
# See if we're using declarative_base with a pre-existing session.
log.debug('Manager: Testing for pre-existing session')
if session is not None:
log.debug('Manager: Using existing session')
else:
log.debug('Manager: Creating new session')
self.db_url = None
if db_file_name:
log.debug('Manager: Creating new DB url')
self.db_url = init_url(plugin_name, db_file_name)
else:
self.db_url = init_url(plugin_name)
if upgrade_mod:
try:
db_ver, up_ver = upgrade_db(self.db_url, upgrade_mod)
@ -248,10 +247,13 @@ class Manager(object):
'not be loaded.\n\nDatabase: %s') % (db_ver, up_ver, self.db_url)
)
return
try:
self.session = init_schema(self.db_url)
except (SQLAlchemyError, DBAPIError):
handle_db_error(plugin_name, db_file_name)
if not session:
try:
self.session = init_schema(self.db_url)
except (SQLAlchemyError, DBAPIError):
handle_db_error(plugin_name, db_file_name)
else:
self.session = session
def save_object(self, object_instance, commit=True):
"""
@ -344,13 +346,13 @@ class Manager(object):
for try_count in range(3):
try:
return self.session.query(object_class).filter(filter_clause).first()
except OperationalError:
except OperationalError as oe:
# This exception clause is for users running MySQL which likes to terminate connections on its own
# without telling anyone. See bug #927473. However, other dbms can raise it, usually in a
# non-recoverable way. So we only retry 3 times.
log.exception('Probably a MySQL issue, "MySQL has gone away"')
if try_count >= 2:
if try_count >= 2 or 'MySQL has gone away' in str(oe):
raise
log.exception('Probably a MySQL issue, "MySQL has gone away"')
def get_all_objects(self, object_class, filter_clause=None, order_by_ref=None):
"""

View File

@ -20,18 +20,18 @@
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
###############################################################################
"""
:mod:`openlp.core.lib.projector.db` module
:mod:`openlp.core.lib.projector.db` module
Provides the database functions for the Projector module.
Provides the database functions for the Projector module.
The Manufacturer, Model, Source tables keep track of the video source
strings used for display of input sources. The Source table maps
manufacturer-defined or user-defined strings from PJLink default strings
to end-user readable strings; ex: PJLink code 11 would map "RGB 1"
default string to "RGB PC (analog)" string.
(Future feature).
The Manufacturer, Model, Source tables keep track of the video source
strings used for display of input sources. The Source table maps
manufacturer-defined or user-defined strings from PJLink default strings
to end-user readable strings; ex: PJLink code 11 would map "RGB 1"
default string to "RGB PC (analog)" string.
(Future feature).
The Projector table keeps track of entries for controlled projectors.
The Projector table keeps track of entries for controlled projectors.
"""
import logging
@ -218,19 +218,19 @@ class ProjectorDB(Manager):
"""
def __init__(self, *args, **kwargs):
log.debug('ProjectorDB().__init__(args="%s", kwargs="%s")' % (args, kwargs))
super().__init__(plugin_name='projector',
init_schema=self.init_schema)
super().__init__(plugin_name='projector', init_schema=self.init_schema)
log.debug('ProjectorDB() Initialized using db url %s' % self.db_url)
log.debug('Session: %s', self.session)
def init_schema(*args, **kwargs):
def init_schema(self, *args, **kwargs):
"""
Setup the projector database and initialize the schema.
Declarative uses table classes to define schema.
"""
url = init_url('projector')
session, metadata = init_db(url, base=Base)
Base.metadata.create_all(checkfirst=True)
self.db_url = init_url('projector')
session, metadata = init_db(self.db_url, base=Base)
metadata.create_all(checkfirst=True)
return session
def get_projector_by_id(self, dbid):

View File

@ -22,26 +22,3 @@
"""
Module-level functions for the functional test suite
"""
import os
from tests.functional import patch
from openlp.core.common import is_win
from .test_projectordb import tmpfile
def setUp():
if not is_win():
# Wine creates a sharing violation during tests. Ignore.
try:
os.remove(tmpfile)
except:
pass
def tearDown():
"""
Ensure test suite has been cleaned up after tests
"""
patch.stopall()

View File

@ -25,15 +25,13 @@ record functions.
PREREQUISITE: add_record() and get_all() functions validated.
"""
import os
from unittest import TestCase
from tests.functional import MagicMock, patch
from openlp.core.lib.projector.db import Projector, ProjectorDB, ProjectorSource
from tests.resources.projector.data import TEST1_DATA, TEST2_DATA, TEST3_DATA
tmpfile = '/tmp/openlp-test-projectordb.sql'
from tests.functional import MagicMock, patch
from tests.resources.projector.data import TEST_DB, TEST1_DATA, TEST2_DATA, TEST3_DATA
def compare_data(one, two):
@ -60,15 +58,15 @@ def compare_source(one, two):
one.text == two.text
def add_records(self, test):
def add_records(projector_db, test):
"""
Add record if not in database
"""
record_list = self.projector.get_projector_all()
record_list = projector_db.get_projector_all()
if len(record_list) < 1:
added = False
for record in test:
added = self.projector.add_projector(record) or added
added = projector_db.add_projector(record) or added
return added
for new_record in test:
@ -76,7 +74,7 @@ def add_records(self, test):
for record in record_list:
if compare_data(record, new_record):
break
added = self.projector.add_projector(new_record)
added = projector_db.add_projector(new_record)
return added
@ -88,15 +86,17 @@ class TestProjectorDB(TestCase):
"""
Set up anything necessary for all tests
"""
if not hasattr(self, 'projector'):
with patch('openlp.core.lib.projector.db.init_url') as mocked_init_url:
mocked_init_url.return_value = 'sqlite:///%s' % tmpfile
self.projector = ProjectorDB()
with patch('openlp.core.lib.projector.db.init_url') as mocked_init_url:
if os.path.exists(TEST_DB):
os.unlink(TEST_DB)
mocked_init_url.return_value = 'sqlite:///%s' % TEST_DB
self.projector = ProjectorDB()
def tearDown(self):
"""
Clean up
"""
self.projector.session.close()
self.projector = None
def find_record_by_ip_test(self):
@ -104,13 +104,13 @@ class TestProjectorDB(TestCase):
Test find record by IP
"""
# GIVEN: Record entries in database
add_records(self, [TEST1_DATA, TEST2_DATA])
add_records(self.projector, [Projector(**TEST1_DATA), Projector(**TEST2_DATA)])
# WHEN: Search for record using IP
record = self.projector.get_projector_by_ip(TEST2_DATA.ip)
record = self.projector.get_projector_by_ip(TEST2_DATA['ip'])
# THEN: Verify proper record returned
self.assertTrue(compare_data(TEST2_DATA, record),
self.assertTrue(compare_data(Projector(**TEST2_DATA), record),
'Record found should have been test_2 data')
def find_record_by_name_test(self):
@ -118,13 +118,13 @@ class TestProjectorDB(TestCase):
Test find record by name
"""
# GIVEN: Record entries in database
add_records(self, [TEST1_DATA, TEST2_DATA])
add_records(self.projector, [Projector(**TEST1_DATA), Projector(**TEST2_DATA)])
# WHEN: Search for record using name
record = self.projector.get_projector_by_name(TEST2_DATA.name)
record = self.projector.get_projector_by_name(TEST2_DATA['name'])
# THEN: Verify proper record returned
self.assertTrue(compare_data(TEST2_DATA, record),
self.assertTrue(compare_data(Projector(**TEST2_DATA), record),
'Record found should have been test_2 data')
def record_delete_test(self):
@ -132,14 +132,14 @@ class TestProjectorDB(TestCase):
Test record can be deleted
"""
# GIVEN: Record in database
add_records(self, [TEST3_DATA, ])
record = self.projector.get_projector_by_ip(TEST3_DATA.ip)
add_records(self.projector, [Projector(**TEST3_DATA), ])
record = self.projector.get_projector_by_ip(TEST3_DATA['ip'])
# WHEN: Record deleted
self.projector.delete_projector(record)
# THEN: Verify record not retrievable
found = self.projector.get_projector_by_ip(TEST3_DATA.ip)
found = self.projector.get_projector_by_ip(TEST3_DATA['ip'])
self.assertFalse(found, 'test_3 record should have been deleted')
def record_edit_test(self):
@ -147,34 +147,35 @@ class TestProjectorDB(TestCase):
Test edited record returns the same record ID with different data
"""
# GIVEN: Record entries in database
add_records(self, [TEST1_DATA, TEST2_DATA])
add_records(self.projector, [Projector(**TEST1_DATA), Projector(**TEST2_DATA)])
# WHEN: We retrieve a specific record
record = self.projector.get_projector_by_ip(TEST1_DATA.ip)
record = self.projector.get_projector_by_ip(TEST1_DATA['ip'])
record_id = record.id
# WHEN: Data is changed
record.ip = TEST3_DATA.ip
record.port = TEST3_DATA.port
record.pin = TEST3_DATA.pin
record.name = TEST3_DATA.name
record.location = TEST3_DATA.location
record.notes = TEST3_DATA.notes
record.ip = TEST3_DATA['ip']
record.port = TEST3_DATA['port']
record.pin = TEST3_DATA['pin']
record.name = TEST3_DATA['name']
record.location = TEST3_DATA['location']
record.notes = TEST3_DATA['notes']
updated = self.projector.update_projector(record)
self.assertTrue(updated, 'Save updated record should have returned True')
record = self.projector.get_projector_by_ip(TEST3_DATA.ip)
record = self.projector.get_projector_by_ip(TEST3_DATA['ip'])
# THEN: Record ID should remain the same, but data should be changed
self.assertEqual(record_id, record.id, 'Edited record should have the same ID')
self.assertTrue(compare_data(TEST3_DATA, record), 'Edited record should have new data')
self.assertTrue(compare_data(Projector(**TEST3_DATA), record), 'Edited record should have new data')
def source_add_test(self):
"""
Test source entry for projector item
"""
# GIVEN: Record entries in database
self.projector.add_projector(TEST1_DATA)
item = self.projector.get_projector_by_id(TEST1_DATA.id)
projector1 = Projector(**TEST1_DATA)
self.projector.add_projector(projector1)
item = self.projector.get_projector_by_id(projector1.id)
item_id = item.id
# WHEN: A source entry is saved for item
@ -184,3 +185,4 @@ class TestProjectorDB(TestCase):
# THEN: Projector should have the same source entry
item = self.projector.get_projector_by_id(item_id)
self.assertTrue(compare_source(item.source_list[0], source))

View File

@ -22,7 +22,7 @@
"""
Package to test the openlp.core.ui.slidecontroller package.
"""
from unittest import TestCase
from unittest import TestCase, skipUnless
from PyQt5 import QtCore
@ -141,13 +141,14 @@ class TestMainDisplay(TestCase, TestMixin):
mocked_songs_plugin.refresh_css.assert_called_with(main_display.frame)
mocked_bibles_plugin.refresh_css.assert_called_with(main_display.frame)
@patch('openlp.core.ui.maindisplay.is_macosx')
@skipUnless(is_macosx(), 'Can only run test on Mac OS X due to pyobjc dependency.')
def macosx_display_window_flags_state_test(self, is_macosx):
"""
Test that on Mac OS X we set the proper window flags
"""
if not is_macosx():
self.skipTest('Can only run test on Mac OS X due to pyobjc dependency.')
# GIVEN: A new SlideController instance on Mac OS X.
is_macosx.return_value = True
self.screens.set_current_display(0)
display = MagicMock()
@ -159,12 +160,11 @@ class TestMainDisplay(TestCase, TestMixin):
main_display.windowFlags(),
'The window flags should be Qt.Window, and Qt.FramelessWindowHint.')
@skipUnless(is_macosx(), 'Can only run test on Mac OS X due to pyobjc dependency.')
def macosx_display_test(self):
"""
Test display on Mac OS X
"""
if not is_macosx():
self.skipTest('Can only run test on Mac OS X due to pyobjc dependency.')
# GIVEN: A new SlideController instance on Mac OS X.
self.screens.set_current_display(0)
display = MagicMock()

View File

@ -22,35 +22,3 @@
"""
Module-level functions for the functional test suite
"""
import os
from openlp.core.common import is_win
from tests.interfaces import patch
from .test_projectormanager import tmpfile
def setUp():
"""
Set up this module of tests
"""
if not is_win():
# Wine creates a sharing violation during tests. Ignore.
try:
os.remove(tmpfile)
except:
pass
def tearDown():
"""
Ensure test suite has been cleaned up after tests
"""
patch.stopall()
if not is_win():
try:
# In case of changed schema, remove old test file
os.remove(tmpfile)
except FileNotFoundError:
pass

View File

@ -23,7 +23,7 @@
Interface tests to test the openlp.core.ui.projector.editform.ProjectorEditForm()
class and methods.
"""
import os
from unittest import TestCase
from openlp.core.common import Registry, Settings
@ -32,7 +32,7 @@ from openlp.core.ui import ProjectorEditForm
from tests.functional import patch
from tests.helpers.testmixin import TestMixin
from tests.resources.projector.data import TEST1_DATA, TEST2_DATA
from tests.resources.projector.data import TEST_DB, TEST1_DATA, TEST2_DATA
class TestProjectorEditForm(TestCase, TestMixin):
@ -49,7 +49,9 @@ class TestProjectorEditForm(TestCase, TestMixin):
self.setup_application()
Registry.create()
with patch('openlp.core.lib.projector.db.init_url') as mocked_init_url:
mocked_init_url.return_value = 'sqlite://'
if os.path.exists(TEST_DB):
os.unlink(TEST_DB)
mocked_init_url.return_value = 'sqlite:///' + TEST_DB
self.projectordb = ProjectorDB()
self.projector_form = ProjectorEditForm(projectordb=self.projectordb)
@ -93,11 +95,13 @@ class TestProjectorEditForm(TestCase, TestMixin):
with patch('openlp.core.ui.projector.editform.QDialog.exec'):
# WHEN: Calling edit form with existing projector instance
self.projector_form.exec(projector=TEST1_DATA)
self.projector_form.exec(projector=Projector(**TEST1_DATA))
item = self.projector_form.projector
# THEN: Should be editing an existing entry
self.assertFalse(self.projector_form.new_projector,
'Projector edit form should be marked as existing entry')
self.assertTrue((item.ip is TEST1_DATA.ip and item.name is TEST1_DATA.name),
self.assertTrue((item.ip is TEST1_DATA['ip'] and item.name is TEST1_DATA['name']),
'Projector edit form should have TEST1_DATA() instance to edit')

View File

@ -22,7 +22,6 @@
"""
Interface tests to test the themeManager class and related methods.
"""
import os
from unittest import TestCase
@ -33,9 +32,7 @@ from tests.helpers.testmixin import TestMixin
from openlp.core.ui import ProjectorManager, ProjectorEditForm
from openlp.core.lib.projector.db import Projector, ProjectorDB
from tests.resources.projector.data import TEST1_DATA, TEST2_DATA, TEST3_DATA
tmpfile = '/tmp/openlp-test-projectormanager.sql'
from tests.resources.projector.data import TEST_DB, TEST1_DATA, TEST2_DATA, TEST3_DATA
class TestProjectorManager(TestCase, TestMixin):
@ -49,12 +46,13 @@ class TestProjectorManager(TestCase, TestMixin):
self.build_settings()
self.setup_application()
Registry.create()
if not hasattr(self, 'projector_manager'):
with patch('openlp.core.lib.projector.db.init_url') as mocked_init_url:
mocked_init_url.return_value = 'sqlite:///%s' % tmpfile
self.projectordb = ProjectorDB()
if not hasattr(self, 'projector_manager'):
self.projector_manager = ProjectorManager(projectordb=self.projectordb)
with patch('openlp.core.lib.projector.db.init_url') as mocked_init_url:
if os.path.exists(TEST_DB):
os.unlink(TEST_DB)
mocked_init_url.return_value = 'sqlite:///%s' % TEST_DB
self.projectordb = ProjectorDB()
if not hasattr(self, 'projector_manager'):
self.projector_manager = ProjectorManager(projectordb=self.projectordb)
def tearDown(self):
"""

View File

@ -20,24 +20,24 @@
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
###############################################################################
"""
:mod: `tests.interfaces.openlp_core_ui.test_projectorsourceform` module
:mod: `tests.interfaces.openlp_core_ui.test_projectorsourceform` module
Tests for the Projector Source Select form.
Tests for the Projector Source Select form.
"""
import logging
log = logging.getLogger(__name__)
log.debug('test_projectorsourceform loaded')
import os
from unittest import TestCase
from PyQt5.QtWidgets import QDialog
from tests.functional import patch
from tests.functional.openlp_core_lib.test_projectordb import tmpfile
from tests.helpers.testmixin import TestMixin
from tests.resources.projector.data import TEST_DB, TEST1_DATA
from openlp.core.common import Registry, Settings
from openlp.core.lib.projector.db import ProjectorDB
from openlp.core.lib.projector.db import ProjectorDB, Projector
from openlp.core.lib.projector.constants import PJLINK_DEFAULT_CODES, PJLINK_DEFAULT_SOURCES
from openlp.core.ui.projector.sourceselectform import source_group, SourceSelectSingle
@ -65,7 +65,9 @@ class ProjectorSourceFormTest(TestCase, TestMixin):
"""
Set up anything necessary for all tests
"""
mocked_init_url.return_value = 'sqlite:///{}'.format(tmpfile)
if os.path.exists(TEST_DB):
os.unlink(TEST_DB)
mocked_init_url.return_value = 'sqlite:///{}'.format(TEST_DB)
self.build_settings()
self.setup_application()
Registry.create()
@ -73,10 +75,10 @@ class ProjectorSourceFormTest(TestCase, TestMixin):
if not hasattr(self, 'projectordb'):
self.projectordb = ProjectorDB()
# Retrieve/create a database record
self.projector = self.projectordb.get_projector_by_ip(TEST1_DATA.ip)
self.projector = self.projectordb.get_projector_by_ip(TEST1_DATA['ip'])
if not self.projector:
self.projectordb.add_projector(projector=TEST1_DATA)
self.projector = self.projectordb.get_projector_by_ip(TEST1_DATA.ip)
self.projectordb.add_projector(projector=Projector(**TEST1_DATA))
self.projector = self.projectordb.get_projector_by_ip(TEST1_DATA['ip'])
self.projector.dbid = self.projector.id
self.projector.db_item = self.projector

View File

@ -130,7 +130,7 @@ class TestBibleHTTP(TestCase):
# THEN: The list should not be None, and some known bibles should be there
self.assertIsNotNone(bibles)
self.assertIn(('New Int. Readers Version', 'NIRV', 'en'), bibles)
self.assertIn(('Българската Библия', 'BLG', 'bg'), bibles)
self.assertIn(('Священное Писание, Восточный перевод', 'CARS', 'ru'), bibles)
def biblegateway_get_bibles_test(self):
"""

View File

@ -24,28 +24,28 @@ The :mod:`tests.resources.projector.data file contains test data
"""
import os
from openlp.core.lib.projector.db import Projector
from tempfile import gettempdir
# Test data
TEST_DB = os.path.join('tmp', 'openlp-test-projectordb.sql')
TEST_DB = os.path.join(gettempdir(), 'openlp-test-projectordb.sql')
TEST1_DATA = Projector(ip='111.111.111.111',
port='1111',
pin='1111',
name='___TEST_ONE___',
location='location one',
notes='notes one')
TEST1_DATA = dict(ip='111.111.111.111',
port='1111',
pin='1111',
name='___TEST_ONE___',
location='location one',
notes='notes one')
TEST2_DATA = Projector(ip='222.222.222.222',
port='2222',
pin='2222',
name='___TEST_TWO___',
location='location two',
notes='notes two')
TEST2_DATA = dict(ip='222.222.222.222',
port='2222',
pin='2222',
name='___TEST_TWO___',
location='location two',
notes='notes two')
TEST3_DATA = Projector(ip='333.333.333.333',
port='3333',
pin='3333',
name='___TEST_THREE___',
location='location three',
notes='notes three')
TEST3_DATA = dict(ip='333.333.333.333',
port='3333',
pin='3333',
name='___TEST_THREE___',
location='location three',
notes='notes three')