Skip to content

Commit 2d5c6d1

Browse files
authored
Merge pull request facebook#17 from cockroachdb/rdfix
Backport facebook#4432
2 parents fa3a044 + 6f11959 commit 2d5c6d1

16 files changed

+998
-309
lines changed

db/compaction.cc

+81-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,43 @@
2323

2424
namespace rocksdb {
2525

26+
// Sentinel footer (kMaxSequenceNumber + kTypeRangeDeletion) used when an
// sstable's largest key was artificially extended by a range tombstone; such
// a key never appears in the database itself.
const uint64_t kRangeTombstoneSentinel =
    PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);

// Compares two sstable boundary keys primarily on their user-key portion.
// When the user keys compare equal, a range tombstone sentinel key sorts
// before (compares less than) any non-sentinel key with the same user key,
// and equal to another sentinel. Returns <0, 0, or >0 in the usual
// comparator convention.
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
                      const InternalKey& b) {
  auto c = user_cmp->Compare(a.user_key(), b.user_key());
  if (c != 0) {
    return c;
  }
  // User keys are equal: break the tie on the internal-key footer so that a
  // range tombstone sentinel is not considered equal to a real key.
  auto a_footer = ExtractInternalKeyFooter(a.Encode());
  auto b_footer = ExtractInternalKeyFooter(b.Encode());
  if (a_footer == kRangeTombstoneSentinel) {
    if (b_footer != kRangeTombstoneSentinel) {
      return -1;
    }
  } else if (b_footer == kRangeTombstoneSentinel) {
    return 1;
  }
  return 0;
}
46+
47+
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey* a,
48+
const InternalKey& b) {
49+
if (a == nullptr) {
50+
return -1;
51+
}
52+
return sstableKeyCompare(user_cmp, *a, b);
53+
}
54+
55+
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
56+
const InternalKey* b) {
57+
if (b == nullptr) {
58+
return -1;
59+
}
60+
return sstableKeyCompare(user_cmp, a, *b);
61+
}
62+
2663
uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
2764
uint64_t sum = 0;
2865
for (size_t i = 0; i < files.size() && files[i]; i++) {
@@ -81,6 +118,49 @@ void Compaction::GetBoundaryKeys(
81118
}
82119
}
83120

121+
// For every non-L0 input level, computes the atomic compaction unit boundary
// each file belongs to: consecutive files whose boundary keys "overlap"
// (the previous file's largest key equals, per sstableKeyCompare, the next
// file's smallest key) are grouped into one unit, and every file in the unit
// is assigned the same [smallest, largest] boundary pair. The returned
// inputs carry atomic_compaction_unit_boundaries parallel to files.
std::vector<CompactionInputFiles> Compaction::PopulateWithAtomicBoundaries(
    VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs) {
  const Comparator* ucmp = vstorage->InternalComparator()->user_comparator();
  for (size_t i = 0; i < inputs.size(); i++) {
    if (inputs[i].level == 0 || inputs[i].files.empty()) {
      // L0 files may overlap arbitrarily; no unit boundaries are computed.
      continue;
    }
    inputs[i].atomic_compaction_unit_boundaries.reserve(inputs[i].files.size());
    AtomicCompactionUnitBoundary cur_boundary;
    size_t first_atomic_idx = 0;
    // Assigns cur_boundary to every file in [first_atomic_idx, to), then
    // advances first_atomic_idx. No-op when the range is empty.
    auto add_unit_boundary = [&](size_t to) {
      if (first_atomic_idx == to) return;
      for (size_t k = first_atomic_idx; k < to; k++) {
        inputs[i].atomic_compaction_unit_boundaries.push_back(cur_boundary);
      }
      first_atomic_idx = to;
    };
    for (size_t j = 0; j < inputs[i].files.size(); j++) {
      const auto* f = inputs[i].files[j];
      if (j == 0) {
        // First file in a level.
        cur_boundary.smallest = &f->smallest;
        cur_boundary.largest = &f->largest;
      } else if (sstableKeyCompare(ucmp, *cur_boundary.largest, f->smallest) ==
                 0) {
        // SSTs overlap but the end key of the previous file was not
        // artificially extended by a range tombstone. Extend the current
        // boundary.
        cur_boundary.largest = &f->largest;
      } else {
        // Atomic compaction unit has ended.
        add_unit_boundary(j);
        cur_boundary.smallest = &f->smallest;
        cur_boundary.largest = &f->largest;
      }
    }
    // Flush the trailing unit covering the last run of files.
    add_unit_boundary(inputs[i].files.size());
    assert(inputs[i].files.size() ==
           inputs[i].atomic_compaction_unit_boundaries.size());
  }
  return inputs;
}
163+
84164
// helper function to determine if compaction is creating files at the
85165
// bottommost level
86166
bool Compaction::IsBottommostLevel(
@@ -151,7 +231,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
151231
output_path_id_(_output_path_id),
152232
output_compression_(_compression),
153233
deletion_compaction_(_deletion_compaction),
154-
inputs_(std::move(_inputs)),
234+
inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))),
155235
grandparents_(std::move(_grandparents)),
156236
score_(_score),
157237
bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),

