1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

redhat_subscription: use D-Bus for registration if possible (#6122)

subscription-manager currently does not have a way to get credentials
(username, password, activation keys, organization ID) in a secure way:
the existing command line parameters can be easily spotted when running
a process listing while 'subscription-manager register' runs.
There is a D-Bus service, which is used by e.g. cockpit and Anaconda to
interface with RHSM (at least for registration and common queries).

Try to perform the registration using D-Bus, in a way very similar to
the work done in convert2rhel [1] (with my help):
- try to do a simple signal test to check whether the system bus works;
  inspired by the login in the dconf module
- pass most of the options as registration options; for the few that are
  not part of the registration, execute 'subscription-manager' manually
- add quirks for differently working (or not) registration options for
  the D-Bus Register*() methods depending on the version of RHEL
- 'subscription-manager register' is used only in case the signal test
  is not working; silent fallback in case of D-Bus errors during the
  registration is not done on purpose to avoid silent fallback to a less
  secure registration

[1] https://github.com/oamg/convert2rhel/pull/540/
This commit is contained in:
Pino Toscano 2023-03-14 22:52:51 +01:00 committed by GitHub
parent a49ad340af
commit e939cd07ef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 226 additions and 2 deletions

View file

@ -0,0 +1,4 @@
minor_changes:
- redhat_subscription - the registration is done using the D-Bus ``rhsm`` service instead of spawning a ``subscription-manager register`` command, if possible;
this avoids passing plain-text credentials as arguments to ``subscription-manager register``, which can be seen while that command runs
(https://github.com/ansible-collections/community.general/pull/6122).

View file

@ -13,9 +13,10 @@ __metaclass__ = type
DOCUMENTATION = '''
---
module: redhat_subscription
short_description: Manage registration and subscriptions to RHSM using the C(subscription-manager) command
short_description: Manage registration and subscriptions to RHSM using C(subscription-manager)
description:
- Manage registration and subscription to the Red Hat Subscription Management entitlement platform using the C(subscription-manager) command
- Manage registration and subscription to the Red Hat Subscription Management entitlement platform using the C(subscription-manager) command,
registering using D-Bus if possible.
author: "Barnaby Court (@barnabycourt)"
notes:
- In order to register a system, subscription-manager requires either a username and password, or an activationkey and an Organization ID.
@ -25,6 +26,8 @@ notes:
config file and default to None.
requirements:
- subscription-manager
- Optionally the C(dbus) Python library; this is usually included in the OS
as it is used by C(subscription-manager).
extends_documentation_fragment:
- community.general.attributes
attributes:
@ -297,6 +300,7 @@ import json
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.common.text.converters import to_native
from ansible.module_utils.six.moves import configparser
from ansible.module_utils import distro
SUBMAN_CMD = None
@ -410,6 +414,35 @@ class Rhsm(RegistrationBase):
else:
return False
def _can_connect_to_dbus(self):
"""
Checks whether it is possible to connect to the system D-Bus bus.
:returns: bool -- whether it is possible to connect to the system D-Bus bus.
"""
try:
# Technically speaking, subscription-manager uses dbus-python
# as D-Bus library, so this ought to work; better be safe than
# sorry, I guess...
import dbus
except ImportError:
self.module.debug('dbus Python module not available, will use CLI')
return False
try:
bus = dbus.SystemBus()
msg = dbus.lowlevel.SignalMessage('/', 'com.example', 'test')
bus.send_message(msg)
bus.flush()
except dbus.exceptions.DBusException as e:
self.module.debug('Failed to connect to system D-Bus bus, will use CLI: %s' % e)
return False
self.module.debug('Verified system D-Bus bus as usable')
return True
def register(self, username, password, token, auto_attach, activationkey, org_id,
consumer_type, consumer_name, consumer_id, force_register, environment,
release):
@ -417,6 +450,28 @@ class Rhsm(RegistrationBase):
Register the current system to the provided RHSM or Red Hat Satellite
or Katello server
Raises:
* Exception - if any error occurs during the registration
'''
# There is no support for token-based registration in the D-Bus API
# of rhsm, so always use the CLI in that case.
if not token and self._can_connect_to_dbus():
self._register_using_dbus(username, password, auto_attach,
activationkey, org_id, consumer_type,
consumer_name, consumer_id,
force_register, environment, release)
return
self._register_using_cli(username, password, token, auto_attach,
activationkey, org_id, consumer_type,
consumer_name, consumer_id,
force_register, environment, release)
def _register_using_cli(self, username, password, token, auto_attach,
activationkey, org_id, consumer_type, consumer_name,
consumer_id, force_register, environment, release):
'''
Register using the 'subscription-manager' command
Raises:
* Exception - if error occurs while running command
'''
@ -459,6 +514,169 @@ class Rhsm(RegistrationBase):
rc, stderr, stdout = self.module.run_command(args, check_rc=True, expand_user_and_vars=False)
def _register_using_dbus(self, username, password, auto_attach,
activationkey, org_id, consumer_type, consumer_name,
consumer_id, force_register, environment, release):
'''
Register using D-Bus (connecting to the rhsm service)
Raises:
* Exception - if error occurs during the D-Bus communication
'''
import dbus
SUBSCRIPTION_MANAGER_LOCALE = 'C'
# Seconds to wait for Registration to complete over DBus;
# 10 minutes should be a pretty generous timeout.
REGISTRATION_TIMEOUT = 600
def str2int(s, default=0):
try:
return int(s)
except ValueError:
return default
distro_id = distro.id()
distro_version = tuple(str2int(p) for p in distro.version_parts())
# Stop the rhsm service when using systemd (which means Fedora or
# RHEL 7+): this is because the service may not use new configuration bits
# - with subscription-manager < 1.26.5-1 (in RHEL < 8.2);
# fixed later by https://github.com/candlepin/subscription-manager/pull/2175
# - sporadically: https://bugzilla.redhat.com/show_bug.cgi?id=2049296
if distro_id == 'fedora' or distro_version[0] >= 7:
cmd = ['systemctl', 'stop', 'rhsm']
self.module.run_command(cmd, check_rc=True, expand_user_and_vars=False)
# While there is a 'force' options for the registration, it is actually
# not implemented (and thus it does not work)
# - in RHEL 7 and earlier
# - in RHEL 8 before 8.8: https://bugzilla.redhat.com/show_bug.cgi?id=2118486
# - in RHEL 9 before 9.2: https://bugzilla.redhat.com/show_bug.cgi?id=2121350
# Hence, use it only when implemented, manually unregistering otherwise.
# Match it on RHEL, since we know about it; other distributions
# will need their own logic.
dbus_force_option_works = False
if (distro_id == 'rhel' and
((distro_version[0] == 8 and distro_version[1] >= 8) or
(distro_version[0] == 9 and distro_version[1] >= 2) or
distro_version[0] > 9)):
dbus_force_option_works = True
if force_register and not dbus_force_option_works:
self.unregister()
register_opts = {}
if consumer_type:
register_opts['consumer_type'] = consumer_type
if consumer_name:
register_opts['name'] = consumer_name
if consumer_id:
register_opts['consumerid'] = consumer_id
if environment:
# The option for environments used to be 'environment' in versions
# of RHEL before 8.6, and then it changed to 'environments'; since
# the Register*() D-Bus functions reject unknown options, we have
# to pass the right option depending on the version -- funky.
environment_key = 'environment'
if distro_id == 'fedora' or \
(distro_id == 'rhel' and
((distro_version[0] == 8 and distro_version[1] >= 6) or
distro_version[0] >= 9)):
environment_key = 'environments'
register_opts[environment_key] = environment
if force_register and dbus_force_option_works:
register_opts['force'] = True
# Wrap it as proper D-Bus dict
register_opts = dbus.Dictionary(register_opts, signature='sv', variant_level=1)
connection_opts = {}
# Wrap it as proper D-Bus dict
connection_opts = dbus.Dictionary(connection_opts, signature='sv', variant_level=1)
bus = dbus.SystemBus()
register_server = bus.get_object('com.redhat.RHSM1',
'/com/redhat/RHSM1/RegisterServer')
address = register_server.Start(
SUBSCRIPTION_MANAGER_LOCALE,
dbus_interface='com.redhat.RHSM1.RegisterServer',
)
try:
# Use the private bus to register the system
self.module.debug('Connecting to the private DBus')
private_bus = dbus.connection.Connection(address)
try:
if activationkey:
args = (
org_id,
[activationkey],
register_opts,
connection_opts,
SUBSCRIPTION_MANAGER_LOCALE,
)
private_bus.call_blocking(
'com.redhat.RHSM1',
'/com/redhat/RHSM1/Register',
'com.redhat.RHSM1.Register',
'RegisterWithActivationKeys',
'sasa{sv}a{sv}s',
args,
timeout=REGISTRATION_TIMEOUT,
)
else:
args = (
org_id or '',
username,
password,
register_opts,
connection_opts,
SUBSCRIPTION_MANAGER_LOCALE,
)
private_bus.call_blocking(
'com.redhat.RHSM1',
'/com/redhat/RHSM1/Register',
'com.redhat.RHSM1.Register',
'Register',
'sssa{sv}a{sv}s',
args,
timeout=REGISTRATION_TIMEOUT,
)
except dbus.exceptions.DBusException as e:
# Sometimes we get NoReply but the registration has succeeded.
# Check the registration status before deciding if this is an error.
if e.get_dbus_name() == 'org.freedesktop.DBus.Error.NoReply':
if not self.is_registered():
# Host is not registered so re-raise the error
raise
else:
raise
# Host was registered so continue
finally:
# Always shut down the private bus
self.module.debug('Shutting down private DBus instance')
register_server.Stop(
SUBSCRIPTION_MANAGER_LOCALE,
dbus_interface='com.redhat.RHSM1.RegisterServer',
)
# Make sure to refresh all the local data: this will fetch all the
# certificates, update redhat.repo, etc.
self.module.run_command([SUBMAN_CMD, 'refresh'],
check_rc=True, expand_user_and_vars=False)
if auto_attach:
args = [SUBMAN_CMD, 'attach', '--auto']
self.module.run_command(args, check_rc=True, expand_user_and_vars=False)
# There is no support for setting the release via D-Bus, so invoke
# the CLI for this.
if release:
args = [SUBMAN_CMD, 'release', '--set', release]
self.module.run_command(args, check_rc=True, expand_user_and_vars=False)
def unsubscribe(self, serials=None):
'''
Unsubscribe a system from subscribed channels

View file

@ -27,6 +27,8 @@ def patch_redhat_subscription(mocker):
mocker.patch('ansible_collections.community.general.plugins.modules.redhat_subscription.unlink', return_value=True)
mocker.patch('ansible_collections.community.general.plugins.modules.redhat_subscription.AnsibleModule.get_bin_path',
return_value='/testbin/subscription-manager')
mocker.patch('ansible_collections.community.general.plugins.modules.redhat_subscription.Rhsm._can_connect_to_dbus',
return_value=False)
@pytest.mark.parametrize('patch_ansible_module', [{}], indirect=['patch_ansible_module'])