
Commit 0f7c423

simplify realloc: accept single container ID
1 parent 7f41a7a commit 0f7c423

File tree

9 files changed: +5997 −3705 lines changed


cluster/calcium/realloc.go

+88 −221
@@ -2,265 +2,132 @@ package calcium

 import (
     "context"
-    "sync"

     "github.com/pkg/errors"
     enginetypes "github.com/projecteru2/core/engine/types"
     "github.com/projecteru2/core/resources"
     resourcetypes "github.com/projecteru2/core/resources/types"
     "github.com/projecteru2/core/types"
     "github.com/projecteru2/core/utils"
-    log "github.com/sirupsen/logrus"
 )

-// nodename -> container list
-type nodeContainers map[string][]*types.Container
-
-// ReallocResource allow realloc container resource
-func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOptions) (chan *types.ReallocResourceMessage, error) {
-    ch := make(chan *types.ReallocResourceMessage)
-    go func() {
-        defer close(ch)
-        if err := c.withContainersLocked(ctx, opts.IDs, func(containers map[string]*types.Container) error {
-            // Pod-Node-Containers
-            containersInfo := map[*types.Pod]nodeContainers{}
-            // Pod cache
-            podCache := map[string]*types.Pod{}
-            var err error
-            for _, container := range containers {
-                pod, ok := podCache[container.Podname]
-                if !ok {
-                    pod, err = c.store.GetPod(ctx, container.Podname)
-                    if err != nil {
-                        ch <- &types.ReallocResourceMessage{
-                            ContainerID: container.ID,
-                            Error:       err,
-                        }
-                        continue
-                    }
-                    podCache[container.Podname] = pod
-                    containersInfo[pod] = nodeContainers{}
-                }
-                if _, ok = containersInfo[pod][container.Nodename]; !ok {
-                    containersInfo[pod][container.Nodename] = []*types.Container{}
-                }
-                containersInfo[pod][container.Nodename] = append(containersInfo[pod][container.Nodename], container)
-            }
-
-            wg := sync.WaitGroup{}
-            wg.Add(len(containersInfo))
-            // deal with normal container
-            for _, nodeContainersInfo := range containersInfo {
-                go func(nodeContainersInfo nodeContainers) {
-                    defer wg.Done()
-                    c.doReallocContainersOnPod(ctx, ch, nodeContainersInfo, opts)
-                }(nodeContainersInfo)
-            }
-            wg.Wait()
-            return nil
-        }); err != nil {
-            log.Errorf("[ReallocResource] Realloc failed %+v", err)
-            for _, ID := range opts.IDs {
-                ch <- &types.ReallocResourceMessage{
-                    ContainerID: ID,
-                    Error:       err,
-                }
-            }
+func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOptions) (err error) {
+    return c.withContainerLocked(ctx, opts.ID, func(container *types.Container) error {
+        rrs, err := resources.MakeRequests(
+            types.ResourceOptions{
+                CPUQuotaRequest: container.CPUQuotaRequest + opts.ResourceOpts.CPUQuotaRequest,
+                CPUQuotaLimit:   container.CPUQuotaLimit + opts.ResourceOpts.CPUQuotaLimit,
+                CPUBind:         types.ParseTriOption(opts.CPUBindOpts, len(container.CPU) > 0),
+                MemoryRequest:   container.MemoryRequest + opts.ResourceOpts.MemoryRequest,
+                MemoryLimit:     container.MemoryLimit + opts.ResourceOpts.MemoryLimit,
+                StorageRequest:  container.StorageRequest + opts.ResourceOpts.StorageRequest,
+                StorageLimit:    container.StorageLimit + opts.ResourceOpts.StorageLimit,
+                VolumeRequest:   types.MergeVolumeBindings(container.VolumeRequest, opts.ResourceOpts.VolumeRequest),
+                VolumeLimit:     types.MergeVolumeBindings(container.VolumeLimit, opts.ResourceOpts.VolumeLimit),
+            },
+        )
+        if err != nil {
+            return errors.WithStack(err)
         }
-    }()
-    return ch, nil
+        return c.doReallocOnNode(ctx, container.Nodename, container, rrs)
+    })
 }

-// group containers by node and requests
-func (c *Calcium) doReallocContainersOnPod(ctx context.Context, ch chan *types.ReallocResourceMessage, nodeContainersInfo nodeContainers, opts *types.ReallocOptions) {
-    hardVbsMap := map[string]types.VolumeBindings{}
-    containerGroups := map[string]map[resourcetypes.ResourceRequests][]*types.Container{}
-    for nodename, containers := range nodeContainersInfo {
-        containerGroups[nodename] = map[resourcetypes.ResourceRequests][]*types.Container{}
-        for _, container := range containers {
-            if err := func() (err error) {
-                var (
-                    autoVbsRequest, autoVbsLimit types.VolumeBindings
-                    rrs                          resourcetypes.ResourceRequests
-                )
-                autoVbsRequest, hardVbsMap[container.ID] = types.MergeVolumeBindings(container.VolumeRequest, opts.ResourceOpts.VolumeRequest, opts.ResourceOpts.Volumes).Divide()
-                autoVbsLimit, _ = types.MergeVolumeBindings(container.VolumeLimit, opts.ResourceOpts.VolumeLimit, opts.Volumes).Divide()
-
-                rrs, err = resources.MakeRequests(
-                    types.ResourceOptions{
-                        CPUQuotaRequest: container.CPUQuotaRequest + opts.ResourceOpts.CPUQuotaRequest + opts.CPU,
-                        CPUQuotaLimit:   container.CPUQuotaLimit + opts.CPULimit + opts.CPU,
-                        CPUBind:         types.ParseTriOption(opts.BindCPUOpt, len(container.CPURequest) > 0),
-                        MemoryRequest:   container.MemoryRequest + opts.MemoryRequest + opts.Memory,
-                        MemoryLimit:     container.MemoryLimit + opts.MemoryLimit + opts.Memory,
-                        MemorySoft:      types.ParseTriOption(opts.MemoryLimitOpt, container.SoftLimit),
-                        VolumeRequest:   autoVbsRequest,
-                        VolumeLimit:     autoVbsLimit,
-                        StorageRequest:  container.StorageRequest + opts.StorageRequest + opts.Storage,
-                        StorageLimit:    container.StorageLimit + opts.StorageLimit + opts.Storage,
-                    })
-
-                containerGroups[nodename][rrs] = append(containerGroups[nodename][rrs], container)
-                return
-
-            }(); err != nil {
-                log.Errorf("[ReallocResource.doReallocContainersOnPod] Realloc failed: %+v", err)
-                ch <- &types.ReallocResourceMessage{Error: err}
-                return
-            }
+// transaction: node resource
+func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, container *types.Container, rrs resourcetypes.ResourceRequests) error {
+    return c.withNodeLocked(ctx, nodename, func(node *types.Node) error {
+        node.RecycleResources(&container.Resource1)
+        _, total, planMap, err := resources.SelectNodesByResourceRequests(rrs, map[string]*types.Node{node.Name: node})
+        if err != nil {
+            return errors.WithStack(err)
         }
-    }
-
-    for nodename, containerByApps := range containerGroups {
-        for rrs, containers := range containerByApps {
-            if err := c.doReallocContainersOnNode(ctx, ch, nodename, containers, rrs, hardVbsMap); err != nil {
-
-                log.Errorf("[ReallocResource.doReallocContainersOnPod] Realloc failed: %+v", err)
-                ch <- &types.ReallocResourceMessage{Error: err}
-            }
+        if total != 1 {
+            return errors.WithStack(types.ErrInsufficientRes)
         }
-    }
-}

-// transaction: node meta
-func (c *Calcium) doReallocContainersOnNode(ctx context.Context, ch chan *types.ReallocResourceMessage, nodename string, containers []*types.Container, rrs resourcetypes.ResourceRequirements, hardVbsMap map[string]types.VolumeBindings) (err error) {
-    {
-        return c.withNodeLocked(ctx, nodename, func(node *types.Node) error {
+        originalContainer := *container
+        return utils.Txn(
+            ctx,

-            for _, container := range containers {
-                recycleResources(node, container)
-            }
-            planMap, total, _, err := resources.SelectNodes(rrs, map[string]*types.Node{node.Name: node})
-            if err != nil {
-                return errors.WithStack(err)
-            }
-            if total < len(containers) {
-                return errors.WithStack(types.ErrInsufficientRes)
-            }
-
-            var (
-                rollbacks          []int
-                originalContainers []types.Container
-            )
-
-            return utils.Txn(
-                ctx,
-
-                // if: commit changes of realloc resources
-                func(ctx context.Context) (err error) {
-                    for _, plan := range planMap {
-                        plan.ApplyChangesOnNode(node, utils.Range(len(containers))...)
-                    }
-                    rollbacks = utils.Range(len(containers))
-                    for _, container := range containers {
-                        originalContainers = append(originalContainers, *container)
-                    }
-                    return c.store.UpdateNodes(ctx, node)
-                },
-
-                // then: update instances' resources
-                func(ctx context.Context) error {
-                    rollbacks, err = c.doUpdateResourceOnInstances(ctx, ch, node, planMap, containers, hardVbsMap)
-                    return err
-                },
-
-                // rollback: back to origin
-                func(ctx context.Context) error {
-                    for _, plan := range planMap {
-                        plan.RollbackChangesOnNode(node, rollbacks...)
-                    }
-                    for _, idx := range rollbacks {
-                        preserveResources(node, &originalContainers[idx])
-                    }
-                    return c.store.UpdateNodes(ctx, node)
-                },
-                c.config.GlobalTimeout,
-            )
-        })
-    }
-}
-
-// boundary: chan *types.ReallocResourceMessage
-func (c *Calcium) doUpdateResourceOnInstances(ctx context.Context, ch chan *types.ReallocResourceMessage, node *types.Node, planMap map[types.ResourceType]resourcetypes.ResourcePlans, containers []*types.Container, hardVbsMap map[string]types.VolumeBindings) (rollbacks []int, err error) {
-    wg := sync.WaitGroup{}
-    wg.Add(len(containers))
-
-    for idx, container := range containers {
-        go func(container *types.Container, idx int) {
-            var e error
-            msg := &types.ReallocResourceMessage{ContainerID: container.ID}
-            defer func() {
-                if e != nil {
-                    err = e
-                    msg.Error = e
-                    rollbacks = append(rollbacks, idx)
+            // if commit changes
+            func(ctx context.Context) (err error) {
+                for _, plan := range planMap {
+                    plan.ApplyChangesOnNode(node, 1)
                 }
-                ch <- msg
-                wg.Done()
-            }()
-
-            rsc := &types.Resources{}
-            for _, plan := range planMap {
-                if e = plan.Dispense(resourcetypes.DispenseOptions{
-                    Node:               node,
-                    Index:              idx,
-                    ExistingInstances:  containers,
-                    HardVolumeBindings: hardVbsMap[container.ID],
-                }, rsc); e != nil {
-                    return
+                return c.store.UpdateNodes(ctx, node)
+            },
+
+            // then update workload resources
+            func(ctx context.Context) error {
+                return c.doReallocContainersOnInstance(ctx, node, planMap, container)
+            },
+
+            // rollback to origin
+            func(ctx context.Context) error {
+                for _, plan := range planMap {
+                    plan.RollbackChangesOnNode(node, 1)
                 }
-            }
+                node.PreserveResources(&originalContainer.Resource1)
+                return c.store.UpdateNodes(ctx, node)
+            },

-            e = c.doUpdateResourceOnInstance(ctx, node, container, *rsc)
-        }(container, idx)
-    }
-
-    wg.Wait()
-    return rollbacks, errors.WithStack(err)
+            c.config.GlobalTimeout,
+        )
+    })
 }

-// transaction: container meta
-func (c *Calcium) doUpdateResourceOnInstance(ctx context.Context, node *types.Node, container *types.Container, rsc types.Resources) error {
-    originContainer := *container
+func (c *Calcium) doReallocContainersOnInstance(ctx context.Context, node *types.Node, planMap map[types.ResourceType]resourcetypes.ResourcePlans, container *types.Container) (err error) {
+    r := &types.Resource1{}
+    for _, plan := range planMap {
+        // TODO@zc: single existing instance
+        // TODO@zc: no HardVolumeBindings
+        if r, err = plan.Dispense(resourcetypes.DispenseOptions{
+            Node:              node,
+            Index:             1,
+            ExistingInstances: []*types.Container{container},
+        }, r); err != nil {
+            return
+        }
+    }
+
+    originalContainer := *container
     return utils.Txn(
         ctx,

         // if: update container meta
         func(ctx context.Context) error {
-            container.CPURequest = rsc.CPURequest
-            container.QuotaRequest = rsc.CPUQuotaRequest
-            container.QuotaLimit = rsc.CPUQuotaLimit
-            container.MemoryRequest = rsc.MemoryRequest
-            container.MemoryLimit = rsc.MemoryLimit
-            container.SoftLimit = rsc.MemorySoftLimit
-            container.VolumeRequest = rsc.VolumeRequest
-            container.VolumePlanRequest = rsc.VolumePlanRequest
-            container.VolumeLimit = rsc.VolumeLimit
-            container.VolumePlanLimit = rsc.VolumePlanLimit
-            container.StorageRequest = rsc.StorageRequest
-            container.StorageLimit = rsc.StorageLimit
+            container.CPUQuotaRequest = r.CPUQuotaRequest
+            container.CPUQuotaLimit = r.CPUQuotaLimit
+            container.CPU = r.CPU
+            container.MemoryRequest = r.MemoryRequest
+            container.MemoryLimit = r.MemoryLimit
+            container.VolumeRequest = r.VolumeRequest
+            container.VolumePlanRequest = r.VolumePlanRequest
+            container.VolumeLimit = r.VolumeLimit
+            container.VolumePlanLimit = r.VolumePlanLimit
+            container.StorageRequest = r.StorageRequest
+            container.StorageLimit = r.StorageLimit
             return errors.WithStack(c.store.UpdateContainer(ctx, container))
         },

         // then: update container resources
         func(ctx context.Context) error {
             r := &enginetypes.VirtualizationResource{
-                CPU:           rsc.CPURequest,
-                Quota:         rsc.CPUQuotaLimit,
-                NUMANode:      rsc.NUMANode,
-                Memory:        rsc.MemoryLimit,
-                SoftLimit:     rsc.MemorySoftLimit,
-                Volumes:       rsc.VolumeLimit.ToStringSlice(false, false),
-                VolumePlan:    rsc.VolumePlanLimit.ToLiteral(),
-                VolumeChanged: rsc.VolumeChanged,
-                Storage:       rsc.StorageLimit,
+                CPU:           r.CPU,
+                Quota:         r.CPUQuotaLimit,
+                NUMANode:      r.NUMANode,
+                Memory:        r.MemoryLimit,
+                Volumes:       r.VolumeLimit.ToStringSlice(false, false),
+                VolumePlan:    r.VolumePlanLimit.ToLiteral(),
+                VolumeChanged: r.VolumeChanged,
+                Storage:       r.StorageLimit,
             }
             return errors.WithStack(node.Engine.VirtualizationUpdateResource(ctx, container.ID, r))
         },

         // rollback: container meta
         func(ctx context.Context) error {
-            return errors.WithStack(c.store.UpdateContainer(ctx, &originContainer))
+            return errors.WithStack(c.store.UpdateContainer(ctx, &originalContainer))
         },

         c.config.GlobalTimeout,
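
Note: the new doReallocOnNode and doReallocContainersOnInstance both funnel their changes through utils.Txn: a commit step that applies the plan, a follow-up step that updates the workload, and a rollback step that restores the original state, all bounded by c.config.GlobalTimeout. The sketch below illustrates that three-phase shape with a simplified stand-in for utils.Txn; the real helper lives in github.com/projecteru2/core/utils and its exact signature and semantics may differ.

package main

import (
    "context"
    "fmt"
    "time"
)

// txn is a simplified stand-in for utils.Txn: run the "if" step, then the
// "then" step; if the "then" step fails, run the rollback step. Every step
// shares one timeout-bounded context.
func txn(ctx context.Context, cond, then, rollback func(context.Context) error, timeout time.Duration) error {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()
    if err := cond(ctx); err != nil {
        return err
    }
    if err := then(ctx); err != nil {
        if rbErr := rollback(ctx); rbErr != nil {
            return fmt.Errorf("then failed: %w (rollback also failed: %v)", err, rbErr)
        }
        return err
    }
    return nil
}

func main() {
    nodeMemory := 0 // stand-in for node-level resource bookkeeping
    err := txn(context.Background(),
        func(ctx context.Context) error { nodeMemory += 512; return nil },            // if: commit node changes
        func(ctx context.Context) error { return fmt.Errorf("engine update failed") }, // then: update the workload
        func(ctx context.Context) error { nodeMemory -= 512; return nil },            // rollback: restore the node
        time.Second,
    )
    fmt.Println(nodeMemory, err) // prints "0 engine update failed": the node change was rolled back
}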

cluster/cluster.go

+1 −1

@@ -79,7 +79,7 @@ type Cluster interface {
     DissociateContainer(ctx context.Context, IDs []string) (chan *types.DissociateContainerMessage, error)
     ControlContainer(ctx context.Context, IDs []string, t string, force bool) (chan *types.ControlContainerMessage, error)
     ExecuteContainer(ctx context.Context, opts *types.ExecuteContainerOptions, inCh <-chan []byte) chan *types.AttachContainerMessage
-    ReallocResource(ctx context.Context, opts *types.ReallocOptions) (chan *types.ReallocResourceMessage, error)
+    ReallocResource(ctx context.Context, opts *types.ReallocOptions) error
     LogStream(ctx context.Context, opts *types.LogStreamOptions) (chan *types.LogStreamMessage, error)
     RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) (<-chan *types.AttachContainerMessage, error)
     // finalizer
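
Note: because ReallocResource now takes a single container ID and returns a plain error instead of a chan *types.ReallocResourceMessage, a caller that used to pass opts.IDs and drain the channel would loop over the IDs itself. The sketch below is an illustrative caller-side adaptation, not code from this commit; it only assumes that types.ReallocOptions is a copyable struct with the ID field used above and the usual projecteru2/core import paths.

package example

import (
    "context"

    "github.com/projecteru2/core/cluster"
    "github.com/projecteru2/core/types"
)

// reallocMany applies the same resource deltas to several containers, one
// ReallocResource call per ID, collecting failures instead of reading them
// off a message channel.
func reallocMany(ctx context.Context, c cluster.Cluster, ids []string, base types.ReallocOptions) map[string]error {
    errs := map[string]error{}
    for _, id := range ids {
        opts := base // copy the shared resource options
        opts.ID = id // target one container per call
        if err := c.ReallocResource(ctx, &opts); err != nil {
            errs[id] = err
        }
    }
    return errs
}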

resources/cpumem/cpumem.go

+2 −2

@@ -1,8 +1,6 @@
 package cpumem

 import (
-    "strconv"
-
     "github.com/pkg/errors"
     resourcetypes "github.com/projecteru2/core/resources/types"
     "github.com/projecteru2/core/scheduler"
@@ -168,12 +166,14 @@ func (rp ResourcePlans) Dispense(opts resourcetypes.DispenseOptions, r *types.Re
         r.NUMANode = opts.Node.GetNUMANode(r.CPU)
     }

+    /* TODO@zc: check this
     // special handle when converting from cpu-binding to cpu-unbinding
     if len(opts.ExistingInstances) > opts.Index && len(opts.ExistingInstances[opts.Index].CPU) > 0 && len(rp.CPUPlans) == 0 {
         r.CPU = types.CPUMap{}
         for i := 0; i < len(opts.Node.InitCPU); i++ {
             r.CPU[strconv.Itoa(i)] = 0
         }
     }
+    */
     return r, nil
 }
