#!/usr/bin/python # -*- coding: utf-8 -*- # (c) 2012, Flowroute LLC # Written by Matthew Williams # Based on yum module written by Seth Vidal # # This module is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This software is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this software. If not, see . # DOCUMENTATION = ''' --- module: apt short_description: Manages apt-packages description: - Manages I(apt) packages (such as for Debian/Ubuntu). version_added: "0.0.2" options: pkg: description: - A package name or package specifier with version, like C(foo) or C(foo=1.0) required: true default: null state: description: - Indicates the desired package state required: false default: present choices: [ "latest", "absent", "present" ] update_cache: description: - Run the equivalent of C(apt-get update) before the operation. Can be run as part of the package installation or as a separate step required: false default: "no" choices: [ "yes", "no" ] cache_valid_time: description: - If C(update_cache) is specified and the last run is less or equal than I(cache_valid_time) seconds ago, the C(update_cache) gets skipped. required: false default: "no" purge: description: - Will force purging of configuration files if the module state is set to I(absent). 
required: false default: "no" choices: [ "yes", "no" ] default_release: description: - Corresponds to the C(-t) option for I(apt) and sets pin priorities required: false default: null install_recommends: description: - Corresponds to the C(--no-install-recommends) option for I(apt), default behavior works as apt's default behavior, C(no) does not install recommended packages. Suggested packages are never installed. required: false default: "yes" choices: [ "yes", "no" ] force: description: - If C(yes), force installs/removes. required: false default: "no" choices: [ "yes", "no" ] upgrade: description: - 'If yes or safe, performs an aptitude safe-upgrade.' - 'If full, performs an aptitude full-upgrade.' - 'If dist, performs an apt-get dist-upgrade.' - 'Note: This does not upgrade a specific package, use state=latest for that.' version_added: "1.1" required: false default: "yes" choices: [ "yes", "safe", "full", "dist"] requirements: [ python-apt, aptitude ] author: Matthew Williams notes: - Two of the upgrade modes (C(safe) and C(full)) require C(aptitude), otherwise C(apt-get) suffices. 
examples: - code: "apt: pkg=foo update_cache=yes" description: Update repositories cache and install C(foo) package - code: "apt: pkg=foo state=removed" description: Remove C(foo) package - code: "apt: pkg=foo state=installed" description: Install the package C(foo) - code: "apt: pkg=foo=1.00 state=installed" description: Install the version '1.00' of package C(foo) - code: "apt: pkg=nginx state=latest default_release=squeeze-backports update_cache=yes" description: Update the repository cache and update package C(nginx) to latest version using default release C(squeeze-backports) - code: "apt: pkg=openjdk-6-jdk state=latest install_recommends=no" description: Install latest version of C(openjdk-6-jdk) ignoring C(install-recommends) - code: "apt: upgrade=dist" description: Update all packages to the latest version - code: "apt: update_cache=yes" description: Run the equivalent of C(apt-get update) as a separate step - code: "apt: update_cache=yes cache_valid_time=3600" description: Only run C(update_cache=yes) if the last one is more than 3600 seconds ago ''' import traceback # added to stave off future warnings about apt api import warnings warnings.filterwarnings('ignore', "apt API not stable yet", FutureWarning) import os import datetime import fnmatch # APT related constants APT_ENVVARS = "DEBIAN_FRONTEND=noninteractive DEBIAN_PRIORITY=critical" DPKG_OPTIONS = '-o "Dpkg::Options::=--force-confdef" -o "Dpkg::Options::=--force-confold"' APT_GET_ZERO = "0 upgraded, 0 newly installed, 0 to remove and 0 not upgraded." APTITUDE_ZERO = "0 packages upgraded, 0 newly installed, 0 to remove and 0 not upgraded." 
APT_LISTS_PATH = "/var/lib/apt/lists"
APT_UPDATE_SUCCESS_STAMP_PATH = "/var/lib/apt/periodic/update-success-stamp"

# Paths of the apt executables; main() fills these in via get_bin_path().
APT_GET_CMD = None
APTITUDE_CMD = None


def package_split(pkgspec):
    """Split a package specifier into ``(name, version)``.

    ``"foo=1.0"`` gives ``("foo", "1.0")``; a bare ``"foo"`` gives
    ``("foo", None)``.  Only the first two ``=``-separated fields are used.
    """
    parts = pkgspec.split('=')
    if len(parts) > 1:
        return parts[0], parts[1]
    return parts[0], None


def package_status(m, pkgname, version, cache, state):
    """Return ``(installed, upgradable)`` for ``pkgname`` as seen in ``cache``.

    m       -- module object; ``m.fail_json`` is called when the package is
               unknown and ``state`` is ``'install'``.
    version -- optional version glob; when given, ``installed`` is true only
               if the installed version matches it (``fnmatch``), and
               ``upgradable`` is always reported as False.
    state   -- ``'install'`` makes an unknown package an error; any other
               value makes it report ``(False, False)``.
    """
    try:
        pkg = cache[pkgname]
    except KeyError:
        if state == 'install':
            m.fail_json(msg="No package matching '%s' is available" % pkgname)
        return False, False

    if version:
        try:
            return pkg.is_installed and fnmatch.fnmatch(pkg.installed.version, version), False
        except AttributeError:
            # assume older version of python-apt is installed
            return pkg.isInstalled and fnmatch.fnmatch(pkg.installedVersion, version), False

    try:
        return pkg.is_installed, pkg.is_upgradable
    except AttributeError:
        # assume older version of python-apt is installed
        return pkg.isInstalled, pkg.isUpgradable


def install(m, pkgspec, cache, upgrade=False, default_release=None, install_recommends=True, force=False):
    """Install the packages in ``pkgspec`` that are not yet installed.

    With ``upgrade=True`` packages that are installed but upgradable are
    included as well.  The result is reported through ``m.exit_json`` /
    ``m.fail_json``; in check mode apt-get is run with ``--simulate``.

    default_release    -- passed to apt-get as ``-t`` when set.
    install_recommends -- when false, ``--no-install-recommends`` is added.
    force              -- when true, ``--force-yes`` is added.
    """
    wanted = []
    for package in pkgspec:
        name, version = package_split(package)
        installed, upgradable = package_status(m, name, version, cache, state='install')
        if not installed or (upgrade and upgradable):
            wanted.append("'%s'" % package)

    if not wanted:
        m.exit_json(changed=False)
        return

    packages = " ".join(wanted)

    if force:
        force_yes = '--force-yes'
    else:
        force_yes = ''

    if m.check_mode:
        check_arg = '--simulate'
    else:
        check_arg = ''

    cmd = "%s %s -y %s %s %s install %s" % (APT_ENVVARS, APT_GET_CMD, DPKG_OPTIONS, force_yes, check_arg, packages)

    if default_release:
        cmd += " -t '%s'" % (default_release,)
    if not install_recommends:
        cmd += " --no-install-recommends"

    rc, out, err = m.run_command(cmd)
    if rc:
        m.fail_json(msg="'apt-get install %s' failed: %s" % (packages, err))
    else:
        m.exit_json(changed=True)


def remove(m, pkgspec, cache, purge=False):
    """Remove the packages in ``pkgspec`` that are currently installed.

    purge -- when true, ``--purge`` is passed to ``apt-get remove``.

    The result is reported through ``m.exit_json`` / ``m.fail_json``.  In
    check mode nothing is executed and ``changed=True`` is reported if at
    least one package would be removed.
    """
    doomed = []
    for package in pkgspec:
        name, version = package_split(package)
        installed, upgradable = package_status(m, name, version, cache, state='remove')
        if installed:
            doomed.append("'%s'" % package)

    if not doomed:
        m.exit_json(changed=False)
        return

    packages = " ".join(doomed)

    # Use a separate local so the caller's ``purge`` flag is not clobbered
    # before it is tested.
    if purge:
        purge_arg = '--purge'
    else:
        purge_arg = ''

    cmd = "%s -q -y %s remove %s" % (APT_GET_CMD, purge_arg, packages)

    if m.check_mode:
        m.exit_json(changed=True)
        return

    rc, out, err = m.run_command(cmd)
    if rc:
        m.fail_json(msg="'apt-get remove %s' failed: %s" % (packages, err))
        return
    m.exit_json(changed=True)


def upgrade(m, mode="yes"):
    """Upgrade all packages and report the result through ``m``.

    mode -- ``"dist"`` runs ``apt-get dist-upgrade``, ``"full"`` runs
            ``aptitude full-upgrade``, anything else (``"yes"``/``"safe"``)
            runs ``aptitude safe-upgrade``.

    ``changed`` is False when the tool's output contains its
    "0 upgraded ..." summary line, True otherwise.  In check mode the
    command is run with ``--simulate``.
    """
    if m.check_mode:
        check_arg = '--simulate'
    else:
        check_arg = ''

    if mode == "dist":
        # apt-get dist-upgrade
        apt_name = "apt-get"
        apt_cmd = APT_GET_CMD
        upgrade_command = "dist-upgrade"
        zero_marker = APT_GET_ZERO
    elif mode == "full":
        # aptitude full-upgrade
        apt_name = "aptitude"
        apt_cmd = APTITUDE_CMD
        upgrade_command = "full-upgrade"
        zero_marker = APTITUDE_ZERO
    else:
        # aptitude safe-upgrade # mode=yes # default
        apt_name = "aptitude"
        apt_cmd = APTITUDE_CMD
        upgrade_command = "safe-upgrade"
        zero_marker = APTITUDE_ZERO

    # main() looks aptitude up as optional, so the path may be unset here.
    if not apt_cmd:
        m.fail_json(msg="Could not find '%s', which is required for upgrade=%s" % (apt_name, mode))
        return

    apt_cmd_path = m.get_bin_path(apt_cmd, required=True)
    cmd = '%s %s -y %s %s %s' % (APT_ENVVARS, apt_cmd_path, DPKG_OPTIONS, check_arg, upgrade_command)
    rc, out, err = m.run_command(cmd)
    if rc:
        m.fail_json(msg="'%s %s' failed: %s" % (apt_cmd, upgrade_command, err))
        return

    if zero_marker in out:
        m.exit_json(changed=False, msg=out)
    else:
        m.exit_json(changed=True, msg=out)


def _cache_is_fresh(cache_valid_time):
    """Return True if the apt lists were updated within ``cache_valid_time`` seconds.

    The mtime of the update-success stamp is used, falling back to the mtime
    of the lists directory.  If neither can be read, or ``cache_valid_time``
    is unset/zero, the cache is treated as stale.
    """
    if not cache_valid_time:
        return False

    mtime = None
    for path in (APT_UPDATE_SUCCESS_STAMP_PATH, APT_LISTS_PATH):
        try:
            mtime = os.stat(path).st_mtime
        except OSError:
            continue
        break

    if mtime is None:
        # No mtime could be read - looks like lists are not there.
        return False

    tdelta = datetime.timedelta(seconds=cache_valid_time)
    return datetime.datetime.fromtimestamp(mtime) + tdelta >= datetime.datetime.now()


def main():
    """Module entry point: parse arguments and dispatch to the requested action."""
    global APTITUDE_CMD
    global APT_GET_CMD

    module = AnsibleModule(
        argument_spec = dict(
            state = dict(default='installed', choices=['installed', 'latest', 'removed', 'absent', 'present']),
            update_cache = dict(aliases=['update-cache'], type='bool'),
            cache_valid_time = dict(type='int'),
            purge = dict(default='no', type='bool'),
            package = dict(default=None, aliases=['pkg', 'name']),
            default_release = dict(default=None, aliases=['default-release']),
            install_recommends = dict(default='yes', aliases=['install-recommends'], type='bool'),
            force = dict(default='no', type='bool'),
            upgrade = dict(choices=['yes', 'safe', 'full', 'dist'])
        ),
        mutually_exclusive = [['package', 'upgrade']],
        required_one_of = [['package', 'upgrade', 'update_cache']],
        supports_check_mode = True
    )

    try:
        import apt
        import apt_pkg
    except ImportError:
        module.fail_json(msg="Could not import python modules: apt, apt_pkg. Please install python-apt package.")

    APTITUDE_CMD = module.get_bin_path("aptitude", False)
    APT_GET_CMD = module.get_bin_path("apt-get")

    p = module.params
    install_recommends = p['install_recommends']

    try:
        cache = apt.Cache()
        if p['default_release']:
            apt_pkg.config['APT::Default-Release'] = p['default_release']
            # reopen cache w/ modified config
            cache.open(progress=None)

        if p['update_cache']:
            # Default is: always update the cache, unless cache_valid_time
            # says the existing lists are still recent enough.
            if not _cache_is_fresh(p['cache_valid_time']):
                cache.update()
                cache.open(progress=None)

        # Nothing to install/remove/upgrade (e.g. update_cache on its own,
        # or update_cache=no with no package): stop before touching
        # p['package'], which is None in that case.
        if not p['package'] and not p['upgrade']:
            module.exit_json(changed=False)

        force_yes = p['force']

        if p['upgrade']:
            upgrade(module, p['upgrade'])

        packages = p['package'].split(',')
        latest = p['state'] == 'latest'
        for package in packages:
            if package.count('=') > 1:
                module.fail_json(msg="invalid package spec: %s" % package)
            if latest and '=' in package:
                module.fail_json(msg='version number inconsistent with state=latest: %s' % package)

        if p['state'] == 'latest':
            install(module, packages, cache, upgrade=True,
                    default_release=p['default_release'],
                    install_recommends=install_recommends,
                    force=force_yes)
        elif p['state'] in [ 'installed', 'present' ]:
            install(module, packages, cache, default_release=p['default_release'],
                    install_recommends=install_recommends, force=force_yes)
        elif p['state'] in [ 'removed', 'absent' ]:
            remove(module, packages, cache, p['purge'])

    except apt.cache.LockFailedException:
        module.fail_json(msg="Failed to lock apt for exclusive operation")
    except apt.cache.FetchFailedException:
        module.fail_json(msg="Could not fetch updated apt files")


# this is magic, see lib/ansible/module_common.py
# NOTE(review): the marker below was found garbled as '#<>' and has been
# restored to the include marker this comment refers to - confirm against
# lib/ansible/module_common.py.
#<<INCLUDE_ANSIBLE_MODULE_COMMON>>

main()