Skip to content

Commit

Permalink
Optimize exchange operator & Support mpp version (pingcap#6596)
Browse files Browse the repository at this point in the history
close pingcap#6620

Signed-off-by: Zhigao Tong
  • Loading branch information
solotzg committed Feb 1, 2023
1 parent 1e1cc12 commit dc9e4ff
Show file tree
Hide file tree
Showing 34 changed files with 823 additions and 373 deletions.
9 changes: 2 additions & 7 deletions dbms/src/Columns/ColumnsCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,8 @@ inline UInt64 ToBits64(const Int8 * bytes64)
}
#endif

#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
NO_INLINE NO_SANITIZE_ADDRESS NO_SANITIZE_THREAD
#else
ALWAYS_INLINE inline
#endif
static size_t
CountBytesInFilter(const UInt8 * filt, size_t start, size_t end)
ALWAYS_INLINE inline static size_t
CountBytesInFilter(const UInt8 * filt, size_t start, size_t end)
{
#if defined(__AVX2__)
size_t size = end - start;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Common/MPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ class MPMCQueue
public:
using Status = MPMCQueueStatus;
using Result = MPMCQueueResult;
using SteadyClock = std::chrono::steady_clock;

explicit MPMCQueue(size_t capacity_)
: capacity(capacity_)
Expand Down Expand Up @@ -220,7 +219,8 @@ class MPMCQueue
}

private:
using TimePoint = std::chrono::time_point<SteadyClock>;
using SteadyClock = std::chrono::steady_clock;
using TimePoint = SteadyClock::time_point;
using WaitingNode = MPMCQueueDetail::WaitingNode;

void notifyAll()
Expand Down
19 changes: 10 additions & 9 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,16 @@ namespace DB
F(type_mpp_establish_conn, {{"type", "mpp_tunnel"}}), \
F(type_mpp_establish_conn_local, {{"type", "mpp_tunnel_local"}}), \
F(type_cancel_mpp_task, {{"type", "cancel_mpp_task"}})) \
M(tiflash_exchange_data_bytes, "Total bytes of exchange operator", Counter, \
F(type_hash_original_all, {"type", "hash_original_all"}), \
F(type_hash_none_remote, {"type", "hash_none_remote"}), \
F(type_hash_none_local, {"type", "hash_none_local"}), \
F(type_hash_lz4, {"type", "hash_lz4"}), \
F(type_hash_zstd, {"type", "hash_zstd"}), \
F(type_broadcast_passthrough_original_all, {"type", "broadcast_passthrough_original_all"}), \
F(type_broadcast_passthrough_none_local, {"type", "broadcast_passthrough_none_local"}), \
) \
M(tiflash_exchange_data_bytes, "Total bytes sent by exchange operators", Counter, \
F(type_hash_original, {"type", "hash_original"}), /*the original data size by hash exchange*/ \
F(type_hash_none_compression_remote, {"type", "hash_none_compression_remote"}), /*the remote exchange data size by hash partition with no compression*/\
F(type_hash_none_compression_local, {"type", "hash_none_compression_local"}), /*the local exchange data size by hash partition with no compression*/ \
F(type_hash_lz4_compression, {"type", "hash_lz4_compression"}), /*the exchange data size by hash partition with lz4 compression*/ \
F(type_hash_zstd_compression, {"type", "hash_zstd_compression"}), /*the exchange data size by hash partition with zstd compression*/ \
F(type_broadcast_passthrough_original, {"type", "broadcast_passthrough_original"}), /*the original exchange data size by broadcast/passthough*/ \
F(type_broadcast_passthrough_none_compression_local, {"type", "broadcast_passthrough_none_compression_local"}), /*the local exchange data size by broadcast/passthough with no compression*/ \
F(type_broadcast_passthrough_none_compression_remote, {"type", "broadcast_passthrough_none_compression_remote"}), /*the remote exchange data size by broadcast/passthough with no compression*/ \
) \
M(tiflash_schema_version, "Current version of tiflash cached schema", Gauge) \
M(tiflash_schema_applying, "Whether the schema is applying or not (holding lock)", Gauge) \
M(tiflash_schema_apply_count, "Total number of each kinds of apply", Counter, F(type_diff, {"type", "diff"}), \
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/ReadIndexStressTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ ReadIndexStressTest::TimeCost ReadIndexStressTest::run(
const auto & kvstore = *tmt.getKVStore();
size_t timeout_ms = 10 * 1000;
const auto wrap_time_cost = [&](std::function<void()> && f) {
auto start_time = Clock::now();
auto start_time = std::chrono::steady_clock::now();
f();
auto end_time = Clock ::now();
auto end_time = std::chrono::steady_clock ::now();
auto time_cost = std::chrono::duration_cast<TimeCost>(end_time - start_time);
LOG_INFO(logger, "time cost {}", time_cost);
return time_cost;
Expand Down
Loading

0 comments on commit dc9e4ff

Please sign in to comment.