Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-1837] Respect task start_date when different from dag's #4000

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5252,7 +5252,9 @@ def verify_integrity(self, session=None):

# check for missing tasks
for task in six.itervalues(dag.task_dict):
if task.adhoc:
if task.adhoc or task.start_date > self.execution_date:
continue
if task.start_date > self.execution_date and not self.is_backfill:
continue

if task.task_id not in task_ids:
Expand Down
22 changes: 19 additions & 3 deletions tests/dags/test_scheduler_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,34 @@
# specific language governing permissions and limitations
# under the License.

from datetime import datetime
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
DEFAULT_DATE = datetime(2100, 1, 1)
DEFAULT_DATE = datetime(2016, 1, 1)

# DAG tests backfill with pooled tasks
# Previously backfill would queue the task but never run it
dag1 = DAG(
dag_id='test_start_date_scheduling',
start_date=datetime(2100, 1, 1))
start_date=datetime.utcnow() + timedelta(days=1))
dag1_task1 = DummyOperator(
task_id='dummy',
dag=dag1,
owner='airflow')

dag2 = DAG(
dag_id='test_task_start_date_scheduling',
start_date=DEFAULT_DATE
)
dag2_task1 = DummyOperator(
task_id='dummy1',
dag=dag2,
owner='airflow',
start_date=DEFAULT_DATE + timedelta(days=3)
)
dag2_task2 = DummyOperator(
task_id='dummy2',
dag=dag2,
owner='airflow'
)
23 changes: 22 additions & 1 deletion tests/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2217,7 +2217,7 @@ def test_scheduler_start_date(self):
dag_id = 'test_start_date_scheduling'
dag = self.dagbag.get_dag(dag_id)
dag.clear()
self.assertTrue(dag.start_date > DEFAULT_DATE)
self.assertTrue(dag.start_date > datetime.datetime.utcnow() )

scheduler = SchedulerJob(dag_id,
num_runs=2)
Expand Down Expand Up @@ -2252,6 +2252,27 @@ def test_scheduler_start_date(self):
self.assertEqual(
len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)

def test_scheduler_task_start_date(self):
    """
    Check that the scheduler honours a task-level start_date that differs
    from the start_date of the DAG the task belongs to.
    """
    target_dag_id = 'test_task_start_date_scheduling'
    self.dagbag.get_dag(target_dag_id).clear()

    SchedulerJob(target_dag_id, num_runs=2).run()

    session = settings.Session()
    dag_tis = session.query(TI).filter(TI.dag_id == target_dag_id)
    late_start_tis = dag_tis.filter(TI.task_id == 'dummy1').all()
    dag_start_tis = dag_tis.filter(TI.task_id == 'dummy2').all()

    # 'dummy1' is declared with a start_date later than its DAG's, so no
    # task instance is expected for it; 'dummy2' has no start_date of its
    # own and is expected to have run to success twice.
    self.assertEqual(len(late_start_tis), 0)
    self.assertEqual(len(dag_start_tis), 2)
    for task_instance in dag_start_tis:
        self.assertEqual(task_instance.state, State.SUCCESS)

def test_scheduler_multiprocessing(self):
"""
Test that the scheduler can successfully queue multiple dags in parallel
Expand Down