Skip to content

Commit d145b4c

Browse files
authored
optimize engine cache checker (#539)
1 parent 0f54c24 commit d145b4c

File tree

7 files changed

+172
-79
lines changed

7 files changed

+172
-79
lines changed

core.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ func serve(c *cli.Context) error {
7272
return err
7373
}
7474

75+
factory.InitEngineCache(c.Context, config)
76+
7577
var t *testing.T
7678
if embeddedStorage {
7779
t = &testing.T{}
@@ -133,9 +135,6 @@ func serve(c *cli.Context) error {
133135
ctx, cancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
134136
defer cancel()
135137

136-
// start engine cache checker
137-
go factory.EngineCacheChecker(ctx, config.ConnectionTimeout)
138-
139138
<-ctx.Done()
140139

141140
log.Info("[main] Interrupt by signal")

engine/factory/factory.go

+121-39
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/projecteru2/core/engine"
1111
"github.com/projecteru2/core/engine/docker"
12+
"github.com/projecteru2/core/engine/fake"
1213
"github.com/projecteru2/core/engine/mocks/fakeengine"
1314
"github.com/projecteru2/core/engine/systemd"
1415
"github.com/projecteru2/core/engine/virt"
@@ -28,44 +29,109 @@ var (
2829
systemd.TCPPrefix: systemd.MakeClient,
2930
fakeengine.PrefixKey: fakeengine.MakeClient,
3031
}
31-
engineCache = utils.NewEngineCache(12*time.Hour, 10*time.Minute)
32-
keysToCheck = sync.Map{}
32+
engineCache *EngineCache
3333
)
3434

3535
func getEngineCacheKey(endpoint, ca, cert, key string) string {
3636
return endpoint + "-" + utils.SHA256(fmt.Sprintf(":%v:%v:%v", ca, cert, key))[:8]
3737
}
3838

39-
// EngineCacheChecker checks if the engine in cache is available
40-
func EngineCacheChecker(ctx context.Context, timeout time.Duration) {
41-
log.Info("[EngineCacheChecker] starts")
42-
defer log.Info("[EngineCacheChecker] ends")
39+
type engineParams struct {
40+
endpoint string
41+
ca string
42+
cert string
43+
key string
44+
}
45+
46+
func (ep engineParams) getCacheKey() string {
47+
return getEngineCacheKey(ep.endpoint, ep.ca, ep.cert, ep.key)
48+
}
49+
50+
type EngineCache struct {
51+
cache *utils.EngineCache
52+
keysToCheck sync.Map
53+
config types.Config
54+
}
55+
56+
// NewEngineCache .
57+
func NewEngineCache(config types.Config) *EngineCache {
58+
return &EngineCache{
59+
cache: utils.NewEngineCache(12*time.Hour, 10*time.Minute),
60+
keysToCheck: sync.Map{},
61+
config: config,
62+
}
63+
}
64+
65+
// InitEngineCache init engine cache and start engine cache checker
66+
func InitEngineCache(ctx context.Context, config types.Config) {
67+
engineCache = NewEngineCache(config)
68+
go engineCache.CheckAlive(ctx)
69+
}
70+
71+
// Get .
72+
func (e *EngineCache) Get(key string) engine.API {
73+
return e.cache.Get(key)
74+
}
75+
76+
// Set .
77+
func (e *EngineCache) Set(params engineParams, client engine.API) {
78+
e.cache.Set(params.getCacheKey(), client)
79+
e.keysToCheck.Store(params, struct{}{})
80+
}
81+
82+
// Delete .
83+
func (e *EngineCache) Delete(key string) {
84+
e.cache.Delete(key)
85+
}
86+
87+
// CheckAlive checks if the engine in cache is available
88+
func (e *EngineCache) CheckAlive(ctx context.Context) {
89+
log.Info("[EngineCache] starts")
90+
defer log.Info("[EngineCache] ends")
4391
for {
4492
select {
4593
case <-ctx.Done():
4694
return
4795
default:
4896
}
4997

50-
keysToRemove := []string{}
51-
keysToCheck.Range(func(key, _ interface{}) bool {
52-
cacheKey := key.(string)
53-
client := engineCache.Get(cacheKey)
54-
if client == nil {
55-
keysToRemove = append(keysToRemove, cacheKey)
98+
paramsChan := make(chan engineParams)
99+
go func() {
100+
e.keysToCheck.Range(func(key, _ interface{}) bool {
101+
paramsChan <- key.(engineParams)
56102
return true
57-
}
58-
if err := validateEngine(ctx, client, timeout); err != nil {
59-
log.Errorf(ctx, "[GetEngineFromCache] engine %v is unavailable, will be removed from cache, err: %v", cacheKey, err)
60-
keysToRemove = append(keysToRemove, cacheKey)
61-
}
62-
return true
63-
})
64-
for _, key := range keysToRemove {
65-
engineCache.Delete(key)
66-
keysToCheck.Delete(key)
103+
})
104+
close(paramsChan)
105+
}()
106+
107+
pool := utils.NewGoroutinePool(int(e.config.MaxConcurrency))
108+
for params := range paramsChan {
109+
params := params
110+
pool.Go(ctx, func() {
111+
cacheKey := params.getCacheKey()
112+
client := e.cache.Get(cacheKey)
113+
if client == nil {
114+
e.cache.Delete(params.getCacheKey())
115+
e.keysToCheck.Delete(params)
116+
return
117+
}
118+
if _, ok := client.(*fake.Engine); ok {
119+
if newClient, err := newEngine(ctx, e.config, utils.RandomString(8), params.endpoint, params.ca, params.key, params.cert); err != nil {
120+
log.Errorf(ctx, "[EngineCache] engine %v is still unavailable, err: %v", cacheKey, err)
121+
} else {
122+
e.cache.Set(cacheKey, newClient)
123+
}
124+
return
125+
}
126+
if err := validateEngine(ctx, client, e.config.ConnectionTimeout); err != nil {
127+
log.Errorf(ctx, "[EngineCache] engine %v is unavailable, will be replaced with a fake engine, err: %v", cacheKey, err)
128+
e.cache.Set(cacheKey, &fake.Engine{DefaultErr: err})
129+
}
130+
})
67131
}
68-
time.Sleep(timeout)
132+
133+
pool.Wait(ctx)
134+
time.Sleep(e.config.ConnectionTimeout)
69135
}
70136
}
71137

@@ -88,21 +154,8 @@ func RemoveEngineFromCache(endpoint, ca, cert, key string) {
88154
engineCache.Delete(cacheKey)
89155
}
90156

91-
// GetEngine get engine
92-
func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) {
93-
if client = GetEngineFromCache(endpoint, ca, cert, key); client != nil {
94-
return client, nil
95-
}
96-
97-
defer func() {
98-
if err == nil && client != nil {
99-
cacheKey := getEngineCacheKey(endpoint, ca, cert, key)
100-
engineCache.Set(cacheKey, client)
101-
keysToCheck.Store(cacheKey, struct{}{})
102-
log.Infof(ctx, "[GetEngine] store engine %v in cache", cacheKey)
103-
}
104-
}()
105-
157+
// newEngine get engine
158+
func newEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) {
106159
prefix, err := getEnginePrefix(endpoint)
107160
if err != nil {
108161
return nil, err
@@ -111,7 +164,10 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,
111164
if !ok {
112165
return nil, types.ErrNotSupport
113166
}
114-
if client, err = e(ctx, config, nodename, endpoint, ca, cert, key); err != nil {
167+
utils.WithTimeout(ctx, config.ConnectionTimeout, func(ctx context.Context) {
168+
client, err = e(ctx, config, nodename, endpoint, ca, cert, key)
169+
})
170+
if err != nil {
115171
return nil, err
116172
}
117173
if err = validateEngine(ctx, client, config.ConnectionTimeout); err != nil {
@@ -121,6 +177,32 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,
121177
return client, nil
122178
}
123179

180+
// GetEngine get engine with cache
181+
func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) {
182+
if client = GetEngineFromCache(endpoint, ca, cert, key); client != nil {
183+
return client, nil
184+
}
185+
186+
defer func() {
187+
params := engineParams{
188+
endpoint: endpoint,
189+
ca: ca,
190+
cert: cert,
191+
key: key,
192+
}
193+
cacheKey := params.getCacheKey()
194+
if err == nil {
195+
engineCache.Set(params, client)
196+
log.Infof(ctx, "[GetEngine] store engine %v in cache", cacheKey)
197+
} else {
198+
engineCache.Set(params, &fake.Engine{DefaultErr: err})
199+
log.Infof(ctx, "[GetEngine] store fake engine %v in cache", cacheKey)
200+
}
201+
}()
202+
203+
return newEngine(ctx, config, nodename, endpoint, ca, cert, key)
204+
}
205+
124206
func getEnginePrefix(endpoint string) (string, error) {
125207
for prefix := range engines {
126208
if strings.HasPrefix(endpoint, prefix) {

0 commit comments

Comments
 (0)