Skip to content

Commit 81c9cc9

Browse files
committed
Tailing iterator
Summary: This diff implements a special type of iterator that doesn't create a snapshot (can be used to read newly inserted data) and is optimized for doing sequential reads. TailingIterator uses current superversion number to determine whether to invalidate its internal iterators. If the version hasn't changed, it can often avoid doing expensive seeks over immutable structures (sst files and immutable memtables). Test Plan: * new unit tests * running LD with this patch Reviewers: igor, dhruba, haobo, sdong, kailiu Reviewed By: sdong CC: leveldb, lovro, march Differential Revision: https://reviews.facebook.net/D15285
1 parent 4e91f27 commit 81c9cc9

File tree

6 files changed

+490
-11
lines changed

6 files changed

+490
-11
lines changed

db/db_impl.cc

+90-9
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <stdint.h>
1818
#include <string>
1919
#include <unordered_set>
20+
#include <utility>
2021
#include <vector>
2122

2223
#include "db/builder.h"
@@ -32,6 +33,7 @@
3233
#include "db/prefix_filter_iterator.h"
3334
#include "db/table_cache.h"
3435
#include "db/table_properties_collector.h"
36+
#include "db/tailing_iter.h"
3537
#include "db/transaction_log_impl.h"
3638
#include "db/version_set.h"
3739
#include "db/write_batch_internal.h"
@@ -264,6 +266,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
264266
mem_(new MemTable(internal_comparator_, options_)),
265267
logfile_number_(0),
266268
super_version_(nullptr),
269+
super_version_number_(0),
267270
tmp_batch_(),
268271
bg_compaction_scheduled_(0),
269272
bg_manual_only_(0),
@@ -1413,6 +1416,10 @@ int DBImpl::Level0StopWriteTrigger() {
14131416
return options_.level0_stop_writes_trigger;
14141417
}
14151418

1419+
uint64_t DBImpl::CurrentVersionNumber() const {
1420+
return super_version_number_.load();
1421+
}
1422+
14161423
Status DBImpl::Flush(const FlushOptions& options) {
14171424
Status status = FlushMemTable(options);
14181425
return status;
@@ -2652,11 +2659,14 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
26522659
deletion_state.memtables_to_free.push_back(m);
26532660
}
26542661
}
2655-
state->version->Unref();
2662+
if (state->version) { // not set for memtable-only iterator
2663+
state->version->Unref();
2664+
}
26562665
// fast path FindObsoleteFiles
26572666
state->db->FindObsoleteFiles(deletion_state, false, true);
26582667
state->mu->Unlock();
26592668
state->db->PurgeObsoleteFiles(deletion_state);
2669+
26602670
delete state;
26612671
}
26622672
} // namespace
@@ -2678,18 +2688,20 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
26782688
for (unsigned int i = 0; i < immutables.size(); i++) {
26792689
immutables[i]->Ref();
26802690
}
2681-
// Collect iterators for files in L0 - Ln
26822691
versions_->current()->Ref();
26832692
version = versions_->current();
26842693
mutex_.Unlock();
26852694

26862695
std::vector<Iterator*> list;
26872696
list.push_back(mutable_mem->NewIterator(options));
26882697
cleanup->mem.push_back(mutable_mem);
2698+
2699+
// Collect all needed child iterators for immutable memtables
26892700
for (MemTable* m : immutables) {
26902701
list.push_back(m->NewIterator(options));
26912702
cleanup->mem.push_back(m);
26922703
}
2704+
// Collect iterators for files in L0 - Ln
26932705
version->AddIterators(options, storage_options_, &list);
26942706
Iterator* internal_iter =
26952707
NewMergingIterator(&internal_comparator_, &list[0], list.size());
@@ -2706,6 +2718,66 @@ Iterator* DBImpl::TEST_NewInternalIterator() {
27062718
return NewInternalIterator(ReadOptions(), &ignored);
27072719
}
27082720

