Skip to content

Commit 88f9f18

Browse files
ximinez (Mark Travis)
and
Mark Travis
committed
Optimize when to acquire ledgers from the network.
Particularly avoid acquiring ledgers likely to be produced locally very soon.

Derived from XRPLF#4764

Co-authored-by: Mark Travis <mtravis@ripple.com>
1 parent e22bb6e commit 88f9f18

8 files changed

+270
-27
lines changed

src/ripple/app/ledger/InboundLedger.h

+22-2
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class InboundLedger final : public TimeoutCounter,
5959

6060
// Called when another attempt is made to fetch this same ledger
6161
void
62-
update(std::uint32_t seq);
62+
update(std::uint32_t seq, bool broadcast);
6363

6464
/** Returns true if we got all the data. */
6565
bool
@@ -90,7 +90,7 @@ class InboundLedger final : public TimeoutCounter,
9090
bool
9191
checkLocal();
9292
void
93-
init(ScopedLockType& collectionLock);
93+
init(ScopedLockType& collectionLock, bool broadcast);
9494

9595
bool
9696
gotData(
@@ -197,6 +197,26 @@ class InboundLedger final : public TimeoutCounter,
197197
std::unique_ptr<PeerSet> mPeerSet;
198198
};
199199

200+
inline std::string
201+
to_string(InboundLedger::Reason reason)
202+
{
203+
using enum InboundLedger::Reason;
204+
switch (reason)
205+
{
206+
case HISTORY:
207+
return "HISTORY";
208+
case SHARD:
209+
return "SHARD";
210+
case GENERIC:
211+
return "GENERIC";
212+
case CONSENSUS:
213+
return "CONSENSUS";
214+
default:
215+
assert(false);
216+
return "unknown";
217+
}
218+
}
219+
200220
} // namespace ripple
201221

202222
#endif

src/ripple/app/ledger/impl/InboundLedger.cpp

+40-5
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ InboundLedger::InboundLedger(
103103
}
104104

105105
void
106-
InboundLedger::init(ScopedLockType& collectionLock)
106+
InboundLedger::init(ScopedLockType& collectionLock, bool broadcast)
107107
{
108108
ScopedLockType sl(mtx_);
109109
collectionLock.unlock();
@@ -148,8 +148,18 @@ InboundLedger::init(ScopedLockType& collectionLock)
148148
}
149149
if (!complete_)
150150
{
151-
addPeers();
152-
queueJob(sl);
151+
if (broadcast)
152+
{
153+
addPeers();
154+
queueJob(sl);
155+
}
156+
else
157+
{
158+
// Delay to give time to build the ledger before sending
159+
JLOG(journal_.debug()) << "init: Deferring peer requests";
160+
deferred_ = true;
161+
setTimer(sl);
162+
}
153163
return;
154164
}
155165

@@ -180,7 +190,7 @@ InboundLedger::getPeerCount() const
180190
}
181191

182192
void
183-
InboundLedger::update(std::uint32_t seq)
193+
InboundLedger::update(std::uint32_t seq, bool broadcast)
184194
{
185195
ScopedLockType sl(mtx_);
186196

@@ -190,6 +200,24 @@ InboundLedger::update(std::uint32_t seq)
190200

191201
// Prevent this from being swept
192202
touch();
203+
204+
// If the signal is to broadcast, and this request has never tried to
205+
// broadcast before, cancel any waiting timer, then fire off the job to
206+
// broadcast. Note that this is calling mPeerSet->getPeerIds(), not
207+
// getPeerCount(), because the latter will filter out peers that have been
208+
// tried, but are since lost. This wants to check if peers have _ever_ been
209+
// tried. If they have, stick with the normal timer flow.
210+
if (broadcast && mPeerSet->getPeerIds().empty())
211+
{
212+
if (cancelTimer(sl))
213+
{
214+
JLOG(journal_.debug())
215+
<< "update: cancelling timer to send peer requests";
216+
deferred_ = false;
217+
addPeers();
218+
queueJob(sl);
219+
}
220+
}
193221
}
194222

195223
bool
@@ -428,7 +456,14 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
428456

429457
if (!wasProgress)
430458
{
431-
checkLocal();
459+
if (checkLocal())
460+
{
461+
// Done. Something else (probably consensus) built the ledger
462+
// locally while waiting for data (or possibly before requesting)
463+
assert(isDone());
464+
JLOG(journal_.info()) << "Finished while waiting " << hash_;
465+
return;
466+
}
432467

433468
mByHash = true;
434469

src/ripple/app/ledger/impl/InboundLedgers.cpp

+114-10
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,114 @@ class InboundLedgersImp : public InboundLedgers
7878
reason != InboundLedger::Reason::SHARD ||
7979
(seq != 0 && app_.getShardStore()));
8080

