Skip to content

Commit 26b30e2

Browse files
committed
feat(backend): Implement job status endpoint
The `GET /jobs/{job_id}/status` API endpoint reports the high-level execution status of a Job (identified by its UID). The associated Kueue workload for the job is looked up and its status is used to determine the execution status (pending, executing, failed, succeeded).
1 parent 91c00e7 commit 26b30e2

File tree

6 files changed

+282
-2
lines changed

6 files changed

+282
-2
lines changed

backend/src/jobs_server/models.py

+8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import re
2+
from enum import StrEnum
23
from typing import Annotated, Any, TypeAlias
34

45
from jobs import JobOptions
@@ -49,3 +50,10 @@ class WorkloadIdentifier(BaseModel):
4950

5051
namespace: StrictStr
5152
uid: StrictStr
53+
54+
55+
class JobStatus(StrEnum):
56+
PENDING = "pending"
57+
EXECUTING = "executing"
58+
SUCCEEDED = "succeeded"
59+
FAILED = "failed"

backend/src/jobs_server/routers/jobs.py

+13-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
from fastapi import APIRouter, HTTPException
22
from jobs import Image, Job
33

4-
from jobs_server.models import CreateJobModel, ExecutionMode, WorkloadIdentifier
4+
from jobs_server.models import (
5+
CreateJobModel,
6+
ExecutionMode,
7+
JobId,
8+
WorkloadIdentifier,
9+
)
510
from jobs_server.runner import Runner
11+
from jobs_server.utils.kueue import KueueWorkload
612

713
router = APIRouter(tags=["Job management"])
814

@@ -33,3 +39,9 @@ def job_fn(): ...
3339
image = Image(opts.image_ref)
3440
workload_id = runner.run(job, image, opts.submission_context)
3541
return workload_id
42+
43+
44+
@router.get("/jobs/{uid}/status")
def status(uid: JobId, namespace: str = "default"):
    """Report the high-level execution status of a job, identified by its UID."""
    # Deliberately a plain ``def`` instead of ``async def``: the workload
    # lookup is a synchronous call, and FastAPI executes non-async path
    # operations in a worker thread, so the event loop is not blocked while
    # the lookup is in flight.
    #
    # NOTE(review): presumably this surfaces as an unhandled error (HTTP 500)
    # when no workload matches ``uid`` in ``namespace`` -- consider mapping
    # that case to a 404 response.
    workload = KueueWorkload.for_managed_resource(uid, namespace)
    return workload.execution_status

backend/src/jobs_server/utils/helpers.py

+86
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,89 @@ def remove_none_values(d: T) -> T:
1010
"""Remove all keys with a ``None`` value from a dict."""
1111
filtered_dict = {k: v for k, v in d.items() if v is not None}
1212
return cast(T, filtered_dict)
13+
14+
15+
def traverse(d: Any, key_path: str, sep: str = ".", strict: bool = True) -> Any:
    """
    Retrieve a value from a nested Mapping-like object using a key path.

    If the object behaves like a Mapping (i.e., implements `__getitem__`),
    item access is used to resolve each key.
    Otherwise, each key is resolved as an attribute of the object.

    Parameters
    ----------
    d : Any
        The object to traverse.
    key_path : str
        A string representing the path of keys, separated by `sep`.
    sep : str, optional
        The separator used to split the key path into individual keys.
        Default is ".".
    strict : bool, optional
        If False, return None when a key in the path does not exist.
        If True, raise a KeyError when a key does not exist.
        Default is True.

    Returns
    -------
    Any
        The value at the specified key path, or None if a key is missing
        and `strict` is False.

    Raises
    ------
    KeyError
        If `strict` is True and any key in the path does not exist. The
        missing key is attached to the exception.

    Examples
    --------
    >>> d = {"foo": {"bar": {"baz": 42}}}
    >>> traverse(d, "foo.bar.baz")
    42

    >>> traverse(d, "foo.bar.qux", strict=False) is None
    True

    >>> traverse(d, "foo.bar.qux", strict=True)
    Traceback (most recent call last):
        ...
    KeyError: 'qux'
    """

    def has_item(container, key):
        # Containers supporting ``in`` are checked by membership; anything
        # else is checked for an attribute of that name.
        if hasattr(container, "__contains__"):
            return key in container
        return hasattr(container, key)

    def get_item(container, key, default=None):
        # Prefer item access when available, falling back to attribute access.
        if hasattr(container, "__getitem__"):
            try:
                return container[key]
            except (KeyError, IndexError, TypeError):
                return default
        return getattr(container, key, default)

    for key in key_path.split(sep):
        # Bail out on missing keys in strict mode, naming the offending key.
        if strict and not has_item(d, key):
            raise KeyError(key)

        d = get_item(d, key)

        # In lenient mode, a missing (or None) intermediate value ends the
        # traversal. In strict mode the loop continues, so that remaining
        # keys that cannot be resolved on a None value raise a KeyError
        # instead of being silently ignored.
        if d is None and not strict:
            return None
    return d

