Skip to content

Commit 38221ae

Browse files
committed
refactor wal2
1 parent 1b765fa commit 38221ae

30 files changed

+960
-450
lines changed

3rdmocks/ServerStream.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ mock: deps
3636
mockery --dir store --output store/mocks --name Store
3737
mockery --dir engine --output engine/mocks --name API
3838
mockery --dir cluster --output cluster/mocks --name Cluster
39+
mockery --dir wal --output wal/mocks --name WAL
3940
mockery --dir lock --output lock/mocks --name DistributedLock
4041
mockery --dir store/etcdv3/meta --output store/etcdv3/meta/mocks --all
4142
mockery --dir vendor/go.etcd.io/etcd/client/v3 --output store/etcdv3/meta/mocks --name Txn

cluster/calcium/calcium.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
6868
watcher := helium.New(config.GRPCConfig, store)
6969

7070
cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
71-
cal.wal, err = newCalciumWAL(cal)
71+
cal.wal, err = newWAL(config, cal)
7272
cal.identifier = config.Identifier()
7373

7474
return cal, logger.Err(nil, errors.WithStack(err)) //nolint

cluster/calcium/lambda.go

+1-6
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/projecteru2/core/strategy"
1414
"github.com/projecteru2/core/types"
1515
"github.com/projecteru2/core/utils"
16-
"github.com/projecteru2/core/wal"
1716

1817
"github.com/pkg/errors"
1918
)
@@ -68,7 +67,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
6867
}
6968
}
7069

71-
commit, err := c.walCreateLambda(message)
70+
commit, err := c.wal.Log(eventCreateLambda, message.WorkloadID)
7271
if err != nil {
7372
return &types.AttachWorkloadMessage{
7473
WorkloadID: message.WorkloadID,
@@ -197,7 +196,3 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
197196

198197
return workloadIDs, runMsgCh, nil
199198
}
200-
201-
func (c *Calcium) walCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
202-
return c.wal.logCreateLambda(opts)
203-
}

cluster/calcium/wal.go

