Skip to content

Commit 1a8fe58

Browse files
committed
[SPVR-80] add workload metadata before workload start
1 parent 912e9f1 commit 1a8fe58

File tree

10 files changed

+39
-20
lines changed

10 files changed

+39
-20
lines changed

cluster/calcium/create.go

+29-13
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,17 @@ func (c *Calcium) doDeployOneWorkload(
299299
},
300300

301301
func(ctx context.Context) (err error) {
302+
// avoid be interrupted by MakeDeployStatus
303+
processing := opts.GetProcessing(node.Name)
304+
if !decrProcessing {
305+
processing = nil
306+
}
307+
// add workload metadata first
308+
if err := c.store.AddWorkload(ctx, workload, processing); err != nil {
309+
return errors.WithStack(err)
310+
}
311+
log.Infof(ctx, "[doDeployOneWorkload] workload %s metadata created", workload.ID)
312+
302313
// Copy data to workload
303314
if len(opts.Files) > 0 {
304315
for _, file := range opts.Files {
@@ -323,12 +334,15 @@ func (c *Calcium) doDeployOneWorkload(
323334
}
324335
}
325336

326-
// start first
337+
// start workload
327338
msg.Hook, err = c.doStartWorkload(ctx, workload, opts.IgnoreHook)
328339
if err != nil {
329340
return err
330341
}
331342

343+
// reset workload.hook
344+
workload.Hook = opts.Entrypoint.Hook
345+
332346
// inspect real meta
333347
var workloadInfo *enginetypes.VirtualizationInfo
334348
workloadInfo, err = workload.Inspect(ctx) // 补充静态元数据
@@ -340,22 +354,18 @@ func (c *Calcium) doDeployOneWorkload(
340354
if workloadInfo.Networks != nil {
341355
msg.Publish = utils.MakePublishInfo(workloadInfo.Networks, opts.Entrypoint.Publish)
342356
}
343-
// reset users
357+
358+
// if workload metadata changed, then update
344359
if workloadInfo.User != workload.User {
360+
// reset users
345361
workload.User = workloadInfo.User
346-
}
347-
// reset workload.hook
348-
workload.Hook = opts.Entrypoint.Hook
349362

350-
// avoid be interrupted by MakeDeployStatus
351-
processing := opts.GetProcessing(node.Name)
352-
if !decrProcessing {
353-
processing = nil
354-
}
355-
if err := c.store.AddWorkload(ctx, workload, processing); err != nil {
356-
return errors.WithStack(err)
363+
if err := c.store.UpdateWorkload(ctx, workload); err != nil {
364+
return errors.WithStack(err)
365+
}
366+
log.Infof(ctx, "[doDeployOneWorkload] workload %s metadata updated", workload.ID)
357367
}
358-
log.Infof(ctx, "[doDeployOneWorkload] workload created and saved: %s", workload.ID)
368+
359369
msg.WorkloadID = workload.ID
360370
msg.WorkloadName = workload.Name
361371
msg.Podname = workload.Podname
@@ -365,9 +375,15 @@ func (c *Calcium) doDeployOneWorkload(
365375

366376
// remove workload
367377
func(ctx context.Context, _ bool) error {
378+
log.Errorf(ctx, "[doDeployOneWorkload] failed to deploy workload %s, rollback", workload.ID)
368379
if workload.ID == "" {
369380
return nil
370381
}
382+
383+
if err := c.store.RemoveWorkload(ctx, workload); err != nil {
384+
log.Errorf(ctx, "[doDeployOneWorkload] failed to remove workload %s")
385+
}
386+
371387
return workload.Remove(ctx, true)
372388
},
373389
c.config.GlobalTimeout,

cluster/calcium/create_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
149149
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
150150
errors.Wrap(context.DeadlineExceeded, "MakeDeployStatus"),
151151
).Once()
152+
store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(nil)
152153

153154
ch, err := c.CreateWorkload(ctx, opts)
154155
assert.Nil(t, err)

engine/docker/helper_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestWithDumpFiles(t *testing.T) {
2929
}
3030
fp := []string{}
3131
for target, content := range data {
32-
withTarfileDump(context.TODO(), target, bytes.NewBuffer(content), func(target, tarfile string) error {
32+
withTarfileDump(context.TODO(), target, content, 0, 0, 0, func(target, tarfile string) error {
3333
assert.True(t, strings.HasPrefix(target, "/tmp/test"))
3434
fp = append(fp, tarfile)
3535
_, err := os.Stat(tarfile)

rpc/rpc.go

+1
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,7 @@ func (v *Vibranium) CreateWorkload(opts *pb.DeployOptions, stream pb.CoreRPC_Cre
615615
return grpcstatus.Error(CreateWorkload, err.Error())
616616
}
617617
for m := range ch {
618+
log.Debugf(ctx, "[CreateWorkload] create workload message: %+v", m)
618619
if err = stream.Send(toRPCCreateWorkloadMessage(m)); err != nil {
619620
v.logUnsentMessages(ctx, "CreateWorkload", err, m)
620621
}

store/etcdv3/meta/etcd.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ func (e *ETCD) bindStatusWithTTL(ctx context.Context, entityKey, statusKey, stat
277277
// There isn't the entity kv pair.
278278
if !entityTxn.Succeeded {
279279
e.revokeLease(ctx, leaseID)
280-
return nil
280+
return types.ErrEntityNotExists
281281
}
282282

283283
// There isn't a status bound to the entity.

store/etcdv3/meta/etcd_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func TestBindStatusButEntityTxnUnsuccessful(t *testing.T) {
102102

103103
etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil).Once()
104104
etcd.On("Txn", mock.Anything).Return(txn).Once()
105-
require.Equal(t, nil, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
105+
require.Equal(t, types.ErrEntityNotExists, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
106106
}
107107

108108
func TestBindStatusButStatusTxnUnsuccessful(t *testing.T) {

store/etcdv3/workload_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func TestSetWorkloadStatus(t *testing.T) {
153153
workload.StatusMeta.Nodename = "n1"
154154
// no workload, err nil
155155
err = m.SetWorkloadStatus(ctx, workload.StatusMeta, 10)
156-
assert.NoError(t, err)
156+
assert.Equal(t, err, types.ErrEntityNotExists)
157157
assert.NoError(t, m.AddWorkload(ctx, workload, nil))
158158
// no status key, put succ, err nil
159159
err = m.SetWorkloadStatus(ctx, workload.StatusMeta, 10)

store/redis/rediaron.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -272,10 +272,10 @@ func (r *Rediaron) BindStatus(ctx context.Context, entityKey, statusKey, statusV
272272
if err != nil {
273273
return err
274274
}
275-
// doesn't exist, returns nil, does nothing
275+
// doesn't exist, returns error
276276
// to behave just like etcd
277277
if count != 1 {
278-
return nil
278+
return types.ErrEntityNotExists
279279
}
280280

281281
_, err = r.cli.Set(ctx, statusKey, statusValue, time.Duration(ttl)*time.Second).Result()

store/redis/workload_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (s *RediaronTestSuite) TestSetWorkloadStatus() {
157157
workload.StatusMeta.Nodename = "n1"
158158
// no workload, err nil
159159
err = m.SetWorkloadStatus(ctx, workload.StatusMeta, 10)
160-
s.NoError(err)
160+
s.ErrorIs(err, types.ErrEntityNotExists)
161161
s.NoError(m.AddWorkload(ctx, workload, nil))
162162
// no status key, put succ, err nil
163163
err = m.SetWorkloadStatus(ctx, workload.StatusMeta, 10)

types/errors.go

+1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ var (
8585

8686
ErrNodeNotExists = errors.New("node not exists")
8787
ErrWorkloadNotExists = errors.New("workload not exists")
88+
ErrEntityNotExists = errors.New("entity not exists")
8889

8990
ErrUnregisteredWALEventType = errors.New("unregistered WAL event type")
9091
ErrInvalidWALBucket = errors.New("invalid WAL bucket")

0 commit comments

Comments
 (0)