1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

archive - a first refactoring (#2061) (#2069)

* a first refactoring on archive

* added changelog fragment

* suggestion from PR

(cherry picked from commit 606eb0df15)

Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>
This commit is contained in:
patchback[bot] 2021-03-21 11:23:55 +01:00 committed by GitHub
parent 6f267d8f35
commit 853dd21eab
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 58 additions and 59 deletions

View file

@ -0,0 +1,2 @@
minor_changes:
- archive - refactored some reused code out into a couple of functions (https://github.com/ansible-collections/community.general/pull/2061).

View file

@ -153,7 +153,6 @@ expanded_exclude_paths:
'''
import bz2
import filecmp
import glob
import gzip
import io
@ -186,6 +185,33 @@ else:
HAS_LZMA = False
def to_b(s):
return to_bytes(s, errors='surrogate_or_strict')
def to_n(s):
return to_native(s, errors='surrogate_or_strict')
def to_na(s):
return to_native(s, errors='surrogate_or_strict', encoding='ascii')
def expand_paths(paths):
expanded_path = []
is_globby = False
for path in paths:
b_path = to_b(path)
if b'*' in b_path or b'?' in b_path:
e_paths = glob.glob(b_path)
is_globby = True
else:
e_paths = [b_path]
expanded_path.extend(e_paths)
return expanded_path, is_globby
def main():
module = AnsibleModule(
argument_spec=dict(
@ -204,21 +230,17 @@ def main():
check_mode = module.check_mode
paths = params['path']
dest = params['dest']
b_dest = None if not dest else to_bytes(dest, errors='surrogate_or_strict')
b_dest = None if not dest else to_b(dest)
exclude_paths = params['exclude_path']
remove = params['remove']
b_expanded_paths = []
b_expanded_exclude_paths = []
fmt = params['format']
b_fmt = to_bytes(fmt, errors='surrogate_or_strict')
b_fmt = to_b(fmt)
force_archive = params['force_archive']
globby = False
changed = False
state = 'absent'
# Simple or archive file compression (inapplicable with 'zip' since it's always an archive)
archive = False
b_successes = []
# Fail early
@ -227,35 +249,7 @@ def main():
exception=LZMA_IMP_ERR)
module.fail_json(msg="lzma or backports.lzma is required when using xz format.")
for path in paths:
b_path = to_bytes(path, errors='surrogate_or_strict')
# Expand any glob characters. If found, add the expanded glob to the
# list of expanded_paths, which might be empty.
if (b'*' in b_path or b'?' in b_path):
b_expanded_paths.extend(glob.glob(b_path))
globby = True
# If there are no glob characters the path is added to the expanded paths
# whether the path exists or not
else:
b_expanded_paths.append(b_path)
# Only attempt to expand the exclude paths if it exists
if exclude_paths:
for exclude_path in exclude_paths:
b_exclude_path = to_bytes(exclude_path, errors='surrogate_or_strict')
# Expand any glob characters. If found, add the expanded glob to the
# list of expanded_paths, which might be empty.
if (b'*' in b_exclude_path or b'?' in b_exclude_path):
b_expanded_exclude_paths.extend(glob.glob(b_exclude_path))
# If there are no glob character the exclude path is added to the expanded
# exclude paths whether the path exists or not.
else:
b_expanded_exclude_paths.append(b_exclude_path)
b_expanded_paths, globby = expand_paths(paths)
if not b_expanded_paths:
return module.fail_json(
path=', '.join(paths),
@ -263,6 +257,9 @@ def main():
msg='Error, no source paths were found'
)
# Only attempt to expand the exclude paths if it exists
b_expanded_exclude_paths = expand_paths(exclude_paths)[0] if exclude_paths else []
# Only try to determine if we are working with an archive or not if we haven't set archive to true
if not force_archive:
# If we actually matched multiple files or TRIED to, then
@ -280,7 +277,7 @@ def main():
if archive and not b_dest:
module.fail_json(dest=dest, path=', '.join(paths), msg='Error, must specify "dest" when archiving multiple files or trees')
b_sep = to_bytes(os.sep, errors='surrogate_or_strict')
b_sep = to_b(os.sep)
b_archive_paths = []
b_missing = []
@ -321,7 +318,7 @@ def main():
# No source files were found but the named archive exists: are we 'compress' or 'archive' now?
if len(b_missing) == len(b_expanded_paths) and b_dest and os.path.exists(b_dest):
# Just check the filename to know if it's an archive or simple compressed file
if re.search(br'(\.tar|\.tar\.gz|\.tgz|\.tbz2|\.tar\.bz2|\.tar\.xz|\.zip)$', os.path.basename(b_dest), re.IGNORECASE):
if re.search(br'\.(tar|tar\.(gz|bz2|xz)|tgz|tbz2|zip)$', os.path.basename(b_dest), re.IGNORECASE):
state = 'archive'
else:
state = 'compress'
@ -352,7 +349,7 @@ def main():
# Slightly more difficult (and less efficient!) compression using zipfile module
if fmt == 'zip':
arcfile = zipfile.ZipFile(
to_native(b_dest, errors='surrogate_or_strict', encoding='ascii'),
to_na(b_dest),
'w',
zipfile.ZIP_DEFLATED,
True
@ -360,7 +357,7 @@ def main():
# Easier compression using tarfile module
elif fmt == 'gz' or fmt == 'bz2':
arcfile = tarfile.open(to_native(b_dest, errors='surrogate_or_strict', encoding='ascii'), 'w|' + fmt)
arcfile = tarfile.open(to_na(b_dest), 'w|' + fmt)
# python3 tarfile module allows xz format but for python2 we have to create the tarfile
# in memory and then compress it with lzma.
@ -370,7 +367,7 @@ def main():
# Or plain tar archiving
elif fmt == 'tar':
arcfile = tarfile.open(to_native(b_dest, errors='surrogate_or_strict', encoding='ascii'), 'w')
arcfile = tarfile.open(to_na(b_dest), 'w')
b_match_root = re.compile(br'^%s' % re.escape(b_arcroot))
for b_path in b_archive_paths:
@ -382,7 +379,7 @@ def main():
for b_dirname in b_dirnames:
b_fullpath = b_dirpath + b_dirname
n_fullpath = to_native(b_fullpath, errors='surrogate_or_strict', encoding='ascii')
n_fullpath = to_na(b_fullpath)
n_arcname = to_native(b_match_root.sub(b'', b_fullpath), errors='surrogate_or_strict')
try:
@ -396,8 +393,8 @@ def main():
for b_filename in b_filenames:
b_fullpath = b_dirpath + b_filename
n_fullpath = to_native(b_fullpath, errors='surrogate_or_strict', encoding='ascii')
n_arcname = to_native(b_match_root.sub(b'', b_fullpath), errors='surrogate_or_strict')
n_fullpath = to_na(b_fullpath)
n_arcname = to_n(b_match_root.sub(b'', b_fullpath))
try:
if fmt == 'zip':
@ -409,8 +406,8 @@ def main():
except Exception as e:
errors.append('Adding %s: %s' % (to_native(b_path), to_native(e)))
else:
path = to_native(b_path, errors='surrogate_or_strict', encoding='ascii')
arcname = to_native(b_match_root.sub(b'', b_path), errors='surrogate_or_strict')
path = to_na(b_path)
arcname = to_n(b_match_root.sub(b'', b_path))
if fmt == 'zip':
arcfile.write(path, arcname)
else:
@ -444,14 +441,14 @@ def main():
shutil.rmtree(b_path)
elif not check_mode:
os.remove(b_path)
except OSError as e:
except OSError:
errors.append(to_native(b_path))
for b_path in b_expanded_paths:
try:
if os.path.isdir(b_path):
shutil.rmtree(b_path)
except OSError as e:
except OSError:
errors.append(to_native(b_path))
if errors:
@ -490,25 +487,25 @@ def main():
try:
if fmt == 'zip':
arcfile = zipfile.ZipFile(
to_native(b_dest, errors='surrogate_or_strict', encoding='ascii'),
to_na(b_dest),
'w',
zipfile.ZIP_DEFLATED,
True
)
arcfile.write(
to_native(b_path, errors='surrogate_or_strict', encoding='ascii'),
to_native(b_path[len(b_arcroot):], errors='surrogate_or_strict')
to_na(b_path),
to_n(b_path[len(b_arcroot):])
)
arcfile.close()
state = 'archive' # because all zip files are archives
elif fmt == 'tar':
arcfile = tarfile.open(to_native(b_dest, errors='surrogate_or_strict', encoding='ascii'), 'w')
arcfile.add(to_native(b_path, errors='surrogate_or_strict', encoding='ascii'))
arcfile = tarfile.open(to_na(b_dest), 'w')
arcfile.add(to_na(b_path))
arcfile.close()
else:
f_in = open(b_path, 'rb')
n_dest = to_native(b_dest, errors='surrogate_or_strict', encoding='ascii')
n_dest = to_na(b_dest)
if fmt == 'gz':
f_out = gzip.open(n_dest, 'wb')
elif fmt == 'bz2':
@ -564,14 +561,14 @@ def main():
changed = module.set_fs_attributes_if_different(file_args, changed)
module.exit_json(
archived=[to_native(p, errors='surrogate_or_strict') for p in b_successes],
archived=[to_n(p) for p in b_successes],
dest=dest,
changed=changed,
state=state,
arcroot=to_native(b_arcroot, errors='surrogate_or_strict'),
missing=[to_native(p, errors='surrogate_or_strict') for p in b_missing],
expanded_paths=[to_native(p, errors='surrogate_or_strict') for p in b_expanded_paths],
expanded_exclude_paths=[to_native(p, errors='surrogate_or_strict') for p in b_expanded_exclude_paths],
arcroot=to_n(b_arcroot),
missing=[to_n(p) for p in b_missing],
expanded_paths=[to_n(p) for p in b_expanded_paths],
expanded_exclude_paths=[to_n(p) for p in b_expanded_exclude_paths],
)