Commit 555c243

use ants pool to create a global pool
1 parent f963a86 commit 555c243

22 files changed: +239 −226 lines changed

cluster/calcium/calcium.go (+11 −1)

@@ -5,6 +5,7 @@ import (
     "strings"
     "testing"

+    "github.com/panjf2000/ants/v2"
     "github.com/projecteru2/core/cluster"
     "github.com/projecteru2/core/discovery"
     "github.com/projecteru2/core/discovery/helium"
@@ -17,6 +18,7 @@ import (
     "github.com/projecteru2/core/source/gitlab"
     "github.com/projecteru2/core/store"
     "github.com/projecteru2/core/types"
+    "github.com/projecteru2/core/utils"
     "github.com/projecteru2/core/wal"

     "github.com/pkg/errors"
@@ -30,6 +32,7 @@ type Calcium struct {
     source     source.Source
     watcher    discovery.Service
     wal        wal.WAL
+    pool       *ants.PoolWithFunc
     identifier string
 }

@@ -85,7 +88,13 @@ func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, erro
         return nil, err
     }

-    cal := &Calcium{store: store, config: config, source: scm, watcher: watcher, rmgr: rmgr}
+    // init pool
+    pool, err := utils.NewPool(config.MaxConcurrency)
+    if err != nil {
+        return nil, err
+    }
+
+    cal := &Calcium{store: store, config: config, source: scm, watcher: watcher, rmgr: rmgr, pool: pool}

     cal.wal, err = enableWAL(config, cal, store)
     if err != nil {
@@ -110,6 +119,7 @@ func (c *Calcium) DisasterRecover(ctx context.Context) {
 // Finalizer use for defer
 func (c *Calcium) Finalizer() {
     // TODO some resource recycle
+    c.pool.Release()
 }

 // GetIdentifier returns the identifier of calcium
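
Note: the utils.NewPool helper called in New above is defined in one of the other files touched by this commit and is not shown in this excerpt. A minimal sketch of what such a wrapper presumably looks like, assuming the pool's fixed worker function simply runs whatever func() the call sites hand to Invoke:

```go
// Sketch only — the real utils.NewPool lives elsewhere in this commit.
package utils

import "github.com/panjf2000/ants/v2"

// NewPool builds a worker pool whose tasks are plain func() closures:
// Invoke(f) hands f to the fixed worker function, which calls it.
func NewPool(size int) (*ants.PoolWithFunc, error) {
    return ants.NewPoolWithFunc(size, func(i interface{}) {
        if f, ok := i.(func()); ok {
            f() // run the submitted closure on a pooled worker goroutine
        }
    })
}
```

Releasing the pool in Finalizer (c.pool.Release() above) stops the workers when the cluster object is torn down.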

cluster/calcium/calcium_test.go (+3 −1)

@@ -15,6 +15,7 @@ import (
     sourcemocks "github.com/projecteru2/core/source/mocks"
     storemocks "github.com/projecteru2/core/store/mocks"
     "github.com/projecteru2/core/types"
+    "github.com/projecteru2/core/utils"
     "github.com/projecteru2/core/wal"
     walmocks "github.com/projecteru2/core/wal/mocks"
 )
@@ -25,7 +26,8 @@ func NewTestCluster() *Calcium {
         panic(err)
     }

-    c := &Calcium{}
+    pool, _ := utils.NewPool(20)
+    c := &Calcium{pool: pool}
     c.config = types.Config{
         GlobalTimeout: 30 * time.Second,
         Git: types.GitConfig{

cluster/calcium/control.go (+35 −33)

@@ -3,6 +3,7 @@ package calcium
 import (
     "bytes"
     "context"
+    "sync"

     "github.com/projecteru2/core/cluster"
     "github.com/projecteru2/core/log"
@@ -19,45 +20,46 @@ func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, f

     utils.SentryGo(func() {
         defer close(ch)
-        pool := utils.NewGoroutinePool(int(c.config.MaxConcurrency))
+        wg := &sync.WaitGroup{}
+        wg.Add(len(ids))
         for _, id := range ids {
-            pool.Go(ctx, func(id string) func() {
-                return func() {
-                    var message []*bytes.Buffer
-                    err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
-                        var err error
-                        switch t {
-                        case cluster.WorkloadStop:
-                            message, err = c.doStopWorkload(ctx, workload, force)
-                            return err
-                        case cluster.WorkloadStart:
-                            message, err = c.doStartWorkload(ctx, workload, force)
-                            return err
-                        case cluster.WorkloadRestart:
-                            message, err = c.doStopWorkload(ctx, workload, force)
-                            if err != nil {
-                                return err
-                            }
-                            startHook, err := c.doStartWorkload(ctx, workload, force)
-                            message = append(message, startHook...)
+            id := id
+            _ = c.pool.Invoke(func() {
+                defer wg.Done()
+                var message []*bytes.Buffer
+                err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
+                    var err error
+                    switch t {
+                    case cluster.WorkloadStop:
+                        message, err = c.doStopWorkload(ctx, workload, force)
+                        return err
+                    case cluster.WorkloadStart:
+                        message, err = c.doStartWorkload(ctx, workload, force)
+                        return err
+                    case cluster.WorkloadRestart:
+                        message, err = c.doStopWorkload(ctx, workload, force)
+                        if err != nil {
                             return err
                         }
-                        return errors.WithStack(types.ErrUnknownControlType)
-                    })
-                    if err == nil {
-                        log.Infof(ctx, "[ControlWorkload] Workload %s %s", id, t)
-                        log.Info("[ControlWorkload] Hook Output:")
-                        log.Info(string(utils.MergeHookOutputs(message)))
-                    }
-                    ch <- &types.ControlWorkloadMessage{
-                        WorkloadID: id,
-                        Error:      logger.ErrWithTracing(ctx, err),
-                        Hook:       message,
+                        startHook, err := c.doStartWorkload(ctx, workload, force)
+                        message = append(message, startHook...)
+                        return err
                     }
+                    return errors.WithStack(types.ErrUnknownControlType)
+                })
+                if err == nil {
+                    log.Infof(ctx, "[ControlWorkload] Workload %s %s", id, t)
+                    log.Info("[ControlWorkload] Hook Output:")
+                    log.Info(string(utils.MergeHookOutputs(message)))
+                }
+                ch <- &types.ControlWorkloadMessage{
+                    WorkloadID: id,
+                    Error:      logger.ErrWithTracing(ctx, err),
+                    Hook:       message,
                 }
-            }(id))
+            })
         }
-        pool.Wait(ctx)
+        wg.Wait()
     })

     return ch, nil
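
The shape introduced here recurs in the files below: size the WaitGroup up front, re-declare the loop variable before capturing it, submit a closure to the shared pool with Invoke, and block on Wait. A self-contained sketch of that pattern with a locally created ants pool (the task body is illustrative, not from the codebase):

```go
package main

import (
    "fmt"
    "sync"

    "github.com/panjf2000/ants/v2"
)

func main() {
    // Pool whose fixed worker function runs submitted func() closures.
    pool, err := ants.NewPoolWithFunc(4, func(i interface{}) {
        if f, ok := i.(func()); ok {
            f()
        }
    })
    if err != nil {
        panic(err)
    }
    defer pool.Release()

    ids := []string{"a", "b", "c"}
    wg := &sync.WaitGroup{}
    wg.Add(len(ids)) // one Done per submitted task

    for _, id := range ids {
        id := id // capture a fresh copy of the loop variable
        _ = pool.Invoke(func() {
            defer wg.Done()
            fmt.Println("processing", id)
        })
    }
    wg.Wait() // all tasks have finished before the caller moves on
}
```

In blocking mode (the ants default) Invoke waits for a free worker instead of failing, so MaxConcurrency now bounds these fan-outs globally rather than per call.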

cluster/calcium/create.go (+25 −23)

@@ -229,40 +229,42 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context,
         return utils.Range(deploy), err
     }

-    pool, appendLock := utils.NewGoroutinePool(int(c.config.MaxConcurrency)), sync.Mutex{}
+    appendLock := sync.Mutex{}
+    wg := &sync.WaitGroup{}
+    wg.Add(deploy)
     for idx := 0; idx < deploy; idx++ {
+        idx := idx
         createMsg := &types.CreateWorkloadMessage{
             Podname:  opts.Podname,
             Nodename: nodename,
             Publish:  map[string][]string{},
         }

-        pool.Go(ctx, func(idx int) func() {
-            return func() {
-                var e error
-                defer func() {
-                    if e != nil {
-                        err = e
-                        createMsg.Error = logger.ErrWithTracing(ctx, e)
-                        appendLock.Lock()
-                        indices = append(indices, idx)
-                        appendLock.Unlock()
-                    }
-                    ch <- createMsg
-                }()
-
-                createMsg.EngineArgs = engineArgs[idx]
-                createMsg.ResourceArgs = map[string]types.WorkloadResourceArgs{}
-                for k, v := range resourceArgs[idx] {
-                    createMsg.ResourceArgs[k] = v
+        _ = c.pool.Invoke(func() {
+            defer wg.Done()
+            var e error
+            defer func() {
+                if e != nil {
+                    err = e
+                    createMsg.Error = logger.ErrWithTracing(ctx, e)
+                    appendLock.Lock()
+                    indices = append(indices, idx)
+                    appendLock.Unlock()
                 }
+                ch <- createMsg
+            }()

-                createOpts := c.doMakeWorkloadOptions(ctx, seq+idx, createMsg, opts, node)
-                e = c.doDeployOneWorkload(ctx, node, opts, createMsg, createOpts, true)
+            createMsg.EngineArgs = engineArgs[idx]
+            createMsg.ResourceArgs = map[string]types.WorkloadResourceArgs{}
+            for k, v := range resourceArgs[idx] {
+                createMsg.ResourceArgs[k] = v
             }
-        }(idx))
+
+            createOpts := c.doMakeWorkloadOptions(ctx, seq+idx, createMsg, opts, node)
+            e = c.doDeployOneWorkload(ctx, node, opts, createMsg, createOpts, true)
+        })
     }
-    pool.Wait(ctx)
+    wg.Wait()

     // don't wrap remap in the transaction; the cost of rolling it back is too high
     // if remap fails, the consequence is that the share pool is not updated, which we consider acceptable for now
cluster/calcium/node.go (+17 −15)

@@ -2,6 +2,7 @@ package calcium

 import (
     "context"
+    "sync"

     enginefactory "github.com/projecteru2/core/engine/factory"
     enginetypes "github.com/projecteru2/core/engine/types"
@@ -107,26 +108,27 @@ func (c *Calcium) ListPodNodes(ctx context.Context, opts *types.ListNodesOptions
         return ch, logger.ErrWithTracing(ctx, errors.WithStack(err))
     }

-    pool := utils.NewGoroutinePool(int(c.config.MaxConcurrency))
-    go func() {
+    utils.SentryGo(func() {
+        wg := &sync.WaitGroup{}
+        wg.Add(len(nodes))
         defer close(ch)
         for _, node := range nodes {
-            pool.Go(ctx, func(node *types.Node) func() {
-                return func() {
-                    if err := c.getNodeResourceInfo(ctx, node, nil); err != nil {
-                        logger.Errorf(ctx, "failed to get node %v resource info: %+v", node.Name, err)
-                    }
-                    if opts.CallInfo {
-                        if err := node.Info(ctx); err != nil {
-                            logger.Errorf(ctx, "failed to get node %v info: %+v", node.Name, err)
-                        }
+            node := node
+            _ = c.pool.Invoke(func() {
+                defer wg.Done()
+                if err := c.getNodeResourceInfo(ctx, node, nil); err != nil {
+                    logger.Errorf(ctx, "failed to get node %v resource info: %+v", node.Name, err)
+                }
+                if opts.CallInfo {
+                    if err := node.Info(ctx); err != nil {
+                        logger.Errorf(ctx, "failed to get node %v info: %+v", node.Name, err)
                     }
-                    ch <- node
                 }
-            }(node))
+                ch <- node
+            })
         }
-        pool.Wait(ctx)
-    }()
+        wg.Wait()
+    })
     return ch, nil
 }

cluster/calcium/node_test.go (+6 −5)

@@ -284,36 +284,37 @@ func TestFilterNodes(t *testing.T) {
             NodeMeta: types.NodeMeta{Name: "D"},
         },
     }
+    ctx := context.Background()

     // error
     store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fail to list pod nodes")).Once()
-    _, err := c.filterNodes(context.Background(), types.NodeFilter{Includes: []string{}, Excludes: []string{"A", "X"}})
+    _, err := c.filterNodes(ctx, types.NodeFilter{Includes: []string{}, Excludes: []string{"A", "X"}})
     assert.Error(err)

     // empty nodenames, non-empty excludeNodenames
     store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil).Once()
-    nodes1, err := c.filterNodes(context.Background(), types.NodeFilter{Includes: []string{}, Excludes: []string{"A", "B"}})
+    nodes1, err := c.filterNodes(ctx, types.NodeFilter{Includes: []string{}, Excludes: []string{"A", "B"}})
     assert.NoError(err)
     assert.Equal(2, len(nodes1))

     // empty nodenames, empty excludeNodenames
     store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil).Once()
-    nodes2, err := c.filterNodes(context.Background(), types.NodeFilter{Includes: []string{}, Excludes: []string{}})
+    nodes2, err := c.filterNodes(ctx, types.NodeFilter{Includes: []string{}, Excludes: []string{}})
     assert.NoError(err)
     assert.Equal(4, len(nodes2))

     // non-empty nodenames, empty excludeNodenames
     store.On("GetNode", mock.Anything, "O").Return(&types.Node{NodeMeta: types.NodeMeta{Name: "O"}}, nil).Once()
     store.On("GetNode", mock.Anything, "P").Return(&types.Node{NodeMeta: types.NodeMeta{Name: "P"}}, nil).Once()
-    nodes3, err := c.filterNodes(context.Background(), types.NodeFilter{Includes: []string{"O", "P"}, Excludes: []string{}})
+    nodes3, err := c.filterNodes(ctx, types.NodeFilter{Includes: []string{"O", "P"}, Excludes: []string{}})
     assert.NoError(err)
     assert.Equal("O", nodes3[0].Name)
     assert.Equal("P", nodes3[1].Name)

     // non-empty nodenames
     store.On("GetNode", mock.Anything, "X").Return(&types.Node{NodeMeta: types.NodeMeta{Name: "X"}}, nil).Once()
     store.On("GetNode", mock.Anything, "Y").Return(&types.Node{NodeMeta: types.NodeMeta{Name: "Y"}}, nil).Once()
-    nodes4, err := c.filterNodes(context.Background(), types.NodeFilter{Includes: []string{"X", "Y"}, Excludes: []string{"A", "B"}})
+    nodes4, err := c.filterNodes(ctx, types.NodeFilter{Includes: []string{"X", "Y"}, Excludes: []string{"A", "B"}})
     assert.NoError(err)
     assert.Equal("X", nodes4[0].Name)
     assert.Equal("Y", nodes4[1].Name)

cluster/calcium/resource.go (+10 −7)

@@ -3,11 +3,11 @@ package calcium
 import (
     "context"
     "fmt"
+    "sync"

     "github.com/projecteru2/core/log"
     "github.com/projecteru2/core/strategy"
     "github.com/projecteru2/core/types"
-    "github.com/projecteru2/core/utils"

     "github.com/pkg/errors"
 )
@@ -20,22 +20,25 @@ func (c *Calcium) PodResource(ctx context.Context, podname string) (chan *types.
         return nil, logger.ErrWithTracing(ctx, err)
     }
     ch := make(chan *types.NodeResource)
-    pool := utils.NewGoroutinePool(int(c.config.MaxConcurrency))
+
     go func() {
         defer close(ch)
+        wg := &sync.WaitGroup{}
         for node := range nodeCh {
-            nodename := node.Name
-            pool.Go(ctx, func() {
-                nodeResource, err := c.doGetNodeResource(ctx, nodename, false)
+            node := node
+            wg.Add(1)
+            _ = c.pool.Invoke(func() {
+                defer wg.Done()
+                nodeResource, err := c.doGetNodeResource(ctx, node.Name, false)
                 if err != nil {
                     nodeResource = &types.NodeResource{
-                        Name: nodename, Diffs: []string{logger.ErrWithTracing(ctx, err).Error()},
+                        Name: node.Name, Diffs: []string{logger.ErrWithTracing(ctx, err).Error()},
                     }
                 }
                 ch <- nodeResource
             })
         }
-        pool.Wait(ctx)
+        wg.Wait()
     }()
     return ch, nil
 }
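
Unlike the other call sites, PodResource receives nodes from a channel, so the total count is not known up front; the WaitGroup grows by one per item, and the Add(1) happens before Invoke so Wait cannot return early. A minimal sketch of that variant (the item type and work function are illustrative, and the pool is again assumed to run func() closures):

```go
package example

import (
    "sync"

    "github.com/panjf2000/ants/v2"
)

// forEachFromChannel submits one pooled task per item received on in.
// wg.Add(1) is called before Invoke, so Wait accounts for every task even
// if its worker has not started yet.
func forEachFromChannel(pool *ants.PoolWithFunc, in <-chan string, handle func(string)) {
    wg := &sync.WaitGroup{}
    for item := range in {
        item := item // fresh copy for the closure
        wg.Add(1)
        _ = pool.Invoke(func() {
            defer wg.Done()
            handle(item)
        })
    }
    wg.Wait()
}
```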

0 commit comments
