Skip to content

Commit 0ebafa7

Browse files
[azservicebus] Handle 410 properly for session and non-session based links. (#17382)
410 (lock lost) handling needed to be separated for session and non-session links. For non-session links, losing the lock is essentially unrecoverable - the lock cannot be reobtained with just the lock token, so further retries will always fail. For _session_-based links it's different, since you actually obtain a session lock by opening a link for a particular session ID. You can retry re-opening the link, and there are cases where, on detach, you might have some overlap until the session is actually available again (this is shown in the `TestSessionReceiver_Detach` test). So the split here is simple - all retrying is really handled in amqpLinks, so it now takes a parameter giving it the proper function to determine whether an error is fatal or retryable. As part of this I also bundled the parameters to NewAMQPLinks into a single args struct (NewAMQPLinksArgs), which had some small rippling effects in callers. Fixes #17325 (probable fix for the latest issue mentioned in #17017, since this affects settlement time)
1 parent 0cf20e3 commit 0ebafa7

14 files changed

+254
-94
lines changed

sdk/messaging/azservicebus/CHANGELOG.md

+4-4
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@
66

77
- Support for using a SharedAccessSignature in a connection string. Ex: `Endpoint=sb://<sb>.servicebus.windows.net;SharedAccessSignature=SharedAccessSignature sr=<sb>.servicebus.windows.net&sig=<base64-sig>&se=<expiry>&skn=<keyname>` (#17314)
88

9-
### Breaking Changes
10-
119
### Bugs Fixed
1210

1311
- Fixed bug where message batch size calculation was inaccurate, resulting in batches that were too large to be sent. (#17318)
14-
15-
### Other Changes
12+
- Fixed an issue where an entity not being found led to a longer timeout than needed. (#17279)
13+
- Fixed the RPCLink so it handles connection/link failures better. (#17389)
14+
- Fixed issue where a message lock expiring would cause unnecessary retries. These retries could cause message settlement calls (ex: Receiver.CompleteMessage)
15+
to appear to hang. (#17382)
1616

1717
## 0.3.6 (2022-03-08)
1818

sdk/messaging/azservicebus/client.go

+8-6
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,10 @@ func newClientImpl(creds clientCreds, options *ClientOptions) (*Client, error) {
156156
func (client *Client) NewReceiverForQueue(queueName string, options *ReceiverOptions) (*Receiver, error) {
157157
id, cleanupOnClose := client.getCleanupForCloseable()
158158
receiver, err := newReceiver(newReceiverArgs{
159-
cleanupOnClose: cleanupOnClose,
160-
ns: client.namespace,
161-
entity: entity{Queue: queueName},
159+
cleanupOnClose: cleanupOnClose,
160+
ns: client.namespace,
161+
entity: entity{Queue: queueName},
162+
getRecoveryKindFunc: internal.GetRecoveryKind,
162163
}, options)
163164

164165
if err != nil {
@@ -173,9 +174,10 @@ func (client *Client) NewReceiverForQueue(queueName string, options *ReceiverOpt
173174
func (client *Client) NewReceiverForSubscription(topicName string, subscriptionName string, options *ReceiverOptions) (*Receiver, error) {
174175
id, cleanupOnClose := client.getCleanupForCloseable()
175176
receiver, err := newReceiver(newReceiverArgs{
176-
cleanupOnClose: cleanupOnClose,
177-
ns: client.namespace,
178-
entity: entity{Topic: topicName, Subscription: subscriptionName},
177+
cleanupOnClose: cleanupOnClose,
178+
ns: client.namespace,
179+
entity: entity{Topic: topicName, Subscription: subscriptionName},
180+
getRecoveryKindFunc: internal.GetRecoveryKind,
179181
}, options)
180182

181183
if err != nil {

sdk/messaging/azservicebus/client_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"fmt"
99
"net"
10+
"net/http"
1011
"os"
1112
"testing"
1213
"time"
@@ -346,5 +347,5 @@ func assertRPCNotFound(t *testing.T, err error) {
346347
}
347348

348349
require.ErrorAs(t, err, &rpcError)
349-
require.Equal(t, 404, rpcError.RPCCode())
350+
require.Equal(t, http.StatusNotFound, rpcError.RPCCode())
350351
}

sdk/messaging/azservicebus/internal/amqpLinks.go

+28-14
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ type AMQPLinks interface {
4949

5050
// CloseIfNeeded closes the links or connection if the error is recoverable.
5151
// Use this if you don't want to recreate the connection/links at this point.
52-
CloseIfNeeded(ctx context.Context, err error) recoveryKind
52+
CloseIfNeeded(ctx context.Context, err error) RecoveryKind
5353

5454
// ClosedPermanently is true if AMQPLinks.Close(ctx, true) has been called.
5555
ClosedPermanently() bool
@@ -76,6 +76,8 @@ type AMQPLinksImpl struct {
7676
audience string
7777
createLink CreateLinkFunc
7878

79+
getRecoveryKindFunc func(err error) RecoveryKind
80+
7981
mu sync.RWMutex
8082

8183
// RPCLink lets you interact with the $management link for your entity.
@@ -103,16 +105,24 @@ type AMQPLinksImpl struct {
103105
// *amqp.Sender or a *amqp.Receiver. AMQPLinks handles it either way.
104106
type CreateLinkFunc func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error)
105107

108+
type NewAMQPLinksArgs struct {
109+
NS NamespaceForAMQPLinks
110+
EntityPath string
111+
CreateLinkFunc CreateLinkFunc
112+
GetRecoveryKindFunc func(err error) RecoveryKind
113+
}
114+
106115
// NewAMQPLinks creates a session, starts the claim refresher and creates an associated
107116
// management link for a specific entity path.
108-
func NewAMQPLinks(ns NamespaceForAMQPLinks, entityPath string, createLink CreateLinkFunc) AMQPLinks {
117+
func NewAMQPLinks(args NewAMQPLinksArgs) AMQPLinks {
109118
l := &AMQPLinksImpl{
110-
entityPath: entityPath,
111-
managementPath: fmt.Sprintf("%s/$management", entityPath),
112-
audience: ns.GetEntityAudience(entityPath),
113-
createLink: createLink,
114-
closedPermanently: false,
115-
ns: ns,
119+
entityPath: args.EntityPath,
120+
managementPath: fmt.Sprintf("%s/$management", args.EntityPath),
121+
audience: args.NS.GetEntityAudience(args.EntityPath),
122+
createLink: args.CreateLinkFunc,
123+
closedPermanently: false,
124+
getRecoveryKindFunc: args.GetRecoveryKindFunc,
125+
ns: args.NS,
116126
}
117127

118128
return l
@@ -189,17 +199,17 @@ func (links *AMQPLinksImpl) RecoverIfNeeded(ctx context.Context, theirID LinkID,
189199
default:
190200
}
191201

192-
sbe := GetSBErrInfo(origErr)
202+
rk := links.getRecoveryKindFunc(origErr)
193203

194-
if sbe.RecoveryKind == RecoveryKindLink {
204+
if rk == RecoveryKindLink {
195205
if err := links.recoverLink(ctx, theirID); err != nil {
196206
log.Writef(EventConn, "failed to recreate link: %s", err.Error())
197207
return err
198208
}
199209

200210
log.Writef(EventConn, "Recovered links")
201211
return nil
202-
} else if sbe.RecoveryKind == RecoveryKindConn {
212+
} else if rk == RecoveryKindConn {
203213
if err := links.recoverConnection(ctx, theirID); err != nil {
204214
log.Writef(EventConn, "failed to recreate connection: %s", err.Error())
205215
return err
@@ -293,6 +303,10 @@ func (l *AMQPLinksImpl) Retry(ctx context.Context, name string, fn RetryWithLink
293303

294304
didQuickRetry := false
295305

306+
isFatalErrorFunc := func(err error) bool {
307+
return l.getRecoveryKindFunc(err) == RecoveryKindFatal
308+
}
309+
296310
return utils.Retry(ctx, name, func(ctx context.Context, args *utils.RetryFnArgs) error {
297311
if err := l.RecoverIfNeeded(ctx, lastID, args.LastErr); err != nil {
298312
return err
@@ -335,7 +349,7 @@ func (l *AMQPLinksImpl) Retry(ctx context.Context, name string, fn RetryWithLink
335349
}
336350

337351
return nil
338-
}, IsFatalSBError, o)
352+
}, isFatalErrorFunc, o)
339353
}
340354

341355
// EntityPath is the full entity path for the queue/topic/subscription.
@@ -368,11 +382,11 @@ func (l *AMQPLinksImpl) Close(ctx context.Context, permanent bool) error {
368382
// eats the cost of recovery, instead of doing it immediately. This is useful
369383
// if you're trying to exit out of a function quickly but still need to react
370384
// to a returned error.
371-
func (l *AMQPLinksImpl) CloseIfNeeded(ctx context.Context, err error) recoveryKind {
385+
func (l *AMQPLinksImpl) CloseIfNeeded(ctx context.Context, err error) RecoveryKind {
372386
l.mu.Lock()
373387
defer l.mu.Unlock()
374388

375-
rk := GetRecoveryKind(err)
389+
rk := l.getRecoveryKindFunc(err)
376390

377391
switch rk {
378392
case RecoveryKindLink:

0 commit comments

Comments
 (0)