Skip to content

Commit c41e2cf

Browse files
committed
add log stream for getting log from one container
1 parent 129d2f0 commit c41e2cf

13 files changed

+642
-309
lines changed

VERSION

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
19.07.06
1+
19.07.07

cluster/calcium/log.go

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package calcium
2+
3+
import (
4+
"bufio"
5+
"context"
6+
7+
"github.com/projecteru2/core/types"
8+
)
9+
10+
// LogStream log stream for one container
11+
func (c *Calcium) LogStream(ctx context.Context, ID string) (chan *types.LogStreamMessage, error) {
12+
ch := make(chan *types.LogStreamMessage)
13+
go func() {
14+
defer close(ch)
15+
container, err := c.store.GetContainer(ctx, ID)
16+
if err != nil {
17+
ch <- &types.LogStreamMessage{ID: ID, Error: err}
18+
return
19+
}
20+
21+
resp, err := container.Engine.VirtualizationLogs(ctx, ID, true, true, true)
22+
if err != nil {
23+
ch <- &types.LogStreamMessage{ID: ID, Error: err}
24+
return
25+
}
26+
27+
scanner := bufio.NewScanner(resp)
28+
for scanner.Scan() {
29+
data := scanner.Bytes()
30+
ch <- &types.LogStreamMessage{ID: ID, Data: data}
31+
}
32+
}()
33+
return ch, nil
34+
}

