Skip to content

Commit

Permalink
Add method to take with message_info (#542)
Browse files Browse the repository at this point in the history
* Add method to take with message_info

Signed-off-by: Luetkebohle Ingo (CR/AEX3) <ingo.luetkebohle@de.bosch.com>

* rework rclpy_take

Signed-off-by: Luetkebohle Ingo (CR/AEX3) <ingo.luetkebohle@de.bosch.com>

* always decref

Signed-off-by: Luetkebohle Ingo (CR/AEX3) <ingo.luetkebohle@de.bosch.com>

* remove double free

Signed-off-by: Luetkebohle Ingo (CR/AEX3) <ingo.luetkebohle@de.bosch.com>

* Test that timestamp is nonzero.

Signed-off-by: Luetkebohle Ingo (CR/AEX3) <ingo.luetkebohle@de.bosch.com>

* int64_t might be long long on some platforms

Signed-off-by: Luetkebohle Ingo (CR/AEX3) <ingo.luetkebohle@de.bosch.com>

* Style fixes.

Signed-off-by: Luetkebohle Ingo (CR/AEX3) <ingo.luetkebohle@de.bosch.com>
  • Loading branch information
iluetkeb authored Apr 23, 2020
1 parent fe68a6c commit 6fb1696
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 33 deletions.
2 changes: 1 addition & 1 deletion rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ async def _execute_timer(self, tmr, _):

def _take_subscription(self, sub):
with sub.handle as capsule:
msg = _rclpy.rclpy_take(capsule, sub.msg_type, sub.raw)
msg, _ = _rclpy.rclpy_take(capsule, sub.msg_type, sub.raw)
return msg

async def _execute_subscription(self, sub, msg):
Expand Down
98 changes: 67 additions & 31 deletions rclpy/src/rclpy/_rclpy.c
Original file line number Diff line number Diff line change
Expand Up @@ -3216,10 +3216,11 @@ rclpy_wait(PyObject * Py_UNUSED(self), PyObject * args)
/// Take a raw message from a given subscription (internal- for rclpy_take with raw=True)
/**
* \param[in] rcl subscription pointer pointing to the subscription to process the message
* \param[in] message_info struct pointer, may be null. if non-null, will be filled with message info
* \return Python byte array with the raw serialized message contents
*/
static PyObject *
rclpy_take_raw(rcl_subscription_t * subscription)
rclpy_take_raw_with_info(rcl_subscription_t * subscription, rmw_message_info_t * message_info)
{
// Create a serialized message object
rcl_serialized_message_t msg = rmw_get_zero_initialized_serialized_message();
Expand All @@ -3237,7 +3238,7 @@ rclpy_take_raw(rcl_subscription_t * subscription)
return NULL;
}

ret = rcl_take_serialized_message(subscription, &msg, NULL, NULL);
ret = rcl_take_serialized_message(subscription, &msg, message_info, NULL);
if (ret != RCL_RET_OK) {
PyErr_Format(
RCLError,
Expand All @@ -3261,18 +3262,47 @@ rclpy_take_raw(rcl_subscription_t * subscription)
return python_bytes;
}

/// Take a message from a given subscription
static PyObject *
rclpy_message_info_to_dict(rmw_message_info_t * message_info)
{
PyObject * dict = PyDict_New();
if (dict == NULL) {
PyErr_Format(PyExc_RuntimeError, "Failed to create dictionary object");
return NULL;
}

// we bail out at the end in case of errors
PyObject * source_timestamp = PyLong_FromLongLong(message_info->source_timestamp);
if (source_timestamp != NULL) {
PyDict_SetItemString(dict, "source_timestamp", source_timestamp);
}
PyObject * received_timestamp = PyLong_FromLongLong(message_info->source_timestamp);
if (received_timestamp != NULL) {
PyDict_SetItemString(dict, "received_timestamp", source_timestamp);
}

// check for errors
if (source_timestamp == NULL || received_timestamp == NULL) {
Py_DECREF(dict);
dict = NULL;
}

return dict;
}

/// Take a message and its message_info from a given subscription
/**
* \param[in] pysubscription Capsule pointing to the subscription to process the message
* \param[in] pymsg_type Instance of the message type to take
* \return Python message with all fields populated with received message
* \return Tuple of (Python message with all fields populated with received message, message_info)
*/
static PyObject *
rclpy_take(PyObject * Py_UNUSED(self), PyObject * args)
{
PyObject * pysubscription;
PyObject * pymsg_type;
PyObject * pyraw;
PyObject * pytaken_msg;

if (!PyArg_ParseTuple(args, "OOO", &pysubscription, &pymsg_type, &pyraw)) {
return NULL;
Expand All @@ -3288,41 +3318,47 @@ rclpy_take(PyObject * Py_UNUSED(self), PyObject * args)
return NULL;
}

rmw_message_info_t message_info;
if (PyObject_IsTrue(pyraw) == 1) { // raw=True
return rclpy_take_raw(&(sub->subscription));
}

destroy_ros_message_signature * destroy_ros_message = NULL;
void * taken_msg = rclpy_create_from_py(pymsg_type, &destroy_ros_message);
if (!taken_msg) {
return NULL;
}

rcl_ret_t ret = rcl_take(&(sub->subscription), taken_msg, NULL, NULL);
pytaken_msg = rclpy_take_raw_with_info(&(sub->subscription), &message_info);
} else {
destroy_ros_message_signature * destroy_ros_message = NULL;
void * taken_msg = rclpy_create_from_py(pymsg_type, &destroy_ros_message);
if (!taken_msg) {
return NULL;
}

if (ret != RCL_RET_OK && ret != RCL_RET_SUBSCRIPTION_TAKE_FAILED) {
PyErr_Format(
RCLError,
"Failed to take from a subscription: %s", rcl_get_error_string().str);
rcl_reset_error();
destroy_ros_message(taken_msg);
return NULL;
}
rcl_ret_t ret = rcl_take(&(sub->subscription), taken_msg, &message_info, NULL);

if (ret != RCL_RET_SUBSCRIPTION_TAKE_FAILED) {
PyObject * pytaken_msg = rclpy_convert_to_py(taken_msg, pymsg_type);
destroy_ros_message(taken_msg);
if (!pytaken_msg) {
// the function has set the Python error
if (ret != RCL_RET_OK && ret != RCL_RET_SUBSCRIPTION_TAKE_FAILED) {
PyErr_Format(
RCLError,
"Failed to take from a subscription: %s", rcl_get_error_string().str);
rcl_reset_error();
destroy_ros_message(taken_msg);
return NULL;
}

return pytaken_msg;
if (ret != RCL_RET_SUBSCRIPTION_TAKE_FAILED) {
pytaken_msg = rclpy_convert_to_py(taken_msg, pymsg_type);
destroy_ros_message(taken_msg);
if (!pytaken_msg) {
// the function has set the Python error
return NULL;
}
}
}

// if take failed, just do nothing
destroy_ros_message(taken_msg);
Py_RETURN_NONE;
// make result tuple
PyObject * mi_dict = rclpy_message_info_to_dict(&message_info);
if (mi_dict == NULL) {
Py_DECREF(pytaken_msg);
return NULL;
}
PyObject * tuple = PyTuple_Pack(2, pytaken_msg, mi_dict);
Py_DECREF(pytaken_msg);
Py_DECREF(mi_dict);
return tuple;
}

/// Take a request from a given service
Expand Down
25 changes: 25 additions & 0 deletions rclpy/test/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import pathlib
import time
import unittest
from unittest.mock import Mock

Expand All @@ -34,6 +35,7 @@
from rclpy.exceptions import ParameterImmutableException
from rclpy.exceptions import ParameterNotDeclaredException
from rclpy.executors import SingleThreadedExecutor
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.parameter import Parameter
from rclpy.qos import qos_profile_sensor_data
from rclpy.qos import QoSDurabilityPolicy
Expand Down Expand Up @@ -134,6 +136,29 @@ def test_create_raw_subscription(self):

executor.shutdown()

def dummy_cb(self, msg):
pass

def test_take(self):
basic_types_pub = self.node.create_publisher(BasicTypes, 'take_test', 1)
sub = self.node.create_subscription(
BasicTypes,
'take_test',
self.dummy_cb,
1)
basic_types_msg = BasicTypes()
basic_types_pub.publish(basic_types_msg)
cycle_count = 0
while cycle_count < 5:
with sub.handle as capsule:
result = _rclpy.rclpy_take(capsule, sub.msg_type, False)
if result is not None:
msg, info = result
self.assertNotEqual(0, info['source_timestamp'])
return
else:
time.sleep(0.1)

def test_create_client(self):
self.node.create_client(GetParameters, 'get/parameters')
with self.assertRaisesRegex(InvalidServiceNameException, 'must not contain characters'):
Expand Down
2 changes: 1 addition & 1 deletion rclpy/test/test_waitable.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def take_data(self):
"""Take stuff from lower level so the wait set doesn't immediately wake again."""
if self.subscription_is_ready:
self.subscription_is_ready = False
return _rclpy.rclpy_take(self.subscription, EmptyMsg, False)
return _rclpy.rclpy_take(self.subscription, EmptyMsg, False)[0]
return None

async def execute(self, taken_data):
Expand Down

0 comments on commit 6fb1696

Please sign in to comment.