Skip to content

Commit 6696bf1

Browse files
committed
Class "Pending" to keep track of pending operations on distinct items
1 parent 8191c91 commit 6696bf1

File tree

3 files changed

+145
-48
lines changed

3 files changed

+145
-48
lines changed

src/ripple/app/ledger/impl/InboundLedgers.cpp

+18-21
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <ripple/app/misc/NetworkOPs.h>
2424
#include <ripple/basics/DecayingSample.h>
2525
#include <ripple/basics/Log.h>
26+
#include <ripple/basics/Pending.h>
2627
#include <ripple/basics/PerfLog.h>
2728
#include <ripple/beast/container/aged_map.h>
2829
#include <ripple/beast/core/LexicalCast.h>
@@ -260,29 +261,25 @@ class InboundLedgersImp : public InboundLedgers
260261
std::uint32_t seq,
261262
InboundLedger::Reason reason) override
262263
{
263-
std::unique_lock lock(acquiresMutex_);
264-
try
265-
{
266-
if (pendingAcquires_.contains(hash))
267-
return;
268-
pendingAcquires_.insert(hash);
269-
lock.unlock();
270-
acquire(hash, seq, reason);
271-
}
272-
catch (std::exception const& e)
264+
if (Pending pending{acquiresMutex_, pendingAcquires_, hash})
273265
{
274-
JLOG(j_.warn())
275-
<< "Exception thrown for acquiring new inbound ledger " << hash
276-
<< ": " << e.what();
277-
}
278-
catch (...)
279-
{
280-
JLOG(j_.warn())
281-
<< "Unknown exception thrown for acquiring new inbound ledger "
282-
<< hash;
266+
try
267+
{
268+
acquire(hash, seq, reason);
269+
}
270+
catch (std::exception const& e)
271+
{
272+
JLOG(j_.warn())
273+
<< "Exception thrown for acquiring new inbound ledger "
274+
<< hash << ": " << e.what();
275+
}
276+
catch (...)
277+
{
278+
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new "
279+
"inbound ledger "
280+
<< hash;
281+
}
283282
}
284-
lock.lock();
285-
pendingAcquires_.erase(hash);
286283
}
287284

288285
std::shared_ptr<InboundLedger>

src/ripple/app/misc/NetworkOPs.cpp

