#include "contrib/kafka/filters/network/source/mesh/command_handlers/produce.h"

#include "contrib/kafka/filters/network/source/external/responses.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

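// Kafka error code 0 stands for "NONE" - the record was stored successfully upstream.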
constexpr static int16_t NO_ERROR = 0;

ProduceRequestHolder::ProduceRequestHolder(AbstractRequestListener& filter,
                                           UpstreamKafkaFacade& kafka_facade,
                                           const std::shared_ptr<Request<ProduceRequest>> request)
    : ProduceRequestHolder{filter, kafka_facade, PlaceholderRecordExtractor{}, request} {}

ProduceRequestHolder::ProduceRequestHolder(AbstractRequestListener& filter,
                                           UpstreamKafkaFacade& kafka_facade,
                                           const RecordExtractor& record_extractor,
                                           const std::shared_ptr<Request<ProduceRequest>> request)
    : BaseInFlightRequest{filter}, kafka_facade_{kafka_facade}, request_{request} {
  outbound_records_ = record_extractor.extractRecords(request_->data_.topics_);
  expected_responses_ = outbound_records_.size();
}

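// Delivery is asynchronous: the upstream producer will later invoke 'accept' with a
// DeliveryMemento for every record sent out here.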
void ProduceRequestHolder::startProcessing() {
  // Main part of the proxy: for each outbound record we get the appropriate sink (effectively a
  // facade for the upstream Kafka cluster), and send the record to it.
  for (const auto& outbound_record : outbound_records_) {
    KafkaProducer& producer = kafka_facade_.getProducerForTopic(outbound_record.topic_);
    // We provide our object as the first argument, as we want to be notified when the delivery
    // finishes.
    producer.send(shared_from_this(), outbound_record.topic_, outbound_record.partition_,
                  outbound_record.key_, outbound_record.value_);
  }
  // Corner case handling:
  // if we ever receive a produce request without any records, we need to notify the filter right
  // away, because otherwise no notification will ever come from the real Kafka producer.
  if (finished()) {
    notifyFilter();
  }
}

bool ProduceRequestHolder::finished() const { return 0 == expected_responses_; }

// Find the record that matches the delivery confirmation coming from the Kafka producer.
// Once all records have had their delivery data filled in, we are done and can notify the origin
// filter.
bool ProduceRequestHolder::accept(const DeliveryMemento& memento) {
  for (auto& outbound_record : outbound_records_) {
    if (outbound_record.value_.data() == memento.data_) {
      // Records are matched by the address of their value buffer - the memento carries the
      // pointer to the payload that was handed to the upstream producer.
      outbound_record.error_code_ = memento.error_code_;
      outbound_record.saved_offset_ = memento.offset_;
      --expected_responses_;
      if (finished()) {
        // All records have had their responses matched.
        ENVOY_LOG(trace, "All deliveries finished for produce request {}",
                  request_->request_header_.correlation_id_);
        notifyFilter();
      }
      return true;
    }
  }
  return false;
}

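// Computes the downstream produce response: per-record delivery results are aggregated into a
// single answer per topic/partition, mimicking what a single upstream Kafka cluster would return.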
AbstractResponseSharedPtr ProduceRequestHolder::computeAnswer() const {

  // Header.
  const RequestHeader& rh = request_->request_header_;
  ResponseMetadata metadata = {rh.api_key_, rh.api_version_, rh.correlation_id_};

  // Real answer.
  using ErrorCodeAndOffset = std::pair<int16_t, uint32_t>;
  std::map<std::string, std::map<int32_t, ErrorCodeAndOffset>> topic_to_partition_responses;
  for (const auto& outbound_record : outbound_records_) {
    auto& partition_map = topic_to_partition_responses[outbound_record.topic_];
    auto it = partition_map.find(outbound_record.partition_);
    if (it == partition_map.end()) {
      partition_map[outbound_record.partition_] = {outbound_record.error_code_,
                                                   outbound_record.saved_offset_};
    } else {
      // Proxy logic - aggregating multiple upstream answers into a single downstream answer.
      // Fail if anything failed, otherwise use the lowest offset (like Kafka would have done).
      ErrorCodeAndOffset& curr = it->second;
      if (NO_ERROR == curr.first) {
        curr.first = outbound_record.error_code_;
        curr.second = std::min(curr.second, outbound_record.saved_offset_);
      }
    }
  }
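  // E.g. two records saved in the same partition at offsets 5 and 7 collapse into a single
  // partition answer with offset 5; if any of the deliveries failed, its error code is kept, so
  // the whole partition reports the failure.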

  std::vector<TopicProduceResponse> topic_responses;
  for (const auto& topic_entry : topic_to_partition_responses) {
    std::vector<PartitionProduceResponse> partition_responses;
    for (const auto& partition_entry : topic_entry.second) {
      const int32_t partition = partition_entry.first;
      const int16_t error_code = partition_entry.second.first;
      const int64_t offset = partition_entry.second.second;
      partition_responses.emplace_back(partition, error_code, offset);
    }
    const std::string& topic = topic_entry.first;
    topic_responses.emplace_back(topic, partition_responses);
  }

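  // The trailing zero is the throttle time (ms); this proxy never asks the client to back off.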
  ProduceResponse data = {topic_responses, 0};
  return std::make_shared<Response<ProduceResponse>>(metadata, data);
}

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy