Global limit for MSQ controller tasks implemented #16889

Merged
Changes from 4 commits
36 commits
b8c6f23
Global msq limit implemented
Aug 13, 2024
1b75e47
fix post-merge compilation issues
Aug 13, 2024
580999f
fix tests compilation
Aug 13, 2024
87db3da
address spell check failure
Aug 13, 2024
c2287eb
Update docs/configuration/index.md
nozjkoitop Aug 14, 2024
b7889cc
Merge branch 'apache:master' into feature-global-msq-controller-task-…
nozjkoitop Sep 12, 2024
02a195a
Revert "fix tests compilation"
Sep 12, 2024
ea3ad87
Revert "fix post-merge compilation issues"
Sep 12, 2024
9cc11f5
Addressed comments moved configs to the WorkerTaskRunnerConfig and me…
Sep 12, 2024
19a1a52
Fix failing checks
Sep 13, 2024
5d45ef4
Checkstyle fix
Sep 13, 2024
1a09ca6
Rollback parallelIndexRatio removal
Sep 23, 2024
ef7ae81
Address review comments
Sep 24, 2024
23e1c36
Address spellchecks
Sep 24, 2024
50edf37
Address review comments
Sep 25, 2024
ca9b244
Deserializer removed
Sep 25, 2024
8c9ca8d
Spellcheck and tests fix
Sep 25, 2024
72f40b5
Addressed review comments
Sep 26, 2024
2ae6371
Increase test coverage
Sep 26, 2024
9841f74
empty commit to trigger workflow
Sep 26, 2024
cf32aa8
Addressed review comments
Sep 27, 2024
e69c0a5
Checkstyle fix
Sep 27, 2024
682b463
Update indexing-service/src/main/java/org/apache/druid/indexing/overl…
nozjkoitop Sep 27, 2024
5c4cfbe
Update indexing-service/src/main/java/org/apache/druid/indexing/overl…
nozjkoitop Oct 1, 2024
ee984f1
Update indexing-service/src/main/java/org/apache/druid/indexing/overl…
nozjkoitop Oct 1, 2024
95a3a31
Merge remote-tracking branch 'upstream/master' into feature-global-ms…
Oct 8, 2024
9459e06
revert per-worker limit changes
Oct 8, 2024
9a95773
Merge remote-tracking branch 'upstream/master' into feature-global-ms…
Oct 10, 2024
6834b52
Global limit with dynamic config implemented using select strategies
Oct 10, 2024
fcaaa04
conflicts resolved
Oct 10, 2024
7be4502
Fix compilation and spellcheck failures
Oct 10, 2024
6e1501f
Trigger the checks
Oct 10, 2024
89150ff
Merge remote-tracking branch 'upstream/master' into feature-global-ms…
Mar 5, 2025
7f5f982
TODO's resolved
Mar 5, 2025
832f62a
Comments addressed, LimiterUtils logic moved to TaskLimits class
Mar 6, 2025
72f79b7
Checkstyle fix
Mar 6, 2025
2 changes: 2 additions & 0 deletions docs/configuration/index.md
@@ -1124,6 +1124,8 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro
|`druid.indexer.queue.restartDelay`|Sleep this long when Overlord queue management throws an exception before trying again.|`PT30S`|
|`druid.indexer.queue.storageSyncRate`|Sync Overlord state this often with an underlying task persistence mechanism.|`PT1M`|
|`druid.indexer.queue.maxTaskPayloadSize`|Maximum allowed size in bytes of a single task payload accepted by the Overlord.|none (allow all task payload sizes)|
|`druid.indexer.queue.controllerTaskSlotRatio`|Optional. Proportion of the total task slots that can be allocated to `query_controller` tasks, as a floating-point value greater than 0 and at most 1. Cannot be set together with `druid.indexer.queue.maxControllerTaskSlots`.|`null`|
|`druid.indexer.queue.maxControllerTaskSlots`|Optional. Hard limit, as an integer, on the number of task slots that can be allocated to `query_controller` tasks. Cannot be set together with `druid.indexer.queue.controllerTaskSlotRatio`.|`null`|

The following configs only apply if the Overlord is running in remote mode. For a description of local vs. remote mode, see [Overlord service](../design/overlord.md).

@@ -713,6 +713,12 @@ public Map<String, Long> getTotalTaskSlotCount()
}

public long getTotalTaskSlotCountLong()
{
return getTotalCapacity();
}

@Override
public int getTotalCapacity()
{
return workerConfig.getCapacity();
}
@@ -72,6 +72,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -96,6 +97,7 @@
*/
public class TaskQueue
{
private static final String MSQ_CONTROLLER = "query_controller";
private static final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60);
private static final long MIN_WAIT_TIME_MS = 100;

@@ -440,7 +442,7 @@ private void manageInternalCritical(
notifyStatus(task, taskStatus, taskStatus.getErrorMsg());
continue;
}
if (taskIsReady) {
if (taskIsReady && !isControllerTaskLimitReached(task.getType(), true)) {
Contributor comment:

Should this be limited to MSQ controllers only, or should it apply to other task types as well? Maybe take a JSON map as input, where the key is the task type and the value is the limit (an integer or a float ratio).

Also, what does the user see when a task is pending launch because of the limit?

  • Can we communicate to the user why the task is not launching?
  • If the cluster is totally starved, does the controller eventually time out and get removed from the queue?
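
To make the suggestion above concrete, here is a minimal sketch of a per-task-type limit lookup. It is only an illustration of the idea, not code from this PR: the class name `TaskTypeLimitsSketch`, its constructor, and `isLimitReached` are all hypothetical, and it covers absolute limits only (a ratio-based variant would also need the total capacity).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the reviewer's suggestion; none of these names exist in the PR.
final class TaskTypeLimitsSketch
{
  // Absolute slot limits keyed by task type, for example {"query_controller": 2}.
  private final Map<String, Integer> maxSlotsPerType;

  TaskTypeLimitsSketch(Map<String, Integer> maxSlotsPerType)
  {
    // Defensive copy so later changes to the caller's map do not affect the limits.
    this.maxSlotsPerType = new HashMap<>(maxSlotsPerType);
  }

  // Returns true only when the task type has a configured limit and that limit is reached.
  boolean isLimitReached(String taskType, long activeTasksOfType)
  {
    Integer limit = maxSlotsPerType.get(taskType);
    return limit != null && activeTasksOfType >= limit;
  }
}
```

Task types without an entry in the map are never limited, which matches how the check in this diff returns `false` for anything other than `query_controller`.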

log.info("Asking taskRunner to run: %s", task.getId());
runnerTaskFuture = taskRunner.run(task);
} else {
@@ -456,6 +458,9 @@ private void manageInternalCritical(
// if the taskFutures contain this task and this task is pending, also let the taskRunner
// to run it to guarantee it will be assigned to run
// see https://github.com/apache/druid/pull/6991
if (isControllerTaskLimitReached(task.getType(), false)) {
continue;
}
taskRunner.run(task);
}
}
@@ -490,6 +495,32 @@ private void manageInternalPostCritical(
}
}

private boolean isControllerTaskLimitReached(final String type, final boolean includePending)
{
if (!MSQ_CONTROLLER.equals(type)) {
return false;
}

Integer maxSlots = config.getMaxControllerTaskSlots();
Float slotRatio = config.getControllerTaskSlotRatio();

if (maxSlots == null && slotRatio == null) {
return false;
}

long runningTasks = getRunningControllerTaskCount();

if (includePending) {
runningTasks += getPendingControllerTaskCount();
}

if (maxSlots != null && runningTasks >= maxSlots) {
return true;
}

return slotRatio != null && runningTasks >= Math.floor(taskRunner.getTotalCapacity() * slotRatio);
}

private boolean isTaskPending(Task task)
{
return taskRunner.getPendingTasks()
@@ -925,6 +956,21 @@ public Map<String, Long> getRunningTaskCount()
));
}

public Long getRunningControllerTaskCount()
{
return taskRunner.getRunningTasks()
.stream()
.filter(workItem -> Objects.equals(workItem.getTaskType(), MSQ_CONTROLLER))
.count();
}

public Long getPendingControllerTaskCount()
{
return taskRunner.getPendingTasks()
.stream()
.filter(workItem -> Objects.equals(workItem.getTaskType(), MSQ_CONTROLLER))
.count();
}

public Map<String, Long> getPendingTaskCount()
{
Map<String, String> taskDatasources = getCurrentTaskDatasources();
@@ -483,6 +483,12 @@ public Map<String, Long> getTotalTaskSlotCount()
return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(workerConfig.getCapacity()));
}

@Override
public int getTotalCapacity()
{
return workerConfig.getCapacity();
}

public long getTotalTaskSlotCountLong()
{
return workerConfig.getCapacity();
@@ -48,14 +48,24 @@
@JsonProperty
private HumanReadableBytes maxTaskPayloadSize;

@Nullable
@JsonProperty
private Float controllerTaskSlotRatio;

@Nullable
@JsonProperty
private Integer maxControllerTaskSlots;

@JsonCreator
public TaskQueueConfig(
@JsonProperty("maxSize") final Integer maxSize,
@JsonProperty("startDelay") final Period startDelay,
@JsonProperty("restartDelay") final Period restartDelay,
@JsonProperty("storageSyncRate") final Period storageSyncRate,
@JsonProperty("taskCompleteHandlerNumThreads") final Integer taskCompleteHandlerNumThreads,
@JsonProperty("maxTaskPayloadSize") @Nullable final HumanReadableBytes maxTaskPayloadSize
@JsonProperty("maxTaskPayloadSize") @Nullable final HumanReadableBytes maxTaskPayloadSize,
@Nullable @JsonProperty("controllerTaskSlotRatio") final Float controllerTaskSlotRatio,
@Nullable @JsonProperty("maxControllerTaskSlots") final Integer maxControllerTaskSlots
)
{
this.maxSize = Configs.valueOrDefault(maxSize, Integer.MAX_VALUE);
@@ -64,6 +74,15 @@
this.restartDelay = defaultDuration(restartDelay, "PT30S");
this.storageSyncRate = defaultDuration(storageSyncRate, "PT1M");
this.maxTaskPayloadSize = maxTaskPayloadSize;
if (controllerTaskSlotRatio != null && maxControllerTaskSlots != null) {
throw new IllegalArgumentException(
"Only one controller task limit parameter should be specified, controllerTaskSlotRatio or maxControllerTaskSlots");
} else if (controllerTaskSlotRatio != null && (controllerTaskSlotRatio <= 0 || controllerTaskSlotRatio > 1)) {
throw new IllegalArgumentException(
"controllerTaskSlotRatio is out of range (0;1]");
}
this.controllerTaskSlotRatio = controllerTaskSlotRatio;
this.maxControllerTaskSlots = maxControllerTaskSlots;
}

public int getMaxSize()
@@ -100,4 +119,16 @@
{
return (period == null ? new Period(theDefault) : period).toStandardDuration();
}

@Nullable
public Integer getMaxControllerTaskSlots()
{
return maxControllerTaskSlots;
}

@Nullable
public Float getControllerTaskSlotRatio()
{
return controllerTaskSlotRatio;
}
}
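
The validation in this constructor and the comparison in `isControllerTaskLimitReached` can be tried out in isolation. The sketch below is a standalone approximation, not code from the PR: `ControllerLimitSketch`, `validate`, and `limitReached` are hypothetical names, and it assumes the range check is meant to reject any ratio outside the range named in the error message (greater than 0, at most 1).

```java
// Standalone sketch; ControllerLimitSketch is a hypothetical name, not a class in the PR.
final class ControllerLimitSketch
{
  // Assumed intent of the constructor validation: at most one limit may be set,
  // and a ratio, when present, must be greater than 0 and at most 1.
  static void validate(Float slotRatio, Integer maxSlots)
  {
    if (slotRatio != null && maxSlots != null) {
      throw new IllegalArgumentException("Specify only one of controllerTaskSlotRatio or maxControllerTaskSlots");
    }
    if (slotRatio != null && (slotRatio <= 0 || slotRatio > 1)) {
      throw new IllegalArgumentException("controllerTaskSlotRatio is out of range");
    }
  }

  // Same comparison as the diff applies to a query_controller task:
  // an absolute limit first, then floor(totalCapacity * ratio).
  static boolean limitReached(long controllerTasks, int totalCapacity, Float slotRatio, Integer maxSlots)
  {
    if (maxSlots == null && slotRatio == null) {
      return false;
    }
    if (maxSlots != null && controllerTasks >= maxSlots) {
      return true;
    }
    return slotRatio != null && controllerTasks >= Math.floor(totalCapacity * slotRatio);
  }
}
```

With a total capacity of 10 and a ratio of 0.25, the threshold is floor(2.5) = 2, so a third controller task is held back once two are counted. With a total capacity of 3 and the same ratio, the threshold floors to 0, so under this logic no controller task would be launched at all.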
@@ -138,7 +138,7 @@ public void setup()
);
taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, new Period(0L), null, null, null, null),
new TaskQueueConfig(null, new Period(0L), null, null, null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
taskRunner,
@@ -151,7 +151,7 @@ public void setUpIngestionTestBase() throws IOException
);
taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, new Period(0L), null, null, null, null),
new TaskQueueConfig(null, new Period(0L), null, null, null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
taskRunner,