Skip to content

Commit 428a91d

Browse files
ms32035ashb
authored andcommitted
[AIRFLOW-3218] add support for poking a whole DAG (apache#4058)
Add support for poking a whole DAG
1 parent 3697a68 commit 428a91d

File tree

2 files changed

+45
-12
lines changed

2 files changed

+45
-12
lines changed

airflow/sensors/external_task_sensor.py

+21-11
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
# specific language governing permissions and limitations
1818
# under the License.
1919

20-
from airflow.models import TaskInstance
20+
from airflow.models import TaskInstance, DagRun
2121
from airflow.sensors.base_sensor_operator import BaseSensorOperator
2222
from airflow.utils.db import provide_session
2323
from airflow.utils.decorators import apply_defaults
@@ -26,18 +26,18 @@
2626

2727
class ExternalTaskSensor(BaseSensorOperator):
2828
"""
29-
Waits for a task to complete in a different DAG
29+
Waits for a different DAG or a task in in a different DAG to complete
3030
3131
:param external_dag_id: The dag_id that contains the task you want to
3232
wait for
3333
:type external_dag_id: str
3434
:param external_task_id: The task_id that contains the task you want to
35-
wait for
35+
wait for. If ``None`` the sensor waits for the DAG
3636
:type external_task_id: str
3737
:param allowed_states: list of allowed states, default is ``['success']``
3838
:type allowed_states: list
3939
:param execution_delta: time difference with the previous execution to
40-
look at, the default is the same execution_date as the current task.
40+
look at, the default is the same execution_date as the current task or DAG.
4141
For yesterday, use [positive!] datetime.timedelta(days=1). Either
4242
execution_delta or execution_date_fn can be passed to
4343
ExternalTaskSensor, but not both.
@@ -102,13 +102,23 @@ def poke(self, context, session=None):
102102
'{self.external_dag_id}.'
103103
'{self.external_task_id} on '
104104
'{} ... '.format(serialized_dttm_filter, **locals()))
105-
TI = TaskInstance
106105

107-
count = session.query(TI).filter(
108-
TI.dag_id == self.external_dag_id,
109-
TI.task_id == self.external_task_id,
110-
TI.state.in_(self.allowed_states),
111-
TI.execution_date.in_(dttm_filter),
112-
).count()
106+
if self.external_task_id:
107+
TI = TaskInstance
108+
109+
count = session.query(TI).filter(
110+
TI.dag_id == self.external_dag_id,
111+
TI.task_id == self.external_task_id,
112+
TI.state.in_(self.allowed_states),
113+
TI.execution_date.in_(dttm_filter),
114+
).count()
115+
else:
116+
DR = DagRun
117+
count = session.query(DR).filter(
118+
DR.dag_id == self.external_dag_id,
119+
DR.state.in_(self.allowed_states),
120+
DR.execution_date.in_(dttm_filter),
121+
).count()
122+
113123
session.commit()
114124
return count == len(dttm_filter)

tests/sensors/test_external_task_sensor.py

+24-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
# specific language governing permissions and limitations
1818
# under the License.
1919
import unittest
20-
2120
from datetime import timedelta, time
2221

2322
from airflow import DAG, configuration, settings
@@ -75,6 +74,30 @@ def test_external_task_sensor(self):
7574
ignore_ti_state=True
7675
)
7776

77+
def test_external_dag_sensor(self):
78+
79+
other_dag = DAG(
80+
'other_dag',
81+
default_args=self.args,
82+
end_date=DEFAULT_DATE,
83+
schedule_interval='@once')
84+
other_dag.create_dagrun(
85+
run_id='test',
86+
start_date=DEFAULT_DATE,
87+
execution_date=DEFAULT_DATE,
88+
state=State.SUCCESS)
89+
t = ExternalTaskSensor(
90+
task_id='test_external_dag_sensor_check',
91+
external_dag_id='other_dag',
92+
external_task_id=None,
93+
dag=self.dag
94+
)
95+
t.run(
96+
start_date=DEFAULT_DATE,
97+
end_date=DEFAULT_DATE,
98+
ignore_ti_state=True
99+
)
100+
78101
def test_templated_sensor(self):
79102
dag = DAG(TEST_DAG_ID, self.args)
80103

0 commit comments

Comments
 (0)