Skip to content

Commit 6aab298

Browse files
authored
add SetNodeStatus and NodeStatusStream API (#323)
* add SetNodeStatus and NodeStatusStream API * IDs -> ids, use /status prefix instead * regenerate mock
1 parent fa0fab2 commit 6aab298

22 files changed

+2008
-1387
lines changed

Makefile

+5-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ GO_LDFLAGS ?= -s -X $(REPO_PATH)/version.REVISION=$(REVISION) \
99
-X $(REPO_PATH)/version.VERSION=$(VERSION)
1010

1111
grpc:
12-
cd ./rpc/gen/; protoc --go_out=plugins=grpc:. core.proto
12+
protoc --proto_path=./rpc/gen --go_out=plugins=grpc:./rpc/gen --go_opt=module=github.com/projecteru2/core/rpc/gen core.proto
1313

1414
deps:
1515
env GO111MODULE=on go mod download
@@ -23,8 +23,10 @@ build: deps binary
2323
test: deps unit-test
2424

2525
mock: deps
26-
mockery -dir ./vendor/google.golang.org/grpc -name ServerStream -output 3rdmocks
27-
mockery -dir vendor/github.com/docker/docker/client -name APIClient -output engine/docker/mocks
26+
mockery --dir ./vendor/google.golang.org/grpc --name ServerStream --output 3rdmocks
27+
mockery --dir vendor/github.com/docker/docker/client --name APIClient --output engine/docker/mocks
28+
mockery --dir store --output store/mocks --name Store
29+
mockery --dir cluster --output cluster/mocks --name Cluster
2830

2931
.ONESHELL:
3032

cluster/calcium/build.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -164,11 +164,11 @@ func withImageBuiltChannel(f func(chan *types.BuildImageMessage)) chan *types.Bu
164164
return ch
165165
}
166166

