1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00
community.general/lib/ansible/module_common.py

166 lines
5.8 KiB
Python
Raw Normal View History

# (c) 2012, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
REPLACER = "#<<INCLUDE_ANSIBLE_MODULE_COMMON>>"
MODULE_COMMON = """
# == BEGIN DYNAMICALLY INSERTED CODE ==
# ansible modules can be written in any language. To simplify
# development of Python modules, the functions available here
# can be inserted in any module source automatically by including
# #<<INCLUDE_ANSIBLE_MODULE_COMMON>> on a blank line by itself inside
# of an ansible module. The source of this common code lives
# in lib/ansible/module_common.py
try:
import json
except ImportError:
import simplejson as json
import os
import re
import shlex
import subprocess
import sys
import syslog
try:
from hashlib import md5 as _md5
except ImportError:
from md5 import md5 as _md5
class AnsibleModule(object):
def __init__(self, argument_spec, bypass_checks=False, no_log=False):
'''
2012-07-18 05:09:57 +02:00
common code for quickly building an ansible module in Python
(although you can write modules in anything that can return JSON)
see library/slurp and others for examples
'''
self.argument_spec = argument_spec
(self.params, self.args) = self._load_params()
2012-07-18 05:09:57 +02:00
self._handle_aliases()
self._set_defaults()
if not bypass_checks:
self._check_required_arguments()
self._check_argument_types()
if not no_log:
self._log_invocation()
2012-07-18 05:09:57 +02:00
def _handle_aliases(self):
for (k,v) in self.argument_spec.iteritems():
aliases = v.get('aliases', None)
if aliases is None:
continue
if type(aliases) != list:
self.fail_json(msg='internal error: aliases must be a list')
for alias in aliases:
if alias in self.params:
self.params[k] = self.params[alias]
def _check_required_arguments(self):
''' ensure all required arguments are present '''
missing = []
for (k,v) in self.argument_spec.iteritems():
2012-07-18 05:09:57 +02:00
required = v.get('required', False)
if required and k not in self.params:
missing.append(k)
if len(missing) > 0:
self.fail_json(msg="missing required arguments: %s" % ",".join(missing))
def _check_argument_types(self):
''' ensure all arguments have the requested values, and there are no stray arguments '''
for (k,v) in self.argument_spec.iteritems():
2012-07-18 05:09:57 +02:00
choices = v.get('choices',None)
if choices is None:
continue
if type(choices) == list:
if k in self.params:
if self.params[k] not in choices:
choices_str=",".join(choices)
msg="value of %s must be one of: %s, got: %s" % (k, choices_str, self.params[k])
self.fail_json(msg=msg)
else:
self.fail_json(msg="internal error: do not know how to interpret argument_spec")
def _set_defaults(self):
for (k,v) in self.argument_spec.iteritems():
default = v.get('default', '__NO_DEFAULT__')
if default != '__NO_DEFAULT__' and k not in self.params:
self.params[k] = default
def _load_params(self):
''' read the input and return a dictionary and the arguments string '''
if len(sys.argv) == 2 and os.path.exists(sys.argv[1]):
argfile = sys.argv[1]
args = open(argfile, 'r').read()
else:
args = ' '.join(sys.argv[1:])
items = shlex.split(args)
params = {}
for x in items:
(k, v) = x.split("=",1)
params[k] = v
return (params, args)
def _log_invocation(self):
''' log that ansible ran the module '''
syslog.openlog('ansible-%s' % os.path.basename(__file__))
# Sanitize possible password argument when logging
log_args = re.sub(r'password=.+ (.*)', r"password=NOT_LOGGING_PASSWORD \1", self.args)
syslog.syslog(syslog.LOG_NOTICE, 'Invoked with %s' % log_args)
def jsonify(self, data):
return json.dumps(data)
2012-07-18 05:09:57 +02:00
def exit_json(self, **kwargs):
''' return from the module, without error '''
print self.jsonify(kwargs)
2012-07-18 05:09:57 +02:00
sys.exit(0)
def fail_json(self, **kwargs):
''' return from the module, with an error message '''
assert 'msg' in kwargs, "implementation error -- msg to explain the error is required"
kwargs['failed'] = True
print self.jsonify(kwargs)
2012-07-18 05:09:57 +02:00
sys.exit(1)
def md5(self, filename):
''' Return MD5 hex digest of local file, or None if file is not present. '''
if not os.path.exists(filename):
return None
if os.path.isdir(filename):
self.fail_json(msg="attempted to take md5sum of directory: %s" % filename)
digest = _md5()
blocksize = 64 * 1024
infile = open(filename, 'rb')
block = infile.read(blocksize)
while block:
digest.update(block)
block = infile.read(blocksize)
infile.close()
return digest.hexdigest()
# == END DYNAMICALLY INSERTED CODE ===
"""