Skip to content

Commit

Permalink
PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: pranikum <109206473+pranikum@users.noreply.github.com>
  • Loading branch information
pranikum committed Sep 26, 2022
1 parent ba8f500 commit 8920480
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ void setWeightForDecommissionedZone(List<String> zones, ActionListener<ClusterPu

Map<String, String> weights = new HashMap<>();
zones.forEach(zone -> {
if (zone.equalsIgnoreCase(decommissionAttribute.attributeValue())) {
if (zone.equalsIgnoreCase(decommissionAttribute.attributeName())) {
weights.put(zone, "0");
} else {
weights.put(zone, "1");
Expand All @@ -297,7 +297,7 @@ void setWeightForDecommissionedZone(List<String> zones, ActionListener<ClusterPu

// WRR API will validate invalid weights
final ClusterPutWRRWeightsRequest clusterWeightRequest = new ClusterPutWRRWeightsRequest();
clusterWeightRequest.attributeName("zone");
clusterWeightRequest.attributeName(decommissionAttribute.attributeValue());
clusterWeightRequest.setWRRWeight(weights);

transportService.sendRequest(
Expand Down Expand Up @@ -335,13 +335,13 @@ void handleNodesDecommissionRequest(
Set<DiscoveryNode> decommissionedNodes,
String reason,
TimeValue timeout,
TimeValue timeoutForNodeDecommission,
TimeValue timeoutForNodeDraining,
ActionListener<Void> listener
) {

if (timeoutForNodeDecommission.getSeconds() > 0) {
if (timeoutForNodeDraining.getSeconds() > 0) {
// Wait for timeout to happen. Log the active connection before decommissioning of nodes.
scheduleDecommissionNodesRequestCheck(decommissionedNodes, reason, timeout, listener, timeoutForNodeDecommission);
scheduleDecommissionNodesRequestCheck(decommissionedNodes, reason, timeout, listener, timeoutForNodeDraining);
} else {
getActiveRequestCountOnDecommissionNodes(decommissionedNodes);
removeDecommissionedNodes(decommissionedNodes, reason, timeout, listener);
Expand All @@ -364,7 +364,7 @@ private void scheduleDecommissionNodesRequestCheck(
String reason,
TimeValue timeout,
ActionListener<Void> nodesRemovedListener,
TimeValue timeoutForNodeDecommission
TimeValue timeoutForNodeDraining
) {
transportService.getThreadPool().schedule(new Runnable() {
@Override
Expand All @@ -373,7 +373,7 @@ public void run() {
getActiveRequestCountOnDecommissionNodes(decommissionedNodes);
removeDecommissionedNodes(decommissionedNodes, reason, timeout, nodesRemovedListener);
}
}, timeoutForNodeDecommission, org.opensearch.threadpool.ThreadPool.Names.SAME);
}, timeoutForNodeDraining, org.opensearch.threadpool.ThreadPool.Names.SAME);
}

private void getActiveRequestCountOnDecommissionNodes(Set<DiscoveryNode> decommissionedNodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private void setForcedAwarenessAttributes(Settings forceSettings) {
public void startDecommissionAction(
final DecommissionAttribute decommissionAttribute,
final ActionListener<ClusterStateUpdateResponse> listener,
final TimeValue timeOutForNodeDecommission
final TimeValue timeOutForNodeDraining
) {
// register the metadata with status as INIT as first step
clusterService.submitStateUpdateTask("decommission [" + decommissionAttribute + "]", new ClusterStateUpdateTask(Priority.URGENT) {
Expand Down Expand Up @@ -158,19 +158,15 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
decommissionAttributeMetadata.decommissionAttribute(),
decommissionAttributeMetadata.status()
);
decommissionClusterManagerNodes(
decommissionAttributeMetadata.decommissionAttribute(),
listener,
timeOutForNodeDecommission
);
decommissionClusterManagerNodes(decommissionAttributeMetadata.decommissionAttribute(), listener, timeOutForNodeDraining);
}
});
}

private synchronized void decommissionClusterManagerNodes(
final DecommissionAttribute decommissionAttribute,
ActionListener<ClusterStateUpdateResponse> listener,
TimeValue timeOutForNodeDecommission
TimeValue timeOutForNodeDraining
) {
ClusterState state = clusterService.getClusterApplierService().state();
// since here metadata is already registered with INIT, we can guarantee that no new node with decommission attribute can further
Expand Down Expand Up @@ -214,7 +210,7 @@ public void onResponse(Void unused) {
// nodes can be part of Voting Config
listener.onResponse(new ClusterStateUpdateResponse(true));

weighAwayDecommissionedZone(state, timeOutForNodeDecommission);
weighAwayDecommissionedZone(state, timeOutForNodeDraining);
// decommissionController.setWeightForDecommissionedZone();
// failDecommissionedNodes(clusterService.getClusterApplierService().state(), timeOutForNodeDecommission);
}
Expand Down Expand Up @@ -313,7 +309,7 @@ public void onFailure(Exception e) {
}
}

private void weighAwayDecommissionedZone(ClusterState state, TimeValue timeOutForNodeDecommission) {
private void weighAwayDecommissionedZone(ClusterState state, TimeValue timeOutForNodeDraining) {
// this method ensures no matter what, we always exit from this function after clearing the voting config exclusion
DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata();
DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute();
Expand All @@ -331,7 +327,7 @@ public void onResponse(DecommissionStatus status) {
@Override
public void onResponse(ClusterPutWRRWeightsResponse response) {
clearVotingConfigExclusionAndUpdateStatus(true, true);
failDecommissionedNodes(state, timeOutForNodeDecommission);
failDecommissionedNodes(state, timeOutForNodeDraining);
}

@Override
Expand All @@ -357,7 +353,7 @@ public void onFailure(Exception e) {
});
}

private void failDecommissionedNodes(ClusterState state, TimeValue timeOutForNodeDecommission) {
private void failDecommissionedNodes(ClusterState state, TimeValue timeOutForNodeDraining) {
// this method ensures no matter what, we always exit from this function after clearing the voting config exclusion
DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata();
DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute();
Expand All @@ -372,7 +368,7 @@ public void onResponse(DecommissionStatus status) {
filterNodesWithDecommissionAttribute(clusterService.getClusterApplierService().state(), decommissionAttribute, false),
"nodes-decommissioned",
TimeValue.timeValueSeconds(120L),
timeOutForNodeDecommission,
timeOutForNodeDraining,
new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
Expand Down

0 comments on commit 8920480

Please sign in to comment.