Skip to content

Commit e232f4d

Browse files
dima-asanaFokko Driesprong
authored and
Fokko Driesprong
committed
[AIRFLOW-1837] Respect task start_date when different from dag's (apache#4010)
Currently task instances get created and scheduled based on the DAG's start date rather than their own. This commit adds a check before creating a task instance to see that the start date is not after the execution date.
1 parent 2444ed2 commit e232f4d

File tree

5 files changed

+91
-30
lines changed

5 files changed

+91
-30
lines changed

airflow/models.py

+2
Original file line numberDiff line numberDiff line change
@@ -5153,6 +5153,8 @@ def verify_integrity(self, session=None):
51535153
for task in six.itervalues(dag.task_dict):
51545154
if task.adhoc:
51555155
continue
5156+
if task.start_date > self.execution_date and not self.is_backfill:
5157+
continue
51565158

51575159
if task.task_id not in task_ids:
51585160
ti = TaskInstance(task, self.execution_date)

tests/core.py

+29-14
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
from airflow.utils import timezone
6464
from airflow.utils.timezone import datetime
6565
from airflow.utils.state import State
66-
from airflow.utils.dates import infer_time_unit, round_time, scale_time_units
66+
from airflow.utils.dates import days_ago, infer_time_unit, round_time, scale_time_units
6767
from lxml import html
6868
from airflow.exceptions import AirflowException
6969
from airflow.configuration import AirflowConfigException, run_command
@@ -81,6 +81,7 @@
8181
DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
8282
DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
8383
TEST_DAG_ID = 'unit_tests'
84+
EXAMPLE_DAG_DEFAULT_DATE = days_ago(2)
8485

8586
try:
8687
import cPickle as pickle
@@ -1651,21 +1652,21 @@ def setUp(self):
16511652

16521653
self.dagrun_python = self.dag_python.create_dagrun(
16531654
run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
1654-
execution_date=DEFAULT_DATE,
1655+
execution_date=EXAMPLE_DAG_DEFAULT_DATE,
16551656
start_date=timezone.utcnow(),
16561657
state=State.RUNNING
16571658
)
16581659

16591660
self.sub_dag.create_dagrun(
16601661
run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
1661-
execution_date=DEFAULT_DATE,
1662+
execution_date=EXAMPLE_DAG_DEFAULT_DATE,
16621663
start_date=timezone.utcnow(),
16631664
state=State.RUNNING
16641665
)
16651666

