Skip to content

Commit a4592f9

Browse files
ashbbolkedebruin
authored andcommitted
[AIRFLOW-1840] Support back-compat on old celery config
The new names are in-line with Celery 4, but if anyone upgrades Airflow without following the UPDATING.md instructions (which we probably assume most people won't, not until something stops working) their workers would suddenly just start failing. That's bad. This will issue a warning but carry on working as expected. We can remove the deprecation settings (but leave the code in config) after this release has been made. Closes apache#3549 from ashb/AIRFLOW-1840-back-compat
1 parent 57bf996 commit a4592f9

File tree

4 files changed

+95
-3
lines changed

4 files changed

+95
-3
lines changed

UPDATING.md

+3
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ To make the config of Airflow compatible with Celery, some properties have been
5656
```
5757
celeryd_concurrency -> worker_concurrency
5858
celery_result_backend -> result_backend
59+
celery_ssl_active -> ssl_active
60+
celery_ssl_cert -> ssl_cert
61+
celery_ssl_key -> ssl_key
5962
```
6063
Resulting in the same config parameters as Celery 4, with more transparency.
6164

airflow/configuration.py

+52-1
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,29 @@ class AirflowConfigParser(ConfigParser):
121121
('core', 'sql_alchemy_conn'),
122122
('core', 'fernet_key'),
123123
('celery', 'broker_url'),
124-
('celery', 'result_backend')
124+
('celery', 'result_backend'),
125+
# Todo: remove this in Airflow 1.11
126+
('celery', 'celery_result_backend'),
125127
}
126128

129+
# A two-level mapping of (section -> new_name -> old_name). When reading
130+
# new_name, the old_name will be checked to see if it exists. If it does a
131+
# DeprecationWarning will be issued and the old name will be used instead
132+
deprecated_options = {
133+
'celery': {
134+
# Remove these keys in Airflow 1.11
135+
'worker_concurrency': 'celeryd_concurrency',
136+
'broker_url': 'celery_broker_url',
137+
'ssl_active': 'celery_ssl_active',
138+
'ssl_cert': 'celery_ssl_cert',
139+
'ssl_key': 'celery_ssl_key',
140+
}
141+
}
142+
deprecation_format_string = (
143+
'The {old} option in [{section}] has been renamed to {new} - the old '
144+
'setting has been used, but please update your config.'
145+
)
146+
127147
def __init__(self, default_config=None, *args, **kwargs):
128148
super(AirflowConfigParser, self).__init__(*args, **kwargs)
129149

@@ -181,22 +201,42 @@ def get(self, section, key, **kwargs):
181201
section = str(section).lower()
182202
key = str(key).lower()
183203

204+
deprecated_name = self.deprecated_options.get(section, {}).get(key, None)
205+
184206
# first check environment variables
185207
option = self._get_env_var_option(section, key)
186208
if option is not None:
187209
return option
210+
if deprecated_name:
211+
option = self._get_env_var_option(section, deprecated_name)
212+
if option is not None:
213+
self._warn_deprecate(section, key, deprecated_name)
214+
return option
188215

189216
# ...then the config file
190217
if super(AirflowConfigParser, self).has_option(section, key):
191218
# Use the parent's methods to get the actual config here to be able to
192219
# separate the config from default config.
193220
return expand_env_var(
194221
super(AirflowConfigParser, self).get(section, key, **kwargs))
222+
if deprecated_name:
223+
if super(AirflowConfigParser, self).has_option(section, deprecated_name):
224+
self._warn_deprecate(section, key, deprecated_name)
225+
return expand_env_var(super(AirflowConfigParser, self).get(
226+
section,
227+
deprecated_name,
228+
**kwargs
229+
))
195230

196231
# ...then commands
197232
option = self._get_cmd_option(section, key)
198233
if option:
199234
return option
235+
if deprecated_name:
236+
option = self._get_cmd_option(section, deprecated_name)
237+
if option:
238+
self._warn_deprecate(section, key, deprecated_name)
239+
return option
200240

201241
# ...then the default config
202242
if self.defaults.has_option(section, key):
@@ -352,6 +392,17 @@ def load_test_config(self):
352392
# then read any "custom" test settings
353393
self.read(TEST_CONFIG_FILE)
354394

395+
def _warn_deprecate(self, section, key, deprecated_name):
396+
warnings.warn(
397+
self.deprecation_format_string.format(
398+
old=deprecated_name,
399+
new=key,
400+
section=section,
401+
),
402+
DeprecationWarning,
403+
stacklevel=3,
404+
)
405+
355406

356407
def mkdir_p(path):
357408
try:

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ def write_version(filename=os.path.join(*['airflow',
231231
devel_ci = [package for package in devel_all if package not in
232232
['snakebite>=2.7.8', 'snakebite[kerberos]>=2.7.8']]
233233
else:
234-
devel_ci = devel_all
234+
devel_ci = devel_all + ['unittest2']
235235

236236

237237
def do_setup():

tests/configuration.py

+39-1
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,20 @@
2020
from __future__ import print_function
2121
from __future__ import unicode_literals
2222

23-
import unittest
23+
import os
2424
from collections import OrderedDict
2525

2626
import six
2727

2828
from airflow import configuration
2929
from airflow.configuration import conf, AirflowConfigParser, parameterized_config
3030

31+
if six.PY2:
32+
# Need `assertWarns` back-ported from unittest2
33+
import unittest2 as unittest
34+
else:
35+
import unittest
36+
3137

3238
class ConfTest(unittest.TestCase):
3339

@@ -154,3 +160,35 @@ def test_broker_transport_options(self):
154160
self.assertTrue(isinstance(section_dict['_test_only_float'], float))
155161

156162
self.assertTrue(isinstance(section_dict['_test_only_string'], six.string_types))
163+
164+
def test_deprecated_options(self):
165+
# Guarantee we have a deprecated setting, so we test the deprecation
166+
# lookup even if we remove this explicit fallback
167+
conf.deprecated_options['celery'] = {
168+
'worker_concurrency': 'celeryd_concurrency',
169+
}
170+
171+
# Remove it so we are sure we use the right setting
172+
conf.remove_option('celery', 'worker_concurrency')
173+
174+
with self.assertWarns(DeprecationWarning):
175+
os.environ['AIRFLOW__CELERY__CELERYD_CONCURRENCY'] = '99'
176+
self.assertEquals(conf.getint('celery', 'worker_concurrency'), 99)
177+
os.environ.pop('AIRFLOW__CELERY__CELERYD_CONCURRENCY')
178+
179+
with self.assertWarns(DeprecationWarning):
180+
conf.set('celery', 'celeryd_concurrency', '99')
181+
self.assertEquals(conf.getint('celery', 'worker_concurrency'), 99)
182+
conf.remove_option('celery', 'celeryd_concurrency')
183+
184+
def test_deprecated_options_cmd(self):
185+
# Guarantee we have a deprecated setting, so we test the deprecation
186+
# lookup even if we remove this explicit fallback
187+
conf.deprecated_options['celery'] = {'result_backend': 'celery_result_backend'}
188+
conf.as_command_stdout.add(('celery', 'celery_result_backend'))
189+
190+
conf.remove_option('celery', 'result_backend')
191+
conf.set('celery', 'celery_result_backend_cmd', '/bin/echo 99')
192+
193+
with self.assertWarns(DeprecationWarning):
194+
self.assertEquals(conf.getint('celery', 'result_backend'), 99)

0 commit comments

Comments
 (0)