forked from psarna/seastar
-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Retry policies #19
Closed
Closed
Retry policies #19
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Implemented Terraform configuration to provision Kafka cluster locally using Docker. Network IP address and number of Kafka brokers are configurable by user. Decided to write Terraform configuration from scratch, as the other already available (Docker Compose) solution was incorrect. I chose Terraform as it could be also used in the future to model other test deployments (e.g. AWS).
…cal-final Kafka: Local Kafka deployment for development testing.
Merge changes from origin into the fork (branch zpp_kafka)
This change adds a basic, low level network connection management for the Kafka client. Later, this should be encapsulated in a TCP Client class to work on the Kafka messages abstraction. So far the tests are also somewhat makeshift - ideally, they have to be separated into both unit and integration tests, with the former not rellying on existing Kafka brokers.
Kafka TCP connection class
Implement classes for parsing basic Kafka protocol primitives, as well as ApiVersions and Metadata request/response messages.
Implement 32-bit zigzag varint parsing, as defined in the Kafka protocol specification.
Implement structures storing RECORDS type (as defined in Kafka protocol specification) and parsing of a single record (only version 2).
Implement parsing of records batch from RECORDS Kafka protocol type (version 2).
Implemented parsing of RECORDS type (version 2), as defined in the Kafka protocol specification.
Implement parsing of Produce request from Kafka protocol.
Implement parsing of Produce response from Kafka protocol.
Implement kafka_connection class which provides functionality to send Kafka protocol messages without manual serialization or deserialization. Add support for API versioning as kafka_connection asks the broker for ApiVersions and serializes, deserializes next messages based on broker versions.
Added timeout functionality to tcp_connection in connect, read, write. Updated kafka_connection and kafka_producer with timeout support.
Implement tcp_connection_exception which is thrown when connection is disconnected before reading the response.
In the case of network error or parsing error, send now catches the exception, translates it into appropriate error code and returns response message with this code.
This change adds a connection manager unit to keep track of open connections and allow for a fast access by the producer. With that we can now manage multiple brokers and read their addresses from metadata requests. It also introduces a dummy partitioner for further development. Connection Pool + Multibroker demo
Implement classes to support parsing of error codes and list of error codes.
Implement retry_helper: a future utility class which allows retries of futures. Retry count is capped and controlled by constructor argument. Retries are started with backoff time, using exponential backoff with jitter strategy.
Added metadata_manager class.
This change allows kafka_connection to be used in non-sequential way, allowing to "queue" multiple messages without waiting for the previous to have ended. Semaphores with count=1 are used to implement queuing of sends and receives, taking advantage of semaphore FIFO functionality. It is crucial to preserve the ordering, so that receive reads the correct response.
The change allows use of connection_manager in non-sequential manner.
Implement batching of messages, initial error handling (via retries). A manual flush is currently needed to send the messages in producer (no automatic flushing).
Implement "framework" for E2E Kafka producer tests and a few basic producer tests.
psarna#17) * Introduces partitioner abstract base class while preserving old behaviour and existing partitioner class. Introduces round robin partitioner as derived class. * Replaced partitioner* type with std::unique_ptr<partitioner> inside producer. Replaced atomic uint with normal uint. * Added hashing in case of non-zero length key.
avelanarius
pushed a commit
that referenced
this pull request
Apr 29, 2021
…o_with Fixes failures in debug mode: ``` $ build/debug/tests/unit/closeable_test -l all -t deferred_close_test WARNING: debug mode. Not for benchmarking or production random-seed=3064133628 Running 1 test case... Entering test module "../../tests/unit/closeable_test.cc" ../../tests/unit/closeable_test.cc(0): Entering test case "deferred_close_test" ../../src/testing/seastar_test.cc(43): info: check true has passed ==9449==WARNING: ASan doesn't fully support makecontext/swapcontext functions and may produce false positives in some cases! terminate called after throwing an instance of 'seastar::broken_promise' what(): broken promise ==9449==WARNING: ASan is ignoring requested __asan_handle_no_return: stack top: 0x7fbf1f49f000; bottom 0x7fbf40971000; size: 0xffffffffdeb2e000 (-558702592) False positive error reports may follow For details see google/sanitizers#189 ================================================================= ==9449==AddressSanitizer CHECK failed: ../../../../libsanitizer/asan/asan_thread.cpp:356 "((ptr[0] == kCurrentStackFrameMagic)) != (0)" (0x0, 0x0) #0 0x7fbf45f39d0b (/lib64/libasan.so.6+0xb3d0b) #1 0x7fbf45f57d4e (/lib64/libasan.so.6+0xd1d4e) #2 0x7fbf45f3e724 (/lib64/libasan.so.6+0xb8724) #3 0x7fbf45eb3e5b (/lib64/libasan.so.6+0x2de5b) #4 0x7fbf45eb51e8 (/lib64/libasan.so.6+0x2f1e8) #5 0x7fbf45eb7694 (/lib64/libasan.so.6+0x31694) #6 0x7fbf45f39398 (/lib64/libasan.so.6+0xb3398) #7 0x7fbf45f3a00b in __asan_report_load8 (/lib64/libasan.so.6+0xb400b) #8 0xfe6d52 in bool __gnu_cxx::operator!=<dl_phdr_info*, std::vector<dl_phdr_info, std::allocator<dl_phdr_info> > >(__gnu_cxx::__normal_iterator<dl_phdr_info*, std::vector<dl_phdr_info, std::allocator<dl_phdr_info> > > const&, __gnu_cxx::__normal_iterator<dl_phdr_info*, std::vector<dl_phdr_info, std::allocator<dl_phdr_info> > > const&) /usr/include/c++/10/bits/stl_iterator.h:1116 #9 0xfe615c in dl_iterate_phdr ../../src/core/exception_hacks.cc:121 #10 0x7fbf44bd1810 in _Unwind_Find_FDE (/lib64/libgcc_s.so.1+0x13810) #11 0x7fbf44bcd897 (/lib64/libgcc_s.so.1+0xf897) #12 0x7fbf44bcea5f (/lib64/libgcc_s.so.1+0x10a5f) #13 0x7fbf44bcefd8 in _Unwind_RaiseException (/lib64/libgcc_s.so.1+0x10fd8) #14 0xfe6281 in _Unwind_RaiseException ../../src/core/exception_hacks.cc:148 #15 0x7fbf457364bb in __cxa_throw (/lib64/libstdc++.so.6+0xaa4bb) #16 0x7fbf45e10a21 (/lib64/libboost_unit_test_framework.so.1.73.0+0x1aa21) #17 0x7fbf45e20fe0 in boost::execution_monitor::execute(boost::function<int ()> const&) (/lib64/libboost_unit_test_framework.so.1.73.0+0x2afe0) #18 0x7fbf45e21094 in boost::execution_monitor::vexecute(boost::function<void ()> const&) (/lib64/libboost_unit_test_framework.so.1.73.0+0x2b094) #19 0x7fbf45e43921 in boost::unit_test::unit_test_monitor_t::execute_and_translate(boost::function<void ()> const&, unsigned long) (/lib64/libboost_unit_test_framework.so.1.73.0+0x4d921) #20 0x7fbf45e5eae1 (/lib64/libboost_unit_test_framework.so.1.73.0+0x68ae1) #21 0x7fbf45e5ed31 (/lib64/libboost_unit_test_framework.so.1.73.0+0x68d31) psarna#22 0x7fbf45e2e547 in boost::unit_test::framework::run(unsigned long, bool) (/lib64/libboost_unit_test_framework.so.1.73.0+0x38547) psarna#23 0x7fbf45e43618 in boost::unit_test::unit_test_main(bool (*)(), int, char**) (/lib64/libboost_unit_test_framework.so.1.73.0+0x4d618) psarna#24 0x44798d in seastar::testing::entry_point(int, char**) ../../src/testing/entry_point.cc:77 psarna#25 0x4134b5 in main ../../include/seastar/testing/seastar_test.hh:65 psarna#26 0x7fbf44a1b1e1 in __libc_start_main (/lib64/libc.so.6+0x281e1) psarna#27 0x4133dd in _start (/home/bhalevy/dev/seastar/build/debug/tests/unit/closeable_test+0x4133dd) ``` Signed-off-by: Benny Halevy <bhalevy@scylladb.com> Message-Id: <20210406100911.12278-1-bhalevy@scylladb.com>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
No description provided.