Skip to content

Commit bf98dcf

Browse files
ajkr authored and facebook-github-bot committed
Fix kBlockCacheTier read when merge-chain base value is in a blob file (#12462)
Summary: The original goal is to propagate failures from `GetContext::SaveValue()` -> `GetContext::GetBlobValue()` -> `BlobFetcher::FetchBlob()` up to the user. This call sequence happens when a merge chain ends with a base value in a blob file. There's also fixes for bugs encountered along the way where non-ok statuses were ignored/overwritten, and a bit of plumbing work for functions that had no capability to return a status. Pull Request resolved: #12462 Test Plan: A repro command ``` db=/dev/shm/dbstress_db ; exp=/dev/shm/dbstress_exp ; rm -rf $db $exp ; mkdir -p $db $exp ./db_stress \ --clear_column_family_one_in=0 \ --test_batches_snapshots=0 \ --write_fault_one_in=0 \ --use_put_entity_one_in=0 \ --prefixpercent=0 \ --read_fault_one_in=0 \ --readpercent=0 \ --reopen=0 \ --set_options_one_in=10000 \ --delpercent=0 \ --delrangepercent=0 \ --open_metadata_write_fault_one_in=0 \ --open_read_fault_one_in=0 \ --open_write_fault_one_in=0 \ --destroy_db_initially=0 \ --ingest_external_file_one_in=0 \ --iterpercent=0 \ --nooverwritepercent=0 \ --db=$db \ --enable_blob_files=1 \ --expected_values_dir=$exp \ --max_background_compactions=20 \ --max_bytes_for_level_base=2097152 \ --max_key=100000 \ --min_blob_size=0 \ --open_files=-1 \ --ops_per_thread=100000000 \ --prefix_size=-1 \ --target_file_size_base=524288 \ --use_merge=1 \ --value_size_mult=32 \ --write_buffer_size=524288 \ --writepercent=100 ``` It used to fail like: ``` ... 
frame #9: 0x00007fc63903bc93 libc.so.6`__GI___assert_fail(assertion="HasDefaultColumn(columns)", file="fbcode/internal_repo_rocksdb/repo/db/wide/wide_columns_helper.h", line=33, function="static const rocksdb::Slice &rocksdb::WideColumnsHelper::GetDefaultColumn(const rocksdb::WideColumns &)") at assert.c:101:3 frame #10: 0x00000000006f7e92 db_stress`rocksdb::Version::Get(rocksdb::ReadOptions const&, rocksdb::LookupKey const&, rocksdb::PinnableSlice*, rocksdb::PinnableWideColumns*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>>*, rocksdb::Status*, rocksdb::MergeContext*, unsigned long*, rocksdb::PinnedIteratorsManager*, bool*, bool*, unsigned long*, rocksdb::ReadCallback*, bool*, bool) [inlined] rocksdb::WideColumnsHelper::GetDefaultColumn(columns=size=0) at wide_columns_helper.h:33 frame #11: 0x00000000006f7e76 db_stress`rocksdb::Version::Get(this=0x00007fc5ec763000, read_options=<unavailable>, k=<unavailable>, value=0x0000000000000000, columns=0x00007fc6035fd1d8, timestamp=<unavailable>, status=0x00007fc6035fd250, merge_context=0x00007fc6035fce40, max_covering_tombstone_seq=0x00007fc6035fce90, pinned_iters_mgr=0x00007fc6035fcdf0, value_found=0x0000000000000000, key_exists=0x0000000000000000, seq=0x0000000000000000, callback=0x0000000000000000, is_blob=0x0000000000000000, do_merge=<unavailable>) at version_set.cc:2492 frame #12: 0x000000000051e245 db_stress`rocksdb::DBImpl::GetImpl(this=0x00007fc637a86000, read_options=0x00007fc6035fcf60, key=<unavailable>, get_impl_options=0x00007fc6035fd000) at db_impl.cc:2408 frame #13: 0x000000000050cec2 db_stress`rocksdb::DBImpl::GetEntity(this=0x00007fc637a86000, _read_options=<unavailable>, column_family=<unavailable>, key=0x00007fc6035fd3c8, columns=0x00007fc6035fd1d8) at db_impl.cc:2109 frame #14: 0x000000000074f688 db_stress`rocksdb::(anonymous namespace)::MemTableInserter::MergeCF(this=0x00007fc6035fd450, column_family_id=2, key=0x00007fc6035fd3c8, value=0x00007fc6035fd3a0) at 
write_batch.cc:2656 frame #15: 0x00000000007476fc db_stress`rocksdb::WriteBatchInternal::Iterate(wb=0x00007fc6035fe698, handler=0x00007fc6035fd450, begin=12, end=<unavailable>) at write_batch.cc:607 frame #16: 0x000000000074d7dd db_stress`rocksdb::WriteBatchInternal::InsertInto(rocksdb::WriteThread::WriteGroup&, unsigned long, rocksdb::ColumnFamilyMemTables*, rocksdb::FlushScheduler*, rocksdb::TrimHistoryScheduler*, bool, unsigned long, rocksdb::DB*, bool, bool, bool) [inlined] rocksdb::WriteBatch::Iterate(this=<unavailable>, handler=0x00007fc6035fd450) const at write_batch.cc:505 frame #17: 0x000000000074d77b db_stress`rocksdb::WriteBatchInternal::InsertInto(write_group=<unavailable>, sequence=<unavailable>, memtables=<unavailable>, flush_scheduler=<unavailable>, trim_history_scheduler=<unavailable>, ignore_missing_column_families=<unavailable>, recovery_log_number=0, db=0x00007fc637a86000, concurrent_memtable_writes=<unavailable>, seq_per_batch=false, batch_per_txn=<unavailable>) at write_batch.cc:3084 frame #18: 0x0000000000631d77 db_stress`rocksdb::DBImpl::PipelinedWriteImpl(this=0x00007fc637a86000, write_options=<unavailable>, my_batch=0x00007fc6035fe698, callback=0x0000000000000000, log_used=<unavailable>, log_ref=0, disable_memtable=<unavailable>, seq_used=0x0000000000000000) at db_impl_write.cc:807 frame #19: 0x000000000062ceeb db_stress`rocksdb::DBImpl::WriteImpl(this=<unavailable>, write_options=<unavailable>, my_batch=0x00007fc6035fe698, callback=0x0000000000000000, log_used=<unavailable>, log_ref=0, disable_memtable=<unavailable>, seq_used=0x0000000000000000, batch_cnt=0, pre_release_callback=0x0000000000000000, post_memtable_callback=0x0000000000000000) at db_impl_write.cc:312 frame #20: 0x000000000062c8ec db_stress`rocksdb::DBImpl::Write(this=0x00007fc637a86000, write_options=0x00007fc6035feca8, my_batch=0x00007fc6035fe698) at db_impl_write.cc:157 frame #21: 0x000000000062b847 db_stress`rocksdb::DB::Merge(this=0x00007fc637a86000, 
opt=0x00007fc6035feca8, column_family=0x00007fc6370bf140, key=0x00007fc6035fe8d8, value=0x00007fc6035fe830) at db_impl_write.cc:2544 frame #22: 0x000000000062b6ef db_stress`rocksdb::DBImpl::Merge(this=0x00007fc637a86000, o=<unavailable>, column_family=0x00007fc6370bf140, key=0x00007fc6035fe8d8, val=0x00007fc6035fe830) at db_impl_write.cc:72 frame #23: 0x00000000004d6397 db_stress`rocksdb::NonBatchedOpsStressTest::TestPut(this=0x00007fc637041000, thread=0x00007fc6370dbc00, write_opts=0x00007fc6035feca8, read_opts=0x00007fc6035fe9c8, rand_column_families=<unavailable>, rand_keys=size=1, value={P\xe9_\x03\xc6\x7f\0\0}) at no_batched_ops_stress.cc:1317 frame #24: 0x000000000049361d db_stress`rocksdb::StressTest::OperateDb(this=0x00007fc637041000, thread=0x00007fc6370dbc00) at db_stress_test_base.cc:1148 ... ``` Reviewed By: ltamasi Differential Revision: D55157795 Pulled By: ajkr fbshipit-source-id: 5f7c1380ead5794c29d41680028e34b839744764
1 parent 63a105a commit bf98dcf