81-
// probably not the right rule
82-
if (app_.getOPs().isNeedNetworkLedger() &&
83-
(reason != InboundLedger::Reason::GENERIC) &&
84-
(reason != InboundLedger::Reason::CONSENSUS))
81+
bool const shouldAcquire = [&]() {
82+
if (!app_.getOPs().isNeedNetworkLedger())
83+
return true;
84+
if (reason == InboundLedger::Reason::GENERIC)
85+
return true;
86+
if (reason == InboundLedger::Reason::CONSENSUS)
87+
return true;
88+
return false;
89+
}();
90+
assert(
91+
shouldAcquire ==
92+
!(app_.getOPs().isNeedNetworkLedger() &&
93+
(reason != InboundLedger::Reason::GENERIC) &&
94+
(reason != InboundLedger::Reason::CONSENSUS)));
95+
96+
std::stringstream ss;
97+
ss << "InboundLedger::acquire: "
98+
<< "Request: " << to_string(hash) << ", " << seq
99+
<< " NeedNetworkLedger: "
100+
<< (app_.getOPs().isNeedNetworkLedger() ? "yes" : "no")
101+
<< " Reason: " << to_string(reason)
102+
<< " Should acquire: " << (shouldAcquire ? "true." : "false.");
103+
104+
/* Acquiring ledgers is somewhat expensive. It requires lots of
105+
* computation and network communication. Avoid it when it's not
106+
* appropriate. Every validation from a peer for a ledger that
107+
* we do not have locally results in a call to this function: even
108+
* if we are moments away from validating the same ledger.
109+
*/
110+
bool const shouldBroadcast = [&]() {
111+
// If the node is not in "full" state, it needs to sync to
112+
// the network, and doesn't have the necessary tx's and
113+
// ledger entries to build the ledger.
114+
bool const isFull = app_.getOPs().isFull();
115+
// fallingBehind means the last closed ledger is at least 2
116+
// behind the validated ledger. If the node is falling
117+
// behind the network, it probably needs information from
118+
// the network to catch up.
119+
//
120+
// The reason this should not simply be only at least 1
121+
// behind the validated ledger is that a slight lag is
122+
// normal case because some nodes get there slightly later
123+
// than others. A difference of 2 means that at least a full
124+
// ledger interval has passed, so the node is beginning to
125+
// fall behind.
126+
bool const fallingBehind = app_.getOPs().isFallingBehind();
127+
// If everything else is ok, don't try to acquire the ledger
128+
// if the requested seq is in the near future relative to
129+
// the validated ledger. If the requested ledger is between
130+
// 1 and 19 inclusive ledgers ahead of the valid ledger this
131+
// node has not built it yet, but it's possible/likely it
132+
// has the tx's necessary to build it and get caught up.
133+
// Plus it might not become validated. On the other hand, if
134+
// it's more than 20 in the future, this node should request
135+
// it so that it can jump ahead and get caught up.
136+
LedgerIndex const validSeq =
137+
app_.getLedgerMaster().getValidLedgerIndex();
138+
constexpr std::size_t lagLeeway = 20;
139+
bool const nearFuture =
140+
(seq > validSeq) && (seq < validSeq + lagLeeway);
141+
// If everything else is ok, don't try to acquire the ledger
142+
// if the request is related to consensus. (Note that
143+
// consensus calls usually pass a seq of 0, so nearFuture
144+
// will be false other than on a brand new network.)
145+
bool const consensus =
146+
reason == InboundLedger::Reason::CONSENSUS;
147+
ss << " Evaluating whether to broadcast requests to peers"
148+
<< ". full: " << (isFull ? "true" : "false")
149+
<< ". falling behind: " << (fallingBehind ? "true" : "false")
150+
<< ". ledger sequence " << seq
151+
<< ". Valid sequence: " << validSeq
152+
<< ". Lag leeway: " << lagLeeway
153+
<< ". request for near future ledger: "
154+
<< (nearFuture ? "true" : "false")
155+
<< ". Consensus: " << (consensus ? "true" : "false");
156+
157+
// If the node is not synced, send requests.
158+
if (!isFull)
159+
return true;
160+
// If the node is falling behind, send requests.
161+
if (fallingBehind)
162+
return true;
163+
// If the ledger is in the near future, do NOT send requests.
164+
// This node is probably about to build it.
165+
if (nearFuture)
166+
return false;
167+
// If the request is because of consensus, do NOT send requests.
168+
// This node is probably about to build it.
169+
if (consensus)
170+
return false;
171+
return true;
172+
}();
173+
ss << ". Broadcast to peers? "
174+
<< (shouldBroadcast ? "true." : "false.");
175+
176+
if (!shouldAcquire)
177+
{
178+
JLOG(j_.debug()) << "Abort(rule): " << ss.str();
85179
return {};
180+
}
86181

