Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gracefully handle whole config sections being renamed #28008

Merged
merged 9 commits into from
Dec 1, 2022
154 changes: 101 additions & 53 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

from typing_extensions import overload

from airflow.compat.functools import cached_property
from airflow.exceptions import AirflowConfigException
from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH, BaseSecretsBackend
from airflow.utils import yaml
Expand Down Expand Up @@ -228,36 +229,24 @@ class AirflowConfigParser(ConfigParser):
("database", "sql_alchemy_connect_args"): ("core", "sql_alchemy_connect_args", "2.3.0"),
("database", "load_default_connections"): ("core", "load_default_connections", "2.3.0"),
("database", "max_db_retries"): ("core", "max_db_retries", "2.3.0"),
**{
("kubernetes_executor", x): ("kubernetes", x, "2.4.2")
for x in (
"pod_template_file",
"worker_container_repository",
"worker_container_tag",
"namespace",
"delete_worker_pods",
"delete_worker_pods_on_failure",
"worker_pods_creation_batch_size",
"multi_namespace_mode",
"in_cluster",
"cluster_context",
"config_file",
"kube_client_request_args",
"delete_option_kwargs",
"enable_tcp_keepalive",
"tcp_keep_idle",
"tcp_keep_intvl",
"tcp_keep_cnt",
"verify_ssl",
"worker_pods_pending_timeout",
"worker_pods_pending_timeout_check_interval",
"worker_pods_queued_check_interval",
"worker_pods_pending_timeout_batch_size",
)
},
("scheduler", "parsing_cleanup_interval"): ("scheduler", "deactivate_stale_dags_interval", "2.5.0"),
}

# A mapping of new section -> (old section, since_version).
deprecated_sections: dict[str, tuple[str, str]] = {"kubernetes_executor": ("kubernetes", "2.5.0")}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: The individual sections had it as 2.4.2, but that was incorrect, so I've corrected it here in this PR to 2.5.0


# Now build the inverse so we can go from old_section/old_key to new_section/new_key
# if someone tries to retrieve it based on old_section/old_key
@cached_property
def inversed_deprecated_options(self):
return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()}

@cached_property
def inversed_deprecated_sections(self):
return {
old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items()
}

