Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve test.remote reliability #12573

Merged
merged 5 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions third-party/realdds/include/realdds/dds-topic-reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,18 @@ class dds_topic_reader : public eprosima::fastdds::dds::DataReaderListener
typedef std::function< void() > on_data_available_callback;
typedef std::function< void( eprosima::fastdds::dds::SubscriptionMatchedStatus const & ) >
on_subscription_matched_callback;
typedef std::function< void( eprosima::fastdds::dds::SampleLostStatus const & ) >
on_sample_lost_callback;

void on_data_available( on_data_available_callback callback ) { _on_data_available = std::move( callback ); }
void on_subscription_matched( on_subscription_matched_callback callback )
{
_on_subscription_matched = std::move( callback );
}
void on_sample_lost( on_sample_lost_callback callback )
{
_on_sample_lost = std::move( callback );
}

class qos : public eprosima::fastdds::dds::DataReaderQos
{
Expand Down Expand Up @@ -91,9 +97,12 @@ class dds_topic_reader : public eprosima::fastdds::dds::DataReaderListener

void on_data_available( eprosima::fastdds::dds::DataReader * ) override;

void on_sample_lost( eprosima::fastdds::dds::DataReader *, const eprosima::fastdds::dds::SampleLostStatus & ) override;

protected:
on_data_available_callback _on_data_available;
on_subscription_matched_callback _on_subscription_matched;
on_sample_lost_callback _on_sample_lost;
};


Expand Down
5 changes: 5 additions & 0 deletions third-party/realdds/py/pyrealdds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ PYBIND11_MODULE(NAME, m) {
(dds_topic_reader &, int),
( eprosima::fastdds::dds::SubscriptionMatchedStatus const & status ),
callback( self, status.current_count_change ); ) )
.def( FN_FWD( dds_topic_reader,
on_sample_lost,
(dds_topic_reader &, int, int),
(eprosima::fastdds::dds::SampleLostStatus const & status),
callback( self, status.total_count, status.total_count_change ); ) )
.def( "topic", &dds_topic_reader::topic )
.def( "run", &dds_topic_reader::run )
.def( "qos", []() { return reader_qos(); } )
Expand Down
32 changes: 21 additions & 11 deletions third-party/realdds/src/dds-topic-reader-thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
#include <realdds/dds-subscriber.h>
#include <realdds/dds-utilities.h>

#include <rsutils/time/stopwatch.h>

#include <fastdds/dds/subscriber/Subscriber.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/topic/Topic.hpp>
Expand Down Expand Up @@ -41,17 +39,16 @@ void dds_topic_reader_thread::run( qos const & rqos )
if( ! _on_data_available )
DDS_THROW( runtime_error, "on-data-available must be provided" );

eprosima::fastdds::dds::StatusMask status_mask;
status_mask << eprosima::fastdds::dds::StatusMask::subscription_matched();
//status_mask << eprosima::fastdds::dds::StatusMask::data_available();
_reader = DDS_API_CALL( _subscriber->get()->create_datareader( _topic->get(), rqos, this, status_mask ) );
_reader = DDS_API_CALL( _subscriber->get()->create_datareader( _topic->get(), rqos ) );

_th = std::thread(
[this, name = _topic->get()->get_name()]()
{
eprosima::fastdds::dds::WaitSet wait_set;
auto & condition = _reader->get_statuscondition();
condition.set_enabled_statuses( eprosima::fastdds::dds::StatusMask::data_available() );
condition.set_enabled_statuses( eprosima::fastdds::dds::StatusMask::data_available()
<< eprosima::fastdds::dds::StatusMask::subscription_matched()
<< eprosima::fastdds::dds::StatusMask::sample_lost() );
wait_set.attach_condition( condition );

wait_set.attach_condition( _stopped );
Expand All @@ -64,10 +61,23 @@ void dds_topic_reader_thread::run( qos const & rqos )
if( _stopped.get_trigger_value() )
break;

rsutils::time::stopwatch stopwatch;
_on_data_available();
if( stopwatch.get_elapsed() > std::chrono::milliseconds( 500 ) )
LOG_WARNING( "<---- '" << name << "' callback took too long!" );
auto & changed = _reader->get_status_changes();
if( changed.is_active( eprosima::fastdds::dds::StatusMask::sample_lost() ) )
{
eprosima::fastdds::dds::SampleLostStatus status;
_reader->get_sample_lost_status( status );
on_sample_lost( _reader, status );
}
if( changed.is_active( eprosima::fastdds::dds::StatusMask::data_available() ) )
{
on_data_available( _reader );
}
if( changed.is_active( eprosima::fastdds::dds::StatusMask::subscription_matched() ) )
{
eprosima::fastdds::dds::SubscriptionMatchedStatus status;
_reader->get_subscription_matched_status( status );
on_subscription_matched( _reader, status );
}
}
} );
}
Expand Down
22 changes: 21 additions & 1 deletion third-party/realdds/src/dds-topic-reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <realdds/dds-serialization.h>
#include <realdds/dds-utilities.h>

#include <rsutils/time/stopwatch.h>

#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/subscriber/Subscriber.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
Expand Down Expand Up @@ -106,6 +108,7 @@ void dds_topic_reader::run( qos const & rqos )
eprosima::fastdds::dds::StatusMask status_mask;
status_mask << eprosima::fastdds::dds::StatusMask::subscription_matched();
status_mask << eprosima::fastdds::dds::StatusMask::data_available();
status_mask << eprosima::fastdds::dds::StatusMask::sample_lost();
_reader = DDS_API_CALL( _subscriber->get()->create_datareader( _topic->get(), rqos, this, status_mask ) );
}

