Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rclpy wait for service #127

Closed
wants to merge 49 commits into from
Closed
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
372c1a9
Dirty: First proof of concept for wait_for_service implementation
sloretz Oct 10, 2017
817990a
Moved wait set code to its own class for code reuse
sloretz Oct 13, 2017
9b91a62
Added timeout_sec_to_nsec()
sloretz Oct 13, 2017
371c917
wait_for_service() implemented with timers
sloretz Oct 13, 2017
b17f497
Added unit tests for timeout_sec_to_nsec()
sloretz Oct 13, 2017
35898a6
Added test for WaitSet class
sloretz Oct 13, 2017
5b1ca62
Use negative timeouts to mean block forever
sloretz Oct 13, 2017
489bc0e
Double quotes to single quotes
sloretz Oct 13, 2017
f854466
Added wait_for_service() tests and fixed bugs it caught
sloretz Oct 13, 2017
9255e1f
Eliminate blind exception warning
sloretz Oct 13, 2017
f89a222
Reduce flakiness of test by increasing time to 0.1s
sloretz Oct 13, 2017
39ac231
Comment says negative timeouts block forever
sloretz Oct 17, 2017
3237ed8
Use :returns:
sloretz Oct 17, 2017
7acb2a6
Move add_subscriptions()
sloretz Oct 17, 2017
4720d3f
arugments -> arguments
sloretz Oct 17, 2017
395934a
Daemon as keyword arg
sloretz Oct 17, 2017
1c9a098
Remove unnecessary namespace argument
sloretz Oct 17, 2017
655425f
Use S_TO_NS in test
sloretz Oct 17, 2017
ccd93d8
More tests using S_TO_NS
sloretz Oct 17, 2017
441bbb5
Use monotonic clock for testing timer time
sloretz Oct 17, 2017
8733c14
Increased test timeout by 30 seconds
sloretz Oct 17, 2017
3f26246
CheckExact -> IsValid
sloretz Oct 17, 2017
0a79f28
Fixed wait_set not clearing ready_pointers
sloretz Oct 17, 2017
65c545f
Remove unnecessary namespace keyword arg
sloretz Oct 18, 2017
d81684c
Non-blocking wait
sloretz Oct 18, 2017
4bbc323
remove expression that always evaluates to True
sloretz Oct 18, 2017
99280ae
Raise ValueError on invalid capsule
sloretz Oct 19, 2017
1d2beeb
Simplified timeout_sec_to_nsec
sloretz Oct 19, 2017
6980711
Added 'WaitSet.destroy()' and made executor use it
sloretz Oct 24, 2017
c69858f
GraphListener periodically checks if rclpy is shutdown
sloretz Oct 24, 2017
4cd2fb1
Misc fixes after pycapsule names
sloretz Oct 25, 2017
0cf2aeb
Wait set class always clears entities before waiting
sloretz Oct 31, 2017
7843c5c
Remove underscore on import
sloretz Oct 31, 2017
ded618f
Reformat timeout line
sloretz Oct 31, 2017
e8f7fd6
Use () when raising exceptions
sloretz Oct 31, 2017
149ad43
Removed _ on imports
sloretz Oct 31, 2017
dd86a13
Executor optimizations
sloretz Nov 1, 2017
6352e86
Fixed executor yielding entities to wrong node
sloretz Nov 1, 2017
0dbf8c7
Use list() only where necessary
sloretz Nov 1, 2017
1a2ce96
Docstring in imperitive mood
sloretz Nov 1, 2017
7ee5ead
Executors reuse iterator
sloretz Nov 1, 2017
3628db4
moved some wait_set code into C
sloretz Nov 2, 2017
b71d1c5
Avoid another list comprehension
sloretz Nov 2, 2017
9d9113d
Replaced WaitSet with C code in executor
sloretz Nov 3, 2017
adfddf6
Remove test code
sloretz Nov 3, 2017
315db4e
Use lists instead of set
sloretz Nov 8, 2017
e078c47
Use locally defined function instead of member function
sloretz Nov 8, 2017
5c92085
Shorten code using macro
sloretz Nov 8, 2017
70b8652
Move everything to new wait_set code
sloretz Nov 9, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rclpy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ if(BUILD_TESTING)
WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}"
PYTHON_EXECUTABLE "${PYTHON_EXECUTABLE}"
APPEND_ENV AMENT_PREFIX_PATH=${ament_index_build_path}
TIMEOUT 90
TIMEOUT 120
)
endif()
endif()
Expand Down
37 changes: 37 additions & 0 deletions rclpy/rclpy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import threading

