Skip to content

Commit

Permalink
http conn man: stopped filters can add data from encodeTrailers (#8404)
Browse files Browse the repository at this point in the history
Now filters can return StopIteration from encodeHeaders,
and then call addEncodedData from encodeTrailers.

This allows the JsonTranscoderFilter to properly transcode gRPC status
in trailer headers into a JSON reply body.

Risk Level: Low
Testing: unit-tests, integration tests in gRPC JSON transcoder filter.
Documentation: n/a
Release notes: n/a
Fixes #8402

Signed-off-by: Anatoly Scheglov <ascheglov@yandex-team.ru>
  • Loading branch information
ascheglov authored and mattklein123 committed Oct 1, 2019
1 parent a662ad5 commit 429d6c4
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 12 deletions.
6 changes: 4 additions & 2 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
*
* 4) If additional data needs to be added in the decodeTrailers() callback, this method can be
* called in the context of the callback. All further filters will receive decodeData(..., false)
* followed by decodeTrailers().
* followed by decodeTrailers(). However if the iteration is stopped, the added data will
* buffered, so that the further filters will not receive decodeData() before decodeHeaders().
*
* It is an error to call this method in any other case.
*
Expand Down Expand Up @@ -562,7 +563,8 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks {
*
* 4) If additional data needs to be added in the encodeTrailers() callback, this method can be
* called in the context of the callback. All further filters will receive encodeData(..., false)
* followed by encodeTrailers().
* followed by encodeTrailers(). However if the iteration is stopped, the added data will
* buffered, so that the further filters will not receive encodeData() before encodeHeaders().
*
* It is an error to call this method in any other case.
*
Expand Down
6 changes: 4 additions & 2 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,8 @@ void ConnectionManagerImpl::ActiveStream::addDecodedData(ActiveStreamDecoderFilt
Buffer::Instance& data, bool streaming) {
if (state_.filter_call_state_ == 0 ||
(state_.filter_call_state_ & FilterCallState::DecodeHeaders) ||
(state_.filter_call_state_ & FilterCallState::DecodeData)) {
(state_.filter_call_state_ & FilterCallState::DecodeData) ||
((state_.filter_call_state_ & FilterCallState::DecodeTrailers) && !filter.canIterate())) {
// Make sure if this triggers watermarks, the correct action is taken.
state_.decoder_filters_streaming_ = streaming;
// If no call is happening or we are in the decode headers/data callback, buffer the data.
Expand Down Expand Up @@ -1556,7 +1557,8 @@ void ConnectionManagerImpl::ActiveStream::addEncodedData(ActiveStreamEncoderFilt
Buffer::Instance& data, bool streaming) {
if (state_.filter_call_state_ == 0 ||
(state_.filter_call_state_ & FilterCallState::EncodeHeaders) ||
(state_.filter_call_state_ & FilterCallState::EncodeData)) {
(state_.filter_call_state_ & FilterCallState::EncodeData) ||
((state_.filter_call_state_ & FilterCallState::EncodeTrailers) && !filter.canIterate())) {
// Make sure if this triggers watermarks, the correct action is taken.
state_.encoder_filters_streaming_ = streaming;
// If no call is happening or we are in the decode headers/data callback, buffer the data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,12 +563,6 @@ bool JsonTranscoderFilter::maybeConvertGrpcStatus(Grpc::Status::GrpcStatus grpc_
return false;
}

// We do not support responses with a separate trailer frame.
// TODO(ascheglov): remove this if after HCM can buffer data added from |encodeTrailers|.
if (response_headers_ != &trailers) {
return false;
}

// Send a serialized status only if there was no body.
if (has_body_) {
return false;
Expand Down
124 changes: 124 additions & 0 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2917,6 +2917,130 @@ TEST_F(HttpConnectionManagerImplTest, FilterAddBodyInTrailersCallback) {
HeaderMapPtr{new TestHeaderMapImpl{{"some", "trailer"}}});
}

// Don't send data frames, only headers and trailers.
TEST_F(HttpConnectionManagerImplTest, FilterAddBodyInTrailersCallback_NoDataFrames) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_);
HeaderMapPtr headers{
new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
decoder->decodeHeaders(std::move(headers), false);

HeaderMapPtr trailers{new TestHeaderMapImpl{{"foo", "bar"}}};
decoder->decodeTrailers(std::move(trailers));
}));

setupFilterChain(2, 1);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));

