Skip to content

Commit

Permalink
config: switch from std::set to absl::flat_hash_set for resource name…
Browse files Browse the repository at this point in the history
…s. (#14739)

This was a cleanup deferred from the review of #14311. The idea is to switch to the more efficient
unordered absl::flat_hash_set across the resource subscription code base. Internally, we still use
std::set (and even explicitly sort in the http_subscription_impl) to avoid changing any wire
ordering. It seems desirable to preserve this for two reasons: (1) this derisks this PR as an
internal-only change and (2) having deterministic wire ordering makes debug of xDS issues somewhat
easier.

Risk level: Low
Testing: Updated tests.

Signed-off-by: Harvey Tuch <htuch@google.com>
  • Loading branch information
htuch authored Jan 19, 2021
1 parent da1b0e4 commit 7d48928
Show file tree
Hide file tree
Showing 28 changed files with 183 additions and 116 deletions.
6 changes: 3 additions & 3 deletions include/envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class GrpcMuxWatch {
* Updates the set of resources that the watch is interested in.
* @param resources set of resource names to watch for
*/
virtual void update(const std::set<std::string>& resources) PURE;
virtual void update(const absl::flat_hash_set<std::string>& resources) PURE;
};

using GrpcMuxWatchPtr = std::unique_ptr<GrpcMuxWatch>;
Expand Down Expand Up @@ -100,13 +100,13 @@ class GrpcMux {
* away, its EDS updates should be cancelled by destroying the GrpcMuxWatchPtr.
*/
virtual GrpcMuxWatchPtr addWatch(const std::string& type_url,
const std::set<std::string>& resources,
const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching) PURE;

virtual void requestOnDemandUpdate(const std::string& type_url,
const std::set<std::string>& for_update) PURE;
const absl::flat_hash_set<std::string>& for_update) PURE;

using TypeUrlMap = absl::flat_hash_map<std::string, std::string>;
static TypeUrlMap& typeUrlMap() { MUTABLE_CONSTRUCT_ON_FIRST_USE(TypeUrlMap, {}); }
Expand Down
10 changes: 5 additions & 5 deletions include/envoy/config/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,20 +180,20 @@ class Subscription {
* to fetch throughout the lifetime of the Subscription object.
* @param resources set of resource names to fetch.
*/
virtual void start(const std::set<std::string>& resource_names) PURE;
virtual void start(const absl::flat_hash_set<std::string>& resource_names) PURE;

/**
* Update the resources to fetch.
* @param resources vector of resource names to fetch. It's a (not unordered_)set so that it can
* be passed to std::set_difference, which must be given sorted collections.
* @param resources vector of resource names to fetch.
*/
virtual void updateResourceInterest(const std::set<std::string>& update_to_these_names) PURE;
virtual void
updateResourceInterest(const absl::flat_hash_set<std::string>& update_to_these_names) PURE;

/**
* Creates a discovery request for resources.
* @param add_these_names resource ids for inclusion in the discovery request.
*/
virtual void requestOnDemandUpdate(const std::set<std::string>& add_these_names) PURE;
virtual void requestOnDemandUpdate(const absl::flat_hash_set<std::string>& add_these_names) PURE;
};

using SubscriptionPtr = std::unique_ptr<Subscription>;
Expand Down
13 changes: 13 additions & 0 deletions source/common/common/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -752,4 +752,17 @@ class InlineString : public InlineStorage {
char data_[];
};

class SetUtil {
public:
// Use instead of std::set_difference for unordered absl::flat_hash_set containers.
template <typename T>
static void setDifference(const absl::flat_hash_set<T>& original_set,
const absl::flat_hash_set<T>& remove_set,
absl::flat_hash_set<T>& result_set) {
std::copy_if(original_set.begin(), original_set.end(),
std::inserter(result_set, result_set.begin()),
[&remove_set](const T& v) -> bool { return remove_set.count(v) == 0; });
}
};

} // namespace Envoy
1 change: 1 addition & 0 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ envoy_cc_library(
"//source/common/common:assert_lib",
"//source/common/common:cleanup_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/common:utility_lib",
"//source/common/protobuf",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
Expand Down
5 changes: 3 additions & 2 deletions source/common/config/delta_subscription_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url,
type_url_(std::move(type_url)), watch_map_(watch_map), local_info_(local_info),
dispatcher_(dispatcher) {}

void DeltaSubscriptionState::updateSubscriptionInterest(const std::set<std::string>& cur_added,
const std::set<std::string>& cur_removed) {
void DeltaSubscriptionState::updateSubscriptionInterest(
const absl::flat_hash_set<std::string>& cur_added,
const absl::flat_hash_set<std::string>& cur_removed) {
for (const auto& a : cur_added) {
setResourceWaitingForServer(a);
// If interest in a resource is removed-then-added (all before a discovery request
Expand Down
10 changes: 5 additions & 5 deletions source/common/config/delta_subscription_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class DeltaSubscriptionState : public Logger::Loggable<Logger::Id::config> {
const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher);

// Update which resources we're interested in subscribing to.
void updateSubscriptionInterest(const std::set<std::string>& cur_added,
const std::set<std::string>& cur_removed);
void addAliasesToResolve(const std::set<std::string>& aliases);
void updateSubscriptionInterest(const absl::flat_hash_set<std::string>& cur_added,
const absl::flat_hash_set<std::string>& cur_removed);
void addAliasesToResolve(const absl::flat_hash_set<std::string>& aliases);

// Whether there was a change in our subscription interest we have yet to inform the server of.
bool subscriptionUpdatePending() const;
Expand Down Expand Up @@ -98,8 +98,8 @@ class DeltaSubscriptionState : public Logger::Loggable<Logger::Id::config> {
const bool supports_heartbeats_;
TtlManager ttl_;
// The keys of resource_versions_. Only tracked separately because std::map does not provide an
// iterator into just its keys, e.g. for use in std::set_difference.
std::set<std::string> resource_names_;
// iterator into just its keys.
absl::flat_hash_set<std::string> resource_names_;

const std::string type_url_;
UntypedConfigUpdateCallbacks& watch_map_;
Expand Down
4 changes: 2 additions & 2 deletions source/common/config/filesystem_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ FilesystemSubscriptionImpl::FilesystemSubscriptionImpl(
}

// Config::Subscription
void FilesystemSubscriptionImpl::start(const std::set<std::string>&) {
void FilesystemSubscriptionImpl::start(const absl::flat_hash_set<std::string>&) {
started_ = true;
// Attempt to read in case there is a file there already.
refresh();
}

void FilesystemSubscriptionImpl::updateResourceInterest(const std::set<std::string>&) {
void FilesystemSubscriptionImpl::updateResourceInterest(const absl::flat_hash_set<std::string>&) {
// Bump stats for consistent behavior with other xDS.
stats_.update_attempt_.inc();
}
Expand Down
6 changes: 3 additions & 3 deletions source/common/config/filesystem_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ class FilesystemSubscriptionImpl : public Config::Subscription,
// Config::Subscription
// We report all discovered resources in the watched file, so the resource names arguments are
// unused, and updateResourceInterest is a no-op (other than updating a stat).
void start(const std::set<std::string>&) override;
void updateResourceInterest(const std::set<std::string>&) override;
void requestOnDemandUpdate(const std::set<std::string>&) override {
void start(const absl::flat_hash_set<std::string>&) override;
void updateResourceInterest(const absl::flat_hash_set<std::string>&) override;
void requestOnDemandUpdate(const absl::flat_hash_set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

Expand Down
2 changes: 1 addition & 1 deletion source/common/config/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) {
}

GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url,
const std::set<std::string>& resources,
const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder, const bool) {
auto watch =
Expand Down
28 changes: 16 additions & 12 deletions source/common/config/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ class GrpcMuxImpl : public GrpcMux,
ScopedResume pause(const std::string& type_url) override;
ScopedResume pause(const std::vector<std::string> type_urls) override;

GrpcMuxWatchPtr addWatch(const std::string& type_url, const std::set<std::string>& resources,
GrpcMuxWatchPtr addWatch(const std::string& type_url,
const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching = false) override;

void requestOnDemandUpdate(const std::string&, const std::set<std::string>&) override {
void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

Expand Down Expand Up @@ -80,11 +81,12 @@ class GrpcMuxImpl : public GrpcMux,
void sendDiscoveryRequest(const std::string& type_url);

struct GrpcMuxWatchImpl : public GrpcMuxWatch {
GrpcMuxWatchImpl(const std::set<std::string>& resources, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder, const std::string& type_url,
GrpcMuxImpl& parent)
: resources_(resources), callbacks_(callbacks), resource_decoder_(resource_decoder),
type_url_(type_url), parent_(parent), watches_(parent.apiStateFor(type_url).watches_) {
GrpcMuxWatchImpl(const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder,
const std::string& type_url, GrpcMuxImpl& parent)
: callbacks_(callbacks), resource_decoder_(resource_decoder), type_url_(type_url),
parent_(parent), watches_(parent.apiStateFor(type_url).watches_) {
std::copy(resources.begin(), resources.end(), std::inserter(resources_, resources_.begin()));
watches_.emplace(watches_.begin(), this);
}

Expand All @@ -95,17 +97,19 @@ class GrpcMuxImpl : public GrpcMux,
}
}

void update(const std::set<std::string>& resources) override {
void update(const absl::flat_hash_set<std::string>& resources) override {
watches_.remove(this);
if (!resources_.empty()) {
parent_.queueDiscoveryRequest(type_url_);
}
resources_ = resources;
resources_.clear();
std::copy(resources.begin(), resources.end(), std::inserter(resources_, resources_.begin()));
// move this watch to the beginning of the list
watches_.emplace(watches_.begin(), this);
parent_.queueDiscoveryRequest(type_url_);
}

// Maintain deterministic wire ordering via ordered std::set.
std::set<std::string> resources_;
SubscriptionCallbacks& callbacks_;
OpaqueResourceDecoder& resource_decoder_;
Expand Down Expand Up @@ -184,12 +188,12 @@ class NullGrpcMuxImpl : public GrpcMux,
return std::make_unique<Cleanup>([] {});
}

GrpcMuxWatchPtr addWatch(const std::string&, const std::set<std::string>&, SubscriptionCallbacks&,
OpaqueResourceDecoder&, const bool) override {
GrpcMuxWatchPtr addWatch(const std::string&, const absl::flat_hash_set<std::string>&,
SubscriptionCallbacks&, OpaqueResourceDecoder&, const bool) override {
ExceptionUtil::throwEnvoyException("ADS must be configured to support an ADS config source");
}

void requestOnDemandUpdate(const std::string&, const std::set<std::string>&) override {
void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

Expand Down
9 changes: 5 additions & 4 deletions source/common/config/grpc_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ GrpcSubscriptionImpl::GrpcSubscriptionImpl(GrpcMuxSharedPtr grpc_mux,
use_namespace_matching_(use_namespace_matching) {}

// Config::Subscription
void GrpcSubscriptionImpl::start(const std::set<std::string>& resources) {
void GrpcSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resources) {
if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout,
Expand All @@ -50,12 +50,13 @@ void GrpcSubscriptionImpl::start(const std::set<std::string>& resources) {
}

void GrpcSubscriptionImpl::updateResourceInterest(
const std::set<std::string>& update_to_these_names) {
const absl::flat_hash_set<std::string>& update_to_these_names) {
watch_->update(update_to_these_names);
stats_.update_attempt_.inc();
}

void GrpcSubscriptionImpl::requestOnDemandUpdate(const std::set<std::string>& for_update) {
void GrpcSubscriptionImpl::requestOnDemandUpdate(
const absl::flat_hash_set<std::string>& for_update) {
grpc_mux_->requestOnDemandUpdate(type_url_, for_update);
stats_.update_attempt_.inc();
}
Expand Down Expand Up @@ -137,7 +138,7 @@ GrpcCollectionSubscriptionImpl::GrpcCollectionSubscriptionImpl(
init_fetch_timeout, is_aggregated, false),
collection_locator_(collection_locator) {}

void GrpcCollectionSubscriptionImpl::start(const std::set<std::string>& resource_names) {
void GrpcCollectionSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resource_names) {
ASSERT(resource_names.empty());
GrpcSubscriptionImpl::start({XdsResourceIdentifier::encodeUrl(collection_locator_)});
}
Expand Down
9 changes: 5 additions & 4 deletions source/common/config/grpc_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ class GrpcSubscriptionImpl : public Subscription,
bool use_namespace_matching);

// Config::Subscription
void start(const std::set<std::string>& resource_names) override;
void updateResourceInterest(const std::set<std::string>& update_to_these_names) override;
void requestOnDemandUpdate(const std::set<std::string>& add_these_names) override;
void start(const absl::flat_hash_set<std::string>& resource_names) override;
void
updateResourceInterest(const absl::flat_hash_set<std::string>& update_to_these_names) override;
void requestOnDemandUpdate(const absl::flat_hash_set<std::string>& add_these_names) override;
// Config::SubscriptionCallbacks (all pass through to callbacks_!)
void onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
const std::string& version_info) override;
Expand Down Expand Up @@ -71,7 +72,7 @@ class GrpcCollectionSubscriptionImpl : public GrpcSubscriptionImpl {
Event::Dispatcher& dispatcher,
std::chrono::milliseconds init_fetch_timeout, bool is_aggregated);

void start(const std::set<std::string>& resource_names) override;
void start(const absl::flat_hash_set<std::string>& resource_names) override;

private:
xds::core::v3::ResourceLocator collection_locator_;
Expand Down
8 changes: 6 additions & 2 deletions source/common/config/http_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ HttpSubscriptionImpl::HttpSubscriptionImpl(
}

// Config::Subscription
void HttpSubscriptionImpl::start(const std::set<std::string>& resource_names) {
void HttpSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resource_names) {
if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
handleFailure(Config::ConfigUpdateFailureReason::FetchTimedout, nullptr);
Expand All @@ -53,14 +53,18 @@ void HttpSubscriptionImpl::start(const std::set<std::string>& resource_names) {

Protobuf::RepeatedPtrField<std::string> resources_vector(resource_names.begin(),
resource_names.end());
// Sort to provide stable wire ordering.
std::sort(resources_vector.begin(), resources_vector.end());
request_.mutable_resource_names()->Swap(&resources_vector);
initialize();
}

void HttpSubscriptionImpl::updateResourceInterest(
const std::set<std::string>& update_to_these_names) {
const absl::flat_hash_set<std::string>& update_to_these_names) {
Protobuf::RepeatedPtrField<std::string> resources_vector(update_to_these_names.begin(),
update_to_these_names.end());
// Sort to provide stable wire ordering.
std::sort(resources_vector.begin(), resources_vector.end());
request_.mutable_resource_names()->Swap(&resources_vector);
}

Expand Down
7 changes: 4 additions & 3 deletions source/common/config/http_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
ProtobufMessage::ValidationVisitor& validation_visitor);

// Config::Subscription
void start(const std::set<std::string>& resource_names) override;
void updateResourceInterest(const std::set<std::string>& update_to_these_names) override;
void requestOnDemandUpdate(const std::set<std::string>&) override {
void start(const absl::flat_hash_set<std::string>& resource_names) override;
void
updateResourceInterest(const absl::flat_hash_set<std::string>& update_to_these_names) override;
void requestOnDemandUpdate(const absl::flat_hash_set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

Expand Down
8 changes: 4 additions & 4 deletions source/common/config/new_grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void NewGrpcMuxImpl::kickOffAck(UpdateAck ack) {
void NewGrpcMuxImpl::start() { grpc_stream_.establishNewStream(); }

GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url,
const std::set<std::string>& resources,
const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching) {
Expand All @@ -154,14 +154,14 @@ GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url,
// the whole subscription, or if a removed name has no other watch interested in it, then the
// subscription will enqueue and attempt to send an appropriate discovery request.
void NewGrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources,
const absl::flat_hash_set<std::string>& resources,
const bool creating_namespace_watch) {
ASSERT(watch != nullptr);
auto sub = subscriptions_.find(type_url);
RELEASE_ASSERT(sub != subscriptions_.end(),
fmt::format("Watch of {} has no subscription to update.", type_url));
// If this is a glob collection subscription, we need to compute actual context parameters.
std::set<std::string> xdstp_resources;
absl::flat_hash_set<std::string> xdstp_resources;
// TODO(htuch): add support for resources beyond glob collections, the constraints below around
// resource size and ID reflect the progress of the xdstp:// implementation.
if (!resources.empty() && XdsResourceIdentifier::hasXdsTpScheme(*resources.begin())) {
Expand Down Expand Up @@ -198,7 +198,7 @@ void NewGrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch,
}

void NewGrpcMuxImpl::requestOnDemandUpdate(const std::string& type_url,
const std::set<std::string>& for_update) {
const absl::flat_hash_set<std::string>& for_update) {
auto sub = subscriptions_.find(type_url);
RELEASE_ASSERT(sub != subscriptions_.end(),
fmt::format("Watch of {} has no subscription to update.", type_url));
Expand Down
Loading

0 comments on commit 7d48928

Please sign in to comment.