Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[foxy backport] Make service wait for response reader (#390) #412

Merged
merged 1 commit into from
Jul 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ rmw_create_client(
}

info->writer_guid_ = info->request_publisher_->getGuid();
info->reader_guid_ = info->response_subscriber_->getGuid();

rmw_client = rmw_client_allocate();
if (!rmw_client) {
Expand Down
7 changes: 6 additions & 1 deletion rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,9 @@ rmw_create_service(
RMW_SET_ERROR_MSG("failed to get datawriter qos");
goto fail;
}
info->pub_listener_ = new ServicePubListener();
info->response_publisher_ =
Domain::createPublisher(participant, publisherParam, nullptr);
Domain::createPublisher(participant, publisherParam, info->pub_listener_);
if (!info->response_publisher_) {
RMW_SET_ERROR_MSG("create_service() could not create publisher");
goto fail;
Expand Down Expand Up @@ -255,6 +256,10 @@ rmw_create_service(
Domain::removePublisher(info->response_publisher_);
}

if (info->pub_listener_) {
delete info->pub_listener_;
}

if (info->request_subscriber_) {
rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_subscriber_->getGuid());
Expand Down
3 changes: 2 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,12 @@ rmw_create_client(
info->request_publisher_ =
Domain::createPublisher(participant, publisherParam, info->pub_listener_);
if (!info->request_publisher_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher");
RMW_SET_ERROR_MSG("create_client() could not create publisher");
goto fail;
}

info->writer_guid_ = info->request_publisher_->getGuid();
info->reader_guid_ = info->response_subscriber_->getGuid();

rmw_client = rmw_client_allocate();
if (!rmw_client) {
Expand Down
7 changes: 6 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,9 @@ rmw_create_service(
RMW_SET_ERROR_MSG("failed to get datawriter qos");
goto fail;
}
info->pub_listener_ = new ServicePubListener();
info->response_publisher_ =
Domain::createPublisher(participant, publisherParam, nullptr);
Domain::createPublisher(participant, publisherParam, info->pub_listener_);
if (!info->response_publisher_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher");
goto fail;
Expand Down Expand Up @@ -285,6 +286,10 @@ rmw_create_service(
Domain::removePublisher(info->response_publisher_);
}

if (info->pub_listener_) {
delete info->pub_listener_;
}

if (info->request_subscriber_) {
rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_subscriber_->getGuid());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ typedef struct CustomClientInfo
eprosima::fastrtps::Publisher * request_publisher_;
ClientListener * listener_;
eprosima::fastrtps::rtps::GUID_t writer_guid_;
eprosima::fastrtps::rtps::GUID_t reader_guid_;
eprosima::fastrtps::Participant * participant_;
const char * typesupport_identifier_;
ClientPubListener * pub_listener_;
Expand Down Expand Up @@ -88,7 +89,9 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
if (eprosima::fastrtps::rtps::ALIVE == response.sample_info_.sampleKind) {
response.sample_identity_ = response.sample_info_.related_sample_identity;

if (response.sample_identity_.writer_guid() == info_->writer_guid_) {
if (response.sample_identity_.writer_guid() == info_->reader_guid_ ||
response.sample_identity_.writer_guid() == info_->writer_guid_)
{
std::lock_guard<std::mutex> lock(internalMutex_);

if (conditionMutex_ != nullptr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <condition_variable>
#include <list>
#include <mutex>
#include <unordered_set>

#include "fastcdr/FastBuffer.h"

Expand All @@ -32,8 +33,10 @@
#include "rcpputils/thread_safety_annotations.hpp"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"
#include "rmw_fastrtps_shared_cpp/guid_utils.hpp"

class ServiceListener;
class ServicePubListener;

typedef struct CustomServiceInfo
{
Expand All @@ -44,6 +47,7 @@ typedef struct CustomServiceInfo
eprosima::fastrtps::Subscriber * request_subscriber_;
eprosima::fastrtps::Publisher * response_publisher_;
ServiceListener * listener_;
ServicePubListener * pub_listener_;
eprosima::fastrtps::Participant * participant_;
const char * typesupport_identifier_;
} CustomServiceInfo;
Expand Down Expand Up @@ -84,6 +88,12 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
if (sub->takeNextData(&data, &request.sample_info_)) {
if (eprosima::fastrtps::rtps::ALIVE == request.sample_info_.sampleKind) {
request.sample_identity_ = request.sample_info_.sample_identity;
// Use response subscriber guid (on related_sample_identity) when present.
const eprosima::fastrtps::rtps::GUID_t & reader_guid =
request.sample_info_.related_sample_identity.writer_guid();
if (reader_guid != eprosima::fastrtps::rtps::GUID_t::unknown() ) {
request.sample_identity_.writer_guid() = reader_guid;
}

std::lock_guard<std::mutex> lock(internalMutex_);

Expand Down Expand Up @@ -159,4 +169,49 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
};

class ServicePubListener : public eprosima::fastrtps::PublisherListener
{
public:
ServicePubListener() = default;

template<class Rep, class Period>
bool wait_for_subscription(
const eprosima::fastrtps::rtps::GUID_t & guid,
const std::chrono::duration<Rep, Period> & rel_time)
{
auto guid_is_present = [this, guid]() -> bool
{
return subscriptions_.find(guid) != subscriptions_.end();
};

std::unique_lock<std::mutex> lock(mutex_);
return cv_.wait_for(lock, rel_time, guid_is_present);
}

void onPublicationMatched(
eprosima::fastrtps::Publisher * pub,
eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
{
(void) pub;
std::lock_guard<std::mutex> lock(mutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) {
subscriptions_.insert(matchingInfo.remoteEndpointGuid);
} else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
subscriptions_.erase(matchingInfo.remoteEndpointGuid);
} else {
return;
}
cv_.notify_all();
}

private:
using subscriptions_set_t =
std::unordered_set<eprosima::fastrtps::rtps::GUID_t,
rmw_fastrtps_shared_cpp::hash_fastrtps_guid>;

std::mutex mutex_;
subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::condition_variable cv_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define RMW_FASTRTPS_SHARED_CPP__GUID_UTILS_HPP_

#include <cassert>
#include <cstddef>
#include <cstring>
#include <type_traits>

Expand Down Expand Up @@ -55,6 +56,36 @@ copy_from_fastrtps_guid_to_byte_array(
memcpy(&guid_byte_array[prefix_size], &guid.entityId, guid.entityId.size);
}

struct hash_fastrtps_guid
{
std::size_t operator()(const eprosima::fastrtps::rtps::GUID_t & guid) const
{
union u_convert {
uint8_t plain_value[sizeof(guid)];
uint32_t plain_ints[sizeof(guid) / sizeof(uint32_t)];
} u;

static_assert(
sizeof(guid) == 16 &&
sizeof(u.plain_value) == sizeof(u.plain_ints) &&
offsetof(u_convert, plain_value) == offsetof(u_convert, plain_ints),
"Plain guid should be easily convertible to uint32_t[4]");

copy_from_fastrtps_guid_to_byte_array(guid, u.plain_value);

constexpr std::size_t prime_1 = 7;
constexpr std::size_t prime_2 = 31;
constexpr std::size_t prime_3 = 59;

size_t ret_val = prime_1 * u.plain_ints[0];
ret_val = prime_2 * (u.plain_ints[1] + ret_val);
ret_val = prime_3 * (u.plain_ints[2] + ret_val);
ret_val = u.plain_ints[3] + ret_val;

return ret_val;
}
};

} // namespace rmw_fastrtps_shared_cpp

#endif // RMW_FASTRTPS_SHARED_CPP__GUID_UTILS_HPP_
1 change: 1 addition & 0 deletions rmw_fastrtps_shared_cpp/src/rmw_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ __rmw_send_request(
data.is_cdr_buffer = false;
data.data = const_cast<void *>(ros_request);
data.impl = info->request_type_support_impl_;
wparams.related_sample_identity().writer_guid() = info->reader_guid_;
if (info->request_publisher_->write(&data, wparams)) {
returnedValue = RMW_RET_OK;
*sequence_id = ((int64_t)wparams.sample_identity().sequence_number().high) << 32 |
Expand Down
22 changes: 22 additions & 0 deletions rmw_fastrtps_shared_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,28 @@ __rmw_send_response(
wparams.related_sample_identity().sequence_number().low =
(int32_t)(request_header->sequence_number & 0xFFFFFFFF);

// TODO(MiguelCompany) The following block is a workaround for the race on the
// discovery of services. It is (ab)using a related_sample_identity on the request
// with the GUID of the response reader, so we can wait here for it to be matched to
// the server response writer. In the future, this should be done with the mechanism
// explained on OMG DDS-RPC 1.0 spec under section 7.6.2 (Enhanced Service Mapping)

// According to the list of possible entity kinds in section 9.3.1.2 of RTPS
// readers will have this bit on, while writers will not. We use this to know
// if the related guid is the request writer or the response reader.
constexpr uint8_t entity_id_is_reader_bit = 0x04;
const eprosima::fastrtps::rtps::GUID_t & related_guid =
wparams.related_sample_identity().writer_guid();
if ((related_guid.entityId.value[3] & entity_id_is_reader_bit) != 0) {
// Related guid is a reader, so it is the response subscription guid.
// Wait for the response writer to be matched with it.
auto listener = info->pub_listener_;
if (!listener->wait_for_subscription(related_guid, std::chrono::milliseconds(100))) {
RMW_SET_ERROR_MSG("client will not receive response");
return RMW_RET_ERROR;
}
}

rmw_fastrtps_shared_cpp::SerializedData data;
data.is_cdr_buffer = false;
data.data = const_cast<void *>(ros_response);
Expand Down
3 changes: 3 additions & 0 deletions rmw_fastrtps_shared_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ __rmw_destroy_service(
if (info->response_publisher_ != nullptr) {
Domain::removePublisher(info->response_publisher_);
}
if (info->pub_listener_ != nullptr) {
delete info->pub_listener_;
}
if (info->listener_ != nullptr) {
delete info->listener_;
}
Expand Down
15 changes: 13 additions & 2 deletions rmw_fastrtps_shared_cpp/src/rmw_service_server_is_available.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,22 @@ __rmw_service_server_is_available(
return RMW_RET_OK;
}

if (0 == client_info->request_publisher_matched_count_.load()) {
if (number_of_request_subscribers != number_of_response_publishers) {
// not ready
return RMW_RET_OK;
}
if (0 == client_info->response_subscriber_matched_count_.load()) {

size_t matched_request_pubs = client_info->request_publisher_matched_count_.load();
if (0 == matched_request_pubs) {
// not ready
return RMW_RET_OK;
}
size_t matched_response_subs = client_info->response_subscriber_matched_count_.load();
if (0 == matched_response_subs) {
// not ready
return RMW_RET_OK;
}
if (matched_request_pubs != matched_response_subs) {
// not ready
return RMW_RET_OK;
}
Expand Down