Skip to content

Commit 7a24b21

Browse files
[iamclient] Lock public & protected grpc calls if connection lost
Signed-off-by: Mykola Kobets <mykola_kobets@epam.com>
1 parent f0b4cac commit 7a24b21

File tree

2 files changed

+134
-36
lines changed

2 files changed

+134
-36
lines changed

iamclient/iamcertprovider.go

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
//
3+
// Copyright (C) 2025 EPAM Systems, Inc.
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package iamclient
18+
19+
import (
20+
"context"
21+
22+
"github.com/aosedge/aos_common/aoserrors"
23+
pb "github.com/aosedge/aos_common/api/iamanager"
24+
log "github.com/sirupsen/logrus"
25+
"google.golang.org/grpc"
26+
)
27+
28+
/***********************************************************************************************************************
29+
* Consts
30+
**********************************************************************************************************************/
31+
32+
/***********************************************************************************************************************
33+
* Types
34+
**********************************************************************************************************************/
35+
36+
// IAM certificate provider.
37+
type IAMCertProvider struct {
38+
connection *grpc.ClientConn
39+
}
40+
41+
/***********************************************************************************************************************
42+
* Public
43+
**********************************************************************************************************************/
44+
45+
// New creates new certificate provider instance.
46+
func NewCertProvider(connection *grpc.ClientConn) *IAMCertProvider {
47+
return &IAMCertProvider{connection: connection}
48+
}
49+
50+
// GetCertificate gets certificate by issuer.
51+
func (provider *IAMCertProvider) GetCertificate(
52+
certType string, issuer []byte, serial string,
53+
) (certURL, keyURL string, err error) {
54+
log.Debug("Get IAM certificate")
55+
56+
ctx, cancel := context.WithTimeout(context.Background(), iamRequestTimeout)
57+
defer cancel()
58+
59+
publicService := pb.NewIAMPublicServiceClient(provider.connection)
60+
response, err := publicService.GetCert(
61+
ctx, &pb.GetCertRequest{Type: certType, Issuer: issuer, Serial: serial})
62+
if err != nil {
63+
return "", "", aoserrors.Wrap(err)
64+
}
65+
66+
return response.GetCertUrl(), response.GetKeyUrl(), nil
67+
}

iamclient/iamclient.go

