Skip to content

Commit a9af172

Browse files
committed
refactor wal2
1 parent 1b765fa commit a9af172

13 files changed

+1276
-0
lines changed

Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ mock: deps
3636
mockery --dir store --output store/mocks --name Store
3737
mockery --dir engine --output engine/mocks --name API
3838
mockery --dir cluster --output cluster/mocks --name Cluster
39+
mockery --dir wal2 --output wal2/mocks --name WAL
3940
mockery --dir lock --output lock/mocks --name DistributedLock
4041
mockery --dir store/etcdv3/meta --output store/etcdv3/meta/mocks --all
4142
mockery --dir vendor/go.etcd.io/etcd/client/v3 --output store/etcdv3/meta/mocks --name Txn

types/errors.go

+1
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ var (
8787
ErrEntityNotExists = errors.New("entity not exists")
8888

8989
ErrUnregisteredWALEventType = errors.New("unregistered WAL event type")
90+
ErrBadWALEvent = errors.New("bad WAL event type")
9091
ErrInvalidWALBucket = errors.New("invalid WAL bucket")
9192
ErrInvalidType = errors.New("invalid type")
9293
ErrLockSessionDone = errors.New("lock session done")

wal2/event.go

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package wal2
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"path/filepath"
7+
"strconv"
8+
"strings"
9+
)
10+
11+
// HydroEvent indicates a log event.
12+
type HydroEvent struct {
13+
// A global unique identifier.
14+
ID uint64 `json:"id"`
15+
16+
// Registered event type name.
17+
Type string `json:"type"`
18+
19+
// The encoded log item.
20+
Item []byte `json:"item"`
21+
}
22+
23+
// NewHydroEvent initializes a new HydroEvent instance.
24+
func NewHydroEvent(ID uint64, typ string, item []byte) *HydroEvent {
25+
return &HydroEvent{ID: ID, Type: typ, Item: item}
26+
}
27+
28+
// Encode this event
29+
func (e HydroEvent) Encode() ([]byte, error) {
30+
return json.MarshalIndent(e, "", "\t")
31+
}
32+
33+
// Key returns this event's key path.
34+
func (e HydroEvent) Key() []byte {
35+
return []byte(filepath.Join(eventPrefix, fmt.Sprintf("%016x", e.ID)))
36+
}
37+
38+
func parseHydroEventID(key []byte) (uint64, error) {
39+
// Trims the EventPrefix, then trims the padding 0.
40+
id := strings.TrimLeft(strings.TrimPrefix(string(key), eventPrefix), "0")
41+
return strconv.ParseUint(id, 16, 64)
42+
}

wal2/hydro.go

+142
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package wal2
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"time"
7+
8+
"github.com/cornelk/hashmap"
9+
"github.com/projecteru2/core/log"
10+
coretypes "github.com/projecteru2/core/types"
11+
"github.com/projecteru2/core/wal2/kv"
12+
)
13+
14+
const (
15+
fileMode = 0600
16+
)
17+
18+
// Hydro is the simplest wal implementation.
19+
type Hydro struct {
20+
hashmap.HashMap
21+
stor kv.KV
22+
}
23+
24+
// NewHydro initailizes a new Hydro instance.
25+
func NewHydro(path string, timeout time.Duration) (*Hydro, error) {
26+
stor := kv.NewLithium()
27+
if err := stor.Open(path, fileMode, timeout); err != nil {
28+
return nil, err
29+
}
30+
return &Hydro{HashMap: hashmap.HashMap{}, stor: stor}, nil
31+
}
32+
33+
// Close disconnects the kvdb.
34+
func (h *Hydro) Close() error {
35+
return h.stor.Close()
36+
}
37+
38+
// Register registers a new event handler.
39+
func (h *Hydro) Register(handler EventHandler) {
40+
h.Set(handler.Typ(), handler)
41+
}
42+
43+
// Recover starts a disaster recovery, which will replay all the events.
44+
func (h *Hydro) Recover(ctx context.Context) {
45+
ch, _ := h.stor.Scan([]byte(eventPrefix))
46+
47+
for scanEntry := range ch {
48+
event, err := h.decodeEvent(scanEntry)
49+
if err != nil {
50+
log.Errorf(nil, "[Recover] decode event error: %v", err) // nolint
51+
continue
52+
}
53+
54+
handler, ok := h.getEventHandler(event.Type)
55+
if !ok {
56+
log.Errorf(nil, "[Recover] no such event handler for %s", event.Type) // nolint
57+
continue
58+
}
59+
60+
if err := h.recover(ctx, handler, event); err != nil {
61+
log.Errorf(nil, "[Recover] handle event %d (%s) failed: %v", event.ID, event.Type, err) // nolint
62+
continue
63+
}
64+
}
65+
}
66+
67+
// Log records a log item.
68+
func (h *Hydro) Log(eventyp string, item interface{}) (Commit, error) {
69+
handler, ok := h.getEventHandler(eventyp)
70+
if !ok {
71+
return nil, coretypes.NewDetailedErr(coretypes.ErrUnregisteredWALEventType, eventyp)
72+
}
73+
74+
bs, err := handler.Encode(item) // TODO 2 times encode is necessary?
75+
if err != nil {
76+
return nil, err
77+
}
78+
79+
var id uint64
80+
if id, err = h.stor.NextSequence(); err != nil {
81+
return nil, err
82+
}
83+
84+
event := NewHydroEvent(id, eventyp, bs)
85+
if bs, err = event.Encode(); err != nil {
86+
return nil, coretypes.ErrBadWALEvent
87+
}
88+
89+
if err = h.stor.Put(event.Key(), bs); err != nil {
90+
return nil, err
91+
}
92+
93+
return func() error {
94+
return h.stor.Delete(event.Key())
95+
}, nil
96+
}
97+
98+
func (h *Hydro) recover(ctx context.Context, handler EventHandler, event HydroEvent) error {
99+
item, err := handler.Decode(event.Item)
100+
if err != nil {
101+
return err
102+
}
103+
104+
delete := func() error {
105+
return h.stor.Delete(event.Key())
106+
}
107+
108+
switch handle, err := handler.Check(ctx, item); {
109+
case err != nil:
110+
return err
111+
case !handle:
112+
return delete()
113+
default:
114+
if err := handler.Handle(ctx, item); err != nil {
115+
return err
116+
}
117+
}
118+
return delete()
119+
}
120+
121+
func (h *Hydro) getEventHandler(eventyp string) (EventHandler, bool) {
122+
v, ok := h.GetStringKey(eventyp)
123+
if !ok {
124+
return nil, ok
125+
}
126+
handler, ok := v.(EventHandler)
127+
return handler, ok
128+
}
129+
130+
func (h *Hydro) decodeEvent(scanEntry kv.ScanEntry) (event HydroEvent, err error) {
131+
if err = scanEntry.Error(); err != nil {
132+
return
133+
}
134+
135+
key, value := scanEntry.Pair()
136+
if err = json.Unmarshal(value, &event); err != nil {
137+
return
138+
}
139+
140+
event.ID, err = parseHydroEventID(key)
141+
return
142+
}

0 commit comments

Comments
 (0)