Skip to content

IGNITE-24548 Enable speed-based throttling #5474

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 46 commits into from
Mar 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
51cfa5d
IGNITE-24548 Enable PDS throttling
ibessonov Mar 14, 2025
f907116
IGNITE-24548 switch
ibessonov Mar 14, 2025
3388ee7
IGNITE-24548 Enable random-access file IO by default, as it should be!
ibessonov Mar 14, 2025
7470185
IGNITE-24548 NPE in metrics
ibessonov Mar 17, 2025
d7843ea
IGNITE-24548 Adequate metrics rendering
ibessonov Mar 17, 2025
916d3fb
Merge remote-tracking branch 'apache-github/main' into ignite-24548
ibessonov Mar 17, 2025
8ff88d0
IGNITE-24548 <logs>
ibessonov Mar 17, 2025
dfa4a79
IGNITE-24548 <compilation error>
ibessonov Mar 17, 2025
8db6798
IGNITE-24548 <logs>
ibessonov Mar 17, 2025
a86f3ed
IGNITE-24548 <logs>
ibessonov Mar 17, 2025
bff44ab
IGNITE-24548 <logs>
ibessonov Mar 18, 2025
995840b
IGNITE-24548 <less logs>
ibessonov Mar 18, 2025
4c6bcdc
IGNITE-24548 Maybe a fix
ibessonov Mar 18, 2025
4786364
IGNITE-24548 logs, logs
ibessonov Mar 18, 2025
66f7d36
IGNITE-24548 tuuuune
ibessonov Mar 18, 2025
6b318ba
IGNITE-24548 <logs>
ibessonov Mar 19, 2025
f69bd4e
IGNITE-24548 <prototyping new idea>
ibessonov Mar 19, 2025
ef22409
IGNITE-24548 slightly changed approach
ibessonov Mar 19, 2025
ba9b169
IGNITE-24548 fix
ibessonov Mar 19, 2025
10905f4
IGNITE-24548 fixes
ibessonov Mar 19, 2025
8b1daed
IGNITE-24548 ???
ibessonov Mar 19, 2025
55f7fba
IGNITE-24548 <todo>
ibessonov Mar 20, 2025
8e25b77
IGNITE-24548 <refactoring>
ibessonov Mar 20, 2025
c5dc72a
IGNITE-24548 <refactoring>
ibessonov Mar 20, 2025
775ca9f
IGNITE-24548 <optimization>
ibessonov Mar 20, 2025
925ca12
IGNITE-24548 current checkpoint progress
ibessonov Mar 20, 2025
8259b8b
Merge remote-tracking branch 'apache-github/main' into ignite-24548
ibessonov Mar 21, 2025
93b5a96
IGNITE-24548 <compilation>
ibessonov Mar 21, 2025
c1be7fc
IGNITE-24548 <metric rollback>
ibessonov Mar 21, 2025
bd5ec6a
IGNITE-24548 <async IO>
ibessonov Mar 21, 2025
3a5bb10
IGNITE-24548 default throttler changed + a few other changes
ibessonov Mar 24, 2025
30890b3
IGNITE-24548 Reduce backoff ratio
ibessonov Mar 25, 2025
6ad5851
IGNITE-24548 <even lower backoff ratio>
ibessonov Mar 25, 2025
c11b099
IGNITE-24548 <speed based throttling once again>
ibessonov Mar 26, 2025
c223b68
Merge remote-tracking branch 'apache-gitbox/main' into ignite-24548
ibessonov Mar 26, 2025
5b72b39
IGNITE-24548 <system cfg>
ibessonov Mar 26, 2025
74466a4
IGNITE-24548 <cleanup>
ibessonov Mar 26, 2025
7df1447
IGNITE-24548 <cleanup 2>
ibessonov Mar 26, 2025
2fced2e
IGNITE-24548 <tests>
ibessonov Mar 27, 2025
da0c952
IGNITE-24548 <cleanup>
ibessonov Mar 27, 2025
c22e1c7
IGNITE-24548 <todo>
ibessonov Mar 27, 2025
2eb73d2
IGNITE-24548 <cleanup>
ibessonov Mar 27, 2025
0130c50
IGNITE-24548 <cleanup>
ibessonov Mar 27, 2025
57aed9f
Merge remote-tracking branch 'apache-github/main' into ignite-24548
ibessonov Mar 28, 2025
285948c
IGNITE-24548 <cleanup>
ibessonov Mar 28, 2025
2226420
IGNITE-24548 <review>
ibessonov Mar 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,7 @@ private void initMetrics() {
*
* @param writeThrottle Page write throttling instance.
*/
// TODO IGNITE-24548 Make a proper implementation.
@TestOnly
// TODO IGNITE-24933 Remove this method.
public void initThrottling(PagesWriteThrottlePolicy writeThrottle) {
this.writeThrottle = writeThrottle;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ public CheckpointProgress scheduleCheckpoint(long delayMillis, String reason) {
return checkpointer.currentCheckpointProgress();
}

public @Nullable CheckpointProgress currentCheckpointProgressForThrottling() {
return checkpointer.currentCheckpointProgressForThrottling();
}

/**
* Returns the progress of the last checkpoint, or the current checkpoint if in progress, {@code null} if no checkpoint has occurred.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ public class Checkpointer extends IgniteWorker {
/** Current checkpoint progress. This field is updated only by checkpoint thread. */
private volatile @Nullable CheckpointProgressImpl currentCheckpointProgress;

/**
* Checkpoint progress instance with a more limited range of visibility. It is initialized when checkpoint write lick is acquired, and
* nullified when checkpoint finishes (unlike {@link #currentCheckpointProgress} that is updated before we started notifying checkpoint
* listeners and is never nullified).
*/
private volatile @Nullable CheckpointProgressImpl currentCheckpointProgressForThrottling;

/** Checkpoint progress after releasing write lock. */
private volatile @Nullable CheckpointProgressImpl afterReleaseWriteLockCheckpointProgress;

Expand Down Expand Up @@ -418,6 +425,8 @@ void doCheckpoint() throws IgniteInternalCheckedException {
failureManager.process(new FailureContext(CRITICAL_ERROR, e));

throw e;
} finally {
currentCheckpointProgressForThrottling = null;
}
}

Expand Down Expand Up @@ -684,6 +693,10 @@ void startCheckpointProgress() {
scheduledCheckpointProgress = new CheckpointProgressImpl(MILLISECONDS.toNanos(nextCheckpointInterval()));

currentCheckpointProgress = curr;

curr.futureFor(LOCK_TAKEN).thenRun(() -> {
currentCheckpointProgressForThrottling = curr;
});
}
}

