Skip to content

Commit f4915fc

Browse files
Newton LeCI
Newton Le
authored and
CI
committed
[AIRFLOW-3060] DAG context manager fails to exit properly in certain circumstances
[AIRFLOW-3123] Use a stack for DAG context management (apache#3956) [TWTR][AIRFLOW-XXX] Twitter Airflow Customizations + Fixup job scheduling without explicit_defaults_for_timestamp. [AIRFLOW-3160] Load latest_dagruns asynchronously [REVBUMP] v9 [AIRFLOW-2861] Add index on log table (apache#3709) [AIRFLOW-2747][PARTIAL CHERRYPICK] Explicit re-schedule of sensors [AIRFLOW-3191] Fix not being able to specify execution_date when creating dagrun [AIRFLOW-2657] Add ability to delete dag from web UI Closes apache#3531 from Noremac201/master [REVBUMP] v10 [AIRFLOW-3233] Fix deletion of DAGs in the UI [AIRFLOW-4070] log.warning for duplicate task dependencies This change logs a warning on duplicate task dependencies rather than raising an AirflowException. This will allow automated task dependencies to be generated while giving the user the option to explicitly define task dependencies. Differential Revision: https://phabricator.twitter.biz/D320000
1 parent 1d2502e commit f4915fc

File tree

11 files changed

+276
-7
lines changed

11 files changed

+276
-7
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# flake8: noqa
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
20+
"""add task_reschedule table
21+
22+
Revision ID: 0a2a5b66e19d
23+
Revises: 9635ae0956e7
24+
Create Date: 2018-06-17 22:50:00.053620
25+
26+
"""
27+
28+
# revision identifiers, used by Alembic.
29+
revision = '0a2a5b66e19d'
30+
down_revision = '9635ae0956e7'
31+
branch_labels = None
32+
depends_on = None
33+
34+
from alembic import op
35+
import sqlalchemy as sa
36+
from sqlalchemy.dialects import mysql
37+
38+
39+
TABLE_NAME = 'task_reschedule'
40+
INDEX_NAME = 'idx_' + TABLE_NAME + '_dag_task_date'
41+
42+
# For Microsoft SQL Server, TIMESTAMP is a row-id type,
43+
# having nothing to do with date-time. DateTime() will
44+
# be sufficient.
45+
def mssql_timestamp():
    """Return the column type used for timestamp columns on MSSQL.

    On Microsoft SQL Server, TIMESTAMP is a row-id type that has nothing
    to do with date-time, so a plain ``DateTime`` is used instead.
    """
    column_type = sa.DateTime()
    return column_type
47+
48+
def mysql_timestamp():
    """Return the MySQL-dialect ``TIMESTAMP`` column type built with ``fsp=6``."""
    column_type = mysql.TIMESTAMP(fsp=6)
    return column_type
50+
51+
def sa_timestamp():
    """Return the generic timezone-aware ``TIMESTAMP`` column type."""
    column_type = sa.TIMESTAMP(timezone=True)
    return column_type
53+
54+
def upgrade():
    """Create the ``task_reschedule`` table and its (dag, task, date) index.

    The timestamp column type is picked from the bound connection's dialect
    name: ``mysql_timestamp`` for mysql, ``mssql_timestamp`` for mssql and
    ``sa_timestamp`` for everything else.
    """
    # See 0e2a74e0fc9f_add_time_zone_awareness
    dialect_specific = {
        'mysql': mysql_timestamp,
        'mssql': mssql_timestamp,
    }
    dialect_name = op.get_bind().dialect.name
    timestamp = dialect_specific.get(dialect_name, sa_timestamp)

    # A fresh type object is built for every column by calling timestamp().
    columns = [
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('task_id', sa.String(length=250), nullable=False),
        sa.Column('dag_id', sa.String(length=250), nullable=False),
        # use explicit server_default=None otherwise mysql implies defaults
        # for first timestamp column
        sa.Column('execution_date', timestamp(), nullable=False,
                  server_default=None),
        sa.Column('try_number', sa.Integer(), nullable=False),
        sa.Column('start_date', timestamp(), nullable=False),
        sa.Column('end_date', timestamp(), nullable=False),
        sa.Column('duration', sa.Integer(), nullable=False),
        sa.Column('reschedule_date', timestamp(), nullable=False),
    ]
    constraints = [
        sa.PrimaryKeyConstraint('id'),
        sa.ForeignKeyConstraint(
            ['task_id', 'dag_id', 'execution_date'],
            [
                'task_instance.task_id',
                'task_instance.dag_id',
                'task_instance.execution_date',
            ],
            name='task_reschedule_dag_task_date_fkey',
        ),
    ]
    op.create_table(TABLE_NAME, *(columns + constraints))

    indexed_columns = ['dag_id', 'task_id', 'execution_date']
    op.create_index(INDEX_NAME, TABLE_NAME, indexed_columns, unique=False)
87+
88+
89+
def downgrade():
    """Undo ``upgrade``: drop the index first, then the table itself."""
    op.drop_index(
        INDEX_NAME,
        table_name=TABLE_NAME,
    )
    op.drop_table(
        TABLE_NAME,
    )
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
from alembic import op
20+
21+
"""add idx_log_dag
22+
23+
Revision ID: dd25f486b8ea
24+
Revises: 9635ae0956e7
25+
Create Date: 2018-08-07 06:41:41.028249
26+
27+
"""
28+
29+
# revision identifiers, used by Alembic.
30+
revision = 'dd25f486b8ea'
31+
down_revision = '9635ae0956e7'
32+
branch_labels = None
33+
depends_on = None
34+
35+
36+
def upgrade():
    """Add the non-unique ``idx_log_dag`` index on the ``log.dag_id`` column."""
    indexed_columns = ['dag_id']
    op.create_index('idx_log_dag', 'log', indexed_columns, unique=False)
38+
39+
40+
def downgrade():
    """Drop the ``idx_log_dag`` index from the ``log`` table."""
    op.drop_index(
        'idx_log_dag',
        table_name='log',
    )

airflow/models.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -2098,6 +2098,10 @@ class Log(Base):
20982098
owner = Column(String(500))
20992099
extra = Column(Text)
21002100

2101+
__table_args__ = (
2102+
Index('idx_log_dag', dag_id),
2103+
)
2104+
21012105
def __init__(self, event, task_instance, owner=None, extra=None, **kwargs):
21022106
self.dttm = timezone.utcnow()
21032107
self.event = event
@@ -2939,7 +2943,7 @@ def task_type(self):
29392943

29402944
def add_only_new(self, item_set, item):
29412945
if item in item_set:
2942-
raise AirflowException(
2946+
self.log.warning(
29432947
'Dependency {self}, {item} already registered'
29442948
''.format(**locals()))
29452949
else:
@@ -3252,6 +3256,8 @@ def __init__(
32523256
self.on_success_callback = on_success_callback
32533257
self.on_failure_callback = on_failure_callback
32543258

3259+
self._old_context_manager_dags = []
3260+
32553261
self._comps = {
32563262
'dag_id',
32573263
'task_ids',
@@ -3299,13 +3305,13 @@ def __hash__(self):
32993305

33003306
def __enter__(self):
33013307
global _CONTEXT_MANAGER_DAG
3302-
self._old_context_manager_dag = _CONTEXT_MANAGER_DAG
3308+
self._old_context_manager_dags.append(_CONTEXT_MANAGER_DAG)
33033309
_CONTEXT_MANAGER_DAG = self
33043310
return self
33053311

33063312
def __exit__(self, _type, _value, _tb):
33073313
global _CONTEXT_MANAGER_DAG
3308-
_CONTEXT_MANAGER_DAG = self._old_context_manager_dag
3314+
_CONTEXT_MANAGER_DAG = self._old_context_manager_dags.pop()
33093315

33103316
# /Context Manager ----------------------------------------------
33113317

airflow/version.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,4 @@
1818
# under the License.
1919
#
2020

21-
version = '1.10.0+twtr8'
21+
version = '1.10.0+twtr11'

airflow/www/templates/airflow/dag.html

+13
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,13 @@ <h4 class="pull-right">
100100
Refresh
101101
</a>
102102
</li>
103+
<li>
104+
<a href="{{ url_for("airflow.delete", dag_id=dag.dag_id, root=root) }}"
105+
onclick="return confirmDeleteDag('{{ dag.safe_dag_id }}')">
106+
<span class="glyphicon glyphicon-remove-circle" style="color:red" aria-hidden="true"></span>
107+
Delete
108+
</a>
109+
</li>
103110
</ul>
104111
</div>
105112
<hr>
@@ -302,6 +309,12 @@ <h4 class="modal-title" id="dagModalLabel">
302309
$("#dagModal").css("margin-top","0px");
303310
}
304311

312+
// Show a browser confirm() dialog warning that deleting the DAG `dag_id`
// cannot be undone, and return the value confirm() gives back.
function confirmDeleteDag(dag_id){
313+
return confirm("Are you sure you want to delete '"+dag_id+"' now?\n\
314+
This option will delete ALL metadata, DAG runs, etc.\n\
315+
This cannot be undone.");
316+
}
317+
305318
$("#btn_rendered").click(function(){
306319
url = "{{ url_for('airflow.rendered') }}" +
307320
"?task_id=" + encodeURIComponent(task_id) +

airflow/www/templates/airflow/dags.html

+13
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,13 @@ <h2>DAGs</h2>
186186
<span class="glyphicon glyphicon-refresh" aria-hidden="true" data-original-title="Refresh"></span>
187187
</a>
188188

189+
<!-- Delete -->
190+
<!-- Use dag_id instead of dag.dag_id, because the DAG might not exist in the webserver's DagBag -->
191+
<a href="{{ url_for('airflow.delete', dag_id=dag_id) }}"
192+
onclick="return confirmDeleteDag('{{ dag_id }}')">
193+
<span class="glyphicon glyphicon-remove-circle" style="color:red" aria-hidden="true" data-original-title="Delete Dag"></span>
194+
</a>
195+
189196
</td>
190197
</tr>
191198
{% endfor %}
@@ -240,6 +247,12 @@ <h2>DAGs</h2>
240247
function confirmTriggerDag(dag_id){
241248
return confirm("Are you sure you want to run '"+dag_id+"' now?");
242249
}
250+
251+
// Show a browser confirm() dialog warning that deleting the DAG `dag_id`
// cannot be undone, and return the value confirm() gives back.
function confirmDeleteDag(dag_id){
252+
return confirm("Are you sure you want to delete '"+dag_id+"' now?\n\
253+
This option will delete ALL metadata, DAG runs, etc.\n\
254+
This cannot be undone.");
255+
}
243256
all_dags = $("[id^=toggle]");
244257
$.each(all_dags, function(i,v) {
245258
$(v).change (function() {

airflow/www/views.py

+43-3
Original file line numberDiff line numberDiff line change
@@ -1063,6 +1063,32 @@ def run(self):
10631063
"it should start any moment now.".format(ti))
10641064
return redirect(origin)
10651065

1066+
@expose('/delete')
@login_required
@wwwutils.action_logging
@wwwutils.notify_owner
def delete(self):
    """Delete the DAG named by the ``dag_id`` query argument.

    Query arguments:
        dag_id: id of the DAG to delete.
        origin: URL to return to after a successful delete; defaults to
            ``/admin/`` when absent or empty.

    Returns:
        A redirect response. On success the user goes to ``origin``. When
        ``delete_dag`` raises ``DagNotFound`` or ``DagFileExists`` an error
        message is flashed and the user is sent back to the referring page
        (or to ``origin`` if the request carried no referrer).
    """
    from airflow.api.common.experimental import delete_dag
    from airflow.exceptions import DagNotFound, DagFileExists

    dag_id = request.args.get('dag_id')
    origin = request.args.get('origin') or "/admin/"
    # The Referer header is optional, so request.referrer may be None and
    # redirect(None) would fail; fall back to origin in that case.
    failure_target = request.referrer or origin

    try:
        delete_dag.delete_dag(dag_id)
    except DagNotFound:
        # Use the "error" category, as the tree view does for a missing DAG.
        flash("DAG with id {} not found. Cannot delete".format(dag_id),
              "error")
        return redirect(failure_target)
    except DagFileExists:
        flash("Dag id {} is still in DagBag. "
              "Remove the DAG file first.".format(dag_id),
              "error")
        return redirect(failure_target)

    flash("Deleting DAG with id {}. May take a couple minutes to fully"
          " disappear.".format(dag_id))
    # Upon successful delete return to origin
    return redirect(origin)
1091+
10661092
@expose('/trigger')
10671093
@login_required
10681094
@wwwutils.action_logging
@@ -1302,6 +1328,10 @@ def tree(self, session=None):
13021328
dag_id = request.args.get('dag_id')
13031329
blur = conf.getboolean('webserver', 'demo_mode')
13041330
dag = dagbag.get_dag(dag_id)
1331+
if dag_id not in dagbag.dags:
1332+
flash('DAG "{0}" seems to be missing.'.format(dag_id), "error")
1333+
return redirect('/admin/')
1334+
13051335
root = request.args.get('root')
13061336
if root:
13071337
dag = dag.sub_dag(
@@ -2542,9 +2572,18 @@ class DagRunModelView(ModelViewOnly):
25422572
('failed', 'failed'),
25432573
],
25442574
}
2545-
form_args = dict(
2546-
dag_id=dict(validators=[validators.DataRequired()])
2547-
)
2575+
form_args = {
2576+
'dag_id': {
2577+
'validators': [
2578+
validators.DataRequired(),
2579+
]
2580+
},
2581+
'execution_date': {
2582+
'filters': [
2583+
parse_datetime_f,
2584+
]
2585+
}
2586+
}
25482587
column_list = (
25492588
'state', 'dag_id', 'execution_date', 'run_id', 'external_trigger')
25502589
column_filters = column_list
@@ -2557,6 +2596,7 @@ class DagRunModelView(ModelViewOnly):
25572596
dag_id=dag_link,
25582597
run_id=dag_run_link
25592598
)
2599+
form_overrides = dict(execution_date=DateTimeField)
25602600

25612601
@action('new_delete', "Delete", "Are you sure you want to delete selected records?")
25622602
@provide_session

airflow/www_rbac/templates/airflow/dag.html

+13
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,13 @@ <h4 class="pull-right">
9999
Refresh
100100
</a>
101101
</li>
102+
<li>
103+
<a href="{{ url_for('Airflow.delete', dag_id=dag.dag_id, root=root) }}"
104+
onclick="return confirmDeleteDag('{{ dag.safe_dag_id }}')">
105+
<span class="glyphicon glyphicon-remove-circle" style="color:red" aria-hidden="true"></span>
106+
Delete
107+
</a>
108+
</li>
102109
</ul>
103110
</div>
104111
<hr>
@@ -300,6 +307,12 @@ <h4 class="modal-title" id="dagModalLabel">
300307
$("#dagModal").css("margin-top","0px");
301308
}
302309

310+
// Show a browser confirm() dialog warning that deleting the DAG `dag_id`
// cannot be undone, and return the value confirm() gives back.
function confirmDeleteDag(dag_id){
311+
return confirm("Are you sure you want to delete '"+dag_id+"' now?\n\
312+
This option will delete ALL metadata, DAG runs, etc.\n\
313+
This cannot be undone.");
314+
}
315+
303316
$("#btn_rendered").click(function(){
304317
url = "{{ url_for('Airflow.rendered') }}" +
305318
"?task_id=" + encodeURIComponent(task_id) +

airflow/www_rbac/templates/airflow/dags.html

+11
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,11 @@ <h2>DAGs</h2>
187187
<span class="glyphicon glyphicon-refresh" aria-hidden="true" data-original-title="Refresh"></span>
188188
</a>
189189

190+
<!-- Delete -->
191+
<a href="{{ url_for('Airflow.delete', dag_id=dag.dag_id) }}"
192+
onclick="return confirmDeleteDag('{{ dag.safe_dag_id }}')">
193+
<span class="glyphicon glyphicon-remove-circle" style="color:red" aria-hidden="true" data-original-title="Delete Dag"></span>
194+
</a>
190195
</td>
191196
</tr>
192197
{% endfor %}
@@ -238,6 +243,12 @@ <h2>DAGs</h2>
238243
window.location = DAGS_INDEX + "?page_size=" + p_size;
239244
});
240245

246+
// Show a browser confirm() dialog warning that deleting the DAG `dag_id`
// cannot be undone, and return the value confirm() gives back.
function confirmDeleteDag(dag_id){
247+
return confirm("Are you sure you want to delete '"+dag_id+"' now?\n\
248+
This option will delete ALL metadata, DAG runs, etc.\n\
249+
This cannot be undone.");
250+
}
251+
241252
function confirmTriggerDag(dag_id){
242253
return confirm("Are you sure you want to run '"+dag_id+"' now?");
243254
}

0 commit comments

Comments
 (0)