Skip to content

Commit acedc88

Browse files
committed
Merge branch 'timeout' of gitlab.ricebook.net:platform/core into timeout
2 parents 5c9f37c + acd67c6 commit acedc88

File tree

2 files changed

+57
-21
lines changed

2 files changed

+57
-21
lines changed

cluster/calcium/run_and_wait.go

+56-21
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package calcium
33
import (
44
"bufio"
55
"fmt"
6+
"sync"
7+
"time"
68

79
log "github.com/Sirupsen/logrus"
810
enginetypes "github.com/docker/docker/api/types"
@@ -11,6 +13,11 @@ import (
1113
"golang.org/x/net/context"
1214
)
1315

16+
// TODO: there is still a problem here.
17+
// If core is restarted while this wait is in progress, won't some containers be left unreclaimed?
18+
// Is there any way to avoid that?
19+
// defaultWaitTimeout is the fallback wait timeout, in seconds, used when the entrypoint's RunAndWaitTimeout is 0.
const defaultWaitTimeout = 1200
20+
1421
func (c *calcium) RunAndWait(specs types.Specs, opts *types.DeployOptions) (chan *types.RunAndWaitMessage, error) {
1522
ch := make(chan *types.RunAndWaitMessage)
1623

@@ -19,35 +26,51 @@ func (c *calcium) RunAndWait(specs types.Specs, opts *types.DeployOptions) (chan
1926
entry.LogConfig = "json-file"
2027
specs.Entrypoints[opts.Entrypoint] = entry
2128

29+
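// Default to a 1200-second timeout.
30+
// There is no better place to pass this in, so it lives on the entrypoint; callers that don't need it can leave it at 0 or omit it.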
31+
waitTimeout := entry.RunAndWaitTimeout
32+
if waitTimeout == 0 {
33+
waitTimeout = defaultWaitTimeout
34+
}
35+
36+
// Create the containers; bail out on any error.
2237
createChan, err := c.CreateContainer(specs, opts)
2338
if err != nil {
24-
log.Errorf("[RunAndWait] Create container error, %s", err.Error())
39+
log.Errorf("[RunAndWait] Create container error, %v", err)
2540
return ch, err
2641
}
2742

43+
// Handle the rest in a goroutine.
44+
// Basically: attach to fetch the logs and write them to the channel, and clean up resources once the container finishes.
2845
go func() {
29-
defer log.Info("[RunAndWait] Finish run and wait for containers")
30-
defer close(ch)
3146
logsOpts := enginetypes.ContainerLogsOptions{Follow: true, ShowStdout: true, ShowStderr: true}
47+
wg := sync.WaitGroup{}
3248

33-
ids := map[string]*types.Node{}
3449
for message := range createChan {
50+
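// Creation may have failed, or there may be no container ID, though the first check alone is really enough to decide.
51+
// Skip anything that was not created successfully.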
3552
if !message.Success || message.ContainerID == "" {
3653
log.Errorf("[RunAndWait] Create container error, %s", message.Error)
3754
continue
3855
}
3956

57+
// Also skip the container if its node cannot be found.
58+
// In theory this should not happen.
4059
node, err := c.store.GetNode(message.Podname, message.Nodename)
4160
if err != nil {
42-
log.Errorf("[RunAndWait] Can't find node, %s", err.Error())
61+
log.Errorf("[RunAndWait] Can't find node, %v", err)
4362
continue
4463
}
4564

46-
ids[message.ContainerID] = node
65+
// Register one task.
66+
wg.Add(1)
67+
68+
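// Goroutine that attaches to the logs, processes them and writes them back to the channel.
69+
// Logs are unrelated to the task, so this goroutine does not touch wg.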
4770
go func(node *types.Node, containerID string) {
4871
resp, err := node.Engine.ContainerLogs(context.Background(), containerID, logsOpts)
4972
if err != nil {
50-
log.Errorf("[RunAndWait] Failed to get logs, %s", err.Error())
73+
log.Errorf("[RunAndWait] Failed to get logs, %v", err)
5174
return
5275
}
5376

@@ -60,25 +83,37 @@ func (c *calcium) RunAndWait(specs types.Specs, opts *types.DeployOptions) (chan
6083
}
6184

6285
if err := scanner.Err(); err != nil {
63-
log.Errorf("[RunAndWait] Parse log failed, %s", err.Error())
86+
log.Errorf("[RunAndWait] Parse log failed, %v", err)
6487
return
6588
}
6689
}(node, message.ContainerID)
67-
}
6890

69-
rmids := []string{}
70-
for id, node := range ids {
71-
rmids = append(rmids, id)
72-
code, err := node.Engine.ContainerWait(context.Background(), id)
73-
exitData := []byte(fmt.Sprintf("[exitcode] %d", code))
74-
if err != nil {
75-
log.Errorf("%s run failed, %s", id[:12], err.Error())
76-
exitData = []byte(fmt.Sprintf("[exitcode]unknown %s", err.Error()))
77-
}
78-
ch <- &types.RunAndWaitMessage{ContainerID: id, Data: exitData}
79-
log.Infof("[RunAndWait] Container %s finished, remove", id[:12])
91+
// Goroutine that waits for the container to finish and then removes it.
92+
go func(node *types.Node, containerID string) {
93+
// No matter what, the container must be removed and its resources reclaimed.
94+
// A task only counts as done once its container has been removed.
95+
defer wg.Done()
96+
defer log.Infof("[RunAndWait] Container %s finished and removed", containerID[:12])
97+
defer c.RemoveContainer([]string{containerID})
98+
99+
// Apply a timeout; we cannot wait forever.
100+
// On timeout the returned code will be 1, but it is overridden by err.
101+
ctx, _ := context.WithTimeout(context.Background(), time.Duration(waitTimeout)*time.Second)
102+
code, err := node.Engine.ContainerWait(ctx, containerID)
103+
exitData := []byte(fmt.Sprintf("[exitcode] %d", code))
104+
if err != nil {
105+
log.Errorf("%s run failed, %v", containerID[:12], err)
106+
exitData = []byte(fmt.Sprintf("[exitcode]unknown %v", err))
107+
}
108+
109+
ch <- &types.RunAndWaitMessage{ContainerID: containerID, Data: exitData}
110+
}(node, message.ContainerID)
80111
}
81-
go c.RemoveContainer(rmids)
112+
113+
// The channel may only be closed after all tasks have completed.
114+
wg.Wait()
115+
log.Info("[RunAndWait] Finish run and wait for containers")
116+
close(ch)
82117
}()
83118

84119
return ch, nil

types/specs.go

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type Entrypoint struct {
3333
Privileged string `yaml:"privileged,omitempty"`
3434
LogConfig string `yaml:"log_config,omitempty"`
3535
WorkingDir string `yaml:"working_dir,omitempty"`
36+
RunAndWaitTimeout int `yaml:"run_and_wait_timeout"`
3637
}
3738

3839
// single bind

0 commit comments

Comments
 (0)