Skip to content

Commit

Permalink
Merge pull request #4848 from pwojcikdev/network-channel-filtering
Browse files Browse the repository at this point in the history
Filter channels with spare capacity when broadcasting
  • Loading branch information
pwojcikdev authored Feb 20, 2025
2 parents af1af34 + c34b61f commit 3547b45
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 47 deletions.
4 changes: 2 additions & 2 deletions nano/core_test/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,7 @@ TEST (active_elections, fork_replacement_tally)
node_config.peering_port = system.get_available_port ();
auto & node2 (*system.add_node (node_config));
node1.network.filter.clear ();
node2.network.flood_block (send_last, nano::transport::traffic_type::test);
ASSERT_TRUE (node2.network.flood_block (send_last, nano::transport::traffic_type::test));
ASSERT_TIMELY (3s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 0);

// Correct block without votes is ignored
Expand All @@ -985,7 +985,7 @@ TEST (active_elections, fork_replacement_tally)
// ensure vote arrives before the block
ASSERT_TIMELY_EQ (5s, 1, node1.vote_cache.find (send_last->hash ()).size ());
node1.network.filter.clear ();
node2.network.flood_block (send_last, nano::transport::traffic_type::test);
ASSERT_TRUE (node2.network.flood_block (send_last, nano::transport::traffic_type::test));
ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 1);

// the send_last block should replace one of the existing block of the election because it has higher vote weight
Expand Down
22 changes: 20 additions & 2 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ TEST (network, send_discarded_publish)
.build ();
{
auto transaction = node1.ledger.tx_begin_read ();
node1.network.flood_block (block, nano::transport::traffic_type::test);
auto sent = node1.network.flood_block (block, nano::transport::traffic_type::test);
ASSERT_EQ (1, sent);
ASSERT_EQ (nano::dev::genesis->hash (), node1.ledger.any.account_head (transaction, nano::dev::genesis_key.pub));
ASSERT_EQ (nano::dev::genesis->hash (), node2.latest (nano::dev::genesis_key.pub));
}
Expand All @@ -221,7 +222,8 @@ TEST (network, send_invalid_publish)
.build ();
{
auto transaction = node1.ledger.tx_begin_read ();
node1.network.flood_block (block, nano::transport::traffic_type::test);
auto sent = node1.network.flood_block (block, nano::transport::traffic_type::test);
ASSERT_EQ (1, sent);
ASSERT_EQ (nano::dev::genesis->hash (), node1.ledger.any.account_head (transaction, nano::dev::genesis_key.pub));
ASSERT_EQ (nano::dev::genesis->hash (), node2.latest (nano::dev::genesis_key.pub));
}
Expand Down Expand Up @@ -1149,3 +1151,19 @@ TEST (network, purge_dead_channel_remote)
};
ASSERT_TIMELY (5s, !channel_exists (node2, channel));
}

TEST (network, flood_vote)
{
nano::test::system system{ 4 };

auto & node = *system.nodes[0];

// Make one of the nodes a representative
system.wallet (1)->insert_adhoc (nano::dev::genesis_key.prv);
ASSERT_TIMELY_EQ (5s, node.rep_crawler.representative_count (), 1);

auto vote = nano::test::make_vote (nano::dev::genesis_key, { nano::dev::genesis->hash () });
ASSERT_EQ (3, node.network.flood_vote_rebroadcasted (vote, 999.0f));
ASSERT_EQ (2, node.network.flood_vote_non_pr (vote, 999.0f));
ASSERT_EQ (1, node.network.flood_vote_pr (vote));
}
4 changes: 2 additions & 2 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2238,11 +2238,11 @@ TEST (node, DISABLED_fork_invalid_block_signature)
node1.process_active (send1);
ASSERT_TIMELY (5s, node1.block (send1->hash ()));
// Send the vote with the corrupt block signature
node2.network.flood_vote (vote_corrupt, 1.0f);
ASSERT_TRUE (node2.network.flood_vote_rebroadcasted (vote_corrupt, 1.0f));
// Wait for the rollback
ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::rollback));
// Send the vote with the correct block
node2.network.flood_vote (vote, 1.0f);
ASSERT_TRUE (node2.network.flood_vote_rebroadcasted (vote, 1.0f));
ASSERT_TIMELY (10s, !node1.block (send1->hash ()));
ASSERT_TIMELY (10s, node1.block (send2->hash ()));
ASSERT_EQ (node1.block (send2->hash ())->block_signature (), send2->block_signature ());
Expand Down
5 changes: 4 additions & 1 deletion nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ enum class detail
error,
failed,
refresh,
sent,

