Skip to content

add SetNodeStatus and NodeStatusStream API #323

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 20, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ GO_LDFLAGS ?= -s -X $(REPO_PATH)/version.REVISION=$(REVISION) \
-X $(REPO_PATH)/version.VERSION=$(VERSION)

grpc:
cd ./rpc/gen/; protoc --go_out=plugins=grpc:. core.proto
protoc --proto_path=./rpc/gen --go_out=plugins=grpc:./rpc/gen --go_opt=module=github.com/projecteru2/core/rpc/gen core.proto

deps:
env GO111MODULE=on go mod download
Expand All @@ -23,8 +23,10 @@ build: deps binary
test: deps unit-test

mock: deps
mockery -dir ./vendor/google.golang.org/grpc -name ServerStream -output 3rdmocks
mockery -dir vendor/github.com/docker/docker/client -name APIClient -output engine/docker/mocks
mockery --dir ./vendor/google.golang.org/grpc --name ServerStream --output 3rdmocks
mockery --dir vendor/github.com/docker/docker/client --name APIClient --output engine/docker/mocks
mockery --dir store --output store/mocks --name Store
mockery --dir cluster --output cluster/mocks --name Cluster

.ONESHELL:

Expand Down
15 changes: 15 additions & 0 deletions cluster/calcium/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,18 @@ func (c *Calcium) SetWorkloadsStatus(ctx context.Context, status []*types.Status
func (c *Calcium) WorkloadStatusStream(ctx context.Context, appname, entrypoint, nodename string, labels map[string]string) chan *types.WorkloadStatus {
return c.store.WorkloadStatusStream(ctx, appname, entrypoint, nodename, labels)
}

// SetNodeStatus set status of a node
// it's used to report whether a node is still alive
func (c *Calcium) SetNodeStatus(ctx context.Context, nodename string, ttl int64) error {
node, err := c.store.GetNode(ctx, nodename)
if err != nil {
return err
}
return c.store.SetNodeStatus(ctx, node, ttl)
}

// NodeStatusStream returns a stream of node status for subscribing
func (c *Calcium) NodeStatusStream(ctx context.Context) chan *types.NodeStatus {
return c.store.NodeStatusStream(ctx)
}
54 changes: 54 additions & 0 deletions cluster/calcium/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,57 @@ func TestWorkloadStatusStream(t *testing.T) {
assert.Equal(t, c.Delete, true)
}
}

func TestSetNodeStatus(t *testing.T) {
assert := assert.New(t)
c := NewTestCluster()
ctx := context.Background()
store := c.store.(*storemocks.Store)

node := &types.Node{
NodeMeta: types.NodeMeta{
Name: "testname",
Endpoint: "ep",
},
}
// failed
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrBadCount).Once()
assert.Error(c.SetNodeStatus(ctx, node.Name, 10))
store.On("GetNode", mock.Anything, mock.Anything).Return(node, nil)
// failed by SetWorkloadStatus
store.On("SetNodeStatus",
mock.Anything,
mock.Anything,
mock.Anything,
).Return(types.ErrBadCount).Once()
assert.Error(c.SetNodeStatus(ctx, node.Name, 10))
// success
store.On("SetNodeStatus",
mock.Anything,
mock.Anything,
mock.Anything,
).Return(nil)
assert.NoError(c.SetNodeStatus(ctx, node.Name, 10))
}

func TestNodeStatusStream(t *testing.T) {
assert := assert.New(t)
c := NewTestCluster()
ctx := context.Background()
dataCh := make(chan *types.NodeStatus)
store := c.store.(*storemocks.Store)

store.On("NodeStatusStream", mock.Anything).Return(dataCh)
go func() {
msg := &types.NodeStatus{
Alive: true,
}
dataCh <- msg
close(dataCh)
}()

ch := c.NodeStatusStream(ctx)
for c := range ch {
assert.True(c.Alive)
}
}
14 changes: 8 additions & 6 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,18 @@ type Cluster interface {
ListPodNodes(ctx context.Context, podname string, labels map[string]string, all bool) ([]*types.Node, error)
GetNode(ctx context.Context, nodename string) (*types.Node, error)
SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error)
SetNodeStatus(ctx context.Context, nodename string, ttl int64) error
NodeStatusStream(ctx context.Context) chan *types.NodeStatus
// node resource
NodeResource(ctx context.Context, nodename string, fix bool) (*types.NodeResource, error)
// calculate capacity
CalculateCapacity(context.Context, *types.DeployOptions) (*types.CapacityMessage, error)
// meta workloads
GetWorkload(ctx context.Context, ID string) (*types.Workload, error)
GetWorkloads(ctx context.Context, IDs []string) ([]*types.Workload, error)
GetWorkload(ctx context.Context, id string) (*types.Workload, error)
GetWorkloads(ctx context.Context, ids []string) ([]*types.Workload, error)
ListWorkloads(ctx context.Context, opts *types.ListWorkloadsOptions) ([]*types.Workload, error)
ListNodeWorkloads(ctx context.Context, nodename string, labels map[string]string) ([]*types.Workload, error)
GetWorkloadsStatus(ctx context.Context, IDs []string) ([]*types.StatusMeta, error)
GetWorkloadsStatus(ctx context.Context, ids []string) ([]*types.StatusMeta, error)
SetWorkloadsStatus(ctx context.Context, status []*types.StatusMeta, ttls map[string]int64) ([]*types.StatusMeta, error)
WorkloadStatusStream(ctx context.Context, appname, entrypoint, nodename string, labels map[string]string) chan *types.WorkloadStatus
// file methods
Expand All @@ -73,9 +75,9 @@ type Cluster interface {
// workload methods
CreateWorkload(ctx context.Context, opts *types.DeployOptions) (chan *types.CreateWorkloadMessage, error)
ReplaceWorkload(ctx context.Context, opts *types.ReplaceOptions) (chan *types.ReplaceWorkloadMessage, error)
RemoveWorkload(ctx context.Context, IDs []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error)
DissociateWorkload(ctx context.Context, IDs []string) (chan *types.DissociateWorkloadMessage, error)
ControlWorkload(ctx context.Context, IDs []string, t string, force bool) (chan *types.ControlWorkloadMessage, error)
RemoveWorkload(ctx context.Context, ids []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error)
DissociateWorkload(ctx context.Context, ids []string) (chan *types.DissociateWorkloadMessage, error)
ControlWorkload(ctx context.Context, ids []string, t string, force bool) (chan *types.ControlWorkloadMessage, error)
ExecuteWorkload(ctx context.Context, opts *types.ExecuteWorkloadOptions, inCh <-chan []byte) chan *types.AttachWorkloadMessage
ReallocResource(ctx context.Context, opts *types.ReallocOptions) error
LogStream(ctx context.Context, opts *types.LogStreamOptions) (chan *types.LogStreamMessage, error)
Expand Down
92 changes: 61 additions & 31 deletions cluster/mocks/Cluster.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading