Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add target search concurrency to TieredMergePolicy #13430

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
ab0a13a
Add targetSearchConcurrency
carlosdelest May 27, 2024
ecf2035
Some renaming and fixes
carlosdelest May 27, 2024
6bf94d2
Some renaming and fixes
carlosdelest May 27, 2024
36ad550
minNumSegments can be overriden by force merge
carlosdelest May 27, 2024
585c927
Add minNumSegments to tests
carlosdelest May 27, 2024
c922f47
Merge remote-tracking branch 'origin/main'
carlosdelest May 27, 2024
f74616c
Add forceMerge semantics
carlosdelest May 28, 2024
31e132f
WIP - test
carlosdelest May 28, 2024
2d6173d
Take forced merges into account
carlosdelest May 28, 2024
22436c0
Minor fixes, adding tests
carlosdelest May 28, 2024
7c4397e
Rename minNumSegments -> targetSearchConcurrency
carlosdelest May 29, 2024
289d07d
Implement target search concurrency as number of segments for the las…
carlosdelest May 29, 2024
d4d89f1
PR feedback
carlosdelest May 30, 2024
bfdb58b
Removed some changes for not making doc merges so aggressive compared…
carlosdelest May 31, 2024
f240644
Focus testing on append only merges for target search concurrency
carlosdelest May 31, 2024
500fe32
Removing some useless changes from main
carlosdelest May 31, 2024
1b91298
Better handling of num docs overflow
carlosdelest Jun 4, 2024
299ac18
Simplify testing
carlosdelest Jun 4, 2024
e058a9f
Refactor singleton merges
carlosdelest Jun 4, 2024
9ab6fcd
Merge branch 'main' into carlosdelest/tiered-merge-policy-min-num-seg…
jpountz Jun 24, 2024
d44de4c
Tune merging logic for targetSearchConcurrency.
jpountz Jun 24, 2024
66d62e9
Add CHANGES entry.
jpountz Jun 24, 2024
ca4c19c
Add proper credit to Adrien on this :)
carlosdelest Jul 2, 2024
2c18e5f
Small refactoring for getMaxAllowedDocs
carlosdelest Jul 2, 2024
1b9b60b
Add target search concurrency checks in testSimulateUpdates()
carlosdelest Jul 2, 2024
3db835a
Minor improvement to corner case.
jpountz Jul 16, 2024
2d80315
Add target search concurrency to all tests randomly
carlosdelest Jul 16, 2024
8de0212
Remove check for target search concurrency, which already takes place…
carlosdelest Jul 16, 2024
898e2e8
Change target search concurrency for rare test occurrences
carlosdelest Jul 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 57 additions & 14 deletions lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class TieredMergePolicy extends MergePolicy {
private double segsPerTier = 10.0;
private double forceMergeDeletesPctAllowed = 10.0;
private double deletesPctAllowed = 20.0;
private int minNumSegments = 1;

/** Sole constructor, setting all settings to their defaults. */
public TieredMergePolicy() {
Expand Down Expand Up @@ -257,6 +258,25 @@ public double getSegmentsPerTier() {
return segsPerTier;
}

/**
 * Sets the minimum number of segments to merge down to. This allows merging to ensure that there
 * are at least {@code minNumSegments} segments left after a natural merge. This setting can be
 * overridden by a force merge to an explicitly requested segment count. Default is 1.
 *
 * @param minNumSegments minimum number of segments a natural merge may leave; must be {@code >= 1}
 * @return this {@code TieredMergePolicy}, to allow setter chaining
 * @throws IllegalArgumentException if {@code minNumSegments} is less than 1
 */
public TieredMergePolicy setMinNumSegments(int minNumSegments) {
if (minNumSegments < 1) {
throw new IllegalArgumentException(
"minNumSegments must be >= 1 (got " + minNumSegments + ")");
}
this.minNumSegments = minNumSegments;
return this;
}

/**
 * Returns the minimum number of segments that a natural merge will leave in the index.
 *
 * @see #setMinNumSegments(int)
 */
public int getMinNumSegments() {
return minNumSegments;
}

private static class SegmentSizeAndDocs {
private final SegmentCommitInfo segInfo;
/// Size of the segment in bytes, pro-rated by the number of live documents.
Expand Down Expand Up @@ -326,6 +346,7 @@ public MergeSpecification findMerges(
int totalMaxDoc = 0;

long mergingBytes = 0;
long mergingDocs = 0;

List<SegmentSizeAndDocs> sortedInfos = getSortedBySegmentSize(infos, mergeContext);
Iterator<SegmentSizeAndDocs> iter = sortedInfos.iterator();
Expand Down Expand Up @@ -353,7 +374,9 @@ public MergeSpecification findMerges(
iter.remove();
// if this segment is merging, then its deletes are being reclaimed already.
// only count live docs in the total max doc
totalMaxDoc += segSizeDocs.maxDoc - segSizeDocs.delCount;
int mergeDocs = segSizeDocs.maxDoc - segSizeDocs.delCount;
mergingDocs += mergeDocs;
totalMaxDoc += mergeDocs;
} else {
totalDelDocs += segSizeDocs.delCount;
totalMaxDoc += segSizeDocs.maxDoc;
Expand Down Expand Up @@ -422,15 +445,17 @@ public MergeSpecification findMerges(
+ tooBigCount,
mergeContext);
}
int allowedDocCount = totalMaxDoc / minNumSegments;
return doFindMerges(
sortedInfos,
maxMergedSegmentBytes,
mergeFactor,
(int) allowedSegCount,
allowedDelCount,
allowedDocCount,
MERGE_TYPE.NATURAL,
mergeContext,
mergingBytes >= maxMergedSegmentBytes);
mergingBytes >= maxMergedSegmentBytes || mergingDocs >= allowedDocCount);
}

private MergeSpecification doFindMerges(
Expand All @@ -439,6 +464,7 @@ private MergeSpecification doFindMerges(
final int mergeFactor,
final int allowedSegCount,
final int allowedDelCount,
final int allowedDocCount,
final MERGE_TYPE mergeType,
MergeContext mergeContext,
boolean maxMergeIsRunning)
Expand Down Expand Up @@ -513,6 +539,7 @@ private MergeSpecification doFindMerges(
MergeScore bestScore = null;
List<SegmentCommitInfo> best = null;
boolean bestTooLarge = false;
boolean bestMaxDocs = false;
long bestMergeBytes = 0;

for (int startIdx = 0; startIdx < sortedEligible.size(); startIdx++) {
Expand All @@ -521,22 +548,28 @@ private MergeSpecification doFindMerges(

final List<SegmentCommitInfo> candidate = new ArrayList<>();
boolean hitTooLarge = false;
boolean hitMaxDocs = false;
long bytesThisMerge = 0;
long docCountThisMerge = 0;
for (int idx = startIdx;
idx < sortedEligible.size()
&& candidate.size() < mergeFactor
&& bytesThisMerge < maxMergedSegmentBytes;
&& bytesThisMerge < maxMergedSegmentBytes
&& docCountThisMerge < allowedDocCount;
idx++) {
final SegmentSizeAndDocs segSizeDocs = sortedEligible.get(idx);
final long segBytes = segSizeDocs.sizeInBytes;

if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes) {
hitTooLarge = true;
int segDocCount = segSizeDocs.maxDoc - segSizeDocs.delCount;
hitTooLarge = totAfterMergeBytes + segBytes > maxMergedSegmentBytes;
hitMaxDocs = docCountThisMerge + segDocCount > allowedDocCount;
if (hitTooLarge || hitMaxDocs) {
if (candidate.size() == 0) {
// We should never have something coming in that _cannot_ be merged, so handle
// singleton merges
candidate.add(segSizeDocs.segInfo);
bytesThisMerge += segBytes;
docCountThisMerge += segDocCount;
}
// NOTE: we continue, so that we can try
// "packing" smaller segments into this merge
Expand All @@ -549,6 +582,7 @@ private MergeSpecification doFindMerges(
candidate.add(segSizeDocs.segInfo);
bytesThisMerge += segBytes;
totAfterMergeBytes += segBytes;
docCountThisMerge += segDocCount;
}

// We should never see an empty candidate: we iterated over maxMergeAtOnce
Expand Down Expand Up @@ -581,11 +615,14 @@ private MergeSpecification doFindMerges(
// whose length is less than the merge factor, it means we are reaching
// the tail of the list of segments and will only find smaller merges.
// Stop here.
if (bestScore != null && hitTooLarge == false && candidate.size() < mergeFactor) {
if (bestScore != null
&& hitTooLarge == false
&& hitMaxDocs == false
&& candidate.size() < mergeFactor) {
break;
}

final MergeScore score = score(candidate, hitTooLarge, segInfosSizes);
final MergeScore score = score(candidate, hitTooLarge || hitMaxDocs, segInfosSizes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that we want to do this. hitTooLarge is considered a good thing by this merge policy. It means that we found a merge that reached the maximum merged segment size. It's great because if we perform this merge, then this segment will not be eligible again for merging (unless it gets many deletes), so we'll essentially be done with it. So these merges are given a very good score by assuming that they have a perfect skew.

hitMaxDocs is different, we don't want to prioritize unbalanced merges that produce segments of hitMaxDocs documents? It's better to keep prioritizing balanced small segment merges as usual?

My expectation for handling allowedMaxDoc is that we would just never score any merge that has more than allowedMaxDoc documents and force the merge policy to select one of the candidate merges that produce fewer documents than that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see - I was thinking in doing the same with docs than with bytes, but I see they're quite different. Docs is more a hint to maintain the number of segments we want, but bytes provides a hard limit on size and also score guidance on segments to merge.

if (verbose(mergeContext)) {
message(
" maybe="
Expand All @@ -596,16 +633,19 @@ private MergeSpecification doFindMerges(
+ score.getExplanation()
+ " tooLarge="
+ hitTooLarge
+ " hitMaxDocs="
+ hitMaxDocs
+ " size="
+ String.format(Locale.ROOT, "%.3f MB", totAfterMergeBytes / 1024. / 1024.),
mergeContext);
}

if ((bestScore == null || score.getScore() < bestScore.getScore())
&& (!hitTooLarge || !maxMergeIsRunning)) {
&& (!hitTooLarge || !hitMaxDocs || !maxMergeIsRunning)) {
best = candidate;
bestScore = score;
bestTooLarge = hitTooLarge;
bestMaxDocs = hitMaxDocs;
bestMergeBytes = totAfterMergeBytes;
}
}
Expand All @@ -620,9 +660,10 @@ private MergeSpecification doFindMerges(
// we should remove this.
if (haveOneLargeMerge == false
|| bestTooLarge == false
|| bestMaxDocs == false
|| mergeType == MERGE_TYPE.FORCE_MERGE_DELETES) {

haveOneLargeMerge |= bestTooLarge;
haveOneLargeMerge |= bestTooLarge || bestMaxDocs;

if (spec == null) {
spec = new MergeSpecification();
Expand All @@ -640,7 +681,8 @@ private MergeSpecification doFindMerges(
+ String.format(Locale.ROOT, "%.3f", bestScore.getScore())
+ " "
+ bestScore.getExplanation()
+ (bestTooLarge ? " [max merge]" : ""),
+ (bestTooLarge ? " [max merge]" : "")
+ (bestMaxDocs ? " [max docs]" : ""),
mergeContext);
}
}
Expand All @@ -653,7 +695,7 @@ private MergeSpecification doFindMerges(
/** Expert: scores one merge; subclasses can override. */
protected MergeScore score(
List<SegmentCommitInfo> candidate,
boolean hitTooLarge,
boolean tooLargeSegment,
Map<SegmentCommitInfo, SegmentSizeAndDocs> segmentsSizes)
throws IOException {
long totBeforeMergeBytes = 0;
Expand All @@ -673,7 +715,7 @@ protected MergeScore score(
// lopsided merges (skew near 1.0) is no good; it means
// O(N^2) merge cost over time:
final double skew;
if (hitTooLarge) {
if (tooLargeSegment) {
// Pretend the merge has perfect skew; skew doesn't
// matter in this case because this merge will not
// "cascade" and so it cannot lead to N^2 merge cost
Expand Down Expand Up @@ -736,7 +778,6 @@ public MergeSpecification findForcedMerges(
+ segmentsToMerge,
mergeContext);
}

List<SegmentSizeAndDocs> sortedSizeAndDocs = getSortedBySegmentSize(infos, mergeContext);

long totalMergeBytes = 0;
Expand Down Expand Up @@ -950,6 +991,7 @@ public MergeSpecification findForcedDeletesMerges(SegmentInfos infos, MergeConte
Integer.MAX_VALUE,
Integer.MAX_VALUE,
0,
Integer.MAX_VALUE,
MERGE_TYPE.FORCE_MERGE_DELETES,
mergeContext,
false);
Expand All @@ -969,7 +1011,8 @@ public String toString() {
sb.append("segmentsPerTier=").append(segsPerTier).append(", ");
sb.append("maxCFSSegmentSizeMB=").append(getMaxCFSSegmentSizeMB()).append(", ");
sb.append("noCFSRatio=").append(noCFSRatio).append(", ");
sb.append("deletesPctAllowed=").append(deletesPctAllowed);
sb.append("deletesPctAllowed=").append(deletesPctAllowed).append(", ");
sb.append("targetNumSegments=").append(minNumSegments);
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws
+ tmp.getDeletesPctAllowed(),
delPercentage <= tmp.getDeletesPctAllowed());

assertTrue(
"Number of segments "
+ infos.size()
+ " is lower than the minimum: "
+ tmp.getMinNumSegments(),
infos.size() >= tmp.getMinNumSegments());

long levelSizeBytes = Math.max(minSegmentBytes, (long) (tmp.getFloorSegmentMB() * 1024 * 1024));
long bytesLeft = totalBytes;
double allowedSegCount = 0;
Expand Down Expand Up @@ -115,7 +122,7 @@ protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws
assertTrue(
String.format(
Locale.ROOT,
"mergeFactor=%d minSegmentBytes=%,d maxMergedSegmentBytes=%,d segmentsPerTier=%g maxMergeAtOnce=%d numSegments=%d allowed=%g totalBytes=%,d delPercentage=%g deletesPctAllowed=%g",
"mergeFactor=%d minSegmentBytes=%,d maxMergedSegmentBytes=%,d segmentsPerTier=%g maxMergeAtOnce=%d numSegments=%d allowed=%g totalBytes=%,d delPercentage=%g deletesPctAllowed=%g targetNumSegments=%d",
mergeFactor,
minSegmentBytes,
maxMergedSegmentBytes,
Expand All @@ -125,7 +132,8 @@ protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws
allowedSegCount,
totalBytes,
delPercentage,
tmp.getDeletesPctAllowed()),
tmp.getDeletesPctAllowed(),
tmp.getMinNumSegments()),
numSegments <= allowedSegCount || hasBalancedMerges == false);
}

Expand All @@ -136,6 +144,7 @@ protected void assertMerge(MergePolicy policy, MergeSpecification merges) {
for (OneMerge merge : merges.merges) {
assertTrue(merge.segments.size() <= mergeFactor);
}
assertTrue(merges.merges.size() >= tmp.getMinNumSegments());
}

public void testForceMergeDeletes() throws Exception {
Expand Down Expand Up @@ -207,7 +216,9 @@ public void testPartialMerge() throws Exception {
w.flush(true, true);

int segmentCount = w.getSegmentCount();
int targetCount = TestUtil.nextInt(random(), 1, segmentCount);
tmp.setMinNumSegments(TestUtil.nextInt(random(), 1, segmentCount));
int targetCount = TestUtil.nextInt(random(), tmp.getMinNumSegments(), segmentCount);

if (VERBOSE) {
System.out.println(
"TEST: merge to " + targetCount + " segs (current count=" + segmentCount + ")");
Expand Down Expand Up @@ -238,6 +249,10 @@ public void testPartialMerge() throws Exception {
}
}

assertTrue(
"There should be at least " + tmp.getMinNumSegments() + " segments",
w.getSegmentCount() >= tmp.getMinNumSegments());

w.close();
dir.close();
}
Expand Down Expand Up @@ -291,8 +306,9 @@ public void testForceMergeDeletesMaxSegSize() throws Exception {
}

// LUCENE-7976 makes findForceMergeDeletes and findForcedDeletes respect max segment size by
// default, so ensure that this works.
public void testForcedMergesRespectSegSize() throws Exception {
// default, so ensure that this works. We also check that the minimum number of segments is
// respected.
public void testForcedMergesRespectSizes() throws Exception {
final Directory dir = newDirectory();
final IndexWriterConfig conf = newIndexWriterConfig(new MockAnalyzer(random()));
final TieredMergePolicy tmp = new TieredMergePolicy();
Expand All @@ -305,6 +321,7 @@ public void testForcedMergesRespectSegSize() throws Exception {
(long)
((1024.0 * 1024.0)); // fudge it up, we're trying to catch egregious errors and segbytes
// don't really reflect the number for original merges.
tmp.setMinNumSegments(TestUtil.nextInt(random(), 1, 10));
tmp.setMaxMergedSegmentMB(mbSize);
conf.setMaxBufferedDocs(100);
conf.setMergePolicy(tmp);
Expand All @@ -326,10 +343,12 @@ public void testForcedMergesRespectSegSize() throws Exception {
List<String> segNamesBefore = getSegmentNames(w);
w.forceMergeDeletes();
checkSegmentsInExpectations(w, segNamesBefore, false); // There should have been no merges.
checkMinNumSegmentNotExceeded(w.cloneSegmentInfos(), tmp);

w.forceMerge(Integer.MAX_VALUE);
checkSegmentsInExpectations(w, segNamesBefore, true);
checkSegmentSizeNotExceeded(w.cloneSegmentInfos(), maxSegBytes);
checkMinNumSegmentNotExceeded(w.cloneSegmentInfos(), tmp);

// Delete 12-17% of each segment and expungeDeletes. This should result in:
// > the same number of segments as before.
Expand All @@ -339,6 +358,7 @@ public void testForcedMergesRespectSegSize() throws Exception {
w.forceMergeDeletes();
w.commit();
checkSegmentSizeNotExceeded(w.cloneSegmentInfos(), maxSegBytes);
checkMinNumSegmentNotExceeded(w.cloneSegmentInfos(), tmp);
assertFalse("There should be no deleted docs in the index.", w.hasDeletions());

// Check that deleting _fewer_ than 10% doesn't merge inappropriately. Nothing should be merged
Expand All @@ -349,6 +369,7 @@ public void testForcedMergesRespectSegSize() throws Exception {
w.forceMergeDeletes();
remainingDocs -= deletedThisPass;
checkSegmentsInExpectations(w, segNamesBefore, false); // There should have been no merges
checkMinNumSegmentNotExceeded(w.cloneSegmentInfos(), tmp);
assertEquals(
"NumDocs should reflect removed documents ", remainingDocs, w.getDocStats().numDocs);
assertTrue(
Expand All @@ -359,6 +380,12 @@ public void testForcedMergesRespectSegSize() throws Exception {
// Will change for LUCENE-8236
w.forceMerge(Integer.MAX_VALUE);
checkSegmentSizeNotExceeded(w.cloneSegmentInfos(), maxSegBytes);
checkMinNumSegmentNotExceeded(w.cloneSegmentInfos(), tmp);

// forceMerge to minimum number of segments, should respect max segment size
w.forceMerge(tmp.getMinNumSegments());
checkSegmentSizeNotExceeded(w.cloneSegmentInfos(), maxSegBytes);
checkMinNumSegmentNotExceeded(w.cloneSegmentInfos(), tmp);

// Now forceMerge down to one segment, there should be exactly remainingDocs in exactly one
// segment.
Expand Down Expand Up @@ -735,6 +762,15 @@ private void checkSegmentSizeNotExceeded(SegmentInfos infos, long maxSegBytes)
}
}

// Asserts that the index still holds at least the policy's configured minimum number of
// segments (see TieredMergePolicy#getMinNumSegments). Shared by the force-merge tests above,
// which call it after each merge operation that must respect the minimum.
private static void checkMinNumSegmentNotExceeded(SegmentInfos infos, TieredMergePolicy tmp) {
assertTrue(
"There should be at least "
+ tmp.getMinNumSegments()
+ " segments, there are "
+ infos.size(),
infos.size() >= tmp.getMinNumSegments());
}

private static final double EPSILON = 1E-14;

public void testSetters() {
Expand Down