Skip to content

Commit 0222b5b

Browse files
authored
[INLONG-5555][Manager] Fix missing heartbeat fields and the file task split error (#5556)
1 parent 54471bc commit 0222b5b

File tree

21 files changed

+227
-205
lines changed

21 files changed

+227
-205
lines changed

inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
import java.util.stream.Collectors;
6060

6161
import static java.util.Objects.requireNonNull;
62-
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
62+
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
6363
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_HOME;
6464
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_CACHE;
6565
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_CACHE_TIMEOUT;
@@ -116,7 +116,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
116116
private List<String> managerList;
117117
private String localIp;
118118
private String uuid;
119-
private String clusterTag;
119+
private String clusterName;
120120

121121
private CommandDb commandDb;
122122

@@ -132,7 +132,7 @@ public ManagerFetcher(AgentManager agentManager) {
132132
managerDbCollectorTaskUrl = buildDbCollectorGetTaskUrl(baseManagerUrl);
133133
localFileCache = getLocalFileCache();
134134
uniqId = conf.get(AGENT_UNIQ_ID, DEFAULT_AGENT_UNIQ_ID);
135-
clusterTag = conf.get(AGENT_CLUSTER_TAG);
135+
clusterName = conf.get(AGENT_CLUSTER_NAME);
136136
this.commandDb = agentManager.getCommandDb();
137137
} else {
138138
throw new RuntimeException("init manager error, cannot find required key");
@@ -305,7 +305,7 @@ public TaskRequest getFetchRequest(List<CommandEntity> unackedCommands) {
305305
TaskRequest request = new TaskRequest();
306306
request.setAgentIp(localIp);
307307
request.setUuid(uuid);
308-
request.setClusterTag(clusterTag);
308+
request.setClusterName(clusterName);
309309
// when job size is over limit, require no new job
310310
if (agentManager.getJobManager().isJobOverLimit()) {
311311
request.setPullJobType(PullJobTypeEnum.NEVER.getType());

inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
@Data
3030
public class TaskRequest {
3131

32-
private String clusterTag;
32+
private String clusterName;
3333

3434
private String agentIp;
3535

inlong-dataproxy/conf/common.properties

+3-1
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ cluster.id=1
2222
manager.hosts=127.0.0.1:8083
2323
manager.auth.secretId=
2424
manager.auth.secretKey=
25-
proxy.local.ip=127.0.0.1
25+
proxy.report.ip=127.0.0.1
26+
proxy.report.port=46801
2627
# proxy cluster name
2728
proxy.cluster.name=default_dataproxy
29+
proxy.cluster.tag=default_cluster
2830
# check interval of local config (millisecond)
2931
configCheckInterval=10000
3032

inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ cd "${file_path}/"
2323
common_conf_file=./conf/common.properties
2424
sed -i "s/manager.hosts=.*$/manager.hosts=${MANAGER_OPENAPI_IP}:${MANAGER_OPENAPI_PORT}/g" "${common_conf_file}"
2525
sed -i "s/audit.proxys=.*$/audit.proxys=${AUDIT_PROXY_URL}/g" "${common_conf_file}"
26-
sed -i "s/proxy.local.ip=.*$/proxy.local.ip=${local_ip}/g" "${common_conf_file}"
26+
sed -i "s/proxy.report.ip=.*$/proxy.report.ip=${local_ip}/g" "${common_conf_file}"
27+
sed -i "s/proxy.cluster.tag=.*$/proxy.cluster.tag=${CLUSTER_TAG}/g" "${common_conf_file}"
2728

2829
# start
2930
if [ "${MQ_TYPE}" = "pulsar" ]; then

inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ private void checkLocalFile() {
267267
}
268268
}
269269

270-
private boolean checkWithManager(String host, String clusterName) {
270+
private boolean checkWithManager(String host, String clusterName, String clusterTag) {
271271
if (StringUtils.isEmpty(clusterName)) {
272272
LOG.error("proxyClusterName is null");
273273
return false;
@@ -283,6 +283,7 @@ private boolean checkWithManager(String host, String clusterName) {
283283
// request body
284284
DataProxyConfigRequest request = new DataProxyConfigRequest();
285285
request.setClusterName(clusterName);
286+
request.setClusterTag(clusterTag);
286287
httpPost.setEntity(HttpUtils.getEntity(request));
287288

288289
// request with post
@@ -355,12 +356,13 @@ private void checkRemoteConfig() {
355356
try {
356357
String managerHosts = configManager.getCommonProperties().get(ConfigConstants.MANAGER_HOST);
357358
String proxyClusterName = configManager.getCommonProperties().get(ConfigConstants.PROXY_CLUSTER_NAME);
359+
String proxyClusterTag = configManager.getCommonProperties().get(ConfigConstants.PROXY_CLUSTER_TAG);
358360
if (StringUtils.isEmpty(managerHosts) || StringUtils.isEmpty(proxyClusterName)) {
359361
return;
360362
}
361363
String[] hostList = StringUtils.split(managerHosts, ",");
362364
for (String host : hostList) {
363-
if (checkWithManager(host, proxyClusterName)) {
365+
if (checkWithManager(host, proxyClusterName, proxyClusterTag)) {
364366
break;
365367
}
366368
}

inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,9 @@ public class ConfigConstants {
9494
public static final String CLUSTER_ID_KEY = "clusterId";
9595
public static final String MANAGER_HOST = "manager.hosts";
9696
public static final String PROXY_CLUSTER_NAME = "proxy.cluster.name";
97-
public static final String PROXY_LOCAL_IP = "proxy.local.ip";
97+
public static final String PROXY_CLUSTER_TAG = "proxy.cluster.tag";
98+
public static final String PROXY_REPORT_IP = "proxy.report.ip";
99+
public static final String PROXY_REPORT_PORT = "proxy.report.port";
98100
public static final String CONFIG_CHECK_INTERVAL = "configCheckInterval";
99101

100102
public static final String DECODER_BODY = "body";

inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@
5050
@Slf4j
5151
public class HeartbeatManager implements AbstractHeartbeatManager {
5252

53+
public static final String DEFAULT_REPORT_PORT = "46801";
54+
public static final String DEFAULT_CLUSTER_TAG = "default_cluster";
55+
public static final String DEFAULT_CLUSTER_NAME = "default_dataproxy";
56+
5357
private final CloseableHttpClient httpClient;
5458
private final Gson gson;
5559

@@ -112,10 +116,16 @@ private HeartbeatMsg buildHeartbeat() {
112116
ConfigManager configManager = ConfigManager.getInstance();
113117
HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
114118
Map<String, String> commonProperties = configManager.getCommonProperties();
115-
String localIp = commonProperties.get(ConfigConstants.PROXY_LOCAL_IP);
116-
heartbeatMsg.setIp(localIp);
119+
String reportIp = commonProperties.get(ConfigConstants.PROXY_REPORT_IP);
120+
String reportPort = commonProperties.getOrDefault(ConfigConstants.PROXY_REPORT_PORT, DEFAULT_REPORT_PORT);
121+
heartbeatMsg.setIp(reportIp);
122+
heartbeatMsg.setPort(Integer.parseInt(reportPort));
117123
heartbeatMsg.setComponentType(ComponentTypeEnum.DataProxy.getName());
118124
heartbeatMsg.setReportTime(System.currentTimeMillis());
125+
String clusterTag = commonProperties.getOrDefault(ConfigConstants.PROXY_CLUSTER_TAG, DEFAULT_CLUSTER_TAG);
126+
String clusterName = commonProperties.getOrDefault(ConfigConstants.PROXY_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
127+
heartbeatMsg.setClusterTag(clusterTag);
128+
heartbeatMsg.setClusterName(clusterName);
119129

120130
Map<String, String> groupIdMappings = configManager.getGroupIdMappingProperties();
121131
Map<String, Map<String, String>> streamIdMappings = configManager.getStreamIdMappingProperties();

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java

+11-20
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.google.common.collect.Lists;
2121
import com.google.common.collect.Maps;
22+
import lombok.extern.slf4j.Slf4j;
2223
import org.apache.commons.collections.CollectionUtils;
2324
import org.apache.commons.collections.MapUtils;
2425
import org.apache.commons.lang3.tuple.Pair;
@@ -53,6 +54,7 @@
5354
/**
5455
* Default inlong stream builder.
5556
*/
57+
@Slf4j
5658
public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
5759

5860
private final InlongStreamImpl inlongStream;
@@ -183,14 +185,7 @@ private void initOrUpdateTransform() {
183185
StreamTransform transform = StreamTransformTransfer.parseStreamTransform(transformResponse);
184186
final String transformName = transform.getTransformName();
185187
final int id = transformResponse.getId();
186-
if (transformRequests.get(transformName) == null) {
187-
TransformRequest transformRequest = StreamTransformTransfer.createTransformRequest(transform,
188-
streamInfo);
189-
boolean isDelete = transformClient.deleteTransform(transformRequest);
190-
if (!isDelete) {
191-
throw new RuntimeException(String.format("Delete transform=%s failed", transformRequest));
192-
}
193-
} else {
188+
if (transformRequests.get(transformName) != null) {
194189
TransformRequest transformRequest = transformRequests.get(transformName);
195190
transformRequest.setId(id);
196191
transformRequest.setVersion(transformResponse.getVersion());
@@ -201,6 +196,8 @@ private void initOrUpdateTransform() {
201196
}
202197
transformRequest.setId(transformResponse.getId());
203198
updateTransformNames.add(transformName);
199+
} else {
200+
log.warn("Unknown transform {} from server", transformName);
204201
}
205202
}
206203
for (Map.Entry<String, TransformRequest> requestEntry : transformRequests.entrySet()) {
@@ -224,12 +221,7 @@ private void initOrUpdateSource() {
224221
for (StreamSource source : streamSources) {
225222
final String sourceName = source.getSourceName();
226223
final int id = source.getId();
227-
if (sourceRequests.get(sourceName) == null) {
228-
boolean isDelete = sourceClient.deleteSource(id);
229-
if (!isDelete) {
230-
throw new RuntimeException(String.format("Delete source failed by id=%s", id));
231-
}
232-
} else {
224+
if (sourceRequests.get(sourceName) != null) {
233225
SourceRequest sourceRequest = sourceRequests.get(sourceName);
234226
sourceRequest.setId(id);
235227
sourceRequest.setVersion(source.getVersion());
@@ -240,6 +232,8 @@ private void initOrUpdateSource() {
240232
}
241233
updateSourceNames.add(sourceName);
242234
sourceRequest.setId(source.getId());
235+
} else {
236+
log.warn("Unknown source {} from server", sourceName);
243237
}
244238
}
245239
}
@@ -263,12 +257,7 @@ private void initOrUpdateSink() {
263257
for (StreamSink sink : streamSinks) {
264258
final String sinkName = sink.getSinkName();
265259
final int id = sink.getId();
266-
if (sinkRequests.get(sinkName) == null) {
267-
boolean isDelete = sinkClient.deleteSink(id);
268-
if (!isDelete) {
269-
throw new RuntimeException(String.format("Delete sink=%s failed", sink));
270-
}
271-
} else {
260+
if (sinkRequests.get(sinkName) != null) {
272261
SinkRequest sinkRequest = sinkRequests.get(sinkName);
273262
sinkRequest.setId(id);
274263
sinkRequest.setVersion(sink.getVersion());
@@ -279,6 +268,8 @@ private void initOrUpdateSink() {
279268
}
280269
updateSinkNames.add(sinkName);
281270
sinkRequest.setId(sink.getId());
271+
} else {
272+
log.warn("Unknown sink {} from server", sinkName);
282273
}
283274
}
284275
for (Map.Entry<String, SinkRequest> requestEntry : sinkRequests.entrySet()) {

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java

+11-20
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.common.collect.Maps;
2222
import com.google.common.collect.Sets;
2323
import lombok.Data;
24+
import lombok.extern.slf4j.Slf4j;
2425
import org.apache.commons.collections.CollectionUtils;
2526
import org.apache.commons.collections.MapUtils;
2627
import org.apache.commons.lang3.tuple.Pair;
@@ -54,6 +55,7 @@
5455
/**
5556
* Inlong stream service implementation.
5657
*/
58+
@Slf4j
5759
@Data
5860
public class InlongStreamImpl implements InlongStream {
5961

@@ -331,14 +333,7 @@ private void initOrUpdateTransform(InlongStreamInfo streamInfo) {
331333
StreamTransform transform = StreamTransformTransfer.parseStreamTransform(transformResponse);
332334
final String transformName = transform.getTransformName();
333335
final int id = transformResponse.getId();
334-
if (this.streamTransforms.get(transformName) == null) {
335-
TransformRequest transformRequest = StreamTransformTransfer.createTransformRequest(transform,
336-
streamInfo);
337-
boolean isDelete = transformClient.deleteTransform(transformRequest);
338-
if (!isDelete) {
339-
throw new RuntimeException(String.format("Delete transform=%s failed", transformRequest));
340-
}
341-
} else {
336+
if (this.streamTransforms.get(transformName) != null) {
342337
StreamTransform newTransform = this.streamTransforms.get(transformName);
343338
TransformRequest transformRequest = StreamTransformTransfer.createTransformRequest(newTransform,
344339
streamInfo);
@@ -350,6 +345,8 @@ private void initOrUpdateTransform(InlongStreamInfo streamInfo) {
350345
updateState.getValue()));
351346
}
352347
updateTransformNames.add(transformName);
348+
} else {
349+
log.warn("Unknown transform {} from server", transformName);
353350
}
354351
}
355352
for (Map.Entry<String, StreamTransform> transformEntry : this.streamTransforms.entrySet()) {
@@ -370,12 +367,7 @@ private void initOrUpdateSource(InlongStreamInfo streamInfo) {
370367
for (StreamSource source : streamSources) {
371368
final String sourceName = source.getSourceName();
372369
final int id = source.getId();
373-
if (this.streamSources.get(sourceName) == null) {
374-
boolean isDelete = sourceClient.deleteSource(id);
375-
if (!isDelete) {
376-
throw new RuntimeException(String.format("Delete source=%s failed", source));
377-
}
378-
} else {
370+
if (this.streamSources.get(sourceName) != null) {
379371
StreamSource streamSource = this.streamSources.get(sourceName);
380372
streamSource.setId(id);
381373
streamSource.setInlongGroupId(streamInfo.getInlongGroupId());
@@ -387,6 +379,8 @@ private void initOrUpdateSource(InlongStreamInfo streamInfo) {
387379
updateState.getValue()));
388380
}
389381
updateSourceNames.add(sourceName);
382+
} else {
383+
log.warn("Unknown source {} from server", sourceName);
390384
}
391385
}
392386
for (Map.Entry<String, StreamSource> sourceEntry : this.streamSources.entrySet()) {
@@ -408,12 +402,7 @@ private void initOrUpdateSink(InlongStreamInfo streamInfo) {
408402
for (StreamSink sink : streamSinks) {
409403
final String sinkName = sink.getSinkName();
410404
final int id = sink.getId();
411-
if (this.streamSinks.get(sinkName) == null) {
412-
boolean isDelete = sinkClient.deleteSink(id);
413-
if (!isDelete) {
414-
throw new RuntimeException(String.format("Delete sink=%s failed", sink));
415-
}
416-
} else {
405+
if (this.streamSinks.get(sinkName) != null) {
417406
StreamSink streamSink = this.streamSinks.get(sinkName);
418407
streamSink.setId(id);
419408
streamSink.setInlongGroupId(streamInfo.getInlongGroupId());
@@ -425,6 +414,8 @@ private void initOrUpdateSink(InlongStreamInfo streamInfo) {
425414
updateState.getValue()));
426415
}
427416
updateSinkNames.add(sinkName);
417+
} else {
418+
log.error("Unknown sink {} from server", sinkName);
428419
}
429420
}
430421

inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,6 @@ void testListGroup4BinlogSource() {
238238
.inlongGroupId("1")
239239
.inlongStreamId("2")
240240
.sourceType(SourceType.MYSQL_BINLOG)
241-
.clusterId(1)
242241
.status(1)
243242
.user("root")
244243
.password("pwd")
@@ -279,7 +278,7 @@ void testListGroup4FileSource() {
279278
.inlongStreamId("2")
280279
.sourceType(SourceType.FILE)
281280
.status(1)
282-
.ip("127.0.0.1")
281+
.agentIp("127.0.0.1")
283282
.pattern("pattern")
284283
.build()
285284
)
@@ -367,7 +366,7 @@ void testListGroup4AllSource() {
367366
.inlongGroupId("1")
368367
.inlongStreamId("2")
369368
.version(1)
370-
.ip("127.0.0.1")
369+
.agentIp("127.0.0.1")
371370
.pattern("pattern")
372371
.timeOffset("timeOffset")
373372
.build(),

inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class StreamSourceEntity implements Serializable {
3838
private String uuid;
3939

4040
private String dataNodeName;
41-
private Integer clusterId;
41+
private String inlongClusterName;
4242
private String serializationType;
4343
private String snapshot;
4444
private Date reportTime;

inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java

+14-5
Original file line numberDiff line numberDiff line change
@@ -65,26 +65,33 @@ List<StreamSourceEntity> selectByRelatedId(@Param("groupId") String groupId, @Pa
6565
/**
6666
* Query the tasks by the given status list.
6767
*/
68-
List<StreamSourceEntity> selectByStatus(@Param("list") List<Integer> list, @Param("limit") int limit);
68+
List<StreamSourceEntity> selectByStatus(@Param("statusList") List<Integer> list, @Param("limit") int limit);
6969

7070
/**
7171
* Query the tasks by the given status list and type List.
7272
*/
73-
List<StreamSourceEntity> selectByStatusAndType(@Param("list") List<Integer> list,
74-
@Param("sourceType") List<String> sourceTypes, @Param("limit") int limit);
73+
List<StreamSourceEntity> selectByStatusAndType(@Param("statusList") List<Integer> statusList,
74+
@Param("sourceTypeList") List<String> sourceTypeList, @Param("limit") int limit);
75+
76+
/**
77+
* Query the tasks by the given status list and type List.
78+
*/
79+
List<StreamSourceEntity> selectByAgentIpOrCluster(@Param("statusList") List<Integer> statusList,
80+
@Param("sourceTypeList") List<String> sourceTypeList, @Param("agentIp") String agentIp,
81+
@Param("clusterName") String clusterName, @Param("limit") int limit);
7582

7683
/**
7784
* Query the sources with status 20x by the given agent IP and agent UUID.
7885
*
79-
* @apiNote Sources with is_deleted > 0 need to be filtered.
86+
* @apiNote Sources with is_deleted > 0 should also be returned to agents to clear their local tasks.
8087
*/
8188
List<StreamSourceEntity> selectByStatusAndIp(@Param("statusList") List<Integer> statusList,
8289
@Param("agentIp") String agentIp, @Param("uuid") String uuid);
8390

8491
/**
8592
* Select all sources by groupIds
8693
*/
87-
List<StreamSourceEntity> selectByGroupIds(@Param("groupIds") List<String> groupIds);
94+
List<StreamSourceEntity> selectByGroupIds(@Param("groupIdList") List<String> groupIdList);
8895

8996
/**
9097
* Get the distinct source type from the given groupId and streamId
@@ -117,6 +124,8 @@ int updateIpAndUuid(@Param("id") Integer id, @Param("agentIp") String agentIp, @
117124

118125
int updateSnapshot(StreamSourceEntity entity);
119126

127+
int appendAgentIp(@Param("id") Integer id, @Param("agentIp") String agentIp);
128+
120129
/**
121130
* Physical delete stream sources.
122131
*/

0 commit comments

Comments
 (0)