Pathlib changes

This commit is contained in:
Philip Ridout 2017-09-05 21:48:55 +01:00
parent 28591ed04a
commit 9cb2b2e3c2
7 changed files with 294 additions and 53 deletions

View File

@ -29,8 +29,6 @@ logging and a plugin framework are contained within the openlp.core module.
import argparse
import logging
import os
import shutil
import sys
import time
from datetime import datetime
@ -43,6 +41,7 @@ from openlp.core.common import Registry, OpenLPMixin, AppLocation, LanguageManag
from openlp.core.common.path import Path
from openlp.core.common.versionchecker import VersionThread, get_application_version
from openlp.core.lib import ScreenList
from openlp.core.lib.shutil import copytree
from openlp.core.resources import qInitResources
from openlp.core.ui import SplashScreen
from openlp.core.ui.exceptionform import ExceptionForm
@ -181,25 +180,20 @@ class OpenLP(OpenLPMixin, QtWidgets.QApplication):
"""
Check if the data folder path exists.
"""
data_folder_path = str(AppLocation.get_data_path())
if not os.path.exists(data_folder_path):
log.critical('Database was not found in: ' + data_folder_path)
status = QtWidgets.QMessageBox.critical(None, translate('OpenLP', 'Data Directory Error'),
translate('OpenLP', 'OpenLP data folder was not found in:\n\n{path}'
'\n\nThe location of the data folder was '
'previously changed from the OpenLP\'s '
'default location. If the data was stored on '
'removable device, that device needs to be '
'made available.\n\nYou may reset the data '
'location back to the default location, '
'or you can try to make the current location '
'available.\n\nDo you want to reset to the '
'default data location? If not, OpenLP will be '
'closed so you can try to fix the the problem.')
.format(path=data_folder_path),
QtWidgets.QMessageBox.StandardButtons(QtWidgets.QMessageBox.Yes |
QtWidgets.QMessageBox.No),
QtWidgets.QMessageBox.No)
data_folder_path = AppLocation.get_data_path()
if not data_folder_path.exists():
log.critical('Database was not found in: %s', data_folder_path)
status = QtWidgets.QMessageBox.critical(
None, translate('OpenLP', 'Data Directory Error'),
translate('OpenLP', 'OpenLP data folder was not found in:\n\n{path}\n\nThe location of the data folder '
'was previously changed from the OpenLP\'s default location. If the data was '
'stored on removable device, that device needs to be made available.\n\nYou may '
'reset the data location back to the default location, or you can try to make the '
'current location available.\n\nDo you want to reset to the default data location? '
'If not, OpenLP will be closed so you can try to fix the the problem.')
.format(path=data_folder_path),
QtWidgets.QMessageBox.StandardButtons(QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No),
QtWidgets.QMessageBox.No)
if status == QtWidgets.QMessageBox.No:
# If answer was "No", return "True", it will shutdown OpenLP in def main
log.info('User requested termination')
@ -253,11 +247,11 @@ class OpenLP(OpenLPMixin, QtWidgets.QApplication):
'a backup of the old data folder?'),
defaultButton=QtWidgets.QMessageBox.Yes) == QtWidgets.QMessageBox.Yes:
# Create copy of data folder
data_folder_path = str(AppLocation.get_data_path())
data_folder_path = AppLocation.get_data_path()
timestamp = time.strftime("%Y%m%d-%H%M%S")
data_folder_backup_path = data_folder_path + '-' + timestamp
data_folder_backup_path = data_folder_path.with_name(data_folder_path.name + '-' + timestamp)
try:
shutil.copytree(data_folder_path, data_folder_backup_path)
copytree(data_folder_path, data_folder_backup_path)
except OSError:
QtWidgets.QMessageBox.warning(None, translate('OpenLP', 'Backup'),
translate('OpenLP', 'Backup of the data folder failed!'))

View File

