Skip to content

Commit 4c82355

Browse files
authored
Replace raw config dicts with TypedDicts (#18)
* Add Typing for custom ResourceOptions by introducing TypedDicts for K8s, Docker, and Ray ResourceOptions. their filling value can be `None`. Then, we remove the `None`s by using the `remove_none_values` function. We add generic typing to the `remove_none_value` function and coerce back to the input type for consisten type hints. As the evaluation of the functional form is not deferred and using future imports has no effect, we use Union instead of | (pipe) to be compatible with Python 3.9 * Use single K8sResourceOption type as there is no conceptual difference within the two. As they may use different resources (in the limit and request) we do make the parameters optional. This also reflects the code logic which filters `None` values. We now also handle LIMIT and REQUEST types the same. This commit leaves a TODO that we have to decide if separate logic is warranted or not. Until then we do not remove the logical branches from the code, but right as of this commit both branches act the same.
1 parent 6c5d95e commit 4c82355

File tree

4 files changed

+90
-56
lines changed

4 files changed

+90
-56
lines changed

src/jobs/job.py

+61-44
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@
77
import os
88
from dataclasses import dataclass
99
from pathlib import Path
10-
from typing import Any, Callable, Literal
10+
from typing import Any, Callable, TypedDict, Union
1111

1212
import docker.types
1313

1414
from jobs.assembler import config
1515
from jobs.assembler.renderers import RENDERERS
1616
from jobs.image import Image
17-
from jobs.types import AnyPath
17+
from jobs.types import AnyPath, K8sResourceKind
1818
from jobs.util import remove_none_values, run_command, to_rational
1919

2020

@@ -23,56 +23,73 @@ class BuildMode(enum.Enum):
2323
DOCKERFILE = "dockerfile"
2424

2525

26+
class DockerResourceOptions(TypedDict):
27+
mem_limit: str | None
28+
nano_cpus: float | None
29+
device_requests: list[docker.types.DeviceRequest] | None
30+
31+
32+
# Functional definition of TypedDict to enable special characters in dict keys
33+
K8sResourceOptions = TypedDict(
34+
"K8sResourceOptions",
35+
{
36+
"cpu": Union[str, None],
37+
"memory": Union[str, None],
38+
"nvidia.com/gpu": Union[int, None],
39+
},
40+
total=False,
41+
)
42+
43+
44+
class RayResourceOptions(TypedDict):
45+
entrypoint_memory: int | None
46+
entrypoint_num_cpus: int | None
47+
entrypoint_num_gpus: int | None
48+
49+
2650
@dataclass(frozen=True)
2751
class ResourceOptions:
2852
memory: str | None = None
2953
cpu: str | None = None
3054
gpu: int | None = None
3155

32-
def to_docker(self) -> dict[str, Any]:
33-
return remove_none_values(
34-
{
35-
"mem_limit": (
36-
str(int(to_rational(self.memory))) if self.memory else None
37-
),
38-
"nano_cpus": int(to_rational(self.cpu) * 10**9) if self.cpu else None,
39-
"device_requests": (
40-
[
41-
docker.types.DeviceRequest(
42-
capabilities=[["gpu"]],
43-
count=self.gpu,
44-
)
45-
]
46-
if self.gpu
47-
else None
48-
),
49-
}
50-
)
56+
def to_docker(self) -> DockerResourceOptions:
57+
options: DockerResourceOptions = {
58+
"mem_limit": str(int(to_rational(self.memory))) if self.memory else None,
59+
"nano_cpus": int(to_rational(self.cpu) * 10**9) if self.cpu else None,
60+
"device_requests": (
61+
[
62+
docker.types.DeviceRequest(
63+
capabilities=[["gpu"]],
64+
count=self.gpu,
65+
)
66+
]
67+
if self.gpu
68+
else None
69+
),
70+
}
71+
return remove_none_values(options)
5172

5273
def to_kubernetes(
53-
self, kind: Literal["requests", "limits"] = "requests"
54-
) -> dict[str, str]:
55-
if kind == "requests":
56-
return remove_none_values(
57-
{
58-
"cpu": self.cpu,
59-
"memory": self.memory,
60-
"nvidia.com/gpu": self.gpu,
61-
}
62-
)
63-
elif kind == "limits":
64-
return remove_none_values({"nvidia.com/gpu": self.gpu})
65-
66-
def to_ray(self) -> dict[str, Any]:
67-
return remove_none_values(
68-
{
69-
"entrypoint_memory": (
70-
int(to_rational(self.memory)) if self.memory else None
71-
),
72-
"entrypoint_num_cpus": int(to_rational(self.cpu)) if self.cpu else None,
73-
"entrypoint_num_gpus": self.gpu,
74-
}
75-
)
74+
self, kind: K8sResourceKind = K8sResourceKind.REQUESTS
75+
) -> K8sResourceOptions:
76+
# TODO: Currently kind is not accessed and the logic for "request" and "limit" is the same.
77+
# Down the road we have to decide if we want to keep it that way (and get rid of the distinction and arguments),
78+
# or if it makes sense for us to distinguish both cases.
79+
options: K8sResourceOptions = {
80+
"cpu": self.cpu or None,
81+
"memory": self.memory or None,
82+
"nvidia.com/gpu": self.gpu or None,
83+
}
84+
return remove_none_values(options)
85+
86+
def to_ray(self) -> RayResourceOptions:
87+
options: RayResourceOptions = {
88+
"entrypoint_memory": int(to_rational(self.memory)) if self.memory else None,
89+
"entrypoint_num_cpus": int(to_rational(self.cpu)) if self.cpu else None,
90+
"entrypoint_num_gpus": self.gpu or None,
91+
}
92+
return remove_none_values(options)
7693

7794

7895
@dataclass(frozen=True)

src/jobs/runner.py

+17-9
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import jobs
2020
from jobs import Image, Job
21-
from jobs.util import sanitize_rfc1123_domain_name
21+
from jobs.job import DockerResourceOptions
22+
from jobs.types import K8sResourceKind
23+
from jobs.util import remove_none_values, sanitize_rfc1123_domain_name
2224

2325
JOBS_EXECUTE_CMD = "jobs_execute"
2426

@@ -44,15 +46,19 @@ def __init__(self):
4446
def run(self, job: Job, image: Image) -> None:
4547
command = _make_executor_command(job)
4648

47-
resource_kwargs = {}
49+
resource_kwargs: DockerResourceOptions = {
50+
"mem_limit": None,
51+
"nano_cpus": None,
52+
"device_requests": None,
53+
}
4854
if job.options and (res := job.options.resources):
4955
resource_kwargs = res.to_docker()
5056

5157
container: docker.api.client.ContainerApiMixin = self._client.containers.run(
5258
image=image.tag,
5359
command=command,
5460
detach=True,
55-
**resource_kwargs,
61+
**remove_none_values(resource_kwargs),
5662
)
5763

5864
exit_code = container.wait()
@@ -83,12 +89,14 @@ def _make_job_crd(self, job: Job, image: Image) -> client.V1Job:
8389
image_pull_policy="IfNotPresent",
8490
name="dummy-job",
8591
command=_make_executor_command(job),
86-
resources={
87-
"requests": res.to_kubernetes(kind="requests"),
88-
"limits": res.to_kubernetes(kind="limits"),
89-
}
90-
if job.options and (res := job.options.resources)
91-
else None,
92+
resources=(
93+
{
94+
"requests": res.to_kubernetes(kind=K8sResourceKind.REQUESTS),
95+
"limits": res.to_kubernetes(kind=K8sResourceKind.LIMITS),
96+
}
97+
if job.options and (res := job.options.resources)
98+
else None
99+
),
92100
)
93101

94102
# Job template

src/jobs/types.py

+6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
11
import os
2+
from enum import Enum
23
from typing import Union
34

45
AnyPath = Union[os.PathLike[str], str]
6+
7+
8+
class K8sResourceKind(Enum):
9+
REQUESTS = "requests"
10+
LIMITS = "limits"

src/jobs/util.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77
import threading
88
import time
99
from io import TextIOBase
10-
from typing import IO, AnyStr, Iterable, Mapping, TextIO
10+
from typing import IO, Any, AnyStr, Iterable, Mapping, TextIO, TypeVar, cast
1111

1212
from jobs.types import AnyPath
1313

14+
T = TypeVar("T", bound=Mapping[str, Any])
15+
1416

1517
def to_rational(s: str) -> float:
1618
"""Convert a number with optional SI/binary unit to floating-point"""
@@ -42,9 +44,10 @@ def to_rational(s: str) -> float:
4244
return factor * magnitude
4345

4446

45-
def remove_none_values(d: dict) -> dict:
47+
def remove_none_values(d: T) -> T:
4648
"""Remove all keys with a ``None`` value from a dict."""
47-
return {k: v for k, v in d.items() if v is not None}
49+
filtered_dict = {k: v for k, v in d.items() if v is not None}
50+
return cast(T, filtered_dict)
4851

4952

5053
def sanitize_rfc1123_domain_name(s: str) -> str:

0 commit comments

Comments
 (0)