1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

Initial work for the AES cipher class

This is based somewhat loosely on how Keyczar does things. Their
implementation does things in a much more generic way to allow for more
variance in how the cipher is created, but since we're only using one
key type most of our values are hard-coded. They also add a header to
their messages, which I am not doing (don't see the need for it
currently).
This commit is contained in:
James Cammarata 2013-08-07 09:12:25 -04:00
parent ffb4d480cf
commit fd2aabaa27

View file

@ -40,6 +40,10 @@ import warnings
import traceback
import getpass
from Crypto.Cipher import
from Crypto import Random
from Crypto.Random.random import StrongRandom
VERBOSITY=0
MAX_FILE_SIZE_FOR_DIFF=1*1024*1024
@ -61,50 +65,128 @@ try:
except:
pass
KEYCZAR_AVAILABLE=False
try:
import keyczar.errors as key_errors
from keyczar.keys import AesKey
KEYCZAR_AVAILABLE=True
except ImportError:
pass
###############################################################
# abtractions around keyczar
# Abstractions around PyCrypto
###############################################################
def key_for_hostname(hostname):
# fireball mode is an implementation of ansible firing up zeromq via SSH
# to use no persistent daemons or key management
class AES256Cipher(object):
"""
Class abstraction of an AES 256 cipher. This class
also keeps track of the time since the key was last
generated, so you know when to rekey. Rekeying would
be done as follows:
if not KEYCZAR_AVAILABLE:
raise errors.AnsibleError("python-keyczar must be installed to use fireball mode")
k = AES256Cipher.gen_key()
<exchange new key with client securely>
AES26Cipher.set_key(k)
key_path = os.path.expanduser("~/.fireball.keys")
if not os.path.exists(key_path):
os.makedirs(key_path)
key_path = os.path.expanduser("~/.fireball.keys/%s" % hostname)
From this point on the new key would be used until
the lifetime is exceeded.
"""
def __init__(self, lifetime=60*30, mode=AES.MODE_CFB):
self.lifetime = lifetime
self.mode = mode
self.set_key(self.gen_key())
# use new AES keys every 2 hours, which means fireball must not allow running for longer either
if not os.path.exists(key_path) or (time.time() - os.path.getmtime(key_path) > 60*60*2):
key = AesKey.Generate()
fh = open(key_path, "w")
fh.write(str(key))
fh.close()
return key
else:
fh = open(key_path)
key = AesKey.Read(fh.read())
fh.close()
return key
def gen_key(self):
"""
Generates a 256-bit (32 byte) key to be used for the
AES block encryption.
"""
return b"".join(StrongRandom().sample(string.letters+string.digits+string.punctuation,32))
def encrypt(key, msg):
return key.Encrypt(msg)
def set_key(self,key):
"""
Sets the internal key to the one provided and resets the
internal time to now. This key should ONLY be set to one
generated by gen_key()
"""
self.init_time = time.time()
self.key = key
def decrypt(key, msg):
try:
return key.Decrypt(msg)
except key_errors.InvalidSignatureError:
raise errors.AnsibleError("decryption failed")
def should_rekey(self):
"""
Returns true if the lifetime of the current key has
exceeded the set lifetime.
"""
if ((time.time() - self.init_time) > self.lifetime):
return True
else:
return False
def _pad(self, msg):
"""
Adds padding to the message so that it is a full
AES block size. Used during encryption of the message.
"""
pad = AES.block_size - len(msg) % AES.block_size
return msg + pad * chr(pad)
def _unpad(self, msg):
"""
Strips out the padding that _pad added. Used during
the decryption of the message.
"""
pad = ord(msg[-1])
return msg[:-pad]
def gen_sig(self, msg):
"""
Generates an HMAC-SHA1 signature for the message
"""
return hmac.new(self.key, msg, hashlib.sha1).digest()
def validate_sig(self, msg, sig):
"""
Verifies the generated signature of the message matches
the signature provided.
"""
new_sig = self.gen_sig(msg)
return (new_sig == sig)
def encrypt(self, msg):
"""
Encrypt the message using AES. The signature
is appended to the end of the message and is
used to verify the integrity of the IV and data.
Returns a base64-encoded version of the following:
rval[0:16] = initialization vector
rval[16:-20] = cipher text
rval[-20:] = signature
"""
msg = self._pad(msg)
iv = Random.new().read(AES.block_size)
cipher = AES.new(self.key, self.mode, iv)
data = iv + cipher.encrypt(msg)
sig = self.gen_sig(data)
return (data + sig).encode('base64')
def decrypt(self, msg):
"""
Decrypt the message using AES. The signature is
used to verify the IV and data before decoding to
ensure the integrity of the message. This is an
HMAC-SHA1 hash, so it is always 20 characters
The incoming message format (after base64 decoding)
is as follows:
msg[0:16] = initialization vector
msg[16:-20] = cipher text
msg[-20:] = signature (HMAC-SHA1)
Returns the plain-text of the cipher.
"""
msg = msg.decode('base64')
data = msg[0:-20] # iv + cipher text
msig = msg[-20:] # hmac-sha1 hash
if not self.validate_sig(data,msig):
raise Exception("Failed to validate the message signature")
iv = msg[:AES.block_size]
cipher = AES.new(self.key, self.mode, iv)
return self._unpad(cipher.decrypt(msg)[AES.block_size:])
###############################################################
# UTILITY FUNCTIONS FOR COMMAND LINE TOOLS