Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

thrift filter: support skip decoding data after metadata in the thrift message #13592

Merged
merged 15 commits into from
Nov 30, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ enum ProtocolType {
TWITTER = 4;
}

// [#next-free-field: 6]
// [#next-free-field: 7]
message ThriftProxy {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.filter.network.thrift_proxy.v2alpha1.ThriftProxy";
Expand All @@ -82,6 +82,12 @@ message ThriftProxy {
// compatibility, if no thrift_filters are specified, a default Thrift router filter
// (`envoy.filters.thrift.router`) is used.
repeated ThriftFilter thrift_filters = 5;

// If set to true, Envoy will try to skip decode data after metadata in the Thrift message.
// This mode will only work if the upstream and downstream protocols are the same and the transport
// is the same, the transport type is framed and the protocol is not Twitter. Otherwise Envoy will
// fallback to decode the data.
bool payload_passthrough = 6;
}

// ThriftFilter configures a Thrift filter.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion source/extensions/filters/network/thrift_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ ConfigImpl::ConfigImpl(
: context_(context), stats_prefix_(fmt::format("thrift.{}.", config.stat_prefix())),
stats_(ThriftFilterStats::generateStats(stats_prefix_, context_.scope())),
transport_(lookupTransport(config.transport())), proto_(lookupProtocol(config.protocol())),
route_matcher_(new Router::RouteMatcher(config.route_config())) {
route_matcher_(new Router::RouteMatcher(config.route_config())),
payload_passthrough_(config.payload_passthrough()) {

if (config.thrift_filters().empty()) {
ENVOY_LOG(debug, "using default router filter");
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/network/thrift_proxy/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class ConfigImpl : public Config,
TransportPtr createTransport() override;
ProtocolPtr createProtocol() override;
Router::Config& routerConfig() override { return *this; }
bool payloadPassthrough() override { return payload_passthrough_; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: bool payloadPassthrough() const


private:
void processFilter(
Expand All @@ -94,6 +95,7 @@ class ConfigImpl : public Config,
std::unique_ptr<Router::RouteMatcher> route_matcher_;

std::list<ThriftFilters::FilterFactoryCb> filter_factories_;
const bool payload_passthrough_;
};

} // namespace ThriftProxy
Expand Down
24 changes: 24 additions & 0 deletions source/extensions/filters/network/thrift_proxy/conn_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,30 @@ void ConnectionManager::ActiveRpc::finalizeRequest() {
}
}

bool ConnectionManager::ActiveRpc::passthroughEnabled() {
if (!parent_.config_.payloadPassthrough()) {
return false;
}

for (auto& entry : decoder_filters_) {
if (entry->handle_->passthroughEnabled() == false) {
return false;
}
}
return true;
}

FilterStatus ConnectionManager::ActiveRpc::passthroughData(Buffer::Instance& data,
uint64_t bytes_to_passthrough) {
filter_context_ = &data;
filter_action_ = [this, bytes_to_passthrough](DecoderEventHandler* filter) -> FilterStatus {
Buffer::Instance* data = absl::any_cast<Buffer::Instance*>(filter_context_);
return filter->passthroughData(*data, bytes_to_passthrough);
};

return applyDecoderFilters(nullptr);
}

FilterStatus ConnectionManager::ActiveRpc::messageBegin(MessageMetadataSharedPtr metadata) {
ASSERT(metadata->hasSequenceId());
ASSERT(metadata->hasMessageType());
Expand Down
4 changes: 4 additions & 0 deletions source/extensions/filters/network/thrift_proxy/conn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class Config {
virtual TransportPtr createTransport() PURE;
virtual ProtocolPtr createProtocol() PURE;
virtual Router::Config& routerConfig() PURE;
virtual bool payloadPassthrough() PURE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:bool payloadPassthrough() const

};

/**
Expand Down Expand Up @@ -85,6 +86,7 @@ class ConnectionManager : public Network::ReadFilter,
: parent_(parent), decoder_(std::make_unique<Decoder>(transport, protocol, *this)),
complete_(false), first_reply_field_(false) {
initProtocolConverter(*parent_.parent_.protocol_, parent_.response_buffer_);
enablePassthrough(parent_.passthroughEnabled());
}

bool onData(Buffer::Instance& data);
Expand Down Expand Up @@ -180,6 +182,8 @@ class ConnectionManager : public Network::ReadFilter,
// DecoderEventHandler
FilterStatus transportBegin(MessageMetadataSharedPtr metadata) override;
FilterStatus transportEnd() override;
bool passthroughEnabled() override;
FilterStatus passthroughData(Buffer::Instance& data, uint64_t bytes_to_passthrough) override;
FilterStatus messageBegin(MessageMetadataSharedPtr metadata) override;
FilterStatus messageEnd() override;
FilterStatus structBegin(absl::string_view name) override;
Expand Down
21 changes: 20 additions & 1 deletion source/extensions/filters/network/thrift_proxy/decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,32 @@ namespace Extensions {
namespace NetworkFilters {
namespace ThriftProxy {

DecoderStateMachine::DecoderStatus DecoderStateMachine::passthroughData(Buffer::Instance& buffer) {
if (body_bytes_ > buffer.length()) {
return {ProtocolState::WaitForData};
}

return {ProtocolState::MessageEnd, handler_.passthroughData(buffer, body_bytes_)};
}

// MessageBegin -> StructBegin
DecoderStateMachine::DecoderStatus DecoderStateMachine::messageBegin(Buffer::Instance& buffer) {
auto total = buffer.length();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: const auto total

if (!proto_.readMessageBegin(buffer, *metadata_)) {
return {ProtocolState::WaitForData};
}

stack_.clear();
stack_.emplace_back(Frame(ProtocolState::MessageEnd));

return {ProtocolState::StructBegin, handler_.messageBegin(metadata_)};
auto status = handler_.messageBegin(metadata_);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: const auto


if (handler_.passthroughEnabled()) {
body_bytes_ = metadata_->frameSize() - (total - buffer.length());
return {ProtocolState::PassthroughData, status};
}

return {ProtocolState::StructBegin, status};
}

// MessageEnd -> Done
Expand Down Expand Up @@ -293,6 +309,8 @@ DecoderStateMachine::DecoderStatus DecoderStateMachine::handleValue(Buffer::Inst

DecoderStateMachine::DecoderStatus DecoderStateMachine::handleState(Buffer::Instance& buffer) {
switch (state_) {
case ProtocolState::PassthroughData:
return passthroughData(buffer);
case ProtocolState::MessageBegin:
return messageBegin(buffer);
case ProtocolState::StructBegin:
Expand Down Expand Up @@ -415,6 +433,7 @@ FilterStatus Decoder::onData(Buffer::Instance& data, bool& buffer_underflow) {

request_ = std::make_unique<ActiveRequest>(callbacks_.newDecoderEventHandler());
frame_started_ = true;
// TODO: add an option to configure passthrough
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this still need to be added or is this a future improvement? If the latter, we use the format TODO(github-username): ... to help keep track of these.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's already done, will remove this line.

state_machine_ =
std::make_unique<DecoderStateMachine>(protocol_, metadata_, request_->handler_);

Expand Down
3 changes: 3 additions & 0 deletions source/extensions/filters/network/thrift_proxy/decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace ThriftProxy {
#define ALL_PROTOCOL_STATES(FUNCTION) \
FUNCTION(StopIteration) \
FUNCTION(WaitForData) \
FUNCTION(PassthroughData) \
FUNCTION(MessageBegin) \
FUNCTION(MessageEnd) \
FUNCTION(StructBegin) \
Expand Down Expand Up @@ -129,6 +130,7 @@ class DecoderStateMachine : public Logger::Loggable<Logger::Id::thrift> {

// These functions map directly to the matching ProtocolState values. Each returns the next state
// or ProtocolState::WaitForData if more data is required.
DecoderStatus passthroughData(Buffer::Instance& buffer);
DecoderStatus messageBegin(Buffer::Instance& buffer);
DecoderStatus messageEnd(Buffer::Instance& buffer);
DecoderStatus structBegin(Buffer::Instance& buffer);
Expand Down Expand Up @@ -167,6 +169,7 @@ class DecoderStateMachine : public Logger::Loggable<Logger::Id::thrift> {
DecoderEventHandler& handler_;
ProtocolState state_;
std::vector<Frame> stack_;
uint32_t body_bytes_{};
};

using DecoderStateMachinePtr = std::unique_ptr<DecoderStateMachine>;
Expand Down
15 changes: 15 additions & 0 deletions source/extensions/filters/network/thrift_proxy/decoder_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ class DecoderEventHandler {
*/
virtual FilterStatus transportEnd() PURE;

/**
* @return True if payload passthrough is enabled.
* Once any filter returns false, the payload passthrough mode is disabled.
*/
virtual bool passthroughEnabled() PURE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

virtual bool passthroughEnabled() const


/**
* Indicates raw bytes after metadata in a Thrift transport frame was detected.
* Filters should not modify data except for the router.
* @param data data to send as passthrough
* @param bytes_to_passthrough size of data
* @return FilterStatus to indicate if filter chain iteration should continue
*/
virtual FilterStatus passthroughData(Buffer::Instance& data, uint64_t bytes_to_passthrough) PURE;

/**
* Indicates that the start of a Thrift protocol message was detected.
* @param metadata MessageMetadataSharedPtr describing the message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ class PassThroughDecoderFilter : public DecoderFilter {

ThriftProxy::FilterStatus transportEnd() override { return ThriftProxy::FilterStatus::Continue; }

bool passthroughEnabled() override { return true; }

ThriftProxy::FilterStatus passthroughData(Buffer::Instance&, uint64_t) override {
return ThriftProxy::FilterStatus::Continue;
}

ThriftProxy::FilterStatus messageBegin(ThriftProxy::MessageMetadataSharedPtr) override {
return ThriftProxy::FilterStatus::Continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,19 @@ class ProtocolConverter : public virtual DecoderEventHandler {
buffer_ = &buffer;
}

void enablePassthrough(bool enable) { enable_passthrough_ = enable; }

// DecoderEventHandler
bool passthroughEnabled() override { return enable_passthrough_; }

FilterStatus passthroughData(Buffer::Instance& data, uint64_t bytes_to_passthrough) override {
// Current implementation will buffer all data after metadata and send it once.
ASSERT(bytes_to_passthrough <= data.length());

buffer_->move(data, bytes_to_passthrough);
return FilterStatus::Continue;
}

FilterStatus messageBegin(MessageMetadataSharedPtr metadata) override {
proto_->writeMessageBegin(*buffer_, *metadata);
return FilterStatus::Continue;
Expand Down Expand Up @@ -125,6 +137,7 @@ class ProtocolConverter : public virtual DecoderEventHandler {
private:
Protocol* proto_;
Buffer::Instance* buffer_{};
bool enable_passthrough_{};
};

} // namespace ThriftProxy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,14 @@ FilterStatus Router::messageBegin(MessageMetadataSharedPtr metadata) {
: callbacks_->downstreamProtocolType();
ASSERT(protocol != ProtocolType::Auto);

bool enable_passthrough = false;
if (callbacks_->downstreamTransportType() == TransportType::Framed &&
transport == TransportType::Framed && callbacks_->downstreamProtocolType() == protocol &&
protocol != ProtocolType::Twitter) {
enable_passthrough = true;
}
enablePassthrough(enable_passthrough);

Tcp::ConnectionPool::Instance* conn_pool = cluster_manager_.tcpConnPoolForCluster(
cluster_name, Upstream::ResourcePriority::Default, this);
if (!conn_pool) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class ThriftBase : public DecoderEventHandler {
~ThriftBase() override = default;

// DecoderEventHandler
bool passthroughEnabled() override { return false; }
FilterStatus passthroughData(Buffer::Instance&, uint64_t) override {
return FilterStatus::Continue;
}
FilterStatus transportBegin(MessageMetadataSharedPtr) override { return FilterStatus::Continue; }
FilterStatus transportEnd() override { return FilterStatus::Continue; }
FilterStatus messageBegin(MessageMetadataSharedPtr) override { return FilterStatus::Continue; }
Expand Down
2 changes: 2 additions & 0 deletions test/extensions/filters/network/thrift_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ envoy_extension_cc_test(
":mocks",
":utility_lib",
"//source/extensions/filters/network/thrift_proxy:app_exception_lib",
"//source/extensions/filters/network/thrift_proxy:config",
"//source/extensions/filters/network/thrift_proxy/router:config",
"//source/extensions/filters/network/thrift_proxy/router:router_lib",
"//test/mocks/network:network_mocks",
Expand Down Expand Up @@ -333,6 +334,7 @@ envoy_extension_cc_test(
"//test/extensions/filters/network/thrift_proxy/driver:generate_fixture",
],
extension_name = "envoy.filters.network.thrift_proxy",
shard_count = 4,
tags = ["fails_on_windows"],
deps = [
":integration_lib",
Expand Down
18 changes: 18 additions & 0 deletions test/extensions/filters/network/thrift_proxy/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,24 @@ stat_prefix: ingress
EXPECT_EQ("thrift.ingress.", factory.config_stat_prefix_);
}

// Test config with payload passthrough enabled.
TEST_F(ThriftFilterConfigTest, ThriftProxyPayloadPassthrough) {
const std::string yaml = R"EOF(
stat_prefix: ingress
payload_passthrough: true
route_config:
name: local_route
thrift_filters:
- name: envoy.filters.thrift.router
)EOF";

envoy::extensions::filters::network::thrift_proxy::v3::ThriftProxy config =
parseThriftProxyFromV2Yaml(yaml);
testConfig(config);

EXPECT_EQ(true, config.payload_passthrough());
}

} // namespace ThriftProxy
} // namespace NetworkFilters
} // namespace Extensions
Expand Down
Loading