Skip to content

Commit 5914b8f

Browse files
committed
enable redis as store
1 parent a2ba499 commit 5914b8f

File tree

8 files changed

+64
-282
lines changed

8 files changed

+64
-282
lines changed

cluster/calcium/calcium.go

+15-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/projecteru2/core/source/gitlab"
1717
"github.com/projecteru2/core/store"
1818
"github.com/projecteru2/core/store/etcdv3"
19+
"github.com/projecteru2/core/store/redis"
1920
"github.com/projecteru2/core/types"
2021
)
2122

@@ -32,10 +33,21 @@ type Calcium struct {
3233
// New returns a new cluster config
3334
func New(config types.Config, embeddedStorage bool) (*Calcium, error) {
3435
logger := log.WithField("Calcium", "New").WithField("config", config)
36+
3537
// set store
36-
store, err := etcdv3.New(config, embeddedStorage)
37-
if err != nil {
38-
return nil, logger.Err(errors.WithStack(err))
38+
var store store.Store
39+
var err error
40+
switch config.Store {
41+
case types.Etcd:
42+
store, err = etcdv3.New(config, embeddedStorage)
43+
if err != nil {
44+
return nil, logger.Err(errors.WithStack(err))
45+
}
46+
case types.Redis:
47+
store, err = redis.New(config, embeddedStorage)
48+
if err != nil {
49+
return nil, logger.Err(errors.WithStack(err))
50+
}
3951
}
4052

4153
// set scheduler

core.yaml.sample

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ grpc:
1818
service_discovery_interval: 5s # WatchServiceStatus push interval
1919
service_heartbeat_interval: 5s # RegisterService heartbeat
2020

21+
store: etcd
22+
2123
etcd:
2224
machines:
2325
- "http://127.0.0.1:2379"

go.sum

-183
Large diffs are not rendered by default.

store/redis/helper.go

+35
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
11
package redis
22

33
import (
4+
"context"
45
"strings"
56

67
"github.com/projecteru2/core/strategy"
78
)
89

10+
// extracts node name from key
11+
// /nodestatus/nodename -> nodename
12+
func extractNodename(s string) string {
13+
ps := strings.Split(s, "/")
14+
return ps[len(ps)-1]
15+
}
16+
917
func parseStatusKey(key string) (string, string, string, string) {
1018
parts := strings.Split(key, "/")
1119
l := len(parts)
@@ -19,3 +27,30 @@ func setCount(nodesCount map[string]int, strategyInfos []strategy.Info) {
1927
}
2028
}
2129
}
30+
31+
// getByKeyPattern gets key-value pairs that key matches pattern
32+
func (r *Rediaron) getByKeyPattern(ctx context.Context, pattern string, limit int64) (map[string]string, error) {
33+
var (
34+
cursor uint64
35+
result []string
36+
err error
37+
count int64
38+
keys = []string{}
39+
)
40+
for {
41+
result, cursor, err = r.cli.Scan(ctx, cursor, pattern, 0).Result()
42+
if err != nil {
43+
return nil, err
44+
}
45+
46+
keys = append(keys, result...)
47+
count += int64(len(result))
48+
if cursor == 0 || (limit > 0 && count >= limit) {
49+
break
50+
}
51+
}
52+
if limit > 0 && int64(len(keys)) >= limit {
53+
keys = keys[:limit]
54+
}
55+
return r.GetMulti(ctx, keys)
56+
}

store/redis/node.go

-7
Original file line numberDiff line numberDiff line change
@@ -361,10 +361,3 @@ func (r *Rediaron) NodeStatusStream(ctx context.Context) chan *types.NodeStatus
361361
}()
362362
return ch
363363
}
364-
365-
// extracts node name from key
366-
// /nodestatus/nodename -> nodename
367-
func extractNodename(s string) string {
368-
ps := strings.Split(s, "/")
369-
return ps[len(ps)-1]
370-
}

store/redis/rediaron.go

