Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce Validation and Proposal message relaying #3412

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Builds/CMake/RippledCore.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,7 @@ target_sources (rippled PRIVATE
src/test/overlay/cluster_test.cpp
src/test/overlay/short_read_test.cpp
src/test/overlay/compression_test.cpp
src/test/overlay/reduce_relay_test.cpp
#[===============================[
test sources:
subdir: peerfinder
Expand Down
1 change: 1 addition & 0 deletions bin/ci/ubuntu/build-and-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ else
# ORDER matters here...sorted in approximately
# descending execution time (longest running tests at top)
declare -a manual_tests=(
'ripple.ripple_data.reduce_relay_simulate'
'ripple.ripple_data.digest'
'ripple.tx.Offer_manual'
'ripple.app.PayStrandAllPairs'
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/app/consensus/RCLConsensus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ RCLConsensus::Adaptor::share(RCLCxPeerPos const& peerPos)
auto const sig = peerPos.signature();
prop.set_signature(sig.data(), sig.size());

app_.overlay().relay(prop, peerPos.suppressionID());
app_.overlay().relay(prop, peerPos.suppressionID(), peerPos.publicKey());
}

void
Expand Down
12 changes: 9 additions & 3 deletions src/ripple/app/misc/HashRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,18 @@ HashRouter::addSuppression(uint256 const& key)

bool
HashRouter::addSuppressionPeer(uint256 const& key, PeerShortID peer)
{
return addSuppressionPeerWithStatus(key, peer).first;
}

std::pair<bool, std::optional<Stopwatch::time_point>>
HashRouter::addSuppressionPeerWithStatus(const uint256& key, PeerShortID peer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The HashRouter API was already confusing enough. It's now become a little too confusing. I don't have an improvement to offer, but we need to think about this.

{
std::lock_guard lock(mutex_);

auto result = emplace(key);
result.first.addPeer(peer);
return result.second;
return {result.second, result.first.relayed()};
}

bool
Expand Down Expand Up @@ -110,14 +116,14 @@ HashRouter::setFlags(uint256 const& key, int flags)

auto
HashRouter::shouldRelay(uint256 const& key)
-> boost::optional<std::set<PeerShortID>>
-> std::optional<std::set<PeerShortID>>
{
std::lock_guard lock(mutex_);

auto& s = emplace(key).first;

if (!s.shouldRelay(suppressionMap_.clock().now(), holdTime_))
return boost::none;
return {};

return s.releasePeerSet();
}
Expand Down
23 changes: 19 additions & 4 deletions src/ripple/app/misc/HashRouter.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ class HashRouter
return std::move(peers_);
}

/** Return seated relay time point if the message has been relayed */
std::optional<Stopwatch::time_point>
relayed() const
{
return relayed_;
}

/** Determines if this item should be relayed.

Checks whether the item has been recently relayed.
Expand Down Expand Up @@ -142,8 +149,8 @@ class HashRouter
std::set<PeerShortID> peers_;
// This could be generalized to a map, if more
// than one flag needs to expire independently.
boost::optional<Stopwatch::time_point> relayed_;
boost::optional<Stopwatch::time_point> processed_;
std::optional<Stopwatch::time_point> relayed_;
std::optional<Stopwatch::time_point> processed_;
std::uint32_t recoveries_ = 0;
};

Expand Down Expand Up @@ -185,6 +192,14 @@ class HashRouter
bool
addSuppressionPeer(uint256 const& key, PeerShortID peer);

/** Add a suppression peer and get message's relay status.
* Return pair:
* element 1: true if the peer is added.
* element 2: optional is seated to the relay time point or
* is unseated if has not relayed yet. */
std::pair<bool, std::optional<Stopwatch::time_point>>
addSuppressionPeerWithStatus(uint256 const& key, PeerShortID peer);

bool
addSuppressionPeer(uint256 const& key, PeerShortID peer, int& flags);

Expand Down Expand Up @@ -214,11 +229,11 @@ class HashRouter
return `true` again until the hold time has expired.
The internal set of peers will also be reset.

@return A `boost::optional` set of peers which do not need to be
@return A `std::optional` set of peers which do not need to be
relayed to. If the result is uninitialized, the item should
_not_ be relayed.
*/
boost::optional<std::set<PeerShortID>>
std::optional<std::set<PeerShortID>>
shouldRelay(uint256 const& key);

/** Determines whether the hashed item should be recovered
Expand Down
6 changes: 6 additions & 0 deletions src/ripple/core/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ class Config : public BasicConfig
// Thread pool configuration
std::size_t WORKERS = 0;

// Reduce-relay - these parameters are experimental.
// Enable reduce-relay functionality
bool REDUCE_RELAY_ENABLE = false;
// Send squelch message to peers
bool REDUCE_RELAY_SQUELCH = false;

// These override the command line client settings
boost::optional<beast::IP::Endpoint> rpc_ip;

Expand Down
1 change: 1 addition & 0 deletions src/ripple/core/ConfigSections.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ struct ConfigSection
#define SECTION_PATH_SEARCH_MAX "path_search_max"
#define SECTION_PEER_PRIVATE "peer_private"
#define SECTION_PEERS_MAX "peers_max"
#define SECTION_REDUCE_RELAY "reduce_relay"
#define SECTION_RELAY_PROPOSALS "relay_proposals"
#define SECTION_RELAY_VALIDATIONS "relay_validations"
#define SECTION_RPC_STARTUP "rpc_startup"
Expand Down
7 changes: 7 additions & 0 deletions src/ripple/core/impl/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,13 @@ Config::loadFromString(std::string const& fileContents)
if (getSingleSection(secConfig, SECTION_COMPRESSION, strTemp, j_))
COMPRESSION = beast::lexicalCastThrow<bool>(strTemp);

if (exists(SECTION_REDUCE_RELAY))
{
auto sec = section(SECTION_REDUCE_RELAY);
REDUCE_RELAY_ENABLE = sec.value_or("enable", false);
REDUCE_RELAY_SQUELCH = sec.value_or("squelch", false);
}

if (getSingleSection(
secConfig, SECTION_AMENDMENT_MAJORITY_TIME, strTemp, j_))
{
Expand Down
16 changes: 15 additions & 1 deletion src/ripple/overlay/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#define RIPPLE_OVERLAY_MESSAGE_H_INCLUDED

#include <ripple/overlay/Compression.h>
#include <ripple/protocol/PublicKey.h>
#include <ripple/protocol/messages.h>
#include <boost/asio/buffer.hpp>
#include <boost/asio/buffers_iterator.hpp>
Expand Down Expand Up @@ -55,8 +56,13 @@ class Message : public std::enable_shared_from_this<Message>
/** Constructor
* @param message Protocol message to serialize
* @param type Protocol message type
* @param validator Public Key of the source validator for Validation or
* Proposal message. Used to check if the message should be squelched.
*/
Message(::google::protobuf::Message const& message, int type);
Message(
::google::protobuf::Message const& message,
int type,
boost::optional<PublicKey> const& validator = {});

/** Retrieve the packed message data. If compressed message is requested but
* the message is not compressible then the uncompressed buffer is returned.
Expand All @@ -74,11 +80,19 @@ class Message : public std::enable_shared_from_this<Message>
return category_;
}

/** Get the validator's key */
boost::optional<PublicKey> const&
getValidatorKey() const
{
return validatorKey_;
}

private:
std::vector<uint8_t> buffer_;
std::vector<uint8_t> bufferCompressed_;
std::size_t category_;
std::once_flag once_flag_;
boost::optional<PublicKey> validatorKey_;

/** Set the payload header
* @param in Pointer to the payload
Expand Down
32 changes: 24 additions & 8 deletions src/ripple/overlay/Overlay.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class Overlay : public Stoppable, public beast::PropertyStream::Source

/** Returns the peer with the matching short id, or null. */
virtual std::shared_ptr<Peer>
findPeerByShortID(Peer::id_t const& id) = 0;
findPeerByShortID(Peer::id_t const& id) const = 0;

/** Returns the peer with the matching public key, or null. */
virtual std::shared_ptr<Peer>
Expand All @@ -147,13 +147,29 @@ class Overlay : public Stoppable, public beast::PropertyStream::Source
virtual void
broadcast(protocol::TMValidation& m) = 0;

/** Relay a proposal. */
virtual void
relay(protocol::TMProposeSet& m, uint256 const& uid) = 0;

/** Relay a validation. */
virtual void
relay(protocol::TMValidation& m, uint256 const& uid) = 0;
/** Relay a proposal.
* @param m the serialized proposal
* @param uid the id used to identify this proposal
* @param validator The pubkey of the validator that issued this proposal
* @return the set of peers which have already sent us this proposal
*/
virtual std::set<Peer::id_t>
relay(
protocol::TMProposeSet& m,
uint256 const& uid,
PublicKey const& validator) = 0;

/** Relay a validation.
* @param m the serialized validation
* @param uid the id used to identify this validation
* @param validator The pubkey of the validator that issued this validation
* @return the set of peers which have already sent us this validation
*/
virtual std::set<Peer::id_t>
relay(
protocol::TMValidation& m,
uint256 const& uid,
PublicKey const& validator) = 0;

/** Visit every active peer.
*
Expand Down
Loading