Skip to content

Commit 3e878d3

Browse files
make AddWorkload and DecrProcessing atomic (#431)
* make AddWorkload and DecrProcessing atomic * impl redis version of BatchCreateAndDecr * pass unittest
1 parent e222480 commit 3e878d3

27 files changed

+228
-178
lines changed

cluster/calcium/capacity_test.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ func TestCalculateCapacity(t *testing.T) {
3838
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
3939
// failed by wrong resource
4040
opts := &types.DeployOptions{
41+
Entrypoint: &types.Entrypoint{
42+
Name: "entry",
43+
},
4144
ResourceOpts: types.ResourceOptions{
4245
CPUBind: true,
4346
CPUQuotaRequest: 0,
@@ -66,7 +69,7 @@ func TestCalculateCapacity(t *testing.T) {
6669
sched.On("SelectMemoryNodes", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(scheduleInfos, 5, nil).Twice()
6770
sched.On("SelectStorageNodes", mock.Anything, mock.Anything, mock.Anything).Return(scheduleInfos, 5, nil).Twice()
6871
sched.On("SelectVolumeNodes", mock.Anything, mock.Anything, mock.Anything).Return(scheduleInfos, nil, 5, nil).Twice()
69-
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
72+
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
7073
r, err := c.CalculateCapacity(ctx, opts)
7174
assert.NoError(t, err)
7275
assert.Equal(t, r.Total, 5)

cluster/calcium/create.go

+5-12
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
5656
defer func() {
5757
cctx, cancel := context.WithTimeout(utils.InheritTracingInfo(ctx, context.Background()), c.config.GlobalTimeout)
5858
for nodename := range deployMap {
59-
if e := c.store.DeleteProcessing(cctx, opts, nodename); e != nil {
59+
if e := c.store.DeleteProcessing(cctx, opts.GetProcessing(nodename)); e != nil {
6060
logger.Errorf(ctx, "[Calcium.doCreateWorkloads] delete processing failed for %s: %+v", nodename, e)
6161
}
6262
}
@@ -87,7 +87,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
8787
plan.ApplyChangesOnNode(nodeMap[nodename], utils.Range(deploy)...)
8888
}
8989
nodes = append(nodes, nodeMap[nodename])
90-
if err = c.store.SaveProcessing(ctx, opts, nodename, deploy); err != nil {
90+
if err = c.store.CreateProcessing(ctx, opts.GetProcessing(nodename), deploy); err != nil {
9191
return errors.WithStack(err)
9292
}
9393
}
@@ -211,7 +211,7 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
211211

212212
createMsg.ResourceMeta = *r
213213
createOpts := c.doMakeWorkloadOptions(ctx, seq+idx, createMsg, opts, node)
214-
e = c.doDeployOneWorkload(ctx, node, opts, createMsg, createOpts, deploy-1-idx)
214+
e = c.doDeployOneWorkload(ctx, node, opts, createMsg, createOpts)
215215
}
216216
}(idx))
217217
}
@@ -245,7 +245,6 @@ func (c *Calcium) doDeployOneWorkload(
245245
opts *types.DeployOptions,
246246
msg *types.CreateWorkloadMessage,
247247
config *enginetypes.VirtualizationCreateOptions,
248-
processingCount int,
249248
) (err error) {
250249
workload := &types.Workload{
251250
ResourceMeta: types.ResourceMeta{
@@ -352,14 +351,8 @@ func (c *Calcium) doDeployOneWorkload(
352351
// reset workload.hook
353352
workload.Hook = opts.Entrypoint.Hook
354353

355-
// update processing
356-
if processingCount >= 0 {
357-
if err := c.store.UpdateProcessing(ctx, opts, node.Name, processingCount); err != nil {
358-
return errors.WithStack(err)
359-
}
360-
}
361-
362-
if err := c.store.AddWorkload(ctx, workload); err != nil {
354+
// avoid be interrupted by MakeDeployStatus
355+
if err := c.store.AddWorkload(ctx, workload, opts.GetProcessing(node.Name)); err != nil {
363356
return errors.WithStack(err)
364357
}
365358
log.Infof(ctx, "[doDeployOneWorkload] workload created and saved: %s", workload.ID)

cluster/calcium/create_test.go

+8-12
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ func TestCreateWorkload(t *testing.T) {
3434
}
3535
store := c.store.(*storemocks.Store)
3636

37-
store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
38-
store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
37+
store.On("CreateProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
3938
store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
4039

4140
// failed by validating
@@ -110,8 +109,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
110109
defer mwal.AssertExpectations(t)
111110
var walCommitted bool
112111

113-
store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
114-
store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
112+
store.On("CreateProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
115113
store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
116114

117115
// doAllocResource fails: MakeDeployStatus
@@ -147,7 +145,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
147145
}
148146
return scheduleInfos
149147
}, len(nodes), nil)
150-
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(
148+
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
151149
errors.Wrap(context.DeadlineExceeded, "MakeDeployStatus"),
152150
).Once()
153151

@@ -163,7 +161,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
163161
assert.False(t, walCommitted)
164162

165163
// commit resource changes fails: UpdateNodes
166-
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil)
164+
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
167165
old := strategy.Plans[strategy.Auto]
168166
strategy.Plans[strategy.Auto] = func(ctx context.Context, sis []strategy.Info, need, total, _ int) (map[string]int, error) {
169167
deployInfos := make(map[string]int)
@@ -210,7 +208,6 @@ func TestCreateWorkloadTxn(t *testing.T) {
210208
engine := node1.Engine.(*enginemocks.API)
211209
engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImageLocalDigest")).Twice()
212210
engine.On("ImagePull", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImagePull")).Twice()
213-
store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
214211
store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
215212
walCommitted = false
216213
ch, err = c.CreateWorkload(ctx, opts)
@@ -252,7 +249,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
252249
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationCreated{ID: "c1"}, nil)
253250
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
254251
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
255-
store.On("AddWorkload", mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "AddWorkload")).Twice()
252+
store.On("AddWorkload", mock.Anything, mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "AddWorkload")).Twice()
256253
commit := wal.Commit(func() error {
257254
walCommitted = true
258255
return nil
@@ -277,8 +274,8 @@ func TestCreateWorkloadTxn(t *testing.T) {
277274
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationCreated{ID: "c1"}, nil)
278275
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
279276
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
280-
store.On("AddWorkload", mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "AddWorkload2")).Once()
281-
store.On("AddWorkload", mock.Anything, mock.Anything).Return(nil).Once()
277+
store.On("AddWorkload", mock.Anything, mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "AddWorkload2")).Once()
278+
store.On("AddWorkload", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
282279
walCommitted = false
283280
ch, err = c.CreateWorkload(ctx, opts)
284281
assert.Nil(t, err)
@@ -325,8 +322,7 @@ func newCreateWorkloadCluster(t *testing.T) (*Calcium, []*types.Node) {
325322
nodes := []*types.Node{node1, node2}
326323

327324
store := c.store.(*storemocks.Store)
328-
store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
329-
store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
325+
store.On("CreateProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
330326
store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
331327

332328
// doAllocResource fails: MakeDeployStatus

cluster/calcium/lambda_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
5151
}
5252

5353
mstore := c.store.(*storemocks.Store)
54-
mstore.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("err")).Once()
54+
mstore.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("err")).Once()
5555

5656
_, ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
5757
assert.NoError(err)
@@ -207,7 +207,7 @@ func newLambdaCluster(t *testing.T) (*Calcium, []*types.Node) {
207207

208208
node1, node2 := nodes[0], nodes[1]
209209

210-
store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
210+
store.On("CreateProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
211211
store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
212212
store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
213213

@@ -244,7 +244,7 @@ func newLambdaCluster(t *testing.T) (*Calcium, []*types.Node) {
244244
return scheduleInfos
245245
}, len(nodes), nil)
246246

247-
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil)
247+
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
248248
old := strategy.Plans[strategy.Auto]
249249
strategy.Plans[strategy.Auto] = func(ctx context.Context, sis []strategy.Info, need, total, _ int) (map[string]int, error) {
250250
deployInfos := make(map[string]int)
@@ -282,7 +282,7 @@ func newLambdaCluster(t *testing.T) (*Calcium, []*types.Node) {
282282
engine.On("VirtualizationRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
283283
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
284284
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
285-
store.On("AddWorkload", mock.Anything, mock.Anything).Return(nil)
285+
store.On("AddWorkload", mock.Anything, mock.Anything, mock.Anything).Return(nil)
286286

287287
return c, nodes
288288
}

cluster/calcium/remove.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (c *Calcium) doRemoveWorkload(ctx context.Context, workload *types.Workload
9797
if failedByCond {
9898
return nil
9999
}
100-
return errors.WithStack(c.store.AddWorkload(ctx, workload))
100+
return errors.WithStack(c.store.AddWorkload(ctx, workload, nil))
101101
},
102102
c.config.GlobalTimeout,
103103
)

cluster/calcium/replace.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func (c *Calcium) doReplaceWorkload(
170170
// if
171171
func(ctx context.Context) error {
172172
vco := c.doMakeReplaceWorkloadOptions(ctx, index, createMessage, &opts.DeployOptions, node, workload.ID)
173-
return c.doDeployOneWorkload(ctx, node, &opts.DeployOptions, createMessage, vco, -1)
173+
return c.doDeployOneWorkload(ctx, node, &opts.DeployOptions, createMessage, vco)
174174
},
175175
// then
176176
func(ctx context.Context) (err error) {

cluster/calcium/replace_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ func TestReplaceWorkload(t *testing.T) {
187187
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
188188
engine.On("VirtualizationCopyTo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
189189
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{User: "test"}, nil)
190-
store.On("AddWorkload", mock.Anything, mock.Anything).Return(nil)
190+
store.On("AddWorkload", mock.Anything, mock.Anything, mock.Anything).Return(nil)
191191
// failed by remove workload
192192
store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(types.ErrNoETCD).Once()
193193
ch, err = c.ReplaceWorkload(ctx, opts)

cluster/calcium/resource.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ func (c *Calcium) doAllocResource(ctx context.Context, nodeMap map[string]*types
171171
if err != nil {
172172
return nil, nil, err
173173
}
174-
if err := c.store.MakeDeployStatus(ctx, opts, strategyInfos); err != nil {
174+
if err = c.store.MakeDeployStatus(ctx, opts.Name, opts.Entrypoint.Name, strategyInfos); err != nil {
175175
return nil, nil, errors.WithStack(err)
176176
}
177177
deployMap, err := strategy.Deploy(ctx, opts, strategyInfos, total)

cluster/calcium/resource_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,9 @@ func TestAllocResource(t *testing.T) {
167167
podname := "testpod"
168168
opts := &types.DeployOptions{
169169
Podname: podname,
170+
Entrypoint: &types.Entrypoint{
171+
Name: "entry",
172+
},
170173
}
171174
config := types.Config{
172175
LockTimeout: time.Duration(time.Second * 3),
@@ -222,7 +225,7 @@ func TestAllocResource(t *testing.T) {
222225

223226
sched.On("SelectMemoryNodes", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(scheduleInfos, total, nil)
224227
testAllocFailedAsMakeDeployStatusError(t, c, opts, nodeMap)
225-
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil)
228+
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
226229

227230
testAllocFailedAsWrongDeployMethod(t, c, opts, nodeMap)
228231
testAllocFailedAsCommonDivisionError(t, c, opts, nodeMap)
@@ -250,7 +253,7 @@ func TestAllocResource(t *testing.T) {
250253

251254
func testAllocFailedAsMakeDeployStatusError(t *testing.T, c *Calcium, opts *types.DeployOptions, nodeMap map[string]*types.Node) {
252255
store := c.store.(*storemocks.Store)
253-
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(types.ErrNoETCD).Once()
256+
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(types.ErrNoETCD).Once()
254257
_, _, err := c.doAllocResource(context.Background(), nodeMap, opts)
255258
assert.Error(t, err)
256259
}

store/etcdv3/deploy.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,24 @@ import (
77

88
"github.com/projecteru2/core/log"
99
"github.com/projecteru2/core/strategy"
10-
"github.com/projecteru2/core/types"
1110
"go.etcd.io/etcd/v3/clientv3"
1211
)
1312

1413
// MakeDeployStatus get deploy status from store
15-
func (m *Mercury) MakeDeployStatus(ctx context.Context, opts *types.DeployOptions, strategyInfos []strategy.Info) error {
14+
func (m *Mercury) MakeDeployStatus(ctx context.Context, appname, entryname string, strategyInfos []strategy.Info) error {
1615
// 手动加 / 防止不精确
17-
key := filepath.Join(workloadDeployPrefix, opts.Name, opts.Entrypoint.Name) + "/"
16+
key := filepath.Join(workloadDeployPrefix, appname, entryname) + "/"
1817
resp, err := m.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithKeysOnly())
1918
if err != nil {
2019
return err
2120
}
2221
if resp.Count == 0 {
23-
log.Warnf(ctx, "[MakeDeployStatus] Deploy status not found %s.%s", opts.Name, opts.Entrypoint.Name)
22+
log.Warnf(ctx, "[MakeDeployStatus] Deploy status not found %s.%s", appname, entryname)
2423
}
2524
if err = m.doGetDeployStatus(ctx, resp, strategyInfos); err != nil {
2625
return err
2726
}
28-
return m.doLoadProcessing(ctx, opts, strategyInfos)
27+
return m.doLoadProcessing(ctx, appname, entryname, strategyInfos)
2928
}
3029

3130
func (m *Mercury) doGetDeployStatus(_ context.Context, resp *clientv3.GetResponse, strategyInfos []strategy.Info) error {

store/etcdv3/deploy_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func TestDeploy(t *testing.T) {
2525
}
2626

2727
// no workload deployed
28-
err := m.MakeDeployStatus(ctx, opts, sis)
28+
err := m.MakeDeployStatus(ctx, opts.Name, opts.Entrypoint.Name, sis)
2929
assert.NoError(t, err)
3030
assert.Equal(t, len(sis), 1)
3131
// have workloads
@@ -35,7 +35,7 @@ func TestDeploy(t *testing.T) {
3535
key = filepath.Join(workloadDeployPrefix, opts.Name, opts.Entrypoint.Name, "node", "id2")
3636
_, err = m.Put(ctx, key, "")
3737
assert.NoError(t, err)
38-
err = m.MakeDeployStatus(ctx, opts, sis)
38+
err = m.MakeDeployStatus(ctx, opts.Name, opts.Entrypoint.Name, sis)
3939
assert.NoError(t, err)
4040
assert.Equal(t, len(sis), 1)
4141
assert.Equal(t, sis[0].Nodename, "node")

store/etcdv3/meta/etcd.go

+42
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/tls"
66
"fmt"
7+
"strconv"
78
"sync"
89
"time"
910

@@ -389,3 +390,44 @@ func (e *ETCD) doBatchOp(ctx context.Context, conds []clientv3.Cmp, ops, failOps
389390
}
390391
return resp, nil
391392
}
393+
394+
// BatchCreateAndDecr used to decr processing and add workload
395+
func (e *ETCD) BatchCreateAndDecr(ctx context.Context, data map[string]string, decrKey string) (err error) {
396+
resp, err := e.Get(ctx, decrKey)
397+
if err != nil {
398+
return
399+
}
400+
401+
decrKv := resp.Kvs[0]
402+
putOps := []clientv3.Op{}
403+
for key, value := range data {
404+
putOps = append(putOps, clientv3.OpPut(key, value))
405+
}
406+
407+
for {
408+
cnt, err := strconv.Atoi(string(decrKv.Value))
409+
if err != nil {
410+
return err
411+
}
412+
413+
conds := []clientv3.Cmp{
414+
clientv3.Compare(clientv3.Value(decrKey), "=", string(decrKv.Value)),
415+
}
416+
ops := append(putOps,
417+
clientv3.OpPut(decrKey, strconv.Itoa(cnt-1)),
418+
)
419+
failOps := []clientv3.Op{
420+
clientv3.OpGet(decrKey),
421+
}
422+
txnResp, err := e.doBatchOp(ctx, conds, ops, failOps)
423+
if err != nil {
424+
return err
425+
}
426+
if txnResp.Succeeded {
427+
break
428+
}
429+
decrKv = txnResp.Responses[0].GetResponseRange().Kvs[0]
430+
}
431+
432+
return nil
433+
}

store/etcdv3/meta/meta.go

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ type KV interface {
2525
Update(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error)
2626
Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error)
2727

28+
BatchCreateAndDecr(ctx context.Context, data map[string]string, decrKey string) error
29+
2830
BatchCreate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error)
2931
BatchUpdate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error)
3032
BatchDelete(ctx context.Context, keys []string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error)

store/etcdv3/processing.go

+9-14
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,25 @@ import (
1515
"go.etcd.io/etcd/v3/clientv3"
1616
)
1717

18-
// SaveProcessing save processing status in etcd
19-
func (m *Mercury) SaveProcessing(ctx context.Context, opts *types.DeployOptions, nodename string, count int) error {
20-
processingKey := filepath.Join(workloadProcessingPrefix, opts.Name, opts.Entrypoint.Name, nodename, opts.ProcessIdent)
21-
_, err := m.Create(ctx, processingKey, fmt.Sprintf("%d", count))
22-
return err
18+
func (m *Mercury) getProcessingKey(processing *types.Processing) string {
19+
return filepath.Join(workloadProcessingPrefix, processing.Appname, processing.Entryname, processing.Nodename, processing.Ident)
2320
}
2421

25-
// UpdateProcessing update processing status in etcd
26-
func (m *Mercury) UpdateProcessing(ctx context.Context, opts *types.DeployOptions, nodename string, count int) error {
27-
processingKey := filepath.Join(workloadProcessingPrefix, opts.Name, opts.Entrypoint.Name, nodename, opts.ProcessIdent)
28-
_, err := m.Update(ctx, processingKey, fmt.Sprintf("%d", count))
22+
// CreateProcessing save processing status in etcd
23+
func (m *Mercury) CreateProcessing(ctx context.Context, processing *types.Processing, count int) error {
24+
_, err := m.Create(ctx, m.getProcessingKey(processing), fmt.Sprintf("%d", count))
2925
return err
3026
}
3127

3228
// DeleteProcessing delete processing status in etcd
33-
func (m *Mercury) DeleteProcessing(ctx context.Context, opts *types.DeployOptions, nodename string) error {
34-
processingKey := filepath.Join(workloadProcessingPrefix, opts.Name, opts.Entrypoint.Name, nodename, opts.ProcessIdent)
35-
_, err := m.Delete(ctx, processingKey)
29+
func (m *Mercury) DeleteProcessing(ctx context.Context, processing *types.Processing) error {
30+
_, err := m.Delete(ctx, m.getProcessingKey(processing))
3631
return err
3732
}
3833

39-
func (m *Mercury) doLoadProcessing(ctx context.Context, opts *types.DeployOptions, strategyInfos []strategy.Info) error {
34+
func (m *Mercury) doLoadProcessing(ctx context.Context, appname, entryname string, strategyInfos []strategy.Info) error {
4035
// 显式的加 / 保证 prefix 一致性
41-
processingKey := filepath.Join(workloadProcessingPrefix, opts.Name, opts.Entrypoint.Name) + "/"
36+
processingKey := filepath.Join(workloadProcessingPrefix, appname, entryname) + "/"
4237
resp, err := m.Get(ctx, processingKey, clientv3.WithPrefix())
4338
if err != nil {
4439
return err

0 commit comments

Comments
 (0)