
Commit 613f4aa

[Remote Translog] Trimming based on Remote Segment Store (opensearch-project#7383)
* [Remote Translog] Set min referenced generation as per the remote segment store only when it is enabled. Also adds an IT for failover using remote translog.

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
1 parent a1e42b1 commit 613f4aa
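For context while reading the diffs below, this is a minimal sketch (not part of the commit) of the index settings the new integration tests use to enable the remote segment store and the remote translog store. The constant names are taken from the diff itself; the helper class, method name, and parameter are illustrative only.

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;

// Illustrative helper, assumed for this sketch only.
public final class RemoteStoreSettingsSketch {

    public static Settings remoteStoreWithTranslog(String repositoryName) {
        return Settings.builder()
            // segment files are uploaded to the remote segment store
            .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
            .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, repositoryName)
            // translog files are uploaded to the remote translog store
            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, repositoryName)
            .build();
    }

    private RemoteStoreSettingsSketch() {}
}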

File tree

13 files changed: +308 -64 lines changed

codecov.yml (+3)

@@ -1,6 +1,9 @@
 codecov:
   require_ci_to_pass: yes
 
+ignore:
+  - "test"
+
 coverage:
   precision: 2
   round: down
server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIT.java (new file, +76)

@@ -0,0 +1,76 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.remotestore;
+
+import org.junit.After;
+import org.junit.Before;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.FeatureFlags;
+import org.opensearch.index.IndexModule;
+import org.opensearch.index.IndexSettings;
+import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.plugins.Plugin;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.opensearch.test.transport.MockTransportService;
+
+import java.nio.file.Path;
+import java.util.Collection;
+
+import static java.util.Arrays.asList;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+
+public class RemoteStoreBaseIT extends OpenSearchIntegTestCase {
+    protected static final String REPOSITORY_NAME = "test-remore-store-repo";
+    protected static final int SHARD_COUNT = 1;
+    protected static final int REPLICA_COUNT = 1;
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return asList(MockTransportService.TestPlugin.class);
+    }
+
+    @Override
+    protected boolean addMockInternalEngine() {
+        return false;
+    }
+
+    @Override
+    protected Settings featureFlagSettings() {
+        return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
+    }
+
+    public Settings indexSettings() {
+        return Settings.builder()
+            .put(super.indexSettings())
+            .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
+            .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
+            .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
+            .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
+            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
+            .build();
+    }
+
+    @Before
+    public void setup() {
+        internalCluster().startClusterManagerOnlyNode();
+        Path absolutePath = randomRepoPath().toAbsolutePath();
+        assertAcked(
+            clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
+        );
+    }
+
+    @After
+    public void teardown() {
+        assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
+    }
+}
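For readers skimming the diff, here is a minimal sketch of how a test can build on this new base class. The subclass, test method, and index name below are hypothetical and not part of the commit; the sketch only assumes what the base class above provides (the "fs" repository registered in setup() and the remote-store index settings from indexSettings()).

package org.opensearch.remotestore;

import org.opensearch.action.index.IndexResponse;
import org.opensearch.rest.RestStatus;

// Hypothetical example, not part of this commit: a subclass only needs to start data
// nodes and create an index; RemoteStoreBaseIT already registers the repository and
// applies the remote-store index settings.
public class RemoteStoreSmokeIT extends RemoteStoreBaseIT {

    public void testSingleDocIsIndexed() {
        internalCluster().startDataOnlyNodes(2);         // base setup() already started the cluster-manager node
        createIndex("smoke-test-idx", indexSettings());  // remote-store settings come from the base class
        ensureGreen("smoke-test-idx");
        IndexResponse response = client().prepareIndex("smoke-test-idx").setSource("field", "value").get();
        assertEquals(RestStatus.CREATED, response.status());
    }
}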

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java (+1 -37)

@@ -8,8 +8,6 @@
 
 package org.opensearch.remotestore;
 
-import org.junit.After;
-import org.junit.Before;
 import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
 import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
 import org.opensearch.action.index.IndexResponse;
@@ -18,17 +16,13 @@
 import org.opensearch.cluster.routing.RecoverySource;
 import org.opensearch.common.UUIDs;
 import org.opensearch.common.settings.Settings;
-import org.opensearch.common.util.FeatureFlags;
-import org.opensearch.index.IndexModule;
 import org.opensearch.indices.recovery.RecoveryState;
-import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.plugins.Plugin;
 import org.opensearch.test.InternalTestCluster;
 import org.opensearch.test.OpenSearchIntegTestCase;
 import org.opensearch.test.transport.MockTransportService;
 
 import java.io.IOException;
-import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -40,9 +34,8 @@
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
 
 @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
-public class RemoteStoreIT extends OpenSearchIntegTestCase {
+public class RemoteStoreIT extends RemoteStoreBaseIT {
 
-    private static final String REPOSITORY_NAME = "test-remore-store-repo";
     private static final String INDEX_NAME = "remote-store-test-idx-1";
     private static final String TOTAL_OPERATIONS = "total-operations";
     private static final String REFRESHED_OR_FLUSHED_OPERATIONS = "refreshed-or-flushed-operations";
@@ -62,13 +55,8 @@ public Settings indexSettings() {
     private Settings remoteStoreIndexSettings(int numberOfReplicas) {
         return Settings.builder()
             .put(super.indexSettings())
-            .put("index.refresh_interval", "300s")
             .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
             .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
-            .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
-            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
-            .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
-            .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
             .build();
     }
 
@@ -80,30 +68,6 @@ private Settings remoteTranslogIndexSettings(int numberOfReplicas) {
             .build();
     }
 
-    @Override
-    protected boolean addMockInternalEngine() {
-        return false;
-    }
-
-    @Override
-    protected Settings featureFlagSettings() {
-        return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
-    }
-
-    @Before
-    public void setup() {
-        internalCluster().startClusterManagerOnlyNode();
-        Path absolutePath = randomRepoPath().toAbsolutePath();
-        assertAcked(
-            clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
-        );
-    }
-
-    @After
-    public void teardown() {
-        assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
-    }
-
     private IndexResponse indexSingleDoc() {
         return client().prepareIndex(INDEX_NAME)
             .setId(UUIDs.randomBase64UUID())
server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java (new file, +118)

@@ -0,0 +1,118 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.remotestore;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import org.opensearch.action.admin.indices.close.CloseIndexResponse;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.routing.IndexShardRoutingTable;
+import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.test.BackgroundIndexer;
+import org.opensearch.test.InternalTestCluster;
+import org.opensearch.test.OpenSearchIntegTestCase;
+
+import java.util.Locale;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
+
+@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0)
+public class ReplicaToPrimaryPromotionIT extends RemoteStoreBaseIT {
+    private int shard_count = 5;
+
+    @Override
+    public Settings indexSettings() {
+        return Settings.builder()
+            .put(super.indexSettings())
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shard_count)
+            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
+            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
+            .build();
+    }
+
+    public void testPromoteReplicaToPrimary() throws Exception {
+        internalCluster().startNode();
+        internalCluster().startNode();
+        final String indexName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
+        shard_count = scaledRandomIntBetween(1, 5);
+        createIndex(indexName);
+        int numOfDocs = 0;
+        int numIter = scaledRandomIntBetween(0, 10);
+        for (int i = 0; i < numIter; i++) {
+            final int numOfDoc = scaledRandomIntBetween(0, 200);
+            logger.info("num of docs in iter {} {}", numOfDoc, i);
+            if (numOfDoc > 0) {
+                try (
+                    BackgroundIndexer indexer = new BackgroundIndexer(
+                        indexName,
+                        "_doc",
+                        client(),
+                        numOfDoc,
+                        RandomizedTest.scaledRandomIntBetween(2, 5),
+                        false,
+                        null
+                    )
+                ) {
+                    indexer.setUseAutoGeneratedIDs(true);
+                    indexer.start(numOfDoc);
+                    waitForIndexed(numOfDoc, indexer);
+                    numOfDocs += numOfDoc;
+                    indexer.stopAndAwaitStopped();
+                    if (random().nextBoolean()) {
+                        // 90% refresh + 10% flush
+                        if (random().nextInt(10) != 0) {
+                            refresh(indexName);
+                        } else {
+                            flush(indexName);
+                        }
+                    }
+                }
+            }
+        }
+
+        ensureGreen(indexName);
+
+        // sometimes test with a closed index
+        final IndexMetadata.State indexState = randomFrom(IndexMetadata.State.OPEN, IndexMetadata.State.CLOSE);
+        if (indexState == IndexMetadata.State.CLOSE) {
+            CloseIndexResponse closeIndexResponse = client().admin().indices().prepareClose(indexName).get();
+            assertThat("close index not acked - " + closeIndexResponse, closeIndexResponse.isAcknowledged(), equalTo(true));
+            ensureGreen(indexName);
+        }
+
+        // pick up a data node that contains a random primary shard
+        ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
+        final int numShards = state.metadata().index(indexName).getNumberOfShards();
+        final ShardRouting primaryShard = state.routingTable().index(indexName).shard(randomIntBetween(0, numShards - 1)).primaryShard();
+        final DiscoveryNode randomNode = state.nodes().resolveNode(primaryShard.currentNodeId());
+
+        // stop the random data node, all remaining shards are promoted to primaries
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(randomNode.getName()));
+        ensureYellowAndNoInitializingShards(indexName);
+
+        state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
+        for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(indexName)) {
+            for (ShardRouting shardRouting : shardRoutingTable.activeShards()) {
+                assertThat(shardRouting + " should be promoted as a primary", shardRouting.primary(), is(true));
+            }
+        }
+
+        if (indexState == IndexMetadata.State.CLOSE) {
+            assertAcked(client().admin().indices().prepareOpen(indexName));
+            ensureYellowAndNoInitializingShards(indexName);
+        }
+        refresh(indexName);
+        assertHitCount(client().prepareSearch(indexName).setSize(0).get(), numOfDocs);
+    }
+}

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java (+8 -2)

@@ -8,6 +8,8 @@
 
 package org.opensearch.index.translog;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.opensearch.common.SetOnce;
 import org.opensearch.core.util.FileSystemUtils;
 import org.opensearch.common.lease.Releasable;
@@ -47,6 +49,7 @@
  */
 public class RemoteFsTranslog extends Translog {
 
+    private static final Logger logger = LogManager.getLogger(RemoteFsTranslog.class);
     private final BlobStoreRepository blobStoreRepository;
     private final TranslogTransferManager translogTransferManager;
     private final FileTransferTracker fileTransferTracker;
@@ -82,7 +85,6 @@ public RemoteFsTranslog(
         this.primaryModeSupplier = primaryModeSupplier;
         fileTransferTracker = new FileTransferTracker(shardId);
         this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, threadPool, shardId, fileTransferTracker);
-
         try {
             download(translogTransferManager, location);
             Checkpoint checkpoint = readCheckpoint(location);
@@ -131,6 +133,7 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t
     }
 
     public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException {
+        logger.info("Downloading translog files from remote for shard {} ", translogTransferManager.getShardId());
         TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata();
         if (translogMetadata != null) {
             if (Files.notExists(location)) {
@@ -152,6 +155,7 @@ public static void download(TranslogTransferManager translogTransferManager, Pat
                 location.resolve(Translog.CHECKPOINT_FILE_NAME)
             );
         }
+        logger.info("Downloaded translog files from remote for shard {} ", translogTransferManager.getShardId());
     }
 
     public static TranslogTransferManager buildTranslogTransferManager(
@@ -161,6 +165,7 @@ public static TranslogTransferManager buildTranslogTransferManager(
         FileTransferTracker fileTransferTracker
     ) {
         return new TranslogTransferManager(
+            shardId,
            new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool),
            blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())),
            fileTransferTracker
@@ -331,8 +336,9 @@ protected long getMinReferencedGen() throws IOException {
         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
         long minReferencedGen = Math.min(
             deletionPolicy.minTranslogGenRequired(readers, current),
-            minGenerationForSeqNo(Math.min(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1, minSeqNoToKeep), current, readers)
+            minGenerationForSeqNo(minSeqNoToKeep, current, readers)
         );
+
         assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of ["
             + minReferencedGen
             + "] but the lowest gen available is ["

server/src/main/java/org/opensearch/index/translog/Translog.java (+4 -2)

@@ -522,7 +522,8 @@ TranslogWriter createWriter(
                 primaryTermSupplier.getAsLong(),
                 tragedy,
                 persistedSequenceNumberConsumer,
-                bigArrays
+                bigArrays,
+                indexSettings.isRemoteTranslogStoreEnabled()
             );
         } catch (final IOException e) {
             throw new TranslogException(shardId, "failed to create new translog file", e);
@@ -2025,7 +2026,8 @@ public static String createEmptyTranslog(
             seqNo -> {
                 throw new UnsupportedOperationException();
             },
-            BigArrays.NON_RECYCLING_INSTANCE
+            BigArrays.NON_RECYCLING_INSTANCE,
+            null
         );
         writer.close();
         return uuid;
