Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Global limit for MSQ controller tasks implemented #16889

Merged

Conversation

nozjkoitop
Copy link
Contributor

@nozjkoitop nozjkoitop commented Aug 13, 2024

Improvements

Implemented a way to limit on the number of query controller tasks running at the same time. This limit specifies what percentage or amount of task slots can be allocated to query controllers. If the limit is reached, the tasks would wait for resources instead of potentially blocking the execution of other tasks (and failing after a timeout).

Rationale

There is no mechanism in Druid to prevent the cluster from being overloaded with controller tasks. Currently, it could cause a significant slowdown in processing and may lead to temporary deadlock situations.

Introduced new configuration options

  • druid.indexer.queue.controllerTaskSlotRatio - optional value which defines the proportion of available task slots that can be allocated to msq controller tasks. This is a floating-point value between 0 and 1. Defaults to null.
  • druid.indexer.queue.maxControllerTaskSlots - optional value which specifies the maximum number of task slots that can be allocated to controller tasks. This is an integer value that provides a hard limit on the number of task slots available for msq controller tasks. Defaults to null.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Copy link
Member

@asdf2014 asdf2014 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove useless tail

Co-authored-by: Benedict Jin <asdf2014@apache.org>
@nozjkoitop
Copy link
Contributor Author

Remove useless tail

Done, thanks

@kfaraz kfaraz requested a review from cryptoe August 16, 2024 02:50
Copy link
Contributor

@kfaraz kfaraz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, it could cause a significant slowdown in processing and may lead to temporary deadlock situations.

@nozjkoitop , rather than proceeding with a limit, I think we should try to figure out what is causing the slowdown in processing and/or the deadlock. Can you elaborate on this?

Once we have done an analysis of exactly what goes wrong when we have too many controller tasks and we have decided to impose a limit, it should not be done through the TaskQueue as done in this PR. Instead, it should be similar to the implementation of parallelIndexTaskSlotRatio and the config should most likely live in WorkerTaskRunnerConfig.

/**
* The number of task slots that a parallel indexing task can take is restricted using this config as a multiplier
*
* A value of 1 means no restriction on the number of slots ParallelIndexSupervisorTasks can occupy (default behaviour)
* A value of 0 means ParallelIndexSupervisorTasks can occupy no slots.
* Deadlocks can occur if the all task slots are occupied by ParallelIndexSupervisorTasks,
* as no subtask would ever get a slot. Set this config to a value < 1 to prevent deadlocks.
*
* @return ratio of task slots available to a parallel indexing task at a worker level
*/
public double getParallelIndexTaskSlotRatio()
{
return parallelIndexTaskSlotRatio;
}

Copy link
Contributor

@cryptoe cryptoe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a nice feature to have. Left some comments.
Thanks @nozjkoitop for taking this up.

