Skip to content

Commit cda4006

Browse files
committed
Enhance partial merge to support multiple arguments
Summary: * PartialMerge api now takes a list of operands instead of two operands. * Add min_pertial_merge_operands to Options, indicating the minimum number of operands to trigger partial merge. * This diff is based on Schalk's previous diff (D14601), but it also includes necessary changes such as updating the pure C api for partial merge. Test Plan: * make check all * develop tests for cases where partial merge takes more than two operands. TODOs (from Schalk): * Add test with min_partial_merge_operands > 2. * Perform benchmarks to measure the performance improvements (can probably use results of task #2837810.) * Add description of problem to doc/index.html. * Change wiki pages to reflect the interface changes. Reviewers: haobo, igor, vamsi Reviewed By: haobo CC: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D16815
1 parent e6d4b00 commit cda4006

20 files changed

+297
-168
lines changed

db/builder.cc

+1
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
7373

7474
MergeHelper merge(internal_comparator.user_comparator(),
7575
options.merge_operator.get(), options.info_log.get(),
76+
options.min_partial_merge_operands,
7677
true /* internal key corruption is not ok */);
7778

7879
if (purge) {

db/c.cc

+29-35
Original file line numberDiff line numberDiff line change
@@ -159,12 +159,10 @@ struct rocksdb_mergeoperator_t : public MergeOperator {
159159
const char* const* operands_list, const size_t* operands_list_length,
160160
int num_operands,
161161
unsigned char* success, size_t* new_value_length);
162-
char* (*partial_merge_)(
163-
void*,
164-
const char* key, size_t key_length,
165-
const char* left_operand, size_t left_operand_length,
166-
const char* right_operand, size_t right_operand_length,
167-
unsigned char* success, size_t* new_value_length);
162+
char* (*partial_merge_)(void*, const char* key, size_t key_length,
163+
const char* const* operands_list,
164+
const size_t* operands_list_length, int num_operands,
165+
unsigned char* success, size_t* new_value_length);
168166
void (*delete_value_)(
169167
void*,
170168
const char* value, size_t value_length);
@@ -219,21 +217,23 @@ struct rocksdb_mergeoperator_t : public MergeOperator {
219217
return success;
220218
}
221219

222-
virtual bool PartialMerge(
223-
const Slice& key,
224-
const Slice& left_operand,
225-
const Slice& right_operand,
226-
std::string* new_value,
227-
Logger* logger) const {
220+
virtual bool PartialMergeMulti(const Slice& key,
221+
const std::deque<Slice>& operand_list,
222+
std::string* new_value, Logger* logger) const {
223+
size_t operand_count = operand_list.size();
224+
std::vector<const char*> operand_pointers(operand_count);
225+
std::vector<size_t> operand_sizes(operand_count);
226+
for (size_t i = 0; i < operand_count; ++i) {
227+
Slice operand(operand_list[i]);
228+
operand_pointers[i] = operand.data();
229+
operand_sizes[i] = operand.size();
230+
}
228231

229232
unsigned char success;
230233
size_t new_value_len;
231234
char* tmp_new_value = (*partial_merge_)(
232-
state_,
233-
key.data(), key.size(),
234-
left_operand.data(), left_operand.size(),
235-
right_operand.data(), right_operand.size(),
236-
&success, &new_value_len);
235+
state_, key.data(), key.size(), &operand_pointers[0], &operand_sizes[0],
236+
operand_count, &success, &new_value_len);
237237
new_value->assign(tmp_new_value, new_value_len);
238238

239239
if (delete_value_ != nullptr) {
@@ -1094,24 +1094,18 @@ rocksdb_filterpolicy_t* rocksdb_filterpolicy_create_bloom(int bits_per_key) {
10941094
}
10951095

10961096
rocksdb_mergeoperator_t* rocksdb_mergeoperator_create(
1097-
void* state,
1098-
void (*destructor)(void*),
1099-
char* (*full_merge)(
1100-
void*,
1101-
const char* key, size_t key_length,
1102-
const char* existing_value, size_t existing_value_length,
1103-
const char* const* operands_list, const size_t* operands_list_length,
1104-
int num_operands,
1105-
unsigned char* success, size_t* new_value_length),
1106-
char* (*partial_merge)(
1107-
void*,
1108-
const char* key, size_t key_length,
1109-
const char* left_operand, size_t left_operand_length,
1110-
const char* right_operand, size_t right_operand_length,
1111-
unsigned char* success, size_t* new_value_length),
1112-
void (*delete_value)(
1113-
void*,
1114-
const char* value, size_t value_length),
1097+
void* state, void (*destructor)(void*),
1098+
char* (*full_merge)(void*, const char* key, size_t key_length,
1099+
const char* existing_value,
1100+
size_t existing_value_length,
1101+
const char* const* operands_list,
1102+
const size_t* operands_list_length, int num_operands,
1103+
unsigned char* success, size_t* new_value_length),
1104+
char* (*partial_merge)(void*, const char* key, size_t key_length,
1105+
const char* const* operands_list,
1106+
const size_t* operands_list_length, int num_operands,
1107+
unsigned char* success, size_t* new_value_length),
1108+
void (*delete_value)(void*, const char* value, size_t value_length),
11151109
const char* (*name)(void*)) {
11161110
rocksdb_mergeoperator_t* result = new rocksdb_mergeoperator_t;
11171111
result->state_ = state;

db/c_test.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,8 @@ static char* MergeOperatorFullMerge(
175175
static char* MergeOperatorPartialMerge(
176176
void* arg,
177177
const char* key, size_t key_length,
178-
const char* left_operand, size_t left_operand_length,
179-
const char* right_operand, size_t right_operand_length,
178+
const char* const* operands_list, const size_t* operands_list_length,
179+
int num_operands,
180180
unsigned char* success, size_t* new_value_length) {
181181
*new_value_length = 4;
182182
*success = 1;

db/db_impl.cc

+1
Original file line numberDiff line numberDiff line change
@@ -2463,6 +2463,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
24632463
std::vector<char> delete_key; // for compaction filter
24642464
MergeHelper merge(user_comparator(), options_.merge_operator.get(),
24652465
options_.info_log.get(),
2466+
options_.min_partial_merge_operands,
24662467
false /* internal key corruption is expected */);
24672468
auto compaction_filter = options_.compaction_filter;
24682469
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;

db/db_iter.cc

-15
Original file line numberDiff line numberDiff line change
@@ -313,21 +313,6 @@ void DBIter::MergeValuesNewToOld() {
313313
// when complete, add result to operands and continue.
314314
const Slice& value = iter_->value();
315315
operands.push_front(value.ToString());
316-
while(operands.size() >= 2) {
317-
// Call user associative-merge until it returns false
318-
if (user_merge_operator_->PartialMerge(ikey.user_key,
319-
Slice(operands[0]),
320-
Slice(operands[1]),
321-
&merge_result,
322-
logger_)) {
323-
operands.pop_front();
324-
swap(operands.front(), merge_result);
325-
} else {
326-
// Associative merge returns false ==> stack the operands
327-
break;
328-
}
329-
}
330-
331316
}
332317
}
333318

db/memtable.cc

+1-12
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
3636
kWriteBufferSize(options.write_buffer_size),
3737
arena_(options.arena_block_size),
3838
table_(options.memtable_factory->CreateMemTableRep(
39-
comparator_, &arena_, options.prefix_extractor.get())),
39+
comparator_, &arena_, options.prefix_extractor.get())),
4040
flush_in_progress_(false),
4141
flush_completed_(false),
4242
file_number_(0),
@@ -353,17 +353,6 @@ static bool SaveValue(void* arg, const char* entry) {
353353
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
354354
*(s->merge_in_progress) = true;
355355
merge_context->PushOperand(v);
356-
while (merge_context->GetNumOperands() >= 2) {
357-
// Attempt to associative merge. (Returns true if successful)
358-
if (merge_operator->PartialMerge(
359-
s->key->user_key(), merge_context->GetOperand(0),
360-
merge_context->GetOperand(1), &merge_result, s->logger)) {
361-
merge_context->PushPartialMergeResult(merge_result);
362-
} else {
363-
// Stack them because user can't associative merge
364-
break;
365-
}
366-
}
367356
return true;
368357
}
369358
default:

db/merge_context.h

+4-4
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ class MergeContext {
2525
operand_list->clear();
2626
}
2727
}
28-
// Replace the first two operands of merge_result, which are expected be the
29-
// merge results of them.
28+
// Replace all operands with merge_result, which are expected to be the
29+
// merge result of them.
3030
void PushPartialMergeResult(std::string& merge_result) {
3131
assert (operand_list);
32-
operand_list->pop_front();
33-
swap(operand_list->front(), merge_result);
32+
operand_list->clear();
33+
operand_list->push_front(std::move(merge_result));
3434
}
3535
// Push a merge operand
3636
void PushOperand(const Slice& operand_slice) {

db/merge_helper.cc

+19-20
Original file line numberDiff line numberDiff line change
@@ -129,28 +129,10 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
129129
// => then continue because we haven't yet seen a Put/Delete.
130130
assert(!operands_.empty()); // Should have at least one element in it
131131

132+
// keep queuing keys and operands until we either meet a put / delete
133+
// request or later did a partial merge.
132134
keys_.push_front(iter->key().ToString());
133135
operands_.push_front(iter->value().ToString());
134-
while (operands_.size() >= 2) {
135-
// Returns false when the merge_operator can no longer process it
136-
if (user_merge_operator_->PartialMerge(ikey.user_key,
137-
Slice(operands_[0]),
138-
Slice(operands_[1]),
139-
&merge_result,
140-
logger_)) {
141-
// Merging of operands (associative merge) was successful.
142-
// Replace these frontmost two operands with the merge result
143-
keys_.pop_front();
144-
operands_.pop_front();
145-
swap(operands_.front(), merge_result);
146-
} else {
147-
// Merging of operands (associative merge) returned false.
148-
// The user merge_operator does not know how to merge these operands.
149-
// So we just stack them up until we find a Put/Delete or end of key.
150-
break;
151-
}
152-
}
153-
continue;
154136
}
155137
}
156138

@@ -192,6 +174,23 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
192174
RecordTick(stats, NUMBER_MERGE_FAILURES);
193175
// Do nothing if not success_. Leave keys() and operands() as they are.
194176
}
177+
} else {
178+
// We haven't seen the beginning of the key nor a Put/Delete.
179+
// Attempt to use the user's associative merge function to
180+
// merge the stacked merge operands into a single operand.
181+
182+
if (operands_.size() >= 2 &&
183+
operands_.size() >= min_partial_merge_operands_ &&
184+
user_merge_operator_->PartialMergeMulti(
185+
ikey.user_key,
186+
std::deque<Slice>(operands_.begin(), operands_.end()),
187+
&merge_result, logger_)) {
188+
// Merging of operands (associative merge) was successful.
189+
// Replace operands with the merge result
190+
operands_.clear();
191+
operands_.push_front(std::move(merge_result));
192+
keys_.erase(keys_.begin(), keys_.end() - 1);
193+
}
195194
}
196195
}
197196

