From 395596ee31649e0db439603701c5df61fb7ead1a Mon Sep 17 00:00:00 2001 From: Emanuele Danovaro Date: Thu, 2 May 2024 14:23:40 +0100 Subject: [PATCH] sync action + connection termination --- .github/workflows/sync.yml | 26 ++++++++++++++++++++++ src/fdb5/remote/client/Client.cc | 1 - src/fdb5/remote/client/Client.h | 2 -- src/fdb5/remote/client/ClientConnection.cc | 16 +++++++------ src/fdb5/remote/client/ClientConnection.h | 4 +++- 5 files changed, 38 insertions(+), 11 deletions(-) create mode 100644 .github/workflows/sync.yml diff --git a/.github/workflows/sync.yml b/.github/workflows/sync.yml new file mode 100644 index 000000000..50b8478ee --- /dev/null +++ b/.github/workflows/sync.yml @@ -0,0 +1,26 @@ +name: sync + +# Controls when the workflow will run +on: + + # Trigger the workflow on all pushes + push: + branches: + - '**' + tags: + - '**' + + # Trigger the workflow when a branch or tag is deleted + delete: ~ + +jobs: + + # Calls a reusable CI workflow to sync the current with a remote repository. + # It will correctly handle addition of any new and removal of existing Git objects. + sync: + name: sync + uses: ecmwf-actions/reusable-workflows/.github/workflows/sync.yml@v2 + secrets: + target_repository: mars/fdb5 + target_username: ClonedDuck + target_token: ${{ secrets.BITBUCKET_PAT }} diff --git a/src/fdb5/remote/client/Client.cc b/src/fdb5/remote/client/Client.cc index d7e975305..a89e26180 100644 --- a/src/fdb5/remote/client/Client.cc +++ b/src/fdb5/remote/client/Client.cc @@ -8,7 +8,6 @@ * does it submit to any jurisdiction. */ -#include #include "fdb5/LibFdb5.h" diff --git a/src/fdb5/remote/client/Client.h b/src/fdb5/remote/client/Client.h index 680d504a2..0937720e0 100644 --- a/src/fdb5/remote/client/Client.h +++ b/src/fdb5/remote/client/Client.h @@ -10,8 +10,6 @@ #pragma once -#include - #include "eckit/config/Configuration.h" #include "eckit/memory/NonCopyable.h" #include "eckit/net/Endpoint.h" diff --git a/src/fdb5/remote/client/ClientConnection.cc b/src/fdb5/remote/client/ClientConnection.cc index 20395f145..cc56955f0 100644 --- a/src/fdb5/remote/client/ClientConnection.cc +++ b/src/fdb5/remote/client/ClientConnection.cc @@ -58,7 +58,7 @@ class DataWriteRequest { ClientConnection::ClientConnection(const eckit::net::Endpoint& controlEndpoint, const std::string& defaultEndpoint): - controlEndpoint_(controlEndpoint), defaultEndpoint_(defaultEndpoint), id_(1), connected_(false), dataWriteQueue_(nullptr) { + controlEndpoint_(controlEndpoint), defaultEndpoint_(defaultEndpoint), id_(1), connected_(false), controlStopping_(false), dataStopping_(false), dataWriteQueue_(nullptr) { LOG_DEBUG_LIB(LibFdb5) << "ClientConnection::ClientConnection() controlEndpoint: " << controlEndpoint << std::endl; } @@ -85,8 +85,9 @@ bool ClientConnection::remove(uint32_t clientID) { } if (clients_.empty()) { - Connection::write(Message::Exit, true, 0, 0); - if (!single_) { + if (!controlStopping_) + Connection::write(Message::Exit, true, 0, 0); + if (!single_ && !dataStopping_) { // TODO make the data connection dying automatically, when there are no more async writes Connection::write(Message::Exit, false, 0, 0); } @@ -143,13 +144,12 @@ bool ClientConnection::connect(bool singleAttempt) { listeningControlThread_ = std::thread([this] { listeningControlThreadLoop(); }); connected_ = true; - return true; } catch(eckit::TooManyRetries& e) { if (controlClient_.isConnected()) { controlClient_.close(); } } - return false; + return connected_; } void ClientConnection::disconnect() { @@ -191,7 +191,7 @@ eckit::LocalConfiguration ClientConnection::availableFunctionality() const { // ----------------------------------------------------------------------------------------------------- -eckit::Buffer ClientConnection::controlWrite(Client& client, Message msg, uint32_t requestID, bool dataListener, std::vector> data) { +eckit::Buffer&& ClientConnection::controlWrite(Client& client, Message msg, uint32_t requestID, bool dataListener, std::vector> data) { auto it = clients_.find(client.clientId()); ASSERT(it != clients_.end()); @@ -200,7 +200,7 @@ eckit::Buffer ClientConnection::controlWrite(Client& client, Message msg, uint32 Connection::write(msg, true, client.clientId(), requestID, data); - return f.get(); + return std::move(f.get()); } void ClientConnection::dataWrite(DataWriteRequest& r) { @@ -362,6 +362,7 @@ void ClientConnection::listeningControlThreadLoop() { LOG_DEBUG_LIB(LibFdb5) << "ClientConnection::listeningControlThreadLoop - got [message=" << hdr.message << ",clientID=" << hdr.clientID() << ",control=" << hdr.control() << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" << std::endl; if (hdr.message == Message::Exit) { + controlStopping_ = true; return; } else { if (hdr.clientID()) { @@ -432,6 +433,7 @@ void ClientConnection::listeningDataThreadLoop() { LOG_DEBUG_LIB(LibFdb5) << "ClientConnection::listeningDataThreadLoop - got [message=" << hdr.message << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" << std::endl; if (hdr.message == Message::Exit) { + dataStopping_ = true; return; } else { if (hdr.clientID()) { diff --git a/src/fdb5/remote/client/ClientConnection.h b/src/fdb5/remote/client/ClientConnection.h index 37973d983..86650a9d6 100644 --- a/src/fdb5/remote/client/ClientConnection.h +++ b/src/fdb5/remote/client/ClientConnection.h @@ -38,7 +38,7 @@ class ClientConnection : protected Connection { virtual ~ClientConnection(); - eckit::Buffer controlWrite(Client& client, Message msg, uint32_t requestID, bool startDataListener, std::vector> data={}); + eckit::Buffer&& controlWrite(Client& client, Message msg, uint32_t requestID, bool startDataListener, std::vector> data={}); void dataWrite(Client& client, Message msg, uint32_t requestID, std::vector> data={}); void add(Client& client); @@ -101,6 +101,8 @@ class ClientConnection : protected Connection { uint32_t id_; bool connected_; + bool controlStopping_; + bool dataStopping_; std::map> promises_;