Skip to content

Commit 7624030

Browse files
anrsanrs
and
anrs
authored
fix: wal deadlock (#375)
Co-authored-by: anrs <anders.hu@shopee.com>
1 parent 6e5e014 commit 7624030

File tree

3 files changed

+50
-27
lines changed

3 files changed

+50
-27
lines changed

wal/hydro.go

+9-6
Original file line numberDiff line numberDiff line change
@@ -44,21 +44,25 @@ func (h *Hydro) Register(handler EventHandler) {
4444
func (h *Hydro) Recover() {
4545
ch, _ := h.kv.Scan([]byte(EventPrefix))
4646

47+
events := []HydroEvent{}
4748
for ent := range ch {
48-
event, err := h.decodeEvent(ent)
49+
ev, err := h.decodeEvent(ent)
4950
if err != nil {
5051
log.Errorf("[Recover] decode event error: %v", err)
5152
continue
5253
}
54+
events = append(events, ev)
55+
}
5356

54-
handler, ok := h.getEventHandler(event.Type)
57+
for _, ev := range events {
58+
handler, ok := h.getEventHandler(ev.Type)
5559
if !ok {
56-
log.Errorf("[Recover] no such event handler for %s", event.Type)
60+
log.Errorf("[Recover] no such event handler for %s", ev.Type)
5761
continue
5862
}
5963

60-
if err := h.recover(handler, event); err != nil {
61-
log.Errorf("[Recover] handle event %d (%s) failed: %v", event.ID, event.Type, err)
64+
if err := h.recover(handler, ev); err != nil {
65+
log.Errorf("[Recover] handle event %d (%s) failed: %v", ev.ID, ev.Type, err)
6266
continue
6367
}
6468
}
@@ -129,7 +133,6 @@ func (h *Hydro) decodeEvent(ent kv.ScanEntry) (event HydroEvent, err error) {
129133
}
130134

131135
event.kv = h.kv
132-
133136
event.ID, err = parseHydroEventID(key)
134137

135138
return

wal/hydro_test.go

+39
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ package wal
22

33
import (
44
"fmt"
5+
"io/ioutil"
6+
"os"
7+
"path/filepath"
58
"testing"
9+
"time"
610

711
"github.com/stretchr/testify/require"
812

@@ -211,6 +215,41 @@ func TestHydroEventParseIDShouldRemovePadding(t *testing.T) {
211215
require.Equal(t, uint64(15), id)
212216
}
213217

218+
func TestHydroRecoverWithRealLithium(t *testing.T) {
219+
dir, rmdir := tempdir(t)
220+
defer rmdir()
221+
222+
hydro := NewHydro()
223+
// Uses a real Lithium instance rather than a mocked one.
224+
require.NoError(t, hydro.Open(filepath.Join(dir, "temp.wal"), time.Second))
225+
226+
handler := SimpleEventHandler{
227+
event: "create",
228+
encode: func(interface{}) ([]byte, error) { return []byte("{}"), nil },
229+
decode: func([]byte) (interface{}, error) { return struct{}{}, nil },
230+
check: func(interface{}) (bool, error) { return true, nil },
231+
handle: func(interface{}) error { return nil },
232+
}
233+
hydro.Register(handler)
234+
235+
hydro.Log(handler.event, struct{}{})
236+
hydro.Log(handler.event, struct{}{})
237+
hydro.Log(handler.event, struct{}{})
238+
239+
hydro.Recover()
240+
241+
ch, _ := hydro.kv.Scan([]byte(EventPrefix))
242+
for range ch {
243+
require.FailNow(t, "expects no data")
244+
}
245+
}
246+
247+
func tempdir(t *testing.T) (string, func()) {
248+
dir, err := ioutil.TempDir("", "temp.wal")
249+
require.NoError(t, err)
250+
return dir, func() { os.RemoveAll(dir) }
251+
}
252+
214253
func newTestEventHandler(eventype string, checked, handled, encoded, decoded *bool) SimpleEventHandler {
215254
check := func(interface{}) (bool, error) {
216255
*checked = true

wal/kv/lithium.go

+2-21
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,13 @@ func (l *Lithium) close() error {
8181

8282
// Put creates/updates a key/value pair.
8383
func (l *Lithium) Put(key []byte, value []byte) (err error) {
84-
l.Lock()
85-
defer l.Unlock()
86-
8784
return l.update(func(bkt *bolt.Bucket) error {
8885
return bkt.Put(key, value)
8986
})
9087
}
9188

9289
// Get read a key's value.
9390
func (l *Lithium) Get(key []byte) (dst []byte, err error) {
94-
l.Lock()
95-
defer l.Unlock()
96-
9791
err = l.view(func(bkt *bolt.Bucket) error {
9892
src := bkt.Get(key)
9993
dst = make([]byte, len(src))
@@ -110,8 +104,6 @@ func (l *Lithium) Get(key []byte) (dst []byte, err error) {
110104

111105
// Delete deletes a key.
112106
func (l *Lithium) Delete(key []byte) error {
113-
l.Lock()
114-
defer l.Unlock()
115107
return l.update(func(bkt *bolt.Bucket) error {
116108
return bkt.Delete(key)
117109
})
@@ -130,32 +122,24 @@ func (l *Lithium) Scan(prefix []byte) (<-chan ScanEntry, func()) {
130122
go func() {
131123
defer close(ch)
132124

133-
l.Lock()
134-
defer l.Unlock()
135-
136125
close(locked)
137-
ent := &LithiumScanEntry{}
138126

139127
scan := func(bkt *bolt.Bucket) error {
140128
c := bkt.Cursor()
141129
for key, value := c.First(); key != nil && bytes.HasPrefix(key, prefix); key, value = c.Next() {
142-
ent.key = key
143-
ent.value = value
144-
145130
select {
146131
case <-exit:
147132
return nil
148-
case ch <- *ent:
133+
case ch <- LithiumScanEntry{key: key, value: value}:
149134
}
150135
}
151136
return nil
152137
}
153138

154139
if err := l.view(scan); err != nil {
155-
ent.err = err
156140
select {
157141
case <-exit:
158-
case ch <- *ent:
142+
case ch <- LithiumScanEntry{err: err}:
159143
}
160144
}
161145
}()
@@ -168,9 +152,6 @@ func (l *Lithium) Scan(prefix []byte) (<-chan ScanEntry, func()) {
168152

169153
// NextSequence generates a new sequence.
170154
func (l *Lithium) NextSequence() (uint64, error) {
171-
l.Lock()
172-
defer l.Unlock()
173-
174155
var seq uint64
175156
err := l.update(func(bkt *bolt.Bucket) (ue error) {
176157
seq, ue = bkt.NextSequence()

0 commit comments

Comments
 (0)