-
Notifications
You must be signed in to change notification settings - Fork 409
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize data size of Broadcast
/ Passthrough
exchange operator
#6880
Conversation
Signed-off-by: Zhigao Tong <tongzhigao@pingcap.com>
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
/run-integration-test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Others LGTM
assertBlockSchema(expected_types, block, "BroadcastOrPassThroughWriter"); | ||
} | ||
|
||
if (exchange_type == tipb::ExchangeType::Broadcast) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about using template?
template <class ExchangeWriterPtr, bool is_broadcast>
class BroadcastOrPassThroughWriter : public DAGResponseWriter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's also an optional way.
if (!expected_types.empty()) | ||
{ | ||
for (auto && block : blocks) | ||
assertBlockSchema(expected_types, block, "BroadcastOrPassThroughWriter"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about
if (!expected_types.empty()) | |
{ | |
for (auto && block : blocks) | |
assertBlockSchema(expected_types, block, "BroadcastOrPassThroughWriter"); | |
} | |
#ifndef NDEBUG | |
if (!expected_types.empty()) | |
{ | |
for (auto && block : blocks) | |
assertBlockSchema(expected_types, block, "BroadcastOrPassThroughWriter"); | |
} | |
#endif |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assertBlockSchema
is always neccesary when using compression because compression codec process will not check the expected types.
auto remote_tunnel_tracked_packet = local_tunnel_cnt == tunnel_cnt ? nullptr : ori_tracked_packet; | ||
|
||
if (compression_method != CompressionMethod::NONE) | ||
remote_tunnel_tracked_packet = MPPTunnelSetHelper::ToCompressedPacket(ori_tracked_packet, version, compression_method); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remote_tunnel_tracked_packet = MPPTunnelSetHelper::ToCompressedPacket(ori_tracked_packet, version, compression_method); | |
remote_tunnel_tracked_packet = MPPTunnelSetHelper::ToCompressedPacket(remote_tunnel_tracked_packet, version, compression_method); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remote_tunnel_tracked_packet
may be null if local_tunnel_cnt == tunnel_cnt
. But I will refine this part and make it clear.
{ | ||
GET_METRIC(tiflash_exchange_data_bytes, type_hash_none_compression_local).Increment(sz); | ||
local_cnt++; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
local_cnt++; | |
++local_cnt; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's also an optional way, but there is no different for modern compiler.
} | ||
else | ||
{ | ||
GET_METRIC(tiflash_exchange_data_bytes, type_hash_none_compression_remote).Increment(sz); | ||
remote_cnt++; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remote_cnt++; | |
++remote_cnt; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
static void broadcastOrPassThroughWriteImpl( | ||
const size_t tunnel_cnt, | ||
const size_t local_tunnel_cnt, // can be 0 for PassThrough writer | ||
size_t ori_packet_bytes, // original data packet size |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems unnecessary to pass ori_packet_bytes
.
Why not auto local_tracked_packet_bytes = local_tracked_packet? local_tracked_packet->getPacket().ByteSizeLong() : 0;
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ori_packet_bytes
is used for present uncompressed size of data packet. Especially when local_tunnel_cnt
is 0 and local_tracked_packet
is NULL. It may occur when using Passthrough writer.
void updatePartitionWriterMetrics(CompressionMethod method, size_t original_size, size_t sz, bool is_local) | ||
template <bool is_broadcast, typename FuncIsLocalTunnel, typename FuncWriteToTunnel> | ||
static void broadcastOrPassThroughWriteImpl( | ||
const size_t tunnel_cnt, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pass remote_tunnel_cnt
looks more symmetrical.
static void broadcastOrPassThroughWriteImpl(
const size_t local_tunnel_cnt,
const size_t remote_tunnel_cnt,
TrackedMppDataPacketPtr && local_tracked_packet,
TrackedMppDataPacketPtr && remote_tracked_packet,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's also an optional way.
/merge |
@solotzg: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests You only need to trigger If you have any questions about the PR merge process, please refer to pr process. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
This pull request has been accepted and is ready to merge. Commit hash: 3b778c3
|
/merge |
@solotzg: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests You only need to trigger If you have any questions about the PR merge process, please refer to pr process. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
This pull request has been accepted and is ready to merge. Commit hash: ccb115c
|
/unhold |
What problem does this PR solve?
Issue Number: ref #7084
after #6596
What is changed and how it works?
StorageDisaggregated
, use latest mpp version for task meta.Benchmark
ENV
sender.CompressionMode
for all ExchangeTypeTest Broadcast Exchange With Compression
Make broadcast exchange become bottleneck forcily
tidb_broadcast_join_threshold_count
: 10240 -> 1024000tidb_broadcast_join_threshold_size
: 104857600 -> 10485760000Check List
Tests
Side effects
Documentation
Release note