From 1c645c873272e1e61b1367d579c59d6367813b9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ingo=20L=C3=BCtkebohle?= Date: Fri, 24 Apr 2020 06:31:14 +0200 Subject: [PATCH] Feature/services timestamps (#545) * Take with info in take_response Signed-off-by: Luetkebohle Ingo (CR/AEX3) * Add service_info accessors. Signed-off-by: Luetkebohle Ingo (CR/AEX3) * Fix capsule access. Signed-off-by: Luetkebohle Ingo (CR/AEX3) * Handle both rmw_request_id_t and rmw_service_info_t Signed-off-by: Luetkebohle Ingo (CR/AEX3) * take with service_info Signed-off-by: Luetkebohle Ingo (CR/AEX3) * Add test for timestamps Signed-off-by: Luetkebohle Ingo (CR/AEX3) * remove bad quotes Signed-off-by: Karsten Knese Co-authored-by: Karsten Knese --- rclpy/rclpy/executors.py | 5 +- rclpy/src/rclpy/_rclpy.c | 109 ++++++++++++++++++++++++++++++++++---- rclpy/test/test_client.py | 25 +++++++++ 3 files changed, 126 insertions(+), 13 deletions(-) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index 1cd3dbab3..b5dd8366f 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -334,9 +334,10 @@ def _take_client(self, client): return _rclpy.rclpy_take_response(capsule, client.srv_type.Response) async def _execute_client(self, client, seq_and_response): - sequence, response = seq_and_response - if sequence is not None: + header, response = seq_and_response + if header is not None: try: + sequence = _rclpy.rclpy_service_info_get_sequence_number(header) future = client._pending_requests[sequence] except KeyError: # The request was cancelled diff --git a/rclpy/src/rclpy/_rclpy.c b/rclpy/src/rclpy/_rclpy.c index ae328d427..49084cc4d 100644 --- a/rclpy/src/rclpy/_rclpy.c +++ b/rclpy/src/rclpy/_rclpy.c @@ -2716,7 +2716,14 @@ rclpy_send_response(PyObject * Py_UNUSED(self), PyObject * args) rmw_request_id_t * header = PyCapsule_GetPointer( pyheader, "rmw_request_id_t"); if (!header) { - return NULL; + rmw_service_info_t * info_header = PyCapsule_GetPointer( + pyheader, "rmw_service_info_t"); + if (!info_header) { + return NULL; + } + // clear previous error, work-around worked + PyErr_Restore(NULL, NULL, NULL); + header = &(info_header->request_id); } destroy_ros_message_signature * destroy_ros_message = NULL; @@ -3392,12 +3399,12 @@ rclpy_take_request(PyObject * Py_UNUSED(self), PyObject * args) return NULL; } - rmw_request_id_t * header = PyMem_Malloc(sizeof(rmw_request_id_t)); + rmw_service_info_t * header = PyMem_Malloc(sizeof(rmw_service_info_t)); if (!header) { PyErr_Format(PyExc_MemoryError, "Failed to allocate memory for request header"); return NULL; } - rcl_ret_t ret = rcl_take_request(&(srv->service), header, taken_request); + rcl_ret_t ret = rcl_take_request_with_info(&(srv->service), header, taken_request); if (ret != RCL_RET_OK && ret != RCL_RET_SERVICE_TAKE_FAILED) { PyErr_Format( @@ -3423,7 +3430,7 @@ rclpy_take_request(PyObject * Py_UNUSED(self), PyObject * args) Py_DECREF(pytaken_request); return NULL; } - PyObject * pyheader = PyCapsule_New(header, "rmw_request_id_t", NULL); + PyObject * pyheader = PyCapsule_New(header, "rmw_service_info_t", NULL); if (!pyheader) { PyMem_Free(header); Py_DECREF(pytaken_request); @@ -3469,18 +3476,17 @@ rclpy_take_response(PyObject * Py_UNUSED(self), PyObject * args) return NULL; } - rmw_request_id_t * header = PyMem_Malloc(sizeof(rmw_request_id_t)); + rmw_service_info_t * header = PyMem_Malloc(sizeof(rmw_service_info_t)); if (!header) { PyErr_Format(PyExc_MemoryError, "Failed to allocate memory for response header"); return NULL; } - rcl_ret_t ret = rcl_take_response(&(client->client), header, taken_response); - int64_t sequence = header->sequence_number; - PyMem_Free(header); + rcl_ret_t ret = rcl_take_response_with_info(&(client->client), header, taken_response); // Create the tuple to return PyObject * pytuple = PyTuple_New(2); if (!pytuple) { + PyMem_Free(header); return NULL; } @@ -3490,16 +3496,18 @@ rclpy_take_response(PyObject * Py_UNUSED(self), PyObject * args) if (!pytaken_response) { // the function has set the Python error Py_DECREF(pytuple); + PyMem_Free(header); return NULL; } - PyObject * pysequence = PyLong_FromLongLong(sequence); - if (!pysequence) { + PyObject * pyheader = PyCapsule_New(header, "rmw_service_info_t", NULL); + if (!pyheader) { Py_DECREF(pytaken_response); Py_DECREF(pytuple); + PyMem_Free(header); return NULL; } - PyTuple_SET_ITEM(pytuple, 0, pysequence); + PyTuple_SET_ITEM(pytuple, 0, pyheader); PyTuple_SET_ITEM(pytuple, 1, pytaken_response); return pytuple; } @@ -5333,6 +5341,70 @@ rclpy_deserialize(PyObject * Py_UNUSED(self), PyObject * args) return pydeserialized_ros_msg; } +static rmw_service_info_t * +_service_info_from_args(PyObject * args) +{ + PyObject * pyservice_info; + + if (!PyArg_ParseTuple(args, "O", &pyservice_info)) { + return NULL; + } + + return PyCapsule_GetPointer(pyservice_info, "rmw_service_info_t"); +} + +/// Retrieves the sequence number from a rmw_service_info_t capsule +/** + * Raises RuntimeError on failure. + * + * \param[in] pyservice_info Capsule pointing to the rmw_service_info_t + * \return the sequence number as a long + */ +static PyObject * +rclpy_service_info_get_sequence_number(PyObject * Py_UNUSED(self), PyObject * args) +{ + rmw_service_info_t * service_info = _service_info_from_args(args); + if (service_info == NULL) { + return NULL; + } + return PyLong_FromLongLong(service_info->request_id.sequence_number); +} + +/// Retrieves the source timestamp number from a rmw_service_info_t capsule +/** + * Raises RuntimeError on failure. + * + * \param[in] pyservice_info Capsule pointing to the rmw_service_info_t + * \return the source timestamps as a long + */ +static PyObject * +rclpy_service_info_get_source_timestamp(PyObject * Py_UNUSED(self), PyObject * args) +{ + rmw_service_info_t * service_info = _service_info_from_args(args); + if (service_info == NULL) { + return NULL; + } + return PyLong_FromLongLong(service_info->source_timestamp); +} + + +/// Retrieves the received timestsamp number from a rmw_service_info_t capsule +/** + * Raises RuntimeError on failure. + * + * \param[in] pyservice_info Capsule pointing to the rmw_service_info_t + * \return the receive timestamp as a long + */ +static PyObject * +rclpy_service_info_get_received_timestamp(PyObject * Py_UNUSED(self), PyObject * args) +{ + rmw_service_info_t * service_info = _service_info_from_args(args); + if (service_info == NULL) { + return NULL; + } + return PyLong_FromLongLong(service_info->received_timestamp); +} + /// Define the public methods of this module static PyMethodDef rclpy_methods[] = { { @@ -5724,6 +5796,21 @@ static PyMethodDef rclpy_methods[] = { "rclpy_deserialize", rclpy_deserialize, METH_VARARGS, "Deserialize a ROS message." }, + { + "rclpy_service_info_get_sequence_number", rclpy_service_info_get_sequence_number, + METH_VARARGS, + "Retrieve sequence number from service_info" + }, + { + "rclpy_service_info_get_source_timestamp", rclpy_service_info_get_source_timestamp, + METH_VARARGS, + "Retrieve source timestamp from service_info" + }, + { + "rclpy_service_info_get_received_timestamp", rclpy_service_info_get_received_timestamp, + METH_VARARGS, + "Retrieve received timestamp from service_info" + }, {NULL, NULL, 0, NULL} /* sentinel */ }; diff --git a/rclpy/test/test_client.py b/rclpy/test/test_client.py index 4f8e60fca..f48202519 100644 --- a/rclpy/test/test_client.py +++ b/rclpy/test/test_client.py @@ -18,6 +18,7 @@ from rcl_interfaces.srv import GetParameters import rclpy import rclpy.executors +from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy # TODO(sloretz) Reduce fudge once wait_for_service uses node graph events @@ -90,6 +91,30 @@ def test_concurrent_calls_to_service(self): self.node.destroy_client(cli) self.node.destroy_service(srv) + def test_service_timestamps(self): + cli = self.node.create_client(GetParameters, 'get/parameters') + srv = self.node.create_service( + GetParameters, 'get/parameters', + lambda request, response: response) + try: + self.assertTrue(cli.wait_for_service(timeout_sec=20)) + cli.call_async(GetParameters.Request()) + cycle_count = 0 + while cycle_count < 5: + with srv.handle as capsule: + result = _rclpy.rclpy_take_request(capsule, srv.srv_type.Request) + if result is not None: + request, header = result + source_timestamp = _rclpy.rclpy_service_info_get_source_timestamp(header) + self.assertNotEqual(0, source_timestamp) + return + else: + time.sleep(0.1) + self.fail('Did not get a request in time') + finally: + self.node.destroy_client(cli) + self.node.destroy_service(srv) + def test_different_type_raises(self): cli = self.node.create_client(GetParameters, 'get/parameters') srv = self.node.create_service(