Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: adding an interface to inform filters of local replies #15172

Merged
merged 11 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,18 @@ enum class FilterMetadataStatus {
Continue,
};

/**
* Return codes for onLocalReply filter invocations.
*/
enum class LocalErrorStatus {
// Continue sending the local reply after onLocalError has been sent to all filters.
Continue,

// Continue sending onLocalReply to all filters, but reset the stream once all filters have been
// informed rather than sending the local reply.
Comment on lines +173 to +174
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have some way of indicating to filters that the local reply is going to be reset? I can imagine filters wanting to do some mutation of the local reply in some cases, but it doesn't make sense for them to do this if a previous filter has triggered a reset

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they can't mutate the local reply here, only from the actual sendlocalReply, but can't hurt to pass on that info

ContinueAndResetStream,
};

/**
* The stream filter callbacks are passed to all filters to use for writing response data and
* interacting with the underlying stream in general.
Expand Down Expand Up @@ -596,6 +608,36 @@ class StreamFilterBase {
* @param action the resulting match action
*/
virtual void onMatchCallback(const Matcher::Action&) {}

struct LocalReplyData {
// The error code which (barring reset) will be sent to the client.
Http::Code code_;
// The details of why a local reply is being sent.
absl::string_view details_;
// True if a reset will occur rather than the local reply (some prior filter
// has returned ContinueAndResetStream)
bool reset_imminent_;
};

/**
* Called after sendLocalReply is called, and before any local reply is
* serialized either to filters, or downstream.
* This will be called on both encoder and decoder filters starting at the
* router filter and working towards the first filter configured.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory someone could be using another terminal filter that isn't the router filter and still make use of local replies right? Maybe make this "at the terminal filter"

*
* Note that in some circumstances, onLocalReply may be called more than once
* for a given stream, because it is possible that a filter call
* sendLocalReply while processing the original local reply response.
*
* Filters implementing onLocalReply are responsible for never calling sendLocalReply
* from onLocalReply, as that has the potential for looping.
*
* @param data data associated with the sendLocalReply call.
* @param LocalErrorStatus the action to take after onLocalError completes.
*/
virtual LocalErrorStatus onLocalReply(const LocalReplyData&) {
return LocalErrorStatus::Continue;
}
};

/**
Expand Down
22 changes: 21 additions & 1 deletion source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -812,10 +812,23 @@ FilterManager::commonDecodePrefix(ActiveStreamDecoderFilter* filter,
return std::next(filter->entry());
}

void FilterManager::onLocalReply(StreamFilterBase::LocalReplyData& data) {
state_.under_on_local_reply_ = true;
filter_manager_callbacks_.onLocalReply(data.code_);

for (auto entry : filters_) {
if (entry->onLocalReply(data) == LocalErrorStatus::ContinueAndResetStream) {
data.reset_imminent_ = true;
}
}
state_.under_on_local_reply_ = false;
}

void FilterManager::sendLocalReply(
bool old_was_grpc_request, Code code, absl::string_view body,
const std::function<void(ResponseHeaderMap& headers)>& modify_headers,
const absl::optional<Grpc::Status::GrpcStatus> grpc_status, absl::string_view details) {
ASSERT(!state_.under_on_local_reply_);
const bool is_head_request = state_.is_head_request_;
bool is_grpc_request = old_was_grpc_request;
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unify_grpc_handling")) {
Expand All @@ -824,7 +837,14 @@ void FilterManager::sendLocalReply(

stream_info_.setResponseCodeDetails(details);

filter_manager_callbacks_.onLocalReply(code);
StreamFilterBase::LocalReplyData data{code, details, false};
FilterManager::onLocalReply(data);
if (data.reset_imminent_) {
ENVOY_STREAM_LOG(debug, "Resetting stream due to {}. onLocalReply requested reset.", *this,
details);
Comment on lines +843 to +844
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any way we can capture which filter is triggering the reset? Or let the filter that triggers a reset include information?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean we don't really for sendLocalReply logs either, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least with local replies we can infer it from the response code details, but yea I don't think this is necessary, just a nice to have

filter_manager_callbacks_.resetStream();
return;
}

if (!filter_manager_callbacks_.responseHeaders().has_value()) {
// If the response has not started at all, send the response through the filter chain.
Expand Down
15 changes: 14 additions & 1 deletion source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ class FilterManager : public ScopeTrackedObject,
// Http::FilterChainFactoryCallbacks
void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter) override {
addStreamDecoderFilterWorker(filter, nullptr, false);
filters_.push_back(filter.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we document the order in which filters are given the callback? Seems like the order is first added to last added, regardless of whether it was a decoder or encoder filter. I think people might intuitively expect this to follow the decoder or encoder filter chains, but currently this can interleave between the two.

I think with the current impl the order doesn't matter at all, but I can imagine iterations of this feature (like my suggestion of capturing information about which filter is doing the reset) where it starts mattering.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, added to include.

}
void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter,
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree) override {
Expand All @@ -787,6 +788,7 @@ class FilterManager : public ScopeTrackedObject,
}
void addStreamEncoderFilter(StreamEncoderFilterSharedPtr filter) override {
addStreamEncoderFilterWorker(filter, nullptr, false);
filters_.push_back(filter.get());
}
void addStreamEncoderFilter(StreamEncoderFilterSharedPtr filter,
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree) override {
Expand All @@ -804,6 +806,8 @@ class FilterManager : public ScopeTrackedObject,
void addStreamFilter(StreamFilterSharedPtr filter) override {
addStreamDecoderFilterWorker(filter, nullptr, true);
addStreamEncoderFilterWorker(filter, nullptr, true);
StreamDecoderFilter* decoder_filter = filter.get();
filters_.push_back(decoder_filter);
}
void addStreamFilter(StreamFilterSharedPtr filter,
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree) override {
Expand Down Expand Up @@ -921,6 +925,12 @@ class FilterManager : public ScopeTrackedObject,
*/
void maybeEndEncode(bool end_stream);