12 files changed

+110
-41
lines changed

db/blob/db_blob_basic_test.cc

+24
Original file line numberDiff line numberDiff line change
@@ -1182,6 +1182,30 @@ TEST_F(DBBlobBasicTest, GetMergeBlobWithPut) {
11821182
ASSERT_EQ(Get("Key1"), "v1,v2,v3");
11831183
}
11841184

1185+
TEST_F(DBBlobBasicTest, GetMergeBlobFromMemoryTier) {
1186+
Options options = GetDefaultOptions();
1187+
options.merge_operator = MergeOperators::CreateStringAppendOperator();
1188+
options.enable_blob_files = true;
1189+
options.min_blob_size = 0;
1190+
1191+
Reopen(options);
1192+
1193+
ASSERT_OK(Put(Key(0), "v1"));
1194+
ASSERT_OK(Flush());
1195+
ASSERT_OK(Merge(Key(0), "v2"));
1196+
ASSERT_OK(Flush());
1197+
1198+
// Regular `Get()` loads data block to cache.
1199+
std::string value;
1200+
ASSERT_OK(db_->Get(ReadOptions(), Key(0), &value));
1201+
ASSERT_EQ("v1,v2", value);
1202+
1203+
// Base value blob is still uncached, so an in-memory read will fail.
1204+
ReadOptions read_options;
1205+
read_options.read_tier = kBlockCacheTier;
1206+
ASSERT_TRUE(db_->Get(read_options, Key(0), &value).IsIncomplete());
1207+
}
1208+
11851209
TEST_F(DBBlobBasicTest, MultiGetMergeBlobWithPut) {
11861210
constexpr size_t num_keys = 3;
11871211

db/table_cache.cc

+6-7
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ uint64_t TableCache::CreateRowCacheKeyPrefix(const ReadOptions& options,
395395

396396
bool TableCache::GetFromRowCache(const Slice& user_key, IterKey& row_cache_key,
397397
size_t prefix_size, GetContext* get_context,
398-
SequenceNumber seq_no) {
398+
Status* read_status, SequenceNumber seq_no) {
399399
bool found = false;
400400

401401
row_cache_key.TrimAppend(prefix_size, user_key.data(), user_key.size());
@@ -414,8 +414,8 @@ bool TableCache::GetFromRowCache(const Slice& user_key, IterKey& row_cache_key,
414414
row_cache.RegisterReleaseAsCleanup(row_handle, value_pinner);
415415
// If row cache hit, knowing cache key is the same to row_cache_key,
416416
// can use row_cache_key's seq no to construct InternalKey.
417-
replayGetContextLog(*row_cache.Value(row_handle), user_key, get_context,
418-
&value_pinner, seq_no);
417+
*read_status = replayGetContextLog(*row_cache.Value(row_handle), user_key,
418+
get_context, &value_pinner, seq_no);
419419
RecordTick(ioptions_.stats, ROW_CACHE_HIT);
420420
found = true;
421421
} else {
@@ -440,21 +440,20 @@ Status TableCache::Get(
440440

441441
// Check row cache if enabled.
442442
// Reuse row_cache_key sequence number when row cache hits.
443+
Status s;
443444
if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
444445
auto user_key = ExtractUserKey(k);
445446
uint64_t cache_entry_seq_no =
446447
CreateRowCacheKeyPrefix(options, fd, k, get_context, row_cache_key);
447448
done = GetFromRowCache(user_key, row_cache_key, row_cache_key.Size(),
448-
get_context, cache_entry_seq_no);
449+
get_context, &s, cache_entry_seq_no);
449450
if (!done) {
450451
row_cache_entry = &row_cache_entry_buffer;
451452
}
452453
}
453-
Status s;
454454
TableReader* t = fd.table_reader;
455455
TypedHandle* handle = nullptr;
456-
if (!done) {
457-
assert(s.ok());
456+
if (s.ok() && !done) {
458457
if (t == nullptr) {
459458
s = FindTable(options, file_options_, internal_comparator, file_meta,
460459
&handle, block_protection_bytes_per_key, prefix_extractor,

db/table_cache.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ class TableCache {
273273
// user key to row_cache_key at offset prefix_size
274274
bool GetFromRowCache(const Slice& user_key, IterKey& row_cache_key,
275275
size_t prefix_size, GetContext* get_context,
276+
Status* read_status,
276277
SequenceNumber seq_no = kMaxSequenceNumber);
277278

278279
const ImmutableOptions& ioptions_;
@@ -286,4 +287,4 @@ class TableCache {
286287
std::string db_session_id_;
287288
};
288289

289-
} // namespace ROCKSDB_NAMESPACE
290+
} // namespace ROCKSDB_NAMESPACE

db/table_cache_sync_and_async.h

+8-2
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,14 @@ DEFINE_SYNC_AND_ASYNC(Status, TableCache::MultiGet)
5050

5151
GetContext* get_context = miter->get_context;
5252

53-
if (GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size,
54-
get_context)) {
53+
Status read_status;
54+
bool ret =
55+
GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size,
56+
get_context, &read_status);
57+
if (!read_status.ok()) {
58+
CO_RETURN read_status;
59+
}
60+
if (ret) {
5561
table_range.SkipKey(miter);
5662
} else {
5763
row_cache_entries.emplace_back();

table/block_based/block_based_table_reader.cc

+13-4
Original file line numberDiff line numberDiff line change
@@ -2339,11 +2339,18 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
23392339
biter.key(), &parsed_key, false /* log_err_key */); // TODO
23402340
if (!pik_status.ok()) {
23412341
s = pik_status;
2342+
break;
23422343
}
23432344

2344-
if (!get_context->SaveValue(
2345-
parsed_key, biter.value(), &matched,
2346-
biter.IsValuePinned() ? &biter : nullptr)) {
2345+
Status read_status;
2346+
bool ret = get_context->SaveValue(
2347+
parsed_key, biter.value(), &matched, &read_status,
2348+
biter.IsValuePinned() ? &biter : nullptr);
2349+
if (!read_status.ok()) {
2350+
s = read_status;
2351+
break;
2352+
}
2353+
if (!ret) {
23472354
if (get_context->State() == GetContext::GetState::kFound) {
23482355
does_referenced_key_exist = true;
23492356
referenced_data_size = biter.key().size() + biter.value().size();
@@ -2352,7 +2359,9 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
23522359
break;
23532360
}
23542361
}
2355-
s = biter.status();
2362+
if (s.ok()) {
2363+
s = biter.status();
2364+
}
23562365
if (!s.ok()) {
23572366
break;
23582367
}

table/block_based/block_based_table_reader_sync_and_async.h

+13-3
Original file line numberDiff line numberDiff line change
@@ -735,9 +735,17 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet)
735735
biter->key(), &parsed_key, false /* log_err_key */); // TODO
736736
if (!pik_status.ok()) {
737737
s = pik_status;
738+
break;
739+
}
740+
Status read_status;
741+
bool ret = get_context->SaveValue(
742+
parsed_key, biter->value(), &matched, &read_status,
743+
value_pinner ? value_pinner : nullptr);
744+
if (!read_status.ok()) {
745+
s = read_status;
746+
break;
738747
}
739-
if (!get_context->SaveValue(parsed_key, biter->value(), &matched,
740-
value_pinner)) {
748+
if (!ret) {
741749
if (get_context->State() == GetContext::GetState::kFound) {
742750
does_referenced_key_exist = true;
743751
referenced_data_size =
@@ -746,7 +754,9 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet)
746754
done = true;
747755
break;
748756
}
749-
s = biter->status();
757+
if (s.ok()) {
758+
s = biter->status();
759+
}
750760
}
751761
// Write the block cache access.
752762
// XXX: There appear to be 'break' statements above that bypass this

table/cuckoo/cuckoo_table_reader.cc

+4-1
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,10 @@ Status CuckooTableReader::Get(const ReadOptions& /*readOptions*/,
186186
return s;
187187
}
188188
bool dont_care __attribute__((__unused__));
189-
get_context->SaveValue(found_ikey, value, &dont_care);
189+
get_context->SaveValue(found_ikey, value, &dont_care, &s);
190+
if (!s.ok()) {
191+
return s;
192+
}
190193
}
191194
// We don't support merge operations. So, we return here.
192195
return Status::OK();

table/get_context.cc

+19-14
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ void GetContext::ReportCounters() {
221221

222222
bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
223223
const Slice& value, bool* matched,
224-
Cleanable* value_pinner) {
224+
Status* read_status, Cleanable* value_pinner) {
225225
assert(matched);
226226
assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
227227
merge_context_ != nullptr);
@@ -356,8 +356,8 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
356356
// merge_context_->operand_list
357357
if (type == kTypeBlobIndex) {
358358
PinnableSlice pin_val;
359-
if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val) ==
360-
false) {
359+
if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val,
360+
read_status) == false) {
361361
return false;
362362
}
363363
Slice blob_value(pin_val);
@@ -383,8 +383,8 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
383383
assert(merge_operator_ != nullptr);
384384
if (type == kTypeBlobIndex) {
385385
PinnableSlice pin_val;
386-
if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val) ==
387-
false) {
386+
if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val,
387+
read_status) == false) {
388388
return false;
389389
}
390390
Slice blob_value(pin_val);
@@ -547,14 +547,14 @@ void GetContext::MergeWithWideColumnBaseValue(const Slice& entity) {
547547
}
548548

