Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Measure cache size periodically #170

Merged
merged 4 commits into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ public NetheriteOrchestrationService(NetheriteOrchestrationServiceSettings setti
$"Configured trace generation limits: general={settings.LogLevelLimit} , transport={settings.TransportLogLevelLimit}, storage={settings.StorageLogLevelLimit}, "
+ $"events={settings.EventLogLevelLimit}; workitems={settings.WorkItemLogLevelLimit}; clients={settings.ClientLogLevelLimit}; loadmonitor={settings.LoadMonitorLogLevelLimit}; etwEnabled={EtwSource.Log.IsEnabled()}; "
+ $"core.IsTraceEnabled={DurableTask.Core.Tracing.DefaultEventSource.Log.IsTraceEnabled}");

if (this.Settings.TestHooks != null)
{
this.Settings.TestHooks.OnError += (string message) => this.TraceHelper.TraceError("TestHook error", message);
}
}
catch (Exception e) when (!Utils.IsFatal(e))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,6 @@ void TraceError(bool isWarning, string context, string message, Exception except

void Terminate()
{
// we use a dedicated shutdown thread to help debugging and to contain damage if there are hangs
Thread shutdownThread = TrackedThreads.MakeTrackedThread(Shutdown, "PartitionShutdown");

try
{
this.logger?.LogDebug("Part{partition:D2} Started PartitionCancellation", this.partitionId);
Expand All @@ -125,6 +122,8 @@ void Terminate()
this.HandleError("PartitionErrorHandler.Terminate", "Exception in PartitionCancellation", e, false, true);
}

// we use a dedicated shutdown thread to help debugging and to contain damage if there are hangs
Thread shutdownThread = TrackedThreads.MakeTrackedThread(Shutdown, "PartitionShutdown");
shutdownThread.Start();

void Shutdown()
Expand Down
25 changes: 24 additions & 1 deletion src/DurableTask.Netherite/OrchestrationService/TestHooks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace DurableTask.Netherite
{
using System;
using System.Text;

/// <summary>
/// Hooks for attaching additional checkers and debuggers during testing.
Expand Down Expand Up @@ -44,7 +45,29 @@ internal void Error(string source, string message)

public override string ToString()
{
return $"TestHooks:{(this.CacheDebugger != null ? " CacheDebugger" : "")}{(this.ReplayChecker != null ? " ReplayChecker" : "")}{(this.FaultInjector != null ? " FaultInjector" : "")}";
StringBuilder sb = new StringBuilder();

sb.Append("TestHooks:");

if (this.CacheDebugger != null)
{
sb.Append(" CacheDebugger");
}
if (this.ReplayChecker != null)
{
sb.Append(" ReplayChecker");

}
if (this.FaultInjector != null)
{
sb.Append(" FaultInjector");
}
if (this.CheckpointInjector != null)
{
sb.Append(" CheckpointInjector");
}

return sb.ToString();
}
}
}
135 changes: 101 additions & 34 deletions src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public override void InitMainSession()
{
this.singletons = new TrackedObject[TrackedObjectKey.NumberSingletonTypes];
this.mainSession = this.CreateASession($"main-{this.RandomSuffix()}", false);
this.cacheTracker.MeasureCacheSize();
this.cacheTracker.MeasureCacheSize(true);
this.CheckInvariants();
}

Expand All @@ -201,7 +201,7 @@ public override void InitMainSession()
this.blobManager.TraceHelper.FasterProgress($"Recovering FasterKV");
await this.fht.RecoverAsync(this.partition.Settings.FasterTuningParameters?.NumPagesToPreload ?? 1, true, -1, this.terminationToken);
this.mainSession = this.CreateASession($"main-{this.RandomSuffix()}", false);
this.cacheTracker.MeasureCacheSize();
this.cacheTracker.MeasureCacheSize(true);
this.CheckInvariants();

return (this.blobManager.CheckpointInfo.CommitLogPosition, this.blobManager.CheckpointInfo.InputQueuePosition, this.blobManager.CheckpointInfo.InputQueueFingerprint);
Expand Down Expand Up @@ -305,9 +305,22 @@ void RunTask() {
await await tasktask;
}

public override Task FinalizeCheckpointCompletedAsync(Guid guid)
public override async Task FinalizeCheckpointCompletedAsync(Guid guid)
{
return this.blobManager.FinalizeCheckpointCompletedAsync();
await this.blobManager.FinalizeCheckpointCompletedAsync();

if (this.cacheDebugger == null)
{
// update the cache size tracker after each checkpoint, to compensate for inaccuracies in the tracking
try
{
this.cacheTracker.MeasureCacheSize(false);
}
catch (Exception e)
{
this.TraceHelper.FasterStorageError("Measuring CacheSize", e);
}
}
}

public override Guid? StartIndexCheckpoint()
Expand Down Expand Up @@ -987,14 +1000,30 @@ public void ValidateMemoryTracker()
return; // we only do this when the cache debugger is attached
}

