Skip to content

Commit 46f170b

Browse files
anrsanrs
and
anrs
authored
use the real context from exterior (#466)
Co-authored-by: anrs <anders.hu@shopee.com>
1 parent fd7dc02 commit 46f170b

File tree

10 files changed

+63
-84
lines changed

10 files changed

+63
-84
lines changed

cluster/calcium/calcium.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
8585
}
8686

8787
// DisasterRecover .
88-
func (c *Calcium) DisasterRecover() {
89-
c.wal.Recover()
88+
func (c *Calcium) DisasterRecover(ctx context.Context) {
89+
c.wal.Recover(ctx)
9090
}
9191

9292
// Finalizer use for defer

cluster/calcium/wal.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,13 @@ func (h *CreateWorkloadHandler) Event() string {
7979
}
8080

8181
// Check .
82-
func (h *CreateWorkloadHandler) Check(raw interface{}) (bool, error) {
82+
func (h *CreateWorkloadHandler) Check(ctx context.Context, raw interface{}) (bool, error) {
8383
wrk, ok := raw.(*types.Workload)
8484
if !ok {
8585
return false, types.NewDetailedErr(types.ErrInvalidType, raw)
8686
}
8787

88-
ctx, cancel := getReplayContext(context.Background())
88+
ctx, cancel := getReplayContext(ctx)
8989
defer cancel()
9090

9191
_, err := h.calcium.GetWorkload(ctx, wrk.ID)
@@ -122,13 +122,13 @@ func (h *CreateWorkloadHandler) Decode(bs []byte) (interface{}, error) {
122122
}
123123

124124
// Handle .
125-
func (h *CreateWorkloadHandler) Handle(raw interface{}) error {
125+
func (h *CreateWorkloadHandler) Handle(ctx context.Context, raw interface{}) error {
126126
wrk, ok := raw.(*types.Workload)
127127
if !ok {
128128
return types.NewDetailedErr(types.ErrInvalidType, raw)
129129
}
130130

131-
ctx, cancel := getReplayContext(context.Background())
131+
ctx, cancel := getReplayContext(ctx)
132132
defer cancel()
133133

134134
// There hasn't been the exact workload metadata, so we must remove it.
@@ -173,7 +173,7 @@ func (h *CreateLambdaHandler) Event() string {
173173
}
174174

175175
// Check .
176-
func (h *CreateLambdaHandler) Check(interface{}) (bool, error) {
176+
func (h *CreateLambdaHandler) Check(context.Context, interface{}) (bool, error) {
177177
return true, nil
178178
}
179179

@@ -194,20 +194,20 @@ func (h *CreateLambdaHandler) Decode(bs []byte) (interface{}, error) {
194194
}
195195

196196
// Handle .
197-
func (h *CreateLambdaHandler) Handle(raw interface{}) error {
197+
func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error {
198198
opts, ok := raw.(*types.ListWorkloadsOptions)
199199
if !ok {
200200
return types.NewDetailedErr(types.ErrInvalidType, raw)
201201
}
202202

203-
workloadIDs, err := h.getWorkloadIDs(opts)
203+
workloadIDs, err := h.getWorkloadIDs(ctx, opts)
204204
if err != nil {
205205
log.Errorf(context.TODO(), "[CreateLambdaHandler.Handle] Get workloads %s/%s/%v failed: %v",
206206
opts.Appname, opts.Entrypoint, opts.Labels, err)
207207
return err
208208
}
209209

210-
ctx, cancel := getReplayContext(context.Background())
210+
ctx, cancel := getReplayContext(ctx)
211211
defer cancel()
212212

213213
if err := h.calcium.doRemoveWorkloadSync(ctx, workloadIDs); err != nil {
@@ -220,8 +220,8 @@ func (h *CreateLambdaHandler) Handle(raw interface{}) error {
220220
return nil
221221
}
222222

223-
func (h *CreateLambdaHandler) getWorkloadIDs(opts *types.ListWorkloadsOptions) ([]string, error) {
224-
ctx, cancel := getReplayContext(context.Background())
223+
func (h *CreateLambdaHandler) getWorkloadIDs(ctx context.Context, opts *types.ListWorkloadsOptions) ([]string, error) {
224+
ctx, cancel := getReplayContext(ctx)
225225
defer cancel()
226226

227227
workloads, err := h.calcium.ListWorkloads(ctx, opts)

cluster/calcium/wal_test.go

+13-13
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ func TestHandleCreateWorkloadNoHandle(t *testing.T) {
3131
defer store.AssertExpectations(t)
3232
store.On("GetWorkload", mock.Anything, wrkid).Return(wrk, nil).Once()
3333

34-
c.wal.Recover()
34+
c.wal.Recover(context.TODO())
3535

3636
// Recovers nothing.
37-
c.wal.Recover()
37+
c.wal.Recover(context.TODO())
3838
}
3939

4040
func TestHandleCreateWorkloadError(t *testing.T) {
@@ -59,12 +59,12 @@ func TestHandleCreateWorkloadError(t *testing.T) {
5959
store := c.store.(*storemocks.Store)
6060
defer store.AssertExpectations(t)
6161
store.On("GetWorkload", mock.Anything, wrkid).Return(wrk, fmt.Errorf("err")).Once()
62-
c.wal.Recover()
62+
c.wal.Recover(context.TODO())
6363

6464
err = types.NewDetailedErr(types.ErrBadCount, fmt.Sprintf("keys: [%s]", wrkid))
6565
store.On("GetWorkload", mock.Anything, wrkid).Return(wrk, err)
6666
store.On("GetNode", mock.Anything, wrk.Nodename).Return(nil, fmt.Errorf("err")).Once()
67-
c.wal.Recover()
67+
c.wal.Recover(context.TODO())
6868

6969
store.On("GetNode", mock.Anything, wrk.Nodename).Return(node, nil)
7070
eng, ok := node.Engine.(*enginemocks.API)
@@ -73,15 +73,15 @@ func TestHandleCreateWorkloadError(t *testing.T) {
7373
eng.On("VirtualizationRemove", mock.Anything, wrk.ID, true, true).
7474
Return(fmt.Errorf("err")).
7575
Once()
76-
c.wal.Recover()
76+
c.wal.Recover(context.TODO())
7777

7878
eng.On("VirtualizationRemove", mock.Anything, wrk.ID, true, true).
7979
Return(fmt.Errorf("Error: No such container: %s", wrk.ID)).
8080
Once()
81-
c.wal.Recover()
81+
c.wal.Recover(context.TODO())
8282

8383
// Nothing recovered.
84-
c.wal.Recover()
84+
c.wal.Recover(context.TODO())
8585
}
8686

8787
func TestHandleCreateWorkloadHandled(t *testing.T) {
@@ -118,10 +118,10 @@ func TestHandleCreateWorkloadHandled(t *testing.T) {
118118
Return(nil).
119119
Once()
120120

121-
c.wal.Recover()
121+
c.wal.Recover(context.TODO())
122122

123123
// Recovers nothing.
124-
c.wal.Recover()
124+
c.wal.Recover(context.TODO())
125125
}
126126

127127
func TestHandleCreateLambda(t *testing.T) {
@@ -154,7 +154,7 @@ func TestHandleCreateLambda(t *testing.T) {
154154
Return(nil, fmt.Errorf("err")).
155155
Once()
156156
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
157-
c.wal.Recover()
157+
c.wal.Recover(context.TODO())
158158

159159
store.On("ListWorkloads", mock.Anything, deployOpts.Name, deployOpts.Entrypoint.Name, "", int64(0), deployOpts.Labels).
160160
Return([]*types.Workload{wrk}, nil).
@@ -179,11 +179,11 @@ func TestHandleCreateLambda(t *testing.T) {
179179
Once()
180180

181181
lock := &lockmocks.DistributedLock{}
182-
lock.On("Lock", mock.Anything).Return(context.Background(), nil)
182+
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
183183
lock.On("Unlock", mock.Anything).Return(nil)
184184
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
185185

186-
c.wal.Recover()
186+
c.wal.Recover(context.TODO())
187187
// Recovered nothing.
188-
c.wal.Recover()
188+
c.wal.Recover(context.TODO())
189189
}

core.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func serve(c *cli.Context) error {
8181
return err
8282
}
8383
defer cluster.Finalizer()
84-
cluster.DisasterRecover()
84+
cluster.DisasterRecover(c.Context)
8585

8686
stop := make(chan struct{}, 1)
8787
vibranium := rpc.New(cluster, config, stop)

store/redis/ephemeral.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func (r *Rediaron) StartEphemeral(ctx context.Context, path string, heartbeat ti
2222
return nil, nil, errors.Wrap(types.ErrKeyExists, path)
2323
}
2424

25-
cctx, cancel := context.WithCancel(context.Background())
25+
ctx, cancel := context.WithCancel(ctx)
2626
expiry := make(chan struct{})
2727

2828
var wg sync.WaitGroup
@@ -37,11 +37,11 @@ func (r *Rediaron) StartEphemeral(ctx context.Context, path string, heartbeat ti
3737
for {
3838
select {
3939
case <-tick.C:
40-
if err := r.refreshEphemeral(path, heartbeat); err != nil {
40+
if err := r.refreshEphemeral(ctx, path, heartbeat); err != nil {
4141
r.revokeEphemeral(path)
4242
return
4343
}
44-
case <-cctx.Done():
44+
case <-ctx.Done():
4545
r.revokeEphemeral(path)
4646
return
4747
}
@@ -55,15 +55,15 @@ func (r *Rediaron) StartEphemeral(ctx context.Context, path string, heartbeat ti
5555
}
5656

5757
func (r *Rediaron) revokeEphemeral(path string) {
58-
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
58+
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
5959
defer cancel()
6060
if _, err := r.cli.Del(ctx, path).Result(); err != nil {
6161
log.Errorf(context.TODO(), "[refreshEphemeral] revoke with %s failed: %v", path, err)
6262
}
6363
}
6464

65-
func (r *Rediaron) refreshEphemeral(path string, ttl time.Duration) error {
66-
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
65+
func (r *Rediaron) refreshEphemeral(ctx context.Context, path string, ttl time.Duration) error {
66+
ctx, cancel := context.WithTimeout(ctx, time.Second)
6767
defer cancel()
6868
_, err := r.cli.Expire(ctx, path, ttl).Result()
6969
return err

wal/hydro.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func (h *Hydro) Register(handler EventHandler) {
4242
}
4343

4444
// Recover starts a disaster recovery, which will replay all the events.
45-
func (h *Hydro) Recover() {
45+
func (h *Hydro) Recover(ctx context.Context) {
4646
ch, _ := h.kv.Scan([]byte(EventPrefix))
4747

4848
events := []HydroEvent{}
@@ -62,27 +62,27 @@ func (h *Hydro) Recover() {
6262
continue
6363
}
6464

65-
if err := h.recover(handler, ev); err != nil {
65+
if err := h.recover(ctx, handler, ev); err != nil {
6666
log.Errorf(context.TODO(), "[Recover] handle event %d (%s) failed: %v", ev.ID, ev.Type, err)
6767
continue
6868
}
6969
}
7070
}
7171

72-
func (h *Hydro) recover(handler EventHandler, event HydroEvent) error {
72+
func (h *Hydro) recover(ctx context.Context, handler EventHandler, event HydroEvent) error {
7373
item, err := handler.Decode(event.Item)
7474
if err != nil {
7575
return err
7676
}
7777

78-
switch handle, err := handler.Check(item); {
78+
switch handle, err := handler.Check(ctx, item); {
7979
case err != nil:
8080
return err
8181
case !handle:
8282
return event.Delete()
8383
}
8484

85-
if err := handler.Handle(item); err != nil {
85+
if err := handler.Handle(ctx, item); err != nil {
8686
return err
8787
}
8888

wal/hydro_test.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package wal
22

33
import (
4+
"context"
45
"fmt"
56
"io/ioutil"
67
"os"
@@ -74,7 +75,7 @@ func TestRecoverFailedAsNoSuchHandler(t *testing.T) {
7475

7576
hydro.handlers.Delete(eventype)
7677

77-
hydro.Recover()
78+
hydro.Recover(context.TODO())
7879
require.True(t, encoded)
7980
require.False(t, decoded)
8081
require.False(t, checked)
@@ -98,7 +99,7 @@ func TestRecoverFailedAsCheckError(t *testing.T) {
9899
require.NoError(t, err)
99100
require.NotNil(t, commit)
100101

101-
hydro.Recover()
102+
hydro.Recover(context.TODO())
102103
require.True(t, encoded)
103104
require.True(t, decoded)
104105
require.True(t, checked)
@@ -145,7 +146,7 @@ func TestRecoverFailedAsDecodeLogError(t *testing.T) {
145146
require.NoError(t, err)
146147
require.NotNil(t, commit)
147148

148-
hydro.Recover()
149+
hydro.Recover(context.TODO())
149150
require.True(t, encoded)
150151
require.True(t, decoded)
151152
require.False(t, checked)
@@ -171,7 +172,7 @@ func TestHydroRecoverDiscardNoNeedEvent(t *testing.T) {
171172
require.NoError(t, err)
172173
require.NotNil(t, commit)
173174

174-
hydro.Recover()
175+
hydro.Recover(context.TODO())
175176
require.True(t, encoded)
176177
require.True(t, decoded)
177178
require.True(t, checked)
@@ -191,7 +192,7 @@ func TestHydroRecover(t *testing.T) {
191192
require.NoError(t, err)
192193
require.NotNil(t, commit)
193194

194-
hydro.Recover()
195+
hydro.Recover(context.TODO())
195196
require.True(t, encoded)
196197
require.True(t, decoded)
197198
require.True(t, checked)
@@ -236,7 +237,7 @@ func TestHydroRecoverWithRealLithium(t *testing.T) {
236237
hydro.Log(handler.event, struct{}{})
237238
hydro.Log(handler.event, struct{}{})
238239

239-
hydro.Recover()
240+
hydro.Recover(context.TODO())
240241

241242
ch, _ := hydro.kv.Scan([]byte(EventPrefix))
242243
for range ch {

wal/mocks/WAL.go

+4-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)