#!/usr/bin/python # -*- coding: utf-8 -*- # Copyright: (c) 2020, Silvie Chlupova # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import absolute_import, division, print_function __metaclass__ = type DOCUMENTATION = r""" --- module: copr short_description: Manage one of the Copr repositories version_added: 2.0.0 description: This module can enable, disable or remove the specified repository. author: Silvie Chlupova (@schlupov) requirements: - dnf - dnf-plugins-core notes: - Supports C(check_mode). options: host: description: The Copr host to work with. default: copr.fedorainfracloud.org type: str protocol: description: This indicate which protocol to use with the host. default: https type: str name: description: Copr directory name, for example C(@copr/copr-dev). required: true type: str state: description: - Whether to set this project as C(enabled), C(disabled) or C(absent). default: enabled type: str choices: [absent, enabled, disabled] chroot: description: - The name of the chroot that you want to enable/disable/remove in the project, for example C(epel-7-x86_64). Default chroot is determined by the operating system, version of the operating system, and architecture on which the module is run. type: str """ EXAMPLES = r""" - name: Enable project Test of the user schlupov community.general.copr: host: copr.fedorainfracloud.org state: enabled name: schlupov/Test chroot: fedora-31-x86_64 - name: Remove project integration_tests of the group copr community.general.copr: state: absent name: '@copr/integration_tests' """ RETURN = r""" repo_filename: description: The name of the repo file in which the copr project information is stored. returned: success type: str sample: _copr:copr.fedorainfracloud.org:group_copr:integration_tests.repo repo: description: Path to the project on the host. returned: success type: str sample: copr.fedorainfracloud.org/group_copr/integration_tests """ import stat import os import traceback try: import dnf import dnf.cli import dnf.repodict from dnf.conf import Conf HAS_DNF_PACKAGES = True DNF_IMP_ERR = None except ImportError: DNF_IMP_ERR = traceback.format_exc() HAS_DNF_PACKAGES = False from ansible.module_utils.six.moves.urllib.error import HTTPError from ansible.module_utils.basic import missing_required_lib from ansible.module_utils import distro # pylint: disable=import-error from ansible.module_utils.basic import AnsibleModule # pylint: disable=import-error from ansible.module_utils.urls import open_url # pylint: disable=import-error class CoprModule(object): """The class represents a copr module. The class contains methods that take care of the repository state of a project, whether the project is enabled, disabled or missing. """ ansible_module = None def __init__(self, host, name, state, protocol, chroot=None, check_mode=False): self.host = host self.name = name self.state = state self.chroot = chroot self.protocol = protocol self.check_mode = check_mode if not chroot: self.chroot = self.chroot_conf() else: self.chroot = chroot self.get_base() @property def short_chroot(self): """str: Chroot (distribution-version-architecture) shorten to distribution-version.""" return self.chroot.rsplit('-', 1)[0] @property def arch(self): """str: Target architecture.""" chroot_parts = self.chroot.split("-") return chroot_parts[-1] @property def user(self): """str: Copr user (this can also be the name of the group).""" return self._sanitize_username(self.name.split("/")[0]) @property def project(self): """str: The name of the copr project.""" return self.name.split("/")[1] @classmethod def need_root(cls): """Check if the module was run as root.""" if os.geteuid() != 0: cls.raise_exception("This command has to be run under the root user.") @classmethod def get_base(cls): """Initialize the configuration from dnf. Returns: An instance of the BaseCli class. """ cls.base = dnf.cli.cli.BaseCli(Conf()) return cls.base @classmethod def raise_exception(cls, msg): """Raise either an ansible exception or a python exception. Args: msg: The message to be displayed when an exception is thrown. """ if cls.ansible_module: raise cls.ansible_module.fail_json(msg=msg, changed=False) raise Exception(msg) def _get(self, chroot): """Send a get request to the server to obtain the necessary data. Args: chroot: Chroot in the form of distribution-version. Returns: Info about a repository and status code of the get request. """ repo_info = None url = "{0}://{1}/coprs/{2}/repo/{3}/dnf.repo?arch={4}".format( self.protocol, self.host, self.name, chroot, self.arch ) try: r = open_url(url) status_code = r.getcode() repo_info = r.read().decode("utf-8") except HTTPError as e: status_code = e.getcode() return repo_info, status_code def _download_repo_info(self): """Download information about the repository. Returns: Information about the repository. """ distribution, version = self.short_chroot.split('-', 1) chroot = self.short_chroot while True: repo_info, status_code = self._get(chroot) if repo_info: return repo_info if distribution == "rhel": chroot = "centos-stream-8" distribution = "centos" elif distribution == "centos": if version == "stream-8": version = "8" elif version == "stream-9": version = "9" chroot = "epel-{0}".format(version) distribution = "epel" else: if str(status_code) != "404": self.raise_exception( "This repository does not have any builds yet so you cannot enable it now." ) else: self.raise_exception( "Chroot {0} does not exist in {1}".format(self.chroot, self.name) ) def _enable_repo(self, repo_filename_path, repo_content=None): """Write information to a repo file. Args: repo_filename_path: Path to repository. repo_content: Repository information from the host. Returns: True, if the information in the repo file matches that stored on the host, False otherwise. """ if not repo_content: repo_content = self._download_repo_info() if self._compare_repo_content(repo_filename_path, repo_content): return False if not self.check_mode: with open(repo_filename_path, "w+") as file: file.write(repo_content) os.chmod( repo_filename_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH, ) return True def _get_repo_with_old_id(self): """Try to get a repository with the old name.""" repo_id = "{0}-{1}".format(self.user, self.project) if repo_id in self.base.repos and "_copr" in self.base.repos[repo_id].repofile: file_name = self.base.repos[repo_id].repofile.split("/")[-1] try: copr_hostname = file_name.rsplit(":", 2)[0].split(":", 1)[1] if copr_hostname != self.host: return None return file_name except IndexError: return file_name return None def _read_all_repos(self, repo_id=None): """The method is used to initialize the base variable by repositories using the RepoReader class from dnf. Args: repo_id: Repo id of the repository we want to work with. """ reader = dnf.conf.read.RepoReader(self.base.conf, None) for repo in reader: try: if repo_id: if repo.id == repo_id: self.base.repos.add(repo) break else: self.base.repos.add(repo) except dnf.exceptions.ConfigError as e: self.raise_exception(str(e)) def _get_copr_repo(self): """Return one specific repository from all repositories on the system. Returns: The repository that a user wants to enable, disable, or remove. """ repo_id = "copr:{0}:{1}:{2}".format(self.host, self.user, self.project) if repo_id not in self.base.repos: if self._get_repo_with_old_id() is None: return None return self.base.repos[repo_id] def _disable_repo(self, repo_filename_path): """Disable the repository. Args: repo_filename_path: Path to repository. Returns: False, if the repository is already disabled on the system, True otherwise. """ self._read_all_repos() repo = self._get_copr_repo() if repo is None: if self.check_mode: return True self._enable_repo(repo_filename_path) self._read_all_repos("copr:{0}:{1}:{2}".format(self.host, self.user, self.project)) repo = self._get_copr_repo() for repo_id in repo.cfg.sections(): repo_content_api = self._download_repo_info() with open(repo_filename_path, "r") as file: repo_content_file = file.read() if repo_content_file != repo_content_api: if not self.resolve_differences( repo_content_file, repo_content_api, repo_filename_path ): return False if not self.check_mode: self.base.conf.write_raw_configfile( repo.repofile, repo_id, self.base.conf.substitutions, {"enabled": "0"}, ) return True def resolve_differences(self, repo_content_file, repo_content_api, repo_filename_path): """Detect differences between the contents of the repository stored on the system and the information about the repository on the server. Args: repo_content_file: The contents of the repository stored on the system. repo_content_api: The information about the repository from the server. repo_filename_path: Path to repository. Returns: False, if the contents of the repo file and the information on the server match, True otherwise. """ repo_file_lines = repo_content_file.split("\n") repo_api_lines = repo_content_api.split("\n") repo_api_lines.remove("enabled=1") if "enabled=0" in repo_file_lines: repo_file_lines.remove("enabled=0") if " ".join(repo_api_lines) == " ".join(repo_file_lines): return False if not self.check_mode: os.remove(repo_filename_path) self._enable_repo(repo_filename_path, repo_content_api) else: repo_file_lines.remove("enabled=1") if " ".join(repo_api_lines) != " ".join(repo_file_lines): if not self.check_mode: os.remove(repo_filename_path) self._enable_repo(repo_filename_path, repo_content_api) return True def _remove_repo(self): """Remove the required repository. Returns: True, if the repository has been removed, False otherwise. """ self._read_all_repos() repo = self._get_copr_repo() if not repo: return False if not self.check_mode: try: os.remove(repo.repofile) except OSError as e: self.raise_exception(str(e)) return True def run(self): """The method uses methods of the CoprModule class to change the state of the repository. Returns: Dictionary with information that the ansible module displays to the user at the end of the run. """ self.need_root() state = dict() repo_filename = "_copr:{0}:{1}:{2}.repo".format(self.host, self.user, self.project) state["repo"] = "{0}/{1}/{2}".format(self.host, self.user, self.project) state["repo_filename"] = repo_filename repo_filename_path = "{0}/_copr:{1}:{2}:{3}.repo".format( self.base.conf.get_reposdir, self.host, self.user, self.project ) if self.state == "enabled": enabled = self._enable_repo(repo_filename_path) state["msg"] = "enabled" state["state"] = bool(enabled) elif self.state == "disabled": disabled = self._disable_repo(repo_filename_path) state["msg"] = "disabled" state["state"] = bool(disabled) elif self.state == "absent": removed = self._remove_repo() state["msg"] = "absent" state["state"] = bool(removed) return state @staticmethod def _compare_repo_content(repo_filename_path, repo_content_api): """Compare the contents of the stored repository with the information from the server. Args: repo_filename_path: Path to repository. repo_content_api: The information about the repository from the server. Returns: True, if the information matches, False otherwise. """ if not os.path.isfile(repo_filename_path): return False with open(repo_filename_path, "r") as file: repo_content_file = file.read() return repo_content_file == repo_content_api @staticmethod def chroot_conf(): """Obtain information about the distribution, version, and architecture of the target. Returns: Chroot info in the form of distribution-version-architecture. """ (distribution, version, codename) = distro.linux_distribution(full_distribution_name=False) base = CoprModule.get_base() return "{0}-{1}-{2}".format(distribution, version, base.conf.arch) @staticmethod def _sanitize_username(user): """Modify the group name. Args: user: User name. Returns: Modified user name if it is a group name with @. """ if user[0] == "@": return "group_{0}".format(user[1:]) return user def run_module(): """The function takes care of the functioning of the whole ansible copr module.""" module_args = dict( host=dict(type="str", default="copr.fedorainfracloud.org"), protocol=dict(type="str", default="https"), name=dict(type="str", required=True), state=dict(type="str", choices=["enabled", "disabled", "absent"], default="enabled"), chroot=dict(type="str"), ) module = AnsibleModule(argument_spec=module_args, supports_check_mode=True) params = module.params if not HAS_DNF_PACKAGES: module.fail_json(msg=missing_required_lib("dnf"), exception=DNF_IMP_ERR) CoprModule.ansible_module = module copr_module = CoprModule( host=params["host"], name=params["name"], state=params["state"], protocol=params["protocol"], chroot=params["chroot"], check_mode=module.check_mode, ) state = copr_module.run() info = "Please note that this repository is not part of the main distribution" if params["state"] == "enabled" and state["state"]: module.exit_json( changed=state["state"], msg=state["msg"], repo=state["repo"], repo_filename=state["repo_filename"], info=info, ) module.exit_json( changed=state["state"], msg=state["msg"], repo=state["repo"], repo_filename=state["repo_filename"], ) def main(): """Launches ansible Copr module.""" run_module() if __name__ == "__main__": main()