Commit 5e8119a

Merge branch 'apache:main' into teradata_release_1.0.0
2 parents: 3b79807 + 9f4f8b3

71 files changed (+2097, -391 lines)


airflow/providers/amazon/aws/log/cloudwatch_task_handler.py (+2, -2)

@@ -17,7 +17,7 @@
 # under the License.
 from __future__ import annotations

-from datetime import date, datetime, timedelta
+from datetime import date, datetime, timedelta, timezone
 from functools import cached_property
 from typing import TYPE_CHECKING, Any

@@ -163,7 +163,7 @@ def get_cloudwatch_logs(self, stream_name: str, task_instance: TaskInstance) ->
         return "\n".join(self._event_to_str(event) for event in events)

     def _event_to_str(self, event: dict) -> str:
-        event_dt = datetime.utcfromtimestamp(event["timestamp"] / 1000.0)
+        event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc)
         formatted_event_dt = event_dt.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
         message = event["message"]
         return f"[{formatted_event_dt}] {message}"

airflow/providers/amazon/aws/utils/task_log_fetcher.py (+2, -2)

@@ -18,7 +18,7 @@
 from __future__ import annotations

 import time
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from threading import Event, Thread
 from typing import TYPE_CHECKING, Generator

@@ -87,7 +87,7 @@ def _get_log_events(self, skip_token: AwsLogsHook.ContinuationToken | None = Non

     @staticmethod
     def event_to_str(event: dict) -> str:
-        event_dt = datetime.utcfromtimestamp(event["timestamp"] / 1000.0)
+        event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc)
         formatted_event_dt = event_dt.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
         message = event["message"]
         return f"[{formatted_event_dt}] {message}"
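Both hunks above swap datetime.utcfromtimestamp, which returns a naive datetime and is deprecated as of Python 3.12, for the timezone-aware datetime.fromtimestamp(..., tz=timezone.utc). The rendered log line is unchanged; a minimal standalone sketch, using an illustrative millisecond timestamp:

from datetime import datetime, timezone

ts_ms = 1_700_000_000_000  # illustrative CloudWatch event timestamp, in milliseconds

naive = datetime.utcfromtimestamp(ts_ms / 1000.0)                # naive UTC, deprecated in Python 3.12
aware = datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc)  # timezone-aware UTC

# Same wall-clock rendering, so the "[%Y-%m-%d %H:%M:%S,%f] message" log format is unaffected.
assert naive.strftime("%Y-%m-%d %H:%M:%S,%f") == aware.strftime("%Y-%m-%d %H:%M:%S,%f")
print(aware.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3])  # e.g. 2023-11-14 22:13:20,000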

airflow/providers/cncf/kubernetes/hooks/kubernetes.py (+31, -1)

@@ -37,7 +37,7 @@
 from airflow.utils import yaml

 if TYPE_CHECKING:
-    from kubernetes.client.models import V1Deployment, V1Pod
+    from kubernetes.client.models import V1Deployment, V1Job, V1Pod

 LOADING_KUBE_CONFIG_FILE_RESOURCE = "Loading Kubernetes configuration file kube_config from {}..."

@@ -290,6 +290,10 @@ def apps_v1_client(self) -> client.AppsV1Api:
     def custom_object_client(self) -> client.CustomObjectsApi:
         return client.CustomObjectsApi(api_client=self.api_client)

+    @cached_property
+    def batch_v1_client(self) -> client.BatchV1Api:
+        return client.BatchV1Api(api_client=self.api_client)
+
     def create_custom_object(
         self, group: str, version: str, plural: str, body: str | dict, namespace: str | None = None
     ):
@@ -472,6 +476,32 @@ def get_deployment_status(
         except Exception as exc:
             raise exc

+    def create_job(
+        self,
+        job: V1Job,
+        **kwargs,
+    ) -> V1Job:
+        """
+        Run Job.
+
+        :param job: A kubernetes Job object
+        """
+        sanitized_job = self.batch_v1_client.api_client.sanitize_for_serialization(job)
+        json_job = json.dumps(sanitized_job, indent=2)
+
+        self.log.debug("Job Creation Request: \n%s", json_job)
+        try:
+            resp = self.batch_v1_client.create_namespaced_job(
+                body=sanitized_job, namespace=job.metadata.namespace, **kwargs
+            )
+            self.log.debug("Job Creation Response: %s", resp)
+        except Exception as e:
+            self.log.exception(
+                "Exception when attempting to create Namespaced Job: %s", str(json_job).replace("\n", " ")
+            )
+            raise e
+        return resp
+

 def _get_bool(val) -> bool | None:
     """Convert val to bool if can be done with certainty; if we cannot infer intention we return None."""

airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py (+58)

@@ -19,13 +19,15 @@
 import logging
 import secrets
 import string
+import warnings
 from typing import TYPE_CHECKING

 import pendulum
 from slugify import slugify

 from airflow.compat.functools import cache
 from airflow.configuration import conf
+from airflow.exceptions import AirflowProviderDeprecationWarning

 if TYPE_CHECKING:
     from airflow.models.taskinstancekey import TaskInstanceKey
@@ -45,6 +47,18 @@ def rand_str(num):
     return "".join(secrets.choice(alphanum_lower) for _ in range(num))


+def add_unique_suffix(*, name: str, rand_len: int = 8, max_len: int = POD_NAME_MAX_LENGTH) -> str:
+    """Add random string to pod or job name while staying under max length.
+
+    :param name: name of the pod or job
+    :param rand_len: length of the random string to append
+    :param max_len: maximum length of the pod name
+    :meta private:
+    """
+    suffix = "-" + rand_str(rand_len)
+    return name[: max_len - len(suffix)].strip("-.") + suffix
+
+
 def add_pod_suffix(*, pod_name: str, rand_len: int = 8, max_len: int = POD_NAME_MAX_LENGTH) -> str:
     """Add random string to pod name while staying under max length.

@@ -53,10 +67,48 @@ def add_pod_suffix(*, pod_name: str, rand_len: int = 8, max_len: int = POD_NAME_
     :param max_len: maximum length of the pod name
     :meta private:
     """
+    warnings.warn(
+        "This function is deprecated. Please use `add_unique_suffix`.",
+        AirflowProviderDeprecationWarning,
+        stacklevel=2,
+    )
+
     suffix = "-" + rand_str(rand_len)
     return pod_name[: max_len - len(suffix)].strip("-.") + suffix


+def create_unique_id(
+    dag_id: str | None = None,
+    task_id: str | None = None,
+    *,
+    max_length: int = POD_NAME_MAX_LENGTH,
+    unique: bool = True,
+) -> str:
+    """
+    Generate unique pod or job ID given a dag_id and / or task_id.
+
+    :param dag_id: DAG ID
+    :param task_id: Task ID
+    :param max_length: max number of characters
+    :param unique: whether a random string suffix should be added
+    :return: A valid identifier for a kubernetes pod name
+    """
+    if not (dag_id or task_id):
+        raise ValueError("Must supply either dag_id or task_id.")
+    name = ""
+    if dag_id:
+        name += dag_id
+    if task_id:
+        if name:
+            name += "-"
+        name += task_id
+    base_name = slugify(name, lowercase=True)[:max_length].strip(".-")
+    if unique:
+        return add_pod_suffix(pod_name=base_name, rand_len=8, max_len=max_length)
+    else:
+        return base_name
+
+
 def create_pod_id(
     dag_id: str | None = None,
     task_id: str | None = None,
@@ -73,6 +125,12 @@ def create_pod_id(
     :param unique: whether a random string suffix should be added
     :return: A valid identifier for a kubernetes pod name
     """
+    warnings.warn(
+        "This function is deprecated. Please use `create_unique_id`.",
+        AirflowProviderDeprecationWarning,
+        stacklevel=2,
+    )
+
     if not (dag_id or task_id):
         raise ValueError("Must supply either dag_id or task_id.")
     name = ""
