Skip to content

Commit ce23f16

Browse files
newtonleashb
authored andcommitted
[AIRFLOW-3060] DAG context manager fails to exit properly in certain circumstances
1 parent 1c00bce commit ce23f16

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

airflow/models.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -3442,6 +3442,8 @@ def __init__(
34423442
self.on_success_callback = on_success_callback
34433443
self.on_failure_callback = on_failure_callback
34443444

3445+
self._context_manager_set = False
3446+
34453447
self._comps = {
34463448
'dag_id',
34473449
'task_ids',
@@ -3490,13 +3492,16 @@ def __hash__(self):
34903492

34913493
def __enter__(self):
34923494
global _CONTEXT_MANAGER_DAG
3493-
self._old_context_manager_dag = _CONTEXT_MANAGER_DAG
3494-
_CONTEXT_MANAGER_DAG = self
3495+
if not self._context_manager_set:
3496+
self._old_context_manager_dag = _CONTEXT_MANAGER_DAG
3497+
_CONTEXT_MANAGER_DAG = self
3498+
self._context_manager_set = True
34953499
return self
34963500

34973501
def __exit__(self, _type, _value, _tb):
34983502
global _CONTEXT_MANAGER_DAG
34993503
_CONTEXT_MANAGER_DAG = self._old_context_manager_dag
3504+
self._context_manager_set = False
35003505

35013506
# /Context Manager ----------------------------------------------
35023507

tests/models.py

+9
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,15 @@ def test_dag_as_context_manager(self):
147147
self.assertEqual(dag.dag_id, 'creating_dag_in_cm')
148148
self.assertEqual(dag.tasks[0].task_id, 'op6')
149149

150+
with dag:
151+
with dag:
152+
op7 = DummyOperator(task_id='op7')
153+
op8 = DummyOperator(task_id='op8')
154+
op8.dag = dag2
155+
156+
self.assertEqual(op7.dag, dag)
157+
self.assertEqual(op8.dag, dag2)
158+
150159
def test_dag_topological_sort(self):
151160
dag = DAG(
152161
'dag',

0 commit comments

Comments
 (0)