@@ -60,9 +60,11 @@ CuckooTableBuilder::CuckooTableBuilder(
60
60
hash_table_size_(use_module_hash ? 0 : 2 ),
61
61
is_last_level_file_(false ),
62
62
has_seen_first_key_(false ),
63
+ has_seen_first_value_(false ),
63
64
key_size_(0 ),
64
65
value_size_(0 ),
65
66
num_entries_(0 ),
67
+ num_values_(0 ),
66
68
ucomp_(user_comparator),
67
69
use_module_hash_(use_module_hash),
68
70
identity_as_first_hash_(identity_as_first_hash),
@@ -84,6 +86,12 @@ void CuckooTableBuilder::Add(const Slice& key, const Slice& value) {
84
86
status_ = Status::Corruption (" Unable to parse key into inernal key." );
85
87
return ;
86
88
}
89
+ if (ikey.type != kTypeDeletion && ikey.type != kTypeValue ) {
90
+ status_ = Status::NotSupported (" Unsupported key type " +
91
+ std::to_string (ikey.type ));
92
+ return ;
93
+ }
94
+
87
95
// Determine if we can ignore the sequence number and value type from
88
96
// internal keys by looking at sequence number from first key. We assume
89
97
// that if first key has a zero sequence number, then all the remaining
@@ -94,16 +102,38 @@ void CuckooTableBuilder::Add(const Slice& key, const Slice& value) {
94
102
smallest_user_key_.assign (ikey.user_key .data (), ikey.user_key .size ());
95
103
largest_user_key_.assign (ikey.user_key .data (), ikey.user_key .size ());
96
104
key_size_ = is_last_level_file_ ? ikey.user_key .size () : key.size ();
97
- value_size_ = value.size ();
105
+ }
106
+ if (key_size_ != (is_last_level_file_ ? ikey.user_key .size () : key.size ())) {
107
+ status_ = Status::NotSupported (" all keys have to be the same size" );
108
+ return ;
98
109
}
99
110
// Even if one sequence number is non-zero, then it is not last level.
100
111
assert (!is_last_level_file_ || ikey.sequence == 0 );
101
- if (is_last_level_file_) {
102
- kvs_.append (ikey.user_key .data (), ikey.user_key .size ());
112
+
113
+ if (ikey.type == kTypeValue ) {
114
+ if (!has_seen_first_value_) {
115
+ has_seen_first_value_ = true ;
116
+ value_size_ = value.size ();
117
+ }
118
+ if (value_size_ != value.size ()) {
119
+ status_ = Status::NotSupported (" all values have to be the same size" );
120
+ return ;
121
+ }
122
+
123
+ if (is_last_level_file_) {
124
+ kvs_.append (ikey.user_key .data (), ikey.user_key .size ());
125
+ } else {
126
+ kvs_.append (key.data (), key.size ());
127
+ }
128
+ kvs_.append (value.data (), value.size ());
129
+ ++num_values_;
103
130
} else {
104
- kvs_.append (key.data (), key.size ());
131
+ if (is_last_level_file_) {
132
+ deleted_keys_.append (ikey.user_key .data (), ikey.user_key .size ());
133
+ } else {
134
+ deleted_keys_.append (key.data (), key.size ());
135
+ }
105
136
}
106
- kvs_.append (value.data (), value.size ());
107
137
++num_entries_;
108
138
109
139
// In order to fill the empty buckets in the hash table, we identify a
@@ -123,15 +153,30 @@ void CuckooTableBuilder::Add(const Slice& key, const Slice& value) {
123
153
}
124
154
}
125
155
156
+ bool CuckooTableBuilder::IsDeletedKey (uint64_t idx) const {
157
+ assert (closed_);
158
+ return idx >= num_values_;
159
+ }
160
+
126
161
Slice CuckooTableBuilder::GetKey (uint64_t idx) const {
162
+ assert (closed_);
163
+ if (IsDeletedKey (idx)) {
164
+ return Slice (&deleted_keys_[(idx - num_values_) * key_size_], key_size_);
165
+ }
127
166
return Slice (&kvs_[idx * (key_size_ + value_size_)], key_size_);
128
167
}
129
168
130
169
Slice CuckooTableBuilder::GetUserKey (uint64_t idx) const {
170
+ assert (closed_);
131
171
return is_last_level_file_ ? GetKey (idx) : ExtractUserKey (GetKey (idx));
132
172
}
133
173
134
174
Slice CuckooTableBuilder::GetValue (uint64_t idx) const {
175
+ assert (closed_);
176
+ if (IsDeletedKey (idx)) {
177
+ static std::string empty_value (value_size_, ' a' );
178
+ return Slice (empty_value);
179
+ }
135
180
return Slice (&kvs_[idx * (key_size_ + value_size_) + key_size_], value_size_);
136
181
}
137
182
@@ -256,7 +301,9 @@ Status CuckooTableBuilder::Finish() {
256
301
++num_added;
257
302
s = file_->Append (GetKey (bucket.vector_idx ));
258
303
if (s.ok ()) {
259
- s = file_->Append (GetValue (bucket.vector_idx ));
304
+ if (value_size_ > 0 ) {
305
+ s = file_->Append (GetValue (bucket.vector_idx ));
306
+ }
260
307
}
261
308
}
262
309
if (!s.ok ()) {
0 commit comments