Convert Bibles to use declarative_base

This commit is contained in:
Raoul Snyman 2023-03-08 22:42:34 -07:00
parent efda2c9e53
commit dfc53a40f3
4 changed files with 134 additions and 129 deletions

View File

@ -18,25 +18,31 @@
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
##########################################################################
import logging
import re
import sqlite3
import time
from pathlib import Path
from typing import Any, List, Optional, Tuple
import chardet
from PyQt5 import QtCore
from sqlalchemy import Column, ForeignKey, Table, func, or_, types
from sqlalchemy import Column, ForeignKey, MetaData, func, or_
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import class_mapper, mapper, relation
from sqlalchemy.orm.exc import UnmappedClassError
from sqlalchemy.orm import Session, relationship
from sqlalchemy.types import Unicode, UnicodeText, Integer
# Maintain backwards compatibility with older versions of SQLAlchemy while supporting SQLAlchemy 1.4+
try:
from sqlalchemy.orm import declarative_base
except ImportError:
from sqlalchemy.ext.declarative import declarative_base
from openlp.core.common import clean_filename
from openlp.core.common.enum import LanguageSelection
from openlp.core.common.applocation import AppLocation
from openlp.core.common.i18n import translate
from openlp.core.lib.db import BaseModel, Manager, init_db
from openlp.core.lib.db import Manager, init_db
from openlp.core.lib.ui import critical_error_message_box
from openlp.plugins.bibles.lib import BibleStrings, upgrade
@ -46,74 +52,6 @@ log = logging.getLogger(__name__)
RESERVED_CHARACTERS = '\\.^$*+?{}[]()'
class BibleMeta(BaseModel):
"""
Bible Meta Data
"""
pass
class Book(BaseModel):
"""
Bible Book model
"""
def get_name(self, language_selection=LanguageSelection.Bible):
if language_selection == LanguageSelection.Bible:
return self.name
elif language_selection == LanguageSelection.Application:
return BibleStrings().BookNames[BiblesResourcesDB.get_book_by_id(self.book_reference_id)['abbreviation']]
elif language_selection == LanguageSelection.English:
return BiblesResourcesDB.get_book_by_id(self.book_reference_id)['name']
class Verse(BaseModel):
"""
Topic model
"""
pass
def init_schema(url):
"""
Setup a bible database connection and initialise the database schema.
:param url: The database to setup.
"""
session, metadata = init_db(url)
meta_table = Table('metadata', metadata,
Column('key', types.Unicode(255), primary_key=True, index=True),
Column('value', types.Unicode(255)))
book_table = Table('book', metadata,
Column('id', types.Integer, primary_key=True),
Column('book_reference_id', types.Integer, index=True),
Column('testament_reference_id', types.Integer),
Column('name', types.Unicode(50), index=True))
verse_table = Table('verse', metadata,
Column('id', types.Integer, primary_key=True, index=True),
Column('book_id', types.Integer, ForeignKey('book.id'), index=True),
Column('chapter', types.Integer, index=True),
Column('verse', types.Integer, index=True),
Column('text', types.UnicodeText, index=True))
try:
class_mapper(BibleMeta)
except UnmappedClassError:
mapper(BibleMeta, meta_table)
try:
class_mapper(Book)
except UnmappedClassError:
mapper(Book, book_table, properties={'verses': relation(Verse, backref='book')})
try:
class_mapper(Verse)
except UnmappedClassError:
mapper(Verse, verse_table)
metadata.create_all(checkfirst=True)
return session
class BibleDB(Manager):
"""
This class represents a database-bound Bible. It is used as a base class for all the custom importers, so that
@ -161,20 +99,86 @@ class BibleDB(Manager):
self.file_path = Path(clean_filename(self.name) + '.sqlite')
if 'file' in kwargs:
self.file_path = kwargs['file']
Manager.__init__(self, 'bibles', init_schema, self.file_path, upgrade)
Manager.__init__(self, 'bibles', self.init_schema, self.file_path, upgrade)
if self.session and 'file' in kwargs:
self.get_name()
self._is_web_bible = None
def get_name(self):
def init_schema(self, url: str) -> Session:
"""
Setup a bible database connection and initialise the database schema.
Due to the fact that we have to set up separate ``Base`` classes for each database, all the models are declared
here. They can be subsequently referenced via ``self.ModelName`` or ``bible.ModelName``.
:param url: The database to setup.
"""
Base = declarative_base(MetaData)
class BibleMeta(Base):
"""
Bible Meta Data
"""
__tablename__ = 'metadata'
key = Column(Unicode(255), primary_key=True, index=True)
value = Column(Unicode(255))
class Book(Base):
"""
Bible Book model
"""
__tablename__ = 'book'
id = Column(Integer, primary_key=True)
book_reference_id = Column(Integer, index=True)
testament_reference_id = Column(Integer)
name = Column(Unicode(50), index=True)
verses = relationship('Verse', back_populates='book')
def get_name(self, language_selection=LanguageSelection.Bible) -> str:
if language_selection == LanguageSelection.Bible:
return self.name
elif language_selection == LanguageSelection.Application:
return BibleStrings().BookNames[
BiblesResourcesDB.get_book_by_id(self.book_reference_id)['abbreviation']
]
elif language_selection == LanguageSelection.English:
return BiblesResourcesDB.get_book_by_id(self.book_reference_id)['name']
class Verse(Base):
"""
Topic model
"""
__tablename__ = 'verse'
id = Column(Integer, primary_key=True, index=True)
book_id = Column(Integer, ForeignKey('book.id'), index=True)
chapter = Column(Integer, index=True)
verse = Column(Integer, index=True)
text = Column(UnicodeText, index=True)
book = relationship('Book', back_populates='verses')
# Assign the classes so that they can be used elsewhere in the BibleDB class
self.BibleMeta = BibleMeta
self.Book = Book
self.Verse = Verse
session, metadata = init_db(url, base=Base)
metadata.create_all(checkfirst=True)
return session
def get_name(self) -> str:
"""
Returns the version name of the Bible.
"""
version_name = self.get_object(BibleMeta, 'name')
version_name = self.get_object(self.BibleMeta, 'name')
self.name = version_name.value if version_name else None
return self.name
def create_book(self, name, bk_ref_id, testament=1):
def create_book(self, name: str, bk_ref_id: str, testament: int = 1):
"""
Add a book to the database.
@ -184,7 +188,7 @@ class BibleDB(Manager):
bibles_resources.sqlite of the testament this book belongs to.
"""
log.debug('BibleDB.create_book("{name}", "{number}")'.format(name=name, number=bk_ref_id))
book = Book.populate(name=name, book_reference_id=bk_ref_id, testament_reference_id=testament)
book = self.Book(name=name, book_reference_id=bk_ref_id, testament_reference_id=testament)
self.save_object(book)
return book
@ -197,18 +201,18 @@ class BibleDB(Manager):
log.debug('BibleDB.update_book("{name}")'.format(name=book.name))
return self.save_object(book)
def delete_book(self, db_book):
def delete_book(self, db_book) -> bool:
"""
Delete a book from the database.
:param db_book: The book object.
"""
log.debug('BibleDB.delete_book("{name}")'.format(name=db_book.name))
if self.delete_object(Book, db_book.id):
if self.delete_object(self.Book, db_book.id):
return True
return False
def create_chapter(self, book_id, chapter, text_list):
def create_chapter(self, book_id: int, chapter: int, text_list: List[str]):
"""
Add a chapter and its verses to a book.
@ -220,12 +224,7 @@ class BibleDB(Manager):
log.debug('BibleDBcreate_chapter("{number}", "{chapter}")'.format(number=book_id, chapter=chapter))
# Text list has book and chapter as first two elements of the array.
for verse_number, verse_text in text_list.items():
verse = Verse.populate(
book_id=book_id,
chapter=chapter,
verse=verse_number,
text=verse_text
)
verse = self.Verse(book_id=book_id, chapter=chapter, verse=verse_number, text=verse_text)
self.session.add(verse)
try:
self.session.commit()
@ -234,7 +233,7 @@ class BibleDB(Manager):
time.sleep(0.01)
self.session.commit()
def create_verse(self, book_id, chapter, verse, text):
def create_verse(self, book_id: int, chapter: int, verse: int, text: str):
"""
Add a single verse to a chapter.
@ -246,16 +245,11 @@ class BibleDB(Manager):
if not isinstance(text, str):
details = chardet.detect(text)
text = str(text, details['encoding'])
verse = Verse.populate(
book_id=book_id,
chapter=chapter,
verse=verse,
text=text
)
verse = self.Verse(book_id=book_id, chapter=chapter, verse=verse, text=text)
self.session.add(verse)
return verse
def save_meta(self, key, value):
def save_meta(self, key: str, value: Any):
"""
Utility method to save or update BibleMeta objects in a Bible database.
@ -265,41 +259,41 @@ class BibleDB(Manager):
if not isinstance(value, str):
value = str(value)
log.debug('BibleDB.save_meta("{key}/{val}")'.format(key=key, val=value))
meta = self.get_object(BibleMeta, key)
meta = self.get_object(self.BibleMeta, key)
if meta:
meta.value = value
self.save_object(meta)
else:
self.save_object(BibleMeta.populate(key=key, value=value))
self.save_object(self.BibleMeta(key=key, value=value))
def get_book(self, book):
def get_book(self, book: str):
"""
Return a book object from the database.
:param book: The name of the book to return.
"""
log.debug('BibleDB.get_book("{book}")'.format(book=book))
return self.get_object_filtered(Book, Book.name.like(book + '%'))
return self.get_object_filtered(self.Book, self.Book.name.like(book + '%'))
def get_books(self, book=None):
def get_books(self, book: Optional[str] = None):
"""
A wrapper so both local and web bibles have a get_books() method that
manager can call. Used in the media manager advanced search tab.
"""
log.debug('BibleDB.get_books("{book}")'.format(book=book))
filter = Book.name.like(book + '%') if book else None
return self.get_all_objects(Book, filter_clause=filter, order_by_ref=Book.id)
filter = self.Book.name.like(book + '%') if book else None
return self.get_all_objects(self.Book, filter_clause=filter, order_by_ref=self.Book.id)
def get_book_by_book_ref_id(self, ref_id):
def get_book_by_book_ref_id(self, ref_id: str):
"""
Return a book object from the database.
:param ref_id: The reference id of the book to return.
"""
log.debug('BibleDB.get_book_by_book_ref_id("{ref}")'.format(ref=ref_id))
return self.get_object_filtered(Book, Book.book_reference_id.like(ref_id))
return self.get_object_filtered(self.Book, self.Book.book_reference_id.like(ref_id))
def get_book_ref_id_by_localised_name(self, book, language_selection):
def get_book_ref_id_by_localised_name(self, book: str, language_selection):
"""
Return the ids of a matching named book.
@ -331,7 +325,7 @@ class BibleDB(Manager):
book_list = books
return [value['id'] for value in book_list if self.get_book_by_book_ref_id(value['id'])]
def get_verses(self, reference_list, show_error=True):
def get_verses(self, reference_list: List[Tuple[str, int, int, int]], show_error: bool = True):
"""
This is probably the most used function. It retrieves the list of
verses based on the user's query.
@ -358,12 +352,12 @@ class BibleDB(Manager):
log.debug('Book name corrected to "{book}"'.format(book=db_book.name))
if end_verse == -1:
end_verse = self.get_verse_count(book_id, chapter)
verses = self.session.query(Verse) \
verses = self.session.query(self.Verse) \
.filter_by(book_id=db_book.id) \
.filter_by(chapter=chapter) \
.filter(Verse.verse >= start_verse) \
.filter(Verse.verse <= end_verse) \
.order_by(Verse.verse) \
.filter(self.Verse.verse >= start_verse) \
.filter(self.Verse.verse <= end_verse) \
.order_by(self.Verse.verse) \
.all()
verse_list.extend(verses)
else:
@ -376,7 +370,7 @@ class BibleDB(Manager):
'could be found in this Bible. Check that you have spelled the name of the book correctly.'))
return verse_list
def verse_search(self, text):
def verse_search(self, text: str):
"""
Search for verses containing text ``text``.
@ -387,32 +381,32 @@ class BibleDB(Manager):
values.
"""
log.debug('BibleDB.verse_search("{text}")'.format(text=text))
verses = self.session.query(Verse)
verses = self.session.query(self.Verse)
if text.find(',') > -1:
keywords = ['%{keyword}%'.format(keyword=keyword.strip()) for keyword in text.split(',') if keyword.strip()]
or_clause = [Verse.text.like(keyword) for keyword in keywords]
or_clause = [self.Verse.text.like(keyword) for keyword in keywords]
verses = verses.filter(or_(*or_clause))
else:
keywords = ['%{keyword}%'.format(keyword=keyword.strip()) for keyword in text.split(' ') if keyword.strip()]
for keyword in keywords:
verses = verses.filter(Verse.text.like(keyword))
verses = verses.filter(self.Verse.text.like(keyword))
verses = verses.all()
return verses
def get_chapter_count(self, book):
def get_chapter_count(self, book) -> int:
"""
Return the number of chapters in a book.
:param book: The book object to get the chapter count for.
"""
log.debug('BibleDB.get_chapter_count("{book}")'.format(book=book.name))
count = self.session.query(func.max(Verse.chapter)).join(Book).filter(
Book.book_reference_id == book.book_reference_id).scalar()
count = self.session.query(func.max(self.Verse.chapter)).join(self.Book).filter(
self.Book.book_reference_id == book.book_reference_id).scalar()
if not count:
return 0
return count
def get_verse_count(self, book_ref_id, chapter):
def get_verse_count(self, book_ref_id: str, chapter: int) -> int:
"""
Return the number of verses in a chapter.
@ -420,16 +414,16 @@ class BibleDB(Manager):
:param chapter: The chapter to get the verse count for.
"""
log.debug('BibleDB.get_verse_count("{ref}", "{chapter}")'.format(ref=book_ref_id, chapter=chapter))
count = self.session.query(func.max(Verse.verse)).join(Book) \
.filter(Book.book_reference_id == book_ref_id) \
.filter(Verse.chapter == chapter) \
count = self.session.query(func.max(self.Verse.verse)).join(self.Book) \
.filter(self.Book.book_reference_id == book_ref_id) \
.filter(self.Verse.chapter == chapter) \
.scalar()
if not count:
return 0
return count
@property
def is_web_bible(self):
def is_web_bible(self) -> bool:
"""
A read only property indicating if the bible is a 'web bible'
@ -437,7 +431,7 @@ class BibleDB(Manager):
:rtype: bool
"""
if self._is_web_bible is None:
self._is_web_bible = bool(self.get_object(BibleMeta, 'download_source'))
self._is_web_bible = bool(self.get_object(self.BibleMeta, 'download_source'))
return self._is_web_bible
def dump_bible(self):
@ -446,10 +440,10 @@ class BibleDB(Manager):
"""
log.debug('.........Dumping Bible Database')
log.debug('...............................Books ')
books = self.session.query(Book).all()
books = self.session.query(self.Book).all()
log.debug(books)
log.debug('...............................Verses ')
verses = self.session.query(Verse).all()
verses = self.session.query(self.Verse).all()
log.debug(verses)

View File

@ -29,7 +29,7 @@ from openlp.core.common.i18n import UiStrings, translate
from openlp.core.common.mixins import LogMixin, RegistryProperties
from openlp.core.common.registry import Registry
from openlp.plugins.bibles.lib import parse_reference
from openlp.plugins.bibles.lib.db import BibleDB, BibleMeta
from openlp.plugins.bibles.lib.db import BibleDB
from .importers.csvbible import CSVBible
from .importers.http import HTTPBible
@ -151,8 +151,8 @@ class BibleManager(LogMixin, RegistryProperties):
self.db_cache[name] = bible
# Look to see if lazy load bible exists and get create getter.
if self.db_cache[name].is_web_bible:
source = self.db_cache[name].get_object(BibleMeta, 'download_source')
download_name = self.db_cache[name].get_object(BibleMeta, 'download_name').value
source = self.db_cache[name].get_object(bible.BibleMeta, 'download_source')
download_name = self.db_cache[name].get_object(bible.BibleMeta, 'download_name').value
web_bible = HTTPBible(self.parent, path=self.path, file=file_path, download_source=source.value,
download_name=download_name)
self.db_cache[name] = web_bible
@ -376,7 +376,8 @@ class BibleManager(LogMixin, RegistryProperties):
Returns the meta data for a given key.
"""
log.debug('get_meta {bible},{key}'.format(bible=bible, key=key))
return self.db_cache[bible].get_object(BibleMeta, key)
bible_db = self.db_cache[bible]
return bible_db.get_object(bible_db.BibleMeta, key)
def update_book(self, bible, book):
"""

View File

@ -412,6 +412,8 @@ def test_file_import(manager, mock_settings):
importer.session = MagicMock()
importer.get_language = MagicMock()
importer.get_language.return_value = 'Danish'
importer.get_language_id = MagicMock()
importer.get_language_id.return_value = 'dk'
# WHEN: Importing bible file
importer.file_path = TEST_PATH / bible_file

View File

@ -432,6 +432,8 @@ class TestOsisImportFileImports(TestCase):
importer.session = MagicMock()
importer.get_language = MagicMock()
importer.get_language.return_value = 'Danish'
importer.get_language_id = MagicMock()
importer.get_language_id.return_value = 'dk'
# WHEN: Importing bible file
importer.file_path = TEST_PATH / bible_file
@ -461,6 +463,8 @@ class TestOsisImportFileImports(TestCase):
importer.session = MagicMock()
importer.get_language = MagicMock()
importer.get_language.return_value = 'English'
importer.get_language_id = MagicMock()
importer.get_language_id.return_value = 'en'
# WHEN: Importing bible file
importer.file_path = TEST_PATH / bible_file
@ -490,6 +494,8 @@ class TestOsisImportFileImports(TestCase):
importer.session = MagicMock()
importer.get_language = MagicMock()
importer.get_language.return_value = 'English'
importer.get_language_id = MagicMock()
importer.get_language_id.return_value = 'en'
# WHEN: Importing bible file
importer.file_path = TEST_PATH / bible_file
@ -519,6 +525,8 @@ class TestOsisImportFileImports(TestCase):
importer.session = MagicMock()
importer.get_language = MagicMock()
importer.get_language.return_value = 'Danish'
importer.get_language_id = MagicMock()
importer.get_language_id.return_value = 'dk'
# WHEN: Importing bible file
importer.file_path = TEST_PATH / bible_file