Skip to content

Commit d918248

Browse files
authored
[INLONG-2894][Agent] Adapt the interface and field modification of the manager module (#2897)
1 parent a6eaee8 commit d918248

File tree

5 files changed

+17
-14
lines changed

5 files changed

+17
-14
lines changed

inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.inlong.agent.constant;
1919

20+
/**
21+
* Constants of job fetcher.
22+
*/
2023
public class FetcherConstants {
2124

2225
public static final String AGENT_FETCHER_INTERVAL = "agent.fetcher.interval";
@@ -37,7 +40,7 @@ public class FetcherConstants {
3740
public static final String DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH = "/api/inlong/manager/openapi";
3841

3942
public static final String AGENT_MANAGER_TASK_HTTP_PATH = "agent.manager.task.http.path";
40-
public static final String DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH = "/agent/getTask";
43+
public static final String DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH = "/agent/reportAndGetTask";
4144

4245
public static final String AGENT_MANAGER_IP_CHECK_HTTP_PATH = "agent.manager.vip.http.checkIP.path";
4346
public static final String DEFAULT_AGENT_TDM_IP_CHECK_HTTP_PATH = "/fileAgent/confirmAgentIp";

inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,8 @@ public static TriggerProfile convertToTriggerProfile(DataConfig dataConfigs) {
169169
profileDto.setProxy(proxy);
170170
Job job = new Job();
171171

172-
//common Attribu
173-
job.setId(String.valueOf(dataConfigs.getJobId()));
172+
// common attribute
173+
job.setId(String.valueOf(dataConfigs.getTaskId()));
174174
job.setChannel(DEFAULT_CHANNEL);
175175
job.setIp(dataConfigs.getIp());
176176
job.setOp(dataConfigs.getOp());

inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,10 @@ public class ManagerResultFormatter {
4646
*/
4747
public static JsonObject getResultData(String jsonStr) {
4848
JsonObject object = GSON.fromJson(jsonStr, JsonObject.class);
49-
if (object == null || !object.has(RESULT_CODE)
50-
|| !object.has(RESULT_DATA)
49+
if (object == null || !object.has(RESULT_CODE) || !object.has(RESULT_DATA)
5150
|| !SUCCESS_CODE.equals(object.get(RESULT_CODE).getAsString())) {
5251
throw new IllegalArgumentException("cannot get result data,"
53-
+ " please check manager status, return str is" + jsonStr);
52+
+ " please check manager status, return str is " + jsonStr);
5453

5554
}
5655
return object;

inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class DataConfig {
3333
private String inlongGroupId;
3434
private String inlongStreamId;
3535
private String op;
36-
private Integer jobId;
36+
private Integer taskId;
3737
private Integer taskType;
3838
private String taskName;
3939
private String snapshot;

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public class AgentServiceImpl implements AgentService {
8383
@Autowired
8484
private InlongStreamFieldEntityMapper streamFieldMapper;
8585
@Autowired
86-
private InlongStreamEntityMapper inlongStreamMapper;
86+
private InlongStreamEntityMapper streamMapper;
8787

8888
/**
8989
* If the reported task time and the modification time in the database exceed this value,
@@ -121,20 +121,21 @@ private TaskResult getTaskResult(TaskRequest request) {
121121
List<StreamSourceEntity> entityList = sourceMapper.selectByIpAndUuid(agentIp, uuid);
122122
for (StreamSourceEntity entity : entityList) {
123123
DataConfig dataConfig = new DataConfig();
124-
dataConfig.setJobId(entity.getId());
124+
dataConfig.setTaskId(entity.getId());
125125
SourceType sourceType = SourceType.forType(entity.getSourceType());
126126
dataConfig.setTaskType(sourceType.getTaskType().getType());
127127
dataConfig.setTaskName(entity.getSourceName());
128128
dataConfig.setOp(String.valueOf(entity.getStatus() % 100));
129-
String inlongGroupId = entity.getInlongGroupId();
130-
String inlongStreamId = entity.getInlongStreamId();
131-
dataConfig.setInlongGroupId(inlongGroupId);
132-
dataConfig.setInlongStreamId(inlongStreamId);
133129
dataConfig.setIp(entity.getAgentIp());
134130
dataConfig.setUuid(entity.getUuid());
135131
dataConfig.setExtParams(entity.getExtParams());
136132
dataConfig.setSnapshot(entity.getSnapshot());
137-
InlongStreamEntity inlongStreamEntity = inlongStreamMapper.selectByIdentifier(inlongGroupId,inlongStreamId);
133+
134+
String groupId = entity.getInlongGroupId();
135+
String streamId = entity.getInlongStreamId();
136+
dataConfig.setInlongGroupId(groupId);
137+
dataConfig.setInlongStreamId(streamId);
138+
InlongStreamEntity inlongStreamEntity = streamMapper.selectByIdentifier(groupId, streamId);
138139
dataConfig.setSyncSend(inlongStreamEntity.getSyncSend());
139140
dataConfigs.add(dataConfig);
140141
}

0 commit comments

Comments
 (0)