diff --git a/news/12543.feature.rst b/news/12543.feature.rst new file mode 100644 index 00000000000..2c2be2d8682 --- /dev/null +++ b/news/12543.feature.rst @@ -0,0 +1 @@ +Add support for pulling username from keyring subprocess provider diff --git a/src/pip/_internal/network/auth.py b/src/pip/_internal/network/auth.py index 1a2606ed080..0a0f21d28ea 100644 --- a/src/pip/_internal/network/auth.py +++ b/src/pip/_internal/network/auth.py @@ -4,6 +4,7 @@ providing credentials in the context of network requests. """ +import json import logging import os import shutil @@ -115,23 +116,22 @@ def __init__(self, cmd: str) -> None: self.keyring = cmd def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]: - # This is the default implementation of keyring.get_credential - # https://github.com/jaraco/keyring/blob/97689324abcf01bd1793d49063e7ca01e03d7d07/keyring/backend.py#L134-L139 - if username is not None: - password = self._get_password(url, username) - if password is not None: - return username, password - return None + return self._get_creds(url, username) def save_auth_info(self, url: str, username: str, password: str) -> None: return self._set_password(url, username, password) - def _get_password(self, service_name: str, username: str) -> Optional[str]: - """Mirror the implementation of keyring.get_password using cli""" + def _get_creds( + self, service_name: str, username: Optional[str] + ) -> Optional[AuthInfo]: + """Mirror the implementation of keyring.get_credential using cli""" if self.keyring is None: return None - cmd = [self.keyring, "get", service_name, username] + cmd = [self.keyring, "--mode=creds", "--output=json", "get", service_name] + if username is not None: + cmd.append(username) + env = os.environ.copy() env["PYTHONIOENCODING"] = "utf-8" res = subprocess.run( @@ -142,7 +142,9 @@ def _get_password(self, service_name: str, username: str) -> Optional[str]: ) if res.returncode: return None - return res.stdout.decode("utf-8").strip(os.linesep) + + data = json.loads(res.stdout.decode("utf-8")) + return (data["username"], data["password"]) def _set_password(self, service_name: str, username: str, password: str) -> None: """Mirror the implementation of keyring.set_password using cli""" diff --git a/tests/functional/test_install_config.py b/tests/functional/test_install_config.py index d111bc5f7bc..7837c4ef920 100644 --- a/tests/functional/test_install_config.py +++ b/tests/functional/test_install_config.py @@ -394,7 +394,6 @@ def test_prompt_for_keyring_if_needed( cert_factory: CertFactory, auth_needed: bool, flags: List[str], - keyring_provider: str, keyring_provider_implementation: str, tmpdir: Path, script_factory: ScriptFactory, @@ -403,10 +402,12 @@ def test_prompt_for_keyring_if_needed( """Test behaviour while installing from an index url requiring authentication and keyring is possible. """ - environ = os.environ.copy() workspace = tmpdir.joinpath("workspace") + virtualenv = virtualenv_factory(workspace.joinpath("venv")) + if keyring_provider_implementation == "subprocess": + # Install keyring into its own venv. keyring_virtualenv = virtualenv_factory(workspace.joinpath("keyring")) keyring_script = script_factory( workspace.joinpath("keyring"), keyring_virtualenv @@ -416,22 +417,32 @@ def test_prompt_for_keyring_if_needed( "keyring", ) - environ["PATH"] = str(keyring_script.bin_path) + os.pathsep + environ["PATH"] - - virtualenv = virtualenv_factory(workspace.joinpath("venv")) - script = script_factory(workspace.joinpath("venv"), virtualenv, environ=environ) - - if ( - keyring_provider not in [None, "auto"] - or keyring_provider_implementation != "subprocess" - ): - script.pip( + # Set up this venv with a PATH that can see the keyring installed in a + # separate venv. + virtualenv_script = script_factory( + workspace.joinpath("venv"), + virtualenv, + environ={ + **os.environ, + "PATH": str(keyring_script.bin_path) + os.pathsep + os.environ["PATH"], + }, + ) + elif keyring_provider_implementation == "import": + # Set up a venv with keyring installed. + virtualenv_script = script_factory(workspace.joinpath("venv"), virtualenv) + virtualenv_script.pip( "install", "keyring", ) - - if keyring_provider_implementation != "subprocess": - keyring_script = script + keyring_script = virtualenv_script + elif keyring_provider_implementation == "disabled": + # Set up an venv that does not have keyring installed, nor is able to + # find keyring anywhere on the PATH. + virtualenv_script = script_factory(workspace.joinpath("venv"), virtualenv) + keyring_script = None + pass + else: + pytest.fail(f"Unrecognized {keyring_provider_implementation=}") cert_path = cert_factory() ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) @@ -454,42 +465,42 @@ def test_prompt_for_keyring_if_needed( url = f"https://USERNAME@{server.host}:{server.port}/simple" - keyring_content = textwrap.dedent( - """\ - import os - import sys - import keyring - from keyring.backend import KeyringBackend - from keyring.credentials import SimpleCredential + if keyring_script is not None: + keyring_content = textwrap.dedent( + """\ + import os + import sys + import keyring + from keyring.backend import KeyringBackend + from keyring.credentials import SimpleCredential - class TestBackend(KeyringBackend): - priority = 1 + class TestBackend(KeyringBackend): + priority = 1 - def get_credential(self, url, username): - sys.stderr.write("get_credential was called" + os.linesep) - return SimpleCredential(username="USERNAME", password="PASSWORD") + def get_credential(self, url, username): + sys.stderr.write("get_credential was called" + os.linesep) + return SimpleCredential(username="USERNAME", password="PASSWORD") - def get_password(self, url, username): - sys.stderr.write("get_password was called" + os.linesep) - return "PASSWORD" + def get_password(self, url, username): + assert False, "get_password should not ever be called" - def set_password(self, url, username): - pass - """ - ) - keyring_path = keyring_script.site_packages_path / "keyring_test.py" - keyring_path.write_text(keyring_content) + def set_password(self, url, username): + pass + """ + ) + keyring_path = keyring_script.site_packages_path / "keyring_test.py" + keyring_path.write_text(keyring_content) - keyring_content = ( - "import keyring_test;" - " import keyring;" - " keyring.set_keyring(keyring_test.TestBackend())" + os.linesep - ) - keyring_path = keyring_path.with_suffix(".pth") - keyring_path.write_text(keyring_content) + keyring_content = ( + "import keyring_test;" + " import keyring;" + " keyring.set_keyring(keyring_test.TestBackend())" + os.linesep + ) + keyring_path = keyring_path.with_suffix(".pth") + keyring_path.write_text(keyring_content) with server_running(server): - result = script.pip( + result = virtualenv_script.pip( "install", "--index-url", url, @@ -501,12 +512,7 @@ def set_password(self, url, username): "simple", ) - function_name = ( - "get_credential" - if keyring_provider_implementation == "import" - else "get_password" - ) if auth_needed: - assert function_name + " was called" in result.stderr + assert "get_credential was called" in result.stderr else: - assert function_name + " was called" not in result.stderr + assert "get_credential was called" not in result.stderr diff --git a/tests/unit/test_network_auth.py b/tests/unit/test_network_auth.py index 86f01e436c0..80fd9e696c7 100644 --- a/tests/unit/test_network_auth.py +++ b/tests/unit/test_network_auth.py @@ -1,8 +1,11 @@ +import contextlib import functools +import json import os import subprocess import sys -from typing import Any, Dict, Iterable, List, Optional, Tuple +from dataclasses import dataclass +from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple import pytest @@ -327,42 +330,96 @@ def _send(sent_req: MockRequest, **kwargs: Any) -> MockResponse: class KeyringModuleV2: """Represents the current supported API of keyring""" + def __init__(self) -> None: + self.saved_credential_by_username_by_system: dict[ + str, dict[str, KeyringModuleV2.Credential] + ] = {} + + @dataclass class Credential: - def __init__(self, username: str, password: str) -> None: - self.username = username - self.password = password + username: str + password: str def get_password(self, system: str, username: str) -> None: pytest.fail("get_password should not ever be called") - def get_credential(self, system: str, username: str) -> Optional[Credential]: - if system == "http://example.com/path2/": - return self.Credential("username", "url") - if system == "example.com": - return self.Credential("username", "netloc") - return None + def get_credential( + self, system: str, username: Optional[str] + ) -> Optional[Credential]: + credential_by_username = self.saved_credential_by_username_by_system.get( + system, {} + ) + if username is None: + # Just return the first cred we can find (if + # there even are any for this service). + credentials = list(credential_by_username.values()) + if len(credentials) == 0: + return None + + # Just pick the first one we can find. + credential = credentials[0] + return credential + + return credential_by_username.get(username) + + def set_password(self, system: str, username: str, password: str) -> None: + if system not in self.saved_credential_by_username_by_system: + self.saved_credential_by_username_by_system[system] = {} + + credential_by_username = self.saved_credential_by_username_by_system[system] + assert username not in credential_by_username + credential_by_username[username] = self.Credential(username, password) + + def delete_password(self, system: str, username: str) -> None: + del self.saved_credential_by_username_by_system[system][username] + + @contextlib.contextmanager + def add_credential( + self, system: str, username: str, password: str + ) -> Generator[None, None, None]: + """ + Context manager that adds the given credential to the keyring + and yields. Once the yield is done, the credential is removed + from the keyring. + + This is re-entrant safe: it's ok for one thread to call this while in + the middle of an existing invocation + + This is probably not thread safe: it's not ok for multiple threads to + simultaneously call this method on the exact same instance of KeyringModuleV2. + """ + self.set_password(system, username, password) + try: + yield + finally: + # No matter what happened, make sure we clean up after ourselves. + self.delete_password(system, username) @pytest.mark.parametrize( "url, expect", [ - ("http://example.com/path1", ("username", "netloc")), - ("http://example.com/path2/path3", ("username", "url")), - ("http://user2@example.com/path2/path3", ("username", "url")), + ("http://example.com/path1", ("username", "hunter2")), + ("http://example.com/path2/path3", ("username", "hunter3")), + ("http://user2@example.com/path2/path3", ("user2", None)), ], ) def test_keyring_get_credential( monkeypatch: pytest.MonkeyPatch, url: str, expect: Tuple[str, str] ) -> None: - monkeypatch.setitem(sys.modules, "keyring", KeyringModuleV2()) + keyring = KeyringModuleV2() + monkeypatch.setitem(sys.modules, "keyring", keyring) auth = MultiDomainBasicAuth( index_urls=["http://example.com/path1", "http://example.com/path2"], keyring_provider="import", ) - assert ( - auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True) == expect - ) + with keyring.add_credential("example.com", "username", "hunter2"): + with keyring.add_credential("http://example.com/path2/", "username", "hunter3"): + assert ( + auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True) + == expect + ) class KeyringModuleBroken: @@ -393,7 +450,7 @@ def test_broken_keyring_disables_keyring(monkeypatch: pytest.MonkeyPatch) -> Non assert keyring_broken._call_count == 1 -class KeyringSubprocessResult(KeyringModuleV1): +class KeyringSubprocessResult(KeyringModuleV2): """Represents the subprocess call to keyring""" returncode = 0 # Default to zero retcode @@ -408,33 +465,85 @@ def __call__( input: Optional[bytes] = None, check: Optional[bool] = None, ) -> Any: - if cmd[1] == "get": - assert stdin == -3 # subprocess.DEVNULL - assert stdout == subprocess.PIPE - assert env["PYTHONIOENCODING"] == "utf-8" - assert check is None - - password = self.get_password(*cmd[2:]) - if password is None: - # Expect non-zero returncode if no password present - self.returncode = 1 - else: - # Passwords are returned encoded with a newline appended - self.returncode = 0 - self.stdout = (password + os.linesep).encode("utf-8") - - if cmd[1] == "set": - assert stdin is None - assert stdout is None - assert env["PYTHONIOENCODING"] == "utf-8" - assert input is not None - assert check - - # Input from stdin is encoded - self.set_password(cmd[2], cmd[3], input.decode("utf-8").strip(os.linesep)) + parsed_cmd = list(cmd) + assert parsed_cmd.pop(0) == "keyring" + subcommand = [ + arg + for arg in parsed_cmd + # Skip past all the --whatever options until we get to the subcommand. + if not arg.startswith("--") + ][0] + subcommand_func = { + "get": self._get_subcommand, + "set": self._set_subcommand, + }[subcommand] + + subcommand_func( + parsed_cmd, + env=env, + stdin=stdin, + stdout=stdout, + input=input, + check=check, + ) return self + def _get_subcommand( + self, + cmd: List[str], + *, + env: Dict[str, str], + stdin: Optional[Any] = None, + stdout: Optional[Any] = None, + input: Optional[bytes] = None, + check: Optional[bool] = None, + ) -> None: + assert cmd.pop(0) == "--mode=creds" + assert cmd.pop(0) == "--output=json" + assert stdin == -3 # subprocess.DEVNULL + assert stdout == subprocess.PIPE + assert env["PYTHONIOENCODING"] == "utf-8" + assert check is None + assert cmd.pop(0) == "get" + + service = cmd.pop(0) + username = cmd.pop(0) if len(cmd) > 0 else None + creds = self.get_credential(service, username) + if creds is None: + # Expect non-zero returncode if no creds present + self.returncode = 1 + else: + # Passwords are returned encoded with a newline appended + self.returncode = 0 + self.stdout = json.dumps( + { + "username": creds.username, + "password": creds.password, + } + ).encode("utf-8") + + def _set_subcommand( + self, + cmd: List[str], + *, + env: Dict[str, str], + stdin: Optional[Any] = None, + stdout: Optional[Any] = None, + input: Optional[bytes] = None, + check: Optional[bool] = None, + ) -> None: + assert cmd.pop(0) == "set" + assert stdin is None + assert stdout is None + assert env["PYTHONIOENCODING"] == "utf-8" + assert input is not None + assert check + + # Input from stdin is encoded + system, username = cmd + self.set_password(system, username, input.decode("utf-8").strip(os.linesep)) + def check_returncode(self) -> None: if self.returncode: raise Exception() @@ -443,13 +552,14 @@ def check_returncode(self) -> None: @pytest.mark.parametrize( "url, expect", [ - ("http://example.com/path1", (None, None)), - # path1 URLs will be resolved by netloc - ("http://user@example.com/path3", ("user", "user!netloc")), - ("http://user2@example.com/path3", ("user2", "user2!netloc")), - # path2 URLs will be resolved by index URL - ("http://example.com/path2/path3", (None, None)), - ("http://foo@example.com/path2/path3", ("foo", "foo!url")), + # It's not obvious, but this url ultimately resolves to index url + # http://example.com/path2, so we get the creds for that index. + ("http://example.com/path1", ("saved-user1", "pw1")), + ("http://saved-user1@example.com/path2", ("saved-user1", "pw1")), + ("http://saved-user2@example.com/path2", ("saved-user2", "pw2")), + ("http://new-user@example.com/path2", ("new-user", None)), + ("http://example.com/path2/path3", ("saved-user1", "pw1")), + ("http://foo@example.com/path2/path3", ("foo", None)), ], ) def test_keyring_cli_get_password( @@ -457,17 +567,27 @@ def test_keyring_cli_get_password( url: str, expect: Tuple[Optional[str], Optional[str]], ) -> None: + keyring_subprocess = KeyringSubprocessResult() monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring") monkeypatch.setattr( - pip._internal.network.auth.subprocess, "run", KeyringSubprocessResult() + pip._internal.network.auth.subprocess, "run", keyring_subprocess ) auth = MultiDomainBasicAuth( index_urls=["http://example.com/path2", "http://example.com/path3"], keyring_provider="subprocess", ) - actual = auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True) - assert actual == expect + with keyring_subprocess.add_credential("example.com", "example", "!netloc"): + with keyring_subprocess.add_credential( + "http://example.com/path2/", "saved-user1", "pw1" + ): + with keyring_subprocess.add_credential( + "http://example.com/path2/", "saved-user2", "pw2" + ): + actual = auth._get_new_credentials( + url, allow_netrc=False, allow_keyring=True + ) + assert actual == expect @pytest.mark.parametrize( @@ -492,13 +612,14 @@ def test_keyring_cli_set_password( creds: Tuple[str, str, bool], expect_save: bool, ) -> None: + expected_username, expected_password, save = creds monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring") keyring = KeyringSubprocessResult() monkeypatch.setattr(pip._internal.network.auth.subprocess, "run", keyring) auth = MultiDomainBasicAuth(prompting=True, keyring_provider="subprocess") monkeypatch.setattr(auth, "_get_url_and_credentials", lambda u: (u, None, None)) monkeypatch.setattr(auth, "_prompt_for_password", lambda *a: creds) - if creds[2]: + if save: # when _prompt_for_password indicates to save, we should save def should_save_password_to_keyring(*a: Any) -> bool: return True @@ -535,6 +656,12 @@ def _send(sent_req: MockRequest, **kwargs: Any) -> MockResponse: auth.handle_401(resp) if expect_save: - assert keyring.saved_passwords == [("example.com", creds[0], creds[1])] + assert keyring.saved_credential_by_username_by_system == { + "example.com": { + expected_username: KeyringModuleV2.Credential( + expected_username, expected_password + ), + }, + } else: - assert keyring.saved_passwords == [] + assert keyring.saved_credential_by_username_by_system == {}