Heartbeat timeout docs #46257

Draft · wants to merge 26 commits into base: main

Commits (changes shown from 17 of 26 commits)
bb8d47f
Emphasize task heartbeat timeout terminology in docs to match logs
karenbraganz Jan 29, 2025
7ab777a
Grammatical correction
karenbraganz Jan 29, 2025
8aa1f72
Grammatical correction
karenbraganz Jan 29, 2025
bdab0a7
Merge branch 'main' into heartbeat_timeout_docs
karenbraganz Jan 29, 2025
7f92de7
Merge branch 'apache:main' into heartbeat_timeout_docs
karenbraganz Feb 5, 2025
6b8e595
edit docs
karenbraganz Feb 5, 2025
5e0aa2d
redirect URL
karenbraganz Feb 5, 2025
b479fdc
Update docs/apache-airflow/core-concepts/tasks.rst
karenbraganz Feb 5, 2025
1d16f19
Update docs/apache-airflow/core-concepts/tasks.rst
karenbraganz Feb 5, 2025
5eb20ba
Update docs/apache-airflow/core-concepts/tasks.rst
karenbraganz Feb 5, 2025
270ccbf
Merge branch 'apache:main' into heartbeat_timeout_docs
karenbraganz Feb 15, 2025
1e554f8
Merge branch 'apache:main' into heartbeat_timeout_docs
karenbraganz Feb 22, 2025
bdcbeda
Edit docs
karenbraganz Feb 16, 2025
bc09224
Update config.yml with new config names
karenbraganz Feb 16, 2025
09d08fc
Update code to use heartbeat timeout terminology
karenbraganz Feb 24, 2025
6752b77
Update code to include heartbeat timeout terminology
karenbraganz Feb 24, 2025
4424e65
Fix incorrect config name in test_sync_orphaned_tasks
karenbraganz Feb 28, 2025
8887ba2
Merge branch 'main' into heartbeat_timeout_docs
karenbraganz Mar 4, 2025
58a2086
Change task_instance_heartbeat_timeout_threshold to task_instance_hea…
karenbraganz Mar 4, 2025
1409a84
Update supervisor.py
karenbraganz Mar 6, 2025
7339e4b
Merge branch 'main' into heartbeat_timeout_docs
karenbraganz Mar 6, 2025
954ab0e
Merge branch 'apache:main' into heartbeat_timeout_docs
karenbraganz Mar 6, 2025
7a8e846
Merge branch 'apache:main' into heartbeat_timeout_docs
karenbraganz Mar 6, 2025
e586bcb
Merge branch 'main' into heartbeat_timeout_docs
karenbraganz Mar 8, 2025
cbcd0f0
Check for Edge executor test config version compatibility
karenbraganz Mar 11, 2025
db669b0
Merge branch 'main' into heartbeat_timeout_docs
karenbraganz Mar 11, 2025
4 changes: 2 additions & 2 deletions airflow/callbacks/callback_requests.py
@@ -39,7 +39,7 @@ class BaseCallbackRequest(BaseModel):
bundle_name: str
bundle_version: str | None
msg: str | None = None
"""Additional Message that can be used for logging to determine failure/zombie"""
"""Additional Message that can be used for logging to determine failure/task heartbeat timeout"""

@classmethod
def from_json(cls, data: str | bytes | bytearray) -> Self:
@@ -54,7 +54,7 @@ class TaskCallbackRequest(BaseCallbackRequest):
Task callback status information.
A Class with information about the success/failure TI callback to be executed. Currently, only failure
callbacks (when tasks are externally killed) and Zombies are run via DagFileProcessorProcess.
callbacks (when tasks are externally killed or experience heartbeat timeouts) are run via DagFileProcessorProcess.
"""

ti: ti_datamodel.TaskInstance
12 changes: 12 additions & 0 deletions airflow/cli/commands/remote_commands/config_command.py
@@ -383,6 +383,18 @@ def message(self) -> str:
config=ConfigParameter("scheduler", "dag_dir_list_interval"),
renamed_to=ConfigParameter("dag_processor", "refresh_interval"),
),
ConfigChange(
config=ConfigParameter("scheduler", "local_task_job_heartbeat_sec"),
renamed_to=ConfigParameter("scheduler", "task_instance_heartbeat_sec"),
),
ConfigChange(
config=ConfigParameter("scheduler", "scheduler_zombie_task_threshold"),
renamed_to=ConfigParameter("scheduler", "task_instance_heartbeat_timeout_threshold"),
),
ConfigChange(
config=ConfigParameter("scheduler", "zombie_detection_interval"),
renamed_to=ConfigParameter("scheduler", "task_instance_heartbeat_timeout_detection_interval"),
),
# celery
ConfigChange(
config=ConfigParameter("celery", "stalled_task_timeout"),
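For context (illustrative only, not part of this PR's diff): after these renames, the options are read through Airflow's standard config API, mirroring the ``conf.getint``/``conf.getfloat`` calls that appear later in this PR. The variable names below are assumptions made for the sketch.

.. code-block:: python

    # Illustrative sketch -- not part of this diff. Reading the renamed scheduler
    # options through Airflow's config API.
    from airflow.configuration import conf

    # Frequency (in seconds) at which a running task instance heartbeats.
    heartbeat_sec = conf.getint("scheduler", "task_instance_heartbeat_sec")

    # How long (in seconds) a task instance may go without a heartbeat before the
    # scheduler fails it.
    timeout_threshold = conf.getint("scheduler", "task_instance_heartbeat_timeout_threshold")

    # How often (in seconds) the scheduler scans for timed-out task instances.
    detection_interval = conf.getfloat(
        "scheduler", "task_instance_heartbeat_timeout_detection_interval"
    )

    print(heartbeat_sec, timeout_threshold, detection_interval)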
10 changes: 5 additions & 5 deletions airflow/config_templates/config.yml
@@ -2217,11 +2217,11 @@ scheduler:
type: integer
example: ~
default: "5"
local_task_job_heartbeat_sec:
task_instance_heartbeat_sec:
description: |
The frequency (in seconds) at which the LocalTaskJob should send heartbeat signals to the
scheduler to notify it's still alive. If this value is set to 0, the heartbeat interval will default
to the value of ``[scheduler] scheduler_zombie_task_threshold``.
to the value of ``[scheduler] task_instance_heartbeat_timeout_threshold``.
version_added: 2.7.0
type: integer
example: ~
@@ -2307,7 +2307,7 @@ scheduler:
type: string
example: ~
default: "{AIRFLOW_HOME}/logs/scheduler"
scheduler_zombie_task_threshold:
task_instance_heartbeat_timeout_threshold:
description: |
Local task jobs periodically heartbeat to the DB. If the job has
not heartbeat in this many seconds, the scheduler will mark the
@@ -2316,9 +2316,9 @@
type: integer
example: ~
default: "300"
zombie_detection_interval:
task_instance_heartbeat_timeout_detection_interval:
description: |
How often (in seconds) should the scheduler check for zombie tasks.
How often (in seconds) should the scheduler check for task instances whose heartbeats have timed out.
version_added: 2.3.0
type: float
example: ~
12 changes: 6 additions & 6 deletions airflow/jobs/local_task_job_runner.py
@@ -165,19 +165,19 @@ def sigusr2_debug_handler(signum, frame):
return_code = None
try:
self.task_runner.start()
local_task_job_heartbeat_sec = conf.getint("scheduler", "local_task_job_heartbeat_sec")
if local_task_job_heartbeat_sec < 1:
heartbeat_time_limit = conf.getint("scheduler", "scheduler_zombie_task_threshold")
task_instance_heartbeat_sec = conf.getint("scheduler", "task_instance_heartbeat_sec")
if task_instance_heartbeat_sec < 1:
heartbeat_time_limit = conf.getint("scheduler", "task_instance_heartbeat_timeout_threshold")
else:
heartbeat_time_limit = local_task_job_heartbeat_sec
heartbeat_time_limit = task_instance_heartbeat_sec

# LocalTaskJob should not run callbacks, which are handled by TaskInstance._run_raw_task
# 1, LocalTaskJob does not parse DAG, thus cannot run callbacks
# 2, The run_as_user of LocalTaskJob is likely not same as the TaskInstance._run_raw_task.
# When run_as_user is specified, the process owner of the LocalTaskJob must be sudoable.
# It is not secure to run callbacks with sudoable users.

# If _run_raw_task receives SIGKILL, scheduler will mark it as zombie and invoke callbacks
# If _run_raw_task receives SIGKILL, the scheduler will fail the task instance due to heartbeat timeout and invoke callbacks
# If LocalTaskJob receives SIGTERM, LocalTaskJob passes SIGTERM to _run_raw_task
# If the state of task_instance is changed, LocalTaskJob sends SIGTERM to _run_raw_task
while not self.terminating:
@@ -210,7 +210,7 @@ def sigusr2_debug_handler(signum, frame):
)
except Exception as e:
# Failing the heartbeat should never kill the localtaskjob
# If it repeatedly can't heartbeat, it will be marked as a zombie anyhow
# If it repeatedly can't heartbeat, the scheduler will fail the task instance due to heartbeat timeout
self.log.warning("Heartbeat failed with Exception: %s", e)

# If it's been too long since we've heartbeat, then it's possible that
96 changes: 59 additions & 37 deletions airflow/jobs/scheduler_job_runner.py
@@ -176,8 +176,10 @@ def __init__(
# configure -- they'll want num_runs
self.num_times_parse_dags = num_times_parse_dags
self._scheduler_idle_sleep_time = scheduler_idle_sleep_time
# How many seconds do we wait for tasks to heartbeat before mark them as zombies.
self._zombie_threshold_secs = conf.getint("scheduler", "scheduler_zombie_task_threshold")
# How many seconds do we wait for tasks to heartbeat before timing them out.
self._task_instance_heartbeat_timeout_threshold_secs = conf.getint(
"scheduler", "task_instance_heartbeat_timeout_threshold"
)
self._dag_stale_not_seen_duration = conf.getint("scheduler", "dag_stale_not_seen_duration")
self._task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout")
self._enable_tracemalloc = conf.getboolean("scheduler", "enable_tracemalloc")
@@ -818,7 +820,7 @@ def process_executor_events(
# or the TI is queued by another job. Either ways we should not fail it.

# All of this could also happen if the state is "running",
# but that is handled by the zombie detection.
# but that is handled by the scheduler detecting task instances without heartbeats.

ti_queued = ti.try_number == buffer_key.try_number and ti.state in (
TaskInstanceState.SCHEDULED,
@@ -1007,8 +1009,8 @@ def _run_scheduler_loop(self) -> None:
)

timers.call_regular_interval(
conf.getfloat("scheduler", "zombie_detection_interval", fallback=10.0),
self._find_and_purge_zombies,
conf.getfloat("scheduler", "task_instance_heartbeat_timeout_detection_interval", fallback=10.0),
self._find_and_purge_task_instances_without_heartbeats,
)

timers.call_regular_interval(60.0, self._update_dag_run_state_for_paused_dags)
@@ -1962,26 +1964,32 @@ def check_trigger_timeouts(
if num_timed_out_tasks:
self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)

# [START find_and_purge_zombies]
def _find_and_purge_zombies(self) -> None:
# [START find_and_purge_task_instances_without_heartbeats]
def _find_and_purge_task_instances_without_heartbeats(self) -> None:
"""
Find and purge zombie task instances.
Find and purge task instances without heartbeats.

Zombie instances are tasks that failed to heartbeat for too long, or
have a no-longer-running LocalTaskJob.
Task instances that have failed to heartbeat for too long, or whose
LocalTaskJob is no longer running, will be failed by the scheduler.

A TaskCallbackRequest is also created for the killed zombie to be
A TaskCallbackRequest is also created for the killed task instance to be
handled by the DAG processor, and the executor is informed to no longer
count the zombie as running when it calculates parallelism.
count the task instance as running when it calculates parallelism.
"""
with create_session() as session:
if zombies := self._find_zombies(session=session):
self._purge_zombies(zombies, session=session)
if task_instances_without_heartbeats := self._find_task_instances_without_heartbeats(
session=session
):
self._purge_task_instances_without_heartbeats(
task_instances_without_heartbeats, session=session
)

def _find_zombies(self, *, session: Session) -> list[TI]:
def _find_task_instances_without_heartbeats(self, *, session: Session) -> list[TI]:
self.log.debug("Finding 'running' jobs without a recent heartbeat")
limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
zombies = session.scalars(
limit_dttm = timezone.utcnow() - timedelta(
seconds=self._task_instance_heartbeat_timeout_threshold_secs
)
task_instances_without_heartbeats = session.scalars(
select(TI)
.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
.join(DM, TI.dag_id == DM.dag_id)
@@ -1991,63 +1999,77 @@ def _find_zombies(self, *, session: Session) -> list[TI]:
)
.where(TI.queued_by_job_id == self.job.id)
).all()
if zombies:
self.log.warning("Failing %s TIs without heartbeat after %s", len(zombies), limit_dttm)
return zombies
if task_instances_without_heartbeats:
self.log.warning(
"Failing %s TIs without heartbeat after %s",
len(task_instances_without_heartbeats),
limit_dttm,
)
return task_instances_without_heartbeats

def _purge_zombies(self, zombies: list[TI], *, session: Session) -> None:
for ti in zombies:
zombie_message_details = self._generate_zombie_message_details(ti)
def _purge_task_instances_without_heartbeats(
self, task_instances_without_heartbeats: list[TI], *, session: Session
) -> None:
for ti in task_instances_without_heartbeats:
task_instance_heartbeat_timeout_message_details = (
self._generate_task_instance_heartbeat_timeout_message_details(ti)
)
request = TaskCallbackRequest(
filepath=ti.dag_model.relative_fileloc,
bundle_name=ti.dag_version.bundle_name,
bundle_version=ti.dag_run.bundle_version,
ti=ti,
msg=str(zombie_message_details),
msg=str(task_instance_heartbeat_timeout_message_details),
)
session.add(
Log(
event="heartbeat timeout",
task_instance=ti.key,
extra=(
f"Task did not emit heartbeat within time limit ({self._zombie_threshold_secs} "
f"Task did not emit heartbeat within time limit ({self._task_instance_heartbeat_timeout_threshold_secs} "
"seconds) and will be terminated. "
"See https://airflow.apache.org/docs/apache-airflow/"
"stable/core-concepts/tasks.html#zombie-tasks"
"stable/core-concepts/tasks.html#task-instance-heartbeat-timeout"
),
)
)
self.log.error(
"Detected zombie job: %s "
"Detected a task instance without a heartbeat: %s "
"(See https://airflow.apache.org/docs/apache-airflow/"
"stable/core-concepts/tasks.html#zombie-tasks)",
"stable/core-concepts/tasks.html#task-instance-heartbeat-timeout)",
request,
)
self.job.executor.send_callback(request)
if (executor := self._try_to_load_executor(ti.executor)) is None:
self.log.warning("Cannot clean up zombie %r with non-existent executor %s", ti, ti.executor)
self.log.warning(
"Cannot clean up task instance without heartbeat %r with non-existent executor %s",
ti,
ti.executor,
)
continue
executor.change_state(ti.key, TaskInstanceState.FAILED, remove_running=True)
Stats.incr("zombies_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})
Stats.incr(
"task_instances_without_heartbeats_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id}
)

# [END find_and_purge_zombies]
# [END find_and_purge_task_instances_without_heartbeats]

@staticmethod
def _generate_zombie_message_details(ti: TI) -> dict[str, Any]:
zombie_message_details = {
def _generate_task_instance_heartbeat_timeout_message_details(ti: TI) -> dict[str, Any]:
task_instance_heartbeat_timeout_message_details = {
"DAG Id": ti.dag_id,
"Task Id": ti.task_id,
"Run Id": ti.run_id,
}

if ti.map_index != -1:
zombie_message_details["Map Index"] = ti.map_index
task_instance_heartbeat_timeout_message_details["Map Index"] = ti.map_index
if ti.hostname:
zombie_message_details["Hostname"] = ti.hostname
task_instance_heartbeat_timeout_message_details["Hostname"] = ti.hostname
if ti.external_executor_id:
zombie_message_details["External Executor Id"] = ti.external_executor_id
task_instance_heartbeat_timeout_message_details["External Executor Id"] = ti.external_executor_id

return zombie_message_details
return task_instance_heartbeat_timeout_message_details

@provide_session
def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None:
2 changes: 1 addition & 1 deletion airflow/models/taskinstance.py
@@ -3101,7 +3101,7 @@ def fetch_handle_failure_context(

ti.clear_next_method_args()

# In extreme cases (zombie in case of dag with parse error) we might _not_ have a Task.
# In extreme cases (task instance heartbeat timeout in case of dag with parse error) we might _not_ have a Task.
if context is None and getattr(ti, "task", None):
context = ti.get_template_context(session)

@@ -157,7 +157,7 @@ Name Descripti
``ti_failures`` Overall task instances failures. Metric with dag_id and task_id tagging.
``ti_successes`` Overall task instances successes. Metric with dag_id and task_id tagging.
``previously_succeeded`` Number of previously succeeded task instances. Metric with dag_id and task_id tagging.
``zombies_killed`` Zombie tasks killed. Metric with dag_id and task_id tagging.
``task_instances_without_heartbeats_killed`` Task instances without heartbeats killed. Metric with dag_id and task_id tagging.
``scheduler_heartbeat`` Scheduler heartbeats
``dag_processor_heartbeat`` Standalone DAG processor heartbeats
``dag_processing.processes`` Relative number of currently running DAG parsing processes (ie this delta
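For reference (illustrative only, not part of this diff): the renamed metric is emitted through the same ``Stats`` API used in the scheduler change above; the dag_id and task_id values below are placeholders.

.. code-block:: python

    # Illustrative sketch -- not part of this PR. Emitting the renamed metric with
    # dag_id/task_id tags, as the scheduler does when it fails a task instance
    # whose heartbeat timed out.
    from airflow.stats import Stats

    Stats.incr(
        "task_instances_without_heartbeats_killed",
        tags={"dag_id": "example_dag", "task_id": "example_task"},
    )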
26 changes: 13 additions & 13 deletions docs/apache-airflow/core-concepts/tasks.rst
@@ -165,35 +165,35 @@ If you want to control your task's state from within custom Task/Operator code,

These can be useful if your code has extra knowledge about its environment and wants to fail/skip faster - e.g., skipping when it knows there's no data available, or fast-failing when it detects its API key is invalid (as that will not be fixed by a retry).

.. _concepts:zombies:
.. _concepts:task-instance-heartbeat-timeout:

Zombie Tasks
------------
Task Instance Heartbeat Timeout
-------------------------------

No system runs perfectly, and task instances are expected to die once in a while.

*Zombie tasks* are ``TaskInstances`` stuck in a ``running`` state despite their associated jobs being inactive
(e.g. their process did not send a recent heartbeat as it got killed, or the machine died). Airflow will find these
periodically, clean them up, and either fail or retry the task depending on its settings. Tasks can become zombies for
``TaskInstances`` may get stuck in a ``running`` state despite their associated jobs being inactive
(for example, if the ``TaskInstance``'s worker ran out of memory). Such tasks were formerly known as zombie tasks. Airflow will find these
periodically, clean them up, and mark the ``TaskInstance`` as failed or retry it if it has available retries. The ``TaskInstance``'s heartbeat can time out for
many reasons, including:

* The Airflow worker ran out of memory and was OOMKilled.
* The Airflow worker failed its liveness probe, so the system (for example, Kubernetes) restarted the worker.
* The system (for example, Kubernetes) scaled down and moved an Airflow worker from one node to another.


Reproducing zombie tasks locally
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Reproducing task instance heartbeat timeouts locally
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you'd like to reproduce zombie tasks for development/testing processes, follow the steps below:
If you'd like to reproduce task instance heartbeat timeouts for development/testing purposes, follow the steps below:

1. Set the below environment variables for your local Airflow setup (alternatively you could tweak the corresponding config values in airflow.cfg)

.. code-block:: bash

export AIRFLOW__SCHEDULER__LOCAL_TASK_JOB_HEARTBEAT_SEC=600
export AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD=2
export AIRFLOW__SCHEDULER__ZOMBIE_DETECTION_INTERVAL=5
export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_SEC=600
export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_TIMEOUT_THRESHOLD=2
export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_TIMEOUT_DETECTION_INTERVAL=5


2. Have a DAG with a task that takes about 10 minutes to complete (i.e. a long-running task). For example, you could use the below DAG:
@@ -216,7 +216,7 @@ If you'd like to reproduce zombie tasks for development/testing processes, follo
sleep_dag()
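(The body of the example DAG is collapsed in this diff view. A minimal sketch of what such a long-running DAG might look like is shown below; it is hypothetical and assumes a TaskFlow-style ``sleep_dag``, not the exact code in the PR.)

.. code-block:: python

    # Hypothetical sketch -- the actual DAG body is collapsed in this diff.
    # A TaskFlow DAG with one task that sleeps long enough to miss its heartbeat.
    import time
    from datetime import datetime

    from airflow.decorators import dag, task


    @dag(schedule=None, start_date=datetime(2025, 1, 1), catchup=False)
    def sleep_dag():
        @task
        def sleep_10_minutes():
            # With the settings above, the heartbeat times out long before this
            # task finishes, so the scheduler fails the task instance.
            time.sleep(600)

        sleep_10_minutes()


    sleep_dag()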


Run the above DAG and wait for a while. You should see the task instance becoming a zombie task and then being killed by the scheduler.
Run the above DAG and wait for a while. The ``TaskInstance`` will be marked failed after ``task_instance_heartbeat_timeout_threshold`` seconds.



2 changes: 1 addition & 1 deletion docs/apache-airflow/faq.rst
@@ -453,7 +453,7 @@ Why did my task fail with no logs in the UI?
Logs are :ref:`typically served when a task reaches a terminal state <serving-worker-trigger-logs>`. Sometimes, a task's normal lifecycle is disrupted, and the task's
worker is unable to write the task's logs. This typically happens for one of two reasons:

1. :ref:`Zombie tasks <concepts:zombies>`.
1. :ref:`Task Instance Heartbeat Timeout <concepts:task-instance-heartbeat-timeout>`.
2. Tasks failed after getting stuck in queued (Airflow 2.6.0+). Tasks that are in queued for longer than :ref:`scheduler.task_queued_timeout <config:scheduler__task_queued_timeout>` will be marked as failed, and there will be no task logs in the Airflow UI.

Setting retries for each task drastically reduces the chance that either of these problems impact a workflow.