Skip to content

Commit

Permalink
http: delaying attach pending requests (#2871)
Browse files Browse the repository at this point in the history
Attach pending upstream requests in next event after onResponseComplete.

Risk Level: Medium
Testing: unit test, integration test
Docs Changes: N/A
Release Notes: N/A

Fixes #2715

Signed-off-by: Lizan Zhou <zlizan@google.com>
  • Loading branch information
lizan authored and alyssawilk committed Apr 24, 2018
1 parent 321aae9 commit ee6f148
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 13 deletions.
43 changes: 36 additions & 7 deletions source/common/http/http1/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ namespace Envoy {
namespace Http {
namespace Http1 {

ConnPoolImpl::ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::HostConstSharedPtr host,
Upstream::ResourcePriority priority,
const Network::ConnectionSocket::OptionsSharedPtr& options)
: dispatcher_(dispatcher), host_(host), priority_(priority), socket_options_(options),
upstream_ready_timer_(dispatcher_.createTimer([this]() { onUpstreamReady(); })) {}

ConnPoolImpl::~ConnPoolImpl() {
while (!ready_clients_.empty()) {
ready_clients_.front()->codec_client_->close();
Expand Down Expand Up @@ -180,7 +186,7 @@ void ConnPoolImpl::onConnectionEvent(ActiveClient& client, Network::ConnectionEv
// whether the client is in the ready list (connected) or the busy list (failed to connect).
if (event == Network::ConnectionEvent::Connected) {
conn_connect_ms_->complete();
processIdleClient(client);
processIdleClient(client, false);
}
}

Expand Down Expand Up @@ -209,25 +215,48 @@ void ConnPoolImpl::onResponseComplete(ActiveClient& client) {
host_->cluster().stats().upstream_cx_max_requests_.inc();
onDownstreamReset(client);
} else {
processIdleClient(client);
// Upstream connection might be closed right after response is complete. Setting delay=true
// here to attach pending requests in next dispatcher loop to handle that case.
// https://github.com/envoyproxy/envoy/issues/2715
processIdleClient(client, true);
}
}

void ConnPoolImpl::onUpstreamReady() {
upstream_ready_enabled_ = false;
while (!pending_requests_.empty() && !ready_clients_.empty()) {
ActiveClient& client = *ready_clients_.front();
ENVOY_CONN_LOG(debug, "attaching to next request", *client.codec_client_);
// There is work to do so bind a request to the client and move it to the busy list. Pending
// requests are pushed onto the front, so pull from the back.
attachRequestToClient(client, pending_requests_.back()->decoder_,
pending_requests_.back()->callbacks_);
pending_requests_.pop_back();
client.moveBetweenLists(ready_clients_, busy_clients_);
}
}

void ConnPoolImpl::processIdleClient(ActiveClient& client) {
void ConnPoolImpl::processIdleClient(ActiveClient& client, bool delay) {
client.stream_wrapper_.reset();
if (pending_requests_.empty()) {
// There is nothing to service so just move the connection into the ready list.
if (pending_requests_.empty() || delay) {
// There is nothing to service or delayed processing is requested, so just move the connection
// into the ready list.
ENVOY_CONN_LOG(debug, "moving to ready", *client.codec_client_);
client.moveBetweenLists(busy_clients_, ready_clients_);
} else {
// There is work to do so bind a request to the client and move it to the busy list. Pending
// requests are pushed onto the front, so pull from the back.
// There is work to do immediately so bind a request to the client and move it to the busy list.
// Pending requests are pushed onto the front, so pull from the back.
ENVOY_CONN_LOG(debug, "attaching to next request", *client.codec_client_);
attachRequestToClient(client, pending_requests_.back()->decoder_,
pending_requests_.back()->callbacks_);
pending_requests_.pop_back();
}

if (delay && !pending_requests_.empty() && !upstream_ready_enabled_) {
upstream_ready_enabled_ = true;
upstream_ready_timer_->enableTimer(std::chrono::milliseconds(0));
}

checkForDrained();
}

Expand Down
8 changes: 5 additions & 3 deletions source/common/http/http1/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
public:
ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::HostConstSharedPtr host,
Upstream::ResourcePriority priority,
const Network::ConnectionSocket::OptionsSharedPtr& options)
: dispatcher_(dispatcher), host_(host), priority_(priority), socket_options_(options) {}
const Network::ConnectionSocket::OptionsSharedPtr& options);

~ConnPoolImpl();

Expand Down Expand Up @@ -123,7 +122,8 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
void onDownstreamReset(ActiveClient& client);
void onPendingRequestCancel(PendingRequest& request);
void onResponseComplete(ActiveClient& client);
void processIdleClient(ActiveClient& client);
void onUpstreamReady();
void processIdleClient(ActiveClient& client, bool delay);

Stats::TimespanPtr conn_connect_ms_;
Event::Dispatcher& dispatcher_;
Expand All @@ -134,6 +134,8 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
std::list<DrainedCb> drained_callbacks_;
Upstream::ResourcePriority priority_;
const Network::ConnectionSocket::OptionsSharedPtr socket_options_;
Event::TimerPtr upstream_ready_timer_;
bool upstream_ready_enabled_{false};
};

/**
Expand Down
88 changes: 85 additions & 3 deletions test/common/http/http1/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ namespace Http1 {
class ConnPoolImplForTest : public ConnPoolImpl {
public:
ConnPoolImplForTest(Event::MockDispatcher& dispatcher,
Upstream::ClusterInfoConstSharedPtr cluster)
Upstream::ClusterInfoConstSharedPtr cluster,
NiceMock<Event::MockTimer>* upstream_ready_timer)
: ConnPoolImpl(dispatcher, Upstream::makeTestHost(cluster, "tcp://127.0.0.1:9000"),
Upstream::ResourcePriority::Default, nullptr),
mock_dispatcher_(dispatcher) {}
mock_dispatcher_(dispatcher), mock_upstream_ready_timer_(upstream_ready_timer) {}

~ConnPoolImplForTest() {
EXPECT_EQ(0U, ready_clients_.size());
Expand Down Expand Up @@ -98,7 +99,19 @@ class ConnPoolImplForTest : public ConnPoolImpl {
EXPECT_CALL(*test_client.connect_timer_, enableTimer(_));
}

void expectEnableUpstreamReady() {
EXPECT_FALSE(upstream_ready_enabled_);
EXPECT_CALL(*mock_upstream_ready_timer_, enableTimer(_)).Times(1).RetiresOnSaturation();
}

void expectAndRunUpstreamReady() {
EXPECT_TRUE(upstream_ready_enabled_);
mock_upstream_ready_timer_->callback_();
EXPECT_FALSE(upstream_ready_enabled_);
}

Event::MockDispatcher& mock_dispatcher_;
NiceMock<Event::MockTimer>* mock_upstream_ready_timer_;
std::vector<TestCodecClient> test_clients_;
};

Expand All @@ -107,7 +120,9 @@ class ConnPoolImplForTest : public ConnPoolImpl {
*/
class Http1ConnPoolImplTest : public testing::Test {
public:
Http1ConnPoolImplTest() : conn_pool_(dispatcher_, cluster_) {}
Http1ConnPoolImplTest()
: upstream_ready_timer_(new NiceMock<Event::MockTimer>(&dispatcher_)),
conn_pool_(dispatcher_, cluster_, upstream_ready_timer_) {}

~Http1ConnPoolImplTest() {
// Make sure all gauges are 0.
Expand All @@ -118,6 +133,7 @@ class Http1ConnPoolImplTest : public testing::Test {

NiceMock<Event::MockDispatcher> dispatcher_;
std::shared_ptr<Upstream::MockClusterInfo> cluster_{new NiceMock<Upstream::MockClusterInfo>()};
NiceMock<Event::MockTimer>* upstream_ready_timer_;
ConnPoolImplForTest conn_pool_;
NiceMock<Runtime::MockLoader> runtime_;
};
Expand Down Expand Up @@ -437,6 +453,7 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) {
conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected);

// Finishing request 1 will immediately bind to request 2.
conn_pool_.expectEnableUpstreamReady();
EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_))
.WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder)));
EXPECT_CALL(callbacks2.pool_ready_, ready());
Expand All @@ -445,6 +462,7 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) {
Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}});
inner_decoder->decodeHeaders(std::move(response_headers), true);

