Skip to content

Commit

Permalink
[core] registration-loopback-logic-fixed-for-service-registrations (e…
Browse files Browse the repository at this point in the history
…clipse-ecal#1667)

* registration sample processing (local/network mode, loopback not loopback) handle identically for pub/sub and client/server registration

Co-authored-by: Kerstin Keller <KerstinKeller@users.noreply.github.com>
  • Loading branch information
rex-schilasky and KerstinKeller authored Jul 22, 2024
1 parent 3441e62 commit 21535da
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 98 deletions.
204 changes: 108 additions & 96 deletions ecal/core/src/registration/ecal_registration_receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ namespace eCAL
{
if (!m_created) return false;

if (!AcceptRegistrationSample(sample_))
{
Logging::Log(log_level_debug1, "CRegistrationReceiver::ApplySample : Incoming sample discarded");
return false;
}

// forward all registration samples to outside "customer" (e.g. monitoring, descgate)
{
const std::lock_guard<std::mutex> lock(m_callback_custom_apply_sample_map_mtx);
Expand All @@ -139,17 +145,9 @@ namespace eCAL
}
}

std::string reg_sample;
if (m_callback_pub
|| m_callback_sub
|| m_callback_service
|| m_callback_client
|| m_callback_process
)
{
SerializeToBuffer(sample_, reg_sample);
}

// forward registration to defined gates
// and store user registration callback
RegistrationCallbackT reg_callback(nullptr);
switch (sample_.cmd_type)
{
case bct_none:
Expand All @@ -158,38 +156,43 @@ namespace eCAL
case bct_reg_process:
case bct_unreg_process:
// unregistration event not implemented currently
if (m_callback_process) m_callback_process(reg_sample.c_str(), static_cast<int>(reg_sample.size()));
reg_callback = m_callback_process;
break;
#if ECAL_CORE_SERVICE
case bct_reg_service:
if (g_clientgate() != nullptr) g_clientgate()->ApplyServiceRegistration(sample_);
if (m_callback_service) m_callback_service(reg_sample.c_str(), static_cast<int>(reg_sample.size()));
break;
case bct_unreg_service:
// current client implementation doesn't need that information
if (m_callback_service) m_callback_service(reg_sample.c_str(), static_cast<int>(reg_sample.size()));
ApplyServiceRegistration(sample_);
reg_callback = m_callback_service;
break;
#endif
case bct_reg_client:
case bct_unreg_client:
// current service implementation doesn't need that information
if (m_callback_client) m_callback_client(reg_sample.c_str(), static_cast<int>(reg_sample.size()));
// current client implementation doesn't need that information
reg_callback = m_callback_client;
break;
case bct_reg_subscriber:
case bct_unreg_subscriber:
ApplySubscriberRegistration(sample_);
if (m_callback_sub) m_callback_sub(reg_sample.c_str(), static_cast<int>(reg_sample.size()));
reg_callback = m_callback_sub;
break;
case bct_reg_publisher:
case bct_unreg_publisher:
ApplyPublisherRegistration(sample_);
if (m_callback_pub) m_callback_pub(reg_sample.c_str(), static_cast<int>(reg_sample.size()));
reg_callback = m_callback_pub;
break;
default:
Logging::Log(log_level_debug1, "CRegistrationReceiver::ApplySample : unknown sample type");
break;
}

// call user registration callback
if (reg_callback)
{
std::string reg_sample;
if (SerializeToBuffer(sample_, reg_sample))
{
reg_callback(reg_sample.c_str(), static_cast<int>(reg_sample.size()));
}
}

return true;
}

Expand Down Expand Up @@ -243,99 +246,89 @@ namespace eCAL
}
}

void CRegistrationReceiver::ApplySubscriberRegistration(const Registration::Sample& sample_)
void CRegistrationReceiver::ApplyServiceRegistration(const eCAL::Registration::Sample& sample_)
{
#if ECAL_CORE_PUBLISHER
if (g_pubgate() == nullptr) return;
#if ECAL_CORE_SERVICE
if (g_clientgate() == nullptr) return;

// process registrations from same host group
if (IsHostGroupMember(sample_))
switch (sample_.cmd_type)
{
// do not register local entities, only if loop back flag is set true
if (m_loopback || (sample_.topic.pid != Process::GetProcessID()))
{
switch (sample_.cmd_type)
{
case bct_reg_subscriber:
g_pubgate()->ApplySubRegistration(sample_);
break;
case bct_unreg_subscriber:
g_pubgate()->ApplySubUnregistration(sample_);
break;
default:
break;
}
}
// current service implementation processes registration information only (not the unregistration)
case bct_reg_service:
g_clientgate()->ApplyServiceRegistration(sample_);
break;
default:
break;
}
// process external registrations
else
#endif
}

void CRegistrationReceiver::ApplySubscriberRegistration(const Registration::Sample& sample_)
{
#if ECAL_CORE_PUBLISHER
if (g_pubgate() == nullptr) return;

switch (sample_.cmd_type)
{
if (m_network)
{
switch (sample_.cmd_type)
{
case bct_reg_subscriber:
g_pubgate()->ApplySubRegistration(sample_);
break;
case bct_unreg_subscriber:
g_pubgate()->ApplySubUnregistration(sample_);
break;
default:
break;
}
}
case bct_reg_subscriber:
g_pubgate()->ApplySubRegistration(sample_);
break;
case bct_unreg_subscriber:
g_pubgate()->ApplySubUnregistration(sample_);
break;
default:
break;
}
#endif
}

