Skip to content

Commit 529bb2e

Browse files
authored
[INLONG-2898][Manager] Fix Json parsing exceptions #2899
1 parent d918248 commit 529bb2e

File tree

10 files changed

+112
-44
lines changed

10 files changed

+112
-44
lines changed

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ private List<InlongStream> fetchDataStreams(String groupId) {
216216
InlongStreamImpl inlongStream = new InlongStreamImpl(fullStreamResponse, stream);
217217
if (CollectionUtils.isNotEmpty(sourceListResponses)) {
218218
for (SourceListResponse response : sourceListResponses) {
219-
inlongStream.addSource(
219+
inlongStream.updateSource(
220220
InlongStreamSourceTransfer.parseStreamSource(response));
221221
}
222222
}

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -337,8 +337,7 @@ public List<FullStreamResponse> listStreamInfo(String inlongGroupId) {
337337
org.apache.inlong.manager.common.beans.Response responseBody = InlongParser.parseResponse(body);
338338
AssertUtil.isTrue(responseBody.getErrMsg() == null,
339339
String.format("Inlong request failed: %s", responseBody.getErrMsg()));
340-
PageInfo<FullStreamResponse> pageInfo = InlongParser.parseStreamList(responseBody);
341-
return pageInfo.getList();
340+
return InlongParser.parseStreamList(responseBody);
342341
} catch (Exception e) {
343342
throw new RuntimeException(String.format("List inlong streams failed: %s", e.getMessage()), e);
344343
}

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java

+88-29
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,30 @@
1717

1818
package org.apache.inlong.manager.client.api.util;
1919

20+
import static org.apache.inlong.manager.common.enums.SourceType.BINLOG;
21+
import static org.apache.inlong.manager.common.enums.SourceType.KAFKA;
22+
2023
import com.github.pagehelper.PageInfo;
24+
import com.google.common.collect.Lists;
2125
import com.google.common.reflect.TypeToken;
2226
import com.google.gson.JsonArray;
2327
import com.google.gson.JsonObject;
28+
import java.util.List;
2429
import org.apache.commons.lang3.tuple.Pair;
2530
import org.apache.inlong.manager.common.beans.Response;
2631
import org.apache.inlong.manager.common.enums.Constant;
32+
import org.apache.inlong.manager.common.enums.SinkType;
2733
import org.apache.inlong.manager.common.enums.SourceType;
2834
import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
2935
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
3036
import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
3137
import org.apache.inlong.manager.common.pojo.group.InlongGroupResponse;
3238
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
39+
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
40+
import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkResponse;
41+
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
42+
import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkResponse;
43+
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
3344
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
3445
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
3546
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
@@ -39,16 +50,17 @@
3950
import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
4051
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
4152

42-
import java.util.List;
43-
44-
import static org.apache.inlong.manager.common.enums.SourceType.BINLOG;
45-
import static org.apache.inlong.manager.common.enums.SourceType.KAFKA;
46-
4753
/**
4854
* Parser for Inlong entity
4955
*/
5056
public class InlongParser {
5157

58+
public static final String GROUP_INFO = "groupInfo";
59+
public static final String MQ_EXT_INFO = "mqExtInfo";
60+
public static final String MIDDLEWARE_TYPE = "middlewareType";
61+
public static final String SINK_INFO = "sinkInfo";
62+
public static final String SINK_TYPE = "sinkType";
63+
5264
public static Response parseResponse(String responseBody) {
5365
Response response = GsonUtil.fromJson(responseBody, Response.class);
5466
return response;
@@ -62,7 +74,16 @@ public static WorkflowResult parseWorkflowResult(Response response) {
6274

6375
public static InlongGroupResponse parseGroupInfo(Response response) {
6476
Object data = response.getData();
65-
return GsonUtil.fromJson(GsonUtil.toJson(data), InlongGroupResponse.class);
77+
JsonObject groupJson = GsonUtil.fromJson(GsonUtil.toJson(data), JsonObject.class);
78+
InlongGroupResponse inlongGroupResponse = GsonUtil.fromJson(GsonUtil.toJson(data), InlongGroupResponse.class);
79+
JsonObject mqExtInfo = groupJson.getAsJsonObject(MQ_EXT_INFO);
80+
if (mqExtInfo != null && mqExtInfo.get(MIDDLEWARE_TYPE) != null) {
81+
if (Constant.MIDDLEWARE_PULSAR.equals(mqExtInfo.get(MIDDLEWARE_TYPE).getAsString())) {
82+
InlongGroupPulsarInfo pulsarInfo = GsonUtil.fromJson(mqExtInfo.toString(), InlongGroupPulsarInfo.class);
83+
inlongGroupResponse.setMqExtInfo(pulsarInfo);
84+
}
85+
}
86+
return inlongGroupResponse;
6687
}
6788

6889
public static PageInfo<InlongGroupListResponse> parseGroupList(Response response) {
@@ -78,12 +99,51 @@ public static InlongStreamInfo parseStreamInfo(Response response) {
7899
return GsonUtil.fromJson(GsonUtil.toJson(data), InlongStreamInfo.class);
79100
}
80101

81-
public static PageInfo<FullStreamResponse> parseStreamList(Response response) {
102+
public static List<FullStreamResponse> parseStreamList(Response response) {
82103
Object data = response.getData();
83-
String pageInfoJson = GsonUtil.toJson(data);
84-
return GsonUtil.fromJson(pageInfoJson,
85-
new TypeToken<PageInfo<FullStreamResponse>>() {
86-
}.getType());
104+
JsonObject pageInfoJson = GsonUtil.fromJson(GsonUtil.toJson(data), JsonObject.class);
105+
JsonArray fullStreamArray = pageInfoJson.getAsJsonArray("list");
106+
List<FullStreamResponse> list = Lists.newArrayList();
107+
for (int i = 0; i < fullStreamArray.size(); i++) {
108+
JsonObject fullStreamJson = (JsonObject) fullStreamArray.get(i);
109+
FullStreamResponse fullStreamResponse = GsonUtil.fromJson(fullStreamJson.toString(),
110+
FullStreamResponse.class);
111+
list.add(fullStreamResponse);
112+
//Parse sinkResponse in each stream
113+
JsonArray sinkJsonArr = fullStreamJson.getAsJsonArray(SINK_INFO);
114+
List<SinkResponse> sinkResponses = Lists.newArrayList();
115+
fullStreamResponse.setSinkInfo(sinkResponses);
116+
for (int j = 0; j < sinkJsonArr.size(); j++) {
117+
JsonObject sinkJson = (JsonObject) sinkJsonArr.get(i);
118+
String type = sinkJson.get(SINK_TYPE).getAsString();
119+
SinkType sinkType = SinkType.forType(type);
120+
switch (sinkType) {
121+
case HIVE:
122+
HiveSinkResponse hiveSinkResponse = GsonUtil.fromJson(sinkJson.toString(),
123+
HiveSinkResponse.class);
124+
sinkResponses.add(hiveSinkResponse);
125+
break;
126+
case KAFKA:
127+
KafkaSinkResponse kafkaSinkResponse = GsonUtil.fromJson(sinkJson.toString(),
128+
KafkaSinkResponse.class);
129+
sinkResponses.add(kafkaSinkResponse);
130+
break;
131+
case ICEBERG:
132+
IcebergSinkResponse icebergSinkResponse = GsonUtil.fromJson(sinkJson.toString(),
133+
IcebergSinkResponse.class);
134+
sinkResponses.add(icebergSinkResponse);
135+
break;
136+
case CLICKHOUSE:
137+
ClickHouseSinkResponse clickHouseSinkResponse = GsonUtil.fromJson(sinkJson.toString(),
138+
ClickHouseSinkResponse.class);
139+
sinkResponses.add(clickHouseSinkResponse);
140+
break;
141+
default:
142+
throw new RuntimeException(String.format("Unsupport sinkType=%s for Inlong", sinkType));
143+
}
144+
}
145+
}
146+
return list;
87147
}
88148

89149
public static PageInfo<SourceListResponse> parseSourceList(Response response) {
@@ -107,9 +167,9 @@ public static PageInfo<SourceListResponse> parseSourceList(Response response) {
107167
}
108168
throw new IllegalArgumentException(
109169
String.format("Unsupported sourceType=%s for Inlong", sourceType));
110-
170+
} else {
171+
return new PageInfo<>();
111172
}
112-
throw new IllegalArgumentException(String.format("pageInfo is empty for Inlong"));
113173
}
114174

115175
public static PageInfo<SinkListResponse> parseSinkList(Response response) {
@@ -121,25 +181,24 @@ public static PageInfo<SinkListResponse> parseSinkList(Response response) {
121181
}
122182

123183
public static Pair<InlongGroupApproveRequest, List<InlongStreamApproveRequest>> parseGroupForm(String formJson) {
124-
final String groupInfoField = "groupInfo";
125-
final String mqExtInfoField = "mqExtInfo";
126184
JsonObject formData = GsonUtil.fromJson(formJson, JsonObject.class);
127-
JsonObject groupJson = formData.getAsJsonObject(groupInfoField);
185+
JsonObject groupJson = formData.getAsJsonObject(GROUP_INFO);
128186
InlongGroupApproveRequest groupApproveInfo = GsonUtil.fromJson(groupJson.toString(),
129187
InlongGroupApproveRequest.class);
130-
JsonObject mqExtInfo = groupJson.getAsJsonObject(mqExtInfoField);
131-
if (mqExtInfo != null && mqExtInfo.get("middlewareType") != null
132-
&& Constant.MIDDLEWARE_PULSAR.equals(mqExtInfo.get("middlewareType").getAsString())) {
133-
InlongGroupPulsarInfo pulsarInfo = GsonUtil.fromJson(mqExtInfo.toString(), InlongGroupPulsarInfo.class);
134-
groupApproveInfo.setAckQuorum(pulsarInfo.getAckQuorum());
135-
groupApproveInfo.setEnsemble(pulsarInfo.getEnsemble());
136-
groupApproveInfo.setWriteQuorum(pulsarInfo.getWriteQuorum());
137-
groupApproveInfo.setRetentionTime(pulsarInfo.getRetentionTime());
138-
groupApproveInfo.setRetentionTimeUnit(pulsarInfo.getRetentionTimeUnit());
139-
groupApproveInfo.setTtl(pulsarInfo.getTtl());
140-
groupApproveInfo.setTtlUnit(pulsarInfo.getTtlUnit());
141-
groupApproveInfo.setRetentionSize(pulsarInfo.getRetentionSize());
142-
groupApproveInfo.setRetentionSizeUnit(pulsarInfo.getRetentionSizeUnit());
188+
JsonObject mqExtInfo = groupJson.getAsJsonObject(MQ_EXT_INFO);
189+
if (mqExtInfo != null && mqExtInfo.get(MIDDLEWARE_TYPE) != null) {
190+
if (Constant.MIDDLEWARE_PULSAR.equals(mqExtInfo.get(MIDDLEWARE_TYPE).getAsString())) {
191+
InlongGroupPulsarInfo pulsarInfo = GsonUtil.fromJson(mqExtInfo.toString(), InlongGroupPulsarInfo.class);
192+
groupApproveInfo.setAckQuorum(pulsarInfo.getAckQuorum());
193+
groupApproveInfo.setEnsemble(pulsarInfo.getEnsemble());
194+
groupApproveInfo.setWriteQuorum(pulsarInfo.getWriteQuorum());
195+
groupApproveInfo.setRetentionTime(pulsarInfo.getRetentionTime());
196+
groupApproveInfo.setRetentionTimeUnit(pulsarInfo.getRetentionTimeUnit());
197+
groupApproveInfo.setTtl(pulsarInfo.getTtl());
198+
groupApproveInfo.setTtlUnit(pulsarInfo.getTtlUnit());
199+
groupApproveInfo.setRetentionSize(pulsarInfo.getRetentionSize());
200+
groupApproveInfo.setRetentionSizeUnit(pulsarInfo.getRetentionSizeUnit());
201+
}
143202
}
144203
JsonArray streamJson = formData.getAsJsonArray("streamInfoList");
145204
List<InlongStreamApproveRequest> streamApproveList = GsonUtil.fromJson(streamJson.toString(),

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424

2525
import java.util.Date;
2626
import java.util.List;
27+
import lombok.NoArgsConstructor;
2728

2829
/**
2930
* Response of the stream sink
3031
*/
3132
@Data
3233
@ApiModel("Response of the stream sink")
34+
@NoArgsConstructor
3335
public class SinkResponse {
3436

3537
private Integer id;
@@ -40,12 +42,12 @@ public class SinkResponse {
4042
@ApiModelProperty("Inlong stream id")
4143
private String inlongStreamId;
4244

43-
@ApiModelProperty("Sink type, including: HIVE, ES, etc.")
44-
private String sinkType;
45-
4645
@ApiModelProperty("Sink name, unique in one stream.")
4746
private String sinkName;
4847

48+
@ApiModelProperty("Sink type, including: HIVE, ES, etc.")
49+
protected String sinkType;
50+
4951
@ApiModelProperty("Data storage period, unit: day")
5052
private Integer storagePeriod;
5153

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
@ApiModel(value = "Response of the ClickHouse sink")
3535
public class ClickHouseSinkResponse extends SinkResponse {
3636

37-
private String sinkType = Constant.SINK_CLICKHOUSE;
37+
public ClickHouseSinkResponse() {
38+
this.sinkType = Constant.SINK_CLICKHOUSE;
39+
}
3840

3941
@ApiModelProperty("ClickHouse JDBC URL")
4042
private String jdbcUrl;

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
@ApiModel(value = "Response of the Hive sink")
3535
public class HiveSinkResponse extends SinkResponse {
3636

37-
private String sinkType = Constant.SINK_HIVE;
37+
public HiveSinkResponse() {
38+
this.sinkType = Constant.SINK_HIVE;
39+
}
3840

3941
@ApiModelProperty("Hive JDBC URL")
4042
private String jdbcUrl;

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkResponse.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
@ApiModel(value = "Response of the Iceberg sink")
3535
public class IcebergSinkResponse extends SinkResponse {
3636

37-
private String sinkType = Constant.SINK_ICEBERG;
37+
public IcebergSinkResponse() {
38+
this.sinkType = Constant.SINK_ICEBERG;
39+
}
3840

3941
@ApiModelProperty("table Location like hdfs://")
4042
private String tableLocation;

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
@ApiModel(value = "Response of the Kafka sink")
3535
public class KafkaSinkResponse extends SinkResponse {
3636

37-
private String sinkType = Constant.SINK_KAFKA;
37+
public KafkaSinkResponse() {
38+
this.sinkType = Constant.SINK_KAFKA;
39+
}
3840

3941
@ApiModelProperty("Kafka bootstrap servers")
4042
private String address;

inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql

+2-2
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ CREATE TABLE `stream_source`
561561
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
562562
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
563563
`source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
564-
`source_name` varchar(256) NOT NULL DEFAULT '' COMMENT 'source_name',
564+
`source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
565565
`agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task',
566566
`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
567567
`server_id` int(11) DEFAULT NULL COMMENT 'Id of the source server',
@@ -592,7 +592,7 @@ CREATE TABLE `stream_sink`
592592
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Owning inlong group id',
593593
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Owning inlong stream id',
594594
`sink_type` varchar(15) DEFAULT 'HIVE' COMMENT 'Sink type, including: HIVE, ES, etc',
595-
`sink_name` varchar(256) NOT NULL DEFAULT '' COMMENT 'Sink name',
595+
`sink_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'Sink name',
596596
`storage_period` int(11) DEFAULT '10' COMMENT 'Data storage period, unit: day',
597597
`enable_create_resource` tinyint(1) DEFAULT '1' COMMENT 'Whether to enable create sink resource? 0: disable, 1: enable. default is 1',
598598
`ext_params` text COMMENT 'Another fields, will saved as JSON type',

inlong-manager/manager-web/sql/apache_inlong_manager.sql

+3-3
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,7 @@ CREATE TABLE `stream_source`
592592
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
593593
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
594594
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
595-
`source_name` varchar(256) NOT NULL DEFAULT '' COMMENT 'source_name',
595+
`source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
596596
`source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
597597
`agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task',
598598
`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
@@ -625,11 +625,11 @@ CREATE TABLE `stream_sink`
625625
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Owning inlong group id',
626626
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Owning inlong stream id',
627627
`sink_type` varchar(15) DEFAULT 'HIVE' COMMENT 'Sink type, including: HIVE, ES, etc',
628-
`sink_name` varchar(256) NOT NULL DEFAULT '' COMMENT 'Sink name',
628+
`sink_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'Sink name',
629629
`storage_period` int(11) DEFAULT '10' COMMENT 'Data storage period, unit: day',
630630
`enable_create_resource` tinyint(1) DEFAULT '1' COMMENT 'Whether to enable create sink resource? 0: disable, 1: enable. default is 1',
631631
`ext_params` text COMMENT 'Another fields, will saved as JSON type',
632-
`operate_log` varchar(5000) DEFAULT NULL COMMENT 'Background operate log',
632+
`operate_log` text DEFAULT NULL COMMENT 'Background operate log',
633633
`status` int(11) DEFAULT '0' COMMENT 'Status',
634634
`previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
635635
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',

0 commit comments

Comments
 (0)