Word of Worship file importer fixes / rework

This commit is contained in:
Phill 2019-06-28 19:09:25 +01:00
parent 0653f2d64f
commit f79e8803c9
18 changed files with 463 additions and 95 deletions

View File

@ -385,7 +385,8 @@ class UiStrings(object):
self.Error = translate('OpenLP.Ui', 'Error')
self.Export = translate('OpenLP.Ui', 'Export')
self.File = translate('OpenLP.Ui', 'File')
self.FontSizePtUnit = translate('OpenLP.Ui', 'pt', 'Abbreviated font pointsize unit')
self.FileCorrupt = translate('OpenLP.Ui', 'File appears to be corrupt.')
self.FontSizePtUnit = translate('OpenLP.Ui', 'pt', 'Abbreviated font point size unit')
self.Help = translate('OpenLP.Ui', 'Help')
self.Hours = translate('OpenLP.Ui', 'h', 'The abbreviated unit for hours')
self.IFdSs = translate('OpenLP.Ui', 'Invalid Folder Selected', 'Singular')

View File

@ -24,15 +24,23 @@ The :mod:`lib` module contains most of the components and libraries that make
OpenLP work.
"""
import logging
import os
from enum import IntEnum
from pathlib import Path
from PyQt5 import QtCore, QtGui, QtWidgets
from openlp.core.common.i18n import translate
from openlp.core.common.i18n import UiStrings, translate
log = logging.getLogger(__name__ + '.__init__')
class DataType(IntEnum):
    """
    The unsigned integer types that can be read from a binary file.

    The value of each member is the size of that type in bytes, which allows a member to be used directly as the
    length of a read.
    """
    U8 = 1  # unsigned 8-bit int, one byte
    U16 = 2  # unsigned 16-bit int, two bytes
    U32 = 4  # unsigned 32-bit int, four bytes
class ServiceItemContext(object):
"""
The context in which a Service Item is being generated
@ -396,3 +404,48 @@ def create_separated_list(string_list):
else:
list_to_string = ''
return list_to_string
def read_or_fail(file_object, length):
    """
    Read exactly ``length`` bytes from a file-like object, raising an error if a different amount comes back.

    :param io.IOBase file_object: The file-like object to read from.
    :param int length: The length of the data to read.
    :return: The data read.
    :raises OSError: If the length of the data read is not the length requested.
    """
    chunk = file_object.read(length)
    if len(chunk) == length:
        return chunk
    # Getting back anything other than what was asked for means the file does not hold the expected structure.
    raise OSError(UiStrings().FileCorrupt)
def read_int(file_object, data_type, endian='big'):
    """
    Read as many bytes as the given type occupies from a file-like object and decode them to an int.

    :param io.IOBase file_object: The file-like object to read from.
    :param DataType data_type: A member of :class:`DataType`; its value is used as the number of bytes to read.
    :param str endian: The endianness of the data to be read, as accepted by :meth:`int.from_bytes`.
    :return int: The decoded int
    """
    # read_or_fail guarantees that the full width of the type was read before it is decoded.
    return int.from_bytes(read_or_fail(file_object, data_type), endian)
def seek_or_fail(file_object, offset, how=os.SEEK_SET):
    """
    Seek to a position and raise an error if the cursor has not moved to that position.

    :param io.IOBase file_object: The file-like object to attempt to seek.
    :param int offset: The offset / position to seek by / to.
    :param int how: Where ``offset`` is measured from. Only ``os.SEEK_SET`` (0) and ``os.SEEK_CUR`` (1) are supported.
    :return int: The new position in the file.
    :raises NotImplementedError: If ``how`` is neither ``os.SEEK_SET`` nor ``os.SEEK_CUR``.
    :raises OSError: If the position returned by the seek is not the position that was expected.
    """
    if how != os.SEEK_SET and how != os.SEEK_CUR:
        raise NotImplementedError
    start = file_object.tell()
    landed = file_object.seek(offset, how)
    # An absolute seek should land on the offset itself, a relative seek on the old position plus the offset.
    target = offset if how == os.SEEK_SET else start + offset
    if landed != target:
        raise OSError(UiStrings().FileCorrupt)
    return landed

View File