549549
bool GetContext::GetBlobValue(const Slice& user_key, const Slice& blob_index,
550-
PinnableSlice* blob_value) {
550+
PinnableSlice* blob_value, Status* read_status) {
551551
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
552552
constexpr uint64_t* bytes_read = nullptr;
553553

554-
Status status = blob_fetcher_->FetchBlob(
555-
user_key, blob_index, prefetch_buffer, blob_value, bytes_read);
556-
if (!status.ok()) {
557-
if (status.IsIncomplete()) {
554+
*read_status = blob_fetcher_->FetchBlob(user_key, blob_index, prefetch_buffer,
555+
blob_value, bytes_read);
556+
if (!read_status->ok()) {
557+
if (read_status->IsIncomplete()) {
558558
// FIXME: this code is not covered by unit tests
559559
MarkKeyMayExist();
560560
return false;
@@ -577,9 +577,9 @@ void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) {
577577
}
578578
}
579579

580-
void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
581-
GetContext* get_context, Cleanable* value_pinner,
582-
SequenceNumber seq_no) {
580+
Status replayGetContextLog(const Slice& replay_log, const Slice& user_key,
581+
GetContext* get_context, Cleanable* value_pinner,
582+
SequenceNumber seq_no) {
583583
Slice s = replay_log;
584584
Slice ts;
585585
size_t ts_sz = get_context->TimestampSize();
@@ -610,8 +610,13 @@ void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
610610

611611
(void)ret;
612612

613-
get_context->SaveValue(ikey, value, &dont_care, value_pinner);
613+
Status read_status;
614+
get_context->SaveValue(ikey, value, &dont_care, &read_status, value_pinner);
615+
if (!read_status.ok()) {
616+
return read_status;
617+
}
614618
}
619+
return Status::OK();
615620
}
616621

617622
} // namespace ROCKSDB_NAMESPACE

table/get_context.h

+7-6
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ class GetContext {
135135
// Returns True if more keys need to be read (due to merges) or
136136
// False if the complete value has been found.
137137
bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value,
138-
bool* matched, Cleanable* value_pinner = nullptr);
138+
bool* matched, Status* read_status,
139+
Cleanable* value_pinner = nullptr);
139140

