diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java index b5fc252f1af..63f2a26c651 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java @@ -216,7 +216,7 @@ private List fetchDataStreams(String groupId) { InlongStreamImpl inlongStream = new InlongStreamImpl(fullStreamResponse, stream); if (CollectionUtils.isNotEmpty(sourceListResponses)) { for (SourceListResponse response : sourceListResponses) { - inlongStream.addSource( + inlongStream.updateSource( InlongStreamSourceTransfer.parseStreamSource(response)); } } diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java index d885a93dd2c..a934b590f51 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java @@ -337,8 +337,7 @@ public List listStreamInfo(String inlongGroupId) { org.apache.inlong.manager.common.beans.Response responseBody = InlongParser.parseResponse(body); AssertUtil.isTrue(responseBody.getErrMsg() == null, String.format("Inlong request failed: %s", responseBody.getErrMsg())); - PageInfo pageInfo = InlongParser.parseStreamList(responseBody); - return pageInfo.getList(); + return InlongParser.parseStreamList(responseBody); } catch (Exception e) { throw new RuntimeException(String.format("List inlong streams failed: %s", e.getMessage()), e); } diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java index c11264be278..1c8987c0d05 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java @@ -17,19 +17,30 @@ package org.apache.inlong.manager.client.api.util; +import static org.apache.inlong.manager.common.enums.SourceType.BINLOG; +import static org.apache.inlong.manager.common.enums.SourceType.KAFKA; + import com.github.pagehelper.PageInfo; +import com.google.common.collect.Lists; import com.google.common.reflect.TypeToken; import com.google.gson.JsonArray; import com.google.gson.JsonObject; +import java.util.List; import org.apache.commons.lang3.tuple.Pair; import org.apache.inlong.manager.common.beans.Response; import org.apache.inlong.manager.common.enums.Constant; +import org.apache.inlong.manager.common.enums.SinkType; import org.apache.inlong.manager.common.enums.SourceType; import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest; import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse; import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo; import org.apache.inlong.manager.common.pojo.group.InlongGroupResponse; import org.apache.inlong.manager.common.pojo.sink.SinkListResponse; +import org.apache.inlong.manager.common.pojo.sink.SinkResponse; +import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkResponse; +import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse; +import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkResponse; +import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse; import org.apache.inlong.manager.common.pojo.source.SourceListResponse; import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse; import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse; @@ -39,16 +50,17 @@ import org.apache.inlong.manager.common.pojo.workflow.EventLogView; import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult; -import java.util.List; - -import static org.apache.inlong.manager.common.enums.SourceType.BINLOG; -import static org.apache.inlong.manager.common.enums.SourceType.KAFKA; - /** * Parser for Inlong entity */ public class InlongParser { + public static final String GROUP_INFO = "groupInfo"; + public static final String MQ_EXT_INFO = "mqExtInfo"; + public static final String MIDDLEWARE_TYPE = "middlewareType"; + public static final String SINK_INFO = "sinkInfo"; + public static final String SINK_TYPE = "sinkType"; + public static Response parseResponse(String responseBody) { Response response = GsonUtil.fromJson(responseBody, Response.class); return response; @@ -62,7 +74,16 @@ public static WorkflowResult parseWorkflowResult(Response response) { public static InlongGroupResponse parseGroupInfo(Response response) { Object data = response.getData(); - return GsonUtil.fromJson(GsonUtil.toJson(data), InlongGroupResponse.class); + JsonObject groupJson = GsonUtil.fromJson(GsonUtil.toJson(data), JsonObject.class); + InlongGroupResponse inlongGroupResponse = GsonUtil.fromJson(GsonUtil.toJson(data), InlongGroupResponse.class); + JsonObject mqExtInfo = groupJson.getAsJsonObject(MQ_EXT_INFO); + if (mqExtInfo != null && mqExtInfo.get(MIDDLEWARE_TYPE) != null) { + if (Constant.MIDDLEWARE_PULSAR.equals(mqExtInfo.get(MIDDLEWARE_TYPE).getAsString())) { + InlongGroupPulsarInfo pulsarInfo = GsonUtil.fromJson(mqExtInfo.toString(), InlongGroupPulsarInfo.class); + inlongGroupResponse.setMqExtInfo(pulsarInfo); + } + } + return inlongGroupResponse; } public static PageInfo parseGroupList(Response response) { @@ -78,12 +99,51 @@ public static InlongStreamInfo parseStreamInfo(Response response) { return GsonUtil.fromJson(GsonUtil.toJson(data), InlongStreamInfo.class); } - public static PageInfo parseStreamList(Response response) { + public static List parseStreamList(Response response) { Object data = response.getData(); - String pageInfoJson = GsonUtil.toJson(data); - return GsonUtil.fromJson(pageInfoJson, - new TypeToken>() { - }.getType()); + JsonObject pageInfoJson = GsonUtil.fromJson(GsonUtil.toJson(data), JsonObject.class); + JsonArray fullStreamArray = pageInfoJson.getAsJsonArray("list"); + List list = Lists.newArrayList(); + for (int i = 0; i < fullStreamArray.size(); i++) { + JsonObject fullStreamJson = (JsonObject) fullStreamArray.get(i); + FullStreamResponse fullStreamResponse = GsonUtil.fromJson(fullStreamJson.toString(), + FullStreamResponse.class); + list.add(fullStreamResponse); + //Parse sinkResponse in each stream + JsonArray sinkJsonArr = fullStreamJson.getAsJsonArray(SINK_INFO); + List sinkResponses = Lists.newArrayList(); + fullStreamResponse.setSinkInfo(sinkResponses); + for (int j = 0; j < sinkJsonArr.size(); j++) { + JsonObject sinkJson = (JsonObject) sinkJsonArr.get(i); + String type = sinkJson.get(SINK_TYPE).getAsString(); + SinkType sinkType = SinkType.forType(type); + switch (sinkType) { + case HIVE: + HiveSinkResponse hiveSinkResponse = GsonUtil.fromJson(sinkJson.toString(), + HiveSinkResponse.class); + sinkResponses.add(hiveSinkResponse); + break; + case KAFKA: + KafkaSinkResponse kafkaSinkResponse = GsonUtil.fromJson(sinkJson.toString(), + KafkaSinkResponse.class); + sinkResponses.add(kafkaSinkResponse); + break; + case ICEBERG: + IcebergSinkResponse icebergSinkResponse = GsonUtil.fromJson(sinkJson.toString(), + IcebergSinkResponse.class); + sinkResponses.add(icebergSinkResponse); + break; + case CLICKHOUSE: + ClickHouseSinkResponse clickHouseSinkResponse = GsonUtil.fromJson(sinkJson.toString(), + ClickHouseSinkResponse.class); + sinkResponses.add(clickHouseSinkResponse); + break; + default: + throw new RuntimeException(String.format("Unsupport sinkType=%s for Inlong", sinkType)); + } + } + } + return list; } public static PageInfo parseSourceList(Response response) { @@ -107,9 +167,9 @@ public static PageInfo parseSourceList(Response response) { } throw new IllegalArgumentException( String.format("Unsupported sourceType=%s for Inlong", sourceType)); - + } else { + return new PageInfo<>(); } - throw new IllegalArgumentException(String.format("pageInfo is empty for Inlong")); } public static PageInfo parseSinkList(Response response) { @@ -121,25 +181,24 @@ public static PageInfo parseSinkList(Response response) { } public static Pair> parseGroupForm(String formJson) { - final String groupInfoField = "groupInfo"; - final String mqExtInfoField = "mqExtInfo"; JsonObject formData = GsonUtil.fromJson(formJson, JsonObject.class); - JsonObject groupJson = formData.getAsJsonObject(groupInfoField); + JsonObject groupJson = formData.getAsJsonObject(GROUP_INFO); InlongGroupApproveRequest groupApproveInfo = GsonUtil.fromJson(groupJson.toString(), InlongGroupApproveRequest.class); - JsonObject mqExtInfo = groupJson.getAsJsonObject(mqExtInfoField); - if (mqExtInfo != null && mqExtInfo.get("middlewareType") != null - && Constant.MIDDLEWARE_PULSAR.equals(mqExtInfo.get("middlewareType").getAsString())) { - InlongGroupPulsarInfo pulsarInfo = GsonUtil.fromJson(mqExtInfo.toString(), InlongGroupPulsarInfo.class); - groupApproveInfo.setAckQuorum(pulsarInfo.getAckQuorum()); - groupApproveInfo.setEnsemble(pulsarInfo.getEnsemble()); - groupApproveInfo.setWriteQuorum(pulsarInfo.getWriteQuorum()); - groupApproveInfo.setRetentionTime(pulsarInfo.getRetentionTime()); - groupApproveInfo.setRetentionTimeUnit(pulsarInfo.getRetentionTimeUnit()); - groupApproveInfo.setTtl(pulsarInfo.getTtl()); - groupApproveInfo.setTtlUnit(pulsarInfo.getTtlUnit()); - groupApproveInfo.setRetentionSize(pulsarInfo.getRetentionSize()); - groupApproveInfo.setRetentionSizeUnit(pulsarInfo.getRetentionSizeUnit()); + JsonObject mqExtInfo = groupJson.getAsJsonObject(MQ_EXT_INFO); + if (mqExtInfo != null && mqExtInfo.get(MIDDLEWARE_TYPE) != null) { + if (Constant.MIDDLEWARE_PULSAR.equals(mqExtInfo.get(MIDDLEWARE_TYPE).getAsString())) { + InlongGroupPulsarInfo pulsarInfo = GsonUtil.fromJson(mqExtInfo.toString(), InlongGroupPulsarInfo.class); + groupApproveInfo.setAckQuorum(pulsarInfo.getAckQuorum()); + groupApproveInfo.setEnsemble(pulsarInfo.getEnsemble()); + groupApproveInfo.setWriteQuorum(pulsarInfo.getWriteQuorum()); + groupApproveInfo.setRetentionTime(pulsarInfo.getRetentionTime()); + groupApproveInfo.setRetentionTimeUnit(pulsarInfo.getRetentionTimeUnit()); + groupApproveInfo.setTtl(pulsarInfo.getTtl()); + groupApproveInfo.setTtlUnit(pulsarInfo.getTtlUnit()); + groupApproveInfo.setRetentionSize(pulsarInfo.getRetentionSize()); + groupApproveInfo.setRetentionSizeUnit(pulsarInfo.getRetentionSizeUnit()); + } } JsonArray streamJson = formData.getAsJsonArray("streamInfoList"); List streamApproveList = GsonUtil.fromJson(streamJson.toString(), diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java index ff6ecef95a5..f4d3a1e86c0 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java @@ -24,12 +24,14 @@ import java.util.Date; import java.util.List; +import lombok.NoArgsConstructor; /** * Response of the stream sink */ @Data @ApiModel("Response of the stream sink") +@NoArgsConstructor public class SinkResponse { private Integer id; @@ -40,12 +42,12 @@ public class SinkResponse { @ApiModelProperty("Inlong stream id") private String inlongStreamId; - @ApiModelProperty("Sink type, including: HIVE, ES, etc.") - private String sinkType; - @ApiModelProperty("Sink name, unique in one stream.") private String sinkName; + @ApiModelProperty("Sink type, including: HIVE, ES, etc.") + protected String sinkType; + @ApiModelProperty("Data storage period, unit: day") private Integer storagePeriod; diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java index 3afffb40b62..0d669589ff6 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java @@ -34,7 +34,9 @@ @ApiModel(value = "Response of the ClickHouse sink") public class ClickHouseSinkResponse extends SinkResponse { - private String sinkType = Constant.SINK_CLICKHOUSE; + public ClickHouseSinkResponse() { + this.sinkType = Constant.SINK_CLICKHOUSE; + } @ApiModelProperty("ClickHouse JDBC URL") private String jdbcUrl; diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java index f0a7b16e901..aa66d33ad0e 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java @@ -34,7 +34,9 @@ @ApiModel(value = "Response of the Hive sink") public class HiveSinkResponse extends SinkResponse { - private String sinkType = Constant.SINK_HIVE; + public HiveSinkResponse() { + this.sinkType = Constant.SINK_HIVE; + } @ApiModelProperty("Hive JDBC URL") private String jdbcUrl; diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkResponse.java index f64a3aed369..1abe819b082 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkResponse.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkResponse.java @@ -34,7 +34,9 @@ @ApiModel(value = "Response of the Iceberg sink") public class IcebergSinkResponse extends SinkResponse { - private String sinkType = Constant.SINK_ICEBERG; + public IcebergSinkResponse() { + this.sinkType = Constant.SINK_ICEBERG; + } @ApiModelProperty("table Location like hdfs://") private String tableLocation; diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java index 6f85bd55a27..665a74a73e2 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java @@ -34,7 +34,9 @@ @ApiModel(value = "Response of the Kafka sink") public class KafkaSinkResponse extends SinkResponse { - private String sinkType = Constant.SINK_KAFKA; + public KafkaSinkResponse() { + this.sinkType = Constant.SINK_KAFKA; + } @ApiModelProperty("Kafka bootstrap servers") private String address; diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql index dc86a21495c..499999e523e 100644 --- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql +++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql @@ -561,7 +561,7 @@ CREATE TABLE `stream_source` `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id', `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id', `source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc', - `source_name` varchar(256) NOT NULL DEFAULT '' COMMENT 'source_name', + `source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name', `agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task', `uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task', `server_id` int(11) DEFAULT NULL COMMENT 'Id of the source server', @@ -592,7 +592,7 @@ CREATE TABLE `stream_sink` `inlong_group_id` varchar(256) NOT NULL COMMENT 'Owning inlong group id', `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Owning inlong stream id', `sink_type` varchar(15) DEFAULT 'HIVE' COMMENT 'Sink type, including: HIVE, ES, etc', - `sink_name` varchar(256) NOT NULL DEFAULT '' COMMENT 'Sink name', + `sink_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'Sink name', `storage_period` int(11) DEFAULT '10' COMMENT 'Data storage period, unit: day', `enable_create_resource` tinyint(1) DEFAULT '1' COMMENT 'Whether to enable create sink resource? 0: disable, 1: enable. default is 1', `ext_params` text COMMENT 'Another fields, will saved as JSON type', diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql index 3fecc58b0e6..63172489427 100644 --- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql +++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql @@ -592,7 +592,7 @@ CREATE TABLE `stream_source` `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID', `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id', `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id', - `source_name` varchar(256) NOT NULL DEFAULT '' COMMENT 'source_name', + `source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name', `source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc', `agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task', `uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task', @@ -625,11 +625,11 @@ CREATE TABLE `stream_sink` `inlong_group_id` varchar(256) NOT NULL COMMENT 'Owning inlong group id', `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Owning inlong stream id', `sink_type` varchar(15) DEFAULT 'HIVE' COMMENT 'Sink type, including: HIVE, ES, etc', - `sink_name` varchar(256) NOT NULL DEFAULT '' COMMENT 'Sink name', + `sink_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'Sink name', `storage_period` int(11) DEFAULT '10' COMMENT 'Data storage period, unit: day', `enable_create_resource` tinyint(1) DEFAULT '1' COMMENT 'Whether to enable create sink resource? 0: disable, 1: enable. default is 1', `ext_params` text COMMENT 'Another fields, will saved as JSON type', - `operate_log` varchar(5000) DEFAULT NULL COMMENT 'Background operate log', + `operate_log` text DEFAULT NULL COMMENT 'Background operate log', `status` int(11) DEFAULT '0' COMMENT 'Status', `previous_status` int(11) DEFAULT '0' COMMENT 'Previous status', `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',