Skip to content

Commit 6a29348

Browse files
ximinezMark Travis
and
Mark Travis
committed
Optimize when to acquire ledgers from the network.
Particularly avoid acquiring ledgers likely to be produced locally very soon. Derived from XRPLF#4764 Co-authored-by: Mark Travis <mtravis@ripple.com>
1 parent f0f363c commit 6a29348

7 files changed

+136
-43
lines changed

src/xrpld/app/ledger/InboundLedger.h

+5-3
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,10 @@ class InboundLedger final : public TimeoutCounter,
5858
~InboundLedger();
5959

6060
// Called when another attempt is made to fetch this same ledger
61-
void
62-
update(std::uint32_t seq);
61+
//
62+
// Returns true if this triggers requests to be sent
63+
bool
64+
update(std::uint32_t seq, bool broadcast);
6365

6466
/** Returns true if we got all the data. */
6567
bool
@@ -90,7 +92,7 @@ class InboundLedger final : public TimeoutCounter,
9092
bool
9193
checkLocal();
9294
void
93-
init(ScopedLockType& collectionLock);
95+
init(ScopedLockType& collectionLock, bool broadcast);
9496

9597
bool
9698
gotData(

src/xrpld/app/ledger/detail/InboundLedger.cpp

+36-5
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ InboundLedger::InboundLedger(
101101
}
102102

103103
void
104-
InboundLedger::init(ScopedLockType& collectionLock)
104+
InboundLedger::init(ScopedLockType& collectionLock, bool broadcast)
105105
{
106106
ScopedLockType sl(mtx_);
107107
collectionLock.unlock();
@@ -112,8 +112,18 @@ InboundLedger::init(ScopedLockType& collectionLock)
112112

113113
if (!complete_)
114114
{
115-
addPeers();
116-
queueJob(sl);
115+
if (broadcast)
116+
{
117+
addPeers();
118+
queueJob(sl);
119+
}
120+
else
121+
{
122+
// Delay to give time to build the ledger before sending
123+
JLOG(journal_.debug()) << "init: Deferring peer requests";
124+
deferred_ = true;
125+
setTimer(sl);
126+
}
117127
return;
118128
}
119129

@@ -144,8 +154,8 @@ InboundLedger::getPeerCount() const
144154
});
145155
}
146156

147-
void
148-
InboundLedger::update(std::uint32_t seq)
157+
bool
158+
InboundLedger::update(std::uint32_t seq, bool broadcast)
149159
{
150160
ScopedLockType sl(mtx_);
151161

@@ -155,6 +165,27 @@ InboundLedger::update(std::uint32_t seq)
155165

156166
// Prevent this from being swept
157167
touch();
168+
169+
// If the signal is to broadcast, and this request has never tried to
170+
// broadcast before, cancel any waiting timer, then fire off the job to
171+
// broadcast. Note that this is calling mPeerSet->getPeerIds(), not
172+
// getPeerCount(), because the latter will filter out peers that have been
173+
// tried, but are since lost. This wants to check if peers have _ever_ been
174+
// tried. If they have, stick with the normal timer flow.
175+
if (broadcast && mPeerSet->getPeerIds().empty())
176+
{
177+
if (cancelTimer(sl))
178+
{
179+
JLOG(journal_.debug())
180+
<< "update: cancelling timer to send peer requests";
181+
deferred_ = false;
182+
skipNext_ = true;
183+
addPeers();
184+
queueJob(sl);
185+
return true;
186+
}
187+
}
188+
return false;
158189
}
159190

160191
bool

src/xrpld/app/ledger/detail/InboundLedgers.cpp