140141
// Simplified version of the previous function. Should only be used when we
141142
// know that the operation is a Put.
@@ -204,7 +205,7 @@ class GetContext {
204205
void MergeWithWideColumnBaseValue(const Slice& entity);
205206

206207
bool GetBlobValue(const Slice& user_key, const Slice& blob_index,
207-
PinnableSlice* blob_value);
208+
PinnableSlice* blob_value, Status* read_status);
208209

209210
void appendToReplayLog(ValueType type, Slice value, Slice ts);
210211

@@ -250,9 +251,9 @@ class GetContext {
250251
// Call this to replay a log and bring the get_context up to date. The replay
251252
// log must have been created by another GetContext object, whose replay log
252253
// must have been set by calling GetContext::SetReplayLog().
253-
void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
254-
GetContext* get_context,
255-
Cleanable* value_pinner = nullptr,
256-
SequenceNumber seq_no = kMaxSequenceNumber);
254+
Status replayGetContextLog(const Slice& replay_log, const Slice& user_key,
255+
GetContext* get_context,
256+
Cleanable* value_pinner = nullptr,
257+
SequenceNumber seq_no = kMaxSequenceNumber);
257258

258259
} // namespace ROCKSDB_NAMESPACE

table/mock_table.cc

+7-1
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,13 @@ Status MockTableReader::Get(const ReadOptions&, const Slice& key,
220220
}
221221

