Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(connector-node): support stream chunk payload in connector node #8548

Merged
merged 35 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
39978e1
feat(java-binding): support java binding on stream chunk
wenym1 Mar 13, 2023
6486af4
rename the previous general iterator and add demo and ci for data chu…
wenym1 Mar 14, 2023
ddbcfd9
fix license
wenym1 Mar 14, 2023
c4a6e9d
fix license again
wenym1 Mar 14, 2023
fc324f9
add java rpc code
yufansong Mar 14, 2023
423b64f
add sink observer case: streamchunk payload
yufansong Mar 14, 2023
2308d72
move demo, anmotation remote.rs and obeserver.java
yufansong Mar 15, 2023
5cfd57b
rename iterator to HummockJavaBindingIterator
wenym1 Mar 15, 2023
faa566e
Merge branch 'main' into yiming/stream-chunk-java-binding
wenym1 Mar 15, 2023
3c2489d
Merge branch 'yiming/stream-chunk-java-binding' into yufan/stream-chunk
wenym1 Mar 15, 2023
d169f13
feat(connector-node): specify sink payload format in start sink
wenym1 Mar 16, 2023
dd90d72
Merge branch 'yiming/set-sink-payload-format' into yufan/stream-chunk
wenym1 Mar 16, 2023
4c78b31
fix ci
wenym1 Mar 16, 2023
a0c2260
Merge branch 'yiming/set-sink-payload-format' into yufan/stream-chunk
wenym1 Mar 16, 2023
2d063a8
impl Closeable for iterator and sink row
wenym1 Mar 16, 2023
95e53e3
Merge branch 'yiming/set-sink-payload-format' into yufan/stream-chunk
wenym1 Mar 16, 2023
79f829e
update typescript
wenym1 Mar 16, 2023
fc8d6b1
add license
wenym1 Mar 16, 2023
7dc1a84
Merge branch 'yiming/set-sink-payload-format' into yufan/stream-chunk
wenym1 Mar 16, 2023
d0e8c97
complete rpc with stream chunk
wenym1 Mar 16, 2023
481335e
add log and initialize stream chunk iter
wenym1 Mar 16, 2023
e0c636c
Merge branch 'main' into yiming/set-sink-payload-format
wenym1 Mar 16, 2023
ffb5313
Merge branch 'yiming/set-sink-payload-format' into yufan/stream-chunk
wenym1 Mar 16, 2023
ad9bdc0
implement closure for column value getter
wenym1 Mar 16, 2023
eee1f0a
Merge branch 'main' into yiming/set-sink-payload-format
wenym1 Mar 17, 2023
a8f4915
Merge branch 'yiming/set-sink-payload-format' into yufan/stream-chunk
wenym1 Mar 17, 2023
f0db319
Merge branch 'main' into yiming/set-sink-payload-format
wenym1 Mar 17, 2023
51888ea
Merge branch 'yiming/set-sink-payload-format' into yufan/stream-chunk
wenym1 Mar 17, 2023
ae6beb1
update proto
wenym1 Mar 17, 2023
19f2ba5
enable stream payload in e2e ci
wenym1 Mar 17, 2023
ae2b845
fix(conector-node): do not store sink row inside upsert iceberg sink
wenym1 Mar 17, 2023
c726631
Merge branch 'yiming/fix-iceberg-sink-row-leak' into yufan/stream-chunk
wenym1 Mar 17, 2023
e6ace50
Merge branch 'main' into yiming/fix-iceberg-sink-row-leak
wenym1 Mar 17, 2023
19bc205
Merge branch 'yiming/fix-iceberg-sink-row-leak' into yufan/stream-chunk
wenym1 Mar 17, 2023
7a6509d
update proto
wenym1 Mar 17, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ci/scripts/e2e-iceberg-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ echo "--- Download artifacts"
mkdir -p target/debug
buildkite-agent artifact download risingwave-"$profile" target/debug/
buildkite-agent artifact download risedev-dev-"$profile" target/debug/
buildkite-agent artifact download librisingwave_java_binding.so-"$profile" target/debug
mv target/debug/risingwave-"$profile" target/debug/risingwave
mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev
mv target/debug/librisingwave_java_binding.so-"$profile" target/debug/librisingwave_java_binding.so

export RW_JAVA_BINDING_LIB_PATH=${PWD}/target/debug
export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=stream_chunk

