Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: adding an interface to inform filters of local replies #15172

Merged
merged 11 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,18 @@ enum class FilterMetadataStatus {
Continue,
};

/**
* Return codes for onLocalReply filter invocations.
*/
enum class LocalErrorStatus {
// Continue sending the local reply after onLocalError has been sent to all filters.
Continue,

// Continue sending onLocalReply to all filters, but reset the stream once all filters have been
// informed rather than sending the local reply.
Comment on lines +173 to +174
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have some way of indicating to filters that the local reply is going to be reset? I can imagine filters wanting to do some mutation of the local reply in some cases, but it doesn't make sense for them to do this if a previous filter has triggered a reset

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they can't mutate the local reply here, only from the actual sendlocalReply, but can't hurt to pass on that info

ContinueAndResetStream,
};

/**
* The stream filter callbacks are passed to all filters to use for writing response data and
* interacting with the underlying stream in general.
Expand Down Expand Up @@ -596,6 +608,27 @@ class StreamFilterBase {
* @param action the resulting match action
*/
virtual void onMatchCallback(const Matcher::Action&) {}

struct LocalReplyData {
Http::Code code_;
absl::string_view details_;
};

/**
* Called after sendLocalReply is called, and before any local reply is
* serialized either to filters, or downstream.
*
* Note that in rare circumstances, onLocalReply may be called more than once
* for a given stream, because it is possible that a filter call
* sendLocalReply while processing the original local reply response.
*
* Filters implementing onLocalReply are responsible for never calling sendLocalReply
* from onLocalReply, as that has the potential for looping.
*
* @param data data associated with the sendLocalReply call.
* @param LocalErrorStatus the action to take after onLocalError completes.
*/
virtual LocalErrorStatus onLocalReply(LocalReplyData&) { return LocalErrorStatus::Continue; }
};

/**
Expand Down
20 changes: 19 additions & 1 deletion source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,18 @@ FilterManager::commonDecodePrefix(ActiveStreamDecoderFilter* filter,
return std::next(filter->entry());
}

LocalErrorStatus FilterManager::onLocalReply(StreamFilterBase::LocalReplyData& data) {
filter_manager_callbacks_.onLocalReply(data.code_);

LocalErrorStatus status = LocalErrorStatus::Continue;
for (auto entry : filters_) {
if (entry->onLocalReply(data) == LocalErrorStatus::ContinueAndResetStream) {
status = LocalErrorStatus::ContinueAndResetStream;
}
}
return status;
}

void FilterManager::sendLocalReply(
bool old_was_grpc_request, Code code, absl::string_view body,
const std::function<void(ResponseHeaderMap& headers)>& modify_headers,
Expand All @@ -824,7 +836,13 @@ void FilterManager::sendLocalReply(

stream_info_.setResponseCodeDetails(details);

filter_manager_callbacks_.onLocalReply(code);
StreamFilterBase::LocalReplyData data{code, details};
if (FilterManager::onLocalReply(data) == LocalErrorStatus::ContinueAndResetStream) {
ENVOY_STREAM_LOG(debug, "Resetting stream due to {}. onLocalReply requested reset.", *this,
details);
Comment on lines +843 to +844
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any way we can capture which filter is triggering the reset? Or let the filter that triggers a reset include information?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean we don't really for sendLocalReply logs either, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least with local replies we can infer it from the response code details, but yea I don't think this is necessary, just a nice to have

filter_manager_callbacks_.resetStream();
return;
}

if (!filter_manager_callbacks_.responseHeaders().has_value()) {
// If the response has not started at all, send the response through the filter chain.
Expand Down
12 changes: 12 additions & 0 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ class FilterManager : public ScopeTrackedObject,
// Http::FilterChainFactoryCallbacks
void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter) override {
addStreamDecoderFilterWorker(filter, nullptr, false);
filters_.push_back(filter.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we document the order in which filters are given the callback? Seems like the order is first added to last added, regardless of whether it was a decoder or encoder filter. I think people might intuitively expect this to follow the decoder or encoder filter chains, but currently this can interleave between the two.

I think with the current impl the order doesn't matter at all, but I can imagine iterations of this feature (like my suggestion of capturing information about which filter is doing the reset) where it starts mattering.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, added to include.

}
void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter,
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree) override {
Expand All @@ -776,6 +777,7 @@ class FilterManager : public ScopeTrackedObject,
}
void addStreamEncoderFilter(StreamEncoderFilterSharedPtr filter) override {
addStreamEncoderFilterWorker(filter, nullptr, false);
filters_.push_back(filter.get());
}
void addStreamEncoderFilter(StreamEncoderFilterSharedPtr filter,
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree) override {
Expand All @@ -793,6 +795,8 @@ class FilterManager : public ScopeTrackedObject,
void addStreamFilter(StreamFilterSharedPtr filter) override {
addStreamDecoderFilterWorker(filter, nullptr, true);
addStreamEncoderFilterWorker(filter, nullptr, true);
StreamDecoderFilter* decoder_filter = filter.get();
filters_.push_back(decoder_filter);
}
void addStreamFilter(StreamFilterSharedPtr filter,
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree) override {
Expand Down Expand Up @@ -910,6 +914,13 @@ class FilterManager : public ScopeTrackedObject,
*/
void maybeEndEncode(bool end_stream);