16661667
self.example_xcom.create_dagrun(
16671668
run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
1668-
execution_date=DEFAULT_DATE,
1669+
execution_date=EXAMPLE_DAG_DEFAULT_DATE,
16691670
start_date=timezone.utcnow(),
16701671
state=State.RUNNING
16711672
)
@@ -1758,7 +1759,7 @@ def test_dag_views(self):
17581759
response = self.app.get(
17591760
'/admin/airflow/task?'
17601761
'task_id=runme_0&dag_id=example_bash_operator&'
1761-
'execution_date={}'.format(DEFAULT_DATE_DS))
1762+
'execution_date={}'.format(EXAMPLE_DAG_DEFAULT_DATE))
17621763
self.assertIn("Attributes", response.data.decode('utf-8'))
17631764
response = self.app.get(
17641765
'/admin/airflow/dag_stats')
@@ -1770,22 +1771,21 @@ def test_dag_views(self):
17701771
"/admin/airflow/success?task_id=print_the_context&"
17711772
"dag_id=example_python_operator&upstream=false&downstream=false&"
17721773
"future=false&past=false&execution_date={}&"
1773-
"origin=/admin".format(DEFAULT_DATE_DS))
1774+
"origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
17741775
response = self.app.get(url)
17751776
self.assertIn("Wait a minute", response.data.decode('utf-8'))
1776-
response = self.app.get(url + "&confirmed=true")
17771777
response = self.app.get(
17781778
'/admin/airflow/clear?task_id=print_the_context&'
17791779
'dag_id=example_python_operator&future=true&past=false&'
17801780
'upstream=true&downstream=false&'
17811781
'execution_date={}&'
1782-
'origin=/admin'.format(DEFAULT_DATE_DS))
1782+
'origin=/admin'.format(EXAMPLE_DAG_DEFAULT_DATE))
17831783
self.assertIn("Wait a minute", response.data.decode('utf-8'))
17841784
url = (
17851785
"/admin/airflow/success?task_id=section-1&"
17861786
"dag_id=example_subdag_operator&upstream=true&downstream=true&"
17871787
"future=false&past=false&execution_date={}&"
1788-
"origin=/admin".format(DEFAULT_DATE_DS))
1788+
"origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
17891789
response = self.app.get(url)
17901790
self.assertIn("Wait a minute", response.data.decode('utf-8'))
17911791
self.assertIn("section-1-task-1", response.data.decode('utf-8'))
@@ -1799,7 +1799,7 @@ def test_dag_views(self):
17991799
"dag_id=example_python_operator&future=false&past=false&"
18001800
"upstream=false&downstream=true&"
18011801
"execution_date={}&"
1802-
"origin=/admin".format(DEFAULT_DATE_DS))
1802+
"origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
18031803
response = self.app.get(url)
18041804
self.assertIn("Wait a minute", response.data.decode('utf-8'))
18051805
response = self.app.get(url + "&confirmed=true")
@@ -1808,7 +1808,7 @@ def test_dag_views(self):
18081808
"dag_id=example_subdag_operator.section-1&future=false&past=false&"
18091809
"upstream=false&downstream=true&recursive=true&"
18101810
"execution_date={}&"
1811-
"origin=/admin".format(DEFAULT_DATE_DS))
1811+
"origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
18121812
response = self.app.get(url)
18131813
self.assertIn("Wait a minute", response.data.decode('utf-8'))
18141814
self.assertIn("example_subdag_operator.end",
@@ -1835,7 +1835,7 @@ def test_dag_views(self):
18351835
"/admin/airflow/run?task_id=runme_0&"
18361836
"dag_id=example_bash_operator&ignore_all_deps=false&ignore_ti_state=true&"
18371837
"ignore_task_deps=true&execution_date={}&"
1838-
"origin=/admin".format(DEFAULT_DATE_DS))
1838+
"origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
18391839
response = self.app.get(url)
18401840
response = self.app.get(
18411841
"/admin/airflow/refresh?dag_id=example_bash_operator")
@@ -1870,13 +1870,28 @@ def test_fetch_task_instance(self):
18701870
url = (
18711871
"/admin/airflow/object/task_instances?"
18721872
"dag_id=example_python_operator&"
1873-
"execution_date={}".format(DEFAULT_DATE_DS))
1873+
"execution_date={}".format(EXAMPLE_DAG_DEFAULT_DATE))
18741874
response = self.app.get(url)
18751875
self.assertIn("print_the_context", response.data.decode('utf-8'))
18761876

1877+
def test_dag_view_task_with_python_operator_using_partial(self):
1878+
response = self.app.get(
1879+
'/admin/airflow/task?'
1880+
'task_id=test_dagrun_functool_partial&dag_id=test_task_view_type_check&'
1881+
'execution_date={}'.format(EXAMPLE_DAG_DEFAULT_DATE))
1882+
self.assertIn("A function with two args", response.data.decode('utf-8'))
1883+
1884+
def test_dag_view_task_with_python_operator_using_instance(self):
1885+
response = self.app.get(
1886+
'/admin/airflow/task?'
1887+
'task_id=test_dagrun_instance&dag_id=test_task_view_type_check&'
1888+
'execution_date={}'.format(EXAMPLE_DAG_DEFAULT_DATE))
1889+
self.assertIn("A __call__ method", response.data.decode('utf-8'))
1890+
18771891
def tearDown(self):
18781892
configuration.conf.set("webserver", "expose_config", "False")
1879-
self.dag_bash.clear(start_date=DEFAULT_DATE, end_date=timezone.utcnow())
1893+
self.dag_bash.clear(start_date=EXAMPLE_DAG_DEFAULT_DATE,
1894+
end_date=timezone.utcnow())
18801895
session = Session()
18811896
session.query(models.DagRun).delete()
18821897
session.query(models.TaskInstance).delete()

