@@ -69,8 +69,45 @@ def __init__(self, **kwargs: str) -> None:
69
69
self ._queue = kwargs .get ("local_queue" , "user-queue" )
70
70
config .load_kube_config ()
71
71
72
- def _make_job_crd (self , job : Job , image : Image ) -> client .V1Job :
72
+ def _make_job_crd (self , job : Job , image : Image , namespace : str ) -> client .V1Job :
73
+ def _assert_kueue_localqueue (name : str ) -> bool :
74
+ try :
75
+ _ = client .CustomObjectsApi ().get_namespaced_custom_object (
76
+ "kueue.x-k8s.io" ,
77
+ "v1beta1" ,
78
+ namespace ,
79
+ "localqueues" ,
80
+ name ,
81
+ )
82
+ return True
83
+ except client .exceptions .ApiException :
84
+ return False
85
+
86
+ def _assert_kueue_workloadpriorityclass (name : str ) -> bool :
87
+ try :
88
+ _ = client .CustomObjectsApi ().get_cluster_custom_object (
89
+ "kueue.x-k8s.io" ,
90
+ "v1beta1" ,
91
+ "workloadpriorityclasses" ,
92
+ name ,
93
+ )
94
+ return True
95
+ except client .exceptions .ApiException :
96
+ return False
97
+
73
98
sched_opts = job .options .scheduling
99
+ if sched_opts :
100
+ if queue := sched_opts .queue_name :
101
+ if not _assert_kueue_localqueue (queue ):
102
+ raise ValueError (
103
+ f"Specified Kueue local queue does not exist: { queue !r} "
104
+ )
105
+ if pc := sched_opts .priority_class :
106
+ if not _assert_kueue_workloadpriorityclass (pc ):
107
+ raise ValueError (
108
+ f"Specified Kueue workload priority class does not exist: { pc !r} "
109
+ )
110
+
74
111
metadata = client .V1ObjectMeta (
75
112
generate_name = sanitize_rfc1123_domain_name (job .name ),
76
113
labels = remove_none_values (
@@ -121,7 +158,7 @@ def run(self, job: Job, image: Image) -> None:
121
158
_ , active_context = config .list_kube_config_contexts ()
122
159
current_namespace = active_context ["context" ].get ("namespace" )
123
160
124
- k8s_job = self ._make_job_crd (job , image )
161
+ k8s_job = self ._make_job_crd (job , image , current_namespace )
125
162
logging .debug (k8s_job )
126
163
127
164
batch_api = client .BatchV1Api ()
0 commit comments