Skip to content

Commit

Permalink
Remove core Airflow support for static hybrid executors (#47322)
Browse files Browse the repository at this point in the history
Remove all the handholding and custom logic we have in core airflow
which allows the use of static hybrid executors like
LocalKubernetesExecutor and CeleryKubernetesExecutor. These executors
will still work on 2.X versions of Airflow, but moving forward they will
not be supported on Airflow 3
  • Loading branch information
o-nikolas authored Mar 6, 2025
1 parent c505411 commit 83643e5
Show file tree
Hide file tree
Showing 11 changed files with 10 additions and 81 deletions.
3 changes: 1 addition & 2 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ core:
description: |
The executor class that airflow should use. Choices include
``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``,
``KubernetesExecutor``, ``CeleryKubernetesExecutor``, ``LocalKubernetesExecutor`` or the
full import path to the class when using a custom executor.
``KubernetesExecutor`` or the full import path to the class when using a custom executor.
version_added: ~
type: string
example: ~
Expand Down
4 changes: 0 additions & 4 deletions airflow/executors/executor_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,15 @@ class ConnectorSource(Enum):


LOCAL_EXECUTOR = "LocalExecutor"
LOCAL_KUBERNETES_EXECUTOR = "LocalKubernetesExecutor"
SEQUENTIAL_EXECUTOR = "SequentialExecutor"
CELERY_EXECUTOR = "CeleryExecutor"
CELERY_KUBERNETES_EXECUTOR = "CeleryKubernetesExecutor"
KUBERNETES_EXECUTOR = "KubernetesExecutor"
DEBUG_EXECUTOR = "DebugExecutor"
MOCK_EXECUTOR = "MockExecutor"
CORE_EXECUTOR_NAMES = {
LOCAL_EXECUTOR,
LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
KUBERNETES_EXECUTOR,
DEBUG_EXECUTOR,
MOCK_EXECUTOR,
Expand Down
37 changes: 5 additions & 32 deletions airflow/executors/executor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
from airflow.exceptions import AirflowConfigException, UnknownExecutorException
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
CORE_EXECUTOR_NAMES,
DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
ConnectorSource,
)
Expand Down Expand Up @@ -59,12 +57,8 @@ class ExecutorLoader:

executors = {
LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor",
LOCAL_KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
"executors.local_kubernetes_executor.LocalKubernetesExecutor",
SEQUENTIAL_EXECUTOR: "airflow.executors.sequential_executor.SequentialExecutor",
CELERY_EXECUTOR: "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
CELERY_KUBERNETES_EXECUTOR: "airflow.providers.celery."
"executors.celery_kubernetes_executor.CeleryKubernetesExecutor",
KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
"executors.kubernetes_executor.KubernetesExecutor",
DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor",
Expand Down Expand Up @@ -265,17 +259,12 @@ def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor
_executor_name = executor_name

try:
if _executor_name.alias == CELERY_KUBERNETES_EXECUTOR:
executor = cls.__load_celery_kubernetes_executor()
elif _executor_name.alias == LOCAL_KUBERNETES_EXECUTOR:
executor = cls.__load_local_kubernetes_executor()
executor_cls, import_source = cls.import_executor_cls(_executor_name)
log.debug("Loading executor %s from %s", _executor_name, import_source.value)
if _executor_name.team_id:
executor = executor_cls(team_id=_executor_name.team_id)
else:
executor_cls, import_source = cls.import_executor_cls(_executor_name)
log.debug("Loading executor %s from %s", _executor_name, import_source.value)
if _executor_name.team_id:
executor = executor_cls(team_id=_executor_name.team_id)
else:
executor = executor_cls()
executor = executor_cls()

except ImportError as e:
log.error(e)
Expand Down Expand Up @@ -315,19 +304,3 @@ def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSourc
executor_name = cls.get_default_executor_name()
executor, source = cls.import_executor_cls(executor_name)
return executor, source

@classmethod
def __load_celery_kubernetes_executor(cls) -> BaseExecutor:
celery_executor = import_string(cls.executors[CELERY_EXECUTOR])()
kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()

celery_kubernetes_executor_cls = import_string(cls.executors[CELERY_KUBERNETES_EXECUTOR])
return celery_kubernetes_executor_cls(celery_executor, kubernetes_executor)

@classmethod
def __load_local_kubernetes_executor(cls) -> BaseExecutor:
local_executor = import_string(cls.executors[LOCAL_EXECUTOR])()
kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()

local_kubernetes_executor_cls = import_string(cls.executors[LOCAL_KUBERNETES_EXECUTOR])
return local_kubernetes_executor_cls(local_executor, kubernetes_executor)
6 changes: 1 addition & 5 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,11 +687,7 @@ def initialize():
LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True)

# Determines if the executor utilizes Kubernetes
IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") in {
executor_constants.KUBERNETES_EXECUTOR,
executor_constants.CELERY_KUBERNETES_EXECUTOR,
executor_constants.LOCAL_KUBERNETES_EXECUTOR,
}
IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") == executor_constants.KUBERNETES_EXECUTOR

# Executors can set this to true to configure logging correctly for
# containerized executors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def setup_class(cls):
("LocalExecutor", True),
("SequentialExecutor", True),
("KubernetesExecutor", False),
("LocalKubernetesExecutor", True),
],
)
@mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner")
Expand Down
6 changes: 0 additions & 6 deletions tests/cli/commands/local_commands/test_standalone_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@
from airflow.executors import executor_loader
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
)

