common: jittered backoff implementation #3791

Merged: 5 commits, Jul 11, 2018
1 change: 1 addition & 0 deletions source/common/common/BUILD
@@ -23,6 +23,7 @@ envoy_cc_library(
     deps = [
         ":assert_lib",
         "//include/envoy/common:backoff_strategy_interface",
+        "//include/envoy/runtime:runtime_interface",
     ],
 )

34 changes: 13 additions & 21 deletions source/common/common/backoff_strategy.cc
@@ -2,29 +2,21 @@

 namespace Envoy {

-ExponentialBackOffStrategy::ExponentialBackOffStrategy(uint64_t initial_interval,
-                                                       uint64_t max_interval, double multiplier)
-    : initial_interval_(initial_interval), max_interval_(max_interval), multiplier_(multiplier),
-      current_interval_(0) {
-  ASSERT(multiplier_ > 1.0);
-  ASSERT(initial_interval_ <= max_interval_);
-  ASSERT(initial_interval_ * multiplier_ <= max_interval_);
+JitteredBackOffStrategy::JitteredBackOffStrategy(uint64_t base_interval, uint64_t max_interval,
+                                                 Runtime::RandomGenerator& random)
+    : base_interval_(base_interval), max_interval_(max_interval), random_(random) {
+  ASSERT(base_interval_ <= max_interval_);
 }

-uint64_t ExponentialBackOffStrategy::nextBackOffMs() { return computeNextInterval(); }
-
-void ExponentialBackOffStrategy::reset() { current_interval_ = 0; }
-
-uint64_t ExponentialBackOffStrategy::computeNextInterval() {
-  if (current_interval_ == 0) {
-    current_interval_ = initial_interval_;
-  } else if (current_interval_ >= max_interval_) {
-    current_interval_ = max_interval_;
-  } else {
-    uint64_t new_interval = current_interval_;
-    new_interval = ceil(new_interval * multiplier_);
-    current_interval_ = new_interval > max_interval_ ? max_interval_ : new_interval;
+uint64_t JitteredBackOffStrategy::nextBackOffMs() {
+  const uint64_t multiplier = (1 << current_retry_) - 1;
+  const uint64_t base_backoff = multiplier * base_interval_;
+  if (base_backoff <= max_interval_) {
+    current_retry_++;
   }
-  return current_interval_;
+  return std::min(random_.random() % base_backoff, max_interval_);
 }
+
+void JitteredBackOffStrategy::reset() { current_retry_ = 1; }
+
 } // namespace Envoy
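
For reference, the behavior of nextBackOffMs() above: on attempt n (current_retry_ starts at 1) the un-jittered ceiling is (2^n - 1) * base_interval_, the returned value is random_.random() % ceiling capped at max_interval_, and current_retry_ stops advancing once the ceiling exceeds max_interval_. With the gRPC mux constants used later in this change (500 ms base, 30000 ms cap), successive calls draw from [0, 500), [0, 1500), [0, 3500), [0, 7500), [0, 15500), and from then on from [0, 31500) clamped to 30000 ms.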
25 changes: 15 additions & 10 deletions source/common/common/backoff_strategy.h
@@ -4,30 +4,35 @@
 #include <memory>

 #include "envoy/common/backoff_strategy.h"
+#include "envoy/runtime/runtime.h"

 #include "common/common/assert.h"

 namespace Envoy {

 /**
- * Implementation of BackOffStrategy that increases the back off period for each retry attempt. When
- * the interval has reached the max interval, it is no longer increased.
+ * Implementation of BackOffStrategy that uses a fully jittered exponential backoff algorithm.
  */
-class ExponentialBackOffStrategy : public BackOffStrategy {
+class JitteredBackOffStrategy : public BackOffStrategy {

 public:
-  ExponentialBackOffStrategy(uint64_t initial_interval, uint64_t max_interval, double multiplier);
+  /**
+   * Use this constructor if max_interval needs to be enforced.
+   * @param base_interval the base interval to be used for the next backoff computation.
+   * @param max_interval if the computed next backoff is more than this, this will be returned.
+   * @param random the random generator.
+   */
+  JitteredBackOffStrategy(uint64_t base_interval, uint64_t max_interval,
+                          Runtime::RandomGenerator& random);

   // BackOffStrategy methods
   uint64_t nextBackOffMs() override;
   void reset() override;

 private:
-  uint64_t computeNextInterval();
-
-  const uint64_t initial_interval_;
-  const uint64_t max_interval_;
-  const double multiplier_;
-  uint64_t current_interval_;
+  const uint64_t base_interval_;
+  const uint64_t max_interval_{};
+  uint64_t current_retry_{1};
+  Runtime::RandomGenerator& random_;
 };
 } // namespace Envoy
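
To make the new interface concrete, here is a minimal usage sketch (not part of this change; the variable names and the 500 ms / 30000 ms values are illustrative, taken from the gRPC mux constants below, and random_generator stands for any Runtime::RandomGenerator the caller already owns):

    #include "common/common/backoff_strategy.h"

    // Jittered backoff with a 500 ms base and a 30 s cap.
    Envoy::JitteredBackOffStrategy backoff(500, 30000, random_generator);
    uint64_t delay_ms = backoff.nextBackOffMs(); // first attempt: uniform in [0, 500)
    delay_ms = backoff.nextBackOffMs();          // second attempt: uniform in [0, 1500)
    backoff.reset();                             // start over from the first attempt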
12 changes: 6 additions & 6 deletions source/common/config/grpc_mux_impl.cc
@@ -12,12 +12,12 @@ namespace Config {
 GrpcMuxImpl::GrpcMuxImpl(const envoy::api::v2::core::Node& node, Grpc::AsyncClientPtr async_client,
                          Event::Dispatcher& dispatcher,
                          const Protobuf::MethodDescriptor& service_method,
-                         MonotonicTimeSource& time_source)
+                         Runtime::RandomGenerator& random, MonotonicTimeSource& time_source)
     : node_(node), async_client_(std::move(async_client)), service_method_(service_method),
-      time_source_(time_source) {
+      random_(random), time_source_(time_source) {
   retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); });
-  backoff_strategy_ptr_ = std::make_unique<ExponentialBackOffStrategy>(
-      RETRY_INITIAL_DELAY_MS, RETRY_MAX_DELAY_MS, MULTIPLIER);
+  backoff_strategy_ = std::make_unique<JitteredBackOffStrategy>(RETRY_INITIAL_DELAY_MS,
+                                                                RETRY_MAX_DELAY_MS, random_);
 }

 GrpcMuxImpl::~GrpcMuxImpl() {
@@ -31,7 +31,7 @@ GrpcMuxImpl::~GrpcMuxImpl() {
 void GrpcMuxImpl::start() { establishNewStream(); }

 void GrpcMuxImpl::setRetryTimer() {
-  retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_ptr_->nextBackOffMs()));
+  retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_->nextBackOffMs()));
 }

 void GrpcMuxImpl::establishNewStream() {
@@ -159,7 +159,7 @@ void GrpcMuxImpl::onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) {

 void GrpcMuxImpl::onReceiveMessage(std::unique_ptr<envoy::api::v2::DiscoveryResponse>&& message) {
   // Reset here so that it starts with fresh backoff interval on next disconnect.
-  backoff_strategy_ptr_->reset();
+  backoff_strategy_->reset();

   const std::string& type_url = message->type_url();
   ENVOY_LOG(debug, "Received gRPC message for {} at version {}", type_url, message->version_info());
5 changes: 3 additions & 2 deletions source/common/config/grpc_mux_impl.h
@@ -26,6 +26,7 @@ class GrpcMuxImpl : public GrpcMux,
 public:
   GrpcMuxImpl(const envoy::api::v2::core::Node& node, Grpc::AsyncClientPtr async_client,
               Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method,
+              Runtime::RandomGenerator& random,
               MonotonicTimeSource& time_source = ProdMonotonicTimeSource::instance_);
   ~GrpcMuxImpl();

@@ -45,7 +46,6 @@
   // TODO(htuch): Make this configurable or some static.
   const uint32_t RETRY_INITIAL_DELAY_MS = 500;
   const uint32_t RETRY_MAX_DELAY_MS = 30000; // Do not cross more than 30s
-  const double MULTIPLIER = 2;

 private:
   void setRetryTimer();
@@ -103,8 +103,9 @@ class GrpcMuxImpl : public GrpcMux,
   // Envoy's dependency ordering.
   std::list<std::string> subscriptions_;
   Event::TimerPtr retry_timer_;
+  Runtime::RandomGenerator& random_;
   MonotonicTimeSource& time_source_;
-  BackOffStrategyPtr backoff_strategy_ptr_;
+  BackOffStrategyPtr backoff_strategy_;
 };

 class NullGrpcMuxImpl : public GrpcMux {
4 changes: 2 additions & 2 deletions source/common/config/grpc_subscription_impl.h
@@ -15,9 +15,9 @@ template <class ResourceType>
 class GrpcSubscriptionImpl : public Config::Subscription<ResourceType> {
 public:
   GrpcSubscriptionImpl(const envoy::api::v2::core::Node& node, Grpc::AsyncClientPtr async_client,
-                       Event::Dispatcher& dispatcher,
+                       Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random,
                        const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats)
-      : grpc_mux_(node, std::move(async_client), dispatcher, service_method),
+      : grpc_mux_(node, std::move(async_client), dispatcher, service_method, random),
         grpc_mux_subscription_(grpc_mux_, stats) {}

   // Config::Subscription
4 changes: 2 additions & 2 deletions source/common/config/subscription_factory.h
@@ -68,8 +68,8 @@ class SubscriptionFactory {
           Config::Utility::factoryForGrpcApiConfigSource(cm.grpcAsyncClientManager(),
                                                          config.api_config_source(), scope)
               ->create(),
-          dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method),
-          stats));
+          dispatcher, random,
+          *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method), stats));
       break;
     }
     default:
1 change: 1 addition & 0 deletions source/common/router/BUILD
@@ -123,6 +123,7 @@ envoy_cc_library(
         "//include/envoy/runtime:runtime_interface",
         "//include/envoy/upstream:upstream_interface",
         "//source/common/common:assert_lib",
+        "//source/common/common:backoff_lib",
         "//source/common/common:utility_lib",
         "//source/common/grpc:common_lib",
         "//source/common/http:codes_lib",
13 changes: 5 additions & 8 deletions source/common/router/retry_state_impl.cc
@@ -71,23 +71,20 @@ RetryStateImpl::RetryStateImpl(const RetryPolicy& route_policy, Http::HeaderMap&
   // Merge in the route policy.
   retry_on_ |= route_policy.retryOn();
   retries_remaining_ = std::max(retries_remaining_, route_policy.numRetries());
+  const uint32_t base = runtime_.snapshot().getInteger("upstream.base_retry_backoff_ms", 25);
+  // Cap the max interval to 10 times the base interval to ensure reasonable backoff intervals.
+  backoff_strategy_ = std::make_unique<JitteredBackOffStrategy>(base, base * 10, random_);
 }

 RetryStateImpl::~RetryStateImpl() { resetRetry(); }

 void RetryStateImpl::enableBackoffTimer() {
-  // TODO(ramaraochavali): Implement JitteredExponentialBackOff and refactor this.
-  // We use a fully jittered exponential backoff algorithm.
-  current_retry_++;
-  uint32_t multiplier = (1 << current_retry_) - 1;
-  uint64_t base = runtime_.snapshot().getInteger("upstream.base_retry_backoff_ms", 25);
-  uint64_t timeout = random_.random() % (base * multiplier);
-
   if (!retry_timer_) {
     retry_timer_ = dispatcher_.createTimer([this]() -> void { callback_(); });
   }

-  retry_timer_->enableTimer(std::chrono::milliseconds(timeout));
+  // We use a fully jittered exponential backoff algorithm.
+  retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_->nextBackOffMs()));
 }

 uint32_t RetryStateImpl::parseRetryOn(absl::string_view config) {
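
For reference, with the default of 25 ms for upstream.base_retry_backoff_ms, the per-attempt ceiling passed to the strategy grows as 25, 75, 175, ... ms and every returned delay is capped at base * 10 = 250 ms, so raising or lowering that single runtime key scales the whole retry backoff curve.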
4 changes: 3 additions & 1 deletion source/common/router/retry_state_impl.h
@@ -10,6 +10,8 @@
 #include "envoy/runtime/runtime.h"
 #include "envoy/upstream/upstream.h"

+#include "common/common/backoff_strategy.h"
+
 #include "absl/strings/string_view.h"
 #include "absl/types/optional.h"

@@ -55,10 +57,10 @@ class RetryStateImpl : public RetryState {
   Event::Dispatcher& dispatcher_;
   uint32_t retry_on_{};
   uint32_t retries_remaining_{1};
-  uint32_t current_retry_{};
   DoRetryCallback callback_;
   Event::TimerPtr retry_timer_;
   Upstream::ResourcePriority priority_;
+  BackOffStrategyPtr backoff_strategy_;
 };

 } // namespace Router
3 changes: 2 additions & 1 deletion source/common/upstream/cluster_manager_impl.cc
@@ -214,7 +214,8 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Boots
               ->create(),
           main_thread_dispatcher,
           *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
-              "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources")));
+              "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
+          random_));
     } else {
       ads_mux_.reset(new Config::NullGrpcMuxImpl());
     }
1 change: 1 addition & 0 deletions test/common/common/BUILD
@@ -15,6 +15,7 @@ envoy_cc_test(
     srcs = ["backoff_strategy_test.cc"],
    deps = [
         "//source/common/common:backoff_lib",
+        "//test/mocks/runtime:runtime_mocks",
     ],
 )

82 changes: 42 additions & 40 deletions test/common/common/backoff_strategy_test.cc
@@ -1,57 +1,59 @@
#include "common/common/backoff_strategy.h"

#include "test/mocks/runtime/mocks.h"

#include "gtest/gtest.h"

using testing::NiceMock;
using testing::Return;

namespace Envoy {

TEST(BackOffStrategyTest, ExponentialBackOffBasicTest) {
ExponentialBackOffStrategy exponential_back_off(10, 100, 2);
EXPECT_EQ(10, exponential_back_off.nextBackOffMs());
EXPECT_EQ(20, exponential_back_off.nextBackOffMs());
EXPECT_EQ(40, exponential_back_off.nextBackOffMs());
EXPECT_EQ(80, exponential_back_off.nextBackOffMs());
}
TEST(BackOffStrategyTest, JitteredBackOffBasicFlow) {
NiceMock<Runtime::MockRandomGenerator> random;
ON_CALL(random, random()).WillByDefault(Return(27));

TEST(BackOffStrategyTest, ExponentialBackOffFractionalMultiplier) {
ExponentialBackOffStrategy exponential_back_off(10, 50, 1.5);
EXPECT_EQ(10, exponential_back_off.nextBackOffMs());
EXPECT_EQ(15, exponential_back_off.nextBackOffMs());
EXPECT_EQ(23, exponential_back_off.nextBackOffMs());
EXPECT_EQ(35, exponential_back_off.nextBackOffMs());
EXPECT_EQ(50, exponential_back_off.nextBackOffMs());
EXPECT_EQ(50, exponential_back_off.nextBackOffMs());
JitteredBackOffStrategy jittered_back_off(25, 30, random);
EXPECT_EQ(2, jittered_back_off.nextBackOffMs());
EXPECT_EQ(27, jittered_back_off.nextBackOffMs());
}

TEST(BackOffStrategyTest, ExponentialBackOffMaxIntervalReached) {
ExponentialBackOffStrategy exponential_back_off(10, 100, 2);
EXPECT_EQ(10, exponential_back_off.nextBackOffMs());
EXPECT_EQ(20, exponential_back_off.nextBackOffMs());
EXPECT_EQ(40, exponential_back_off.nextBackOffMs());
EXPECT_EQ(80, exponential_back_off.nextBackOffMs());
EXPECT_EQ(100, exponential_back_off.nextBackOffMs()); // Should return Max here
EXPECT_EQ(100, exponential_back_off.nextBackOffMs()); // Should return Max here
}
TEST(BackOffStrategyTest, JitteredBackOffBasicReset) {
NiceMock<Runtime::MockRandomGenerator> random;
ON_CALL(random, random()).WillByDefault(Return(27));

TEST(BackOffStrategyTest, ExponentialBackOfReset) {
ExponentialBackOffStrategy exponential_back_off(10, 100, 2);
EXPECT_EQ(10, exponential_back_off.nextBackOffMs());
EXPECT_EQ(20, exponential_back_off.nextBackOffMs());
JitteredBackOffStrategy jittered_back_off(25, 30, random);
EXPECT_EQ(2, jittered_back_off.nextBackOffMs());
EXPECT_EQ(27, jittered_back_off.nextBackOffMs());

exponential_back_off.reset();
EXPECT_EQ(10, exponential_back_off.nextBackOffMs()); // Should start from start
jittered_back_off.reset();
EXPECT_EQ(2, jittered_back_off.nextBackOffMs()); // Should start from start
}

TEST(BackOffStrategyTest, ExponentialBackOfResetAfterMaxReached) {
ExponentialBackOffStrategy exponential_back_off(10, 100, 2);
EXPECT_EQ(10, exponential_back_off.nextBackOffMs());
EXPECT_EQ(20, exponential_back_off.nextBackOffMs());
EXPECT_EQ(40, exponential_back_off.nextBackOffMs());
EXPECT_EQ(80, exponential_back_off.nextBackOffMs());
EXPECT_EQ(100, exponential_back_off.nextBackOffMs()); // Should return Max here
TEST(BackOffStrategyTest, JitteredBackOffWithMaxInterval) {
NiceMock<Runtime::MockRandomGenerator> random;
ON_CALL(random, random()).WillByDefault(Return(1024));

JitteredBackOffStrategy jittered_back_off(5, 100, random);
EXPECT_EQ(4, jittered_back_off.nextBackOffMs());
EXPECT_EQ(4, jittered_back_off.nextBackOffMs());
EXPECT_EQ(9, jittered_back_off.nextBackOffMs());
EXPECT_EQ(49, jittered_back_off.nextBackOffMs());
EXPECT_EQ(94, jittered_back_off.nextBackOffMs());
EXPECT_EQ(94, jittered_back_off.nextBackOffMs()); // Should return Max here
}

exponential_back_off.reset();
TEST(BackOffStrategyTest, JitteredBackOffWithMaxIntervalReset) {
NiceMock<Runtime::MockRandomGenerator> random;
ON_CALL(random, random()).WillByDefault(Return(1024));

EXPECT_EQ(10, exponential_back_off.nextBackOffMs()); // Should start from start
}
JitteredBackOffStrategy jittered_back_off(5, 100, random);
EXPECT_EQ(4, jittered_back_off.nextBackOffMs());
EXPECT_EQ(4, jittered_back_off.nextBackOffMs());
EXPECT_EQ(9, jittered_back_off.nextBackOffMs());
EXPECT_EQ(49, jittered_back_off.nextBackOffMs());

jittered_back_off.reset();
EXPECT_EQ(4, jittered_back_off.nextBackOffMs()); // Should start from start
}
} // namespace Envoy
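
The expected values follow from the mocked generator: with random() pinned to 27 and a base of 25, the first call yields 27 % 25 = 2, and the second call's un-jittered ceiling of 75 already exceeds the max of 30, so it yields min(27 % 75, 30) = 27. With random() pinned to 1024 and a base of 5, the successive ceilings 5, 15, 35, 75, 155 give 1024 % ceiling = 4, 4, 9, 49, 94, and 94 repeats thereafter because the retry counter stops advancing once the ceiling exceeds the max of 100.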
1 change: 1 addition & 0 deletions test/common/config/BUILD
@@ -47,6 +47,7 @@
         "//test/mocks/config:config_mocks",
         "//test/mocks/event:event_mocks",
         "//test/mocks/grpc:grpc_mocks",
+        "//test/mocks/runtime:runtime_mocks",
         "//test/test_common:logging_lib",
         "//test/test_common:utility_lib",
         "@envoy_api//envoy/api/v2:discovery_cc",
5 changes: 4 additions & 1 deletion test/common/config/grpc_mux_impl_test.cc
@@ -10,6 +10,7 @@
 #include "test/mocks/config/mocks.h"
 #include "test/mocks/event/mocks.h"
 #include "test/mocks/grpc/mocks.h"
+#include "test/mocks/runtime/mocks.h"
 #include "test/test_common/logging.h"
 #include "test/test_common/utility.h"

@@ -44,7 +45,7 @@ class GrpcMuxImplTest : public testing::Test {
         dispatcher_,
         *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
             "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
-        time_source_));
+        random_, time_source_));
   }

   void expectSendMessage(const std::string& type_url,
@@ -72,6 +73,7 @@ class GrpcMuxImplTest : public testing::Test {

   envoy::api::v2::core::Node node_;
   NiceMock<Event::MockDispatcher> dispatcher_;
+  Runtime::MockRandomGenerator random_;
   Grpc::MockAsyncClient* async_client_;
   Event::MockTimer* timer_;
   Event::TimerCb timer_cb_;
@@ -112,6 +114,7 @@ TEST_F(GrpcMuxImplTest, ResetStream) {
   expectSendMessage("baz", {"z"}, "");
   grpc_mux_->start();

+  EXPECT_CALL(random_, random());
   EXPECT_CALL(*timer_, enableTimer(_));
   grpc_mux_->onRemoteClose(Grpc::Status::GrpcStatus::Canceled, "");
   EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_));