Skip to content

Commit dd3cf6e

Browse files
authored
[INLONG-5380][Manager] Modify the saving function of the data node (#5381)
1 parent cbfe2f1 commit dd3cf6e

File tree

21 files changed

+724
-116
lines changed

21 files changed

+724
-116
lines changed

inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java

+7-10
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@
6262
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
6363
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
6464
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarRequest;
65-
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
6665
import org.apache.inlong.manager.pojo.node.DataNodeResponse;
66+
import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
6767
import org.apache.inlong.manager.pojo.sink.StreamSink;
6868
import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSink;
6969
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
@@ -965,9 +965,8 @@ void testSaveDataNode() {
965965
Response.success(1))
966966
))
967967
);
968-
DataNodeRequest request = new DataNodeRequest();
969-
request.setName("test_node");
970-
request.setType(DataNodeType.HIVE);
968+
HiveDataNodeRequest request = new HiveDataNodeRequest();
969+
request.setName("test_hive_node");
971970
Integer nodeId = dataNodeClient.save(request);
972971
Assertions.assertEquals(1, nodeId);
973972
}
@@ -1007,9 +1006,8 @@ void testListDataNode() {
10071006
)
10081007
);
10091008

1010-
DataNodeRequest request = new DataNodeRequest();
1011-
request.setName("test_node");
1012-
request.setToken(DataNodeType.HIVE);
1009+
HiveDataNodeRequest request = new HiveDataNodeRequest();
1010+
request.setName("test_hive_node");
10131011
PageInfo<DataNodeResponse> nodePageInfo = dataNodeClient.list(request);
10141012
Assertions.assertEquals(JsonUtils.toJsonString(nodePageInfo.getList()), JsonUtils.toJsonString(nodeResponses));
10151013
}
@@ -1025,10 +1023,9 @@ void testUpdateDataNode() {
10251023
)
10261024
);
10271025

1028-
DataNodeRequest request = new DataNodeRequest();
1026+
HiveDataNodeRequest request = new HiveDataNodeRequest();
10291027
request.setId(1);
1030-
request.setName("test_node");
1031-
request.setType(DataNodeType.HIVE);
1028+
request.setName("test_hive_node");
10321029
Boolean isUpdate = dataNodeClient.update(request);
10331030
Assertions.assertTrue(isUpdate);
10341031
}

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java