db/compaction.h

+45
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,43 @@
1515

1616
namespace rocksdb {
1717

18+
// Utility for comparing sstable boundary keys. Returns -1 if either a or b is
19+
// null which provides the property that a==null indicates a key that is less
20+
// than any key and b==null indicates a key that is greater than any key. Note
21+
// that the comparison is performed primarily on the user-key portion of the
22+
// key. If the user-keys compare equal, an additional test is made to sort
23+
// range tombstone sentinel keys before other keys with the same user-key. The
24+
// result is that 2 user-keys will compare equal if they differ purely on
25+
// their sequence number and value, but the range tombstone sentinel for that
26+
// user-key will compare not equal. This is necessary because the range
27+
// tombstone sentinel key is set as the largest key for an sstable even though
28+
// that key never appears in the database. We don't want adjacent sstables to
29+
// be considered overlapping if they are separated by the range tombstone
30+
// sentinel.
31+
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
32+
const InternalKey& b);
33+
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey* a,
34+
const InternalKey& b);
35+
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
36+
const InternalKey* b);
37+
38+
// An AtomicCompactionUnitBoundary represents a range of keys [smallest,
39+
// largest] that exactly spans one ore more neighbouring SSTs on the same
40+
// level. Every pair of SSTs in this range "overlap" (i.e., the largest
41+
// user key of one file is the smallest user key of the next file). These
42+
// boundaries are propagated down to RangeDelAggregator during compaction
43+
// to provide safe truncation boundaries for range tombstones.
44+
struct AtomicCompactionUnitBoundary {
45+
const InternalKey* smallest = nullptr;
46+
const InternalKey* largest = nullptr;
47+
};
48+
1849
// The structure that manages compaction input files associated
1950
// with the same physical level.
2051
struct CompactionInputFiles {
2152
int level;
2253
std::vector<FileMetaData*> files;
54+
std::vector<AtomicCompactionUnitBoundary> atomic_compaction_unit_boundaries;
2355
inline bool empty() const { return files.empty(); }
2456
inline size_t size() const { return files.size(); }
2557
inline void clear() { files.clear(); }
@@ -95,6 +127,12 @@ class Compaction {
95127
return inputs_[compaction_input_level][i];
96128
}
97129

130+
  // Returns the atomic compaction unit boundaries for the files at the given
  // compaction input level. The returned vector is parallel to that level's
  // file vector (empty for L0, where no boundaries are computed).
  const std::vector<AtomicCompactionUnitBoundary>* boundaries(
      size_t compaction_input_level) const {
    assert(compaction_input_level < inputs_.size());
    return &inputs_[compaction_input_level].atomic_compaction_unit_boundaries;
  }
135+
98136
// Returns the list of file meta data of the specified compaction
99137
// input level.
100138
// REQUIREMENT: "compaction_input_level" must be >= 0 and
@@ -252,6 +290,13 @@ class Compaction {
252290
const std::vector<CompactionInputFiles>& inputs,
253291
Slice* smallest_key, Slice* largest_key);
254292

293+
// Get the atomic file boundaries for all files in the compaction. Necessary
294+
// in order to avoid the scenario described in
295+
// https://github.com/facebook/rocksdb/pull/4432#discussion_r221072219 and plumb
296+
// down appropriate key boundaries to RangeDelAggregator during compaction.
297+
static std::vector<CompactionInputFiles> PopulateWithAtomicBoundaries(
298+
VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs);
299+
255300
// helper function to determine if compaction with inputs and storage is
256301
// bottommost
257302
static bool IsBottommostLevel(

db/compaction_job.cc

+5-4
Original file line numberDiff line numberDiff line change
@@ -1092,10 +1092,11 @@ Status CompactionJob::FinishCompactionOutputFile(
10921092
for (; it->Valid(); it->Next()) {
10931093
auto tombstone = it->Tombstone();
10941094
if (upper_bound != nullptr &&
1095-
ucmp->Compare(*upper_bound, tombstone.start_key_) <= 0) {
1096-
// Tombstones starting at upper_bound or later only need to be included
1097-
// in the next table. Break because subsequent tombstones will start
1098-
// even later.
1095+
ucmp->Compare(*upper_bound, tombstone.start_key_) < 0) {
1096+
// Tombstones starting after upper_bound only need to be included in the
1097+
// next table (if the SSTs overlap, then upper_bound is contained in
1098+
// this SST and hence must be covered). Break because subsequent
1099+
// tombstones will start even later.
10991100
break;
11001101
}
11011102

db/dbformat.h

+9-7
Original file line numberDiff line numberDiff line change
@@ -635,13 +635,15 @@ class PartialRangeTombstone {
635635
PartialRangeTombstone()
636636
: start_key_valid_(false), end_key_valid_(false), seq_(0) {}
637637

638-
PartialRangeTombstone(const Slice* sk, const Slice* ek, SequenceNumber sq)
638+
PartialRangeTombstone(const ParsedInternalKey* sk,
639+
const ParsedInternalKey* ek,
640+
SequenceNumber sq)
639641
: seq_(sq) {
640642
SetStartKey(sk);
641643
SetEndKey(ek);
642644
}
643645

644-
void SetStartKey(const Slice* sk) {
646+
void SetStartKey(const ParsedInternalKey* sk) {
645647
if (sk != nullptr) {
646648
start_key_ = *sk;
647649
start_key_valid_ = true;
@@ -650,7 +652,7 @@ class PartialRangeTombstone {
650652
}
651653
}
652654

653-
void SetEndKey(const Slice* ek) {
655+
void SetEndKey(const ParsedInternalKey* ek) {
654656
if (ek != nullptr) {
655657
end_key_ = *ek;
656658
end_key_valid_ = true;
@@ -659,15 +661,15 @@ class PartialRangeTombstone {
659661
}
660662
}
661663

662-
const Slice* start_key() const {
664+
const ParsedInternalKey* start_key() const {
663665
return start_key_valid_ ? &start_key_ : nullptr;
664666
}
665-
const Slice* end_key() const { return end_key_valid_ ? &end_key_ : nullptr; }
667+
const ParsedInternalKey* end_key() const { return end_key_valid_ ? &end_key_ : nullptr; }
666668
SequenceNumber seq() const { return seq_; }
667669

668670
private:
669-
Slice start_key_;
670-
Slice end_key_;
671+
ParsedInternalKey start_key_;
672+
ParsedInternalKey end_key_;
671673
bool start_key_valid_;
672674
bool end_key_valid_;
673675
SequenceNumber seq_;

0 commit comments

Comments
 (0)