-
Notifications
You must be signed in to change notification settings - Fork 14.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PoC of AIP-69 - A first task can be called remote #40224
Closed
Closed
Changes from all commits
Commits
Show all changes
37 commits
Select commit
Hold shift + click to select a range
b5fb6b8
PoC of AIP-69 - A first task can be called remote
jscheffl 17ffaf9
Fix dependency to Airflow 2.10
jscheffl 6983471
Fix static checks
jscheffl 6be68fa
Re-add example_subdag_operator DAG but disabled in database isolation…
jscheffl 87b92c4
Sync RPC API endpoint from remote from AIP-44 changes
jscheffl f73b38e
Revert example_subdag_operator
jscheffl a9fed43
Add boilerplate test files
jscheffl 5798382
Fix dependency to Airflow 2.10
jscheffl 03e36a3
Implement some first worker concurrency
jscheffl 32d5008
Remove remote provider from compatability checks
jscheffl e5d82c1
Make pydantic a dependency to remote provider
jscheffl 1332c1c
Support for queues
jscheffl 624ed96
Notes on Windows
jscheffl 4ac9f46
Add a pytest for PR #40465
jscheffl 571b769
Started updating docs
jscheffl 5d87edf
Remove redundancy in internal API function list
jscheffl f81bcf9
Remove __init__.py from templatates
jscheffl 6df1702
Rework and add first parameters
jscheffl ad185ba
Static checks
jscheffl e37f6a0
Remove redundancy in internal API function list
jscheffl d3457a5
Adjust documentation fore new remote executor
jscheffl e871937
Update documentation for remote provider
jscheffl 83d1512
Fix pytest counting plugins
jscheffl b921fe0
Fix sdist generation
jscheffl 5b09389
Rework remote worker loop, extract functions
jscheffl 5099598
Transport logs for running tasks (Simple)
jscheffl bf8a604
Transport logs for running tasks (Simple)
jscheffl 042475b
Update TODO
jscheffl cb43d0e
Add missing pytest boilerplate
jscheffl e130fa1
Add an integration test DAG to examples for basic functionality
jscheffl 2b510a5
Update integration test to not-fail
jscheffl bd4b3eb
Update TODO
jscheffl 9474fd1
Make BashOperator compatible with Internal API AIP-44
jscheffl d0184af
Add configurability to Remote Executor
jscheffl 53ef266
Marking remote package as ready but pre-release
jscheffl 7652083
Add support for basic authentication on internal API client
jscheffl 15c3dc4
Add support for basic authentication on Remote Worker and API
jscheffl File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -94,6 +94,7 @@ body: | |
- presto | ||
- qdrant | ||
- redis | ||
- remote | ||
- salesforce | ||
- samba | ||
- segment | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
""" | ||
In this DAG all critical functions as integration test are contained. | ||
|
||
The DAG should work in all standard setups without error. | ||
""" | ||
|
||
from __future__ import annotations | ||
|
||
from datetime import datetime | ||
|
||
from airflow.decorators import task, task_group | ||
from airflow.exceptions import AirflowNotFoundException | ||
from airflow.hooks.base import BaseHook | ||
from airflow.models.dag import DAG | ||
from airflow.models.param import Param | ||
from airflow.models.variable import Variable | ||
from airflow.operators.bash import BashOperator | ||
from airflow.operators.empty import EmptyOperator | ||
from airflow.operators.python import PythonOperator | ||
|
||
with DAG( | ||
dag_id="integration_test", | ||
dag_display_name="Integration Test", | ||
description=__doc__.partition(".")[0], | ||
doc_md=__doc__, | ||
schedule=None, | ||
start_date=datetime(2024, 7, 1), | ||
tags=["example", "params", "integration test"], | ||
params={ | ||
"mapping_count": Param( | ||
4, | ||
type="integer", | ||
title="Mapping Count", | ||
description="Amount of tasks that should be mapped", | ||
), | ||
}, | ||
) as dag: | ||
|
||
@task | ||
def my_setup(): | ||
print("Assume this is a setup task") | ||
|
||
@task | ||
def mapping_from_params(**context) -> list[int]: | ||
mapping_count: int = context["params"]["mapping_count"] | ||
return list(range(1, mapping_count + 1)) | ||
|
||
@task | ||
def add_one(x: int): | ||
return x + 1 | ||
|
||
@task | ||
def sum_it(values): | ||
total = sum(values) | ||
print(f"Total was {total}") | ||
|
||
@task_group(prefix_group_id=False) | ||
def mapping_task_group(): | ||
added_values = add_one.expand(x=mapping_from_params()) | ||
sum_it(added_values) | ||
|
||
@task.branch | ||
def branching(): | ||
return ["bash", "virtualenv", "variable", "connection", "classic_bash", "classic_python"] | ||
|
||
@task.bash | ||
def bash(): | ||
return "echo hello world" | ||
|
||
@task.virtualenv(requirements="numpy") | ||
def virtualenv(): | ||
import numpy | ||
|
||
print(f"Welcome to virtualenv with numpy version {numpy.__version__}.") | ||
|
||
@task | ||
def variable(): | ||
Variable.set("integration_test_key", "value") | ||
assert Variable.get("integration_test_key") == "value" # noqa: S101 | ||
Variable.delete("integration_test_key") | ||
|
||
@task | ||
def connection(): | ||
try: | ||
conn = BaseHook.get_connection("integration_test") | ||
print(f"Got connection {conn}") | ||
except AirflowNotFoundException: | ||
print("Connection not found... but also OK.") | ||
|
||
@task_group(prefix_group_id=False) | ||
def standard_tasks_group(): | ||
classic_bash = BashOperator( | ||
task_id="classic_bash", bash_command="echo Parameter is {{ params.mapping_count }}" | ||
) | ||
|
||
empty = EmptyOperator(task_id="not_executed") | ||
|
||
def python_call(): | ||
print("Hello world") | ||
|
||
classic_py = PythonOperator(task_id="classic_python", python_callable=python_call) | ||
|
||
branching() >> [bash(), virtualenv(), variable(), connection(), classic_bash, classic_py, empty] | ||
|
||
@task | ||
def my_teardown(): | ||
print("Assume this is a teardown task") | ||
|
||
my_setup().as_setup() >> [mapping_task_group(), standard_tasks_group()] >> my_teardown().as_teardown() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
.. Licensed to the Apache Software Foundation (ASF) under one | ||
or more contributor license agreements. See the NOTICE file | ||
distributed with this work for additional information | ||
regarding copyright ownership. The ASF licenses this file | ||
to you under the Apache License, Version 2.0 (the | ||
"License"); you may not use this file except in compliance | ||
with the License. You may obtain a copy of the License at | ||
|
||
.. http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
.. Unless required by applicable law or agreed to in writing, | ||
software distributed under the License is distributed on an | ||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
KIND, either express or implied. See the License for the | ||
specific language governing permissions and limitations | ||
under the License. | ||
|
||
|
||
.. NOTE TO CONTRIBUTORS: | ||
Please, only add notes to the Changelog just below the "Changelog" header when there are some breaking changes | ||
and you want to add an explanation to the users on how they are supposed to deal with them. | ||
The changelog is updated and maintained semi-automatically by release manager. | ||
|
||
``apache-airflow-providers-remote-executor`` | ||
|
||
|
||
Changelog | ||
--------- | ||
|
||
0.1.0 | ||
..... | ||
|
||
|experimental| | ||
|
||
Initial version of the provider. | ||
|
||
.. note:: | ||
This provider is currently experimental |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: This change is needed allowing to re-route the Internal API calls to another endpoint (Assuming: Remote Worker API should be HTTPS (not HTTP) and needs to be exposed Internet-Facing. (App GW, Protection different than internal API)