db/merge_helper.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@ class Statistics;
2222
class MergeHelper {
2323
public:
2424
MergeHelper(const Comparator* user_comparator,
25-
const MergeOperator* user_merge_operator,
26-
Logger* logger,
25+
const MergeOperator* user_merge_operator, Logger* logger,
26+
unsigned min_partial_merge_operands,
2727
bool assert_valid_internal_key)
2828
: user_comparator_(user_comparator),
2929
user_merge_operator_(user_merge_operator),
3030
logger_(logger),
31+
min_partial_merge_operands_(min_partial_merge_operands),
3132
assert_valid_internal_key_(assert_valid_internal_key),
3233
keys_(),
3334
operands_(),
@@ -88,6 +89,7 @@ class MergeHelper {
8889
const Comparator* user_comparator_;
8990
const MergeOperator* user_merge_operator_;
9091
Logger* logger_;
92+
unsigned min_partial_merge_operands_;
9193
bool assert_valid_internal_key_; // enforce no internal key corruption?
9294

9395
// the scratch area that holds the result of MergeUntil

db/merge_operator.cc

+22-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,28 @@
1111

1212
namespace rocksdb {
1313

14+
// The default implementation of PartialMergeMulti, which invokes
15+
// PartialMerge multiple times internally and merges two operands at
16+
// a time.
17+
bool MergeOperator::PartialMergeMulti(const Slice& key,
18+
const std::deque<Slice>& operand_list,
19+
std::string* new_value,
20+
Logger* logger) const {
21+
// Simply loop through the operands
22+
std::string temp_value;
23+
Slice temp_slice;
24+
for (const auto& operand : operand_list) {
25+
if (!PartialMerge(key, temp_slice, operand, &temp_value, logger)) {
26+
return false;
27+
}
28+
swap(temp_value, *new_value);
29+
temp_slice = Slice(*new_value);
30+
}
31+
32+
// The result will be in *new_value. All merges succeeded.
33+
return true;
34+
}
35+
1436
// Given a "real" merge from the library, call the user's
1537
// associative merge function one-by-one on each of the operands.
1638
// NOTE: It is assumed that the client's merge-operator will handle any errors.
@@ -46,7 +68,6 @@ bool AssociativeMergeOperator::PartialMerge(
4668
const Slice& right_operand,
4769
std::string* new_value,
4870
Logger* logger) const {
49-
5071
return Merge(key, &left_operand, right_operand, new_value, logger);
5172
}
5273

0 commit comments

Comments
 (0)