Skip to content

Commit 6d2d3ef

Browse files
jschwinger233 authored and CMGS committed
cluster layer calls remap on resource changes
1 parent 2ec7b86 commit 6d2d3ef

File tree

8 files changed

+99
-21
lines changed

8 files changed

+99
-21
lines changed

cluster/calcium/build.go

+2
Original file line number · Diff line number · Diff line change
@@ -121,7 +121,9 @@ func (c *Calcium) pushImage(ctx context.Context, resp io.ReadCloser, node *types
121121
break
122122
}
123123
if err == context.Canceled || err == context.DeadlineExceeded {
124+
log.Errorf("[BuildImage] context timeout")
124125
lastMessage.ErrorDetail.Code = -1
126+
lastMessage.ErrorDetail.Message = err.Error()
125127
lastMessage.Error = err.Error()
126128
break
127129
}

cluster/calcium/create.go

+8
Original file line number · Diff line number · Diff line change
@@ -214,6 +214,14 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
214214
err = e
215215
indices = utils.Range(deploy)
216216
}
217+
218+
// remap 就不搞进事务了吧, 回滚代价太大了
219+
// 放任 remap 失败的后果是, share pool 没有更新, 这个后果姑且认为是可以承受的
220+
// 而且 remap 是一个幂等操作, 就算这次 remap 失败, 下次 remap 也能收敛到正确到状态
221+
c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
222+
c.doRemapResourceAndLog(ctx, logger, node)
223+
return nil
224+
})
217225
return indices, errors.WithStack(err)
218226
}
219227

cluster/calcium/dissociate.go

+11-5
Original file line number · Diff line number · Diff line change
@@ -16,24 +16,30 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
1616
ch := make(chan *types.DissociateWorkloadMessage)
1717
go func() {
1818
defer close(ch)
19+
// TODO@zc: group ids by node
1920
for _, id := range ids {
2021
err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
2122
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) (err error) {
22-
return utils.Txn(
23+
err = utils.Txn(
2324
ctx,
2425
// if
2526
func(ctx context.Context) error {
26-
return errors.WithStack(c.store.RemoveWorkload(ctx, workload))
27+
log.Infof("[DissociateWorkload] Workload %s dissociated", workload.ID)
28+
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
2729
},
2830
// then
2931
func(ctx context.Context) error {
30-
log.Infof("[DissociateWorkload] Workload %s dissociated", workload.ID)
31-
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
32+
return errors.WithStack(c.store.RemoveWorkload(ctx, workload))
3233
},
3334
// rollback
34-
nil,
35+
func(ctx context.Context, _ bool) error {
36+
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
37+
},
3538
c.config.GlobalTimeout,
3639
)
40+
41+
c.doRemapResourceAndLog(ctx, logger, node)
42+
return err
3743
})
3844
})
3945
if err != nil {

cluster/calcium/realloc.go

+8-3
Original file line number · Diff line number · Diff line change
@@ -40,7 +40,7 @@ func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOption
4040

4141
// transaction: node resource
4242
func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, workload *types.Workload, rrs resourcetypes.ResourceRequests) error {
43-
return c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
43+
return c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) (err error) {
4444
node.RecycleResources(&workload.ResourceMeta)
4545
plans, err := resources.SelectNodesByResourceRequests(rrs, map[string]*types.Node{node.Name: node})
4646
if err != nil {
@@ -49,7 +49,7 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, workload
4949

5050
originalWorkload := *workload
5151
resourceMeta := &types.ResourceMeta{}
52-
return utils.Txn(
52+
if err = utils.Txn(
5353
ctx,
5454

5555
// if update workload resources
@@ -96,7 +96,12 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, workload
9696
},
9797

9898
c.config.GlobalTimeout,
99-
)
99+
); err != nil {
100+
return
101+
}
102+
103+
c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
104+
return nil
100105
})
101106
}
102107

cluster/calcium/remove.go

+23-9
Original file line number · Diff line number · Diff line change
@@ -33,21 +33,32 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
3333
ret := &types.RemoveWorkloadMessage{WorkloadID: id, Success: false, Hook: []*bytes.Buffer{}}
3434
if err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
3535
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) (err error) {
36-
return utils.Txn(
36+
if err = utils.Txn(
3737
ctx,
3838
// if
3939
func(ctx context.Context) error {
40-
return errors.WithStack(c.doRemoveWorkload(ctx, workload, force))
40+
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
4141
},
4242
// then
4343
func(ctx context.Context) error {
44-
log.Infof("[RemoveWorkload] Workload %s removed", workload.ID)
45-
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
44+
err := errors.WithStack(c.doRemoveWorkload(ctx, workload, force))
45+
if err != nil {
46+
log.Infof("[RemoveWorkload] Workload %s removed", workload.ID)
47+
}
48+
return err
4649
},
4750
// rollback
48-
nil,
51+
func(ctx context.Context, _ bool) error {
52+
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
53+
},
4954
c.config.GlobalTimeout,
50-
)
55+
); err != nil {
56+
return
57+
}
58+
59+
// TODO@zc: 优化一下, 先按照 node 聚合 ids
60+
c.doRemapResourceAndLog(ctx, logger, node)
61+
return
5162
})
5263
}); err != nil {
5364
logger.Errorf("[RemoveWorkload] Remove workload %s failed, err: %+v", id, err)
@@ -66,19 +77,22 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
6677
return ch, nil
6778
}
6879

