Skip to content

Commit 3c64685

Browse files
[azservicebus] Fixing an error where the RPCLink wasn't very robust when it came to connection/link errors (#17389)
The RPC link didn't have a good comprehensive set of checks for errors that came back, which would result in the response router continually failing. Fixed now to use the same GetRecoveryKind() logic as the rest of the code does. Also, added in some better logging for some of the edge error cases, just so we know what's happening. This has potentially been hiding some failure cases, including scenarios where the link should have stopped (so it can properly recover). Also, some refactoring - the current go-amqp type structure is a bit hard to mock because concrete "client" types come back from other concrete types (for instance, amqp.Session has `GetReceiver() *amqp.Receiver`, which makes it hard to control the actual receiver object. I've added in an `amqpwrap` package where I've gone full-wrapper for client and session so I can do proper mocking in tests. That code is only used for rpc.go for now but I'll be migrating it over to others as we go.
1 parent d348005 commit 3c64685

File tree

6 files changed

+414
-76
lines changed

6 files changed

+414
-76
lines changed

sdk/messaging/azservicebus/internal/amqpInterfaces.go

+5-34
Original file line numberDiff line numberDiff line change
@@ -6,31 +6,14 @@ package internal
66
import (
77
"context"
88

9+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
910
"github.com/Azure/go-amqp"
1011
)
1112

12-
// AMQPReceiver is implemented by *amqp.Receiver
13-
type AMQPReceiver interface {
14-
IssueCredit(credit uint32) error
15-
DrainCredit(ctx context.Context) error
16-
Receive(ctx context.Context) (*amqp.Message, error)
17-
Prefetched(ctx context.Context) (*amqp.Message, error)
18-
19-
// settlement functions
20-
AcceptMessage(ctx context.Context, msg *amqp.Message) error
21-
RejectMessage(ctx context.Context, msg *amqp.Message, e *amqp.Error) error
22-
ReleaseMessage(ctx context.Context, msg *amqp.Message) error
23-
ModifyMessage(ctx context.Context, msg *amqp.Message, deliveryFailed, undeliverableHere bool, messageAnnotations amqp.Annotations) error
24-
25-
LinkName() string
26-
LinkSourceFilterValue(name string) interface{}
27-
}
28-
29-
// AMQPReceiver is implemented by *amqp.Receiver
30-
type AMQPReceiverCloser interface {
31-
AMQPReceiver
32-
Close(ctx context.Context) error
33-
}
13+
type AMQPReceiver = amqpwrap.AMQPReceiver
14+
type AMQPReceiverCloser = amqpwrap.AMQPReceiverCloser
15+
type AMQPSender = amqpwrap.AMQPSender
16+
type AMQPSenderCloser = amqpwrap.AMQPSenderCloser
3417

3518
// AMQPSession is implemented by *amqp.Session
3619
type AMQPSession interface {
@@ -44,18 +27,6 @@ type AMQPSessionCloser interface {
4427
Close(ctx context.Context) error
4528
}
4629

47-
// AMQPSender is implemented by *amqp.Sender
48-
type AMQPSender interface {
49-
Send(ctx context.Context, msg *amqp.Message) error
50-
MaxMessageSize() uint64
51-
}
52-
53-
// AMQPSenderCloser is implemented by *amqp.Sender
54-
type AMQPSenderCloser interface {
55-
AMQPSender
56-
Close(ctx context.Context) error
57-
}
58-
5930
// RPCLink is implemented by *rpc.Link
6031
type RPCLink interface {
6132
Close(ctx context.Context) error
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
// Package amqpwrap has some simple wrappers to make it easier to
5+
// abstract the go-amqp types.
6+
package amqpwrap
7+
8+
import (
9+
"context"
10+
11+
"github.com/Azure/go-amqp"
12+
)
13+
14+
// AMQPReceiver is implemented by *amqp.Receiver
15+
type AMQPReceiver interface {
16+
IssueCredit(credit uint32) error
17+
DrainCredit(ctx context.Context) error
18+
Receive(ctx context.Context) (*amqp.Message, error)
19+
Prefetched(ctx context.Context) (*amqp.Message, error)
20+
21+
// settlement functions
22+
AcceptMessage(ctx context.Context, msg *amqp.Message) error
23+
RejectMessage(ctx context.Context, msg *amqp.Message, e *amqp.Error) error
24+
ReleaseMessage(ctx context.Context, msg *amqp.Message) error
25+
ModifyMessage(ctx context.Context, msg *amqp.Message, deliveryFailed, undeliverableHere bool, messageAnnotations amqp.Annotations) error
26+
27+
LinkName() string
28+
LinkSourceFilterValue(name string) interface{}
29+
}
30+
31+
// AMQPReceiverCloser is implemented by *amqp.Receiver
32+
type AMQPReceiverCloser interface {
33+
AMQPReceiver
34+
Close(ctx context.Context) error
35+
}
36+
37+
// AMQPSender is implemented by *amqp.Sender
38+
type AMQPSender interface {
39+
Send(ctx context.Context, msg *amqp.Message) error
40+
MaxMessageSize() uint64
41+
}
42+
43+
// AMQPSenderCloser is implemented by *amqp.Sender
44+
type AMQPSenderCloser interface {
45+
AMQPSender
46+
Close(ctx context.Context) error
47+
}
48+
49+
// AMQPSession is a simple interface, implemented by *AMQPSessionWrapper.
50+
// It exists only so we can return AMQPReceiver/AMQPSender interfaces.
51+
type AMQPSession interface {
52+
Close(ctx context.Context) error
53+
NewReceiver(opts ...amqp.LinkOption) (AMQPReceiverCloser, error)
54+
NewSender(opts ...amqp.LinkOption) (AMQPSenderCloser, error)
55+
}
56+
57+
type AMQPClient interface {
58+
Close() error
59+
NewSession(opts ...amqp.SessionOption) (AMQPSession, error)
60+
}
61+
62+
// AMQPClientWrapper is a simple interface, implemented by *AMQPClientWrapper
63+
// It exists only so we can return AMQPSession, which itself only exists so we can
64+
// return interfaces for AMQPSender and AMQPReceiver from AMQPSession.
65+
type AMQPClientWrapper struct {
66+
Inner *amqp.Client
67+
}
68+
69+
func (w *AMQPClientWrapper) Close() error {
70+
return w.Inner.Close()
71+
}
72+
73+
func (w *AMQPClientWrapper) NewSession(opts ...amqp.SessionOption) (AMQPSession, error) {
74+
sess, err := w.Inner.NewSession(opts...)
75+
76+
if err != nil {
77+
return nil, err
78+
}
79+
80+
return &AMQPSessionWrapper{
81+
inner: sess,
82+
}, nil
83+
}
84+
85+
type AMQPSessionWrapper struct {
86+
inner *amqp.Session
87+
}
88+
89+
func (w *AMQPSessionWrapper) Close(ctx context.Context) error {
90+
return w.inner.Close(ctx)
91+
}
92+
93+
func (w *AMQPSessionWrapper) NewReceiver(opts ...amqp.LinkOption) (AMQPReceiverCloser, error) {
94+
return w.inner.NewReceiver(opts...)
95+
}
96+
97+
func (w *AMQPSessionWrapper) NewSender(opts ...amqp.LinkOption) (AMQPSenderCloser, error) {
98+
return w.inner.NewSender(opts...)
99+
}

sdk/messaging/azservicebus/internal/cbs.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88

99
azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log"
10+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
1011
"github.com/Azure/azure-sdk-for-go/sdk/messaging/internal/auth"
1112
"github.com/Azure/go-amqp"
1213
)
@@ -23,7 +24,7 @@ const (
2324
// NegotiateClaim attempts to put a token to the $cbs management endpoint to negotiate auth for the given audience
2425
func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, provider auth.TokenProvider) error {
2526
link, err := NewRPCLink(RPCLinkArgs{
26-
Client: conn,
27+
Client: &amqpwrap.AMQPClientWrapper{Inner: conn},
2728
Address: cbsAddress,
2829
})
2930

sdk/messaging/azservicebus/internal/namespace.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
1616
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
17+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
1718
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/sbauth"
1819
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/tracing"
1920
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils"
@@ -224,7 +225,7 @@ func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (RPC
224225
}
225226

226227
return NewRPCLink(RPCLinkArgs{
227-
Client: client,
228+
Client: &amqpwrap.AMQPClientWrapper{Inner: client},
228229
Address: managementPath,
229230
})
230231
}

sdk/messaging/azservicebus/internal/rpc.go

+36-40
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import (
1010
"sync"
1111
"time"
1212

13+
azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log"
1314
"github.com/Azure/azure-sdk-for-go/sdk/internal/uuid"
15+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
1416
"github.com/Azure/go-amqp"
1517
)
1618

@@ -24,10 +26,9 @@ const (
2426
type (
2527
// rpcLink is the bidirectional communication structure used for CBS negotiation
2628
rpcLink struct {
27-
session *amqp.Session
28-
29-
receiver amqpReceiver // *amqp.Receiver
30-
sender amqpSender // *amqp.Sender
29+
session amqpwrap.AMQPSession
30+
receiver amqpwrap.AMQPReceiverCloser // *amqp.Receiver
31+
sender amqpwrap.AMQPSenderCloser // *amqp.Sender
3132

3233
clientAddress string
3334
sessionID *string
@@ -38,9 +39,10 @@ type (
3839
responseMap map[string]chan rpcResponse
3940
broadcastErr error // the error that caused the responseMap to be nil'd
4041

42+
logEvent azlog.Event
43+
4144
// for unit tests
42-
uuidNewV4 func() (uuid.UUID, error)
43-
messageAccept func(ctx context.Context, message *amqp.Message) error
45+
uuidNewV4 func() (uuid.UUID, error)
4446
}
4547

4648
// RPCResponse is the simplified response structure from an RPC like call
@@ -57,17 +59,6 @@ type (
5759
message *amqp.Message
5860
err error
5961
}
60-
61-
// Actually: *amqp.Receiver
62-
amqpReceiver interface {
63-
Receive(ctx context.Context) (*amqp.Message, error)
64-
Close(ctx context.Context) error
65-
}
66-
67-
amqpSender interface {
68-
Send(ctx context.Context, msg *amqp.Message) error
69-
Close(ctx context.Context) error
70-
}
7162
)
7263

7364
type rpcError struct {
@@ -84,8 +75,9 @@ func (e rpcError) RPCCode() int {
8475
}
8576

8677
type RPCLinkArgs struct {
87-
Client *amqp.Client
88-
Address string
78+
Client amqpwrap.AMQPClient
79+
Address string
80+
LogEvent azlog.Event
8981
}
9082

9183
// NewRPCLink will build a new request response link
@@ -110,6 +102,7 @@ func NewRPCLink(args RPCLinkArgs) (*rpcLink, error) {
110102
uuidNewV4: uuid.New,
111103
responseMap: map[string]chan rpcResponse{},
112104
startResponseRouterOnce: &sync.Once{},
105+
logEvent: args.LogEvent,
113106
}
114107

115108
sender, err := session.NewSender(
@@ -147,45 +140,57 @@ func NewRPCLink(args RPCLinkArgs) (*rpcLink, error) {
147140

148141
link.sender = sender
149142
link.receiver = receiver
150-
link.messageAccept = receiver.AcceptMessage
151143

152144
return link, nil
153145
}
154146

147+
const responseRouterShutdownMessage = "Response router has shut down"
148+
155149
// startResponseRouter is responsible for taking any messages received on the 'response'
156150
// link and forwarding it to the proper channel. The channel is being select'd by the
157151
// original `RPC` call.
158152
func (l *rpcLink) startResponseRouter() {
153+
defer azlog.Writef(l.logEvent, responseRouterShutdownMessage)
154+
159155
for {
160156
res, err := l.receiver.Receive(context.Background())
161157

162-
// You'll see this when the link is shutting down (either
163-
// service-initiated via 'detach' or a user-initiated shutdown)
164-
if isClosedError(err) {
165-
l.broadcastError(err)
166-
break
158+
if err != nil {
159+
// if the link or connection has a malfunction that would require it to restart then
160+
// we need to bail out, broadcasting to all affected callers/consumers.
161+
if GetRecoveryKind(err) != RecoveryKindNone {
162+
azlog.Writef(l.logEvent, "Error in RPCLink, stopping response router: %s", err.Error())
163+
l.broadcastError(err)
164+
break
165+
}
166+
167+
azlog.Writef(l.logEvent, "Non-fatal error in RPCLink, starting to receive again: %s", err.Error())
168+
continue
167169
}
168170

169171
// I don't believe this should happen. The JS version of this same code
170172
// ignores errors as well since responses should always be correlated
171173
// to actual send requests. So this is just here for completeness.
172174
if res == nil {
175+
azlog.Writef(l.logEvent, "RPCLink received no error, but also got no response")
173176
continue
174177
}
175178

176179
autogenMessageId, ok := res.Properties.CorrelationID.(string)
177180

178181
if !ok {
179-
// TODO: it'd be good to track these in some way. We don't have a good way to
180-
// forward this on at this point.
182+
azlog.Writef(l.logEvent, "RPCLink message received without a CorrelationID %v", res)
181183
continue
182184
}
183185

184186
ch := l.deleteChannelFromMap(autogenMessageId)
185187

186-
if ch != nil {
187-
ch <- rpcResponse{message: res, err: err}
188+
if ch == nil {
189+
azlog.Writef(l.logEvent, "RPCLink had no response channel for correlation ID %v", autogenMessageId)
190+
continue
188191
}
192+
193+
ch <- rpcResponse{message: res, err: err}
189194
}
190195
}
191196

@@ -228,7 +233,7 @@ func (l *rpcLink) RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, err
228233

229234
if err != nil {
230235
l.deleteChannelFromMap(messageID)
231-
return nil, fmt.Errorf("Failed to send message with ID %s: %w", messageID, err)
236+
return nil, fmt.Errorf("failed to send message with ID %s: %w", messageID, err)
232237
}
233238

234239
var res *amqp.Message
@@ -281,7 +286,7 @@ func (l *rpcLink) RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, err
281286
Message: res,
282287
}
283288

284-
if err := l.messageAccept(ctx, res); err != nil {
289+
if err := l.receiver.AcceptMessage(ctx, res); err != nil {
285290
return response, fmt.Errorf("failed accepting message on rpc link: %w", err)
286291
}
287292

@@ -415,15 +420,6 @@ func addMessageID(message *amqp.Message, uuidNewV4 func() (uuid.UUID, error)) (*
415420
return &copiedMessage, autoGenMessageID, nil
416421
}
417422

418-
func isClosedError(err error) bool {
419-
var detachError *amqp.DetachError
420-
421-
return errors.Is(err, amqp.ErrLinkClosed) ||
422-
errors.As(err, &detachError) ||
423-
errors.Is(err, amqp.ErrConnClosed) ||
424-
errors.Is(err, amqp.ErrSessionClosed)
425-
}
426-
427423
// asRPCError checks to see if the res is actually a failed request
428424
// (where failed means the status code was non-2xx). If so,
429425
// it returns true and updates the struct pointed to by err.

0 commit comments

Comments
 (0)