long trackedSizeBefore = this.cacheTracker.TrackedObjectSize;

var inMemoryIterator = this.fht.Log.Scan(this.fht.Log.HeadAddress, this.fht.Log.TailAddress);
long trackedSizeBefore = 0;
long totalSize = 0;
Dictionary<TrackedObjectKey, List<(long delta, long address, string desc)>> perKey = null;

// we now scan the in-memory part of the log and compute the total size, and store, for each key, the list of records found
this.ScanMemorySection(Init, Iteration);

long totalSize = 0;
Dictionary<TrackedObjectKey, List<(long delta, long address, string desc)>> perKey = new Dictionary<TrackedObjectKey, List<(long delta, long address, string desc)>>();
// Resets the accumulators captured from the enclosing method; ScanMemorySection calls this
// at the start of every scan attempt, so a restarted scan does not count records twice.
void Init()
{
    // snapshot the tracker's size first so it can later be compared with the size read after the scan
    trackedSizeBefore = this.cacheTracker.TrackedObjectSize;

    perKey = new Dictionary<TrackedObjectKey, List<(long delta, long address, string desc)>>();
    totalSize = 0;
}

// Called for each record found by the scan: computes the record's estimated size and
// files it under its key, together with a short description used for diagnostics.
void Iteration(RecordInfo recordInfo, Key key, Value value, long currentAddress)
{
    // the key always counts; the value only counts when the record is not a tombstone
    long keySize = key.Val.EstimatedSize;
    long delta = recordInfo.Tombstone ? keySize : keySize + value.EstimatedSize;

    // description format: optional "I" (invalid) and "T" (tombstone) flags, then size@hex-address
    string desc = $"{(recordInfo.Invalid ? "I" : "")}{(recordInfo.Tombstone ? "T" : "")}{delta}@{currentAddress.ToString("x")}";

    Add(key, delta, currentAddress, desc);
}

void Add(TrackedObjectKey key, long delta, long address, string desc)
{
perKey.TryGetValue(key, out var current);
Expand All @@ -1006,16 +1035,6 @@ void Add(TrackedObjectKey key, long delta, long address, string desc)
totalSize += delta;
}

while (inMemoryIterator.GetNext(out RecordInfo recordInfo, out Key key, out Value value))
{
long delta = key.Val.EstimatedSize;
if (!recordInfo.Tombstone)
{
delta += value.EstimatedSize;
}
Add(key, delta, inMemoryIterator.CurrentAddress, $"{(recordInfo.Invalid ? "I" : "")}{(recordInfo.Tombstone ? "T" : "")}{delta}@{inMemoryIterator.CurrentAddress.ToString("x")}");
}

foreach (var k in this.cacheDebugger.Keys)
{
if (!perKey.ContainsKey(k))
Expand All @@ -1025,18 +1044,15 @@ void Add(TrackedObjectKey key, long delta, long address, string desc)
}

long trackedSizeAfter = this.cacheTracker.TrackedObjectSize;

bool sizeMatches = true;

// now we compare, for each key, the list of entries found in memory with what the cache debugger is tracking

foreach (var kvp in perKey)
{
sizeMatches = sizeMatches && this.cacheDebugger.CheckSize(kvp.Key, kvp.Value, this.Log.HeadAddress);
}

// if the records matched for each key, then the total size should also match

