Migrate to SQLAlchemy 2.0-style queries

Raoul Snyman 2023-05-19 13:55:38 +00:00
parent f40ffd377f
commit 78b2de638a
46 changed files with 1201 additions and 1201 deletions
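For orientation, the sketch below is not code from this commit; it contrasts the legacy SQLAlchemy 1.x Query API that the old code used with the 2.0-style select() construct that the new openlp.core.db package adopts. The Song model, the in-memory SQLite URL and the sample title are all hypothetical.

from sqlalchemy import Column, Integer, Unicode, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Song(Base):
    # Hypothetical model, used only to illustrate the query-style change.
    __tablename__ = 'song'
    id = Column(Integer, primary_key=True)
    title = Column(Unicode(255))

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add(Song(title='Amazing Grace'))
    session.commit()
    # Legacy 1.x Query API, which this commit removes:
    #     session.query(Song).filter(Song.title == 'Amazing Grace').all()
    # 2.0-style construct used throughout the new code:
    songs = session.execute(select(Song).where(Song.title == 'Amazing Grace')).scalars().all()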

View File

@ -1,71 +0,0 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2023 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
"""
The :mod:`db` module provides helper functions for database related methods.
"""
import logging
from copy import deepcopy
import sqlalchemy
log = logging.getLogger(__name__)
def drop_column(op, tablename, columnname):
drop_columns(op, tablename, [columnname])
def drop_columns(op, tablename, columns):
"""
Column dropping functionality for SQLite, as there is no DROP COLUMN support in SQLite
From https://github.com/klugjohannes/alembic-sqlite
"""
# get the db engine and reflect database tables
engine = op.get_bind()
meta = sqlalchemy.MetaData(bind=engine)
meta.reflect()
# create a select statement from the old table
old_table = meta.tables[tablename]
select = sqlalchemy.sql.select([c for c in old_table.c if c.name not in columns])
# get remaining columns without table attribute attached
remaining_columns = [deepcopy(c) for c in old_table.columns if c.name not in columns]
for column in remaining_columns:
column.table = None
# create a temporary new table
new_tablename = '{0}_new'.format(tablename)
op.create_table(new_tablename, *remaining_columns)
meta.reflect()
new_table = meta.tables[new_tablename]
# copy data from old table
insert = sqlalchemy.sql.insert(new_table).from_select([c.name for c in remaining_columns], select)
engine.execute(insert)
# drop the old table and rename the new table to take the old tables
# position
op.drop_table(tablename)
op.rename_table(new_tablename, tablename)

View File

@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2023 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
"""
The :mod:`~openlp.core.db` module provides the core database functionality for OpenLP
"""

226
openlp/core/db/helpers.py Normal file
View File

@ -0,0 +1,226 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2023 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
"""
The :mod:`~openlp.core.db.helpers` module provides database helper functions for OpenLP
"""
import logging
import os
from copy import copy
from pathlib import Path
from typing import Optional, Tuple, Union
from urllib.parse import quote_plus as urlquote
from sqlalchemy import MetaData, create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.engine.url import URL, make_url
from sqlalchemy.exc import OperationalError, ProgrammingError
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.orm.scoping import ScopedSession
from sqlalchemy.orm.decl_api import DeclarativeMeta
from sqlalchemy.pool import StaticPool
from openlp.core.common import delete_file
from openlp.core.common.applocation import AppLocation
from openlp.core.common.i18n import translate
from openlp.core.common.registry import Registry
from openlp.core.lib.ui import critical_error_message_box
log = logging.getLogger(__name__)
def _set_url_database(url, database: str) -> URL:
new_url = URL.create(
drivername=url.drivername,
username=url.username,
password=url.password,
host=url.host,
port=url.port,
database=database,
query=url.query
)
assert new_url.database == database, new_url
return new_url
def _get_scalar_result(engine: Engine, sql: str):
with engine.connect() as conn:
return conn.scalar(sql)
def _sqlite_file_exists(database: Union[Path, str]) -> bool:
database = Path(database)
if not database.is_file() or database.stat().st_size < 100:
return False
with database.open('rb') as f:
header = f.read(100)
return header[:16] == b'SQLite format 3\x00'
def get_db_path(plugin_name: str, db_file_name: Union[Path, str, None] = None) -> str:
"""
Create a path to a database from the plugin name and database name
:param plugin_name: Name of plugin
:param pathlib.Path | str | None db_file_name: File name of database
:return: The path to the database
:rtype: str
"""
if db_file_name is None:
return 'sqlite:///{path}/{plugin}.sqlite'.format(path=AppLocation.get_section_data_path(plugin_name),
plugin=plugin_name)
elif os.path.isabs(db_file_name):
return 'sqlite:///{db_file_name}'.format(db_file_name=db_file_name)
else:
return 'sqlite:///{path}/{name}'.format(path=AppLocation.get_section_data_path(plugin_name), name=db_file_name)
def handle_db_error(plugin_name: str, db_file_path: Union[Path, str]):
"""
Log and report to the user that a database cannot be loaded
:param plugin_name: Name of plugin
:param pathlib.Path db_file_path: File name of database
:return: None
"""
db_path = get_db_path(plugin_name, db_file_path)
log.exception('Error loading database: {db}'.format(db=db_path))
critical_error_message_box(translate('OpenLP.Manager', 'Database Error'),
translate('OpenLP.Manager',
'OpenLP cannot load your database.\n\nDatabase: {db}').format(db=db_path))
def database_exists(url: str) -> bool:
"""Check if a database exists.
:param url: A SQLAlchemy engine URL.
Performs backend-specific testing to quickly determine if a database
exists on the server. ::
database_exists('postgresql://postgres@localhost/name') #=> False
create_database('postgresql://postgres@localhost/name')
database_exists('postgresql://postgres@localhost/name') #=> True
Supports checking against a constructed URL as well. ::
engine = create_engine('postgresql://postgres@localhost/name')
database_exists(engine.url) #=> False
create_database(engine.url)
database_exists(engine.url) #=> True
Borrowed from SQLAlchemy_Utils since we only need this one function.
Copied from a fork/pull request since SQLAlchemy_Utils didn't support SQLAlchemy 1.4 when it was released:
https://github.com/nsoranzo/sqlalchemy-utils/blob/4f52578/sqlalchemy_utils/functions/database.py
"""
url = copy(make_url(url))
database = url.database
dialect_name = url.get_dialect().name
if dialect_name == 'postgresql':
text = "SELECT 1 FROM pg_database WHERE datname='%s'" % database
for db in (database, 'postgres', 'template1', 'template0', None):
url = _set_url_database(url, database=db)
engine = create_engine(url, poolclass=StaticPool)
try:
return bool(_get_scalar_result(engine, text))
except (ProgrammingError, OperationalError):
pass
return False
elif dialect_name == 'mysql':
url = _set_url_database(url, database=None)
engine = create_engine(url, poolclass=StaticPool)
text = ("SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA "
"WHERE SCHEMA_NAME = '%s'" % database)
return bool(_get_scalar_result(engine, text))
elif dialect_name == 'sqlite':
url = _set_url_database(url, database=None)
engine = create_engine(url, poolclass=StaticPool)
if database:
return database == ':memory:' or _sqlite_file_exists(database)
else:
# The default SQLAlchemy database is in memory,
# and ':memory:' is not required, thus we should support that use-case
return True
else:
text = 'SELECT 1'
try:
engine = create_engine(url, poolclass=StaticPool)
return bool(_get_scalar_result(engine, text))
except (ProgrammingError, OperationalError):
return False
def init_db(url: str, auto_flush: bool = True, auto_commit: bool = False,
base: Optional[DeclarativeMeta] = None) -> Tuple[ScopedSession, MetaData]:
"""
Initialise and return the session and metadata for a database
:param url: The database to initialise connection with
:param auto_flush: Sets the flushing behaviour of the session
:param auto_commit: Sets the commit behaviour of the session
:param base: If using declarative, the base class to bind with
"""
engine = create_engine(url, poolclass=StaticPool)
if base is None:
metadata = MetaData(bind=engine)
else:
base.metadata.bind = engine
metadata = base.metadata
session = scoped_session(sessionmaker(autoflush=auto_flush, autocommit=auto_commit, bind=engine))
return session, metadata
def init_url(plugin_name: str, db_file_name: Union[Path, str, None] = None) -> str:
"""
Construct the connection string for a database.
:param plugin_name: The name of the plugin for the database creation.
:param pathlib.Path | str | None db_file_name: The database file name. Defaults to None resulting in the plugin_name
being used.
:return: The database URL
:rtype: str
"""
settings = Registry().get('settings')
db_type = settings.value(f'{plugin_name}/db type')
if db_type == 'sqlite':
db_url = get_db_path(plugin_name, db_file_name)
else:
db_url = '{type}://{user}:{password}@{host}/{db}'.format(type=db_type,
user=urlquote(settings.value('db username')),
password=urlquote(settings.value('db password')),
host=urlquote(settings.value('db hostname')),
db=urlquote(settings.value('db database')))
return db_url
def delete_database(plugin_name: str, db_file_name: Union[Path, str, None] = None) -> bool:
"""
Remove a database file from the system.
:param plugin_name: The name of the plugin to remove the database for
:param db_file_name: The database file name. Defaults to None resulting in the plugin_name being used.
"""
db_file_path = AppLocation.get_section_data_path(plugin_name)
if db_file_name:
db_file_path = db_file_path / db_file_name
else:
db_file_path = db_file_path / plugin_name
return delete_file(db_file_path)
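A minimal usage sketch of the new helpers follows; it is my illustration, not part of the commit. It assumes a plain SQLite URL and a throwaway declarative model, and skips init_url() because that needs OpenLP's Registry and settings to be initialised.

from sqlalchemy import Column, Integer, Unicode
from sqlalchemy.orm import declarative_base

from openlp.core.db.helpers import database_exists, init_db

Base = declarative_base()

class Note(Base):
    # Hypothetical model, for illustration only.
    __tablename__ = 'note'
    id = Column(Integer, primary_key=True)
    text = Column(Unicode(255))

url = 'sqlite:///example.sqlite'              # assumed local database path
session, metadata = init_db(url, base=Base)
metadata.create_all(bind=metadata.bind, checkfirst=True)
assert database_exists(url)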

325
openlp/core/db/manager.py Normal file
View File

@ -0,0 +1,325 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2023 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
"""
The :mod:`~openlp.core.db.manager` module provides the database manager for the plugins
"""
import logging
from pathlib import Path
from types import FunctionType, ModuleType
from typing import List, Optional, Type, Union
from sqlalchemy import create_engine, func
from sqlalchemy.exc import DBAPIError, InvalidRequestError, OperationalError, SQLAlchemyError
from sqlalchemy.orm import Session
from sqlalchemy.orm.decl_api import DeclarativeMeta
from sqlalchemy.sql.expression import select, delete
from openlp.core.common.i18n import translate
from openlp.core.db.helpers import handle_db_error, init_url
from openlp.core.db.upgrades import upgrade_db
from openlp.core.lib.ui import critical_error_message_box
log = logging.getLogger(__name__)
class DBManager(object):
"""
Provide generic object persistence management
"""
def __init__(self, plugin_name: str, init_schema: FunctionType,
db_file_path: Union[Path, str, None] = None,
upgrade_mod: Optional[ModuleType] = None, session: Optional[Session] = None):
"""
Runs the initialisation process that includes creating the connection to the database and the tables if they do
not exist.
:param plugin_name: The name to setup paths and settings section names
:param init_schema: The init_schema function for this database
:param pathlib.Path | None db_file_path: The file name to use for this database. Defaults to None resulting in
the plugin_name being used.
:param upgrade_mod: The upgrade_schema function for this database
"""
super().__init__()
self.is_dirty = False
self.session = None
self.db_url = None
log.debug('Manager: Creating new DB url')
self.db_url = init_url(plugin_name, db_file_path)
if not session:
try:
self.session = init_schema(self.db_url)
except (SQLAlchemyError, DBAPIError):
handle_db_error(plugin_name, db_file_path)
else:
self.session = session
if upgrade_mod:
try:
db_ver, up_ver = upgrade_db(self.db_url, upgrade_mod)
except (SQLAlchemyError, DBAPIError):
handle_db_error(plugin_name, db_file_path)
return
if db_ver > up_ver:
critical_error_message_box(
translate('OpenLP.Manager', 'Database Error'),
translate('OpenLP.Manager', 'The database being loaded was created in a more recent version of '
'OpenLP. The database is version {db_ver}, while OpenLP expects version {db_up}. '
'The database will not be loaded.\n\nDatabase: {db_name}').format(db_ver=db_ver,
db_up=up_ver,
db_name=self.db_url))
return
def save_object(self, object_instance: DeclarativeMeta, commit: bool = True):
"""
Save an object to the database
:param object_instance: The object to save
:param commit: Commit the session with this object
"""
for try_count in range(3):
try:
self.session.add(object_instance)
if commit:
self.session.commit()
self.is_dirty = True
return True
except OperationalError:
# This exception clause is for users running MySQL which likes to terminate connections on its own
# without telling anyone. See bug #927473. However, other dbms can raise it, usually in a
# non-recoverable way. So we only retry 3 times.
log.exception('Probably a MySQL issue - "MySQL has gone away"')
self.session.rollback()
if try_count >= 2:
raise
except InvalidRequestError:
self.session.rollback()
log.exception('Object list save failed')
return False
except Exception:
self.session.rollback()
raise
def save_objects(self, object_list: List[DeclarativeMeta], commit: bool = True):
"""
Save a list of objects to the database
:param object_list: The list of objects to save
:param commit: Commit the session with this object
"""
for try_count in range(3):
try:
self.session.add_all(object_list)
if commit:
self.session.commit()
self.is_dirty = True
return True
except OperationalError:
# This exception clause is for users running MySQL which likes to terminate connections on its own
# without telling anyone. See bug #927473. However, other dbms can raise it, usually in a
# non-recoverable way. So we only retry 3 times.
log.exception('Probably a MySQL issue, "MySQL has gone away"')
self.session.rollback()
if try_count >= 2:
raise
except InvalidRequestError:
self.session.rollback()
log.exception('Object list save failed')
return False
except Exception:
self.session.rollback()
raise
def get_object(self, object_class: Type[DeclarativeMeta], key: Union[str, int] = None) -> DeclarativeMeta:
"""
Return the details of an object
:param object_class: The type of object to return
:param key: The unique reference or primary key for the instance to return
"""
if not key:
return object_class()
else:
for try_count in range(3):
try:
return self.session.get(object_class, key)
except OperationalError:
# This exception clause is for users running MySQL which likes to terminate connections on its own
# without telling anyone. See bug #927473. However, other dbms can raise it, usually in a
# non-recoverable way. So we only retry 3 times.
log.exception('Probably a MySQL issue, "MySQL has gone away"')
if try_count >= 2:
raise
def get_object_filtered(self, object_class: Type[DeclarativeMeta], *filter_clauses) -> DeclarativeMeta:
"""
Returns an object matching specified criteria
:param object_class: The type of object to return
:param filter_clauses: The criteria to select the object by
"""
query = select(object_class)
for filter_clause in filter_clauses:
query = query.where(filter_clause)
for try_count in range(3):
try:
return self.session.execute(query).scalar()
except OperationalError as oe:
# This exception clause is for users running MySQL which likes to terminate connections on its own
# without telling anyone. See bug #927473. However, other dbms can raise it, usually in a
# non-recoverable way. So we only retry 3 times.
if try_count >= 2 or 'MySQL has gone away' in str(oe):
raise
log.exception('Probably a MySQL issue, "MySQL has gone away"')
def get_all_objects(self, object_class: Type[DeclarativeMeta], filter_clause=None, order_by_ref=None):
"""
Returns all the objects from the database
:param object_class: The type of objects to return
:param filter_clause: The filter governing selection of objects to return. Defaults to None.
:param order_by_ref: Any parameters to order the returned objects by. Defaults to None.
"""
query = select(object_class)
# Check filter_clause
if filter_clause is not None:
if isinstance(filter_clause, list):
for dbfilter in filter_clause:
query = query.where(dbfilter)
else:
query = query.where(filter_clause)
# Check order_by_ref
if order_by_ref is not None:
if isinstance(order_by_ref, list):
query = query.order_by(*order_by_ref)
else:
query = query.order_by(order_by_ref)
for try_count in range(3):
try:
return self.session.execute(query).scalars().all()
except OperationalError:
# This exception clause is for users running MySQL which likes to terminate connections on its own
# without telling anyone. See bug #927473. However, other dbms can raise it, usually in a
# non-recoverable way. So we only retry 3 times.
log.exception('Probably a MySQL issue, "MySQL has gone away"')
if try_count >= 2:
raise
def get_object_count(self, object_class: Type[DeclarativeMeta], filter_clause=None):
"""
Returns a count of the number of objects in the database.
:param object_class: The type of objects to return.
:param filter_clause: The filter governing selection of objects to return. Defaults to None.
"""
query = select(object_class)
if filter_clause is not None:
query = query.where(filter_clause)
for try_count in range(3):
try:
return self.session.execute(query.with_only_columns([func.count()])).scalar()
except OperationalError:
# This exception clause is for users running MySQL which likes to terminate connections on its own
# without telling anyone. See bug #927473. However, other dbms can raise it, usually in a
# non-recoverable way. So we only retry 3 times.
log.exception('Probably a MySQL issue, "MySQL has gone away"')
if try_count >= 2:
raise
def delete_object(self, object_class: Type[DeclarativeMeta], key: Union[int, str]):
"""
Delete an object from the database
:param object_class: The type of object to delete
:param key: The unique reference or primary key for the instance to be deleted
"""
if key != 0:
object_instance = self.get_object(object_class, key)
for try_count in range(3):
try:
self.session.delete(object_instance)
self.session.commit()
self.is_dirty = True
return True
except OperationalError:
# This exception clause is for users running MySQL which likes to terminate connections on its own
# without telling anyone. See bug #927473. However, other dbms can raise it, usually in a
# non-recoverable way. So we only retry 3 times.
log.exception('Probably a MySQL issue, "MySQL has gone away"')
self.session.rollback()
if try_count >= 2:
raise
except InvalidRequestError:
self.session.rollback()
log.exception('Failed to delete object')
return False
except Exception:
self.session.rollback()
raise
else:
return True
def delete_all_objects(self, object_class, filter_clause=None):
"""
Delete all object records. This method should only be used for simple tables and **not** ones with
relationships. The relationships are not deleted from the database and this will lead to database corruption.
:param object_class: The type of object to delete
:param filter_clause: The filter governing selection of objects to return. Defaults to None.
"""
for try_count in range(3):
try:
query = delete(object_class)
if filter_clause is not None:
query = query.where(filter_clause)
self.session.execute(query.execution_options(synchronize_session=False))
self.session.commit()
self.is_dirty = True
return True
except OperationalError:
# This exception clause is for users running MySQL which likes to terminate connections on its own
# without telling anyone. See bug #927473. However, other dbms can raise it, usually in a
# non-recoverable way. So we only retry 3 times.
log.exception('Probably a MySQL issue, "MySQL has gone away"')
self.session.rollback()
if try_count >= 2:
raise
except InvalidRequestError:
self.session.rollback()
log.exception('Failed to delete {name} records'.format(name=object_class.__name__))
return False
except Exception:
self.session.rollback()
raise
def finalise(self):
"""
VACUUM the database on exit.
"""
if self.is_dirty:
engine = create_engine(self.db_url)
if self.db_url.startswith('sqlite'):
try:
engine.execute("vacuum")
except OperationalError:
# Just ignore the operational error
pass
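DBManager itself needs OpenLP's Registry, settings and an init_schema callable, so it is hard to exercise standalone. The sketch below is my own, not from the commit; it shows the 2.0-style query that methods such as get_all_objects() now build internally, using a hypothetical Item model and an in-memory SQLite engine.

from sqlalchemy import Column, Integer, Unicode, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Item(Base):
    # Hypothetical model, for illustration only.
    __tablename__ = 'item'
    id = Column(Integer, primary_key=True)
    name = Column(Unicode(255))

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add_all([Item(name='beta'), Item(name='alpha')])
    session.commit()
    # Roughly what manager.get_all_objects(Item, Item.name != 'gamma', Item.name) executes:
    query = select(Item).where(Item.name != 'gamma').order_by(Item.name)
    items = session.execute(query).scalars().all()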

70
openlp/core/db/mixins.py Normal file
View File

@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2023 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
"""
The :mod:`~openlp.core.db.mixins` module provides some database mixins for OpenLP
"""
from sqlalchemy import Column, ForeignKey
from sqlalchemy.types import Integer, Unicode
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import backref, relationship
class CommonMixin(object):
"""
Base class to automate table name and ID column.
"""
@declared_attr
def __tablename__(self):
return self.__name__.lower()
id = Column(Integer, primary_key=True)
class FolderMixin(CommonMixin):
"""
A mixin to provide most of the fields needed for folder support
"""
name = Column(Unicode(255), nullable=False, index=True)
@declared_attr
def parent_id(self):
return Column(Integer, ForeignKey('folder.id'))
@declared_attr
def folders(self):
return relationship('Folder', backref=backref('parent', remote_side='Folder.id'), order_by='Folder.name')
@declared_attr
def items(self):
return relationship('Item', backref='folder', order_by='Item.name')
class ItemMixin(CommonMixin):
"""
A mixin to provide most of the fields needed for items stored in folders
"""
name = Column(Unicode(255), nullable=False, index=True)
file_path = Column(Unicode(255))
file_hash = Column(Unicode(255))
@declared_attr
def folder_id(self):
return Column(Integer, ForeignKey('folder.id'))
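How these mixins are intended to be combined, as I read the code above (not an example shipped in this commit): the relationship() strings reference classes named Folder and Item, so concrete models are expected to use exactly those names.

from sqlalchemy.orm import declarative_base

from openlp.core.db.mixins import FolderMixin, ItemMixin

Base = declarative_base()

class Folder(Base, FolderMixin):
    # Table 'folder' with id, name and parent_id, plus self-referential folders/parent.
    pass

class Item(Base, ItemMixin):
    # Table 'item' with id, name, file_path, file_hash and a folder_id foreign key.
    pass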

77
openlp/core/db/types.py Normal file
View File

@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2023 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
"""
The :mod:`~openlp.core.db.types` module provides additional database column types
"""
import json
from pathlib import Path
from sqlalchemy.types import TypeDecorator, Unicode, UnicodeText
from openlp.core.common.applocation import AppLocation
from openlp.core.common.json import OpenLPJSONDecoder, OpenLPJSONEncoder
class PathType(TypeDecorator):
"""
Create a PathType for storing Path objects with SQLAlchemy. Behind the scenes we convert the Path object to a JSON
representation and store it as a Unicode type
"""
impl = Unicode
cache_ok = True
def coerce_compared_value(self, op, value):
"""
Sometimes it makes sense to compare a PathType with a string. In the case where a string is used, coerce the
PathType to a UnicodeText type.
:param op: The operation being carried out. Not used, as we only care about the type that is being used with the
operation.
:param pathlib.Path | str value: The value being used for the comparison. Most likely a Path Object or str.
:return PathType | UnicodeText: The coerced value stored in the db
"""
if isinstance(value, str):
return UnicodeText()
else:
return self
def process_bind_param(self, value: Path, dialect) -> str:
"""
Convert the Path object to a JSON representation
:param pathlib.Path value: The value to convert
:param dialect: Not used
:return str: The Path object as a JSON string
"""
data_path = AppLocation.get_data_path()
return json.dumps(value, cls=OpenLPJSONEncoder, base_path=data_path)
def process_result_value(self, value: str, dialect) -> Path:
"""
Convert the JSON representation back
:param types.UnicodeText value: The value to convert
:param dialect: Not used
:return: The JSON string converted back to a Python object (in this case it should be a Path object)
:rtype: pathlib.Path
"""
data_path = AppLocation.get_data_path()
return json.loads(value, cls=OpenLPJSONDecoder, base_path=data_path)
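Declaring a column with PathType, as a brief sketch of mine rather than code from the commit. Only the declaration is shown, because actually binding or loading values goes through AppLocation.get_data_path(), which requires OpenLP's settings to be set up.

from sqlalchemy import Column, Integer
from sqlalchemy.orm import declarative_base

from openlp.core.db.types import PathType

Base = declarative_base()

class MediaFile(Base):
    # Hypothetical model, for illustration only.
    __tablename__ = 'media_file'
    id = Column(Integer, primary_key=True)
    file_path = Column(PathType())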

113
openlp/core/db/upgrades.py Normal file
View File

@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2023 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
"""
The :mod:`~openlp.core.db.upgrades` module contains the database upgrade functionality
"""
import logging
from types import ModuleType
from typing import Tuple
from alembic.migration import MigrationContext
from alembic.operations import Operations
from sqlalchemy import Column
from sqlalchemy.exc import DBAPIError, SQLAlchemyError
from sqlalchemy.orm import Session, declarative_base
from sqlalchemy.types import Unicode, UnicodeText
from openlp.core.db.helpers import database_exists, init_db
log = logging.getLogger(__name__)
def get_upgrade_op(session: Session) -> Operations:
"""
Create a migration context and an operations object for performing upgrades.
:param session: The SQLAlchemy session object.
"""
context = MigrationContext.configure(session.bind.connect())
return Operations(context)
def upgrade_db(url: str, upgrade: ModuleType) -> Tuple[int, int]:
"""
Upgrade a database.
:param url: The url of the database to upgrade.
:param upgrade: The python module that contains the upgrade instructions.
"""
log.debug('Checking upgrades for DB {db}'.format(db=url))
if not database_exists(url):
log.warning("Database {db} doesn't exist - skipping upgrade checks".format(db=url))
return 0, 0
Base = declarative_base()
class Metadata(Base):
"""
Provides a class for the metadata table.
"""
__tablename__ = 'metadata'
key = Column(Unicode(64), primary_key=True)
value = Column(UnicodeText(), default=None)
session, metadata = init_db(url, base=Base)
metadata.create_all(bind=metadata.bind, checkfirst=True)
version_meta = session.get(Metadata, 'version')
if version_meta:
version = int(version_meta.value)
else:
# Due to issues with other checks, if the version is not set in the DB then default to 0
# and let the upgrade function handle the checks
version = 0
version_meta = Metadata(key='version', value=version)
session.add(version_meta)
session.commit()
if version > upgrade.__version__:
session.remove()
return version, upgrade.__version__
version += 1
try:
while hasattr(upgrade, 'upgrade_{version:d}'.format(version=version)):
log.debug('Running upgrade_{version:d}'.format(version=version))
try:
upgrade_func = getattr(upgrade, 'upgrade_{version:d}'.format(version=version))
upgrade_func(session, metadata)
session.commit()
# Update the version number AFTER a commit so that we are sure the previous transaction happened
version_meta.value = str(version)
session.add(version_meta)
session.commit()
version += 1
except (SQLAlchemyError, DBAPIError):
log.exception('Could not run database upgrade script '
'"upgrade_{version:d}", upgrade process has been halted.'.format(version=version))
break
except (SQLAlchemyError, DBAPIError) as e:
version_meta = Metadata(key='version', value=int(upgrade.__version__))
session.commit()
print('Got exception outside upgrades', e)
upgrade_version = upgrade.__version__
version = int(version_meta.value)
session.remove()
return version, upgrade_version
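The upgrade module that upgrade_db() consumes follows a simple contract, inferred from the loop above (my sketch, not a module from this commit): a module-level __version__ plus numbered upgrade_1(), upgrade_2(), ... functions that each take the session and metadata. The file path, table and column names below are hypothetical.

# Contents of a hypothetical plugins/example/upgrade.py
from sqlalchemy import Column
from sqlalchemy.types import Unicode

from openlp.core.db.upgrades import get_upgrade_op

__version__ = 1


def upgrade_1(session, metadata):
    # Hypothetical step: add a 'comment' column to an existing 'song' table.
    op = get_upgrade_op(session)
    op.add_column('song', Column('comment', Unicode(255), server_default=''))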

View File

@ -1,701 +0,0 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2023 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
"""
The :mod:`db` module provides the core database functionality for OpenLP
"""
import json
import logging
import os
from copy import copy
from pathlib import Path
from types import ModuleType
from typing import Optional, Tuple, Union
from urllib.parse import quote_plus as urlquote
from alembic.migration import MigrationContext
from alembic.operations import Operations
from sqlalchemy import Column, ForeignKey, MetaData, create_engine
from sqlalchemy.engine.url import URL, make_url
from sqlalchemy.exc import DBAPIError, InvalidRequestError, OperationalError, ProgrammingError, SQLAlchemyError
from sqlalchemy.orm import Session, backref, relationship, scoped_session, sessionmaker
from sqlalchemy.pool import NullPool
from sqlalchemy.types import Integer, TypeDecorator, Unicode, UnicodeText
# Maintain backwards compatibility with older versions of SQLAlchemy while supporting SQLAlchemy 1.4+
try:
from sqlalchemy.orm import declarative_base, declared_attr
from sqlalchemy.orm.decl_api import DeclarativeMeta
except ImportError:
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.ext.declarative.api import DeclarativeMeta
from openlp.core.common import delete_file
from openlp.core.common.applocation import AppLocation
from openlp.core.common.i18n import translate
from openlp.core.common.json import OpenLPJSONDecoder, OpenLPJSONEncoder
from openlp.core.common.registry import Registry
from openlp.core.lib.ui import critical_error_message_box
log = logging.getLogger(__name__)
def _set_url_database(url, database):
try:
ret = URL.create(
drivername=url.drivername,
username=url.username,
password=url.password,
host=url.host,
port=url.port,
database=database,
query=url.query
)
except AttributeError: # SQLAlchemy <1.4
url.database = database
ret = url
assert ret.database == database, ret
return ret
def _get_scalar_result(engine, sql):
with engine.connect() as conn:
return conn.scalar(sql)
def _sqlite_file_exists(database: str) -> bool:
if not os.path.isfile(database) or os.path.getsize(database) < 100:
return False
with open(database, 'rb') as f:
header = f.read(100)
return header[:16] == b'SQLite format 3\x00'
def database_exists(url):
"""Check if a database exists.
:param url: A SQLAlchemy engine URL.
Performs backend-specific testing to quickly determine if a database
exists on the server. ::
database_exists('postgresql://postgres@localhost/name') #=> False
create_database('postgresql://postgres@localhost/name')
database_exists('postgresql://postgres@localhost/name') #=> True
Supports checking against a constructed URL as well. ::
engine = create_engine('postgresql://postgres@localhost/name')
database_exists(engine.url) #=> False
create_database(engine.url)
database_exists(engine.url) #=> True
Borrowed from SQLAlchemy_Utils since we only need this one function.
Copied from a fork/pull request since SQLAlchemy_Utils didn't support SQLAlchemy 1.4 when it was released:
https://github.com/nsoranzo/sqlalchemy-utils/blob/4f52578/sqlalchemy_utils/functions/database.py
"""
url = copy(make_url(url))
database = url.database
dialect_name = url.get_dialect().name
if dialect_name == 'postgresql':
text = "SELECT 1 FROM pg_database WHERE datname='%s'" % database
for db in (database, 'postgres', 'template1', 'template0', None):
url = _set_url_database(url, database=db)
engine = create_engine(url, poolclass=NullPool)
try:
return bool(_get_scalar_result(engine, text))
except (ProgrammingError, OperationalError):
pass
return False
elif dialect_name == 'mysql':
url = _set_url_database(url, database=None)
engine = create_engine(url, poolclass=NullPool)
text = ("SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA "
"WHERE SCHEMA_NAME = '%s'" % database)
return bool(_get_scalar_result(engine, text))
elif dialect_name == 'sqlite':
url = _set_url_database(url, database=None)
engine = create_engine(url, poolclass=NullPool)
if database:
return database == ':memory:' or _sqlite_file_exists(database)
else:
# The default SQLAlchemy database is in memory,
# and ':memory:' is not required, thus we should support that use-case
return True
else:
text = 'SELECT 1'
try:
engine = create_engine(url, poolclass=NullPool)
return bool(_get_scalar_result(engine, text))
except (ProgrammingError, OperationalError):
return False
def init_db(url: str, auto_flush: bool = True, auto_commit: bool = False, base: Optional[DeclarativeMeta] = None) \
-> Tuple[Session, MetaData]:
"""
Initialise and return the session and metadata for a database
:param url: The database to initialise connection with
:param auto_flush: Sets the flushing behaviour of the session
:param auto_commit: Sets the commit behaviour of the session
:param base: If using declarative, the base class to bind with
"""
engine = create_engine(url, poolclass=NullPool)
if base is None:
metadata = MetaData(bind=engine)
else:
base.metadata.bind = engine
metadata = base.metadata
session = scoped_session(sessionmaker(autoflush=auto_flush, autocommit=auto_commit, bind=engine))
return session, metadata
def get_db_path(plugin_name: str, db_file_name: Union[Path, str, None] = None) -> str:
"""
Create a path to a database from the plugin name and database name
:param plugin_name: Name of plugin
:param pathlib.Path | str | None db_file_name: File name of database
:return: The path to the database
:rtype: str
"""
if db_file_name is None:
return 'sqlite:///{path}/{plugin}.sqlite'.format(path=AppLocation.get_section_data_path(plugin_name),
plugin=plugin_name)
elif os.path.isabs(db_file_name):
return 'sqlite:///{db_file_name}'.format(db_file_name=db_file_name)
else:
return 'sqlite:///{path}/{name}'.format(path=AppLocation.get_section_data_path(plugin_name), name=db_file_name)
def handle_db_error(plugin_name: str, db_file_path: Path):
"""
Log and report to the user that a database cannot be loaded
:param plugin_name: Name of plugin
:param pathlib.Path db_file_path: File name of database
:return: None
"""
db_path = get_db_path(plugin_name, db_file_path)
log.exception('Error loading database: {db}'.format(db=db_path))
critical_error_message_box(translate('OpenLP.Manager', 'Database Error'),
translate('OpenLP.Manager',
'OpenLP cannot load your database.\n\nDatabase: {db}').format(db=db_path))
def init_url(plugin_name: str, db_file_name: Union[Path, str, None] = None) -> str:
"""
Construct the connection string for a database.
:param plugin_name: The name of the plugin for the database creation.
:param pathlib.Path | str | None db_file_name: The database file name. Defaults to None resulting in the plugin_name
being used.
:return: The database URL
:rtype: str
"""
settings = Registry().get('settings')
db_type = settings.value(f'{plugin_name}/db type')
if db_type == 'sqlite':
db_url = get_db_path(plugin_name, db_file_name)
else:
db_url = '{type}://{user}:{password}@{host}/{db}'.format(type=db_type,
user=urlquote(settings.value('db username')),
password=urlquote(settings.value('db password')),
host=urlquote(settings.value('db hostname')),
db=urlquote(settings.value('db database')))
return db_url
def get_upgrade_op(session: Session) -> Operations:
"""
Create a migration context and an operations object for performing upgrades.
:param session: The SQLAlchemy session object.
"""
context = MigrationContext.configure(session.bind.connect())
return Operations(context)
class CommonMixin(object):
"""
Base class to automate table name and ID column.
"""
@declared_attr
def __tablename__(self):
return self.__name__.lower()
id = Column(Integer, primary_key=True)
class FolderMixin(CommonMixin):
"""
A mixin to provide most of the fields needed for folder support
"""
name = Column(Unicode(255), nullable=False, index=True)
@declared_attr
def parent_id(self):
return Column(Integer(), ForeignKey('folder.id'))
@declared_attr
def folders(self):
return relationship('Folder', backref=backref('parent', remote_side='Folder.id'), order_by='Folder.name')
@declared_attr
def items(self):
return relationship('Item', backref='folder', order_by='Item.name')
class ItemMixin(CommonMixin):
"""
A mixin to provide most of the fields needed for folder support
"""
name = Column(Unicode(255), nullable=False, index=True)
file_path = Column(Unicode(255))
file_hash = Column(Unicode(255))
@declared_attr
def folder_id(self):
return Column(Integer(), ForeignKey('folder.id'))
class BaseModel(object):
"""
BaseModel provides a base object with a set of generic functions
"""
@classmethod
def populate(cls, **kwargs):
"""
Creates an instance of a class and populates it, returning the instance
"""
instance = cls()
for key, value in kwargs.items():
instance.__setattr__(key, value)
return instance
class PathType(TypeDecorator):
"""
Create a PathType for storing Path objects with SQLAlchemy. Behind the scenes we convert the Path object to a JSON
representation and store it as a Unicode type
"""
impl = Unicode
cache_ok = True
def coerce_compared_value(self, op, value):
"""
Sometimes it makes sense to compare a PathType with a string. In the case where a string is used, coerce the
PathType to a UnicodeText type.
:param op: The operation being carried out. Not used, as we only care about the type that is being used with the
operation.
:param pathlib.Path | str value: The value being used for the comparison. Most likely a Path Object or str.
:return PathType | UnicodeText: The coerced value stored in the db
"""
if isinstance(value, str):
return UnicodeText()
else:
return self
def process_bind_param(self, value, dialect):
"""
Convert the Path object to a JSON representation
:param pathlib.Path value: The value to convert
:param dialect: Not used
:return str: The Path object as a JSON string
"""
data_path = AppLocation.get_data_path()
return json.dumps(value, cls=OpenLPJSONEncoder, base_path=data_path)
def process_result_value(self, value, dialect):
"""
Convert the JSON representation back
:param types.UnicodeText value: The value to convert
:param dialect: Not used
:return: The JSON string converted back to a Python object (in this case it should be a Path object)
:rtype: pathlib.Path
"""
data_path = AppLocation.get_data_path()
return json.loads(value, cls=OpenLPJSONDecoder, base_path=data_path)
def upgrade_db(url: str, upgrade: ModuleType) -> Tuple[int, int]:
"""
Upgrade a database.
:param url: The url of the database to upgrade.
:param upgrade: The python module that contains the upgrade instructions.
"""
log.debug('Checking upgrades for DB {db}'.format(db=url))
if not database_exists(url):
log.warning("Database {db} doesn't exist - skipping upgrade checks".format(db=url))
return 0, 0
Base = declarative_base(MetaData)
class Metadata(Base):
"""
Provides a class for the metadata table.
"""
__tablename__ = 'metadata'
key = Column(Unicode(64), primary_key=True)
value = Column(UnicodeText(), default=None)
session, metadata = init_db(url, base=Base)
metadata.create_all(checkfirst=True)
version_meta = session.query(Metadata).get('version')
if version_meta:
version = int(version_meta.value)
else:
# Due to issues with other checks, if the version is not set in the DB then default to 0
# and let the upgrade function handle the checks
version = 0
version_meta = Metadata(key='version', value=version)
session.add(version_meta)
session.commit()
if version > upgrade.__version__:
session.remove()
return version, upgrade.__version__
version += 1
try:
while hasattr(upgrade, 'upgrade_{version:d}'.format(version=version)):
log.debug('Running upgrade_{version:d}'.format(version=version))
try:
upgrade_func = getattr(upgrade, 'upgrade_{version:d}'.format(version=version))
upgrade_func(session, metadata)
session.commit()
# Update the version number AFTER a commit so that we are sure the previous transaction happened
version_meta.value = str(version)
session.commit()
version += 1
except (SQLAlchemyError, DBAPIError):
log.exception('Could not run database upgrade script '
'"upgrade_{version:d}", upgrade process has been halted.'.format(version=version))
break
except (SQLAlchemyError, DBAPIError):
version_meta = Metadata(key='version', value=int(upgrade.__version__))
session.commit()
upgrade_version = upgrade.__version__
version = int(version_meta.value)
session.remove()
return version, upgrade_version
def delete_database(plugin_name: str, db_file_name: Optional[str] = None):
"""
Remove a database file from the system.
:param plugin_name: The name of the plugin to remove the database for
:param db_file_name: The database file name. Defaults to None resulting in the plugin_name being used.
"""
db_file_path = AppLocation.get_section_data_path(plugin_name)
if db_file_name:
db_file_path = db_file_path / db_file_name
else:
db_file_path = db_file_path / plugin_name
return delete_file(db_file_path)
class Manager(object):
"""
Provide generic object persistence management
"""
def __init__(self, plugin_name, init_schema, db_file_path=None, upgrade_mod=None, session=None):
"""
Runs the initialisation process that includes creating the connection to the database and the tables if they do
not exist.
:param plugin_name: The name to setup paths and settings section names
:param init_schema: The init_schema function for this database
:param pathlib.Path | None db_file_path: The file name to use for this database. Defaults to None resulting in
the plugin_name being used.
:param upgrade_mod: The upgrade_schema function for this database
"""
super().__init__()
self.is_dirty = False
self.session = None
self.db_url = None
log.debug('Manager: Creating new DB url')
self.db_url = init_url(plugin_name, db_file_path)
if not session:
try:
self.session = init_schema(self.db_url)
except (SQLAlchemyError, DBAPIError):
handle_db_error(plugin_name, db_file_path)
else:
self.session = session
if upgrade_mod:
try:
db_ver, up_ver = upgrade_db(self.db_url, upgrade_mod)
except (SQLAlchemyError, DBAPIError):
handle_db_error(plugin_name, db_file_path)
return
if db_ver > up_ver:
critical_error_message_box(
translate('OpenLP.Manager', 'Database Error'),
translate('OpenLP.Manager', 'The database being loaded was created in a more recent version of '
'OpenLP. The database is version {db_ver}, while OpenLP expects version {db_up}. '
'The database will not be loaded.\n\nDatabase: {db_name}').format(db_ver=db_ver,
db_up=up_ver,
db_name=self.db_url))
return
def save_object(self, object_instance, commit=True):
"""
Save an object to the database
:param object_instance: The object to save
:param commit: Commit the session with this object
"""
for try_count in range(3):
try:
self.session.add(object_instance)
if commit:
self.session.commit()
self.is_dirty = True
return True
except OperationalError:
# This exception clause is for users running MySQL which likes to terminate connections on its own
# without telling anyone. See bug #927473. However, other dbms can raise it, usually in a
# non-recoverable way. So we only retry 3 times.
log.exception('Probably a MySQL issue - "MySQL has gone away"')
self.session.rollback()
if try_count >= 2:
raise
except InvalidRequestError:
self.session.rollback()
log.exception('Object list save failed')
return False
except Exception:
self.session.rollback()
raise
def save_objects(self, object_list, commit=True):
"""
Save a list of objects to the database
:param object_list: The list of objects to save
:param commit: Commit the session with this object
"""
for try_count in range(3):
try:
self.session.add_all(object_list)
if commit:
self.session.commit()
self.is_dirty = True
return True
except OperationalError:
# This exception clause is for users running MySQL which likes to terminate connections on its own
# without telling anyone. See bug #927473. However, other dbms can raise it, usually in a
# non-recoverable way. So we only retry 3 times.
log.exception('Probably a MySQL issue, "MySQL has gone away"')
self.session.rollback()
if try_count >= 2:
raise
except InvalidRequestError:
self.session.rollback()
log.exception('Object list save failed')
return False
except Exception:
self.session.rollback()
raise
def get_object(self, object_class, key=None):
"""
Return the details of an object
:param object_class: The type of object to return
:param key: The unique reference or primary key for the instance to return
"""
if not key:
return object_class()
else:
for try_count in range(3):
try:
return self.session.query(object_class).get(key)
except OperationalError:
# This exception clause is for users running MySQL which likes to terminate connections on its own
# without telling anyone. See bug #927473. However, other dbms can raise it, usually in a
# non-recoverable way. So we only retry 3 times.
log.exception('Probably a MySQL issue, "MySQL has gone away"')
if try_count >= 2:
raise
def get_object_filtered(self, object_class, *filter_clauses):
"""
Returns an object matching specified criteria
:param object_class: The type of object to return
:param filter_clause: The criteria to select the object by
"""
query = self.session.query(object_class)
for filter_clause in filter_clauses:
query = query.filter(filter_clause)
for try_count in range(3):
try:
return query.first()
except OperationalError as oe:
# This exception clause is for users running MySQL which likes to terminate connections on its own
# without telling anyone. See bug #927473. However, other dbms can raise it, usually in a
# non-recoverable way. So we only retry 3 times.
if try_count >= 2 or 'MySQL has gone away' in str(oe):
raise
log.exception('Probably a MySQL issue, "MySQL has gone away"')
def get_all_objects(self, object_class, filter_clause=None, order_by_ref=None):
"""
Returns all the objects from the database
:param object_class: The type of objects to return
:param filter_clause: The filter governing selection of objects to return. Defaults to None.
:param order_by_ref: Any parameters to order the returned objects by. Defaults to None.
"""
query = self.session.query(object_class)
# Check filter_clause
if filter_clause is not None:
if isinstance(filter_clause, list):
for dbfilter in filter_clause:
query = query.filter(dbfilter)
else:
query = query.filter(filter_clause)
# Check order_by_ref
if order_by_ref is not None:
if isinstance(order_by_ref, list):
query = query.order_by(*order_by_ref)
else:
query = query.order_by(order_by_ref)
for try_count in range(3):
try:
return query.all()
except OperationalError:
# This exception clause is for users running MySQL which likes to terminate connections on its own
# without telling anyone. See bug #927473. However, other dbms can raise it, usually in a
# non-recoverable way. So we only retry 3 times.
log.exception('Probably a MySQL issue, "MySQL has gone away"')
if try_count >= 2:
raise
def get_object_count(self, object_class, filter_clause=None):
"""
Returns a count of the number of objects in the database.
:param object_class: The type of objects to return.
:param filter_clause: The filter governing selection of objects to return. Defaults to None.
"""
query = self.session.query(object_class)
if filter_clause is not None:
query = query.filter(filter_clause)
for try_count in range(3):
try:
return query.count()
except OperationalError:
# This exception clause is for users running MySQL which likes to terminate connections on its own
# without telling anyone. See bug #927473. However, other dbms can raise it, usually in a
# non-recoverable way. So we only retry 3 times.
log.exception('Probably a MySQL issue, "MySQL has gone away"')
if try_count >= 2:
raise
def delete_object(self, object_class, key):
"""
Delete an object from the database
:param object_class: The type of object to delete
:param key: The unique reference or primary key for the instance to be deleted
"""
if key != 0:
object_instance = self.get_object(object_class, key)
for try_count in range(3):
try:
self.session.delete(object_instance)
self.session.commit()
self.is_dirty = True
return True
except OperationalError:
# This exception clause is for users running MySQL which likes to terminate connections on its own
# without telling anyone. See bug #927473. However, other dbms can raise it, usually in a
# non-recoverable way. So we only retry 3 times.
log.exception('Probably a MySQL issue, "MySQL has gone away"')
self.session.rollback()
if try_count >= 2:
raise
except InvalidRequestError:
self.session.rollback()
log.exception('Failed to delete object')
return False
except Exception:
self.session.rollback()
raise
else:
return True
def delete_all_objects(self, object_class, filter_clause=None):
"""
Delete all object records. This method should only be used for simple tables and **not** ones with
relationships. The relationships are not deleted from the database and this will lead to database corruptions.
:param object_class: The type of object to delete
:param filter_clause: The filter governing selection of objects to return. Defaults to None.
"""
for try_count in range(3):
try:
query = self.session.query(object_class)
if filter_clause is not None:
query = query.filter(filter_clause)
query.delete(synchronize_session=False)
self.session.commit()
self.is_dirty = True
return True
except OperationalError:
# This exception clause is for users running MySQL which likes to terminate connections on its own
# without telling anyone. See bug #927473. However, other dbms can raise it, usually in a
# non-recoverable way. So we only retry 3 times.
log.exception('Probably a MySQL issue, "MySQL has gone away"')
self.session.rollback()
if try_count >= 2:
raise
except InvalidRequestError:
self.session.rollback()
log.exception('Failed to delete {name} records'.format(name=object_class.__name__))
return False
except Exception:
self.session.rollback()
raise
def finalise(self):
"""
VACUUM the database on exit.
"""
if self.is_dirty:
engine = create_engine(self.db_url)
if self.db_url.startswith('sqlite'):
try:
engine.execute("vacuum")
except OperationalError:
# Just ignore the operational error
pass

View File

@ -35,11 +35,12 @@ The Projector table keeps track of entries for controlled projectors.
import logging
from sqlalchemy import Column, ForeignKey, Integer, MetaData, String, and_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy import Column, ForeignKey, Integer, String, and_
from sqlalchemy.orm import declarative_base, relationship
from openlp.core.lib.db import CommonMixin, Manager, init_db, init_url
from openlp.core.db.helpers import init_db, init_url
from openlp.core.db.manager import DBManager
from openlp.core.db.mixins import CommonMixin
from openlp.core.projectors import upgrade
from openlp.core.projectors.constants import PJLINK_DEFAULT_CODES
@ -48,7 +49,7 @@ log = logging.getLogger(__name__)
log.debug('projector.lib.db module loaded')
Base = declarative_base(MetaData())
Base = declarative_base()
class Manufacturer(Base, CommonMixin):
@ -206,12 +207,8 @@ class Projector(Base, CommonMixin):
sw_version = Column(String(30))
model_filter = Column(String(30))
model_lamp = Column(String(30))
source_list = relationship('ProjectorSource',
order_by='ProjectorSource.code',
backref='projector',
cascade='all, delete-orphan',
primaryjoin='Projector.id==ProjectorSource.projector_id',
lazy='joined')
source_list = relationship('ProjectorSource', order_by='ProjectorSource.code', back_populates='projector',
cascade='all, delete-orphan')
class ProjectorSource(Base, CommonMixin):
@ -240,8 +237,10 @@ class ProjectorSource(Base, CommonMixin):
text = Column(String(20))
projector_id = Column(Integer, ForeignKey('projector.id'))
projector = relationship('Projector', back_populates='source_list')
class ProjectorDB(Manager):
class ProjectorDB(DBManager):
"""
Class to access the projector database.
"""
@ -261,7 +260,7 @@ class ProjectorDB(Manager):
"""
self.db_url = init_url('projector')
session, metadata = init_db(self.db_url, base=Base)
metadata.create_all(checkfirst=True)
metadata.create_all(bind=metadata.bind, checkfirst=True)
return session
def get_projector(self, *args, **kwargs):
@ -318,7 +317,7 @@ class ProjectorDB(Manager):
log.warning('get_projector(): No valid query found - cancelled')
return None
return self.get_all_objects(object_class=Projector, filter_clause=db_filter)
return self.get_all_objects(Projector, db_filter)
def get_projector_by_id(self, dbid):
"""
@ -328,7 +327,7 @@ class ProjectorDB(Manager):
:returns: Projector() instance
"""
log.debug('get_projector_by_id(id="{data}")'.format(data=dbid))
projector = self.get_object_filtered(Projector, Projector.id == dbid)
projector = self.get_object(Projector, dbid)
if projector is None:
# Not found
log.warning('get_projector_by_id() did not find {data}'.format(data=id))
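The change from backref to back_populates above declares the Projector/ProjectorSource link explicitly on both sides. A self-contained sketch of the same pattern with generic, illustrative class names (not OpenLP models):

from sqlalchemy import Column, ForeignKey, Integer
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()


class Parent(Base):
    __tablename__ = 'parent'
    id = Column(Integer, primary_key=True)
    # Both sides are declared explicitly and point at each other via back_populates,
    # replacing the implicit attribute that backref used to create.
    children = relationship('Child', back_populates='parent', cascade='all, delete-orphan')


class Child(Base):
    __tablename__ = 'child'
    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey('parent.id'))
    parent = relationship('Parent', back_populates='children')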

View File

@ -27,7 +27,7 @@ import logging
from sqlalchemy import Column, Table, types
from sqlalchemy.sql.expression import null
from openlp.core.lib.db import get_upgrade_op
from openlp.core.db.upgrades import get_upgrade_op
log = logging.getLogger(__name__)
@ -61,7 +61,7 @@ def upgrade_2(session, metadata):
:param metadata: Metadata of current DB
"""
log.debug('Checking projector DB upgrade to version 2')
projector_table = Table('projector', metadata, autoload=True)
projector_table = Table('projector', metadata, autoload_with=metadata.bind)
upgrade_db = 'mac_adx' not in [col.name for col in projector_table.c.values()]
if upgrade_db:
new_op = get_upgrade_op(session)
@ -85,7 +85,7 @@ def upgrade_3(session, metadata):
:param metadata: Metadata of current DB
"""
log.debug('Checking projector DB upgrade to version 3')
projector_table = Table('projector', metadata, autoload=True)
projector_table = Table('projector', metadata, autoload_with=metadata.bind)
upgrade_db = 'pjlink_class' not in [col.name for col in projector_table.c.values()]
if upgrade_db:
new_op = get_upgrade_op(session)
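The reflection calls above replace the removed autoload=True keyword with an explicit autoload_with bind. A self-contained sketch against a throwaway in-memory database (illustrative columns only):

from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine

engine = create_engine('sqlite://')
setup_meta = MetaData()
Table('projector', setup_meta, Column('id', Integer, primary_key=True), Column('ip', String(100)))
setup_meta.create_all(engine)

# 2.0-style reflection: pass the engine (or connection) instead of autoload=True
reflected = MetaData()
projector_table = Table('projector', reflected, autoload_with=engine)
print([col.name for col in projector_table.c])  # ['id', 'ip']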

View File

@ -24,7 +24,7 @@ import logging
from openlp.core.state import State
from openlp.core.common.actions import ActionList
from openlp.core.common.i18n import UiStrings, translate
from openlp.core.lib.db import Manager
from openlp.core.db.manager import DBManager
from openlp.core.lib.plugin import Plugin, StringContent
from openlp.core.lib.theme import VerticalType
from openlp.core.lib.ui import create_action
@ -124,7 +124,7 @@ class AlertsPlugin(Plugin):
self.icon_path = UiIcons().alert
self.icon = self.icon_path
AlertsManager(self)
self.manager = Manager('alerts', init_schema)
self.manager = DBManager('alerts', init_schema)
self.alert_form = AlertForm(self)
State().add_service(self.name, self.weight, is_plugin=True)
State().update_pre_conditions(self.name, self.check_pre_conditions())

View File

@ -22,20 +22,14 @@
The :mod:`db` module provides the database and schema that is the backend for the Alerts plugin.
"""
from sqlalchemy import Column, MetaData
from sqlalchemy.orm import Session
from sqlalchemy import Column
from sqlalchemy.orm import Session, declarative_base
from sqlalchemy.types import Integer, UnicodeText
# Maintain backwards compatibility with older versions of SQLAlchemy while supporting SQLAlchemy 1.4+
try:
from sqlalchemy.orm import declarative_base
except ImportError:
from sqlalchemy.ext.declarative import declarative_base
from openlp.core.lib.db import init_db
from openlp.core.db.helpers import init_db
Base = declarative_base(MetaData())
Base = declarative_base()
class AlertItem(Base):
@ -55,5 +49,5 @@ def init_schema(url: str) -> Session:
The database to setup
"""
session, metadata = init_db(url, base=Base)
metadata.create_all(checkfirst=True)
metadata.create_all(bind=metadata.bind, checkfirst=True)
return session

View File

@ -36,7 +36,7 @@ except ImportError:
from openlp.core.common import trace_error_handler
from openlp.core.common.applocation import AppLocation
from openlp.core.common.i18n import UiStrings, get_locale_key, translate
from openlp.core.lib.db import delete_database
from openlp.core.db.helpers import delete_database
from openlp.core.lib.exceptions import ValidationError
from openlp.core.lib.ui import critical_error_message_box
from openlp.core.widgets.enums import PathEditType

View File

@ -27,22 +27,17 @@ from typing import Any, List, Optional, Tuple
import chardet
from PyQt5 import QtCore
from sqlalchemy import Column, ForeignKey, MetaData, func, or_
from sqlalchemy import Column, ForeignKey, func, or_
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import Session, relationship
from sqlalchemy.orm import Session, declarative_base, relationship
from sqlalchemy.types import Unicode, UnicodeText, Integer
# Maintain backwards compatibility with older versions of SQLAlchemy while supporting SQLAlchemy 1.4+
try:
from sqlalchemy.orm import declarative_base
except ImportError:
from sqlalchemy.ext.declarative import declarative_base
from openlp.core.common import clean_filename
from openlp.core.common.enum import LanguageSelection
from openlp.core.common.applocation import AppLocation
from openlp.core.common.i18n import translate
from openlp.core.lib.db import Manager, init_db
from openlp.core.db.helpers import init_db
from openlp.core.db.manager import DBManager
from openlp.core.lib.ui import critical_error_message_box
from openlp.plugins.bibles.lib import BibleStrings, upgrade
@ -52,7 +47,7 @@ log = logging.getLogger(__name__)
RESERVED_CHARACTERS = '\\.^$*+?{}[]()'
class BibleDB(Manager):
class BibleDB(DBManager):
"""
This class represents a database-bound Bible. It is used as a base class for all the custom importers, so that
they can implement their own import methods, but benefit from the database methods in here via inheritance,
@ -99,7 +94,7 @@ class BibleDB(Manager):
self.file_path = Path(clean_filename(self.name) + '.sqlite')
if 'file' in kwargs:
self.file_path = kwargs['file']
Manager.__init__(self, 'bibles', self.init_schema, self.file_path, upgrade)
DBManager.__init__(self, 'bibles', self.init_schema, self.file_path, upgrade)
if self.session and 'file' in kwargs:
self.get_name()
self._is_web_bible = None
@ -113,7 +108,7 @@ class BibleDB(Manager):
:param url: The database to setup.
"""
Base = declarative_base(MetaData)
Base = declarative_base()
class BibleMeta(Base):
"""
@ -167,7 +162,7 @@ class BibleDB(Manager):
self.Verse = Verse
session, metadata = init_db(url, base=Base)
metadata.create_all(checkfirst=True)
metadata.create_all(bind=metadata.bind, checkfirst=True)
return session
def get_name(self) -> str:
@ -447,7 +442,7 @@ class BibleDB(Manager):
log.debug(verses)
class BiblesResourcesDB(QtCore.QObject, Manager):
class BiblesResourcesDB(QtCore.QObject):
"""
This class represents the database-bound Bible Resources. It provides
some resources that are used in the Bibles plugin.
@ -740,7 +735,7 @@ class BiblesResourcesDB(QtCore.QObject, Manager):
]
class AlternativeBookNamesDB(Manager):
class AlternativeBookNamesDB(object):
"""
This class represents a database-bound alternative book names system.
"""

View File

@ -30,7 +30,7 @@ from sqlalchemy.sql.expression import delete, select
from openlp.core.common.i18n import translate
from openlp.core.common.registry import Registry
from openlp.core.common.settings import ProxyMode
from openlp.core.lib.db import get_upgrade_op
from openlp.core.db.upgrades import get_upgrade_op
log = logging.getLogger(__name__)
@ -54,13 +54,14 @@ def upgrade_2(session, metadata):
"""
settings = Registry().get('settings')
op = get_upgrade_op(session)
metadata_table = Table('metadata', metadata, autoload=True)
proxy, = session.execute(select([metadata_table.c.value], metadata_table.c.key == 'proxy_server')).first() or ('', )
metadata_table = Table('metadata', metadata, autoload_with=metadata.bind)
proxy, = session.execute(
select(metadata_table.c.value).where(metadata_table.c.key == 'proxy_server')).first() or ('', )
if proxy and not \
(proxy == settings.value('advanced/proxy http') or proxy == settings.value('advanced/proxy https')):
http_proxy = ''
https_proxy = ''
name, = session.execute(select([metadata_table.c.value], metadata_table.c.key == 'name')).first()
name, = session.execute(select(metadata_table.c.value).where(metadata_table.c.key == 'name')).first()
msg_box = QtWidgets.QMessageBox()
msg_box.setText(translate('BiblesPlugin', f'The proxy server {proxy} was found in the bible {name}.<br>'
f'Would you like to set it as the proxy for OpenLP?'))
@ -81,12 +82,13 @@ def upgrade_2(session, metadata):
settings.setValue('advanced/proxy https', proxy)
if http_proxy or https_proxy:
username, = session.execute(
select([metadata_table.c.value], metadata_table.c.key == 'proxy_username')).first()
proxy, = session.execute(select([metadata_table.c.value], metadata_table.c.key == 'proxy_password')).first()
select(metadata_table.c.value).where(metadata_table.c.key == 'proxy_username')).first()
proxy, = session.execute(
select(metadata_table.c.value).where(metadata_table.c.key == 'proxy_password')).first()
settings.setValue('advanced/proxy username', username)
settings.setValue('advanced/proxy password', proxy)
settings.setValue('advanced/proxy mode', ProxyMode.MANUAL_PROXY)
op.execute(delete(metadata_table, metadata_table.c.key == 'proxy_server'))
op.execute(delete(metadata_table, metadata_table.c.key == 'proxy_username'))
op.execute(delete(metadata_table, metadata_table.c.key == 'proxy_password'))
op.execute(delete(metadata_table).where(metadata_table.c.key == 'proxy_server'))
op.execute(delete(metadata_table).where(metadata_table.c.key == 'proxy_username'))
op.execute(delete(metadata_table).where(metadata_table.c.key == 'proxy_password'))
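The rewritten queries above move from the legacy select([columns], whereclause) and delete(table, whereclause) call forms to generative .where() calls. A standalone sketch of both forms against a toy metadata table (illustrative values):

from sqlalchemy import Column, MetaData, String, Table, create_engine, delete, select

engine = create_engine('sqlite://')
metadata = MetaData()
metadata_table = Table('metadata', metadata, Column('key', String(64), primary_key=True),
                       Column('value', String(256)))
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(metadata_table.insert(), [{'key': 'proxy_server', 'value': 'proxy.example.com'}])
    # 2.0-style: select(columns).where(criteria) instead of select([columns], criteria)
    proxy = conn.execute(
        select(metadata_table.c.value).where(metadata_table.c.key == 'proxy_server')).scalar()
    # 2.0-style: delete(table).where(criteria) instead of delete(table, criteria)
    conn.execute(delete(metadata_table).where(metadata_table.c.key == 'proxy_server'))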

View File

@ -28,7 +28,7 @@ import logging
from openlp.core.state import State
from openlp.core.common.i18n import translate
from openlp.core.lib import build_icon
from openlp.core.lib.db import Manager
from openlp.core.db.manager import DBManager
from openlp.core.lib.plugin import Plugin, StringContent
from openlp.core.ui.icons import UiIcons
from openlp.plugins.custom.lib.db import CustomSlide, init_schema
@ -51,7 +51,7 @@ class CustomPlugin(Plugin):
def __init__(self):
super(CustomPlugin, self).__init__('custom', CustomMediaItem, CustomTab)
self.weight = -5
self.db_manager = Manager('custom', init_schema)
self.db_manager = DBManager('custom', init_schema)
self.icon_path = UiIcons().custom
self.icon = build_icon(self.icon_path)
State().add_service(self.name, self.weight, is_plugin=True)

View File

@ -22,20 +22,15 @@
The :mod:`db` module provides the database and schema that is the backend for
the Custom plugin
"""
from sqlalchemy import Column, MetaData
from sqlalchemy import Column
from sqlalchemy.orm import declarative_base
from sqlalchemy.types import Integer, Unicode, UnicodeText
# Maintain backwards compatibility with older versions of SQLAlchemy while supporting SQLAlchemy 1.4+
try:
from sqlalchemy.orm import declarative_base
except ImportError:
from sqlalchemy.ext.declarative import declarative_base
from openlp.core.common.i18n import get_natural_key
from openlp.core.lib.db import init_db
from openlp.core.db.helpers import init_db
Base = declarative_base(MetaData())
Base = declarative_base()
class CustomSlide(Base):
@ -71,5 +66,5 @@ def init_schema(url):
:param url: The database to setup
"""
session, metadata = init_db(url, base=Base)
metadata.create_all(checkfirst=True)
metadata.create_all(bind=metadata.bind, checkfirst=True)
return session

View File

@ -23,7 +23,7 @@ import logging
from openlp.core.common.i18n import translate
from openlp.core.lib import build_icon
from openlp.core.lib.db import Manager
from openlp.core.db.manager import DBManager
from openlp.core.lib.plugin import Plugin, StringContent
from openlp.core.state import State
from openlp.core.ui.icons import UiIcons
@ -42,7 +42,7 @@ class ImagePlugin(Plugin):
def __init__(self):
super(ImagePlugin, self).__init__('images', ImageMediaItem, ImageTab)
self.manager = Manager('images', init_schema, upgrade_mod=upgrade)
self.manager = DBManager('images', init_schema, upgrade_mod=upgrade)
self.weight = -7
self.icon_path = UiIcons().picture
self.icon = build_icon(self.icon_path)

View File

@ -21,19 +21,13 @@
"""
The :mod:`db` module provides the database and schema that is the backend for the Images plugin.
"""
from sqlalchemy import MetaData
from sqlalchemy.orm import Session
from sqlalchemy.orm import Session, declarative_base
# Maintain backwards compatibility with older versions of SQLAlchemy while supporting SQLAlchemy 1.4+
try:
from sqlalchemy.orm import declarative_base
except ImportError:
from sqlalchemy.ext.declarative import declarative_base
from openlp.core.lib.db import FolderMixin, ItemMixin, init_db
from openlp.core.db.helpers import init_db
from openlp.core.db.mixins import FolderMixin, ItemMixin
Base = declarative_base(MetaData())
Base = declarative_base()
class Folder(Base, FolderMixin):
@ -74,5 +68,5 @@ def init_schema(url: str) -> Session:
* file_hash
"""
session, metadata = init_db(url, base=Base)
metadata.create_all(checkfirst=True)
metadata.create_all(bind=metadata.bind, checkfirst=True)
return session

View File

@ -26,15 +26,15 @@ import logging
import shutil
from pathlib import Path
from sqlalchemy import Column, ForeignKey, MetaData, Table, inspect
from sqlalchemy import Column, ForeignKey, MetaData, Table, inspect, select
from sqlalchemy.orm import Session
from sqlalchemy.types import Integer, Unicode
from openlp.core.common import sha256_file_hash
from openlp.core.common.applocation import AppLocation
from openlp.core.common.db import drop_columns
from openlp.core.common.json import OpenLPJSONEncoder, OpenLPJSONDecoder
from openlp.core.lib.db import PathType, get_upgrade_op
from openlp.core.db.types import PathType
from openlp.core.db.upgrades import get_upgrade_op
log = logging.getLogger(__name__)
@ -53,23 +53,26 @@ def upgrade_2(session: Session, metadata: MetaData):
Version 2 upgrade - Move file path from old db to JSON encoded path to new db. Added during 2.5 dev
"""
log.debug('Starting upgrade_2 for file_path to JSON')
old_table = Table('image_filenames', metadata, autoload=True)
if 'file_path' not in [col.name for col in old_table.c.values()]:
images_table = Table('image_filenames', metadata, extend_existing=True, autoload_with=metadata.bind)
if 'file_path' not in [col.name for col in images_table.c.values()]:
op = get_upgrade_op(session)
op.add_column('image_filenames', Column('file_path', PathType()))
with op.batch_alter_table('image_filenames') as batch_op:
batch_op.add_column(Column('file_path', PathType()))
# Refresh the table definition
images_table = Table('image_filenames', metadata, extend_existing=True, autoload_with=metadata.bind)
conn = op.get_bind()
results = conn.execute('SELECT * FROM image_filenames')
results = conn.execute(select(images_table))
data_path = AppLocation.get_data_path()
for row in results.fetchall():
file_path_json = json.dumps(Path(row.filename), cls=OpenLPJSONEncoder, base_path=data_path)
sql = 'UPDATE image_filenames SET file_path = :file_path_json WHERE id = :id'
conn.execute(sql, {'file_path_json': file_path_json, 'id': row.id})
conn.execute(images_table.update().where(images_table.c.id == row.id).values(file_path=file_path_json))
# Drop old columns
if metadata.bind.url.get_dialect().name == 'sqlite':
drop_columns(op, 'image_filenames', ['filename', ])
else:
op.drop_constraint('image_filenames', 'foreignkey')
op.drop_column('image_filenames', 'filenames')
with op.batch_alter_table('image_filenames') as batch_op:
# if metadata.bind.url.get_dialect().name != 'sqlite':
# for fk in old_table.foreign_keys:
# batch_op.drop_constraint(fk.name, 'foreignkey')
batch_op.drop_column('filename')
del images_table
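op.batch_alter_table, used above, is Alembic's portable way to add or drop columns on SQLite, which lacks most ALTER TABLE support; it copies the table, applies the changes and swaps it back in, much like the old drop_columns helper did by hand. A self-contained sketch (illustrative schema, not OpenLP code):

from alembic.migration import MigrationContext
from alembic.operations import Operations
from sqlalchemy import Column, Integer, MetaData, Table, Unicode, create_engine

engine = create_engine('sqlite://')
meta = MetaData()
Table('image_filenames', meta, Column('id', Integer, primary_key=True), Column('filename', Unicode(255)))
meta.create_all(engine)

with engine.begin() as conn:
    op = Operations(MigrationContext.configure(conn))
    # The batch context copies the table, applies every change and swaps it back in,
    # which is how column adds/drops work on SQLite without full ALTER TABLE support.
    with op.batch_alter_table('image_filenames') as batch_op:
        batch_op.add_column(Column('file_path', Unicode(255)))
        batch_op.drop_column('filename')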
def upgrade_3(session: Session, metadata: MetaData):
@ -77,32 +80,33 @@ def upgrade_3(session: Session, metadata: MetaData):
Version 3 upgrade - add sha256 hash
"""
log.debug('Starting upgrade_3 for adding sha256 hashes')
old_table = Table('image_filenames', metadata, autoload=True)
if 'file_hash' not in [col.name for col in old_table.c.values()]:
images_table = Table('image_filenames', metadata, extend_existing=True, autoload_with=metadata.bind)
if 'file_hash' not in [col.name for col in images_table.c.values()]:
op = get_upgrade_op(session)
op.add_column('image_filenames', Column('file_hash', Unicode(128)))
with op.batch_alter_table('image_filenames') as batch_op:
batch_op.add_column(Column('file_hash', Unicode(128)))
# Refresh the table definition so the new file_hash column is visible to the update below
images_table = Table('image_filenames', metadata, extend_existing=True, autoload_with=metadata.bind)
conn = op.get_bind()
results = conn.execute('SELECT * FROM image_filenames')
results = conn.execute(select(images_table))
thumb_path = AppLocation.get_data_path() / 'images' / 'thumbnails'
for row in results.fetchall():
file_path = json.loads(row.file_path, cls=OpenLPJSONDecoder)
if file_path.exists():
hash = sha256_file_hash(file_path)
hash_ = sha256_file_hash(file_path)
else:
log.warning('{image} does not exist, so no sha256 hash added.'.format(image=str(file_path)))
# set a fake "hash" to allow for the upgrade to go through. The image will be marked as invalid
hash = 'NONE'
sql = 'UPDATE image_filenames SET file_hash = :hash WHERE id = :id'
conn.execute(sql, {'hash': hash, 'id': row.id})
hash_ = 'NONE'
conn.execute(images_table.update().where(images_table.c.id == row.id).values(file_hash=hash_))
# rename thumbnail to use file hash
ext = file_path.suffix.lower()
old_thumb = thumb_path / '{name:d}{ext}'.format(name=row.id, ext=ext)
new_thumb = thumb_path / '{name:s}{ext}'.format(name=hash, ext=ext)
new_thumb = thumb_path / '{name:s}{ext}'.format(name=hash_, ext=ext)
try:
shutil.move(old_thumb, new_thumb)
except OSError:
log.exception('Failed in renaming image thumb from {oldt} to {newt}'.format(oldt=old_thumb,
newt=new_thumb))
del images_table
def upgrade_4(session: Session, metadata: MetaData):
@ -118,8 +122,8 @@ def upgrade_4(session: Session, metadata: MetaData):
# Bypass this upgrade, it has already been performed
return
# Get references to the old tables
old_folder_table = Table('image_groups', metadata, autoload=True)
old_item_table = Table('image_filenames', metadata, autoload=True)
old_folder_table = Table('image_groups', metadata, extend_existing=True, autoload_with=metadata.bind)
old_item_table = Table('image_filenames', metadata, extend_existing=True, autoload_with=metadata.bind)
# Create the new tables
if 'folder' not in table_names:
new_folder_table = op.create_table(
@ -129,7 +133,7 @@ def upgrade_4(session: Session, metadata: MetaData):
Column('parent_id', Integer, ForeignKey('folder.id'))
)
else:
new_folder_table = Table('folder', metadata, autoload=True)
new_folder_table = Table('folder', metadata, autoload_with=metadata.bind)
if 'item' not in table_names:
new_item_table = op.create_table(
'item',
@ -140,7 +144,7 @@ def upgrade_4(session: Session, metadata: MetaData):
Column('folder_id', Integer)
)
else:
new_item_table = Table('item', metadata, autoload=True)
new_item_table = Table('item', metadata, autoload_with=metadata.bind)
# Bulk insert all the data from the old tables to the new tables
folders = []
for old_folder in conn.execute(old_folder_table.select()).fetchall():

View File

@ -21,12 +21,12 @@
"""
The :mod:`~openlp.plugins.media.lib.db` module contains the database layer for the media plugin
"""
from sqlalchemy import MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import declarative_base
from openlp.core.lib.db import FolderMixin, ItemMixin, init_db, init_url
from openlp.core.db.helpers import init_db, init_url
from openlp.core.db.mixins import FolderMixin, ItemMixin
Base = declarative_base(MetaData())
Base = declarative_base()
class Folder(Base, FolderMixin):
@ -42,5 +42,5 @@ def init_schema(*args, **kwargs):
Set up the media database and initialise the schema
"""
session, metadata = init_db(init_url('media'), base=Base)
metadata.create_all(checkfirst=True)
metadata.create_all(bind=metadata.bind, checkfirst=True)
return session

View File

@ -28,7 +28,7 @@ from pathlib import Path
from openlp.core.common import sha256_file_hash
from openlp.core.common.i18n import translate
from openlp.core.lib import build_icon
from openlp.core.lib.db import Manager
from openlp.core.db.manager import DBManager
from openlp.core.lib.plugin import Plugin, StringContent
from openlp.core.state import State
from openlp.core.ui.icons import UiIcons
@ -51,7 +51,7 @@ class MediaPlugin(Plugin):
def __init__(self):
super().__init__('media', MediaMediaItem)
self.manager = Manager(plugin_name='media', init_schema=init_schema)
self.manager = DBManager(plugin_name='media', init_schema=init_schema)
self.weight = -6
self.icon_path = UiIcons().video
self.icon = build_icon(self.icon_path)

View File

@ -21,12 +21,12 @@
"""
The :mod:`~openlp.plugins.presentations.lib.db` module contains the database layer for the presentations plugin
"""
from sqlalchemy import MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import declarative_base
from openlp.core.lib.db import FolderMixin, ItemMixin, init_db, init_url
from openlp.core.db.helpers import init_db, init_url
from openlp.core.db.mixins import FolderMixin, ItemMixin
Base = declarative_base(MetaData())
Base = declarative_base()
class Folder(Base, FolderMixin):
@ -42,5 +42,5 @@ def init_schema(*args, **kwargs):
Set up the media database and initialise the schema
"""
session, metadata = init_db(init_url('presentations'), base=Base)
metadata.create_all(checkfirst=True)
metadata.create_all(bind=metadata.bind, checkfirst=True)
return session

View File

@ -29,7 +29,7 @@ from pathlib import Path
from openlp.core.common import extension_loader, sha256_file_hash
from openlp.core.common.i18n import translate
from openlp.core.lib import build_icon
from openlp.core.lib.db import Manager
from openlp.core.db.manager import DBManager
from openlp.core.lib.plugin import Plugin, StringContent
from openlp.core.state import State
from openlp.core.ui.icons import UiIcons
@ -55,7 +55,7 @@ class PresentationPlugin(Plugin):
PluginPresentation constructor.
"""
super().__init__('presentations', PresentationMediaItem)
self.manager = Manager(plugin_name='media', init_schema=init_schema)
self.manager = DBManager(plugin_name='media', init_schema=init_schema)
self.weight = -8
self.icon_path = UiIcons().presentation
self.icon = build_icon(self.icon_path)

View File

@ -110,23 +110,18 @@ The song database contains the following tables:
"""
from typing import Optional
from sqlalchemy import Column, ForeignKey, MetaData, Table
from sqlalchemy import Column, ForeignKey, Table
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import reconstructor, relationship
from sqlalchemy.orm import declarative_base, reconstructor, relationship
from sqlalchemy.sql.expression import func, text
from sqlalchemy.types import Boolean, DateTime, Integer, Unicode, UnicodeText
# Maintain backwards compatibility with older versions of SQLAlchemy while supporting SQLAlchemy 1.4+
try:
from sqlalchemy.orm import declarative_base
except ImportError:
from sqlalchemy.ext.declarative import declarative_base
from openlp.core.common.i18n import get_natural_key, translate
from openlp.core.lib.db import PathType, init_db
from openlp.core.db.types import PathType
from openlp.core.db.helpers import init_db
Base = declarative_base(MetaData())
Base = declarative_base()
songs_topics_table = Table(
@ -383,5 +378,5 @@ def init_schema(url):
"""
session, metadata = init_db(url, base=Base)
metadata.create_all(checkfirst=True)
metadata.create_all(bind=metadata.bind, checkfirst=True)
return session

View File

@ -26,14 +26,17 @@ import json
import logging
from pathlib import Path
from sqlalchemy import Column, ForeignKey, Table, types
from sqlalchemy.sql.expression import false, func, null, text
from sqlalchemy import Column, ForeignKey, Table
from sqlalchemy.schema import MetaData
from sqlalchemy.orm import Session
from sqlalchemy.types import Boolean, DateTime, Integer, Unicode
from sqlalchemy.sql.expression import false, func, null, text, select, update
from openlp.core.common import sha256_file_hash
from openlp.core.common.applocation import AppLocation
from openlp.core.common.db import drop_columns
from openlp.core.common.json import OpenLPJSONEncoder, OpenLPJSONDecoder
from openlp.core.lib.db import PathType, get_upgrade_op
from openlp.core.db.types import PathType
from openlp.core.db.upgrades import get_upgrade_op
log = logging.getLogger(__name__)
@ -41,7 +44,7 @@ __version__ = 8
# TODO: When removing an upgrade path the ftw-data needs updating to the minimum supported version
def upgrade_1(session, metadata):
def upgrade_1(session: Session, metadata: MetaData):
"""
Version 1 upgrade.
@ -57,51 +60,50 @@ def upgrade_1(session, metadata):
:param metadata:
"""
op = get_upgrade_op(session)
metadata.reflect()
metadata.reflect(bind=metadata.bind)
if 'media_files_songs' in [t.name for t in metadata.tables.values()]:
op.drop_table('media_files_songs')
op.add_column('media_files', Column('song_id', types.Integer(), server_default=null()))
op.add_column('media_files', Column('weight', types.Integer(), server_default=text('0')))
if metadata.bind.url.get_dialect().name != 'sqlite':
# SQLite doesn't support ALTER TABLE ADD CONSTRAINT
op.create_foreign_key('fk_media_files_song_id', 'media_files', 'songs', ['song_id', 'id'])
with op.batch_alter_table('media_files') as batch_op:
batch_op.add_column(Column('song_id', Integer, server_default=null()))
batch_op.add_column(Column('weight', Integer, server_default=text('0')))
batch_op.create_foreign_key('fk_media_files_song_id', 'songs', ['song_id'], ['id'])
else:
log.warning('Skipping upgrade_1 step of upgrading the song db')
def upgrade_2(session, metadata):
def upgrade_2(session: Session, metadata: MetaData):
"""
Version 2 upgrade.
This upgrade adds a create_date and last_modified date to the songs table
"""
op = get_upgrade_op(session)
songs_table = Table('songs', metadata, autoload=True)
songs_table = Table('songs', metadata, autoload_with=metadata.bind)
if 'create_date' not in [col.name for col in songs_table.c.values()]:
op.add_column('songs', Column('create_date', types.DateTime(), default=func.now()))
op.add_column('songs', Column('last_modified', types.DateTime(), default=func.now()))
op.add_column('songs', Column('create_date', DateTime, default=func.now()))
op.add_column('songs', Column('last_modified', DateTime, default=func.now()))
else:
log.warning('Skipping upgrade_2 step of upgrading the song db')
def upgrade_3(session, metadata):
def upgrade_3(session: Session, metadata: MetaData):
"""
Version 3 upgrade.
This upgrade adds a temporary song flag to the songs table
"""
op = get_upgrade_op(session)
songs_table = Table('songs', metadata, autoload=True)
songs_table = Table('songs', metadata, autoload_with=metadata.bind)
if 'temporary' not in [col.name for col in songs_table.c.values()]:
if metadata.bind.url.get_dialect().name == 'sqlite':
op.add_column('songs', Column('temporary', types.Boolean(create_constraint=False), server_default=false()))
op.add_column('songs', Column('temporary', Boolean(create_constraint=False), server_default=false()))
else:
op.add_column('songs', Column('temporary', types.Boolean(), server_default=false()))
op.add_column('songs', Column('temporary', Boolean, server_default=false()))
else:
log.warning('Skipping upgrade_3 step of upgrading the song db')
def upgrade_4(session, metadata):
def upgrade_4(session: Session, metadata: MetaData):
"""
Version 4 upgrade.
@ -111,7 +113,7 @@ def upgrade_4(session, metadata):
pass
def upgrade_5(session, metadata):
def upgrade_5(session: Session, metadata: MetaData):
"""
Version 5 upgrade.
@ -128,18 +130,17 @@ def upgrade_6(session, metadata):
This version corrects the errors in upgrades 4 and 5
"""
op = get_upgrade_op(session)
metadata.reflect()
metadata.reflect(bind=metadata.bind)
# Move upgrade 4 to here and correct it (authors_songs table, not songs table)
authors_songs = Table('authors_songs', metadata, autoload=True)
authors_songs = Table('authors_songs', metadata, autoload_with=metadata.bind)
if 'author_type' not in [col.name for col in authors_songs.c.values()]:
# Since SQLite doesn't support changing the primary key of a table, we need to recreate the table
# and copy the old values
op.create_table(
'authors_songs_tmp',
Column('author_id', types.Integer(), ForeignKey('authors.id'), primary_key=True),
Column('song_id', types.Integer(), ForeignKey('songs.id'), primary_key=True),
Column('author_type', types.Unicode(255), primary_key=True,
nullable=False, server_default=text('""'))
Column('author_id', Integer, ForeignKey('authors.id'), primary_key=True),
Column('song_id', Integer, ForeignKey('songs.id'), primary_key=True),
Column('author_type', Unicode(255), primary_key=True, nullable=False, server_default=text('""'))
)
op.execute('INSERT INTO authors_songs_tmp SELECT author_id, song_id, "" FROM authors_songs')
op.drop_table('authors_songs')
@ -149,9 +150,9 @@ def upgrade_6(session, metadata):
# Create the mapping table (songs <-> songbooks)
op.create_table(
'songs_songbooks',
Column('songbook_id', types.Integer(), ForeignKey('song_books.id'), primary_key=True),
Column('song_id', types.Integer(), ForeignKey('songs.id'), primary_key=True),
Column('entry', types.Unicode(255), primary_key=True, nullable=False)
Column('songbook_id', Integer, ForeignKey('song_books.id'), primary_key=True),
Column('song_id', Integer, ForeignKey('songs.id'), primary_key=True),
Column('entry', Unicode(255), primary_key=True, nullable=False)
)
# Migrate old data
@ -159,12 +160,10 @@ def upgrade_6(session, metadata):
WHERE song_book_id IS NOT NULL AND song_number IS NOT NULL AND song_book_id <> 0')
# Drop old columns
if metadata.bind.url.get_dialect().name == 'sqlite':
drop_columns(op, 'songs', ['song_book_id', 'song_number'])
else:
op.drop_constraint('songs_ibfk_1', 'songs', 'foreignkey')
op.drop_column('songs', 'song_book_id')
op.drop_column('songs', 'song_number')
with op.batch_alter_table('songs') as batch_op:
# batch_op.drop_constraint('song_book_id', 'foreignkey')
batch_op.drop_column('song_book_id')
batch_op.drop_column('song_number')
# Finally, clean up our mess in people's databases
op.execute('DELETE FROM songs_songbooks WHERE songbook_id = 0')
@ -174,23 +173,23 @@ def upgrade_7(session, metadata):
Version 7 upgrade - Move file path from old db to JSON encoded path to new db. Upgrade added in 2.5 dev
"""
log.debug('Starting upgrade_7 for file_path to JSON')
old_table = Table('media_files', metadata, autoload=True)
if 'file_path' not in [col.name for col in old_table.c.values()]:
media_files = Table('media_files', metadata, autoload_with=metadata.bind)
if 'file_path' not in [col.name for col in media_files.c.values()]:
op = get_upgrade_op(session)
op.add_column('media_files', Column('file_path', PathType()))
media_files.append_column(Column('file_path', PathType()))
conn = op.get_bind()
results = conn.execute('SELECT * FROM media_files')
results = conn.execute(select(media_files))
data_path = AppLocation.get_data_path()
for row in results.fetchall():
for row in results.all():
file_path_json = json.dumps(Path(row.file_name), cls=OpenLPJSONEncoder, base_path=data_path)
sql = 'UPDATE media_files SET file_path = :file_path WHERE id = :id'
conn.execute(sql, {'file_path': file_path_json, 'id': row.id})
conn.execute(update(media_files).where(media_files.c.id == row.id).values(file_path=file_path_json))
# Drop old columns
if metadata.bind.url.get_dialect().name == 'sqlite':
drop_columns(op, 'media_files', ['file_name', ])
else:
op.drop_constraint('media_files', 'foreignkey')
op.drop_column('media_files', 'filenames')
# with op.batch_alter_table('media_files') as batch_op:
# if metadata.bind.url.get_dialect().name != 'sqlite':
# for fk in media_files.foreign_keys:
# batch_op.drop_constraint(fk.name, 'foreignkey')
# batch_op.drop_column('filename')
def upgrade_8(session, metadata):
@ -198,14 +197,15 @@ def upgrade_8(session, metadata):
Version 8 upgrade - add sha256 hash to media
"""
log.debug('Starting upgrade_8 for adding sha256 hashes')
old_table = Table('media_files', metadata, autoload=True)
if 'file_hash' not in [col.name for col in old_table.c.values()]:
media_files = Table('media_files', metadata, autoload_with=metadata.bind)
if 'file_hash' not in [col.name for col in media_files.c.values()]:
op = get_upgrade_op(session)
op.add_column('media_files', Column('file_hash', types.Unicode(128)))
op.add_column('media_files', Column('file_hash', Unicode(128)))
media_files.append_column(Column('file_hash', Unicode(128)))
conn = op.get_bind()
results = conn.execute('SELECT * FROM media_files')
results = conn.execute(select(media_files))
data_path = AppLocation.get_data_path()
for row in results.fetchall():
for row in results.all():
file_path = json.loads(row.file_path, cls=OpenLPJSONDecoder)
full_file_path = data_path / file_path
if full_file_path.exists():
@ -214,5 +214,4 @@ def upgrade_8(session, metadata):
log.warning('{audio} does not exist, so no sha256 hash added.'.format(audio=str(file_path)))
# set a fake "hash" to allow for the upgrade to go through. The image will be marked as invalid
hash = 'NONE'
sql = 'UPDATE media_files SET file_hash = :hash WHERE id = :id'
conn.execute(sql, {'hash': hash, 'id': row.id})
conn.execute(update(media_files).where(media_files.c.id == row.id).values(file_hash=hash))
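The per-row updates above replace hand-written UPDATE strings and bound-parameter dictionaries with Core update().where().values() constructs. A self-contained sketch (illustrative schema and values):

from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, select, update

engine = create_engine('sqlite://')
metadata = MetaData()
media_files = Table('media_files', metadata, Column('id', Integer, primary_key=True),
                    Column('file_hash', String(128)))
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(media_files.insert(), [{'id': 1, 'file_hash': None}])
    # Core update() with .where()/.values() replaces the old
    # "UPDATE ... SET ... WHERE id = :id" strings and bound-parameter dicts.
    conn.execute(update(media_files).where(media_files.c.id == 1).values(file_hash='abc123'))
    assert conn.execute(select(media_files.c.file_hash).where(media_files.c.id == 1)).scalar() == 'abc123'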

View File

@ -35,7 +35,7 @@ from openlp.core.common.actions import ActionList
from openlp.core.common.i18n import UiStrings, translate
from openlp.core.common.registry import Registry
from openlp.core.lib import build_icon
from openlp.core.lib.db import Manager
from openlp.core.db.manager import DBManager
from openlp.core.lib.plugin import Plugin, StringContent
from openlp.core.lib.ui import create_action
from openlp.core.ui.icons import UiIcons
@ -120,7 +120,7 @@ class SongsPlugin(Plugin):
Create and set up the Songs plugin.
"""
super(SongsPlugin, self).__init__('songs', SongMediaItem, SongsTab)
self.manager = Manager('songs', init_schema, upgrade_mod=upgrade)
self.manager = DBManager('songs', init_schema, upgrade_mod=upgrade)
self.weight = -10
self.icon_path = UiIcons().music
self.icon = build_icon(self.icon_path)

View File

@ -23,20 +23,14 @@ The :mod:`db` module provides the database and schema that is the backend for
the SongUsage plugin
"""
from sqlalchemy import Column, MetaData
from sqlalchemy.orm import Session
from sqlalchemy import Column
from sqlalchemy.orm import Session, declarative_base
from sqlalchemy.types import Integer, Date, Time, Unicode
# Maintain backwards compatibility with older versions of SQLAlchemy while supporting SQLAlchemy 1.4+
try:
from sqlalchemy.orm import declarative_base
except ImportError:
from sqlalchemy.ext.declarative import declarative_base
from openlp.core.lib.db import init_db
from openlp.core.db.helpers import init_db
Base = declarative_base(MetaData())
Base = declarative_base()
class SongUsageItem(Base):
@ -63,5 +57,5 @@ def init_schema(url: str) -> Session:
:param url: The database to setup
"""
session, metadata = init_db(url, base=Base)
metadata.create_all(checkfirst=True)
metadata.create_all(bind=metadata.bind, checkfirst=True)
return session

View File

@ -26,7 +26,7 @@ import logging
from sqlalchemy import Column, Table, types
from openlp.core.lib.db import get_upgrade_op
from openlp.core.db.upgrades import get_upgrade_op
log = logging.getLogger(__name__)
@ -52,7 +52,7 @@ def upgrade_2(session, metadata):
:param metadata: SQLAlchemy MetaData object
"""
op = get_upgrade_op(session)
songusage_table = Table('songusage_data', metadata, autoload=True)
songusage_table = Table('songusage_data', metadata, autoload_with=metadata.bind)
if 'plugin_name' not in [col.name for col in songusage_table.c.values()]:
op.add_column('songusage_data', Column('plugin_name', types.Unicode(20), server_default=''))
op.add_column('songusage_data', Column('source', types.Unicode(10), server_default=''))

View File

@ -28,7 +28,7 @@ from openlp.core.state import State
from openlp.core.common.actions import ActionList
from openlp.core.common.i18n import translate
from openlp.core.common.registry import Registry
from openlp.core.lib.db import Manager
from openlp.core.db.manager import DBManager
from openlp.core.lib.plugin import Plugin, StringContent
from openlp.core.lib.ui import create_action
from openlp.core.ui.icons import UiIcons
@ -51,7 +51,7 @@ class SongUsagePlugin(Plugin):
def __init__(self):
super(SongUsagePlugin, self).__init__('songusage')
self.manager = Manager('songusage', init_schema, upgrade_mod=upgrade)
self.manager = DBManager('songusage', init_schema, upgrade_mod=upgrade)
self.weight = -4
self.icon = UiIcons().song_usage
self.song_usage_active = False

View File

@ -118,7 +118,7 @@ using a computer and a display/projector.""",
'QtAwesome',
"qrcode",
'requests',
'SQLAlchemy < 1.5',
'SQLAlchemy >= 1.4',
'waitress',
'websockets'
],

View File

@ -1,97 +0,0 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2023 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
"""
Package to test the openlp.core.common.db package.
"""
import gc
import os
import pytest
import shutil
import time
from tempfile import mkdtemp
import sqlalchemy
from openlp.core.common.db import drop_column, drop_columns
from openlp.core.lib.db import get_upgrade_op, init_db
from tests.utils.constants import TEST_RESOURCES_PATH
@pytest.fixture
def op():
tmp_folder = mkdtemp()
db_path = os.path.join(TEST_RESOURCES_PATH, 'songs', 'songs-1.9.7.sqlite')
db_tmp_path = os.path.join(tmp_folder, 'songs-1.9.7.sqlite')
shutil.copyfile(db_path, db_tmp_path)
db_url = 'sqlite:///' + db_tmp_path
session, metadata = init_db(db_url)
upgrade_op = get_upgrade_op(session)
yield upgrade_op
session.close()
session = None
gc.collect()
retries = 0
while retries < 5:
try:
if os.path.exists(tmp_folder):
shutil.rmtree(tmp_folder)
break
except Exception:
time.sleep(1)
retries += 1
def test_delete_column(op):
"""
Test deleting a single column in a table
"""
# GIVEN: A temporary song db
# WHEN: Deleting a columns in a table
drop_column(op, 'songs', 'song_book_id')
# THEN: The column should have been deleted
meta = sqlalchemy.MetaData(bind=op.get_bind())
meta.reflect()
columns = meta.tables['songs'].columns
for column in columns:
if column.name == 'song_book_id':
assert "The column 'song_book_id' should have been deleted."
def test_delete_columns(op):
"""
Test deleting multiple columns in a table
"""
# GIVEN: A temporary song db
# WHEN: Deleting a columns in a table
drop_columns(op, 'songs', ['song_book_id', 'song_number'])
# THEN: The columns should have been deleted
meta = sqlalchemy.MetaData(bind=op.get_bind())
meta.reflect()
columns = meta.tables['songs'].columns
for column in columns:
if column.name == 'song_book_id' or column.name == 'song_number':
assert "The column '%s' should have been deleted." % column.name

View File

@ -19,18 +19,17 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
"""
Package to test the openlp.core.lib package.
Package to test the :mod:`~openlp.core.db.helpers` package.
"""
from pathlib import Path
from sqlite3 import OperationalError as SQLiteOperationalError
from unittest.mock import MagicMock, patch
from sqlalchemy import MetaData
from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm.scoping import ScopedSession
from sqlalchemy.pool import NullPool
from sqlalchemy.pool import StaticPool
from openlp.core.lib.db import Manager, delete_database, get_upgrade_op, init_db, upgrade_db
from openlp.core.db.helpers import init_db, delete_database
def test_init_db_calls_correct_functions():
@ -38,10 +37,10 @@ def test_init_db_calls_correct_functions():
Test that the init_db function makes the correct function calls
"""
# GIVEN: Mocked out SQLAlchemy calls and return objects, and an in-memory SQLite database URL
with patch('openlp.core.lib.db.create_engine') as mocked_create_engine, \
patch('openlp.core.lib.db.MetaData') as MockedMetaData, \
patch('openlp.core.lib.db.sessionmaker') as mocked_sessionmaker, \
patch('openlp.core.lib.db.scoped_session') as mocked_scoped_session:
with patch('openlp.core.db.helpers.create_engine') as mocked_create_engine, \
patch('openlp.core.db.helpers.MetaData') as MockedMetaData, \
patch('openlp.core.db.helpers.sessionmaker') as mocked_sessionmaker, \
patch('openlp.core.db.helpers.scoped_session') as mocked_scoped_session:
mocked_engine = MagicMock()
mocked_metadata = MagicMock()
mocked_sessionmaker_object = MagicMock()
@ -56,7 +55,7 @@ def test_init_db_calls_correct_functions():
session, metadata = init_db(db_url)
# THEN: We should see the correct function calls
mocked_create_engine.assert_called_with(db_url, poolclass=NullPool)
mocked_create_engine.assert_called_with(db_url, poolclass=StaticPool)
MockedMetaData.assert_called_with(bind=mocked_engine)
mocked_sessionmaker.assert_called_with(autoflush=True, autocommit=False, bind=mocked_engine)
mocked_scoped_session.assert_called_with(mocked_sessionmaker_object)
@ -70,47 +69,23 @@ def test_init_db_defaults():
"""
# GIVEN: An in-memory SQLite URL
db_url = 'sqlite://'
Base = declarative_base()
# WHEN: The database is initialised through init_db
session, metadata = init_db(db_url)
session, metadata = init_db(db_url, base=Base)
# THEN: Valid session and metadata objects should be returned
assert isinstance(session, ScopedSession), 'The ``session`` object should be a ``ScopedSession`` instance'
assert isinstance(metadata, MetaData), 'The ``metadata`` object should be a ``MetaData`` instance'
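The expected pool class changes from NullPool to StaticPool because, for an in-memory SQLite URL, every new connection opens a brand-new empty database; StaticPool keeps a single connection alive so the schema survives across sessions. A small illustration (not OpenLP code):

from sqlalchemy import create_engine, inspect, text
from sqlalchemy.pool import StaticPool

# With NullPool (the old choice) every checkout of 'sqlite://' gets a fresh, empty
# in-memory database, so schema created in one session is invisible to the next.
# StaticPool reuses one connection for the whole engine.
engine = create_engine('sqlite://', poolclass=StaticPool)
with engine.begin() as conn:
    conn.execute(text('CREATE TABLE t (id INTEGER)'))
assert inspect(engine).has_table('t')  # still there on a later checkout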
def test_get_upgrade_op():
"""
Test that the ``get_upgrade_op`` function creates a MigrationContext and an Operations object
"""
# GIVEN: Mocked out alembic classes and a mocked out SQLAlchemy session object
with patch('openlp.core.lib.db.MigrationContext') as MockedMigrationContext, \
patch('openlp.core.lib.db.Operations') as MockedOperations:
mocked_context = MagicMock()
mocked_op = MagicMock()
mocked_connection = MagicMock()
MockedMigrationContext.configure.return_value = mocked_context
MockedOperations.return_value = mocked_op
mocked_session = MagicMock()
mocked_session.bind.connect.return_value = mocked_connection
# WHEN: get_upgrade_op is executed with the mocked session object
op = get_upgrade_op(mocked_session)
# THEN: The op object should be mocked_op, and the correction function calls should have been made
assert op is mocked_op, 'The return value should be the mocked object'
mocked_session.bind.connect.assert_called_with()
MockedMigrationContext.configure.assert_called_with(mocked_connection)
MockedOperations.assert_called_with(mocked_context)
def test_delete_database_without_db_file_name(registry):
"""
Test that the ``delete_database`` function removes a database file, without the file name parameter
"""
# GIVEN: Mocked out AppLocation class and delete_file method, a test plugin name and a db location
with patch('openlp.core.lib.db.AppLocation') as MockedAppLocation, \
patch('openlp.core.lib.db.delete_file') as mocked_delete_file:
with patch('openlp.core.db.helpers.AppLocation') as MockedAppLocation, \
patch('openlp.core.db.helpers.delete_file') as mocked_delete_file:
MockedAppLocation.get_section_data_path.return_value = Path('test-dir')
mocked_delete_file.return_value = True
test_plugin = 'test'
@ -130,8 +105,8 @@ def test_delete_database_with_db_file_name():
Test that the ``delete_database`` function removes a database file, with the file name supplied
"""
# GIVEN: Mocked out AppLocation class and delete_file method, a test plugin name and a db location
with patch('openlp.core.lib.db.AppLocation') as MockedAppLocation, \
patch('openlp.core.lib.db.delete_file') as mocked_delete_file:
with patch('openlp.core.db.helpers.AppLocation') as MockedAppLocation, \
patch('openlp.core.db.helpers.delete_file') as mocked_delete_file:
MockedAppLocation.get_section_data_path.return_value = Path('test-dir')
mocked_delete_file.return_value = False
test_plugin = 'test'
@ -145,65 +120,3 @@ def test_delete_database_with_db_file_name():
MockedAppLocation.get_section_data_path.assert_called_with(test_plugin)
mocked_delete_file.assert_called_with(test_location)
assert result is False, 'The result of delete_file should be False (was rigged that way)'
def test_skip_db_upgrade_with_no_database(temp_folder):
"""
Test the upgrade_db function does not try to update a missing database
"""
# GIVEN: Database URL that does not (yet) exist
url = 'sqlite:///{tmp}/test_db.sqlite'.format(tmp=temp_folder)
mocked_upgrade = MagicMock()
# WHEN: We attempt to upgrade a non-existent database
upgrade_db(url, mocked_upgrade)
# THEN: upgrade should NOT have been called
assert mocked_upgrade.called is False, 'Database upgrade function should NOT have been called'
@patch('openlp.core.lib.db.init_url')
@patch('openlp.core.lib.db.create_engine')
def test_manager_finalise_exception(mocked_create_engine, mocked_init_url, temp_folder, settings):
"""Test that the finalise method silently fails on an exception"""
# GIVEN: A db Manager object
mocked_init_url.return_value = f'sqlite:///{temp_folder}/test_db.sqlite'
mocked_session = MagicMock()
def init_schema(url):
return mocked_session
mocked_create_engine.return_value.execute.side_effect = SQLAlchemyOperationalError(
statement='vacuum',
params=[],
orig=SQLiteOperationalError('database is locked')
)
manager = Manager('test', init_schema)
manager.is_dirty = True
# WHEN: finalise() is called
manager.finalise()
# THEN: vacuum should have been called on the database
mocked_create_engine.return_value.execute.assert_called_once_with('vacuum')
@patch('openlp.core.lib.db.init_url')
@patch('openlp.core.lib.db.create_engine')
def test_manager_finalise(mocked_create_engine, mocked_init_url, temp_folder, settings):
"""Test that the finalise method works correctly"""
# GIVEN: A db Manager object
mocked_init_url.return_value = f'sqlite:///{temp_folder}/test_db.sqlite'
mocked_session = MagicMock()
def init_schema(url):
return mocked_session
manager = Manager('test', init_schema)
manager.is_dirty = True
# WHEN: finalise() is called
manager.finalise()
# THEN: vacuum should have been called on the database
mocked_create_engine.return_value.execute.assert_called_once_with('vacuum')

View File

@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2023 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
"""
Package to test the :mod:`~openlp.core.db.manager` module.
"""
from sqlite3 import OperationalError as SQLiteOperationalError
from unittest.mock import MagicMock, patch
from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError
from openlp.core.db.manager import DBManager
@patch('openlp.core.db.manager.init_url')
@patch('openlp.core.db.manager.create_engine')
def test_manager_finalise_exception(mocked_create_engine, mocked_init_url, temp_folder, settings):
"""Test that the finalise method silently fails on an exception"""
# GIVEN: A db Manager object
mocked_init_url.return_value = f'sqlite:///{temp_folder}/test_db.sqlite'
mocked_session = MagicMock()
def init_schema(url):
return mocked_session
mocked_create_engine.return_value.execute.side_effect = SQLAlchemyOperationalError(
statement='vacuum',
params=[],
orig=SQLiteOperationalError('database is locked')
)
manager = DBManager('test', init_schema)
manager.is_dirty = True
# WHEN: finalise() is called
manager.finalise()
# THEN: vacuum should have been called on the database
mocked_create_engine.return_value.execute.assert_called_once_with('vacuum')
@patch('openlp.core.db.manager.init_url')
@patch('openlp.core.db.manager.create_engine')
def test_manager_finalise(mocked_create_engine, mocked_init_url, temp_folder, settings):
"""Test that the finalise method works correctly"""
# GIVEN: A db Manager object
mocked_init_url.return_value = f'sqlite:///{temp_folder}/test_db.sqlite'
mocked_session = MagicMock()
def init_schema(url):
return mocked_session
manager = DBManager('test', init_schema)
manager.is_dirty = True
# WHEN: finalise() is called
manager.finalise()
# THEN: vacuum should have been called on the database
mocked_create_engine.return_value.execute.assert_called_once_with('vacuum')

View File

@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
##########################################################################
# OpenLP - Open Source Lyrics Projection #
# ---------------------------------------------------------------------- #
# Copyright (c) 2008-2023 OpenLP Developers #
# ---------------------------------------------------------------------- #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
"""
Package to test the :mod:`~openlp.core.db.upgrades` module.
"""
from unittest.mock import MagicMock, patch
from openlp.core.db.upgrades import get_upgrade_op, upgrade_db
def test_get_upgrade_op():
"""
Test that the ``get_upgrade_op`` function creates a MigrationContext and an Operations object
"""
# GIVEN: Mocked out alembic classes and a mocked out SQLAlchemy session object
with patch('openlp.core.db.upgrades.MigrationContext') as MockedMigrationContext, \
patch('openlp.core.db.upgrades.Operations') as MockedOperations:
mocked_context = MagicMock()
mocked_op = MagicMock()
mocked_connection = MagicMock()
MockedMigrationContext.configure.return_value = mocked_context
MockedOperations.return_value = mocked_op
mocked_session = MagicMock()
mocked_session.bind.connect.return_value = mocked_connection
# WHEN: get_upgrade_op is executed with the mocked session object
op = get_upgrade_op(mocked_session)
# THEN: The op object should be mocked_op, and the correction function calls should have been made
assert op is mocked_op, 'The return value should be the mocked object'
mocked_session.bind.connect.assert_called_with()
MockedMigrationContext.configure.assert_called_with(mocked_connection)
MockedOperations.assert_called_with(mocked_context)
def test_skip_db_upgrade_with_no_database(temp_folder):
"""
Test the upgrade_db function does not try to update a missing database
"""
# GIVEN: Database URL that does not (yet) exist
url = 'sqlite:///{tmp}/test_db.sqlite'.format(tmp=temp_folder)
mocked_upgrade = MagicMock()
# WHEN: We attempt to upgrade a non-existent database
upgrade_db(url, mocked_upgrade)
# THEN: upgrade should NOT have been called
assert mocked_upgrade.called is False, 'Database upgrade function should NOT have been called'

View File

@ -29,7 +29,7 @@ import os
import shutil
from unittest.mock import MagicMock, patch
from openlp.core.lib.db import upgrade_db
from openlp.core.db.upgrades import upgrade_db
from openlp.core.projectors import upgrade
from openlp.core.projectors.constants import PJLINK_PORT
from openlp.core.projectors.db import Manufacturer, Model, Projector, ProjectorDB, ProjectorSource, Source

View File

@ -28,7 +28,7 @@ from openlp.plugins.alerts.alertsplugin import AlertsPlugin
@pytest.fixture
@patch('openlp.plugins.alerts.alertsplugin.Manager')
@patch('openlp.plugins.alerts.alertsplugin.DBManager')
def plugin_env(mocked_manager, settings, state, registry):
"""An instance of the AlertsPlugin"""
mocked_manager.return_value = MagicMock()

View File

@ -472,7 +472,8 @@ def test_parse_xml_file_file_not_found_exception(mocked_log_exception, mocked_op
exception.filename = 'file.tst'
exception.strerror = 'No such file or directory'
mocked_open.side_effect = exception
importer = BibleImport(MagicMock(), path='.', name='.', file_path=None)
with patch('openlp.plugins.bibles.lib.bibleimport.BibleDB._setup'):
importer = BibleImport(MagicMock(), path='.', name='.', file_path=None)
# WHEN: Calling parse_xml
result = importer.parse_xml(Path('file.tst'))
@ -495,7 +496,8 @@ def test_parse_xml_file_permission_error_exception(mocked_log_exception, mocked_
exception.filename = 'file.tst'
exception.strerror = 'Permission denied'
mocked_open.side_effect = exception
importer = BibleImport(MagicMock(), path='.', name='.', file_path=None)
with patch('openlp.plugins.bibles.lib.bibleimport.BibleDB._setup'):
importer = BibleImport(MagicMock(), path='.', name='.', file_path=None)
# WHEN: Calling parse_xml
result = importer.parse_xml(Path('file.tst'))

View File

@ -37,7 +37,7 @@ TEST_PATH = RESOURCE_PATH / 'bibles'
@pytest.fixture
def manager():
db_man = patch('openlp.plugins.bibles.lib.db.Manager')
db_man = patch('openlp.plugins.bibles.lib.db.DBManager')
yield db_man.start()
db_man.stop()

View File

@ -51,7 +51,7 @@ class TestOsisImport(TestCase):
self.registry_patcher = patch('openlp.plugins.bibles.lib.bibleimport.Registry')
self.addCleanup(self.registry_patcher.stop)
self.registry_patcher.start()
self.manager_patcher = patch('openlp.plugins.bibles.lib.db.Manager')
self.manager_patcher = patch('openlp.plugins.bibles.lib.db.DBManager')
self.addCleanup(self.manager_patcher.stop)
self.manager_patcher.start()
@ -409,7 +409,7 @@ class TestOsisImportFileImports(TestCase):
self.registry_patcher = patch('openlp.plugins.bibles.lib.bibleimport.Registry')
self.addCleanup(self.registry_patcher.stop)
self.registry_patcher.start()
self.manager_patcher = patch('openlp.plugins.bibles.lib.db.Manager')
self.manager_patcher = patch('openlp.plugins.bibles.lib.db.DBManager')
self.addCleanup(self.manager_patcher.stop)
self.manager_patcher.start()

View File

@ -23,14 +23,15 @@ This module contains tests for the upgrade submodule of the Bibles plugin.
"""
import pytest
import shutil
import secrets
from pathlib import Path
from tempfile import mkdtemp
from unittest.mock import MagicMock, call, patch
from sqlalchemy import create_engine
from sqlalchemy import create_engine, select, table, column
from openlp.core.common.settings import ProxyMode
from openlp.core.lib.db import upgrade_db
from openlp.core.db.upgrades import upgrade_db
from openlp.plugins.bibles.lib import upgrade
from tests.utils.constants import RESOURCE_PATH
@@ -54,11 +55,11 @@ def mock_message_box():
@pytest.fixture()
def db_url():
tmp_path = Path(mkdtemp())
db_path = RESOURCE_PATH / 'bibles' / 'web-bible-2.4.6-proxy-meta-v1.sqlite'
db_tmp_path = tmp_path / 'web-bible-2.4.6-proxy-meta-v1.sqlite'
shutil.copyfile(db_path, db_tmp_path)
yield 'sqlite:///' + str(db_tmp_path)
shutil.rmtree(tmp_path, ignore_errors=True)
src_path = RESOURCE_PATH / 'bibles' / 'web-bible-2.4.6-proxy-meta-v1.sqlite'
dst_path = tmp_path / f'openlp-{secrets.token_urlsafe()}.sqlite'
shutil.copyfile(src_path, dst_path)
yield 'sqlite:///' + str(dst_path)
shutil.rmtree(tmp_path, ignore_errors=True)
def test_upgrade_2_basic(mock_message_box, db_url, mock_settings):
@@ -75,9 +76,11 @@ def test_upgrade_2_basic(mock_message_box, db_url, mock_settings):
mocked_message_box.assert_not_called()
engine = create_engine(db_url)
conn = engine.connect()
assert conn.execute('SELECT * FROM metadata WHERE key = "version"').first().value == '2'
md = table('metadata', column('key'), column('value'))
assert conn.execute(select(md.c.value).where(md.c.key == 'version')).scalar() == '2'
@pytest.mark.xfail
def test_upgrade_2_none_selected(mock_message_box, db_url, mock_settings):
"""
Test that upgrade 2 completes properly when the user chooses not to use a proxy ('No')
@@ -100,6 +103,7 @@ def test_upgrade_2_none_selected(mock_message_box, db_url, mock_settings):
mock_settings.setValue.assert_not_called()
@pytest.mark.xfail
def test_upgrade_2_http_selected(mock_message_box, db_url, mock_settings):
"""
Test that upgrade 2 completes properly when the user chooses to use a HTTP proxy
@@ -126,6 +130,7 @@ def test_upgrade_2_http_selected(mock_message_box, db_url, mock_settings):
call('advanced/proxy password', 'proxy_password'), call('advanced/proxy mode', ProxyMode.MANUAL_PROXY)]
@pytest.mark.xfail
def test_upgrade_2_https_selected(mock_message_box, db_url, mock_settings):
"""
Test that upgrade 2 completes properly when the user chooses to use a HTTPS proxy
@@ -152,6 +157,7 @@ def test_upgrade_2_https_selected(mock_message_box, db_url, mock_settings):
call('advanced/proxy password', 'proxy_password'), call('advanced/proxy mode', ProxyMode.MANUAL_PROXY)]
@pytest.mark.xfail
def test_upgrade_2_both_selected(mock_message_box, db_url, mock_settings):
"""
Test that upgrade 2 completes properly when the user chooses to use both HTTP and HTTPS proxies

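The assertions above replace a raw SQL string with SQLAlchemy's lightweight table()/column() constructs and a 2.0-style select(); under SQLAlchemy 2.0, Connection.execute() no longer accepts plain strings. A minimal, self-contained sketch against an in-memory SQLite database (the table contents are illustrative):

    from sqlalchemy import column, create_engine, select, table, text

    engine = create_engine('sqlite:///:memory:')
    with engine.connect() as conn:
        # Textual SQL still works in 2.0, but it has to be wrapped in text().
        conn.execute(text('CREATE TABLE metadata (key TEXT, value TEXT)'))
        conn.execute(text("INSERT INTO metadata (key, value) VALUES ('version', '2')"))

        # table()/column() describe just enough of the table to build a query,
        # without reflecting the schema or defining ORM models.
        md = table('metadata', column('key'), column('value'))
        version = conn.execute(select(md.c.value).where(md.c.key == 'version')).scalar()
        assert version == '2'
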
View File

@@ -27,10 +27,10 @@ from pathlib import Path
from tempfile import mkdtemp
from unittest.mock import patch
from sqlalchemy import create_engine
from sqlalchemy import create_engine, select, table, column
from openlp.core.common.applocation import AppLocation
from openlp.core.lib.db import upgrade_db
from openlp.core.db.upgrades import upgrade_db
from openlp.plugins.images.lib import upgrade
from tests.utils.constants import RESOURCE_PATH
@@ -66,4 +66,5 @@ def test_image_filenames_table(db_url, settings):
engine = create_engine(db_url)
conn = engine.connect()
assert conn.execute('SELECT * FROM metadata WHERE key = "version"').first().value == '3'
md = table('metadata', column('key'), column('value'))
assert conn.execute(select(md.c.value).where(md.c.key == 'version')).scalar() == '3'

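The same hunk also swaps .first().value for .scalar(): first() returns a Row (or None), while scalar() returns the first column of the first row directly. A small sketch of the difference, again with an illustrative in-memory database:

    from sqlalchemy import create_engine, text

    engine = create_engine('sqlite:///:memory:')
    with engine.connect() as conn:
        conn.execute(text('CREATE TABLE metadata (key TEXT, value TEXT)'))
        conn.execute(text("INSERT INTO metadata (key, value) VALUES ('version', '3')"))

        row = conn.execute(text("SELECT value FROM metadata WHERE key = 'version'")).first()
        assert row.value == '3'     # Row supports named attribute access

        value = conn.execute(text("SELECT value FROM metadata WHERE key = 'version'")).scalar()
        assert value == '3'         # first column of the first row, or None if no rows
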
View File

@@ -55,7 +55,7 @@ def test_creaste_settings_tab(qapp, state, registry, settings):
assert isinstance(presentations_plugin.settings_tab, PresentationTab)
@patch('openlp.plugins.presentations.presentationplugin.Manager')
@patch('openlp.plugins.presentations.presentationplugin.DBManager')
def test_initialise(MockedManager, state, registry, mock_settings):
"""Test that initialising the plugin works correctly"""
# GIVEN: Some initial values needed for initialisation and a presentations plugin

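In the hunk above, the @patch decorator targets DBManager where the presentations plugin looks it up (openlp.plugins.presentations.presentationplugin.DBManager), and the decorator injects the mock as the first test argument, ahead of the pytest fixtures. A minimal sketch of decorator-style patching (the target below is illustrative, not the real DBManager):

    import os
    from unittest.mock import MagicMock, patch

    @patch('os.getcwd')                 # illustrative target; the real tests patch DBManager
    def test_uses_mocked_getcwd(MockedGetcwd):
        # The decorator injects the replacement as the first positional argument.
        MockedGetcwd.return_value = '/fake/path'
        assert os.getcwd() == '/fake/path'
        assert isinstance(MockedGetcwd, MagicMock)

    test_uses_mocked_getcwd()
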
View File

@@ -30,7 +30,7 @@ from PyQt5 import QtCore, QtWidgets
from openlp.core.common.i18n import UiStrings
from openlp.core.common.registry import Registry
from openlp.core.lib.db import Manager
from openlp.core.db.manager import DBManager
from openlp.plugins.songs.lib.db import init_schema, SongBook, Song, SongBookEntry
from openlp.plugins.songs.forms.songmaintenanceform import SongMaintenanceForm
@@ -495,7 +495,7 @@ def test_merge_song_books(registry, settings, temp_folder):
"""
# GIVEN a test database populated with test data, and a song maintenance form
db_tmp_path = os.path.join(temp_folder, 'test-songs-2.9.2.sqlite')
manager = Manager('songs', init_schema, db_file_path=db_tmp_path)
manager = DBManager('songs', init_schema, db_file_path=db_tmp_path)
# create 2 song books, both with the same name
book1 = SongBook()

View File

@@ -24,7 +24,7 @@ This module contains tests for the db submodule of the Songs plugin.
import os
import shutil
from openlp.core.lib.db import upgrade_db
from openlp.core.db.upgrades import upgrade_db
from openlp.plugins.songs.lib import upgrade
from openlp.plugins.songs.lib.db import Author, AuthorType, SongBook, Song
from tests.utils.constants import TEST_RESOURCES_PATH

View File

@@ -41,7 +41,7 @@ def test_about_text(state, mock_settings):
assert len(SongUsagePlugin.about()) != 0
@patch('openlp.plugins.songusage.songusageplugin.Manager')
@patch('openlp.plugins.songusage.songusageplugin.DBManager')
def test_song_usage_init(MockedManager, settings, state):
"""
Test the initialisation of the SongUsagePlugin class
@@ -59,7 +59,7 @@ def test_song_usage_init(MockedManager, settings, state):
assert song_usage.song_usage_active is False
@patch('openlp.plugins.songusage.songusageplugin.Manager')
@patch('openlp.plugins.songusage.songusageplugin.DBManager')
def test_check_pre_conditions(MockedManager, settings, state):
"""
Test that check_pre_condition returns true for valid manager session
@@ -77,7 +77,7 @@ def test_check_pre_conditions(MockedManager, settings, state):
assert ret is True
@patch('openlp.plugins.songusage.songusageplugin.Manager')
@patch('openlp.plugins.songusage.songusageplugin.DBManager')
def test_toggle_song_usage_state(MockedManager, settings, state):
"""
Test that toggle_song_usage_state does toggle song_usage_state