Skip to content

Commit

Permalink
using voting exclusion only
Browse files Browse the repository at this point in the history
  • Loading branch information
gbbafna committed Jul 20, 2022
1 parent 646a03f commit 1e6d54a
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsResponse;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse;
import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.action.support.IndicesOptions;
Expand All @@ -46,7 +52,6 @@
import org.opensearch.cluster.LocalClusterUpdateTask;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
Expand All @@ -68,6 +73,7 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand All @@ -84,6 +90,10 @@ public class TransportClusterHealthAction extends TransportClusterManagerNodeRea

private final Discovery discovery;

private final TransportAddVotingConfigExclusionsAction exclusionsAction;

private final TransportClearVotingConfigExclusionsAction clearAction;

@Inject
public TransportClusterHealthAction(
TransportService transportService,
Expand All @@ -92,7 +102,9 @@ public TransportClusterHealthAction(
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
AllocationService allocationService,
NodeService nodeService
NodeService nodeService,
TransportAddVotingConfigExclusionsAction exclusionsAction,
TransportClearVotingConfigExclusionsAction clearAction
) {
super(
ClusterHealthAction.NAME,
Expand All @@ -106,6 +118,8 @@ public TransportClusterHealthAction(
);
this.allocationService = allocationService;
this.discovery = nodeService.discovery;
this.exclusionsAction = exclusionsAction;
this.clearAction = clearAction;
}

@Override
Expand Down Expand Up @@ -139,18 +153,47 @@ protected void masterOperation(
final ClusterState unusedState,
final ActionListener<ClusterHealthResponse> listener
) {
if (discovery instanceof Coordinator) {
//find the node .
String masterId = clusterService.state().getNodes().getMasterNodeId();
DiscoveryNode mast = clusterService.state().getNodes().get(masterId);
if ( mast.getName() == "runTask-2" ) {
logger.info("not doing anything");
} else {
logger.info("trying to changes master ");
((Coordinator) discovery).abdicateFrom(mast);
}
//find the node .
String masterId = clusterService.state().getNodes().getMasterNodeId();
DiscoveryNode mast = clusterService.state().getNodes().get(masterId);
Map<String, String> attr = mast.getAttributes();
logger.info("Master name is " + mast.getName() + "attributes" + attr);
// ToDo : Match zone values from attributes
if ( mast.getName() != "runTask-2" ) {
logger.info("changing master ");
ActionListener<ClearVotingConfigExclusionsResponse> listener3 = new ActionListener<>() {
@Override
public void onResponse(ClearVotingConfigExclusionsResponse clearVotingConfigExclusionsResponse) {
logger.info("remoed exclude di as well ");
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
};

ActionListener<AddVotingConfigExclusionsResponse> listener2 = new ActionListener<>() {

@Override
public void onResponse(AddVotingConfigExclusionsResponse addVotingConfigExclusionsResponse) {
logger.info("trying to changes master ");
ClearVotingConfigExclusionsRequest req = new ClearVotingConfigExclusionsRequest();
req.setWaitForRemoval(false);
clearAction.execute(req, listener3);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
};

exclusionsAction.execute(new AddVotingConfigExclusionsRequest(mast.getName()), listener2 );
logger.info("changed master ");
}


final int waitCount = getWaitCount(request);

if (request.waitForEvents() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.ListenableFuture;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.discovery.Discovery;
Expand Down Expand Up @@ -528,58 +528,6 @@ public void abdicateTo(DiscoveryNode newClusterManager) {
// explicitly move node to candidate state so that the next cluster state update task yields an onNoLongerMaster event
becomeCandidate("after abdicating to " + newClusterManager);
}

public void abdicateTo2(DiscoveryNode newClusterManager) {
synchronized (mutex) {
assert mode == Mode.LEADER : "expected to be leader on abdication but was " + mode;
assert newClusterManager.isMasterNode() : "should only abdicate to cluster-manager-eligible node but was " + newClusterManager;
final StartJoinRequest startJoinRequest = new StartJoinRequest(newClusterManager, Math.max(getCurrentTerm(), maxTermSeen) + 1);
logger.info("abdicating to {} with term {}", newClusterManager, startJoinRequest.getTerm());
getLastAcceptedState().nodes().mastersFirstStream().forEach(node -> {
if (isZen1Node(node) == false) {
joinHelper.sendStartJoinRequest(startJoinRequest, node);
}
});
// handling of start join messages on the local node will be dispatched to the generic thread-pool
assert mode == Mode.LEADER : "should still be leader after sending abdication messages " + mode;
// explicitly move node to candidate state so that the next cluster state update task yields an onNoLongerMaster event
becomeCandidate("after abdicating to " + newClusterManager);
}
}

public void abdicateFrom(DiscoveryNode newClusterManager) {
//Exclude itself
clusterManagerService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {

VotingConfigExclusion votingConfigExclusion = new VotingConfigExclusion(newClusterManager);
Set<VotingConfigExclusion> resolvedExclusions = new HashSet<>();
resolvedExclusions.add(votingConfigExclusion);
final CoordinationMetadata.Builder builder = CoordinationMetadata.builder(getLastAcceptedState().coordinationMetadata());
resolvedExclusions.forEach(builder::addVotingConfigExclusion);
final Metadata newMetadata = Metadata.builder(getLastAcceptedState().metadata()).coordinationMetadata(builder.build()).build();
final ClusterState newState = ClusterState.builder(getLastAcceptedState()).metadata(newMetadata).build();

return newState;
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
logger.debug("Removed repository cleanup task [{}] from cluster state");
// explicitly move node to candidate state so that the next cluster state update task yields an onNoLongerMaster event
synchronized (mutex) {
becomeCandidate("after abdicating to " + newClusterManager);
}
}

@Override
public void onFailure(String source, Exception e) {
logger.debug("reconfiguration failed", e);
}
});
}

private static boolean localNodeMayWinElection(ClusterState lastAcceptedState) {
final DiscoveryNode localNode = lastAcceptedState.nodes().getLocalNode();
assert localNode != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ public void testClusterHealthWaitsForClusterStateApplication() throws Interrupte
new ActionFilters(new HashSet<>()),
indexNameExpressionResolver,
new AllocationService(null, new TestGatewayAllocator(), null, null, null),
null,
null,
null
);
PlainActionFuture<ClusterHealthResponse> listener = new PlainActionFuture<>();
Expand Down

0 comments on commit 1e6d54a

Please sign in to comment.