Skip to content

Commit 236e5d9

Browse files
authored
[INLONG-4085][Manager] Change the inlong_group and inlong_stream table structure (#4214)
1 parent 0553b32 commit 236e5d9

File tree

72 files changed

+863
-1311
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+863
-1311
lines changed

inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@
2828
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
2929
import org.apache.inlong.manager.client.api.InlongStreamConf;
3030
import org.apache.inlong.manager.client.api.PulsarBaseConf;
31-
import org.apache.inlong.manager.common.pojo.stream.SinkField;
32-
import org.apache.inlong.manager.common.pojo.stream.StreamField;
3331
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
3432
import org.apache.inlong.manager.client.api.sink.HiveSink;
3533
import org.apache.inlong.manager.client.api.source.AutoPushSource;
3634
import org.apache.inlong.manager.common.enums.FieldType;
3735
import org.apache.inlong.manager.common.enums.FileFormat;
36+
import org.apache.inlong.manager.common.pojo.stream.SinkField;
37+
import org.apache.inlong.manager.common.pojo.stream.StreamField;
3838
import org.apache.shiro.util.Assert;
3939
import org.junit.Test;
4040

@@ -118,8 +118,8 @@ private InlongGroupConf createGroupConf() {
118118
InlongGroupConf inlongGroupConf = new InlongGroupConf();
119119
inlongGroupConf.setGroupName(GROUP_NAME);
120120
inlongGroupConf.setDescription(GROUP_NAME);
121-
inlongGroupConf.setProxyClusterId(1);
122-
//pulsar conf
121+
122+
// pulsar conf
123123
PulsarBaseConf pulsarBaseConf = new PulsarBaseConf();
124124
inlongGroupConf.setMqBaseConf(pulsarBaseConf);
125125
pulsarBaseConf.setPulsarServiceUrl(PULSAR_SERVICE_URL);
@@ -128,14 +128,19 @@ private InlongGroupConf createGroupConf() {
128128
pulsarBaseConf.setEnableCreateResource(false);
129129
pulsarBaseConf.setTenant(tenant);
130130

131-
//flink conf
131+
// flink conf
132132
FlinkSortBaseConf sortBaseConf = new FlinkSortBaseConf();
133133
inlongGroupConf.setSortBaseConf(sortBaseConf);
134134
sortBaseConf.setServiceUrl(FLINK_URL);
135135
Map<String, String> map = new HashMap<>(16);
136136
sortBaseConf.setProperties(map);
137-
//enable zk
138-
inlongGroupConf.setZookeeperEnabled(false);
137+
138+
// set enable zk, create resource, lightweight mode, and cluster tag
139+
inlongGroupConf.setEnableZookeeper(0);
140+
inlongGroupConf.setEnableCreateResource(1);
141+
inlongGroupConf.setLightweight(1);
142+
inlongGroupConf.setInlongClusterTag("default_cluster");
143+
139144
inlongGroupConf.setDailyRecords(10000000L);
140145
inlongGroupConf.setPeakRecords(100000L);
141146
inlongGroupConf.setMaxLength(10000);

inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Binlog2KafkaExample.java

+14-9
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import lombok.extern.slf4j.Slf4j;
2121
import org.apache.inlong.manager.client.api.ClientConfiguration;
22-
import org.apache.inlong.manager.common.enums.DataFormat;
2322
import org.apache.inlong.manager.client.api.DataSeparator;
2423
import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
2524
import org.apache.inlong.manager.client.api.InlongClient;
@@ -32,12 +31,13 @@
3231
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
3332
import org.apache.inlong.manager.client.api.sink.KafkaSink;
3433
import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
34+
import org.apache.inlong.manager.common.enums.DataFormat;
3535
import org.apache.inlong.manager.common.enums.MQType;
3636
import org.apache.shiro.util.Assert;
3737
import org.junit.Test;
3838

3939
import java.nio.charset.StandardCharsets;
40-
import java.util.Arrays;
40+
import java.util.Collections;
4141
import java.util.HashMap;
4242
import java.util.Map;
4343
import java.util.concurrent.TimeUnit;
@@ -153,8 +153,7 @@ private InlongGroupConf createGroupConf() {
153153
InlongGroupConf inlongGroupConf = new InlongGroupConf();
154154
inlongGroupConf.setGroupName(GROUP_NAME);
155155
inlongGroupConf.setDescription(GROUP_NAME);
156-
inlongGroupConf.setProxyClusterId(1);
157-
//pulsar conf
156+
// pulsar conf
158157
PulsarBaseConf pulsarBaseConf = new PulsarBaseConf();
159158
pulsarBaseConf.setType(MQType.PULSAR);
160159
inlongGroupConf.setMqBaseConf(pulsarBaseConf);
@@ -163,14 +162,20 @@ private InlongGroupConf createGroupConf() {
163162
pulsarBaseConf.setNamespace("public");
164163
pulsarBaseConf.setEnableCreateResource(false);
165164
pulsarBaseConf.setTenant(tenant);
166-
//flink conf
165+
166+
// flink conf
167167
FlinkSortBaseConf sortBaseConf = new FlinkSortBaseConf();
168168
inlongGroupConf.setSortBaseConf(sortBaseConf);
169169
sortBaseConf.setServiceUrl(FLINK_URL);
170170
Map<String, String> map = new HashMap<>(16);
171171
sortBaseConf.setProperties(map);
172-
//enable zk
173-
inlongGroupConf.setZookeeperEnabled(false);
172+
173+
// set enable zk, create resource, lightweight mode, and cluster tag
174+
inlongGroupConf.setEnableZookeeper(0);
175+
inlongGroupConf.setEnableCreateResource(1);
176+
inlongGroupConf.setLightweight(1);
177+
inlongGroupConf.setInlongClusterTag("default_cluster");
178+
174179
inlongGroupConf.setDailyRecords(10000000L);
175180
inlongGroupConf.setPeakRecords(100000L);
176181
inlongGroupConf.setMaxLength(10000);
@@ -191,7 +196,7 @@ private InlongStreamConf createStreamConf() {
191196

192197
private MySQLBinlogSource createMysqlSource() {
193198
MySQLBinlogSource mySQLBinlogSource = new MySQLBinlogSource();
194-
mySQLBinlogSource.setDbNames(Arrays.asList("{db.name}"));
199+
mySQLBinlogSource.setDbNames(Collections.singletonList("{db.name}"));
195200
mySQLBinlogSource.setHostname("{db.url}");
196201
mySQLBinlogSource.setAuthentication(new DefaultAuthentication("root", "inlong"));
197202
mySQLBinlogSource.setSourceName("{mysql.source.name}");
@@ -207,7 +212,7 @@ private KafkaSink createKafkaSink() {
207212
kafkaSink.setNeedCreated(false);
208213
kafkaSink.setSinkName("{kafka.sink.name}");
209214
Map<String, Object> properties = new HashMap<>();
210-
//Not needed if kafka cluster is not set
215+
// Not needed if kafka cluster is not set
211216
properties.put("transaction.timeout.ms", 9000000);
212217
kafkaSink.setProperties(properties);
213218
return kafkaSink;

inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java

+11-7
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@
2828
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
2929
import org.apache.inlong.manager.client.api.InlongStreamConf;
3030
import org.apache.inlong.manager.client.api.PulsarBaseConf;
31-
import org.apache.inlong.manager.common.pojo.stream.SinkField;
32-
import org.apache.inlong.manager.common.pojo.stream.StreamField;
3331
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
3432
import org.apache.inlong.manager.client.api.sink.HiveSink;
3533
import org.apache.inlong.manager.client.api.source.AgentFileSource;
3634
import org.apache.inlong.manager.common.enums.FieldType;
3735
import org.apache.inlong.manager.common.enums.FileFormat;
36+
import org.apache.inlong.manager.common.pojo.stream.SinkField;
37+
import org.apache.inlong.manager.common.pojo.stream.StreamField;
3838
import org.apache.shiro.util.Assert;
3939
import org.junit.Test;
4040

@@ -118,8 +118,7 @@ private InlongGroupConf createGroupConf() {
118118
InlongGroupConf inlongGroupConf = new InlongGroupConf();
119119
inlongGroupConf.setGroupName(GROUP_NAME);
120120
inlongGroupConf.setDescription(GROUP_NAME);
121-
inlongGroupConf.setProxyClusterId(1);
122-
//pulsar conf
121+
// pulsar conf
123122
PulsarBaseConf pulsarBaseConf = new PulsarBaseConf();
124123
inlongGroupConf.setMqBaseConf(pulsarBaseConf);
125124
pulsarBaseConf.setPulsarServiceUrl(PULSAR_SERVICE_URL);
@@ -128,14 +127,19 @@ private InlongGroupConf createGroupConf() {
128127
pulsarBaseConf.setEnableCreateResource(false);
129128
pulsarBaseConf.setTenant(tenant);
130129

131-
//flink conf
130+
// flink conf
132131
FlinkSortBaseConf sortBaseConf = new FlinkSortBaseConf();
133132
inlongGroupConf.setSortBaseConf(sortBaseConf);
134133
sortBaseConf.setServiceUrl(FLINK_URL);
135134
Map<String, String> map = new HashMap<>(16);
136135
sortBaseConf.setProperties(map);
137-
//enable zk
138-
inlongGroupConf.setZookeeperEnabled(false);
136+
137+
// set enable zk, create resource, lightweight mode, and cluster tag
138+
inlongGroupConf.setEnableZookeeper(0);
139+
inlongGroupConf.setEnableCreateResource(1);
140+
inlongGroupConf.setLightweight(1);
141+
inlongGroupConf.setInlongClusterTag("default_cluster");
142+
139143
inlongGroupConf.setDailyRecords(10000000L);
140144
inlongGroupConf.setPeakRecords(100000L);
141145
inlongGroupConf.setMaxLength(10000);

inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.google.common.collect.Lists;
2121
import lombok.extern.slf4j.Slf4j;
2222
import org.apache.inlong.manager.client.api.ClientConfiguration;
23-
import org.apache.inlong.manager.common.enums.DataFormat;
2423
import org.apache.inlong.manager.client.api.DataSeparator;
2524
import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
2625
import org.apache.inlong.manager.client.api.InlongClient;
@@ -30,13 +29,14 @@
3029
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
3130
import org.apache.inlong.manager.client.api.InlongStreamConf;
3231
import org.apache.inlong.manager.client.api.PulsarBaseConf;
33-
import org.apache.inlong.manager.common.pojo.stream.SinkField;
34-
import org.apache.inlong.manager.common.pojo.stream.StreamField;
3532
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
3633
import org.apache.inlong.manager.client.api.sink.HiveSink;
3734
import org.apache.inlong.manager.client.api.source.KafkaSource;
35+
import org.apache.inlong.manager.common.enums.DataFormat;
3836
import org.apache.inlong.manager.common.enums.FieldType;
3937
import org.apache.inlong.manager.common.enums.FileFormat;
38+
import org.apache.inlong.manager.common.pojo.stream.SinkField;
39+
import org.apache.inlong.manager.common.pojo.stream.StreamField;
4040
import org.apache.shiro.util.Assert;
4141
import org.junit.Test;
4242

@@ -121,8 +121,8 @@ private InlongGroupConf createGroupConf() {
121121
InlongGroupConf inlongGroupConf = new InlongGroupConf();
122122
inlongGroupConf.setGroupName(GROUP_NAME);
123123
inlongGroupConf.setDescription(GROUP_NAME);
124-
inlongGroupConf.setProxyClusterId(1);
125-
//pulsar conf
124+
125+
// pulsar conf
126126
PulsarBaseConf pulsarBaseConf = new PulsarBaseConf();
127127
inlongGroupConf.setMqBaseConf(pulsarBaseConf);
128128
pulsarBaseConf.setPulsarServiceUrl(PULSAR_SERVICE_URL);
@@ -131,14 +131,19 @@ private InlongGroupConf createGroupConf() {
131131
pulsarBaseConf.setEnableCreateResource(false);
132132
pulsarBaseConf.setTenant(tenant);
133133

134-
//flink conf
134+
// flink conf
135135
FlinkSortBaseConf sortBaseConf = new FlinkSortBaseConf();
136136
inlongGroupConf.setSortBaseConf(sortBaseConf);
137137
sortBaseConf.setServiceUrl(FLINK_URL);
138138
Map<String, String> map = new HashMap<>(16);
139139
sortBaseConf.setProperties(map);
140-
//enable zk
141-
inlongGroupConf.setZookeeperEnabled(false);
140+
141+
// set enable zk, create resource, lightweight mode, and cluster tag
142+
inlongGroupConf.setEnableZookeeper(0);
143+
inlongGroupConf.setEnableCreateResource(1);
144+
inlongGroupConf.setLightweight(1);
145+
inlongGroupConf.setInlongClusterTag("default_cluster");
146+
142147
inlongGroupConf.setDailyRecords(10000000L);
143148
inlongGroupConf.setPeakRecords(100000L);
144149
inlongGroupConf.setMaxLength(10000);

inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/GroupInfo.java

-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ public class GroupInfo {
3131
private Integer id;
3232
private String inlongGroupId;
3333
private String name;
34-
private String cnName;
3534
private String status;
3635
private Date modifyTime;
3736

inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/StreamInfo.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class StreamInfo {
2929

3030
private Integer id;
3131
private String name;
32-
private String mqResourceObj;
32+
private String mqResource;
3333
private String dataType;
3434
private String dataEncoding;
3535
private String dataSeparator;

inlong-manager/manager-client/pom.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
~ See the License for the specific language governing permissions and
1616
~ limitations under the License.
1717
-->
18-
<project xmlns="http://maven.apache.org/POM/4.0.0"
19-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
18+
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
19+
xmlns="http://maven.apache.org/POM/4.0.0"
2020
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
2121
<modelVersion>4.0.0</modelVersion>
2222
<parent>

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java

+10-9
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ public class InlongGroupConf {
3535
@ApiModelProperty(value = "Group name", required = true)
3636
private String groupName;
3737

38-
@ApiModelProperty("Chinese display name")
39-
private String cnName;
40-
4138
@ApiModelProperty("Group description")
4239
private String description;
4340

@@ -59,12 +56,16 @@ public class InlongGroupConf {
5956
@ApiModelProperty("The operator of stream group, default : admin")
6057
private String operator = "admin";
6158

62-
@ApiModelProperty("Need zookeeper support")
63-
private boolean zookeeperEnabled = true;
59+
@ApiModelProperty(value = "Whether to enable zookeeper? 0: disable, 1: enable")
60+
private Integer enableZookeeper;
61+
62+
@ApiModelProperty(value = "Whether to enable zookeeper? 0: disable, 1: enable")
63+
private Integer enableCreateResource;
64+
65+
@ApiModelProperty(value = "Whether to use lightweight mode, 0: false, 1: true")
66+
private Integer lightweight;
6467

65-
@ApiModelProperty("Data proxy cluster id")
66-
private Integer proxyClusterId;
68+
@ApiModelProperty("Inlong cluster tag")
69+
private String inlongClusterTag;
6770

68-
@ApiModelProperty("Use lightweight group")
69-
private boolean lightweight = false;
7071
}

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public InlongStreamBuilder transform(StreamTransform streamTransform) {
120120
public InlongStream init() {
121121
InlongStreamInfo streamInfo = streamContext.getStreamInfo();
122122
StreamPipeline streamPipeline = inlongStream.createPipeline();
123-
streamInfo.setTempView(GsonUtil.toJson(streamPipeline));
123+
streamInfo.setExtParams(GsonUtil.toJson(streamPipeline));
124124
Double streamIndex = managerClient.createStreamInfo(streamInfo);
125125
streamInfo.setId(streamIndex.intValue());
126126
//Create source and update index
@@ -145,7 +145,7 @@ public InlongStream init() {
145145
public InlongStream initOrUpdate() {
146146
InlongStreamInfo dataStreamInfo = streamContext.getStreamInfo();
147147
StreamPipeline streamPipeline = inlongStream.createPipeline();
148-
dataStreamInfo.setTempView(GsonUtil.toJson(streamPipeline));
148+
dataStreamInfo.setExtParams(GsonUtil.toJson(streamPipeline));
149149
Boolean isExist = managerClient.isStreamExists(dataStreamInfo);
150150
if (isExist) {
151151
Pair<Boolean, String> updateMsg = managerClient.updateStreamInfo(dataStreamInfo);

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ public InlongStream update() {
306306
}
307307
streamInfo.setFieldList(InlongStreamTransfer.createStreamFields(this.streamFields, streamInfo));
308308
StreamPipeline streamPipeline = createPipeline();
309-
streamInfo.setTempView(GsonUtil.toJson(streamPipeline));
309+
streamInfo.setExtParams(GsonUtil.toJson(streamPipeline));
310310
Pair<Boolean, String> updateMsg = managerClient.updateStreamInfo(streamInfo);
311311
if (!updateMsg.getKey()) {
312312
throw new RuntimeException(String.format("Update data stream failed:%s", updateMsg.getValue()));

0 commit comments

Comments
 (0)