mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
59999a89f1
csv module utils: detect unicode BOM in content (#6662)
* csv module utils: detect unicode BOM in content
* fix handling of py2
* fix comment
* add changelog frag
* add missing link
* simplification
(cherry picked from commit bb2169340d
)
Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>
70 lines
2 KiB
Python
70 lines
2 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (c) 2021, Andrew Pantuso (@ajpantuso) <ajpantuso@gmail.com>
|
|
# Copyright (c) 2018, Dag Wieers (@dagwieers) <dag@wieers.com>
|
|
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
__metaclass__ = type
|
|
|
|
import csv
|
|
from io import BytesIO, StringIO
|
|
|
|
from ansible.module_utils.common.text.converters import to_native
|
|
from ansible.module_utils.six import PY3
|
|
|
|
|
|
class CustomDialectFailureError(Exception):
|
|
pass
|
|
|
|
|
|
class DialectNotAvailableError(Exception):
|
|
pass
|
|
|
|
|
|
CSVError = csv.Error
|
|
|
|
|
|
def initialize_dialect(dialect, **kwargs):
|
|
# Add Unix dialect from Python 3
|
|
class unix_dialect(csv.Dialect):
|
|
"""Describe the usual properties of Unix-generated CSV files."""
|
|
delimiter = ','
|
|
quotechar = '"'
|
|
doublequote = True
|
|
skipinitialspace = False
|
|
lineterminator = '\n'
|
|
quoting = csv.QUOTE_ALL
|
|
|
|
csv.register_dialect("unix", unix_dialect)
|
|
|
|
if dialect not in csv.list_dialects():
|
|
raise DialectNotAvailableError("Dialect '%s' is not supported by your version of python." % dialect)
|
|
|
|
# Create a dictionary from only set options
|
|
dialect_params = dict((k, v) for k, v in kwargs.items() if v is not None)
|
|
if dialect_params:
|
|
try:
|
|
csv.register_dialect('custom', dialect, **dialect_params)
|
|
except TypeError as e:
|
|
raise CustomDialectFailureError("Unable to create custom dialect: %s" % to_native(e))
|
|
dialect = 'custom'
|
|
|
|
return dialect
|
|
|
|
|
|
def read_csv(data, dialect, fieldnames=None):
|
|
BOM = to_native(u'\ufeff')
|
|
data = to_native(data, errors='surrogate_or_strict')
|
|
if data.startswith(BOM):
|
|
data = data[len(BOM):]
|
|
|
|
if PY3:
|
|
fake_fh = StringIO(data)
|
|
else:
|
|
fake_fh = BytesIO(data)
|
|
|
|
reader = csv.DictReader(fake_fh, fieldnames=fieldnames, dialect=dialect)
|
|
|
|
return reader
|