diff --git a/configs/configgen.py b/configs/configgen.py index baf95c4b3c49..4cc5292ba595 100755 --- a/configs/configgen.py +++ b/configs/configgen.py @@ -139,5 +139,7 @@ def generate_config(template_path, template, output_file, **context): mongos_servers=mongos_servers) shutil.copy(os.path.join(SCRIPT_DIR, 'envoyproxy_io_proxy.yaml'), OUT_DIR) -shutil.copy(os.path.join(SCRIPT_DIR, 'encapsulate_in_connect.yaml'), OUT_DIR) -shutil.copy(os.path.join(SCRIPT_DIR, 'terminate_connect.yaml'), OUT_DIR) +shutil.copy(os.path.join(SCRIPT_DIR, 'encapsulate_in_http1_connect.yaml'), OUT_DIR) +shutil.copy(os.path.join(SCRIPT_DIR, 'encapsulate_in_http2_connect.yaml'), OUT_DIR) +shutil.copy(os.path.join(SCRIPT_DIR, 'terminate_http1_connect.yaml'), OUT_DIR) +shutil.copy(os.path.join(SCRIPT_DIR, 'terminate_http2_connect.yaml'), OUT_DIR) diff --git a/configs/encapsulate_in_http1_connect.yaml b/configs/encapsulate_in_http1_connect.yaml new file mode 100644 index 000000000000..1aee73d81841 --- /dev/null +++ b/configs/encapsulate_in_http1_connect.yaml @@ -0,0 +1,44 @@ +# This configuration takes incoming data on port 10000 and encapsulates it in a CONNECT +# request which is sent upstream port 10001. +# It can be used to test TCP tunneling as described in docs/root/intro/arch_overview/http/upgrades.rst +# and running `curl --x 127.0.0.1:10000 https://www.google.com` + +admin: + access_log_path: /tmp/admin_access.log + address: + socket_address: + protocol: TCP + address: 127.0.0.1 + port_value: 9903 +static_resources: + listeners: + - name: listener_0 + address: + socket_address: + protocol: TCP + address: 127.0.0.1 + port_value: 10000 + filter_chains: + - filters: + - name: tcp + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy + stat_prefix: tcp_stats + cluster: "cluster_0" + tunneling_config: + hostname: host.com:443 + clusters: + - name: cluster_0 + connect_timeout: 5s + # This ensures HTTP/1.1 CONNECT is used for establishing the tunnel. + http_protocol_options: + {} + load_assignment: + cluster_name: cluster_0 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 10001 diff --git a/configs/encapsulate_in_connect.yaml b/configs/encapsulate_in_http2_connect.yaml similarity index 91% rename from configs/encapsulate_in_connect.yaml rename to configs/encapsulate_in_http2_connect.yaml index 2394f6e44cc4..1d815f122d82 100644 --- a/configs/encapsulate_in_connect.yaml +++ b/configs/encapsulate_in_http2_connect.yaml @@ -26,10 +26,11 @@ static_resources: stat_prefix: tcp_stats cluster: "cluster_0" tunneling_config: - hostname: host.com + hostname: host.com:443 clusters: - name: cluster_0 connect_timeout: 5s + # This ensures HTTP/2 CONNECT is used for establishing the tunnel. http2_protocol_options: {} load_assignment: diff --git a/configs/terminate_http1_connect.yaml b/configs/terminate_http1_connect.yaml new file mode 100644 index 000000000000..67e6f2289a2c --- /dev/null +++ b/configs/terminate_http1_connect.yaml @@ -0,0 +1,61 @@ +# This configuration terminates a CONNECT request and sends the CONNECT payload upstream. +# It can be used to test TCP tunneling as described in docs/root/intro/arch_overview/http/upgrades.rst +# or used to test CONNECT directly, by running `curl -k -v -x 127.0.0.1:10001 https://www.google.com` +admin: + access_log_path: /tmp/admin_access.log + address: + socket_address: + protocol: TCP + address: 127.0.0.1 + port_value: 9902 +static_resources: + listeners: + - name: listener_0 + address: + socket_address: + protocol: TCP + address: 127.0.0.1 + port_value: 10001 + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: ingress_http + route_config: + name: local_route + virtual_hosts: + - name: local_service + domains: + - "*" + routes: + - match: + connect_matcher: + {} + route: + cluster: service_google + upgrade_configs: + - upgrade_type: CONNECT + connect_config: + {} + http_filters: + - name: envoy.filters.http.router + http_protocol_options: {} + upgrade_configs: + - upgrade_type: CONNECT + clusters: + - name: service_google + connect_timeout: 0.25s + type: LOGICAL_DNS + # Comment out the following line to test on v6 networks + dns_lookup_family: V4_ONLY + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: service_google + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: www.google.com + port_value: 443 diff --git a/configs/terminate_connect.yaml b/configs/terminate_http2_connect.yaml similarity index 100% rename from configs/terminate_connect.yaml rename to configs/terminate_http2_connect.yaml diff --git a/docs/root/intro/arch_overview/http/upgrades.rst b/docs/root/intro/arch_overview/http/upgrades.rst index 959c576960df..08e5254a8ca7 100644 --- a/docs/root/intro/arch_overview/http/upgrades.rst +++ b/docs/root/intro/arch_overview/http/upgrades.rst @@ -94,17 +94,32 @@ will synthesize 200 response headers, and then forward the TCP data as the HTTP For an example of proxying connect, please see :repo:`configs/proxy_connect.yaml ` For an example of terminating connect, please see :repo:`configs/terminate_connect.yaml ` -Tunneling TCP over HTTP/2 -^^^^^^^^^^^^^^^^^^^^^^^^^ -Envoy also has support for transforming raw TCP into HTTP/2 CONNECT requests. This can be used to -proxy multiplexed TCP over pre-warmed secure connections and amortize the cost of any TLS handshake. -An example set up proxying SMTP would look something like this +Tunneling TCP over HTTP +^^^^^^^^^^^^^^^^^^^^^^^ +Envoy also has support for tunneling raw TCP over HTTP CONNECT requests. Find +below some usage scenarios. + +HTTP/2 CONNECT can be used to proxy multiplexed TCP over pre-warmed secure connections and amortize the cost of any TLS +handshake. +An example set up proxying SMTP would look something like this: [SMTP Upstream] --- raw SMTP --- [L2 Envoy] --- SMTP tunneled over HTTP/2 --- [L1 Envoy] --- raw SMTP --- [Client] +HTTP/1.1 CONNECT can be used to have TCP client connecting to its own +destination passing through an HTTP proxy server (e.g. corporate proxy not +supporting HTTP/2): + +[HTTP Server] --- raw HTTP --- [L2 Envoy] --- HTTP tunneled over HTTP/1.1 --- [L1 Envoy] --- raw HTTP --- [HTTP Client] + +Note that when using HTTP/1 CONNECT you will end up having a TCP connection +between L1 and L2 Envoy for each TCP client connection, it is preferable to use +HTTP/2 when you have the choice. + Examples of such a set up can be found in the Envoy example config :repo:`directory ` -If you run `bazel-bin/source/exe/envoy-static --config-path configs/encapsulate_in_connect.yaml --base-id 1` -and `bazel-bin/source/exe/envoy-static --config-path configs/terminate_connect.yaml` -you will be running two Envoys, the first listening for TCP traffic on port 10000 and encapsulating it in an HTTP/2 -CONNECT request, and the second listening for HTTP/2 on 10001, stripping the CONNECT headers, and forwarding the +For HTTP/1.1 run `bazel-bin/source/exe/envoy-static --config-path configs/encapsulate_in_http1_connect.yaml --base-id 1` +and `bazel-bin/source/exe/envoy-static --config-path configs/terminate_http1_connect.yaml`. +For HTTP/2 run `bazel-bin/source/exe/envoy-static --config-path configs/encapsulate_in_http2_connect.yaml --base-id 1` +and `bazel-bin/source/exe/envoy-static --config-path configs/terminate_http2_connect.yaml`. +In both cases you will be running a first Envoy listening for TCP traffic on port 10000 and encapsulating it in an HTTP +CONNECT request, and a second one listening on 10001, stripping the CONNECT headers, and forwarding the original TCP upstream, in this case to google.com. diff --git a/include/envoy/network/connection.h b/include/envoy/network/connection.h index 7efc6ada8175..98c995ab5444 100644 --- a/include/envoy/network/connection.h +++ b/include/envoy/network/connection.h @@ -118,6 +118,11 @@ class Connection : public Event::DeferredDeletable, public FilterManager { */ virtual void enableHalfClose(bool enabled) PURE; + /** + * @return true if half-close semantics are enabled, false otherwise. + */ + virtual bool isHalfCloseEnabled() PURE; + /** * Close the connection. */ diff --git a/source/common/http/codec_client.h b/source/common/http/codec_client.h index ca6232bd9d9c..2e14b112a685 100644 --- a/source/common/http/codec_client.h +++ b/source/common/http/codec_client.h @@ -66,6 +66,11 @@ class CodecClient : Logger::Loggable, connection_->addConnectionCallbacks(cb); } + /** + * Return if half-close semantics are enabled on the underlying connection. + */ + bool isHalfCloseEnabled() { return connection_->isHalfCloseEnabled(); } + /** * Close the underlying network connection. This is immediate and will not attempt to flush any * pending write data. @@ -176,8 +181,14 @@ class CodecClient : Logger::Loggable, CodecReadFilter(CodecClient& parent) : parent_(parent) {} // Network::ReadFilter - Network::FilterStatus onData(Buffer::Instance& data, bool) override { + Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override { parent_.onData(data); + if (end_stream && parent_.isHalfCloseEnabled()) { + // Note that this results in the connection closed as if it was closed + // locally, it would be more correct to convey the end stream to the + // response decoder, but it would require some refactoring. + parent_.close(); + } return Network::FilterStatus::StopIteration; } diff --git a/source/common/http/http1/codec_impl.cc b/source/common/http/http1/codec_impl.cc index f8c530cbcf48..466dfd887998 100644 --- a/source/common/http/http1/codec_impl.cc +++ b/source/common/http/http1/codec_impl.cc @@ -77,8 +77,8 @@ const std::string StreamEncoderImpl::LAST_CHUNK = "0\r\n"; StreamEncoderImpl::StreamEncoderImpl(ConnectionImpl& connection, HeaderKeyFormatter* header_key_formatter) : connection_(connection), disable_chunk_encoding_(false), chunk_encoding_(true), - is_response_to_head_request_(false), is_response_to_connect_request_(false), - header_key_formatter_(header_key_formatter) { + connect_request_(false), is_response_to_head_request_(false), + is_response_to_connect_request_(false), header_key_formatter_(header_key_formatter) { if (connection_.connection().aboveHighWatermark()) { runHighWatermarkCallbacks(); } @@ -261,6 +261,10 @@ void StreamEncoderImpl::endEncode() { connection_.flushOutput(true); connection_.onEncodeComplete(); + // With CONNECT, half-closing the connection is used to signal end stream. + if (connect_request_) { + connection_.connection().close(Network::ConnectionCloseType::FlushWriteAndDelay); + } } void ServerConnectionImpl::maybeAddSentinelBufferFragment(Buffer::WatermarkBuffer& output_buffer) { @@ -380,6 +384,7 @@ Status RequestEncoderImpl::encodeHeaders(const RequestHeaderMap& headers, bool e head_request_ = true; } else if (method->value() == Headers::get().MethodValues.Connect) { disableChunkEncoding(); + connection_.connection().enableHalfClose(true); connect_request_ = true; } if (Utility::isUpgrade(headers)) { diff --git a/source/common/http/http1/codec_impl.h b/source/common/http/http1/codec_impl.h index f63b777c6a44..5e38b6c24365 100644 --- a/source/common/http/http1/codec_impl.h +++ b/source/common/http/http1/codec_impl.h @@ -87,6 +87,7 @@ class StreamEncoderImpl : public virtual StreamEncoder, uint32_t read_disable_calls_{}; bool disable_chunk_encoding_ : 1; bool chunk_encoding_ : 1; + bool connect_request_ : 1; bool is_response_to_head_request_ : 1; bool is_response_to_connect_request_ : 1; @@ -162,7 +163,6 @@ class RequestEncoderImpl : public StreamEncoderImpl, public RequestEncoder { private: bool upgrade_request_{}; bool head_request_{}; - bool connect_request_{}; }; /** diff --git a/source/common/http/http1/codec_impl_legacy.cc b/source/common/http/http1/codec_impl_legacy.cc index d82a6ae15b2f..ff5fdee9eb20 100644 --- a/source/common/http/http1/codec_impl_legacy.cc +++ b/source/common/http/http1/codec_impl_legacy.cc @@ -78,8 +78,8 @@ const std::string StreamEncoderImpl::LAST_CHUNK = "0\r\n"; StreamEncoderImpl::StreamEncoderImpl(ConnectionImpl& connection, HeaderKeyFormatter* header_key_formatter) : connection_(connection), disable_chunk_encoding_(false), chunk_encoding_(true), - is_response_to_head_request_(false), is_response_to_connect_request_(false), - header_key_formatter_(header_key_formatter) { + connect_request_(false), is_response_to_head_request_(false), + is_response_to_connect_request_(false), header_key_formatter_(header_key_formatter) { if (connection_.connection().aboveHighWatermark()) { runHighWatermarkCallbacks(); } @@ -262,6 +262,10 @@ void StreamEncoderImpl::endEncode() { connection_.flushOutput(true); connection_.onEncodeComplete(); + // With CONNECT half-closing the connection is used to signal end stream. + if (connect_request_) { + connection_.connection().close(Network::ConnectionCloseType::FlushWriteAndDelay); + } } void ServerConnectionImpl::maybeAddSentinelBufferFragment(Buffer::WatermarkBuffer& output_buffer) { @@ -381,6 +385,7 @@ Status RequestEncoderImpl::encodeHeaders(const RequestHeaderMap& headers, bool e head_request_ = true; } else if (method->value() == Headers::get().MethodValues.Connect) { disableChunkEncoding(); + connection_.connection().enableHalfClose(true); connect_request_ = true; } if (Utility::isUpgrade(headers)) { diff --git a/source/common/http/http1/codec_impl_legacy.h b/source/common/http/http1/codec_impl_legacy.h index 6510116d7a36..b68e069dd0c4 100644 --- a/source/common/http/http1/codec_impl_legacy.h +++ b/source/common/http/http1/codec_impl_legacy.h @@ -89,6 +89,7 @@ class StreamEncoderImpl : public virtual StreamEncoder, uint32_t read_disable_calls_{}; bool disable_chunk_encoding_ : 1; bool chunk_encoding_ : 1; + bool connect_request_ : 1; bool is_response_to_head_request_ : 1; bool is_response_to_connect_request_ : 1; @@ -166,7 +167,6 @@ class RequestEncoderImpl : public StreamEncoderImpl, public RequestEncoder { private: bool upgrade_request_{}; bool head_request_{}; - bool connect_request_{}; }; /** diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 1a265cf10287..4c52ee2fc516 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -375,6 +375,7 @@ void ConnectionImpl::readDisable(bool disable) { } void ConnectionImpl::raiseEvent(ConnectionEvent event) { + ENVOY_CONN_LOG(trace, "raising connection event {}", *this, event); ConnectionImplBase::raiseConnectionEvent(event); // We may have pending data in the write buffer on transport handshake // completion, which may also have completed in the context of onReadReady(), diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index 020375414265..3ede8c59839b 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -61,6 +61,7 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback // Network::Connection void addBytesSentCallback(BytesSentCb cb) override; void enableHalfClose(bool enabled) override; + bool isHalfCloseEnabled() override { return enable_half_close_; } void close(ConnectionCloseType type) final; std::string nextProtocol() const override { return transport_socket_->protocol(); } void noDelay(bool enable) override; diff --git a/source/common/tcp_proxy/BUILD b/source/common/tcp_proxy/BUILD index 401b8c3e8794..df1b7b222447 100644 --- a/source/common/tcp_proxy/BUILD +++ b/source/common/tcp_proxy/BUILD @@ -21,6 +21,7 @@ envoy_cc_library( "//include/envoy/tcp:upstream_interface", "//include/envoy/upstream:cluster_manager_interface", "//include/envoy/upstream:load_balancer_interface", + "//source/common/http:codec_client_lib", "//source/common/http:header_map_lib", "//source/common/http:headers_lib", "//source/common/http:utility_lib", @@ -58,6 +59,7 @@ envoy_cc_library( "//source/common/common:empty_string", "//source/common/common:macros", "//source/common/common:minimal_logger_lib", + "//source/common/http:codec_client_lib", "//source/common/network:application_protocol_lib", "//source/common/network:cidr_range_lib", "//source/common/network:filter_lib", diff --git a/source/common/tcp_proxy/upstream.cc b/source/common/tcp_proxy/upstream.cc index 3f3d73c6add4..128a7f769039 100644 --- a/source/common/tcp_proxy/upstream.cc +++ b/source/common/tcp_proxy/upstream.cc @@ -2,6 +2,8 @@ #include "envoy/upstream/cluster_manager.h" +#include "common/http/codec_client.h" +#include "common/http/codes.h" #include "common/http/header_map_impl.h" #include "common/http/headers.h" #include "common/http/utility.h" @@ -54,17 +56,10 @@ TcpUpstream::onDownstreamEvent(Network::ConnectionEvent event) { HttpUpstream::HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks, const std::string& hostname) - : upstream_callbacks_(callbacks), response_decoder_(*this), hostname_(hostname) {} + : hostname_(hostname), response_decoder_(*this), upstream_callbacks_(callbacks) {} HttpUpstream::~HttpUpstream() { resetEncoder(Network::ConnectionEvent::LocalClose); } -bool HttpUpstream::isValidBytestreamResponse(const Http::ResponseHeaderMap& headers) { - if (Http::Utility::getResponseStatus(headers) != 200) { - return false; - } - return true; -} - bool HttpUpstream::readDisable(bool disable) { if (!request_encoder_) { return false; @@ -112,22 +107,6 @@ void HttpUpstream::onBelowWriteBufferLowWatermark() { upstream_callbacks_.onBelowWriteBufferLowWatermark(); } -void HttpUpstream::setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) { - request_encoder_ = &request_encoder; - request_encoder_->getStream().addCallbacks(*this); - const std::string& scheme = - is_ssl ? Http::Headers::get().SchemeValues.Https : Http::Headers::get().SchemeValues.Http; - auto headers = Http::createHeaderMap( - {{Http::Headers::get().Method, "CONNECT"}, - {Http::Headers::get().Protocol, Http::Headers::get().ProtocolValues.Bytestream}, - {Http::Headers::get().Scheme, scheme}, - {Http::Headers::get().Path, "/"}, - {Http::Headers::get().Host, hostname_}}); - const auto status = request_encoder_->encodeHeaders(*headers, false); - // Encoding can only fail on missing required request headers. - ASSERT(status.ok()); -} - void HttpUpstream::resetEncoder(Network::ConnectionEvent event, bool inform_downstream) { if (!request_encoder_) { return; @@ -204,8 +183,9 @@ void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data HttpConnPool::HttpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager, Upstream::LoadBalancerContext* context, const TunnelingConfig& config, - Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks) - : hostname_(config.hostname()), upstream_callbacks_(upstream_callbacks) { + Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks, + Http::CodecClient::Type type) + : hostname_(config.hostname()), type_(type), upstream_callbacks_(upstream_callbacks) { conn_pool_ = cluster_manager.httpConnPoolForCluster( cluster_name, Upstream::ResourcePriority::Default, absl::nullopt, context); } @@ -220,7 +200,11 @@ HttpConnPool::~HttpConnPool() { void HttpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) { callbacks_ = &callbacks; - upstream_ = std::make_unique(upstream_callbacks_, hostname_); + if (type_ == Http::CodecClient::Type::HTTP1) { + upstream_ = std::make_unique(upstream_callbacks_, hostname_); + } else { + upstream_ = std::make_unique(upstream_callbacks_, hostname_); + } Tcp::ConnectionPool::Cancellable* handle = conn_pool_->newStream(upstream_->responseDecoder(), *this); if (handle != nullptr) { @@ -246,5 +230,66 @@ void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder, info.downstreamSslConnection()); } +Http2Upstream::Http2Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks, + const std::string& hostname) + : HttpUpstream(callbacks, hostname) {} + +bool Http2Upstream::isValidResponse(const Http::ResponseHeaderMap& headers) { + if (Http::Utility::getResponseStatus(headers) != 200) { + return false; + } + return true; +} + +void Http2Upstream::setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) { + request_encoder_ = &request_encoder; + request_encoder_->getStream().addCallbacks(*this); + const std::string& scheme = + is_ssl ? Http::Headers::get().SchemeValues.Https : Http::Headers::get().SchemeValues.Http; + auto headers = Http::createHeaderMap( + {{Http::Headers::get().Method, "CONNECT"}, + {Http::Headers::get().Protocol, Http::Headers::get().ProtocolValues.Bytestream}, + {Http::Headers::get().Scheme, scheme}, + {Http::Headers::get().Path, "/"}, + {Http::Headers::get().Host, hostname_}}); + const auto status = request_encoder_->encodeHeaders(*headers, false); + // Encoding can only fail on missing required request headers. + ASSERT(status.ok()); +} + +Http1Upstream::Http1Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks, + const std::string& hostname) + : HttpUpstream(callbacks, hostname) {} + +void Http1Upstream::setRequestEncoder(Http::RequestEncoder& request_encoder, bool) { + request_encoder_ = &request_encoder; + request_encoder_->getStream().addCallbacks(*this); + + ASSERT(request_encoder_->http1StreamEncoderOptions() != absl::nullopt); + auto headers = Http::createHeaderMap({ + {Http::Headers::get().Method, "CONNECT"}, + {Http::Headers::get().Host, hostname_}, + }); + + const auto status = request_encoder_->encodeHeaders(*headers, false); + // Encoding can only fail on missing required request headers. + ASSERT(status.ok()); +} + +bool Http1Upstream::isValidResponse(const Http::ResponseHeaderMap& headers) { + // According to RFC7231 any 2xx response indicates that the connection is + // established. + // Any 'Content-Length' or 'Transfer-Encoding' header fields MUST be ignored. + // https://tools.ietf.org/html/rfc7231#section-4.3.6 + return Http::CodeUtility::is2xx(Http::Utility::getResponseStatus(headers)); +} + +void Http1Upstream::encodeData(Buffer::Instance& data, bool end_stream) { + if (!request_encoder_) { + return; + } + request_encoder_->encodeData(data, end_stream); +} + } // namespace TcpProxy } // namespace Envoy diff --git a/source/common/tcp_proxy/upstream.h b/source/common/tcp_proxy/upstream.h index e132b667e2ca..7a4da879187a 100644 --- a/source/common/tcp_proxy/upstream.h +++ b/source/common/tcp_proxy/upstream.h @@ -7,6 +7,8 @@ #include "envoy/upstream/load_balancer.h" #include "envoy/upstream/upstream.h" +#include "common/http/codec_client.h" + namespace Envoy { namespace TcpProxy { @@ -44,10 +46,12 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba HttpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager, Upstream::LoadBalancerContext* context, const TunnelingConfig& config, - Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks); + Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks, + Http::CodecClient::Type type); ~HttpConnPool() override; - bool valid() const { return conn_pool_ != nullptr; } + // HTTP/3 upstreams are not supported at the moment. + bool valid() const { return conn_pool_ != nullptr && type_ <= Http::CodecClient::Type::HTTP2; } // GenericConnPool void newStream(GenericConnectionPoolCallbacks& callbacks) override; @@ -62,6 +66,7 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba private: const std::string hostname_; + Http::CodecClient::Type type_; Http::ConnectionPool::Instance* conn_pool_{}; Http::ConnectionPool::Cancellable* upstream_handle_{}; GenericConnectionPoolCallbacks* callbacks_{}; @@ -84,15 +89,15 @@ class TcpUpstream : public GenericUpstream { Tcp::ConnectionPool::ConnectionDataPtr upstream_conn_data_; }; -class HttpUpstream : public GenericUpstream, Http::StreamCallbacks { +class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks { public: - HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks, const std::string& hostname); ~HttpUpstream() override; - static bool isValidBytestreamResponse(const Http::ResponseHeaderMap& headers); + virtual bool isValidResponse(const Http::ResponseHeaderMap&) PURE; void doneReading(); void doneWriting(); + Http::ResponseDecoder& responseDecoder() { return response_decoder_; } // GenericUpstream bool readDisable(bool disable) override; @@ -106,20 +111,23 @@ class HttpUpstream : public GenericUpstream, Http::StreamCallbacks { void onAboveWriteBufferHighWatermark() override; void onBelowWriteBufferLowWatermark() override; - virtual void setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl); + virtual void setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) PURE; - Http::ResponseDecoder& responseDecoder() { return response_decoder_; } - -private: +protected: + HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks, const std::string& hostname); void resetEncoder(Network::ConnectionEvent event, bool inform_downstream = true); + Http::RequestEncoder* request_encoder_{}; + const std::string hostname_; + +private: class DecoderShim : public Http::ResponseDecoder { public: DecoderShim(HttpUpstream& parent) : parent_(parent) {} // Http::ResponseDecoder void decode100ContinueHeaders(Http::ResponseHeaderMapPtr&&) override {} void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override { - if (!isValidBytestreamResponse(*headers) || end_stream) { + if (!parent_.isValidResponse(*headers) || end_stream) { parent_.resetEncoder(Network::ConnectionEvent::LocalClose); } } @@ -135,14 +143,28 @@ class HttpUpstream : public GenericUpstream, Http::StreamCallbacks { private: HttpUpstream& parent_; }; - - Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_; DecoderShim response_decoder_; - Http::RequestEncoder* request_encoder_{}; - const std::string hostname_; + Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_; bool read_half_closed_{}; bool write_half_closed_{}; }; +class Http1Upstream : public HttpUpstream { +public: + Http1Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks, const std::string& hostname); + + void encodeData(Buffer::Instance& data, bool end_stream) override; + void setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) override; + bool isValidResponse(const Http::ResponseHeaderMap& headers) override; +}; + +class Http2Upstream : public HttpUpstream { +public: + Http2Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks, const std::string& hostname); + + void setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) override; + bool isValidResponse(const Http::ResponseHeaderMap& headers) override; +}; + } // namespace TcpProxy } // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.cc b/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.cc index 3e30e6ec5779..4954196bfa47 100644 --- a/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.cc +++ b/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.cc @@ -42,6 +42,11 @@ void QuicFilterManagerConnectionImpl::enableHalfClose(bool enabled) { RELEASE_ASSERT(!enabled, "Quic connection doesn't support half close."); } +bool QuicFilterManagerConnectionImpl::isHalfCloseEnabled() { + // Quic doesn't support half close. + return false; +} + void QuicFilterManagerConnectionImpl::setBufferLimits(uint32_t /*limit*/) { // Currently read buffer is capped by connection level flow control. And write buffer limit is set // during construction. Changing the buffer limit during the life time of the connection is not diff --git a/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h b/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h index 8f01d03ca6b9..06ef6a9cb98b 100644 --- a/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h +++ b/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h @@ -35,6 +35,7 @@ class QuicFilterManagerConnectionImpl : public Network::ConnectionImplBase { NOT_REACHED_GCOVR_EXCL_LINE; } void enableHalfClose(bool enabled) override; + bool isHalfCloseEnabled() override; void close(Network::ConnectionCloseType type) override; Event::Dispatcher& dispatcher() override { return dispatcher_; } std::string nextProtocol() const override { return EMPTY_STRING; } diff --git a/source/extensions/upstreams/tcp/generic/BUILD b/source/extensions/upstreams/tcp/generic/BUILD index d6e2f2f3cae2..dc1ae3eb9116 100644 --- a/source/extensions/upstreams/tcp/generic/BUILD +++ b/source/extensions/upstreams/tcp/generic/BUILD @@ -19,6 +19,7 @@ envoy_cc_extension( security_posture = "robust_to_untrusted_downstream", visibility = ["//visibility:public"], deps = [ + "//source/common/http:codec_client_lib", "//source/common/tcp_proxy:upstream_lib", "@envoy_api//envoy/extensions/upstreams/tcp/generic/v3:pkg_cc_proto", ], diff --git a/source/extensions/upstreams/tcp/generic/config.cc b/source/extensions/upstreams/tcp/generic/config.cc index 634862885f5c..675eda9122f3 100644 --- a/source/extensions/upstreams/tcp/generic/config.cc +++ b/source/extensions/upstreams/tcp/generic/config.cc @@ -2,6 +2,7 @@ #include "envoy/upstream/cluster_manager.h" +#include "common/http/codec_client.h" #include "common/tcp_proxy/upstream.h" namespace Envoy { @@ -19,16 +20,11 @@ TcpProxy::GenericConnPoolPtr GenericConnPoolFactory::createGenericConnPool( if (!cluster) { return nullptr; } - // TODO(snowp): Ideally we should prevent this from being configured, but that's tricky to get - // right since whether a cluster is invalid depends on both the tcp_proxy config + cluster - // config. - if ((cluster->info()->features() & Upstream::ClusterInfo::Features::HTTP2) == 0) { - ENVOY_LOG_MISC(error, "Attempted to tunnel over HTTP/1.1, this is not supported. Set " - "http2_protocol_options on the cluster."); - return nullptr; - } - auto ret = std::make_unique(cluster_name, cluster_manager, context, - config.value(), upstream_callbacks); + auto pool_type = ((cluster->info()->features() & Upstream::ClusterInfo::Features::HTTP2) != 0) + ? Http::CodecClient::Type::HTTP2 + : Http::CodecClient::Type::HTTP1; + auto ret = std::make_unique( + cluster_name, cluster_manager, context, config.value(), upstream_callbacks, pool_type); return (ret->valid() ? std::move(ret) : nullptr); } auto ret = std::make_unique(cluster_name, cluster_manager, context, diff --git a/source/server/api_listener_impl.h b/source/server/api_listener_impl.h index b0dd0ef701c3..e56fec57d73b 100644 --- a/source/server/api_listener_impl.h +++ b/source/server/api_listener_impl.h @@ -99,6 +99,7 @@ class ApiListenerImplBase : public ApiListener, NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } void enableHalfClose(bool) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } + bool isHalfCloseEnabled() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } void close(Network::ConnectionCloseType) override {} Event::Dispatcher& dispatcher() override { return parent_.parent_.factory_context_.dispatcher(); diff --git a/test/common/tcp_proxy/upstream_test.cc b/test/common/tcp_proxy/upstream_test.cc index 9464d5d25970..af2a786ed8fe 100644 --- a/test/common/tcp_proxy/upstream_test.cc +++ b/test/common/tcp_proxy/upstream_test.cc @@ -11,96 +11,110 @@ using testing::_; using testing::AnyNumber; +using testing::Return; namespace Envoy { namespace TcpProxy { namespace { -class HttpUpstreamTest : public testing::Test { +template class HttpUpstreamTest : public testing::Test { public: HttpUpstreamTest() { EXPECT_CALL(encoder_, getStream()).Times(AnyNumber()); EXPECT_CALL(encoder_, encodeHeaders(_, false)); - upstream_ = std::make_unique(callbacks_, hostname_); + EXPECT_CALL(encoder_, http1StreamEncoderOptions()).Times(AnyNumber()); + if (typeid(T) == typeid(Http1Upstream)) { + ON_CALL(encoder_, http1StreamEncoderOptions()) + .WillByDefault(Return(Http::Http1StreamEncoderOptionsOptRef(stream_encoder_options_))); + } + EXPECT_CALL(stream_encoder_options_, enableHalfClose()).Times(AnyNumber()); + upstream_ = std::make_unique(callbacks_, hostname_); upstream_->setRequestEncoder(encoder_, true); } Http::MockRequestEncoder encoder_; + Http::MockHttp1StreamEncoderOptions stream_encoder_options_; NiceMock callbacks_; std::unique_ptr upstream_; - std::string hostname_{"default.host.com"}; + std::string hostname_{"default.host.com:443"}; }; -TEST_F(HttpUpstreamTest, WriteUpstream) { - EXPECT_CALL(encoder_, encodeData(BufferStringEqual("foo"), false)); +using testing::Types; + +typedef Types Implementations; + +TYPED_TEST_SUITE(HttpUpstreamTest, Implementations); + +TYPED_TEST(HttpUpstreamTest, WriteUpstream) { + EXPECT_CALL(this->encoder_, encodeData(BufferStringEqual("foo"), false)); Buffer::OwnedImpl buffer1("foo"); - upstream_->encodeData(buffer1, false); + this->upstream_->encodeData(buffer1, false); - EXPECT_CALL(encoder_, encodeData(BufferStringEqual("bar"), true)); + EXPECT_CALL(this->encoder_, encodeData(BufferStringEqual("bar"), true)); Buffer::OwnedImpl buffer2("bar"); - upstream_->encodeData(buffer2, true); + this->upstream_->encodeData(buffer2, true); // New upstream with no encoder - upstream_ = std::make_unique(callbacks_, hostname_); - upstream_->encodeData(buffer2, true); + this->upstream_ = std::make_unique(this->callbacks_, this->hostname_); + this->upstream_->encodeData(buffer2, true); } -TEST_F(HttpUpstreamTest, WriteDownstream) { - EXPECT_CALL(callbacks_, onUpstreamData(BufferStringEqual("foo"), false)); +TYPED_TEST(HttpUpstreamTest, WriteDownstream) { + EXPECT_CALL(this->callbacks_, onUpstreamData(BufferStringEqual("foo"), false)); Buffer::OwnedImpl buffer1("foo"); - upstream_->responseDecoder().decodeData(buffer1, false); + this->upstream_->responseDecoder().decodeData(buffer1, false); - EXPECT_CALL(callbacks_, onUpstreamData(BufferStringEqual("bar"), true)); + EXPECT_CALL(this->callbacks_, onUpstreamData(BufferStringEqual("bar"), true)); Buffer::OwnedImpl buffer2("bar"); - upstream_->responseDecoder().decodeData(buffer2, true); + this->upstream_->responseDecoder().decodeData(buffer2, true); } -TEST_F(HttpUpstreamTest, InvalidUpgradeWithEarlyFin) { - EXPECT_CALL(callbacks_, onEvent(_)); +TYPED_TEST(HttpUpstreamTest, InvalidUpgradeWithEarlyFin) { + EXPECT_CALL(this->callbacks_, onEvent(_)); Http::ResponseHeaderMapPtr headers{new Http::TestResponseHeaderMapImpl{{":status", "200"}}}; - upstream_->responseDecoder().decodeHeaders(std::move(headers), true); + this->upstream_->responseDecoder().decodeHeaders(std::move(headers), true); } -TEST_F(HttpUpstreamTest, InvalidUpgradeWithNon200) { - EXPECT_CALL(callbacks_, onEvent(_)); +TYPED_TEST(HttpUpstreamTest, InvalidUpgradeWithNon200) { + EXPECT_CALL(this->callbacks_, onEvent(_)); Http::ResponseHeaderMapPtr headers{new Http::TestResponseHeaderMapImpl{{":status", "301"}}}; - upstream_->responseDecoder().decodeHeaders(std::move(headers), false); + this->upstream_->responseDecoder().decodeHeaders(std::move(headers), false); } -TEST_F(HttpUpstreamTest, ReadDisable) { - EXPECT_CALL(encoder_.stream_, readDisable(true)); - EXPECT_TRUE(upstream_->readDisable(true)); +TYPED_TEST(HttpUpstreamTest, ReadDisable) { + EXPECT_CALL(this->encoder_.stream_, readDisable(true)); + EXPECT_TRUE(this->upstream_->readDisable(true)); - EXPECT_CALL(encoder_.stream_, readDisable(false)); - EXPECT_TRUE(upstream_->readDisable(false)); + EXPECT_CALL(this->encoder_.stream_, readDisable(false)); + EXPECT_TRUE(this->upstream_->readDisable(false)); // New upstream with no encoder - upstream_ = std::make_unique(callbacks_, hostname_); - EXPECT_FALSE(upstream_->readDisable(true)); + this->upstream_ = std::make_unique(this->callbacks_, this->hostname_); + EXPECT_FALSE(this->upstream_->readDisable(true)); } -TEST_F(HttpUpstreamTest, AddBytesSentCallbackForCoverage) { - upstream_->addBytesSentCallback([&](uint64_t) {}); +TYPED_TEST(HttpUpstreamTest, AddBytesSentCallbackForCoverage) { + this->upstream_->addBytesSentCallback([&](uint64_t) {}); } -TEST_F(HttpUpstreamTest, DownstreamDisconnect) { - EXPECT_CALL(encoder_.stream_, resetStream(Http::StreamResetReason::LocalReset)); - EXPECT_CALL(callbacks_, onEvent(_)).Times(0); - EXPECT_TRUE(upstream_->onDownstreamEvent(Network::ConnectionEvent::LocalClose) == nullptr); +TYPED_TEST(HttpUpstreamTest, DownstreamDisconnect) { + EXPECT_CALL(this->encoder_.stream_, resetStream(Http::StreamResetReason::LocalReset)); + EXPECT_CALL(this->callbacks_, onEvent(_)).Times(0); + EXPECT_TRUE(this->upstream_->onDownstreamEvent(Network::ConnectionEvent::LocalClose) == nullptr); } -TEST_F(HttpUpstreamTest, UpstreamReset) { - EXPECT_CALL(encoder_.stream_, resetStream(_)).Times(0); - EXPECT_CALL(callbacks_, onEvent(_)); - upstream_->onResetStream(Http::StreamResetReason::ConnectionTermination, ""); +TYPED_TEST(HttpUpstreamTest, UpstreamReset) { + EXPECT_CALL(this->encoder_.stream_, resetStream(_)).Times(0); + EXPECT_CALL(this->callbacks_, onEvent(_)); + this->upstream_->onResetStream(Http::StreamResetReason::ConnectionTermination, ""); } -TEST_F(HttpUpstreamTest, UpstreamWatermarks) { - EXPECT_CALL(callbacks_, onAboveWriteBufferHighWatermark()); - upstream_->onAboveWriteBufferHighWatermark(); +TYPED_TEST(HttpUpstreamTest, UpstreamWatermarks) { + EXPECT_CALL(this->callbacks_, onAboveWriteBufferHighWatermark()); + this->upstream_->onAboveWriteBufferHighWatermark(); - EXPECT_CALL(callbacks_, onBelowWriteBufferLowWatermark()); - upstream_->onBelowWriteBufferLowWatermark(); + EXPECT_CALL(this->callbacks_, onBelowWriteBufferLowWatermark()); + this->upstream_->onBelowWriteBufferLowWatermark(); } } // namespace diff --git a/test/integration/protocol_integration_test.cc b/test/integration/protocol_integration_test.cc index acaf97b0ce64..68865b4c16e7 100644 --- a/test/integration/protocol_integration_test.cc +++ b/test/integration/protocol_integration_test.cc @@ -387,8 +387,13 @@ TEST_P(ProtocolIntegrationTest, FaultyFilterWithConnect) { codec_client_ = makeHttpConnection(lookupPort("http")); // Missing host for CONNECT - auto response = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ - {":method", "CONNECT"}, {":scheme", "http"}, {":authority", "www.host.com:80"}}); + auto headers = Http::TestRequestHeaderMapImpl{ + {":method", "CONNECT"}, {":scheme", "http"}, {":authority", "www.host.com:80"}}; + + auto response = (downstream_protocol_ == Http::CodecClient::Type::HTTP1) + ? std::move((codec_client_->startRequest(headers)).second) + : codec_client_->makeHeaderOnlyRequest(headers); + response->waitForEndStream(); EXPECT_TRUE(response->complete()); EXPECT_EQ("503", response->headers().getStatusValue()); @@ -2077,8 +2082,10 @@ TEST_P(DownstreamProtocolIntegrationTest, InvalidAuthority) { TEST_P(DownstreamProtocolIntegrationTest, ConnectIsBlocked) { initialize(); codec_client_ = makeHttpConnection(lookupPort("http")); - auto response = codec_client_->makeHeaderOnlyRequest( + auto encoder_decoder = codec_client_->startRequest( Http::TestRequestHeaderMapImpl{{":method", "CONNECT"}, {":authority", "host.com:80"}}); + request_encoder_ = &encoder_decoder.first; + auto response = std::move(encoder_decoder.second); if (downstreamProtocol() == Http::CodecClient::Type::HTTP1) { // Because CONNECT requests for HTTP/1 do not include a path, they will fail diff --git a/test/integration/tcp_tunneling_integration_test.cc b/test/integration/tcp_tunneling_integration_test.cc index d5056a28720c..9dd77c67369e 100644 --- a/test/integration/tcp_tunneling_integration_test.cc +++ b/test/integration/tcp_tunneling_integration_test.cc @@ -311,28 +311,39 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, ConnectTerminationIntegrationTest, testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), TestUtility::ipTestParamsToString); -// Tunneling downstream TCP over an upstream HTTP channel. -class TcpTunnelingIntegrationTest : public testing::TestWithParam, +using Params = std::tuple; + +// Tunneling downstream TCP over an upstream HTTP CONNECT tunnel. +class TcpTunnelingIntegrationTest : public testing::TestWithParam, public HttpIntegrationTest { public: - TcpTunnelingIntegrationTest() : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, GetParam()) {} + TcpTunnelingIntegrationTest() + : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, std::get<0>(GetParam())) {} + + static std::string paramsToString(const testing::TestParamInfo& p) { + return fmt::format("{}_{}", + std::get<0>(p.param) == Network::Address::IpVersion::v4 ? "IPv4" : "IPv6", + std::get<1>(p.param) == FakeHttpConnection::Type::HTTP1 ? "HTTP1Upstream" + : "HTTP2Upstream"); + } void SetUp() override { enable_half_close_ = true; setDownstreamProtocol(Http::CodecClient::Type::HTTP2); - setUpstreamProtocol(FakeHttpConnection::Type::HTTP2); + setUpstreamProtocol(std::get<1>(GetParam())); config_helper_.addConfigModifier( [&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy proxy_config; proxy_config.set_stat_prefix("tcp_stats"); proxy_config.set_cluster("cluster_0"); - proxy_config.mutable_tunneling_config()->set_hostname("host.com"); + proxy_config.mutable_tunneling_config()->set_hostname("host.com:80"); auto* listener = bootstrap.mutable_static_resources()->add_listeners(); listener->set_name("tcp_proxy"); auto* socket_address = listener->mutable_address()->mutable_socket_address(); - socket_address->set_address(Network::Test::getLoopbackAddressString(GetParam())); + socket_address->set_address( + Network::Test::getLoopbackAddressString(std::get<0>(GetParam()))); socket_address->set_port_value(0); auto* filter_chain = listener->add_filter_chains(); @@ -368,27 +379,13 @@ TEST_P(TcpTunnelingIntegrationTest, Basic) { ASSERT_TRUE(tcp_client->write("hello", false)); tcp_client->close(); ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, 5)); - ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); - - // If the upstream now sends 'end stream' the connection is fully closed. - upstream_request_->encodeData(0, true); -} - -// Validates that if the cluster is not configured with HTTP/2 we don't attempt -// to tunnel the data. -TEST_P(TcpTunnelingIntegrationTest, InvalidCluster) { - config_helper_.addConfigModifier([&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { - bootstrap.mutable_static_resources() - ->mutable_clusters() - ->Mutable(0) - ->clear_http2_protocol_options(); - }); - initialize(); - - // Start a connection and see it close immediately due to the invalid cluster. - IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); - tcp_client->waitForHalfClose(); - tcp_client->close(); + if (upstreamProtocol() == FakeHttpConnection::Type::HTTP1) { + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + } else { + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + // If the upstream now sends 'end stream' the connection is fully closed. + upstream_request_->encodeData(0, true); + } } TEST_P(TcpTunnelingIntegrationTest, InvalidResponseHeaders) { @@ -404,7 +401,11 @@ TEST_P(TcpTunnelingIntegrationTest, InvalidResponseHeaders) { // upstream gets a stream reset. default_response_headers_.setStatus(enumToInt(Http::Code::ServiceUnavailable)); upstream_request_->encodeHeaders(default_response_headers_, false); - ASSERT_TRUE(upstream_request_->waitForReset()); + if (upstreamProtocol() == FakeHttpConnection::Type::HTTP1) { + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + } else { + ASSERT_TRUE(upstream_request_->waitForReset()); + } // The connection should be fully closed, but the client has no way of knowing // that. Ensure the FIN is read and clean up state. @@ -429,20 +430,31 @@ TEST_P(TcpTunnelingIntegrationTest, CloseUpstreamFirst) { // Send data from upstream to downstream with an end stream and make sure the data is received // before the connection is half-closed. upstream_request_->encodeData(12, true); + if (upstreamProtocol() == FakeHttpConnection::Type::HTTP1) { + ASSERT_TRUE(fake_upstream_connection_->close()); + } ASSERT_TRUE(tcp_client->waitForData(12)); tcp_client->waitForHalfClose(); - // Attempt to send data upstream. - // should go through. - ASSERT_TRUE(tcp_client->write("hello", false)); - ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, 5)); - - ASSERT_TRUE(tcp_client->write("hello", true)); - ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, 5)); - ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + if (upstreamProtocol() == FakeHttpConnection::Type::HTTP1) { + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + tcp_client->close(); + } else { + // Attempt to send data upstream. + // should go through. + ASSERT_TRUE(tcp_client->write("hello", false)); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, 5)); + + ASSERT_TRUE(tcp_client->write("hello", true)); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, 5)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + } } TEST_P(TcpTunnelingIntegrationTest, ResetStreamTest) { + if (upstreamProtocol() == FakeHttpConnection::Type::HTTP1) { + return; + } enable_half_close_ = false; initialize(); @@ -490,7 +502,12 @@ TEST_P(TcpTunnelingIntegrationTest, TestIdletimeoutWithLargeOutstandingData) { upstream_request_->encodeData(data, false); tcp_client->waitForDisconnect(); - ASSERT_TRUE(upstream_request_->waitForReset()); + if (upstreamProtocol() == FakeHttpConnection::Type::HTTP1) { + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + tcp_client->close(); + } else { + ASSERT_TRUE(upstream_request_->waitForReset()); + } } // Test that a downstream flush works correctly (all data is flushed) @@ -508,18 +525,29 @@ TEST_P(TcpTunnelingIntegrationTest, TcpProxyDownstreamFlush) { upstream_request_->encodeHeaders(default_response_headers_, false); tcp_client->readDisable(true); - ASSERT_TRUE(tcp_client->write("", true)); + if (upstreamProtocol() == FakeHttpConnection::Type::HTTP1) { + ASSERT_TRUE(tcp_client->write("hello", false)); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, 5)); - // This ensures that readDisable(true) has been run on its thread - // before tcp_client starts writing. - ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + upstream_request_->encodeData(data, true); + ASSERT_TRUE(fake_upstream_connection_->close()); + } else { + ASSERT_TRUE(tcp_client->write("", true)); + + // This ensures that readDisable(true) has been run on its thread + // before tcp_client starts writing. + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); - upstream_request_->encodeData(data, true); + upstream_request_->encodeData(data, true); + } test_server_->waitForCounterGe("cluster.cluster_0.upstream_flow_control_paused_reading_total", 1); tcp_client->readDisable(false); tcp_client->waitForData(data); tcp_client->waitForHalfClose(); + if (upstreamProtocol() == FakeHttpConnection::Type::HTTP1) { + tcp_client->close(); + } } // Test that an upstream flush works correctly (all data is flushed) @@ -543,19 +571,29 @@ TEST_P(TcpTunnelingIntegrationTest, TcpProxyUpstreamFlush) { ASSERT_TRUE(tcp_client->waitForData(5)); ASSERT_TRUE(tcp_client->write(data, true)); + if (upstreamProtocol() == FakeHttpConnection::Type::HTTP1) { + tcp_client->close(); - // Note that upstream_flush_active will *not* be incremented for the HTTP - // tunneling case. The data is already written to the stream, so no drainer - // is necessary. - upstream_request_->readDisable(false); - ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, size)); - ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); - upstream_request_->encodeData("world", true); - tcp_client->waitForHalfClose(); + upstream_request_->readDisable(false); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, size)); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + } else { + // Note that upstream_flush_active will *not* be incremented for the HTTP + // tunneling case. The data is already written to the stream, so no drainer + // is necessary. + upstream_request_->readDisable(false); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, size)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + upstream_request_->encodeData("world", true); + tcp_client->waitForHalfClose(); + } } // Test that h2 connection is reused. TEST_P(TcpTunnelingIntegrationTest, H2ConnectionReuse) { + if (upstreamProtocol() == FakeHttpConnection::Type::HTTP1) { + return; + } initialize(); // Establish a connection. @@ -597,9 +635,186 @@ TEST_P(TcpTunnelingIntegrationTest, H2ConnectionReuse) { ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); } -INSTANTIATE_TEST_SUITE_P(IpVersions, TcpTunnelingIntegrationTest, - testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), - TestUtility::ipTestParamsToString); +// Test that with HTTP1 we have no connection reuse with downstream close. +TEST_P(TcpTunnelingIntegrationTest, H1NoConnectionReuse) { + if (upstreamProtocol() == FakeHttpConnection::Type::HTTP2) { + return; + } + initialize(); + + // Establish a connection. + IntegrationTcpClientPtr tcp_client1 = makeTcpConnection(lookupPort("tcp_proxy")); + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForHeadersComplete()); + upstream_request_->encodeHeaders(default_response_headers_, false); + + // Send data in both directions. + ASSERT_TRUE(tcp_client1->write("hello1", false)); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, "hello1")); + + // Send data from upstream to downstream and close the connection + // from downstream. + upstream_request_->encodeData("world1", false); + tcp_client1->waitForData("world1"); + tcp_client1->close(); + + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + + // Establish a new connection. + IntegrationTcpClientPtr tcp_client2 = makeTcpConnection(lookupPort("tcp_proxy")); + // A new connection is established + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForHeadersComplete()); + upstream_request_->encodeHeaders(default_response_headers_, false); + + ASSERT_TRUE(tcp_client2->write("hello1", false)); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, "hello1")); + tcp_client2->close(); + + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); +} + +// Test that with HTTP1 we have no connection with upstream close. +TEST_P(TcpTunnelingIntegrationTest, H1UpstreamCloseNoConnectionReuse) { + if (upstreamProtocol() == FakeHttpConnection::Type::HTTP2) { + return; + } + initialize(); + + // Establish a connection. + IntegrationTcpClientPtr tcp_client1 = makeTcpConnection(lookupPort("tcp_proxy")); + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForHeadersComplete()); + upstream_request_->encodeHeaders(default_response_headers_, false); + + // Send data in both directions. + ASSERT_TRUE(tcp_client1->write("hello1", false)); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, "hello1")); + + // Send data from upstream to downstream and close the connection + // from the upstream. + upstream_request_->encodeData("world1", false); + tcp_client1->waitForData("world1"); + ASSERT_TRUE(fake_upstream_connection_->close()); + + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + tcp_client1->waitForHalfClose(); + tcp_client1->close(); + + // Establish a new connection. + IntegrationTcpClientPtr tcp_client2 = makeTcpConnection(lookupPort("tcp_proxy")); + // A new connection is established + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForHeadersComplete()); + upstream_request_->encodeHeaders(default_response_headers_, false); + + ASSERT_TRUE(tcp_client2->write("hello2", false)); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, "hello2")); + ASSERT_TRUE(fake_upstream_connection_->close()); + + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + tcp_client2->waitForHalfClose(); + tcp_client2->close(); +} + +TEST_P(TcpTunnelingIntegrationTest, 2xxStatusCodeValidHttp1) { + if (upstreamProtocol() == FakeHttpConnection::Type::HTTP2) { + return; + } + initialize(); + + // Start a connection, and verify the upgrade headers are received upstream. + IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForHeadersComplete()); + + // Send valid response headers, in HTTP1 all status codes in the 2xx range + // are considered valid. + default_response_headers_.setStatus(enumToInt(Http::Code::Accepted)); + upstream_request_->encodeHeaders(default_response_headers_, false); + + // Send some data from downstream to upstream, and make sure it goes through. + ASSERT_TRUE(tcp_client->write("hello", false)); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, 5)); + + // Send data from upstream to downstream. + upstream_request_->encodeData(12, false); + ASSERT_TRUE(tcp_client->waitForData(12)); + + // Close the downstream connection and wait for upstream disconnect + tcp_client->close(); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); +} + +TEST_P(TcpTunnelingIntegrationTest, ContentLengthHeaderIgnoredHttp1) { + if (upstreamProtocol() == FakeHttpConnection::Type::HTTP2) { + return; + } + initialize(); + + // Start a connection, and verify the upgrade headers are received upstream. + IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForHeadersComplete()); + + // Send upgrade headers downstream, including content-length that must be + // ignored. + default_response_headers_.setStatus(enumToInt(Http::Code::IMUsed)); + default_response_headers_.setContentLength(10); + upstream_request_->encodeHeaders(default_response_headers_, false); + + // Send data from upstream to downstream. + upstream_request_->encodeData(12, false); + ASSERT_TRUE(tcp_client->waitForData(12)); + + // Now send some data and close the TCP client. + ASSERT_TRUE(tcp_client->write("hello", false)); + tcp_client->close(); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, 5)); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); +} + +// TODO(irozzo): temporarily disabled as a protocol error is thrown when +// transfer-encoding header is received in CONNECT responses. +TEST_P(TcpTunnelingIntegrationTest, DISABLED_TransferEncodingHeaderIgnoredHttp1) { + if (upstreamProtocol() == FakeHttpConnection::Type::HTTP2) { + return; + } + initialize(); + + // Start a connection, and verify the upgrade headers are received upstream. + IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); + // Using raw connection to be able to set Transfer-encoding header. + FakeRawConnectionPtr fake_upstream_connection; + ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + std::string data; + ASSERT_TRUE(fake_upstream_connection->waitForData( + FakeRawConnection::waitForInexactMatch("\r\n\r\n"), &data)); + ASSERT_THAT(data, testing::HasSubstr("CONNECT host.com:80 HTTP/1.1")); + + // Send upgrade headers downstream, fully establishing the connection. + ASSERT_TRUE( + fake_upstream_connection->write("HTTP/1.1 299 OK\r\nTransfer-encoding: chunked\r\n\r\n")); + + // Now send some data and close the TCP client. + ASSERT_TRUE(tcp_client->write("hello", false)); + tcp_client->close(); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, 5)); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); +} + +INSTANTIATE_TEST_SUITE_P( + IpAndHttpVersions, TcpTunnelingIntegrationTest, + ::testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + testing::Values(FakeHttpConnection::Type::HTTP1, + FakeHttpConnection::Type::HTTP2)), + TcpTunnelingIntegrationTest::paramsToString); } // namespace } // namespace Envoy diff --git a/test/mocks/http/stream_encoder.h b/test/mocks/http/stream_encoder.h index df3d62c00e71..8e023f4d98db 100644 --- a/test/mocks/http/stream_encoder.h +++ b/test/mocks/http/stream_encoder.h @@ -17,6 +17,7 @@ class MockHttp1StreamEncoderOptions : public Http1StreamEncoderOptions { ~MockHttp1StreamEncoderOptions() override; MOCK_METHOD(void, disableChunkEncoding, ()); + MOCK_METHOD(void, enableHalfClose, ()); }; class MockRequestEncoder : public RequestEncoder { diff --git a/test/mocks/network/connection.h b/test/mocks/network/connection.h index d33d4797f992..2bf02ccfdc6a 100644 --- a/test/mocks/network/connection.h +++ b/test/mocks/network/connection.h @@ -55,6 +55,7 @@ class MockConnectionBase { MOCK_METHOD(void, addReadFilter, (ReadFilterSharedPtr filter)); \ MOCK_METHOD(void, removeReadFilter, (ReadFilterSharedPtr filter)); \ MOCK_METHOD(void, enableHalfClose, (bool enabled)); \ + MOCK_METHOD(bool, isHalfCloseEnabled, ()); \ MOCK_METHOD(void, close, (ConnectionCloseType type)); \ MOCK_METHOD(Event::Dispatcher&, dispatcher, ()); \ MOCK_METHOD(uint64_t, id, (), (const)); \ @@ -138,6 +139,7 @@ class MockFilterManagerConnection : public FilterManagerConnection, public MockC MOCK_METHOD(void, addReadFilter, (ReadFilterSharedPtr filter)); MOCK_METHOD(void, removeReadFilter, (ReadFilterSharedPtr filter)); MOCK_METHOD(void, enableHalfClose, (bool enabled)); + MOCK_METHOD(bool, isHalfCloseEnabled, ()); MOCK_METHOD(void, close, (ConnectionCloseType type)); MOCK_METHOD(Event::Dispatcher&, dispatcher, ()); MOCK_METHOD(uint64_t, id, (), (const));