perf(stream): optimize native pg sink #19688
Bump
Rest LGTM, thanks!
src/connector/src/sink/postgres.rs
let row_parameters: String = pk_indices
    .iter()
    .map(|j| {
        format!(
            "{} = ${}",
            schema.fields()[*j].name,
            i * number_of_pk + j + 1
        )
    })
    .collect_vec()
    .join(" AND ");
format!("({row_parameters})")
})
.collect_vec()
- .join(" AND ");
+ .join("OR");
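For readers who want to try the idea outside the connector, here is a minimal, self-contained sketch of the OR-joined primary-key predicate that the fragment above builds. The function name `build_pk_predicate_or` and its parameters are hypothetical and not part of the PR. The sketch numbers placeholders by position within the key, which is an assumption and differs from the fragment's use of the column index `j`; it also joins rows with `" OR "` including spaces.

```rust
/// Hypothetical helper, not the PR's code: builds a predicate such as
/// `(a = $1 AND b = $2) OR (a = $3 AND b = $4)` for `num_rows` rows.
/// Column names are assumed to be trusted and already quoted if needed.
/// Returns `None` when there is nothing to match, because an empty
/// predicate would not be valid SQL.
fn build_pk_predicate_or(pk_names: &[&str], num_rows: usize) -> Option<String> {
    if pk_names.is_empty() || num_rows == 0 {
        return None;
    }
    let number_of_pk = pk_names.len();
    let predicate = (0..num_rows)
        .map(|i| {
            // One parenthesized group per row; placeholders are 1-based.
            let row_parameters = pk_names
                .iter()
                .enumerate()
                .map(|(k, name)| format!("{} = ${}", name, i * number_of_pk + k + 1))
                .collect::<Vec<_>>()
                .join(" AND ");
            format!("({row_parameters})")
        })
        .collect::<Vec<_>>()
        .join(" OR ");
    Some(predicate)
}
```

Parameters would then be bound row by row, in key order, so that the n-th placeholder matches the n-th bound value.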
Maybe we can use PK IN (...) instead of PK = xxx OR PK = yyy, which is more compact and easier for the external system to recognize.
Improve later. Merge to let user try it. #19913
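The `PK IN (...)` form suggested in this thread could look roughly like the sketch below. It is an illustration under stated assumptions, not the follow-up implementation: the function name `build_pk_predicate_in` is hypothetical, and the composite-key case relies on PostgreSQL's row-value syntax, `(a, b) IN (($1, $2), ($3, $4))`.

```rust
/// Hypothetical helper, not the PR's code: builds a predicate such as
/// `(a, b) IN (($1, $2), ($3, $4))` for `num_rows` rows.
/// Column names are assumed to be trusted and already quoted if needed.
/// Returns `None` when there is nothing to match.
fn build_pk_predicate_in(pk_names: &[&str], num_rows: usize) -> Option<String> {
    if pk_names.is_empty() || num_rows == 0 {
        return None;
    }
    let number_of_pk = pk_names.len();
    // Left-hand side: the key columns as one row value.
    let columns = pk_names.join(", ");
    // Right-hand side: one tuple of 1-based placeholders per row.
    let tuples = (0..num_rows)
        .map(|i| {
            let params = (0..number_of_pk)
                .map(|k| format!("${}", i * number_of_pk + k + 1))
                .collect::<Vec<_>>()
                .join(", ");
            format!("({params})")
        })
        .collect::<Vec<_>>()
        .join(", ");
    Some(format!("({columns}) IN ({tuples})"))
}
```

Compared with the OR-joined form, the column names appear once instead of once per row, so the statement text grows more slowly with batch size. The parameter binding order stays the same.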
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Throughput for both sinks was measured via Grafana.
Throughput of the native pg sink went from 30 r/s (unoptimized) to 10K r/s (optimized).
That is around the same as the JDBC sink. In the screenshot below, the LHS is the native pg sink and the RHS is the JDBC sink.
screenshot
Implementation details:
Remove the updateInsert handling path, since compaction will normalize updateInsert to Insert.
Use LogSinker instead of SinkWriter to avoid redundant flushes to the log store.
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.