Buffer::OwnedImpl trailers_data("hello");
EXPECT_CALL(*decoder_filters_[0], decodeTrailers(_))
.WillOnce(InvokeWithoutArgs([&]() -> FilterTrailersStatus {
decoder_filters_[0]->callbacks_->addDecodedData(trailers_data, false);
return FilterTrailersStatus::Continue;
}));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[1], decodeData(_, false))
.WillOnce(Return(FilterDataStatus::StopIterationAndBuffer));
EXPECT_CALL(*decoder_filters_[1], decodeTrailers(_))
.WillOnce(Return(FilterTrailersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);

EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
decoder_filters_[0]->callbacks_->encodeHeaders(
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, false);

EXPECT_CALL(*encoder_filters_[0], encodeTrailers(_))
.WillOnce(InvokeWithoutArgs([&]() -> FilterTrailersStatus {
encoder_filters_[0]->callbacks_->addEncodedData(trailers_data, false);
return FilterTrailersStatus::Continue;
}));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());
EXPECT_CALL(response_encoder_, encodeHeaders(_, false));
EXPECT_CALL(response_encoder_, encodeData(_, false));
EXPECT_CALL(response_encoder_, encodeTrailers(_));
expectOnDestroy();

decoder_filters_[0]->callbacks_->encodeTrailers(
HeaderMapPtr{new TestHeaderMapImpl{{"some", "trailer"}}});
}

// Don't send data frames, only headers and trailers.
TEST_F(HttpConnectionManagerImplTest, FilterAddBodyInTrailersCallback_ContinueAfterCallback) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_);
HeaderMapPtr headers{
new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
decoder->decodeHeaders(std::move(headers), false);

HeaderMapPtr trailers{new TestHeaderMapImpl{{"foo", "bar"}}};
decoder->decodeTrailers(std::move(trailers));
}));

setupFilterChain(2, 1);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));

Buffer::OwnedImpl trailers_data("hello");
EXPECT_CALL(*decoder_filters_[0], decodeTrailers(_))
.WillOnce(InvokeWithoutArgs([&]() -> FilterTrailersStatus {
decoder_filters_[0]->callbacks_->addDecodedData(trailers_data, false);
return FilterTrailersStatus::StopIteration;
}));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);

EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[1], decodeData(_, false))
.WillOnce(Return(FilterDataStatus::StopIterationAndBuffer));
EXPECT_CALL(*decoder_filters_[1], decodeTrailers(_))
.WillOnce(Return(FilterTrailersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());

decoder_filters_[0]->callbacks_->continueDecoding();

EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
decoder_filters_[0]->callbacks_->encodeHeaders(
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, false);

EXPECT_CALL(*encoder_filters_[0], encodeTrailers(_))
.WillOnce(InvokeWithoutArgs([&]() -> FilterTrailersStatus {
encoder_filters_[0]->callbacks_->addEncodedData(trailers_data, false);
return FilterTrailersStatus::StopIteration;
}));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());

decoder_filters_[0]->callbacks_->encodeTrailers(
HeaderMapPtr{new TestHeaderMapImpl{{"some", "trailer"}}});

EXPECT_CALL(response_encoder_, encodeHeaders(_, false));
EXPECT_CALL(response_encoder_, encodeData(_, false));
EXPECT_CALL(response_encoder_, encodeTrailers(_));
expectOnDestroy();

encoder_filters_[0]->callbacks_->continueEncoding();
}

