Skip to content

Commit

Permalink
add "enable_tracemalloc" to log memory usage in scheduler (#42304)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W authored Sep 24, 2024
1 parent eed1d0d commit 57eed58
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 2 deletions.
10 changes: 10 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2566,6 +2566,16 @@ scheduler:
example: ~
default: "True"
see_also: ":ref:`Differences between the two cron timetables`"
enable_tracemalloc:
description: |
Whether to enable memory allocation tracing in the scheduler. If enabled, Airflow will start
tracing memory allocations with tracemalloc and, upon receiving the SIGUSR1 signal, log (at the
error level) the 10 source lines that have allocated the most memory.
This is an expensive operation and generally should not be used except for debugging purposes.
version_added: 3.0.0
type: boolean
example: ~
default: "False"
triggerer:
description: ~
options:
Expand Down
4 changes: 2 additions & 2 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,11 +537,11 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li

def end(self) -> None: # pragma: no cover
"""Wait synchronously for the previously submitted job to complete."""
raise NotImplementedError()
raise NotImplementedError

def terminate(self):
"""Get called when the daemon receives a SIGTERM."""
raise NotImplementedError()
raise NotImplementedError

def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover
"""
Expand Down
26 changes: 26 additions & 0 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ def __init__(

self.do_pickle = do_pickle

self._enable_tracemalloc = conf.getboolean("scheduler", "enable_tracemalloc")
if self._enable_tracemalloc:
import tracemalloc

tracemalloc.start()

if log:
self._log = log

Expand All @@ -202,17 +208,37 @@ def register_signals(self) -> None:
signal.signal(signal.SIGTERM, self._exit_gracefully)
signal.signal(signal.SIGUSR2, self._debug_dump)

if self._enable_tracemalloc:
signal.signal(signal.SIGUSR1, self._log_memory_usage)

def _exit_gracefully(self, signum: int, frame: FrameType | None) -> None:
    """
    Shut the scheduler down cleanly in response to a termination signal.

    Only the parent process performs the cleanup; any other process returns
    immediately. The processor agent (if any) is ended so that no orphan
    processes are left behind, then the process exits with ``os.EX_OK``.

    :param signum: number of the signal that triggered this handler
    :param frame: stack frame that was interrupted by the signal (unused)
    """
    if _is_parent_process():
        # Stop allocation tracing first, if it was switched on for this runner.
        if self._enable_tracemalloc:
            import tracemalloc

            tracemalloc.stop()

        self.log.info("Exiting gracefully upon receiving signal %s", signum)

        if self.processor_agent:
            self.processor_agent.end()

        sys.exit(os.EX_OK)

def _log_memory_usage(self, signum: int, frame: FrameType | None) -> None:
import tracemalloc

snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics("lineno")
n = 10
self.log.error(
"scheduler memory usgae:\n Top %d\n %s",
n,
"\n\t".join(map(str, top_stats[:n])),
)

def _debug_dump(self, signum: int, frame: FrameType | None) -> None:
if not _is_parent_process():
# Only the parent process should perform the debug dump.
Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1684,6 +1684,7 @@ tooltip
tooltips
traceback
tracebacks
tracemalloc
TrainingPipeline
travis
triage
Expand Down

0 comments on commit 57eed58

Please sign in to comment.