Kp/qsb cancellation #15133

Closed
51 commits
a1508ff
initial code for the sandbox resource tracking and cancellation frame…
kiranprakash154 May 30, 2024
3bf611e
Fix Failing Tests
kiranprakash154 May 30, 2024
71f6088
spotless Apply
kiranprakash154 May 30, 2024
71a61ec
Update SandboxService.java
kiranprakash154 May 30, 2024
60fb018
Update SandboxService.java
kiranprakash154 May 30, 2024
b39aa89
Update SandboxTask.java
kiranprakash154 May 30, 2024
827b3cf
Add java docs
kiranprakash154 May 30, 2024
e7a04aa
spotless
kiranprakash154 May 30, 2024
6677050
javadocs
kiranprakash154 May 30, 2024
ee15703
javadocs
kiranprakash154 May 30, 2024
15d6236
java docs
kiranprakash154 May 30, 2024
0690411
Update AbstractTaskCancellation.java
kiranprakash154 May 30, 2024
4251851
Update SandboxModule.java
kiranprakash154 May 30, 2024
a38dda9
Some tests and stubs
kiranprakash154 Jun 4, 2024
a5cf2c9
spotless
kiranprakash154 Jun 4, 2024
a76e963
:server:testingConventions
kiranprakash154 Jun 4, 2024
3eabc99
Update AbstractTaskCancellation.java
kiranprakash154 Jun 5, 2024
1565b11
more tests
kiranprakash154 Jun 5, 2024
d2e3f0a
addressing comments
kiranprakash154 Jul 3, 2024
3b31926
revert some accidentally pushed files
kiranprakash154 Jul 3, 2024
e250d7d
resolve flakiness
kiranprakash154 Jul 15, 2024
0ae7c85
renaming sandbox to querygroup and adjusting code based on merged PRs
kiranprakash154 Jul 18, 2024
c4906ad
jvm to memory
kiranprakash154 Jul 19, 2024
3e50f7a
missing java docs
kiranprakash154 Jul 19, 2024
9b2f270
spotless
kiranprakash154 Jul 19, 2024
d9e0858
Update CHANGELOG.md
kiranprakash154 Jul 19, 2024
1a652b2
pluck cancellation changes out of this PR
kiranprakash154 Jul 19, 2024
403605c
remove unused
kiranprakash154 Jul 19, 2024
6d0ee4f
remove cancellation related code and add more tests coverage
kiranprakash154 Jul 22, 2024
39c33b1
us only memory and not jvm
kiranprakash154 Jul 22, 2024
bc5e0aa
test conventions
kiranprakash154 Jul 22, 2024
b5d6fc8
Bring back enum
kiranprakash154 Jul 29, 2024
e2922f4
Update SearchBackpressureService.java
kiranprakash154 Jul 29, 2024
029593c
revert changes
kiranprakash154 Jul 29, 2024
b8ae0ba
revert changes
kiranprakash154 Jul 29, 2024
4b00edf
all required changes
kiranprakash154 Jul 29, 2024
28c160f
Update CHANGELOG.md
kiranprakash154 Jul 29, 2024
42787a5
cleanups
kiranprakash154 Jul 29, 2024
ceca794
Delete QueryGroupService.java
kiranprakash154 Jul 29, 2024
c18722d
cleanups
kiranprakash154 Jul 29, 2024
b95d97b
Update QueryGroupLevelResourceUsageViewTests.java
kiranprakash154 Jul 29, 2024
2e0597d
Update QueryGroupLevelResourceUsageViewTests.java
kiranprakash154 Jul 29, 2024
0c3351f
Update QueryGroupResourceUsageTrackerService.java
kiranprakash154 Jul 29, 2024
d7d2f98
Update QueryGroupResourceUsageTrackerService.java
kiranprakash154 Jul 30, 2024
ab87fb5
Update QueryGroupResourceUsageTrackerService.java
kiranprakash154 Jul 31, 2024
56a8b1c
Update CHANGELOG.md
kiranprakash154 Jul 31, 2024
a9c52c9
rebasing with latest main
kiranprakash154 Jul 31, 2024
f957a3c
remove experimental
kiranprakash154 Jul 31, 2024
7cff5af
remove queryGroupId
kiranprakash154 Jul 31, 2024
1c5777c
Update QueryGroupResourceUsageTrackerService.java
kiranprakash154 Jul 31, 2024
c165e59
querygroup cancellation
kiranprakash154 Aug 6, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs (([#14972](https://github.com/opensearch-project/OpenSearch/pull/14972))
- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))
- [Workload Management] Add queryGroupId to Task ([14708](https://github.com/opensearch-project/OpenSearch/pull/14708))
- Add setting to ignore throttling nodes for allocation of unassigned primaries in remote restore ([#14991](https://github.com/opensearch-project/OpenSearch/pull/14991))
- Add basic aggregation support for derived fields ([#14618](https://github.com/opensearch-project/OpenSearch/pull/14618))
@@ -1391,7 +1391,7 @@ public Builder put(final QueryGroup queryGroup) {
return queryGroups(existing);
}

-    private Map<String, QueryGroup> getQueryGroups() {
+    public Map<String, QueryGroup> getQueryGroups() {
return Optional.ofNullable(this.customs.get(QueryGroupMetadata.TYPE))
.map(o -> (QueryGroupMetadata) o)
.map(QueryGroupMetadata::queryGroups)
42 changes: 42 additions & 0 deletions server/src/main/java/org/opensearch/wlm/QueryGroupHelper.java
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm;

import org.opensearch.search.ResourceType;
import org.opensearch.tasks.Task;

import java.util.Map;
import java.util.function.Function;

/**
* Helper class for calculating resource usage for different resource types.
*/
public class QueryGroupHelper {

    /**
     * A map that associates each {@link ResourceType} with a function that calculates the resource usage for a given {@link Task}.
     */
    private static final Map<ResourceType, Function<Task, Long>> resourceUsageCalculator = Map.of(
        ResourceType.MEMORY,
        (task) -> task.getTotalResourceStats().getMemoryInBytes(),
        ResourceType.CPU,
        (task) -> task.getTotalResourceStats().getCpuTimeInNanos()
    );

    /**
     * Gets the resource usage for a given resource type and task.
     *
     * @param resource the resource type
     * @param task the task for which to calculate resource usage
     * @return the resource usage
     */
    public static long getResourceUsage(ResourceType resource, Task task) {
        return resourceUsageCalculator.get(resource).apply(task);
    }
}
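The lookup in `getResourceUsage` assumes that every `ResourceType` has an entry in the calculator map; a type without one would surface as a `NullPointerException`. The following is a minimal sketch of the same dispatch-table pattern with an explicit failure for unregistered types. It is not the PR's code: `ResourceUsageSketch`, `Resource`, and `Usage` are hypothetical stand-ins for the OpenSearch classes so that the example compiles on its own.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.ToLongFunction;

public class ResourceUsageSketch {
    // Hypothetical stand-in for org.opensearch.search.ResourceType.
    enum Resource { MEMORY, CPU, DISK }

    // Hypothetical stand-in for the per-task stats read by the calculators.
    static final class Usage {
        final long memoryInBytes;
        final long cpuTimeInNanos;

        Usage(long memoryInBytes, long cpuTimeInNanos) {
            this.memoryInBytes = memoryInBytes;
            this.cpuTimeInNanos = cpuTimeInNanos;
        }
    }

    // One calculator per resource; DISK is deliberately left unregistered.
    private static final Map<Resource, ToLongFunction<Usage>> CALCULATORS = new EnumMap<>(Resource.class);

    static {
        CALCULATORS.put(Resource.MEMORY, u -> u.memoryInBytes);
        CALCULATORS.put(Resource.CPU, u -> u.cpuTimeInNanos);
    }

    // Looks up the calculator and fails with a descriptive message if none is registered.
    static long getResourceUsage(Resource resource, Usage usage) {
        ToLongFunction<Usage> calculator = CALCULATORS.get(resource);
        if (calculator == null) {
            throw new IllegalArgumentException("No usage calculator registered for " + resource);
        }
        return calculator.applyAsLong(usage);
    }

    public static void main(String[] args) {
        Usage usage = new Usage(2048L, 1_000_000L);
        System.out.println(getResourceUsage(Resource.MEMORY, usage)); // prints 2048
        System.out.println(getResourceUsage(Resource.CPU, usage));    // prints 1000000
    }
}
```

Using `ToLongFunction` instead of `Function<Task, Long>` also avoids boxing each result, which may matter if the calculation runs for every active task on each tracking pass.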
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm;

import org.opensearch.search.ResourceType;
import org.opensearch.tasks.Task;

import java.util.List;
import java.util.Map;

/**
* Represents the point in time view of resource usage of a QueryGroup and
* has a 1:1 relation with a QueryGroup.
* This class holds the resource usage data and the list of active tasks.
*/
public class QueryGroupLevelResourceUsageView {
    // resourceUsage holds the resource usage data for a QueryGroup at a point in time
    private final Map<ResourceType, Long> resourceUsage;
    // activeTasks holds the list of active tasks for a QueryGroup at a point in time
    private final List<Task> activeTasks;

    public QueryGroupLevelResourceUsageView(Map<ResourceType, Long> resourceUsage, List<Task> activeTasks) {
        this.resourceUsage = resourceUsage;
        this.activeTasks = activeTasks;
    }

    /**
     * Returns the resource usage data.
     *
     * @return The map of resource usage data
     */
    public Map<ResourceType, Long> getResourceUsageData() {
        return resourceUsage;
    }

    /**
     * Returns the list of active tasks.
     *
     * @return The list of active tasks
     */
    public List<Task> getActiveTasks() {
        return activeTasks;
    }
}
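The class is documented as a point-in-time view, but the constructor stores the caller's map and list by reference, so later changes to those collections would show through the view. One possible way to make the snapshot hold is to copy on construction. The sketch below illustrates that idea only; `UsageSnapshotSketch` is a hypothetical class that uses `String` keys and task IDs in place of `ResourceType` and `Task`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UsageSnapshotSketch {
    private final Map<String, Long> resourceUsage;
    private final List<String> activeTaskIds;

    UsageSnapshotSketch(Map<String, Long> resourceUsage, List<String> activeTaskIds) {
        // Map.copyOf and List.copyOf (Java 10+) return unmodifiable copies,
        // so the snapshot is decoupled from the caller's collections.
        this.resourceUsage = Map.copyOf(resourceUsage);
        this.activeTaskIds = List.copyOf(activeTaskIds);
    }

    Map<String, Long> getResourceUsageData() {
        return resourceUsage;
    }

    List<String> getActiveTaskIds() {
        return activeTaskIds;
    }

    public static void main(String[] args) {
        Map<String, Long> usage = new HashMap<>();
        usage.put("MEMORY", 100L);
        List<String> tasks = new ArrayList<>(List.of("task-1"));

        UsageSnapshotSketch snapshot = new UsageSnapshotSketch(usage, tasks);

        // Mutating the source collections afterwards does not affect the snapshot.
        usage.put("MEMORY", 999L);
        tasks.add("task-2");

        System.out.println(snapshot.getResourceUsageData().get("MEMORY")); // prints 100
        System.out.println(snapshot.getActiveTaskIds().size());            // prints 1
    }
}
```

Note that `Map.copyOf` and `List.copyOf` reject null keys, values, and elements, so this approach only fits if the tracker never passes nulls.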
@@ -0,0 +1,82 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm.cancellation;

import org.opensearch.search.ResourceType;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancellation;
import org.opensearch.wlm.QueryGroupHelper;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/**
* Represents an abstract task selection strategy.
* This class implements the TaskSelectionStrategy interface and provides a method to select tasks for cancellation based on a sorting condition.
* The specific sorting condition depends on the implementation.
*/
public abstract class AbstractTaskSelectionStrategy implements TaskSelectionStrategy {

    /**
     * Returns a comparator that defines the sorting condition for tasks.
     * The specific sorting condition depends on the implementation.
     *
     * @return The comparator
     */
    public abstract Comparator<Task> sortingCondition();

    /**
     * Selects tasks for cancellation based on the provided limit and resource type.
     * The tasks are sorted based on the sorting condition and then selected until the accumulated resource usage reaches the limit.
     *
     * @param tasks The list of tasks from which to select
     * @param limit The limit on the accumulated resource usage
     * @param resourceType The type of resource to consider
     * @return The list of selected tasks
     * @throws IllegalArgumentException If the limit is less than zero
     */
    @Override
    public List<TaskCancellation> selectTasksForCancellation(List<Task> tasks, long limit, ResourceType resourceType) {
        if (limit < 0) {
            throw new IllegalArgumentException("limit must not be negative");
        }
        if (limit == 0) {
            return Collections.emptyList();
        }

        List<Task> sortedTasks = tasks.stream().sorted(sortingCondition()).collect(Collectors.toList());

        List<TaskCancellation> selectedTasks = new ArrayList<>();
        long accumulated = 0;

        // Only cancellable tasks are selected and counted towards the limit.
        for (Task task : sortedTasks) {
            if (task instanceof CancellableTask) {
                selectedTasks.add(createTaskCancellation((CancellableTask) task));
                accumulated += QueryGroupHelper.getResourceUsage(resourceType, task);
                if (accumulated >= limit) {
                    break;
                }
            }
        }
        return selectedTasks;
    }

    private TaskCancellation createTaskCancellation(CancellableTask task) {
        // TODO: add the correct reason and callbacks
        return new TaskCancellation(task, List.of(new TaskCancellation.Reason("limits exceeded", 5)), List.of(this::callbackOnCancel));
    }

    private void callbackOnCancel() {
        // TODO: implement callback logic here; mostly used for stats
    }
}
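The selection loop sorts the tasks with the strategy's comparator and keeps picking until the accumulated usage reaches the limit. The sketch below reproduces that loop on stand-in types to show how a "highest usage first" ordering would behave. `SelectionSketch`, `SimpleTask`, and the `highestFirst` comparator are hypothetical; the concrete strategies are not part of this diff.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SelectionSketch {
    // Hypothetical stand-in for a cancellable task with a single usage figure.
    static final class SimpleTask {
        final String id;
        final long usage;

        SimpleTask(String id, long usage) {
            this.id = id;
            this.usage = usage;
        }
    }

    // Sorts with the given comparator, then picks tasks until their combined usage reaches the limit.
    static List<SimpleTask> select(List<SimpleTask> tasks, long limit, Comparator<SimpleTask> order) {
        if (limit < 0) {
            throw new IllegalArgumentException("limit must not be negative");
        }
        List<SimpleTask> selected = new ArrayList<>();
        if (limit == 0) {
            return selected;
        }
        List<SimpleTask> sorted = new ArrayList<>(tasks);
        sorted.sort(order);
        long accumulated = 0;
        for (SimpleTask task : sorted) {
            selected.add(task);
            accumulated += task.usage;
            if (accumulated >= limit) {
                break;
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<SimpleTask> tasks = List.of(new SimpleTask("a", 10), new SimpleTask("b", 50), new SimpleTask("c", 30));
        Comparator<SimpleTask> highestFirst = Comparator.comparingLong((SimpleTask t) -> t.usage).reversed();
        for (SimpleTask task : select(tasks, 60, highestFirst)) {
            System.out.println(task.id + " " + task.usage);
        }
        // prints "b 50" then "c 30"
    }
}
```

With a limit of 60, task `b` alone (50) is not enough, so `c` is picked as well and the loop stops at 80. The selection can therefore overshoot the limit by up to one task's usage.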