Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-3719][Manager] Support data_transformation feature in Inlong #3774

Merged
merged 15 commits into from
Apr 20, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.inlong.manager.client;

import org.apache.commons.compress.utils.Lists;
import com.google.common.collect.Lists;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.DataSeparator;
import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
Expand All @@ -29,7 +29,7 @@
import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.PulsarBaseConf;
import org.apache.inlong.manager.client.api.SinkField;
import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.client.api.sink.HiveSink;
import org.apache.inlong.manager.client.api.source.AutoPushSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.inlong.manager.client;

import org.apache.commons.compress.utils.Lists;
import com.google.common.collect.Lists;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.DataSeparator;
import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
Expand All @@ -29,7 +29,7 @@
import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.PulsarBaseConf;
import org.apache.inlong.manager.client.api.SinkField;
import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.client.api.sink.HiveSink;
import org.apache.inlong.manager.client.api.source.AgentFileSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.inlong.manager.client;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.Lists;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.DataFormat;
import org.apache.inlong.manager.client.api.DataSeparator;
Expand All @@ -31,7 +31,7 @@
import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.PulsarBaseConf;
import org.apache.inlong.manager.client.api.SinkField;
import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.client.api.sink.HiveSink;
import org.apache.inlong.manager.client.api.source.KafkaSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import lombok.Data;
import org.apache.inlong.manager.client.api.InlongGroupConf;
import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.client.api.StreamSink;
import org.apache.inlong.manager.client.api.StreamSource;

Expand Down
6 changes: 6 additions & 0 deletions inlong-manager/manager-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
<groupId>org.apache.inlong</groupId>
<artifactId>manager-common</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,22 @@ public interface InlongGroup {
*/
void update(InlongGroupConf conf) throws Exception;

/**
* ReInit inlong group after update configuration for group.
* Must be invoked when group is rejected,failed or started
*
* @return inlong group info
*/
InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws Exception;

/**
* Init inlong group on updated conf.
* Must be invoked when group is rejected,failed or started
* This method is deprecated, recommend to use reInitOnUpdate
*
* @return inlong group info
*/
@Deprecated
InlongGroupContext initOnUpdate(InlongGroupConf conf) throws Exception;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class InlongGroupConf {
@ApiModelProperty("Need zookeeper support")
private boolean zookeeperEnabled = true;

@ApiModelProperty("data proxy cluster id")
@ApiModelProperty("Data proxy cluster id")
private Integer proxyClusterId;

@ApiModelProperty("Use light weight group")
private boolean lightWeight = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.inlong.manager.client.api;

import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;

import java.util.List;
import java.util.Map;

Expand All @@ -30,11 +33,19 @@ public abstract class InlongStream {

public abstract Map<String, StreamSink> getSinks();

public abstract Map<String, StreamTransform> getTransforms();

public abstract void addSource(StreamSource source);

public abstract void addSink(StreamSink sink);

public abstract void addTransform(StreamTransform transform);

public abstract StreamPipeline createPipeline();

@Deprecated
public abstract void updateSource(StreamSource source);

@Deprecated
public abstract void updateSink(StreamSink sink);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.manager.client.api;

import org.apache.inlong.manager.common.pojo.stream.StreamField;

import java.util.List;

public abstract class InlongStreamBuilder {
Expand All @@ -43,6 +45,13 @@ public abstract class InlongStreamBuilder {
*/
public abstract InlongStreamBuilder fields(List<StreamField> fieldList);

/**
* Create stream transform
*
* @return inlong stream builder
*/
public abstract InlongStreamBuilder transform(StreamTransform streamTransform);

/**
* Create data stream by builder
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.pojo.stream.StreamField;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.pojo.stream.StreamField;

@Data
@EqualsAndHashCode(callSuper = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.stream.StreamNode;

import java.util.List;
import java.util.Map;

@Data
@ApiModel("Stream sink configuration")
public abstract class StreamSink {
public abstract class StreamSink extends StreamNode {

@ApiModelProperty(value = "DataSink name", required = true)
private String sinkName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
import lombok.Data;
import org.apache.inlong.manager.common.enums.SourceState;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.stream.StreamNode;

@Data
@ApiModel("Stream source configuration")
public abstract class StreamSource {
public abstract class StreamSource extends StreamNode {

public enum State {
INIT, NORMAL, FROZING, FROZEN, FAILED, DELETING, DELETE;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.client.api;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.apache.inlong.manager.common.pojo.stream.StreamNode;
import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;

@Data
@ApiModel("Stream Transform configuration")
public abstract class StreamTransform extends StreamNode {

@ApiModelProperty(value = "Transform name", required = true)
protected String transformName;

@ApiModelProperty(value = "Transform name", required = true)
protected TransformDefinition transformDefinition;
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public void update(InlongGroupConf conf) throws Exception {
throw new UnsupportedOperationException("Inlong group is not exists");
}

@Override
public InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws Exception {
throw new UnsupportedOperationException("Inlong group is not exists");
}

@Override
public InlongGroupContext initOnUpdate(InlongGroupConf conf) throws Exception {
throw new UnsupportedOperationException("Inlong group is not exists");
Expand Down
Loading