Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-3349][Agent] Agent add limitation for job number #3353

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ public class AgentConstants {
public static final String JOB_DB_CACHE_CHECK_INTERVAL = "job.db.cache.check.interval";
public static final int DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL = 60 * 60;

public static final String JOB_NUMBER_LIMIT = "job.number.limit";
public static final int DEFAULT_JOB_NUMBER_LIMIT = 20;


public static final String AGENT_LOCAL_IP = "agent.local.ip";

public static final String AGENT_LOCAL_UUID = "agent.local.uuid";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,9 @@ public List<KeyValueEntity> findAll(String prefix) {
while (it.isValid()) {
KeyValueEntity keyValueItem = GSON
.fromJson(new String(it.value()), KeyValueEntity.class);
results.add(keyValueItem);
if (keyValueItem.getKey().startsWith(prefix)) {
results.add(keyValueItem);
}
it.next();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@

import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_TIME;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT;
import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_CHECK_INTERVAL;
import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_TIME;
import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX;
import static org.apache.inlong.agent.constant.JobConstants.SQL_JOB_ID;
Expand Down Expand Up @@ -69,6 +71,7 @@ public class JobManager extends AbstractDaemon {
private final JobProfileDb jobProfileDb;
private final JobMetrics jobMetrics;
private final AtomicLong index = new AtomicLong(0);
private final long jobMaxSize;

/**
* init job manager
Expand All @@ -92,6 +95,7 @@ public JobManager(AgentManager agentManager, JobProfileDb jobProfileDb) {
AgentConstants.JOB_MONITOR_INTERVAL, AgentConstants.DEFAULT_JOB_MONITOR_INTERVAL);
this.jobDbCacheTime = conf.getLong(JOB_DB_CACHE_TIME, DEFAULT_JOB_DB_CACHE_TIME);
this.jobDbCacheCheckInterval = conf.getLong(JOB_DB_CACHE_CHECK_INTERVAL, DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL);
this.jobMaxSize = conf.getLong(JOB_NUMBER_LIMIT, DEFAULT_JOB_NUMBER_LIMIT);

if (ConfigUtil.isPrometheusEnabled()) {
this.jobMetrics = new JobPrometheusMetrics();
Expand Down Expand Up @@ -137,9 +141,7 @@ public boolean submitFileJobProfile(JobProfile profile) {
* @param profile - job profile.
*/
public boolean submitJobProfile(JobProfile profile, boolean singleJob) {
if (profile == null || !profile.allRequiredKeyExist()) {
LOGGER.error("profile is null or not all required key exists {}", profile == null ? null
: profile.toJsonStr());
if (!isJobValid(profile)) {
return false;
}
String jobId = profile.get(JOB_ID);
Expand All @@ -160,9 +162,7 @@ public boolean submitJobProfile(JobProfile profile, boolean singleJob) {
* @param profile - job profile.
*/
public boolean submitSqlJobProfile(JobProfile profile) {
if (profile == null || !profile.allRequiredKeyExist()) {
LOGGER.error("profile is null or not all required key exists {}", profile == null ? null
: profile.toJsonStr());
if (isJobValid(profile)) {
return false;
}
profile.set(JOB_INSTANCE_ID, SQL_JOB_ID);
Expand All @@ -172,6 +172,23 @@ public boolean submitSqlJobProfile(JobProfile profile) {
return true;
}

private boolean isJobValid(JobProfile profile) {
if (profile == null || !profile.allRequiredKeyExist()) {
LOGGER.error("profile is null or not all required key exists {}", profile == null ? null
: profile.toJsonStr());
return false;
}
if (isJobOverLimit()) {
LOGGER.error("agent cannot add more job, max job size is {}", jobMaxSize);
return false;
}
return true;
}

public boolean isJobOverLimit() {
return jobs.size() >= jobMaxSize;
}

/**
* delete job profile and stop job thread
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.inlong.agent.utils.HttpManager;
import org.apache.inlong.common.db.CommandEntity;
import org.apache.inlong.common.enums.ManagerOpEnum;
import org.apache.inlong.common.enums.PullJobTypeEnum;
import org.apache.inlong.common.pojo.agent.CmdConfig;
import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.pojo.agent.TaskRequest;
Expand Down Expand Up @@ -227,11 +228,11 @@ public void requestTdmList() {
*/
public void fetchCommand() {
List<CommandEntity> unackedCommands = commandDb.getUnackedCommands();
String resultStr = httpManager.doSentPost(managerTaskUrl, getFileCmdFetchRequest(unackedCommands));
String resultStr = httpManager.doSentPost(managerTaskUrl, getFetchRequest(unackedCommands));
JsonObject resultData = getResultData(resultStr);
JsonElement element = resultData.get(AGENT_MANAGER_RETURN_PARAM_DATA);
if (element != null) {
dealWithFileTaskResult(GSON.fromJson(element.getAsJsonObject(), TaskResult.class));
dealWithFetchResult(GSON.fromJson(element.getAsJsonObject(), TaskResult.class));
}
ackCommands(unackedCommands);
}
Expand Down Expand Up @@ -271,7 +272,7 @@ private void dealWithSqlTaskResult(DbCollectorTaskResult taskResult) {
/**
* the fetch file command can be normal or special
*/
private void dealWithFileTaskResult(TaskResult taskResult) {
private void dealWithFetchResult(TaskResult taskResult) {
LOGGER.info("deal with fetch result {}", taskResult);
for (DataConfig dataConfig : taskResult.getDataConfigs()) {
TriggerProfile profile = TriggerProfile.getTriggerProfiles(dataConfig);
Expand All @@ -291,10 +292,16 @@ private void dealWithFileTaskResult(TaskResult taskResult) {
/**
* form file command fetch request
*/
public TaskRequest getFileCmdFetchRequest(List<CommandEntity> unackedCommands) {
public TaskRequest getFetchRequest(List<CommandEntity> unackedCommands) {
TaskRequest request = new TaskRequest();
request.setAgentIp(localIp);
request.setUuid(uuid);
// when job size is over limit, require no new job
if (agentManager.getJobManager().isJobOverLimit()) {
request.setPullJobType(PullJobTypeEnum.NEVER.getType());
} else {
request.setPullJobType(PullJobTypeEnum.NEW.getType());
}
request.setCommandInfo(unackedCommands);
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private Runnable flushCache() {
LOGGER.info("start flush cache thread for {} ProxySink", inlongGroupId);
while (!shutdown) {
try {
cache.forEach((s, packProxyMessage) -> {
cache.forEach((batchKey, packProxyMessage) -> {
Pair<String, List<byte[]>> result = packProxyMessage.fetchBatch();
if (result != null) {
long sendTime = AgentUtils.getCurrentTime();
Expand All @@ -166,9 +166,9 @@ private Runnable flushCache() {
senderManager.sendBatchAsync(jobInstanceId, inlongGroupId, result.getKey(),
result.getValue(), 0, sendTime);
}
LOGGER.info("send group id {} with message size {}, the job id is {}, read source is {}"
+ "sendTime is {} syncSend {}", inlongGroupId, result.getRight().size(),
jobInstanceId, sourceName, sendTime, syncSend);
LOGGER.info("send group id {}, message key {},with message size {}, the job id is {}, "
+ "read source is {} sendTime is {} syncSend {}", inlongGroupId, batchKey,
result.getRight().size(), jobInstanceId, sourceName, sendTime, syncSend);
}

});
Expand Down
2 changes: 1 addition & 1 deletion inlong-agent/bin/agent-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ else
export JPS="$JAVA_HOME/bin/jps"
fi

HEAP_OPTS="-Xms512m"
HEAP_OPTS="-Xms512m -Xmx=6114"
GC_OPTS="-XX:SurvivorRatio=6 -XX:+UseMembar -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+CMSScavengeBeforeRemark -XX:ParallelCMSThreads=3 -XX:+TieredCompilation -XX:+UseCMSCompactAtFullCollection -verbose:gc -Xloggc:$BASE_DIR/logs/gc.log.`date +%Y-%m-%d-%H-%M-%S` -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$BASE_DIR/logs/ -XX:+CMSClassUnloadingEnabled -XX:CMSInitiatingOccupancyFraction=60 -XX:CMSFullGCsBeforeCompaction=1 -Dsun.net.inetaddr.ttl=3 -Dsun.net.inetaddr.negative.ttl=1 -Djava.net.preferIPv4Stack=true"
AGENT_JVM_ARGS="$HEAP_OPTS $GC_OPTS"

Expand Down
2 changes: 2 additions & 0 deletions inlong-agent/conf/agent.properties
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ job.running.thread.keepAlive=60
job.monitor.interval=5
# check interval(s) whether job is finished
job.finish.checkInterval=6
# the amount of jobs agent can support
job.number.limit=20


############################
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.common.enums;

import static java.util.Objects.requireNonNull;

public enum PullJobTypeEnum {

NEW(0), NEVER(1);

private int type;

PullJobTypeEnum(int type) {
this.type = type;
}

public static PullJobTypeEnum getPullJobType(int type) {
requireNonNull(type);
switch (type) {
case 0:
return NEW;
case 1:
return NEVER;
default:
throw new RuntimeException("such pull job type doesn't exist");
}
}

public int getType() {
return type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class TaskRequest {

private String uuid;

private int pullJobType;

private List<CommandEntity> commandInfo = new ArrayList<>();

}