From 55027eb2a7f6272401a0374c7b7982bb32783b1a Mon Sep 17 00:00:00 2001 From: JLBuenoLopez-eProsima Date: Tue, 6 Jun 2023 10:47:22 +0200 Subject: [PATCH 1/3] Add `rmw_publisher_count_non_local_matched_subscriptions` Co-Authored-by: Miguel Company Signed-off-by: JLBuenoLopez-eProsima Signed-off-by: Miguel Company --- rmw_fastrtps_cpp/src/rmw_publisher.cpp | 17 ++++++++++++ .../src/rmw_publisher.cpp | 17 ++++++++++++ .../custom_publisher_info.hpp | 14 ++++++++-- .../rmw_fastrtps_shared_cpp/rmw_common.hpp | 6 +++++ .../src/custom_publisher_info.cpp | 26 ++++++++++++++----- rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp | 12 +++++++++ 6 files changed, 83 insertions(+), 9 deletions(-) diff --git a/rmw_fastrtps_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_cpp/src/rmw_publisher.cpp index 3d1cfe8f5..10fd5e3b2 100644 --- a/rmw_fastrtps_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_cpp/src/rmw_publisher.cpp @@ -150,6 +150,23 @@ rmw_publisher_count_matched_subscriptions( publisher, subscription_count); } +rmw_ret_t +rmw_publisher_count_non_local_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * non_local_subscription_count) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + publisher, + publisher->implementation_identifier, + eprosima_fastrtps_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(non_local_subscription_count, RMW_RET_INVALID_ARGUMENT); + + return rmw_fastrtps_shared_cpp::__rmw_publisher_count_non_local_matched_subscriptions( + publisher, non_local_subscription_count); +} + rmw_ret_t rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher) { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp index c5b1ca459..d7b6b9cc8 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp @@ -150,6 +150,23 @@ rmw_publisher_count_matched_subscriptions( publisher, subscription_count); } +rmw_ret_t +rmw_publisher_count_non_local_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * non_local_subscription_count) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + publisher, + publisher->implementation_identifier, + eprosima_fastrtps_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(non_local_subscription_count, RMW_RET_INVALID_ARGUMENT); + + return rmw_fastrtps_shared_cpp::__rmw_publisher_count_non_local_matched_subscriptions( + publisher, non_local_subscription_count); +} + rmw_ret_t rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher) { diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp index f3023f72b..5b0e8ec77 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp @@ -122,7 +122,7 @@ class RMWPublisherEvent final : public EventListenerInterface * \param[in] guid The GUID of the newly-matched subscription to track. */ RMW_FASTRTPS_SHARED_CPP_PUBLIC - void track_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid); + void track_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid, bool is_local); /// Remove a GUID from the internal set of unique subscriptions matched to this publisher. /** @@ -132,7 +132,7 @@ class RMWPublisherEvent final : public EventListenerInterface * \param[in] guid The GUID of the newly-unmatched subscription to track. */ RMW_FASTRTPS_SHARED_CPP_PUBLIC - void untrack_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid); + void untrack_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid, bool is_local); /// Return the number of unique subscriptions matched to this publisher. /** @@ -141,6 +141,13 @@ class RMWPublisherEvent final : public EventListenerInterface RMW_FASTRTPS_SHARED_CPP_PUBLIC size_t subscription_count() const; + /// Return the number of unique non-local subscriptions matched to this publisher. + /** + * \return Number of unique non-local subscriptions matched to this publisher. + */ + RMW_FASTRTPS_SHARED_CPP_PUBLIC + size_t non_local_subscription_count() const; + RMW_FASTRTPS_SHARED_CPP_PUBLIC void update_deadline(uint32_t total_count, uint32_t total_count_change); @@ -165,6 +172,9 @@ class RMWPublisherEvent final : public EventListenerInterface std::set subscriptions_ RCPPUTILS_TSA_GUARDED_BY(subscriptions_mutex_); + std::set non_local_subscriptions_ + RCPPUTILS_TSA_GUARDED_BY(subscriptions_mutex_); + mutable std::mutex subscriptions_mutex_; bool deadline_changed_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp index 2994c108f..b693bc5fa 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp @@ -217,6 +217,12 @@ __rmw_publisher_count_matched_subscriptions( const rmw_publisher_t * publisher, size_t * subscription_count); +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_publisher_count_non_local_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * non_local_subscription_count); + RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t __rmw_publisher_get_actual_qos( diff --git a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp index f6d4cfbe8..79e1770f0 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp @@ -38,14 +38,14 @@ void CustomDataWriterListener::on_publication_matched( eprosima::fastdds::dds::DataWriter * writer, const eprosima::fastdds::dds::PublicationMatchedStatus & status) { - (void)writer; + eprosima::fastrtps::rtps::GUID_t subscription_guid = + eprosima::fastrtps::rtps::iHandle2GUID(status.last_subscription_handle); + bool is_local = writer->guid().guidPrefix == subscription_guid.guidPrefix; if (status.current_count_change == 1) { - publisher_event_->track_unique_subscription( - eprosima::fastrtps::rtps::iHandle2GUID(status.last_subscription_handle)); + publisher_event_->track_unique_subscription(subscription_guid, is_local); } else if (status.current_count_change == -1) { - publisher_event_->untrack_unique_subscription( - eprosima::fastrtps::rtps::iHandle2GUID(status.last_subscription_handle)); + publisher_event_->untrack_unique_subscription(subscription_guid, is_local); } else { return; } @@ -277,16 +277,22 @@ void RMWPublisherEvent::set_on_new_event_callback( publisher_info_->data_writer_->set_listener(publisher_info_->data_writer_listener_, status_mask); } -void RMWPublisherEvent::track_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid) +void RMWPublisherEvent::track_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid, bool is_local) { std::lock_guard lock(subscriptions_mutex_); subscriptions_.insert(guid); + if (!is_local) { + non_local_subscriptions_.insert(guid); + } } -void RMWPublisherEvent::untrack_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid) +void RMWPublisherEvent::untrack_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid, bool is_local) { std::lock_guard lock(subscriptions_mutex_); subscriptions_.erase(guid); + if (!is_local) { + non_local_subscriptions_.erase(guid); + } } size_t RMWPublisherEvent::subscription_count() const @@ -295,6 +301,12 @@ size_t RMWPublisherEvent::subscription_count() const return subscriptions_.size(); } +size_t RMWPublisherEvent::non_local_subscription_count() const +{ + std::lock_guard lock(subscriptions_mutex_); + return non_local_subscriptions_.size(); +} + void RMWPublisherEvent::update_deadline(uint32_t total_count, uint32_t total_count_change) { rcpputils::unique_lock lock_mutex(on_new_event_m_); diff --git a/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp index 8ad697aba..e08c56018 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp @@ -99,6 +99,18 @@ __rmw_publisher_count_matched_subscriptions( return RMW_RET_OK; } +rmw_ret_t +__rmw_publisher_count_non_local_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * non_local_subscription_count) +{ + auto info = static_cast(publisher->data); + + *non_local_subscription_count = info->publisher_event_->non_local_subscription_count(); + + return RMW_RET_OK; +} + rmw_ret_t __rmw_publisher_assert_liveliness( const char * identifier, From 3bce1351c98c9a8478c5160d73d78f41be039f6c Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 20 Oct 2023 08:51:29 +0200 Subject: [PATCH 2/3] Keep linters happy Signed-off-by: Miguel Company --- rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp index 79e1770f0..36c53c4c0 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp @@ -277,7 +277,9 @@ void RMWPublisherEvent::set_on_new_event_callback( publisher_info_->data_writer_->set_listener(publisher_info_->data_writer_listener_, status_mask); } -void RMWPublisherEvent::track_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid, bool is_local) +void RMWPublisherEvent::track_unique_subscription( + eprosima::fastrtps::rtps::GUID_t guid, + bool is_local) { std::lock_guard lock(subscriptions_mutex_); subscriptions_.insert(guid); @@ -286,7 +288,9 @@ void RMWPublisherEvent::track_unique_subscription(eprosima::fastrtps::rtps::GUID } } -void RMWPublisherEvent::untrack_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid, bool is_local) +void RMWPublisherEvent::untrack_unique_subscription( + eprosima::fastrtps::rtps::GUID_t guid, + bool is_local) { std::lock_guard lock(subscriptions_mutex_); subscriptions_.erase(guid); From 0095ca1b5265f49fd391a8e76f3d5dcc2de38e84 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 20 Oct 2023 09:51:07 +0200 Subject: [PATCH 3/3] Update doxygen documentation Signed-off-by: Miguel Company --- .../include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp index 5b0e8ec77..cf339ef66 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp @@ -120,6 +120,7 @@ class RMWPublisherEvent final : public EventListenerInterface * user calls rmw_count_subscribers(). * * \param[in] guid The GUID of the newly-matched subscription to track. + * \param[in] is_local Whether \c guid belongs to the same participant as this publisher. */ RMW_FASTRTPS_SHARED_CPP_PUBLIC void track_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid, bool is_local); @@ -130,6 +131,7 @@ class RMWPublisherEvent final : public EventListenerInterface * user calls rmw_count_subscribers(). * * \param[in] guid The GUID of the newly-unmatched subscription to track. + * \param[in] is_local Whether \c guid belongs to the same participant as this publisher. */ RMW_FASTRTPS_SHARED_CPP_PUBLIC void untrack_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid, bool is_local);