Optionally ignore backfill in active_runs_of_dags counts
In the places where active_runs_of_dags is used, we don't want to include backfill runs in the counts. Rather than rename the function to reflect the change, I add a parameter.
dstandish committed Oct 2, 2024
1 parent 3e5ecd2 commit 5cbc941
Showing 3 changed files with 38 additions and 15 deletions.
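
As orientation before the diffs, here is a minimal sketch of the call shape this commit introduces; the method and parameter names come from the diff below, while the dag ids and the already-open session are placeholder assumptions:

    # Sketch only; assumes `session` is an open SQLAlchemy ORM session and the dag ids exist.
    from airflow.models.dagrun import DagRun

    counts = DagRun.active_runs_of_dags(
        dag_ids=["example_dag", "other_dag"],
        include_backfill=False,  # new parameter: leave backfill runs out of the counts
        session=session,
    )
    # Returns dict[str, int] of RUNNING/QUEUED runs per dag; dags with no active runs are absent.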
airflow/dag_processing/collection.py (23 changes: 18 additions & 5 deletions)
@@ -130,15 +130,23 @@ class _RunInfo(NamedTuple):
     num_active_runs: dict[str, int]
 
     @classmethod
-    def calculate(cls, dags: dict[str, DAG], *, session: Session) -> Self:
+    def calculate(cls, dags: dict[str, DAG], *, count_backfill=False, session: Session) -> Self:
+        """
+        :param count_backfill: Whether we should include backfill in active run counts.
+        """
         # Skip these queries entirely if no DAGs can be scheduled to save time.
         if not any(dag.timetable.can_be_scheduled for dag in dags.values()):
             return cls({}, {})
-        return cls(
-            {run.dag_id: run for run in session.scalars(_get_latest_runs_stmt(dag_ids=dags))},
-            DagRun.active_runs_of_dags(dag_ids=dags, session=session),
+
+        latest_runs = {run.dag_id: run for run in session.scalars(_get_latest_runs_stmt(dag_ids=dags))}
+        active_run_counts = DagRun.active_runs_of_dags(
+            dag_ids=dags,
+            include_backfill=count_backfill,
+            session=session,
         )
+
+        return cls(latest_runs, active_run_counts)
 
 
 def _update_dag_tags(tag_names: set[str], dm: DagModel, *, session: Session) -> None:
     orm_tags = {t.name: t for t in dm.tags}
@@ -188,7 +196,12 @@ def update_dags(
         processor_subdir: str | None = None,
         session: Session,
     ) -> None:
-        run_info = _RunInfo.calculate(self.dags, session=session)
+        # we exclude backfill from active run counts since their concurrency is separate
+        run_info = _RunInfo.calculate(
+            dags=self.dags,
+            count_backfill=False,
+            session=session,
+        )
 
         for dag_id, dm in sorted(orm_dags.items()):
             dag = self.dags[dag_id]
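
A short, hedged illustration of how the value computed above is read; num_active_runs is the field shown in the diff, while the specific dag id and the .get default are assumptions for the example:

    # Sketch only, not code from this commit.
    run_info = _RunInfo.calculate(dags=self.dags, count_backfill=False, session=session)
    active = run_info.num_active_runs.get("example_dag", 0)
    # 0 when the dag currently has no RUNNING/QUEUED runs (backfill runs excluded by default).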
airflow/jobs/scheduler_job_runner.py (8 changes: 7 additions & 1 deletion)
@@ -1334,8 +1334,14 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
             .all()
         )
 
+        # backfill runs are not created by scheduler and their concurrency is separate
+        # so we exclude them here
         active_runs_of_dags = Counter(
-            DagRun.active_runs_of_dags(dag_ids=(dm.dag_id for dm in dag_models), session=session),
+            DagRun.active_runs_of_dags(
+                dag_ids=(dm.dag_id for dm in dag_models),
+                include_backfill=False,
+                session=session,
+            ),
         )
 
         for dag_model in dag_models:
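
One point worth spelling out about the Counter wrapper: the plain dict from active_runs_of_dags omits dags that have no active runs, whereas a Counter yields 0 for missing keys, so the scheduler can index it without a .get default. A hedged sketch of that behaviour (the max_active_runs comparison is illustrative, not taken from this hunk):

    from collections import Counter

    # Sketch only; `DagRun`, `dag_models`, and `session` come from the surrounding method.
    active_runs_of_dags = Counter(
        DagRun.active_runs_of_dags(
            dag_ids=(dm.dag_id for dm in dag_models),
            include_backfill=False,
            session=session,
        ),
    )
    for dag_model in dag_models:
        # Missing keys count as 0, so dags with no active non-backfill runs pass this check.
        if active_runs_of_dags[dag_model.dag_id] >= dag_model.max_active_runs:  # illustrative capacity check
            continue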
airflow/models/dagrun.py (22 changes: 13 additions & 9 deletions)
@@ -372,20 +372,24 @@ def refresh_from_db(self, session: Session = NEW_SESSION) -> None:
     def active_runs_of_dags(
         cls,
         dag_ids: Iterable[str] | None = None,
-        only_running: bool = False,
+        include_backfill=False,
         session: Session = NEW_SESSION,
     ) -> dict[str, int]:
         """Get the number of active dag runs for each dag."""
-        query = select(cls.dag_id, func.count("*"))
+        query = (
+            select(
+                cls.dag_id,
+                func.count("*"),
+            )
+            .where(
+                cls.state.in_((DagRunState.RUNNING, DagRunState.QUEUED)),
+            )
+            .group_by(cls.dag_id)
+        )
+        if not include_backfill:
+            query = query.where(cls.run_type != DagRunType.BACKFILL_JOB)
         if dag_ids is not None:
             # 'set' called to avoid duplicate dag_ids, but converted back to 'list'
             # because SQLAlchemy doesn't accept a set here.
             query = query.where(cls.dag_id.in_(set(dag_ids)))
-        if only_running:
-            query = query.where(cls.state == DagRunState.RUNNING)
-        else:
-            query = query.where(cls.state.in_((DagRunState.RUNNING, DagRunState.QUEUED)))
-        query = query.group_by(cls.dag_id)
         return dict(iter(session.execute(query)))
 
     @classmethod
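
Because the old only_running flag disappears in the same change, external callers that passed it need a small adjustment; a hedged before/after sketch (the dag id and session are placeholders):

    # Before this commit (the parameter no longer exists afterwards):
    #   DagRun.active_runs_of_dags(dag_ids=["my_dag"], only_running=True, session=session)

    # After this commit, RUNNING and QUEUED are always counted together,
    # and backfill runs are excluded unless explicitly requested:
    counts = DagRun.active_runs_of_dags(dag_ids=["my_dag"], session=session)
    counts_incl_backfill = DagRun.active_runs_of_dags(dag_ids=["my_dag"], include_backfill=True, session=session)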