/**
* Called before local reply is made by the filter manager.
* @param data the data associated with the local reply.
* @param LocalErrorStatus the status from the filter chain.
*/
LocalErrorStatus onLocalReply(StreamFilterBase::LocalReplyData& data);

void sendLocalReply(bool is_grpc_request, Code code, absl::string_view body,
const std::function<void(ResponseHeaderMap& headers)>& modify_headers,
const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
Expand Down Expand Up @@ -1061,6 +1072,7 @@ class FilterManager : public ScopeTrackedObject,

std::list<ActiveStreamDecoderFilterPtr> decoder_filters_;
std::list<ActiveStreamEncoderFilterPtr> encoder_filters_;
std::list<StreamFilterBase*> filters_;
std::list<AccessLog::InstanceSharedPtr> access_log_handlers_;

// Stores metadata added in the decoding filter that is being processed. Will be cleared before
Expand Down
1 change: 1 addition & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ envoy_cc_test(
"//source/extensions/filters/http/buffer:config",
"//source/extensions/filters/http/health_check:config",
"//test/integration/filters:metadata_stop_all_filter_config_lib",
"//test/integration/filters:on_local_reply_filter_config_lib",
"//test/integration/filters:request_metadata_filter_config_lib",
"//test/integration/filters:response_metadata_filter_config_lib",
"//test/integration/filters:set_response_code_filter_config_proto_cc_proto",
Expand Down
14 changes: 14 additions & 0 deletions test/integration/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,20 @@ envoy_cc_test_library(
],
)

envoy_cc_test_library(
name = "on_local_reply_filter_config_lib",
srcs = [
"on_local_reply_filter.cc",
],
deps = [
"//include/envoy/http:filter_interface",
"//include/envoy/registry",
"//include/envoy/server:filter_config_interface",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"//test/extensions/filters/http/common:empty_http_filter_config_lib",
],
)

envoy_cc_test_library(
name = "passthrough_filter_config_lib",
srcs = [
Expand Down
49 changes: 49 additions & 0 deletions test/integration/filters/on_local_reply_filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include <string>

#include "envoy/http/filter.h"
#include "envoy/registry/registry.h"
#include "envoy/server/filter_config.h"

#include "extensions/filters/http/common/pass_through_filter.h"

#include "test/extensions/filters/http/common/empty_http_filter_config.h"

namespace Envoy {

class OnLocalReplyFilter : public Http::PassThroughFilter {
public:
Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& request_headers, bool) override {
if (request_headers.get(Http::LowerCaseString("reset")).size() != 0) {
reset_ = true;
}
decoder_callbacks_->sendLocalReply(Http::Code::BadRequest, "body", nullptr, absl::nullopt,
"details");
return Http::FilterHeadersStatus::StopIteration;
}

Http::LocalErrorStatus onLocalReply(LocalReplyData&) override {
if (reset_) {
return Http::LocalErrorStatus::ContinueAndResetStream;
}
return Http::LocalErrorStatus::Continue;
}

bool reset_{};
};

class OnLocalReplyFilterConfig : public Extensions::HttpFilters::Common::EmptyHttpFilterConfig {
public:
OnLocalReplyFilterConfig() : EmptyHttpFilterConfig("on-local-reply-filter") {}
Http::FilterFactoryCb createFilter(const std::string&,
Server::Configuration::FactoryContext&) override {
return [](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamFilter(std::make_shared<::Envoy::OnLocalReplyFilter>());
};
}
};

// perform static registration
static Registry::RegisterFactory<OnLocalReplyFilterConfig,
Server::Configuration::NamedHttpFilterConfigFactory>
register_;
} // namespace Envoy
25 changes: 25 additions & 0 deletions test/integration/http2_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,7 @@ TEST_P(Http2MetadataIntegrationTest, UpstreamMetadataAfterEndStream) {
upstream_request_->encodeHeaders(response_headers, true);

// Upstream sends metadata.

const Http::MetadataMap response_metadata_map = {{"resp_key1", "resp_value1"}};
Http::MetadataMapPtr metadata_map_ptr =
std::make_unique<Http::MetadataMap>(response_metadata_map);
Expand All @@ -1603,4 +1604,28 @@ TEST_P(Http2MetadataIntegrationTest, UpstreamMetadataAfterEndStream) {
EXPECT_EQ("200", response->headers().getStatusValue());
}

static std::string on_local_reply_filter = R"EOF(
name: on-local-reply-filter
typed_config:
"@type": type.googleapis.com/google.protobuf.Empty
)EOF";

TEST_P(Http2IntegrationTest, OnLocalReply) {
config_helper_.addFilter(on_local_reply_filter);
initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));
{
auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_);
response->waitForEndStream();
ASSERT_TRUE(response->complete());
}
{
default_request_headers_.addCopy("reset", "yes");
auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_);
response->waitForReset();
ASSERT_FALSE(response->complete());
}
}

} // namespace Envoy