diff --git a/CHANGELOG.md b/CHANGELOG.md index bc20d4fcc0272..62d6fc04a17e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Changed - Convert transport-reactor-netty4 to use gradle version catalog [#17233](https://github.com/opensearch-project/OpenSearch/pull/17233) - Increase force merge threads to 1/8th of cores [#17255](https://github.com/opensearch-project/OpenSearch/pull/17255) +- TieredSpilloverCache took-time threshold now guards heap tier as well as disk tier [#17190](https://github.com/opensearch-project/OpenSearch/pull/17190) ### Deprecated diff --git a/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheIT.java b/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheIT.java index d58e36c036510..3fac345fc6e00 100644 --- a/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheIT.java +++ b/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheIT.java @@ -118,7 +118,7 @@ public void testSanityChecksWithIndicesRequestCache() throws InterruptedExceptio ); } - public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception { + public void testWithDynamicDiskTookTimePolicyWithMultiSegments() throws Exception { int numberOfSegments = getNumberOfSegments(); int onHeapCacheSizePerSegmentInBytes = 800; // Per cache entry below is around ~700 bytes, so keeping this // just a bit higher so that each segment can atleast hold 1 entry. @@ -139,12 +139,13 @@ public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception { ) .get() ); - // Set a very high value for took time policy so that no items evicted from onHeap cache are spilled + // Set a very high value for took time disk policy so that no items evicted from onHeap cache are spilled // to disk. And then hit requests so that few items are cached into cache. ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( Settings.builder() .put( - TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE) + .getKey(), new TimeValue(100, TimeUnit.SECONDS) ) .build() @@ -182,12 +183,13 @@ public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception { assertEquals(0, requestCacheStats.getHitCount()); long lastEvictionSeen = requestCacheStats.getEvictions(); - // Decrease took time policy to zero so that disk cache also comes into play. Now we should be able + // Decrease disk took time policy to zero so that disk cache also comes into play. Now we should be able // to cache all entries. updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( Settings.builder() .put( - TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE) + .getKey(), new TimeValue(0, TimeUnit.MILLISECONDS) ) .build() @@ -206,7 +208,7 @@ public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception { assertEquals(lastEvictionSeen, requestCacheStats.getEvictions()); } - public void testWithDynamicTookTimePolicy() throws Exception { + public void testWithDynamicHeapTookTimePolicy() throws Exception { int onHeapCacheSizeInBytes = 2000; internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", 1)).build()); Client client = client(); @@ -224,8 +226,7 @@ public void testWithDynamicTookTimePolicy() throws Exception { ) .get() ); - // Step 1 : Set a very high value for took time policy so that no items evicted from onHeap cache are spilled - // to disk. And then hit requests so that few items are cached into cache. + // Set a high threshold for the overall cache took time policy so nothing will enter the cache. ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( Settings.builder() .put( @@ -245,6 +246,57 @@ public void testWithDynamicTookTimePolicy() throws Exception { ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); long perQuerySizeInCacheInBytes = -1; + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + assertSearchResponse(resp); + } + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + assertEquals(0, requestCacheStats.getEvictions()); + } + + public void testWithDynamicDiskTookTimePolicy() throws Exception { + int onHeapCacheSizeInBytes = 2000; + internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", 1)).build()); + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.refresh_interval", -1) + ) + .get() + ); + // Step 1 : Set a very high value for disk took time policy so that no items evicted from onHeap cache are spilled + // to disk. And then hit requests so that few items are cached into cache. + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( + Settings.builder() + .put( + TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE) + .getKey(), + new TimeValue(100, TimeUnit.SECONDS) + ) + .build() + ); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get()); + int numberOfIndexedItems = randomIntBetween(6, 10); + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator)); + } + ensureSearchable("index"); + refreshAndWaitForReplication(); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); + OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); + long perQuerySizeInCacheInBytes = -1; for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { SearchResponse resp = client.prepareSearch("index") .setRequestCache(true) @@ -282,12 +334,13 @@ public void testWithDynamicTookTimePolicy() throws Exception { assertEquals(0, requestCacheStats.getHitCount()); long lastEvictionSeen = requestCacheStats.getEvictions(); - // Step 3: Decrease took time policy to zero so that disk cache also comes into play. Now we should be able + // Step 3: Decrease disk took time policy to zero so that disk cache also comes into play. Now we should be able // to cache all entries. updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( Settings.builder() .put( - TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE) + .getKey(), new TimeValue(0, TimeUnit.MILLISECONDS) ) .build() @@ -352,11 +405,12 @@ public void testInvalidationWithIndicesRequestCache() throws Exception { ) .get() ); - // Update took time policy to zero so that all entries are eligible to be cached on disk. + // Update disk took time policy to zero so that all entries are eligible to be cached on disk. ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( Settings.builder() .put( - TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE) + .getKey(), new TimeValue(0, TimeUnit.MILLISECONDS) ) .build() @@ -437,11 +491,12 @@ public void testWithExplicitCacheClear() throws Exception { ) .get() ); - // Update took time policy to zero so that all entries are eligible to be cached on disk. + // Update disk took time policy to zero so that all entries are eligible to be cached on disk. ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( Settings.builder() .put( - TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE) + .getKey(), new TimeValue(0, TimeUnit.MILLISECONDS) ) .build() @@ -512,11 +567,12 @@ public void testWithDynamicDiskCacheSetting() throws Exception { ) .get() ); - // Update took time policy to zero so that all entries are eligible to be cached on disk. + // Update disk took time policy to zero so that all entries are eligible to be cached on disk. ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( Settings.builder() .put( - TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE) + .getKey(), new TimeValue(0, TimeUnit.MILLISECONDS) ) .build() diff --git a/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsIT.java b/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsIT.java index a858e94ad1609..b45355bfe63ac 100644 --- a/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsIT.java +++ b/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsIT.java @@ -62,16 +62,7 @@ protected Collection> nodePlugins() { * Test aggregating by indices */ public void testIndicesLevelAggregation() throws Exception { - internalCluster().startNodes( - 1, - Settings.builder() - .put(defaultSettings(HEAP_CACHE_SIZE_STRING, 1)) - .put( - TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), - new TimeValue(0, TimeUnit.SECONDS) - ) - .build() - ); + startNodesDefaultSettings(); Client client = client(); Map values = setupCacheForAggregationTests(client); @@ -115,16 +106,7 @@ public void testIndicesLevelAggregation() throws Exception { * Test aggregating by indices and tier */ public void testIndicesAndTierLevelAggregation() throws Exception { - internalCluster().startNodes( - 1, - Settings.builder() - .put(defaultSettings(HEAP_CACHE_SIZE_STRING, 1)) - .put( - TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), - new TimeValue(0, TimeUnit.SECONDS) - ) - .build() - ); + startNodesDefaultSettings(); Client client = client(); Map values = setupCacheForAggregationTests(client); @@ -195,16 +177,7 @@ public void testIndicesAndTierLevelAggregation() throws Exception { * Test aggregating by tier only */ public void testTierLevelAggregation() throws Exception { - internalCluster().startNodes( - 1, - Settings.builder() - .put(defaultSettings(HEAP_CACHE_SIZE_STRING, 1)) - .put( - TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), - new TimeValue(0, TimeUnit.SECONDS) - ) - .build() - ); + startNodesDefaultSettings(); Client client = client(); Map values = setupCacheForAggregationTests(client); // Get values for tiers alone and check they add correctly across indices @@ -236,16 +209,7 @@ public void testTierLevelAggregation() throws Exception { } public void testInvalidLevelsAreIgnored() throws Exception { - internalCluster().startNodes( - 1, - Settings.builder() - .put(defaultSettings(HEAP_CACHE_SIZE_STRING, getNumberOfSegments())) - .put( - TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), - new TimeValue(0, TimeUnit.SECONDS) - ) - .build() - ); + startNodesDefaultSettings(); Client client = client(); Map values = setupCacheForAggregationTests(client); @@ -287,16 +251,7 @@ public void testInvalidLevelsAreIgnored() throws Exception { * Check the new stats API returns the same values as the old stats API. */ public void testStatsMatchOldApi() throws Exception { - internalCluster().startNodes( - 1, - Settings.builder() - .put(defaultSettings(HEAP_CACHE_SIZE_STRING, getNumberOfSegments())) - .put( - TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), - new TimeValue(0, TimeUnit.SECONDS) - ) - .build() - ); + startNodesDefaultSettings(); String index = "index"; Client client = client(); startIndex(client, index); @@ -354,7 +309,12 @@ public void testStatsWithMultipleSegments() throws Exception { .put(defaultSettings(heap_cache_size_per_segment * numberOfSegments + "B", numberOfSegments)) .put( TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), - new TimeValue(0, TimeUnit.SECONDS) + TimeValue.ZERO + ) + .put( + TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE) + .getKey(), + TimeValue.ZERO ) .build() ); @@ -429,6 +389,11 @@ public void testClosingShard() throws Exception { TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), new TimeValue(0, TimeUnit.SECONDS) ) + .put( + TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE) + .getKey(), + new TimeValue(0, TimeUnit.SECONDS) + ) .put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1)) .build() ); @@ -631,4 +596,22 @@ private static ImmutableCacheStatsHolder getNodeCacheStatsResult(Client client, NodeCacheStats ncs = nodeStatsResponse.getNodes().get(0).getNodeCacheStats(); return ncs.getStatsByCache(CacheType.INDICES_REQUEST_CACHE); } + + private void startNodesDefaultSettings() { + internalCluster().startNodes( + 1, + Settings.builder() + .put(defaultSettings(HEAP_CACHE_SIZE_STRING, 1)) + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + TimeValue.ZERO + ) + .put( + TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE) + .getKey(), + TimeValue.ZERO + ) + .build() + ); + } } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java index 4bc26803acf4c..620b5597086f4 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java @@ -13,16 +13,14 @@ package org.opensearch.cache.common.policy; -import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.TimeValue; import java.util.function.Function; import java.util.function.Predicate; -import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP; - /** * A cache tier policy which accepts queries whose took time is greater than some threshold. * The threshold should be set to approximately the time it takes to get a result from the cache tier. @@ -46,20 +44,20 @@ public class TookTimePolicy implements Predicate { * @param threshold the threshold * @param cachedResultParser the function providing policy values * @param clusterSettings cluster settings - * @param cacheType cache type + * @param targetSetting the cluster setting to register a consumer with */ public TookTimePolicy( TimeValue threshold, Function cachedResultParser, ClusterSettings clusterSettings, - CacheType cacheType + Setting targetSetting ) { if (threshold.compareTo(TimeValue.ZERO) < 0) { throw new IllegalArgumentException("Threshold for TookTimePolicy must be >= 0ms but was " + threshold.getStringRep()); } this.threshold = threshold; this.cachedResultParser = cachedResultParser; - clusterSettings.addSettingsUpdateConsumer(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType), this::setThreshold); + clusterSettings.addSettingsUpdateConsumer(targetSetting, this::setThreshold); } private void setThreshold(TimeValue threshold) { @@ -72,6 +70,10 @@ private void setThreshold(TimeValue threshold) { * @return whether to admit the data */ public boolean test(V data) { + if (threshold.equals(TimeValue.ZERO)) { + // Skip parsing the took time if this threshold is zero. + return true; + } long tookTimeNanos; try { tookTimeNanos = cachedResultParser.apply(data).getTookTimeNanos(); diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 9879235812377..d968e61cffcff 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -53,6 +53,8 @@ import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_SIZE; import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE; import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP; import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_DISK; import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_ON_HEAP; import static org.opensearch.common.cache.settings.CacheSettings.INVALID_SEGMENT_COUNT_EXCEPTION_MESSAGE; @@ -145,9 +147,12 @@ static class TieredSpilloverCacheSegment implements ICache { ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock()); private final Map, TierInfo> caches; - + // Policies guarding access to the cache overall. private final List> policies; + // Policies guarding access to the disk tier. + private final List> diskPolicies; + private final TieredSpilloverCacheStatsHolder statsHolder; private final long onHeapCacheMaxWeight; @@ -157,7 +162,7 @@ static class TieredSpilloverCacheSegment implements ICache { * This map is used to handle concurrent requests for same key in computeIfAbsent() to ensure we load the value * only once. */ - Map, CompletableFuture, V>>> completableFutureMap = new ConcurrentHashMap<>(); + Map, CompletableFuture, V>, Boolean>>> completableFutureMap = new ConcurrentHashMap<>(); TieredSpilloverCacheSegment( Builder builder, @@ -220,7 +225,8 @@ static class TieredSpilloverCacheSegment implements ICache { cacheListMap.put(onHeapCache, new TierInfo(true, TIER_DIMENSION_VALUE_ON_HEAP)); cacheListMap.put(diskCache, new TierInfo(isDiskCacheEnabled, TIER_DIMENSION_VALUE_DISK)); this.caches = Collections.synchronizedMap(cacheListMap); - this.policies = builder.policies; // Will never be null; builder initializes it to an empty list + this.policies = builder.policies; + this.diskPolicies = builder.diskPolicies; // Will never be null; builder initializes it to an empty list this.onHeapCacheMaxWeight = onHeapCacheSizeInBytes; this.diskCacheMaxWeight = diskCacheSizeInBytes; } @@ -255,18 +261,19 @@ public V get(ICacheKey key) { public void put(ICacheKey key, V value) { // First check in case the key is already present in either of tiers. Tuple cacheValueTuple = getValueFromTieredCache(true).apply(key); - if (cacheValueTuple == null) { - // In case it is not present in any tier, put it inside onHeap cache by default. - try (ReleasableLock ignore = writeLock.acquire()) { - onHeapCache.put(key, value); - } - updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, value); - } else { - // Put it inside desired tier. - try (ReleasableLock ignore = writeLock.acquire()) { - for (Map.Entry, TierInfo> entry : this.caches.entrySet()) { - if (cacheValueTuple.v2().equals(entry.getValue().tierName)) { - entry.getKey().put(key, value); + if (evaluatePoliciesList(value, policies)) { + if (cacheValueTuple == null) { + // In case it is not present in any tier, put it inside onHeap cache by default. + try (ReleasableLock ignore = writeLock.acquire()) { + onHeapCache.put(key, value); + } + updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, value); + } else { + try (ReleasableLock ignore = writeLock.acquire()) { + for (Map.Entry, TierInfo> entry : this.caches.entrySet()) { + if (cacheValueTuple.v2().equals(entry.getValue().tierName)) { + entry.getKey().put(key, value); + } } } updateStatsOnPut(cacheValueTuple.v2(), key, value); @@ -281,7 +288,7 @@ public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> // getValueFromTieredCache(), // we will see all misses. Instead, handle stats in computeIfAbsent(). Tuple cacheValueTuple; - CompletableFuture, V>> future = null; + CompletableFuture, V>, Boolean>> future = null; try (ReleasableLock ignore = readLock.acquire()) { cacheValueTuple = getValueFromTieredCache(false).apply(key); if (cacheValueTuple == null) { @@ -297,22 +304,25 @@ public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> // Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside. // This is needed as there can be many requests for the same key at the same time and we only want to load // the value once. - V value = compute(key, loader, future); - // Handle stats - if (loader.isLoaded()) { - // The value was just computed and added to the cache by this thread. Register a miss for the heap cache, and the disk - // cache - // if present - updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, value); - statsHolder.incrementMisses(heapDimensionValues); - if (caches.get(diskCache).isEnabled()) { - statsHolder.incrementMisses(diskDimensionValues); + Tuple> computedValueTuple = compute(key, loader, future); + boolean wasCacheMiss = computedValueTuple.v2().v1(); + boolean wasRejectedByPolicy = computedValueTuple.v2().v2(); + // If the value was rejected by policy, it counts as neither a hit or miss. + if (!wasRejectedByPolicy) { + // Handle stats + if (wasCacheMiss) { + // The value was just computed and added to the cache by this thread. + // Register a miss for the heap cache, and the disk cache if present + statsHolder.incrementMisses(heapDimensionValues); + if (caches.get(diskCache).isEnabled()) { + statsHolder.incrementMisses(diskDimensionValues); + } + } else { + // Another thread requesting this key already loaded the value. Register a hit for the heap cache + statsHolder.incrementHits(heapDimensionValues); } - } else { - // Another thread requesting this key already loaded the value. Register a hit for the heap cache - statsHolder.incrementHits(heapDimensionValues); } - return value; + return computedValueTuple.v1(); } else { // Handle stats for an initial hit from getValueFromTieredCache() if (cacheValueTuple.v2().equals(TIER_DIMENSION_VALUE_ON_HEAP)) { @@ -327,20 +337,33 @@ public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> return cacheValueTuple.v1(); } - private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader, CompletableFuture, V>> future) - throws Exception { - // Handler to handle results post-processing. Takes a tuple or exception as an input and returns - // the value. Also before returning value, puts the value in cache. - BiFunction, V>, Throwable, Void> handler = (pair, ex) -> { - if (pair != null) { + private Tuple> compute( + ICacheKey key, + LoadAwareCacheLoader, V> loader, + CompletableFuture, V>, Boolean>> future + ) throws Exception { + // Handler to handle results post-processing. Takes a Tuple, boolean>, where the boolean represents whether + // this key/value pair was rejected by the policies, + // or exception as an input and returns the value. Also before returning value, puts the value in cache if accepted by policies. + boolean wasCacheMiss = false; + boolean wasRejectedByPolicy = false; + BiFunction, V>, Boolean>, Throwable, Void> handler = (pairInfo, ex) -> { + Tuple, V> pair = pairInfo.v1(); + boolean rejectedByPolicy = pairInfo.v2(); + if (pair != null && !rejectedByPolicy) { + boolean didAddToCache = false; try (ReleasableLock ignore = writeLock.acquire()) { onHeapCache.put(pair.v1(), pair.v2()); + didAddToCache = true; } catch (Exception e) { // TODO: Catch specific exceptions to know whether this resulted from cache or underlying removal // listeners/stats. Needs better exception handling at underlying layers.For now swallowing // exception. logger.warn("Exception occurred while putting item onto heap cache", e); } + if (didAddToCache) { + updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, pair.v2()); + } } else { if (ex != null) { logger.warn("Exception occurred while trying to compute the value", ex); @@ -364,16 +387,20 @@ private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader future.completeExceptionally(npe); throw new ExecutionException(npe); } else { - future.complete(new Tuple<>(key, value)); + wasRejectedByPolicy = !evaluatePoliciesList(value, policies); + future.complete(new Tuple<>(new Tuple<>(key, value), wasRejectedByPolicy)); + wasCacheMiss = !wasRejectedByPolicy; } } else { try { - value = future.get().v2(); + Tuple, V>, Boolean> futureTuple = future.get(); + wasRejectedByPolicy = futureTuple.v2(); + value = futureTuple.v1().v2(); } catch (InterruptedException ex) { throw new IllegalStateException(ex); } } - return value; + return new Tuple<>(value, new Tuple<>(wasCacheMiss, wasRejectedByPolicy)); } @Override @@ -442,7 +469,9 @@ void handleRemovalFromHeapTier(RemovalNotification, V> notification boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason()); boolean countEvictionTowardsTotal = false; // Don't count this eviction towards the cache's total if it ends up in the disk tier boolean exceptionOccurredOnDiskCachePut = false; - boolean canCacheOnDisk = caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue()); + boolean canCacheOnDisk = caches.get(diskCache).isEnabled() + && wasEvicted + && evaluatePoliciesList(notification.getValue(), diskPolicies); if (canCacheOnDisk) { try (ReleasableLock ignore = writeLock.acquire()) { diskCache.put(key, notification.getValue()); // spill over to the disk tier and increment its stats @@ -465,8 +494,8 @@ void handleRemovalFromHeapTier(RemovalNotification, V> notification updateStatsOnRemoval(TIER_DIMENSION_VALUE_ON_HEAP, wasEvicted, key, notification.getValue(), countEvictionTowardsTotal); } - boolean evaluatePolicies(V value) { - for (Predicate policy : policies) { + boolean evaluatePoliciesList(V value, List> policiesList) { + for (Predicate policy : policiesList) { if (!policy.test(value)) { return false; } @@ -822,8 +851,8 @@ public ICache create(CacheConfig config, CacheType cacheType, } ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName); - TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType) - .get(settings); + TimeValue tookTimePolicyThreshold = TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType).get(settings); + TimeValue tookTimeDiskPolicyThreshold = TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType).get(settings); Function cachedResultParser = Objects.requireNonNull( config.getCachedResultParser(), "Cached result parser fn can't be null" @@ -849,7 +878,22 @@ public ICache create(CacheConfig config, CacheType cacheType, .setCacheConfig(config) .setCacheType(cacheType) .setNumberOfSegments(numberOfSegments) - .addPolicy(new TookTimePolicy(diskPolicyThreshold, cachedResultParser, config.getClusterSettings(), cacheType)) + .addPolicy( + new TookTimePolicy<>( + tookTimePolicyThreshold, + cachedResultParser, + config.getClusterSettings(), + TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType) + ) + ) + .addDiskPolicy( + new TookTimePolicy<>( + tookTimeDiskPolicyThreshold, + cachedResultParser, + config.getClusterSettings(), + TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType) + ) + ) .setOnHeapCacheSizeInBytes(onHeapCacheSize) .setDiskCacheSize(diskCacheSize) .build(); @@ -873,7 +917,8 @@ public static class Builder { private CacheConfig cacheConfig; private CacheType cacheType; private Map cacheFactories; - private final ArrayList> policies = new ArrayList<>(); + private final List> policies = new ArrayList<>(); + private final List> diskPolicies = new ArrayList<>(); private int numberOfSegments; private long onHeapCacheSizeInBytes; @@ -945,7 +990,7 @@ public Builder setCacheFactories(Map cacheFactorie } /** - * Set a cache policy to be used to limit access to this cache's disk tier. + * Set a cache policy to be used to limit access to this cache. * @param policy the policy * @return builder */ @@ -955,12 +1000,12 @@ public Builder addPolicy(Predicate policy) { } /** - * Set multiple policies to be used to limit access to this cache's disk tier. - * @param policies the policies + * Set a cache policy to be used to limit access to this cache's disk tier. + * @param diskPolicy the policy * @return builder */ - public Builder addPolicies(List> policies) { - this.policies.addAll(policies); + public Builder addDiskPolicy(Predicate diskPolicy) { + this.diskPolicies.add(diskPolicy); return this; } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java index bf522b42b70ca..d1d033fae8cd2 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java @@ -21,6 +21,7 @@ import java.util.Map; import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP; import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP; /** @@ -62,6 +63,7 @@ public List> getSettings() { TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) ); settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType)); + settingList.add(TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType)); if (FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings)) { settingList.add(DISK_CACHE_ENABLED_SETTING_MAP.get(cacheType)); } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java index 31dc1795134e4..790e2ead729fe 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java @@ -38,6 +38,16 @@ public class TieredSpilloverCacheSettings { */ public static final long MIN_DISK_CACHE_SIZE_IN_BYTES = 10485760L; + /** + * The default took time threshold for a value to enter the heap tier of the cache, and therefore to enter the cache at all. + */ + public static final TimeValue DEFAULT_TOOK_TIME_THRESHOLD = TimeValue.ZERO; + + /** + * The default took time threshold for a value to enter the disk tier of the cache. + */ + public static final TimeValue DEFAULT_TOOK_TIME_DISK_THRESHOLD = new TimeValue(10, TimeUnit.MILLISECONDS); + /** * Setting which defines the onHeap cache store to be used in TieredSpilloverCache. * @@ -109,13 +119,27 @@ public class TieredSpilloverCacheSettings { ); /** - * Setting defining the minimum took time for a query to be allowed into the disk cache. + * Setting defining the minimum took time for a query to be allowed in the cache. + */ + private static final Setting.AffixSetting TIERED_SPILLOVER_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting( + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".policies.took_time.threshold", + (key) -> Setting.timeSetting( + key, + DEFAULT_TOOK_TIME_THRESHOLD, + TimeValue.ZERO, // Minimum value for this setting + NodeScope, + Setting.Property.Dynamic + ) + ); + + /** + * Setting defining the minimum took time for a query to be allowed in the disk tier of the cache. */ private static final Setting.AffixSetting TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting( TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold", (key) -> Setting.timeSetting( key, - new TimeValue(10, TimeUnit.MILLISECONDS), // Default value for this setting + DEFAULT_TOOK_TIME_DISK_THRESHOLD, TimeValue.ZERO, // Minimum value for this setting NodeScope, Setting.Property.Dynamic @@ -128,6 +152,12 @@ public class TieredSpilloverCacheSettings { */ public static final Map> TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP; + /** + * Stores took time policy settings for the disk tiers of various cache types as these are dynamic so that can be registered and + * retrieved accordingly. + */ + public static final Map> TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP; + /** * Stores disk cache enabled settings for various cache types as these are dynamic so that can be registered and * retrieved accordingly. @@ -139,9 +169,14 @@ public class TieredSpilloverCacheSettings { */ static { Map> concreteTookTimePolicySettingMap = new HashMap<>(); + Map> concreteDiskTookTimePolicySettingMap = new HashMap<>(); Map> diskCacheSettingMap = new HashMap<>(); for (CacheType cacheType : CacheType.values()) { concreteTookTimePolicySettingMap.put( + cacheType, + TIERED_SPILLOVER_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) + ); + concreteDiskTookTimePolicySettingMap.put( cacheType, TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) ); @@ -151,6 +186,7 @@ public class TieredSpilloverCacheSettings { ); } TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP = concreteTookTimePolicySettingMap; + TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP = concreteDiskTookTimePolicySettingMap; DISK_CACHE_ENABLED_SETTING_MAP = diskCacheSettingMap; } diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java index 000067280e50d..535274b30f2d9 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java @@ -11,7 +11,6 @@ import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHits; -import org.opensearch.common.Randomness; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.io.stream.BytesStreamOutput; @@ -20,7 +19,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.search.DocValueFormat; import org.opensearch.search.query.QuerySearchResult; import org.opensearch.test.OpenSearchTestCase; @@ -28,7 +26,7 @@ import java.io.IOException; import java.util.HashSet; -import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP; @@ -52,7 +50,12 @@ public void setup() { } private TookTimePolicy getTookTimePolicy(TimeValue threshold) { - return new TookTimePolicy<>(threshold, transformationFunction, clusterSettings, CacheType.INDICES_REQUEST_CACHE); + return new TookTimePolicy<>( + threshold, + transformationFunction, + clusterSettings, + TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE) + ); } public void testTookTimePolicy() throws Exception { @@ -75,17 +78,31 @@ public void testTookTimePolicy() throws Exception { assertTrue(longResult); } - public void testNegativeOneInput() throws Exception { - // PolicyValues with -1 took time can be passed to this policy if we shouldn't accept it for whatever reason - TookTimePolicy tookTimePolicy = getTookTimePolicy(TimeValue.ZERO); - BytesReference minusOne = getValidPolicyInput(-1L); - assertFalse(tookTimePolicy.test(minusOne)); - } - public void testInvalidThreshold() throws Exception { assertThrows(IllegalArgumentException.class, () -> getTookTimePolicy(TimeValue.MINUS_ONE)); } + public void testZeroThresholdSkipsCheck() throws Exception { + AtomicInteger numChecksRun = new AtomicInteger(); + Function dummyTransformationFunction = (data) -> { + numChecksRun.incrementAndGet(); + try { + return CachedQueryResult.getPolicyValues(data); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + TookTimePolicy policy = new TookTimePolicy<>( + TimeValue.ZERO, + dummyTransformationFunction, + clusterSettings, + TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE) + ); + BytesReference minusOne = getValidPolicyInput(-1L); + assertTrue(policy.test(minusOne)); + assertEquals(0, numChecksRun.get()); + } + private BytesReference getValidPolicyInput(Long tookTimeNanos) throws IOException { // When it's used in the cache, the policy will receive BytesReferences which come from // serializing a CachedQueryResult. @@ -109,11 +126,4 @@ private QuerySearchResult getQSR() { ); return mockQSR; } - - private void writeRandomBytes(StreamOutput out, int numBytes) throws IOException { - Random rand = Randomness.get(); - byte[] bytes = new byte[numBytes]; - rand.nextBytes(bytes); - out.writeBytes(bytes); - } } diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 56b7072ddb590..f632724d89aae 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -24,6 +24,7 @@ import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; @@ -57,10 +58,12 @@ import java.util.function.Predicate; import static org.opensearch.cache.common.tier.TieredSpilloverCache.ZERO_SEGMENT_COUNT_EXCEPTION_MESSAGE; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DEFAULT_TOOK_TIME_DISK_THRESHOLD; import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP; import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.MIN_DISK_CACHE_SIZE_IN_BYTES; import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE; import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP; import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP; import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_NAME; import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_DISK; @@ -83,6 +86,7 @@ public void setup() { Settings settings = Settings.EMPTY; clusterSettings = new ClusterSettings(settings, new HashSet<>()); clusterSettings.registerSetting(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)); + clusterSettings.registerSetting(TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)); clusterSettings.registerSetting(DISK_CACHE_ENABLED_SETTING_MAP.get(CacheType.INDICES_REQUEST_CACHE)); } @@ -191,8 +195,8 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception .setValueSerializer(new StringSerializer()) .setSettings(settings) .setDimensionNames(dimensionNames) - .setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken - // 20_000_000 ns = 20 ms to compute + // Values will always appear to have taken 2x the took time threshold to compute, so they will be admitted + .setCachedResultParser(s -> new CachedQueryResult.PolicyValues(DEFAULT_TOOK_TIME_DISK_THRESHOLD.getNanos() * 2)) .setClusterSettings(clusterSettings) .setStoragePath(storagePath) .build(), @@ -291,8 +295,8 @@ public void testComputeIfAbsentWithSegmentedCache() throws Exception { .setValueSerializer(new StringSerializer()) .setSettings(settings) .setDimensionNames(dimensionNames) - .setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken - // 20_000_000 ns = 20 ms to compute + // Values will always appear to have taken 2x the took time threshold to compute, so they will be admitted + .setCachedResultParser(s -> new CachedQueryResult.PolicyValues(DEFAULT_TOOK_TIME_DISK_THRESHOLD.getNanos() * 2)) .setClusterSettings(clusterSettings) .setStoragePath(storagePath) .setSegmentCount(numberOfSegments) @@ -1155,6 +1159,7 @@ public void testComputeIfAbsentWithOnHeapCacheThrowingExceptionOnPut() throws Ex mockDiskCacheFactory, cacheConfig, null, + null, removalListener, 1, onHeapCacheSize * keyValueSize, @@ -1202,6 +1207,7 @@ public void testComputeIfAbsentWithDiskCacheThrowingExceptionOnPut() throws Exce mockDiskCacheFactory, cacheConfig, null, + null, removalListener, 1, onHeapCacheSize * keyValueSize, @@ -1356,14 +1362,13 @@ public void testConcurrencyForEvictionFlowFromOnHeapToDiskTier() throws Exceptio } public void testDiskTierPolicies() throws Exception { - // For policy function, allow if what it receives starts with "a" and string is even length - ArrayList> policies = new ArrayList<>(); - policies.add(new AllowFirstLetterA()); - policies.add(new AllowEvenLengths()); + // For disk policy function, allow if what it receives starts with "a" and string is even length + Tuple>, Map>> setupTuple = setupPoliciesTest(); + List> diskPolicies = setupTuple.v1(); int keyValueSize = 50; MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); - TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( keyValueSize, keyValueSize * 100, removalListener, @@ -1376,35 +1381,22 @@ public void testDiskTierPolicies() throws Exception { ) .build(), 0, - policies, + diskPolicies, 1 ); - Map keyValuePairs = new HashMap<>(); - Map expectedOutputs = new HashMap<>(); - keyValuePairs.put("key1", "abcd"); - expectedOutputs.put("key1", true); - keyValuePairs.put("key2", "abcde"); - expectedOutputs.put("key2", false); - keyValuePairs.put("key3", "bbc"); - expectedOutputs.put("key3", false); - keyValuePairs.put("key4", "ab"); - expectedOutputs.put("key4", true); - keyValuePairs.put("key5", ""); - expectedOutputs.put("key5", false); - + Map> keyValuePairs = setupTuple.v2(); LoadAwareCacheLoader, String> loader = getLoadAwareCacheLoader(keyValuePairs); - int expectedEvictions = 0; for (String key : keyValuePairs.keySet()) { ICacheKey iCacheKey = getICacheKey(key); - Boolean expectedOutput = expectedOutputs.get(key); + Boolean expectedOutput = keyValuePairs.get(key).v2(); String value = tieredSpilloverCache.computeIfAbsent(iCacheKey, loader); - assertEquals(keyValuePairs.get(key), value); + assertEquals(keyValuePairs.get(key).v1(), value); String result = tieredSpilloverCache.get(iCacheKey); if (expectedOutput) { // Should retrieve from disk tier if it was accepted - assertEquals(keyValuePairs.get(key), result); + assertEquals(keyValuePairs.get(key).v1(), result); } else { // Should miss as heap tier size = 0 and the policy rejected it assertNull(result); @@ -1419,19 +1411,70 @@ public void testDiskTierPolicies() throws Exception { assertEquals(expectedEvictions, getTotalStatsSnapshot(tieredSpilloverCache).getEvictions()); } - public void testTookTimePolicyFromFactory() throws Exception { + private Tuple>, Map>> setupPoliciesTest() { + ArrayList> policies = new ArrayList<>(); + policies.add(new AllowFirstLetterA()); + policies.add(new AllowEvenLengths()); + + // Map from key to tuple of (value, whether we expect it to be admitted by policy) + Map> keyValuePairs = new HashMap<>(); + keyValuePairs.put("key1", new Tuple<>("abcd", true)); + keyValuePairs.put("key2", new Tuple<>("abcde", false)); + keyValuePairs.put("key3", new Tuple<>("bbc", false)); + keyValuePairs.put("key4", new Tuple<>("ab", true)); + keyValuePairs.put("key5", new Tuple<>("", false)); + return new Tuple<>(policies, keyValuePairs); + } + + public void testTookTimePoliciesFromFactory() throws Exception { // Mock took time by passing this map to the policy info wrapper fn // The policy inspects values, not keys, so this is a map from values -> took time + + long cacheThresholdNanos = 2_000_000L; + long diskThresholdNanos = 11_000_000L; Map tookTimeMap = new HashMap<>(); - tookTimeMap.put("a", 10_000_000L); + tookTimeMap.put("a", diskThresholdNanos); tookTimeMap.put("b", 0L); - tookTimeMap.put("c", 99_999_999L); + tookTimeMap.put("c", diskThresholdNanos * 3); tookTimeMap.put("d", null); tookTimeMap.put("e", -1L); - tookTimeMap.put("f", 8_888_888L); - long timeValueThresholdNanos = 10_000_000L; - - Map keyValueMap = Map.of("A", "a", "B", "b", "C", "c", "D", "d", "E", "e", "F", "f"); + tookTimeMap.put("f", cacheThresholdNanos * 2); + tookTimeMap.put("g", cacheThresholdNanos - 1); + assertTrue(cacheThresholdNanos * 2 < diskThresholdNanos); + + Map keyValueMap = Map.of("A", "a", "B", "b", "C", "c", "D", "d", "E", "e", "F", "f", "G", "g"); + Map expectedInHeapTierMap = Map.of( + "A", + true, + "B", + false, + "C", + true, + "D", + false, + "E", + false, + "F", + true, + "G", + false + ); + Map expectedInDiskTierMap = Map.of( + "A", + true, + "B", + false, + "C", + true, + "D", + false, + "E", + false, + "F", + false, + "G", + false + ); // Most of setup duplicated from testComputeIfAbsentWithFactoryBasedCacheCreation() int onHeapCacheSize = randomIntBetween(tookTimeMap.size() + 1, tookTimeMap.size() + 30); @@ -1460,10 +1503,9 @@ public void testTookTimePolicyFromFactory() throws Exception { ).getKey(), onHeapCacheSize * keyValueSize + "b" ) - .put( - TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), - new TimeValue(timeValueThresholdNanos / 1_000_000) - ) + // Initialize the settings to some other value, so we can demonstrate the updating logic works correctly. + .put(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), TimeValue.ZERO) + .put(TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), TimeValue.ZERO) .put(TIERED_SPILLOVER_SEGMENTS.getConcreteSettingForNamespace(CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()).getKey(), 1) .build(); @@ -1497,28 +1539,57 @@ public CachedQueryResult.PolicyValues apply(String s) { TieredSpilloverCache tieredSpilloverCache = (TieredSpilloverCache) tieredSpilloverICache; - // First add all our values to the on heap cache - for (String key : tookTimeMap.keySet()) { - tieredSpilloverCache.computeIfAbsent(getICacheKey(key), getLoadAwareCacheLoader(keyValueMap)); + // Change setting values to the target values to show both updates work as expected. + clusterSettings.applySettings( + Settings.builder() + .put( + TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(cacheThresholdNanos / 1_000_000) + ) + .put( + TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(diskThresholdNanos / 1_000_000) + ) + .build() + ); + + Map> loaderMap = new HashMap<>(); + for (String key : keyValueMap.keySet()) { + // The boolean here is not needed, just to fit with the get loader method + loaderMap.put(key, new Tuple<>(keyValueMap.get(key), false)); + } + LoadAwareCacheLoader, String> loader = getLoadAwareCacheLoader(loaderMap); + // First check whether keys respect the heap tier threshold. + int expectedKeys = 0; + for (String key : keyValueMap.keySet()) { + tieredSpilloverCache.computeIfAbsent(getICacheKey(key), loader); + if (expectedInHeapTierMap.get(key)) { + expectedKeys++; + } } - assertEquals(tookTimeMap.size(), tieredSpilloverCache.count()); + assertEquals(0, removalListener.evictionsMetric.count()); + assertEquals(0, getHitsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); + assertEquals(expectedKeys, tieredSpilloverCache.count()); - // Ensure all these keys get evicted from the on heap tier by adding > heap tier size worth of random keys (this works as we have 1 - // segment) + // Ensure all these keys get evicted from the on heap tier by adding > heap tier size worth of random keys + // (this works as we have 1 segment). Set heap threshold to 0 to ensure random keys can all enter + clusterSettings.applySettings( + Settings.builder() + .put(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), TimeValue.ZERO) + .build() + ); for (int i = 0; i < onHeapCacheSize; i++) { - tieredSpilloverCache.computeIfAbsent(getICacheKey(UUID.randomUUID().toString()), getLoadAwareCacheLoader(keyValueMap)); + tieredSpilloverCache.computeIfAbsent(getICacheKey(UUID.randomUUID().toString()), getLoadAwareCacheLoader()); } - for (String key : tookTimeMap.keySet()) { + for (String key : keyValueMap.keySet()) { ICacheKey iCacheKey = getICacheKey(key); assertNull(tieredSpilloverCache.getTieredCacheSegment(iCacheKey).getOnHeapCache().get(iCacheKey)); } // Now the original keys should be in the disk tier if the policy allows them, or misses if not - for (String key : tookTimeMap.keySet()) { + for (String key : keyValueMap.keySet()) { String computedValue = tieredSpilloverCache.get(getICacheKey(key)); - String mapValue = keyValueMap.get(key); - Long tookTime = tookTimeMap.get(mapValue); - if (tookTime != null && tookTime > timeValueThresholdNanos) { + if (expectedInDiskTierMap.get(key)) { // expect a hit assertNotNull(computedValue); } else { @@ -1543,6 +1614,139 @@ public void testMinimumThresholdSettingValue() throws Exception { assertEquals(validDuration, concreteSetting.get(validSettings)); } + public void testEntryPoliciesWithPut() throws Exception { + Tuple>, Map>> setupTuple = setupPoliciesTest(); + List> policies = setupTuple.v1(); + Map> keyValuePairs = setupTuple.v2(); + + int keyValueSize = 50; + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( + keyValueSize, + keyValueSize * 100, + removalListener, + Settings.builder() + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace( + CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() + ).getKey(), + keyValueSize * keyValuePairs.size() + 1 + "b" + ) + .build(), + 0, + policies, + null, + 1 + ); + + int expectedKeys = 0; + for (String key : keyValuePairs.keySet()) { + ICacheKey iCacheKey = getICacheKey(key); + tieredSpilloverCache.put(iCacheKey, keyValuePairs.get(key).v1()); + Boolean expectedOutput = keyValuePairs.get(key).v2(); + String result = tieredSpilloverCache.get(iCacheKey); + if (expectedOutput) { + // Should retrieve from heap tier if it was accepted + assertEquals(keyValuePairs.get(key).v1(), result); + expectedKeys++; + } else { + // Should miss as the policy rejected it + assertNull(result); + } + } + + assertEquals(0, getEvictionsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); + assertEquals(expectedKeys, getTotalStatsSnapshot(tieredSpilloverCache).getItems()); + assertEquals(0, removalListener.evictionsMetric.count()); + } + + public void testEntryPoliciesConcurrentlyWithComputeIfAbsent() throws Exception { + Tuple>, Map>> setupTuple = setupPoliciesTest(); + List> policies = setupTuple.v1(); + Map> keyValuePairs = setupTuple.v2(); + + int keyValueSize = 50; + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + LoadAwareCacheLoader, String> loader = getLoadAwareCacheLoader(keyValuePairs); + TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( + keyValueSize, + keyValueSize * 100, + removalListener, + Settings.builder() + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace( + CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() + ).getKey(), + keyValueSize * keyValuePairs.size() + 1 + "b" + ) + .build(), + 0, + policies, + null, + 1 + ); + + // To test concurrently, run for each key multiple times in parallel threads + int numRepetitionsPerKey = 10; + int numThreads = keyValuePairs.size() * numRepetitionsPerKey; + + Thread[] threads = new Thread[numThreads]; + Phaser phaser = new Phaser(numThreads + 1); + CountDownLatch countDownLatch = new CountDownLatch(numThreads); + + // Get number of keys we expect to enter the cache + int expectedKeys = 0; + for (String key : keyValuePairs.keySet()) { + Boolean expectedOutput = keyValuePairs.get(key).v2(); + if (expectedOutput) { + expectedKeys++; + } + } + + int threadNumber = 0; + for (String key : keyValuePairs.keySet()) { + for (int j = 0; j < numRepetitionsPerKey; j++) { + threads[threadNumber] = new Thread(() -> { + try { + phaser.arriveAndAwaitAdvance(); + ICacheKey iCacheKey = getICacheKey(key); + tieredSpilloverCache.computeIfAbsent(iCacheKey, loader); + } catch (Exception ignored) {} finally { + countDownLatch.countDown(); + } + }); + threads[threadNumber].start(); + threadNumber++; + } + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + + assertEquals(0, getEvictionsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); + assertEquals(expectedKeys, getItemsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); + // We should have (numRepetitionsPerKey - 1) * (expectedKeys) hits + assertEquals((numRepetitionsPerKey - 1) * expectedKeys, getHitsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); + // We should have 1 miss for each accepted key. Rejected keys should not cause misses. + assertEquals(expectedKeys, getMissesForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); + + for (String key : keyValuePairs.keySet()) { + ICacheKey iCacheKey = getICacheKey(key); + String result = tieredSpilloverCache.get(iCacheKey); + Boolean expectedInCache = keyValuePairs.get(key).v2(); + if (expectedInCache) { + // Should retrieve from heap tier if it was accepted + assertEquals(keyValuePairs.get(key).v1(), result); + } else { + // Should miss as the policy rejected it + assertNull(result); + } + } + + assertEquals(0, getEvictionsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); + assertEquals(expectedKeys, getTotalStatsSnapshot(tieredSpilloverCache).getItems()); + assertEquals(0, removalListener.evictionsMetric.count()); + } + public void testPutWithDiskCacheDisabledSetting() throws Exception { int onHeapCacheSize = randomIntBetween(10, 30); int diskCacheSize = randomIntBetween(300, 500); @@ -1972,8 +2176,8 @@ public void testWithInvalidSegmentNumber() throws Exception { .setValueSerializer(new StringSerializer()) .setSettings(settings) .setDimensionNames(dimensionNames) - .setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken - // 20_000_000 ns = 20 ms to compute + // Values will always appear to have taken 2x the took time threshold to compute, so they will be admitted + .setCachedResultParser(s -> new CachedQueryResult.PolicyValues(DEFAULT_TOOK_TIME_DISK_THRESHOLD.getNanos() * 2)) .setClusterSettings(clusterSettings) .setStoragePath(storagePath) .build(), @@ -2037,8 +2241,8 @@ public void testWithVeryLowDiskCacheSize() throws Exception { .setValueSerializer(new StringSerializer()) .setSettings(settings) .setDimensionNames(dimensionNames) - .setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken - // 20_000_000 ns = 20 ms to compute + // Values will always appear to have taken 2x the took time threshold to compute, so they will be admitted + .setCachedResultParser(s -> new CachedQueryResult.PolicyValues(DEFAULT_TOOK_TIME_DISK_THRESHOLD.getNanos() * 2)) .setClusterSettings(clusterSettings) .setStoragePath(storagePath) .build(), @@ -2096,8 +2300,8 @@ public void testTieredCacheDefaultSegmentCount() { .setValueSerializer(new StringSerializer()) .setSettings(settings) .setDimensionNames(dimensionNames) - .setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken - // 20_000_000 ns = 20 ms to compute + // Values will always appear to have taken 2x the took time threshold to compute, so they will be admitted + .setCachedResultParser(s -> new CachedQueryResult.PolicyValues(DEFAULT_TOOK_TIME_DISK_THRESHOLD.getNanos() * 2)) .setClusterSettings(clusterSettings) .setStoragePath(storagePath) .build(), @@ -2180,8 +2384,8 @@ public void testSegmentSizesWhenUsingFactory() { .setValueSerializer(new StringSerializer()) .setSettings(settings) .setDimensionNames(dimensionNames) - .setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken - // 20_000_000 ns = 20 ms to compute + // Values will always appear to have taken 2x the took time threshold to compute, so they will be admitted + .setCachedResultParser(s -> new CachedQueryResult.PolicyValues(DEFAULT_TOOK_TIME_DISK_THRESHOLD.getNanos() * 2)) .setClusterSettings(clusterSettings) .setStoragePath(storagePath) .build(), @@ -2227,6 +2431,7 @@ public void testSegmentSizesWhenNotUsingFactory() { new MockDiskCache.MockDiskCacheFactory(0, diskSizeFromImplSetting, true, keyValueSize), cacheConfig, null, + null, removalListener, numSegments, expectedHeapSize, @@ -2351,14 +2556,14 @@ public boolean isLoaded() { }; } - private LoadAwareCacheLoader, String> getLoadAwareCacheLoader(Map keyValueMap) { + private LoadAwareCacheLoader, String> getLoadAwareCacheLoader(Map> keyValueMap) { return new LoadAwareCacheLoader<>() { boolean isLoaded = false; @Override public String load(ICacheKey key) { isLoaded = true; - String mapValue = keyValueMap.get(key.key); + String mapValue = keyValueMap.get(key.key).v1(); if (mapValue == null) { mapValue = UUID.randomUUID().toString(); } @@ -2377,6 +2582,7 @@ private TieredSpilloverCache getTieredSpilloverCache( ICache.Factory mockDiskCacheFactory, CacheConfig cacheConfig, List> policies, + List> diskPolicies, RemovalListener, String> removalListener, int numberOfSegments, long onHeapCacheSizeInBytes, @@ -2393,7 +2599,14 @@ private TieredSpilloverCache getTieredSpilloverCache( .setOnHeapCacheSizeInBytes(onHeapCacheSizeInBytes) .setCacheConfig(cacheConfig); if (policies != null) { - builder.addPolicies(policies); + for (Predicate policy : policies) { + builder.addPolicy(policy); + } + } + if (diskPolicies != null) { + for (Predicate diskPolicy : diskPolicies) { + builder.addDiskPolicy(diskPolicy); + } } return builder.build(); } @@ -2406,7 +2619,7 @@ private TieredSpilloverCache initializeTieredSpilloverCache( long diskDeliberateDelay ) { - return intializeTieredSpilloverCache(keyValueSize, diskCacheSize, removalListener, settings, diskDeliberateDelay, null, 256); + return initializeTieredSpilloverCache(keyValueSize, diskCacheSize, removalListener, settings, diskDeliberateDelay, null, 256); } private TieredSpilloverCache initializeTieredSpilloverCache( @@ -2418,7 +2631,7 @@ private TieredSpilloverCache initializeTieredSpilloverCache( int numberOfSegments ) { - return intializeTieredSpilloverCache( + return initializeTieredSpilloverCache( keyValueSize, diskCacheSize, removalListener, @@ -2429,13 +2642,35 @@ private TieredSpilloverCache initializeTieredSpilloverCache( ); } - private TieredSpilloverCache intializeTieredSpilloverCache( + private TieredSpilloverCache initializeTieredSpilloverCache( + int keyValueSize, + int diskCacheSize, + RemovalListener, String> removalListener, + Settings settings, + long diskDeliberateDelay, + List> diskPolicies, + int numberOfSegments + ) { + return initializeTieredSpilloverCache( + keyValueSize, + diskCacheSize, + removalListener, + settings, + diskDeliberateDelay, + new ArrayList>(), + diskPolicies, + numberOfSegments + ); + } + + private TieredSpilloverCache initializeTieredSpilloverCache( int keyValueSize, int diskCacheSize, RemovalListener, String> removalListener, Settings settings, long diskDeliberateDelay, List> policies, + List> diskPolicies, int numberOfSegments ) { ICache.Factory onHeapCacheFactory = new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(); @@ -2481,6 +2716,7 @@ private TieredSpilloverCache intializeTieredSpilloverCache( mockDiskCacheFactory, cacheConfig, policies, + diskPolicies, removalListener, numberOfSegments, onHeapCacheSizeInBytes,