Expand Down Expand Up @@ -133,11 +136,28 @@ void dds_topic_reader::on_subscription_matched(
_on_subscription_matched( info );
}


void dds_topic_reader::on_data_available( eprosima::fastdds::dds::DataReader * )
{
// Called when a new Data Message is received
// Called when a new Data Message is received
if( _on_data_available )
{
rsutils::time::stopwatch stopwatch;
_on_data_available();
if( stopwatch.get_elapsed() > std::chrono::milliseconds( 500 ) )
LOG_WARNING( "<---- '" << _topic->get()->get_name() << "' callback took too long!" );
}
}


void dds_topic_reader::on_sample_lost( eprosima::fastdds::dds::DataReader *, const eprosima::fastdds::dds::SampleLostStatus & status )
{
// Called when a sample is lost: i.e., when a fragment is received that is a jump in sequence number
// If such a jump in sequence number (sample) isn't received then we never get here!
if( _on_sample_lost )
_on_sample_lost( status );
else
LOG_WARNING( "[" << _topic->get()->get_name() << "] " << status.total_count_change << " sample(s) lost" );
}


Expand Down
17 changes: 17 additions & 0 deletions unit-tests/dds/test-options.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,23 @@ def close_server():
device.set_option_value( option, 12. ) # updates server & client
test.check_equal( option.get_value(), 12. )

device = None
stream = None


#############################################################################################
with test.closure( "New device should get the new option value" ):
test.check( option is not None )
test.check( option.stream() is None ) # Because we removed the device & stream references
device = dds.device( participant, info )
device.wait_until_ready()
if test.check_equal( device.n_streams(), 1 ):
stream = device.streams()[0]
options = stream.options();
test.check_equal( len( options ), 3 )
option = options[1]
test.check_equal( option.get_value(), 12. ) # The new value - not the default

remote.run( 'close_server()' )
device = None

Expand Down
49 changes: 32 additions & 17 deletions unit-tests/py/rspy/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
messages in case of a failed check
"""

import os, sys, subprocess, traceback, platform, math
import os, sys, subprocess, threading, traceback, platform, math

from rspy import log

Expand Down Expand Up @@ -732,29 +732,37 @@ def status( self ):
def on_finish( self, callback ):
self._on_finish = callback

def _output_ready( self ):
log.d( self._name, self._exception and 'raised an error' or 'is ready' )
if self._events:
event = self._events.pop(0)
if event:
event.set()
else:
# We raise the error here only as a last resort: we prefer handing it over to
# the waiting thread on the event!
self._raise_if_needed()

def _output_reader( self ):
"""
This is the worker function called from a thread to output the process stdout.
It is in danger of HANGING UP our own process because readline blocks forever! To avoid this
please follow the usage guidelines in the class notes.
"""
nested_prefix = self._nested_indent and f'[{self._nested_indent}] ' or ''
if not nested_prefix:
x = -1
missing_ready = None
for line in iter( self._process.stdout.readline, '' ):
# NOTE: line will include the terminating \n EOL
# NOTE: so readline will return '' (with no EOL) when EOF is reached - the "sentinel"
# 2nd argument to iter() - and we'll break out of the loop
if line == '___ready\n':
log.d( self._name, self._exception and 'raised an error' or 'is ready' )
if self._events:
event = self._events.pop(0)
if event:
event.set()
else:
# We raise the error here only as a last resort: we prefer handing it over to
# the waiting thread on the event!
self._raise_if_needed()
self._output_ready()
continue
if not nested_prefix or line.find( nested_prefix ) < 0: # there could be color codes in the line
if nested_prefix:
x = line.find( nested_prefix ) # there could be color codes in the line
if x < 0:
if self._exception:
self._exception.append( line[:-1] )
# We cannot raise an error here -- it'll just exit the thread and not be
Expand All @@ -764,8 +772,16 @@ def _output_reader( self ):
elif line.startswith( ' File "' ):
# Some exception are syntax errors in the command, which would not have a 'Traceback'...
self._exception = [line[:-1]]
if missing_ready and line.endswith( missing_ready ):
self._output_ready()
line = line[:-len(missing_ready)]
missing_ready = None
if not line:
continue
print( nested_prefix + line, end='', flush=True )
else:
if x > 0 and line[:x] == '___ready\n'[:x]:
missing_ready = '___ready\n'[x:]
print( line, end='', flush=True )
#
log.d( self._name, 'stdout is finished' )
Expand All @@ -777,13 +793,11 @@ def start( self ):
"""
Start the process
"""
import subprocess, threading
log.d( self._name, 'starting:', self._cmd )
self._process = subprocess.Popen( self._cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True )
self._thread = threading.Thread( target = remote._output_reader, args=(self,) )
#
# We allow waiting until the script is ready for input: see wait_until_ready()
import threading
self._ready = threading.Event()
self._events = [ self._ready ]
#
Expand Down Expand Up @@ -850,10 +864,11 @@ def wait( self, timeout=10 ):
if self._interactive:
self._process.stdin.write( 'exit()\n' ) # make sure we respond to it to avoid timeouts
self._process.stdin.flush()
log.d( 'waiting for', self._name, 'to finish...' )
self._thread.join( timeout )
if self._thread.is_alive():
log.d( self._name, 'waiting for thread join timed out after', timeout, 'seconds' )
if self._thread != threading.current_thread():
log.d( 'waiting for', self._name, 'to finish...' )
self._thread.join( timeout )
if self._thread.is_alive():
log.d( self._name, 'waiting for thread join timed out after', timeout, 'seconds' )
self._terminate()
self._raise_if_needed()
return self.status()
Expand Down
Loading