Skip to content

Commit 65375d5

Browse files
committed
Record Inqueue job resource request in queueAttr
1 parent 6f5c977 commit 65375d5

File tree

3 files changed

+22
-21
lines changed

3 files changed

+22
-21
lines changed

pkg/scheduler/actions/enqueue/enqueue.go

-7
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ func (enqueue *Action) Execute(ssn *framework.Session) {
6868

6969
queueMap[queue.UID] = queue
7070
queues.Push(queue)
71-
ssn.InqueueJobResource[queue.UID] = api.EmptyResource()
7271
}
7372

7473
if job.PodGroup.Status.Phase == scheduling.PodGroupPending {
@@ -78,11 +77,6 @@ func (enqueue *Action) Execute(ssn *framework.Session) {
7877
klog.V(3).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
7978
jobsMap[job.Queue].Push(job)
8079
}
81-
82-
if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue {
83-
klog.V(3).Infof("Added Job <%s/%s> into InqueueResource", job.Namespace, job.Name)
84-
ssn.InqueueJobResource[job.Queue].Add(api.NewResource(*job.PodGroup.Spec.MinResources))
85-
}
8680
}
8781

8882
klog.V(3).Infof("Try to enqueue PodGroup to %d Queues", len(jobsMap))
@@ -129,7 +123,6 @@ func (enqueue *Action) Execute(ssn *framework.Session) {
129123
if inqueue {
130124
job.PodGroup.Status.Phase = scheduling.PodGroupInqueue
131125
ssn.Jobs[job.UID] = job
132-
ssn.InqueueJobResource[job.Queue].Add(api.NewResource(*job.PodGroup.Spec.MinResources))
133126
}
134127

135128
// Added Queue back until no job in Queue.

pkg/scheduler/framework/session.go

+7-10
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,10 @@ type Session struct {
4242

4343
podGroupStatus map[api.JobID]*scheduling.PodGroupStatus
4444

45-
Jobs map[api.JobID]*api.JobInfo
46-
Nodes map[string]*api.NodeInfo
47-
Queues map[api.QueueID]*api.QueueInfo
48-
NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo
49-
InqueueJobResource map[api.QueueID]*api.Resource
45+
Jobs map[api.JobID]*api.JobInfo
46+
Nodes map[string]*api.NodeInfo
47+
Queues map[api.QueueID]*api.QueueInfo
48+
NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo
5049

5150
Tiers []conf.Tier
5251
Configurations []conf.Configuration
@@ -80,10 +79,9 @@ func openSession(cache cache.Cache) *Session {
8079

8180
podGroupStatus: map[api.JobID]*scheduling.PodGroupStatus{},
8281

83-
Jobs: map[api.JobID]*api.JobInfo{},
84-
Nodes: map[string]*api.NodeInfo{},
85-
Queues: map[api.QueueID]*api.QueueInfo{},
86-
InqueueJobResource: map[api.QueueID]*api.Resource{},
82+
Jobs: map[api.JobID]*api.JobInfo{},
83+
Nodes: map[string]*api.NodeInfo{},
84+
Queues: map[api.QueueID]*api.QueueInfo{},
8785

8886
plugins: map[string]Plugin{},
8987
jobOrderFns: map[string]api.CompareFn{},
@@ -155,7 +153,6 @@ func closeSession(ssn *Session) {
155153
ssn.jobOrderFns = nil
156154
ssn.namespaceOrderFns = nil
157155
ssn.queueOrderFns = nil
158-
ssn.InqueueJobResource = nil
159156

160157
klog.V(3).Infof("Close Session %v", ssn.UID)
161158
}

pkg/scheduler/plugins/proportion/proportion.go

+15-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package proportion
1818

1919
import (
2020
"k8s.io/klog"
21+
22+
"volcano.sh/volcano/pkg/apis/scheduling"
2123
"volcano.sh/volcano/pkg/scheduler/api"
2224
"volcano.sh/volcano/pkg/scheduler/api/helpers"
2325
"volcano.sh/volcano/pkg/scheduler/framework"
@@ -43,6 +45,8 @@ type queueAttr struct {
4345
deserved *api.Resource
4446
allocated *api.Resource
4547
request *api.Resource
48+
// inqueue represents the resource request of the inqueue job
49+
inqueue *api.Resource
4650
}
4751

4852
// New return proportion action
@@ -69,7 +73,6 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
6973
// Build attributes for Queues.
7074
for _, job := range ssn.Jobs {
7175
klog.V(4).Infof("Considering Job <%s/%s>.", job.Namespace, job.Name)
72-
7376
if _, found := pp.queueOpts[job.Queue]; !found {
7477
queue := ssn.Queues[job.Queue]
7578
attr := &queueAttr{
@@ -80,25 +83,29 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
8083
deserved: api.EmptyResource(),
8184
allocated: api.EmptyResource(),
8285
request: api.EmptyResource(),
86+
inqueue: api.EmptyResource(),
8387
}
8488
pp.queueOpts[job.Queue] = attr
8589
klog.V(4).Infof("Added Queue <%s> attributes.", job.Queue)
8690
}
8791

92+
attr := pp.queueOpts[job.Queue]
8893
for status, tasks := range job.TaskStatusIndex {
8994
if api.AllocatedStatus(status) {
9095
for _, t := range tasks {
91-
attr := pp.queueOpts[job.Queue]
9296
attr.allocated.Add(t.Resreq)
9397
attr.request.Add(t.Resreq)
9498
}
9599
} else if status == api.Pending {
96100
for _, t := range tasks {
97-
attr := pp.queueOpts[job.Queue]
98101
attr.request.Add(t.Resreq)
99102
}
100103
}
101104
}
105+
106+
if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue {
107+
attr.inqueue.Add(api.NewResource(*job.PodGroup.Spec.MinResources))
108+
}
102109
}
103110

104111
// Record metrics
@@ -242,7 +249,11 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
242249

243250
minReq := api.NewResource(*job.PodGroup.Spec.MinResources)
244251
// The queue resource quota limit has not reached
245-
return minReq.Add(attr.allocated).Add(ssn.InqueueJobResource[job.Queue]).LessEqual(api.NewResource(queue.Queue.Spec.Capability))
252+
inqueue := minReq.Add(attr.allocated).Add(attr.inqueue).LessEqual(api.NewResource(queue.Queue.Spec.Capability))
253+
if inqueue {
254+
attr.inqueue.Add(api.NewResource(*job.PodGroup.Spec.MinResources))
255+
}
256+
return inqueue
246257
})
247258

248259
// Register event handlers.

0 commit comments

Comments
 (0)