Skip to content

Commit 46a2afc

Browse files
vjnadimpalli authored and facebook-github-bot committed
Combine the read-ahead logic for user reads and compaction reads (facebook#5431)
Summary: Currently the read-ahead logic for user reads and compaction reads goes through different code paths where compaction reads create new table readers and use `ReadaheadRandomAccessFile`. This change is to unify read-ahead logic to use read-ahead in BlockBasedTableReader::InitDataBlock(). As a result of the change, the `ReadaheadRandomAccessFile` class and the `new_table_reader_for_compaction_inputs` option will no longer be used. Pull Request resolved: facebook#5431 Test Plan: make check Here is the benchmarking - https://gist.github.com/vjnadimpalli/083cf423f7b6aa12dcdb14c858bc18a5 Differential Revision: D15772533 Pulled By: vjnadimpalli fbshipit-source-id: b71dca710590471ede6fb37553388654e2e479b9
1 parent ba4f884 commit 46a2afc

19 files changed

+124
-126
lines changed

db/db_compaction_test.cc

+8-13
Original file line numberDiff line numberDiff line change
@@ -497,14 +497,14 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
497497

498498
// Create new iterator for:
499499
// (1) 1 for verifying flush results
500-
// (2) 3 for compaction input files
501-
// (3) 1 for verifying compaction results.
502-
ASSERT_EQ(num_new_table_reader, 5);
500+
// (2) 1 for verifying compaction results.
501+
// (3) New TableReaders will not be created for compaction inputs
502+
ASSERT_EQ(num_new_table_reader, 2);
503503

504504
num_table_cache_lookup = 0;
505505
num_new_table_reader = 0;
506506
ASSERT_EQ(Key(1), Get(Key(1)));
507-
ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 3);
507+
ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 5);
508508
ASSERT_EQ(num_new_table_reader, 0);
509509

510510
num_table_cache_lookup = 0;
@@ -519,13 +519,14 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
519519
// May preload table cache too.
520520
ASSERT_GE(num_table_cache_lookup, 1);
521521
old_num_table_cache_lookup2 = num_table_cache_lookup;
522-
// One for compaction input, one for verifying compaction results.
523-
ASSERT_EQ(num_new_table_reader, 2);
522+
// One for verifying compaction results.
523+
// No new iterator created for compaction.
524+
ASSERT_EQ(num_new_table_reader, 1);
524525

525526
num_table_cache_lookup = 0;
526527
num_new_table_reader = 0;
527528
ASSERT_EQ(Key(1), Get(Key(1)));
528-
ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 2);
529+
ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 3);
529530
ASSERT_EQ(num_new_table_reader, 0);
530531

