@@ -27,15 +27,15 @@ import (
27
27
"github.com/aosedge/aos_common/aoserrors"
28
28
"github.com/aosedge/aos_common/aostypes"
29
29
"github.com/aosedge/aos_common/api/cloudprotocol"
30
+ "github.com/aosedge/aos_common/api/iamanager"
30
31
pb "github.com/aosedge/aos_common/api/servicemanager"
31
32
"github.com/aosedge/aos_common/utils/cryptutils"
33
+ "github.com/aosedge/aos_common/utils/grpchelpers"
32
34
"github.com/aosedge/aos_common/utils/pbconvert"
33
35
"github.com/golang/protobuf/ptypes/timestamp"
34
36
log "github.com/sirupsen/logrus"
35
37
"google.golang.org/grpc"
36
38
"google.golang.org/grpc/codes"
37
- "google.golang.org/grpc/credentials"
38
- "google.golang.org/grpc/credentials/insecure"
39
39
"google.golang.org/grpc/status"
40
40
"google.golang.org/protobuf/types/known/timestamppb"
41
41
@@ -77,6 +77,7 @@ type SMClient struct {
77
77
alertChannel <- chan interface {}
78
78
monitoringChannel <- chan aostypes.NodeMonitoring
79
79
logsChannel <- chan cloudprotocol.PushLog
80
+ tlsCertChan <- chan * iamanager.CertInfo
80
81
runStatus * launcher.InstancesStatus
81
82
}
82
83
@@ -87,7 +88,8 @@ type NodeInfoProvider interface {
87
88
88
89
// CertificateProvider interface to get certificate.
89
90
type CertificateProvider interface {
90
- GetCertificate (certType string ) (certURL , ketURL string , err error )
91
+ GetCertificate (certType string , issuer []byte , serial string ) (certURL , ketURL string , err error )
92
+ SubscribeCertChanged (certType string ) (<- chan * iamanager.CertInfo , error )
91
93
}
92
94
93
95
// NodeConfigProcessor node configuration handler.
@@ -160,10 +162,7 @@ func New(config *config.Config, nodeInfoProvider NodeInfoProvider, certificatePr
160
162
layersProcessor : layersProcessor , launcher : launcher , nodeConfigProcessor : nodeConfigProcessor ,
161
163
monitoringProvider : monitoringProvider , logsProvider : logsProvider , networkManager : networkManager ,
162
164
closeChannel : make (chan struct {}, 1 ),
163
- }
164
-
165
- if err := cmClient .createConnection (config , certificateProvider , cryptcoxontext , insecure ); err != nil {
166
- return nil , aoserrors .Wrap (err )
165
+ tlsCertChan : make (<- chan * iamanager.CertInfo ),
167
166
}
168
167
169
168
nodeInfo , err := nodeInfoProvider .GetCurrentNodeInfo ()
@@ -174,6 +173,18 @@ func New(config *config.Config, nodeInfoProvider NodeInfoProvider, certificatePr
174
173
cmClient .nodeID = nodeInfo .NodeID
175
174
cmClient .nodeType = nodeInfo .NodeType
176
175
176
+ if err := cmClient .createConnection (config , certificateProvider , cryptcoxontext , insecure ); err != nil {
177
+ return nil , aoserrors .Wrap (err )
178
+ }
179
+
180
+ if ! insecure {
181
+ if cmClient .tlsCertChan , err = certificateProvider .SubscribeCertChanged (config .CertStorage ); err != nil {
182
+ return nil , aoserrors .Wrap (err )
183
+ }
184
+
185
+ go cmClient .processTlsCertChanged ()
186
+ }
187
+
177
188
if cmClient .launcher != nil {
178
189
cmClient .runtimeStatusChannel = launcher .RuntimeStatusChannel ()
179
190
}
@@ -197,21 +208,11 @@ func New(config *config.Config, nodeInfoProvider NodeInfoProvider, certificatePr
197
208
func (client * SMClient ) Close () (err error ) {
198
209
log .Debug ("Close SM client" )
199
210
200
- if client .stream != nil {
201
- err = client .stream .CloseSend ()
202
- }
203
-
204
- if client .connection != nil {
205
- errCloseConn := client .connection .Close ()
206
-
207
- if err != nil {
208
- err = errCloseConn
209
- }
210
- }
211
-
212
211
close (client .closeChannel )
213
212
214
- return aoserrors .Wrap (err )
213
+ client .closeGRPCConnection ()
214
+
215
+ return nil
215
216
}
216
217
217
218
/***********************************************************************************************************************
@@ -222,72 +223,101 @@ func (client *SMClient) createConnection(
222
223
config * config.Config , provider CertificateProvider ,
223
224
cryptcoxontext * cryptutils.CryptoContext , insecureConn bool ,
224
225
) (err error ) {
225
- log .Debug ("Connecting to CM..." )
226
-
227
- var secureOpt grpc.DialOption
228
-
229
- if insecureConn {
230
- secureOpt = grpc .WithTransportCredentials (insecure .NewCredentials ())
231
- } else {
232
- certURL , keyURL , err := provider .GetCertificate (config .CertStorage )
233
- if err != nil {
234
- return aoserrors .Wrap (err )
235
- }
236
-
237
- tlsConfig , err := cryptcoxontext .GetClientMutualTLSConfig (certURL , keyURL )
238
- if err != nil {
239
- return aoserrors .Wrap (err )
240
- }
241
-
242
- secureOpt = grpc .WithTransportCredentials (credentials .NewTLS (tlsConfig ))
226
+ if err := client .register (config , provider , cryptcoxontext , insecureConn ); err != nil {
227
+ return err
243
228
}
244
229
245
- ctx , cancel := context .WithTimeout (context .Background (), cmRequestTimeout )
246
- defer cancel ()
230
+ go func () {
231
+ for {
232
+ if err = client .processMessages (); err != nil {
233
+ if errors .Is (err , io .EOF ) {
234
+ log .Debug ("Connection is closed" )
235
+ } else {
236
+ log .Errorf ("Connection error: %v" , aoserrors .Wrap (err ))
237
+ }
238
+ }
247
239
248
- if client .connection , err = grpc .DialContext (ctx , config .CMServerURL , secureOpt , grpc .WithBlock ()); err != nil {
249
- return aoserrors .Wrap (err )
250
- }
240
+ log .Debugf ("Reconnect to CM in %v..." , cmReconnectTimeout )
251
241
252
- log . Debug ( "Connected to CM" )
242
+ client . closeGRPCConnection ( )
253
243
254
- go func () {
255
- err := client .register ()
244
+ reconnectionLoop:
245
+ for {
246
+ select {
247
+ case <- client .closeChannel :
248
+ log .Debug ("Disconnected from CM" )
256
249
257
- for {
258
- if err != nil && len (client .closeChannel ) == 0 {
259
- log .Errorf ("Error register to CM: %v" , aoserrors .Wrap (err ))
260
- } else {
261
- if err = client .processMessages (); err != nil {
262
- if errors .Is (err , io .EOF ) {
263
- log .Debug ("Connection is closed" )
250
+ return
251
+
252
+ case <- time .After (cmReconnectTimeout ):
253
+ if err := client .register (config , provider , cryptcoxontext , insecureConn ); err != nil {
254
+ log .WithField ("err" , err ).Debug ("Reconnection failed" )
264
255
} else {
265
- log . Errorf ( "Connection error: %v" , aoserrors . Wrap ( err ))
256
+ break reconnectionLoop
266
257
}
267
258
}
268
259
}
260
+ }
261
+ }()
269
262
270
- log .Debugf ("Reconnect to CM in %v..." , cmReconnectTimeout )
263
+ return nil
264
+ }
271
265
272
- select {
273
- case <- client .closeChannel :
274
- log .Debugf ("Disconnected from CM" )
266
+ func (client * SMClient ) processTlsCertChanged () {
267
+ for {
268
+ select {
269
+ case <- client .tlsCertChan :
270
+ log .Debug ("TLS certificate changed" )
275
271
276
- return
272
+ client . closeGRPCConnection ()
277
273
278
- case <- time .After (cmReconnectTimeout ):
279
- err = client .register ()
280
- }
274
+ case <- client .closeChannel :
275
+ return
281
276
}
282
- }()
277
+ }
278
+ }
279
+
280
+ func (client * SMClient ) openGRPCConnection (config * config.Config , provider CertificateProvider ,
281
+ cryptcoxontext * cryptutils.CryptoContext , insecureConn bool ,
282
+ ) (err error ) {
283
+ log .Debug ("Connecting to CM..." )
284
+
285
+ if client .connection , err = grpchelpers .CreateProtectedConnection (
286
+ config .CertStorage , config .CMServerURL , cmRequestTimeout , cryptcoxontext , provider , insecureConn ); err != nil {
287
+ return aoserrors .Wrap (err )
288
+ }
283
289
284
290
return nil
285
291
}
286
292
287
- func (client * SMClient ) register () (err error ) {
293
+ func (client * SMClient ) closeGRPCConnection () {
294
+ client .Lock ()
295
+ defer client .Unlock ()
296
+
297
+ log .Debug ("Closing CM connection..." )
298
+
299
+ if client .stream != nil {
300
+ if err := client .stream .CloseSend (); err != nil {
301
+ log .WithField ("err" , err ).Error ("SMClient failed send close" )
302
+ }
303
+ }
304
+
305
+ if client .connection != nil {
306
+ client .connection .Close ()
307
+ client .connection = nil
308
+ }
309
+ }
310
+
311
+ func (client * SMClient ) register (config * config.Config , provider CertificateProvider ,
312
+ cryptcoxontext * cryptutils.CryptoContext , insecureConn bool ,
313
+ ) (err error ) {
288
314
client .Lock ()
289
315
defer client .Unlock ()
290
316
317
+ if err := client .openGRPCConnection (config , provider , cryptcoxontext , insecureConn ); err != nil {
318
+ return err
319
+ }
320
+
291
321
log .Debug ("Registering to CM..." )
292
322
293
323
if client .stream , err = pb .NewSMServiceClient (client .connection ).RegisterSM (context .Background ()); err != nil {
@@ -332,7 +362,7 @@ func (client *SMClient) processMessages() (err error) {
332
362
if err != nil {
333
363
if code , ok := status .FromError (err ); ok {
334
364
if code .Code () == codes .Canceled {
335
- log .Debug ("SM client connection closed" )
365
+ log .Debug ("CM client connection closed" )
336
366
return nil
337
367
}
338
368
}
@@ -380,7 +410,7 @@ func (client *SMClient) processMessages() (err error) {
380
410
func (client * SMClient ) processGetNodeConfigStatus () {
381
411
version , err := client .nodeConfigProcessor .GetNodeConfigStatus ()
382
412
383
- status := & pb.NodeConfigStatus {Version : version }
413
+ status := & pb.NodeConfigStatus {Version : version , NodeId : client . nodeID , NodeType : client . nodeType }
384
414
385
415
if err != nil {
386
416
status .Error = pbconvert .ErrorInfoToPB (& cloudprotocol.ErrorInfo {Message : err .Error ()})
0 commit comments