+21-27
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#include <ripple/app/rdb/backend/SQLiteDatabase.h>
4343
#include <ripple/app/reporting/ReportingETL.h>
4444
#include <ripple/app/tx/apply.h>
45+
#include <ripple/basics/Pending.h>
4546
#include <ripple/basics/PerfLog.h>
4647
#include <ripple/basics/UptimeClock.h>
4748
#include <ripple/basics/mulDiv.h>
@@ -2366,34 +2367,27 @@ NetworkOPsImp::recvValidation(
23662367
JLOG(m_journal.trace())
23672368
<< "recvValidation " << val->getLedgerHash() << " from " << source;
23682369

2369-
std::unique_lock lock(validationsMutex_);
2370-
BypassAccept bypassAccept = BypassAccept::no;
2371-
try
2372-
{
2373-
if (pendingValidations_.contains(val->getLedgerHash()))
2374-
bypassAccept = BypassAccept::yes;
2375-
else
2376-
pendingValidations_.insert(val->getLedgerHash());
2377-
lock.unlock();
2378-
handleNewValidation(app_, val, source, bypassAccept, m_journal);
2379-
}
2380-
catch (std::exception const& e)
2381-
{
2382-
JLOG(m_journal.warn())
2383-
<< "Exception thrown for handling new validation "
2384-
<< val->getLedgerHash() << ": " << e.what();
2385-
}
2386-
catch (...)
2387-
{
2388-
JLOG(m_journal.warn())
2389-
<< "Unknown exception thrown for handling new validation "
2390-
<< val->getLedgerHash();
2391-
}
2392-
if (bypassAccept == BypassAccept::no)
23932370
{
2394-
lock.lock();
2395-
pendingValidations_.erase(val->getLedgerHash());
2396-
lock.unlock();
2371+
Pending pending(
2372+
validationsMutex_, pendingValidations_, val->getLedgerHash());
2373+
try
2374+
{
2375+
BypassAccept bypassAccept =
2376+
pending.unusable() ? BypassAccept::yes : BypassAccept::no;
2377+
handleNewValidation(app_, val, source, bypassAccept, m_journal);
2378+
}
2379+
catch (std::exception const& e)
2380+
{
2381+
JLOG(m_journal.warn())
2382+
<< "Exception thrown for handling new validation "
2383+
<< val->getLedgerHash() << ": " << e.what();
2384+
}
2385+
catch (...)
2386+
{
2387+
JLOG(m_journal.warn())
2388+
<< "Unknown exception thrown for handling new validation "
2389+
<< val->getLedgerHash();
2390+
}
23972391
}
23982392

23992393
pubValidation(val);

src/ripple/basics/Pending.h

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
//------------------------------------------------------------------------------
2+
/*
3+
This file is part of rippled: https://github.com/ripple/rippled
4+
Copyright (c) 2024 Ripple Labs Inc.
5+
6+
Permission to use, copy, modify, and/or distribute this software for any
7+
purpose with or without fee is hereby granted, provided that the above
8+
copyright notice and this permission notice appear in all copies.
9+
10+
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11+
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12+
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13+
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14+
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15+
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16+
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17+
*/
18+
//==============================================================================
19+
20+
#ifndef RIPPLE_BASICS_PENDING_H_INCLUDED
21+
#define RIPPLE_BASICS_PENDING_H_INCLUDED
22+
23+
/** RAII class to check if an Item is Pending processing in a Collection.
24+
*
25+
* If the Item is not in the Collection, it will be added under lock in the
26+
* ctor, and removed under lock in the dtor.
27+
*
28+
* If the Item is in the Collection, no changes will be made to the collection,
29+
* and the Pending object will be considered "unusable". It's up to the caller
30+
* to decide what "unusable" means. (e.g. Skip a block of code, or set a flag.)
31+
*
32+
* The current use is to avoid lock contention that would be involved in
33+
* processing something associated with the Item.
34+
*
35+
* Example:
36+
*
37+
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
38+
* {
39+
* Pending pending{acquiresMutex_, pendingAcquires_, hash};
40+
* if (pending.unusable())
41+
* return;
42+
* acquire(hash, ...);
43+
* }
44+
*
45+
* Additionally, a Pending will evaluate to `true` if it is "usable", `false` if
46+
* it is "unusable".
47+
*
48+
* Example:
49+
*
50+
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
51+
* {
52+
* if (Pending pending{acquiresMutex_, pendingAcquires_, hash})
53+
* {
54+
* acquire(hash, ...);
55+
* }
56+
* }
57+
*
58+
59+
*/
60+
template <class Mutex, class Collection, class Item>
61+
class Pending
62+
{
63+
public:
64+
Pending(Mutex& mtx, Collection& collection, Item const& item)
65+
: mtx_(mtx), collection_(collection), item_(item), unusable_(insert())
66+
{
67+
}
68+
69+
~Pending()
70+
{
71+
if (!unusable_)
72+
{
73+
std::unique_lock<Mutex> lock_(mtx_);
74+
collection_.erase(item_);
75+
}
76+
}
77+
78+
bool
79+
unusable() const
80+
{
81+
return unusable_;
82+
}
83+
84+
operator bool() const
85+
{
86+
return !unusable_;
87+
}
88+
89+
private:
90+
bool
91+
insert()
92+
{
93+
std::unique_lock<Mutex> lock_(mtx_);
94+
bool exists = collection_.contains(item_);
95+
if (!exists)
96+
collection_.insert(item_);
97+
return exists;
98+
}
99+
100+
Mutex& mtx_;
101+
Collection& collection_;
102+
Item const item_;
103+
bool const unusable_;
104+
};
105+
106+
#endif

0 commit comments

Comments
 (0)