support excluding nodes when creating workloads #365

Merged
merged 5 commits, Mar 23, 2021
Changes from 3 commits
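
Note for readers: the diffs below pass a new types.NodeFilter value into withNodesLocked and filterNodes, but the struct definition itself is not among the files shown in this view. Based on the fields referenced in the diff (Podname, Includes, Excludes, Labels, All), it presumably looks roughly like the sketch below; the real definition may differ.

// Hypothetical sketch of types.NodeFilter, inferred from the fields used in this PR.
package types

// NodeFilter bundles the node-selection arguments that withNodesLocked
// previously took individually (podname, nodenames, labels, all).
type NodeFilter struct {
	Podname  string            // pod whose nodes are listed when Includes is empty
	Includes []string          // explicit node names to fetch; takes priority over the other fields
	Excludes []string          // node names to drop from the listed pod nodes
	Labels   map[string]string // label filter passed through to ListPodNodes
	All      bool              // passed through to ListPodNodes when listing the pod's nodes
}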
8 changes: 7 additions & 1 deletion cluster/calcium/capacity.go
@@ -19,7 +19,13 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio
Total: 0,
NodeCapacities: map[string]int{},
}
return msg, c.withNodesLocked(ctx, opts.Podname, opts.Nodenames, nil, false, func(ctx context.Context, nodeMap map[string]*types.Node) error {

nf := types.NodeFilter{
Podname: opts.Podname,
Includes: opts.Nodenames,
All: false,
}
return msg, c.withNodesLocked(ctx, nf, func(ctx context.Context, nodeMap map[string]*types.Node) error {
if opts.DeployStrategy != strategy.Dummy {
if _, msg.NodeCapacities, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
logger.Errorf("[Calcium.CalculateCapacity] doAllocResource failed: %+v", err)
2 changes: 1 addition & 1 deletion cluster/calcium/create.go
@@ -67,7 +67,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio

// if: alloc resources
func(ctx context.Context) error {
return c.withNodesLocked(ctx, opts.Podname, opts.Nodenames, opts.NodeLabels, false, func(ctx context.Context, nodeMap map[string]*types.Node) (err error) {
return c.withNodesLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) (err error) {
defer func() {
if err != nil {
ch <- &types.CreateWorkloadMessage{Error: logger.Err(err)}
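
doCreateWorkloads now takes its node selection from opts.NodeFilter. As a hypothetical caller-side sketch of the feature this PR adds (excluding nodes when creating workloads), a caller could build the deploy options as below; it assumes DeployOptions carries the NodeFilter field referenced above, and all other field values are illustrative only.

package main

import "github.com/projecteru2/core/types"

// deployOptsExcludingNodes is a hypothetical helper showing how a caller could
// exclude nodes when creating workloads; it assumes DeployOptions embeds the
// NodeFilter used in doCreateWorkloads above, with other options elided.
func deployOptsExcludingNodes() *types.DeployOptions {
	return &types.DeployOptions{
		// ... image, entrypoint and resource options elided ...
		NodeFilter: types.NodeFilter{
			Podname:  "mypod",                      // deploy onto this pod's nodes
			Excludes: []string{"node-1", "node-2"}, // but never onto these two
		},
	}
}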
17 changes: 12 additions & 5 deletions cluster/calcium/lock.go
@@ -46,7 +46,11 @@ func (c *Calcium) withWorkloadLocked(ctx context.Context, id string, f func(cont
}

func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error {
return c.withNodesLocked(ctx, "", []string{nodename}, nil, true, func(ctx context.Context, nodes map[string]*types.Node) error {
nf := types.NodeFilter{
Includes: []string{nodename},
All: true,
}
return c.withNodesLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error {
if n, ok := nodes[nodename]; ok {
return f(ctx, n)
}
@@ -76,19 +80,22 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(
return f(ctx, workloads)
}

func (c *Calcium) withNodesLocked(ctx context.Context, podname string, nodenames []string, labels map[string]string, all bool, f func(context.Context, map[string]*types.Node) error) error {
// withNodesLocked uses NodeFilter `nf` to filter nodes
// and locks the corresponding nodes for the callback function `f` to use
func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error {
nodes := map[string]*types.Node{}
locks := map[string]lock.DistributedLock{}
defer log.Debugf("[withNodesLocked] Nodes %+v unlocked", nodenames)
defer log.Debugf("[withNodesLocked] Nodes %+v unlocked", nf)
defer c.doUnlockAll(context.Background(), locks)
ns, err := c.getNodes(ctx, podname, nodenames, labels, all)

ns, err := c.filterNodes(ctx, nf)
if err != nil {
return err
}

var lock lock.DistributedLock
for _, n := range ns {
lock, ctx, err = c.doLock(ctx, fmt.Sprintf(cluster.NodeLock, podname, n.Name), c.config.LockTimeout)
lock, ctx, err = c.doLock(ctx, fmt.Sprintf(cluster.NodeLock, n.Podname, n.Name), c.config.LockTimeout)
if err != nil {
return err
}
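
One behavioural note on the last hunk above: the per-node lock key now uses n.Podname instead of the podname that the caller passed in. Under the NodeFilter approach the locked nodes are not guaranteed to belong to nf.Podname (withNodeLocked sets only Includes and leaves Podname empty, and filterNodes fetches included nodes by name regardless of pod), so keying each lock by the node's own Podname keeps the lock name consistent with the node actually being locked. A rough sketch of the assumed key construction, since the cluster.NodeLock format string itself is not shown in this diff:

// Assumed: cluster.NodeLock is a two-argument format string, so the lock key is
// derived from the node's own pod and name, e.g. roughly "<prefix>_<pod>_<node>".
lockKey := fmt.Sprintf(cluster.NodeLock, n.Podname, n.Name)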
12 changes: 6 additions & 6 deletions cluster/calcium/lock_test.go
@@ -132,12 +132,12 @@ func TestWithNodesLocked(t *testing.T) {
}
// failed by list nodes
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, types.ErrNoETCD).Once()
err := c.withNodesLocked(ctx, "test", nil, nil, false, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
err := c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil).Once()
// failed by filter
var ns map[string]*types.Node
err = c.withNodesLocked(ctx, "test", nil, map[string]string{"eru": "2"}, false, func(ctx context.Context, nodes map[string]*types.Node) error {
err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Labels: map[string]string{"eru": "2"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
ns = nodes
return nil
})
@@ -146,7 +146,7 @@
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil)
// failed by getnode
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err = c.withNodesLocked(ctx, "test", []string{"test"}, nil, false, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil).Once()
// failed by lock
@@ -155,16 +155,16 @@
lock.On("Unlock", mock.Anything).Return(nil)
// failed to get lock
lock.On("Lock", mock.Anything).Return(context.TODO(), types.ErrNoETCD).Once()
err = c.withNodesLocked(ctx, "test", []string{"test"}, nil, false, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
// failed by get locked node
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err = c.withNodesLocked(ctx, "test", []string{"test"}, nil, false, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil)
// success
err = c.withNodesLocked(ctx, "test", []string{"test"}, nil, false, func(ctx context.Context, nodes map[string]*types.Node) error {
err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
assert.Len(t, nodes, 1)
return nil
})
41 changes: 41 additions & 0 deletions cluster/calcium/node.go
@@ -178,3 +178,44 @@ func (c *Calcium) getNodes(ctx context.Context, podname string, nodenames []stri
}
return ns, err
}

// filterNodes filters nodes using NodeFilter nf
// the filtering logic is introduced along with NodeFilter
// NOTE: when nf.Includes is set, the included nodes do not need to belong to nf.Podname
func (c *Calcium) filterNodes(ctx context.Context, nf types.NodeFilter) ([]*types.Node, error) {
var (
err error
ns = []*types.Node{}
)
if len(nf.Includes) != 0 {
for _, nodename := range nf.Includes {
node, err := c.GetNode(ctx, nodename)
if err != nil {
return ns, err
}
ns = append(ns, node)
}
} else {
listedNodes, err := c.ListPodNodes(ctx, nf.Podname, nf.Labels, nf.All)
if err != nil {
return nil, err
}

if len(nf.Excludes) != 0 {
excludes := map[string]struct{}{}
for _, n := range nf.Excludes {
excludes[n] = struct{}{}
}

for _, n := range listedNodes {
if _, ok := excludes[n.Name]; ok {
continue
}
ns = append(ns, n)
}
} else {
ns = listedNodes
}
}
return ns, err
}
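
For reference, the precedence that filterNodes implements, and that TestFilterNodes below exercises, can be summarized as follows (comment form, matching the code above):

// Precedence of NodeFilter fields in filterNodes:
//   Includes non-empty               -> fetch exactly those nodes by name;
//                                       Podname, Labels and Excludes are ignored
//   Includes empty, Excludes set     -> list the pod's nodes via ListPodNodes
//                                       (Podname, Labels, All), then drop the excluded names
//   Includes empty, Excludes empty   -> return all nodes listed for the pod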
55 changes: 55 additions & 0 deletions cluster/calcium/node_test.go
@@ -4,6 +4,7 @@ import (
"context"
"testing"

"github.com/pkg/errors"
lockmocks "github.com/projecteru2/core/lock/mocks"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/types"
@@ -260,3 +261,57 @@ func TestSetNode(t *testing.T) {
assert.Equal(t, n.Volume["/sda0"], int64(5))
assert.Equal(t, n.Volume["/sda2"], int64(19))
}

func TestFilterNodes(t *testing.T) {
assert := assert.New(t)
c := NewTestCluster()
store := c.store.(*storemocks.Store)
nodes := []*types.Node{
{
NodeMeta: types.NodeMeta{Name: "A"},
},
{
NodeMeta: types.NodeMeta{Name: "B"},
},
{
NodeMeta: types.NodeMeta{Name: "C"},
},
{
NodeMeta: types.NodeMeta{Name: "D"},
},
}

// error
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fail to list pod nodes")).Once()
_, err := c.filterNodes(context.Background(), types.NodeFilter{Includes: []string{}, Excludes: []string{"A", "X"}})
assert.Error(err)

// empty nodenames, non-empty excludeNodenames
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil).Once()
nodes1, err := c.filterNodes(context.Background(), types.NodeFilter{Includes: []string{}, Excludes: []string{"A", "B"}})
assert.NoError(err)
assert.Equal("C", nodes1[0].Name)
assert.Equal("D", nodes1[1].Name)

// empty nodenames, empty excludeNodenames
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil).Once()
nodes2, err := c.filterNodes(context.Background(), types.NodeFilter{Includes: []string{}, Excludes: []string{}})
assert.NoError(err)
assert.Equal(4, len(nodes2))

// non-empty nodenames, empty excludeNodenames
store.On("GetNode", mock.Anything, "O").Return(&types.Node{NodeMeta: types.NodeMeta{Name: "O"}}, nil).Once()
store.On("GetNode", mock.Anything, "P").Return(&types.Node{NodeMeta: types.NodeMeta{Name: "P"}}, nil).Once()
nodes3, err := c.filterNodes(context.Background(), types.NodeFilter{Includes: []string{"O", "P"}, Excludes: []string{}})
assert.NoError(err)
assert.Equal("O", nodes3[0].Name)
assert.Equal("P", nodes3[1].Name)

// non-empty nodenames
store.On("GetNode", mock.Anything, "X").Return(&types.Node{NodeMeta: types.NodeMeta{Name: "X"}}, nil).Once()
store.On("GetNode", mock.Anything, "Y").Return(&types.Node{NodeMeta: types.NodeMeta{Name: "Y"}}, nil).Once()
nodes4, err := c.filterNodes(context.Background(), types.NodeFilter{Includes: []string{"X", "Y"}, Excludes: []string{"A", "B"}})
assert.NoError(err)
assert.Equal("X", nodes4[0].Name)
assert.Equal("Y", nodes4[1].Name)
}
7 changes: 6 additions & 1 deletion cluster/calcium/pod.go
@@ -24,7 +24,12 @@ func (c *Calcium) RemovePod(ctx context.Context, podname string) error {
if podname == "" {
return logger.Err(errors.WithStack(types.ErrEmptyPodName))
}
return c.withNodesLocked(ctx, podname, []string{}, nil, true, func(ctx context.Context, nodes map[string]*types.Node) error {

nf := types.NodeFilter{
Podname: podname,
All: true,
}
return c.withNodesLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error {
// TODO dissociate workload to node
// TODO should remove node first
return logger.Err(errors.WithStack(c.store.RemovePod(ctx, podname)))