1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

Refactor all Proxmox modules to use shared module_utils. (#4029) (#4164)

* Refactor Proxmox modules to use `module_utils`.

* Fix tests.

* Rename `node_check`.

* Add `ignore_missing` to `get_vm`.

* Refactor `proxmox` module.

* Add changelog entry.

* Add `choose_first_if_multiple` parameter for deprecation.

(cherry picked from commit a61bdbadd5)

Co-authored-by: Markus Reiter <me@reitermark.us>
This commit is contained in:
patchback[bot] 2022-02-07 17:48:11 +01:00 committed by GitHub
parent fab30c5e55
commit a678029bd2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 849 additions and 1072 deletions

View file

@ -0,0 +1,3 @@
---
minor_changes:
- proxmox modules - move common code into ``module_utils`` (https://github.com/ansible-collections/community.general/pull/4029).

View file

@ -21,6 +21,8 @@ except ImportError:
from ansible.module_utils.basic import env_fallback, missing_required_lib from ansible.module_utils.basic import env_fallback, missing_required_lib
from ansible.module_utils.common.text.converters import to_native
from ansible_collections.community.general.plugins.module_utils.version import LooseVersion
def proxmox_auth_argument_spec(): def proxmox_auth_argument_spec():
@ -98,3 +100,46 @@ class ProxmoxAnsible(object):
return ProxmoxAPI(api_host, verify_ssl=validate_certs, **auth_args) return ProxmoxAPI(api_host, verify_ssl=validate_certs, **auth_args)
except Exception as e: except Exception as e:
self.module.fail_json(msg='%s' % e, exception=traceback.format_exc()) self.module.fail_json(msg='%s' % e, exception=traceback.format_exc())
def version(self):
apireturn = self.proxmox_api.version.get()
return LooseVersion(apireturn['version'])
def get_node(self, node):
nodes = [n for n in self.proxmox_api.nodes.get() if n['node'] == node]
return nodes[0] if nodes else None
def get_nextvmid(self):
vmid = self.proxmox_api.cluster.nextid.get()
return vmid
def get_vmid(self, name, ignore_missing=False, choose_first_if_multiple=False):
vms = [vm['vmid'] for vm in self.proxmox_api.cluster.resources.get(type='vm') if vm.get('name') == name]
if not vms:
if ignore_missing:
return None
self.module.fail_json(msg='No VM with name %s found' % name)
elif len(vms) > 1:
if choose_first_if_multiple:
self.module.deprecate(
'Multiple VMs with name %s found, choosing the first one. ' % name +
'This will be an error in the future. To ensure the correct VM is used, ' +
'also pass the vmid parameter.',
version='5.0.0', collection_name='community.general')
else:
self.module.fail_json(msg='Multiple VMs with name %s found, provide vmid instead' % name)
return vms[0]
def get_vm(self, vmid, ignore_missing=False):
vms = [vm for vm in self.proxmox_api.cluster.resources.get(type='vm') if vm['vmid'] == int(vmid)]
if vms:
return vms[0]
else:
if ignore_missing:
return None
self.module.fail_json(msg='VM with vmid %s does not exist in cluster' % vmid)

View file

@ -1,5 +1,6 @@
#!/usr/bin/python #!/usr/bin/python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
#
# Copyright: Ansible Project # Copyright: Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
@ -392,62 +393,27 @@ import traceback
from ansible_collections.community.general.plugins.module_utils.version import LooseVersion from ansible_collections.community.general.plugins.module_utils.version import LooseVersion
try:
from proxmoxer import ProxmoxAPI
HAS_PROXMOXER = True
except ImportError:
HAS_PROXMOXER = False
from ansible.module_utils.basic import AnsibleModule, env_fallback from ansible.module_utils.basic import AnsibleModule, env_fallback
from ansible.module_utils.common.text.converters import to_native from ansible.module_utils.common.text.converters import to_native
from ansible_collections.community.general.plugins.module_utils.proxmox import ( from ansible_collections.community.general.plugins.module_utils.proxmox import (
ansible_to_proxmox_bool ansible_to_proxmox_bool, proxmox_auth_argument_spec, ProxmoxAnsible)
)
VZ_TYPE = None VZ_TYPE = None
def get_nextvmid(module, proxmox): class ProxmoxLxcAnsible(ProxmoxAnsible):
try: def content_check(self, node, ostemplate, template_store):
vmid = proxmox.cluster.nextid.get() return [True for cnt in self.proxmox_api.nodes(node).storage(template_store).content.get() if cnt['volid'] == ostemplate]
return vmid
except Exception as e:
module.fail_json(msg="Unable to get next vmid. Failed with exception: %s" % to_native(e),
exception=traceback.format_exc())
def is_template_container(self, node, vmid):
def get_vmid(proxmox, hostname):
return [vm['vmid'] for vm in proxmox.cluster.resources.get(type='vm') if 'name' in vm and vm['name'] == hostname]
def get_instance(proxmox, vmid):
return [vm for vm in proxmox.cluster.resources.get(type='vm') if vm['vmid'] == int(vmid)]
def content_check(proxmox, node, ostemplate, template_store):
return [True for cnt in proxmox.nodes(node).storage(template_store).content.get() if cnt['volid'] == ostemplate]
def is_template_container(proxmox, node, vmid):
"""Check if the specified container is a template.""" """Check if the specified container is a template."""
proxmox_node = proxmox.nodes(node) proxmox_node = self.proxmox_api.nodes(node)
config = getattr(proxmox_node, VZ_TYPE)(vmid).config.get() config = getattr(proxmox_node, VZ_TYPE)(vmid).config.get()
return config['template'] return config['template']
def create_instance(self, vmid, node, disk, storage, cpus, memory, swap, timeout, clone, **kwargs):
def node_check(proxmox, node): proxmox_node = self.proxmox_api.nodes(node)
return [True for nd in proxmox.nodes.get() if nd['node'] == node]
def proxmox_version(proxmox):
apireturn = proxmox.version.get()
return LooseVersion(apireturn['version'])
def create_instance(module, proxmox, vmid, node, disk, storage, cpus, memory, swap, timeout, clone, **kwargs):
proxmox_node = proxmox.nodes(node)
# Remove all empty kwarg entries # Remove all empty kwarg entries
kwargs = dict((k, v) for k, v in kwargs.items() if v is not None) kwargs = dict((k, v) for k, v in kwargs.items() if v is not None)
@ -462,7 +428,7 @@ def create_instance(module, proxmox, vmid, node, disk, storage, cpus, memory, sw
kwargs.update(kwargs['mounts']) kwargs.update(kwargs['mounts'])
del kwargs['mounts'] del kwargs['mounts']
if 'pubkey' in kwargs: if 'pubkey' in kwargs:
if proxmox_version(proxmox) >= LooseVersion('4.2'): if self.version() >= LooseVersion('4.2'):
kwargs['ssh-public-keys'] = kwargs['pubkey'] kwargs['ssh-public-keys'] = kwargs['pubkey']
del kwargs['pubkey'] del kwargs['pubkey']
else: else:
@ -471,31 +437,31 @@ def create_instance(module, proxmox, vmid, node, disk, storage, cpus, memory, sw
if clone is not None: if clone is not None:
if VZ_TYPE != 'lxc': if VZ_TYPE != 'lxc':
module.fail_json(changed=False, msg="Clone operator is only supported for LXC enabled proxmox clusters.") self.module.fail_json(changed=False, msg="Clone operator is only supported for LXC enabled proxmox clusters.")
clone_is_template = is_template_container(proxmox, node, clone) clone_is_template = self.is_template_container(node, clone)
# By default, create a full copy only when the cloned container is not a template. # By default, create a full copy only when the cloned container is not a template.
create_full_copy = not clone_is_template create_full_copy = not clone_is_template
# Only accept parameters that are compatible with the clone endpoint. # Only accept parameters that are compatible with the clone endpoint.
valid_clone_parameters = ['hostname', 'pool', 'description'] valid_clone_parameters = ['hostname', 'pool', 'description']
if module.params['storage'] is not None and clone_is_template: if self.module.params['storage'] is not None and clone_is_template:
# Cloning a template, so create a full copy instead of a linked copy # Cloning a template, so create a full copy instead of a linked copy
create_full_copy = True create_full_copy = True
elif module.params['storage'] is None and not clone_is_template: elif self.module.params['storage'] is None and not clone_is_template:
# Not cloning a template, but also no defined storage. This isn't possible. # Not cloning a template, but also no defined storage. This isn't possible.
module.fail_json(changed=False, msg="Cloned container is not a template, storage needs to be specified.") self.module.fail_json(changed=False, msg="Cloned container is not a template, storage needs to be specified.")
if module.params['clone_type'] == 'linked': if self.module.params['clone_type'] == 'linked':
if not clone_is_template: if not clone_is_template:
module.fail_json(changed=False, msg="'linked' clone type is specified, but cloned container is not a template container.") self.module.fail_json(changed=False, msg="'linked' clone type is specified, but cloned container is not a template container.")
# Don't need to do more, by default create_full_copy is set to false already # Don't need to do more, by default create_full_copy is set to false already
elif module.params['clone_type'] == 'opportunistic': elif self.module.params['clone_type'] == 'opportunistic':
if not clone_is_template: if not clone_is_template:
# Cloned container is not a template, so we need our 'storage' parameter # Cloned container is not a template, so we need our 'storage' parameter
valid_clone_parameters.append('storage') valid_clone_parameters.append('storage')
elif module.params['clone_type'] == 'full': elif self.module.params['clone_type'] == 'full':
create_full_copy = True create_full_copy = True
valid_clone_parameters.append('storage') valid_clone_parameters.append('storage')
@ -506,8 +472,8 @@ def create_instance(module, proxmox, vmid, node, disk, storage, cpus, memory, sw
else: else:
clone_parameters['full'] = '0' clone_parameters['full'] = '0'
for param in valid_clone_parameters: for param in valid_clone_parameters:
if module.params[param] is not None: if self.module.params[param] is not None:
clone_parameters[param] = module.params[param] clone_parameters[param] = self.module.params[param]
taskid = getattr(proxmox_node, VZ_TYPE)(clone).clone.post(newid=vmid, **clone_parameters) taskid = getattr(proxmox_node, VZ_TYPE)(clone).clone.post(newid=vmid, **clone_parameters)
else: else:
@ -519,71 +485,62 @@ def create_instance(module, proxmox, vmid, node, disk, storage, cpus, memory, sw
return True return True
timeout -= 1 timeout -= 1
if timeout == 0: if timeout == 0:
module.fail_json(msg='Reached timeout while waiting for creating VM. Last line in task before timeout: %s' % self.module.fail_json(msg='Reached timeout while waiting for creating VM. Last line in task before timeout: %s' %
proxmox_node.tasks(taskid).log.get()[:1]) proxmox_node.tasks(taskid).log.get()[:1])
time.sleep(1) time.sleep(1)
return False return False
def start_instance(self, vm, vmid, timeout):
def start_instance(module, proxmox, vm, vmid, timeout): taskid = getattr(self.proxmox_api.nodes(vm['node']), VZ_TYPE)(vmid).status.start.post()
taskid = getattr(proxmox.nodes(vm[0]['node']), VZ_TYPE)(vmid).status.start.post()
while timeout: while timeout:
if (proxmox.nodes(vm[0]['node']).tasks(taskid).status.get()['status'] == 'stopped' and if (self.proxmox_api.nodes(vm['node']).tasks(taskid).status.get()['status'] == 'stopped' and
proxmox.nodes(vm[0]['node']).tasks(taskid).status.get()['exitstatus'] == 'OK'): self.proxmox_api.nodes(vm['node']).tasks(taskid).status.get()['exitstatus'] == 'OK'):
return True return True
timeout -= 1 timeout -= 1
if timeout == 0: if timeout == 0:
module.fail_json(msg='Reached timeout while waiting for starting VM. Last line in task before timeout: %s' % self.module.fail_json(msg='Reached timeout while waiting for starting VM. Last line in task before timeout: %s' %
proxmox.nodes(vm[0]['node']).tasks(taskid).log.get()[:1]) self.proxmox_api.nodes(vm['node']).tasks(taskid).log.get()[:1])
time.sleep(1) time.sleep(1)
return False return False
def stop_instance(self, vm, vmid, timeout, force):
def stop_instance(module, proxmox, vm, vmid, timeout, force):
if force: if force:
taskid = getattr(proxmox.nodes(vm[0]['node']), VZ_TYPE)(vmid).status.shutdown.post(forceStop=1) taskid = getattr(self.proxmox_api.nodes(vm['node']), VZ_TYPE)(vmid).status.shutdown.post(forceStop=1)
else: else:
taskid = getattr(proxmox.nodes(vm[0]['node']), VZ_TYPE)(vmid).status.shutdown.post() taskid = getattr(self.proxmox_api.nodes(vm['node']), VZ_TYPE)(vmid).status.shutdown.post()
while timeout: while timeout:
if (proxmox.nodes(vm[0]['node']).tasks(taskid).status.get()['status'] == 'stopped' and if (self.proxmox_api.nodes(vm['node']).tasks(taskid).status.get()['status'] == 'stopped' and
proxmox.nodes(vm[0]['node']).tasks(taskid).status.get()['exitstatus'] == 'OK'): self.proxmox_api.nodes(vm['node']).tasks(taskid).status.get()['exitstatus'] == 'OK'):
return True return True
timeout -= 1 timeout -= 1
if timeout == 0: if timeout == 0:
module.fail_json(msg='Reached timeout while waiting for stopping VM. Last line in task before timeout: %s' % self.module.fail_json(msg='Reached timeout while waiting for stopping VM. Last line in task before timeout: %s' %
proxmox.nodes(vm[0]['node']).tasks(taskid).log.get()[:1]) self.proxmox_api.nodes(vm['node']).tasks(taskid).log.get()[:1])
time.sleep(1) time.sleep(1)
return False return False
def umount_instance(self, vm, vmid, timeout):
def umount_instance(module, proxmox, vm, vmid, timeout): taskid = getattr(self.proxmox_api.nodes(vm['node']), VZ_TYPE)(vmid).status.umount.post()
taskid = getattr(proxmox.nodes(vm[0]['node']), VZ_TYPE)(vmid).status.umount.post()
while timeout: while timeout:
if (proxmox.nodes(vm[0]['node']).tasks(taskid).status.get()['status'] == 'stopped' and if (self.proxmox_api.nodes(vm['node']).tasks(taskid).status.get()['status'] == 'stopped' and
proxmox.nodes(vm[0]['node']).tasks(taskid).status.get()['exitstatus'] == 'OK'): self.proxmox_api.nodes(vm['node']).tasks(taskid).status.get()['exitstatus'] == 'OK'):
return True return True
timeout -= 1 timeout -= 1
if timeout == 0: if timeout == 0:
module.fail_json(msg='Reached timeout while waiting for unmounting VM. Last line in task before timeout: %s' % self.module.fail_json(msg='Reached timeout while waiting for unmounting VM. Last line in task before timeout: %s' %
proxmox.nodes(vm[0]['node']).tasks(taskid).log.get()[:1]) self.proxmox_api.nodes(vm['node']).tasks(taskid).log.get()[:1])
time.sleep(1) time.sleep(1)
return False return False
def main(): def main():
module = AnsibleModule( module_args = proxmox_auth_argument_spec()
argument_spec=dict( proxmox_args = dict(
api_host=dict(required=True),
api_password=dict(no_log=True, fallback=(env_fallback, ['PROXMOX_PASSWORD'])),
api_token_id=dict(no_log=True),
api_token_secret=dict(no_log=True),
api_user=dict(required=True),
vmid=dict(type='int', required=False), vmid=dict(type='int', required=False),
validate_certs=dict(type='bool', default=False),
node=dict(), node=dict(),
pool=dict(), pool=dict(),
password=dict(no_log=True), password=dict(no_log=True),
@ -614,7 +571,11 @@ def main():
proxmox_default_behavior=dict(type='str', default='no_defaults', choices=['compatibility', 'no_defaults']), proxmox_default_behavior=dict(type='str', default='no_defaults', choices=['compatibility', 'no_defaults']),
clone=dict(type='int'), clone=dict(type='int'),
clone_type=dict(default='opportunistic', choices=['full', 'linked', 'opportunistic']), clone_type=dict(default='opportunistic', choices=['full', 'linked', 'opportunistic']),
), )
module_args.update(proxmox_args)
module = AnsibleModule(
argument_spec=module_args,
required_if=[ required_if=[
('state', 'present', ['node', 'hostname']), ('state', 'present', ['node', 'hostname']),
('state', 'present', ('clone', 'ostemplate'), True), # Require one of clone and ostemplate. Together with mutually_exclusive this ensures that we ('state', 'present', ('clone', 'ostemplate'), True), # Require one of clone and ostemplate. Together with mutually_exclusive this ensures that we
@ -627,17 +588,13 @@ def main():
mutually_exclusive=[('clone', 'ostemplate')], # Creating a new container is done either by cloning an existing one, or based on a template. mutually_exclusive=[('clone', 'ostemplate')], # Creating a new container is done either by cloning an existing one, or based on a template.
) )
if not HAS_PROXMOXER: proxmox = ProxmoxLxcAnsible(module)
module.fail_json(msg='proxmoxer required for this module')
global VZ_TYPE
VZ_TYPE = 'openvz' if proxmox.version() < LooseVersion('4.0') else 'lxc'
state = module.params['state'] state = module.params['state']
api_host = module.params['api_host']
api_password = module.params['api_password']
api_token_id = module.params['api_token_id']
api_token_secret = module.params['api_token_secret']
api_user = module.params['api_user']
vmid = module.params['vmid'] vmid = module.params['vmid']
validate_certs = module.params['validate_certs']
node = module.params['node'] node = module.params['node']
disk = module.params['disk'] disk = module.params['disk']
cpus = module.params['cpus'] cpus = module.params['cpus']
@ -664,50 +621,36 @@ def main():
if module.params[param] is None: if module.params[param] is None:
module.params[param] = value module.params[param] = value
auth_args = {'user': api_user}
if not api_token_id:
auth_args['password'] = api_password
else:
auth_args['token_name'] = api_token_id
auth_args['token_value'] = api_token_secret
try:
proxmox = ProxmoxAPI(api_host, verify_ssl=validate_certs, **auth_args)
global VZ_TYPE
VZ_TYPE = 'openvz' if proxmox_version(proxmox) < LooseVersion('4.0') else 'lxc'
except Exception as e:
module.fail_json(msg='authorization on proxmox cluster failed with exception: %s' % e)
# If vmid not set get the Next VM id from ProxmoxAPI # If vmid not set get the Next VM id from ProxmoxAPI
# If hostname is set get the VM id from ProxmoxAPI # If hostname is set get the VM id from ProxmoxAPI
if not vmid and state == 'present': if not vmid and state == 'present':
vmid = get_nextvmid(module, proxmox) vmid = proxmox.get_nextvmid()
elif not vmid and hostname: elif not vmid and hostname:
hosts = get_vmid(proxmox, hostname) vmid = proxmox.get_vmid(hostname, choose_first_if_multiple=True)
if len(hosts) == 0:
module.fail_json(msg="Vmid could not be fetched => Hostname doesn't exist (action: %s)" % state)
vmid = hosts[0]
elif not vmid: elif not vmid:
module.exit_json(changed=False, msg="Vmid could not be fetched for the following action: %s" % state) module.exit_json(changed=False, msg="Vmid could not be fetched for the following action: %s" % state)
# Create a new container # Create a new container
if state == 'present' and clone is None: if state == 'present' and clone is None:
try: try:
if get_instance(proxmox, vmid) and not module.params['force']: if proxmox.get_vm(vmid, ignore_missing=True) and not module.params['force']:
module.exit_json(changed=False, msg="VM with vmid = %s is already exists" % vmid) module.exit_json(changed=False, msg="VM with vmid = %s is already exists" % vmid)
# If no vmid was passed, there cannot be another VM named 'hostname' # If no vmid was passed, there cannot be another VM named 'hostname'
if not module.params['vmid'] and get_vmid(proxmox, hostname) and not module.params['force']: if (not module.params['vmid'] and
module.exit_json(changed=False, msg="VM with hostname %s already exists and has ID number %s" % (hostname, get_vmid(proxmox, hostname)[0])) proxmox.get_vmid(hostname, ignore_missing=True, choose_first_if_multiple=True) and
elif not node_check(proxmox, node): not module.params['force']):
vmid = proxmox.get_vmid(hostname, choose_first_if_multiple=True)
module.exit_json(changed=False, msg="VM with hostname %s already exists and has ID number %s" % (hostname, vmid))
elif not proxmox.get_node(node):
module.fail_json(msg="node '%s' not exists in cluster" % node) module.fail_json(msg="node '%s' not exists in cluster" % node)
elif not content_check(proxmox, node, module.params['ostemplate'], template_store): elif not proxmox.content_check(node, module.params['ostemplate'], template_store):
module.fail_json(msg="ostemplate '%s' not exists on node %s and storage %s" module.fail_json(msg="ostemplate '%s' not exists on node %s and storage %s"
% (module.params['ostemplate'], node, template_store)) % (module.params['ostemplate'], node, template_store))
except Exception as e: except Exception as e:
module.fail_json(msg="Pre-creation checks of {VZ_TYPE} VM {vmid} failed with exception: {e}".format(VZ_TYPE=VZ_TYPE, vmid=vmid, e=e)) module.fail_json(msg="Pre-creation checks of {VZ_TYPE} VM {vmid} failed with exception: {e}".format(VZ_TYPE=VZ_TYPE, vmid=vmid, e=e))
try: try:
create_instance(module, proxmox, vmid, node, disk, storage, cpus, memory, swap, timeout, clone, proxmox.create_instance(vmid, node, disk, storage, cpus, memory, swap, timeout, clone,
cores=module.params['cores'], cores=module.params['cores'],
pool=module.params['pool'], pool=module.params['pool'],
password=module.params['password'], password=module.params['password'],
@ -734,18 +677,21 @@ def main():
# Clone a container # Clone a container
elif state == 'present' and clone is not None: elif state == 'present' and clone is not None:
try: try:
if get_instance(proxmox, vmid) and not module.params['force']: if proxmox.get_vm(vmid, ignore_missing=True) and not module.params['force']:
module.exit_json(changed=False, msg="VM with vmid = %s is already exists" % vmid) module.exit_json(changed=False, msg="VM with vmid = %s is already exists" % vmid)
# If no vmid was passed, there cannot be another VM named 'hostname' # If no vmid was passed, there cannot be another VM named 'hostname'
if not module.params['vmid'] and get_vmid(proxmox, hostname) and not module.params['force']: if (not module.params['vmid'] and
module.exit_json(changed=False, msg="VM with hostname %s already exists and has ID number %s" % (hostname, get_vmid(proxmox, hostname)[0])) proxmox.get_vmid(hostname, ignore_missing=True, choose_first_if_multiple=True) and
if not get_instance(proxmox, clone): not module.params['force']):
vmid = proxmox.get_vmid(hostname, choose_first_if_multiple=True)
module.exit_json(changed=False, msg="VM with hostname %s already exists and has ID number %s" % (hostname, vmid))
if not proxmox.get_vm(clone, ignore_missing=True):
module.exit_json(changed=False, msg="Container to be cloned does not exist") module.exit_json(changed=False, msg="Container to be cloned does not exist")
except Exception as e: except Exception as e:
module.fail_json(msg="Pre-clone checks of {VZ_TYPE} VM {vmid} failed with exception: {e}".format(VZ_TYPE=VZ_TYPE, vmid=vmid, e=e)) module.fail_json(msg="Pre-clone checks of {VZ_TYPE} VM {vmid} failed with exception: {e}".format(VZ_TYPE=VZ_TYPE, vmid=vmid, e=e))
try: try:
create_instance(module, proxmox, vmid, node, disk, storage, cpus, memory, swap, timeout, clone) proxmox.create_instance(vmid, node, disk, storage, cpus, memory, swap, timeout, clone)
module.exit_json(changed=True, msg="Cloned VM %s from %s" % (vmid, clone)) module.exit_json(changed=True, msg="Cloned VM %s from %s" % (vmid, clone))
except Exception as e: except Exception as e:
@ -753,64 +699,60 @@ def main():
elif state == 'started': elif state == 'started':
try: try:
vm = get_instance(proxmox, vmid) vm = proxmox.get_vm(vmid)
if not vm: if getattr(proxmox.proxmox_api.nodes(vm['node']), VZ_TYPE)(vmid).status.current.get()['status'] == 'running':
module.fail_json(msg='VM with vmid = %s not exists in cluster' % vmid)
if getattr(proxmox.nodes(vm[0]['node']), VZ_TYPE)(vmid).status.current.get()['status'] == 'running':
module.exit_json(changed=False, msg="VM %s is already running" % vmid) module.exit_json(changed=False, msg="VM %s is already running" % vmid)
if start_instance(module, proxmox, vm, vmid, timeout): if proxmox.start_instance(vm, vmid, timeout):
module.exit_json(changed=True, msg="VM %s started" % vmid) module.exit_json(changed=True, msg="VM %s started" % vmid)
except Exception as e: except Exception as e:
module.fail_json(msg="starting of VM %s failed with exception: %s" % (vmid, e)) module.fail_json(msg="starting of VM %s failed with exception: %s" % (vmid, e))
elif state == 'stopped': elif state == 'stopped':
try: try:
vm = get_instance(proxmox, vmid) vm = proxmox.get_vm(vmid)
if not vm:
module.fail_json(msg='VM with vmid = %s not exists in cluster' % vmid)
if getattr(proxmox.nodes(vm[0]['node']), VZ_TYPE)(vmid).status.current.get()['status'] == 'mounted': if getattr(proxmox.proxmox_api.nodes(vm['node']), VZ_TYPE)(vmid).status.current.get()['status'] == 'mounted':
if module.params['force']: if module.params['force']:
if umount_instance(module, proxmox, vm, vmid, timeout): if proxmox.umount_instance(vm, vmid, timeout):
module.exit_json(changed=True, msg="VM %s is shutting down" % vmid) module.exit_json(changed=True, msg="VM %s is shutting down" % vmid)
else: else:
module.exit_json(changed=False, msg=("VM %s is already shutdown, but mounted. " module.exit_json(changed=False, msg=("VM %s is already shutdown, but mounted. "
"You can use force option to umount it.") % vmid) "You can use force option to umount it.") % vmid)
if getattr(proxmox.nodes(vm[0]['node']), VZ_TYPE)(vmid).status.current.get()['status'] == 'stopped': if getattr(proxmox.proxmox_api.nodes(vm['node']), VZ_TYPE)(vmid).status.current.get()['status'] == 'stopped':
module.exit_json(changed=False, msg="VM %s is already shutdown" % vmid) module.exit_json(changed=False, msg="VM %s is already shutdown" % vmid)
if stop_instance(module, proxmox, vm, vmid, timeout, force=module.params['force']): if proxmox.stop_instance(vm, vmid, timeout, force=module.params['force']):
module.exit_json(changed=True, msg="VM %s is shutting down" % vmid) module.exit_json(changed=True, msg="VM %s is shutting down" % vmid)
except Exception as e: except Exception as e:
module.fail_json(msg="stopping of VM %s failed with exception: %s" % (vmid, e)) module.fail_json(msg="stopping of VM %s failed with exception: %s" % (vmid, e))
elif state == 'restarted': elif state == 'restarted':
try: try:
vm = get_instance(proxmox, vmid) vm = proxmox.get_vm(vmid)
if not vm:
module.fail_json(msg='VM with vmid = %s not exists in cluster' % vmid) vm_status = getattr(proxmox.proxmox_api.nodes(vm['node']), VZ_TYPE)(vmid).status.current.get()['status']
if (getattr(proxmox.nodes(vm[0]['node']), VZ_TYPE)(vmid).status.current.get()['status'] == 'stopped' or if vm_status in ['stopped', 'mounted']:
getattr(proxmox.nodes(vm[0]['node']), VZ_TYPE)(vmid).status.current.get()['status'] == 'mounted'):
module.exit_json(changed=False, msg="VM %s is not running" % vmid) module.exit_json(changed=False, msg="VM %s is not running" % vmid)
if (stop_instance(module, proxmox, vm, vmid, timeout, force=module.params['force']) and if (proxmox.stop_instance(vm, vmid, timeout, force=module.params['force']) and
start_instance(module, proxmox, vm, vmid, timeout)): proxmox.start_instance(vm, vmid, timeout)):
module.exit_json(changed=True, msg="VM %s is restarted" % vmid) module.exit_json(changed=True, msg="VM %s is restarted" % vmid)
except Exception as e: except Exception as e:
module.fail_json(msg="restarting of VM %s failed with exception: %s" % (vmid, e)) module.fail_json(msg="restarting of VM %s failed with exception: %s" % (vmid, e))
elif state == 'absent': elif state == 'absent':
try: try:
vm = get_instance(proxmox, vmid) vm = proxmox.get_vm(vmid, ignore_missing=True)
if not vm: if not vm:
module.exit_json(changed=False, msg="VM %s does not exist" % vmid) module.exit_json(changed=False, msg="VM %s does not exist" % vmid)
if getattr(proxmox.nodes(vm[0]['node']), VZ_TYPE)(vmid).status.current.get()['status'] == 'running': vm_status = getattr(proxmox.proxmox_api.nodes(vm['node']), VZ_TYPE)(vmid).status.current.get()['status']
if vm_status == 'running':
module.exit_json(changed=False, msg="VM %s is running. Stop it before deletion." % vmid) module.exit_json(changed=False, msg="VM %s is running. Stop it before deletion." % vmid)
if getattr(proxmox.nodes(vm[0]['node']), VZ_TYPE)(vmid).status.current.get()['status'] == 'mounted': if vm_status == 'mounted':
module.exit_json(changed=False, msg="VM %s is mounted. Stop it with force option before deletion." % vmid) module.exit_json(changed=False, msg="VM %s is mounted. Stop it with force option before deletion." % vmid)
delete_params = {} delete_params = {}
@ -818,16 +760,16 @@ def main():
if module.params['purge']: if module.params['purge']:
delete_params['purge'] = 1 delete_params['purge'] = 1
taskid = getattr(proxmox.nodes(vm[0]['node']), VZ_TYPE).delete(vmid, **delete_params) taskid = getattr(proxmox.proxmox_api.nodes(vm['node']), VZ_TYPE).delete(vmid, **delete_params)
while timeout: while timeout:
if (proxmox.nodes(vm[0]['node']).tasks(taskid).status.get()['status'] == 'stopped' and task_status = proxmox.proxmox_api.nodes(vm['node']).tasks(taskid).status.get()
proxmox.nodes(vm[0]['node']).tasks(taskid).status.get()['exitstatus'] == 'OK'): if (task_status['status'] == 'stopped' and task_status['exitstatus'] == 'OK'):
module.exit_json(changed=True, msg="VM %s removed" % vmid) module.exit_json(changed=True, msg="VM %s removed" % vmid)
timeout -= 1 timeout -= 1
if timeout == 0: if timeout == 0:
module.fail_json(msg='Reached timeout while waiting for removing VM. Last line in task before timeout: %s' module.fail_json(msg='Reached timeout while waiting for removing VM. Last line in task before timeout: %s'
% proxmox.nodes(vm[0]['node']).tasks(taskid).log.get()[:1]) % proxmox.proxmox_api.nodes(vm['node']).tasks(taskid).log.get()[:1])
time.sleep(1) time.sleep(1)
except Exception as e: except Exception as e:

View file

@ -1,6 +1,6 @@
#!/usr/bin/python #!/usr/bin/python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
#
# Copyright: (c) 2016, Abdoul Bah (@helldorado) <bahabdoul at gmail.com> # Copyright: (c) 2016, Abdoul Bah (@helldorado) <bahabdoul at gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
@ -728,47 +728,30 @@ import traceback
from ansible.module_utils.six.moves.urllib.parse import quote from ansible.module_utils.six.moves.urllib.parse import quote
from ansible_collections.community.general.plugins.module_utils.version import LooseVersion from ansible_collections.community.general.plugins.module_utils.version import LooseVersion
from ansible_collections.community.general.plugins.module_utils.proxmox import (proxmox_auth_argument_spec, ProxmoxAnsible)
try: from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from proxmoxer import ProxmoxAPI
HAS_PROXMOXER = True
except ImportError:
HAS_PROXMOXER = False
from ansible.module_utils.basic import AnsibleModule, env_fallback
from ansible.module_utils.common.text.converters import to_native from ansible.module_utils.common.text.converters import to_native
def get_nextvmid(module, proxmox): def parse_mac(netstr):
try: return re.search('=(.*?),', netstr).group(1)
vmid = proxmox.cluster.nextid.get()
return vmid
except Exception as e:
module.fail_json(msg="Unable to get next vmid. Failed with exception: %s" % to_native(e),
exception=traceback.format_exc())
def get_vmid(proxmox, name): def parse_dev(devstr):
return [vm['vmid'] for vm in proxmox.cluster.resources.get(type='vm') if vm.get('name') == name] return re.search('(.*?)(,|$)', devstr).group(1)
def get_vm(proxmox, vmid): class ProxmoxKvmAnsible(ProxmoxAnsible):
return [vm for vm in proxmox.cluster.resources.get(type='vm') if vm['vmid'] == int(vmid)] def get_vminfo(self, node, vmid, **kwargs):
def node_check(proxmox, node):
return [True for nd in proxmox.nodes.get() if nd['node'] == node]
def get_vminfo(module, proxmox, node, vmid, **kwargs):
global results global results
results = {} results = {}
mac = {} mac = {}
devices = {} devices = {}
try: try:
vm = proxmox.nodes(node).qemu(vmid).config.get() vm = self.proxmox_api.nodes(node).qemu(vmid).config.get()
except Exception as e: except Exception as e:
module.fail_json(msg='Getting information for VM with vmid = %s failed with exception: %s' % (vmid, e)) self.module.fail_json(msg='Getting information for VM with vmid = %s failed with exception: %s' % (vmid, e))
# Sanitize kwargs. Remove not defined args and ensure True and False converted to int. # Sanitize kwargs. Remove not defined args and ensure True and False converted to int.
kwargs = dict((k, v) for k, v in kwargs.items() if v is not None) kwargs = dict((k, v) for k, v in kwargs.items() if v is not None)
@ -793,29 +776,19 @@ def get_vminfo(module, proxmox, node, vmid, **kwargs):
results['devices'] = devices results['devices'] = devices
results['vmid'] = int(vmid) results['vmid'] = int(vmid)
def settings(self, vmid, node, **kwargs):
def parse_mac(netstr): proxmox_node = self.proxmox_api.nodes(node)
return re.search('=(.*?),', netstr).group(1)
def parse_dev(devstr):
return re.search('(.*?)(,|$)', devstr).group(1)
def settings(proxmox, vmid, node, **kwargs):
proxmox_node = proxmox.nodes(node)
# Sanitize kwargs. Remove not defined args and ensure True and False converted to int. # Sanitize kwargs. Remove not defined args and ensure True and False converted to int.
kwargs = dict((k, v) for k, v in kwargs.items() if v is not None) kwargs = dict((k, v) for k, v in kwargs.items() if v is not None)
return proxmox_node.qemu(vmid).config.set(**kwargs) is None return proxmox_node.qemu(vmid).config.set(**kwargs) is None
def wait_for_task(self, node, taskid):
def wait_for_task(module, proxmox, node, taskid): timeout = self.module.params['timeout']
timeout = module.params['timeout']
while timeout: while timeout:
task = proxmox.nodes(node).tasks(taskid).status.get() task = self.proxmox_api.nodes(node).tasks(taskid).status.get()
if task['status'] == 'stopped' and task['exitstatus'] == 'OK': if task['status'] == 'stopped' and task['exitstatus'] == 'OK':
# Wait an extra second as the API can be a ahead of the hypervisor # Wait an extra second as the API can be a ahead of the hypervisor
time.sleep(1) time.sleep(1)
@ -826,8 +799,7 @@ def wait_for_task(module, proxmox, node, taskid):
time.sleep(1) time.sleep(1)
return False return False
def create_vm(self, vmid, newid, node, name, memory, cpu, cores, sockets, update, **kwargs):
def create_vm(module, proxmox, vmid, newid, node, name, memory, cpu, cores, sockets, update, **kwargs):
# Available only in PVE 4 # Available only in PVE 4
only_v4 = ['force', 'protection', 'skiplock'] only_v4 = ['force', 'protection', 'skiplock']
only_v6 = ['ciuser', 'cipassword', 'sshkeys', 'ipconfig', 'tags'] only_v6 = ['ciuser', 'cipassword', 'sshkeys', 'ipconfig', 'tags']
@ -838,20 +810,23 @@ def create_vm(module, proxmox, vmid, newid, node, name, memory, cpu, cores, sock
# Default args for vm. Note: -args option is for experts only. It allows you to pass arbitrary arguments to kvm. # Default args for vm. Note: -args option is for experts only. It allows you to pass arbitrary arguments to kvm.
vm_args = "-serial unix:/var/run/qemu-server/{0}.serial,server,nowait".format(vmid) vm_args = "-serial unix:/var/run/qemu-server/{0}.serial,server,nowait".format(vmid)
proxmox_node = proxmox.nodes(node) proxmox_node = self.proxmox_api.nodes(node)
# Sanitize kwargs. Remove not defined args and ensure True and False converted to int. # Sanitize kwargs. Remove not defined args and ensure True and False converted to int.
kwargs = dict((k, v) for k, v in kwargs.items() if v is not None) kwargs = dict((k, v) for k, v in kwargs.items() if v is not None)
kwargs.update(dict([k, int(v)] for k, v in kwargs.items() if isinstance(v, bool))) kwargs.update(dict([k, int(v)] for k, v in kwargs.items() if isinstance(v, bool)))
version = self.version()
pve_major_version = 3 if version < LooseVersion('4.0') else version.version[0]
# The features work only on PVE 4+ # The features work only on PVE 4+
if PVE_MAJOR_VERSION < 4: if pve_major_version < 4:
for p in only_v4: for p in only_v4:
if p in kwargs: if p in kwargs:
del kwargs[p] del kwargs[p]
# The features work only on PVE 6 # The features work only on PVE 6
if PVE_MAJOR_VERSION < 6: if pve_major_version < 6:
for p in only_v6: for p in only_v6:
if p in kwargs: if p in kwargs:
del kwargs[p] del kwargs[p]
@ -892,12 +867,12 @@ def create_vm(module, proxmox, vmid, newid, node, name, memory, cpu, cores, sock
del kwargs['numa_enabled'] del kwargs['numa_enabled']
# PVE api expects strings for the following params # PVE api expects strings for the following params
if 'nameservers' in module.params: if 'nameservers' in self.module.params:
nameservers = module.params.pop('nameservers') nameservers = self.module.params.pop('nameservers')
if nameservers: if nameservers:
kwargs['nameserver'] = ' '.join(nameservers) kwargs['nameserver'] = ' '.join(nameservers)
if 'searchdomains' in module.params: if 'searchdomains' in self.module.params:
searchdomains = module.params.pop('searchdomains') searchdomains = self.module.params.pop('searchdomains')
if searchdomains: if searchdomains:
kwargs['searchdomain'] = ' '.join(searchdomains) kwargs['searchdomain'] = ' '.join(searchdomains)
@ -906,80 +881,68 @@ def create_vm(module, proxmox, vmid, newid, node, name, memory, cpu, cores, sock
re_tag = re.compile(r'^[a-z0-9_][a-z0-9_\-\+\.]*$') re_tag = re.compile(r'^[a-z0-9_][a-z0-9_\-\+\.]*$')
for tag in kwargs['tags']: for tag in kwargs['tags']:
if not re_tag.match(tag): if not re_tag.match(tag):
module.fail_json(msg='%s is not a valid tag' % tag) self.module.fail_json(msg='%s is not a valid tag' % tag)
kwargs['tags'] = ",".join(kwargs['tags']) kwargs['tags'] = ",".join(kwargs['tags'])
# -args and skiplock require root@pam user - but can not use api tokens # -args and skiplock require root@pam user - but can not use api tokens
if module.params['api_user'] == "root@pam" and module.params['args'] is None: if self.module.params['api_user'] == "root@pam" and self.module.params['args'] is None:
if not update and module.params['proxmox_default_behavior'] == 'compatibility': if not update and self.module.params['proxmox_default_behavior'] == 'compatibility':
kwargs['args'] = vm_args kwargs['args'] = vm_args
elif module.params['api_user'] == "root@pam" and module.params['args'] is not None: elif self.module.params['api_user'] == "root@pam" and self.module.params['args'] is not None:
kwargs['args'] = module.params['args'] kwargs['args'] = self.module.params['args']
elif module.params['api_user'] != "root@pam" and module.params['args'] is not None: elif self.module.params['api_user'] != "root@pam" and self.module.params['args'] is not None:
module.fail_json(msg='args parameter require root@pam user. ') self.module.fail_json(msg='args parameter require root@pam user. ')
if module.params['api_user'] != "root@pam" and module.params['skiplock'] is not None: if self.module.params['api_user'] != "root@pam" and self.module.params['skiplock'] is not None:
module.fail_json(msg='skiplock parameter require root@pam user. ') self.module.fail_json(msg='skiplock parameter require root@pam user. ')
if update: if update:
if proxmox_node.qemu(vmid).config.set(name=name, memory=memory, cpu=cpu, cores=cores, sockets=sockets, **kwargs) is None: if proxmox_node.qemu(vmid).config.set(name=name, memory=memory, cpu=cpu, cores=cores, sockets=sockets, **kwargs) is None:
return True return True
else: else:
return False return False
elif module.params['clone'] is not None: elif self.module.params['clone'] is not None:
for param in valid_clone_params: for param in valid_clone_params:
if module.params[param] is not None: if self.module.params[param] is not None:
clone_params[param] = module.params[param] clone_params[param] = self.module.params[param]
clone_params.update(dict([k, int(v)] for k, v in clone_params.items() if isinstance(v, bool))) clone_params.update(dict([k, int(v)] for k, v in clone_params.items() if isinstance(v, bool)))
taskid = proxmox_node.qemu(vmid).clone.post(newid=newid, name=name, **clone_params) taskid = proxmox_node.qemu(vmid).clone.post(newid=newid, name=name, **clone_params)
else: else:
taskid = proxmox_node.qemu.create(vmid=vmid, name=name, memory=memory, cpu=cpu, cores=cores, sockets=sockets, **kwargs) taskid = proxmox_node.qemu.create(vmid=vmid, name=name, memory=memory, cpu=cpu, cores=cores, sockets=sockets, **kwargs)
if not wait_for_task(module, proxmox, node, taskid): if not self.wait_for_task(node, taskid):
module.fail_json(msg='Reached timeout while waiting for creating VM. Last line in task before timeout: %s' % self.module.fail_json(msg='Reached timeout while waiting for creating VM. Last line in task before timeout: %s' %
proxmox_node.tasks(taskid).log.get()[:1]) proxmox_node.tasks(taskid).log.get()[:1])
return False return False
return True return True
def start_vm(self, vm):
def start_vm(module, proxmox, vm): vmid = vm['vmid']
vmid = vm[0]['vmid'] proxmox_node = self.proxmox_api.nodes(vm['node'])
proxmox_node = proxmox.nodes(vm[0]['node'])
taskid = proxmox_node.qemu(vmid).status.start.post() taskid = proxmox_node.qemu(vmid).status.start.post()
if not wait_for_task(module, proxmox, vm[0]['node'], taskid): if not self.wait_for_task(vm['node'], taskid):
module.fail_json(msg='Reached timeout while waiting for starting VM. Last line in task before timeout: %s' % self.module.fail_json(msg='Reached timeout while waiting for starting VM. Last line in task before timeout: %s' %
proxmox_node.tasks(taskid).log.get()[:1]) proxmox_node.tasks(taskid).log.get()[:1])
return False return False
return True return True
def stop_vm(self, vm, force):
def stop_vm(module, proxmox, vm, force): vmid = vm['vmid']
vmid = vm[0]['vmid'] proxmox_node = self.proxmox_api.nodes(vm['node'])
proxmox_node = proxmox.nodes(vm[0]['node'])
taskid = proxmox_node.qemu(vmid).status.shutdown.post(forceStop=(1 if force else 0)) taskid = proxmox_node.qemu(vmid).status.shutdown.post(forceStop=(1 if force else 0))
if not wait_for_task(module, proxmox, vm[0]['node'], taskid): if not self.wait_for_task(vm['node'], taskid):
module.fail_json(msg='Reached timeout while waiting for stopping VM. Last line in task before timeout: %s' % self.module.fail_json(msg='Reached timeout while waiting for stopping VM. Last line in task before timeout: %s' %
proxmox_node.tasks(taskid).log.get()[:1]) proxmox_node.tasks(taskid).log.get()[:1])
return False return False
return True return True
def proxmox_version(proxmox):
apireturn = proxmox.version.get()
return LooseVersion(apireturn['version'])
def main(): def main():
module = AnsibleModule( module_args = proxmox_auth_argument_spec()
argument_spec=dict( kvm_args = dict(
acpi=dict(type='bool'), acpi=dict(type='bool'),
agent=dict(type='bool'), agent=dict(type='bool'),
args=dict(type='str'), args=dict(type='str'),
api_host=dict(required=True),
api_password=dict(no_log=True, fallback=(env_fallback, ['PROXMOX_PASSWORD'])),
api_token_id=dict(no_log=True),
api_token_secret=dict(no_log=True),
api_user=dict(required=True),
autostart=dict(type='bool'), autostart=dict(type='bool'),
balloon=dict(type='int'), balloon=dict(type='int'),
bios=dict(choices=['seabios', 'ovmf']), bios=dict(choices=['seabios', 'ovmf']),
@ -1050,28 +1013,23 @@ def main():
template=dict(type='bool'), template=dict(type='bool'),
timeout=dict(type='int', default=30), timeout=dict(type='int', default=30),
update=dict(type='bool', default=False), update=dict(type='bool', default=False),
validate_certs=dict(type='bool', default=False),
vcpus=dict(type='int'), vcpus=dict(type='int'),
vga=dict(choices=['std', 'cirrus', 'vmware', 'qxl', 'serial0', 'serial1', 'serial2', 'serial3', 'qxl2', 'qxl3', 'qxl4']), vga=dict(choices=['std', 'cirrus', 'vmware', 'qxl', 'serial0', 'serial1', 'serial2', 'serial3', 'qxl2', 'qxl3', 'qxl4']),
virtio=dict(type='dict'), virtio=dict(type='dict'),
vmid=dict(type='int'), vmid=dict(type='int'),
watchdog=dict(), watchdog=dict(),
proxmox_default_behavior=dict(type='str', default='no_defaults', choices=['compatibility', 'no_defaults']), proxmox_default_behavior=dict(type='str', default='no_defaults', choices=['compatibility', 'no_defaults']),
), )
module_args.update(kvm_args)
module = AnsibleModule(
argument_spec=module_args,
mutually_exclusive=[('delete', 'revert'), ('delete', 'update'), ('revert', 'update'), ('clone', 'update'), ('clone', 'delete'), ('clone', 'revert')], mutually_exclusive=[('delete', 'revert'), ('delete', 'update'), ('revert', 'update'), ('clone', 'update'), ('clone', 'delete'), ('clone', 'revert')],
required_together=[('api_token_id', 'api_token_secret')], required_together=[('api_token_id', 'api_token_secret')],
required_one_of=[('name', 'vmid'), ('api_password', 'api_token_id')], required_one_of=[('name', 'vmid'), ('api_password', 'api_token_id')],
required_if=[('state', 'present', ['node'])], required_if=[('state', 'present', ['node'])],
) )
if not HAS_PROXMOXER:
module.fail_json(msg='proxmoxer required for this module')
api_host = module.params['api_host']
api_password = module.params['api_password']
api_token_id = module.params['api_token_id']
api_token_secret = module.params['api_token_secret']
api_user = module.params['api_user']
clone = module.params['clone'] clone = module.params['clone']
cpu = module.params['cpu'] cpu = module.params['cpu']
cores = module.params['cores'] cores = module.params['cores']
@ -1112,87 +1070,70 @@ def main():
if module.params['format'] == 'unspecified': if module.params['format'] == 'unspecified':
module.params['format'] = None module.params['format'] = None
auth_args = {'user': api_user} proxmox = ProxmoxKvmAnsible(module)
if not (api_token_id and api_token_secret):
auth_args['password'] = api_password
else:
auth_args['token_name'] = api_token_id
auth_args['token_value'] = api_token_secret
try:
proxmox = ProxmoxAPI(api_host, verify_ssl=validate_certs, **auth_args)
global PVE_MAJOR_VERSION
version = proxmox_version(proxmox)
PVE_MAJOR_VERSION = 3 if version < LooseVersion('4.0') else version.version[0]
except Exception as e:
module.fail_json(msg='authorization on proxmox cluster failed with exception: %s' % e)
# If vmid is not defined then retrieve its value from the vm name, # If vmid is not defined then retrieve its value from the vm name,
# the cloned vm name or retrieve the next free VM id from ProxmoxAPI. # the cloned vm name or retrieve the next free VM id from ProxmoxAPI.
if not vmid: if not vmid:
if state == 'present' and not update and not clone and not delete and not revert: if state == 'present' and not update and not clone and not delete and not revert:
try: try:
vmid = get_nextvmid(module, proxmox) vmid = proxmox.get_nextvmid()
except Exception: except Exception:
module.fail_json(msg="Can't get the next vmid for VM {0} automatically. Ensure your cluster state is good".format(name)) module.fail_json(msg="Can't get the next vmid for VM {0} automatically. Ensure your cluster state is good".format(name))
else: else:
clone_target = clone or name clone_target = clone or name
try: vmid = proxmox.get_vmid(clone_target, ignore_missing=True, choose_first_if_multiple=True)
vmid = get_vmid(proxmox, clone_target)[0]
except Exception:
vmid = -1
if clone is not None: if clone is not None:
# If newid is not defined then retrieve the next free id from ProxmoxAPI # If newid is not defined then retrieve the next free id from ProxmoxAPI
if not newid: if not newid:
try: try:
newid = get_nextvmid(module, proxmox) newid = proxmox.get_nextvmid()
except Exception: except Exception:
module.fail_json(msg="Can't get the next vmid for VM {0} automatically. Ensure your cluster state is good".format(name)) module.fail_json(msg="Can't get the next vmid for VM {0} automatically. Ensure your cluster state is good".format(name))
# Ensure source VM name exists when cloning # Ensure source VM name exists when cloning
if -1 == vmid: if not vmid:
module.fail_json(msg='VM with name = %s does not exist in cluster' % clone) module.fail_json(msg='VM with name = %s does not exist in cluster' % clone)
# Ensure source VM id exists when cloning # Ensure source VM id exists when cloning
if not get_vm(proxmox, vmid): proxmox.get_vm(vmid)
module.fail_json(vmid=vmid, msg='VM with vmid = %s does not exist in cluster' % vmid)
# Ensure the choosen VM name doesn't already exist when cloning # Ensure the choosen VM name doesn't already exist when cloning
existing_vmid = get_vmid(proxmox, name) existing_vmid = proxmox.get_vmid(name, choose_first_if_multiple=True)
if existing_vmid: if existing_vmid:
module.exit_json(changed=False, vmid=existing_vmid[0], msg="VM with name <%s> already exists" % name) module.exit_json(changed=False, vmid=existing_vmid, msg="VM with name <%s> already exists" % name)
# Ensure the choosen VM id doesn't already exist when cloning # Ensure the choosen VM id doesn't already exist when cloning
if get_vm(proxmox, newid): if proxmox.get_vm(newid, ignore_errors=True):
module.exit_json(changed=False, vmid=vmid, msg="vmid %s with VM name %s already exists" % (newid, name)) module.exit_json(changed=False, vmid=vmid, msg="vmid %s with VM name %s already exists" % (newid, name))
if delete is not None: if delete is not None:
try: try:
settings(proxmox, vmid, node, delete=delete) proxmox.settings(vmid, node, delete=delete)
module.exit_json(changed=True, vmid=vmid, msg="Settings has deleted on VM {0} with vmid {1}".format(name, vmid)) module.exit_json(changed=True, vmid=vmid, msg="Settings has deleted on VM {0} with vmid {1}".format(name, vmid))
except Exception as e: except Exception as e:
module.fail_json(vmid=vmid, msg='Unable to delete settings on VM {0} with vmid {1}: '.format(name, vmid) + str(e)) module.fail_json(vmid=vmid, msg='Unable to delete settings on VM {0} with vmid {1}: '.format(name, vmid) + str(e))
if revert is not None: if revert is not None:
try: try:
settings(proxmox, vmid, node, revert=revert) proxmox.settings(vmid, node, revert=revert)
module.exit_json(changed=True, vmid=vmid, msg="Settings has reverted on VM {0} with vmid {1}".format(name, vmid)) module.exit_json(changed=True, vmid=vmid, msg="Settings has reverted on VM {0} with vmid {1}".format(name, vmid))
except Exception as e: except Exception as e:
module.fail_json(vmid=vmid, msg='Unable to revert settings on VM {0} with vmid {1}: Maybe is not a pending task... '.format(name, vmid) + str(e)) module.fail_json(vmid=vmid, msg='Unable to revert settings on VM {0} with vmid {1}: Maybe is not a pending task... '.format(name, vmid) + str(e))
if state == 'present': if state == 'present':
try: try:
if get_vm(proxmox, vmid) and not (update or clone): if proxmox.get_vm(vmid, ignore_missing=True) and not (update or clone):
module.exit_json(changed=False, vmid=vmid, msg="VM with vmid <%s> already exists" % vmid) module.exit_json(changed=False, vmid=vmid, msg="VM with vmid <%s> already exists" % vmid)
elif get_vmid(proxmox, name) and not (update or clone): elif proxmox.get_vmid(name, ignore_missing=True, choose_first_if_multiple=True) and not (update or clone):
module.exit_json(changed=False, vmid=get_vmid(proxmox, name)[0], msg="VM with name <%s> already exists" % name) module.exit_json(changed=False, vmid=proxmox.get_vmid(name, choose_first_if_multiple=True), msg="VM with name <%s> already exists" % name)
elif not (node, name): elif not (node, name):
module.fail_json(msg='node, name is mandatory for creating/updating vm') module.fail_json(msg='node, name is mandatory for creating/updating vm')
elif not node_check(proxmox, node): elif not proxmox.get_node(node):
module.fail_json(msg="node '%s' does not exist in cluster" % node) module.fail_json(msg="node '%s' does not exist in cluster" % node)
create_vm(module, proxmox, vmid, newid, node, name, memory, cpu, cores, sockets, update, proxmox.create_vm(vmid, newid, node, name, memory, cpu, cores, sockets, update,
acpi=module.params['acpi'], acpi=module.params['acpi'],
agent=module.params['agent'], agent=module.params['agent'],
autostart=module.params['autostart'], autostart=module.params['autostart'],
@ -1253,7 +1194,7 @@ def main():
watchdog=module.params['watchdog']) watchdog=module.params['watchdog'])
if not clone: if not clone:
get_vminfo(module, proxmox, node, vmid, proxmox.get_vminfo(node, vmid,
ide=module.params['ide'], ide=module.params['ide'],
net=module.params['net'], net=module.params['net'],
sata=module.params['sata'], sata=module.params['sata'],
@ -1276,16 +1217,14 @@ def main():
elif state == 'started': elif state == 'started':
status = {} status = {}
try: try:
if -1 == vmid: if not vmid:
module.fail_json(msg='VM with name = %s does not exist in cluster' % name) module.fail_json(msg='VM with name = %s does not exist in cluster' % name)
vm = get_vm(proxmox, vmid) vm = proxmox.get_vm(vmid)
if not vm: status['status'] = vm['status']
module.fail_json(vmid=vmid, msg='VM with vmid <%s> does not exist in cluster' % vmid) if vm['status'] == 'running':
status['status'] = vm[0]['status']
if vm[0]['status'] == 'running':
module.exit_json(changed=False, vmid=vmid, msg="VM %s is already running" % vmid, **status) module.exit_json(changed=False, vmid=vmid, msg="VM %s is already running" % vmid, **status)
if start_vm(module, proxmox, vm): if proxmox.start_vm(vm):
module.exit_json(changed=True, vmid=vmid, msg="VM %s started" % vmid, **status) module.exit_json(changed=True, vmid=vmid, msg="VM %s started" % vmid, **status)
except Exception as e: except Exception as e:
module.fail_json(vmid=vmid, msg="starting of VM %s failed with exception: %s" % (vmid, e), **status) module.fail_json(vmid=vmid, msg="starting of VM %s failed with exception: %s" % (vmid, e), **status)
@ -1293,18 +1232,16 @@ def main():
elif state == 'stopped': elif state == 'stopped':
status = {} status = {}
try: try:
if -1 == vmid: if not vmid:
module.fail_json(msg='VM with name = %s does not exist in cluster' % name) module.fail_json(msg='VM with name = %s does not exist in cluster' % name)
vm = get_vm(proxmox, vmid) vm = proxmox.get_vm(vmid)
if not vm:
module.fail_json(vmid=vmid, msg='VM with vmid = %s does not exist in cluster' % vmid)
status['status'] = vm[0]['status'] status['status'] = vm['status']
if vm[0]['status'] == 'stopped': if vm['status'] == 'stopped':
module.exit_json(changed=False, vmid=vmid, msg="VM %s is already stopped" % vmid, **status) module.exit_json(changed=False, vmid=vmid, msg="VM %s is already stopped" % vmid, **status)
if stop_vm(module, proxmox, vm, force=module.params['force']): if proxmox.stop_vm(vm, force=module.params['force']):
module.exit_json(changed=True, vmid=vmid, msg="VM %s is shutting down" % vmid, **status) module.exit_json(changed=True, vmid=vmid, msg="VM %s is shutting down" % vmid, **status)
except Exception as e: except Exception as e:
module.fail_json(vmid=vmid, msg="stopping of VM %s failed with exception: %s" % (vmid, e), **status) module.fail_json(vmid=vmid, msg="stopping of VM %s failed with exception: %s" % (vmid, e), **status)
@ -1312,17 +1249,15 @@ def main():
elif state == 'restarted': elif state == 'restarted':
status = {} status = {}
try: try:
if -1 == vmid: if not vmid:
module.fail_json(msg='VM with name = %s does not exist in cluster' % name) module.fail_json(msg='VM with name = %s does not exist in cluster' % name)
vm = get_vm(proxmox, vmid) vm = proxmox.get_vm(vmid)
if not vm: status['status'] = vm['status']
module.fail_json(vmid=vmid, msg='VM with vmid = %s does not exist in cluster' % vmid) if vm['status'] == 'stopped':
status['status'] = vm[0]['status']
if vm[0]['status'] == 'stopped':
module.exit_json(changed=False, vmid=vmid, msg="VM %s is not running" % vmid, **status) module.exit_json(changed=False, vmid=vmid, msg="VM %s is not running" % vmid, **status)
if stop_vm(module, proxmox, vm, force=module.params['force']) and start_vm(module, proxmox, vm): if proxmox.stop_vm(vm, force=module.params['force']) and proxmox.start_vm(vm):
module.exit_json(changed=True, vmid=vmid, msg="VM %s is restarted" % vmid, **status) module.exit_json(changed=True, vmid=vmid, msg="VM %s is restarted" % vmid, **status)
except Exception as e: except Exception as e:
module.fail_json(vmid=vmid, msg="restarting of VM %s failed with exception: %s" % (vmid, e), **status) module.fail_json(vmid=vmid, msg="restarting of VM %s failed with exception: %s" % (vmid, e), **status)
@ -1330,19 +1265,19 @@ def main():
elif state == 'absent': elif state == 'absent':
status = {} status = {}
try: try:
vm = get_vm(proxmox, vmid) vm = proxmox.get_vm(vmid, ignore_missing=True)
if not vm: if not vm:
module.exit_json(changed=False, vmid=vmid) module.exit_json(changed=False, vmid=vmid)
proxmox_node = proxmox.nodes(vm[0]['node']) proxmox_node = proxmox.proxmox_api.nodes(vm['node'])
status['status'] = vm[0]['status'] status['status'] = vm['status']
if vm[0]['status'] == 'running': if vm['status'] == 'running':
if module.params['force']: if module.params['force']:
stop_vm(module, proxmox, vm, True) proxmox.stop_vm(vm, True)
else: else:
module.exit_json(changed=False, vmid=vmid, msg="VM %s is running. Stop it before deletion or use force=yes." % vmid) module.exit_json(changed=False, vmid=vmid, msg="VM %s is running. Stop it before deletion or use force=yes." % vmid)
taskid = proxmox_node.qemu.delete(vmid) taskid = proxmox_node.qemu.delete(vmid)
if not wait_for_task(module, proxmox, vm[0]['node'], taskid): if not proxmox.wait_for_task(vm['node'], taskid):
module.fail_json(msg='Reached timeout while waiting for removing VM. Last line in task before timeout: %s' % module.fail_json(msg='Reached timeout while waiting for removing VM. Last line in task before timeout: %s' %
proxmox_node.tasks(taskid).log.get()[:1]) proxmox_node.tasks(taskid).log.get()[:1])
else: else:
@ -1352,14 +1287,12 @@ def main():
elif state == 'current': elif state == 'current':
status = {} status = {}
if -1 == vmid: if not vmid:
module.fail_json(msg='VM with name = %s does not exist in cluster' % name) module.fail_json(msg='VM with name = %s does not exist in cluster' % name)
vm = get_vm(proxmox, vmid) vm = proxmox.get_vm(vmid)
if not vm:
module.fail_json(msg='VM with vmid = %s does not exist in cluster' % vmid)
if not name: if not name:
name = vm[0]['name'] name = vm['name']
current = proxmox.nodes(vm[0]['node']).qemu(vmid).status.current.get()['status'] current = proxmox.proxmox_api.nodes(vm['node']).qemu(vmid).status.current.get()['status']
status['status'] = current status['status'] = current
if status: if status:
module.exit_json(changed=False, vmid=vmid, msg="VM %s with vmid = %s is %s" % (name, vmid, current), **status) module.exit_json(changed=False, vmid=vmid, msg="VM %s with vmid = %s is %s" % (name, vmid, current), **status)

View file

@ -1,6 +1,6 @@
#!/usr/bin/python #!/usr/bin/python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
#
# Copyright: (c) 2021, Lammert Hellinga (@Kogelvis) <lammert@hellinga.it> # Copyright: (c) 2021, Lammert Hellinga (@Kogelvis) <lammert@hellinga.it>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
@ -136,41 +136,18 @@ msg:
sample: "Nic net0 unchanged on VM with vmid 103" sample: "Nic net0 unchanged on VM with vmid 103"
''' '''
try:
from proxmoxer import ProxmoxAPI
HAS_PROXMOXER = True
except ImportError:
HAS_PROXMOXER = False
from ansible.module_utils.basic import AnsibleModule, env_fallback from ansible.module_utils.basic import AnsibleModule, env_fallback
from ansible_collections.community.general.plugins.module_utils.proxmox import proxmox_auth_argument_spec from ansible_collections.community.general.plugins.module_utils.proxmox import (proxmox_auth_argument_spec, ProxmoxAnsible)
def get_vmid(module, proxmox, name): class ProxmoxNicAnsible(ProxmoxAnsible):
try: def update_nic(self, vmid, interface, model, **kwargs):
vms = [vm['vmid'] for vm in proxmox.cluster.resources.get(type='vm') if vm.get('name') == name] vm = self.get_vm(vmid)
except Exception as e:
module.fail_json(msg='Error: %s occurred while retrieving VM with name = %s' % (e, name))
if not vms:
module.fail_json(msg='No VM found with name: %s' % name)
elif len(vms) > 1:
module.fail_json(msg='Multiple VMs found with name: %s, provide vmid instead' % name)
return vms[0]
def get_vm(proxmox, vmid):
return [vm for vm in proxmox.cluster.resources.get(type='vm') if vm['vmid'] == int(vmid)]
def update_nic(module, proxmox, vmid, interface, model, **kwargs):
vm = get_vm(proxmox, vmid)
try: try:
vminfo = proxmox.nodes(vm[0]['node']).qemu(vmid).config.get() vminfo = self.proxmox_api.nodes(vm['node']).qemu(vmid).config.get()
except Exception as e: except Exception as e:
module.fail_json(msg='Getting information for VM with vmid = %s failed with exception: %s' % (vmid, e)) self.module.fail_json(msg='Getting information for VM with vmid = %s failed with exception: %s' % (vmid, e))
if interface in vminfo: if interface in vminfo:
# Convert the current config to a dictionary # Convert the current config to a dictionary
@ -213,7 +190,7 @@ def update_nic(module, proxmox, vmid, interface, model, **kwargs):
if kwargs['mtu']: if kwargs['mtu']:
config_provided += ",mtu={0}".format(kwargs['mtu']) config_provided += ",mtu={0}".format(kwargs['mtu'])
if model != 'virtio': if model != 'virtio':
module.warn( self.module.warn(
'Ignoring MTU for nic {0} on VM with vmid {1}, ' 'Ignoring MTU for nic {0} on VM with vmid {1}, '
'model should be set to \'virtio\': '.format(interface, vmid)) 'model should be set to \'virtio\': '.format(interface, vmid))
@ -230,23 +207,22 @@ def update_nic(module, proxmox, vmid, interface, model, **kwargs):
config_provided += ",trunks={0}".format(';'.join(str(x) for x in kwargs['trunks'])) config_provided += ",trunks={0}".format(';'.join(str(x) for x in kwargs['trunks']))
net = {interface: config_provided} net = {interface: config_provided}
vm = get_vm(proxmox, vmid) vm = self.get_vm(vmid)
if ((interface not in vminfo) or (vminfo[interface] != config_provided)): if ((interface not in vminfo) or (vminfo[interface] != config_provided)):
if not module.check_mode: if not self.module.check_mode:
proxmox.nodes(vm[0]['node']).qemu(vmid).config.set(**net) self.proxmox_api.nodes(vm['node']).qemu(vmid).config.set(**net)
return True return True
return False return False
def delete_nic(self, vmid, interface):
def delete_nic(module, proxmox, vmid, interface): vm = self.get_vm(vmid)
vm = get_vm(proxmox, vmid) vminfo = self.proxmox_api.nodes(vm['node']).qemu(vmid).config.get()
vminfo = proxmox.nodes(vm[0]['node']).qemu(vmid).config.get()
if interface in vminfo: if interface in vminfo:
if not module.check_mode: if not self.module.check_mode:
proxmox.nodes(vm[0]['node']).qemu(vmid).config.set(vmid=vmid, delete=interface) self.proxmox_api.nodes(vm['node']).qemu(vmid).config.set(vmid=vmid, delete=interface)
return True return True
return False return False
@ -281,44 +257,24 @@ def main():
supports_check_mode=True, supports_check_mode=True,
) )
if not HAS_PROXMOXER: proxmox = ProxmoxNicAnsible(module)
module.fail_json(msg='proxmoxer required for this module')
api_host = module.params['api_host']
api_password = module.params['api_password']
api_token_id = module.params['api_token_id']
api_token_secret = module.params['api_token_secret']
api_user = module.params['api_user']
interface = module.params['interface'] interface = module.params['interface']
model = module.params['model'] model = module.params['model']
name = module.params['name'] name = module.params['name']
state = module.params['state'] state = module.params['state']
validate_certs = module.params['validate_certs']
vmid = module.params['vmid'] vmid = module.params['vmid']
auth_args = {'user': api_user}
if not (api_token_id and api_token_secret):
auth_args['password'] = api_password
else:
auth_args['token_name'] = api_token_id
auth_args['token_value'] = api_token_secret
try:
proxmox = ProxmoxAPI(api_host, verify_ssl=validate_certs, **auth_args)
except Exception as e:
module.fail_json(msg='authorization on proxmox cluster failed with exception: %s' % e)
# If vmid is not defined then retrieve its value from the vm name, # If vmid is not defined then retrieve its value from the vm name,
if not vmid: if not vmid:
vmid = get_vmid(module, proxmox, name) vmid = proxmox.get_vmid(name)
# Ensure VM id exists # Ensure VM id exists
if not get_vm(proxmox, vmid): proxmox.get_vm(vmid)
module.fail_json(vmid=vmid, msg='VM with vmid = %s does not exist in cluster' % vmid)
if state == 'present': if state == 'present':
try: try:
if update_nic(module, proxmox, vmid, interface, model, if proxmox.update_nic(vmid, interface, model,
bridge=module.params['bridge'], bridge=module.params['bridge'],
firewall=module.params['firewall'], firewall=module.params['firewall'],
link_down=module.params['link_down'], link_down=module.params['link_down'],
@ -336,7 +292,7 @@ def main():
elif state == 'absent': elif state == 'absent':
try: try:
if delete_nic(module, proxmox, vmid, interface): if proxmox.delete_nic(vmid, interface):
module.exit_json(changed=True, vmid=vmid, msg="Nic {0} deleted on VM with vmid {1}".format(interface, vmid)) module.exit_json(changed=True, vmid=vmid, msg="Nic {0} deleted on VM with vmid {1}".format(interface, vmid))
else: else:
module.exit_json(vmid=vmid, msg="Nic {0} does not exist on VM with vmid {1}".format(interface, vmid)) module.exit_json(vmid=vmid, msg="Nic {0} does not exist on VM with vmid {1}".format(interface, vmid))

View file

@ -1,6 +1,6 @@
#!/usr/bin/python #!/usr/bin/python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
#
# Copyright: (c) 2020, Jeffrey van Pelt (@Thulium-Drake) <jeff@vanpelt.one> # Copyright: (c) 2020, Jeffrey van Pelt (@Thulium-Drake) <jeff@vanpelt.one>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
@ -16,22 +16,6 @@ description:
- Allows you to create/delete snapshots from instances in Proxmox VE cluster. - Allows you to create/delete snapshots from instances in Proxmox VE cluster.
- Supports both KVM and LXC, OpenVZ has not been tested, as it is no longer supported on Proxmox VE. - Supports both KVM and LXC, OpenVZ has not been tested, as it is no longer supported on Proxmox VE.
options: options:
api_host:
description:
- The host of the Proxmox VE cluster.
required: true
type: str
api_user:
description:
- The user to authenticate with.
required: true
type: str
api_password:
description:
- The password to authenticate with.
- You can use PROXMOX_PASSWORD environment variable.
type: str
required: yes
hostname: hostname:
description: description:
- The instance name. - The instance name.
@ -41,11 +25,6 @@ options:
- The instance id. - The instance id.
- If not set, will be fetched from PromoxAPI based on the hostname. - If not set, will be fetched from PromoxAPI based on the hostname.
type: str type: str
validate_certs:
description:
- Enable / disable https certificate verification.
type: bool
default: no
state: state:
description: description:
- Indicate desired state of the instance snapshot. - Indicate desired state of the instance snapshot.
@ -83,6 +62,8 @@ notes:
- Supports C(check_mode). - Supports C(check_mode).
requirements: [ "proxmoxer", "python >= 2.7", "requests" ] requirements: [ "proxmoxer", "python >= 2.7", "requests" ]
author: Jeffrey van Pelt (@Thulium-Drake) author: Jeffrey van Pelt (@Thulium-Drake)
extends_documentation_fragment:
- community.general.proxmox.documentation
''' '''
EXAMPLES = r''' EXAMPLES = r'''
@ -110,81 +91,57 @@ RETURN = r'''#'''
import time import time
import traceback import traceback
PROXMOXER_IMP_ERR = None
try:
from proxmoxer import ProxmoxAPI
HAS_PROXMOXER = True
except ImportError:
PROXMOXER_IMP_ERR = traceback.format_exc()
HAS_PROXMOXER = False
from ansible.module_utils.basic import AnsibleModule, missing_required_lib, env_fallback from ansible.module_utils.basic import AnsibleModule, missing_required_lib, env_fallback
from ansible.module_utils.common.text.converters import to_native from ansible.module_utils.common.text.converters import to_native
from ansible_collections.community.general.plugins.module_utils.proxmox import (proxmox_auth_argument_spec, ProxmoxAnsible, HAS_PROXMOXER, PROXMOXER_IMP_ERR)
VZ_TYPE = None class ProxmoxSnapAnsible(ProxmoxAnsible):
def snapshot(self, vm, vmid):
return getattr(self.proxmox_api.nodes(vm['node']), vm['type'])(vmid).snapshot
def snapshot_create(self, vm, vmid, timeout, snapname, description, vmstate):
def get_vmid(proxmox, hostname): if self.module.check_mode:
return [vm['vmid'] for vm in proxmox.cluster.resources.get(type='vm') if 'name' in vm and vm['name'] == hostname]
def get_instance(proxmox, vmid):
return [vm for vm in proxmox.cluster.resources.get(type='vm') if int(vm['vmid']) == int(vmid)]
def snapshot_create(module, proxmox, vm, vmid, timeout, snapname, description, vmstate):
if module.check_mode:
return True return True
if VZ_TYPE == 'lxc': if vm['type'] == 'lxc':
taskid = getattr(proxmox.nodes(vm[0]['node']), VZ_TYPE)(vmid).snapshot.post(snapname=snapname, description=description) taskid = self.snapshot(vm, vmid).post(snapname=snapname, description=description)
else: else:
taskid = getattr(proxmox.nodes(vm[0]['node']), VZ_TYPE)(vmid).snapshot.post(snapname=snapname, description=description, vmstate=int(vmstate)) taskid = self.snapshot(vm, vmid).post(snapname=snapname, description=description, vmstate=int(vmstate))
while timeout: while timeout:
if (proxmox.nodes(vm[0]['node']).tasks(taskid).status.get()['status'] == 'stopped' and if (self.proxmox_api.nodes(vm['node']).tasks(taskid).status.get()['status'] == 'stopped' and
proxmox.nodes(vm[0]['node']).tasks(taskid).status.get()['exitstatus'] == 'OK'): self.proxmox_api.nodes(vm['node']).tasks(taskid).status.get()['exitstatus'] == 'OK'):
return True return True
timeout -= 1 timeout -= 1
if timeout == 0: if timeout == 0:
module.fail_json(msg='Reached timeout while waiting for creating VM snapshot. Last line in task before timeout: %s' % self.module.fail_json(msg='Reached timeout while waiting for creating VM snapshot. Last line in task before timeout: %s' %
proxmox.nodes(vm[0]['node']).tasks(taskid).log.get()[:1]) self.proxmox_api.nodes(vm['node']).tasks(taskid).log.get()[:1])
time.sleep(1) time.sleep(1)
return False return False
def snapshot_remove(self, vm, vmid, timeout, snapname, force):
def snapshot_remove(module, proxmox, vm, vmid, timeout, snapname, force): if self.module.check_mode:
if module.check_mode:
return True return True
taskid = getattr(proxmox.nodes(vm[0]['node']), VZ_TYPE)(vmid).snapshot.delete(snapname, force=int(force)) taskid = self.snapshot(vm, vmid).delete(snapname, force=int(force))
while timeout: while timeout:
if (proxmox.nodes(vm[0]['node']).tasks(taskid).status.get()['status'] == 'stopped' and if (self.proxmox_api.nodes(vm['node']).tasks(taskid).status.get()['status'] == 'stopped' and
proxmox.nodes(vm[0]['node']).tasks(taskid).status.get()['exitstatus'] == 'OK'): self.proxmox_api.nodes(vm['node']).tasks(taskid).status.get()['exitstatus'] == 'OK'):
return True return True
timeout -= 1 timeout -= 1
if timeout == 0: if timeout == 0:
module.fail_json(msg='Reached timeout while waiting for removing VM snapshot. Last line in task before timeout: %s' % self.module.fail_json(msg='Reached timeout while waiting for removing VM snapshot. Last line in task before timeout: %s' %
proxmox.nodes(vm[0]['node']).tasks(taskid).log.get()[:1]) self.proxmox_api.nodes(vm['node']).tasks(taskid).log.get()[:1])
time.sleep(1) time.sleep(1)
return False return False
def setup_api(api_host, api_user, api_password, validate_certs):
api = ProxmoxAPI(api_host, user=api_user, password=api_password, verify_ssl=validate_certs)
return api
def main(): def main():
module = AnsibleModule( module_args = proxmox_auth_argument_spec()
argument_spec=dict( snap_args = dict(
api_host=dict(required=True),
api_user=dict(required=True),
api_password=dict(no_log=True, required=True, fallback=(env_fallback, ['PROXMOX_PASSWORD'])),
vmid=dict(required=False), vmid=dict(required=False),
validate_certs=dict(type='bool', default='no'),
hostname=dict(), hostname=dict(),
timeout=dict(type='int', default=30), timeout=dict(type='int', default=30),
state=dict(default='present', choices=['present', 'absent']), state=dict(default='present', choices=['present', 'absent']),
@ -192,20 +149,18 @@ def main():
snapname=dict(type='str', default='ansible_snap'), snapname=dict(type='str', default='ansible_snap'),
force=dict(type='bool', default='no'), force=dict(type='bool', default='no'),
vmstate=dict(type='bool', default='no'), vmstate=dict(type='bool', default='no'),
), )
module_args.update(snap_args)
module = AnsibleModule(
argument_spec=module_args,
supports_check_mode=True supports_check_mode=True
) )
if not HAS_PROXMOXER: proxmox = ProxmoxSnapAnsible(module)
module.fail_json(msg=missing_required_lib('proxmoxer'),
exception=PROXMOXER_IMP_ERR)
state = module.params['state'] state = module.params['state']
api_user = module.params['api_user']
api_host = module.params['api_host']
api_password = module.params['api_password']
vmid = module.params['vmid'] vmid = module.params['vmid']
validate_certs = module.params['validate_certs']
hostname = module.params['hostname'] hostname = module.params['hostname']
description = module.params['description'] description = module.params['description']
snapname = module.params['snapname'] snapname = module.params['snapname']
@ -213,37 +168,21 @@ def main():
force = module.params['force'] force = module.params['force']
vmstate = module.params['vmstate'] vmstate = module.params['vmstate']
try:
proxmox = setup_api(api_host, api_user, api_password, validate_certs)
except Exception as e:
module.fail_json(msg='authorization on proxmox cluster failed with exception: %s' % to_native(e))
# If hostname is set get the VM id from ProxmoxAPI # If hostname is set get the VM id from ProxmoxAPI
if not vmid and hostname: if not vmid and hostname:
hosts = get_vmid(proxmox, hostname) vmid = proxmox.get_vmid(hostname, choose_first_if_multiple=True)
if len(hosts) == 0:
module.fail_json(msg="Vmid could not be fetched => Hostname does not exist (action: %s)" % state)
vmid = hosts[0]
elif not vmid: elif not vmid:
module.exit_json(changed=False, msg="Vmid could not be fetched for the following action: %s" % state) module.exit_json(changed=False, msg="Vmid could not be fetched for the following action: %s" % state)
vm = get_instance(proxmox, vmid) vm = proxmox.get_vm(vmid)
global VZ_TYPE
VZ_TYPE = vm[0]['type']
if state == 'present': if state == 'present':
try: try:
vm = get_instance(proxmox, vmid) for i in proxmox.snapshot(vm, vmid).get():
if not vm:
module.fail_json(msg='VM with vmid = %s not exists in cluster' % vmid)
for i in getattr(proxmox.nodes(vm[0]['node']), VZ_TYPE)(vmid).snapshot.get():
if i['name'] == snapname: if i['name'] == snapname:
module.exit_json(changed=False, msg="Snapshot %s is already present" % snapname) module.exit_json(changed=False, msg="Snapshot %s is already present" % snapname)
if snapshot_create(module, proxmox, vm, vmid, timeout, snapname, description, vmstate): if proxmox.snapshot_create(vm, vmid, timeout, snapname, description, vmstate):
if module.check_mode: if module.check_mode:
module.exit_json(changed=False, msg="Snapshot %s would be created" % snapname) module.exit_json(changed=False, msg="Snapshot %s would be created" % snapname)
else: else:
@ -254,13 +193,9 @@ def main():
elif state == 'absent': elif state == 'absent':
try: try:
vm = get_instance(proxmox, vmid)
if not vm:
module.fail_json(msg='VM with vmid = %s not exists in cluster' % vmid)
snap_exist = False snap_exist = False
for i in getattr(proxmox.nodes(vm[0]['node']), VZ_TYPE)(vmid).snapshot.get(): for i in proxmox.snapshot(vm, vmid).get():
if i['name'] == snapname: if i['name'] == snapname:
snap_exist = True snap_exist = True
continue continue
@ -268,7 +203,7 @@ def main():
if not snap_exist: if not snap_exist:
module.exit_json(changed=False, msg="Snapshot %s does not exist" % snapname) module.exit_json(changed=False, msg="Snapshot %s does not exist" % snapname)
else: else:
if snapshot_remove(module, proxmox, vm, vmid, timeout, snapname, force): if proxmox.snapshot_remove(vm, vmid, timeout, snapname, force):
if module.check_mode: if module.check_mode:
module.exit_json(changed=False, msg="Snapshot %s would be removed" % snapname) module.exit_json(changed=False, msg="Snapshot %s would be removed" % snapname)
else: else:

View file

@ -2,7 +2,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
# Copyright: Ansible Project # Copyright: Ansible Project
#
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function from __future__ import absolute_import, division, print_function
@ -117,70 +116,56 @@ EXAMPLES = '''
import os import os
import time import time
try:
from proxmoxer import ProxmoxAPI
HAS_PROXMOXER = True
except ImportError:
HAS_PROXMOXER = False
from ansible.module_utils.basic import AnsibleModule, env_fallback from ansible.module_utils.basic import AnsibleModule, env_fallback
from ansible_collections.community.general.plugins.module_utils.proxmox import (proxmox_auth_argument_spec, ProxmoxAnsible)
def get_template(proxmox, node, storage, content_type, template): class ProxmoxTemplateAnsible(ProxmoxAnsible):
return [True for tmpl in proxmox.nodes(node).storage(storage).content.get() def get_template(self, node, storage, content_type, template):
return [True for tmpl in self.proxmox_api.nodes(node).storage(storage).content.get()
if tmpl['volid'] == '%s:%s/%s' % (storage, content_type, template)] if tmpl['volid'] == '%s:%s/%s' % (storage, content_type, template)]
def task_status(self, node, taskid, timeout):
def task_status(module, proxmox, node, taskid, timeout):
""" """
Check the task status and wait until the task is completed or the timeout is reached. Check the task status and wait until the task is completed or the timeout is reached.
""" """
while timeout: while timeout:
task_status = proxmox.nodes(node).tasks(taskid).status.get() task_status = self.proxmox_api.nodes(node).tasks(taskid).status.get()
if task_status['status'] == 'stopped' and task_status['exitstatus'] == 'OK': if task_status['status'] == 'stopped' and task_status['exitstatus'] == 'OK':
return True return True
timeout = timeout - 1 timeout = timeout - 1
if timeout == 0: if timeout == 0:
module.fail_json(msg='Reached timeout while waiting for uploading/downloading template. Last line in task before timeout: %s' self.module.fail_json(msg='Reached timeout while waiting for uploading/downloading template. Last line in task before timeout: %s' %
% proxmox.node(node).tasks(taskid).log.get()[:1]) self.proxmox_api.node(node).tasks(taskid).log.get()[:1])
time.sleep(1) time.sleep(1)
return False return False
def upload_template(self, node, storage, content_type, realpath, timeout):
taskid = self.proxmox_api.nodes(node).storage(storage).upload.post(content=content_type, filename=open(realpath, 'rb'))
return self.task_status(node, taskid, timeout)
def upload_template(module, proxmox, node, storage, content_type, realpath, timeout): def download_template(self, node, storage, template, timeout):
taskid = proxmox.nodes(node).storage(storage).upload.post(content=content_type, filename=open(realpath, 'rb')) taskid = self.proxmox_api.nodes(node).aplinfo.post(storage=storage, template=template)
return task_status(module, proxmox, node, taskid, timeout) return self.task_status(node, taskid, timeout)
def delete_template(self, node, storage, content_type, template, timeout):
def download_template(module, proxmox, node, storage, template, timeout):
taskid = proxmox.nodes(node).aplinfo.post(storage=storage, template=template)
return task_status(module, proxmox, node, taskid, timeout)
def delete_template(module, proxmox, node, storage, content_type, template, timeout):
volid = '%s:%s/%s' % (storage, content_type, template) volid = '%s:%s/%s' % (storage, content_type, template)
proxmox.nodes(node).storage(storage).content.delete(volid) self.proxmox_api.nodes(node).storage(storage).content.delete(volid)
while timeout: while timeout:
if not get_template(proxmox, node, storage, content_type, template): if not self.get_template(node, storage, content_type, template):
return True return True
timeout = timeout - 1 timeout = timeout - 1
if timeout == 0: if timeout == 0:
module.fail_json(msg='Reached timeout while waiting for deleting template.') self.module.fail_json(msg='Reached timeout while waiting for deleting template.')
time.sleep(1) time.sleep(1)
return False return False
def main(): def main():
module = AnsibleModule( module_args = proxmox_auth_argument_spec()
argument_spec=dict( template_args = dict(
api_host=dict(required=True),
api_password=dict(no_log=True, fallback=(env_fallback, ['PROXMOX_PASSWORD'])),
api_token_id=dict(no_log=True),
api_token_secret=dict(no_log=True),
api_user=dict(required=True),
validate_certs=dict(type='bool', default=False),
node=dict(), node=dict(),
src=dict(type='path'), src=dict(type='path'),
template=dict(), template=dict(),
@ -189,40 +174,23 @@ def main():
timeout=dict(type='int', default=30), timeout=dict(type='int', default=30),
force=dict(type='bool', default=False), force=dict(type='bool', default=False),
state=dict(default='present', choices=['present', 'absent']), state=dict(default='present', choices=['present', 'absent']),
), )
module_args.update(template_args)
module = AnsibleModule(
argument_spec=module_args,
required_together=[('api_token_id', 'api_token_secret')], required_together=[('api_token_id', 'api_token_secret')],
required_one_of=[('api_password', 'api_token_id')], required_one_of=[('api_password', 'api_token_id')],
required_if=[('state', 'absent', ['template'])] required_if=[('state', 'absent', ['template'])]
) )
if not HAS_PROXMOXER: proxmox = ProxmoxTemplateAnsible(module)
module.fail_json(msg='proxmoxer required for this module')
state = module.params['state'] state = module.params['state']
api_host = module.params['api_host']
api_password = module.params['api_password']
api_token_id = module.params['api_token_id']
api_token_secret = module.params['api_token_secret']
api_user = module.params['api_user']
validate_certs = module.params['validate_certs']
node = module.params['node'] node = module.params['node']
storage = module.params['storage'] storage = module.params['storage']
timeout = module.params['timeout'] timeout = module.params['timeout']
auth_args = {'user': api_user}
if not (api_token_id and api_token_secret):
auth_args['password'] = api_password
else:
auth_args['token_name'] = api_token_id
auth_args['token_value'] = api_token_secret
try:
proxmox = ProxmoxAPI(api_host, verify_ssl=validate_certs, **auth_args)
# Used to test the validity of the token if given
proxmox.version.get()
except Exception as e:
module.fail_json(msg='authorization on proxmox cluster failed with exception: %s' % e)
if state == 'present': if state == 'present':
try: try:
content_type = module.params['content_type'] content_type = module.params['content_type']
@ -235,21 +203,21 @@ def main():
if not template: if not template:
module.fail_json(msg='template param for downloading appliance template is mandatory') module.fail_json(msg='template param for downloading appliance template is mandatory')
if get_template(proxmox, node, storage, content_type, template) and not module.params['force']: if proxmox.get_template(node, storage, content_type, template) and not module.params['force']:
module.exit_json(changed=False, msg='template with volid=%s:%s/%s already exists' % (storage, content_type, template)) module.exit_json(changed=False, msg='template with volid=%s:%s/%s already exists' % (storage, content_type, template))
if download_template(module, proxmox, node, storage, template, timeout): if proxmox.download_template(node, storage, template, timeout):
module.exit_json(changed=True, msg='template with volid=%s:%s/%s downloaded' % (storage, content_type, template)) module.exit_json(changed=True, msg='template with volid=%s:%s/%s downloaded' % (storage, content_type, template))
template = os.path.basename(src) template = os.path.basename(src)
if get_template(proxmox, node, storage, content_type, template) and not module.params['force']: if proxmox.get_template(node, storage, content_type, template) and not module.params['force']:
module.exit_json(changed=False, msg='template with volid=%s:%s/%s is already exists' % (storage, content_type, template)) module.exit_json(changed=False, msg='template with volid=%s:%s/%s is already exists' % (storage, content_type, template))
elif not src: elif not src:
module.fail_json(msg='src param to uploading template file is mandatory') module.fail_json(msg='src param to uploading template file is mandatory')
elif not (os.path.exists(src) and os.path.isfile(src)): elif not (os.path.exists(src) and os.path.isfile(src)):
module.fail_json(msg='template file on path %s not exists' % src) module.fail_json(msg='template file on path %s not exists' % src)
if upload_template(module, proxmox, node, storage, content_type, src, timeout): if proxmox.upload_template(node, storage, content_type, src, timeout):
module.exit_json(changed=True, msg='template with volid=%s:%s/%s uploaded' % (storage, content_type, template)) module.exit_json(changed=True, msg='template with volid=%s:%s/%s uploaded' % (storage, content_type, template))
except Exception as e: except Exception as e:
module.fail_json(msg="uploading/downloading of template %s failed with exception: %s" % (template, e)) module.fail_json(msg="uploading/downloading of template %s failed with exception: %s" % (template, e))
@ -259,10 +227,10 @@ def main():
content_type = module.params['content_type'] content_type = module.params['content_type']
template = module.params['template'] template = module.params['template']
if not get_template(proxmox, node, storage, content_type, template): if not proxmox.get_template(node, storage, content_type, template):
module.exit_json(changed=False, msg='template with volid=%s:%s/%s is already deleted' % (storage, content_type, template)) module.exit_json(changed=False, msg='template with volid=%s:%s/%s is already deleted' % (storage, content_type, template))
if delete_template(module, proxmox, node, storage, content_type, template, timeout): if proxmox.delete_template(node, storage, content_type, template, timeout):
module.exit_json(changed=True, msg='template with volid=%s:%s/%s deleted' % (storage, content_type, template)) module.exit_json(changed=True, msg='template with volid=%s:%s/%s deleted' % (storage, content_type, template))
except Exception as e: except Exception as e:
module.fail_json(msg="deleting of template %s failed with exception: %s" % (template, e)) module.fail_json(msg="deleting of template %s failed with exception: %s" % (template, e))

View file

@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-
#
# Copyright: (c) 2021, Ansible Project # Copyright: (c) 2021, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

View file

@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-
#
# Copyright: (c) 2019, Ansible Project # Copyright: (c) 2019, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function) from __future__ import (absolute_import, division, print_function)
@ -6,8 +8,9 @@ __metaclass__ = type
import json import json
import pytest import pytest
from ansible_collections.community.general.tests.unit.compat.mock import MagicMock from ansible_collections.community.general.tests.unit.compat.mock import MagicMock, patch
from ansible_collections.community.general.plugins.modules.cloud.misc import proxmox_snap from ansible_collections.community.general.plugins.modules.cloud.misc import proxmox_snap
import ansible_collections.community.general.plugins.module_utils.proxmox as proxmox_utils
from ansible_collections.community.general.tests.unit.plugins.modules.utils import set_module_args from ansible_collections.community.general.tests.unit.plugins.modules.utils import set_module_args
@ -32,8 +35,8 @@ def get_resources(type):
"status": "running"}] "status": "running"}]
def fake_api(api_host, api_user, api_password, validate_certs): def fake_api(mocker):
r = MagicMock() r = mocker.MagicMock()
r.cluster.resources.get = MagicMock(side_effect=get_resources) r.cluster.resources.get = MagicMock(side_effect=get_resources)
return r return r
@ -48,7 +51,8 @@ def test_proxmox_snap_without_argument(capfd):
assert json.loads(out)['failed'] assert json.loads(out)['failed']
def test_create_snapshot_check_mode(capfd, mocker): @patch('ansible_collections.community.general.plugins.module_utils.proxmox.ProxmoxAnsible._connect')
def test_create_snapshot_check_mode(connect_mock, capfd, mocker):
set_module_args({"hostname": "test-lxc", set_module_args({"hostname": "test-lxc",
"api_user": "root@pam", "api_user": "root@pam",
"api_password": "secret", "api_password": "secret",
@ -58,8 +62,8 @@ def test_create_snapshot_check_mode(capfd, mocker):
"timeout": "1", "timeout": "1",
"force": True, "force": True,
"_ansible_check_mode": True}) "_ansible_check_mode": True})
proxmox_snap.HAS_PROXMOXER = True proxmox_utils.HAS_PROXMOXER = True
proxmox_snap.setup_api = mocker.MagicMock(side_effect=fake_api) connect_mock.side_effect = lambda: fake_api(mocker)
with pytest.raises(SystemExit) as results: with pytest.raises(SystemExit) as results:
proxmox_snap.main() proxmox_snap.main()
@ -68,7 +72,8 @@ def test_create_snapshot_check_mode(capfd, mocker):
assert not json.loads(out)['changed'] assert not json.loads(out)['changed']
def test_remove_snapshot_check_mode(capfd, mocker): @patch('ansible_collections.community.general.plugins.module_utils.proxmox.ProxmoxAnsible._connect')
def test_remove_snapshot_check_mode(connect_mock, capfd, mocker):
set_module_args({"hostname": "test-lxc", set_module_args({"hostname": "test-lxc",
"api_user": "root@pam", "api_user": "root@pam",
"api_password": "secret", "api_password": "secret",
@ -78,8 +83,8 @@ def test_remove_snapshot_check_mode(capfd, mocker):
"timeout": "1", "timeout": "1",
"force": True, "force": True,
"_ansible_check_mode": True}) "_ansible_check_mode": True})
proxmox_snap.HAS_PROXMOXER = True proxmox_utils.HAS_PROXMOXER = True
proxmox_snap.setup_api = mocker.MagicMock(side_effect=fake_api) connect_mock.side_effect = lambda: fake_api(mocker)
with pytest.raises(SystemExit) as results: with pytest.raises(SystemExit) as results:
proxmox_snap.main() proxmox_snap.main()

View file

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
#
# Copyright: (c) 2021, Andreas Botzner (@paginabianca) <andreas at botzner dot com> # Copyright: (c) 2021, Andreas Botzner (@paginabianca) <andreas at botzner dot com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# #
@ -130,6 +131,14 @@ def test_without_required_parameters(connect_mock, capfd, mocker):
assert json.loads(out)['failed'] assert json.loads(out)['failed']
def mock_api_tasks_response(mocker):
m = mocker.MagicMock()
g = mocker.MagicMock()
m.nodes = mocker.MagicMock(return_value=g)
g.tasks.get = mocker.MagicMock(return_value=TASKS)
return m
@patch('ansible_collections.community.general.plugins.module_utils.proxmox.ProxmoxAnsible._connect') @patch('ansible_collections.community.general.plugins.module_utils.proxmox.ProxmoxAnsible._connect')
def test_get_tasks(connect_mock, capfd, mocker): def test_get_tasks(connect_mock, capfd, mocker):
set_module_args({'api_host': 'proxmoxhost', set_module_args({'api_host': 'proxmoxhost',
@ -137,14 +146,7 @@ def test_get_tasks(connect_mock, capfd, mocker):
'api_password': 'supersecret', 'api_password': 'supersecret',
'node': NODE}) 'node': NODE})
def f(): connect_mock.side_effect = lambda: mock_api_tasks_response(mocker)
m = mocker.MagicMock()
g = mocker.MagicMock()
m.nodes = mocker.MagicMock(return_value=g)
g.tasks.get = mocker.MagicMock(return_value=TASKS)
return m
connect_mock.side_effect = f
proxmox_utils.HAS_PROXMOXER = True proxmox_utils.HAS_PROXMOXER = True
with pytest.raises(SystemExit): with pytest.raises(SystemExit):
@ -163,14 +165,7 @@ def test_get_single_task(connect_mock, capfd, mocker):
'node': NODE, 'node': NODE,
'task': TASK_UPID}) 'task': TASK_UPID})
def f(): connect_mock.side_effect = lambda: mock_api_tasks_response(mocker)
m = mocker.MagicMock()
g = mocker.MagicMock()
m.nodes = mocker.MagicMock(return_value=g)
g.tasks.get = mocker.MagicMock(return_value=TASKS)
return m
connect_mock.side_effect = f
proxmox_utils.HAS_PROXMOXER = True proxmox_utils.HAS_PROXMOXER = True
with pytest.raises(SystemExit): with pytest.raises(SystemExit):
@ -190,14 +185,7 @@ def test_get_non_existent_task(connect_mock, capfd, mocker):
'node': NODE, 'node': NODE,
'task': 'UPID:nonexistent'}) 'task': 'UPID:nonexistent'})
def f(): connect_mock.side_effect = lambda: mock_api_tasks_response(mocker)
m = mocker.MagicMock()
g = mocker.MagicMock()
m.nodes = mocker.MagicMock(return_value=g)
g.tasks.get = mocker.MagicMock(return_value=TASKS)
return m
connect_mock.side_effect = f
proxmox_utils.HAS_PROXMOXER = True proxmox_utils.HAS_PROXMOXER = True
with pytest.raises(SystemExit): with pytest.raises(SystemExit):