Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use instance base_container_name to fetch logs on trigger_reentry #42960

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions providers/src/airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from contextlib import AbstractContextManager
from enum import Enum
from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence
from typing import TYPE_CHECKING, Any, Callable, Iterable, Literal, Sequence

import kubernetes
import tenacity
Expand Down Expand Up @@ -91,7 +91,6 @@
if TYPE_CHECKING:
import jinja2
from pendulum import DateTime
from typing_extensions import Literal

from airflow.providers.cncf.kubernetes.secret import Secret
from airflow.utils.context import Context
Expand Down Expand Up @@ -285,7 +284,8 @@ def __init__(
startup_timeout_seconds: int = 120,
startup_check_interval_seconds: int = 5,
get_logs: bool = True,
container_logs: Iterable[str] | str | Literal[True] = BASE_CONTAINER_NAME,
base_container_name: str | None = None,
container_logs: Iterable[str] | str | Literal[True] | None = None,
image_pull_policy: str | None = None,
annotations: dict | None = None,
container_resources: k8s.V1ResourceRequirements | None = None,
Expand Down Expand Up @@ -315,7 +315,6 @@ def __init__(
termination_grace_period: int | None = None,
configmaps: list[str] | None = None,
skip_on_exit_code: int | Container[int] | None = None,
base_container_name: str | None = None,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
poll_interval: float = 2,
log_pod_spec_on_failure: bool = True,
Expand Down Expand Up @@ -357,9 +356,10 @@ def __init__(
self.cluster_context = cluster_context
self.reattach_on_restart = reattach_on_restart
self.get_logs = get_logs
self.container_logs = container_logs
if self.container_logs == KubernetesPodOperator.BASE_CONTAINER_NAME:
self.container_logs = base_container_name or self.BASE_CONTAINER_NAME
# Fallback to the class variable BASE_CONTAINER_NAME here instead of via default argument value
# in the init method signature, to be compatible with subclasses overloading the class variable value.
self.base_container_name = base_container_name or self.BASE_CONTAINER_NAME
self.container_logs = container_logs or self.base_container_name
self.image_pull_policy = image_pull_policy
self.node_selector = node_selector or {}
self.annotations = annotations or {}
Expand Down Expand Up @@ -398,7 +398,6 @@ def __init__(
if skip_on_exit_code is not None
else []
)
self.base_container_name = base_container_name or self.BASE_CONTAINER_NAME
self.deferrable = deferrable
self.poll_interval = poll_interval
self.remote_pod: k8s.V1Pod | None = None
Expand Down Expand Up @@ -785,7 +784,7 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:

pod_log_status = self.pod_manager.fetch_container_logs(
pod=self.pod,
container_name=self.BASE_CONTAINER_NAME,
container_name=self.base_container_name,
follow=follow,
since_time=last_log_time,
)
Expand Down