-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
http: adding an interface to inform filters of local replies #15172
Changes from 10 commits
ecbba2f
d059f32
0b11bfe
6084a39
b374b19
bd74e50
c077e50
81f63f5
eee1580
9209fa7
3f0e7dd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -163,6 +163,18 @@ enum class FilterMetadataStatus { | |
Continue, | ||
}; | ||
|
||
/** | ||
* Return codes for onLocalReply filter invocations. | ||
*/ | ||
enum class LocalErrorStatus { | ||
// Continue sending the local reply after onLocalError has been sent to all filters. | ||
Continue, | ||
|
||
// Continue sending onLocalReply to all filters, but reset the stream once all filters have been | ||
// informed rather than sending the local reply. | ||
ContinueAndResetStream, | ||
}; | ||
|
||
/** | ||
* The stream filter callbacks are passed to all filters to use for writing response data and | ||
* interacting with the underlying stream in general. | ||
|
@@ -596,6 +608,36 @@ class StreamFilterBase { | |
* @param action the resulting match action | ||
*/ | ||
virtual void onMatchCallback(const Matcher::Action&) {} | ||
|
||
struct LocalReplyData { | ||
// The error code which (barring reset) will be sent to the client. | ||
Http::Code code_; | ||
// The details of why a local reply is being sent. | ||
absl::string_view details_; | ||
// True if a reset will occur rather than the local reply (some prior filter | ||
// has returned ContinueAndResetStream) | ||
bool reset_imminent_; | ||
}; | ||
|
||
/** | ||
* Called after sendLocalReply is called, and before any local reply is | ||
* serialized either to filters, or downstream. | ||
* This will be called on both encoder and decoder filters starting at the | ||
* router filter and working towards the first filter configured. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In theory someone could be using another terminal filter that isn't the router filter and still make use of local replies right? Maybe make this "at the terminal filter" |
||
* | ||
* Note that in some circumstances, onLocalReply may be called more than once | ||
* for a given stream, because it is possible that a filter call | ||
* sendLocalReply while processing the original local reply response. | ||
* | ||
* Filters implementing onLocalReply are responsible for never calling sendLocalReply | ||
* from onLocalReply, as that has the potential for looping. | ||
alyssawilk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* | ||
* @param data data associated with the sendLocalReply call. | ||
* @param LocalErrorStatus the action to take after onLocalError completes. | ||
*/ | ||
virtual LocalErrorStatus onLocalReply(const LocalReplyData&) { | ||
return LocalErrorStatus::Continue; | ||
} | ||
}; | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -812,10 +812,23 @@ FilterManager::commonDecodePrefix(ActiveStreamDecoderFilter* filter, | |
return std::next(filter->entry()); | ||
} | ||
|
||
void FilterManager::onLocalReply(StreamFilterBase::LocalReplyData& data) { | ||
state_.under_on_local_reply_ = true; | ||
filter_manager_callbacks_.onLocalReply(data.code_); | ||
|
||
for (auto entry : filters_) { | ||
if (entry->onLocalReply(data) == LocalErrorStatus::ContinueAndResetStream) { | ||
data.reset_imminent_ = true; | ||
} | ||
} | ||
state_.under_on_local_reply_ = false; | ||
} | ||
|
||
void FilterManager::sendLocalReply( | ||
bool old_was_grpc_request, Code code, absl::string_view body, | ||
const std::function<void(ResponseHeaderMap& headers)>& modify_headers, | ||
const absl::optional<Grpc::Status::GrpcStatus> grpc_status, absl::string_view details) { | ||
ASSERT(!state_.under_on_local_reply_); | ||
const bool is_head_request = state_.is_head_request_; | ||
bool is_grpc_request = old_was_grpc_request; | ||
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unify_grpc_handling")) { | ||
|
@@ -824,7 +837,14 @@ void FilterManager::sendLocalReply( | |
|
||
stream_info_.setResponseCodeDetails(details); | ||
|
||
filter_manager_callbacks_.onLocalReply(code); | ||
StreamFilterBase::LocalReplyData data{code, details, false}; | ||
FilterManager::onLocalReply(data); | ||
if (data.reset_imminent_) { | ||
ENVOY_STREAM_LOG(debug, "Resetting stream due to {}. onLocalReply requested reset.", *this, | ||
details); | ||
Comment on lines
+843
to
+844
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any way we can capture which filter is triggering the reset? Or let the filter that triggers a reset include information? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean we don't really for sendLocalReply logs either, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At least with local replies we can infer it from the response code details, but yea I don't think this is necessary, just a nice to have |
||
filter_manager_callbacks_.resetStream(); | ||
return; | ||
} | ||
|
||
if (!filter_manager_callbacks_.responseHeaders().has_value()) { | ||
// If the response has not started at all, send the response through the filter chain. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -771,6 +771,7 @@ class FilterManager : public ScopeTrackedObject, | |
// Http::FilterChainFactoryCallbacks | ||
void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter) override { | ||
addStreamDecoderFilterWorker(filter, nullptr, false); | ||
filters_.push_back(filter.get()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we document the order in which filters are given the callback? Seems like the order is first added to last added, regardless of whether it was a decoder or encoder filter. I think people might intuitively expect this to follow the decoder or encoder filter chains, but currently this can interleave between the two. I think with the current impl the order doesn't matter at all, but I can imagine iterations of this feature (like my suggestion of capturing information about which filter is doing the reset) where it starts mattering. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure, added to include. |
||
} | ||
void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter, | ||
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree) override { | ||
|
@@ -787,6 +788,7 @@ class FilterManager : public ScopeTrackedObject, | |
} | ||
void addStreamEncoderFilter(StreamEncoderFilterSharedPtr filter) override { | ||
addStreamEncoderFilterWorker(filter, nullptr, false); | ||
filters_.push_back(filter.get()); | ||
} | ||
void addStreamEncoderFilter(StreamEncoderFilterSharedPtr filter, | ||
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree) override { | ||
|
@@ -804,6 +806,8 @@ class FilterManager : public ScopeTrackedObject, | |
void addStreamFilter(StreamFilterSharedPtr filter) override { | ||
addStreamDecoderFilterWorker(filter, nullptr, true); | ||
addStreamEncoderFilterWorker(filter, nullptr, true); | ||
StreamDecoderFilter* decoder_filter = filter.get(); | ||
filters_.push_back(decoder_filter); | ||
} | ||
void addStreamFilter(StreamFilterSharedPtr filter, | ||
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree) override { | ||
|
@@ -921,6 +925,12 @@ class FilterManager : public ScopeTrackedObject, | |
*/ | ||
void maybeEndEncode(bool end_stream); | ||
|
||
/** | ||
* Called before local reply is made by the filter manager. | ||
* @param data the data associated with the local reply. | ||
*/ | ||
void onLocalReply(StreamFilterBase::LocalReplyData& data); | ||
|
||
void sendLocalReply(bool is_grpc_request, Code code, absl::string_view body, | ||
const std::function<void(ResponseHeaderMap& headers)>& modify_headers, | ||
const absl::optional<Grpc::Status::GrpcStatus> grpc_status, | ||
|
@@ -1072,6 +1082,7 @@ class FilterManager : public ScopeTrackedObject, | |
|
||
std::list<ActiveStreamDecoderFilterPtr> decoder_filters_; | ||
std::list<ActiveStreamEncoderFilterPtr> encoder_filters_; | ||
std::list<StreamFilterBase*> filters_; | ||
std::list<AccessLog::InstanceSharedPtr> access_log_handlers_; | ||
|
||
// Stores metadata added in the decoding filter that is being processed. Will be cleared before | ||
|
@@ -1121,7 +1132,7 @@ class FilterManager : public ScopeTrackedObject, | |
State() | ||
: remote_complete_(false), local_complete_(false), has_continue_headers_(false), | ||
created_filter_chain_(false), is_head_request_(false), is_grpc_request_(false), | ||
non_100_response_headers_encoded_(false) {} | ||
non_100_response_headers_encoded_(false), under_on_local_reply_(false) {} | ||
|
||
uint32_t filter_call_state_{0}; | ||
|
||
|
@@ -1139,6 +1150,8 @@ class FilterManager : public ScopeTrackedObject, | |
bool is_grpc_request_ : 1; | ||
// Tracks if headers other than 100-Continue have been encoded to the codec. | ||
bool non_100_response_headers_encoded_ : 1; | ||
// True under the stack of onLocalReply, false otherwise. | ||
bool under_on_local_reply_ : 1; | ||
|
||
// The following 3 members are booleans rather than part of the space-saving bitfield as they | ||
// are passed as arguments to functions expecting bools. Extend State using the bitfield | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
#include <string> | ||
|
||
#include "envoy/http/filter.h" | ||
#include "envoy/registry/registry.h" | ||
#include "envoy/server/filter_config.h" | ||
|
||
#include "extensions/filters/http/common/pass_through_filter.h" | ||
|
||
#include "test/extensions/filters/http/common/empty_http_filter_config.h" | ||
|
||
namespace Envoy { | ||
|
||
class OnLocalReplyFilter : public Http::PassThroughFilter { | ||
public: | ||
Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& request_headers, bool) override { | ||
if (!request_headers.get(Http::LowerCaseString("reset")).empty()) { | ||
reset_ = true; | ||
} | ||
decoder_callbacks_->sendLocalReply(Http::Code::BadRequest, "body", nullptr, absl::nullopt, | ||
"details"); | ||
return Http::FilterHeadersStatus::StopIteration; | ||
} | ||
|
||
Http::LocalErrorStatus onLocalReply(const LocalReplyData&) override { | ||
if (reset_) { | ||
return Http::LocalErrorStatus::ContinueAndResetStream; | ||
} | ||
return Http::LocalErrorStatus::Continue; | ||
} | ||
|
||
bool reset_{}; | ||
}; | ||
|
||
class OnLocalReplyFilterConfig : public Extensions::HttpFilters::Common::EmptyHttpFilterConfig { | ||
public: | ||
OnLocalReplyFilterConfig() : EmptyHttpFilterConfig("on-local-reply-filter") {} | ||
Http::FilterFactoryCb createFilter(const std::string&, | ||
Server::Configuration::FactoryContext&) override { | ||
return [](Http::FilterChainFactoryCallbacks& callbacks) -> void { | ||
callbacks.addStreamFilter(std::make_shared<::Envoy::OnLocalReplyFilter>()); | ||
}; | ||
} | ||
}; | ||
|
||
// perform static registration | ||
static Registry::RegisterFactory<OnLocalReplyFilterConfig, | ||
Server::Configuration::NamedHttpFilterConfigFactory> | ||
register_; | ||
} // namespace Envoy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have some way of indicating to filters that the local reply is going to be reset? I can imagine filters wanting to do some mutation of the local reply in some cases, but it doesn't make sense for them to do this if a previous filter has triggered a reset
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
they can't mutate the local reply here, only from the actual sendlocalReply, but can't hurt to pass on that info