Skip to content

Commit 56cce4f

Browse files
ximinezMark Travis
and
Mark Travis
committed
Optimize when to acquire ledgers from the network.
Particularly avoid acquiring ledgers likely to be produced locally very soon. Derived from XRPLF#4764 Co-authored-by: Mark Travis <mtravis@ripple.com>
1 parent 0efb318 commit 56cce4f

7 files changed

+136
-43
lines changed

src/xrpld/app/ledger/InboundLedger.h

+5-3
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,10 @@ class InboundLedger final : public TimeoutCounter,
5757
~InboundLedger();
5858

5959
// Called when another attempt is made to fetch this same ledger
60-
void
61-
update(std::uint32_t seq);
60+
//
61+
// Returns true if this triggers requests to be sent
62+
bool
63+
update(std::uint32_t seq, bool broadcast);
6264

6365
/** Returns true if we got all the data. */
6466
bool
@@ -89,7 +91,7 @@ class InboundLedger final : public TimeoutCounter,
8991
bool
9092
checkLocal();
9193
void
92-
init(ScopedLockType& collectionLock);
94+
init(ScopedLockType& collectionLock, bool broadcast);
9395

9496
bool
9597
gotData(

src/xrpld/app/ledger/detail/InboundLedger.cpp

+36-5
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ InboundLedger::InboundLedger(
102102
}
103103

104104
void
105-
InboundLedger::init(ScopedLockType& collectionLock)
105+
InboundLedger::init(ScopedLockType& collectionLock, bool broadcast)
106106
{
107107
ScopedLockType sl(mtx_);
108108
collectionLock.unlock();
@@ -113,8 +113,18 @@ InboundLedger::init(ScopedLockType& collectionLock)
113113

114114
if (!complete_)
115115
{
116-
addPeers();
117-
queueJob(sl);
116+
if (broadcast)
117+
{
118+
addPeers();
119+
queueJob(sl);
120+
}
121+
else
122+
{
123+
// Delay to give time to build the ledger before sending
124+
JLOG(journal_.debug()) << "init: Deferring peer requests";
125+
deferred_ = true;
126+
setTimer(sl);
127+
}
118128
return;
119129
}
120130

@@ -145,8 +155,8 @@ InboundLedger::getPeerCount() const
145155
});
146156
}
147157

148-
void
149-
InboundLedger::update(std::uint32_t seq)
158+
bool
159+
InboundLedger::update(std::uint32_t seq, bool broadcast)
150160
{
151161
ScopedLockType sl(mtx_);
152162

@@ -156,6 +166,27 @@ InboundLedger::update(std::uint32_t seq)
156166

157167
// Prevent this from being swept
158168
touch();
169+
170+
// If the signal is to broadcast, and this request has never tried to
171+
// broadcast before, cancel any waiting timer, then fire off the job to
172+
// broadcast. Note that this is calling mPeerSet->getPeerIds(), not
173+
// getPeerCount(), because the latter will filter out peers that have been
174+
// tried, but are since lost. This wants to check if peers have _ever_ been
175+
// tried. If they have, stick with the normal timer flow.
176+
if (broadcast && mPeerSet->getPeerIds().empty())
177+
{
178+
if (cancelTimer(sl))
179+
{
180+
JLOG(journal_.debug())
181+
<< "update: cancelling timer to send peer requests";
182+
deferred_ = false;
183+
skipNext_ = true;
184+
addPeers();
185+
queueJob(sl);
186+
return true;
187+
}
188+
}
189+
return false;
159190
}
160191

161192
bool

src/xrpld/app/ledger/detail/InboundLedgers.cpp

+31-14
Original file line numberDiff line numberDiff line change
@@ -107,20 +107,29 @@ class InboundLedgersImp : public InboundLedgers
107107
// the network, and doesn't have the necessary tx's and
108108
// ledger entries to build the ledger.
109109
bool const isFull = app_.getOPs().isFull();
110+
// fallingBehind means the last closed ledger is at least 2
111+
// behind the validated ledger. If the node is falling
112+
// behind the network, it probably needs information from
113+
// the network to catch up.
114+
//
115+
// The reason this should not simply be only at least 1
116+
// behind the validated ledger is that a slight lag is
117+
// normal case because some nodes get there slightly later
118+
// than others. A difference of 2 means that at least a full
119+
// ledger interval has passed, so the node is beginning to
120+
// fall behind.
121+
bool const fallingBehind = app_.getOPs().isFallingBehind();
110122
// If everything else is ok, don't try to acquire the ledger
111123
// if the requested seq is in the near future relative to
112-
// the validated ledger. If the requested ledger is between
113-
// 1 and 19 inclusive ledgers ahead of the valid ledger this
114-
// node has not built it yet, but it's possible/likely it
115-
// has the tx's necessary to build it and get caught up.
116-
// Plus it might not become validated. On the other hand, if
117-
// it's more than 20 in the future, this node should request
118-
// it so that it can jump ahead and get caught up.
124+
// the validated ledger. Because validations lag behind
125+
// consensus, if we get any further behind than this, we
126+
// risk losing sync, because we don't have the preferred
127+
// ledger available.
119128
LedgerIndex const validSeq =
120129
app_.getLedgerMaster().getValidLedgerIndex();
121-
constexpr std::size_t lagLeeway = 20;
122-
bool const nearFuture =
123-
(seq > validSeq) && (seq < validSeq + lagLeeway);
130+
constexpr std::size_t lagLeeway = 2;
131+
bool const nearFuture = (validSeq > 0) && (seq > validSeq) &&
132+
(seq < validSeq + lagLeeway);
124133
// If everything else is ok, don't try to acquire the ledger
125134
// if the request is related to consensus. (Note that
126135
// consensus calls usually pass a seq of 0, so nearFuture
@@ -129,6 +138,7 @@ class InboundLedgersImp : public InboundLedgers
129138
reason == InboundLedger::Reason::CONSENSUS;
130139
ss << " Evaluating whether to broadcast requests to peers"
131140
<< ". full: " << (isFull ? "true" : "false")
141+
<< ". falling behind: " << (fallingBehind ? "true" : "false")
132142
<< ". ledger sequence " << seq
133143
<< ". Valid sequence: " << validSeq
134144
<< ". Lag leeway: " << lagLeeway
@@ -139,6 +149,9 @@ class InboundLedgersImp : public InboundLedgers
139149
// If the node is not synced, send requests.
140150
if (!isFull)
141151
return true;
152+
// If the node is falling behind, send requests.
153+
if (fallingBehind)
154+
return true;
142155
// If the ledger is in the near future, do NOT send requests.
143156
// This node is probably about to build it.
144157
if (nearFuture)
@@ -149,7 +162,7 @@ class InboundLedgersImp : public InboundLedgers
149162
return false;
150163
return true;
151164
}();
152-
ss << ". Would broadcast to peers? "
165+
ss << ". Broadcast to peers? "
153166
<< (shouldBroadcast ? "true." : "false.");
154167

155168
if (!shouldAcquire)
@@ -184,7 +197,7 @@ class InboundLedgersImp : public InboundLedgers
184197
std::ref(m_clock),
185198
mPeerSetBuilder->build());
186199
mLedgers.emplace(hash, inbound);
187-
inbound->init(sl);
200+
inbound->init(sl, shouldBroadcast);
188201
++mCounter;
189202
}
190203
}
@@ -196,8 +209,12 @@ class InboundLedgersImp : public InboundLedgers
196209
return {};
197210
}
198211

199-
if (!isNew)
200-
inbound->update(seq);
212+
bool const didBroadcast = [&]() {
213+
if (!isNew)
214+
return inbound->update(seq, shouldBroadcast);
215+
return shouldBroadcast;
216+
}();
217+
ss << " First broadcast: " << (didBroadcast ? "true" : "false");
201218

202219
if (!inbound->isComplete())
203220
{

src/xrpld/app/ledger/detail/TimeoutCounter.cpp

+28-17
Original file line numberDiff line numberDiff line change
@@ -57,23 +57,31 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
5757
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count()
5858
<< "ms";
5959
timer_.expires_after(timerInterval_);
60-
timer_.async_wait(
61-
[wptr = pmDowncast()](boost::system::error_code const& ec) {
62-
if (ec == boost::asio::error::operation_aborted)
63-
return;
64-
65-
if (auto ptr = wptr.lock())
60+
timer_.async_wait([wptr =
61+
pmDowncast()](boost::system::error_code const& ec) {
62+
if (auto ptr = wptr.lock())
63+
{
64+
ScopedLockType sl(ptr->mtx_);
65+
if (ec == boost::asio::error::operation_aborted || ptr->skipNext_)
6666
{
6767
JLOG(ptr->journal_.debug())
68-
<< "timer: ec: " << ec << " (operation_aborted: "
69-
<< boost::asio::error::operation_aborted << " - "
70-
<< (ec == boost::asio::error::operation_aborted ? "aborted"
71-
: "other")
72-
<< ")";
73-
ScopedLockType sl(ptr->mtx_);
74-
ptr->queueJob(sl);
68+
<< "Aborting setTimer: " << ec
69+
<< ", skip: " << (ptr->skipNext_ ? "true" : "false");
70+
ptr->skipNext_ = false;
71+
return;
7572
}
76-
});
73+
74+
ptr->queueJob(sl);
75+
}
76+
});
77+
}
78+
79+
std::size_t
80+
TimeoutCounter::cancelTimer(ScopedLockType& sl)
81+
{
82+
auto const ret = timer_.cancel();
83+
JLOG(journal_.debug()) << "Cancelled " << ret << " timer(s)";
84+
return ret;
7785
}
7886

7987
void
@@ -110,9 +118,12 @@ TimeoutCounter::invokeOnTimer()
110118

