Skip to content

Commit

Permalink
http conn man: stopped filters can add data from encodeTrailers (envo…
Browse files Browse the repository at this point in the history
…yproxy#8404)

Now filters can return StopIteration from encodeHeaders,
and then call addEncodedData from encodeTrailers.

This allows the JsonTranscoderFilter to properly transcode gRPC status
in trailer headers into a JSON reply body.

Risk Level: Low
Testing: unit-tests, integration tests in gRPC JSON transcoder filter.
Documentation: n/a
Release notes: n/a
Fixes envoyproxy#8402

Signed-off-by: Anatoly Scheglov <ascheglov@yandex-team.ru>
  • Loading branch information
ascheglov committed Oct 1, 2019
1 parent 5c5eb16 commit a76aad9
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 12 deletions.
6 changes: 4 additions & 2 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
*
* 4) If additional data needs to be added in the decodeTrailers() callback, this method can be
* called in the context of the callback. All further filters will receive decodeData(..., false)
* followed by decodeTrailers().
* followed by decodeTrailers(). However if the iteration is stopped, the added data will
* buffered, so that the further filters will not receive decodeData() before decodeHeaders().
*
* It is an error to call this method in any other case.
*
Expand Down Expand Up @@ -562,7 +563,8 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks {
*
* 4) If additional data needs to be added in the encodeTrailers() callback, this method can be
* called in the context of the callback. All further filters will receive encodeData(..., false)
* followed by encodeTrailers().
* followed by encodeTrailers(). However if the iteration is stopped, the added data will
* buffered, so that the further filters will not receive encodeData() before encodeHeaders().
*
* It is an error to call this method in any other case.
*
Expand Down
6 changes: 4 additions & 2 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,8 @@ void ConnectionManagerImpl::ActiveStream::addDecodedData(ActiveStreamDecoderFilt
Buffer::Instance& data, bool streaming) {
if (state_.filter_call_state_ == 0 ||
(state_.filter_call_state_ & FilterCallState::DecodeHeaders) ||
(state_.filter_call_state_ & FilterCallState::DecodeData)) {
(state_.filter_call_state_ & FilterCallState::DecodeData) ||
((state_.filter_call_state_ & FilterCallState::DecodeTrailers) && !filter.canIterate())) {
// Make sure if this triggers watermarks, the correct action is taken.
state_.decoder_filters_streaming_ = streaming;
// If no call is happening or we are in the decode headers/data callback, buffer the data.
Expand Down Expand Up @@ -1556,7 +1557,8 @@ void ConnectionManagerImpl::ActiveStream::addEncodedData(ActiveStreamEncoderFilt
Buffer::Instance& data, bool streaming) {
if (state_.filter_call_state_ == 0 ||
(state_.filter_call_state_ & FilterCallState::EncodeHeaders) ||
(state_.filter_call_state_ & FilterCallState::EncodeData)) {
(state_.filter_call_state_ & FilterCallState::EncodeData) ||
((state_.filter_call_state_ & FilterCallState::EncodeTrailers) && !filter.canIterate())) {
// Make sure if this triggers watermarks, the correct action is taken.
state_.encoder_filters_streaming_ = streaming;
// If no call is happening or we are in the decode headers/data callback, buffer the data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,12 +563,6 @@ bool JsonTranscoderFilter::maybeConvertGrpcStatus(Grpc::Status::GrpcStatus grpc_
return false;
}

// We do not support responses with a separate trailer frame.
// TODO(ascheglov): remove this if after HCM can buffer data added from |encodeTrailers|.
if (response_headers_ != &trailers) {
return false;
}

// Send a serialized status only if there was no body.
if (has_body_) {
return false;
Expand Down
124 changes: 124 additions & 0 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2917,6 +2917,130 @@ TEST_F(HttpConnectionManagerImplTest, FilterAddBodyInTrailersCallback) {
HeaderMapPtr{new TestHeaderMapImpl{{"some", "trailer"}}});
}

// Don't send data frames, only headers and trailers.
TEST_F(HttpConnectionManagerImplTest, FilterAddBodyInTrailersCallback_NoDataFrames) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_);
HeaderMapPtr headers{
new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
decoder->decodeHeaders(std::move(headers), false);

HeaderMapPtr trailers{new TestHeaderMapImpl{{"foo", "bar"}}};
decoder->decodeTrailers(std::move(trailers));
}));

setupFilterChain(2, 1);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));

