@@ -1915,14 +1915,6 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
1915
1915
1916
1916
Status DBImpl::FlushMemTable (ColumnFamilyData* cfd,
1917
1917
const FlushOptions& options) {
1918
- Writer w (&mutex_);
1919
- w.batch = nullptr ;
1920
- w.sync = false ;
1921
- w.disableWAL = false ;
1922
- w.in_batch_group = false ;
1923
- w.done = false ;
1924
- w.timeout_hint_us = kNoTimeOut ;
1925
-
1926
1918
Status s;
1927
1919
{
1928
1920
WriteContext context;
@@ -1933,7 +1925,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
1933
1925
return Status::OK ();
1934
1926
}
1935
1927
1936
- s = BeginWrite (&w, 0 );
1928
+ WriteThread::Writer w (&mutex_);
1929
+ s = write_thread_.EnterWriteThread (&w, 0 );
1937
1930
assert (s.ok () && !w.done ); // No timeout and nobody should do our job
1938
1931
1939
1932
// SetNewMemtableAndNewLogFile() will release and reacquire mutex
@@ -1942,12 +1935,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
1942
1935
cfd->imm ()->FlushRequested ();
1943
1936
MaybeScheduleFlushOrCompaction ();
1944
1937
1945
- assert (!writers_.empty ());
1946
- assert (writers_.front () == &w);
1947
- EndWrite (&w, &w, s);
1938
+ write_thread_.ExitWriteThread (&w, &w, s);
1948
1939
}
1949
1940
1950
-
1951
1941
if (s.ok () && options.wait ) {
1952
1942
// Wait until the compaction completes
1953
1943
s = WaitForFlushMemTable (cfd);
@@ -3652,13 +3642,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
3652
3642
edit.DropColumnFamily ();
3653
3643
edit.SetColumnFamily (cfd->GetID ());
3654
3644
3655
- Writer w (&mutex_);
3656
- w.batch = nullptr ;
3657
- w.sync = false ;
3658
- w.disableWAL = false ;
3659
- w.in_batch_group = false ;
3660
- w.done = false ;
3661
- w.timeout_hint_us = kNoTimeOut ;
3662
3645
3663
3646
Status s;
3664
3647
{
@@ -3668,10 +3651,11 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
3668
3651
}
3669
3652
if (s.ok ()) {
3670
3653
// we drop column family from a single write thread
3671
- s = BeginWrite (&w, 0 );
3654
+ WriteThread::Writer w (&mutex_);
3655
+ s = write_thread_.EnterWriteThread (&w, 0 );
3672
3656
assert (s.ok () && !w.done ); // No timeout and nobody should do our job
3673
3657
s = versions_->LogAndApply (cfd, &edit, &mutex_);
3674
- EndWrite (&w, &w, s);
3658
+ write_thread_. ExitWriteThread (&w, &w, s);
3675
3659
}
3676
3660
}
3677
3661
@@ -3891,88 +3875,12 @@ Status DBImpl::Delete(const WriteOptions& options,
3891
3875
return DB::Delete (options, column_family, key);
3892
3876
}
3893
3877
3894
- // REQUIRES: mutex_ is held
3895
- Status DBImpl::BeginWrite (Writer* w, uint64_t expiration_time) {
3896
- // the following code block pushes the current writer "w" into the writer
3897
- // queue "writers_" and wait until one of the following conditions met:
3898
- // 1. the job of "w" has been done by some other writers.
3899
- // 2. "w" becomes the first writer in "writers_"
3900
- // 3. "w" timed-out.
3901
- mutex_.AssertHeld ();
3902
- writers_.push_back (w);
3903
-
3904
- bool timed_out = false ;
3905
- while (!w->done && w != writers_.front ()) {
3906
- if (expiration_time == 0 ) {
3907
- w->cv .Wait ();
3908
- } else if (w->cv .TimedWait (expiration_time)) {
3909
- if (w->in_batch_group ) {
3910
- // then it means the front writer is currently doing the
3911
- // write on behalf of this "timed-out" writer. Then it
3912
- // should wait until the write completes.
3913
- expiration_time = 0 ;
3914
- } else {
3915
- timed_out = true ;
3916
- break ;
3917
- }
3918
- }
3919
- }
3920
-
3921
- if (timed_out) {
3922
- #ifndef NDEBUG
3923
- bool found = false ;
3924
- #endif
3925
- for (auto iter = writers_.begin (); iter != writers_.end (); iter++) {
3926
- if (*iter == w) {
3927
- writers_.erase (iter);
3928
- #ifndef NDEBUG
3929
- found = true ;
3930
- #endif
3931
- break ;
3932
- }
3933
- }
3934
- #ifndef NDEBUG
3935
- assert (found);
3936
- #endif
3937
- // writers_.front() might still be in cond_wait without a time-out.
3938
- // As a result, we need to signal it to wake it up. Otherwise no
3939
- // one else will wake him up, and RocksDB will hang.
3940
- if (!writers_.empty ()) {
3941
- writers_.front ()->cv .Signal ();
3942
- }
3943
- return Status::TimedOut ();
3944
- }
3945
- return Status::OK ();
3946
- }
3947
-
3948
- // REQUIRES: mutex_ is held
3949
- void DBImpl::EndWrite (Writer* w, Writer* last_writer, Status status) {
3950
- // Pop out the current writer and all writers being pushed before the
3951
- // current writer from the writer queue.
3952
- mutex_.AssertHeld ();
3953
- while (!writers_.empty ()) {
3954
- Writer* ready = writers_.front ();
3955
- writers_.pop_front ();
3956
- if (ready != w) {
3957
- ready->status = status;
3958
- ready->done = true ;
3959
- ready->cv .Signal ();
3960
- }
3961
- if (ready == last_writer) break ;
3962
- }
3963
-
3964
- // Notify new head of write queue
3965
- if (!writers_.empty ()) {
3966
- writers_.front ()->cv .Signal ();
3967
- }
3968
- }
3969
-
3970
3878
Status DBImpl::Write (const WriteOptions& options, WriteBatch* my_batch) {
3971
3879
if (my_batch == nullptr ) {
3972
3880
return Status::Corruption (" Batch is nullptr!" );
3973
3881
}
3974
3882
PERF_TIMER_GUARD (write_pre_and_post_process_time);
3975
- Writer w (&mutex_);
3883
+ WriteThread:: Writer w (&mutex_);
3976
3884
w.batch = my_batch;
3977
3885
w.sync = options.sync ;
3978
3886
w.disableWAL = options.disableWAL ;
@@ -3983,7 +3891,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
3983
3891
uint64_t expiration_time = 0 ;
3984
3892
bool has_timeout = false ;
3985
3893
if (w.timeout_hint_us == 0 ) {
3986
- w.timeout_hint_us = kNoTimeOut ;
3894
+ w.timeout_hint_us = WriteThread:: kNoTimeOut ;
3987
3895
} else {
3988
3896
expiration_time = env_->NowMicros () + w.timeout_hint_us ;
3989
3897
has_timeout = true ;
@@ -3996,7 +3904,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
3996
3904
3997
3905
WriteContext context;
3998
3906
mutex_.Lock ();
3999
- Status status = BeginWrite (&w, expiration_time);
3907
+ Status status = write_thread_. EnterWriteThread (&w, expiration_time);
4000
3908
assert (status.ok () || status.IsTimedOut ());
4001
3909
if (status.IsTimedOut ()) {
4002
3910
mutex_.Unlock ();
@@ -4066,10 +3974,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
4066
3974
}
4067
3975
4068
3976
uint64_t last_sequence = versions_->LastSequence ();
4069
- Writer* last_writer = &w;
3977
+ WriteThread:: Writer* last_writer = &w;
4070
3978
if (status.ok ()) {
4071
3979
autovector<WriteBatch*> write_batch_group;
4072
- BuildBatchGroup (&last_writer, &write_batch_group);
3980
+ write_thread_. BuildBatchGroup (&last_writer, &write_batch_group);
4073
3981
4074
3982
// Add to log and apply to memtable. We can release the lock
4075
3983
// during this phase since &w is currently responsible for logging
@@ -4161,7 +4069,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
4161
4069
bg_error_ = status; // stop compaction & fail any further writes
4162
4070
}
4163
4071
4164
- EndWrite (&w, last_writer, status);
4072
+ write_thread_. ExitWriteThread (&w, last_writer, status);
4165
4073
mutex_.Unlock ();
4166
4074
4167
4075
if (status.IsTimedOut ()) {
@@ -4171,68 +4079,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
4171
4079
return status;
4172
4080
}
4173
4081
4174
- // This function will be called only when the first writer succeeds.
4175
- // All writers in the to-be-built batch group will be processed.
4176
- //
4177
- // REQUIRES: Writer list must be non-empty
4178
- // REQUIRES: First writer must have a non-nullptr batch
4179
- void DBImpl::BuildBatchGroup (Writer** last_writer,
4180
- autovector<WriteBatch*>* write_batch_group) {
4181
- assert (!writers_.empty ());
4182
- Writer* first = writers_.front ();
4183
- assert (first->batch != nullptr );
4184
-
4185
- size_t size = WriteBatchInternal::ByteSize (first->batch );
4186
- write_batch_group->push_back (first->batch );
4187
-
4188
- // Allow the group to grow up to a maximum size, but if the
4189
- // original write is small, limit the growth so we do not slow
4190
- // down the small write too much.
4191
- size_t max_size = 1 << 20 ;
4192
- if (size <= (128 <<10 )) {
4193
- max_size = size + (128 <<10 );
4194
- }
4195
-
4196
- *last_writer = first;
4197
- std::deque<Writer*>::iterator iter = writers_.begin ();
4198
- ++iter; // Advance past "first"
4199
- for (; iter != writers_.end (); ++iter) {
4200
- Writer* w = *iter;
4201
- if (w->sync && !first->sync ) {
4202
- // Do not include a sync write into a batch handled by a non-sync write.
4203
- break ;
4204
- }
4205
-
4206
- if (!w->disableWAL && first->disableWAL ) {
4207
- // Do not include a write that needs WAL into a batch that has
4208
- // WAL disabled.
4209
- break ;
4210
- }
4211
-
4212
- if (w->timeout_hint_us < first->timeout_hint_us ) {
4213
- // Do not include those writes with shorter timeout. Otherwise, we might
4214
- // execute a write that should instead be aborted because of timeout.
4215
- break ;
4216
- }
4217
-
4218
- if (w->batch == nullptr ) {
4219
- // Do not include those writes with nullptr batch. Those are not writes,
4220
- // those are something else. They want to be alone
4221
- break ;
4222
- }
4223
-
4224
- size += WriteBatchInternal::ByteSize (w->batch );
4225
- if (size > max_size) {
4226
- // Do not make batch too big
4227
- break ;
4228
- }
4229
-
4230
- write_batch_group->push_back (w->batch );
4231
- w->in_batch_group = true ;
4232
- *last_writer = w;
4233
- }
4234
- }
4235
-
4236
4082
// REQUIRES: mutex_ is held
4237
4083
// REQUIRES: this thread is currently at the front of the writer queue
4238
4084
void DBImpl::DelayWrite (uint64_t expiration_time) {
0 commit comments