Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EventsExecutor #468

Merged
merged 14 commits into from
Feb 24, 2022
12 changes: 12 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,4 +525,16 @@ rmw_client_response_subscription_get_actual_qos(

return rmw_fastrtps_shared_cpp::__rmw_client_response_subscription_get_actual_qos(client, qos);
}

rmw_ret_t
rmw_client_set_on_new_response_callback(
rmw_client_t * rmw_client,
rmw_event_callback_t callback,
const void * user_data)
{
return rmw_fastrtps_shared_cpp::__rmw_client_set_on_new_response_callback(
rmw_client,
callback,
user_data);
}
} // extern "C"
12 changes: 12 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,16 @@ rmw_subscription_event_init(
subscription->data,
event_type);
}

rmw_ret_t
rmw_event_set_callback(
rmw_event_t * rmw_event,
rmw_event_callback_t callback,
const void * user_data)
{
return rmw_fastrtps_shared_cpp::__rmw_event_set_callback(
rmw_event,
callback,
user_data);
}
} // extern "C"
12 changes: 12 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,4 +524,16 @@ rmw_service_request_subscription_get_actual_qos(

return rmw_fastrtps_shared_cpp::__rmw_service_request_subscription_get_actual_qos(service, qos);
}

rmw_ret_t
rmw_service_set_on_new_request_callback(
rmw_service_t * rmw_service,
rmw_event_callback_t callback,
const void * user_data)
{
return rmw_fastrtps_shared_cpp::__rmw_service_set_on_new_request_callback(
rmw_service,
callback,
user_data);
}
} // extern "C"
12 changes: 12 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,16 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
return rmw_fastrtps_shared_cpp::__rmw_destroy_subscription(
eprosima_fastrtps_identifier, node, subscription);
}

rmw_ret_t
rmw_subscription_set_on_new_message_callback(
rmw_subscription_t * rmw_subscription,
rmw_event_callback_t callback,
const void * user_data)
{
return rmw_fastrtps_shared_cpp::__rmw_subscription_set_on_new_message_callback(
rmw_subscription,
callback,
user_data);
}
} // extern "C"
3 changes: 1 addition & 2 deletions rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,7 @@ create_subscription(
/////
// Create Listener
if (create_subscription_listener) {
info->listener_ = new (std::nothrow) SubListener(info);

info->listener_ = new (std::nothrow) SubListener(info, qos_policies->depth);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener");
return nullptr;
Expand Down
8 changes: 7 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ create_subscription(
/////
// Create Listener
if (create_subscription_listener) {
info->listener_ = new (std::nothrow) SubListener(info);
info->listener_ = new (std::nothrow) SubListener(info, qos_policies->depth);

if (!info->listener_) {
RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener");
Expand Down Expand Up @@ -266,6 +266,12 @@ create_subscription(
return nullptr;
}

info->listener_ = new (std::nothrow) SubListener(info, qos_policies->depth);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber listener");
return nullptr;
}

eprosima::fastdds::dds::DataReaderQos original_qos = reader_qos;
switch (subscription_options->require_unique_network_flow_endpoints) {
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

#include "rcpputils/thread_safety_annotations.hpp"

#include "rmw/event_callback_type.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

class ClientListener;
Expand Down Expand Up @@ -125,6 +127,14 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
list.emplace_back(std::move(response));
list_has_data_.store(true);
}

std::unique_lock<std::mutex> lock_mutex(on_new_response_m_);

if (on_new_response_cb_) {
on_new_response_cb_(user_data_, 1);
} else {
unread_count_++;
}
}
}
}
Expand Down Expand Up @@ -181,6 +191,29 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
info_->response_subscriber_matched_count_.store(publishers_.size());
}

// Provide handlers to perform an action when a
// new event from this listener has ocurred
void
set_on_new_response_callback(
const void * user_data,
rmw_event_callback_t callback)
{
std::unique_lock<std::mutex> lock_mutex(on_new_response_m_);

if (callback) {
// Push events arrived before setting the the executor callback
if (unread_count_) {
callback(user_data, unread_count_);
unread_count_ = 0;
}
user_data_ = user_data;
on_new_response_cb_ = callback;
} else {
user_data_ = nullptr;
on_new_response_cb_ = nullptr;
}
}

private:
bool popResponse(CustomClientResponse & response) RCPPUTILS_TSA_REQUIRES(internalMutex_)
{
Expand All @@ -200,6 +233,11 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::set<eprosima::fastrtps::rtps::GUID_t> publishers_;

rmw_event_callback_t on_new_response_cb_{nullptr};
const void * user_data_{nullptr};
std::mutex on_new_response_m_;
uint64_t unread_count_ = 0;
};

class ClientPubListener : public eprosima::fastdds::dds::DataWriterListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "fastcdr/FastBuffer.h"

#include "rmw/event.h"
#include "rmw/event_callback_type.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

Expand Down Expand Up @@ -58,6 +59,17 @@ class EventListenerInterface
* \return `false` if data was not available, in this case nothing was written to event_info.
*/
virtual bool takeNextEvent(rmw_event_type_t event_type, void * event_info) = 0;

