Heartbeat timeout docs #46257

Draft: wants to merge 24 commits into main from heartbeat_timeout_docs

Commits (24)
bb8d47f
Emphasize task heartbeat timeout terminology in docs to match logs
karenbraganz Jan 29, 2025
7ab777a
Grammatical correction
karenbraganz Jan 29, 2025
8aa1f72
Grammatical correction
karenbraganz Jan 29, 2025
bdab0a7
Merge branch 'main' into heartbeat_timeout_docs
karenbraganz Jan 29, 2025
7f92de7
Merge branch 'apache:main' into heartbeat_timeout_docs
karenbraganz Feb 5, 2025
6b8e595
edit docs
karenbraganz Feb 5, 2025
5e0aa2d
redirect URL
karenbraganz Feb 5, 2025
b479fdc
Update docs/apache-airflow/core-concepts/tasks.rst
karenbraganz Feb 5, 2025
1d16f19
Update docs/apache-airflow/core-concepts/tasks.rst
karenbraganz Feb 5, 2025
5eb20ba
Update docs/apache-airflow/core-concepts/tasks.rst
karenbraganz Feb 5, 2025
270ccbf
Merge branch 'apache:main' into heartbeat_timeout_docs
karenbraganz Feb 15, 2025
1e554f8
Merge branch 'apache:main' into heartbeat_timeout_docs
karenbraganz Feb 22, 2025
bdcbeda
Edit docs
karenbraganz Feb 16, 2025
bc09224
Update config.yml with new config names
karenbraganz Feb 16, 2025
09d08fc
Update code to use heartbeat timeout terminology
karenbraganz Feb 24, 2025
6752b77
Update code to include heartbeat timeout terminology
karenbraganz Feb 24, 2025
4424e65
Fix incorrect config name in test_sync_orphaned_tasks
karenbraganz Feb 28, 2025
8887ba2
Merge branch 'main' into heartbeat_timeout_docs
karenbraganz Mar 4, 2025
58a2086
Change task_instance_heartbeat_timeout_threshold to task_instance_hea…
karenbraganz Mar 4, 2025
1409a84
Update supervisor.py
karenbraganz Mar 6, 2025
7339e4b
Merge branch 'main' into heartbeat_timeout_docs
karenbraganz Mar 6, 2025
954ab0e
Merge branch 'apache:main' into heartbeat_timeout_docs
karenbraganz Mar 6, 2025
7a8e846
Merge branch 'apache:main' into heartbeat_timeout_docs
karenbraganz Mar 6, 2025
e586bcb
Merge branch 'main' into heartbeat_timeout_docs
karenbraganz Mar 8, 2025
4 changes: 2 additions & 2 deletions airflow/callbacks/callback_requests.py
@@ -39,7 +39,7 @@ class BaseCallbackRequest(BaseModel):
bundle_name: str
bundle_version: str | None
msg: str | None = None
"""Additional Message that can be used for logging to determine failure/zombie"""
"""Additional Message that can be used for logging to determine failure/task heartbeat timeout"""

@classmethod
def from_json(cls, data: str | bytes | bytearray) -> Self:
@@ -54,7 +54,7 @@ class TaskCallbackRequest(BaseCallbackRequest):
Task callback status information.

A Class with information about the success/failure TI callback to be executed. Currently, only failure
callbacks (when tasks are externally killed) and Zombies are run via DagFileProcessorProcess.
callbacks when tasks are externally killed or experience heartbeat timeouts are run via DagFileProcessorProcess.
"""

ti: ti_datamodel.TaskInstance
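For orientation, the string stored in ``msg`` for a heartbeat timeout is just the stringified details dict built by the scheduler (see ``_generate_task_instance_heartbeat_timeout_message_details`` further down in this diff); a tiny illustration with placeholder values:

.. code-block:: python

    # Placeholder values; the real dict is built from the TaskInstance by
    # _generate_task_instance_heartbeat_timeout_message_details().
    details = {
        "DAG Id": "example_dag",
        "Task Id": "long_running_task",
        "Run Id": "manual__2025-03-06T00:00:00+00:00",
        "Hostname": "worker-0",
    }

    # str(details) is what ends up in BaseCallbackRequest.msg.
    print(str(details))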
12 changes: 12 additions & 0 deletions airflow/cli/commands/remote_commands/config_command.py
@@ -419,6 +419,18 @@ def message(self) -> str:
config=ConfigParameter("scheduler", "dag_dir_list_interval"),
renamed_to=ConfigParameter("dag_processor", "refresh_interval"),
),
ConfigChange(
config=ConfigParameter("scheduler", "local_task_job_heartbeat_sec"),
renamed_to=ConfigParameter("scheduler", "task_instance_heartbeat_sec"),
),
ConfigChange(
config=ConfigParameter("scheduler", "scheduler_zombie_task_threshold"),
renamed_to=ConfigParameter("scheduler", "task_instance_heartbeat_timeout"),
),
ConfigChange(
config=ConfigParameter("scheduler", "zombie_detection_interval"),
renamed_to=ConfigParameter("scheduler", "task_instance_heartbeat_timeout_detection_interval"),
),
# celery
ConfigChange(
config=ConfigParameter("celery", "stalled_task_timeout"),
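The three ``ConfigChange`` entries added above are plain old-name-to-new-name renames; a rough stand-alone illustration of the mapping (not the actual ``ConfigChange`` machinery):

.. code-block:: python

    # Hypothetical helper mirroring the [scheduler] renames registered above.
    RENAMED_SCHEDULER_OPTIONS = {
        "local_task_job_heartbeat_sec": "task_instance_heartbeat_sec",
        "scheduler_zombie_task_threshold": "task_instance_heartbeat_timeout",
        "zombie_detection_interval": "task_instance_heartbeat_timeout_detection_interval",
    }


    def new_option_name(old_name: str) -> str:
        """Return the renamed [scheduler] option, or the name unchanged if it was not renamed."""
        return RENAMED_SCHEDULER_OPTIONS.get(old_name, old_name)


    print(new_option_name("scheduler_zombie_task_threshold"))  # task_instance_heartbeat_timeout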
10 changes: 5 additions & 5 deletions airflow/config_templates/config.yml
@@ -2200,11 +2200,11 @@ scheduler:
type: integer
example: ~
default: "5"
local_task_job_heartbeat_sec:
task_instance_heartbeat_sec:
description: |
The frequency (in seconds) at which the LocalTaskJob should send heartbeat signals to the
scheduler to notify it's still alive. If this value is set to 0, the heartbeat interval will default
to the value of ``[scheduler] scheduler_zombie_task_threshold``.
to the value of ``[scheduler] task_instance_heartbeat_timeout``.
version_added: 2.7.0
type: integer
example: ~
@@ -2290,7 +2290,7 @@ scheduler:
type: string
example: ~
default: "{AIRFLOW_HOME}/logs/scheduler"
scheduler_zombie_task_threshold:
task_instance_heartbeat_timeout:
description: |
Local task jobs periodically heartbeat to the DB. If the job has
not heartbeat in this many seconds, the scheduler will mark the
@@ -2299,9 +2299,9 @@
type: integer
example: ~
default: "300"
zombie_detection_interval:
task_instance_heartbeat_timeout_detection_interval:
description: |
How often (in seconds) should the scheduler check for zombie tasks.
How often (in seconds) should the scheduler check for task instances whose heartbeats have timed out.
version_added: 2.3.0
type: float
example: ~
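Reading the renamed options from code is unchanged apart from the keys; a quick check, assuming an Airflow installation that already ships these options:

.. code-block:: python

    from airflow.configuration import conf

    # New [scheduler] option names introduced by this PR.
    heartbeat_interval = conf.getint("scheduler", "task_instance_heartbeat_sec")
    heartbeat_timeout = conf.getint("scheduler", "task_instance_heartbeat_timeout")
    detection_interval = conf.getfloat("scheduler", "task_instance_heartbeat_timeout_detection_interval")

    print(heartbeat_interval, heartbeat_timeout, detection_interval)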
94 changes: 57 additions & 37 deletions airflow/jobs/scheduler_job_runner.py
@@ -175,8 +175,10 @@ def __init__(
# configure -- they'll want num_runs
self.num_times_parse_dags = num_times_parse_dags
self._scheduler_idle_sleep_time = scheduler_idle_sleep_time
# How many seconds do we wait for tasks to heartbeat before mark them as zombies.
self._zombie_threshold_secs = conf.getint("scheduler", "scheduler_zombie_task_threshold")
# How many seconds do we wait for tasks to heartbeat before timeout.
self._task_instance_heartbeat_timeout_secs = conf.getint(
"scheduler", "task_instance_heartbeat_timeout"
)
self._dag_stale_not_seen_duration = conf.getint("scheduler", "dag_stale_not_seen_duration")
self._task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout")
self._enable_tracemalloc = conf.getboolean("scheduler", "enable_tracemalloc")
@@ -827,7 +829,7 @@ def process_executor_events(
# or the TI is queued by another job. Either ways we should not fail it.

# All of this could also happen if the state is "running",
# but that is handled by the zombie detection.
# but that is handled by the scheduler detecting task instances without heartbeats.

ti_queued = ti.try_number == buffer_key.try_number and ti.state in (
TaskInstanceState.SCHEDULED,
@@ -1016,8 +1018,8 @@ def _run_scheduler_loop(self) -> None:
)

timers.call_regular_interval(
conf.getfloat("scheduler", "zombie_detection_interval", fallback=10.0),
self._find_and_purge_zombies,
conf.getfloat("scheduler", "task_instance_heartbeat_timeout_detection_interval", fallback=10.0),
self._find_and_purge_task_instances_without_heartbeats,
)

timers.call_regular_interval(60.0, self._update_dag_run_state_for_paused_dags)
@@ -1950,26 +1952,30 @@ def check_trigger_timeouts(
if num_timed_out_tasks:
self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)

# [START find_and_purge_zombies]
def _find_and_purge_zombies(self) -> None:
# [START find_and_purge_task_instances_without_heartbeats]
def _find_and_purge_task_instances_without_heartbeats(self) -> None:
"""
Find and purge zombie task instances.
Find and purge task instances without heartbeats.

Zombie instances are tasks that failed to heartbeat for too long, or
have a no-longer-running LocalTaskJob.
Task instances that failed to heartbeat for too long, or
have a no-longer-running LocalTaskJob will be failed by the scheduler.

A TaskCallbackRequest is also created for the killed zombie to be
A TaskCallbackRequest is also created for the killed task instance to be
handled by the DAG processor, and the executor is informed to no longer
count the zombie as running when it calculates parallelism.
count the task instance as running when it calculates parallelism.
"""
with create_session() as session:
if zombies := self._find_zombies(session=session):
self._purge_zombies(zombies, session=session)
if task_instances_without_heartbeats := self._find_task_instances_without_heartbeats(
session=session
):
self._purge_task_instances_without_heartbeats(
task_instances_without_heartbeats, session=session
)

def _find_zombies(self, *, session: Session) -> list[TI]:
def _find_task_instances_without_heartbeats(self, *, session: Session) -> list[TI]:
self.log.debug("Finding 'running' jobs without a recent heartbeat")
limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
zombies = session.scalars(
limit_dttm = timezone.utcnow() - timedelta(seconds=self._task_instance_heartbeat_timeout_secs)
task_instances_without_heartbeats = session.scalars(
select(TI)
.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
.join(DM, TI.dag_id == DM.dag_id)
@@ -1979,63 +1985,77 @@ def _find_zombies(self, *, session: Session) -> list[TI]:
)
.where(TI.queued_by_job_id == self.job.id)
).all()
if zombies:
self.log.warning("Failing %s TIs without heartbeat after %s", len(zombies), limit_dttm)
return zombies
if task_instances_without_heartbeats:
self.log.warning(
"Failing %s TIs without heartbeat after %s",
len(task_instances_without_heartbeats),
limit_dttm,
)
return task_instances_without_heartbeats

def _purge_zombies(self, zombies: list[TI], *, session: Session) -> None:
for ti in zombies:
zombie_message_details = self._generate_zombie_message_details(ti)
def _purge_task_instances_without_heartbeats(
self, task_instances_without_heartbeats: list[TI], *, session: Session
) -> None:
for ti in task_instances_without_heartbeats:
task_instance_heartbeat_timeout_message_details = (
self._generate_task_instance_heartbeat_timeout_message_details(ti)
)
request = TaskCallbackRequest(
filepath=ti.dag_model.relative_fileloc,
bundle_name=ti.dag_version.bundle_name,
bundle_version=ti.dag_run.bundle_version,
ti=ti,
msg=str(zombie_message_details),
msg=str(task_instance_heartbeat_timeout_message_details),
)
session.add(
Log(
event="heartbeat timeout",
task_instance=ti.key,
extra=(
f"Task did not emit heartbeat within time limit ({self._zombie_threshold_secs} "
f"Task did not emit heartbeat within time limit ({self._task_instance_heartbeat_timeout_secs} "
"seconds) and will be terminated. "
"See https://airflow.apache.org/docs/apache-airflow/"
"stable/core-concepts/tasks.html#zombie-tasks"
"stable/core-concepts/tasks.html#task-instance-heartbeat-timeout"
),
)
)
self.log.error(
"Detected zombie job: %s "
"Detected a task instance without a heartbeat: %s "
"(See https://airflow.apache.org/docs/apache-airflow/"
"stable/core-concepts/tasks.html#zombie-tasks)",
"stable/core-concepts/tasks.html#task-instance-heartbeat-timeout)",
request,
)
self.job.executor.send_callback(request)
if (executor := self._try_to_load_executor(ti.executor)) is None:
self.log.warning("Cannot clean up zombie %r with non-existent executor %s", ti, ti.executor)
self.log.warning(
"Cannot clean up task instance without heartbeat %r with non-existent executor %s",
ti,
ti.executor,
)
continue
executor.change_state(ti.key, TaskInstanceState.FAILED, remove_running=True)
Stats.incr("zombies_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})
Stats.incr(
"task_instances_without_heartbeats_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id}
)

# [END find_and_purge_zombies]
# [END find_and_purge_task_instances_without_heartbeats]

@staticmethod
def _generate_zombie_message_details(ti: TI) -> dict[str, Any]:
zombie_message_details = {
def _generate_task_instance_heartbeat_timeout_message_details(ti: TI) -> dict[str, Any]:
task_instance_heartbeat_timeout_message_details = {
"DAG Id": ti.dag_id,
"Task Id": ti.task_id,
"Run Id": ti.run_id,
}

if ti.map_index != -1:
zombie_message_details["Map Index"] = ti.map_index
task_instance_heartbeat_timeout_message_details["Map Index"] = ti.map_index
if ti.hostname:
zombie_message_details["Hostname"] = ti.hostname
task_instance_heartbeat_timeout_message_details["Hostname"] = ti.hostname
if ti.external_executor_id:
zombie_message_details["External Executor Id"] = ti.external_executor_id
task_instance_heartbeat_timeout_message_details["External Executor Id"] = ti.external_executor_id

return zombie_message_details
return task_instance_heartbeat_timeout_message_details

@provide_session
def _update_asset_orphanage(self, session: Session = NEW_SESSION) -> None:
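Stripped of the ORM query, the detection rule in ``_find_task_instances_without_heartbeats`` is a simple timestamp cutoff; a self-contained sketch of that check (simplified, not the scheduler's actual code):

.. code-block:: python

    from datetime import datetime, timedelta, timezone

    TASK_INSTANCE_HEARTBEAT_TIMEOUT_SECS = 300  # default from config.yml above


    def heartbeat_timed_out(latest_heartbeat: datetime) -> bool:
        """Return True if a running TI's last heartbeat is older than the timeout window."""
        limit_dttm = datetime.now(timezone.utc) - timedelta(seconds=TASK_INSTANCE_HEARTBEAT_TIMEOUT_SECS)
        return latest_heartbeat < limit_dttm


    # A TI that last heartbeated ten minutes ago is past the five-minute default.
    print(heartbeat_timed_out(datetime.now(timezone.utc) - timedelta(minutes=10)))  # True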
2 changes: 1 addition & 1 deletion airflow/models/taskinstance.py
@@ -3105,7 +3105,7 @@ def fetch_handle_failure_context(

ti.clear_next_method_args()

# In extreme cases (zombie in case of dag with parse error) we might _not_ have a Task.
# In extreme cases (task instance heartbeat timeout in case of dag with parse error) we might _not_ have a Task.
if context is None and getattr(ti, "task", None):
context = ti.get_template_context(session)

@@ -157,7 +157,7 @@ Name Descripti
``ti_failures`` Overall task instances failures. Metric with dag_id and task_id tagging.
``ti_successes`` Overall task instances successes. Metric with dag_id and task_id tagging.
``previously_succeeded`` Number of previously succeeded task instances. Metric with dag_id and task_id tagging.
``zombies_killed`` Zombie tasks killed. Metric with dag_id and task_id tagging.
``task_instances_without_heartbeats_killed`` Task instances without heartbeats killed. Metric with dag_id and task_id tagging.
``scheduler_heartbeat`` Scheduler heartbeats
``dag_processor_heartbeat`` Standalone DAG processor heartbeats
``dag_processing.processes`` Relative number of currently running DAG parsing processes (ie this delta
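Dashboards or alerts built on the old ``zombies_killed`` counter need to track the new name; the emission side, as shown in the scheduler change above, stays a plain counter with dag/task tags (a no-op unless a StatsD or OpenTelemetry backend is configured):

.. code-block:: python

    from airflow.stats import Stats

    # Same call the scheduler now makes; dag_id/task_id values are placeholders.
    Stats.incr(
        "task_instances_without_heartbeats_killed",
        tags={"dag_id": "example_dag", "task_id": "long_running_task"},
    )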
26 changes: 13 additions & 13 deletions docs/apache-airflow/core-concepts/tasks.rst
@@ -165,35 +165,35 @@ If you want to control your task's state from within custom Task/Operator code,

These can be useful if your code has extra knowledge about its environment and wants to fail/skip faster - e.g., skipping when it knows there's no data available, or fast-failing when it detects its API key is invalid (as that will not be fixed by a retry).

.. _concepts:zombies:
.. _concepts:task-instance-heartbeat-timeout:

Zombie Tasks
------------
Task Instance Heartbeat Timeout
-------------------------------

No system runs perfectly, and task instances are expected to die once in a while.

*Zombie tasks* are ``TaskInstances`` stuck in a ``running`` state despite their associated jobs being inactive
(e.g. their process did not send a recent heartbeat as it got killed, or the machine died). Airflow will find these
periodically, clean them up, and either fail or retry the task depending on its settings. Tasks can become zombies for
``TaskInstances`` may get stuck in a ``running`` state despite their associated jobs being inactive
(for example if the ``TaskInstance``'s worker ran out of memory). Such tasks were formerly known as zombie tasks. Airflow will find these
periodically, clean them up, and mark the ``TaskInstance`` as failed or retry it if it has available retries. The ``TaskInstance``'s heartbeat can time out for
many reasons, including:

* The Airflow worker ran out of memory and was OOMKilled.
* The Airflow worker failed its liveness probe, so the system (for example, Kubernetes) restarted the worker.
* The system (for example, Kubernetes) scaled down and moved an Airflow worker from one node to another.


Reproducing zombie tasks locally
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Reproducing task instance heartbeat timeouts locally
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you'd like to reproduce zombie tasks for development/testing processes, follow the steps below:
If you'd like to reproduce task instance heartbeat timeouts for development/testing processes, follow the steps below:

1. Set the below environment variables for your local Airflow setup (alternatively you could tweak the corresponding config values in airflow.cfg)

.. code-block:: bash

export AIRFLOW__SCHEDULER__LOCAL_TASK_JOB_HEARTBEAT_SEC=600
export AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD=2
export AIRFLOW__SCHEDULER__ZOMBIE_DETECTION_INTERVAL=5
export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_SEC=600
export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_TIMEOUT=2
export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_TIMEOUT_DETECTION_INTERVAL=5


2. Have a DAG with a task that takes about 10 minutes to complete (i.e. a long-running task). For example, you could use the below DAG:
@@ -216,7 +216,7 @@ If you'd like to reproduce zombie tasks for development/testing processes, follo
sleep_dag()


Run the above DAG and wait for a while. You should see the task instance becoming a zombie task and then being killed by the scheduler.
Run the above DAG and wait for a while. The ``TaskInstance`` will be marked failed after <task_instance_heartbeat_timeout> seconds.
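The long-running DAG referenced in step 2 is collapsed out of this diff (only the trailing ``sleep_dag()`` call is visible); a minimal TaskFlow sketch that would serve the same purpose, not necessarily the exact DAG in the docs:

.. code-block:: python

    import time
    from datetime import datetime

    from airflow.decorators import dag, task


    @dag(schedule=None, start_date=datetime(2025, 1, 1), catchup=False)
    def sleep_dag():
        @task
        def sleep_for_a_while():
            # Long enough to outlive the 2-second heartbeat timeout set in step 1.
            time.sleep(600)

        sleep_for_a_while()


    sleep_dag()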



2 changes: 1 addition & 1 deletion docs/apache-airflow/faq.rst
@@ -453,7 +453,7 @@ Why did my task fail with no logs in the UI?
Logs are :ref:`typically served when a task reaches a terminal state <serving-worker-trigger-logs>`. Sometimes, a task's normal lifecycle is disrupted, and the task's
worker is unable to write the task's logs. This typically happens for one of two reasons:

1. :ref:`Zombie tasks <concepts:zombies>`.
1. :ref:`Task Instance Heartbeat Timeout <concepts:task-instance-heartbeat-timeout>`.
2. Tasks failed after getting stuck in queued (Airflow 2.6.0+). Tasks that are in queued for longer than :ref:`scheduler.task_queued_timeout <config:scheduler__task_queued_timeout>` will be marked as failed, and there will be no task logs in the Airflow UI.

Setting retries for each task drastically reduces the chance that either of these problems impact a workflow.
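If you need to confirm after the fact that a silent failure was a heartbeat timeout, the audit-log row the scheduler now writes (event name ``heartbeat timeout``, per the scheduler change above) can be queried directly; a rough sketch, assuming database access from a Python shell on an Airflow host:

.. code-block:: python

    from airflow.models.log import Log
    from airflow.utils.session import create_session

    # List recent "heartbeat timeout" audit-log entries recorded by the scheduler.
    with create_session() as session:
        for entry in session.query(Log).filter(Log.event == "heartbeat timeout").limit(10):
            print(entry.dttm, entry.dag_id, entry.task_id, entry.extra)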
3 changes: 2 additions & 1 deletion docs/apache-airflow/static/redirects.js
@@ -19,7 +19,8 @@

document.addEventListener("DOMContentLoaded", function () {
const redirects = {
"zombie-undead-tasks": "zombie-tasks",
"zombie-undead-tasks": "task-instance-heartbeat-timeout",
"zombie-tasks": "task-instance-heartbeat-timeout",
};
const fragment = window.location.hash.substring(1);
if (redirects[fragment]) {
4 changes: 2 additions & 2 deletions docs/apache-airflow/troubleshooting.rst
@@ -32,7 +32,7 @@ Below are some example scenarios that could cause a task's state to change by a

- If a task's DAG failed to parse on the worker, the scheduler may mark the task as failed. If confirmed, consider increasing :ref:`core.dagbag_import_timeout <config:core__dagbag_import_timeout>` and :ref:`dag_processor.dag_file_processor_timeout <config:dag_processor__dag_file_processor_timeout>`.
- The scheduler will mark a task as failed if the task has been queued for longer than :ref:`scheduler.task_queued_timeout <config:scheduler__task_queued_timeout>`.
- If a task becomes a :ref:`zombie <concepts:zombies>`, it will be marked failed by the scheduler.
- If a :ref:`task instance's heartbeat times out <concepts:task-instance-heartbeat-timeout>`, it will be marked failed by the scheduler.
- A user marked the task as successful or failed in the Airflow UI.
- An external script or process used the :doc:`Airflow REST API <stable-rest-api-ref>` to change the state of a task.

@@ -45,4 +45,4 @@ Here are some examples that could cause such an event:

- A DAG run timeout, specified by ``dagrun_timeout`` in the DAG's definition.
- An Airflow worker running out of memory
- Usually, Airflow workers that run out of memory receive a SIGKILL and are marked as a zombie and failed by the scheduler. However, in some scenarios, Airflow kills the task before that happens.
- Usually, Airflow workers that run out of memory receive a SIGKILL, and the scheduler will fail the corresponding task instance for not having a heartbeat. However, in some scenarios, Airflow kills the task before that happens.