Expand Down Expand Up @@ -779,6 +792,10 @@ public void shutdownCheckpointer(boolean shutdown) {
return currentCheckpointProgress;
}

public @Nullable CheckpointProgress currentCheckpointProgressForThrottling() {
return currentCheckpointProgressForThrottling;
}

/**
* Returns the progress of the last checkpoint, or the current checkpoint if in progress, {@code null} if no checkpoint has occurred.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
*/
static final long NO_THROTTLING_MARKER = Long.MIN_VALUE;

private final long logThresholdNanos;

private final PersistentPageMemory pageMemory;

private final Supplier<CheckpointProgress> cpProgress;
Expand Down Expand Up @@ -95,17 +97,20 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
/**
* Constructor.
*
* @param logThresholdNanos Minimal throttling duration required for printing a warning message to the log.
* @param pageMemory Page memory.
* @param cpProgress Database manager.
* @param stateChecker Checkpoint lock state provider.
* @param metricSource Metric source.
*/
public PagesWriteSpeedBasedThrottle(
long logThresholdNanos,
PersistentPageMemory pageMemory,
Supplier<CheckpointProgress> cpProgress,
CheckpointLockStateChecker stateChecker,
PersistentPageMemoryMetricSource metricSource
) {
this.logThresholdNanos = logThresholdNanos;
this.pageMemory = pageMemory;
this.cpProgress = cpProgress;
cpLockStateChecker = stateChecker;
Expand Down Expand Up @@ -187,7 +192,7 @@ private void initMetrics(PersistentPageMemoryMetricSource metricSource) {
return;
}

if (throttleParkTimeNs > LOGGING_THRESHOLD) {
if (throttleParkTimeNs > logThresholdNanos) {
LOG.warn("Parking thread={} for timeout(ms)={}", Thread.currentThread().getName(), throttleParkTimeNs / 1_000_000);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@
* </ul>
*/
public interface PagesWriteThrottlePolicy {
// TODO Maybe make it configurable in IGNITE-24548
/** Min park time which triggers logging. */
long LOGGING_THRESHOLD = TimeUnit.SECONDS.toNanos(1);
long DEFAULT_LOGGING_THRESHOLD = TimeUnit.SECONDS.toNanos(1);

/** Checkpoint buffer fullfill upper bound. */
float CP_BUF_FILL_THRESHOLD = 2f / 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,17 +187,19 @@ private long speedBasedParkTime(int cpWrittenPages, long donePages, int cpTotalP
detectCpPagesWriteStart(cpWrittenPages, dirtyPagesRatio);

if (dirtyPagesRatio >= MAX_DIRTY_PAGES) {
return 0; // too late to throttle, will wait on safe to update instead.
return 0; // Too late to throttle, will wait on safe to update instead.
} else {
return getParkTime(dirtyPagesRatio,
donePages,
notEvictedPagesTotal(cpTotalPages),
// TODO IGNITE-24937 Should be a "notEvictedPagesTotal(cpTotalPages)" call.
cpTotalPages,
threadIds.size(),
instantaneousMarkDirtySpeed,
avgCpWriteSpeed);
}
}

// TODO IGNITE-24937 Leads to negative estimations in some cases. Should be fixed.
private int notEvictedPagesTotal(int cpTotalPages) {
return Math.max(cpTotalPages - cpEvictedPages(), 0);
}
Expand Down Expand Up @@ -375,7 +377,13 @@ long getCpWriteSpeed() {
* Returns counter for fsynced checkpoint pages.
*/
int cpSyncedPages() {
AtomicInteger syncedPagesCounter = cpProgress.get().syncedPagesCounter();
CheckpointProgress progress = cpProgress.get();

if (progress == null) {
return 0;
}

AtomicInteger syncedPagesCounter = progress.syncedPagesCounter();

// Null-check simplifies testing, we don't have to mock this counter.
return syncedPagesCounter == null ? 0 : syncedPagesCounter.get();
Expand All @@ -385,14 +393,26 @@ int cpSyncedPages() {
* Return a number of pages in current checkpoint.
*/
int cpTotalPages() {
return cpProgress.get().currentCheckpointPagesCount();
CheckpointProgress progress = cpProgress.get();

if (progress == null) {
return 0;
}

return progress.currentCheckpointPagesCount();
}

/**
* Returns a number of evicted pages.
*/
int cpEvictedPages() {
AtomicInteger evictedPagesCounter = cpProgress.get().evictedPagesCounter();
CheckpointProgress progress = cpProgress.get();

if (progress == null) {
return 0;
}

AtomicInteger evictedPagesCounter = progress.evictedPagesCounter();

// Null-check simplifies testing, we don't have to mock this counter.
return evictedPagesCounter == null ? 0 : evictedPagesCounter.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class TargetRatioPagesWriteThrottle implements PagesWriteThrottlePolicy {
/** Logger. */
private static final IgniteLogger LOG = Loggers.forClass(TargetRatioPagesWriteThrottle.class);

private final long logThresholdNanos;

/** Page memory. */
private final PersistentPageMemory pageMemory;

Expand Down Expand Up @@ -68,17 +70,20 @@ public class TargetRatioPagesWriteThrottle implements PagesWriteThrottlePolicy {
/**
* Constructor.
*
* @param logThresholdNanos Minimal throttling duration required for printing a warning message to the log.
* @param pageMemory Page memory.
* @param cpProgress Database manager.
* @param stateChecker checkpoint lock state checker.
* @param metricSource Metric source.
*/
public TargetRatioPagesWriteThrottle(
long logThresholdNanos,
PersistentPageMemory pageMemory,
Supplier<CheckpointProgress> cpProgress,
CheckpointLockStateChecker stateChecker,
PersistentPageMemoryMetricSource metricSource
) {
this.logThresholdNanos = logThresholdNanos;
this.pageMemory = pageMemory;
this.cpProgress = cpProgress;
this.stateChecker = stateChecker;
Expand Down Expand Up @@ -132,7 +137,7 @@ public TargetRatioPagesWriteThrottle(

Thread curThread = Thread.currentThread();

if (throttleParkTimeNs > LOGGING_THRESHOLD) {
if (throttleParkTimeNs > logThresholdNanos) {
LOG.warn("Parking thread=" + curThread.getName()
+ " for timeout(ms)=" + TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs));
}
Expand All @@ -147,7 +152,7 @@ public TargetRatioPagesWriteThrottle(
} finally {
cpBufThrottledThreads.remove(curThread.getId());

if (throttleParkTimeNs > LOGGING_THRESHOLD) {
if (throttleParkTimeNs > logThresholdNanos) {
LOG.warn("Unparking thread=" + curThread.getName()
+ " with park timeout(ms)=" + TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.pagememory.persistence.throttling;

/**
* Available throttling types.
*/
public enum ThrottlingType {
/** Corresponds to no throttling. */
DISABLED,

/** Corresponds to {@link TargetRatioPagesWriteThrottle}. */
TARGET_RATIO,

/** Corresponds to {@link PagesWriteSpeedBasedThrottle}. */
SPEED_BASED
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.pagememory.persistence.throttling;

import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
import static org.apache.ignite.internal.pagememory.persistence.throttling.PagesWriteThrottlePolicy.DEFAULT_LOGGING_THRESHOLD;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
Expand Down Expand Up @@ -372,13 +373,15 @@ void hugeLoadDoesNotBreakCheckpointReadLock(boolean speedBasedThrottling) {
PagesWriteThrottlePolicy writeThrottle;
if (speedBasedThrottling) {
writeThrottle = new PagesWriteSpeedBasedThrottle(
DEFAULT_LOGGING_THRESHOLD,
pageMemory,
checkpointManager::currentCheckpointProgress,
checkpointManager.checkpointTimeoutLock()::checkpointLockIsHeldByThread,
metricSource
);
} else {
writeThrottle = new TargetRatioPagesWriteThrottle(
DEFAULT_LOGGING_THRESHOLD,
pageMemory,
checkpointManager::currentCheckpointProgress,
checkpointManager.checkpointTimeoutLock()::checkpointLockIsHeldByThread,
Expand Down
Loading