diff --git a/byteps/common/cpu_reducer.cc b/byteps/common/cpu_reducer.cc index cc28068ff..af65e4312 100644 --- a/byteps/common/cpu_reducer.cc +++ b/byteps/common/cpu_reducer.cc @@ -208,5 +208,19 @@ int CpuReducer::_sum_float16(void* dst, void* src1, void* src2, size_t len) { return 0; } +int CpuReducer::copy(void* dst, void* src, size_t len) { + auto in = (float*)src; + auto out = (float*)dst; +#pragma omp parallel for simd num_threads(_num_threads) + for (size_t i = 0; i < len / 4; ++i) { + out[i] = in[i]; + } + if (len % 4) { + memcpy(out + len / 4, in + len / 4, len % 4); + } + return 0; +} + + } // namespace common } // namespace byteps diff --git a/byteps/common/cpu_reducer.h b/byteps/common/cpu_reducer.h index 1f8fe2802..dceac70de 100644 --- a/byteps/common/cpu_reducer.h +++ b/byteps/common/cpu_reducer.h @@ -46,6 +46,7 @@ class CpuReducer { int sum(void* dst, void* src, size_t len, DataType dtype); int sum(void* dst, void* src1, void* src2, size_t len, DataType dtype); + int copy(void* dst, void* src, size_t len); #ifndef BYTEPS_BUILDING_SERVER bool isRoot(); diff --git a/byteps/server/server.cc b/byteps/server/server.cc index 99e3ba1b2..59e0e9d40 100644 --- a/byteps/server/server.cc +++ b/byteps/server/server.cc @@ -82,7 +82,7 @@ void BytePSServerEngineThread(int i) { << "dst_addr: " << DEBUG_PRINT_TENSOR_ADDRESS(msg.dst) << "\t" << "src_addr: " << DEBUG_PRINT_TENSOR_ADDRESS(msg.src) << "\t"; } - memcpy(msg.dst, msg.src, msg.len); + bps_reducer_->copy(msg.dst, msg.src, msg.len); if (is_debug) { std::lock_guard lock(debug_mu_); LOG(INFO) << "stage: ENGINE_COPY_RECV_AFTER \t" @@ -105,7 +105,7 @@ void BytePSServerEngineThread(int i) { << "dst_addr: " << DEBUG_PRINT_TENSOR_ADDRESS(msg.dst) << "\t" << "src_addr: " << DEBUG_PRINT_TENSOR_ADDRESS(msg.src) << "\t"; } - memcpy(msg.dst, msg.src, msg.len); + bps_reducer_->copy(msg.dst, msg.src, msg.len); if (is_debug) { std::lock_guard lock(debug_mu_); LOG(INFO) << "stage: ENGINE_COPY_MERGED_TO_STORE_AFTER \t" @@ -207,7 +207,7 @@ void BytePSHandler(const ps::KVMeta& req_meta, stored.tensor = (char*) malloc(len); stored.len = len; stored.dtype = type.dtype; - memcpy(stored.tensor, recved, len); // we may not need this copy + bps_reducer_->copy(stored.tensor, recved, len); // we may not need this copy for (const auto& req : updates.request) { SendPushResponse(key, req, server); } @@ -223,7 +223,7 @@ void BytePSHandler(const ps::KVMeta& req_meta, if (updates.request.empty()) { // from the first incoming worker if (sync_mode_) { if (is_engine_blocking_) { - memcpy(updates.merged.tensor, recved, len); + bps_reducer_->copy(updates.merged.tensor, recved, len); } else { // non-blocking if (debug_mode_ && (debug_key_ == key)) { std::lock_guard lock(debug_mu_); @@ -291,7 +291,7 @@ void BytePSHandler(const ps::KVMeta& req_meta, auto& stored = store_[key]; auto& update = updates.merged; if (is_engine_blocking_) { - memcpy(stored.tensor, updates.merged.tensor, len); + bps_reducer_->copy(stored.tensor, updates.merged.tensor, len); } else { if (debug_mode_ && (debug_key_ == key)) { std::lock_guard lock(debug_mu_);