@ -84,7 +84,7 @@ class FormattingTagController(object):
'desc': desc,
'start tag': '{{{tag}}}'.format(tag=tag),
'start html': start_html,
'end tag': '{{{tag}}}'.format(tag=tag),
'end tag': '{{/{tag}}}'.format(tag=tag),
'end html': end_html,
'protected': False,
'temporary': False

View File

@ -353,7 +353,7 @@ class PathEdit(QtWidgets.QWidget):
:rtype: None
"""
if self._path != path:
self._path = path
self.path = path
self.pathChanged.emit(path)

View File

@ -26,7 +26,8 @@ Worship songs into the OpenLP database.
import logging
import os
from openlp.core.common.i18n import translate
from openlp.core.common.i18n import UiStrings, translate
from openlp.core.lib import DataType, read_int, read_or_fail, seek_or_fail
from openlp.plugins.songs.lib.importers.songimport import SongImport
@ -48,52 +49,138 @@ class WordsOfWorshipImport(SongImport):
the author and the copyright.
* A block can be a verse, chorus or bridge.
Little endian is used.
File Header:
Bytes are counted from one, i.e. the first byte is byte 1. The first 19
bytes should be "WoW File \\nSong Words" The bytes after this and up to
the 56th byte, can change but no real meaning has been found. The
56th byte specifies how many blocks there are. The first block starts
with byte 83 after the "CSongDoc::CBlock" declaration.
Bytes are counted from one, i.e. the first byte is byte 1.
0x00 - 0x13 Should be "WoW File \nSong Words\n"
0x14 - 0x1F Minimum version of Words Of Worship required to open this file
0x20 - 0x2B Minimum version of Words Of Worship required to save this file without data loss
0x2C - 0x37 The version of Words of Worship that this file is from. From test data, it looks like this might be
the version that originally created this file, not the last version to save it.
The Words Of Worship versioning system seems to be in the format:
``Major.Minor.Patch``
Where each part of the version number is stored by a 32-bit int
0x38 - 0x3B Specifies how many blocks there are.
0x42 - 0x51 Should be "CSongDoc::CBlock"
0x52 The first song blocks start from here.
Blocks:
Each block has a starting header, some lines of text, and an ending
footer. Each block starts with a 32 bit number, which specifies how
many lines are in that block.
Each block starts with a 32-bit int which specifies how many lines are in that block.
Then there are a number of lines corresponding to the value above.
Each block ends with a 32 bit number, which defines what type of
block it is:
* ``NUL`` (0x00) - Verse
* ``SOH`` (0x01) - Chorus
* ``STX`` (0x02) - Bridge
* 0x00000000 = Verse
* 0x01000000 = Chorus
* 0x02000000 = Bridge
Blocks are separated by two bytes. The first byte is 0x01, and the
second byte is 0x80.
Lines:
Each line starts with a byte which specifies how long that line is,
the line text, and ends with a null byte.
Each line consists of a "Pascal" string.
In later versions, a byte follows which denotes the formatting of the line:
* 0x00 = Normal
* 0x01 = Minor
It looks like this may have been introduced in Words of Worship song version 1.2.2, though this is an educated
guess.
Footer:
The footer follows on after the last block, the first byte specifies
the length of the author text, followed by the author text, if
this byte is null, then there is no author text. The byte after the
author text specifies the length of the copyright text, followed
by the copyright text.
The footer follows on after the last block. Its format is as follows:
The file is ended with four null bytes.
Author String (as a 'Pascal' string)
Copyright String (as a 'Pascal' string)
Finally, in newer versions of Words of Worship song files there is a 32 bit int describing the copyright.
0x00000000 = Covered by CCL
0x01000000 = Authors explicit permission
0x02000000 = Public Domain
0x03000000 = Copyright expired
0x04000000 = Other
Pascal Strings:
Strings are preceded by a variable length integer which specifies how many bytes are in the string. An example
of the variable length integer is below.
Length bytes 'Little'| Str len
-------------------------------
01 | 01
02 | 02
.... |
FD | FD
FE | FE
FF FF 00 | FF
FF 00 01 | 01 00
FF 01 01 | 01 01
FF 02 01 | 01 02
.... |
FF FC FF | FF FC
FF FD FF | FF FD
FF FF FF FE FF | FF FE
FF FF FF FF FF 00 00 | FF FF
FF FF FF 00 00 01 00 | 01 00 00
FF FF FF 01 00 01 00 | 01 00 01
FF FF FF 02 00 02 00 | 01 00 02
Valid extensions for a Words of Worship song file are:
* .wsg
* .wow-song
"""
@staticmethod
def parse_string(song_data):
    """
    Read a length-prefixed ("Pascal") string from the song data and decode it as cp1252.

    The length is taken from a single byte, unless that byte is 0xFF, in which case the length is taken from the two
    bytes which follow it (little endian).

    :param song_data: The file-like object to read from, positioned at the start of the length prefix.
    :return str: The decoded string.
    :raises OSError: If the string data is shorter than the length prefix says it is.
    """
    prefix = song_data.read(DataType.U8)
    if prefix == b'\xff':
        # 0xFF marks an extended length, the real length is in the 16-bit value that follows.
        # NOTE(review): the class docstring also lists a 32-bit length form; that form is not handled here.
        prefix = song_data.read(DataType.U16)
    text_length = int.from_bytes(prefix, 'little')
    raw_text = read_or_fail(song_data, text_length)
    return raw_text.decode('cp1252')