// processing queue
queue,
Expand Down Expand Up @@ -432,11 +433,13 @@ enum class detail
cleanup_outdated,
erase_stale,

// vote generator
// vote_generator
generator_broadcasts,
generator_replies,
generator_replies_discarded,
generator_spacing,
sent_pr,
sent_non_pr,

// hinting
missing_block,
Expand Down
101 changes: 74 additions & 27 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,72 +247,109 @@ void nano::network::send_keepalive_self (std::shared_ptr<nano::transport::channe
channel->send (message, nano::transport::traffic_type::keepalive);
}

void nano::network::flood_message (nano::message const & message, nano::transport::traffic_type type, float scale) const
size_t nano::network::flood_message (nano::message const & message, nano::transport::traffic_type type, float scale) const
{
for (auto const & channel : list (fanout (scale)))
auto channels = list (fanout (scale), [type] (auto const & channel) {
return !channel->max (type); // Only use channels that are not full for this traffic type
});
size_t result = 0;
for (auto const & channel : channels)
{
channel->send (message, type);
bool sent = channel->send (message, type);
result += sent;
}
return result;
}

void nano::network::flood_keepalive (float scale) const
size_t nano::network::flood_keepalive (float scale) const
{
nano::keepalive message{ node.network_params.network };
random_fill (message.peers);
flood_message (message, nano::transport::traffic_type::keepalive, scale);
return flood_message (message, nano::transport::traffic_type::keepalive, scale);
}

void nano::network::flood_keepalive_self (float scale) const
size_t nano::network::flood_keepalive_self (float scale) const
{
nano::keepalive message{ node.network_params.network };
fill_keepalive_self (message.peers);
flood_message (message, nano::transport::traffic_type::keepalive, scale);
return flood_message (message, nano::transport::traffic_type::keepalive, scale);
}

void nano::network::flood_block (std::shared_ptr<nano::block> const & block, nano::transport::traffic_type type) const
size_t nano::network::flood_block (std::shared_ptr<nano::block> const & block, nano::transport::traffic_type type) const
{
nano::publish message{ node.network_params.network, block };
flood_message (message, type);
return flood_message (message, type);
}

void nano::network::flood_block_initial (std::shared_ptr<nano::block> const & block) const
size_t nano::network::flood_block_initial (std::shared_ptr<nano::block> const & block) const
{
nano::publish message{ node.network_params.network, block, /* is_originator */ true };

size_t result = 0;
for (auto const & rep : node.rep_crawler.principal_representatives ())
{
rep.channel->send (message, nano::transport::traffic_type::block_broadcast_initial);
bool sent = rep.channel->send (message, nano::transport::traffic_type::block_broadcast_initial);
result += sent;
}
for (auto & peer : list_non_pr (fanout (1.0)))
{
peer->send (message, nano::transport::traffic_type::block_broadcast_initial);
bool sent = peer->send (message, nano::transport::traffic_type::block_broadcast_initial);
result += sent;
}
return result;
}

void nano::network::flood_vote (std::shared_ptr<nano::vote> const & vote, float scale, bool rebroadcasted) const
size_t nano::network::flood_vote_rebroadcasted (std::shared_ptr<nano::vote> const & vote, float scale) const
{
nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted };
for (auto & channel : list (fanout (scale)))
nano::confirm_ack message{ node.network_params.network, vote, /* rebroadcasted */ true };

auto const type = nano::transport::traffic_type::vote_rebroadcast;

auto channels = list (fanout (scale), [type] (auto const & channel) {
return !channel->max (type); // Only use channels that are not full for this traffic type
});

size_t result = 0;
for (auto & channel : channels)
{
channel->send (message, rebroadcasted ? nano::transport::traffic_type::vote_rebroadcast : nano::transport::traffic_type::vote);
bool sent = channel->send (message, type);
result += sent;
}
return result;
}

void nano::network::flood_vote_non_pr (std::shared_ptr<nano::vote> const & vote, float scale, bool rebroadcasted) const
size_t nano::network::flood_vote_non_pr (std::shared_ptr<nano::vote> const & vote, float scale) const
{
nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted };
for (auto & channel : list_non_pr (fanout (scale)))
nano::confirm_ack message{ node.network_params.network, vote };

