Skip to content

Commit a498259

Browse files
ximinezMark Travis
and
Mark Travis
committed
Optimize when to acquire ledgers from the network.
Particularly avoid acquiring ledgers likely to be produced locally very soon. Derived from XRPLF#4764 Co-authored-by: Mark Travis <mtravis@ripple.com>
1 parent 4692d41 commit a498259

7 files changed

+136
-43
lines changed

src/xrpld/app/ledger/InboundLedger.h

+5-3
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,10 @@ class InboundLedger final : public TimeoutCounter,
5757
~InboundLedger();
5858

5959
// Called when another attempt is made to fetch this same ledger
60-
void
61-
update(std::uint32_t seq);
60+
//
61+
// Returns true if this triggers requests to be sent
62+
bool
63+
update(std::uint32_t seq, bool broadcast);
6264

6365
/** Returns true if we got all the data. */
6466
bool
@@ -89,7 +91,7 @@ class InboundLedger final : public TimeoutCounter,
8991
bool
9092
checkLocal();
9193
void
92-
init(ScopedLockType& collectionLock);
94+
init(ScopedLockType& collectionLock, bool broadcast);
9395

9496
bool
9597
gotData(

src/xrpld/app/ledger/detail/InboundLedger.cpp

+36-5
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ InboundLedger::InboundLedger(
102102
}
103103

104104
void
105-
InboundLedger::init(ScopedLockType& collectionLock)
105+
InboundLedger::init(ScopedLockType& collectionLock, bool broadcast)
106106
{
107107
ScopedLockType sl(mtx_);
108108
collectionLock.unlock();
@@ -113,8 +113,18 @@ InboundLedger::init(ScopedLockType& collectionLock)
113113

114114
if (!complete_)
115115
{
116-
addPeers();
117-
queueJob(sl);
116+
if (broadcast)
117+
{
118+
addPeers();
119+
queueJob(sl);
120+
}
121+
else
122+
{
123+
// Delay to give time to build the ledger before sending
124+
JLOG(journal_.debug()) << "init: Deferring peer requests";
125+
deferred_ = true;
126+
setTimer(sl);
127+
}
118128
return;
119129
}
120130

@@ -144,8 +154,8 @@ InboundLedger::getPeerCount() const
144154
});
145155
}
146156

147-
void
148-
InboundLedger::update(std::uint32_t seq)
157+
bool
158+
InboundLedger::update(std::uint32_t seq, bool broadcast)
149159
{
150160
ScopedLockType sl(mtx_);
151161

@@ -155,6 +165,27 @@ InboundLedger::update(std::uint32_t seq)
155165

156166
// Prevent this from being swept
157167
touch();
168+
169+
// If the signal is to broadcast, and this request has never tried to
170+
// broadcast before, cancel any waiting timer, then fire off the job to
171+
// broadcast. Note that this is calling mPeerSet->getPeerIds(), not
172+
// getPeerCount(), because the latter will filter out peers that have been
173+
// tried, but are since lost. This wants to check if peers have _ever_ been
174+
// tried. If they have, stick with the normal timer flow.
175+
if (broadcast && mPeerSet->getPeerIds().empty())
176+
{
177+
if (cancelTimer(sl))
178+
{
179+
JLOG(journal_.debug())
180+
<< "update: cancelling timer to send peer requests";
181+
deferred_ = false;
182+
skipNext_ = true;
183+
addPeers();
184+
queueJob(sl);
185+
return true;
186+
}
187+
}
188+
return false;
158189
}
159190

160191
bool

src/xrpld/app/ledger/detail/InboundLedgers.cpp