+31-14
Original file line numberDiff line numberDiff line change
@@ -106,20 +106,29 @@ class InboundLedgersImp : public InboundLedgers
106106
// the network, and doesn't have the necessary tx's and
107107
// ledger entries to build the ledger.
108108
bool const isFull = app_.getOPs().isFull();
109+
// fallingBehind means the last closed ledger is at least 2
110+
// behind the validated ledger. If the node is falling
111+
// behind the network, it probably needs information from
112+
// the network to catch up.
113+
//
114+
// The reason this should not simply be only at least 1
115+
// behind the validated ledger is that a slight lag is
116+
// normal case because some nodes get there slightly later
117+
// than others. A difference of 2 means that at least a full
118+
// ledger interval has passed, so the node is beginning to
119+
// fall behind.
120+
bool const fallingBehind = app_.getOPs().isFallingBehind();
109121
// If everything else is ok, don't try to acquire the ledger
110122
// if the requested seq is in the near future relative to
111-
// the validated ledger. If the requested ledger is between
112-
// 1 and 19 inclusive ledgers ahead of the valid ledger this
113-
// node has not built it yet, but it's possible/likely it
114-
// has the tx's necessary to build it and get caught up.
115-
// Plus it might not become validated. On the other hand, if
116-
// it's more than 20 in the future, this node should request
117-
// it so that it can jump ahead and get caught up.
123+
// the validated ledger. Because validations lag behind
124+
// consensus, if we get any further behind than this, we
125+
// risk losing sync, because we don't have the preferred
126+
// ledger available.
118127
LedgerIndex const validSeq =
119128
app_.getLedgerMaster().getValidLedgerIndex();
120-
constexpr std::size_t lagLeeway = 20;
121-
bool const nearFuture =
122-
(seq > validSeq) && (seq < validSeq + lagLeeway);
129+
constexpr std::size_t lagLeeway = 2;
130+
bool const nearFuture = (validSeq > 0) && (seq > validSeq) &&
131+
(seq < validSeq + lagLeeway);
123132
// If everything else is ok, don't try to acquire the ledger
124133
// if the request is related to consensus. (Note that
125134
// consensus calls usually pass a seq of 0, so nearFuture
@@ -128,6 +137,7 @@ class InboundLedgersImp : public InboundLedgers
128137
reason == InboundLedger::Reason::CONSENSUS;
129138
ss << " Evaluating whether to broadcast requests to peers"
130139
<< ". full: " << (isFull ? "true" : "false")
140+
<< ". falling behind: " << (fallingBehind ? "true" : "false")
131141
<< ". ledger sequence " << seq
132142
<< ". Valid sequence: " << validSeq
133143
<< ". Lag leeway: " << lagLeeway
@@ -138,6 +148,9 @@ class InboundLedgersImp : public InboundLedgers
138148
// If the node is not synced, send requests.
139149
if (!isFull)
140150
return true;
151+
// If the node is falling behind, send requests.
152+
if (fallingBehind)
153+
return true;
141154
// If the ledger is in the near future, do NOT send requests.
142155
// This node is probably about to build it.
143156
if (nearFuture)
@@ -148,7 +161,7 @@ class InboundLedgersImp : public InboundLedgers
148161
return false;
149162
return true;
150163
}();
151-
ss << ". Would broadcast to peers? "
164+
ss << ". Broadcast to peers? "
152165
<< (shouldBroadcast ? "true." : "false.");
153166

154167
if (!shouldAcquire)
@@ -183,7 +196,7 @@ class InboundLedgersImp : public InboundLedgers
183196
std::ref(m_clock),
184197
mPeerSetBuilder->build());
185198
mLedgers.emplace(hash, inbound);
186-
inbound->init(sl);
199+
inbound->init(sl, shouldBroadcast);
187200
++mCounter;
188201
}
189202
}
@@ -195,8 +208,12 @@ class InboundLedgersImp : public InboundLedgers
195208
return {};
196209
}
197210

198-
if (!isNew)
199-
inbound->update(seq);
211+
bool const didBroadcast = [&]() {
212+
if (!isNew)
213+
return inbound->update(seq, shouldBroadcast);
214+
return shouldBroadcast;
215+
}();
216+
ss << " First broadcast: " << (didBroadcast ? "true" : "false");
200217

201218
if (!inbound->isComplete())
202219
{

src/xrpld/app/ledger/detail/TimeoutCounter.cpp

+28-17
Original file line numberDiff line numberDiff line change
@@ -55,23 +55,31 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
5555
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count()
5656
<< "ms";
5757
timer_.expires_after(timerInterval_);
58-
timer_.async_wait(
59-
[wptr = pmDowncast()](boost::system::error_code const& ec) {
60-
if (ec == boost::asio::error::operation_aborted)
61-
return;
62-
63-
if (auto ptr = wptr.lock())
58+
timer_.async_wait([wptr =
59+
pmDowncast()](boost::system::error_code const& ec) {
60+
if (auto ptr = wptr.lock())
61+
{
62+
ScopedLockType sl(ptr->mtx_);
63+
if (ec == boost::asio::error::operation_aborted || ptr->skipNext_)
6464
{
6565
JLOG(ptr->journal_.debug())
66-
<< "timer: ec: " << ec << " (operation_aborted: "
67-
<< boost::asio::error::operation_aborted << " - "
68-
<< (ec == boost::asio::error::operation_aborted ? "aborted"
69-
: "other")
70-
<< ")";
71-
ScopedLockType sl(ptr->mtx_);
72-
ptr->queueJob(sl);
66+
<< "Aborting setTimer: " << ec
67+
<< ", skip: " << (ptr->skipNext_ ? "true" : "false");
68+
ptr->skipNext_ = false;
69+
return;
7370
}
74-
});
71+
72+
ptr->queueJob(sl);
73+
}
74+
});
75+
}
76+
77+
std::size_t
78+
TimeoutCounter::cancelTimer(ScopedLockType& sl)
79+
{
80+
auto const ret = timer_.cancel();
81+
JLOG(journal_.debug()) << "Cancelled " << ret << " timer(s)";
82+
return ret;
7583
}
7684

7785
void
@@ -108,9 +116,12 @@ TimeoutCounter::invokeOnTimer()
108116

109117
if (!progress_)
110118
{
111-
++timeouts_;
112-
JLOG(journal_.debug())
113-
<< "Timeout(" << timeouts_ << ") " << " acquiring " << hash_;
119+
if (deferred_)
120+
deferred_ = false;
121+
else
122+
++timeouts_;
123+
JLOG(journal_.debug()) << "Timeout(" << timeouts_ << ") "
124+
<< " acquiring " << hash_;
114125
onTimer(false, sl);
115126
}
116127
else

