Skip to content

Commit afd37f2

Browse files
KevinYang21Alice Berard
authored and
Alice Berard
committed
[AIRFLOW-2756] Fix bug in set DAG run state workflow (apache#3606)
1 parent 803bdbf commit afd37f2

File tree

4 files changed

+38
-4
lines changed

4 files changed

+38
-4
lines changed

airflow/api/common/experimental/mark_tasks.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,10 @@ def _set_dag_run_state(dag_id, execution_date, state, session=None):
206206
DR.execution_date == execution_date
207207
).one()
208208
dr.state = state
209-
dr.end_date = timezone.utcnow()
209+
if state == State.RUNNING:
210+
dr.start_date = timezone.utcnow()
211+
else:
212+
dr.end_date = timezone.utcnow()
210213
session.commit()
211214

212215

airflow/jobs.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -1024,8 +1024,7 @@ def _change_state_for_tis_without_dagrun(self,
10241024
models.TaskInstance.dag_id == subq.c.dag_id,
10251025
models.TaskInstance.task_id == subq.c.task_id,
10261026
models.TaskInstance.execution_date ==
1027-
subq.c.execution_date,
1028-
models.TaskInstance.task_id == subq.c.task_id)) \
1027+
subq.c.execution_date)) \
10291028
.update({models.TaskInstance.state: new_state},
10301029
synchronize_session=False)
10311030
session.commit()

airflow/www/views.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -2742,7 +2742,8 @@ def after_model_change(self, form, dagrun, is_created, session=None):
27422742
altered_tis = set_dag_run_state_to_success(
27432743
dagbag.get_dag(dagrun.dag_id),
27442744
dagrun.execution_date,
2745-
commit=True)
2745+
commit=True,
2746+
session=session)
27462747
elif dagrun.state == State.FAILED:
27472748
altered_tis = set_dag_run_state_to_failed(
27482749
dagbag.get_dag(dagrun.dag_id),

tests/api/common/experimental/mark_tasks.py

+31
Original file line numberDiff line numberDiff line change
@@ -267,11 +267,25 @@ def _create_test_dag_run(self, state, date):
267267
def _verify_dag_run_state(self, dag, date, state):
268268
drs = models.DagRun.find(dag_id=dag.dag_id, execution_date=date)
269269
dr = drs[0]
270+
270271
self.assertEqual(dr.get_state(), state)
271272

273+
def _verify_dag_run_dates(self, dag, date, state, middle_time):
274+
# When target state is RUNNING, we should set start_date,
275+
# otherwise we should set end_date.
276+
drs = models.DagRun.find(dag_id=dag.dag_id, execution_date=date)
277+
dr = drs[0]
278+
if state == State.RUNNING:
279+
self.assertGreater(dr.start_date, middle_time)
280+
self.assertIsNone(dr.end_date)
281+
else:
282+
self.assertLess(dr.start_date, middle_time)
283+
self.assertGreater(dr.end_date, middle_time)
284+
272285
def test_set_running_dag_run_to_success(self):
273286
date = self.execution_dates[0]
274287
dr = self._create_test_dag_run(State.RUNNING, date)
288+
middle_time = timezone.utcnow()
275289
self._set_default_task_instance_states(dr)
276290

277291
altered = set_dag_run_state_to_success(self.dag1, date, commit=True)
@@ -280,10 +294,12 @@ def test_set_running_dag_run_to_success(self):
280294
self.assertEqual(len(altered), 5)
281295
self._verify_dag_run_state(self.dag1, date, State.SUCCESS)
282296
self._verify_task_instance_states(self.dag1, date, State.SUCCESS)
297+
self._verify_dag_run_dates(self.dag1, date, State.SUCCESS, middle_time)
283298

284299
def test_set_running_dag_run_to_failed(self):
285300
date = self.execution_dates[0]
286301
dr = self._create_test_dag_run(State.RUNNING, date)
302+
middle_time = timezone.utcnow()
287303
self._set_default_task_instance_states(dr)
288304

289305
altered = set_dag_run_state_to_failed(self.dag1, date, commit=True)
@@ -292,10 +308,12 @@ def test_set_running_dag_run_to_failed(self):
292308
self.assertEqual(len(altered), 1)
293309
self._verify_dag_run_state(self.dag1, date, State.FAILED)
294310
self.assertEqual(dr.get_task_instance('run_after_loop').state, State.FAILED)
311+
self._verify_dag_run_dates(self.dag1, date, State.FAILED, middle_time)
295312

296313
def test_set_running_dag_run_to_running(self):
297314
date = self.execution_dates[0]
298315
dr = self._create_test_dag_run(State.RUNNING, date)
316+
middle_time = timezone.utcnow()
299317
self._set_default_task_instance_states(dr)
300318

301319
altered = set_dag_run_state_to_running(self.dag1, date, commit=True)
@@ -304,10 +322,12 @@ def test_set_running_dag_run_to_running(self):
304322
self.assertEqual(len(altered), 0)
305323
self._verify_dag_run_state(self.dag1, date, State.RUNNING)
306324
self._verify_task_instance_states_remain_default(dr)
325+
self._verify_dag_run_dates(self.dag1, date, State.RUNNING, middle_time)
307326

308327
def test_set_success_dag_run_to_success(self):
309328
date = self.execution_dates[0]
310329
dr = self._create_test_dag_run(State.SUCCESS, date)
330+
middle_time = timezone.utcnow()
311331
self._set_default_task_instance_states(dr)
312332

313333
altered = set_dag_run_state_to_success(self.dag1, date, commit=True)
@@ -316,10 +336,12 @@ def test_set_success_dag_run_to_success(self):
316336
self.assertEqual(len(altered), 5)
317337
self._verify_dag_run_state(self.dag1, date, State.SUCCESS)
318338
self._verify_task_instance_states(self.dag1, date, State.SUCCESS)
339+
self._verify_dag_run_dates(self.dag1, date, State.SUCCESS, middle_time)
319340

320341
def test_set_success_dag_run_to_failed(self):
321342
date = self.execution_dates[0]
322343
dr = self._create_test_dag_run(State.SUCCESS, date)
344+
middle_time = timezone.utcnow()
323345
self._set_default_task_instance_states(dr)
324346

325347
altered = set_dag_run_state_to_failed(self.dag1, date, commit=True)
@@ -328,10 +350,12 @@ def test_set_success_dag_run_to_failed(self):
328350
self.assertEqual(len(altered), 1)
329351
self._verify_dag_run_state(self.dag1, date, State.FAILED)
330352
self.assertEqual(dr.get_task_instance('run_after_loop').state, State.FAILED)
353+
self._verify_dag_run_dates(self.dag1, date, State.FAILED, middle_time)
331354

332355
def test_set_success_dag_run_to_running(self):
333356
date = self.execution_dates[0]
334357
dr = self._create_test_dag_run(State.SUCCESS, date)
358+
middle_time = timezone.utcnow()
335359
self._set_default_task_instance_states(dr)
336360

337361
altered = set_dag_run_state_to_running(self.dag1, date, commit=True)
@@ -340,10 +364,12 @@ def test_set_success_dag_run_to_running(self):
340364
self.assertEqual(len(altered), 0)
341365
self._verify_dag_run_state(self.dag1, date, State.RUNNING)
342366
self._verify_task_instance_states_remain_default(dr)
367+
self._verify_dag_run_dates(self.dag1, date, State.RUNNING, middle_time)
343368

344369
def test_set_failed_dag_run_to_success(self):
345370
date = self.execution_dates[0]
346371
dr = self._create_test_dag_run(State.SUCCESS, date)
372+
middle_time = timezone.utcnow()
347373
self._set_default_task_instance_states(dr)
348374

349375
altered = set_dag_run_state_to_success(self.dag1, date, commit=True)
@@ -352,10 +378,12 @@ def test_set_failed_dag_run_to_success(self):
352378
self.assertEqual(len(altered), 5)
353379
self._verify_dag_run_state(self.dag1, date, State.SUCCESS)
354380
self._verify_task_instance_states(self.dag1, date, State.SUCCESS)
381+
self._verify_dag_run_dates(self.dag1, date, State.SUCCESS, middle_time)
355382

356383
def test_set_failed_dag_run_to_failed(self):
357384
date = self.execution_dates[0]
358385
dr = self._create_test_dag_run(State.SUCCESS, date)
386+
middle_time = timezone.utcnow()
359387
self._set_default_task_instance_states(dr)
360388

361389
altered = set_dag_run_state_to_failed(self.dag1, date, commit=True)
@@ -364,10 +392,12 @@ def test_set_failed_dag_run_to_failed(self):
364392
self.assertEqual(len(altered), 1)
365393
self._verify_dag_run_state(self.dag1, date, State.FAILED)
366394
self.assertEqual(dr.get_task_instance('run_after_loop').state, State.FAILED)
395+
self._verify_dag_run_dates(self.dag1, date, State.FAILED, middle_time)
367396

368397
def test_set_failed_dag_run_to_running(self):
369398
date = self.execution_dates[0]
370399
dr = self._create_test_dag_run(State.SUCCESS, date)
400+
middle_time = timezone.utcnow()
371401
self._set_default_task_instance_states(dr)
372402

373403
altered = set_dag_run_state_to_running(self.dag1, date, commit=True)
@@ -376,6 +406,7 @@ def test_set_failed_dag_run_to_running(self):
376406
self.assertEqual(len(altered), 0)
377407
self._verify_dag_run_state(self.dag1, date, State.RUNNING)
378408
self._verify_task_instance_states_remain_default(dr)
409+
self._verify_dag_run_dates(self.dag1, date, State.RUNNING, middle_time)
379410

380411
def test_set_state_without_commit(self):
381412
date = self.execution_dates[0]

0 commit comments

Comments
 (0)