Skip to content

Commit

Permalink
Merge pull request #249 from sendgrid/rate_limit
Browse files Browse the repository at this point in the history
Adding optional rate limit support.
  • Loading branch information
thinkingserious authored Dec 21, 2017
2 parents b98bbd8 + ae18ce7 commit 0f23918
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 7 deletions.
19 changes: 19 additions & 0 deletions src/main/java/com/sendgrid/APICallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.sendgrid;

/**
* An interface describing a callback mechanism for the
* asynchronous, rate limit aware API connection.
*/
public interface APICallback {
/**
* Callback method in case of an error.
* @param ex the error that was thrown.
*/
public void error(Exception ex);

/**
* Callback method in case of a valid response.
* @param response the valid response.
*/
public void response(Response response);
}
36 changes: 36 additions & 0 deletions src/main/java/com/sendgrid/RateLimitException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.sendgrid;

/**
* An exception thrown when the maximum number of retries
* have occurred, and the API calls are still rate limited.
*/
public class RateLimitException extends Exception {
private final Request request;
private final int retryCount;

/**
* Construct a new exception.
* @param request the originating request object.
* @param retryCount the number of times a retry was attempted.
*/
public RateLimitException(Request request, int retryCount) {
this.request = request;
this.retryCount = retryCount;
}

/**
* Get the originating request object.
* @return the request object.
*/
public Request getRequest() {
return this.request;
}

/**
* Get the number of times the action was attemted.
* @return the retry count.
*/
public int getRetryCount() {
return this.retryCount;
}
}
126 changes: 119 additions & 7 deletions src/main/java/com/sendgrid/SendGrid.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

/**
* Class SendGrid allows for quick and easy access to the SendGrid API.
Expand All @@ -13,6 +15,10 @@ public class SendGrid implements SendGridAPI {

/** The user agent string to return to SendGrid. */
private static final String USER_AGENT = "sendgrid/" + VERSION + ";java";
private static final int RATE_LIMIT_RESPONSE_CODE = 429;
private static final int THREAD_POOL_SIZE = 8;

private ExecutorService pool;