def __init__(self, manager, **kwargs):
    """
    Initialise the Words of Worship importer by passing every argument on to the base class.

    :param manager: Passed through unchanged to the base class initialiser.
    :param kwargs: Passed through unchanged to the base class initialiser.
    """
    super().__init__(manager, **kwargs)
def parse_lines(self, song_data):
    """
    Read all of the lines of one block and join them in to a single piece of text.

    A little endian 32-bit int gives the number of lines, and each line is then read with :meth:`parse_string`. When
    ``self.read_version`` is at least (1, 2, 2) one further byte is read after every line; if that byte is 0x01 the
    line is wrapped in ``{minor}`` formatting tags.

    :param song_data: The file-like object to read from, positioned at the start of a block.
    :return str: The lines of the block, separated by new lines.
    """
    line_count = read_int(song_data, DataType.U32, 'little')
    block_lines = []
    for _ in range(line_count):
        text = self.parse_string(song_data)
        # The version is tested first so that the formatting byte is only consumed from files which contain it.
        if self.read_version >= (1, 2, 2) and read_or_fail(song_data, DataType.U8) == b'\x01':
            text = '{{minor}}{text}{{/minor}}'.format(text=text)
        block_lines.append(text)
    return '\n'.join(block_lines)
@staticmethod
def parse_version(song_data):
    """
    Read three consecutive little endian 32-bit ints from the song data and return them as a version tuple.

    :param song_data: The file-like object to read from, positioned at the first of the three ints.
    :return tuple[int, int, int]: The three ints in the order they were read (presumably major, minor, patch, going
        by the version format described in the class docstring).
    """
    major = read_int(song_data, DataType.U32, 'little')
    minor = read_int(song_data, DataType.U32, 'little')
    patch = read_int(song_data, DataType.U32, 'little')
    return major, minor, patch
def vaildate(self, file_path, song_data):
    """
    Check that the song data contains the two fixed markers of a Words of Worship song file.

    The markers are checked in file order and checking stops at the first one which does not match. The error which
    is logged names the marker that was expected, rather than the bytes which were found in its place.

    The (misspelt) name of this method is kept as it is, because ``do_import`` calls it by this name.

    :param file_path: The path of the file being imported, used when logging an error.
    :param song_data: The file-like object containing the song.
    :return bool: True if both markers were found, otherwise False.
    :raises OSError: If the file is too short to seek to, or read, one of the markers.
    """
    # (offset, expected bytes) of each marker
    expected_headers = (
        (0x00, b'WoW File\nSong Words\n'),
        (0x42, b'CSongDoc::CBlock')
    )
    for offset, expected in expected_headers:
        seek_or_fail(song_data, offset)
        if read_or_fail(song_data, len(expected)) != expected:
            self.log_error(file_path,
                           translate('SongsPlugin.WordsofWorshipSongImport',
                                     'Invalid Words of Worship song file. Missing {text!r} header.'
                                     ).format(text=expected))
            return False
    return True
