diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index fd8765588a602..fb9ddbe2b003c 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -705,7 +705,9 @@ def clear(args):
         only_failed=args.only_failed,
         only_running=args.only_running,
         confirm_prompt=not args.no_confirm,
-        include_subdags=not args.exclude_subdags)
+        include_subdags=not args.exclude_subdags,
+        include_parentdag=not args.exclude_parentdag,
+    )
 
 
 def get_num_ready_workers_running(gunicorn_master_proc):
@@ -1604,6 +1606,10 @@ class CLIFactory(object):
         'exclude_subdags': Arg(
             ("-x", "--exclude_subdags"),
             "Exclude subdags", "store_true"),
+        'exclude_parentdag': Arg(
+            ("-xp", "--exclude_parentdag"),
+            "Exclude ParentDAGS if the task cleared is a part of a SubDAG",
+            "store_true"),
         'dag_regex': Arg(
             ("-dx", "--dag_regex"),
             "Search dag_id as regex instead of exact string", "store_true"),
@@ -1936,7 +1942,7 @@ class CLIFactory(object):
             'args': (
                 'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir',
                 'upstream', 'downstream', 'no_confirm', 'only_failed',
-                'only_running', 'exclude_subdags', 'dag_regex'),
+                'only_running', 'exclude_subdags', 'exclude_parentdag', 'dag_regex'),
         }, {
             'func': pause,
             'help': "Pause a DAG",
diff --git a/airflow/models.py b/airflow/models.py
index d703810a779f4..1e4949e5639ad 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3798,9 +3798,11 @@ def clear(
             only_running=False,
             confirm_prompt=False,
             include_subdags=True,
+            include_parentdag=True,
             reset_dag_runs=True,
             dry_run=False,
             session=None,
+            get_tis=False,
     ):
         """
         Clears a set of task instances associated with the current dag for
@@ -3821,6 +3823,32 @@ def clear(
             tis = session.query(TI).filter(TI.dag_id == self.dag_id)
             tis = tis.filter(TI.task_id.in_(self.task_ids))
 
+        if include_parentdag and self.is_subdag:
+            # A subdag's dag_id is "PARENT_DAG_ID.SUBDAG_TASK_ID". Strip the
+            # parent prefix instead of splitting on '.', so a parent dag_id
+            # that itself contains dots (e.g. a nested subdag) still resolves.
+            subdag_task_id = self.dag_id[len(self.parent_dag.dag_id) + 1:]
+
+            # Anchored so that e.g. 'section-1' cannot also select 'section-10'.
+            # NOTE(review): assumes sub_dag() searches task_regex within each
+            # task_id rather than full-matching it - confirm.
+            p_dag = self.parent_dag.sub_dag(
+                task_regex='^{}$'.format(subdag_task_id),
+                include_upstream=False,
+                include_downstream=True)
+
+            tis = tis.union(p_dag.clear(
+                start_date=start_date, end_date=end_date,
+                only_failed=only_failed,
+                only_running=only_running,
+                confirm_prompt=confirm_prompt,
+                include_subdags=include_subdags,
+                include_parentdag=False,
+                reset_dag_runs=reset_dag_runs,
+                get_tis=True,
+                session=session,
+            ))
+
         if start_date:
             tis = tis.filter(TI.execution_date >= start_date)
         if end_date:
@@ -3832,6 +3860,11 @@ def clear(
         if only_running:
             tis = tis.filter(TI.state == State.RUNNING)
 
+        if get_tis:
+            # Hand back the filtered query itself (nothing is cleared) so the
+            # subdag branch above can union the parent DAG's task instances.
+            return tis
+
         if dry_run:
             tis = tis.all()
             session.expunge_all()
@@ -3875,6 +3908,7 @@ def clear_dags(
             only_running=False,
             confirm_prompt=False,
             include_subdags=True,
+            include_parentdag=False,
             reset_dag_runs=True,
             dry_run=False,
     ):
@@ -3887,6 +3921,7 @@ def clear_dags(
                 only_running=only_running,
                 confirm_prompt=False,
                 include_subdags=include_subdags,
+                include_parentdag=include_parentdag,
                 reset_dag_runs=reset_dag_runs,
                 dry_run=True)
             all_tis.extend(tis)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index aa2530e45827a..be11b11376047 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1111,7 +1111,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
             count = dag.clear(
                 start_date=start_date,
                 end_date=end_date,
-                include_subdags=recursive)
+                include_subdags=recursive,
+                include_parentdag=recursive,
+            )
 
             flash("{0} task instances have been cleared".format(count))
             return redirect(origin)
@@ -1120,7 +1122,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
             start_date=start_date,
             end_date=end_date,
             include_subdags=recursive,
-            dry_run=True)
+            include_parentdag=recursive,
+            dry_run=True,
+        )
         if not tis:
             flash("No task instances to clear", 'error')
             response = redirect(origin)
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 3dc3400968812..38835998e8823 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -837,7 +837,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
             count = dag.clear(
                 start_date=start_date,
                 end_date=end_date,
-                include_subdags=recursive)
+                include_subdags=recursive,
+                include_parentdag=recursive,
+            )
 
             flash("{0} task instances have been cleared".format(count))
             return redirect(origin)
@@ -846,7 +848,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
             start_date=start_date,
             end_date=end_date,
             include_subdags=recursive,
-            dry_run=True)
+            include_parentdag=recursive,
+            dry_run=True,
+        )
         if not tis:
             flash("No task instances to clear", 'error')
             response = redirect(origin)
diff --git a/tests/core.py b/tests/core.py
index a517070614371..b937178a9ef8c 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1429,8 +1429,19 @@ def test_subdag_clear(self):
             'clear', 'example_subdag_operator', '--no_confirm', '--exclude_subdags'])
         cli.clear(args)
 
+    def test_parentdag_downstream_clear(self):
+        """Clear a subdag via the CLI with and without --exclude_parentdag."""
+        args = self.parser.parse_args([
+            'clear', 'example_subdag_operator.section-1', '--no_confirm'])
+        cli.clear(args)
+        args = self.parser.parse_args([
+            'clear', 'example_subdag_operator.section-1', '--no_confirm',
+            '--exclude_parentdag'])
+        cli.clear(args)
+
     def test_get_dags(self):
-        dags = cli.get_dags(self.parser.parse_args(['clear', 'example_subdag_operator', '-c']))
+        dags = cli.get_dags(self.parser.parse_args(['clear', 'example_subdag_operator',
+                                                    '-c']))
         self.assertEqual(len(dags), 1)
 
         dags = cli.get_dags(self.parser.parse_args(['clear', 'subdag', '-dx', '-c']))
@@ -1942,6 +1953,30 @@ def test_dag_views(self):
         response = self.app.get(url)
         self.assertIn("Wait a minute", response.data.decode('utf-8'))
         response = self.app.get(url + "&confirmed=true")
+        # Recursively clearing a task inside a subdag must also list the
+        # parent DAG's downstream task instances on the confirmation page.
+        url = (
+            "/admin/airflow/clear?task_id=section-1-task-1&"
+            "dag_id=example_subdag_operator.section-1&future=false&past=false&"
+            "upstream=false&downstream=true&recursive=true&"
+            "execution_date={}&"
+            "origin=/admin".format(DEFAULT_DATE_DS))
+        response = self.app.get(url)
+        page = response.data.decode('utf-8')
+        self.assertIn("Wait a minute", page)
+        for expected in (
+                "example_subdag_operator.end",
+                "example_subdag_operator.section-1.section-1-task-1",
+                "example_subdag_operator.section-1",
+                "example_subdag_operator.section-2",
+                "example_subdag_operator.section-2.section-2-task-1",
+                "example_subdag_operator.section-2.section-2-task-2",
+                "example_subdag_operator.section-2.section-2-task-3",
+                "example_subdag_operator.section-2.section-2-task-4",
+                "example_subdag_operator.section-2.section-2-task-5",
+                "example_subdag_operator.some-other-task",
+        ):
+            self.assertIn(expected, page)
         url = (
             "/admin/airflow/run?task_id=runme_0&"
             "dag_id=example_bash_operator&ignore_all_deps=false&ignore_ti_state=true&"
diff --git a/tests/jobs.py b/tests/jobs.py
index dc3381e8e035f..e18a87e6cf12d 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -901,6 +901,62 @@ def test_backfill_execute_subdag(self):
         subdag.clear()
         dag.clear()
 
+    def test_subdag_clear_parentdag_downstream_clear(self):
+        """Clearing a subdag task also clears the parent's downstream tasks."""
+        dag = self.dagbag.get_dag('example_subdag_operator')
+        subdag_op_task = dag.get_task('section-1')
+
+        subdag = subdag_op_task.subdag
+        subdag.schedule_interval = '@daily'
+
+        executor = TestExecutor(do_update=True)
+        job = BackfillJob(dag=subdag,
+                          start_date=DEFAULT_DATE,
+                          end_date=DEFAULT_DATE,
+                          executor=executor,
+                          donot_pickle=True)
+
+        with timeout(seconds=30):
+            job.run()
+
+        ti0 = TI(
+            task=subdag.get_task('section-1-task-1'),
+            execution_date=DEFAULT_DATE)
+        ti0.refresh_from_db()
+        self.assertEqual(ti0.state, State.SUCCESS)
+
+        sdag = subdag.sub_dag(
+            task_regex='section-1-task-1',
+            include_downstream=True,
+            include_upstream=False)
+
+        sdag.clear(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            include_parentdag=True)
+
+        ti0.refresh_from_db()
+        self.assertEqual(State.NONE, ti0.state)
+
+        ti1 = TI(
+            task=dag.get_task('some-other-task'),
+            execution_date=DEFAULT_DATE)
+        ti1.refresh_from_db()
+        self.assertEqual(State.NONE, ti1.state)
+
+        # Checks that all the Downstream tasks for Parent DAG
+        # have been cleared
+        for task in subdag_op_task.downstream_list:
+            ti = TI(
+                task=dag.get_task(task.task_id),
+                execution_date=DEFAULT_DATE
+            )
+            ti.refresh_from_db()
+            self.assertEqual(State.NONE, ti.state)
+
+        subdag.clear()
+        dag.clear()
+
     def test_backfill_execute_subdag_with_removed_task(self):
         """
         Ensure that subdag operators execute properly in the case where