Skip to content

Commit 0b750e1

Browse files
[utils] Split grpchelpers.Start/Set, so conn. reset didnt wakeup invoke
Signed-off-by: Mykola Kobets <mykola_kobets@epam.com>
1 parent b48900b commit 0b750e1

File tree

2 files changed

+34
-27
lines changed

2 files changed

+34
-27
lines changed

utils/grpchelpers/grpcconn.go

+28-18
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type GRPCConn struct {
3535
sync.Mutex
3636

3737
grpcConn *grpc.ClientConn
38+
started bool
3839
connStarted sync.WaitGroup
3940
}
4041

@@ -44,51 +45,59 @@ type GRPCConn struct {
4445

4546
// NewGRPCConn creates new GRPCConn object.
4647
func NewGRPCConn() *GRPCConn {
47-
conn := GRPCConn{}
48+
conn := GRPCConn{started: false}
4849

4950
conn.connStarted.Add(1)
5051

5152
return &conn
5253
}
5354

54-
// Start starts new grpc connection.
55-
func (conn *GRPCConn) Start(connection *grpc.ClientConn) error {
55+
// Set assigns new grpc connection.
56+
func (conn *GRPCConn) Set(connection *grpc.ClientConn) {
5657
conn.Lock()
5758
defer conn.Unlock()
5859

59-
if conn.grpcConn != nil {
60-
return aoserrors.New("Previous connection should be stopped before reset")
61-
}
62-
6360
conn.grpcConn = connection
64-
conn.connStarted.Done()
61+
}
62+
63+
// Start starts processing grpc calls.
64+
func (conn *GRPCConn) Start() {
65+
conn.Lock()
66+
defer conn.Unlock()
6567

66-
return nil
68+
if !conn.started {
69+
conn.started = true
70+
conn.connStarted.Done()
71+
}
6772
}
6873

6974
// Stop stops current grpc connection.
7075
func (conn *GRPCConn) Stop() {
7176
conn.Lock()
7277
defer conn.Unlock()
7378

74-
if conn.grpcConn == nil {
75-
return
79+
if conn.grpcConn != nil {
80+
conn.grpcConn.Close()
81+
conn.grpcConn = nil
7682
}
7783

78-
conn.grpcConn.Close()
79-
conn.grpcConn = nil
80-
81-
conn.connStarted.Add(1)
84+
if conn.started {
85+
conn.started = false
86+
conn.connStarted.Add(1)
87+
}
8288
}
8389

8490
// Close closes grpc connection and releases all spawned goroutines.
8591
func (conn *GRPCConn) Close() {
8692
conn.Lock()
8793
defer conn.Unlock()
8894

89-
if conn.grpcConn == nil {
95+
if !conn.started {
96+
conn.started = true
9097
conn.connStarted.Done()
91-
} else {
98+
}
99+
100+
if conn.grpcConn == nil {
92101
conn.grpcConn.Close()
93102
conn.grpcConn = nil
94103
}
@@ -102,7 +111,8 @@ func (conn *GRPCConn) Close() {
102111
func (conn *GRPCConn) Invoke(ctx context.Context, method string, args any, reply any,
103112
opts ...grpc.CallOption,
104113
) error {
105-
lock := make(chan struct{}, 1)
114+
lock := make(chan struct{})
115+
defer close(lock)
106116

107117
go func() {
108118
conn.connStarted.Wait()

utils/grpchelpers/grpcconn_test.go

+6-9
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,8 @@ func TestStoppedConnectionFails(t *testing.T) {
4646

4747
conn := grpchelpers.NewGRPCConn()
4848

49-
if err := conn.Start(grpcConn); err != nil {
50-
t.Fatalf("Blocking connection start failed: err=%v", err)
51-
}
49+
conn.Set(grpcConn)
50+
conn.Start()
5251

5352
if err := getNodeInfo(conn); err != nil {
5453
t.Fatalf("Server started, GetNodeInfo call should succeed: err=%v", err)
@@ -76,9 +75,8 @@ func TestRPCBlockedUntilConnectionReset(t *testing.T) {
7675

7776
conn := grpchelpers.NewGRPCConn()
7877

79-
if err := conn.Start(grpcConn); err != nil {
80-
t.Fatalf("Blocking connection start failed: err=%v", err)
81-
}
78+
conn.Set(grpcConn)
79+
conn.Start()
8280

8381
if err := getNodeInfo(conn); err != nil {
8482
t.Fatalf("GetNodeInfo call should succeed: err=%v", err)
@@ -111,9 +109,8 @@ func TestRPCBlockedUntilConnectionReset(t *testing.T) {
111109
t.Fatalf("Connection failed: err=%v", err)
112110
}
113111

114-
if err := conn.Start(grpcConn); err != nil {
115-
t.Fatalf("Blocking connection start failed: err=%v", err)
116-
}
112+
conn.Set(grpcConn)
113+
conn.Start()
117114

118115
if err := <-rpcResultChan; err != nil {
119116
t.Fatalf("GetNodeInfo call should succeed: err=%v", err)

0 commit comments

Comments
 (0)