Support GPU on Transpiler Async Mode #25034

Closed · wants to merge 1 commit
78 changes: 61 additions & 17 deletions paddle/fluid/operators/distributed/parameter_prefetch.cc
@@ -88,7 +88,8 @@ void prefetch_core(
const framework::ExecutionContext& context, const framework::Scope& scope,
std::unordered_map<int64_t, std::vector<float>>* recved_vec_map) {
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& actual_ctx = *pool.Get(context.GetPlace());
// auto& actual_ctx = *pool.Get(context.GetPlace());
auto& actual_ctx = *pool.Get(platform::CPUPlace());

std::unique_ptr<framework::Scope> local_scope = scope.NewTmpScope();

@@ -104,8 +105,28 @@ void prefetch_core(
local_scope.get());

// create output var in local scope
for (auto& name : out_var_names) {
local_scope->Var(name)->GetMutable<framework::LoDTensor>();
// for (auto& name : out_var_names) {
// out_var_tensor =
// local_scope->Var(name)->GetMutable<framework::LoDTensor>();
// }

// create the output vars in the local scope, with their data on CPUPlace
auto embedding_name = context.InputNames("W").front();
framework::Variable* embedding_var = scope.FindVar(embedding_name);
auto embedding_var_dim_1 =
embedding_var->Get<framework::LoDTensor>().dims()[1];

for (size_t i = 0; i < out_var_names.size(); ++i) {
auto* out_var_tensor =
local_scope->Var(out_var_names[i])->GetMutable<framework::LoDTensor>();
// out_var_tensor->mutable_data<float>(platform::CPUPlace());
auto& ids = splited_ids[i];
if (!ids.empty()) {
out_var_tensor->mutable_data<float>(
framework::make_ddim({static_cast<int64_t>(ids.size()),
static_cast<int64_t>(embedding_var_dim_1)}),
platform::CPUPlace());
}
}

distributed::RPCClient* rpc_client =
@@ -199,9 +220,9 @@ void prefetchs(const std::vector<std::string>& id_var_names,
const auto place =
scope.FindVar(id_var_names[0])->Get<framework::LoDTensor>().place();

if (!platform::is_cpu_place(place)) {
PADDLE_THROW("multi prefetch only support CPU currently");
}
// if (!platform::is_cpu_place(place)) {
// PADDLE_THROW("multi prefetch only support CPU currently");
// }

std::vector<std::vector<int64_t>> ids_group;
std::vector<int64_t> ids_union;
@@ -210,13 +231,19 @@

for (auto& id_name : id_var_names) {
auto& id_tensor = scope.FindVar(id_name)->Get<framework::LoDTensor>();
auto* id_data = id_tensor.data<int64_t>();
std::vector<int64_t> ids;

for (int64_t i = 0; i < id_tensor.numel(); ++i) {
ids.push_back(id_data[i]);
ids_union.push_back(id_data[i]);
}
TensorToVector(id_tensor, context.device_context(), &ids);
if (!ids.empty()) {
VLOG(1) << "Parameter Prefetch: size(): " << ids.size() << " ids[0] "
<< ids[0];
}
ids_union.insert(ids_union.end(), ids.begin(), ids.end());
// auto* id_data = id_tensor.data<int64_t>();
// std::vector<int64_t> ids;

// for (int64_t i = 0; i < id_tensor.numel(); ++i) {
// ids.push_back(id_data[i]);
// ids_union.push_back(id_data[i]);
// }
ids_group.push_back(ids);
ids_lods.push_back(id_tensor.lod());
}
@@ -251,12 +278,29 @@ void prefetchs(const std::vector<std::string>& id_var_names,

for (size_t idx = 0; idx < ids.size(); idx++) {
const auto& id = ids[idx];

if (padding_idx != distributed::kNoPadding && id == padding_idx) {
memset(out_d + idx * vec_dim_1, 0, sizeof(float) * vec_dim_1);
if (platform::is_cpu_place(out_t->place())) {
if (padding_idx != distributed::kNoPadding && id == padding_idx) {
memset(out_d + idx * vec_dim_1, 0, sizeof(float) * vec_dim_1);
} else {
std::copy_n(recved_vec_map[id].begin(), vec_dim_1,
out_d + idx * vec_dim_1);
}
} else {
std::copy_n(recved_vec_map[id].begin(), vec_dim_1,
out_d + idx * vec_dim_1);
auto stream = context.cuda_device_context().stream();
if (padding_idx != distributed::kNoPadding && id == padding_idx) {
platform::GpuMemsetAsync(out_d + idx * vec_dim_1, 0,
sizeof(float) * vec_dim_1, stream);

} else {
auto& cpu_place =
BOOST_GET_CONST(platform::CPUPlace,
paddle::platform::CPUDeviceContext().GetPlace());
auto& gpu_place =
BOOST_GET_CONST(platform::CUDAPlace, out_t->place());
memory::Copy(gpu_place, out_d + idx * vec_dim_1, cpu_place,
&recved_vec_map[id][0], sizeof(float) * vec_dim_1,
stream);
}
}
}
}
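The copy-back loop above dispatches on the output tensor's place: the rows fetched over RPC always land in host memory (`recved_vec_map`), so a CPU output takes the old `memset`/`std::copy_n` path, while a GPU output zeroes padding rows with `GpuMemsetAsync` and stages every other row host-to-device on the CUDA stream. A minimal plain-CUDA sketch of that staging pattern (illustrative names, not the PR's Paddle helpers):

```cpp
#include <cuda_runtime.h>

#include <cstdint>
#include <vector>

// Stages host-side embedding rows into a device output buffer row by row,
// zeroing rows whose id equals padding_idx, mirroring the branch structure
// in prefetchs() above. rows[id] stands in for recved_vec_map[id].
void StageRowsToDevice(const std::vector<std::vector<float>>& rows,
                       const std::vector<int64_t>& ids, int64_t padding_idx,
                       int64_t dim, float* dev_out, cudaStream_t stream) {
  for (size_t idx = 0; idx < ids.size(); ++idx) {
    float* dst = dev_out + idx * dim;
    if (ids[idx] == padding_idx) {
      // Padding row: zero-fill asynchronously on the same stream.
      cudaMemsetAsync(dst, 0, sizeof(float) * dim, stream);
    } else {
      // Pageable host -> device copy; the source row must stay alive until
      // the stream drains, which is why we synchronize before returning.
      cudaMemcpyAsync(dst, rows[ids[idx]].data(), sizeof(float) * dim,
                      cudaMemcpyHostToDevice, stream);
    }
  }
  cudaStreamSynchronize(stream);
}
```

Each per-row `cudaMemcpyAsync` from pageable memory is effectively synchronous, so batching the rows into one pinned staging buffer would cut launch overhead; the sketch keeps the PR's row-at-a-time shape.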
20 changes: 17 additions & 3 deletions paddle/fluid/operators/distributed/parameter_recv.cc
@@ -95,9 +95,23 @@ void ParameterRecv<T>::operator()(const RpcContext &rpc_ctx,
recv_numel += in.numel();
auto in_stride = framework::stride_numel(in.dims());
auto out_stride = framework::stride_numel(recv_tensor->dims());
StridedNumelCopyWithAxis<T>(
dev_ctx, 0, recv_tensor->data<T>() + output_offset, out_stride,
in.data<T>(), in_stride, in_stride[0]);
if (platform::is_cpu_place(recv_tensor->place())) {
VLOG(1) << "StridedNumelCopyWithAxis CPU Begin";
auto cpu_ctx = paddle::platform::CPUDeviceContext();
StridedNumelCopyWithAxis<T>(
cpu_ctx, 0, recv_tensor->data<T>() + output_offset, out_stride,
in.data<T>(), in_stride, in_stride[0]);
} else {
VLOG(1) << "StridedNumelCopyWithAxis CPU<->GPU Begin";
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto *gpu_ctx = reinterpret_cast<platform::CUDADeviceContext *>(
platform::DeviceContextPool::Instance().Get(
recv_tensor->place()));

StridedNumelCopyWithAxis<T>(
gpu_ctx, cpu_ctx, 0, recv_tensor->data<T>() + output_offset,
out_stride, in.data<T>(), in_stride, in_stride[0]);
}
output_offset += in_stride[0];
} else if (recv_var->IsType<framework::SelectedRows>()) {
auto &recv_slr = recv_var->Get<framework::SelectedRows>();
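The dense branch of `ParameterRecv` now chooses the copy route from the destination place: received chunks always sit in host memory, so a CPU parameter uses the plain `CPUDeviceContext` copy, while a GPU parameter goes through the new `(dst_ctx, src_ctx)` overload of `StridedNumelCopyWithAxis` (added in `strided_memcpy.h` below) on the parameter's own CUDA stream. A toy model of that dispatch, with hypothetical names:

```cpp
#include <cuda_runtime.h>

#include <cstring>

// RPC payloads arrive in host memory; the destination tensor's place decides
// between a plain host copy and an async H2D copy on the parameter's stream.
enum class Place { kCPU, kGPU };

void ScatterRecvChunk(Place dst_place, float* dst, const float* host_src,
                      size_t count, cudaStream_t stream) {
  if (dst_place == Place::kCPU) {
    std::memcpy(dst, host_src, count * sizeof(float));
  } else {
    cudaMemcpyAsync(dst, host_src, count * sizeof(float),
                    cudaMemcpyHostToDevice, stream);
  }
}
```

In the real code the stream comes from the `CUDADeviceContext` fetched out of `DeviceContextPool` for `recv_tensor->place()`.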
2 changes: 1 addition & 1 deletion paddle/fluid/operators/distributed/sendrecvop_utils.cc
@@ -47,7 +47,7 @@ static TensorPayload GetCommunicationAllocationFromTensor(
memory::Copy(cuda_pinned, result->ptr(),
BOOST_GET_CONST(platform::CUDAPlace, tensor.place()),
tensor.data<void>(), copy_size, gpu_dev_ctx.stream());
ctx.Wait();
// ctx.Wait();
return TensorPayload(result);
#else
PADDLE_THROW("This situation should not be happened");
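Dropping `ctx.Wait()` makes the device-to-host copy into the cuda-pinned send buffer fully asynchronous, which is the point of async mode, but it also means nothing may serialize the payload until the stream has drained. A hedged sketch of one safe pattern under that constraint (hypothetical helpers, not this PR's code): record a CUDA event after the copy and synchronize on it only at serialization time.

```cpp
#include <cuda_runtime.h>

#include <cstddef>

// A pinned buffer plus the event that marks when its contents are valid.
struct PinnedPayload {
  void* host_ptr;     // cuda-pinned buffer
  cudaEvent_t ready;  // signaled once the D2H copy has landed
};

PinnedPayload MakePayload(size_t bytes) {
  PinnedPayload p{};
  cudaMallocHost(&p.host_ptr, bytes);  // pinned so the async copy is real
  cudaEventCreate(&p.ready);
  return p;
}

void BeginDeviceToHost(PinnedPayload* p, const void* dev_src, size_t bytes,
                       cudaStream_t stream) {
  cudaMemcpyAsync(p->host_ptr, dev_src, bytes, cudaMemcpyDeviceToHost, stream);
  cudaEventRecord(p->ready, stream);
}

void WaitUntilSerializable(PinnedPayload* p) {
  // Blocks only the serializing thread, not the whole device context.
  cudaEventSynchronize(p->ready);
}
```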
paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc
@@ -17,6 +17,7 @@ limitations under the License. */
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/distributed/parameter_prefetch.h"
#include "paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.h"
#include "paddle/fluid/operators/math/math_function.h"

namespace paddle {
@@ -78,28 +79,6 @@ class DistributedLookupTableOp : public framework::OperatorWithKernel {
}
};

template <typename T>
class DistributedLookupTableKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext &context) const override {
auto ids_vars = context.MultiInputVar("Ids");
auto emb_vars = context.MultiOutput<framework::Tensor>("Embeddings");

auto id_names = context.InputNames("Ids");
auto embedding_name = context.InputNames("W").front();
auto out_names = context.OutputNames("Outputs");

auto lookup_tables = context.Attr<std::vector<std::string>>("table_names");
auto height_sections =
context.Attr<std::vector<int64_t>>("height_sections");
auto endpoints = context.Attr<std::vector<std::string>>("endpoints");

operators::distributed::prefetchs(
id_names, out_names, embedding_name, false, lookup_tables, endpoints,
height_sections, context, context.scope());
}
};

class DistributedLookupTableOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() override {
@@ -168,4 +147,5 @@ REGISTER_OPERATOR(distributed_lookup_table, ops::DistributedLookupTableOp,
ops::DistributedLookupTableOpMaker);

REGISTER_OP_CPU_KERNEL(distributed_lookup_table,
ops::DistributedLookupTableKernel<float>);
ops::DistributedLookupTableKernel<
paddle::platform::CPUDeviceContext, float>);
New file: paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cu.cc
@@ -0,0 +1,22 @@
/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.h"

namespace ops = paddle::operators;
namespace plat = paddle::platform;

REGISTER_OP_CUDA_KERNEL(
distributed_lookup_table,
ops::DistributedLookupTableKernel<plat::CUDADeviceContext, float>);
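With the kernel body moved into the shared header, the `.cc` and `.cu.cc` files reduce to one registration each, and the framework picks the instantiation from the place of the op's tensors at run time. A self-contained toy model of that mechanism (not Paddle's actual registry macros):

```cpp
#include <functional>
#include <iostream>
#include <map>
#include <string>

// One kernel template, one registry entry per device: a toy stand-in for
// what REGISTER_OP_CPU_KERNEL / REGISTER_OP_CUDA_KERNEL arrange.
struct CPUContext {};
struct CUDAContext {};

template <typename DeviceContext, typename T>
struct LookupTableKernel {
  void operator()() const {
    // The real Compute() would call distributed::prefetchs(); we just trace.
    std::cout << "running distributed_lookup_table kernel\n";
  }
};

std::map<std::string, std::function<void()>>& KernelRegistry() {
  static std::map<std::string, std::function<void()>> registry;
  return registry;
}

int main() {
  KernelRegistry()["distributed_lookup_table/CPU"] =
      LookupTableKernel<CPUContext, float>{};
  KernelRegistry()["distributed_lookup_table/CUDA"] =
      LookupTableKernel<CUDAContext, float>{};
  // The framework keys this lookup off the input tensors' place.
  KernelRegistry()["distributed_lookup_table/CUDA"]();
  return 0;
}
```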
New file: paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.h
@@ -0,0 +1,49 @@
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once
#include <algorithm>
#include <string>
#include <vector>
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/distributed/parameter_prefetch.h"
#include "paddle/fluid/operators/math/math_function.h"

namespace paddle {
namespace operators {

template <typename DeviceContext, typename T>
class DistributedLookupTableKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext &context) const override {
auto ids_vars = context.MultiInputVar("Ids");
auto emb_vars = context.MultiOutput<framework::Tensor>("Embeddings");

auto id_names = context.InputNames("Ids");
auto embedding_name = context.InputNames("W").front();
auto out_names = context.OutputNames("Outputs");

auto lookup_tables = context.Attr<std::vector<std::string>>("table_names");
auto height_sections =
context.Attr<std::vector<int64_t>>("height_sections");
auto endpoints = context.Attr<std::vector<std::string>>("endpoints");

operators::distributed::prefetchs(
id_names, out_names, embedding_name, false, lookup_tables, endpoints,
height_sections, context, context.scope());
}
};
} // namespace operators
} // namespace paddle
47 changes: 47 additions & 0 deletions paddle/fluid/operators/strided_memcpy.h
@@ -99,6 +99,53 @@ inline void StridedNumelCopyWithAxis(const platform::DeviceContext& ctx,
}
}

template <typename T>
inline void StridedNumelCopyWithAxis(platform::CUDADeviceContext* dst_ctx,
const platform::DeviceContext& src_ctx,
int64_t axis, T* dst,
const framework::DDim& dst_stride_numel,
const T* src,
const framework::DDim& src_stride_numel,
int64_t size) {
#ifdef PADDLE_WITH_CUDA
int64_t before = dst_stride_numel[0] / dst_stride_numel[axis];
int64_t src_after = src_stride_numel[axis];
int64_t dst_after = dst_stride_numel[axis];

auto dst_place = dst_ctx->GetPlace();
auto src_place = src_ctx.GetPlace();
VLOG(1) << "StridedNumelCopyWithAxis CPU<->CUDA Copy before: " << before
<< " dst_after: " << dst_after << " src_after: " << src_after;
PADDLE_ENFORCE_EQ(src_stride_numel.size(), dst_stride_numel.size(),
"src and dst tensor should have the same dims size.");

for (int64_t i = 0; i < axis; ++i) {
if (i < axis) {
PADDLE_ENFORCE_EQ(src_stride_numel[i] / src_stride_numel[axis],
dst_stride_numel[i] / dst_stride_numel[axis],
"src and dst should have the same elements "
"except the specified axis.");
} else if (i == axis) {
continue;
} else {
PADDLE_ENFORCE_EQ(src_stride_numel[i], dst_stride_numel[i],
"src and dst should have the same elements "
"except the specified axis.");
}
}

for (int64_t i = 0; i < before; ++i) {
VLOG(1) << "StridedNumelCopyWithAxis CPU<->GPU Copy";
auto& cpu_place = BOOST_GET_CONST(platform::CPUPlace, src_place);
auto& gpu_place = BOOST_GET_CONST(platform::CUDAPlace, dst_place);
memory::Copy(gpu_place, dst + i * dst_after, cpu_place, src + i * src_after,
sizeof(T) * size, dst_ctx->stream());
}
#else
PADDLE_THROW("Paddle is not compiled with GPU");
#endif
}

template <typename T>
inline void StridedMemcpyWithAxis0(
const platform::DeviceContext& dev_ctx, const framework::Tensor& input,
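The new overload keeps the stride-numel bookkeeping of the CPU version and only swaps the inner copy for a `memory::Copy` onto the destination's CUDA stream. A self-contained CPU illustration of what `before`, `src_after`, and `dst_after` mean when concatenating along an axis, under the assumed semantics of the loop above:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// stride_numel[i] = number of elements spanned by one step along axis i
// together with all trailing axes; for a 4x3 tensor it is {12, 3}.
// Concatenating src (4x3) into dst (4x8) along axis 1 at column offset 2:
// before = 32 / 8 = 4 row blocks, src_after = 3, dst_after = 8.
int main() {
  const int64_t before = 4, src_after = 3, dst_after = 8;
  std::vector<float> src(4 * 3, 1.0f), dst(4 * 8, 0.0f);
  const int64_t offset = 2;  // output_offset in parameter_recv.cc terms

  for (int64_t i = 0; i < before; ++i) {
    // The GPU overload issues memory::Copy(gpu, ..., cpu, ..., stream) here.
    std::copy_n(src.data() + i * src_after, src_after,
                dst.data() + offset + i * dst_after);
  }

  assert(dst[2] == 1.0f && dst[0] == 0.0f);  // row 0: [0,0,1,1,1,0,0,0]
  return 0;
}
```

In `parameter_recv.cc` the call site uses `axis = 0`, so `before` is 1 and the whole received chunk moves in a single copy at `output_offset`.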