conn_pool_.expectAndRunUpstreamReady();
callbacks2.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true);
response_headers.reset(new TestHeaderMapImpl{{":status", "200"}});
inner_decoder->decodeHeaders(std::move(response_headers), true);
Expand All @@ -455,6 +473,67 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test when upstream closes connection without 'connection: close' like
* https://github.com/envoyproxy/envoy/pull/2715
*/
TEST_F(Http1ConnPoolImplTest, ConnectionCloseWithoutHeader) {
InSequence s;

// Request 1 should kick off a new connection.
NiceMock<Http::MockStreamDecoder> outer_decoder1;
ConnPoolCallbacks callbacks;
conn_pool_.expectClientCreate();
Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder1, callbacks);

EXPECT_NE(nullptr, handle);

// Request 2 should not kick off a new connection.
NiceMock<Http::MockStreamDecoder> outer_decoder2;
ConnPoolCallbacks callbacks2;
handle = conn_pool_.newStream(outer_decoder2, callbacks2);
EXPECT_EQ(1U, cluster_->stats_.upstream_cx_overflow_.value());

EXPECT_NE(nullptr, handle);

// Connect event will bind to request 1.
NiceMock<Http::MockStreamEncoder> request_encoder;
Http::StreamDecoder* inner_decoder;
EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_))
.WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder)));
EXPECT_CALL(callbacks.pool_ready_, ready());

conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected);

// Finishing request 1 will schedule binding the connection to request 2.
conn_pool_.expectEnableUpstreamReady();

callbacks.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true);
Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}});
inner_decoder->decodeHeaders(std::move(response_headers), true);

// Cause the connection to go away.
conn_pool_.expectClientCreate();
EXPECT_CALL(conn_pool_, onClientDestroy());
conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();

conn_pool_.expectAndRunUpstreamReady();

EXPECT_CALL(*conn_pool_.test_clients_[1].codec_, newStream(_))
.WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder)));
EXPECT_CALL(callbacks2.pool_ready_, ready());
conn_pool_.test_clients_[1].connection_->raiseEvent(Network::ConnectionEvent::Connected);

callbacks2.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true);
response_headers.reset(new TestHeaderMapImpl{{":status", "200"}});
inner_decoder->decodeHeaders(std::move(response_headers), true);

EXPECT_CALL(conn_pool_, onClientDestroy());
conn_pool_.test_clients_[1].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();
}

/**
* Test when upstream sends us 'connection: close'
*/
Expand Down Expand Up @@ -537,8 +616,11 @@ TEST_F(Http1ConnPoolImplTest, ConcurrentConnections) {
ActiveTestRequest r3(*this, 0, ActiveTestRequest::Type::Pending);

// Finish r1, which gets r3 going.
conn_pool_.expectEnableUpstreamReady();
r3.expectNewStream();

r1.completeResponse(false);
conn_pool_.expectAndRunUpstreamReady();
r3.startRequest();

r2.completeResponse(false);
Expand Down
51 changes: 51 additions & 0 deletions test/integration/http_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,57 @@ void HttpIntegrationTest::testIdleTimeoutWithTwoRequests() {
test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_idle_timeout", 1);
}

void HttpIntegrationTest::testUpstreamDisconnectWithTwoRequests() {
initialize();
fake_upstreams_[0]->set_allow_unexpected_disconnects(true);

codec_client_ = makeHttpConnection(lookupPort("http"));

// Request 1.
codec_client_->makeRequestWithBody(Http::TestHeaderMapImpl{{":method", "GET"},
{":path", "/test/long/url"},
{":scheme", "http"},
{":authority", "host"}},
1024, *response_);
waitForNextUpstreamRequest();

// Request 2.
IntegrationStreamDecoderPtr response2{new IntegrationStreamDecoder(*dispatcher_)};
IntegrationCodecClientPtr codec_client2 = makeHttpConnection(lookupPort("http"));
codec_client2->makeRequestWithBody(Http::TestHeaderMapImpl{{":method", "GET"},
{":path", "/test/long/url"},
{":scheme", "http"},
{":authority", "host"}},
512, *response2);

// Response 1.
upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false);
upstream_request_->encodeData(512, true);
fake_upstream_connection_->close();
response_->waitForEndStream();

EXPECT_TRUE(upstream_request_->complete());
EXPECT_TRUE(response_->complete());
EXPECT_STREQ("200", response_->headers().Status()->value().c_str());
test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_total", 1);
test_server_->waitForCounterGe("cluster.cluster_0.upstream_rq_200", 1);

// Response 2.
fake_upstream_connection_->waitForDisconnect();
fake_upstream_connection_.reset();
waitForNextUpstreamRequest();
upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false);
upstream_request_->encodeData(1024, true);
response2->waitForEndStream();
codec_client2->close();

EXPECT_TRUE(upstream_request_->complete());
EXPECT_TRUE(response2->complete());
EXPECT_STREQ("200", response2->headers().Status()->value().c_str());
test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_total", 2);
test_server_->waitForCounterGe("cluster.cluster_0.upstream_rq_200", 2);
}

void HttpIntegrationTest::testTwoRequests() {
initialize();

Expand Down
1 change: 1 addition & 0 deletions test/integration/http_integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class HttpIntegrationTest : public BaseIntegrationTest {
void testIdleTimeoutBasic();
void testIdleTimeoutWithTwoRequests();
void testIdleTimerDisabled();
void testUpstreamDisconnectWithTwoRequests();
// HTTP/1 tests
void testBadFirstline();
void testMissingDelimiter();
Expand Down
4 changes: 4 additions & 0 deletions test/integration/integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ TEST_P(IntegrationTest, EnvoyProxyingLate100ContinueWithEncoderFilter) {

TEST_P(IntegrationTest, TwoRequests) { testTwoRequests(); }

TEST_P(IntegrationTest, UpstreamDisconnectWithTwoRequests) {
testUpstreamDisconnectWithTwoRequests();
}

TEST_P(IntegrationTest, RetryHittingBufferLimit) { testRetryHittingBufferLimit(); }

TEST_P(IntegrationTest, HittingDecoderFilterLimit) { testHittingDecoderFilterLimit(); }
Expand Down

0 comments on commit ee6f148

Please sign in to comment.