@@ -290,8 +290,10 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
290
290
return result;
291
291
}
292
292
293
+ namespace {
294
+
293
295
Status SanitizeDBOptionsByCFOptions (
294
- DBOptions* db_opts,
296
+ const DBOptions* db_opts,
295
297
const std::vector<ColumnFamilyDescriptor>& column_families) {
296
298
Status s;
297
299
for (auto cf : column_families) {
@@ -303,7 +305,6 @@ Status SanitizeDBOptionsByCFOptions(
303
305
return Status::OK ();
304
306
}
305
307
306
- namespace {
307
308
CompressionType GetCompressionFlush (const Options& options) {
308
309
// Compressing memtable flushes might not help unless the sequential load
309
310
// optimization is used for leveled compaction. Otherwise the CPU and
@@ -631,7 +632,7 @@ bool CompareCandidateFile(const rocksdb::DBImpl::CandidateFileInfo& first,
631
632
} else if (first.file_name < second.file_name ) {
632
633
return false ;
633
634
} else {
634
- return (first.path_id > first .path_id );
635
+ return (first.path_id > second .path_id );
635
636
}
636
637
}
637
638
}; // namespace
@@ -1301,14 +1302,20 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
1301
1302
WriteBatch batch;
1302
1303
while (reader.ReadRecord (&record, &scratch)) {
1303
1304
if (record.size () < 12 ) {
1304
- reporter.Corruption (
1305
- record. size (), Status::Corruption (" log record too small" ));
1305
+ reporter.Corruption (record. size (),
1306
+ Status::Corruption (" log record too small" ));
1306
1307
continue ;
1307
1308
}
1308
1309
WriteBatchInternal::SetContents (&batch, record);
1309
1310
1311
+ // If a column family is not found, it might mean that the WAL write
1312
+ // batch references a column family that was dropped after the
1313
+ // insert. We don't want to fail the whole write batch in that case -- we
1314
+ // just ignore the update. That's why we set ignore missing column families
1315
+ // to true.
1310
1316
status = WriteBatchInternal::InsertInto (
1311
- &batch, column_family_memtables_.get (), true , log_number);
1317
+ &batch, column_family_memtables_.get (),
1318
+ true /* ignore missing column families */ , log_number);
1312
1319
1313
1320
MaybeIgnoreError (&status);
1314
1321
if (!status.ok ()) {
@@ -1677,6 +1684,13 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
1677
1684
}
1678
1685
LogFlush (options_.info_log );
1679
1686
1687
+ {
1688
+ MutexLock l (&mutex_);
1689
+ // an automatic compaction that has been scheduled might have been
1690
+ // preempted by the manual compactions. Need to schedule it back.
1691
+ MaybeScheduleFlushOrCompaction ();
1692
+ }
1693
+
1680
1694
return s;
1681
1695
}
1682
1696
@@ -1864,18 +1878,15 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
1864
1878
bg_cv_.Wait ();
1865
1879
} else {
1866
1880
manual_compaction_ = &manual;
1867
- MaybeScheduleFlushOrCompaction ();
1881
+ assert (bg_compaction_scheduled_ == 0 );
1882
+ bg_compaction_scheduled_++;
1883
+ env_->Schedule (&DBImpl::BGWorkCompaction, this , Env::Priority::LOW);
1868
1884
}
1869
1885
}
1870
1886
1871
1887
assert (!manual.in_progress );
1872
1888
assert (bg_manual_only_ > 0 );
1873
1889
--bg_manual_only_;
1874
- if (bg_manual_only_ == 0 ) {
1875
- // an automatic compaction should have been scheduled might have be
1876
- // preempted by the manual compactions. Need to schedule it back.
1877
- MaybeScheduleFlushOrCompaction ();
1878
- }
1879
1890
return manual.status ;
1880
1891
}
1881
1892
@@ -1963,11 +1974,11 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
1963
1974
1964
1975
// Schedule BGWorkCompaction if there's a compaction pending (or a memtable
1965
1976
// flush, but the HIGH pool is not enabled)
1966
- // Do it only if max_background_compactions hasn't been reached and, in case
1967
- // bg_manual_only_ > 0, if it's a manual compaction.
1968
- if ((manual_compaction_ || is_compaction_needed ||
1969
- (is_flush_pending && options_. max_background_flushes == 0 )) &&
1970
- (!bg_manual_only_ || manual_compaction_ )) {
1977
+ // Do it only if max_background_compactions hasn't been reached and
1978
+ // bg_manual_only_ == 0
1979
+ if (!bg_manual_only_ &&
1980
+ (is_compaction_needed ||
1981
+ (is_flush_pending && options_. max_background_flushes == 0 ) )) {
1971
1982
if (bg_compaction_scheduled_ < options_.max_background_compactions ) {
1972
1983
bg_compaction_scheduled_++;
1973
1984
env_->Schedule (&DBImpl::BGWorkCompaction, this , Env::Priority::LOW);
@@ -1979,7 +1990,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
1979
1990
}
1980
1991
1981
1992
void DBImpl::RecordFlushIOStats () {
1982
- RecordTick (stats_, FLUSH_WRITE_BYTES, iostats_context. bytes_written );
1993
+ RecordTick (stats_, FLUSH_WRITE_BYTES, IOSTATS ( bytes_written) );
1983
1994
IOSTATS_RESET (bytes_written);
1984
1995
}
1985
1996
@@ -2194,6 +2205,10 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
2194
2205
if (is_manual) {
2195
2206
// another thread cannot pick up the same work
2196
2207
manual_compaction_->in_progress = true ;
2208
+ } else if (manual_compaction_ != nullptr ) {
2209
+ // there should be no automatic compactions running when a manual compaction
2210
+ // is running
2211
+ return Status::OK ();
2197
2212
}
2198
2213
2199
2214
// FLUSH preempts compaction
@@ -2313,7 +2328,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
2313
2328
2314
2329
if (status.ok ()) {
2315
2330
// Done
2316
- } else if (shutting_down_. Acquire_Load ()) {
2331
+ } else if (status. IsShutdownInProgress ()) {
2317
2332
// Ignore compaction errors found during shutting down
2318
2333
} else {
2319
2334
Log (InfoLogLevel::WARN_LEVEL, options_.info_log , " Compaction error: %s" ,
@@ -2573,6 +2588,10 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
2573
2588
uint64_t DBImpl::CallFlushDuringCompaction (ColumnFamilyData* cfd,
2574
2589
DeletionState& deletion_state,
2575
2590
LogBuffer* log_buffer) {
2591
+ if (options_.max_background_flushes > 0 ) {
2592
+ // flush thread will take care of this
2593
+ return 0 ;
2594
+ }
2576
2595
if (cfd->imm ()->imm_flush_needed .NoBarrier_Load () != nullptr ) {
2577
2596
const uint64_t imm_start = env_->NowMicros ();
2578
2597
mutex_.Lock ();
@@ -2626,9 +2645,29 @@ Status DBImpl::ProcessKeyValueCompaction(
2626
2645
compaction_filter = compaction_filter_from_factory.get ();
2627
2646
}
2628
2647
2648
+ int64_t key_drop_user = 0 ;
2649
+ int64_t key_drop_newer_entry = 0 ;
2650
+ int64_t key_drop_obsolete = 0 ;
2651
+ int64_t loop_cnt = 0 ;
2629
2652
while (input->Valid () && !shutting_down_.Acquire_Load () &&
2630
2653
!cfd->IsDropped ()) {
2631
- RecordCompactionIOStats ();
2654
+ if (++loop_cnt > 1000 ) {
2655
+ if (key_drop_user > 0 ) {
2656
+ RecordTick (stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
2657
+ key_drop_user = 0 ;
2658
+ }
2659
+ if (key_drop_newer_entry > 0 ) {
2660
+ RecordTick (stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
2661
+ key_drop_newer_entry);
2662
+ key_drop_newer_entry = 0 ;
2663
+ }
2664
+ if (key_drop_obsolete > 0 ) {
2665
+ RecordTick (stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete);
2666
+ key_drop_obsolete = 0 ;
2667
+ }
2668
+ RecordCompactionIOStats ();
2669
+ loop_cnt = 0 ;
2670
+ }
2632
2671
// FLUSH preempts compaction
2633
2672
// TODO(icanadi) this currently only checks if flush is necessary on
2634
2673
// compacting column family. we should also check if flush is necessary on
@@ -2709,7 +2748,7 @@ Status DBImpl::ProcessKeyValueCompaction(
2709
2748
ParseInternalKey (key, &ikey);
2710
2749
// no value associated with delete
2711
2750
value.clear ();
2712
- RecordTick (stats_, COMPACTION_KEY_DROP_USER) ;
2751
+ ++key_drop_user ;
2713
2752
} else if (value_changed) {
2714
2753
value = compaction_filter_value;
2715
2754
}
@@ -2733,7 +2772,7 @@ Status DBImpl::ProcessKeyValueCompaction(
2733
2772
// TODO: why not > ?
2734
2773
assert (last_sequence_for_key >= ikey.sequence );
2735
2774
drop = true ; // (A)
2736
- RecordTick (stats_, COMPACTION_KEY_DROP_NEWER_ENTRY) ;
2775
+ ++key_drop_newer_entry ;
2737
2776
} else if (ikey.type == kTypeDeletion &&
2738
2777
ikey.sequence <= earliest_snapshot &&
2739
2778
compact->compaction ->KeyNotExistsBeyondOutputLevel (ikey.user_key )) {
@@ -2745,7 +2784,7 @@ Status DBImpl::ProcessKeyValueCompaction(
2745
2784
// few iterations of this loop (by rule (A) above).
2746
2785
// Therefore this deletion marker is obsolete and can be dropped.
2747
2786
drop = true ;
2748
- RecordTick (stats_, COMPACTION_KEY_DROP_OBSOLETE) ;
2787
+ ++key_drop_obsolete ;
2749
2788
} else if (ikey.type == kTypeMerge ) {
2750
2789
if (!merge.HasOperator ()) {
2751
2790
LogToBuffer (log_buffer, " Options::merge_operator is null." );
@@ -2892,7 +2931,15 @@ Status DBImpl::ProcessKeyValueCompaction(
2892
2931
input->Next ();
2893
2932
}
2894
2933
}
2895
-
2934
+ if (key_drop_user > 0 ) {
2935
+ RecordTick (stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
2936
+ }
2937
+ if (key_drop_newer_entry > 0 ) {
2938
+ RecordTick (stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, key_drop_newer_entry);
2939
+ }
2940
+ if (key_drop_obsolete > 0 ) {
2941
+ RecordTick (stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete);
2942
+ }
2896
2943
RecordCompactionIOStats ();
2897
2944
2898
2945
return status;
@@ -3367,7 +3414,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
3367
3414
ColumnFamilyHandle* column_family, const Slice& key,
3368
3415
std::string* value, bool * value_found) {
3369
3416
StopWatch sw (env_, stats_, DB_GET);
3370
- PERF_TIMER_AUTO (get_snapshot_time);
3417
+ PERF_TIMER_GUARD (get_snapshot_time);
3371
3418
3372
3419
auto cfh = reinterpret_cast <ColumnFamilyHandleImpl*>(column_family);
3373
3420
auto cfd = cfh->cfd ();
@@ -3391,27 +3438,27 @@ Status DBImpl::GetImpl(const ReadOptions& options,
3391
3438
// merge_operands will contain the sequence of merges in the latter case.
3392
3439
LookupKey lkey (key, snapshot);
3393
3440
PERF_TIMER_STOP (get_snapshot_time);
3441
+
3394
3442
if (sv->mem ->Get (lkey, value, &s, merge_context, *cfd->options ())) {
3395
3443
// Done
3396
3444
RecordTick (stats_, MEMTABLE_HIT);
3397
3445
} else if (sv->imm ->Get (lkey, value, &s, merge_context, *cfd->options ())) {
3398
3446
// Done
3399
3447
RecordTick (stats_, MEMTABLE_HIT);
3400
3448
} else {
3401
- PERF_TIMER_START (get_from_output_files_time);
3402
-
3449
+ PERF_TIMER_GUARD (get_from_output_files_time);
3403
3450
sv->current ->Get (options, lkey, value, &s, &merge_context, value_found);
3404
- PERF_TIMER_STOP (get_from_output_files_time);
3405
3451
RecordTick (stats_, MEMTABLE_MISS);
3406
3452
}
3407
3453
3408
- PERF_TIMER_START (get_post_process_time);
3454
+ {
3455
+ PERF_TIMER_GUARD (get_post_process_time);
3409
3456
3410
- ReturnAndCleanupSuperVersion (cfd, sv);
3457
+ ReturnAndCleanupSuperVersion (cfd, sv);
3411
3458
3412
- RecordTick (stats_, NUMBER_KEYS_READ);
3413
- RecordTick (stats_, BYTES_READ, value->size ());
3414
- PERF_TIMER_STOP (get_post_process_time);
3459
+ RecordTick (stats_, NUMBER_KEYS_READ);
3460
+ RecordTick (stats_, BYTES_READ, value->size ());
3461
+ }
3415
3462
return s;
3416
3463
}
3417
3464
@@ -3421,7 +3468,7 @@ std::vector<Status> DBImpl::MultiGet(
3421
3468
const std::vector<Slice>& keys, std::vector<std::string>* values) {
3422
3469
3423
3470
StopWatch sw (env_, stats_, DB_MULTIGET);
3424
- PERF_TIMER_AUTO (get_snapshot_time);
3471
+ PERF_TIMER_GUARD (get_snapshot_time);
3425
3472
3426
3473
SequenceNumber snapshot;
3427
3474
@@ -3497,7 +3544,7 @@ std::vector<Status> DBImpl::MultiGet(
3497
3544
}
3498
3545
3499
3546
// Post processing (decrement reference counts and record statistics)
3500
- PERF_TIMER_START (get_post_process_time);
3547
+ PERF_TIMER_GUARD (get_post_process_time);
3501
3548
autovector<SuperVersion*> superversions_to_delete;
3502
3549
3503
3550
// TODO(icanadi) do we need lock here or just around Cleanup()?
@@ -3870,7 +3917,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
3870
3917
if (my_batch == nullptr ) {
3871
3918
return Status::Corruption (" Batch is nullptr!" );
3872
3919
}
3873
- PERF_TIMER_AUTO (write_pre_and_post_process_time);
3920
+ PERF_TIMER_GUARD (write_pre_and_post_process_time);
3874
3921
Writer w (&mutex_);
3875
3922
w.batch = my_batch;
3876
3923
w.sync = options.sync ;
@@ -4003,7 +4050,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
4003
4050
4004
4051
uint64_t log_size = 0 ;
4005
4052
if (!options.disableWAL ) {
4006
- PERF_TIMER_START (write_wal_time);
4053
+ PERF_TIMER_GUARD (write_wal_time);
4007
4054
Slice log_entry = WriteBatchInternal::Contents (updates);
4008
4055
status = log_->AddRecord (log_entry);
4009
4056
total_log_size_ += log_entry.size ();
@@ -4021,13 +4068,13 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
4021
4068
status = log_->file ()->Sync ();
4022
4069
}
4023
4070
}
4024
- PERF_TIMER_STOP (write_wal_time);
4025
4071
}
4026
4072
if (status.ok ()) {
4027
- PERF_TIMER_START (write_memtable_time);
4073
+ PERF_TIMER_GUARD (write_memtable_time);
4028
4074
4029
4075
status = WriteBatchInternal::InsertInto (
4030
- updates, column_family_memtables_.get (), false , 0 , this , false );
4076
+ updates, column_family_memtables_.get (),
4077
+ options.ignore_missing_column_families , 0 , this , false );
4031
4078
// A non-OK status here indicates iteration failure (either in-memory
4032
4079
// writebatch corruption (very bad), or the client specified invalid
4033
4080
// column family). This will later on trigger bg_error_.
@@ -4036,8 +4083,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
4036
4083
// into the memtable would result in a state that some write ops might
4037
4084
// have succeeded in memtable but Status reports error for all writes.
4038
4085
4039
- PERF_TIMER_STOP (write_memtable_time);
4040
-
4041
4086
SetTickerCount (stats_, SEQUENCE_NUMBER, last_sequence);
4042
4087
}
4043
4088
PERF_TIMER_START (write_pre_and_post_process_time);
@@ -4071,7 +4116,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
4071
4116
RecordTick (stats_, WRITE_TIMEDOUT);
4072
4117
}
4073
4118
4074
- PERF_TIMER_STOP (write_pre_and_post_process_time);
4075
4119
return status;
4076
4120
}
4077
4121
@@ -4759,11 +4803,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
4759
4803
column_families.push_back (
4760
4804
ColumnFamilyDescriptor (kDefaultColumnFamilyName , cf_options));
4761
4805
std::vector<ColumnFamilyHandle*> handles;
4762
- Status s = SanitizeDBOptionsByCFOptions (&db_options, column_families);
4763
- if (!s.ok ()) {
4764
- return s;
4765
- }
4766
- s = DB::Open (db_options, dbname, column_families, &handles, dbptr);
4806
+ Status s = DB::Open (db_options, dbname, column_families, &handles, dbptr);
4767
4807
if (s.ok ()) {
4768
4808
assert (handles.size () == 1 );
4769
4809
// i can delete the handle since DBImpl is always holding a reference to
@@ -4776,6 +4816,10 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
4776
4816
Status DB::Open (const DBOptions& db_options, const std::string& dbname,
4777
4817
const std::vector<ColumnFamilyDescriptor>& column_families,
4778
4818
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
4819
+ Status s = SanitizeDBOptionsByCFOptions (&db_options, column_families);
4820
+ if (!s.ok ()) {
4821
+ return s;
4822
+ }
4779
4823
if (db_options.db_paths .size () > 1 ) {
4780
4824
for (auto & cfd : column_families) {
4781
4825
if (cfd.options .compaction_style != kCompactionStyleUniversal ) {
@@ -4801,7 +4845,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
4801
4845
}
4802
4846
4803
4847
DBImpl* impl = new DBImpl (db_options, dbname);
4804
- Status s = impl->env_ ->CreateDirIfMissing (impl->options_ .wal_dir );
4848
+ s = impl->env_ ->CreateDirIfMissing (impl->options_ .wal_dir );
4805
4849
if (s.ok ()) {
4806
4850
for (auto db_path : impl->options_ .db_paths ) {
4807
4851
s = impl->env_ ->CreateDirIfMissing (db_path.path );
0 commit comments