Skip to content

Commit

Permalink
Process held transactions through existing NetworkOPs batching:
Browse files Browse the repository at this point in the history
* Ensures that successful transactions are broadcast to peers,
  appropriate failed transactions are held for later attempts, fee
  changes are sent to subscribers, etc.
  • Loading branch information
ximinez committed Apr 10, 2024
1 parent 8590d37 commit fed03f7
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 29 deletions.
29 changes: 10 additions & 19 deletions src/ripple/app/ledger/impl/LedgerMaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,26 +548,17 @@ LedgerMaster::storeLedger(std::shared_ptr<Ledger const> ledger)
void
LedgerMaster::applyHeldTransactions()
{
std::lock_guard sl(m_mutex);
CanonicalTXSet const set = [this]() {
std::lock_guard sl(m_mutex);
// VFALCO NOTE The hash for an open ledger is undefined so we use
// something that is a reasonable substitute.
CanonicalTXSet set(app_.openLedger().current()->info().parentHash);
std::swap(mHeldTransactions, set);
return set;
}();

app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
bool any = false;
for (auto const& it : mHeldTransactions)
{
ApplyFlags flags = tapNONE;
auto const result =
app_.getTxQ().apply(app_, view, it.second, flags, j);
if (result.second)
any = true;
}
return any;
});

// VFALCO TODO recreate the CanonicalTxSet object instead of resetting
// it.
// VFALCO NOTE The hash for an open ledger is undefined so we use
// something that is a reasonable substitute.
mHeldTransactions.reset(app_.openLedger().current()->info().parentHash);
if (!set.empty())
app_.getOPs().processTransactionSet(set);
}

std::shared_ptr<STTx const>
Expand Down
107 changes: 97 additions & 10 deletions src/ripple/app/misc/NetworkOPs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ class NetworkOPsImp final : public NetworkOPs
bool bLocal,
FailHard failType) override;

void
processTransactionSet(CanonicalTXSet const& set) override;

/**
* For transactions submitted directly by a client, apply batch of
* transactions and wait for this transaction to complete.
Expand Down Expand Up @@ -317,6 +320,16 @@ class NetworkOPsImp final : public NetworkOPs
bool bUnlimited,
FailHard failtype);

private:
bool
preProcessTransaction(std::shared_ptr<Transaction>& transaction);

void
doTransactionSyncBatch(
std::unique_lock<std::mutex>& lock,
std::function<bool(std::unique_lock<std::mutex>&)> retryCallback);

public:
/**
* Apply transactions in batches. Continue until none are queued.
*/
Expand Down Expand Up @@ -1187,14 +1200,9 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
});
}

void
NetworkOPsImp::processTransaction(
std::shared_ptr<Transaction>& transaction,
bool bUnlimited,
bool bLocal,
FailHard failType)
bool
NetworkOPsImp::preProcessTransaction(std::shared_ptr<Transaction>& transaction)
{
auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");
auto const newFlags = app_.getHashRouter().getFlags(transaction->getID());

if ((newFlags & SF_BAD) != 0)
Expand All @@ -1203,7 +1211,7 @@ NetworkOPsImp::processTransaction(
JLOG(m_journal.warn()) << transaction->getID() << ": cached bad!\n";
transaction->setStatus(INVALID);
transaction->setResult(temBAD_SIGNATURE);
return;
return false;
}

// NOTE eahennis - I think this check is redundant,
Expand All @@ -1224,12 +1232,28 @@ NetworkOPsImp::processTransaction(
transaction->setStatus(INVALID);
transaction->setResult(temBAD_SIGNATURE);
app_.getHashRouter().setFlags(transaction->getID(), SF_BAD);
return;
return false;
}

// canonicalize can change our pointer
app_.getMasterTransaction().canonicalize(&transaction);

return true;
}

void
NetworkOPsImp::processTransaction(
std::shared_ptr<Transaction>& transaction,
bool bUnlimited,
bool bLocal,
FailHard failType)
{
auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");

// preProcessTransaction can change our pointer
if (!preProcessTransaction(transaction))
return;

if (bLocal)
doTransactionSync(transaction, bUnlimited, failType);
else
Expand Down Expand Up @@ -1276,6 +1300,17 @@ NetworkOPsImp::doTransactionSync(
transaction->setApplying();
}

doTransactionSyncBatch(
lock, [&transaction](std::unique_lock<std::mutex>& lock) {
return transaction->getApplying();
});
}

void
NetworkOPsImp::doTransactionSyncBatch(
std::unique_lock<std::mutex>& lock,
std::function<bool(std::unique_lock<std::mutex>&)> retryCallback)
{
do
{
if (mDispatchState == DispatchState::running)
Expand All @@ -1298,7 +1333,59 @@ NetworkOPsImp::doTransactionSync(
}
}
}
} while (transaction->getApplying());
} while (retryCallback(lock));
}

void
NetworkOPsImp::processTransactionSet(CanonicalTXSet const& set)
{
auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXNSet");
std::vector<TransactionStatus> transactions;
transactions.reserve(set.size());
for (auto const& [_, tx] : set)
{
std::string reason;
auto transaction = std::make_shared<Transaction>(tx, reason, app_);

if (transaction->getStatus() == INVALID)
{
if (!reason.empty())
{
JLOG(m_journal.trace())
<< "Exception checking transaction: " << reason;
}
app_.getHashRouter().setFlags(tx->getTransactionID(), SF_BAD);
continue;
}

// preProcessTransaction can change our pointer
if (!preProcessTransaction(transaction))
continue;

if (!transaction->getApplying())
{
transactions.emplace_back(transaction, false, false, FailHard::no);
transaction->setApplying();
}
}

std::unique_lock lock(mMutex);

if (mTransactions.empty())
mTransactions.swap(transactions);
else
{
for (auto& t : transactions)
mTransactions.push_back(std::move(t));
}

doTransactionSyncBatch(lock, [&](std::unique_lock<std::mutex>& lock) {
assert(lock.owns_lock());
return std::any_of(
mTransactions.begin(), mTransactions.end(), [](auto const& t) {
return t.transaction->getApplying();
});
});
}

void
Expand Down
10 changes: 10 additions & 0 deletions src/ripple/app/misc/NetworkOPs.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class Peer;
class LedgerMaster;
class Transaction;
class ValidatorKeys;
class CanonicalTXSet;

// This is the primary interface into the "client" portion of the program.
// Code that wants to do normal operations on the network such as
Expand Down Expand Up @@ -139,6 +140,15 @@ class NetworkOPs : public InfoSub::Source
bool bLocal,
FailHard failType) = 0;

/**
* Process a set of transactions synchronously, and ensuring that they are
* processed in one batch.
*
* @param set Transaction object set
*/
virtual void
processTransactionSet(CanonicalTXSet const& set) = 0;

//--------------------------------------------------------------------------
//
// Owner functions
Expand Down

0 comments on commit fed03f7

Please sign in to comment.