1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

onepassword - Support v2 (#4728)

* Begin building out separate classes to support  different op cli versions

Create separet base classes for each major version.
Define the main interface in the base class.
Create methods for getting the current version and instantiating the
appropriate class based on the found version.

* First pass at mostly working CLI version classes

* Correct mismathched parameters

* Update _run() method to allow updating enviroment

This allows passing in the app secret as an env var, which is more
secure than using a command line arg.

* Continuing to improve the interface

* Tear existing tests down to the studs

These tests were based off of the LastPass unit tests. I’m going to
just start from scratch given the new plugin code is vastly diffenent.

* Fix sanity test

* CLI config file path can be None

* Improve required param checking

- only report missing params
- use proper grammer based on number of missing params

* Change assert_logged_in() method return value

Return a boolean value indicating whether or not account is signed in

* Improve full login for v2

Have to do a bit of a dance to avoid hitting the interactive prompt
if there are no accounts configured.

* Remove unused methods

* Add some tests

* Fix linting errors

* Move fixtures to separate file

* Restructure mock test data and add more tests

* Add boilerplate

* Add test scenario for op v2 and increase coverage

* Fix up copyright statements

* Test v1 and v2 in all cases

* Use a more descriptive variable name

* Use docstrings rather than pass in abstract class

This adds coverage to abstract methods with the least amount of hackery.

* Increase test coverage for CLI classes

* Sort test parameters to avoid collection errors

* Update version tested in docs

* Revere test parameter sorting for now

The parameters need to be sorted to avoid the issue in older Python
versions in CI, but I’m having trouble working out how to do that
currently.

* Allow passing kwargs to the lookup module under test

* Favor label over id for v2 when looking for values

Add tests

* Display a warning for section on op v2 or greater

There is no “value” in section fields. If we wanted to support sections
in v2, we would also have to allow specifying the field name in
order to override “value”.

* Move test cases to their own file

Getting a bit unwieldy having it in the test file

* Move output into JSON files fore easier reuse

* Switch to using get_options()

* Add licenses for fixture files

* Use get_option() since get_options() was added in Ansible Core 2.12

* Rearrange fixtures

* Add changelog

* Move common classes to module_utils

* Move common classes back to lookup

The plugin relies on AnsibleLookupError() quite a bit which is not available
in module code.

Remove use of display for errors since section isn’t actually deprecated.

* Properly handle sections

Still room for improvement, but this is at least a start.

* Remove some comments that won’t be addressed

* Make test gathering more deterministic to avoid failures

* Update changelog fragment

* Simple fix for making tests reliable
This commit is contained in:
Sam Doran 2022-11-06 05:32:35 -05:00 committed by GitHub
parent 5af84e57e4
commit be0b5e5f8c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 1196 additions and 427 deletions

View file

@ -0,0 +1,2 @@
minor_changes:
- onepassword - support version 2 of the OnePassword CLI (https://github.com/ansible-collections/community.general/pull/4728)

View file

@ -55,7 +55,7 @@ DOCUMENTATION = '''
- This lookup stores potentially sensitive data from 1Password as Ansible facts. - This lookup stores potentially sensitive data from 1Password as Ansible facts.
Facts are subject to caching if enabled, which means this data could be stored in clear text Facts are subject to caching if enabled, which means this data could be stored in clear text
on disk or in a database. on disk or in a database.
- Tested with C(op) version 0.5.3 - Tested with C(op) version 2.7.2
''' '''
EXAMPLES = """ EXAMPLES = """
@ -96,106 +96,123 @@ RETURN = """
elements: str elements: str
""" """
import errno import abc
import json
import os import os
import json
from subprocess import Popen, PIPE import subprocess
from ansible.plugins.lookup import LookupBase from ansible.plugins.lookup import LookupBase
from ansible.errors import AnsibleLookupError from ansible.errors import AnsibleLookupError
from ansible.module_utils.common.process import get_bin_path
from ansible.module_utils.common.text.converters import to_bytes, to_text from ansible.module_utils.common.text.converters import to_bytes, to_text
from ansible.module_utils.six import with_metaclass
from ansible_collections.community.general.plugins.module_utils.onepassword import OnePasswordConfig from ansible_collections.community.general.plugins.module_utils.onepassword import OnePasswordConfig
class OnePass(object): class OnePassCLIBase(with_metaclass(abc.ABCMeta, object)):
def __init__(self, path='op'): bin = "op"
self.cli_path = path
self.logged_in = False
self.token = None
self.subdomain = None
self.domain = None
self.username = None
self.secret_key = None
self.master_password = None
self._config = OnePasswordConfig() def __init__(self, subdomain=None, domain="1password.com", username=None, secret_key=None, master_password=None):
self.subdomain = subdomain
self.domain = domain
self.username = username
self.master_password = master_password
self.secret_key = secret_key
def get_token(self): self._path = None
# If the config file exists, assume an initial signin has taken place and try basic sign in self._version = None
if os.path.isfile(self._config.config_file_path):
if not self.master_password: def _check_required_params(self, required_params):
raise AnsibleLookupError('Unable to sign in to 1Password. master_password is required.') non_empty_attrs = dict((param, getattr(self, param, None)) for param in required_params if getattr(self, param, None))
missing = set(required_params).difference(non_empty_attrs)
if missing:
prefix = "Unable to sign in to 1Password. Missing required parameter"
plural = ""
suffix = ": {params}.".format(params=", ".join(missing))
if len(missing) > 1:
plural = "s"
try: msg = "{prefix}{plural}{suffix}".format(prefix=prefix, plural=plural, suffix=suffix)
args = ['signin', '--output=raw'] raise AnsibleLookupError(msg)
if self.subdomain: @abc.abstractmethod
args = ['signin', self.subdomain, '--output=raw'] def _parse_field(self, data_json, field_name, section_title):
"""Main method for parsing data returned from the op command line tool"""
rc, out, err = self._run(args, command_input=to_bytes(self.master_password)) def _run(self, args, expected_rc=0, command_input=None, ignore_errors=False, environment_update=None):
self.token = out.strip() command = [self.path] + args
call_kwargs = {
"stdout": subprocess.PIPE,
"stderr": subprocess.PIPE,
"stdin": subprocess.PIPE,
}
except AnsibleLookupError: if environment_update:
self.full_login() env = os.environ.copy()
env.update(environment_update)
call_kwargs["env"] = env
else: p = subprocess.Popen(command, **call_kwargs)
# Attempt a full sign in since there appears to be no existing sign in
self.full_login()
def assert_logged_in(self):
try:
rc, out, err = self._run(['get', 'account'], ignore_errors=True)
if rc == 0:
self.logged_in = True
if not self.logged_in:
self.get_token()
except OSError as e:
if e.errno == errno.ENOENT:
raise AnsibleLookupError("1Password CLI tool '%s' not installed in path on control machine" % self.cli_path)
raise e
def get_raw(self, item_id, vault=None):
args = ["get", "item", item_id]
if vault is not None:
args += ['--vault={0}'.format(vault)]
if not self.logged_in:
args += [to_bytes('--session=') + self.token]
rc, output, dummy = self._run(args)
return output
def get_field(self, item_id, field, section=None, vault=None):
output = self.get_raw(item_id, vault)
return self._parse_field(output, field, section) if output != '' else ''
def full_login(self):
if None in [self.subdomain, self.username, self.secret_key, self.master_password]:
raise AnsibleLookupError('Unable to perform initial sign in to 1Password. '
'subdomain, username, secret_key, and master_password are required to perform initial sign in.')
args = [
'signin',
'{0}.{1}'.format(self.subdomain, self.domain),
to_bytes(self.username),
to_bytes(self.secret_key),
'--output=raw',
]
rc, out, err = self._run(args, command_input=to_bytes(self.master_password))
self.token = out.strip()
def _run(self, args, expected_rc=0, command_input=None, ignore_errors=False):
command = [self.cli_path] + args
p = Popen(command, stdout=PIPE, stderr=PIPE, stdin=PIPE)
out, err = p.communicate(input=command_input) out, err = p.communicate(input=command_input)
rc = p.wait() rc = p.wait()
if not ignore_errors and rc != expected_rc: if not ignore_errors and rc != expected_rc:
raise AnsibleLookupError(to_text(err)) raise AnsibleLookupError(to_text(err))
return rc, out, err return rc, out, err
def _parse_field(self, data_json, field_name, section_title=None): @abc.abstractmethod
def assert_logged_in(self):
"""Check whether a login session exists"""
@abc.abstractmethod
def full_signin(self):
"""Performa full login"""
@abc.abstractmethod
def get_raw(self, item_id, vault=None, token=None):
"""Gets the specified item from the vault"""
@abc.abstractmethod
def signin(self):
"""Sign in using the master password"""
@property
def path(self):
if self._path is None:
self._path = get_bin_path(self.bin)
return self._path
@property
def version(self):
if self._version is None:
self._version = self.get_current_version()
return self._version
@classmethod
def get_current_version(cls):
"""Standalone method to get the op CLI version. Useful when determining which class to load
based on the current version."""
try:
bin_path = get_bin_path(cls.bin)
except ValueError:
raise AnsibleLookupError("Unable to locate '%s' command line tool" % cls.bin)
try:
b_out = subprocess.check_output([bin_path, "--version"], stderr=subprocess.PIPE)
except subprocess.CalledProcessError as cpe:
raise AnsibleLookupError("Unable to get the op version: %s" % cpe)
return to_text(b_out).strip()
class OnePassCLIv1(OnePassCLIBase):
supports_version = "1"
def _parse_field(self, data_json, field_name, section_title):
""" """
Retrieves the desired field from the `op` response payload Retrieves the desired field from the `op` response payload
@ -249,36 +266,356 @@ class OnePass(object):
# check the details dictionary for `field_name` and return it immediately if it exists # check the details dictionary for `field_name` and return it immediately if it exists
# when the entry is a "password" instead of a "login" item, the password field is a key # when the entry is a "password" instead of a "login" item, the password field is a key
# in the `details` dictionary: # in the `details` dictionary:
if field_name in data['details']: if field_name in data["details"]:
return data['details'][field_name] return data["details"][field_name]
# when the field is not found above, iterate through the fields list in the object details # when the field is not found above, iterate through the fields list in the object details
for field_data in data['details'].get('fields', []): for field_data in data["details"].get("fields", []):
if field_data.get('name', '').lower() == field_name.lower(): if field_data.get("name", "").lower() == field_name.lower():
return field_data.get('value', '') return field_data.get("value", "")
for section_data in data['details'].get('sections', []):
if section_title is not None and section_title.lower() != section_data['title'].lower(): for section_data in data["details"].get("sections", []):
if section_title is not None and section_title.lower() != section_data["title"].lower():
continue continue
for field_data in section_data.get('fields', []):
if field_data.get('t', '').lower() == field_name.lower(): for field_data in section_data.get("fields", []):
return field_data.get('v', '') if field_data.get("t", "").lower() == field_name.lower():
return '' return field_data.get("v", "")
return ""
def assert_logged_in(self):
args = ["get", "account"]
if self.subdomain:
account = "{subdomain}.{domain}".format(subdomain=self.subdomain, domain=self.domain)
args.extend(["--account", account])
rc, out, err = self._run(args, ignore_errors=True)
return not bool(rc)
def full_signin(self):
required_params = [
"subdomain",
"username",
"secret_key",
"master_password",
]
self._check_required_params(required_params)
args = [
"signin",
"{0}.{1}".format(self.subdomain, self.domain),
to_bytes(self.username),
to_bytes(self.secret_key),
"--raw",
]
return self._run(args, command_input=to_bytes(self.master_password))
def get_raw(self, item_id, vault=None, token=None):
args = ["get", "item", item_id]
if vault is not None:
args += ["--vault={0}".format(vault)]
if token is not None:
args += [to_bytes("--session=") + token]
return self._run(args)
def signin(self):
self._check_required_params(['master_password'])
args = ["signin", "--raw"]
if self.subdomain:
args.append(self.subdomain)
return self._run(args, command_input=to_bytes(self.master_password))
class OnePassCLIv2(OnePassCLIBase):
"""
CLIv2 Syntax Reference: https://developer.1password.com/docs/cli/upgrade#step-2-update-your-scripts
"""
supports_version = "2"
def _parse_field(self, data_json, field_name, section_title=None):
"""
Schema reference: https://developer.1password.com/docs/cli/item-template-json
Example Data:
# Password item
{
"id": "ywvdbojsguzgrgnokmcxtydgdv",
"title": "Authy Backup",
"version": 1,
"vault": {
"id": "bcqxysvcnejjrwzoqrwzcqjqxc",
"name": "Personal"
},
"category": "PASSWORD",
"last_edited_by": "7FUPZ8ZNE02KSHMAIMKHIVUE17",
"created_at": "2015-01-18T13:13:38Z",
"updated_at": "2016-02-20T16:23:54Z",
"additional_information": "Jan 18, 2015, 08:13:38",
"fields": [
{
"id": "password",
"type": "CONCEALED",
"purpose": "PASSWORD",
"label": "password",
"value": "OctoberPoppyNuttyDraperySabbath",
"reference": "op://Personal/Authy Backup/password",
"password_details": {
"strength": "FANTASTIC"
}
},
{
"id": "notesPlain",
"type": "STRING",
"purpose": "NOTES",
"label": "notesPlain",
"value": "Backup password to restore Authy",
"reference": "op://Personal/Authy Backup/notesPlain"
}
]
}
# Login item
{
"id": "awk4s2u44fhnrgppszcsvc663i",
"title": "Dummy Login",
"version": 2,
"vault": {
"id": "stpebbaccrq72xulgouxsk4p7y",
"name": "Personal"
},
"category": "LOGIN",
"last_edited_by": "LSGPJERUYBH7BFPHMZ2KKGL6AU",
"created_at": "2018-04-25T21:55:19Z",
"updated_at": "2018-04-25T21:56:06Z",
"additional_information": "agent.smith",
"urls": [
{
"primary": true,
"href": "https://acme.com"
}
],
"sections": [
{
"id": "linked items",
"label": "Related Items"
}
],
"fields": [
{
"id": "username",
"type": "STRING",
"purpose": "USERNAME",
"label": "username",
"value": "agent.smith",
"reference": "op://Personal/Dummy Login/username"
},
{
"id": "password",
"type": "CONCEALED",
"purpose": "PASSWORD",
"label": "password",
"value": "Q7vFwTJcqwxKmTU]Dzx7NW*wrNPXmj",
"entropy": 159.6083697084228,
"reference": "op://Personal/Dummy Login/password",
"password_details": {
"entropy": 159,
"generated": true,
"strength": "FANTASTIC"
}
},
{
"id": "notesPlain",
"type": "STRING",
"purpose": "NOTES",
"label": "notesPlain",
"reference": "op://Personal/Dummy Login/notesPlain"
}
]
}
"""
data = json.loads(data_json)
for field in data.get("fields", []):
if section_title is None:
# If the field name exists in the section, return that value
if field.get(field_name):
return field.get(field_name)
# If the field name doesn't exist in the section, match on the value of "label"
# then "id" and return "value"
if field.get("label") == field_name:
return field["value"]
if field.get("id") == field_name:
return field["value"]
# Look at the section data and get an indentifier. The value of 'id' is either a unique ID
# or a human-readable string. If a 'label' field exists, prefer that since
# it is the value visible in the 1Password UI when both 'id' and 'label' exist.
section = field.get("section", {})
current_section_title = section.get("label", section.get("id"))
if section_title == current_section_title:
# In the correct section. Check "label" then "id" for the desired field_name
if field.get("label") == field_name:
return field["value"]
if field.get("id") == field_name:
return field["value"]
return ""
def assert_logged_in(self):
args = ["account", "list"]
if self.subdomain:
account = "{subdomain}.{domain}".format(subdomain=self.subdomain, domain=self.domain)
args.extend(["--account", account])
rc, out, err = self._run(args)
if out:
# Running 'op account get' if there are no accounts configured on the system drops into
# an interactive prompt. Only run 'op account get' after first listing accounts to see
# if there are any previously configured accounts.
args = ["account", "get"]
if self.subdomain:
account = "{subdomain}.{domain}".format(subdomain=self.subdomain, domain=self.domain)
args.extend(["--account", account])
rc, out, err = self._run(args)
return not bool(rc)
return False
def full_signin(self):
required_params = [
"subdomain",
"username",
"secret_key",
"master_password",
]
self._check_required_params(required_params)
args = [
"account", "add", "--raw",
"--address", "{0}.{1}".format(self.subdomain, self.domain),
"--email", to_bytes(self.username),
"--signin",
]
environment_update = {"OP_SECRET_KEY": self.secret_key}
return self._run(args, command_input=to_bytes(self.master_password), environment_update=environment_update)
def get_raw(self, item_id, vault=None, token=None):
args = ["item", "get", item_id, "--format", "json"]
if vault is not None:
args += ["--vault={0}".format(vault)]
if token is not None:
args += [to_bytes("--session=") + token]
return self._run(args)
def signin(self):
self._check_required_params(['master_password'])
args = ["signin", "--raw"]
if self.subdomain:
args.extend(["--account", self.subdomain])
return self._run(args, command_input=to_bytes(self.master_password))
class OnePass(object):
def __init__(self, subdomain=None, domain="1password.com", username=None, secret_key=None, master_password=None):
self.subdomain = subdomain
self.domain = domain
self.username = username
self.secret_key = secret_key
self.master_password = master_password
self.logged_in = False
self.token = None
self._config = OnePasswordConfig()
self._cli = self._get_cli_class()
def _get_cli_class(self):
version = OnePassCLIBase.get_current_version()
for cls in OnePassCLIBase.__subclasses__():
if cls.supports_version == version.split(".")[0]:
try:
return cls(self.subdomain, self.domain, self.username, self.secret_key, self.master_password)
except TypeError as e:
raise AnsibleLookupError(e)
raise AnsibleLookupError("op version %s is unsupported" % version)
def set_token(self):
if self._config.config_file_path and os.path.isfile(self._config.config_file_path):
# If the config file exists, assume an initial sign in has taken place and try basic sign in
try:
rc, out, err = self._cli.signin()
except AnsibleLookupError as exc:
test_strings = (
"missing required parameters",
"unauthorized",
)
if any(string in exc.message.lower() for string in test_strings):
# A required parameter is missing, or a bad master password was supplied
# so don't bother attempting a full signin
raise
rc, out, err = self._cli.full_signin()
self.token = out.strip()
else:
# Attempt a full signin since there appears to be no existing signin
rc, out, err = self._cli.full_signin()
self.token = out.strip()
def assert_logged_in(self):
logged_in = self._cli.assert_logged_in()
if logged_in:
self.logged_in = logged_in
pass
else:
self.set_token()
def get_raw(self, item_id, vault=None):
rc, out, err = self._cli.get_raw(item_id, vault, self.token)
return out
def get_field(self, item_id, field, section=None, vault=None):
output = self.get_raw(item_id, vault)
if output:
return self._cli._parse_field(output, field, section)
return ""
class LookupModule(LookupBase): class LookupModule(LookupBase):
def run(self, terms, variables=None, **kwargs): def run(self, terms, variables=None, **kwargs):
op = OnePass() self.set_options(var_options=variables, direct=kwargs)
field = kwargs.get('field', 'password') field = self.get_option("field")
section = kwargs.get('section') section = self.get_option("section")
vault = kwargs.get('vault') vault = self.get_option("vault")
op.subdomain = kwargs.get('subdomain') subdomain = self.get_option("subdomain")
op.domain = kwargs.get('domain', '1password.com') domain = self.get_option("domain")
op.username = kwargs.get('username') username = self.get_option("username")
op.secret_key = kwargs.get('secret_key') secret_key = self.get_option("secret_key")
op.master_password = kwargs.get('master_password', kwargs.get('vault_password')) master_password = self.get_option("master_password")
op = OnePass(subdomain, domain, username, secret_key, master_password)
op.assert_logged_in() op.assert_logged_in()
values = [] values = []

View file

@ -47,7 +47,7 @@ DOCUMENTATION = '''
- This lookup stores potentially sensitive data from 1Password as Ansible facts. - This lookup stores potentially sensitive data from 1Password as Ansible facts.
Facts are subject to caching if enabled, which means this data could be stored in clear text Facts are subject to caching if enabled, which means this data could be stored in clear text
on disk or in a database. on disk or in a database.
- Tested with C(op) version 0.5.3 - Tested with C(op) version 2.7.0
''' '''
EXAMPLES = """ EXAMPLES = """
@ -76,18 +76,21 @@ from ansible.plugins.lookup import LookupBase
class LookupModule(LookupBase): class LookupModule(LookupBase):
def run(self, terms, variables=None, **kwargs): def run(self, terms, variables=None, **kwargs):
op = OnePass() self.set_options(var_options=variables, direct=kwargs)
vault = kwargs.get('vault') vault = self.get_option("vault")
op.subdomain = kwargs.get('subdomain') subdomain = self.get_option("subdomain")
op.username = kwargs.get('username') domain = self.get_option("domain", "1password.com")
op.secret_key = kwargs.get('secret_key') username = self.get_option("username")
op.master_password = kwargs.get('master_password', kwargs.get('vault_password')) secret_key = self.get_option("secret_key")
master_password = self.get_option("master_password")
op = OnePass(subdomain, domain, username, secret_key, master_password)
op.assert_logged_in() op.assert_logged_in()
values = [] values = []
for term in terms: for term in terms:
data = json.loads(op.get_raw(term, vault)) data = json.loads(op.get_raw(term, vault))
values.append(data) values.append(data)
return values return values

View file

@ -0,0 +1,85 @@
# Copyright (c) 2022 Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import json
from ansible_collections.community.general.plugins.lookup.onepassword import (
OnePassCLIv1,
OnePassCLIv2,
)
def load_file(file):
with open((os.path.join(os.path.dirname(__file__), "fixtures", file)), "r") as f:
return json.loads(f.read())
# Intentionally excludes metadata leaf nodes that would exist in real output if not relevant.
MOCK_ENTRIES = {
OnePassCLIv1: [
{
'vault_name': 'Acme "Quot\'d" Servers',
'queries': [
'0123456789',
'Mock "Quot\'d" Server'
],
'expected': ['t0pS3cret', 't0pS3cret'],
'output': load_file("v1_out_01.json"),
},
{
'vault_name': 'Acme Logins',
'queries': [
'9876543210',
'Mock Website',
'acme.com'
],
'expected': ['t0pS3cret', 't0pS3cret', 't0pS3cret'],
'output': load_file("v1_out_02.json"),
},
{
'vault_name': 'Acme Logins',
'queries': [
'864201357'
],
'expected': ['vauxhall'],
'output': load_file("v1_out_03.json"),
},
],
OnePassCLIv2: [
{
"vault_name": "Test Vault",
"queries": [
"ywvdbojsguzgrgnokmcxtydgdv",
"Authy Backup",
],
"expected": ["OctoberPoppyNuttyDraperySabbath", "OctoberPoppyNuttyDraperySabbath"],
"output": load_file("v2_out_01.json"),
},
{
# Request a custom field where ID and label are different
"vault_name": "Test Vault",
"queries": ["Dummy Login"],
"kwargs": {
"field": "password1",
},
"expected": ["data in custom field"],
"output": load_file("v2_out_02.json")
},
{
# Request data from a custom section
"vault_name": "Test Vault",
"queries": ["Duplicate Sections"],
"kwargs": {
"field": "s2 text",
"section": "Section 2",
},
"expected": ["first value"],
"output": load_file("v2_out_03.json")
},
],
}

View file

@ -0,0 +1,39 @@
# Copyright (c) 2020 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import pytest
from ansible_collections.community.general.plugins.lookup.onepassword import OnePass
OP_VERSION_FIXTURES = [
"opv1",
"opv2"
]
@pytest.fixture
def fake_op(mocker):
def _fake_op(version):
mocker.patch("ansible_collections.community.general.plugins.lookup.onepassword.OnePassCLIBase.get_current_version", return_value=version)
op = OnePass(None, None, None, None, None)
op._config._config_file_path = "/home/jin/.op/config"
mocker.patch.object(op._cli, "_run")
return op
return _fake_op
@pytest.fixture
def opv1(fake_op):
return fake_op("1.17.2")
@pytest.fixture
def opv2(fake_op):
return fake_op("2.27.2")

View file

@ -0,0 +1,18 @@
{
"uuid": "0123456789",
"vaultUuid": "2468",
"overview": {
"title": "Mock \"Quot'd\" Server"
},
"details": {
"sections": [{
"title": "",
"fields": [
{"t": "username", "v": "jamesbond"},
{"t": "password", "v": "t0pS3cret"},
{"t": "notes", "v": "Test note with\nmultiple lines and trailing space.\n\n"},
{"t": "tricksy \"quot'd\" field\\", "v": "\"quot'd\" value"}
]
}]
}
}

View file

@ -0,0 +1,3 @@
GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
SPDX-License-Identifier: GPL-3.0-or-later
SPDX-FileCopyrightText: 2022, Ansible Project

View file

@ -0,0 +1,18 @@
{
"uuid": "9876543210",
"vaultUuid": "1357",
"overview": {
"title": "Mock Website",
"URLs": [
{"l": "website", "u": "https://acme.com/login"}
]
},
"details": {
"sections": [{
"title": "",
"fields": [
{"t": "password", "v": "t0pS3cret"}
]
}]
}
}

View file

@ -0,0 +1,3 @@
GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
SPDX-License-Identifier: GPL-3.0-or-later
SPDX-FileCopyrightText: 2022, Ansible Project

View file

@ -0,0 +1,20 @@
{
"uuid": "864201357",
"vaultUuid": "1357",
"overview": {
"title": "Mock Something"
},
"details": {
"fields": [
{
"value": "jbond@mi6.gov.uk",
"name": "emailAddress"
},
{
"name": "password",
"value": "vauxhall"
},
{}
]
}
}

View file

@ -0,0 +1,3 @@
GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
SPDX-License-Identifier: GPL-3.0-or-later
SPDX-FileCopyrightText: 2022, Ansible Project

View file

@ -0,0 +1,35 @@
{
"id": "ywvdbojsguzgrgnokmcxtydgdv",
"title": "Authy Backup",
"version": 1,
"vault": {
"id": "bcqxysvcnejjrwzoqrwzcqjqxc",
"name": "test vault"
},
"category": "PASSWORD",
"last_edited_by": "7FUPZ8ZNE02KSHMAIMKHIVUE17",
"created_at": "2015-01-18T13:13:38Z",
"updated_at": "2016-02-20T16:23:54Z",
"additional_information": "Jan 18, 2015, 08:13:38",
"fields": [
{
"id": "password",
"type": "CONCEALED",
"purpose": "PASSWORD",
"label": "password",
"value": "OctoberPoppyNuttyDraperySabbath",
"reference": "op://Test Vault/Authy Backup/password",
"password_details": {
"strength": "FANTASTIC"
}
},
{
"id": "notesPlain",
"type": "STRING",
"purpose": "NOTES",
"label": "notesPlain",
"value": "Backup password to restore Authy",
"reference": "op://Test Vault/Authy Backup/notesPlain"
}
]
}

View file

@ -0,0 +1,3 @@
GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
SPDX-License-Identifier: GPL-3.0-or-later
SPDX-FileCopyrightText: 2022, Ansible Project

View file

@ -0,0 +1,85 @@
{
"id": "awk4s2u44fhnrgppszcsvc663i",
"title": "Dummy Login",
"version": 4,
"vault": {
"id": "stpebbaccrq72xulgouxsk4p7y",
"name": "Personal"
},
"category": "LOGIN",
"last_edited_by": "LSGPJERUYBH7BFPHMZ2KKGL6AU",
"created_at": "2018-04-25T21:55:19Z",
"updated_at": "2022-09-02T17:51:21Z",
"additional_information": "agent.smith",
"urls": [
{
"primary": true,
"href": "https://acme.com"
}
],
"sections": [
{
"id": "add more"
},
{
"id": "gafaeg7vnqmgrklw5r6yrufyxy",
"label": "COMMANDS"
},
{
"id": "linked items",
"label": "Related Items"
}
],
"fields": [
{
"id": "username",
"type": "STRING",
"purpose": "USERNAME",
"label": "username",
"value": "agent.smith",
"reference": "op://Personal/Dummy Login/username"
},
{
"id": "password",
"type": "CONCEALED",
"purpose": "PASSWORD",
"label": "password",
"value": "FootworkDegreeReverence",
"entropy": 159.60836791992188,
"reference": "op://Personal/Dummy Login/password",
"password_details": {
"entropy": 159,
"generated": true,
"strength": "FANTASTIC"
}
},
{
"id": "notesPlain",
"type": "STRING",
"purpose": "NOTES",
"label": "notesPlain",
"reference": "op://Personal/Dummy Login/notesPlain"
},
{
"id": "7gyjekelk24ghgd4rvafspjbli",
"section": {
"id": "add more"
},
"type": "STRING",
"label": "title",
"value": "value of the field",
"reference": "op://Personal/Dummy Login/add more/title"
},
{
"id": "fx4wpzokrxn7tlb3uwpdjfptgm",
"section": {
"id": "gafaeg7vnqmgrklw5r6yrufyxy",
"label": "COMMANDS"
},
"type": "CONCEALED",
"label": "password1",
"value": "data in custom field",
"reference": "op://Personal/Dummy Login/COMMANDS/password1"
}
]
}

View file

@ -0,0 +1,3 @@
GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
SPDX-License-Identifier: GPL-3.0-or-later
SPDX-FileCopyrightText: 2022, Ansible Project

View file

@ -0,0 +1,103 @@
{
"id": "7t7qu2r35qyvqj3crujd4dqxmy",
"title": "Duplicate Sections",
"version": 3,
"vault": {
"id": "stpebbaccrq72xulgouxsk4p7y",
"name": "Personal"
},
"category": "LOGIN",
"last_edited_by": "LSGPJERUYBH7BFPHMZ2KKGL6AU",
"created_at": "2022-11-04T17:09:18Z",
"updated_at": "2022-11-04T17:22:19Z",
"additional_information": "flora",
"urls": [
{
"label": "website",
"primary": true,
"href": "https://acme.com/login"
}
],
"sections": [
{
"id": "add more"
},
{
"id": "7osqcvd43i75teocdzbb6d7mie",
"label": "Section 2"
}
],
"fields": [
{
"id": "username",
"type": "STRING",
"purpose": "USERNAME",
"label": "username",
"value": "flora",
"reference": "op://Personal/Duplicate Sections/username"
},
{
"id": "password",
"type": "CONCEALED",
"purpose": "PASSWORD",
"label": "password",
"value": "PtZGFLAibx-erTo7ywywEvh-n4syas97n-tuF2D.b8DdqA2vCjrvRGkNQxj!Gi9R",
"entropy": 379.564697265625,
"reference": "op://Personal/Duplicate Sections/password",
"password_details": {
"entropy": 379,
"generated": true,
"strength": "FANTASTIC"
}
},
{
"id": "notesPlain",
"type": "STRING",
"purpose": "NOTES",
"label": "notesPlain",
"reference": "op://Personal/Duplicate Sections/notesPlain"
},
{
"id": "4saaazkb7arwisj6ysctb4jmm4",
"section": {
"id": "add more"
},
"type": "STRING",
"label": "text",
"value": "text field the first",
"reference": "op://Personal/Duplicate Sections/add more/text"
},
{
"id": "4vtfkj4bwcmg7d5uf62wnpkp3a",
"section": {
"id": "add more"
},
"type": "STRING",
"label": "text",
"value": "text field the second",
"reference": "op://Personal/Duplicate Sections/add more/text"
},
{
"id": "wbrjnowkrgavpooomtht36gjqu",
"section": {
"id": "7osqcvd43i75teocdzbb6d7mie",
"label": "Section 2"
},
"type": "STRING",
"label": "s2 text",
"value": "first value",
"reference": "op://Personal/Duplicate Sections/Section 2/s2 text"
},
{
"id": "bddlz2fj2pebmtfhksbmcexy7m",
"section": {
"id": "7osqcvd43i75teocdzbb6d7mie",
"label": "Section 2"
},
"type": "STRING",
"label": "s2 text",
"value": "second value",
"reference": "op://Personal/Duplicate Sections/Section 2/s2 text"
}
]
}

View file

@ -0,0 +1,3 @@
GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
SPDX-License-Identifier: GPL-3.0-or-later
SPDX-FileCopyrightText: 2022, Ansible Project

View file

@ -0,0 +1,182 @@
# Copyright (c) 2020 Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import itertools
import json
import pytest
from .conftest import OP_VERSION_FIXTURES
from .common import MOCK_ENTRIES
from ansible.errors import AnsibleLookupError
from ansible.plugins.loader import lookup_loader
from ansible_collections.community.general.plugins.lookup.onepassword import (
OnePassCLIv1,
OnePassCLIv2,
)
@pytest.mark.parametrize(
("version", "version_class"),
(
("1.17.2", OnePassCLIv1),
("2.27.4", OnePassCLIv2),
)
)
def test_op_correct_cli_class(fake_op, version, version_class):
op = fake_op(version)
assert op._cli.version == version
assert isinstance(op._cli, version_class)
def test_op_unsupported_cli_version(fake_op):
with pytest.raises(AnsibleLookupError, match="is unsupported"):
fake_op("99.77.77")
@pytest.mark.parametrize("op_fixture", OP_VERSION_FIXTURES)
def test_op_set_token_with_config(op_fixture, mocker, request):
op = request.getfixturevalue(op_fixture)
token = "F5417F77529B41B595D7F9D6F76EC057"
mocker.patch("os.path.isfile", return_value=True)
mocker.patch.object(op._cli, "signin", return_value=(0, token + "\n", ""))
op.set_token()
assert op.token == token
@pytest.mark.parametrize(
("op_fixture", "message"),
[
(op, value)
for op in OP_VERSION_FIXTURES
for value in
(
"Missing required parameters",
"The operation is unauthorized",
)
]
)
def test_op_set_token_with_config_missing_args(op_fixture, message, request, mocker):
op = request.getfixturevalue(op_fixture)
mocker.patch("os.path.isfile", return_value=True)
mocker.patch.object(op._cli, "signin", return_value=(99, "", ""), side_effect=AnsibleLookupError(message))
mocker.patch.object(op._cli, "full_signin", return_value=(0, "", ""))
with pytest.raises(AnsibleLookupError, match=message):
op.set_token()
op._cli.full_signin.assert_not_called()
@pytest.mark.parametrize("op_fixture", OP_VERSION_FIXTURES)
def test_op_set_token_with_config_full_signin(op_fixture, request, mocker):
op = request.getfixturevalue(op_fixture)
mocker.patch("os.path.isfile", return_value=True)
mocker.patch.object(op._cli, "signin", return_value=(99, "", ""), side_effect=AnsibleLookupError("Raised intentionally"))
mocker.patch.object(op._cli, "full_signin", return_value=(0, "", ""))
op.set_token()
op._cli.full_signin.assert_called()
@pytest.mark.parametrize("op_fixture", OP_VERSION_FIXTURES)
def test_op_set_token_without_config(op_fixture, request, mocker):
op = request.getfixturevalue(op_fixture)
token = "B988E8A2680A4A348962751A96861FA1"
mocker.patch("os.path.isfile", return_value=False)
mocker.patch.object(op._cli, "signin", return_value=(99, "", ""))
mocker.patch.object(op._cli, "full_signin", return_value=(0, token + "\n", ""))
op.set_token()
op._cli.signin.assert_not_called()
assert op.token == token
@pytest.mark.parametrize(
("op_fixture", "login_status"),
[(op, value) for op in OP_VERSION_FIXTURES for value in [False, True]]
)
def test_op_assert_logged_in(mocker, login_status, op_fixture, request):
op = request.getfixturevalue(op_fixture)
mocker.patch.object(op._cli, "assert_logged_in", return_value=login_status)
mocker.patch.object(op, "set_token")
op.assert_logged_in()
op._cli.assert_logged_in.assert_called_once()
assert op.logged_in == login_status
if not login_status:
op.set_token.assert_called_once()
@pytest.mark.parametrize("op_fixture", OP_VERSION_FIXTURES)
def test_op_get_raw_v1(mocker, op_fixture, request):
op = request.getfixturevalue(op_fixture)
mocker.patch.object(op._cli, "get_raw", return_value=[99, "RAW OUTPUT", ""])
result = op.get_raw("some item")
assert result == "RAW OUTPUT"
op._cli.get_raw.assert_called_once()
@pytest.mark.parametrize(
("op_fixture", "output", "expected"),
(
list(itertools.chain([op], d))
for op in OP_VERSION_FIXTURES
for d in [
("RAW OUTPUT", "RAW OUTPUT"),
(None, ""),
("", ""),
]
)
)
def test_op_get_field(mocker, op_fixture, output, expected, request):
op = request.getfixturevalue(op_fixture)
mocker.patch.object(op, "get_raw", return_value=output)
mocker.patch.object(op._cli, "_parse_field", return_value=output)
result = op.get_field("some item", "some field")
assert result == expected
# This test sometimes fails on older Python versions because the gathered tests mismatch.
# Sort the fixture data to make this reliable
# https://github.com/pytest-dev/pytest-xdist/issues/432
@pytest.mark.parametrize(
("cli_class", "vault", "queries", "kwargs", "output", "expected"),
(
(_cli_class, item["vault_name"], item["queries"], item.get("kwargs", {}), item["output"], item["expected"])
for _cli_class in MOCK_ENTRIES
for item in MOCK_ENTRIES[_cli_class]
)
)
def test_op_lookup(mocker, cli_class, vault, queries, kwargs, output, expected):
mocker.patch("ansible_collections.community.general.plugins.lookup.onepassword.OnePass._get_cli_class", cli_class)
mocker.patch("ansible_collections.community.general.plugins.lookup.onepassword.OnePass.assert_logged_in", return_value=True)
mocker.patch("ansible_collections.community.general.plugins.lookup.onepassword.OnePassCLIBase._run", return_value=(0, json.dumps(output), ""))
op_lookup = lookup_loader.get("community.general.onepassword")
result = op_lookup.run(queries, vault=vault, **kwargs)
assert result == expected
@pytest.mark.parametrize("op_fixture", OP_VERSION_FIXTURES)
def test_signin(op_fixture, request):
op = request.getfixturevalue(op_fixture)
op._cli.master_password = "master_pass"
op._cli.signin()
print(op._cli.version)
op._cli._run.assert_called_once_with(['signin', '--raw'], command_input=b"master_pass")

View file

@ -0,0 +1,50 @@
# Copyright (c) 2022 Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import pytest
from ansible_collections.community.general.plugins.lookup.onepassword import OnePassCLIv1
@pytest.mark.parametrize(
("args", "rc", "expected_call_args", "expected_call_kwargs", "expected"),
(
([], 0, ["get", "account"], {"ignore_errors": True}, True,),
([], 1, ["get", "account"], {"ignore_errors": True}, False,),
(["acme"], 1, ["get", "account", "--account", "acme.1password.com"], {"ignore_errors": True}, False,),
)
)
def test_assert_logged_in(mocker, args, rc, expected_call_args, expected_call_kwargs, expected):
mocker.patch.object(OnePassCLIv1, "_run", return_value=[rc, "", ""])
op_cli = OnePassCLIv1(*args)
result = op_cli.assert_logged_in()
op_cli._run.assert_called_with(expected_call_args, **expected_call_kwargs)
assert result == expected
def test_full_signin(mocker):
mocker.patch.object(OnePassCLIv1, "_run", return_value=[0, "", ""])
op_cli = OnePassCLIv1(
subdomain="acme",
username="bob@acme.com",
secret_key="SECRET",
master_password="ONEKEYTORULETHEMALL",
)
result = op_cli.full_signin()
op_cli._run.assert_called_with([
"signin",
"acme.1password.com",
b"bob@acme.com",
b"SECRET",
"--raw",
], command_input=b"ONEKEYTORULETHEMALL")
assert result == [0, "", ""]

View file

@ -0,0 +1,52 @@
# Copyright (c) 2022 Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import pytest
from ansible_collections.community.general.plugins.lookup.onepassword import OnePassCLIv2
@pytest.mark.parametrize(
("args", "out", "expected_call_args", "expected"),
(
([], "list of accounts", ["account", "get"], True,),
(["acme"], "list of accounts", ["account", "get", "--account", "acme.1password.com"], True,),
([], "", ["account", "list"], False,),
)
)
def test_assert_logged_in(mocker, args, out, expected_call_args, expected):
mocker.patch.object(OnePassCLIv2, "_run", return_value=[0, out, ""])
op_cli = OnePassCLIv2(*args)
result = op_cli.assert_logged_in()
op_cli._run.assert_called_with(expected_call_args)
assert result == expected
def test_full_signin(mocker):
mocker.patch.object(OnePassCLIv2, "_run", return_value=[0, "", ""])
op_cli = OnePassCLIv2(
subdomain="acme",
username="bob@acme.com",
secret_key="SECRET",
master_password="ONEKEYTORULETHEMALL",
)
result = op_cli.full_signin()
op_cli._run.assert_called_with(
[
"account", "add", "--raw",
"--address", "acme.1password.com",
"--email", b"bob@acme.com",
"--signin",
],
command_input=b"ONEKEYTORULETHEMALL",
environment_update={'OP_SECRET_KEY': 'SECRET'},
)
assert result == [0, "", ""]

View file

@ -1,322 +0,0 @@
# Copyright (c) 2018, Scott Buchanan <sbuchanan@ri.pn>
# Copyright (c) 2016, Andrew Zenk <azenk@umn.edu> (test_lastpass.py used as starting point)
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import json
import datetime
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
from argparse import ArgumentParser
from ansible_collections.community.general.tests.unit.compat import unittest
from ansible_collections.community.general.tests.unit.compat.mock import patch
from ansible.errors import AnsibleError
from ansible_collections.community.general.plugins.lookup.onepassword import OnePass, LookupModule
from ansible_collections.community.general.plugins.lookup.onepassword_raw import LookupModule as OnePasswordRawLookup
# Intentionally excludes metadata leaf nodes that would exist in real output if not relevant.
MOCK_ENTRIES = [
{
'vault_name': 'Acme "Quot\'d" Servers',
'queries': [
'0123456789',
'Mock "Quot\'d" Server'
],
'output': {
'uuid': '0123456789',
'vaultUuid': '2468',
'overview': {
'title': 'Mock "Quot\'d" Server'
},
'details': {
'sections': [{
'title': '',
'fields': [
{'t': 'username', 'v': 'jamesbond'},
{'t': 'password', 'v': 't0pS3cret'},
{'t': 'notes', 'v': 'Test note with\nmultiple lines and trailing space.\n\n'},
{'t': 'tricksy "quot\'d" field\\', 'v': '"quot\'d" value'}
]
}]
}
}
},
{
'vault_name': 'Acme Logins',
'queries': [
'9876543210',
'Mock Website',
'acme.com'
],
'output': {
'uuid': '9876543210',
'vaultUuid': '1357',
'overview': {
'title': 'Mock Website',
'URLs': [
{'l': 'website', 'u': 'https://acme.com/login'}
]
},
'details': {
'sections': [{
'title': '',
'fields': [
{'t': 'password', 'v': 't0pS3cret'}
]
}]
}
}
},
{
'vault_name': 'Acme Logins',
'queries': [
'864201357'
],
'output': {
'uuid': '864201357',
'vaultUuid': '1357',
'overview': {
'title': 'Mock Something'
},
'details': {
'fields': [
{
'value': 'jbond@mi6.gov.uk',
'name': 'emailAddress'
},
{
'name': 'password',
'value': 'vauxhall'
},
{},
]
}
}
},
]
def get_mock_query_generator(require_field=None):
def _process_field(field, section_title=None):
field_name = field.get('name', field.get('t', ''))
field_value = field.get('value', field.get('v', ''))
if require_field is None or field_name == require_field:
return entry, query, section_title, field_name, field_value
for entry in MOCK_ENTRIES:
for query in entry['queries']:
for field in entry['output']['details'].get('fields', []):
fixture = _process_field(field)
if fixture:
yield fixture
for section in entry['output']['details'].get('sections', []):
for field in section['fields']:
fixture = _process_field(field, section['title'])
if fixture:
yield fixture
def get_one_mock_query(require_field=None):
generator = get_mock_query_generator(require_field)
return next(generator)
class MockOnePass(OnePass):
_mock_logged_out = False
_mock_timed_out = False
def _lookup_mock_entry(self, key, vault=None):
for entry in MOCK_ENTRIES:
if vault is not None and vault.lower() != entry['vault_name'].lower() and vault.lower() != entry['output']['vaultUuid'].lower():
continue
match_fields = [
entry['output']['uuid'],
entry['output']['overview']['title']
]
# Note that exactly how 1Password matches on domains in non-trivial cases is neither documented
# nor obvious, so this may not precisely match the real behavior.
urls = entry['output']['overview'].get('URLs')
if urls is not None:
match_fields += [urlparse(url['u']).netloc for url in urls]
if key in match_fields:
return entry['output']
def _run(self, args, expected_rc=0, command_input=None, ignore_errors=False):
parser = ArgumentParser()
command_parser = parser.add_subparsers(dest='command')
get_parser = command_parser.add_parser('get')
get_options = ArgumentParser(add_help=False)
get_options.add_argument('--vault')
get_type_parser = get_parser.add_subparsers(dest='object_type')
get_type_parser.add_parser('account', parents=[get_options])
get_item_parser = get_type_parser.add_parser('item', parents=[get_options])
get_item_parser.add_argument('item_id')
args = parser.parse_args(args)
def mock_exit(output='', error='', rc=0):
if rc != expected_rc:
raise AnsibleError(error)
if error != '':
now = datetime.date.today()
error = '[LOG] {0} (ERROR) {1}'.format(now.strftime('%Y/%m/%d %H:$M:$S'), error)
return rc, output, error
if args.command == 'get':
if self._mock_logged_out:
return mock_exit(error='You are not currently signed in. Please run `op signin --help` for instructions', rc=1)
if self._mock_timed_out:
return mock_exit(error='401: Authentication required.', rc=1)
if args.object_type == 'item':
mock_entry = self._lookup_mock_entry(args.item_id, args.vault)
if mock_entry is None:
return mock_exit(error='Item {0} not found'.format(args.item_id))
return mock_exit(output=json.dumps(mock_entry))
if args.object_type == 'account':
# Since we don't actually ever use this output, don't bother mocking output.
return mock_exit()
raise AnsibleError('Unsupported command string passed to OnePass mock: {0}'.format(args))
class LoggedOutMockOnePass(MockOnePass):
_mock_logged_out = True
class TimedOutMockOnePass(MockOnePass):
_mock_timed_out = True
class TestOnePass(unittest.TestCase):
def test_onepassword_cli_path(self):
op = MockOnePass(path='/dev/null')
self.assertEqual('/dev/null', op.cli_path)
def test_onepassword_logged_in(self):
op = MockOnePass()
try:
op.assert_logged_in()
except Exception:
self.fail()
def test_onepassword_logged_out(self):
op = LoggedOutMockOnePass()
with self.assertRaises(AnsibleError):
op.assert_logged_in()
def test_onepassword_timed_out(self):
op = TimedOutMockOnePass()
with self.assertRaises(AnsibleError):
op.assert_logged_in()
def test_onepassword_get(self):
op = MockOnePass()
op.logged_in = True
query_generator = get_mock_query_generator()
for dummy, query, dummy, field_name, field_value in query_generator:
self.assertEqual(field_value, op.get_field(query, field_name))
def test_onepassword_get_raw(self):
op = MockOnePass()
op.logged_in = True
for entry in MOCK_ENTRIES:
for query in entry['queries']:
self.assertEqual(json.dumps(entry['output']), op.get_raw(query))
def test_onepassword_get_not_found(self):
op = MockOnePass()
op.logged_in = True
self.assertEqual('', op.get_field('a fake query', 'a fake field'))
def test_onepassword_get_with_section(self):
op = MockOnePass()
op.logged_in = True
dummy, query, section_title, field_name, field_value = get_one_mock_query()
self.assertEqual(field_value, op.get_field(query, field_name, section=section_title))
def test_onepassword_get_with_vault(self):
op = MockOnePass()
op.logged_in = True
entry, query, dummy, field_name, field_value = get_one_mock_query()
for vault_query in [entry['vault_name'], entry['output']['vaultUuid']]:
self.assertEqual(field_value, op.get_field(query, field_name, vault=vault_query))
def test_onepassword_get_with_wrong_vault(self):
op = MockOnePass()
op.logged_in = True
dummy, query, dummy, field_name, dummy = get_one_mock_query()
self.assertEqual('', op.get_field(query, field_name, vault='a fake vault'))
def test_onepassword_get_diff_case(self):
op = MockOnePass()
op.logged_in = True
entry, query, section_title, field_name, field_value = get_one_mock_query()
self.assertEqual(
field_value,
op.get_field(
query,
field_name.upper(),
vault=entry['vault_name'].upper(),
section=section_title.upper()
)
)
@patch('ansible_collections.community.general.plugins.lookup.onepassword.OnePass', MockOnePass)
class TestLookupModule(unittest.TestCase):
def test_onepassword_plugin_multiple(self):
lookup_plugin = LookupModule()
entry = MOCK_ENTRIES[0]
field = entry['output']['details']['sections'][0]['fields'][0]
self.assertEqual(
[field['v']] * len(entry['queries']),
lookup_plugin.run(entry['queries'], field=field['t'])
)
def test_onepassword_plugin_default_field(self):
lookup_plugin = LookupModule()
dummy, query, dummy, dummy, field_value = get_one_mock_query('password')
self.assertEqual([field_value], lookup_plugin.run([query]))
@patch('ansible_collections.community.general.plugins.lookup.onepassword_raw.OnePass', MockOnePass)
class TestOnePasswordRawLookup(unittest.TestCase):
def test_onepassword_raw_plugin_multiple(self):
raw_lookup_plugin = OnePasswordRawLookup()
entry = MOCK_ENTRIES[0]
raw_value = entry['output']
self.assertEqual(
[raw_value] * len(entry['queries']),
raw_lookup_plugin.run(entry['queries'])
)

View file

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import pytest
from ansible_collections.community.general.plugins.module_utils.onepassword import OnePasswordConfig
@pytest.fixture
def os_expanduser(mocker):
def _os_expanduser(path):
return path.replace("~", "/home/testuser")
mocker.patch("os.path.expanduser", side_effect=_os_expanduser)
@pytest.fixture
def exists(mocker):
def _exists(path):
if "op/" in path:
return True
return os.path.exists(path)
def test_op_config(mocker, os_expanduser):
mocker.patch("os.path.exists", side_effect=[False, True])
op_config = OnePasswordConfig()
assert "/home/testuser/.config/op/config" == op_config.config_file_path
def test_op_no_config(mocker, os_expanduser):
mocker.patch("os.path.exists", return_value=False)
op_config = OnePasswordConfig()
assert op_config.config_file_path is None