backend/src/jobs_server/utils/kubernetes.py

+69
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from jobs.job import Job
99

1010
from jobs_server.models import SubmissionContext
11+
from jobs_server.utils.helpers import traverse
1112

1213

1314
def sanitize_rfc1123_domain_name(s: str) -> str:
@@ -70,3 +71,71 @@ def namespace(self) -> str:
7071
_, active_context = kubernetes.config.list_kube_config_contexts()
7172
current_namespace = active_context["context"].get("namespace")
7273
return self._namespace or current_namespace
74+
75+
76+
def filter_conditions(
    obj: dict[str, Any],
    typ: str | None = None,
    reason: str | None = None,
    message: str | None = None,
):
    """
    Select the conditions of a Kubernetes object that match the given attributes.

    The conditions are read from the object's `status.conditions` field. A
    condition is kept only if it equals every filter value that was supplied;
    filters left as `None` are ignored. With no filters at all, every
    condition is returned.

    Parameters
    ----------
    obj : dict[str, Any]
        The Kubernetes object whose `status.conditions` field is inspected.
    typ : str, optional
        Required value of a condition's `type`. Ignored if `None`.
    reason : str, optional
        Required value of a condition's `reason`. Ignored if `None`.
    message : str, optional
        Required value of a condition's `message`. Ignored if `None`.

    Returns
    -------
    list[dict[str, Any]]
        The matching conditions, in their original order. Empty if nothing
        matches.

    Notes
    -----
    - `status.conditions` must exist on the object and be iterable.
    - A condition lacking a key that is being filtered on raises a `KeyError`.

    Examples
    --------
    >>> obj = {
    ...     "status": {
    ...         "conditions": [
    ...             {"type": "Ready", "reason": "DeploymentCompleted", "message": "Deployment successful."},
    ...             {"type": "Failed", "reason": "DeploymentFailed", "message": "Deployment failed due to timeout."}
    ...         ]
    ...     }
    ... }
    >>> filter_conditions(obj, typ="Ready")
    [{'type': 'Ready', 'reason': 'DeploymentCompleted', 'message': 'Deployment successful.'}]

    >>> filter_conditions(obj, reason="DeploymentFailed")
    [{'type': 'Failed', 'reason': 'DeploymentFailed', 'message': 'Deployment failed due to timeout.'}]
    """
    # Only the attributes for which a filter value was given take part in
    # matching; insertion order fixes the order in which keys are read.
    active_filters = {
        field: wanted
        for field, wanted in (("type", typ), ("reason", reason), ("message", message))
        if wanted is not None
    }

    def _matches(condition):
        # Every active comparison is evaluated (no short-circuiting), so each
        # filtered key is always looked up on the condition.
        outcomes = [
            condition[field] == wanted for field, wanted in active_filters.items()
        ]
        return all(outcomes)

    return [
        condition
        for condition in traverse(obj, "status.conditions")
        if _matches(condition)
    ]

backend/src/jobs_server/utils/kueue.py

+76-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
from collections.abc import Mapping
2-
from typing import cast
2+
from typing import Any, cast
33

44
from jobs.job import Job
55
from jobs.utils.helpers import remove_none_values
66
from kubernetes import client
7+
from pydantic import BaseModel, ConfigDict
8+
9+
from jobs_server.models import JobId, JobStatus
10+
from jobs_server.utils.helpers import traverse
11+
from jobs_server.utils.kubernetes import filter_conditions
712

813

914
def assert_kueue_localqueue(namespace: str, name: str) -> bool:
@@ -63,3 +68,73 @@ def kueue_scheduling_labels(job: Job, namespace: str) -> Mapping[str, str]:
6368
),
6469
}),
6570
)
71+
72+
73+
def workload_by_managed_uid(uid: JobId, namespace: str):
    """Find a Kueue Workload by the UID of its underlying job.

    Lists the ``workloads`` custom objects of ``kueue.x-k8s.io/v1beta1`` in
    ``namespace`` whose ``kueue.x-k8s.io/job-uid`` label equals ``uid``.

    Returns the first object in the result, or ``None`` if the result has no
    items.
    """
    custom_objects = client.CustomObjectsApi()
    response = custom_objects.list_namespaced_custom_object(
        "kueue.x-k8s.io",
        "v1beta1",
        namespace,
        "workloads",
        label_selector=f"kueue.x-k8s.io/job-uid={uid}",
    )
    matches = response.get("items")

    # Covers both a missing ``items`` entry and an empty result list.
    return matches[0] if matches else None
