Skip to content

Commit

Permalink
feat: reportFunc returns a boolean value to decide whether the erro…
Browse files Browse the repository at this point in the history
…r counts towards to threshold

BREAKING CHANGE: `reportFunc` must return a boolean
  • Loading branch information
francescopepe committed Jun 17, 2024
1 parent 24d1260 commit ab739c8
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 15 deletions.
8 changes: 5 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ type ErrorConfiguration struct {
// Default: 120s.
Period time.Duration

// The error report function
ReportFunc func(err error)
// The error report function, returns a boolean value to decide whether the error counts towards to threshold
ReportFunc func(err error) bool
}

// The MultiMessageBufferConfiguration defines a buffer which is consumed by the worker when either
Expand Down Expand Up @@ -107,8 +107,10 @@ func setWorkerConfigValues(config Configuration) Configuration {
}

if config.ErrorConfig.ReportFunc == nil {
config.ErrorConfig.ReportFunc = func(err error) {
config.ErrorConfig.ReportFunc = func(err error) bool {
log.Println("ERROR", err)

return true
}
}

Expand Down
2 changes: 1 addition & 1 deletion consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func makeAvailableConsumers(concurrency int) chan struct{} {
return consumers
}

// wrapHandler catches any panic error, logs it and returns the error that generated it.
// wrapHandler catches any panic error and returns the error that generated it.
// It prevents the worker from crashing in case of an unexpected error.
func wrapHandler(handler func() error) (err error) {
defer func() {
Expand Down
7 changes: 6 additions & 1 deletion controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ type controller struct {
// the controller.
func (c *controller) decreaseCounterAfterTimeout() {
time.Sleep(c.errorConfig.Period)

c.mutex.Lock()
defer c.mutex.Unlock()

c.errorCounter--
}

Expand All @@ -29,6 +31,7 @@ func (c *controller) increaseCounter() {
// Increase counter
c.mutex.Lock()
defer c.mutex.Unlock()

c.errorCounter++
}

Expand All @@ -40,7 +43,9 @@ func (c *controller) shouldStop() bool {
}

func (c *controller) reportError(err error) {
c.errorConfig.ReportFunc(err)
if shouldIncreaseCounter := c.errorConfig.ReportFunc(err); !shouldIncreaseCounter {
return
}

c.increaseCounter()
go c.decreaseCounterAfterTimeout()
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ module github.com/francescopepe/formigo

go 1.20

require github.com/aws/aws-sdk-go-v2/service/sqs v1.23.2
require github.com/aws/aws-sdk-go-v2/service/sqs v1.32.6

require (
github.com/aws/aws-sdk-go-v2 v1.18.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/aws/aws-sdk-go-v2 v1.27.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.9 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.9 // indirect
github.com/aws/smithy-go v1.20.2 // indirect
)
18 changes: 18 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
github.com/aws/aws-sdk-go-v2 v1.18.1 h1:+tefE750oAb7ZQGzla6bLkOwfcQCEtC5y2RqoqCeqKo=
github.com/aws/aws-sdk-go-v2 v1.18.1/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA=
github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM=
github.com/aws/aws-sdk-go-v2 v1.27.2 h1:pLsTXqX93rimAOZG2FIYraDQstZaaGVVN4tNw65v0h8=
github.com/aws/aws-sdk-go-v2 v1.27.2/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 h1:A5UqQEmPaCFpedKouS4v+dHCTUo2sKqhoKO9U5kxyWo=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34/go.mod h1:wZpTEecJe0Btj3IYnDx/VlUzor9wm3fJHyvLpQF0VwY=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 h1:aw39xVGeRWlWx9EzGVnhOR4yOjQDHPQ6o6NmBlscyQg=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5/go.mod h1:FSaRudD0dXiMPK2UjknVwwTYyZMRsHv3TtkabsZih5I=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.9 h1:cy8ahBJuhtM8GTTSyOkfy6WVPV1IE+SS5/wfXUYuulw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.9/go.mod h1:CZBXGLaJnEZI6EVNcPd7a6B5IC5cA/GkRWtu9fp3S6Y=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 h1:srIVS45eQuewqz6fKKu6ZGXaq6FuFg5NzgQBAM6g8Y4=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28/go.mod h1:7VRpKQQedkfIEXb4k52I7swUnZP0wohVajJMRn3vsUw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 h1:PG1F3OD1szkuQPzDw3CIQsRIrtTlUC3lP84taWzHlq0=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5/go.mod h1:jU1li6RFryMz+so64PpKtudI+QzbKoIEivqdf6LNpOc=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.9 h1:A4SYk07ef04+vxZToz9LWvAXl9LW0NClpPpMsi31cz0=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.9/go.mod h1:5jJcHuwDagxN+ErjQ3PU3ocf6Ylc/p9x+BLO/+X4iXw=
github.com/aws/aws-sdk-go-v2/service/sqs v1.23.2 h1:Y2vfLiY3HmaMisuwx6fS2kMRYbajRXXB+9vesGVPseY=
github.com/aws/aws-sdk-go-v2/service/sqs v1.23.2/go.mod h1:TaV67b6JMD1988x/uMDop/JnMFK6v5d4Ru+sDmFg+ww=
github.com/aws/aws-sdk-go-v2/service/sqs v1.31.4 h1:mE2ysZMEeQ3ulHWs4mmc4fZEhOfeY1o6QXAfDqjbSgw=
github.com/aws/aws-sdk-go-v2/service/sqs v1.31.4/go.mod h1:lCN2yKnj+Sp9F6UzpoPPTir+tSaC9Jwf6LcmTqnXFZw=
github.com/aws/aws-sdk-go-v2/service/sqs v1.32.6 h1:FrGnU+Ggf+jUFj1O7Pdw5hCk42dmyO9TOTCVL7mDISk=
github.com/aws/aws-sdk-go-v2/service/sqs v1.32.6/go.mod h1:2Ef3ZgVWL7lyz5YZf854YkMboK6qF1NbG/0hc9StZsg=
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q=
github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
Expand Down
2 changes: 1 addition & 1 deletion internal/messages/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ type BufferWithContextTimeoutConfiguration struct {
Size int
}

// bufferWithContextTimeout is used to construct a buffer that has a context timeout
// BufferWithContextTimeout is used to construct a buffer that has a context timeout
// along with the standard buffer timeout. This is used because the messages have to
// be processed within a certain period and if this doesn't happen, the buffer should
// delete the messages in it and reset.
Expand Down
8 changes: 4 additions & 4 deletions retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func retriever(ctx context.Context, receiver client.MessageReceiver, ctrl *contr
case <-ctx.Done():
return
default:
messages, err := receiver.ReceiveMessages()
msgs, err := receiver.ReceiveMessages()
if err != nil {
ctrl.reportError(fmt.Errorf("unable to receive message: %w", err))
continue
Expand All @@ -27,9 +27,9 @@ func retriever(ctx context.Context, receiver client.MessageReceiver, ctrl *contr
// This means that the retriever won't listen for context cancellation
// at this stage.
func() {
for _, message := range messages {
for _, msg := range msgs {
select {
case <-message.Ctx.Done():
case <-msg.Ctx.Done():
// If consumers don't pick up the messages within the messages' timeout we raise
// an error.
// This could be due to one or more of the following reasons:
Expand All @@ -42,7 +42,7 @@ func retriever(ctx context.Context, receiver client.MessageReceiver, ctrl *contr
ctrl.reportError(errors.New("message didn't get picked up by any consumer within its timeout"))

return // Avoid publishing all the messages downstream
case messageCh <- message:
case messageCh <- msg:
// Message pushed to the channel
}
}
Expand Down

0 comments on commit ab739c8

Please sign in to comment.