From 512610fa37d5f70ec60feaeb1be6503f905390d3 Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Thu, 24 Dec 2020 04:16:11 -0500 Subject: [PATCH 01/16] test: FakeUpstream threading fixes Signed-off-by: Antonio Vicente --- .../network/thrift_proxy/integration_test.cc | 3 +- test/integration/fake_upstream.cc | 134 ++++++++++++++---- test/integration/fake_upstream.h | 21 ++- .../sds_dynamic_integration_test.cc | 4 +- 4 files changed, 113 insertions(+), 49 deletions(-) diff --git a/test/extensions/filters/network/thrift_proxy/integration_test.cc b/test/extensions/filters/network/thrift_proxy/integration_test.cc index 16dddb2b3163..5fa3881d795a 100644 --- a/test/extensions/filters/network/thrift_proxy/integration_test.cc +++ b/test/extensions/filters/network/thrift_proxy/integration_test.cc @@ -413,8 +413,7 @@ TEST_P(ThriftConnManagerIntegrationTest, OnewayEarlyClosePartialRequest) { ASSERT_TRUE(tcp_client->write(partial_request)); tcp_client->close(); - FakeRawConnectionPtr fake_upstream_connection; - ASSERT_TRUE(expected_upstream->waitForRawConnection(fake_upstream_connection)); + ASSERT_TRUE(expected_upstream->waitForAndConsumeDisconnectedConnection()); test_server_->waitForCounterGe("thrift.thrift_stats.cx_destroy_remote_with_active_rq", 1); diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index 06234a2a7c78..9acf99743340 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -20,6 +20,7 @@ #include "test/test_common/utility.h" #include "absl/strings/str_cat.h" +#include "absl/synchronization/notification.h" using namespace std::chrono_literals; @@ -354,14 +355,24 @@ AssertionResult FakeConnectionBase::close(std::chrono::milliseconds timeout) { } AssertionResult FakeConnectionBase::readDisable(bool disable, std::chrono::milliseconds timeout) { - return shared_connection_.executeOnDispatcher( - [disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout); + if 
(shared_connection_.connection().dispatcher().isThreadSafe()) { + shared_connection_.connection().readDisable(disable); + return AssertionSuccess(); + } else { + return shared_connection_.executeOnDispatcher( + [disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout); + } } AssertionResult FakeConnectionBase::enableHalfClose(bool enable, std::chrono::milliseconds timeout) { - return shared_connection_.executeOnDispatcher( - [enable](Network::Connection& connection) { connection.enableHalfClose(enable); }, timeout); + if (shared_connection_.connection().dispatcher().isThreadSafe()) { + shared_connection_.connection().enableHalfClose(enable); + return AssertionSuccess(); + } else { + return shared_connection_.executeOnDispatcher( + [enable](Network::Connection& connection) { connection.enableHalfClose(enable); }, timeout); + } } Http::RequestDecoder& FakeHttpConnection::newStream(Http::ResponseEncoder& encoder, bool) { @@ -552,16 +563,28 @@ AssertionResult FakeUpstream::waitForHttpConnection( client_dispatcher, timeout)) { return AssertionFailure() << "Timed out waiting for new connection."; } - - connection = std::make_unique( - *this, consumeConnection(), http_type_, time_system_, max_request_headers_kb, - max_request_headers_count, headers_with_underscores_action); } - VERIFY_ASSERTION(connection->initialize()); - if (read_disable_on_new_connection_) { - VERIFY_ASSERTION(connection->readDisable(false)); - } - return AssertionSuccess(); + + AssertionResult result = AssertionSuccess(); + absl::Notification done; + ASSERT(!dispatcher_->isThreadSafe()); + dispatcher_->post([&]() { + { + absl::MutexLock lock(&lock_); + connection = std::make_unique( + *this, consumeConnection(), http_type_, time_system_, max_request_headers_kb, + max_request_headers_count, headers_with_underscores_action); + result = connection->initialize(); + if (result == AssertionSuccess()) { + if (read_disable_on_new_connection_) { + result = 
connection->readDisable(false); + } + } + } + done.Notify(); + }); + done.WaitForNotification(); + return result; } AssertionResult @@ -585,14 +608,27 @@ FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher, client_dispatcher, 5ms)) { continue; } - connection = std::make_unique( - upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(), - Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, - envoy::config::core::v3::HttpProtocolOptions::ALLOW); } - VERIFY_ASSERTION(connection->initialize()); - VERIFY_ASSERTION(connection->readDisable(false)); - return AssertionSuccess(); + + AssertionResult result = AssertionSuccess(); + absl::Notification done; + ASSERT(!upstream.dispatcher_->isThreadSafe()); + upstream.dispatcher_->post([&]() { + { + absl::MutexLock lock(&upstream.lock_); + connection = std::make_unique( + upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(), + Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, + envoy::config::core::v3::HttpProtocolOptions::ALLOW); + result = connection->initialize(); + if (result == AssertionSuccess()) { + result = connection->readDisable(false); + } + } + done.Notify(); + }); + done.WaitForNotification(); + return result; } } return AssertionFailure() << "Timed out waiting for HTTP connection."; @@ -610,11 +646,42 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { return AssertionFailure() << "Timed out waiting for raw connection"; } - connection = std::make_unique(consumeConnection(), timeSystem()); } - VERIFY_ASSERTION(connection->initialize()); - VERIFY_ASSERTION(connection->readDisable(false)); - VERIFY_ASSERTION(connection->enableHalfClose(enable_half_close_)); + + AssertionResult result = AssertionSuccess(); + absl::Notification done; + ASSERT(!dispatcher_->isThreadSafe()); + dispatcher_->post([&]() { + 
{ + absl::MutexLock lock(&lock_); + auto local_connection = + std::make_unique(consumeConnection(), timeSystem()); + result = local_connection->initialize(); + if (result == AssertionSuccess()) { + result = local_connection->readDisable(false); + if (result == AssertionSuccess()) { + result = local_connection->enableHalfClose(enable_half_close_); + } + } + connection = std::move(local_connection); + } + done.Notify(); + }); + done.WaitForNotification(); + return result; +} + +testing::AssertionResult +FakeUpstream::waitForAndConsumeDisconnectedConnection(std::chrono::milliseconds timeout) { + absl::MutexLock lock(&lock_); + const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { + return !new_connections_.empty() && !new_connections_.front()->connected(); + }; + + if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { + return AssertionFailure() << "Timed out waiting for raw connection"; + } + consumeConnection(); return AssertionSuccess(); } @@ -683,14 +750,19 @@ FakeRawConnection::~FakeRawConnection() { } testing::AssertionResult FakeRawConnection::initialize() { - auto filter = Network::ReadFilterSharedPtr{new ReadFilter(*this)}; + ASSERT(shared_connection_.connection().dispatcher().isThreadSafe()); + Network::ReadFilterSharedPtr filter{new ReadFilter(*this)}; read_filter_ = filter; - testing::AssertionResult result = shared_connection_.executeOnDispatcher( - [filter = std::move(filter)](Network::Connection& connection) { - connection.addReadFilter(filter); - }); - if (!result) { - return result; + if (shared_connection_.connection().dispatcher().isThreadSafe()) { + shared_connection_.connection().addReadFilter(filter); + } else { + testing::AssertionResult result = shared_connection_.executeOnDispatcher( + [filter = std::move(filter)](Network::Connection& connection) { + connection.addReadFilter(filter); + }); + if (!result) { + return result; + } } return FakeConnectionBase::initialize(); } diff --git 
a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index d796d27bcff2..31a87feae1a8 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -257,17 +257,6 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks, connection_.addConnectionCallbacks(*this); } - Common::CallbackHandle* addDisconnectCallback(DisconnectCallback callback) { - absl::MutexLock lock(&lock_); - return disconnect_callback_manager_.add(callback); - } - - // Avoid directly removing by caller, since CallbackManager is not thread safe. - void removeDisconnectCallback(Common::CallbackHandle* handle) { - absl::MutexLock lock(&lock_); - handle->remove(); - } - // Network::ConnectionCallbacks void onEvent(Network::ConnectionEvent event) override { // Throughout this entire function, we know that the connection_ cannot disappear, since this @@ -278,7 +267,6 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks, if (event == Network::ConnectionEvent::RemoteClose || event == Network::ConnectionEvent::LocalClose) { disconnected_ = true; - disconnect_callback_manager_.runCallbacks(); } } @@ -315,6 +303,8 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks, if (disconnected_) { return testing::AssertionSuccess(); } + ASSERT(!connection_.dispatcher().isThreadSafe(), + "deadlock: executeOnDispatcher called from dispatcher thread."); bool callback_ready_event = false; bool unexpected_disconnect = false; connection_.dispatcher().post( @@ -345,13 +335,13 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks, void setParented() { absl::MutexLock lock(&lock_); + ASSERT(!parented_); parented_ = true; } private: Network::Connection& connection_; absl::Mutex lock_; - Common::CallbackManager<> disconnect_callback_manager_ ABSL_GUARDED_BY(lock_); bool parented_ ABSL_GUARDED_BY(lock_){}; bool disconnected_ ABSL_GUARDED_BY(lock_){}; }; @@ -579,6 +569,11 @@ class FakeUpstream : Logger::Loggable, 
testing::AssertionResult waitForRawConnection(FakeRawConnectionPtr& connection, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + + ABSL_MUST_USE_RESULT + testing::AssertionResult waitForAndConsumeDisconnectedConnection( + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + Network::Address::InstanceConstSharedPtr localAddress() const { return socket_->localAddress(); } // Wait for one of the upstreams to receive a connection diff --git a/test/integration/sds_dynamic_integration_test.cc b/test/integration/sds_dynamic_integration_test.cc index 6023882ab9d6..13127cb3d58b 100644 --- a/test/integration/sds_dynamic_integration_test.cc +++ b/test/integration/sds_dynamic_integration_test.cc @@ -654,9 +654,7 @@ TEST_P(SdsDynamicUpstreamIntegrationTest, WrongSecretFirst) { EXPECT_EQ("503", response->headers().getStatusValue()); // To flush out the reset connection from the first request in upstream. - FakeRawConnectionPtr fake_upstream_connection; - ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); - ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); + ASSERT_TRUE(fake_upstreams_[0]->waitForAndConsumeDisconnectedConnection()); // Failure EXPECT_EQ(0, test_server_->counter("sds.client_cert.update_success")->value()); From 19a63f7d03467496983557585e877c2ec9fb01b6 Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Tue, 29 Dec 2020 14:58:27 -0500 Subject: [PATCH 02/16] address review comments Signed-off-by: Antonio Vicente --- test/integration/fake_upstream.cc | 97 +++++++++++++------------------ test/integration/fake_upstream.h | 1 + 2 files changed, 42 insertions(+), 56 deletions(-) diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index 9acf99743340..42aa8a052c8a 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -355,6 +355,8 @@ AssertionResult FakeConnectionBase::close(std::chrono::milliseconds timeout) { } AssertionResult 
FakeConnectionBase::readDisable(bool disable, std::chrono::milliseconds timeout) { + // Do the work inline if called from the dispatcher thread, executeOnDispatcher can only be called + // from outside the dispatcher thread. if (shared_connection_.connection().dispatcher().isThreadSafe()) { shared_connection_.connection().readDisable(disable); return AssertionSuccess(); @@ -366,6 +368,8 @@ AssertionResult FakeConnectionBase::readDisable(bool disable, std::chrono::milli AssertionResult FakeConnectionBase::enableHalfClose(bool enable, std::chrono::milliseconds timeout) { + // Do the work inline if called from the dispatcher thread, executeOnDispatcher can only be called + // from outside the dispatcher thread. if (shared_connection_.connection().dispatcher().isThreadSafe()) { shared_connection_.connection().enableHalfClose(enable); return AssertionSuccess(); @@ -565,26 +569,17 @@ AssertionResult FakeUpstream::waitForHttpConnection( } } - AssertionResult result = AssertionSuccess(); - absl::Notification done; - ASSERT(!dispatcher_->isThreadSafe()); - dispatcher_->post([&]() { - { - absl::MutexLock lock(&lock_); - connection = std::make_unique( - *this, consumeConnection(), http_type_, time_system_, max_request_headers_kb, - max_request_headers_count, headers_with_underscores_action); - result = connection->initialize(); - if (result == AssertionSuccess()) { - if (read_disable_on_new_connection_) { - result = connection->readDisable(false); - } - } + return runOnDispatcherThreadAndWait([&]() { + absl::MutexLock lock(&lock_); + connection = std::make_unique( + *this, consumeConnection(), http_type_, time_system_, max_request_headers_kb, + max_request_headers_count, headers_with_underscores_action); + VERIFY_ASSERTION(connection->initialize()); + if (read_disable_on_new_connection_) { + VERIFY_ASSERTION(connection->readDisable(false)); } - done.Notify(); + return AssertionSuccess(); }); - done.WaitForNotification(); - return result; } AssertionResult @@ -610,25 +605,16 
@@ FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher, } } - AssertionResult result = AssertionSuccess(); - absl::Notification done; - ASSERT(!upstream.dispatcher_->isThreadSafe()); - upstream.dispatcher_->post([&]() { - { - absl::MutexLock lock(&upstream.lock_); - connection = std::make_unique( - upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(), - Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, - envoy::config::core::v3::HttpProtocolOptions::ALLOW); - result = connection->initialize(); - if (result == AssertionSuccess()) { - result = connection->readDisable(false); - } - } - done.Notify(); + return upstream.runOnDispatcherThreadAndWait([&]() { + absl::MutexLock lock(&upstream.lock_); + connection = std::make_unique( + upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(), + Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, + envoy::config::core::v3::HttpProtocolOptions::ALLOW); + VERIFY_ASSERTION(connection->initialize()); + VERIFY_ASSERTION(connection->readDisable(false)); + return AssertionSuccess(); }); - done.WaitForNotification(); - return result; } } return AssertionFailure() << "Timed out waiting for HTTP connection."; @@ -648,27 +634,14 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect } } - AssertionResult result = AssertionSuccess(); - absl::Notification done; - ASSERT(!dispatcher_->isThreadSafe()); - dispatcher_->post([&]() { - { - absl::MutexLock lock(&lock_); - auto local_connection = - std::make_unique(consumeConnection(), timeSystem()); - result = local_connection->initialize(); - if (result == AssertionSuccess()) { - result = local_connection->readDisable(false); - if (result == AssertionSuccess()) { - result = local_connection->enableHalfClose(enable_half_close_); - } - } - connection = std::move(local_connection); - } - done.Notify(); + return runOnDispatcherThreadAndWait([&]() { + 
absl::MutexLock lock(&lock_); + connection = std::make_unique(consumeConnection(), timeSystem()); + VERIFY_ASSERTION(connection->initialize()); + VERIFY_ASSERTION(connection->readDisable(false)); + VERIFY_ASSERTION(connection->enableHalfClose(enable_half_close_)); + return AssertionSuccess(); }); - done.WaitForNotification(); - return result; } testing::AssertionResult @@ -714,6 +687,18 @@ void FakeUpstream::onRecvDatagram(Network::UdpRecvData& data) { received_datagrams_.emplace_back(std::move(data)); } +AssertionResult FakeUpstream::runOnDispatcherThreadAndWait(std::function cb) { + AssertionResult result = AssertionSuccess(); + absl::Notification done; + ASSERT(!dispatcher_->isThreadSafe()); + dispatcher_->post([&]() { + result = cb(); + done.Notify(); + }); + done.WaitForNotification(); + return result; +} + void FakeUpstream::sendUdpDatagram(const std::string& buffer, const Network::Address::InstanceConstSharedPtr& peer) { dispatcher_->post([this, buffer, peer] { diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 31a87feae1a8..ab596ddf22ce 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -729,6 +729,7 @@ class FakeUpstream : Logger::Loggable, void threadRoutine(); SharedConnectionWrapper& consumeConnection() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_); void onRecvDatagram(Network::UdpRecvData& data); + AssertionResult runOnDispatcherThreadAndWait(std::function cb); Network::SocketSharedPtr socket_; Network::ListenSocketFactorySharedPtr socket_factory_; From 5f767cff096b86d977baa712d1115756f20d3d28 Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Tue, 29 Dec 2020 16:35:07 -0500 Subject: [PATCH 03/16] fix flakiness in hds_integration_test Signed-off-by: Antonio Vicente --- test/integration/hds_integration_test.cc | 26 ++++++++++-------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/test/integration/hds_integration_test.cc b/test/integration/hds_integration_test.cc 
index c473226d73f1..9d857629c11e 100644 --- a/test/integration/hds_integration_test.cc +++ b/test/integration/hds_integration_test.cc @@ -438,11 +438,9 @@ TEST_P(HdsIntegrationTest, SingleEndpointTimeoutHttp) { hds_stream_->sendGrpcMessage(server_health_check_specifier_); test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); - // Envoy sends a health check message to an endpoint - ASSERT_TRUE(host_upstream_->waitForRawConnection(host_fake_raw_connection_)); - - // Endpoint doesn't respond to the health check - ASSERT_TRUE(host_fake_raw_connection_->waitForDisconnect()); + // Envoy sends a health check message to an endpoint, but the endpoint doesn't respond to the + // health check + ASSERT_TRUE(host_upstream_->waitForAndConsumeDisconnectedConnection()); // Receive updates until the one we expect arrives waitForEndpointHealthResponse(envoy::config::core::v3::TIMEOUT); @@ -515,11 +513,9 @@ TEST_P(HdsIntegrationTest, SingleEndpointTimeoutTcp) { hds_stream_->sendGrpcMessage(server_health_check_specifier_); test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); - // Envoys asks the endpoint if it's healthy - ASSERT_TRUE(host_upstream_->waitForRawConnection(host_fake_raw_connection_)); - - // No response from the endpoint - ASSERT_TRUE(host_fake_raw_connection_->waitForDisconnect()); + // Envoy sends a health check message to an endpoint, but the endpoint doesn't respond to the + // health check + ASSERT_TRUE(host_upstream_->waitForAndConsumeDisconnectedConnection()); // Receive updates until the one we expect arrives waitForEndpointHealthResponse(envoy::config::core::v3::TIMEOUT); @@ -1031,6 +1027,8 @@ TEST_P(HdsIntegrationTest, SingleEndpointUnhealthyTlsMissingSocketMatch) { tls_hosts_ = true; initialize(); + // Allow the fake upstreams to detect an error and disconnect during the TLS handshake. 
+ host_upstream_->setReadDisableOnNewConnection(false); // Server <--> Envoy waitForHdsStream(); @@ -1046,11 +1044,9 @@ TEST_P(HdsIntegrationTest, SingleEndpointUnhealthyTlsMissingSocketMatch) { hds_stream_->sendGrpcMessage(server_health_check_specifier_); test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); - // Envoy sends a health check message to an endpoint - ASSERT_TRUE(host_upstream_->waitForRawConnection(host_fake_raw_connection_)); - - // Endpoint doesn't respond to the health check - ASSERT_TRUE(host_fake_raw_connection_->waitForDisconnect()); + // Envoy sends a health check message to an endpoint, but the endpoint doesn't respond to the + // health check + ASSERT_TRUE(host_upstream_->waitForAndConsumeDisconnectedConnection()); // Receive updates until the one we expect arrives. This should be UNHEALTHY and not TIMEOUT, // because TIMEOUT occurs in the situation where there is no response from the endpoint. In this From 45b88648f4599c552a9f8828ed72df51e5724edf Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Wed, 6 Jan 2021 15:30:29 -0500 Subject: [PATCH 04/16] Fix wait for disconnect. Previously disconnect condition happened under a lock that we were not waiting on. 
Signed-off-by: Antonio Vicente --- test/integration/fake_upstream.cc | 6 +++--- test/integration/fake_upstream.h | 11 +++++++++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index 42aa8a052c8a..9f0835dc01df 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -648,14 +648,14 @@ testing::AssertionResult FakeUpstream::waitForAndConsumeDisconnectedConnection(std::chrono::milliseconds timeout) { absl::MutexLock lock(&lock_); const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { - return !new_connections_.empty() && !new_connections_.front()->connected(); + return !new_connections_.empty(); }; if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { return AssertionFailure() << "Timed out waiting for raw connection"; } - consumeConnection(); - return AssertionSuccess(); + auto& connection = consumeConnection(); + return connection.waitForDisconnect(timeout); } SharedConnectionWrapper& FakeUpstream::consumeConnection() { diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index ab596ddf22ce..264c574997bd 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -285,6 +285,17 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks, return !disconnected_; } + ABSL_MUST_USE_RESULT + testing::AssertionResult waitForDisconnect(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) { + absl::MutexLock lock(&lock_); + Event::TestTimeSystem& time_system = + dynamic_cast(connection_.dispatcher().timeSource()); + if (!time_system.waitFor(lock_, absl::Condition(&disconnected_), timeout)) { + return AssertionFailure() << "Timed out waiting for disconnect"; + } + return AssertionSuccess(); + } + // This provides direct access to the underlying connection, but only to const methods. 
// Stateful connection related methods should happen on the connection's dispatcher via // executeOnDispatcher. From 3b653f273432d2cfe37d4c194ea6fa4606aefa36 Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Wed, 6 Jan 2021 19:36:03 -0500 Subject: [PATCH 05/16] fix use after free due to connection possibly being deleted. Signed-off-by: Antonio Vicente --- test/integration/fake_upstream.cc | 3 ++- test/integration/fake_upstream.h | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index 9f0835dc01df..d47aa4ed9237 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -654,8 +654,9 @@ FakeUpstream::waitForAndConsumeDisconnectedConnection(std::chrono::milliseconds if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { return AssertionFailure() << "Timed out waiting for raw connection"; } + auto& connection = consumeConnection(); - return connection.waitForDisconnect(timeout); + return connection.waitForDisconnect(time_system_, timeout); } SharedConnectionWrapper& FakeUpstream::consumeConnection() { diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 264c574997bd..de9afed49fa8 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -286,10 +286,10 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks, } ABSL_MUST_USE_RESULT - testing::AssertionResult waitForDisconnect(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) { + testing::AssertionResult + waitForDisconnect(Event::TestTimeSystem& time_system, + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) { absl::MutexLock lock(&lock_); - Event::TestTimeSystem& time_system = - dynamic_cast(connection_.dispatcher().timeSource()); if (!time_system.waitFor(lock_, absl::Condition(&disconnected_), timeout)) { return AssertionFailure() << "Timed out waiting for disconnect"; 
} From ee052d6c4779849ee8751c8bd280150133c79ed7 Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Wed, 6 Jan 2021 20:03:55 -0500 Subject: [PATCH 06/16] fix more use-after-free Signed-off-by: Antonio Vicente --- test/integration/fake_upstream.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index d47aa4ed9237..f18e339e8486 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -736,9 +736,13 @@ FakeRawConnection::~FakeRawConnection() { } testing::AssertionResult FakeRawConnection::initialize() { - ASSERT(shared_connection_.connection().dispatcher().isThreadSafe()); Network::ReadFilterSharedPtr filter{new ReadFilter(*this)}; read_filter_ = filter; + if (!shared_connection_.connected()) { + VERIFY_ASSERTION(FakeConnectionBase::initialize()); + return AssertionFailure() << "initialize failed, connection is disconnected."; + } + ASSERT(shared_connection_.connection().dispatcher().isThreadSafe()); if (shared_connection_.connection().dispatcher().isThreadSafe()) { shared_connection_.connection().addReadFilter(filter); } else { From e9f45971142dd0692ca537161f8f06d32d7e6fbd Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Thu, 7 Jan 2021 02:27:03 -0500 Subject: [PATCH 07/16] fix flaky test Signed-off-by: Antonio Vicente --- .../filters/network/thrift_proxy/integration_test.cc | 5 ----- 1 file changed, 5 deletions(-) diff --git a/test/extensions/filters/network/thrift_proxy/integration_test.cc b/test/extensions/filters/network/thrift_proxy/integration_test.cc index 5fa3881d795a..8047d5dd8775 100644 --- a/test/extensions/filters/network/thrift_proxy/integration_test.cc +++ b/test/extensions/filters/network/thrift_proxy/integration_test.cc @@ -290,15 +290,10 @@ TEST_P(ThriftConnManagerIntegrationTest, EarlyClose) { const std::string partial_request = request_bytes_.toString().substr(0, request_bytes_.length() - 5); - FakeUpstream* 
expected_upstream = getExpectedUpstream(false); - IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0")); ASSERT_TRUE(tcp_client->write(partial_request)); tcp_client->close(); - FakeRawConnectionPtr fake_upstream_connection; - ASSERT_TRUE(expected_upstream->waitForRawConnection(fake_upstream_connection)); - test_server_->waitForCounterGe("thrift.thrift_stats.cx_destroy_remote_with_active_rq", 1); Stats::CounterSharedPtr counter = From 495b19e0ebe87e70bb5307423bfae831e0ca24f9 Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Fri, 8 Jan 2021 20:03:31 -0500 Subject: [PATCH 08/16] add logging to debug macos failure Signed-off-by: Antonio Vicente --- test/integration/fake_upstream.cc | 1 + test/integration/fake_upstream.h | 1 + 2 files changed, 2 insertions(+) diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index f3396ab3c8bf..146a1d47aa47 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -647,6 +647,7 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect testing::AssertionResult FakeUpstream::waitForAndConsumeDisconnectedConnection(std::chrono::milliseconds timeout) { absl::MutexLock lock(&lock_); + ENVOY_LOG_MISC(critical, "waitForAndConsumeDisconnectedConnection"); const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return !new_connections_.empty(); }; diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index a9281bd63ad5..a5761639f39f 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -291,6 +291,7 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) { absl::MutexLock lock(&lock_); if (!time_system.waitFor(lock_, absl::Condition(&disconnected_), timeout)) { + ENVOY_LOG_MISC(critical, "timeout"); return AssertionFailure() << "Timed out waiting for disconnect"; } return 
AssertionSuccess(); From 6580555bdf4e0c731007ec7abba1efa19222c25c Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Mon, 11 Jan 2021 15:00:03 -0500 Subject: [PATCH 09/16] Wait outside the upstream lock since holding that lock is not needed. Let's see if this fixes the macos timeouts Signed-off-by: Antonio Vicente --- test/integration/fake_upstream.cc | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index 146a1d47aa47..243d752675a8 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -646,18 +646,21 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect testing::AssertionResult FakeUpstream::waitForAndConsumeDisconnectedConnection(std::chrono::milliseconds timeout) { - absl::MutexLock lock(&lock_); - ENVOY_LOG_MISC(critical, "waitForAndConsumeDisconnectedConnection"); - const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { - return !new_connections_.empty(); - }; + SharedConnectionWrapper* connection; + { + absl::MutexLock lock(&lock_); + ENVOY_LOG_MISC(critical, "waitForAndConsumeDisconnectedConnection"); + const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { + return !new_connections_.empty(); + }; - if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { - return AssertionFailure() << "Timed out waiting for raw connection"; - } + if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { + return AssertionFailure() << "Timed out waiting for raw connection"; + } - auto& connection = consumeConnection(); - return connection.waitForDisconnect(time_system_, timeout); + connection = &consumeConnection(); + } + return connection->waitForDisconnect(time_system_, timeout); } SharedConnectionWrapper& FakeUpstream::consumeConnection() { From f90610f13ccd0441ceed300dc57bf623cd0eb599 Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: 
Mon, 11 Jan 2021 18:24:52 -0500 Subject: [PATCH 10/16] Symbolize names of VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS test cases. Signed-off-by: Antonio Vicente --- test/common/grpc/grpc_client_integration.h | 4 +--- test/integration/hds_integration_test.cc | 3 ++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/test/common/grpc/grpc_client_integration.h b/test/common/grpc/grpc_client_integration.h index 13683282d37e..80321fc83592 100644 --- a/test/common/grpc/grpc_client_integration.h +++ b/test/common/grpc/grpc_client_integration.h @@ -69,9 +69,7 @@ class VersionedGrpcClientIntegrationParamTest return fmt::format("{}_{}_{}", std::get<0>(p.param) == Network::Address::IpVersion::v4 ? "IPv4" : "IPv6", std::get<1>(p.param) == ClientType::GoogleGrpc ? "GoogleGrpc" : "EnvoyGrpc", - std::get<2>(p.param) == envoy::config::core::v3::ApiVersion::V3 - ? "V3" - : envoy::config::core::v3::ApiVersion::V2 ? "V2" : "AUTO"); + ApiVersion_Name(std::get<2>(p.param))); } Network::Address::IpVersion ipVersion() const override { return std::get<0>(GetParam()); } ClientType clientType() const override { return std::get<1>(GetParam()); } diff --git a/test/integration/hds_integration_test.cc b/test/integration/hds_integration_test.cc index 9d857629c11e..c5231aeb8e22 100644 --- a/test/integration/hds_integration_test.cc +++ b/test/integration/hds_integration_test.cc @@ -374,7 +374,8 @@ class HdsIntegrationTest : public Grpc::VersionedGrpcClientIntegrationParamTest, }; INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, HdsIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); // Tests Envoy HTTP health checking a single healthy endpoint and reporting that it is // indeed healthy to the server. 
From 22211c98d9696c3e970fa4c2b9d92ed0dfb1f628 Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Mon, 11 Jan 2021 18:52:32 -0500 Subject: [PATCH 11/16] also stringify arguments to other VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS tests. Signed-off-by: Antonio Vicente --- .../grpc/http_grpc_access_log_integration_test.cc | 3 ++- .../grpc/tcp_grpc_access_log_integration_test.cc | 3 ++- .../http/ext_authz/ext_authz_integration_test.cc | 3 ++- .../http/ratelimit/ratelimit_integration_test.cc | 12 ++++++++---- .../metrics_service_integration_test.cc | 3 ++- test/integration/load_stats_integration_test.cc | 3 ++- 6 files changed, 18 insertions(+), 9 deletions(-) diff --git a/test/extensions/access_loggers/grpc/http_grpc_access_log_integration_test.cc b/test/extensions/access_loggers/grpc/http_grpc_access_log_integration_test.cc index 590a3d280e51..f2b5c2559933 100644 --- a/test/extensions/access_loggers/grpc/http_grpc_access_log_integration_test.cc +++ b/test/extensions/access_loggers/grpc/http_grpc_access_log_integration_test.cc @@ -126,7 +126,8 @@ class AccessLogIntegrationTest : public Grpc::VersionedGrpcClientIntegrationPara }; INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, AccessLogIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); // Test a basic full access logging flow. 
TEST_P(AccessLogIntegrationTest, BasicAccessLogFlow) { diff --git a/test/extensions/access_loggers/grpc/tcp_grpc_access_log_integration_test.cc b/test/extensions/access_loggers/grpc/tcp_grpc_access_log_integration_test.cc index e8516e0f6d3a..51fc54f02d78 100644 --- a/test/extensions/access_loggers/grpc/tcp_grpc_access_log_integration_test.cc +++ b/test/extensions/access_loggers/grpc/tcp_grpc_access_log_integration_test.cc @@ -126,7 +126,8 @@ class TcpGrpcAccessLogIntegrationTest : public Grpc::VersionedGrpcClientIntegrat }; INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, TcpGrpcAccessLogIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); // Test a basic full access logging flow. TEST_P(TcpGrpcAccessLogIntegrationTest, BasicAccessLogFlow) { diff --git a/test/extensions/filters/http/ext_authz/ext_authz_integration_test.cc b/test/extensions/filters/http/ext_authz/ext_authz_integration_test.cc index c3ec137d8360..13e8c40b378b 100644 --- a/test/extensions/filters/http/ext_authz/ext_authz_integration_test.cc +++ b/test/extensions/filters/http/ext_authz/ext_authz_integration_test.cc @@ -560,7 +560,8 @@ class ExtAuthzHttpIntegrationTest : public HttpIntegrationTest, }; INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, ExtAuthzGrpcIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); // Verifies that the request body is included in the CheckRequest when the downstream protocol is // HTTP/1.1. 
diff --git a/test/extensions/filters/http/ratelimit/ratelimit_integration_test.cc b/test/extensions/filters/http/ratelimit/ratelimit_integration_test.cc index b02137388102..1a8e83aa5740 100644 --- a/test/extensions/filters/http/ratelimit/ratelimit_integration_test.cc +++ b/test/extensions/filters/http/ratelimit/ratelimit_integration_test.cc @@ -231,14 +231,18 @@ class RatelimitFilterEnvoyRatelimitedHeaderDisabledIntegrationTest }; INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, RatelimitIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, RatelimitFailureModeIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, RatelimitFilterHeadersEnabledIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, RatelimitFilterEnvoyRatelimitedHeaderDisabledIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); TEST_P(RatelimitIntegrationTest, Ok) { XDS_DEPRECATED_FEATURE_TEST_SKIP; diff --git a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc index 0bf4f9cada60..e325e04d5037 100644 --- a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc +++ b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc @@ -150,7 +150,8 @@ class MetricsServiceIntegrationTest : public 
Grpc::VersionedGrpcClientIntegratio }; INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, MetricsServiceIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); // Test a basic metric service flow. TEST_P(MetricsServiceIntegrationTest, BasicFlow) { diff --git a/test/integration/load_stats_integration_test.cc b/test/integration/load_stats_integration_test.cc index d47682397192..bd2ad88ed00d 100644 --- a/test/integration/load_stats_integration_test.cc +++ b/test/integration/load_stats_integration_test.cc @@ -390,7 +390,8 @@ class LoadStatsIntegrationTest : public Grpc::VersionedGrpcClientIntegrationPara }; INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, LoadStatsIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); // Validate the load reports for successful requests as cluster membership // changes. 
From 0d4fc3d45c8f8d45d02ec31a8925b82f0e1f8cf6 Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Wed, 13 Jan 2021 18:43:56 -0500 Subject: [PATCH 12/16] Fix issue of timeouts waiting for disconnect on macos while also avoiding use-after-free risks and reducing test API sharp edges Signed-off-by: Antonio Vicente --- .../network/thrift_proxy/integration_test.cc | 8 +++++- test/integration/fake_upstream.cc | 26 ++++++++++++++----- test/integration/hds_integration_test.cc | 16 +++++++----- .../sds_dynamic_integration_test.cc | 2 ++ 4 files changed, 39 insertions(+), 13 deletions(-) diff --git a/test/extensions/filters/network/thrift_proxy/integration_test.cc b/test/extensions/filters/network/thrift_proxy/integration_test.cc index 8047d5dd8775..16dddb2b3163 100644 --- a/test/extensions/filters/network/thrift_proxy/integration_test.cc +++ b/test/extensions/filters/network/thrift_proxy/integration_test.cc @@ -290,10 +290,15 @@ TEST_P(ThriftConnManagerIntegrationTest, EarlyClose) { const std::string partial_request = request_bytes_.toString().substr(0, request_bytes_.length() - 5); + FakeUpstream* expected_upstream = getExpectedUpstream(false); + IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0")); ASSERT_TRUE(tcp_client->write(partial_request)); tcp_client->close(); + FakeRawConnectionPtr fake_upstream_connection; + ASSERT_TRUE(expected_upstream->waitForRawConnection(fake_upstream_connection)); + test_server_->waitForCounterGe("thrift.thrift_stats.cx_destroy_remote_with_active_rq", 1); Stats::CounterSharedPtr counter = @@ -408,7 +413,8 @@ TEST_P(ThriftConnManagerIntegrationTest, OnewayEarlyClosePartialRequest) { ASSERT_TRUE(tcp_client->write(partial_request)); tcp_client->close(); - ASSERT_TRUE(expected_upstream->waitForAndConsumeDisconnectedConnection()); + FakeRawConnectionPtr fake_upstream_connection; + ASSERT_TRUE(expected_upstream->waitForRawConnection(fake_upstream_connection)); 
test_server_->waitForCounterGe("thrift.thrift_stats.cx_destroy_remote_with_active_rq", 1); diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index 243d752675a8..e32488d837ff 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -528,6 +528,9 @@ bool FakeUpstream::createNetworkFilterChain(Network::Connection& connection, const std::vector&) { absl::MutexLock lock(&lock_); if (read_disable_on_new_connection_) { + // Disable early close detection to avoid closing the network connection before full + // initialization is complete. + connection.detectEarlyCloseWhenReadDisabled(false); connection.readDisable(true); } auto connection_wrapper = std::make_unique(connection); @@ -575,9 +578,6 @@ AssertionResult FakeUpstream::waitForHttpConnection( *this, consumeConnection(), http_type_, time_system_, max_request_headers_kb, max_request_headers_count, headers_with_underscores_action); VERIFY_ASSERTION(connection->initialize()); - if (read_disable_on_new_connection_) { - VERIFY_ASSERTION(connection->readDisable(false)); - } return AssertionSuccess(); }); } @@ -612,7 +612,6 @@ FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher, Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, envoy::config::core::v3::HttpProtocolOptions::ALLOW); VERIFY_ASSERTION(connection->initialize()); - VERIFY_ASSERTION(connection->readDisable(false)); return AssertionSuccess(); }); } @@ -638,7 +637,6 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect absl::MutexLock lock(&lock_); connection = std::make_unique(consumeConnection(), timeSystem()); VERIFY_ASSERTION(connection->initialize()); - VERIFY_ASSERTION(connection->readDisable(false)); VERIFY_ASSERTION(connection->enableHalfClose(enable_half_close_)); return AssertionSuccess(); }); @@ -646,6 +644,7 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect testing::AssertionResult 
FakeUpstream::waitForAndConsumeDisconnectedConnection(std::chrono::milliseconds timeout) { + ASSERT(!read_disable_on_new_connection_); SharedConnectionWrapper* connection; { absl::MutexLock lock(&lock_); @@ -657,17 +656,32 @@ FakeUpstream::waitForAndConsumeDisconnectedConnection(std::chrono::milliseconds if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { return AssertionFailure() << "Timed out waiting for raw connection"; } + } + VERIFY_ASSERTION(runOnDispatcherThreadAndWait([&]() { + absl::MutexLock lock(&lock_); connection = &consumeConnection(); - } + return AssertionSuccess(); + })); + return connection->waitForDisconnect(time_system_, timeout); } SharedConnectionWrapper& FakeUpstream::consumeConnection() { ASSERT(!new_connections_.empty()); auto* const connection_wrapper = new_connections_.front().get(); + // Skip the thread safety check if the network connection has already been freed since there's no + // alternate way to get access to the dispatcher. + ASSERT(!connection_wrapper->connected() || + connection_wrapper->connection().dispatcher().isThreadSafe()); connection_wrapper->setParented(); connection_wrapper->moveBetweenLists(new_connections_, consumed_connections_); + if (read_disable_on_new_connection_) { + // Re-enable read and early close detection. 
+ auto& connection = connection_wrapper->connection(); + connection.detectEarlyCloseWhenReadDisabled(true); + connection.readDisable(false); + } return *connection_wrapper; } diff --git a/test/integration/hds_integration_test.cc b/test/integration/hds_integration_test.cc index c5231aeb8e22..0ba63baad5bc 100644 --- a/test/integration/hds_integration_test.cc +++ b/test/integration/hds_integration_test.cc @@ -439,9 +439,11 @@ TEST_P(HdsIntegrationTest, SingleEndpointTimeoutHttp) { hds_stream_->sendGrpcMessage(server_health_check_specifier_); test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); - // Envoy sends a health check message to an endpoint, but the endpoint doesn't respond to the - // health check - ASSERT_TRUE(host_upstream_->waitForAndConsumeDisconnectedConnection()); + // Envoy sends a health check message to an endpoint + ASSERT_TRUE(host_upstream_->waitForRawConnection(host_fake_raw_connection_)); + + // Endpoint doesn't respond to the health check + ASSERT_TRUE(host_fake_raw_connection_->waitForDisconnect()); // Receive updates until the one we expect arrives waitForEndpointHealthResponse(envoy::config::core::v3::TIMEOUT); @@ -514,9 +516,11 @@ TEST_P(HdsIntegrationTest, SingleEndpointTimeoutTcp) { hds_stream_->sendGrpcMessage(server_health_check_specifier_); test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); - // Envoy sends a health check message to an endpoint, but the endpoint doesn't respond to the - // health check - ASSERT_TRUE(host_upstream_->waitForAndConsumeDisconnectedConnection()); + // Envoys asks the endpoint if it's healthy + ASSERT_TRUE(host_upstream_->waitForRawConnection(host_fake_raw_connection_)); + + // No response from the endpoint + ASSERT_TRUE(host_fake_raw_connection_->waitForDisconnect()); // Receive updates until the one we expect arrives waitForEndpointHealthResponse(envoy::config::core::v3::TIMEOUT); diff --git a/test/integration/sds_dynamic_integration_test.cc 
b/test/integration/sds_dynamic_integration_test.cc index 13127cb3d58b..f55a974694b5 100644 --- a/test/integration/sds_dynamic_integration_test.cc +++ b/test/integration/sds_dynamic_integration_test.cc @@ -646,6 +646,8 @@ TEST_P(SdsDynamicUpstreamIntegrationTest, WrongSecretFirst) { sendSdsResponse(getWrongSecret(client_cert_)); }; initialize(); + // Allow the fake upstreams to detect an error and disconnect during the TLS handshake. + fake_upstreams_[0]->setReadDisableOnNewConnection(false); // Make a simple request, should get 503 BufferingStreamDecoderPtr response = IntegrationUtil::makeSingleRequest( From d0565e567a34efeb821cd12eaa67adadb1fcadf5 Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Thu, 14 Jan 2021 20:26:02 -0500 Subject: [PATCH 13/16] address review comments Signed-off-by: Antonio Vicente --- test/integration/fake_upstream.cc | 59 ++++++++----------------------- test/integration/fake_upstream.h | 10 +++--- 2 files changed, 20 insertions(+), 49 deletions(-) diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index e32488d837ff..c0e32cb55967 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -355,28 +355,8 @@ AssertionResult FakeConnectionBase::close(std::chrono::milliseconds timeout) { } AssertionResult FakeConnectionBase::readDisable(bool disable, std::chrono::milliseconds timeout) { - // Do the work inline if called from the dispatcher thread, executeOnDispatcher can only be called - // from outside the dispatcher thread. 
- if (shared_connection_.connection().dispatcher().isThreadSafe()) { - shared_connection_.connection().readDisable(disable); - return AssertionSuccess(); - } else { - return shared_connection_.executeOnDispatcher( - [disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout); - } -} - -AssertionResult FakeConnectionBase::enableHalfClose(bool enable, - std::chrono::milliseconds timeout) { - // Do the work inline if called from the dispatcher thread, executeOnDispatcher can only be called - // from outside the dispatcher thread. - if (shared_connection_.connection().dispatcher().isThreadSafe()) { - shared_connection_.connection().enableHalfClose(enable); - return AssertionSuccess(); - } else { - return shared_connection_.executeOnDispatcher( - [enable](Network::Connection& connection) { connection.enableHalfClose(enable); }, timeout); - } + return shared_connection_.executeOnDispatcher( + [disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout); } Http::RequestDecoder& FakeHttpConnection::newStream(Http::ResponseEncoder& encoder, bool) { @@ -637,7 +617,7 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect absl::MutexLock lock(&lock_); connection = std::make_unique(consumeConnection(), timeSystem()); VERIFY_ASSERTION(connection->initialize()); - VERIFY_ASSERTION(connection->enableHalfClose(enable_half_close_)); + connection->connection().enableHalfClose(enable_half_close_); return AssertionSuccess(); }); } @@ -648,7 +628,6 @@ FakeUpstream::waitForAndConsumeDisconnectedConnection(std::chrono::milliseconds SharedConnectionWrapper* connection; { absl::MutexLock lock(&lock_); - ENVOY_LOG_MISC(critical, "waitForAndConsumeDisconnectedConnection"); const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return !new_connections_.empty(); }; @@ -706,16 +685,18 @@ void FakeUpstream::onRecvDatagram(Network::UdpRecvData& data) { 
received_datagrams_.emplace_back(std::move(data)); } -AssertionResult FakeUpstream::runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb) { - AssertionResult result = AssertionSuccess(); - absl::Notification done; +AssertionResult FakeUpstream::runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb, + std::chrono::milliseconds timeout) { + auto result = std::make_shared<AssertionResult>(AssertionSuccess()); + auto done = std::make_shared<absl::Notification>(); ASSERT(!dispatcher_->isThreadSafe()); dispatcher_->post([&]() { - result = cb(); - done.Notify(); + *result = cb(); + done->Notify(); }); - done.WaitForNotification(); - return result; + RELEASE_ASSERT(done->WaitForNotificationWithTimeout(absl::FromChrono(timeout)), + "Timed out waiting for cb to run on dispatcher"); + return *result; } void FakeUpstream::sendUdpDatagram(const std::string& buffer, @@ -756,23 +737,13 @@ FakeRawConnection::~FakeRawConnection() { testing::AssertionResult FakeRawConnection::initialize() { Network::ReadFilterSharedPtr filter{new ReadFilter(*this)}; read_filter_ = filter; + VERIFY_ASSERTION(FakeConnectionBase::initialize()); if (!shared_connection_.connected()) { - VERIFY_ASSERTION(FakeConnectionBase::initialize()); return AssertionFailure() << "initialize failed, connection is disconnected."; } ASSERT(shared_connection_.connection().dispatcher().isThreadSafe()); - if (shared_connection_.connection().dispatcher().isThreadSafe()) { - shared_connection_.connection().addReadFilter(filter); - } else { - testing::AssertionResult result = shared_connection_.executeOnDispatcher( - [filter = std::move(filter)](Network::Connection& connection) { - connection.addReadFilter(filter); - }); - if (!result) { - return result; - } - } - return FakeConnectionBase::initialize(); + shared_connection_.connection().addReadFilter(filter); + return AssertionSuccess(); } AssertionResult FakeRawConnection::waitForData(uint64_t num_bytes, std::string* data, diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 
a5761639f39f..79357bc91271 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -291,7 +291,6 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) { absl::MutexLock lock(&lock_); if (!time_system.waitFor(lock_, absl::Condition(&disconnected_), timeout)) { - ENVOY_LOG_MISC(critical, "timeout"); return AssertionFailure() << "Timed out waiting for disconnect"; } return AssertionSuccess(); @@ -315,6 +314,8 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks, if (disconnected_) { return testing::AssertionSuccess(); } + // Sanity check: detect if the post and wait is attempted from the dispatcher thread; fail + // immediately instead of deadlocking. ASSERT(!connection_.dispatcher().isThreadSafe(), "deadlock: executeOnDispatcher called from dispatcher thread."); bool callback_ready_event = false; @@ -387,9 +388,6 @@ class FakeConnectionBase : public Logger::Loggable { initialized_ = true; return testing::AssertionSuccess(); } - ABSL_MUST_USE_RESULT - testing::AssertionResult - enableHalfClose(bool enabled, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); // The same caveats apply here as in SharedConnectionWrapper::connection(). 
Network::Connection& connection() const { return shared_connection_.connection(); } bool connected() const { return shared_connection_.connected(); } @@ -743,7 +741,9 @@ class FakeUpstream : Logger::Loggable<Logger::Id::testing>, void threadRoutine(); SharedConnectionWrapper& consumeConnection() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_); void onRecvDatagram(Network::UdpRecvData& data); - AssertionResult runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb); + AssertionResult + runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb, + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); Network::SocketSharedPtr socket_; Network::ListenSocketFactorySharedPtr socket_factory_; From 197362f800aafad0470af278f783833113ee6f03 Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Fri, 15 Jan 2021 21:04:33 -0500 Subject: [PATCH 14/16] revert back to the original wait for a raw connection followed by checking for disconnect in TLS upstream error tests This was accomplished by allowing raw connection initialization even if the upstream network connection is disconnected before raw connection creation. 
Signed-off-by: Antonio Vicente --- test/integration/autonomous_upstream.cc | 3 +- test/integration/fake_upstream.cc | 45 +++++-------------- test/integration/fake_upstream.h | 25 +---------- test/integration/hds_integration_test.cc | 10 ++--- .../sds_dynamic_integration_test.cc | 6 +-- 5 files changed, 23 insertions(+), 66 deletions(-) diff --git a/test/integration/autonomous_upstream.cc b/test/integration/autonomous_upstream.cc index 16d49cb50e60..762da275b31d 100644 --- a/test/integration/autonomous_upstream.cc +++ b/test/integration/autonomous_upstream.cc @@ -119,8 +119,7 @@ bool AutonomousUpstream::createNetworkFilterChain(Network::Connection& connectio shared_connections_.emplace_back(new SharedConnectionWrapper(connection)); AutonomousHttpConnectionPtr http_connection( new AutonomousHttpConnection(*this, *shared_connections_.back(), http_type_, *this)); - testing::AssertionResult result = http_connection->initialize(); - RELEASE_ASSERT(result, result.message()); + http_connection->initialize(); http_connections_.push_back(std::move(http_connection)); return true; } diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index c0e32cb55967..90fb9cd50404 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -557,7 +557,7 @@ AssertionResult FakeUpstream::waitForHttpConnection( connection = std::make_unique( *this, consumeConnection(), http_type_, time_system_, max_request_headers_kb, max_request_headers_count, headers_with_underscores_action); - VERIFY_ASSERTION(connection->initialize()); + connection->initialize(); return AssertionSuccess(); }); } @@ -591,7 +591,7 @@ FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher, upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(), Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, envoy::config::core::v3::HttpProtocolOptions::ALLOW); - VERIFY_ASSERTION(connection->initialize()); + 
connection->initialize(); return AssertionSuccess(); }); } @@ -616,34 +616,13 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect return runOnDispatcherThreadAndWait([&]() { absl::MutexLock lock(&lock_); connection = std::make_unique(consumeConnection(), timeSystem()); - VERIFY_ASSERTION(connection->initialize()); - connection->connection().enableHalfClose(enable_half_close_); - return AssertionSuccess(); - }); -} - -testing::AssertionResult -FakeUpstream::waitForAndConsumeDisconnectedConnection(std::chrono::milliseconds timeout) { - ASSERT(!read_disable_on_new_connection_); - SharedConnectionWrapper* connection; - { - absl::MutexLock lock(&lock_); - const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { - return !new_connections_.empty(); - }; - - if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { - return AssertionFailure() << "Timed out waiting for raw connection"; + connection->initialize(); + // Skip enableHalfClose if the connection is already disconnected. + if (connection->connected()) { + connection->connection().enableHalfClose(enable_half_close_); } - } - - VERIFY_ASSERTION(runOnDispatcherThreadAndWait([&]() { - absl::MutexLock lock(&lock_); - connection = &consumeConnection(); return AssertionSuccess(); - })); - - return connection->waitForDisconnect(time_system_, timeout); + }); } SharedConnectionWrapper& FakeUpstream::consumeConnection() { @@ -655,7 +634,7 @@ SharedConnectionWrapper& FakeUpstream::consumeConnection() { connection_wrapper->connection().dispatcher().isThreadSafe()); connection_wrapper->setParented(); connection_wrapper->moveBetweenLists(new_connections_, consumed_connections_); - if (read_disable_on_new_connection_) { + if (read_disable_on_new_connection_ && connection_wrapper->connected()) { // Re-enable read and early close detection. 
auto& connection = connection_wrapper->connection(); connection.detectEarlyCloseWhenReadDisabled(true); @@ -734,16 +713,16 @@ FakeRawConnection::~FakeRawConnection() { } } -testing::AssertionResult FakeRawConnection::initialize() { +void FakeRawConnection::initialize() { + FakeConnectionBase::initialize(); Network::ReadFilterSharedPtr filter{new ReadFilter(*this)}; read_filter_ = filter; - VERIFY_ASSERTION(FakeConnectionBase::initialize()); if (!shared_connection_.connected()) { - return AssertionFailure() << "initialize failed, connection is disconnected."; + ENVOY_LOG(warn, "FakeRawConnection::initialize: network connection is already disconnected"); + return; } ASSERT(shared_connection_.connection().dispatcher().isThreadSafe()); shared_connection_.connection().addReadFilter(filter); - return AssertionSuccess(); } AssertionResult FakeRawConnection::waitForData(uint64_t num_bytes, std::string* data, diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 79357bc91271..25885f0cf1ac 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -285,17 +285,6 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks, return !disconnected_; } - ABSL_MUST_USE_RESULT - testing::AssertionResult - waitForDisconnect(Event::TestTimeSystem& time_system, - std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) { - absl::MutexLock lock(&lock_); - if (!time_system.waitFor(lock_, absl::Condition(&disconnected_), timeout)) { - return AssertionFailure() << "Timed out waiting for disconnect"; - } - return AssertionSuccess(); - } - // This provides direct access to the underlying connection, but only to const methods. // Stateful connection related methods should happen on the connection's dispatcher via // executeOnDispatcher. 
@@ -383,11 +372,7 @@ class FakeConnectionBase : public Logger::Loggable { testing::AssertionResult waitForHalfClose(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); - ABSL_MUST_USE_RESULT - virtual testing::AssertionResult initialize() { - initialized_ = true; - return testing::AssertionSuccess(); - } + virtual void initialize() { initialized_ = true; } // The same caveats apply here as in SharedConnectionWrapper::connection(). Network::Connection& connection() const { return shared_connection_.connection(); } bool connected() const { return shared_connection_.connected(); } @@ -493,8 +478,7 @@ class FakeRawConnection : public FakeConnectionBase { testing::AssertionResult write(const std::string& data, bool end_stream = false, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); - ABSL_MUST_USE_RESULT - testing::AssertionResult initialize() override; + void initialize() override; // Creates a ValidatorFunction which returns true when data_to_wait_for is // contained in the incoming data string. 
Unlike many of Envoy waitFor functions, @@ -579,11 +563,6 @@ class FakeUpstream : Logger::Loggable, testing::AssertionResult waitForRawConnection(FakeRawConnectionPtr& connection, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); - - ABSL_MUST_USE_RESULT - testing::AssertionResult waitForAndConsumeDisconnectedConnection( - std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); - Network::Address::InstanceConstSharedPtr localAddress() const { return socket_->addressProvider().localAddress(); } diff --git a/test/integration/hds_integration_test.cc b/test/integration/hds_integration_test.cc index 0ba63baad5bc..85ebbc9b8d31 100644 --- a/test/integration/hds_integration_test.cc +++ b/test/integration/hds_integration_test.cc @@ -1032,8 +1032,6 @@ TEST_P(HdsIntegrationTest, SingleEndpointUnhealthyTlsMissingSocketMatch) { tls_hosts_ = true; initialize(); - // Allow the fake upstreams to detect an error and disconnect during the TLS handshake. - host_upstream_->setReadDisableOnNewConnection(false); // Server <--> Envoy waitForHdsStream(); @@ -1049,9 +1047,11 @@ TEST_P(HdsIntegrationTest, SingleEndpointUnhealthyTlsMissingSocketMatch) { hds_stream_->sendGrpcMessage(server_health_check_specifier_); test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); - // Envoy sends a health check message to an endpoint, but the endpoint doesn't respond to the - // health check - ASSERT_TRUE(host_upstream_->waitForAndConsumeDisconnectedConnection()); + // Envoy sends a health check message to an endpoint + ASSERT_TRUE(host_upstream_->waitForRawConnection(host_fake_raw_connection_)); + + // Endpoint doesn't respond to the health check + ASSERT_TRUE(host_fake_raw_connection_->waitForDisconnect()); // Receive updates until the one we expect arrives. This should be UNHEALTHY and not TIMEOUT, // because TIMEOUT occurs in the situation where there is no response from the endpoint. 
In this diff --git a/test/integration/sds_dynamic_integration_test.cc b/test/integration/sds_dynamic_integration_test.cc index f55a974694b5..6023882ab9d6 100644 --- a/test/integration/sds_dynamic_integration_test.cc +++ b/test/integration/sds_dynamic_integration_test.cc @@ -646,8 +646,6 @@ TEST_P(SdsDynamicUpstreamIntegrationTest, WrongSecretFirst) { sendSdsResponse(getWrongSecret(client_cert_)); }; initialize(); - // Allow the fake upstreams to detect an error and disconnect during the TLS handshake. - fake_upstreams_[0]->setReadDisableOnNewConnection(false); // Make a simple request, should get 503 BufferingStreamDecoderPtr response = IntegrationUtil::makeSingleRequest( @@ -656,7 +654,9 @@ TEST_P(SdsDynamicUpstreamIntegrationTest, WrongSecretFirst) { EXPECT_EQ("503", response->headers().getStatusValue()); // To flush out the reset connection from the first request in upstream. - ASSERT_TRUE(fake_upstreams_[0]->waitForAndConsumeDisconnectedConnection()); + FakeRawConnectionPtr fake_upstream_connection; + ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); // Failure EXPECT_EQ(0, test_server_->counter("sds.client_cert.update_success")->value()); From 3aee6d7846a5a9f8c8932b3cce07c350ad20dc0b Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Tue, 19 Jan 2021 12:52:00 -0500 Subject: [PATCH 15/16] add thread annotations to FakeConnectionBase::initialized_ Signed-off-by: Antonio Vicente --- test/integration/fake_upstream.h | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 25885f0cf1ac..0e96ea5a8ad1 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -355,7 +355,10 @@ using SharedConnectionWrapperPtr = std::unique_ptr; */ class FakeConnectionBase : public Logger::Loggable { public: - virtual ~FakeConnectionBase() { ASSERT(initialized_); } + 
virtual ~FakeConnectionBase() { + absl::MutexLock lock(&lock_); + ASSERT(initialized_); + } ABSL_MUST_USE_RESULT testing::AssertionResult close(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); @@ -372,7 +375,10 @@ class FakeConnectionBase : public Logger::Loggable { testing::AssertionResult waitForHalfClose(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); - virtual void initialize() { initialized_ = true; } + virtual void initialize() { + absl::MutexLock lock(&lock_); + initialized_ = true; + } // The same caveats apply here as in SharedConnectionWrapper::connection(). Network::Connection& connection() const { return shared_connection_.connection(); } bool connected() const { return shared_connection_.connected(); } @@ -383,9 +389,9 @@ class FakeConnectionBase : public Logger::Loggable { time_system_(time_system) {} SharedConnectionWrapper& shared_connection_; - bool initialized_{}; absl::Mutex& lock_; // TODO(mattklein123): Use the shared connection lock and figure out better // guarded by annotations. + bool initialized_ ABSL_GUARDED_BY(lock_){}; bool half_closed_ ABSL_GUARDED_BY(lock_){}; Event::TestTimeSystem& time_system_; }; From 8fccaf3fbd9cf2e85159fc8262b3b561c7ead261 Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Wed, 20 Jan 2021 13:22:56 -0500 Subject: [PATCH 16/16] fix use-after-free in test framework Signed-off-by: Antonio Vicente --- .../proxy_protocol/proxy_protocol_integration_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/test/extensions/transport_sockets/proxy_protocol/proxy_protocol_integration_test.cc b/test/extensions/transport_sockets/proxy_protocol/proxy_protocol_integration_test.cc index 5747ec73d125..7e0f40d136c0 100644 --- a/test/extensions/transport_sockets/proxy_protocol/proxy_protocol_integration_test.cc +++ b/test/extensions/transport_sockets/proxy_protocol/proxy_protocol_integration_test.cc @@ -16,6 +16,7 @@ class ProxyProtocolIntegrationTest : public testing::TestWithParam