misc: Enforce the max size of a write to spill file (#12446)
Summary:
Pull Request resolved: #12446

In PrestoSQL, spilling can expose degenerate structures that are larger than 2GB, the maximum value of int32_t. A size above the int32_t range is not intrinsically a problem, but the Presto wire format stores the payload size in a 4-byte field. This means that, for the PrestoSQL case, we cannot spill a payload whose size exceeds the int32_t maximum. Moreover, many of the serializers have an int32_t limit. For now, on the write path, just fail the write if the estimated size of a single write is not below the int32_t maximum.
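
To make the 4-byte limit concrete, the sketch below shows what happens when a 64-bit byte count is squeezed into a 32-bit size field. It is an illustration only, not Velox or Presto code: `truncateToFourBytes` and `fitsInSignedFourBytes` are hypothetical names, and the actual wire format is not modeled.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical helper (not from Velox): keeps only the low 32 bits of the
// count, which is all a 4-byte size field can hold.
uint32_t truncateToFourBytes(uint64_t bytes) {
  return static_cast<uint32_t>(bytes); // unsigned narrowing is modular
}

// Hypothetical helper (not from Velox): true if the count is representable
// as a non-negative int32_t.
bool fitsInSignedFourBytes(uint64_t bytes) {
  return bytes <= static_cast<uint64_t>(std::numeric_limits<int32_t>::max());
}
```

For a 10GB payload (10 * 2^30 bytes), `truncateToFourBytes` yields 2^31, so a reader of the field would see a size that no longer matches what was written. That is the kind of silent mismatch the write-path check is meant to rule out.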

Reviewed By: xiaoxmeng

Differential Revision: D70187465

fbshipit-source-id: 2f8517bcf23ae6eba328ed068f81e1cba85192a6
yuandagits authored and facebook-github-bot committed Feb 27, 2025
1 parent 6632054 commit 3b9239b
Showing 3 changed files with 53 additions and 1 deletion.
11 changes: 10 additions & 1 deletion velox/exec/Spill.cpp
@@ -129,6 +129,13 @@ void SpillState::setPartitionSpilled(uint32_t partition) {
   common::incrementGlobalSpilledPartitionStats();
 }

+/*static*/
+void SpillState::validateSpillBytesSize(uint64_t bytes) {
+  static constexpr uint64_t kMaxSpillBytesPerWrite =
+      std::numeric_limits<int32_t>::max();
+  VELOX_CHECK_LT(bytes, kMaxSpillBytesPerWrite, "Spill bytes will overflow.");
+}
+
 void SpillState::updateSpilledInputBytes(uint64_t bytes) {
   auto statsLocked = stats_->wlock();
   statsLocked->spilledInputBytes += bytes;
@@ -164,7 +171,9 @@ uint64_t SpillState::appendToPartition(
         stats_);
   }

-  updateSpilledInputBytes(rows->estimateFlatSize());
+  const uint64_t bytes = rows->estimateFlatSize();
+  validateSpillBytesSize(bytes);
+  updateSpilledInputBytes(bytes);

   IndexRange range{0, rows->size()};
   return partitionWriters_[partition]->write(
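
The ordering in `appendToPartition` (compute the size once, validate it, then update the stats) can be sketched outside Velox as follows. This is a minimal stand-in under stated assumptions: `validateWriteSize`, `WriteStats`, and `recordWrite` are hypothetical names, and `std::length_error` replaces the `VELOX_CHECK_LT` failure, whose exception type is not shown in this diff.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <stdexcept>

// Hypothetical stand-in for SpillState::validateSpillBytesSize. It mirrors the
// strict less-than comparison in the diff.
void validateWriteSize(uint64_t bytes) {
  constexpr uint64_t kMaxBytesPerWrite = std::numeric_limits<int32_t>::max();
  if (!(bytes < kMaxBytesPerWrite)) {
    throw std::length_error("Spill bytes will overflow.");
  }
}

// Hypothetical stats holder; the real code updates a locked stats object.
struct WriteStats {
  uint64_t spilledInputBytes{0};
};

// Validates first, so a rejected write never reaches the stats update.
uint64_t recordWrite(WriteStats& stats, uint64_t bytes) {
  validateWriteSize(bytes);
  stats.spilledInputBytes += bytes;
  return bytes;
}
```

Because the comparison is strict, a write of exactly `std::numeric_limits<int32_t>::max()` bytes is rejected as well. Validating before the stats update means a rejected write leaves `spilledInputBytes` unchanged.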
7 changes: 7 additions & 0 deletions velox/exec/Spill.h
@@ -434,6 +434,13 @@ class SpillState {
   SpillPartitionNumSet testingNonEmptySpilledPartitionSet() const;

  private:
+  // Ensures that the bytes to spill is within the limit of
+  // maxSpillBytesPerWrite_ for a given spill write/appendToPartition call.
+  // NOTE: the Presto serializer used for spill serialization can't handle more
+  // than 2GB data size. Hence we can't spill a vector which exceeds
+  // 2GB serialized buffer.
+  void validateSpillBytesSize(uint64_t bytes);
+
   void updateSpilledInputBytes(uint64_t bytes);

   SpillWriter* partitionWriter(uint32_t partition) const;
36 changes: 36 additions & 0 deletions velox/exec/tests/SpillTest.cpp
@@ -844,6 +844,42 @@ TEST_P(SpillTest, nonExistSpillFileOnDeletion) {
   state_.reset();
 }

+TEST_P(SpillTest, validatePerSpillWriteSize) {
+  struct TestRowVector : RowVector {
+    explicit TestRowVector(std::shared_ptr<const Type> type)
+        : RowVector(nullptr, type, nullptr, 0, {}) {}
+
+    uint64_t estimateFlatSize() const override {
+      // Return 10GB
+      constexpr uint64_t tenGB = 10 * static_cast<uint64_t>(1024 * 1024 * 1024);
+      return tenGB;
+    }
+  };
+
+  auto tempDirectory = exec::test::TempDirectoryPath::create();
+  SpillState state(
+      [&]() -> const std::string& { return tempDirectory->getPath(); },
+      updateSpilledBytesCb_,
+      "test",
+      1,
+      1,
+      {},
+      1024,
+      0,
+      compressionKind_,
+      std::nullopt,
+      pool(),
+      &spillStats_,
+      "");
+  int partitionIndex = 0;
+  state.setPartitionSpilled(partitionIndex);
+  ASSERT_TRUE(state.isPartitionSpilled(partitionIndex));
+  VELOX_ASSERT_THROW(
+      state.appendToPartition(
+          partitionIndex, std::make_shared<TestRowVector>(HUGEINT())),
+      "Spill bytes will overflow");
+}
+
 namespace {
 SpillFiles makeFakeSpillFiles(int32_t numFiles) {
   auto tempDir = exec::test::TempDirectoryPath::create();
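
The `validatePerSpillWriteSize` test above avoids allocating 10GB by overriding the size estimate on an empty vector. The same idea can be sketched without Velox or GoogleTest. Every name below (`Batch`, `HugeBatch`, `checkWriteSize`, `writeRejected`) is hypothetical, and `std::length_error` is an assumed stand-in for the Velox check failure.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <stdexcept>

// Hypothetical base type with a virtual size estimate, standing in for
// RowVector::estimateFlatSize().
struct Batch {
  virtual ~Batch() = default;
  virtual uint64_t estimateSize() const { return 0; }
};

// Test double: holds no data but reports 10GB.
struct HugeBatch : Batch {
  uint64_t estimateSize() const override { return 10ULL << 30; }
};

// Hypothetical guard with the same strict less-than bound as the diff.
void checkWriteSize(uint64_t bytes) {
  constexpr uint64_t kMax = std::numeric_limits<int32_t>::max();
  if (!(bytes < kMax)) {
    throw std::length_error("Spill bytes will overflow.");
  }
}

// Returns true if the guard rejects the batch.
bool writeRejected(const Batch& batch) {
  try {
    checkWriteSize(batch.estimateSize());
  } catch (const std::length_error&) {
    return true;
  }
  return false;
}
```

Overriding the estimate keeps the test cheap and deterministic. The trade-off is that it exercises only the size check, not the serializer behavior on a genuinely oversized payload.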