80+
// semantic: instance removed on err == nil, instance remained on err != nil
6981
func (c *Calcium) doRemoveWorkload(ctx context.Context, workload *types.Workload, force bool) error {
7082
return utils.Txn(
7183
ctx,
7284
// if
7385
func(ctx context.Context) error {
74-
return errors.WithStack(workload.Remove(ctx, force))
86+
return errors.WithStack(c.store.RemoveWorkload(ctx, workload))
7587
},
7688
// then
7789
func(ctx context.Context) error {
78-
return errors.WithStack(c.store.RemoveWorkload(ctx, workload))
90+
return errors.WithStack(workload.Remove(ctx, force))
7991
},
8092
// rollback
81-
nil,
93+
func(ctx context.Context, _ bool) error {
94+
return errors.WithStack(c.store.AddWorkload(ctx, workload))
95+
},
8296
c.config.GlobalTimeout,
8397
)
8498

cluster/calcium/replace.go

+11-2
Original file line number · Diff line number · Diff line change
@@ -153,7 +153,7 @@ func (c *Calcium) doReplaceWorkload(
153153
VolumePlanLimit: workload.VolumePlanLimit,
154154
},
155155
}
156-
return createMessage, removeMessage, utils.Txn(
156+
if err = utils.Txn(
157157
ctx,
158158
// if
159159
func(ctx context.Context) (err error) {
@@ -194,7 +194,16 @@ func (c *Calcium) doReplaceWorkload(
194194
return errors.WithStack(err)
195195
},
196196
c.config.GlobalTimeout,
197-
)
197+
); err != nil {
198+
return createMessage, removeMessage, err
199+
}
200+
201+
c.withNodeLocked(ctx, node.Name, func(ctx context.Context, node *types.Node) error {
202+
c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReplaceWorkload"), node)
203+
return nil
204+
})
205+
206+
return createMessage, removeMessage, err
198207
}
199208

200209
func (c *Calcium) doMakeReplaceWorkloadOptions(no int, msg *types.CreateWorkloadMessage, opts *types.DeployOptions, node *types.Node, ancestorWorkloadID string) *enginetypes.VirtualizationCreateOptions {

cluster/calcium/resource.go

+34
Original file line number · Diff line number · Diff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/pkg/errors"
88
"github.com/projecteru2/core/log"
99

10+
enginetypes "github.com/projecteru2/core/engine/types"
1011
resourcetypes "github.com/projecteru2/core/resources/types"
1112
"github.com/projecteru2/core/strategy"
1213
"github.com/projecteru2/core/types"
@@ -160,3 +161,36 @@ func (c *Calcium) doAllocResource(ctx context.Context, nodeMap map[string]*types
160161
log.Infof("[Calium.doAllocResource] deployMap: %+v", deployMap)
161162
return plans, deployMap, nil
162163
}
164+
165+
// called on changes of resource binding, such as cpu binding
166+
// as an internal api, remap doesn't lock node, the responsibility of that should be taken on by caller
167+
func (c *Calcium) remapResource(ctx context.Context, node *types.Node) (ch <-chan enginetypes.VirtualizationRemapMessage, err error) {
168+
workloads, err := c.store.ListNodeWorkloads(ctx, node.Name, nil)
169+
if err != nil {
170+
return
171+
}
172+
remapOpts := &enginetypes.VirtualizationRemapOptions{
173+
CPUAvailable: node.CPU,
174+
CPUInit: node.InitCPU,
175+
CPUShareBase: int64(c.config.Scheduler.ShareBase),
176+
WorkloadResources: make(map[string]enginetypes.VirtualizationResource),
177+
}
178+
for _, workload := range workloads {
179+
remapOpts.WorkloadResources[workload.ID] = enginetypes.VirtualizationResource{
180+
CPU: workload.CPU,
181+
Quota: workload.CPUQuotaLimit,
182+
NUMANode: workload.NUMANode,
183+
}
184+
}
185+
ch, err = node.Engine.VirtualizationResourceRemap(ctx, remapOpts)
186+
return ch, errors.WithStack(err)
187+
}
188+
189+
func (c *Calcium) doRemapResourceAndLog(ctx context.Context, logger log.Fields, node *types.Node) {
190+
logger = logger.WithField("Calcium", "doRemapResourceIrresponsibly").WithField("nodename", node.Name)
191+
if ch, err := c.remapResource(ctx, node); logger.Err(err) == nil {
192+
for msg := range ch {
193+
logger.WithField("id", msg.ID).Err(msg.Error)
194+
}
195+
}
196+
}

engine/types/virtualization.go

+2-2
Original file line number · Diff line number · Diff line change
@@ -77,6 +77,6 @@ type VirtualizationRemapOptions struct {
7777
}
7878

7979
type VirtualizationRemapMessage struct {
80-
ID string
81-
VirtualizationResource
80+
ID string
81+
Error error
8282
}

0 commit comments

Comments (0)