Apache Kafka operators ProduceToTopicOperator and ConsumeFromTopicOperator unusable since Airflow >= 2.10.0 #42502

Closed
1 of 2 tasks
mxmrlt opened this issue Sep 26, 2024 · 5 comments · Fixed by #42555

mxmrlt commented Sep 26, 2024

Apache Airflow Provider(s)

apache-kafka

Versions of Apache Airflow Providers

apache-airflow-providers-apache-kafka==1.6.0

Apache Airflow version

2.10.2

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Docker-Compose

Deployment details

No response

What happened

Since Apache Airflow 2.10.0 and the introduction of callable support for template_fields (#37028), these operators no longer work: the task fails at execution.

The callable field producer_function of ProduceToTopicOperator is listed in template_fields, so Airflow calls it during template rendering with the keyword arguments context and jinja_env, which makes the DAG run fail.

template_fields = (
    "topic",
    "producer_function",
    "producer_function_args",
    "producer_function_kwargs",
    "kafka_config_id",
)

Here is the execution log:

[2024-09-25, 18:13:51 CEST] {abstractoperator.py:778} ERROR - Exception rendering Jinja template for task 'produce_treats', field 'producer_function'. Template: <function prod_function at 0x7ff0cd915d00>
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/abstractoperator.py", line 768, in _do_render_template_fields
    rendered_content = value(context=context, jinja_env=jinja_env)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: prod_function() got an unexpected keyword argument 'context'
[2024-09-25, 18:13:51 CEST] {taskinstance.py:3310} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 273, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3114, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context, jinja_env=jinja_env)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3533, in render_templates
    original_task.render_template_fields(context, jinja_env)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1419, in render_template_fields
    self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/abstractoperator.py", line 768, in _do_render_template_fields
    rendered_content = value(context=context, jinja_env=jinja_env)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: prod_function() got an unexpected keyword argument 'context'
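
The failure mode in the traceback can be reproduced without Airflow. The sketch below is a simplified stand-in for the rendering step, not Airflow's actual code: `render_field` and `try_render` are hypothetical helpers, and `prod_function` is a placeholder with an assumed signature. Only the call shape `value(context=context, jinja_env=jinja_env)` is taken from the log.

```python
def render_field(value, context, jinja_env):
    """Hypothetical stand-in for the rendering step shown in the traceback."""
    if callable(value):
        # Same call shape as the line quoted in the log:
        # rendered_content = value(context=context, jinja_env=jinja_env)
        return value(context=context, jinja_env=jinja_env)
    return value


def prod_function(num_treats, pet_name):
    """Placeholder producer callable; the signature is an assumption."""
    return [(pet_name, num_treats)]


def try_render(value):
    """Render a field and return either the result or the error text."""
    try:
        return render_field(value, context={}, jinja_env=None)
    except TypeError as exc:
        return f"TypeError: {exc}"


result = try_render(prod_function)
print(result)
```

Any callable listed in template_fields that does not accept `context` and `jinja_env` keyword arguments would fail in the same way, while plain string fields pass through unchanged.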

The same happens with the apply_function field in ConsumeFromTopicOperator:

template_fields = (
    "topics",
    "apply_function",
    "apply_function_args",
    "apply_function_kwargs",
    "kafka_config_id",
)
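
One possible stopgap, sketched here under stated assumptions, is to subclass the operator and drop the callable field from template_fields so it is never rendered. The class `ProduceToTopicOperatorStandIn` below is a stand-in that only mirrors the tuple quoted in this issue, and `drop_fields` is a hypothetical helper. Whether the eventual fix takes this approach is not stated in this thread.

```python
class ProduceToTopicOperatorStandIn:
    """Stand-in mirroring the template_fields quoted above; not the real operator."""

    template_fields = (
        "topic",
        "producer_function",
        "producer_function_args",
        "producer_function_kwargs",
        "kafka_config_id",
    )


def drop_fields(fields, names):
    """Return the template fields without the given names, keeping order."""
    return tuple(field for field in fields if field not in names)


class PatchedProducer(ProduceToTopicOperatorStandIn):
    # The callable is excluded, so the rendering step would skip it;
    # the args and kwargs fields remain templated.
    template_fields = drop_fields(
        ProduceToTopicOperatorStandIn.template_fields, {"producer_function"}
    )


print(PatchedProducer.template_fields)
```

The same idea would apply to apply_function in ConsumeFromTopicOperator, assuming the subclass is used in the DAG in place of the provider class.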

What you think should happen instead

No response

How to reproduce

Run the example DAG available here in an Apache Airflow instance: https://airflow.apache.org/docs/apache-airflow-providers-apache-kafka/stable/_modules/tests/system/providers/apache/kafka/example_dag_hello_kafka.html

produce_treats = ProduceToTopicOperator(
    task_id="produce_treats",
    kafka_config_id="kafka_default",
    topic=KAFKA_TOPIC,
    producer_function=prod_function,
    producer_function_args=["{{ ti.xcom_pull(task_ids='get_number_of_treats')}}"],
    producer_function_kwargs={
        "pet_name": "{{ ti.xcom_pull(task_ids='get_your_pet_name')}}"
    },
    poll_timeout=10,
)

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

mxmrlt added the area:providers, kind:bug, and needs-triage labels Sep 26, 2024

boring-cyborg bot commented Sep 26, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

gopidesupavan (Member) commented

Thanks for raising this. I was able to reproduce it and will work out a fix.

gopidesupavan (Member) commented

@shahar1 thanks, I have created the PR; please check it when you get a chance. I'm not sure why the build didn't proceed. I got this error in the CI build:

TARGET_BRANCH: main
Unable to find image 'bash:latest' locally
docker: Error response from daemon: Head "https://registry-1.docker.io/v2/library/bash/manifests/latest": unauthorized: incorrect username or password.
See 'docker run --help'.
Error: Process completed with exit code 125

Any idea what this error is, or would you be able to re-trigger the build, please?

shahar1 (Contributor) commented Sep 27, 2024

Seems like a Docker Hub issue:
https://www.dockerstatus.com/

gopidesupavan (Member) commented

Yes, correct, I just saw it on the Docker status page. There are some active incidents going on... :)