2721+
std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
2722+
const ReadOptions& options,
2723+
uint64_t* superversion_number) {
2724+
2725+
MemTable* mutable_mem;
2726+
std::vector<MemTable*> immutables;
2727+
Version* version;
2728+
2729+
immutables.reserve(options_.max_write_buffer_number);
2730+
2731+
// get all child iterators and bump their refcounts under lock
2732+
mutex_.Lock();
2733+
mutable_mem = mem_;
2734+
mutable_mem->Ref();
2735+
imm_.GetMemTables(&immutables);
2736+
for (size_t i = 0; i < immutables.size(); ++i) {
2737+
immutables[i]->Ref();
2738+
}
2739+
version = versions_->current();
2740+
version->Ref();
2741+
if (superversion_number != nullptr) {
2742+
*superversion_number = CurrentVersionNumber();
2743+
}
2744+
mutex_.Unlock();
2745+
2746+
Iterator* mutable_iter = mutable_mem->NewIterator(options);
2747+
IterState* mutable_cleanup = new IterState();
2748+
mutable_cleanup->mem.push_back(mutable_mem);
2749+
mutable_cleanup->db = this;
2750+
mutable_cleanup->mu = &mutex_;
2751+
mutable_iter->RegisterCleanup(CleanupIteratorState, mutable_cleanup, nullptr);
2752+
2753+
// create a DBIter that only uses memtable content; see NewIterator()
2754+
mutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(),
2755+
mutable_iter, kMaxSequenceNumber);
2756+
2757+
Iterator* immutable_iter;
2758+
IterState* immutable_cleanup = new IterState();
2759+
std::vector<Iterator*> list;
2760+
for (MemTable* m : immutables) {
2761+
list.push_back(m->NewIterator(options));
2762+
immutable_cleanup->mem.push_back(m);
2763+
}
2764+
version->AddIterators(options, storage_options_, &list);
2765+
immutable_cleanup->version = version;
2766+
immutable_cleanup->db = this;
2767+
immutable_cleanup->mu = &mutex_;
2768+
2769+
immutable_iter =
2770+
NewMergingIterator(&internal_comparator_, &list[0], list.size());
2771+
immutable_iter->RegisterCleanup(CleanupIteratorState, immutable_cleanup,
2772+
nullptr);
2773+
2774+
// create a DBIter that only uses memtable content; see NewIterator()
2775+
immutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(),
2776+
immutable_iter, kMaxSequenceNumber);
2777+
2778+
return std::make_pair(mutable_iter, immutable_iter);
2779+
}
2780+
27092781
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
27102782
MutexLock l(&mutex_);
27112783
return versions_->current()->MaxNextLevelOverlappingBytes();
@@ -2748,6 +2820,7 @@ DBImpl::SuperVersion* DBImpl::InstallSuperVersion(
27482820
new_superversion->Init(mem_, imm_, versions_->current());
27492821
SuperVersion* old_superversion = super_version_;
27502822
super_version_ = new_superversion;
2823+
++super_version_number_;
27512824
if (old_superversion != nullptr && old_superversion->Unref()) {
27522825
old_superversion->Cleanup();
27532826
return old_superversion; // will let caller delete outside of mutex
@@ -2930,13 +3003,21 @@ bool DBImpl::KeyMayExist(const ReadOptions& options,
29303003
}
29313004

29323005
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
2933-
SequenceNumber latest_snapshot;
2934-
Iterator* iter = NewInternalIterator(options, &latest_snapshot);
2935-
iter = NewDBIterator(
2936-
&dbname_, env_, options_, user_comparator(), iter,
2937-
(options.snapshot != nullptr
2938-
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
2939-
: latest_snapshot));
3006+
Iterator* iter;
3007+
3008+
if (options.tailing) {
3009+
iter = new TailingIterator(this, options, user_comparator());
3010+
} else {
3011+
SequenceNumber latest_snapshot;
3012+
iter = NewInternalIterator(options, &latest_snapshot);
3013+
3014+
iter = NewDBIterator(
3015+
&dbname_, env_, options_, user_comparator(), iter,
3016+
(options.snapshot != nullptr
3017+
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
3018+
: latest_snapshot));
3019+
}
3020+
29403021
if (options.prefix) {
29413022
// use extra wrapper to exclude any keys from the results which
29423023
// don't begin with the prefix

db/db_impl.h

+18
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <atomic>
1111
#include <deque>
1212
#include <set>
13+
#include <utility>
1314
#include <vector>
1415
#include "db/dbformat.h"
1516
#include "db/log_writer.h"
@@ -256,6 +257,7 @@ class DBImpl : public DB {
256257

257258
private:
258259
friend class DB;
260+
friend class TailingIterator;
259261
struct CompactionState;
260262
struct Writer;
261263

@@ -359,6 +361,17 @@ class DBImpl : public DB {
359361
// hold the data set.
360362
void ReFitLevel(int level, int target_level = -1);
361363

364+
// Returns the current SuperVersion number.
365+
uint64_t CurrentVersionNumber() const;
366+
367+
// Returns a pair of iterators (mutable-only and immutable-only) used
368+
// internally by TailingIterator and stores CurrentVersionNumber() in
369+
// *superversion_number. These iterators are always up-to-date, i.e. can
370+
// be used to read new data.
371+
std::pair<Iterator*, Iterator*> GetTailingIteratorPair(
372+
const ReadOptions& options,
373+
uint64_t* superversion_number);
374+
362375
// Constant after construction
363376
const InternalFilterPolicy internal_filter_policy_;
364377
bool owns_info_log_;
@@ -381,6 +394,11 @@ class DBImpl : public DB {
381394

382395
SuperVersion* super_version_;
383396

397+
// An ordinal representing the current SuperVersion. Updated by
398+
// InstallSuperVersion(), i.e. incremented every time super_version_
399+
// changes.
400+
std::atomic<uint64_t> super_version_number_;
401+
384402
std::string host_name_;
385403

386404
// Queue of writers.

db/db_test.cc

+112
Original file line numberDiff line numberDiff line change
@@ -5034,6 +5034,118 @@ void BM_LogAndApply(int iters, int num_base_files) {
50345034
buf, iters, us, ((float)us) / iters);
50355035
}
50365036

5037+
TEST(DBTest, TailingIteratorSingle) {
5038+
ReadOptions read_options;
5039+
read_options.tailing = true;
5040+
5041+
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
5042+
iter->SeekToFirst();
5043+
ASSERT_TRUE(!iter->Valid());
5044+
5045+
// add a record and check that iter can see it
5046+
ASSERT_OK(db_->Put(WriteOptions(), "mirko", "fodor"));
5047+
iter->SeekToFirst();
5048+
ASSERT_TRUE(iter->Valid());
5049+
ASSERT_EQ(iter->key().ToString(), "mirko");
5050+
5051+
iter->Next();
5052+
ASSERT_TRUE(!iter->Valid());
5053+
}
5054+
5055+
TEST(DBTest, TailingIteratorKeepAdding) {
5056+
ReadOptions read_options;
5057+
read_options.tailing = true;
5058+
5059+
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
5060+
std::string value(1024, 'a');
5061+
5062+
const int num_records = 10000;
5063+
for (int i = 0; i < num_records; ++i) {
5064+
char buf[32];
5065+
snprintf(buf, sizeof(buf), "%016d", i);
5066+
5067+
Slice key(buf, 16);
5068+
ASSERT_OK(db_->Put(WriteOptions(), key, value));
5069+
5070+
iter->Seek(key);
5071+
ASSERT_TRUE(iter->Valid());
5072+
ASSERT_EQ(iter->key().compare(key), 0);
5073+
}
5074+
}
5075+
5076+
TEST(DBTest, TailingIteratorDeletes) {
5077+
ReadOptions read_options;
5078+
read_options.tailing = true;
5079+
5080+
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
5081+
5082+
// write a single record, read it using the iterator, then delete it
5083+
ASSERT_OK(db_->Put(WriteOptions(), "0test", "test"));
5084+
iter->SeekToFirst();
5085+
ASSERT_TRUE(iter->Valid());
5086+
ASSERT_EQ(iter->key().ToString(), "0test");
5087+
ASSERT_OK(db_->Delete(WriteOptions(), "0test"));
5088+
5089+
// write many more records
5090+
const int num_records = 10000;
5091+
std::string value(1024, 'A');
5092+
5093+
for (int i = 0; i < num_records; ++i) {
5094+
char buf[32];
5095+
snprintf(buf, sizeof(buf), "1%015d", i);
5096+
5097+
Slice key(buf, 16);
5098+
ASSERT_OK(db_->Put(WriteOptions(), key, value));
5099+
}
5100+
5101+
// force a flush to make sure that no records are read from memtable
5102+
dbfull()->TEST_FlushMemTable();
5103+
5104+
// skip "0test"
5105+
iter->Next();
5106+
5107+
// make sure we can read all new records using the existing iterator
5108+
int count = 0;
5109+
for (; iter->Valid(); iter->Next(), ++count) ;
5110+
5111+
ASSERT_EQ(count, num_records);
5112+
}
5113+
5114+
TEST(DBTest, TailingIteratorPrefixSeek) {
5115+
ReadOptions read_options;
5116+
read_options.tailing = true;
5117+
read_options.prefix_seek = true;
5118+
5119+
auto prefix_extractor = NewFixedPrefixTransform(2);
5120+
5121+
Options options = CurrentOptions();
5122+
options.env = env_;
5123+
options.create_if_missing = true;
5124+
options.disable_auto_compactions = true;
5125+
options.prefix_extractor = prefix_extractor;
5126+
options.memtable_factory.reset(NewHashSkipListRepFactory(prefix_extractor));
5127+
DestroyAndReopen(&options);
5128+
5129+
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
5130+
ASSERT_OK(db_->Put(WriteOptions(), "0101", "test"));
5131+
5132+
dbfull()->TEST_FlushMemTable();
5133+
5134+
ASSERT_OK(db_->Put(WriteOptions(), "0202", "test"));
5135+
5136+
// Seek(0102) shouldn't find any records since 0202 has a different prefix
5137+
iter->Seek("0102");
5138+
ASSERT_TRUE(!iter->Valid());
5139+
5140+
iter->Seek("0202");
5141+
ASSERT_TRUE(iter->Valid());
5142+
ASSERT_EQ(iter->key().ToString(), "0202");
5143+
5144+
iter->Next();
5145+
ASSERT_TRUE(!iter->Valid());
5146+
}
5147+
5148+
50375149
} // namespace rocksdb
50385150

50395151
int main(int argc, char** argv) {

0 commit comments

Comments
 (0)