222222
bool dont_care __attribute__((__unused__));
223-
if (!get_context->SaveValue(parsed_key, iter->value(), &dont_care)) {
223+
Status read_status;
224+
bool ret = get_context->SaveValue(parsed_key, iter->value(), &dont_care,
225+
&read_status);
226+
if (!read_status.ok()) {
227+
return read_status;
228+
}
229+
if (!ret) {
224230
break;
225231
}
226232
}

table/plain/plain_table_reader.cc

+6-2
Original file line numberDiff line numberDiff line change
@@ -614,8 +614,12 @@ Status PlainTableReader::Get(const ReadOptions& /*ro*/, const Slice& target,
614614
// can we enable the fast path?
615615
if (internal_comparator_.Compare(found_key, parsed_target) >= 0) {
616616
bool dont_care __attribute__((__unused__));
617-
if (!get_context->SaveValue(found_key, found_value, &dont_care,
618-
dummy_cleanable_.get())) {
617+
bool ret = get_context->SaveValue(found_key, found_value, &dont_care, &s,
618+
dummy_cleanable_.get());
619+
if (!s.ok()) {
620+
return s;
621+
}
622+
if (!ret) {
619623
break;
620624
}
621625
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
* Fixed `kBlockCacheTier` reads to return `Status::Incomplete` when I/O is needed to fetch a merge chain's base value from a blob file.

0 commit comments

Comments
 (0)