+31-14
Original file line numberDiff line numberDiff line change
@@ -109,20 +109,29 @@ class InboundLedgersImp : public InboundLedgers
109109
// the network, and doesn't have the necessary tx's and
110110
// ledger entries to build the ledger.
111111
bool const isFull = app_.getOPs().isFull();
112+
// fallingBehind means the last closed ledger is at least 2
113+
// behind the validated ledger. If the node is falling
114+
// behind the network, it probably needs information from
115+
// the network to catch up.
116+
//
117+
// The reason this should not simply be only at least 1
118+
// behind the validated ledger is that a slight lag is
119+
// normal case because some nodes get there slightly later
120+
// than others. A difference of 2 means that at least a full
121+
// ledger interval has passed, so the node is beginning to
122+
// fall behind.
123+
bool const fallingBehind = app_.getOPs().isFallingBehind();
112124
// If everything else is ok, don't try to acquire the ledger
113125
// if the requested seq is in the near future relative to
114-
// the validated ledger. If the requested ledger is between
115-
// 1 and 19 inclusive ledgers ahead of the valid ledger this
116-
// node has not built it yet, but it's possible/likely it
117-
// has the tx's necessary to build it and get caught up.
118-
// Plus it might not become validated. On the other hand, if
119-
// it's more than 20 in the future, this node should request
120-
// it so that it can jump ahead and get caught up.
126+
// the validated ledger. Because validations lag behind
127+
// consensus, if we get any further behind than this, we
128+
// risk losing sync, because we don't have the preferred
129+
// ledger available.
121130
LedgerIndex const validSeq =
122131
app_.getLedgerMaster().getValidLedgerIndex();
123-
constexpr std::size_t lagLeeway = 20;
124-
bool const nearFuture =
125-
(seq > validSeq) && (seq < validSeq + lagLeeway);
132+
constexpr std::size_t lagLeeway = 2;
133+
bool const nearFuture = (validSeq > 0) && (seq > validSeq) &&
134+
(seq < validSeq + lagLeeway);
126135
// If everything else is ok, don't try to acquire the ledger
127136
// if the request is related to consensus. (Note that
128137
// consensus calls usually pass a seq of 0, so nearFuture
@@ -131,6 +140,7 @@ class InboundLedgersImp : public InboundLedgers
131140
reason == InboundLedger::Reason::CONSENSUS;
132141
ss << " Evaluating whether to broadcast requests to peers"
133142
<< ". full: " << (isFull ? "true" : "false")
143+
<< ". falling behind: " << (fallingBehind ? "true" : "false")
134144
<< ". ledger sequence " << seq
135145
<< ". Valid sequence: " << validSeq
136146
<< ". Lag leeway: " << lagLeeway
@@ -141,6 +151,9 @@ class InboundLedgersImp : public InboundLedgers
141151
// If the node is not synced, send requests.
142152
if (!isFull)
143153
return true;
154+
// If the node is falling behind, send requests.
155+
if (fallingBehind)
156+
return true;
144157
// If the ledger is in the near future, do NOT send requests.
145158
// This node is probably about to build it.
146159
if (nearFuture)
@@ -151,7 +164,7 @@ class InboundLedgersImp : public InboundLedgers
151164
return false;
152165
return true;
153166
}();
154-
ss << ". Would broadcast to peers? "
167+
ss << ". Broadcast to peers? "
155168
<< (shouldBroadcast ? "true." : "false.");
156169

157170
if (!shouldAcquire)
@@ -186,7 +199,7 @@ class InboundLedgersImp : public InboundLedgers
186199
std::ref(m_clock),
187200
mPeerSetBuilder->build());
188201
mLedgers.emplace(hash, inbound);
189-
inbound->init(sl);
202+
inbound->init(sl, shouldBroadcast);
190203
++mCounter;
191204
}
192205
}
@@ -198,8 +211,12 @@ class InboundLedgersImp : public InboundLedgers
198211
return {};
199212
}
200213

201-
if (!isNew)
202-
inbound->update(seq);
214+
bool const didBroadcast = [&]() {
215+
if (!isNew)
216+
return inbound->update(seq, shouldBroadcast);
217+
return shouldBroadcast;
218+
}();
219+
ss << " First broadcast: " << (didBroadcast ? "true" : "false");
203220

204221
if (!inbound->isComplete())
205222
{

src/xrpld/app/ledger/detail/TimeoutCounter.cpp

+28-17
Original file line numberDiff line numberDiff line change
@@ -55,23 +55,31 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
5555
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count()
5656
<< "ms";
5757
timer_.expires_after(timerInterval_);
58-
timer_.async_wait(
59-
[wptr = pmDowncast()](boost::system::error_code const& ec) {
60-
if (ec == boost::asio::error::operation_aborted)
61-
return;
62-
63-
if (auto ptr = wptr.lock())
58+
timer_.async_wait([wptr =
59+
pmDowncast()](boost::system::error_code const& ec) {
60+
if (auto ptr = wptr.lock())
61+
{
62+
ScopedLockType sl(ptr->mtx_);
63+
if (ec == boost::asio::error::operation_aborted || ptr->skipNext_)
6464
{
6565
JLOG(ptr->journal_.debug())
66-
<< "timer: ec: " << ec << " (operation_aborted: "
67-
<< boost::asio::error::operation_aborted << " - "
68-
<< (ec == boost::asio::error::operation_aborted ? "aborted"
69-
: "other")
70-
<< ")";
71-
ScopedLockType sl(ptr->mtx_);
72-
ptr->queueJob(sl);
66+
<< "Aborting setTimer: " << ec
67+
<< ", skip: " << (ptr->skipNext_ ? "true" : "false");
68+
ptr->skipNext_ = false;
69+
return;
7370
}
74-
});
71+
72+
ptr->queueJob(sl);
73+
}
74+
});
75+
}
76+
77+
std::size_t
78+
TimeoutCounter::cancelTimer(ScopedLockType& sl)
79+
{
80+
auto const ret = timer_.cancel();
81+
JLOG(journal_.debug()) << "Cancelled " << ret << " timer(s)";
82+
return ret;
7583
}
7684

7785
void
@@ -108,9 +116,12 @@ TimeoutCounter::invokeOnTimer()
108116

109117
if (!progress_)
110118
{
111-
++timeouts_;
112-
JLOG(journal_.debug())
113-
<< "Timeout(" << timeouts_ << ") " << " acquiring " << hash_;
119+
if (deferred_)
120+
deferred_ = false;
121+
else
122+
++timeouts_;
123+
JLOG(journal_.debug()) << "Timeout(" << timeouts_ << ") "
124+
<< " acquiring " << hash_;
114125
onTimer(false, sl);
115126
}
116127
else

