Skip to content

Commit 6d713b9

Browse files
committed
Merge branch 'run-and-wait' into 'master'
run and wait See merge request !69
2 parents 7789588 + 6baaa92 commit 6d713b9

File tree

11 files changed

+407
-111
lines changed

11 files changed

+407
-111
lines changed

cluster/calcium/create_container.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,8 @@ func (c *calcium) CreateContainer(specs types.Specs, opts *types.DeployOptions)
2626
pod, _ := c.store.GetPod(opts.Podname)
2727
if pod.Scheduler == "CPU" {
2828
return c.createContainerWithScheduler(specs, opts)
29-
} else {
30-
return c.createContainerWithCPUPeriod(specs, opts)
3129
}
30+
return c.createContainerWithCPUPeriod(specs, opts)
3231
}
3332

3433
func (c *calcium) createContainerWithCPUPeriod(specs types.Specs, opts *types.DeployOptions) (chan *types.CreateContainerMessage, error) {

cluster/calcium/helper.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func makeMountPaths(specs types.Specs, config types.Config) ([]string, map[strin
140140
func runExec(client *engineapi.Client, container enginetypes.ContainerJSON, label string) error {
141141
cmd, ok := container.Config.Labels[label]
142142
if !ok || cmd == "" {
143-
log.Debug("No %s found in container %s", label, container.ID)
143+
log.Debugf("No %s found in container %s", label, container.ID)
144144
return nil
145145
}
146146

cluster/calcium/run_and_wait.go

+94
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package calcium
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"sync"
7+
8+
log "github.com/Sirupsen/logrus"
9+
enginetypes "github.com/docker/docker/api/types"
10+
"gitlab.ricebook.net/platform/core/types"
11+
"golang.org/x/net/context"
12+
)
13+
14+
// FUCK DOCKER
15+
const PREFIXLEN int = 8
16+
17+
func (c *calcium) RunAndWait(specs types.Specs, opts *types.DeployOptions) (chan *types.RunAndWaitMessage, error) {
18+
ch := make(chan *types.RunAndWaitMessage)
19+
20+
// 强制为 json-file 输出
21+
entry, _ := specs.Entrypoints[opts.Entrypoint]
22+
entry.LogConfig = "json-file"
23+
specs.Entrypoints[opts.Entrypoint] = entry
24+
25+
createChan, err := c.CreateContainer(specs, opts)
26+
if err != nil {
27+
log.Errorf("[RunAndWait] Create container error, %s", err.Error())
28+
return ch, err
29+
}
30+
31+
go func() {
32+
wg := &sync.WaitGroup{}
33+
defer log.Info("[RunAndWait] Finish run and wait for containers")
34+
defer close(ch)
35+
defer wg.Wait()
36+
logsOpts := enginetypes.ContainerLogsOptions{Follow: true, ShowStdout: true, ShowStderr: true}
37+
38+
for message := range createChan {
39+
wg.Add(1)
40+
if message.ContainerID == "" {
41+
log.Errorf("[RunAndWait] Can't find container id %s", err.Error())
42+
continue
43+
}
44+
45+
node, err := c.store.GetNode(message.Podname, message.Nodename)
46+
if err != nil {
47+
log.Errorf("[RunAndWait] Can't find node, %s", err.Error())
48+
continue
49+
}
50+
51+
go func(node *types.Node, message *types.CreateContainerMessage) {
52+
defer wg.Done()
53+
resp, err := node.Engine.ContainerLogs(context.Background(), message.ContainerID, logsOpts)
54+
if err != nil {
55+
data := fmt.Sprintf("[RunAndWait] Failed to get logs, %s", err.Error())
56+
ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: data}
57+
return
58+
}
59+
60+
scanner := bufio.NewScanner(resp)
61+
for scanner.Scan() {
62+
data := scanner.Bytes()[PREFIXLEN:]
63+
log.Debugf("[RunAndWait] %s %s", message.ContainerID[:12], data)
64+
m := &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: string(data)}
65+
ch <- m
66+
}
67+
68+
if err := scanner.Err(); err != nil {
69+
data := fmt.Sprintf("[RunAndWait] Parse log failed, %s", err.Error())
70+
ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: data}
71+
return
72+
}
73+
74+
container, err := c.GetContainer(message.ContainerID)
75+
if err != nil {
76+
data := fmt.Sprintf("[RunAndWait] Container not found, %s", err.Error())
77+
ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: data}
78+
return
79+
}
80+
defer c.removeOneContainer(container)
81+
82+
containerJSON, err := container.Inspect()
83+
if err == nil {
84+
ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: fmt.Sprintf("[exitcode] %d", containerJSON.State.ExitCode)}
85+
} else {
86+
ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: fmt.Sprintf("[exitcode]unknown %s", err.Error())}
87+
}
88+
log.Infof("[RunAndWait] Container %s finished, remove", message.ContainerID)
89+
}(node, message)
90+
}
91+
}()
92+
93+
return ch, nil
94+
}

cluster/cluster.go

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Cluster interface {
2121
// cluster methods
2222
BuildImage(repository, version, uid, artifact string) (chan *types.BuildImageMessage, error)
2323
CreateContainer(specs types.Specs, opts *types.DeployOptions) (chan *types.CreateContainerMessage, error)
24+
RunAndWait(specs types.Specs, opts *types.DeployOptions) (chan *types.RunAndWaitMessage, error)
2425
UpgradeContainer(ids []string, image string) (chan *types.UpgradeContainerMessage, error)
2526
RemoveContainer(ids []string) (chan *types.RemoveContainerMessage, error)
2627
RemoveImage(podname, nodename string, images []string) (chan *types.RemoveImageMessage, error)

0 commit comments

Comments
 (0)