auto const type = transport::traffic_type::vote;

auto channels = list_non_pr (fanout (scale), [type] (auto const & channel) {
return !channel->max (type); // Only use channels that are not full for this traffic type
});

size_t result = 0;
for (auto & channel : channels)
{
channel->send (message, rebroadcasted ? nano::transport::traffic_type::vote_rebroadcast : nano::transport::traffic_type::vote);
bool sent = channel->send (message, type);
result += sent;
}
return result;
}

void nano::network::flood_vote_pr (std::shared_ptr<nano::vote> const & vote, bool rebroadcasted) const
size_t nano::network::flood_vote_pr (std::shared_ptr<nano::vote> const & vote) const
{
nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted };
nano::confirm_ack message{ node.network_params.network, vote };

auto const type = nano::transport::traffic_type::vote;

size_t result = 0;
for (auto const & channel : node.rep_crawler.principal_representatives ())
{
channel.channel->send (message, rebroadcasted ? nano::transport::traffic_type::vote_rebroadcast : nano::transport::traffic_type::vote);
bool sent = channel.channel->send (message, type);
result += sent;
}
return result;
}

void nano::network::flood_block_many (std::deque<std::shared_ptr<nano::block>> blocks, nano::transport::traffic_type type, std::chrono::milliseconds delay, std::function<void ()> callback) const
Expand Down Expand Up @@ -397,9 +434,9 @@ bool nano::network::track_reachout (nano::endpoint const & endpoint_a)
return tcp_channels.track_reachout (endpoint_a);
}

