Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http conn man: stopped filters can add data from encodeTrailers #8404

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
*
* 4) If additional data needs to be added in the decodeTrailers() callback, this method can be
* called in the context of the callback. All further filters will receive decodeData(..., false)
* followed by decodeTrailers().
* followed by decodeTrailers(). However if the iteration is stopped, the added data will
* buffered, so that the further filters will not receive decodeData() before decodeHeaders().
*
* It is an error to call this method in any other case.
*
Expand Down Expand Up @@ -562,7 +563,8 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks {
*
* 4) If additional data needs to be added in the encodeTrailers() callback, this method can be
* called in the context of the callback. All further filters will receive encodeData(..., false)
* followed by encodeTrailers().
* followed by encodeTrailers(). However if the iteration is stopped, the added data will
* buffered, so that the further filters will not receive encodeData() before encodeHeaders().
*
* It is an error to call this method in any other case.
*
Expand Down
6 changes: 4 additions & 2 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,8 @@ void ConnectionManagerImpl::ActiveStream::addDecodedData(ActiveStreamDecoderFilt
Buffer::Instance& data, bool streaming) {
if (state_.filter_call_state_ == 0 ||
(state_.filter_call_state_ & FilterCallState::DecodeHeaders) ||
(state_.filter_call_state_ & FilterCallState::DecodeData)) {
(state_.filter_call_state_ & FilterCallState::DecodeData) ||
((state_.filter_call_state_ & FilterCallState::DecodeTrailers) && !filter.canIterate())) {
// Make sure if this triggers watermarks, the correct action is taken.
state_.decoder_filters_streaming_ = streaming;
// If no call is happening or we are in the decode headers/data callback, buffer the data.
Expand Down Expand Up @@ -1555,7 +1556,8 @@ void ConnectionManagerImpl::ActiveStream::addEncodedData(ActiveStreamEncoderFilt
Buffer::Instance& data, bool streaming) {
if (state_.filter_call_state_ == 0 ||
(state_.filter_call_state_ & FilterCallState::EncodeHeaders) ||
(state_.filter_call_state_ & FilterCallState::EncodeData)) {
(state_.filter_call_state_ & FilterCallState::EncodeData) ||
((state_.filter_call_state_ & FilterCallState::EncodeTrailers) && !filter.canIterate())) {
// Make sure if this triggers watermarks, the correct action is taken.
state_.encoder_filters_streaming_ = streaming;
// If no call is happening or we are in the decode headers/data callback, buffer the data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,12 +563,6 @@ bool JsonTranscoderFilter::maybeConvertGrpcStatus(Grpc::Status::GrpcStatus grpc_
return false;
}

// We do not support responses with a separate trailer frame.
// TODO(ascheglov): remove this if after HCM can buffer data added from |encodeTrailers|.
if (response_headers_ != &trailers) {
return false;
}

// Send a serialized status only if there was no body.
if (has_body_) {
return false;
Expand Down
59 changes: 59 additions & 0 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2917,6 +2917,65 @@ TEST_F(HttpConnectionManagerImplTest, FilterAddBodyInTrailersCallback) {
HeaderMapPtr{new TestHeaderMapImpl{{"some", "trailer"}}});
}

// Don't send data frames, only headers and trailers.
TEST_F(HttpConnectionManagerImplTest, FilterAddBodyInTrailersCallback_NoDataFrames) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_);
HeaderMapPtr headers{
new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
decoder->decodeHeaders(std::move(headers), false);

HeaderMapPtr trailers{new TestHeaderMapImpl{{"foo", "bar"}}};
decoder->decodeTrailers(std::move(trailers));
}));

setupFilterChain(2, 1);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));