def do_import(self):
"""
@ -104,57 +191,37 @@ class WordsOfWorshipImport(SongImport):
for file_path in self.import_source:
if self.stop_import_flag:
return
self.set_defaults()
with file_path.open('rb') as song_data:
if song_data.read(19).decode() != 'WoW File\nSong Words':
self.log_error(file_path,
translate('SongsPlugin.WordsofWorshipSongImport',
'Invalid Words of Worship song file. Missing "{text}" '
'header.').format(text='WoW File\\nSong Words'))
continue
# Seek to byte which stores number of blocks in the song
song_data.seek(56)
no_of_blocks = ord(song_data.read(1))
song_data.seek(66)
if song_data.read(16).decode() != 'CSongDoc::CBlock':
self.log_error(file_path,
translate('SongsPlugin.WordsofWorshipSongImport',
'Invalid Words of Worship song file. Missing "{text}" '
'string.').format(text='CSongDoc::CBlock'))
continue
# Seek to the beginning of the first block
song_data.seek(82)
for block in range(no_of_blocks):
skip_char_at_end = True
self.lines_to_read = ord(song_data.read(4)[:1])
block_text = ''
while self.lines_to_read:
self.line_text = str(song_data.read(ord(song_data.read(1))), 'cp1252')
if skip_char_at_end:
skip_char = ord(song_data.read(1))
# Check if we really should skip a char. In some wsg files we shouldn't
if skip_char != 0:
song_data.seek(-1, os.SEEK_CUR)
skip_char_at_end = False
if block_text:
block_text += '\n'
block_text += self.line_text
self.lines_to_read -= 1
block_type = BLOCK_TYPES[ord(song_data.read(4)[:1])]
# Blocks are separated by 2 bytes, skip them, but not if
# this is the last block!
if block + 1 < no_of_blocks:
song_data.seek(2, os.SEEK_CUR)
self.add_verse(block_text, block_type)
# Now to extract the author
author_length = ord(song_data.read(1))
if author_length:
self.parse_author(str(song_data.read(author_length), 'cp1252'))
# Finally the copyright
copyright_length = ord(song_data.read(1))
if copyright_length:
self.add_copyright(str(song_data.read(copyright_length), 'cp1252'))
log.debug('Importing %s', file_path)
try:
self.set_defaults()
# Get the song title
self.title = file_path.stem
if not self.finish():
self.log_error(file_path)
with file_path.open('rb') as song_data:
if not self.vaildate(file_path, song_data):
continue
seek_or_fail(song_data, 24)
self.read_version = self.parse_version(song_data)
# Seek to byte which stores number of blocks in the song
seek_or_fail(song_data, 56)
no_of_blocks = read_int(song_data, DataType.U8)
# Seek to the beginning of the first block
seek_or_fail(song_data, 82)
for block_no in range(no_of_blocks):
# Blocks are separated by 2 bytes, skip them, but not if this is the last block!
if block_no != 0:
seek_or_fail(song_data, 2, os.SEEK_CUR)
text = self.parse_lines(song_data)
block_type = BLOCK_TYPES[read_int(song_data, DataType.U32, 'little')]
self.add_verse(text, block_type)
# Now to extract the author
self.parse_author(self.parse_string(song_data))
# Finally the copyright
self.add_copyright(self.parse_string(song_data))
if not self.finish():
self.log_error(file_path)
except IndexError:
self.log_error(file_path, UiStrings().FileCorrupt)
except Exception as e:
self.log_error(file_path, e)

View File

@ -336,7 +336,7 @@ class OpenLyrics(object):
:return: the lyrics with the converted chords
"""
# Process chords.
new_text = re.sub(r'\[(\w.*?)\]', r'<chord name="\1"/>', text)
new_text = re.sub(r'\[(?!CDATA)(\w.*?)\]', r'<chord name="\1"/>', text)
return new_text
def _get_missing_tags(self, text):

View File

