Skip to content

Commit

Permalink
Add checkUsageLeak flag to arbitrator (#8219)
Browse files Browse the repository at this point in the history
Summary:
SharedArbitrator does sanity check upon destruction. Add this check to be enabled only if checkUsageLeak is true.
Add Running and Queuing information to SharedArbitrator::toString()

Pull Request resolved: #8219

Reviewed By: xiaoxmeng, amitkdutta

Differential Revision: D52491759

Pulled By: tanjialiang

fbshipit-source-id: a11c3c3823ead15c44fe3914aaa8b0c593cbc3c6
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Jan 4, 2024
1 parent fefd586 commit f12b055
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 7 deletions.
3 changes: 2 additions & 1 deletion velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ std::unique_ptr<MemoryArbitrator> createArbitrator(
std::min(options.arbitratorCapacity, options.allocatorCapacity),
.memoryPoolTransferCapacity = options.memoryPoolTransferCapacity,
.memoryReclaimWaitMs = options.memoryReclaimWaitMs,
.arbitrationStateCheckCb = options.arbitrationStateCheckCb});
.arbitrationStateCheckCb = options.arbitrationStateCheckCb,
.checkUsageLeak = options.checkUsageLeak});
}
} // namespace

Expand Down
10 changes: 9 additions & 1 deletion velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ class MemoryArbitrator {
/// the potential deadlock when reclaim memory from the task of the request
/// memory pool.
MemoryArbitrationStateCheckCB arbitrationStateCheckCb{nullptr};

/// If true, do sanity check on the arbitrator state on destruction.
///
/// TODO: deprecate this flag after all the existing memory leak use cases
/// have been fixed.
bool checkUsageLeak{true};
};

using Factory = std::function<std::unique_ptr<MemoryArbitrator>(
Expand Down Expand Up @@ -227,12 +233,14 @@ class MemoryArbitrator {
: capacity_(config.capacity),
memoryPoolTransferCapacity_(config.memoryPoolTransferCapacity),
memoryReclaimWaitMs_(config.memoryReclaimWaitMs),
arbitrationStateCheckCb_(config.arbitrationStateCheckCb) {}
arbitrationStateCheckCb_(config.arbitrationStateCheckCb),
checkUsageLeak_(config.checkUsageLeak) {}

const uint64_t capacity_;
const uint64_t memoryPoolTransferCapacity_;
const uint64_t memoryReclaimWaitMs_;
const MemoryArbitrationStateCheckCB arbitrationStateCheckCb_;
const bool checkUsageLeak_;
};

FOLLY_ALWAYS_INLINE std::ostream& operator<<(
Expand Down
6 changes: 3 additions & 3 deletions velox/common/memory/tests/MemoryManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ TEST_F(MemoryManagerTest, Ctor) {
"pools 1\nList of root pools:\n\t__sys_root__\n"
"Memory Allocator[MALLOC capacity 4.00GB allocated bytes 0 "
"allocated pages 0 mapped pages 0]\n"
"ARBITRATOR[SHARED CAPACITY[4.00GB] STATS[numRequests 0 numSucceeded 0 "
"numAborted 0 numFailures 0 numNonReclaimableAttempts 0 "
"numReserves 0 numReleases 0 queueTime 0us "
"ARBITRATOR[SHARED CAPACITY[4.00GB] RUNNING[false] QUEUING[0] "
"STATS[numRequests 0 numSucceeded 0 numAborted 0 numFailures 0 "
"numNonReclaimableAttempts 0 numReserves 0 numReleases 0 queueTime 0us "
"arbitrationTime 0us reclaimTime 0us shrunkMemory 0B "
"reclaimedMemory 0B maxCapacity 4.00GB freeCapacity 4.00GB]]]");
}
Expand Down
18 changes: 16 additions & 2 deletions velox/exec/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,19 @@ SharedArbitrator::findCandidateWithLargestCapacity(
}

SharedArbitrator::~SharedArbitrator() {
VELOX_CHECK_EQ(freeCapacity_, capacity_, "{}", toString());
if (freeCapacity_ != capacity_) {
const std::string errMsg = fmt::format(
"\"There is unexpected free capacity not given back to arbitrator "
"on destruction: freeCapacity_ != capacity_ ({} vs {})\\n{}\"",
freeCapacity_,
capacity_,
toString());
if (checkUsageLeak_) {
VELOX_FAIL(errMsg);
} else {
LOG(ERROR) << errMsg;
}
}
}

uint64_t SharedArbitrator::growCapacity(
Expand Down Expand Up @@ -536,9 +548,11 @@ std::string SharedArbitrator::toString() const {

std::string SharedArbitrator::toStringLocked() const {
return fmt::format(
"ARBITRATOR[{} CAPACITY[{}] {}]",
"ARBITRATOR[{} CAPACITY[{}] RUNNING[{}] QUEUING[{}] {}]",
kind_,
succinctBytes(capacity_),
running_ ? "true" : "false",
waitPromises_.size(),
statsLocked().toString());
}

Expand Down

0 comments on commit f12b055

Please sign in to comment.