Skip to content

Commit 1fa0f42

Browse files
committed
CPU prior with mem limit
1 parent 2881845 commit 1fa0f42

20 files changed

+251
-258
lines changed

cluster/calcium/common.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ type NodeContainers map[*types.Node][]*types.Container
2626
//NodeCPUMap store cpu and node info
2727
type NodeCPUMap map[*types.Node][]types.CPUMap
2828

29-
//CPUNodeContainers store cpu and nodecontainers
30-
type CPUNodeContainers map[float64]NodeContainers
29+
//CPUMemNodeContainers store cpu, mem and nodecontainers
30+
type CPUMemNodeContainers map[float64]map[int64]NodeContainers
3131

32-
//CPUNodeContainersMap store cpu and nodecpumap
33-
type CPUNodeContainersMap map[float64]NodeCPUMap
32+
//CPUMemNodeContainersMap store cpu, mem and nodecpumap
33+
type CPUMemNodeContainersMap map[float64]map[int64]NodeCPUMap
3434

3535
type imageBucket struct {
3636
sync.Mutex

cluster/calcium/create_container.go

+13-11
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,16 @@ func (c *Calcium) CreateContainer(ctx context.Context, opts *types.DeployOptions
2727
return nil, err
2828
}
2929
log.Infof("[CreateContainer] Creating container with options: %v", opts)
30+
31+
// 4194304 Byte = 4 MB, docker 创建容器的内存最低标准
32+
// -1 means without limit
33+
if opts.Memory < minMemory && !opts.RawResource {
34+
return nil, fmt.Errorf("Minimum memory limit allowed is 4MB, got %d", opts.Memory)
35+
}
36+
if opts.Count <= 0 { // Count 要大于0
37+
return nil, fmt.Errorf("Count must be positive, got %d", opts.Count)
38+
}
39+
3040
if opts.RawResource || pod.Favor == scheduler.MEMORY_PRIOR {
3141
return c.createContainerWithMemoryPrior(ctx, opts)
3242
} else if pod.Favor == scheduler.CPU_PRIOR {
@@ -37,14 +47,6 @@ func (c *Calcium) CreateContainer(ctx context.Context, opts *types.DeployOptions
3747

3848
func (c *Calcium) createContainerWithMemoryPrior(ctx context.Context, opts *types.DeployOptions) (chan *types.CreateContainerMessage, error) {
3949
ch := make(chan *types.CreateContainerMessage)
40-
// 4194304 Byte = 4 MB, docker 创建容器的内存最低标准
41-
// -1 means without limit
42-
if opts.Memory < minMemory && !opts.RawResource {
43-
return ch, fmt.Errorf("Minimum memory limit allowed is 4MB, got %d", opts.Memory)
44-
}
45-
if opts.Count <= 0 { // Count 要大于0
46-
return ch, fmt.Errorf("Count must be positive, got %d", opts.Count)
47-
}
4850

4951
// TODO RFC 计算当前 app 部署情况的时候需要保证同一时间只有这个 app 的这个 entrypoint 在跑
5052
// 因此需要在这里加个全局锁,直到部署完毕才释放
@@ -87,7 +89,7 @@ func (c *Calcium) doCreateContainerWithMemoryPrior(ctx context.Context, nodeInfo
8789
for i := 0; i < nodeInfo.Deploy; i++ {
8890
ms[i] = &types.CreateContainerMessage{Error: err}
8991
if !opts.RawResource {
90-
if err := c.store.UpdateNodeMem(ctx, opts.Podname, nodeInfo.Name, opts.Memory, "+"); err != nil {
92+
if err := c.store.UpdateNodeResource(ctx, opts.Podname, nodeInfo.Name, types.CPUMap{}, opts.Memory, "+"); err != nil {
9193
log.Errorf("[doCreateContainerWithMemoryPrior] reset node memory failed %v", err)
9294
}
9395
}
@@ -153,7 +155,7 @@ func (c *Calcium) doCreateContainerWithCPUPrior(ctx context.Context, nodeName st
153155
log.Errorf("[doCreateContainerWithCPUPrior] Get and prepare node error %v", err)
154156
for i := 0; i < deployCount; i++ {
155157
ms[i] = &types.CreateContainerMessage{Error: err}
156-
if err := c.store.UpdateNodeCPU(ctx, opts.Podname, nodeName, cpuMap[i], "+"); err != nil {
158+
if err := c.store.UpdateNodeResource(ctx, opts.Podname, nodeName, cpuMap[i], opts.Memory, "+"); err != nil {
157159
log.Errorf("[doCreateContainerWithCPUPrior] update node CPU failed %v", err)
158160
}
159161
}
@@ -296,7 +298,7 @@ func (c *Calcium) makeContainerOptions(index int, quota types.CPUMap, opts *type
296298

297299
var resource enginecontainer.Resources
298300
if favor == scheduler.CPU_PRIOR {
299-
resource = makeCPUPriorSetting(c.config.Scheduler.ShareBase, quota)
301+
resource = makeCPUPriorSetting(c.config.Scheduler.ShareBase, quota, opts.Memory)
300302
} else if favor == scheduler.MEMORY_PRIOR {
301303
resource = makeMemoryPriorSetting(opts.Memory, opts.CPUQuota)
302304
} else {

cluster/calcium/helper.go

+8-6
Original file line numberDiff line numberDiff line change
@@ -52,23 +52,25 @@ func makeMemoryPriorSetting(memory int64, cpu float64) enginecontainer.Resources
5252
return resource
5353
}
5454

55-
func makeCPUPriorSetting(shareBase int64, quota types.CPUMap) enginecontainer.Resources {
55+
func makeCPUPriorSetting(shareBase int64, quota types.CPUMap, memory int64) enginecontainer.Resources {
5656
// calculate CPUShares and CPUSet
5757
// scheduler won't return more than 1 share quota
5858
// so the smallest share is the share numerator
5959
shareQuota := shareBase
6060
cpuIDs := []string{}
61-
for cpuid, share := range quota {
62-
cpuIDs = append(cpuIDs, cpuid)
61+
for cpuID, share := range quota {
62+
cpuIDs = append(cpuIDs, cpuID)
6363
if share < shareQuota {
6464
shareQuota = share
6565
}
6666
}
67-
cpuShares := int64(float64(shareQuota) / float64(shareQuota) * float64(CpuShareBase))
67+
cpuShares := int64(float64(shareQuota) / float64(shareBase) * float64(CpuShareBase))
6868
cpuSetCpus := strings.Join(cpuIDs, ",")
69+
log.Debugf("[makeCPUPriorSetting] CPU core %v CPU share %v Memory soft limit %v", cpuSetCpus, cpuShares, memory)
6970
resource := enginecontainer.Resources{
70-
CPUShares: cpuShares,
71-
CpusetCpus: cpuSetCpus,
71+
CPUShares: cpuShares,
72+
CpusetCpus: cpuSetCpus,
73+
MemoryReservation: memory,
7274
}
7375
return resource
7476
}

cluster/calcium/mock_test.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -423,9 +423,7 @@ func initMockConfig() {
423423
mockStore.On("GetNode", podname, "node3").Return(n3, nil)
424424
mockStore.On("GetNode", podname, updatenodename).Return(n4, nil)
425425
mockStore.On("GetNode", "", "").Return(n2, nil)
426-
427-
mockStore.On("UpdateNodeMem", podname, mockStringType, mock.AnythingOfType("int64"), mockStringType).Return(nil)
428-
mockStore.On("UpdateNodeCPU", podname, mockStringType, mockCPUMapType, mockStringType).Return(nil)
426+
mockStore.On("UpdateNodeResource", podname, mockStringType, mockCPUMapType, mock.AnythingOfType("int64"), mockStringType).Return(nil)
429427
mockStore.On("UpdateNode", mockNodeType).Return(nil)
430428

431429
lk := mockstore.MockLock{}

0 commit comments

Comments
 (0)