Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: upstream kafka facade in mesh-filter #17783

Merged
merged 4 commits into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions contrib/kafka/filters/network/source/mesh/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,52 @@ envoy_cc_library(
deps = [
"//contrib/kafka/filters/network/source:kafka_response_lib",
"//contrib/kafka/filters/network/source:tagged_fields_lib",
],
)

# Thread-local facade that maps upstream clusters to Kafka producer clients.
envoy_cc_library(
    name = "upstream_kafka_facade_lib",
    srcs = [
        "upstream_kafka_facade.cc",
    ],
    hdrs = [
        "upstream_kafka_facade.h",
    ],
    tags = ["skip_on_windows"],
    deps = [
        ":upstream_config_lib",
        ":upstream_kafka_client_impl_lib",
        ":upstream_kafka_client_lib",
        "//envoy/thread:thread_interface",
        "//envoy/thread_local:thread_local_interface",
        "//source/common/common:minimal_logger_lib",
    ],
)

# Pure interface for upstream Kafka producers (header-only: no srcs, no deps).
envoy_cc_library(
    name = "upstream_kafka_client_lib",
    hdrs = [
        "upstream_kafka_client.h",
    ],
    tags = ["skip_on_windows"],
)

# Placeholder implementation of the upstream Kafka producer interface.
envoy_cc_library(
    name = "upstream_kafka_client_impl_lib",
    srcs = [
        "upstream_kafka_client_impl.cc",
    ],
    hdrs = [
        "upstream_kafka_client_impl.h",
    ],
    tags = ["skip_on_windows"],
    deps = [
        ":upstream_kafka_client_lib",
        "//envoy/event:dispatcher_interface",
        "//source/common/common:minimal_logger_lib",
    ],
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#pragma once

#include <map>
#include <memory>

#include "envoy/common/pure.h"

#include "absl/strings/string_view.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// Trivial memento that keeps the information about how given request was delivered:
// in case of success this means offset (if acks > 0), or error code.
struct DeliveryMemento {

  // Kafka producer error code (0 = success; presumably matches the client library's
  // error-code space - confirm against the concrete producer implementation).
  const int32_t error_code_;

  // Offset (only meaningful if error code is equal to 0).
  const int64_t offset_;
};

// Callback for objects that want to be notified that record delivery has been finished.
class ProduceFinishCb {
public:
  virtual ~ProduceFinishCb() = default;

  // Attempt to process this delivery.
  // @returns true if given callback is related to this delivery
  //          (allows a dispatcher to offer the memento to multiple callbacks and stop at the
  //          one that accepts it - confirm against caller).
  virtual bool accept(const DeliveryMemento& memento) PURE;
};

using ProduceFinishCbSharedPtr = std::shared_ptr<ProduceFinishCb>;

/**
* Filter facing interface.
* A thing that takes records and sends them to upstream Kafka.
*/
class KafkaProducer {
public:
virtual ~KafkaProducer() = default;

// Sends given record (key, value) to Kafka (topic, partition).
// When delivery is finished, it notifies the callback provided with corresponding delivery data
// (error code, offset).
virtual void send(const ProduceFinishCbSharedPtr origin, const std::string& topic,
const int32_t partition, const absl::string_view key,
const absl::string_view value) PURE;

// Impl leakage: real implementations of Kafka Producer need to stop a monitoring thread, then
// they can close the producer. Because the polling thread should not be interrupted, we just mark
// it as finished, and it's going to notice that change on the next iteration.
Copy link
Contributor Author

@adamkotwasinski adamkotwasinski Aug 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'marking' as finished is just a simple volatile boolean

in other words:
invoking this markFinished() implementation allows us to wait a shorter time when we want to destroy multiple objects with such destructors, because their threads cannot be interrupted here

and when are we closing multiple Kafka producers at the same time? when we shut down envoy, and the whole facade gets destroyed with the cluster -> producer maps
so TLDR it implies that facade destruction should take only about ${poll_duration} (I should make that 1000ms configurable)

// Theoretically we do not need to do this and leave it all to destructor, but then closing N
// producers would require doing that in sequence, while we can optimize it somewhat (so we just
// wait for the slowest one).
// See https://github.com/edenhill/librdkafka/issues/2972
virtual void markFinished() PURE;
};

using KafkaProducerPtr = std::unique_ptr<KafkaProducer>;

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// Just a placeholder implementation.
// All constructor arguments are ignored for now; a real implementation would spawn a
// monitoring thread via the factory and configure the client with the given properties.
// Note: no stray ';' after function bodies - '{};' creates empty declarations (-Wextra-semi).

PlaceholderKafkaProducer::PlaceholderKafkaProducer(Event::Dispatcher&, Thread::ThreadFactory&,
                                                   const RawKafkaProducerConfig&) {}

// No-op: records are accepted and silently dropped; the callback is never invoked.
void PlaceholderKafkaProducer::send(const ProduceFinishCbSharedPtr, const std::string&,
                                    const int32_t, const absl::string_view,
                                    const absl::string_view) {}

// No-op: there is no monitoring thread to mark as finished yet.
void PlaceholderKafkaProducer::markFinished() {}

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include "envoy/event/dispatcher.h"

#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_client.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

using RawKafkaProducerConfig = std::map<std::string, std::string>;

// Placeholder for proper Kafka Producer object.
// It will also keep a reference to a dedicated thread (that's why we need a factory) that's going
// to be polling for delivery notifications.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

class PlaceholderKafkaProducer : public KafkaProducer {
public:
PlaceholderKafkaProducer(Event::Dispatcher& dispatcher, Thread::ThreadFactory& thread_factory,
const RawKafkaProducerConfig& configuration);

// KafkaProducer
void send(const ProduceFinishCbSharedPtr origin, const std::string& topic,
const int32_t partition, const absl::string_view key,
const absl::string_view value) override;

// KafkaProducer
void markFinished() override;
};

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
100 changes: 100 additions & 0 deletions contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.h"

#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

/**
 * Responsible for keeping a map of upstream-facing Kafka clients.
 * Registered as a ThreadLocal::ThreadLocalObject, so one instance exists per worker thread and
 * the map below is only touched by its owning thread (no locking needed - confirm usage).
 */
class ThreadLocalKafkaFacade : public ThreadLocal::ThreadLocalObject,
                               private Logger::Loggable<Logger::Id::kafka> {
public:
  ThreadLocalKafkaFacade(const UpstreamKafkaConfiguration& configuration,
                         Event::Dispatcher& dispatcher, Thread::ThreadFactory& thread_factory);
  ~ThreadLocalKafkaFacade() override;

  // Returns the producer for the cluster that serves 'topic', creating and caching it on
  // first use. Throws EnvoyException when no cluster maps to the topic.
  KafkaProducer& getProducerForTopic(const std::string& topic);

  // Test-only: number of per-cluster producers created so far on this thread.
  size_t getProducerCountForTest() const;

private:
  // Mutates 'cluster_to_kafka_client_'.
  KafkaProducer& registerNewProducer(const ClusterConfig& cluster_config);

  const UpstreamKafkaConfiguration& configuration_;
  Event::Dispatcher& dispatcher_;
  Thread::ThreadFactory& thread_factory_;

  // Cluster name -> producer pointing at that cluster.
  std::map<std::string, KafkaProducerPtr> cluster_to_kafka_client_;
};

// Stores references to the shared configuration and to the per-thread dispatcher / thread
// factory that future producers will be constructed with; all must outlive this object.
ThreadLocalKafkaFacade::ThreadLocalKafkaFacade(const UpstreamKafkaConfiguration& configuration,
                                               Event::Dispatcher& dispatcher,
                                               Thread::ThreadFactory& thread_factory)
    : configuration_(configuration), dispatcher_(dispatcher), thread_factory_(thread_factory) {}

ThreadLocalKafkaFacade::~ThreadLocalKafkaFacade() {
  // Producers take a moment to shut down: first tell every monitoring thread to stop, THEN let
  // the member destructors run. This way the sequential destructor waits overlap, so total
  // teardown time is bounded by the slowest producer instead of the sum of all of them.
  for (auto& [cluster_name, producer] : cluster_to_kafka_client_) {
    producer->markFinished();
  }
}

// Resolves 'topic' to its upstream cluster and hands out the (possibly newly created) producer
// for that cluster. Throws when the configuration knows no cluster for the topic.
KafkaProducer& ThreadLocalKafkaFacade::getProducerForTopic(const std::string& topic) {
  const absl::optional<ClusterConfig> cluster_config =
      configuration_.computeClusterConfigForTopic(topic);
  if (!cluster_config) {
    throw EnvoyException(absl::StrCat("cannot compute target producer for topic: ", topic));
  }
  const auto it = cluster_to_kafka_client_.find(cluster_config->name_);
  if (it != cluster_to_kafka_client_.end()) {
    // A client for this cluster has already been registered.
    return *(it->second);
  }
  // First record for this cluster - create and register a new client.
  return registerNewProducer(*cluster_config);
}

// Creates a producer for the given cluster, stores it in the thread-local map, and returns it.
KafkaProducer& ThreadLocalKafkaFacade::registerNewProducer(const ClusterConfig& cluster_config) {
  ENVOY_LOG(debug, "Registering new Kafka producer for cluster [{}]", cluster_config.name_);
  auto insertion_result = cluster_to_kafka_client_.emplace(
      cluster_config.name_,
      std::make_unique<PlaceholderKafkaProducer>(dispatcher_, thread_factory_,
                                                 cluster_config.upstream_producer_properties_));
  return *(insertion_result.first->second);
}

// Test-only visibility into how many per-cluster clients this thread has created.
size_t ThreadLocalKafkaFacade::getProducerCountForTest() const {
  return cluster_to_kafka_client_.size();
}

// Allocates a thread-local slot and installs a ThreadLocalKafkaFacade on every worker thread.
// The lambda captures 'configuration' and 'thread_factory' by reference, so both must outlive
// this facade.
UpstreamKafkaFacadeImpl::UpstreamKafkaFacadeImpl(const UpstreamKafkaConfiguration& configuration,
                                                 ThreadLocal::SlotAllocator& slot_allocator,
                                                 Thread::ThreadFactory& thread_factory)
    : tls_{slot_allocator.allocateSlot()} {
  tls_->set([&configuration, &thread_factory](
                Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr {
    return std::make_shared<ThreadLocalKafkaFacade>(configuration, dispatcher, thread_factory);
  });
}

// Return Producer instance that is local to given thread, via ThreadLocalKafkaFacade.
// Propagates the facade's EnvoyException when no cluster serves the topic.
KafkaProducer& UpstreamKafkaFacadeImpl::getProducerForTopic(const std::string& topic) {
  return tls_->getTyped<ThreadLocalKafkaFacade>().getProducerForTopic(topic);
}

// Test-only: number of producers created on the calling worker thread's local facade.
size_t UpstreamKafkaFacadeImpl::getProducerCountForTest() const {
  return tls_->getTyped<ThreadLocalKafkaFacade>().getProducerCountForTest();
}

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#pragma once

#include "envoy/thread/thread.h"
#include "envoy/thread_local/thread_local.h"

#include "source/common/common/logger.h"

#include "contrib/kafka/filters/network/source/mesh/upstream_config.h"
#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_client.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

/**
 * Provides access to upstream Kafka clients.
 */
class UpstreamKafkaFacade {
public:
  virtual ~UpstreamKafkaFacade() = default;

  /**
   * Returns a Kafka producer that points at an upstream Kafka cluster that is supposed to
   * receive messages for the given topic.
   */
  virtual KafkaProducer& getProducerForTopic(const std::string& topic) PURE;
};

using UpstreamKafkaFacadeSharedPtr = std::shared_ptr<UpstreamKafkaFacade>;

/**
 * Provides access to upstream Kafka clients.
 * This is done by using thread-local maps of cluster to producer.
 * We are going to have one Kafka producer per upstream cluster, per Envoy worker thread.
 */
class UpstreamKafkaFacadeImpl : public UpstreamKafkaFacade,
                                private Logger::Loggable<Logger::Id::kafka> {
public:
  UpstreamKafkaFacadeImpl(const UpstreamKafkaConfiguration& configuration,
                          ThreadLocal::SlotAllocator& slot_allocator,
                          Thread::ThreadFactory& thread_factory);

  // UpstreamKafkaFacade
  KafkaProducer& getProducerForTopic(const std::string& topic) override;

  // Test-only: producer count held by the calling thread's local facade.
  size_t getProducerCountForTest() const;

private:
  // Slot holding one ThreadLocalKafkaFacade per worker thread.
  ThreadLocal::SlotPtr tls_;
};

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
11 changes: 11 additions & 0 deletions contrib/kafka/filters/network/test/mesh/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ envoy_cc_test(
],
)

# Unit test for the thread-local upstream Kafka facade.
envoy_cc_test(
    name = "upstream_kafka_facade_unit_test",
    srcs = ["upstream_kafka_facade_unit_test.cc"],
    tags = ["skip_on_windows"],
    deps = [
        "//contrib/kafka/filters/network/source/mesh:upstream_kafka_facade_lib",
        "//test/mocks/thread_local:thread_local_mocks",
        "//test/test_common:thread_factory_for_test_lib",
    ],
)

envoy_cc_test(
name = "abstract_command_unit_test",
srcs = ["abstract_command_unit_test.cc"],
Expand Down
Loading