@@ -1298,8 +1298,7 @@ Status DBImpl::ReFitLevel(int level, int target_level) {
1298
1298
assert (level < NumberLevels ());
1299
1299
1300
1300
SuperVersion* superversion_to_free = nullptr ;
1301
- SuperVersion* new_superversion =
1302
- new SuperVersion (options_.max_write_buffer_number );
1301
+ SuperVersion* new_superversion = new SuperVersion ();
1303
1302
1304
1303
mutex_.Lock ();
1305
1304
@@ -2949,6 +2948,13 @@ std::vector<Status> DBImpl::MultiGet(
2949
2948
return statList;
2950
2949
}
2951
2950
2951
+ // TODO(icanadi) creating a column family during a write causes a data race.
2952
+ // In write code path, we iterate through column families and call
2953
+ // MakeRoomForWrite() for each. MakeRoomForWrite() can unlock the mutex
2954
+ // and wait (delay the write). If we create or drop a column family when
2955
+ // that mutex is unlocked for delay, the set being iterated changes under us.
2956
+ // Solution TODO: enable iteration by chaining column families in
2957
+ // circular linked lists
2952
2958
Status DBImpl::CreateColumnFamily (const ColumnFamilyOptions& options,
2953
2959
const std::string& column_family_name,
2954
2960
ColumnFamilyHandle* handle) {
@@ -3106,9 +3112,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
3106
3112
RecordTick (options_.statistics .get (), WRITE_DONE_BY_SELF, 1 );
3107
3113
}
3108
3114
3109
- // May temporarily unlock and wait.
3110
- SuperVersion* superversion_to_free = nullptr ;
3111
- Status status = MakeRoomForWrite (my_batch == nullptr , &superversion_to_free);
3115
+ Status status;
3116
+ for (auto cfd : *versions_->GetColumnFamilySet ()) {
3117
+ // May temporarily unlock and wait.
3118
+ status = MakeRoomForWrite (cfd, my_batch == nullptr );
3119
+ if (!status.ok ()) {
3120
+ break ;
3121
+ }
3122
+ }
3112
3123
uint64_t last_sequence = versions_->LastSequence ();
3113
3124
Writer* last_writer = &w;
3114
3125
if (status.ok () && my_batch != nullptr ) { // nullptr batch is for compactions
@@ -3209,7 +3220,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
3209
3220
writers_.front ()->cv .Signal ();
3210
3221
}
3211
3222
mutex_.Unlock ();
3212
- delete superversion_to_free;
3213
3223
return status;
3214
3224
}
3215
3225
@@ -3295,8 +3305,7 @@ uint64_t DBImpl::SlowdownAmount(int n, double bottom, double top) {
3295
3305
3296
3306
// REQUIRES: mutex_ is held
3297
3307
// REQUIRES: this thread is currently at the front of the writer queue
3298
- Status DBImpl::MakeRoomForWrite (bool force,
3299
- SuperVersion** superversion_to_free) {
3308
+ Status DBImpl::MakeRoomForWrite (ColumnFamilyData* cfd, bool force) {
3300
3309
mutex_.AssertHeld ();
3301
3310
assert (!writers_.empty ());
3302
3311
bool allow_delay = !force;
@@ -3305,24 +3314,23 @@ Status DBImpl::MakeRoomForWrite(bool force,
3305
3314
uint64_t rate_limit_delay_millis = 0 ;
3306
3315
Status s;
3307
3316
double score;
3308
- *superversion_to_free = nullptr ;
3309
3317
3310
3318
while (true ) {
3311
3319
if (!bg_error_.ok ()) {
3312
3320
// Yield previous error
3313
3321
s = bg_error_;
3314
3322
break ;
3315
- } else if (allow_delay && versions_ ->NeedSlowdownForNumLevel0Files ()) {
3323
+ } else if (allow_delay && cfd ->NeedSlowdownForNumLevel0Files ()) {
3316
3324
// We are getting close to hitting a hard limit on the number of
3317
3325
// L0 files. Rather than delaying a single write by several
3318
3326
// seconds when we hit the hard limit, start delaying each
3319
3327
// individual write by 0-1ms to reduce latency variance. Also,
3320
3328
// this delay hands over some CPU to the compaction thread in
3321
3329
// case it is sharing the same core as the writer.
3322
3330
uint64_t slowdown =
3323
- SlowdownAmount (default_cfd_ ->current ()->NumLevelFiles (0 ),
3324
- options_. level0_slowdown_writes_trigger ,
3325
- options_. level0_stop_writes_trigger );
3331
+ SlowdownAmount (cfd ->current ()->NumLevelFiles (0 ),
3332
+ cfd-> options ()-> level0_slowdown_writes_trigger ,
3333
+ cfd-> options ()-> level0_stop_writes_trigger );
3326
3334
mutex_.Unlock ();
3327
3335
uint64_t delayed;
3328
3336
{
@@ -3335,32 +3343,32 @@ Status DBImpl::MakeRoomForWrite(bool force,
3335
3343
allow_delay = false ; // Do not delay a single write more than once
3336
3344
mutex_.Lock ();
3337
3345
delayed_writes_++;
3338
- } else if (!force && (default_cfd_ ->mem ()->ApproximateMemoryUsage () <=
3339
- options_. write_buffer_size )) {
3346
+ } else if (!force && (cfd ->mem ()->ApproximateMemoryUsage () <=
3347
+ cfd-> options ()-> write_buffer_size )) {
3340
3348
// There is room in current memtable
3341
3349
if (allow_delay) {
3342
3350
DelayLoggingAndReset ();
3343
3351
}
3344
3352
break ;
3345
- } else if (default_cfd_ ->imm ()->size () ==
3346
- options_. max_write_buffer_number - 1 ) {
3353
+ } else if (cfd ->imm ()->size () ==
3354
+ cfd-> options ()-> max_write_buffer_number - 1 ) {
3347
3355
// We have filled up the current memtable, but the previous
3348
3356
// ones are still being compacted, so we wait.
3349
3357
DelayLoggingAndReset ();
3350
3358
Log (options_.info_log , " wait for memtable compaction...\n " );
3351
3359
uint64_t stall;
3352
3360
{
3353
3361
StopWatch sw (env_, options_.statistics .get (),
3354
- STALL_MEMTABLE_COMPACTION_COUNT);
3362
+ STALL_MEMTABLE_COMPACTION_COUNT);
3355
3363
bg_cv_.Wait ();
3356
3364
stall = sw.ElapsedMicros ();
3357
3365
}
3358
3366
RecordTick (options_.statistics .get (),
3359
3367
STALL_MEMTABLE_COMPACTION_MICROS, stall);
3360
3368
internal_stats_.RecordWriteStall (InternalStats::MEMTABLE_COMPACTION,
3361
3369
stall);
3362
- } else if (default_cfd_ ->current ()->NumLevelFiles (0 ) >=
3363
- options_. level0_stop_writes_trigger ) {
3370
+ } else if (cfd ->current ()->NumLevelFiles (0 ) >=
3371
+ cfd-> options ()-> level0_stop_writes_trigger ) {
3364
3372
// There are too many level-0 files.
3365
3373
DelayLoggingAndReset ();
3366
3374
Log (options_.info_log , " wait for fewer level0 files...\n " );
@@ -3374,10 +3382,10 @@ Status DBImpl::MakeRoomForWrite(bool force,
3374
3382
RecordTick (options_.statistics .get (), STALL_L0_NUM_FILES_MICROS, stall);
3375
3383
internal_stats_.RecordWriteStall (InternalStats::LEVEL0_NUM_FILES, stall);
3376
3384
} else if (allow_hard_rate_limit_delay && options_.hard_rate_limit > 1.0 &&
3377
- (score = default_cfd_ ->current ()->MaxCompactionScore ()) >
3378
- options_. hard_rate_limit ) {
3385
+ (score = cfd ->current ()->MaxCompactionScore ()) >
3386
+ cfd-> options ()-> hard_rate_limit ) {
3379
3387
// Delay a write when the compaction score for any level is too large.
3380
- int max_level = default_cfd_ ->current ()->MaxCompactionScoreLevel ();
3388
+ int max_level = cfd ->current ()->MaxCompactionScoreLevel ();
3381
3389
mutex_.Unlock ();
3382
3390
uint64_t delayed;
3383
3391
{
@@ -3392,26 +3400,25 @@ Status DBImpl::MakeRoomForWrite(bool force,
3392
3400
rate_limit_delay_millis += rate_limit;
3393
3401
RecordTick (options_.statistics .get (),
3394
3402
RATE_LIMIT_DELAY_MILLIS, rate_limit);
3395
- if (options_. rate_limit_delay_max_milliseconds > 0 &&
3403
+ if (cfd-> options ()-> rate_limit_delay_max_milliseconds > 0 &&
3396
3404
rate_limit_delay_millis >=
3397
- (unsigned )options_. rate_limit_delay_max_milliseconds ) {
3405
+ (unsigned )cfd-> options ()-> rate_limit_delay_max_milliseconds ) {
3398
3406
allow_hard_rate_limit_delay = false ;
3399
3407
}
3400
3408
mutex_.Lock ();
3401
- } else if (allow_soft_rate_limit_delay && options_.soft_rate_limit > 0.0 &&
3402
- (score = default_cfd_->current ()->MaxCompactionScore ()) >
3403
- options_.soft_rate_limit ) {
3409
+ } else if (allow_soft_rate_limit_delay &&
3410
+ cfd->options ()->soft_rate_limit > 0.0 &&
3411
+ (score = cfd->current ()->MaxCompactionScore ()) >
3412
+ cfd->options ()->soft_rate_limit ) {
3404
3413
// Delay a write when the compaction score for any level is too large.
3405
3414
// TODO: add statistics
3406
3415
mutex_.Unlock ();
3407
3416
{
3408
3417
StopWatch sw (env_, options_.statistics .get (),
3409
3418
SOFT_RATE_LIMIT_DELAY_COUNT);
3410
- env_->SleepForMicroseconds (SlowdownAmount (
3411
- score,
3412
- options_.soft_rate_limit ,
3413
- options_.hard_rate_limit )
3414
- );
3419
+ env_->SleepForMicroseconds (
3420
+ SlowdownAmount (score, cfd->options ()->soft_rate_limit ,
3421
+ cfd->options ()->hard_rate_limit ));
3415
3422
rate_limit_delay_millis += sw.ElapsedMicros ();
3416
3423
}
3417
3424
allow_soft_rate_limit_delay = false ;
@@ -3436,9 +3443,10 @@ Status DBImpl::MakeRoomForWrite(bool force,
3436
3443
if (s.ok ()) {
3437
3444
// Our final size should be less than write_buffer_size
3438
3445
// (compression, etc) but err on the side of caution.
3439
- lfile->SetPreallocationBlockSize (1.1 * options_.write_buffer_size );
3440
- memtmp = new MemTable (internal_comparator_, options_);
3441
- new_superversion = new SuperVersion (options_.max_write_buffer_number );
3446
+ lfile->SetPreallocationBlockSize (1.1 *
3447
+ cfd->options ()->write_buffer_size );
3448
+ memtmp = new MemTable (internal_comparator_, *cfd->options ());
3449
+ new_superversion = new SuperVersion ();
3442
3450
}
3443
3451
}
3444
3452
mutex_.Lock ();
@@ -3450,20 +3458,19 @@ Status DBImpl::MakeRoomForWrite(bool force,
3450
3458
}
3451
3459
logfile_number_ = new_log_number;
3452
3460
log_.reset (new log ::Writer (std::move (lfile)));
3453
- default_cfd_ ->mem ()->SetNextLogNumber (logfile_number_);
3454
- default_cfd_ ->imm ()->Add (default_cfd_ ->mem ());
3461
+ cfd ->mem ()->SetNextLogNumber (logfile_number_);
3462
+ cfd ->imm ()->Add (cfd ->mem ());
3455
3463
if (force) {
3456
- default_cfd_ ->imm ()->FlushRequested ();
3464
+ cfd ->imm ()->FlushRequested ();
3457
3465
}
3458
3466
memtmp->Ref ();
3459
3467
memtmp->SetLogNumber (logfile_number_);
3460
- default_cfd_ ->SetMemtable (memtmp);
3468
+ cfd ->SetMemtable (memtmp);
3461
3469
Log (options_.info_log , " New memtable created with log file: #%lu\n " ,
3462
3470
(unsigned long )logfile_number_);
3463
3471
force = false ; // Do not force another compaction if have room
3464
3472
MaybeScheduleFlushOrCompaction ();
3465
- *superversion_to_free =
3466
- default_cfd_->InstallSuperVersion (new_superversion);
3473
+ delete cfd->InstallSuperVersion (new_superversion);
3467
3474
}
3468
3475
}
3469
3476
return s;
0 commit comments