diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 781a06cf3b..51b028f090 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -4,6 +4,7 @@ project(rclcpp) find_package(ament_cmake_ros REQUIRED) find_package(builtin_interfaces REQUIRED) +find_package(libstatistics_collector REQUIRED) find_package(rcl REQUIRED) find_package(rcl_interfaces REQUIRED) find_package(rcl_yaml_param_parser REQUIRED) @@ -14,6 +15,7 @@ find_package(rosgraph_msgs REQUIRED) find_package(rosidl_runtime_cpp REQUIRED) find_package(rosidl_typesupport_c REQUIRED) find_package(rosidl_typesupport_cpp REQUIRED) +find_package(statistics_msgs REQUIRED) find_package(tracetools REQUIRED) # Default to C++14 @@ -116,6 +118,7 @@ add_library(${PROJECT_NAME} ${${PROJECT_NAME}_SRCS}) # specific order: dependents before dependencies ament_target_dependencies(${PROJECT_NAME} + "libstatistics_collector" "rcl" "rcl_yaml_param_parser" "rcpputils" @@ -124,6 +127,7 @@ ament_target_dependencies(${PROJECT_NAME} "rosgraph_msgs" "rosidl_typesupport_cpp" "rosidl_runtime_cpp" + "statistics_msgs" "tracetools" ) @@ -143,6 +147,7 @@ install( ament_export_include_directories(include) ament_export_libraries(${PROJECT_NAME}) +ament_export_dependencies(libstatistics_collector) ament_export_dependencies(rcl) ament_export_dependencies(rcpputils) ament_export_dependencies(rcutils) @@ -152,6 +157,7 @@ ament_export_dependencies(rosidl_typesupport_cpp) ament_export_dependencies(rosidl_typesupport_c) ament_export_dependencies(rosidl_runtime_cpp) ament_export_dependencies(rcl_yaml_param_parser) +ament_export_dependencies(statistics_msgs) ament_export_dependencies(tracetools) if(BUILD_TESTING) @@ -535,6 +541,20 @@ if(BUILD_TESTING) target_link_libraries(test_wait_set ${PROJECT_NAME}) endif() + ament_add_gtest(test_subscription_topic_statistics test/topic_statistics/test_subscription_topic_statistics.cpp) + if(TARGET test_subscription_topic_statistics) + ament_target_dependencies(test_subscription_topic_statistics + "libstatistics_collector" + "rcl_interfaces" + "rcutils" + "rmw" + "rosidl_runtime_cpp" + "rosidl_typesupport_cpp" + "statistics_msgs" + "test_msgs") + target_link_libraries(test_subscription_topic_statistics ${PROJECT_NAME}) + endif() + ament_add_gtest(test_subscription_options test/test_subscription_options.cpp) if(TARGET test_subscription_options) ament_target_dependencies(test_subscription_options "rcl") diff --git a/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp b/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp new file mode 100644 index 0000000000..711b04998c --- /dev/null +++ b/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp @@ -0,0 +1,204 @@ +// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_ +#define RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_ + +#include +#include +#include +#include + +#include "libstatistics_collector/collector/generate_statistics_message.hpp" +#include "libstatistics_collector/moving_average_statistics/types.hpp" +#include "libstatistics_collector/topic_statistics_collector/constants.hpp" +#include "libstatistics_collector/topic_statistics_collector/received_message_period.hpp" + +#include "rcl/time.h" +#include "rclcpp/time.hpp" +#include "rclcpp/publisher.hpp" +#include "rclcpp/timer.hpp" + +#include "statistics_msgs/msg/metrics_message.hpp" + +namespace rclcpp +{ +namespace topic_statistics +{ + +constexpr const char kDefaultPublishTopicName[]{"/statistics"}; +constexpr const std::chrono::milliseconds kDefaultPublishingPeriod{std::chrono::seconds(1)}; + +using libstatistics_collector::collector::GenerateStatisticMessage; +using statistics_msgs::msg::MetricsMessage; +using libstatistics_collector::moving_average_statistics::StatisticData; + +/** + * Class used to collect, measure, and publish topic statistics data. Current statistics + * supported for subscribers are received message age and received message period. + * + * \tparam CallbackMessageT the subscribed message type + */ +template +class SubscriptionTopicStatistics +{ + using TopicStatsCollector = + libstatistics_collector::topic_statistics_collector::TopicStatisticsCollector< + CallbackMessageT>; + using ReceivedMessagePeriod = + libstatistics_collector::topic_statistics_collector::ReceivedMessagePeriodCollector< + CallbackMessageT>; + +public: + /// Construct a SubscriptionTopicStatistics object. + /** + * This object wraps utilities, defined in libstatistics_collector, to collect, + * measure, and publish topic statistics data. This throws an invalid_argument + * if the input publisher is null. + * + * \param node_name the name of the node, which created this instance, in order to denote + * topic source + * \param publisher instance constructed by the node in order to publish statistics data. + * This class owns the publisher. + */ + SubscriptionTopicStatistics( + const std::string & node_name, + rclcpp::Publisher::SharedPtr publisher) + : node_name_(node_name), + publisher_(std::move(publisher)) + { + // TODO(dbbonnie): ros-tooling/aws-roadmap/issues/226, received message age + + if (nullptr == publisher_) { + throw std::invalid_argument("publisher pointer is nullptr"); + } + + bring_up(); + } + + virtual ~SubscriptionTopicStatistics() + { + tear_down(); + } + + /// Handle a message received by the subscription to collect statistics. + /** + * \param received_message the message received by the subscription + * \param now_nanoseconds current time in nanoseconds + */ + virtual void handle_message( + const CallbackMessageT & received_message, + const rclcpp::Time now_nanoseconds) const + { + for (const auto & collector : subscriber_statistics_collectors_) { + collector->OnMessageReceived(received_message, now_nanoseconds.nanoseconds()); + } + } + + /// Set the timer used to publish statistics messages. + /** + * \param measurement_timer the timer to fire the publisher, created by the node + */ + void set_publisher_timer(rclcpp::TimerBase::SharedPtr publisher_timer) + { + publisher_timer_ = publisher_timer; + } + + /// Publish a populated MetricsStatisticsMessage. + virtual void publish_message() + { + rclcpp::Time window_end{get_current_nanoseconds_since_epoch()}; + + for (auto & collector : subscriber_statistics_collectors_) { + const auto collected_stats = collector->GetStatisticsResults(); + + auto message = libstatistics_collector::collector::GenerateStatisticMessage( + node_name_, + collector->GetMetricName(), + collector->GetMetricUnit(), + window_start_, + window_end, + collected_stats); + publisher_->publish(message); + } + window_start_ = window_end; + } + +protected: + /// Return a vector of all the currently collected data. + /** + * \return a vector of all the collected data + */ + std::vector get_current_collector_data() const + { + std::vector data; + for (const auto & collector : subscriber_statistics_collectors_) { + data.push_back(collector->GetStatisticsResults()); + } + return data; + } + +private: + /// Construct and start all collectors and set window_start_. + void bring_up() + { + auto received_message_period = std::make_unique(); + received_message_period->Start(); + subscriber_statistics_collectors_.emplace_back(std::move(received_message_period)); + + window_start_ = rclcpp::Time(get_current_nanoseconds_since_epoch()); + } + + /// Stop all collectors, clear measurements, stop publishing timer, and reset publisher. + void tear_down() + { + for (auto & collector : subscriber_statistics_collectors_) { + collector->Stop(); + } + + subscriber_statistics_collectors_.clear(); + + if (publisher_timer_) { + publisher_timer_->cancel(); + publisher_timer_.reset(); + } + + publisher_.reset(); + } + + /// Return the current nanoseconds (count) since epoch. + /** + * \return the current nanoseconds (count) since epoch + */ + int64_t get_current_nanoseconds_since_epoch() const + { + const auto now = std::chrono::system_clock::now(); + return std::chrono::duration_cast(now.time_since_epoch()).count(); + } + + /// Collection of statistics collectors + std::vector> subscriber_statistics_collectors_{}; + /// Node name used to generate topic statistics messages to be published + const std::string node_name_; + /// Publisher, created by the node, used to publish topic statistics messages + rclcpp::Publisher::SharedPtr publisher_; + /// Timer which fires the publisher + rclcpp::TimerBase::SharedPtr publisher_timer_; + /// The start of the collection window, used in the published topic statistics message + rclcpp::Time window_start_; +}; +} // namespace topic_statistics +} // namespace rclcpp + +#endif // RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_ diff --git a/rclcpp/package.xml b/rclcpp/package.xml index 751450e1d5..a5e09c0f18 100644 --- a/rclcpp/package.xml +++ b/rclcpp/package.xml @@ -22,11 +22,13 @@ rosidl_typesupport_c rosidl_typesupport_cpp + libstatistics_collector rcl rcl_yaml_param_parser rcpputils rcutils rmw + statistics_msgs tracetools ament_cmake_gmock diff --git a/rclcpp/test/topic_statistics/test_subscription_topic_statistics.cpp b/rclcpp/test/topic_statistics/test_subscription_topic_statistics.cpp new file mode 100644 index 0000000000..de90ecb590 --- /dev/null +++ b/rclcpp/test/topic_statistics/test_subscription_topic_statistics.cpp @@ -0,0 +1,140 @@ +// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "libstatistics_collector/moving_average_statistics/types.hpp" + +#include "rclcpp/create_publisher.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/qos.hpp" +#include "rclcpp/rclcpp.hpp" + +#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp" + +#include "statistics_msgs/msg/metrics_message.hpp" +#include "test_msgs/msg/empty.hpp" + +namespace +{ +constexpr const char kTestNodeName[]{"test_sub_stats_node"}; +constexpr const char kTestSubStatsTopic[]{"/test_sub_stats_topic"}; +constexpr const char kTestTopicStatisticsTopic[]{"/test_topic_statistics_topic"}; +constexpr const uint64_t kNoSamples{0}; +} // namespace + +using test_msgs::msg::Empty; +using statistics_msgs::msg::MetricsMessage; +using rclcpp::topic_statistics::SubscriptionTopicStatistics; +using libstatistics_collector::moving_average_statistics::StatisticData; + +template +class TestSubscriptionTopicStatistics : public SubscriptionTopicStatistics +{ +public: + TestSubscriptionTopicStatistics( + const std::string & node_name, + rclcpp::Publisher::SharedPtr publisher) + : SubscriptionTopicStatistics(node_name, publisher) + { + } + + virtual ~TestSubscriptionTopicStatistics() = default; + + /// Exposed for testing + std::vector get_current_collector_data() const + { + return SubscriptionTopicStatistics::get_current_collector_data(); + } +}; + +/** + * Empty subscriber node: used to create subscriber topic statistics requirements + */ +class EmptySubscriber : public rclcpp::Node +{ +public: + EmptySubscriber(const std::string & name, const std::string & topic) + : Node(name) + { + auto callback = [this](Empty::UniquePtr msg) { + this->receive_message(*msg); + }; + subscription_ = create_subscription>( + topic, + rclcpp::QoS(rclcpp::KeepAll()), + callback); + } + + virtual ~EmptySubscriber() = default; + +private: + void receive_message(const Empty &) const + { + } + + rclcpp::Subscription::SharedPtr subscription_; +}; + +/** + * Test fixture to bring up and teardown rclcpp + */ +class TestSubscriptionTopicStatisticsFixture : public ::testing::Test +{ +protected: + void SetUp() + { + rclcpp::init(0 /* argc */, nullptr /* argv */); + empty_subscriber = std::make_shared( + kTestNodeName, + kTestSubStatsTopic); + } + + void TearDown() + { + rclcpp::shutdown(); + empty_subscriber.reset(); + } + std::shared_ptr empty_subscriber; +}; + +TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction) +{ + // manually create publisher tied to the node + auto topic_stats_publisher = + empty_subscriber->create_publisher( + kTestTopicStatisticsTopic, + 10); + + // construct the instance + auto sub_topic_stats = std::make_unique>( + empty_subscriber->get_name(), + topic_stats_publisher); + + using libstatistics_collector::moving_average_statistics::StatisticData; + + // expect no data has been collected / no samples received + for (const auto & data : sub_topic_stats->get_current_collector_data()) { + EXPECT_TRUE(std::isnan(data.average)); + EXPECT_TRUE(std::isnan(data.min)); + EXPECT_TRUE(std::isnan(data.max)); + EXPECT_TRUE(std::isnan(data.standard_deviation)); + EXPECT_EQ(kNoSamples, data.sample_count); + } +}