From 8ef650b6cbb76e974650b3213c54b75ea82b19da Mon Sep 17 00:00:00 2001 From: Tomas Groth Date: Fri, 26 Mar 2021 21:21:42 +0000 Subject: [PATCH] Update database_exists method. maybe it will support multiple versions. --- openlp/core/lib/db.py | 94 +++++++++++++++++++++++++++++-------------- 1 file changed, 63 insertions(+), 31 deletions(-) diff --git a/openlp/core/lib/db.py b/openlp/core/lib/db.py index dcff08497..a4df3aa61 100644 --- a/openlp/core/lib/db.py +++ b/openlp/core/lib/db.py @@ -31,7 +31,7 @@ from urllib.parse import quote_plus as urlquote from alembic.migration import MigrationContext from alembic.operations import Operations from sqlalchemy import Column, MetaData, Table, UnicodeText, create_engine, types -from sqlalchemy.engine.url import make_url +from sqlalchemy.engine.url import make_url, URL from sqlalchemy.exc import DBAPIError, InvalidRequestError, OperationalError, ProgrammingError, SQLAlchemyError from sqlalchemy.orm import mapper, scoped_session, sessionmaker from sqlalchemy.pool import NullPool @@ -47,62 +47,94 @@ from openlp.core.lib.ui import critical_error_message_box log = logging.getLogger(__name__) +def _set_url_database(url, database): + try: + ret = URL.create( + drivername=url.drivername, + username=url.username, + password=url.password, + host=url.host, + port=url.port, + database=database, + query=url.query + ) + except AttributeError: # SQLAlchemy <1.4 + url.database = database + ret = url + assert ret.database == database, ret + return ret + + +def _get_scalar_result(engine, sql): + with engine.connect() as conn: + return conn.scalar(sql) + + +def _sqlite_file_exists(database): + if not os.path.isfile(database) or os.path.getsize(database) < 100: + return False + + with open(database, 'rb') as f: + header = f.read(100) + + return header[:16] == b'SQLite format 3\x00' + + def database_exists(url): """Check if a database exists. - :param url: A SQLAlchemy engine URL. - Performs backend-specific testing to quickly determine if a database exists on the server. :: - - database_exists('postgres://postgres@localhost/name') #=> False - create_database('postgres://postgres@localhost/name') - database_exists('postgres://postgres@localhost/name') #=> True - + database_exists('postgresql://postgres@localhost/name') #=> False + create_database('postgresql://postgres@localhost/name') + database_exists('postgresql://postgres@localhost/name') #=> True Supports checking against a constructed URL as well. :: - - engine = create_engine('postgres://postgres@localhost/name') + engine = create_engine('postgresql://postgres@localhost/name') database_exists(engine.url) #=> False create_database(engine.url) database_exists(engine.url) #=> True - Borrowed from SQLAlchemy_Utils (v0.32.14) since we only need this one function. + Borrowed from SQLAlchemy_Utils since we only need this one function. + Copied from a fork/pull request since SQLAlchemy_Utils didn't supprt SQLAlchemy 1.4 when it was released: + https://github.com/nsoranzo/sqlalchemy-utils/blob/4f52578/sqlalchemy_utils/functions/database.py """ url = copy(make_url(url)) database = url.database - if url.drivername.startswith('postgresql'): - url.database = 'template1' - else: - url.database = None + dialect_name = url.get_dialect().name - engine = create_engine(url) + if dialect_name == 'postgresql': + text = "SELECT 1 FROM pg_database WHERE datname='%s'" % database + for db in (database, 'postgres', 'template1', 'template0', None): + url = _set_url_database(url, database=db) + engine = create_engine(url, poolclass=NullPool) + try: + return bool(_get_scalar_result(engine, text)) + except (ProgrammingError, OperationalError): + pass + return False - if engine.dialect.name == 'postgresql': - text = "SELECT 1 FROM pg_database WHERE datname='{db}'".format(db=database) - return bool(engine.execute(text).scalar()) - - elif engine.dialect.name == 'mysql': + elif dialect_name == 'mysql': + url = _set_url_database(url, database=None) + engine = create_engine(url, poolclass=NullPool) text = ("SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA " - "WHERE SCHEMA_NAME = '{db}'".format(db=database)) - return bool(engine.execute(text).scalar()) + "WHERE SCHEMA_NAME = '%s'" % database) + return bool(_get_scalar_result(engine, text)) - elif engine.dialect.name == 'sqlite': + elif dialect_name == 'sqlite': + url = _set_url_database(url, database=None) + engine = create_engine(url, poolclass=NullPool) if database: - return database == ':memory:' or os.path.exists(database) + return database == ':memory:' or _sqlite_file_exists(database) else: # The default SQLAlchemy database is in memory, # and :memory is not required, thus we should support that use-case return True - else: text = 'SELECT 1' try: - url.database = database - engine = create_engine(url) - engine.execute(text) - return True - + engine = create_engine(url, poolclass=NullPool) + return bool(_get_scalar_result(engine, text)) except (ProgrammingError, OperationalError): return False