#!/usr/bin/python # -*- coding: utf-8 -*- # (c) 2015, Kevin Brebanov # Based on pacman (Afterburn , Aaron Bull Schaefer ) # and apt (Matthew Williams ) modules. # # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import absolute_import, division, print_function __metaclass__ = type DOCUMENTATION = ''' --- module: apk short_description: Manages apk packages description: - Manages I(apk) packages for Alpine Linux. author: "Kevin Brebanov (@kbrebanov)" options: available: description: - During upgrade, reset versioned world dependencies and change logic to prefer replacing or downgrading packages (instead of holding them) if the currently installed package is no longer available from any repository. type: bool default: no name: description: - A package name, like C(foo), or multiple packages, like C(foo, bar). type: list elements: str no_cache: description: - Do not use any local cache path. type: bool default: no version_added: 1.0.0 repository: description: - A package repository or multiple repositories. Unlike with the underlying apk command, this list will override the system repositories rather than supplement them. type: list elements: str state: description: - Indicates the desired package(s) state. - C(present) ensures the package(s) is/are present. - C(absent) ensures the package(s) is/are absent. - C(latest) ensures the package(s) is/are present and the latest version(s). default: present choices: [ "present", "absent", "latest" ] type: str update_cache: description: - Update repository indexes. Can be run with other steps or on it's own. type: bool default: no upgrade: description: - Upgrade all installed packages to their latest version. type: bool default: no notes: - '"name" and "upgrade" are mutually exclusive.' - When used with a `loop:` each package will be processed individually, it is much more efficient to pass the list directly to the `name` option. ''' EXAMPLES = ''' - name: Update repositories and install foo package apk: name: foo update_cache: yes - name: Update repositories and install foo and bar packages apk: name: foo,bar update_cache: yes - name: Remove foo package apk: name: foo state: absent - name: Remove foo and bar packages apk: name: foo,bar state: absent - name: Install the package foo apk: name: foo state: present - name: Install the packages foo and bar apk: name: foo,bar state: present - name: Update repositories and update package foo to latest version apk: name: foo state: latest update_cache: yes - name: Update repositories and update packages foo and bar to latest versions apk: name: foo,bar state: latest update_cache: yes - name: Update all installed packages to the latest versions apk: upgrade: yes - name: Upgrade / replace / downgrade / uninstall all installed packages to the latest versions available apk: available: yes upgrade: yes - name: Update repositories as a separate step apk: update_cache: yes - name: Install package from a specific repository apk: name: foo state: latest update_cache: yes repository: http://dl-3.alpinelinux.org/alpine/edge/main - name: Install package without using cache apk: name: foo state: latest no_cache: yes ''' RETURN = ''' packages: description: a list of packages that have been changed returned: when packages have changed type: list sample: ['package', 'other-package'] ''' import re # Import module snippets. from ansible.module_utils.basic import AnsibleModule def parse_for_packages(stdout): packages = [] data = stdout.split('\n') regex = re.compile(r'^\(\d+/\d+\)\s+\S+\s+(\S+)') for l in data: p = regex.search(l) if p: packages.append(p.group(1)) return packages def update_package_db(module, exit): cmd = "%s update" % (APK_PATH) rc, stdout, stderr = module.run_command(cmd, check_rc=False) if rc != 0: module.fail_json(msg="could not update package db", stdout=stdout, stderr=stderr) elif exit: module.exit_json(changed=True, msg='updated repository indexes', stdout=stdout, stderr=stderr) else: return True def query_toplevel(module, name): # /etc/apk/world contains a list of top-level packages separated by ' ' or \n # packages may contain repository (@) or version (=<>~) separator characters or start with negation ! regex = re.compile(r'^' + re.escape(name) + r'([@=<>~].+)?$') with open('/etc/apk/world') as f: content = f.read().split() for p in content: if regex.search(p): return True return False def query_package(module, name): cmd = "%s -v info --installed %s" % (APK_PATH, name) rc, stdout, stderr = module.run_command(cmd, check_rc=False) if rc == 0: return True else: return False def query_latest(module, name): cmd = "%s version %s" % (APK_PATH, name) rc, stdout, stderr = module.run_command(cmd, check_rc=False) search_pattern = r"(%s)-[\d\.\w]+-[\d\w]+\s+(.)\s+[\d\.\w]+-[\d\w]+\s+" % (re.escape(name)) match = re.search(search_pattern, stdout) if match and match.group(2) == "<": return False return True def query_virtual(module, name): cmd = "%s -v info --description %s" % (APK_PATH, name) rc, stdout, stderr = module.run_command(cmd, check_rc=False) search_pattern = r"^%s: virtual meta package" % (re.escape(name)) if re.search(search_pattern, stdout): return True return False def get_dependencies(module, name): cmd = "%s -v info --depends %s" % (APK_PATH, name) rc, stdout, stderr = module.run_command(cmd, check_rc=False) dependencies = stdout.split() if len(dependencies) > 1: return dependencies[1:] else: return [] def upgrade_packages(module, available): if module.check_mode: cmd = "%s upgrade --simulate" % (APK_PATH) else: cmd = "%s upgrade" % (APK_PATH) if available: cmd = "%s --available" % cmd rc, stdout, stderr = module.run_command(cmd, check_rc=False) packagelist = parse_for_packages(stdout) if rc != 0: module.fail_json(msg="failed to upgrade packages", stdout=stdout, stderr=stderr, packages=packagelist) if re.search(r'^OK', stdout): module.exit_json(changed=False, msg="packages already upgraded", stdout=stdout, stderr=stderr, packages=packagelist) module.exit_json(changed=True, msg="upgraded packages", stdout=stdout, stderr=stderr, packages=packagelist) def install_packages(module, names, state): upgrade = False to_install = [] to_upgrade = [] for name in names: # Check if virtual package if query_virtual(module, name): # Get virtual package dependencies dependencies = get_dependencies(module, name) for dependency in dependencies: if state == 'latest' and not query_latest(module, dependency): to_upgrade.append(dependency) else: if not query_toplevel(module, name): to_install.append(name) elif state == 'latest' and not query_latest(module, name): to_upgrade.append(name) if to_upgrade: upgrade = True if not to_install and not upgrade: module.exit_json(changed=False, msg="package(s) already installed") packages = " ".join(to_install + to_upgrade) if upgrade: if module.check_mode: cmd = "%s add --upgrade --simulate %s" % (APK_PATH, packages) else: cmd = "%s add --upgrade %s" % (APK_PATH, packages) else: if module.check_mode: cmd = "%s add --simulate %s" % (APK_PATH, packages) else: cmd = "%s add %s" % (APK_PATH, packages) rc, stdout, stderr = module.run_command(cmd, check_rc=False) packagelist = parse_for_packages(stdout) if rc != 0: module.fail_json(msg="failed to install %s" % (packages), stdout=stdout, stderr=stderr, packages=packagelist) module.exit_json(changed=True, msg="installed %s package(s)" % (packages), stdout=stdout, stderr=stderr, packages=packagelist) def remove_packages(module, names): installed = [] for name in names: if query_package(module, name): installed.append(name) if not installed: module.exit_json(changed=False, msg="package(s) already removed") names = " ".join(installed) if module.check_mode: cmd = "%s del --purge --simulate %s" % (APK_PATH, names) else: cmd = "%s del --purge %s" % (APK_PATH, names) rc, stdout, stderr = module.run_command(cmd, check_rc=False) packagelist = parse_for_packages(stdout) # Check to see if packages are still present because of dependencies for name in installed: if query_package(module, name): rc = 1 break if rc != 0: module.fail_json(msg="failed to remove %s package(s)" % (names), stdout=stdout, stderr=stderr, packages=packagelist) module.exit_json(changed=True, msg="removed %s package(s)" % (names), stdout=stdout, stderr=stderr, packages=packagelist) # ========================================== # Main control flow. def main(): module = AnsibleModule( argument_spec=dict( state=dict(default='present', choices=['present', 'installed', 'absent', 'removed', 'latest']), name=dict(type='list', elements='str'), no_cache=dict(default=False, type='bool'), repository=dict(type='list', elements='str'), update_cache=dict(default=False, type='bool'), upgrade=dict(default=False, type='bool'), available=dict(default=False, type='bool'), ), required_one_of=[['name', 'update_cache', 'upgrade']], mutually_exclusive=[['name', 'upgrade']], supports_check_mode=True ) # Set LANG env since we parse stdout module.run_command_environ_update = dict(LANG='C', LC_ALL='C', LC_MESSAGES='C', LC_CTYPE='C') global APK_PATH APK_PATH = module.get_bin_path('apk', required=True) p = module.params if p['no_cache']: APK_PATH = "%s --no-cache" % (APK_PATH, ) # add repositories to the APK_PATH if p['repository']: for r in p['repository']: APK_PATH = "%s --repository %s --repositories-file /dev/null" % (APK_PATH, r) # normalize the state parameter if p['state'] in ['present', 'installed']: p['state'] = 'present' if p['state'] in ['absent', 'removed']: p['state'] = 'absent' if p['update_cache']: update_package_db(module, not p['name'] and not p['upgrade']) if p['upgrade']: upgrade_packages(module, p['available']) if p['state'] in ['present', 'latest']: install_packages(module, p['name'], p['state']) elif p['state'] == 'absent': remove_packages(module, p['name']) if __name__ == '__main__': main()