@@ -3,7 +3,6 @@ package calcium
3
3
import (
4
4
"bufio"
5
5
"fmt"
6
- "sync"
7
6
8
7
log "github.com/Sirupsen/logrus"
9
8
enginetypes "github.com/docker/docker/api/types"
@@ -27,14 +26,12 @@ func (c *calcium) RunAndWait(specs types.Specs, opts *types.DeployOptions) (chan
27
26
}
28
27
29
28
go func () {
30
- wg := & sync.WaitGroup {}
31
29
defer log .Info ("[RunAndWait] Finish run and wait for containers" )
32
30
defer close (ch )
33
- defer wg .Wait ()
34
31
logsOpts := enginetypes.ContainerLogsOptions {Follow : true , ShowStdout : true , ShowStderr : true }
35
32
33
+ ids := map [string ]* types.Node {}
36
34
for message := range createChan {
37
- wg .Add (1 )
38
35
if message .ContainerID == "" {
39
36
log .Errorf ("[RunAndWait] Can't find container id %s" , err .Error ())
40
37
continue
@@ -46,9 +43,9 @@ func (c *calcium) RunAndWait(specs types.Specs, opts *types.DeployOptions) (chan
46
43
continue
47
44
}
48
45
49
- go func ( node * types. Node , message * types. CreateContainerMessage ) {
50
- defer wg . Done ()
51
- resp , err := node .Engine .ContainerLogs (context .Background (), message . ContainerID , logsOpts )
46
+ ids [ message . ContainerID ] = node
47
+ go func ( node * types. Node , containerID string ) {
48
+ resp , err := node .Engine .ContainerLogs (context .Background (), containerID , logsOpts )
52
49
if err != nil {
53
50
log .Errorf ("[RunAndWait] Failed to get logs, %s" , err .Error ())
54
51
return
@@ -58,31 +55,30 @@ func (c *calcium) RunAndWait(specs types.Specs, opts *types.DeployOptions) (chan
58
55
scanner := bufio .NewScanner (stream )
59
56
for scanner .Scan () {
60
57
data := scanner .Bytes ()
61
- ch <- & types.RunAndWaitMessage {ContainerID : message . ContainerID , Data : data }
62
- log .Debugf ("[RunAndWait] %s %s" , message . ContainerID [:12 ], data )
58
+ ch <- & types.RunAndWaitMessage {ContainerID : containerID , Data : data }
59
+ log .Debugf ("[RunAndWait] %s %s" , containerID [:12 ], data )
63
60
}
64
61
65
62
if err := scanner .Err (); err != nil {
66
63
log .Errorf ("[RunAndWait] Parse log failed, %s" , err .Error ())
67
64
return
68
65
}
66
+ }(node , message .ContainerID )
67
+ }
69
68
70
- container , err := c .GetContainer (message .ContainerID )
71
- if err != nil {
72
- log .Errorf ("[RunAndWait] Container not found, %s" , err .Error ())
73
- return
74
- }
75
-
76
- containerJSON , err := container .Inspect ()
77
- defer func () { go c .removeOneContainer (container , containerJSON ) }()
78
- exitData := []byte (fmt .Sprintf ("[exitcode] %d" , containerJSON .State .ExitCode ))
79
- if err != nil {
80
- exitData = []byte (fmt .Sprintf ("[exitcode]unknown %s" , err .Error ()))
81
- }
82
- ch <- & types.RunAndWaitMessage {ContainerID : message .ContainerID , Data : exitData }
83
- log .Infof ("[RunAndWait] Container %s finished, remove" , message .ContainerID )
84
- }(node , message )
69
+ rmids := []string {}
70
+ for id , node := range ids {
71
+ rmids = append (rmids , id )
72
+ code , err := node .Engine .ContainerWait (context .Background (), id )
73
+ exitData := []byte (fmt .Sprintf ("[exitcode] %d" , code ))
74
+ if err != nil {
75
+ log .Errorf ("%s run failed, %s" , id [:12 ], err .Error ())
76
+ exitData = []byte (fmt .Sprintf ("[exitcode]unknown %s" , err .Error ()))
77
+ }
78
+ ch <- & types.RunAndWaitMessage {ContainerID : id , Data : exitData }
79
+ log .Infof ("[RunAndWait] Container %s finished, remove" , id [:12 ])
85
80
}
81
+ go c .RemoveContainer (rmids )
86
82
}()
87
83
88
84
return ch , nil
0 commit comments