Skip to content

Commit

Permalink
Patch kubernetes Api Client to work around lock contention issues
Browse files Browse the repository at this point in the history
Unclear whether the benefits are worth the risk of breaking changes here, but this is a workaround for the signifcant lock contention issues with the k8s client surfaced in #23933 (reply in thread). This appears to be a known issue with the k8s python client - see kubernetes-client/python#2284.

Test Plan: BK
  • Loading branch information
gibsondan committed Sep 26, 2024
1 parent e49e025 commit 6b2be63
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

from dagster_buildkite.package_spec import PackageSpec
from dagster_buildkite.python_version import AvailablePythonVersion
from dagster_buildkite.steps.packages import build_steps_from_package_specs, gcp_creds_extra_cmds
from dagster_buildkite.steps.packages import (
build_steps_from_package_specs,
gcp_creds_extra_cmds,
k8s_extra_cmds,
)
from dagster_buildkite.utils import BuildkiteStep


Expand All @@ -25,6 +29,19 @@ def build_dagster_oss_nightly_steps() -> List[BuildkiteStep]:
AvailablePythonVersion.V3_12,
],
),
PackageSpec(
"python_modules/libraries/dagster-k8s",
env_vars=[
"AWS_ACCOUNT_ID",
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
"BUILDKITE_SECRETS_BUCKET",
],
pytest_tox_factors=[
"nightly",
],
pytest_extra_cmds=k8s_extra_cmds,
),
]
)

Expand Down
43 changes: 41 additions & 2 deletions python_modules/libraries/dagster-k8s/dagster_k8s/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@

import kubernetes.client
import kubernetes.client.rest
import six
from dagster import (
DagsterInstance,
_check as check,
)
from dagster._core.storage.dagster_run import DagsterRunStatus
from kubernetes.client.api_client import ApiClient
from kubernetes.client.models import V1Job, V1JobStatus

try:
Expand Down Expand Up @@ -91,6 +93,39 @@ class DagsterK8sJobStatusException(Exception):
]


class PatchedApiClient(ApiClient):
# Forked from ApiClient implementation to pass configuration object down into created model
# objects, avoiding lock contention issues. See https://github.com/kubernetes-client/python/issues/2284
def __deserialize_model(self, data, klass):
"""Deserializes list or dict to model.
:param data: dict, list.
:param klass: class literal.
:return: model object.
"""
if not klass.openapi_types and not hasattr(klass, "get_real_child_model"):
return data

# Below is the only change from the base ApiClient implementation - pass through the
# Configuration object to each newly created model so that each one does not have to create
# one and acquire a lock
kwargs = {"local_vars_configuration": self.configuration}

if data is not None and klass.openapi_types is not None and isinstance(data, (list, dict)):
for attr, attr_type in six.iteritems(klass.openapi_types):
if klass.attribute_map[attr] in data:
value = data[klass.attribute_map[attr]]
kwargs[attr] = self.__deserialize(value, attr_type)

instance = klass(**kwargs)

if hasattr(instance, "get_real_child_model"):
klass_name = instance.get_real_child_model(data)
if klass_name:
instance = self.__deserialize(data, klass_name)
return instance


def k8s_api_retry(
fn: Callable[..., T],
max_retries: int,
Expand Down Expand Up @@ -209,8 +244,12 @@ def __init__(self, batch_api, core_api, logger, sleeper, timer):
@staticmethod
def production_client(batch_api_override=None, core_api_override=None):
return DagsterKubernetesClient(
batch_api=batch_api_override or kubernetes.client.BatchV1Api(),
core_api=core_api_override or kubernetes.client.CoreV1Api(),
batch_api=(
batch_api_override or kubernetes.client.BatchV1Api(api_client=PatchedApiClient())
),
core_api=(
core_api_override or kubernetes.client.CoreV1Api(api_client=PatchedApiClient())
),
logger=logging.info,
sleeper=time.sleep,
timer=time.time,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
KUBERNETES_VERSION_UPPER_BOUND = "32"
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[pytest]
markers =
nightly: only run in nightly test suite
Original file line number Diff line number Diff line change
@@ -1,5 +1,44 @@
import packaging.version
import pytest
import requests
from dagster_k8s.version import __version__


def test_version():
assert __version__


from dagster_k8s.kubernetes_version import KUBERNETES_VERSION_UPPER_BOUND


def parse_package_version(version_str: str) -> packaging.version.Version:
parsed_version = packaging.version.parse(version_str)
assert isinstance(
parsed_version, packaging.version.Version
), f"Found LegacyVersion: {version_str}"
return parsed_version


def _get_latest_published_k8s_version() -> packaging.version.Version:
res = requests.get("https://pypi.org/pypi/kubernetes/json")
module_json = res.json()
releases = module_json["releases"]
release_versions = [
parse_package_version(version)
for version, files in releases.items()
if not any(file.get("yanked") for file in files)
]
for release_version in reversed(sorted(release_versions)):
if not release_version.is_prerelease:
return release_version

raise Exception("Could not find any latest published kubernetes version")


@pytest.mark.nightly
def test_latest_version_pin():
latest_version = _get_latest_published_k8s_version()
assert latest_version.major < packaging.version.parse(KUBERNETES_VERSION_UPPER_BOUND).major, (
f"A new version {latest_version} of kubernetes has been released to pypi that exceeds our pin. "
"Increase the pinned version in kubernetes_version.py and verify that it still passes tests."
)
18 changes: 13 additions & 5 deletions python_modules/libraries/dagster-k8s/setup.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
from pathlib import Path
from typing import Dict
from typing import Dict, Tuple

from setuptools import find_packages, setup


def get_version() -> str:
def get_version() -> Tuple[str, str]:
version: Dict[str, str] = {}
kubernetes_version: Dict[str, str] = {}

with open(Path(__file__).parent / "dagster_k8s/version.py", encoding="utf8") as fp:
exec(fp.read(), version)

return version["__version__"]
with open(Path(__file__).parent / "dagster_k8s/kubernetes_version.py", encoding="utf8") as fp:
exec(fp.read(), kubernetes_version)

return version["__version__"], kubernetes_version["KUBERNETES_VERSION_UPPER_BOUND"]


ver = get_version()
(
ver,
KUBERNETES_VERSION_UPPER_BOUND,
) = get_version()
# dont pin dev installs to avoid pip dep resolver issues
pin = "" if ver == "1!0+dev" else f"=={ver}"
setup(
Expand All @@ -37,7 +45,7 @@ def get_version() -> str:
python_requires=">=3.8,<3.13",
install_requires=[
f"dagster{pin}",
"kubernetes",
f"kubernetes<{KUBERNETES_VERSION_UPPER_BOUND}",
# exclude a google-auth release that added an overly restrictive urllib3 pin that confuses dependency resolvers
"google-auth!=2.23.1",
],
Expand Down
5 changes: 3 additions & 2 deletions python_modules/libraries/dagster-k8s/tox.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tox]
envlist = {default,old_kubernetes}
envlist = {default,old_kubernetes,nightly}
skipsdist = true

[testenv]
Expand Down Expand Up @@ -34,4 +34,5 @@ allowlist_externals =
uv
commands =
!windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster'
pytest --log-cli-level=INFO -vv {posargs}
!nightly: pytest --log-cli-level=INFO -m 'not nightly' -vv {posargs}
nightly: pytest --log-cli-level=INFO -m 'nightly' -vv {posargs}

0 comments on commit 6b2be63

Please sign in to comment.