From cb2cf889a89bb81f689d1f0efc43934e85313f88 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Tue, 25 Jun 2024 16:33:53 +0200 Subject: [PATCH 1/3] Add a `targetSearchConcurrency` parameter to `LogMergePolicy`. This adds the same `targetSearchConcurrency` parameter to `LogMergePolicy` that #13430 is adding to `TieredMergePolicy`. The implementation is simpler here since `LogMergePolicy` is constrained to only merging adjacent segments. The downside is that this option results in significantly more write amplification on this merge policy compared with `TieredMergePolicy`, which is allowed to merge non-adjacent segments. From simulating merging on an index that gets appended 555 equal-size segments (like nightly benchmarks) and a `targetSearchConcurrency` of 8, I get the following results: - 19.1 segments in the index on average, vs. 11.1 with a `targetSearchConcurrency` of 1. - Maximum number of segments in the index of 28, vs. 22 with a `targetSearchConcurrency` of 1. - Write amplification through merging of 3.6 instead of 2.9. For comparison, 3.6 is about the write amplification that you get with a merge factor of 7 otherwise. - The resulting index has 24 segments, including 11 segments on the highest tier, vs. 15 segments and 5 segments on the highest tier with a `targetSearchConcurrency` of 1. 
--- .../apache/lucene/index/LogMergePolicy.java | 33 +++++++++++++++++++ .../lucene/TestMergeSchedulerExternal.java | 2 +- .../index/TestConcurrentMergeScheduler.java | 1 + .../index/TestIndexWriterExceptions.java | 1 + .../index/TestIndexWriterMergePolicy.java | 4 ++- .../lucene/index/TestIndexWriterReader.java | 8 +++-- .../lucene/index/TestLogMergePolicy.java | 6 ++-- .../lucene/tests/util/LuceneTestCase.java | 5 +-- 8 files changed, 52 insertions(+), 8 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java index f7c5011d9c6e..a86809a1fb15 100644 --- a/lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java @@ -90,6 +90,12 @@ public abstract class LogMergePolicy extends MergePolicy { /** If true, we pro-rate a segment's size by the percentage of non-deleted documents. */ protected boolean calibrateSizeByDeletes = true; + /** + * Target search concurrency. This merge policy will avoid creating segments that have more than + * {@code maxDoc / targetSearchConcurrency} documents. + */ + protected int targetSearchConcurrency = 1; + /** Sole constructor. (For invocation by subclass constructors, typically implicit.) */ public LogMergePolicy() { super(DEFAULT_NO_CFS_RATIO, MergePolicy.DEFAULT_MAX_CFS_SEGMENT_SIZE); @@ -131,6 +137,28 @@ public boolean getCalibrateSizeByDeletes() { return calibrateSizeByDeletes; } + /** + * Sets the target search concurrency. This prevents creating segments that are bigger than + * maxDoc/targetSearchConcurrency, which in turn makes the work parallelizable into + * targetSearchConcurrency slices of similar doc counts. + * + *

NOTE: Configuring a value greater than 1 will increase the number of segments in the + * index linearly with the value of {@code targetSearchConcurrency} and also increase write + * amplification. + */ + public void setTargetSearchConcurrency(int targetSearchConcurrency) { + if (targetSearchConcurrency < 1) { + throw new IllegalArgumentException( + "targetSearchConcurrency must be >= 1 (got " + targetSearchConcurrency + ")"); + } + this.targetSearchConcurrency = targetSearchConcurrency; + } + + /** Returns the target search concurrency. */ + public int getTargetSearchConcurrency() { + return targetSearchConcurrency; + } + /** * Return the number of documents in the provided {@link SegmentCommitInfo}, pro-rated by * percentage of non-deleted documents if {@link #setCalibrateSizeByDeletes} is set. @@ -484,8 +512,10 @@ public MergeSpecification findMerges( final Set mergingSegments = mergeContext.getMergingSegments(); + int totalDocCount = 0; for (int i = 0; i < numSegments; i++) { final SegmentCommitInfo info = infos.info(i); + totalDocCount += sizeDocs(info, mergeContext); long size = size(info, mergeContext); // Floor tiny segments @@ -575,6 +605,9 @@ public MergeSpecification findMerges( mergeContext); } + final int maxMergeDocs = + Math.min(this.maxMergeDocs, Math.ceilDiv(totalDocCount, targetSearchConcurrency)); + // Finally, record all merges that are viable at this level: int end = start + mergeFactor; while (end <= 1 + upto) { diff --git a/lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java b/lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java index 506a76f99895..99e1e8e1fc0b 100644 --- a/lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java +++ b/lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java @@ -132,7 +132,7 @@ public void testSubclassConcurrentMergeScheduler() throws IOException { logMP.setMergeFactor(10); try { - for (int i = 0; i < 20; i++) { + for (int i = 0; i < 60; i++) { 
writer.addDocument(doc); } } catch ( diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java index 5f85b5d37741..e0b2c49d8548 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java @@ -802,6 +802,7 @@ public boolean isEnabled(String component) { iwc.setMaxBufferedDocs(2); LogMergePolicy lmp = newLogMergePolicy(); lmp.setMergeFactor(2); + lmp.setTargetSearchConcurrency(1); iwc.setMergePolicy(lmp); IndexWriter w = new IndexWriter(dir, iwc); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java index 674ba6eac651..6f693a7fe38b 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java @@ -471,6 +471,7 @@ public void testExceptionOnMergeInit() throws IOException { cms.setSuppressExceptions(); conf.setMergeScheduler(cms); ((LogMergePolicy) conf.getMergePolicy()).setMergeFactor(2); + ((LogMergePolicy) conf.getMergePolicy()).setTargetSearchConcurrency(1); TestPoint3 testPoint = new TestPoint3(); IndexWriter w = RandomIndexWriter.mockIndexWriter(random(), dir, conf, testPoint); testPoint.doFail = true; diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java index 1835a4ff3097..c56feb62d84a 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java @@ -900,8 +900,10 @@ public MergeSpecification findFullFlushMerges( } public void testSetDiagnostics() throws IOException { + LogMergePolicy logMp = 
newLogMergePolicy(4); + logMp.setTargetSearchConcurrency(1); MergePolicy myMergePolicy = - new FilterMergePolicy(newLogMergePolicy(4)) { + new FilterMergePolicy(logMp) { @Override public MergeSpecification findMerges( MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterReader.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterReader.java index 51b502a122b4..82aa548df806 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterReader.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterReader.java @@ -999,6 +999,8 @@ public void testEmptyIndex() throws Exception { public void testSegmentWarmer() throws Exception { Directory dir = newDirectory(); final AtomicBoolean didWarm = new AtomicBoolean(); + LogMergePolicy mp = newLogMergePolicy(10); + mp.setTargetSearchConcurrency(1); IndexWriter w = new IndexWriter( dir, @@ -1012,7 +1014,7 @@ public void testSegmentWarmer() throws Exception { assertEquals(20, count); didWarm.set(true); }) - .setMergePolicy(newLogMergePolicy(10))); + .setMergePolicy(mp)); Document doc = new Document(); doc.add(newStringField("foo", "bar", Field.Store.NO)); @@ -1045,6 +1047,8 @@ public boolean isEnabled(String component) { return true; } }; + LogMergePolicy mp = newLogMergePolicy(10); + mp.setTargetSearchConcurrency(1); IndexWriter w = new IndexWriter( dir, @@ -1053,7 +1057,7 @@ public boolean isEnabled(String component) { .setReaderPooling(true) .setInfoStream(infoStream) .setMergedSegmentWarmer(new SimpleMergedSegmentWarmer(infoStream)) - .setMergePolicy(newLogMergePolicy(10))); + .setMergePolicy(mp)); Document doc = new Document(); doc.add(newStringField("foo", "bar", Field.Store.NO)); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestLogMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestLogMergePolicy.java index 0b00a20a17fb..ea60f9b1e090 100644 --- 
a/lucene/core/src/test/org/apache/lucene/index/TestLogMergePolicy.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestLogMergePolicy.java @@ -249,8 +249,10 @@ public void testFullFlushMerges() throws IOException { SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major); LogMergePolicy mp = mergePolicy(); + // Number of segments guaranteed to trigger a merge. + int numSegmentsForMerging = mp.getMergeFactor() + mp.getTargetSearchConcurrency(); - for (int i = 0; i < mp.getMergeFactor(); ++i) { + for (int i = 0; i < numSegmentsForMerging; ++i) { segmentInfos.add( makeSegmentCommitInfo( "_" + segNameGenerator.getAndIncrement(), @@ -266,6 +268,6 @@ public void testFullFlushMerges() throws IOException { segmentInfos = applyMerge(segmentInfos, merge, "_" + segNameGenerator.getAndIncrement(), stats); } - assertEquals(1, segmentInfos.size()); + assertTrue(segmentInfos.size() < numSegmentsForMerging); } } diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java index c649fd18fa59..d17f4d441c44 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java +++ b/lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java @@ -1053,6 +1053,7 @@ public static AlcoholicMergePolicy newAlcoholicMergePolicy(Random r, TimeZone tz public static LogMergePolicy newLogMergePolicy(Random r) { LogMergePolicy logmp = r.nextBoolean() ? 
new LogDocMergePolicy() : new LogByteSizeMergePolicy(); logmp.setCalibrateSizeByDeletes(r.nextBoolean()); + logmp.setTargetSearchConcurrency(TestUtil.nextInt(random(), 1, 16)); if (rarely(r)) { logmp.setMergeFactor(TestUtil.nextInt(r, 2, 9)); } else { @@ -1106,14 +1107,14 @@ public static MergePolicy newLogMergePolicy(boolean useCFS) { return logmp; } - public static MergePolicy newLogMergePolicy(boolean useCFS, int mergeFactor) { + public static LogMergePolicy newLogMergePolicy(boolean useCFS, int mergeFactor) { LogMergePolicy logmp = newLogMergePolicy(); logmp.setNoCFSRatio(useCFS ? 1.0 : 0.0); logmp.setMergeFactor(mergeFactor); return logmp; } - public static MergePolicy newLogMergePolicy(int mergeFactor) { + public static LogMergePolicy newLogMergePolicy(int mergeFactor) { LogMergePolicy logmp = newLogMergePolicy(); logmp.setMergeFactor(mergeFactor); return logmp; From f808b29a7a84b6224cddd56c6f73ee35f65929c9 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Thu, 18 Jul 2024 10:56:01 +0200 Subject: [PATCH 2/3] CHANGES --- lucene/CHANGES.txt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index f361fdcf0028..9c68ad206f98 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -260,6 +260,10 @@ New Features merge policy to try to have at least this number of segments on the highest tier. (Adrien Grand, Carlos Delgado) +* GITHUB#13517: Allow configuring the search concurrency on LogDocMergePolicy + and LogByteSizeMergePolicy via a new #setTargetSearchConcurrency setter. 
+ (Adrien Grand) + Improvements --------------------- From 41da8ce2ecc432c57062dd423cc456f8435429df Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Thu, 18 Jul 2024 10:59:46 +0200 Subject: [PATCH 3/3] Don't apply a threshold on the doc count for merges under the min merge size --- .../core/src/java/org/apache/lucene/index/LogMergePolicy.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java index a86809a1fb15..f2113d0d13b6 100644 --- a/lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java @@ -623,7 +623,9 @@ public MergeSpecification findMerges( } long segmentSize = size(info, mergeContext); long segmentDocs = sizeDocs(info, mergeContext); - if (mergeSize + segmentSize > maxMergeSize || mergeDocs + segmentDocs > maxMergeDocs) { + if (mergeSize + segmentSize > maxMergeSize + || (mergeSize + segmentSize > minMergeSize + && mergeDocs + segmentDocs > maxMergeDocs)) { // This merge is full, stop adding more segments to it if (i == start) { // This segment alone is too large, return a singleton merge