Expand All @@ -40,17 +38,13 @@ class TestStandaloneCommand:
"conf_executor_name, conf_sql_alchemy_conn, expected_standalone_executor",
[
(LOCAL_EXECUTOR, "sqlite_conn_string", LOCAL_EXECUTOR),
(LOCAL_KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(SEQUENTIAL_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(CELERY_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(CELERY_KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(DEBUG_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(LOCAL_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
(LOCAL_KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
(SEQUENTIAL_EXECUTOR, "other_db_conn_string", SEQUENTIAL_EXECUTOR),
(CELERY_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
(CELERY_KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
(KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
(DEBUG_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
],
Expand Down
10 changes: 2 additions & 8 deletions tests/cli/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

from airflow.executors import local_executor
from airflow.models.dagbag import DagBag
from airflow.providers.celery.executors import celery_executor, celery_kubernetes_executor
from airflow.providers.cncf.kubernetes.executors import kubernetes_executor, local_kubernetes_executor
from airflow.providers.celery.executors import celery_executor
from airflow.providers.cncf.kubernetes.executors import kubernetes_executor

from tests_common.test_utils.config import conf_vars

Expand All @@ -33,15 +33,9 @@
custom_executor_module.CustomCeleryExecutor = type( # type: ignore
"CustomCeleryExecutor", (celery_executor.CeleryExecutor,), {}
)
custom_executor_module.CustomCeleryKubernetesExecutor = type( # type: ignore
"CustomCeleryKubernetesExecutor", (celery_kubernetes_executor.CeleryKubernetesExecutor,), {}
)
custom_executor_module.CustomLocalExecutor = type( # type: ignore
"CustomLocalExecutor", (local_executor.LocalExecutor,), {}
)
custom_executor_module.CustomLocalKubernetesExecutor = type( # type: ignore
"CustomLocalKubernetesExecutor", (local_kubernetes_executor.LocalKubernetesExecutor,), {}
)
custom_executor_module.CustomKubernetesExecutor = type( # type: ignore
"CustomKubernetesExecutor", (kubernetes_executor.KubernetesExecutor,), {}
)
Expand Down
4 changes: 0 additions & 4 deletions tests/cli/test_cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,16 +365,12 @@ def test_executor_specific_commands_not_accessible(self, command):
"executor,expected_args",
[
("CeleryExecutor", ["celery"]),
("CeleryKubernetesExecutor", ["celery", "kubernetes"]),
("KubernetesExecutor", ["kubernetes"]),
("LocalExecutor", []),
("LocalKubernetesExecutor", ["kubernetes"]),
("SequentialExecutor", []),
# custom executors are mapped to the regular ones in `conftest.py`
("custom_executor.CustomLocalExecutor", []),
("custom_executor.CustomLocalKubernetesExecutor", ["kubernetes"]),
("custom_executor.CustomCeleryExecutor", ["celery"]),
("custom_executor.CustomCeleryKubernetesExecutor", ["celery", "kubernetes"]),
("custom_executor.CustomKubernetesExecutor", ["kubernetes"]),
],
)
Expand Down
2 changes: 0 additions & 2 deletions tests/executors/test_executor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def test_no_executor_configured(self):
"executor_name",
[
"CeleryExecutor",
"CeleryKubernetesExecutor",
"DebugExecutor",
"KubernetesExecutor",
"LocalExecutor",
Expand Down Expand Up @@ -287,7 +286,6 @@ def test_get_hybrid_executors_from_config_core_executors_bad_config_format(self,
("executor_config", "expected_value"),
[
("CeleryExecutor", "CeleryExecutor"),
("CeleryKubernetesExecutor", "CeleryKubernetesExecutor"),
("DebugExecutor", "DebugExecutor"),
("KubernetesExecutor", "KubernetesExecutor"),
("LocalExecutor", "LocalExecutor"),
Expand Down
8 changes: 0 additions & 8 deletions tests/sensors/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@
from airflow.executors.debug_executor import DebugExecutor
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
)
from airflow.executors.local_executor import LocalExecutor
Expand All @@ -48,9 +46,7 @@
from airflow.models.trigger import TriggerFailureReason
from airflow.models.xcom import XCom
from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor
from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor import LocalKubernetesExecutor
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sensors.base import BaseSensorOperator, PokeReturnValue, poke_mode_only
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
Expand Down Expand Up @@ -1306,20 +1302,16 @@ def test_sensor_with_xcom_fails(self, make_sensor):
"executor_cls_mode",
[
(CeleryExecutor, "poke"),
(CeleryKubernetesExecutor, "poke"),
(DebugExecutor, "reschedule"),
(KubernetesExecutor, "poke"),
(LocalExecutor, "poke"),
(LocalKubernetesExecutor, "poke"),
(SequentialExecutor, "poke"),
],
ids=[
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
],
)
Expand Down
10 changes: 1 addition & 9 deletions tests/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,21 +199,13 @@ def task_callable(ti):
@pytest.mark.parametrize(
"executor_name",
[
(executor_constants.LOCAL_KUBERNETES_EXECUTOR),
(executor_constants.CELERY_KUBERNETES_EXECUTOR),
(executor_constants.KUBERNETES_EXECUTOR),
(None),
],
)
@conf_vars(
{
("core", "EXECUTOR"): ",".join(
[
executor_constants.LOCAL_KUBERNETES_EXECUTOR,
executor_constants.CELERY_KUBERNETES_EXECUTOR,
executor_constants.KUBERNETES_EXECUTOR,
]
),
("core", "EXECUTOR"): executor_constants.KUBERNETES_EXECUTOR,
}
)
@patch(
Expand Down

0 comments on commit 83643e5

Please sign in to comment.