Global limit for MSQ controller tasks implemented #16889
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java
indexing-service/src/test/java/org/apache/druid/indexing/overlord/SimpleTaskRunner.java
Remove useless tail
Co-authored-by: Benedict Jin <asdf2014@apache.org>
Done, thanks
> Currently, it could cause a significant slowdown in processing and may lead to temporary deadlock situations.
@nozjkoitop, rather than proceeding with a limit, I think we should first figure out what is causing the slowdown in processing and/or the deadlock. Can you elaborate on this?
Once we have analyzed exactly what goes wrong when there are too many controller tasks and have decided to impose a limit, it should not be done through the TaskQueue as done in this PR. Instead, it should be similar to the implementation of parallelIndexTaskSlotRatio, and the config should most likely live in WorkerTaskRunnerConfig.
Lines 37 to 50 in 73ff9f9
  /**
   * The number of task slots that a parallel indexing task can take is restricted using this config as a multiplier
   *
   * A value of 1 means no restriction on the number of slots ParallelIndexSupervisorTasks can occupy (default behaviour)
   * A value of 0 means ParallelIndexSupervisorTasks can occupy no slots.
   * Deadlocks can occur if the all task slots are occupied by ParallelIndexSupervisorTasks,
   * as no subtask would ever get a slot. Set this config to a value < 1 to prevent deadlocks.
   *
   * @return ratio of task slots available to a parallel indexing task at a worker level
   */
  public double getParallelIndexTaskSlotRatio()
  {
    return parallelIndexTaskSlotRatio;
  }
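To make the ratio semantics described in the Javadoc above concrete, here is a minimal sketch of how a slot ratio could translate into a per-worker cap. The class `SlotRatioSketch` and the method `slotsForRatio` are hypothetical names used only for illustration, and the floor-based rounding is an assumption, not a statement of how Druid computes the value.

```java
// Hypothetical illustration; not taken from the Druid code base.
final class SlotRatioSketch
{
  /**
   * Returns how many of a worker's slots one task type may occupy.
   * Assumption: the cap is floor(ratio * capacity); Druid's actual rounding may differ.
   */
  static int slotsForRatio(double ratio, int workerCapacity)
  {
    // Written as a negated range check so that NaN is rejected as well.
    if (!(ratio >= 0 && ratio <= 1)) {
      throw new IllegalArgumentException("ratio must be in [0, 1], got " + ratio);
    }
    return (int) Math.floor(ratio * workerCapacity);
  }

  public static void main(String[] args)
  {
    int capacity = 4;
    // Ratio 1: supervisor tasks may fill every slot, so subtasks can starve.
    System.out.println(slotsForRatio(1.0, capacity)); // prints 4
    // Ratio 0.5: half of the slots always remain available for subtasks.
    System.out.println(slotsForRatio(0.5, capacity)); // prints 2
  }
}
```

With a ratio below 1 on a worker with more than one slot, at least one slot stays free for subtasks under this rounding, which is the deadlock-avoidance argument the Javadoc makes.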
I think this is a nice feature to have. Left some comments.
Thanks @nozjkoitop for taking this up.
@@ -440,7 +442,7 @@ private void manageInternalCritical(
           notifyStatus(task, taskStatus, taskStatus.getErrorMsg());
           continue;
         }
-        if (taskIsReady) {
+        if (taskIsReady && !isControllerTaskLimitReached(task.getType(), true)) {
Should it be limited to MSQ controllers only, or apply to other job types as well? Maybe take a JSON map as input, where the key is the task type and the value is the limit (an integer or a float).
Also, what is the user-facing behavior if a task is pending launch because of the limit?
- Can we communicate to the user why the task is not launching?
- If the cluster is totally starved, does the controller eventually time out and get removed from the queue?
Thanks @kfaraz, @cryptoe for your comments
We need to ensure that we do not remove any of the existing properties or config fields.
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
public class WorkerTaskRunnerConfig
{
  @JsonProperty
  private String minWorkerVersion = "0";

  @JsonProperty
  private double parallelIndexTaskSlotRatio = 1;
  @JsonDeserialize(using = CustomJobTypeLimitsDeserializer.class)
  private Map<String, Number> customJobTypeLimits = new HashMap<>();
Why do we need a custom deserializer?
Can't we just have a map from String to Double?
It was added mostly for validation. The idea is to accept both double and integer values, so that a user can specify either a ratio or an absolute limit.
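As a rough sketch of the "ratio or absolute limit" idea, the snippet below resolves a `Number` to a slot count: an integer value is treated as an absolute cap, and any other numeric value as a ratio in [0, 1]. The class `TaskSlotLimitSketch` and its methods are hypothetical and are not the PR's actual `canRunTask` or deserializer code; it also assumes every task occupies exactly one slot.

```java
import java.util.Map;

// Hypothetical illustration of Number-typed limits; not the PR's implementation.
final class TaskSlotLimitSketch
{
  /** Converts a configured limit into a number of slots on one worker. */
  static int resolveLimit(Number limit, int workerCapacity)
  {
    if (limit instanceof Integer || limit instanceof Long) {
      // Integer values are treated as an absolute cap (assumption).
      long absolute = limit.longValue();
      if (absolute < 0) {
        throw new IllegalArgumentException("absolute limit must be >= 0, got " + absolute);
      }
      return (int) Math.min(absolute, workerCapacity);
    }
    // Any other numeric type is treated as a ratio of the worker's capacity.
    double ratio = limit.doubleValue();
    if (!(ratio >= 0 && ratio <= 1)) {
      throw new IllegalArgumentException("ratio must be in [0, 1], got " + ratio);
    }
    return (int) Math.floor(ratio * workerCapacity);
  }

  /** A task type without a configured limit is only bound by overall capacity (not modeled here). */
  static boolean canRun(String taskType, int usedByType, Map<String, ? extends Number> limits, int workerCapacity)
  {
    Number limit = limits.get(taskType);
    return limit == null || usedByType < resolveLimit(limit, workerCapacity);
  }

  public static void main(String[] args)
  {
    Map<String, Number> limits = Map.of("compact", 2, "query_controller", 0.5);
    System.out.println(canRun("compact", 2, limits, 10));          // prints false
    System.out.println(canRun("query_controller", 4, limits, 10)); // prints true
  }
}
```

One consequence of this design is that the JSON values `1` and `1.0` mean different things (one slot versus all slots), which is the kind of ambiguity a validating deserializer would need to handle explicitly.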
@@ -57,6 +63,7 @@ public ImmutableWorkerInfo(
      @JsonProperty("worker") Worker worker,
      @JsonProperty("currCapacityUsed") int currCapacityUsed,
      @JsonProperty("currParallelIndexCapacityUsed") int currParallelIndexCapacityUsed,
      @JsonProperty("currTypeSpecificCapacityUsed") Map<String, Integer> typeSpecificCapacityMap,
This should be nullable, no?
You're right, thanks
@@ -225,6 +253,89 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
    return workerParallelIndexCapacity;
  }

  public boolean canRunTask(Task task, Map<String, Number> taskLimits)
Can you please add Javadocs for this method?
Done
@@ -167,6 +167,29 @@ private int getCurrParallelIndexCapacityUsed(Map<String, TaskAnnouncement> tasks
    return currParallelIndexCapacityUsed;
  }

  @JsonProperty("currTypeSpecificCapacityUsed")
  public Map<String, Integer> getCurrTypeSpecificCapacityUsed()
I thought we had deprecated Zk based runner in favour of http.
@kfaraz Does this change still make sense ?
Yes, ZK-based task runner is deprecated and we should not support the new feature with ZK.
docs/configuration/index.md
@@ -1135,6 +1135,7 @@ The following configs only apply if the Overlord is running in remote mode. For
|`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task has been assigned to a Middle Manager before throwing an error.|`PT5M`|
|`druid.indexer.runner.minWorkerVersion`|The minimum Middle Manager version to send tasks to. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.worker.version`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons. |"0"|
| `druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range `[0, 1]`. |1|
|`druid.indexer.runner.taskSlotLimits`| A map where each key is a task type, and the corresponding value represents the limit on the number of task slots that a task of that type can occupy on a worker. The key is a `String` that specifies the task type. The value can either be a Double or Integer. A `Double` in the range [0, 1], representing a ratio of the available task slots that tasks of this type can occupy. An `Integer` that is greater than or equal to 0, representing an absolute limit on the number of task slots that tasks of this type can occupy.|Empty map|
Could you please provide an example as well ?
How does this interact with compaction slots ?
Example added! Good catch on the compaction slots. I'll need to test that, but based on the code, it looks like compaction slot availability will be checked twice (in the duty and during worker selection) if a related entry is included in the taskSlotLimits map. If there's a conflict, some tasks might end up in a pending state for a while.
Confirmed that the number of submitted tasks is linked to the compaction task slot limits. However, execution might be delayed if the custom limit is smaller than the one set for compaction.
For example, here taskSlotsMax = 3, but in the overlord configuration, I have druid.indexer.runner.taskSlotLimits={"compact": 2}.
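The arithmetic behind that example can be sketched as follows. This is only an illustration under the assumption that each compaction task occupies exactly one slot; `CompactionLimitSketch` and its methods are hypothetical names, not Druid code.

```java
// Hypothetical illustration of how a smaller per-type limit delays execution.
final class CompactionLimitSketch
{
  /** Tasks of the type that can run at once, given how many were submitted. */
  static int runningNow(int submitted, int typeLimit)
  {
    return Math.min(submitted, typeLimit);
  }

  /** Tasks of the type that stay pending until a slot of that type frees up. */
  static int pending(int submitted, int typeLimit)
  {
    return submitted - runningNow(submitted, typeLimit);
  }

  public static void main(String[] args)
  {
    int taskSlotsMax = 3; // compaction limit; the duty submits up to this many tasks
    int compactLimit = 2; // value of the "compact" entry in taskSlotLimits
    System.out.println(runningNow(taskSlotsMax, compactLimit)); // prints 2
    System.out.println(pending(taskSlotsMax, compactLimit));    // prints 1
  }
}
```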
I think the limit on compaction tasks (or kill tasks for that matter) should not be a concern.
This is a runtime property, typically controlled by an admin.
So, if an admin wants to restrict the number of concurrent compaction tasks, it is fair to honor that irrespective of the value of compactionTaskSlotRatio or maxCompactionTaskSlots set in the coordinator dynamic configs.
We just need to call it out clearly in the release notes and the docs of the new property.
You can try this link https://druid.apache.org/community/join-slack from the Druid docs.
…q-controller-task-limit
…q-controller-task-limit
Hey @cryptoe, what do you think about this solution? I've used SelectWorkerStrategies to utilize dynamic configuration of global limits, and it seems like a good option to me.
Apologies, I have been meaning to get to this PR. Will try to finish it by EOW.
This pull request has been marked as stale due to 60 days of inactivity.
Hi @cryptoe, will you have some time to review these changes?
@nozjkoitop Could you please rebase this PR? Will take a look again. Overall LGTM
…q-controller-task-limit
@cryptoe Done
Thanks for your work on this, @nozjkoitop!
indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/LimiterUtils.java
indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/LimiterUtils.java
@@ -225,6 +244,13 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
    return workerParallelIndexCapacity;
  }

  public Map<String, Integer> incrementTypeSpecificCapacity(String type, int capacityToAdd)
Should there be a corresponding decrement ?
@@ -57,6 +61,7 @@ public ImmutableWorkerInfo(
      @JsonProperty("worker") Worker worker,
      @JsonProperty("currCapacityUsed") int currCapacityUsed,
      @JsonProperty("currParallelIndexCapacityUsed") int currParallelIndexCapacityUsed,
      @JsonProperty("currCapacityUsedByTaskType") Map<String, Integer> currCapacityUsedByTaskType,
Or should this be immutable?
It is immutable. incrementTypeSpecificCapacity doesn't change the fields. This typeSpecificCapacity is managed exactly like parallelIndexCapacityUsed, which is passed as immutableWorker.getCurrParallelIndexCapacityUsed() + parallelIndexTaskCapacity in the ImmutableWorkerInfo constructor arguments, except that here the incremented value is created in ImmutableWorkerInfo itself. With regard to a decrement, I don't see one for currParallelIndexCapacityUsed or currCapacityUsed either; if I'm not mistaken, it's managed by the provisioning strategy.
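The pattern described here, an "increment" that leaves the object untouched and returns a new map for the next immutable instance, might look roughly like the sketch below. `CapacityByTypeSketch` is a hypothetical stand-in for the relevant part of ImmutableWorkerInfo, not the PR's code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the per-type capacity bookkeeping discussed above.
final class CapacityByTypeSketch
{
  private final Map<String, Integer> usedByType;

  CapacityByTypeSketch(Map<String, Integer> usedByType)
  {
    // Defensive copy so later changes to the caller's map cannot leak in.
    this.usedByType = Collections.unmodifiableMap(new HashMap<>(usedByType));
  }

  Map<String, Integer> getUsedByType()
  {
    return usedByType;
  }

  /** Returns a new map with the increment applied; this object's state is unchanged. */
  Map<String, Integer> incrementTypeSpecificCapacity(String type, int capacityToAdd)
  {
    Map<String, Integer> result = new HashMap<>(usedByType);
    result.merge(type, capacityToAdd, Integer::sum);
    return result;
  }

  public static void main(String[] args)
  {
    CapacityByTypeSketch before = new CapacityByTypeSketch(Collections.singletonMap("compact", 1));
    CapacityByTypeSketch after = new CapacityByTypeSketch(before.incrementTypeSpecificCapacity("compact", 1));
    System.out.println(before.getUsedByType().get("compact")); // prints 1
    System.out.println(after.getUsedByType().get("compact"));  // prints 2
  }
}
```

Because every assignment builds a fresh instance from the latest task announcements, no explicit decrement is needed in this style; a finished task simply stops being counted when the next snapshot is built.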
Thanks for the changes.
Left some suggestions.
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
...che/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategy.java
...ain/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java
...g/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategy.java
...g/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategy.java
indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
...che/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategyTest.java
...java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java
indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/TaskLimits.java
Thanks for the changes, @nozjkoitop !
Improvements
Implemented a way to limit the number of query controller tasks running at the same time. This limit specifies what percentage or number of task slots can be allocated to query controllers. If the limit is reached, tasks wait for resources instead of potentially blocking the execution of other tasks (and failing after a timeout).
Rationale
There is no mechanism in Druid to prevent the cluster from being overloaded with controller tasks. Currently, it could cause a significant slowdown in processing and may lead to temporary deadlock situations.
Introduced new configuration options
- druid.indexer.queue.controllerTaskSlotRatio: optional value which defines the proportion of available task slots that can be allocated to MSQ controller tasks. This is a floating-point value between 0 and 1. Defaults to null.
- druid.indexer.queue.maxControllerTaskSlots: optional value which specifies the maximum number of task slots that can be allocated to MSQ controller tasks. This is an integer value that provides a hard limit on the number of task slots available for MSQ controller tasks. Defaults to null.
This PR has: