Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Smarter Throttle Gate #839

Merged
merged 3 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: go
go:
- 1.12
- 1.13
script:
- ./check_format.sh
- env GO111MODULE=on make lint
Expand Down
2 changes: 2 additions & 0 deletions changelog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ v1.1.3
- Added --live flag to get, download, configure to automatically set
configuration to the published theme
- Added --hidepb flag to hide preview bar (#829)
- Improved request throttling and retrying, to eliminate ABS interactions and
hanging operations (#839)

v1.1.2 (Sep 29, 2020)
=====================
Expand Down
33 changes: 11 additions & 22 deletions src/httpify/client.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package httpify

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"runtime"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -142,30 +139,22 @@ func (client *HTTPClient) do(method, path string, body interface{}, headers map[
}

func (client *HTTPClient) doWithRetry(req *http.Request, body interface{}) (*http.Response, error) {
for attempt := 0; attempt <= client.maxRetry; {
// reset the body when non-nil for every request (rewind)
if body != nil {
data, err := json.Marshal(body)
if err != nil {
return nil, err
}
req.Body = ioutil.NopCloser(bytes.NewBuffer(data))
var bodyData []byte
var err error
if body != nil {
bodyData, err = json.Marshal(body)
if err != nil {
return nil, err
}
}

client.limit.Wait()
resp, err := httpClient.Do(req)
if err == nil {
if resp.StatusCode >= 100 && resp.StatusCode <= 428 {
return resp, nil
} else if resp.StatusCode == http.StatusTooManyRequests {
after, _ := strconv.ParseFloat(resp.Header.Get("Retry-After"), 10)
client.limit.ResetAfter(time.Duration(after) * time.Second)
continue
}
for attempt := 0; attempt <= client.maxRetry; attempt++ {
resp, err := client.limit.GateReq(httpClient, req, bodyData)
if err == nil && resp.StatusCode >= 100 && resp.StatusCode < 500 {
return resp, nil
} else if strings.Contains(err.Error(), "no such host") {
return nil, ErrConnectionIssue
}
attempt++
time.Sleep(time.Duration(attempt) * time.Second)
}
return nil, fmt.Errorf("request failed after %v retries", client.maxRetry)
Expand Down
65 changes: 54 additions & 11 deletions src/ratelimiter/rate_limiter.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package ratelimiter

import (
"bytes"
"context"
"errors"
"golang.org/x/time/rate"
"io/ioutil"
"net/http"
"strconv"
"time"
)

Expand All @@ -12,34 +17,72 @@ var domainLimitMap = make(map[string]*Limiter)
type Limiter struct {
perSecond rate.Limit
rate *rate.Limiter
waiting chan int
ctx context.Context
cancel context.CancelFunc
locked bool
}

// New creates a new call rate limiter for a single domain
func New(domain string, reqPerSec int) *Limiter {
if _, ok := domainLimitMap[domain]; !ok {
everySecond := rate.Every(time.Second / time.Duration(reqPerSec))
ctx, cancel := context.WithCancel(context.Background())
domainLimitMap[domain] = &Limiter{
perSecond: everySecond,
rate: rate.NewLimiter(everySecond, reqPerSec),
ctx: ctx,
cancel: cancel,
}
}
return domainLimitMap[domain]
}

// ResetAfter will reset the bucket to 0, wait for the amount of time until it resumes
// This will allow the rate limiter to stop all activity and restart slowly
func (limiter *Limiter) ResetAfter(after time.Duration) {
if limiter.rate.Limit() == 0 {
// GateReq will make the http request but will force it to comply with concurrent limits,
// rate limits, and it will also retry requests that receive 429.
// When a 429 occurs, it will cancel all inflight requests and pauses, so that the requests
// dont continue to batter the server and cause bot detection
func (limiter *Limiter) GateReq(client *http.Client, origReq *http.Request, body []byte) (*http.Response, error) {
limiter.rate.Wait(context.Background())
req := origReq.WithContext(limiter.ctx)
// reset the body when non-nil for every request (rewind)
if len(body) > 0 {
req.Body = ioutil.NopCloser(bytes.NewBuffer(body))
}
resp, err := client.Do(req)
if err == nil && resp.StatusCode == http.StatusTooManyRequests {
limiter.retryAfter(resp.Header.Get("Retry-After"))
return limiter.GateReq(client, origReq, body)
} else if errors.Is(err, context.Canceled) {
<-limiter.waiting
return limiter.GateReq(client, origReq, body)
}
return resp, err
}

func (limiter *Limiter) retryAfter(header string) {
limiter.lock()
defer limiter.unlock()
after, _ := strconv.ParseFloat(header, 10)
time.Sleep(time.Duration(after) * time.Second)
}

func (limiter *Limiter) lock() {
if limiter.locked {
return
}
limiter.locked = true
limiter.waiting = make(chan int)
limiter.rate.SetLimit(0)
go func() {
time.Sleep(after)
limiter.rate.SetLimit(limiter.perSecond)
}()
limiter.cancel()
}

// Wait will block until enough time has passed and the limit will not be passed
func (limiter *Limiter) Wait() {
limiter.rate.Wait(context.Background())
func (limiter *Limiter) unlock() {
if !limiter.locked {
return
}
limiter.rate.SetLimit(limiter.perSecond)
limiter.ctx, limiter.cancel = context.WithCancel(context.Background())
close(limiter.waiting)
limiter.locked = false
}
17 changes: 14 additions & 3 deletions src/ratelimiter/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,22 @@ func TestRateLimiterForDomain(t *testing.T) {
assert.NotEqual(t, limiter2, limiter3)
}

func TestRateLimiterResetAfter(t *testing.T) {
func TestRateLimiterLockUnlock(t *testing.T) {
limiter := New("domain.com", 1)
assert.Equal(t, limiter.rate.Limit(), rate.Limit(1))
limiter.ResetAfter(time.Millisecond)
limiter.lock()
<-limiter.ctx.Done()
assert.Equal(t, limiter.rate.Limit(), rate.Limit(0))
time.Sleep(2 * time.Millisecond)
limiter.unlock()
assert.Equal(t, limiter.rate.Limit(), rate.Limit(1))
assert.Nil(t, limiter.ctx.Err())
limiter.unlock()
}

func TestRateLimiterRetryAfter(t *testing.T) {
limiter := New("domain.com", 1)
expected := time.Now().Add(2 * time.Second)
limiter.retryAfter("2.0")
after := time.Now()
assert.True(t, after.After(expected) || after.Equal(expected))
}