From 03d631cf407fd109af107a02b6c1da2d61f5f2c1 Mon Sep 17 00:00:00 2001
From: Andrew Thornton
Date: Tue, 4 May 2021 21:26:49 +0100
Subject: [PATCH 1/3] Exponential Backoff for ByteFIFO

This PR is another in the vein of queue improvements. It suggests an
exponential backoff for bytefifo queues to reduce the load from queue
polling. This will mostly be useful for redis queues.

Signed-off-by: Andrew Thornton
---
 modules/queue/queue_bytefifo.go | 25 ++++++++++++++++++++++---
 1 file changed, 22 insertions(+), 3 deletions(-)

diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go
index bc86078493307..47e248490c40c 100644
--- a/modules/queue/queue_bytefifo.go
+++ b/modules/queue/queue_bytefifo.go
@@ -114,6 +114,23 @@ func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func()
 }
 
 func (q *ByteFIFOQueue) readToChan() {
+	backOffTime := time.Millisecond * 100
+	maxBackOffTime := time.Second * 3
+	backOff := func() time.Duration {
+		backOffTime += backOffTime / 2
+		if backOffTime > maxBackOffTime {
+			backOffTime = maxBackOffTime
+			return maxBackOffTime
+		}
+		return backOffTime
+	}
+	doBackOff := func() {
+		select {
+		case <-q.closed:
+		case <-time.After(backOff()):
+		}
+	}
+
 	for {
 		select {
 		case <-q.closed:
@@ -126,21 +143,23 @@ func (q *ByteFIFOQueue) readToChan() {
 			if err != nil {
 				q.lock.Unlock()
 				log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
-				time.Sleep(time.Millisecond * 100)
+				doBackOff()
 				continue
 			}
 
 			if len(bs) == 0 {
 				q.lock.Unlock()
-				time.Sleep(time.Millisecond * 100)
+				doBackOff()
 				continue
 			}
 
+			backOffTime = time.Millisecond * 100
+
 			data, err := unmarshalAs(bs, q.exemplar)
 			if err != nil {
 				log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
 				q.lock.Unlock()
-				time.Sleep(time.Millisecond * 100)
+				doBackOff()
 				continue
 			}
 

From c1748664ff125889a5d2e28ebf7ae7af252019e2 Mon Sep 17 00:00:00 2001
From: zeripath
Date: Sat, 8 May 2021 12:58:00 +0100
Subject: [PATCH 2/3] Update modules/queue/queue_bytefifo.go

Co-authored-by: Lauris BH
---
 modules/queue/queue_bytefifo.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go
index 47e248490c40c..1993bd63bcc47 100644
--- a/modules/queue/queue_bytefifo.go
+++ b/modules/queue/queue_bytefifo.go
@@ -120,7 +120,6 @@ func (q *ByteFIFOQueue) readToChan() {
 		backOffTime += backOffTime / 2
 		if backOffTime > maxBackOffTime {
 			backOffTime = maxBackOffTime
-			return maxBackOffTime
 		}
 		return backOffTime
 	}

From bda7fa305ba6cb6facb2c6690ba89f558cba0944 Mon Sep 17 00:00:00 2001
From: Andrew Thornton
Date: Sat, 8 May 2021 14:48:23 +0100
Subject: [PATCH 3/3] as per lunny

Signed-off-by: Andrew Thornton
---
 modules/queue/queue_bytefifo.go | 98 ++++++++++++++++++---------------
 1 file changed, 55 insertions(+), 43 deletions(-)

diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go
index 1993bd63bcc47..fe1fb7807e831 100644
--- a/modules/queue/queue_bytefifo.go
+++ b/modules/queue/queue_bytefifo.go
@@ -114,59 +114,71 @@ func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func()
 }
 
 func (q *ByteFIFOQueue) readToChan() {
+	// handle quick cancels
+	select {
+	case <-q.closed:
+		// tell the pool to shutdown.
+		q.cancel()
+		return
+	default:
+	}
+
 	backOffTime := time.Millisecond * 100
 	maxBackOffTime := time.Second * 3
-	backOff := func() time.Duration {
-		backOffTime += backOffTime / 2
-		if backOffTime > maxBackOffTime {
-			backOffTime = maxBackOffTime
-		}
-		return backOffTime
-	}
-	doBackOff := func() {
-		select {
-		case <-q.closed:
-		case <-time.After(backOff()):
+	for {
+		success, resetBackoff := q.doPop()
+		if resetBackoff {
+			backOffTime = 100 * time.Millisecond
 		}
-	}
 
-	for {
-		select {
-		case <-q.closed:
-			// tell the pool to shutdown.
-			q.cancel()
-			return
-		default:
-			q.lock.Lock()
-			bs, err := q.byteFIFO.Pop()
-			if err != nil {
-				q.lock.Unlock()
-				log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
-				doBackOff()
-				continue
+		if success {
+			select {
+			case <-q.closed:
+				// tell the pool to shutdown.
+				q.cancel()
+				return
+			default:
 			}
-
-			if len(bs) == 0 {
-				q.lock.Unlock()
-				doBackOff()
-				continue
+		} else {
+			select {
+			case <-q.closed:
+				// tell the pool to shutdown.
+				q.cancel()
+				return
+			case <-time.After(backOffTime):
+			}
+			backOffTime += backOffTime / 2
+			if backOffTime > maxBackOffTime {
+				backOffTime = maxBackOffTime
 			}
+		}
+	}
+}
 
-			backOffTime = time.Millisecond * 100
+func (q *ByteFIFOQueue) doPop() (success, resetBackoff bool) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	bs, err := q.byteFIFO.Pop()
+	if err != nil {
+		log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
+		return
+	}
+	if len(bs) == 0 {
+		return
+	}
 
-			data, err := unmarshalAs(bs, q.exemplar)
-			if err != nil {
-				log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
-				q.lock.Unlock()
-				doBackOff()
-				continue
-			}
+	resetBackoff = true
 
-			log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
-			q.WorkerPool.Push(data)
-			q.lock.Unlock()
-		}
+	data, err := unmarshalAs(bs, q.exemplar)
+	if err != nil {
+		log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
+		return
 	}
+
+	log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
+	q.WorkerPool.Push(data)
+	success = true
+	return
 }
 
 // Shutdown processing from this queue