PoC of AIP-69 - A first task can be called remote #40224

Closed
wants to merge 37 commits
Commits
b5fb6b8
PoC of AIP-69 - A first task can be called remote
jscheffl Jun 13, 2024
17ffaf9
Fix dependency to Airflow 2.10
jscheffl Jun 14, 2024
6983471
Fix static checks
jscheffl Jun 14, 2024
6be68fa
Re-add example_subdag_operator DAG but disabled in database isolation…
jscheffl Jun 15, 2024
87b92c4
Sync RPC API endpoint from remote from AIP-44 changes
jscheffl Jun 15, 2024
f73b38e
Revert example_subdag_operator
jscheffl Jun 15, 2024
a9fed43
Add boilerplate test files
jscheffl Jun 15, 2024
5798382
Fix dependency to Airflow 2.10
jscheffl Jun 15, 2024
03e36a3
Implement some first worker concurrency
jscheffl Jun 22, 2024
32d5008
Remove remote provider from compatibility checks
jscheffl Jun 23, 2024
e5d82c1
Make pydantic a dependency to remote provider
jscheffl Jun 25, 2024
1332c1c
Support for queues
jscheffl Jun 25, 2024
624ed96
Notes on Windows
jscheffl Jun 25, 2024
4ac9f46
Add a pytest for PR #40465
jscheffl Jun 27, 2024
571b769
Started updating docs
jscheffl Jun 27, 2024
5d87edf
Remove redundancy in internal API function list
jscheffl Jul 3, 2024
f81bcf9
Remove __init__.py from templates
jscheffl Jul 4, 2024
6df1702
Rework and add first parameters
jscheffl Jul 5, 2024
ad185ba
Static checks
jscheffl Jul 6, 2024
e37f6a0
Remove redundancy in internal API function list
jscheffl Jul 6, 2024
d3457a5
Adjust documentation for new remote executor
jscheffl Jul 9, 2024
e871937
Update documentation for remote provider
jscheffl Jul 9, 2024
83d1512
Fix pytest counting plugins
jscheffl Jul 10, 2024
b921fe0
Fix sdist generation
jscheffl Jul 13, 2024
5b09389
Rework remote worker loop, extract functions
jscheffl Jul 13, 2024
5099598
Transport logs for running tasks (Simple)
jscheffl Jul 13, 2024
bf8a604
Transport logs for running tasks (Simple)
jscheffl Jul 13, 2024
042475b
Update TODO
jscheffl Jul 13, 2024
cb43d0e
Add missing pytest boilerplate
jscheffl Jul 13, 2024
e130fa1
Add an integration test DAG to examples for basic functionality
jscheffl Jul 13, 2024
2b510a5
Update integration test to not-fail
jscheffl Jul 14, 2024
bd4b3eb
Update TODO
jscheffl Jul 16, 2024
9474fd1
Make BashOperator compatible with Internal API AIP-44
jscheffl Jul 17, 2024
d0184af
Add configurability to Remote Executor
jscheffl Jul 17, 2024
53ef266
Marking remote package as ready but pre-release
jscheffl Jul 18, 2024
7652083
Add support for basic authentication on internal API client
jscheffl Jul 19, 2024
15c3dc4
Add support for basic authentication on Remote Worker and API
jscheffl Jul 19, 2024
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml
@@ -94,6 +94,7 @@ body:
- presto
- qdrant
- redis
- remote
- salesforce
- samba
- segment
6 changes: 3 additions & 3 deletions INSTALL
@@ -277,9 +277,9 @@ cncf.kubernetes, cohere, common.compat, common.io, common.sql, databricks, datad
dingding, discord, docker, elasticsearch, exasol, fab, facebook, ftp, github, google, grpc,
hashicorp, http, imap, influxdb, jdbc, jenkins, microsoft.azure, microsoft.mssql, microsoft.psrp,
microsoft.winrm, mongo, mysql, neo4j, odbc, openai, openfaas, openlineage, opensearch, opsgenie,
oracle, pagerduty, papermill, pgvector, pinecone, postgres, presto, qdrant, redis, salesforce,
samba, segment, sendgrid, sftp, singularity, slack, smtp, snowflake, sqlite, ssh, tableau, tabular,
telegram, teradata, trino, vertica, weaviate, yandex, ydb, zendesk
oracle, pagerduty, papermill, pgvector, pinecone, postgres, presto, qdrant, redis, remote,
salesforce, samba, segment, sendgrid, sftp, singularity, slack, smtp, snowflake, sqlite, ssh,
tableau, tabular, telegram, teradata, trino, vertica, weaviate, yandex, ydb, zendesk

# END PROVIDER EXTRAS HERE

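With the "remote" entry added to the provider extras above, the provider should presumably be installable like any other extra, e.g. pip install "apache-airflow[remote]"; this is an inference from the extras list, not something stated elsewhere in the PR.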
4 changes: 2 additions & 2 deletions airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -39,7 +39,7 @@


@functools.lru_cache
def _initialize_map() -> dict[str, Callable]:
def initialize_method_map() -> dict[str, Callable]:
from airflow.cli.commands.task_command import _get_ti_db_access
from airflow.dag_processing.manager import DagFileProcessorManager
from airflow.dag_processing.processor import DagFileProcessor
@@ -148,7 +148,7 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
if json_rpc != "2.0":
return log_and_build_error_response(message="Expected jsonrpc 2.0 request.", status=400)

methods_map = _initialize_map()
methods_map = initialize_method_map()
method_name = body.get("method")
if method_name not in methods_map:
return log_and_build_error_response(message=f"Unrecognized method: {method_name}.", status=400)
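A minimal sketch (not part of the diff) of why the rename matters: exposing the map as a public function lets other components, such as the new remote provider's API, reuse the same method registry that internal_airflow_api() resolves JSON-RPC calls against.

from airflow.api_internal.endpoints.rpc_api_endpoint import initialize_method_map

# Build (and cache, via functools.lru_cache) the mapping of RPC method names to callables.
methods_map = initialize_method_map()
print(f"{len(methods_map)} internal API methods registered")

# internal_airflow_api() resolves incoming JSON-RPC 2.0 bodies of the form
# {"jsonrpc": "2.0", "method": "<method name>", "params": "<serialized params>"}
# against this map and rejects unrecognized method names with a 400 response.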
45 changes: 37 additions & 8 deletions airflow/api_internal/internal_api_call.py
Comment from the PR author:
Note: This change is needed to allow re-routing Internal API calls to another endpoint. The assumption is that the Remote Worker API should use HTTPS (not HTTP) and needs to be exposed internet-facing (e.g. behind an application gateway, with different protection than the internal API).

@@ -21,17 +21,21 @@
import json
import logging
from functools import wraps
from typing import Callable, TypeVar
from typing import TYPE_CHECKING, Callable, TypeVar

import requests
import tenacity
from requests.auth import HTTPBasicAuth
from urllib3.exceptions import NewConnectionError

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.settings import _ENABLE_AIP_44
from airflow.typing_compat import ParamSpec

if TYPE_CHECKING:
from requests.auth import AuthBase

PS = ParamSpec("PS")
RT = TypeVar("RT")

@@ -44,6 +44,7 @@ class InternalApiConfig:
_initialized = False
_use_internal_api = False
_internal_api_endpoint = ""
_internal_api_auth: AuthBase | None = None

@staticmethod
def force_database_direct_access():
@@ -56,6 +61,19 @@ def force_database_direct_access():
InternalApiConfig._initialized = True
InternalApiConfig._use_internal_api = False

@staticmethod
def force_api_access(api_endpoint: str, auth: AuthBase):
"""
Force using Internal API with provided endpoint.

All methods decorated with internal_api_call will always be executed remote/via API.
This mode is needed for remote setups/remote executor.
"""
InternalApiConfig._initialized = True
InternalApiConfig._use_internal_api = True
InternalApiConfig._internal_api_endpoint = api_endpoint
InternalApiConfig._internal_api_auth = auth

@staticmethod
def get_use_internal_api():
if not InternalApiConfig._initialized:
@@ -68,21 +86,31 @@ def get_internal_api_endpoint():
InternalApiConfig._init_values()
return InternalApiConfig._internal_api_endpoint

@staticmethod
def get_auth() -> AuthBase | None:
return InternalApiConfig._internal_api_auth

@staticmethod
def _init_values():
use_internal_api = conf.getboolean("core", "database_access_isolation", fallback=False)
if use_internal_api and not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it.")
internal_api_endpoint = ""
if use_internal_api:
internal_api_url = conf.get("core", "internal_api_url")
internal_api_endpoint = internal_api_url + "/internal_api/v1/rpcapi"
if not internal_api_endpoint.startswith("http://"):
raise AirflowConfigException("[core]internal_api_url must start with http://")
internal_api_endpoint = conf.get("core", "internal_api_url")
if internal_api_endpoint.find("/", 8) == -1:
internal_api_endpoint = internal_api_endpoint + "/internal_api/v1/rpcapi"
if not internal_api_endpoint.startswith("http://") and not internal_api_endpoint.startswith(
"https://"
):
raise AirflowConfigException("[core]internal_api_url must start with http:// or https://")
InternalApiConfig._internal_api_endpoint = internal_api_endpoint
internal_api_user = conf.get("core", "internal_api_user")
internal_api_password = conf.get("core", "internal_api_password")
if internal_api_user and internal_api_password:
InternalApiConfig._internal_api_auth = HTTPBasicAuth(internal_api_user, internal_api_password)

InternalApiConfig._initialized = True
InternalApiConfig._use_internal_api = use_internal_api
InternalApiConfig._internal_api_endpoint = internal_api_endpoint


def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]:
@@ -112,7 +140,8 @@ def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]:
def make_jsonrpc_request(method_name: str, params_json: str) -> bytes:
data = {"jsonrpc": "2.0", "method": method_name, "params": params_json}
internal_api_endpoint = InternalApiConfig.get_internal_api_endpoint()
response = requests.post(url=internal_api_endpoint, data=json.dumps(data), headers=headers)
auth = InternalApiConfig.get_auth()
response = requests.post(url=internal_api_endpoint, data=json.dumps(data), headers=headers, auth=auth)
if response.status_code != 200:
raise AirflowException(
f"Got {response.status_code}:{response.reason} when sending "
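A minimal sketch (not part of the diff) of how a remote worker process might use the new hooks; the endpoint URL and credentials are illustrative placeholders only.

from requests.auth import HTTPBasicAuth

from airflow.api_internal.internal_api_call import InternalApiConfig

# Force every @internal_api_call function onto the API path, with basic auth.
# URL and credentials below are placeholders, not values defined by this PR.
InternalApiConfig.force_api_access(
    api_endpoint="https://airflow.example.com/internal_api/v1/rpcapi",
    auth=HTTPBasicAuth("remote-worker", "not-a-real-password"),
)

# From here on, make_jsonrpc_request() POSTs JSON-RPC 2.0 payloads to the
# configured endpoint and passes InternalApiConfig.get_auth() to requests.post().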
126 changes: 126 additions & 0 deletions airflow/example_dags/integration_test.py
@@ -0,0 +1,126 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
In this DAG all critical functions as integration test are contained.

The DAG should work in all standard setups without error.
"""

from __future__ import annotations

from datetime import datetime

from airflow.decorators import task, task_group
from airflow.exceptions import AirflowNotFoundException
from airflow.hooks.base import BaseHook
from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.models.variable import Variable
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

with DAG(
dag_id="integration_test",
dag_display_name="Integration Test",
description=__doc__.partition(".")[0],
doc_md=__doc__,
schedule=None,
start_date=datetime(2024, 7, 1),
tags=["example", "params", "integration test"],
params={
"mapping_count": Param(
4,
type="integer",
title="Mapping Count",
description="Amount of tasks that should be mapped",
),
},
) as dag:

@task
def my_setup():
print("Assume this is a setup task")

@task
def mapping_from_params(**context) -> list[int]:
mapping_count: int = context["params"]["mapping_count"]
return list(range(1, mapping_count + 1))

@task
def add_one(x: int):
return x + 1

@task
def sum_it(values):
total = sum(values)
print(f"Total was {total}")

@task_group(prefix_group_id=False)
def mapping_task_group():
added_values = add_one.expand(x=mapping_from_params())
sum_it(added_values)

@task.branch
def branching():
return ["bash", "virtualenv", "variable", "connection", "classic_bash", "classic_python"]

@task.bash
def bash():
return "echo hello world"

@task.virtualenv(requirements="numpy")
def virtualenv():
import numpy

print(f"Welcome to virtualenv with numpy version {numpy.__version__}.")

@task
def variable():
Variable.set("integration_test_key", "value")
assert Variable.get("integration_test_key") == "value" # noqa: S101
Variable.delete("integration_test_key")

@task
def connection():
try:
conn = BaseHook.get_connection("integration_test")
print(f"Got connection {conn}")
except AirflowNotFoundException:
print("Connection not found... but also OK.")

@task_group(prefix_group_id=False)
def standard_tasks_group():
classic_bash = BashOperator(
task_id="classic_bash", bash_command="echo Parameter is {{ params.mapping_count }}"
)

empty = EmptyOperator(task_id="not_executed")

def python_call():
print("Hello world")

classic_py = PythonOperator(task_id="classic_python", python_callable=python_call)

branching() >> [bash(), virtualenv(), variable(), connection(), classic_bash, classic_py, empty]

@task
def my_teardown():
print("Assume this is a teardown task")

my_setup().as_setup() >> [mapping_task_group(), standard_tasks_group()] >> my_teardown().as_teardown()
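A possible way to exercise the DAG above locally, assuming the standard DAG.test() debugging entry point and its run_conf argument; this is an illustration, not part of the PR:

# Appended only for illustration: run the DAG in-process with a smaller mapping count.
if __name__ == "__main__":
    dag.test(run_conf={"mapping_count": 2})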
2 changes: 2 additions & 0 deletions airflow/executors/executor_constants.py
@@ -34,6 +34,7 @@ class ConnectorSource(Enum):
CELERY_EXECUTOR = "CeleryExecutor"
CELERY_KUBERNETES_EXECUTOR = "CeleryKubernetesExecutor"
KUBERNETES_EXECUTOR = "KubernetesExecutor"
REMOTE_EXECUTOR = "RemoteExecutor"
DEBUG_EXECUTOR = "DebugExecutor"
MOCK_EXECUTOR = "MockExecutor"
CORE_EXECUTOR_NAMES = {
@@ -43,6 +44,7 @@
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
KUBERNETES_EXECUTOR,
REMOTE_EXECUTOR,
DEBUG_EXECUTOR,
MOCK_EXECUTOR,
}
5 changes: 5 additions & 0 deletions airflow/executors/executor_loader.py
@@ -34,6 +34,7 @@
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
LOCAL_KUBERNETES_EXECUTOR,
REMOTE_EXECUTOR,
SEQUENTIAL_EXECUTOR,
ConnectorSource,
)
@@ -70,6 +71,7 @@ class ExecutorLoader:
"executors.celery_kubernetes_executor.CeleryKubernetesExecutor",
KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
"executors.kubernetes_executor.KubernetesExecutor",
REMOTE_EXECUTOR: "airflow.providers.remote.executors.RemoteExecutor",
DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor",
}

@@ -334,6 +336,9 @@ def validate_database_executor_compatibility(cls, executor: type[BaseExecutor])
if InternalApiConfig.get_use_internal_api():
return

if executor.__name__ == REMOTE_EXECUTOR:
return

from airflow.settings import engine

# SQLite only works with single threaded executors
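A minimal sketch (not part of the diff) of how the new core alias would be resolved, assuming the provider package is installed and that ExecutorLoader.load_executor() behaves here as it does for the other core aliases:

from airflow.executors.executor_loader import ExecutorLoader

# Resolve the new core alias; with the mapping above this should import
# airflow.providers.remote.executors.RemoteExecutor (assumption: the provider
# package is installed and exposes that class at this path).
executor = ExecutorLoader.load_executor("RemoteExecutor")
print(type(executor).__name__)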
38 changes: 38 additions & 0 deletions airflow/providers/remote/CHANGELOG.rst
@@ -0,0 +1,38 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.


.. NOTE TO CONTRIBUTORS:
Please, only add notes to the Changelog just below the "Changelog" header when there are some breaking changes
and you want to add an explanation to the users on how they are supposed to deal with them.
The changelog is updated and maintained semi-automatically by release manager.

``apache-airflow-providers-remote-executor``


Changelog
---------

0.1.0
.....

|experimental|

Initial version of the provider.

.. note::
This provider is currently experimental