Skip to content

Commit 01dcef1

Browse files
committed
Env to add a function to allow users to query waiting queue length
Summary: Add a function to Env so that users can query the waiting queue length of each thread pool Test Plan: add a test in env_test Reviewers: haobo Reviewed By: haobo CC: dhruba, igor, yhchiang, ljin, nkg-, leveldb Differential Revision: https://reviews.facebook.net/D16755
1 parent d5de22d commit 01dcef1

File tree

5 files changed

+51
-6
lines changed

5 files changed

+51
-6
lines changed

HISTORY.md

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
* Added new option -- verify_checksums_in_compaction
1313
* Chagned Options.prefix_extractor from raw pointer to shared_ptr (take ownership)
1414
Changed HashSkipListRepFactory and HashLinkListRepFactory constructor to not take SliceTransform object (use Options.prefix_extractor implicitly)
15+
* Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools
1516

1617
### New Features
1718
* If we find one truncated record at the end of the MANIFEST or WAL files,

hdfs/env_hdfs.h

+9
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ class HdfsEnv : public Env {
110110

111111
virtual void WaitForJoin() { posixEnv->WaitForJoin(); }
112112

113+
virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const
114+
override {
115+
return posixEnv->GetThreadPoolQueueLen(pri);
116+
}
117+
113118
virtual Status GetTestDirectory(std::string* path) {
114119
return posixEnv->GetTestDirectory(path);
115120
}
@@ -292,6 +297,10 @@ class HdfsEnv : public Env {
292297

293298
virtual void WaitForJoin() {}
294299

300+
virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const {
301+
return 0;
302+
}
303+
295304
virtual Status GetTestDirectory(std::string* path) {return notsup;}
296305

297306
virtual uint64_t NowMicros() {return 0;}

include/rocksdb/env.h

+8
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,11 @@ class Env {
210210
// Wait for all threads started by StartThread to terminate.
211211
virtual void WaitForJoin() = 0;
212212

213+
// Get thread pool queue length for specific thrad pool.
214+
virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const {
215+
return 0;
216+
}
217+
213218
// *path is set to a temporary directory that can be used for testing. It may
214219
// or many not have just been created. The directory may or may not differ
215220
// between runs of the same process, but subsequent calls will return the
@@ -702,6 +707,9 @@ class EnvWrapper : public Env {
702707
return target_->StartThread(f, a);
703708
}
704709
void WaitForJoin() { return target_->WaitForJoin(); }
710+
virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const {
711+
return target_->GetThreadPoolQueueLen(pri);
712+
}
705713
virtual Status GetTestDirectory(std::string* path) {
706714
return target_->GetTestDirectory(path);
707715
}

util/env_posix.cc

+20-6
Original file line numberDiff line numberDiff line change
@@ -1206,6 +1206,8 @@ class PosixEnv : public Env {
12061206

12071207
virtual void WaitForJoin();
12081208

1209+
virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override;
1210+
12091211
virtual Status GetTestDirectory(std::string* result) {
12101212
const char* env = getenv("TEST_TMPDIR");
12111213
if (env && env[0] != '\0') {
@@ -1370,12 +1372,12 @@ class PosixEnv : public Env {
13701372

13711373
class ThreadPool {
13721374
public:
1373-
1374-
ThreadPool() :
1375-
total_threads_limit_(1),
1376-
bgthreads_(0),
1377-
queue_(),
1378-
exit_all_threads_(false) {
1375+
ThreadPool()
1376+
: total_threads_limit_(1),
1377+
bgthreads_(0),
1378+
queue_(),
1379+
queue_len_(0),
1380+
exit_all_threads_(false) {
13791381
PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
13801382
PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr));
13811383
}
@@ -1405,6 +1407,7 @@ class PosixEnv : public Env {
14051407
void (*function)(void*) = queue_.front().function;
14061408
void* arg = queue_.front().arg;
14071409
queue_.pop_front();
1410+
queue_len_.store(queue_.size(), std::memory_order_relaxed);
14081411

14091412
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
14101413
(*function)(arg);
@@ -1459,13 +1462,18 @@ class PosixEnv : public Env {
14591462
queue_.push_back(BGItem());
14601463
queue_.back().function = function;
14611464
queue_.back().arg = arg;
1465+
queue_len_.store(queue_.size(), std::memory_order_relaxed);
14621466

14631467
// always wake up at least one waiting thread.
14641468
PthreadCall("signal", pthread_cond_signal(&bgsignal_));
14651469

14661470
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
14671471
}
14681472

1473+
unsigned int GetQueueLen() const {
1474+
return queue_len_.load(std::memory_order_relaxed);
1475+
}
1476+
14691477
private:
14701478
// Entry per Schedule() call
14711479
struct BGItem { void* arg; void (*function)(void*); };
@@ -1476,6 +1484,7 @@ class PosixEnv : public Env {
14761484
int total_threads_limit_;
14771485
std::vector<pthread_t> bgthreads_;
14781486
BGQueue queue_;
1487+
std::atomic_uint queue_len_; // Queue length. Used for stats reporting
14791488
bool exit_all_threads_;
14801489
};
14811490

@@ -1498,6 +1507,11 @@ void PosixEnv::Schedule(void (*function)(void*), void* arg, Priority pri) {
14981507
thread_pools_[pri].Schedule(function, arg);
14991508
}
15001509

1510+
unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
1511+
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
1512+
return thread_pools_[pri].GetQueueLen();
1513+
}
1514+
15011515
namespace {
15021516
struct StartThreadState {
15031517
void (*user_function)(void*);

util/env_test.cc

+13
Original file line numberDiff line numberDiff line change
@@ -172,17 +172,30 @@ TEST(EnvPosixTest, TwoPools) {
172172
env_->SetBackgroundThreads(kLowPoolSize);
173173
env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH);
174174

175+
ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
176+
ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
177+
175178
// schedule same number of jobs in each pool
176179
for (int i = 0; i < kJobs; i++) {
177180
env_->Schedule(&CB::Run, &low_pool_job);
178181
env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH);
179182
}
183+
// Wait a short while for the jobs to be dispatched.
184+
Env::Default()->SleepForMicroseconds(kDelayMicros);
185+
ASSERT_EQ(kJobs - kLowPoolSize, env_->GetThreadPoolQueueLen());
186+
ASSERT_EQ(kJobs - kLowPoolSize,
187+
env_->GetThreadPoolQueueLen(Env::Priority::LOW));
188+
ASSERT_EQ(kJobs - kHighPoolSize,
189+
env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
180190

181191
// wait for all jobs to finish
182192
while (low_pool_job.NumFinished() < kJobs ||
183193
high_pool_job.NumFinished() < kJobs) {
184194
env_->SleepForMicroseconds(kDelayMicros);
185195
}
196+
197+
ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
198+
ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
186199
}
187200

188201
bool IsSingleVarint(const std::string& s) {

0 commit comments

Comments
 (0)