1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

Fixes #23558 rpm key ids (#27847)

* Changed rpm-keyid extraction and verification method
* minor style fixes
* fixed rpm key deletion,added integration test for mono key,fixed wording in integration tests
This commit is contained in:
schwatvogel 2017-08-08 20:56:03 +02:00 committed by Toshio Kuratomi
parent d1d0cd5406
commit b8d371ca8b
2 changed files with 47 additions and 37 deletions

View file

@ -90,6 +90,10 @@ class RpmKey(object):
state = module.params['state'] state = module.params['state']
key = module.params['key'] key = module.params['key']
self.gpg = self.module.get_bin_path('gpg')
if not self.gpg:
self.gpg = self.module.get_bin_path('gpg2',required=True)
if '://' in key: if '://' in key:
keyfile = self.fetch_key(key) keyfile = self.fetch_key(key)
keyid = self.getkeyid(keyfile) keyid = self.getkeyid(keyfile)
@ -109,13 +113,13 @@ class RpmKey(object):
else: else:
if not keyfile: if not keyfile:
self.module.fail_json(msg="When importing a key, a valid file must be given") self.module.fail_json(msg="When importing a key, a valid file must be given")
self.import_key(keyfile, dryrun=module.check_mode) self.import_key(keyfile)
if should_cleanup_keyfile: if should_cleanup_keyfile:
self.module.cleanup(keyfile) self.module.cleanup(keyfile)
module.exit_json(changed=True) module.exit_json(changed=True)
else: else:
if self.is_key_imported(keyid): if self.is_key_imported(keyid):
self.drop_key(keyid, dryrun=module.check_mode) self.drop_key(keyid)
module.exit_json(changed=True) module.exit_json(changed=True)
else: else:
module.exit_json(changed=False) module.exit_json(changed=False)
@ -130,14 +134,15 @@ class RpmKey(object):
if not is_pubkey(key): if not is_pubkey(key):
self.module.fail_json(msg="Not a public key: %s" % url) self.module.fail_json(msg="Not a public key: %s" % url)
tmpfd, tmpname = tempfile.mkstemp() tmpfd, tmpname = tempfile.mkstemp()
self.module.add_cleanup_file(tmpname)
tmpfile = os.fdopen(tmpfd, "w+b") tmpfile = os.fdopen(tmpfd, "w+b")
tmpfile.write(key) tmpfile.write(key)
tmpfile.close() tmpfile.close()
return tmpname return tmpname
def normalize_keyid(self, keyid): def normalize_keyid(self, keyid):
"""Ensure a keyid doesn't have a leading 0x, has leading or trailing whitespace, and make sure is lowercase""" """Ensure a keyid doesn't have a leading 0x, has leading or trailing whitespace, and make sure is uppercase"""
ret = keyid.strip().lower() ret = keyid.strip().upper()
if ret.startswith('0x'): if ret.startswith('0x'):
return ret[2:] return ret[2:]
elif ret.startswith('0X'): elif ret.startswith('0X'):
@ -146,21 +151,12 @@ class RpmKey(object):
return ret return ret
def getkeyid(self, keyfile): def getkeyid(self, keyfile):
stdout, stderr = self.execute_command([self.gpg, '--no-tty', '--batch', '--with-colons', '--fixed-list-mode', keyfile])
gpg = self.module.get_bin_path('gpg')
if not gpg:
gpg = self.module.get_bin_path('gpg2')
if not gpg:
self.module.fail_json(msg="rpm_key requires a command line gpg or gpg2, none found")
stdout, stderr = self.execute_command([gpg, '--no-tty', '--batch', '--with-colons', '--fixed-list-mode', '--list-packets', keyfile])
for line in stdout.splitlines(): for line in stdout.splitlines():
line = line.strip() line = line.strip()
if line.startswith(':signature packet:'): if line.startswith('pub:'):
# We want just the last 8 characters of the keyid return line.split(':')[4]
keyid = line.split()[-1].strip()[8:]
return keyid
self.module.fail_json(msg="Unexpected gpg output") self.module.fail_json(msg="Unexpected gpg output")
def is_keyid(self, keystr): def is_keyid(self, keystr):
@ -168,32 +164,26 @@ class RpmKey(object):
return re.match('(0x)?[0-9a-f]{8}', keystr, flags=re.IGNORECASE) return re.match('(0x)?[0-9a-f]{8}', keystr, flags=re.IGNORECASE)
def execute_command(self, cmd): def execute_command(self, cmd):
rc, stdout, stderr = self.module.run_command(cmd) rc, stdout, stderr = self.module.run_command(cmd, use_unsafe_shell=True)
if rc != 0: if rc != 0:
self.module.fail_json(msg=stderr) self.module.fail_json(msg=stderr)
return stdout, stderr return stdout, stderr
def is_key_imported(self, keyid): def is_key_imported(self, keyid):
stdout, stderr = self.execute_command([self.rpm, '-qa', 'gpg-pubkey']) cmd=self.rpm + ' -q gpg-pubkey --qf "%{description}" | ' + self.gpg + ' --no-tty --batch --with-colons --fixed-list-mode -'
stdout, stderr = self.execute_command(cmd)
for line in stdout.splitlines(): for line in stdout.splitlines():
line = line.strip() if keyid in line.split(':')[4]:
if not line:
continue
match = re.match('gpg-pubkey-([0-9a-f]+)-([0-9a-f]+)', line)
if not match:
self.module.fail_json(msg="rpm returned unexpected output [%s]" % line)
else:
if keyid == match.group(1):
return True return True
return False return False
def import_key(self, keyfile, dryrun=False): def import_key(self, keyfile):
if not dryrun: if not self.module.check_mode:
self.execute_command([self.rpm, '--import', keyfile]) self.execute_command([self.rpm, '--import', keyfile])
def drop_key(self, key, dryrun=False): def drop_key(self, keyid):
if not dryrun: if not self.module.check_mode:
self.execute_command([self.rpm, '--erase', '--allmatches', "gpg-pubkey-%s" % key]) self.execute_command([self.rpm, '--erase', '--allmatches', "gpg-pubkey-%s" % keyid[8:].lower()])
def main(): def main():