echo "--- Download connector node package"
buildkite-agent artifact download risingwave-connector.tar.gz ./
Expand Down
5 changes: 5 additions & 0 deletions ci/scripts/e2e-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ echo "--- Download artifacts"
mkdir -p target/debug
buildkite-agent artifact download risingwave-"$profile" target/debug/
buildkite-agent artifact download risedev-dev-"$profile" target/debug/
buildkite-agent artifact download librisingwave_java_binding.so-"$profile" target/debug
mv target/debug/risingwave-"$profile" target/debug/risingwave
mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev
mv target/debug/librisingwave_java_binding.so-"$profile" target/debug/librisingwave_java_binding.so

export RW_JAVA_BINDING_LIB_PATH=${PWD}/target/debug
export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=stream_chunk

echo "--- Download connector node package"
buildkite-agent artifact download risingwave-connector.tar.gz ./
Expand Down
4 changes: 4 additions & 0 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ echo "--- Download artifacts"
mkdir -p target/debug
buildkite-agent artifact download risingwave-"$profile" target/debug/
buildkite-agent artifact download risedev-dev-"$profile" target/debug/
buildkite-agent artifact download librisingwave_java_binding.so-"$profile" target/debug
mv target/debug/risingwave-"$profile" target/debug/risingwave
mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev
mv target/debug/librisingwave_java_binding.so-"$profile" target/debug/librisingwave_java_binding.so

export RW_JAVA_BINDING_LIB_PATH=${PWD}/target/debug


echo "--- Download connector node package"
Expand Down
104 changes: 103 additions & 1 deletion dashboard/proto/gen/connector_service.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ RUN rustup self update \

RUN cargo fetch

RUN cargo build -p risingwave_cmd_all --release --features "static-link static-log-level" && \
RUN cargo build -p risingwave_cmd_all -p risingwave_java_binding --release --features "static-link static-log-level" && \
mkdir -p /risingwave/bin && mv /risingwave/target/release/risingwave /risingwave/bin/ && \
mkdir -p /risingwave/lib && mv /risingwave/target/release/librisingwave_java_binding.so /risingwave/lib && \
cargo clean

RUN cd /risingwave/java && mvn -B package -Dmaven.test.skip=true && \
Expand All @@ -47,10 +48,13 @@ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certi

FROM image-base as risingwave
LABEL org.opencontainers.image.source https://github.com/risingwavelabs/risingwave
RUN mkdir -p /risingwave/bin/connector-node
RUN mkdir -p /risingwave/bin/connector-node && mkdir -p /risingwave/lib
COPY --from=builder /risingwave/bin/risingwave /risingwave/bin/risingwave
COPY --from=builder /risingwave/bin/connector-node /risingwave/bin/connector-node
COPY --from=builder /risingwave/ui /risingwave/ui
COPY --from=builder /risingwave/lib/librisingwave_java_binding.so /risingwave/lib/librisingwave_java_binding.so
# Set java.library.path env to /risingwave/lib
ENV RW_JAVA_BINDING_LIB_PATH /risingwave/lib
# Set default playground mode to docker-playground profile
ENV PLAYGROUND_PROFILE docker-playground
# Set default dashboard UI to local path instead of github proxy
Expand Down
2 changes: 1 addition & 1 deletion java/connector-node/assembly/scripts/start-service.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ if [ -z "${port}" ]; then
port=$PORT
fi

java -classpath "${DIR}/libs/*" $MAIN --port ${port}
java -classpath "${DIR}/libs/*" -Djava.library.path="${RW_JAVA_BINDING_LIB_PATH}" $MAIN --port ${port}
4 changes: 4 additions & 0 deletions java/connector-node/risingwave-connector-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
<groupId>com.risingwave.java</groupId>
<artifactId>proto</artifactId>
</dependency>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>java-binding</artifactId>
</dependency>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>connector-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.*;
import com.risingwave.connector.deserializer.StreamChunkDeserializer;
import com.risingwave.metrics.ConnectorNodeMetrics;
import com.risingwave.metrics.MonitoredRowIterator;
import com.risingwave.proto.ConnectorServiceProto;
Expand Down Expand Up @@ -202,6 +203,9 @@ private void bindSink(SinkConfig sinkConfig, ConnectorServiceProto.SinkPayloadFo
case JSON:
deserializer = new JsonDeserializer(tableSchema);
break;
case STREAM_CHUNK:
deserializer = new StreamChunkDeserializer(tableSchema);
break;
}
ConnectorNodeMetrics.incActiveConnections(sinkConfig.getConnectorType(), "node1");
}
Expand Down
Loading