Buffer::OwnedImpl trailers_data("hello");
EXPECT_CALL(*decoder_filters_[0], decodeTrailers(_))
.WillOnce(InvokeWithoutArgs([&]() -> FilterTrailersStatus {
decoder_filters_[0]->callbacks_->addDecodedData(trailers_data, false);
return FilterTrailersStatus::Continue;
}));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[1], decodeData(_, false))
.WillOnce(Return(FilterDataStatus::StopIterationAndBuffer));
EXPECT_CALL(*decoder_filters_[1], decodeTrailers(_))
.WillOnce(Return(FilterTrailersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);

EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
decoder_filters_[0]->callbacks_->encodeHeaders(
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, false);

EXPECT_CALL(*encoder_filters_[0], encodeTrailers(_))
.WillOnce(InvokeWithoutArgs([&]() -> FilterTrailersStatus {
encoder_filters_[0]->callbacks_->addEncodedData(trailers_data, false);
return FilterTrailersStatus::Continue;
}));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());
EXPECT_CALL(response_encoder_, encodeHeaders(_, false));
EXPECT_CALL(response_encoder_, encodeData(_, false));
EXPECT_CALL(response_encoder_, encodeTrailers(_));
expectOnDestroy();

decoder_filters_[0]->callbacks_->encodeTrailers(
HeaderMapPtr{new TestHeaderMapImpl{{"some", "trailer"}}});
}

// Don't send data frames, only headers and trailers.
TEST_F(HttpConnectionManagerImplTest, FilterAddBodyInTrailersCallback_ContinueAfterCallback) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_);
HeaderMapPtr headers{
new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
decoder->decodeHeaders(std::move(headers), false);

HeaderMapPtr trailers{new TestHeaderMapImpl{{"foo", "bar"}}};
decoder->decodeTrailers(std::move(trailers));
}));

setupFilterChain(2, 1);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));

Buffer::OwnedImpl trailers_data("hello");
EXPECT_CALL(*decoder_filters_[0], decodeTrailers(_))
.WillOnce(InvokeWithoutArgs([&]() -> FilterTrailersStatus {
decoder_filters_[0]->callbacks_->addDecodedData(trailers_data, false);
return FilterTrailersStatus::StopIteration;
}));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);

EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[1], decodeData(_, false))
.WillOnce(Return(FilterDataStatus::StopIterationAndBuffer));
EXPECT_CALL(*decoder_filters_[1], decodeTrailers(_))
.WillOnce(Return(FilterTrailersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());

decoder_filters_[0]->callbacks_->continueDecoding();

EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
decoder_filters_[0]->callbacks_->encodeHeaders(
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, false);

EXPECT_CALL(*encoder_filters_[0], encodeTrailers(_))
.WillOnce(InvokeWithoutArgs([&]() -> FilterTrailersStatus {
encoder_filters_[0]->callbacks_->addEncodedData(trailers_data, false);
return FilterTrailersStatus::StopIteration;
}));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());

decoder_filters_[0]->callbacks_->encodeTrailers(
HeaderMapPtr{new TestHeaderMapImpl{{"some", "trailer"}}});

EXPECT_CALL(response_encoder_, encodeHeaders(_, false));
EXPECT_CALL(response_encoder_, encodeData(_, false));
EXPECT_CALL(response_encoder_, encodeTrailers(_));
expectOnDestroy();

encoder_filters_[0]->callbacks_->continueEncoding();
}

