Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka-mesh filter #11936

Merged
merged 114 commits into from
Sep 10, 2021
Merged
Changes from 1 commit
Commits
Show all changes
114 commits
Select commit Hold shift + click to select a range
7965784
Very simple Kafka-mesh filter, no docs nor automated tests
adamkotwasinski Jul 7, 2020
d34d921
Windows
adamkotwasinski Jul 9, 2020
459b843
Simple integration test
adamkotwasinski Aug 10, 2020
606a7bc
Unit test stubs + formatting
adamkotwasinski Aug 10, 2020
672e62a
Upstream Kafka Facade tests
adamkotwasinski Aug 14, 2020
bac31a9
Upstream Kafka Client tests + formatting
adamkotwasinski Aug 14, 2020
6dcba99
Splitter tests
adamkotwasinski Aug 20, 2020
f6eca04
Filter unit test
adamkotwasinski Aug 21, 2020
cc2d817
Tests for command handlers
adamkotwasinski Sep 8, 2020
5059df4
More Upstream Kafka Client tests
adamkotwasinski Sep 9, 2020
309435b
More Upstream Kafka Facade and Splitter tests
adamkotwasinski Sep 11, 2020
745829c
Serialization
adamkotwasinski Sep 11, 2020
a53a9f3
Additional dependency setup
adamkotwasinski Oct 9, 2020
8e14b2a
Format fixes
adamkotwasinski Oct 23, 2020
7ec9a30
Merge remote-tracking branch 'envoy/master' into master
adamkotwasinski Nov 6, 2020
4a89311
Produce request unit tests (record extractor that changes a single by…
adamkotwasinski Oct 23, 2020
9478d2f
Formatting
adamkotwasinski Nov 7, 2020
de3dae3
Replace volatile with atomic in upstream Kafka client
adamkotwasinski Nov 7, 2020
fdef913
Proper comparing sizes in produce handler
adamkotwasinski Nov 7, 2020
0d8b193
Move to single config source, but use compile pragmas to generate thr…
adamkotwasinski Nov 9, 2020
19ce0aa
Fix clang-tidy errors; make tests skip on Windows
adamkotwasinski Nov 10, 2020
9584896
Merge remote-tracking branch 'envoy/master' into master
adamkotwasinski Nov 10, 2020
50b1c84
Disable librdkafka's zstd
adamkotwasinski Nov 10, 2020
3555d5f
Fix failing serialization tests
adamkotwasinski Nov 11, 2020
0b72f93
Merge remote-tracking branch 'envoy/master' into master
adamkotwasinski Nov 20, 2020
7a196e1
Merge remote-tracking branch 'envoy/master' into master
adamkotwasinski Nov 24, 2020
1d7217c
Make integration test use v3
adamkotwasinski Nov 25, 2020
f807f78
Add second integration test for multiple records in single produce re…
adamkotwasinski Nov 25, 2020
c1faf02
Randomly generate payloads + refactor integration test a little
adamkotwasinski Nov 25, 2020
c253120
Address some coverage issues
adamkotwasinski Nov 26, 2020
fc760c5
Merge remote-tracking branch 'envoy/master' into master
adamkotwasinski Nov 28, 2020
5417dc8
More coverage fixes
adamkotwasinski Nov 28, 2020
97ba844
Remove KafkaProducerWrapper (mock whole Kafka Producer API)
adamkotwasinski Dec 2, 2020
be62faa
Merge remote-tracking branch 'envoy/main' into master
adamkotwasinski Feb 1, 2021
e58eca7
Upgrade to librdkafka 1.6.0 (+remove mklove patch for 'ar' step)
adamkotwasinski Feb 2, 2021
76f7349
Patch librdkafka's mklove again due to ar getting duplicate parameter…
adamkotwasinski Feb 2, 2021
b2c9fa5
Reduce librdkafka build scope (found due to gcc build issues with lib…
adamkotwasinski Feb 3, 2021
59e0ffe
Merge remote-tracking branch 'envoy/main' into master
adamkotwasinski Feb 3, 2021
cb365bb
Missing coverage for produce request handler (parsing request into re…
adamkotwasinski Feb 4, 2021
42bc5d5
Add a test for upstream Kafka client shutdown (verify that polling fo…
adamkotwasinski Feb 5, 2021
8692e2b
Add missing coverage tests: serialization, produce request handling m…
adamkotwasinski Feb 5, 2021
7e2e171
Rename + format
adamkotwasinski Feb 5, 2021
fd1b9c8
Merge remote-tracking branch 'envoy/main' into master
adamkotwasinski Feb 5, 2021
0401a8c
Category
adamkotwasinski Feb 5, 2021
4eed28d
Merge remote-tracking branch 'envoy/main' into master
adamkotwasinski Apr 19, 2021
3b13ea6
More formatting
adamkotwasinski Apr 20, 2021
886f1eb
Memcpy in serialization.h
adamkotwasinski Apr 20, 2021
7207d48
Merge remote-tracking branch 'envoy/main' into master
adamkotwasinski Apr 21, 2021
a38f840
Logging, formatting
adamkotwasinski Apr 23, 2021
e4cee47
Docs and renames
adamkotwasinski Apr 23, 2021
cf46109
Protos
adamkotwasinski Apr 24, 2021
3d279f6
Merge remote-tracking branch 'envoy/main' into HEAD
adamkotwasinski Apr 27, 2021
acc2ef2
Windows exceptions
adamkotwasinski Apr 27, 2021
ba3c9f1
Merge remote-tracking branch 'envoy/main' into HEAD
adamkotwasinski Apr 29, 2021
b1c6663
Merge remote-tracking branch 'envoy/main' into HEAD
adamkotwasinski Apr 29, 2021
7a0f3af
Wider extend of Produce requests accepted and documentation
adamkotwasinski Apr 30, 2021
c6336a3
Merge remote-tracking branch 'envoy/main' into master
adamkotwasinski Apr 30, 2021
c19734d
Formatting
adamkotwasinski Apr 30, 2021
1685776
Merge remote-tracking branch 'envoy/main' into HEAD
adamkotwasinski Jun 1, 2021
1b3b69b
Merge master
adamkotwasinski Jun 1, 2021
13fa3a8
Merge remote-tracking branch 'envoy/main' into HEAD
adamkotwasinski Jun 22, 2021
8130158
Adopt new includes
adamkotwasinski Jun 22, 2021
65d43c7
Handle incorrect record length better
adamkotwasinski Jun 23, 2021
ca0d7bf
Uptake the recent changes
adamkotwasinski Jun 23, 2021
bae28c2
Update librdkafka to 1.7.0
adamkotwasinski Jun 23, 2021
decae17
Librdkafka release date
adamkotwasinski Jun 23, 2021
2cff84e
kafka: deserializer for varlen-encoded int32_t and int64_t
adamkotwasinski Jun 30, 2021
b9edc84
Merge remote-tracking branch 'envoy/main' into HEAD
adamkotwasinski Jun 30, 2021
24a8634
kafka: add missing docs and tests
adamkotwasinski Jul 8, 2021
5bd1a50
kafka: cleanup some signatures in test helpers + format
adamkotwasinski Jul 8, 2021
58996ef
Merge commit '5bd1a500fca3e1bfbf2185e617a9cb363d4f54bd' into HEAD
adamkotwasinski Jul 8, 2021
839240f
Merge remote-tracking branch 'envoy/main' into HEAD
adamkotwasinski Jul 15, 2021
752fc04
kafka: add a filter stub for mesh filter
adamkotwasinski Jul 15, 2021
625132b
Remove 'waiting' label
adamkotwasinski Jul 19, 2021
81a7e93
Merge remote-tracking branch 'envoy/main' into merge-with-stub
adamkotwasinski Jul 20, 2021
c4be85a
Merge remote-tracking branch 'remotes/github/filter-stub' into merge-…
adamkotwasinski Jul 20, 2021
6d62278
Merge
adamkotwasinski Jul 20, 2021
9a422eb
Merge remote-tracking branch 'envoy/main' into merge-with-stub
adamkotwasinski Jul 22, 2021
935d48c
Add generated constants for request api keys and max version
adamkotwasinski Apr 30, 2021
df39dd2
Merge remote-tracking branch 'envoy/main' into main
adamkotwasinski Aug 4, 2021
99ec527
Format
adamkotwasinski Aug 4, 2021
969a8fd
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 17, 2021
a92c31d
More merge fixes after the metadata PR got merged
adamkotwasinski Aug 17, 2021
26ad3ca
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 17, 2021
1b9c553
Missing tests
adamkotwasinski Aug 17, 2021
eeacc41
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 18, 2021
c7b36b1
Move to contrib
adamkotwasinski Aug 18, 2021
52eecbb
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 19, 2021
ce055e2
Protos
adamkotwasinski Aug 19, 2021
f870ea2
Integration test
adamkotwasinski Aug 19, 2021
a59244b
Format
adamkotwasinski Aug 19, 2021
53e50b7
Add missing test references
adamkotwasinski Aug 19, 2021
a70ba4e
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 21, 2021
943351b
Real merge after upstream kafka facade got into mainline
adamkotwasinski Aug 21, 2021
b315a0b
kafka: produce request for mesh-filter
adamkotwasinski Aug 23, 2021
91575de
kafka: record extractor for produce requests in mesh-filter
adamkotwasinski Aug 23, 2021
bf706df
Merge remote-tracking branch 'github/kafka-mesh-produce' into kafka-m…
adamkotwasinski Aug 24, 2021
4d2962e
More merge fixes
adamkotwasinski Aug 24, 2021
df17fd0
Merge remote-tracking branch 'github/kafka-mesh-record-extractor' int…
adamkotwasinski Aug 24, 2021
e49c1ee
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 25, 2021
f5120eb
kafka: record extractor for produce requests in mesh-filter
adamkotwasinski Aug 23, 2021
7078ecb
Code cleanup
adamkotwasinski Aug 25, 2021
2839755
Merge remote-tracking branch 'github/kafka-mesh-record-extractor' int…
adamkotwasinski Aug 25, 2021
ea3b318
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 25, 2021
1ef67e9
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 26, 2021
ed36384
Formatting
adamkotwasinski Aug 26, 2021
34c11fe
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Aug 30, 2021
c01ded7
Fixes
adamkotwasinski Aug 30, 2021
4b267e8
Cleanup
adamkotwasinski Aug 30, 2021
73549a0
Cleanup
adamkotwasinski Aug 31, 2021
d3b549d
Cleanup
adamkotwasinski Sep 1, 2021
7b4ec98
Merge remote-tracking branch 'envoy/main' into kafka-mesh-public
adamkotwasinski Sep 9, 2021
57ff9c8
Docs fixes; inlined filter name
adamkotwasinski Sep 9, 2021
3b6b3cf
Remove well_known_names reference and other includes
adamkotwasinski Sep 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge remote-tracking branch 'envoy/main' into main
Signed-off-by: Adam Kotwasinski <adam.kotwasinski@gmail.com>
  • Loading branch information
adamkotwasinski committed Aug 4, 2021
commit df39dd2f54e787985dab0c1908108a151c0bc88f
25 changes: 13 additions & 12 deletions source/extensions/filters/network/kafka/mesh/command_handlers/BUILD
Original file line number Diff line number Diff line change
@@ -11,55 +11,56 @@ licenses(["notice"]) # Apache 2
envoy_extension_package()

envoy_cc_library(
name = "api_versions_lib",
name = "metadata_lib",
srcs = [
"api_versions.cc",
"metadata.cc",
],
hdrs = [
"api_versions.h",
"metadata.h",
],
tags = ["skip_on_windows"],
deps = [
"//source/common/common:minimal_logger_lib",
"//source/extensions/filters/network/kafka:kafka_request_parser_lib",
"//source/extensions/filters/network/kafka:kafka_response_parser_lib",
"//source/extensions/filters/network/kafka:tagged_fields_lib",
"//source/extensions/filters/network/kafka/mesh:abstract_command_lib",
"//source/extensions/filters/network/kafka/mesh:upstream_config_lib",
],
)

envoy_cc_library(
name = "metadata_lib",
name = "produce_lib",
srcs = [
"metadata.cc",
"produce.cc",
],
hdrs = [
"metadata.h",
"produce.h",
],
tags = ["skip_on_windows"],
deps = [
"//source/common/common:minimal_logger_lib",
"//source/extensions/filters/network/kafka:kafka_request_parser_lib",
"//source/extensions/filters/network/kafka:kafka_response_parser_lib",
"//source/extensions/filters/network/kafka/mesh:abstract_command_lib",
"//source/extensions/filters/network/kafka/mesh:upstream_config_lib",
"//source/extensions/filters/network/kafka/mesh:upstream_kafka_facade_lib",
],
)

envoy_cc_library(
name = "produce_lib",
name = "api_versions_lib",
srcs = [
"produce.cc",
"api_versions.cc",
],
hdrs = [
"produce.h",
"api_versions.h",
],
tags = ["skip_on_windows"],
deps = [
"//source/common/common:minimal_logger_lib",
"//source/extensions/filters/network/kafka:kafka_request_parser_lib",
"//source/extensions/filters/network/kafka:kafka_response_parser_lib",
"//source/extensions/filters/network/kafka:tagged_fields_lib",
"//source/extensions/filters/network/kafka/mesh:abstract_command_lib",
"//source/extensions/filters/network/kafka/mesh:upstream_kafka_facade_lib",
],
)

Original file line number Diff line number Diff line change
@@ -22,19 +22,20 @@ constexpr int16_t MAX_PRODUCE_SUPPORTED = PRODUCE_REQUEST_MAX_VERSION; /* Genera
constexpr int16_t MIN_METADATA_SUPPORTED = 1;
constexpr int16_t MAX_METADATA_SUPPORTED = METADATA_REQUEST_MAX_VERSION; /* Generated value. */

ApiVersionsRequestHolder::ApiVersionsRequestHolder(
AbstractRequestListener& filter, const std::shared_ptr<Request<ApiVersionsRequest>> request)
: BaseInFlightRequest{filter}, request_{request} {}
ApiVersionsRequestHolder::ApiVersionsRequestHolder(AbstractRequestListener& filter,
const RequestHeader request_header)
: BaseInFlightRequest{filter}, request_header_{request_header} {}

// Api Versions requests are immediately ready for answer (as they do not need to reach upstream).
void ApiVersionsRequestHolder::startProcessing() { notifyFilter(); }

// Because these requests can be trivially handled, they are okay to be sent downstream at any time.
// Because these requests can be trivially handled, the responses are okay to be sent downstream at
// any time.
bool ApiVersionsRequestHolder::finished() const { return true; }

AbstractResponseSharedPtr ApiVersionsRequestHolder::computeAnswer() const {
const auto& header = request_->request_header_;
const ResponseMetadata metadata = {header.api_key_, header.api_version_, header.correlation_id_};
const ResponseMetadata metadata = {request_header_.api_key_, request_header_.api_version_,
request_header_.correlation_id_};

const int16_t error_code = 0;
const ApiVersionsResponseKey produce_entry = {PRODUCE_REQUEST_API_KEY, MIN_PRODUCE_SUPPORTED,
Original file line number Diff line number Diff line change
@@ -9,10 +9,14 @@ namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

/**
* Api version requests are the first requests sent by Kafka clients to brokers.
* We send our customized response to fail clients that might be trying to accomplish something more
* than this filter supports.
*/
class ApiVersionsRequestHolder : public BaseInFlightRequest {
public:
ApiVersionsRequestHolder(AbstractRequestListener& filter,
const std::shared_ptr<Request<ApiVersionsRequest>> request);
ApiVersionsRequestHolder(AbstractRequestListener& filter, const RequestHeader request_header);

void startProcessing() override;

@@ -21,8 +25,8 @@ class ApiVersionsRequestHolder : public BaseInFlightRequest {
AbstractResponseSharedPtr computeAnswer() const override;

private:
// Original request.
const std::shared_ptr<Request<ApiVersionsRequest>> request_;
// Original request header.
const RequestHeader request_header_;
};

} // namespace Mesh
20 changes: 10 additions & 10 deletions source/extensions/filters/network/kafka/mesh/request_processor.cc
Original file line number Diff line number Diff line change
@@ -26,27 +26,22 @@ static void throwOnUnsupportedRequest(const std::string& reason, const RequestHe

void RequestProcessor::onMessage(AbstractRequestSharedPtr arg) {
switch (arg->request_header_.api_key_) {
case /* Produce */ 0:
case PRODUCE_REQUEST_API_KEY:
process(std::dynamic_pointer_cast<Request<ProduceRequest>>(arg));
break;
case /* Metadata */ 3:
case METADATA_REQUEST_API_KEY:
process(std::dynamic_pointer_cast<Request<MetadataRequest>>(arg));
break;
case /* ApiVersions */ 18:
case API_VERSIONS_REQUEST_API_KEY:
process(std::dynamic_pointer_cast<Request<ApiVersionsRequest>>(arg));
break;
default:
// We got something else than typical Produce request.
// Client sent a request we cannot handle right now.
throwOnUnsupportedRequest("unsupported (bad client API invoked?)", arg->request_header_);
break;
} // switch
}

// We got something that the parser could not handle.
void RequestProcessor::onFailedParse(RequestParseFailureSharedPtr arg) {
throwOnUnsupportedRequest("unknown", arg->request_header_);
}

void RequestProcessor::process(const std::shared_ptr<Request<ProduceRequest>> request) const {
auto res = std::make_shared<ProduceRequestHolder>(origin_, upstream_kafka_facade_, request);
origin_.onRequest(res);
@@ -58,10 +53,15 @@ void RequestProcessor::process(const std::shared_ptr<Request<MetadataRequest>> r
}

void RequestProcessor::process(const std::shared_ptr<Request<ApiVersionsRequest>> request) const {
auto res = std::make_shared<ApiVersionsRequestHolder>(origin_, request);
auto res = std::make_shared<ApiVersionsRequestHolder>(origin_, request->request_header_);
origin_.onRequest(res);
}

// We got something that the parser could not handle.
void RequestProcessor::onFailedParse(RequestParseFailureSharedPtr arg) {
throwOnUnsupportedRequest("unknown", arg->request_header_);
}

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
19 changes: 10 additions & 9 deletions test/extensions/filters/network/kafka/mesh/command_handlers/BUILD
Original file line number Diff line number Diff line change
@@ -12,36 +12,37 @@ licenses(["notice"]) # Apache 2
envoy_package()

envoy_extension_cc_test(
name = "api_versions_unit_test",
srcs = ["api_versions_unit_test.cc"],
name = "metadata_unit_test",
srcs = ["metadata_unit_test.cc"],
extension_names = ["envoy.filters.network.kafka_broker"],
tags = ["skip_on_windows"],
deps = [
"//source/extensions/filters/network/kafka/mesh/command_handlers:api_versions_lib",
"//source/extensions/filters/network/kafka/mesh/command_handlers:metadata_lib",
"//test/mocks/network:network_mocks",
"//test/mocks/stats:stats_mocks",
],
)

envoy_extension_cc_test(
name = "metadata_unit_test",
srcs = ["metadata_unit_test.cc"],
name = "produce_unit_test",
srcs = ["produce_unit_test.cc"],
extension_names = ["envoy.filters.network.kafka_broker"],
tags = ["skip_on_windows"],
deps = [
"//source/extensions/filters/network/kafka/mesh/command_handlers:metadata_lib",
"//source/extensions/filters/network/kafka/mesh/command_handlers:produce_lib",
"//test/mocks/network:network_mocks",
"//test/mocks/stats:stats_mocks",
],
)

envoy_extension_cc_test(
name = "produce_unit_test",
srcs = ["produce_unit_test.cc"],
name = "api_versions_unit_test",
srcs = ["api_versions_unit_test.cc"],
# This name needs to be changed after we have the mesh filter ready.
extension_names = ["envoy.filters.network.kafka_broker"],
tags = ["skip_on_windows"],
deps = [
"//source/extensions/filters/network/kafka/mesh/command_handlers:produce_lib",
"//source/extensions/filters/network/kafka/mesh/command_handlers:api_versions_lib",
"//test/mocks/network:network_mocks",
"//test/mocks/stats:stats_mocks",
],
Original file line number Diff line number Diff line change
@@ -20,10 +20,8 @@ TEST(ApiVersionsTest, shouldBeAlwaysReadyForAnswer) {
// given
MockAbstractRequestListener filter;
EXPECT_CALL(filter, onRequestReadyForAnswer());
const RequestHeader header = {0, 0, 0, absl::nullopt};
const ApiVersionsRequest data = {};
const auto message = std::make_shared<Request<ApiVersionsRequest>>(header, data);
ApiVersionsRequestHolder testee = {filter, message};
const RequestHeader header = {API_VERSIONS_REQUEST_API_KEY, 0, 0, absl::nullopt};
ApiVersionsRequestHolder testee = {filter, header};

// when, then - invoking should immediately notify the filter.
testee.startProcessing();
Original file line number Diff line number Diff line change
@@ -47,7 +47,7 @@ class RequestProcessorTest : public testing::Test {

TEST_F(RequestProcessorTest, ShouldProcessProduceRequest) {
// given
const RequestHeader header = {0, 0, 0, absl::nullopt};
const RequestHeader header = {PRODUCE_REQUEST_API_KEY, 0, 0, absl::nullopt};
const ProduceRequest data = {0, 0, {}};
const auto message = std::make_shared<Request<ProduceRequest>>(header, data);

@@ -63,7 +63,7 @@ TEST_F(RequestProcessorTest, ShouldProcessProduceRequest) {

TEST_F(RequestProcessorTest, ShouldProcessMetadataRequest) {
// given
const RequestHeader header = {3, 0, 0, absl::nullopt};
const RequestHeader header = {METADATA_REQUEST_API_KEY, 0, 0, absl::nullopt};
const MetadataRequest data = {absl::nullopt};
const auto message = std::make_shared<Request<MetadataRequest>>(header, data);

@@ -79,7 +79,7 @@ TEST_F(RequestProcessorTest, ShouldProcessMetadataRequest) {

TEST_F(RequestProcessorTest, ShouldProcessApiVersionsRequest) {
// given
const RequestHeader header = {18, 0, 0, absl::nullopt};
const RequestHeader header = {API_VERSIONS_REQUEST_API_KEY, 0, 0, absl::nullopt};
const ApiVersionsRequest data = {};
const auto message = std::make_shared<Request<ApiVersionsRequest>>(header, data);

@@ -95,7 +95,7 @@ TEST_F(RequestProcessorTest, ShouldProcessApiVersionsRequest) {

TEST_F(RequestProcessorTest, ShouldHandleUnsupportedRequest) {
// given
const RequestHeader header = {2, 0, 0, absl::nullopt};
const RequestHeader header = {LIST_OFFSET_REQUEST_API_KEY, 0, 0, absl::nullopt};
const ListOffsetRequest data = {0, {}};
const auto message = std::make_shared<Request<ListOffsetRequest>>(header, data);

You are viewing a condensed version of this merge commit. You can view the full changes here.