Skip to content

Commit 8e634d3

Browse files
committed
Merge pull request XRPLF#74 from alberts/lz4
Support for LZ4 compression.
2 parents 5833f47 + df2f922 commit 8e634d3

11 files changed

+345
-42
lines changed

build_tools/build_detect_platform

+13-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
#
2020
# -DLEVELDB_PLATFORM_POSIX if cstdatomic is present
2121
# -DLEVELDB_PLATFORM_NOATOMIC if it is not
22-
# -DSNAPPY if the Snappy library is present
22+
# -DSNAPPY if the Snappy library is present
23+
# -DLZ4 if the LZ4 library is present
2324
#
2425
# Using gflags in rocksdb:
2526
# Our project depends on gflags, which requires users to take some extra steps
@@ -244,6 +245,17 @@ EOF
244245
PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -lbz2"
245246
fi
246247

248+
# Test whether lz4 library is installed
249+
$CXX $CFLAGS $COMMON_FLAGS -x c++ - -o /dev/null 2>/dev/null <<EOF
250+
#include <lz4.h>
251+
#include <lz4hc.h>
252+
int main() {}
253+
EOF
254+
if [ "$?" = 0 ]; then
255+
COMMON_FLAGS="$COMMON_FLAGS -DLZ4"
256+
PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -llz4"
257+
fi
258+
247259
# Test whether tcmalloc is available
248260
$CXX $CFLAGS -x c++ - -o /dev/null -ltcmalloc 2>/dev/null <<EOF
249261
int main() {}

db/db_bench.cc

+118-20
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ DEFINE_string(benchmarks,
6060
"randomwithverify,"
6161
"fill100K,"
6262
"crc32c,"
63-
"snappycomp,"
64-
"snappyuncomp,"
63+
"compress,"
64+
"uncompress,"
6565
"acquireload,"
6666
"fillfromstdin,",
6767

@@ -338,6 +338,10 @@ enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
338338
return rocksdb::kZlibCompression;
339339
else if (!strcasecmp(ctype, "bzip2"))
340340
return rocksdb::kBZip2Compression;
341+
else if (!strcasecmp(ctype, "lz4"))
342+
return rocksdb::kLZ4Compression;
343+
else if (!strcasecmp(ctype, "lz4hc"))
344+
return rocksdb::kLZ4HCCompression;
341345

342346
fprintf(stdout, "Cannot parse compression type '%s'\n", ctype);
343347
return rocksdb::kSnappyCompression; //default value
@@ -841,7 +845,13 @@ class Benchmark {
841845
case rocksdb::kBZip2Compression:
842846
fprintf(stdout, "Compression: bzip2\n");
843847
break;
844-
}
848+
case rocksdb::kLZ4Compression:
849+
fprintf(stdout, "Compression: lz4\n");
850+
break;
851+
case rocksdb::kLZ4HCCompression:
852+
fprintf(stdout, "Compression: lz4hc\n");
853+
break;
854+
}
845855

