Skip to content

Commit c688040

Browse files
ximinezMark Travis
and
Mark Travis
committed
Optimize when to acquire ledgers from the network.
Particularly avoid acquiring ledgers likely to be produced locally very soon. Derived from XRPLF#4764. Co-authored-by: Mark Travis <mtravis@ripple.com>
1 parent 34431b4 commit c688040

7 files changed

+136
-43
lines changed

src/xrpld/app/ledger/InboundLedger.h

+5-3
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,10 @@ class InboundLedger final : public TimeoutCounter,
5757
~InboundLedger();
5858

5959
// Called when another attempt is made to fetch this same ledger
60-
void
61-
update(std::uint32_t seq);
60+
//
61+
// Returns true if this triggers requests to be sent
62+
bool
63+
update(std::uint32_t seq, bool broadcast);
6264

6365
/** Returns true if we got all the data. */
6466
bool
@@ -89,7 +91,7 @@ class InboundLedger final : public TimeoutCounter,
8991
bool
9092
checkLocal();
9193
void
92-
init(ScopedLockType& collectionLock);
94+
init(ScopedLockType& collectionLock, bool broadcast);
9395

9496
bool
9597
gotData(

src/xrpld/app/ledger/detail/InboundLedger.cpp

+36-5
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ InboundLedger::InboundLedger(
102102
}
103103

104104
void
105-
InboundLedger::init(ScopedLockType& collectionLock)
105+
InboundLedger::init(ScopedLockType& collectionLock, bool broadcast)
106106
{
107107
ScopedLockType sl(mtx_);
108108
collectionLock.unlock();
@@ -113,8 +113,18 @@ InboundLedger::init(ScopedLockType& collectionLock)
113113

114114
if (!complete_)
115115
{
116-
addPeers();
117-
queueJob(sl);
116+
if (broadcast)
117+
{
118+
addPeers();
119+
queueJob(sl);
120+
}
121+
else
122+
{
123+
// Delay to give time to build the ledger before sending
124+
JLOG(journal_.debug()) << "init: Deferring peer requests";
125+
deferred_ = true;
126+
setTimer(sl);
127+
}
118128
return;
119129
}
120130

@@ -144,8 +154,8 @@ InboundLedger::getPeerCount() const
144154
});
145155
}
146156

147-
void
148-
InboundLedger::update(std::uint32_t seq)
157+
bool
158+
InboundLedger::update(std::uint32_t seq, bool broadcast)
149159
{
150160
ScopedLockType sl(mtx_);
151161

@@ -155,6 +165,27 @@ InboundLedger::update(std::uint32_t seq)
155165

156166
// Prevent this from being swept
157167
touch();
168+
169+
// If the signal is to broadcast, and this request has never tried to
170+
// broadcast before, cancel any waiting timer, then fire off the job to
171+
// broadcast. Note that this is calling mPeerSet->getPeerIds(), not
172+
// getPeerCount(), because the latter will filter out peers that have been
173+
// tried, but have since been lost. This wants to check if peers have _ever_ been
174+
// tried. If they have, stick with the normal timer flow.
175+
if (broadcast && mPeerSet->getPeerIds().empty())
176+
{
177+
if (cancelTimer(sl))
178+
{
179+
JLOG(journal_.debug())
180+
<< "update: cancelling timer to send peer requests";
181+
deferred_ = false;
182+
skipNext_ = true;
183+
addPeers();
184+
queueJob(sl);
185+
return true;
186+
}
187+
}
188+
return false;
158189
}
159190

160191
bool

src/xrpld/app/ledger/detail/InboundLedgers.cpp