111119
if (!progress_)
112120
{
113-
++timeouts_;
114-
JLOG(journal_.debug())
115-
<< "Timeout(" << timeouts_ << ") " << " acquiring " << hash_;
121+
if (deferred_)
122+
deferred_ = false;
123+
else
124+
++timeouts_;
125+
JLOG(journal_.debug()) << "Timeout(" << timeouts_ << ") "
126+
<< " acquiring " << hash_;
116127
onTimer(false, sl);
117128
}
118129
else

src/xrpld/app/ledger/detail/TimeoutCounter.h

+9
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ class TimeoutCounter
102102
void
103103
setTimer(ScopedLockType&);
104104

105+
/** Cancel any waiting timer */
106+
std::size_t
107+
cancelTimer(ScopedLockType&);
108+
105109
/** Queue a job to call invokeOnTimer(). */
106110
void
107111
queueJob(ScopedLockType&);
@@ -133,6 +137,11 @@ class TimeoutCounter
133137
int timeouts_;
134138
bool complete_;
135139
bool failed_;
140+
/** Whether the initialization deferred doing any work until the first
141+
* timeout. */
142+
bool deferred_ = false;
143+
/** Skip the next timeout, regardless of ec */
144+
bool skipNext_ = false;
136145
/** Whether forward progress has been made. */
137146
bool progress_;
138147
/** The minimum time to wait between calls to execute(). */

src/xrpld/app/misc/NetworkOPs.cpp

+25-4
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,8 @@ class NetworkOPsImp final : public NetworkOPs
434434
clearLedgerFetch() override;
435435
Json::Value
436436
getLedgerFetchInfo() override;
437+
bool
438+
isFallingBehind() const override;
437439
std::uint32_t
438440
acceptLedger(
439441
std::optional<std::chrono::milliseconds> consensusDelay) override;
@@ -724,6 +726,7 @@ class NetworkOPsImp final : public NetworkOPs
724726
std::atomic<bool> amendmentBlocked_{false};
725727
std::atomic<bool> amendmentWarned_{false};
726728
std::atomic<bool> unlBlocked_{false};
729+
std::atomic<bool> fallingBehind_{false};
727730

728731
ClosureCounter<void, boost::system::error_code const&> waitHandlerCounter_;
729732
boost::asio::steady_timer heartbeatTimer_;
@@ -1828,13 +1831,25 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)
18281831

18291832
auto closingInfo = m_ledgerMaster.getCurrentLedger()->info();
18301833

1831-
JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq
1834+
JLOG(m_journal.info()) << "beginConsensus time for #" << closingInfo.seq
18321835
<< " with LCL " << closingInfo.parentHash;
18331836

1834-
auto prevLedger = m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
1837+
fallingBehind_ = false;
1838+
if (closingInfo.seq < m_ledgerMaster.getValidLedgerIndex() - 1)
1839+
{
1840+
fallingBehind_ = true;
1841+
JLOG(m_journal.warn())
1842+
<< "beginConsensus Current ledger " << closingInfo.seq
1843+
<< " is at least 2 behind validated "
1844+
<< m_ledgerMaster.getValidLedgerIndex();
1845+
}
1846+
1847+
auto const prevLedger =
1848+
m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
18351849

18361850
if (!prevLedger)
18371851
{
1852+
fallingBehind_ = true;
18381853
// this shouldn't happen unless we jump ledgers
18391854
if (mMode == OperatingMode::FULL)
18401855
{
@@ -1886,7 +1901,7 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)
18861901
mLastConsensusPhase = currPhase;
18871902
}
18881903

1889-
JLOG(m_journal.debug()) << "Initiating consensus engine";
1904+
JLOG(m_journal.debug()) << "beginConsensus Initiating consensus engine";
18901905
return true;
18911906
}
18921907

@@ -1959,7 +1974,7 @@ NetworkOPsImp::endConsensus()
19591974
{
19601975
// check if the ledger is good enough to go to FULL
19611976
// Note: Do not go to FULL if we don't have the previous ledger
1962-
// check if the ledger is bad enough to go to CONNECTE D -- TODO
1977+
// check if the ledger is bad enough to go to CONNECTED -- TODO
19631978
auto current = m_ledgerMaster.getCurrentLedger();
19641979
if (app_.timeKeeper().now() < (current->info().parentCloseTime +
19651980
2 * current->info().closeTimeResolution))
@@ -2784,6 +2799,12 @@ NetworkOPsImp::getLedgerFetchInfo()
27842799
return app_.getInboundLedgers().getInfo();
27852800
}
27862801

2802+
bool
2803+
NetworkOPsImp::isFallingBehind() const
2804+
{
2805+
return fallingBehind_;
2806+
}
2807+
27872808
void
27882809
NetworkOPsImp::pubProposedTransaction(
27892810
std::shared_ptr<ReadView const> const& ledger,

src/xrpld/app/misc/NetworkOPs.h

+2
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,8 @@ class NetworkOPs : public InfoSub::Source
227227
clearLedgerFetch() = 0;
228228
virtual Json::Value
229229
getLedgerFetchInfo() = 0;
230+
virtual bool
231+
isFallingBehind() const = 0;
230232

231233
/** Accepts the current transaction tree, return the new ledger's sequence
232234

0 commit comments

Comments
 (0)