Skip to content

Commit

Permalink
Exclude backfill dag runs in active_runs_of_dags counts (apache#42684)
Browse files Browse the repository at this point in the history
In the areas where this is used, we don't want to include backfill runs in the counts. Rather than rename the function to reflect the change, I add a parameter.

https://github.com/orgs/apache/projects/408
  • Loading branch information
dstandish authored and PaulKobow7536 committed Oct 24, 2024
1 parent fe55583 commit e53ab1c
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 43 deletions.
22 changes: 18 additions & 4 deletions airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,24 @@ class _RunInfo(NamedTuple):

@classmethod
def calculate(cls, dags: dict[str, DAG], *, session: Session) -> Self:
        """
        Query the run counts from the db.

        :param dags: dict of dags to query
        """
# Skip these queries entirely if no DAGs can be scheduled to save time.
if not any(dag.timetable.can_be_scheduled for dag in dags.values()):
return cls({}, {})
return cls(
{run.dag_id: run for run in session.scalars(_get_latest_runs_stmt(dag_ids=dags))},
DagRun.active_runs_of_dags(dag_ids=dags, session=session),

latest_runs = {run.dag_id: run for run in session.scalars(_get_latest_runs_stmt(dag_ids=dags.keys()))}
active_run_counts = DagRun.active_runs_of_dags(
dag_ids=dags.keys(),
exclude_backfill=True,
session=session,
)

return cls(latest_runs, active_run_counts)


def _update_dag_tags(tag_names: set[str], dm: DagModel, *, session: Session) -> None:
orm_tags = {t.name: t for t in dm.tags}
Expand Down Expand Up @@ -188,7 +198,11 @@ def update_dags(
processor_subdir: str | None = None,
session: Session,
) -> None:
run_info = _RunInfo.calculate(self.dags, session=session)
# we exclude backfill from active run counts since their concurrency is separate
run_info = _RunInfo.calculate(
dags=self.dags,
session=session,
)

for dag_id, dm in sorted(orm_dags.items()):
dag = self.dags[dag_id]
Expand Down
28 changes: 20 additions & 8 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1337,8 +1337,16 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
.all()
)

# backfill runs are not created by scheduler and their concurrency is separate
# so we exclude them here
dag_ids = (dm.dag_id for dm in dag_models)
active_runs_of_dags = Counter(DagRun.active_runs_of_dags(dag_ids=dag_ids, session=session))
active_runs_of_dags = Counter(
DagRun.active_runs_of_dags(
dag_ids=dag_ids,
exclude_backfill=True,
session=session,
)
)

for dag_model in dag_models:
dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
Expand Down Expand Up @@ -1382,7 +1390,7 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
dag,
dag_model,
last_dag_run=None,
total_active_runs=active_runs_of_dags[dag.dag_id],
active_non_backfill_runs=active_runs_of_dags[dag.dag_id],
session=session,
):
dag_model.calculate_dagrun_date_fields(dag, data_interval)
Expand Down Expand Up @@ -1496,7 +1504,7 @@ def _should_update_dag_next_dagruns(
dag_model: DagModel,
*,
last_dag_run: DagRun | None = None,
total_active_runs: int | None = None,
active_non_backfill_runs: int | None = None,
session: Session,
) -> bool:
"""Check if the dag's next_dagruns_create_after should be updated."""
Expand All @@ -1511,15 +1519,19 @@ def _should_update_dag_next_dagruns(
if not dag.timetable.can_be_scheduled:
return False

if total_active_runs is None:
runs_dict = DagRun.active_runs_of_dags(dag_ids=[dag.dag_id], session=session)
total_active_runs = runs_dict.get(dag.dag_id, 0)
if active_non_backfill_runs is None:
runs_dict = DagRun.active_runs_of_dags(
dag_ids=[dag.dag_id],
exclude_backfill=True,
session=session,
)
active_non_backfill_runs = runs_dict.get(dag.dag_id, 0)

if total_active_runs >= dag.max_active_runs:
if active_non_backfill_runs >= dag.max_active_runs:
self.log.info(
"DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
dag_model.dag_id,
total_active_runs,
active_non_backfill_runs,
dag.max_active_runs,
)
dag_model.next_dagrun_create_after = None
Expand Down
4 changes: 4 additions & 0 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,9 @@ def refresh_from_db(self, session: Session = NEW_SESSION) -> None:
@provide_session
def active_runs_of_dags(
cls,
*,
dag_ids: Iterable[str],
exclude_backfill,
session: Session = NEW_SESSION,
) -> dict[str, int]:
"""
Expand All @@ -400,6 +402,8 @@ def active_runs_of_dags(
.where(cls.state.in_((DagRunState.RUNNING, DagRunState.QUEUED)))
.group_by(cls.dag_id)
)
if exclude_backfill:
query = query.where(cls.run_type != DagRunType.BACKFILL_JOB)
return dict(iter(session.execute(query)))

@classmethod
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ ignore = [
"PT005", # Fixture returns a value, remove leading underscore
"PT006", # Wrong type of names in @pytest.mark.parametrize
"PT007", # Wrong type of values in @pytest.mark.parametrize
"PT013", # silly rule prohibiting e.g. `from pytest import param`
"PT011", # pytest.raises() is too broad, set the match parameter
"PT019", # fixture without value is injected as parameter, use @pytest.mark.usefixtures instead
# Rules below explicitly set off which could overlap with Ruff's formatter
Expand Down
128 changes: 97 additions & 31 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import psutil
import pytest
import time_machine
from pytest import param
from sqlalchemy import func, select, update

import airflow.example_dags
Expand Down Expand Up @@ -3818,62 +3819,119 @@ def test_adopt_or_reset_orphaned_tasks_only_fails_scheduler_jobs(self, caplog):
assert "Marked 1 SchedulerJob instances as failed" in caplog.messages

@pytest.mark.parametrize(
"schedule, number_running, excepted",
"kwargs",
[
(None, None, False),
("*/1 * * * *", None, False),
("*/1 * * * *", 1, True),
param(
dict(
schedule=None,
backfill_runs=0,
other_runs=2,
max_active_runs=2,
should_update=False,
),
id="no_dag_schedule",
),
param(
dict(
schedule="0 0 * * *",
backfill_runs=0,
other_runs=2,
max_active_runs=2,
should_update=False,
),
id="dag_schedule_at_capacity",
),
param(
dict(
schedule="0 0 * * *",
backfill_runs=0,
other_runs=1,
max_active_runs=2,
should_update=True,
),
id="dag_schedule_under_capacity",
),
param(
dict(
schedule="0 0 * * *",
backfill_runs=0,
other_runs=5,
max_active_runs=2,
should_update=False,
),
id="dag_schedule_over_capacity",
),
param(
dict(
schedule="0 0 * * *",
number_running=None,
backfill_runs=5,
other_runs=1,
max_active_runs=2,
should_update=True,
),
id="dag_schedule_under_capacity_many_backfill",
),
],
ids=["no_dag_schedule", "dag_schedule_too_many_runs", "dag_schedule_less_runs"],
)
def test_should_update_dag_next_dagruns(self, schedule, number_running, excepted, session, dag_maker):
@pytest.mark.parametrize("provide_run_count", [True, False])
def test_should_update_dag_next_dagruns(self, provide_run_count: bool, kwargs: dict, session, dag_maker):
        """Test whether updating the next dagrun is really required, or whether it can be skipped to save run time"""
schedule: str | None = kwargs["schedule"]
backfill_runs: int = kwargs["backfill_runs"]
other_runs: int = kwargs["other_runs"]
max_active_runs: int = kwargs["max_active_runs"]
should_update: bool = kwargs["should_update"]

with dag_maker(
dag_id="test_should_update_dag_next_dagruns", schedule=schedule, max_active_runs=2
) as dag:
with dag_maker(schedule=schedule, max_active_runs=max_active_runs) as dag:
EmptyOperator(task_id="dummy")

dag_model = dag_maker.dag_model

for index in range(2):
index = 0
for index in range(other_runs):
dag_maker.create_dagrun(
run_id=f"run_{index}",
execution_date=(DEFAULT_DATE + timedelta(days=index)),
start_date=timezone.utcnow(),
state=State.RUNNING,
run_type=DagRunType.SCHEDULED,
session=session,
)

session.flush()
for index in range(index + 1, index + 1 + backfill_runs):
dag_maker.create_dagrun(
run_id=f"run_{index}",
execution_date=(DEFAULT_DATE + timedelta(days=index)),
start_date=timezone.utcnow(),
state=State.RUNNING,
run_type=DagRunType.BACKFILL_JOB,
session=session,
)
assert index == other_runs + backfill_runs - 1 # sanity check
session.commit()
scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job)

assert excepted is self.job_runner._should_update_dag_next_dagruns(
dag, dag_model, total_active_runs=number_running, session=session
actual = self.job_runner._should_update_dag_next_dagruns(
dag=dag,
dag_model=dag_maker.dag_model,
active_non_backfill_runs=other_runs if provide_run_count else None, # exclude backfill here
session=session,
)
assert actual == should_update

@pytest.mark.parametrize(
"run_type, should_update",
"run_type, expected",
[
(DagRunType.MANUAL, False),
(DagRunType.SCHEDULED, True),
(DagRunType.BACKFILL_JOB, True),
(DagRunType.DATASET_TRIGGERED, False),
],
ids=[
DagRunType.MANUAL.name,
DagRunType.SCHEDULED.name,
DagRunType.BACKFILL_JOB.name,
DagRunType.DATASET_TRIGGERED.name,
],
)
def test_should_update_dag_next_dagruns_after_run_type(self, run_type, should_update, session, dag_maker):
"""Test that whether next dagrun is updated depends on run type"""
def test_should_update_dag_next_dagruns_after_run_type(self, run_type, expected, session, dag_maker):
"""Test that whether next dag run is updated depends on run type"""
with dag_maker(
dag_id="test_should_update_dag_next_dagruns_after_run_type",
schedule="*/1 * * * *",
max_active_runs=10,
max_active_runs=3,
) as dag:
EmptyOperator(task_id="dummy")

Expand All @@ -3892,9 +3950,13 @@ def test_should_update_dag_next_dagruns_after_run_type(self, run_type, should_up
scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job)

assert should_update is self.job_runner._should_update_dag_next_dagruns(
dag, dag_model, last_dag_run=run, total_active_runs=0, session=session
actual = self.job_runner._should_update_dag_next_dagruns(
dag=dag,
dag_model=dag_model,
last_dag_run=run,
session=session,
)
assert actual == expected

def test_create_dag_runs(self, dag_maker):
"""
Expand Down Expand Up @@ -4477,14 +4539,18 @@ def complete_one_dagrun():
model: DagModel = session.get(DagModel, dag.dag_id)

# Pre-condition
assert DagRun.active_runs_of_dags(dag_ids=["test_dag"], session=session) == {"test_dag": 3}
assert DagRun.active_runs_of_dags(dag_ids=["test_dag"], exclude_backfill=True, session=session) == {
"test_dag": 3
}

assert model.next_dagrun == timezone.DateTime(2016, 1, 3, tzinfo=UTC)
assert model.next_dagrun_create_after is None

complete_one_dagrun()

assert DagRun.active_runs_of_dags(dag_ids=["test_dag"], session=session) == {"test_dag": 3}
assert DagRun.active_runs_of_dags(dag_ids=["test_dag"], exclude_backfill=True, session=session) == {
"test_dag": 3
}

for _ in range(5):
self.job_runner._do_scheduling(session)
Expand Down

0 comments on commit e53ab1c

Please sign in to comment.