
Commit b315a0b

kafka: produce request for mesh-filter
Signed-off-by: Adam Kotwasinski <adam.kotwasinski@gmail.com>
1 parent b9eada5 commit b315a0b

16 files changed: +685 −20 lines

contrib/kafka/filters/network/source/mesh/BUILD (+2)

@@ -43,10 +43,12 @@ envoy_cc_library(
     deps = [
         ":abstract_command_lib",
         ":upstream_config_lib",
+        ":upstream_kafka_facade_lib",
         "//contrib/kafka/filters/network/source:kafka_request_codec_lib",
         "//contrib/kafka/filters/network/source:kafka_request_parser_lib",
         "//contrib/kafka/filters/network/source/mesh/command_handlers:api_versions_lib",
         "//contrib/kafka/filters/network/source/mesh/command_handlers:metadata_lib",
+        "//contrib/kafka/filters/network/source/mesh/command_handlers:produce_lib",
         "//source/common/common:minimal_logger_lib",
     ],
 )

contrib/kafka/filters/network/source/mesh/command_handlers/BUILD (+47)

@@ -10,6 +10,53 @@ envoy_contrib_package()
 
 # Handlers for particular Kafka requests that are used by Kafka-mesh filter.
 
+envoy_cc_library(
+    name = "produce_lib",
+    srcs = [
+        "produce.cc",
+    ],
+    hdrs = [
+        "produce.h",
+    ],
+    tags = ["skip_on_windows"],
+    deps = [
+        ":produce_outbound_record_lib",
+        ":produce_record_extractor_lib",
+        "//contrib/kafka/filters/network/source:kafka_request_parser_lib",
+        "//contrib/kafka/filters/network/source:kafka_response_parser_lib",
+        "//contrib/kafka/filters/network/source/mesh:abstract_command_lib",
+        "//contrib/kafka/filters/network/source/mesh:upstream_kafka_facade_lib",
+        "//source/common/common:minimal_logger_lib",
+    ],
+)
+
+envoy_cc_library(
+    name = "produce_outbound_record_lib",
+    srcs = [
+    ],
+    hdrs = [
+        "produce_outbound_record.h",
+    ],
+    tags = ["skip_on_windows"],
+    deps = [
+    ],
+)
+
+envoy_cc_library(
+    name = "produce_record_extractor_lib",
+    srcs = [
+        "produce_record_extractor.cc",
+    ],
+    hdrs = [
+        "produce_record_extractor.h",
+    ],
+    tags = ["skip_on_windows"],
+    deps = [
+        ":produce_outbound_record_lib",
+        "//contrib/kafka/filters/network/source:kafka_request_parser_lib",
+    ],
+)
+
 envoy_cc_library(
     name = "metadata_lib",
     srcs = [


contrib/kafka/filters/network/source/mesh/command_handlers/produce.cc (new file, +116)

#include "contrib/kafka/filters/network/source/mesh/command_handlers/produce.h"

#include "contrib/kafka/filters/network/source/external/responses.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

constexpr static int16_t NO_ERROR = 0;

ProduceRequestHolder::ProduceRequestHolder(AbstractRequestListener& filter,
                                           UpstreamKafkaFacade& kafka_facade,
                                           const std::shared_ptr<Request<ProduceRequest>> request)
    : ProduceRequestHolder{filter, kafka_facade, PlaceholderRecordExtractor{}, request} {};

ProduceRequestHolder::ProduceRequestHolder(AbstractRequestListener& filter,
                                           UpstreamKafkaFacade& kafka_facade,
                                           const RecordExtractor& record_extractor,
                                           const std::shared_ptr<Request<ProduceRequest>> request)
    : BaseInFlightRequest{filter}, kafka_facade_{kafka_facade}, request_{request} {
  outbound_records_ = record_extractor.extractRecords(request_->data_.topics_);
  expected_responses_ = outbound_records_.size();
}

void ProduceRequestHolder::startProcessing() {
  // Main part of the proxy: for each outbound record we get the appropriate sink (effectively a
  // facade for an upstream Kafka cluster), and send the record to it.
  for (const auto& outbound_record : outbound_records_) {
    KafkaProducer& producer = kafka_facade_.getProducerForTopic(outbound_record.topic_);
    // We need to provide our object as the first argument, as we want to be notified when the
    // delivery finishes.
    producer.send(shared_from_this(), outbound_record.topic_, outbound_record.partition_,
                  outbound_record.key_, outbound_record.value_);
  }
  // Corner case handling:
  // If we ever receive a produce request without records, we need to notify the filter we are
  // ready, because otherwise no notification will ever come from the real Kafka producer.
  if (finished()) {
    notifyFilter();
  }
}

bool ProduceRequestHolder::finished() const { return 0 == expected_responses_; }

// Find the record that matches the provided delivery confirmation coming from the Kafka producer.
// If all the records got their delivery data filled in, we are done, and can notify the origin
// filter.
bool ProduceRequestHolder::accept(const DeliveryMemento& memento) {
  for (auto& outbound_record : outbound_records_) {
    if (outbound_record.value_.data() == memento.data_) {
      // We have found the downstream record that matches this confirmation from upstream Kafka.
      outbound_record.error_code_ = memento.error_code_;
      outbound_record.saved_offset_ = memento.offset_;
      --expected_responses_;
      if (finished()) {
        // All elements had their responses matched.
        ENVOY_LOG(trace, "All deliveries finished for produce request {}",
                  request_->request_header_.correlation_id_);
        notifyFilter();
      }
      return true;
    }
  }
  return false;
}

AbstractResponseSharedPtr ProduceRequestHolder::computeAnswer() const {

  // Header.
  const RequestHeader& rh = request_->request_header_;
  ResponseMetadata metadata = {rh.api_key_, rh.api_version_, rh.correlation_id_};

  // Real answer.
  using ErrorCodeAndOffset = std::pair<int16_t, uint32_t>;
  std::map<std::string, std::map<int32_t, ErrorCodeAndOffset>> topic_to_partition_responses;
  for (const auto& outbound_record : outbound_records_) {
    auto& partition_map = topic_to_partition_responses[outbound_record.topic_];
    auto it = partition_map.find(outbound_record.partition_);
    if (it == partition_map.end()) {
      partition_map[outbound_record.partition_] = {outbound_record.error_code_,
                                                   outbound_record.saved_offset_};
    } else {
      // Proxy logic - aggregating multiple upstream answers into a single downstream answer.
      // Let's fail if anything fails, otherwise use the lowest offset (like Kafka would have done).
      ErrorCodeAndOffset& curr = it->second;
      if (NO_ERROR == curr.first) {
        curr.first = outbound_record.error_code_;
        curr.second = std::min(curr.second, outbound_record.saved_offset_);
      }
    }
  }

  std::vector<TopicProduceResponse> topic_responses;
  for (const auto& topic_entry : topic_to_partition_responses) {
    std::vector<PartitionProduceResponse> partition_responses;
    for (const auto& partition_entry : topic_entry.second) {
      const int32_t& partition = partition_entry.first;
      const int16_t& error_code = partition_entry.second.first;
      const int64_t& offset = partition_entry.second.second;
      partition_responses.emplace_back(partition, error_code, offset);
    }
    const std::string& topic = topic_entry.first;
    topic_responses.emplace_back(topic, partition_responses);
  }

  ProduceResponse data = {topic_responses, 0};
  return std::make_shared<Response<ProduceResponse>>(metadata, data);
}

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
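
The merge step in computeAnswer() is the interesting proxy behavior: when several outbound records target the same topic partition, their upstream confirmations collapse into one downstream partition response. A minimal standalone sketch of that rule follows; the merge name is illustrative and not part of this commit.

#include <algorithm>
#include <cstdint>
#include <utility>

// Illustrative helper mirroring the per-partition aggregation in computeAnswer():
// fail if anything failed, otherwise keep the lowest offset (as a single Kafka
// broker would have reported).
using ErrorCodeAndOffset = std::pair<int16_t, uint32_t>;

ErrorCodeAndOffset merge(const ErrorCodeAndOffset& current, const ErrorCodeAndOffset& incoming) {
  constexpr int16_t NO_ERROR = 0;
  if (current.first != NO_ERROR) {
    return current; // A failure already recorded for this partition wins.
  }
  return {incoming.first, std::min(current.second, incoming.second)};
}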

contrib/kafka/filters/network/source/mesh/command_handlers/produce.h (new file, +64)

#pragma once

#include "contrib/kafka/filters/network/source/external/requests.h"
#include "contrib/kafka/filters/network/source/mesh/abstract_command.h"
#include "contrib/kafka/filters/network/source/mesh/command_handlers/produce_outbound_record.h"
#include "contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h"
#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_client.h"
#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

/**
 * Kafka 'Produce' request that is aimed at a particular cluster.
 * A single Produce request coming from downstream can map into multiple entries,
 * as the topics can be hosted on different clusters.
 */
class ProduceRequestHolder : public BaseInFlightRequest,
                             public ProduceFinishCb,
                             public std::enable_shared_from_this<ProduceRequestHolder> {
public:
  ProduceRequestHolder(AbstractRequestListener& filter, UpstreamKafkaFacade& kafka_facade,
                       const std::shared_ptr<Request<ProduceRequest>> request);

  // Visible for testing.
  ProduceRequestHolder(AbstractRequestListener& filter, UpstreamKafkaFacade& kafka_facade,
                       const RecordExtractor& record_extractor,
                       const std::shared_ptr<Request<ProduceRequest>> request);

  // AbstractInFlightRequest
  void startProcessing() override;

  // AbstractInFlightRequest
  bool finished() const override;

  // AbstractInFlightRequest
  AbstractResponseSharedPtr computeAnswer() const override;

  // ProduceFinishCb
  bool accept(const DeliveryMemento& memento) override;

private:
  // Access to Kafka producers pointing to upstream Kafka clusters.
  UpstreamKafkaFacade& kafka_facade_;

  // Original request.
  const std::shared_ptr<Request<ProduceRequest>> request_;

  // How many responses from the Kafka producer handling our request we still expect.
  // This value decreases to 0 as we get confirmations from Kafka (successful or not).
  int expected_responses_;

  // Real records extracted out of the request.
  std::vector<OutboundRecord> outbound_records_;
};

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
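
Because startProcessing() passes shared_from_this() to the producer, a ProduceRequestHolder must already be owned by a std::shared_ptr when processing starts. A minimal sketch of caller-side wiring, assuming a hypothetical helper; only ProduceRequestHolder and its constructor arguments come from this commit.

// Hypothetical wiring, not part of this diff.
void handleProduce(AbstractRequestListener& filter, UpstreamKafkaFacade& facade,
                   const std::shared_ptr<Request<ProduceRequest>> request) {
  auto holder = std::make_shared<ProduceRequestHolder>(filter, facade, request);
  // Only safe now: shared_from_this() inside startProcessing() requires an
  // owning shared_ptr to exist, so the producer callback can keep the holder alive.
  holder->startProcessing();
}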

contrib/kafka/filters/network/source/mesh/command_handlers/produce_outbound_record.h (new file, +36)

#pragma once

#include <string>

#include "absl/strings/string_view.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// Binds a single inbound record from a Kafka client with its delivery information.
struct OutboundRecord {

  // These fields were received from downstream.
  const std::string topic_;
  const int32_t partition_;
  const absl::string_view key_;
  const absl::string_view value_;

  // These fields will get updated when delivery to the upstream Kafka cluster finishes.
  int16_t error_code_;
  uint32_t saved_offset_;

  OutboundRecord(const std::string& topic, const int32_t partition, const absl::string_view key,
                 const absl::string_view value)
      : topic_{topic}, partition_{partition}, key_{key}, value_{value}, error_code_{0},
        saved_offset_{0} {};
};

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
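
Worth noting: key_ and value_ are absl::string_view, so an OutboundRecord never owns the record bytes; the views stay valid only while the originating request does (ProduceRequestHolder guarantees this by keeping request_ alive). A tiny illustrative construction, with made-up values:

// Made-up values; the views must point into memory that outlives the record
// (normally the decoded produce request's buffer).
const std::string topic = "my-topic";
const std::string value = "serialized-record-bytes";
OutboundRecord record{topic, /*partition=*/0, /*key=*/absl::string_view{},
                      absl::string_view{value}};
// error_code_ and saved_offset_ start at 0 and are overwritten once the
// upstream delivery confirmation arrives.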

contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.cc (new file, +18)

#include "contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

std::vector<OutboundRecord>
PlaceholderRecordExtractor::extractRecords(const std::vector<TopicProduceData>&) const {
  return {};
}

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy

contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h (new file, +36)

#pragma once

#include "contrib/kafka/filters/network/source/external/requests.h"
#include "contrib/kafka/filters/network/source/mesh/command_handlers/produce_outbound_record.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

/**
 * Dependency-injection interface responsible for extracting records out of a produce request's
 * contents.
 */
class RecordExtractor {
public:
  virtual ~RecordExtractor() = default;

  virtual std::vector<OutboundRecord>
  extractRecords(const std::vector<TopicProduceData>& data) const PURE;
};

/**
 * Just a placeholder for now.
 */
class PlaceholderRecordExtractor : public RecordExtractor {
public:
  std::vector<OutboundRecord>
  extractRecords(const std::vector<TopicProduceData>& data) const override;
};

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
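
Because RecordExtractor is a pure interface injected through the "visible for testing" constructor of ProduceRequestHolder, unit tests can substitute a canned implementation. A hedged sketch of such a test double; FixedRecordExtractor is an invented name, not part of this commit.

// Hypothetical test double returning a fixed set of records.
class FixedRecordExtractor : public RecordExtractor {
public:
  explicit FixedRecordExtractor(std::vector<OutboundRecord> records)
      : records_{std::move(records)} {}

  std::vector<OutboundRecord>
  extractRecords(const std::vector<TopicProduceData>&) const override {
    return records_;
  }

private:
  const std::vector<OutboundRecord> records_;
};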

contrib/kafka/filters/network/source/mesh/filter.cc (+3, −2)

@@ -14,9 +14,10 @@ namespace NetworkFilters {
 namespace Kafka {
 namespace Mesh {
 
-KafkaMeshFilter::KafkaMeshFilter(const UpstreamKafkaConfiguration& configuration)
+KafkaMeshFilter::KafkaMeshFilter(const UpstreamKafkaConfiguration& configuration,
+                                 UpstreamKafkaFacade& upstream_kafka_facade)
     : KafkaMeshFilter{std::make_shared<RequestDecoder>(std::vector<RequestCallbackSharedPtr>(
-          {std::make_shared<RequestProcessor>(*this, configuration)}))} {}
+          {std::make_shared<RequestProcessor>(*this, configuration, upstream_kafka_facade)}))} {}
 
 KafkaMeshFilter::KafkaMeshFilter(RequestDecoderSharedPtr request_decoder)
     : request_decoder_{request_decoder} {}