# A mapping of old default values that we want to change and warn the user
# about. Mapping of section -> setting -> { old, replace, by_version }
deprecated_values: dict[str, dict[str, tuple[Pattern, str, str]]] = {
Expand Down Expand Up @@ -572,63 +561,116 @@ def get(self, section: str, key: str, **kwargs) -> str | None: # type: ignore[o
section = str(section).lower()
key = str(key).lower()

deprecated_section, deprecated_key, _ = self.deprecated_options.get(
(section, key), (None, None, None)
)
issue_warning = True
deprecated_section: str | None
deprecated_key: str | None

option = self._get_environment_variables(deprecated_key, deprecated_section, key, section)
# For when we rename whole sections
if section in self.inversed_deprecated_sections:
deprecated_section, deprecated_key = (section, key)
section = self.inversed_deprecated_sections[section]
warnings.warn(
f"The config section [{deprecated_section}] has been renamed to "
f"[{section}]. Please update your `conf.get*` call to use the new name",
FutureWarning,
stacklevel=2,
)
# Don't warn about individual rename if the whole section is renamed
issue_warning = False
elif (section, key) in self.inversed_deprecated_options:
# Handle using deprecated section/key instead of the new section/key
new_section, new_key = self.inversed_deprecated_options[(section, key)]
if issue_warning:
warnings.warn(
f"section/key [{section}/{key}] has been deprecated, you should use"
f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
"new name",
FutureWarning,
stacklevel=2,
)
issue_warning = False
deprecated_section, deprecated_key = section, key
section, key = (new_section, new_key)
elif section in self.deprecated_sections:
# When accessing the new section name, make sure we check under the old config name
deprecated_key = key
deprecated_section = self.deprecated_sections[section][0]
else:
deprecated_section, deprecated_key, _ = self.deprecated_options.get(
(section, key), (None, None, None)
)

# first check environment variables
option = self._get_environment_variables(
deprecated_key, deprecated_section, key, section, issue_warning=issue_warning
)
if option is not None:
return option

option = self._get_option_from_config_file(deprecated_key, deprecated_section, key, kwargs, section)
# ...then the config file
option = self._get_option_from_config_file(
deprecated_key, deprecated_section, key, kwargs, section, issue_warning=issue_warning
)
if option is not None:
return option

option = self._get_option_from_commands(deprecated_key, deprecated_section, key, section)
# ...then commands
option = self._get_option_from_commands(
deprecated_key, deprecated_section, key, section, issue_warning=issue_warning
)
if option is not None:
return option

option = self._get_option_from_secrets(deprecated_key, deprecated_section, key, section)
# ...then from secret backends
option = self._get_option_from_secrets(
deprecated_key, deprecated_section, key, section, issue_warning=issue_warning
)
if option is not None:
return option

return self._get_option_from_default_config(section, key, **kwargs)

def _get_option_from_default_config(self, section: str, key: str, **kwargs) -> str | None:
# ...then the default config
if self.airflow_defaults.has_option(section, key) or "fallback" in kwargs:
return expand_env_var(self.airflow_defaults.get(section, key, **kwargs))

else:
log.warning("section/key [%s/%s] not found in config", section, key)
log.warning("section/key [%s/%s] not found in config", section, key)

raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")
raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")

def _get_option_from_secrets(
self, deprecated_key: str | None, deprecated_section: str | None, key: str, section: str
self,
deprecated_key: str | None,
deprecated_section: str | None,
key: str,
section: str,
issue_warning: bool = True,
) -> str | None:
# ...then from secret backends
option = self._get_secret_option(section, key)
if option:
return option
if deprecated_section and deprecated_key:
option = self._get_secret_option(deprecated_section, deprecated_key)
if option:
self._warn_deprecate(section, key, deprecated_section, deprecated_key)
if issue_warning:
self._warn_deprecate(section, key, deprecated_section, deprecated_key)
return option
return None

def _get_option_from_commands(
self, deprecated_key: str | None, deprecated_section: str | None, key: str, section: str
self,
deprecated_key: str | None,
deprecated_section: str | None,
key: str,
section: str,
issue_warning: bool = True,
) -> str | None:
# ...then commands
option = self._get_cmd_option(section, key)
if option:
return option
if deprecated_section and deprecated_key:
option = self._get_cmd_option(deprecated_section, deprecated_key)
if option:
self._warn_deprecate(section, key, deprecated_section, deprecated_key)
if issue_warning:
self._warn_deprecate(section, key, deprecated_section, deprecated_key)
return option
return None

Expand All @@ -639,29 +681,35 @@ def _get_option_from_config_file(
key: str,
kwargs: dict[str, Any],
section: str,
issue_warning: bool = True,
) -> str | None:
# ...then the config file
if super().has_option(section, key):
# Use the parent's methods to get the actual config here to be able to
# separate the config from default config.
return expand_env_var(super().get(section, key, **kwargs))
if deprecated_section and deprecated_key:
if super().has_option(deprecated_section, deprecated_key):
self._warn_deprecate(section, key, deprecated_section, deprecated_key)
if issue_warning:
self._warn_deprecate(section, key, deprecated_section, deprecated_key)
return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
return None

def _get_environment_variables(
self, deprecated_key: str | None, deprecated_section: str | None, key: str, section: str
self,
deprecated_key: str | None,
deprecated_section: str | None,
key: str,
section: str,
issue_warning: bool = True,
) -> str | None:
# first check environment variables
option = self._get_env_var_option(section, key)
if option is not None:
return option
if deprecated_section and deprecated_key:
option = self._get_env_var_option(deprecated_section, deprecated_key)
if option is not None:
self._warn_deprecate(section, key, deprecated_section, deprecated_key)
if issue_warning:
self._warn_deprecate(section, key, deprecated_section, deprecated_key)
return option
return None

Expand Down Expand Up @@ -1283,14 +1331,14 @@ def _warn_deprecate(section: str, key: str, deprecated_section: str, deprecated_
f"The {deprecated_name} option in [{section}] has been renamed to {key} - "
f"the old setting has been used, but please update your config.",
DeprecationWarning,
stacklevel=3,
stacklevel=4,
)
else:
warnings.warn(
f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option "
f"in [{section}] - the old setting has been used, but please update your config.",
DeprecationWarning,
stacklevel=3,
stacklevel=4,
)

def __getstate__(self):
Expand Down
70 changes: 70 additions & 0 deletions tests/core/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@
HOME_DIR = os.path.expanduser("~")


@pytest.fixture(scope="module", autouse=True)
def restore_env():
with mock.patch.dict("os.environ"):
yield


@unittest.mock.patch.dict(
"os.environ",
{
Expand Down Expand Up @@ -871,6 +877,10 @@ def test_deprecated_options_with_new_section(self):
with mock.patch.dict("os.environ", AIRFLOW__CORE__LOGGING_LEVEL="VALUE"):
assert conf.get("logging", "logging_level") == "VALUE"

with pytest.warns(FutureWarning, match="Please update your `conf.get"):
with mock.patch.dict("os.environ", AIRFLOW__CORE__LOGGING_LEVEL="VALUE"):
assert conf.get("core", "logging_level") == "VALUE"

with pytest.warns(DeprecationWarning), conf_vars({("core", "logging_level"): "VALUE"}):
assert conf.get("logging", "logging_level") == "VALUE"

Expand Down Expand Up @@ -1005,6 +1015,66 @@ def make_config():
assert test_conf.get("core", "hostname_callable") == "CarrierPigeon"
assert [] == warning

@pytest.mark.parametrize(
("conf_dict", "environ", "expected"),
[
pytest.param({"old_section": {"val": "old_val"}}, None, "old_val", id="old_config"),
pytest.param(
{"old_section": {"val": "old_val"}},
("AIRFLOW__OLD_SECTION__VAL", "old_env"),
"old_env",
id="old_config_old_env",
),
pytest.param(
{},
("AIRFLOW__OLD_SECTION__VAL", "old_env"),
"old_env",
id="old_env",
),
pytest.param(
{"new_section": {"val": "val2"}},
("AIRFLOW__OLD_SECTION__VAL", "old_env"),
"old_env",
id="new_config_old_env",
),
],
)
def test_deprecated_sections(self, conf_dict, environ, expected, monkeypatch):
def make_config():
test_conf = AirflowConfigParser(
default_config=textwrap.dedent(
"""
[new_section]
val=new
"""
)
)
# Guarantee we have a deprecated setting, so we test the deprecation
# lookup even if we remove this explicit fallback
test_conf.deprecated_sections = {
"new_section": ("old_section", "2.1"),
}
test_conf.read_dict(conf_dict)
test_conf.validate()
return test_conf

if environ:
monkeypatch.setenv(*environ)

test_conf = make_config()
with pytest.warns(
DeprecationWarning,
match=r"\[old_section\] has been moved to the val option in \[new_section\].*update your config",
):
# Test when you've _set_ the old value that we warn you need to update your config
assert test_conf.get("new_section", "val") == expected
with pytest.warns(
FutureWarning,
match=r"\[old_section\] has been renamed to \[new_section\].*update your `conf.get",
):
# Test when you read using the old section you get told to change your `conf.get` call
assert test_conf.get("old_section", "val") == expected

def test_deprecated_funcs(self):
for func in [
"load_test_config",
Expand Down