diff --git a/airflow/cli/commands/celery_command.py b/airflow/cli/commands/celery_command.py
index 2198062e6b89c..0d3e1295dcd4e 100644
--- a/airflow/cli/commands/celery_command.py
+++ b/airflow/cli/commands/celery_command.py
@@ -18,6 +18,7 @@
 """Celery command."""
 from __future__ import annotations
 
+from contextlib import contextmanager
 from multiprocessing import Process
 
 import daemon
@@ -83,22 +84,16 @@ def flower(args):
     celery_app.start(options)
 
 
-def _serve_logs(skip_serve_logs: bool = False) -> Process | None:
+@contextmanager
+def _serve_logs(skip_serve_logs: bool = False):
     """Starts serve_logs sub-process."""
+    sub_proc = None
     if skip_serve_logs is False:
         sub_proc = Process(target=serve_logs)
         sub_proc.start()
-        return sub_proc
-    return None
-
-
-def _run_worker(options, skip_serve_logs):
-    sub_proc = _serve_logs(skip_serve_logs)
-    try:
-        celery_app.worker_main(options)
-    finally:
-        if sub_proc:
-            sub_proc.terminate()
+    yield
+    if sub_proc:
+        sub_proc.terminate()
 
 
 @cli_utils.action_cli
@@ -190,17 +185,19 @@ def worker(args):
             stdout_handle.truncate(0)
             stderr_handle.truncate(0)
 
-            ctx = daemon.DaemonContext(
+            daemon_context = daemon.DaemonContext(
                 files_preserve=[handle],
                 umask=int(umask, 8),
                 stdout=stdout_handle,
                 stderr=stderr_handle,
             )
-            with ctx:
-                _run_worker(options=options, skip_serve_logs=skip_serve_logs)
+            with daemon_context, _serve_logs(skip_serve_logs):
+                celery_app.worker_main(options)
+
     else:
         # Run Celery worker in the same process
-        _run_worker(options=options, skip_serve_logs=skip_serve_logs)
+        with _serve_logs(skip_serve_logs):
+            celery_app.worker_main(options)
 
 
 @cli_utils.action_cli
diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py
index a82ada1d52a0e..44544716df6da 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import signal
+from contextlib import contextmanager
 from multiprocessing import Process
 
 import daemon
@@ -31,28 +32,15 @@
 from airflow.utils.scheduler_health import serve_health_check
 
 
-def _create_scheduler_job(args):
+def _run_scheduler_job(args):
     job = SchedulerJob(
         subdir=process_subdir(args.subdir),
         num_runs=args.num_runs,
         do_pickle=args.do_pickle,
     )
-    return job
-
-
-def _run_scheduler_job(args):
-    skip_serve_logs = args.skip_serve_logs
-    job = _create_scheduler_job(args)
-    logs_sub_proc = _serve_logs(skip_serve_logs)
     enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
-    health_sub_proc = _serve_health_check(enable_health_check)
-    try:
+    with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check):
         job.run()
-    finally:
-        if logs_sub_proc:
-            logs_sub_proc.terminate()
-        if health_sub_proc:
-            health_sub_proc.terminate()
 
 
 @cli_utils.action_cli
@@ -85,23 +73,29 @@ def scheduler(args):
         _run_scheduler_job(args=args)
 
 
-def _serve_logs(skip_serve_logs: bool = False) -> Process | None:
+@contextmanager
+def _serve_logs(skip_serve_logs: bool = False):
     """Starts serve_logs sub-process."""
    from airflow.configuration import conf
    from airflow.utils.serve_logs import serve_logs
 
+    sub_proc = None
     if conf.get("core", "executor") in ["LocalExecutor", "SequentialExecutor"]:
         if skip_serve_logs is False:
             sub_proc = Process(target=serve_logs)
             sub_proc.start()
-            return sub_proc
-    return None
+    yield
+    if sub_proc:
+        sub_proc.terminate()
 
 
-def _serve_health_check(enable_health_check: bool = False) -> Process | None:
+@contextmanager
+def _serve_health_check(enable_health_check: bool = False):
     """Starts serve_health_check sub-process."""
+    sub_proc = None
     if enable_health_check:
         sub_proc = Process(target=serve_health_check)
         sub_proc.start()
-        return sub_proc
-    return None
+    yield
+    if sub_proc:
+        sub_proc.terminate()
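
For reference, a minimal standalone sketch of the sub-process-as-context-manager pattern this patch adopts: a @contextmanager generator that starts an optional helper Process on entry and terminates it on exit. The names _example_target and _serve_something are hypothetical placeholders, not part of the patch, and the try/finally around the yield is a hardening the diff itself does not include; it is shown here because it guarantees terminate() runs even when the with-body raises, whereas the patch as written terminates only on a clean exit.

# Standalone sketch, not part of the patch. Assumes only the standard library.
from __future__ import annotations

import time
from contextlib import contextmanager
from multiprocessing import Process


def _example_target():
    """Hypothetical stand-in for serve_logs / serve_health_check."""
    time.sleep(60)


@contextmanager
def _serve_something(enabled: bool = True):
    """Start the helper sub-process on entry; terminate it on exit."""
    sub_proc = None
    if enabled:
        sub_proc = Process(target=_example_target)
        sub_proc.start()
    try:
        yield
    finally:
        # Cleanup runs even if the with-body raises; the patched helpers
        # skip this guard and terminate only after a clean yield return.
        if sub_proc:
            sub_proc.terminate()


if __name__ == "__main__":
    with _serve_something():
        print("main work runs while the helper process is alive")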