Merge branch 'sqlalchemy-update' into 'master'

Update database_exists method. maybe it will support multiple versions.

See merge request openlp/openlp!311
This commit is contained in:
Raoul Snyman 2021-03-26 21:21:43 +00:00
commit 5a55b28bb3

View File

@ -31,7 +31,7 @@ from urllib.parse import quote_plus as urlquote
from alembic.migration import MigrationContext
from alembic.operations import Operations
from sqlalchemy import Column, MetaData, Table, UnicodeText, create_engine, types
from sqlalchemy.engine.url import make_url
from sqlalchemy.engine.url import make_url, URL
from sqlalchemy.exc import DBAPIError, InvalidRequestError, OperationalError, ProgrammingError, SQLAlchemyError
from sqlalchemy.orm import mapper, scoped_session, sessionmaker
from sqlalchemy.pool import NullPool
@ -47,62 +47,94 @@ from openlp.core.lib.ui import critical_error_message_box
log = logging.getLogger(__name__)
def _set_url_database(url, database):
try:
ret = URL.create(
drivername=url.drivername,
username=url.username,
password=url.password,
host=url.host,
port=url.port,
database=database,
query=url.query
)
except AttributeError: # SQLAlchemy <1.4
url.database = database
ret = url
assert ret.database == database, ret
return ret
def _get_scalar_result(engine, sql):
with engine.connect() as conn:
return conn.scalar(sql)
def _sqlite_file_exists(database):
if not os.path.isfile(database) or os.path.getsize(database) < 100:
return False
with open(database, 'rb') as f:
header = f.read(100)
return header[:16] == b'SQLite format 3\x00'
def database_exists(url):
"""Check if a database exists.
:param url: A SQLAlchemy engine URL.
Performs backend-specific testing to quickly determine if a database
exists on the server. ::
database_exists('postgres://postgres@localhost/name') #=> False
create_database('postgres://postgres@localhost/name')
database_exists('postgres://postgres@localhost/name') #=> True
database_exists('postgresql://postgres@localhost/name') #=> False
create_database('postgresql://postgres@localhost/name')
database_exists('postgresql://postgres@localhost/name') #=> True
Supports checking against a constructed URL as well. ::
engine = create_engine('postgres://postgres@localhost/name')
engine = create_engine('postgresql://postgres@localhost/name')
database_exists(engine.url) #=> False
create_database(engine.url)
database_exists(engine.url) #=> True
Borrowed from SQLAlchemy_Utils (v0.32.14) since we only need this one function.
Borrowed from SQLAlchemy_Utils since we only need this one function.
Copied from a fork/pull request since SQLAlchemy_Utils didn't supprt SQLAlchemy 1.4 when it was released:
https://github.com/nsoranzo/sqlalchemy-utils/blob/4f52578/sqlalchemy_utils/functions/database.py
"""
url = copy(make_url(url))
database = url.database
if url.drivername.startswith('postgresql'):
url.database = 'template1'
else:
url.database = None
dialect_name = url.get_dialect().name
engine = create_engine(url)
if dialect_name == 'postgresql':
text = "SELECT 1 FROM pg_database WHERE datname='%s'" % database
for db in (database, 'postgres', 'template1', 'template0', None):
url = _set_url_database(url, database=db)
engine = create_engine(url, poolclass=NullPool)
try:
return bool(_get_scalar_result(engine, text))
except (ProgrammingError, OperationalError):
pass
return False
if engine.dialect.name == 'postgresql':
text = "SELECT 1 FROM pg_database WHERE datname='{db}'".format(db=database)
return bool(engine.execute(text).scalar())
elif engine.dialect.name == 'mysql':
elif dialect_name == 'mysql':
url = _set_url_database(url, database=None)
engine = create_engine(url, poolclass=NullPool)
text = ("SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA "
"WHERE SCHEMA_NAME = '{db}'".format(db=database))
return bool(engine.execute(text).scalar())
"WHERE SCHEMA_NAME = '%s'" % database)
return bool(_get_scalar_result(engine, text))
elif engine.dialect.name == 'sqlite':
elif dialect_name == 'sqlite':
url = _set_url_database(url, database=None)
engine = create_engine(url, poolclass=NullPool)
if database:
return database == ':memory:' or os.path.exists(database)
return database == ':memory:' or _sqlite_file_exists(database)
else:
# The default SQLAlchemy database is in memory,
# and :memory is not required, thus we should support that use-case
return True
else:
text = 'SELECT 1'
try:
url.database = database
engine = create_engine(url)
engine.execute(text)
return True
engine = create_engine(url, poolclass=NullPool)
return bool(_get_scalar_result(engine, text))
except (ProgrammingError, OperationalError):
return False