Skip to content

Commit dee53e2

Browse files
anrsanrs
and
anrs
authored
refactor: refining those ETCD primitives, so we could depends on them from exterior pkgs. (#316)
Co-authored-by: anrs <anders.hu@shopee.com>
1 parent c69ed71 commit dee53e2

10 files changed

+489
-379
lines changed

store/etcdv3/mercury.go

+5-261
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,11 @@
11
package etcdv3
22

33
import (
4-
"context"
5-
"crypto/tls"
6-
"fmt"
7-
"sync"
84
"time"
95

10-
"go.etcd.io/etcd/pkg/transport"
11-
12-
"github.com/projecteru2/core/log"
13-
14-
"github.com/projecteru2/core/lock"
15-
"github.com/projecteru2/core/lock/etcdlock"
16-
"github.com/projecteru2/core/store/etcdv3/embedded"
6+
"github.com/projecteru2/core/store/etcdv3/meta"
177
"github.com/projecteru2/core/types"
188
"github.com/projecteru2/core/utils"
19-
"go.etcd.io/etcd/clientv3"
20-
"go.etcd.io/etcd/clientv3/namespace"
21-
"go.etcd.io/etcd/mvcc/mvccpb"
229
)
2310

2411
const (
@@ -36,262 +23,19 @@ const (
3623
workloadDeployPrefix = "/deploy" // /deploy/{appname}/{entrypoint}/{nodename}/{workloadID}
3724
workloadStatusPrefix = "/status" // /status/{appname}/{entrypoint}/{nodename}/{workloadID} value -> something by agent
3825
workloadProcessingPrefix = "/processing" // /processing/{appname}/{entrypoint}/{nodename}/{opsIdent} value -> count
39-
40-
cmpVersion = "version"
41-
cmpValue = "value"
4226
)
4327

4428
// Mercury means store with etcdv3
4529
type Mercury struct {
46-
cliv3 *clientv3.Client
30+
meta.KV
4731
config types.Config
4832
}
4933

5034
// New for create a Mercury instance
51-
func New(config types.Config, embeddedStorage bool) (*Mercury, error) {
52-
var cliv3 *clientv3.Client
53-
var err error
54-
var tlsConfig *tls.Config
55-
56-
switch {
57-
case embeddedStorage:
58-
cliv3 = embedded.NewCluster()
59-
log.Info("[Mercury] use embedded cluster")
60-
default:
61-
if config.Etcd.Ca != "" && config.Etcd.Key != "" && config.Etcd.Cert != "" {
62-
tlsInfo := transport.TLSInfo{
63-
TrustedCAFile: config.Etcd.Ca,
64-
KeyFile: config.Etcd.Key,
65-
CertFile: config.Etcd.Cert,
66-
}
67-
tlsConfig, err = tlsInfo.ClientConfig()
68-
if err != nil {
69-
return nil, err
70-
}
71-
}
72-
if cliv3, err = clientv3.New(clientv3.Config{
73-
Endpoints: config.Etcd.Machines,
74-
Username: config.Etcd.Auth.Username,
75-
Password: config.Etcd.Auth.Password,
76-
TLS: tlsConfig,
77-
}); err != nil {
78-
return nil, err
79-
}
80-
}
81-
cliv3.KV = namespace.NewKV(cliv3.KV, config.Etcd.Prefix)
82-
cliv3.Watcher = namespace.NewWatcher(cliv3.Watcher, config.Etcd.Prefix)
83-
cliv3.Lease = namespace.NewLease(cliv3.Lease, config.Etcd.Prefix)
84-
return &Mercury{cliv3: cliv3, config: config}, nil
85-
}
86-
87-
// TerminateEmbededStorage terminate embedded storage
88-
func (m *Mercury) TerminateEmbededStorage() {
89-
embedded.TerminateCluster()
90-
}
91-
92-
// CreateLock create a lock instance
93-
func (m *Mercury) CreateLock(key string, ttl time.Duration) (lock.DistributedLock, error) {
94-
lockKey := fmt.Sprintf("%s/%s", m.config.Etcd.LockPrefix, key)
95-
mutex, err := etcdlock.New(m.cliv3, lockKey, ttl)
96-
return mutex, err
97-
}
98-
99-
// Get get results or noting
100-
func (m *Mercury) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
101-
return m.cliv3.Get(ctx, key, opts...)
102-
}
103-
104-
// GetOne get one result or noting
105-
func (m *Mercury) GetOne(ctx context.Context, key string, opts ...clientv3.OpOption) (*mvccpb.KeyValue, error) {
106-
resp, err := m.Get(ctx, key, opts...)
107-
if err != nil {
108-
return nil, err
109-
}
110-
if resp.Count != 1 {
111-
return nil, types.NewDetailedErr(types.ErrBadCount, fmt.Sprintf("key: %s", key))
112-
}
113-
return resp.Kvs[0], nil
114-
}
115-
116-
// GetMulti gets several results
117-
func (m *Mercury) GetMulti(ctx context.Context, keys []string, opts ...clientv3.OpOption) (kvs []*mvccpb.KeyValue, err error) {
118-
var txnResponse *clientv3.TxnResponse
119-
if len(keys) == 0 {
120-
return
121-
}
122-
if txnResponse, err = m.batchGet(ctx, keys); err != nil {
123-
return
124-
}
125-
for idx, responseOp := range txnResponse.Responses {
126-
resp := responseOp.GetResponseRange()
127-
if resp.Count != 1 {
128-
return nil, types.NewDetailedErr(types.ErrBadCount, fmt.Sprintf("key: %s", keys[idx]))
129-
}
130-
kvs = append(kvs, resp.Kvs[0])
131-
}
132-
if len(kvs) != len(keys) {
133-
err = types.NewDetailedErr(types.ErrBadCount, fmt.Sprintf("keys: %v", keys))
134-
}
35+
func New(config types.Config, embeddedStorage bool) (m *Mercury, err error) {
36+
m = &Mercury{config: config}
37+
m.KV, err = meta.NewETCD(config.Etcd, embeddedStorage)
13538
return
13639
}
13740

138-
// Delete delete key
139-
func (m *Mercury) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
140-
return m.cliv3.Delete(ctx, key, opts...)
141-
}
142-
143-
// Put save a key value
144-
func (m *Mercury) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
145-
return m.cliv3.Put(ctx, key, val, opts...)
146-
}
147-
148-
// Create create a key if not exists
149-
func (m *Mercury) Create(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
150-
return m.batchCreate(ctx, map[string]string{key: val}, opts...)
151-
}
152-
153-
// Update update a key if exists
154-
func (m *Mercury) Update(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
155-
return m.batchUpdate(ctx, map[string]string{key: val}, opts...)
156-
}
157-
158-
// Watch wath a key
159-
func (m *Mercury) watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
160-
return m.cliv3.Watch(ctx, key, opts...)
161-
}
162-
163-
func (m *Mercury) batchGet(ctx context.Context, keys []string, opt ...clientv3.OpOption) (txnResponse *clientv3.TxnResponse, err error) {
164-
ops := []clientv3.Op{}
165-
for _, key := range keys {
166-
op := clientv3.OpGet(key, opt...)
167-
ops = append(ops, op)
168-
}
169-
return m.doBatchOp(ctx, nil, ops, nil)
170-
}
171-
172-
func (m *Mercury) batchDelete(ctx context.Context, keys []string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
173-
ops := []clientv3.Op{}
174-
for _, key := range keys {
175-
op := clientv3.OpDelete(key, opts...)
176-
ops = append(ops, op)
177-
}
178-
179-
return m.doBatchOp(ctx, nil, ops, nil)
180-
}
181-
182-
func (m *Mercury) batchPut(ctx context.Context, data map[string]string, limit map[string]map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
183-
ops := []clientv3.Op{}
184-
failOps := []clientv3.Op{}
185-
conds := []clientv3.Cmp{}
186-
for key, val := range data {
187-
op := clientv3.OpPut(key, val, opts...)
188-
ops = append(ops, op)
189-
if v, ok := limit[key]; ok {
190-
for method, condition := range v {
191-
switch method {
192-
case cmpVersion:
193-
cond := clientv3.Compare(clientv3.Version(key), condition, 0)
194-
conds = append(conds, cond)
195-
case cmpValue:
196-
cond := clientv3.Compare(clientv3.Value(key), condition, val)
197-
failOps = append(failOps, clientv3.OpGet(key))
198-
conds = append(conds, cond)
199-
}
200-
}
201-
}
202-
}
203-
return m.doBatchOp(ctx, conds, ops, failOps)
204-
}
205-
206-
func (m *Mercury) batchCreate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
207-
limit := map[string]map[string]string{}
208-
for key := range data {
209-
limit[key] = map[string]string{cmpVersion: "="}
210-
}
211-
resp, err := m.batchPut(ctx, data, limit, opts...)
212-
if err != nil {
213-
return resp, err
214-
}
215-
if !resp.Succeeded {
216-
return resp, types.ErrKeyExists
217-
}
218-
return resp, nil
219-
}
220-
221-
func (m *Mercury) batchUpdate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
222-
limit := map[string]map[string]string{}
223-
for key := range data {
224-
limit[key] = map[string]string{cmpVersion: "!=", cmpValue: "!="} // ignore same data
225-
}
226-
resp, err := m.batchPut(ctx, data, limit, opts...)
227-
if err != nil {
228-
return resp, err
229-
}
230-
if !resp.Succeeded {
231-
for _, failResp := range resp.Responses {
232-
if len(failResp.GetResponseRange().Kvs) == 0 {
233-
return resp, types.ErrKeyNotExists
234-
}
235-
}
236-
}
237-
return resp, nil
238-
}
239-
240-
func (m *Mercury) doBatchOp(ctx context.Context, conds []clientv3.Cmp, ops, failOps []clientv3.Op) (*clientv3.TxnResponse, error) {
241-
if len(ops) == 0 {
242-
return nil, types.ErrNoOps
243-
}
244-
245-
const txnLimit = 125
246-
count := len(ops) / txnLimit // stupid etcd txn, default limit is 128
247-
tail := len(ops) % txnLimit
248-
length := count
249-
if tail != 0 {
250-
length++
251-
}
252-
253-
resps := make([]*clientv3.TxnResponse, length)
254-
errs := make([]error, length)
255-
256-
wg := sync.WaitGroup{}
257-
doOp := func(index int, ops []clientv3.Op) {
258-
defer wg.Done()
259-
txn := m.cliv3.Txn(ctx)
260-
if len(conds) != 0 {
261-
txn = txn.If(conds...)
262-
}
263-
resp, err := txn.Then(ops...).Else(failOps...).Commit()
264-
resps[index] = resp
265-
errs[index] = err
266-
}
267-
268-
if tail != 0 {
269-
wg.Add(1)
270-
go doOp(length-1, ops[count*txnLimit:])
271-
}
272-
273-
for i := 0; i < count; i++ {
274-
wg.Add(1)
275-
go doOp(i, ops[i*txnLimit:(i+1)*txnLimit])
276-
}
277-
wg.Wait()
278-
279-
for _, err := range errs {
280-
if err != nil {
281-
return nil, err
282-
}
283-
}
284-
285-
if len(resps) == 0 {
286-
return &clientv3.TxnResponse{}, nil
287-
}
288-
289-
resp := resps[0]
290-
for i := 1; i < len(resps); i++ {
291-
resp.Succeeded = resp.Succeeded && resps[i].Succeeded
292-
resp.Responses = append(resp.Responses, resps[i].Responses...)
293-
}
294-
return resp, nil
295-
}
296-
29741
var _cache = utils.NewEngineCache(12*time.Hour, 10*time.Minute)

0 commit comments

Comments
 (0)