Skip to content

Commit

Permalink
feat: add enable_tracemalloc config to enable memory allocation tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W committed Sep 23, 2024
1 parent 78584e6 commit 431ea75
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 4 deletions.
10 changes: 10 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2566,6 +2566,16 @@ scheduler:
example: ~
default: "True"
see_also: ":ref:`Differences between the two cron timetables`"
enable_tracemalloc:
description: |
Whether to enable memory allocation tracing in the scheduler. If enabled, Airflow starts
tracing memory allocations when the scheduler starts and, each time it receives the SIGUSR1
signal, logs the top 10 memory allocations (grouped by source line) at the error level.
Tracing is expensive and should generally be used only for debugging purposes.
version_added: 3.0.0
type: boolean
example: ~
default: "False"
triggerer:
description: ~
options:
Expand Down
19 changes: 15 additions & 4 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ def __init__(

self.do_pickle = do_pickle

self._enable_tracemalloc = conf.getboolean("scheduler", "enable_tracemalloc")
if self._enable_tracemalloc:
import tracemalloc

tracemalloc.start()

if log:
self._log = log

Expand All @@ -200,15 +206,22 @@ def register_signals(self) -> None:
"""Register signals that stop child processes."""
signal.signal(signal.SIGINT, self._exit_gracefully)
signal.signal(signal.SIGTERM, self._exit_gracefully)
signal.signal(signal.SIGUSR1, self._log_memory_usage)
signal.signal(signal.SIGUSR2, self._debug_dump)

if self._enable_tracemalloc:
signal.signal(signal.SIGUSR1, self._log_memory_usage)

def _exit_gracefully(self, signum: int, frame: FrameType | None) -> None:
"""Clean up processor_agent to avoid leaving orphan processes."""
if not _is_parent_process():
# Only the parent process should perform the cleanup.
return

if self._enable_tracemalloc:
import tracemalloc

tracemalloc.stop()

self.log.info("Exiting gracefully upon receiving signal %s", signum)
if self.processor_agent:
self.processor_agent.end()
Expand All @@ -217,16 +230,14 @@ def _exit_gracefully(self, signum: int, frame: FrameType | None) -> None:
def _log_memory_usage(self, signum: int, frame: FrameType | None) -> None:
import tracemalloc

tracemalloc.start()
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics("lineno")
n = 10
self.log.error(
"executor memory usgae:\n Top %d\n %s",
"scheduler memory usgae:\n Top %d\n %s",
n,
"\n\t".join(map(str, top_stats[:n])),
)
tracemalloc.stop()

def _debug_dump(self, signum: int, frame: FrameType | None) -> None:
if not _is_parent_process():
Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1684,6 +1684,7 @@ tooltip
tooltips
traceback
tracebacks
tracemalloc
TrainingPipeline
travis
triage
Expand Down

0 comments on commit 431ea75

Please sign in to comment.