Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine the read-ahead logic for user reads and compaction reads #5431

Closed
wants to merge 17 commits into from
Closed
21 changes: 8 additions & 13 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -497,14 +497,14 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {

// Create new iterator for:
// (1) 1 for verifying flush results
// (2) 3 for compaction input files
// (3) 1 for verifying compaction results.
ASSERT_EQ(num_new_table_reader, 5);
// (2) 1 for verifying compaction results.
// (3) New TableReaders will not be created for compaction inputs
ASSERT_EQ(num_new_table_reader, 2);

num_table_cache_lookup = 0;
num_new_table_reader = 0;
ASSERT_EQ(Key(1), Get(Key(1)));
ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 3);
ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 5);
ASSERT_EQ(num_new_table_reader, 0);

num_table_cache_lookup = 0;
Expand All @@ -519,13 +519,14 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
// May preload table cache too.
ASSERT_GE(num_table_cache_lookup, 1);
old_num_table_cache_lookup2 = num_table_cache_lookup;
// One for compaction input, one for verifying compaction results.
ASSERT_EQ(num_new_table_reader, 2);
// One for verifying compaction results.
// No new iterator created for compaction.
ASSERT_EQ(num_new_table_reader, 1);

num_table_cache_lookup = 0;
num_new_table_reader = 0;
ASSERT_EQ(Key(1), Get(Key(1)));
ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 2);
ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 3);
ASSERT_EQ(num_new_table_reader, 0);

rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
Expand Down Expand Up @@ -4339,12 +4340,6 @@ TEST_P(DBCompactionDirectIOTest, DirectIO) {
options.env = new MockEnv(Env::Default());
Reopen(options);
bool readahead = false;
SyncPoint::GetInstance()->SetCallBack(
"TableCache::NewIterator:for_compaction", [&](void* arg) {
bool* use_direct_reads = static_cast<bool*>(arg);
ASSERT_EQ(*use_direct_reads,
options.use_direct_reads);
});
SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::OpenCompactionOutputFile", [&](void* arg) {
bool* use_direct_writes = static_cast<bool*>(arg);
Expand Down
76 changes: 16 additions & 60 deletions db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "monitoring/perf_context_imp.h"
#include "rocksdb/statistics.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
Expand Down Expand Up @@ -43,13 +44,6 @@ static void UnrefEntry(void* arg1, void* arg2) {
cache->Release(h);
}

static void DeleteTableReader(void* arg1, void* arg2) {
TableReader* table_reader = reinterpret_cast<TableReader*>(arg1);
Statistics* stats = reinterpret_cast<Statistics*>(arg2);
RecordTick(stats, NO_FILE_CLOSES);
delete table_reader;
}

static Slice GetSliceForFileNumber(const uint64_t* file_number) {
return Slice(reinterpret_cast<const char*>(file_number),
sizeof(*file_number));
Expand Down Expand Up @@ -96,8 +90,8 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) {
Status TableCache::GetTableReader(
const EnvOptions& env_options,
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
bool sequential_mode, size_t readahead, bool record_read_stats,
HistogramImpl* file_read_hist, std::unique_ptr<TableReader>* table_reader,
bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist,
std::unique_ptr<TableReader>* table_reader,
const SliceTransform* prefix_extractor, bool skip_filters, int level,
bool prefetch_index_and_filter_in_cache, bool for_compaction) {
std::string fname =
Expand All @@ -107,13 +101,6 @@ Status TableCache::GetTableReader(

RecordTick(ioptions_.statistics, NO_FILE_OPENS);
if (s.ok()) {
if (readahead > 0 && !env_options.use_mmap_reads) {
// Not compatible with mmap files since ReadaheadRandomAccessFile requires
// its wrapped file's Read() to copy data into the provided scratch
// buffer, which mmap files don't use.
// TODO(ajkr): try madvise for mmap files in place of buffered readahead.
file = NewReadaheadRandomAccessFile(std::move(file), readahead);
}
if (!sequential_mode && ioptions_.advise_random_on_open) {
file->Hint(RandomAccessFile::RANDOM);
}
Expand Down Expand Up @@ -164,10 +151,9 @@ Status TableCache::FindTable(const EnvOptions& env_options,
}
std::unique_ptr<TableReader> table_reader;
s = GetTableReader(env_options, internal_comparator, fd,
false /* sequential mode */, 0 /* readahead */,
record_read_stats, file_read_hist, &table_reader,
prefix_extractor, skip_filters, level,
prefetch_index_and_filter_in_cache);
false /* sequential mode */, record_read_stats,
file_read_hist, &table_reader, prefix_extractor,
skip_filters, level, prefetch_index_and_filter_in_cache);
if (!s.ok()) {
assert(table_reader == nullptr);
RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
Expand Down Expand Up @@ -196,48 +182,21 @@ InternalIterator* TableCache::NewIterator(
PERF_TIMER_GUARD(new_table_iterator_nanos);

Status s;
bool create_new_table_reader = false;
TableReader* table_reader = nullptr;
Cache::Handle* handle = nullptr;
if (table_reader_ptr != nullptr) {
*table_reader_ptr = nullptr;
}
size_t readahead = 0;
if (for_compaction) {
#ifndef NDEBUG
bool use_direct_reads_for_compaction = env_options.use_direct_reads;
TEST_SYNC_POINT_CALLBACK("TableCache::NewIterator:for_compaction",
&use_direct_reads_for_compaction);
#endif // !NDEBUG
if (ioptions_.new_table_reader_for_compaction_inputs) {
// get compaction_readahead_size from env_options allows us to set the
// value dynamically
readahead = env_options.compaction_readahead_size;
create_new_table_reader = true;
}
}

auto& fd = file_meta.fd;
if (create_new_table_reader) {
std::unique_ptr<TableReader> table_reader_unique_ptr;
s = GetTableReader(
env_options, icomparator, fd, true /* sequential_mode */, readahead,
!for_compaction /* record stats */, nullptr, &table_reader_unique_ptr,
prefix_extractor, false /* skip_filters */, level,
true /* prefetch_index_and_filter_in_cache */, for_compaction);
table_reader = fd.table_reader;
if (table_reader == nullptr) {
s = FindTable(env_options, icomparator, fd, &handle, prefix_extractor,
options.read_tier == kBlockCacheTier /* no_io */,
!for_compaction /* record read_stats */, file_read_hist,
skip_filters, level);
if (s.ok()) {
table_reader = table_reader_unique_ptr.release();
}
} else {
table_reader = fd.table_reader;
if (table_reader == nullptr) {
s = FindTable(env_options, icomparator, fd, &handle, prefix_extractor,
options.read_tier == kBlockCacheTier /* no_io */,
!for_compaction /* record read_stats */, file_read_hist,
skip_filters, level);
if (s.ok()) {
table_reader = GetTableReaderFromHandle(handle);
}
table_reader = GetTableReaderFromHandle(handle);
}
}
InternalIterator* result = nullptr;
Expand All @@ -247,13 +206,10 @@ InternalIterator* TableCache::NewIterator(
result = NewEmptyInternalIterator<Slice>(arena);
} else {
result = table_reader->NewIterator(options, prefix_extractor, arena,
skip_filters, for_compaction);
skip_filters, for_compaction,
env_options.compaction_readahead_size);
}
if (create_new_table_reader) {
assert(handle == nullptr);
result->RegisterCleanup(&DeleteTableReader, table_reader,
ioptions_.statistics);
} else if (handle != nullptr) {
if (handle != nullptr) {
result->RegisterCleanup(&UnrefEntry, cache_, handle);
handle = nullptr; // prevent from releasing below
}
Expand Down
3 changes: 1 addition & 2 deletions db/table_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,7 @@ class TableCache {
Status GetTableReader(const EnvOptions& env_options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& fd, bool sequential_mode,
size_t readahead, bool record_read_stats,
HistogramImpl* file_read_hist,
bool record_read_stats, HistogramImpl* file_read_hist,
std::unique_ptr<TableReader>* table_reader,
const SliceTransform* prefix_extractor = nullptr,
bool skip_filters = false, int level = -1,
Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,8 @@ struct DBOptions {
// for this mode if using block-based table.
//
// Default: false
// This flag has no affect on the behavior of compaction and plan to delete
// in the future.
bool new_table_reader_for_compaction_inputs = false;

// If non-zero, we perform bigger reads when doing compaction. If you're
Expand Down
39 changes: 25 additions & 14 deletions table/block_based/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,13 @@ Status ReadBlockFromFile(
bool do_uncompress, bool maybe_compressed, BlockType block_type,
const UncompressionDict& uncompression_dict,
const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator) {
size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator,
bool for_compaction = false) {
BlockContents contents;
BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle,
&contents, ioptions, do_uncompress,
maybe_compressed, block_type, uncompression_dict,
cache_options, memory_allocator);
BlockFetcher block_fetcher(
file, prefetch_buffer, footer, options, handle, &contents, ioptions,
do_uncompress, maybe_compressed, block_type, uncompression_dict,
cache_options, memory_allocator, nullptr, for_compaction);
Status s = block_fetcher.ReadBlockContents();
if (s.ok()) {
result->reset(new Block(std::move(contents), global_seqno,
Expand Down Expand Up @@ -1906,7 +1907,7 @@ CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(

if (!is_a_filter_partition && rep_->filter_entry.IsCached()) {
return {rep_->filter_entry.GetValue(), /*cache=*/nullptr,
/*cache_handle=*/nullptr, /*own_value=*/false};
/*cache_handle=*/nullptr, /*own_value=*/false};
}

PERF_TIMER_GUARD(read_filter_block_nanos);
Expand Down Expand Up @@ -2075,7 +2076,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
const ReadOptions& ro, const BlockHandle& handle, TBlockIter* input_iter,
BlockType block_type, bool key_includes_seq, bool index_key_is_full,
GetContext* get_context, BlockCacheLookupContext* lookup_context, Status s,
FilePrefetchBuffer* prefetch_buffer) const {
FilePrefetchBuffer* prefetch_buffer, bool for_compaction) const {
PERF_TIMER_GUARD(new_table_block_iter_nanos);

TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
Expand All @@ -2094,7 +2095,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(

CachableEntry<Block> block;
s = RetrieveBlock(prefetch_buffer, ro, handle, uncompression_dict, &block,
block_type, get_context, lookup_context);
block_type, get_context, lookup_context, for_compaction);

if (!s.ok()) {
assert(block.IsEmpty());
Expand Down Expand Up @@ -2144,6 +2145,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
s = block_cache->Insert(unique_key, nullptr,
block.GetValue()->ApproximateMemoryUsage(),
nullptr, &cache_handle);

if (s.ok()) {
assert(cache_handle != nullptr);
iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
Expand Down Expand Up @@ -2297,7 +2299,8 @@ Status BlockBasedTable::RetrieveBlock(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block_entry, BlockType block_type,
GetContext* get_context, BlockCacheLookupContext* lookup_context) const {
GetContext* get_context, BlockCacheLookupContext* lookup_context,
bool for_compaction) const {
assert(block_entry);
assert(block_entry->IsEmpty());

Expand Down Expand Up @@ -2340,7 +2343,7 @@ Status BlockBasedTable::RetrieveBlock(
block_type == BlockType::kData
? rep_->table_options.read_amp_bytes_per_bit
: 0,
GetMemoryAllocator(rep_->table_options));
GetMemoryAllocator(rep_->table_options), for_compaction);
}

if (!s.ok()) {
Expand Down Expand Up @@ -2714,13 +2717,18 @@ void BlockBasedTableIterator<TBlockIter, TValue>::InitDataBlock() {
rep->file.get(), read_options_.readahead_size,
read_options_.readahead_size));
}
} else if (!prefetch_buffer_) {
prefetch_buffer_.reset(
new FilePrefetchBuffer(rep->file.get(), compaction_readahead_size_,
compaction_readahead_size_));
}

Status s;
table_->NewDataBlockIterator<TBlockIter>(
read_options_, data_block_handle, &block_iter_, block_type_,
key_includes_seq_, index_key_is_full_,
/*get_context=*/nullptr, &lookup_context_, s, prefetch_buffer_.get());
/*get_context=*/nullptr, &lookup_context_, s, prefetch_buffer_.get(),
for_compaction_);
block_iter_points_to_real_block_ = true;
}
}
Expand Down Expand Up @@ -2806,7 +2814,8 @@ void BlockBasedTableIterator<TBlockIter, TValue>::CheckOutOfBound() {

InternalIterator* BlockBasedTable::NewIterator(
const ReadOptions& read_options, const SliceTransform* prefix_extractor,
Arena* arena, bool skip_filters, bool for_compaction) {
Arena* arena, bool skip_filters, bool for_compaction,
size_t compaction_readahead_size) {
BlockCacheLookupContext lookup_context{
for_compaction ? BlockCacheLookupCaller::kCompaction
: BlockCacheLookupCaller::kUserIterator};
Expand All @@ -2823,7 +2832,8 @@ InternalIterator* BlockBasedTable::NewIterator(
!skip_filters && !read_options.total_order_seek &&
prefix_extractor != nullptr,
need_upper_bound_check, prefix_extractor, BlockType::kData,
true /*key_includes_seq*/, true /*index_key_is_full*/, for_compaction);
true /*key_includes_seq*/, true /*index_key_is_full*/, for_compaction,
compaction_readahead_size);
} else {
auto* mem =
arena->AllocateAligned(sizeof(BlockBasedTableIterator<DataBlockIter>));
Expand All @@ -2835,7 +2845,8 @@ InternalIterator* BlockBasedTable::NewIterator(
!skip_filters && !read_options.total_order_seek &&
prefix_extractor != nullptr,
need_upper_bound_check, prefix_extractor, BlockType::kData,
true /*key_includes_seq*/, true /*index_key_is_full*/, for_compaction);
true /*key_includes_seq*/, true /*index_key_is_full*/, for_compaction,
compaction_readahead_size);
}
}

Expand Down
Loading