Skip to content

Commit 5cb4ee7

Browse files
author
yingbo_wang
committed
[AIRFLOW-2951] Update dag_run table end_date when state change
The existing airflow only change dag_run table end_date value when a user teminate a dag in web UI. The end_date will not be updated if airflow detected a dag finished and updated its state. This commit add end_date update in DagRun's set_state function to make up tho problem mentioned above.
1 parent e028b7c commit 5cb4ee7

File tree

3 files changed

+125
-4
lines changed

3 files changed

+125
-4
lines changed

airflow/api/common/experimental/mark_tasks.py

+1
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ def _set_dag_run_state(dag_id, execution_date, state, session=None):
208208
dr.state = state
209209
if state == State.RUNNING:
210210
dr.start_date = timezone.utcnow()
211+
dr.end_date = None
211212
else:
212213
dr.end_date = timezone.utcnow()
213214
session.commit()

airflow/models.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -4840,6 +4840,8 @@ def get_state(self):
48404840
def set_state(self, state):
48414841
if self._state != state:
48424842
self._state = state
4843+
self.end_date = timezone.utcnow() if self._state in State.finished() else None
4844+
48434845
if self.dag_id is not None:
48444846
# FIXME: Due to the scoped_session factor we we don't get a clean
48454847
# session here, so something really weird goes on:
@@ -5063,28 +5065,28 @@ def update_state(self, session=None):
50635065
if (not unfinished_tasks and
50645066
any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
50655067
self.log.info('Marking run %s failed', self)
5066-
self.state = State.FAILED
5068+
self.set_state(State.FAILED)
50675069
dag.handle_callback(self, success=False, reason='task_failure',
50685070
session=session)
50695071

50705072
# if all roots succeeded and no unfinished tasks, the run succeeded
50715073
elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED)
50725074
for r in roots):
50735075
self.log.info('Marking run %s successful', self)
5074-
self.state = State.SUCCESS
5076+
self.set_state(State.SUCCESS)
50755077
dag.handle_callback(self, success=True, reason='success', session=session)
50765078

50775079
# if *all tasks* are deadlocked, the run failed
50785080
elif (unfinished_tasks and none_depends_on_past and
50795081
none_task_concurrency and no_dependencies_met):
50805082
self.log.info('Deadlock; marking run %s failed', self)
5081-
self.state = State.FAILED
5083+
self.set_state(State.FAILED)
50825084
dag.handle_callback(self, success=False, reason='all_tasks_deadlocked',
50835085
session=session)
50845086

50855087
# finally, if the roots aren't done, the dag is still running
50865088
else:
5087-
self.state = State.RUNNING
5089+
self.set_state(State.RUNNING)
50885090

50895091
# todo: determine we want to use with_for_update to make sure to lock the run
50905092
session.merge(self)

tests/models.py

+118
Original file line numberDiff line numberDiff line change
@@ -896,6 +896,124 @@ def on_failure_callable(context):
896896
updated_dag_state = dag_run.update_state()
897897
self.assertEqual(State.FAILED, updated_dag_state)
898898

