Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a targetSearchConcurrency parameter to LogMergePolicy. #13517

Merged
merged 4 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ public abstract class LogMergePolicy extends MergePolicy {
/** If true, we pro-rate a segment's size by the percentage of non-deleted documents. */
protected boolean calibrateSizeByDeletes = true;

/**
* Target search concurrency. This merge policy will avoid creating segments that have more than
* {@code maxDoc / targetSearchConcurrency} documents.
*/
protected int targetSearchConcurrency = 1;

/** Sole constructor. (For invocation by subclass constructors, typically implicit.) */
public LogMergePolicy() {
super(DEFAULT_NO_CFS_RATIO, MergePolicy.DEFAULT_MAX_CFS_SEGMENT_SIZE);
Expand Down Expand Up @@ -131,6 +137,28 @@ public boolean getCalibrateSizeByDeletes() {
return calibrateSizeByDeletes;
}

/**
 * Sets the target search concurrency. This prevents creating segments that are bigger than
 * maxDoc/targetSearchConcurrency, which in turn makes the work parallelizable into
 * targetSearchConcurrency slices of similar doc counts.
 *
 * <p><b>NOTE:</b> Configuring a value greater than 1 will increase the number of segments in the
 * index linearly with the value of {@code targetSearchConcurrency} and also increase write
 * amplification.
 *
 * @param targetSearchConcurrency the desired number of similarly-sized slices of the index; must
 *     be at least 1
 * @throws IllegalArgumentException if {@code targetSearchConcurrency} is less than 1
 */
public void setTargetSearchConcurrency(int targetSearchConcurrency) {
  // Guard clause inverted: accept valid values, reject everything below 1.
  if (targetSearchConcurrency >= 1) {
    this.targetSearchConcurrency = targetSearchConcurrency;
  } else {
    throw new IllegalArgumentException(
        "targetSearchConcurrency must be >= 1 (got " + targetSearchConcurrency + ")");
  }
}

/**
 * Returns the target search concurrency.
 *
 * @return the configured target search concurrency; always {@code >= 1} (defaults to 1, and the
 *     setter rejects smaller values)
 */
public int getTargetSearchConcurrency() {
  return this.targetSearchConcurrency;
}

/**
* Return the number of documents in the provided {@link SegmentCommitInfo}, pro-rated by
* percentage of non-deleted documents if {@link #setCalibrateSizeByDeletes} is set.
Expand Down Expand Up @@ -484,8 +512,10 @@ public MergeSpecification findMerges(

final Set<SegmentCommitInfo> mergingSegments = mergeContext.getMergingSegments();

int totalDocCount = 0;
for (int i = 0; i < numSegments; i++) {
final SegmentCommitInfo info = infos.info(i);
totalDocCount += sizeDocs(info, mergeContext);
long size = size(info, mergeContext);

// Floor tiny segments
Expand Down Expand Up @@ -575,6 +605,9 @@ public MergeSpecification findMerges(
mergeContext);
}

final int maxMergeDocs =
Math.min(this.maxMergeDocs, Math.ceilDiv(totalDocCount, targetSearchConcurrency));

// Finally, record all merges that are viable at this level:
int end = start + mergeFactor;
while (end <= 1 + upto) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void testSubclassConcurrentMergeScheduler() throws IOException {
logMP.setMergeFactor(10);

try {
for (int i = 0; i < 20; i++) {
for (int i = 0; i < 60; i++) {
writer.addDocument(doc);
}
} catch (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,7 @@ public boolean isEnabled(String component) {
iwc.setMaxBufferedDocs(2);
LogMergePolicy lmp = newLogMergePolicy();
lmp.setMergeFactor(2);
lmp.setTargetSearchConcurrency(1);
iwc.setMergePolicy(lmp);

IndexWriter w = new IndexWriter(dir, iwc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ public void testExceptionOnMergeInit() throws IOException {
cms.setSuppressExceptions();
conf.setMergeScheduler(cms);
((LogMergePolicy) conf.getMergePolicy()).setMergeFactor(2);
((LogMergePolicy) conf.getMergePolicy()).setTargetSearchConcurrency(1);
TestPoint3 testPoint = new TestPoint3();
IndexWriter w = RandomIndexWriter.mockIndexWriter(random(), dir, conf, testPoint);
testPoint.doFail = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -900,8 +900,10 @@ public MergeSpecification findFullFlushMerges(
}

public void testSetDiagnostics() throws IOException {
LogMergePolicy logMp = newLogMergePolicy(4);
logMp.setTargetSearchConcurrency(1);
MergePolicy myMergePolicy =
new FilterMergePolicy(newLogMergePolicy(4)) {
new FilterMergePolicy(logMp) {
@Override
public MergeSpecification findMerges(
MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,8 @@ public void testEmptyIndex() throws Exception {
public void testSegmentWarmer() throws Exception {
Directory dir = newDirectory();
final AtomicBoolean didWarm = new AtomicBoolean();
LogMergePolicy mp = newLogMergePolicy(10);
mp.setTargetSearchConcurrency(1);
IndexWriter w =
new IndexWriter(
dir,
Expand All @@ -1012,7 +1014,7 @@ public void testSegmentWarmer() throws Exception {
assertEquals(20, count);
didWarm.set(true);
})
.setMergePolicy(newLogMergePolicy(10)));
.setMergePolicy(mp));

Document doc = new Document();
doc.add(newStringField("foo", "bar", Field.Store.NO));
Expand Down Expand Up @@ -1045,6 +1047,8 @@ public boolean isEnabled(String component) {
return true;
}
};
LogMergePolicy mp = newLogMergePolicy(10);
mp.setTargetSearchConcurrency(1);
IndexWriter w =
new IndexWriter(
dir,
Expand All @@ -1053,7 +1057,7 @@ public boolean isEnabled(String component) {
.setReaderPooling(true)
.setInfoStream(infoStream)
.setMergedSegmentWarmer(new SimpleMergedSegmentWarmer(infoStream))
.setMergePolicy(newLogMergePolicy(10)));
.setMergePolicy(mp));

Document doc = new Document();
doc.add(newStringField("foo", "bar", Field.Store.NO));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,10 @@ public void testFullFlushMerges() throws IOException {
SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major);

LogMergePolicy mp = mergePolicy();
// Number of segments guaranteed to trigger a merge.
int numSegmentsForMerging = mp.getMergeFactor() + mp.getTargetSearchConcurrency();

for (int i = 0; i < mp.getMergeFactor(); ++i) {
for (int i = 0; i < numSegmentsForMerging; ++i) {
segmentInfos.add(
makeSegmentCommitInfo(
"_" + segNameGenerator.getAndIncrement(),
Expand All @@ -266,6 +268,6 @@ public void testFullFlushMerges() throws IOException {
segmentInfos =
applyMerge(segmentInfos, merge, "_" + segNameGenerator.getAndIncrement(), stats);
}
assertEquals(1, segmentInfos.size());
assertTrue(segmentInfos.size() < numSegmentsForMerging);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,7 @@ public static AlcoholicMergePolicy newAlcoholicMergePolicy(Random r, TimeZone tz
public static LogMergePolicy newLogMergePolicy(Random r) {
LogMergePolicy logmp = r.nextBoolean() ? new LogDocMergePolicy() : new LogByteSizeMergePolicy();
logmp.setCalibrateSizeByDeletes(r.nextBoolean());
logmp.setTargetSearchConcurrency(TestUtil.nextInt(random(), 1, 16));
if (rarely(r)) {
logmp.setMergeFactor(TestUtil.nextInt(r, 2, 9));
} else {
Expand Down Expand Up @@ -1106,14 +1107,14 @@ public static MergePolicy newLogMergePolicy(boolean useCFS) {
return logmp;
}

public static MergePolicy newLogMergePolicy(boolean useCFS, int mergeFactor) {
public static LogMergePolicy newLogMergePolicy(boolean useCFS, int mergeFactor) {
LogMergePolicy logmp = newLogMergePolicy();
logmp.setNoCFSRatio(useCFS ? 1.0 : 0.0);
logmp.setMergeFactor(mergeFactor);
return logmp;
}

public static MergePolicy newLogMergePolicy(int mergeFactor) {
public static LogMergePolicy newLogMergePolicy(int mergeFactor) {
LogMergePolicy logmp = newLogMergePolicy();
logmp.setMergeFactor(mergeFactor);
return logmp;
Expand Down