Skip to content

Commit 43d7381

Browse files
authored
refactor wal stage 1 (#561)
* refactor wal2 * remove useless step in remove * minor refactor createworkloadhandler
1 parent ae9e4e5 commit 43d7381

33 files changed

+1204
-1285
lines changed

3rdmocks/ServerStream.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ mock: deps
3636
mockery --dir store --output store/mocks --name Store
3737
mockery --dir engine --output engine/mocks --name API
3838
mockery --dir cluster --output cluster/mocks --name Cluster
39+
mockery --dir wal --output wal/mocks --name WAL
3940
mockery --dir lock --output lock/mocks --name DistributedLock
4041
mockery --dir store/etcdv3/meta --output store/etcdv3/meta/mocks --all
4142
mockery --dir vendor/go.etcd.io/etcd/client/v3 --output store/etcdv3/meta/mocks --name Txn

cluster/calcium/calcium.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
6868
watcher := helium.New(config.GRPCConfig, store)
6969

7070
cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
71-
cal.wal, err = newCalciumWAL(cal)
71+
cal.wal, err = newWAL(config, cal)
7272
cal.identifier = config.Identifier()
7373

7474
return cal, logger.Err(nil, errors.WithStack(err)) //nolint

cluster/calcium/lambda.go

+1-6
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/projecteru2/core/strategy"
1414
"github.com/projecteru2/core/types"
1515
"github.com/projecteru2/core/utils"
16-
"github.com/projecteru2/core/wal"
1716

1817
"github.com/pkg/errors"
1918
)
@@ -68,7 +67,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
6867
}
6968
}
7069

71-
commit, err := c.walCreateLambda(message)
70+
commit, err := c.wal.Log(eventCreateLambda, message.WorkloadID)
7271
if err != nil {
7372
return &types.AttachWorkloadMessage{
7473
WorkloadID: message.WorkloadID,
@@ -197,7 +196,3 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
197196

198197
return workloadIDs, runMsgCh, nil
199198
}
200-
201-
func (c *Calcium) walCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
202-
return c.wal.logCreateLambda(opts)
203-
}