+31-14
Original file line numberDiff line numberDiff line change
@@ -110,20 +110,29 @@ class InboundLedgersImp : public InboundLedgers
110110
// the network, and doesn't have the necessary tx's and
111111
// ledger entries to build the ledger.
112112
bool const isFull = app_.getOPs().isFull();
113+
// fallingBehind means the last closed ledger is at least 2
114+
// behind the validated ledger. If the node is falling
115+
// behind the network, it probably needs information from
116+
// the network to catch up.
117+
//
118+
// The reason this should not simply be at least 1
119+
// behind the validated ledger is that a slight lag is
120+
// normal, because some nodes get there slightly later
121+
// than others. A difference of 2 means that at least a full
122+
// ledger interval has passed, so the node is beginning to
123+
// fall behind.
124+
bool const fallingBehind = app_.getOPs().isFallingBehind();
113125
// If everything else is ok, don't try to acquire the ledger
114126
// if the requested seq is in the near future relative to
115-
// the validated ledger. If the requested ledger is between
116-
// 1 and 19 inclusive ledgers ahead of the valid ledger this
117-
// node has not built it yet, but it's possible/likely it
118-
// has the tx's necessary to build it and get caught up.
119-
// Plus it might not become validated. On the other hand, if
120-
// it's more than 20 in the future, this node should request
121-
// it so that it can jump ahead and get caught up.
127+
// the validated ledger. Because validations lag behind
128+
// consensus, if we get any further behind than this, we
129+
// risk losing sync, because we don't have the preferred
130+
// ledger available.
122131
LedgerIndex const validSeq =
123132
app_.getLedgerMaster().getValidLedgerIndex();
124-
constexpr std::size_t lagLeeway = 20;
125-
bool const nearFuture =
126-
(seq > validSeq) && (seq < validSeq + lagLeeway);
133+
constexpr std::size_t lagLeeway = 2;
134+
bool const nearFuture = (validSeq > 0) && (seq > validSeq) &&
135+
(seq < validSeq + lagLeeway);
127136
// If everything else is ok, don't try to acquire the ledger
128137
// if the request is related to consensus. (Note that
129138
// consensus calls usually pass a seq of 0, so nearFuture
@@ -132,6 +141,7 @@ class InboundLedgersImp : public InboundLedgers
132141
reason == InboundLedger::Reason::CONSENSUS;
133142
ss << " Evaluating whether to broadcast requests to peers"
134143
<< ". full: " << (isFull ? "true" : "false")
144+
<< ". falling behind: " << (fallingBehind ? "true" : "false")
135145
<< ". ledger sequence " << seq
136146
<< ". Valid sequence: " << validSeq
137147
<< ". Lag leeway: " << lagLeeway
@@ -142,6 +152,9 @@ class InboundLedgersImp : public InboundLedgers
142152
// If the node is not synced, send requests.
143153
if (!isFull)
144154
return true;
155+
// If the node is falling behind, send requests.
156+
if (fallingBehind)
157+
return true;
145158
// If the ledger is in the near future, do NOT send requests.
146159
// This node is probably about to build it.
147160
if (nearFuture)
@@ -152,7 +165,7 @@ class InboundLedgersImp : public InboundLedgers
152165
return false;
153166
return true;
154167
}();
155-
ss << ". Would broadcast to peers? "
168+
ss << ". Broadcast to peers? "
156169
<< (shouldBroadcast ? "true." : "false.");
157170

158171
if (!shouldAcquire)
@@ -187,7 +200,7 @@ class InboundLedgersImp : public InboundLedgers
187200
std::ref(m_clock),
188201
mPeerSetBuilder->build());
189202
mLedgers.emplace(hash, inbound);
190-
inbound->init(sl);
203+
inbound->init(sl, shouldBroadcast);
191204
++mCounter;
192205
}
193206
}
@@ -199,8 +212,12 @@ class InboundLedgersImp : public InboundLedgers
199212
return {};
200213
}
201214

202-
if (!isNew)
203-
inbound->update(seq);
215+
bool const didBroadcast = [&]() {
216+
if (!isNew)
217+
return inbound->update(seq, shouldBroadcast);
218+
return shouldBroadcast;
219+
}();
220+
ss << " First broadcast: " << (didBroadcast ? "true" : "false");
204221

205222
if (!inbound->isComplete())
206223
{

src/xrpld/app/ledger/detail/TimeoutCounter.cpp

+28-17
Original file line numberDiff line numberDiff line change
@@ -55,23 +55,31 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
5555
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count()
5656
<< "ms";
5757
timer_.expires_after(timerInterval_);
58-
timer_.async_wait(
59-
[wptr = pmDowncast()](boost::system::error_code const& ec) {
60-
if (ec == boost::asio::error::operation_aborted)
61-
return;
62-
63-
if (auto ptr = wptr.lock())
58+
timer_.async_wait([wptr =
59+
pmDowncast()](boost::system::error_code const& ec) {
60+
if (auto ptr = wptr.lock())
61+
{
62+
ScopedLockType sl(ptr->mtx_);
63+
if (ec == boost::asio::error::operation_aborted || ptr->skipNext_)
6464
{
6565
JLOG(ptr->journal_.debug())
66-
<< "timer: ec: " << ec << " (operation_aborted: "
67-
<< boost::asio::error::operation_aborted << " - "
68-
<< (ec == boost::asio::error::operation_aborted ? "aborted"
69-
: "other")
70-
<< ")";
71-
ScopedLockType sl(ptr->mtx_);
72-
ptr->queueJob(sl);
66+
<< "Aborting setTimer: " << ec
67+
<< ", skip: " << (ptr->skipNext_ ? "true" : "false");
68+
ptr->skipNext_ = false;
69+
return;
7370
}
74-
});
71+
72+
ptr->queueJob(sl);
73+
}
74+
});
75+
}
76+
77+
std::size_t
78+
TimeoutCounter::cancelTimer(ScopedLockType& sl)
79+
{
80+
auto const ret = timer_.cancel();
81+
JLOG(journal_.debug()) << "Cancelled " << ret << " timer(s)";
82+
return ret;
7583
}
7684

7785
void
@@ -108,9 +116,12 @@ TimeoutCounter::invokeOnTimer()
108116

109117
if (!progress_)
110118
{
111-
++timeouts_;
112-
JLOG(journal_.debug())
113-
<< "Timeout(" << timeouts_ << ") " << " acquiring " << hash_;
119+
if (deferred_)
120+
deferred_ = false;
121+
else
122+
++timeouts_;
123+
JLOG(journal_.debug()) << "Timeout(" << timeouts_ << ") "
124+
<< " acquiring " << hash_;
114125
onTimer(false, sl);
115126
}
116127
else