846856
switch (FLAGS_rep_factory) {
847857
case kPrefixHash:
@@ -896,6 +906,16 @@ class Benchmark {
896906
strlen(text), &compressed);
897907
name = "BZip2";
898908
break;
909+
case kLZ4Compression:
910+
result = port::LZ4_Compress(Options().compression_opts, text,
911+
strlen(text), &compressed);
912+
name = "LZ4";
913+
break;
914+
case kLZ4HCCompression:
915+
result = port::LZ4HC_Compress(Options().compression_opts, text,
916+
strlen(text), &compressed);
917+
name = "LZ4HC";
918+
break;
899919
case kNoCompression:
900920
assert(false); // cannot happen
901921
break;
@@ -1146,10 +1166,10 @@ class Benchmark {
11461166
method = &Benchmark::Crc32c;
11471167
} else if (name == Slice("acquireload")) {
11481168
method = &Benchmark::AcquireLoad;
1149-
} else if (name == Slice("snappycomp")) {
1150-
method = &Benchmark::SnappyCompress;
1151-
} else if (name == Slice("snappyuncomp")) {
1152-
method = &Benchmark::SnappyUncompress;
1169+
} else if (name == Slice("compress")) {
1170+
method = &Benchmark::Compress;
1171+
} else if (name == Slice("uncompress")) {
1172+
method = &Benchmark::Uncompress;
11531173
} else if (name == Slice("heapprofile")) {
11541174
HeapProfile();
11551175
} else if (name == Slice("stats")) {
@@ -1302,23 +1322,47 @@ class Benchmark {
13021322
if (ptr == nullptr) exit(1); // Disable unused variable warning.
13031323
}
13041324

1305-
void SnappyCompress(ThreadState* thread) {
1325+
void Compress(ThreadState *thread) {
13061326
RandomGenerator gen;
13071327
Slice input = gen.Generate(Options().block_size);
13081328
int64_t bytes = 0;
13091329
int64_t produced = 0;
13101330
bool ok = true;
13111331
std::string compressed;
1312-
while (ok && bytes < 1024 * 1048576) { // Compress 1G
1313-
ok = port::Snappy_Compress(Options().compression_opts, input.data(),
1332+
1333+
// Compress 1G
1334+
while (ok && bytes < int64_t(1) << 30) {
1335+
switch (FLAGS_compression_type_e) {
1336+
case rocksdb::kSnappyCompression:
1337+
ok = port::Snappy_Compress(Options().compression_opts, input.data(),
1338+
input.size(), &compressed);
1339+
break;
1340+
case rocksdb::kZlibCompression:
1341+
ok = port::Zlib_Compress(Options().compression_opts, input.data(),
13141342
input.size(), &compressed);
1343+
break;
1344+
case rocksdb::kBZip2Compression:
1345+
ok = port::BZip2_Compress(Options().compression_opts, input.data(),
1346+
input.size(), &compressed);
1347+
break;
1348+
case rocksdb::kLZ4Compression:
1349+
ok = port::LZ4_Compress(Options().compression_opts, input.data(),
1350+
input.size(), &compressed);
1351+
break;
1352+
case rocksdb::kLZ4HCCompression:
1353+
ok = port::LZ4HC_Compress(Options().compression_opts, input.data(),
1354+
input.size(), &compressed);
1355+
break;
1356+
default:
1357+
ok = false;
1358+
}
13151359
produced += compressed.size();
13161360
bytes += input.size();
13171361
thread->stats.FinishedSingleOp(nullptr);
13181362
}
13191363

13201364
if (!ok) {
1321-
thread->stats.AddMessage("(snappy failure)");
1365+
thread->stats.AddMessage("(compression failure)");
13221366
} else {
13231367
char buf[100];
13241368
snprintf(buf, sizeof(buf), "(output: %.1f%%)",
@@ -1328,24 +1372,78 @@ class Benchmark {
13281372
}
13291373
}
13301374

1331-
void SnappyUncompress(ThreadState* thread) {
1375+
void Uncompress(ThreadState *thread) {
13321376
RandomGenerator gen;
13331377
Slice input = gen.Generate(Options().block_size);
13341378
std::string compressed;
1335-
bool ok = port::Snappy_Compress(Options().compression_opts, input.data(),
1336-
input.size(), &compressed);
1379+
1380+
bool ok;
1381+
switch (FLAGS_compression_type_e) {
1382+
case rocksdb::kSnappyCompression:
1383+
ok = port::Snappy_Compress(Options().compression_opts, input.data(),
1384+
input.size(), &compressed);
1385+
break;
1386+
case rocksdb::kZlibCompression:
1387+
ok = port::Zlib_Compress(Options().compression_opts, input.data(),
1388+
input.size(), &compressed);
1389+
break;
1390+
case rocksdb::kBZip2Compression:
1391+
ok = port::BZip2_Compress(Options().compression_opts, input.data(),
1392+
input.size(), &compressed);
1393+
break;
1394+
case rocksdb::kLZ4Compression:
1395+
ok = port::LZ4_Compress(Options().compression_opts, input.data(),
1396+
input.size(), &compressed);
1397+
break;
1398+
case rocksdb::kLZ4HCCompression:
1399+
ok = port::LZ4HC_Compress(Options().compression_opts, input.data(),
1400+
input.size(), &compressed);
1401+
break;
1402+
default:
1403+
ok = false;
1404+
}
1405+
13371406
int64_t bytes = 0;
1338-
char* uncompressed = new char[input.size()];
1339-
while (ok && bytes < 1024 * 1048576) { // Compress 1G
1340-
ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
1341-
uncompressed);
1407+
int decompress_size;
1408+
while (ok && bytes < 1024 * 1048576) {
1409+
char *uncompressed = nullptr;
1410+
switch (FLAGS_compression_type_e) {
1411+
case rocksdb::kSnappyCompression:
1412+
// allocate here to make comparison fair
1413+
uncompressed = new char[input.size()];
1414+
ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
1415+
uncompressed);
1416+
break;
1417+
case rocksdb::kZlibCompression:
1418+
uncompressed = port::Zlib_Uncompress(
1419+
compressed.data(), compressed.size(), &decompress_size);
1420+
ok = uncompressed != nullptr;
1421+
break;
1422+
case rocksdb::kBZip2Compression:
1423+
uncompressed = port::BZip2_Uncompress(
1424+
compressed.data(), compressed.size(), &decompress_size);
1425+
ok = uncompressed != nullptr;
1426+
break;
1427+
case rocksdb::kLZ4Compression:
1428+
uncompressed = port::LZ4_Uncompress(
1429+
compressed.data(), compressed.size(), &decompress_size);
1430+
ok = uncompressed != nullptr;
1431+
break;
1432+
case rocksdb::kLZ4HCCompression:
1433+
uncompressed = port::LZ4_Uncompress(
1434+
compressed.data(), compressed.size(), &decompress_size);
1435+
ok = uncompressed != nullptr;
1436+
break;
1437+
default:
1438+
ok = false;
1439+
}
1440+
delete[] uncompressed;
13421441
bytes += input.size();
13431442
thread->stats.FinishedSingleOp(nullptr);
13441443
}
1345-
delete[] uncompressed;
13461444

13471445
if (!ok) {
1348-
thread->stats.AddMessage("(snappy failure)");
1446+
thread->stats.AddMessage("(compression failure)");
13491447
} else {
13501448
thread->stats.AddBytes(bytes);
13511449
}

db/db_test.cc

+21-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,19 @@ static bool BZip2CompressionSupported(const CompressionOptions& options) {
5656
return port::BZip2_Compress(options, in.data(), in.size(), &out);
5757
}
5858

59-
static std::string RandomString(Random* rnd, int len) {
59+
static bool LZ4CompressionSupported(const CompressionOptions &options) {
60+
std::string out;
61+
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
62+
return port::LZ4_Compress(options, in.data(), in.size(), &out);
63+
}
64+
65+
static bool LZ4HCCompressionSupported(const CompressionOptions &options) {
66+
std::string out;
67+
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
68+
return port::LZ4HC_Compress(options, in.data(), in.size(), &out);
69+
}
70+
71+
static std::string RandomString(Random *rnd, int len) {
6072
std::string r;
6173
test::RandomString(rnd, len, &r);
6274
return r;
@@ -2624,6 +2636,14 @@ bool MinLevelToCompress(CompressionType& type, Options& options, int wbits,
26242636
CompressionOptions(wbits, lev, strategy))) {
26252637
type = kBZip2Compression;
26262638
fprintf(stderr, "using bzip2\n");
2639+
} else if (LZ4CompressionSupported(
2640+
CompressionOptions(wbits, lev, strategy))) {
2641+
type = kLZ4Compression;
2642+
fprintf(stderr, "using lz4\n");
2643+
} else if (LZ4HCCompressionSupported(
2644+
CompressionOptions(wbits, lev, strategy))) {
2645+
type = kLZ4HCCompression;
2646+
fprintf(stderr, "using lz4hc\n");
26272647
} else {
26282648
fprintf(stderr, "skipping test, compression disabled\n");
26292649
return false;

include/rocksdb/c.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,10 @@ extern void rocksdb_options_set_memtable_vector_rep(rocksdb_options_t*);
238238
enum {
239239
rocksdb_no_compression = 0,
240240
rocksdb_snappy_compression = 1,
241-
rocksdb_zlib_compression = 1,
242-
rocksdb_bz2_compression = 1
241+
rocksdb_zlib_compression = 2,
242+
rocksdb_bz2_compression = 3,
243+
rocksdb_lz4_compression = 4,
244+
rocksdb_lz4hc_compression = 5
243245
};
244246
extern void rocksdb_options_set_compression(rocksdb_options_t*, int);
245247

include/rocksdb/options.h

+2-4
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,8 @@ using std::shared_ptr;
4545
enum CompressionType : char {
4646
// NOTE: do not change the values of existing entries, as these are
4747
// part of the persistent format on disk.
48-
kNoCompression = 0x0,
49-
kSnappyCompression = 0x1,
50-
kZlibCompression = 0x2,
51-
kBZip2Compression = 0x3
48+
kNoCompression = 0x0, kSnappyCompression = 0x1, kZlibCompression = 0x2,
49+
kBZip2Compression = 0x3, kLZ4Compression = 0x4, kLZ4HCCompression = 0x5
5250
};
5351

5452
enum CompactionStyle : char {

port/port_posix.h

+65-3
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@
4646
#include <bzlib.h>
4747
#endif
4848

49+
#if defined(LZ4)
50+
#include <lz4.h>
51+
#include <lz4hc.h>
52+
#endif
53+
4954
#include <stdint.h>
5055
#include <string>
5156
#include <string.h>
@@ -353,8 +358,8 @@ inline bool BZip2_Compress(const CompressionOptions& opts, const char* input,
353358
return false;
354359
}
355360

356-
inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
357-
int* decompress_size) {
361+
inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
362+
int* decompress_size) {
358363
#ifdef BZIP2
359364
bz_stream _stream;
360365
memset(&_stream, 0, sizeof(bz_stream));
@@ -409,7 +414,64 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
409414
return nullptr;
410415
}
411416

412-
inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
417+
inline bool LZ4_Compress(const CompressionOptions &opts, const char *input,
418+
size_t length, ::std::string* output) {
419+
#ifdef LZ4
420+
int compressBound = LZ4_compressBound(length);
421+
output->resize(8 + compressBound);
422+
char *p = const_cast<char *>(output->c_str());
423+
memcpy(p, &length, sizeof(length));
424+
size_t outlen;
425+
outlen = LZ4_compress_limitedOutput(input, p + 8, length, compressBound);
426+
if (outlen == 0) {
427+
return false;
428+
}
429+
output->resize(8 + outlen);
430+
return true;
431+
#endif
432+
return false;
433+
}
434+
435+
inline char* LZ4_Uncompress(const char* input_data, size_t input_length,
436+
int* decompress_size) {
437+
#ifdef LZ4
438+
if (input_length < 8) {
439+
return nullptr;
440+
}
441+
int output_len;
442+
memcpy(&output_len, input_data, sizeof(output_len));
443+
char *output = new char[output_len];
444+
*decompress_size = LZ4_decompress_safe_partial(
445+
input_data + 8, output, input_length - 8, output_len, output_len);
446+
if (*decompress_size < 0) {
447+
delete[] output;
448+
return nullptr;
449+
}
450+
return output;
451+
#endif
452+
return nullptr;
453+
}
454+
455+
inline bool LZ4HC_Compress(const CompressionOptions &opts, const char* input,
456+
size_t length, ::std::string* output) {
457+
#ifdef LZ4
458+
int compressBound = LZ4_compressBound(length);
459+
output->resize(8 + compressBound);
460+
char *p = const_cast<char *>(output->c_str());
461+
memcpy(p, &length, sizeof(length));
462+
size_t outlen;
463+
outlen = LZ4_compressHC2_limitedOutput(input, p + 8, length, compressBound,
464+
opts.level);
465+
if (outlen == 0) {
466+
return false;
467+
}
468+
output->resize(8 + outlen);
469+
return true;
470+
#endif
471+
return false;
472+
}
473+
474+
inline bool GetHeapProfile(void (*func)(void *, const char *, int), void *arg) {
413475
return false;
414476
}
415477

0 commit comments

Comments
 (0)