Skip to content

Commit

Permalink
Merge branch 'main' into decommission/metadata-bug
Browse files Browse the repository at this point in the history
  • Loading branch information
imRishN authored Oct 18, 2022
2 parents 0e89f8b + 1d65485 commit dd8b4ec
Show file tree
Hide file tree
Showing 10 changed files with 1,109 additions and 951 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fix weighted routing metadata deserialization error on process restart ([#4691](https://github.com/opensearch-project/OpenSearch/pull/4691))
- Refactor Base Action class javadocs to OpenSearch.API ([#4732](https://github.com/opensearch-project/OpenSearch/pull/4732))
- Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459))
- Refactored BalancedAllocator.Balancer to LocalShardsBalancer ([#4761](https://github.com/opensearch-project/OpenSearch/pull/4761))
### Deprecated
### Removed
- Remove deprecated code to add node name into log pattern of log4j property file ([#4568](https://github.com/opensearch-project/OpenSearch/pull/4568))
Expand Down Expand Up @@ -138,6 +139,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fixed randomly failing test ([4774](https://github.com/opensearch-project/OpenSearch/pull/4774))
- Update version check after backport ([4786](https://github.com/opensearch-project/OpenSearch/pull/4786))
- Fix decommission status update to non leader nodes ([4800](https://github.com/opensearch-project/OpenSearch/pull/4800))
- Fix recovery path for searchable snapshots ([4813](https://github.com/opensearch-project/OpenSearch/pull/4813))

### Security
- CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,24 @@ protected boolean addMockInternalEngine() {
}

public void testCreateSearchableSnapshot() throws Exception {
final int numReplicasIndex1 = randomIntBetween(1, 4);
final int numReplicasIndex2 = randomIntBetween(0, 2);
internalCluster().ensureAtLeastNumDataNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);
final Client client = client();
createRepository("test-repo", "fs");
createIndex(
"test-idx-1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0").put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1").build()
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex1))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.build()
);
createIndex(
"test-idx-2",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0").put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1").build()
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex2))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.build()
);
ensureGreen();
indexRandomDocs("test-idx-1", 100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.cluster.routing.allocation;

import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -27,11 +28,11 @@ public AllocationConstraints() {
}

class ConstraintParams {
private BalancedShardsAllocator.Balancer balancer;
private ShardsBalancer balancer;
private BalancedShardsAllocator.ModelNode node;
private String index;

ConstraintParams(BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
this.balancer = balancer;
this.node = node;
this.index = index;
Expand All @@ -50,7 +51,7 @@ class ConstraintParams {
* This weight function is used only in case of unassigned shards to avoid overloading a newly added node.
* Weight calculation in other scenarios like shard movement and re-balancing remain unaffected by this function.
*/
public long weight(BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
int constraintsBreached = 0;
ConstraintParams params = new ConstraintParams(balancer, node, index);
for (Predicate<ConstraintParams> predicate : constraintPredicates) {
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation.allocator;

import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.MoveDecision;

/**
* <p>
* A {@link ShardsBalancer} helps the {@link BalancedShardsAllocator} to perform allocation and balancing
* operations on the cluster.
* </p>
*
* @opensearch.internal
*/
public abstract class ShardsBalancer {

/**
* Performs allocation of unassigned shards on nodes within the cluster.
*/
abstract void allocateUnassigned();

/**
* Moves shards that cannot be allocated to a node anymore.
*/
abstract void moveShards();

/**
* Balances the nodes on the cluster model.
*/
abstract void balance();

/**
* Make a decision for allocating an unassigned shard.
* @param shardRouting the shard for which the decision has to be made
* @return the allocation decision
*/
abstract AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting);

/**
* Makes a decision on whether to move a started shard to another node.
* @param shardRouting the shard for which the decision has to be made
* @return a move decision for the shard
*/
abstract MoveDecision decideMove(ShardRouting shardRouting);

/**
* Makes a decision about moving a single shard to a different node to form a more
* optimally balanced cluster.
* @param shardRouting the shard for which the move decision has to be made
* @return a move decision for the shard
*/
abstract MoveDecision decideRebalance(ShardRouting shardRouting);

/**
* Returns the average of shards per node for the given index
*/
public float avgShardsPerNode() {
return Float.MAX_VALUE;
}

/**
* Returns the global average of shards per node
*/
public float avgShardsPerNode(String index) {
return Float.MAX_VALUE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.RecoveryEngineException;
import org.opensearch.index.mapper.MapperException;
Expand Down Expand Up @@ -244,16 +245,18 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
indexShard.prepareForIndexRecovery();
boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!remoteTranslogEnabled);
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final boolean hasNoTranslog = IndexModule.Type.REMOTE_SNAPSHOT.match(indexShard.indexSettings());
final boolean verifyTranslog = (hasRemoteTranslog || hasNoTranslog) == false;
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!hasRemoteTranslog);
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
: "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
startRequest = getStartRecoveryRequest(
logger,
clusterService.localNode(),
recoveryTarget,
startingSeqNo,
!remoteTranslogEnabled
verifyTranslog
);
requestToSend = startRequest;
actionName = PeerRecoverySourceService.Actions.START_RECOVERY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.index.IndexModule;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.mapper.MapperException;
import org.opensearch.index.seqno.ReplicationTracker;
Expand Down Expand Up @@ -355,10 +356,12 @@ public void cleanFiles(
try {
store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetadata);

// If Segment Replication is enabled, we need to reuse the primary's translog UUID already stored in the index.
// With Segrep, replicas should never create their own commit points. This ensures the index and xlog share the same
// UUID without the extra step to associate the index with a new xlog.
if (indexShard.indexSettings().isSegRepEnabled()) {
// Replicas for segment replication or remote snapshot indices do not create
// their own commit points and therefore do not modify the commit user data
// in their store. In these cases, reuse the primary's translog UUID.
final boolean reuseTranslogUUID = indexShard.indexSettings().isSegRepEnabled()
|| IndexModule.Type.REMOTE_SNAPSHOT.match(indexShard.indexSettings());
if (reuseTranslogUUID) {
final String translogUUID = store.getMetadata().getCommitUserData().get(TRANSLOG_UUID_KEY);
Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.opensearch.cluster.OpenSearchAllocationTestCase;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.cluster.routing.allocation.allocator.LocalShardsBalancer;
import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;

Expand Down Expand Up @@ -45,7 +47,7 @@ public void testSettings() {
* for IndexShardPerNode constraint satisfied and breached.
*/
public void testIndexShardsPerNodeConstraint() {
BalancedShardsAllocator.Balancer balancer = mock(BalancedShardsAllocator.Balancer.class);
ShardsBalancer balancer = mock(LocalShardsBalancer.class);
BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class);
AllocationConstraints constraints = new AllocationConstraints();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer;
import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer;
import org.opensearch.cluster.routing.allocation.decider.AllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
Expand All @@ -65,7 +65,7 @@
import static org.hamcrest.Matchers.startsWith;

/**
* Tests for balancing a single shard, see {@link Balancer#decideRebalance(ShardRouting)}.
* Tests for balancing a single shard, see {@link ShardsBalancer#decideRebalance(ShardRouting)}.
*/
public class BalancedSingleShardTests extends OpenSearchAllocationTestCase {

Expand Down

0 comments on commit dd8b4ec

Please sign in to comment.