From 134d563f5cd7afec23b27467e131260ed79aafa9 Mon Sep 17 00:00:00 2001 From: Max Ho Date: Tue, 3 Jan 2023 09:20:16 +0800 Subject: [PATCH] Handle ConnectionReset exception in Executor cleanup --- airflow/executors/kubernetes_executor.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 42d3633a6cb08..12fbca31ff27b 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -872,13 +872,16 @@ def end(self) -> None: assert self.kube_scheduler self.log.info("Shutting down Kubernetes executor") - self.log.debug("Flushing task_queue...") - self._flush_task_queue() - self.log.debug("Flushing result_queue...") - self._flush_result_queue() - # Both queues should be empty... - self.task_queue.join() - self.result_queue.join() + try: + self.log.debug("Flushing task_queue...") + self._flush_task_queue() + self.log.debug("Flushing result_queue...") + self._flush_result_queue() + # Both queues should be empty... + self.task_queue.join() + self.result_queue.join() + except ConnectionResetError: + self.log.exception("Connection Reset error while flushing task_queue and result_queue.") if self.kube_scheduler: self.kube_scheduler.terminate() self._manager.shutdown()