@@ -267,6 +267,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
267
267
bg_cv_(&mutex_),
268
268
mem_rep_factory_(options_.memtable_factory.get()),
269
269
mem_(new MemTable(internal_comparator_, options_)),
270
+ imm_(options_.min_write_buffer_number_to_merge),
270
271
logfile_number_(0 ),
271
272
super_version_(nullptr ),
272
273
super_version_number_(0 ),
@@ -363,7 +364,7 @@ DBImpl::~DBImpl() {
363
364
delete mem_->Unref ();
364
365
}
365
366
366
- imm_.UnrefAll (&to_delete);
367
+ imm_.current ()-> Unref (&to_delete);
367
368
for (MemTable* m: to_delete) {
368
369
delete m;
369
370
}
@@ -511,21 +512,21 @@ bool DBImpl::SuperVersion::Unref() {
511
512
512
513
void DBImpl::SuperVersion::Cleanup () {
513
514
assert (refs.load (std::memory_order_relaxed) == 0 );
514
- imm. UnrefAll (&to_delete);
515
+ imm-> Unref (&to_delete);
515
516
MemTable* m = mem->Unref ();
516
517
if (m != nullptr ) {
517
518
to_delete.push_back (m);
518
519
}
519
520
current->Unref ();
520
521
}
521
522
522
- void DBImpl::SuperVersion::Init (MemTable* new_mem, const MemTableList& new_imm,
523
+ void DBImpl::SuperVersion::Init (MemTable* new_mem, MemTableListVersion* new_imm,
523
524
Version* new_current) {
524
525
mem = new_mem;
525
526
imm = new_imm;
526
527
current = new_current;
527
528
mem->Ref ();
528
- imm. RefAll ();
529
+ imm-> Ref ();
529
530
current->Ref ();
530
531
refs.store (1 , std::memory_order_relaxed);
531
532
}
@@ -1226,7 +1227,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
1226
1227
mutex_.AssertHeld ();
1227
1228
assert (imm_.size () != 0 );
1228
1229
1229
- if (!imm_.IsFlushPending (options_. min_write_buffer_number_to_merge )) {
1230
+ if (!imm_.IsFlushPending ()) {
1230
1231
Log (options_.info_log , " FlushMemTableToOutputFile already in progress" );
1231
1232
Status s = Status::IOError (" FlushMemTableToOutputFile already in progress" );
1232
1233
return s;
@@ -1767,8 +1768,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
1767
1768
} else if (shutting_down_.Acquire_Load ()) {
1768
1769
// DB is being deleted; no more background compactions
1769
1770
} else {
1770
- bool is_flush_pending =
1771
- imm_.IsFlushPending (options_.min_write_buffer_number_to_merge );
1771
+ bool is_flush_pending = imm_.IsFlushPending ();
1772
1772
if (is_flush_pending &&
1773
1773
(bg_flush_scheduled_ < options_.max_background_flushes )) {
1774
1774
// memtable flush needed
@@ -1803,8 +1803,7 @@ void DBImpl::BGWorkCompaction(void* db) {
1803
1803
Status DBImpl::BackgroundFlush (bool * madeProgress,
1804
1804
DeletionState& deletion_state) {
1805
1805
Status stat;
1806
- while (stat.ok () &&
1807
- imm_.IsFlushPending (options_.min_write_buffer_number_to_merge )) {
1806
+ while (stat.ok () && imm_.IsFlushPending ()) {
1808
1807
Log (options_.info_log ,
1809
1808
" BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d" ,
1810
1809
options_.max_background_flushes - bg_flush_scheduled_);
@@ -1924,7 +1923,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
1924
1923
mutex_.AssertHeld ();
1925
1924
1926
1925
// TODO: remove memtable flush from formal compaction
1927
- while (imm_.IsFlushPending (options_. min_write_buffer_number_to_merge )) {
1926
+ while (imm_.IsFlushPending ()) {
1928
1927
Log (options_.info_log ,
1929
1928
" BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots "
1930
1929
" available %d" ,
@@ -2330,7 +2329,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
2330
2329
const uint64_t imm_start = env_->NowMicros ();
2331
2330
LogFlush (options_.info_log );
2332
2331
mutex_.Lock ();
2333
- if (imm_.IsFlushPending (options_. min_write_buffer_number_to_merge )) {
2332
+ if (imm_.IsFlushPending ()) {
2334
2333
FlushMemTableToOutputFile (nullptr , deletion_state);
2335
2334
bg_cv_.SignalAll (); // Wakeup MakeRoomForWrite() if necessary
2336
2335
}
@@ -2663,8 +2662,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
2663
2662
namespace {
2664
2663
struct IterState {
2665
2664
port::Mutex* mu;
2666
- Version* version;
2667
- std::vector<MemTable*> mem; // includes both mem_ and imm_
2665
+ Version* version = nullptr ;
2666
+ MemTable* mem = nullptr ;
2667
+ MemTableListVersion* imm = nullptr ;
2668
2668
DBImpl *db;
2669
2669
};
2670
2670
@@ -2673,15 +2673,16 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
2673
2673
DBImpl::DeletionState deletion_state (state->db ->GetOptions ().
2674
2674
max_write_buffer_number);
2675
2675
state->mu ->Lock ();
2676
- for (unsigned int i = 0 ; i < state->mem .size (); i++) {
2677
- MemTable* m = state->mem [i]->Unref ();
2678
- if (m != nullptr ) {
2679
- deletion_state.memtables_to_free .push_back (m);
2680
- }
2676
+ MemTable* m = state->mem ->Unref ();
2677
+ if (m != nullptr ) {
2678
+ deletion_state.memtables_to_free .push_back (m);
2681
2679
}
2682
2680
if (state->version ) { // not set for memtable-only iterator
2683
2681
state->version ->Unref ();
2684
2682
}
2683
+ if (state->imm ) { // not set for memtable-only iterator
2684
+ state->imm ->Unref (&deletion_state.memtables_to_free );
2685
+ }
2685
2686
// fast path FindObsoleteFiles
2686
2687
state->db ->FindObsoleteFiles (deletion_state, false , true );
2687
2688
state->mu ->Unlock ();
@@ -2695,7 +2696,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
2695
2696
SequenceNumber* latest_snapshot) {
2696
2697
IterState* cleanup = new IterState;
2697
2698
MemTable* mutable_mem;
2698
- std::vector<MemTable*> immutables ;
2699
+ MemTableListVersion* immutable_mems ;
2699
2700
Version* version;
2700
2701
2701
2702
// Collect together all needed child iterators for mem
@@ -2704,27 +2705,22 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
2704
2705
mem_->Ref ();
2705
2706
mutable_mem = mem_;
2706
2707
// Collect together all needed child iterators for imm_
2707
- imm_.GetMemTables (&immutables);
2708
- for (unsigned int i = 0 ; i < immutables.size (); i++) {
2709
- immutables[i]->Ref ();
2710
- }
2708
+ immutable_mems = imm_.current ();
2709
+ immutable_mems->Ref ();
2711
2710
versions_->current ()->Ref ();
2712
2711
version = versions_->current ();
2713
2712
mutex_.Unlock ();
2714
2713
2715
- std::vector<Iterator*> list ;
2716
- list .push_back (mutable_mem->NewIterator (options));
2717
- cleanup->mem . push_back ( mutable_mem) ;
2718
-
2714
+ std::vector<Iterator*> iterator_list ;
2715
+ iterator_list .push_back (mutable_mem->NewIterator (options));
2716
+ cleanup->mem = mutable_mem;
2717
+ cleanup-> imm = immutable_mems;
2719
2718
// Collect all needed child iterators for immutable memtables
2720
- for (MemTable* m : immutables) {
2721
- list.push_back (m->NewIterator (options));
2722
- cleanup->mem .push_back (m);
2723
- }
2719
+ immutable_mems->AddIterators (options, &iterator_list);
2724
2720
// Collect iterators for files in L0 - Ln
2725
- version->AddIterators (options, storage_options_, &list );
2726
- Iterator* internal_iter =
2727
- NewMergingIterator ( &internal_comparator_, &list [0 ], list .size ());
2721
+ version->AddIterators (options, storage_options_, &iterator_list );
2722
+ Iterator* internal_iter = NewMergingIterator (
2723
+ &internal_comparator_, &iterator_list [0 ], iterator_list .size ());
2728
2724
cleanup->version = version;
2729
2725
cleanup->mu = &mutex_;
2730
2726
cleanup->db = this ;
@@ -2743,19 +2739,15 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
2743
2739
uint64_t * superversion_number) {
2744
2740
2745
2741
MemTable* mutable_mem;
2746
- std::vector<MemTable*> immutables ;
2742
+ MemTableListVersion* immutable_mems ;
2747
2743
Version* version;
2748
2744
2749
- immutables.reserve (options_.max_write_buffer_number );
2750
-
2751
2745
// get all child iterators and bump their refcounts under lock
2752
2746
mutex_.Lock ();
2753
2747
mutable_mem = mem_;
2754
2748
mutable_mem->Ref ();
2755
- imm_.GetMemTables (&immutables);
2756
- for (size_t i = 0 ; i < immutables.size (); ++i) {
2757
- immutables[i]->Ref ();
2758
- }
2749
+ immutable_mems = imm_.current ();
2750
+ immutable_mems->Ref ();
2759
2751
version = versions_->current ();
2760
2752
version->Ref ();
2761
2753
if (superversion_number != nullptr ) {
@@ -2765,7 +2757,7 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
2765
2757
2766
2758
Iterator* mutable_iter = mutable_mem->NewIterator (options);
2767
2759
IterState* mutable_cleanup = new IterState ();
2768
- mutable_cleanup->mem . push_back ( mutable_mem) ;
2760
+ mutable_cleanup->mem = mutable_mem;
2769
2761
mutable_cleanup->db = this ;
2770
2762
mutable_cleanup->mu = &mutex_;
2771
2763
mutable_iter->RegisterCleanup (CleanupIteratorState, mutable_cleanup, nullptr );
@@ -2777,10 +2769,8 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
2777
2769
Iterator* immutable_iter;
2778
2770
IterState* immutable_cleanup = new IterState ();
2779
2771
std::vector<Iterator*> list;
2780
- for (MemTable* m : immutables) {
2781
- list.push_back (m->NewIterator (options));
2782
- immutable_cleanup->mem .push_back (m);
2783
- }
2772
+ immutable_mems->AddIterators (options, &list);
2773
+ immutable_cleanup->imm = immutable_mems;
2784
2774
version->AddIterators (options, storage_options_, &list);
2785
2775
immutable_cleanup->version = version;
2786
2776
immutable_cleanup->db = this ;
@@ -2837,7 +2827,7 @@ void DBImpl::InstallSuperVersion(DeletionState& deletion_state) {
2837
2827
DBImpl::SuperVersion* DBImpl::InstallSuperVersion (
2838
2828
SuperVersion* new_superversion) {
2839
2829
mutex_.AssertHeld ();
2840
- new_superversion->Init (mem_, imm_, versions_->current ());
2830
+ new_superversion->Init (mem_, imm_. current () , versions_->current ());
2841
2831
SuperVersion* old_superversion = super_version_;
2842
2832
super_version_ = new_superversion;
2843
2833
++super_version_number_;
@@ -2880,7 +2870,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
2880
2870
if (get_version->mem ->Get (lkey, value, &s, merge_context, options_)) {
2881
2871
// Done
2882
2872
RecordTick (options_.statistics .get (), MEMTABLE_HIT);
2883
- } else if (get_version->imm . Get (lkey, value, &s, merge_context, options_)) {
2873
+ } else if (get_version->imm -> Get (lkey, value, &s, merge_context, options_)) {
2884
2874
// Done
2885
2875
RecordTick (options_.statistics .get (), MEMTABLE_HIT);
2886
2876
} else {
@@ -2936,10 +2926,10 @@ std::vector<Status> DBImpl::MultiGet(
2936
2926
}
2937
2927
2938
2928
MemTable* mem = mem_;
2939
- MemTableList imm = imm_;
2929
+ MemTableListVersion* imm = imm_. current () ;
2940
2930
Version* current = versions_->current ();
2941
2931
mem->Ref ();
2942
- imm. RefAll ();
2932
+ imm-> Ref ();
2943
2933
current->Ref ();
2944
2934
2945
2935
// Unlock while reading from files and memtables
@@ -2971,7 +2961,7 @@ std::vector<Status> DBImpl::MultiGet(
2971
2961
LookupKey lkey (keys[i], snapshot);
2972
2962
if (mem->Get (lkey, value, &s, merge_context, options_)) {
2973
2963
// Done
2974
- } else if (imm. Get (lkey, value, &s, merge_context, options_)) {
2964
+ } else if (imm-> Get (lkey, value, &s, merge_context, options_)) {
2975
2965
// Done
2976
2966
} else {
2977
2967
current->Get (options, lkey, value, &s, &merge_context, &stats, options_);
@@ -2990,7 +2980,7 @@ std::vector<Status> DBImpl::MultiGet(
2990
2980
MaybeScheduleFlushOrCompaction ();
2991
2981
}
2992
2982
MemTable* m = mem->Unref ();
2993
- imm. UnrefAll (&to_delete);
2983
+ imm-> Unref (&to_delete);
2994
2984
current->Unref ();
2995
2985
mutex_.Unlock ();
2996
2986
0 commit comments