Skip to content

Commit

Permalink
Only patch single label when adopting pod (#28776)
Browse files Browse the repository at this point in the history
When KubernetesExecutor adopts pods, it was patching the pod with the
pod it retrieved from the k8s api, while just updating a single label.
Normally this works just fine, but there are cases where the pod you
pull from the k8s api can't be used as-is when patching - it results
in a 422 `Forbidden: pod updates may not change fields other than ...`.

Instead we now just pass the single label we need to update to patch,
allowing us to avoid accidentally "updating" other fields.

Closes #24015
  • Loading branch information
jedcunningham authored Jan 10, 2023
1 parent 3ececb2 commit 9922953
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 18 deletions.
15 changes: 8 additions & 7 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -778,26 +778,28 @@ def adopt_launched_task(
assert self.scheduler_job_id

self.log.info("attempting to adopt pod %s", pod.metadata.name)
pod.metadata.labels["airflow-worker"] = pod_generator.make_safe_label_value(self.scheduler_job_id)
pod_id = annotations_to_key(pod.metadata.annotations)
if pod_id not in pod_ids:
self.log.error("attempting to adopt taskinstance which was not specified by database: %s", pod_id)
return

new_worker_id_label = pod_generator.make_safe_label_value(self.scheduler_job_id)
try:
kube_client.patch_namespaced_pod(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
body=PodGenerator.serialize_pod(pod),
body={"metadata": {"labels": {"airflow-worker": new_worker_id_label}}},
)
pod_ids.pop(pod_id)
self.running.add(pod_id)
except ApiException as e:
self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
return

del pod_ids[pod_id]
self.running.add(pod_id)

def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
"""
Patch completed pod so that the KubernetesJobWatcher can delete it.
Patch completed pods so that the KubernetesJobWatcher can delete them.
:param kube_client: kubernetes client for speaking to kube API
"""
Expand All @@ -812,12 +814,11 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
pod_list = self._list_pods(query_kwargs)
for pod in pod_list:
self.log.info("Attempting to adopt pod %s", pod.metadata.name)
pod.metadata.labels["airflow-worker"] = new_worker_id_label
try:
kube_client.patch_namespaced_pod(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
body=PodGenerator.serialize_pod(pod),
body={"metadata": {"labels": {"airflow-worker": new_worker_id_label}}},
)
except ApiException as e:
self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
Expand Down
80 changes: 69 additions & 11 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,20 +718,78 @@ def test_adopt_launched_task(self, mock_kube_client):
pod_ids = {ti_key: {}}

executor.adopt_launched_task(mock_kube_client, pod=pod, pod_ids=pod_ids)
assert mock_kube_client.patch_namespaced_pod.call_args[1] == {
"body": {
"metadata": {
"labels": {"airflow-worker": "modified"},
"annotations": annotations,
"name": "foo",
}
},
"name": "foo",
"namespace": None,
}
mock_kube_client.patch_namespaced_pod.assert_called_once_with(
body={"metadata": {"labels": {"airflow-worker": "modified"}}},
name="foo",
namespace=None,
)
assert pod_ids == {}
assert executor.running == {ti_key}

@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
def test_adopt_launched_task_api_exception(self, mock_kube_client):
"""We shouldn't think we are running the task if aren't able to patch the pod"""
executor = self.kubernetes_executor
executor.scheduler_job_id = "modified"
annotations = {
"dag_id": "dag",
"run_id": "run_id",
"task_id": "task",
"try_number": "1",
}
ti_key = annotations_to_key(annotations)
pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="foo", annotations=annotations))
pod_ids = {ti_key: {}}

mock_kube_client.patch_namespaced_pod.side_effect = ApiException(status=400)
executor.adopt_launched_task(mock_kube_client, pod=pod, pod_ids=pod_ids)
mock_kube_client.patch_namespaced_pod.assert_called_once_with(
body={"metadata": {"labels": {"airflow-worker": "modified"}}},
name="foo",
namespace=None,
)
assert pod_ids == {ti_key: {}}
assert executor.running == set()

@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
def test_adopt_completed_pods(self, mock_kube_client):
"""We should adopt all completed pods from other schedulers"""
executor = self.kubernetes_executor
executor.scheduler_job_id = "modified"
executor.kube_client = mock_kube_client
executor.kube_config.kube_namespace = "somens"
pod_names = ["one", "two"]
mock_kube_client.list_namespaced_pod.return_value.items = [
k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
name=pod_name,
labels={"airflow-worker": pod_name},
annotations={"some_annotation": "hello"},
namespace="somens",
)
)
for pod_name in pod_names
]

executor._adopt_completed_pods(mock_kube_client)
mock_kube_client.list_namespaced_pod.assert_called_once_with(
namespace="somens",
field_selector="status.phase=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker!=modified",
)
assert len(pod_names) == mock_kube_client.patch_namespaced_pod.call_count
mock_kube_client.patch_namespaced_pod.assert_has_calls(
[
mock.call(
body={"metadata": {"labels": {"airflow-worker": "modified"}}},
name=pod_name,
namespace="somens",
)
for pod_name in pod_names
],
any_order=True,
)

@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
def test_not_adopt_unassigned_task(self, mock_kube_client):
"""
Expand Down

0 comments on commit 9922953

Please sign in to comment.