
Commit 113786d

jschwinger233 authored and CMGS committed
intro goroutine pool to restrain max concurrency
1 parent 21bc787 commit 113786d

5 files changed (+93 −65 lines):

cluster/calcium/create.go
cluster/calcium/remove.go
engine/docker/container.go
engine/docker/helper.go
utils/gopool.go

cluster/calcium/create.go (+36 −46)

@@ -8,7 +8,6 @@ import (
 
     "github.com/pkg/errors"
     "github.com/sanity-io/litter"
-    "golang.org/x/sync/semaphore"
 
     "github.com/projecteru2/core/cluster"
     enginetypes "github.com/projecteru2/core/engine/types"
@@ -104,7 +103,10 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
         },
 
         // rollback: give back resources
-        func(ctx context.Context, _ bool) (err error) {
+        func(ctx context.Context, failedOnCond bool) (err error) {
+            if failedOnCond {
+                return
+            }
             for nodename, rollbackIndices := range rollbackMap {
                 if e := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
                     for _, plan := range plans {
@@ -159,61 +161,46 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
         return utils.Range(deploy), errors.WithStack(err)
     }
 
-    sem, appendLock := semaphore.NewWeighted(c.config.MaxConcurrency), sync.Mutex{}
+    pool, appendLock := utils.NewGoroutinePool(int(c.config.MaxConcurrency)), sync.Mutex{}
     for idx := 0; idx < deploy; idx++ {
         createMsg := &types.CreateWorkloadMessage{
             Podname:  opts.Podname,
             Nodename: nodename,
             Publish:  map[string][]string{},
         }
 
-        if e := sem.Acquire(ctx, 1); e != nil {
-            logger.Errorf("[Calcium.doDeployWorkloadsOnNode] Failed to acquire semaphore: %+v", e)
-            err = e
-            ch <- &types.CreateWorkloadMessage{Error: e}
-            appendLock.Lock()
-            indices = append(indices, idx)
-            appendLock.Unlock()
-            continue
-        }
-        go func(idx int) (e error) {
-            defer func() {
-                if e != nil {
-                    err = e
-                    createMsg.Error = logger.Err(e)
-                    appendLock.Lock()
-                    indices = append(indices, idx)
-                    appendLock.Unlock()
+        pool.Go(func(idx int) func() {
+            return func() {
+                var e error
+                defer func() {
+                    if e != nil {
+                        err = e
+                        createMsg.Error = logger.Err(e)
+                        appendLock.Lock()
+                        indices = append(indices, idx)
+                        appendLock.Unlock()
+                    }
+                    ch <- createMsg
+                }()
+
+                r := &types.ResourceMeta{}
+                o := resourcetypes.DispenseOptions{
+                    Node:  node,
+                    Index: idx,
                 }
-                ch <- createMsg
-                sem.Release(1)
-            }()
-
-            r := &types.ResourceMeta{}
-            o := resourcetypes.DispenseOptions{
-                Node:  node,
-                Index: idx,
-            }
-            for _, plan := range plans {
-                if r, e = plan.Dispense(o, r); e != nil {
-                    return errors.WithStack(e)
+                for _, plan := range plans {
+                    if r, e = plan.Dispense(o, r); e != nil {
+                        return
+                    }
                 }
-            }
-
-            createMsg.ResourceMeta = *r
-            createOpts := c.doMakeWorkloadOptions(seq+idx, createMsg, opts, node)
-            return errors.WithStack(c.doDeployOneWorkload(ctx, node, opts, createMsg, createOpts, deploy-1-idx))
-        }(idx) // nolint:errcheck
-    }
 
-    // sem.Acquire(ctx, MaxConcurrency) is equivalent to WaitGroup.Wait()
-    // context.Background() is used to keep the semantics intact: we must wait for every goroutine to finish, without being cut short by the user's ctx
-    // otherwise this function could return and close the channel before some goroutines had finished and run their defers, and those defers would panic sending on the closed channel
-    if e := sem.Acquire(context.Background(), c.config.MaxConcurrency); e != nil {
-        logger.Errorf("[Calcium.doDeployWorkloadsOnNode] Failed to wait all workers done: %+v", e)
-        err = e
-        indices = utils.Range(deploy)
+                createMsg.ResourceMeta = *r
+                createOpts := c.doMakeWorkloadOptions(seq+idx, createMsg, opts, node)
+                e = errors.WithStack(c.doDeployOneWorkload(ctx, node, opts, createMsg, createOpts, deploy-1-idx))
+            }
+        }(idx))
     }
+    pool.Wait()
 
     // remap is deliberately kept out of the transaction: rolling it back would cost too much
     // if remap fails, the share pool is simply not updated, a consequence we consider acceptable
@@ -367,6 +354,9 @@ func (c *Calcium) doDeployOneWorkload(
 
     // remove workload
     func(ctx context.Context, _ bool) error {
+        if workload.ID == "" {
+            return nil
+        }
         return errors.WithStack(c.doRemoveWorkload(ctx, workload, true))
     },
     c.config.GlobalTimeout,
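The pool.Go(func(idx int) func() { ... }(idx)) shape deserves a note: pool.Go accepts a plain func(), so each iteration pins the loop variable through an immediately-invoked factory that returns the real task. A minimal sketch of the pattern, with a hypothetical WaitGroup-backed run helper standing in for pool.Go:

package main

import (
    "fmt"
    "sync"
)

// run stands in for pool.Go: it schedules f on its own goroutine.
func run(wg *sync.WaitGroup, f func()) {
    wg.Add(1)
    go func() {
        defer wg.Done()
        f()
    }()
}

func main() {
    var wg sync.WaitGroup
    for idx := 0; idx < 3; idx++ {
        // The outer func(idx int) runs immediately and copies idx for this
        // iteration, so the returned closure prints its own value rather
        // than whatever idx holds once the goroutine actually runs (the
        // shared-loop-variable semantics of Go versions before 1.22).
        run(&wg, func(idx int) func() {
            return func() { fmt.Println(idx) }
        }(idx))
    }
    wg.Wait()
}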

cluster/calcium/remove.go (+4 −1)

@@ -95,7 +95,10 @@ func (c *Calcium) doRemoveWorkload(ctx context.Context, workload *types.Workload
             return errors.WithStack(workload.Remove(ctx, force))
         },
         // rollback
-        func(ctx context.Context, _ bool) error {
+        func(ctx context.Context, failedByCond bool) error {
+            if failedByCond {
+                return nil
+            }
             return errors.WithStack(c.store.AddWorkload(ctx, workload))
         },
         c.config.GlobalTimeout,
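The failedByCond flag (failedOnCond in create.go) tells a rollback callback whether the transaction died while still evaluating its condition, in which case there is nothing to undo. The sketch below is an assumed, simplified shape of the transaction helper these callbacks plug into; the txn name and signature are illustrative, not the actual eru-core API:

package main

import "fmt"

// txn is an illustrative reduction: run cond, then then; on failure invoke
// rollback, telling it whether the failure happened in the condition phase,
// i.e. before any real work started.
func txn(cond, then func() error, rollback func(failedOnCond bool) error) error {
    if err := cond(); err != nil {
        _ = rollback(true) // the new guards in this commit make callbacks no-op here
        return err
    }
    if err := then(); err != nil {
        _ = rollback(false) // real work failed, so undo it
        return err
    }
    return nil
}

func main() {
    err := txn(
        func() error { return fmt.Errorf("condition failed") },
        func() error { return nil },
        func(failedOnCond bool) error {
            if failedOnCond {
                return nil // nothing to re-add or give back
            }
            return nil
        },
    )
    fmt.Println(err)
}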

engine/docker/container.go (+21 −18)

@@ -11,7 +11,6 @@ import (
     "path/filepath"
     "strconv"
     "strings"
-    "sync"
     "time"
 
     "github.com/docker/go-connections/nat"
@@ -20,6 +19,7 @@ import (
 
     corecluster "github.com/projecteru2/core/cluster"
     "github.com/projecteru2/core/log"
+    "github.com/projecteru2/core/utils"
 
     dockertypes "github.com/docker/docker/api/types"
     dockercontainer "github.com/docker/docker/api/types/container"
@@ -251,27 +251,30 @@ func (e *Engine) VirtualizationResourceRemap(ctx context.Context, opts *enginety
     }
 
     // update!
-    wg := sync.WaitGroup{}
     ch := make(chan enginetypes.VirtualizationRemapMessage)
+    pool := utils.NewGoroutinePool(10)
     for id, resource := range freeWorkloadResources {
-        // TODO@zc: limit the max goroutine
-        wg.Add(1)
-        go func(id string, resource enginetypes.VirtualizationResource) {
-            defer wg.Done()
-            updateConfig := dockercontainer.UpdateConfig{Resources: dockercontainer.Resources{
-                CPUQuota:   int64(resource.Quota * float64(corecluster.CPUPeriodBase)),
-                CPUPeriod:  corecluster.CPUPeriodBase,
-                CpusetCpus: shareCPUSet,
-                CPUShares:  defaultCPUShare,
-            }}
-            _, err := e.client.ContainerUpdate(ctx, id, updateConfig)
-            ch <- enginetypes.VirtualizationRemapMessage{
-                ID:    id,
-                Error: err,
+        pool.Go(func(id string, resource enginetypes.VirtualizationResource) func() {
+            return func() {
+                updateConfig := dockercontainer.UpdateConfig{Resources: dockercontainer.Resources{
+                    CPUQuota:   int64(resource.Quota * float64(corecluster.CPUPeriodBase)),
+                    CPUPeriod:  corecluster.CPUPeriodBase,
+                    CpusetCpus: shareCPUSet,
+                    CPUShares:  defaultCPUShare,
+                }}
+                _, err := e.client.ContainerUpdate(ctx, id, updateConfig)
+                ch <- enginetypes.VirtualizationRemapMessage{
+                    ID:    id,
+                    Error: err,
+                }
             }
-        }(id, resource)
+        }(id, resource))
     }
-    wg.Wait()
+
+    go func() {
+        defer close(ch)
+        pool.Wait()
+    }()
     return ch, nil
 }
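Beyond capping concurrency, this rewrite fixes the channel lifecycle: ch is unbuffered, and the old code called wg.Wait() before returning it, so every worker would block on ch <- with no reader yet attached and the function could never return once there was a message to send. Closing the channel from a background goroutine after pool.Wait() lets the caller drain it. The idiom, reduced to a runnable sketch built on the GoroutinePool this commit adds:

package main

import (
    "fmt"

    "github.com/projecteru2/core/utils"
)

// produce fans out senders and returns the channel immediately; the
// channel is closed only after every sender has finished.
func produce() <-chan int {
    ch := make(chan int)
    pool := utils.NewGoroutinePool(10)
    for i := 0; i < 5; i++ {
        pool.Go(func(i int) func() {
            return func() { ch <- i } // blocks until the consumer reads
        }(i))
    }
    go func() {
        defer close(ch) // safe: pool.Wait() guarantees all sends are done
        pool.Wait()
    }()
    return ch
}

func main() {
    for v := range produce() { // terminates when the channel is closed
        fmt.Println(v)
    }
}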

engine/docker/helper.go (+1 −0)

@@ -104,6 +104,7 @@ func makeResourceSetting(cpu float64, memory int64, cpuMap map[string]int64, num
     resource := dockercontainer.Resources{}
 
     resource.CPUQuota = 0
+    resource.CPUShares = defaultCPUShare
     resource.CPUPeriod = corecluster.CPUPeriodBase
     if cpu > 0 {
         resource.CPUQuota = int64(cpu * float64(corecluster.CPUPeriodBase))

utils/gopool.go (+31 −0, new file)

@@ -0,0 +1,31 @@
+package utils
+
+import (
+    "context"
+
+    "golang.org/x/sync/semaphore"
+)
+
+type GoroutinePool struct {
+    max int64
+    sem *semaphore.Weighted
+}
+
+func NewGoroutinePool(max int) *GoroutinePool {
+    return &GoroutinePool{
+        max: int64(max),
+        sem: semaphore.NewWeighted(int64(max)),
+    }
+}
+
+func (p *GoroutinePool) Go(f func()) {
+    p.sem.Acquire(context.Background(), 1)
+    go func() {
+        defer p.sem.Release(1)
+        f()
+    }()
+}
+
+func (p *GoroutinePool) Wait() {
+    p.sem.Acquire(context.Background(), p.max)
+}
