Skip to content

Commit

Permalink
feat: move the memory usage log to sigusr1
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W committed Sep 23, 2024
1 parent 2ab418a commit 78584e6
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
9 changes: 0 additions & 9 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,9 +606,6 @@ def validate_airflow_tasks_run_command(command: list[str]) -> tuple[str | None,

def debug_dump(self):
"""Get called in response to SIGUSR2 by the scheduler."""
import tracemalloc

tracemalloc.start()
self.log.info(
"executor.queued (%d)\n\t%s",
len(self.queued_tasks),
Expand All @@ -620,12 +617,6 @@ def debug_dump(self):
len(self.event_buffer),
"\n\t".join(map(repr, self.event_buffer.items())),
)
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics("lineno")
n = 10
memory_usage_top_n_info = "\n".join([str(stat) for stat in top_stats[:n]])
self.log.debug("executor memory usgae:\n Top %d\n %s", n, memory_usage_top_n_info)
tracemalloc.stop()

def send_callback(self, request: CallbackRequest) -> None:
"""
Expand Down
15 changes: 15 additions & 0 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def register_signals(self) -> None:
"""Register signals that stop child processes."""
signal.signal(signal.SIGINT, self._exit_gracefully)
signal.signal(signal.SIGTERM, self._exit_gracefully)
signal.signal(signal.SIGUSR1, self._log_memory_usage)
signal.signal(signal.SIGUSR2, self._debug_dump)

def _exit_gracefully(self, signum: int, frame: FrameType | None) -> None:
Expand All @@ -213,6 +214,20 @@ def _exit_gracefully(self, signum: int, frame: FrameType | None) -> None:
self.processor_agent.end()
sys.exit(os.EX_OK)

def _log_memory_usage(self, signum: int, frame: FrameType | None) -> None:
import tracemalloc

tracemalloc.start()
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics("lineno")
n = 10
self.log.error(
"executor memory usgae:\n Top %d\n %s",
n,
"\n\t".join(map(str, top_stats[:n])),
)
tracemalloc.stop()

def _debug_dump(self, signum: int, frame: FrameType | None) -> None:
if not _is_parent_process():
# Only the parent process should perform the debug dump.
Expand Down

0 comments on commit 78584e6

Please sign in to comment.