Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signal in rsutils #12228

Merged
merged 13 commits into from
Oct 1, 2023
2 changes: 2 additions & 0 deletions src/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "core/extension.h"
#include "device.h"
#include <rsutils/string/from.h>
#include <rsutils/subscription.h>

#include <type_traits>
#include <iostream>
Expand All @@ -29,6 +30,7 @@ struct rs2_notification
struct rs2_device
{
std::shared_ptr<librealsense::device_interface> device;
mutable rsutils::subscription playback_status_changed;
};

rs2_error * rs2_create_error(const char* what, const char* name, const char* args, rs2_exception_type type);
Expand Down
24 changes: 12 additions & 12 deletions src/media/playback/playback_device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ std::map<uint32_t, std::shared_ptr<playback_sensor>> playback_device::create_pla
//Each sensor will know its capabilities from the sensor_snapshot
auto sensor = std::make_shared<playback_sensor>(*this, sensor_snapshot);

sensor->started += [this](uint32_t id, frame_callback_ptr user_callback) -> void
sensor->on_started( [this](uint32_t id, frame_callback_ptr user_callback) -> void
{
(*m_read_thread)->invoke([this, id, user_callback](dispatcher::cancellable_timer c)
{
Expand All @@ -87,9 +87,9 @@ std::map<uint32_t, std::shared_ptr<playback_sensor>> playback_device::create_pla
}
}
});
};
} );

sensor->stopped += [this](uint32_t id, bool invoke_required) -> void
sensor->on_stopped( [this](uint32_t id, bool invoke_required) -> void
{
//stopped could be called by the user (when calling sensor.stop(), main thread==invoke required) or from the reader_thread when
// reaching eof, or some read error (which means invoke is not required)
Expand Down Expand Up @@ -127,23 +127,23 @@ std::map<uint32_t, std::shared_ptr<playback_sensor>> playback_device::create_pla
{
action();
}
};
} );

sensor->opened += [this](const std::vector<device_serializer::stream_identifier>& filters) -> void
sensor->on_opened( [this](const std::vector<device_serializer::stream_identifier>& filters) -> void
{
(*m_read_thread)->invoke([this, filters](dispatcher::cancellable_timer c)
{
m_reader->enable_stream(filters);
});
};
} );

sensor->closed += [this](const std::vector<device_serializer::stream_identifier>& filters) -> void
sensor->on_closed( [this](const std::vector<device_serializer::stream_identifier>& filters) -> void
{
(*m_read_thread)->invoke([this, filters](dispatcher::cancellable_timer c)
{
m_reader->disable_stream(filters);
});
};
} );

sensors[sensor_snapshot.get_sensor_index()] = sensor;
}
Expand Down Expand Up @@ -357,7 +357,7 @@ void playback_device::pause()
}
}
LOG_DEBUG("Notifying RS2_PLAYBACK_STATUS_PAUSED");
playback_status_changed(RS2_PLAYBACK_STATUS_PAUSED);
playback_status_changed.raise( RS2_PLAYBACK_STATUS_PAUSED );
});
if ((*m_read_thread)->flush() == false)
{
Expand Down Expand Up @@ -513,7 +513,7 @@ void playback_device::stop_internal()
m_reader->reset();
m_prev_timestamp = std::chrono::nanoseconds(0);
catch_up();
playback_status_changed(RS2_PLAYBACK_STATUS_STOPPED);
playback_status_changed.raise( RS2_PLAYBACK_STATUS_STOPPED );
LOG_DEBUG("stop_internal() end");
}

Expand Down Expand Up @@ -589,11 +589,11 @@ void playback_device::try_looping()
//Notify subscribers that playback status changed
if (m_is_paused)
{
playback_status_changed(RS2_PLAYBACK_STATUS_PAUSED);
playback_status_changed.raise( RS2_PLAYBACK_STATUS_PAUSED );
}
else
{
playback_status_changed(RS2_PLAYBACK_STATUS_PLAYING);
playback_status_changed.raise( RS2_PLAYBACK_STATUS_PLAYING );
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/media/playback/playback_device.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
#include "../../archive.h"
#include "../../sensor.h"
#include "playback_sensor.h"

#include <rsutils/lazy.h>
#include <rsutils/signal.h>


namespace librealsense
Expand Down Expand Up @@ -45,7 +47,7 @@ namespace librealsense
bool is_real_time() const;
const std::string& get_file_name() const;
uint64_t get_position() const;
signal<playback_device, rs2_playback_status> playback_status_changed;
rsutils::public_signal< playback_device, rs2_playback_status > playback_status_changed;
std::shared_ptr< const device_info > get_device_info() const override;
std::pair<uint32_t, rs2_extrinsics> get_extrinsics(const stream_interface& stream) const override;
static bool try_extend_snapshot(std::shared_ptr<extension_snapshot>& e, rs2_extension extension_type, void** ext);
Expand Down
14 changes: 8 additions & 6 deletions src/media/playback/playback_sensor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ void playback_sensor::open(const stream_profiles& requests)
opened_streams.push_back(f);
}
set_active_streams(requests);
opened(opened_streams);
if( _on_opened )
_on_opened( opened_streams );
}

