Commit
Merge branch 'main' into fix/base-container-name-on-trigger-reentry
peloyeje authored Jul 19, 2024
2 parents c45c093 + 4196a96 commit 58bf972
Showing 123 changed files with 3,498 additions and 648 deletions.
1 change: 0 additions & 1 deletion .asf.yaml
@@ -120,7 +120,6 @@ github:
    - vatsrahul1001
    - cmarteepants
    - romsharon98
-    - shahar1

  notifications:
    jobs: jobs@airflow.apache.org
10 changes: 6 additions & 4 deletions COMMITTERS.rst
@@ -222,7 +222,9 @@ To be able to merge PRs, committers have to integrate their GitHub ID with Apache
   * ``dev/breeze/src/airflow_breeze/global_constants.py`` (COMMITTERS variable)
   * name and GitHub ID in `project.rst <https://github.com/apache/airflow/blob/main/docs/apache-airflow/project.rst>`__.
   * If you had been a collaborator role before getting committer, remove your Github ID from ``.asf.yaml``.
-7. Raise PR to airflow site with the following:
-   * List your name in the committers list
-     `Airflow-Site committers.json <https://github.com/apache/airflow-site/blob/main/landing-pages/site/data/committers.json>`__.
-   * Post entry in `Announcements <https://github.com/apache/airflow-site/blob/main/landing-pages/site/content/en/announcements/_index.md>`__.
7. Raise a PR to `airflow-site <https://github.com/apache/airflow-site>`_ repository with the following additions:

   * List your name(s) in the `committers list <https://github.com/apache/airflow-site/blob/main/landing-pages/site/data/committers.json>`__.
   * Post an entry in `Announcements <https://github.com/apache/airflow-site/blob/main/landing-pages/site/content/en/announcements/_index.md>`__.

**A kind request**: If there are other committers who joined around the same time, please create a unified PR for all of you together.
6 changes: 6 additions & 0 deletions airflow/api_connexion/endpoints/event_log_endpoint.py
@@ -58,6 +58,8 @@ def get_event_logs(
    dag_id: str | None = None,
    task_id: str | None = None,
    run_id: str | None = None,
    map_index: int | None = None,
    try_number: int | None = None,
    owner: str | None = None,
    event: str | None = None,
    excluded_events: str | None = None,
@@ -90,6 +92,10 @@ def get_event_logs(
        query = query.where(Log.task_id == task_id)
    if run_id:
        query = query.where(Log.run_id == run_id)
    if map_index:
        query = query.where(Log.map_index == map_index)
    if try_number:
        query = query.where(Log.try_number == try_number)
    if owner:
        query = query.where(Log.owner == owner)
    if event:
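Taken together with the OpenAPI and schema changes below, the new parameters become queryable through the stable REST API. A minimal sketch (not part of the commit; the URL, credentials, and IDs are placeholders):

```python
# Sketch: list audit-log entries for one task try via the new filters.
# Endpoint and response keys follow the v1 OpenAPI spec shown below.
import requests

resp = requests.get(
    "http://localhost:8080/api/v1/eventLogs",
    params={"dag_id": "example_dag", "task_id": "my_task", "map_index": -1, "try_number": 1},
    auth=("admin", "admin"),  # assumes the basic-auth API backend is enabled
)
resp.raise_for_status()
for entry in resp.json()["event_logs"]:
    print(entry["event"], entry["try_number"], entry["map_index"])
```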
10 changes: 10 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
@@ -1243,6 +1243,8 @@ paths:
        - $ref: "#/components/parameters/FilterDAGID"
        - $ref: "#/components/parameters/FilterTaskID"
        - $ref: "#/components/parameters/FilterRunID"
        - $ref: "#/components/parameters/FilterMapIndex"
        - $ref: "#/components/parameters/FilterTryNumber"
        - $ref: "#/components/parameters/Event"
        - $ref: "#/components/parameters/Owner"
        - $ref: "#/components/parameters/Before"
@@ -4068,6 +4070,7 @@ components:
          type: string
          format: "date-time"
          readOnly: true
          nullable: true
        end_date:
          type: string
          format: "date-time"
@@ -5587,6 +5590,13 @@ components:
        type: integer
      description: Filter on map index for mapped task.

    FilterTryNumber:
      in: query
      name: try_number
      schema:
        type: integer
      description: Filter on try_number for task instance.

    OrderBy:
      in: query
      name: order_by
2 changes: 2 additions & 0 deletions airflow/api_connexion/schemas/event_log_schema.py
@@ -37,6 +37,8 @@ class Meta:
    dag_id = auto_field(dump_only=True)
    task_id = auto_field(dump_only=True)
    run_id = auto_field(dump_only=True)
    map_index = auto_field(dump_only=True)
    try_number = auto_field(dump_only=True)
    event = auto_field(dump_only=True)
    execution_date = auto_field(dump_only=True)
    owner = auto_field(dump_only=True)
5 changes: 5 additions & 0 deletions airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -48,7 +48,9 @@ def _initialize_map() -> dict[str, Callable]:
    from airflow.models.dag import DAG, DagModel
    from airflow.models.dagrun import DagRun
    from airflow.models.dagwarning import DagWarning
    from airflow.models.renderedtifields import RenderedTaskInstanceFields
    from airflow.models.serialized_dag import SerializedDagModel
    from airflow.models.skipmixin import SkipMixin
    from airflow.models.taskinstance import (
        TaskInstance,
        _add_log,
@@ -109,7 +111,10 @@ def _initialize_map() -> dict[str, Callable]:
        DagRun.get_previous_scheduled_dagrun,
        DagRun.fetch_task_instance,
        DagRun._get_log_template,
        RenderedTaskInstanceFields._update_runtime_evaluated_template_fields,
        SerializedDagModel.get_serialized_dag,
        SkipMixin._skip,
        SkipMixin._skip_all_except,
        TaskInstance._check_and_change_state_before_execution,
        TaskInstance.get_task_instance,
        TaskInstance._get_dagrun,
56 changes: 48 additions & 8 deletions airflow/cli/commands/rotate_fernet_key_command.py
@@ -24,17 +24,57 @@
from airflow.utils import cli as cli_utils
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import create_session
from airflow.utils.sqlalchemy import is_sqlalchemy_v1


@cli_utils.action_cli
@providers_configuration_loaded
def rotate_fernet_key(args):
-    """Rotates all encrypted connection credentials and variables."""
    """Rotates all encrypted connection credentials, triggers and variables."""
    batch_size = 100
    rotate_method = rotate_items_in_batches_v1 if is_sqlalchemy_v1() else rotate_items_in_batches_v2
    with create_session() as session:
-        conns_query = select(Connection).where(Connection.is_encrypted | Connection.is_extra_encrypted)
-        for conn in session.scalars(conns_query):
-            conn.rotate_fernet_key()
-        for var in session.scalars(select(Variable).where(Variable.is_encrypted)):
-            var.rotate_fernet_key()
-        for trigger in session.scalars(select(Trigger)):
-            trigger.rotate_fernet_key()
        with session.begin():  # Start a single transaction
            rotate_method(
                session,
                Connection,
                filter_condition=Connection.is_encrypted | Connection.is_extra_encrypted,
                batch_size=batch_size,
            )
            rotate_method(session, Variable, filter_condition=Variable.is_encrypted, batch_size=batch_size)
            rotate_method(session, Trigger, filter_condition=None, batch_size=batch_size)


def rotate_items_in_batches_v1(session, model_class, filter_condition=None, batch_size=100):
    """
    Rotates Fernet keys for items of a given model in batches to avoid excessive memory usage.

    This function is a replacement for yield_per, which is not available in SQLAlchemy 1.x.
    """
    offset = 0
    while True:
        query = select(model_class)
        if filter_condition is not None:
            query = query.where(filter_condition)
        query = query.offset(offset).limit(batch_size)
        items = session.scalars(query).all()
        if not items:
            break  # No more items to process
        for item in items:
            item.rotate_fernet_key()
        offset += batch_size


def rotate_items_in_batches_v2(session, model_class, filter_condition=None, batch_size=100):
    """
    Rotates Fernet keys for items of a given model in batches to avoid excessive memory usage.

    This function is taking advantage of yield_per available in SQLAlchemy 2.x.
    """
    query = select(model_class)
    if filter_condition is not None:
        query = query.where(filter_condition)
    # yield_per streams rows in chunks, so the whole table is never loaded at once
    items = session.scalars(query).yield_per(batch_size)
    for item in items:
        item.rotate_fernet_key()
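For context, this command is the middle step of the documented key-rotation flow. A short sketch of the surrounding procedure (the generated key and config values are illustrative, not from this commit):

```python
# Sketch of the key-rotation flow around `airflow rotate-fernet-key`.
# Assumes the `cryptography` package, which Airflow already depends on.
from cryptography.fernet import Fernet

new_key = Fernet.generate_key().decode()

# 1. Put the new key FIRST in [core] fernet_key, keeping the old one:
#        fernet_key = <new_key>,<old_key>
# 2. Run `airflow rotate-fernet-key`: with this change, Connections, Variables
#    and Triggers are re-encrypted in batches of 100 inside a single transaction.
# 3. Once it completes, drop the old key from the configuration.
print(f"fernet_key = {new_key},<old_key>")
```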
1 change: 1 addition & 0 deletions airflow/dag_processing/manager.py
@@ -885,6 +885,7 @@ def _log_file_processing_stats(self, known_file_paths):
            seconds_ago = (now - last_run).total_seconds()
            Stats.gauge(f"dag_processing.last_run.seconds_ago.{file_name}", seconds_ago)
            last_num_of_db_queries = self.get_last_num_of_db_queries(file_path)
            Stats.gauge(f"dag_processing.last_num_of_db_queries.{file_name}", last_num_of_db_queries)

            rows.append(
                (
5 changes: 4 additions & 1 deletion airflow/executors/base_executor.py
@@ -223,7 +223,10 @@ def heartbeat(self) -> None:

        self.log.debug("%s running task instances", num_running_tasks)
        self.log.debug("%s in queue", num_queued_tasks)
-        self.log.debug("%s open slots", open_slots)
        if open_slots == 0:
            self.log.info("Executor parallelism limit reached. 0 open slots.")
        else:
            self.log.debug("%s open slots", open_slots)

        Stats.gauge(
            "executor.open_slots", value=open_slots, tags={"status": "open", "name": self.__class__.__name__}
18 changes: 0 additions & 18 deletions airflow/executors/executor_loader.py
@@ -73,21 +73,6 @@ class ExecutorLoader:
        DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor",
    }

-    @classmethod
-    def block_use_of_hybrid_exec(cls, executor_config: list):
-        """
-        Raise an exception if the user tries to use multiple executors before the feature is complete.
-
-        This check is built into a method so that it can be easily mocked in unit tests.
-
-        :param executor_config: core.executor configuration value.
-        """
-        if len(executor_config) > 1 or ":" in "".join(executor_config):
-            raise AirflowConfigException(
-                "Configuring multiple executors and executor aliases are not yet supported!: "
-                f"{executor_config}"
-            )
-
    @classmethod
    def _get_executor_names(cls) -> list[ExecutorName]:
        """
@@ -102,9 +87,6 @@ def _get_executor_names(cls) -> list[ExecutorName]:

        executor_names_raw = conf.get_mandatory_list_value("core", "EXECUTOR")

-        # AIP-61 is WIP. Unblock configuring multiple executors when the feature is ready to launch
-        cls.block_use_of_hybrid_exec(executor_names_raw)
-
        executor_names = []
        for name in executor_names_raw:
            if len(split_name := name.split(":")) == 1:
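With the guard removed, `[core] executor` may now carry a comma-separated list, optionally with `alias:module.path` entries, as implied by the parsing retained in `_get_executor_names`. A standalone sketch of that interpretation (the executor names here are hypothetical, and this is not the actual loader code):

```python
# Sketch mirroring the alias parsing shown above.
raw_names = "LocalExecutor,k8s:airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor"

parsed = []
for name in (n.strip() for n in raw_names.split(",")):
    if len(split_name := name.split(":")) == 1:
        parsed.append((None, split_name[0]))  # core executor name or bare module path
    elif len(split_name) == 2:
        parsed.append((split_name[0], split_name[1]))  # alias -> module path
    else:
        raise ValueError(f"Incorrectly formatted executor: {name}")

print(parsed)
# [(None, 'LocalExecutor'), ('k8s', 'airflow.providers...KubernetesExecutor')]
```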
39 changes: 36 additions & 3 deletions airflow/macros/__init__.py
@@ -25,6 +25,8 @@
from typing import TYPE_CHECKING, Any

import dateutil  # noqa: F401
from babel import Locale
from babel.dates import LC_TIME, format_datetime

import airflow.utils.yaml as yaml  # noqa: F401
from airflow.utils.deprecation_tools import add_deprecated_classes
@@ -64,18 +66,49 @@ def ds_format(ds: str, input_format: str, output_format: str) -> str:
    """
    Output datetime string in a given format.

-    :param ds: input string which contains a date
-    :param input_format: input string format. E.g. %Y-%m-%d
-    :param output_format: output string format E.g. %Y-%m-%d
    :param ds: Input string which contains a date.
    :param input_format: Input string format (e.g., '%Y-%m-%d').
    :param output_format: Output string format (e.g., '%Y-%m-%d').

    >>> ds_format("2015-01-01", "%Y-%m-%d", "%m-%d-%y")
    '01-01-15'
    >>> ds_format("1/5/2015", "%m/%d/%Y", "%Y-%m-%d")
    '2015-01-05'
    >>> ds_format("12/07/2024", "%d/%m/%Y", "%A %d %B %Y")
    'Friday 12 July 2024'
    """
    return datetime.strptime(str(ds), input_format).strftime(output_format)


def ds_format_locale(
    ds: str, input_format: str, output_format: str, locale: Locale | str | None = None
) -> str:
    """
    Output localized datetime string in a given Babel format.

    :param ds: Input string which contains a date.
    :param input_format: Input string format (e.g., '%Y-%m-%d').
    :param output_format: Output string Babel format (e.g., `yyyy-MM-dd`).
    :param locale: Locale used to format the output string (e.g., 'en_US').
        If locale not specified, default LC_TIME will be used and if that's also not available,
        'en_US' will be used.

    >>> ds_format_locale("2015-01-01", "%Y-%m-%d", "MM-dd-yy")
    '01-01-15'
    >>> ds_format_locale("1/5/2015", "%m/%d/%Y", "yyyy-MM-dd")
    '2015-01-05'
    >>> ds_format_locale("12/07/2024", "%d/%m/%Y", "EEEE dd MMMM yyyy", "en_US")
    'Friday 12 July 2024'

    .. versionadded:: 2.10.0
    """
    return format_datetime(
        datetime.strptime(str(ds), input_format),
        format=output_format,
        locale=locale or LC_TIME or Locale("en_US"),
    )


def datetime_diff_for_humans(dt: Any, since: DateTime | None = None) -> str:
    """
    Return a human-readable/approximate difference between datetimes.
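A quick sketch of the new macro in use (the French locale is an arbitrary choice); inside a templated field the equivalent call would be `{{ macros.ds_format_locale(ds, '%Y-%m-%d', 'EEEE dd MMMM yyyy', 'fr_FR') }}`:

```python
# Sketch: the localized conversion plain ds_format cannot do.
from airflow.macros import ds_format_locale

print(ds_format_locale("2024-07-12", "%Y-%m-%d", "EEEE dd MMMM yyyy", "fr_FR"))
# vendredi 12 juillet 2024
```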
@@ -0,0 +1,54 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Add try_number to audit log.
Revision ID: 41b3bc7c0272
Revises: ec3471c1e067
Create Date: 2024-07-11 14:48:58.998259
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "41b3bc7c0272"
down_revision = "ec3471c1e067"
branch_labels = None
depends_on = None
airflow_version = "2.10.0"


def upgrade():
    """Apply add try_number to audit log."""
    with op.batch_alter_table("log") as batch_op:
        batch_op.add_column(sa.Column("try_number", sa.Integer(), nullable=True))
        batch_op.create_index(
            "idx_log_task_instance", ["dag_id", "task_id", "run_id", "map_index", "try_number"], unique=False
        )


def downgrade():
    """Unapply add try_number to audit log."""
    with op.batch_alter_table("log") as batch_op:
        batch_op.drop_index("idx_log_task_instance")
        batch_op.drop_column("try_number")
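The composite index targets audit-log lookups scoped to a single task try. A sketch of that access pattern (identifiers are placeholders):

```python
# Sketch: fetch audit-log rows for one specific task try, the access pattern
# the new idx_log_task_instance index is built to serve.
from sqlalchemy import select

from airflow.models.log import Log
from airflow.utils.session import create_session

with create_session() as session:
    entries = session.scalars(
        select(Log).where(
            Log.dag_id == "example_dag",
            Log.task_id == "my_task",
            Log.run_id == "manual__2024-07-19T00:00:00+00:00",
            Log.map_index == -1,  # -1 means the task is not mapped
            Log.try_number == 1,
        )
    ).all()
```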
7 changes: 0 additions & 7 deletions airflow/models/baseoperator.py
@@ -978,13 +978,6 @@ def __init__(
        if end_date:
            self.end_date = timezone.convert_to_utc(end_date)

-        if executor:
-            warnings.warn(
-                "Specifying executors for operators is not yet"
-                f"supported, the value {executor!r} will have no effect",
-                category=UserWarning,
-                stacklevel=2,
-            )
        self.executor = executor
        self.executor_config = executor_config or {}
        self.run_as_user = run_as_user
1 change: 1 addition & 0 deletions airflow/models/dagrun.py
@@ -651,6 +651,7 @@ def get_task_instance(
        )

    @staticmethod
    @internal_api_call
    @provide_session
    def fetch_task_instance(
        dag_id: str,
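Under AIP-44, `@internal_api_call` lets the same static call either hit the database directly or be proxied to the internal API when database isolation mode is enabled. A hedged sketch (parameter names beyond `dag_id` are an assumption, since the signature is truncated above):

```python
# Sketch only: fetch a task instance through the now RPC-capable static method.
from airflow.models.dagrun import DagRun

ti = DagRun.fetch_task_instance(
    dag_id="example_dag",
    dag_run_id="manual__2024-07-19T00:00:00+00:00",  # assumed parameter name
    task_id="my_task",
)
```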
5 changes: 5 additions & 0 deletions airflow/models/log.py
@@ -40,11 +40,13 @@ class Log(Base):
    owner = Column(String(500))
    owner_display_name = Column(String(500))
    extra = Column(Text)
    try_number = Column(Integer)

    __table_args__ = (
        Index("idx_log_dag", dag_id),
        Index("idx_log_dttm", dttm),
        Index("idx_log_event", event),
        Index("idx_log_task_instance", dag_id, task_id, run_id, map_index, try_number),
    )

    def __init__(self, event, task_instance=None, owner=None, owner_display_name=None, extra=None, **kwargs):
@@ -59,6 +61,7 @@ def __init__(self, event, task_instance=None, owner=None, owner_display_name=None,
            self.task_id = task_instance.task_id
            self.execution_date = task_instance.execution_date
            self.run_id = task_instance.run_id
            self.try_number = task_instance.try_number
            self.map_index = task_instance.map_index
            if getattr(task_instance, "task", None):
                task_owner = task_instance.task.owner
@@ -73,6 +76,8 @@ def __init__(self, event, task_instance=None, owner=None, owner_display_name=None,
            self.run_id = kwargs["run_id"]
        if "map_index" in kwargs:
            self.map_index = kwargs["map_index"]
        if "try_number" in kwargs:
            self.try_number = kwargs["try_number"]

        self.owner = owner or task_owner
        self.owner_display_name = owner_display_name or None
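Finally, a sketch of the kwargs path through the constructor above, for callers that build audit rows without a `TaskInstance` at hand (all values are hypothetical):

```python
# Sketch: try_number now flows into the audit row either from a TaskInstance
# or, as here, from the new explicit kwargs branch.
from airflow.models.log import Log

row = Log(
    event="cli_task_run",
    owner="airflow",
    run_id="manual__2024-07-19T00:00:00+00:00",
    map_index=-1,
    try_number=2,  # handled by the new kwargs branch above
)
```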