// Provide handlers to perform an action when a
// new event from this listener has ocurred
virtual void set_on_new_event_callback(
const void * user_data,
rmw_event_callback_t callback) = 0;

rmw_event_callback_t on_new_event_cb_{nullptr};
const void * user_data_{nullptr};
uint64_t unread_events_count_ = 0;
std::mutex on_new_event_m_;
};

class EventListenerInterface::ConditionalScopedLock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds
bool
hasEvent(rmw_event_type_t event_type) const final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void set_on_new_event_callback(
const void * user_data,
rmw_event_callback_t callback) final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool
takeNextEvent(rmw_event_type_t event_type, void * event_info) final;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

#include "rcpputils/thread_safety_annotations.hpp"

#include "rmw/event_callback_type.h"

#include "rmw_fastrtps_shared_cpp/guid_utils.hpp"
#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

Expand Down Expand Up @@ -246,6 +248,14 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
list.push_back(request);
list_has_data_.store(true);
}

std::unique_lock<std::mutex> lock_mutex(on_new_request_m_);

if (on_new_request_cb_) {
on_new_request_cb_(user_data_, 1);
} else {
unread_count_++;
}
}
}
}
Expand Down Expand Up @@ -296,13 +306,41 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
return list_has_data_.load();
}

// Provide handlers to perform an action when a
// new event from this listener has ocurred
void
set_on_new_request_callback(
const void * user_data,
rmw_event_callback_t callback)
{
std::unique_lock<std::mutex> lock_mutex(on_new_request_m_);

if (callback) {
// Push events arrived before setting the the executor callback
if (unread_count_) {
callback(user_data, unread_count_);
unread_count_ = 0;
}
user_data_ = user_data;
on_new_request_cb_ = callback;
} else {
user_data_ = nullptr;
on_new_request_cb_ = nullptr;
}
}

private:
CustomServiceInfo * info_;
std::mutex internalMutex_;
std::list<CustomServiceRequest> list RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::atomic_bool list_has_data_;
std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

rmw_event_callback_t on_new_request_cb_{nullptr};
const void * user_data_{nullptr};
std::mutex on_new_request_m_;
uint64_t unread_count_ = 0;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
#define RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_

#include <atomic>
#include <algorithm>
#include <condition_variable>
#include <limits>
#include <memory>
#include <mutex>
#include <set>
Expand All @@ -35,6 +37,7 @@
#include "rcpputils/thread_safety_annotations.hpp"

#include "rmw/impl/cpp/macros.hpp"
#include "rmw/event_callback_type.h"

#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"

Expand Down Expand Up @@ -66,7 +69,7 @@ struct CustomSubscriberInfo : public CustomEventInfo
class SubListener : public EventListenerInterface, public eprosima::fastdds::dds::DataReaderListener
{
public:
explicit SubListener(CustomSubscriberInfo * info)
explicit SubListener(CustomSubscriberInfo * info, size_t qos_depth)
: data_(false),
deadline_changes_(false),
liveliness_changes_(false),
Expand All @@ -75,6 +78,7 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
conditionMutex_(nullptr),
conditionVariable_(nullptr)
{
qos_depth_ = (qos_depth > 0) ? qos_depth : std::numeric_limits<size_t>::max();
// Field is not used right now
(void)info;
}
Expand All @@ -100,6 +104,14 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
on_data_available(eprosima::fastdds::dds::DataReader * reader) final
{
update_has_data(reader);

std::unique_lock<std::mutex> lock_mutex(on_new_message_m_);

if (on_new_message_cb_) {
on_new_message_cb_(user_data_, 1);
} else {
new_data_unread_count_++;
}
}

RMW_FASTRTPS_SHARED_CPP_PUBLIC
Expand Down Expand Up @@ -131,6 +143,11 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
bool
hasEvent(rmw_event_type_t event_type) const final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void set_on_new_event_callback(
const void * user_data,
rmw_event_callback_t callback) final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool
takeNextEvent(rmw_event_type_t event_type, void * event_info) final;
Expand Down Expand Up @@ -177,6 +194,30 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
return publishers_.size();
}

// Provide handlers to perform an action when a
// new event from this listener has ocurred
void
set_on_new_message_callback(
const void * user_data,
rmw_event_callback_t callback)
{
std::unique_lock<std::mutex> lock_mutex(on_new_message_m_);

if (callback) {
// Push events arrived before setting the executor's callback
if (new_data_unread_count_) {
auto unread_count = std::min(new_data_unread_count_, qos_depth_);
callback(user_data, unread_count);
new_data_unread_count_ = 0;
}
user_data_ = user_data;
on_new_message_cb_ = callback;
} else {
user_data_ = nullptr;
on_new_message_cb_ = nullptr;
}
}

private:
mutable std::mutex internalMutex_;

Expand All @@ -202,6 +243,11 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::set<eprosima::fastrtps::rtps::GUID_t> publishers_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

rmw_event_callback_t on_new_message_cb_{nullptr};
std::mutex on_new_message_m_;
size_t qos_depth_;
size_t new_data_unread_count_ = 0;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_
Loading