diff --git a/client.go b/client.go index bf7455c6..272e19a9 100644 --- a/client.go +++ b/client.go @@ -320,9 +320,10 @@ func (c *Client) monitor(ctx context.Context) { c.pauseSubscriptions(ctx) var ( - subsToRepublish []uint32 // subscription ids for which to send republish requests - subsToRecreate []uint32 // subscription ids which need to be recreated as new subscriptions - availableSeqs map[uint32][]uint32 + subsToRepublish []uint32 // subscription ids for which to send republish requests + subsToRecreate []uint32 // subscription ids which need to be recreated as new subscriptions + availableSeqs map[uint32][]uint32 // available sequence numbers per subscription + activeSubs int // number of active subscriptions to resume/recreate ) for action != none { @@ -485,11 +486,13 @@ func (c *Client) monitor(ctx context.Context) { // Assume that subsToRecreate and subsToRepublish have been // populated in the previous step. + activeSubs = 0 for _, id := range subsToRepublish { if err := c.republishSubscription(ctx, id, availableSeqs[id]); err != nil { dlog.Printf("republish of subscription %d failed", id) subsToRecreate = append(subsToRecreate, id) } + activeSubs++ } for _, id := range subsToRecreate { @@ -498,6 +501,7 @@ func (c *Client) monitor(ctx context.Context) { action = recreateSession continue } + activeSubs++ } c.setState(Connected) @@ -521,9 +525,14 @@ func (c *Client) monitor(ctx context.Context) { <-c.sechanErr } - dlog.Printf("resuming subscriptions") - c.resumeSubscriptions(ctx) - dlog.Printf("resumed subscriptions") + switch { + case activeSubs > 0: + dlog.Printf("resuming %d subscriptions", activeSubs) + c.resumeSubscriptions(ctx) + dlog.Printf("resumed %d subscriptions", activeSubs) + default: + dlog.Printf("no subscriptions to resume") + } } } } diff --git a/client_sub.go b/client_sub.go index 0da4fecd..90f878e2 100644 --- a/client_sub.go +++ b/client_sub.go @@ -376,6 +376,9 @@ publish: default: // send publish request and handle response + // + // publish() blocks until a PublishResponse + // is received or the context is cancelled. if err := c.publish(ctx); err != nil { dlog.Print("error: ", err.Error()) c.pauseSubscriptions(ctx)