Skip to content

Commit 49256bd

Browse files
committed
Reduce relaying of validation and proposal messages.
A server may receive multiple copies of a message from its directly connected peers. We select three peers as the source of validation and proposal messages from the validator and "squelch" other peers. This change reduces CPU and bandwidth costs and improves link latency.
1 parent 7b048b4 commit 49256bd

21 files changed

+2689
-39
lines changed

Builds/CMake/RippledCore.cmake

+1
Original file line numberDiff line numberDiff line change
@@ -863,6 +863,7 @@ target_sources (rippled PRIVATE
863863
src/test/overlay/cluster_test.cpp
864864
src/test/overlay/short_read_test.cpp
865865
src/test/overlay/compression_test.cpp
866+
src/test/overlay/reduce_relay_test.cpp
866867
#[===============================[
867868
test sources:
868869
subdir: peerfinder

src/ripple/app/consensus/RCLConsensus.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ RCLConsensus::Adaptor::share(RCLCxPeerPos const& peerPos)
157157
auto const sig = peerPos.signature();
158158
prop.set_signature(sig.data(), sig.size());
159159

160-
app_.overlay().relay(prop, peerPos.suppressionID());
160+
app_.overlay().relay(prop, peerPos.suppressionID(), peerPos.publicKey());
161161
}
162162

163163
void

src/ripple/app/misc/HashRouter.cpp

+10-3
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,19 @@ HashRouter::addSuppression(uint256 const& key)
4949

5050
bool
5151
HashRouter::addSuppressionPeer(uint256 const& key, PeerShortID peer)
52+
{
53+
auto [added, _] = addSuppressionPeerWithStatus(key, peer);
54+
return added;
55+
}
56+
57+
std::pair<bool, std::optional<Stopwatch::time_point>>
58+
HashRouter::addSuppressionPeerWithStatus(const uint256& key, PeerShortID peer)
5259
{
5360
std::lock_guard lock(mutex_);
5461

5562
auto result = emplace(key);
5663
result.first.addPeer(peer);
57-
return result.second;
64+
return {result.second, result.first.relayed()};
5865
}
5966

6067
bool
@@ -110,14 +117,14 @@ HashRouter::setFlags(uint256 const& key, int flags)
110117

111118
auto
112119
HashRouter::shouldRelay(uint256 const& key)
113-
-> boost::optional<std::set<PeerShortID>>
120+
-> std::optional<std::set<PeerShortID>>
114121
{
115122
std::lock_guard lock(mutex_);
116123

117124
auto& s = emplace(key).first;
118125

119126
if (!s.shouldRelay(suppressionMap_.clock().now(), holdTime_))
120-
return boost::none;
127+
return {};
121128

122129
return s.releasePeerSet();
123130
}

src/ripple/app/misc/HashRouter.h

+19-4
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,13 @@ class HashRouter
9797
return std::move(peers_);
9898
}
9999

100+
/** Return seated relay time point if the message has been relayed */
101+
std::optional<Stopwatch::time_point>
102+
relayed() const
103+
{
104+
return relayed_;
105+
}
106+
100107
/** Determines if this item should be relayed.
101108
102109
Checks whether the item has been recently relayed.
@@ -142,8 +149,8 @@ class HashRouter
142149
std::set<PeerShortID> peers_;
143150
// This could be generalized to a map, if more
144151
// than one flag needs to expire independently.
145-
boost::optional<Stopwatch::time_point> relayed_;
146-
boost::optional<Stopwatch::time_point> processed_;
152+
std::optional<Stopwatch::time_point> relayed_;
153+
std::optional<Stopwatch::time_point> processed_;
147154
std::uint32_t recoveries_ = 0;
148155
};
149156

@@ -185,6 +192,14 @@ class HashRouter
185192
bool
186193
addSuppressionPeer(uint256 const& key, PeerShortID peer);
187194

195+
/** Add a suppression peer and get message's relay status.
196+
* Return pair:
197+
* element 1: true if the peer is added.
198+
* element 2: optional is seated to the relay time point or
199+
* is unseated if has not relayed yet. */
200+
std::pair<bool, std::optional<Stopwatch::time_point>>
201+
addSuppressionPeerWithStatus(uint256 const& key, PeerShortID peer);
202+
188203
bool
189204
addSuppressionPeer(uint256 const& key, PeerShortID peer, int& flags);
190205

@@ -214,11 +229,11 @@ class HashRouter
214229
return `true` again until the hold time has expired.
215230
The internal set of peers will also be reset.
216231
217-
@return A `boost::optional` set of peers which do not need to be
232+
@return A `std::optional` set of peers which do not need to be
218233
relayed to. If the result is uninitialized, the item should
219234
_not_ be relayed.
220235
*/
221-
boost::optional<std::set<PeerShortID>>
236+
std::optional<std::set<PeerShortID>>
222237
shouldRelay(uint256 const& key);
223238

