diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java index 590e229f020..4e2a5996194 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java @@ -160,6 +160,10 @@ public class AgentConstants { public static final String JOB_DB_CACHE_CHECK_INTERVAL = "job.db.cache.check.interval"; public static final int DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL = 60 * 60; + public static final String JOB_NUMBER_LIMIT = "job.number.limit"; + public static final int DEFAULT_JOB_NUMBER_LIMIT = 20; + + public static final String AGENT_LOCAL_IP = "agent.local.ip"; public static final String AGENT_LOCAL_UUID = "agent.local.uuid"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java index 038483586c9..c8330f46d4d 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java @@ -292,7 +292,9 @@ public List findAll(String prefix) { while (it.isValid()) { KeyValueEntity keyValueItem = GSON .fromJson(new String(it.value()), KeyValueEntity.class); - results.add(keyValueItem); + if (keyValueItem.getKey().startsWith(prefix)) { + results.add(keyValueItem); + } it.next(); } } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java index 1735a1c6c8d..79b9c1609b1 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java @@ -40,8 +40,10 @@ import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL; import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_TIME; +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT; import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_CHECK_INTERVAL; import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_TIME; +import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT; import static org.apache.inlong.agent.constant.JobConstants.JOB_ID; import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX; import static org.apache.inlong.agent.constant.JobConstants.SQL_JOB_ID; @@ -69,6 +71,7 @@ public class JobManager extends AbstractDaemon { private final JobProfileDb jobProfileDb; private final JobMetrics jobMetrics; private final AtomicLong index = new AtomicLong(0); + private final long jobMaxSize; /** * init job manager @@ -92,6 +95,7 @@ public JobManager(AgentManager agentManager, JobProfileDb jobProfileDb) { AgentConstants.JOB_MONITOR_INTERVAL, AgentConstants.DEFAULT_JOB_MONITOR_INTERVAL); this.jobDbCacheTime = conf.getLong(JOB_DB_CACHE_TIME, DEFAULT_JOB_DB_CACHE_TIME); this.jobDbCacheCheckInterval = conf.getLong(JOB_DB_CACHE_CHECK_INTERVAL, DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL); + this.jobMaxSize = conf.getLong(JOB_NUMBER_LIMIT, DEFAULT_JOB_NUMBER_LIMIT); if (ConfigUtil.isPrometheusEnabled()) { this.jobMetrics = new JobPrometheusMetrics(); @@ -137,9 +141,7 @@ public boolean submitFileJobProfile(JobProfile profile) { * @param profile - job profile. */ public boolean submitJobProfile(JobProfile profile, boolean singleJob) { - if (profile == null || !profile.allRequiredKeyExist()) { - LOGGER.error("profile is null or not all required key exists {}", profile == null ? null - : profile.toJsonStr()); + if (!isJobValid(profile)) { return false; } String jobId = profile.get(JOB_ID); @@ -160,9 +162,7 @@ public boolean submitJobProfile(JobProfile profile, boolean singleJob) { * @param profile - job profile. */ public boolean submitSqlJobProfile(JobProfile profile) { - if (profile == null || !profile.allRequiredKeyExist()) { - LOGGER.error("profile is null or not all required key exists {}", profile == null ? null - : profile.toJsonStr()); + if (isJobValid(profile)) { return false; } profile.set(JOB_INSTANCE_ID, SQL_JOB_ID); @@ -172,6 +172,23 @@ public boolean submitSqlJobProfile(JobProfile profile) { return true; } + private boolean isJobValid(JobProfile profile) { + if (profile == null || !profile.allRequiredKeyExist()) { + LOGGER.error("profile is null or not all required key exists {}", profile == null ? null + : profile.toJsonStr()); + return false; + } + if (isJobOverLimit()) { + LOGGER.error("agent cannot add more job, max job size is {}", jobMaxSize); + return false; + } + return true; + } + + public boolean isJobOverLimit() { + return jobs.size() >= jobMaxSize; + } + /** * delete job profile and stop job thread * diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java index 84063da06f8..6ab1dfad1e1 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java @@ -39,6 +39,7 @@ import org.apache.inlong.agent.utils.HttpManager; import org.apache.inlong.common.db.CommandEntity; import org.apache.inlong.common.enums.ManagerOpEnum; +import org.apache.inlong.common.enums.PullJobTypeEnum; import org.apache.inlong.common.pojo.agent.CmdConfig; import org.apache.inlong.common.pojo.agent.DataConfig; import org.apache.inlong.common.pojo.agent.TaskRequest; @@ -227,11 +228,11 @@ public void requestTdmList() { */ public void fetchCommand() { List unackedCommands = commandDb.getUnackedCommands(); - String resultStr = httpManager.doSentPost(managerTaskUrl, getFileCmdFetchRequest(unackedCommands)); + String resultStr = httpManager.doSentPost(managerTaskUrl, getFetchRequest(unackedCommands)); JsonObject resultData = getResultData(resultStr); JsonElement element = resultData.get(AGENT_MANAGER_RETURN_PARAM_DATA); if (element != null) { - dealWithFileTaskResult(GSON.fromJson(element.getAsJsonObject(), TaskResult.class)); + dealWithFetchResult(GSON.fromJson(element.getAsJsonObject(), TaskResult.class)); } ackCommands(unackedCommands); } @@ -271,7 +272,7 @@ private void dealWithSqlTaskResult(DbCollectorTaskResult taskResult) { /** * the fetch file command can be normal or special */ - private void dealWithFileTaskResult(TaskResult taskResult) { + private void dealWithFetchResult(TaskResult taskResult) { LOGGER.info("deal with fetch result {}", taskResult); for (DataConfig dataConfig : taskResult.getDataConfigs()) { TriggerProfile profile = TriggerProfile.getTriggerProfiles(dataConfig); @@ -291,10 +292,16 @@ private void dealWithFileTaskResult(TaskResult taskResult) { /** * form file command fetch request */ - public TaskRequest getFileCmdFetchRequest(List unackedCommands) { + public TaskRequest getFetchRequest(List unackedCommands) { TaskRequest request = new TaskRequest(); request.setAgentIp(localIp); request.setUuid(uuid); + // when job size is over limit, require no new job + if (agentManager.getJobManager().isJobOverLimit()) { + request.setPullJobType(PullJobTypeEnum.NEVER.getType()); + } else { + request.setPullJobType(PullJobTypeEnum.NEW.getType()); + } request.setCommandInfo(unackedCommands); return request; } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java index f9749574a7f..535e510a314 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java @@ -155,7 +155,7 @@ private Runnable flushCache() { LOGGER.info("start flush cache thread for {} ProxySink", inlongGroupId); while (!shutdown) { try { - cache.forEach((s, packProxyMessage) -> { + cache.forEach((batchKey, packProxyMessage) -> { Pair> result = packProxyMessage.fetchBatch(); if (result != null) { long sendTime = AgentUtils.getCurrentTime(); @@ -166,9 +166,9 @@ private Runnable flushCache() { senderManager.sendBatchAsync(jobInstanceId, inlongGroupId, result.getKey(), result.getValue(), 0, sendTime); } - LOGGER.info("send group id {} with message size {}, the job id is {}, read source is {}" - + "sendTime is {} syncSend {}", inlongGroupId, result.getRight().size(), - jobInstanceId, sourceName, sendTime, syncSend); + LOGGER.info("send group id {}, message key {},with message size {}, the job id is {}, " + + "read source is {} sendTime is {} syncSend {}", inlongGroupId, batchKey, + result.getRight().size(), jobInstanceId, sourceName, sendTime, syncSend); } }); diff --git a/inlong-agent/bin/agent-env.sh b/inlong-agent/bin/agent-env.sh index 4c70b954896..a1dec8071b8 100755 --- a/inlong-agent/bin/agent-env.sh +++ b/inlong-agent/bin/agent-env.sh @@ -36,7 +36,7 @@ else export JPS="$JAVA_HOME/bin/jps" fi -HEAP_OPTS="-Xms512m" +HEAP_OPTS="-Xms512m -Xmx=6114" GC_OPTS="-XX:SurvivorRatio=6 -XX:+UseMembar -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+CMSScavengeBeforeRemark -XX:ParallelCMSThreads=3 -XX:+TieredCompilation -XX:+UseCMSCompactAtFullCollection -verbose:gc -Xloggc:$BASE_DIR/logs/gc.log.`date +%Y-%m-%d-%H-%M-%S` -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$BASE_DIR/logs/ -XX:+CMSClassUnloadingEnabled -XX:CMSInitiatingOccupancyFraction=60 -XX:CMSFullGCsBeforeCompaction=1 -Dsun.net.inetaddr.ttl=3 -Dsun.net.inetaddr.negative.ttl=1 -Djava.net.preferIPv4Stack=true" AGENT_JVM_ARGS="$HEAP_OPTS $GC_OPTS" diff --git a/inlong-agent/conf/agent.properties b/inlong-agent/conf/agent.properties index 20b7cb30985..fdc263abacc 100755 --- a/inlong-agent/conf/agent.properties +++ b/inlong-agent/conf/agent.properties @@ -70,6 +70,8 @@ job.running.thread.keepAlive=60 job.monitor.interval=5 # check interval(s) whether job is finished job.finish.checkInterval=6 +# the amount of jobs agent can support +job.number.limit=20 ############################ diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/PullJobTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/PullJobTypeEnum.java new file mode 100644 index 00000000000..0b22156440b --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/PullJobTypeEnum.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.enums; + +import static java.util.Objects.requireNonNull; + +public enum PullJobTypeEnum { + + NEW(0), NEVER(1); + + private int type; + + PullJobTypeEnum(int type) { + this.type = type; + } + + public static PullJobTypeEnum getPullJobType(int type) { + requireNonNull(type); + switch (type) { + case 0: + return NEW; + case 1: + return NEVER; + default: + throw new RuntimeException("such pull job type doesn't exist"); + } + } + + public int getType() { + return type; + } +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java index fdc994a877e..041f50c846d 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java @@ -33,6 +33,8 @@ public class TaskRequest { private String uuid; + private int pullJobType; + private List commandInfo = new ArrayList<>(); }