Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-3719][Manager] Support data_transformation feature in Inlong #3774

Merged
merged 15 commits into from
Apr 20, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.PulsarBaseConf;
import org.apache.inlong.manager.client.api.SinkField;
import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.client.api.sink.HiveSink;
import org.apache.inlong.manager.client.api.source.AutoPushSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.PulsarBaseConf;
import org.apache.inlong.manager.client.api.SinkField;
import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.client.api.sink.HiveSink;
import org.apache.inlong.manager.client.api.source.AgentFileSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.PulsarBaseConf;
import org.apache.inlong.manager.client.api.SinkField;
import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.client.api.sink.HiveSink;
import org.apache.inlong.manager.client.api.source.KafkaSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import lombok.Data;
import org.apache.inlong.manager.client.api.InlongGroupConf;
import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.client.api.StreamSink;
import org.apache.inlong.manager.client.api.StreamSource;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,22 @@ public interface InlongGroup {
*/
void update(InlongGroupConf conf) throws Exception;

/**
* ReInit inlong group after update configuration for group.
* Must be invoked when group is rejected,failed or started
*
* @return inlong group info
*/
InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws Exception;

/**
* Init inlong group on updated conf.
* Must be invoked when group is rejected,failed or started
* This method is deprecated, recommend to use reInitOnUpdate
*
* @return inlong group info
*/
@Deprecated
InlongGroupContext initOnUpdate(InlongGroupConf conf) throws Exception;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class InlongGroupConf {
@ApiModelProperty("Need zookeeper support")
private boolean zookeeperEnabled = true;

@ApiModelProperty("data proxy cluster id")
@ApiModelProperty("Data proxy cluster id")
private Integer proxyClusterId;

@ApiModelProperty("Use light weight group")
private boolean lightWeight = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.manager.client.api;

import org.apache.inlong.manager.common.pojo.stream.StreamField;

import java.util.List;
import java.util.Map;

Expand All @@ -30,11 +32,17 @@ public abstract class InlongStream {

public abstract Map<String, StreamSink> getSinks();

public abstract Map<String, StreamTransform> getTransforms();

public abstract void addSource(StreamSource source);

public abstract void addSink(StreamSink sink);

public abstract void addTransform(StreamTransform transform);

@Deprecated
public abstract void updateSource(StreamSource source);

@Deprecated
public abstract void updateSink(StreamSink sink);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.manager.client.api;

import org.apache.inlong.manager.common.pojo.stream.StreamField;

import java.util.List;

public abstract class InlongStreamBuilder {
Expand All @@ -43,6 +45,13 @@ public abstract class InlongStreamBuilder {
*/
public abstract InlongStreamBuilder fields(List<StreamField> fieldList);

/**
* Create stream transform
*
* @return inlong stream builder
*/
public abstract InlongStreamBuilder transform(StreamTransform streamTransform);

/**
* Create data stream by builder
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.pojo.stream.StreamField;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.pojo.stream.StreamField;

@Data
@EqualsAndHashCode(callSuper = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.stream.StreamNode;

import java.util.List;
import java.util.Map;

@Data
@ApiModel("Stream sink configuration")
public abstract class StreamSink {
public abstract class StreamSink extends StreamNode {

@ApiModelProperty(value = "DataSink name", required = true)
private String sinkName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
import lombok.Data;
import org.apache.inlong.manager.common.enums.SourceState;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.stream.StreamNode;

@Data
@ApiModel("Stream source configuration")
public abstract class StreamSource {
public abstract class StreamSource extends StreamNode {

public enum State {
INIT, NORMAL, FROZING, FROZEN, FAILED, DELETING, DELETE;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.client.api;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.apache.inlong.manager.common.pojo.stream.StreamNode;
import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;

@Data
@ApiModel("Stream Transform configuration")
public abstract class StreamTransform extends StreamNode {

@ApiModelProperty(value = "Transform name", required = true)
protected String transformName;

@ApiModelProperty(value = "Transform name", required = true)
protected TransformDefinition transformDefinition;
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public void update(InlongGroupConf conf) throws Exception {
throw new UnsupportedOperationException("Inlong group is not exists");
}

@Override
public InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws Exception {
throw new UnsupportedOperationException("Inlong group is not exists");
}

@Override
public InlongGroupContext initOnUpdate(InlongGroupConf conf) throws Exception {
throw new UnsupportedOperationException("Inlong group is not exists");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@
import org.apache.inlong.manager.client.api.InlongStream;
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.client.api.StreamSink;
import org.apache.inlong.manager.client.api.StreamSource;
import org.apache.inlong.manager.client.api.StreamTransform;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
import org.apache.inlong.manager.client.api.inner.InnerStreamContext;
import org.apache.inlong.manager.client.api.util.GsonUtil;
import org.apache.inlong.manager.client.api.util.InlongStreamSinkTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamTransformTransfer;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
Expand All @@ -43,17 +44,15 @@
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.pojo.transform.TransformRequest;

import java.util.List;

public class DefaultInlongStreamBuilder extends InlongStreamBuilder {

private InlongStreamImpl inlongStream;

private InlongStreamConf streamConf;

private InnerGroupContext groupContext;

private InnerStreamContext streamContext;

private InnerInlongManagerClient managerClient;
Expand All @@ -62,8 +61,6 @@ public DefaultInlongStreamBuilder(
InlongStreamConf streamConf,
InnerGroupContext groupContext,
InnerInlongManagerClient managerClient) {
this.streamConf = streamConf;
this.groupContext = groupContext;
this.managerClient = managerClient;
if (MapUtils.isEmpty(groupContext.getStreamContextMap())) {
groupContext.setStreamContextMap(Maps.newHashMap());
Expand Down Expand Up @@ -105,9 +102,19 @@ public InlongStreamBuilder fields(List<StreamField> fieldList) {
return this;
}

@Override
public InlongStreamBuilder transform(StreamTransform streamTransform) {
inlongStream.addTransform(streamTransform);
TransformRequest transformRequest = InlongStreamTransformTransfer.createTransformRequest(streamTransform,
streamContext.getStreamInfo());
streamContext.setTransformRequest(transformRequest);
return this;
}

@Override
public InlongStream init() {
InlongStreamInfo streamInfo = streamContext.getStreamInfo();
//todo makeup pipeline
String streamIndex = managerClient.createStreamInfo(streamInfo);
streamInfo.setId(Double.valueOf(streamIndex).intValue());
//Create source and update index
Expand All @@ -128,6 +135,7 @@ public InlongStream init() {
@Override
public InlongStream initOrUpdate() {
InlongStreamInfo dataStreamInfo = streamContext.getStreamInfo();
//todo makeup pipeline
Pair<Boolean, InlongStreamInfo> existMsg = managerClient.isStreamExists(dataStreamInfo);
if (existMsg.getKey()) {
Pair<Boolean, String> updateMsg = managerClient.updateStreamInfo(dataStreamInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ public void update(InlongGroupConf conf) throws Exception {
AssertUtil.isNull(errMsg, errMsg);
}

@Override
public InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws Exception {
return initOnUpdate(conf);
}

@Override
public InlongGroupContext initOnUpdate(InlongGroupConf conf) throws Exception {
update(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.compress.utils.Lists;
import org.apache.inlong.manager.client.api.InlongStream;
import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.client.api.StreamSink;
import org.apache.inlong.manager.client.api.StreamSource;
import org.apache.inlong.manager.client.api.StreamTransform;
import org.apache.inlong.manager.client.api.util.AssertUtil;
import org.apache.inlong.manager.client.api.util.InlongStreamSinkTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
Expand All @@ -36,6 +36,7 @@
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamField;

import java.util.List;
import java.util.Map;
Expand All @@ -52,6 +53,8 @@ public class InlongStreamImpl extends InlongStream {

private Map<String, StreamSink> streamSinks = Maps.newHashMap();

private Map<String, StreamTransform> streamTransforms = Maps.newHashMap();

private List<StreamField> streamFields = Lists.newArrayList();

public InlongStreamImpl(FullStreamResponse fullStreamResponse) {
Expand Down Expand Up @@ -116,6 +119,11 @@ public Map<String, StreamSink> getSinks() {
return this.streamSinks;
}

@Override
public Map<String, StreamTransform> getTransforms() {
return this.streamTransforms;
}

@Override
public void addSource(StreamSource source) {
AssertUtil.notNull(source.getSourceName(), "Source name should not be empty");
Expand All @@ -136,6 +144,16 @@ public void addSink(StreamSink sink) {
streamSinks.put(sinkName, sink);
}

@Override
public void addTransform(StreamTransform transform) {
AssertUtil.notNull(transform.getTransformName(), "Transform name should not be empty");
String transformName = transform.getTransformName();
if (streamTransforms.get(transformName) != null) {
throw new IllegalArgumentException(String.format("TransformName=%s has already be set", transform));
}
streamTransforms.put(transformName, transform);
}

@Override
public void updateSource(StreamSource source) {
AssertUtil.notNull(source.getSourceName(), "Source name should not be empty");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.transform.TransformRequest;

import java.util.List;
import java.util.Map;
Expand All @@ -38,6 +39,8 @@ public class InnerStreamContext {

private Map<String, SinkRequest> sinkRequests = Maps.newHashMap();

private Map<String, TransformRequest> transformRequests = Maps.newHashMap();

public InnerStreamContext(InlongStreamInfo streamInfo) {
this.streamInfo = streamInfo;
}
Expand All @@ -54,4 +57,7 @@ public void setSinkRequest(SinkRequest sinkRequest) {
this.sinkRequests.put(sinkRequest.getSinkName(), sinkRequest);
}

public void setTransformRequest(TransformRequest transformRequest) {
this.transformRequests.put(transformRequest.getTransformName(), transformRequest);
}
}
Loading