Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for pulling username from keyring subprocess provider #12748

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions news/12543.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for pulling username from keyring subprocess provider
24 changes: 13 additions & 11 deletions src/pip/_internal/network/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
providing credentials in the context of network requests.
"""

import json
import logging
import os
import shutil
Expand Down Expand Up @@ -115,23 +116,22 @@ def __init__(self, cmd: str) -> None:
self.keyring = cmd

def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
# This is the default implementation of keyring.get_credential
# https://github.com/jaraco/keyring/blob/97689324abcf01bd1793d49063e7ca01e03d7d07/keyring/backend.py#L134-L139
if username is not None:
password = self._get_password(url, username)
if password is not None:
return username, password
return None
return self._get_creds(url, username)

def save_auth_info(self, url: str, username: str, password: str) -> None:
return self._set_password(url, username, password)

def _get_password(self, service_name: str, username: str) -> Optional[str]:
"""Mirror the implementation of keyring.get_password using cli"""
def _get_creds(
self, service_name: str, username: Optional[str]
) -> Optional[AuthInfo]:
"""Mirror the implementation of keyring.get_credential using cli"""
if self.keyring is None:
return None

cmd = [self.keyring, "get", service_name, username]
cmd = [self.keyring, "--mode=creds", "--output=json", "get", service_name]
if username is not None:
cmd.append(username)

env = os.environ.copy()
env["PYTHONIOENCODING"] = "utf-8"
res = subprocess.run(
Expand All @@ -142,7 +142,9 @@ def _get_password(self, service_name: str, username: str) -> Optional[str]:
)
if res.returncode:
return None
return res.stdout.decode("utf-8").strip(os.linesep)

data = json.loads(res.stdout.decode("utf-8"))
return (data["username"], data["password"])

def _set_password(self, service_name: str, username: str, password: str) -> None:
"""Mirror the implementation of keyring.set_password using cli"""
Expand Down
9 changes: 2 additions & 7 deletions tests/functional/test_install_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,12 +501,7 @@ def set_password(self, url, username):
"simple",
)

function_name = (
"get_credential"
if keyring_provider_implementation == "import"
else "get_password"
)
if auth_needed:
assert function_name + " was called" in result.stderr
assert "get_credential was called" in result.stderr
else:
assert function_name + " was called" not in result.stderr
assert "get_credential was called" not in result.stderr
237 changes: 182 additions & 55 deletions tests/unit/test_network_auth.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import contextlib
import functools
import json
import os
import subprocess
import sys
from typing import Any, Dict, Iterable, List, Optional, Tuple
from dataclasses import dataclass
from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple

import pytest

Expand Down Expand Up @@ -327,42 +330,96 @@ def _send(sent_req: MockRequest, **kwargs: Any) -> MockResponse:
class KeyringModuleV2:
"""Represents the current supported API of keyring"""

def __init__(self) -> None:
self.saved_credential_by_username_by_system: dict[
str, dict[str, KeyringModuleV2.Credential]
] = {}

@dataclass
class Credential:
def __init__(self, username: str, password: str) -> None:
self.username = username
self.password = password
username: str
password: str

def get_password(self, system: str, username: str) -> None:
pytest.fail("get_password should not ever be called")

def get_credential(self, system: str, username: str) -> Optional[Credential]:
if system == "http://example.com/path2/":
return self.Credential("username", "url")
if system == "example.com":
return self.Credential("username", "netloc")
return None
def get_credential(
self, system: str, username: Optional[str]
) -> Optional[Credential]:
credential_by_username = self.saved_credential_by_username_by_system.get(
system, {}
)
if username is None:
# Just return the first cred we can find (if
# there even are any for this service).
credentials = list(credential_by_username.values())
if len(credentials) == 0:
return None

# Just pick the first one we can find.
credential = credentials[0]
return credential

return credential_by_username.get(username)

def set_password(self, system: str, username: str, password: str) -> None:
if system not in self.saved_credential_by_username_by_system:
self.saved_credential_by_username_by_system[system] = {}

credential_by_username = self.saved_credential_by_username_by_system[system]
assert username not in credential_by_username
credential_by_username[username] = self.Credential(username, password)

def delete_password(self, system: str, username: str) -> None:
del self.saved_credential_by_username_by_system[system][username]

@contextlib.contextmanager
def add_credential(
self, system: str, username: str, password: str
) -> Generator[None, None, None]:
"""
Context manager that adds the given credential to the keyring
and yields. Once the yield is done, the credential is removed
from the keyring.

This is re-entrant safe: it's ok for one thread to call this while in
the middle of an existing invocation