if (sizeMatches && trackedSizeBefore == trackedSizeAfter && trackedSizeBefore != totalSize)
{
this.cacheDebugger.Fail("total size of tracked objects does not match");
Expand All @@ -1045,25 +1061,61 @@ void Add(TrackedObjectKey key, long delta, long address, string desc)

readonly static List<(long delta, long address, string desc)> emptyList = new List<(long delta, long address, string desc)>();

internal (int numPages, long size) ComputeMemorySize(bool resetCacheDebugger)
/// <summary>
/// Scans the section of the log between the current head address and the current tail address,
/// invoking <paramref name="iteration"/> for every record the iterator returns.
/// </summary>
/// <remarks>
/// The scan is restarted (and <paramref name="init"/> is invoked again) when the iterator fails with
/// "Iterator address is less than log BeginAddress", or when the head address has advanced while
/// the scan was running. Both kinds of restart draw from the same <paramref name="retries"/> budget.
/// Once the budget is used up, a matching iterator exception propagates to the caller, and a scan
/// during which the head address advanced is accepted as-is instead of being repeated indefinitely.
/// </remarks>
/// <param name="init">Resets the caller's accumulators. Called once per scan attempt, after the iterator has been created.</param>
/// <param name="iteration">Receives the record info, key, value, and address of each scanned record.</param>
/// <param name="retries">The maximum number of times the scan may be restarted.</param>
internal void ScanMemorySection(Action init, Action<RecordInfo, Key, Value, long> iteration, int retries = 3)
{
    // An explicit loop (instead of recursion) guarantees that each restart is counted exactly once
    // and that a failed attempt is never followed by an additional, redundant rescan.
    while (true)
    {
        // re-read the head address for every attempt, so the check below compares against
        // the address this particular attempt started from
        var headAddress = this.fht.Log.HeadAddress;

        try
        {
            using var inMemoryIterator = this.fht.Log.Scan(headAddress, this.fht.Log.TailAddress);
            init();
            while (inMemoryIterator.GetNext(out RecordInfo recordInfo, out Key key, out Value value))
            {
                iteration(recordInfo, key, value, inMemoryIterator.CurrentAddress);
            }
        }
        catch (FASTER.core.FasterException e) when (retries > 0 && e.Message.StartsWith("Iterator address is less than log BeginAddress", StringComparison.Ordinal))
        {
            // the start address fell out of range while we were scanning; start over
            retries--;
            continue;
        }

        if (retries > 0 && this.fht.Log.HeadAddress > headAddress)
        {
            // the head address advanced during the scan, so the scanned range is stale; start over
            retries--;
            continue;
        }

        return;
    }
}

internal (int numPages, long size, long numRecords) ComputeMemorySize(bool updateCacheDebugger)
{
var cacheDebugger = resetCacheDebugger ? this.cacheDebugger : null;
cacheDebugger?.Reset((string instanceId) => this.partition.PartitionFunction(instanceId) == this.partition.PartitionId);
var inMemoryIterator = this.fht.Log.Scan(this.fht.Log.HeadAddress, this.fht.Log.TailAddress);
long totalSize = 0;
long firstPage = this.fht.Log.HeadAddress >> this.storelogsettings.PageSizeBits;
while (inMemoryIterator.GetNext(out RecordInfo recordInfo, out Key key, out Value value))
long firstPage = 0;
long numRecords = 0;
var cacheDebugger = updateCacheDebugger ? this.cacheDebugger : null;

// Resets the accumulators captured from the enclosing method; ScanMemorySection calls this
// at the start of every scan attempt, so a restarted scan begins from a clean slate.
void Init()
{
    (totalSize, numRecords) = (0, 0);

    // page number of the current head address
    firstPage = this.fht.Log.HeadAddress >> this.storelogsettings.PageSizeBits;

    // when a cache debugger is being updated, reset it with a filter that accepts
    // only the instance ids that map to this partition
    cacheDebugger?.Reset((string id) => this.partition.PartitionFunction(id) == this.partition.PartitionId);
}

// Called for each record found by the scan: adds the record's estimated size to the running
// totals and, when a cache debugger is being updated, reports the size for the record's key.
void Iteration(RecordInfo recordInfo, Key key, Value value, long currentAddress)
{
    // the key always counts; the value only counts when the record is not a tombstone
    long keySize = key.Val.EstimatedSize;
    long recordSize = recordInfo.Tombstone ? keySize : keySize + value.EstimatedSize;

    numRecords++;
    totalSize += recordSize;
    cacheDebugger?.UpdateSize(key, recordSize);
}

this.ScanMemorySection(Init, Iteration);

long lastPage = this.fht.Log.TailAddress >> this.storelogsettings.PageSizeBits;
return ((int) (lastPage-firstPage) + 1, totalSize);
return ((int) (lastPage-firstPage) + 1, totalSize, numRecords);
}

public void SetEmptyPageCount(int emptyPageCount)
Expand Down Expand Up @@ -1294,7 +1346,7 @@ public Functions(Partition partition, FasterKV store, MemoryTracker.CacheTracker
this.store = store;
this.stats = store.StoreStats;
this.cacheDebugger = partition.Settings.TestHooks?.CacheDebugger;
this.cacheTracker = isScan ? null : cacheTracker;
this.cacheTracker = cacheTracker;
this.isScan = isScan;
}

Expand Down Expand Up @@ -1508,6 +1560,11 @@ bool IFunctions<Key, Value, EffectTracker, Output, object>.SingleWriter(ref Key

case WriteReason.Compaction:
this.cacheDebugger?.Record(key.Val, CacheDebugger.CacheEvent.SingleWriterCompaction, src.Version, default, info.Address);
this.cacheTracker.UpdateTrackedObjectSize(key.Val.EstimatedSize + src.EstimatedSize, key, info.Address);
break;

default:
this.cacheDebugger?.Fail("Invalid WriteReason in SingleWriter", key);
break;
}
dst.Val = output.Val ?? src.Val;
Expand Down Expand Up @@ -1535,15 +1592,20 @@ void IFunctions<Key, Value, EffectTracker, Output, object>.PostSingleWriter(ref

case WriteReason.CopyToTail:
this.cacheDebugger?.Record(key.Val, CacheDebugger.CacheEvent.PostSingleWriterCopyToTail, src.Version, default, info.Address);
if (!this.isScan)
{
this.cacheTracker.UpdateTrackedObjectSize(key.Val.EstimatedSize + dst.EstimatedSize, key, info.Address);
}
break;

case WriteReason.Compaction:
this.cacheDebugger?.Record(key.Val, CacheDebugger.CacheEvent.PostSingleWriterCompaction, src.Version, default, info.Address);
this.cacheDebugger?.Fail("Do not expect PostSingleWriter-Compaction", key);
break;

default:
this.cacheDebugger?.Fail("Invalid WriteReason in PostSingleWriter", key);
break;
}
if (!this.isScan)
{
this.cacheTracker.UpdateTrackedObjectSize(key.Val.EstimatedSize + dst.EstimatedSize, key, info.Address);
}
}

