-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlist.py
94 lines (83 loc) · 3.28 KB
/
list.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import argparse
import operator
from datetime import datetime, timezone
from typing import Any
from humanize import naturaltime, precisedelta
from rich import box
from rich.console import Console
from rich.table import Table
import openapi_client
from openapi_client.models import JobStatus
from .util import with_job_mgmt_api
@with_job_mgmt_api
def list_workloads(
client: openapi_client.JobManagementApi, args: argparse.Namespace
) -> None:
def format_status(s: JobStatus) -> str:
match s:
case JobStatus.SUCCEEDED:
return "[bright_green]" + s.value + "[/]"
case JobStatus.FAILED:
return "[bright_red bold]" + s.value + "[/]"
case _:
return s.value
def status_flags(wl: openapi_client.WorkloadMetadata) -> str:
if wl.was_evicted or wl.was_inadmissible:
return "[bright_yellow] [!][/]"
# if the job is already failed, we don't really need to warn anymore.
elif wl.has_failed_pods and wl.execution_status != JobStatus.FAILED:
return "[bright_red] [!][/]"
else:
return ""
resp = client.list_jobs_jobs_get(include_metadata=True)
t = Table(box=box.MINIMAL, show_lines=True, pad_edge=False)
t.add_column("Name", min_width=36) # accommodate for the workload UUID
t.add_column("Type")
t.add_column("Status")
t.add_column("Queue name")
t.add_column("Priority")
t.add_column("Submitted")
t.add_column("Execution time")
now = datetime.now(tz=timezone.utc).replace(microsecond=0)
for wl in sorted(
resp, key=operator.attrgetter("metadata.submission_timestamp"), reverse=True
):
meta = wl.metadata
cluster_queue = (
meta.kueue_status.admission.cluster_queue
if meta.kueue_status and meta.kueue_status.admission
else None
)
t.add_row(
f"{wl.name}{status_flags(meta)}\n[bright_black]{wl.id.uid}[/]",
f"[bright_black]{wl.id.group}/{wl.id.version}/[/]{wl.id.kind}",
f"{format_status(meta.execution_status)}",
f"{meta.spec.queue_name}\n[bright_black]↳ {cluster_queue}[/]",
f"{meta.spec.priority_class_name or '[bright_black]None[/]'}",
f"{naturaltime(meta.submission_timestamp)}",
f"{precisedelta((meta.termination_timestamp or now) - meta.last_admission_timestamp) if meta.last_admission_timestamp else '---'}",
)
Console().print(t)
def add_parser(subparsers: Any, parent: argparse.ArgumentParser) -> None:
# jobby status, the status querying command
parser: argparse.ArgumentParser = subparsers.add_parser(
"list",
parents=[parent],
description="List all previously dispatched jobs.",
)
# unique identifier of the job
parser.add_argument(
"--limit",
metavar="<N>",
default=None,
help="Limit the listing to only a number of the most recent workloads.",
)
parser.add_argument(
"--filter",
metavar="<cond>",
action="append",
help="Filter existing workloads by a condition of the form <key>=<value> "
"(e.g. status='succeeded'). Can be supplied multiple times for multiple "
"conditions.",
)
parser.set_defaults(func=list_workloads)