Commit 653d157

Overhaul node resource fixing (#432)
* overhaul node resource fixing
* examine monopoly volume usage
1 parent 3e878d3 commit 653d157

1 file changed: +77 -58 lines

cluster/calcium/resource.go

@@ -67,69 +67,95 @@ func (c *Calcium) doGetNodeResource(ctx context.Context, nodename string, fix bo
 			Workloads: workloads, Diffs: []string{},
 		}
 
-		cpus := 0.0
-		memory := int64(0)
-		storage := int64(0)
-		cpumap := types.CPUMap{}
-		volumes := int64(0)
-		volumeMap := types.VolumeMap{}
+		cpuByWorkloads := 0.0
+		memoryByWorkloads := int64(0)
+		storageByWorkloads := int64(0) // volume inclusive
+		cpumapByWorkloads := types.CPUMap{}
+		volumeByWorkloads := int64(0)
+		volumeMapByWorkloads := types.VolumeMap{}
+		monopolyVolumeByWorkloads := map[string][]string{} // volume -> array of workload id
 		for _, workload := range workloads {
-			cpus = utils.Round(cpus + workload.CPUQuotaRequest)
-			memory += workload.MemoryRequest
-			storage += workload.StorageRequest
-			cpumap.Add(workload.CPU)
-			for _, vmap := range workload.VolumePlanRequest {
-				volumes += vmap.Total()
-				volumeMap.Add(vmap)
+			cpuByWorkloads = utils.Round(cpuByWorkloads + workload.CPUQuotaRequest)
+			memoryByWorkloads += workload.MemoryRequest
+			storageByWorkloads += workload.StorageRequest
+			cpumapByWorkloads.Add(workload.CPU)
+			for vb, vmap := range workload.VolumePlanRequest {
+				volumeByWorkloads += vmap.Total()
+				volumeMapByWorkloads.Add(vmap)
+				if vb.RequireScheduleMonopoly() {
+					monopolyVolumeByWorkloads[vmap.GetResourceID()] = append(monopolyVolumeByWorkloads[vmap.GetResourceID()], workload.ID)
+				}
 			}
 		}
-		nr.CPUPercent = cpus / float64(len(node.InitCPU))
-		nr.MemoryPercent = float64(memory) / float64(node.InitMemCap)
+		nr.CPUPercent = cpuByWorkloads / float64(len(node.InitCPU))
+		nr.MemoryPercent = float64(memoryByWorkloads) / float64(node.InitMemCap)
 		nr.NUMAMemoryPercent = map[string]float64{}
 		nr.VolumePercent = float64(node.VolumeUsed) / float64(node.InitVolume.Total())
 		for nodeID, nmemory := range node.NUMAMemory {
 			if initMemory, ok := node.InitNUMAMemory[nodeID]; ok {
 				nr.NUMAMemoryPercent[nodeID] = float64(nmemory) / float64(initMemory)
 			}
 		}
-		if cpus != node.CPUUsed {
-			nr.Diffs = append(nr.Diffs, fmt.Sprintf("cpus used: %f diff: %f", node.CPUUsed, cpus))
+
+		// cpu
+		if cpuByWorkloads != node.CPUUsed {
+			nr.Diffs = append(nr.Diffs,
+				fmt.Sprintf("node.CPUUsed != sum(workload.CPURequest): %.2f != %.2f", node.CPUUsed, cpuByWorkloads))
 		}
-		node.CPU.Add(cpumap)
+		node.CPU.Add(cpumapByWorkloads)
 		for i, v := range node.CPU {
 			if node.InitCPU[i] != v {
-				nr.Diffs = append(nr.Diffs, fmt.Sprintf("cpu %s diff %d", i, node.InitCPU[i]-v))
+				nr.Diffs = append(nr.Diffs,
+					fmt.Sprintf("\tsum(workload.CPU[%s]) + node.CPU[%s] != node.InitCPU[%s]: %d + %d != %d", i, i, i,
+						cpumapByWorkloads[i], node.CPU[i]-cpumapByWorkloads[i], node.InitCPU[i]))
 			}
 		}
 
-		if memory+node.MemCap != node.InitMemCap {
-			nr.Diffs = append(nr.Diffs, fmt.Sprintf("memory used: %d, diff %d", node.MemCap, node.InitMemCap-(memory+node.MemCap)))
+		// memory
+		if memoryByWorkloads+node.MemCap != node.InitMemCap {
+			nr.Diffs = append(nr.Diffs,
+				fmt.Sprintf("node.MemCap + sum(workload.memoryRequest) != node.InitMemCap: %d + %d != %d",
+					node.MemCap, memoryByWorkloads, node.InitMemCap))
 		}
 
+		// storage
 		nr.StoragePercent = 0
 		if node.InitStorageCap != 0 {
-			nr.StoragePercent = float64(storage) / float64(node.InitStorageCap)
-			if storage+node.StorageCap != node.InitStorageCap {
-				nr.Diffs = append(nr.Diffs, fmt.Sprintf("storage used: %d, diff %d", node.StorageCap, node.InitStorageCap-(storage+node.StorageCap)))
-			}
+			nr.StoragePercent = float64(storageByWorkloads) / float64(node.InitStorageCap)
+		}
+		if storageByWorkloads+node.StorageCap != node.InitStorageCap {
+			nr.Diffs = append(nr.Diffs, fmt.Sprintf("sum(workload.storageRequest) + node.StorageCap != node.InitStorageCap: %d + %d != %d", storageByWorkloads, node.StorageCap, node.InitStorageCap))
 		}
 
-		if node.VolumeUsed != volumes {
-			nr.Diffs = append(nr.Diffs, fmt.Sprintf("volumes used: %d diff: %d", node.VolumeUsed, volumes))
+		// volume
+		if node.VolumeUsed != volumeByWorkloads {
+			nr.Diffs = append(nr.Diffs, fmt.Sprintf("node.VolumeUsed != sum(workload.VolumeRequest): %d != %d", node.VolumeUsed, volumeByWorkloads))
 		}
-		node.Volume.Add(volumeMap)
-		for vol, cap := range node.Volume {
-			if node.InitVolume[vol] != cap {
-				nr.Diffs = append(nr.Diffs, fmt.Sprintf("volume %s diff %d", vol, node.InitVolume[vol]-cap))
+		node.Volume.Add(volumeMapByWorkloads)
+		for i, v := range node.Volume {
+			if node.InitVolume[i] != v {
+				nr.Diffs = append(nr.Diffs, fmt.Sprintf("\tsum(workload.Volume[%s]) + node.Volume[%s] != node.InitVolume[%s]: %d + %d != %d",
+					i, i, i,
+					volumeMapByWorkloads[i], node.Volume[i]-volumeMapByWorkloads[i], node.InitVolume[i]))
 			}
 		}
+		for vol, ids := range monopolyVolumeByWorkloads {
+			if len(ids) > 1 {
+				nr.Diffs = append(nr.Diffs, fmt.Sprintf("\tmonopoly volume used by multiple workloads: %s, %+v", vol, ids))
+			}
+		}
+
+		// volume and storage
+		if node.InitStorageCap < node.InitVolume.Total() {
+			nr.Diffs = append(nr.Diffs, fmt.Sprintf("init storage < init volumes: %d < %d", node.InitStorageCap, node.InitVolume.Total()))
+		}
 
-		if err := node.Engine.ResourceValidate(ctx, cpus, cpumap, memory, storage); err != nil {
+		if err := node.Engine.ResourceValidate(ctx, cpuByWorkloads, cpumapByWorkloads, memoryByWorkloads, storageByWorkloads); err != nil {
			nr.Diffs = append(nr.Diffs, err.Error())
 		}
 
 		if fix {
-			if err := c.doFixDiffResource(ctx, node, cpus, memory, storage, volumes); err != nil {
+			if err := c.doFixDiffResource(ctx, node, cpuByWorkloads, memoryByWorkloads, storageByWorkloads, volumeByWorkloads); err != nil {
 				log.Warnf(ctx, "[doGetNodeResource] fix node resource failed %v", err)
 			}
 		}
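Aside: every consistency check the hunk above adds follows the same shape: sum what the workloads on the node request, then assert that used-by-workloads plus free-on-node equals the node's initial capacity, and that each monopoly volume is held by at most one workload. The sketch below shows that pattern in a minimal, self-contained form; the types, field names, and numbers are simplified stand-ins for illustration, not Eru's real types.Node / types.Workload.

package main

import "fmt"

// Simplified stand-ins for the real eru core types; field names are illustrative only.
type workload struct {
	ID             string
	MemoryRequest  int64
	MonopolyVolume string // "" means no exclusive volume requested
}

type node struct {
	InitMemCap int64 // memory the node was registered with
	MemCap     int64 // memory the scheduler still considers free
}

// diffNode reproduces the checking pattern from doGetNodeResource:
// aggregate per-workload requests, then compare against node counters.
func diffNode(n node, workloads []workload) (diffs []string) {
	memByWorkloads := int64(0)
	monopolyOwners := map[string][]string{} // volume -> workload IDs

	for _, w := range workloads {
		memByWorkloads += w.MemoryRequest
		if w.MonopolyVolume != "" {
			monopolyOwners[w.MonopolyVolume] = append(monopolyOwners[w.MonopolyVolume], w.ID)
		}
	}

	// Invariant: free + sum(requests) == initial capacity.
	if memByWorkloads+n.MemCap != n.InitMemCap {
		diffs = append(diffs, fmt.Sprintf(
			"node.MemCap + sum(workload.memoryRequest) != node.InitMemCap: %d + %d != %d",
			n.MemCap, memByWorkloads, n.InitMemCap))
	}
	// Invariant: a monopoly volume is owned by at most one workload.
	for vol, ids := range monopolyOwners {
		if len(ids) > 1 {
			diffs = append(diffs, fmt.Sprintf(
				"monopoly volume used by multiple workloads: %s, %+v", vol, ids))
		}
	}
	return diffs
}

func main() {
	// 1024 free on the node, but workloads only account for 2048 of the 4096 total.
	n := node{InitMemCap: 4096, MemCap: 1024}
	ws := []workload{
		{ID: "a", MemoryRequest: 1024, MonopolyVolume: "/dev/vdb"},
		{ID: "b", MemoryRequest: 1024, MonopolyVolume: "/dev/vdb"},
	}
	for _, d := range diffNode(n, ws) {
		fmt.Println(d)
	}
}

In the real code the same check repeats for CPU, memory, storage, and volumes, and every violated invariant is appended to nr.Diffs.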
@@ -138,32 +164,25 @@ func (c *Calcium) doGetNodeResource(ctx context.Context, nodename string, fix bo
 	})
 }
 
-func (c *Calcium) doFixDiffResource(ctx context.Context, node *types.Node, cpus float64, memory, storage, volumes int64) error {
+func (c *Calcium) doFixDiffResource(ctx context.Context, node *types.Node, cpuByWorkloads float64, memoryByWorkloads, storageByWorkloads, volumeByWorkloads int64) (err error) {
 	var n *types.Node
-	var err error
-	return utils.Txn(ctx,
-		func(ctx context.Context) error {
-			if n, err = c.GetNode(ctx, node.Name); err != nil {
-				return err
-			}
-			n.CPUUsed = cpus
-			for i, v := range node.CPU {
-				n.CPU[i] += node.InitCPU[i] - v
-			}
-			n.MemCap += node.InitMemCap - (memory + node.MemCap)
-			n.StorageCap += node.InitStorageCap - (storage + node.StorageCap)
-			n.VolumeUsed = volumes
-			for vol, cap := range node.Volume {
-				n.Volume[vol] += node.InitVolume[vol] - cap
-			}
-			return nil
-		},
-		func(ctx context.Context) error {
-			return errors.WithStack(c.store.UpdateNodes(ctx, n))
-		},
-		nil,
-		c.config.GlobalTimeout,
-	)
+	if n, err = c.GetNode(ctx, node.Name); err != nil {
+		return err
+	}
+	n.CPUUsed = cpuByWorkloads
+	for i, v := range node.CPU {
+		n.CPU[i] += node.InitCPU[i] - v
+	}
+	n.MemCap = node.InitMemCap - memoryByWorkloads
+	if n.InitStorageCap < n.InitVolume.Total() {
+		n.InitStorageCap = n.InitVolume.Total()
+	}
+	n.StorageCap = n.InitStorageCap - storageByWorkloads
+	n.VolumeUsed = volumeByWorkloads
+	for i, v := range node.Volume {
+		n.Volume[i] += node.InitVolume[i] - v
+	}
+	return errors.WithStack(c.store.UpdateNodes(ctx, n))
 }
 
 func (c *Calcium) doAllocResource(ctx context.Context, nodeMap map[string]*types.Node, opts *types.DeployOptions) ([]resourcetypes.ResourcePlans, map[string]int, error) {
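Aside: the rewritten doFixDiffResource drops the utils.Txn wrapper and the relative += adjustments, and instead recomputes each counter as an absolute value (initial capacity minus the per-workload sums), raises InitStorageCap to at least InitVolume.Total() because storage is counted volume-inclusive, and persists the node with a single UpdateNodes call. Below is a small worked sketch of that recompute-from-scratch idea; the helper and the numbers are illustrative, not Eru code.

package main

import "fmt"

// fixCounters shows the "recompute, don't adjust" approach used by the new
// doFixDiffResource: whatever the drifted counters currently say, overwrite
// them with initial capacity minus what the workloads actually request.
func fixCounters(initMemCap, memByWorkloads, initStorageCap, storageByWorkloads int64) (memCap, storageCap int64) {
	memCap = initMemCap - memByWorkloads
	storageCap = initStorageCap - storageByWorkloads
	return memCap, storageCap
}

func main() {
	// Suppose the node was registered with 8 GiB memory and 100 GiB storage,
	// and its workloads request 3 GiB / 40 GiB in total; the stored counters
	// are ignored and simply overwritten.
	memCap, storageCap := fixCounters(8<<30, 3<<30, 100<<30, 40<<30)
	fmt.Println(memCap == 5<<30, storageCap == 60<<30) // true true
}

Recomputing from the initial capacity makes the fix independent of whatever the stored counters happen to contain, rather than patching them with relative adjustments.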
