mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
Implement usb_facts module (#8076)
* First implementation of usb module * Rename to usb_facts * Working tests * Appease linter * Fix author * Remove splitlines arg for python 2.7 compat * indent usb test further * indent usb test a bit less * Remove trailing whitespace * Update plugins/modules/usb_facts.py Co-authored-by: Felix Fontein <felix@fontein.de> * Update plugins/modules/usb_facts.py Co-authored-by: Felix Fontein <felix@fontein.de> * Apply suggestions from code review Co-authored-by: Felix Fontein <felix@fontein.de> * Adjust usb_facts PR based on feedback * Add usage example and extend correct documentation fragment * Add myself to BOTMETA.yml * Apply suggestions from code review Co-authored-by: Felix Fontein <felix@fontein.de> --------- Co-authored-by: Felix Fontein <felix@fontein.de>
This commit is contained in:
parent
73b6b98ed9
commit
6298ad4faa
3 changed files with 220 additions and 0 deletions
2
.github/BOTMETA.yml
vendored
2
.github/BOTMETA.yml
vendored
|
@ -1354,6 +1354,8 @@ files:
|
|||
maintainers: nate-kingsley
|
||||
$modules/urpmi.py:
|
||||
maintainers: pmakowski
|
||||
$modules/usb_facts.py:
|
||||
maintainers: maxopoly
|
||||
$modules/utm_:
|
||||
keywords: sophos utm
|
||||
maintainers: $team_e_spirit
|
||||
|
|
113
plugins/modules/usb_facts.py
Normal file
113
plugins/modules/usb_facts.py
Normal file
|
@ -0,0 +1,113 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (c) 2024, Max Maxopoly <max@dermax.org>
|
||||
#
|
||||
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
|
||||
DOCUMENTATION = '''
|
||||
---
|
||||
module: usb_facts
|
||||
short_description: Allows listing information about USB devices
|
||||
version_added: 8.5.0
|
||||
description:
|
||||
- Allows retrieving information about available USB devices through C(lsusb).
|
||||
author:
|
||||
- Max Maxopoly (@maxopoly)
|
||||
extends_documentation_fragment:
|
||||
- community.general.attributes
|
||||
- community.general.attributes.facts
|
||||
- community.general.attributes.facts_module
|
||||
requirements:
|
||||
- lsusb binary on PATH (usually installed through the package usbutils and preinstalled on many systems)
|
||||
'''
|
||||
|
||||
EXAMPLES = '''
|
||||
- name: Get information about USB devices
|
||||
community.general.usb_facts:
|
||||
|
||||
- name: Print information about USB devices
|
||||
ansible.builtin.debug:
|
||||
msg: "On bus {{ item.bus }} device {{ item.device }} with id {{ item.id }} is {{ item.name }}"
|
||||
loop: "{{ ansible_facts.usb_devices }}"
|
||||
'''
|
||||
|
||||
RETURN = r'''
|
||||
ansible_facts:
|
||||
description: Dictionary containing details of connected USB devices.
|
||||
returned: always
|
||||
type: dict
|
||||
contains:
|
||||
usb_devices:
|
||||
description: A list of USB devices available.
|
||||
returned: always
|
||||
type: list
|
||||
elements: dict
|
||||
contains:
|
||||
bus:
|
||||
description: The bus the usb device is connected to.
|
||||
returned: always
|
||||
type: str
|
||||
sample: "001"
|
||||
device:
|
||||
description: The device number occupied on the bus.
|
||||
returned: always
|
||||
type: str
|
||||
sample: "002"
|
||||
id:
|
||||
description: ID of the USB device.
|
||||
returned: always
|
||||
type: str
|
||||
sample: "1d6b:0002"
|
||||
name:
|
||||
description: Human readable name of the device.
|
||||
returned: always
|
||||
type: str
|
||||
sample: Linux Foundation 2.0 root hub
|
||||
'''
|
||||
|
||||
import re
|
||||
from ansible.module_utils.basic import AnsibleModule
|
||||
|
||||
|
||||
def parse_lsusb(module, lsusb_path):
|
||||
rc, stdout, stderr = module.run_command(lsusb_path, check_rc=True)
|
||||
regex = re.compile(r'^Bus (\d{3}) Device (\d{3}): ID ([0-9a-f]{4}:[0-9a-f]{4}) (.*)$')
|
||||
usb_devices = []
|
||||
for line in stdout.splitlines():
|
||||
match = re.match(regex, line)
|
||||
if not match:
|
||||
module.fail_json(msg="failed to parse unknown lsusb output %s" % (line), stdout=stdout, stderr=stderr)
|
||||
current_device = {
|
||||
'bus': match.group(1),
|
||||
'device': match.group(2),
|
||||
'id': match.group(3),
|
||||
'name': match.group(4)
|
||||
}
|
||||
usb_devices.append(current_device)
|
||||
return_value = {
|
||||
"usb_devices": usb_devices
|
||||
}
|
||||
module.exit_json(msg="parsed %s USB devices" % (len(usb_devices)), stdout=stdout, stderr=stderr, ansible_facts=return_value)
|
||||
|
||||
|
||||
def main():
|
||||
module = AnsibleModule(
|
||||
{},
|
||||
supports_check_mode=True
|
||||
)
|
||||
|
||||
# Set LANG env since we parse stdout
|
||||
module.run_command_environ_update = dict(LANGUAGE='C', LC_ALL='C')
|
||||
|
||||
lsusb_path = module.get_bin_path('lsusb', required=True)
|
||||
parse_lsusb(module, lsusb_path)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
105
tests/unit/plugins/modules/test_usb_facts.py
Normal file
105
tests/unit/plugins/modules/test_usb_facts.py
Normal file
|
@ -0,0 +1,105 @@
|
|||
# Copyright (c) Ansible project
|
||||
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
from __future__ import (absolute_import, division, print_function)
|
||||
__metaclass__ = type
|
||||
|
||||
import json
|
||||
|
||||
from ansible_collections.community.general.tests.unit.compat import mock
|
||||
from ansible_collections.community.general.tests.unit.compat import unittest
|
||||
from ansible.module_utils import basic
|
||||
from ansible.module_utils.common.text.converters import to_bytes
|
||||
from ansible_collections.community.general.plugins.modules import usb_facts
|
||||
|
||||
|
||||
def set_module_args(args):
|
||||
"""prepare arguments so that they will be picked up during module creation"""
|
||||
args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
|
||||
basic._ANSIBLE_ARGS = to_bytes(args)
|
||||
|
||||
|
||||
class AnsibleExitJson(Exception):
|
||||
"""Exception class to be raised by module.exit_json and caught by the test case"""
|
||||
pass
|
||||
|
||||
|
||||
class AnsibleFailJson(Exception):
|
||||
"""Exception class to be raised by module.fail_json and caught by the test case"""
|
||||
pass
|
||||
|
||||
|
||||
def exit_json(*args, **kwargs):
|
||||
"""function to patch over exit_json; package return data into an exception"""
|
||||
if 'changed' not in kwargs:
|
||||
kwargs['changed'] = False
|
||||
raise AnsibleExitJson(kwargs)
|
||||
|
||||
|
||||
def fail_json(*args, **kwargs):
|
||||
"""function to patch over fail_json; package return data into an exception"""
|
||||
kwargs['failed'] = True
|
||||
raise AnsibleFailJson(kwargs)
|
||||
|
||||
|
||||
def get_bin_path(self, arg, required=False):
|
||||
"""Mock AnsibleModule.get_bin_path"""
|
||||
if arg == 'lsusb':
|
||||
return '/usr/bin/lsusb'
|
||||
else:
|
||||
if required:
|
||||
fail_json(msg='%r not found !' % arg)
|
||||
|
||||
|
||||
class TestUsbFacts(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mock_module_helper = mock.patch.multiple(basic.AnsibleModule,
|
||||
exit_json=exit_json,
|
||||
fail_json=fail_json,
|
||||
get_bin_path=get_bin_path)
|
||||
self.mock_module_helper.start()
|
||||
self.addCleanup(self.mock_module_helper.stop)
|
||||
self.testing_data = [
|
||||
{
|
||||
"input": "Bus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub",
|
||||
"bus": "001",
|
||||
"device": "001",
|
||||
"id": "1d6b:0002",
|
||||
"name": "Linux Foundation 2.0 root hub"
|
||||
},
|
||||
{
|
||||
"input": "Bus 003 Device 002: ID 8087:8008 Intel Corp. Integrated Rate Matching Hub",
|
||||
"bus": "003",
|
||||
"device": "002",
|
||||
"id": "8087:8008",
|
||||
"name": "Intel Corp. Integrated Rate Matching Hub"
|
||||
}
|
||||
]
|
||||
self.output_fields = ["bus", "device", "id", "name"]
|
||||
|
||||
def test_parsing_single_line(self):
|
||||
for data in self.testing_data:
|
||||
with mock.patch.object(basic.AnsibleModule, 'run_command') as mock_run_command:
|
||||
command_output = data["input"]
|
||||
mock_run_command.return_value = 0, command_output, None
|
||||
with self.assertRaises(AnsibleExitJson) as result:
|
||||
set_module_args({})
|
||||
usb_facts.main()
|
||||
for output_field in self.output_fields:
|
||||
self.assertEqual(result.exception.args[0]["ansible_facts"]["usb_devices"][0][output_field], data[output_field])
|
||||
|
||||
def test_parsing_multiple_lines(self):
|
||||
input = ""
|
||||
for data in self.testing_data:
|
||||
input += ("%s\n" % data["input"])
|
||||
with mock.patch.object(basic.AnsibleModule, 'run_command') as mock_run_command:
|
||||
mock_run_command.return_value = 0, input, None
|
||||
with self.assertRaises(AnsibleExitJson) as result:
|
||||
set_module_args({})
|
||||
usb_facts.main()
|
||||
for index in range(0, len(self.testing_data)):
|
||||
for output_field in self.output_fields:
|
||||
self.assertEqual(result.exception.args[0]["ansible_facts"]["usb_devices"][index][output_field],
|
||||
self.testing_data[index][output_field])
|
Loading…
Reference in a new issue