From f528da0998804e37f8f3b4c87a2133081d80d225 Mon Sep 17 00:00:00 2001 From: Dima Kamalov Date: Thu, 4 Oct 2018 20:38:43 -0700 Subject: [PATCH 1/2] [AIRFLOW-1837] Respect task start_date when different from dag's Currently task instances get created and scheduled based on the DAG's start date rather than their own. This commit adds a check before creating a task instance to see that the start date is not after the execution date. --- airflow/models.py | 2 +- tests/dags/test_scheduler_dags.py | 22 +++++++++++++++++++--- tests/jobs.py | 21 +++++++++++++++++++++ 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/airflow/models.py b/airflow/models.py index 22e8d2596a95b..fd415b27a4b5b 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -5252,7 +5252,7 @@ def verify_integrity(self, session=None): # check for missing tasks for task in six.itervalues(dag.task_dict): - if task.adhoc: + if task.adhoc or task.start_date > self.execution_date: continue if task.task_id not in task_ids: diff --git a/tests/dags/test_scheduler_dags.py b/tests/dags/test_scheduler_dags.py index ae2bd202d9803..5671157e28e60 100644 --- a/tests/dags/test_scheduler_dags.py +++ b/tests/dags/test_scheduler_dags.py @@ -17,18 +17,34 @@ # specific language governing permissions and limitations # under the License. -from datetime import datetime +from datetime import datetime, timedelta from airflow.models import DAG from airflow.operators.dummy_operator import DummyOperator -DEFAULT_DATE = datetime(2100, 1, 1) +DEFAULT_DATE = datetime(2016, 1, 1) # DAG tests backfill with pooled tasks # Previously backfill would queue the task but never run it dag1 = DAG( dag_id='test_start_date_scheduling', - start_date=datetime(2100, 1, 1)) + start_date=DEFAULT_DATE + timedelta(days=3)) dag1_task1 = DummyOperator( task_id='dummy', dag=dag1, owner='airflow') + +dag2 = DAG( + dag_id='test_task_start_date_scheduling', + start_date=DEFAULT_DATE +) +dag2_task1 = DummyOperator( + task_id='dummy1', + dag=dag2, + owner='airflow', + start_date=DEFAULT_DATE + timedelta(days=3) +) +dag2_task2 = DummyOperator( + task_id='dummy2', + dag=dag2, + owner='airflow' +) diff --git a/tests/jobs.py b/tests/jobs.py index bb714bd201602..600c6b976e6ea 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -2252,6 +2252,27 @@ def test_scheduler_start_date(self): self.assertEqual( len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1) + def test_scheduler_task_start_date(self): + """ + Test that the scheduler respects task start dates that are different + from DAG start dates + """ + dag_id = 'test_task_start_date_scheduling' + dag = self.dagbag.get_dag(dag_id) + dag.clear() + scheduler = SchedulerJob(dag_id, + num_runs=2) + scheduler.run() + + session = settings.Session() + tiq = session.query(TI).filter(TI.dag_id == dag_id) + ti1s = tiq.filter(TI.task_id == 'dummy1').all() + ti2s = tiq.filter(TI.task_id == 'dummy2').all() + self.assertEqual(len(ti1s), 0) + self.assertEqual(len(ti2s), 2) + for t in ti2s: + self.assertEqual(t.state, State.SUCCESS) + def test_scheduler_multiprocessing(self): """ Test that the scheduler can successfully queue multiple dags in parallel From 96ab3b638cf203236e9b323e179456e0d00367a6 Mon Sep 17 00:00:00 2001 From: dima-asana <42555784+dima-asana@users.noreply.github.com> Date: Fri, 5 Oct 2018 14:15:29 -0700 Subject: [PATCH 2/2] [AIRFLOW-1837] Respect task start_date when different from dag's Currently task instances get created and scheduled based on the DAG's start date rather than their own. This commit adds a check before creating a task instance to see that the start date is not after the execution date. --- airflow/models.py | 2 ++ tests/dags/test_scheduler_dags.py | 2 +- tests/jobs.py | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow/models.py b/airflow/models.py index fd415b27a4b5b..4a94215b92799 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -5254,6 +5254,8 @@ def verify_integrity(self, session=None): for task in six.itervalues(dag.task_dict): if task.adhoc or task.start_date > self.execution_date: continue + if task.start_date > self.execution_date and not self.is_backfill: + continue if task.task_id not in task_ids: Stats.incr( diff --git a/tests/dags/test_scheduler_dags.py b/tests/dags/test_scheduler_dags.py index 5671157e28e60..94e6f8216d226 100644 --- a/tests/dags/test_scheduler_dags.py +++ b/tests/dags/test_scheduler_dags.py @@ -27,7 +27,7 @@ # Previously backfill would queue the task but never run it dag1 = DAG( dag_id='test_start_date_scheduling', - start_date=DEFAULT_DATE + timedelta(days=3)) + start_date=datetime.utcnow() + timedelta(days=1)) dag1_task1 = DummyOperator( task_id='dummy', dag=dag1, diff --git a/tests/jobs.py b/tests/jobs.py index 600c6b976e6ea..c23c6035c7bd3 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -2217,7 +2217,7 @@ def test_scheduler_start_date(self): dag_id = 'test_start_date_scheduling' dag = self.dagbag.get_dag(dag_id) dag.clear() - self.assertTrue(dag.start_date > DEFAULT_DATE) + self.assertTrue(dag.start_date > datetime.datetime.utcnow() ) scheduler = SchedulerJob(dag_id, num_runs=2)