@@ -7,6 +7,11 @@ import (
7
7
"path/filepath"
8
8
"strconv"
9
9
"strings"
10
+ "sync"
11
+
12
+ "github.com/pkg/errors"
13
+ "go.etcd.io/etcd/api/v3/mvccpb"
14
+ clientv3 "go.etcd.io/etcd/client/v3"
10
15
11
16
"github.com/projecteru2/core/engine"
12
17
enginefactory "github.com/projecteru2/core/engine/factory"
@@ -15,10 +20,6 @@ import (
15
20
"github.com/projecteru2/core/store"
16
21
"github.com/projecteru2/core/types"
17
22
"github.com/projecteru2/core/utils"
18
-
19
- "github.com/pkg/errors"
20
- "go.etcd.io/etcd/api/v3/mvccpb"
21
- clientv3 "go.etcd.io/etcd/client/v3"
22
23
)
23
24
24
25
// AddNode save it to etcd
@@ -145,6 +146,11 @@ func (m *Mercury) GetNodesByPod(ctx context.Context, podname string, labels map[
145
146
// UpdateNodes .
146
147
func (m * Mercury ) UpdateNodes (ctx context.Context , nodes ... * types.Node ) error {
147
148
data := map [string ]string {}
149
+ addIfNotEmpty := func (key , value string ) {
150
+ if value != "" {
151
+ data [key ] = value
152
+ }
153
+ }
148
154
for _ , node := range nodes {
149
155
bytes , err := json .Marshal (node )
150
156
if err != nil {
@@ -153,6 +159,10 @@ func (m *Mercury) UpdateNodes(ctx context.Context, nodes ...*types.Node) error {
153
159
d := string (bytes )
154
160
data [fmt .Sprintf (nodeInfoKey , node .Name )] = d
155
161
data [fmt .Sprintf (nodePodKey , node .Podname , node .Name )] = d
162
+ addIfNotEmpty (fmt .Sprintf (nodeCaKey , node .Name ), node .Ca )
163
+ addIfNotEmpty (fmt .Sprintf (nodeCertKey , node .Name ), node .Cert )
164
+ addIfNotEmpty (fmt .Sprintf (nodeKeyKey , node .Name ), node .Key )
165
+ enginefactory .RemoveEngineFromCache (node .Endpoint , node .Ca , node .Cert , node .Key )
156
166
}
157
167
158
168
resp , err := m .BatchUpdate (ctx , data )
@@ -180,13 +190,19 @@ func (m *Mercury) UpdateNodeResource(ctx context.Context, node *types.Node, reso
180
190
}
181
191
182
192
func (m * Mercury ) makeClient (ctx context.Context , node * types.Node ) (client engine.API , err error ) {
193
+ // try to get from cache without ca/cert/key
194
+ if client = enginefactory .GetEngineFromCache (ctx , m .config , node .Endpoint , "" , "" , "" ); client != nil {
195
+ return client , nil
196
+ }
197
+
183
198
keyFormats := []string {nodeCaKey , nodeCertKey , nodeKeyKey }
184
199
data := []string {"" , "" , "" }
185
200
for i := 0 ; i < 3 ; i ++ {
186
201
ev , err := m .GetOne (ctx , fmt .Sprintf (keyFormats [i ], node .Name ))
187
202
if err != nil {
188
203
if ! errors .Is (err , types .ErrBadCount ) {
189
204
log .Warnf (ctx , "[makeClient] Get key failed %v" , err )
205
+ return nil , err
190
206
}
191
207
continue
192
208
}
@@ -282,13 +298,21 @@ func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels
282
298
return nil , err
283
299
}
284
300
node .Init ()
285
- if (! node .IsDown () || all ) && utils .FilterWorkload (node .Labels , labels ) {
286
- if node .Engine , err = m .makeClient (ctx , node ); err != nil {
287
- return
301
+ nodes = append (nodes , node )
302
+ }
303
+ wg := & sync.WaitGroup {}
304
+ wg .Add (len (nodes ))
305
+ for _ , node := range nodes {
306
+ go func (node * types.Node ) {
307
+ defer wg .Done ()
308
+ if (! node .IsDown () || all ) && utils .FilterWorkload (node .Labels , labels ) {
309
+ if node .Engine , err = m .makeClient (ctx , node ); err != nil {
310
+ return
311
+ }
288
312
}
289
- nodes = append (nodes , node )
290
- }
313
+ }(node )
291
314
}
315
+ wg .Wait ()
292
316
return nodes , nil
293
317
}
294
318
0 commit comments