Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic Statistics: Add SubscriptionTopicStatistics class #1050

Merged
merged 15 commits into from
Apr 20, 2020
Merged
20 changes: 20 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ project(rclcpp)

find_package(ament_cmake_ros REQUIRED)
find_package(builtin_interfaces REQUIRED)
find_package(libstatistics_collector REQUIRED)
find_package(rcl REQUIRED)
find_package(rcl_interfaces REQUIRED)
find_package(rcl_yaml_param_parser REQUIRED)
Expand All @@ -14,6 +15,7 @@ find_package(rosgraph_msgs REQUIRED)
find_package(rosidl_runtime_cpp REQUIRED)
find_package(rosidl_typesupport_c REQUIRED)
find_package(rosidl_typesupport_cpp REQUIRED)
find_package(statistics_msgs REQUIRED)
find_package(tracetools REQUIRED)

# Default to C++14
Expand Down Expand Up @@ -116,6 +118,7 @@ add_library(${PROJECT_NAME}
${${PROJECT_NAME}_SRCS})
# specific order: dependents before dependencies
ament_target_dependencies(${PROJECT_NAME}
"libstatistics_collector"
"rcl"
"rcl_yaml_param_parser"
"rcpputils"
Expand All @@ -124,6 +127,7 @@ ament_target_dependencies(${PROJECT_NAME}
"rosgraph_msgs"
"rosidl_typesupport_cpp"
"rosidl_runtime_cpp"
"statistics_msgs"
"tracetools"
)

Expand All @@ -143,6 +147,7 @@ install(
ament_export_include_directories(include)
ament_export_libraries(${PROJECT_NAME})

ament_export_dependencies(libstatistics_collector)
ament_export_dependencies(rcl)
ament_export_dependencies(rcpputils)
ament_export_dependencies(rcutils)
Expand All @@ -152,6 +157,7 @@ ament_export_dependencies(rosidl_typesupport_cpp)
ament_export_dependencies(rosidl_typesupport_c)
ament_export_dependencies(rosidl_runtime_cpp)
ament_export_dependencies(rcl_yaml_param_parser)
ament_export_dependencies(statistics_msgs)
ament_export_dependencies(tracetools)

if(BUILD_TESTING)
Expand Down Expand Up @@ -535,6 +541,20 @@ if(BUILD_TESTING)
target_link_libraries(test_wait_set ${PROJECT_NAME})
endif()

ament_add_gtest(test_subscription_topic_statistics test/topic_statistics/test_subscription_topic_statistics.cpp)
if(TARGET test_subscription_topic_statistics)
ament_target_dependencies(test_subscription_topic_statistics
"libstatistics_collector"
"rcl_interfaces"
"rcutils"
"rmw"
"rosidl_runtime_cpp"
"rosidl_typesupport_cpp"
"statistics_msgs"
"test_msgs")
target_link_libraries(test_subscription_topic_statistics ${PROJECT_NAME})
endif()

ament_add_gtest(test_subscription_options test/test_subscription_options.cpp)
if(TARGET test_subscription_options)
ament_target_dependencies(test_subscription_options "rcl")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_
#define RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "libstatistics_collector/collector/generate_statistics_message.hpp"
#include "libstatistics_collector/moving_average_statistics/types.hpp"
#include "libstatistics_collector/topic_statistics_collector/constants.hpp"
#include "libstatistics_collector/topic_statistics_collector/received_message_period.hpp"

#include "rcl/time.h"
#include "rclcpp/time.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/timer.hpp"

#include "statistics_msgs/msg/metrics_message.hpp"

namespace rclcpp
{
namespace topic_statistics
{

constexpr const char kDefaultPublishTopicName[]{"/statistics"};
constexpr const std::chrono::milliseconds kDefaultPublishingPeriod{std::chrono::seconds(1)};

using libstatistics_collector::collector::GenerateStatisticMessage;
using statistics_msgs::msg::MetricsMessage;
using libstatistics_collector::moving_average_statistics::StatisticData;

/**
* Class used to collect, measure, and publish topic statistics data. Current statistics
* supported for subscribers are received message age and received message period.
*
* \tparam CallbackMessageT the subscribed message type
*/
template<typename CallbackMessageT>
class SubscriptionTopicStatistics
{
using TopicStatsCollector =
libstatistics_collector::topic_statistics_collector::TopicStatisticsCollector<
CallbackMessageT>;
using ReceivedMessagePeriod =
libstatistics_collector::topic_statistics_collector::ReceivedMessagePeriodCollector<
CallbackMessageT>;

public:
/// Construct a SubscriptionTopicStatistics object.
/**
* This object wraps utilities, defined in libstatistics_collector, to collect,
* measure, and publish topic statistics data. This throws an invalid_argument
* if the input publisher is null.
*
* \param node_name the name of the node, which created this instance, in order to denote
* topic source
* \param publisher instance constructed by the node in order to publish statistics data.
* This class owns the publisher.
*/
SubscriptionTopicStatistics(
const std::string & node_name,
rclcpp::Publisher<statistics_msgs::msg::MetricsMessage>::SharedPtr publisher)
: node_name_(node_name),
publisher_(std::move(publisher))
{
// TODO(dbbonnie): ros-tooling/aws-roadmap/issues/226, received message age

if (nullptr == publisher_) {
throw std::invalid_argument("publisher pointer is nullptr");
}

bring_up();
}

virtual ~SubscriptionTopicStatistics()
{
tear_down();
}

/// Handle a message received by the subscription to collect statistics.
/**
* \param received_message the message received by the subscription
* \param now_nanoseconds current time in nanoseconds
*/
virtual void handle_message(
const CallbackMessageT & received_message,
const rclcpp::Time now_nanoseconds) const
{
for (const auto & collector : subscriber_statistics_collectors_) {
collector->OnMessageReceived(received_message, now_nanoseconds.nanoseconds());
}
}

/// Set the timer used to publish statistics messages.
/**
* \param measurement_timer the timer to fire the publisher, created by the node
*/
void set_publisher_timer(rclcpp::TimerBase::SharedPtr publisher_timer)
{
publisher_timer_ = publisher_timer;
}

/// Publish a populated MetricsStatisticsMessage.
virtual void publish_message()
{
rclcpp::Time window_end{get_current_nanoseconds_since_epoch()};

for (auto & collector : subscriber_statistics_collectors_) {
const auto collected_stats = collector->GetStatisticsResults();

auto message = libstatistics_collector::collector::GenerateStatisticMessage(
node_name_,
collector->GetMetricName(),
collector->GetMetricUnit(),
window_start_,
window_end,
collected_stats);
publisher_->publish(message);
}
window_start_ = window_end;
}

protected:
/// Return a vector of all the currently collected data.
/**
* \return a vector of all the collected data
*/
std::vector<StatisticData> get_current_collector_data() const
{
std::vector<StatisticData> data;
for (const auto & collector : subscriber_statistics_collectors_) {
data.push_back(collector->GetStatisticsResults());
}
return data;
}

private:
/// Construct and start all collectors and set window_start_.
void bring_up()
{
auto received_message_period = std::make_unique<ReceivedMessagePeriod>();
received_message_period->Start();
subscriber_statistics_collectors_.emplace_back(std::move(received_message_period));

window_start_ = rclcpp::Time(get_current_nanoseconds_since_epoch());
}

/// Stop all collectors, clear measurements, stop publishing timer, and reset publisher.
void tear_down()
{
for (auto & collector : subscriber_statistics_collectors_) {
collector->Stop();
}

subscriber_statistics_collectors_.clear();

if (publisher_timer_) {
publisher_timer_->cancel();
publisher_timer_.reset();
}

publisher_.reset();
}

/// Return the current nanoseconds (count) since epoch.
/**
* \return the current nanoseconds (count) since epoch
*/
int64_t get_current_nanoseconds_since_epoch() const
{
const auto now = std::chrono::system_clock::now();
return std::chrono::duration_cast<std::chrono::nanoseconds>(now.time_since_epoch()).count();
}

/// Collection of statistics collectors
std::vector<std::unique_ptr<TopicStatsCollector>> subscriber_statistics_collectors_{};
/// Node name used to generate topic statistics messages to be published
const std::string node_name_;
/// Publisher, created by the node, used to publish topic statistics messages
rclcpp::Publisher<statistics_msgs::msg::MetricsMessage>::SharedPtr publisher_;
/// Timer which fires the publisher
rclcpp::TimerBase::SharedPtr publisher_timer_;
/// The start of the collection window, used in the published topic statistics message
rclcpp::Time window_start_;
};
} // namespace topic_statistics
} // namespace rclcpp

#endif // RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_
2 changes: 2 additions & 0 deletions rclcpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
<build_export_depend>rosidl_typesupport_c</build_export_depend>
<build_export_depend>rosidl_typesupport_cpp</build_export_depend>

<depend>libstatistics_collector</depend>
<depend>rcl</depend>
<depend>rcl_yaml_param_parser</depend>
<depend>rcpputils</depend>
<depend>rcutils</depend>
<depend>rmw</depend>
<depend>statistics_msgs</depend>
<depend>tracetools</depend>

<test_depend>ament_cmake_gmock</test_depend>
Expand Down
Loading