Skip to content

Commit f963a86

Browse files
committed
split remap
1 parent c280e15 commit f963a86

File tree

6 files changed

+116
-94
lines changed

6 files changed

+116
-94
lines changed

cluster/calcium/image_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func TestRemoveImage(t *testing.T) {
4949
assert.False(t, c.Success)
5050
}
5151
engine.On("ImageRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{"xx"}, nil)
52-
// sucess remove but prune fail
52+
// success remove but prune fail
5353
engine.On("ImagesPrune", mock.Anything).Return(types.ErrBadStorage).Once()
5454
ch, err = c.RemoveImage(ctx, &types.ImageOptions{Podname: "podname", Images: []string{"xx"}, Prune: true})
5555
for c := range ch {

cluster/calcium/remap.go

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package calcium
2+
3+
import (
4+
"context"
5+
6+
enginetypes "github.com/projecteru2/core/engine/types"
7+
"github.com/projecteru2/core/log"
8+
"github.com/projecteru2/core/types"
9+
"github.com/projecteru2/core/utils"
10+
)
11+
12+
type remapMsg struct {
13+
id string
14+
err error
15+
}
16+
17+
func (c *Calcium) doRemapResourceAndLog(ctx context.Context, logger log.Fields, node *types.Node) {
18+
log.Debugf(ctx, "[doRemapResourceAndLog] remap node %s", node.Name)
19+
ctx, cancel := context.WithTimeout(utils.InheritTracingInfo(ctx, context.TODO()), c.config.GlobalTimeout)
20+
defer cancel()
21+
22+
err := c.withNodeOperationLocked(ctx, node.Name, func(ctx context.Context, node *types.Node) error {
23+
logger = logger.WithField("Calcium", "doRemapResourceAndLog").WithField("nodename", node.Name)
24+
if ch, err := c.remapResource(ctx, node); logger.ErrWithTracing(ctx, err) == nil {
25+
for msg := range ch {
26+
log.Infof(ctx, "[doRemapResourceAndLog] id %v", msg.id)
27+
logger.WithField("id", msg.id).ErrWithTracing(ctx, msg.err) // nolint:errcheck
28+
}
29+
}
30+
return nil
31+
})
32+
33+
if err != nil {
34+
log.Errorf(ctx, "[doRemapResourceAndLog] remap node %s failed, err: %v", node.Name, err)
35+
}
36+
}
37+
38+
// called on changes of resource binding, such as cpu binding
39+
// as an internal api, remap doesn't lock node, the responsibility of that should be taken on by caller
40+
func (c *Calcium) remapResource(ctx context.Context, node *types.Node) (ch chan *remapMsg, err error) {
41+
workloads, err := c.store.ListNodeWorkloads(ctx, node.Name, nil)
42+
if err != nil {
43+
return
44+
}
45+
46+
workloadMap := map[string]*types.Workload{}
47+
for _, workload := range workloads {
48+
workloadMap[workload.ID] = workload
49+
}
50+
51+
engineArgsMap, err := c.rmgr.GetRemapArgs(ctx, node.Name, workloadMap)
52+
if err != nil {
53+
return nil, err
54+
}
55+
56+
ch = make(chan *remapMsg, len(engineArgsMap))
57+
go func() {
58+
defer close(ch)
59+
for workloadID, engineArgs := range engineArgsMap {
60+
ch <- &remapMsg{
61+
id: workloadID,
62+
err: node.Engine.VirtualizationUpdateResource(ctx, workloadID, &enginetypes.VirtualizationResource{EngineArgs: engineArgs}),
63+
}
64+
}
65+
}()
66+
67+
return ch, nil
68+
}

cluster/calcium/remap_test.go

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package calcium
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
enginemocks "github.com/projecteru2/core/engine/mocks"
8+
enginetypes "github.com/projecteru2/core/engine/types"
9+
"github.com/projecteru2/core/log"
10+
resourcemocks "github.com/projecteru2/core/resources/mocks"
11+
storemocks "github.com/projecteru2/core/store/mocks"
12+
"github.com/projecteru2/core/types"
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/mock"
15+
)
16+
17+
func TestRemapResource(t *testing.T) {
18+
c := NewTestCluster()
19+
store := c.store.(*storemocks.Store)
20+
rmgr := c.rmgr.(*resourcemocks.Manager)
21+
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
22+
map[string]types.NodeResourceArgs{"test": map[string]interface{}{"abc": 123}},
23+
map[string]types.NodeResourceArgs{"test": map[string]interface{}{"abc": 123}},
24+
[]string{types.ErrNoETCD.Error()},
25+
nil)
26+
rmgr.On("GetRemapArgs", mock.Anything, mock.Anything, mock.Anything).Return(
27+
map[string]types.EngineArgs{},
28+
nil,
29+
)
30+
engine := &enginemocks.API{}
31+
node := &types.Node{Engine: engine}
32+
33+
workload := &types.Workload{
34+
ResourceArgs: map[string]types.WorkloadResourceArgs{},
35+
}
36+
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil)
37+
ch := make(chan enginetypes.VirtualizationRemapMessage, 1)
38+
ch <- enginetypes.VirtualizationRemapMessage{}
39+
close(ch)
40+
engine.On("VirtualizationResourceRemap", mock.Anything, mock.Anything).Return((<-chan enginetypes.VirtualizationRemapMessage)(ch), nil)
41+
_, err := c.remapResource(context.Background(), node)
42+
assert.Nil(t, err)
43+
44+
c.doRemapResourceAndLog(context.TODO(), log.WithField("test", "zc"), node)
45+
}

