Skip to content

Commit

Permalink
fix: Revert Optimize PrestoBatchVectorSerializer [1/7]: Serialize Fla…
Browse files Browse the repository at this point in the history
…tVectors (#12291)

Summary:
Pull Request resolved: #12291

Revert
#12060
The introduction of state to PrestoBatchVectorSerializer makes it thread unsafe breaking existing use cases.

Reviewed By: kparichay

Differential Revision: D69377518

fbshipit-source-id: 1dc491097c9223e83718896cf10dfb7c15079d94
  • Loading branch information
Dark Knight authored and facebook-github-bot committed Feb 10, 2025
1 parent 54cee38 commit 35711b7
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 576 deletions.
92 changes: 3 additions & 89 deletions velox/serializers/PrestoBatchVectorSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ void PrestoBatchVectorSerializer::serialize(
const auto rowType = vector->type();
const auto numChildren = vector->childrenSize();

StreamArena arena(pool_);
std::vector<VectorStream> streams;
streams.reserve(numChildren);
for (int i = 0; i < numChildren; i++) {
streams.emplace_back(
rowType->childAt(i),
std::nullopt,
vector->childAt(i),
&arena_,
&arena,
numRows,
opts_);

Expand All @@ -47,9 +48,7 @@ void PrestoBatchVectorSerializer::serialize(
}

flushStreams(
streams, numRows, arena_, *codec_, opts_.minCompressionRatio, stream);

arena_.clear();
streams, numRows, arena, *codec_, opts_.minCompressionRatio, stream);
}

void PrestoBatchVectorSerializer::estimateSerializedSizeImpl(
Expand Down Expand Up @@ -179,89 +178,4 @@ void PrestoBatchVectorSerializer::estimateSerializedSizeImpl(
VELOX_UNSUPPORTED("Unsupported vector encoding {}", vector->encoding());
}
}

void PrestoBatchVectorSerializer::writeHeader(
BufferedOutputStream* stream,
const TypePtr& type) {
auto encoding = typeToEncodingName(type);
writeInt32(stream, encoding.size());
stream->write(encoding.data(), encoding.size());
}

template <>
bool PrestoBatchVectorSerializer::hasNulls(
const VectorPtr& vector,
const folly::Range<const IndexRange*>& ranges) {
if (vector->nulls()) {
for (auto& range : ranges) {
if (!bits::isAllSet(
vector->rawNulls(), range.begin, range.begin + range.size)) {
return true;
}
}
}

return false;
}

template <>
bool PrestoBatchVectorSerializer::hasNulls(
const VectorPtr& vector,
const folly::Range<const IndexRangeWithNulls*>& ranges) {
if (vector->nulls()) {
for (auto& range : ranges) {
if (range.isNull ||
!bits::isAllSet(
vector->rawNulls(), range.begin, range.begin + range.size)) {
return true;
}
}
} else {
for (auto& range : ranges) {
if (range.isNull) {
return true;
}
}
}

return false;
}

template <>
void PrestoBatchVectorSerializer::writeNulls(
BufferedOutputStream* stream,
const VectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
const vector_size_t numRows) {
VELOX_DCHECK_EQ(numRows, rangesTotalSize(ranges));

nulls_.startWrite(bits::nbytes(numRows));
for (auto& range : ranges) {
nulls_.appendBits(
vector->rawNulls(), range.begin, range.begin + range.size);
}
nulls_.flush(stream);
}

template <>
void PrestoBatchVectorSerializer::writeNulls(
BufferedOutputStream* stream,
const VectorPtr& vector,
const folly::Range<const IndexRangeWithNulls*>& ranges,
const vector_size_t numRows) {
VELOX_DCHECK_EQ(numRows, rangesTotalSize(ranges));

nulls_.startWrite(bits::nbytes(numRows));
for (auto& range : ranges) {
if (range.isNull) {
nulls_.appendBool(bits::kNull, range.size);
} else if (vector->mayHaveNulls()) {
nulls_.appendBits(
vector->rawNulls(), range.begin, range.begin + range.size);
} else {
nulls_.appendBool(bits::kNotNull, range.size);
}
}
nulls_.flush(stream);
}
} // namespace facebook::velox::serializer::presto::detail
Loading

0 comments on commit 35711b7

Please sign in to comment.