@ -22,14 +22,16 @@
"""
Package to test the openlp.core.lib package.
"""
import io
import os
from pathlib import Path
from unittest import TestCase
from unittest.mock import MagicMock, patch
from PyQt5 import QtCore, QtGui
from openlp.core.lib import build_icon, check_item_selected, create_separated_list, create_thumb, \
get_text_file_string, image_to_byte, resize_image, str_to_bool, validate_thumb
from openlp.core.lib import DataType, build_icon, check_item_selected, create_separated_list, create_thumb, \
get_text_file_string, image_to_byte, read_or_fail, read_int, resize_image, seek_or_fail, str_to_bool, validate_thumb
from tests.utils.constants import RESOURCE_PATH
@ -680,3 +682,179 @@ class TestLib(TestCase):
# THEN: We should have "Author 1, Author 2 and Author 3"
assert string_result == 'Author 1, Author 2 and Author 3', \
'The string should be "Author 1, Author 2, and Author 3".'
def test_read_or_fail_fail(self):
    """
    Test that :func:`read_or_fail` raises an error when asked for more data than the buffer contains.
    """
    # GIVEN: A buffer holding nine bytes
    buffer = io.BytesIO(b'test data')

    # WHEN: Attempting to read 15 bytes, which is past the end of the buffer
    # THEN: An OSError should be raised.
    self.assertRaises(OSError, read_or_fail, buffer, 15)
def test_read_or_fail_success(self):
    """
    Test that :func:`read_or_fail` returns the data when the buffer contains enough of it.
    """
    # GIVEN: A buffer holding nine bytes
    buffer = io.BytesIO(b'test data')

    # WHEN: Reading the first four bytes, which do exist
    data = read_or_fail(buffer, 4)

    # THEN: Exactly the requested bytes should be returned
    assert data == b'test'
def test_read_int_u8_big(self):
    """
    Test that :func:`read_int` decodes an unsigned 8-bit int when using 'big' endianness.
    """
    # GIVEN: A buffer holding the bytes 0F F0 0F F0
    buffer = io.BytesIO(bytes([0x0f, 0xf0, 0x0f, 0xf0]))

    # WHEN: Reading an unsigned 8-bit int
    value = read_int(buffer, DataType.U8, 'big')

    # THEN: Only the first byte should have been decoded
    assert value == 0x0f
def test_read_int_u8_little(self):
    """
    Test that :func:`read_int` decodes an unsigned 8-bit int when using 'little' endianness.
    """
    # GIVEN: A buffer holding the bytes 0F F0 0F F0
    buffer = io.BytesIO(bytes([0x0f, 0xf0, 0x0f, 0xf0]))

    # WHEN: Reading an unsigned 8-bit int
    value = read_int(buffer, DataType.U8, 'little')

    # THEN: Only the first byte should have been decoded, byte order makes no difference to a single byte
    assert value == 0x0f
def test_read_int_u16_big(self):
    """
    Test that :func:`read_int` decodes an unsigned 16-bit int when using 'big' endianness.
    """
    # GIVEN: A buffer holding the bytes 0F F0 0F F0
    buffer = io.BytesIO(bytes([0x0f, 0xf0, 0x0f, 0xf0]))

    # WHEN: Reading an unsigned 16-bit int
    value = read_int(buffer, DataType.U16, 'big')

    # THEN: The first two bytes should have been decoded most significant byte first (4080)
    assert value == 0x0ff0
def test_read_int_u16_little(self):
    """
    Test that :func:`read_int` decodes an unsigned 16-bit int when using 'little' endianness.
    """
    # GIVEN: A buffer holding the bytes 0F F0 0F F0
    buffer = io.BytesIO(bytes([0x0f, 0xf0, 0x0f, 0xf0]))

    # WHEN: Reading an unsigned 16-bit int
    value = read_int(buffer, DataType.U16, 'little')

    # THEN: The first two bytes should have been decoded least significant byte first (61455)
    assert value == 0xf00f
def test_read_int_u32_big(self):
    """
    Test that :func:`read_int` decodes an unsigned 32-bit int when using 'big' endianness.
    """
    # GIVEN: A buffer holding the bytes 0F F0 0F F0
    buffer = io.BytesIO(bytes([0x0f, 0xf0, 0x0f, 0xf0]))

    # WHEN: Reading an unsigned 32-bit int
    value = read_int(buffer, DataType.U32, 'big')

    # THEN: All four bytes should have been decoded most significant byte first (267390960)
    assert value == 0x0ff00ff0