87182
bool isNew = true;
88183
std::shared_ptr<InboundLedger> inbound;
89184
{
90185
ScopedLockType sl(mLock);
91186
if (stopping_)
92187
{
188+
JLOG(j_.debug()) << "Abort(stopping): " << ss.str();
93189
return {};
94190
}
95191

@@ -109,19 +205,26 @@ class InboundLedgersImp : public InboundLedgers
109205
std::ref(m_clock),
110206
mPeerSetBuilder->build());
111207
mLedgers.emplace(hash, inbound);
112-
inbound->init(sl);
208+
inbound->init(sl, shouldBroadcast);
113209
++mCounter;
114210
}
115211
}
212+
ss << " IsNew: " << (isNew ? "true" : "false");
116213

117214
if (inbound->isFailed())
215+
{
216+
JLOG(j_.debug()) << "Abort(failed): " << ss.str();
118217
return {};
218+
}
119219

120220
if (!isNew)
121-
inbound->update(seq);
221+
inbound->update(seq, shouldBroadcast);
122222

123223
if (!inbound->isComplete())
224+
{
225+
JLOG(j_.debug()) << "Incomplete: " << ss.str();
124226
return {};
227+
}
125228

126229
if (reason == InboundLedger::Reason::HISTORY)
127230
{
@@ -134,21 +237,22 @@ class InboundLedgersImp : public InboundLedgers
134237
if (!shardStore)
135238
{
136239
JLOG(j_.error())
137-
<< "Acquiring shard with no shard store available";
240+
<< "Acquiring shard with no shard store available"
241+
<< ss.str();
138242
return {};
139243
}
140244
if (inbound->getLedger()->stateMap().family().isShardBacked())
141245
shardStore->setStored(inbound->getLedger());
142246
else
143247
shardStore->storeLedger(inbound->getLedger());
144248
}
249+
250+
JLOG(j_.debug()) << "Complete: " << ss.str();
145251
return inbound->getLedger();
146252
};
147253
using namespace std::chrono_literals;
148-
std::shared_ptr<Ledger const> ledger = perf::measureDurationAndLog(
254+
return perf::measureDurationAndLog(
149255
doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
150-
151-
return ledger;
152256
}
153257

154258
void

src/ripple/app/ledger/impl/LedgerMaster.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -1114,8 +1114,9 @@ LedgerMaster::checkAccept(std::shared_ptr<Ledger const> const& ledger)
11141114
}
11151115

11161116
JLOG(m_journal.info()) << "Advancing accepted ledger to "
1117-
<< ledger->info().seq << " with >= " << minVal
1118-
<< " validations";
1117+
<< ledger->info().seq << " ("
1118+
<< to_short_string(ledger->info().hash)
1119+
<< ") with >= " << minVal << " validations";
11191120

11201121
ledger->setValidated();
11211122
ledger->setFull();

src/ripple/app/ledger/impl/TimeoutCounter.cpp

+19-1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
5252
{
5353
if (isDone())
5454
return;
55+
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_;
5556
timer_.expires_after(timerInterval_);
5657
timer_.async_wait(
5758
[wptr = pmDowncast()](boost::system::error_code const& ec) {
@@ -61,11 +62,25 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
6162
if (auto ptr = wptr.lock())
6263
{
6364
ScopedLockType sl(ptr->mtx_);
65+
JLOG(ptr->journal_.debug())
66+
<< "timer: ec: " << ec << " (operation_aborted: "
67+
<< boost::asio::error::operation_aborted << " - "
68+
<< (ec == boost::asio::error::operation_aborted ? "aborted"
69+
: "other")
70+
<< ")";
6471
ptr->queueJob(sl);
6572
}
6673
});
6774
}
6875

76+
std::size_t
77+
TimeoutCounter::cancelTimer(ScopedLockType& sl)
78+
{
79+
auto const ret = timer_.cancel();
80+
JLOG(journal_.debug()) << "Cancelled " << ret << " timer(s)";
81+
return ret;
82+
}
83+
6984
void
7085
TimeoutCounter::queueJob(ScopedLockType& sl)
7186
{
@@ -100,7 +115,10 @@ TimeoutCounter::invokeOnTimer()
100115

101116
if (!progress_)
102117
{
103-
++timeouts_;
118+
if (deferred_)
119+
deferred_ = false;
120+
else
121+
++timeouts_;
104122
JLOG(journal_.debug()) << "Timeout(" << timeouts_ << ") "
105123
<< " acquiring " << hash_;
106124
onTimer(false, sl);

src/ripple/app/ledger/impl/TimeoutCounter.h

+7
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ class TimeoutCounter
101101
void
102102
setTimer(ScopedLockType&);
103103

104+
/** Cancel any waiting timer */
105+
std::size_t
106+
cancelTimer(ScopedLockType&);
107+
104108
/** Queue a job to call invokeOnTimer(). */
105109
void
106110
queueJob(ScopedLockType&);
@@ -132,6 +136,9 @@ class TimeoutCounter
132136
int timeouts_;
133137
bool complete_;
134138
bool failed_;
139+
/** Whether the initialization deferred doing any work until the first
140+
* timeout. */
141+
bool deferred_ = false;
135142
/** Whether forward progress has been made. */
136143
bool progress_;
137144
/** The minimum time to wait between calls to execute(). */

0 commit comments

Comments (0)