From 6faaa91850d6b2eb9fbf16c1256bf7ef11ac4646 Mon Sep 17 00:00:00 2001
From: Nik Bougalis <nikb@bougalis.net>
Date: Wed, 12 Jan 2022 21:03:57 -0800
Subject: [PATCH] Improve asynchronous database handlers:

This commit optimizes the way asynchronous nodestore operations are
processed, both by reducing the amount of time that locks are held and
by minimizing memory allocations and data copying.
---
 src/ripple/nodestore/Database.h        |   7 +-
 src/ripple/nodestore/impl/Database.cpp | 127 +++++++++++++------------
 2 files changed, 70 insertions(+), 64 deletions(-)

diff --git a/src/ripple/nodestore/Database.h b/src/ripple/nodestore/Database.h
index 6cea6d46ced..bb9304507d9 100644
--- a/src/ripple/nodestore/Database.h
+++ b/src/ripple/nodestore/Database.h
@@ -366,11 +366,8 @@ class Database
             std::function<void(std::shared_ptr<NodeObject> const&)>>>>
         read_;
 
-    // last read
-    uint256 readLastHash_;
-
-    std::vector<std::thread> readThreads_;
-    bool readStopping_{false};
+    std::atomic<bool> readStopping_ = false;
+    std::atomic<int> readThreads_ = 0;
 
     virtual std::shared_ptr<NodeObject>
     fetchNodeObject(
diff --git a/src/ripple/nodestore/impl/Database.cpp b/src/ripple/nodestore/impl/Database.cpp
index da062a682da..bf28f5bfbfb 100644
--- a/src/ripple/nodestore/impl/Database.cpp
+++ b/src/ripple/nodestore/impl/Database.cpp
@@ -43,15 +43,76 @@ Database::Database(
     , earliestLedgerSeq_(
           get<std::uint32_t>(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
     , earliestShardIndex_((earliestLedgerSeq_ - 1) / ledgersPerShard_)
+    , readThreads_(std::max(1, readThreads))
 {
+    assert(readThreads != 0);
+
     if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0)
         Throw<std::runtime_error>("Invalid ledgers_per_shard");
 
     if (earliestLedgerSeq_ < 1)
         Throw<std::runtime_error>("Invalid earliest_seq");
 
-    while (readThreads-- > 0)
-        readThreads_.emplace_back(&Database::threadEntry, this);
+    for (int i = 0; i != readThreads_.load(); ++i)
+    {
+        std::thread t(
+            [this](int i) {
+                beast::setCurrentThreadName(
+                    "db prefetch #" + std::to_string(i));
+
+                decltype(read_) read;
+
+                while (!isStopping())
+                {
+                    {
+                        std::unique_lock lock(readLock_);
+
+                        if (read_.empty())
+                            readCondVar_.wait(lock);
+
+                        if (isStopping())
+                            continue;
+
+                        // We extract up to 64 objects to minimize the overhead
+                        // of acquiring the mutex.
+                        for (int cnt = 0; !read_.empty() && cnt != 64; ++cnt)
+                            read.insert(read_.extract(read_.begin()));
+                    }
+
+                    for (auto it = read.begin(); it != read.end(); ++it)
+                    {
+                        assert(!it->second.empty());
+
+                        auto const& hash = it->first;
+                        auto const& data = std::move(it->second);
+                        auto const seqn = data[0].first;
+
+                        auto obj =
+                            fetchNodeObject(hash, seqn, FetchType::async);
+
+                        // This could be optimized further: when there are
+                        // multiple requests for sequence numbers mapping to
+                        // multiple databases, sort the requests so that all
+                        // indices mapping to the same database are grouped
+                        // together and serviced by a single read.
+                        for (auto const& req : data)
+                        {
+                            req.second(
+                                (seqn == req.first) || isSameDB(req.first, seqn)
+                                    ? obj
+                                    : fetchNodeObject(
+                                          hash, req.first, FetchType::async));
+                        }
+                    }
+
+                    read.clear();
+                }
+
+                --readThreads_;
+            },
+            i);
+        t.detach();
+    }
 }
 
 Database::~Database()
@@ -68,8 +129,7 @@
 bool
 Database::isStopping() const
 {
-    std::lock_guard lock(readLock_);
-    return readStopping_;
+    return readStopping_.load(std::memory_order_relaxed);
 }
 
 std::uint32_t
@@ -88,19 +148,15 @@ Database::maxLedgers(std::uint32_t shardIndex) const noexcept
 void
 Database::stop()
 {
-    // After stop time we can no longer use the JobQueue for background
-    // reads. Join the background read threads.
+    if (!readStopping_.exchange(true, std::memory_order_relaxed))
     {
         std::lock_guard lock(readLock_);
-        if (readStopping_)  // Only stop threads once.
-            return;
-
-        readStopping_ = true;
+        read_.clear();
         readCondVar_.notify_all();
     }
 
-    for (auto& e : readThreads_)
-        e.join();
+    while (readThreads_.load() != 0)
+        std::this_thread::yield();
 }
 
 void
@@ -280,53 +336,6 @@ Database::storeLedger(
     return true;
 }
 
-// Entry point for async read threads
-void
-Database::threadEntry()
-{
-    beast::setCurrentThreadName("prefetch");
-    while (true)
-    {
-        uint256 lastHash;
-        std::vector<std::pair<
-            std::uint32_t,
-            std::function<void(std::shared_ptr<NodeObject> const&)>>>
-            entry;
-
-        {
-            std::unique_lock lock(readLock_);
-            readCondVar_.wait(
-                lock, [this] { return readStopping_ || !read_.empty(); });
-            if (readStopping_)
-                break;
-
-            // Read in key order to make the back end more efficient
-            auto it = read_.lower_bound(readLastHash_);
-            if (it == read_.end())
-            {
-                // start over from the beginning
-                it = read_.begin();
-            }
-            lastHash = it->first;
-            entry = std::move(it->second);
-            read_.erase(it);
-            readLastHash_ = lastHash;
-        }
-
-        auto seq = entry[0].first;
-        auto obj = fetchNodeObject(lastHash, seq, FetchType::async);
-
-        for (auto const& req : entry)
-        {
-            if ((seq == req.first) || isSameDB(req.first, seq))
-                req.second(obj);
-            else
-                req.second(
-                    fetchNodeObject(lastHash, req.first, FetchType::async));
-        }
-    }
-}
-
 void
 Database::getCountsJson(Json::Value& obj)
 {
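
Note (not part of the original patch): the worker-loop pattern introduced above
is independent of the nodestore: drain up to 64 requests per mutex acquisition
via std::map::extract (which moves map nodes without reallocating or copying),
service them with the lock released, and coordinate shutdown through a pair of
atomics instead of joining threads. The self-contained sketch below illustrates
the same pattern on a toy queue; every name in it (Prefetcher, enqueue, the
fetch stand-in) is hypothetical, and it deliberately uses a predicate wait on
the condition variable, a slightly more defensive choice than the bare wait in
the patch.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <thread>
#include <vector>

using Key = int;
using Callback = std::function<void(int)>;

class Prefetcher
{
    std::map<Key, std::vector<Callback>> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
    std::atomic<bool> stopping_ = false;
    std::atomic<int> threads_ = 0;

public:
    explicit Prefetcher(int threadCount) : threads_(threadCount)
    {
        for (int i = 0; i != threadCount; ++i)
        {
            std::thread([this] {
                decltype(queue_) batch;

                while (!stopping_.load(std::memory_order_relaxed))
                {
                    {
                        std::unique_lock lock(mutex_);

                        // A predicate wait closes the lost-wakeup window
                        // between the stopping check and the wait itself.
                        cv_.wait(lock, [this] {
                            return stopping_ || !queue_.empty();
                        });

                        if (stopping_)
                            continue;

                        // Drain up to 64 entries per lock acquisition;
                        // extract() hands over the map node itself, so keys
                        // and callback vectors are neither reallocated nor
                        // copied.
                        for (int n = 0; !queue_.empty() && n != 64; ++n)
                            batch.insert(queue_.extract(queue_.begin()));
                    }

                    // Service the batch with the mutex released so callers
                    // can keep queueing work.
                    for (auto const& [key, callbacks] : batch)
                    {
                        auto const result = key * 2;  // fetch stand-in
                        for (auto const& cb : callbacks)
                            cb(result);
                    }

                    batch.clear();
                }

                --threads_;  // last act: signal this detached thread's exit
            }).detach();
        }
    }

    void
    enqueue(Key key, Callback cb)
    {
        {
            std::lock_guard lock(mutex_);
            queue_[key].push_back(std::move(cb));
        }
        cv_.notify_one();
    }

    void
    stop()
    {
        // exchange() makes stop() idempotent without taking the mutex just
        // to flip the flag.
        if (!stopping_.exchange(true, std::memory_order_relaxed))
        {
            std::lock_guard lock(mutex_);
            queue_.clear();
            cv_.notify_all();
        }

        // Detached threads cannot be joined; wait for each to count out.
        while (threads_.load() != 0)
            std::this_thread::yield();
    }
};

int
main()
{
    Prefetcher p(4);
    p.enqueue(21, [](int v) { std::cout << "fetched: " << v << "\n"; });
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    p.stop();
}

Compiles with, e.g., g++ -std=c++17 -pthread. One plausible reading of the
detach-plus-counter design in the patch: stop() never blocks inside join(),
it only spins until every worker has decremented the counter on its way out,
which keeps shutdown behavior uniform no matter how many read threads were
configured.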