Skip to content

Commit

Permalink
Handle ConnectionReset exception in Executor cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
maxnathaniel committed Jan 3, 2023
1 parent 3b6ced6 commit 134d563
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -872,13 +872,16 @@ def end(self) -> None:
assert self.kube_scheduler

self.log.info("Shutting down Kubernetes executor")
self.log.debug("Flushing task_queue...")
self._flush_task_queue()
self.log.debug("Flushing result_queue...")
self._flush_result_queue()
# Both queues should be empty...
self.task_queue.join()
self.result_queue.join()
try:
self.log.debug("Flushing task_queue...")
self._flush_task_queue()
self.log.debug("Flushing result_queue...")
self._flush_result_queue()
# Both queues should be empty...
self.task_queue.join()
self.result_queue.join()
except ConnectionResetError:
self.log.exception("Connection Reset error while flushing task_queue and result_queue.")
if self.kube_scheduler:
self.kube_scheduler.terminate()
self._manager.shutdown()
Expand Down

0 comments on commit 134d563

Please sign in to comment.