Global limit for MSQ controller tasks implemented #16889

Merged
36 commits
b8c6f23
Global msq limit implemented
Aug 13, 2024
1b75e47
fix post-merge compilation issues
Aug 13, 2024
580999f
fix tests compilation
Aug 13, 2024
87db3da
address spell check failure
Aug 13, 2024
c2287eb
Update docs/configuration/index.md
nozjkoitop Aug 14, 2024
b7889cc
Merge branch 'apache:master' into feature-global-msq-controller-task-…
nozjkoitop Sep 12, 2024
02a195a
Revert "fix tests compilation"
Sep 12, 2024
ea3ad87
Revert "fix post-merge compilation issues"
Sep 12, 2024
9cc11f5
Addressed comments moved configs to the WorkerTaskRunnerConfig and me…
Sep 12, 2024
19a1a52
Fix failing checks
Sep 13, 2024
5d45ef4
Checkstyle fix
Sep 13, 2024
1a09ca6
Rollback parallelIndexRatio removal
Sep 23, 2024
ef7ae81
Address review comments
Sep 24, 2024
23e1c36
Address spellchecks
Sep 24, 2024
50edf37
Address review comments
Sep 25, 2024
ca9b244
Deserializer removed
Sep 25, 2024
8c9ca8d
Spellcheck and tests fix
Sep 25, 2024
72f40b5
Addressed review comments
Sep 26, 2024
2ae6371
Increase test coverage
Sep 26, 2024
9841f74
empty commit to trigger workflow
Sep 26, 2024
cf32aa8
Addressed review comments
Sep 27, 2024
e69c0a5
Checkstyle fix
Sep 27, 2024
682b463
Update indexing-service/src/main/java/org/apache/druid/indexing/overl…
nozjkoitop Sep 27, 2024
5c4cfbe
Update indexing-service/src/main/java/org/apache/druid/indexing/overl…
nozjkoitop Oct 1, 2024
ee984f1
Update indexing-service/src/main/java/org/apache/druid/indexing/overl…
nozjkoitop Oct 1, 2024
95a3a31
Merge remote-tracking branch 'upstream/master' into feature-global-ms…
Oct 8, 2024
9459e06
revert per-worker limit changes
Oct 8, 2024
9a95773
Merge remote-tracking branch 'upstream/master' into feature-global-ms…
Oct 10, 2024
6834b52
Global limit with dynamic config implemented using select strategies
Oct 10, 2024
fcaaa04
conflicts resolved
Oct 10, 2024
7be4502
Fix compilation and spellcheck failures
Oct 10, 2024
6e1501f
Trigger the checks
Oct 10, 2024
89150ff
Merge remote-tracking branch 'upstream/master' into feature-global-ms…
Mar 5, 2025
7f5f982
TODO's resolved
Mar 5, 2025
832f62a
Comments addressed, LimiterUtils logic moved to TaskLimits class
Mar 6, 2025
72f79b7
Checkstyle fix
Mar 6, 2025
14 changes: 14 additions & 0 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
@@ -1277,6 +1277,7 @@ This evenly distributes work across your Middle Managers.
|--------|-----------|-------|
|`type`|`equalDistribution`|required; must be `equalDistribution`|
|`affinityConfig`|[`AffinityConfig`](#affinityconfig) object|null (no affinity)|
|`taskLimits`|[`TaskLimits`](#tasklimits) object|null (no limits)|

###### `equalDistributionWithCategorySpec`

@@ -1288,6 +1289,7 @@ This strategy doesn't work with `AutoScaler` since the behavior is undefined.
|--------|-----------|-------|
|`type`|`equalDistributionWithCategorySpec`|required; must be `equalDistributionWithCategorySpec`|
|`workerCategorySpec`|[`WorkerCategorySpec`](#workercategoryspec) object|null (no worker category spec)|
|`taskLimits`|[`TaskLimits`](#tasklimits) object|null (no limits)|

The following example shows tasks of type `index_kafka` that default to running on Middle Managers of category `c1`, except for tasks that write to datasource `ds1`, which run on Middle Managers of category `c2`.

@@ -1323,6 +1325,7 @@ Middle Managers up to capacity simultaneously, rather than a single Middle Manag
|--------|-----------|-------|
|`type`| `fillCapacity`|required; must be `fillCapacity`|
|`affinityConfig`| [`AffinityConfig`](#affinityconfig) object |null (no affinity)|
|`taskLimits`|[`TaskLimits`](#tasklimits) object|null (no limits)|

###### `fillCapacityWithCategorySpec`

@@ -1334,6 +1337,7 @@ This strategy doesn't work with `AutoScaler` since the behavior is undefined.
|--------|-----------|-------|
|`type`|`fillCapacityWithCategorySpec`|required; must be `fillCapacityWithCategorySpec`|
|`workerCategorySpec`|[`WorkerCategorySpec`](#workercategoryspec) object|null (no worker category spec)|
|`taskLimits`|[`TaskLimits`](#tasklimits) object|null (no limits)|

<a name="javascript-worker-select-strategy"></a>

@@ -1383,6 +1387,16 @@ field. If not provided, the default is to not use it at all.
|`categoryMap`|A JSON map object mapping a task type String name to a [CategoryConfig](#categoryconfig) object, by which you can specify category config for different task type.|`{}`|
|`strong`|With weak workerCategorySpec (the default), tasks for a dataSource may be assigned to other Middle Managers if the Middle Managers specified in `categoryMap` are not able to run all pending tasks in the queue for that dataSource. With strong workerCategorySpec, tasks for a dataSource will only ever be assigned to their specified Middle Managers, and will wait in the pending queue if necessary.|false|

###### `taskLimits`

The `taskLimits` field can be used with the `equalDistribution`, `fillCapacity`, `equalDistributionWithCategorySpec`, and `fillCapacityWithCategorySpec` strategies.
If you omit it, no task limits are applied.

|Property|Description|Default|
|--------|-----------|-------|
|`maxSlotCountByType`|A map from task type (`String`) to the absolute limit on the number of task slots that tasks of that type can occupy. Each value is an `Integer` greater than or equal to 0. For example, a value of 5 means that tasks of this type can occupy up to 5 task slots in total. If both an absolute limit and a ratio limit are specified for the same task type, the effective limit is the smaller of the absolute limit and the limit derived from the ratio. For example, with `maxSlotCountByType = {"index_parallel": 3, "query_controller": 5}`, parallel indexing tasks can occupy up to 3 task slots, and query controllers can occupy up to 5 task slots.|`{}`|
|`maxSlotRatioByType`|A map from task type (`String`) to a `Double` in the range [0, 1] that gives the proportion of the total task slots that tasks of that type can occupy. The resulting limit is calculated as `ratio * totalSlots`. If both an absolute limit and a ratio limit are specified for the same task type, the effective limit is the smaller of the absolute limit and the limit derived from the ratio. For example, with `maxSlotRatioByType = {"index_parallel": 0.5, "query_controller": 0.25}`, parallel indexing tasks can occupy up to 50% of the total task slots, and query controllers can occupy up to 25% of the total task slots.|`{}`|
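
The rule described in the table, taking the smaller of the absolute limit and `ratio * totalSlots`, can be sketched as follows. This is a minimal illustration and not the `TaskLimits` class from this change: the class and method names are hypothetical, and rounding the ratio-derived limit down to a whole slot is an assumption, since the table does not say how fractions are handled.

```java
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical helper for illustration; not the TaskLimits class from this PR.
final class EffectiveSlotLimit
{
  /**
   * Returns the effective slot limit for a task type, or an empty OptionalInt
   * when neither map mentions the type.
   * Assumption: the ratio-derived limit is rounded down to a whole slot.
   */
  static OptionalInt effectiveLimit(
      String taskType,
      Map<String, Integer> maxSlotCountByType,
      Map<String, Double> maxSlotRatioByType,
      int totalSlots
  )
  {
    Integer absolute = maxSlotCountByType.get(taskType);
    Double ratio = maxSlotRatioByType.get(taskType);
    if (absolute == null && ratio == null) {
      return OptionalInt.empty(); // no limit configured for this type
    }
    int fromCount = absolute == null ? Integer.MAX_VALUE : absolute;
    int fromRatio = ratio == null ? Integer.MAX_VALUE : (int) Math.floor(ratio * totalSlots);
    return OptionalInt.of(Math.min(fromCount, fromRatio));
  }

  public static void main(String[] args)
  {
    Map<String, Integer> counts = Map.of("index_parallel", 3, "query_controller", 5);
    Map<String, Double> ratios = Map.of("index_parallel", 0.5, "query_controller", 0.25);
    int totalSlots = 8;
    // index_parallel: min(3, floor(0.5 * 8)) = min(3, 4)
    System.out.println(effectiveLimit("index_parallel", counts, ratios, totalSlots).getAsInt());
    // query_controller: min(5, floor(0.25 * 8)) = min(5, 2)
    System.out.println(effectiveLimit("query_controller", counts, ratios, totalSlots).getAsInt());
    // index_kafka appears in neither map, so no limit applies
    System.out.println(effectiveLimit("index_kafka", counts, ratios, totalSlots).isPresent());
  }
}
```

With 8 total slots, the ratio limit is the binding one for `query_controller`, while the absolute limit is the binding one for `index_parallel`.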

###### CategoryConfig

|Property|Description|Default|
Original file line number Diff line number Diff line change
@@ -45,7 +45,8 @@ public void testSerde() throws Exception
new AffinityConfig(
ImmutableMap.of("foo", ImmutableSet.of("localhost")),
false
-)
+),
+null
),
new EC2AutoScaler(
7,
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.Configs;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
@@ -33,6 +34,8 @@

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

@@ -45,6 +48,7 @@ public class ImmutableWorkerInfo
private final Worker worker;
private final int currCapacityUsed;
private final int currParallelIndexCapacityUsed;
private final Map<String, Integer> currCapacityUsedByTaskType;
private final ImmutableSet<String> availabilityGroups;
private final ImmutableSet<String> runningTasks;
private final DateTime lastCompletedTaskTime;
@@ -57,6 +61,7 @@ public ImmutableWorkerInfo(
@JsonProperty("worker") Worker worker,
@JsonProperty("currCapacityUsed") int currCapacityUsed,
@JsonProperty("currParallelIndexCapacityUsed") int currParallelIndexCapacityUsed,
@JsonProperty("currCapacityUsedByTaskType") Map<String, Integer> currCapacityUsedByTaskType,
Contributor

Or should this be immutable, no?

Contributor Author

@nozjkoitop nozjkoitop Mar 5, 2025

It is immutable. `incrementTypeSpecificCapacity` doesn't change the fields. This `typeSpecificCapacity` is managed exactly like `parallelIndexCapacityUsed`, which is passed as `immutableWorker.getCurrParallelIndexCapacityUsed() + parallelIndexTaskCapacity` in the `ImmutableWorkerInfo` constructor arguments, but here the incremented value is created in `ImmutableWorkerInfo` itself. Wrt decrement, I don't see any for `currParallelIndexCapacityUsed` and `currCapacityUsed` either; if I'm not mistaken, it's managed by the provisioning strategy.
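
The copy-then-increment approach described in this reply can be sketched roughly as below. `WorkerSnapshot` and `withTask` are hypothetical names standing in for `ImmutableWorkerInfo` and the increment logic, and the defensive copy in the constructor is an addition of this sketch rather than something shown in the diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ImmutableWorkerInfo, reduced to the per-type counter.
final class WorkerSnapshot
{
  private final Map<String, Integer> usedByTaskType;

  WorkerSnapshot(Map<String, Integer> usedByTaskType)
  {
    // Defensive copy plus unmodifiable view (an assumption of this sketch),
    // so later changes to the caller's map cannot leak into this object.
    this.usedByTaskType = Collections.unmodifiableMap(new HashMap<>(usedByTaskType));
  }

  Map<String, Integer> getUsedByTaskType()
  {
    return usedByTaskType;
  }

  /** Returns a new snapshot with the task's capacity added; this object is left unchanged. */
  WorkerSnapshot withTask(String taskType, int requiredCapacity)
  {
    Map<String, Integer> copy = new HashMap<>(usedByTaskType);
    copy.merge(taskType, requiredCapacity, Integer::sum);
    return new WorkerSnapshot(copy);
  }

  public static void main(String[] args)
  {
    WorkerSnapshot before = new WorkerSnapshot(Map.of("index_parallel", 1));
    WorkerSnapshot after = before.withTask("query_controller", 1);
    System.out.println("before: " + before.getUsedByTaskType());
    System.out.println("after:  " + after.getUsedByTaskType());
  }
}
```

Because every increment produces a new object, the original snapshot never observes the change, which is the sense of "immutable" used in the reply.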

@JsonProperty("availabilityGroups") Set<String> availabilityGroups,
@JsonProperty("runningTasks") Collection<String> runningTasks,
@JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime,
@@ -66,6 +71,7 @@ public ImmutableWorkerInfo(
this.worker = worker;
this.currCapacityUsed = currCapacityUsed;
this.currParallelIndexCapacityUsed = currParallelIndexCapacityUsed;
this.currCapacityUsedByTaskType = Configs.valueOrDefault(currCapacityUsedByTaskType, Collections.emptyMap());
this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups);
this.runningTasks = ImmutableSet.copyOf(runningTasks);
this.lastCompletedTaskTime = lastCompletedTaskTime;
@@ -76,12 +82,13 @@ public ImmutableWorkerInfo(
Worker worker,
int currCapacityUsed,
int currParallelIndexCapacityUsed,
Map<String, Integer> currCapacityUsedByTaskType,
Set<String> availabilityGroups,
Collection<String> runningTasks,
DateTime lastCompletedTaskTime
)
{
-this(worker, currCapacityUsed, currParallelIndexCapacityUsed, availabilityGroups,
+this(worker, currCapacityUsed, currParallelIndexCapacityUsed, currCapacityUsedByTaskType, availabilityGroups,
runningTasks, lastCompletedTaskTime, null
);
}
@@ -94,7 +101,7 @@ public ImmutableWorkerInfo(
DateTime lastCompletedTaskTime
)
{
-this(worker, currCapacityUsed, 0, availabilityGroups, runningTasks, lastCompletedTaskTime, null);
+this(worker, currCapacityUsed, 0, Collections.emptyMap(), availabilityGroups, runningTasks, lastCompletedTaskTime, null);
}

/**
@@ -111,6 +118,8 @@ public static ImmutableWorkerInfo fromWorkerAnnouncements(
int currParallelIndexCapacityUsed = 0;
ImmutableSet.Builder<String> taskIds = ImmutableSet.builder();
ImmutableSet.Builder<String> availabilityGroups = ImmutableSet.builder();
Map<String, Integer> currCapacityUsedByTaskType = new HashMap<>();


for (final Map.Entry<String, TaskAnnouncement> entry : announcements.entrySet()) {
final TaskAnnouncement announcement = entry.getValue();
@@ -126,6 +135,8 @@ public static ImmutableWorkerInfo fromWorkerAnnouncements(
currParallelIndexCapacityUsed += requiredCapacity;
}

currCapacityUsedByTaskType.merge(announcement.getTaskType(), 1, Integer::sum);

taskIds.add(taskId);
availabilityGroups.add(taskResource.getAvailabilityGroup());
}
@@ -135,6 +146,7 @@ public static ImmutableWorkerInfo fromWorkerAnnouncements(
worker,
currCapacityUsed,
currParallelIndexCapacityUsed,
currCapacityUsedByTaskType,
availabilityGroups.build(),
taskIds.build(),
lastCompletedTaskTime,
@@ -160,6 +172,13 @@ public int getCurrParallelIndexCapacityUsed()
return currParallelIndexCapacityUsed;
}

@JsonProperty("currCapacityUsedByTaskType")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public Map<String, Integer> getCurrCapacityUsedByTaskType()
{
return currCapacityUsedByTaskType;
}

@JsonProperty("availabilityGroups")
public Set<String> getAvailabilityGroups()
{
@@ -243,6 +262,9 @@ public boolean equals(Object o)
if (currParallelIndexCapacityUsed != that.currParallelIndexCapacityUsed) {
return false;
}
if (!currCapacityUsedByTaskType.equals(that.currCapacityUsedByTaskType)) {
return false;
}
if (!worker.equals(that.worker)) {
return false;
}
@@ -266,6 +288,7 @@ public int hashCode()
int result = worker.hashCode();
result = 31 * result + currCapacityUsed;
result = 31 * result + currParallelIndexCapacityUsed;
result = 31 * result + currCapacityUsedByTaskType.hashCode();
result = 31 * result + availabilityGroups.hashCode();
result = 31 * result + runningTasks.hashCode();
result = 31 * result + lastCompletedTaskTime.hashCode();
@@ -280,6 +303,7 @@ public String toString()
"worker=" + worker +
", currCapacityUsed=" + currCapacityUsed +
", currParallelIndexCapacityUsed=" + currParallelIndexCapacityUsed +
", currCapacityUsedByTaskType=" + currCapacityUsedByTaskType +
", availabilityGroups=" + availabilityGroups +
", runningTasks=" + runningTasks +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
Original file line number Diff line number Diff line change
@@ -482,10 +482,16 @@ private static ImmutableWorkerInfo workerWithTask(ImmutableWorkerInfo immutableW
int parallelIndexTaskCapacity = task.getType().equals(ParallelIndexSupervisorTask.TYPE)
? task.getTaskResource().getRequiredCapacity()
: 0;
int taskCapacity = task.getTaskResource().getRequiredCapacity();

final Map<String, Integer> typeSpecificCapacity = new HashMap<>(immutableWorker.getCurrCapacityUsedByTaskType());
typeSpecificCapacity.merge(task.getType(), taskCapacity, Integer::sum);

return new ImmutableWorkerInfo(
immutableWorker.getWorker(),
immutableWorker.getCurrCapacityUsed() + 1,
immutableWorker.getCurrParallelIndexCapacityUsed() + parallelIndexTaskCapacity,
typeSpecificCapacity,
Sets.union(
immutableWorker.getAvailabilityGroups(),
Sets.newHashSet(
Original file line number Diff line number Diff line change
@@ -29,9 +29,10 @@ public class EqualDistributionWithAffinityWorkerSelectStrategy extends EqualDist
{
@JsonCreator
public EqualDistributionWithAffinityWorkerSelectStrategy(
-@JsonProperty("affinityConfig") AffinityConfig affinityConfig
+@JsonProperty("affinityConfig") AffinityConfig affinityConfig,
+@JsonProperty("taskLimits") TaskLimits taskLimits
)
{
-super(affinityConfig);
+super(affinityConfig, taskLimits);
}
}
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.config.Configs;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
@@ -32,13 +33,16 @@
public class EqualDistributionWithCategorySpecWorkerSelectStrategy implements WorkerSelectStrategy
{
private final WorkerCategorySpec workerCategorySpec;
private final TaskLimits taskLimits;

@JsonCreator
public EqualDistributionWithCategorySpecWorkerSelectStrategy(
-@JsonProperty("workerCategorySpec") WorkerCategorySpec workerCategorySpec
+@JsonProperty("workerCategorySpec") WorkerCategorySpec workerCategorySpec,
+@JsonProperty("taskLimits") @Nullable TaskLimits taskLimits
)
{
this.workerCategorySpec = workerCategorySpec;
this.taskLimits = Configs.valueOrDefault(taskLimits, TaskLimits.EMPTY);
}

@JsonProperty
@@ -47,6 +51,12 @@ public WorkerCategorySpec getWorkerCategorySpec()
return workerCategorySpec;
}

@JsonProperty
public TaskLimits getTaskLimits()
{
return taskLimits;
}

@Nullable
@Override
public ImmutableWorkerInfo findWorkerForTask(
@@ -60,7 +70,8 @@ public ImmutableWorkerInfo findWorkerForTask(
zkWorkers,
config,
workerCategorySpec,
-EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers
+EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers,
+taskLimits
);
}

@@ -74,20 +85,22 @@ public boolean equals(final Object o)
return false;
}
final EqualDistributionWithCategorySpecWorkerSelectStrategy that = (EqualDistributionWithCategorySpecWorkerSelectStrategy) o;
-return Objects.equals(workerCategorySpec, that.workerCategorySpec);
+return Objects.equals(workerCategorySpec, that.workerCategorySpec)
+&& Objects.equals(taskLimits, that.taskLimits);
}

@Override
public int hashCode()
{
-return Objects.hash(workerCategorySpec);
+return Objects.hash(workerCategorySpec, taskLimits);
}

@Override
public String toString()
{
return "EqualDistributionWithCategorySpecWorkerSelectStrategy{" +
"workerCategorySpec=" + workerCategorySpec +
", taskLimits=" + taskLimits +
'}';
}
}
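
The constructor above replaces a missing `taskLimits` with `TaskLimits.EMPTY`, so the rest of the class never has to null-check the field. A reduced sketch of that pattern follows. `Limits` and `Strategy` are hypothetical stand-ins, and the local `valueOrDefault` helper is an assumption about how `Configs.valueOrDefault` behaves, written out so the example compiles on its own.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

final class NullDefaultSketch
{
  // Hypothetical, simplified stand-in for TaskLimits.
  static final class Limits
  {
    static final Limits EMPTY = new Limits(Collections.emptyMap());

    private final Map<String, Integer> maxSlotCountByType;

    Limits(Map<String, Integer> maxSlotCountByType)
    {
      this.maxSlotCountByType = maxSlotCountByType;
    }

    @Override
    public boolean equals(Object o)
    {
      return o instanceof Limits && maxSlotCountByType.equals(((Limits) o).maxSlotCountByType);
    }

    @Override
    public int hashCode()
    {
      return maxSlotCountByType.hashCode();
    }
  }

  // Assumed behavior of Configs.valueOrDefault: fall back only when the value is null.
  static <T> T valueOrDefault(T value, T defaultValue)
  {
    return value == null ? defaultValue : value;
  }

  // Hypothetical stand-in for a select strategy that carries limits.
  static final class Strategy
  {
    private final Limits limits;

    Strategy(Limits limits)
    {
      this.limits = valueOrDefault(limits, Limits.EMPTY);
    }

    Limits getLimits()
    {
      return limits;
    }

    @Override
    public boolean equals(Object o)
    {
      return o instanceof Strategy && Objects.equals(limits, ((Strategy) o).limits);
    }

    @Override
    public int hashCode()
    {
      return Objects.hash(limits);
    }
  }

  public static void main(String[] args)
  {
    Strategy omitted = new Strategy(null);
    Strategy explicit = new Strategy(Limits.EMPTY);
    System.out.println(omitted.equals(explicit));
  }
}
```

One consequence of normalizing in the constructor is that a strategy built without limits and one built with empty limits compare as equal and hash identically.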
Original file line number Diff line number Diff line change
@@ -22,10 +22,12 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.config.Configs;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;

import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
@@ -35,13 +37,16 @@
public class EqualDistributionWorkerSelectStrategy implements WorkerSelectStrategy
{
private final AffinityConfig affinityConfig;
private final TaskLimits taskLimits;

@JsonCreator
public EqualDistributionWorkerSelectStrategy(
-@JsonProperty("affinityConfig") AffinityConfig affinityConfig
+@JsonProperty("affinityConfig") AffinityConfig affinityConfig,
+@JsonProperty("taskLimits") @Nullable TaskLimits taskLimits
)
{
this.affinityConfig = affinityConfig;
this.taskLimits = Configs.valueOrDefault(taskLimits, TaskLimits.EMPTY);
}

@JsonProperty
@@ -50,6 +55,12 @@ public AffinityConfig getAffinityConfig()
return affinityConfig;
}

@JsonProperty
public TaskLimits getTaskLimits()
{
return taskLimits;
}

@Override
public ImmutableWorkerInfo findWorkerForTask(
final WorkerTaskRunnerConfig config,
@@ -62,7 +73,8 @@ public ImmutableWorkerInfo findWorkerForTask(
zkWorkers,
config,
affinityConfig,
-EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers
+EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers,
+taskLimits
);
}

@@ -83,20 +95,22 @@ public boolean equals(final Object o)
return false;
}
final EqualDistributionWorkerSelectStrategy that = (EqualDistributionWorkerSelectStrategy) o;
-return Objects.equals(affinityConfig, that.affinityConfig);
+return Objects.equals(affinityConfig, that.affinityConfig)
+&& Objects.equals(taskLimits, that.taskLimits);
}

@Override
public int hashCode()
{
-return Objects.hash(affinityConfig);
+return Objects.hash(affinityConfig, taskLimits);
}

@Override
public String toString()
{
return "EqualDistributionWorkerSelectStrategy{" +
"affinityConfig=" + affinityConfig +
", taskLimits=" + taskLimits +
'}';
}
}
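
The strategy passes `taskLimits` into worker selection, but the code that applies it is not part of the hunks shown here. One way a cluster-wide check could combine the per-worker `currCapacityUsedByTaskType` counters is sketched below. All names are hypothetical, and summing usage across workers before comparing against the limit is an assumption, not the implementation in this change.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a global, per-task-type slot check.
final class GlobalLimitCheck
{
  /** Sums the slots used by one task type across all workers. */
  static int slotsUsedByType(List<Map<String, Integer>> perWorkerUsage, String taskType)
  {
    int total = 0;
    for (Map<String, Integer> usage : perWorkerUsage) {
      total += usage.getOrDefault(taskType, 0);
    }
    return total;
  }

  /** Returns true if adding a task of this type keeps the cluster-wide usage within the limit. */
  static boolean canRun(
      List<Map<String, Integer>> perWorkerUsage,
      String taskType,
      int requiredCapacity,
      int limit
  )
  {
    return slotsUsedByType(perWorkerUsage, taskType) + requiredCapacity <= limit;
  }

  public static void main(String[] args)
  {
    // Two workers: 2 + 1 = 3 controller slots are already in use.
    List<Map<String, Integer>> usage = List.of(
        Map.of("query_controller", 2),
        Map.of("query_controller", 1, "index_parallel", 4)
    );
    System.out.println(canRun(usage, "query_controller", 1, 5)); // 3 + 1 <= 5
    System.out.println(canRun(usage, "query_controller", 1, 3)); // 3 + 1 > 3
  }
}
```

A task that fails such a check would simply not be assigned a worker on that pass and would stay pending, although how the actual change handles that case is not visible in this diff.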
Original file line number Diff line number Diff line change
@@ -29,9 +29,10 @@ public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWo
{
@JsonCreator
public FillCapacityWithAffinityWorkerSelectStrategy(
-@JsonProperty("affinityConfig") AffinityConfig affinityConfig
+@JsonProperty("affinityConfig") AffinityConfig affinityConfig,
+@JsonProperty("taskLimits") TaskLimits taskLimits
)
{
-super(affinityConfig);
+super(affinityConfig, taskLimits);
}
}