cluster/calcium/resource.go

-59
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66

7-
enginetypes "github.com/projecteru2/core/engine/types"
87
"github.com/projecteru2/core/log"
98
"github.com/projecteru2/core/strategy"
109
"github.com/projecteru2/core/types"
@@ -120,61 +119,3 @@ func (c *Calcium) doGetDeployMap(ctx context.Context, nodes []string, opts *type
120119

121120
return deployMap, nil
122121
}
123-
124-
type remapMsg struct {
125-
id string
126-
err error
127-
}
128-
129-
// called on changes of resource binding, such as cpu binding
130-
// as an internal api, remap doesn't lock node, the responsibility of that should be taken on by caller
131-
func (c *Calcium) remapResource(ctx context.Context, node *types.Node) (ch chan *remapMsg, err error) {
132-
workloads, err := c.store.ListNodeWorkloads(ctx, node.Name, nil)
133-
if err != nil {
134-
return
135-
}
136-
137-
workloadMap := map[string]*types.Workload{}
138-
for _, workload := range workloads {
139-
workloadMap[workload.ID] = workload
140-
}
141-
142-
engineArgsMap, err := c.rmgr.GetRemapArgs(ctx, node.Name, workloadMap)
143-
if err != nil {
144-
return nil, err
145-
}
146-
147-
ch = make(chan *remapMsg, len(engineArgsMap))
148-
go func() {
149-
defer close(ch)
150-
for workloadID, engineArgs := range engineArgsMap {
151-
ch <- &remapMsg{
152-
id: workloadID,
153-
err: node.Engine.VirtualizationUpdateResource(ctx, workloadID, &enginetypes.VirtualizationResource{EngineArgs: engineArgs}),
154-
}
155-
}
156-
}()
157-
158-
return ch, nil
159-
}
160-
161-
func (c *Calcium) doRemapResourceAndLog(ctx context.Context, logger log.Fields, node *types.Node) {
162-
log.Debugf(ctx, "[doRemapResourceAndLog] remap node %s", node.Name)
163-
ctx, cancel := context.WithTimeout(utils.InheritTracingInfo(ctx, context.TODO()), c.config.GlobalTimeout)
164-
defer cancel()
165-
166-
err := c.withNodeOperationLocked(ctx, node.Name, func(ctx context.Context, node *types.Node) error {
167-
logger = logger.WithField("Calcium", "doRemapResourceAndLog").WithField("nodename", node.Name)
168-
if ch, err := c.remapResource(ctx, node); logger.ErrWithTracing(ctx, err) == nil {
169-
for msg := range ch {
170-
log.Infof(ctx, "[doRemapResourceAndLog] id %v", msg.id)
171-
logger.WithField("id", msg.id).ErrWithTracing(ctx, msg.err) // nolint:errcheck
172-
}
173-
}
174-
return nil
175-
})
176-
177-
if err != nil {
178-
log.Errorf(ctx, "[doRemapResourceAndLog] remap node %s failed, err: %v", node.Name, err)
179-
}
180-
}

