Skip to content

Commit 45d3da5

Browse files
baomingyuvernedeng
authored andcommitted
[INLONG-4332][Manager] Support ClickHouse load node (apache#4333)
1 parent 16bd344 commit 45d3da5

File tree

4 files changed

+34
-13
lines changed

4 files changed

+34
-13
lines changed

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java

+30-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.inlong.manager.common.enums.SinkType;
2525
import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
2626
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
27+
import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkResponse;
2728
import org.apache.inlong.manager.common.pojo.sink.hbase.HbaseSinkResponse;
2829
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
2930
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
@@ -36,6 +37,7 @@
3637
import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
3738
import org.apache.inlong.sort.protocol.node.format.Format;
3839
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
40+
import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
3941
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
4042
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
4143
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
@@ -76,6 +78,8 @@ public static LoadNode createLoadNode(SinkResponse sinkResponse) {
7678
return createLoadNode((HbaseSinkResponse) sinkResponse);
7779
case POSTGRES:
7880
return createLoadNode((PostgresSinkResponse) sinkResponse);
81+
case CLICKHOUSE:
82+
return createLoadNode((ClickHouseSinkResponse) sinkResponse);
7983
default:
8084
throw new IllegalArgumentException(
8185
String.format("Unsupported sinkType=%s to create loadNode", sinkType));
@@ -227,6 +231,7 @@ public static HbaseLoadNode createLoadNode(HbaseSinkResponse hbaseSinkResponse)
227231

228232
/**
229233
* create postgres load node
234+
*
230235
* @param postgresSinkResponse postgresSinkResponse
231236
* @return postgres load node
232237
*/
@@ -247,8 +252,31 @@ public static PostgresLoadNode createLoadNode(PostgresSinkResponse postgresSinkR
247252
postgresSinkResponse.getDbName() + "." + postgresSinkResponse.getTableName(),
248253
postgresSinkResponse.getPrimaryKey());
249254
}
250-
251-
/**f
255+
256+
/**
257+
* create clickHouse load node
258+
*
259+
* @param clickHouseSinkResponse clickHouseSinkResponse
260+
* @return
261+
*/
262+
public static ClickHouseLoadNode createLoadNode(ClickHouseSinkResponse clickHouseSinkResponse) {
263+
List<SinkFieldResponse> sinkFieldResponses = clickHouseSinkResponse.getFieldList();
264+
String name = clickHouseSinkResponse.getSinkName();
265+
List<FieldInfo> fields = sinkFieldResponses.stream()
266+
.map(sinkFieldResponse -> FieldInfoUtils.parseSinkFieldInfo(sinkFieldResponse,
267+
name))
268+
.collect(Collectors.toList());
269+
List<FieldRelationShip> fieldRelationShips = parseSinkFields(sinkFieldResponses, name);
270+
return new ClickHouseLoadNode(clickHouseSinkResponse.getSinkName(),
271+
clickHouseSinkResponse.getSinkName(),
272+
fields, fieldRelationShips, null, null, 1,
273+
null, clickHouseSinkResponse.getTableName(),
274+
clickHouseSinkResponse.getJdbcUrl(),
275+
clickHouseSinkResponse.getUsername(),
276+
clickHouseSinkResponse.getPassword());
277+
}
278+
279+
/**
252280
* Parse information field of data sink.
253281
*/
254282
public static List<FieldRelationShip> parseSinkFields(List<SinkFieldResponse> sinkFieldResponses, String sinkName) {

inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNode.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,6 @@ public class ClickHouseLoadNode extends LoadNode implements Serializable {
6161
@Nonnull
6262
private String password;
6363

64-
@JsonProperty("parFields")
65-
List<FieldInfo> partitionFields;
66-
6764
@JsonCreator
6865
public ClickHouseLoadNode(@JsonProperty("id") String id,
6966
@JsonProperty("name") String name,
@@ -76,14 +73,12 @@ public ClickHouseLoadNode(@JsonProperty("id") String id,
7673
@Nonnull @JsonProperty("tableName") String tableName,
7774
@Nonnull @JsonProperty("url") String url,
7875
@Nonnull @JsonProperty("userName") String userName,
79-
@Nonnull @JsonProperty("passWord") String password,
80-
@JsonProperty("parFields") List<FieldInfo> partitionFields) {
76+
@Nonnull @JsonProperty("passWord") String password) {
8177
super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
8278
this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
8379
this.url = Preconditions.checkNotNull(url, "url is null");
8480
this.userName = Preconditions.checkNotNull(userName, "userName is null");
8581
this.password = Preconditions.checkNotNull(password, "password is null");
86-
this.partitionFields = partitionFields;
8782
}
8883

8984
@Override
@@ -110,7 +105,7 @@ public String getPrimaryKey() {
110105

111106
@Override
112107
public List<FieldInfo> getPartitionFields() {
113-
return partitionFields;
108+
return super.getPartitionFields();
114109
}
115110

116111
}

inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNodeTest.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ public Node getTestObject() {
4444
"ck_demo",
4545
"jdbc:clickhouse://localhost:8023/default",
4646
"root",
47-
"root",
48-
null);
47+
"root");
4948
}
5049
}

inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,7 @@ private ClickHouseLoadNode buildClickHouseLoadNode(String id) {
7979
"ck_demo",
8080
"jdbc:clickhouse://localhost:8123/demo",
8181
"default",
82-
"",
83-
null);
82+
"");
8483

8584
}
8685

0 commit comments

Comments
 (0)