Skip to content

Commit 8cea48a

Browse files
committed
[AIRFLOW-1195] Add feature to clear tasks in Parent Dag (#3907)
1 parent 6984ac0 commit 8cea48a

File tree

6 files changed

+138
-7
lines changed

6 files changed

+138
-7
lines changed

airflow/bin/cli.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -634,7 +634,9 @@ def clear(args):
634634
only_failed=args.only_failed,
635635
only_running=args.only_running,
636636
confirm_prompt=not args.no_confirm,
637-
include_subdags=not args.exclude_subdags)
637+
include_subdags=not args.exclude_subdags,
638+
include_parentdag=not args.exclude_parentdag,
639+
)
638640

639641

640642
def get_num_ready_workers_running(gunicorn_master_proc):
@@ -1420,6 +1422,10 @@ class CLIFactory(object):
14201422
'exclude_subdags': Arg(
14211423
("-x", "--exclude_subdags"),
14221424
"Exclude subdags", "store_true"),
1425+
'exclude_parentdag': Arg(
1426+
("-xp", "--exclude_parentdag"),
1427+
"Exclude ParentDAGS if the task cleared is a part of a SubDAG",
1428+
"store_true"),
14231429
'dag_regex': Arg(
14241430
("-dx", "--dag_regex"),
14251431
"Search dag_id as regex instead of exact string", "store_true"),
@@ -1739,7 +1745,7 @@ class CLIFactory(object):
17391745
'args': (
17401746
'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir',
17411747
'upstream', 'downstream', 'no_confirm', 'only_failed',
1742-
'only_running', 'exclude_subdags', 'dag_regex'),
1748+
'only_running', 'exclude_subdags', 'exclude_parentdag', 'dag_regex'),
17431749
}, {
17441750
'func': pause,
17451751
'help': "Pause a DAG",

airflow/models.py

+26
Original file line numberDiff line numberDiff line change
@@ -3762,9 +3762,11 @@ def clear(
37623762
only_running=False,
37633763
confirm_prompt=False,
37643764
include_subdags=True,
3765+
include_parentdag=True,
37653766
reset_dag_runs=True,
37663767
dry_run=False,
37673768
session=None,
3769+
get_tis=False,
37683770
):
37693771
"""
37703772
Clears a set of task instances associated with the current dag for
@@ -3785,6 +3787,25 @@ def clear(
37853787
tis = session.query(TI).filter(TI.dag_id == self.dag_id)
37863788
tis = tis.filter(TI.task_id.in_(self.task_ids))
37873789

3790+
if include_parentdag and self.is_subdag:
3791+
3792+
p_dag = self.parent_dag.sub_dag(
3793+
task_regex=self.dag_id.split('.')[1],
3794+
include_upstream=False,
3795+
include_downstream=True)
3796+
3797+
tis = tis.union(p_dag.clear(
3798+
start_date=start_date, end_date=end_date,
3799+
only_failed=only_failed,
3800+
only_running=only_running,
3801+
confirm_prompt=confirm_prompt,
3802+
include_subdags=include_subdags,
3803+
include_parentdag=False,
3804+
reset_dag_runs=reset_dag_runs,
3805+
get_tis=True,
3806+
session=session,
3807+
))
3808+
37883809
if start_date:
37893810
tis = tis.filter(TI.execution_date >= start_date)
37903811
if end_date:
@@ -3794,6 +3815,9 @@ def clear(
37943815
if only_running:
37953816
tis = tis.filter(TI.state == State.RUNNING)
37963817

3818+
if get_tis:
3819+
return tis
3820+
37973821
if dry_run:
37983822
tis = tis.all()
37993823
session.expunge_all()
@@ -3837,6 +3861,7 @@ def clear_dags(
38373861
only_running=False,
38383862
confirm_prompt=False,
38393863
include_subdags=True,
3864+
include_parentdag=False,
38403865
reset_dag_runs=True,
38413866
dry_run=False,
38423867
):
@@ -3849,6 +3874,7 @@ def clear_dags(
38493874
only_running=only_running,
38503875
confirm_prompt=False,
38513876
include_subdags=include_subdags,
3877+
include_parentdag=include_parentdag,
38523878
reset_dag_runs=reset_dag_runs,
38533879
dry_run=True)
38543880
all_tis.extend(tis)

airflow/www/views.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -1085,7 +1085,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
10851085
count = dag.clear(
10861086
start_date=start_date,
10871087
end_date=end_date,
1088-
include_subdags=recursive)
1088+
include_subdags=recursive,
1089+
include_parentdag=recursive,
1090+
)
10891091

10901092
flash("{0} task instances have been cleared".format(count))
10911093
return redirect(origin)
@@ -1094,7 +1096,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
10941096
start_date=start_date,
10951097
end_date=end_date,
10961098
include_subdags=recursive,
1097-
dry_run=True)
1099+
dry_run=True,
1100+
include_parentdag=recursive,
1101+
)
10981102
if not tis:
10991103
flash("No task instances to clear", 'error')
11001104
response = redirect(origin)

airflow/www_rbac/views.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -768,7 +768,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
768768
count = dag.clear(
769769
start_date=start_date,
770770
end_date=end_date,
771-
include_subdags=recursive)
771+
include_subdags=recursive,
772+
include_parentdag=recursive,
773+
)
772774

773775
flash("{0} task instances have been cleared".format(count))
774776
return redirect(origin)
@@ -777,7 +779,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
777779
start_date=start_date,
778780
end_date=end_date,
779781
include_subdags=recursive,
780-
dry_run=True)
782+
include_parentdag=recursive,
783+
dry_run=True,
784+
)
781785
if not tis:
782786
flash("No task instances to clear", 'error')
783787
response = redirect(origin)

tests/core.py

+39-1
Original file line numberDiff line numberDiff line change
@@ -1316,8 +1316,18 @@ def test_subdag_clear(self):
13161316
'clear', 'example_subdag_operator', '--no_confirm', '--exclude_subdags'])
13171317
cli.clear(args)
13181318

1319+
def test_parentdag_downstream_clear(self):
1320+
args = self.parser.parse_args([
1321+
'clear', 'example_subdag_operator.section-1', '--no_confirm'])
1322+
cli.clear(args)
1323+
args = self.parser.parse_args([
1324+
'clear', 'example_subdag_operator.section-1', '--no_confirm',
1325+
'--exclude_parentdag'])
1326+
cli.clear(args)
1327+
13191328
def test_get_dags(self):
1320-
dags = cli.get_dags(self.parser.parse_args(['clear', 'example_subdag_operator', '-c']))
1329+
dags = cli.get_dags(self.parser.parse_args(['clear', 'example_subdag_operator',
1330+
'-c']))
13211331
self.assertEqual(len(dags), 1)
13221332

13231333
dags = cli.get_dags(self.parser.parse_args(['clear', 'subdag', '-dx', '-c']))
@@ -1793,6 +1803,34 @@ def test_dag_views(self):
17931803
response = self.app.get(url)
17941804
self.assertIn("Wait a minute", response.data.decode('utf-8'))
17951805
response = self.app.get(url + "&confirmed=true")
1806+
url = (
1807+
"/admin/airflow/clear?task_id=section-1-task-1&"
1808+
"dag_id=example_subdag_operator.section-1&future=false&past=false&"
1809+
"upstream=false&downstream=true&recursive=true&"
1810+
"execution_date={}&"
1811+
"origin=/admin".format(DEFAULT_DATE_DS))
1812+
response = self.app.get(url)
1813+
self.assertIn("Wait a minute", response.data.decode('utf-8'))
1814+
self.assertIn("example_subdag_operator.end",
1815+
response.data.decode('utf-8'))
1816+
self.assertIn("example_subdag_operator.section-1.section-1-task-1",
1817+
response.data.decode('utf-8'))
1818+
self.assertIn("example_subdag_operator.section-1",
1819+
response.data.decode('utf-8'))
1820+
self.assertIn("example_subdag_operator.section-2",
1821+
response.data.decode('utf-8'))
1822+
self.assertIn("example_subdag_operator.section-2.section-2-task-1",
1823+
response.data.decode('utf-8'))
1824+
self.assertIn("example_subdag_operator.section-2.section-2-task-2",
1825+
response.data.decode('utf-8'))
1826+
self.assertIn("example_subdag_operator.section-2.section-2-task-3",
1827+
response.data.decode('utf-8'))
1828+
self.assertIn("example_subdag_operator.section-2.section-2-task-4",
1829+
response.data.decode('utf-8'))
1830+
self.assertIn("example_subdag_operator.section-2.section-2-task-5",
1831+
response.data.decode('utf-8'))
1832+
self.assertIn("example_subdag_operator.some-other-task",
1833+
response.data.decode('utf-8'))
17961834
url = (
17971835
"/admin/airflow/run?task_id=runme_0&"
17981836
"dag_id=example_bash_operator&ignore_all_deps=false&ignore_ti_state=true&"

tests/jobs.py

+53
Original file line numberDiff line numberDiff line change
@@ -899,6 +899,59 @@ def test_backfill_execute_subdag(self):
899899
subdag.clear()
900900
dag.clear()
901901

902+
def test_subdag_clear_parentdag_downstream_clear(self):
903+
dag = self.dagbag.get_dag('example_subdag_operator')
904+
subdag_op_task = dag.get_task('section-1')
905+
906+
subdag = subdag_op_task.subdag
907+
subdag.schedule_interval = '@daily'
908+
909+
executor = TestExecutor(do_update=True)
910+
job = BackfillJob(dag=subdag,
911+
start_date=DEFAULT_DATE,
912+
end_date=DEFAULT_DATE,
913+
executor=executor,
914+
donot_pickle=True)
915+
916+
with timeout(seconds=30):
917+
job.run()
918+
919+
ti0 = TI(
920+
task=subdag.get_task('section-1-task-1'),
921+
execution_date=DEFAULT_DATE)
922+
ti0.refresh_from_db()
923+
self.assertEqual(ti0.state, State.SUCCESS)
924+
925+
sdag = subdag.sub_dag(
926+
task_regex='section-1-task-1',
927+
include_downstream=True,
928+
include_upstream=False)
929+
930+
sdag.clear(
931+
start_date=DEFAULT_DATE,
932+
end_date=DEFAULT_DATE,
933+
include_parentdag=True)
934+
935+
ti0.refresh_from_db()
936+
self.assertEquals(State.NONE, ti0.state)
937+
938+
ti1 = TI(
939+
task=dag.get_task('some-other-task'),
940+
execution_date=DEFAULT_DATE)
941+
self.assertEquals(State.NONE, ti1.state)
942+
943+
# Checks that all the Downstream tasks for Parent DAG
944+
# have been cleared
945+
for task in subdag_op_task.downstream_list:
946+
ti = TI(
947+
task=dag.get_task(task.task_id),
948+
execution_date=DEFAULT_DATE
949+
)
950+
self.assertEquals(State.NONE, ti.state)
951+
952+
subdag.clear()
953+
dag.clear()
954+
902955
def test_backfill_execute_subdag_with_removed_task(self):
903956
"""
904957
Ensure that subdag operators execute properly in the case where

0 commit comments

Comments
 (0)