Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-5555][Manager] Fix missing heartbeat fields and the file task split error #5556

Merged
merged 9 commits into from
Aug 18, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_HOME;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_CACHE;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_CACHE_TIMEOUT;
Expand Down Expand Up @@ -116,7 +116,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
private List<String> managerList;
private String localIp;
private String uuid;
private String clusterTag;
private String clusterName;

private CommandDb commandDb;

Expand All @@ -132,7 +132,7 @@ public ManagerFetcher(AgentManager agentManager) {
managerDbCollectorTaskUrl = buildDbCollectorGetTaskUrl(baseManagerUrl);
localFileCache = getLocalFileCache();
uniqId = conf.get(AGENT_UNIQ_ID, DEFAULT_AGENT_UNIQ_ID);
clusterTag = conf.get(AGENT_CLUSTER_TAG);
clusterName = conf.get(AGENT_CLUSTER_NAME);
this.commandDb = agentManager.getCommandDb();
} else {
throw new RuntimeException("init manager error, cannot find required key");
Expand Down Expand Up @@ -305,7 +305,7 @@ public TaskRequest getFetchRequest(List<CommandEntity> unackedCommands) {
TaskRequest request = new TaskRequest();
request.setAgentIp(localIp);
request.setUuid(uuid);
request.setClusterTag(clusterTag);
request.setClusterName(clusterName);
// when job size is over limit, require no new job
if (agentManager.getJobManager().isJobOverLimit()) {
request.setPullJobType(PullJobTypeEnum.NEVER.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
@Data
public class TaskRequest {

private String clusterTag;
private String clusterName;

private String agentIp;

Expand Down
4 changes: 3 additions & 1 deletion inlong-dataproxy/conf/common.properties
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ cluster.id=1
manager.hosts=127.0.0.1:8083
manager.auth.secretId=
manager.auth.secretKey=
proxy.local.ip=127.0.0.1
proxy.report.ip=127.0.0.1
proxy.report.port=46801
# proxy cluster name
proxy.cluster.name=default_dataproxy
proxy.cluster.tag=default_cluster
# check interval of local config (millisecond)
configCheckInterval=10000

Expand Down
3 changes: 2 additions & 1 deletion inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ cd "${file_path}/"
common_conf_file=./conf/common.properties
sed -i "s/manager.hosts=.*$/manager.hosts=${MANAGER_OPENAPI_IP}:${MANAGER_OPENAPI_PORT}/g" "${common_conf_file}"
sed -i "s/audit.proxys=.*$/audit.proxys=${AUDIT_PROXY_URL}/g" "${common_conf_file}"
sed -i "s/proxy.local.ip=.*$/proxy.local.ip=${local_ip}/g" "${common_conf_file}"
sed -i "s/proxy.report.ip=.*$/proxy.report.ip=${local_ip}/g" "${common_conf_file}"
sed -i "s/proxy.cluster.tag=.*$/proxy.cluster.tag=${CLUSTER_TAG}/g" "${common_conf_file}"

# start
if [ "${MQ_TYPE}" = "pulsar" ]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ private void checkLocalFile() {
}
}

private boolean checkWithManager(String host, String clusterName) {
private boolean checkWithManager(String host, String clusterName, String clusterTag) {
if (StringUtils.isEmpty(clusterName)) {
LOG.error("proxyClusterName is null");
return false;
Expand All @@ -283,6 +283,7 @@ private boolean checkWithManager(String host, String clusterName) {
// request body
DataProxyConfigRequest request = new DataProxyConfigRequest();
request.setClusterName(clusterName);
request.setClusterTag(clusterTag);
httpPost.setEntity(HttpUtils.getEntity(request));

// request with post
Expand Down Expand Up @@ -355,12 +356,13 @@ private void checkRemoteConfig() {
try {
String managerHosts = configManager.getCommonProperties().get(ConfigConstants.MANAGER_HOST);
String proxyClusterName = configManager.getCommonProperties().get(ConfigConstants.PROXY_CLUSTER_NAME);
String proxyClusterTag = configManager.getCommonProperties().get(ConfigConstants.PROXY_CLUSTER_TAG);
if (StringUtils.isEmpty(managerHosts) || StringUtils.isEmpty(proxyClusterName)) {
return;
}
String[] hostList = StringUtils.split(managerHosts, ",");
for (String host : hostList) {
if (checkWithManager(host, proxyClusterName)) {
if (checkWithManager(host, proxyClusterName, proxyClusterTag)) {
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ public class ConfigConstants {
public static final String CLUSTER_ID_KEY = "clusterId";
public static final String MANAGER_HOST = "manager.hosts";
public static final String PROXY_CLUSTER_NAME = "proxy.cluster.name";
public static final String PROXY_LOCAL_IP = "proxy.local.ip";
public static final String PROXY_CLUSTER_TAG = "proxy.cluster.tag";
public static final String PROXY_REPORT_IP = "proxy.report.ip";
public static final String PROXY_REPORT_PORT = "proxy.report.port";
public static final String CONFIG_CHECK_INTERVAL = "configCheckInterval";

public static final String DECODER_BODY = "body";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
@Slf4j
public class HeartbeatManager implements AbstractHeartbeatManager {

public static final String DEFAULT_REPORT_PORT = "46801";
public static final String DEFAULT_CLUSTER_TAG = "default_cluster";
public static final String DEFAULT_CLUSTER_NAME = "default_dataproxy";

private final CloseableHttpClient httpClient;
private final Gson gson;

Expand Down Expand Up @@ -112,10 +116,16 @@ private HeartbeatMsg buildHeartbeat() {
ConfigManager configManager = ConfigManager.getInstance();
HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
Map<String, String> commonProperties = configManager.getCommonProperties();
String localIp = commonProperties.get(ConfigConstants.PROXY_LOCAL_IP);
heartbeatMsg.setIp(localIp);
String reportIp = commonProperties.get(ConfigConstants.PROXY_REPORT_IP);
String reportPort = commonProperties.getOrDefault(ConfigConstants.PROXY_REPORT_PORT, DEFAULT_REPORT_PORT);
heartbeatMsg.setIp(reportIp);
heartbeatMsg.setPort(Integer.parseInt(reportPort));
heartbeatMsg.setComponentType(ComponentTypeEnum.DataProxy.getName());
heartbeatMsg.setReportTime(System.currentTimeMillis());
String clusterTag = commonProperties.getOrDefault(ConfigConstants.PROXY_CLUSTER_TAG, DEFAULT_CLUSTER_TAG);
String clusterName = commonProperties.getOrDefault(ConfigConstants.PROXY_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
heartbeatMsg.setClusterTag(clusterTag);
heartbeatMsg.setClusterName(clusterName);

Map<String, String> groupIdMappings = configManager.getGroupIdMappingProperties();
Map<String, Map<String, String>> streamIdMappings = configManager.getStreamIdMappingProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -53,6 +54,7 @@
/**
* Default inlong stream builder.
*/
@Slf4j
public class DefaultInlongStreamBuilder extends InlongStreamBuilder {

private final InlongStreamImpl inlongStream;
Expand Down Expand Up @@ -183,14 +185,7 @@ private void initOrUpdateTransform() {
StreamTransform transform = StreamTransformTransfer.parseStreamTransform(transformResponse);
final String transformName = transform.getTransformName();
final int id = transformResponse.getId();
if (transformRequests.get(transformName) == null) {
TransformRequest transformRequest = StreamTransformTransfer.createTransformRequest(transform,
streamInfo);
boolean isDelete = transformClient.deleteTransform(transformRequest);
if (!isDelete) {
throw new RuntimeException(String.format("Delete transform=%s failed", transformRequest));
}
} else {
if (transformRequests.get(transformName) != null) {
TransformRequest transformRequest = transformRequests.get(transformName);
transformRequest.setId(id);
transformRequest.setVersion(transformResponse.getVersion());
Expand All @@ -201,6 +196,8 @@ private void initOrUpdateTransform() {
}
transformRequest.setId(transformResponse.getId());
updateTransformNames.add(transformName);
} else {
log.warn("Unknown transform {} from server", transformName);
}
}
for (Map.Entry<String, TransformRequest> requestEntry : transformRequests.entrySet()) {
Expand All @@ -224,12 +221,7 @@ private void initOrUpdateSource() {
for (StreamSource source : streamSources) {
final String sourceName = source.getSourceName();
final int id = source.getId();
if (sourceRequests.get(sourceName) == null) {
boolean isDelete = sourceClient.deleteSource(id);
if (!isDelete) {
throw new RuntimeException(String.format("Delete source failed by id=%s", id));
}
} else {
if (sourceRequests.get(sourceName) != null) {
SourceRequest sourceRequest = sourceRequests.get(sourceName);
sourceRequest.setId(id);
sourceRequest.setVersion(source.getVersion());
Expand All @@ -240,6 +232,8 @@ private void initOrUpdateSource() {
}
updateSourceNames.add(sourceName);
sourceRequest.setId(source.getId());
} else {
log.warn("Unknown source {} from server", sourceName);
}
}
}
Expand All @@ -263,12 +257,7 @@ private void initOrUpdateSink() {
for (StreamSink sink : streamSinks) {
final String sinkName = sink.getSinkName();
final int id = sink.getId();
if (sinkRequests.get(sinkName) == null) {
boolean isDelete = sinkClient.deleteSink(id);
if (!isDelete) {
throw new RuntimeException(String.format("Delete sink=%s failed", sink));
}
} else {
if (sinkRequests.get(sinkName) != null) {
SinkRequest sinkRequest = sinkRequests.get(sinkName);
sinkRequest.setId(id);
sinkRequest.setVersion(sink.getVersion());
Expand All @@ -279,6 +268,8 @@ private void initOrUpdateSink() {
}
updateSinkNames.add(sinkName);
sinkRequest.setId(sink.getId());
} else {
log.warn("Unknown sink {} from server", sinkName);
}
}
for (Map.Entry<String, SinkRequest> requestEntry : sinkRequests.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -54,6 +55,7 @@
/**
* Inlong stream service implementation.
*/
@Slf4j
@Data
public class InlongStreamImpl implements InlongStream {

Expand Down Expand Up @@ -331,14 +333,7 @@ private void initOrUpdateTransform(InlongStreamInfo streamInfo) {
StreamTransform transform = StreamTransformTransfer.parseStreamTransform(transformResponse);
final String transformName = transform.getTransformName();
final int id = transformResponse.getId();
if (this.streamTransforms.get(transformName) == null) {
TransformRequest transformRequest = StreamTransformTransfer.createTransformRequest(transform,
streamInfo);
boolean isDelete = transformClient.deleteTransform(transformRequest);
if (!isDelete) {
throw new RuntimeException(String.format("Delete transform=%s failed", transformRequest));
}
} else {
if (this.streamTransforms.get(transformName) != null) {
StreamTransform newTransform = this.streamTransforms.get(transformName);
TransformRequest transformRequest = StreamTransformTransfer.createTransformRequest(newTransform,
streamInfo);
Expand All @@ -350,6 +345,8 @@ private void initOrUpdateTransform(InlongStreamInfo streamInfo) {
updateState.getValue()));
}
updateTransformNames.add(transformName);
} else {
log.warn("Unknown transform {} from server", transformName);
}
}
for (Map.Entry<String, StreamTransform> transformEntry : this.streamTransforms.entrySet()) {
Expand All @@ -370,12 +367,7 @@ private void initOrUpdateSource(InlongStreamInfo streamInfo) {
for (StreamSource source : streamSources) {
final String sourceName = source.getSourceName();
final int id = source.getId();
if (this.streamSources.get(sourceName) == null) {
boolean isDelete = sourceClient.deleteSource(id);
if (!isDelete) {
throw new RuntimeException(String.format("Delete source=%s failed", source));
}
} else {
if (this.streamSources.get(sourceName) != null) {
StreamSource streamSource = this.streamSources.get(sourceName);
streamSource.setId(id);
streamSource.setInlongGroupId(streamInfo.getInlongGroupId());
Expand All @@ -387,6 +379,8 @@ private void initOrUpdateSource(InlongStreamInfo streamInfo) {
updateState.getValue()));
}
updateSourceNames.add(sourceName);
} else {
log.warn("Unknown source {} from server", sourceName);
}
}
for (Map.Entry<String, StreamSource> sourceEntry : this.streamSources.entrySet()) {
Expand All @@ -408,12 +402,7 @@ private void initOrUpdateSink(InlongStreamInfo streamInfo) {
for (StreamSink sink : streamSinks) {
final String sinkName = sink.getSinkName();
final int id = sink.getId();
if (this.streamSinks.get(sinkName) == null) {
boolean isDelete = sinkClient.deleteSink(id);
if (!isDelete) {
throw new RuntimeException(String.format("Delete sink=%s failed", sink));
}
} else {
if (this.streamSinks.get(sinkName) != null) {
StreamSink streamSink = this.streamSinks.get(sinkName);
streamSink.setId(id);
streamSink.setInlongGroupId(streamInfo.getInlongGroupId());
Expand All @@ -425,6 +414,8 @@ private void initOrUpdateSink(InlongStreamInfo streamInfo) {
updateState.getValue()));
}
updateSinkNames.add(sinkName);
} else {
log.error("Unknown sink {} from server", sinkName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ void testListGroup4BinlogSource() {
.inlongGroupId("1")
.inlongStreamId("2")
.sourceType(SourceType.MYSQL_BINLOG)
.clusterId(1)
.status(1)
.user("root")
.password("pwd")
Expand Down Expand Up @@ -279,7 +278,7 @@ void testListGroup4FileSource() {
.inlongStreamId("2")
.sourceType(SourceType.FILE)
.status(1)
.ip("127.0.0.1")
.agentIp("127.0.0.1")
.pattern("pattern")
.build()
)
Expand Down Expand Up @@ -367,7 +366,7 @@ void testListGroup4AllSource() {
.inlongGroupId("1")
.inlongStreamId("2")
.version(1)
.ip("127.0.0.1")
.agentIp("127.0.0.1")
.pattern("pattern")
.timeOffset("timeOffset")
.build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class StreamSourceEntity implements Serializable {
private String uuid;

private String dataNodeName;
private Integer clusterId;
private String inlongClusterName;
private String serializationType;
private String snapshot;
private Date reportTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,33 @@ List<StreamSourceEntity> selectByRelatedId(@Param("groupId") String groupId, @Pa
/**
* Query the tasks by the given status list.
*/
List<StreamSourceEntity> selectByStatus(@Param("list") List<Integer> list, @Param("limit") int limit);
List<StreamSourceEntity> selectByStatus(@Param("statusList") List<Integer> list, @Param("limit") int limit);

/**
* Query the tasks by the given status list and type List.
*/
List<StreamSourceEntity> selectByStatusAndType(@Param("list") List<Integer> list,
@Param("sourceType") List<String> sourceTypes, @Param("limit") int limit);
List<StreamSourceEntity> selectByStatusAndType(@Param("statusList") List<Integer> statusList,
@Param("sourceTypeList") List<String> sourceTypeList, @Param("limit") int limit);

/**
* Query the tasks by the given status list and type List.
*/
List<StreamSourceEntity> selectByAgentIpOrCluster(@Param("statusList") List<Integer> statusList,
@Param("sourceTypeList") List<String> sourceTypeList, @Param("agentIp") String agentIp,
@Param("clusterName") String clusterName, @Param("limit") int limit);

/**
* Query the sources with status 20x by the given agent IP and agent UUID.
*
* @apiNote Sources with is_deleted > 0 need to be filtered.
* @apiNote Sources with is_deleted > 0 should also be returned to agents to clear their local tasks.
*/
List<StreamSourceEntity> selectByStatusAndIp(@Param("statusList") List<Integer> statusList,
@Param("agentIp") String agentIp, @Param("uuid") String uuid);

/**
* Select all sources by groupIds
*/
List<StreamSourceEntity> selectByGroupIds(@Param("groupIds") List<String> groupIds);
List<StreamSourceEntity> selectByGroupIds(@Param("groupIdList") List<String> groupIdList);

/**
* Get the distinct source type from the given groupId and streamId
Expand Down Expand Up @@ -117,6 +124,8 @@ int updateIpAndUuid(@Param("id") Integer id, @Param("agentIp") String agentIp, @

int updateSnapshot(StreamSourceEntity entity);

int appendAgentIp(@Param("id") Integer id, @Param("agentIp") String agentIp);

/**
* Physical delete stream sources.
*/
Expand Down
Loading