// Add*Data during the *Data callbacks.
TEST_F(HttpConnectionManagerImplTest, FilterAddBodyDuringDecodeData) {
InSequence s;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class GrpcJsonTranscoderIntegrationTest
const std::vector<std::string>& grpc_request_messages,
const std::vector<std::string>& grpc_response_messages,
const Status& grpc_status, Http::HeaderMap&& response_headers,
const std::string& response_body, bool full_response = true) {
const std::string& response_body, bool full_response = true,
bool always_send_trailers = false) {
codec_client_ = makeHttpConnection(lookupPort("http"));

IntegrationStreamDecoderPtr response;
Expand Down Expand Up @@ -98,7 +99,7 @@ class GrpcJsonTranscoderIntegrationTest
Http::TestHeaderMapImpl response_headers;
response_headers.insertStatus().value(200);
response_headers.insertContentType().value(std::string("application/grpc"));
if (grpc_response_messages.empty()) {
if (grpc_response_messages.empty() && !always_send_trailers) {
response_headers.insertGrpcStatus().value(static_cast<uint64_t>(grpc_status.error_code()));
response_headers.insertGrpcMessage().value(absl::string_view(
grpc_status.error_message().data(), grpc_status.error_message().size()));
Expand Down Expand Up @@ -375,6 +376,30 @@ TEST_P(GrpcJsonTranscoderIntegrationTest, UnaryErrorConvertedToJson) {
R"({"code":5,"message":"Shelf 100 Not Found"})");
}

// Upstream sends headers (e.g. sends metadata), and then sends trailer with an error.
TEST_P(GrpcJsonTranscoderIntegrationTest, UnaryErrorInTrailerConvertedToJson) {
const std::string filter =
R"EOF(
name: envoy.grpc_json_transcoder
config:
proto_descriptor: "{}"
services: "bookstore.Bookstore"
convert_grpc_status: true
)EOF";
config_helper_.addFilter(
fmt::format(filter, TestEnvironment::runfilesPath("/test/proto/bookstore.descriptor")));
HttpIntegrationTest::initialize();
testTranscoding<bookstore::GetShelfRequest, bookstore::Shelf>(
Http::TestHeaderMapImpl{
{":method", "GET"}, {":path", "/shelves/100"}, {":authority", "host"}},
"", {"shelf: 100"}, {}, Status(Code::NOT_FOUND, "Shelf 100 Not Found"),
Http::TestHeaderMapImpl{{":status", "404"},
{"content-type", "application/json"},
{"grpc-status", UnexpectedHeaderValue},
{"grpc-message", UnexpectedHeaderValue}},
R"({"code":5,"message":"Shelf 100 Not Found"})", true, true);
}

TEST_P(GrpcJsonTranscoderIntegrationTest, UnaryDelete) {
HttpIntegrationTest::initialize();
testTranscoding<bookstore::DeleteBookRequest, Empty>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,60 @@ TEST_F(GrpcJsonTranscoderFilterConvertGrpcStatusTest,
EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_.encodeHeaders(response_headers, true));
}

// Response with a header frame and a trailer frame.
// (E.g. a gRPC server sends metadata and then it sends an error.)
TEST_F(GrpcJsonTranscoderFilterConvertGrpcStatusTest, TranscodingStatusFromTrailer) {
Http::TestHeaderMapImpl response_headers{{"content-type", "application/grpc"},
{":status", "200"}};

EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
filter_.encodeHeaders(response_headers, false));
EXPECT_EQ("application/json", response_headers.get_("content-type"));

std::string expected_response(R"({"code":5,"message":"Resource not found"})");
EXPECT_CALL(encoder_callbacks_, addEncodedData(_, false))
.WillOnce(Invoke([&expected_response](Buffer::Instance& data, bool) {
EXPECT_EQ(expected_response, data.toString());
}));

Http::TestHeaderMapImpl response_trailers{
{"grpc-status", "5"},
{"grpc-message", "unused"},
{"grpc-status-details-bin", "CAUSElJlc291cmNlIG5vdCBmb3VuZA"}};
EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_.encodeTrailers(response_trailers));
EXPECT_EQ("404", response_headers.get_(":status"));
EXPECT_EQ("application/json", response_headers.get_("content-type"));
EXPECT_FALSE(response_headers.has("grpc-status"));
EXPECT_FALSE(response_headers.has("grpc-message"));
EXPECT_FALSE(response_headers.has("grpc-status-details-bin"));
}

// Server sends a response body, don't replace it.
TEST_F(GrpcJsonTranscoderFilterConvertGrpcStatusTest, SkipTranscodingStatusIfBodyIsPresent) {
Http::TestHeaderMapImpl response_headers{{"content-type", "application/grpc"},
{":status", "200"}};

EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
filter_.encodeHeaders(response_headers, false));
EXPECT_EQ("application/json", response_headers.get_("content-type"));

bookstore::Shelf response;
response.set_id(20);
response.set_theme("Children");

auto response_data = Grpc::Common::serializeToGrpcFrame(response);
EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer,
filter_.encodeData(*response_data, false));

std::string response_json = response_data->toString();
EXPECT_EQ(R"({"id":"20","theme":"Children"})", response_json);

EXPECT_CALL(encoder_callbacks_, addEncodedData(_, _)).Times(0);

Http::TestHeaderMapImpl response_trailers{{"grpc-status", "2"}, {"grpc-message", "not good"}};
EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_.decodeTrailers(response_trailers));
}

struct GrpcJsonTranscoderFilterPrintTestParam {
std::string config_json_;
std::string expected_response_;
Expand Down

0 comments on commit 429d6c4

Please sign in to comment.