// Add*Data during the *Data callbacks.
TEST_F(HttpConnectionManagerImplTest, FilterAddBodyDuringDecodeData) {
InSequence s;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class GrpcJsonTranscoderIntegrationTest
const std::vector<std::string>& grpc_request_messages,
const std::vector<std::string>& grpc_response_messages,
const Status& grpc_status, Http::HeaderMap&& response_headers,
const std::string& response_body, bool full_response = true) {
const std::string& response_body, bool full_response = true,
bool always_send_trailers = false) {
codec_client_ = makeHttpConnection(lookupPort("http"));

IntegrationStreamDecoderPtr response;
Expand Down Expand Up @@ -98,7 +99,7 @@ class GrpcJsonTranscoderIntegrationTest
Http::TestHeaderMapImpl response_headers;
response_headers.insertStatus().value(200);
response_headers.insertContentType().value(std::string("application/grpc"));
if (grpc_response_messages.empty()) {
if (grpc_response_messages.empty() && !always_send_trailers) {
response_headers.insertGrpcStatus().value(static_cast<uint64_t>(grpc_status.error_code()));
response_headers.insertGrpcMessage().value(absl::string_view(
grpc_status.error_message().data(), grpc_status.error_message().size()));
Expand Down Expand Up @@ -375,6 +376,30 @@ TEST_P(GrpcJsonTranscoderIntegrationTest, UnaryErrorConvertedToJson) {
R"({"code":5,"message":"Shelf 100 Not Found"})");
}

// Upstream sends headers (e.g. sends metadata), and then sends trailer with an error.
TEST_P(GrpcJsonTranscoderIntegrationTest, UnaryErrorInTrailerConvertedToJson) {
const std::string filter =
R"EOF(
name: envoy.grpc_json_transcoder
config:
proto_descriptor: "{}"
services: "bookstore.Bookstore"
convert_grpc_status: true
)EOF";
config_helper_.addFilter(
fmt::format(filter, TestEnvironment::runfilesPath("/test/proto/bookstore.descriptor")));
HttpIntegrationTest::initialize();
testTranscoding<bookstore::GetShelfRequest, bookstore::Shelf>(
Http::TestHeaderMapImpl{
{":method", "GET"}, {":path", "/shelves/100"}, {":authority", "host"}},
"", {"shelf: 100"}, {}, Status(Code::NOT_FOUND, "Shelf 100 Not Found"),
Http::TestHeaderMapImpl{{":status", "404"},
{"content-type", "application/json"},
{"grpc-status", UnexpectedHeaderValue},
{"grpc-message", UnexpectedHeaderValue}},
R"({"code":5,"message":"Shelf 100 Not Found"})", true, true);
}

TEST_P(GrpcJsonTranscoderIntegrationTest, UnaryDelete) {
HttpIntegrationTest::initialize();
testTranscoding<bookstore::DeleteBookRequest, Empty>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,60 @@ TEST_F(GrpcJsonTranscoderFilterConvertGrpcStatusTest,
EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_.encodeHeaders(response_headers, true));
}

// Response with a header frame and a trailer frame.
// (E.g. a gRPC server sends metadata and then it sends an error.)
TEST_F(GrpcJsonTranscoderFilterConvertGrpcStatusTest, TranscodingStatusFromTrailer) {
Http::TestHeaderMapImpl response_headers{{"content-type", "application/grpc"},
{":status", "200"}};

EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
filter_.encodeHeaders(response_headers, false));
EXPECT_EQ("application/json", response_headers.get_("content-type"));

std::string expected_response(R"({"code":5,"message":"Resource not found"})");
EXPECT_CALL(encoder_callbacks_, addEncodedData(_, false))
.WillOnce(Invoke([&expected_response](Buffer::Instance& data, bool) {
EXPECT_EQ(expected_response, data.toString());
}));

Http::TestHeaderMapImpl response_trailers{
{"grpc-status", "5"},
{"grpc-message", "unused"},
{"grpc-status-details-bin", "CAUSElJlc291cmNlIG5vdCBmb3VuZA"}};
EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_.encodeTrailers(response_trailers));
EXPECT_EQ("404", response_headers.get_(":status"));
EXPECT_EQ("application/json", response_headers.get_("content-type"));
EXPECT_FALSE(response_headers.has("grpc-status"));
EXPECT_FALSE(response_headers.has("grpc-message"));
EXPECT_FALSE(response_headers.has("grpc-status-details-bin"));
}

// Server sends a response body, don't replace it.
TEST_F(GrpcJsonTranscoderFilterConvertGrpcStatusTest, SkipTranscodingStatusIfBodyIsPresent) {
Http::TestHeaderMapImpl response_headers{{"content-type", "application/grpc"},
{":status", "200"}};

EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
filter_.encodeHeaders(response_headers, false));
EXPECT_EQ("application/json", response_headers.get_("content-type"));

bookstore::Shelf response;
response.set_id(20);
response.set_theme("Children");

auto response_data = Grpc::Common::serializeToGrpcFrame(response);
EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer,
filter_.encodeData(*response_data, false));

std::string response_json = response_data->toString();
EXPECT_EQ(R"({"id":"20","theme":"Children"})", response_json);

EXPECT_CALL(encoder_callbacks_, addEncodedData(_, _)).Times(0);

Http::TestHeaderMapImpl response_trailers{{"grpc-status", "2"}, {"grpc-message", "not good"}};
EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_.decodeTrailers(response_trailers));
}

struct GrpcJsonTranscoderFilterPrintTestParam {
std::string config_json_;
std::string expected_response_;
Expand Down

0 comments on commit a76aad9

Please sign in to comment.