Skip to content

Commit 497af2f

Browse files
committed
add grpc tests
1 parent e559fef commit 497af2f

File tree

10 files changed

+648
-15
lines changed

10 files changed

+648
-15
lines changed

cluster/calcium/cluster.go

+4
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,7 @@ func New(config types.Config) (*calcium, error) {
4141

4242
return &calcium{store: store, config: config, scheduler: scheduler, network: titanium, source: source}, nil
4343
}
44+
45+
func (c *calcium) ResetSotre(s store.Store) {
46+
c.store = s
47+
}

cluster/calcium/create_container.go

+4
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,10 @@ func (c *calcium) UpgradeContainer(ids []string, image string) (chan *types.Upgr
591591
return ch, fmt.Errorf("No container ids given")
592592
}
593593

594+
if image == "" {
595+
return ch, fmt.Errorf("Image is empty")
596+
}
597+
594598
containers, err := c.GetContainers(ids)
595599
if err != nil {
596600
return ch, err

rpc/counter.go

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ func (v *vibranium) taskAdd(name string, verbose bool) {
1616
log.Infof("task [%s] added", name)
1717
}
1818
v.counter.Add(1)
19+
v.TaskNum++
1920
}
2021

2122
// 完成一个任务, 在任务执行完之后调用一次.
@@ -25,6 +26,7 @@ func (v *vibranium) taskDone(name string, verbose bool) {
2526
log.Infof("task [%s] done", name)
2627
}
2728
v.counter.Done()
29+
v.TaskNum--
2830
}
2931

3032
// 会在外面graceful之后调用.

rpc/rpc.go

+17-9
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type vibranium struct {
1818
cluster cluster.Cluster
1919
config types.Config
2020
counter sync.WaitGroup
21+
TaskNum int
2122
}
2223

2324
// Implementations for grpc server interface
@@ -70,8 +71,7 @@ func (v *vibranium) AddNode(ctx context.Context, opts *pb.AddNodeOptions) (*pb.N
7071
return toRPCNode(n, v.cluster.GetZone()), nil
7172
}
7273