+1-50
Original file line numberDiff line numberDiff line change
@@ -65,41 +65,19 @@ type Rediaron struct {
6565
// New creates a new Rediaron instance from config
6666
// Only redis address and db is used
6767
// db is used to separate data, by default db 0 will be used
68-
func New(config types.Config) (*Rediaron, error) {
68+
func New(config types.Config, embeddedStorage bool) (*Rediaron, error) {
6969
cli := redis.NewClient(&redis.Options{
7070
Addr: config.Redis.Addr,
7171
DB: config.Redis.DB,
7272
})
7373

74-
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
75-
defer cancel()
76-
77-
_, err := cli.Ping(ctx).Result()
78-
if err != nil {
79-
return nil, err
80-
}
81-
8274
return &Rediaron{
8375
cli: cli,
8476
config: config,
8577
db: config.Redis.DB,
8678
}, nil
8779
}
8880

89-
// // keysWatchedWithRetry provides a method to retry a transaction
90-
// func (r *Rediaron) keysWatchedWithRetry(ctx context.Context, keys []string, retry int, txn func(*redis.Tx) error) error {
91-
// for i := 0; i < retry; i++ {
92-
// err := r.cli.Watch(ctx, txn, keys...)
93-
// if err == redis.TxFailedErr {
94-
// // value of watched keys changed, retry
95-
// continue
96-
// }
97-
// // no error or other errors, return
98-
// return err
99-
// }
100-
// return ErrMaxRetryExceeded
101-
// }
102-
10381
// KNotifyMessage is received when using KNotify
10482
type KNotifyMessage struct {
10583
Key string
@@ -134,33 +112,6 @@ func (r *Rediaron) KNotify(ctx context.Context, pattern string) chan *KNotifyMes
134112
return ch
135113
}
136114

137-
// getByKeyPattern gets key-value pairs that key matches pattern
138-
func (r *Rediaron) getByKeyPattern(ctx context.Context, pattern string, limit int64) (map[string]string, error) {
139-
var (
140-
cursor uint64
141-
result []string
142-
err error
143-
count int64
144-
keys = []string{}
145-
)
146-
for {
147-
result, cursor, err = r.cli.Scan(ctx, cursor, pattern, 0).Result()
148-
if err != nil {
149-
return nil, err
150-
}
151-
152-
keys = append(keys, result...)
153-
count += int64(len(result))
154-
if cursor == 0 || (limit > 0 && count >= limit) {
155-
break
156-
}
157-
}
158-
if limit > 0 && int64(len(keys)) >= limit {
159-
keys = keys[:limit]
160-
}
161-
return r.GetMulti(ctx, keys)
162-
}
163-
164115
// GetOne is a wrapper
165116
func (r *Rediaron) GetOne(ctx context.Context, key string) (string, error) {
166117
return r.cli.Get(ctx, key).Result()

store/redis/rediaron_test.go

-37
Original file line numberDiff line numberDiff line change
@@ -26,43 +26,6 @@ func (s *RediaronTestSuite) TearDownTest() {
2626
s.rediaron.cli.FlushAll(context.Background())
2727
}
2828

29-
// func (s *RediaronTestSuite) TestKeysWatchedWithRetry() {
30-
// ctx := context.Background()
31-
//
32-
// update := func(key string) error {
33-
// txf := func(tx *redis.Tx) error {
34-
// n, err := tx.Get(ctx, key).Int()
35-
// if err != nil && err != redis.Nil {
36-
// return err
37-
// }
38-
//
39-
// n++
40-
//
41-
// _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
42-
// pipe.Set(ctx, key, n, 0)
43-
// return nil
44-
// })
45-
// return err
46-
// }
47-
// return s.rediaron.keysWatchedWithRetry(ctx, []string{"test"}, 200, txf)
48-
// }
49-
//
50-
// var wg sync.WaitGroup
51-
// for i := 0; i < 100; i++ {
52-
// wg.Add(1)
53-
// go func() {
54-
// defer wg.Done()
55-
//
56-
// s.NoError(update("test"))
57-
// }()
58-
// }
59-
// wg.Wait()
60-
//
61-
// n, err := s.rediaron.cli.Get(context.Background(), "test").Int()
62-
// s.NoError(err)
63-
// s.Equal(100, n)
64-
// }
65-
6629
func (s *RediaronTestSuite) TestKeyNotify() {
6730
ctx, cancel := context.WithCancel(context.Background())
6831
ch := s.rediaron.KNotify(ctx, "a*")

types/config.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,13 @@ import (
44
"time"
55
)
66

7+
const (
8+
// Etcd .
9+
Etcd = "etcd"
10+
// Redis .
11+
Redis = "redis"
12+
)
13+
714
// Config holds eru-core config
815
type Config struct {
916
LogLevel string `yaml:"log_level" required:"true" default:"INFO"`
@@ -14,8 +21,10 @@ type Config struct {
1421
Profile string `yaml:"profile"` // profile ip:port
1522
CertPath string `yaml:"cert_path"` // docker cert files path
1623
MaxConcurrency int64 `yaml:"max_concurrency" default:"20"` // concurrently call single runtime in the same time
17-
Auth AuthConfig `yaml:"auth"` // grpc auth
18-
GRPCConfig GRPCConfig `yaml:"grpc"` // grpc config
24+
Store string `yaml:"store" default:"etcd"` // store type
25+
26+
Auth AuthConfig `yaml:"auth"` // grpc auth
27+
GRPCConfig GRPCConfig `yaml:"grpc"` // grpc config
1928

2029
WALFile string `yaml:"wal_file" required:"true" default:"core.wal"` // WAL file path
2130
WALOpenTimeout time.Duration `yaml:"wal_open_timeout" required:"true" default:"8s"` // timeout for opening a WAL file

0 commit comments

Comments
 (0)