Buffer::OwnedImpl trailers_data("hello");
EXPECT_CALL(*decoder_filters_[0], decodeTrailers(_))
.WillOnce(InvokeWithoutArgs([&]() -> FilterTrailersStatus {
decoder_filters_[0]->callbacks_->addDecodedData(trailers_data, false);
return FilterTrailersStatus::Continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a similar parallel test in which the filter decode/encodeTrailers() callbacks continue to stop iteration and then do a continuation later outside of callback context?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[1], decodeData(_, false))
.WillOnce(Return(FilterDataStatus::StopIterationAndBuffer));
EXPECT_CALL(*decoder_filters_[1], decodeTrailers(_))
.WillOnce(Return(FilterTrailersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);

EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
decoder_filters_[0]->callbacks_->encodeHeaders(
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, false);

EXPECT_CALL(*encoder_filters_[0], encodeTrailers(_))
.WillOnce(InvokeWithoutArgs([&]() -> FilterTrailersStatus {
encoder_filters_[0]->callbacks_->addEncodedData(trailers_data, false);
return FilterTrailersStatus::Continue;
}));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());
EXPECT_CALL(response_encoder_, encodeHeaders(_, false));
EXPECT_CALL(response_encoder_, encodeData(_, false));
EXPECT_CALL(response_encoder_, encodeTrailers(_));
expectOnDestroy();

decoder_filters_[0]->callbacks_->encodeTrailers(
HeaderMapPtr{new TestHeaderMapImpl{{"some", "trailer"}}});
}

// Add*Data during the *Data callbacks.
TEST_F(HttpConnectionManagerImplTest, FilterAddBodyDuringDecodeData) {
InSequence s;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class GrpcJsonTranscoderIntegrationTest
const std::vector<std::string>& grpc_request_messages,
const std::vector<std::string>& grpc_response_messages,
const Status& grpc_status, Http::HeaderMap&& response_headers,
const std::string& response_body, bool full_response = true) {
const std::string& response_body, bool full_response = true,
bool always_send_trailers = false) {
codec_client_ = makeHttpConnection(lookupPort("http"));

IntegrationStreamDecoderPtr response;
Expand Down Expand Up @@ -98,7 +99,7 @@ class GrpcJsonTranscoderIntegrationTest
Http::TestHeaderMapImpl response_headers;
response_headers.insertStatus().value(200);
response_headers.insertContentType().value(std::string("application/grpc"));
if (grpc_response_messages.empty()) {
if (grpc_response_messages.empty() && !always_send_trailers) {
response_headers.insertGrpcStatus().value(static_cast<uint64_t>(grpc_status.error_code()));
response_headers.insertGrpcMessage().value(absl::string_view(
grpc_status.error_message().data(), grpc_status.error_message().size()));
Expand Down Expand Up @@ -375,6 +376,30 @@ TEST_P(GrpcJsonTranscoderIntegrationTest, UnaryErrorConvertedToJson) {
R"({"code":5,"message":"Shelf 100 Not Found"})");
}

// Upstream sends headers (e.g. sends metadata), and then sends trailer with an error.
TEST_P(GrpcJsonTranscoderIntegrationTest, UnaryErrorInTrailerConvertedToJson) {
const std::string filter =
R"EOF(
name: envoy.grpc_json_transcoder
config:
proto_descriptor: "{}"
services: "bookstore.Bookstore"
convert_grpc_status: true
)EOF";
config_helper_.addFilter(
fmt::format(filter, TestEnvironment::runfilesPath("/test/proto/bookstore.descriptor")));
HttpIntegrationTest::initialize();
testTranscoding<bookstore::GetShelfRequest, bookstore::Shelf>(
Http::TestHeaderMapImpl{
{":method", "GET"}, {":path", "/shelves/100"}, {":authority", "host"}},
"", {"shelf: 100"}, {}, Status(Code::NOT_FOUND, "Shelf 100 Not Found"),
Http::TestHeaderMapImpl{{":status", "404"},
{"content-type", "application/json"},
{"grpc-status", UnexpectedHeaderValue},
{"grpc-message", UnexpectedHeaderValue}},
R"({"code":5,"message":"Shelf 100 Not Found"})", true, true);
}

TEST_P(GrpcJsonTranscoderIntegrationTest, UnaryDelete) {
HttpIntegrationTest::initialize();
testTranscoding<bookstore::DeleteBookRequest, Empty>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,60 @@ TEST_F(GrpcJsonTranscoderFilterConvertGrpcStatusTest,
EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_.encodeHeaders(response_headers, true));
}

// Response with a header frame and a trailer frame.
// (E.g. a gRPC server sends metadata and then it sends an error.)
TEST_F(GrpcJsonTranscoderFilterConvertGrpcStatusTest, TranscodingStatusFromTrailer) {
Http::TestHeaderMapImpl response_headers{{"content-type", "application/grpc"},
{":status", "200"}};

EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
filter_.encodeHeaders(response_headers, false));
EXPECT_EQ("application/json", response_headers.get_("content-type"));

std::string expected_response(R"({"code":5,"message":"Resource not found"})");
EXPECT_CALL(encoder_callbacks_, addEncodedData(_, false))
.WillOnce(Invoke([&expected_response](Buffer::Instance& data, bool) {
EXPECT_EQ(expected_response, data.toString());
}));

Http::TestHeaderMapImpl response_trailers{
{"grpc-status", "5"},
{"grpc-message", "unused"},
{"grpc-status-details-bin", "CAUSElJlc291cmNlIG5vdCBmb3VuZA"}};
EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_.encodeTrailers(response_trailers));
EXPECT_EQ("404", response_headers.get_(":status"));
EXPECT_EQ("application/json", response_headers.get_("content-type"));
EXPECT_FALSE(response_headers.has("grpc-status"));
EXPECT_FALSE(response_headers.has("grpc-message"));
EXPECT_FALSE(response_headers.has("grpc-status-details-bin"));
}

// Server sends a response body, don't replace it.
TEST_F(GrpcJsonTranscoderFilterConvertGrpcStatusTest, SkipTranscodingStatusIfBodyIsPresent) {
Http::TestHeaderMapImpl response_headers{{"content-type", "application/grpc"},
{":status", "200"}};

EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
filter_.encodeHeaders(response_headers, false));
EXPECT_EQ("application/json", response_headers.get_("content-type"));

bookstore::Shelf response;
response.set_id(20);
response.set_theme("Children");

auto response_data = Grpc::Common::serializeToGrpcFrame(response);
EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer,
filter_.encodeData(*response_data, false));

std::string response_json = response_data->toString();
EXPECT_EQ(R"({"id":"20","theme":"Children"})", response_json);

EXPECT_CALL(encoder_callbacks_, addEncodedData(_, _)).Times(0);

Http::TestHeaderMapImpl response_trailers{{"grpc-status", "2"}, {"grpc-message", "not good"}};
EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_.decodeTrailers(response_trailers));
}

struct GrpcJsonTranscoderFilterPrintTestParam {
std::string config_json_;
std::string expected_response_;
Expand Down