1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00
community.general/library/packaging/homebrew

783 lines
23 KiB
Text
Raw Normal View History

2013-03-17 02:52:51 +01:00
#!/usr/bin/python
# -*- coding: utf-8 -*-
# (c) 2013, Andrew Dunham <andrew@du.nham.ca>
2014-02-19 20:46:44 +01:00
# (c) 2013, Daniel Jaouen <dcj24@cornell.edu>
#
2013-03-17 02:52:51 +01:00
# Based on macports (Jimmy Tang <jcftang@gmail.com>)
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
DOCUMENTATION = '''
---
module: homebrew
author: Andrew Dunham
short_description: Package manager for Homebrew
description:
- Manages Homebrew packages
2014-02-19 20:46:44 +01:00
version_added: "1.1"
2013-03-17 02:52:51 +01:00
options:
name:
description:
- name of package to install/remove
required: true
state:
description:
- state of the package
2014-02-19 20:46:44 +01:00
choices: [ 'head', 'latest', 'installed', 'linked', 'uninstalled' ]
2013-03-17 02:52:51 +01:00
required: false
default: present
update_homebrew:
description:
- update homebrew itself first
required: false
default: "no"
choices: [ "yes", "no" ]
notes: []
'''
EXAMPLES = '''
- homebrew: name=foo state=present
- homebrew: name=foo state=present update_homebrew=yes
2014-02-19 20:46:44 +01:00
- homebrew: name=foo state=latest update_homebrew=yes
- homebrew: update_homebrew=yes upgrade=yes
- homebrew: name=foo state=head
- homebrew: name=foo state=linked
- homebrew: name=foo state=absent
- homebrew: name=foo,bar state=absent
2013-03-17 02:52:51 +01:00
'''
2014-02-19 20:46:44 +01:00
import os.path
import re
# exceptions -------------------------------------------------------------- {{{
class HomebrewException(Exception):
pass
# /exceptions ------------------------------------------------------------- }}}
# utils ------------------------------------------------------------------- {{{
def _create_regex_group(s):
lines = (line.strip() for line in s.split('\n') if line.strip())
chars = filter(None, (line.split('#')[0].strip() for line in lines))
group = r'[^' + r''.join(chars) + r']'
return re.compile(group)
# /utils ------------------------------------------------------------------ }}}
class Homebrew(object):
'''A class to manage Homebrew packages.'''
# class regexes ------------------------------------------------ {{{
VALID_PATH_CHARS = r'''
\w # alphanumeric characters (i.e., [a-zA-Z0-9_])
\s # spaces
: # colons
{sep} # the OS-specific path separator
- # dashes
'''.format(sep=os.path.sep)
VALID_BREW_PATH_CHARS = r'''
\w # alphanumeric characters (i.e., [a-zA-Z0-9_])
\s # spaces
{sep} # the OS-specific path separator
- # dashes
'''.format(sep=os.path.sep)
VALID_PACKAGE_CHARS = r'''
\w # alphanumeric characters (i.e., [a-zA-Z0-9_])
- # dashes
'''
INVALID_PATH_REGEX = _create_regex_group(VALID_PATH_CHARS)
INVALID_BREW_PATH_REGEX = _create_regex_group(VALID_BREW_PATH_CHARS)
INVALID_PACKAGE_REGEX = _create_regex_group(VALID_PACKAGE_CHARS)
# /class regexes ----------------------------------------------- }}}
# class validations -------------------------------------------- {{{
@classmethod
def valid_path(cls, path):
'''
`path` must be one of:
- list of paths
- a string containing only:
- alphanumeric characters
- dashes
- spaces
- colons
- os.path.sep
'''
if isinstance(path, basestring):
return not cls.INVALID_PATH_REGEX.search(path)
try:
iter(path)
except TypeError:
return False
else:
paths = path
return all(cls.valid_brew_path(path_) for path_ in paths)
@classmethod
def valid_brew_path(cls, brew_path):
'''
`brew_path` must be one of:
- None
- a string containing only:
- alphanumeric characters
- dashes
- spaces
- os.path.sep
'''
if brew_path is None:
return True
2013-03-17 02:52:51 +01:00
2014-02-19 20:46:44 +01:00
return (
isinstance(brew_path, basestring)
and not cls.INVALID_BREW_PATH_REGEX.search(brew_path)
)
2013-03-17 02:52:51 +01:00
2014-02-19 20:46:44 +01:00
@classmethod
def valid_package(cls, package):
'''A valid package is either None or alphanumeric.'''
2013-03-17 02:52:51 +01:00
2014-02-19 20:46:44 +01:00
if package is None:
return True
2013-03-17 02:52:51 +01:00
2014-02-19 20:46:44 +01:00
return (
isinstance(package, basestring)
and not cls.INVALID_PACKAGE_REGEX.search(package)
)
@classmethod
def valid_state(cls, state):
'''
A valid state is one of:
- None
- installed
- upgraded
- head
- linked
- absent
'''
if state is None:
return True
else:
return (
isinstance(state, basestring)
and state.lower() in (
'installed',
'upgraded',
'head',
'linked',
'absent',
)
)
@classmethod
def valid_module(cls, module):
'''A valid module is an instance of AnsibleModule.'''
return isinstance(module, AnsibleModule)
# /class validations ------------------------------------------- }}}
# class properties --------------------------------------------- {{{
@property
def module(self):
return self._module
@module.setter
def module(self, module):
if not self.valid_module(module):
self._module = None
self.failed = True
self.message = 'Invalid module: {0}.'.format(module)
raise HomebrewException(self.message)
else:
self._module = module
return module
@property
def path(self):
return self._path
@path.setter
def path(self, path):
if not self.valid_path(path):
self._path = []
self.failed = True
self.message = 'Invalid path: {0}.'.format(path)
raise HomebrewException(self.message)
else:
if isinstance(path, basestring):
self._path = path.split(':')
else:
self._path = path
return path
@property
def brew_path(self):
return self._brew_path
@brew_path.setter
def brew_path(self, brew_path):
if not self.valid_brew_path(brew_path):
self._brew_path = None
self.failed = True
self.message = 'Invalid brew_path: {0}.'.format(brew_path)
raise HomebrewException(self.message)
else:
self._brew_path = brew_path
return brew_path
@property
def params(self):
return self._params
@params.setter
def params(self, params):
self._params = self.module.params
return self._params
@property
def current_package(self):
return self._current_package
@current_package.setter
def current_package(self, package):
if not self.valid_package(package):
self._current_package = None
self.failed = True
self.message = 'Invalid package: {0}.'.format(package)
raise HomebrewException(self.message)
else:
self._current_package = package
return package
# /class properties -------------------------------------------- }}}
def __init__(self, module, path=None, packages=None, state=None,
update_homebrew=False, ):
self._setup_status_vars()
self._setup_instance_vars(module=module, path=path, packages=packages,
state=state, update_homebrew=update_homebrew, )
self._prep()
# prep --------------------------------------------------------- {{{
def _setup_status_vars(self):
self.failed = False
self.changed = False
self.changed_count = 0
self.unchanged_count = 0
self.message = ''
def _setup_instance_vars(self, **kwargs):
for key, val in kwargs.iteritems():
setattr(self, key, val)
def _prep(self):
self._prep_path()
self._prep_brew_path()
def _prep_path(self):
if not self.path:
self.path = ['/usr/local/bin']
def _prep_brew_path(self):
if not self.module:
self.brew_path = None
self.failed = True
self.message = 'AnsibleModule not set.'
raise HomebrewException(self.message)
self.brew_path = self.module.get_bin_path(
'brew',
required=True,
opt_dirs=self.path,
)
if not self.brew_path:
self.brew_path = None
self.failed = True
self.message = 'Unable to locate homebrew executable.'
raise HomebrewException('Unable to locate homebrew executable.')
return self.brew_path
def _status(self):
return (self.failed, self.changed, self.message)
# /prep -------------------------------------------------------- }}}
def run(self):
try:
self._run()
except HomebrewException:
pass
if not self.failed and (self.changed_count + self.unchanged_count > 1):
self.message = "Changed: %d, Unchanged: %d" % (
self.changed_count,
self.unchanged_count,
)
(failed, changed, message) = self._status()
return (failed, changed, message)
# checks ------------------------------------------------------- {{{
def _current_package_is_installed(self):
if not self.valid_package(self.current_package):
self.failed = True
self.message = 'Invalid package: {0}.'.format(self.current_package)
raise HomebrewException(self.message)
rc, out, err = self.module.run_command(
"{brew_path} list -m1 | grep -q '^{package}$'".format(
brew_path=self.brew_path,
package=self.current_package,
)
)
2013-03-17 02:52:51 +01:00
if rc == 0:
return True
2014-02-19 20:46:44 +01:00
else:
return False
def _outdated_packages(self):
rc, out, err = self.module.run_command([
self.brew_path,
'outdated',
])
return [line.split(' ')[0].strip() for line in out.split('\n') if line]
def _current_package_is_outdated(self):
if not self.valid_package(self.current_package):
return False
return self.current_package in self._outdated_packages()
def _current_package_is_installed_from_head(self):
if not Homebrew.valid_package(self.current_package):
return False
elif not self._current_package_is_installed():
return False
rc, out, err = self.module.run_command([
self.brew_path,
'info',
self.current_package,
])
try:
version_info = [line for line in out.split('\n') if line][0]
except IndexError:
return False
return version_info.split(' ')[-1] == 'HEAD'
# /checks ------------------------------------------------------ }}}
# commands ----------------------------------------------------- {{{
def _run(self):
if self.update_homebrew:
self._update_homebrew()
if self.packages:
if self.state == 'installed':
return self._install_packages()
elif self.state == 'upgraded':
return self._upgrade_packages()
elif self.state == 'head':
return self._install_packages()
# elif self.state == 'linked':
# return self._linked()
elif self.state == 'absent':
return self._uninstall_packages()
# updated -------------------------------- {{{
def _update_homebrew(self):
rc, out, err = self.module.run_command([
self.brew_path,
'update',
])
if rc == 0:
if out and isinstance(out, basestring):
already_updated = any(
re.search(r'Already up-to-date.', s.strip(), re.IGNORECASE)
for s in out.split('\n')
if s
)
if not already_updated:
self.changed = True
self.message = 'Homebrew updated successfully.'
else:
self.message = 'Homebrew already up-to-date.'
2013-03-17 02:52:51 +01:00
2014-02-19 20:46:44 +01:00
return True
else:
self.failed = True
self.message = err.strip()
raise HomebrewException(self.message)
# /updated ------------------------------- }}}
# installed ------------------------------ {{{
def _install_current_package(self):
if not self.valid_package(self.current_package):
self.failed = True
self.message = 'Invalid package: {0}.'.format(self.current_package)
raise HomebrewException(self.message)
if self._current_package_is_installed():
self.unchanged_count += 1
self.message = 'Package already installed: {0}'.format(
self.current_package,
)
return True
2013-03-17 02:52:51 +01:00
2014-02-19 20:46:44 +01:00
if self.module.check_mode:
self.changed = True
self.message = 'Package would be installed: {0}'.format(
self.current_package
)
raise HomebrewException(self.message)
2013-03-17 02:52:51 +01:00
2014-02-19 20:46:44 +01:00
if self.state == 'head':
head = '--HEAD'
else:
head = None
2013-03-17 02:52:51 +01:00
2014-02-19 20:46:44 +01:00
cmd = [opt
for opt in (self.brew_path, 'install', self.current_package, head)
if opt]
2013-03-17 02:52:51 +01:00
2014-02-19 20:46:44 +01:00
rc, out, err = self.module.run_command(cmd)
2013-03-17 02:52:51 +01:00
2014-02-19 20:46:44 +01:00
if self._current_package_is_installed():
self.changed_count += 1
self.changed = True
self.message = 'Package installed: {0}'.format(self.current_package)
return True
else:
self.failed = True
self.message = err.strip()
raise HomebrewException(self.message)
def _install_packages(self):
for package in self.packages:
self.current_package = package
self._install_current_package()
return True
# /installed ----------------------------- }}}
# upgraded ------------------------------- {{{
def _upgrade_current_package(self):
command = 'upgrade'
if not self.valid_package(self.current_package):
self.failed = True
self.message = 'Invalid package: {0}.'.format(self.current_package)
raise HomebrewException(self.message)
if not self._current_package_is_installed():
command = 'install'
if self._current_package_is_installed() and not self._current_package_is_outdated():
self.message = 'Package is already upgraded: {0}'.format(
self.current_package,
)
self.unchanged_count += 1
return True
2014-02-19 20:46:44 +01:00
if self.module.check_mode:
self.changed = True
self.message = 'Package would be upgraded: {0}'.format(
self.current_package
)
raise HomebrewException(self.message)
rc, out, err = self.module.run_command([
self.brew_path,
command,
self.current_package,
])
if not self._current_package_is_outdated():
self.changed_count += 1
self.changed = True
self.message = 'Package upgraded: {0}'.format(self.current_package)
return True
else:
self.failed = True
self.message = err.strip()
raise HomebrewException(self.message)
def _upgrade_all_packages(self):
rc, out, err = self.module.run_command([
self.brew_path,
'upgrade',
])
if rc == 0:
self.changed = True
self.message = 'All packages upgraded.'
return True
else:
self.failed = True
self.message = err.strip()
raise HomebrewException(self.message)
def _upgrade_packages(self):
if not self.packages:
self._upgrade_all_packages()
else:
for package in self.packages:
self.current_package = package
self._upgrade_current_package()
return True
# /upgraded ------------------------------ }}}
# uninstalled ---------------------------- {{{
def _uninstall_current_package(self):
if not self.valid_package(self.current_package):
self.failed = True
self.message = 'Invalid package: {0}.'.format(self.current_package)
raise HomebrewException(self.message)
if not self._current_package_is_installed():
self.unchanged_count += 1
self.message = 'Package already uninstalled: {0}'.format(
self.current_package,
)
return True
2013-03-17 02:52:51 +01:00
2014-02-19 20:46:44 +01:00
if self.module.check_mode:
self.changed = True
self.message = 'Package would be uninstalled: {0}'.format(
self.current_package
)
raise HomebrewException(self.message)
2013-03-17 02:52:51 +01:00
2014-02-19 20:46:44 +01:00
cmd = [opt
for opt in (self.brew_path, 'uninstall', self.current_package)
if opt]
2013-03-17 02:52:51 +01:00
2014-02-19 20:46:44 +01:00
rc, out, err = self.module.run_command(cmd)
2013-03-17 02:52:51 +01:00
2014-02-19 20:46:44 +01:00
if not self._current_package_is_installed():
self.changed_count += 1
self.changed = True
self.message = 'Package uninstalled: {0}'.format(self.current_package)
return True
else:
self.failed = True
self.message = err.strip()
raise HomebrewException(self.message)
2013-03-17 02:52:51 +01:00
2014-02-19 20:46:44 +01:00
def _uninstall_packages(self):
for package in self.packages:
self.current_package = package
self._uninstall_current_package()
2014-02-19 20:46:44 +01:00
return True
# /uninstalled ----------------------------- }}}
# /commands ---------------------------------------------------- }}}
2014-02-19 20:46:44 +01:00
# def link_package(module, brew_path, package):
# """ Links a single homebrew package. """
#
# failed, changed, msg = False, False, ''
#
# if not a_valid_package(package):
# failed = True
# msg = 'invalid package'
#
# elif not query_package(module, brew_path, package):
# failed = True
# msg = 'not installed'
#
# else:
# if module.check_mode:
# module.exit_json(changed=True)
#
# rc, out, err = module.run_command([
# brew_path,
# 'link',
# package,
# ])
#
# if rc:
# failed = True
# msg = out.strip()
# else:
# if err.strip().lower().find('already linked') != -1:
# msg = 'already linked'
# else:
# changed = True
# msg = 'linked'
#
# return (failed, changed, msg)
#
#
# def link_packages(module, brew_path, packages):
# """ Upgrades one or more packages. """
#
# failed, linked, unchanged, msg = False, 0, 0, ''
#
# for package in packages:
# failed, changed, msg = link_package(module, brew_path, package)
# if failed:
# break
# if changed:
# linked += 1
# else:
# unchanged += 1
#
# if failed:
# msg = 'installed: %d, unchanged: %d, error: ' + msg
# msg = msg % (linked, unchanged)
# elif linked:
# changed = True
# msg = 'linked: %d, unchanged: %d' % (linked, unchanged)
# else:
# msg = 'linked: %d, unchanged: %d' % (linked, unchanged)
#
# return (failed, changed, msg)
#
#
# def unlink_package(module, brew_path, package):
# """ Unlinks a single homebrew package. """
#
# failed, changed, msg = False, False, ''
#
# if not a_valid_package(package):
# failed = True
# msg = 'invalid package'
#
# elif not query_package(module, brew_path, package):
# failed = True
# msg = 'not installed'
#
# else:
# if module.check_mode:
# module.exit_json(changed=True)
#
# rc, out, err = module.run_command([
# brew_path,
# 'unlink',
# package,
# ])
#
# if rc:
# failed = True
# msg = out.strip()
# else:
# if out.find('0 links') != -1:
# msg = 'already unlinked'
# else:
# changed = True
# msg = 'linked'
#
# return (failed, changed, msg)
#
#
# def unlink_packages(module, brew_path, packages):
# """ Unlinks one or more packages. """
#
# failed, unlinked, unchanged, msg = False, 0, 0, ''
#
# for package in packages:
# failed, changed, msg = unlink_package(module, brew_path, package)
# if failed:
# break
# if changed:
# unlinked += 1
# else:
# unchanged += 1
#
# if failed:
# msg = 'installed: %d, unchanged: %d, error: ' + msg
# msg = msg % (unlinked, unchanged)
# elif unlinked:
# changed = True
# msg = 'unlinked: %d, unchanged: %d' % (unlinked, unchanged)
# else:
# msg = 'unlinked: %d, unchanged: %d' % (unlinked, unchanged)
#
# return (failed, changed, msg)
2013-03-17 02:52:51 +01:00
def main():
module = AnsibleModule(
2014-02-19 20:46:44 +01:00
argument_spec=dict(
name=dict(aliases=["pkg"], required=False),
path=dict(required=False),
state=dict(
default="present",
choices=[
"present", "installed",
"latest", "upgraded", "head",
"linked", "unlinked",
"absent", "removed", "uninstalled",
],
),
update_homebrew=dict(
default="no",
aliases=["update-brew"],
type='bool',
),
2013-03-17 02:52:51 +01:00
),
2014-02-19 20:46:44 +01:00
supports_check_mode=True,
2013-03-17 02:52:51 +01:00
)
p = module.params
2014-02-19 20:46:44 +01:00
if p['name']:
packages = p['name'].split(',')
else:
packages = None
path = p['path']
if path:
path = path.split(':')
else:
path = ['/usr/local/bin']
state = p['state']
if state in ('present', 'installed', 'head'):
state = 'installed'
if state in ('latest', 'upgraded'):
state = 'upgraded'
if state in ('absent', 'removed', 'uninstalled'):
state = 'absent'
update_homebrew = p['update_homebrew']
brew = Homebrew(module=module, path=path, packages=packages,
state=state, update_homebrew=update_homebrew)
(failed, changed, message) = brew.run()
if failed:
module.fail_json(msg=message)
else:
module.exit_json(changed=changed, msg=message)
# this is magic, see lib/ansible/module_common.py
#<<INCLUDE_ANSIBLE_MODULE_COMMON>>
2013-03-17 02:52:51 +01:00
main()