diff --git a/source/server/connection_handler_impl.cc b/source/server/connection_handler_impl.cc index b2fe7b2ae95c..7ebcc8cd0e4e 100644 --- a/source/server/connection_handler_impl.cc +++ b/source/server/connection_handler_impl.cc @@ -78,7 +78,7 @@ void ConnectionHandlerImpl::enableListeners() { void ConnectionHandlerImpl::ActiveTcpListener::removeConnection(ActiveConnection& connection) { ENVOY_CONN_LOG_TO_LOGGER(parent_.logger_, debug, "adding to cleanup list", *connection.connection_); - ActiveConnectionPtr removed = connection.removeFromList(connections_); + ActiveConnectionPtr removed = connection.removeFromList(connection.group_.connections_); parent_.dispatcher_.deferredDelete(std::move(removed)); ASSERT(parent_.num_connections_ > 0); parent_.num_connections_--; @@ -112,11 +112,12 @@ ConnectionHandlerImpl::ActiveTcpListener::~ActiveTcpListener() { ActiveSocketPtr removed = sockets_.front()->removeFromList(sockets_); parent_.dispatcher_.deferredDelete(std::move(removed)); } - - while (!connections_.empty()) { - connections_.front()->connection_->close(Network::ConnectionCloseType::NoFlush); + for (auto& chain_and_connections : connection_groups_) { + auto& connections = chain_and_connections.second.connections_; + while (!connections.empty()) { + connections.front()->connection_->close(Network::ConnectionCloseType::NoFlush); + } } - parent_.dispatcher_.clearDeferredDeleteList(); } @@ -246,6 +247,21 @@ void ConnectionHandlerImpl::ActiveTcpListener::onAccept( } } +// TODO(silentdai): Consider index by filter chain tag. Either would require filter chain manager +// assign the same tag/pointer to the generations of filter chain. +// TODO(silentdai): Switch to filter chain resource name once filter chain has name field. +ConnectionHandlerImpl::ConnectionGroup& +ConnectionHandlerImpl::ActiveTcpListener::findGroupByFilterChain( + const Envoy::Network::FilterChain* filter_chain) { + auto iter = connection_groups_.find(filter_chain); + if (iter != connection_groups_.end()) { + return iter->second; + } else { + return connection_groups_.emplace(filter_chain, ConnectionHandlerImpl::ConnectionGroup{*this}) + .first->second; + } +} + void ConnectionHandlerImpl::ActiveTcpListener::newConnection( Network::ConnectionSocketPtr&& socket) { // Find matching filter chain. @@ -271,39 +287,40 @@ void ConnectionHandlerImpl::ActiveTcpListener::newConnection( new_connection->close(Network::ConnectionCloseType::NoFlush); return; } - - onNewConnection(std::move(new_connection)); + auto& connection_group = findGroupByFilterChain(filter_chain); + onNewConnection(connection_group, std::move(new_connection)); } void ConnectionHandlerImpl::ActiveTcpListener::onNewConnection( - Network::ConnectionPtr&& new_connection) { + ConnectionGroup& group, Network::ConnectionPtr&& new_connection) { ENVOY_CONN_LOG_TO_LOGGER(parent_.logger_, debug, "new connection", *new_connection); // If the connection is already closed, we can just let this connection immediately die. if (new_connection->state() != Network::Connection::State::Closed) { ActiveConnectionPtr active_connection( - new ActiveConnection(*this, std::move(new_connection), parent_.dispatcher_.timeSource())); - active_connection->moveIntoList(std::move(active_connection), connections_); + new ActiveConnection(group, std::move(new_connection), parent_.dispatcher_.timeSource())); + active_connection->moveIntoList(std::move(active_connection), group.connections_); parent_.num_connections_++; } } -ConnectionHandlerImpl::ActiveConnection::ActiveConnection(ActiveTcpListener& listener, +ConnectionHandlerImpl::ActiveConnection::ActiveConnection(ConnectionGroup& group, Network::ConnectionPtr&& new_connection, TimeSource& time_source) - : listener_(listener), connection_(std::move(new_connection)), - conn_length_(new Stats::Timespan(listener_.stats_.downstream_cx_length_ms_, time_source)) { + : group_(group), connection_(std::move(new_connection)), + conn_length_( + new Stats::Timespan(group_.listener_.stats_.downstream_cx_length_ms_, time_source)) { // We just universally set no delay on connections. Theoretically we might at some point want // to make this configurable. connection_->noDelay(true); connection_->addConnectionCallbacks(*this); - listener_.stats_.downstream_cx_total_.inc(); - listener_.stats_.downstream_cx_active_.inc(); + group_.listener_.stats_.downstream_cx_total_.inc(); + group_.listener_.stats_.downstream_cx_active_.inc(); } ConnectionHandlerImpl::ActiveConnection::~ActiveConnection() { - listener_.stats_.downstream_cx_active_.dec(); - listener_.stats_.downstream_cx_destroy_.inc(); + group_.listener_.stats_.downstream_cx_active_.dec(); + group_.listener_.stats_.downstream_cx_destroy_.inc(); conn_length_->complete(); } diff --git a/source/server/connection_handler_impl.h b/source/server/connection_handler_impl.h index f8b83e3628f6..a704e6263ecc 100644 --- a/source/server/connection_handler_impl.h +++ b/source/server/connection_handler_impl.h @@ -121,6 +121,16 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, NonCopyable { Network::UdpListenerReadFilterPtr read_filter_; }; + // The abstraction of a filter chain. It owns the connections and belongs to a ActiveTcpListener. + struct ConnectionGroup { + explicit ConnectionGroup(ActiveTcpListener& listener) noexcept : listener_(listener) {} + explicit ConnectionGroup(ConnectionGroup&&) noexcept = default; + + ~ConnectionGroup() = default; + ActiveTcpListener& listener_; + std::list connections_; + }; + /** * Wrapper for an active tcp listener owned by this handler. */ @@ -135,7 +145,17 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, NonCopyable { // Network::ListenerCallbacks void onAccept(Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections) override; - void onNewConnection(Network::ConnectionPtr&& new_connection) override; + + // TODO(silentdai): replace onNewConnection + void onNewConnection(Network::ConnectionPtr&& new_connection) override { + UNREFERENCED_PARAMETER(new_connection); + } + void onNewConnection(ConnectionGroup& group, Network::ConnectionPtr&& new_connection); + + // TODO(silentdai): find the group mapping to the filter chain. + // The implementation must guarantee that the exact group should be found if the filter chain + // survives between filter chain config update(or listener update at this moment). + ConnectionGroup& findGroupByFilterChain(const Network::FilterChain* filter_chain); /** * Remove and destroy an active connection. @@ -149,7 +169,7 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, NonCopyable { void newConnection(Network::ConnectionSocketPtr&& socket); std::list sockets_; - std::list connections_; + std::unordered_map connection_groups_; }; /** @@ -158,7 +178,7 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, NonCopyable { struct ActiveConnection : LinkedObject, public Event::DeferredDeletable, public Network::ConnectionCallbacks { - ActiveConnection(ActiveTcpListener& listener, Network::ConnectionPtr&& new_connection, + ActiveConnection(ConnectionGroup& group, Network::ConnectionPtr&& new_connection, TimeSource& time_system); ~ActiveConnection() override; @@ -167,13 +187,13 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, NonCopyable { // Any event leads to destruction of the connection. if (event == Network::ConnectionEvent::LocalClose || event == Network::ConnectionEvent::RemoteClose) { - listener_.removeConnection(*this); + group_.listener_.removeConnection(*this); } } void onAboveWriteBufferHighWatermark() override {} void onBelowWriteBufferLowWatermark() override {} - ActiveTcpListener& listener_; + ConnectionGroup& group_; Network::ConnectionPtr connection_; Stats::TimespanPtr conn_length_; };