
Commit 29447e9

Author: tonic (committed)
Merge branch 'dev' into 'master'
update: new CreateContainer. 1. Add a benchmark test for ave alloc. 2. Add a new CreateContainer API that uses cpu-period as the resource limit. See merge request !8
2 parents 9dd2627 + af2fa60 commit 29447e9
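
The second point in the message is the substance of this change: when containers are not pinned through the CPU scheduler, the CPU limit is expressed with Docker's CFS period and quota instead of CPUShares/CpusetCpus. Below is a minimal sketch of that conversion as it appears in the diff, assuming utils.CpuPeriodBase is the conventional 100000µs CFS period (the constant's actual value lives in the utils package and is not shown in this commit):

```go
package main

import "fmt"

// CpuPeriodBase stands in for utils.CpuPeriodBase here; 100000µs is Docker's
// default CFS period and is assumed for illustration only.
const CpuPeriodBase = 100000

func main() {
	cpuQuota := 0.5 // opts.CPUQuota: half a core per container

	// Mirrors the diff: CPUQuota = int64(opts.CPUQuota * float64(utils.CpuPeriodBase))
	period := int64(CpuPeriodBase)
	quota := int64(cpuQuota * float64(CpuPeriodBase))

	fmt.Printf("CPUPeriod=%d CPUQuota=%d\n", period, quota) // CPUPeriod=100000 CPUQuota=50000
}
```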

File tree: 6 files changed (+351 -44 lines)


cluster/calcium/cluster.go (+9 -10)

@@ -1,8 +1,6 @@
 package calcium
 
 import (
-	"fmt"
-
 	"gitlab.ricebook.net/platform/core/network"
 	"gitlab.ricebook.net/platform/core/network/calico"
 	"gitlab.ricebook.net/platform/core/scheduler"
@@ -24,21 +22,22 @@ type calcium struct {
 }
 
 func New(config types.Config) (*calcium, error) {
+	var err error
 	store, err := etcdstore.New(config)
 	if err != nil {
 		return nil, err
 	}
 
 	var scheduler scheduler.Scheduler
-	if config.Scheduler.Type == "simple" {
-		scheduler = simplescheduler.New()
-	} else if config.Scheduler.Type == "complex" {
-		scheduler, err = complexscheduler.New(config)
-		if err != nil {
-			return nil, err
+	if config.ResourceAlloc == "scheduler" {
+		if config.Scheduler.Type == "simple" {
+			scheduler = simplescheduler.New()
+		} else if config.Scheduler.Type == "complex" {
+			scheduler, err = complexscheduler.New(config)
+			if err != nil {
+				return nil, err
+			}
 		}
-	} else {
-		return nil, fmt.Errorf("Wrong type for scheduler: either simple or complex")
 	}
 
 	titanium := calico.New()

cluster/calcium/create_container.go (+196 -30)

@@ -22,6 +22,148 @@ import (
 // Use specs and options to create
 // TODO what about networks?
 func (c *calcium) CreateContainer(specs types.Specs, opts *types.DeployOptions) (chan *types.CreateContainerMessage, error) {
+	if c.config.ResourceAlloc == "scheduler" {
+		return c.createContainerWithScheduler(specs, opts)
+	} else {
+		return c.createContainerWithCPUPeriod(specs, opts)
+	}
+}
+
+func (c *calcium) createContainerWithCPUPeriod(specs types.Specs, opts *types.DeployOptions) (chan *types.CreateContainerMessage, error) {
+	ch := make(chan *types.CreateContainerMessage)
+
+	cpumap, _, err := c.getCPUMap(opts.Podname, opts.Nodename, 1.0)
+	if err != nil {
+		return ch, err
+	}
+	nodesInfo := utils.GetNodesInfo(cpumap)
+
+	cpuQuota := int(opts.CPUQuota * float64(utils.CpuPeriodBase))
+	plan, err := utils.AllocContainerPlan(nodesInfo, cpuQuota, opts.Count)
+	if err != nil {
+		return ch, err
+	}
+
+	go func() {
+		wg := sync.WaitGroup{}
+		wg.Add(len(plan))
+		for nodename, num := range plan {
+			go func(nodename string, num int, opts *types.DeployOptions) {
+				defer wg.Done()
+				for _, m := range c.doCreateContainerWithCPUPeriod(nodename, num, opts.CPUQuota, specs, opts) {
+					ch <- m
+				}
+			}(nodename, num, opts)
+		}
+		wg.Wait()
+		close(ch)
+	}()
+
+	return ch, nil
+}
+
+func (c *calcium) doCreateContainerWithCPUPeriod(nodename string, connum int, quota float64, specs types.Specs, opts *types.DeployOptions) []*types.CreateContainerMessage {
+	ms := make([]*types.CreateContainerMessage, connum)
+	for i := 0; i < len(ms); i++ {
+		ms[i] = &types.CreateContainerMessage{}
+	}
+
+	node, err := c.GetNode(opts.Podname, nodename)
+	if err != nil {
+		return ms
+	}
+
+	if err := pullImage(node, opts.Image); err != nil {
+		return ms
+	}
+
+	for i := 0; i < connum; i++ {
+		config, hostConfig, networkConfig, containerName, err := c.makeContainerOptions(nil, specs, opts, "cpuperiod")
+		if err != nil {
+			log.Errorf("error when creating CreateContainerOptions, %v", err)
+			ms[i].Error = err.Error()
+			continue
+		}
+
+		//create container
+		container, err := node.Engine.ContainerCreate(context.Background(), config, hostConfig, networkConfig, containerName)
+		if err != nil {
+			log.Errorf("error when creating container, %v", err)
+			ms[i].Error = err.Error()
+			continue
+		}
+
+		// connect container to network
+		// if network manager uses docker plugin, then connect must be called before container starts
+		if c.network.Type() == "plugin" {
+			ctx := utils.ToDockerContext(node.Engine)
+			breaked := false
+
+			// need to ensure all networks are correctly connected
+			for networkID, ipv4 := range opts.Networks {
+				if err = c.network.ConnectToNetwork(ctx, container.ID, networkID, ipv4); err != nil {
+					log.Errorf("error when connecting container %q to network %q, %q", container.ID, networkID, err.Error())
+					breaked = true
+					break
+				}
+			}
+
+			// remove bridge network
+			// only when user defined networks is given
+			if len(opts.Networks) != 0 {
+				if err := c.network.DisconnectFromNetwork(ctx, container.ID, "bridge"); err != nil {
+					log.Errorf("error when disconnecting container %q from network %q, %q", container.ID, "bridge", err.Error())
+				}
+			}
+
+			// if any break occurs, then this container needs to be removed
+			if breaked {
+				ms[i].Error = err.Error()
+				go node.Engine.ContainerRemove(context.Background(), container.ID, enginetypes.ContainerRemoveOptions{})
+				continue
+			}
+		}
+
+		err = node.Engine.ContainerStart(context.Background(), container.ID, enginetypes.ContainerStartOptions{})
+		if err != nil {
+			log.Errorf("error when starting container, %v", err)
+			ms[i].Error = err.Error()
+			go node.Engine.ContainerRemove(context.Background(), container.ID, enginetypes.ContainerRemoveOptions{})
+			continue
+		}
+
+		// TODO
+		// if network manager uses our own, then connect must be called after container starts
+		// here
+
+		info, err := node.Engine.ContainerInspect(context.Background(), container.ID)
+		if err != nil {
+			log.Errorf("error when inspecting container, %v", err)
+			ms[i].Error = err.Error()
+			continue
+		}
+
+		_, err = c.store.AddContainer(info.ID, opts.Podname, node.Name, containerName, nil)
+		if err != nil {
+			ms[i].Error = err.Error()
+			continue
+		}
+
+		ms[i] = &types.CreateContainerMessage{
+			Podname:       opts.Podname,
+			Nodename:      node.Name,
+			ContainerID:   info.ID,
+			ContainerName: containerName,
+			Error:         "",
+			Success:       true,
+			CPU:           nil,
+		}
+
+	}
+	return ms
+}
+
+func (c *calcium) createContainerWithScheduler(specs types.Specs, opts *types.DeployOptions) (chan *types.CreateContainerMessage, error) {
 	ch := make(chan *types.CreateContainerMessage)
 
 	result, err := c.prepareNodes(opts.Podname, opts.Nodename, opts.CPUQuota, opts.Count)
@@ -50,7 +192,7 @@ func (c *calcium) CreateContainer(specs types.Specs, opts *types.DeployOptions)
 		go func(nodename string, cpumap []types.CPUMap, opts *types.DeployOptions) {
 			defer wg.Done()
 
-			for _, m := range c.doCreateContainer(nodename, cpumap, specs, opts) {
+			for _, m := range c.doCreateContainerWithScheduler(nodename, cpumap, specs, opts) {
 				ch <- m
 			}
 		}(nodename, cpumap, opts)
@@ -71,18 +213,14 @@ func makeCPUMap(nodes []*types.Node) map[string]types.CPUMap {
 	return r
 }
 
-// Prepare nodes for deployment.
-// Later if any error occurs, these nodes can be restored.
-func (c *calcium) prepareNodes(podname, nodename string, quota float64, num int) (map[string][]types.CPUMap, error) {
-	result := make(map[string][]types.CPUMap)
-
-	// use podname as lock key to prevent scheduling on the same node at one time
+func (c *calcium) getCPUMap(podname, nodename string, quota float64) (map[string]types.CPUMap, []*types.Node, error) {
+	result := make(map[string]types.CPUMap)
 	lock, err := c.store.CreateLock(podname, 30)
 	if err != nil {
-		return result, err
+		return result, nil, err
 	}
 	if err := lock.Lock(); err != nil {
-		return result, err
+		return result, nil, err
 	}
 	defer lock.Unlock()
 
@@ -92,28 +230,40 @@ func (c *calcium) prepareNodes(podname, nodename string, quota float64, num int)
 	if nodename == "" {
 		nodes, err = c.ListPodNodes(podname)
 		if err != nil {
-			return result, err
+			return result, nil, err
 		}
 	} else {
 		n, err := c.GetNode(podname, nodename)
 		if err != nil {
-			return result, err
+			return result, nil, err
 		}
 		nodes = append(nodes, n)
 	}
 
-	// if public, use only public nodes
 	if quota == 0 { // a bit tricky here, since fractional requests like quota=0.5 have to be supported
 		nodes = filterNodes(nodes, true)
 	} else {
 		nodes = filterNodes(nodes, false)
 	}
 
 	if len(nodes) == 0 {
-		return result, fmt.Errorf("No available nodes")
+		return result, nil, fmt.Errorf("No available nodes")
 	}
 
-	cpumap := makeCPUMap(nodes)
+	result = makeCPUMap(nodes)
+	return result, nodes, nil
+}
+
+// Prepare nodes for deployment.
+// Later if any error occurs, these nodes can be restored.
+func (c *calcium) prepareNodes(podname, nodename string, quota float64, num int) (map[string][]types.CPUMap, error) {
+	result := make(map[string][]types.CPUMap)
+
+	cpumap, nodes, err := c.getCPUMap(podname, nodename, quota)
+	if err != nil {
+		return result, err
+	}
+	// use podname as lock key to prevent scheduling on the same node at one time
 	result, changed, err := c.scheduler.SelectNodes(cpumap, quota, num) // this interface now uses float64 throughout
 	if err != nil {
 		return result, err
@@ -167,7 +317,7 @@ func pullImage(node *types.Node, image string) error {
 	return nil
 }
 
-func (c *calcium) doCreateContainer(nodename string, cpumap []types.CPUMap, specs types.Specs, opts *types.DeployOptions) []*types.CreateContainerMessage {
+func (c *calcium) doCreateContainerWithScheduler(nodename string, cpumap []types.CPUMap, specs types.Specs, opts *types.DeployOptions) []*types.CreateContainerMessage {
 	ms := make([]*types.CreateContainerMessage, len(cpumap))
 	for i := 0; i < len(ms); i++ {
 		ms[i] = &types.CreateContainerMessage{}
@@ -184,7 +334,7 @@ func (c *calcium) doCreateContainer(nodename string, cpumap []types.CPUMap, spec
 
 	for i, quota := range cpumap {
 		// create options
-		config, hostConfig, networkConfig, containerName, err := c.makeContainerOptions(quota, specs, opts)
+		config, hostConfig, networkConfig, containerName, err := c.makeContainerOptions(quota, specs, opts, "scheduler")
 		if err != nil {
 			log.Errorf("error when creating CreateContainerOptions, %v", err)
 			ms[i].Error = err.Error()
@@ -285,7 +435,7 @@ func (c *calcium) releaseQuota(node *types.Node, quota types.CPUMap) {
 	c.store.UpdateNodeCPU(node.Podname, node.Name, quota, "+")
 }
 
-func (c *calcium) makeContainerOptions(quota map[string]int, specs types.Specs, opts *types.DeployOptions) (
+func (c *calcium) makeContainerOptions(quota map[string]int, specs types.Specs, opts *types.DeployOptions, optionMode string) (
 	*enginecontainer.Config,
 	*enginecontainer.HostConfig,
 	*enginenetwork.NetworkingConfig,
@@ -322,16 +472,20 @@ func (c *calcium) makeContainerOptions(quota map[string]int, specs types.Specs,
 	// calculate CPUShares and CPUSet
 	// scheduler won't return more than 1 share quota
 	// so the smallest share is the share numerator
-	shareQuota := 10
-	labels := []string{}
-	for label, share := range quota {
-		labels = append(labels, label)
-		if share < shareQuota {
-			shareQuota = share
+	var cpuShares int64
+	var cpuSetCpus string
+	if optionMode == "scheduler" {
+		shareQuota := 10
+		labels := []string{}
+		for label, share := range quota {
+			labels = append(labels, label)
+			if share < shareQuota {
+				shareQuota = share
+			}
 		}
+		cpuShares = int64(float64(shareQuota) / float64(10) * float64(1024))
+		cpuSetCpus = strings.Join(labels, ",")
 	}
-	cpuShares := int64(float64(shareQuota) / float64(10) * float64(1024))
-	cpuSetCpus := strings.Join(labels, ",")
 
 	// env
 	env := append(opts.Env, fmt.Sprintf("APP_NAME=%s", specs.Appname))
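
For a rough sense of the scheduler-mode arithmetic in the hunk above: the smallest per-label share becomes the numerator over a base of 10, scaled to Docker's default 1024 CPU shares. A worked example with made-up scheduler output (values are illustrative only, not taken from this commit):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical scheduler result: shares per CPU label, on a base of 10.
	quota := map[string]int{"0": 5, "1": 10}

	shareQuota := 10
	labels := []string{}
	for label, share := range quota {
		labels = append(labels, label)
		if share < shareQuota {
			shareQuota = share
		}
	}
	cpuShares := int64(float64(shareQuota) / float64(10) * float64(1024))
	// Prints 512 and "0,1" (map iteration order may vary).
	fmt.Println(cpuShares, strings.Join(labels, ","))
}
```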
@@ -425,6 +579,22 @@ func (c *calcium) makeContainerOptions(quota map[string]int, specs types.Specs,
 		NetworkDisabled: false,
 		Labels:          containerLabels,
 	}
+
+	var resource enginecontainer.Resources
+	if optionMode == "scheduler" {
+		resource = enginecontainer.Resources{
+			CPUShares:  cpuShares,
+			CpusetCpus: cpuSetCpus,
+			Ulimits:    ulimits,
+		}
+	} else {
+		resource = enginecontainer.Resources{
+			CPUPeriod: utils.CpuPeriodBase,
+			CPUQuota:  int64(opts.CPUQuota * float64(utils.CpuPeriodBase)),
+			Ulimits:   ulimits,
+		}
+	}
+
 	hostConfig := &enginecontainer.HostConfig{
 		Binds:      binds,
 		LogConfig:  enginecontainer.LogConfig{Type: logConfig},
@@ -433,11 +603,7 @@ func (c *calcium) makeContainerOptions(quota map[string]int, specs types.Specs,
 		CapAdd:     engineslice.StrSlice(capAdd),
 		ExtraHosts: entry.ExtraHosts,
 		Privileged: entry.Privileged != "",
-		Resources: enginecontainer.Resources{
-			CPUShares:  cpuShares,
-			CpusetCpus: cpuSetCpus,
-			Ulimits:    ulimits,
-		},
+		Resources:  resource,
 	}
 	// this is empty because we don't use any plugin for Docker
 	// networkConfig := &enginenetwork.NetworkingConfig{
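
Seen end to end, the new entry point dispatches on config.ResourceAlloc: "scheduler" keeps the CPU-map/CPUShares path, and any other value falls through to the new cpu-period path. A hypothetical caller-side sketch of that behaviour follows; the import paths and concrete values are assumptions, only ResourceAlloc, the DeployOptions fields, and the message fields used here appear in the diff:

```go
package main

import (
	"log"

	"gitlab.ricebook.net/platform/core/cluster/calcium" // assumed import path
	"gitlab.ricebook.net/platform/core/types"           // assumed import path
)

func main() {
	var config types.Config
	// "scheduler" selects the CPU-map/CPUShares path; anything else
	// (e.g. "cpu-period") selects the new CPUPeriod/CPUQuota path.
	config.ResourceAlloc = "cpu-period"

	c, err := calcium.New(config)
	if err != nil {
		log.Fatal(err)
	}

	// Illustrative deployment: 3 containers at half a core each.
	ch, err := c.CreateContainer(types.Specs{}, &types.DeployOptions{
		Podname:  "demo-pod",        // hypothetical values
		Image:    "demo/app:latest", // hypothetical image
		CPUQuota: 0.5,
		Count:    3,
	})
	if err != nil {
		log.Fatal(err)
	}
	for m := range ch {
		log.Printf("%s on %s: success=%v", m.ContainerID, m.Nodename, m.Success)
	}
}
```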

devtools/client.py (+2 -2)

@@ -20,7 +20,7 @@ def _get_stub(ctx):
 
 @click.group()
 @click.option('--grpc-host', default='localhost', show_default=True)
-@click.option('--grpc-port', default=5000, show_default=True, type=int)
+@click.option('--grpc-port', default=5001, show_default=True, type=int)
 @click.pass_context
 def cli(ctx, grpc_host, grpc_port):
     ctx.obj['grpc_host'] = grpc_host
@@ -121,7 +121,7 @@ def get_node(ctx, podname, nodename):
 @click.pass_context
 def add_node(ctx, nodename, endpoint, podname, certs, public):
     stub = _get_stub(ctx)
-    
+
     cafile, certfile, keyfile = '', '', ''
     if certs:
         certs = os.path.abspath(certs)
