Skip to content

Commit eb94fcd

Browse files
kaxilChris Fei
authored and
Chris Fei
committed
Add feature to clear tasks in Parent Dag (apache#3907)
1 parent ed9fc3f commit eb94fcd

File tree

6 files changed

+138
-7
lines changed

6 files changed

+138
-7
lines changed

airflow/bin/cli.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -635,7 +635,9 @@ def clear(args):
635635
only_failed=args.only_failed,
636636
only_running=args.only_running,
637637
confirm_prompt=not args.no_confirm,
638-
include_subdags=not args.exclude_subdags)
638+
include_subdags=not args.exclude_subdags,
639+
include_parentdag=not args.exclude_parentdag,
640+
)
639641

640642

641643
def get_num_ready_workers_running(gunicorn_master_proc):
@@ -1421,6 +1423,10 @@ class CLIFactory(object):
14211423
'exclude_subdags': Arg(
14221424
("-x", "--exclude_subdags"),
14231425
"Exclude subdags", "store_true"),
1426+
'exclude_parentdag': Arg(
1427+
("-xp", "--exclude_parentdag"),
1428+
"Exclude ParentDAGS if the task cleared is a part of a SubDAG",
1429+
"store_true"),
14241430
'dag_regex': Arg(
14251431
("-dx", "--dag_regex"),
14261432
"Search dag_id as regex instead of exact string", "store_true"),
@@ -1740,7 +1746,7 @@ class CLIFactory(object):
17401746
'args': (
17411747
'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir',
17421748
'upstream', 'downstream', 'no_confirm', 'only_failed',
1743-
'only_running', 'exclude_subdags', 'dag_regex'),
1749+
'only_running', 'exclude_subdags', 'exclude_parentdag', 'dag_regex'),
17441750
}, {
17451751
'func': pause,
17461752
'help': "Pause a DAG",

airflow/models.py

+26
Original file line numberDiff line numberDiff line change
@@ -3827,9 +3827,11 @@ def clear(
38273827
only_running=False,
38283828
confirm_prompt=False,
38293829
include_subdags=True,
3830+
include_parentdag=True,
38303831
reset_dag_runs=True,
38313832
dry_run=False,
38323833
session=None,
3834+
get_tis=False,
38333835
):
38343836
"""
38353837
Clears a set of task instances associated with the current dag for
@@ -3850,6 +3852,25 @@ def clear(
38503852
tis = session.query(TI).filter(TI.dag_id == self.dag_id)
38513853
tis = tis.filter(TI.task_id.in_(self.task_ids))
38523854

3855+
if include_parentdag and self.is_subdag:
3856+
3857+
p_dag = self.parent_dag.sub_dag(
3858+
task_regex=self.dag_id.split('.')[1],
3859+
include_upstream=False,
3860+
include_downstream=True)
3861+
3862+
tis = tis.union(p_dag.clear(
3863+
start_date=start_date, end_date=end_date,
3864+
only_failed=only_failed,
3865+
only_running=only_running,
3866+
confirm_prompt=confirm_prompt,
3867+
include_subdags=include_subdags,
3868+
include_parentdag=False,
3869+
reset_dag_runs=reset_dag_runs,
3870+
get_tis=True,
3871+
session=session,
3872+
))
3873+
38533874
if start_date:
38543875
tis = tis.filter(TI.execution_date >= start_date)
38553876
if end_date:
@@ -3859,6 +3880,9 @@ def clear(
38593880
if only_running:
38603881
tis = tis.filter(TI.state == State.RUNNING)
38613882

3883+
if get_tis:
3884+
return tis
3885+
38623886
if dry_run:
38633887
tis = tis.all()
38643888
session.expunge_all()
@@ -3902,6 +3926,7 @@ def clear_dags(
39023926
only_running=False,
39033927
confirm_prompt=False,
39043928
include_subdags=True,
3929+
include_parentdag=False,
39053930
reset_dag_runs=True,
39063931
dry_run=False,
39073932
):
@@ -3914,6 +3939,7 @@ def clear_dags(
39143939
only_running=only_running,
39153940
confirm_prompt=False,
39163941
include_subdags=include_subdags,
3942+
include_parentdag=include_parentdag,
39173943
reset_dag_runs=reset_dag_runs,
39183944
dry_run=True)
39193945
all_tis.extend(tis)

airflow/www/views.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -1085,7 +1085,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
10851085
count = dag.clear(
10861086
start_date=start_date,
10871087
end_date=end_date,
1088-
include_subdags=recursive)
1088+
include_subdags=recursive,
1089+
include_parentdag=recursive,
1090+
)
10891091

10901092
flash("{0} task instances have been cleared".format(count))
10911093
return redirect(origin)
@@ -1094,7 +1096,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
10941096
start_date=start_date,
10951097
end_date=end_date,
10961098
include_subdags=recursive,
1097-
dry_run=True)
1099+
dry_run=True,
1100+
include_parentdag=recursive,
1101+
)
10981102
if not tis:
10991103
flash("No task instances to clear", 'error')
11001104
response = redirect(origin)

airflow/www_rbac/views.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -768,7 +768,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
768768
count = dag.clear(
769769
start_date=start_date,
770770
end_date=end_date,
771-
include_subdags=recursive)
771+
include_subdags=recursive,
772+
include_parentdag=recursive,
773+
)
772774

773775
flash("{0} task instances have been cleared".format(count))
774776
return redirect(origin)
@@ -777,7 +779,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
777779
start_date=start_date,
778780
end_date=end_date,
779781
include_subdags=recursive,
780-
dry_run=True)
782+
include_parentdag=recursive,
783+
dry_run=True,
784+
)
781785
if not tis:
782786
flash("No task instances to clear", 'error')
783787
response = redirect(origin)

tests/core.py

+39-1
Original file line numberDiff line numberDiff line change
@@ -1316,8 +1316,18 @@ def test_subdag_clear(self):
13161316
'clear', 'example_subdag_operator', '--no_confirm', '--exclude_subdags'])
13171317
cli.clear(args)
13181318

1319+
def test_parentdag_downstream_clear(self):
1320+
args = self.parser.parse_args([
1321+
'clear', 'example_subdag_operator.section-1', '--no_confirm'])
1322+
cli.clear(args)
1323+
args = self.parser.parse_args([
1324+
'clear', 'example_subdag_operator.section-1', '--no_confirm',
1325+
'--exclude_parentdag'])
1326+
cli.clear(args)
1327+
13191328
def test_get_dags(self):
1320-
dags = cli.get_dags(self.parser.parse_args(['clear', 'example_subdag_operator', '-c']))
1329+
dags = cli.get_dags(self.parser.parse_args(['clear', 'example_subdag_operator',
1330+
'-c']))
13211331
self.assertEqual(len(dags), 1)
13221332

13231333
dags = cli.get_dags(self.parser.parse_args(['clear', 'subdag', '-dx', '-c']))
@@ -1793,6 +1803,34 @@ def test_dag_views(self):
17931803
response = self.app.get(url)
17941804
self.assertIn("Wait a minute", response.data.decode('utf-8'))
17951805
response = self.app.get(url + "&confirmed=true")
1806+
url = (
1807+
"/admin/airflow/clear?task_id=section-1-task-1&"
1808+
"dag_id=example_subdag_operator.section-1&future=false&past=false&"
1809+
"upstream=false&downstream=true&recursive=true&"
1810+
"execution_date={}&"
1811+
"origin=/admin".format(DEFAULT_DATE_DS))
1812+
response = self.app.get(url)
1813+
self.assertIn("Wait a minute", response.data.decode('utf-8'))
1814+
self.assertIn("example_subdag_operator.end",
1815+
response.data.decode('utf-8'))
1816+
self.assertIn("example_subdag_operator.section-1.section-1-task-1",
1817+
response.data.decode('utf-8'))
1818+
self.assertIn("example_subdag_operator.section-1",
1819+
response.data.decode('utf-8'))
1820+
self.assertIn("example_subdag_operator.section-2",
1821+
response.data.decode('utf-8'))
1822+
self.assertIn("example_subdag_operator.section-2.section-2-task-1",
1823+
response.data.decode('utf-8'))
1824+
self.assertIn("example_subdag_operator.section-2.section-2-task-2",
1825+
response.data.decode('utf-8'))
1826+
self.assertIn("example_subdag_operator.section-2.section-2-task-3",
1827+
response.data.decode('utf-8'))
1828+
self.assertIn("example_subdag_operator.section-2.section-2-task-4",
1829+
response.data.decode('utf-8'))
1830+
self.assertIn("example_subdag_operator.section-2.section-2-task-5",
1831+
response.data.decode('utf-8'))
1832+
self.assertIn("example_subdag_operator.some-other-task",
1833+
response.data.decode('utf-8'))
17961834
url = (
17971835
"/admin/airflow/run?task_id=runme_0&"
17981836
"dag_id=example_bash_operator&ignore_all_deps=false&ignore_ti_state=true&"

tests/jobs.py

+53
Original file line numberDiff line numberDiff line change
@@ -899,6 +899,59 @@ def test_backfill_execute_subdag(self):
899899
subdag.clear()
900900
dag.clear()
901901

902+
def test_subdag_clear_parentdag_downstream_clear(self):
903+
dag = self.dagbag.get_dag('example_subdag_operator')
904+
subdag_op_task = dag.get_task('section-1')
905+
906+
subdag = subdag_op_task.subdag
907+
subdag.schedule_interval = '@daily'
908+
909+
executor = TestExecutor(do_update=True)
910+
job = BackfillJob(dag=subdag,
911+
start_date=DEFAULT_DATE,
912+
end_date=DEFAULT_DATE,
913+
executor=executor,
914+
donot_pickle=True)
915+
916+
with timeout(seconds=30):
917+
job.run()
918+
919+
ti0 = TI(
920+
task=subdag.get_task('section-1-task-1'),
921+
execution_date=DEFAULT_DATE)
922+
ti0.refresh_from_db()
923+
self.assertEqual(ti0.state, State.SUCCESS)
924+
925+
sdag = subdag.sub_dag(
926+
task_regex='section-1-task-1',
927+
include_downstream=True,
928+
include_upstream=False)
929+
930+
sdag.clear(
931+
start_date=DEFAULT_DATE,
932+
end_date=DEFAULT_DATE,
933+
include_parentdag=True)
934+
935+
ti0.refresh_from_db()
936+
self.assertEquals(State.NONE, ti0.state)
937+
938+
ti1 = TI(
939+
task=dag.get_task('some-other-task'),
940+
execution_date=DEFAULT_DATE)
941+
self.assertEquals(State.NONE, ti1.state)
942+
943+
# Checks that all the Downstream tasks for Parent DAG
944+
# have been cleared
945+
for task in subdag_op_task.downstream_list:
946+
ti = TI(
947+
task=dag.get_task(task.task_id),
948+
execution_date=DEFAULT_DATE
949+
)
950+
self.assertEquals(State.NONE, ti.state)
951+
952+
subdag.clear()
953+
dag.clear()
954+
902955
def test_backfill_execute_subdag_with_removed_task(self):
903956
"""
904957
Ensure that subdag operators execute properly in the case where

0 commit comments

Comments
 (0)