Skip to content

Commit

Permalink
Feature/services timestamps (#545)
Browse files Browse the repository at this point in the history
* Take with info in take_response

Signed-off-by: Luetkebohle Ingo (CR/AEX3) <ingo.luetkebohle@de.bosch.com>

* Add service_info accessors.

Signed-off-by: Luetkebohle Ingo (CR/AEX3) <ingo.luetkebohle@de.bosch.com>

* Fix capsule access.

Signed-off-by: Luetkebohle Ingo (CR/AEX3) <ingo.luetkebohle@de.bosch.com>

* Handle both rmw_request_id_t and rmw_service_info_t

Signed-off-by: Luetkebohle Ingo (CR/AEX3) <ingo.luetkebohle@de.bosch.com>

* take with service_info

Signed-off-by: Luetkebohle Ingo (CR/AEX3) <ingo.luetkebohle@de.bosch.com>

* Add test for timestamps

Signed-off-by: Luetkebohle Ingo (CR/AEX3) <ingo.luetkebohle@de.bosch.com>

* remove bad quotes

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

Co-authored-by: Karsten Knese <karsten@openrobotics.org>
  • Loading branch information
iluetkeb and Karsten1987 authored Apr 24, 2020
1 parent 6fb1696 commit 1c645c8
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 13 deletions.
5 changes: 3 additions & 2 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,10 @@ def _take_client(self, client):
return _rclpy.rclpy_take_response(capsule, client.srv_type.Response)

async def _execute_client(self, client, seq_and_response):
sequence, response = seq_and_response
if sequence is not None:
header, response = seq_and_response
if header is not None:
try:
sequence = _rclpy.rclpy_service_info_get_sequence_number(header)
future = client._pending_requests[sequence]
except KeyError:
# The request was cancelled
Expand Down
109 changes: 98 additions & 11 deletions rclpy/src/rclpy/_rclpy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2716,7 +2716,14 @@ rclpy_send_response(PyObject * Py_UNUSED(self), PyObject * args)
rmw_request_id_t * header = PyCapsule_GetPointer(
pyheader, "rmw_request_id_t");
if (!header) {
return NULL;
rmw_service_info_t * info_header = PyCapsule_GetPointer(
pyheader, "rmw_service_info_t");
if (!info_header) {
return NULL;
}
// clear previous error, work-around worked
PyErr_Restore(NULL, NULL, NULL);
header = &(info_header->request_id);
}

destroy_ros_message_signature * destroy_ros_message = NULL;
Expand Down Expand Up @@ -3392,12 +3399,12 @@ rclpy_take_request(PyObject * Py_UNUSED(self), PyObject * args)
return NULL;
}

rmw_request_id_t * header = PyMem_Malloc(sizeof(rmw_request_id_t));
rmw_service_info_t * header = PyMem_Malloc(sizeof(rmw_service_info_t));
if (!header) {
PyErr_Format(PyExc_MemoryError, "Failed to allocate memory for request header");
return NULL;
}
rcl_ret_t ret = rcl_take_request(&(srv->service), header, taken_request);
rcl_ret_t ret = rcl_take_request_with_info(&(srv->service), header, taken_request);

if (ret != RCL_RET_OK && ret != RCL_RET_SERVICE_TAKE_FAILED) {
PyErr_Format(
Expand All @@ -3423,7 +3430,7 @@ rclpy_take_request(PyObject * Py_UNUSED(self), PyObject * args)
Py_DECREF(pytaken_request);
return NULL;
}
PyObject * pyheader = PyCapsule_New(header, "rmw_request_id_t", NULL);
PyObject * pyheader = PyCapsule_New(header, "rmw_service_info_t", NULL);
if (!pyheader) {
PyMem_Free(header);
Py_DECREF(pytaken_request);
Expand Down Expand Up @@ -3469,18 +3476,17 @@ rclpy_take_response(PyObject * Py_UNUSED(self), PyObject * args)
return NULL;
}

rmw_request_id_t * header = PyMem_Malloc(sizeof(rmw_request_id_t));
rmw_service_info_t * header = PyMem_Malloc(sizeof(rmw_service_info_t));
if (!header) {
PyErr_Format(PyExc_MemoryError, "Failed to allocate memory for response header");
return NULL;
}
rcl_ret_t ret = rcl_take_response(&(client->client), header, taken_response);
int64_t sequence = header->sequence_number;
PyMem_Free(header);
rcl_ret_t ret = rcl_take_response_with_info(&(client->client), header, taken_response);

// Create the tuple to return
PyObject * pytuple = PyTuple_New(2);
if (!pytuple) {
PyMem_Free(header);
return NULL;
}

Expand All @@ -3490,16 +3496,18 @@ rclpy_take_response(PyObject * Py_UNUSED(self), PyObject * args)
if (!pytaken_response) {
// the function has set the Python error
Py_DECREF(pytuple);
PyMem_Free(header);
return NULL;
}

PyObject * pysequence = PyLong_FromLongLong(sequence);
if (!pysequence) {
PyObject * pyheader = PyCapsule_New(header, "rmw_service_info_t", NULL);
if (!pyheader) {
Py_DECREF(pytaken_response);
Py_DECREF(pytuple);
PyMem_Free(header);
return NULL;
}
PyTuple_SET_ITEM(pytuple, 0, pysequence);
PyTuple_SET_ITEM(pytuple, 0, pyheader);
PyTuple_SET_ITEM(pytuple, 1, pytaken_response);
return pytuple;
}
Expand Down Expand Up @@ -5333,6 +5341,70 @@ rclpy_deserialize(PyObject * Py_UNUSED(self), PyObject * args)
return pydeserialized_ros_msg;
}

