Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement request header processing in ext_proc #14385

Merged
merged 13 commits into from
Jan 11, 2021
15 changes: 13 additions & 2 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,22 @@ envoy_extension_package()

envoy_cc_library(
name = "ext_proc",
srcs = ["ext_proc.cc"],
hdrs = ["ext_proc.h"],
srcs = [
"ext_proc.cc",
"headers.cc",
],
hdrs = [
"ext_proc.h",
"headers.h",
],
deps = [
":client_interface",
"//include/envoy/http:filter_interface",
"//include/envoy/http:header_map_interface",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"@com_google_absl//absl/strings:str_format",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3alpha:pkg_cc_proto",
"@envoy_api//envoy/service/ext_proc/v3alpha:pkg_cc_proto",
],
)

Expand All @@ -27,6 +37,7 @@ envoy_cc_extension(
security_posture = "unknown",
status = "alpha",
deps = [
":client_lib",
":ext_proc",
"//source/extensions/filters/http:well_known_names",
"//source/extensions/filters/http/common:factory_base_lib",
Expand Down
17 changes: 13 additions & 4 deletions source/extensions/filters/http/ext_proc/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <string>

#include "extensions/filters/http/ext_proc/client_impl.h"
#include "extensions/filters/http/ext_proc/ext_proc.h"

namespace Envoy {
Expand All @@ -11,11 +12,19 @@ namespace ExternalProcessing {

Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& proto_config,
const std::string&, Server::Configuration::FactoryContext&) {
const auto filter_config = std::make_shared<FilterConfig>(proto_config);
const std::string&, Server::Configuration::FactoryContext& context) {
const uint32_t timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config.grpc_service(), timeout, DefaultTimeout);
const auto filter_config =
std::make_shared<FilterConfig>(proto_config, std::chrono::milliseconds(timeout_ms));

return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) {
callbacks.addStreamFilter(Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config)});
return [filter_config, grpc_service = proto_config.grpc_service(),
&context](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExternalProcessorClientImpl>(
context.clusterManager().grpcAsyncClientManager(), grpc_service, context.scope());

callbacks.addStreamFilter(
Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config, std::move(client))});
};
}

Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/http/ext_proc/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class ExternalProcessingFilterConfig
ExternalProcessingFilterConfig() : FactoryBase(HttpFilterNames::get().ExternalProcessing) {}

private:
static constexpr uint64_t DefaultTimeout = 200;

Http::FilterFactoryCb createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& proto_config,
const std::string& stats_prefix, Server::Configuration::FactoryContext& context) override;
Expand Down
107 changes: 106 additions & 1 deletion source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
@@ -1,11 +1,116 @@
#include "extensions/filters/http/ext_proc/ext_proc.h"

#include "extensions/filters/http/ext_proc/headers.h"

#include "absl/strings/str_format.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

void Filter::onDestroy() {}
using envoy::service::ext_proc::v3alpha::ProcessingRequest;
using envoy::service::ext_proc::v3alpha::ProcessingResponse;

using Http::FilterHeadersStatus;
using Http::RequestHeaderMap;

static const std::string kErrorPrefix = "ext_proc error";

void Filter::onDestroy() {
if (stream_ && !stream_closed_) {
ENVOY_LOG(debug, "Closing gRPC stream to processing server");
stream_->close();
}
}

FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_of_stream) {
// We're at the start, so start the stream and send a headers message
request_headers_ = &headers;
stream_ = client_->start(*this, config_->grpcTimeout());
ProcessingRequest req;
auto headers_req = req.mutable_request_headers();
buildHttpHeaders(headers, headers_req->mutable_headers());
headers_req->set_end_of_stream(end_of_stream);
request_state_ = FilterState::HEADERS;
stream_->send(std::move(req), false);

// Wait until we have a gRPC response before allowing any more callbacks
return FilterHeadersStatus::StopAllIterationAndWatermark;
}

void Filter::onReceiveMessage(
std::unique_ptr<envoy::service::ext_proc::v3alpha::ProcessingResponse>&& r) {
auto response = std::move(r);
bool message_valid = false;
ENVOY_LOG(debug, "Received gRPC message. State = {}", request_state_);

// This next section will grow as we support the rest of the protocol
if (request_state_ == FilterState::HEADERS) {
if (response->has_request_headers()) {
ENVOY_LOG(debug, "applying request_headers response");
message_valid = true;
const auto headers_response = response->request_headers();
if (headers_response.has_response()) {
const auto& common_response = headers_response.response();
if (common_response.has_header_mutation()) {
applyHeaderMutations(common_response.header_mutation(), request_headers_);
}
}
}
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
}

if (!message_valid) {
// Ignore messages received out of order. However, close the stream to
// protect ourselves since the server is not following the protocol.
ENVOY_LOG(warn, "Spurious response message received on gRPC stream");
stream_closed_ = true;
stream_->close();
}
}

