Skip to content

Commit

Permalink
[core] unregistration of timed out samples in one central place (inst…
Browse files Browse the repository at this point in the history
…ead of all over the place). (eclipse-ecal#1675)

Modifies also CExpirationMap interface to return keys and values of expired items.
  • Loading branch information
KerstinKeller authored Aug 12, 2024
1 parent 09d2cc8 commit 938c891
Show file tree
Hide file tree
Showing 22 changed files with 872 additions and 102 deletions.
2 changes: 2 additions & 0 deletions ecal/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ if (ECAL_CORE_REGISTRATION)
src/registration/ecal_registration_sample_applier_gates.h
src/registration/ecal_registration_sample_applier_user.cpp
src/registration/ecal_registration_sample_applier_user.h
src/registration/ecal_registration_timeout_provider.cpp
src/registration/ecal_registration_timeout_provider.h
src/registration/ecal_registration_sender.h
src/registration/udp/ecal_registration_receiver_udp.cpp
src/registration/udp/ecal_registration_receiver_udp.h
Expand Down
14 changes: 1 addition & 13 deletions ecal/core/src/ecal_descgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,7 @@ namespace

namespace eCAL
{
CDescGate::CDescGate(const std::chrono::milliseconds& exp_timeout_) :
m_publisher_info_map (exp_timeout_),
m_subscriber_info_map (exp_timeout_),
m_service_info_map (exp_timeout_),
m_client_info_map (exp_timeout_)
{
}
CDescGate::CDescGate() = default;
CDescGate::~CDescGate() = default;

Registration::QualityTopicInfoMultiMap CDescGate::GetPublishers()
Expand Down Expand Up @@ -77,7 +71,6 @@ namespace eCAL
Registration::QualityTopicInfoMultiMap multi_map;

const std::lock_guard<std::mutex> lock(topic_info_map_.mtx);
topic_info_map_.map.erase_expired();

for (const auto& topic_map_it : topic_info_map_.map)
{
Expand All @@ -92,7 +85,6 @@ namespace eCAL
Registration::QualityServiceInfoMultimap multi_map;

const std::lock_guard<std::mutex> lock(service_method_info_map_.mtx);
service_method_info_map_.map.erase_expired();

for (const auto& service_method_info_map_it : service_method_info_map_.map)
{
Expand Down Expand Up @@ -183,14 +175,12 @@ namespace eCAL
topic_quality_info.quality = topic_quality_;

const std::unique_lock<std::mutex> lock(topic_info_map_.mtx);
topic_info_map_.map.erase_expired();
topic_info_map_.map[topic_info_key] = topic_quality_info;
}

void CDescGate::RemTopicDescription(SQualityTopicIdMap& topic_info_map_, const std::string& topic_name_, const Registration::TopicId& topic_id_)
{
const std::unique_lock<std::mutex> lock(topic_info_map_.mtx);
topic_info_map_.map.erase_expired();
topic_info_map_.map.erase(STopicIdKey{ topic_name_, topic_id_ });
}

Expand All @@ -213,7 +203,6 @@ namespace eCAL
service_quality_info.response_quality = response_type_quality_;

const std::lock_guard<std::mutex> lock(service_method_info_map_.mtx);
service_method_info_map_.map.erase_expired();
service_method_info_map_.map[service_method_info_key] = service_quality_info;
}

Expand All @@ -222,7 +211,6 @@ namespace eCAL
std::list<SServiceIdKey> service_method_infos_to_remove;

const std::lock_guard<std::mutex> lock(service_method_info_map_.mtx);
service_method_info_map_.map.erase_expired();

for (auto&& service_it : service_method_info_map_.map)
{
Expand Down
17 changes: 7 additions & 10 deletions ecal/core/src/ecal_descgate.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include <ecal/ecal_util.h>

#include "serialization/ecal_struct_sample_registration.h"
#include "util/ecal_expmap.h"

#include <chrono>
#include <map>
Expand Down Expand Up @@ -67,7 +66,7 @@ namespace eCAL
class CDescGate
{
public:
CDescGate(const std::chrono::milliseconds& exp_timeout_);
CDescGate();
~CDescGate();

// apply samples to description gate
Expand All @@ -90,20 +89,18 @@ namespace eCAL
CDescGate& operator=(CDescGate&&) = delete;

protected:
using QualityTopicIdExpMap = eCAL::Util::CExpirationMap<STopicIdKey, Registration::SQualityTopicInfo>;
using QualityTopicIdMap = std::map<STopicIdKey, Registration::SQualityTopicInfo>;
struct SQualityTopicIdMap
{
explicit SQualityTopicIdMap(const std::chrono::milliseconds& timeout_) : map(timeout_) {};
mutable std::mutex mtx;
QualityTopicIdExpMap map;
mutable std::mutex mtx;
QualityTopicIdMap map;
};

using QualityServiceIdExpMap = eCAL::Util::CExpirationMap<SServiceIdKey, Registration::SQualityServiceInfo>;
using QualityServiceIdMap = std::map<SServiceIdKey, Registration::SQualityServiceInfo>;
struct SQualityServiceIdMap
{
explicit SQualityServiceIdMap(const std::chrono::milliseconds& timeout_) : map(timeout_) {};
mutable std::mutex mtx;
QualityServiceIdExpMap map;
mutable std::mutex mtx;
QualityServiceIdMap map;
};

static Registration::QualityTopicInfoMultiMap GetTopics (SQualityTopicIdMap& topic_info_map_);
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/ecal_globals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ namespace eCAL
if (descgate_instance == nullptr)
{
// create description gate with configured expiration timeout
descgate_instance = std::make_unique<CDescGate>(std::chrono::milliseconds(Config::GetMonitoringTimeoutMs()));
descgate_instance = std::make_unique<CDescGate>();
new_initialization = true;
}

Expand Down
14 changes: 0 additions & 14 deletions ecal/core/src/monitoring/ecal_monitoring_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,6 @@ namespace eCAL
////////////////////////////////////////
CMonitoringImpl::CMonitoringImpl(const Monitoring::Configuration& config_) :
m_init(false),
m_process_map (std::chrono::milliseconds(config_.timeout)),
m_publisher_map (std::chrono::milliseconds(config_.timeout)),
m_subscriber_map(std::chrono::milliseconds(config_.timeout)),
m_server_map (std::chrono::milliseconds(config_.timeout)),
m_clients_map (std::chrono::milliseconds(config_.timeout)),
m_config (config_)
{
}
Expand Down Expand Up @@ -623,7 +618,6 @@ namespace eCAL
monitoring_.processes.reserve(m_process_map.map->size());

// iterate map
m_process_map.map->erase_expired();
for (const auto& process : (*m_process_map.map))
{
monitoring_.processes.emplace_back(process.second);
Expand All @@ -641,7 +635,6 @@ namespace eCAL
monitoring_.publisher.reserve(m_publisher_map.map->size());

// iterate map
m_publisher_map.map->erase_expired();
for (const auto& publisher : (*m_publisher_map.map))
{
monitoring_.publisher.emplace_back(publisher.second);
Expand All @@ -659,7 +652,6 @@ namespace eCAL
monitoring_.subscriber.reserve(m_subscriber_map.map->size());

// iterate map
m_subscriber_map.map->erase_expired();
for (const auto& subscriber : (*m_subscriber_map.map))
{
monitoring_.subscriber.emplace_back(subscriber.second);
Expand All @@ -677,7 +669,6 @@ namespace eCAL
monitoring_.server.reserve(m_server_map.map->size());

// iterate map
m_server_map.map->erase_expired();
for (const auto& server : (*m_server_map.map))
{
monitoring_.server.emplace_back(server.second);
Expand All @@ -695,7 +686,6 @@ namespace eCAL
monitoring_.clients.reserve(m_clients_map.map->size());

// iterate map
m_clients_map.map->erase_expired();
for (const auto& client : (*m_clients_map.map))
{
monitoring_.clients.emplace_back(client.second);
Expand All @@ -709,7 +699,6 @@ namespace eCAL
const std::lock_guard<std::mutex> lock(m_process_map.sync);

// iterate map
m_process_map.map->erase_expired();
for (const auto& process : (*m_process_map.map))
{
// add process
Expand All @@ -723,7 +712,6 @@ namespace eCAL
const std::lock_guard<std::mutex> lock(m_server_map.sync);

// iterate map
m_server_map.map->erase_expired();
for (const auto& server : (*m_server_map.map))
{
// add service
Expand All @@ -737,7 +725,6 @@ namespace eCAL
const std::lock_guard<std::mutex> lock(m_clients_map.sync);

// iterate map
m_clients_map.map->erase_expired();
for (const auto& client : (*m_clients_map.map))
{
// add client
Expand All @@ -751,7 +738,6 @@ namespace eCAL
const std::lock_guard<std::mutex> lock(map_.sync);

// iterate map
map_.map->erase_expired();
for (const auto& topic : (*map_.map))
{
if (direction_ == "publisher")
Expand Down
26 changes: 13 additions & 13 deletions ecal/core/src/monitoring/ecal_monitoring_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
#include <ecal/config/monitoring.h>

#include "ecal_def.h"
#include "util/ecal_expmap.h"

#include "serialization/ecal_serialize_sample_registration.h"

#include <memory>
#include <map>
#include <mutex>
#include <set>
#include <string>
Expand Down Expand Up @@ -82,44 +82,44 @@ namespace eCAL
bool RegisterTopic(const Registration::Sample& sample_, enum ePubSub pubsub_type_);
bool UnregisterTopic(const Registration::Sample& sample_, enum ePubSub pubsub_type_);

using TopicMonMapT = Util::CExpirationMap<std::string, Monitoring::STopicMon>;
using TopicMonMapT = std::map<std::string, Monitoring::STopicMon>;
struct STopicMonMap
{
explicit STopicMonMap(const std::chrono::milliseconds& timeout_) :
map(std::make_unique<TopicMonMapT>(timeout_))
explicit STopicMonMap() :
map(std::make_unique<TopicMonMapT>())
{
};
std::mutex sync;
std::unique_ptr<TopicMonMapT> map;
};

using ProcessMonMapT = Util::CExpirationMap<std::string, Monitoring::SProcessMon>;
using ProcessMonMapT = std::map<std::string, Monitoring::SProcessMon>;
struct SProcessMonMap
{
explicit SProcessMonMap(const std::chrono::milliseconds& timeout_) :
map(std::make_unique<ProcessMonMapT>(timeout_))
explicit SProcessMonMap() :
map(std::make_unique<ProcessMonMapT>())
{
};
std::mutex sync;
std::unique_ptr<ProcessMonMapT> map;
};

using ServerMonMapT = Util::CExpirationMap<std::string, Monitoring::SServerMon>;
using ServerMonMapT = std::map<std::string, Monitoring::SServerMon>;
struct SServerMonMap
{
explicit SServerMonMap(const std::chrono::milliseconds& timeout_) :
map(std::make_unique<ServerMonMapT>(timeout_))
explicit SServerMonMap() :
map(std::make_unique<ServerMonMapT>())
{
};
std::mutex sync;
std::unique_ptr<ServerMonMapT> map;
};

using ClientMonMapT = Util::CExpirationMap<std::string, Monitoring::SClientMon>;
using ClientMonMapT = std::map<std::string, Monitoring::SClientMon>;
struct SClientMonMap
{
explicit SClientMonMap(const std::chrono::milliseconds& timeout_) :
map(std::make_unique<ClientMonMapT>(timeout_))
explicit SClientMonMap() :
map(std::make_unique<ClientMonMapT>())
{
};
std::mutex sync;
Expand Down
5 changes: 0 additions & 5 deletions ecal/core/src/readwrite/ecal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ namespace eCAL
counter << std::chrono::steady_clock::now().time_since_epoch().count();
m_topic_id = counter.str();

// set registration expiration
const std::chrono::milliseconds registration_timeout(Config::GetRegistrationTimeoutMs());
m_pub_map.set_expiration(registration_timeout);

// start transport layers
InitializeLayers();
StartTransportLayer();
Expand Down Expand Up @@ -547,7 +543,6 @@ namespace eCAL
void CDataReader::CheckConnections()
{
const std::lock_guard<std::mutex> lock(m_pub_map_mtx);
m_pub_map.erase_expired();

if (m_pub_map.empty())
{
Expand Down
3 changes: 1 addition & 2 deletions ecal/core/src/readwrite/ecal_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

#include "serialization/ecal_serialize_sample_payload.h"
#include "serialization/ecal_serialize_sample_registration.h"
#include "util/ecal_expmap.h"
#include "util/frequency_calculator.h"

#include <atomic>
Expand Down Expand Up @@ -143,7 +142,7 @@ namespace eCAL
Subscriber::Configuration m_config;

std::atomic<bool> m_connected;
using PublicationMapT = Util::CExpirationMap<SPublicationInfo, std::tuple<SDataTypeInformation, SLayerStates>>;
using PublicationMapT = std::map<SPublicationInfo, std::tuple<SDataTypeInformation, SLayerStates>>;
mutable std::mutex m_pub_map_mtx;
PublicationMapT m_pub_map;

Expand Down
5 changes: 0 additions & 5 deletions ecal/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,6 @@ namespace eCAL
counter << std::chrono::steady_clock::now().time_since_epoch().count();
m_topic_id = counter.str();

// set registration expiration
const std::chrono::milliseconds registration_timeout(Config::GetRegistrationTimeoutMs());
m_sub_map.set_expiration(registration_timeout);

// mark as created
m_created = true;
}
Expand Down Expand Up @@ -594,7 +590,6 @@ namespace eCAL
void CDataWriter::CheckConnections()
{
const std::lock_guard<std::mutex> lock(m_sub_map_mtx);
m_sub_map.erase_expired();

if (m_sub_map.empty())
{
Expand Down
3 changes: 1 addition & 2 deletions ecal/core/src/readwrite/ecal_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include <ecal/config/publisher.h>

#include "serialization/ecal_serialize_sample_registration.h"
#include "util/ecal_expmap.h"
#include "util/frequency_calculator.h"

#if ECAL_CORE_TRANSPORT_UDP
Expand Down Expand Up @@ -156,7 +155,7 @@ namespace eCAL

std::atomic<bool> m_connected;

using SSubscriptionMapT = Util::CExpirationMap<SSubscriptionInfo, std::tuple<SDataTypeInformation, SLayerStates>>;
using SSubscriptionMapT = std::map<SSubscriptionInfo, std::tuple<SDataTypeInformation, SLayerStates>>;
mutable std::mutex m_sub_map_mtx;
SSubscriptionMapT m_sub_map;

Expand Down
Loading

0 comments on commit 938c891

Please sign in to comment.