Add support for importing EasyWorship 6 databases

bzr-revno: 2741
Fixes: https://launchpad.net/bugs/1675641
This commit is contained in:
Gary Talent 2017-05-22 12:07:07 -07:00 committed by Raoul Snyman
commit 549ec886a5
6 changed files with 177 additions and 78 deletions

View File

@ -512,10 +512,13 @@ def strip_rtf(text, default_encoding=None):
elif not ignorable:
ebytes.append(int(hex_, 16))
elif tchar:
if curskip > 0:
curskip -= 1
elif not ignorable:
if not ignorable:
ebytes += tchar.encode()
if len(ebytes) >= curskip:
ebytes = ebytes[curskip:]
else:
curskip -= len(ebytes)
ebytes = ""
text = ''.join(out)
return text, default_encoding

View File

@ -160,26 +160,27 @@ class SongFormat(object):
DreamBeam = 5
EasySlides = 6
EasyWorshipDB = 7
EasyWorshipService = 8
FoilPresenter = 9
Lyrix = 10
MediaShout = 11
OpenSong = 12
OPSPro = 13
PowerPraise = 14
PowerSong = 15
PresentationManager = 16
ProPresenter = 17
SongBeamer = 18
SongPro = 19
SongShowPlus = 20
SongsOfFellowship = 21
SundayPlus = 22
VideoPsalm = 23
WordsOfWorship = 24
WorshipAssistant = 25
WorshipCenterPro = 26
ZionWorx = 27
EasyWorshipSqliteDB = 8
EasyWorshipService = 9
FoilPresenter = 10
Lyrix = 11
MediaShout = 12
OpenSong = 13
OPSPro = 14
PowerPraise = 15
PowerSong = 16
PresentationManager = 17
ProPresenter = 18
SongBeamer = 19
SongPro = 20
SongShowPlus = 21
SongsOfFellowship = 22
SundayPlus = 23
VideoPsalm = 24
WordsOfWorship = 25
WorshipAssistant = 26
WorshipCenterPro = 27
ZionWorx = 28
# Set optional attribute defaults
__defaults__ = {
@ -251,9 +252,17 @@ class SongFormat(object):
'name': 'EasyWorship Song Database',
'prefix': 'ew',
'selectMode': SongFormatSelect.SingleFile,
'filter': '{text} (*.db)'.format(text=translate('SongsPlugin.ImportWizardForm',
'filter': '{text} (*.DB)'.format(text=translate('SongsPlugin.ImportWizardForm',
'EasyWorship Song Database'))
},
EasyWorshipSqliteDB: {
'class': EasyWorshipSongImport,
'name': 'EasyWorship 6 Song Database',
'prefix': 'ew',
'selectMode': SongFormatSelect.SingleFolder,
'filter': '{text} (*.db)'.format(text=translate('SongsPlugin.ImportWizardForm',
'EasyWorship 6 Song Data Directory'))
},
EasyWorshipService: {
'class': EasyWorshipSongImport,
'name': 'EasyWorship Service File',
@ -440,6 +449,7 @@ class SongFormat(object):
SongFormat.DreamBeam,
SongFormat.EasySlides,
SongFormat.EasyWorshipDB,
SongFormat.EasyWorshipSqliteDB,
SongFormat.EasyWorshipService,
SongFormat.FoilPresenter,
SongFormat.Lyrix,

View File

@ -28,6 +28,7 @@ import struct
import re
import zlib
import logging
import sqlite3
from openlp.core.lib import translate
from openlp.plugins.songs.lib import VerseType
@ -77,8 +78,10 @@ class EasyWorshipSongImport(SongImport):
"""
if self.import_source.lower().endswith('ews'):
self.import_ews()
else:
elif self.import_source.endswith('DB'):
self.import_db()
else:
self.import_sqlite_db()
def import_ews(self):
"""
@ -125,8 +128,8 @@ class EasyWorshipSongImport(SongImport):
else:
log.debug('Given ews file is of unknown version.')
return
entry_count = self.get_i32(file_pos)
entry_length = self.get_i16(file_pos + 4)
entry_count = self.ews_get_i32(file_pos)
entry_length = self.ews_get_i16(file_pos + 4)
file_pos += 6
self.import_wizard.progress_bar.setMaximum(entry_count)
# Loop over songs
@ -144,13 +147,13 @@ class EasyWorshipSongImport(SongImport):
# 0x08 = Audio, 0x09 = Web
# 1410 Song number cstring 10
self.set_defaults()
self.title = self.get_string(file_pos + 0, 50)
authors = self.get_string(file_pos + 307, 50)
copyright = self.get_string(file_pos + 358, 100)
admin = self.get_string(file_pos + 459, 50)
cont_ptr = self.get_i32(file_pos + 800)
cont_type = self.get_i32(file_pos + 820)
self.ccli_number = self.get_string(file_pos + 1410, 10)
self.title = self.ews_get_string(file_pos + 0, 50)
authors = self.ews_get_string(file_pos + 307, 50)
copyright = self.ews_get_string(file_pos + 358, 100)
admin = self.ews_get_string(file_pos + 459, 50)
cont_ptr = self.ews_get_i32(file_pos + 800)
cont_type = self.ews_get_i32(file_pos + 820)
self.ccli_number = self.ews_get_string(file_pos + 1410, 10)
# Only handle content type 1 (songs)
if cont_type != 1:
file_pos += entry_length
@ -164,9 +167,9 @@ class EasyWorshipSongImport(SongImport):
# Checksum int32be 4 Alder-32 checksum.
# (unknown) 4 0x51 0x4b 0x03 0x04
# Content length int32le 4 Length of content after decompression
content_length = self.get_i32(cont_ptr)
deflated_content = self.get_bytes(cont_ptr + 4, content_length - 10)
deflated_length = self.get_i32(cont_ptr + 4 + content_length - 6)
content_length = self.ews_get_i32(cont_ptr)
deflated_content = self.ews_get_bytes(cont_ptr + 4, content_length - 10)
deflated_length = self.ews_get_i32(cont_ptr + 4 + content_length - 6)
inflated_content = zlib.decompress(deflated_content, 15, deflated_length)
if copyright:
self.copyright = copyright
@ -196,7 +199,7 @@ class EasyWorshipSongImport(SongImport):
Import the songs from the database
"""
# Open the DB and MB files if they exist
import_source_mb = self.import_source.replace('.DB', '.MB').replace('.db', '.mb')
import_source_mb = self.import_source.replace('.DB', '.MB')
if not os.path.isfile(self.import_source):
self.log_error(self.import_source, translate('SongsPlugin.EasyWorshipSongImport',
'This file does not exist.'))
@ -260,16 +263,16 @@ class EasyWorshipSongImport(SongImport):
for i, field_name in enumerate(field_names):
field_type, field_size = struct.unpack_from('BB', field_info, i * 2)
field_descriptions.append(FieldDescEntry(field_name, field_type, field_size))
self.set_record_struct(field_descriptions)
self.db_set_record_struct(field_descriptions)
# Pick out the field description indexes we will need
try:
success = True
fi_title = self.find_field(b'Title')
fi_author = self.find_field(b'Author')
fi_copy = self.find_field(b'Copyright')
fi_admin = self.find_field(b'Administrator')
fi_words = self.find_field(b'Words')
fi_ccli = self.find_field(b'Song Number')
fi_title = self.db_find_field(b'Title')
fi_author = self.db_find_field(b'Author')
fi_copy = self.db_find_field(b'Copyright')
fi_admin = self.db_find_field(b'Administrator')
fi_words = self.db_find_field(b'Words')
fi_ccli = self.db_find_field(b'Song Number')
except IndexError:
# This is the wrong table
success = False
@ -297,13 +300,13 @@ class EasyWorshipSongImport(SongImport):
raw_record = db_file.read(record_size)
self.fields = self.record_structure.unpack(raw_record)
self.set_defaults()
self.title = self.get_field(fi_title).decode(self.encoding)
self.title = self.db_get_field(fi_title).decode(self.encoding)
# Get remaining fields.
copy = self.get_field(fi_copy)
admin = self.get_field(fi_admin)
ccli = self.get_field(fi_ccli)
authors = self.get_field(fi_author)
words = self.get_field(fi_words)
copy = self.db_get_field(fi_copy)
admin = self.db_get_field(fi_admin)
ccli = self.db_get_field(fi_ccli)
authors = self.db_get_field(fi_author)
words = self.db_get_field(fi_words)
if copy:
self.copyright = copy.decode(self.encoding)
if admin:
@ -337,6 +340,82 @@ class EasyWorshipSongImport(SongImport):
db_file.close()
self.memo_file.close()
def _find_file(self, base_path, path_list):
"""
Find the specified file, with the option of the file being at any level in the specified directory structure.
:param base_path: the location search in
:param path_list: the targeted file, preceded by directories that may be their parents relative to the base_path
:return: path for targeted file
"""
target_file = ''
while len(path_list) > 0:
target_file = os.path.join(path_list[-1], target_file)
path_list = path_list[:len(path_list) - 1]
full_path = os.path.join(base_path, target_file)
full_path = full_path[:len(full_path) - 1]
if os.path.isfile(full_path):
return full_path
return ''
def import_sqlite_db(self):
"""
Import the songs from an EasyWorship 6 SQLite database
"""
songs_db_path = self._find_file(self.import_source, ["Databases", "Data", "Songs.db"])
song_words_db_path = self._find_file(self.import_source, ["Databases", "Data", "SongWords.db"])
invalid_dir_msg = 'This does not appear to be a valid Easy Worship 6 database directory.'
# check to see if needed files are there
if not os.path.isfile(songs_db_path):
self.log_error(songs_db_path, translate('SongsPlugin.EasyWorshipSongImport', invalid_dir_msg))
return
if not os.path.isfile(song_words_db_path):
self.log_error(song_words_db_path, translate('SongsPlugin.EasyWorshipSongImport', invalid_dir_msg))
return
# get database handles
songs_conn = sqlite3.connect(songs_db_path)
words_conn = sqlite3.connect(song_words_db_path)
if songs_conn is None or words_conn is None:
self.log_error(self.import_source, translate('SongsPlugin.EasyWorshipSongImport',
'This is not a valid Easy Worship 6 database.'))
songs_conn.close()
words_conn.close()
return
songs_db = songs_conn.cursor()
words_db = words_conn.cursor()
if songs_conn is None or words_conn is None:
self.log_error(self.import_source, translate('SongsPlugin.EasyWorshipSongImport',
'This is not a valid Easy Worship 6 database.'))
songs_conn.close()
words_conn.close()
return
# Take a stab at how text is encoded
self.encoding = 'cp1252'
self.encoding = retrieve_windows_encoding(self.encoding)
if not self.encoding:
log.debug('No encoding set.')
return
# import songs
songs = songs_db.execute('SELECT rowid,title,author,copyright,vendor_id FROM song;')
for song in songs:
song_id = song[0]
# keep extra copy of title for error message because error check clears it
self.title = title = song[1]
self.author = song[2]
self.copyright = song[3]
self.ccli_number = song[4]
words = words_db.execute('SELECT words FROM word WHERE song_id = ?;', (song_id,))
self.set_song_import_object(self.author, words.fetchone()[0].encode())
if not self.finish():
self.log_error(self.import_source,
translate('SongsPlugin.EasyWorshipSongImport',
'"{title}" could not be imported. {entry}').
format(title=title, entry=self.entry_error_log))
# close database handles
songs_conn.close()
words_conn.close()
return
def set_song_import_object(self, authors, words):
"""
Set the SongImport object members.
@ -409,7 +488,7 @@ class EasyWorshipSongImport(SongImport):
self.comments += str(translate('SongsPlugin.EasyWorshipSongImport',
'\n[above are Song Tags with notes imported from EasyWorship]'))
def find_field(self, field_name):
def db_find_field(self, field_name):
"""
Find a field in the descriptions
@ -417,7 +496,7 @@ class EasyWorshipSongImport(SongImport):
"""
return [i for i, x in enumerate(self.field_descriptions) if x.name == field_name][0]
def set_record_struct(self, field_descriptions):
def db_set_record_struct(self, field_descriptions):
"""
Save the record structure
@ -445,7 +524,7 @@ class EasyWorshipSongImport(SongImport):
self.record_structure = struct.Struct(''.join(fsl))
self.field_descriptions = field_descriptions
def get_field(self, field_desc_index):
def db_get_field(self, field_desc_index):
"""
Extract the field
@ -489,7 +568,7 @@ class EasyWorshipSongImport(SongImport):
else:
return 0
def get_bytes(self, pos, length):
def ews_get_bytes(self, pos, length):
"""
Get bytes from ews_file
@ -500,7 +579,7 @@ class EasyWorshipSongImport(SongImport):
self.ews_file.seek(pos)
return self.ews_file.read(length)
def get_string(self, pos, length):
def ews_get_string(self, pos, length):
"""
Get string from ews_file
@ -508,12 +587,12 @@ class EasyWorshipSongImport(SongImport):
:param length: Characters to read
:return: String read
"""
bytes = self.get_bytes(pos, length)
bytes = self.ews_get_bytes(pos, length)
mask = '<' + str(length) + 's'
byte_str, = struct.unpack(mask, bytes)
return byte_str.decode(self.encoding).replace('\0', '').strip()
def get_i16(self, pos):
def ews_get_i16(self, pos):
"""
Get short int from ews_file
@ -521,19 +600,19 @@ class EasyWorshipSongImport(SongImport):
:return: Short integer read
"""
bytes = self.get_bytes(pos, 2)
bytes = self.ews_get_bytes(pos, 2)
mask = '<h'
number, = struct.unpack(mask, bytes)
return number
def get_i32(self, pos):
def ews_get_i32(self, pos):
"""
Get long int from ews_file
:param pos: Position to read from
:return: Long integer read
"""
bytes = self.get_bytes(pos, 4)
bytes = self.ews_get_bytes(pos, 4)
mask = '<i'
number, = struct.unpack(mask, bytes)
return number

View File

@ -187,7 +187,7 @@ class TestEasyWorshipSongImport(TestCase):
def test_find_field_exists(self):
"""
Test finding an existing field in a given list using the :mod:`find_field`
Test finding an existing field in a given list using the :mod:`db_find_field`
"""
# GIVEN: A mocked out SongImport class, a mocked out "manager" and a list of field descriptions.
with patch('openlp.plugins.songs.lib.importers.easyworship.SongImport'):
@ -201,11 +201,11 @@ class TestEasyWorshipSongImport(TestCase):
for field_name in existing_fields:
# THEN: The item corresponding the index returned should have the same name attribute
self.assertEqual(importer.field_descriptions[importer.find_field(field_name)].name, field_name)
self.assertEqual(importer.field_descriptions[importer.db_find_field(field_name)].name, field_name)
def test_find_non_existing_field(self):
"""
Test finding an non-existing field in a given list using the :mod:`find_field`
Test finding an non-existing field in a given list using the :mod:`db_find_field`
"""
# GIVEN: A mocked out SongImport class, a mocked out "manager" and a list of field descriptions
with patch('openlp.plugins.songs.lib.importers.easyworship.SongImport'):
@ -218,11 +218,11 @@ class TestEasyWorshipSongImport(TestCase):
for field_name in non_existing_fields:
# THEN: The importer object should not be None
self.assertRaises(IndexError, importer.find_field, field_name)
self.assertRaises(IndexError, importer.db_find_field, field_name)
def test_set_record_struct(self):
"""
Test the :mod:`set_record_struct` module
Test the :mod:`db_set_record_struct` module
"""
# GIVEN: A mocked out SongImport class, a mocked out struct class, and a mocked out "manager" and a list of
# field descriptions
@ -231,17 +231,17 @@ class TestEasyWorshipSongImport(TestCase):
mocked_manager = MagicMock()
importer = EasyWorshipSongImport(mocked_manager, filenames=[])
# WHEN: set_record_struct is called with a list of field descriptions
return_value = importer.set_record_struct(TEST_FIELD_DESCS)
# WHEN: db_set_record_struct is called with a list of field descriptions
return_value = importer.db_set_record_struct(TEST_FIELD_DESCS)
# THEN: set_record_struct should return None and Struct should be called with a value representing
# THEN: db_set_record_struct should return None and Struct should be called with a value representing
# the list of field descriptions
self.assertIsNone(return_value, 'set_record_struct should return None')
self.assertIsNone(return_value, 'db_set_record_struct should return None')
mocked_struct.Struct.assert_called_with('>50sHIB250s250s10sQ')
def test_get_field(self):
"""
Test the :mod:`get_field` module
Test the :mod:`db_get_field` module
"""
# GIVEN: A mocked out SongImport class, a mocked out "manager", an encoding and some test data and known results
with patch('openlp.plugins.songs.lib.importers.easyworship.SongImport'):
@ -254,16 +254,16 @@ class TestEasyWorshipSongImport(TestCase):
# WHEN: Called with test data
for field_index, result in field_results:
return_value = importer.get_field(field_index)
return_value = importer.db_get_field(field_index)
# THEN: get_field should return the known results
# THEN: db_get_field should return the known results
self.assertEqual(return_value, result,
'get_field should return "%s" when called with "%s"' %
'db_get_field should return "%s" when called with "%s"' %
(result, TEST_FIELDS[field_index]))
def test_get_memo_field(self):
"""
Test the :mod:`get_field` module
Test the :mod:`db_get_field` module
"""
for test_results in GET_MEMO_FIELD_TEST_RESULTS:
# GIVEN: A mocked out SongImport class, a mocked out "manager", a mocked out memo_file and an encoding
@ -283,8 +283,9 @@ class TestEasyWorshipSongImport(TestCase):
get_field_read_calls = test_results[2]['read']
get_field_seek_calls = test_results[2]['seek']
# THEN: get_field should return the appropriate value with the appropriate mocked objects being called
self.assertEqual(importer.get_field(field_index), get_field_result)
# THEN: db_get_field should return the appropriate value with the appropriate mocked objects being
# called
self.assertEqual(importer.db_get_field(field_index), get_field_result)
for call in get_field_read_calls:
mocked_memo_file.read.assert_any_call(call)
for call in get_field_seek_calls:
@ -405,6 +406,12 @@ class TestEasyWorshipSongImport(TestCase):
mocked_retrieve_windows_encoding.assert_any_call(encoding)
def test_db_file_import(self):
return self._test_db_file_import(os.path.join(TEST_PATH, 'Songs.DB'))
def test_sqlite_db_file_import(self):
return self._test_db_file_import(os.path.join(TEST_PATH, 'ew6'))
def _test_db_file_import(self, source_path):
"""
Test the actual import of real song database files and check that the imported data is correct.
"""
@ -432,7 +439,7 @@ class TestEasyWorshipSongImport(TestCase):
importer.topics = []
# WHEN: Importing each file
importer.import_source = os.path.join(TEST_PATH, 'Songs.DB')
importer.import_source = source_path
import_result = importer.do_import()
# THEN: do_import should return none, the song data should be as expected, and finish should have been