Skip to content

Commit 8191c91

Browse files
ximinez (Mark Travis)
and
Mark Travis
committed
Optimize when to acquire ledgers from the network.
Particularly avoid acquiring ledgers likely to be produced locally very soon. Derived from XRPLF#4764. Co-authored-by: Mark Travis <mtravis@ripple.com>
1 parent 3f1fd47 commit 8191c91

8 files changed

+270
-27
lines changed

src/ripple/app/ledger/InboundLedger.h

+22-2
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class InboundLedger final : public TimeoutCounter,
5959

6060
// Called when another attempt is made to fetch this same ledger
6161
void
62-
update(std::uint32_t seq);
62+
update(std::uint32_t seq, bool broadcast);
6363

6464
/** Returns true if we got all the data. */
6565
bool
@@ -90,7 +90,7 @@ class InboundLedger final : public TimeoutCounter,
9090
bool
9191
checkLocal();
9292
void
93-
init(ScopedLockType& collectionLock);
93+
init(ScopedLockType& collectionLock, bool broadcast);
9494

9595
bool
9696
gotData(
@@ -197,6 +197,26 @@ class InboundLedger final : public TimeoutCounter,
197197
std::unique_ptr<PeerSet> mPeerSet;
198198
};
199199

200+
inline std::string
201+
to_string(InboundLedger::Reason reason)
202+
{
203+
using enum InboundLedger::Reason;
204+
switch (reason)
205+
{
206+
case HISTORY:
207+
return "HISTORY";
208+
case SHARD:
209+
return "SHARD";
210+
case GENERIC:
211+
return "GENERIC";
212+
case CONSENSUS:
213+
return "CONSENSUS";
214+
default:
215+
assert(false);
216+
return "unknown";
217+
}
218+
}
219+
200220
} // namespace ripple
201221

202222
#endif

src/ripple/app/ledger/impl/InboundLedger.cpp

+40-5
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ InboundLedger::InboundLedger(
103103
}
104104

105105
void
106-
InboundLedger::init(ScopedLockType& collectionLock)
106+
InboundLedger::init(ScopedLockType& collectionLock, bool broadcast)
107107
{
108108
ScopedLockType sl(mtx_);
109109
collectionLock.unlock();
@@ -148,8 +148,18 @@ InboundLedger::init(ScopedLockType& collectionLock)
148148
}
149149
if (!complete_)
150150
{
151-
addPeers();
152-
queueJob(sl);
151+
if (broadcast)
152+
{
153+
addPeers();
154+
queueJob(sl);
155+
}
156+
else
157+
{
158+
// Delay to give time to build the ledger before sending
159+
JLOG(journal_.debug()) << "init: Deferring peer requests";
160+
deferred_ = true;
161+
setTimer(sl);
162+
}
153163
return;
154164
}
155165

@@ -180,7 +190,7 @@ InboundLedger::getPeerCount() const
180190
}
181191

182192
void
183-
InboundLedger::update(std::uint32_t seq)
193+
InboundLedger::update(std::uint32_t seq, bool broadcast)
184194
{
185195
ScopedLockType sl(mtx_);
186196

@@ -190,6 +200,24 @@ InboundLedger::update(std::uint32_t seq)
190200

191201
// Prevent this from being swept
192202
touch();
203+
204+
// If the signal is to broadcast, and this request has never tried to
205+
// broadcast before, cancel any waiting timer, then fire off the job to
206+
// broadcast. Note that this is calling mPeerSet->getPeerIds(), not
207+
// getPeerCount(), because the latter will filter out peers that have been
208+
// tried, but are since lost. This wants to check if peers have _ever_ been
209+
// tried. If they have, stick with the normal timer flow.
210+
if (broadcast && mPeerSet->getPeerIds().empty())
211+
{
212+
if (cancelTimer(sl))
213+
{
214+
JLOG(journal_.debug())
215+
<< "update: cancelling timer to send peer requests";
216+
deferred_ = false;
217+
addPeers();
218+
queueJob(sl);
219+
}
220+
}
193221
}
194222

195223
bool
@@ -428,7 +456,14 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
428456

429457
if (!wasProgress)
430458
{
431-
checkLocal();
459+
if (checkLocal())
460+
{
461+
// Done. Something else (probably consensus) built the ledger
462+
// locally while waiting for data (or possibly before requesting)
463+
assert(isDone());
464+
JLOG(journal_.info()) << "Finished while waiting " << hash_;
465+
return;
466+
}
432467

433468
mByHash = true;
434469

src/ripple/app/ledger/impl/InboundLedgers.cpp

+114-10
Original file line numberDiff line numberDiff line change
@@ -77,18 +77,114 @@ class InboundLedgersImp : public InboundLedgers
7777
reason != InboundLedger::Reason::SHARD ||
7878
(seq != 0 && app_.getShardStore()));
7979