void Filter::onGrpcError(Grpc::Status::GrpcStatus status) {
ENVOY_LOG(debug, "Received gRPC error on stream: {}", status);
stream_closed_ = true;
if (config_->failureModeAllow()) {
// Ignore this and treat as a successful close
onGrpcClose();
} else {
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case HEADERS:
request_state_ = FilterState::IDLE;
decoder_callbacks_->sendLocalReply(
Http::Code::InternalServerError, "", nullptr, absl::nullopt,
absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status));
break;
default:
// Nothing else to do
break;
}
}
}

void Filter::onGrpcClose() {
ENVOY_LOG(debug, "Received gRPC stream close");
stream_closed_ = true;
// Successful close. We can ignore the stream for the rest of our request
// and response processing.
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case HEADERS:
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
break;
default:
// Nothing to do otherwise
break;
}
}

} // namespace ExternalProcessing
} // namespace HttpFilters
Expand Down
49 changes: 44 additions & 5 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <chrono>
#include <memory>

#include "envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.pb.h"
Expand All @@ -9,6 +10,7 @@
#include "common/common/logger.h"

#include "extensions/filters/http/common/pass_through_filter.h"
#include "extensions/filters/http/ext_proc/client.h"

namespace Envoy {
namespace Extensions {
Expand All @@ -17,25 +19,62 @@ namespace ExternalProcessing {

class FilterConfig {
public:
FilterConfig(const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& config)
: failure_mode_allow_(config.failure_mode_allow()) {}
FilterConfig(const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& config,
const std::chrono::milliseconds grpc_timeout)
: failure_mode_allow_(config.failure_mode_allow()), grpc_timeout_(grpc_timeout) {}

bool failureModeAllow() const { return failure_mode_allow_; }

const std::chrono::milliseconds& grpcTimeout() const { return grpc_timeout_; }

private:
const bool failure_mode_allow_;
const std::chrono::milliseconds grpc_timeout_;
};

using FilterConfigSharedPtr = std::shared_ptr<FilterConfig>;

class Filter : public Logger::Loggable<Logger::Id::filter>, public Http::PassThroughFilter {
class Filter : public Logger::Loggable<Logger::Id::filter>,
public Http::PassThroughFilter,
public ExternalProcessorCallbacks {
enum FilterState {
IDLE,
HEADERS,
};

public:
Filter(const FilterConfigSharedPtr& config) : config_(config) {}
Filter(const FilterConfigSharedPtr& config, ExternalProcessorClientPtr&& client)
: config_(config), client_(std::move(client)) {}

void onDestroy() override;

void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override {
decoder_callbacks_ = &callbacks;
}

Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers,
bool end_stream) override;

// ExternalProcessorCallbacks

void onReceiveMessage(
std::unique_ptr<envoy::service::ext_proc::v3alpha::ProcessingResponse>&& response) override;

void onGrpcError(Grpc::Status::GrpcStatus error) override;

void onGrpcClose() override;

private:
FilterConfigSharedPtr config_;
const FilterConfigSharedPtr config_;
const ExternalProcessorClientPtr client_;

Http::StreamDecoderFilterCallbacks* decoder_callbacks_ = nullptr;

FilterState request_state_ = FilterState::IDLE;
ExternalProcessorStreamPtr stream_;
bool stream_closed_ = false;

Http::HeaderMap* request_headers_ = nullptr;
};

} // namespace ExternalProcessing
Expand Down
45 changes: 45 additions & 0 deletions source/extensions/filters/http/ext_proc/headers.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#include "extensions/filters/http/ext_proc/headers.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

using Http::LowerCaseString;

void buildHttpHeaders(const Http::HeaderMap& headers_in,
envoy::config::core::v3::HeaderMap* headers_out) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Envoy style prefers mutable refs, e.g. envoy::config::core::v3::HeaderMap& headers_out. This applies here but also below to other functions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, fixed that.

headers_in.iterate([headers_out](const Http::HeaderEntry& e) -> Http::HeaderMap::Iterate {
auto new_header = headers_out->add_headers();
new_header->set_key(std::string(e.key().getStringView()));
new_header->set_value(std::string(e.value().getStringView()));
return Http::HeaderMap::Iterate::Continue;
});
}

void applyHeaderMutations(const envoy::service::ext_proc::v3alpha::HeaderMutation& mutation,
Http::HeaderMap* headers) {
for (const auto& sh : mutation.set_headers()) {
if (!sh.has_header()) {
continue;
}
bool append = false;
if (sh.has_append()) {
append = sh.append().value();
}
if (append) {
headers->addCopy(LowerCaseString(sh.header().key()), sh.header().value());
} else {
headers->setCopy(LowerCaseString(sh.header().key()), sh.header().value());
}
}

for (const auto& rh : mutation.remove_headers()) {
headers->remove(LowerCaseString(rh));
}
}

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
20 changes: 20 additions & 0 deletions source/extensions/filters/http/ext_proc/headers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once

#include "envoy/http/header_map.h"
#include "envoy/service/ext_proc/v3alpha/external_processor.pb.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

extern void buildHttpHeaders(const Http::HeaderMap& headers_in,
envoy::config::core::v3::HeaderMap* headers_out);

extern void applyHeaderMutations(const envoy::service::ext_proc::v3alpha::HeaderMutation& mutation,
Http::HeaderMap* headers);

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
Loading