Skip to content

Commit

Permalink
Implemented computation of segment replication stats at shard level (#…
Browse files Browse the repository at this point in the history
…17055)

* Implemented computation of segment replication stats at shard level

The method implemented here computes the segment replication stats at the shard level,
instead of relying on the primary shard to compute stats based on reports from its replicas.

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Updated style checks in the test

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Updated changelog

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* fixed style issues

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Fix the failing integration test

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Fix stylecheck

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Fixed the comments for the initial revision

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Updated to use System.nanoTime() for lag calculation

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Fixed the integration test for node stats

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Modified the version in the ReplicationCheckpoint for backward compatibility

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Added precomputation logic for the stats calculation

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Removed unwanted lines

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Clean up the maps when index closed

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Added a null check for the indexshard checkpoint

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* fix style checks

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Updated version and added bwc for RemoteSegmentMetadata

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Updated the javadoc comments

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Address comments PR

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Removed the latestReceivedCheckpoint map from SegmentReplicationTargetService

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Added granular locks for the concurrency of stats methods

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Style check fixes

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* Changes to maintain atomicity

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* spotlessApply

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* removed querying the remotestore when replication is in progress

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

* spotlessApply

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>

---------

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
  • Loading branch information
vinaykpud authored Feb 27, 2025
1 parent 0ffed5e commit ee7fbbd
Show file tree
Hide file tree
Showing 34 changed files with 706 additions and 101 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introduce a setting to disable download of full cluster state from remote on term mismatch([#16798](https://github.com/opensearch-project/OpenSearch/pull/16798/))
- Added ability to retrieve value from DocValues in a flat_object field([#16802](https://github.com/opensearch-project/OpenSearch/pull/16802))
- Improve performance of NumericTermAggregation by avoiding unnecessary sorting([#17252](https://github.com/opensearch-project/OpenSearch/pull/17252))
- Implemented computation of segment replication stats at shard level ([#17055](https://github.com/opensearch-project/OpenSearch/pull/17055))
- [Rule Based Auto-tagging] Add in-memory attribute value store ([#17342](https://github.com/opensearch-project/OpenSearch/pull/17342))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

Expand All @@ -136,6 +137,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;
import static org.mockito.Mockito.mock;

public class IndexShardIT extends OpenSearchSingleNodeTestCase {

Expand Down Expand Up @@ -716,7 +718,8 @@ public static final IndexShard newIndexShard(
null,
DefaultRemoteStoreSettings.INSTANCE,
false,
IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting)
IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting),
mock(Function.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,19 +404,17 @@ public void testSegmentReplicationNodeAndIndexStats() throws Exception {

for (NodeStats nodeStats : nodesStatsResponse.getNodes()) {
ReplicationStats replicationStats = nodeStats.getIndices().getSegments().getReplicationStats();
// primary node - should hold replication statistics
// primary node - does not have any replication statistics
if (nodeStats.getNode().getName().equals(primaryNode)) {
assertTrue(replicationStats.getMaxBytesBehind() == 0);
assertTrue(replicationStats.getTotalBytesBehind() == 0);
assertTrue(replicationStats.getMaxReplicationLag() == 0);
}
// replica nodes - should hold replication statistics
if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) {
assertTrue(replicationStats.getMaxBytesBehind() > 0);
assertTrue(replicationStats.getTotalBytesBehind() > 0);
assertTrue(replicationStats.getMaxReplicationLag() > 0);
// 2 replicas so total bytes should be double of max
assertEquals(replicationStats.getMaxBytesBehind() * 2, replicationStats.getTotalBytesBehind());
}
// replica nodes - should hold empty replication statistics
if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) {
assertEquals(0, replicationStats.getMaxBytesBehind());
assertEquals(0, replicationStats.getTotalBytesBehind());
assertEquals(0, replicationStats.getMaxReplicationLag());
}
}
// get replication statistics at index level
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.io;

/**
* Factory that supplies the {@link IndexIOStreamHandler} used to read/write content of type {@code T},
* selected by stream version.
*
* @param <T> The type of content to be read/written to stream
*
* @opensearch.internal
*/
public interface IndexIOStreamHandlerFactory<T> {

/**
* Provides the handler to use for the given stream version.
*
* @param version stream version
* @return handler for reading/writing content of type {@code T} from/to streams of that version
*/
IndexIOStreamHandler<T> getHandler(int version);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,25 @@
public class VersionedCodecStreamWrapper<T> {
private static final Logger logger = LogManager.getLogger(VersionedCodecStreamWrapper.class);

// TODO This can be updated to hold a streamReadWriteHandlerFactory and get relevant handler based on the stream versions
private final IndexIOStreamHandler<T> indexIOStreamHandler;
private final IndexIOStreamHandlerFactory<T> indexIOStreamHandlerFactory;
private final int minVersion;
private final int currentVersion;
private final String codec;

/**
* @param indexIOStreamHandler handler to read/write stream from T
* @param indexIOStreamHandlerFactory factory for providing handler to read/write stream from T
* @param minVersion earliest supported version of the stream
* @param currentVersion latest supported version of the stream
* @param codec stream codec
*/
public VersionedCodecStreamWrapper(IndexIOStreamHandler<T> indexIOStreamHandler, int currentVersion, String codec) {
this.indexIOStreamHandler = indexIOStreamHandler;
public VersionedCodecStreamWrapper(
IndexIOStreamHandlerFactory<T> indexIOStreamHandlerFactory,
int minVersion,
int currentVersion,
String codec
) {
this.indexIOStreamHandlerFactory = indexIOStreamHandlerFactory;
this.minVersion = minVersion;
this.currentVersion = currentVersion;
this.codec = codec;
}
Expand Down Expand Up @@ -87,7 +94,7 @@ public void writeStream(IndexOutput indexOutput, T content) throws IOException {
*/
private int checkHeader(IndexInput indexInput) throws IOException {
// TODO Once versioning strategy is decided we'll add support for min/max supported versions
return CodecUtil.checkHeader(indexInput, this.codec, this.currentVersion, this.currentVersion);
return CodecUtil.checkHeader(indexInput, this.codec, minVersion, this.currentVersion);
}

/**
Expand Down Expand Up @@ -120,8 +127,6 @@ private void writeFooter(IndexOutput indexOutput) throws IOException {
* @param version stream content version
*/
private IndexIOStreamHandler<T> getHandlerForVersion(int version) {
// TODO implement factory and pick relevant handler based on version.
// It should also take into account min and max supported versions
return this.indexIOStreamHandler;
return this.indexIOStreamHandlerFactory.getHandler(version);
}
}
10 changes: 7 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.NodeEnvironment;
Expand Down Expand Up @@ -652,7 +653,8 @@ public IndexService newIndexService(
clusterDefaultRefreshIntervalSupplier,
recoverySettings,
remoteStoreSettings,
(s) -> {}
(s) -> {},
shardId -> ReplicationStats.empty()
);
}

Expand All @@ -678,7 +680,8 @@ public IndexService newIndexService(
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
Consumer<IndexShard> replicator
Consumer<IndexShard> replicator,
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -740,7 +743,8 @@ public IndexService newIndexService(
remoteStoreSettings,
fileCache,
compositeIndexSettings,
replicator
replicator,
segmentReplicationStatsProvider
);
success = true;
return indexService;
Expand Down
11 changes: 8 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final FileCache fileCache;
private final CompositeIndexSettings compositeIndexSettings;
private final Consumer<IndexShard> replicator;
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -235,7 +236,8 @@ public IndexService(
RemoteStoreSettings remoteStoreSettings,
FileCache fileCache,
CompositeIndexSettings compositeIndexSettings,
Consumer<IndexShard> replicator
Consumer<IndexShard> replicator,
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -322,6 +324,7 @@ public IndexService(
this.compositeIndexSettings = compositeIndexSettings;
this.fileCache = fileCache;
this.replicator = replicator;
this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -398,7 +401,8 @@ public IndexService(
remoteStoreSettings,
null,
null,
s -> {}
s -> {},
(shardId) -> ReplicationStats.empty()
);
}

Expand Down Expand Up @@ -694,7 +698,8 @@ protected void closeInternal() {
recoverySettings,
remoteStoreSettings,
seedRemote,
discoveryNodes
discoveryNodes,
segmentReplicationStatsProvider
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public ReplicationStats(StreamInput in) throws IOException {
this.maxReplicationLag = in.readVLong();
}

/**
* Creates a new {@link ReplicationStats} using the no-argument constructor.
* NOTE(review): presumably every metric starts at its default (zero) value - confirm against the field declarations.
*
* @return a newly constructed {@link ReplicationStats}
*/
public static ReplicationStats empty() {
return new ReplicationStats();
}

public ReplicationStats() {

}
Expand Down
20 changes: 8 additions & 12 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ Runnable getGlobalCheckpointSyncer() {
*/
private final ShardMigrationState shardMigrationState;
private DiscoveryNodes discoveryNodes;
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;

public IndexShard(
final ShardRouting shardRouting,
Expand Down Expand Up @@ -391,7 +392,8 @@ public IndexShard(
final RecoverySettings recoverySettings,
final RemoteStoreSettings remoteStoreSettings,
boolean seedRemote,
final DiscoveryNodes discoveryNodes
final DiscoveryNodes discoveryNodes,
final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -493,6 +495,7 @@ public boolean shouldCache(Query query) {
this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings);
this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote);
this.discoveryNodes = discoveryNodes;
this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -3233,17 +3236,10 @@ public Set<SegmentReplicationShardStats> getReplicationStatsForTrackedReplicas()
}

public ReplicationStats getReplicationStats() {
if (indexSettings.isSegRepEnabledOrRemoteNode() && routingEntry().primary()) {
final Set<SegmentReplicationShardStats> stats = getReplicationStatsForTrackedReplicas();
long maxBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).max().orElse(0L);
long totalBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).sum();
long maxReplicationLag = stats.stream()
.mapToLong(SegmentReplicationShardStats::getCurrentReplicationLagMillis)
.max()
.orElse(0L);
return new ReplicationStats(maxBytesBehind, totalBytesBehind, maxReplicationLag);
}
return new ReplicationStats();
if (indexSettings.isSegRepEnabledOrRemoteNode() && !routingEntry().primary()) {
return segmentReplicationStatsProvider.apply(shardId);
}
return ReplicationStats.empty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandlerFactory;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -104,7 +104,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
private Map<String, UploadedSegmentMetadata> segmentsUploadedToRemoteStore;

private static final VersionedCodecStreamWrapper<RemoteSegmentMetadata> metadataStreamWrapper = new VersionedCodecStreamWrapper<>(
new RemoteSegmentMetadataHandler(),
new RemoteSegmentMetadataHandlerFactory(),
RemoteSegmentMetadata.VERSION_ONE,
RemoteSegmentMetadata.CURRENT_VERSION,
RemoteSegmentMetadata.METADATA_CODEC
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@
*/
@PublicApi(since = "2.6.0")
public class RemoteSegmentMetadata {

public static final int VERSION_ONE = 1;

public static final int VERSION_TWO = 2;

/**
* Latest supported version of metadata
*/
public static final int CURRENT_VERSION = 1;
public static final int CURRENT_VERSION = VERSION_TWO;
/**
* Metadata codec
*/
Expand Down Expand Up @@ -106,18 +111,30 @@ public static Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> f
);
}

/**
* Serializes this metadata to the given output.
* Write always writes with the latest version of the RemoteSegmentMetadata.
* <p>
* Content is written in this order: the uploaded segment metadata as a map of strings, the replication
* checkpoint, the length of the segment infos bytes (as a long), and finally the segment infos bytes.
*
* @param out file output stream which will store stream content
* @throws IOException in case there is a problem writing the file
*/
public void write(IndexOutput out) throws IOException {
out.writeMapOfStrings(toMapOfStrings());
writeCheckpointToIndexOutput(replicationCheckpoint, out);
// length prefix lets the reader size its buffer before reading the raw bytes
out.writeLong(segmentInfosBytes.length);
out.writeBytes(segmentInfosBytes, segmentInfosBytes.length);
}

public static RemoteSegmentMetadata read(IndexInput indexInput) throws IOException {
/**
* Read can happen in the upgraded version of replica which needs to support all versions of RemoteSegmentMetadata
* @param indexInput file input stream
* @param version version of the RemoteSegmentMetadata
* @return {@code RemoteSegmentMetadata}
* @throws IOException in case there is a problem reading from the file input stream
*/
public static RemoteSegmentMetadata read(IndexInput indexInput, int version) throws IOException {
Map<String, String> metadata = indexInput.readMapOfStrings();
final Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap = RemoteSegmentMetadata
.fromMapOfStrings(metadata);
ReplicationCheckpoint replicationCheckpoint = readCheckpointFromIndexInput(indexInput, uploadedSegmentMetadataMap);
ReplicationCheckpoint replicationCheckpoint = readCheckpointFromIndexInput(indexInput, uploadedSegmentMetadataMap, version);
int byteArraySize = (int) indexInput.readLong();
byte[] segmentInfosBytes = new byte[byteArraySize];
indexInput.readBytes(segmentInfosBytes, 0, byteArraySize);
Expand All @@ -136,11 +153,13 @@ public static void writeCheckpointToIndexOutput(ReplicationCheckpoint replicatio
out.writeLong(replicationCheckpoint.getSegmentInfosVersion());
out.writeLong(replicationCheckpoint.getLength());
out.writeString(replicationCheckpoint.getCodec());
out.writeLong(replicationCheckpoint.getCreatedTimeStamp());
}

private static ReplicationCheckpoint readCheckpointFromIndexInput(
IndexInput in,
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap,
int version
) throws IOException {
return new ReplicationCheckpoint(
new ShardId(new Index(in.readString(), in.readString()), in.readVInt()),
Expand All @@ -149,7 +168,8 @@ private static ReplicationCheckpoint readCheckpointFromIndexInput(
in.readLong(),
in.readLong(),
in.readString(),
toStoreFileMetadata(uploadedSegmentMetadataMap)
toStoreFileMetadata(uploadedSegmentMetadataMap),
version >= VERSION_TWO ? in.readLong() : 0
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,21 @@
* @opensearch.internal
*/
public class RemoteSegmentMetadataHandler implements IndexIOStreamHandler<RemoteSegmentMetadata> {

private final int version;

/**
* Creates a handler bound to a single metadata stream version.
*
* @param version version of the metadata stream; passed to
*                {@code RemoteSegmentMetadata.read(IndexInput, int)} when content is read
*/
public RemoteSegmentMetadataHandler(int version) {
this.version = version;
}

/**
* Reads metadata content from metadata file input stream and parsed into {@link RemoteSegmentMetadata}
* @param indexInput metadata file input stream with {@link IndexInput#getFilePointer()} pointing to metadata content
* @return {@link RemoteSegmentMetadata}
*/
@Override
public RemoteSegmentMetadata readContent(IndexInput indexInput) throws IOException {
return RemoteSegmentMetadata.read(indexInput);
return RemoteSegmentMetadata.read(indexInput, version);
}

/**
Expand Down
Loading

0 comments on commit ee7fbbd

Please sign in to comment.