Support for importing ews files (EasyWorship Service files)

bzr-revno: 2372
This commit is contained in:
Tomas Groth 2014-04-29 17:54:14 +01:00 committed by Tim Bentley
commit cc0f0e1eb0
4 changed files with 312 additions and 74 deletions

View File

@ -34,13 +34,13 @@ EasyWorship song databases into the current installation database.
import os
import struct
import re
import zlib
from openlp.core.lib import translate
from openlp.plugins.songs.lib import VerseType
from openlp.plugins.songs.lib import retrieve_windows_encoding, strip_rtf
from .songimport import SongImport
RTF_STRIPPING_REGEX = re.compile(r'\{\\tx[^}]*\}')
# regex: at least two newlines, can have spaces between them
SLIDE_BREAK_REGEX = re.compile(r'\n *?\n[\n ]*')
NUMBER_REGEX = re.compile(r'[0-9]+')
@ -77,9 +77,121 @@ class EasyWorshipSongImport(SongImport):
def do_import(self):
"""
Import the songs
Determines the type of file to import and calls the appropiate method
"""
if self.import_source.lower().endswith('ews'):
self.import_ews()
else:
self.import_db()
:return:
def import_ews(self):
"""
Import the songs from service file
The full spec of the ews files can be found here:
https://github.com/meinders/lithium-ews/blob/master/docs/ews%20file%20format.md
or here: http://wiki.openlp.org/Development:EasyWorship_EWS_Format
"""
# Open ews file if it exists
if not os.path.isfile(self.import_source):
log.debug('Given ews file does not exists.')
return
# Make sure there is room for at least a header and one entry
if os.path.getsize(self.import_source) < 892:
log.debug('Given ews file is to small to contain valid data.')
return
# Take a stab at how text is encoded
self.encoding = 'cp1252'
self.encoding = retrieve_windows_encoding(self.encoding)
if not self.encoding:
log.debug('No encoding set.')
return
self.ews_file = open(self.import_source, 'rb')
# EWS header, version '1.6'/' 3'/' 5':
# Offset Field Data type Length Details
# --------------------------------------------------------------------------------------------------
# 0 Filetype string 38 Specifies the file type and version.
# "EasyWorship Schedule File Version 1.6" or
# "EasyWorship Schedule File Version 3" or
# "EasyWorship Schedule File Version 5"
# 40/48/56 Entry count int32le 4 Number of items in the schedule
# 44/52/60 Entry length int16le 2 Length of schedule entries: 0x0718 = 1816
# Get file version
type, = struct.unpack('<38s', self.ews_file.read(38))
version = type.decode()[-3:]
# Set fileposition based on filetype/version
file_pos = 0
if version == ' 5':
file_pos = 56
elif version == ' 3':
file_pos = 48
elif version == '1.6':
file_pos = 40
else:
log.debug('Given ews file is of unknown version.')
return
entry_count = self.get_i32(file_pos)
entry_length = self.get_i16(file_pos+4)
file_pos += 6
self.import_wizard.progress_bar.setMaximum(entry_count)
# Loop over songs
for i in range(entry_count):
# Load EWS entry metadata:
# Offset Field Data type Length Details
# ------------------------------------------------------------------------------------------------
# 0 Title cstring 50
# 307 Author cstring 50
# 358 Copyright cstring 100
# 459 Administrator cstring 50
# 800 Content pointer int32le 4 Position of the content for this entry.
# 820 Content type int32le 4 0x01 = Song, 0x02 = Scripture, 0x03 = Presentation,
# 0x04 = Video, 0x05 = Live video, 0x07 = Image,
# 0x08 = Audio, 0x09 = Web
# 1410 Song number cstring 10
self.set_defaults()
self.title = self.get_string(file_pos + 0, 50)
authors = self.get_string(file_pos + 307, 50)
copyright = self.get_string(file_pos + 358, 100)
admin = self.get_string(file_pos + 459, 50)
cont_ptr = self.get_i32(file_pos + 800)
cont_type = self.get_i32(file_pos + 820)
self.ccli_number = self.get_string(file_pos + 1410, 10)
# Only handle content type 1 (songs)
if cont_type != 1:
file_pos += entry_length
continue
# Load song content
# Offset Field Data type Length Details
# ------------------------------------------------------------------------------------------------
# 0 Length int32le 4 Length (L) of content, including the compressed content
# and the following fields (14 bytes total).
# 4 Content string L-14 Content compressed with deflate.
# Checksum int32be 4 Alder-32 checksum.
# (unknown) 4 0x51 0x4b 0x03 0x04
# Content length int32le 4 Length of content after decompression
content_length = self.get_i32(cont_ptr)
deflated_content = self.get_bytes(cont_ptr + 4, content_length - 10)
deflated_length = self.get_i32(cont_ptr + 4 + content_length - 6)
inflated_content = zlib.decompress(deflated_content, 15, deflated_length)
if copyright:
self.copyright = copyright
if admin:
if copyright:
self.copyright += ', '
self.copyright += translate('SongsPlugin.EasyWorshipSongImport',
'Administered by %s') % admin
# Set the SongImport object members.
self.set_song_import_object(authors, inflated_content)
if self.stop_import_flag:
break
if not self.finish():
self.log_error(self.import_source)
# Set file_pos for next entry
file_pos += entry_length
self.ews_file.close()
def import_db(self):
"""
Import the songs from the database
"""
# Open the DB and MB files if they exist
import_source_mb = self.import_source.replace('.DB', '.MB')
@ -176,7 +288,6 @@ class EasyWorshipSongImport(SongImport):
ccli = self.get_field(fi_ccli)
authors = self.get_field(fi_author)
words = self.get_field(fi_words)
# Set the SongImport object members.
if copy:
self.copyright = copy.decode()
if admin:
@ -187,55 +298,11 @@ class EasyWorshipSongImport(SongImport):
if ccli:
self.ccli_number = ccli.decode()
if authors:
# Split up the authors
author_list = authors.split(b'/')
if len(author_list) < 2:
author_list = authors.split(b';')
if len(author_list) < 2:
author_list = authors.split(b',')
for author_name in author_list:
self.add_author(author_name.decode().strip())
if words:
# Format the lyrics
result = strip_rtf(words.decode(), self.encoding)
if result is None:
return
words, self.encoding = result
verse_type = VerseType.tags[VerseType.Verse]
for verse in SLIDE_BREAK_REGEX.split(words):
verse = verse.strip()
if not verse:
continue
verse_split = verse.split('\n', 1)
first_line_is_tag = False
# EW tags: verse, chorus, pre-chorus, bridge, tag,
# intro, ending, slide
for tag in VerseType.tags + ['tag', 'slide']:
tag = tag.lower()
ew_tag = verse_split[0].strip().lower()
if ew_tag.startswith(tag):
verse_type = tag[0]
if tag == 'tag' or tag == 'slide':
verse_type = VerseType.tags[VerseType.Other]
first_line_is_tag = True
number_found = False
# check if tag is followed by number and/or note
if len(ew_tag) > len(tag):
match = NUMBER_REGEX.search(ew_tag)
if match:
number = match.group()
verse_type += number
number_found = True
match = NOTE_REGEX.search(ew_tag)
if match:
self.comments += ew_tag + '\n'
if not number_found:
verse_type += '1'
break
self.add_verse(verse_split[-1].strip() if first_line_is_tag else verse, verse_type)
if len(self.comments) > 5:
self.comments += str(translate('SongsPlugin.EasyWorshipSongImport',
'\n[above are Song Tags with notes imported from EasyWorship]'))
authors = authors.decode()
else:
authors = ''
# Set the SongImport object members.
self.set_song_import_object(authors, words)
if self.stop_import_flag:
break
if not self.finish():
@ -243,12 +310,69 @@ class EasyWorshipSongImport(SongImport):
db_file.close()
self.memo_file.close()
def set_song_import_object(self, authors, words):
"""
Set the SongImport object members.
:param authors: String with authons
:param words: Bytes with rtf-encoding
"""
if authors:
# Split up the authors
author_list = authors.split('/')
if len(author_list) < 2:
author_list = authors.split(';')
if len(author_list) < 2:
author_list = authors.split(',')
for author_name in author_list:
self.add_author(author_name.strip())
if words:
# Format the lyrics
result = strip_rtf(words.decode(), self.encoding)
if result is None:
return
words, self.encoding = result
verse_type = VerseType.tags[VerseType.Verse]
for verse in SLIDE_BREAK_REGEX.split(words):
verse = verse.strip()
if not verse:
continue
verse_split = verse.split('\n', 1)
first_line_is_tag = False
# EW tags: verse, chorus, pre-chorus, bridge, tag,
# intro, ending, slide
for tag in VerseType.tags + ['tag', 'slide']:
tag = tag.lower()
ew_tag = verse_split[0].strip().lower()
if ew_tag.startswith(tag):
verse_type = tag[0]
if tag == 'tag' or tag == 'slide':
verse_type = VerseType.tags[VerseType.Other]
first_line_is_tag = True
number_found = False
# check if tag is followed by number and/or note
if len(ew_tag) > len(tag):
match = NUMBER_REGEX.search(ew_tag)
if match:
number = match.group()
verse_type += number
number_found = True
match = NOTE_REGEX.search(ew_tag)
if match:
self.comments += ew_tag + '\n'
if not number_found:
verse_type += '1'
break
self.add_verse(verse_split[-1].strip() if first_line_is_tag else verse, verse_type)
if len(self.comments) > 5:
self.comments += str(translate('SongsPlugin.EasyWorshipSongImport',
'\n[above are Song Tags with notes imported from EasyWorship]'))
def find_field(self, field_name):
"""
Find a field in the descriptions
:param field_name: field to find
:return:
"""
return [i for i, x in enumerate(self.field_descriptions) if x.name == field_name][0]
@ -285,7 +409,7 @@ class EasyWorshipSongImport(SongImport):
Extract the field
:param field_desc_index: Field index value
:return:
:return: The field value
"""
field = self.fields[field_desc_index]
field_desc = self.field_descriptions[field_desc_index]
@ -323,3 +447,52 @@ class EasyWorshipSongImport(SongImport):
return self.memo_file.read(blob_size)
else:
return 0
def get_bytes(self, pos, length):
"""
Get bytes from ews_file
:param pos: Position to read from
:param length: Bytes to read
:return: Bytes read
"""
self.ews_file.seek(pos)
return self.ews_file.read(length)
def get_string(self, pos, length):
"""
Get string from ews_file
:param pos: Position to read from
:param length: Characters to read
:return: String read
"""
bytes = self.get_bytes(pos, length)
mask = '<' + str(length) + 's'
byte_str, = struct.unpack(mask, bytes)
return byte_str.decode('unicode-escape').replace('\0', '').strip()
def get_i16(self, pos):
"""
Get short int from ews_file
:param pos: Position to read from
:return: Short integer read
"""
bytes = self.get_bytes(pos, 2)
mask = '<h'
number, = struct.unpack(mask, bytes)
return number
def get_i32(self, pos):
"""
Get long int from ews_file
:param pos: Position to read from
:return: Long integer read
"""
bytes = self.get_bytes(pos, 4)
mask = '<i'
number, = struct.unpack(mask, bytes)
return number

View File

@ -153,19 +153,20 @@ class SongFormat(object):
CCLI = 3
DreamBeam = 4
EasySlides = 5
EasyWorship = 6
FoilPresenter = 7
MediaShout = 8
OpenSong = 9
PowerSong = 10
SongBeamer = 11
SongPro = 12
SongShowPlus = 13
SongsOfFellowship = 14
SundayPlus = 15
WordsOfWorship = 16
WorshipCenterPro = 17
ZionWorx = 18
EasyWorshipDB = 6
EasyWorshipService = 7
FoilPresenter = 8
MediaShout = 9
OpenSong = 10
PowerSong = 11
SongBeamer = 12
SongPro = 13
SongShowPlus = 14
SongsOfFellowship = 15
SundayPlus = 16
WordsOfWorship = 17
WorshipCenterPro = 18
ZionWorx = 19
# Set optional attribute defaults
__defaults__ = {
@ -224,13 +225,20 @@ class SongFormat(object):
'selectMode': SongFormatSelect.SingleFile,
'filter': '%s (*.xml)' % translate('SongsPlugin.ImportWizardForm', 'EasySlides XML File')
},
EasyWorship: {
EasyWorshipDB: {
'class': EasyWorshipSongImport,
'name': 'EasyWorship',
'name': 'EasyWorship Song Database',
'prefix': 'ew',
'selectMode': SongFormatSelect.SingleFile,
'filter': '%s (*.db)' % translate('SongsPlugin.ImportWizardForm', 'EasyWorship Song Database')
},
EasyWorshipService: {
'class': EasyWorshipSongImport,
'name': 'EasyWorship Service File',
'prefix': 'ew',
'selectMode': SongFormatSelect.SingleFile,
'filter': '%s (*.ews)' % translate('SongsPlugin.ImportWizardForm', 'EasyWorship Service File')
},
FoilPresenter: {
'class': FoilPresenterImport,
'name': 'Foilpresenter',
@ -341,7 +349,8 @@ class SongFormat(object):
SongFormat.CCLI,
SongFormat.DreamBeam,
SongFormat.EasySlides,
SongFormat.EasyWorship,
SongFormat.EasyWorshipDB,
SongFormat.EasyWorshipService,
SongFormat.FoilPresenter,
SongFormat.MediaShout,
SongFormat.OpenSong,

View File

@ -69,6 +69,20 @@ SONG_TEST_DATA = [
'Just to bow and receive a new blessing,\nIn the beautiful garden of prayer.', 'v3')],
'verse_order_list': []}]
EWS_SONG_TEST_DATA =\
{'title': 'Vi pløjed og vi så\'de',
'authors': ['Matthias Claudius'],
'verses':
[('Vi pløjed og vi så\'de\nvor sæd i sorten jord,\nså bad vi ham os hjælpe,\nsom højt i Himlen bor,\n'
'og han lod snefald hegne\nmod frosten barsk og hård,\nhan lod det tø og regne\nog varme mildt i vår.',
'v1'),
('Alle gode gaver\nde kommer ovenned,\nså tak da Gud, ja, pris dog Gud\nfor al hans kærlighed!', 'c1'),
('Han er jo den, hvis vilje\nopholder alle ting,\nhan klæder markens lilje\nog runder himlens ring,\n'
'ham lyder vind og vove,\nham rører ravnes nød,\nhvi skulle ej hans småbørn\nda og få dagligt brød?', 'v2'),
('Ja, tak, du kære Fader,\nså mild, så rig, så rund,\nfor korn i hæs og lader,\nfor godt i allen stund!\n'
'Vi kan jo intet give,\nsom nogen ting er værd,\nmen tag vort stakkels hjerte,\nså ringe som det er!',
'v3')]}
class EasyWorshipSongImportLogger(EasyWorshipSongImport):
"""
@ -357,9 +371,9 @@ class TestEasyWorshipSongImport(TestCase):
self.assertIsNone(importer.do_import(), 'do_import should return None when db_size is less than 0x800')
mocked_retrieve_windows_encoding.assert_call(encoding)
def file_import_test(self):
def db_file_import_test(self):
"""
Test the actual import of real song files and check that the imported data is correct.
Test the actual import of real song database files and check that the imported data is correct.
"""
# GIVEN: Test files with a mocked out SongImport class, a mocked out "manager", a mocked out "import_wizard",
@ -386,10 +400,11 @@ class TestEasyWorshipSongImport(TestCase):
# WHEN: Importing each file
importer.import_source = os.path.join(TEST_PATH, 'Songs.DB')
import_result = importer.do_import()
# THEN: do_import should return none, the song data should be as expected, and finish should have been
# called.
self.assertIsNone(importer.do_import(), 'do_import should return None when it has completed')
self.assertIsNone(import_result, 'do_import should return None when it has completed')
for song_data in SONG_TEST_DATA:
title = song_data['title']
author_calls = song_data['authors']
@ -411,3 +426,44 @@ class TestEasyWorshipSongImport(TestCase):
self.assertEqual(importer.verse_order_list, verse_order_list,
'verse_order_list for %s should be %s' % (title, verse_order_list))
mocked_finish.assert_called_with()
def ews_file_import_test(self):
"""
Test the actual import of song from ews file and check that the imported data is correct.
"""
# GIVEN: Test files with a mocked out SongImport class, a mocked out "manager", a mocked out "import_wizard",
# and mocked out "author", "add_copyright", "add_verse", "finish" methods.
with patch('openlp.plugins.songs.lib.ewimport.SongImport'), \
patch('openlp.plugins.songs.lib.ewimport.retrieve_windows_encoding') \
as mocked_retrieve_windows_encoding:
mocked_retrieve_windows_encoding.return_value = 'cp1252'
mocked_manager = MagicMock()
mocked_import_wizard = MagicMock()
mocked_add_author = MagicMock()
mocked_add_verse = MagicMock()
mocked_finish = MagicMock()
mocked_title = MagicMock()
mocked_finish.return_value = True
importer = EasyWorshipSongImportLogger(mocked_manager)
importer.import_wizard = mocked_import_wizard
importer.stop_import_flag = False
importer.add_author = mocked_add_author
importer.add_verse = mocked_add_verse
importer.title = mocked_title
importer.finish = mocked_finish
importer.topics = []
# WHEN: Importing ews file
importer.import_source = os.path.join(TEST_PATH, 'test1.ews')
import_result = importer.do_import()
# THEN: do_import should return none, the song data should be as expected, and finish should have been
# called.
title = EWS_SONG_TEST_DATA['title']
self.assertIsNone(import_result, 'do_import should return None when it has completed')
self.assertIn(title, importer._title_assignment_list, 'title for should be "%s"' % title)
mocked_add_author.assert_any_call(EWS_SONG_TEST_DATA['authors'][0])
for verse_text, verse_tag in EWS_SONG_TEST_DATA['verses']:
mocked_add_verse.assert_any_call(verse_text, verse_tag)
mocked_finish.assert_called_with()

Binary file not shown.