tests/dags/test_scheduler_dags.py

+19-3
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,34 @@
1717
# specific language governing permissions and limitations
1818
# under the License.
1919

20-
from datetime import datetime
20+
from datetime import datetime, timedelta
2121

2222
from airflow.models import DAG
2323
from airflow.operators.dummy_operator import DummyOperator
24-
DEFAULT_DATE = datetime(2100, 1, 1)
24+
DEFAULT_DATE = datetime(2016, 1, 1)
2525

2626
# DAG tests backfill with pooled tasks
2727
# Previously backfill would queue the task but never run it
2828
dag1 = DAG(
2929
dag_id='test_start_date_scheduling',
30-
start_date=datetime(2100, 1, 1))
30+
start_date=datetime.utcnow() + timedelta(days=1))
3131
dag1_task1 = DummyOperator(
3232
task_id='dummy',
3333
dag=dag1,
3434
owner='airflow')
35+
36+
dag2 = DAG(
37+
dag_id='test_task_start_date_scheduling',
38+
start_date=DEFAULT_DATE
39+
)
40+
dag2_task1 = DummyOperator(
41+
task_id='dummy1',
42+
dag=dag2,
43+
owner='airflow',
44+
start_date=DEFAULT_DATE + timedelta(days=3)
45+
)
46+
dag2_task2 = DummyOperator(
47+
task_id='dummy2',
48+
dag=dag2,
49+
owner='airflow'
50+
)

tests/jobs.py

+22-1
Original file line numberDiff line numberDiff line change
@@ -2209,7 +2209,7 @@ def test_scheduler_start_date(self):
22092209
dag_id = 'test_start_date_scheduling'
22102210
dag = self.dagbag.get_dag(dag_id)
22112211
dag.clear()
2212-
self.assertTrue(dag.start_date > DEFAULT_DATE)
2212+
self.assertTrue(dag.start_date > datetime.datetime.utcnow())
22132213

22142214
scheduler = SchedulerJob(dag_id,
22152215
num_runs=2)
@@ -2244,6 +2244,27 @@ def test_scheduler_start_date(self):
22442244
self.assertEqual(
22452245
len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)
22462246

