diff --git a/openlp/core/lib/db.py b/openlp/core/lib/db.py index fc3ac69ff..0eebe1c79 100644 --- a/openlp/core/lib/db.py +++ b/openlp/core/lib/db.py @@ -568,12 +568,20 @@ class Manager(object): :param order_by_ref: Any parameters to order the returned objects by. Defaults to None. """ query = self.session.query(object_class) + # Check filter_clause if filter_clause is not None: - query = query.filter(filter_clause) - if isinstance(order_by_ref, list): - query = query.order_by(*order_by_ref) - elif order_by_ref is not None: - query = query.order_by(order_by_ref) + if isinstance(filter_clause, list): + for dbfilter in filter_clause: + query = query.filter(dbfilter) + else: + query = query.filter(filter_clause) + # Check order_by_ref + if order_by_ref is not None: + if isinstance(order_by_ref, list): + query = query.order_by(*order_by_ref) + else: + query = query.order_by(order_by_ref) + for try_count in range(3): try: return query.all() diff --git a/openlp/core/projectors/db.py b/openlp/core/projectors/db.py index 55f0ffa5e..092952eb5 100644 --- a/openlp/core/projectors/db.py +++ b/openlp/core/projectors/db.py @@ -159,29 +159,36 @@ class Projector(Base, CommonMixin): """ Return basic representation of Source table entry. """ - return '< Projector(id="{data}", ip="{ip}", port="{port}", mac_adx="{mac}", pin="{pin}", name="{name}", ' \ - 'location="{location}", notes="{notes}", pjlink_name="{pjlink_name}", pjlink_class="{pjlink_class}", ' \ - 'manufacturer="{manufacturer}", model="{model}", serial_no="{serial}", other="{other}", ' \ - 'sources="{sources}", source_list="{source_list}", model_filter="{mfilter}", ' \ - 'model_lamp="{mlamp}", sw_version="{sw_ver}") >'.format(data=self.id, - ip=self.ip, - port=self.port, - mac=self.mac_adx, - pin=self.pin, - name=self.name, - location=self.location, - notes=self.notes, - pjlink_name=self.pjlink_name, - pjlink_class=self.pjlink_class, - manufacturer=self.manufacturer, - model=self.model, - other=self.other, - sources=self.sources, - source_list=self.source_list, - serial=self.serial_no, - mfilter=self.model_filter, - mlamp=self.model_lamp, - sw_ver=self.sw_version) + return f'< Projector(id="{self.id}", ip="{self.ip}", port="{self.port}", mac_adx="{self.mac_adx}", ' \ + f'pin="{self.pin}", name="{self.name}", location="{self.location}", notes="{self.notes}", ' \ + f'pjlink_name="{self.pjlink_name}", pjlink_class="{self.pjlink_class}", ' \ + f'manufacturer="{self.manufacturer}", model="{self.model}", serial_no="{self.serial_no}", ' \ + f'other="{self.other}", sources="{self.sources}", source_list="{self.source_list}", ' \ + f'model_filter="{self.model_filter}", model_lamp="{self.model_lamp}", ' \ + f'sw_version="{self.sw_version}") >' + + def __eq__(self, other): + if not isinstance(other, Projector): + return False + # Does not check self.id == other.id + return \ + self.ip == other.ip and \ + self.port == other.port and \ + self.mac_adx == other.mac_adx and \ + self.pin == other.pin and \ + self.name == other.name and \ + self.location == other.location and \ + self.notes == other.notes and \ + self.pjlink_name == other.pjlink_name and \ + self.pjlink_class == other.pjlink_class and \ + self.manufacturer == other.manufacturer and \ + self.model == other.model and \ + self.other == other.other and \ + self.serial_no == other.serial_no and \ + self.sw_version == other.sw_version and \ + self.model_filter == other.model_filter and \ + self.model_lamp == other.model_lamp + ip = Column(String(100)) port = Column(String(8)) mac_adx = Column(String(18)) @@ -257,6 +264,62 @@ class ProjectorDB(Manager): metadata.create_all(checkfirst=True) return session + def get_projector(self, *args, **kwargs): + """ + Get projector instance(s) in database + + If projector=Projector() instance, use projector as filter object. + + id= or projector.id is not None: Filter by record.id + name= or projector.name is not None: Filter by record.name + ip= or projector.ip is not None: Filter by record.ip + port= or projector.port is not None: Filter by record.port + + Any other options ignored + + In order: + id returns 1 record - all other following options ignored + name returns 1 record - all other following options ignored + ip AND port returns 1 record + ip only may return 1+ records + port only may return 1+ records + + :returns: None if no record found, otherwise list + """ + db_filter = [] + projector = Projector() if 'projector' not in kwargs else kwargs['projector'] + if projector.id is None and 'id' in kwargs: + projector.id = int(kwargs['id']) + if projector.name is None and 'name' in kwargs: + projector.name = kwargs['name'] + if projector.ip is None and 'ip' in kwargs: + projector.ip = kwargs['ip'] + if projector.port is None and 'port' in kwargs: + projector.port = kwargs['port'] + + if projector.id is not None: + log.debug('Filter by ID') + db_filter.append(Projector.id == projector.id) + elif projector.name is not None: + log.debug('Filter by Name') + db_filter.append(Projector.name == projector.name) + else: + p = '' + if projector.ip is not None: + db_filter.append(Projector.ip == projector.ip) + p += " IP" + if projector.port is not None: + db_filter.append(Projector.port == projector.port) + p += " Port" + if len(p) > 0: + log.debug(f'Filter by{p}') + + if len(db_filter) < 1: + log.warning('get_projector(): No valid query found - cancelled') + return None + + return self.get_all_objects(object_class=Projector, filter_clause=db_filter) + def get_projector_by_id(self, dbid): """ Locate a DB record by record ID. @@ -333,15 +396,13 @@ class ProjectorDB(Manager): True if entry added False if entry already in DB or db error """ - old_projector = self.get_object_filtered(Projector, Projector.ip == projector.ip) + old_projector = self.get_object_filtered(Projector, + Projector.ip == projector.ip, + Projector.port == projector.port) if old_projector is not None: - log.warning('add_projector() skipping entry ip="{ip}" (Already saved)'.format(ip=old_projector.ip)) + log.warning(f'add_projector() Duplicate record ip={old_projector} port={old_projector.port}') return False - log.debug('add_projector() saving new entry') - log.debug('ip="{ip}", name="{name}", location="{location}"'.format(ip=projector.ip, - name=projector.name, - location=projector.location)) - log.debug('notes="{notes}"'.format(notes=projector.notes)) + log.debug(f'add_projector() saving new entry name="{projector.name}" ip={projector.ip} port={projector.port}') return self.save_object(projector) def update_projector(self, projector=None): diff --git a/tests/openlp_core/projectors/conftest.py b/tests/openlp_core/projectors/conftest.py index fb8a5ae0f..9a42319c1 100644 --- a/tests/openlp_core/projectors/conftest.py +++ b/tests/openlp_core/projectors/conftest.py @@ -23,13 +23,14 @@ Fixtures for projector tests """ import pytest +from pathlib import PurePath from unittest.mock import patch from openlp.core.projectors.db import Projector, ProjectorDB from openlp.core.projectors.manager import ProjectorManager from openlp.core.projectors.pjlink import PJLink from tests.helpers.projector import FakePJLink -from tests.resources.projector.data import TEST_DB, TEST1_DATA +from tests.resources.projector.data import TEST_DB, TEST1_DATA, TEST2_DATA, TEST3_DATA ''' NOTE: Since Registry is a singleton, sleight of hand allows us to verify @@ -86,6 +87,37 @@ def fake_pjlink(): del(faker) +@pytest.fixture() +def projectordb_mtdb(temp_folder, settings): + """ + Set up anything necessary for all tests + """ + tmpdb_url = f'sqlite:///{PurePath(temp_folder, TEST_DB)}' + with patch('openlp.core.projectors.db.init_url') as mocked_init_url: + mocked_init_url.return_value = tmpdb_url + proj = ProjectorDB() + yield proj + proj.session.close() + del proj + + +@pytest.fixture() +def projectordb(temp_folder, settings): + """ + Set up anything necessary for all tests + """ + tmpdb_url = f'sqlite:///{PurePath(temp_folder, TEST_DB)}' + with patch('openlp.core.projectors.db.init_url') as mocked_init_url: + mocked_init_url.return_value = tmpdb_url + proj = ProjectorDB() + proj.add_projector(Projector(**TEST1_DATA)) + proj.add_projector(Projector(**TEST2_DATA)) + proj.add_projector(Projector(**TEST3_DATA)) + yield proj + proj.session.close() + del proj + + @pytest.fixture() def pjlink(): pj_link = PJLink(Projector(**TEST1_DATA), no_poll=True) diff --git a/tests/openlp_core/projectors/db/test_get_projector.py b/tests/openlp_core/projectors/db/test_get_projector.py new file mode 100644 index 000000000..13274c92d --- /dev/null +++ b/tests/openlp_core/projectors/db/test_get_projector.py @@ -0,0 +1,320 @@ +# -*- coding: utf-8 -*- + +########################################################################## +# OpenLP - Open Source Lyrics Projection # +# ---------------------------------------------------------------------- # +# Copyright (c) 2008-2022 OpenLP Developers # +# ---------------------------------------------------------------------- # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +########################################################################## +""" +Test ProjectorDB.get_projectors() +""" + +import logging +import openlp.core.projectors.db + +from openlp.core.projectors.constants import PJLINK_PORT + +from tests.resources.projector.data import TEST1_DATA, TEST2_DATA, TEST3_DATA + +test_module = openlp.core.projectors.db.__name__ +Projector = openlp.core.projectors.db.Projector + + +def test_invalid_projector(projectordb, caplog): + """ + Test get_projector with no Projector() instance and no options + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.WARNING, 'get_projector(): No valid query found - cancelled')] + + # WHEN: Called + caplog.clear() + t_chk = projectordb.get_projector() + + # THEN: Only log entries found and return is None + assert caplog.record_tuples == logs, 'Invalid log entries' + assert t_chk is None, 'Should have returned None' + + +def test_invald_key(projectordb, caplog): + """ + Test returning None if not one of the main keys + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.WARNING, 'get_projector(): No valid query found - cancelled')] + + # WHEN: Called + caplog.clear() + t_chk = projectordb.get_projector(pin='') + + # THEN: Logs and one returned item + assert caplog.record_tuples == logs, 'Invalid log entries' + assert t_chk is None, 'Should have returned None' + + +def test_by_id(projectordb, caplog): + """ + Test returning one entry by ID + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, 'Filter by ID')] + t_p2 = Projector(**TEST2_DATA) + + # WHEN: Called + caplog.clear() + t_chk = projectordb.get_projector(id=2) + + # THEN: Logs and one returned item + assert caplog.record_tuples == logs, 'Invalid log entries' + assert len(t_chk) == 1, 'More than one record returned' + assert t_p2 == t_chk[0], 'DB record != t_chk' + + +def test_by_name(projectordb, caplog): + """ + Test returning one entry by name + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, 'Filter by Name')] + t_p2 = Projector(**TEST2_DATA) + + # WHEN: Called + caplog.clear() + t_chk = projectordb.get_projector(name=t_p2.name) + + # THEN: Logs and one returned item + assert caplog.record_tuples == logs, 'Invalid log entries' + assert len(t_chk) == 1, 'More than one record returned' + assert t_p2 == t_chk[0], 'DB record != t_chk' + + +def test_by_ip_single(projectordb, caplog): + """ + Test returning one entry by IP + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, 'Filter by IP')] + t_p2 = Projector(**TEST2_DATA) + + # WHEN: Called + caplog.clear() + t_chk = projectordb.get_projector(ip=t_p2.ip) + + # THEN: Logs and one returned item + assert caplog.record_tuples == logs, 'Invalid log entries' + assert len(t_chk) == 1, 'More than one record returned' + assert t_p2 == t_chk[0], 'DB record != t_chk' + + +def test_by_ip_multiple(projectordb, caplog): + """ + Test returning multiple entries by IP + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, 'Filter by IP')] + t_p2 = Projector(**TEST2_DATA) + t_p2.id = None + t_p2.port = PJLINK_PORT + projectordb.add_projector(projector=t_p2) + t_p2chk = Projector(**TEST2_DATA) + + # WHEN: Called + caplog.clear() + t_chk = projectordb.get_projector(ip=t_p2.ip) + + # THEN: Logs and one returned item + assert caplog.record_tuples == logs, 'Invalid log entries' + assert len(t_chk) == 2, 'Should have returned 2 records' + assert t_p2.name == t_chk[0].name and t_p2.name == t_chk[1].name, 'DB records names do not match' + assert t_p2.ip == t_chk[0].ip and t_p2.ip == t_chk[1].ip, 'DB records IP addresses do not match' + assert t_p2.name == t_chk[0].name and t_p2.name == t_chk[1].name, 'DB records names do not match' + assert t_p2chk.port == t_chk[0].port and t_p2.port == t_chk[1].port, 'DB records ports do not match' + + +def test_by_port_single(projectordb, caplog): + """ + Test returning one entry by Port + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, 'Filter by Port')] + t_p2 = Projector(**TEST2_DATA) + + # WHEN: Called + caplog.clear() + t_chk = projectordb.get_projector(port=t_p2.port) + + # THEN: Logs and one returned item + assert caplog.record_tuples == logs, 'Invalid log entries' + assert len(t_chk) == 1, 'More than one record returned' + assert t_p2 == t_chk[0], 'DB record != t_chk' + + +def test_by_port_multiple(projectordb_mtdb, caplog): + """ + Test returning one entry by Port + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, 'Filter by Port')] + + t_p1 = Projector(**TEST1_DATA) + t_p1.port = PJLINK_PORT + projectordb_mtdb.add_projector(t_p1) + t_p2 = Projector(**TEST2_DATA) + t_p2.port = PJLINK_PORT + projectordb_mtdb.add_projector(t_p2) + t_p3 = Projector(**TEST3_DATA) + t_p3.port = PJLINK_PORT + projectordb_mtdb.add_projector(t_p3) + + # WHEN: Called + caplog.clear() + t_chk = projectordb_mtdb.get_projector(port=PJLINK_PORT) + + # THEN: Logs and one returned item + assert caplog.record_tuples == logs, 'Invalid log entries' + assert len(t_chk) == 3, 'Should have returned 3 records' + assert t_chk[0].ip == TEST1_DATA['ip'], 'DB record 1 IP does not match' + assert t_chk[1].ip == TEST2_DATA['ip'], 'DB record 2 IP does not match' + assert t_chk[2].ip == TEST3_DATA['ip'], 'DB record 3 IP does not match' + + +def test_by_ip_port(projectordb, caplog): + """ + Test returning one entry by IP and Port + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, 'Filter by IP Port')] + t_p2 = Projector(**TEST2_DATA) + + # WHEN: Called + caplog.clear() + t_chk = projectordb.get_projector(port=t_p2.port, ip=t_p2.ip) + + # THEN: Logs and one returned item + assert caplog.record_tuples == logs, 'Invalid log entries' + assert len(t_chk) == 1, 'More than one record returned' + assert t_p2 == t_chk[0], 'DB record != t_chk' + + +def test_by_projector_id(projectordb, caplog): + """ + Test returning one entry by projector.id + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, 'Filter by ID')] + t_p2 = Projector(**TEST2_DATA) + + # WHEN: Called + caplog.clear() + t_chk = projectordb.get_projector(projector=t_p2) + + # THEN: Logs and one returned item + assert caplog.record_tuples == logs, 'Invalid log entries' + assert len(t_chk) == 1, 'More than one record returned' + assert t_p2 == t_chk[0], 'DB record != t_chk' + + +def test_by_projector_name(projectordb, caplog): + """ + Test returning one entry by projector.name + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, 'Filter by Name')] + t_p2 = Projector(**TEST2_DATA) + t_p2.id = None + + # WHEN: Called + caplog.clear() + t_chk = projectordb.get_projector(projector=t_p2) + + # THEN: Logs and one returned item + assert caplog.record_tuples == logs, 'Invalid log entries' + assert len(t_chk) == 1, 'More than one record returned' + assert t_p2 == t_chk[0], 'DB record != t_chk' + + +def test_by_projector_ip(projectordb, caplog): + """ + Test returning one entry by projector.ip + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, 'Filter by IP')] + t_p2 = Projector(**TEST2_DATA) + t_p2.id = t_p2.name = t_p2.port = None + t_p2chk = Projector(**TEST2_DATA) + + # WHEN: Called + caplog.clear() + t_chk = projectordb.get_projector(projector=t_p2) + + # THEN: Logs and one returned item + assert caplog.record_tuples == logs, 'Invalid log entries' + assert len(t_chk) == 1, 'More than one record returned' + assert t_p2chk == t_chk[0], 'DB record != t_chk' + + +def test_by_projector_port(projectordb, caplog): + """ + Test returning one entry by projector.port + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, 'Filter by Port')] + t_p2 = Projector(**TEST2_DATA) + t_p2.id = t_p2.name = t_p2.ip = None + t_p2chk = Projector(**TEST2_DATA) + + # WHEN: Called + caplog.clear() + t_chk = projectordb.get_projector(projector=t_p2) + + # THEN: Logs and one returned item + assert caplog.record_tuples == logs, 'Invalid log entries' + assert len(t_chk) == 1, 'More than one record returned' + assert t_p2chk == t_chk[0], 'DB record != t_chk' + + +def test_by_projector_ip_port(projectordb, caplog): + """ + Test returning one entry by projector.port + """ + # GIVEN: Test setup + caplog.set_level(logging.DEBUG) + logs = [(test_module, logging.DEBUG, 'Filter by IP Port')] + t_p2 = Projector(**TEST2_DATA) + t_p2.id = t_p2.name = None + t_p2chk = Projector(**TEST2_DATA) + + # WHEN: Called + caplog.clear() + t_chk = projectordb.get_projector(projector=t_p2) + + # THEN: Logs and one returned item + assert caplog.record_tuples == logs, 'Invalid log entries' + assert len(t_chk) == 1, 'More than one record returned' + assert t_p2chk == t_chk[0], 'DB record != t_chk'