Skip to content

Commit 8db01a4

Browse files
authored
udp: limit number of reads per event loop (#16180)
Commit Message: To prevent long event loop when too many UDP packets are in the queue, limit how many packets to read in each event loop. If haven't finished reading, artifacts a READ event to continue in the next event loop. Additional Description: Add numPacketsExpectedPerEventLoop() callback to UdpListenerCallback, so that QUIC listener can tell how many packets it wants to read in each loop. The actually number of packets read are still bound by MAX_NUM_PACKETS_PER_EVENT_LOOP (6000). Quic listener returns numPacketsExpectedPerEventLoop() based on number of connections it has at the moment and the configured envoy::config::listener::QuicProtocolOptions.packets_to_read_to_connection_count_ratio. Made InjectableSingleton really thread safe. Risk Level: medium, other than quic listener, other UdpListenerCallbacks return max size_t for numPacketsExpectedPerEventLoop(). This will cause those callbacks to read 6000 packets per READ event. Testing: added udp listener unit tests. Fixes #16335 #16278 Part of #16198 #16493 Signed-off-by: Dan Zhang <danzh@google.com>
1 parent c76f2b2 commit 8db01a4

30 files changed

+290
-67
lines changed

api/envoy/config/listener/v3/quic_config.proto

+13
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ import "envoy/config/core/v3/base.proto";
66
import "envoy/config/core/v3/protocol.proto";
77

88
import "google/protobuf/duration.proto";
9+
import "google/protobuf/wrappers.proto";
910

1011
import "udpa/annotations/status.proto";
1112
import "udpa/annotations/versioning.proto";
13+
import "validate/validate.proto";
1214

1315
option java_package = "io.envoyproxy.envoy.config.listener.v3";
1416
option java_outer_classname = "QuicConfigProto";
@@ -18,6 +20,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
1820
// [#protodoc-title: QUIC listener config]
1921

2022
// Configuration specific to the UDP QUIC listener.
23+
// [#next-free-field: 6]
2124
message QuicProtocolOptions {
2225
option (udpa.annotations.versioning).previous_message_type =
2326
"envoy.api.v2.listener.QuicProtocolOptions";
@@ -35,4 +38,14 @@ message QuicProtocolOptions {
3538
// Runtime flag that controls whether the listener is enabled or not. If not specified, defaults
3639
// to enabled.
3740
core.v3.RuntimeFeatureFlag enabled = 4;
41+
42+
// A multiplier to number of connections which is used to determine how many packets to read per
43+
// event loop. A reasonable number should allow the listener to process enough payload but not
44+
// starve TCP and other UDP sockets and also prevent long event loop duration.
45+
// The default value is 32. This means if there are N QUIC connections, the total number of
46+
// packets to read in each read event will be 32 * N.
47+
// The actual number of packets to read in total by the UDP listener is also
48+
// bound by 6000, regardless of this field or how many connections there are.
49+
google.protobuf.UInt32Value packets_to_read_to_connection_count_ratio = 5
50+
[(validate.rules).uint32 = {gte: 1}];
3851
}

api/envoy/config/listener/v4alpha/quic_config.proto

+13
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/root/version_history/current.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,14 @@ Minor Behavior Changes
3434
defined within the listener where the sockets are redirected to. Clear that field to restore the previous behavior.
3535
* tcp: switched to the new connection pool by default. Any unexpected behavioral changes can be reverted by setting runtime guard ``envoy.reloadable_features.new_tcp_connection_pool`` to false.
3636

37-
3837
Bug Fixes
3938
---------
4039
*Changes expected to improve the state of the world and are unlikely to have negative effects*
4140

4241
* http: port stripping now works for CONNECT requests, though the port will be restored if the CONNECT request is sent upstream. This behavior can be temporarily reverted by setting ``envoy.reloadable_features.strip_port_from_connect`` to false.
4342
* http: raise max configurable max_request_headers_kb limit to 8192 KiB (8MiB) from 96 KiB in http connection manager.
4443
* listener: fix the crash which could happen when the ongoing filter chain only listener update is followed by the listener removal or full listener update.
44+
* udp: limit each UDP listener to read maxmium 6000 packets per event loop. This behavior can be temporarily reverted by setting ``envoy.reloadable_features.udp_per_event_loop_read_limit`` to false.
4545
* validation: fix an issue that causes TAP sockets to panic during config validation mode.
4646
* xray: fix the default sampling 'rate' for AWS X-Ray tracer extension to be 5% as opposed to 50%.
4747
* zipkin: fix timestamp serializaiton in annotations. A prior bug fix exposed an issue with timestamps being serialized as strings.

generated_api_shadow/envoy/config/listener/v3/quic_config.proto

+13
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

generated_api_shadow/envoy/config/listener/v4alpha/quic_config.proto

+13
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

include/envoy/network/listener.h

+5
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,11 @@ class UdpListenerCallbacks {
330330
* Posts ``data`` to be delivered on this worker.
331331
*/
332332
virtual void post(Network::UdpRecvData&& data) PURE;
333+
334+
/**
335+
* An estimated number of UDP packets this callback expects to process in current read event.
336+
*/
337+
virtual size_t numPacketsExpectedPerEventLoop() const PURE;
333338
};
334339

335340
using UdpListenerCallbacksOptRef = absl::optional<std::reference_wrapper<UdpListenerCallbacks>>;

source/common/network/udp_listener_impl.cc

+6-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,12 @@ void UdpListenerImpl::handleReadCallback() {
7575
const Api::IoErrorPtr result = Utility::readPacketsFromSocket(
7676
socket_->ioHandle(), *socket_->addressProvider().localAddress(), *this, time_source_,
7777
config_.prefer_gro_, packets_dropped_);
78-
// TODO(mattklein123): Handle no error when we limit the number of packets read.
78+
if (result == nullptr) {
79+
// No error. The number of reads was limited by read rate. There are more packets to read.
80+
// Register to read more in the next event loop.
81+
socket_->ioHandle().activateFileEvents(Event::FileReadyType::Read);
82+
return;
83+
}
7984
if (result->getErrorCode() != Api::IoError::IoErrorCode::Again) {
8085
// TODO(mattklein123): When rate limited logging is implemented log this at error level
8186
// on a periodic basis.

source/common/network/udp_listener_impl.h

+3
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ class UdpListenerImpl : public BaseListenerImpl,
4646
MonotonicTime receive_time) override;
4747
uint64_t maxDatagramSize() const override { return config_.max_rx_datagram_size_; }
4848
void onDatagramsDropped(uint32_t dropped) override { cb_.onDatagramsDropped(dropped); }
49+
size_t numPacketsExpectedPerEventLoop() const override {
50+
return cb_.numPacketsExpectedPerEventLoop();
51+
}
4952

5053
protected:
5154
void handleWriteCallback();

source/common/network/utility.cc

+23-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "common/network/io_socket_error_impl.h"
2424
#include "common/protobuf/protobuf.h"
2525
#include "common/protobuf/utility.h"
26+
#include "common/runtime/runtime_features.h"
2627

2728
#include "absl/container/fixed_array.h"
2829
#include "absl/strings/match.h"
@@ -576,10 +577,10 @@ void passPayloadToProcessor(uint64_t bytes_read, Buffer::InstancePtr buffer,
576577
Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle,
577578
const Address::Instance& local_address,
578579
UdpPacketProcessor& udp_packet_processor,
579-
MonotonicTime receive_time, bool prefer_gro,
580+
MonotonicTime receive_time, bool use_gro,
580581
uint32_t* packets_dropped) {
581582

582-
if (prefer_gro && handle.supportsUdpGro()) {
583+
if (use_gro) {
583584
Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
584585
IoHandle::RecvMsgOutput output(1, packets_dropped);
585586

@@ -696,11 +697,24 @@ Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle,
696697
UdpPacketProcessor& udp_packet_processor,
697698
TimeSource& time_source, bool prefer_gro,
698699
uint32_t& packets_dropped) {
700+
// Read at least one time, and attempt to read numPacketsExpectedPerEventLoop() packets unless
701+
// this goes over MAX_NUM_PACKETS_PER_EVENT_LOOP.
702+
size_t num_packets_to_read = std::min<size_t>(
703+
MAX_NUM_PACKETS_PER_EVENT_LOOP, udp_packet_processor.numPacketsExpectedPerEventLoop());
704+
const bool use_gro = prefer_gro && handle.supportsUdpGro();
705+
size_t num_reads =
706+
use_gro ? (num_packets_to_read / NUM_DATAGRAMS_PER_GRO_RECEIVE)
707+
: (handle.supportsMmsg() ? (num_packets_to_read / NUM_DATAGRAMS_PER_MMSG_RECEIVE)
708+
: num_packets_to_read);
709+
// Make sure to read at least once.
710+
num_reads = std::max<size_t>(1, num_reads);
711+
bool honor_read_limit =
712+
Runtime::runtimeFeatureEnabled("envoy.reloadable_features.udp_per_event_loop_read_limit");
699713
do {
700714
const uint32_t old_packets_dropped = packets_dropped;
701715
const MonotonicTime receive_time = time_source.monotonicTime();
702716
Api::IoCallUint64Result result = Utility::readFromSocket(
703-
handle, local_address, udp_packet_processor, receive_time, prefer_gro, &packets_dropped);
717+
handle, local_address, udp_packet_processor, receive_time, use_gro, &packets_dropped);
704718

705719
if (!result.ok()) {
706720
// No more to read or encountered a system error.
@@ -723,6 +737,12 @@ Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle,
723737
delta);
724738
udp_packet_processor.onDatagramsDropped(delta);
725739
}
740+
if (honor_read_limit) {
741+
--num_reads;
742+
}
743+
if (num_reads == 0) {
744+
return std::move(result.err_);
745+
}
726746
} while (true);
727747
}
728748

source/common/network/utility.h

+11-3
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,17 @@ class UdpPacketProcessor {
6161
* the size of datagrams received, they will be dropped.
6262
*/
6363
virtual uint64_t maxDatagramSize() const PURE;
64+
65+
/**
66+
* An estimated number of packets to read in each read event.
67+
*/
68+
virtual size_t numPacketsExpectedPerEventLoop() const PURE;
6469
};
6570

6671
static const uint64_t DEFAULT_UDP_MAX_DATAGRAM_SIZE = 1500;
6772
static const uint64_t NUM_DATAGRAMS_PER_GRO_RECEIVE = 16;
6873
static const uint64_t NUM_DATAGRAMS_PER_MMSG_RECEIVE = 16;
74+
static const uint64_t MAX_NUM_PACKETS_PER_EVENT_LOOP = 6000;
6975

7076
/**
7177
* Wrapper which resolves UDP socket proto config with defaults.
@@ -362,18 +368,20 @@ class Utility {
362368
static Api::IoCallUint64Result readFromSocket(IoHandle& handle,
363369
const Address::Instance& local_address,
364370
UdpPacketProcessor& udp_packet_processor,
365-
MonotonicTime receive_time, bool prefer_gro,
371+
MonotonicTime receive_time, bool use_gro,
366372
uint32_t* packets_dropped);
367373

368374
/**
369-
* Read available packets from a given UDP socket and pass the packet to a given
370-
* UdpPacketProcessor.
375+
* Read some packets from a given UDP socket and pass the packet to a given
376+
* UdpPacketProcessor. Read no more than MAX_NUM_PACKETS_PER_EVENT_LOOP packets.
371377
* @param handle is the UDP socket to read from.
372378
* @param local_address is the socket's local address used to populate port.
373379
* @param udp_packet_processor is the callback to receive the packets.
374380
* @param time_source is the time source used to generate the time stamp of the received packets.
375381
* @param prefer_gro supplies whether to use GRO if the OS supports it.
376382
* @param packets_dropped is the output parameter for number of packets dropped in kernel.
383+
* Return the io error encountered or nullptr if no io error but read stopped
384+
* because of MAX_NUM_PACKETS_PER_EVENT_LOOP.
377385
*
378386
* TODO(mattklein123): Allow the number of packets read to be limited for fairness. Currently
379387
* this function will always return an error, even if EAGAIN. In the future

source/common/quic/active_quic_listener.cc

+22-8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "common/quic/envoy_quic_proof_source.h"
1818
#include "common/quic/envoy_quic_utils.h"
1919
#include "common/quic/envoy_quic_utils.h"
20+
#include "common/quic/quic_network_connection.h"
2021
#include "common/runtime/runtime_features.h"
2122

2223
namespace Envoy {
@@ -26,25 +27,29 @@ ActiveQuicListener::ActiveQuicListener(
2627
uint32_t worker_index, uint32_t concurrency, Event::Dispatcher& dispatcher,
2728
Network::UdpConnectionHandler& parent, Network::ListenerConfig& listener_config,
2829
const quic::QuicConfig& quic_config, Network::Socket::OptionsSharedPtr options,
29-
bool kernel_worker_routing, const envoy::config::core::v3::RuntimeFeatureFlag& enabled)
30+
bool kernel_worker_routing, const envoy::config::core::v3::RuntimeFeatureFlag& enabled,
31+
uint32_t packets_received_to_connection_count_ratio)
3032
: ActiveQuicListener(worker_index, concurrency, dispatcher, parent,
3133
listener_config.listenSocketFactory().getListenSocket(), listener_config,
32-
quic_config, std::move(options), kernel_worker_routing, enabled) {}
34+
quic_config, std::move(options), kernel_worker_routing, enabled,
35+
packets_received_to_connection_count_ratio) {}
3336

3437
ActiveQuicListener::ActiveQuicListener(
3538
uint32_t worker_index, uint32_t concurrency, Event::Dispatcher& dispatcher,
3639
Network::UdpConnectionHandler& parent, Network::SocketSharedPtr listen_socket,
3740
Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config,
3841
Network::Socket::OptionsSharedPtr options, bool kernel_worker_routing,
39-
const envoy::config::core::v3::RuntimeFeatureFlag& enabled)
42+
const envoy::config::core::v3::RuntimeFeatureFlag& enabled,
43+
uint32_t packets_to_read_to_connection_count_ratio)
4044
: Server::ActiveUdpListenerBase(
4145
worker_index, concurrency, parent, *listen_socket,
4246
dispatcher.createUdpListener(
4347
listen_socket, *this,
4448
listener_config.udpListenerConfig()->config().downstream_socket_config()),
4549
&listener_config),
4650
dispatcher_(dispatcher), version_manager_(quic::CurrentSupportedVersions()),
47-
kernel_worker_routing_(kernel_worker_routing) {
51+
kernel_worker_routing_(kernel_worker_routing),
52+
packets_to_read_to_connection_count_ratio_(packets_to_read_to_connection_count_ratio) {
4853
// This flag fix a QUICHE issue which may crash Envoy during connection close.
4954
SetQuicReloadableFlag(quic_single_ack_in_packet2, true);
5055
// Do not include 32-byte per-entry overhead while counting header size.
@@ -215,9 +220,18 @@ uint32_t ActiveQuicListener::destination(const Network::UdpRecvData& data) const
215220
return connection_id_snippet % concurrency_;
216221
}
217222

223+
size_t ActiveQuicListener::numPacketsExpectedPerEventLoop() const {
224+
// Expect each session to read packets_to_read_to_connection_count_ratio_ number of packets in
225+
// this read event.
226+
return quic_dispatcher_->NumSessions() * packets_to_read_to_connection_count_ratio_;
227+
}
228+
218229
ActiveQuicListenerFactory::ActiveQuicListenerFactory(
219230
const envoy::config::listener::v3::QuicProtocolOptions& config, uint32_t concurrency)
220-
: concurrency_(concurrency), enabled_(config.enabled()) {
231+
: concurrency_(concurrency), enabled_(config.enabled()),
232+
packets_to_read_to_connection_count_ratio_(
233+
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, packets_to_read_to_connection_count_ratio,
234+
DEFAULT_PACKETS_TO_READ_PER_CONNECTION)) {
221235
uint64_t idle_network_timeout_ms =
222236
config.has_idle_timeout() ? DurationUtil::durationToMilliseconds(config.idle_timeout())
223237
: 300000;
@@ -300,9 +314,9 @@ Network::ConnectionHandler::ActiveUdpListenerPtr ActiveQuicListenerFactory::crea
300314
}
301315
#endif
302316

303-
return std::make_unique<ActiveQuicListener>(worker_index, concurrency_, disptacher, parent,
304-
config, quic_config_, std::move(options),
305-
kernel_worker_routing, enabled_);
317+
return std::make_unique<ActiveQuicListener>(
318+
worker_index, concurrency_, disptacher, parent, config, quic_config_, std::move(options),
319+
kernel_worker_routing, enabled_, packets_to_read_to_connection_count_ratio_);
306320
} // namespace Quic
307321

308322
} // namespace Quic

0 commit comments

Comments
 (0)