mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
Adding chunked file transfers to fireball2
This commit is contained in:
parent
959138d00d
commit
af1dd7075f
2 changed files with 128 additions and 40 deletions
|
@ -26,6 +26,13 @@ from ansible import utils
|
|||
from ansible import errors
|
||||
from ansible import constants
|
||||
|
||||
# the chunk size to read and send, assuming mtu 1500 and
|
||||
# leaving room for base64 (+33%) encoding and header (8 bytes)
|
||||
# ((1400-8)/4)*3) = 1044
|
||||
# which leaves room for the TCP/IP header. We set this to a
|
||||
# multiple of the value to speed up file reads.
|
||||
CHUNK_SIZE=1044*20
|
||||
|
||||
class Connection(object):
|
||||
''' raw socket accelerated connection '''
|
||||
|
||||
|
@ -148,27 +155,42 @@ class Connection(object):
|
|||
if not os.path.exists(in_path):
|
||||
raise errors.AnsibleFileNotFound("file or module does not exist: %s" % in_path)
|
||||
|
||||
data = file(in_path).read()
|
||||
data = base64.b64encode(data)
|
||||
data = dict(mode='put', data=data, out_path=out_path)
|
||||
fd = file(in_path, 'rb')
|
||||
fstat = os.stat(in_path)
|
||||
try:
|
||||
vvv("PUT file is %d bytes" % fstat.st_size)
|
||||
while fd.tell() < fstat.st_size:
|
||||
data = fd.read(CHUNK_SIZE)
|
||||
last = False
|
||||
if fd.tell() >= fstat.st_size:
|
||||
last = True
|
||||
data = dict(mode='put', data=base64.b64encode(data), out_path=out_path, last=last)
|
||||
if self.runner.sudo:
|
||||
data['user'] = self.runner.sudo_user
|
||||
data = utils.jsonify(data)
|
||||
data = utils.encrypt(self.key, data)
|
||||
|
||||
if self.runner.sudo:
|
||||
data['user'] = self.runner.sudo_user
|
||||
if self.send_data(data):
|
||||
raise errors.AnsibleError("failed to send the file to %s" % self.host)
|
||||
|
||||
# TODO: support chunked file transfer
|
||||
data = utils.jsonify(data)
|
||||
data = utils.encrypt(self.key, data)
|
||||
if self.send_data(data):
|
||||
raise errors.AnsibleError("failed to send the file to %s" % self.host)
|
||||
response = self.recv_data()
|
||||
if not response:
|
||||
raise errors.AnsibleError("Failed to get a response from %s" % self.host)
|
||||
response = utils.decrypt(self.key, response)
|
||||
response = utils.parse_json(response)
|
||||
|
||||
response = self.recv_data()
|
||||
if not response:
|
||||
raise errors.AnsibleError("Failed to get a response from %s" % self.host)
|
||||
response = utils.decrypt(self.key, response)
|
||||
response = utils.parse_json(response)
|
||||
if response.get('failed',False):
|
||||
raise errors.AnsibleError("failed to put the file in the requested location")
|
||||
finally:
|
||||
fd.close()
|
||||
response = self.recv_data()
|
||||
if not response:
|
||||
raise errors.AnsibleError("Failed to get a response from %s" % self.host)
|
||||
response = utils.decrypt(self.key, response)
|
||||
response = utils.parse_json(response)
|
||||
|
||||
if response.get('failed',False):
|
||||
raise errors.AnsibleError("failed to put the file in the requested location")
|
||||
if response.get('failed',False):
|
||||
raise errors.AnsibleError("failed to put the file in the requested location")
|
||||
|
||||
def fetch_file(self, in_path, out_path):
|
||||
''' save a remote file to the specified path '''
|
||||
|
@ -180,17 +202,36 @@ class Connection(object):
|
|||
if self.send_data(data):
|
||||
raise errors.AnsibleError("failed to initiate the file fetch with %s" % self.host)
|
||||
|
||||
response = self.recv_data()
|
||||
if not response:
|
||||
raise errors.AnsibleError("Failed to get a response from %s" % self.host)
|
||||
response = utils.decrypt(self.key, response)
|
||||
response = utils.parse_json(response)
|
||||
response = response['data']
|
||||
response = base64.b64decode(response)
|
||||
|
||||
fh = open(out_path, "w")
|
||||
fh.write(response)
|
||||
fh.close()
|
||||
try:
|
||||
bytes = 0
|
||||
while True:
|
||||
response = self.recv_data()
|
||||
if not response:
|
||||
raise errors.AnsibleError("Failed to get a response from %s" % self.host)
|
||||
response = utils.decrypt(self.key, response)
|
||||
response = utils.parse_json(response)
|
||||
if response.get('failed', False):
|
||||
raise errors.AnsibleError("Error during file fetch, aborting")
|
||||
out = base64.b64decode(response['data'])
|
||||
fh.write(out)
|
||||
bytes += len(out)
|
||||
# send an empty response back to signify we
|
||||
# received the last chunk without errors
|
||||
data = utils.jsonify(dict())
|
||||
data = utils.encrypt(self.key, data)
|
||||
if self.send_data(data):
|
||||
raise errors.AnsibleError("failed to send ack during file fetch")
|
||||
if response.get('last', False):
|
||||
break
|
||||
finally:
|
||||
# we don't currently care about this final response,
|
||||
# we just receive it and drop it. It may be used at some
|
||||
# point in the future or we may just have the put/fetch
|
||||
# operations not send back a final response at all
|
||||
response = self.recv_data()
|
||||
vvv("FETCH wrote %d bytes to %s" % (bytes, out_path))
|
||||
fh.close()
|
||||
|
||||
def close(self):
|
||||
''' terminate the connection '''
|
||||
|
|
|
@ -80,6 +80,12 @@ import SocketServer
|
|||
syslog.openlog('ansible-%s' % os.path.basename(__file__))
|
||||
PIDFILE = os.path.expanduser("~/.fireball2.pid")
|
||||
|
||||
# the chunk size to read and send, assuming mtu 1500 and
|
||||
# leaving room for base64 (+33%) encoding and header (100 bytes)
|
||||
# 4 * (975/3) + 100 = 1400
|
||||
# which leaves room for the TCP/IP header
|
||||
CHUNK_SIZE=10240
|
||||
|
||||
def log(msg):
|
||||
syslog.syslog(syslog.LOG_NOTICE, msg)
|
||||
|
||||
|
@ -227,13 +233,40 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
|
|||
if 'in_path' not in data:
|
||||
return dict(failed=True, msg='internal error: in_path is required')
|
||||
|
||||
# FIXME: should probably support chunked file transfer for binary files
|
||||
# at some point. For now, just base64 encodes the file
|
||||
# so don't use it to move ISOs, use rsync.
|
||||
try:
|
||||
fd = file(data['in_path'], 'rb')
|
||||
fstat = os.stat(data['in_path'])
|
||||
log("FETCH file is %d bytes" % fstat.st_size)
|
||||
while fd.tell() < fstat.st_size:
|
||||
data = fd.read(CHUNK_SIZE)
|
||||
last = False
|
||||
if fd.tell() >= fstat.st_size:
|
||||
last = True
|
||||
data = dict(data=base64.b64encode(data), last=last)
|
||||
data = json.dumps(data)
|
||||
data = self.server.key.Encrypt(data)
|
||||
|
||||
fh = open(data['in_path'])
|
||||
data = base64.b64encode(fh.read())
|
||||
return dict(data=data)
|
||||
if self.send_data(data):
|
||||
return dict(failed=True, stderr="failed to send data")
|
||||
|
||||
response = self.recv_data()
|
||||
if not response:
|
||||
log("failed to get a response, aborting")
|
||||
return dict(failed=True, stderr="Failed to get a response from %s" % self.host)
|
||||
response = self.server.key.Decrypt(response)
|
||||
response = json.loads(response)
|
||||
|
||||
if response.get('failed',False):
|
||||
log("got a failed response from the master")
|
||||
return dict(failed=True, stderr="Master reported failure, aborting transfer")
|
||||
except Exception, e:
|
||||
tb = traceback.format_exc()
|
||||
log("failed to fetch the file: %s" % tb)
|
||||
return dict(failed=True, stderr="Could not fetch the file: %s" % str(e))
|
||||
finally:
|
||||
fd.close()
|
||||
|
||||
return dict()
|
||||
|
||||
def put(self, data):
|
||||
if 'data' not in data:
|
||||
|
@ -251,19 +284,33 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
|
|||
out_path = data['out_path']
|
||||
out_fd = open(out_path, 'w')
|
||||
|
||||
# FIXME: should probably support chunked file transfer for binary files
|
||||
# at some point. For now, just base64 encodes the file
|
||||
# so don't use it to move ISOs, use rsync.
|
||||
|
||||
try:
|
||||
out_fd.write(base64.b64decode(data['data']))
|
||||
out_fd.close()
|
||||
bytes=0
|
||||
while True:
|
||||
out = base64.b64decode(data['data'])
|
||||
bytes += len(out)
|
||||
out_fd.write(out)
|
||||
response = json.dumps(dict())
|
||||
response = self.server.key.Encrypt(response)
|
||||
self.send_data(response)
|
||||
if data['last']:
|
||||
break
|
||||
data = self.recv_data()
|
||||
if not data:
|
||||
raise ""
|
||||
data = self.server.key.Decrypt(data)
|
||||
data = json.loads(data)
|
||||
except:
|
||||
tb = traceback.format_exc()
|
||||
log("failed to put the file: %s" % tb)
|
||||
return dict(failed=True, stdout="Could not write the file")
|
||||
finally:
|
||||
log("wrote %d bytes" % bytes)
|
||||
out_fd.close()
|
||||
|
||||
if final_path:
|
||||
log("moving %s to %s" % (out_path, final_path))
|
||||
args = ['sudo','mv',out_path,final_path]
|
||||
args = ['sudo','cp',out_path,final_path]
|
||||
rc, stdout, stderr = self.server.module.run_command(args, close_fds=True)
|
||||
if rc != 0:
|
||||
return dict(failed=True, stdout="failed to copy the file into position with sudo")
|
||||
|
|
Loading…
Reference in a new issue