2247+
def test_scheduler_task_start_date(self):
2248+
"""
2249+
Test that the scheduler respects task start dates that are different
2250+
from DAG start dates
2251+
"""
2252+
dag_id = 'test_task_start_date_scheduling'
2253+
dag = self.dagbag.get_dag(dag_id)
2254+
dag.clear()
2255+
scheduler = SchedulerJob(dag_id,
2256+
num_runs=2)
2257+
scheduler.run()
2258+
2259+
session = settings.Session()
2260+
tiq = session.query(TI).filter(TI.dag_id == dag_id)
2261+
ti1s = tiq.filter(TI.task_id == 'dummy1').all()
2262+
ti2s = tiq.filter(TI.task_id == 'dummy2').all()
2263+
self.assertEqual(len(ti1s), 0)
2264+
self.assertEqual(len(ti2s), 2)
2265+
for t in ti2s:
2266+
self.assertEqual(t.state, State.SUCCESS)
2267+
22472268
def test_scheduler_multiprocessing(self):
22482269
"""
22492270
Test that the scheduler can successfully queue multiple dags in parallel

tests/www_rbac/test_views.py

+19-12
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
from airflow.models import DAG, DagRun, TaskInstance
4141
from airflow.operators.dummy_operator import DummyOperator
4242
from airflow.settings import Session
43-
from airflow.utils import timezone
43+
from airflow.utils import dates, timezone
4444
from airflow.utils.state import State
4545
from airflow.utils.timezone import datetime
4646
from airflow.www_rbac import app as application
@@ -263,8 +263,8 @@ def test_mount(self):
263263

264264

265265
class TestAirflowBaseViews(TestBase):
266-
default_date = timezone.datetime(2018, 3, 1)
267-
run_id = "test_{}".format(models.DagRun.id_for_date(default_date))
266+
EXAMPLE_DAG_DEFAULT_DATE = dates.days_ago(2)
267+
run_id = "test_{}".format(models.DagRun.id_for_date(EXAMPLE_DAG_DEFAULT_DATE))
268268

269269
def setUp(self):
270270
super(TestAirflowBaseViews, self).setUp()
@@ -291,19 +291,19 @@ def prepare_dagruns(self):
291291

292292
self.bash_dagrun = self.bash_dag.create_dagrun(
293293
run_id=self.run_id,
294-
execution_date=self.default_date,
294+
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
295295
start_date=timezone.utcnow(),
296296
state=State.RUNNING)
297297

298298
self.sub_dagrun = self.sub_dag.create_dagrun(
299299
run_id=self.run_id,
300-
execution_date=self.default_date,
300+
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
301301
start_date=timezone.utcnow(),
302302
state=State.RUNNING)
303303

304304
self.xcom_dagrun = self.xcom_dag.create_dagrun(
305305
run_id=self.run_id,
306-
execution_date=self.default_date,
306+
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
307307
start_date=timezone.utcnow(),
308308
state=State.RUNNING)
309309

@@ -321,19 +321,19 @@ def test_home(self):
321321

322322
def test_task(self):
323323
url = ('task?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
324-
.format(self.percent_encode(self.default_date)))
324+
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
325325
resp = self.client.get(url, follow_redirects=True)
326326
self.check_content_in_response('Task Instance Details', resp)
327327

328328
def test_xcom(self):
329329
url = ('xcom?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
330-
.format(self.percent_encode(self.default_date)))
330+
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
331331
resp = self.client.get(url, follow_redirects=True)
332332
self.check_content_in_response('XCom', resp)
333333

334334
def test_rendered(self):
335335
url = ('rendered?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
336-
.format(self.percent_encode(self.default_date)))
336+
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
337337
resp = self.client.get(url, follow_redirects=True)
338338
self.check_content_in_response('Rendered Template', resp)
339339

@@ -400,25 +400,32 @@ def test_paused(self):
400400
resp = self.client.post(url, follow_redirects=True)
401401
self.check_content_in_response('OK', resp)
402402

403+
def test_failed(self):
404+
url = ('failed?task_id=run_this_last&dag_id=example_bash_operator&'
405+
'execution_date={}&upstream=false&downstream=false&future=false&past=false'
406+
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
407+
resp = self.client.get(url)
408+
self.check_content_in_response('Wait a minute', resp)
409+
403410
def test_success(self):
404411

405412
url = ('success?task_id=run_this_last&dag_id=example_bash_operator&'
406413
'execution_date={}&upstream=false&downstream=false&future=false&past=false'
407-
.format(self.percent_encode(self.default_date)))
414+
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
408415
resp = self.client.get(url)
409416
self.check_content_in_response('Wait a minute', resp)
410417

411418
def test_clear(self):
412419
url = ('clear?task_id=runme_1&dag_id=example_bash_operator&'
413420
'execution_date={}&upstream=false&downstream=false&future=false&past=false'
414-
.format(self.percent_encode(self.default_date)))
421+
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
415422
resp = self.client.get(url)
416423
self.check_content_in_response(['example_bash_operator', 'Wait a minute'], resp)
417424

418425
def test_run(self):
419426
url = ('run?task_id=runme_0&dag_id=example_bash_operator&ignore_all_deps=false&'
420427
'ignore_ti_state=true&execution_date={}'
421-
.format(self.percent_encode(self.default_date)))
428+
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
422429
resp = self.client.get(url)
423430
self.check_content_in_response('', resp, resp_code=302)
424431

0 commit comments

Comments
 (0)