/** The user's API key. */
private String apiKey;
Expand All @@ -27,7 +33,13 @@ public class SendGrid implements SendGridAPI {
private Client client;

/** The request headers container. */
private Map<String,String> requestHeaders;
private Map<String, String> requestHeaders;

/** The number of times to try after a rate limit. */
private int rateLimitRetry;

/** The number of milliseconds to sleep between retries. */
private int rateLimitSleep;

/**
* Construct a new SendGrid API wrapper.
Expand Down Expand Up @@ -70,6 +82,10 @@ public void initializeSendGrid(String apiKey) {
this.requestHeaders.put("Authorization", "Bearer " + apiKey);
this.requestHeaders.put("User-agent", USER_AGENT);
this.requestHeaders.put("Accept", "application/json");
this.rateLimitRetry = 5;
this.rateLimitSleep = 1100;

this.pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
}

/**
Expand Down Expand Up @@ -100,7 +116,7 @@ public void setVersion(String version) {
* Obtain the request headers.
* @return the request headers.
*/
public Map<String,String> getRequestHeaders() {
public Map<String, String> getRequestHeaders() {
return this.requestHeaders;
}

Expand All @@ -110,7 +126,7 @@ public Map<String,String> getRequestHeaders() {
* @param value the header value.
* @return the new set of request headers.
*/
public Map<String,String> addRequestHeader(String key, String value) {
public Map<String, String> addRequestHeader(String key, String value) {
this.requestHeaders.put(key, value);
return getRequestHeaders();
}
Expand All @@ -120,7 +136,7 @@ public Map<String,String> addRequestHeader(String key, String value) {
* @param key the header key to remove.
* @return the new set of request headers.
*/
public Map<String,String> removeRequestHeader(String key) {
public Map<String, String> removeRequestHeader(String key) {
this.requestHeaders.remove(key);
return getRequestHeaders();
}
Expand All @@ -142,7 +158,43 @@ public void setHost(String host) {
}

/**
* Class makeCall makes the call to the SendGrid API, override this method for testing.
* Get the maximum number of retries on a rate limit response.
* The default is 5.
* @return the number of retries on a rate limit.
*/
public int getRateLimitRetry() {
return this.rateLimitRetry;
}

/**
* Set the maximum number of retries on a rate limit response.
* @param rateLimitRetry the maximum retry count.
*/
public void setRateLimitRetry(int rateLimitRetry) {
this.rateLimitRetry = rateLimitRetry;
}

/**
* Get the duration of time (in milliseconds) to sleep between
* consecutive rate limit retries. The SendGrid API enforces
* the rate limit to the second. The default value is 1.1 seconds.
* @return the sleep duration.
*/
public int getRateLimitSleep() {
return this.rateLimitSleep;
}

/**
* Set the duration of time (in milliseconds) to sleep between
* consecutive rate limit retries.
* @param rateLimitSleep the sleep duration.
*/
public void setRateLimitSleep(int rateLimitSleep) {
this.rateLimitSleep = rateLimitSleep;
}

/**
* Makes the call to the SendGrid API, override this method for testing.
* @param request the request to make.
* @return the response object.
* @throws IOException in case of a network error.
Expand All @@ -163,13 +215,73 @@ public Response api(Request request) throws IOException {
req.setBaseUri(this.host);
req.setEndpoint("/" + version + "/" + request.getEndpoint());
req.setBody(request.getBody());
for (Map.Entry <String, String> header : this.requestHeaders.entrySet()) {
for (Map.Entry<String, String> header : this.requestHeaders.entrySet()) {
req.addHeader(header.getKey(), header.getValue());
}
for (Map.Entry <String, String> queryParam : request.getQueryParams().entrySet()) {
for (Map.Entry<String, String> queryParam : request.getQueryParams().entrySet()) {
req.addQueryParam(queryParam.getKey(), queryParam.getValue());
}

return makeCall(req);
}

/**
* Attempt an API call. This method executes the API call asynchronously
* on an internal thread pool. If the call is rate limited, the thread
* will retry up to the maximum configured time.
* @param request the API request.
*/
public void attempt(Request request) {
this.attempt(request, new APICallback() {
@Override
public void error(Exception ex) {
}

public void response(Response r) {
}
});
}

/**
* Attempt an API call. This method executes the API call asynchronously
* on an internal thread pool. If the call is rate limited, the thread
* will retry up to the maximum configured time. The supplied callback
* will be called in the event of an error, or a successful response.
* @param request the API request.
* @param callback the callback.
*/
public void attempt(final Request request, final APICallback callback) {
this.pool.execute(new Runnable() {
@Override
public void run() {
Response response;

// Retry until the retry limit has been reached.
for (int i = 0; i < rateLimitRetry; ++i) {
try {
response = api(request);
} catch (IOException ex) {
// Stop retrying if there is a network error.
callback.error(ex);
return;
}

// We have been rate limited.
if (response.getStatusCode() == RATE_LIMIT_RESPONSE_CODE) {
try {
Thread.sleep(rateLimitSleep);
} catch (InterruptedException ex) {
// Can safely ignore this exception and retry.
}
} else {
callback.response(response);
return;
}
}

// Retries exhausted. Return error.
callback.error(new RateLimitException(request, rateLimitRetry));
}
});
}
}
100 changes: 100 additions & 0 deletions src/test/java/com/sendgrid/SendGridTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,106 @@ public void testHost() {
Assert.assertEquals(sg.getHost(), "api.new.com");
}

@Test
public void testRateLimitRetry() {
SendGrid sg = new SendGrid(SENDGRID_API_KEY);
sg.setRateLimitRetry(100);
Assert.assertEquals(sg.getRateLimitRetry(), 100);
}

@Test
public void testRateLimitSleep() {
SendGrid sg = new SendGrid(SENDGRID_API_KEY);
sg.setRateLimitSleep(999);
Assert.assertEquals(sg.getRateLimitSleep(), 999);
}


@Test
public void test_async() {
final Object sync = new Object();
SendGrid sg = null;
if(System.getenv("TRAVIS") != null && Boolean.parseBoolean(System.getenv("TRAVIS"))) {
sg = new SendGrid("SENDGRID_API_KEY");
sg.setHost(System.getenv("MOCK_HOST"));
} else {
sg = new SendGrid("SENDGRID_API_KEY", true);
sg.setHost("localhost:4010");
}
sg.addRequestHeader("X-Mock", "200");

Request request = new Request();

request.setMethod(Method.GET);
request.setEndpoint("access_settings/activity");
request.addQueryParam("limit", "1");
sg.attempt(request, new APICallback() {
@Override
public void error(Exception e) {
Assert.fail();
synchronized(sync) {
sync.notify();
}
}

@Override
public void response(Response response) {
Assert.assertEquals(200, response.getStatusCode());
synchronized(sync) {
sync.notify();
}
}
});

try {
synchronized(sync) {
sync.wait(2000);
}
} catch(InterruptedException ex) {
Assert.fail(ex.toString());
}
}

@Test
public void test_async_rate_limit() {
final Object sync = new Object();
SendGrid sg = null;
if(System.getenv("TRAVIS") != null && Boolean.parseBoolean(System.getenv("TRAVIS"))) {
sg = new SendGrid("SENDGRID_API_KEY");
sg.setHost(System.getenv("MOCK_HOST"));
} else {
sg = new SendGrid("SENDGRID_API_KEY", true);
sg.setHost("localhost:4010");
}
sg.addRequestHeader("X-Mock", "429");

Request request = new Request();

request.setMethod(Method.GET);
request.setEndpoint("access_settings/activity");
request.addQueryParam("limit", "1");
sg.attempt(request, new APICallback() {
@Override
public void error(Exception e) {
Assert.assertEquals(e.getClass(), RateLimitException.class);
sync.notify();
}

@Override
public void response(Response response) {
Assert.fail();
sync.notify();
}
});

try {
synchronized(sync) {
sync.wait(2000);
}
} catch(InterruptedException ex) {
Assert.fail(ex.toString());
}
}

@Test
public void test_access_settings_activity_get() throws IOException {
Expand Down

0 comments on commit 0f23918

Please sign in to comment.