mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
Merge pull request #12106 from amenonsen/vault-cleanups
Vault cleanups, pass #1
This commit is contained in:
commit
111c0cc204
3 changed files with 50 additions and 77 deletions
|
@ -30,7 +30,6 @@ class VaultCLI(CLI):
|
||||||
""" Vault command line class """
|
""" Vault command line class """
|
||||||
|
|
||||||
VALID_ACTIONS = ("create", "decrypt", "edit", "encrypt", "rekey", "view")
|
VALID_ACTIONS = ("create", "decrypt", "edit", "encrypt", "rekey", "view")
|
||||||
CIPHER = 'AES256'
|
|
||||||
|
|
||||||
def __init__(self, args, display=None):
|
def __init__(self, args, display=None):
|
||||||
|
|
||||||
|
@ -84,6 +83,8 @@ class VaultCLI(CLI):
|
||||||
if not self.vault_pass:
|
if not self.vault_pass:
|
||||||
raise AnsibleOptionsError("A password is required to use Ansible's Vault")
|
raise AnsibleOptionsError("A password is required to use Ansible's Vault")
|
||||||
|
|
||||||
|
self.editor = VaultEditor(self.vault_pass)
|
||||||
|
|
||||||
self.execute()
|
self.execute()
|
||||||
|
|
||||||
def execute_create(self):
|
def execute_create(self):
|
||||||
|
@ -91,39 +92,30 @@ class VaultCLI(CLI):
|
||||||
if len(self.args) > 1:
|
if len(self.args) > 1:
|
||||||
raise AnsibleOptionsError("ansible-vault create can take only one filename argument")
|
raise AnsibleOptionsError("ansible-vault create can take only one filename argument")
|
||||||
|
|
||||||
cipher = getattr(self.options, 'cipher', self.CIPHER)
|
self.editor.create_file(self.args[0])
|
||||||
this_editor = VaultEditor(cipher, self.vault_pass, self.args[0])
|
|
||||||
this_editor.create_file()
|
|
||||||
|
|
||||||
def execute_decrypt(self):
|
def execute_decrypt(self):
|
||||||
|
|
||||||
cipher = getattr(self.options, 'cipher', self.CIPHER)
|
|
||||||
for f in self.args:
|
for f in self.args:
|
||||||
this_editor = VaultEditor(cipher, self.vault_pass, f)
|
self.editor.decrypt_file(f)
|
||||||
this_editor.decrypt_file()
|
|
||||||
|
|
||||||
self.display.display("Decryption successful")
|
self.display.display("Decryption successful", stderr=True)
|
||||||
|
|
||||||
def execute_edit(self):
|
def execute_edit(self):
|
||||||
|
|
||||||
for f in self.args:
|
for f in self.args:
|
||||||
this_editor = VaultEditor(None, self.vault_pass, f)
|
self.editor.edit_file(f)
|
||||||
this_editor.edit_file()
|
|
||||||
|
|
||||||
def execute_view(self):
|
def execute_view(self):
|
||||||
|
|
||||||
for f in self.args:
|
for f in self.args:
|
||||||
this_editor = VaultEditor(None, self.vault_pass, f)
|
self.editor.view_file(f)
|
||||||
this_editor.view_file()
|
|
||||||
|
|
||||||
def execute_encrypt(self):
|
def execute_encrypt(self):
|
||||||
|
|
||||||
cipher = getattr(self.options, 'cipher', self.CIPHER)
|
|
||||||
for f in self.args:
|
for f in self.args:
|
||||||
this_editor = VaultEditor(cipher, self.vault_pass, f)
|
self.editor.encrypt_file(f)
|
||||||
this_editor.encrypt_file()
|
|
||||||
|
|
||||||
self.display.display("Encryption successful")
|
self.display.display("Encryption successful", stderr=True)
|
||||||
|
|
||||||
def execute_rekey(self):
|
def execute_rekey(self):
|
||||||
for f in self.args:
|
for f in self.args:
|
||||||
|
@ -136,7 +128,6 @@ class VaultCLI(CLI):
|
||||||
__, new_password = self.ask_vault_passwords(ask_vault_pass=False, ask_new_vault_pass=True, confirm_new=True)
|
__, new_password = self.ask_vault_passwords(ask_vault_pass=False, ask_new_vault_pass=True, confirm_new=True)
|
||||||
|
|
||||||
for f in self.args:
|
for f in self.args:
|
||||||
this_editor = VaultEditor(None, self.vault_pass, f)
|
self.editor.rekey_file(f, new_password)
|
||||||
this_editor.rekey_file(new_password)
|
|
||||||
|
|
||||||
self.display.display("Rekey successful")
|
self.display.display("Rekey successful", stderr=True)
|
||||||
|
|
|
@ -12,11 +12,6 @@
|
||||||
#
|
#
|
||||||
# You should have received a copy of the GNU General Public License
|
# You should have received a copy of the GNU General Public License
|
||||||
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
||||||
#
|
|
||||||
# ansible-pull is a script that runs ansible in local mode
|
|
||||||
# after checking out a playbooks directory from source repo. There is an
|
|
||||||
# example playbook to bootstrap this script in the examples/ dir which
|
|
||||||
# installs ansible and sets it up to run on cron.
|
|
||||||
|
|
||||||
# Make coding more python3-ish
|
# Make coding more python3-ish
|
||||||
from __future__ import (absolute_import, division, print_function)
|
from __future__ import (absolute_import, division, print_function)
|
||||||
|
@ -137,11 +132,11 @@ class VaultLib:
|
||||||
if self.is_encrypted(b_data):
|
if self.is_encrypted(b_data):
|
||||||
raise AnsibleError("data is already encrypted")
|
raise AnsibleError("data is already encrypted")
|
||||||
|
|
||||||
if not self.cipher_name:
|
if not self.cipher_name or self.cipher_name not in CIPHER_WRITE_WHITELIST:
|
||||||
self.cipher_name = u"AES256"
|
self.cipher_name = u"AES256"
|
||||||
|
|
||||||
cipher_class_name = u'Vault{0}'.format(self.cipher_name)
|
cipher_class_name = u'Vault{0}'.format(self.cipher_name)
|
||||||
if cipher_class_name in globals() and self.cipher_name in CIPHER_WHITELIST:
|
if cipher_class_name in globals():
|
||||||
Cipher = globals()[cipher_class_name]
|
Cipher = globals()[cipher_class_name]
|
||||||
this_cipher = Cipher()
|
this_cipher = Cipher()
|
||||||
else:
|
else:
|
||||||
|
@ -230,18 +225,11 @@ class VaultLib:
|
||||||
|
|
||||||
|
|
||||||
class VaultEditor:
|
class VaultEditor:
|
||||||
# uses helper methods for write_file(self, filename, data)
|
|
||||||
# to write a file so that code isn't duplicated for simple
|
|
||||||
# file I/O, ditto read_file(self, filename) and launch_editor(self, filename)
|
|
||||||
# ... "Don't Repeat Yourself", etc.
|
|
||||||
|
|
||||||
def __init__(self, cipher_name, password, filename):
|
def __init__(self, password):
|
||||||
# instantiates a member variable for VaultLib
|
|
||||||
self.cipher_name = cipher_name
|
|
||||||
self.password = password
|
self.password = password
|
||||||
self.filename = filename
|
|
||||||
|
|
||||||
def _edit_file_helper(self, existing_data=None, cipher=None, force_save=False):
|
def _edit_file_helper(self, filename, existing_data=None, force_save=False):
|
||||||
# make sure the umask is set to a sane value
|
# make sure the umask is set to a sane value
|
||||||
old_umask = os.umask(0o077)
|
old_umask = os.umask(0o077)
|
||||||
|
|
||||||
|
@ -262,70 +250,68 @@ class VaultEditor:
|
||||||
|
|
||||||
# create new vault
|
# create new vault
|
||||||
this_vault = VaultLib(self.password)
|
this_vault = VaultLib(self.password)
|
||||||
if cipher:
|
|
||||||
this_vault.cipher_name = cipher
|
|
||||||
|
|
||||||
# encrypt new data and write out to tmp
|
# encrypt new data and write out to tmp
|
||||||
enc_data = this_vault.encrypt(tmpdata)
|
enc_data = this_vault.encrypt(tmpdata)
|
||||||
self.write_data(enc_data, tmp_path)
|
self.write_data(enc_data, tmp_path)
|
||||||
|
|
||||||
# shuffle tmp file into place
|
# shuffle tmp file into place
|
||||||
self.shuffle_files(tmp_path, self.filename)
|
self.shuffle_files(tmp_path, filename)
|
||||||
|
|
||||||
# and restore umask
|
# and restore umask
|
||||||
os.umask(old_umask)
|
os.umask(old_umask)
|
||||||
|
|
||||||
def create_file(self):
|
def create_file(self, filename):
|
||||||
""" create a new encrypted file """
|
""" create a new encrypted file """
|
||||||
|
|
||||||
check_prereqs()
|
check_prereqs()
|
||||||
|
|
||||||
if os.path.isfile(self.filename):
|
if os.path.isfile(filename):
|
||||||
raise AnsibleError("%s exists, please use 'edit' instead" % self.filename)
|
raise AnsibleError("%s exists, please use 'edit' instead" % filename)
|
||||||
|
|
||||||
# Let the user specify contents and save file
|
# Let the user specify contents and save file
|
||||||
self._edit_file_helper(cipher=self.cipher_name)
|
self._edit_file_helper(filename)
|
||||||
|
|
||||||
def decrypt_file(self):
|
def decrypt_file(self, filename):
|
||||||
|
|
||||||
check_prereqs()
|
check_prereqs()
|
||||||
|
|
||||||
if not os.path.isfile(self.filename):
|
if not os.path.isfile(filename):
|
||||||
raise AnsibleError("%s does not exist" % self.filename)
|
raise AnsibleError("%s does not exist" % filename)
|
||||||
|
|
||||||
tmpdata = self.read_data(self.filename)
|
tmpdata = self.read_data(filename)
|
||||||
this_vault = VaultLib(self.password)
|
this_vault = VaultLib(self.password)
|
||||||
if this_vault.is_encrypted(tmpdata):
|
if this_vault.is_encrypted(tmpdata):
|
||||||
dec_data = this_vault.decrypt(tmpdata)
|
dec_data = this_vault.decrypt(tmpdata)
|
||||||
if dec_data is None:
|
if dec_data is None:
|
||||||
raise AnsibleError("Decryption failed")
|
raise AnsibleError("Decryption failed")
|
||||||
else:
|
else:
|
||||||
self.write_data(dec_data, self.filename)
|
self.write_data(dec_data, filename)
|
||||||
else:
|
else:
|
||||||
raise AnsibleError("%s is not encrypted" % self.filename)
|
raise AnsibleError("%s is not encrypted" % filename)
|
||||||
|
|
||||||
def edit_file(self):
|
def edit_file(self, filename):
|
||||||
|
|
||||||
check_prereqs()
|
check_prereqs()
|
||||||
|
|
||||||
# decrypt to tmpfile
|
# decrypt to tmpfile
|
||||||
tmpdata = self.read_data(self.filename)
|
tmpdata = self.read_data(filename)
|
||||||
this_vault = VaultLib(self.password)
|
this_vault = VaultLib(self.password)
|
||||||
dec_data = this_vault.decrypt(tmpdata)
|
dec_data = this_vault.decrypt(tmpdata)
|
||||||
|
|
||||||
# let the user edit the data and save
|
# let the user edit the data and save
|
||||||
if this_vault.cipher_name not in CIPHER_WRITE_WHITELIST:
|
if this_vault.cipher_name not in CIPHER_WRITE_WHITELIST:
|
||||||
# we want to get rid of files encrypted with the AES cipher
|
# we want to get rid of files encrypted with the AES cipher
|
||||||
self._edit_file_helper(existing_data=dec_data, cipher=None, force_save=True)
|
self._edit_file_helper(filename, existing_data=dec_data, force_save=True)
|
||||||
else:
|
else:
|
||||||
self._edit_file_helper(existing_data=dec_data, cipher=this_vault.cipher_name, force_save=False)
|
self._edit_file_helper(filename, existing_data=dec_data, force_save=False)
|
||||||
|
|
||||||
def view_file(self):
|
def view_file(self, filename):
|
||||||
|
|
||||||
check_prereqs()
|
check_prereqs()
|
||||||
|
|
||||||
# decrypt to tmpfile
|
# decrypt to tmpfile
|
||||||
tmpdata = self.read_data(self.filename)
|
tmpdata = self.read_data(filename)
|
||||||
this_vault = VaultLib(self.password)
|
this_vault = VaultLib(self.password)
|
||||||
dec_data = this_vault.decrypt(tmpdata)
|
dec_data = this_vault.decrypt(tmpdata)
|
||||||
_, tmp_path = tempfile.mkstemp()
|
_, tmp_path = tempfile.mkstemp()
|
||||||
|
@ -335,40 +321,36 @@ class VaultEditor:
|
||||||
call(self._pager_shell_command(tmp_path))
|
call(self._pager_shell_command(tmp_path))
|
||||||
os.remove(tmp_path)
|
os.remove(tmp_path)
|
||||||
|
|
||||||
def encrypt_file(self):
|
def encrypt_file(self, filename):
|
||||||
|
|
||||||
check_prereqs()
|
check_prereqs()
|
||||||
|
|
||||||
if not os.path.isfile(self.filename):
|
if not os.path.isfile(filename):
|
||||||
raise AnsibleError("%s does not exist" % self.filename)
|
raise AnsibleError("%s does not exist" % filename)
|
||||||
|
|
||||||
tmpdata = self.read_data(self.filename)
|
tmpdata = self.read_data(filename)
|
||||||
this_vault = VaultLib(self.password)
|
this_vault = VaultLib(self.password)
|
||||||
this_vault.cipher_name = self.cipher_name
|
|
||||||
if not this_vault.is_encrypted(tmpdata):
|
if not this_vault.is_encrypted(tmpdata):
|
||||||
enc_data = this_vault.encrypt(tmpdata)
|
enc_data = this_vault.encrypt(tmpdata)
|
||||||
self.write_data(enc_data, self.filename)
|
self.write_data(enc_data, filename)
|
||||||
else:
|
else:
|
||||||
raise AnsibleError("%s is already encrypted" % self.filename)
|
raise AnsibleError("%s is already encrypted" % filename)
|
||||||
|
|
||||||
def rekey_file(self, new_password):
|
def rekey_file(self, filename, new_password):
|
||||||
|
|
||||||
check_prereqs()
|
check_prereqs()
|
||||||
|
|
||||||
# decrypt
|
# decrypt
|
||||||
tmpdata = self.read_data(self.filename)
|
tmpdata = self.read_data(filename)
|
||||||
this_vault = VaultLib(self.password)
|
this_vault = VaultLib(self.password)
|
||||||
dec_data = this_vault.decrypt(tmpdata)
|
dec_data = this_vault.decrypt(tmpdata)
|
||||||
|
|
||||||
# create new vault
|
# create new vault
|
||||||
new_vault = VaultLib(new_password)
|
new_vault = VaultLib(new_password)
|
||||||
|
|
||||||
# we want to force cipher to the default
|
|
||||||
#new_vault.cipher_name = this_vault.cipher_name
|
|
||||||
|
|
||||||
# re-encrypt data and re-write file
|
# re-encrypt data and re-write file
|
||||||
enc_data = new_vault.encrypt(dec_data)
|
enc_data = new_vault.encrypt(dec_data)
|
||||||
self.write_data(enc_data, self.filename)
|
self.write_data(enc_data, filename)
|
||||||
|
|
||||||
def read_data(self, filename):
|
def read_data(self, filename):
|
||||||
f = open(filename, "rb")
|
f = open(filename, "rb")
|
||||||
|
|
|
@ -81,7 +81,7 @@ class TestVaultEditor(unittest.TestCase):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def test_methods_exist(self):
|
def test_methods_exist(self):
|
||||||
v = VaultEditor(None, None, None)
|
v = VaultEditor(None)
|
||||||
slots = ['create_file',
|
slots = ['create_file',
|
||||||
'decrypt_file',
|
'decrypt_file',
|
||||||
'edit_file',
|
'edit_file',
|
||||||
|
@ -103,8 +103,8 @@ class TestVaultEditor(unittest.TestCase):
|
||||||
tmp_file = tempfile.NamedTemporaryFile()
|
tmp_file = tempfile.NamedTemporaryFile()
|
||||||
os.unlink(tmp_file.name)
|
os.unlink(tmp_file.name)
|
||||||
|
|
||||||
ve = VaultEditor(None, "ansible", tmp_file.name)
|
ve = VaultEditor("ansible")
|
||||||
ve.create_file()
|
ve.create_file(tmp_file.name)
|
||||||
|
|
||||||
self.assertTrue(os.path.exists(tmp_file.name))
|
self.assertTrue(os.path.exists(tmp_file.name))
|
||||||
|
|
||||||
|
@ -120,12 +120,12 @@ class TestVaultEditor(unittest.TestCase):
|
||||||
with v10_file as f:
|
with v10_file as f:
|
||||||
f.write(to_bytes(v10_data))
|
f.write(to_bytes(v10_data))
|
||||||
|
|
||||||
ve = VaultEditor(None, "ansible", v10_file.name)
|
ve = VaultEditor("ansible")
|
||||||
|
|
||||||
# make sure the password functions for the cipher
|
# make sure the password functions for the cipher
|
||||||
error_hit = False
|
error_hit = False
|
||||||
try:
|
try:
|
||||||
ve.decrypt_file()
|
ve.decrypt_file(v10_file.name)
|
||||||
except errors.AnsibleError as e:
|
except errors.AnsibleError as e:
|
||||||
error_hit = True
|
error_hit = True
|
||||||
|
|
||||||
|
@ -148,12 +148,12 @@ class TestVaultEditor(unittest.TestCase):
|
||||||
with v11_file as f:
|
with v11_file as f:
|
||||||
f.write(to_bytes(v11_data))
|
f.write(to_bytes(v11_data))
|
||||||
|
|
||||||
ve = VaultEditor(None, "ansible", v11_file.name)
|
ve = VaultEditor("ansible")
|
||||||
|
|
||||||
# make sure the password functions for the cipher
|
# make sure the password functions for the cipher
|
||||||
error_hit = False
|
error_hit = False
|
||||||
try:
|
try:
|
||||||
ve.decrypt_file()
|
ve.decrypt_file(v11_file.name)
|
||||||
except errors.AnsibleError as e:
|
except errors.AnsibleError as e:
|
||||||
error_hit = True
|
error_hit = True
|
||||||
|
|
||||||
|
@ -180,12 +180,12 @@ class TestVaultEditor(unittest.TestCase):
|
||||||
with v10_file as f:
|
with v10_file as f:
|
||||||
f.write(to_bytes(v10_data))
|
f.write(to_bytes(v10_data))
|
||||||
|
|
||||||
ve = VaultEditor(None, "ansible", v10_file.name)
|
ve = VaultEditor("ansible")
|
||||||
|
|
||||||
# make sure the password functions for the cipher
|
# make sure the password functions for the cipher
|
||||||
error_hit = False
|
error_hit = False
|
||||||
try:
|
try:
|
||||||
ve.rekey_file('ansible2')
|
ve.rekey_file(v10_file.name, 'ansible2')
|
||||||
except errors.AnsibleError as e:
|
except errors.AnsibleError as e:
|
||||||
error_hit = True
|
error_hit = True
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue