Skip to content

Commit

Permalink
Fixed executor yielding entities to wrong node
Browse files Browse the repository at this point in the history
Also refactored some code to a sub-generator
  • Loading branch information
sloretz committed Nov 1, 2017
1 parent dd86a13 commit 6352e86
Showing 1 changed file with 51 additions and 41 deletions.
92 changes: 51 additions & 41 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,56 @@ def _can_execute(self, entity):
"""
return entity.callback_group.can_execute(entity) and not entity._executor_event

def _new_callbacks(self, nodes, wait_set):
"""
Yields brand new work to executor implementations.
:param nodes: nodes to yield work for
:type nodes: list
:param wait_set: wait set that has already been waited on
:type wait_set: rclpy.wait_set.WaitSet
:rtype: Generator[(callable, entity, :class:`rclpy.node.Node`)]
"""
yielded_work = False
# Process ready entities one node at a time
for node in nodes:
for tmr in node.timers:
if wait_set.is_ready(tmr.timer_pointer) and tmr.callback_group.can_execute(tmr):
# TODO(Sloretz) Which rcl cancelled timer bug does this workaround?
if not _rclpy.rclpy_is_timer_ready(tmr.timer_handle):
continue
handler = self._make_handler(tmr, self._take_timer, self._execute_timer)
yielded_work = True
yield handler, tmr, node

for sub in node.subscriptions:
if (wait_set.is_ready(sub.subscription_pointer) and
sub.callback_group.can_execute(sub)):
handler = self._make_handler(
sub, self._take_subscription, self._execute_subscription)
yielded_work = True
yield handler, sub, node

for gc in node.guards:
if gc._executor_triggered and gc.callback_group.can_execute(gc):
handler = self._make_handler(
gc, self._take_guard_condition, self._execute_guard_condition)
yielded_work = True
yield handler, gc, node

for cli in node.clients:
if wait_set.is_ready(cli.client_pointer) and cli.callback_group.can_execute(cli):
handler = self._make_handler(cli, self._take_client, self._execute_client)
yielded_work = True
yield handler, cli, node

for srv in node.services:
if wait_set.is_ready(srv.service_pointer) and srv.callback_group.can_execute(srv):
handler = self._make_handler(srv, self._take_service, self._execute_service)
yielded_work = True
yield handler, srv, node
return yielded_work

def wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
"""
Yield callbacks that are ready to be performed.
Expand Down Expand Up @@ -320,47 +370,7 @@ def wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
for gc in [g for g in guards if wait_set.is_ready(g.guard_pointer)]:
gc._executor_triggered = True

# Process ready entities one node at a time
for node in nodes:
for tmr in [t for t in timers if wait_set.is_ready(t.timer_pointer)]:
# Check that a timer is ready to workaround rcl issue with cancelled timers
if _rclpy.rclpy_is_timer_ready(tmr.timer_handle):
if tmr == timeout_timer:
continue
elif tmr.callback_group.can_execute(tmr):
handler = self._make_handler(
tmr, self._take_timer, self._execute_timer)
yielded_work = True
yield handler, tmr, node

for sub in [s for s in subscriptions if wait_set.is_ready(
s.subscription_pointer)]:
if sub.callback_group.can_execute(sub):
handler = self._make_handler(
sub, self._take_subscription, self._execute_subscription)
yielded_work = True
yield handler, sub, node

for gc in [g for g in node.guards if g._executor_triggered]:
if gc.callback_group.can_execute(gc):
handler = self._make_handler(
gc, self._take_guard_condition, self._execute_guard_condition)
yielded_work = True
yield handler, gc, node

for client in [c for c in clients if wait_set.is_ready(c.client_pointer)]:
if client.callback_group.can_execute(client):
handler = self._make_handler(
client, self._take_client, self._execute_client)
yielded_work = True
yield handler, client, node

for srv in [s for s in services if wait_set.is_ready(s.service_pointer)]:
if srv.callback_group.can_execute(srv):
handler = self._make_handler(
srv, self._take_service, self._execute_service)
yielded_work = True
yield handler, srv, node
yielded_work = yield from self._new_callbacks(nodes, wait_set)

# Check timeout timer
if (
Expand Down

0 comments on commit 6352e86

Please sign in to comment.