224239
/** Determines whether the hashed item should be recovered

src/ripple/core/Config.h

+6
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,12 @@ class Config : public BasicConfig
179179
// Thread pool configuration
180180
std::size_t WORKERS = 0;
181181

182+
// Reduce-relay
183+
// Enable reduce-relay functionality
184+
bool REDUCE_RELAY_ENABLE = false;
185+
// Send squelch message to peers
186+
bool REDUCE_RELAY_SQUELCH = false;
187+
182188
// These override the command line client settings
183189
boost::optional<beast::IP::Endpoint> rpc_ip;
184190

src/ripple/core/ConfigSections.h

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ struct ConfigSection
7070
#define SECTION_PATH_SEARCH_MAX "path_search_max"
7171
#define SECTION_PEER_PRIVATE "peer_private"
7272
#define SECTION_PEERS_MAX "peers_max"
73+
#define SECTION_REDUCE_RELAY "reduce_relay"
7374
#define SECTION_RELAY_PROPOSALS "relay_proposals"
7475
#define SECTION_RELAY_VALIDATIONS "relay_validations"
7576
#define SECTION_RPC_STARTUP "rpc_startup"

src/ripple/core/impl/Config.cpp

+7
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,13 @@ Config::loadFromString(std::string const& fileContents)
481481
if (getSingleSection(secConfig, SECTION_COMPRESSION, strTemp, j_))
482482
COMPRESSION = beast::lexicalCastThrow<bool>(strTemp);
483483

484+
if (exists(SECTION_REDUCE_RELAY))
485+
{
486+
auto sec = section(SECTION_REDUCE_RELAY);
487+
REDUCE_RELAY_ENABLE = sec.value_or("enable", false);
488+
REDUCE_RELAY_SQUELCH = sec.value_or("squelch", false);
489+
}
490+
484491
if (getSingleSection(
485492
secConfig, SECTION_AMENDMENT_MAJORITY_TIME, strTemp, j_))
486493
{

src/ripple/overlay/Message.h

+15-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#define RIPPLE_OVERLAY_MESSAGE_H_INCLUDED
2222

2323
#include <ripple/overlay/Compression.h>
24+
#include <ripple/protocol/PublicKey.h>
2425
#include <ripple/protocol/messages.h>
2526
#include <boost/asio/buffer.hpp>
2627
#include <boost/asio/buffers_iterator.hpp>
@@ -55,8 +56,13 @@ class Message : public std::enable_shared_from_this<Message>
5556
/** Constructor
5657
* @param message Protocol message to serialize
5758
* @param type Protocol message type
59+
* @param validator Public Key of the source validator for Validation or
60+
* Proposal message. Used to check if the message should be squelched.
5861
*/
59-
Message(::google::protobuf::Message const& message, int type);
62+
Message(
63+
::google::protobuf::Message const& message,
64+
int type,
65+
boost::optional<PublicKey> const& validator = {});
6066

6167
/** Retrieve the packed message data. If compressed message is requested but
6268
* the message is not compressible then the uncompressed buffer is returned.
@@ -74,11 +80,19 @@ class Message : public std::enable_shared_from_this<Message>
7480
return category_;
7581
}
7682

83+
/** Get the validator's key */
84+
boost::optional<PublicKey> const&
85+
getValidatorKey() const
86+
{
87+
return validatorKey_;
88+
}
89+
7790
private:
7891
std::vector<uint8_t> buffer_;
7992
std::vector<uint8_t> bufferCompressed_;
8093
std::size_t category_;
8194
std::once_flag once_flag_;
95+
boost::optional<PublicKey> validatorKey_;
8296

8397
/** Set the payload header
8498
* @param in Pointer to the payload

src/ripple/overlay/Overlay.h

+22-8
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ class Overlay : public Stoppable, public beast::PropertyStream::Source
133133

134134
/** Returns the peer with the matching short id, or null. */
135135
virtual std::shared_ptr<Peer>
136-
findPeerByShortID(Peer::id_t const& id) = 0;
136+
findPeerByShortID(Peer::id_t const& id) const = 0;
137137

138138
/** Returns the peer with the matching public key, or null. */
139139
virtual std::shared_ptr<Peer>
@@ -147,13 +147,27 @@ class Overlay : public Stoppable, public beast::PropertyStream::Source
147147
virtual void
148148
broadcast(protocol::TMValidation& m) = 0;
149149

150-
/** Relay a proposal. */
151-
virtual void
152-
relay(protocol::TMProposeSet& m, uint256 const& uid) = 0;
153-
154-
/** Relay a validation. */
155-
virtual void
156-
relay(protocol::TMValidation& m, uint256 const& uid) = 0;
150+
/** Relay a proposal. Return set
151+
* of peers, which have already seen the
152+
* message; i.e. the message has been
153+
* received from these peers and added
154+
* to the hash router */
155+
virtual std::set<Peer::id_t>
156+
relay(
157+
protocol::TMProposeSet& m,
158+
uint256 const& uid,
159+
PublicKey const& validator) = 0;
160+
161+
/** Relay a validation. Return set
162+
* of peers, which have already seen the
163+
* message; i.e. the message has been
164+
* received from these peers and added
165+
* to the hash router */
166+
virtual std::set<Peer::id_t>
167+
relay(
168+
protocol::TMValidation& m,
169+
uint256 const& uid,
170+
PublicKey const& validator) = 0;
157171

158172
/** Visit every active peer.
159173
*

0 commit comments

Comments
 (0)