Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[5.0] P2P: Throttle over sync window #1811

Merged
merged 3 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,8 @@ namespace eosio {
std::atomic<size_t> bytes_sent{0};
std::atomic<size_t> block_sync_bytes_received{0};
std::atomic<size_t> block_sync_bytes_sent{0};
std::chrono::nanoseconds block_sync_send_start{0ns}; // start of enqueue blocks
size_t block_sync_send_bytes_sent{0}; // bytes sent in this set of enqueue blocks
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hate to quibble over naming, but block_sync_bytes_sent and block_sync_send_bytes_sent just blow my mind.
Can we rename these to something like:
block_sync_total_bytes_sent
block_sync_frame_bytes_sent?

std::atomic<bool> block_sync_throttling{false};
std::atomic<std::chrono::nanoseconds> last_bytes_sent{0ns};
std::atomic<boost::asio::ip::port_type> remote_endpoint_port{0};
Expand Down Expand Up @@ -1460,6 +1462,9 @@ namespace eosio {
latest_msg_time = std::chrono::system_clock::time_point::min();
latest_blk_time = std::chrono::system_clock::time_point::min();
set_state(connection_state::closed);
block_sync_send_start = 0ns;
block_sync_send_bytes_sent = 0;
block_sync_throttling = false;

if( reconnect && !shutdown ) {
my_impl->connections.start_conn_timer( std::chrono::milliseconds( 100 ),
Expand Down Expand Up @@ -1740,25 +1745,38 @@ namespace eosio {
} FC_LOG_AND_DROP();
if( sb ) {
// Skip transmitting block this loop if threshold exceeded
if( block_sync_rate_limit > 0 && peer_syncing_from_us ) {
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(get_time() - connection_start_time);
auto current_rate = double(block_sync_bytes_sent) / elapsed.count();
if (block_sync_send_start == 0ns) { // start of enqueue blocks
block_sync_send_start = get_time();
block_sync_send_bytes_sent = 0;
}
if( block_sync_rate_limit > 0 && block_sync_send_bytes_sent > 0 && peer_syncing_from_us ) {
auto now = get_time();
auto elapsed_us = std::chrono::duration_cast<std::chrono::microseconds>(now - block_sync_send_start);
double current_rate = (double(block_sync_bytes_sent) / elapsed_us.count()) * 100000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are changing variable names based on my comment above, please make this line a bit more readable by renaming current_rate to current_rate_sec or something like that. It will make it obvious that the rate is per second and why you multiply by 100'000. This is optional; only do it if you are already changing the code.

peer_dlog(this, "start enqueue block time ${st}, now ${t}, elapsed ${e}, rate ${r}, limit ${l}",
("st", block_sync_send_start.count())("t", now.count())("e", elapsed_us.count())("r", current_rate)("l", block_sync_rate_limit));
if( current_rate >= block_sync_rate_limit ) {
block_sync_throttling = true;
peer_dlog( this, "throttling block sync to peer ${host}:${port}", ("host", log_remote_endpoint_ip)("port", log_remote_endpoint_port));
return false;
}
}
block_sync_throttling = false;
block_sync_bytes_sent += enqueue_block( sb, true );
auto sent = enqueue_block( sb, true );
block_sync_bytes_sent += sent;
block_sync_send_bytes_sent += sent;
++peer_requested->last;
if(num == peer_requested->end_block) {
peer_requested.reset();
block_sync_send_start = 0ns;
block_sync_send_bytes_sent = 0;
peer_dlog( this, "completing enqueue_sync_block ${num}", ("num", num) );
}
} else {
peer_ilog( this, "enqueue sync, unable to fetch block ${num}, sending benign_other go away", ("num", num) );
peer_requested.reset(); // unable to provide requested blocks
block_sync_send_start = 0ns;
block_sync_send_bytes_sent = 0;
no_retry = benign_other;
enqueue( go_away_message( benign_other ) );
}
Expand Down
15 changes: 8 additions & 7 deletions tests/p2p_sync_throttle_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
appArgs.add(flag='--plugin',action='append',type=str,help='Run nodes with additional plugins')
appArgs.add(flag='--connection-cleanup-period',type=int,help='Interval in whole seconds to run the connection reaper and metric collection')

args=TestHelper.parse_args({"-p","-d","--keep-logs","--prod-count"
args=TestHelper.parse_args({"-d","--keep-logs"
,"--dump-error-details","-v","--leave-running"
,"--unshared"},
applicationSpecificArgs=appArgs)
pnodes=args.p
pnodes=1
delay=args.d
debug=args.v
prod_count = args.prod_count
prod_count = 2
total_nodes=4
dumpErrorDetails=args.dump_error_details

Expand Down Expand Up @@ -106,10 +106,11 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
throttlingNode = cluster.unstartedNodes[0]
i = throttlingNode.cmd.index('--p2p-listen-endpoint')
throttleListenAddr = throttlingNode.cmd[i+1]
# Using 4000 bytes per second to allow syncing of ~250 transaction blocks resulting from
# the trx generators in a reasonable amount of time, while still being able to capture
# Using 40 kilobytes per second to allow syncing of blocks of ~250 transactions each at ~175 bytes per transaction
# (250*175=43750 bytes per block, or 87500 bytes per second)
# resulting from the trx generators in a reasonable amount of time, while still being able to capture
# throttling state within the Prometheus update window (3 seconds in this test).
throttlingNode.cmd[i+1] = throttlingNode.cmd[i+1] + ':4000B/s'
throttlingNode.cmd[i+1] = throttlingNode.cmd[i+1] + ':40KB/s'
throttleListenIP, throttleListenPort = throttleListenAddr.split(':')
throttlingNode.cmd.append('--p2p-listen-endpoint')
throttlingNode.cmd.append(f'{throttleListenIP}:{int(throttleListenPort)+100}:1TB/s')
Expand Down Expand Up @@ -213,7 +214,7 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
if throttledState:
wasThrottled = True
break
assert throttledNode.waitForBlock(endLargeBlocksHeadBlock, timeout=30), f'Wait for block {endLargeBlocksHeadBlock} on sync node timed out'
assert throttledNode.waitForBlock(endLargeBlocksHeadBlock, timeout=90), f'Wait for block {endLargeBlocksHeadBlock} on sync node timed out'
endThrottledSync = time.time()
response = throttledNode.processUrllibRequest('prometheus', 'metrics', exitOnError=True, returnType=ReturnType.raw, printReturnLimit=16).decode()
Print('Throttled Node End State')
Expand Down