@@ -440,7 +442,7 @@ private void manageInternalCritical(
notifyStatus(task, taskStatus, taskStatus.getErrorMsg());
continue;
}
if (taskIsReady) {
if (taskIsReady && !isControllerTaskLimitReached(task.getType(), true)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should It just be limited to MSQ controllers or other job types as well. Maybe Take a json as a input ? where key is the taskType and the value is the limit/float.

Also what is the user behavior if the task is pending for launch due to limit.

  • Can we communicate why the task is not launching to the user.
  • If the cluster is totally starved, does the controller get timed out eventually and removed from queue?

@nozjkoitop
Copy link
Contributor Author

Thanks @kfaraz, @cryptoe for your comments
The most trivial deadlock scenario occurs when we queue a group of controller tasks but don't have available task slots for actual workers. This results in tasks hanging and eventually timing out. Thanks for highlighting the WorkerTaskRunnerConfig, it seems like the great place for this configuration. I've updated the behavior and merged it with parallelIndexTaskSlotRatio, now it's more flexible, also I've added the logging to inform the user why the task is Pending

Copy link
Contributor

@kfaraz kfaraz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to ensure that we do not remove any of the existing properties or config fields.


public class WorkerTaskRunnerConfig
{
@JsonProperty
private String minWorkerVersion = "0";

@JsonProperty
private double parallelIndexTaskSlotRatio = 1;
@JsonDeserialize(using = CustomJobTypeLimitsDeserializer.class)
private Map<String, Number> customJobTypeLimits = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a custom deserializer?
Can't we just have a map from String to Double?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was added mostly for validation as the idea is to have double / integer values to have an option to specify not only the ratio but also a limit

@nozjkoitop nozjkoitop requested a review from kfaraz September 24, 2024 07:23
@@ -57,6 +63,7 @@ public ImmutableWorkerInfo(
@JsonProperty("worker") Worker worker,
@JsonProperty("currCapacityUsed") int currCapacityUsed,
@JsonProperty("currParallelIndexCapacityUsed") int currParallelIndexCapacityUsed,
@JsonProperty("currTypeSpecificCapacityUsed") Map<String, Integer> typeSpecificCapacityMap,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be nullable no ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, thanks

@@ -225,6 +253,89 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
return workerParallelIndexCapacity;
}

public boolean canRunTask(Task task, Map<String, Number> taskLimits)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add java docs for this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -167,6 +167,29 @@ private int getCurrParallelIndexCapacityUsed(Map<String, TaskAnnouncement> tasks
return currParallelIndexCapacityUsed;
}

@JsonProperty("currTypeSpecificCapacityUsed")
public Map<String, Integer> getCurrTypeSpecificCapacityUsed()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we had deprecated Zk based runner in favour of http.
@kfaraz Does this change still make sense ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, ZK-based task runner is deprecated and we should not support the new feature with ZK.

@@ -1135,6 +1135,7 @@ The following configs only apply if the Overlord is running in remote mode. For
|`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task has been assigned to a Middle Manager before throwing an error.|`PT5M`|
|`druid.indexer.runner.minWorkerVersion`|The minimum Middle Manager version to send tasks to. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.worker.version`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons. |"0"|
| `druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range `[0, 1]`. |1|
|`druid.indexer.runner.taskSlotLimits`| A map where each key is a task type, and the corresponding value represents the limit on the number of task slots that a task of that type can occupy on a worker. The key is a `String` that specifies the task type. The value can either be a Double or Integer. A `Double` in the range [0, 1], representing a ratio of the available task slots that tasks of this type can occupy. An `Integer` that is greater than or equal to 0, representing an absolute limit on the number of task slots that tasks of this type can occupy.|Empty map|
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please provide an example as well ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this interact with compaction slots ?

Copy link
Contributor Author

@nozjkoitop nozjkoitop Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Example added! Good catch on the compaction slots. I'll need to test that, but based on the code, it looks like compaction slots availability will be checked twice (in the duty, and during the worker selection) if related entry will be included in the taskSlotLimits map. If there's a conflict, some tasks might end up in a pending state for a while.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed that the number of submitted tasks is linked to the compaction task slot limits. However, execution might be delayed if the custom limit is smaller than the one set for compaction.
image
For example here, taskSlotsMax = 3, but in the overlord configuration, I have druid.indexer.runner.taskSlotLimits={"compact": 2}.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the limit on compaction tasks (or kill tasks for that matter) should not be a concern.
This is a runtime property, typically controlled by an admin.
So, if an admin wants to restrict the number of concurrent compaction tasks, it is fair to honor that irrespective of the value of compactionTaskSlotRatio or maxCompactionTaskSlots set in the coordinator dynamic configs.

We just need to call it out clearly in the release notes and the docs of the new property.

@nozjkoitop nozjkoitop requested a review from cryptoe September 24, 2024 14:02
@kfaraz
Copy link
Contributor

kfaraz commented Oct 2, 2024

I'm afraid I don't have access to this Slack workspace

You can try this link https://druid.apache.org/community/join-slack from the Druid docs.

@nozjkoitop
Copy link
Contributor Author

Hey @cryptoe, what do you think about this solution? I've used SelectWorkerStrategies to utilize dynamic configuration of global limits, and it seems like a good option to me.

@nozjkoitop nozjkoitop requested a review from cryptoe October 10, 2024 10:38
@cryptoe
Copy link
Contributor

cryptoe commented Nov 21, 2024

Apologies, I have been meaning to get to this PR. Will try to finish it by EOW.

Copy link

This pull request has been marked as stale due to 60 days of inactivity.
It will be closed in 4 weeks if no further activity occurs. If you think
that's incorrect or this pull request should instead be reviewed, please simply
write any comment. Even if closed, you can still revive the PR at any time or
discuss it on the dev@druid.apache.org list.
Thank you for your contributions.

@github-actions github-actions bot added the stale label Jan 21, 2025
@nozjkoitop
Copy link
Contributor Author

Hi @cryptoe will you have some time to review this changes?

@github-actions github-actions bot removed the stale label Jan 22, 2025
@cryptoe
Copy link
Contributor

cryptoe commented Mar 4, 2025

@nozjkoitop Could you please rebase this PR. Will take a look again. Overall LGTM
cc @kfaraz

@nozjkoitop
Copy link
Contributor Author

@cryptoe Done

@kfaraz
Copy link
Contributor

kfaraz commented Mar 5, 2025

Thanks for your work on this, @nozjkoitop !
I will try to finish the review of this PR later this week.

@@ -225,6 +244,13 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
return workerParallelIndexCapacity;
}

public Map<String, Integer> incrementTypeSpecificCapacity(String type, int capacityToAdd)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be a corresponding decrement ?

@@ -57,6 +61,7 @@ public ImmutableWorkerInfo(
@JsonProperty("worker") Worker worker,
@JsonProperty("currCapacityUsed") int currCapacityUsed,
@JsonProperty("currParallelIndexCapacityUsed") int currParallelIndexCapacityUsed,
@JsonProperty("currCapacityUsedByTaskType") Map<String, Integer> currCapacityUsedByTaskType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or this should be immutable no ?

Copy link
Contributor Author

@nozjkoitop nozjkoitop Mar 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is immutable. incrementTypeSpecificCapacity doesn't change the fields. This typeSpecificCapacity is managed exactly like parallelIndexCapacityUsed, which is immutableWorker.getCurrParallelIndexCapacityUsed() + parallelIndexTaskCapacity in ImmutableWorkerInfo constructor arguments, but here the incremented value is created in ImmutableWorkerInfo itself. Wrt decrement, I dont see any for currParallelIndexCapacityUsed and currCapacityUsed either, if I'm not mistaken it's managed by Provisioning Strategy

@nozjkoitop nozjkoitop requested a review from cryptoe March 5, 2025 14:55
@cryptoe
Copy link
Contributor

cryptoe commented Mar 6, 2025

Thanks for the changes.
Lets wait for @kfaraz review and then we can get this merged.

Copy link
Contributor

@kfaraz kfaraz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some suggestions.

Copy link
Contributor

@kfaraz kfaraz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes, @nozjkoitop !

@kfaraz kfaraz merged commit 8b56824 into apache:master Mar 6, 2025
75 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants