Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix overriden base container name not used to fetch logs on KPO trigger_reentry #40835

Closed
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from contextlib import AbstractContextManager
from enum import Enum
from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence
from typing import TYPE_CHECKING, Any, Callable, Iterable, Literal, Sequence

import kubernetes
import tenacity
Expand Down Expand Up @@ -91,7 +91,6 @@
if TYPE_CHECKING:
import jinja2
from pendulum import DateTime
from typing_extensions import Literal

from airflow.providers.cncf.kubernetes.secret import Secret
from airflow.utils.context import Context
Expand Down Expand Up @@ -281,7 +280,8 @@ def __init__(
startup_timeout_seconds: int = 120,
startup_check_interval_seconds: int = 5,
get_logs: bool = True,
container_logs: Iterable[str] | str | Literal[True] = BASE_CONTAINER_NAME,
base_container_name: str | None = None,
container_logs: Iterable[str] | str | Literal[True] | None = None,
image_pull_policy: str | None = None,
annotations: dict | None = None,
container_resources: k8s.V1ResourceRequirements | None = None,
Expand Down Expand Up @@ -311,7 +311,6 @@ def __init__(
termination_grace_period: int | None = None,
configmaps: list[str] | None = None,
skip_on_exit_code: int | Container[int] | None = None,
base_container_name: str | None = None,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
poll_interval: float = 2,
log_pod_spec_on_failure: bool = True,
Expand Down Expand Up @@ -353,9 +352,9 @@ def __init__(
self.cluster_context = cluster_context
self.reattach_on_restart = reattach_on_restart
self.get_logs = get_logs
self.container_logs = container_logs
if self.container_logs == KubernetesPodOperator.BASE_CONTAINER_NAME:
self.container_logs = base_container_name or self.BASE_CONTAINER_NAME
self.base_container_name = base_container_name or self.BASE_CONTAINER_NAME
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can change the default value of base_container_name to BASE_CONTAINER_NAME and remove this line

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peloyeje can you handle the above comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, sorry for the delay.
Thanks for the review @hussein-awala!

# container_logs defaults to the provided base container name, or operator-level BASE_CONTAINER_NAME
self.container_logs = container_logs or self.base_container_name
self.image_pull_policy = image_pull_policy
self.node_selector = node_selector or {}
self.annotations = annotations or {}
Expand Down Expand Up @@ -394,7 +393,6 @@ def __init__(
if skip_on_exit_code is not None
else []
)
self.base_container_name = base_container_name or self.BASE_CONTAINER_NAME
self.deferrable = deferrable
self.poll_interval = poll_interval
self.remote_pod: k8s.V1Pod | None = None
Expand Down Expand Up @@ -766,7 +764,7 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:

pod_log_status = self.pod_manager.fetch_container_logs(
pod=self.pod,
container_name=self.BASE_CONTAINER_NAME,
container_name=self.base_container_name,
follow=follow,
since_time=last_log_time,
)
Expand Down