Skip to content

Commit

Permalink
fix: use instance base_container_name to fetch logs on trigger_reentry (
Browse files Browse the repository at this point in the history
  • Loading branch information
peloyeje authored Oct 12, 2024
1 parent 465332b commit 794b153
Showing 1 changed file with 8 additions and 9 deletions.
17 changes: 8 additions & 9 deletions providers/src/airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from contextlib import AbstractContextManager
from enum import Enum
from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence
from typing import TYPE_CHECKING, Any, Callable, Iterable, Literal, Sequence

import kubernetes
import tenacity
Expand Down Expand Up @@ -91,7 +91,6 @@
if TYPE_CHECKING:
import jinja2
from pendulum import DateTime
from typing_extensions import Literal

from airflow.providers.cncf.kubernetes.secret import Secret
from airflow.utils.context import Context
Expand Down Expand Up @@ -285,7 +284,8 @@ def __init__(
startup_timeout_seconds: int = 120,
startup_check_interval_seconds: int = 5,
get_logs: bool = True,
container_logs: Iterable[str] | str | Literal[True] = BASE_CONTAINER_NAME,
base_container_name: str | None = None,
container_logs: Iterable[str] | str | Literal[True] | None = None,
image_pull_policy: str | None = None,
annotations: dict | None = None,
container_resources: k8s.V1ResourceRequirements | None = None,
Expand Down Expand Up @@ -315,7 +315,6 @@ def __init__(
termination_grace_period: int | None = None,
configmaps: list[str] | None = None,
skip_on_exit_code: int | Container[int] | None = None,
base_container_name: str | None = None,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
poll_interval: float = 2,
log_pod_spec_on_failure: bool = True,
Expand Down Expand Up @@ -357,9 +356,10 @@ def __init__(
self.cluster_context = cluster_context
self.reattach_on_restart = reattach_on_restart
self.get_logs = get_logs
self.container_logs = container_logs
if self.container_logs == KubernetesPodOperator.BASE_CONTAINER_NAME:
self.container_logs = base_container_name or self.BASE_CONTAINER_NAME
# Fallback to the class variable BASE_CONTAINER_NAME here instead of via default argument value
# in the init method signature, to be compatible with subclasses overloading the class variable value.
self.base_container_name = base_container_name or self.BASE_CONTAINER_NAME
self.container_logs = container_logs or self.base_container_name
self.image_pull_policy = image_pull_policy
self.node_selector = node_selector or {}
self.annotations = annotations or {}
Expand Down Expand Up @@ -398,7 +398,6 @@ def __init__(
if skip_on_exit_code is not None
else []
)
self.base_container_name = base_container_name or self.BASE_CONTAINER_NAME
self.deferrable = deferrable
self.poll_interval = poll_interval
self.remote_pod: k8s.V1Pod | None = None
Expand Down Expand Up @@ -785,7 +784,7 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:

pod_log_status = self.pod_manager.fetch_container_logs(
pod=self.pod,
container_name=self.BASE_CONTAINER_NAME,
container_name=self.base_container_name,
follow=follow,
since_time=last_log_time,
)
Expand Down

0 comments on commit 794b153

Please sign in to comment.