73-
// AddNode saves a node and returns it to client
74-
// Method must be called synchronously, or nothing will be returned
74+
// RemoveNode removes the node from etcd
7575
func (v *vibranium) RemoveNode(ctx context.Context, opts *pb.RemoveNodeOptions) (*pb.Pod, error) {
7676
p, err := v.cluster.RemoveNode(opts.Nodename, opts.Podname)
7777
if err != nil {
@@ -143,11 +143,16 @@ func (v *vibranium) GetContainers(ctx context.Context, cids *pb.ContainerIDs) (*
143143
for _, c := range containers {
144144
info, err := c.Inspect()
145145
if err != nil {
146+
// catch这个error(Container因为某种原因inspect失败的话),防止其在etcd中变成脏数据
147+
log.Errorf("[GetContainers] Inspect container error: %s", err)
148+
errorInfo := fmt.Sprintf("Inspect container error: %s", err)
149+
cs = append(cs, toRPCContainer(c, errorInfo))
146150
continue
147151
}
148152

149153
bytes, err := json.Marshal(info)
150154
if err != nil {
155+
log.Errorf("[GetContainers] Marshal info json error: %s", err)
151156
continue
152157
}
153158

@@ -182,6 +187,9 @@ func (v *vibranium) SetNodeAvailable(ctx context.Context, opts *pb.NodeAvailable
182187
// streamed returned functions
183188
// caller must ensure that timeout will not be too short because these actions take a little time
184189
func (v *vibranium) BuildImage(opts *pb.BuildImageOptions, stream pb.CoreRPC_BuildImageServer) error {
190+
v.taskAdd("BuildImage", true)
191+
defer v.taskDone("BuildImage", true)
192+
185193
ch, err := v.cluster.BuildImage(opts.Repo, opts.Version, opts.Uid, opts.Artifact)
186194
if err != nil {
187195
return err
@@ -202,6 +210,7 @@ func (v *vibranium) BuildImage(opts *pb.BuildImageOptions, stream pb.CoreRPC_Bui
202210

203211
func (v *vibranium) CreateContainer(opts *pb.DeployOptions, stream pb.CoreRPC_CreateContainerServer) error {
204212
v.taskAdd("CreateContainer", true)
213+
defer v.taskDone("CreateContainer", true)
205214

206215
specs, err := types.LoadSpecs(opts.Specs)
207216
if err != nil {
@@ -216,7 +225,6 @@ func (v *vibranium) CreateContainer(opts *pb.DeployOptions, stream pb.CoreRPC_Cr
216225
for m := range ch {
217226
if err := stream.Send(toRPCCreateContainerMessage(m)); err != nil {
218227
go func() {
219-
defer v.taskDone("CreateContainer", true)
220228
for r := range ch {
221229
log.Infof("[CreateContainer] Unsent streamed message: %v", r)
222230
}
@@ -225,7 +233,6 @@ func (v *vibranium) CreateContainer(opts *pb.DeployOptions, stream pb.CoreRPC_Cr
225233
}
226234
}
227235

228-
v.taskDone("CreateContainer", true)
229236
return nil
230237
}
231238

@@ -249,8 +256,8 @@ func (v *vibranium) RunAndWait(stream pb.CoreRPC_RunAndWaitServer) error {
249256
defer stdinReader.Close()
250257
ch, err := v.cluster.RunAndWait(specs, toCoreDeployOptions(opts), stdinReader)
251258
if err != nil {
259+
// `ch` is nil now
252260
log.Errorf("[RunAndWait] Start run and wait failed %s", err)
253-
stream.Send(toRPCRunAndWaitMessage(<-ch))
254261
return err
255262
}
256263

@@ -293,6 +300,7 @@ func (v *vibranium) RunAndWait(stream pb.CoreRPC_RunAndWaitServer) error {
293300

294301
func (v *vibranium) UpgradeContainer(opts *pb.UpgradeOptions, stream pb.CoreRPC_UpgradeContainerServer) error {
295302
v.taskAdd("UpgradeContainer", true)
303+
defer v.taskDone("UpgradeContainer", true)
296304

297305
ids := []string{}
298306
for _, id := range opts.Ids {
@@ -307,7 +315,6 @@ func (v *vibranium) UpgradeContainer(opts *pb.UpgradeOptions, stream pb.CoreRPC_
307315
for m := range ch {
308316
if err := stream.Send(toRPCUpgradeContainerMessage(m)); err != nil {
309317
go func() {
310-
defer v.taskDone("UpgradeContainer", true)
311318
for r := range ch {
312319
log.Infof("[UpgradeContainer] Unsent streamed message: %v", r)
313320
}
@@ -316,12 +323,12 @@ func (v *vibranium) UpgradeContainer(opts *pb.UpgradeOptions, stream pb.CoreRPC_
316323
}
317324
}
318325

319-
v.taskDone("UpgradeContainer", true)
320326
return nil
321327
}
322328

323329
func (v *vibranium) RemoveContainer(cids *pb.ContainerIDs, stream pb.CoreRPC_RemoveContainerServer) error {
324330
v.taskAdd("RemoveContainer", true)
331+
defer v.taskDone("RemoveContainer", true)
325332

326333
ids := []string{}
327334
for _, id := range cids.Ids {
@@ -340,7 +347,6 @@ func (v *vibranium) RemoveContainer(cids *pb.ContainerIDs, stream pb.CoreRPC_Rem
340347
for m := range ch {
341348
if err := stream.Send(toRPCRemoveContainerMessage(m)); err != nil {
342349
go func() {
343-
defer v.taskDone("RemoveContainer", true)
344350
for r := range ch {
345351
log.Infof("[RemoveContainer] Unsent streamed message: %v", r)
346352
}
@@ -349,11 +355,13 @@ func (v *vibranium) RemoveContainer(cids *pb.ContainerIDs, stream pb.CoreRPC_Rem
349355
}
350356
}
351357

352-
v.taskDone("RemoveContainer", true)
353358
return nil
354359
}
355360

356361
func (v *vibranium) RemoveImage(opts *pb.RemoveImageOptions, stream pb.CoreRPC_RemoveImageServer) error {
362+
v.taskAdd("RemoveImage", true)
363+
defer v.taskDone("RemoveImage", true)
364+
357365
ch, err := v.cluster.RemoveImage(opts.Podname, opts.Nodename, opts.Images)
358366
if err != nil {
359367
return err

0 commit comments

Comments
 (0)