src/xrpld/app/ledger/detail/TimeoutCounter.h

+9
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ class TimeoutCounter
102102
void
103103
setTimer(ScopedLockType&);
104104

105+
/** Cancel any waiting timer */
106+
std::size_t
107+
cancelTimer(ScopedLockType&);
108+
105109
/** Queue a job to call invokeOnTimer(). */
106110
void
107111
queueJob(ScopedLockType&);
@@ -133,6 +137,11 @@ class TimeoutCounter
133137
int timeouts_;
134138
bool complete_;
135139
bool failed_;
140+
/** Whether the initialization deferred doing any work until the first
141+
* timeout. */
142+
bool deferred_ = false;
143+
/** Skip the next timeout, regardless of ec */
144+
bool skipNext_ = false;
136145
/** Whether forward progress has been made. */
137146
bool progress_;
138147
/** The minimum time to wait between calls to execute(). */

src/xrpld/app/misc/NetworkOPs.cpp

+25-4
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,8 @@ class NetworkOPsImp final : public NetworkOPs
431431
clearLedgerFetch() override;
432432
Json::Value
433433
getLedgerFetchInfo() override;
434+
bool
435+
isFallingBehind() const override;
434436
std::uint32_t
435437
acceptLedger(
436438
std::optional<std::chrono::milliseconds> consensusDelay) override;
@@ -721,6 +723,7 @@ class NetworkOPsImp final : public NetworkOPs
721723
std::atomic<bool> amendmentBlocked_{false};
722724
std::atomic<bool> amendmentWarned_{false};
723725
std::atomic<bool> unlBlocked_{false};
726+
std::atomic<bool> fallingBehind_{false};
724727

725728
ClosureCounter<void, boost::system::error_code const&> waitHandlerCounter_;
726729
boost::asio::steady_timer heartbeatTimer_;
@@ -1811,13 +1814,25 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)
18111814

18121815
auto closingInfo = m_ledgerMaster.getCurrentLedger()->info();
18131816

1814-
JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq
1817+
JLOG(m_journal.info()) << "beginConsensus time for #" << closingInfo.seq
18151818
<< " with LCL " << closingInfo.parentHash;
18161819

1817-
auto prevLedger = m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
1820+
fallingBehind_ = false;
1821+
if (closingInfo.seq < m_ledgerMaster.getValidLedgerIndex() - 1)
1822+
{
1823+
fallingBehind_ = true;
1824+
JLOG(m_journal.warn())
1825+
<< "beginConsensus Current ledger " << closingInfo.seq
1826+
<< " is at least 2 behind validated "
1827+
<< m_ledgerMaster.getValidLedgerIndex();
1828+
}
1829+
1830+
auto const prevLedger =
1831+
m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
18181832

18191833
if (!prevLedger)
18201834
{
1835+
fallingBehind_ = true;
18211836
// this shouldn't happen unless we jump ledgers
18221837
if (mMode == OperatingMode::FULL)
18231838
{
@@ -1865,7 +1880,7 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)
18651880
mLastConsensusPhase = currPhase;
18661881
}
18671882

1868-
JLOG(m_journal.debug()) << "Initiating consensus engine";
1883+
JLOG(m_journal.debug()) << "beginConsensus Initiating consensus engine";
18691884
return true;
18701885
}
18711886

@@ -1938,7 +1953,7 @@ NetworkOPsImp::endConsensus()
19381953
{
19391954
// check if the ledger is good enough to go to FULL
19401955
// Note: Do not go to FULL if we don't have the previous ledger
1941-
// check if the ledger is bad enough to go to CONNECTE D -- TODO
1956+
// check if the ledger is bad enough to go to CONNECTED -- TODO
19421957
auto current = m_ledgerMaster.getCurrentLedger();
19431958
if (app_.timeKeeper().now() < (current->info().parentCloseTime +
19441959
2 * current->info().closeTimeResolution))
@@ -2751,6 +2766,12 @@ NetworkOPsImp::getLedgerFetchInfo()
27512766
return app_.getInboundLedgers().getInfo();
27522767
}
27532768

2769+
bool
2770+
NetworkOPsImp::isFallingBehind() const
2771+
{
2772+
return fallingBehind_;
2773+
}
2774+
27542775
void
27552776
NetworkOPsImp::pubProposedTransaction(
27562777
std::shared_ptr<ReadView const> const& ledger,

src/xrpld/app/misc/NetworkOPs.h

+2
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,8 @@ class NetworkOPs : public InfoSub::Source
227227
clearLedgerFetch() = 0;
228228
virtual Json::Value
229229
getLedgerFetchInfo() = 0;
230+
virtual bool
231+
isFallingBehind() const = 0;
230232

231233
/** Accepts the current transaction tree, return the new ledger's sequence
232234

0 commit comments

Comments
 (0)