from rclpy.graph_listener import GraphEventSubscription
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
import rclpy.utilities

Expand Down Expand Up @@ -77,3 +78,39 @@ def wait_for_future(self):
thread1 = ResponseThread(self)
thread1.start()
thread1.join()

def service_is_ready(self):
return _rclpy.rclpy_service_server_is_available(self.node_handle, self.client_handle)

def wait_for_service(self, timeout_sec=None):
"""
Block until the service is available.

:param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0
:type timeout_sec: float or None
:rtype: bool
:returns: true if the service is available
"""
timeout_nsec = rclpy.utilities.timeout_sec_to_nsec(timeout_sec)
result = self.service_is_ready()
if result or timeout_sec == 0:
return result

event = threading.Event()

def on_graph_event():
nonlocal self
nonlocal event
nonlocal result
result = self.service_is_ready()
if result:
event.set()

def on_timeout():
nonlocal event
event.set()

with GraphEventSubscription(self.node_handle, on_graph_event, timeout_nsec, on_timeout):
event.wait()

return result
262 changes: 124 additions & 138 deletions rclpy/rclpy/executors.py

Large diffs are not rendered by default.

208 changes: 208 additions & 0 deletions rclpy/rclpy/graph_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
import traceback

from rclpy.constants import S_TO_NS
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
import rclpy.logging
from rclpy.timer import WallTimer
from rclpy.utilities import ok
from rclpy.wait_set import WaitSet


class GraphListenerSingleton:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this singleton being destroyed in case of e.g. shutting down rclpy? When is the thread being stopped?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In d10ee26 it checks if rclpy was shut down once per second.

"""Manage a thread to listen for graph events."""

def __new__(cls, *args, **kwargs):
if not hasattr(cls, '__singleton'):
setattr(cls, '__singleton', super().__new__(cls, *args, **kwargs))
return getattr(cls, '__singleton')

def __init__(self):
# Maps guard_condition pointers to a list of subscriber callbacks
self._callbacks = {}
# Maps timer instances to timer callbacks
self._timers = {}
self._gc_handle, self._gc_pointer = _rclpy.rclpy_create_guard_condition()
self._thread = None
self._lock = threading.RLock()
self._wait_set = WaitSet()
self._wait_set.add_guard_condition(self._gc_handle, self._gc_pointer)

def __del__(self):
self.destroy()

@classmethod
def destroy(cls):
self = getattr(cls, '__singleton')
if self is not None:
with self._lock:
setattr(cls, '__singleton', None)
self._thread = None
_rclpy.rclpy_destroy_entity(self._gc_handle)

def _try_start_thread(self):
# Assumes lock is already held
if self._thread is None:
self._thread = threading.Thread(target=self._runner, daemon=True)
self._thread.start()

def add_timer(self, timer_period_ns, callback):
"""
Call callback when timer triggers.

:param timer_period_ns: time until timer triggers in nanoseconds
:type timer_period_ns: integer
:param callback: called when the graph updates
:type callback: callable
:rtype: rclpy.timer.WallTimer
"""
with self._lock:
tmr = WallTimer(callback, None, timer_period_ns)
self._timers[tmr] = callback
self._wait_set.add_timer(tmr.timer_handle, tmr.timer_pointer)

self._try_start_thread()
return tmr

def remove_timer(self, timer):
"""
Remove a timer from the wait set.

:param timer: A timer returned from add_timer()
:type timer: rclpy.timer.WallTimer instance
"""
with self._lock:
if timer in self._timers:
del self._timers[timer]
self._wait_set.remove_timer(timer.timer_pointer)