+97-103
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,21 @@ const (
2222
// WAL for calcium.
2323
type WAL struct {
2424
wal.WAL
25-
config types.Config
2625
calcium *Calcium
2726
}
2827

29-
func newCalciumWAL(cal *Calcium) (*WAL, error) {
30-
w := &WAL{
31-
WAL: wal.NewHydro(),
32-
config: cal.config,
33-
calcium: cal,
28+
func newWAL(config types.Config, calcium *Calcium) (*WAL, error) {
29+
hydro, err := wal.NewHydro(config.WALFile, config.WALOpenTimeout)
30+
if err != nil {
31+
return nil, err
3432
}
3533

36-
if err := w.WAL.Open(w.config.WALFile, w.config.WALOpenTimeout); err != nil {
37-
return nil, err
34+
w := &WAL{
35+
WAL: hydro,
36+
calcium: calcium,
3837
}
3938

4039
w.registerHandlers()
41-
4240
return w, nil
4341
}
4442

@@ -49,26 +47,92 @@ func (w *WAL) registerHandlers() {
4947
w.Register(newProcessingCreatedHandler(w.calcium))
5048
}
5149

52-
func (w *WAL) logCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
53-
return w.Log(eventCreateLambda, opts.WorkloadID)
50+
// CreateLambdaHandler indicates event handler for creating lambda.
51+
type CreateLambdaHandler struct {
52+
typ string
53+
calcium *Calcium
54+
}
55+
56+
func newCreateLambdaHandler(calcium *Calcium) *CreateLambdaHandler {
57+
return &CreateLambdaHandler{
58+
typ: eventCreateLambda,
59+
calcium: calcium,
60+
}
61+
}
62+
63+
// Event .
64+
func (h *CreateLambdaHandler) Typ() string {
65+
return h.typ
66+
}
67+
68+
// Check .
69+
func (h *CreateLambdaHandler) Check(context.Context, interface{}) (bool, error) {
70+
return true, nil
71+
}
72+
73+
// Encode .
74+
func (h *CreateLambdaHandler) Encode(raw interface{}) ([]byte, error) {
75+
workloadID, ok := raw.(string)
76+
if !ok {
77+
return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
78+
}
79+
return []byte(workloadID), nil
80+
}
81+
82+
// Decode .
83+
func (h *CreateLambdaHandler) Decode(bs []byte) (interface{}, error) {
84+
return string(bs), nil
85+
}
86+
87+
// Handle .
88+
func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error {
89+
workloadID, ok := raw.(string)
90+
if !ok {
91+
return types.NewDetailedErr(types.ErrInvalidType, raw)
92+
}
93+
94+
logger := log.WithField("WAL.Handle", "RunAndWait").WithField("ID", workloadID)
95+
go func() {
96+
workload, err := h.calcium.GetWorkload(ctx, workloadID)
97+
if err != nil {
98+
logger.Errorf(ctx, "Get workload failed: %v", err)
99+
return
100+
}
101+
102+
r, err := workload.Engine.VirtualizationWait(ctx, workloadID, "")
103+
if err != nil {
104+
logger.Errorf(ctx, "Wait failed: %+v", err)
105+
return
106+
}
107+
if r.Code != 0 {
108+
logger.Errorf(ctx, "Run failed: %s", r.Message)
109+
}
110+
111+
if err := h.calcium.doRemoveWorkloadSync(ctx, []string{workloadID}); err != nil {
112+
logger.Errorf(ctx, "Remove failed: %+v", err)
113+
}
114+
logger.Infof(ctx, "waited and removed")
115+
}()
116+
117+
return nil
54118
}
55119

56120
// CreateWorkloadHandler indicates event handler for creating workload.
57121
type CreateWorkloadHandler struct {
58-
event string
122+
typ string
59123
calcium *Calcium
60124
}
61125

62-
func newCreateWorkloadHandler(cal *Calcium) *CreateWorkloadHandler {
126+
func newCreateWorkloadHandler(calcium *Calcium) *CreateWorkloadHandler {
63127
return &CreateWorkloadHandler{
64-
event: eventWorkloadCreated,
65-
calcium: cal,
128+
typ: eventWorkloadCreated,
129+
calcium: calcium,
66130
}
67131
}
68132

69133
// Event .
70-
func (h *CreateWorkloadHandler) Event() string {
71-
return h.event
134+
func (h *CreateWorkloadHandler) Typ() string {
135+
return h.typ
72136
}
73137

74138
// Check .
@@ -132,96 +196,22 @@ func (h *CreateWorkloadHandler) Handle(ctx context.Context, raw interface{}) (er
132196
return nil
133197
}
134198

135-
// CreateLambdaHandler indicates event handler for creating lambda.
136-
type CreateLambdaHandler struct {
137-
event string
138-
calcium *Calcium
139-
}
140-
141-
func newCreateLambdaHandler(cal *Calcium) *CreateLambdaHandler {
142-
return &CreateLambdaHandler{
143-
event: eventCreateLambda,
144-
calcium: cal,
145-
}
146-
}
147-
148-
// Event .
149-
func (h *CreateLambdaHandler) Event() string {
150-
return h.event
151-
}
152-
153-
// Check .
154-
func (h *CreateLambdaHandler) Check(context.Context, interface{}) (bool, error) {
155-
return true, nil
156-
}
157-
158-
// Encode .
159-
func (h *CreateLambdaHandler) Encode(raw interface{}) ([]byte, error) {
160-
workloadID, ok := raw.(string)
161-
if !ok {
162-
return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
163-
}
164-
return []byte(workloadID), nil
165-
}
166-
167-
// Decode .
168-
func (h *CreateLambdaHandler) Decode(bs []byte) (interface{}, error) {
169-
return string(bs), nil
170-
}
171-
172-
// Handle .
173-
func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error {
174-
workloadID, ok := raw.(string)
175-
if !ok {
176-
return types.NewDetailedErr(types.ErrInvalidType, raw)
177-
}
178-
179-
logger := log.WithField("WAL.Handle", "RunAndWait").WithField("ID", workloadID)
180-
go func() {
181-
workload, err := h.calcium.GetWorkload(ctx, workloadID)
182-
if err != nil {
183-
logger.Errorf(ctx, "Get workload failed: %v", err)
184-
return
185-
}
186-
187-
r, err := workload.Engine.VirtualizationWait(ctx, workloadID, "")
188-
if err != nil {
189-
logger.Errorf(ctx, "Wait failed: %+v", err)
190-
return
191-
}
192-
if r.Code != 0 {
193-
logger.Errorf(ctx, "Run failed: %s", r.Message)
194-
}
195-
196-
if err := h.calcium.doRemoveWorkloadSync(ctx, []string{workloadID}); err != nil {
197-
logger.Errorf(ctx, "Remove failed: %+v", err)
198-
}
199-
logger.Infof(ctx, "waited and removed")
200-
}()
201-
202-
return nil
203-
}
204-
205-
func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) {
206-
return context.WithTimeout(ctx, time.Second*32)
207-
}
208-
209199
// WorkloadResourceAllocatedHandler .
210200
type WorkloadResourceAllocatedHandler struct {
211-
event string
201+
typ string
212202
calcium *Calcium
213203
}
214204

215-
func newWorkloadResourceAllocatedHandler(cal *Calcium) *WorkloadResourceAllocatedHandler {
205+
func newWorkloadResourceAllocatedHandler(calcium *Calcium) *WorkloadResourceAllocatedHandler {
216206
return &WorkloadResourceAllocatedHandler{
217-
event: eventWorkloadResourceAllocated,
218-
calcium: cal,
207+
typ: eventWorkloadResourceAllocated,
208+
calcium: calcium,
219209
}
220210
}
221211

222212
// Event .
223-
func (h *WorkloadResourceAllocatedHandler) Event() string {
224-
return h.event
213+
func (h *WorkloadResourceAllocatedHandler) Typ() string {
214+
return h.typ
225215
}
226216

227217
// Check .
@@ -276,20 +266,20 @@ func (h *WorkloadResourceAllocatedHandler) Handle(ctx context.Context, raw inter
276266

277267
// ProcessingCreatedHandler .
278268
type ProcessingCreatedHandler struct {
279-
event string
269+
typ string
280270
calcium *Calcium
281271
}
282272

283-
func newProcessingCreatedHandler(cal *Calcium) *ProcessingCreatedHandler {
273+
func newProcessingCreatedHandler(calcium *Calcium) *ProcessingCreatedHandler {
284274
return &ProcessingCreatedHandler{
285-
event: eventProcessingCreated,
286-
calcium: cal,
275+
typ: eventProcessingCreated,
276+
calcium: calcium,
287277
}
288278
}
289279

290280
// Event .
291-
func (h *ProcessingCreatedHandler) Event() string {
292-
return h.event
281+
func (h *ProcessingCreatedHandler) Typ() string {
282+
return h.typ
293283
}
294284

295285
// Check .
@@ -329,3 +319,7 @@ func (h *ProcessingCreatedHandler) Handle(ctx context.Context, raw interface{})
329319
logger.Infof(ctx, "obsolete processing deleted")
330320
return
331321
}
322+
323+
func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) {
324+
return context.WithTimeout(ctx, time.Second*32) // TODO why 32?
325+
}

cluster/calcium/wal_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818

1919
func TestHandleCreateWorkloadNoHandle(t *testing.T) {
2020
c := NewTestCluster()
21-
wal, err := newCalciumWAL(c)
21+
wal, err := newWAL(c.config, c)
2222
require.NoError(t, err)
2323
c.wal = wal
2424

@@ -43,7 +43,7 @@ func TestHandleCreateWorkloadNoHandle(t *testing.T) {
4343

4444
func TestHandleCreateWorkloadError(t *testing.T) {
4545
c := NewTestCluster()
46-
wal, err := newCalciumWAL(c)
46+
wal, err := newWAL(c.config, c)
4747
require.NoError(t, err)
4848
c.wal = wal
4949

@@ -90,7 +90,7 @@ func TestHandleCreateWorkloadError(t *testing.T) {
9090

9191
func TestHandleCreateWorkloadHandled(t *testing.T) {
9292
c := NewTestCluster()
93-
wal, err := newCalciumWAL(c)
93+
wal, err := newWAL(c.config, c)
9494
require.NoError(t, err)
9595
c.wal = wal
9696

@@ -130,11 +130,11 @@ func TestHandleCreateWorkloadHandled(t *testing.T) {
130130

131131
func TestHandleCreateLambda(t *testing.T) {
132132
c := NewTestCluster()
133-
wal, err := newCalciumWAL(c)
133+
wal, err := newWAL(c.config, c)
134134
require.NoError(t, err)
135135
c.wal = wal
136136

137-
_, err = c.wal.logCreateLambda(&types.CreateWorkloadMessage{WorkloadID: "workloadid"})
137+
_, err = c.wal.Log(eventCreateLambda, "workloadid")
138138
require.NoError(t, err)
139139

140140
node := &types.Node{
@@ -153,7 +153,7 @@ func TestHandleCreateLambda(t *testing.T) {
153153
time.Sleep(500 * time.Millisecond)
154154
store.AssertExpectations(t)
155155

156-
_, err = c.wal.logCreateLambda(&types.CreateWorkloadMessage{WorkloadID: "workloadid"})
156+
_, err = c.wal.Log(eventCreateLambda, "workloadid")
157157
require.NoError(t, err)
158158
store.On("GetWorkload", mock.Anything, mock.Anything).
159159
Return(wrk, nil).

engine/docker/mocks/APIClient.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/mocks/API.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lock/mocks/DistributedLock.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rpc/mocks/CoreRPC_RunAndWaitServer.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

scheduler/mocks/Scheduler.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)