Skip to content

Commit 6137f91

Browse files
huyuanfeng2018 authored and bruceneenhl committed
[INLONG-5074][Sort] KafkaExtractNode support more StartupMode (apache#5079)
1 parent a215cd3 commit 6137f91

File tree

19 files changed

+176
-31
lines changed

19 files changed

+176
-31
lines changed

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaOffset.java

+2
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@ public enum KafkaOffset {
2828

2929
EARLIEST("earliest"),
3030
LATEST("latest"),
31+
SPECIFIC("specific"),
3132
NONE("none");
3233

34+
3335
@Getter
3436
private final String name;
3537

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSource.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,12 @@ public class KafkaSource extends StreamSource {
5858
private String byteSpeedLimit;
5959

6060
@ApiModelProperty(value = "Topic partition offset",
61-
notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
62-
private String topicPartitionOffset;
61+
notes = "For example,'partition:0,offset:42;partition:1,offset:300' "
62+
+ "indicates offset 42 for partition 0 and offset 300 for partition 1.")
63+
private String partitionOffsets;
6364

6465
@ApiModelProperty(value = "The strategy of auto offset reset",
65-
notes = "including earliest, latest (the default), none")
66+
notes = "including earliest, specific, latest (the default), none")
6667
private String autoOffsetReset;
6768

6869
@ApiModelProperty("database pattern used for filter in canal format")

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,9 @@ public class KafkaSourceDTO {
5959
private String byteSpeedLimit;
6060

6161
@ApiModelProperty(value = "Topic partition offset",
62-
notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
63-
private String topicPartitionOffset;
62+
notes = "For example,'partition:0,offset:42;partition:1,offset:300' "
63+
+ "indicates offset 42 for partition 0 and offset 300 for partition 1.")
64+
private String partitionOffsets;
6465

6566
/**
6667
* The strategy of auto offset reset.
@@ -102,7 +103,7 @@ public static KafkaSourceDTO getFromRequest(KafkaSourceRequest request) {
102103
.bootstrapServers(request.getBootstrapServers())
103104
.recordSpeedLimit(request.getRecordSpeedLimit())
104105
.byteSpeedLimit(request.getByteSpeedLimit())
105-
.topicPartitionOffset(request.getTopicPartitionOffset())
106+
.partitionOffsets(request.getPartitionOffsets())
106107
.autoOffsetReset(request.getAutoOffsetReset())
107108
.serializationType(request.getSerializationType())
108109
.databasePattern(request.getDatabasePattern())

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@ public class KafkaSourceRequest extends SourceRequest {
5454
private String byteSpeedLimit;
5555

5656
@ApiModelProperty(value = "Topic partition offset",
57-
notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
58-
private String topicPartitionOffset;
57+
notes = "For example,'partition:0,offset:42;partition:1,offset:300' "
58+
+ "indicates offset 42 for partition 0 and offset 300 for partition 1.")
59+
private String partitionOffsets;
5960

6061
@ApiModelProperty(value = "The strategy of auto offset reset",
6162
notes = "including earliest, latest (the default), none")

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ public static KafkaExtractNode createExtractNode(KafkaSource kafkaSource) {
201201
case EARLIEST:
202202
startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
203203
break;
204+
case SPECIFIC:
205+
startupMode = KafkaScanStartupMode.SPECIFIC_OFFSETS;
206+
break;
204207
case LATEST:
205208
default:
206209
startupMode = KafkaScanStartupMode.LATEST_OFFSET;
@@ -209,6 +212,7 @@ public static KafkaExtractNode createExtractNode(KafkaSource kafkaSource) {
209212
String groupId = kafkaSource.getGroupId();
210213
Map<String, String> properties = kafkaSource.getProperties().entrySet().stream()
211214
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
215+
String partitionOffset = kafkaSource.getPartitionOffsets();
212216
return new KafkaExtractNode(id,
213217
name,
214218
fieldInfos,
@@ -219,7 +223,9 @@ public static KafkaExtractNode createExtractNode(KafkaSource kafkaSource) {
219223
format,
220224
startupMode,
221225
primaryKey,
222-
groupId);
226+
groupId,
227+
partitionOffset
228+
);
223229
}
224230

225231
/**

inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public class KafkaConstant {
3535

3636
public static final String KAFKA = "kafka";
3737

38+
public static final String SCAN_STARTUP_SPECIFIC_OFFSETS = "scan.startup.specific-offsets";
39+
3840
/**
3941
* upsert-kafka
4042
*

inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/KafkaScanStartupMode.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
*/
2626
public enum KafkaScanStartupMode {
2727
EARLIEST_OFFSET("earliest-offset"),
28-
LATEST_OFFSET("latest-offset");
28+
LATEST_OFFSET("latest-offset"),
29+
SPECIFIC_OFFSETS("specific-offsets");
2930

3031
KafkaScanStartupMode(String value) {
3132
this.value = value;

inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ public class KafkaExtractNode extends ExtractNode implements Metadata, Serializa
7575
@JsonProperty("groupId")
7676
private String groupId;
7777

78+
@JsonProperty("scanSpecificOffsets")
79+
private String scanSpecificOffsets;
80+
7881
@JsonCreator
7982
public KafkaExtractNode(@JsonProperty("id") String id,
8083
@JsonProperty("name") String name,
@@ -86,14 +89,19 @@ public KafkaExtractNode(@JsonProperty("id") String id,
8689
@Nonnull @JsonProperty("format") Format format,
8790
@JsonProperty("scanStartupMode") KafkaScanStartupMode kafkaScanStartupMode,
8891
@JsonProperty("primaryKey") String primaryKey,
89-
@JsonProperty("groupId") String groupId) {
92+
@JsonProperty("groupId") String groupId,
93+
@JsonProperty("scanSpecificOffsets") String scanSpecificOffsets) {
9094
super(id, name, fields, watermarkField, properties);
9195
this.topic = Preconditions.checkNotNull(topic, "kafka topic is empty");
9296
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "kafka bootstrapServers is empty");
9397
this.format = Preconditions.checkNotNull(format, "kafka format is empty");
9498
this.kafkaScanStartupMode = Preconditions.checkNotNull(kafkaScanStartupMode, "kafka scanStartupMode is empty");
9599
this.primaryKey = primaryKey;
96100
this.groupId = groupId;
101+
if (kafkaScanStartupMode == KafkaScanStartupMode.SPECIFIC_OFFSETS) {
102+
Preconditions.checkArgument(StringUtils.isNotEmpty(scanSpecificOffsets), "scanSpecificOffsets is empty");
103+
this.scanSpecificOffsets = scanSpecificOffsets;
104+
}
97105
}
98106

99107
/**
@@ -110,6 +118,7 @@ public Map<String, String> tableOptions() {
110118
if (StringUtils.isEmpty(this.primaryKey)) {
111119
options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
112120
options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
121+
options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets);
113122
options.putAll(format.generateOptions(false));
114123
} else {
115124
options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA);
@@ -118,6 +127,7 @@ public Map<String, String> tableOptions() {
118127
} else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat) {
119128
options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
120129
options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
130+
options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets);
121131
options.putAll(format.generateOptions(false));
122132
} else {
123133
throw new IllegalArgumentException("kafka extract node format is IllegalArgument");

inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public KafkaExtractNode getTestObject() {
4343
new FieldInfo("name", new StringFormatInfo()),
4444
new FieldInfo("age", new IntFormatInfo()));
4545
return new KafkaExtractNode("1", "kafka_input", fields, null, null, "workerCsv",
46-
"localhost:9092", new CsvFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
46+
"localhost:9092", new CsvFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId", null);
4747
}
4848

4949
@Test

inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ private KafkaExtractNode buildKafkaExtractNode() {
6363
return new KafkaExtractNode("1", "kafka_input", fields, null,
6464
null, "topic_input", "localhost:9092",
6565
new CanalJsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
66-
null, "group_1");
66+
null, "group_1", null);
6767
}
6868

6969
/**

inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DecimalFormatSqlParseTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ private KafkaExtractNode buildKafkaExtractNode() {
5454
return new KafkaExtractNode("1", "kafka_input", fields, null,
5555
null, "kafka_input", "localhost:9092",
5656
new CanalJsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
57-
null, "groupId_1");
57+
null, "groupId_1", null);
5858
}
5959

6060
private KafkaLoadNode buildKafkaLoadNode() {

inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ private KafkaExtractNode buildKafkaExtractNode() {
6969
return new KafkaExtractNode("1", "kafka_input", fields, null,
7070
null, "topic_input", "localhost:9092",
7171
new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
72-
null, "groupId");
72+
null, "groupId", null);
7373
}
7474

7575
private KafkaExtractNode buildKafkaExtractNode2() {
@@ -85,7 +85,7 @@ private KafkaExtractNode buildKafkaExtractNode2() {
8585
return new KafkaExtractNode("1", "kafka_input", fields, wk,
8686
null, "topic_input", "localhost:9092",
8787
new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
88-
null, "groupId");
88+
null, "groupId", null);
8989
}
9090

9191
private KafkaExtractNode buildKafkaExtractNode3() {
@@ -96,7 +96,7 @@ private KafkaExtractNode buildKafkaExtractNode3() {
9696
new FieldInfo("ts", new TimestampFormatInfo()));
9797
return new KafkaExtractNode("1", "kafka_input", fields, null,
9898
null, "topic_input", "localhost:9092",
99-
new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
99+
new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId", null);
100100
}
101101

102102
private KafkaLoadNode buildKafkaLoadNode() {

inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ private KafkaExtractNode buildKafkaExtractNode(String id) {
8787
new StringConstantParam("5"),
8888
new TimeUnitConstantParam(TimeUnit.SECOND));
8989
return new KafkaExtractNode(id, "kafka_input", fields, wk, null, "workerJson",
90-
"localhost:9092", new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
90+
"localhost:9092", new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId", null);
9191
}
9292

9393
private KafkaLoadNode buildKafkaNode(String id) {

inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ private KafkaExtractNode buildKafkaExtractNode() {
7676
return new KafkaExtractNode("1", "kafka_input_1", fields, null,
7777
null, "topic_input_1", "localhost:9092",
7878
new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
79-
null, "groupId");
79+
null, "groupId", null);
8080
}
8181

8282
/**
@@ -90,7 +90,7 @@ private KafkaExtractNode buildKafkaExtractNode2() {
9090
return new KafkaExtractNode("2", "kafka_input_2", fields, null,
9191
null, "topic_input_2", "localhost:9092",
9292
new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
93-
null, "groupId");
93+
null, "groupId", null);
9494
}
9595

9696
/**
@@ -104,7 +104,7 @@ private KafkaExtractNode buildKafkaExtractNode3() {
104104
new FieldInfo("ts", new TimestampFormatInfo()));
105105
return new KafkaExtractNode("3", "kafka_input_3", fields, null,
106106
null, "topic_input_3", "localhost:9092",
107-
new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
107+
new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId", null);
108108
}
109109

110110
/**

inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationSqlParseTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ private KafkaExtractNode buildKafkaExtractNode() {
7676
return new KafkaExtractNode("1", "kafka_input_1", fields, null,
7777
null, "topic_input_1", "localhost:9092",
7878
new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
79-
null, "groupId");
79+
null, "groupId", null);
8080
}
8181

8282
/**
@@ -90,7 +90,7 @@ private KafkaExtractNode buildKafkaExtractNode2() {
9090
return new KafkaExtractNode("2", "kafka_input_2", fields, null,
9191
null, "topic_input_2", "localhost:9092",
9292
new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
93-
null, "groupId");
93+
null, "groupId", null);
9494
}
9595

9696
/**
@@ -104,7 +104,7 @@ private KafkaExtractNode buildKafkaExtractNode3() {
104104
new FieldInfo("ts", new TimestampFormatInfo()));
105105
return new KafkaExtractNode("3", "kafka_input_3", fields, null,
106106
null, "topic_input_3", "localhost:9092",
107-
new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
107+
new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId", null);
108108
}
109109

110110
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.inlong.sort.parser;
20+
21+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
22+
import org.apache.flink.table.api.EnvironmentSettings;
23+
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
24+
import org.apache.flink.test.util.AbstractTestBase;
25+
import org.apache.inlong.sort.formats.common.IntFormatInfo;
26+
import org.apache.inlong.sort.formats.common.LongFormatInfo;
27+
import org.apache.inlong.sort.formats.common.StringFormatInfo;
28+
import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
29+
import org.apache.inlong.sort.parser.result.ParseResult;
30+
import org.apache.inlong.sort.protocol.FieldInfo;
31+
import org.apache.inlong.sort.protocol.GroupInfo;
32+
import org.apache.inlong.sort.protocol.StreamInfo;
33+
import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
34+
import org.apache.inlong.sort.protocol.node.Node;
35+
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
36+
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
37+
import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
38+
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
39+
import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
40+
import org.junit.Assert;
41+
import org.junit.Test;
42+
43+
import java.util.Arrays;
44+
import java.util.Collections;
45+
import java.util.List;
46+
import java.util.stream.Collectors;
47+
48+
/**
49+
* Test for kafka sql parse
50+
*/
51+
public class KafkaSqlParseTest extends AbstractTestBase {
52+
53+
/**
54+
* Test flink sql task for extract is kafka {@link KafkaExtractNode} and load is mysql {@link MySqlLoadNode}
55+
*
56+
* @throws Exception The exception may be thrown when executing
57+
*/
58+
@Test
59+
public void testKafkaSourceSqlParse() throws Exception {
60+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
61+
env.setParallelism(1);
62+
env.enableCheckpointing(10000);
63+
env.disableOperatorChaining();
64+
EnvironmentSettings settings = EnvironmentSettings
65+
.newInstance()
66+
.useBlinkPlanner()
67+
.inStreamingMode()
68+
.build();
69+
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
70+
Node inputNode = buildKafkaExtractSpecificOffset();
71+
Node outputNode = buildMysqlLoadNode();
72+
StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
73+
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
74+
Collections.singletonList(outputNode))));
75+
GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
76+
FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
77+
ParseResult result = parser.parse();
78+
Assert.assertTrue(result.tryExecute());
79+
}
80+
81+
private KafkaExtractNode buildKafkaExtractSpecificOffset() {
82+
List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
83+
new FieldInfo("name", new StringFormatInfo()),
84+
new FieldInfo("age", new IntFormatInfo()));
85+
return new KafkaExtractNode("1", "kafka_input", fields, null,
86+
null, "topic_input", "localhost:9092",
87+
new JsonFormat(), KafkaScanStartupMode.SPECIFIC_OFFSETS, null, "groupId",
88+
"partition:0,offset:42;partition:1,offset:300");
89+
}
90+
91+
private Node buildMysqlLoadNode() {
92+
List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
93+
new FieldInfo("name", new StringFormatInfo()),
94+
new FieldInfo("age", new IntFormatInfo())
95+
);
96+
List<FieldRelation> relations = Arrays
97+
.asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
98+
new FieldInfo("id", new LongFormatInfo())),
99+
new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
100+
new FieldInfo("name", new StringFormatInfo())),
101+
new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
102+
new FieldInfo("age", new IntFormatInfo()))
103+
);
104+
return new MySqlLoadNode("2", "mysql_output", fields, relations, null,
105+
null, null, null, "jdbc:mysql://localhost:3306/inlong",
106+
"inlong", "inlong", "table_output", "id");
107+
}
108+
109+
/**
110+
* build node relation
111+
*
112+
* @param inputs extract node
113+
* @param outputs load node
114+
* @return node relation
115+
*/
116+
private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
117+
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
118+
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
119+
return new NodeRelation(inputIds, outputIds);
120+
}
121+
}

0 commit comments

Comments (0)