def add_callback(self, node_handle, callback):
"""
Call callback when node graph gets updates.

:param node_handle: rclpy node handle
:type node_handle: PyCapsule
:param callback: called when the graph updates
:type callback: callable
"""
with self._lock:
gc_handle, gc_pointer = _rclpy.rclpy_get_graph_guard_condition(node_handle)
if gc_pointer not in self._callbacks:
# new node, rebuild wait set
self._callbacks[gc_pointer] = []
self._wait_set.add_guard_condition(gc_handle, gc_pointer)
_rclpy.rclpy_trigger_guard_condition(self._gc_handle)

# Add a callback
self._callbacks[gc_pointer].append(callback)

self._try_start_thread()
# start the thread if necessary
if self._thread is None:
self._thread = threading.Thread(target=self._runner)
self._thread.daemon = True
self._thread.start()

def remove_callback(self, node_handle, callback):
"""
Stop listening for graph updates for the given node and callback.

:param node_handle: rclpy node handle
:type node_handle: PyCapsule
:param callback: called when the graph updates
:type callback: callable
"""
with self._lock:
gc_handle, gc_pointer = _rclpy.rclpy_get_graph_guard_condition(node_handle)
if gc_pointer in self._callbacks:
# Remove the callback
self._callbacks[gc_pointer].remove(callback)

if not self._callbacks[gc_pointer]:
# last subscriber for this node, remove the node and rebuild the wait set
del self._callbacks[gc_pointer]
self._wait_set.remove_guard_condition(gc_pointer)
_rclpy.rclpy_trigger_guard_condition(self._gc_handle)

def _runner(self):
while True:
# Wait 1 second
self._wait_set.wait(S_TO_NS)

with self._lock:
# Shutdown if necessary
if not ok():
self.destroy()
break

# notify graph event subscribers
if not self._thread:
# Asked to shut down thread
return
ready_callbacks = []
# Guard conditions
for gc_pointer, callback_list in self._callbacks.items():
if self._wait_set.is_ready(gc_pointer):
for callback in callback_list:
ready_callbacks.append(callback)
# Timers
for tmr, callback in self._timers.items():
if self._wait_set.is_ready(tmr.timer_pointer):
ready_callbacks.append(callback)
_rclpy.rclpy_call_timer(tmr.timer_handle)
# Call callbacks
for callback in ready_callbacks:
try:
callback()
except Exception:
rclpy.logging.logwarn(traceback.format_exc())


class GraphEventSubscription:
"""Manage subscription to node graph updates."""

def __init__(self, node_handle, callback, timeout_ns=-1, timeout_callback=None):
self._listener = GraphListenerSingleton()
self._node_handle = node_handle
self._callback = callback
self._listener.add_callback(self._node_handle, self._callback)
self._timeout_callback = timeout_callback
self._timer = None
if timeout_ns >= 0:
self._timer = self._listener.add_timer(timeout_ns, self.on_timeout)

def on_timeout(self):
self._timeout_callback()
self._unsubscribe()

def _unsubscribe(self):
if self._callback:
self._listener.remove_callback(self._node_handle, self._callback)
self._callback = None
if self._timer:
self._listener.remove_timer(self._timer)
self._timeout_callback = None
self._timer = None

def __del__(self):
self._unsubscribe()

def __enter__(self):
return self

def __exit__(self, t, v, tb):
self._unsubscribe()
21 changes: 21 additions & 0 deletions rclpy/rclpy/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import threading

from rclpy.constants import S_TO_NS
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy

g_shutdown_lock = threading.Lock()
Expand All @@ -38,3 +39,23 @@ def try_shutdown():

def get_rmw_implementation_identifier():
return _rclpy.rclpy_get_rmw_implementation_identifier()


def timeout_sec_to_nsec(timeout_sec):
"""
Convert timeout in seconds to rcl compatible timeout in nanoseconds.

Python tends to use floating point numbers in seconds for timeouts. This utility converts a
python-style timeout to an integer in nanoseconds that can be used by rcl_wait.

:param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if < 1ns
:type timeout_sec: float or None
:rtype: int
:returns: rcl_wait compatible timeout in nanoseconds
"""
if timeout_sec is None or timeout_sec < 0:
# Block forever
return -1
else:
# wait for given time
return int(float(timeout_sec) * S_TO_NS)
Loading