From dc14a31ac3c6e62a846907d365c4d00d6a564795 Mon Sep 17 00:00:00 2001 From: Abdul Nimeri Date: Fri, 27 Jul 2018 14:55:32 -0700 Subject: [PATCH] [AIRFLOW-2145] fix deadlock on clearing running TI a `shutdown` task is not considered to be `unfinished`, so a dag run can deadlock when all `unfinished` downstreams are waiting on a task that's in the `shutdown` state. fix this by considering `shutdown` to be `unfinished`, since it's not truly a terminal state --- airflow/utils/state.py | 6 +++--- tests/models.py | 21 ++++++++++++++++++++- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/airflow/utils/state.py b/airflow/utils/state.py index 9da98510eb03d..a351df07b9654 100644 --- a/airflow/utils/state.py +++ b/airflow/utils/state.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -101,7 +101,6 @@ def finished(cls): """ return [ cls.SUCCESS, - cls.SHUTDOWN, cls.FAILED, cls.SKIPPED, ] @@ -117,5 +116,6 @@ def unfinished(cls): cls.SCHEDULED, cls.QUEUED, cls.RUNNING, + cls.SHUTDOWN, cls.UP_FOR_RETRY ] diff --git a/tests/models.py b/tests/models.py index 1c88ea47f7085..529ae56454ded 100644 --- a/tests/models.py +++ b/tests/models.py @@ -801,7 +801,26 @@ def test_dagrun_deadlock(self): dr.update_state() self.assertEqual(dr.state, State.FAILED) - def test_dagrun_no_deadlock(self): + def test_dagrun_no_deadlock_with_shutdown(self): + session = settings.Session() + dag = DAG('test_dagrun_no_deadlock_with_shutdown', + start_date=DEFAULT_DATE) + with dag: + op1 = DummyOperator(task_id='upstream_task') + op2 = DummyOperator(task_id='downstream_task') + op2.set_upstream(op1) + + dr = 
dag.create_dagrun(run_id='test_dagrun_no_deadlock_with_shutdown', + state=State.RUNNING, + execution_date=DEFAULT_DATE, + start_date=DEFAULT_DATE) + upstream_ti = dr.get_task_instance(task_id='upstream_task') + upstream_ti.set_state(State.SHUTDOWN, session=session) + + dr.update_state() + self.assertEqual(dr.state, State.RUNNING) + + def test_dagrun_no_deadlock_with_depends_on_past(self): session = settings.Session() dag = DAG('test_dagrun_no_deadlock', start_date=DEFAULT_DATE)