-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prezent na święta: Batching #14
Conversation
This change allows kafka_connection to be used in non-sequential way, allowing to "queue" multiple messages without waiting for the previous to have ended. Semaphores with count=1 are used to implement queuing of sends and receives, taking advantage of semaphore FIFO functionality. It is crucial to preserve the ordering, so that receive reads the correct response.
The change allows use of connection_manager in non-sequential manner.
dd172aa
to
ada9c5e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good, I left some questions
src/kafka/producer/sender.cc
Outdated
sender::connection_id sender::broker_for_topic_partition(const std::string& topic, int32_t partition_index) { | ||
for (const auto& current_topic : *_metadata_manager->get_metadata()._topics) { | ||
if (*current_topic._name != topic) { | ||
continue; | ||
} | ||
for (const auto& current_partition : *current_topic._partitions) { | ||
if (*current_partition._partition_index == partition_index) { | ||
return broker_for_id(*current_partition._leader_id); | ||
} | ||
} | ||
} | ||
return {}; | ||
} | ||
|
||
bool sender::is_broker_for_topic_partition_known(const std::string &topic, int32_t partition_index) { | ||
for (const auto& current_topic : *_metadata_manager->get_metadata()._topics) { | ||
if (*current_topic._name != topic) { | ||
continue; | ||
} | ||
if (current_topic._error_code != error::kafka_error_code::NONE) { | ||
continue; | ||
} | ||
for (const auto& current_partition : *current_topic._partitions) { | ||
if (*current_partition._partition_index == partition_index | ||
&& current_partition._error_code == error::kafka_error_code::NONE) { | ||
return true; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code for these two methods is almost the same. Isn't there a way to merge them into one? Especially given, that the only difference is the one if
statement with kafka_error_code
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should probably be moved to metadata_response
and also optimized (sort topics, partitions and search for them in log N
not linearly).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not requiring updating the time complexity as of now (although leave a TODO in the code), but I'd say it's better to have this code better organized right now than submit nearly duplicated methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some minor tweaks requested
ada9c5e
to
84ddbd9
Compare
Implement batching of messages, initial error handling (via retries). A manual flush is currently needed to send the messages in producer (no automatic flushing).
84ddbd9
to
1f923ef
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
N O I C E
…o_with Fixes failures in debug mode: ``` $ build/debug/tests/unit/closeable_test -l all -t deferred_close_test WARNING: debug mode. Not for benchmarking or production random-seed=3064133628 Running 1 test case... Entering test module "../../tests/unit/closeable_test.cc" ../../tests/unit/closeable_test.cc(0): Entering test case "deferred_close_test" ../../src/testing/seastar_test.cc(43): info: check true has passed ==9449==WARNING: ASan doesn't fully support makecontext/swapcontext functions and may produce false positives in some cases! terminate called after throwing an instance of 'seastar::broken_promise' what(): broken promise ==9449==WARNING: ASan is ignoring requested __asan_handle_no_return: stack top: 0x7fbf1f49f000; bottom 0x7fbf40971000; size: 0xffffffffdeb2e000 (-558702592) False positive error reports may follow For details see google/sanitizers#189 ================================================================= ==9449==AddressSanitizer CHECK failed: ../../../../libsanitizer/asan/asan_thread.cpp:356 "((ptr[0] == kCurrentStackFrameMagic)) != (0)" (0x0, 0x0) #0 0x7fbf45f39d0b (/lib64/libasan.so.6+0xb3d0b) #1 0x7fbf45f57d4e (/lib64/libasan.so.6+0xd1d4e) #2 0x7fbf45f3e724 (/lib64/libasan.so.6+0xb8724) #3 0x7fbf45eb3e5b (/lib64/libasan.so.6+0x2de5b) #4 0x7fbf45eb51e8 (/lib64/libasan.so.6+0x2f1e8) #5 0x7fbf45eb7694 (/lib64/libasan.so.6+0x31694) #6 0x7fbf45f39398 (/lib64/libasan.so.6+0xb3398) #7 0x7fbf45f3a00b in __asan_report_load8 (/lib64/libasan.so.6+0xb400b) #8 0xfe6d52 in bool __gnu_cxx::operator!=<dl_phdr_info*, std::vector<dl_phdr_info, std::allocator<dl_phdr_info> > >(__gnu_cxx::__normal_iterator<dl_phdr_info*, std::vector<dl_phdr_info, std::allocator<dl_phdr_info> > > const&, __gnu_cxx::__normal_iterator<dl_phdr_info*, std::vector<dl_phdr_info, std::allocator<dl_phdr_info> > > const&) /usr/include/c++/10/bits/stl_iterator.h:1116 #9 0xfe615c in dl_iterate_phdr ../../src/core/exception_hacks.cc:121 #10 0x7fbf44bd1810 in _Unwind_Find_FDE (/lib64/libgcc_s.so.1+0x13810) #11 0x7fbf44bcd897 (/lib64/libgcc_s.so.1+0xf897) #12 0x7fbf44bcea5f (/lib64/libgcc_s.so.1+0x10a5f) #13 0x7fbf44bcefd8 in _Unwind_RaiseException (/lib64/libgcc_s.so.1+0x10fd8) #14 0xfe6281 in _Unwind_RaiseException ../../src/core/exception_hacks.cc:148 #15 0x7fbf457364bb in __cxa_throw (/lib64/libstdc++.so.6+0xaa4bb) #16 0x7fbf45e10a21 (/lib64/libboost_unit_test_framework.so.1.73.0+0x1aa21) #17 0x7fbf45e20fe0 in boost::execution_monitor::execute(boost::function<int ()> const&) (/lib64/libboost_unit_test_framework.so.1.73.0+0x2afe0) #18 0x7fbf45e21094 in boost::execution_monitor::vexecute(boost::function<void ()> const&) (/lib64/libboost_unit_test_framework.so.1.73.0+0x2b094) #19 0x7fbf45e43921 in boost::unit_test::unit_test_monitor_t::execute_and_translate(boost::function<void ()> const&, unsigned long) (/lib64/libboost_unit_test_framework.so.1.73.0+0x4d921) #20 0x7fbf45e5eae1 (/lib64/libboost_unit_test_framework.so.1.73.0+0x68ae1) #21 0x7fbf45e5ed31 (/lib64/libboost_unit_test_framework.so.1.73.0+0x68d31) psarna#22 0x7fbf45e2e547 in boost::unit_test::framework::run(unsigned long, bool) (/lib64/libboost_unit_test_framework.so.1.73.0+0x38547) psarna#23 0x7fbf45e43618 in boost::unit_test::unit_test_main(bool (*)(), int, char**) (/lib64/libboost_unit_test_framework.so.1.73.0+0x4d618) psarna#24 0x44798d in seastar::testing::entry_point(int, char**) ../../src/testing/entry_point.cc:77 psarna#25 0x4134b5 in main ../../include/seastar/testing/seastar_test.hh:65 psarna#26 0x7fbf44a1b1e1 in __libc_start_main (/lib64/libc.so.6+0x281e1) psarna#27 0x4133dd in _start (/home/bhalevy/dev/seastar/build/debug/tests/unit/closeable_test+0x4133dd) ``` Signed-off-by: Benny Halevy <bhalevy@scylladb.com> Message-Id: <20210406100911.12278-1-bhalevy@scylladb.com>
kafka_connection
andconnection_manager
to allow parallel requests.