From 9922953bcd9e11a1412a3528aef938444d62f7fe Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Mon, 9 Jan 2023 18:07:56 -0600 Subject: [PATCH] Only patch single label when adopting pod (#28776) When KubernetesExecutor adopts pods, it was patching the pod with the pod it retrieved from the k8s api, while just updating a single label. Normally this works just fine, but there are cases where the pod you pull from the k8s api can't be used as-is when patching - it results in a 422 `Forbidden: pod updates may not change fields other than ...`. Instead we now just pass the single label we need to update to patch, allowing us to avoid accidentally "updating" other fields. Closes #24015 --- airflow/executors/kubernetes_executor.py | 15 ++-- tests/executors/test_kubernetes_executor.py | 80 ++++++++++++++++++--- 2 files changed, 77 insertions(+), 18 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 12fbca31ff27b..c2d7406657f01 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -778,26 +778,28 @@ def adopt_launched_task( assert self.scheduler_job_id self.log.info("attempting to adopt pod %s", pod.metadata.name) - pod.metadata.labels["airflow-worker"] = pod_generator.make_safe_label_value(self.scheduler_job_id) pod_id = annotations_to_key(pod.metadata.annotations) if pod_id not in pod_ids: self.log.error("attempting to adopt taskinstance which was not specified by database: %s", pod_id) return + new_worker_id_label = pod_generator.make_safe_label_value(self.scheduler_job_id) try: kube_client.patch_namespaced_pod( name=pod.metadata.name, namespace=pod.metadata.namespace, - body=PodGenerator.serialize_pod(pod), + body={"metadata": {"labels": {"airflow-worker": new_worker_id_label}}}, ) - pod_ids.pop(pod_id) - self.running.add(pod_id) except ApiException as e: self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e) + return + + del pod_ids[pod_id] + self.running.add(pod_id) def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None: """ - Patch completed pod so that the KubernetesJobWatcher can delete it. + Patch completed pods so that the KubernetesJobWatcher can delete them. :param kube_client: kubernetes client for speaking to kube API """ @@ -812,12 +814,11 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None: pod_list = self._list_pods(query_kwargs) for pod in pod_list: self.log.info("Attempting to adopt pod %s", pod.metadata.name) - pod.metadata.labels["airflow-worker"] = new_worker_id_label try: kube_client.patch_namespaced_pod( name=pod.metadata.name, namespace=pod.metadata.namespace, - body=PodGenerator.serialize_pod(pod), + body={"metadata": {"labels": {"airflow-worker": new_worker_id_label}}}, ) except ApiException as e: self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e) diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index aef530911b5dd..17611b94343a6 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -718,20 +718,78 @@ def test_adopt_launched_task(self, mock_kube_client): pod_ids = {ti_key: {}} executor.adopt_launched_task(mock_kube_client, pod=pod, pod_ids=pod_ids) - assert mock_kube_client.patch_namespaced_pod.call_args[1] == { - "body": { - "metadata": { - "labels": {"airflow-worker": "modified"}, - "annotations": annotations, - "name": "foo", - } - }, - "name": "foo", - "namespace": None, - } + mock_kube_client.patch_namespaced_pod.assert_called_once_with( + body={"metadata": {"labels": {"airflow-worker": "modified"}}}, + name="foo", + namespace=None, + ) assert pod_ids == {} assert executor.running == {ti_key} + @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") + def test_adopt_launched_task_api_exception(self, mock_kube_client): + """We shouldn't think we are running the task if aren't able to patch the pod""" + executor = self.kubernetes_executor + executor.scheduler_job_id = "modified" + annotations = { + "dag_id": "dag", + "run_id": "run_id", + "task_id": "task", + "try_number": "1", + } + ti_key = annotations_to_key(annotations) + pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="foo", annotations=annotations)) + pod_ids = {ti_key: {}} + + mock_kube_client.patch_namespaced_pod.side_effect = ApiException(status=400) + executor.adopt_launched_task(mock_kube_client, pod=pod, pod_ids=pod_ids) + mock_kube_client.patch_namespaced_pod.assert_called_once_with( + body={"metadata": {"labels": {"airflow-worker": "modified"}}}, + name="foo", + namespace=None, + ) + assert pod_ids == {ti_key: {}} + assert executor.running == set() + + @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") + def test_adopt_completed_pods(self, mock_kube_client): + """We should adopt all completed pods from other schedulers""" + executor = self.kubernetes_executor + executor.scheduler_job_id = "modified" + executor.kube_client = mock_kube_client + executor.kube_config.kube_namespace = "somens" + pod_names = ["one", "two"] + mock_kube_client.list_namespaced_pod.return_value.items = [ + k8s.V1Pod( + metadata=k8s.V1ObjectMeta( + name=pod_name, + labels={"airflow-worker": pod_name}, + annotations={"some_annotation": "hello"}, + namespace="somens", + ) + ) + for pod_name in pod_names + ] + + executor._adopt_completed_pods(mock_kube_client) + mock_kube_client.list_namespaced_pod.assert_called_once_with( + namespace="somens", + field_selector="status.phase=Succeeded", + label_selector="kubernetes_executor=True,airflow-worker!=modified", + ) + assert len(pod_names) == mock_kube_client.patch_namespaced_pod.call_count + mock_kube_client.patch_namespaced_pod.assert_has_calls( + [ + mock.call( + body={"metadata": {"labels": {"airflow-worker": "modified"}}}, + name=pod_name, + namespace="somens", + ) + for pod_name in pod_names + ], + any_order=True, + ) + @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") def test_not_adopt_unassigned_task(self, mock_kube_client): """