Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: initial drainable filter chain #4

Open
wants to merge 1 commit into
base: fcbase
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 34 additions & 17 deletions source/server/connection_handler_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void ConnectionHandlerImpl::enableListeners() {
void ConnectionHandlerImpl::ActiveTcpListener::removeConnection(ActiveConnection& connection) {
ENVOY_CONN_LOG_TO_LOGGER(parent_.logger_, debug, "adding to cleanup list",
*connection.connection_);
ActiveConnectionPtr removed = connection.removeFromList(connections_);
ActiveConnectionPtr removed = connection.removeFromList(connection.group_.connections_);
parent_.dispatcher_.deferredDelete(std::move(removed));
ASSERT(parent_.num_connections_ > 0);
parent_.num_connections_--;
Expand Down Expand Up @@ -112,11 +112,12 @@ ConnectionHandlerImpl::ActiveTcpListener::~ActiveTcpListener() {
ActiveSocketPtr removed = sockets_.front()->removeFromList(sockets_);
parent_.dispatcher_.deferredDelete(std::move(removed));
}

while (!connections_.empty()) {
connections_.front()->connection_->close(Network::ConnectionCloseType::NoFlush);
for (auto& chain_and_connections : connection_groups_) {
auto& connections = chain_and_connections.second.connections_;
while (!connections.empty()) {
connections.front()->connection_->close(Network::ConnectionCloseType::NoFlush);
}
}

parent_.dispatcher_.clearDeferredDeleteList();
}

Expand Down Expand Up @@ -246,6 +247,21 @@ void ConnectionHandlerImpl::ActiveTcpListener::onAccept(
}
}

// TODO(silentdai): Consider index by filter chain tag. Either would require filter chain manager
// assign the same tag/pointer to the generations of filter chain.
// TODO(silentdai): Switch to filter chain resource name once filter chain has name field.
ConnectionHandlerImpl::ConnectionGroup&
ConnectionHandlerImpl::ActiveTcpListener::findGroupByFilterChain(
const Envoy::Network::FilterChain* filter_chain) {
auto iter = connection_groups_.find(filter_chain);
if (iter != connection_groups_.end()) {
return iter->second;
} else {
return connection_groups_.emplace(filter_chain, ConnectionHandlerImpl::ConnectionGroup{*this})
.first->second;
}
}

void ConnectionHandlerImpl::ActiveTcpListener::newConnection(
Network::ConnectionSocketPtr&& socket) {
// Find matching filter chain.
Expand All @@ -271,39 +287,40 @@ void ConnectionHandlerImpl::ActiveTcpListener::newConnection(
new_connection->close(Network::ConnectionCloseType::NoFlush);
return;
}

onNewConnection(std::move(new_connection));
auto& connection_group = findGroupByFilterChain(filter_chain);
onNewConnection(connection_group, std::move(new_connection));
}

void ConnectionHandlerImpl::ActiveTcpListener::onNewConnection(
Network::ConnectionPtr&& new_connection) {
ConnectionGroup& group, Network::ConnectionPtr&& new_connection) {
ENVOY_CONN_LOG_TO_LOGGER(parent_.logger_, debug, "new connection", *new_connection);

// If the connection is already closed, we can just let this connection immediately die.
if (new_connection->state() != Network::Connection::State::Closed) {
ActiveConnectionPtr active_connection(
new ActiveConnection(*this, std::move(new_connection), parent_.dispatcher_.timeSource()));
active_connection->moveIntoList(std::move(active_connection), connections_);
new ActiveConnection(group, std::move(new_connection), parent_.dispatcher_.timeSource()));
active_connection->moveIntoList(std::move(active_connection), group.connections_);
parent_.num_connections_++;
}
}

