diff --git a/.github/BOTMETA.yml b/.github/BOTMETA.yml index 281e55435c..64cbc7021b 100644 --- a/.github/BOTMETA.yml +++ b/.github/BOTMETA.yml @@ -1354,6 +1354,8 @@ files: maintainers: nate-kingsley $modules/urpmi.py: maintainers: pmakowski + $modules/usb_facts.py: + maintainers: maxopoly $modules/utm_: keywords: sophos utm maintainers: $team_e_spirit diff --git a/plugins/modules/usb_facts.py b/plugins/modules/usb_facts.py new file mode 100644 index 0000000000..340c71ee54 --- /dev/null +++ b/plugins/modules/usb_facts.py @@ -0,0 +1,113 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright (c) 2024, Max Maxopoly +# +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = ''' +--- +module: usb_facts +short_description: Allows listing information about USB devices +version_added: 8.5.0 +description: + - Allows retrieving information about available USB devices through C(lsusb). +author: + - Max Maxopoly (@maxopoly) +extends_documentation_fragment: + - community.general.attributes + - community.general.attributes.facts + - community.general.attributes.facts_module +requirements: + - lsusb binary on PATH (usually installed through the package usbutils and preinstalled on many systems) +''' + +EXAMPLES = ''' +- name: Get information about USB devices + community.general.usb_facts: + +- name: Print information about USB devices + ansible.builtin.debug: + msg: "On bus {{ item.bus }} device {{ item.device }} with id {{ item.id }} is {{ item.name }}" + loop: "{{ ansible_facts.usb_devices }}" +''' + +RETURN = r''' +ansible_facts: + description: Dictionary containing details of connected USB devices. + returned: always + type: dict + contains: + usb_devices: + description: A list of USB devices available. + returned: always + type: list + elements: dict + contains: + bus: + description: The bus the usb device is connected to. + returned: always + type: str + sample: "001" + device: + description: The device number occupied on the bus. + returned: always + type: str + sample: "002" + id: + description: ID of the USB device. + returned: always + type: str + sample: "1d6b:0002" + name: + description: Human readable name of the device. + returned: always + type: str + sample: Linux Foundation 2.0 root hub +''' + +import re +from ansible.module_utils.basic import AnsibleModule + + +def parse_lsusb(module, lsusb_path): + rc, stdout, stderr = module.run_command(lsusb_path, check_rc=True) + regex = re.compile(r'^Bus (\d{3}) Device (\d{3}): ID ([0-9a-f]{4}:[0-9a-f]{4}) (.*)$') + usb_devices = [] + for line in stdout.splitlines(): + match = re.match(regex, line) + if not match: + module.fail_json(msg="failed to parse unknown lsusb output %s" % (line), stdout=stdout, stderr=stderr) + current_device = { + 'bus': match.group(1), + 'device': match.group(2), + 'id': match.group(3), + 'name': match.group(4) + } + usb_devices.append(current_device) + return_value = { + "usb_devices": usb_devices + } + module.exit_json(msg="parsed %s USB devices" % (len(usb_devices)), stdout=stdout, stderr=stderr, ansible_facts=return_value) + + +def main(): + module = AnsibleModule( + {}, + supports_check_mode=True + ) + + # Set LANG env since we parse stdout + module.run_command_environ_update = dict(LANGUAGE='C', LC_ALL='C') + + lsusb_path = module.get_bin_path('lsusb', required=True) + parse_lsusb(module, lsusb_path) + + +if __name__ == '__main__': + main() diff --git a/tests/unit/plugins/modules/test_usb_facts.py b/tests/unit/plugins/modules/test_usb_facts.py new file mode 100644 index 0000000000..084433492f --- /dev/null +++ b/tests/unit/plugins/modules/test_usb_facts.py @@ -0,0 +1,105 @@ +# Copyright (c) Ansible project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import json + +from ansible_collections.community.general.tests.unit.compat import mock +from ansible_collections.community.general.tests.unit.compat import unittest +from ansible.module_utils import basic +from ansible.module_utils.common.text.converters import to_bytes +from ansible_collections.community.general.plugins.modules import usb_facts + + +def set_module_args(args): + """prepare arguments so that they will be picked up during module creation""" + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) + + +class AnsibleExitJson(Exception): + """Exception class to be raised by module.exit_json and caught by the test case""" + pass + + +class AnsibleFailJson(Exception): + """Exception class to be raised by module.fail_json and caught by the test case""" + pass + + +def exit_json(*args, **kwargs): + """function to patch over exit_json; package return data into an exception""" + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): + """function to patch over fail_json; package return data into an exception""" + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +def get_bin_path(self, arg, required=False): + """Mock AnsibleModule.get_bin_path""" + if arg == 'lsusb': + return '/usr/bin/lsusb' + else: + if required: + fail_json(msg='%r not found !' % arg) + + +class TestUsbFacts(unittest.TestCase): + + def setUp(self): + self.mock_module_helper = mock.patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json, + get_bin_path=get_bin_path) + self.mock_module_helper.start() + self.addCleanup(self.mock_module_helper.stop) + self.testing_data = [ + { + "input": "Bus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub", + "bus": "001", + "device": "001", + "id": "1d6b:0002", + "name": "Linux Foundation 2.0 root hub" + }, + { + "input": "Bus 003 Device 002: ID 8087:8008 Intel Corp. Integrated Rate Matching Hub", + "bus": "003", + "device": "002", + "id": "8087:8008", + "name": "Intel Corp. Integrated Rate Matching Hub" + } + ] + self.output_fields = ["bus", "device", "id", "name"] + + def test_parsing_single_line(self): + for data in self.testing_data: + with mock.patch.object(basic.AnsibleModule, 'run_command') as mock_run_command: + command_output = data["input"] + mock_run_command.return_value = 0, command_output, None + with self.assertRaises(AnsibleExitJson) as result: + set_module_args({}) + usb_facts.main() + for output_field in self.output_fields: + self.assertEqual(result.exception.args[0]["ansible_facts"]["usb_devices"][0][output_field], data[output_field]) + + def test_parsing_multiple_lines(self): + input = "" + for data in self.testing_data: + input += ("%s\n" % data["input"]) + with mock.patch.object(basic.AnsibleModule, 'run_command') as mock_run_command: + mock_run_command.return_value = 0, input, None + with self.assertRaises(AnsibleExitJson) as result: + set_module_args({}) + usb_facts.main() + for index in range(0, len(self.testing_data)): + for output_field in self.output_fields: + self.assertEqual(result.exception.args[0]["ansible_facts"]["usb_devices"][index][output_field], + self.testing_data[index][output_field])