void CRegistrationReceiver::ApplyPublisherRegistration(const Registration::Sample& sample_)
{
#if ECAL_CORE_SUBSCRIBER
if (g_subgate() == nullptr) return;
if (g_subgate() == nullptr) return;

// process registrations from same host group
if (IsHostGroupMember(sample_))
{
// do not register local entities, only if loop back flag is set true
if (m_loopback || (sample_.topic.pid != Process::GetProcessID()))
{
switch (sample_.cmd_type)
{
case bct_reg_publisher:
g_subgate()->ApplyPubRegistration(sample_);
break;
case bct_unreg_publisher:
g_subgate()->ApplyPubUnregistration(sample_);
break;
default:
break;
}
}
}
// process external registrations
else
switch (sample_.cmd_type)
{
if (m_network)
{
switch (sample_.cmd_type)
{
case bct_reg_publisher:
g_subgate()->ApplyPubRegistration(sample_);
break;
case bct_unreg_publisher:
g_subgate()->ApplyPubUnregistration(sample_);
break;
default:
break;
}
}
case bct_reg_publisher:
g_subgate()->ApplyPubRegistration(sample_);
break;
case bct_unreg_publisher:
g_subgate()->ApplyPubUnregistration(sample_);
break;
default:
break;
}
#endif
}

bool CRegistrationReceiver::IsHostGroupMember(const Registration::Sample& sample_)
{
const std::string& sample_host_group_name = sample_.topic.hgname.empty() ? sample_.topic.hname : sample_.topic.hgname;
std::string host_group_name;
std::string host_name;
switch (sample_.cmd_type)
{
case bct_reg_publisher:
case bct_unreg_publisher:
case bct_reg_subscriber:
case bct_unreg_subscriber:
host_group_name = sample_.topic.hgname;
host_name = sample_.topic.hname;
break;
case bct_reg_service:
case bct_unreg_service:
//host_group_name = sample_.service.hgname; // TODO: we need to add hgname attribute to services
host_name = sample_.service.hname;
break;
case bct_reg_client:
case bct_unreg_client:
//host_group_name = sample_.client.hgname; // TODO: we need to add hgname attribute to clients
host_name = sample_.client.hname;
break;
default:
break;
}

const std::string& sample_host_group_name = host_group_name.empty() ? host_name : host_group_name;

if (sample_host_group_name.empty() || m_host_group_name.empty())
return false;
Expand All @@ -345,6 +338,25 @@ namespace eCAL
return true;
}

bool CRegistrationReceiver::AcceptRegistrationSample(const Registration::Sample& sample_)
{
// check if the sample is from the same host group
if (IsHostGroupMember(sample_))
{
// register if the sample is from another process
// or if loopback mode is enabled
return m_loopback || (sample_.topic.pid != Process::GetProcessID());
}
else
{
// if the sample is from an external host, register only if network mode is enabled
return m_network;
}

// do not process the registration
return false;
}

void CRegistrationReceiver::SetCustomApplySampleCallback(const std::string& customer_, const ApplySampleCallbackT& callback_)
{
const std::lock_guard<std::mutex> lock(m_callback_custom_apply_sample_map_mtx);
Expand Down
3 changes: 3 additions & 0 deletions ecal/core/src/registration/ecal_registration_receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ namespace eCAL
void RemCustomApplySampleCallback(const std::string& customer_);

protected:
void ApplyServiceRegistration(const eCAL::Registration::Sample& sample_);

void ApplySubscriberRegistration(const eCAL::Registration::Sample& sample_);
void ApplyPublisherRegistration(const eCAL::Registration::Sample& sample_);

bool IsHostGroupMember(const eCAL::Registration::Sample& sample_);
bool AcceptRegistrationSample(const Registration::Sample& sample_);

static std::atomic<bool> m_created;
bool m_network;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -71,6 +71,9 @@ TEST(core_cpp_clientserver_proto, ProtoCallback)
// initialize eCAL API
eCAL::Initialize(0, nullptr, "clientserver proto callback test");

// enable loop back communication in the same thread
eCAL::Util::EnableLoopback(true);

// create MathService server
std::shared_ptr<MathServiceImpl> math_service_impl = std::make_shared<MathServiceImpl>();
eCAL::protobuf::CServiceServer<MathService> math_server(math_service_impl);
Expand Down Expand Up @@ -160,6 +163,9 @@ TEST(core_cpp_clientserver_proto, ProtoBlocking)
// initialize eCAL API
eCAL::Initialize(0, nullptr, "clientserver proto blocking test");

// enable loop back communication in the same thread
eCAL::Util::EnableLoopback(true);

// create PingService server
std::shared_ptr<PingServiceImpl> ping_service_impl = std::make_shared<PingServiceImpl>();
eCAL::protobuf::CServiceServer<PingService> ping_server(ping_service_impl);
Expand Down
Loading

0 comments on commit 21535da

Please sign in to comment.