Expand All @@ -1559,6 +1621,7 @@ void IFunctions<Key, Value, EffectTracker, Output, object>.PostSingleDeleter(ref
if (!this.isScan)
{
this.cacheTracker.UpdateTrackedObjectSize(key.Val.EstimatedSize, key, info.Address);
this.cacheDebugger?.UpdateReferenceValue(ref key.Val, null, 0);
}
}

Expand All @@ -1580,9 +1643,13 @@ bool IFunctions<Key, Value, EffectTracker, Output, object>.ConcurrentDeleter(ref
if (!this.isScan)
{
long removed = value.EstimatedSize;

// If record is marked invalid (failed to insert), dispose key as well
if (info.RecordInfo.Invalid)
{
removed += key.Val.EstimatedSize;
}

this.cacheTracker.UpdateTrackedObjectSize(-removed, key, info.Address);
this.cacheDebugger?.UpdateReferenceValue(ref key.Val, null, 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ public void FasterStorageError(string context, Exception exception)
}
}

/// <summary>
/// Reports the result of a cache size measurement to both the logger and the ETW source.
/// Nothing is emitted when the configured log level limit is above <see cref="LogLevel.Information"/>.
/// All arguments are forwarded unchanged to the log message and to the ETW event.
/// </summary>
public void FasterCacheSizeMeasured(int numPages, long numRecords, long sizeInBytes, long discrepancy, double elapsedMs)
{
    if (this.logLevelLimit > LogLevel.Information)
    {
        return; // informational tracing is disabled
    }

    this.logger.LogInformation(
        "Part{partition:D2} Measured CacheSize numPages={numPages} numRecords={numRecords} sizeInBytes={sizeInBytes} discrepancy={discrepancy} elapsedMs={elapsedMs:F2}",
        this.partitionId,
        numPages,
        numRecords,
        sizeInBytes,
        discrepancy,
        elapsedMs);

    EtwSource.Log.FasterCacheSizeMeasured(
        this.account,
        this.taskHub,
        this.partitionId,
        numPages,
        numRecords,
        sizeInBytes,
        discrepancy,
        elapsedMs,
        TraceUtils.AppName,
        TraceUtils.ExtensionVersion);
}

public void FasterProgress(string details)
{
if (this.logLevelLimit <= LogLevel.Debug)
Expand Down
19 changes: 13 additions & 6 deletions src/DurableTask.Netherite/StorageProviders/Faster/MemoryTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void UpdateTargetSizes()
(int totalPages, long totalSize) = (0, 0);
foreach(var store in this.stores.Values)
{
(int numPages, long size) = store.ComputeMemorySize();
(int numPages, long size, long numRecords) = store.ComputeMemorySize();
totalPages += numPages;
totalSize += size;
}
Expand Down Expand Up @@ -103,17 +103,24 @@ public void Dispose()
}
}