+3
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ public enum ErrorCodeEnum {
6060
CLUSTER_TYPE_NOT_SUPPORTED(1102, "Cluster type '%s' not supported"),
6161
CLUSTER_INFO_INCORRECT(1103, "Cluster info was incorrect"),
6262

63+
DATA_NODE_NOT_FOUND(1150, "Data node information does not exist"),
64+
DATA_NODE_TYPE_NOT_SUPPORTED(1151, "Data node type '%s' not supported"),
65+
6366
STREAM_NOT_FOUND(1201, "Inlong stream does not exist/no operation permission"),
6467
STREAM_ID_DUPLICATE(1202, "The current inlong group has a inlong stream with the same ID"),
6568
STREAM_OPT_NOT_ALLOWED(1203,

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java

+20-6
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public static <T> T request(RestTemplate restTemplate, String url, HttpMethod me
9898
/**
9999
* Send an HTTP request
100100
*/
101-
public <T> T request(RestTemplate restTemplate, String url, HttpMethod httpMethod, Object requestBody,
101+
public static <T> T request(RestTemplate restTemplate, String url, HttpMethod httpMethod, Object requestBody,
102102
HttpHeaders header, ParameterizedTypeReference<T> typeReference) {
103103
if (log.isDebugEnabled()) {
104104
log.debug("begin request to {} by request body {}", url, GSON.toJson(requestBody));
@@ -112,17 +112,31 @@ public <T> T request(RestTemplate restTemplate, String url, HttpMethod httpMetho
112112
return response.getBody();
113113
}
114114

115-
public <T> T postRequest(RestTemplate restTemplate, String url, Object params, HttpHeaders header,
115+
/**
116+
* Send GET request to the specified URL.
117+
*/
118+
public static <T> T getRequest(RestTemplate restTemplate, String url, Map<String, Object> params,
119+
HttpHeaders header, ParameterizedTypeReference<T> typeReference) {
120+
return request(restTemplate, buildUrlWithQueryParam(url, params), HttpMethod.GET, null, header, typeReference);
121+
}
122+
123+
/**
124+
* Send PUT request to the specified URL.
125+
*/
126+
public static <T> T putRequest(RestTemplate restTemplate, String url, Object params, HttpHeaders header,
116127
ParameterizedTypeReference<T> typeReference) {
117-
return request(restTemplate, url, HttpMethod.POST, params, header, typeReference);
128+
return request(restTemplate, url, HttpMethod.PUT, params, header, typeReference);
118129
}
119130

120-
public <T> T getRequest(RestTemplate restTemplate, String url, Map<String, Object> params, HttpHeaders header,
131+
/**
132+
* Send POST request to the specified URL.
133+
*/
134+
public static <T> T postRequest(RestTemplate restTemplate, String url, Object params, HttpHeaders header,
121135
ParameterizedTypeReference<T> typeReference) {
122-
return request(restTemplate, buildUrlWithQueryParam(url, params), HttpMethod.GET, null, header, typeReference);
136+
return request(restTemplate, url, HttpMethod.POST, params, header, typeReference);
123137
}
124138

125-
private String buildUrlWithQueryParam(String url, Map<String, Object> params) {
139+
private static String buildUrlWithQueryParam(String url, Map<String, Object> params) {
126140
if (params == null) {
127141
return url;
128142
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.manager.pojo.node;
19+
20+
import com.fasterxml.jackson.annotation.JsonFormat;
21+
import com.fasterxml.jackson.annotation.JsonTypeInfo;
22+
import io.swagger.annotations.ApiModel;
23+
import io.swagger.annotations.ApiModelProperty;
24+
import lombok.AllArgsConstructor;
25+
import lombok.Data;
26+
import lombok.NoArgsConstructor;
27+
28+
import java.util.Date;
29+
30+
/**
31+
* Data node info
32+
*/
33+
@Data
34+
@NoArgsConstructor
35+
@AllArgsConstructor
36+
@ApiModel("Data node info")
37+
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type")
38+
public abstract class DataNodeInfo {
39+
40+
@ApiModelProperty(value = "Primary key")
41+
private Integer id;
42+
43+
@ApiModelProperty(value = "Data node name")
44+
private String name;
45+
46+
@ApiModelProperty(value = "Data node type, including MYSQL, HIVE, KAFKA, ES, etc.")
47+
private String type;
48+
49+
@ApiModelProperty(value = "Data node URL")
50+
private String url;
51+
52+
@ApiModelProperty("Data node username")
53+
private String username;
54+
55+
@ApiModelProperty(value = "Data node token if needed")
56+
private String token;
57+
58+
@ApiModelProperty(value = "Extended params")
59+
private String extParams;
60+
61+
@ApiModelProperty(value = "Description of the data node")
62+
private String description;
63+
64+
@ApiModelProperty(value = "Name of in charges, separated by commas")
65+
private String inCharges;
66+
67+
@ApiModelProperty(value = "Name of in creator")
68+
private String creator;
69+
70+
@ApiModelProperty(value = "Name of in modifier")
71+
private String modifier;
72+
73+
@ApiModelProperty(value = "Version number")
74+
private Integer version;
75+
76+
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
77+
private Date createTime;
78+
79+
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
80+
private Date modifyTime;
81+
82+
public abstract DataNodeRequest genRequest();
83+
}

inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodePageRequest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@
3131
@ApiModel("Data node paging query request")
3232
public class DataNodePageRequest extends PageRequest {
3333

34-
@ApiModelProperty(value = "Node type, including MYSQL, HIVE, KAFKA, ES, etc.")
34+
@ApiModelProperty(value = "Data node type, including MYSQL, HIVE, KAFKA, ES, etc.")
3535
private String type;
3636

37-
@ApiModelProperty(value = "Node name")
37+
@ApiModelProperty(value = "Data node name")
3838
private String name;
3939

4040
@ApiModelProperty(value = "Keywords, name, url, etc.")

inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717

1818
package org.apache.inlong.manager.pojo.node;
1919

20+
import com.fasterxml.jackson.annotation.JsonTypeInfo;
2021
import io.swagger.annotations.ApiModel;
2122
import io.swagger.annotations.ApiModelProperty;
2223
import lombok.AllArgsConstructor;
23-
import lombok.Builder;
2424
import lombok.Data;
2525
import lombok.NoArgsConstructor;
2626
import org.apache.inlong.manager.common.validation.UpdateValidation;
@@ -32,31 +32,31 @@
3232
* Data node request
3333
*/
3434
@Data
35-
@Builder
3635
@NoArgsConstructor
3736
@AllArgsConstructor
3837
@ApiModel("Data node request")
39-
public class DataNodeRequest {
38+
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type")
39+
public abstract class DataNodeRequest {
4040

4141
@NotNull(groups = UpdateValidation.class)
4242
@ApiModelProperty(value = "Primary key")
4343
private Integer id;
4444

4545
@NotBlank(message = "node name cannot be blank")
46-
@ApiModelProperty(value = "Node name")
46+
@ApiModelProperty(value = "Data node name")
4747
private String name;
4848

4949
@NotBlank(message = "node type cannot be blank")
50-
@ApiModelProperty(value = "Node type, including MYSQL, HIVE, KAFKA, ES, etc.")
50+
@ApiModelProperty(value = "Data node type, including MYSQL, HIVE, KAFKA, ES, etc.")
5151
private String type;
5252

53-
@ApiModelProperty(value = "Node url")
53+
@ApiModelProperty(value = "Data node URL")
5454
private String url;
5555

56-
@ApiModelProperty(value = "Node username")
56+
@ApiModelProperty(value = "Data node username")
5757
private String username;
5858

59-
@ApiModelProperty(value = "Node token if needed")
59+
@ApiModelProperty(value = "Data node token if needed")
6060
private String token;
6161

6262
@ApiModelProperty(value = "Extended params")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.manager.pojo.node.hive;
19+
20+
import com.fasterxml.jackson.databind.DeserializationFeature;
21+
import com.fasterxml.jackson.databind.ObjectMapper;
22+
import io.swagger.annotations.ApiModel;
23+
import io.swagger.annotations.ApiModelProperty;
24+
import lombok.AllArgsConstructor;
25+
import lombok.Builder;
26+
import lombok.Data;
27+
import lombok.NoArgsConstructor;
28+
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
29+
import org.apache.inlong.manager.common.exceptions.BusinessException;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
import javax.validation.constraints.NotNull;
34+
35+
/**
36+
* Hive data node info
37+
*/
38+
@Data
39+
@Builder
40+
@NoArgsConstructor
41+
@AllArgsConstructor
42+
@ApiModel("Hive data node info")
43+
public class HiveDataNodeDTO {
44+
45+
private static final Logger LOGGER = LoggerFactory.getLogger(HiveDataNodeDTO.class);
46+
47+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
48+
49+
@ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
50+
private String jdbcUrl;
51+
52+
@ApiModelProperty("Version for Hive, such as: 3.2.1")
53+
private String hiveVersion;
54+
55+
@ApiModelProperty("Config directory of Hive on HDFS, needed by sort in light mode, must include hive-site.xml")
56+
private String hiveConfDir;
57+
58+
@ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
59+
private String hdfsPath;
60+
61+
@ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
62+
private String warehouse;
63+
64+
@ApiModelProperty("User and group information for writing data to HDFS")
65+
private String hdfsUgi;
66+
67+
/**
68+
* Get the dto instance from the request
69+
*/
70+
public static HiveDataNodeDTO getFromRequest(HiveDataNodeRequest request) throws Exception {
71+
return HiveDataNodeDTO.builder()
72+
.jdbcUrl(request.getJdbcUrl())
73+
.hiveVersion(request.getHiveVersion())
74+
.hiveConfDir(request.getHiveConfDir())
75+
.hdfsPath(request.getHdfsPath())
76+
.warehouse(request.getWarehouse())
77+
.hdfsUgi(request.getHdfsUgi())
78+
.build();
79+
}
80+
81+
/**
82+
* Get the dto instance from the JSON string.
83+
*/
84+
public static HiveDataNodeDTO getFromJson(@NotNull String extParams) {
85+
try {
86+
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
87+
return OBJECT_MAPPER.readValue(extParams, HiveDataNodeDTO.class);
88+
} catch (Exception e) {
89+
LOGGER.error("Failed to extract additional parameters for hive data node: ", e);
90+
throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
91+
}
92+
}
93+
94+
}

0 commit comments

Comments
 (0)