def test_read_int_u32_little(self):
    """
    Test that :func:`read_int` decodes an unsigned 32-bit int when using 'little' endianness.
    """
    # GIVEN: A buffer holding the bytes 0F F0 0F F0
    buffer = io.BytesIO(bytes([0x0f, 0xf0, 0x0f, 0xf0]))

    # WHEN: Reading an unsigned 32-bit int
    value = read_int(buffer, DataType.U32, 'little')

    # THEN: All four bytes should have been decoded least significant byte first (4027576335)
    assert value == 0xf00ff00f
def test_seek_or_fail_default_method(self):
    """
    Test that :func:`seek_or_fail` seeks from the start of the file when the ``how`` argument is not given.
    """
    # GIVEN: A mocked file-like object which reports a successful seek to position 5
    mocked_file = MagicMock()
    mocked_file.tell.return_value = 0
    mocked_file.seek.return_value = 5

    # WHEN: Calling seek_or_fail without the how arg set
    seek_or_fail(mocked_file, 5)

    # THEN: seek should be called using the os.SEEK_SET constant
    mocked_file.seek.assert_called_once_with(5, os.SEEK_SET)
def test_seek_or_fail_os_end(self):
    """
    Test that :func:`seek_or_fail` rejects a seek operation which it does not support.
    """
    # GIVEN: A mocked file-like object
    # WHEN: Attempting to seek relative to the end
    # THEN: A NotImplementedError should have been raised
    self.assertRaises(NotImplementedError, seek_or_fail, MagicMock(), 1, os.SEEK_END)
def test_seek_or_fail_valid_seek_set(self):
    """
    Test that :func:`seek_or_fail` returns the new position after a successful seek from the beginning.
    """
    # GIVEN: A mocked file-like object at position 3, which reports position 5 after the seek
    mocked_file = MagicMock()
    mocked_file.tell.return_value = 3
    mocked_file.seek.return_value = 5

    # WHEN: Attempting to seek to position 5 from the beginning
    new_position = seek_or_fail(mocked_file, 5, os.SEEK_SET)

    # THEN: The new position should be 5 from the beginning
    assert new_position == 5
def test_seek_or_fail_invalid_seek_set(self):
    """
    Test that :func:`seek_or_fail` raises an exception when a seek from the beginning lands in the wrong place.
    """
    # GIVEN: A mocked file-like object which reports position 10 after the seek
    mocked_file = MagicMock()
    mocked_file.tell.return_value = 3
    mocked_file.seek.return_value = 10

    # WHEN: Attempting to seek to position 15 from the beginning
    # THEN: An OSError should have been raised, as the reported position is not 15
    self.assertRaises(OSError, seek_or_fail, mocked_file, 15, os.SEEK_SET)
def test_seek_or_fail_valid_seek_cur(self):
    """
    Test that :func:`seek_or_fail` returns the new position after a successful seek from the current position.
    """
    # GIVEN: A mocked file-like object at position 3, which reports position 8 after the seek
    mocked_file = MagicMock()
    mocked_file.tell.return_value = 3
    mocked_file.seek.return_value = 8

    # WHEN: Attempting to seek 5 bytes on from the current position
    new_position = seek_or_fail(mocked_file, 5, os.SEEK_CUR)

    # THEN: The new position should be 8 (5 from its starting position)
    assert new_position == 8
def test_seek_or_fail_invalid_seek_cur(self):
    """
    Test that :func:`seek_or_fail` raises an exception when a seek from the current position lands in the wrong
    place.
    """
    # GIVEN: A mocked file-like object at position 3, which reports position 10 after the seek
    mocked_file = MagicMock()
    mocked_file.tell.return_value = 3
    mocked_file.seek.return_value = 10

    # WHEN: Attempting to seek 15 bytes on from the current position, past the end.
    # THEN: An OSError should have been raised, as the reported position is not 18
    self.assertRaises(OSError, seek_or_fail, mocked_file, 15, os.SEEK_CUR)

View File