ConnectionHandlerImpl::ActiveConnection::ActiveConnection(ActiveTcpListener& listener,
ConnectionHandlerImpl::ActiveConnection::ActiveConnection(ConnectionGroup& group,
Network::ConnectionPtr&& new_connection,
TimeSource& time_source)
: listener_(listener), connection_(std::move(new_connection)),
conn_length_(new Stats::Timespan(listener_.stats_.downstream_cx_length_ms_, time_source)) {
: group_(group), connection_(std::move(new_connection)),
conn_length_(
new Stats::Timespan(group_.listener_.stats_.downstream_cx_length_ms_, time_source)) {
// We just universally set no delay on connections. Theoretically we might at some point want
// to make this configurable.
connection_->noDelay(true);
connection_->addConnectionCallbacks(*this);
listener_.stats_.downstream_cx_total_.inc();
listener_.stats_.downstream_cx_active_.inc();
group_.listener_.stats_.downstream_cx_total_.inc();
group_.listener_.stats_.downstream_cx_active_.inc();
}

ConnectionHandlerImpl::ActiveConnection::~ActiveConnection() {
listener_.stats_.downstream_cx_active_.dec();
listener_.stats_.downstream_cx_destroy_.inc();
group_.listener_.stats_.downstream_cx_active_.dec();
group_.listener_.stats_.downstream_cx_destroy_.inc();
conn_length_->complete();
}

Expand Down
30 changes: 25 additions & 5 deletions source/server/connection_handler_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, NonCopyable {
Network::UdpListenerReadFilterPtr read_filter_;
};

// The abstraction of a filter chain. It owns the connections and belongs to a ActiveTcpListener.
struct ConnectionGroup {
explicit ConnectionGroup(ActiveTcpListener& listener) noexcept : listener_(listener) {}
explicit ConnectionGroup(ConnectionGroup&&) noexcept = default;

~ConnectionGroup() = default;
ActiveTcpListener& listener_;
std::list<ActiveConnectionPtr> connections_;
};

/**
* Wrapper for an active tcp listener owned by this handler.
*/
Expand All @@ -135,7 +145,17 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, NonCopyable {
// Network::ListenerCallbacks
void onAccept(Network::ConnectionSocketPtr&& socket,
bool hand_off_restored_destination_connections) override;
void onNewConnection(Network::ConnectionPtr&& new_connection) override;

// TODO(silentdai): replace onNewConnection
void onNewConnection(Network::ConnectionPtr&& new_connection) override {
UNREFERENCED_PARAMETER(new_connection);
}
void onNewConnection(ConnectionGroup& group, Network::ConnectionPtr&& new_connection);

// TODO(silentdai): find the group mapping to the filter chain.
// The implementation must guarantee that the exact group should be found if the filter chain
// survives between filter chain config update(or listener update at this moment).
ConnectionGroup& findGroupByFilterChain(const Network::FilterChain* filter_chain);

/**
* Remove and destroy an active connection.
Expand All @@ -149,7 +169,7 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, NonCopyable {
void newConnection(Network::ConnectionSocketPtr&& socket);

std::list<ActiveSocketPtr> sockets_;
std::list<ActiveConnectionPtr> connections_;
std::unordered_map<const Network::FilterChain*, ConnectionGroup> connection_groups_;
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using FilterChainTag = uint64_t;
std::unordered_map<FilterChainTag, ConnectionGroup> connection_groups_;
Group& ActiveTcpListener::findGroupByFilterChain(FilterChain * chain) {
  return connection_groups_[chain->filterchainTag()]; // omit how the Group is constructed.
};

};

/**
Expand All @@ -158,7 +178,7 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, NonCopyable {
struct ActiveConnection : LinkedObject<ActiveConnection>,
public Event::DeferredDeletable,
public Network::ConnectionCallbacks {
ActiveConnection(ActiveTcpListener& listener, Network::ConnectionPtr&& new_connection,
ActiveConnection(ConnectionGroup& group, Network::ConnectionPtr&& new_connection,
TimeSource& time_system);
~ActiveConnection() override;

Expand All @@ -167,13 +187,13 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, NonCopyable {
// Any event leads to destruction of the connection.
if (event == Network::ConnectionEvent::LocalClose ||
event == Network::ConnectionEvent::RemoteClose) {
listener_.removeConnection(*this);
group_.listener_.removeConnection(*this);
}
}
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}

ActiveTcpListener& listener_;
ConnectionGroup& group_;
Network::ConnectionPtr connection_;
Stats::TimespanPtr conn_length_;
};
Expand Down