public void MeasureCacheSize()
public void MeasureCacheSize(bool isFirstCall)
{
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
(int numPages, long size) = this.store.ComputeMemorySize(true);
double MB(long bytes) => (double)bytes / (1024 * 1024);
this.store.TraceHelper.FasterProgress($"CacheSize: numPages={numPages} objectSize={MB(size):F2}MB totalSize={MB(size + this.store.MemoryUsedWithoutObjects):F2}MB elapsedMs={stopwatch.Elapsed.TotalMilliseconds:F2}");
(int numPages, long size, long numRecords) = this.store.ComputeMemorySize(updateCacheDebugger: true);
stopwatch.Stop();

this.store.TraceHelper.FasterCacheSizeMeasured(
numPages,
numRecords,
sizeInBytes: size,
discrepancy: isFirstCall ? 0 : size - this.trackedObjectSize,
stopwatch.Elapsed.TotalMilliseconds);

this.trackedObjectSize = size;
}

public (int, long) ComputeMemorySize() => this.store.ComputeMemorySize(false); // used by tests only
public (int numPages, long size, long numRecords) ComputeMemorySize() => this.store.ComputeMemorySize(updateCacheDebugger: false); // used by tests only

internal void SetEmptyPageCount(int emptyPageCount) => this.store.SetEmptyPageCount(emptyPageCount); // used by tests only

Expand Down
7 changes: 7 additions & 0 deletions src/DurableTask.Netherite/Tracing/EtwSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,13 @@ public void FasterStorageError(string Account, string TaskHub, int PartitionId,
this.WriteEvent(256, Account, TaskHub, PartitionId, Context, Details, AppName, ExtensionVersion);
}

/// <summary>
/// ETW event 257 (informational level, version 1): records the values of a cache size measurement
/// for the given account, task hub, and partition.
/// </summary>
/// <remarks>
/// The arguments are passed to WriteEvent in the same order as they appear in the method signature,
/// and the event id passed to WriteEvent matches the id declared in the Event attribute.
/// </remarks>
[Event(257, Level = EventLevel.Informational, Version = 1)]
public void FasterCacheSizeMeasured(string Account, string TaskHub, int PartitionId, int NumPages, long NumRecords, long SizeInBytes, long Discrepancy, double ElapsedMs, string AppName, string ExtensionVersion)
{
// NOTE(review): presumably sets the current thread's activity id so the event is correlated with this service instance — confirm against SetCurrentThreadActivityId
SetCurrentThreadActivityId(serviceInstanceId);
this.WriteEvent(257, Account, TaskHub, PartitionId, NumPages, NumRecords, SizeInBytes, Discrepancy, ElapsedMs, AppName, ExtensionVersion);
}

[Event(259, Level = EventLevel.Verbose, Version = 1)]
public void FasterProgress(string Account, string TaskHub, int PartitionId, string Details, string AppName, string ExtensionVersion)
{
Expand Down