Skip to content

Commit

Permalink
server: remove recv->merged copy
Browse files Browse the repository at this point in the history
  • Loading branch information
ymjiang committed Nov 18, 2019
1 parent 3404d61 commit a51a578
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 31 deletions.
41 changes: 11 additions & 30 deletions byteps/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ void SendPullResponse(const DataHandleType type,
const uint64_t key,
const ps::KVMeta& req_meta,
ps::KVServer<char>* server) {
std::lock_guard<std::mutex> lock(pullresp_mu_);
auto& stored = store_[key];
CHECK(stored.tensor) << "init " << key << " first";
// as server returns when store_ is ready in this case
Expand Down Expand Up @@ -73,26 +74,6 @@ void BytePSServerEngineThread(int i) {

bool is_debug = (debug_mode_ && (debug_key_ == msg.key));
switch (msg.ops) {
case COPY_RECV: {
if (is_debug) {
std::lock_guard<std::mutex> lock(debug_mu_);
LOG(INFO) << "stage: ENGINE_COPY_RECV_BEFORE \t"
<< "dst: " << DEBUG_PRINT_TENSOR_VALUE(msg.dst) << "\t"
<< "src: " << DEBUG_PRINT_TENSOR_VALUE(msg.src) << "\t"
<< "dst_addr: " << DEBUG_PRINT_TENSOR_ADDRESS(msg.dst) << "\t"
<< "src_addr: " << DEBUG_PRINT_TENSOR_ADDRESS(msg.src) << "\t";
}
bps_reducer_->copy(msg.dst, msg.src, msg.len);
if (is_debug) {
std::lock_guard<std::mutex> lock(debug_mu_);
LOG(INFO) << "stage: ENGINE_COPY_RECV_AFTER \t"
<< "dst: " << DEBUG_PRINT_TENSOR_VALUE(msg.dst) << "\t"
<< "src: " << DEBUG_PRINT_TENSOR_VALUE(msg.src) << "\t"
<< "dst_addr: " << DEBUG_PRINT_TENSOR_ADDRESS(msg.dst) << "\t"
<< "src_addr: " << DEBUG_PRINT_TENSOR_ADDRESS(msg.src) << "\t";
}
break;
}
case COPY_MERGED: {
if (is_debug) {
std::lock_guard<std::mutex> lock(debug_mu_);
Expand Down Expand Up @@ -187,6 +168,10 @@ void BytePSHandler(const ps::KVMeta& req_meta,
auto len = (size_t) req_data.lens[0];
auto recved = reinterpret_cast<char*>(req_data.vals.data());
if (!stored.tensor) {
if (sync_mode_ && (update_buf_.find(key) == update_buf_.end())) {
update_buf_[key].merged.len = len;
update_buf_[key].merged.dtype = type.dtype;
}
// buffer the request meta
auto &updates = update_buf_[key];
updates.request.push_back(req_meta);
Expand All @@ -201,18 +186,14 @@ void BytePSHandler(const ps::KVMeta& req_meta,
stored.tensor = (char*) malloc(len);
stored.len = len;
stored.dtype = type.dtype;
CHECK(stored.tensor);
bps_reducer_->copy(stored.tensor, recved, len); // we may not need this copy
for (const auto& req : updates.request) {
SendPushResponse(key, req, server);
}
updates.request.clear();
} else {
auto &updates = update_buf_[key];
if (sync_mode_ && !updates.merged.tensor) {
updates.merged.tensor = (char*) malloc(len);
updates.merged.len = len;
updates.merged.dtype = type.dtype;
}
auto tid = GetThreadID(key, len);
if (updates.request.empty()) { // from the first incoming worker
if (sync_mode_) {
Expand All @@ -221,16 +202,15 @@ void BytePSHandler(const ps::KVMeta& req_meta,
} else { // non-blocking
if (debug_mode_ && (debug_key_ == key)) {
std::lock_guard<std::mutex> lock(debug_mu_);
LOG(INFO) << "stage: FIRST_WORKER_COPY \t"
LOG(INFO) << "stage: FIRST_WORKER_RECV \t"
<< "stored: " << DEBUG_PRINT_TENSOR_VALUE(stored.tensor) << "\t"
<< "merged: " << DEBUG_PRINT_TENSOR_VALUE(updates.merged.tensor) << "\t"
<< "recved: " << DEBUG_PRINT_TENSOR_VALUE(recved) << "\t"
<< "len: " << len << "\t"
<< "addr: " << DEBUG_PRINT_TENSOR_ADDRESS(recved);
}

BytePSEngineMessage msg = {type, key, updates.merged.tensor, recved, len, COPY_RECV, &req_data.vals};
engine_queues_[tid]->Push(msg);
// zero copy
updates.merged.tensor = recved;
updates.merged.tmp_sarray = &req_data.vals;
}
} else { // async mode, directly add to the buffer
if (is_engine_blocking_) {
Expand All @@ -245,6 +225,7 @@ void BytePSHandler(const ps::KVMeta& req_meta,
}
} else { // from other workers
CHECK(sync_mode_);
CHECK(updates.merged.tensor);
if (is_engine_blocking_) {
CHECK_GE(bps_reducer_->sum((void *) updates.merged.tensor,
(void *) recved,
Expand Down
4 changes: 3 additions & 1 deletion byteps/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ enum class RequestType {
};

enum BytePSEngineOperation {
COPY_RECV, SUM_RECV, COPY_MERGED, TERMINATE
SUM_RECV, COPY_MERGED, TERMINATE
};

struct PSKV {
Expand All @@ -55,6 +55,7 @@ struct BytePSArray {
char* tensor;
size_t len;
int dtype;
const SArray<char>* tmp_sarray;
};

struct UpdateBuf {
Expand Down Expand Up @@ -90,6 +91,7 @@ static DataHandleType DepairDataHandleType(int cmd) {
KVServer<SERVER_DATA_TYPE>* byteps_server_;
byteps::common::CpuReducer* bps_reducer_;
std::unordered_map<SERVER_KEY_TYPE, KVPairs<SERVER_DATA_TYPE> > mem_map_;
std::mutex pullresp_mu_;
std::unordered_map<uint64_t, ps::KVPairs<char> > push_response_map_;
std::unordered_map<uint64_t, ps::KVPairs<char> > pull_response_map_;

Expand Down

0 comments on commit a51a578

Please sign in to comment.