From cf6cfe1de15873732610e2237e3f5e68fbd265ae Mon Sep 17 00:00:00 2001 From: Ed Hennis Date: Fri, 26 Apr 2024 18:19:11 -0400 Subject: [PATCH] Improve transaction relaying (#4985) Process held transactions through existing NetworkOPs batching: * Ensures that successful transactions are broadcast to peers, appropriate failed transactions are held for later attempts, fee changes are sent to subscribers, etc. Pop all transactions with sequential sequences, or tickets Give a transaction more chances to be retried: * Hold if the transaction gets a ter, tel, or tef result. * Use the new SF_HELD flag to ultimately prevent the transaction from being held and retried too many times. Decrease `shouldRelay` limit to 30s: * Allows transactions, validator lists, proposals, and validations to be relayed more often, but only when triggered by another event, such as receiving it from a peer * Decrease from 5min. * Expected to help transaction throughput on poorly connected networks. --- src/ripple/app/ledger/LocalTxs.h | 5 + src/ripple/app/ledger/impl/LedgerMaster.cpp | 29 ++-- src/ripple/app/ledger/impl/LocalTxs.cpp | 7 +- src/ripple/app/main/Application.cpp | 3 +- src/ripple/app/misc/CanonicalTXSet.cpp | 12 +- src/ripple/app/misc/HashRouter.cpp | 2 +- src/ripple/app/misc/HashRouter.h | 23 ++- src/ripple/app/misc/NetworkOPs.cpp | 168 +++++++++++++++++--- src/ripple/app/misc/NetworkOPs.h | 10 ++ src/test/app/HashRouter_test.cpp | 12 +- 10 files changed, 211 insertions(+), 60 deletions(-) diff --git a/src/ripple/app/ledger/LocalTxs.h b/src/ripple/app/ledger/LocalTxs.h index f427a5e0477..dfd644fd207 100644 --- a/src/ripple/app/ledger/LocalTxs.h +++ b/src/ripple/app/ledger/LocalTxs.h @@ -33,6 +33,11 @@ namespace ripple { class LocalTxs { public: + // The number of ledgers to hold a transaction is essentially + // arbitrary. It should be sufficient to allow the transaction to + // get into a fully-validated ledger. 
+ static constexpr int holdLedgers = 5; + virtual ~LocalTxs() = default; // Add a new local transaction diff --git a/src/ripple/app/ledger/impl/LedgerMaster.cpp b/src/ripple/app/ledger/impl/LedgerMaster.cpp index 9388a3005ba..339e872b312 100644 --- a/src/ripple/app/ledger/impl/LedgerMaster.cpp +++ b/src/ripple/app/ledger/impl/LedgerMaster.cpp @@ -548,26 +548,17 @@ LedgerMaster::storeLedger(std::shared_ptr ledger) void LedgerMaster::applyHeldTransactions() { - std::lock_guard sl(m_mutex); + CanonicalTXSet const set = [this]() { + std::lock_guard sl(m_mutex); + // VFALCO NOTE The hash for an open ledger is undefined so we use + // something that is a reasonable substitute. + CanonicalTXSet set(app_.openLedger().current()->info().parentHash); + std::swap(mHeldTransactions, set); + return set; + }(); - app_.openLedger().modify([&](OpenView& view, beast::Journal j) { - bool any = false; - for (auto const& it : mHeldTransactions) - { - ApplyFlags flags = tapNONE; - auto const result = - app_.getTxQ().apply(app_, view, it.second, flags, j); - if (result.second) - any = true; - } - return any; - }); - - // VFALCO TODO recreate the CanonicalTxSet object instead of resetting - // it. - // VFALCO NOTE The hash for an open ledger is undefined so we use - // something that is a reasonable substitute. - mHeldTransactions.reset(app_.openLedger().current()->info().parentHash); + if (!set.empty()) + app_.getOPs().processTransactionSet(set); } std::shared_ptr diff --git a/src/ripple/app/ledger/impl/LocalTxs.cpp b/src/ripple/app/ledger/impl/LocalTxs.cpp index afee5e2d4d0..789a29c19ee 100644 --- a/src/ripple/app/ledger/impl/LocalTxs.cpp +++ b/src/ripple/app/ledger/impl/LocalTxs.cpp @@ -53,14 +53,9 @@ namespace ripple { class LocalTx { public: - // The number of ledgers to hold a transaction is essentially - // arbitrary. It should be sufficient to allow the transaction to - // get into a fully-validated ledger. 
- static int const holdLedgers = 5; - LocalTx(LedgerIndex index, std::shared_ptr const& txn) : m_txn(txn) - , m_expire(index + holdLedgers) + , m_expire(index + LocalTxs::holdLedgers) , m_id(txn->getTransactionID()) , m_account(txn->getAccountID(sfAccount)) , m_seqProxy(txn->getSeqProxy()) diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 55300a390c9..06e92d13bd8 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -457,7 +457,8 @@ class ApplicationImp : public Application, public BasicApp , hashRouter_(std::make_unique( stopwatch(), - HashRouter::getDefaultHoldTime())) + HashRouter::getDefaultHoldTime(), + HashRouter::getDefaultRelayTime())) , mValidations( ValidationParms(), diff --git a/src/ripple/app/misc/CanonicalTXSet.cpp b/src/ripple/app/misc/CanonicalTXSet.cpp index a9fcd17f056..e0415406aa1 100644 --- a/src/ripple/app/misc/CanonicalTXSet.cpp +++ b/src/ripple/app/misc/CanonicalTXSet.cpp @@ -66,18 +66,22 @@ CanonicalTXSet::popAcctTransaction(std::shared_ptr const& tx) // 1. Prioritize transactions with Sequences over transactions with // Tickets. // - // 2. Don't worry about consecutive Sequence numbers. Creating Tickets - // can introduce a discontinuity in Sequence numbers. + // 2. For transactions not using Tickets, look for consecutive Sequence + // numbers. For transactions using Tickets, don't worry about + // consecutive Sequence numbers. Tickets can process out of order. // // 3. After handling all transactions with Sequences, return Tickets // with the lowest Ticket ID first. 
std::shared_ptr result; uint256 const effectiveAccount{accountKey(tx->getAccountID(sfAccount))}; - Key const after(effectiveAccount, tx->getSeqProxy(), beast::zero); + auto const seqProxy = tx->getSeqProxy(); + Key const after(effectiveAccount, seqProxy, beast::zero); auto const itrNext{map_.lower_bound(after)}; if (itrNext != map_.end() && - itrNext->first.getAccount() == effectiveAccount) + itrNext->first.getAccount() == effectiveAccount && + (!itrNext->second->getSeqProxy().isSeq() || + itrNext->second->getSeqProxy().value() == seqProxy.value() + 1)) { result = std::move(itrNext->second); map_.erase(itrNext); diff --git a/src/ripple/app/misc/HashRouter.cpp b/src/ripple/app/misc/HashRouter.cpp index 8085d6892ab..ce7f9cbfd1c 100644 --- a/src/ripple/app/misc/HashRouter.cpp +++ b/src/ripple/app/misc/HashRouter.cpp @@ -122,7 +122,7 @@ HashRouter::shouldRelay(uint256 const& key) auto& s = emplace(key).first; - if (!s.shouldRelay(suppressionMap_.clock().now(), holdTime_)) + if (!s.shouldRelay(suppressionMap_.clock().now(), relayTime_)) return {}; return s.releasePeerSet(); diff --git a/src/ripple/app/misc/HashRouter.h b/src/ripple/app/misc/HashRouter.h index 8c546b2c51d..2abe6788b36 100644 --- a/src/ripple/app/misc/HashRouter.h +++ b/src/ripple/app/misc/HashRouter.h @@ -33,6 +33,7 @@ namespace ripple { // TODO convert these macros to int constants or an enum #define SF_BAD 0x02 // Temporarily bad #define SF_SAVED 0x04 +#define SF_HELD 0x08 // Held by LedgerMaster after potential processing failure #define SF_TRUSTED 0x10 // comes from trusted source // Private flags, used internally in apply.cpp. 
@@ -143,8 +144,21 @@ class HashRouter return 300s; } - HashRouter(Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds) - : suppressionMap_(clock), holdTime_(entryHoldTimeInSeconds) + static inline std::chrono::seconds + getDefaultRelayTime() + { + using namespace std::chrono; + + return 30s; + } + + HashRouter( + Stopwatch& clock, + std::chrono::seconds entryHoldTime, + std::chrono::seconds entryRelayTime) + : suppressionMap_(clock) + , holdTime_(entryHoldTime) + , relayTime_(entryRelayTime) { } @@ -195,11 +209,11 @@ class HashRouter Effects: If the item should be relayed, this function will not - return `true` again until the hold time has expired. + return a seated optional again until the relay time has expired. The internal set of peers will also be reset. @return A `std::optional` set of peers which do not need to be - relayed to. If the result is uninitialized, the item should + relayed to. If the result is unseated, the item should _not_ be relayed. */ std::optional> @@ -221,6 +235,7 @@ class HashRouter suppressionMap_; std::chrono::seconds const holdTime_; + std::chrono::seconds const relayTime_; }; } // namespace ripple diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index cd85bc9e4e1..f85974fc603 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -288,6 +288,9 @@ class NetworkOPsImp final : public NetworkOPs bool bLocal, FailHard failType) override; + void + processTransactionSet(CanonicalTXSet const& set) override; + /** * For transactions submitted directly by a client, apply batch of * transactions and wait for this transaction to complete. @@ -317,6 +320,16 @@ class NetworkOPsImp final : public NetworkOPs bool bUnlimited, FailHard failtype); +private: + bool + preProcessTransaction(std::shared_ptr& transaction); + + void + doTransactionSyncBatch( + std::unique_lock& lock, + std::function&)> retryCallback); + +public: /** * Apply transactions in batches. 
Continue until none are queued. */ @@ -1187,14 +1200,9 @@ NetworkOPsImp::submitTransaction(std::shared_ptr const& iTrans) }); } -void -NetworkOPsImp::processTransaction( - std::shared_ptr& transaction, - bool bUnlimited, - bool bLocal, - FailHard failType) +bool +NetworkOPsImp::preProcessTransaction(std::shared_ptr& transaction) { - auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN"); auto const newFlags = app_.getHashRouter().getFlags(transaction->getID()); if ((newFlags & SF_BAD) != 0) @@ -1203,7 +1211,7 @@ NetworkOPsImp::processTransaction( JLOG(m_journal.warn()) << transaction->getID() << ": cached bad!\n"; transaction->setStatus(INVALID); transaction->setResult(temBAD_SIGNATURE); - return; + return false; } // NOTE eahennis - I think this check is redundant, @@ -1224,12 +1232,28 @@ NetworkOPsImp::processTransaction( transaction->setStatus(INVALID); transaction->setResult(temBAD_SIGNATURE); app_.getHashRouter().setFlags(transaction->getID(), SF_BAD); - return; + return false; } // canonicalize can change our pointer app_.getMasterTransaction().canonicalize(&transaction); + return true; +} + +void +NetworkOPsImp::processTransaction( + std::shared_ptr& transaction, + bool bUnlimited, + bool bLocal, + FailHard failType) +{ + auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN"); + + // preProcessTransaction can change our pointer + if (!preProcessTransaction(transaction)) + return; + if (bLocal) doTransactionSync(transaction, bUnlimited, failType); else @@ -1276,6 +1300,17 @@ NetworkOPsImp::doTransactionSync( transaction->setApplying(); } + doTransactionSyncBatch( + lock, [&transaction](std::unique_lock& lock) { + return transaction->getApplying(); + }); +} + +void +NetworkOPsImp::doTransactionSyncBatch( + std::unique_lock& lock, + std::function&)> retryCallback) +{ do { if (mDispatchState == DispatchState::running) @@ -1298,7 +1333,59 @@ NetworkOPsImp::doTransactionSync( } } } - } while (transaction->getApplying()); + } while 
(retryCallback(lock)); +} + +void +NetworkOPsImp::processTransactionSet(CanonicalTXSet const& set) +{ + auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXNSet"); + std::vector transactions; + transactions.reserve(set.size()); + for (auto const& [_, tx] : set) + { + std::string reason; + auto transaction = std::make_shared(tx, reason, app_); + + if (transaction->getStatus() == INVALID) + { + if (!reason.empty()) + { + JLOG(m_journal.trace()) + << "Exception checking transaction: " << reason; + } + app_.getHashRouter().setFlags(tx->getTransactionID(), SF_BAD); + continue; + } + + // preProcessTransaction can change our pointer + if (!preProcessTransaction(transaction)) + continue; + + if (!transaction->getApplying()) + { + transactions.emplace_back(transaction, false, false, FailHard::no); + transaction->setApplying(); + } + } + + std::unique_lock lock(mMutex); + + if (mTransactions.empty()) + mTransactions.swap(transactions); + else + { + for (auto& t : transactions) + mTransactions.push_back(std::move(t)); + } + + doTransactionSyncBatch(lock, [&](std::unique_lock& lock) { + assert(lock.owns_lock()); + return std::any_of( + mTransactions.begin(), mTransactions.end(), [](auto const& t) { + return t.transaction->getApplying(); + }); + }); } void @@ -1401,15 +1488,20 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) << "Transaction is now included in open ledger"; e.transaction->setStatus(INCLUDED); + // Pop as many "reasonable" transactions for this account as + // possible. "Reasonable" means they have sequential sequence + // numbers, or use tickets. 
auto const& txCur = e.transaction->getSTransaction(); - auto const txNext = m_ledgerMaster.popAcctTransaction(txCur); - if (txNext) + auto txNext = m_ledgerMaster.popAcctTransaction(txCur); + while (txNext) { std::string reason; auto const trans = sterilize(*txNext); auto t = std::make_shared(trans, reason, app_); submit_held.emplace_back(t, false, false, FailHard::no); t->setApplying(); + + txNext = m_ledgerMaster.popAcctTransaction(trans); } } else if (e.result == tefPAST_SEQ) @@ -1432,16 +1524,54 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) e.transaction->setQueued(); e.transaction->setKept(); } - else if (isTerRetry(e.result)) + else if ( + isTerRetry(e.result) || isTelLocal(e.result) || + isTefFailure(e.result)) { if (e.failType != FailHard::yes) { - // transaction should be held - JLOG(m_journal.debug()) - << "Transaction should be held: " << e.result; - e.transaction->setStatus(HELD); - m_ledgerMaster.addHeldTransaction(e.transaction); - e.transaction->setKept(); + auto const lastLedgerSeq = + e.transaction->getSTransaction()->at( + ~sfLastLedgerSequence); + auto const ledgersLeft = lastLedgerSeq + ? *lastLedgerSeq - + m_ledgerMaster.getCurrentLedgerIndex() + : std::optional{}; + // If any of these conditions are met, the transaction can + // be held: + // 1. It was submitted locally. (Note that this flag is only + // true on the initial submission.) + // 2. The transaction has a LastLedgerSequence, and the + // LastLedgerSequence is at most LocalTxs::holdLedgers + // (5) ledgers into the future. (Remember that an + // unseated optional compares as less than all seated + // values, so it has to be checked explicitly first.) + // 3. The SF_HELD flag is not set on the txID. (setFlags + // checks before setting. If the flag is set, it returns + // false, which means it's been held once without one of + // the other conditions, so don't hold it again. Time's + // up!)
+ // + if (e.local || + (ledgersLeft && ledgersLeft <= LocalTxs::holdLedgers) || + app_.getHashRouter().setFlags( + e.transaction->getID(), SF_HELD)) + { + // transaction should be held + JLOG(m_journal.debug()) + << "Transaction should be held: " << e.result; + e.transaction->setStatus(HELD); + m_ledgerMaster.addHeldTransaction(e.transaction); + e.transaction->setKept(); + } + else + JLOG(m_journal.debug()) + << "Not holding transaction " + << e.transaction->getID() << ": " + << (e.local ? "local" : "network") << ", " + << "result: " << e.result << " ledgers left: " + << (ledgersLeft ? to_string(*ledgersLeft) + : "unspecified"); } } else diff --git a/src/ripple/app/misc/NetworkOPs.h b/src/ripple/app/misc/NetworkOPs.h index d53127ed3b6..f8c3c3df046 100644 --- a/src/ripple/app/misc/NetworkOPs.h +++ b/src/ripple/app/misc/NetworkOPs.h @@ -41,6 +41,7 @@ class Peer; class LedgerMaster; class Transaction; class ValidatorKeys; +class CanonicalTXSet; // This is the primary interface into the "client" portion of the program. // Code that wants to do normal operations on the network such as @@ -139,6 +140,15 @@ class NetworkOPs : public InfoSub::Source bool bLocal, FailHard failType) = 0; + /** + * Process a set of transactions synchronously, ensuring that they are + * processed in one batch.
+ * + * @param set Transaction object set + */ + virtual void + processTransactionSet(CanonicalTXSet const& set) = 0; + //-------------------------------------------------------------------------- // // Owner functions diff --git a/src/test/app/HashRouter_test.cpp b/src/test/app/HashRouter_test.cpp index 96d14e824cf..f45692aa819 100644 --- a/src/test/app/HashRouter_test.cpp +++ b/src/test/app/HashRouter_test.cpp @@ -31,7 +31,7 @@ class HashRouter_test : public beast::unit_test::suite { using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, 2s); + HashRouter router(stopwatch, 2s, 1s); uint256 const key1(1); uint256 const key2(2); @@ -68,7 +68,7 @@ class HashRouter_test : public beast::unit_test::suite { using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, 2s); + HashRouter router(stopwatch, 2s, 1s); uint256 const key1(1); uint256 const key2(2); @@ -146,7 +146,7 @@ class HashRouter_test : public beast::unit_test::suite // Normal HashRouter using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, 2s); + HashRouter router(stopwatch, 2s, 1s); uint256 const key1(1); uint256 const key2(2); @@ -174,7 +174,7 @@ class HashRouter_test : public beast::unit_test::suite { using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, 2s); + HashRouter router(stopwatch, 2s, 1s); uint256 const key1(1); BEAST_EXPECT(router.setFlags(key1, 10)); @@ -187,7 +187,7 @@ class HashRouter_test : public beast::unit_test::suite { using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, 1s); + HashRouter router(stopwatch, 50s, 1s); uint256 const key1(1); @@ -230,7 +230,7 @@ class HashRouter_test : public beast::unit_test::suite { using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, 5s); + HashRouter router(stopwatch, 5s, 1s); uint256 const key(1); HashRouter::PeerShortID 
peer = 1; int flags;