src/xrpld/app/ledger/detail/TimeoutCounter.h

+9
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ class TimeoutCounter
102102
void
103103
setTimer(ScopedLockType&);
104104

105+
/** Cancel any waiting timer */
106+
std::size_t
107+
cancelTimer(ScopedLockType&);
108+
105109
/** Queue a job to call invokeOnTimer(). */
106110
void
107111
queueJob(ScopedLockType&);
@@ -133,6 +137,11 @@ class TimeoutCounter
133137
int timeouts_;
134138
bool complete_;
135139
bool failed_;
140+
/** Whether the initialization deferred doing any work until the first
141+
* timeout. */
142+
bool deferred_ = false;
143+
/** Skip the next timeout, regardless of ec */
144+
bool skipNext_ = false;
136145
/** Whether forward progress has been made. */
137146
bool progress_;
138147
/** The minimum time to wait between calls to execute(). */

src/xrpld/app/misc/NetworkOPs.cpp

+25-4
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,8 @@ class NetworkOPsImp final : public NetworkOPs
433433
clearLedgerFetch() override;
434434
Json::Value
435435
getLedgerFetchInfo() override;
436+
bool
437+
isFallingBehind() const override;
436438
std::uint32_t
437439
acceptLedger(
438440
std::optional<std::chrono::milliseconds> consensusDelay) override;
@@ -723,6 +725,7 @@ class NetworkOPsImp final : public NetworkOPs
723725
std::atomic<bool> amendmentBlocked_{false};
724726
std::atomic<bool> amendmentWarned_{false};
725727
std::atomic<bool> unlBlocked_{false};
728+
std::atomic<bool> fallingBehind_{false};
726729

727730
ClosureCounter<void, boost::system::error_code const&> waitHandlerCounter_;
728731
boost::asio::steady_timer heartbeatTimer_;
@@ -1854,13 +1857,25 @@ NetworkOPsImp::beginConsensus(
18541857

18551858
auto closingInfo = m_ledgerMaster.getCurrentLedger()->info();
18561859

1857-
JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq
1860+
JLOG(m_journal.info()) << "beginConsensus time for #" << closingInfo.seq
18581861
<< " with LCL " << closingInfo.parentHash;
18591862

1860-
auto prevLedger = m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
1863+
fallingBehind_ = false;
1864+
if (closingInfo.seq < m_ledgerMaster.getValidLedgerIndex() - 1)
1865+
{
1866+
fallingBehind_ = true;
1867+
JLOG(m_journal.warn())
1868+
<< "beginConsensus Current ledger " << closingInfo.seq
1869+
<< " is at least 2 behind validated "
1870+
<< m_ledgerMaster.getValidLedgerIndex();
1871+
}
1872+
1873+
auto const prevLedger =
1874+
m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
18611875

18621876
if (!prevLedger)
18631877
{
1878+
fallingBehind_ = true;
18641879
// this shouldn't happen unless we jump ledgers
18651880
if (mMode == OperatingMode::FULL)
18661881
{
@@ -1915,7 +1930,7 @@ NetworkOPsImp::beginConsensus(
19151930
mLastConsensusPhase = currPhase;
19161931
}
19171932

1918-
JLOG(m_journal.debug()) << "Initiating consensus engine";
1933+
JLOG(m_journal.debug()) << "beginConsensus Initiating consensus engine";
19191934
return true;
19201935
}
19211936

@@ -1991,7 +2006,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
19912006
{
19922007
// check if the ledger is good enough to go to FULL
19932008
// Note: Do not go to FULL if we don't have the previous ledger
1994-
// check if the ledger is bad enough to go to CONNECTE D -- TODO
2009+
// check if the ledger is bad enough to go to CONNECTED -- TODO
19952010
auto current = m_ledgerMaster.getCurrentLedger();
19962011
if (app_.timeKeeper().now() < (current->info().parentCloseTime +
19972012
2 * current->info().closeTimeResolution))
@@ -2831,6 +2846,12 @@ NetworkOPsImp::getLedgerFetchInfo()
28312846
return app_.getInboundLedgers().getInfo();
28322847
}
28332848

2849+
bool
2850+
NetworkOPsImp::isFallingBehind() const
2851+
{
2852+
return fallingBehind_;
2853+
}
2854+
28342855
void
28352856
NetworkOPsImp::pubProposedTransaction(
28362857
std::shared_ptr<ReadView const> const& ledger,

src/xrpld/app/misc/NetworkOPs.h

+2
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,8 @@ class NetworkOPs : public InfoSub::Source
229229
clearLedgerFetch() = 0;
230230
virtual Json::Value
231231
getLedgerFetchInfo() = 0;
232+
virtual bool
233+
isFallingBehind() const = 0;
232234

233235
/** Accepts the current transaction tree, return the new ledger's sequence
234236

0 commit comments

Comments
 (0)