80-
// probably not the right rule
81-
if (app_.getOPs().isNeedNetworkLedger() &&
82-
(reason != InboundLedger::Reason::GENERIC) &&
83-
(reason != InboundLedger::Reason::CONSENSUS))
80+
bool const shouldAcquire = [&]() {
81+
if (!app_.getOPs().isNeedNetworkLedger())
82+
return true;
83+
if (reason == InboundLedger::Reason::GENERIC)
84+
return true;
85+
if (reason == InboundLedger::Reason::CONSENSUS)
86+
return true;
87+
return false;
88+
}();
89+
assert(
90+
shouldAcquire ==
91+
!(app_.getOPs().isNeedNetworkLedger() &&
92+
(reason != InboundLedger::Reason::GENERIC) &&
93+
(reason != InboundLedger::Reason::CONSENSUS)));
94+
95+
std::stringstream ss;
96+
ss << "InboundLedger::acquire: "
97+
<< "Request: " << to_string(hash) << ", " << seq
98+
<< " NeedNetworkLedger: "
99+
<< (app_.getOPs().isNeedNetworkLedger() ? "yes" : "no")
100+
<< " Reason: " << to_string(reason)
101+
<< " Should acquire: " << (shouldAcquire ? "true." : "false.");
102+
103+
/* Acquiring ledgers is somewhat expensive. It requires lots of
104+
* computation and network communication. Avoid it when it's not
105+
* appropriate. Every validation from a peer for a ledger that
106+
* we do not have locally results in a call to this function: even
107+
* if we are moments away from validating the same ledger.
108+
*/
109+
bool const shouldBroadcast = [&]() {
110+
// If the node is not in "full" state, it needs to sync to
111+
// the network, and doesn't have the necessary tx's and
112+
// ledger entries to build the ledger.
113+
bool const isFull = app_.getOPs().isFull();
114+
// fallingBehind means the last closed ledger is at least 2
115+
// behind the validated ledger. If the node is falling
116+
// behind the network, it probably needs information from
117+
// the network to catch up.
118+
//
119+
// The reason this should not simply be only at least 1
120+
// behind the validated ledger is that a slight lag is
121+
// normal case because some nodes get there slightly later
122+
// than others. A difference of 2 means that at least a full
123+
// ledger interval has passed, so the node is beginning to
124+
// fall behind.
125+
bool const fallingBehind = app_.getOPs().isFallingBehind();
126+
// If everything else is ok, don't try to acquire the ledger
127+
// if the requested seq is in the near future relative to
128+
// the validated ledger. If the requested ledger is between
129+
// 1 and 19 inclusive ledgers ahead of the valid ledger this
130+
// node has not built it yet, but it's possible/likely it
131+
// has the tx's necessary to build it and get caught up.
132+
// Plus it might not become validated. On the other hand, if
133+
// it's more than 20 in the future, this node should request
134+
// it so that it can jump ahead and get caught up.
135+
LedgerIndex const validSeq =
136+
app_.getLedgerMaster().getValidLedgerIndex();
137+
constexpr std::size_t lagLeeway = 20;
138+
bool const nearFuture =
139+
(seq > validSeq) && (seq < validSeq + lagLeeway);
140+
// If everything else is ok, don't try to acquire the ledger
141+
// if the request is related to consensus. (Note that
142+
// consensus calls usually pass a seq of 0, so nearFuture
143+
// will be false other than on a brand new network.)
144+
bool const consensus =
145+
reason == InboundLedger::Reason::CONSENSUS;
146+
ss << " Evaluating whether to broadcast requests to peers"
147+
<< ". full: " << (isFull ? "true" : "false")
148+
<< ". falling behind: " << (fallingBehind ? "true" : "false")
149+
<< ". ledger sequence " << seq
150+
<< ". Valid sequence: " << validSeq
151+
<< ". Lag leeway: " << lagLeeway
152+
<< ". request for near future ledger: "
153+
<< (nearFuture ? "true" : "false")
154+
<< ". Consensus: " << (consensus ? "true" : "false");
155+
156+
// If the node is not synced, send requests.
157+
if (!isFull)
158+
return true;
159+
// If the node is falling behind, send requests.
160+
if (fallingBehind)
161+
return true;
162+
// If the ledger is in the near future, do NOT send requests.
163+
// This node is probably about to build it.
164+
if (nearFuture)
165+
return false;
166+
// If the request is because of consensus, do NOT send requests.
167+
// This node is probably about to build it.
168+
if (consensus)
169+
return false;
170+
return true;
171+
}();
172+
ss << ". Broadcast to peers? "
173+
<< (shouldBroadcast ? "true." : "false.");
174+
175+
if (!shouldAcquire)
176+
{
177+
JLOG(j_.debug()) << "Abort(rule): " << ss.str();
84178
return {};
179+
}
85180

