1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

Use a regexp to filter out arguments instead

pipes.quote is a bit overzealous for what we want to do, quoting ;
and other characters that you most likely want to use in your shell
invocations. The regexp is the best I could come up with to be able
to only replace the parts of the arguments that shouldn't be
executed.
This commit is contained in:
Daniel Hokka Zakrisson 2012-10-01 08:30:53 +02:00
parent 880328c10f
commit 6477bdc6fc
2 changed files with 24 additions and 14 deletions

View file

@ -22,8 +22,8 @@ import subprocess
import sys import sys
import datetime import datetime
import traceback import traceback
import re
import shlex import shlex
import pipes
import os import os
DOCUMENTATION = ''' DOCUMENTATION = '''
@ -131,7 +131,6 @@ class CommandModule(AnsibleModule):
def _load_params(self): def _load_params(self):
''' read the input and return a dictionary and the arguments string ''' ''' read the input and return a dictionary and the arguments string '''
args = MODULE_ARGS args = MODULE_ARGS
items = shlex.split(args)
params = {} params = {}
params['chdir'] = None params['chdir'] = None
params['shell'] = False params['shell'] = False
@ -139,14 +138,13 @@ class CommandModule(AnsibleModule):
args = args.replace("#USE_SHELL", "") args = args.replace("#USE_SHELL", "")
params['shell'] = True params['shell'] = True
check_args = shlex.split(args) r = re.compile(r'(^|\s)(creates|removes|chdir)=(?P<quote>[\'"])?(.*?)(?(quote)(?<!\\)(?P=quote))((?<!\\)(?=\s)|$)')
l_args = [] for m in r.finditer(args):
for x in check_args: v = m.group(4).replace("\\", "")
if x.startswith("creates="): if m.group(2) == "creates":
# do not run the command if the line contains creates=filename # do not run the command if the line contains creates=filename
# and the filename already exists. This allows idempotence # and the filename already exists. This allows idempotence
# of command executions. # of command executions.
(k,v) = x.split("=",1)
if os.path.exists(v): if os.path.exists(v):
self.exit_json( self.exit_json(
cmd=args, cmd=args,
@ -156,11 +154,10 @@ class CommandModule(AnsibleModule):
stderr=False, stderr=False,
rc=0 rc=0
) )
elif x.startswith("removes="): elif m.group(2) == "removes":
# do not run the command if the line contains removes=filename # do not run the command if the line contains removes=filename
# and the filename do not exists. This allows idempotence # and the filename do not exists. This allows idempotence
# of command executions. # of command executions.
(k,v) = x.split("=",1)
if not os.path.exists(v): if not os.path.exists(v):
self.exit_json( self.exit_json(
cmd=args, cmd=args,
@ -170,17 +167,15 @@ class CommandModule(AnsibleModule):
stderr=False, stderr=False,
rc=0 rc=0
) )
elif x.startswith("chdir="): elif m.group(2) == "chdir":
(k,v) = x.split("=", 1)
v = os.path.expanduser(v) v = os.path.expanduser(v)
if not (os.path.exists(v) and os.path.isdir(v)): if not (os.path.exists(v) and os.path.isdir(v)):
self.fail_json(msg="cannot change to directory '%s': path does not exist" % v) self.fail_json(msg="cannot change to directory '%s': path does not exist" % v)
elif v[0] != '/': elif v[0] != '/':
self.fail_json(msg="the path for 'chdir' argument must be fully qualified") self.fail_json(msg="the path for 'chdir' argument must be fully qualified")
params['chdir'] = v params['chdir'] = v
else: args = r.sub("", args)
l_args.append(x) params['args'] = args
params['args'] = " ".join([pipes.quote(x) for x in l_args])
return (params, params['args']) return (params, params['args'])
main() main()

View file

@ -138,6 +138,21 @@ class TestRunner(unittest.TestCase):
assert 'failed' not in result assert 'failed' not in result
assert result['rc'] == 0 assert result['rc'] == 0
result = self._run('command', [ "creates='/tmp/ansible command test'", "chdir=/tmp", "touch", "'ansible command test'" ])
assert 'changed' in result
assert result['rc'] == 0
result = self._run('command', [ "creates='/tmp/ansible command test'", "false" ])
assert 'skipped' in result
result = self._run('shell', [ "removes=/tmp/ansible\\ command\\ test", "chdir=/tmp", "rm -f 'ansible command test'; echo $?" ])
assert 'changed' in result
assert result['rc'] == 0
assert result['stdout'] == '0'
result = self._run('shell', [ "removes=/tmp/ansible\\ command\\ test", "false" ])
assert 'skipped' in result
def test_git(self): def test_git(self):
self._run('file',['path=/tmp/gitdemo','state=absent']) self._run('file',['path=/tmp/gitdemo','state=absent'])
self._run('file',['path=/tmp/gd','state=absent']) self._run('file',['path=/tmp/gd','state=absent'])