View file

@ -9,6 +9,11 @@
url: https://download.fedoraproject.org/pub/epel/7/x86_64/s/sl-5.02-1.el7.x86_64.rpm url: https://download.fedoraproject.org/pub/epel/7/x86_64/s/sl-5.02-1.el7.x86_64.rpm
dest: /tmp/sl.rpm dest: /tmp/sl.rpm
- name: download Mono key
get_url:
url: http://keyserver.ubuntu.com/pks/lookup?op=get&search=0x3FA7E0328081BFF6A14DA29AA6A19B38D3D831EF
dest: /tmp/mono.gpg
- name: remove EPEL GPG key from keyring - name: remove EPEL GPG key from keyring
rpm_key: rpm_key:
state: absent state: absent
@ -25,26 +30,41 @@
- "'MISSING KEYS' in sl_check.stdout" - "'MISSING KEYS' in sl_check.stdout"
- "sl_check.failed" - "sl_check.failed"
- name: remove EPEL GPG key from keyring (Idempotant) - name: remove EPEL GPG key from keyring (idempotent)
rpm_key: rpm_key:
state: absent state: absent
key: /tmp/RPM-GPG-KEY-EPEL-7 key: /tmp/RPM-GPG-KEY-EPEL-7
register: idempotant_test register: idempotent_test
- name: check Idempotant - name: check idempontence
assert: assert:
that: "not idempotant_test.changed" that: "not idempotent_test.changed"
- name: add EPEL GPG key to key ring - name: add EPEL GPG key to key ring
rpm_key: rpm_key:
state: present state: present
key: /tmp/RPM-GPG-KEY-EPEL-7 key: /tmp/RPM-GPG-KEY-EPEL-7
- name: add EPEL GPG key to key ring (Idempotant) - name: add EPEL GPG key to key ring (idempotent)
rpm_key: rpm_key:
state: present state: present
key: /tmp/RPM-GPG-KEY-EPEL-7 key: /tmp/RPM-GPG-KEY-EPEL-7
- name: add Mono gpg key
rpm_key:
state: present
key: /tmp/mono.gpg
- name: add Mono gpg key
rpm_key:
state: present
key: /tmp/mono.gpg
register: mono_indempotence
- name: verify idempotence
assert:
that: "not mono_indempotence.changed"
- name: check GPG signature of sl. Should return okay - name: check GPG signature of sl. Should return okay
shell: "rpm --checksig /tmp/sl.rpm" shell: "rpm --checksig /tmp/sl.rpm"
register: sl_check register: sl_check