/**
* Called before local reply is made by the filter manager.
* @param data the data associated with the local reply.
*/
void onLocalReply(StreamFilterBase::LocalReplyData& data);

void sendLocalReply(bool is_grpc_request, Code code, absl::string_view body,
const std::function<void(ResponseHeaderMap& headers)>& modify_headers,
const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
Expand Down Expand Up @@ -1072,6 +1082,7 @@ class FilterManager : public ScopeTrackedObject,

std::list<ActiveStreamDecoderFilterPtr> decoder_filters_;
std::list<ActiveStreamEncoderFilterPtr> encoder_filters_;
std::list<StreamFilterBase*> filters_;
std::list<AccessLog::InstanceSharedPtr> access_log_handlers_;

// Stores metadata added in the decoding filter that is being processed. Will be cleared before
Expand Down Expand Up @@ -1121,7 +1132,7 @@ class FilterManager : public ScopeTrackedObject,
State()
: remote_complete_(false), local_complete_(false), has_continue_headers_(false),
created_filter_chain_(false), is_head_request_(false), is_grpc_request_(false),
non_100_response_headers_encoded_(false) {}
non_100_response_headers_encoded_(false), under_on_local_reply_(false) {}

uint32_t filter_call_state_{0};

Expand All @@ -1139,6 +1150,8 @@ class FilterManager : public ScopeTrackedObject,
bool is_grpc_request_ : 1;
// Tracks if headers other than 100-Continue have been encoded to the codec.
bool non_100_response_headers_encoded_ : 1;
// True under the stack of onLocalReply, false otherwise.
bool under_on_local_reply_ : 1;

// The following 3 members are booleans rather than part of the space-saving bitfield as they
// are passed as arguments to functions expecting bools. Extend State using the bitfield
Expand Down
4 changes: 2 additions & 2 deletions test/common/http/conn_manager_impl_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ void HttpConnectionManagerImplTest::setupFilterChain(int num_decoder_filters,
// NOTE: The length/repetition in this routine allows InSequence to work correctly in an outer
// scope.
for (int i = 0; i < num_decoder_filters * num_requests; i++) {
decoder_filters_.push_back(new MockStreamDecoderFilter());
decoder_filters_.push_back(new NiceMock<MockStreamDecoderFilter>());
}

for (int i = 0; i < num_encoder_filters * num_requests; i++) {
encoder_filters_.push_back(new MockStreamEncoderFilter());
encoder_filters_.push_back(new NiceMock<MockStreamEncoderFilter>());
}

InSequence s;
Expand Down
105 changes: 104 additions & 1 deletion test/common/http/filter_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "gtest/gtest.h"

using testing::InSequence;
using testing::Return;

namespace Envoy {
Expand All @@ -36,7 +37,7 @@ class FilterManagerTest : public testing::Test {
Event::MockDispatcher dispatcher_;
NiceMock<Network::MockConnection> connection_;
Envoy::Http::MockFilterChainFactory filter_factory_;
LocalReply::MockLocalReply local_reply_;
NiceMock<LocalReply::MockLocalReply> local_reply_;
Protocol protocol_{Protocol::Http2};
NiceMock<MockTimeSystem> time_source_;
StreamInfo::FilterStateSharedPtr filter_state_ =
Expand Down Expand Up @@ -345,6 +346,108 @@ TEST_F(FilterManagerTest, MatchTreeFilterActionDualFilter) {
filter_manager_->decodeHeaders(*grpc_headers, true);
filter_manager_->destroyFilters();
}

TEST_F(FilterManagerTest, OnLocalReply) {
initialize();

std::shared_ptr<MockStreamDecoderFilter> decoder_filter(new NiceMock<MockStreamDecoderFilter>());
std::shared_ptr<MockStreamEncoderFilter> encoder_filter(new NiceMock<MockStreamEncoderFilter>());
std::shared_ptr<MockStreamFilter> stream_filter(new NiceMock<MockStreamFilter>());

RequestHeaderMapPtr headers{
new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};

ON_CALL(filter_manager_callbacks_, requestHeaders()).WillByDefault(Return(makeOptRef(*headers)));

EXPECT_CALL(filter_factory_, createFilterChain(_))
.WillRepeatedly(Invoke([&](FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamDecoderFilter(decoder_filter);
callbacks.addStreamFilter(stream_filter);
callbacks.addStreamEncoderFilter(encoder_filter);
}));

filter_manager_->createFilterChain();
filter_manager_->requestHeadersInitialized();
filter_manager_->decodeHeaders(*headers, true);

// Make sure all 3 filters get onLocalReply, and that the reset is preserved
// even if not the last return.
EXPECT_CALL(*decoder_filter, onLocalReply(_));
EXPECT_CALL(*stream_filter, onLocalReply(_))
.WillOnce(Return(LocalErrorStatus::ContinueAndResetStream));
EXPECT_CALL(*encoder_filter, onLocalReply(_));
EXPECT_CALL(filter_manager_callbacks_, resetStream());
decoder_filter->callbacks_->sendLocalReply(Code::InternalServerError, "body", nullptr,
absl::nullopt, "details");

// The reason for the response (in this case the reset) will still be tracked
// but as no response is sent the response code will remain absent.
ASSERT_TRUE(filter_manager_->streamInfo().responseCodeDetails().has_value());
EXPECT_EQ(filter_manager_->streamInfo().responseCodeDetails().value(), "details");
EXPECT_FALSE(filter_manager_->streamInfo().responseCode().has_value());

filter_manager_->destroyFilters();
}

TEST_F(FilterManagerTest, MultipleOnLocalReply) {
initialize();

std::shared_ptr<MockStreamDecoderFilter> decoder_filter(new NiceMock<MockStreamDecoderFilter>());
std::shared_ptr<MockStreamEncoderFilter> encoder_filter(new NiceMock<MockStreamEncoderFilter>());
std::shared_ptr<MockStreamFilter> stream_filter(new NiceMock<MockStreamFilter>());

RequestHeaderMapPtr headers{
new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};

ON_CALL(filter_manager_callbacks_, requestHeaders()).WillByDefault(Return(makeOptRef(*headers)));

EXPECT_CALL(filter_factory_, createFilterChain(_))
.WillRepeatedly(Invoke([&](FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamDecoderFilter(decoder_filter);
callbacks.addStreamFilter(stream_filter);
callbacks.addStreamEncoderFilter(encoder_filter);
}));

filter_manager_->createFilterChain();
filter_manager_->requestHeadersInitialized();
filter_manager_->decodeHeaders(*headers, true);

{
// Set up expectations to be triggered by sendLocalReply at the bottom of
// this block.
InSequence s;

// Make sure all 3 filters get onLocalReply
EXPECT_CALL(*decoder_filter, onLocalReply(_));
EXPECT_CALL(*stream_filter, onLocalReply(_));
EXPECT_CALL(*encoder_filter, onLocalReply(_));

// Now response encoding begins. Assume a filter co-opts the original reply
// with a new local reply.
EXPECT_CALL(*encoder_filter, encodeHeaders(_, _))
.WillOnce(Invoke([&](ResponseHeaderMap&, bool) -> FilterHeadersStatus {
decoder_filter->callbacks_->sendLocalReply(Code::InternalServerError, "body2", nullptr,
absl::nullopt, "details2");
return FilterHeadersStatus::StopIteration;
}));

// All 3 filters should get the second onLocalReply.
EXPECT_CALL(*decoder_filter, onLocalReply(_));
EXPECT_CALL(*stream_filter, onLocalReply(_));
EXPECT_CALL(*encoder_filter, onLocalReply(_));

decoder_filter->callbacks_->sendLocalReply(Code::InternalServerError, "body", nullptr,
absl::nullopt, "details");
}

// The final details should be details2.
ASSERT_TRUE(filter_manager_->streamInfo().responseCodeDetails().has_value());
EXPECT_EQ(filter_manager_->streamInfo().responseCodeDetails().value(), "details2");
EXPECT_FALSE(filter_manager_->streamInfo().responseCode().has_value());

filter_manager_->destroyFilters();
}

} // namespace
} // namespace Http
} // namespace Envoy
1 change: 1 addition & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ envoy_cc_test(
"//source/extensions/filters/http/buffer:config",
"//source/extensions/filters/http/health_check:config",
"//test/integration/filters:metadata_stop_all_filter_config_lib",
"//test/integration/filters:on_local_reply_filter_config_lib",
"//test/integration/filters:request_metadata_filter_config_lib",
"//test/integration/filters:response_metadata_filter_config_lib",
"//test/integration/filters:set_response_code_filter_config_proto_cc_proto",
Expand Down
14 changes: 14 additions & 0 deletions test/integration/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,20 @@ envoy_cc_test_library(
],
)

envoy_cc_test_library(
name = "on_local_reply_filter_config_lib",
srcs = [
"on_local_reply_filter.cc",
],
deps = [
"//include/envoy/http:filter_interface",
"//include/envoy/registry",
"//include/envoy/server:filter_config_interface",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"//test/extensions/filters/http/common:empty_http_filter_config_lib",
],
)

envoy_cc_test_library(
name = "passthrough_filter_config_lib",
srcs = [
Expand Down
49 changes: 49 additions & 0 deletions test/integration/filters/on_local_reply_filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include <string>

#include "envoy/http/filter.h"
#include "envoy/registry/registry.h"
#include "envoy/server/filter_config.h"

#include "extensions/filters/http/common/pass_through_filter.h"

#include "test/extensions/filters/http/common/empty_http_filter_config.h"

namespace Envoy {

class OnLocalReplyFilter : public Http::PassThroughFilter {
public:
Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& request_headers, bool) override {
if (!request_headers.get(Http::LowerCaseString("reset")).empty()) {
reset_ = true;
}
decoder_callbacks_->sendLocalReply(Http::Code::BadRequest, "body", nullptr, absl::nullopt,
"details");
return Http::FilterHeadersStatus::StopIteration;
}

Http::LocalErrorStatus onLocalReply(const LocalReplyData&) override {
if (reset_) {
return Http::LocalErrorStatus::ContinueAndResetStream;
}
return Http::LocalErrorStatus::Continue;
}

bool reset_{};
};

class OnLocalReplyFilterConfig : public Extensions::HttpFilters::Common::EmptyHttpFilterConfig {
public:
OnLocalReplyFilterConfig() : EmptyHttpFilterConfig("on-local-reply-filter") {}
Http::FilterFactoryCb createFilter(const std::string&,
Server::Configuration::FactoryContext&) override {
return [](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamFilter(std::make_shared<::Envoy::OnLocalReplyFilter>());
};
}
};

// perform static registration
static Registry::RegisterFactory<OnLocalReplyFilterConfig,
Server::Configuration::NamedHttpFilterConfigFactory>
register_;
} // namespace Envoy
Loading