Skip to content

Commit 2c98059

Browse files
authored
[AIRFLOW-1195] Add feature to clear tasks in Parent Dag (#3907)
1 parent fde2090 commit 2c98059

File tree

6 files changed

+138
-7
lines changed

6 files changed

+138
-7
lines changed

airflow/bin/cli.py

+8 -2
Original file line number | Diff line number | Diff line change
@@ -705,7 +705,9 @@ def clear(args):
705705
only_failed=args.only_failed,
706706
only_running=args.only_running,
707707
confirm_prompt=not args.no_confirm,
708-
include_subdags=not args.exclude_subdags)
708+
include_subdags=not args.exclude_subdags,
709+
include_parentdag=not args.exclude_parentdag,
710+
)
709711

710712

711713
def get_num_ready_workers_running(gunicorn_master_proc):
@@ -1604,6 +1606,10 @@ class CLIFactory(object):
16041606
'exclude_subdags': Arg(
16051607
("-x", "--exclude_subdags"),
16061608
"Exclude subdags", "store_true"),
1609+
'exclude_parentdag': Arg(
1610+
("-xp", "--exclude_parentdag"),
1611+
"Exclude ParentDAGS if the task cleared is a part of a SubDAG",
1612+
"store_true"),
16071613
'dag_regex': Arg(
16081614
("-dx", "--dag_regex"),
16091615
"Search dag_id as regex instead of exact string", "store_true"),
@@ -1936,7 +1942,7 @@ class CLIFactory(object):
19361942
'args': (
19371943
'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir',
19381944
'upstream', 'downstream', 'no_confirm', 'only_failed',
1939-
'only_running', 'exclude_subdags', 'dag_regex'),
1945+
'only_running', 'exclude_subdags', 'exclude_parentdag', 'dag_regex'),
19401946
}, {
19411947
'func': pause,
19421948
'help': "Pause a DAG",

airflow/models.py

+26
Original file line number | Diff line number | Diff line change
@@ -3798,9 +3798,11 @@ def clear(
37983798
only_running=False,
37993799
confirm_prompt=False,
38003800
include_subdags=True,
3801+
include_parentdag=True,
38013802
reset_dag_runs=True,
38023803
dry_run=False,
38033804
session=None,
3805+
get_tis=False,
38043806
):
38053807
"""
38063808
Clears a set of task instances associated with the current dag for
@@ -3821,6 +3823,25 @@ def clear(
38213823
tis = session.query(TI).filter(TI.dag_id == self.dag_id)
38223824
tis = tis.filter(TI.task_id.in_(self.task_ids))
38233825

3826+
if include_parentdag and self.is_subdag:
3827+
3828+
p_dag = self.parent_dag.sub_dag(
3829+
task_regex=self.dag_id.split('.')[1],
3830+
include_upstream=False,
3831+
include_downstream=True)
3832+
3833+
tis = tis.union(p_dag.clear(
3834+
start_date=start_date, end_date=end_date,
3835+
only_failed=only_failed,
3836+
only_running=only_running,
3837+
confirm_prompt=confirm_prompt,
3838+
include_subdags=include_subdags,
3839+
include_parentdag=False,
3840+
reset_dag_runs=reset_dag_runs,
3841+
get_tis=True,
3842+
session=session,
3843+
))
3844+
38243845
if start_date:
38253846
tis = tis.filter(TI.execution_date >= start_date)
38263847
if end_date:
@@ -3832,6 +3853,9 @@ def clear(
38323853
if only_running:
38333854
tis = tis.filter(TI.state == State.RUNNING)
38343855

3856+
if get_tis:
3857+
return tis
3858+
38353859
if dry_run:
38363860
tis = tis.all()
38373861
session.expunge_all()
@@ -3875,6 +3899,7 @@ def clear_dags(
38753899
only_running=False,
38763900
confirm_prompt=False,
38773901
include_subdags=True,
3902+
include_parentdag=False,
38783903
reset_dag_runs=True,
38793904
dry_run=False,
38803905
):
@@ -3887,6 +3912,7 @@ def clear_dags(
38873912
only_running=only_running,
38883913
confirm_prompt=False,
38893914
include_subdags=include_subdags,
3915+
include_parentdag=include_parentdag,
38903916
reset_dag_runs=reset_dag_runs,
38913917
dry_run=True)
38923918
all_tis.extend(tis)

airflow/www/views.py

+6 -2
Original file line number | Diff line number | Diff line change
@@ -1111,7 +1111,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
11111111
count = dag.clear(
11121112
start_date=start_date,
11131113
end_date=end_date,
1114-
include_subdags=recursive)
1114+
include_subdags=recursive,
1115+
include_parentdag=recursive,
1116+
)
11151117

11161118
flash("{0} task instances have been cleared".format(count))
11171119
return redirect(origin)
@@ -1120,7 +1122,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
11201122
start_date=start_date,
11211123
end_date=end_date,
11221124
include_subdags=recursive,
1123-
dry_run=True)
1125+
dry_run=True,
1126+
include_parentdag=recursive,
1127+
)
11241128
if not tis:
11251129
flash("No task instances to clear", 'error')
11261130
response = redirect(origin)

airflow/www_rbac/views.py

+6 -2
Original file line number | Diff line number | Diff line change
@@ -837,7 +837,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
837837
count = dag.clear(
838838
start_date=start_date,
839839
end_date=end_date,
840-
include_subdags=recursive)
840+
include_subdags=recursive,
841+
include_parentdag=recursive,
842+
)
841843

842844
flash("{0} task instances have been cleared".format(count))
843845
return redirect(origin)
@@ -846,7 +848,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
846848
start_date=start_date,
847849
end_date=end_date,
848850
include_subdags=recursive,
849-
dry_run=True)
851+
include_parentdag=recursive,
852+
dry_run=True,
853+
)
850854
if not tis:
851855
flash("No task instances to clear", 'error')
852856
response = redirect(origin)

tests/core.py

+39 -1
Original file line number | Diff line number | Diff line change
@@ -1429,8 +1429,18 @@ def test_subdag_clear(self):
14291429
'clear', 'example_subdag_operator', '--no_confirm', '--exclude_subdags'])
14301430
cli.clear(args)
14311431

1432+
def test_parentdag_downstream_clear(self):
1433+
args = self.parser.parse_args([
1434+
'clear', 'example_subdag_operator.section-1', '--no_confirm'])
1435+
cli.clear(args)
1436+
args = self.parser.parse_args([
1437+
'clear', 'example_subdag_operator.section-1', '--no_confirm',
1438+
'--exclude_parentdag'])
1439+
cli.clear(args)
1440+
14321441
def test_get_dags(self):
1433-
dags = cli.get_dags(self.parser.parse_args(['clear', 'example_subdag_operator', '-c']))
1442+
dags = cli.get_dags(self.parser.parse_args(['clear', 'example_subdag_operator',
1443+
'-c']))
14341444
self.assertEqual(len(dags), 1)
14351445

14361446
dags = cli.get_dags(self.parser.parse_args(['clear', 'subdag', '-dx', '-c']))
@@ -1942,6 +1952,34 @@ def test_dag_views(self):
19421952
response = self.app.get(url)
19431953
self.assertIn("Wait a minute", response.data.decode('utf-8'))
19441954
response = self.app.get(url + "&confirmed=true")
1955+
url = (
1956+
"/admin/airflow/clear?task_id=section-1-task-1&"
1957+
"dag_id=example_subdag_operator.section-1&future=false&past=false&"
1958+
"upstream=false&downstream=true&recursive=true&"
1959+
"execution_date={}&"
1960+
"origin=/admin".format(DEFAULT_DATE_DS))
1961+
response = self.app.get(url)
1962+
self.assertIn("Wait a minute", response.data.decode('utf-8'))
1963+
self.assertIn("example_subdag_operator.end",
1964+
response.data.decode('utf-8'))
1965+
self.assertIn("example_subdag_operator.section-1.section-1-task-1",
1966+
response.data.decode('utf-8'))
1967+
self.assertIn("example_subdag_operator.section-1",
1968+
response.data.decode('utf-8'))
1969+
self.assertIn("example_subdag_operator.section-2",
1970+
response.data.decode('utf-8'))
1971+
self.assertIn("example_subdag_operator.section-2.section-2-task-1",
1972+
response.data.decode('utf-8'))
1973+
self.assertIn("example_subdag_operator.section-2.section-2-task-2",
1974+
response.data.decode('utf-8'))
1975+
self.assertIn("example_subdag_operator.section-2.section-2-task-3",
1976+
response.data.decode('utf-8'))
1977+
self.assertIn("example_subdag_operator.section-2.section-2-task-4",
1978+
response.data.decode('utf-8'))
1979+
self.assertIn("example_subdag_operator.section-2.section-2-task-5",
1980+
response.data.decode('utf-8'))
1981+
self.assertIn("example_subdag_operator.some-other-task",
1982+
response.data.decode('utf-8'))
19451983
url = (
19461984
"/admin/airflow/run?task_id=runme_0&"
19471985
"dag_id=example_bash_operator&ignore_all_deps=false&ignore_ti_state=true&"

tests/jobs.py

+53
Original file line number | Diff line number | Diff line change
@@ -901,6 +901,59 @@ def test_backfill_execute_subdag(self):
901901
subdag.clear()
902902
dag.clear()
903903

904+
def test_subdag_clear_parentdag_downstream_clear(self):
905+
dag = self.dagbag.get_dag('example_subdag_operator')
906+
subdag_op_task = dag.get_task('section-1')
907+
908+
subdag = subdag_op_task.subdag
909+
subdag.schedule_interval = '@daily'
910+
911+
executor = TestExecutor(do_update=True)
912+
job = BackfillJob(dag=subdag,
913+
start_date=DEFAULT_DATE,
914+
end_date=DEFAULT_DATE,
915+
executor=executor,
916+
donot_pickle=True)
917+
918+
with timeout(seconds=30):
919+
job.run()
920+
921+
ti0 = TI(
922+
task=subdag.get_task('section-1-task-1'),
923+
execution_date=DEFAULT_DATE)
924+
ti0.refresh_from_db()
925+
self.assertEqual(ti0.state, State.SUCCESS)
926+
927+
sdag = subdag.sub_dag(
928+
task_regex='section-1-task-1',
929+
include_downstream=True,
930+
include_upstream=False)
931+
932+
sdag.clear(
933+
start_date=DEFAULT_DATE,
934+
end_date=DEFAULT_DATE,
935+
include_parentdag=True)
936+
937+
ti0.refresh_from_db()
938+
self.assertEquals(State.NONE, ti0.state)
939+
940+
ti1 = TI(
941+
task=dag.get_task('some-other-task'),
942+
execution_date=DEFAULT_DATE)
943+
self.assertEquals(State.NONE, ti1.state)
944+
945+
# Checks that all the Downstream tasks for Parent DAG
946+
# have been cleared
947+
for task in subdag_op_task.downstream_list:
948+
ti = TI(
949+
task=dag.get_task(task.task_id),
950+
execution_date=DEFAULT_DATE
951+
)
952+
self.assertEquals(State.NONE, ti.state)
953+
954+
subdag.clear()
955+
dag.clear()
956+
904957
def test_backfill_execute_subdag_with_removed_task(self):
905958
"""
906959
Ensure that subdag operators execute properly in the case where

0 commit comments

Comments
 (0)