Skip to content

Commit

Permalink
[C#] Expand async session-free API sample in playground (#452)
Browse files Browse the repository at this point in the history
* added sample
* cleaned up async stress
* Fix memory leak + add serialized wrapper sample
* added testcases for concurrency within session in async.
* clean up LMD
  • Loading branch information
badrishc authored Apr 22, 2021
1 parent 9dcb777 commit a328fcb
Show file tree
Hide file tree
Showing 10 changed files with 555 additions and 132 deletions.
1 change: 1 addition & 0 deletions cs/playground/AsyncStress/AsyncStress.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MessagePack" Version="2.2.85" />
<PackageReference Include="xunit.assert" Version="2.4.1" />
</ItemGroup>

Expand Down
71 changes: 28 additions & 43 deletions cs/playground/AsyncStress/FasterWrapper.cs
Original file line number Diff line number Diff line change
@@ -1,58 +1,59 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using FASTER.core;
using Xunit;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using FASTER.core;

namespace AsyncStress
{
public class FasterWrapper<Key, Value>
public class FasterWrapper<Key, Value> : IFasterWrapper<Key, Value>
{
readonly FasterKV<Key, Value> _store;
readonly AsyncPool<ClientSession<Key, Value, Value, Value, Empty, SimpleFunctions<Key, Value, Empty>>> _sessionPool;

// OS Buffering is safe to use in this app because Reads are done after all updates
internal static bool useOsReadBuffering = false;
readonly bool useOsReadBuffering;
int upsertPendingCount = 0;

public FasterWrapper()
// This can be used to verify the same amount data is loaded.
public long TailAddress => _store.Log.TailAddress;

// Indicates how many upsert operations went pending
public int UpsertPendingCount { get => upsertPendingCount; set => upsertPendingCount = value; }
// Whether OS Read buffering is enabled
public bool UseOsReadBuffering => useOsReadBuffering;

public FasterWrapper(bool useOsReadBuffering = false)
{
var logDirectory ="d:/FasterLogs";
var logDirectory = "d:/AsyncStress";
var logFileName = Guid.NewGuid().ToString();
var logSettings = new LogSettings
{
LogDevice = new ManagedLocalStorageDevice(Path.Combine(logDirectory, $"{logFileName}.log"), deleteOnClose: true, osReadBuffering: useOsReadBuffering),
ObjectLogDevice = new ManagedLocalStorageDevice(Path.Combine(logDirectory, $"{logFileName}.log"), deleteOnClose: true, osReadBuffering: useOsReadBuffering),
ObjectLogDevice = new ManagedLocalStorageDevice(Path.Combine(logDirectory, $"{logFileName}.obj.log"), deleteOnClose: true, osReadBuffering: useOsReadBuffering),
PageSizeBits = 12,
MemorySizeBits = 13
};

Console.WriteLine($" Using {logSettings.LogDevice.GetType()}");

this.useOsReadBuffering = useOsReadBuffering;
_store = new FasterKV<Key, Value>(1L << 20, logSettings);
_sessionPool = new AsyncPool<ClientSession<Key, Value, Value, Value, Empty, SimpleFunctions<Key, Value, Empty>>>(
logSettings.LogDevice.ThrottleLimit,
() => _store.For(new SimpleFunctions<Key, Value, Empty>()).NewSession<SimpleFunctions<Key, Value, Empty>>());
}

// This can be used to verify the same amount data is loaded.
public long TailAddress => _store.Log.TailAddress;

// Indicates how many operations went pending
public int UpsertPendingCount = 0;
public int ReadPendingCount = 0;

public async ValueTask UpsertAsync(Key key, Value value)
{
if (!_sessionPool.TryGet(out var session))
session = await _sessionPool.GetAsync();
var r = await session.UpsertAsync(key, value);
while (r.Status == Status.PENDING)
{
Interlocked.Increment(ref UpsertPendingCount);
Interlocked.Increment(ref upsertPendingCount);
r = await r.CompleteAsync();
}
_sessionPool.Return(session);
Expand All @@ -63,26 +64,21 @@ public void Upsert(Key key, Value value)
if (!_sessionPool.TryGet(out var session))
session = _sessionPool.GetAsync().GetAwaiter().GetResult();
var status = session.Upsert(key, value);
if (status == Status.PENDING)
{
// This should not happen for sync Upsert().
Interlocked.Increment(ref UpsertPendingCount);
session.CompletePending();
}
Assert.True(status != Status.PENDING);
_sessionPool.Return(session);
}

public async ValueTask UpsertChunkAsync((Key, Value)[] chunk)
public async ValueTask UpsertChunkAsync((Key, Value)[] chunk, int offset, int count)
{
if (!_sessionPool.TryGet(out var session))
session = _sessionPool.GetAsync().GetAwaiter().GetResult();

for (var ii = 0; ii < chunk.Length; ++ii)
for (var i = 0; i < count; ++i)
{
var r = await session.UpsertAsync(chunk[ii].Item1, chunk[ii].Item2);
var r = await session.UpsertAsync(chunk[offset + i].Item1, chunk[offset + i].Item2);
while (r.Status == Status.PENDING)
{
Interlocked.Increment(ref UpsertPendingCount);
Interlocked.Increment(ref upsertPendingCount);
r = await r.CompleteAsync();
}
}
Expand All @@ -105,7 +101,6 @@ public async ValueTask UpsertChunkAsync((Key, Value)[] chunk)
var result = session.Read(key);
if (result.status == Status.PENDING)
{
Interlocked.Increment(ref ReadPendingCount);
session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
int count = 0;
for (; completedOutputs.Next(); ++count)
Expand All @@ -120,26 +115,16 @@ public async ValueTask UpsertChunkAsync((Key, Value)[] chunk)
return new ValueTask<(Status, Value)>(result);
}

public async ValueTask ReadChunkAsync(Key[] chunk, ValueTask<(Status, Value)>[] results, int offset)
public async ValueTask<(Status, Value)[]> ReadChunkAsync(Key[] chunk, int offset, int count)
{
if (!_sessionPool.TryGet(out var session))
session = _sessionPool.GetAsync().GetAwaiter().GetResult();

// Reads in chunk are performed serially
for (var ii = 0; ii < chunk.Length; ++ii)
results[offset + ii] = new ValueTask<(Status, Value)>((await session.ReadAsync(chunk[ii])).Complete());
_sessionPool.Return(session);
}
(Status, Value)[] result = new (Status, Value)[count];
for (var i = 0; i < count; ++i)
result[i] = (await session.ReadAsync(chunk[offset + i]).ConfigureAwait(false)).Complete();

public async ValueTask<(Status, Value)[]> ReadChunkAsync(Key[] chunk)
{
if (!_sessionPool.TryGet(out var session))
session = _sessionPool.GetAsync().GetAwaiter().GetResult();

// Reads in chunk are performed serially
(Status, Value)[] result = new (Status, Value)[chunk.Length];
for (var ii = 0; ii < chunk.Length; ++ii)
result[ii] = (await session.ReadAsync(chunk[ii]).ConfigureAwait(false)).Complete();
_sessionPool.Return(session);
return result;
}
Expand Down
20 changes: 20 additions & 0 deletions cs/playground/AsyncStress/IFasterWrapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using FASTER.core;
using System.Threading.Tasks;

namespace AsyncStress
{
public interface IFasterWrapper<Key, Value>
{
long TailAddress { get; }
int UpsertPendingCount { get; set; }
bool UseOsReadBuffering { get; }

void Dispose();
ValueTask<(Status, Value)> Read(Key key);
ValueTask<(Status, Value)> ReadAsync(Key key);
ValueTask<(Status, Value)[]> ReadChunkAsync(Key[] chunk, int offset, int count);
void Upsert(Key key, Value value);
ValueTask UpsertAsync(Key key, Value value);
ValueTask UpsertChunkAsync((Key, Value)[] chunk, int offset, int count);
}
}
Loading

0 comments on commit a328fcb

Please sign in to comment.