cluster/calcium/resource_test.go

-32
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ import (
88
"time"
99

1010
enginemocks "github.com/projecteru2/core/engine/mocks"
11-
enginetypes "github.com/projecteru2/core/engine/types"
1211
lockmocks "github.com/projecteru2/core/lock/mocks"
13-
"github.com/projecteru2/core/log"
1412
resourcetypes "github.com/projecteru2/core/resources"
1513
resourcemocks "github.com/projecteru2/core/resources/mocks"
1614
storemocks "github.com/projecteru2/core/store/mocks"
@@ -132,33 +130,3 @@ func TestNodeResource(t *testing.T) {
132130
details := strings.Join(nr.Diffs, ",")
133131
assert.Contains(t, details, "inspect failed")
134132
}
135-
136-
func TestRemapResource(t *testing.T) {
137-
c := NewTestCluster()
138-
store := c.store.(*storemocks.Store)
139-
rmgr := c.rmgr.(*resourcemocks.Manager)
140-
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
141-
map[string]types.NodeResourceArgs{"test": map[string]interface{}{"abc": 123}},
142-
map[string]types.NodeResourceArgs{"test": map[string]interface{}{"abc": 123}},
143-
[]string{types.ErrNoETCD.Error()},
144-
nil)
145-
rmgr.On("GetRemapArgs", mock.Anything, mock.Anything, mock.Anything).Return(
146-
map[string]types.EngineArgs{},
147-
nil,
148-
)
149-
engine := &enginemocks.API{}
150-
node := &types.Node{Engine: engine}
151-
152-
workload := &types.Workload{
153-
ResourceArgs: map[string]types.WorkloadResourceArgs{},
154-
}
155-
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil)
156-
ch := make(chan enginetypes.VirtualizationRemapMessage, 1)
157-
ch <- enginetypes.VirtualizationRemapMessage{}
158-
close(ch)
159-
engine.On("VirtualizationResourceRemap", mock.Anything, mock.Anything).Return((<-chan enginetypes.VirtualizationRemapMessage)(ch), nil)
160-
_, err := c.remapResource(context.Background(), node)
161-
assert.Nil(t, err)
162-
163-
c.doRemapResourceAndLog(context.TODO(), log.WithField("test", "zc"), node)
164-
}

lock/etcdlock/mutex_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func TestMutex(t *testing.T) {
5555
assert.NoError(t, err)
5656
_, err = m5.Lock(context.Background())
5757
assert.NoError(t, err)
58-
// then after embeded ETCD close, m5 will be unlocked from passive branch
58+
// then after embedded ETCD close, m5 will be unlocked from passive branch
5959
}
6060

6161
func TestTryLock(t *testing.T) {
@@ -109,5 +109,5 @@ func TestTryLock(t *testing.T) {
109109
assert.NoError(t, err)
110110
_, err = m6.TryLock(context.Background())
111111
assert.NoError(t, err)
112-
// then after embeded ETCD close, m5 will be unlocked from passive branch
112+
// then after embedded ETCD close, m5 will be unlocked from passive branch
113113
}

0 commit comments

Comments
 (0)