From ae29245e050c4dd613c4cc9c0d2f504766f160b3 Mon Sep 17 00:00:00 2001 From: Brian Coca Date: Sat, 16 Sep 2017 23:35:50 -0400 Subject: [PATCH] decelerate! (#30160) removed accelerate code removed keyczar dep for accelerate --- lib/ansible/config/base.yml | 95 --- lib/ansible/executor/task_executor.py | 38 - .../modules/utilities/helper/_accelerate.py | 672 +----------------- lib/ansible/playbook/play.py | 9 +- lib/ansible/playbook/play_context.py | 12 - lib/ansible/plugins/action/__init__.py | 5 - lib/ansible/plugins/connection/accelerate.py | 327 --------- lib/ansible/utils/encrypt.py | 118 +-- 8 files changed, 8 insertions(+), 1268 deletions(-) delete mode 100644 lib/ansible/plugins/connection/accelerate.py diff --git a/lib/ansible/config/base.yml b/lib/ansible/config/base.yml index 0c4e6cb263..1b016d79d0 100644 --- a/lib/ansible/config/base.yml +++ b/lib/ansible/config/base.yml @@ -1,101 +1,6 @@ # Copyright (c) 2017 Ansible Project # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) --- -ACCELERATE_CONNECT_TIMEOUT: - default: 1.0 - description: - - "This setting controls the timeout for the socket connect call, and should be kept relatively low. - The connection to the accelerate_port will be attempted 3 times before Ansible will fall back to ssh or paramiko - (depending on your default connection setting) to try and start the accelerate daemon remotely." - - "Note, this value can be set to less than one second, however it is probably not a good idea to do so - unless you are on a very fast and reliable LAN. If you are connecting to systems over the internet, it may be necessary to increase this timeout." - env: [{name: ACCELERATE_CONNECT_TIMEOUT }] - ini: - - {key: accelerate_connect_timeout, section: accelerate} - type: float - deprecated: - why: Removing accelerate as a connection method, settings not needed either. - version: "2.5" - alternatives: ssh and paramiko - version_added: "1.4" -ACCELERATE_DAEMON_TIMEOUT: - default: 30 - description: - - This setting controls the timeout for the accelerated daemon, as measured in minutes. The default daemon timeout is 30 minutes. - - "Prior to 1.6, the timeout was hard-coded from the time of the daemon's launch." - - For version 1.6+, the timeout is now based on the last activity to the daemon and is configurable via this option. - env: [{name: ACCELERATE_DAEMON_TIMEOUT}] - ini: - - {key: accelerate_daemon_timeout, section: accelerate} - type: integer - deprecated: - why: Removing accelerate as a connection method, settings not needed either. - version: "2.5" - alternatives: ssh and paramiko - version_added: "1.6" -ACCELERATE_KEYS_DIR: - default: ~/.fireball.keys - description: '' - deprecated: - why: Removing accelerate as a connection method, settings not needed either. - version: "2.5" - alternatives: ssh and paramiko - env: [{name: ACCELERATE_KEYS_DIR}] - ini: - - {key: accelerate_keys_dir, section: accelerate} -ACCELERATE_KEYS_DIR_PERMS: - default: '700' - description: 'TODO: write it' - env: [{name: ACCELERATE_KEYS_DIR_PERMS}] - ini: - - {key: accelerate_keys_dir_perms, section: accelerate} - deprecated: - why: Removing accelerate as a connection method, settings not needed either. - version: "2.5" - alternatives: ssh and paramiko -ACCELERATE_KEYS_FILE_PERMS: - default: '600' - description: 'TODO: write it' - env: [{name: ACCELERATE_KEYS_FILE_PERMS}] - ini: - - {key: accelerate_keys_file_perms, section: accelerate} - deprecated: - why: Removing accelerate as a connection method, settings not needed either. - version: "2.5" - alternatives: ssh and paramiko -ACCELERATE_MULTI_KEY: - default: False - description: 'TODO: write it' - env: [{name: ACCELERATE_MULTI_KEY}] - ini: - - {key: accelerate_multi_key, section: accelerate} - type: boolean - deprecated: - why: Removing accelerate as a connection method, settings not needed either. - version: "2.5" - alternatives: ssh and paramiko -ACCELERATE_PORT: - default: 5099 - description: 'TODO: write it' - env: [{name: ACCELERATE_PORT}] - ini: - - {key: accelerate_port, section: accelerate} - type: integer - deprecated: - why: Removing accelerate as a connection method, settings not needed either. - version: "2.5" - alternatives: ssh and paramiko -ACCELERATE_TIMEOUT: - default: 30 - description: 'TODO: write it' - env: [{name: ACCELERATE_TIMEOUT}] - ini: - - {key: accelerate_timeout, section: accelerate} - type: integer - deprecated: - why: Removing accelerate as a connection method, settings not needed either. - version: "2.5" - alternatives: ssh and paramiko ALLOW_WORLD_READABLE_TMPFILES: name: Allow world readable temporary files default: False diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py index 66e371deb4..0464b9a515 100644 --- a/lib/ansible/executor/task_executor.py +++ b/lib/ansible/executor/task_executor.py @@ -19,7 +19,6 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type -import base64 import time import traceback @@ -32,7 +31,6 @@ from ansible.playbook.conditional import Conditional from ansible.playbook.task import Task from ansible.plugins.connection import ConnectionBase from ansible.template import Templar -from ansible.utils.encrypt import key_for_hostname from ansible.utils.listify import listify_lookup_plugin_terms from ansible.utils.unsafe_proxy import UnsafeProxy, wrap_var @@ -738,42 +736,6 @@ class TaskExecutor: self._play_context.set_options_from_plugin(connection) - if self._play_context.accelerate: - # accelerate is deprecated as of 2.1... - display.deprecated('Accelerated mode is deprecated. Consider using SSH with ControlPersist and pipelining enabled instead', version='2.6') - # launch the accelerated daemon here - ssh_connection = connection - handler = self._shared_loader_obj.action_loader.get( - 'normal', - task=self._task, - connection=ssh_connection, - play_context=self._play_context, - loader=self._loader, - templar=templar, - shared_loader_obj=self._shared_loader_obj, - ) - - key = key_for_hostname(self._play_context.remote_addr) - accelerate_args = dict( - password=base64.b64encode(key.__str__()), - port=self._play_context.accelerate_port, - minutes=C.ACCELERATE_DAEMON_TIMEOUT, - ipv6=self._play_context.accelerate_ipv6, - debug=self._play_context.verbosity, - ) - - connection = self._shared_loader_obj.connection_loader.get('accelerate', self._play_context, self._new_stdin) - if not connection: - raise AnsibleError("the connection plugin '%s' was not found" % conn_type) - - try: - connection._connect() - except AnsibleConnectionFailure: - display.debug('connection failed, fallback to accelerate') - res = handler._execute_module(module_name='accelerate', module_args=accelerate_args, task_vars=variables, delete_remote_tmp=False) - display.debug(res) - connection._connect() - return connection def _get_action_handler(self, connection, templar): diff --git a/lib/ansible/modules/utilities/helper/_accelerate.py b/lib/ansible/modules/utilities/helper/_accelerate.py index f6d92797f4..e14535810c 100644 --- a/lib/ansible/modules/utilities/helper/_accelerate.py +++ b/lib/ansible/modules/utilities/helper/_accelerate.py @@ -16,9 +16,11 @@ ANSIBLE_METADATA = {'metadata_version': '1.1', DOCUMENTATION = ''' --- module: accelerate +removed: True short_description: Enable accelerated mode on remote node deprecated: "Use SSH with ControlPersist instead." description: + - This module has been removed, this file is kept for historicaly documentation purposes - This modules launches an ephemeral I(accelerate) daemon on the remote node which Ansible can use to communicate with nodes at high speed. - The daemon listens on a configurable port for a configurable amount of time. @@ -75,673 +77,3 @@ EXAMPLES = ''' tasks: - command: /usr/bin/anything ''' - -import base64 -import errno -import getpass -import json -import os -import os.path -import pwd -import signal -import socket -import struct -import sys -import syslog -import tempfile -import time -import traceback - - -import datetime -from threading import Thread, Lock - -# import module snippets -# we must import this here at the top so we can use get_module_path() -from ansible.module_utils.basic import AnsibleModule, get_module_path -from ansible.module_utils.six.moves import socketserver - -# the chunk size to read and send, assuming mtu 1500 and -# leaving room for base64 (+33%) encoding and header (100 bytes) -# 4 * (975/3) + 100 = 1400 -# which leaves room for the TCP/IP header -CHUNK_SIZE = 10240 - -# FIXME: this all should be moved to module_common, as it's -# pretty much a copy from the callbacks/util code -DEBUG_LEVEL = 0 - - -def log(msg, cap=0): - global DEBUG_LEVEL - if DEBUG_LEVEL >= cap: - syslog.syslog(syslog.LOG_NOTICE | syslog.LOG_DAEMON, msg) - - -def v(msg): - log(msg, cap=1) - - -def vv(msg): - log(msg, cap=2) - - -def vvv(msg): - log(msg, cap=3) - - -def vvvv(msg): - log(msg, cap=4) - - -HAS_KEYCZAR = False -try: - from keyczar.keys import AesKey - HAS_KEYCZAR = True -except ImportError: - pass - -SOCKET_FILE = os.path.join(get_module_path(), '.ansible-accelerate', ".local.socket") - - -def get_pid_location(module): - """ - Try to find a pid directory in the common locations, falling - back to the user's home directory if no others exist - """ - for dir in ['/var/run', '/var/lib/run', '/run', os.path.expanduser("~/")]: - try: - if os.path.isdir(dir) and os.access(dir, os.R_OK | os.W_OK): - return os.path.join(dir, '.accelerate.pid') - except: - pass - module.fail_json(msg="couldn't find any valid directory to use for the accelerate pid file") - - -# NOTE: this shares a fair amount of code in common with async_wrapper, if async_wrapper were a new module we could move -# this into utils.module_common and probably should anyway - -def daemonize_self(module, password, port, minutes, pid_file): - # daemonizing code: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012 - try: - pid = os.fork() - if pid > 0: - vvv("exiting pid %s" % pid) - # exit first parent - module.exit_json(msg="daemonized accelerate on port %s for %s minutes with pid %s" % (port, minutes, str(pid))) - except OSError as e: - message = "fork #1 failed: %d (%s)" % (e.errno, e.strerror) - module.fail_json(msg=message) - - # decouple from parent environment - os.chdir("/") - os.setsid() - os.umask(int('O22', 8)) - - # do second fork - try: - pid = os.fork() - if pid > 0: - log("daemon pid %s, writing %s" % (pid, pid_file)) - pid_file = open(pid_file, "w") - pid_file.write("%s" % pid) - pid_file.close() - vvv("pid file written") - sys.exit(0) - except OSError as e: - log('fork #2 failed: %d (%s)' % (e.errno, e.strerror)) - sys.exit(1) - - dev_null = open('/dev/null', 'rw') - os.dup2(dev_null.fileno(), sys.stdin.fileno()) - os.dup2(dev_null.fileno(), sys.stdout.fileno()) - os.dup2(dev_null.fileno(), sys.stderr.fileno()) - log("daemonizing successful") - - -class LocalSocketThread(Thread): - server = None - terminated = False - - def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, Verbose=None): - kwargs = {} if kwargs is None else kwargs - - self.server = kwargs.get('server') - Thread.__init__(self, group, target, name, args, kwargs, Verbose) - - def run(self): - try: - if os.path.exists(SOCKET_FILE): - os.remove(SOCKET_FILE) - else: - dir = os.path.dirname(SOCKET_FILE) - if os.path.exists(dir): - if not os.path.isdir(dir): - log("The socket file path (%s) exists, but is not a directory. No local connections will be available" % dir) - return - else: - # make sure the directory is accessible only to this - # user, as socket files derive their permissions from - # the directory that contains them - os.chmod(dir, int('0700', 8)) - elif not os.path.exists(dir): - os.makedirs(dir, int('O700', 8)) - except OSError: - pass - self.s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.s.bind(SOCKET_FILE) - self.s.listen(5) - while not self.terminated: - try: - conn, addr = self.s.accept() - vv("received local connection") - data = "" - while "\n" not in data: - data += conn.recv(2048) - try: - try: - new_key = AesKey.Read(data.strip()) - found = False - for key in self.server.key_list: - try: - new_key.Decrypt(key.Encrypt("foo")) - found = True - break - except: - pass - if not found: - vv("adding new key to the key list") - self.server.key_list.append(new_key) - conn.sendall("OK\n") - else: - vv("key already exists in the key list, ignoring") - conn.sendall("EXISTS\n") - - # update the last event time so the server doesn't - # shutdown sooner than expected for new clients - try: - self.server.last_event_lock.acquire() - self.server.last_event = datetime.datetime.now() - finally: - self.server.last_event_lock.release() - except Exception as e: - vv("key loaded locally was invalid, ignoring (%s)" % e) - conn.sendall("BADKEY\n") - finally: - try: - conn.close() - except: - pass - except: - pass - - def terminate(self): - super(LocalSocketThread, self).terminate() - self.terminated = True - self.s.shutdown(socket.SHUT_RDWR) - self.s.close() - - -class ThreadWithReturnValue(Thread): - def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, Verbose=None): - kwargs = {} if kwargs is None else kwargs - - Thread.__init__(self, group, target, name, args, kwargs, Verbose) - self._return = None - - def run(self): - if self._Thread__target is not None: - self._return = self._Thread__target(*self._Thread__args, - **self._Thread__kwargs) - - def join(self, timeout=None): - Thread.join(self, timeout=timeout) - return self._return - - -class ThreadedTCPServer(socketserver.ThreadingTCPServer): - key_list = [] - last_event = datetime.datetime.now() - last_event_lock = Lock() - - def __init__(self, server_address, RequestHandlerClass, module, password, timeout, use_ipv6=False): - self.module = module - self.key_list.append(AesKey.Read(password)) - self.allow_reuse_address = True - self.timeout = timeout - - if use_ipv6: - self.address_family = socket.AF_INET6 - - if self.module.params.get('multi_key', False): - vv("starting thread to handle local connections for multiple keys") - self.local_thread = LocalSocketThread(kwargs=dict(server=self)) - self.local_thread.start() - - socketserver.ThreadingTCPServer.__init__(self, server_address, RequestHandlerClass) - - def shutdown(self): - self.running = False - socketserver.ThreadingTCPServer.shutdown(self) - - -class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): - # the key to use for this connection - active_key = None - - def send_data(self, data): - try: - self.server.last_event_lock.acquire() - self.server.last_event = datetime.datetime.now() - finally: - self.server.last_event_lock.release() - - packed_len = struct.pack('!Q', len(data)) - return self.request.sendall(packed_len + data) - - def recv_data(self): - header_len = 8 # size of a packed unsigned long long - data = "" - vvvv("in recv_data(), waiting for the header") - while len(data) < header_len: - try: - d = self.request.recv(header_len - len(data)) - if not d: - vvv("received nothing, bailing out") - return None - data += d - except: - # probably got a connection reset - vvvv("exception received while waiting for recv(), returning None") - return None - vvvv("in recv_data(), got the header, unpacking") - data_len = struct.unpack('!Q', data[:header_len])[0] - data = data[header_len:] - vvvv("data received so far (expecting %d): %d" % (data_len, len(data))) - while len(data) < data_len: - try: - d = self.request.recv(data_len - len(data)) - if not d: - vvv("received nothing, bailing out") - return None - data += d - vvvv("data received so far (expecting %d): %d" % (data_len, len(data))) - except: - # probably got a connection reset - vvvv("exception received while waiting for recv(), returning None") - return None - vvvv("received all of the data, returning") - - try: - self.server.last_event_lock.acquire() - self.server.last_event = datetime.datetime.now() - finally: - self.server.last_event_lock.release() - - return data - - def handle(self): - try: - while True: - vvvv("waiting for data") - data = self.recv_data() - if not data: - vvvv("received nothing back from recv_data(), breaking out") - break - vvvv("got data, decrypting") - if not self.active_key: - for key in self.server.key_list: - try: - data = key.Decrypt(data) - self.active_key = key - break - except: - pass - else: - vv("bad decrypt, exiting the connection handler") - return - else: - try: - data = self.active_key.Decrypt(data) - except: - vv("bad decrypt, exiting the connection handler") - return - - vvvv("decryption done, loading json from the data") - data = json.loads(data) - - mode = data['mode'] - response = {} - last_pong = datetime.datetime.now() - if mode == 'command': - vvvv("received a command request, running it") - twrv = ThreadWithReturnValue(target=self.command, args=(data,)) - twrv.start() - response = None - while twrv.is_alive(): - if (datetime.datetime.now() - last_pong).seconds >= 15: - last_pong = datetime.datetime.now() - vvvv("command still running, sending keepalive packet") - data2 = json.dumps(dict(pong=True)) - data2 = self.active_key.Encrypt(data2) - self.send_data(data2) - time.sleep(0.1) - response = twrv._return - vvvv("thread is done, response from join was %s" % response) - elif mode == 'put': - vvvv("received a put request, putting it") - response = self.put(data) - elif mode == 'fetch': - vvvv("received a fetch request, getting it") - response = self.fetch(data) - elif mode == 'validate_user': - vvvv("received a request to validate the user id") - response = self.validate_user(data) - - vvvv("response result is %s" % str(response)) - json_response = json.dumps(response) - vvvv("dumped json is %s" % json_response) - data2 = self.active_key.Encrypt(json_response) - vvvv("sending the response back to the controller") - self.send_data(data2) - vvvv("done sending the response") - - if mode == 'validate_user' and response.get('rc') == 1: - vvvv("detected a uid mismatch, shutting down") - self.server.shutdown() - except: - tb = traceback.format_exc() - log("encountered an unhandled exception in the handle() function") - log("error was:\n%s" % tb) - if self.active_key: - data2 = json.dumps(dict(rc=1, failed=True, msg="unhandled error in the handle() function")) - data2 = self.active_key.Encrypt(data2) - self.send_data(data2) - - def validate_user(self, data): - if 'username' not in data: - return dict(failed=True, msg='No username specified') - - vvvv("validating we're running as %s" % data['username']) - - # get the current uid - c_uid = os.getuid() - try: - # the target uid - t_uid = pwd.getpwnam(data['username']).pw_uid - except: - vvvv("could not find user %s" % data['username']) - return dict(failed=True, msg='could not find user %s' % data['username']) - - # and return rc=0 for success, rc=1 for failure - if c_uid == t_uid: - return dict(rc=0) - else: - return dict(rc=1) - - def command(self, data): - if 'cmd' not in data: - return dict(failed=True, msg='internal error: cmd is required') - - vvvv("executing: %s" % data['cmd']) - - use_unsafe_shell = False - executable = data.get('executable') - if executable: - use_unsafe_shell = True - - rc, stdout, stderr = self.server.module.run_command(data['cmd'], executable=executable, use_unsafe_shell=use_unsafe_shell, close_fds=True) - if stdout is None: - stdout = '' - if stderr is None: - stderr = '' - vvvv("got stdout: %s" % stdout) - vvvv("got stderr: %s" % stderr) - - return dict(rc=rc, stdout=stdout, stderr=stderr) - - def fetch(self, data): - if 'in_path' not in data: - return dict(failed=True, msg='internal error: in_path is required') - - try: - fd = open(data['in_path'], 'rb') - fstat = os.stat(data['in_path']) - vvv("FETCH file is %d bytes" % fstat.st_size) - while fd.tell() < fstat.st_size: - data = fd.read(CHUNK_SIZE) - last = False - if fd.tell() >= fstat.st_size: - last = True - data = dict(data=base64.b64encode(data), last=last) - data = json.dumps(data) - data = self.active_key.Encrypt(data) - - if self.send_data(data): - return dict(failed=True, stderr="failed to send data") - - response = self.recv_data() - if not response: - log("failed to get a response, aborting") - return dict(failed=True, stderr="Failed to get a response from %s" % self.host) - response = self.active_key.Decrypt(response) - response = json.loads(response) - - if response.get('failed', False): - log("got a failed response from the master") - return dict(failed=True, stderr="Master reported failure, aborting transfer") - except Exception as e: - fd.close() - tb = traceback.format_exc() - log("failed to fetch the file: %s" % tb) - return dict(failed=True, stderr="Could not fetch the file: %s" % e) - - fd.close() - return dict() - - def put(self, data): - if 'data' not in data: - return dict(failed=True, msg='internal error: data is required') - if 'out_path' not in data: - return dict(failed=True, msg='internal error: out_path is required') - - final_path = None - if 'user' in data and data.get('user') != getpass.getuser(): - vvv("the target user doesn't match this user, we'll move the file into place via sudo") - tmp_path = os.path.expanduser('~/.ansible/tmp/') - if not os.path.exists(tmp_path): - try: - os.makedirs(tmp_path, int('O700', 8)) - except: - return dict(failed=True, msg='could not create a temporary directory at %s' % tmp_path) - (fd, out_path) = tempfile.mkstemp(prefix='ansible.', dir=tmp_path) - out_fd = os.fdopen(fd, 'w', 0) - final_path = data['out_path'] - else: - out_path = data['out_path'] - out_fd = open(out_path, 'w') - - try: - bytes = 0 - while True: - out = base64.b64decode(data['data']) - bytes += len(out) - out_fd.write(out) - response = json.dumps(dict()) - response = self.active_key.Encrypt(response) - self.send_data(response) - if data['last']: - break - data = self.recv_data() - if not data: - raise "" - data = self.active_key.Decrypt(data) - data = json.loads(data) - except: - out_fd.close() - tb = traceback.format_exc() - log("failed to put the file: %s" % tb) - return dict(failed=True, stdout="Could not write the file") - - vvvv("wrote %d bytes" % bytes) - out_fd.close() - - if final_path: - vvv("moving %s to %s" % (out_path, final_path)) - self.server.module.atomic_move(out_path, final_path) - return dict() - - -def daemonize(module, password, port, timeout, minutes, use_ipv6, pid_file): - try: - daemonize_self(module, password, port, minutes, pid_file) - - def timer_handler(signum, _): - try: - try: - server.last_event_lock.acquire() - td = datetime.datetime.now() - server.last_event - # older python timedelta objects don't have total_seconds(), - # so we use the formula from the docs to calculate it - total_seconds = (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6 - if total_seconds >= minutes * 60: - log("server has been idle longer than the timeout, shutting down") - server.running = False - server.shutdown() - else: - # reschedule the check - signal.alarm(1) - except: - pass - finally: - server.last_event_lock.release() - - signal.signal(signal.SIGALRM, timer_handler) - signal.alarm(1) - - tries = 5 - while tries > 0: - try: - if use_ipv6: - address = ("::", port) - else: - address = ("0.0.0.0", port) - server = ThreadedTCPServer(address, ThreadedTCPRequestHandler, module, password, timeout, use_ipv6=use_ipv6) - server.allow_reuse_address = True - break - except Exception as e: - vv("Failed to create the TCP server (tries left = %d) (error: %s) " % (tries, e)) - tries -= 1 - time.sleep(0.2) - - if tries == 0: - vv("Maximum number of attempts to create the TCP server reached, bailing out") - raise Exception("max # of attempts to serve reached") - - # run the server in a separate thread to make signal handling work - server_thread = Thread(target=server.serve_forever, kwargs=dict(poll_interval=0.1)) - server_thread.start() - server.running = True - - v("serving!") - while server.running: - time.sleep(1) - - # wait for the thread to exit fully - server_thread.join() - - v("server thread terminated, exiting!") - sys.exit(0) - except Exception as e: - tb = traceback.format_exc() - log("exception caught, exiting accelerated mode: %s\n%s" % (e, tb)) - sys.exit(0) - - -def main(): - global DEBUG_LEVEL - module = AnsibleModule( - argument_spec=dict( - port=dict(type='int', default=5099), - ipv6=dict(type='bool', default=False), - multi_key=dict(type='bool', default=False), - timeout=dict(type='int', default=300), - password=dict(type='str', required=True, no_log=True), - minutes=dict(type='int', default=30), - debug=dict(type='int', default=0) - ), - supports_check_mode=True - ) - - syslog.openlog('ansible-%s' % module._name) - - password = base64.b64decode(module.params['password']) - port = int(module.params['port']) - timeout = int(module.params['timeout']) - minutes = int(module.params['minutes']) - debug = int(module.params['debug']) - ipv6 = module.params['ipv6'] - multi_key = module.params['multi_key'] - - if not HAS_KEYCZAR: - module.fail_json(msg="keyczar is not installed (on the remote side)") - - DEBUG_LEVEL = debug - pid_file = get_pid_location(module) - - daemon_pid = None - daemon_running = False - if os.path.exists(pid_file): - try: - daemon_pid = int(open(pid_file).read()) - try: - # sending signal 0 doesn't do anything to the - # process, other than tell the calling program - # whether other signals can be sent - os.kill(daemon_pid, 0) - except OSError as e: - message = 'the accelerate daemon appears to be running as a different user that this user cannot access pid=%s' % daemon_pid - - if e.errno == errno.EPERM: - # no permissions means the pid is probably - # running, but as a different user, so fail - module.fail_json(msg=message) - else: - daemon_running = True - except ValueError: - # invalid pid file, unlink it - otherwise we don't care - try: - os.unlink(pid_file) - except: - pass - - if daemon_running and multi_key: - # try to connect to the file socket for the daemon if it exists - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - try: - try: - s.connect(SOCKET_FILE) - s.sendall(password + '\n') - data = "" - while '\n' not in data: - data += s.recv(2048) - res = data.strip() - except: - module.fail_json(msg="failed to connect to the local socket file") - finally: - try: - s.close() - except: - pass - - if res in ("OK", "EXISTS"): - module.exit_json(msg="transferred new key to the existing daemon") - else: - module.fail_json(msg="could not transfer new key: %s" % data.strip()) - else: - # try to start up the daemon - daemonize(module, password, port, timeout, minutes, ipv6, pid_file) - - -if __name__ == '__main__': - main() diff --git a/lib/ansible/playbook/play.py b/lib/ansible/playbook/play.py index b9b5e3d3ac..2fe4a9020d 100644 --- a/lib/ansible/playbook/play.py +++ b/lib/ansible/playbook/play.py @@ -55,18 +55,13 @@ class Play(Base, Taggable, Become): # ================================================================================= _name = FieldAttribute(isa='string', default='', always_post_validate=True) + _hosts = FieldAttribute(isa='list', required=True, listof=string_types, always_post_validate=True) - # TODO: generalize connection - _accelerate = FieldAttribute(isa='bool', default=False, always_post_validate=True) - _accelerate_ipv6 = FieldAttribute(isa='bool', default=False, always_post_validate=True) - _accelerate_port = FieldAttribute(isa='int', default=5099, always_post_validate=True) - - # Connection + # Facts _fact_path = FieldAttribute(isa='string', default=None) _gather_facts = FieldAttribute(isa='bool', default=None, always_post_validate=True) _gather_subset = FieldAttribute(isa='barelist', default=None, always_post_validate=True) _gather_timeout = FieldAttribute(isa='int', default=None, always_post_validate=True) - _hosts = FieldAttribute(isa='list', required=True, listof=string_types, always_post_validate=True) # Variable Attributes _vars_files = FieldAttribute(isa='list', default=[], priority=99) diff --git a/lib/ansible/playbook/play_context.py b/lib/ansible/playbook/play_context.py index 42fa67293b..bf764e5196 100644 --- a/lib/ansible/playbook/play_context.py +++ b/lib/ansible/playbook/play_context.py @@ -55,7 +55,6 @@ __all__ = ['PlayContext'] # in variable names. MAGIC_VARIABLE_MAPPING = dict( - accelerate_port=('ansible_accelerate_port', ), # base connection=('ansible_connection', ), @@ -217,11 +216,6 @@ class PlayContext(Base): # ??? _connection_lockfd = FieldAttribute(isa='int') - # accelerate FIXME: remove as soon as deprecation period expires - _accelerate = FieldAttribute(isa='bool', default=False) - _accelerate_ipv6 = FieldAttribute(isa='bool', default=False, always_post_validate=True) - _accelerate_port = FieldAttribute(isa='int', default=C.ACCELERATE_PORT, always_post_validate=True) - # privilege escalation fields _become = FieldAttribute(isa='bool') _become_method = FieldAttribute(isa='string') @@ -281,12 +275,6 @@ class PlayContext(Base): the play class. ''' - # special handling for accelerated mode, as it is set in a separate - # play option from the connection parameter - self.accelerate = play.accelerate - self.accelerate_ipv6 = play.accelerate_ipv6 - self.accelerate_port = play.accelerate_port - if play.connection: self.connection = play.connection diff --git a/lib/ansible/plugins/action/__init__.py b/lib/ansible/plugins/action/__init__.py index 00dd23fa07..d1241a9f99 100644 --- a/lib/ansible/plugins/action/__init__.py +++ b/lib/ansible/plugins/action/__init__.py @@ -722,11 +722,6 @@ class ActionBase(with_metaclass(ABCMeta, object)): cmd = self._connection._shell.build_module_command(environment_string, shebang, cmd, arg_path=args_file_path, rm_tmp=rm_tmp).strip() - if module_name == "accelerate": - # always run the accelerate module as the user - # specified in the play, not the sudo_user - sudoable = False - # Fix permissions of the tmp path and tmp files. This should be called after all files have been transferred. if remote_files: # remove none/empty diff --git a/lib/ansible/plugins/connection/accelerate.py b/lib/ansible/plugins/connection/accelerate.py deleted file mode 100644 index 4e6e3884e7..0000000000 --- a/lib/ansible/plugins/connection/accelerate.py +++ /dev/null @@ -1,327 +0,0 @@ -# (c) 2012, Michael DeHaan -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -DOCUMENTATION = """ - author: Ansible Core Team - connection: accelerate - short_description: Temporary 0mq agent - description: - - This plugin uses one of the other ssh plugins to setup a temporary 0mq daemon on the target to execute subsequent tasks - deprecated: - why: paramiko and ssh + controlpersist perform the same or better without the problems of having an agent. - version: 2.5 - alternative: paramiko and ssh with conrol persistence. -""" - -import base64 -import json -import os -import socket -import struct -import time - -from ansible import constants as C -from ansible.errors import AnsibleError, AnsibleFileNotFound, AnsibleConnectionFailure -from ansible.module_utils._text import to_bytes -from ansible.parsing.utils.jsonify import jsonify -from ansible.plugins.connection import ConnectionBase -from ansible.utils.encrypt import key_for_hostname, keyczar_encrypt, keyczar_decrypt - - -try: - from __main__ import display -except ImportError: - from ansible.utils.display import Display - display = Display() - -# the chunk size to read and send, assuming mtu 1500 and -# leaving room for base64 (+33%) encoding and header (8 bytes) -# ((1400-8)/4)*3) = 1044 -# which leaves room for the TCP/IP header. We set this to a -# multiple of the value to speed up file reads. -CHUNK_SIZE = 1044 * 20 - - -class Connection(ConnectionBase): - ''' raw socket accelerated connection ''' - - transport = 'accelerate' - has_pipelining = False - become_methods = frozenset(C.BECOME_METHODS).difference(['runas']) - - def __init__(self, *args, **kwargs): - - super(Connection, self).__init__(*args, **kwargs) - - self.conn = None - self.key = key_for_hostname(self._play_context.remote_addr) - - def _connect(self): - ''' activates the connection object ''' - - if not self._connected: - wrong_user = False - tries = 3 - self.conn = socket.socket() - self.conn.settimeout(C.ACCELERATE_CONNECT_TIMEOUT) - display.vvvv("attempting connection to %s via the accelerated port %d" % (self._play_context.remote_addr, self._play_context.accelerate_port), - host=self._play_context.remote_addr) - while tries > 0: - try: - self.conn.connect((self._play_context.remote_addr, self._play_context.accelerate_port)) - break - except socket.error: - display.vvvv("connection to %s failed, retrying..." % self._play_context.remote_addr, host=self._play_context.remote_addr) - time.sleep(0.1) - tries -= 1 - if tries == 0: - display.vvv("Could not connect via the accelerated connection, exceeded # of tries", host=self._play_context.remote_addr) - raise AnsibleConnectionFailure("Failed to connect to %s on the accelerated port %s" % (self._play_context.remote_addr, - self._play_context.accelerate_port)) - elif wrong_user: - display.vvv("Restarting daemon with a different remote_user", host=self._play_context.remote_addr) - raise AnsibleError("The accelerated daemon was started on the remote with a different user") - - self.conn.settimeout(C.ACCELERATE_TIMEOUT) - if not self.validate_user(): - # the accelerated daemon was started with a - # different remote_user. The above command - # should have caused the accelerate daemon to - # shutdown, so we'll reconnect. - wrong_user = True - - self._connected = True - return self - - def transport_test(self, connect_timeout): - ''' Test the transport mechanism, if available ''' - host = self._play_context.remote_addr - port = int(self._play_context.accelerate_port or 5099) - display.vvv("attempting transport test to %s:%s" % (host, port)) - sock = socket.create_connection((host, port), connect_timeout) - sock.close() - - def send_data(self, data): - packed_len = struct.pack('!Q', len(data)) - return self.conn.sendall(packed_len + data) - - def recv_data(self): - header_len = 8 # size of a packed unsigned long long - data = b"" - try: - display.vvvv("in recv_data(), waiting for the header", host=self._play_context.remote_addr) - while len(data) < header_len: - d = self.conn.recv(header_len - len(data)) - if not d: - display.vvvv("received nothing, bailing out", host=self._play_context.remote_addr) - return None - data += d - display.vvvv("got the header, unpacking", host=self._play_context.remote_addr) - data_len = struct.unpack('!Q', data[:header_len])[0] - data = data[header_len:] - display.vvvv("data received so far (expecting %d): %d" % (data_len, len(data)), host=self._play_context.remote_addr) - while len(data) < data_len: - d = self.conn.recv(data_len - len(data)) - if not d: - display.vvvv("received nothing, bailing out", host=self._play_context.remote_addr) - return None - display.vvvv("received %d bytes" % (len(d)), host=self._play_context.remote_addr) - data += d - display.vvvv("received all of the data, returning", host=self._play_context.remote_addr) - return data - except socket.timeout: - raise AnsibleError("timed out while waiting to receive data") - - def validate_user(self): - ''' - Checks the remote uid of the accelerated daemon vs. the - one specified for this play and will cause the accel - daemon to exit if they don't match - ''' - - display.vvvv("sending request for validate_user", host=self._play_context.remote_addr) - data = dict( - mode='validate_user', - username=self._play_context.remote_user, - ) - data = jsonify(data) - data = keyczar_encrypt(self.key, data) - if self.send_data(data): - raise AnsibleError("Failed to send command to %s" % self._play_context.remote_addr) - - display.vvvv("waiting for validate_user response", host=self._play_context.remote_addr) - while True: - # we loop here while waiting for the response, because a - # long running command may cause us to receive keepalive packets - # ({"pong":"true"}) rather than the response we want. - response = self.recv_data() - if not response: - raise AnsibleError("Failed to get a response from %s" % self._play_context.remote_addr) - response = keyczar_decrypt(self.key, response) - response = json.loads(response) - if "pong" in response: - # it's a keepalive, go back to waiting - display.vvvv("received a keepalive packet", host=self._play_context.remote_addr) - continue - else: - display.vvvv("received the validate_user response: %s" % (response), host=self._play_context.remote_addr) - break - - if response.get('failed'): - return False - else: - return response.get('rc') == 0 - - def exec_command(self, cmd, in_data=None, sudoable=True): - - ''' run a command on the remote host ''' - - super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable) - - if in_data: - raise AnsibleError("Internal Error: this module does not support optimized module pipelining") - - display.vvv("EXEC COMMAND %s" % cmd, host=self._play_context.remote_addr) - - data = dict( - mode='command', - cmd=cmd, - executable=C.DEFAULT_EXECUTABLE, - ) - data = jsonify(data) - data = keyczar_encrypt(self.key, data) - if self.send_data(data): - raise AnsibleError("Failed to send command to %s" % self._play_context.remote_addr) - - while True: - # we loop here while waiting for the response, because a - # long running command may cause us to receive keepalive packets - # ({"pong":"true"}) rather than the response we want. - response = self.recv_data() - if not response: - raise AnsibleError("Failed to get a response from %s" % self._play_context.remote_addr) - response = keyczar_decrypt(self.key, response) - response = json.loads(response) - if "pong" in response: - # it's a keepalive, go back to waiting - display.vvvv("received a keepalive packet", host=self._play_context.remote_addr) - continue - else: - display.vvvv("received the response", host=self._play_context.remote_addr) - break - - return (response.get('rc', None), response.get('stdout', ''), response.get('stderr', '')) - - def put_file(self, in_path, out_path): - - ''' transfer a file from local to remote ''' - display.vvv("PUT %s TO %s" % (in_path, out_path), host=self._play_context.remote_addr) - - in_path = to_bytes(in_path, errors='surrogate_or_strict') - - if not os.path.exists(in_path): - raise AnsibleFileNotFound("file or module does not exist: %s" % in_path) - - fd = open(in_path, 'rb') - fstat = os.stat(in_path) - try: - display.vvv("PUT file is %d bytes" % fstat.st_size, host=self._play_context.remote_addr) - last = False - while fd.tell() <= fstat.st_size and not last: - display.vvvv("file position currently %ld, file size is %ld" % (fd.tell(), fstat.st_size), host=self._play_context.remote_addr) - data = fd.read(CHUNK_SIZE) - if fd.tell() >= fstat.st_size: - last = True - data = dict(mode='put', data=base64.b64encode(data), out_path=out_path, last=last) - if self._play_context.become: - data['user'] = self._play_context.become_user - data = jsonify(data) - data = keyczar_encrypt(self.key, data) - - if self.send_data(data): - raise AnsibleError("failed to send the file to %s" % self._play_context.remote_addr) - - response = self.recv_data() - if not response: - raise AnsibleError("Failed to get a response from %s" % self._play_context.remote_addr) - response = keyczar_decrypt(self.key, response) - response = json.loads(response) - - if response.get('failed', False): - raise AnsibleError("failed to put the file in the requested location") - finally: - fd.close() - display.vvvv("waiting for final response after PUT", host=self._play_context.remote_addr) - response = self.recv_data() - if not response: - raise AnsibleError("Failed to get a response from %s" % self._play_context.remote_addr) - response = keyczar_decrypt(self.key, response) - response = json.loads(response) - - if response.get('failed', False): - raise AnsibleError("failed to put the file in the requested location") - - def fetch_file(self, in_path, out_path): - ''' save a remote file to the specified path ''' - display.vvv("FETCH %s TO %s" % (in_path, out_path), host=self._play_context.remote_addr) - - data = dict(mode='fetch', in_path=in_path) - data = jsonify(data) - data = keyczar_encrypt(self.key, data) - if self.send_data(data): - raise AnsibleError("failed to initiate the file fetch with %s" % self._play_context.remote_addr) - - fh = open(to_bytes(out_path, errors='surrogate_or_strict'), "w") - try: - bytes = 0 - while True: - response = self.recv_data() - if not response: - raise AnsibleError("Failed to get a response from %s" % self._play_context.remote_addr) - response = keyczar_decrypt(self.key, response) - response = json.loads(response) - if response.get('failed', False): - raise AnsibleError("Error during file fetch, aborting") - out = base64.b64decode(response['data']) - fh.write(out) - bytes += len(out) - # send an empty response back to signify we - # received the last chunk without errors - data = jsonify(dict()) - data = keyczar_encrypt(self.key, data) - if self.send_data(data): - raise AnsibleError("failed to send ack during file fetch") - if response.get('last', False): - break - finally: - # we don't currently care about this final response, - # we just receive it and drop it. It may be used at some - # point in the future or we may just have the put/fetch - # operations not send back a final response at all - response = self.recv_data() - display.vvv("FETCH wrote %d bytes to %s" % (bytes, out_path), host=self._play_context.remote_addr) - fh.close() - - def close(self): - ''' terminate the connection ''' - # Be a good citizen - try: - self.conn.close() - except: - pass diff --git a/lib/ansible/utils/encrypt.py b/lib/ansible/utils/encrypt.py index de156d00e5..b9576dcb32 100644 --- a/lib/ansible/utils/encrypt.py +++ b/lib/ansible/utils/encrypt.py @@ -1,28 +1,10 @@ # (c) 2012-2014, Michael DeHaan -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . +# (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import (absolute_import, division, print_function) __metaclass__ = type import multiprocessing -import os -import stat -import tempfile -import time -import warnings import random from ansible import constants as C @@ -44,37 +26,12 @@ except ImportError: from ansible.utils.display import Display display = Display() -KEYCZAR_AVAILABLE = False -try: - try: - # some versions of pycrypto may not have this? - from Crypto.pct_warnings import PowmInsecureWarning - except ImportError: - PowmInsecureWarning = RuntimeWarning - - with warnings.catch_warnings(record=True) as warning_handler: - warnings.simplefilter("error", PowmInsecureWarning) - try: - import keyczar.errors as key_errors - from keyczar.keys import AesKey - except PowmInsecureWarning: - display.system_warning( - "The version of gmp you have installed has a known issue regarding " - "timing vulnerabilities when used with pycrypto. " - "If possible, you should update it (i.e. yum update gmp)." - ) - warnings.resetwarnings() - warnings.simplefilter("ignore") - import keyczar.errors as key_errors - from keyczar.keys import AesKey - KEYCZAR_AVAILABLE = True -except ImportError: - pass - __all__ = ['do_encrypt'] _LOCK = multiprocessing.Lock() +DEFAULT_PASSWORD_LENGTH = 20 + def do_encrypt(result, encrypt, salt_size=None, salt=None): if PASSLIB_AVAILABLE: @@ -103,73 +60,6 @@ def do_encrypt(result, encrypt, salt_size=None, salt=None): return to_text(result, errors='strict') -def key_for_hostname(hostname): - # fireball mode is an implementation of ansible firing up zeromq via SSH - # to use no persistent daemons or key management - - if not KEYCZAR_AVAILABLE: - raise AnsibleError("python-keyczar must be installed on the control machine to use accelerated modes") - - key_path = os.path.expanduser(C.ACCELERATE_KEYS_DIR) - if not os.path.exists(key_path): - # avoid race with multiple forks trying to create paths on host - # but limit when locking is needed to creation only - with(_LOCK): - if not os.path.exists(key_path): - # use a temp directory and rename to ensure the directory - # searched for only appears after permissions applied. - tmp_dir = tempfile.mkdtemp(dir=os.path.dirname(key_path)) - os.chmod(tmp_dir, int(C.ACCELERATE_KEYS_DIR_PERMS, 8)) - os.rename(tmp_dir, key_path) - elif not os.path.isdir(key_path): - raise AnsibleError('ACCELERATE_KEYS_DIR is not a directory.') - - if stat.S_IMODE(os.stat(key_path).st_mode) != int(C.ACCELERATE_KEYS_DIR_PERMS, 8): - raise AnsibleError('Incorrect permissions on the private key directory. Use `chmod 0%o %s` to correct this issue, and make sure any of the keys files ' - 'contained within that directory are set to 0%o' % (int(C.ACCELERATE_KEYS_DIR_PERMS, 8), C.ACCELERATE_KEYS_DIR, - int(C.ACCELERATE_KEYS_FILE_PERMS, 8))) - - key_path = os.path.join(key_path, hostname) - - # use new AES keys every 2 hours, which means fireball must not allow running for longer either - if not os.path.exists(key_path) or (time.time() - os.path.getmtime(key_path) > 60 * 60 * 2): - # avoid race with multiple forks trying to create key - # but limit when locking is needed to creation only - with(_LOCK): - if not os.path.exists(key_path) or (time.time() - os.path.getmtime(key_path) > 60 * 60 * 2): - key = AesKey.Generate() - # use temp file to ensure file only appears once it has - # desired contents and permissions - with tempfile.NamedTemporaryFile(mode='w', dir=os.path.dirname(key_path), delete=False) as fh: - tmp_key_path = fh.name - fh.write(str(key)) - os.chmod(tmp_key_path, int(C.ACCELERATE_KEYS_FILE_PERMS, 8)) - os.rename(tmp_key_path, key_path) - return key - - if stat.S_IMODE(os.stat(key_path).st_mode) != int(C.ACCELERATE_KEYS_FILE_PERMS, 8): - raise AnsibleError('Incorrect permissions on the key file for this host. Use `chmod 0%o %s` to ' - 'correct this issue.' % (int(C.ACCELERATE_KEYS_FILE_PERMS, 8), key_path)) - fh = open(key_path) - key = AesKey.Read(fh.read()) - fh.close() - return key - - -def keyczar_encrypt(key, msg): - return key.Encrypt(msg.encode('utf-8')) - - -def keyczar_decrypt(key, msg): - try: - return key.Decrypt(msg) - except key_errors.InvalidSignatureError: - raise AnsibleError("decryption failed") - - -DEFAULT_PASSWORD_LENGTH = 20 - - def random_password(length=DEFAULT_PASSWORD_LENGTH, chars=C.DEFAULT_PASSWORD_CHARS): '''Return a random password string of length containing only chars