86181
bool isNew = true;
87182
std::shared_ptr<InboundLedger> inbound;
88183
{
89184
ScopedLockType sl(mLock);
90185
if (stopping_)
91186
{
187+
JLOG(j_.debug()) << "Abort(stopping): " << ss.str();
92188
return {};
93189
}
94190

@@ -108,19 +204,26 @@ class InboundLedgersImp : public InboundLedgers
108204
std::ref(m_clock),
109205
mPeerSetBuilder->build());
110206
mLedgers.emplace(hash, inbound);
111-
inbound->init(sl);
207+
inbound->init(sl, shouldBroadcast);
112208
++mCounter;
113209
}
114210
}
211+
ss << " IsNew: " << (isNew ? "true" : "false");
115212

116213
if (inbound->isFailed())
214+
{
215+
JLOG(j_.debug()) << "Abort(failed): " << ss.str();
117216
return {};
217+
}
118218

119219
if (!isNew)
120-
inbound->update(seq);
220+
inbound->update(seq, shouldBroadcast);
121221

122222
if (!inbound->isComplete())
223+
{
224+
JLOG(j_.debug()) << "Incomplete: " << ss.str();
123225
return {};
226+
}
124227

125228
if (reason == InboundLedger::Reason::HISTORY)
126229
{
@@ -133,21 +236,22 @@ class InboundLedgersImp : public InboundLedgers
133236
if (!shardStore)
134237
{
135238
JLOG(j_.error())
136-
<< "Acquiring shard with no shard store available";
239+
<< "Acquiring shard with no shard store available"
240+
<< ss.str();
137241
return {};
138242
}
139243
if (inbound->getLedger()->stateMap().family().isShardBacked())
140244
shardStore->setStored(inbound->getLedger());
141245
else
142246
shardStore->storeLedger(inbound->getLedger());
143247
}
248+
249+
JLOG(j_.debug()) << "Complete: " << ss.str();
144250
return inbound->getLedger();
145251
};
146252
using namespace std::chrono_literals;
147-
std::shared_ptr<Ledger const> ledger = perf::measureDurationAndLog(
253+
return perf::measureDurationAndLog(
148254
doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
149-
150-
return ledger;
151255
}
152256

153257
void

src/ripple/app/ledger/impl/LedgerMaster.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -1114,8 +1114,9 @@ LedgerMaster::checkAccept(std::shared_ptr<Ledger const> const& ledger)
11141114
}
11151115

11161116
JLOG(m_journal.info()) << "Advancing accepted ledger to "
1117-
<< ledger->info().seq << " with >= " << minVal
1118-
<< " validations";
1117+
<< ledger->info().seq << " ("
1118+
<< to_short_string(ledger->info().hash)
1119+
<< ") with >= " << minVal << " validations";
11191120

11201121
ledger->setValidated();
11211122
ledger->setFull();

src/ripple/app/ledger/impl/TimeoutCounter.cpp

+19-1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
5252
{
5353
if (isDone())
5454
return;
55+
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_;
5556
timer_.expires_after(timerInterval_);
5657
timer_.async_wait(
5758
[wptr = pmDowncast()](boost::system::error_code const& ec) {
@@ -61,11 +62,25 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
6162
if (auto ptr = wptr.lock())
6263
{
6364
ScopedLockType sl(ptr->mtx_);
65+
JLOG(ptr->journal_.debug())
66+
<< "timer: ec: " << ec << " (operation_aborted: "
67+
<< boost::asio::error::operation_aborted << " - "
68+
<< (ec == boost::asio::error::operation_aborted ? "aborted"
69+
: "other")
70+
<< ")";
6471
ptr->queueJob(sl);
6572
}
6673
});
6774
}
6875

76+
std::size_t
77+
TimeoutCounter::cancelTimer(ScopedLockType& sl)
78+
{
79+
auto const ret = timer_.cancel();
80+
JLOG(journal_.debug()) << "Cancelled " << ret << " timer(s)";
81+
return ret;
82+
}
83+
6984
void
7085
TimeoutCounter::queueJob(ScopedLockType& sl)
7186
{
@@ -100,7 +115,10 @@ TimeoutCounter::invokeOnTimer()
100115

101116
if (!progress_)
102117
{
103-
++timeouts_;
118+
if (deferred_)
119+
deferred_ = false;
120+
else
121+
++timeouts_;
104122
JLOG(journal_.debug()) << "Timeout(" << timeouts_ << ") "
105123
<< " acquiring " << hash_;
106124
onTimer(false, sl);

src/ripple/app/ledger/impl/TimeoutCounter.h

+7
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ class TimeoutCounter
101101
void
102102
setTimer(ScopedLockType&);
103103

104+
/** Cancel any waiting timer */
105+
std::size_t
106+
cancelTimer(ScopedLockType&);
107+
104108
/** Queue a job to call invokeOnTimer(). */
105109
void
106110
queueJob(ScopedLockType&);
@@ -132,6 +136,9 @@ class TimeoutCounter
132136
int timeouts_;
133137
bool complete_;
134138
bool failed_;
139+
/** Whether the initialization deferred doing any work until the first
140+
* timeout. */
141+
bool deferred_ = false;
135142
/** Whether forward progress has been made. */
136143
bool progress_;
137144
/** The minimum time to wait between calls to execute(). */

0 commit comments

Comments
 (0)