Skip to content

Commit

Permalink
feat: make the retriever stop immediately if the context is canceled
Browse files Browse the repository at this point in the history
  • Loading branch information
francescopepe committed Aug 27, 2024
1 parent cc77bc3 commit 0ffeeae
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 6 deletions.
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ module github.com/francescopepe/formigo

go 1.21

require github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4
require (
github.com/aws/aws-sdk-go-v2/service/sqs v1.34.5
github.com/stretchr/testify v1.9.0
)

require (
github.com/aws/aws-sdk-go-v2 v1.30.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 // indirect
github.com/aws/smithy-go v1.20.4 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,17 @@ github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 h1:jYfy8UPmd+6kJW5YhY
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16/go.mod h1:7ZfEPZxkW42Afq4uQB8H2E2e6ebh6mXTueEpYzjCzcs=
github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4 h1:FXPO72iKC5YmYNEANltl763bUj8A6qT20wx8Jwvxlsw=
github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4/go.mod h1:7idt3XszF6sE9WPS1GqZRiDJOxw4oPtlRBXodWnCGjU=
github.com/aws/aws-sdk-go-v2/service/sqs v1.34.5 h1:HYyVDOC2/PIg+3oBX1q0wtDU5kONki6lrgIG0afrBkY=
github.com/aws/aws-sdk-go-v2/service/sqs v1.34.5/go.mod h1:7idt3XszF6sE9WPS1GqZRiDJOxw4oPtlRBXodWnCGjU=
github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4=
github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
8 changes: 6 additions & 2 deletions internal/client/client.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package client

import "github.com/francescopepe/formigo/internal/messages"
import (
"context"

"github.com/francescopepe/formigo/internal/messages"
)

type MessageReceiver interface {
ReceiveMessages() ([]messages.Message, error)
ReceiveMessages(ctx context.Context) ([]messages.Message, error)
}

type MessageDeleter interface {
Expand Down
8 changes: 7 additions & 1 deletion retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@ func retriever(ctx context.Context, receiver client.MessageReceiver, ctrl *contr
case <-ctx.Done():
return
default:
msgs, err := receiver.ReceiveMessages()
msgs, err := receiver.ReceiveMessages(ctx)
if err != nil {
if errors.Is(err, context.Canceled) && errors.Is(ctx.Err(), context.Canceled) {
// The worker's context was canceled. We can exit.
return
}

// Report the error to the controller and continue.
ctrl.reportError(fmt.Errorf("unable to receive message: %w", err))
continue
}
Expand Down
4 changes: 2 additions & 2 deletions sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type sqsClient struct {
messageCtxTimeout time.Duration
}

func (c sqsClient) ReceiveMessages() ([]messages.Message, error) {
out, err := c.svc.ReceiveMessage(context.Background(), c.receiveMessageInput)
func (c sqsClient) ReceiveMessages(ctx context.Context) ([]messages.Message, error) {
out, err := c.svc.ReceiveMessage(ctx, c.receiveMessageInput)
if err != nil {
return nil, fmt.Errorf("unable to receive messages: %w", err)
}
Expand Down

0 comments on commit 0ffeeae

Please sign in to comment.