+67-36
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ type Client struct {
7474
isMainNode bool
7575

7676
publicConnection *grpchelpers.GRPCConn
77-
protectedConnection *grpc.ClientConn
77+
protectedConnection *grpchelpers.GRPCConn
7878

7979
publicService pb.IAMPublicServiceClient
8080
identService pb.IAMPublicIdentityServiceClient
@@ -133,13 +133,14 @@ func New(
133133
cryptocontext *cryptutils.CryptoContext, insecure bool,
134134
) (client *Client, err error) {
135135
localClient := &Client{
136-
publicURL: publicURL,
137-
protectedURL: protectedURL,
138-
certStorage: certStorage,
139-
sender: sender,
140-
cryptocontext: cryptocontext,
141-
insecure: insecure,
142-
publicConnection: grpchelpers.NewGRPCConn(),
136+
publicURL: publicURL,
137+
protectedURL: protectedURL,
138+
certStorage: certStorage,
139+
sender: sender,
140+
cryptocontext: cryptocontext,
141+
insecure: insecure,
142+
publicConnection: grpchelpers.NewGRPCConn(),
143+
protectedConnection: grpchelpers.NewGRPCConn(),
143144
nodeInfoSubs: &nodeInfoChangeSub{
144145
listeners: make([]chan cloudprotocol.NodeInfo, 0),
145146
},
@@ -613,7 +614,6 @@ func (client *Client) Close() error {
613614
client.isReconnecting.Store(true)
614615

615616
client.closeGRPCConnection()
616-
client.publicConnection.Close()
617617

618618
log.Debug("Disconnected from IAM")
619619

@@ -657,7 +657,8 @@ func (client *Client) SubscribeCertChanged(certType string) (<-chan *pb.CertInfo
657657
ch := make(chan *pb.CertInfo, 1)
658658

659659
if _, ok := client.certChangeSub[certType]; !ok {
660-
grpcStream, err := client.subscribeCertChange(certType)
660+
grpcStream, err := client.publicService.SubscribeCertChanged(
661+
context.Background(), &pb.SubscribeCertChangedRequest{Type: certType})
661662
if err != nil {
662663
return nil, aoserrors.Wrap(err)
663664
}
@@ -839,38 +840,55 @@ func (client *Client) openGRPCConnection() (err error) {
839840
log.Debug("Connecting to IAM...")
840841

841842
var publicConn *grpc.ClientConn
843+
var protectedConn *grpc.ClientConn
842844

843845
publicConn, err = grpchelpers.CreatePublicConnection(
844846
client.publicURL, client.cryptocontext, client.insecure)
845847
if err != nil {
846848
return aoserrors.Wrap(err)
847849
}
848850

849-
if err := client.publicConnection.Start(publicConn); err != nil {
850-
return aoserrors.Wrap(err)
851-
}
851+
client.publicConnection.Set(publicConn)
852+
853+
defer func() {
854+
if err == nil {
855+
client.publicConnection.Start()
856+
} else {
857+
publicConn.Close()
858+
}
859+
}()
852860

853861
client.publicService = pb.NewIAMPublicServiceClient(client.publicConnection)
854862
client.identService = pb.NewIAMPublicIdentityServiceClient(client.publicConnection)
855863
client.publicNodesService = pb.NewIAMPublicNodesServiceClient(client.publicConnection)
856864
client.publicPermissionsService = pb.NewIAMPublicPermissionsServiceClient(client.publicConnection)
857865

858-
if err = client.restoreCertInfoSubs(); err != nil {
859-
log.Error("Failed subscribe on CertInfo change")
860-
861-
return aoserrors.Wrap(err)
866+
if err = client.restorePublicSubs(); err != nil {
867+
return err
862868
}
863869

864870
if !client.isProtectedConnEnabled() {
865871
return nil
866872
}
867873

868-
client.protectedConnection, err = grpchelpers.CreateProtectedConnection(client.certStorage,
869-
client.protectedURL, client.cryptocontext, client, client.insecure)
874+
certProvider := NewCertProvider(publicConn)
875+
876+
protectedConn, err = grpchelpers.CreateProtectedConnection(client.certStorage,
877+
client.protectedURL, client.cryptocontext, certProvider, client.insecure)
870878
if err != nil {
871879
return aoserrors.Wrap(err)
872880
}
873881

882+
client.protectedConnection.Set(protectedConn)
883+
884+
defer func() {
885+
if err == nil {
886+
client.protectedConnection.Start()
887+
} else {
888+
protectedConn.Close()
889+
}
890+
}()
891+
874892
client.certificateService = pb.NewIAMCertificateServiceClient(client.protectedConnection)
875893
client.provisioningService = pb.NewIAMProvisioningServiceClient(client.protectedConnection)
876894
client.nodesService = pb.NewIAMNodesServiceClient(client.protectedConnection)
@@ -889,7 +907,7 @@ func (client *Client) closeGRPCConnection() {
889907
}
890908

891909
if client.protectedConnection != nil {
892-
client.protectedConnection.Close()
910+
client.protectedConnection.Stop()
893911
}
894912

895913
for _, sub := range client.certChangeSub {
@@ -944,20 +962,6 @@ func (client *Client) subscribeUnitSubjectsChange() error {
944962
return nil
945963
}
946964

947-
func (client *Client) subscribeCertChange(certType string) (
948-
listener pb.IAMPublicService_SubscribeCertChangedClient, err error,
949-
) {
950-
listener, err = client.publicService.SubscribeCertChanged(context.Background(),
951-
&pb.SubscribeCertChangedRequest{Type: certType})
952-
if err != nil {
953-
log.WithField("error", err).Error("Can't subscribe on CertChange event")
954-
955-
return nil, aoserrors.Wrap(err)
956-
}
957-
958-
return listener, aoserrors.Wrap(err)
959-
}
960-
961965
func (client *Client) processNodeInfoChange(sub *nodeInfoChangeSub) {
962966
defer sub.stopWG.Done()
963967

@@ -1182,9 +1186,10 @@ func (client *Client) finishProvisioning(nodeID, password string) (errorInfo *cl
11821186

11831187
func (client *Client) restoreCertInfoSubs() error {
11841188
for certType, sub := range client.certChangeSub {
1185-
grpcStream, err := client.subscribeCertChange(certType)
1189+
grpcStream, err := client.publicService.SubscribeCertChanged(
1190+
context.Background(), &pb.SubscribeCertChangedRequest{Type: certType})
11861191
if err != nil {
1187-
return err
1192+
return aoserrors.Wrap(err)
11881193
}
11891194

11901195
sub.grpcStream = &grpcStream
@@ -1197,6 +1202,32 @@ func (client *Client) restoreCertInfoSubs() error {
11971202
return nil
11981203
}
11991204

1205+
func (client *Client) restorePublicSubs() error {
1206+
var err error
1207+
1208+
if err = client.restoreCertInfoSubs(); err != nil {
1209+
log.Error("Failed subscribe on CertInfo change")
1210+
1211+
return aoserrors.Wrap(err)
1212+
}
1213+
1214+
if client.isMainNode {
1215+
if err = client.subscribeNodeInfoChange(); err != nil {
1216+
log.Error("Failed subscribe on NodeInfo change")
1217+
1218+
return aoserrors.Wrap(err)
1219+
}
1220+
1221+
if err = client.subscribeUnitSubjectsChange(); err != nil {
1222+
log.Error("Failed subscribe on UnitSubject change")
1223+
1224+
return aoserrors.Wrap(err)
1225+
}
1226+
}
1227+
1228+
return nil
1229+
}
1230+
12001231
func (client *Client) onConnectionLost() {
12011232
select {
12021233
case <-client.closeChannel:

0 commit comments

Comments
 (0)