Skip to content

Commit 0c0bb9d

Browse files
mykola-kobets-epamal1img
authored andcommitted
[iamclient] Lock public & protected grpc calls if connection lost
Signed-off-by: Mykola Kobets <mykola_kobets@epam.com>
1 parent c53e129 commit 0c0bb9d

File tree

2 files changed

+131
-37
lines changed

2 files changed

+131
-37
lines changed

iamclient/iamcertprovider.go

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
//
3+
// Copyright (C) 2025 EPAM Systems, Inc.
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package iamclient
18+
19+
import (
20+
"context"
21+
22+
"github.com/aosedge/aos_common/aoserrors"
23+
pb "github.com/aosedge/aos_common/api/iamanager"
24+
log "github.com/sirupsen/logrus"
25+
"google.golang.org/grpc"
26+
)
27+
28+
/***********************************************************************************************************************
29+
* Types
30+
**********************************************************************************************************************/
31+
32+
// IAM certificate provider.
33+
type IAMCertProvider struct {
34+
connection *grpc.ClientConn
35+
}
36+
37+
/***********************************************************************************************************************
38+
* Public
39+
**********************************************************************************************************************/
40+
41+
// NewCertProvider creates new certificate provider instance.
42+
func NewCertProvider(connection *grpc.ClientConn) *IAMCertProvider {
43+
return &IAMCertProvider{connection: connection}
44+
}
45+
46+
// GetCertificate gets certificate by issuer.
47+
func (provider *IAMCertProvider) GetCertificate(
48+
certType string, issuer []byte, serial string,
49+
) (certURL, keyURL string, err error) {
50+
log.Debug("Get IAM certificate")
51+
52+
ctx, cancel := context.WithTimeout(context.Background(), iamRequestTimeout)
53+
defer cancel()
54+
55+
publicService := pb.NewIAMPublicServiceClient(provider.connection)
56+
57+
response, err := publicService.GetCert(
58+
ctx, &pb.GetCertRequest{Type: certType, Issuer: issuer, Serial: serial})
59+
if err != nil {
60+
return "", "", aoserrors.Wrap(err)
61+
}
62+
63+
return response.GetCertUrl(), response.GetKeyUrl(), nil
64+
}

iamclient/iamclient.go

