kafka: upstream kafka facade in mesh-filter #17783
Merged
Changes from all commits (4 commits)
contrib/kafka/filters/network/source/mesh/upstream_kafka_client.h (70 additions, 0 deletions)
#pragma once

#include <map>
#include <memory>
#include <string>

#include "envoy/common/pure.h"

#include "absl/strings/string_view.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// Trivial memento that keeps the information about how a given request was delivered:
// in case of success this means an offset (if acks > 0), otherwise an error code.
struct DeliveryMemento {

  // Kafka producer error code.
  const int32_t error_code_;

  // Offset (only meaningful if the error code is equal to 0).
  const int64_t offset_;
};

// Callback for objects that want to be notified when record delivery has finished.
class ProduceFinishCb {
public:
  virtual ~ProduceFinishCb() = default;

  // Attempts to process this delivery.
  // @return true if the given delivery is related to this callback
  virtual bool accept(const DeliveryMemento& memento) PURE;
};

using ProduceFinishCbSharedPtr = std::shared_ptr<ProduceFinishCb>;

/**
 * Filter-facing interface.
 * A thing that takes records and sends them to upstream Kafka.
 */
class KafkaProducer {
public:
  virtual ~KafkaProducer() = default;

  // Sends the given record (key, value) to Kafka (topic, partition).
  // When delivery has finished, it notifies the provided callback with the corresponding
  // delivery data (error code, offset).
  virtual void send(const ProduceFinishCbSharedPtr origin, const std::string& topic,
                    const int32_t partition, const absl::string_view key,
                    const absl::string_view value) PURE;

  // Impl leakage: real implementations of the Kafka producer need to stop a monitoring thread
  // before they can close the producer. Because the polling thread should not be interrupted, we
  // just mark it as finished, and it is going to notice that change on its next iteration.
  // Theoretically we could leave all of this to the destructor, but then closing N producers
  // would happen in sequence, while marking them all as finished first lets the shutdowns
  // overlap (so we only wait for the slowest one).
  // See https://github.com/edenhill/librdkafka/issues/2972
  virtual void markFinished() PURE;
};

using KafkaProducerPtr = std::unique_ptr<KafkaProducer>;

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
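The accept() contract above can be sketched outside Envoy as plain C++: a callback implementation inspects a delivery memento and records the outcome. `InFlightRequest` and its members are hypothetical stand-ins for a filter-side request object; `PURE` and the Envoy/absl headers are replaced with standard C++.

```cpp
#include <cassert>
#include <cstdint>

// Mirrors the DeliveryMemento from the header above.
struct DeliveryMemento {
  const int32_t error_code_;
  const int64_t offset_;
};

// Mirrors ProduceFinishCb, with '= 0' standing in for PURE.
class ProduceFinishCb {
public:
  virtual ~ProduceFinishCb() = default;
  virtual bool accept(const DeliveryMemento& memento) = 0;
};

// Hypothetical filter-side request: remembers how its record was delivered.
class InFlightRequest : public ProduceFinishCb {
public:
  bool accept(const DeliveryMemento& memento) override {
    // A real filter would first check that the memento belongs to this
    // request; this sketch accepts every delivery it sees.
    error_code_ = memento.error_code_;
    offset_ = memento.offset_;
    finished_ = true;
    return true;
  }

  bool finished() const { return finished_; }
  int32_t errorCode() const { return error_code_; }
  int64_t offset() const { return offset_; }

private:
  bool finished_{false};
  int32_t error_code_{0};
  int64_t offset_{0};
};
```

A producer would iterate over registered callbacks and stop at the first one whose accept() returns true, which is why the return value signals "this delivery was mine".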
contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.cc (24 additions, 0 deletions)
#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// Just a placeholder implementation.

PlaceholderKafkaProducer::PlaceholderKafkaProducer(Event::Dispatcher&, Thread::ThreadFactory&,
                                                   const RawKafkaProducerConfig&) {}

void PlaceholderKafkaProducer::send(const ProduceFinishCbSharedPtr, const std::string&,
                                    const int32_t, const absl::string_view,
                                    const absl::string_view) {}

void PlaceholderKafkaProducer::markFinished() {}

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.h (36 additions, 0 deletions)
#pragma once

#include "envoy/event/dispatcher.h"
#include "envoy/thread/thread.h"

#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_client.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

using RawKafkaProducerConfig = std::map<std::string, std::string>;

// Placeholder for the proper Kafka producer object.
// It will also keep a reference to a dedicated thread (that is why we need a factory) that is
// going to be polling for delivery notifications.
class PlaceholderKafkaProducer : public KafkaProducer {
public:
  PlaceholderKafkaProducer(Event::Dispatcher& dispatcher, Thread::ThreadFactory& thread_factory,
                           const RawKafkaProducerConfig& configuration);

  // KafkaProducer
  void send(const ProduceFinishCbSharedPtr origin, const std::string& topic,
            const int32_t partition, const absl::string_view key,
            const absl::string_view value) override;

  // KafkaProducer
  void markFinished() override;
};

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.cc (100 additions, 0 deletions)
#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.h"

#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

/**
 * Responsible for keeping a map of upstream-facing Kafka clients.
 */
class ThreadLocalKafkaFacade : public ThreadLocal::ThreadLocalObject,
                               private Logger::Loggable<Logger::Id::kafka> {
public:
  ThreadLocalKafkaFacade(const UpstreamKafkaConfiguration& configuration,
                         Event::Dispatcher& dispatcher, Thread::ThreadFactory& thread_factory);
  ~ThreadLocalKafkaFacade() override;

  KafkaProducer& getProducerForTopic(const std::string& topic);

  size_t getProducerCountForTest() const;

private:
  // Mutates 'cluster_to_kafka_client_'.
  KafkaProducer& registerNewProducer(const ClusterConfig& cluster_config);

  const UpstreamKafkaConfiguration& configuration_;
  Event::Dispatcher& dispatcher_;
  Thread::ThreadFactory& thread_factory_;

  std::map<std::string, KafkaProducerPtr> cluster_to_kafka_client_;
};

ThreadLocalKafkaFacade::ThreadLocalKafkaFacade(const UpstreamKafkaConfiguration& configuration,
                                               Event::Dispatcher& dispatcher,
                                               Thread::ThreadFactory& thread_factory)
    : configuration_{configuration}, dispatcher_{dispatcher}, thread_factory_{thread_factory} {}

ThreadLocalKafkaFacade::~ThreadLocalKafkaFacade() {
  // Because the producers take a moment to shut down, we mark their monitoring threads as
  // finished before the destructors get called.
  for (auto& entry : cluster_to_kafka_client_) {
    entry.second->markFinished();
  }
}

KafkaProducer& ThreadLocalKafkaFacade::getProducerForTopic(const std::string& topic) {
  const absl::optional<ClusterConfig> cluster_config =
      configuration_.computeClusterConfigForTopic(topic);
  if (cluster_config) {
    const auto it = cluster_to_kafka_client_.find(cluster_config->name_);
    // Return the client already present, or create a new one and register it.
    return (cluster_to_kafka_client_.end() == it) ? registerNewProducer(*cluster_config)
                                                  : *(it->second);
  } else {
    throw EnvoyException(absl::StrCat("cannot compute target producer for topic: ", topic));
  }
}

KafkaProducer& ThreadLocalKafkaFacade::registerNewProducer(const ClusterConfig& cluster_config) {
  ENVOY_LOG(debug, "Registering new Kafka producer for cluster [{}]", cluster_config.name_);
  KafkaProducerPtr new_producer = std::make_unique<PlaceholderKafkaProducer>(
      dispatcher_, thread_factory_, cluster_config.upstream_producer_properties_);
  auto result = cluster_to_kafka_client_.emplace(cluster_config.name_, std::move(new_producer));
  return *(result.first->second);
}

size_t ThreadLocalKafkaFacade::getProducerCountForTest() const {
  return cluster_to_kafka_client_.size();
}

UpstreamKafkaFacadeImpl::UpstreamKafkaFacadeImpl(const UpstreamKafkaConfiguration& configuration,
                                                 ThreadLocal::SlotAllocator& slot_allocator,
                                                 Thread::ThreadFactory& thread_factory)
    : tls_{slot_allocator.allocateSlot()} {
  ThreadLocal::Slot::InitializeCb cb =
      [&configuration,
       &thread_factory](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr {
    return std::make_shared<ThreadLocalKafkaFacade>(configuration, dispatcher, thread_factory);
  };
  tls_->set(cb);
}

// Returns the producer instance that is local to the given thread, via ThreadLocalKafkaFacade.
KafkaProducer& UpstreamKafkaFacadeImpl::getProducerForTopic(const std::string& topic) {
  return tls_->getTyped<ThreadLocalKafkaFacade>().getProducerForTopic(topic);
}

size_t UpstreamKafkaFacadeImpl::getProducerCountForTest() const {
  return tls_->getTyped<ThreadLocalKafkaFacade>().getProducerCountForTest();
}

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
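The lookup-or-create pattern used by getProducerForTopic / registerNewProducer above can be reduced to a small standalone sketch. `ClientRegistry` is a hypothetical stand-in, with plain strings playing the role of Kafka producers; the key point is that repeated lookups for the same cluster return the same cached instance.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// Minimal sketch of a per-thread client registry: one entry per cluster,
// created lazily on first use.
class ClientRegistry {
public:
  // Returns the client for 'cluster', creating and caching it on first use.
  std::string& getClient(const std::string& cluster) {
    const auto it = clients_.find(cluster);
    if (it != clients_.end()) {
      return *(it->second);  // Client already present.
    }
    // Create a new client and register it; emplace does not invalidate
    // references to existing std::map elements.
    auto result = clients_.emplace(
        cluster, std::make_unique<std::string>("client-for-" + cluster));
    return *(result.first->second);
  }

  size_t size() const { return clients_.size(); }

private:
  std::map<std::string, std::unique_ptr<std::string>> clients_;
};
```

Because the real registry lives in a ThreadLocal slot, each Envoy worker thread gets its own map and no locking is needed on the lookup path.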
contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.h (58 additions, 0 deletions)
#pragma once

#include "envoy/thread/thread.h"
#include "envoy/thread_local/thread_local.h"

#include "source/common/common/logger.h"

#include "contrib/kafka/filters/network/source/mesh/upstream_config.h"
#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_client.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

/**
 * Provides access to upstream Kafka clients.
 */
class UpstreamKafkaFacade {
public:
  virtual ~UpstreamKafkaFacade() = default;

  /**
   * Returns a Kafka producer that points at the upstream Kafka cluster that is supposed to
   * receive messages for the given topic.
   */
  virtual KafkaProducer& getProducerForTopic(const std::string& topic) PURE;
};

using UpstreamKafkaFacadeSharedPtr = std::shared_ptr<UpstreamKafkaFacade>;

/**
 * Provides access to upstream Kafka clients.
 * This is done by using thread-local maps of cluster to producer.
 * We are going to have one Kafka producer per upstream cluster, per Envoy worker thread.
 */
class UpstreamKafkaFacadeImpl : public UpstreamKafkaFacade,
                                private Logger::Loggable<Logger::Id::kafka> {
public:
  UpstreamKafkaFacadeImpl(const UpstreamKafkaConfiguration& configuration,
                          ThreadLocal::SlotAllocator& slot_allocator,
                          Thread::ThreadFactory& thread_factory);

  // UpstreamKafkaFacade
  KafkaProducer& getProducerForTopic(const std::string& topic) override;

  size_t getProducerCountForTest() const;

private:
  ThreadLocal::SlotPtr tls_;
};

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
'Marking' as finished is just a simple volatile boolean.

In other words: invoking this markFinished() implementation lets us wait a shorter time when we want to destroy multiple objects with such a destructor, because their polling threads cannot be interrupted here.

And when are we closing multiple Kafka producers at the same time? When we shut down Envoy, and the whole facade gets destroyed together with its cluster -> producer maps.

TL;DR: it implies that facade destruction should take only about ${poll_duration} (I should make that 1000ms configurable), rather than the sum of the poll durations of all producers.
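The timing argument in this comment can be sketched with plain threads: mark every worker as finished first, then destroy them, so the joins overlap and the total wait is roughly one poll interval instead of N. `Worker` is a hypothetical stand-in; a real producer polls librdkafka with a timeout rather than sleeping.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <memory>
#include <thread>
#include <vector>

class Worker {
public:
  Worker() : thread_([this] { run(); }) {}

  ~Worker() {
    markFinished();  // Safe to call twice.
    thread_.join();
  }

  // Non-blocking: the polling loop notices the flag on its next iteration.
  void markFinished() { finished_ = true; }

private:
  void run() {
    while (!finished_) {
      // Stand-in for a blocking poll with a timeout.
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
  }

  std::atomic<bool> finished_{false};
  std::thread thread_;
};

// Mark all workers first, then destroy them: the sequential joins each return
// almost immediately, so total shutdown time is about one poll interval
// (the slowest worker), not the sum over all workers.
void shutdownAll(std::vector<std::unique_ptr<Worker>>& workers) {
  for (auto& w : workers) {
    w->markFinished();
  }
  workers.clear();  // Destructors join the already-finishing threads.
}
```

Without the up-front marking pass, each destructor would mark and then wait for its own thread in sequence, which is exactly the N-times-poll-duration cost the comment describes.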