Skip to content

Commit

Permalink
Merge branch 'dev' into pr/measure-cache-size-periodically
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs
  • Loading branch information
sebastianburckhardt committed Jun 21, 2022
2 parents 591fa00 + 01fa680 commit e25e26e
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.9.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.7.1" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.10.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.7.2" />
<PackageReference Include="Microsoft.Azure.Functions.Extensions" Version="1.1.0" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ abstract class TrackedObject
[IgnoreDataMember]
public abstract TrackedObjectKey Key { get; }

/// <summary>
/// The current value in serialized form, or null
/// </summary>
[IgnoreDataMember]
internal byte[] SerializationCache { get; set; }

[DataMember]
public int Version { get; set; } // we use this validate consistency of read/write updates in FASTER, it is not otherwise needed

Expand Down
4 changes: 2 additions & 2 deletions src/DurableTask.Netherite/DurableTask.Netherite.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
<PackageReference Include="Microsoft.Azure.Cosmos.Table" Version="1.0.8" />
<PackageReference Include="Microsoft.Azure.EventHubs.Processor" Version="4.3.2" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.2.3" />
<PackageReference Include="Microsoft.FASTER.Core" Version="2.0.3" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.9.1" />
<PackageReference Include="Microsoft.FASTER.Core" Version="2.0.11" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.10.0" />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="System.Threading.Channels" Version="4.7.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ internal void StartStoreCheckpoint(long commitLogPosition, long inputQueuePositi
{
// update the positions
var dedupState = this.cache[TrackedObjectKey.Dedup];
dedupState.TrackedObject.SerializationCache = null;
((DedupState)dedupState.TrackedObject).Positions = (commitLogPosition, inputQueuePosition);
if (!dedupState.Modified)
{
Expand All @@ -199,14 +198,14 @@ internal void StartStoreCheckpoint(long commitLogPosition, long inputQueuePositi
var toWrite = new List<ToWrite>();
foreach (var cacheEntry in this.modified)
{
Serializer.SerializeTrackedObject(cacheEntry.TrackedObject);
byte[] bytes = Serializer.SerializeTrackedObject(cacheEntry.TrackedObject);
toWrite.Add(new ToWrite()
{
Key = cacheEntry.TrackedObject.Key,
PreviousValue = cacheEntry.LastCheckpointed,
NewValue = cacheEntry.TrackedObject.SerializationCache,
NewValue = bytes,
});
cacheEntry.LastCheckpointed = cacheEntry.TrackedObject.SerializationCache;
cacheEntry.LastCheckpointed = bytes;
cacheEntry.Modified = false;
}
this.modified.Clear();
Expand Down Expand Up @@ -377,7 +376,6 @@ public override async ValueTask ProcessEffectOnTrackedObject(FasterKV.Key key, E
this.cache.Add(key, cacheEntry);
}
var trackedObject = cacheEntry.TrackedObject;
trackedObject.SerializationCache = null;
effectTracker.ProcessEffectOn(trackedObject);
if (!cacheEntry.Modified)
{
Expand Down
29 changes: 19 additions & 10 deletions src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,14 @@ void Add(TrackedObjectKey key, long delta, long address, string desc)
totalSize += delta;
}

foreach (var k in this.cacheDebugger.Keys)
{
if (!perKey.ContainsKey(k))
{
perKey.Add(k, emptyList); // for keys that were not found in memory, the list of records is empty
}
}

foreach (var k in this.cacheDebugger.Keys)
{
if (!perKey.ContainsKey(k))
Expand All @@ -1052,6 +1060,8 @@ void Add(TrackedObjectKey key, long delta, long address, string desc)
sizeMatches = sizeMatches && this.cacheDebugger.CheckSize(kvp.Key, kvp.Value, this.Log.HeadAddress);
}

// if the records matched for each key, then the total size should also match

if (sizeMatches && trackedSizeBefore == trackedSizeAfter && trackedSizeBefore != totalSize)
{
this.cacheDebugger.Fail("total size of tracked objects does not match");
Expand Down Expand Up @@ -1082,6 +1092,8 @@ internal void ScanMemorySection(Action init, Action<RecordInfo, Key, Value, long
}
}

readonly static List<(long delta, long address, string desc)> emptyList = new List<(long delta, long address, string desc)>();

internal (int numPages, long size, long numRecords) ComputeMemorySize(bool updateCacheDebugger)
{
long totalSize = 0;
Expand Down Expand Up @@ -1291,10 +1303,10 @@ public override void Serialize(ref Value obj)
else
{
TrackedObject trackedObject = (TrackedObject) obj.Val;
DurableTask.Netherite.Serializer.SerializeTrackedObject(trackedObject);
var bytes = DurableTask.Netherite.Serializer.SerializeTrackedObject(trackedObject);
this.storeStats.Serialize++;
this.writer.Write(trackedObject.SerializationCache.Length);
this.writer.Write(trackedObject.SerializationCache);
this.writer.Write(bytes.Length);
this.writer.Write(bytes);
this.cacheDebugger?.Record(trackedObject.Key, CacheDebugger.CacheEvent.SerializeObject, obj.Version, null, 0);
}
}
Expand Down Expand Up @@ -1405,7 +1417,6 @@ bool IFunctions<Key, Value, EffectTracker, Output, object>.InPlaceUpdater(ref Ke
value.Val = trackedObject;
this.cacheDebugger?.Record(trackedObject.Key, CacheDebugger.CacheEvent.DeserializeObject, value.Version, tracker.CurrentEventId, 0);
}
trackedObject.SerializationCache = null; // cache is invalidated because of update
trackedObject.Partition = this.partition;
this.cacheDebugger?.CheckVersionConsistency(key.Val, trackedObject, value.Version);
tracker.ProcessEffectOn(trackedObject);
Expand All @@ -1431,10 +1442,10 @@ bool IFunctions<Key, Value, EffectTracker, Output, object>.CopyUpdater(ref Key k
{
// replace old object with its serialized snapshot
long oldValueSizeBefore = oldValue.EstimatedSize;
DurableTask.Netherite.Serializer.SerializeTrackedObject(trackedObject);
var bytes = DurableTask.Netherite.Serializer.SerializeTrackedObject(trackedObject);
this.stats.Serialize++;
this.cacheDebugger?.Record(trackedObject.Key, CacheDebugger.CacheEvent.SerializeObject, oldValue.Version, null, 0);
oldValue.Val = trackedObject.SerializationCache;
oldValue.Val = bytes;
this.cacheTracker.UpdateTrackedObjectSize(oldValue.EstimatedSize - oldValueSizeBefore, key, null); // null indicates we don't know the address
this.stats.Copy++;
}
Expand All @@ -1450,7 +1461,6 @@ bool IFunctions<Key, Value, EffectTracker, Output, object>.CopyUpdater(ref Key k
}

newValue.Val = trackedObject;
trackedObject.SerializationCache = null; // cache is invalidated by the update which is happening below
this.cacheDebugger?.CheckVersionConsistency(key.Val, trackedObject, oldValue.Version);
tracker.ProcessEffectOn(trackedObject);
newValue.Version = oldValue.Version + 1;
Expand Down Expand Up @@ -1491,12 +1501,11 @@ bool IFunctions<Key, Value, EffectTracker, Output, object>.SingleReader(ref Key
{
if (!this.isScan)
{
// replace src with a serialized snapshot of the object, so it does not get mutated
// replace src with a serialized snapshot of the object - it is now read-only since we did a copy-read-to-tail
long oldValueSizeBefore = src.EstimatedSize;
DurableTask.Netherite.Serializer.SerializeTrackedObject(trackedObject);
src.Val = DurableTask.Netherite.Serializer.SerializeTrackedObject(trackedObject);
this.stats.Serialize++;
this.cacheDebugger?.Record(trackedObject.Key, CacheDebugger.CacheEvent.SerializeObject, src.Version, null, 0);
src.Val = trackedObject.SerializationCache;
this.cacheTracker.UpdateTrackedObjectSize(src.EstimatedSize - oldValueSizeBefore, key, readInfo.Address);
this.stats.Copy++;
}
Expand Down
12 changes: 4 additions & 8 deletions src/DurableTask.Netherite/Util/Serializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,17 @@ public static Event DeserializeEvent(Stream stream)
return (Event)eventSerializer.ReadObject(stream);
}

public static void SerializeTrackedObject(TrackedObject trackedObject)
public static byte[] SerializeTrackedObject(TrackedObject trackedObject)
{
if (trackedObject.SerializationCache == null)
{
var stream = new MemoryStream();
trackedObjectSerializer.WriteObject(stream, trackedObject);
trackedObject.SerializationCache = stream.ToArray();
}
var stream = new MemoryStream();
trackedObjectSerializer.WriteObject(stream, trackedObject);
return stream.ToArray();
}

public static TrackedObject DeserializeTrackedObject(byte[] bytes)
{
var stream = new MemoryStream(bytes);
var result = (TrackedObject)trackedObjectSerializer.ReadObject(stream);
result.SerializationCache = bytes;
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<ItemGroup>
<ProjectReference Include="..\..\src\DurableTask.Netherite.AzureFunctions\DurableTask.Netherite.AzureFunctions.csproj" />
<ProjectReference Include="..\DurableTask.Netherite.Tests\DurableTask.Netherite.Tests.csproj" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.9.1" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.10.0" />
</ItemGroup>

</Project>
2 changes: 1 addition & 1 deletion test/LoadGeneratorApp/LoadGeneratorApp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="3.0.13" />
<PackageReference Include="Microsoft.Azure.DurableTask.Netherite.AzureFunctions" Version="1.0.3" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.7.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.7.2" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
</ItemGroup>
<ItemGroup>
Expand Down
4 changes: 2 additions & 2 deletions test/PerformanceTests/PerformanceTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
<ItemGroup>
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.7.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions" Version="4.0.1" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.9.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.7.1" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.10.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.7.2" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.EventHubs" Version="4.2.0" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="3.0.13" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="4.0.4" />
Expand Down

0 comments on commit e25e26e

Please sign in to comment.