+67-37
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ type Client struct {
7474
isMainNode bool
7575

7676
publicConnection *grpchelpers.GRPCConn
77-
protectedConnection *grpc.ClientConn
77+
protectedConnection *grpchelpers.GRPCConn
7878

7979
publicService pb.IAMPublicServiceClient
8080
identService pb.IAMPublicIdentityServiceClient
@@ -133,13 +133,14 @@ func New(
133133
cryptocontext *cryptutils.CryptoContext, insecure bool,
134134
) (client *Client, err error) {
135135
localClient := &Client{
136-
publicURL: publicURL,
137-
protectedURL: protectedURL,
138-
certStorage: certStorage,
139-
sender: sender,
140-
cryptocontext: cryptocontext,
141-
insecure: insecure,
142-
publicConnection: grpchelpers.NewGRPCConn(),
136+
publicURL: publicURL,
137+
protectedURL: protectedURL,
138+
certStorage: certStorage,
139+
sender: sender,
140+
cryptocontext: cryptocontext,
141+
insecure: insecure,
142+
publicConnection: grpchelpers.NewGRPCConn(),
143+
protectedConnection: grpchelpers.NewGRPCConn(),
143144
nodeInfoSubs: &nodeInfoChangeSub{
144145
listeners: make([]chan cloudprotocol.NodeInfo, 0),
145146
},
@@ -613,7 +614,6 @@ func (client *Client) Close() error {
613614
client.isReconnecting.Store(true)
614615

615616
client.closeGRPCConnection()
616-
client.publicConnection.Close()
617617

618618
log.Debug("Disconnected from IAM")
619619

@@ -657,7 +657,8 @@ func (client *Client) SubscribeCertChanged(certType string) (<-chan *pb.CertInfo
657657
ch := make(chan *pb.CertInfo, 1)
658658

659659
if _, ok := client.certChangeSub[certType]; !ok {
660-
grpcStream, err := client.subscribeCertChange(certType)
660+
grpcStream, err := client.publicService.SubscribeCertChanged(
661+
context.Background(), &pb.SubscribeCertChangedRequest{Type: certType})
661662
if err != nil {
662663
return nil, aoserrors.Wrap(err)
663664
}
@@ -838,39 +839,55 @@ func (client *Client) GetPermissions(
838839
func (client *Client) openGRPCConnection() (err error) {
839840
log.Debug("Connecting to IAM...")
840841

841-
var publicConn *grpc.ClientConn
842+
var publicConn, protectedConn *grpc.ClientConn
842843

843844
publicConn, err = grpchelpers.CreatePublicConnection(
844845
client.publicURL, client.cryptocontext, client.insecure)
845846
if err != nil {
846847
return aoserrors.Wrap(err)
847848
}
848849

849-
if err := client.publicConnection.Start(publicConn); err != nil {
850-
return aoserrors.Wrap(err)
851-
}
850+
client.publicConnection.Set(publicConn)
851+
852+
defer func() {
853+
if err == nil {
854+
client.publicConnection.Start()
855+
} else {
856+
publicConn.Close()
857+
}
858+
}()
852859

853860
client.publicService = pb.NewIAMPublicServiceClient(client.publicConnection)
854861
client.identService = pb.NewIAMPublicIdentityServiceClient(client.publicConnection)
855862
client.publicNodesService = pb.NewIAMPublicNodesServiceClient(client.publicConnection)
856863
client.publicPermissionsService = pb.NewIAMPublicPermissionsServiceClient(client.publicConnection)
857864

858-
if err = client.restoreCertInfoSubs(); err != nil {
859-
log.Error("Failed subscribe on CertInfo change")
860-
861-
return aoserrors.Wrap(err)
865+
if err = client.restorePublicSubs(); err != nil {
866+
return err
862867
}
863868

864869
if !client.isProtectedConnEnabled() {
865870
return nil
866871
}
867872

868-
client.protectedConnection, err = grpchelpers.CreateProtectedConnection(client.certStorage,
869-
client.protectedURL, client.cryptocontext, client, client.insecure)
873+
certProvider := NewCertProvider(publicConn)
874+
875+
protectedConn, err = grpchelpers.CreateProtectedConnection(client.certStorage,
876+
client.protectedURL, client.cryptocontext, certProvider, client.insecure)
870877
if err != nil {
871878
return aoserrors.Wrap(err)
872879
}
873880

881+
client.protectedConnection.Set(protectedConn)
882+
883+
defer func() {
884+
if err == nil {
885+
client.protectedConnection.Start()
886+
} else {
887+
protectedConn.Close()
888+
}
889+
}()
890+
874891
client.certificateService = pb.NewIAMCertificateServiceClient(client.protectedConnection)
875892
client.provisioningService = pb.NewIAMProvisioningServiceClient(client.protectedConnection)
876893
client.nodesService = pb.NewIAMNodesServiceClient(client.protectedConnection)
@@ -889,7 +906,7 @@ func (client *Client) closeGRPCConnection() {
889906
}
890907

891908
if client.protectedConnection != nil {
892-
client.protectedConnection.Close()
909+
client.protectedConnection.Stop()
893910
}
894911

895912
for _, sub := range client.certChangeSub {
@@ -944,20 +961,6 @@ func (client *Client) subscribeUnitSubjectsChange() error {
944961
return nil
945962
}
946963

947-
func (client *Client) subscribeCertChange(certType string) (
948-
listener pb.IAMPublicService_SubscribeCertChangedClient, err error,
949-
) {
950-
listener, err = client.publicService.SubscribeCertChanged(context.Background(),
951-
&pb.SubscribeCertChangedRequest{Type: certType})
952-
if err != nil {
953-
log.WithField("error", err).Error("Can't subscribe on CertChange event")
954-
955-
return nil, aoserrors.Wrap(err)
956-
}
957-
958-
return listener, aoserrors.Wrap(err)
959-
}
960-
961964
func (client *Client) processNodeInfoChange(sub *nodeInfoChangeSub) {
962965
defer sub.stopWG.Done()
963966

@@ -1182,9 +1185,10 @@ func (client *Client) finishProvisioning(nodeID, password string) (errorInfo *cl
11821185

11831186
func (client *Client) restoreCertInfoSubs() error {
11841187
for certType, sub := range client.certChangeSub {
1185-
grpcStream, err := client.subscribeCertChange(certType)
1188+
grpcStream, err := client.publicService.SubscribeCertChanged(
1189+
context.Background(), &pb.SubscribeCertChangedRequest{Type: certType})
11861190
if err != nil {
1187-
return err
1191+
return aoserrors.Wrap(err)
11881192
}
11891193

11901194
sub.grpcStream = &grpcStream
@@ -1197,6 +1201,32 @@ func (client *Client) restoreCertInfoSubs() error {
11971201
return nil
11981202
}
11991203

1204+
func (client *Client) restorePublicSubs() error {
1205+
var err error
1206+
1207+
if err = client.restoreCertInfoSubs(); err != nil {
1208+
log.Error("Failed subscribe on CertInfo change")
1209+
1210+
return aoserrors.Wrap(err)
1211+
}
1212+
1213+
if client.isMainNode {
1214+
if err = client.subscribeNodeInfoChange(); err != nil {
1215+
log.Error("Failed subscribe on NodeInfo change")
1216+
1217+
return aoserrors.Wrap(err)
1218+
}
1219+
1220+
if err = client.subscribeUnitSubjectsChange(); err != nil {
1221+
log.Error("Failed subscribe on UnitSubject change")
1222+
1223+
return aoserrors.Wrap(err)
1224+
}
1225+
}
1226+
1227+
return nil
1228+
}
1229+
12001230
func (client *Client) onConnectionLost() {
12011231
select {
12021232
case <-client.closeChannel:

0 commit comments

Comments
 (0)