add SetNodeStatus and NodeStatusStream API #323

Merged
merged 3 commits on Jan 20, 2021
8 changes: 5 additions & 3 deletions Makefile
@@ -9,7 +9,7 @@ GO_LDFLAGS ?= -s -X $(REPO_PATH)/version.REVISION=$(REVISION) \
-X $(REPO_PATH)/version.VERSION=$(VERSION)

grpc:
cd ./rpc/gen/; protoc --go_out=plugins=grpc:. core.proto
protoc --proto_path=./rpc/gen --go_out=plugins=grpc:./rpc/gen --go_opt=module=github.com/projecteru2/core/rpc/gen core.proto

deps:
env GO111MODULE=on go mod download
@@ -23,8 +23,10 @@ build: deps binary
test: deps unit-test

mock: deps
mockery -dir ./vendor/google.golang.org/grpc -name ServerStream -output 3rdmocks
mockery -dir vendor/github.com/docker/docker/client -name APIClient -output engine/docker/mocks
mockery --dir ./vendor/google.golang.org/grpc --name ServerStream --output 3rdmocks
mockery --dir vendor/github.com/docker/docker/client --name APIClient --output engine/docker/mocks
mockery --dir store --output store/mocks --name Store
mockery --dir cluster --output cluster/mocks --name Cluster

.ONESHELL:

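With the module-aware invocation, protoc writes `core.pb.go` directly into `./rpc/gen`, and the stubs are then imported by their full module path rather than a relative one. A minimal caller sketch under stated assumptions — the `pb` alias, the `NewCoreRPCClient` constructor, and the listen address are illustrative guesses, not taken from this PR:

```go
package main

import (
	"log"

	"google.golang.org/grpc"

	// Import path matches the --go_opt=module= mapping in the Makefile rule above.
	pb "github.com/projecteru2/core/rpc/gen"
)

func main() {
	// Assumed address of a running eru-core instance.
	conn, err := grpc.Dial("127.0.0.1:5001", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// NewCoreRPCClient is assumed from the usual CoreRPC service name; check core.proto.
	client := pb.NewCoreRPCClient(conn)
	_ = client
}
```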
6 changes: 3 additions & 3 deletions cluster/calcium/build.go
@@ -164,11 +164,11 @@ func withImageBuiltChannel(f func(chan *types.BuildImageMessage)) chan *types.Bu
return ch
}

func cleanupNodeImages(node *types.Node, IDs []string, ttl time.Duration) {
func cleanupNodeImages(node *types.Node, ids []string, ttl time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), ttl)
defer cancel()
for _, ID := range IDs {
if _, err := node.Engine.ImageRemove(ctx, ID, false, true); err != nil {
for _, id := range ids {
if _, err := node.Engine.ImageRemove(ctx, id, false, true); err != nil {
log.Errorf("[BuildImage] Remove image error: %s", err)
}
}
14 changes: 7 additions & 7 deletions cluster/calcium/control.go
@@ -12,18 +12,18 @@ import (
)

// ControlWorkload control workloads status
func (c *Calcium) ControlWorkload(ctx context.Context, IDs []string, t string, force bool) (chan *types.ControlWorkloadMessage, error) {
func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, force bool) (chan *types.ControlWorkloadMessage, error) {
ch := make(chan *types.ControlWorkloadMessage)

go func() {
defer close(ch)
wg := sync.WaitGroup{}
for _, ID := range IDs {
for _, id := range ids {
wg.Add(1)
go func(ID string) {
go func(id string) {
defer wg.Done()
var message []*bytes.Buffer
err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
var err error
switch t {
case cluster.WorkloadStop:
@@ -44,16 +44,16 @@ func (c *Calcium) ControlWorkload(ctx context.Context, IDs []string, t string, f
return types.ErrUnknownControlType
})
if err == nil {
log.Infof("[ControlWorkload] Workload %s %s", ID, t)
log.Infof("[ControlWorkload] Workload %s %s", id, t)
log.Info("[ControlWorkload] Hook Output:")
log.Info(string(utils.MergeHookOutputs(message)))
}
ch <- &types.ControlWorkloadMessage{
WorkloadID: ID,
WorkloadID: id,
Error: err,
Hook: message,
}
}(ID)
}(id)
}
wg.Wait()
}()
10 changes: 5 additions & 5 deletions cluster/calcium/dissociate.go
@@ -10,12 +10,12 @@ import (
)

// DissociateWorkload dissociates a workload from eru, returning its resources but not modifying it
func (c *Calcium) DissociateWorkload(ctx context.Context, IDs []string) (chan *types.DissociateWorkloadMessage, error) {
func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *types.DissociateWorkloadMessage, error) {
ch := make(chan *types.DissociateWorkloadMessage)
go func() {
defer close(ch)
for _, ID := range IDs {
err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
for _, id := range ids {
err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) (err error) {
return utils.Txn(
ctx,
@@ -35,9 +35,9 @@ })
})
})
if err != nil {
log.Errorf("[DissociateWorkload] Dissociate workload %s failed, err: %v", ID, err)
log.Errorf("[DissociateWorkload] Dissociate workload %s failed, err: %v", id, err)
}
ch <- &types.DissociateWorkloadMessage{WorkloadID: ID, Error: err}
ch <- &types.DissociateWorkloadMessage{WorkloadID: id, Error: err}
}
}()
return ch, nil
10 changes: 5 additions & 5 deletions cluster/calcium/lock.go
@@ -35,9 +35,9 @@ func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.Distrib
}
}

func (c *Calcium) withWorkloadLocked(ctx context.Context, ID string, f func(context.Context, *types.Workload) error) error {
return c.withWorkloadsLocked(ctx, []string{ID}, func(ctx context.Context, workloads map[string]*types.Workload) error {
if c, ok := workloads[ID]; ok {
func (c *Calcium) withWorkloadLocked(ctx context.Context, id string, f func(context.Context, *types.Workload) error) error {
return c.withWorkloadsLocked(ctx, []string{id}, func(ctx context.Context, workloads map[string]*types.Workload) error {
if c, ok := workloads[id]; ok {
return f(ctx, c)
}
return types.ErrWorkloadNotExists
@@ -53,11 +53,11 @@ func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, f func(co
})
}

func (c *Calcium) withWorkloadsLocked(ctx context.Context, IDs []string, f func(context.Context, map[string]*types.Workload) error) error {
func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(context.Context, map[string]*types.Workload) error) error {
workloads := map[string]*types.Workload{}
locks := map[string]lock.DistributedLock{}
defer func() { c.doUnlockAll(context.Background(), locks) }()
cs, err := c.GetWorkloads(ctx, IDs)
cs, err := c.GetWorkloads(ctx, ids)
if err != nil {
return err
}
18 changes: 9 additions & 9 deletions cluster/calcium/remove.go
@@ -14,7 +14,7 @@ import (

// RemoveWorkload remove workloads
// returns a channel that contains removing responses
func (c *Calcium) RemoveWorkload(ctx context.Context, IDs []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error) {
func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error) {
ch := make(chan *types.RemoveWorkloadMessage)
if step < 1 {
step = 1
@@ -24,12 +24,12 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, IDs []string, force bool,
defer close(ch)
wg := sync.WaitGroup{}
defer wg.Wait()
for i, ID := range IDs {
for i, id := range ids {
wg.Add(1)
go func(ID string) {
go func(id string) {
defer wg.Done()
ret := &types.RemoveWorkloadMessage{WorkloadID: ID, Success: false, Hook: []*bytes.Buffer{}}
if err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
ret := &types.RemoveWorkloadMessage{WorkloadID: id, Success: false, Hook: []*bytes.Buffer{}}
if err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) (err error) {
return utils.Txn(
ctx,
@@ -48,13 +48,13 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, IDs []string, force bool,
)
})
}); err != nil {
log.Errorf("[RemoveWorkload] Remove workload %s failed, err: %v", ID, err)
log.Errorf("[RemoveWorkload] Remove workload %s failed, err: %v", id, err)
ret.Hook = append(ret.Hook, bytes.NewBufferString(err.Error()))
} else {
ret.Success = true
}
ch <- ret
}(ID)
}(id)
if (i+1)%step == 0 {
log.Info("[RemoveWorkload] Wait for previous tasks done")
wg.Wait()
@@ -83,8 +83,8 @@ func (c *Calcium) doRemoveWorkload(ctx context.Context, workload *types.Workload
}

// Remove containers synchronously; extremely useful in scenarios that need to wait!
func (c *Calcium) doRemoveWorkloadSync(ctx context.Context, IDs []string) error {
ch, err := c.RemoveWorkload(ctx, IDs, true, 1)
func (c *Calcium) doRemoveWorkloadSync(ctx context.Context, ids []string) error {
ch, err := c.RemoveWorkload(ctx, ids, true, 1)
if err != nil {
return err
}
14 changes: 7 additions & 7 deletions cluster/calcium/send.go
@@ -21,21 +21,21 @@ func (c *Calcium) Send(ctx context.Context, opts *types.SendOptions) (chan *type
defer close(ch)
wg := &sync.WaitGroup{}

for _, ID := range opts.IDs {
log.Infof("[Send] Send files to %s", ID)
for _, id := range opts.IDs {
log.Infof("[Send] Send files to %s", id)
wg.Add(1)
go func(ID string) {
go func(id string) {
defer wg.Done()
if err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
if err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
for dst, content := range opts.Data {
err := c.doSendFileToWorkload(ctx, workload.Engine, workload.ID, dst, bytes.NewBuffer(content), true, true)
ch <- &types.SendMessage{ID: ID, Path: dst, Error: err}
ch <- &types.SendMessage{ID: id, Path: dst, Error: err}
}
return nil
}); err != nil {
ch <- &types.SendMessage{ID: ID, Error: err}
ch <- &types.SendMessage{ID: id, Error: err}
}
}(ID)
}(id)
}
wg.Wait()
}()
15 changes: 15 additions & 0 deletions cluster/calcium/status.go
@@ -44,3 +44,18 @@ func (c *Calcium) SetWorkloadsStatus(ctx context.Context, status []*types.Status
func (c *Calcium) WorkloadStatusStream(ctx context.Context, appname, entrypoint, nodename string, labels map[string]string) chan *types.WorkloadStatus {
return c.store.WorkloadStatusStream(ctx, appname, entrypoint, nodename, labels)
}

// SetNodeStatus set status of a node
// it's used to report whether a node is still alive
func (c *Calcium) SetNodeStatus(ctx context.Context, nodename string, ttl int64) error {
node, err := c.store.GetNode(ctx, nodename)
if err != nil {
return err
}
return c.store.SetNodeStatus(ctx, node, ttl)
}

// NodeStatusStream returns a stream of node status for subscribing
func (c *Calcium) NodeStatusStream(ctx context.Context) chan *types.NodeStatus {
return c.store.NodeStatusStream(ctx)
}
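These two calls pair naturally: an agent refreshes the node's status within the TTL, and any subscriber consumes the resulting stream. A minimal sketch, assuming only the signatures added above and the `Alive` field used in the tests below; the interval, TTL, and logging are illustrative, not taken from the PR:

```go
package heartbeat

import (
	"context"
	"log"
	"time"

	"github.com/projecteru2/core/cluster"
)

// ReportAndWatch refreshes a node's status every 2 seconds with a 4-second TTL
// and prints updates from the status stream until ctx is cancelled or the
// stream is closed.
func ReportAndWatch(ctx context.Context, c cluster.Cluster, nodename string) {
	go func() {
		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// If no report lands before the TTL expires, subscribers
				// should eventually see the node as no longer alive.
				if err := c.SetNodeStatus(ctx, nodename, 4); err != nil {
					log.Printf("set node status for %s failed: %v", nodename, err)
				}
			}
		}
	}()

	for status := range c.NodeStatusStream(ctx) {
		log.Printf("node status update: alive=%v", status.Alive)
	}
}
```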
54 changes: 54 additions & 0 deletions cluster/calcium/status_test.go
@@ -80,3 +80,57 @@ func TestWorkloadStatusStream(t *testing.T) {
assert.Equal(t, c.Delete, true)
}
}

func TestSetNodeStatus(t *testing.T) {
assert := assert.New(t)
c := NewTestCluster()
ctx := context.Background()
store := c.store.(*storemocks.Store)

node := &types.Node{
NodeMeta: types.NodeMeta{
Name: "testname",
Endpoint: "ep",
},
}
// failed
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrBadCount).Once()
assert.Error(c.SetNodeStatus(ctx, node.Name, 10))
store.On("GetNode", mock.Anything, mock.Anything).Return(node, nil)
// failed by SetNodeStatus
store.On("SetNodeStatus",
mock.Anything,
mock.Anything,
mock.Anything,
).Return(types.ErrBadCount).Once()
assert.Error(c.SetNodeStatus(ctx, node.Name, 10))
// success
store.On("SetNodeStatus",
mock.Anything,
mock.Anything,
mock.Anything,
).Return(nil)
assert.NoError(c.SetNodeStatus(ctx, node.Name, 10))
}

func TestNodeStatusStream(t *testing.T) {
assert := assert.New(t)
c := NewTestCluster()
ctx := context.Background()
dataCh := make(chan *types.NodeStatus)
store := c.store.(*storemocks.Store)

store.On("NodeStatusStream", mock.Anything).Return(dataCh)
go func() {
msg := &types.NodeStatus{
Alive: true,
}
dataCh <- msg
close(dataCh)
}()

ch := c.NodeStatusStream(ctx)
for c := range ch {
assert.True(c.Alive)
}
}
14 changes: 8 additions & 6 deletions cluster/cluster.go
@@ -51,16 +51,18 @@ type Cluster interface {
ListPodNodes(ctx context.Context, podname string, labels map[string]string, all bool) ([]*types.Node, error)
GetNode(ctx context.Context, nodename string) (*types.Node, error)
SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error)
SetNodeStatus(ctx context.Context, nodename string, ttl int64) error
NodeStatusStream(ctx context.Context) chan *types.NodeStatus
// node resource
NodeResource(ctx context.Context, nodename string, fix bool) (*types.NodeResource, error)
// calculate capacity
CalculateCapacity(context.Context, *types.DeployOptions) (*types.CapacityMessage, error)
// meta workloads
GetWorkload(ctx context.Context, ID string) (*types.Workload, error)
GetWorkloads(ctx context.Context, IDs []string) ([]*types.Workload, error)
GetWorkload(ctx context.Context, id string) (*types.Workload, error)
GetWorkloads(ctx context.Context, ids []string) ([]*types.Workload, error)
ListWorkloads(ctx context.Context, opts *types.ListWorkloadsOptions) ([]*types.Workload, error)
ListNodeWorkloads(ctx context.Context, nodename string, labels map[string]string) ([]*types.Workload, error)
GetWorkloadsStatus(ctx context.Context, IDs []string) ([]*types.StatusMeta, error)
GetWorkloadsStatus(ctx context.Context, ids []string) ([]*types.StatusMeta, error)
SetWorkloadsStatus(ctx context.Context, status []*types.StatusMeta, ttls map[string]int64) ([]*types.StatusMeta, error)
WorkloadStatusStream(ctx context.Context, appname, entrypoint, nodename string, labels map[string]string) chan *types.WorkloadStatus
// file methods
@@ -73,9 +75,9 @@
// workload methods
CreateWorkload(ctx context.Context, opts *types.DeployOptions) (chan *types.CreateWorkloadMessage, error)
ReplaceWorkload(ctx context.Context, opts *types.ReplaceOptions) (chan *types.ReplaceWorkloadMessage, error)
RemoveWorkload(ctx context.Context, IDs []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error)
DissociateWorkload(ctx context.Context, IDs []string) (chan *types.DissociateWorkloadMessage, error)
ControlWorkload(ctx context.Context, IDs []string, t string, force bool) (chan *types.ControlWorkloadMessage, error)
RemoveWorkload(ctx context.Context, ids []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error)
DissociateWorkload(ctx context.Context, ids []string) (chan *types.DissociateWorkloadMessage, error)
ControlWorkload(ctx context.Context, ids []string, t string, force bool) (chan *types.ControlWorkloadMessage, error)
ExecuteWorkload(ctx context.Context, opts *types.ExecuteWorkloadOptions, inCh <-chan []byte) chan *types.AttachWorkloadMessage
ReallocResource(ctx context.Context, opts *types.ReallocOptions) error
LogStream(ctx context.Context, opts *types.LogStreamOptions) (chan *types.LogStreamMessage, error)
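Since `Cluster` is the contract the gRPC layer and tests program against, extending it here means every implementation (and the `cluster/mocks` output added to the Makefile) must now provide the two node-status methods. A compile-time assertion is a cheap way to catch a missing method; this is a sketch, assuming the `cluster/calcium` package path and an addressable zero value:

```go
package check

import (
	"github.com/projecteru2/core/cluster"
	"github.com/projecteru2/core/cluster/calcium"
)

// If Calcium were missing SetNodeStatus, NodeStatusStream, or any other
// interface method, this declaration would fail to compile.
var _ cluster.Cluster = &calcium.Calcium{}
```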