Skip to content

Commit

Permalink
sync action + connection termination
Browse files Browse the repository at this point in the history
  • Loading branch information
danovaro committed May 2, 2024
1 parent b3e6111 commit 395596e
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 11 deletions.
26 changes: 26 additions & 0 deletions .github/workflows/sync.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: sync

# Controls when the workflow will run
on:

# Trigger the workflow on all pushes
push:
branches:
- '**'
tags:
- '**'

# Trigger the workflow when a branch or tag is deleted
delete: ~

jobs:

# Calls a reusable CI workflow to sync the current with a remote repository.
# It will correctly handle addition of any new and removal of existing Git objects.
sync:
name: sync
uses: ecmwf-actions/reusable-workflows/.github/workflows/sync.yml@v2
secrets:
target_repository: mars/fdb5
target_username: ClonedDuck
target_token: ${{ secrets.BITBUCKET_PAT }}
1 change: 0 additions & 1 deletion src/fdb5/remote/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
* does it submit to any jurisdiction.
*/

#include <future>

#include "fdb5/LibFdb5.h"

Expand Down
2 changes: 0 additions & 2 deletions src/fdb5/remote/client/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

#pragma once

#include <future>

#include "eckit/config/Configuration.h"
#include "eckit/memory/NonCopyable.h"
#include "eckit/net/Endpoint.h"
Expand Down
16 changes: 9 additions & 7 deletions src/fdb5/remote/client/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class DataWriteRequest {


ClientConnection::ClientConnection(const eckit::net::Endpoint& controlEndpoint, const std::string& defaultEndpoint):
controlEndpoint_(controlEndpoint), defaultEndpoint_(defaultEndpoint), id_(1), connected_(false), dataWriteQueue_(nullptr) {
controlEndpoint_(controlEndpoint), defaultEndpoint_(defaultEndpoint), id_(1), connected_(false), controlStopping_(false), dataStopping_(false), dataWriteQueue_(nullptr) {

LOG_DEBUG_LIB(LibFdb5) << "ClientConnection::ClientConnection() controlEndpoint: " << controlEndpoint << std::endl;
}
Expand All @@ -85,8 +85,9 @@ bool ClientConnection::remove(uint32_t clientID) {
}

if (clients_.empty()) {
Connection::write(Message::Exit, true, 0, 0);
if (!single_) {
if (!controlStopping_)
Connection::write(Message::Exit, true, 0, 0);
if (!single_ && !dataStopping_) {
// TODO make the data connection dying automatically, when there are no more async writes
Connection::write(Message::Exit, false, 0, 0);
}
Expand Down Expand Up @@ -143,13 +144,12 @@ bool ClientConnection::connect(bool singleAttempt) {
listeningControlThread_ = std::thread([this] { listeningControlThreadLoop(); });

connected_ = true;
return true;
} catch(eckit::TooManyRetries& e) {
if (controlClient_.isConnected()) {
controlClient_.close();
}
}
return false;
return connected_;
}

void ClientConnection::disconnect() {
Expand Down Expand Up @@ -191,7 +191,7 @@ eckit::LocalConfiguration ClientConnection::availableFunctionality() const {

// -----------------------------------------------------------------------------------------------------

eckit::Buffer ClientConnection::controlWrite(Client& client, Message msg, uint32_t requestID, bool dataListener, std::vector<std::pair<const void*, uint32_t>> data) {
eckit::Buffer&& ClientConnection::controlWrite(Client& client, Message msg, uint32_t requestID, bool dataListener, std::vector<std::pair<const void*, uint32_t>> data) {
auto it = clients_.find(client.clientId());
ASSERT(it != clients_.end());

Expand All @@ -200,7 +200,7 @@ eckit::Buffer ClientConnection::controlWrite(Client& client, Message msg, uint32

Connection::write(msg, true, client.clientId(), requestID, data);

return f.get();
return std::move(f.get());
}

void ClientConnection::dataWrite(DataWriteRequest& r) {
Expand Down Expand Up @@ -362,6 +362,7 @@ void ClientConnection::listeningControlThreadLoop() {
LOG_DEBUG_LIB(LibFdb5) << "ClientConnection::listeningControlThreadLoop - got [message=" << hdr.message << ",clientID=" << hdr.clientID() << ",control=" << hdr.control() << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" << std::endl;

if (hdr.message == Message::Exit) {
controlStopping_ = true;
return;
} else {
if (hdr.clientID()) {
Expand Down Expand Up @@ -432,6 +433,7 @@ void ClientConnection::listeningDataThreadLoop() {
LOG_DEBUG_LIB(LibFdb5) << "ClientConnection::listeningDataThreadLoop - got [message=" << hdr.message << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" << std::endl;

if (hdr.message == Message::Exit) {
dataStopping_ = true;
return;
} else {
if (hdr.clientID()) {
Expand Down
4 changes: 3 additions & 1 deletion src/fdb5/remote/client/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ClientConnection : protected Connection {

virtual ~ClientConnection();

eckit::Buffer controlWrite(Client& client, Message msg, uint32_t requestID, bool startDataListener, std::vector<std::pair<const void*, uint32_t>> data={});
eckit::Buffer&& controlWrite(Client& client, Message msg, uint32_t requestID, bool startDataListener, std::vector<std::pair<const void*, uint32_t>> data={});
void dataWrite(Client& client, Message msg, uint32_t requestID, std::vector<std::pair<const void*, uint32_t>> data={});

void add(Client& client);
Expand Down Expand Up @@ -101,6 +101,8 @@ class ClientConnection : protected Connection {
uint32_t id_;

bool connected_;
bool controlStopping_;
bool dataStopping_;

std::map<uint32_t, std::promise<eckit::Buffer>> promises_;

Expand Down

0 comments on commit 395596e

Please sign in to comment.