88+
89+
90+
class WorkloadSpec(BaseModel):
    """``spec`` section of a Kueue Workload.

    Only ``podSets`` is required for validation. The remaining fields carry
    defaults so that a workload object which omits them (for example one
    without a priority class name) can still be validated.

    NOTE(review): optionality follows the Kueue v1beta1 API reference, which
    presumably marks only ``podSets`` as required -- confirm against the
    Kueue version in use.
    """

    podSets: list
    queueName: str | None = None
    priorityClassName: str | None = None
    priority: int | None = None
    priorityClassSource: str | None = None
    # Assumes Kueue's documented default of ``true`` when the field is absent
    # -- TODO confirm.
    active: bool = True
97+
98+
99+
class WorkloadAdmission(BaseModel):
    """Admission details of a Kueue Workload.

    Both fields are required for validation to succeed.
    """

    clusterQueue: str
    podSetAssignments: list
102+
103+
104+
class WorkloadStatus(BaseModel):
    """``status`` section of a Kueue Workload.

    Every field is optional, so that a status without any entries still
    validates.
    """

    # A workload may not carry any conditions yet (presumably before its first
    # reconciliation -- confirm), so default to "no conditions" instead of
    # failing validation. Pydantic copies field defaults per instance, so the
    # list literal is not shared between models.
    conditions: list[dict[str, Any]] = []
    admission: WorkloadAdmission | None = None
    requeueState: Any | None = None
    reclaimablePods: list | None = None
    admissionChecks: list | None = None
110+
111+
112+
class KueueWorkload(BaseModel):
    """Validated representation of a Kueue Workload resource.

    See https://kueue.sigs.k8s.io/docs/reference/kueue.v1beta1/#kueue-x-k8s-io-v1beta1-Workload.
    """

    metadata: dict[str, Any]
    spec: WorkloadSpec
    status: WorkloadStatus

    model_config = ConfigDict(arbitrary_types_allowed=False)

    @classmethod
    def for_managed_resource(cls, uid: str, namespace: str):
        """Load the workload belonging to the job with the given UID."""
        # NOTE(review): the lookup yields None when nothing matches, and that
        # value is passed to ``model_validate`` unchanged -- presumably this
        # ends in a validation error; confirm the intended behavior.
        return cls.model_validate(workload_by_managed_uid(uid, namespace))

    @property
    def execution_status(self) -> JobStatus:
        """Derive the job's execution status from the workload's status.

        A condition with reason ``"Succeeded"`` takes precedence, followed by
        one with reason ``"Failed"``. Without either, the workload counts as
        executing if ``status.admission`` is set, and as pending otherwise.
        """
        if filter_conditions(self, reason="Succeeded"):
            return JobStatus.SUCCEEDED
        if filter_conditions(self, reason="Failed"):
            return JobStatus.FAILED

        admission = traverse(self, "status.admission", strict=False)
        return JobStatus.PENDING if admission is None else JobStatus.EXECUTING

backend/tests/unit/test_util.py

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from typing import Any
2+
3+
import pytest
4+
5+
from jobs_server.utils.helpers import traverse
6+
7+
8+
@pytest.mark.parametrize(
    "d, key_path, sep, strict, expected",
    [
        # Non-strict mode
        ({"foo": {"bar": {"baz": 42}}}, "foo.bar.baz", ".", False, 42),
        ({"foo": {"bar": {"baz": 42}}}, "foo.bar.qux", ".", False, None),
        ({"foo": {"bar": {"baz": 42}}}, "foo.qux.baz", ".", False, None),
        ({"foo": {"bar": {"baz": None}}}, "foo.bar.baz", ".", False, None),
        # Strict mode
        ({"foo": {"bar": {"baz": 42}}}, "foo.bar.baz", ".", True, 42),
        ({"foo": {"bar": {"baz": 42}}}, "foo.qux.baz", ".", True, KeyError),
        ({"foo": {"bar": {"baz": 42}}}, "foo-bar-qux", "-", True, KeyError),
        ({"foo": {"bar": {"baz": None}}}, "foo.bar.baz", ".", True, None),
    ],
)
def test_path_dict(
    d: dict[str, Any], key_path: str, sep: str, strict: bool, expected: Any
):
    """Check ``traverse`` on nested dicts in strict and non-strict mode.

    When ``expected`` is a type (an exception class) in a strict-mode case,
    the call must raise it; otherwise the call must return ``expected``.
    """
    expects_error = strict and isinstance(expected, type)

    if not expects_error:
        assert traverse(d, key_path, sep, strict) == expected
        return

    with pytest.raises(expected):
        traverse(d, key_path, sep, strict)

0 commit comments

Comments
 (0)