This is probably not thread safe: it's not ok for multiple threads to
simultaneously call this method on the exact same instance of KeyringModuleV2.
"""
self.set_password(system, username, password)
try:
yield
finally:
# No matter what happened, make sure we clean up after ourselves.
self.delete_password(system, username)


@pytest.mark.parametrize(
"url, expect",
[
("http://example.com/path1", ("username", "netloc")),
("http://example.com/path2/path3", ("username", "url")),
("http://user2@example.com/path2/path3", ("username", "url")),
("http://example.com/path1", ("username", "hunter2")),
("http://example.com/path2/path3", ("username", "hunter3")),
("http://user2@example.com/path2/path3", ("user2", None)),
],
)
def test_keyring_get_credential(
monkeypatch: pytest.MonkeyPatch, url: str, expect: Tuple[str, str]
) -> None:
monkeypatch.setitem(sys.modules, "keyring", KeyringModuleV2())
keyring = KeyringModuleV2()
monkeypatch.setitem(sys.modules, "keyring", keyring)
auth = MultiDomainBasicAuth(
index_urls=["http://example.com/path1", "http://example.com/path2"],
keyring_provider="import",
)

assert (
auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True) == expect
)
with keyring.add_credential("example.com", "username", "hunter2"):
with keyring.add_credential("http://example.com/path2/", "username", "hunter3"):
assert (
auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True)
== expect
)


class KeyringModuleBroken:
Expand Down Expand Up @@ -393,7 +450,7 @@ def test_broken_keyring_disables_keyring(monkeypatch: pytest.MonkeyPatch) -> Non
assert keyring_broken._call_count == 1


class KeyringSubprocessResult(KeyringModuleV1):
class KeyringSubprocessResult(KeyringModuleV2):
"""Represents the subprocess call to keyring"""

returncode = 0 # Default to zero retcode
Expand All @@ -408,33 +465,85 @@ def __call__(
input: Optional[bytes] = None,
check: Optional[bool] = None,
) -> Any:
if cmd[1] == "get":
assert stdin == -3 # subprocess.DEVNULL
assert stdout == subprocess.PIPE
assert env["PYTHONIOENCODING"] == "utf-8"
assert check is None

password = self.get_password(*cmd[2:])
if password is None:
# Expect non-zero returncode if no password present
self.returncode = 1
else:
# Passwords are returned encoded with a newline appended
self.returncode = 0
self.stdout = (password + os.linesep).encode("utf-8")

if cmd[1] == "set":
assert stdin is None
assert stdout is None
assert env["PYTHONIOENCODING"] == "utf-8"
assert input is not None
assert check

# Input from stdin is encoded
self.set_password(cmd[2], cmd[3], input.decode("utf-8").strip(os.linesep))
parsed_cmd = list(cmd)
assert parsed_cmd.pop(0) == "keyring"
subcommand = [
arg
for arg in parsed_cmd
# Skip past all the --whatever options until we get to the subcommand.
if not arg.startswith("--")
][0]
subcommand_func = {
"get": self._get_subcommand,
"set": self._set_subcommand,
}[subcommand]

subcommand_func(
parsed_cmd,
env=env,
stdin=stdin,
stdout=stdout,
input=input,
check=check,
)

return self

def _get_subcommand(
self,
cmd: List[str],
*,
env: Dict[str, str],
stdin: Optional[Any] = None,
stdout: Optional[Any] = None,
input: Optional[bytes] = None,
check: Optional[bool] = None,
) -> None:
assert cmd.pop(0) == "--mode=creds"
assert cmd.pop(0) == "--output=json"
assert stdin == -3 # subprocess.DEVNULL
assert stdout == subprocess.PIPE
assert env["PYTHONIOENCODING"] == "utf-8"
assert check is None
assert cmd.pop(0) == "get"

service = cmd.pop(0)
username = cmd.pop(0) if len(cmd) > 0 else None
creds = self.get_credential(service, username)
if creds is None:
# Expect non-zero returncode if no creds present
self.returncode = 1
else:
# Passwords are returned encoded with a newline appended
self.returncode = 0
self.stdout = json.dumps(
{
"username": creds.username,
"password": creds.password,
}
).encode("utf-8")

def _set_subcommand(
self,
cmd: List[str],
*,
env: Dict[str, str],
stdin: Optional[Any] = None,
stdout: Optional[Any] = None,
input: Optional[bytes] = None,
check: Optional[bool] = None,
) -> None:
assert cmd.pop(0) == "set"
assert stdin is None
assert stdout is None
assert env["PYTHONIOENCODING"] == "utf-8"
assert input is not None
assert check

# Input from stdin is encoded
system, username = cmd
self.set_password(system, username, input.decode("utf-8").strip(os.linesep))

def check_returncode(self) -> None:
if self.returncode:
raise Exception()
Expand All @@ -443,31 +552,42 @@ def check_returncode(self) -> None:
@pytest.mark.parametrize(
"url, expect",
[
("http://example.com/path1", (None, None)),
# path1 URLs will be resolved by netloc
("http://user@example.com/path3", ("user", "user!netloc")),
("http://user2@example.com/path3", ("user2", "user2!netloc")),
# path2 URLs will be resolved by index URL
("http://example.com/path2/path3", (None, None)),
("http://foo@example.com/path2/path3", ("foo", "foo!url")),
# It's not obvious, but this url ultimately resolves to index url
# http://example.com/path2, so we get the creds for that index.
("http://example.com/path1", ("saved-user1", "pw1")),
("http://saved-user1@example.com/path2", ("saved-user1", "pw1")),
("http://saved-user2@example.com/path2", ("saved-user2", "pw2")),
("http://new-user@example.com/path2", ("new-user", None)),
("http://example.com/path2/path3", ("saved-user1", "pw1")),
("http://foo@example.com/path2/path3", ("foo", None)),
],
)
def test_keyring_cli_get_password(
monkeypatch: pytest.MonkeyPatch,
url: str,
expect: Tuple[Optional[str], Optional[str]],
) -> None:
keyring_subprocess = KeyringSubprocessResult()
monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring")
monkeypatch.setattr(
pip._internal.network.auth.subprocess, "run", KeyringSubprocessResult()
pip._internal.network.auth.subprocess, "run", keyring_subprocess
)
auth = MultiDomainBasicAuth(
index_urls=["http://example.com/path2", "http://example.com/path3"],
keyring_provider="subprocess",
)

actual = auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True)
assert actual == expect
with keyring_subprocess.add_credential("example.com", "example", "!netloc"):
with keyring_subprocess.add_credential(
"http://example.com/path2/", "saved-user1", "pw1"
):
with keyring_subprocess.add_credential(
"http://example.com/path2/", "saved-user2", "pw2"
):
Comment on lines +580 to +586
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Urg, so this got ugly. I was hoping to use this nicer syntax for opening multiple context managers at once, but apparently it's not supported on Python 3.8:

Suggested change
with keyring_subprocess.add_credential("example.com", "example", "!netloc"):
with keyring_subprocess.add_credential(
"http://example.com/path2/", "saved-user1", "pw1"
):
with keyring_subprocess.add_credential(
"http://example.com/path2/", "saved-user2", "pw2"
):
with (
keyring_subprocess.add_credential("example.com", "example", "!netloc"),
keyring_subprocess.add_credential("http://example.com/path2/", "saved-user1", "pw1"),
keyring_subprocess.add_credential("http://example.com/path2/", "saved-user2", "pw2"),
):

I could rework this add_credential method to instead take an array of triplets or something, but IMO that makes the api kind of messy.

I'm inclined to leave this the way it is (and clean it up when we drop support for versions of python that don't support the above syntax), but I'm happy to approach this however folks prefer.

actual = auth._get_new_credentials(
url, allow_netrc=False, allow_keyring=True
)
assert actual == expect


@pytest.mark.parametrize(
Expand All @@ -492,13 +612,14 @@ def test_keyring_cli_set_password(
creds: Tuple[str, str, bool],
expect_save: bool,
) -> None:
expected_username, expected_password, save = creds
monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring")
keyring = KeyringSubprocessResult()
monkeypatch.setattr(pip._internal.network.auth.subprocess, "run", keyring)
auth = MultiDomainBasicAuth(prompting=True, keyring_provider="subprocess")
monkeypatch.setattr(auth, "_get_url_and_credentials", lambda u: (u, None, None))
monkeypatch.setattr(auth, "_prompt_for_password", lambda *a: creds)
if creds[2]:
if save:
# when _prompt_for_password indicates to save, we should save
def should_save_password_to_keyring(*a: Any) -> bool:
return True
Expand Down Expand Up @@ -535,6 +656,12 @@ def _send(sent_req: MockRequest, **kwargs: Any) -> MockResponse:
auth.handle_401(resp)

if expect_save:
assert keyring.saved_passwords == [("example.com", creds[0], creds[1])]
assert keyring.saved_credential_by_username_by_system == {
"example.com": {
expected_username: KeyringModuleV2.Credential(
expected_username, expected_password
),
},
}
else:
assert keyring.saved_passwords == []
assert keyring.saved_credential_by_username_by_system == {}
Loading