1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00
community.general/tests/unit/plugins/lookup/test_onepassword.py
Xeryus Stokkel f8652571f7
Support 1Password Connect (#5588) (#7116)
* Support 1Password Connect (#5588)

- Support 1Password Connect with the opv2 client

* Follow pep8, be less explicit

* Update changelog to include PR

* 1Password Connect host and token are now also parameters

* Get argument values from the environment or lookup arguments

* Move imports

* Force using Connect token and host at the same time

* Update unit tests

* Update documentation

* Additional tests
2023-11-16 20:57:11 +01:00

305 lines
10 KiB
Python

# Copyright (c) 2022 Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import operator
import itertools
import json
import pytest
from .onepassword_conftest import ( # noqa: F401, pylint: disable=unused-import
OP_VERSION_FIXTURES,
fake_op,
opv1,
opv2,
)
from .onepassword_common import MOCK_ENTRIES
from ansible.errors import AnsibleLookupError, AnsibleOptionsError
from ansible.plugins.loader import lookup_loader
from ansible_collections.community.general.plugins.lookup.onepassword import (
OnePassCLIv1,
OnePassCLIv2,
)
@pytest.mark.parametrize(
("args", "rc", "expected_call_args", "expected_call_kwargs", "expected"),
(
([], 0, ["get", "account"], {"ignore_errors": True}, True,),
([], 1, ["get", "account"], {"ignore_errors": True}, False,),
(["acme"], 1, ["get", "account", "--account", "acme.1password.com"], {"ignore_errors": True}, False,),
)
)
def test_assert_logged_in_v1(mocker, args, rc, expected_call_args, expected_call_kwargs, expected):
mocker.patch.object(OnePassCLIv1, "_run", return_value=[rc, "", ""])
op_cli = OnePassCLIv1(*args)
result = op_cli.assert_logged_in()
op_cli._run.assert_called_with(expected_call_args, **expected_call_kwargs)
assert result == expected
def test_full_signin_v1(mocker):
mocker.patch.object(OnePassCLIv1, "_run", return_value=[0, "", ""])
op_cli = OnePassCLIv1(
subdomain="acme",
username="bob@acme.com",
secret_key="SECRET",
master_password="ONEKEYTORULETHEMALL",
)
result = op_cli.full_signin()
op_cli._run.assert_called_with([
"signin",
"acme.1password.com",
b"bob@acme.com",
b"SECRET",
"--raw",
], command_input=b"ONEKEYTORULETHEMALL")
assert result == [0, "", ""]
@pytest.mark.parametrize(
("args", "out", "expected_call_args", "expected_call_kwargs", "expected"),
(
([], "list of accounts", ["account", "get"], {"ignore_errors": True}, True,),
(["acme"], "list of accounts", ["account", "get", "--account", "acme.1password.com"], {"ignore_errors": True}, True,),
([], "", ["account", "list"], {}, False,),
)
)
def test_assert_logged_in_v2(mocker, args, out, expected_call_args, expected_call_kwargs, expected):
mocker.patch.object(OnePassCLIv2, "_run", return_value=[0, out, ""])
op_cli = OnePassCLIv2(*args)
result = op_cli.assert_logged_in()
op_cli._run.assert_called_with(expected_call_args, **expected_call_kwargs)
assert result == expected
def test_assert_logged_in_v2_connect():
op_cli = OnePassCLIv2(connect_host="http://localhost:8080", connect_token="foobar")
result = op_cli.assert_logged_in()
assert result
def test_full_signin_v2(mocker):
mocker.patch.object(OnePassCLIv2, "_run", return_value=[0, "", ""])
op_cli = OnePassCLIv2(
subdomain="acme",
username="bob@acme.com",
secret_key="SECRET",
master_password="ONEKEYTORULETHEMALL",
)
result = op_cli.full_signin()
op_cli._run.assert_called_with(
[
"account", "add", "--raw",
"--address", "acme.1password.com",
"--email", b"bob@acme.com",
"--signin",
],
command_input=b"ONEKEYTORULETHEMALL",
environment_update={'OP_SECRET_KEY': 'SECRET'},
)
assert result == [0, "", ""]
@pytest.mark.parametrize(
("version", "version_class"),
(
("1.17.2", OnePassCLIv1),
("2.27.4", OnePassCLIv2),
)
)
def test_op_correct_cli_class(fake_op, version, version_class):
op = fake_op(version)
assert op._cli.version == version
assert isinstance(op._cli, version_class)
def test_op_unsupported_cli_version(fake_op):
with pytest.raises(AnsibleLookupError, match="is unsupported"):
fake_op("99.77.77")
@pytest.mark.parametrize("op_fixture", OP_VERSION_FIXTURES)
def test_op_set_token_with_config(op_fixture, mocker, request):
op = request.getfixturevalue(op_fixture)
token = "F5417F77529B41B595D7F9D6F76EC057"
mocker.patch("os.path.isfile", return_value=True)
mocker.patch.object(op._cli, "signin", return_value=(0, token + "\n", ""))
op.set_token()
assert op.token == token
@pytest.mark.parametrize(
("op_fixture", "message"),
[
(op, value)
for op in OP_VERSION_FIXTURES
for value in
(
"Missing required parameters",
"The operation is unauthorized",
)
]
)
def test_op_set_token_with_config_missing_args(op_fixture, message, request, mocker):
op = request.getfixturevalue(op_fixture)
mocker.patch("os.path.isfile", return_value=True)
mocker.patch.object(op._cli, "signin", return_value=(99, "", ""), side_effect=AnsibleLookupError(message))
mocker.patch.object(op._cli, "full_signin", return_value=(0, "", ""))
with pytest.raises(AnsibleLookupError, match=message):
op.set_token()
op._cli.full_signin.assert_not_called()
@pytest.mark.parametrize("op_fixture", OP_VERSION_FIXTURES)
def test_op_set_token_with_config_full_signin(op_fixture, request, mocker):
op = request.getfixturevalue(op_fixture)
mocker.patch("os.path.isfile", return_value=True)
mocker.patch.object(op._cli, "signin", return_value=(99, "", ""), side_effect=AnsibleLookupError("Raised intentionally"))
mocker.patch.object(op._cli, "full_signin", return_value=(0, "", ""))
op.set_token()
op._cli.full_signin.assert_called()
@pytest.mark.parametrize("op_fixture", OP_VERSION_FIXTURES)
def test_op_set_token_without_config(op_fixture, request, mocker):
op = request.getfixturevalue(op_fixture)
token = "B988E8A2680A4A348962751A96861FA1"
mocker.patch("os.path.isfile", return_value=False)
mocker.patch.object(op._cli, "signin", return_value=(99, "", ""))
mocker.patch.object(op._cli, "full_signin", return_value=(0, token + "\n", ""))
op.set_token()
op._cli.signin.assert_not_called()
assert op.token == token
@pytest.mark.parametrize(
("op_fixture", "login_status"),
[(op, value) for op in OP_VERSION_FIXTURES for value in [False, True]]
)
def test_op_assert_logged_in(mocker, login_status, op_fixture, request):
op = request.getfixturevalue(op_fixture)
mocker.patch.object(op._cli, "assert_logged_in", return_value=login_status)
mocker.patch.object(op, "set_token")
op.assert_logged_in()
op._cli.assert_logged_in.assert_called_once()
assert op.logged_in == login_status
if not login_status:
op.set_token.assert_called_once()
@pytest.mark.parametrize("op_fixture", OP_VERSION_FIXTURES)
def test_op_get_raw_v1(mocker, op_fixture, request):
op = request.getfixturevalue(op_fixture)
mocker.patch.object(op._cli, "get_raw", return_value=[99, "RAW OUTPUT", ""])
result = op.get_raw("some item")
assert result == "RAW OUTPUT"
op._cli.get_raw.assert_called_once()
@pytest.mark.parametrize(
("op_fixture", "output", "expected"),
(
list(itertools.chain([op], d))
for op in OP_VERSION_FIXTURES
for d in [
("RAW OUTPUT", "RAW OUTPUT"),
(None, ""),
("", ""),
]
)
)
def test_op_get_field(mocker, op_fixture, output, expected, request):
op = request.getfixturevalue(op_fixture)
mocker.patch.object(op, "get_raw", return_value=output)
mocker.patch.object(op._cli, "_parse_field", return_value=output)
result = op.get_field("some item", "some field")
assert result == expected
# This test sometimes fails on older Python versions because the gathered tests mismatch.
# Sort the fixture data to make this reliable
# https://github.com/pytest-dev/pytest-xdist/issues/432
@pytest.mark.parametrize(
("cli_class", "vault", "queries", "kwargs", "output", "expected"),
(
(_cli_class, item["vault_name"], item["queries"], item.get("kwargs", {}), item["output"], item["expected"])
for _cli_class in sorted(MOCK_ENTRIES, key=operator.attrgetter("__name__"))
for item in MOCK_ENTRIES[_cli_class]
)
)
def test_op_lookup(mocker, cli_class, vault, queries, kwargs, output, expected):
mocker.patch("ansible_collections.community.general.plugins.lookup.onepassword.OnePass._get_cli_class", cli_class)
mocker.patch("ansible_collections.community.general.plugins.lookup.onepassword.OnePass.assert_logged_in", return_value=True)
mocker.patch("ansible_collections.community.general.plugins.lookup.onepassword.OnePassCLIBase._run", return_value=(0, json.dumps(output), ""))
op_lookup = lookup_loader.get("community.general.onepassword")
result = op_lookup.run(queries, vault=vault, **kwargs)
assert result == expected
@pytest.mark.parametrize("op_fixture", OP_VERSION_FIXTURES)
def test_signin(op_fixture, request):
op = request.getfixturevalue(op_fixture)
op._cli.master_password = "master_pass"
op._cli.signin()
print(op._cli.version)
op._cli._run.assert_called_once_with(['signin', '--raw'], command_input=b"master_pass")
@pytest.mark.parametrize(
("plugin", "connect_host", "connect_token"),
[
(plugin, connect_host, connect_token)
for plugin in ("community.general.onepassword", "community.general.onepassword_raw")
for (connect_host, connect_token) in
(
("http://localhost", None),
(None, "foobar"),
)
]
)
def test_op_connect_partial_args(plugin, connect_host, connect_token):
op_lookup = lookup_loader.get(plugin)
with pytest.raises(AnsibleOptionsError):
op_lookup.run("login", vault_name="test vault", connect_host=connect_host, connect_token=connect_token)
@pytest.mark.parametrize(
("kwargs"),
(
{"connect_host": "http://localhost", "connect_token": "foobar"},
{"service_account_token": "foobar"},
)
)
def test_opv1_unsupported_features(kwargs):
op_cli = OnePassCLIv1(**kwargs)
with pytest.raises(AnsibleLookupError):
op_cli.full_signin()