src/xrpld/app/ledger/detail/TimeoutCounter.h

+9
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ class TimeoutCounter
102102
void
103103
setTimer(ScopedLockType&);
104104

105+
/** Cancel any waiting timer */
106+
std::size_t
107+
cancelTimer(ScopedLockType&);
108+
105109
/** Queue a job to call invokeOnTimer(). */
106110
void
107111
queueJob(ScopedLockType&);
@@ -133,6 +137,11 @@ class TimeoutCounter
133137
int timeouts_;
134138
bool complete_;
135139
bool failed_;
140+
/** Whether the initialization deferred doing any work until the first
141+
* timeout. */
142+
bool deferred_ = false;
143+
/** Skip the next timeout, regardless of ec */
144+
bool skipNext_ = false;
136145
/** Whether forward progress has been made. */
137146
bool progress_;
138147
/** The minimum time to wait between calls to execute(). */

src/xrpld/app/misc/NetworkOPs.cpp

+25-4
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,8 @@ class NetworkOPsImp final : public NetworkOPs
431431
clearLedgerFetch() override;
432432
Json::Value
433433
getLedgerFetchInfo() override;
434+
bool
435+
isFallingBehind() const override;
434436
std::uint32_t
435437
acceptLedger(
436438
std::optional<std::chrono::milliseconds> consensusDelay) override;
@@ -721,6 +723,7 @@ class NetworkOPsImp final : public NetworkOPs
721723
std::atomic<bool> amendmentBlocked_{false};
722724
std::atomic<bool> amendmentWarned_{false};
723725
std::atomic<bool> unlBlocked_{false};
726+
std::atomic<bool> fallingBehind_{false};
724727

725728
ClosureCounter<void, boost::system::error_code const&> waitHandlerCounter_;
726729
boost::asio::steady_timer heartbeatTimer_;
@@ -1811,13 +1814,25 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)
18111814

18121815
auto closingInfo = m_ledgerMaster.getCurrentLedger()->info();
18131816

1814-
JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq
1817+
JLOG(m_journal.info()) << "beginConsensus time for #" << closingInfo.seq
18151818
<< " with LCL " << closingInfo.parentHash;
18161819

1817-
auto prevLedger = m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
1820+
fallingBehind_ = false;
1821+
if (closingInfo.seq < m_ledgerMaster.getValidLedgerIndex() - 1)
1822+
{
1823+
fallingBehind_ = true;
1824+
JLOG(m_journal.warn())
1825+
<< "beginConsensus Current ledger " << closingInfo.seq
1826+
<< " is at least 2 behind validated "
1827+
<< m_ledgerMaster.getValidLedgerIndex();
1828+
}
1829+
1830+
auto const prevLedger =
1831+
m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
18181832

18191833
if (!prevLedger)
18201834
{
1835+
fallingBehind_ = true;
18211836
// this shouldn't happen unless we jump ledgers
18221837
if (mMode == OperatingMode::FULL)
18231838
{
@@ -1865,7 +1880,7 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)
18651880
mLastConsensusPhase = currPhase;
18661881
}
18671882

1868-
JLOG(m_journal.debug()) << "Initiating consensus engine";
1883+
JLOG(m_journal.debug()) << "beginConsensus Initiating consensus engine";
18691884
return true;
18701885
}
18711886

@@ -1938,7 +1953,7 @@ NetworkOPsImp::endConsensus()
19381953
{
19391954
// check if the ledger is good enough to go to FULL
19401955
// Note: Do not go to FULL if we don't have the previous ledger
1941-
// check if the ledger is bad enough to go to CONNECTE D -- TODO
1956+
// check if the ledger is bad enough to go to CONNECTED -- TODO
19421957
auto current = m_ledgerMaster.getCurrentLedger();
19431958
if (app_.timeKeeper().now() < (current->info().parentCloseTime +
19441959
2 * current->info().closeTimeResolution))
@@ -2751,6 +2766,12 @@ NetworkOPsImp::getLedgerFetchInfo()
27512766
return app_.getInboundLedgers().getInfo();
27522767
}
27532768

2769+
bool
2770+
NetworkOPsImp::isFallingBehind() const
2771+
{
2772+
return fallingBehind_;
2773+
}
2774+
27542775
void
27552776
NetworkOPsImp::pubProposedTransaction(
27562777
std::shared_ptr<ReadView const> const& ledger,

src/xrpld/app/misc/NetworkOPs.h

+2
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,8 @@ class NetworkOPs : public InfoSub::Source
227227
clearLedgerFetch() = 0;
228228
virtual Json::Value
229229
getLedgerFetchInfo() = 0;
230+
virtual bool
231+
isFallingBehind() const = 0;
230232

231233
/** Accepts the current transaction tree, return the new ledger's sequence
232234

0 commit comments

Comments
 (0)