cluster/calcium/log_test.go

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package calcium
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
10+
"github.com/stretchr/testify/mock"
11+
12+
enginemocks "github.com/projecteru2/core/engine/mocks"
13+
storemocks "github.com/projecteru2/core/store/mocks"
14+
"github.com/projecteru2/core/types"
15+
)
16+
17+
func TestLogStream(t *testing.T) {
18+
c := NewTestCluster()
19+
store := c.store.(*storemocks.Store)
20+
engine := &enginemocks.API{}
21+
ID := "test"
22+
container := &types.Container{
23+
ID: ID,
24+
Engine: engine,
25+
}
26+
ctx := context.Background()
27+
// failed by GetContainer
28+
store.On("GetContainer", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
29+
ch, err := c.LogStream(ctx, ID)
30+
assert.NoError(t, err)
31+
for c := range ch {
32+
assert.Equal(t, c.ID, ID)
33+
assert.Empty(t, c.Data)
34+
}
35+
store.On("GetContainer", mock.Anything, mock.Anything).Return(container, nil)
36+
// failed by VirtualizationLogs
37+
engine.On("VirtualizationLogs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNodeExist).Once()
38+
ch, err = c.LogStream(ctx, ID)
39+
assert.NoError(t, err)
40+
for c := range ch {
41+
assert.Equal(t, c.ID, ID)
42+
assert.Empty(t, c.Data)
43+
}
44+
reader := bytes.NewBufferString("aaaa\nbbbb\n")
45+
engine.On("VirtualizationLogs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(reader, nil)
46+
// success
47+
ch, err = c.LogStream(ctx, ID)
48+
assert.NoError(t, err)
49+
for c := range ch {
50+
assert.Equal(t, c.ID, ID)
51+
assert.NotEmpty(t, c.Data)
52+
}
53+
}

cluster/cluster.go

+1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type Cluster interface {
8383
RemoveContainer(ctx context.Context, IDs []string, force bool, step int) (chan *types.RemoveContainerMessage, error)
8484
DissociateContainer(ctx context.Context, IDs []string) (chan *types.DissociateContainerMessage, error)
8585
ReplaceContainer(ctx context.Context, opts *types.ReplaceOptions) (chan *types.ReplaceContainerMessage, error)
86+
LogStream(ctx context.Context, ID string) (chan *types.LogStreamMessage, error)
8687
// finalizer
8788
Finalizer()
8889
}

rpc/gen/core.pb.go

+397-280
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rpc/gen/core.proto

+8-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ service CoreRPC {
3636
rpc DissociateContainer(DissociateContainerOptions) returns (stream DissociateContainerMessage) {};
3737
rpc ControlContainer(ControlContainerOptions) returns (stream ControlContainerMessage) {};
3838
rpc ReallocResource(ReallocOptions) returns (stream ReallocResourceMessage) {};
39+
rpc LogStream(ContainerID) returns (stream LogStreamMessage) {};
3940
}
4041

4142
message Empty {}
@@ -430,7 +431,7 @@ message CopyMessage {
430431
message SendMessage {
431432
string id = 1;
432433
string path = 2;
433-
string err = 3;
434+
string error = 3;
434435
}
435436

436437
message RunAndWaitOptions{
@@ -448,3 +449,9 @@ message ControlContainerMessage {
448449
string error = 2;
449450
bytes hook = 3;
450451
}
452+
453+
message LogStreamMessage {
454+
string id = 1;
455+
string error = 2;
456+
bytes data = 3;
457+
}

rpc/gen/core_pb2.py

+73-11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rpc/gen/core_pb2_grpc.py

+17
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,11 @@ def __init__(self, channel):
169169
request_serializer=core__pb2.ReallocOptions.SerializeToString,
170170
response_deserializer=core__pb2.ReallocResourceMessage.FromString,
171171
)
172+
self.LogStream = channel.unary_stream(
173+
'/pb.CoreRPC/LogStream',
174+
request_serializer=core__pb2.ContainerID.SerializeToString,
175+
response_deserializer=core__pb2.LogStreamMessage.FromString,
176+
)
172177

173178

174179
class CoreRPCServicer(object):
@@ -392,6 +397,13 @@ def ReallocResource(self, request, context):
392397
context.set_details('Method not implemented!')
393398
raise NotImplementedError('Method not implemented!')
394399

400+
def LogStream(self, request, context):
401+
# missing associated documentation comment in .proto file
402+
pass
403+
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
404+
context.set_details('Method not implemented!')
405+
raise NotImplementedError('Method not implemented!')
406+
395407

396408
def add_CoreRPCServicer_to_server(servicer, server):
397409
rpc_method_handlers = {
@@ -550,6 +562,11 @@ def add_CoreRPCServicer_to_server(servicer, server):
550562
request_deserializer=core__pb2.ReallocOptions.FromString,
551563
response_serializer=core__pb2.ReallocResourceMessage.SerializeToString,
552564
),
565+
'LogStream': grpc.unary_stream_rpc_method_handler(
566+
servicer.LogStream,
567+
request_deserializer=core__pb2.ContainerID.FromString,
568+
response_serializer=core__pb2.LogStreamMessage.SerializeToString,
569+
),
553570
}
554571
generic_handler = grpc.method_handlers_generic_handler(
555572
'pb.CoreRPC', rpc_method_handlers)

rpc/rpc.go

+28-3
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ func (v *Vibranium) Send(opts *pb.SendOptions, stream pb.CoreRPC_SendServer) err
300300
}
301301

302302
if m.Error != nil {
303-
msg.Err = m.Error.Error()
303+
msg.Error = m.Error.Error()
304304
}
305305

306306
if err := stream.Send(msg); err != nil {
@@ -381,8 +381,8 @@ func (v *Vibranium) DeployStatus(opts *pb.DeployStatusOptions, stream pb.CoreRPC
381381
if !ok {
382382
return nil
383383
}
384-
if m.Err != nil {
385-
return m.Err
384+
if m.Error != nil {
385+
return m.Error
386386
}
387387
if err := stream.Send(&pb.DeployStatusMessage{
388388
Action: m.Action,
@@ -620,6 +620,31 @@ func (v *Vibranium) ReallocResource(opts *pb.ReallocOptions, stream pb.CoreRPC_R
620620
return err
621621
}
622622

623+
// LogStream get container logs
624+
func (v *Vibranium) LogStream(opts *pb.ContainerID, stream pb.CoreRPC_LogStreamServer) error {
625+
ID := opts.GetId()
626+
log.Infof("[LogStream] Get %s log start", ID)
627+
defer log.Infof("[LogStream] Get %s log done", ID)
628+
ch, err := v.cluster.LogStream(stream.Context(), ID)
629+
if err != nil {
630+
return err
631+
}
632+
633+
for {
634+
select {
635+
case m, ok := <-ch:
636+
if !ok {
637+
return nil
638+
}
639+
if err = stream.Send(toRPCLogStreamMessage(m)); err != nil {
640+
v.logUnsentMessages("LogStream", m)
641+
}
642+
case <-v.rpcch:
643+
return nil
644+
}
645+
}
646+
}
647+
623648
// GetNodeByName get node by name
624649
func (v *Vibranium) GetNodeByName(ctx context.Context, opts *pb.GetNodeOptions) (*pb.Node, error) {
625650
n, err := v.cluster.GetNodeByName(ctx, opts.Nodename)

rpc/transform.go

+21-11
Original file line numberDiff line numberDiff line change
@@ -239,17 +239,6 @@ func toCoreDeployOptions(d *pb.DeployOptions) (*types.DeployOptions, error) {
239239
}, nil
240240
}
241241

242-
func cleanTmpDataFile(data map[string]string) error {
243-
var err error
244-
for _, src := range data {
245-
err = os.RemoveAll(src)
246-
if err != nil {
247-
log.Errorf("[cleanTmpDataFile] clean temp files failed %v", err)
248-
}
249-
}
250-
return err
251-
}
252-
253242
func toRPCCreateContainerMessage(c *types.CreateContainerMessage) *pb.CreateContainerMessage {
254243
if c == nil {
255244
return nil
@@ -409,6 +398,17 @@ func toRPCContainer(ctx context.Context, c *types.Container) (*pb.Container, err
409398
}, nil
410399
}
411400

401+
func toRPCLogStreamMessage(msg *types.LogStreamMessage) *pb.LogStreamMessage {
402+
r := &pb.LogStreamMessage{
403+
Id: msg.ID,
404+
Data: msg.Data,
405+
}
406+
if msg.Error != nil {
407+
r.Error = msg.Error.Error()
408+
}
409+
return r
410+
}
411+
412412
func makeTempTarFiles(data map[string][]byte) (map[string]string, error) {
413413
tarFiles := map[string]string{}
414414
for path, data := range data {
@@ -423,3 +423,13 @@ func makeTempTarFiles(data map[string][]byte) (map[string]string, error) {
423423
}
424424
return tarFiles, nil
425425
}
426+
427+
func cleanTmpDataFile(data map[string]string) error {
428+
var err error
429+
for _, src := range data {
430+
if err = os.RemoveAll(src); err != nil {
431+
log.Errorf("[cleanTmpDataFile] clean temp files failed %v", err)
432+
}
433+
}
434+
return err
435+
}

store/etcdv3/container.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func (m *Mercury) WatchDeployStatus(ctx context.Context, appname, entrypoint, no
167167
msg := &types.DeployStatus{}
168168
if resp.Err() != nil {
169169
if !resp.Canceled {
170-
msg.Err = resp.Err()
170+
msg.Error = resp.Err()
171171
ch <- msg
172172
}
173173
return

types/container.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (c *Container) Remove(ctx context.Context) error {
7070
// DeployStatus store deploy status
7171
type DeployStatus struct {
7272
Data string
73-
Err error
73+
Error error
7474
Action string
7575
Appname string
7676
Entrypoint string

types/message.go

+7
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,10 @@ type ReallocResourceMessage struct {
110110
ContainerID string
111111
Success bool
112112
}
113+
114+
// LogStreamMessage for log stream
115+
type LogStreamMessage struct {
116+
ID string
117+
Error error
118+
Data []byte
119+
}

0 commit comments

Comments
 (0)