Skip to content

Commit 9f1c80b

Browse files
committed
Drop column family from write thread
Summary: If we drop column family only from (single) write thread, we can be sure that nobody will drop the column family while we're writing (and our mutex is released). This greatly simplifies my patch that's getting rid of MakeRoomForWrite(). Test Plan: make check, but also running stress test Reviewers: ljin, sdong Reviewed By: sdong Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D22965
1 parent 8de151b commit 9f1c80b

File tree

4 files changed

+97
-22
lines changed

4 files changed

+97
-22
lines changed

db/db_impl.cc

+23-21
Original file line numberDiff line numberDiff line change
@@ -77,20 +77,6 @@ const std::string kDefaultColumnFamilyName("default");
7777

7878
void DumpLeveldbBuildVersion(Logger * log);
7979

80-
// Information kept for every waiting writer
81-
struct DBImpl::Writer {
82-
Status status;
83-
WriteBatch* batch;
84-
bool sync;
85-
bool disableWAL;
86-
bool in_batch_group;
87-
bool done;
88-
uint64_t timeout_hint_us;
89-
port::CondVar cv;
90-
91-
explicit Writer(port::Mutex* mu) : cv(mu) { }
92-
};
93-
9480
struct DBImpl::WriteContext {
9581
autovector<SuperVersion*> superversions_to_free_;
9682
autovector<log::Writer*> logs_to_free_;
@@ -3627,14 +3613,26 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
36273613
edit.DropColumnFamily();
36283614
edit.SetColumnFamily(cfd->GetID());
36293615

3616+
Writer w(&mutex_);
3617+
w.batch = nullptr;
3618+
w.sync = false;
3619+
w.disableWAL = false;
3620+
w.in_batch_group = false;
3621+
w.done = false;
3622+
w.timeout_hint_us = kNoTimeOut;
3623+
36303624
Status s;
36313625
{
36323626
MutexLock l(&mutex_);
36333627
if (cfd->IsDropped()) {
36343628
s = Status::InvalidArgument("Column family already dropped!\n");
36353629
}
36363630
if (s.ok()) {
3631+
// we drop column family from a single write thread
3632+
s = BeginWrite(&w, 0);
3633+
assert(s.ok() && !w.done); // No timeout and nobody should do our job
36373634
s = versions_->LogAndApply(cfd, &edit, &mutex_);
3635+
EndWrite(&w, &w, s);
36383636
}
36393637
}
36403638

@@ -4173,15 +4171,19 @@ void DBImpl::BuildBatchGroup(Writer** last_writer,
41734171
break;
41744172
}
41754173

4176-
if (w->batch != nullptr) {
4177-
size += WriteBatchInternal::ByteSize(w->batch);
4178-
if (size > max_size) {
4179-
// Do not make batch too big
4180-
break;
4181-
}
4174+
if (w->batch == nullptr) {
4175+
// Do not include those writes with nullptr batch. Those are not writes,
4176+
// those are something else. They want to be alone
4177+
break;
4178+
}
41824179

4183-
write_batch_group->push_back(w->batch);
4180+
size += WriteBatchInternal::ByteSize(w->batch);
4181+
if (size > max_size) {
4182+
// Do not make batch too big
4183+
break;
41844184
}
4185+
4186+
write_batch_group->push_back(w->batch);
41854187
w->in_batch_group = true;
41864188
*last_writer = w;
41874189
}

db/db_impl.h

+26-1
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,17 @@ class DBImpl : public DB {
203203
SequenceNumber* sequence);
204204

205205
Status TEST_ReadFirstLine(const std::string& fname, SequenceNumber* sequence);
206+
207+
void TEST_LockMutex();
208+
209+
void TEST_UnlockMutex();
210+
211+
// REQUIRES: mutex locked
212+
void* TEST_BeginWrite();
213+
214+
// REQUIRES: mutex locked
215+
// pass the pointer that you got from TEST_BeginWrite()
216+
void TEST_EndWrite(void* w);
206217
#endif // NDEBUG
207218

208219
// Structure to store information for candidate files to delete.
@@ -309,7 +320,7 @@ class DBImpl : public DB {
309320
#endif
310321
friend struct SuperVersion;
311322
struct CompactionState;
312-
struct Writer;
323+
313324
struct WriteContext;
314325

315326
Status NewDB();
@@ -349,6 +360,20 @@ class DBImpl : public DB {
349360

350361
uint64_t SlowdownAmount(int n, double bottom, double top);
351362

363+
// Information kept for every waiting writer
364+
struct Writer {
365+
Status status;
366+
WriteBatch* batch;
367+
bool sync;
368+
bool disableWAL;
369+
bool in_batch_group;
370+
bool done;
371+
uint64_t timeout_hint_us;
372+
port::CondVar cv;
373+
374+
explicit Writer(port::Mutex* mu) : cv(mu) {}
375+
};
376+
352377
// Before applying write operation (such as DBImpl::Write, DBImpl::Flush)
353378
// thread should grab the mutex_ and be the first on writers queue.
354379
// BeginWrite is used for it.

db/db_impl_debug.cc

+27
Original file line numberDiff line numberDiff line change
@@ -130,5 +130,32 @@ Status DBImpl::TEST_ReadFirstLine(const std::string& fname,
130130
SequenceNumber* sequence) {
131131
return ReadFirstLine(fname, sequence);
132132
}
133+
134+
void DBImpl::TEST_LockMutex() {
135+
mutex_.Lock();
136+
}
137+
138+
void DBImpl::TEST_UnlockMutex() {
139+
mutex_.Unlock();
140+
}
141+
142+
void* DBImpl::TEST_BeginWrite() {
143+
auto w = new Writer(&mutex_);
144+
w->batch = nullptr;
145+
w->sync = false;
146+
w->disableWAL = false;
147+
w->in_batch_group = false;
148+
w->done = false;
149+
w->timeout_hint_us = kNoTimeOut;
150+
Status s = BeginWrite(w, 0);
151+
assert(s.ok() && !w->done); // No timeout and nobody should do our job
152+
return reinterpret_cast<void*>(w);
153+
}
154+
155+
void DBImpl::TEST_EndWrite(void* w) {
156+
auto writer = reinterpret_cast<Writer*>(w);
157+
EndWrite(writer, writer, Status::OK());
158+
}
159+
133160
} // namespace rocksdb
134161
#endif // ROCKSDB_LITE

db/db_test.cc

+21
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <iostream>
1212
#include <set>
1313
#include <unistd.h>
14+
#include <thread>
1415
#include <unordered_set>
1516
#include <utility>
1617

@@ -7894,6 +7895,26 @@ TEST(DBTest, DBIteratorBoundTest) {
78947895
}
78957896
}
78967897

7898+
TEST(DBTest, WriteSingleThreadEntry) {
7899+
std::vector<std::thread> threads;
7900+
dbfull()->TEST_LockMutex();
7901+
auto w = dbfull()->TEST_BeginWrite();
7902+
threads.emplace_back([&] { Put("a", "b"); });
7903+
env_->SleepForMicroseconds(10000);
7904+
threads.emplace_back([&] { Flush(); });
7905+
env_->SleepForMicroseconds(10000);
7906+
dbfull()->TEST_UnlockMutex();
7907+
dbfull()->TEST_LockMutex();
7908+
dbfull()->TEST_EndWrite(w);
7909+
dbfull()->TEST_UnlockMutex();
7910+
7911+
for (auto& t : threads) {
7912+
t.join();
7913+
}
7914+
}
7915+
7916+
7917+
78977918
} // namespace rocksdb
78987919

78997920
int main(int argc, char** argv) {

0 commit comments

Comments
 (0)