void playback_sensor::close()
Expand All @@ -125,7 +126,8 @@ void playback_sensor::close()
}
m_dispatchers.clear();
set_active_streams({});
closed(closed_streams);
if( _on_closed )
_on_closed( closed_streams );
}

void playback_sensor::register_notifications_callback(notifications_callback_ptr callback)
Expand Down Expand Up @@ -153,8 +155,8 @@ void playback_sensor::start(frame_callback_ptr callback)
m_user_callback = callback;
}
}
if(was_started)
started(m_sensor_id, callback);
if( was_started && _on_started )
_on_started( m_sensor_id, callback );
}

void playback_sensor::stop(bool invoke_required)
Expand All @@ -176,8 +178,8 @@ void playback_sensor::stop(bool invoke_required)
}
}

if(was_stopped)
stopped(m_sensor_id, invoke_required);
if( was_stopped && _on_stopped )
_on_stopped( m_sensor_id, invoke_required );

}
void playback_sensor::stop()
Expand Down
21 changes: 17 additions & 4 deletions src/media/playback/playback_sensor.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#include "../../sensor.h"
#include "../../types.h"

#include <rsutils/signal.h>


namespace librealsense
{
class playback_sensor : public sensor_interface,
Expand All @@ -18,16 +21,26 @@ namespace librealsense
public options_container,
public std::enable_shared_from_this<playback_sensor>
{
std::function< void( uint32_t, frame_callback_ptr ) > _on_started;
std::function< void( uint32_t, bool ) > _on_stopped;
std::function< void( const std::vector< device_serializer::stream_identifier > & ) > _on_opened;
std::function< void( const std::vector< device_serializer::stream_identifier > & ) > _on_closed;

public:
using frame_interface_callback_t = std::function<void(frame_holder)>;
signal<playback_sensor, uint32_t, frame_callback_ptr> started;
signal<playback_sensor, uint32_t, bool> stopped;
signal<playback_sensor, const std::vector<device_serializer::stream_identifier>& > opened;
signal<playback_sensor, const std::vector<device_serializer::stream_identifier>& > closed;

playback_sensor(device_interface& parent_device, const device_serializer::sensor_snapshot& sensor_description);
virtual ~playback_sensor();

void on_started( std::function< void( uint32_t id, frame_callback_ptr user_callback ) > && callback )
{ _on_started = std::move( callback ); }
void on_stopped( std::function< void( uint32_t id, bool invoke_required ) > && callback )
{ _on_stopped = std::move( callback ); }
void on_opened( std::function< void( const std::vector< device_serializer::stream_identifier > & ) > && callback )
{ _on_opened = std::move( callback ); }
void on_closed( std::function< void( const std::vector< device_serializer::stream_identifier > & ) > && callback )
{ _on_closed = std::move( callback ); }

stream_profiles get_stream_profiles(int tag = profile_tag::PROFILE_TAG_ANY) const override;
void open(const stream_profiles& requests) override;
void close() override;
Expand Down
17 changes: 9 additions & 8 deletions src/media/record/record_device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ std::vector<std::shared_ptr<librealsense::record_sensor>> librealsense::record_d
{
auto& live_sensor = device->get_sensor(sensor_index);
auto recording_sensor = std::make_shared<librealsense::record_sensor>(*this, live_sensor);
m_on_notification_token = recording_sensor->on_notification += [this, recording_sensor, sensor_index](const notification& n) { write_notification(sensor_index, n); };
recording_sensor->on_notification(
[this, recording_sensor, sensor_index]( const notification & n )
{ write_notification( sensor_index, n ); } );
auto on_error = [recording_sensor](const std::string& s) {recording_sensor->stop_with_error(s); };
m_on_frame_token = recording_sensor->on_frame += [this, recording_sensor, sensor_index, on_error](frame_holder f) {
write_data(sensor_index, std::move(f), on_error);
};
m_on_extension_change_token = recording_sensor->on_extension_change += [this, recording_sensor, sensor_index, on_error](rs2_extension ext, std::shared_ptr<extension_snapshot> snapshot) { write_sensor_extension_snapshot(sensor_index, ext, snapshot, on_error); };
recording_sensor->on_frame( [this, recording_sensor, sensor_index, on_error]( frame_holder f )
{ write_data( sensor_index, std::move( f ), on_error ); } );
recording_sensor->on_extension_change(
[this, recording_sensor, sensor_index, on_error]( rs2_extension ext,
std::shared_ptr< extension_snapshot > snapshot )
{ write_sensor_extension_snapshot( sensor_index, ext, snapshot, on_error ); } );
recording_sensor->init(); //Calling init AFTER register to the above events
record_sensors.emplace_back(recording_sensor);
}
Expand All @@ -55,9 +59,6 @@ librealsense::record_device::~record_device()
{
for (auto&& s : m_sensors)
{
s->on_notification -= m_on_notification_token;
s->on_frame -= m_on_frame_token;
s->on_extension_change -= m_on_extension_change_token;
s->disable_recording();
}
if ((*m_write_thread)->flush() == false)
Expand Down
3 changes: 0 additions & 3 deletions src/media/record/record_device.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ namespace librealsense
std::mutex m_mutex;
bool m_is_recording;
std::once_flag m_first_frame_flag;
int m_on_notification_token;
int m_on_frame_token;
int m_on_extension_change_token;
uint64_t m_cached_data_size;
std::once_flag m_first_call_flag;
void initialize_recording();
Expand Down
11 changes: 6 additions & 5 deletions src/media/record/record_sensor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ void librealsense::record_sensor::register_notifications_callback(notifications_
{
if (m_is_recording)
{
on_notification(*(n->_notification));
_on_notification( *( n->_notification ) );
}
if (m_user_notification_callback)
{
Expand Down Expand Up @@ -219,7 +219,7 @@ void librealsense::record_sensor::record_snapshot(rs2_extension extension_type,
if(m_is_recording)
{
//Send to recording thread
on_extension_change(extension_type, ext_snapshot);
_on_extension_change( extension_type, ext_snapshot );
}
}

Expand Down Expand Up @@ -250,7 +250,7 @@ void record_sensor::record_frame(frame_holder frame)
if(m_is_recording)
{
//Send to recording thread
on_frame(std::move(frame));
_on_frame( std::move( frame ) );
}
}

Expand Down Expand Up @@ -373,9 +373,10 @@ void record_sensor::wrap_streams()
else
throw std::runtime_error("Unsupported stream");

on_extension_change(extension_type, std::dynamic_pointer_cast<extension_snapshot>(snapshot));
if( m_is_recording )
_on_extension_change( extension_type, std::dynamic_pointer_cast< extension_snapshot >( snapshot ) );

m_recorded_streams_ids.insert(id);
m_recorded_streams_ids.insert(id);
}
}
}
16 changes: 13 additions & 3 deletions src/media/record/record_sensor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ namespace librealsense
record_sensor(device_interface& device,
sensor_interface& sensor);
virtual ~record_sensor();

void on_notification( std::function< void( const notification & ) > && callback )
{ _on_notification = std::move( callback ); }
void on_frame( std::function< void( frame_holder ) > && callback )
{ _on_frame = std::move( callback ); }
void on_extension_change( std::function< void( rs2_extension, std::shared_ptr< extension_snapshot > ) > && callback )
{ _on_extension_change = std::move( callback ); }

void init();
stream_profiles get_stream_profiles(int tag = profile_tag::PROFILE_TAG_ANY) const override;
void open(const stream_profiles& requests) override;
Expand All @@ -45,14 +53,15 @@ namespace librealsense
stream_profiles const & get_raw_stream_profiles() const override;
int register_before_streaming_changes_callback(std::function<void(bool)> callback) override;
void unregister_before_start_callback(int token) override;
signal<record_sensor, const notification&> on_notification;
signal<record_sensor, frame_holder> on_frame;
signal<record_sensor, rs2_extension, std::shared_ptr<extension_snapshot>> on_extension_change;
void stop_with_error(const std::string& message);
void disable_recording();
virtual processing_blocks get_recommended_processing_blocks() const override;

private /*methods*/:
std::function< void( const notification & ) > _on_notification;
std::function< void( frame_holder ) > _on_frame;
std::function< void( rs2_extension, std::shared_ptr< extension_snapshot > ) > _on_extension_change;

template <typename T> void record_snapshot(rs2_extension extension_type, const librealsense::recordable<T>& snapshot);
void record_frame(frame_holder holder);
void enable_sensor_hooks();
Expand All @@ -65,6 +74,7 @@ namespace librealsense
void wrap_streams();

private /*members*/:

sensor_interface& m_sensor;
std::set<int> m_recorded_streams_ids;
std::set<rs2_option> m_recording_options;
Expand Down
6 changes: 3 additions & 3 deletions src/pipeline/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ namespace librealsense
auto dev = profile->get_device();
if (auto playback = As<librealsense::playback_device>(dev))
{
_playback_stopped_token = playback->playback_status_changed += [this, callbacks](rs2_playback_status status)
_playback_stopped_token = playback->playback_status_changed.subscribe( [this, callbacks](rs2_playback_status status)
{
if (status == RS2_PLAYBACK_STATUS_STOPPED)
{
Expand All @@ -111,7 +111,7 @@ namespace librealsense
}
});
}
};
} );
}

_dispatcher.start();
Expand Down Expand Up @@ -142,7 +142,7 @@ namespace librealsense
auto dev = _active_profile->get_device();
if (auto playback = As<librealsense::playback_device>(dev))
{
playback->playback_status_changed -= _playback_stopped_token;
_playback_stopped_token.cancel();
}
_active_profile->_multistream.stop();
_active_profile->_multistream.close();
Expand Down
2 changes: 1 addition & 1 deletion src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ namespace librealsense
std::shared_ptr<profile> unsafe_get_active_profile() const;

std::shared_ptr<librealsense::context> _ctx;
int _playback_stopped_token = -1;
rsutils::subscription _playback_stopped_token;
dispatcher _dispatcher;

std::unique_ptr<syncer_process_unit> _syncer;
Expand Down
Loading