Skip to content

Commit

Permalink
Avoid making two write requests per record
Browse files Browse the repository at this point in the history
Summary: By preallocating space for a record header at the start of record buffers, we can avoid having to make two file write requests when writing records.

Reviewed By: jtbraun

Differential Revision: D60557273

fbshipit-source-id: cabb2e893a411b6c4900639491b553f7c9183039
  • Loading branch information
Georges Berenger authored and facebook-github-bot committed Aug 2, 2024
1 parent 694b6f3 commit f3a5ef5
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 42 deletions.
34 changes: 23 additions & 11 deletions vrs/Compressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,17 @@ class Compressor::CompressorImpl {
vector<uninitialized_byte>& buffer,
const void* data,
size_t dataSize,
CompressionPreset preset) {
CompressionPreset preset,
size_t headerSpace) {
const auto* prefs = getLz4Preferences(preset);
size_t maxCompressedSize = LZ4F_compressFrameBound(dataSize, prefs);
// increase our internal buffer size if necessary
if (buffer.size() < maxCompressedSize) {
if (buffer.size() < headerSpace + maxCompressedSize) {
buffer.resize(0); // avoid copy of current data when resizing
buffer.resize(maxCompressedSize);
buffer.resize(headerSpace + maxCompressedSize);
}
size_t result = LZ4F_compressFrame(buffer.data(), maxCompressedSize, data, dataSize, prefs);
size_t result =
LZ4F_compressFrame(buffer.data() + headerSpace, maxCompressedSize, data, dataSize, prefs);
if (LZ4F_isError(result)) {
XR_LOGE("Compression error {}", LZ4F_getErrorName(result));
compressionType_ = CompressionType::None;
Expand All @@ -146,19 +148,25 @@ class Compressor::CompressorImpl {
vector<uninitialized_byte>& buffer,
const void* data,
size_t dataSize,
CompressionPreset preset) {
CompressionPreset preset,
size_t headerSpace) {
int compressionLevel = sZstdPresets[preset];
size_t maxCompressedSize = ZSTD_compressBound(dataSize);
// increase our internal buffer size if necessary
if (buffer.size() < maxCompressedSize) {
if (buffer.size() < headerSpace + maxCompressedSize) {
buffer.resize(0); // avoid copy of current data when resizing
buffer.resize(maxCompressedSize);
buffer.resize(headerSpace + maxCompressedSize);
}
if (zstdContext_ == nullptr) {
zstdContext_ = ZSTD_createCCtx();
}
size_t result = ZSTD_compressCCtx(
zstdContext_, buffer.data(), buffer.size(), data, dataSize, compressionLevel);
zstdContext_,
buffer.data() + headerSpace,
maxCompressedSize,
data,
dataSize,
compressionLevel);
if (ZSTD_isError(result)) {
XR_LOGE("Compression error {}", ZSTD_getErrorName(result));
compressionType_ = CompressionType::None;
Expand Down Expand Up @@ -250,14 +258,18 @@ Compressor::Compressor() : impl_(new CompressorImpl()) {}

Compressor::~Compressor() = default;

uint32_t Compressor::compress(const void* data, size_t dataSize, CompressionPreset preset) {
uint32_t Compressor::compress(
const void* data,
size_t dataSize,
CompressionPreset preset,
size_t headerSpace) {
if (shouldTryToCompress(preset, dataSize)) {
if (preset >= CompressionPreset::FirstLz4Preset && preset <= CompressionPreset::LastLz4Preset) {
return impl_->lz4Compress(buffer_, data, dataSize, preset);
return impl_->lz4Compress(buffer_, data, dataSize, preset, headerSpace);
}
if (preset >= CompressionPreset::FirstZstdPreset &&
preset <= CompressionPreset::LastZstdPreset) {
return impl_->zstdCompress(buffer_, data, dataSize, preset);
return impl_->zstdCompress(buffer_, data, dataSize, preset, headerSpace);
}
}
return 0; // means failure
Expand Down
10 changes: 9 additions & 1 deletion vrs/Compressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ class Compressor {
/// @param data: Pointer to the data to compress.
/// @param dataSize: Number of bytes in the buffer to compress.
/// @param preset: Compression preset to use.
/// @param headerSpace: Number of bytes to reserve at the beginning of the buffer for a header
/// initialized manually later.
/// @return The number of bytes of compressed data, or 0 in case of failure.
uint32_t compress(const void* data, size_t dataSize, CompressionPreset preset);
uint32_t
compress(const void* data, size_t dataSize, CompressionPreset preset, size_t headerSpace = 0);

/// Frame compression APIs, with streaming to a file.
/// Write to a file a block of data (a "frame") to be compressed. That data will be logically self
Expand Down Expand Up @@ -136,6 +139,11 @@ class Compressor {
const void* getData() const {
return buffer_.data();
}
/// Get the space reserved for a header.
template <class HeaderType>
HeaderType* getHeader() {
return reinterpret_cast<HeaderType*>(buffer_.data());
}
CompressionType getCompressionType() const;

/// Really deallocate the buffer's memory (clear() doesn't do that)
Expand Down
20 changes: 20 additions & 0 deletions vrs/FileFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,26 @@ RecordHeader::RecordHeader(
this->uncompressedSize.set(uncompressedSize);
}

void RecordHeader::initHeader(
Record::Type _recordType,
StreamId _streamId,
double _timestamp,
uint32_t _formatVersion,
CompressionType _compressionType,
uint32_t _previousRecordSize,
uint32_t _recordSize,
uint32_t _uncompressedSize) {
this->recordSize.set(_recordSize);
this->previousRecordSize.set(_previousRecordSize);
this->recordableTypeId.set(static_cast<int32_t>(_streamId.getTypeId()));
this->formatVersion.set(_formatVersion);
this->timestamp.set(_timestamp);
this->recordableInstanceId.set(_streamId.getInstanceId());
setRecordType(_recordType);
setCompressionType(_compressionType);
this->uncompressedSize.set(_uncompressedSize);
}

void RecordHeader::initIndexHeader(
uint32_t _formatVersion,
uint32_t indexSize,
Expand Down
12 changes: 12 additions & 0 deletions vrs/FileFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,18 @@ struct RecordHeader {
this->compressionType.set(static_cast<uint8_t>(type));
}

/// Initialize this header, for use as a regular record.
/// Same parameters as the constructor.
void initHeader(
Record::Type recordType,
StreamId streamId,
double timestamp,
uint32_t formatVersion,
CompressionType compressionType,
uint32_t previousRecordSize,
uint32_t recordSize,
uint32_t uncompressedSize);

/// Initialize this header, for use as an index record.
/// @param formatVersion: Record format version for this index record.
/// @param indexSize: Payload size of the index record (without header).
Expand Down
57 changes: 32 additions & 25 deletions vrs/Record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@

using namespace std;

namespace {
constexpr size_t kRecordHeaderSize = sizeof(vrs::FileFormat::RecordHeader);
}

namespace vrs {

namespace {
Expand Down Expand Up @@ -65,30 +69,33 @@ void Record::set(
uint32_t formatVersion,
const DataSource& data,
uint64_t creationOrder) {
this->timestamp_ = timestamp;
this->recordType_ = type;
this->formatVersion_ = formatVersion;
bufferUsedSize_ = data.getDataSize();
if (bufferUsedSize_ > 0) {
// only resize if we have to
if (buffer_.size() < bufferUsedSize_) {
// If we're going to reallocate our buffer, then ask for a bit more right away...
if (bufferUsedSize_ > buffer_.capacity()) {
buffer_.resize(0); // make sure we don't copy existing data for no reason!
}
buffer_.resize(bufferUsedSize_);
timestamp_ = timestamp;
recordType_ = type;
formatVersion_ = formatVersion;
usedBufferSize_ = data.getDataSize();
uint64_t allocateSize = kRecordHeaderSize + usedBufferSize_;
// only resize if we have to
if (buffer_.size() < allocateSize) {
// If we're going to reallocate our buffer, then ask for a bit more right away...
if (allocateSize > buffer_.capacity()) {
buffer_.resize(0); // make sure we don't copy existing data for no reason!
}
data.copyTo(&buffer_[0].byte);
buffer_.resize(allocateSize);
}
this->creationOrder_ = creationOrder;
data.copyTo(&buffer_.data()->byte + kRecordHeaderSize);
creationOrder_ = creationOrder;
}

bool Record::shouldTryToCompress() const {
return Compressor::shouldTryToCompress(recordManager_.getCompression(), bufferUsedSize_);
return Compressor::shouldTryToCompress(recordManager_.getCompression(), usedBufferSize_);
}

uint32_t Record::compressRecord(Compressor& compressor) {
return compressor.compress(buffer_.data(), bufferUsedSize_, recordManager_.getCompression());
return compressor.compress(
buffer_.data() + kRecordHeaderSize,
usedBufferSize_,
recordManager_.getCompression(),
kRecordHeaderSize);
}

int Record::writeRecord(
Expand All @@ -99,22 +106,23 @@ int Record::writeRecord(
uint32_t compressedSize) {
CompressionType compressionType = compressor.getCompressionType();
if (compressionType != CompressionType::None && compressedSize > 0) {
uint32_t recordSize = static_cast<uint32_t>(sizeof(FileFormat::RecordHeader) + compressedSize);
FileFormat::RecordHeader recordHeader(
uint32_t recordSize = static_cast<uint32_t>(kRecordHeaderSize + compressedSize);
auto* header = compressor.getHeader<FileFormat::RecordHeader>();
header->initHeader(
getRecordType(),
streamId,
timestamp_,
formatVersion_,
compressionType,
inOutRecordSize,
recordSize,
static_cast<uint32_t>(bufferUsedSize_));
WRITE_OR_LOG_AND_RETURN(file, &recordHeader, sizeof(recordHeader));
WRITE_OR_LOG_AND_RETURN(file, compressor.getData(), compressedSize);
static_cast<uint32_t>(usedBufferSize_));
WRITE_OR_LOG_AND_RETURN(file, header, recordSize);
inOutRecordSize = recordSize;
} else {
uint32_t recordSize = static_cast<uint32_t>(sizeof(FileFormat::RecordHeader) + bufferUsedSize_);
FileFormat::RecordHeader recordHeader(
uint32_t recordSize = static_cast<uint32_t>(kRecordHeaderSize + usedBufferSize_);
auto* header = reinterpret_cast<FileFormat::RecordHeader*>(buffer_.data());
header->initHeader(
getRecordType(),
streamId,
timestamp_,
Expand All @@ -123,8 +131,7 @@ int Record::writeRecord(
inOutRecordSize,
recordSize,
0);
WRITE_OR_LOG_AND_RETURN(file, &recordHeader, sizeof(recordHeader));
WRITE_OR_LOG_AND_RETURN(file, buffer_.data(), bufferUsedSize_);
WRITE_OR_LOG_AND_RETURN(file, header, recordSize);
inOutRecordSize = recordSize;
}
return 0;
Expand Down
8 changes: 5 additions & 3 deletions vrs/Record.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ namespace vrs {
class DataSource;
class Compressor;
class RecordManager;
class FileHandler;

/// Type of compression. Used in VRS record headers, so never modify the values.
enum class CompressionType : uint8_t {
Expand Down Expand Up @@ -69,6 +68,9 @@ enum class CompressionType : uint8_t {
/// records whenever data is received from some kind of device driver, or arbitrarily in the
/// case of synthetic data.
///
/// Records and compressed Records have space for a RecordHeader allocated before their data,
/// so records can be written in a single write operation.
///
/// See Recordable::createRecord() to see how to create records.
class Record final {
public:
Expand Down Expand Up @@ -131,7 +133,7 @@ class Record final {

/// Get the record's payload size, uncompressed.
size_t getSize() const {
return bufferUsedSize_;
return usedBufferSize_;
}

/// Get the record's record type.
Expand Down Expand Up @@ -160,7 +162,7 @@ class Record final {
Type recordType_{};
uint32_t formatVersion_{};
std::vector<uninitialized_byte> buffer_;
size_t bufferUsedSize_{};
size_t usedBufferSize_{};
uint64_t creationOrder_{};

RecordManager& recordManager_;
Expand Down
4 changes: 3 additions & 1 deletion vrs/RecordFileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,9 @@ void RecordFileWriter::writeOneRecord(
if (timestamp > rwd.newest) {
rwd.newest = timestamp;
}
if (rwd.currentChunkSize > 0 && rwd.currentChunkSize + record->getSize() >= maxChunkSize_) {
size_t recordDiskSize =
sizeof(FileFormat::RecordHeader) + (compressedSize > 0 ? compressedSize : record->getSize());
if (rwd.currentChunkSize > 0 && rwd.currentChunkSize + recordDiskSize >= maxChunkSize_) {
NewChunkNotifier newChunkNotifier(*file_, newChunkHandler_);
// AddChunk() preserves the current chunk on error.
XR_VERIFY(
Expand Down
3 changes: 2 additions & 1 deletion vrs/RecordManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "Compressor.h"
#include "DataSource.h"
#include "FileFormat.h"

using namespace std;

Expand Down Expand Up @@ -59,7 +60,7 @@ Record* RecordManager::createRecord(
// Step 1: find a record to reuse, if there is any
Record* record = nullptr;
unique_lock<mutex> guard{mutex_};
const size_t dataSize = data.getDataSize();
const size_t dataSize = sizeof(FileFormat::RecordHeader) + data.getDataSize();
const size_t maxSize = getAcceptableOverCapacity(dataSize);
// reuse the most recently inserted records first, as they're less likely to have been swapped out
// of memory
Expand Down

0 comments on commit f3a5ef5

Please sign in to comment.