add validation #298

Merged: 4 commits, Dec 15, 2020
15 changes: 9 additions & 6 deletions cluster/calcium/copy.go
@@ -10,26 +10,29 @@ import (

// Copy uses VirtualizationCopyFrom cp to copy specified things and send to remote
func (c *Calcium) Copy(ctx context.Context, opts *types.CopyOptions) (chan *types.CopyMessage, error) {
if err := opts.Validate(); err != nil {
return nil, err
}
ch := make(chan *types.CopyMessage)
go func() {
defer close(ch)
wg := sync.WaitGroup{}
log.Infof("[Copy] Copy %d workloads files", len(opts.Targets))
// workload one by one
for ID, paths := range opts.Targets {
for id, paths := range opts.Targets {
wg.Add(1)
go func(ID string, paths []string) {
go func(id string, paths []string) {
defer wg.Done()
if err := c.withWorkloadLocked(ctx, ID, func(workload *types.Workload) error {
if err := c.withWorkloadLocked(ctx, id, func(workload *types.Workload) error {
for _, path := range paths {
resp, name, err := workload.Engine.VirtualizationCopyFrom(ctx, workload.ID, path)
ch <- makeCopyMessage(ID, name, path, err, resp)
ch <- makeCopyMessage(id, name, path, err, resp)
}
return nil
}); err != nil {
ch <- makeCopyMessage(ID, "", "", err, nil)
ch <- makeCopyMessage(id, "", "", err, nil)
}
}(ID, paths)
}(id, paths)
}
wg.Wait()
}()
6 changes: 6 additions & 0 deletions cluster/calcium/copy_test.go
@@ -16,6 +16,12 @@ import (
func TestCopy(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()

_, err := c.Copy(ctx, &types.CopyOptions{
Targets: map[string][]string{},
})
assert.Error(t, err)

opts := &types.CopyOptions{
Targets: map[string][]string{
"cid": {
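The new guard in Copy and the test above only reveal that an empty Targets map is rejected. A minimal sketch of what types.CopyOptions.Validate might look like under that single assumption (the real method and its error value live in the types package and are not part of the diff shown here):

// Hypothetical sketch of CopyOptions.Validate (types package); not shown in this diff.
import "errors"

func (o *CopyOptions) Validate() error {
	if len(o.Targets) == 0 {
		return errors.New("copy targets must not be empty") // the real code presumably returns a types.Err* sentinel
	}
	return nil
}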
4 changes: 4 additions & 0 deletions cluster/calcium/create.go
@@ -20,6 +20,10 @@ import (

// CreateWorkload use options to create workloads
func (c *Calcium) CreateWorkload(ctx context.Context, opts *types.DeployOptions) (chan *types.CreateWorkloadMessage, error) {
if err := opts.Validate(); err != nil {
return nil, err
}

opts.ProcessIdent = utils.RandomString(16)
log.Infof("[CreateWorkload %s] Creating workload with options:", opts.ProcessIdent)
litter.Dump(opts)
38 changes: 32 additions & 6 deletions cluster/calcium/create_test.go
@@ -21,24 +21,47 @@ import (
func TestCreateWorkload(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
opts := &types.DeployOptions{}
opts := &types.DeployOptions{
Name: "deployname",
Podname: "somepod",
Image: "image:todeploy",
Count: 1,
Entrypoint: &types.Entrypoint{
Name: "some-nice-entrypoint",
},
}
store := c.store.(*storemocks.Store)

store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)

// failed by GetPod
store.On("GetPod", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
// failed by validating
opts.Name = ""
_, err := c.CreateWorkload(ctx, opts)
assert.Error(t, err)
store.On("GetPod", mock.Anything, mock.Anything).Return(&types.Pod{Name: "test"}, nil)
// failed by count
opts.Name = "deployname"

opts.Podname = ""
_, err = c.CreateWorkload(ctx, opts)
assert.Error(t, err)
opts.Podname = "somepod"

opts.Image = ""
_, err = c.CreateWorkload(ctx, opts)
assert.Error(t, err)
opts.Image = "image:todeploy"

opts.Count = 0
_, err = c.CreateWorkload(ctx, opts)
assert.Error(t, err)
opts.Count = 1

opts.Entrypoint.Name = "bad_entry_name"
_, err = c.CreateWorkload(ctx, opts)
assert.Error(t, err)
opts.Entrypoint.Name = "some-nice-entrypoint"

// failed by memory check
opts.ResourceOpts = types.ResourceOptions{MemoryLimit: -1}
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
@@ -61,12 +84,15 @@ func TestCreateWorkloadTxn(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
opts := &types.DeployOptions{
Name: "zc:name",
Count: 2,
DeployStrategy: strategy.Auto,
Podname: "p1",
ResourceOpts: types.ResourceOptions{CPUQuotaLimit: 1},
Image: "zc:test",
Entrypoint: &types.Entrypoint{},
Entrypoint: &types.Entrypoint{
Name: "good-entrypoint",
},
}
store := &storemocks.Store{}
sche := &schedulermocks.Scheduler{}
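The test cases above exercise the new DeployOptions.Validate call field by field: an empty Name, Podname, or Image, a zero Count, and the underscore entrypoint name all fail, while the fixture values pass. A hedged reconstruction of that guard, with error values assumed (the real method lives in the types package and is not shown here):

// Hypothetical reconstruction of DeployOptions.Validate, inferred only from the tests above.
import (
	"errors"
	"strings"
)

func (o *DeployOptions) Validate() error {
	switch {
	case o.Name == "":
		return errors.New("deploy name is required")
	case o.Podname == "":
		return errors.New("podname is required")
	case o.Image == "":
		return errors.New("image is required")
	case o.Count <= 0:
		return errors.New("count must be at least 1")
	case o.Entrypoint == nil || o.Entrypoint.Name == "" || strings.Contains(o.Entrypoint.Name, "_"):
		// "bad_entry_name" fails and "some-nice-entrypoint" passes, so underscores are assumed to be the offending character
		return errors.New("entrypoint name must not contain underscores")
	}
	return nil
}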
6 changes: 3 additions & 3 deletions cluster/calcium/execute_test.go
@@ -29,20 +29,20 @@ func TestExecuteWorkload(t *testing.T) {
c.store = store
// failed by GetWorkload
store.On("GetWorkload", mock.Anything, mock.Anything).Return(nil, types.ErrBadCount).Once()
ch := c.ExecuteWorkload(ctx, &types.ExecuteWorkloadOptions{}, nil)
ID := "abc"
ch := c.ExecuteWorkload(ctx, &types.ExecuteWorkloadOptions{WorkloadID: ID}, nil)
for ac := range ch {
assert.NotEmpty(t, ac.Data)
}
engine := &enginemocks.API{}
ID := "abc"
workload := &types.Workload{
ID: ID,
Engine: engine,
}
store.On("GetWorkload", mock.Anything, mock.Anything).Return(workload, nil)
// failed by Execute
engine.On("Execute", mock.Anything, mock.Anything, mock.Anything).Return(ID, nil, nil, types.ErrCannotGetEngine).Once()
ch = c.ExecuteWorkload(ctx, &types.ExecuteWorkloadOptions{}, nil)
ch = c.ExecuteWorkload(ctx, &types.ExecuteWorkloadOptions{WorkloadID: ID}, nil)
for ac := range ch {
assert.Equal(t, ac.WorkloadID, ID)
}
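The only behavioural hint in this test change is that ExecuteWorkloadOptions now carries a WorkloadID up front, which suggests ExecuteWorkload checks it before doing any work. A minimal sketch of such a guard, purely an assumption since execute.go itself is not shown in this section:

// Hypothetical sketch; the actual validation path in execute.go is not visible in this diff.
import "errors"

func (o *ExecuteWorkloadOptions) Validate() error {
	if o.WorkloadID == "" {
		return errors.New("workload ID is required")
	}
	return nil
}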
42 changes: 23 additions & 19 deletions cluster/calcium/image.go
@@ -10,21 +10,23 @@ import (
)

// RemoveImage remove images
func (c *Calcium) RemoveImage(ctx context.Context, podname string, nodenames []string, images []string, step int, prune bool) (chan *types.RemoveImageMessage, error) {
ch := make(chan *types.RemoveImageMessage)
if step < 1 {
step = 1
func (c *Calcium) RemoveImage(ctx context.Context, opts *types.ImageOptions) (chan *types.RemoveImageMessage, error) {
if err := opts.Validate(); err != nil {
return nil, err
}
opts.Normalize()

nodes, err := c.getNodes(ctx, podname, nodenames, nil, false)
nodes, err := c.getNodes(ctx, opts.Podname, opts.Nodenames, nil, false)
if err != nil {
return ch, err
return nil, err
}

if len(nodes) == 0 {
return nil, types.ErrPodNoNodes
}

ch := make(chan *types.RemoveImageMessage)

go func() {
defer close(ch)
wg := sync.WaitGroup{}
@@ -33,7 +35,7 @@ func (c *Calcium) RemoveImage(ctx context.Context, podname string, nodenames []s
wg.Add(1)
go func(node *types.Node) {
defer wg.Done()
for _, image := range images {
for _, image := range opts.Images {
m := &types.RemoveImageMessage{
Success: false,
Image: image,
@@ -49,15 +51,15 @@ func (c *Calcium) RemoveImage(ctx context.Context, podname string, nodenames []s
}
ch <- m
}
if prune {
if opts.Prune {
if err := node.Engine.ImagesPrune(ctx); err != nil {
log.Errorf("[RemoveImage] Prune %s pod %s node failed: %v", podname, node.Name, err)
log.Errorf("[RemoveImage] Prune %s pod %s node failed: %v", opts.Podname, node.Name, err)
} else {
log.Infof("[RemoveImage] Prune %s pod %s node", podname, node.Name)
log.Infof("[RemoveImage] Prune %s pod %s node", opts.Podname, node.Name)
}
}
}(node)
if (i+1)%step == 0 {
if (i+1)%opts.Step == 0 {
log.Info("[RemoveImage] Wait for previous cleaner done")
wg.Wait()
}
@@ -70,21 +72,23 @@ func (c *Calcium) RemoveImage(ctx context.Context, podname string, nodenames []s
// CacheImage cache Image
// cache this image on the pod named by podname
// in practice this just pulls the image once on every node
func (c *Calcium) CacheImage(ctx context.Context, podname string, nodenames []string, images []string, step int) (chan *types.CacheImageMessage, error) {
ch := make(chan *types.CacheImageMessage)
if step < 1 {
step = 1
func (c *Calcium) CacheImage(ctx context.Context, opts *types.ImageOptions) (chan *types.CacheImageMessage, error) {
if err := opts.Validate(); err != nil {
return nil, err
}
opts.Normalize()

nodes, err := c.getNodes(ctx, podname, nodenames, nil, false)
nodes, err := c.getNodes(ctx, opts.Podname, opts.Nodenames, nil, false)
if err != nil {
return ch, err
return nil, err
}

if len(nodes) == 0 {
return nil, types.ErrPodNoNodes
}

ch := make(chan *types.CacheImageMessage)

go func() {
defer close(ch)
wg := sync.WaitGroup{}
@@ -93,7 +97,7 @@ func (c *Calcium) CacheImage(ctx context.Context, podname string, nodenames []st
wg.Add(1)
go func(node *types.Node) {
defer wg.Done()
for _, image := range images {
for _, image := range opts.Images {
m := &types.CacheImageMessage{
Image: image,
Success: true,
@@ -107,7 +111,7 @@ func (c *Calcium) CacheImage(ctx context.Context, podname string, nodenames []st
ch <- m
}
}(node)
if (i+1)%step == 0 {
if (i+1)%opts.Step == 0 {
log.Info("[CacheImage] Wait for puller cleaner done")
wg.Wait()
}
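Two things change shape here: the old inline clamp (step < 1 becomes 1) moves into opts.Normalize(), and the result channel is only created after validation and node lookup succeed, so early failures now return a nil channel. A hedged sketch of the ImageOptions helpers consistent with the removed inline code and the Podname check exercised by the tests below (the real code lives in the types package):

// Hypothetical sketch of the ImageOptions helpers; not part of this diff.
import "errors"

func (o *ImageOptions) Validate() error {
	if o.Podname == "" {
		return errors.New("podname is required") // inferred from the Podname: "" failures in the tests below
	}
	return nil
}

// Normalize preserves the behaviour of the removed inline check: a non-positive Step becomes 1.
func (o *ImageOptions) Normalize() {
	if o.Step < 1 {
		o.Step = 1
	}
}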
24 changes: 15 additions & 9 deletions cluster/calcium/image_test.go
@@ -18,13 +18,16 @@ func TestRemoveImage(t *testing.T) {
ctx := context.Background()
store := &storemocks.Store{}
c.store = store
// fail by validating
_, err := c.RemoveImage(ctx, &types.ImageOptions{Podname: ""})
assert.Error(t, err)
// fail by get nodes
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrBadCount).Once()
_, err := c.RemoveImage(ctx, "", nil, []string{}, 0, false)
_, err = c.RemoveImage(ctx, &types.ImageOptions{Podname: "podname"})
assert.Error(t, err)
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil).Once()
// fail 0 nodes
_, err = c.RemoveImage(ctx, "", nil, []string{}, 0, false)
_, err = c.RemoveImage(ctx, &types.ImageOptions{Podname: "podname"})
assert.Error(t, err)
engine := &enginemocks.API{}
nodes := []*types.Node{
@@ -38,19 +41,19 @@
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
// fail remove
engine.On("ImageRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrBadCount).Once()
ch, err := c.RemoveImage(ctx, "", nil, []string{"xx"}, 0, false)
ch, err := c.RemoveImage(ctx, &types.ImageOptions{Podname: "podname", Images: []string{"xx"}})
for c := range ch {
assert.False(t, c.Success)
}
engine.On("ImageRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{"xx"}, nil)
// success remove but prune fail
engine.On("ImagesPrune", mock.Anything).Return(types.ErrBadStorage).Once()
ch, err = c.RemoveImage(ctx, "", nil, []string{"xx"}, 0, true)
ch, err = c.RemoveImage(ctx, &types.ImageOptions{Podname: "podname", Images: []string{"xx"}})
for c := range ch {
assert.True(t, c.Success)
}
engine.On("ImagesPrune", mock.Anything).Return(nil)
ch, err = c.RemoveImage(ctx, "", nil, []string{"xx"}, 0, true)
ch, err = c.RemoveImage(ctx, &types.ImageOptions{Podname: "podname", Images: []string{"xx"}})
for c := range ch {
assert.True(t, c.Success)
}
Expand All @@ -61,13 +64,16 @@ func TestCacheImage(t *testing.T) {
ctx := context.Background()
store := &storemocks.Store{}
c.store = store
// fail by validating
_, err := c.CacheImage(ctx, &types.ImageOptions{Podname: ""})
assert.Error(t, err)
// fail by get nodes
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrBadCount).Once()
_, err := c.CacheImage(ctx, "", nil, []string{}, 0)
_, err = c.CacheImage(ctx, &types.ImageOptions{Podname: "podname"})
assert.Error(t, err)
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil).Once()
// fail 0 nodes
_, err = c.CacheImage(ctx, "", nil, []string{}, 0)
_, err = c.CacheImage(ctx, &types.ImageOptions{Podname: "podname"})
assert.Error(t, err)
engine := &enginemocks.API{}
nodes := []*types.Node{
@@ -83,15 +89,15 @@
engine.On("ImageRemoteDigest", mock.Anything, mock.Anything).Return("", types.ErrNoETCD).Once()
engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
engine.On("ImagePull", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
ch, err := c.CacheImage(ctx, "", nil, []string{"xx"}, 0)
ch, err := c.CacheImage(ctx, &types.ImageOptions{Podname: "podname", Images: []string{"xx"}})
for c := range ch {
assert.False(t, c.Success)
}
engine.On("ImageRemoteDigest", mock.Anything, mock.Anything).Return("yy", nil)
engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return([]string{"xx"}, nil)
engine.On("ImagePull", mock.Anything, mock.Anything, mock.Anything).Return(ioutil.NopCloser(bytes.NewReader([]byte{})), nil)
// succ
ch, err = c.CacheImage(ctx, "", nil, []string{"xx"}, 0)
ch, err = c.CacheImage(ctx, &types.ImageOptions{Podname: "podname", Images: []string{"xx"}})
for c := range ch {
assert.True(t, c.Success)
}
3 changes: 3 additions & 0 deletions cluster/calcium/lambda.go
@@ -17,6 +17,9 @@ const exitDataPrefix = "[exitcode] "

// RunAndWait implement lambda
func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) (<-chan *types.AttachWorkloadMessage, error) {
if err := opts.Validate(); err != nil {
return nil, err
}
opts.Lambda = true
// count = 1 && OpenStdin
if opts.OpenStdin && (opts.Count != 1 || opts.DeployStrategy != strategy.Auto) {
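RunAndWait now shares the same DeployOptions.Validate guard as CreateWorkload, so lambda callers must pass a fully specified options struct. A usage sketch, with values borrowed from the test fixtures above rather than from this file:

// Usage sketch: an incomplete DeployOptions now fails fast before any deploy work starts.
opts := &types.DeployOptions{
	Name:       "lambda-job", // hypothetical value; the remaining fields mirror the create_test.go fixture
	Podname:    "somepod",
	Image:      "image:todeploy",
	Count:      1,
	Entrypoint: &types.Entrypoint{Name: "some-nice-entrypoint"},
}
ch, err := c.RunAndWait(ctx, opts, nil)
if err != nil {
	// a validation failure (or another early check) surfaces here instead of on the channel
}
_ = ch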
2 changes: 1 addition & 1 deletion cluster/calcium/lock.go
@@ -74,7 +74,7 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, IDs []string, f func(
func (c *Calcium) withNodesLocked(ctx context.Context, podname string, nodenames []string, labels map[string]string, all bool, f func(nodes map[string]*types.Node) error) error {
nodes := map[string]*types.Node{}
locks := map[string]lock.DistributedLock{}
defer func() { c.doUnlockAll(ctx, locks) }()
defer c.doUnlockAll(ctx, locks)
ns, err := c.getNodes(ctx, podname, nodenames, labels, all)
if err != nil {
return err
12 changes: 12 additions & 0 deletions cluster/calcium/node.go
@@ -10,12 +10,18 @@ import (

// AddNode adds a node
func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*types.Node, error) {
if err := opts.Validate(); err != nil {
return nil, err
}
opts.Normalize()
return c.store.AddNode(ctx, opts)
}

// RemoveNode remove a node
func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error {
if nodename == "" {
return types.ErrEmptyNodeName
}
return c.withNodeLocked(ctx, nodename, func(node *types.Node) error {
ws, err := c.ListNodeWorkloads(ctx, node.Name, nil)
if err != nil {
@@ -35,11 +41,17 @@ func (c *Calcium) ListPodNodes(ctx context.Context, podname string, labels map[s

// GetNode get node
func (c *Calcium) GetNode(ctx context.Context, nodename string) (*types.Node, error) {
if nodename == "" {
return nil, types.ErrEmptyNodeName
}
return c.store.GetNode(ctx, nodename)
}

// SetNode set node available or not
func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error) { // nolint
if err := opts.Validate(); err != nil {
return nil, err
}
var n *types.Node
return n, c.withNodeLocked(ctx, opts.Nodename, func(node *types.Node) error {
litter.Dump(opts)
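Alongside the explicit ErrEmptyNodeName guards added to RemoveNode and GetNode, AddNode and SetNode gain Validate (and Normalize) calls whose bodies are not part of this diff. A minimal sketch of the SetNodeOptions guard, assuming the nodename rule mirrors the explicit checks above:

// Hypothetical sketch; the real SetNodeOptions.Validate is in the types package, not in this diff.
func (o *SetNodeOptions) Validate() error {
	if o.Nodename == "" {
		return ErrEmptyNodeName // same sentinel the cluster methods above return
	}
	return nil
}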