Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-2898][Manager] Fix Json parsing exceptions #2899

Merged
merged 1 commit into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ private List<InlongStream> fetchDataStreams(String groupId) {
InlongStreamImpl inlongStream = new InlongStreamImpl(fullStreamResponse, stream);
if (CollectionUtils.isNotEmpty(sourceListResponses)) {
for (SourceListResponse response : sourceListResponses) {
inlongStream.addSource(
inlongStream.updateSource(
InlongStreamSourceTransfer.parseStreamSource(response));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,7 @@ public List<FullStreamResponse> listStreamInfo(String inlongGroupId) {
org.apache.inlong.manager.common.beans.Response responseBody = InlongParser.parseResponse(body);
AssertUtil.isTrue(responseBody.getErrMsg() == null,
String.format("Inlong request failed: %s", responseBody.getErrMsg()));
PageInfo<FullStreamResponse> pageInfo = InlongParser.parseStreamList(responseBody);
return pageInfo.getList();
return InlongParser.parseStreamList(responseBody);
} catch (Exception e) {
throw new RuntimeException(String.format("List inlong streams failed: %s", e.getMessage()), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,30 @@

package org.apache.inlong.manager.client.api.util;

import static org.apache.inlong.manager.common.enums.SourceType.BINLOG;
import static org.apache.inlong.manager.common.enums.SourceType.KAFKA;

import com.github.pagehelper.PageInfo;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
Expand All @@ -39,16 +50,17 @@
import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;

import java.util.List;

import static org.apache.inlong.manager.common.enums.SourceType.BINLOG;
import static org.apache.inlong.manager.common.enums.SourceType.KAFKA;

/**
* Parser for Inlong entity
*/
public class InlongParser {

public static final String GROUP_INFO = "groupInfo";
public static final String MQ_EXT_INFO = "mqExtInfo";
public static final String MIDDLEWARE_TYPE = "middlewareType";
public static final String SINK_INFO = "sinkInfo";
public static final String SINK_TYPE = "sinkType";

public static Response parseResponse(String responseBody) {
Response response = GsonUtil.fromJson(responseBody, Response.class);
return response;
Expand All @@ -62,7 +74,16 @@ public static WorkflowResult parseWorkflowResult(Response response) {

public static InlongGroupResponse parseGroupInfo(Response response) {
Object data = response.getData();
return GsonUtil.fromJson(GsonUtil.toJson(data), InlongGroupResponse.class);
JsonObject groupJson = GsonUtil.fromJson(GsonUtil.toJson(data), JsonObject.class);
InlongGroupResponse inlongGroupResponse = GsonUtil.fromJson(GsonUtil.toJson(data), InlongGroupResponse.class);
JsonObject mqExtInfo = groupJson.getAsJsonObject(MQ_EXT_INFO);
if (mqExtInfo != null && mqExtInfo.get(MIDDLEWARE_TYPE) != null) {
if (Constant.MIDDLEWARE_PULSAR.equals(mqExtInfo.get(MIDDLEWARE_TYPE).getAsString())) {
InlongGroupPulsarInfo pulsarInfo = GsonUtil.fromJson(mqExtInfo.toString(), InlongGroupPulsarInfo.class);
inlongGroupResponse.setMqExtInfo(pulsarInfo);
}
}
return inlongGroupResponse;
}

public static PageInfo<InlongGroupListResponse> parseGroupList(Response response) {
Expand All @@ -78,12 +99,51 @@ public static InlongStreamInfo parseStreamInfo(Response response) {
return GsonUtil.fromJson(GsonUtil.toJson(data), InlongStreamInfo.class);
}

public static PageInfo<FullStreamResponse> parseStreamList(Response response) {
public static List<FullStreamResponse> parseStreamList(Response response) {
Object data = response.getData();
String pageInfoJson = GsonUtil.toJson(data);
return GsonUtil.fromJson(pageInfoJson,
new TypeToken<PageInfo<FullStreamResponse>>() {
}.getType());
JsonObject pageInfoJson = GsonUtil.fromJson(GsonUtil.toJson(data), JsonObject.class);
JsonArray fullStreamArray = pageInfoJson.getAsJsonArray("list");
List<FullStreamResponse> list = Lists.newArrayList();
for (int i = 0; i < fullStreamArray.size(); i++) {
JsonObject fullStreamJson = (JsonObject) fullStreamArray.get(i);
FullStreamResponse fullStreamResponse = GsonUtil.fromJson(fullStreamJson.toString(),
FullStreamResponse.class);
list.add(fullStreamResponse);
//Parse sinkResponse in each stream
JsonArray sinkJsonArr = fullStreamJson.getAsJsonArray(SINK_INFO);
List<SinkResponse> sinkResponses = Lists.newArrayList();
fullStreamResponse.setSinkInfo(sinkResponses);
for (int j = 0; j < sinkJsonArr.size(); j++) {
JsonObject sinkJson = (JsonObject) sinkJsonArr.get(i);
String type = sinkJson.get(SINK_TYPE).getAsString();
SinkType sinkType = SinkType.forType(type);
switch (sinkType) {
case HIVE:
HiveSinkResponse hiveSinkResponse = GsonUtil.fromJson(sinkJson.toString(),
HiveSinkResponse.class);
sinkResponses.add(hiveSinkResponse);
break;
case KAFKA:
KafkaSinkResponse kafkaSinkResponse = GsonUtil.fromJson(sinkJson.toString(),
KafkaSinkResponse.class);
sinkResponses.add(kafkaSinkResponse);
break;
case ICEBERG:
IcebergSinkResponse icebergSinkResponse = GsonUtil.fromJson(sinkJson.toString(),
IcebergSinkResponse.class);
sinkResponses.add(icebergSinkResponse);
break;
case CLICKHOUSE:
ClickHouseSinkResponse clickHouseSinkResponse = GsonUtil.fromJson(sinkJson.toString(),
ClickHouseSinkResponse.class);
sinkResponses.add(clickHouseSinkResponse);
break;
default:
throw new RuntimeException(String.format("Unsupport sinkType=%s for Inlong", sinkType));
}
}
}
return list;
}

public static PageInfo<SourceListResponse> parseSourceList(Response response) {
Expand All @@ -107,9 +167,9 @@ public static PageInfo<SourceListResponse> parseSourceList(Response response) {
}
throw new IllegalArgumentException(
String.format("Unsupported sourceType=%s for Inlong", sourceType));

} else {
return new PageInfo<>();
}
throw new IllegalArgumentException(String.format("pageInfo is empty for Inlong"));
}

public static PageInfo<SinkListResponse> parseSinkList(Response response) {
Expand All @@ -121,25 +181,24 @@ public static PageInfo<SinkListResponse> parseSinkList(Response response) {
}

public static Pair<InlongGroupApproveRequest, List<InlongStreamApproveRequest>> parseGroupForm(String formJson) {
final String groupInfoField = "groupInfo";
final String mqExtInfoField = "mqExtInfo";
JsonObject formData = GsonUtil.fromJson(formJson, JsonObject.class);
JsonObject groupJson = formData.getAsJsonObject(groupInfoField);
JsonObject groupJson = formData.getAsJsonObject(GROUP_INFO);
InlongGroupApproveRequest groupApproveInfo = GsonUtil.fromJson(groupJson.toString(),
InlongGroupApproveRequest.class);
JsonObject mqExtInfo = groupJson.getAsJsonObject(mqExtInfoField);
if (mqExtInfo != null && mqExtInfo.get("middlewareType") != null
&& Constant.MIDDLEWARE_PULSAR.equals(mqExtInfo.get("middlewareType").getAsString())) {
InlongGroupPulsarInfo pulsarInfo = GsonUtil.fromJson(mqExtInfo.toString(), InlongGroupPulsarInfo.class);
groupApproveInfo.setAckQuorum(pulsarInfo.getAckQuorum());
groupApproveInfo.setEnsemble(pulsarInfo.getEnsemble());
groupApproveInfo.setWriteQuorum(pulsarInfo.getWriteQuorum());
groupApproveInfo.setRetentionTime(pulsarInfo.getRetentionTime());
groupApproveInfo.setRetentionTimeUnit(pulsarInfo.getRetentionTimeUnit());
groupApproveInfo.setTtl(pulsarInfo.getTtl());
groupApproveInfo.setTtlUnit(pulsarInfo.getTtlUnit());
groupApproveInfo.setRetentionSize(pulsarInfo.getRetentionSize());
groupApproveInfo.setRetentionSizeUnit(pulsarInfo.getRetentionSizeUnit());
JsonObject mqExtInfo = groupJson.getAsJsonObject(MQ_EXT_INFO);
if (mqExtInfo != null && mqExtInfo.get(MIDDLEWARE_TYPE) != null) {
if (Constant.MIDDLEWARE_PULSAR.equals(mqExtInfo.get(MIDDLEWARE_TYPE).getAsString())) {
InlongGroupPulsarInfo pulsarInfo = GsonUtil.fromJson(mqExtInfo.toString(), InlongGroupPulsarInfo.class);
groupApproveInfo.setAckQuorum(pulsarInfo.getAckQuorum());
groupApproveInfo.setEnsemble(pulsarInfo.getEnsemble());
groupApproveInfo.setWriteQuorum(pulsarInfo.getWriteQuorum());
groupApproveInfo.setRetentionTime(pulsarInfo.getRetentionTime());
groupApproveInfo.setRetentionTimeUnit(pulsarInfo.getRetentionTimeUnit());
groupApproveInfo.setTtl(pulsarInfo.getTtl());
groupApproveInfo.setTtlUnit(pulsarInfo.getTtlUnit());
groupApproveInfo.setRetentionSize(pulsarInfo.getRetentionSize());
groupApproveInfo.setRetentionSizeUnit(pulsarInfo.getRetentionSizeUnit());
}
}
JsonArray streamJson = formData.getAsJsonArray("streamInfoList");
List<InlongStreamApproveRequest> streamApproveList = GsonUtil.fromJson(streamJson.toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@

import java.util.Date;
import java.util.List;
import lombok.NoArgsConstructor;

/**
* Response of the stream sink
*/
@Data
@ApiModel("Response of the stream sink")
@NoArgsConstructor
public class SinkResponse {

private Integer id;
Expand All @@ -40,12 +42,12 @@ public class SinkResponse {
@ApiModelProperty("Inlong stream id")
private String inlongStreamId;

@ApiModelProperty("Sink type, including: HIVE, ES, etc.")
private String sinkType;

@ApiModelProperty("Sink name, unique in one stream.")
private String sinkName;

@ApiModelProperty("Sink type, including: HIVE, ES, etc.")
protected String sinkType;

@ApiModelProperty("Data storage period, unit: day")
private Integer storagePeriod;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
@ApiModel(value = "Response of the ClickHouse sink")
public class ClickHouseSinkResponse extends SinkResponse {

private String sinkType = Constant.SINK_CLICKHOUSE;
public ClickHouseSinkResponse() {
this.sinkType = Constant.SINK_CLICKHOUSE;
}

@ApiModelProperty("ClickHouse JDBC URL")
private String jdbcUrl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
@ApiModel(value = "Response of the Hive sink")
public class HiveSinkResponse extends SinkResponse {

private String sinkType = Constant.SINK_HIVE;
public HiveSinkResponse() {
this.sinkType = Constant.SINK_HIVE;
}

@ApiModelProperty("Hive JDBC URL")
private String jdbcUrl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
@ApiModel(value = "Response of the Iceberg sink")
public class IcebergSinkResponse extends SinkResponse {

private String sinkType = Constant.SINK_ICEBERG;
public IcebergSinkResponse() {
this.sinkType = Constant.SINK_ICEBERG;
}

@ApiModelProperty("table Location like hdfs://")
private String tableLocation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
@ApiModel(value = "Response of the Kafka sink")
public class KafkaSinkResponse extends SinkResponse {

private String sinkType = Constant.SINK_KAFKA;
public KafkaSinkResponse() {
this.sinkType = Constant.SINK_KAFKA;
}

@ApiModelProperty("Kafka bootstrap servers")
private String address;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ CREATE TABLE `stream_source`
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
`source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
`source_name` varchar(256) NOT NULL DEFAULT '' COMMENT 'source_name',
`source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
`agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task',
`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
`server_id` int(11) DEFAULT NULL COMMENT 'Id of the source server',
Expand Down Expand Up @@ -592,7 +592,7 @@ CREATE TABLE `stream_sink`
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Owning inlong group id',
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Owning inlong stream id',
`sink_type` varchar(15) DEFAULT 'HIVE' COMMENT 'Sink type, including: HIVE, ES, etc',
`sink_name` varchar(256) NOT NULL DEFAULT '' COMMENT 'Sink name',
`sink_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'Sink name',
`storage_period` int(11) DEFAULT '10' COMMENT 'Data storage period, unit: day',
`enable_create_resource` tinyint(1) DEFAULT '1' COMMENT 'Whether to enable create sink resource? 0: disable, 1: enable. default is 1',
`ext_params` text COMMENT 'Another fields, will saved as JSON type',
Expand Down
6 changes: 3 additions & 3 deletions inlong-manager/manager-web/sql/apache_inlong_manager.sql
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ CREATE TABLE `stream_source`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
`source_name` varchar(256) NOT NULL DEFAULT '' COMMENT 'source_name',
`source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
`source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
`agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task',
`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
Expand Down Expand Up @@ -625,11 +625,11 @@ CREATE TABLE `stream_sink`
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Owning inlong group id',
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Owning inlong stream id',
`sink_type` varchar(15) DEFAULT 'HIVE' COMMENT 'Sink type, including: HIVE, ES, etc',
`sink_name` varchar(256) NOT NULL DEFAULT '' COMMENT 'Sink name',
`sink_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'Sink name',
`storage_period` int(11) DEFAULT '10' COMMENT 'Data storage period, unit: day',
`enable_create_resource` tinyint(1) DEFAULT '1' COMMENT 'Whether to enable create sink resource? 0: disable, 1: enable. default is 1',
`ext_params` text COMMENT 'Another fields, will saved as JSON type',
`operate_log` varchar(5000) DEFAULT NULL COMMENT 'Background operate log',
`operate_log` text DEFAULT NULL COMMENT 'Background operate log',
`status` int(11) DEFAULT '0' COMMENT 'Status',
`previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
Expand Down