Skip to content

Commit 592df31

Browse files
authored
Implement job status API endpoint (#46)
* feat(backend): Implement job status endpoint The `GET /jobs/{job_id}/status` API endpoint reports the high-level execution status of a Job (identified by its UID). The associated Kueue workload for the job is looked up and its status is used to determine the execution status (pending, executing, failed, succeeded). * fix(backend): Return 404 for missing workloads * fix(backend): Improve readability of `traverse` * fix(backend): Rename kubernetes helper module The rename prevents a name clash with the kubernetes client SDK package.
1 parent 91c00e7 commit 592df31

File tree

12 files changed

+399
-78
lines changed

12 files changed

+399
-78
lines changed

.editorconfig

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@ root = true
99
end_of_line = lf
1010
# Consistency
1111
insert_final_newline = true
12-
indent_size = 2
12+
indent_size = 4

backend/src/jobs_server/exceptions.py

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
class WorkloadNotFound(Exception):
2+
"""Exception indicating a non-existing Kueue workload"""
3+
4+
def __init__(
5+
self,
6+
message: str = "workload not found",
7+
uid: str | None = None,
8+
namespace: str | None = None,
9+
):
10+
super().__init__(message)
11+
self.uid = uid
12+
self.namespace = namespace

backend/src/jobs_server/models.py

+8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import re
2+
from enum import StrEnum
23
from typing import Annotated, Any, TypeAlias
34

45
from jobs import JobOptions
@@ -49,3 +50,10 @@ class WorkloadIdentifier(BaseModel):
4950

5051
namespace: StrictStr
5152
uid: StrictStr
53+
54+
55+
class JobStatus(StrEnum):
56+
PENDING = "pending"
57+
EXECUTING = "executing"
58+
SUCCEEDED = "succeeded"
59+
FAILED = "failed"

backend/src/jobs_server/routers/jobs.py

+17-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
11
from fastapi import APIRouter, HTTPException
22
from jobs import Image, Job
33

4-
from jobs_server.models import CreateJobModel, ExecutionMode, WorkloadIdentifier
4+
from jobs_server.exceptions import WorkloadNotFound
5+
from jobs_server.models import (
6+
CreateJobModel,
7+
ExecutionMode,
8+
JobId,
9+
WorkloadIdentifier,
10+
)
511
from jobs_server.runner import Runner
12+
from jobs_server.utils.kueue import KueueWorkload
613

714
router = APIRouter(tags=["Job management"])
815

@@ -33,3 +40,12 @@ def job_fn(): ...
3340
image = Image(opts.image_ref)
3441
workload_id = runner.run(job, image, opts.submission_context)
3542
return workload_id
43+
44+
45+
@router.get("/jobs/{uid}/status")
46+
async def status(uid: JobId, namespace: str = "default"):
47+
try:
48+
workload = KueueWorkload.for_managed_resource(uid, namespace)
49+
return workload.execution_status
50+
except WorkloadNotFound as e:
51+
raise HTTPException(404, "workload not found") from e

backend/src/jobs_server/runner/kueue.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from jobs_server.models import SubmissionContext, WorkloadIdentifier
99
from jobs_server.runner.base import ExecutionMode, Runner, _make_executor_command
10-
from jobs_server.utils.kubernetes import (
10+
from jobs_server.utils.k8s import (
1111
KubernetesNamespaceMixin,
1212
gvk,
1313
k8s_annotations,

backend/src/jobs_server/runner/ray.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
from jobs_server.models import SubmissionContext, WorkloadIdentifier
2222
from jobs_server.runner.base import ExecutionMode, Runner, _make_executor_command
23-
from jobs_server.utils.kubernetes import (
23+
from jobs_server.utils.k8s import (
2424
KubernetesNamespaceMixin,
2525
gvk,
2626
k8s_annotations,

backend/src/jobs_server/utils/helpers.py

+84
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,87 @@ def remove_none_values(d: T) -> T:
1010
"""Remove all keys with a ``None`` value from a dict."""
1111
filtered_dict = {k: v for k, v in d.items() if v is not None}
1212
return cast(T, filtered_dict)
13+
14+
15+
def traverse(d: Any, key_path: str, sep: str = ".", strict: bool = True) -> Any:
16+
"""
17+
Retrieve a value from a nested Mapping-like object using a key path.
18+
19+
If the object behaves like a Mapping (i.e., implements `__getitem__`),
20+
this function will be used to access elements.
21+
If it behaves like an object (i.e., `__getattr__`), the path will be
22+
resolved as attributes, instead.
23+
24+
Parameters
25+
----------
26+
d : dict
27+
The object to traverse.
28+
key_path : str
29+
A string representing the path of keys, separated by `sep`.
30+
sep : str, optional
31+
The separator used to split the key path into individual keys.
32+
Default is ".".
33+
strict : bool, optional
34+
If False, return None when a key in the path does not exist.
35+
If True, raise a KeyError when a key does not exist.
36+
Default is False.
37+
38+
Returns
39+
-------
40+
Any
41+
The value at the specified key path, or None if a key is missing
42+
and `strict` is False.
43+
44+
Raises
45+
------
46+
KeyError
47+
If `strict` is True and any key in the path does not exist.
48+
49+
Examples
50+
--------
51+
>>> d = {"foo": {"bar": {"baz": 42}}}
52+
>>> traverse(d, "foo.bar.baz")
53+
42
54+
55+
>>> traverse(d, "foo.bar.qux", strict=False) is None
56+
True
57+
58+
>>> traverse(d, "foo.bar.qux", strict=True)
59+
Traceback (most recent call last):
60+
...
61+
KeyError: 'qux'
62+
"""
63+
64+
def has_item(container, key):
65+
if hasattr(container, "__contains__"):
66+
# Container behaves like a dict
67+
return key in container
68+
elif hasattr(container, key):
69+
# Container behaves like an object
70+
return True
71+
else:
72+
return False
73+
74+
def get_item(container, key, default=None):
75+
if hasattr(container, "__getitem__"):
76+
# Container behaves like a dict
77+
try:
78+
return container[key]
79+
except (KeyError, IndexError, TypeError):
80+
return default
81+
elif hasattr(container, key):
82+
# Container behaves like an object
83+
return getattr(container, key, default)
84+
else:
85+
return default
86+
87+
keys = key_path.split(sep)
88+
for key in keys:
89+
# Bail out on missing entries in strict mode
90+
if strict and not has_item(d, key):
91+
raise KeyError(key)
92+
93+
d = get_item(d, key)
94+
if d is None:
95+
return None
96+
return d

backend/src/jobs_server/utils/k8s.py

+138
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
from __future__ import annotations
2+
3+
import json
4+
from dataclasses import dataclass
5+
from typing import Any, Protocol
6+
7+
import kubernetes
8+
from jobs.job import Job
9+
10+
from jobs_server.models import SubmissionContext
11+
from jobs_server.utils.helpers import traverse
12+
13+
14+
def sanitize_rfc1123_domain_name(s: str) -> str:
15+
"""Sanitize a string to be compliant with RFC 1123 domain name
16+
17+
Note: Any invalid characters are replaced with dashes."""
18+
19+
# TODO: This is obviously wildly incomplete
20+
return s.replace("_", "-")
21+
22+
23+
def k8s_annotations(
24+
job: Job, context: SubmissionContext | None = None
25+
) -> dict[str, str]:
26+
"""Determine the Kubernetes annotations for a Job"""
27+
# Store as annotations since labels have restrictive value formats
28+
options = job.options.labels if job.options else {}
29+
context = {"x-jobby.io/submission-context": json.dumps(context)} if context else {}
30+
return options | context
31+
32+
33+
@dataclass
34+
class GroupVersionKind:
35+
group: str
36+
version: str
37+
kind: str
38+
39+
40+
class KubernetesObject(Protocol):
41+
@property
42+
def api_version(self) -> str: ...
43+
44+
@property
45+
def kind(self) -> str: ...
46+
47+
48+
def gvk(obj: KubernetesObject | dict[str, Any]) -> GroupVersionKind:
49+
kind = obj.kind if hasattr(obj, "kind") else obj["kind"]
50+
if "/" in (
51+
api_version := obj.api_version
52+
if hasattr(obj, "api_version")
53+
else obj["apiVersion"]
54+
):
55+
group, version = api_version.split("/")
56+
else:
57+
group, version = "", api_version
58+
59+
return GroupVersionKind(group, version, kind)
60+
61+
62+
class KubernetesNamespaceMixin:
63+
"""Determine the desired or current Kubernetes namespace."""
64+
65+
def __init__(self, **kwargs):
66+
kubernetes.config.load_config()
67+
self._namespace: str | None = kwargs.get("namespace")
68+
69+
@property
70+
def namespace(self) -> str:
71+
_, active_context = kubernetes.config.list_kube_config_contexts()
72+
current_namespace = active_context["context"].get("namespace")
73+
return self._namespace or current_namespace
74+
75+
76+
def filter_conditions(
77+
obj: dict[str, Any],
78+
typ: str | None = None,
79+
reason: str | None = None,
80+
message: str | None = None,
81+
):
82+
"""
83+
Filters Kubernetes object conditions based on specified attributes.
84+
85+
This function filters the `status.conditions` field of a Kubernetes object
86+
by matching conditions against the provided `type`, `reason`, and `message`
87+
attributes. Only conditions that match all specified attributes are included
88+
in the result.
89+
90+
Parameters
91+
----------
92+
obj : dict[str, Any]
93+
The Kubernetes object, typically a dictionary representing a Kubernetes
94+
resource, containing a `status.conditions` field.
95+
typ : str, optional
96+
The type of condition to filter by. If `None`, this filter is not applied.
97+
reason : str, optional
98+
The reason attribute to filter by. If `None`, this filter is not applied.
99+
message : str, optional
100+
The message attribute to filter by. If `None`, this filter is not applied.
101+
102+
Returns
103+
-------
104+
list[dict[str, Any]]
105+
A list of conditions that match the specified filters. Each condition
106+
is represented as a dictionary.
107+
108+
Notes
109+
-----
110+
- The function assumes that the `status.conditions` field exists in the
111+
provided object and that it is a list of condition dictionaries.
112+
- If no conditions match the specified filters, an empty list is returned.
113+
114+
Examples
115+
--------
116+
>>> obj = {
117+
... "status": {
118+
... "conditions": [
119+
... {"type": "Ready", "reason": "DeploymentCompleted", "message": "Deployment successful."},
120+
... {"type": "Failed", "reason": "DeploymentFailed", "message": "Deployment failed due to timeout."}
121+
... ]
122+
... }
123+
... }
124+
>>> filter_conditions(obj, typ="Ready")
125+
[{'type': 'Ready', 'reason': 'DeploymentCompleted', 'message': 'Deployment successful.'}]
126+
127+
>>> filter_conditions(obj, reason="DeploymentFailed")
128+
[{'type': 'Failed', 'reason': 'DeploymentFailed', 'message': 'Deployment failed due to timeout.'}]
129+
"""
130+
131+
def _match(cond):
132+
return all([
133+
typ is None or cond["type"] == typ,
134+
reason is None or cond["reason"] == reason,
135+
message is None or cond["message"] == message,
136+
])
137+
138+
return [cond for cond in traverse(obj, "status.conditions") if _match(cond)]

backend/src/jobs_server/utils/kubernetes.py

-72
This file was deleted.

0 commit comments

Comments
 (0)