[HeterPs] merge dense && data norm && g2sum (#35029)
* merge dense

* log level

* tensor copy sync

* format
Thunderbrook authored Sep 1, 2021
1 parent 264ff9e commit a647b80
Showing 4 changed files with 34 additions and 14 deletions.
4 changes: 2 additions & 2 deletions paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h
@@ -44,7 +44,7 @@ class Optimizer {
     if (w < optimizer_config::min_bound) w = optimizer_config::min_bound;
     if (w > optimizer_config::max_bound) w = optimizer_config::max_bound;

-    add_g2sum = scaled_grad * scaled_grad;
+    add_g2sum += scaled_grad * scaled_grad;

     g2sum += add_g2sum;
   }
@@ -64,7 +64,7 @@ class Optimizer {
         w[i] = optimizer_config::mf_min_bound;
       if (w[i] > optimizer_config::mf_max_bound)
         w[i] = optimizer_config::mf_max_bound;
-      add_g2sum = scaled_grad * scaled_grad;
+      add_g2sum += scaled_grad * scaled_grad;
     }

     g2sum += add_g2sum / n;
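The one-character change from `=` to `+=` is the g2sum fix named in the commit title. It matters wherever `add_g2sum` folds over more than one gradient component, most visibly in the second hunk, where it is meant to accumulate the squared scaled gradient across all `n` dimensions of the embedding before `g2sum += add_g2sum / n` averages it into the Adagrad-style accumulator; with plain assignment, only the last dimension's contribution survived. A minimal self-contained sketch of the intended update (the function name, bounds handling, and learning-rate form are illustrative stand-ins, not Paddle's exact optimizer):

#include <cmath>
#include <cstdio>

// Illustrative Adagrad-style update over an n-dimensional embedding slice.
// g2sum accumulates the per-step mean of squared (scaled) gradients.
void update_mf(int n, float* w, float& g2sum, const float* grad,
               float scale, float learning_rate, float initial_g2sum) {
  float add_g2sum = 0.0f;
  for (int i = 0; i < n; ++i) {
    float scaled_grad = grad[i] / scale;
    float ratio = learning_rate *
                  std::sqrt(initial_g2sum / (initial_g2sum + g2sum));
    w[i] -= scaled_grad * ratio;
    add_g2sum += scaled_grad * scaled_grad;  // the fix: accumulate, not assign
  }
  g2sum += add_g2sum / n;  // average over the n dimensions, as in the hunk
}

int main() {
  float w[4] = {0.1f, 0.2f, 0.3f, 0.4f};
  float grad[4] = {0.5f, -0.5f, 0.25f, -0.25f};
  float g2sum = 0.0f;
  update_mf(4, w, g2sum, grad, /*scale=*/1.0f,
            /*learning_rate=*/0.05f, /*initial_g2sum=*/3.0f);
  std::printf("g2sum after one step: %f\n", g2sum);  // mean of squared grads
  return 0;
}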
36 changes: 26 additions & 10 deletions paddle/fluid/framework/ps_gpu_trainer.cc
@@ -57,8 +57,6 @@ void PSGPUTrainer::Initialize(const TrainerDesc& trainer_desc,
         trainer_desc.downpour_param().stat_var_names(i));
   }
   VLOG(3) << "going to initialize pull dense worker";
-  pull_dense_worker_ = PullDenseWorker::GetInstance();
-  pull_dense_worker_->Initialize(trainer_desc);
   SetDebug(trainer_desc.debug());
   trainer_desc_ = trainer_desc;
   workers_.resize(place_num);
@@ -112,15 +110,21 @@ void PSGPUTrainer::InitTrainerEnv(const ProgramDesc& main_program,
       }
     }
   }
+  for (auto& var : main_program.Block(0).AllVars()) {
+    if (var->Persistable()) {
+      auto it = std::find(need_merge_var_names_.begin(),
+                          need_merge_var_names_.end(), var->Name());
+      if (it == need_merge_var_names_.end()) {
+        VLOG(2) << "train param: " << var->Name();
+        trainable_param_.push_back(var->Name());
+      }
+    }
+  }
   place_ = place;
   return;
 }

 void PSGPUTrainer::InitOtherEnv(const ProgramDesc& main_program) {
-  pull_dense_worker_->SetRootScope(root_scope_);
-  for (size_t i = 0; i < places_.size(); ++i) {
-    pull_dense_worker_->AddThreadScope(workers_[i]->GetThreadScope());
-  }
   VLOG(3) << "init other env done.";
 }

@@ -141,15 +145,27 @@ Scope* PSGPUTrainer::GetWorkerScope(int thread_id) { return nullptr; }
 template <typename T>
 void PSGPUTrainer::MergeToRootScope(LoDTensor* root_tensor, LoDTensor* tensor) {
   LoDTensor tmp_root;
-  TensorCopy(*root_tensor, platform::CPUPlace(), &tmp_root);
+  TensorCopySync(*root_tensor, platform::CPUPlace(), &tmp_root);
   T* tmp_root_data = tmp_root.data<T>();
   LoDTensor tmp_tensor;
-  TensorCopy(*tensor, platform::CPUPlace(), &tmp_tensor);
+  TensorCopySync(*tensor, platform::CPUPlace(), &tmp_tensor);
   T* data = tmp_tensor.data<T>();
   for (int i = 0; i < tmp_tensor.numel(); i++) {
     tmp_root_data[i] += data[i];
   }
-  TensorCopy(tmp_root, platform::CPUPlace(), root_tensor);
+  TensorCopySync(tmp_root, platform::CPUPlace(), root_tensor);
 }

+void PSGPUTrainer::MergeDenseParam() {
+  auto thread_scope = workers_[0]->GetThreadScope();
+  for (auto& name : trainable_param_) {
+    VLOG(2) << "merge var " << name << " to root scope";
+    Variable* root_var = root_scope_->FindVar(name);
+    LoDTensor* root_tensor = root_var->GetMutable<LoDTensor>();
+    Variable* var = thread_scope->FindVar(name);
+    LoDTensor* tensor = var->GetMutable<LoDTensor>();
+    TensorCopySync((*tensor), root_tensor->place(), root_tensor);
+  }
+}
+
 void PSGPUTrainer::Finalize() {
@@ -187,7 +203,7 @@ void PSGPUTrainer::Finalize() {
       _ForEachDataType_(MergeCallback);
     }
   }
-  pull_dense_worker_->MergeDenseParam();
+  MergeDenseParam();
   root_scope_->DropKids();
 }
 }  // namespace framework
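Two behavioral changes land in this file. First, the `TensorCopy` → `TensorCopySync` swap (the "tensor copy sync" note in the commit message): `TensorCopy` can enqueue the transfer asynchronously on the device stream, so dereferencing `tmp_root.data<T>()` or `data` right after the call may read memory the copy has not reached yet, while the synchronous variant returns only once the transfer completes, making the element-wise summation safe. Second, `PSGPUTrainer` now merges dense parameters itself instead of delegating to the pull-dense worker, copying every name in `trainable_param_` from worker 0's thread scope back into the root scope. A plain-C++ sketch of that copy-back, with `std::map` standing in for Paddle scopes (all names here are illustrative, not Paddle's API):

#include <map>
#include <string>
#include <vector>

// Illustrative stand-in for a Paddle scope: variable name -> dense buffer.
using Scope = std::map<std::string, std::vector<float>>;

// Sketch of the new merge: each trainable parameter is copied verbatim
// from worker 0's thread scope into the root scope. TensorCopySync in the
// real code blocks until the transfer finishes before returning.
void MergeDenseParam(const Scope& worker0_scope, Scope* root_scope,
                     const std::vector<std::string>& trainable_param) {
  for (const auto& name : trainable_param) {
    root_scope->at(name) = worker0_scope.at(name);
  }
}

int main() {
  Scope root{{"fc_0.w_0", {0.0f, 0.0f}}};
  Scope worker0{{"fc_0.w_0", {0.5f, -0.5f}}};
  MergeDenseParam(worker0, &root, {"fc_0.w_0"});  // root now holds worker 0's values
  return 0;
}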
2 changes: 2 additions & 0 deletions paddle/fluid/framework/trainer.h
@@ -265,6 +265,7 @@ class PSGPUTrainer : public TrainerBase {
   }
   virtual std::string GetDumpPath(int tid) { return ""; }
   virtual void InitDumpEnv() {}
+  virtual void MergeDenseParam();

   template <typename T>
   void MergeToRootScope(LoDTensor* root_tensor, LoDTensor* thread_tensor);
@@ -274,6 +275,7 @@ class PSGPUTrainer : public TrainerBase {
   DownpourWorkerParameter param_;
   std::map<uint64_t, std::vector<std::string>> dense_grad_names_;
   std::vector<std::string> need_merge_var_names_;
+  std::vector<std::string> trainable_param_;
   float scale_datanorm_;
   paddle::platform::Place place_;
   ProgramDesc program_;
@@ -412,11 +412,13 @@ def _minimize(self,
         sparse_table_index = 0
         for num in range(len(losses)):
             loss = losses[num]
+            parameters = None
+            if parameter_list != None:
+                parameters = parameter_list[num]
             prog_id = str(id(loss.block.program))
             # param_grads of program
             params_grads = sorted(
-                fluid.backward.append_backward(loss, parameter_list,
-                                               no_grad_set),
+                fluid.backward.append_backward(loss, parameters, no_grad_set),
                 key=lambda x: x[0].name)

             flag_use_ps_gpu = strategy.get("use_ps_gpu", False)
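The Python-side fix is about shape, not graph logic: `parameter_list` is treated here as parallel to `losses` (one parameter group per loss), so passing the whole container to `fluid.backward.append_backward` handed every loss every group; indexing with `parameter_list[num]`, falling back to `None` when no list was supplied, gives each loss only its own parameters. A small C++ sketch of the same selection rule (names are illustrative, not Paddle API):

#include <cassert>
#include <string>
#include <vector>

// Each loss i trains only its own parameter group. Passing the whole
// nested list to the backward pass (the old behavior) would hand loss i
// every group instead of group i.
std::vector<std::string> ParamsForLoss(
    const std::vector<std::vector<std::string>>* parameter_list, size_t num) {
  if (parameter_list == nullptr) return {};  // mirrors `parameters = None`
  assert(num < parameter_list->size());
  return (*parameter_list)[num];  // mirrors `parameter_list[num]`
}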
