Skip to content

Commit 39e7bc9

Browse files
kipshihealchow
andauthored
[INLONG-3719][Manager] Support data_transformation feature in Inlong (#3774)
* Add constructor to streamTransform * Add unit tests for stream transform * Update API doc * Add processing_time for stream * resolve circular dependency in manager service * Update SQL file format Co-authored-by: healchow <healchow@gmail.com>
1 parent eb487b0 commit 39e7bc9

File tree

100 files changed

+3392
-415
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

100 files changed

+3392
-415
lines changed

inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.inlong.manager.client;
1919

20-
import org.apache.commons.compress.utils.Lists;
20+
import com.google.common.collect.Lists;
2121
import org.apache.inlong.manager.client.api.ClientConfiguration;
2222
import org.apache.inlong.manager.client.api.DataSeparator;
2323
import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
@@ -29,7 +29,7 @@
2929
import org.apache.inlong.manager.client.api.InlongStreamConf;
3030
import org.apache.inlong.manager.client.api.PulsarBaseConf;
3131
import org.apache.inlong.manager.client.api.SinkField;
32-
import org.apache.inlong.manager.client.api.StreamField;
32+
import org.apache.inlong.manager.common.pojo.stream.StreamField;
3333
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
3434
import org.apache.inlong.manager.client.api.sink.HiveSink;
3535
import org.apache.inlong.manager.client.api.source.AutoPushSource;

inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.inlong.manager.client;
1919

20-
import org.apache.commons.compress.utils.Lists;
20+
import com.google.common.collect.Lists;
2121
import org.apache.inlong.manager.client.api.ClientConfiguration;
2222
import org.apache.inlong.manager.client.api.DataSeparator;
2323
import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
@@ -29,7 +29,7 @@
2929
import org.apache.inlong.manager.client.api.InlongStreamConf;
3030
import org.apache.inlong.manager.client.api.PulsarBaseConf;
3131
import org.apache.inlong.manager.client.api.SinkField;
32-
import org.apache.inlong.manager.client.api.StreamField;
32+
import org.apache.inlong.manager.common.pojo.stream.StreamField;
3333
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
3434
import org.apache.inlong.manager.client.api.sink.HiveSink;
3535
import org.apache.inlong.manager.client.api.source.AgentFileSource;

inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
package org.apache.inlong.manager.client;
1919

20+
import com.google.common.collect.Lists;
2021
import lombok.extern.slf4j.Slf4j;
21-
import org.apache.commons.compress.utils.Lists;
2222
import org.apache.inlong.manager.client.api.ClientConfiguration;
2323
import org.apache.inlong.manager.client.api.DataFormat;
2424
import org.apache.inlong.manager.client.api.DataSeparator;
@@ -31,7 +31,7 @@
3131
import org.apache.inlong.manager.client.api.InlongStreamConf;
3232
import org.apache.inlong.manager.client.api.PulsarBaseConf;
3333
import org.apache.inlong.manager.client.api.SinkField;
34-
import org.apache.inlong.manager.client.api.StreamField;
34+
import org.apache.inlong.manager.common.pojo.stream.StreamField;
3535
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
3636
import org.apache.inlong.manager.client.api.sink.HiveSink;
3737
import org.apache.inlong.manager.client.api.source.KafkaSource;

inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import lombok.Data;
2121
import org.apache.inlong.manager.client.api.InlongGroupConf;
2222
import org.apache.inlong.manager.client.api.InlongStreamConf;
23-
import org.apache.inlong.manager.client.api.StreamField;
23+
import org.apache.inlong.manager.common.pojo.stream.StreamField;
2424
import org.apache.inlong.manager.client.api.StreamSink;
2525
import org.apache.inlong.manager.client.api.StreamSource;
2626

inlong-manager/manager-client/pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@
3232
<groupId>org.apache.inlong</groupId>
3333
<artifactId>manager-common</artifactId>
3434
<version>${project.version}</version>
35+
<exclusions>
36+
<exclusion>
37+
<groupId>org.apache.commons</groupId>
38+
<artifactId>commons-compress</artifactId>
39+
</exclusion>
40+
</exclusions>
3541
</dependency>
3642
<dependency>
3743
<groupId>org.projectlombok</groupId>

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java

+10
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,22 @@ public interface InlongGroup {
5353
*/
5454
void update(InlongGroupConf conf) throws Exception;
5555

56+
/**
57+
* ReInit inlong group after update configuration for group.
58+
* Must be invoked when group is rejected,failed or started
59+
*
60+
* @return inlong group info
61+
*/
62+
InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws Exception;
63+
5664
/**
5765
* Init inlong group on updated conf.
5866
* Must be invoked when group is rejected,failed or started
67+
* This method is deprecated, recommend to use reInitOnUpdate
5968
*
6069
* @return inlong group info
6170
*/
71+
@Deprecated
6272
InlongGroupContext initOnUpdate(InlongGroupConf conf) throws Exception;
6373

6474
/**

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ public class InlongGroupConf {
5959
@ApiModelProperty("Need zookeeper support")
6060
private boolean zookeeperEnabled = true;
6161

62-
@ApiModelProperty("data proxy cluster id")
62+
@ApiModelProperty("Data proxy cluster id")
6363
private Integer proxyClusterId;
64+
65+
@ApiModelProperty("Use light weight group")
66+
private boolean lightWeight = false;
6467
}

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java

+11
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.inlong.manager.client.api;
1919

20+
import org.apache.inlong.manager.common.pojo.stream.StreamField;
21+
import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
22+
2023
import java.util.List;
2124
import java.util.Map;
2225

@@ -30,11 +33,19 @@ public abstract class InlongStream {
3033

3134
public abstract Map<String, StreamSink> getSinks();
3235

36+
public abstract Map<String, StreamTransform> getTransforms();
37+
3338
public abstract void addSource(StreamSource source);
3439

3540
public abstract void addSink(StreamSink sink);
3641

42+
public abstract void addTransform(StreamTransform transform);
43+
44+
public abstract StreamPipeline createPipeline();
45+
46+
@Deprecated
3747
public abstract void updateSource(StreamSource source);
3848

49+
@Deprecated
3950
public abstract void updateSink(StreamSink sink);
4051
}

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamBuilder.java

+9
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.inlong.manager.client.api;
1919

20+
import org.apache.inlong.manager.common.pojo.stream.StreamField;
21+
2022
import java.util.List;
2123

2224
public abstract class InlongStreamBuilder {
@@ -43,6 +45,13 @@ public abstract class InlongStreamBuilder {
4345
*/
4446
public abstract InlongStreamBuilder fields(List<StreamField> fieldList);
4547

48+
/**
49+
* Create stream transform
50+
*
51+
* @return inlong stream builder
52+
*/
53+
public abstract InlongStreamBuilder transform(StreamTransform streamTransform);
54+
4655
/**
4756
* Create data stream by builder
4857
*

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamConf.java

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import lombok.AllArgsConstructor;
2323
import lombok.Data;
2424
import lombok.NoArgsConstructor;
25+
import org.apache.inlong.manager.common.pojo.stream.StreamField;
2526

2627
import java.nio.charset.Charset;
2728
import java.nio.charset.StandardCharsets;

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import lombok.EqualsAndHashCode;
2424
import lombok.NoArgsConstructor;
2525
import org.apache.inlong.manager.common.enums.FieldType;
26+
import org.apache.inlong.manager.common.pojo.stream.StreamField;
2627

2728
@Data
2829
@EqualsAndHashCode(callSuper = true)

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@
2121
import io.swagger.annotations.ApiModelProperty;
2222
import lombok.Data;
2323
import org.apache.inlong.manager.common.enums.SinkType;
24+
import org.apache.inlong.manager.common.pojo.stream.StreamNode;
2425

2526
import java.util.List;
2627
import java.util.Map;
2728

2829
@Data
2930
@ApiModel("Stream sink configuration")
30-
public abstract class StreamSink {
31+
public abstract class StreamSink extends StreamNode {
3132

3233
@ApiModelProperty(value = "DataSink name", required = true)
3334
private String sinkName;

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@
2222
import lombok.Data;
2323
import org.apache.inlong.manager.common.enums.SourceState;
2424
import org.apache.inlong.manager.common.enums.SourceType;
25+
import org.apache.inlong.manager.common.pojo.stream.StreamNode;
2526

2627
@Data
2728
@ApiModel("Stream source configuration")
28-
public abstract class StreamSource {
29+
public abstract class StreamSource extends StreamNode {
2930

3031
public enum State {
3132
INIT, NORMAL, FROZING, FROZEN, FAILED, DELETING, DELETE;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.manager.client.api;
19+
20+
import io.swagger.annotations.ApiModel;
21+
import io.swagger.annotations.ApiModelProperty;
22+
import lombok.Data;
23+
import org.apache.inlong.manager.common.pojo.stream.StreamNode;
24+
import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
25+
26+
@Data
27+
@ApiModel("Stream Transform configuration")
28+
public abstract class StreamTransform extends StreamNode {
29+
30+
@ApiModelProperty(value = "Transform name", required = true)
31+
protected String transformName;
32+
33+
@ApiModelProperty(value = "Transform name", required = true)
34+
protected TransformDefinition transformDefinition;
35+
}

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java

+5
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ public void update(InlongGroupConf conf) throws Exception {
4848
throw new UnsupportedOperationException("Inlong group is not exists");
4949
}
5050

51+
@Override
52+
public InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws Exception {
53+
throw new UnsupportedOperationException("Inlong group is not exists");
54+
}
55+
5156
@Override
5257
public InlongGroupContext initOnUpdate(InlongGroupConf conf) throws Exception {
5358
throw new UnsupportedOperationException("Inlong group is not exists");

0 commit comments

Comments
 (0)