
Kafka-mesh filter #11936

Merged Sep 10, 2021 · 114 commits
7965784
Very simple Kafka-mesh filter, no docs nor automated tests
adamkotwasinski Jul 7, 2020
d34d921
Windows
adamkotwasinski Jul 9, 2020
459b843
Simple integration test
adamkotwasinski Aug 10, 2020
606a7bc
Unit test stubs + formatting
adamkotwasinski Aug 10, 2020
672e62a
Upstream Kafka Facade tests
adamkotwasinski Aug 14, 2020
bac31a9
Upstream Kafka Client tests + formatting
adamkotwasinski Aug 14, 2020
6dcba99
Splitter tests
adamkotwasinski Aug 20, 2020
f6eca04
Filter unit test
adamkotwasinski Aug 21, 2020
cc2d817
Tests for command handlers
adamkotwasinski Sep 8, 2020
5059df4
More Upstream Kafka Client tests
adamkotwasinski Sep 9, 2020
309435b
More Upstream Kafka Facade and Splitter tests
adamkotwasinski Sep 11, 2020
745829c
Serialization
adamkotwasinski Sep 11, 2020
a53a9f3
Additional dependency setup
adamkotwasinski Oct 9, 2020
8e14b2a
Format fixes
adamkotwasinski Oct 23, 2020
7ec9a30
Merge remote-tracking branch 'envoy/master' into master
adamkotwasinski Nov 6, 2020
4a89311
Produce request unit tests (record extractor that changes a single by…
adamkotwasinski Oct 23, 2020
9478d2f
Formatting
adamkotwasinski Nov 7, 2020
de3dae3
Replace volatile with atomic in upstream Kafka client
adamkotwasinski Nov 7, 2020
fdef913
Proper comparing sizes in produce handler
adamkotwasinski Nov 7, 2020
0d8b193
Move to single config source, but use compile pragmas to generate thr…
adamkotwasinski Nov 9, 2020
19ce0aa
Fix clang-tidy errors; make tests skip on Windows
adamkotwasinski Nov 10, 2020
9584896
Merge remote-tracking branch 'envoy/master' into master
adamkotwasinski Nov 10, 2020
50b1c84
Disable librdkafka's zstd
adamkotwasinski Nov 10, 2020
3555d5f
Fix failing serialization tests
adamkotwasinski Nov 11, 2020
0b72f93
Merge remote-tracking branch 'envoy/master' into master
adamkotwasinski Nov 20, 2020
7a196e1
Merge remote-tracking branch 'envoy/master' into master
adamkotwasinski Nov 24, 2020
1d7217c
Make integration test use v3
adamkotwasinski Nov 25, 2020
f807f78
Add second integration test for multiple records in single produce re…
adamkotwasinski Nov 25, 2020
c1faf02
Randomly generate payloads + refactor integration test a little
adamkotwasinski Nov 25, 2020
c253120
Address some coverage issues
adamkotwasinski Nov 26, 2020
fc760c5
Merge remote-tracking branch 'envoy/master' into master
adamkotwasinski Nov 28, 2020
5417dc8
More coverage fixes
adamkotwasinski Nov 28, 2020
97ba844
Remove KafkaProducerWrapper (mock whole Kafka Producer API)
adamkotwasinski Dec 2, 2020
be62faa
Merge remote-tracking branch 'envoy/main' into master
adamkotwasinski Feb 1, 2021
e58eca7
Upgrade to librdkafka 1.6.0 (+remove mklove patch for 'ar' step)
adamkotwasinski Feb 2, 2021
76f7349
Patch librdkafka's mklove again due to ar getting duplicate parameter…
adamkotwasinski Feb 2, 2021
b2c9fa5
Reduce librdkafka build scope (found due to gcc build issues with lib…
adamkotwasinski Feb 3, 2021
59e0ffe
Merge remote-tracking branch 'envoy/main' into master
adamkotwasinski Feb 3, 2021
cb365bb
Missing coverage for produce request handler (parsing request into re…
adamkotwasinski Feb 4, 2021
42bc5d5
Add a test for upstream Kafka client shutdown (verify that polling fo…
adamkotwasinski Feb 5, 2021
8692e2b
Add missing coverage tests: serialization, produce request handling m…
adamkotwasinski Feb 5, 2021
7e2e171
Rename + format
adamkotwasinski Feb 5, 2021
fd1b9c8
Merge remote-tracking branch 'envoy/main' into master
adamkotwasinski Feb 5, 2021
0401a8c
Category
adamkotwasinski Feb 5, 2021
4eed28d
Merge remote-tracking branch 'envoy/main' into master
adamkotwasinski Apr 19, 2021
3b13ea6
More formatting
adamkotwasinski Apr 20, 2021
886f1eb
Memcpy in serialization.h
adamkotwasinski Apr 20, 2021
7207d48
Merge remote-tracking branch 'envoy/main' into master
adamkotwasinski Apr 21, 2021
a38f840
Logging, formatting
adamkotwasinski Apr 23, 2021
e4cee47
Docs and renames
adamkotwasinski Apr 23, 2021
cf46109
Protos
adamkotwasinski Apr 24, 2021
3d279f6
Merge remote-tracking branch 'envoy/main' into HEAD
adamkotwasinski Apr 27, 2021
acc2ef2
Windows exceptions
adamkotwasinski Apr 27, 2021
ba3c9f1
Merge remote-tracking branch 'envoy/main' into HEAD
adamkotwasinski Apr 29, 2021
b1c6663
Merge remote-tracking branch 'envoy/main' into HEAD
adamkotwasinski Apr 29, 2021
7a0f3af
Wider extend of Produce requests accepted and documentation
adamkotwasinski Apr 30, 2021
c6336a3
Merge remote-tracking branch 'envoy/main' into master
adamkotwasinski Apr 30, 2021
c19734d
Formatting
adamkotwasinski Apr 30, 2021
1685776
Merge remote-tracking branch 'envoy/main' into HEAD
adamkotwasinski Jun 1, 2021
1b3b69b
Merge master
adamkotwasinski Jun 1, 2021
13fa3a8
Merge remote-tracking branch 'envoy/main' into HEAD
adamkotwasinski Jun 22, 2021
8130158
Adopt new includes
adamkotwasinski Jun 22, 2021
65d43c7
Handle incorrect record length better
adamkotwasinski Jun 23, 2021
ca0d7bf
Uptake the recent changes
adamkotwasinski Jun 23, 2021
bae28c2
Update librdkafka to 1.7.0
adamkotwasinski Jun 23, 2021
decae17
Librdkafka release date
adamkotwasinski Jun 23, 2021
2cff84e
kafka: deserializer for varlen-encoded int32_t and int64_t
adamkotwasinski Jun 30, 2021
b9edc84
Merge remote-tracking branch 'envoy/main' into HEAD
adamkotwasinski Jun 30, 2021
24a8634
kafka: add missing docs and tests
adamkotwasinski Jul 8, 2021
5bd1a50
kafka: cleanup some signatures in test helpers + format
adamkotwasinski Jul 8, 2021
58996ef
Merge commit '5bd1a500fca3e1bfbf2185e617a9cb363d4f54bd' into HEAD
adamkotwasinski Jul 8, 2021
839240f
Merge remote-tracking branch 'envoy/main' into HEAD
adamkotwasinski Jul 15, 2021
752fc04
kafka: add a filter stub for mesh filter
adamkotwasinski Jul 15, 2021
625132b
Remove 'waiting' label
adamkotwasinski Jul 19, 2021
81a7e93
Merge remote-tracking branch 'envoy/main' into merge-with-stub
adamkotwasinski Jul 20, 2021
c4be85a
Merge remote-tracking branch 'remotes/github/filter-stub' into merge-…
adamkotwasinski Jul 20, 2021
6d62278
Merge
adamkotwasinski Jul 20, 2021
9a422eb
Merge remote-tracking branch 'envoy/main' into merge-with-stub
adamkotwasinski Jul 22, 2021
935d48c
Add generated constants for request api keys and max version
adamkotwasinski Apr 30, 2021
df39dd2
Merge remote-tracking branch 'envoy/main' into main
adamkotwasinski Aug 4, 2021
99ec527
Format
adamkotwasinski Aug 4, 2021
969a8fd
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 17, 2021
a92c31d
More merge fixes after the metadata PR got merged
adamkotwasinski Aug 17, 2021
26ad3ca
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 17, 2021
1b9c553
Missing tests
adamkotwasinski Aug 17, 2021
eeacc41
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 18, 2021
c7b36b1
Move to contrib
adamkotwasinski Aug 18, 2021
52eecbb
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 19, 2021
ce055e2
Protos
adamkotwasinski Aug 19, 2021
f870ea2
Integration test
adamkotwasinski Aug 19, 2021
a59244b
Format
adamkotwasinski Aug 19, 2021
53e50b7
Add missing test references
adamkotwasinski Aug 19, 2021
a70ba4e
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 21, 2021
943351b
Real merge after upstream kafka facade got into mainline
adamkotwasinski Aug 21, 2021
b315a0b
kafka: produce request for mesh-filter
adamkotwasinski Aug 23, 2021
91575de
kafka: record extractor for produce requests in mesh-filter
adamkotwasinski Aug 23, 2021
bf706df
Merge remote-tracking branch 'github/kafka-mesh-produce' into kafka-m…
adamkotwasinski Aug 24, 2021
4d2962e
More merge fixes
adamkotwasinski Aug 24, 2021
df17fd0
Merge remote-tracking branch 'github/kafka-mesh-record-extractor' int…
adamkotwasinski Aug 24, 2021
e49c1ee
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 25, 2021
f5120eb
kafka: record extractor for produce requests in mesh-filter
adamkotwasinski Aug 23, 2021
7078ecb
Code cleanup
adamkotwasinski Aug 25, 2021
2839755
Merge remote-tracking branch 'github/kafka-mesh-record-extractor' int…
adamkotwasinski Aug 25, 2021
ea3b318
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 25, 2021
1ef67e9
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 26, 2021
ed36384
Formatting
adamkotwasinski Aug 26, 2021
34c11fe
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 30, 2021
c01ded7
Fixes
adamkotwasinski Aug 30, 2021
4b267e8
Cleanup
adamkotwasinski Aug 30, 2021
73549a0
Cleanup
adamkotwasinski Aug 31, 2021
d3b549d
Cleanup
adamkotwasinski Sep 1, 2021
7b4ec98
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Sep 9, 2021
57ff9c8
Docs fixes; inlined filter name
adamkotwasinski Sep 9, 2021
3b6b3cf
Remove well_known_names reference and other includes
adamkotwasinski Sep 9, 2021
1 change: 1 addition & 0 deletions api/BUILD
@@ -60,6 +60,7 @@ proto_library(
"//contrib/envoy/extensions/filters/http/squash/v3:pkg",
"//contrib/envoy/extensions/filters/http/sxg/v3alpha:pkg",
"//contrib/envoy/extensions/filters/network/kafka_broker/v3:pkg",
"//contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha:pkg",
"//contrib/envoy/extensions/filters/network/mysql_proxy/v3:pkg",
"//contrib/envoy/extensions/filters/network/postgres_proxy/v3alpha:pkg",
"//contrib/envoy/extensions/filters/network/rocketmq_proxy/v3:pkg",
@@ -0,0 +1,9 @@
# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py.

load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")

licenses(["notice"]) # Apache 2

api_proto_package(
deps = ["@com_github_cncf_udpa//udpa/annotations:pkg"],
)
@@ -0,0 +1,58 @@
syntax = "proto3";

package envoy.extensions.filters.network.kafka_mesh.v3alpha;

import "udpa/annotations/status.proto";
import "validate/validate.proto";

option java_package = "io.envoyproxy.envoy.extensions.filters.network.kafka_mesh.v3alpha";
option java_outer_classname = "KafkaMeshProto";
option java_multiple_files = true;
option (udpa.annotations.file_status).work_in_progress = true;
option (udpa.annotations.file_status).package_version_status = ACTIVE;

// [#protodoc-title: Kafka Mesh]
// Kafka Mesh :ref:`configuration overview <config_network_filters_kafka_mesh>`.
// [#extension: envoy.filters.network.kafka_mesh]

message KafkaMesh {
// Envoy's host that's advertised to clients.
// Has the same meaning as corresponding Kafka broker properties.
// Usually equal to filter chain's listener config, but needs to be reachable by clients
// (so 0.0.0.0 will not work).
string advertised_host = 1 [(validate.rules).string = {min_len: 1}];

// Envoy's port that's advertised to clients.
int32 advertised_port = 2 [(validate.rules).int32 = {gt: 0}];

// Upstream clusters this filter will connect to.
repeated KafkaClusterDefinition upstream_clusters = 3;

// Rules that will decide which cluster gets which request.
repeated ForwardingRule forwarding_rules = 4;
}

message KafkaClusterDefinition {
// Cluster name.
string cluster_name = 1 [(validate.rules).string = {min_len: 1}];

// Kafka cluster address.
string bootstrap_servers = 2 [(validate.rules).string = {min_len: 1}];

// Default number of partitions present in this cluster.
// This is especially important for clients that do not specify partition in their payloads and depend on this value for hashing.
int32 partition_count = 3 [(validate.rules).int32 = {gt: 0}];

// Custom configuration passed to Kafka producer.
map<string, string> producer_config = 4;
}

message ForwardingRule {
// Cluster name.
string target_cluster = 1;

oneof trigger {
// Intended place for future types of forwarding rules.
string topic_prefix = 2;
}
}
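For orientation, the messages above translate into a listener filter configuration along these lines. This is a hypothetical sketch: the addresses, ports, cluster name, and topic prefix are illustrative and not taken from this PR.

```yaml
filter_chains:
- filters:
  - name: envoy.filters.network.kafka_mesh
    typed_config:
      "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_mesh.v3alpha.KafkaMesh
      advertised_host: "127.0.0.1"   # must be reachable by clients; 0.0.0.0 will not work
      advertised_port: 19092
      upstream_clusters:
      - cluster_name: "kafka_c1"
        bootstrap_servers: "127.0.0.1:9092"
        partition_count: 1
      forwarding_rules:
      - target_cluster: "kafka_c1"
        topic_prefix: "orders"       # topics starting with "orders" go to kafka_c1
```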
1 change: 1 addition & 0 deletions api/versioning/BUILD
@@ -12,6 +12,7 @@ proto_library(
"//contrib/envoy/extensions/filters/http/squash/v3:pkg",
"//contrib/envoy/extensions/filters/http/sxg/v3alpha:pkg",
"//contrib/envoy/extensions/filters/network/kafka_broker/v3:pkg",
"//contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha:pkg",
"//contrib/envoy/extensions/filters/network/mysql_proxy/v3:pkg",
"//contrib/envoy/extensions/filters/network/postgres_proxy/v3alpha:pkg",
"//contrib/envoy/extensions/filters/network/rocketmq_proxy/v3:pkg",
5 changes: 5 additions & 0 deletions bazel/BUILD
@@ -586,3 +586,8 @@ alias(
name = "remote_jdk11",
actual = "@bazel_tools//tools/jdk:remote_jdk11",
)

alias(
name = "windows",
actual = "@bazel_tools//src/conditions:windows",
)
4 changes: 2 additions & 2 deletions bazel/repository_locations.bzl
@@ -935,7 +935,7 @@ REPOSITORY_LOCATIONS_SPEC = dict(
strip_prefix = "kafka-{version}/clients/src/main/resources/common/message",
urls = ["https://github.com/apache/kafka/archive/{version}.zip"],
use_category = ["dataplane_ext"],
extensions = ["envoy.filters.network.kafka_broker"],
extensions = ["envoy.filters.network.kafka_broker", "envoy.filters.network.kafka_mesh"],
release_date = "2020-03-03",
cpe = "cpe:2.3:a:apache:kafka:*",
),
@@ -948,7 +948,7 @@ REPOSITORY_LOCATIONS_SPEC = dict(
strip_prefix = "librdkafka-{version}",
urls = ["https://github.com/edenhill/librdkafka/archive/v{version}.tar.gz"],
use_category = ["dataplane_ext"],
extensions = ["envoy.filters.network.kafka_broker"],
extensions = ["envoy.filters.network.kafka_mesh"],
release_date = "2021-05-10",
cpe = "N/A",
),
1 change: 1 addition & 0 deletions contrib/contrib_build_config.bzl
@@ -12,6 +12,7 @@ CONTRIB_EXTENSIONS = {
#

"envoy.filters.network.kafka_broker": "//contrib/kafka/filters/network/source:kafka_broker_config_lib",
"envoy.filters.network.kafka_mesh": "//contrib/kafka/filters/network/source/mesh:config_lib",
"envoy.filters.network.mysql_proxy": "//contrib/mysql_proxy/filters/network/source:config",
"envoy.filters.network.postgres_proxy": "//contrib/postgres_proxy/filters/network/source:config",
"envoy.filters.network.rocketmq_proxy": "//contrib/rocketmq_proxy/filters/network/source:config",
5 changes: 5 additions & 0 deletions contrib/extensions_metadata.yaml
@@ -13,6 +13,11 @@ envoy.filters.network.kafka_broker:
- envoy.filters.network
security_posture: requires_trusted_downstream_and_upstream
status: wip
envoy.filters.network.kafka_mesh:
categories:
- envoy.filters.network
security_posture: requires_trusted_downstream_and_upstream
status: wip
envoy.filters.network.rocketmq_proxy:
categories:
- envoy.filters.network
24 changes: 24 additions & 0 deletions contrib/kafka/filters/network/source/mesh/BUILD
@@ -1,5 +1,6 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_contrib_extension",
"envoy_cc_library",
"envoy_contrib_package",
)
@@ -10,6 +11,25 @@ licenses(["notice"]) # Apache 2
envoy_contrib_package()

# Kafka-mesh network filter.
# Mesh filter public docs: docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst

envoy_cc_contrib_extension(
name = "config_lib",
srcs = ["config.cc"],
hdrs = ["config.h"],
deps = [
"//envoy/registry",
"//source/extensions/filters/network/common:factory_base_lib",
"@envoy_api//contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha:pkg_cc_proto",
] + select({
"//bazel:windows": [],
"//conditions:default": [
":filter_lib",
":upstream_config_lib",
":upstream_kafka_facade_lib",
],
}),
)

envoy_cc_library(
name = "filter_lib",
@@ -121,11 +141,15 @@ envoy_cc_library(
envoy_cc_library(
name = "upstream_config_lib",
srcs = [
"upstream_config.cc",
],
hdrs = [
"upstream_config.h",
],
tags = ["skip_on_windows"],
deps = [
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
"@envoy_api//contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha:pkg_cc_proto",
],
)
55 changes: 55 additions & 0 deletions contrib/kafka/filters/network/source/mesh/config.cc
@@ -0,0 +1,55 @@
#include "contrib/kafka/filters/network/source/mesh/config.h"

#include "envoy/registry/registry.h"
#include "envoy/server/filter_config.h"
#include "envoy/stats/scope.h"

#ifndef WIN32
#include "contrib/kafka/filters/network/source/mesh/upstream_config.h"
#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.h"
#include "contrib/kafka/filters/network/source/mesh/filter.h"
#else
#include "envoy/common/exception.h"
#endif

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// The mesh filter doesn't do anything special, it just sets up the shared entities.
// Any extra configuration validation is done in UpstreamKafkaConfiguration constructor.
Network::FilterFactoryCb KafkaMeshConfigFactory::createFilterFactoryFromProtoTyped(
const KafkaMeshProtoConfig& config, Server::Configuration::FactoryContext& context) {

#ifdef WIN32
throw EnvoyException("Kafka mesh filter is not supported on Windows");
#else
// Shared configuration (tells us where the upstream clusters are).
const UpstreamKafkaConfigurationSharedPtr configuration =
std::make_shared<UpstreamKafkaConfigurationImpl>(config);

// Shared upstream facade (connects us to upstream Kafka clusters).
const UpstreamKafkaFacadeSharedPtr upstream_kafka_facade =
std::make_shared<UpstreamKafkaFacadeImpl>(*configuration, context.threadLocal(),
context.api().threadFactory());

return [configuration, upstream_kafka_facade](Network::FilterManager& filter_manager) -> void {
Network::ReadFilterSharedPtr filter =
std::make_shared<KafkaMeshFilter>(*configuration, *upstream_kafka_facade);
filter_manager.addReadFilter(filter);
};
#endif
}

/**
* Static registration for the Kafka filter. @see RegisterFactory.
*/
REGISTER_FACTORY(KafkaMeshConfigFactory, Server::Configuration::NamedNetworkFilterConfigFactory);

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
33 changes: 33 additions & 0 deletions contrib/kafka/filters/network/source/mesh/config.h
@@ -0,0 +1,33 @@
#pragma once

#include "source/extensions/filters/network/common/factory_base.h"

#include "contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.pb.h"
#include "contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.pb.validate.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

using KafkaMeshProtoConfig = envoy::extensions::filters::network::kafka_mesh::v3alpha::KafkaMesh;

/**
* Config registration for the Kafka mesh filter.
*/
class KafkaMeshConfigFactory : public Common::FactoryBase<KafkaMeshProtoConfig> {
public:
KafkaMeshConfigFactory() : FactoryBase("envoy.filters.network.kafka_mesh", true) {}

private:
Network::FilterFactoryCb
createFilterFactoryFromProtoTyped(const KafkaMeshProtoConfig& config,
Server::Configuration::FactoryContext& context) override;
};

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
93 changes: 93 additions & 0 deletions contrib/kafka/filters/network/source/mesh/upstream_config.cc
@@ -0,0 +1,93 @@
#include "contrib/kafka/filters/network/source/mesh/upstream_config.h"

#include "envoy/common/exception.h"

#include "source/common/common/assert.h"

#include "absl/strings/str_cat.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

using KafkaClusterDefinition =
envoy::extensions::filters::network::kafka_mesh::v3alpha::KafkaClusterDefinition;
using ForwardingRule = envoy::extensions::filters::network::kafka_mesh::v3alpha::ForwardingRule;

UpstreamKafkaConfigurationImpl::UpstreamKafkaConfigurationImpl(const KafkaMeshProtoConfig& config)
: advertised_address_{config.advertised_host(), config.advertised_port()} {

// Processing cluster data.
const auto& upstream_clusters = config.upstream_clusters();
if (upstream_clusters.empty()) {
throw EnvoyException("kafka-mesh filter needs to have at least one upstream Kafka cluster");
}

// Processing cluster configuration.
std::map<std::string, ClusterConfig> cluster_name_to_cluster_config;
for (const auto& upstream_cluster_definition : upstream_clusters) {
const std::string& cluster_name = upstream_cluster_definition.cluster_name();

// No duplicates are allowed.
if (cluster_name_to_cluster_config.find(cluster_name) != cluster_name_to_cluster_config.end()) {
throw EnvoyException(
absl::StrCat("kafka-mesh filter has multiple Kafka clusters referenced by the same name: ",
cluster_name));
}

// Upstream client configuration - use all the optional custom configs provided, and then use
// the target IPs.
std::map<std::string, std::string> producer_configs = {
upstream_cluster_definition.producer_config().begin(),
upstream_cluster_definition.producer_config().end()};
producer_configs["bootstrap.servers"] = upstream_cluster_definition.bootstrap_servers();
ClusterConfig cluster_config = {cluster_name, upstream_cluster_definition.partition_count(),
producer_configs};
cluster_name_to_cluster_config[cluster_name] = cluster_config;
}

// Processing forwarding rules.
const auto& forwarding_rules = config.forwarding_rules();
if (forwarding_rules.empty()) {
throw EnvoyException("kafka-mesh filter needs to have at least one forwarding rule");
}

for (const auto& rule : forwarding_rules) {
const std::string& target_cluster = rule.target_cluster();
ASSERT(rule.trigger_case() == ForwardingRule::TriggerCase::kTopicPrefix);
ENVOY_LOG(trace, "Setting up forwarding rule: {} -> {}", rule.topic_prefix(), target_cluster);
// Each forwarding rule needs to reference a cluster.
if (cluster_name_to_cluster_config.find(target_cluster) ==
cluster_name_to_cluster_config.end()) {
throw EnvoyException(absl::StrCat(
"kafka-mesh filter forwarding rule is referencing unknown upstream Kafka cluster: ",
target_cluster));
}
topic_prefix_to_cluster_config_[rule.topic_prefix()] =
cluster_name_to_cluster_config[target_cluster];
}
}

absl::optional<ClusterConfig>
UpstreamKafkaConfigurationImpl::computeClusterConfigForTopic(const std::string& topic) const {
// We find the first matching prefix (this is why ordering is important).
for (const auto& it : topic_prefix_to_cluster_config_) {
if (topic.rfind(it.first, 0) == 0) {
const ClusterConfig cluster_config = it.second;
return absl::make_optional(cluster_config);
}
}
return absl::nullopt;
}

std::pair<std::string, int32_t> UpstreamKafkaConfigurationImpl::getAdvertisedAddress() const {
return advertised_address_;
}

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy