-
Notifications
You must be signed in to change notification settings - Fork 237
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rclpy wait for service #127
Closed
+1,512
−524
Closed
Changes from 39 commits
Commits
Show all changes
49 commits
Select commit
Hold shift + click to select a range
372c1a9
Dirty: First proof of concept for wait_for_service implementation
sloretz 817990a
Moved wait set code to its own class for code reuse
sloretz 9b91a62
Added timeout_sec_to_nsec()
sloretz 371c917
wait_for_service() implemented with timers
sloretz b17f497
Added unit tests for timeout_sec_to_nsec()
sloretz 35898a6
Added test for WaitSet class
sloretz 5b1ca62
Use negative timeouts to mean block forever
sloretz 489bc0e
Double quotes to single quotes
sloretz f854466
Added wait_for_service() tests and fixed bugs it caught
sloretz 9255e1f
Eliminate blind exception warning
sloretz f89a222
Reduce flakiness of test by increasing time to 0.1s
sloretz 39ac231
Comment says negative timeouts block forever
sloretz 3237ed8
Use :returns:
sloretz 7acb2a6
Move add_subscriptions()
sloretz 4720d3f
arugments -> arguments
sloretz 395934a
Daemon as keyword arg
sloretz 1c9a098
Remove unnecessary namespace argument
sloretz 655425f
Use S_TO_NS in test
sloretz ccd93d8
More tests using S_TO_NS
sloretz 441bbb5
Use monotonic clock for testing timer time
sloretz 8733c14
Increased test timeout by 30 seconds
sloretz 3f26246
CheckExact -> IsValid
sloretz 0a79f28
Fixed wait_set not clearing ready_pointers
sloretz 65c545f
Remove unnecessary namespace keyword arg
sloretz d81684c
Non-blocking wait
sloretz 4bbc323
remove expression that always evaluates to True
sloretz 99280ae
Raise ValueError on invalid capsule
sloretz 1d2beeb
Simplified timeout_sec_to_nsec
sloretz 6980711
Added 'WaitSet.destroy()' and made executor use it
sloretz c69858f
GraphListener periodically checks if rclpy is shutdown
sloretz 4cd2fb1
Misc fixes after pycapsule names
sloretz 0cf2aeb
Wait set class always clears entities before waiting
sloretz 7843c5c
Remove underscore on import
sloretz ded618f
Reformat timeout line
sloretz e8f7fd6
Use () when raising exceptions
sloretz 149ad43
Removed _ on imports
sloretz dd86a13
Executor optimizations
sloretz 6352e86
Fixed executor yielding entities to wrong node
sloretz 0dbf8c7
Use list() only where necessary
sloretz 1a2ce96
Docstring in imperitive mood
sloretz 7ee5ead
Executors reuse iterator
sloretz 3628db4
moved some wait_set code into C
sloretz b71d1c5
Avoid another list comprehension
sloretz 9d9113d
Replaced WaitSet with C code in executor
sloretz adfddf6
Remove test code
sloretz 315db4e
Use lists instead of set
sloretz e078c47
Use locally defined function instead of member function
sloretz 5c92085
Shorten code using macro
sloretz 70b8652
Move everything to new wait_set code
sloretz File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,208 @@ | ||
# Copyright 2017 Open Source Robotics Foundation, Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import threading | ||
import traceback | ||
|
||
from rclpy.constants import S_TO_NS | ||
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy | ||
import rclpy.logging | ||
from rclpy.timer import WallTimer | ||
from rclpy.utilities import ok | ||
from rclpy.wait_set import WaitSet | ||
|
||
|
||
class GraphListenerSingleton: | ||
"""Manage a thread to listen for graph events.""" | ||
|
||
def __new__(cls, *args, **kwargs): | ||
if not hasattr(cls, '__singleton'): | ||
setattr(cls, '__singleton', super().__new__(cls, *args, **kwargs)) | ||
return getattr(cls, '__singleton') | ||
|
||
def __init__(self): | ||
# Maps guard_condition pointers to a list of subscriber callbacks | ||
self._callbacks = {} | ||
# Maps timer instances to timer callbacks | ||
self._timers = {} | ||
self._gc_handle, self._gc_pointer = _rclpy.rclpy_create_guard_condition() | ||
self._thread = None | ||
self._lock = threading.RLock() | ||
self._wait_set = WaitSet() | ||
self._wait_set.add_guard_condition(self._gc_handle, self._gc_pointer) | ||
|
||
def __del__(self): | ||
self.destroy() | ||
|
||
@classmethod | ||
def destroy(cls): | ||
self = getattr(cls, '__singleton') | ||
if self is not None: | ||
with self._lock: | ||
setattr(cls, '__singleton', None) | ||
self._thread = None | ||
_rclpy.rclpy_destroy_entity(self._gc_handle) | ||
|
||
def _try_start_thread(self): | ||
# Assumes lock is already held | ||
if self._thread is None: | ||
self._thread = threading.Thread(target=self._runner, daemon=True) | ||
self._thread.start() | ||
|
||
def add_timer(self, timer_period_ns, callback): | ||
""" | ||
Call callback when timer triggers. | ||
|
||
:param timer_period_ns: time until timer triggers in nanoseconds | ||
:type timer_period_ns: integer | ||
:param callback: called when the graph updates | ||
:type callback: callable | ||
:rtype: rclpy.timer.WallTimer | ||
""" | ||
with self._lock: | ||
tmr = WallTimer(callback, None, timer_period_ns) | ||
self._timers[tmr] = callback | ||
self._wait_set.add_timer(tmr.timer_handle, tmr.timer_pointer) | ||
|
||
self._try_start_thread() | ||
return tmr | ||
|
||
def remove_timer(self, timer): | ||
""" | ||
Remove a timer from the wait set. | ||
|
||
:param timer: A timer returned from add_timer() | ||
:type timer: rclpy.timer.WallTimer instance | ||
""" | ||
with self._lock: | ||
if timer in self._timers: | ||
del self._timers[timer] | ||
self._wait_set.remove_timer(timer.timer_pointer) | ||
|
||
def add_callback(self, node_handle, callback): | ||
""" | ||
Call callback when node graph gets updates. | ||
|
||
:param node_handle: rclpy node handle | ||
:type node_handle: PyCapsule | ||
:param callback: called when the graph updates | ||
:type callback: callable | ||
""" | ||
with self._lock: | ||
gc_handle, gc_pointer = _rclpy.rclpy_get_graph_guard_condition(node_handle) | ||
if gc_pointer not in self._callbacks: | ||
# new node, rebuild wait set | ||
self._callbacks[gc_pointer] = [] | ||
self._wait_set.add_guard_condition(gc_handle, gc_pointer) | ||
_rclpy.rclpy_trigger_guard_condition(self._gc_handle) | ||
|
||
# Add a callback | ||
self._callbacks[gc_pointer].append(callback) | ||
|
||
self._try_start_thread() | ||
# start the thread if necessary | ||
if self._thread is None: | ||
self._thread = threading.Thread(target=self._runner) | ||
self._thread.daemon = True | ||
self._thread.start() | ||
|
||
def remove_callback(self, node_handle, callback): | ||
""" | ||
Stop listening for graph updates for the given node and callback. | ||
|
||
:param node_handle: rclpy node handle | ||
:type node_handle: PyCapsule | ||
:param callback: called when the graph updates | ||
:type callback: callable | ||
""" | ||
with self._lock: | ||
gc_handle, gc_pointer = _rclpy.rclpy_get_graph_guard_condition(node_handle) | ||
if gc_pointer in self._callbacks: | ||
# Remove the callback | ||
self._callbacks[gc_pointer].remove(callback) | ||
|
||
if not self._callbacks[gc_pointer]: | ||
# last subscriber for this node, remove the node and rebuild the wait set | ||
del self._callbacks[gc_pointer] | ||
self._wait_set.remove_guard_condition(gc_pointer) | ||
_rclpy.rclpy_trigger_guard_condition(self._gc_handle) | ||
|
||
def _runner(self): | ||
while True: | ||
# Wait 1 second | ||
self._wait_set.wait(S_TO_NS) | ||
|
||
with self._lock: | ||
# Shutdown if necessary | ||
if not ok(): | ||
self.destroy() | ||
break | ||
|
||
# notify graph event subscribers | ||
if not self._thread: | ||
# Asked to shut down thread | ||
return | ||
ready_callbacks = [] | ||
# Guard conditions | ||
for gc_pointer, callback_list in self._callbacks.items(): | ||
if self._wait_set.is_ready(gc_pointer): | ||
for callback in callback_list: | ||
ready_callbacks.append(callback) | ||
# Timers | ||
for tmr, callback in self._timers.items(): | ||
if self._wait_set.is_ready(tmr.timer_pointer): | ||
ready_callbacks.append(callback) | ||
_rclpy.rclpy_call_timer(tmr.timer_handle) | ||
# Call callbacks | ||
for callback in ready_callbacks: | ||
try: | ||
callback() | ||
except Exception: | ||
rclpy.logging.logwarn(traceback.format_exc()) | ||
|
||
|
||
class GraphEventSubscription: | ||
"""Manage subscription to node graph updates.""" | ||
|
||
def __init__(self, node_handle, callback, timeout_ns=-1, timeout_callback=None): | ||
self._listener = GraphListenerSingleton() | ||
self._node_handle = node_handle | ||
self._callback = callback | ||
self._listener.add_callback(self._node_handle, self._callback) | ||
self._timeout_callback = timeout_callback | ||
self._timer = None | ||
if timeout_ns >= 0: | ||
self._timer = self._listener.add_timer(timeout_ns, self.on_timeout) | ||
|
||
def on_timeout(self): | ||
self._timeout_callback() | ||
self._unsubscribe() | ||
|
||
def _unsubscribe(self): | ||
if self._callback: | ||
self._listener.remove_callback(self._node_handle, self._callback) | ||
self._callback = None | ||
if self._timer: | ||
self._listener.remove_timer(self._timer) | ||
self._timeout_callback = None | ||
self._timer = None | ||
|
||
def __del__(self): | ||
self._unsubscribe() | ||
|
||
def __enter__(self): | ||
return self | ||
|
||
def __exit__(self, t, v, tb): | ||
self._unsubscribe() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this singleton being destroyed in case of e.g. shutting down rclpy? When is the thread being stopped?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In d10ee26 it checks if rclpy was shut down once per second.