167-
func cleanupNodeImages(node *types.Node, IDs []string, ttl time.Duration) {
167+
func cleanupNodeImages(node *types.Node, ids []string, ttl time.Duration) {
168168
ctx, cancel := context.WithTimeout(context.Background(), ttl)
169169
defer cancel()
170-
for _, ID := range IDs {
171-
if _, err := node.Engine.ImageRemove(ctx, ID, false, true); err != nil {
170+
for _, id := range ids {
171+
if _, err := node.Engine.ImageRemove(ctx, id, false, true); err != nil {
172172
log.Errorf("[BuildImage] Remove image error: %s", err)
173173
}
174174
}

cluster/calcium/control.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,18 @@ import (
1212
)
1313

1414
// ControlWorkload control workloads status
15-
func (c *Calcium) ControlWorkload(ctx context.Context, IDs []string, t string, force bool) (chan *types.ControlWorkloadMessage, error) {
15+
func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, force bool) (chan *types.ControlWorkloadMessage, error) {
1616
ch := make(chan *types.ControlWorkloadMessage)
1717

1818
go func() {
1919
defer close(ch)
2020
wg := sync.WaitGroup{}
21-
for _, ID := range IDs {
21+
for _, id := range ids {
2222
wg.Add(1)
23-
go func(ID string) {
23+
go func(id string) {
2424
defer wg.Done()
2525
var message []*bytes.Buffer
26-
err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
26+
err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
2727
var err error
2828
switch t {
2929
case cluster.WorkloadStop:
@@ -44,16 +44,16 @@ func (c *Calcium) ControlWorkload(ctx context.Context, IDs []string, t string, f
4444
return types.ErrUnknownControlType
4545
})
4646
if err == nil {
47-
log.Infof("[ControlWorkload] Workload %s %s", ID, t)
47+
log.Infof("[ControlWorkload] Workload %s %s", id, t)
4848
log.Info("[ControlWorkload] Hook Output:")
4949
log.Info(string(utils.MergeHookOutputs(message)))
5050
}
5151
ch <- &types.ControlWorkloadMessage{
52-
WorkloadID: ID,
52+
WorkloadID: id,
5353
Error: err,
5454
Hook: message,
5555
}
56-
}(ID)
56+
}(id)
5757
}
5858
wg.Wait()
5959
}()

cluster/calcium/dissociate.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ import (
1010
)
1111

1212
// DissociateWorkload dissociate workload from eru, return it resource but not modity it
13-
func (c *Calcium) DissociateWorkload(ctx context.Context, IDs []string) (chan *types.DissociateWorkloadMessage, error) {
13+
func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *types.DissociateWorkloadMessage, error) {
1414
ch := make(chan *types.DissociateWorkloadMessage)
1515
go func() {
1616
defer close(ch)
17-
for _, ID := range IDs {
18-
err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
17+
for _, id := range ids {
18+
err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
1919
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) (err error) {
2020
return utils.Txn(
2121
ctx,
@@ -35,9 +35,9 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, IDs []string) (chan *t
3535
})
3636
})
3737
if err != nil {
38-
log.Errorf("[DissociateWorkload] Dissociate workload %s failed, err: %v", ID, err)
38+
log.Errorf("[DissociateWorkload] Dissociate workload %s failed, err: %v", id, err)
3939
}
40-
ch <- &types.DissociateWorkloadMessage{WorkloadID: ID, Error: err}
40+
ch <- &types.DissociateWorkloadMessage{WorkloadID: id, Error: err}
4141
}
4242
}()
4343
return ch, nil

cluster/calcium/lock.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.Distrib
3535
}
3636
}
3737

38-
func (c *Calcium) withWorkloadLocked(ctx context.Context, ID string, f func(context.Context, *types.Workload) error) error {
39-
return c.withWorkloadsLocked(ctx, []string{ID}, func(ctx context.Context, workloads map[string]*types.Workload) error {
40-
if c, ok := workloads[ID]; ok {
38+
func (c *Calcium) withWorkloadLocked(ctx context.Context, id string, f func(context.Context, *types.Workload) error) error {
39+
return c.withWorkloadsLocked(ctx, []string{id}, func(ctx context.Context, workloads map[string]*types.Workload) error {
40+
if c, ok := workloads[id]; ok {
4141
return f(ctx, c)
4242
}
4343
return types.ErrWorkloadNotExists
@@ -53,11 +53,11 @@ func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, f func(co
5353
})
5454
}
5555

56-
func (c *Calcium) withWorkloadsLocked(ctx context.Context, IDs []string, f func(context.Context, map[string]*types.Workload) error) error {
56+
func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(context.Context, map[string]*types.Workload) error) error {
5757
workloads := map[string]*types.Workload{}
5858
locks := map[string]lock.DistributedLock{}
5959
defer func() { c.doUnlockAll(context.Background(), locks) }()
60-
cs, err := c.GetWorkloads(ctx, IDs)
60+
cs, err := c.GetWorkloads(ctx, ids)
6161
if err != nil {
6262
return err
6363
}

cluster/calcium/remove.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414

1515
// RemoveWorkload remove workloads
1616
// returns a channel that contains removing responses
17-
func (c *Calcium) RemoveWorkload(ctx context.Context, IDs []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error) {
17+
func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error) {
1818
ch := make(chan *types.RemoveWorkloadMessage)
1919
if step < 1 {
2020
step = 1
@@ -24,12 +24,12 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, IDs []string, force bool,
2424
defer close(ch)
2525
wg := sync.WaitGroup{}
2626
defer wg.Wait()
27-
for i, ID := range IDs {
27+
for i, id := range ids {
2828
wg.Add(1)
29-
go func(ID string) {
29+
go func(id string) {
3030
defer wg.Done()
31-
ret := &types.RemoveWorkloadMessage{WorkloadID: ID, Success: false, Hook: []*bytes.Buffer{}}
32-
if err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
31+
ret := &types.RemoveWorkloadMessage{WorkloadID: id, Success: false, Hook: []*bytes.Buffer{}}
32+
if err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
3333
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) (err error) {
3434
return utils.Txn(
3535
ctx,
@@ -48,13 +48,13 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, IDs []string, force bool,
4848
)
4949
})
5050
}); err != nil {
51-
log.Errorf("[RemoveWorkload] Remove workload %s failed, err: %v", ID, err)
51+
log.Errorf("[RemoveWorkload] Remove workload %s failed, err: %v", id, err)
5252
ret.Hook = append(ret.Hook, bytes.NewBufferString(err.Error()))
5353
} else {
5454
ret.Success = true
5555
}
5656
ch <- ret
57-
}(ID)
57+
}(id)
5858
if (i+1)%step == 0 {
5959
log.Info("[RemoveWorkload] Wait for previous tasks done")
6060
wg.Wait()
@@ -83,8 +83,8 @@ func (c *Calcium) doRemoveWorkload(ctx context.Context, workload *types.Workload
8383
}
8484

8585
// 同步地删除容器, 在某些需要等待的场合异常有用!
86-
func (c *Calcium) doRemoveWorkloadSync(ctx context.Context, IDs []string) error {
87-
ch, err := c.RemoveWorkload(ctx, IDs, true, 1)
86+
func (c *Calcium) doRemoveWorkloadSync(ctx context.Context, ids []string) error {
87+
ch, err := c.RemoveWorkload(ctx, ids, true, 1)
8888
if err != nil {
8989
return err
9090
}

cluster/calcium/send.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,21 @@ func (c *Calcium) Send(ctx context.Context, opts *types.SendOptions) (chan *type
2121
defer close(ch)
2222
wg := &sync.WaitGroup{}
2323

24-
for _, ID := range opts.IDs {
25-
log.Infof("[Send] Send files to %s", ID)
24+
for _, id := range opts.IDs {
25+
log.Infof("[Send] Send files to %s", id)
2626
wg.Add(1)
27-
go func(ID string) {
27+
go func(id string) {
2828
defer wg.Done()
29-
if err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
29+
if err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
3030
for dst, content := range opts.Data {
3131
err := c.doSendFileToWorkload(ctx, workload.Engine, workload.ID, dst, bytes.NewBuffer(content), true, true)
32-
ch <- &types.SendMessage{ID: ID, Path: dst, Error: err}
32+
ch <- &types.SendMessage{ID: id, Path: dst, Error: err}
3333
}
3434
return nil
3535
}); err != nil {
36-
ch <- &types.SendMessage{ID: ID, Error: err}
36+
ch <- &types.SendMessage{ID: id, Error: err}
3737
}
38-
}(ID)
38+
}(id)
3939
}
4040
wg.Wait()
4141
}()

cluster/calcium/status.go

+15
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,18 @@ func (c *Calcium) SetWorkloadsStatus(ctx context.Context, status []*types.Status
4444
func (c *Calcium) WorkloadStatusStream(ctx context.Context, appname, entrypoint, nodename string, labels map[string]string) chan *types.WorkloadStatus {
4545
return c.store.WorkloadStatusStream(ctx, appname, entrypoint, nodename, labels)
4646
}
47+
48+
// SetNodeStatus set status of a node
49+
// it's used to report whether a node is still alive
50+
func (c *Calcium) SetNodeStatus(ctx context.Context, nodename string, ttl int64) error {
51+
node, err := c.store.GetNode(ctx, nodename)
52+
if err != nil {
53+
return err
54+
}
55+
return c.store.SetNodeStatus(ctx, node, ttl)
56+
}
57+
58+
// NodeStatusStream returns a stream of node status for subscribing
59+
func (c *Calcium) NodeStatusStream(ctx context.Context) chan *types.NodeStatus {
60+
return c.store.NodeStatusStream(ctx)
61+
}

cluster/calcium/status_test.go

+54
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,57 @@ func TestWorkloadStatusStream(t *testing.T) {
8080
assert.Equal(t, c.Delete, true)
8181
}
8282
}
83+
84+
func TestSetNodeStatus(t *testing.T) {
85+
assert := assert.New(t)
86+
c := NewTestCluster()
87+
ctx := context.Background()
88+
store := c.store.(*storemocks.Store)
89+
90+
node := &types.Node{
91+
NodeMeta: types.NodeMeta{
92+
Name: "testname",
93+
Endpoint: "ep",
94+
},
95+
}
96+
// failed
97+
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrBadCount).Once()
98+
assert.Error(c.SetNodeStatus(ctx, node.Name, 10))
99+
store.On("GetNode", mock.Anything, mock.Anything).Return(node, nil)
100+
// failed by SetWorkloadStatus
101+
store.On("SetNodeStatus",
102+
mock.Anything,
103+
mock.Anything,
104+
mock.Anything,
105+
).Return(types.ErrBadCount).Once()
106+
assert.Error(c.SetNodeStatus(ctx, node.Name, 10))
107+
// success
108+
store.On("SetNodeStatus",
109+
mock.Anything,
110+
mock.Anything,
111+
mock.Anything,
112+
).Return(nil)
113+
assert.NoError(c.SetNodeStatus(ctx, node.Name, 10))
114+
}
115+
116+
func TestNodeStatusStream(t *testing.T) {
117+
assert := assert.New(t)
118+
c := NewTestCluster()
119+
ctx := context.Background()
120+
dataCh := make(chan *types.NodeStatus)
121+
store := c.store.(*storemocks.Store)
122+
123+
store.On("NodeStatusStream", mock.Anything).Return(dataCh)
124+
go func() {
125+
msg := &types.NodeStatus{
126+
Alive: true,
127+
}
128+
dataCh <- msg
129+
close(dataCh)
130+
}()
131+
132+
ch := c.NodeStatusStream(ctx)
133+
for c := range ch {
134+
assert.True(c.Alive)
135+
}
136+
}

cluster/cluster.go

+8-6
Original file line numberDiff line numberDiff line change
@@ -51,16 +51,18 @@ type Cluster interface {
5151
ListPodNodes(ctx context.Context, podname string, labels map[string]string, all bool) ([]*types.Node, error)
5252
GetNode(ctx context.Context, nodename string) (*types.Node, error)
5353
SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error)
54+
SetNodeStatus(ctx context.Context, nodename string, ttl int64) error
55+
NodeStatusStream(ctx context.Context) chan *types.NodeStatus
5456
// node resource
5557
NodeResource(ctx context.Context, nodename string, fix bool) (*types.NodeResource, error)
5658
// calculate capacity
5759
CalculateCapacity(context.Context, *types.DeployOptions) (*types.CapacityMessage, error)
5860
// meta workloads
59-
GetWorkload(ctx context.Context, ID string) (*types.Workload, error)
60-
GetWorkloads(ctx context.Context, IDs []string) ([]*types.Workload, error)
61+
GetWorkload(ctx context.Context, id string) (*types.Workload, error)
62+
GetWorkloads(ctx context.Context, ids []string) ([]*types.Workload, error)
6163
ListWorkloads(ctx context.Context, opts *types.ListWorkloadsOptions) ([]*types.Workload, error)
6264
ListNodeWorkloads(ctx context.Context, nodename string, labels map[string]string) ([]*types.Workload, error)
63-
GetWorkloadsStatus(ctx context.Context, IDs []string) ([]*types.StatusMeta, error)
65+
GetWorkloadsStatus(ctx context.Context, ids []string) ([]*types.StatusMeta, error)
6466
SetWorkloadsStatus(ctx context.Context, status []*types.StatusMeta, ttls map[string]int64) ([]*types.StatusMeta, error)
6567
WorkloadStatusStream(ctx context.Context, appname, entrypoint, nodename string, labels map[string]string) chan *types.WorkloadStatus
6668
// file methods
@@ -73,9 +75,9 @@ type Cluster interface {
7375
// workload methods
7476
CreateWorkload(ctx context.Context, opts *types.DeployOptions) (chan *types.CreateWorkloadMessage, error)
7577
ReplaceWorkload(ctx context.Context, opts *types.ReplaceOptions) (chan *types.ReplaceWorkloadMessage, error)
76-
RemoveWorkload(ctx context.Context, IDs []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error)
77-
DissociateWorkload(ctx context.Context, IDs []string) (chan *types.DissociateWorkloadMessage, error)
78-
ControlWorkload(ctx context.Context, IDs []string, t string, force bool) (chan *types.ControlWorkloadMessage, error)
78+
RemoveWorkload(ctx context.Context, ids []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error)
79+
DissociateWorkload(ctx context.Context, ids []string) (chan *types.DissociateWorkloadMessage, error)
80+
ControlWorkload(ctx context.Context, ids []string, t string, force bool) (chan *types.ControlWorkloadMessage, error)
7981
ExecuteWorkload(ctx context.Context, opts *types.ExecuteWorkloadOptions, inCh <-chan []byte) chan *types.AttachWorkloadMessage
8082
ReallocResource(ctx context.Context, opts *types.ReallocOptions) error
8183
LogStream(ctx context.Context, opts *types.LogStreamOptions) (chan *types.LogStreamMessage, error)

0 commit comments

Comments
 (0)