@ -34,15 +34,40 @@ class TestWordsOfWorshipFileImport(SongImportTestHelper):
def __init__(self, *args, **kwargs):
self.importer_class_name = 'WordsOfWorshipImport'
self.importer_module_name = 'wordsofworship'
super(TestWordsOfWorshipFileImport, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
def test_song_import(self):
def test_amazing_grace_song_import(self):
"""
Test that loading a Words of Worship file works correctly
"""
self.file_import([TEST_PATH / 'Amazing Grace (6 Verses).wow-song'],
self.load_external_result_data(TEST_PATH / 'Amazing Grace (6 Verses).json'))
self.file_import([TEST_PATH / 'When morning gilds the skies.wsg'],
self.load_external_result_data(TEST_PATH / 'When morning gilds the skies.json'))
self.file_import([TEST_PATH / 'Holy Holy Holy Lord God Almighty.wow-song'],
self.load_external_result_data(TEST_PATH / 'Holy Holy Holy Lord God Almighty.json'))
self.file_import([TEST_PATH / 'Amazing Grace (6 Verses)_v2_1_2.wow-song'],
self.load_external_result_data(TEST_PATH / 'Amazing Grace (6 Verses)_v2_1_2.json'))
def test_when_morning_gilds_song_import(self):
"""
Test that loading a Words of Worship file v2.0.0 works correctly
"""
self.file_import([TEST_PATH / 'When morning gilds the skies_v2_0_0.wsg'],
self.load_external_result_data(TEST_PATH / 'When morning gilds the skies_v2_0_0.json'))
def test_holy_holy_holy_song_import(self):
"""
Test that loading a Words of Worship file works correctly
"""
self.file_import([TEST_PATH / 'Holy Holy Holy Lord God Almighty_v2_1_2.wow-song'],
self.load_external_result_data(TEST_PATH / 'Holy Holy Holy Lord God Almighty_v2_1_2.json'))
def test_test_song_v2_0_0_song_import(self):
"""
Test that loading a Words of Worship file v2.0.0 works correctly
"""
self.file_import([TEST_PATH / 'Test_Song_v2_0_0.wsg'],
self.load_external_result_data(TEST_PATH / 'Test_Song_v2_0_0.json'))
def test_test_song_song_import(self):
"""
Test that loading a Words of Worship file v2.1.2 works correctly
"""
self.file_import([TEST_PATH / 'Test_Song_v2_1_2.wow-song'],
self.load_external_result_data(TEST_PATH / 'Test_Song_v2_1_2.json'))

View File

@ -2,7 +2,7 @@
"authors": [
"John Newton (1725-1807)"
],
"title": "Amazing Grace (6 Verses)",
"title": "Amazing Grace (6 Verses)_v2_1_2",
"verse_order_list": [],
"verses": [
[

View File

@ -2,7 +2,7 @@
"authors": [
"Words: Reginald Heber (1783-1826). Music: John B. Dykes (1823-1876)"
],
"title": "Holy Holy Holy Lord God Almighty",
"title": "Holy Holy Holy Lord God Almighty_v2_1_2",
"verse_order_list": [],
"verses": [
[

View File

@ -0,0 +1,18 @@
{
"authors": [
"Author"
],
"copyright": "Copyright",
"title": "Test_Song_v2_0_0",
"verse_order_list": [],
"verses": [
[
"Verse 1 Line 1\nVerse 1 Line 2\nVerse 1 Line 3\nVerse 1 Line 4",
"V"
],
[
"Chorus 1 Line 1\nChorus 1 Line 2\nChorus 1 Line 3\nChorus 1 Line 4\nChorus 1 Line 5",
"C"
]
]
}

View File

@ -0,0 +1,26 @@
{
"authors": [
"Author"
],
"copyright": "Copyright",
"title": "Test_Song_v2_1_2",
"verse_order_list": [],
"verses": [
[
"Verse 1 Line 1\n{minor}Verse 1 Line 2 Minor{/minor}",
"V"
],
[
"Chorus 1 Line 1\n{minor}Chorus 1 Line 2 Minor{/minor}",
"C"
],
[
"Bridge 1 Line 1\n{minor}Bridge 1 Line 2{/minor}",
"B"
],
[
"Verse 2 Line 1\n{minor}Verse 2 Line 2{/minor}",
"V"
]
]
}

View File

@ -2,7 +2,7 @@
"authors": [
"Author Unknown. Tr. Edward Caswall"
],
"title": "When morning gilds the skies",
"title": "When morning gilds the skies_v2_1_2",
"verse_order_list": [],
"verses": [
[