static rmw_service_info_t *
_service_info_from_args(PyObject * args)
{
PyObject * pyservice_info;

if (!PyArg_ParseTuple(args, "O", &pyservice_info)) {
return NULL;
}

return PyCapsule_GetPointer(pyservice_info, "rmw_service_info_t");
}

/// Retrieves the sequence number from a rmw_service_info_t capsule
/**
* Raises RuntimeError on failure.
*
* \param[in] pyservice_info Capsule pointing to the rmw_service_info_t
* \return the sequence number as a long
*/
static PyObject *
rclpy_service_info_get_sequence_number(PyObject * Py_UNUSED(self), PyObject * args)
{
rmw_service_info_t * service_info = _service_info_from_args(args);
if (service_info == NULL) {
return NULL;
}
return PyLong_FromLongLong(service_info->request_id.sequence_number);
}

/// Retrieves the source timestamp number from a rmw_service_info_t capsule
/**
* Raises RuntimeError on failure.
*
* \param[in] pyservice_info Capsule pointing to the rmw_service_info_t
* \return the source timestamps as a long
*/
static PyObject *
rclpy_service_info_get_source_timestamp(PyObject * Py_UNUSED(self), PyObject * args)
{
rmw_service_info_t * service_info = _service_info_from_args(args);
if (service_info == NULL) {
return NULL;
}
return PyLong_FromLongLong(service_info->source_timestamp);
}


/// Retrieves the received timestsamp number from a rmw_service_info_t capsule
/**
* Raises RuntimeError on failure.
*
* \param[in] pyservice_info Capsule pointing to the rmw_service_info_t
* \return the receive timestamp as a long
*/
static PyObject *
rclpy_service_info_get_received_timestamp(PyObject * Py_UNUSED(self), PyObject * args)
{
rmw_service_info_t * service_info = _service_info_from_args(args);
if (service_info == NULL) {
return NULL;
}
return PyLong_FromLongLong(service_info->received_timestamp);
}

/// Define the public methods of this module
static PyMethodDef rclpy_methods[] = {
{
Expand Down Expand Up @@ -5724,6 +5796,21 @@ static PyMethodDef rclpy_methods[] = {
"rclpy_deserialize", rclpy_deserialize, METH_VARARGS,
"Deserialize a ROS message."
},
{
"rclpy_service_info_get_sequence_number", rclpy_service_info_get_sequence_number,
METH_VARARGS,
"Retrieve sequence number from service_info"
},
{
"rclpy_service_info_get_source_timestamp", rclpy_service_info_get_source_timestamp,
METH_VARARGS,
"Retrieve source timestamp from service_info"
},
{
"rclpy_service_info_get_received_timestamp", rclpy_service_info_get_received_timestamp,
METH_VARARGS,
"Retrieve received timestamp from service_info"
},

{NULL, NULL, 0, NULL} /* sentinel */
};
Expand Down
25 changes: 25 additions & 0 deletions rclpy/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from rcl_interfaces.srv import GetParameters
import rclpy
import rclpy.executors
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy


# TODO(sloretz) Reduce fudge once wait_for_service uses node graph events
Expand Down Expand Up @@ -90,6 +91,30 @@ def test_concurrent_calls_to_service(self):
self.node.destroy_client(cli)
self.node.destroy_service(srv)

def test_service_timestamps(self):
cli = self.node.create_client(GetParameters, 'get/parameters')
srv = self.node.create_service(
GetParameters, 'get/parameters',
lambda request, response: response)
try:
self.assertTrue(cli.wait_for_service(timeout_sec=20))
cli.call_async(GetParameters.Request())
cycle_count = 0
while cycle_count < 5:
with srv.handle as capsule:
result = _rclpy.rclpy_take_request(capsule, srv.srv_type.Request)
if result is not None:
request, header = result
source_timestamp = _rclpy.rclpy_service_info_get_source_timestamp(header)
self.assertNotEqual(0, source_timestamp)
return
else:
time.sleep(0.1)
self.fail('Did not get a request in time')
finally:
self.node.destroy_client(cli)
self.node.destroy_service(srv)

def test_different_type_raises(self):
cli = self.node.create_client(GetParameters, 'get/parameters')
srv = self.node.create_service(
Expand Down

0 comments on commit 1c645c8

Please sign in to comment.