Skip to content

Commit b28ca2c

Browse files
committed
refactor wal2
1 parent 1b765fa commit b28ca2c

13 files changed

+1280
-0
lines changed

Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ mock: deps
3636
mockery --dir store --output store/mocks --name Store
3737
mockery --dir engine --output engine/mocks --name API
3838
mockery --dir cluster --output cluster/mocks --name Cluster
39+
mockery --dir wal2 --output wal2/mocks --name WAL
3940
mockery --dir lock --output lock/mocks --name DistributedLock
4041
mockery --dir store/etcdv3/meta --output store/etcdv3/meta/mocks --all
4142
mockery --dir vendor/go.etcd.io/etcd/client/v3 --output store/etcdv3/meta/mocks --name Txn

types/errors.go

+1
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ var (
8787
ErrEntityNotExists = errors.New("entity not exists")
8888

8989
ErrUnregisteredWALEventType = errors.New("unregistered WAL event type")
90+
ErrBadWALEvent = errors.New("bad WAL event type")
9091
ErrInvalidWALBucket = errors.New("invalid WAL bucket")
9192
ErrInvalidType = errors.New("invalid type")
9293
ErrLockSessionDone = errors.New("lock session done")

wal2/event.go

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package wal2
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"path/filepath"
7+
"strconv"
8+
"strings"
9+
)
10+
11+
// HydroEvent indicates a log event.
12+
type HydroEvent struct {
13+
// A global unique identifier.
14+
ID uint64 `json:"id"`
15+
16+
// Registered event type name.
17+
Type string `json:"type"`
18+
19+
// The encoded log item.
20+
Item []byte `json:"item"`
21+
}
22+
23+
// NewHydroEvent initializes a new HydroEvent instance.
24+
func NewHydroEvent(ID uint64, typ string, item []byte) *HydroEvent {
25+
return &HydroEvent{ID: ID, Type: typ, Item: item}
26+
}
27+
28+
// Encode this event
29+
func (e HydroEvent) Encode() ([]byte, error) {
30+
return json.MarshalIndent(e, "", "\t")
31+
}
32+
33+
// Key returns this event's key path.
34+
func (e HydroEvent) Key() []byte {
35+
return []byte(filepath.Join(eventPrefix, fmt.Sprintf("%016x", e.ID)))
36+
}
37+
38+
func parseHydroEventID(key []byte) (uint64, error) {
39+
// Trims the EventPrefix, then trims the padding 0.
40+
id := strings.TrimLeft(strings.TrimPrefix(string(key), eventPrefix), "0")
41+
return strconv.ParseUint(id, 16, 64)
42+
}

wal2/hydro.go

+146
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package wal2
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"time"
7+
8+
"github.com/cornelk/hashmap"
9+
"github.com/projecteru2/core/log"
10+
coretypes "github.com/projecteru2/core/types"
11+
"github.com/projecteru2/core/wal2/kv"
12+
)
13+
14+
const (
15+
fileMode = 0600
16+
)
17+
18+
// Hydro is the simplest wal implementation.
19+
type Hydro struct {
20+
hashmap.HashMap
21+
stor kv.KV
22+
}
23+
24+
// NewHydro initailizes a new Hydro instance.
25+
func NewHydro(path string, timeout time.Duration) (*Hydro, error) {
26+
stor := kv.NewLithium()
27+
if err := stor.Open(path, fileMode, timeout); err != nil {
28+
return nil, err
29+
}
30+
return &Hydro{HashMap: hashmap.HashMap{}, stor: stor}, nil
31+
}
32+
33+
// Close disconnects the kvdb.
34+
func (h *Hydro) Close() error {
35+
return h.stor.Close()
36+
}
37+
38+
// Register registers a new event handler.
39+
func (h *Hydro) Register(handler EventHandler) {
40+
h.Set(handler.Typ(), handler)
41+
}
42+
43+
// Recover starts a disaster recovery, which will replay all the events.
44+
func (h *Hydro) Recover(ctx context.Context) {
45+
ch, _ := h.stor.Scan([]byte(eventPrefix))
46+
47+
events := []HydroEvent{}
48+
for scanEntry := range ch {
49+
event, err := h.decodeEvent(scanEntry)
50+
if err != nil {
51+
log.Errorf(nil, "[Recover] decode event error: %v", err) // nolint
52+
continue
53+
}
54+
events = append(events, event)
55+
}
56+
57+
for _, event := range events {
58+
handler, ok := h.getEventHandler(event.Type)
59+
if !ok {
60+
log.Errorf(nil, "[Recover] no such event handler for %s", event.Type) // nolint
61+
continue
62+
}
63+
64+
if err := h.recover(ctx, handler, event); err != nil {
65+
log.Errorf(nil, "[Recover] handle event %d (%s) failed: %v", event.ID, event.Type, err) // nolint
66+
continue
67+
}
68+
}
69+
}
70+
71+
// Log records a log item.
72+
func (h *Hydro) Log(eventyp string, item interface{}) (Commit, error) {
73+
handler, ok := h.getEventHandler(eventyp)
74+
if !ok {
75+
return nil, coretypes.NewDetailedErr(coretypes.ErrUnregisteredWALEventType, eventyp)
76+
}
77+
78+
bs, err := handler.Encode(item) // TODO 2 times encode is necessary?
79+
if err != nil {
80+
return nil, err
81+
}
82+
83+
var id uint64
84+
if id, err = h.stor.NextSequence(); err != nil {
85+
return nil, err
86+
}
87+
88+
event := NewHydroEvent(id, eventyp, bs)
89+
if bs, err = event.Encode(); err != nil {
90+
return nil, coretypes.ErrBadWALEvent
91+
}
92+
93+
if err = h.stor.Put(event.Key(), bs); err != nil {
94+
return nil, err
95+
}
96+
97+
return func() error {
98+
return h.stor.Delete(event.Key())
99+
}, nil
100+
}
101+
102+
func (h *Hydro) recover(ctx context.Context, handler EventHandler, event HydroEvent) error {
103+
item, err := handler.Decode(event.Item)
104+
if err != nil {
105+
return err
106+
}
107+
108+
delete := func() error {
109+
return h.stor.Delete(event.Key())
110+
}
111+
112+
switch handle, err := handler.Check(ctx, item); {
113+
case err != nil:
114+
return err
115+
case !handle:
116+
return delete()
117+
default:
118+
if err := handler.Handle(ctx, item); err != nil {
119+
return err
120+
}
121+
}
122+
return delete()
123+
}
124+
125+
func (h *Hydro) getEventHandler(eventyp string) (EventHandler, bool) {
126+
v, ok := h.GetStringKey(eventyp)
127+
if !ok {
128+
return nil, ok
129+
}
130+
handler, ok := v.(EventHandler)
131+
return handler, ok
132+
}
133+
134+
func (h *Hydro) decodeEvent(scanEntry kv.ScanEntry) (event HydroEvent, err error) {
135+
if err = scanEntry.Error(); err != nil {
136+
return
137+
}
138+
139+
key, value := scanEntry.Pair()
140+
if err = json.Unmarshal(value, &event); err != nil {
141+
return
142+
}
143+
144+
event.ID, err = parseHydroEventID(key)
145+
return
146+
}

0 commit comments

Comments
 (0)