cluster/calcium/remove.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import (
1414

1515
// RemoveWorkload remove workloads
1616
// returns a channel that contains removing responses
17-
func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error) {
18-
logger := log.WithField("Calcium", "RemoveWorkload").WithField("ids", ids).WithField("force", force).WithField("step", step)
17+
func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool) (chan *types.RemoveWorkloadMessage, error) {
18+
logger := log.WithField("Calcium", "RemoveWorkload").WithField("ids", ids).WithField("force", force)
1919

2020
nodeWorkloadGroup, err := c.groupWorkloadsByNode(ctx, ids)
2121
if err != nil {
@@ -45,7 +45,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
4545
},
4646
// then
4747
func(ctx context.Context) (err error) {
48-
if err = c.doRemoveWorkload(ctx, workload, force); err != nil {
48+
if err = c.doRemoveWorkload(ctx, workload, force); err == nil {
4949
log.Infof(ctx, "[RemoveWorkload] Workload %s removed", workload.ID)
5050
}
5151
return err
@@ -104,12 +104,13 @@ func (c *Calcium) doRemoveWorkload(ctx context.Context, workload *types.Workload
104104

105105
// 同步地删除容器, 在某些需要等待的场合异常有用!
106106
func (c *Calcium) doRemoveWorkloadSync(ctx context.Context, ids []string) error {
107-
ch, err := c.RemoveWorkload(ctx, ids, true, 1)
107+
ch, err := c.RemoveWorkload(ctx, ids, true)
108108
if err != nil {
109109
return err
110110
}
111111

112112
for m := range ch {
113+
// TODO deal with failed
113114
log.Debugf(ctx, "[doRemoveWorkloadSync] Removed %s", m.WorkloadID)
114115
}
115116
return nil

cluster/calcium/remove_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func TestRemoveWorkload(t *testing.T) {
2424

2525
// failed by GetWorkload
2626
store.On("GetWorkloads", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
27-
ch, err := c.RemoveWorkload(ctx, []string{"xx"}, false, 0)
27+
ch, err := c.RemoveWorkload(ctx, []string{"xx"}, false)
2828
assert.True(t, errors.Is(err, types.ErrNoETCD))
2929
store.AssertExpectations(t)
3030

@@ -36,7 +36,7 @@ func TestRemoveWorkload(t *testing.T) {
3636
}
3737
store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil)
3838
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
39-
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false, 0)
39+
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false)
4040
assert.NoError(t, err)
4141
for r := range ch {
4242
assert.False(t, r.Success)
@@ -54,7 +54,7 @@ func TestRemoveWorkload(t *testing.T) {
5454
store.On("GetNode", mock.Anything, mock.Anything).Return(node, nil)
5555
store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(types.ErrNoETCD).Twice()
5656
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
57-
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false, 0)
57+
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false)
5858
assert.NoError(t, err)
5959
for r := range ch {
6060
assert.False(t, r.Success)
@@ -69,7 +69,7 @@ func TestRemoveWorkload(t *testing.T) {
6969
store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil)
7070
store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(nil)
7171
store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
72-
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false, 0)
72+
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false)
7373
assert.NoError(t, err)
7474
for r := range ch {
7575
assert.True(t, r.Success)

cluster/calcium/wal.go

+95-112
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,21 @@ const (
2222
// WAL for calcium.
2323
type WAL struct {
2424
wal.WAL
25-
config types.Config
2625
calcium *Calcium
2726
}
2827

29-
func newCalciumWAL(cal *Calcium) (*WAL, error) {
30-
w := &WAL{
31-
WAL: wal.NewHydro(),
32-
config: cal.config,
33-
calcium: cal,
28+
func newWAL(config types.Config, calcium *Calcium) (*WAL, error) {
29+
hydro, err := wal.NewHydro(config.WALFile, config.WALOpenTimeout)
30+
if err != nil {
31+
return nil, err
3432
}
3533

36-
if err := w.WAL.Open(w.config.WALFile, w.config.WALOpenTimeout); err != nil {
37-
return nil, err
34+
w := &WAL{
35+
WAL: hydro,
36+
calcium: calcium,
3837
}
3938

4039
w.registerHandlers()
41-
4240
return w, nil
4341
}
4442

@@ -49,105 +47,22 @@ func (w *WAL) registerHandlers() {
4947
w.Register(newProcessingCreatedHandler(w.calcium))
5048
}
5149

52-
func (w *WAL) logCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
53-
return w.Log(eventCreateLambda, opts.WorkloadID)
54-
}
55-
56-
// CreateWorkloadHandler indicates event handler for creating workload.
57-
type CreateWorkloadHandler struct {
58-
event string
59-
calcium *Calcium
60-
}
61-
62-
func newCreateWorkloadHandler(cal *Calcium) *CreateWorkloadHandler {
63-
return &CreateWorkloadHandler{
64-
event: eventWorkloadCreated,
65-
calcium: cal,
66-
}
67-
}
68-
69-
// Event .
70-
func (h *CreateWorkloadHandler) Event() string {
71-
return h.event
72-
}
73-
74-
// Check .
75-
func (h *CreateWorkloadHandler) Check(ctx context.Context, raw interface{}) (handle bool, err error) {
76-
_, ok := raw.(*types.Workload)
77-
if !ok {
78-
return false, types.NewDetailedErr(types.ErrInvalidType, raw)
79-
}
80-
return true, nil
81-
}
82-
83-
// Encode .
84-
func (h *CreateWorkloadHandler) Encode(raw interface{}) ([]byte, error) {
85-
wrk, ok := raw.(*types.Workload)
86-
if !ok {
87-
return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
88-
}
89-
return json.Marshal(wrk)
90-
}
91-
92-
// Decode .
93-
func (h *CreateWorkloadHandler) Decode(bs []byte) (interface{}, error) {
94-
wrk := &types.Workload{}
95-
err := json.Unmarshal(bs, wrk)
96-
return wrk, err
97-
}
98-
99-
// Handle will remove instance, remove meta, restore resource
100-
func (h *CreateWorkloadHandler) Handle(ctx context.Context, raw interface{}) (err error) {
101-
wrk, _ := raw.(*types.Workload)
102-
logger := log.WithField("WAL.Handle", "CreateWorkload").WithField("ID", wrk.ID).WithField("nodename", wrk.Nodename)
103-
104-
ctx, cancel := getReplayContext(ctx)
105-
defer cancel()
106-
107-
if _, err = h.calcium.GetWorkload(ctx, wrk.ID); err == nil {
108-
// workload meta exists
109-
ch, err := h.calcium.RemoveWorkload(ctx, []string{wrk.ID}, true, 0)
110-
if err != nil {
111-
return logger.Err(ctx, err)
112-
}
113-
for msg := range ch {
114-
if !msg.Success {
115-
logger.Errorf(ctx, "failed to remove workload")
116-
}
117-
}
118-
logger.Infof(ctx, "workload with meta removed")
119-
return nil
120-
}
121-
122-
// workload meta doesn't exist
123-
node, err := h.calcium.GetNode(ctx, wrk.Nodename)
124-
if err != nil {
125-
return logger.Err(ctx, err)
126-
}
127-
if err = node.Engine.VirtualizationRemove(ctx, wrk.ID, true, true); err != nil && !errors.Is(err, types.ErrWorkloadNotExists) {
128-
return logger.Err(ctx, err)
129-
}
130-
131-
logger.Infof(ctx, "workload removed")
132-
return nil
133-
}
134-
13550
// CreateLambdaHandler indicates event handler for creating lambda.
13651
type CreateLambdaHandler struct {
137-
event string
52+
typ string
13853
calcium *Calcium
13954
}
14055

141-
func newCreateLambdaHandler(cal *Calcium) *CreateLambdaHandler {
56+
func newCreateLambdaHandler(calcium *Calcium) *CreateLambdaHandler {
14257
return &CreateLambdaHandler{
143-
event: eventCreateLambda,
144-
calcium: cal,
58+
typ: eventCreateLambda,
59+
calcium: calcium,
14560
}
14661
}
14762

14863
// Event .
149-
func (h *CreateLambdaHandler) Event() string {
150-
return h.event
64+
func (h *CreateLambdaHandler) Typ() string {
65+
return h.typ
15166
}
15267

15368
// Check .
@@ -202,26 +117,90 @@ func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error
202117
return nil
203118
}
204119

205-
func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) {
206-
return context.WithTimeout(ctx, time.Second*32)
120+
// CreateWorkloadHandler indicates event handler for creating workload.
121+
type CreateWorkloadHandler struct {
122+
typ string
123+
calcium *Calcium
124+
}
125+
126+
func newCreateWorkloadHandler(calcium *Calcium) *CreateWorkloadHandler {
127+
return &CreateWorkloadHandler{
128+
typ: eventWorkloadCreated,
129+
calcium: calcium,
130+
}
131+
}
132+
133+
// Event .
134+
func (h *CreateWorkloadHandler) Typ() string {
135+
return h.typ
136+
}
137+
138+
// Check .
139+
func (h *CreateWorkloadHandler) Check(ctx context.Context, raw interface{}) (handle bool, err error) {
140+
_, ok := raw.(*types.Workload)
141+
if !ok {
142+
return false, types.NewDetailedErr(types.ErrInvalidType, raw)
143+
}
144+
return true, nil
145+
}
146+
147+
// Encode .
148+
func (h *CreateWorkloadHandler) Encode(raw interface{}) ([]byte, error) {
149+
wrk, ok := raw.(*types.Workload)
150+
if !ok {
151+
return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
152+
}
153+
return json.Marshal(wrk)
154+
}
155+
156+
// Decode .
157+
func (h *CreateWorkloadHandler) Decode(bs []byte) (interface{}, error) {
158+
wrk := &types.Workload{}
159+
err := json.Unmarshal(bs, wrk)
160+
return wrk, err
161+
}
162+
163+
// Handle will remove instance, remove meta, restore resource
164+
func (h *CreateWorkloadHandler) Handle(ctx context.Context, raw interface{}) (err error) {
165+
wrk, _ := raw.(*types.Workload)
166+
logger := log.WithField("WAL.Handle", "CreateWorkload").WithField("ID", wrk.ID).WithField("nodename", wrk.Nodename)
167+
168+
ctx, cancel := getReplayContext(ctx)
169+
defer cancel()
170+
171+
if _, err = h.calcium.GetWorkload(ctx, wrk.ID); err == nil {
172+
return h.calcium.doRemoveWorkloadSync(ctx, []string{wrk.ID})
173+
}
174+
175+
// workload meta doesn't exist
176+
node, err := h.calcium.GetNode(ctx, wrk.Nodename)
177+
if err != nil {
178+
return logger.Err(ctx, err)
179+
}
180+
if err = node.Engine.VirtualizationRemove(ctx, wrk.ID, true, true); err != nil && !errors.Is(err, types.ErrWorkloadNotExists) {
181+
return logger.Err(ctx, err)
182+
}
183+
184+
logger.Infof(ctx, "workload removed")
185+
return nil
207186
}
208187

209188
// WorkloadResourceAllocatedHandler .
210189
type WorkloadResourceAllocatedHandler struct {
211-
event string
190+
typ string
212191
calcium *Calcium
213192
}
214193

215-
func newWorkloadResourceAllocatedHandler(cal *Calcium) *WorkloadResourceAllocatedHandler {
194+
func newWorkloadResourceAllocatedHandler(calcium *Calcium) *WorkloadResourceAllocatedHandler {
216195
return &WorkloadResourceAllocatedHandler{
217-
event: eventWorkloadResourceAllocated,
218-
calcium: cal,
196+
typ: eventWorkloadResourceAllocated,
197+
calcium: calcium,
219198
}
220199
}
221200

222201
// Event .
223-
func (h *WorkloadResourceAllocatedHandler) Event() string {
224-
return h.event
202+
func (h *WorkloadResourceAllocatedHandler) Typ() string {
203+
return h.typ
225204
}
226205

227206
// Check .
@@ -276,20 +255,20 @@ func (h *WorkloadResourceAllocatedHandler) Handle(ctx context.Context, raw inter
276255

277256
// ProcessingCreatedHandler .
278257
type ProcessingCreatedHandler struct {
279-
event string
258+
typ string
280259
calcium *Calcium
281260
}
282261

283-
func newProcessingCreatedHandler(cal *Calcium) *ProcessingCreatedHandler {
262+
func newProcessingCreatedHandler(calcium *Calcium) *ProcessingCreatedHandler {
284263
return &ProcessingCreatedHandler{
285-
event: eventProcessingCreated,
286-
calcium: cal,
264+
typ: eventProcessingCreated,
265+
calcium: calcium,
287266
}
288267
}
289268

290269
// Event .
291-
func (h *ProcessingCreatedHandler) Event() string {
292-
return h.event
270+
func (h *ProcessingCreatedHandler) Typ() string {
271+
return h.typ
293272
}
294273

295274
// Check .
@@ -329,3 +308,7 @@ func (h *ProcessingCreatedHandler) Handle(ctx context.Context, raw interface{})
329308
logger.Infof(ctx, "obsolete processing deleted")
330309
return
331310
}
311+
312+
func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) {
313+
return context.WithTimeout(ctx, time.Second*32) // TODO why 32?
314+
}

0 commit comments

Comments
 (0)