Commit 1664108

Resolve potential deadlocks (#438)
* sort & unique the nodes and workloads before locking
* make sure nodes are locked before workloads
* unlock the mutexes in reverse order
* add tests
1 parent: c8630ed

7 files changed: +193 -97 lines
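The fix follows the standard deadlock-avoidance recipe: give every caller one global lock-acquisition order (sort the keys, drop duplicates, and always take node locks before workload locks) and release in the reverse order of acquisition, as the commit message describes. The sketch below illustrates only that ordering discipline with plain sync.Mutex; it is not the project's API (Calcium uses distributed locks via lock.DistributedLock and helpers such as withNodesLocked), and the function names here are made up for the example.

package main

import (
	"fmt"
	"sort"
	"sync"
)

// lockAllSorted takes the mutexes for the given keys in a deterministic
// (sorted, de-duplicated) order and returns the keys in acquisition order.
func lockAllSorted(locks map[string]*sync.Mutex, keys []string) []string {
	sort.Strings(keys) // every caller agrees on the same global order
	order := make([]string, 0, len(keys))
	for _, k := range keys {
		if len(order) > 0 && order[len(order)-1] == k {
			continue // duplicate after sorting: locking it twice would self-deadlock
		}
		locks[k].Lock()
		order = append(order, k)
	}
	return order
}

// unlockAllReverse releases in the reverse order of acquisition.
func unlockAllReverse(locks map[string]*sync.Mutex, order []string) {
	for i := len(order) - 1; i >= 0; i-- {
		locks[order[i]].Unlock()
	}
}

func main() {
	locks := map[string]*sync.Mutex{"node1": new(sync.Mutex), "node2": new(sync.Mutex)}
	order := lockAllSorted(locks, []string{"node2", "node1", "node2"})
	defer unlockAllReverse(locks, order)
	fmt.Println("acquired in order:", order) // always [node1 node2], whatever the input order
}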

cluster/calcium/lock.go (+28 -7)

@@ -3,6 +3,7 @@ package calcium
 import (
     "context"
     "fmt"
+    "sort"
     "time"
 
     "github.com/pkg/errors"
@@ -37,11 +38,18 @@ func (c *Calcium) doUnlock(ctx context.Context, lock lock.DistributedLock, msg s
     return errors.WithStack(lock.Unlock(ctx))
 }
 
-func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.DistributedLock) {
-    for n, lock := range locks {
-        // force unlock
-        if err := c.doUnlock(ctx, lock, n); err != nil {
-            log.Errorf(ctx, "[doUnlockAll] Unlock failed %v", err)
+func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.DistributedLock, order ...string) {
+    // unlock in the reverse order
+    if len(order) != len(locks) {
+        log.Warn(ctx, "[doUnlockAll] order length not match lock map")
+        order = []string{}
+        for key := range locks {
+            order = append(order, key)
+        }
+    }
+    for _, key := range order {
+        if err := c.doUnlock(ctx, locks[key], key); err != nil {
+            log.Errorf(ctx, "[doUnlockAll] Unlock %s failed %v", key, err)
             continue
         }
     }
@@ -72,8 +80,16 @@ func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, f func(co
 func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(context.Context, map[string]*types.Workload) error) error {
     workloads := map[string]*types.Workload{}
     locks := map[string]lock.DistributedLock{}
+
+    // sort + unique
+    sort.Strings(ids)
+    ids = ids[:utils.Unique(ids, func(i int) string { return ids[i] })]
+
     defer log.Debugf(ctx, "[withWorkloadsLocked] Workloads %+v unlocked", ids)
-    defer func() { c.doUnlockAll(utils.InheritTracingInfo(ctx, context.Background()), locks) }()
+    defer func() {
+        utils.Reverse(ids)
+        c.doUnlockAll(utils.InheritTracingInfo(ctx, context.TODO()), locks, ids...)
+    }()
     cs, err := c.GetWorkloads(ctx, ids)
     if err != nil {
         return err
@@ -94,10 +110,14 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(
 // withNodesLocked will using NodeFilter `nf` to filter nodes
 // and lock the corresponding nodes for the callback function `f` to use
 func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error {
+    nodenames := []string{}
     nodes := map[string]*types.Node{}
     locks := map[string]lock.DistributedLock{}
     defer log.Debugf(ctx, "[withNodesLocked] Nodes %+v unlocked", nf)
-    defer c.doUnlockAll(utils.InheritTracingInfo(ctx, context.Background()), locks)
+    defer func() {
+        utils.Reverse(nodenames)
+        c.doUnlockAll(utils.InheritTracingInfo(ctx, context.TODO()), locks, nodenames...)
+    }()
 
     ns, err := c.filterNodes(ctx, nf)
     if err != nil {
@@ -118,6 +138,7 @@ func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f fu
             return err
         }
         nodes[n.Name] = node
+        nodenames = append(nodenames, n.Name)
     }
     return f(ctx, nodes)
 }
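The `ids = ids[:utils.Unique(ids, ...)]` line above leans on a helper that appears to compact a sorted slice in place and return the number of unique elements; that contract is inferred from the call site, not from documented behaviour of utils.Unique. A standalone sketch of that kind of helper, with the hypothetical name uniqueSorted:

package main

import (
	"fmt"
	"sort"
)

// uniqueSorted compacts a sorted []string in place, keeping the first
// occurrence of each value, and returns the number of unique elements.
// It mirrors the behaviour assumed of utils.Unique in the diff above.
func uniqueSorted(s []string) int {
	if len(s) == 0 {
		return 0
	}
	n := 1
	for i := 1; i < len(s); i++ {
		if s[i] != s[n-1] {
			s[n] = s[i]
			n++
		}
	}
	return n
}

func main() {
	ids := []string{"c3", "c1", "c3", "c2"}
	sort.Strings(ids)
	ids = ids[:uniqueSorted(ids)]
	fmt.Println(ids) // [c1 c2 c3]
}

After sorting and the cut, the slice is ordered and duplicate-free, so withWorkloadsLocked requests each lock at most once and in the same order as any concurrent caller.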

cluster/calcium/node.go (+11 -2)

@@ -2,6 +2,7 @@ package calcium
 
 import (
     "context"
+    "sort"
 
     "github.com/pkg/errors"
     "github.com/projecteru2/core/log"
@@ -176,8 +177,16 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
 // filterNodes filters nodes using NodeFilter nf
 // the filtering logic is introduced along with NodeFilter
 // NOTE: when nf.Includes is set, they don't need to belong to podname
-func (c *Calcium) filterNodes(ctx context.Context, nf types.NodeFilter) ([]*types.Node, error) {
-    ns := []*types.Node{}
+// updateon 2021-06-21: sort and unique locks to avoid deadlock
+func (c *Calcium) filterNodes(ctx context.Context, nf types.NodeFilter) (ns []*types.Node, err error) {
+    defer func() {
+        if len(ns) == 0 {
+            return
+        }
+        sort.Slice(ns, func(i, j int) bool { return ns[i].Name <= ns[j].Name })
+        // unique
+        ns = ns[:utils.Unique(ns, func(i int) string { return ns[i].Name })]
+    }()
 
     if len(nf.Includes) != 0 {
         for _, nodename := range nf.Includes {

cluster/calcium/realloc.go (+82 -78)

@@ -16,94 +16,98 @@ import (
 // ReallocResource updates workload resource dynamically
 func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOptions) (err error) {
     logger := log.WithField("Calcium", "ReallocResource").WithField("opts", opts)
-    return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
-        rrs, err := resources.MakeRequests(
-            types.ResourceOptions{
-                CPUQuotaRequest: utils.Round(workload.CPUQuotaRequest + opts.ResourceOpts.CPUQuotaRequest),
-                CPUQuotaLimit:   utils.Round(workload.CPUQuotaLimit + opts.ResourceOpts.CPUQuotaLimit),
-                CPUBind:         types.ParseTriOption(opts.CPUBindOpts, len(workload.CPU) > 0),
-                CPU:             workload.CPU,
-                MemoryRequest:   workload.MemoryRequest + opts.ResourceOpts.MemoryRequest,
-                MemoryLimit:     workload.MemoryLimit + opts.ResourceOpts.MemoryLimit,
-                StorageRequest:  workload.StorageRequest + opts.ResourceOpts.StorageRequest,
-                StorageLimit:    workload.StorageLimit + opts.ResourceOpts.StorageLimit,
-                VolumeRequest:   types.MergeVolumeBindings(workload.VolumeRequest, opts.ResourceOpts.VolumeRequest),
-                VolumeLimit:     types.MergeVolumeBindings(workload.VolumeLimit, opts.ResourceOpts.VolumeLimit),
-                VolumeExist:     workload.VolumePlanRequest,
-            },
-        )
-        if err != nil {
-            return logger.Err(ctx, err)
-        }
-        return logger.Err(ctx, c.doReallocOnNode(ctx, workload.Nodename, workload, rrs))
+    workload, err := c.GetWorkload(ctx, opts.ID)
+    if err != nil {
+        return
+    }
+    return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error {
+        return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
+            rrs, err := resources.MakeRequests(
+                types.ResourceOptions{
+                    CPUQuotaRequest: utils.Round(workload.CPUQuotaRequest + opts.ResourceOpts.CPUQuotaRequest),
+                    CPUQuotaLimit:   utils.Round(workload.CPUQuotaLimit + opts.ResourceOpts.CPUQuotaLimit),
+                    CPUBind:         types.ParseTriOption(opts.CPUBindOpts, len(workload.CPU) > 0),
+                    CPU:             workload.CPU,
+                    MemoryRequest:   workload.MemoryRequest + opts.ResourceOpts.MemoryRequest,
+                    MemoryLimit:     workload.MemoryLimit + opts.ResourceOpts.MemoryLimit,
+                    StorageRequest:  workload.StorageRequest + opts.ResourceOpts.StorageRequest,
+                    StorageLimit:    workload.StorageLimit + opts.ResourceOpts.StorageLimit,
+                    VolumeRequest:   types.MergeVolumeBindings(workload.VolumeRequest, opts.ResourceOpts.VolumeRequest),
+                    VolumeLimit:     types.MergeVolumeBindings(workload.VolumeLimit, opts.ResourceOpts.VolumeLimit),
+                    VolumeExist:     workload.VolumePlanRequest,
+                },
+            )
+            if err != nil {
+                return logger.Err(ctx, err)
+            }
+            return logger.Err(ctx, c.doReallocOnNode(ctx, node, workload, rrs))
+        })
     })
 }
 
 // transaction: node resource
-func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, workload *types.Workload, rrs resourcetypes.ResourceRequests) error {
-    return c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) (err error) {
-        node.RecycleResources(&workload.ResourceMeta)
-        plans, err := resources.SelectNodesByResourceRequests(ctx, rrs, map[string]*types.Node{node.Name: node})
-        if err != nil {
-            return err
-        }
+func (c *Calcium) doReallocOnNode(ctx context.Context, node *types.Node, workload *types.Workload, rrs resourcetypes.ResourceRequests) (err error) {
+    node.RecycleResources(&workload.ResourceMeta)
+    plans, err := resources.SelectNodesByResourceRequests(ctx, rrs, map[string]*types.Node{node.Name: node})
+    if err != nil {
+        return err
+    }
 
-        originalWorkload := *workload
-        resourceMeta := &types.ResourceMeta{}
-        if err = utils.Txn(
-            ctx,
+    originalWorkload := *workload
+    resourceMeta := &types.ResourceMeta{}
+    if err = utils.Txn(
+        ctx,
 
-            // if update workload resources
-            func(ctx context.Context) (err error) {
-                resourceMeta := &types.ResourceMeta{}
-                for _, plan := range plans {
-                    if resourceMeta, err = plan.Dispense(resourcetypes.DispenseOptions{
-                        Node: node,
-                    }, resourceMeta); err != nil {
-                        return err
-                    }
+        // if update workload resources
+        func(ctx context.Context) (err error) {
+            resourceMeta := &types.ResourceMeta{}
+            for _, plan := range plans {
+                if resourceMeta, err = plan.Dispense(resourcetypes.DispenseOptions{
+                    Node: node,
+                }, resourceMeta); err != nil {
+                    return err
                 }
+            }
 
-                return c.doReallocWorkloadsOnInstance(ctx, node.Engine, resourceMeta, workload)
-            },
-            // then commit changes
-            func(ctx context.Context) error {
-                for _, plan := range plans {
-                    plan.ApplyChangesOnNode(node, 0)
-                }
-                return errors.WithStack(c.store.UpdateNodes(ctx, node))
-            },
-            // no need rollback
-            func(ctx context.Context, failureByCond bool) (err error) {
-                if failureByCond {
-                    return
-                }
-                r := &types.ResourceMeta{
-                    CPUQuotaRequest:   originalWorkload.CPUQuotaRequest,
-                    CPUQuotaLimit:     originalWorkload.CPUQuotaLimit,
-                    CPU:               originalWorkload.CPU,
-                    NUMANode:          originalWorkload.NUMANode,
-                    MemoryRequest:     originalWorkload.MemoryRequest,
-                    MemoryLimit:       originalWorkload.MemoryLimit,
-                    VolumeRequest:     originalWorkload.VolumeRequest,
-                    VolumeLimit:       originalWorkload.VolumeLimit,
-                    VolumePlanRequest: originalWorkload.VolumePlanRequest,
-                    VolumePlanLimit:   originalWorkload.VolumePlanLimit,
-                    VolumeChanged:     resourceMeta.VolumeChanged,
-                    StorageRequest:    originalWorkload.StorageRequest,
-                    StorageLimit:      originalWorkload.StorageLimit,
-                }
-                return c.doReallocWorkloadsOnInstance(ctx, node.Engine, r, workload)
-            },
+            return c.doReallocWorkloadsOnInstance(ctx, node.Engine, resourceMeta, workload)
+        },
+        // then commit changes
+        func(ctx context.Context) error {
+            for _, plan := range plans {
+                plan.ApplyChangesOnNode(node, 0)
+            }
+            return errors.WithStack(c.store.UpdateNodes(ctx, node))
+        },
+        // no need rollback
+        func(ctx context.Context, failureByCond bool) (err error) {
+            if failureByCond {
+                return
+            }
+            r := &types.ResourceMeta{
+                CPUQuotaRequest:   originalWorkload.CPUQuotaRequest,
+                CPUQuotaLimit:     originalWorkload.CPUQuotaLimit,
+                CPU:               originalWorkload.CPU,
+                NUMANode:          originalWorkload.NUMANode,
+                MemoryRequest:     originalWorkload.MemoryRequest,
+                MemoryLimit:       originalWorkload.MemoryLimit,
+                VolumeRequest:     originalWorkload.VolumeRequest,
+                VolumeLimit:       originalWorkload.VolumeLimit,
+                VolumePlanRequest: originalWorkload.VolumePlanRequest,
+                VolumePlanLimit:   originalWorkload.VolumePlanLimit,
+                VolumeChanged:     resourceMeta.VolumeChanged,
+                StorageRequest:    originalWorkload.StorageRequest,
+                StorageLimit:      originalWorkload.StorageLimit,
+            }
+            return c.doReallocWorkloadsOnInstance(ctx, node.Engine, r, workload)
+        },
 
-            c.config.GlobalTimeout,
-        ); err != nil {
-            return
-        }
+        c.config.GlobalTimeout,
+    ); err != nil {
+        return
+    }
 
-        c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
-        return nil
-    })
+    c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
+    return nil
 }
 
 func (c *Calcium) doReallocWorkloadsOnInstance(ctx context.Context, engine engine.API, resourceMeta *types.ResourceMeta, workload *types.Workload) (err error) {
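With the node already locked by ReallocResource, doReallocOnNode keeps the three-phase transaction shape around utils.Txn: a first function that dispenses the new resources onto the workload, a second that commits the node changes to the store, and a rollback that restores the original resource meta only when the first phase had already succeeded. The helper below is a simplified sketch of that prepare/commit/rollback flow under a shared timeout; utils.Txn's real signature and behaviour live in the project's utils package, so treat the names here as illustrative assumptions.

package main

import (
	"context"
	"fmt"
	"time"
)

// txn runs then, commit and (on failure) rollback with a shared timeout.
// failureByCond tells rollback whether the first phase failed on its own,
// in which case there is usually nothing to undo. This is a simplified
// stand-in for the project's utils.Txn, not its actual implementation.
func txn(
	ctx context.Context,
	then func(context.Context) error,
	commit func(context.Context) error,
	rollback func(context.Context, bool) error,
	timeout time.Duration,
) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	if err := then(ctx); err != nil {
		_ = rollback(ctx, true) // first phase failed: nothing committed yet
		return err
	}
	if err := commit(ctx); err != nil {
		_ = rollback(ctx, false) // commit failed: restore the previous state
		return err
	}
	return nil
}

func main() {
	err := txn(context.Background(),
		func(ctx context.Context) error { fmt.Println("dispense new resources"); return nil },
		func(ctx context.Context) error { fmt.Println("persist node changes"); return nil },
		func(ctx context.Context, failureByCond bool) error {
			fmt.Println("restore original resource meta, failureByCond =", failureByCond)
			return nil
		},
		30*time.Second,
	)
	fmt.Println("txn result:", err)
}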

cluster/calcium/realloc_test.go (+15 -9)

@@ -97,27 +97,29 @@ func TestRealloc(t *testing.T) {
         }
     }
 
-    store.On("GetWorkloads", mock.Anything, []string{"c1"}).Return(newC1, nil)
+    store.On("GetWorkload", mock.Anything, "c1").Return(newC1(context.TODO(), nil)[0], nil)
+
+    // failed by GetNode
+    store.On("GetNode", mock.Anything, "node1").Return(nil, types.ErrNoETCD).Once()
+    err := c.ReallocResource(ctx, newReallocOptions("c1", 0.1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
+    assert.EqualError(t, err, "ETCD must be set")
+    store.AssertExpectations(t)
+    store.On("GetNode", mock.Anything, "node1").Return(node1, nil)
+
     // failed by lock
     store.On("CreateLock", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
-    err := c.ReallocResource(ctx, newReallocOptions("c1", -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
+    err = c.ReallocResource(ctx, newReallocOptions("c1", -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
     assert.EqualError(t, err, "ETCD must be set")
     store.AssertExpectations(t)
     store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
 
     // failed by newCPU < 0
+    store.On("GetWorkloads", mock.Anything, []string{"c1"}).Return(newC1, nil)
     err = c.ReallocResource(ctx, newReallocOptions("c1", -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
     assert.EqualError(t, err, "limit or request less than 0: bad `CPU` value")
     store.AssertExpectations(t)
 
-    // failed by GetNode
-    store.On("GetNode", mock.Anything, "node1").Return(nil, types.ErrNoETCD).Once()
-    err = c.ReallocResource(ctx, newReallocOptions("c1", 0.1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
-    assert.EqualError(t, err, "ETCD must be set")
-    store.AssertExpectations(t)
-
     // failed by no enough mem
-    store.On("GetNode", mock.Anything, "node1").Return(node1, nil)
     simpleMockScheduler := &schedulermocks.Scheduler{}
     scheduler.InitSchedulerV1(simpleMockScheduler)
     c.scheduler = simpleMockScheduler
@@ -165,6 +167,7 @@ func TestRealloc(t *testing.T) {
         },
         Engine: engine,
     }
+    store.On("GetWorkload", mock.Anything, "c2").Return(newC2(nil, nil)[0], nil)
     store.On("GetWorkloads", mock.Anything, []string{"c2"}).Return(newC2, nil)
     err = c.ReallocResource(ctx, newReallocOptions("c2", 0.1, 2*int64(units.MiB), nil, types.TriKeep, types.TriKeep))
     assert.EqualError(t, err, "workload ID must be length of 64")
@@ -251,6 +254,7 @@ func TestRealloc(t *testing.T) {
     simpleMockScheduler.On("ReselectCPUNodes", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(scheduleInfos[0], nodeCPUPlans, 2, nil)
     simpleMockScheduler.On("ReselectVolumeNodes", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(scheduleInfos[0], nodeVolumePlans, 2, nil).Once()
     store.On("GetNode", mock.Anything, "node2").Return(node2, nil)
+    store.On("GetWorkload", mock.Anything, "c3").Return(c3, nil)
     store.On("GetWorkloads", mock.Anything, []string{"c3"}).Return([]*types.Workload{c3}, nil)
     store.On("UpdateWorkload", mock.Anything, mock.Anything).Return(types.ErrBadWorkloadID).Times(1)
     err = c.ReallocResource(ctx, newReallocOptions("c3", 0.1, 2*int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data0:rw:-50"}), types.TriKeep, types.TriKeep))
@@ -341,6 +345,7 @@ func TestReallocBindCpu(t *testing.T) {
     }
 
     store.On("GetNode", mock.Anything, "node3").Return(node3, nil)
+    store.On("GetWorkload", mock.Anything, "c5").Return(c5, nil)
     store.On("GetWorkloads", mock.Anything, []string{"c5"}).Return([]*types.Workload{c5}, nil)
     engine.On("VirtualizationUpdateResource", mock.Anything, mock.Anything, mock.Anything).Return(nil)
     store.On("UpdateWorkload", mock.Anything, mock.Anything).Return(nil)
@@ -358,6 +363,7 @@ func TestReallocBindCpu(t *testing.T) {
     assert.Equal(t, 0, len(c5.ResourceMeta.CPU))
     store.AssertExpectations(t)
 
+    store.On("GetWorkload", mock.Anything, "c6").Return(c6, nil)
     store.On("GetWorkloads", mock.Anything, []string{"c6"}).Return([]*types.Workload{c6}, nil)
     err = c.ReallocResource(ctx, newReallocOptions("c6", 0.1, 2*int64(units.MiB), nil, types.TriTrue, types.TriKeep))
     assert.NoError(t, err)
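Because ReallocResource now resolves the workload and takes the node lock before the workload lock, the test has to register the GetNode expectations earlier. The sequencing relies on testify's .Once(): an expectation registered with Once is consumed by the first matching call, after which the next registered expectation answers. A tiny standalone illustration (the fakeStore type here is hypothetical, not the project's generated mocks):

package main

import (
	"errors"
	"fmt"

	"github.com/stretchr/testify/mock"
)

// fakeStore is a hypothetical mock standing in for the generated store mocks.
type fakeStore struct{ mock.Mock }

func (s *fakeStore) GetNode(name string) (string, error) {
	args := s.Called(name)
	return args.String(0), args.Error(1)
}

func main() {
	s := &fakeStore{}
	// First call fails once, subsequent calls succeed.
	s.On("GetNode", "node1").Return("", errors.New("ETCD must be set")).Once()
	s.On("GetNode", "node1").Return("node1", nil)

	_, err := s.GetNode("node1")
	fmt.Println(err) // ETCD must be set
	n, err := s.GetNode("node1")
	fmt.Println(n, err) // node1 <nil>
}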

scheduler/complex/potassium.go (+2 -1)

@@ -124,6 +124,7 @@ func (m *Potassium) SelectCPUNodes(ctx context.Context, scheduleInfos []resource
 
 // ReselectCPUNodes used for realloc one container with cpu affinity
 func (m *Potassium) ReselectCPUNodes(ctx context.Context, scheduleInfo resourcetypes.ScheduleInfo, CPU types.CPUMap, quota float64, memory int64) (resourcetypes.ScheduleInfo, map[string][]types.CPUMap, int, error) {
+    log.Infof(ctx, "[SelectCPUNodes] scheduleInfo %v, need cpu %f, need memory %d, existing %v", scheduleInfo, quota, memory, CPU)
     var affinityPlan types.CPUMap
     // remaining quota that's impossible to achieve affinity
     if scheduleInfo, quota, affinityPlan = cpuReallocPlan(scheduleInfo, quota, CPU, int64(m.sharebase)); quota == 0 {
@@ -325,7 +326,7 @@ func (m *Potassium) SelectVolumeNodes(ctx context.Context, scheduleInfos []resou
 
 // ReselectVolumeNodes is used for realloc only
 func (m *Potassium) ReselectVolumeNodes(ctx context.Context, scheduleInfo resourcetypes.ScheduleInfo, existing types.VolumePlan, vbsReq types.VolumeBindings) (resourcetypes.ScheduleInfo, map[string][]types.VolumePlan, int, error) {
-
+    log.Infof(ctx, "[ReselectVolumeNodes] scheduleInfo %v, need volume: %v, existing %v", scheduleInfo, vbsReq.ToStringSlice(true, true), existing.ToLiteral())
     affinityPlan := types.VolumePlan{}
     needReschedule := types.VolumeBindings{}
     norm, mono, unlim := distinguishVolumeBindings(vbsReq)
