From 5887e96b270b2f0d19d3e34a09e16e21824c4887 Mon Sep 17 00:00:00 2001 From: Abhijit Menon-Sen Date: Sat, 1 Aug 2015 09:26:07 +0530 Subject: [PATCH 1/3] Introduce a connection locking infrastructure The lock file is (a temporary file) opened in the parent process, whose open fd is inherited by the workers after fork, and passed down through the PlayContext. Connection grows lock/unlock methods which can be used by individual connection plugins. --- lib/ansible/executor/task_queue_manager.py | 5 +++++ lib/ansible/playbook/play_context.py | 6 ++++++ lib/ansible/plugins/connections/__init__.py | 10 ++++++++++ 3 files changed, 21 insertions(+) diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index 59f48142b1..1189e35f91 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -23,6 +23,7 @@ import multiprocessing import os import socket import sys +import tempfile from ansible import constants as C from ansible.errors import AnsibleError @@ -78,6 +79,10 @@ class TaskQueueManager: self._failed_hosts = dict() self._unreachable_hosts = dict() + # A temporary file (opened pre-fork) used by connection plugins for + # inter-process locking. + self._options.connection_lockfile = tempfile.TemporaryFile() + self._final_q = multiprocessing.Queue() # create the pool of worker threads, based on the number of forks specified diff --git a/lib/ansible/playbook/play_context.py b/lib/ansible/playbook/play_context.py index 355efbaf26..d4a184fa61 100644 --- a/lib/ansible/playbook/play_context.py +++ b/lib/ansible/playbook/play_context.py @@ -161,6 +161,7 @@ class PlayContext(Base): _private_key_file = FieldAttribute(isa='string', default=C.DEFAULT_PRIVATE_KEY_FILE) _timeout = FieldAttribute(isa='int', default=C.DEFAULT_TIMEOUT) _shell = FieldAttribute(isa='string') + _connection_lockfd= FieldAttribute(isa='int', default=None) # privilege escalation fields _become = FieldAttribute(isa='bool') @@ -244,6 +245,11 @@ class PlayContext(Base): if options.connection: self.connection = options.connection + # The lock file is opened in the parent process, and the workers will + # inherit the open file, so we just need to help them find it. + if options.connection_lockfile: + self.connection_lockfd = options.connection_lockfile.fileno() + self.remote_user = options.remote_user self.private_key_file = options.private_key_file diff --git a/lib/ansible/plugins/connections/__init__.py b/lib/ansible/plugins/connections/__init__.py index 1ad2876381..5dfcf4c344 100644 --- a/lib/ansible/plugins/connections/__init__.py +++ b/lib/ansible/plugins/connections/__init__.py @@ -155,3 +155,13 @@ class ConnectionBase(with_metaclass(ABCMeta, object)): if incorrect_password in output: raise AnsibleError('Incorrect %s password' % self._play_context.become_method) + def lock_connection(self): + f = self._play_context.connection_lockfd + self._display.vvvv('CONNECTION: pid %d waiting for lock on %d' % (os.getpid(), f)) + fcntl.lockf(f, fcntl.LOCK_EX) + self._display.vvvv('CONNECTION: pid %d acquired lock on %d' % (os.getpid(), f)) + + def unlock_connection(self): + f = self._play_context.connection_lockfd + fcntl.lockf(f, fcntl.LOCK_UN) + self._display.vvvv('CONNECTION: pid %d released lock on %d' % (os.getpid(), f)) From 9378c8e2dae0e50a186181c50b3e462fcfa0fdaa Mon Sep 17 00:00:00 2001 From: Abhijit Menon-Sen Date: Wed, 2 Sep 2015 22:52:35 +0530 Subject: [PATCH 2/3] Make the paramiko plugin use locking --- .../plugins/connections/paramiko_ssh.py | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/lib/ansible/plugins/connections/paramiko_ssh.py b/lib/ansible/plugins/connections/paramiko_ssh.py index df97a6e3a5..214e6b5a03 100644 --- a/lib/ansible/plugins/connections/paramiko_ssh.py +++ b/lib/ansible/plugins/connections/paramiko_ssh.py @@ -71,16 +71,15 @@ class MyAddPolicy(object): local L{HostKeys} object, and saving it. This is used by L{SSHClient}. """ - def __init__(self, new_stdin): + def __init__(self, new_stdin, connection): self._new_stdin = new_stdin + self.connection = connection def missing_host_key(self, client, hostname, key): if C.HOST_KEY_CHECKING: - # FIXME: need to fix lock file stuff - #fcntl.lockf(self.runner.process_lockfile, fcntl.LOCK_EX) - #fcntl.lockf(self.runner.output_lockfile, fcntl.LOCK_EX) + self.connection.lock_connection() old_stdin = sys.stdin sys.stdin = self._new_stdin @@ -94,17 +93,11 @@ class MyAddPolicy(object): inp = raw_input(AUTHENTICITY_MSG % (hostname, ktype, fingerprint)) sys.stdin = old_stdin + self.connection.unlock_connection() + if inp not in ['yes','y','']: - # FIXME: lock file stuff - #fcntl.flock(self.runner.output_lockfile, fcntl.LOCK_UN) - #fcntl.flock(self.runner.process_lockfile, fcntl.LOCK_UN) raise AnsibleError("host connection rejected by user") - # FIXME: lock file stuff - #fcntl.lockf(self.runner.output_lockfile, fcntl.LOCK_UN) - #fcntl.lockf(self.runner.process_lockfile, fcntl.LOCK_UN) - - key._added_by_ansible_this_time = True # existing implementation below: @@ -159,7 +152,7 @@ class Connection(ConnectionBase): pass # file was not found, but not required to function ssh.load_system_host_keys() - ssh.set_missing_host_key_policy(MyAddPolicy(self._new_stdin)) + ssh.set_missing_host_key_policy(MyAddPolicy(self._new_stdin, self)) allow_agent = True @@ -365,6 +358,9 @@ class Connection(ConnectionBase): if C.HOST_KEY_CHECKING and C.PARAMIKO_RECORD_HOST_KEYS and self._any_keys_added(): # add any new SSH host keys -- warning -- this could be slow + # (This doesn't acquire the connection lock because it needs + # to exclude only other known_hosts writers, not connections + # that are starting up.) lockfile = self.keyfile.replace("known_hosts",".known_hosts.lock") dirname = os.path.dirname(self.keyfile) makedirs_safe(dirname) From b9afbf0ee468c0deb87a4fe316b92546db3fa554 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Thu, 3 Sep 2015 00:18:52 -0400 Subject: [PATCH 3/3] Reorganizing the way the connection lockfile is created --- lib/ansible/executor/task_queue_manager.py | 5 ----- lib/ansible/playbook/play_context.py | 16 ++++++++++------ lib/ansible/plugins/connections/__init__.py | 14 +++++++------- 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index 1189e35f91..59f48142b1 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -23,7 +23,6 @@ import multiprocessing import os import socket import sys -import tempfile from ansible import constants as C from ansible.errors import AnsibleError @@ -79,10 +78,6 @@ class TaskQueueManager: self._failed_hosts = dict() self._unreachable_hosts = dict() - # A temporary file (opened pre-fork) used by connection plugins for - # inter-process locking. - self._options.connection_lockfile = tempfile.TemporaryFile() - self._final_q = multiprocessing.Queue() # create the pool of worker threads, based on the number of forks specified diff --git a/lib/ansible/playbook/play_context.py b/lib/ansible/playbook/play_context.py index d4a184fa61..5b56d2515b 100644 --- a/lib/ansible/playbook/play_context.py +++ b/lib/ansible/playbook/play_context.py @@ -24,6 +24,7 @@ __metaclass__ = type import pipes import random import re +import tempfile from ansible import constants as C from ansible.errors import AnsibleError @@ -161,7 +162,6 @@ class PlayContext(Base): _private_key_file = FieldAttribute(isa='string', default=C.DEFAULT_PRIVATE_KEY_FILE) _timeout = FieldAttribute(isa='int', default=C.DEFAULT_TIMEOUT) _shell = FieldAttribute(isa='string') - _connection_lockfd= FieldAttribute(isa='int', default=None) # privilege escalation fields _become = FieldAttribute(isa='bool') @@ -200,6 +200,10 @@ class PlayContext(Base): self.password = passwords.get('conn_pass','') self.become_pass = passwords.get('become_pass','') + # A temporary file (opened pre-fork) used by connection + # plugins for inter-process locking. + self.connection_lockf = tempfile.TemporaryFile() + # set options before play to allow play to override them if options: self.set_options(options) @@ -245,11 +249,6 @@ class PlayContext(Base): if options.connection: self.connection = options.connection - # The lock file is opened in the parent process, and the workers will - # inherit the open file, so we just need to help them find it. - if options.connection_lockfile: - self.connection_lockfd = options.connection_lockfile.fileno() - self.remote_user = options.remote_user self.private_key_file = options.private_key_file @@ -328,6 +327,11 @@ class PlayContext(Base): return new_info + def copy(self, exclude_block=False): + new_me = super(PlayContext, self).copy() + new_me.connection_lockf = self.connection_lockf + return new_me + def make_become_cmd(self, cmd, executable=None): """ helper function to create privilege escalation commands """ diff --git a/lib/ansible/plugins/connections/__init__.py b/lib/ansible/plugins/connections/__init__.py index 5dfcf4c344..0e4b3466e5 100644 --- a/lib/ansible/plugins/connections/__init__.py +++ b/lib/ansible/plugins/connections/__init__.py @@ -156,12 +156,12 @@ class ConnectionBase(with_metaclass(ABCMeta, object)): raise AnsibleError('Incorrect %s password' % self._play_context.become_method) def lock_connection(self): - f = self._play_context.connection_lockfd - self._display.vvvv('CONNECTION: pid %d waiting for lock on %d' % (os.getpid(), f)) - fcntl.lockf(f, fcntl.LOCK_EX) - self._display.vvvv('CONNECTION: pid %d acquired lock on %d' % (os.getpid(), f)) + f = self._play_context.connection_lockf + self._display.vvvv('CONNECTION: pid %d waiting for lock on %d' % (os.getpid(), f.fileno())) + fcntl.lockf(f.fileno(), fcntl.LOCK_EX) + self._display.vvvv('CONNECTION: pid %d acquired lock on %d' % (os.getpid(), f.fileno())) def unlock_connection(self): - f = self._play_context.connection_lockfd - fcntl.lockf(f, fcntl.LOCK_UN) - self._display.vvvv('CONNECTION: pid %d released lock on %d' % (os.getpid(), f)) + f = self._play_context.connection_lockf + fcntl.lockf(f.fileno(), fcntl.LOCK_UN) + self._display.vvvv('CONNECTION: pid %d released lock on %d' % (os.getpid(), f.fileno()))