Skip to content

Commit f68938f

Browse files
kylebrooks-8451ashb
authored andcommitted
[AIRFLOW-2231] Fix relativedelta DAG schedule_interval (apache#3174)
Fixes issues when specifying a DAG with a schedule_interval of type relativedelta.
1 parent e7343a3 commit f68938f

File tree

2 files changed

+43
-3
lines changed

2 files changed

+43
-3
lines changed

airflow/models.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
from builtins import str, object, bytes, ImportError as BuiltinImportError
2828
import copy
29-
from collections import namedtuple, defaultdict, OrderedDict
29+
from collections import namedtuple, defaultdict, Hashable, OrderedDict
3030
from datetime import timedelta
3131

3232
import dill
@@ -3387,7 +3387,7 @@ def __init__(
33873387
)
33883388

33893389
self.schedule_interval = schedule_interval
3390-
if schedule_interval in cron_presets:
3390+
if isinstance(schedule_interval, Hashable) and schedule_interval in cron_presets:
33913391
self._schedule_interval = cron_presets.get(schedule_interval)
33923392
elif schedule_interval == '@once':
33933393
self._schedule_interval = None
@@ -3518,7 +3518,7 @@ def following_schedule(self, dttm):
35183518
tz = pendulum.timezone(self.timezone.name)
35193519
following = timezone.make_aware(naive, tz)
35203520
return timezone.convert_to_utc(following)
3521-
elif isinstance(self._schedule_interval, timedelta):
3521+
elif self._schedule_interval is not None:
35223522
return dttm + self._schedule_interval
35233523

35243524
def previous_schedule(self, dttm):

tests/core.py

+40
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,46 @@ def test_schedule_dag_no_previous_runs(self):
156156
self.assertFalse(dag_run.external_trigger)
157157
dag.clear()
158158

159+
def test_schedule_dag_relativedelta(self):
160+
"""
161+
Tests scheduling a dag with a relativedelta schedule_interval
162+
"""
163+
delta = relativedelta(hours=+1)
164+
dag = DAG(TEST_DAG_ID + 'test_schedule_dag_relativedelta',
165+
schedule_interval=delta)
166+
dag.add_task(models.BaseOperator(
167+
task_id="faketastic",
168+
owner='Also fake',
169+
start_date=datetime(2015, 1, 2, 0, 0)))
170+
171+
dag_run = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag)
172+
self.assertIsNotNone(dag_run)
173+
self.assertEqual(dag.dag_id, dag_run.dag_id)
174+
self.assertIsNotNone(dag_run.run_id)
175+
self.assertNotEqual('', dag_run.run_id)
176+
self.assertEqual(
177+
datetime(2015, 1, 2, 0, 0),
178+
dag_run.execution_date,
179+
msg='dag_run.execution_date did not match expectation: {0}'
180+
.format(dag_run.execution_date)
181+
)
182+
self.assertEqual(State.RUNNING, dag_run.state)
183+
self.assertFalse(dag_run.external_trigger)
184+
dag_run2 = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag)
185+
self.assertIsNotNone(dag_run2)
186+
self.assertEqual(dag.dag_id, dag_run2.dag_id)
187+
self.assertIsNotNone(dag_run2.run_id)
188+
self.assertNotEqual('', dag_run2.run_id)
189+
self.assertEqual(
190+
datetime(2015, 1, 2, 0, 0) + delta,
191+
dag_run2.execution_date,
192+
msg='dag_run2.execution_date did not match expectation: {0}'
193+
.format(dag_run2.execution_date)
194+
)
195+
self.assertEqual(State.RUNNING, dag_run2.state)
196+
self.assertFalse(dag_run2.external_trigger)
197+
dag.clear()
198+
159199
def test_schedule_dag_fake_scheduled_previous(self):
160200
"""
161201
Test scheduling a dag where there is a prior DagRun

0 commit comments

Comments
 (0)