899+
def test_dagrun_set_state_end_date(self):
900+
session = settings.Session()
901+
902+
dag = DAG(
903+
'test_dagrun_set_state_end_date',
904+
start_date=DEFAULT_DATE,
905+
default_args={'owner': 'owner1'})
906+
907+
dag.clear()
908+
909+
now = timezone.utcnow()
910+
dr = dag.create_dagrun(run_id='test_dagrun_set_state_end_date',
911+
state=State.RUNNING,
912+
execution_date=now,
913+
start_date=now)
914+
915+
# Initial end_date should be NULL
916+
# State.SUCCESS and State.FAILED are all ending state and should set end_date
917+
# State.RUNNING set end_date back to NULL
918+
session.add(dr)
919+
session.commit()
920+
self.assertIsNone(dr.end_date)
921+
922+
dr.set_state(State.SUCCESS)
923+
session.merge(dr)
924+
session.commit()
925+
926+
dr_database = session.query(DagRun).filter(
927+
DagRun.run_id == 'test_dagrun_set_state_end_date'
928+
).one()
929+
self.assertIsNotNone(dr_database.end_date)
930+
self.assertEqual(dr.end_date, dr_database.end_date)
931+
932+
dr.set_state(State.RUNNING)
933+
session.merge(dr)
934+
session.commit()
935+
936+
dr_database = session.query(DagRun).filter(
937+
DagRun.run_id == 'test_dagrun_set_state_end_date'
938+
).one()
939+
940+
self.assertIsNone(dr_database.end_date)
941+
942+
dr.set_state(State.FAILED)
943+
session.merge(dr)
944+
session.commit()
945+
dr_database = session.query(DagRun).filter(
946+
DagRun.run_id == 'test_dagrun_set_state_end_date'
947+
).one()
948+
949+
self.assertIsNotNone(dr_database.end_date)
950+
self.assertEqual(dr.end_date, dr_database.end_date)
951+
952+
def test_dagrun_update_state_end_date(self):
953+
session = settings.Session()
954+
955+
dag = DAG(
956+
'test_dagrun_update_state_end_date',
957+
start_date=DEFAULT_DATE,
958+
default_args={'owner': 'owner1'})
959+
960+
# A -> B
961+
with dag:
962+
op1 = DummyOperator(task_id='A')
963+
op2 = DummyOperator(task_id='B')
964+
op1.set_upstream(op2)
965+
966+
dag.clear()
967+
968+
now = timezone.utcnow()
969+
dr = dag.create_dagrun(run_id='test_dagrun_update_state_end_date',
970+
state=State.RUNNING,
971+
execution_date=now,
972+
start_date=now)
973+
974+
# Initial end_date should be NULL
975+
# State.SUCCESS and State.FAILED are all ending state and should set end_date
976+
# State.RUNNING set end_date back to NULL
977+
session.merge(dr)
978+
session.commit()
979+
self.assertIsNone(dr.end_date)
980+
981+
ti_op1 = dr.get_task_instance(task_id=op1.task_id)
982+
ti_op1.set_state(state=State.SUCCESS, session=session)
983+
ti_op2 = dr.get_task_instance(task_id=op2.task_id)
984+
ti_op2.set_state(state=State.SUCCESS, session=session)
985+
986+
dr.update_state()
987+
988+
dr_database = session.query(DagRun).filter(
989+
DagRun.run_id == 'test_dagrun_update_state_end_date'
990+
).one()
991+
self.assertIsNotNone(dr_database.end_date)
992+
self.assertEqual(dr.end_date, dr_database.end_date)
993+
994+
ti_op1.set_state(state=State.RUNNING, session=session)
995+
ti_op2.set_state(state=State.RUNNING, session=session)
996+
dr.update_state()
997+
998+
dr_database = session.query(DagRun).filter(
999+
DagRun.run_id == 'test_dagrun_update_state_end_date'
1000+
).one()
1001+
1002+
self.assertEqual(dr._state, State.RUNNING)
1003+
self.assertIsNone(dr.end_date)
1004+
self.assertIsNone(dr_database.end_date)
1005+
1006+
ti_op1.set_state(state=State.FAILED, session=session)
1007+
ti_op2.set_state(state=State.FAILED, session=session)
1008+
dr.update_state()
1009+
1010+
dr_database = session.query(DagRun).filter(
1011+
DagRun.run_id == 'test_dagrun_update_state_end_date'
1012+
).one()
1013+
1014+
self.assertIsNotNone(dr_database.end_date)
1015+
self.assertEqual(dr.end_date, dr_database.end_date)
1016+
8991017
def test_get_task_instance_on_empty_dagrun(self):
9001018
"""
9011019
Make sure that a proper value is returned when a dagrun has no task instances

0 commit comments

Comments
 (0)