Commit 21bc787

jschwinger233 authored and CMGS committed
bulky operate remove and diss by node
1 parent 8754425 commit 21bc787

2 files changed (+110 −62 lines)


cluster/calcium/dissociate.go

+49 −18
@@ -13,30 +13,61 @@ import (
 // DissociateWorkload dissociates a workload from eru and returns its resources, but does not modify it
 func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *types.DissociateWorkloadMessage, error) {
 	logger := log.WithField("Calcium", "Dissociate").WithField("ids", ids)
+
+	nodeWorkloadGroup, err := c.groupWorkloadsByNode(ctx, ids)
+	if err != nil {
+		logger.Errorf("failed to group workloads by node: %+v", err)
+		return nil, errors.WithStack(err)
+	}
+
 	ch := make(chan *types.DissociateWorkloadMessage)
 	go func() {
 		defer close(ch)
-		// TODO@zc: group ids by node
+
+		for nodename, workloadIDs := range nodeWorkloadGroup {
+			if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
+				for _, workloadID := range workloadIDs {
+					if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error {
+						msg := &types.DissociateWorkloadMessage{WorkloadID: workloadID}
+						if err := utils.Txn(
+							ctx,
+							// if
+							func(ctx context.Context) error {
+								log.Infof("[DissociateWorkload] Workload %s dissociated", workload.ID)
+								return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
+							},
+							// then
+							func(ctx context.Context) error {
+								return errors.WithStack(c.store.RemoveWorkload(ctx, workload))
+							},
+							// rollback
+							func(ctx context.Context, failedByCond bool) error {
+								if failedByCond {
+									return nil
+								}
+								return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
+							},
+							c.config.GlobalTimeout,
+						); err != nil {
+							msg.Error = err
+							logger.WithField("id", workloadID).Errorf("failed to diss workload: %+v", err)
+						}
+						ch <- msg
+						return nil
+					}); err != nil {
+						logger.WithField("id", workloadID).Errorf("failed to lock workload: %+v", err)
+					}
+				}
+				c.doRemapResourceAndLog(ctx, logger, node)
+				return nil
+			}); err != nil {
+				logger.WithField("nodename", nodename).Errorf("failed to lock node: %+v", err)
+			}
+		}
+
 		for _, id := range ids {
 			err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
 				return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) (err error) {
-					err = utils.Txn(
-						ctx,
-						// if
-						func(ctx context.Context) error {
-							log.Infof("[DissociateWorkload] Workload %s dissociated", workload.ID)
-							return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
-						},
-						// then
-						func(ctx context.Context) error {
-							return errors.WithStack(c.store.RemoveWorkload(ctx, workload))
-						},
-						// rollback
-						func(ctx context.Context, _ bool) error {
-							return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
-						},
-						c.config.GlobalTimeout,
-					)
 
 					c.doRemapResourceAndLog(ctx, logger, node)
 					return err
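
Both call sites funnel the resource bookkeeping through the three-phase utils.Txn helper, and the new rollback closures check failedByCond so the resource increment is only undone when the "if" step actually succeeded. Below is a minimal, self-contained sketch of that if/then/rollback shape; the txn helper and the integer counter are illustrative stand-ins, not eru's actual utils.Txn or node resources.

package main

import (
	"context"
	"errors"
	"fmt"
)

// txn is a simplified stand-in for a three-phase transaction helper: run cond
// ("if"), then run then ("then"), and call rollback only on failure.
// failedByCond tells the rollback whether cond itself failed, so cond's side
// effects are only undone when cond actually succeeded.
func txn(ctx context.Context, cond, then func(context.Context) error,
	rollback func(ctx context.Context, failedByCond bool) error) error {
	if err := cond(ctx); err != nil {
		if rbErr := rollback(ctx, true); rbErr != nil {
			return fmt.Errorf("rollback after cond failure: %w", rbErr)
		}
		return err
	}
	if err := then(ctx); err != nil {
		if rbErr := rollback(ctx, false); rbErr != nil {
			return fmt.Errorf("rollback after then failure: %w", rbErr)
		}
		return err
	}
	return nil
}

func main() {
	resource := 0 // stands in for the node resource being incremented

	err := txn(context.Background(),
		// if: reserve the resource back onto the node
		func(ctx context.Context) error { resource++; return nil },
		// then: the actual removal, which fails here
		func(ctx context.Context) error { return errors.New("remove failed") },
		// rollback: undo the reservation only if the "if" step had succeeded
		func(ctx context.Context, failedByCond bool) error {
			if failedByCond {
				return nil // nothing was reserved, nothing to undo
			}
			resource--
			return nil
		},
	)
	fmt.Println(err, resource) // "remove failed 0": the increment was rolled back
}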

cluster/calcium/remove.go

+61 −44
@@ -17,61 +17,66 @@ import (
 // returns a channel that contains removing responses
 func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error) {
 	logger := log.WithField("Calcium", "RemoveWorkload").WithField("ids", ids).WithField("force", force).WithField("step", step)
-	ch := make(chan *types.RemoveWorkloadMessage)
-	if step < 1 {
-		step = 1
+
+	nodeWorkloadGroup, err := c.groupWorkloadsByNode(ctx, ids)
+	if err != nil {
+		logger.Errorf("failed to group workloads by node: %+v", err)
+		return nil, errors.WithStack(err)
 	}
 
+	ch := make(chan *types.RemoveWorkloadMessage)
 	go func() {
 		defer close(ch)
 		wg := sync.WaitGroup{}
 		defer wg.Wait()
-		for i, id := range ids {
+		for nodename, workloadIDs := range nodeWorkloadGroup {
 			wg.Add(1)
-			go func(id string) {
+			go func(nodename string, workloadIDs []string) {
 				defer wg.Done()
-				ret := &types.RemoveWorkloadMessage{WorkloadID: id, Success: false, Hook: []*bytes.Buffer{}}
-				if err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
-					return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) (err error) {
-						if err = utils.Txn(
-							ctx,
-							// if
-							func(ctx context.Context) error {
-								return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
-							},
-							// then
-							func(ctx context.Context) error {
-								err := errors.WithStack(c.doRemoveWorkload(ctx, workload, force))
-								if err != nil {
-									log.Infof("[RemoveWorkload] Workload %s removed", workload.ID)
-								}
-								return err
-							},
-							// rollback
-							func(ctx context.Context, _ bool) error {
-								return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
-							},
-							c.config.GlobalTimeout,
-						); err != nil {
-							return
-						}
+				if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
+					for _, workloadID := range workloadIDs {
+						if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error {
+							ret := &types.RemoveWorkloadMessage{WorkloadID: workloadID, Success: true, Hook: []*bytes.Buffer{}}
+							if err := utils.Txn(
+								ctx,
+								// if
+								func(ctx context.Context) error {
+									return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
+								},
+								// then
+								func(ctx context.Context) error {
+									err := errors.WithStack(c.doRemoveWorkload(ctx, workload, force))
+									if err != nil {
+										log.Infof("[RemoveWorkload] Workload %s removed", workload.ID)
+									}
+									return err
+								},
+								// rollback
+								func(ctx context.Context, failedByCond bool) error {
+									if failedByCond {
+										return nil
+									}
+									return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
+								},
+								c.config.GlobalTimeout,
+							); err != nil {
+								logger.WithField("id", workloadID).Errorf("[RemoveWorkload] Remove workload failed: %+v", err)
+								ret.Hook = append(ret.Hook, bytes.NewBufferString(err.Error()))
+								ret.Success = false
+							}
 
-						// TODO@zc: optimize this: aggregate the ids by node first
-						c.doRemapResourceAndLog(ctx, logger, node)
-						return
-					})
+							ch <- ret
+							return nil
+						}); err != nil {
+							logger.WithField("id", workloadID).Errorf("failed to lock workload: %+v", err)
+						}
+					}
+					c.doRemapResourceAndLog(ctx, logger, node)
+					return nil
 				}); err != nil {
-					logger.Errorf("[RemoveWorkload] Remove workload %s failed, err: %+v", id, err)
-					ret.Hook = append(ret.Hook, bytes.NewBufferString(err.Error()))
-				} else {
-					ret.Success = true
+					logger.WithField("nodename", nodename).Errorf("failed to lock node: %+v", err)
 				}
-				ch <- ret
-			}(id)
-			if (i+1)%step == 0 {
-				log.Info("[RemoveWorkload] Wait for previous tasks done")
-				wg.Wait()
-			}
+			}(nodename, workloadIDs)
 		}
 	}()
 	return ch, nil
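
The rewritten RemoveWorkload keeps the channel-plus-WaitGroup fan-out but changes its unit from one goroutine per workload (with step-based throttling) to one goroutine per node. A minimal sketch of that shape, with locking and store calls elided and made-up node/workload names:

package main

import (
	"fmt"
	"sync"
)

func main() {
	// groups plays the role of nodeWorkloadGroup: workload IDs bucketed by node.
	groups := map[string][]string{
		"node-1": {"w1", "w3"},
		"node-2": {"w2"},
	}

	results := make(chan string)

	go func() {
		defer close(results) // runs last: only after every worker has finished
		wg := sync.WaitGroup{}
		defer wg.Wait() // runs first (LIFO), keeping the channel open until then
		for nodename, ids := range groups {
			wg.Add(1)
			go func(nodename string, ids []string) { // one goroutine per node
				defer wg.Done()
				for _, id := range ids {
					// the real code locks the node, then each workload,
					// and runs the remove transaction here
					results <- fmt.Sprintf("removed %s on %s", id, nodename)
				}
			}(nodename, ids)
		}
	}()

	for msg := range results {
		fmt.Println(msg)
	}
}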
@@ -110,3 +115,15 @@ func (c *Calcium) doRemoveWorkloadSync(ctx context.Context, ids []string) error
 	}
 	return nil
 }
+
+func (c *Calcium) groupWorkloadsByNode(ctx context.Context, ids []string) (map[string][]string, error) {
+	workloads, err := c.store.GetWorkloads(ctx, ids)
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+	nodeWorkloadGroup := map[string][]string{}
+	for _, workload := range workloads {
+		nodeWorkloadGroup[workload.Nodename] = append(nodeWorkloadGroup[workload.Nodename], workload.ID)
+	}
+	return nodeWorkloadGroup, nil
+}
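
groupWorkloadsByNode is the piece both call sites now share: one GetWorkloads lookup, then a map from node name to the IDs living on it, so each node lock and resource remap happens once per node rather than once per workload. A standalone sketch of the same bucketing under stand-in types (a local workload struct in place of types.Workload, plain mutexes in place of eru's distributed node locks):

package main

import (
	"fmt"
	"sync"
)

// workload is a stand-in for types.Workload; only the fields the grouping
// pattern needs are kept.
type workload struct {
	ID       string
	Nodename string
}

// groupByNode buckets workload IDs by node name, mirroring what
// groupWorkloadsByNode does after its single store lookup.
func groupByNode(workloads []workload) map[string][]string {
	groups := map[string][]string{}
	for _, w := range workloads {
		groups[w.Nodename] = append(groups[w.Nodename], w.ID)
	}
	return groups
}

func main() {
	workloads := []workload{
		{ID: "w1", Nodename: "node-1"},
		{ID: "w2", Nodename: "node-2"},
		{ID: "w3", Nodename: "node-1"},
	}

	locks := map[string]*sync.Mutex{} // stand-ins for the per-node distributed locks

	for nodename, ids := range groupByNode(workloads) {
		if locks[nodename] == nil {
			locks[nodename] = &sync.Mutex{}
		}
		locks[nodename].Lock() // the node lock is taken once per group
		for _, id := range ids {
			// per-workload handling happens under the node lock
			fmt.Printf("handling %s on %s\n", id, nodename)
		}
		fmt.Printf("remapping resources on %s once\n", nodename)
		locks[nodename].Unlock()
	}
}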
