17
17
#include < stdint.h>
18
18
#include < string>
19
19
#include < unordered_set>
20
+ #include < utility>
20
21
#include < vector>
21
22
22
23
#include " db/builder.h"
32
33
#include " db/prefix_filter_iterator.h"
33
34
#include " db/table_cache.h"
34
35
#include " db/table_properties_collector.h"
36
+ #include " db/tailing_iter.h"
35
37
#include " db/transaction_log_impl.h"
36
38
#include " db/version_set.h"
37
39
#include " db/write_batch_internal.h"
@@ -267,6 +269,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
267
269
mem_(new MemTable(internal_comparator_, options_)),
268
270
logfile_number_(0 ),
269
271
super_version_(nullptr ),
272
+ super_version_number_(0 ),
270
273
tmp_batch_(),
271
274
bg_compaction_scheduled_(0 ),
272
275
bg_manual_only_(0 ),
@@ -1290,10 +1293,15 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
1290
1293
return s;
1291
1294
}
1292
1295
1293
- void DBImpl::CompactRange (const ColumnFamilyHandle& column_family,
1294
- const Slice* begin, const Slice* end,
1295
- bool reduce_level, int target_level) {
1296
- FlushMemTable (FlushOptions ());
1296
+ Status DBImpl::CompactRange (const ColumnFamilyHandle& column_family,
1297
+ const Slice* begin, const Slice* end,
1298
+ bool reduce_level, int target_level) {
1299
+ Status s = FlushMemTable (FlushOptions ());
1300
+ if (!s.ok ()) {
1301
+ LogFlush (options_.info_log );
1302
+ return s;
1303
+ }
1304
+
1297
1305
int max_level_with_files = 1 ;
1298
1306
{
1299
1307
MutexLock l (&mutex_);
@@ -1309,16 +1317,22 @@ void DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
1309
1317
// bottom-most level, the output level will be the same as input one
1310
1318
if (options_.compaction_style == kCompactionStyleUniversal ||
1311
1319
level == max_level_with_files) {
1312
- RunManualCompaction (level, level, begin, end);
1320
+ s = RunManualCompaction (level, level, begin, end);
1313
1321
} else {
1314
- RunManualCompaction (level, level + 1 , begin, end);
1322
+ s = RunManualCompaction (level, level + 1 , begin, end);
1323
+ }
1324
+ if (!s.ok ()) {
1325
+ LogFlush (options_.info_log );
1326
+ return s;
1315
1327
}
1316
1328
}
1317
1329
1318
1330
if (reduce_level) {
1319
- ReFitLevel (max_level_with_files, target_level);
1331
+ s = ReFitLevel (max_level_with_files, target_level);
1320
1332
}
1321
1333
LogFlush (options_.info_log );
1334
+
1335
+ return s;
1322
1336
}
1323
1337
1324
1338
// return the same level if it cannot be moved
@@ -1337,7 +1351,7 @@ int DBImpl::FindMinimumEmptyLevelFitting(int level) {
1337
1351
return minimum_level;
1338
1352
}
1339
1353
1340
- void DBImpl::ReFitLevel (int level, int target_level) {
1354
+ Status DBImpl::ReFitLevel (int level, int target_level) {
1341
1355
assert (level < NumberLevels ());
1342
1356
1343
1357
SuperVersion* superversion_to_free = nullptr ;
@@ -1351,7 +1365,7 @@ void DBImpl::ReFitLevel(int level, int target_level) {
1351
1365
mutex_.Unlock ();
1352
1366
Log (options_.info_log , " ReFitLevel: another thread is refitting" );
1353
1367
delete new_superversion;
1354
- return ;
1368
+ return Status::NotSupported ( " another thread is refitting " ) ;
1355
1369
}
1356
1370
refitting_level_ = true ;
1357
1371
@@ -1372,6 +1386,7 @@ void DBImpl::ReFitLevel(int level, int target_level) {
1372
1386
1373
1387
assert (to_level <= level);
1374
1388
1389
+ Status status;
1375
1390
if (to_level < level) {
1376
1391
Log (options_.info_log , " Before refitting:\n %s" ,
1377
1392
versions_->current ()->DebugString ().data ());
@@ -1385,7 +1400,7 @@ void DBImpl::ReFitLevel(int level, int target_level) {
1385
1400
Log (options_.info_log , " Apply version edit:\n %s" ,
1386
1401
edit.DebugString ().data ());
1387
1402
1388
- auto status = versions_->LogAndApply (&edit, &mutex_);
1403
+ status = versions_->LogAndApply (&edit, &mutex_);
1389
1404
superversion_to_free = InstallSuperVersion (new_superversion);
1390
1405
new_superversion = nullptr ;
1391
1406
@@ -1403,6 +1418,7 @@ void DBImpl::ReFitLevel(int level, int target_level) {
1403
1418
mutex_.Unlock ();
1404
1419
delete superversion_to_free;
1405
1420
delete new_superversion;
1421
+ return status;
1406
1422
}
1407
1423
1408
1424
int DBImpl::NumberLevels (const ColumnFamilyHandle& column_family) {
@@ -1417,6 +1433,10 @@ int DBImpl::Level0StopWriteTrigger(const ColumnFamilyHandle& column_family) {
1417
1433
return options_.level0_stop_writes_trigger ;
1418
1434
}
1419
1435
1436
+ uint64_t DBImpl::CurrentVersionNumber () const {
1437
+ return super_version_number_.load ();
1438
+ }
1439
+
1420
1440
Status DBImpl::Flush (const FlushOptions& options,
1421
1441
const ColumnFamilyHandle& column_family) {
1422
1442
Status status = FlushMemTable (options);
@@ -1612,10 +1632,10 @@ Status DBImpl::AppendSortedWalsOfType(const std::string& path,
1612
1632
return status;
1613
1633
}
1614
1634
1615
- void DBImpl::RunManualCompaction (int input_level,
1616
- int output_level,
1617
- const Slice* begin,
1618
- const Slice* end) {
1635
+ Status DBImpl::RunManualCompaction (int input_level,
1636
+ int output_level,
1637
+ const Slice* begin,
1638
+ const Slice* end) {
1619
1639
assert (input_level >= 0 );
1620
1640
1621
1641
InternalKey begin_storage, end_storage;
@@ -1682,15 +1702,16 @@ void DBImpl::RunManualCompaction(int input_level,
1682
1702
assert (!manual.in_progress );
1683
1703
assert (bg_manual_only_ > 0 );
1684
1704
--bg_manual_only_;
1705
+ return manual.status ;
1685
1706
}
1686
1707
1687
- void DBImpl::TEST_CompactRange (int level,
1688
- const Slice* begin,
1689
- const Slice* end) {
1708
+ Status DBImpl::TEST_CompactRange (int level,
1709
+ const Slice* begin,
1710
+ const Slice* end) {
1690
1711
int output_level = (options_.compaction_style == kCompactionStyleUniversal )
1691
1712
? level
1692
1713
: level + 1 ;
1693
- RunManualCompaction (level, output_level, begin, end);
1714
+ return RunManualCompaction (level, output_level, begin, end);
1694
1715
}
1695
1716
1696
1717
Status DBImpl::FlushMemTable (const FlushOptions& options) {
@@ -1989,6 +2010,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
1989
2010
if (is_manual) {
1990
2011
ManualCompaction* m = manual_compaction_;
1991
2012
if (!status.ok ()) {
2013
+ m->status = status;
1992
2014
m->done = true ;
1993
2015
}
1994
2016
// For universal compaction:
@@ -2657,11 +2679,14 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
2657
2679
deletion_state.memtables_to_free .push_back (m);
2658
2680
}
2659
2681
}
2660
- state->version ->Unref ();
2682
+ if (state->version ) { // not set for memtable-only iterator
2683
+ state->version ->Unref ();
2684
+ }
2661
2685
// fast path FindObsoleteFiles
2662
2686
state->db ->FindObsoleteFiles (deletion_state, false , true );
2663
2687
state->mu ->Unlock ();
2664
2688
state->db ->PurgeObsoleteFiles (deletion_state);
2689
+
2665
2690
delete state;
2666
2691
}
2667
2692
} // namespace
@@ -2683,18 +2708,20 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
2683
2708
for (unsigned int i = 0 ; i < immutables.size (); i++) {
2684
2709
immutables[i]->Ref ();
2685
2710
}
2686
- // Collect iterators for files in L0 - Ln
2687
2711
versions_->current ()->Ref ();
2688
2712
version = versions_->current ();
2689
2713
mutex_.Unlock ();
2690
2714
2691
2715
std::vector<Iterator*> list;
2692
2716
list.push_back (mutable_mem->NewIterator (options));
2693
2717
cleanup->mem .push_back (mutable_mem);
2718
+
2719
+ // Collect all needed child iterators for immutable memtables
2694
2720
for (MemTable* m : immutables) {
2695
2721
list.push_back (m->NewIterator (options));
2696
2722
cleanup->mem .push_back (m);
2697
2723
}
2724
+ // Collect iterators for files in L0 - Ln
2698
2725
version->AddIterators (options, storage_options_, &list);
2699
2726
Iterator* internal_iter =
2700
2727
NewMergingIterator (&internal_comparator_, &list[0 ], list.size ());
@@ -2711,6 +2738,66 @@ Iterator* DBImpl::TEST_NewInternalIterator() {
2711
2738
return NewInternalIterator (ReadOptions (), &ignored);
2712
2739
}
2713
2740
2741
+ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair (
2742
+ const ReadOptions& options,
2743
+ uint64_t * superversion_number) {
2744
+
2745
+ MemTable* mutable_mem;
2746
+ std::vector<MemTable*> immutables;
2747
+ Version* version;
2748
+
2749
+ immutables.reserve (options_.max_write_buffer_number );
2750
+
2751
+ // get all child iterators and bump their refcounts under lock
2752
+ mutex_.Lock ();
2753
+ mutable_mem = mem_;
2754
+ mutable_mem->Ref ();
2755
+ imm_.GetMemTables (&immutables);
2756
+ for (size_t i = 0 ; i < immutables.size (); ++i) {
2757
+ immutables[i]->Ref ();
2758
+ }
2759
+ version = versions_->current ();
2760
+ version->Ref ();
2761
+ if (superversion_number != nullptr ) {
2762
+ *superversion_number = CurrentVersionNumber ();
2763
+ }
2764
+ mutex_.Unlock ();
2765
+
2766
+ Iterator* mutable_iter = mutable_mem->NewIterator (options);
2767
+ IterState* mutable_cleanup = new IterState ();
2768
+ mutable_cleanup->mem .push_back (mutable_mem);
2769
+ mutable_cleanup->db = this ;
2770
+ mutable_cleanup->mu = &mutex_;
2771
+ mutable_iter->RegisterCleanup (CleanupIteratorState, mutable_cleanup, nullptr );
2772
+
2773
+ // create a DBIter that only uses memtable content; see NewIterator()
2774
+ mutable_iter = NewDBIterator (&dbname_, env_, options_, user_comparator (),
2775
+ mutable_iter, kMaxSequenceNumber );
2776
+
2777
+ Iterator* immutable_iter;
2778
+ IterState* immutable_cleanup = new IterState ();
2779
+ std::vector<Iterator*> list;
2780
+ for (MemTable* m : immutables) {
2781
+ list.push_back (m->NewIterator (options));
2782
+ immutable_cleanup->mem .push_back (m);
2783
+ }
2784
+ version->AddIterators (options, storage_options_, &list);
2785
+ immutable_cleanup->version = version;
2786
+ immutable_cleanup->db = this ;
2787
+ immutable_cleanup->mu = &mutex_;
2788
+
2789
+ immutable_iter =
2790
+ NewMergingIterator (&internal_comparator_, &list[0 ], list.size ());
2791
+ immutable_iter->RegisterCleanup (CleanupIteratorState, immutable_cleanup,
2792
+ nullptr );
2793
+
2794
+ // create a DBIter that only uses memtable content; see NewIterator()
2795
+ immutable_iter = NewDBIterator (&dbname_, env_, options_, user_comparator (),
2796
+ immutable_iter, kMaxSequenceNumber );
2797
+
2798
+ return std::make_pair (mutable_iter, immutable_iter);
2799
+ }
2800
+
2714
2801
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes () {
2715
2802
MutexLock l (&mutex_);
2716
2803
return versions_->current ()->MaxNextLevelOverlappingBytes ();
@@ -2753,6 +2840,7 @@ DBImpl::SuperVersion* DBImpl::InstallSuperVersion(
2753
2840
new_superversion->Init (mem_, imm_, versions_->current ());
2754
2841
SuperVersion* old_superversion = super_version_;
2755
2842
super_version_ = new_superversion;
2843
+ ++super_version_number_;
2756
2844
if (old_superversion != nullptr && old_superversion->Unref ()) {
2757
2845
old_superversion->Cleanup ();
2758
2846
return old_superversion; // will let caller delete outside of mutex
@@ -2975,13 +3063,21 @@ bool DBImpl::KeyMayExist(const ReadOptions& options,
2975
3063
2976
3064
Iterator* DBImpl::NewIterator (const ReadOptions& options,
2977
3065
const ColumnFamilyHandle& column_family) {
2978
- SequenceNumber latest_snapshot;
2979
- Iterator* iter = NewInternalIterator (options, &latest_snapshot);
2980
- iter = NewDBIterator (
2981
- &dbname_, env_, options_, user_comparator (), iter,
2982
- (options.snapshot != nullptr
2983
- ? reinterpret_cast <const SnapshotImpl*>(options.snapshot )->number_
2984
- : latest_snapshot));
3066
+ Iterator* iter;
3067
+
3068
+ if (options.tailing ) {
3069
+ iter = new TailingIterator (this , options, user_comparator ());
3070
+ } else {
3071
+ SequenceNumber latest_snapshot;
3072
+ iter = NewInternalIterator (options, &latest_snapshot);
3073
+
3074
+ iter = NewDBIterator (
3075
+ &dbname_, env_, options_, user_comparator (), iter,
3076
+ (options.snapshot != nullptr
3077
+ ? reinterpret_cast <const SnapshotImpl*>(options.snapshot )->number_
3078
+ : latest_snapshot));
3079
+ }
3080
+
2985
3081
if (options.prefix ) {
2986
3082
// use extra wrapper to exclude any keys from the results which
2987
3083
// don't begin with the prefix
0 commit comments