Skip to content

Commit

Permalink
Handle ConnectionReset exception in Executor cleanup (#28685)
Browse files Browse the repository at this point in the history
(cherry picked from commit a3de721)
  • Loading branch information
maxnathaniel authored and ephraimbuddy committed Jan 12, 2023
1 parent 0468ee7 commit 6a6a258
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -843,13 +843,16 @@ def end(self) -> None:
assert self.kube_scheduler

self.log.info("Shutting down Kubernetes executor")
self.log.debug("Flushing task_queue...")
self._flush_task_queue()
self.log.debug("Flushing result_queue...")
self._flush_result_queue()
# Both queues should be empty...
self.task_queue.join()
self.result_queue.join()
try:
self.log.debug("Flushing task_queue...")
self._flush_task_queue()
self.log.debug("Flushing result_queue...")
self._flush_result_queue()
# Both queues should be empty...
self.task_queue.join()
self.result_queue.join()
except ConnectionResetError:
self.log.exception("Connection Reset error while flushing task_queue and result_queue.")
if self.kube_scheduler:
self.kube_scheduler.terminate()
self._manager.shutdown()
Expand Down

0 comments on commit 6a6a258

Please sign in to comment.