@ -211,38 +211,32 @@ def url_get_file(callback, url, f_path, sha256=None):
:param callback: the class which needs to be updated
:param url: URL to download
:param f_path: Destination file
:param openlp.core.common.path.Path f_path: Destination file
:param sha256: The check sum value to be checked against the download value
"""
block_count = 0
block_size = 4096
retries = 0
log.debug("url_get_file: " + url)
if sha256:
hasher = hashlib.sha256()
while True:
try:
filename = open(f_path, "wb")
url_file = urllib.request.urlopen(url, timeout=CONNECTION_TIMEOUT)
if sha256:
hasher = hashlib.sha256()
# Download until finished or canceled.
while not callback.was_cancelled:
data = url_file.read(block_size)
if not data:
break
filename.write(data)
if sha256:
hasher.update(data)
block_count += 1
callback._download_progress(block_count, block_size)
filename.close()
if sha256 and hasher.hexdigest() != sha256:
log.error('sha256 sums did not match for file: {file}'.format(file=f_path))
os.remove(f_path)
return False
except (urllib.error.URLError, socket.timeout) as err:
with f_path.open('wb') as file:
url_file = urllib.request.urlopen(url, timeout=CONNECTION_TIMEOUT)
# Download until finished or canceled.
while not callback.was_cancelled:
data = url_file.read(block_size)
if not data:
break
file.write(data)
if sha256:
hasher.update(data)
block_count += 1
callback._download_progress(block_count, block_size)
except (urllib.error.URLError, socket.timeout):
trace_error_handler(log)
filename.close()
os.remove(f_path)
f_path.unlink()
if retries > CONNECTION_RETRIES:
return False
else:
@ -251,8 +245,12 @@ def url_get_file(callback, url, f_path, sha256=None):
continue
break
# Delete file if cancelled, it may be a partial file.
if sha256 and hasher.hexdigest() != sha256:
log.error('sha256 sums did not match for file: {file}'.format(file=f_path))
f_path.unlink()
return False
if callback.was_cancelled:
os.remove(f_path)
f_path.unlink()
return True

97
openlp/core/lib/shutil.py Executable file
View File

@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
###############################################################################
# OpenLP - Open Source Lyrics Projection #
# --------------------------------------------------------------------------- #
# Copyright (c) 2008-2017 OpenLP Developers #
# --------------------------------------------------------------------------- #
# This program is free software; you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by the Free #
# Software Foundation; version 2 of the License. #
# #
# This program is distributed in the hope that it will be useful, but WITHOUT #
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
# more details. #
# #
# You should have received a copy of the GNU General Public License along #
# with this program; if not, write to the Free Software Foundation, Inc., 59 #
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
###############################################################################
""" Patch the shutil methods we use so they accept and return Path objects"""
import shutil
from openlp.core.common.path import path_to_str, str_to_path
from openlp.core.lib import replace_params
def copy(*args, **kwargs):
"""
Wraps :func:`shutil.copy` so that we can accept Path objects.
:param src openlp.core.common.path.Path: Takes a Path object which is then converted to a str object
:param dst openlp.core.common.path.Path: Takes a Path object which is then converted to a str object
:return: Converts the str object received from :func:`shutil.copy` to a Path or NoneType object
:rtype: openlp.core.common.path.Path | None
See the following link for more information on the other parameters:
https://docs.python.org/3/library/shutil.html#shutil.copy
"""
args, kwargs = replace_params(args, kwargs, ((0, 'src', path_to_str), (1, 'dst', path_to_str)))
return str_to_path(shutil.copy(*args, **kwargs))
def copyfile(*args, **kwargs):
"""
Wraps :func:`shutil.copyfile` so that we can accept Path objects.
:param openlp.core.common.path.Path src: Takes a Path object which is then converted to a str object
:param openlp.core.common.path.Path dst: Takes a Path object which is then converted to a str object
:return: Converts the str object received from :func:`shutil.copyfile` to a Path or NoneType object
:rtype: openlp.core.common.path.Path | None
See the following link for more information on the other parameters:
https://docs.python.org/3/library/shutil.html#shutil.copyfile
"""
args, kwargs = replace_params(args, kwargs, ((0, 'src', path_to_str), (1, 'dst', path_to_str)))
return str_to_path(shutil.copyfile(*args, **kwargs))
def copytree(*args, **kwargs):
"""
Wraps :func:shutil.copytree` so that we can accept Path objects.
:param openlp.core.common.path.Path src : Takes a Path object which is then converted to a str object
:param openlp.core.common.path.Path dst: Takes a Path object which is then converted to a str object
:return: Converts the str object received from :func:`shutil.copytree` to a Path or NoneType object
:rtype: openlp.core.common.path.Path | None
See the following link for more information on the other parameters:
https://docs.python.org/3/library/shutil.html#shutil.copytree
"""
args, kwargs = replace_params(args, kwargs, ((0, 'src', path_to_str), (1, 'dst', path_to_str)))
return str_to_path(shutil.copytree(*args, **kwargs))
def rmtree(*args, **kwargs):
"""
Wraps :func:shutil.rmtree` so that we can accept Path objects.
:param openlp.core.common.path.Path path: Takes a Path object which is then converted to a str object
:return: Passes the return from :func:`shutil.rmtree` back
:rtype: None
See the following link for more information on the other parameters:
https://docs.python.org/3/library/shutil.html#shutil.rmtree
"""
args, kwargs = replace_params(args, kwargs, ((0, 'path', path_to_str),))
return shutil.rmtree(*args, **kwargs)

View File

@ -563,7 +563,7 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
filename, sha256 = item.data(QtCore.Qt.UserRole)
self._increment_progress_bar(self.downloading.format(name=filename), 0)
self.previous_size = 0
destination = os.path.join(songs_destination, str(filename))
destination = Path(songs_destination, str(filename))
if not url_get_file(self, '{path}{name}'.format(path=self.songs_url, name=filename),
destination, sha256):
missed_files.append('Song: {name}'.format(name=filename))
@ -576,7 +576,7 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
self._increment_progress_bar(self.downloading.format(name=bible), 0)
self.previous_size = 0
if not url_get_file(self, '{path}{name}'.format(path=self.bibles_url, name=bible),
os.path.join(bibles_destination, bible),
Path(bibles_destination, bible),
sha256):
missed_files.append('Bible: {name}'.format(name=bible))
bibles_iterator += 1
@ -588,7 +588,7 @@ class FirstTimeForm(QtWidgets.QWizard, UiFirstTimeWizard, RegistryProperties):
self._increment_progress_bar(self.downloading.format(name=theme), 0)
self.previous_size = 0
if not url_get_file(self, '{path}{name}'.format(path=self.themes_url, name=theme),
os.path.join(themes_destination, theme),
Path(themes_destination, theme),
sha256):
missed_files.append('Theme: {name}'.format(name=theme))
if missed_files:

View File

@ -64,6 +64,6 @@ def download_and_check(callback=None):
file_size = get_url_file_size('https://get.openlp.org/webclient/site.zip')
callback.setRange(0, file_size)
if url_get_file(callback, '{host}{name}'.format(host='https://get.openlp.org/webclient/', name='site.zip'),
os.path.join(str(AppLocation.get_section_data_path('remotes')), 'site.zip'),
AppLocation.get_section_data_path('remotes') / 'site.zip',
sha256=sha256):
deploy_zipfile(str(AppLocation.get_section_data_path('remotes')), 'site.zip')

View File

@ -29,6 +29,7 @@ from unittest import TestCase
from unittest.mock import MagicMock, patch
from openlp.core.common.httputils import get_user_agent, get_web_page, get_url_file_size, url_get_file, ping
from openlp.core.common.path import Path
from tests.helpers.testmixin import TestMixin
@ -267,7 +268,7 @@ class TestHttpUtils(TestCase, TestMixin):
mocked_urlopen.side_effect = socket.timeout()
# WHEN: Attempt to retrieve a file
url_get_file(MagicMock(), url='http://localhost/test', f_path=self.tempfile)
url_get_file(MagicMock(), url='http://localhost/test', f_path=Path(self.tempfile))
# THEN: socket.timeout should have been caught
# NOTE: Test is if $tmpdir/tempfile is still there, then test fails since ftw deletes bad downloaded files

View File

@ -0,0 +1,151 @@
import os
from unittest import TestCase
from unittest.mock import ANY, MagicMock, patch
from openlp.core.common.path import Path
from openlp.core.lib import shutilpatches
class TestShutilPatches(TestCase):
"""
Tests for the :mod:`openlp.core.lib.shutil` module
"""
def test_pcopy(self):
"""
Test :func:`copy`
"""
# GIVEN: A mocked `shutil.copy` which returns a test path as a string
with patch('openlp.core.lib.shutil.shutil.copy', return_value=os.path.join('destination', 'test', 'path')) \
as mocked_shutil_copy:
# WHEN: Calling shutilpatches.copy with the src and dst parameters as Path object types
result = shutilpatches.copy(Path('source', 'test', 'path'), Path('destination', 'test', 'path'))
# THEN: `shutil.copy` should have been called with the str equivalents of the Path objects.
# `shutilpatches.copy` should return the str type result of calling `shutil.copy` as a Path
# object.
mocked_shutil_copy.assert_called_once_with(os.path.join('source', 'test', 'path'),
os.path.join('destination', 'test', 'path'))
self.assertEqual(result, Path('destination', 'test', 'path'))
def test_pcopy_follow_optional_params(self):
"""
Test :func:`copy` when follow_symlinks is set to false
"""
# GIVEN: A mocked `shutil.copy`
with patch('openlp.core.lib.shutil.shutil.copy', return_value='') as mocked_shutil_copy:
# WHEN: Calling shutilpatches.copy with `follow_symlinks` set to False
shutilpatches.copy(Path('source', 'test', 'path'),
Path('destination', 'test', 'path'),
follow_symlinks=False)
# THEN: `shutil.copy` should have been called with follow_symlinks is set to false
mocked_shutil_copy.assert_called_once_with(ANY, ANY, follow_symlinks=False)
def test_pcopyfile(self):
"""
Test :func:`copyfile`
"""
# GIVEN: A mocked `shutil.copyfile` which returns a test path as a string
with patch('openlp.core.lib.shutil.shutil.copyfile', return_value=os.path.join('destination', 'test', 'path')) \
as mocked_shutil_copyfile:
# WHEN: Calling shutilpatches.copyfile with the src and dst parameters as Path object types
result = shutilpatches.copyfile(Path('source', 'test', 'path'), Path('destination', 'test', 'path'))
# THEN: `shutil.copyfile` should have been called with the str equivalents of the Path objects.
# `shutilpatches.copyfile` should return the str type result of calling `shutil.copyfile` as a Path
# object.
mocked_shutil_copyfile.assert_called_once_with(os.path.join('source', 'test', 'path'),
os.path.join('destination', 'test', 'path'))
self.assertEqual(result, Path('destination', 'test', 'path'))
def test_pcopyfile_optional_params(self):
"""
Test :func:`copyfile` when follow_symlinks is set to false
"""
# GIVEN: A mocked `shutil.copyfile`
with patch('openlp.core.lib.shutil.shutil.copyfile', return_value='') as mocked_shutil_copyfile:
# WHEN: Calling shutilpatches.copyfile with `follow_symlinks` set to False
shutilpatches.copyfile(Path('source', 'test', 'path'),
Path('destination', 'test', 'path'),
follow_symlinks=False)
# THEN: `shutil.copyfile` should have been called with the optional parameters, with out any of the values
# being modified
mocked_shutil_copyfile.assert_called_once_with(ANY, ANY, follow_symlinks=False)
def test_pcopytree(self):
"""
Test :func:`copytree`
"""
# GIVEN: A mocked `shutil.copytree` which returns a test path as a string
with patch('openlp.core.lib.shutil.shutil.copytree', return_value=os.path.join('destination', 'test', 'path')) \
as mocked_shutil_copytree:
# WHEN: Calling shutilpatches.copytree with the src and dst parameters as Path object types
result = shutilpatches.copytree(Path('source', 'test', 'path'), Path('destination', 'test', 'path'))
# THEN: `shutil.copytree` should have been called with the str equivalents of the Path objects.
# `shutilpatches.copytree` should return the str type result of calling `shutil.copytree` as a Path
# object.
mocked_shutil_copytree.assert_called_once_with(os.path.join('source', 'test', 'path'),
os.path.join('destination', 'test', 'path'))
self.assertEqual(result, Path('destination', 'test', 'path'))
def test_pcopytree_optional_params(self):
"""
Test :func:`copytree` when optional parameters are passed
"""
# GIVEN: A mocked `shutil.copytree`
with patch('openlp.core.lib.shutil.shutil.copytree', return_value='') as mocked_shutil_copytree:
mocked_ignore = MagicMock()
mocked_copy_function = MagicMock()
# WHEN: Calling shutilpatches.copytree with the optional parameters set
shutilpatches.copytree(Path('source', 'test', 'path'),
Path('destination', 'test', 'path'),
symlinks=True,
ignore=mocked_ignore,
copy_function=mocked_copy_function,
ignore_dangling_symlinks=True)
# THEN: `shutil.copytree` should have been called with the optional parameters, with out any of the values
# being modified
mocked_shutil_copytree.assert_called_once_with(ANY, ANY,
symlinks=True,
ignore=mocked_ignore,
copy_function=mocked_copy_function,
ignore_dangling_symlinks=True)
def test_prmtree(self):
"""
Test :func:`rmtree`
"""
# GIVEN: A mocked `shutil.rmtree`
with patch('openlp.core.lib.shutil.shutil.rmtree', return_value=None) as mocked_rmtree:
# WHEN: Calling shutilpatches.rmtree with the path parameter as Path object type
result = shutilpatches.rmtree(Path('test', 'path'))
# THEN: `shutil.rmtree` should have been called with the str equivalents of the Path object.
mocked_rmtree.assert_called_once_with(os.path.join('test', 'path'))
self.assertIsNone(result)
def test_prmtree_optional_params(self):
"""
Test :func:`rmtree` when optional parameters are passed
"""
# GIVEN: A mocked `shutil.rmtree`
with patch('openlp.core.lib.shutil.shutil.rmtree', return_value='') as mocked_shutil_rmtree:
mocked_on_error = MagicMock()
# WHEN: Calling shutilpatches.rmtree with `ignore_errors` set to True and `onerror` set to a mocked object
shutilpatches.rmtree(Path('test', 'path'), ignore_errors=True, onerror=mocked_on_error)
# THEN: `shutil.rmtree` should have been called with the optional parameters, with out any of the values
# being modified
mocked_shutil_rmtree.assert_called_once_with(ANY, ignore_errors=True, onerror=mocked_on_error)