Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Prototype of per filter chain update #6

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 29 additions & 25 deletions include/envoy/network/connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,31 @@ class ConnectionHandler {
public:
virtual ~ConnectionHandler() = default;

/**
* Used by ConnectionHandler to manage listeners.
*/
class ActiveListener {
public:
virtual ~ActiveListener() = default;

/**
* @return the tag value as configured.
*/
virtual uint64_t listenerTag() PURE;

/**
* @return the actual Listener object.
*/
virtual Listener* listener() PURE;

/**
* Destroy the actual Listener it wraps.
*/
virtual void destroy() PURE;
};

using ActiveListenerPtr = std::unique_ptr<ActiveListener>;

/**
* @return uint64_t the number of active connections owned by the handler.
*/
Expand Down Expand Up @@ -57,6 +82,10 @@ class ConnectionHandler {
*/
virtual void stopListeners(uint64_t listener_tag) PURE;

virtual bool updateListener(
uint64_t listener_tag,
std::function<bool(Network::ConnectionHandler::ActiveListener&)> listener_update_func) PURE;

/**
* Stop all listeners. This will not close any connections and is used for draining.
*/
Expand All @@ -78,31 +107,6 @@ class ConnectionHandler {
* @return the stat prefix used for per-handler stats.
*/
virtual const std::string& statPrefix() PURE;

/**
* Used by ConnectionHandler to manage listeners.
*/
class ActiveListener {
public:
virtual ~ActiveListener() = default;

/**
* @return the tag value as configured.
*/
virtual uint64_t listenerTag() PURE;

/**
* @return the actual Listener object.
*/
virtual Listener* listener() PURE;

/**
* Destroy the actual Listener it wraps.
*/
virtual void destroy() PURE;
};

using ActiveListenerPtr = std::unique_ptr<ActiveListener>;
};

using ConnectionHandlerPtr = std::unique_ptr<ConnectionHandler>;
Expand Down
9 changes: 9 additions & 0 deletions include/envoy/network/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,13 @@ class FilterChain {
* const std::vector<FilterFactoryCb>& a list of filters to be used by the new connection.
*/
virtual const std::vector<FilterFactoryCb>& networkFilterFactories() const PURE;

/**
* @brief Get the Tag object associated with this filter chain.
*
* @return int64_t
*/
virtual int64_t getTag() const { return 0; }
};

using FilterChainSharedPtr = std::shared_ptr<FilterChain>;
Expand All @@ -351,6 +358,8 @@ class FilterChainManager {
virtual const FilterChain* findFilterChain(const ConnectionSocket& socket) const PURE;
};

using FilterChainManagerSharedPtr = std::shared_ptr<FilterChainManager>;

/**
* Callbacks used by individual UDP listener read filter instances to communicate with the filter
* manager.
Expand Down
15 changes: 14 additions & 1 deletion include/envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ class ListenerConfig {
*/
virtual FilterChainManager& filterChainManager() PURE;

/**
* @return FilterChainManagerSharedPtr the factory for adding and searching through configured
* filter chains.
*/
virtual FilterChainManagerSharedPtr sharedFilterChainManager() PURE;

/**
* @return FilterChainFactory& the factory for setting up the filter chain on a new
* connection.
Expand Down Expand Up @@ -111,7 +117,7 @@ class ListenerConfig {
};

/**
* Callbacks invoked by a listener.
* Callbacks invoked by a tcp listener.
*/
class ListenerCallbacks {
public:
Expand All @@ -122,6 +128,13 @@ class ListenerCallbacks {
* @param socket supplies the socket that is moved into the callee.
*/
virtual void onAccept(ConnectionSocketPtr&& socket) PURE;

/**
* Called when a new filter chain manager is updated.
*
* @param filter_chain_manager
*/
virtual void onNewFilterChainManger(FilterChainManager& filter_chain_manager) PURE;
};

/**
Expand Down
5 changes: 5 additions & 0 deletions include/envoy/server/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ class Worker {
*/
virtual void stopListener(Network::ListenerConfig& listener,
std::function<void()> completion) PURE;

virtual void updateListener(
uint64_t listener_tag,
std::function<bool(Network::ConnectionHandler::ActiveListener&)> listener_update_func,
std::function<void(bool)> completion) PURE;
};

using WorkerPtr = std::unique_ptr<Worker>;
Expand Down
2 changes: 1 addition & 1 deletion source/common/network/listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace Network {

void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,
int remote_addr_len, void* arg) {
ListenerImpl* listener = static_cast<ListenerImpl*>(arg);
auto* listener = static_cast<ListenerImpl*>(arg);

// Create the IoSocketHandleImpl for the fd here.
IoHandlePtr io_handle = std::make_unique<IoSocketHandleImpl>(fd);
Expand Down
16 changes: 16 additions & 0 deletions source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ envoy_cc_library(
":drain_manager_lib",
":filter_chain_manager_lib",
":listener_manager_impl",
":tag_generator_lib",
":transport_socket_config_lib",
":well_known_names_lib",
"//include/envoy/server:active_udp_listener_config_interface",
Expand Down Expand Up @@ -333,9 +334,11 @@ envoy_cc_library(
"//include/envoy/server:transport_socket_config_interface",
"//source/common/common:empty_string",
"//source/common/config:utility_lib",
"//source/common/init:manager_lib",
"//source/common/network:cidr_range_lib",
"//source/common/network:lc_trie_lib",
"//source/server:configuration_lib",
"//source/server:tag_generator_lib",
"@envoy_api//envoy/api/v2/listener:pkg_cc_proto",
],
)
Expand Down Expand Up @@ -500,3 +503,16 @@ envoy_cc_library(
"//include/envoy/server:active_udp_listener_config_interface",
],
)

envoy_cc_library(
name = "tag_generator_lib",
srcs = ["tag_generator_batch_impl.cc"],
hdrs = [
"tag_generator.h",
"tag_generator_batch_impl.h",
],
deps = [
"//source/common/protobuf:utility_lib",
"@envoy_api//envoy/api/v2/listener:pkg_cc_proto",
],
)
60 changes: 47 additions & 13 deletions source/server/connection_handler_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,21 @@ void ConnectionHandlerImpl::removeListeners(uint64_t listener_tag) {
}
}

bool ConnectionHandlerImpl::updateListener(
uint64_t listener_tag,
std::function<bool(Network::ConnectionHandler::ActiveListener&)> activeListenerUpdate) {
bool res = false;
for (auto& listener : listeners_) {
// listener type: std::pair<Network::Address::InstanceConstSharedPtr, ActiveListenerDetails>
if (listener.second.listener_->listenerTag() == listener_tag) {
// TODO(lambdai): explore in which condition there are more than one active listeners sharing
// the same tag
res |= activeListenerUpdate(*listener.second.listener_);
}
}
return res;
}

void ConnectionHandlerImpl::stopListeners(uint64_t listener_tag) {
for (auto& listener : listeners_) {
if (listener.second.listener_->listenerTag() == listener_tag) {
Expand Down Expand Up @@ -86,7 +101,12 @@ void ConnectionHandlerImpl::enableListeners() {

void ConnectionHandlerImpl::ActiveTcpListener::removeConnection(ActiveTcpConnection& connection) {
ENVOY_CONN_LOG(debug, "adding to cleanup list", *connection.connection_);
ActiveTcpConnectionPtr removed = connection.removeFromList(connections_);
auto connections = tagged_connections_.find(connection.tag_);
ASSERT(connections != tagged_connections_.end());
ActiveTcpConnectionPtr removed = connection.removeFromList(connections->second);
if (connections->second.empty()) {
tagged_connections_.erase(connections);
}
parent_.dispatcher_.deferredDelete(std::move(removed));
}

Expand All @@ -111,7 +131,8 @@ ConnectionHandlerImpl::ActiveTcpListener::ActiveTcpListener(ConnectionHandlerImp
Network::ListenerConfig& config)
: ConnectionHandlerImpl::ActiveListenerImplBase(parent, config), parent_(parent),
listener_(std::move(listener)), listener_filters_timeout_(config.listenerFiltersTimeout()),
continue_on_listener_filters_timeout_(config.continueOnListenerFiltersTimeout()) {
continue_on_listener_filters_timeout_(config.continueOnListenerFiltersTimeout()),
active_filter_chain_manager_(config.sharedFilterChainManager()) {
config.connectionBalancer().registerHandler(*this);
}

Expand All @@ -125,8 +146,11 @@ ConnectionHandlerImpl::ActiveTcpListener::~ActiveTcpListener() {
parent_.dispatcher_.deferredDelete(std::move(removed));
}

while (!connections_.empty()) {
connections_.front()->connection_->close(Network::ConnectionCloseType::NoFlush);
for (auto& kv : tagged_connections_) {
auto& connections = kv.second;
while (!connections.empty()) {
connections.front()->connection_->close(Network::ConnectionCloseType::NoFlush);
}
}

parent_.dispatcher_.clearDeferredDeleteList();
Expand Down Expand Up @@ -270,14 +294,21 @@ void ConnectionHandlerImpl::ActiveTcpSocket::newConnection() {
// Particularly the assigned events need to reset before assigning new events in the follow up.
accept_filters_.clear();
// Create a new connection on this listener.
listener_.newConnection(std::move(socket_));
listener_.newConnection(*this);
}
}

void ConnectionHandlerImpl::ActiveTcpListener::onAccept(Network::ConnectionSocketPtr&& socket) {
onAcceptWorker(std::move(socket), config_.handOffRestoredDestinationConnections(), false);
}

void ConnectionHandlerImpl::ActiveTcpListener::onNewFilterChainManger(
Network::FilterChainManager& filter_chain_manager) {
UNREFERENCED_PARAMETER(filter_chain_manager);
// obtain the tags traits
// drain the connections out of the tracking tags
}

void ConnectionHandlerImpl::ActiveTcpListener::onAcceptWorker(
Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections,
bool rebalanced) {
Expand Down Expand Up @@ -306,21 +337,23 @@ void ConnectionHandlerImpl::ActiveTcpListener::onAcceptWorker(
}

void ConnectionHandlerImpl::ActiveTcpListener::newConnection(
Network::ConnectionSocketPtr&& socket) {
ConnectionHandlerImpl::ActiveTcpSocket& tcp_socket) {
// Find matching filter chain.
const auto filter_chain = config_.filterChainManager().findFilterChain(*socket);
auto snapped_filter_chain_manager = active_filter_chain_manager_;
const auto filter_chain = snapped_filter_chain_manager->findFilterChain(*tcp_socket.socket_);
if (filter_chain == nullptr) {
ENVOY_LOG(debug, "closing connection: no matching filter chain found");
stats_.no_filter_chain_match_.inc();
socket->close();
tcp_socket.socket_->close();
return;
}

auto transport_socket = filter_chain->transportSocketFactory().createTransportSocket(nullptr);
ActiveTcpConnectionPtr active_connection(new ActiveTcpConnection(
*this,
parent_.dispatcher_.createServerConnection(std::move(socket), std::move(transport_socket)),
parent_.dispatcher_.timeSource()));
ActiveTcpConnectionPtr active_connection(
new ActiveTcpConnection(*this,
parent_.dispatcher_.createServerConnection(
std::move(tcp_socket.socket_), std::move(transport_socket)),
parent_.dispatcher_.timeSource()));
active_connection->connection_->setBufferLimits(config_.perConnectionBufferLimitBytes());

const bool empty_filter_chain = !config_.filterChainFactory().createNetworkFilterChain(
Expand All @@ -334,7 +367,8 @@ void ConnectionHandlerImpl::ActiveTcpListener::newConnection(
if (active_connection->connection_->state() != Network::Connection::State::Closed) {
ENVOY_CONN_LOG(debug, "new connection", *active_connection->connection_);
active_connection->connection_->addConnectionCallbacks(*active_connection);
active_connection->moveIntoList(std::move(active_connection), connections_);
auto& connections = tagged_connections_[active_connection->tag_];
active_connection->moveIntoList(std::move(active_connection), connections);
}
}

Expand Down
21 changes: 17 additions & 4 deletions source/server/connection_handler_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler,
void stopListeners() override;
void disableListeners() override;
void enableListeners() override;
bool updateListener(uint64_t listener_tag,
std::function<bool(Network::ConnectionHandler::ActiveListener&)>
listener_update_func) override;
const std::string& statPrefix() override { return per_handler_stat_prefix_; }

/**
Expand Down Expand Up @@ -115,6 +118,7 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler,

// Network::ListenerCallbacks
void onAccept(Network::ConnectionSocketPtr&& socket) override;
void onNewFilterChainManger(Network::FilterChainManager& filter_chain_manager) override;

// ActiveListenerImplBase
Network::Listener* listener() override { return listener_.get(); }
Expand All @@ -134,15 +138,21 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler,
/**
* Create a new connection from a socket accepted by the listener.
*/
void newConnection(Network::ConnectionSocketPtr&& socket);
void newConnection(ActiveTcpSocket& socket);

void
updateFilterChainManager(const Network::FilterChainManagerSharedPtr& new_filter_chain_manager);

ConnectionHandlerImpl& parent_;
Network::ListenerPtr listener_;
const std::chrono::milliseconds listener_filters_timeout_;
const bool continue_on_listener_filters_timeout_;
// Sockets going through listener filter chain
std::list<ActiveTcpSocketPtr> sockets_;
std::list<ActiveTcpConnectionPtr> connections_;

// Connections completed listener filter chain and currently going through network filter chain
absl::flat_hash_map<int64_t, std::list<ActiveTcpConnectionPtr>> tagged_connections_;
// The filter chain manager which should serve the new connections.
Network::FilterChainManagerSharedPtr active_filter_chain_manager_;
// The number of connections currently active on this listener. This is typically used for
// connection balancing across per-handler listeners.
std::atomic<uint64_t> num_listener_connections_{};
Expand Down Expand Up @@ -172,6 +182,7 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler,
ActiveTcpListener& listener_;
Network::ConnectionPtr connection_;
Stats::TimespanPtr conn_length_;
int64_t tag_{0};
};

/**
Expand All @@ -185,7 +196,8 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler,
bool hand_off_restored_destination_connections)
: listener_(listener), socket_(std::move(socket)),
hand_off_restored_destination_connections_(hand_off_restored_destination_connections),
iter_(accept_filters_.end()) {
iter_(accept_filters_.end()),
snapped_filter_chain_manager_(listener.active_filter_chain_manager_) {
listener_.stats_.downstream_pre_cx_active_.inc();
}
~ActiveTcpSocket() override {
Expand Down Expand Up @@ -224,6 +236,7 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler,
const bool hand_off_restored_destination_connections_;
std::list<Network::ListenerFilterPtr> accept_filters_;
std::list<Network::ListenerFilterPtr>::iterator iter_;
Network::FilterChainManagerSharedPtr snapped_filter_chain_manager_;
Event::TimerPtr timer_;
};

Expand Down
Loading