Skip to content

Commit

Permalink
release tensor by ReleaseTensor instruction (#4737)
Browse files Browse the repository at this point in the history
* release tensor by instructions, update shut_down_util, skip tensor releasing when exiting

Signed-off-by: daquexian <daquexian566@gmail.com>

* Captures shared_ptr instead of raw pointer

Co-authored-by: lixinqi <lixinqi0703106@163.com>
  • Loading branch information
daquexian and lixinqi authored Apr 26, 2021
1 parent 591021f commit 5170ffa
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ limitations under the License.
*/
#include <pybind11/pybind11.h>
#include "oneflow/api/python/of_api_registry.h"
#include "oneflow/core/framework/python_interpreter_util.h"
#include "oneflow/core/framework/shut_down_util.h"

namespace py = pybind11;

namespace oneflow {

ONEFLOW_API_PYBIND11_MODULE("", m) {
  // Expose SetShuttingDown() to Python; it now returns void (no Maybe<void>),
  // so no GetOrThrow() unwrapping is needed. The interpreter calls this at
  // exit so C++ destructors can skip work that is unsafe during shutdown.
  m.def("SetShuttingDown", []() { return SetShuttingDown(); });
}

} // namespace oneflow
2 changes: 2 additions & 0 deletions oneflow/core/eager/eager_blob_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
#include "oneflow/core/vm/allocator.h"
#include "oneflow/core/job/parallel_desc.h"
#include "oneflow/core/framework/to_string.h"
#include "oneflow/core/framework/shut_down_util.h"

namespace oneflow {
namespace eager {
Expand Down Expand Up @@ -78,6 +79,7 @@ Maybe<void> EagerBlobObject::TryAllocateBlobBodyMemory(DeviceCtx* device_ctx) {
{
// reset tensor_buffer_;
const auto& Free = [allocator, required_body_bytes](char* dptr) {
if (IsShuttingDown()) { return; }
allocator->Deallocate(dptr, required_body_bytes);
};
char* dptr = nullptr;
Expand Down
17 changes: 17 additions & 0 deletions oneflow/core/framework/instructions_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ limitations under the License.
#include "oneflow/core/vm/read_tensor_shape_arg_cb_phy_instr_operand.h"
#include "oneflow/core/vm/no_arg_cb_phy_instr_operand.h"
#include "oneflow/core/vm/access_blob_arg_cb_phy_instr_operand.h"
#include "oneflow/core/vm/release_tensor_arg_phy_instr_operand.h"
#include "oneflow/core/framework/vm_local_dep_object.h"
#include "oneflow/core/framework/tensor.h"

Expand Down Expand Up @@ -888,6 +889,22 @@ Maybe<void> InstructionsBuilder::FeedBlob(
return Maybe<void>::Ok();
}

// Enqueues a "<device_tag>.ReleaseTensor" VM instruction that releases the
// given eager blob object's storage through the virtual machine, instead of
// freeing it inline at destructor time.
// NOTE(review): the operand captures `eager_blob_object` as a shared_ptr, so
// the tensor stays alive until the VM actually executes the instruction.
Maybe<void> InstructionsBuilder::ReleaseTensor(
const std::shared_ptr<eager::EagerBlobObject>& eager_blob_object,
const std::shared_ptr<const ParallelDesc>& parallel_desc) {
// Instruction name is dispatched per device type, e.g. "gpu.ReleaseTensor".
std::string instr_name = parallel_desc->device_tag() + ".ReleaseTensor";
ObjectMsgPtr<vm::InstructionMsg> instruction = ObjectMsgPtr<vm::InstructionMsg>::New(instr_name);
// Both dependency objects are fetched up front; JUST propagates any error
// before the instruction is constructed or enqueued.
const std::shared_ptr<VmLocalDepObject>& infer_local_dep_object =
JUST(eager_blob_object->infer_local_dep_object());
const std::shared_ptr<VmLocalDepObject>& compute_local_dep_object =
JUST(eager_blob_object->compute_local_dep_object());
// The operand owns shared_ptrs to the blob object and its dep objects,
// keeping them alive until the VM consumes the instruction.
*instruction->mutable_phy_instr_operand() = std::make_shared<vm::ReleaseTensorArgPhyInstrOperand>(
eager_blob_object, infer_local_dep_object, compute_local_dep_object);
instruction->set_parallel_desc_symbol_id(JUST(parallel_desc->symbol_id()));
// Ownership of the instruction moves into the builder's pending list.
instruction_list_->EmplaceBack(std::move(instruction.Mutable()));
return Maybe<void>::Ok();
}

Maybe<void> InstructionsBuilder::AccessBlobByCallback(
const std::shared_ptr<one::MirroredTensor>& tensor,
const std::function<void(uint64_t)>& callback, const std::string& modifier) {
Expand Down
3 changes: 3 additions & 0 deletions oneflow/core/framework/instructions_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ class InstructionsBuilder : public std::enable_shared_from_this<InstructionsBuil
const std::shared_ptr<compatible_py::BlobObject>& blob_object,
const std::shared_ptr<compatible_py::OpArgParallelAttribute>& op_arg_parallel_attr);

Maybe<void> ReleaseTensor(const std::shared_ptr<eager::EagerBlobObject>& eager_blob_object,
const std::shared_ptr<const ParallelDesc>& parallel_desc);

Maybe<void> AccessBlobByCallback(const std::shared_ptr<one::MirroredTensor>& tensor,
const std::function<void(uint64_t)>& callback,
const std::string& modifier);
Expand Down
4 changes: 2 additions & 2 deletions oneflow/core/framework/object.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ limitations under the License.

#include <functional>
#include "oneflow/core/framework/op_arg_util.h"
#include "oneflow/core/framework/python_interpreter_util.h"
#include "oneflow/core/framework/shut_down_util.h"

namespace oneflow {

Expand All @@ -42,7 +42,7 @@ class BlobObject : public Object {
BlobObject(int64_t object_id, const std::shared_ptr<OpArgParallelAttribute>& op_arg_parallel_attr,
const std::shared_ptr<OpArgBlobAttribute>& op_arg_blob_attr);
~BlobObject() override {
if (!(CHECK_JUST(IsShuttingDown()))) { ForceReleaseAll(); }
if (!IsShuttingDown()) { ForceReleaseAll(); }
}

std::shared_ptr<OpArgParallelAttribute> op_arg_parallel_attr() const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,29 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include <vector>
#include "oneflow/core/framework/python_interpreter_util.h"
#include "oneflow/core/framework/shut_down_util.h"

namespace oneflow {

namespace {

// Returns a pointer to the process-wide shutdown flag.
// The flag is a function-local static std::atomic<bool>, so initialization is
// thread-safe and reads/writes need no external locking. Replaces the old
// Maybe<std::vector<bool>*> design, which forced every caller through
// JUST()/CHECK_JUST() for an operation that cannot actually fail.
std::atomic<bool>* GetShuttingDown() {
  static std::atomic<bool> shutting_down{false};
  return &shutting_down;
}

} // namespace

Maybe<bool> IsShuttingDown() {
auto* shutting_down = JUST(GetShuttingDown());
CHECK_EQ_OR_RETURN(shutting_down->size(), 1);
bool is_interpreter_shutdown = (*shutting_down)[0];
bool IsShuttingDown() {
auto* shutting_down = GetShuttingDown();
bool is_interpreter_shutdown = *shutting_down;
return is_interpreter_shutdown;
}

Maybe<void> SetShuttingDown() {
auto* shutting_down = JUST(GetShuttingDown());
CHECK_EQ_OR_RETURN(shutting_down->size(), 1);
(*shutting_down)[0] = true;
return Maybe<void>::Ok();
void SetShuttingDown() {
auto* shutting_down = GetShuttingDown();
CHECK_EQ(*shutting_down, false);
*shutting_down = true;
}

} // namespace oneflow
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ limitations under the License.

namespace oneflow {

Maybe<bool> IsShuttingDown();
bool IsShuttingDown();

Maybe<void> SetShuttingDown();
void SetShuttingDown();

} // namespace oneflow

Expand Down
7 changes: 7 additions & 0 deletions oneflow/core/framework/tensor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ EagerMirroredTensorImpl::EagerMirroredTensorImpl(
eager_blob_object_(eager_blob_object) {
dtype_ = CHECK_JUST(DType::GetDTypeByDataType(eager_blob_object->blob_desc().data_type()));
tensor_storage_ = std::make_shared<TensorStorage>(eager_blob_object->tensor_buffer());
const auto& parallel_desc = this->parallel_desc();
tensor_storage_->set_releaser_hook(
[eager_blob_object, parallel_desc](const std::shared_ptr<eager::TensorBuffer>&) {
PhysicalRun([&](const std::shared_ptr<InstructionsBuilder>& builder) {
builder->ReleaseTensor(eager_blob_object, parallel_desc);
});
});
}

Maybe<VmLocalDepObject> EagerMirroredTensorImpl::infer_local_dep_object() const {
Expand Down
3 changes: 2 additions & 1 deletion oneflow/core/framework/tensor_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ limitations under the License.
#include "oneflow/core/framework/tensor_storage.h"
#include "oneflow/core/eager/eager_blob_object.h"
#include "oneflow/core/framework/vm_local_dep_object.h"
#include "oneflow/core/framework/shut_down_util.h"

namespace oneflow {
namespace one {
Expand All @@ -27,7 +28,7 @@ TensorStorage::TensorStorage(const std::shared_ptr<eager::TensorBuffer>& tensor_
: buffer_(tensor_buffer) {}

// Runs the registered releaser hook (which enqueues a ReleaseTensor VM
// instruction) — unless the interpreter is shutting down, in which case the
// VM may already be destroyed and enqueuing would be unsafe; the OS reclaims
// the memory at process exit anyway.
TensorStorage::~TensorStorage() {
  if (!IsShuttingDown() && releaser_hook_) { (*releaser_hook_)(buffer_); }
}

} // namespace one
Expand Down

0 comments on commit 5170ffa

Please sign in to comment.