@@ -74,7 +74,7 @@ type Client struct {
74
74
isMainNode bool
75
75
76
76
publicConnection * grpchelpers.GRPCConn
77
- protectedConnection * grpc. ClientConn
77
+ protectedConnection * grpchelpers. GRPCConn
78
78
79
79
publicService pb.IAMPublicServiceClient
80
80
identService pb.IAMPublicIdentityServiceClient
@@ -133,13 +133,14 @@ func New(
133
133
cryptocontext * cryptutils.CryptoContext , insecure bool ,
134
134
) (client * Client , err error ) {
135
135
localClient := & Client {
136
- publicURL : publicURL ,
137
- protectedURL : protectedURL ,
138
- certStorage : certStorage ,
139
- sender : sender ,
140
- cryptocontext : cryptocontext ,
141
- insecure : insecure ,
142
- publicConnection : grpchelpers .NewGRPCConn (),
136
+ publicURL : publicURL ,
137
+ protectedURL : protectedURL ,
138
+ certStorage : certStorage ,
139
+ sender : sender ,
140
+ cryptocontext : cryptocontext ,
141
+ insecure : insecure ,
142
+ publicConnection : grpchelpers .NewGRPCConn (),
143
+ protectedConnection : grpchelpers .NewGRPCConn (),
143
144
nodeInfoSubs : & nodeInfoChangeSub {
144
145
listeners : make ([]chan cloudprotocol.NodeInfo , 0 ),
145
146
},
@@ -613,7 +614,6 @@ func (client *Client) Close() error {
613
614
client .isReconnecting .Store (true )
614
615
615
616
client .closeGRPCConnection ()
616
- client .publicConnection .Close ()
617
617
618
618
log .Debug ("Disconnected from IAM" )
619
619
@@ -657,7 +657,8 @@ func (client *Client) SubscribeCertChanged(certType string) (<-chan *pb.CertInfo
657
657
ch := make (chan * pb.CertInfo , 1 )
658
658
659
659
if _ , ok := client .certChangeSub [certType ]; ! ok {
660
- grpcStream , err := client .subscribeCertChange (certType )
660
+ grpcStream , err := client .publicService .SubscribeCertChanged (
661
+ context .Background (), & pb.SubscribeCertChangedRequest {Type : certType })
661
662
if err != nil {
662
663
return nil , aoserrors .Wrap (err )
663
664
}
@@ -838,39 +839,55 @@ func (client *Client) GetPermissions(
838
839
func (client * Client ) openGRPCConnection () (err error ) {
839
840
log .Debug ("Connecting to IAM..." )
840
841
841
- var publicConn * grpc.ClientConn
842
+ var publicConn , protectedConn * grpc.ClientConn
842
843
843
844
publicConn , err = grpchelpers .CreatePublicConnection (
844
845
client .publicURL , client .cryptocontext , client .insecure )
845
846
if err != nil {
846
847
return aoserrors .Wrap (err )
847
848
}
848
849
849
- if err := client .publicConnection .Start (publicConn ); err != nil {
850
- return aoserrors .Wrap (err )
851
- }
850
+ client .publicConnection .Set (publicConn )
851
+
852
+ defer func () {
853
+ if err == nil {
854
+ client .publicConnection .Start ()
855
+ } else {
856
+ publicConn .Close ()
857
+ }
858
+ }()
852
859
853
860
client .publicService = pb .NewIAMPublicServiceClient (client .publicConnection )
854
861
client .identService = pb .NewIAMPublicIdentityServiceClient (client .publicConnection )
855
862
client .publicNodesService = pb .NewIAMPublicNodesServiceClient (client .publicConnection )
856
863
client .publicPermissionsService = pb .NewIAMPublicPermissionsServiceClient (client .publicConnection )
857
864
858
- if err = client .restoreCertInfoSubs (); err != nil {
859
- log .Error ("Failed subscribe on CertInfo change" )
860
-
861
- return aoserrors .Wrap (err )
865
+ if err = client .restorePublicSubs (); err != nil {
866
+ return err
862
867
}
863
868
864
869
if ! client .isProtectedConnEnabled () {
865
870
return nil
866
871
}
867
872
868
- client .protectedConnection , err = grpchelpers .CreateProtectedConnection (client .certStorage ,
869
- client .protectedURL , client .cryptocontext , client , client .insecure )
873
+ certProvider := NewCertProvider (publicConn )
874
+
875
+ protectedConn , err = grpchelpers .CreateProtectedConnection (client .certStorage ,
876
+ client .protectedURL , client .cryptocontext , certProvider , client .insecure )
870
877
if err != nil {
871
878
return aoserrors .Wrap (err )
872
879
}
873
880
881
+ client .protectedConnection .Set (protectedConn )
882
+
883
+ defer func () {
884
+ if err == nil {
885
+ client .protectedConnection .Start ()
886
+ } else {
887
+ protectedConn .Close ()
888
+ }
889
+ }()
890
+
874
891
client .certificateService = pb .NewIAMCertificateServiceClient (client .protectedConnection )
875
892
client .provisioningService = pb .NewIAMProvisioningServiceClient (client .protectedConnection )
876
893
client .nodesService = pb .NewIAMNodesServiceClient (client .protectedConnection )
@@ -889,7 +906,7 @@ func (client *Client) closeGRPCConnection() {
889
906
}
890
907
891
908
if client .protectedConnection != nil {
892
- client .protectedConnection .Close ()
909
+ client .protectedConnection .Stop ()
893
910
}
894
911
895
912
for _ , sub := range client .certChangeSub {
@@ -944,20 +961,6 @@ func (client *Client) subscribeUnitSubjectsChange() error {
944
961
return nil
945
962
}
946
963
947
- func (client * Client ) subscribeCertChange (certType string ) (
948
- listener pb.IAMPublicService_SubscribeCertChangedClient , err error ,
949
- ) {
950
- listener , err = client .publicService .SubscribeCertChanged (context .Background (),
951
- & pb.SubscribeCertChangedRequest {Type : certType })
952
- if err != nil {
953
- log .WithField ("error" , err ).Error ("Can't subscribe on CertChange event" )
954
-
955
- return nil , aoserrors .Wrap (err )
956
- }
957
-
958
- return listener , aoserrors .Wrap (err )
959
- }
960
-
961
964
func (client * Client ) processNodeInfoChange (sub * nodeInfoChangeSub ) {
962
965
defer sub .stopWG .Done ()
963
966
@@ -1182,9 +1185,10 @@ func (client *Client) finishProvisioning(nodeID, password string) (errorInfo *cl
1182
1185
1183
1186
func (client * Client ) restoreCertInfoSubs () error {
1184
1187
for certType , sub := range client .certChangeSub {
1185
- grpcStream , err := client .subscribeCertChange (certType )
1188
+ grpcStream , err := client .publicService .SubscribeCertChanged (
1189
+ context .Background (), & pb.SubscribeCertChangedRequest {Type : certType })
1186
1190
if err != nil {
1187
- return err
1191
+ return aoserrors . Wrap ( err )
1188
1192
}
1189
1193
1190
1194
sub .grpcStream = & grpcStream
@@ -1197,6 +1201,32 @@ func (client *Client) restoreCertInfoSubs() error {
1197
1201
return nil
1198
1202
}
1199
1203
1204
+ func (client * Client ) restorePublicSubs () error {
1205
+ var err error
1206
+
1207
+ if err = client .restoreCertInfoSubs (); err != nil {
1208
+ log .Error ("Failed subscribe on CertInfo change" )
1209
+
1210
+ return aoserrors .Wrap (err )
1211
+ }
1212
+
1213
+ if client .isMainNode {
1214
+ if err = client .subscribeNodeInfoChange (); err != nil {
1215
+ log .Error ("Failed subscribe on NodeInfo change" )
1216
+
1217
+ return aoserrors .Wrap (err )
1218
+ }
1219
+
1220
+ if err = client .subscribeUnitSubjectsChange (); err != nil {
1221
+ log .Error ("Failed subscribe on UnitSubject change" )
1222
+
1223
+ return aoserrors .Wrap (err )
1224
+ }
1225
+ }
1226
+
1227
+ return nil
1228
+ }
1229
+
1200
1230
func (client * Client ) onConnectionLost () {
1201
1231
select {
1202
1232
case <- client .closeChannel :
0 commit comments