531532
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
@@ -4339,12 +4340,6 @@ TEST_P(DBCompactionDirectIOTest, DirectIO) {
43394340
options.env = new MockEnv(Env::Default());
43404341
Reopen(options);
43414342
bool readahead = false;
4342-
SyncPoint::GetInstance()->SetCallBack(
4343-
"TableCache::NewIterator:for_compaction", [&](void* arg) {
4344-
bool* use_direct_reads = static_cast<bool*>(arg);
4345-
ASSERT_EQ(*use_direct_reads,
4346-
options.use_direct_reads);
4347-
});
43484343
SyncPoint::GetInstance()->SetCallBack(
43494344
"CompactionJob::OpenCompactionOutputFile", [&](void* arg) {
43504345
bool* use_direct_writes = static_cast<bool*>(arg);

db/table_cache.cc

+16-60
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include "monitoring/perf_context_imp.h"
1818
#include "rocksdb/statistics.h"
19+
#include "table/block_based/block_based_table_reader.h"
1920
#include "table/get_context.h"
2021
#include "table/internal_iterator.h"
2122
#include "table/iterator_wrapper.h"
@@ -43,13 +44,6 @@ static void UnrefEntry(void* arg1, void* arg2) {
4344
cache->Release(h);
4445
}
4546

46-
static void DeleteTableReader(void* arg1, void* arg2) {
47-
TableReader* table_reader = reinterpret_cast<TableReader*>(arg1);
48-
Statistics* stats = reinterpret_cast<Statistics*>(arg2);
49-
RecordTick(stats, NO_FILE_CLOSES);
50-
delete table_reader;
51-
}
52-
5347
static Slice GetSliceForFileNumber(const uint64_t* file_number) {
5448
return Slice(reinterpret_cast<const char*>(file_number),
5549
sizeof(*file_number));
@@ -96,8 +90,8 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) {
9690
Status TableCache::GetTableReader(
9791
const EnvOptions& env_options,
9892
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
99-
bool sequential_mode, size_t readahead, bool record_read_stats,
100-
HistogramImpl* file_read_hist, std::unique_ptr<TableReader>* table_reader,
93+
bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist,
94+
std::unique_ptr<TableReader>* table_reader,
10195
const SliceTransform* prefix_extractor, bool skip_filters, int level,
10296
bool prefetch_index_and_filter_in_cache, bool for_compaction) {
10397
std::string fname =
@@ -107,13 +101,6 @@ Status TableCache::GetTableReader(
107101

108102
RecordTick(ioptions_.statistics, NO_FILE_OPENS);
109103
if (s.ok()) {
110-
if (readahead > 0 && !env_options.use_mmap_reads) {
111-
// Not compatible with mmap files since ReadaheadRandomAccessFile requires
112-
// its wrapped file's Read() to copy data into the provided scratch
113-
// buffer, which mmap files don't use.
114-
// TODO(ajkr): try madvise for mmap files in place of buffered readahead.
115-
file = NewReadaheadRandomAccessFile(std::move(file), readahead);
116-
}
117104
if (!sequential_mode && ioptions_.advise_random_on_open) {
118105
file->Hint(RandomAccessFile::RANDOM);
119106
}
@@ -164,10 +151,9 @@ Status TableCache::FindTable(const EnvOptions& env_options,
164151
}
165152
std::unique_ptr<TableReader> table_reader;
166153
s = GetTableReader(env_options, internal_comparator, fd,
167-
false /* sequential mode */, 0 /* readahead */,
168-
record_read_stats, file_read_hist, &table_reader,
169-
prefix_extractor, skip_filters, level,
170-
prefetch_index_and_filter_in_cache);
154+
false /* sequential mode */, record_read_stats,
155+
file_read_hist, &table_reader, prefix_extractor,
156+
skip_filters, level, prefetch_index_and_filter_in_cache);
171157
if (!s.ok()) {
172158
assert(table_reader == nullptr);
173159
RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
@@ -196,48 +182,21 @@ InternalIterator* TableCache::NewIterator(
196182
PERF_TIMER_GUARD(new_table_iterator_nanos);
197183

198184
Status s;
199-
bool create_new_table_reader = false;
200185
TableReader* table_reader = nullptr;
201186
Cache::Handle* handle = nullptr;
202187
if (table_reader_ptr != nullptr) {
203188
*table_reader_ptr = nullptr;
204189
}
205-
size_t readahead = 0;
206-
if (for_compaction) {
207-
#ifndef NDEBUG
208-
bool use_direct_reads_for_compaction = env_options.use_direct_reads;
209-
TEST_SYNC_POINT_CALLBACK("TableCache::NewIterator:for_compaction",
210-
&use_direct_reads_for_compaction);
211-
#endif // !NDEBUG
212-
if (ioptions_.new_table_reader_for_compaction_inputs) {
213-
// get compaction_readahead_size from env_options allows us to set the
214-
// value dynamically
215-
readahead = env_options.compaction_readahead_size;
216-
create_new_table_reader = true;
217-
}
218-
}
219190

220191
auto& fd = file_meta.fd;
221-
if (create_new_table_reader) {
222-
std::unique_ptr<TableReader> table_reader_unique_ptr;
223-
s = GetTableReader(
224-
env_options, icomparator, fd, true /* sequential_mode */, readahead,
225-
!for_compaction /* record stats */, nullptr, &table_reader_unique_ptr,
226-
prefix_extractor, false /* skip_filters */, level,
227-
true /* prefetch_index_and_filter_in_cache */, for_compaction);
192+
table_reader = fd.table_reader;
193+
if (table_reader == nullptr) {
194+
s = FindTable(env_options, icomparator, fd, &handle, prefix_extractor,
195+
options.read_tier == kBlockCacheTier /* no_io */,
196+
!for_compaction /* record read_stats */, file_read_hist,
197+
skip_filters, level);
228198
if (s.ok()) {
229-
table_reader = table_reader_unique_ptr.release();
230-
}
231-
} else {
232-
table_reader = fd.table_reader;
233-
if (table_reader == nullptr) {
234-
s = FindTable(env_options, icomparator, fd, &handle, prefix_extractor,
235-
options.read_tier == kBlockCacheTier /* no_io */,
236-
!for_compaction /* record read_stats */, file_read_hist,
237-
skip_filters, level);
238-
if (s.ok()) {
239-
table_reader = GetTableReaderFromHandle(handle);
240-
}
199+
table_reader = GetTableReaderFromHandle(handle);
241200
}
242201
}
243202
InternalIterator* result = nullptr;
@@ -247,13 +206,10 @@ InternalIterator* TableCache::NewIterator(
247206
result = NewEmptyInternalIterator<Slice>(arena);
248207
} else {
249208
result = table_reader->NewIterator(options, prefix_extractor, arena,
250-
skip_filters, for_compaction);
209+
skip_filters, for_compaction,
210+
env_options.compaction_readahead_size);
251211
}
252-
if (create_new_table_reader) {
253-
assert(handle == nullptr);
254-
result->RegisterCleanup(&DeleteTableReader, table_reader,
255-
ioptions_.statistics);
256-
} else if (handle != nullptr) {
212+
if (handle != nullptr) {
257213
result->RegisterCleanup(&UnrefEntry, cache_, handle);
258214
handle = nullptr; // prevent from releasing below
259215
}

db/table_cache.h

+1-2
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,7 @@ class TableCache {
177177
Status GetTableReader(const EnvOptions& env_options,
178178
const InternalKeyComparator& internal_comparator,
179179
const FileDescriptor& fd, bool sequential_mode,
180-
size_t readahead, bool record_read_stats,
181-
HistogramImpl* file_read_hist,
180+
bool record_read_stats, HistogramImpl* file_read_hist,
182181
std::unique_ptr<TableReader>* table_reader,
183182
const SliceTransform* prefix_extractor = nullptr,
184183
bool skip_filters = false, int level = -1,

include/rocksdb/options.h

+2
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,8 @@ struct DBOptions {
760760
// for this mode if using block-based table.
761761
//
762762
// Default: false
763+
// This flag has no effect on the behavior of compaction and is planned
764+
// for removal in the future.
763765
bool new_table_reader_for_compaction_inputs = false;
764766

765767
// If non-zero, we perform bigger reads when doing compaction. If you're

table/block_based/block_based_table_reader.cc

+25-14
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,13 @@ Status ReadBlockFromFile(
8383
bool do_uncompress, bool maybe_compressed, BlockType block_type,
8484
const UncompressionDict& uncompression_dict,
8585
const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
86-
size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator) {
86+
size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator,
87+
bool for_compaction = false) {
8788
BlockContents contents;
88-
BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle,
89-
&contents, ioptions, do_uncompress,
90-
maybe_compressed, block_type, uncompression_dict,
91-
cache_options, memory_allocator);
89+
BlockFetcher block_fetcher(
90+
file, prefetch_buffer, footer, options, handle, &contents, ioptions,
91+
do_uncompress, maybe_compressed, block_type, uncompression_dict,
92+
cache_options, memory_allocator, nullptr, for_compaction);
9293
Status s = block_fetcher.ReadBlockContents();
9394
if (s.ok()) {
9495
result->reset(new Block(std::move(contents), global_seqno,
@@ -1906,7 +1907,7 @@ CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
19061907

19071908
if (!is_a_filter_partition && rep_->filter_entry.IsCached()) {
19081909
return {rep_->filter_entry.GetValue(), /*cache=*/nullptr,
1909-
/*cache_handle=*/nullptr, /*own_value=*/false};
1910+
/*cache_handle=*/nullptr, /*own_value=*/false};
19101911
}
19111912

19121913
PERF_TIMER_GUARD(read_filter_block_nanos);
@@ -2075,7 +2076,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
20752076
const ReadOptions& ro, const BlockHandle& handle, TBlockIter* input_iter,
20762077
BlockType block_type, bool key_includes_seq, bool index_key_is_full,
20772078
GetContext* get_context, BlockCacheLookupContext* lookup_context, Status s,
2078-
FilePrefetchBuffer* prefetch_buffer) const {
2079+
FilePrefetchBuffer* prefetch_buffer, bool for_compaction) const {
20792080
PERF_TIMER_GUARD(new_table_block_iter_nanos);
20802081

20812082
TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
@@ -2094,7 +2095,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
20942095

20952096
CachableEntry<Block> block;
20962097
s = RetrieveBlock(prefetch_buffer, ro, handle, uncompression_dict, &block,
2097-
block_type, get_context, lookup_context);
2098+
block_type, get_context, lookup_context, for_compaction);
20982099

20992100
if (!s.ok()) {
21002101
assert(block.IsEmpty());
@@ -2144,6 +2145,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
21442145
s = block_cache->Insert(unique_key, nullptr,
21452146
block.GetValue()->ApproximateMemoryUsage(),
21462147
nullptr, &cache_handle);
2148+
21472149
if (s.ok()) {
21482150
assert(cache_handle != nullptr);
21492151
iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
@@ -2297,7 +2299,8 @@ Status BlockBasedTable::RetrieveBlock(
22972299
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
22982300
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
22992301
CachableEntry<Block>* block_entry, BlockType block_type,
2300-
GetContext* get_context, BlockCacheLookupContext* lookup_context) const {
2302+
GetContext* get_context, BlockCacheLookupContext* lookup_context,
2303+
bool for_compaction) const {
23012304
assert(block_entry);
23022305
assert(block_entry->IsEmpty());
23032306

@@ -2340,7 +2343,7 @@ Status BlockBasedTable::RetrieveBlock(
23402343
block_type == BlockType::kData
23412344
? rep_->table_options.read_amp_bytes_per_bit
23422345
: 0,
2343-
GetMemoryAllocator(rep_->table_options));
2346+
GetMemoryAllocator(rep_->table_options), for_compaction);
23442347
}
23452348

23462349
if (!s.ok()) {
@@ -2714,13 +2717,18 @@ void BlockBasedTableIterator<TBlockIter, TValue>::InitDataBlock() {
27142717
rep->file.get(), read_options_.readahead_size,
27152718
read_options_.readahead_size));
27162719
}
2720+
} else if (!prefetch_buffer_) {
2721+
prefetch_buffer_.reset(
2722+
new FilePrefetchBuffer(rep->file.get(), compaction_readahead_size_,
2723+
compaction_readahead_size_));
27172724
}
27182725

27192726
Status s;
27202727
table_->NewDataBlockIterator<TBlockIter>(
27212728
read_options_, data_block_handle, &block_iter_, block_type_,
27222729
key_includes_seq_, index_key_is_full_,
2723-
/*get_context=*/nullptr, &lookup_context_, s, prefetch_buffer_.get());
2730+
/*get_context=*/nullptr, &lookup_context_, s, prefetch_buffer_.get(),
2731+
for_compaction_);
27242732
block_iter_points_to_real_block_ = true;
27252733
}
27262734
}
@@ -2806,7 +2814,8 @@ void BlockBasedTableIterator<TBlockIter, TValue>::CheckOutOfBound() {
28062814

28072815
InternalIterator* BlockBasedTable::NewIterator(
28082816
const ReadOptions& read_options, const SliceTransform* prefix_extractor,
2809-
Arena* arena, bool skip_filters, bool for_compaction) {
2817+
Arena* arena, bool skip_filters, bool for_compaction,
2818+
size_t compaction_readahead_size) {
28102819
BlockCacheLookupContext lookup_context{
28112820
for_compaction ? BlockCacheLookupCaller::kCompaction
28122821
: BlockCacheLookupCaller::kUserIterator};
@@ -2823,7 +2832,8 @@ InternalIterator* BlockBasedTable::NewIterator(
28232832
!skip_filters && !read_options.total_order_seek &&
28242833
prefix_extractor != nullptr,
28252834
need_upper_bound_check, prefix_extractor, BlockType::kData,
2826-
true /*key_includes_seq*/, true /*index_key_is_full*/, for_compaction);
2835+
true /*key_includes_seq*/, true /*index_key_is_full*/, for_compaction,
2836+
compaction_readahead_size);
28272837
} else {
28282838
auto* mem =
28292839
arena->AllocateAligned(sizeof(BlockBasedTableIterator<DataBlockIter>));
@@ -2835,7 +2845,8 @@ InternalIterator* BlockBasedTable::NewIterator(
28352845
!skip_filters && !read_options.total_order_seek &&
28362846
prefix_extractor != nullptr,
28372847
need_upper_bound_check, prefix_extractor, BlockType::kData,
2838-
true /*key_includes_seq*/, true /*index_key_is_full*/, for_compaction);
2848+
true /*key_includes_seq*/, true /*index_key_is_full*/, for_compaction,
2849+
compaction_readahead_size);
28392850
}
28402851
}
28412852

0 commit comments

Comments
 (0)