std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (std::size_t max_count, uint8_t minimum_version) const
std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (std::size_t max_count, channel_filter filter) const
{
auto result = tcp_channels.list (minimum_version);
auto result = tcp_channels.list (filter);
nano::random_pool_shuffle (result.begin (), result.end ()); // Randomize returned peer order
if (max_count > 0 && result.size () > max_count)
{
Expand All @@ -408,9 +445,9 @@ std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (std::
return result;
}

std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list_non_pr (std::size_t max_count, uint8_t minimum_version) const
std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list_non_pr (std::size_t max_count, channel_filter filter) const
{
auto result = tcp_channels.list (minimum_version);
auto result = tcp_channels.list (filter);

auto partition_point = std::partition (result.begin (), result.end (),
[this] (std::shared_ptr<nano::transport::channel> const & channel) {
Expand All @@ -427,6 +464,16 @@ std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list_non_pr
return result;
}

std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (std::size_t max_count, uint8_t minimum_version) const
{
return list (max_count, [minimum_version] (auto const & channel) { return channel->get_network_version () >= minimum_version; });
}

std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list_non_pr (std::size_t max_count, uint8_t minimum_version) const
{
return list_non_pr (max_count, [minimum_version] (auto const & channel) { return channel->get_network_version () >= minimum_version; });
}

// Simulating with sqrt_broadcast_simulate shows we only need to broadcast to sqrt(total_peers) random peers in order to successfully publish to everyone with high probability
std::size_t nano::network::fanout (float scale) const
{
Expand Down
25 changes: 15 additions & 10 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,16 @@ class network final

nano::endpoint endpoint () const;

void flood_message (nano::message const &, nano::transport::traffic_type, float scale = 1.0f) const;
void flood_keepalive (float scale = 1.0f) const;
void flood_keepalive_self (float scale = 0.5f) const;
void flood_vote (std::shared_ptr<nano::vote> const &, float scale, bool rebroadcasted = false) const;
void flood_vote_pr (std::shared_ptr<nano::vote> const &, bool rebroadcasted = false) const;
void flood_vote_non_pr (std::shared_ptr<nano::vote> const &, float scale, bool rebroadcasted = false) const;
size_t flood_message (nano::message const &, nano::transport::traffic_type, float scale = 1.0f) const;
size_t flood_keepalive (float scale = 1.0f) const;
size_t flood_keepalive_self (float scale = 0.5f) const;
size_t flood_vote_pr (std::shared_ptr<nano::vote> const &) const;
size_t flood_vote_non_pr (std::shared_ptr<nano::vote> const &, float scale) const;
size_t flood_vote_rebroadcasted (std::shared_ptr<nano::vote> const &, float scale) const;
// Flood block to all PRs and a random selection of non-PRs
void flood_block_initial (std::shared_ptr<nano::block> const &) const;
size_t flood_block_initial (std::shared_ptr<nano::block> const &) const;
// Flood block to a random selection of peers
void flood_block (std::shared_ptr<nano::block> const &, nano::transport::traffic_type) const;
size_t flood_block (std::shared_ptr<nano::block> const &, nano::transport::traffic_type) const;
void flood_block_many (std::deque<std::shared_ptr<nano::block>>, nano::transport::traffic_type, std::chrono::milliseconds delay = 10ms, std::function<void ()> callback = nullptr) const;

void send_keepalive (std::shared_ptr<nano::transport::channel> const &) const;
Expand All @@ -121,8 +121,13 @@ class network final
// Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt
bool track_reachout (nano::endpoint const &);

std::deque<std::shared_ptr<nano::transport::channel>> list (std::size_t max_count = 0, uint8_t minimum_version = 0) const;
std::deque<std::shared_ptr<nano::transport::channel>> list_non_pr (std::size_t max_count, uint8_t minimum_version = 0) const;
using channel_filter = std::function<bool (std::shared_ptr<nano::transport::channel> const &)>;

std::deque<std::shared_ptr<nano::transport::channel>> list (std::size_t max_count = 0, channel_filter = nullptr) const;
std::deque<std::shared_ptr<nano::transport::channel>> list_non_pr (std::size_t max_count = 0, channel_filter = nullptr) const;

std::deque<std::shared_ptr<nano::transport::channel>> list (std::size_t max_count, uint8_t minimum_version) const;
std::deque<std::shared_ptr<nano::transport::channel>> list_non_pr (std::size_t max_count, uint8_t minimum_version) const;

// Desired fanout for a given scale
std::size_t fanout (float scale = 1.0f) const;
Expand Down
15 changes: 15 additions & 0 deletions nano/node/transport/tcp_channels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,21 @@ std::deque<std::shared_ptr<nano::transport::channel>> nano::transport::tcp_chann
return result;
}

std::deque<std::shared_ptr<nano::transport::channel>> nano::transport::tcp_channels::list (channel_filter filter) const
{
nano::lock_guard<nano::mutex> lock{ mutex };

std::deque<std::shared_ptr<nano::transport::channel>> result;
for (auto const & entry : channels)
{
if (filter == nullptr || filter (entry.channel))
{
result.push_back (entry.channel);
}
}
return result;
}

bool nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint)
{
return node.tcp_listener.connect (endpoint.address (), endpoint.port ());
Expand Down
4 changes: 4 additions & 0 deletions nano/node/transport/tcp_channels.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ class tcp_channels final
// Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt
bool track_reachout (nano::endpoint const &);
void purge (std::chrono::steady_clock::time_point cutoff_deadline);

using channel_filter = std::function<bool (std::shared_ptr<nano::transport::channel> const &)>;
std::deque<std::shared_ptr<nano::transport::channel>> list (channel_filter) const;
std::deque<std::shared_ptr<nano::transport::channel>> list (uint8_t minimum_version = 0) const;
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (std::size_t max_count, uint8_t minimum_version = 0) const;

void keepalive ();
std::optional<nano::keepalive> sample_keepalive ();

Expand Down
8 changes: 6 additions & 2 deletions nano/node/vote_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,13 @@ void nano::vote_generator::vote (std::vector<nano::block_hash> const & hashes_a,

void nano::vote_generator::broadcast_action (std::shared_ptr<nano::vote> const & vote_a) const
{
network.flood_vote_pr (vote_a);
network.flood_vote_non_pr (vote_a, 2.0f);
vote_processor.vote (vote_a, inproc_channel);

auto sent_pr = network.flood_vote_pr (vote_a);
auto sent_non_pr = network.flood_vote_non_pr (vote_a, 2.0f);

stats.add (nano::stat::type::vote_generator, nano::stat::detail::sent_pr, sent_pr);
stats.add (nano::stat::type::vote_generator, nano::stat::detail::sent_non_pr, sent_non_pr);
}

void nano::vote_generator::run ()
Expand Down
4 changes: 3 additions & 1 deletion nano/node/vote_rebroadcaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ void nano::vote_rebroadcaster::run ()

stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast);
stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast_hashes, vote->hashes.size ());
network.flood_vote (vote, 0.5f, /* rebroadcasted */ true); // TODO: Track number of peers that we sent the vote to

auto sent = network.flood_vote_rebroadcasted (vote, 0.5f);
stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::sent, sent);

lock.lock ();
}
Expand Down

0 comments on commit 3547b45

Please sign in to comment.