Skip to content

Commit

Permalink
[C#] Fix double dispose in CompletePendingWithOutputs (#465)
Browse files Browse the repository at this point in the history
* [C#] Fix double dispose in CompletePendingWithOutputs
* Update MultiReadSpanByteKey.cs
* Cleaned up testcase, merged with other SpanByte test.

Co-authored-by: Tim Coleman <timothy.coleman@gmail.com>
  • Loading branch information
badrishc and timothycoleman authored May 3, 2021
1 parent 3a12bb3 commit 660d1c9
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 23 deletions.
10 changes: 4 additions & 6 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,10 @@ internal void InternalCompletePendingRequest<Input, Output, Context, FasterSessi
// Remove from pending dictionary
opCtx.ioPendingRequests.Remove(request.id);
var status = InternalCompletePendingRequestFromContext(opCtx, currentCtx, fasterSession, request, ref pendingContext, false, out _);
if (completedOutputs is { })
{
if (status == Status.OK || status == Status.NOTFOUND)
completedOutputs.Add(ref pendingContext, status);
}
pendingContext.Dispose();
if (completedOutputs is { } && (status == Status.OK || status == Status.NOTFOUND))
completedOutputs.Add(ref pendingContext, status);
else
pendingContext.Dispose();
}
}

Expand Down
82 changes: 65 additions & 17 deletions cs/test/SpanByteTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@
// Licensed under the MIT license.

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using FASTER.core;
using System.IO;
using NUnit.Framework;
using System.Runtime.InteropServices;

namespace FASTER.test
{
Expand All @@ -25,13 +20,10 @@ public unsafe void SpanByteTest1()
Span<byte> output = stackalloc byte[20];
SpanByte input = default;

FasterKV<SpanByte, SpanByte> fht;
IDevice log;
log = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "/hlog1.log", deleteOnClose: true);
fht = new FasterKV<SpanByte, SpanByte>
using var log = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "/hlog1.log", deleteOnClose: true);
using var fht = new FasterKV<SpanByte, SpanByte>
(128, new LogSettings { LogDevice = log, MemorySizeBits = 17, PageSizeBits = 12 });

var s = fht.NewSession(new SpanByteFunctions<Empty>());
using var s = fht.NewSession(new SpanByteFunctions<Empty>());

var key1 = MemoryMarshal.Cast<char, byte>("key1".AsSpan());
var value1 = MemoryMarshal.Cast<char, byte>("value1".AsSpan());
Expand All @@ -54,12 +46,68 @@ public unsafe void SpanByteTest1()

Assert.IsTrue(!output2.IsSpanByte);
Assert.IsTrue(output2.Memory.Memory.Span.Slice(0, output2.Length).SequenceEqual(value2));
}

[Test]
[Category("FasterKV")]
public unsafe void MultiReadSpanByteKeyTest()
{
using var log = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "/MultiReadSpanByteKeyTest.log", deleteOnClose: true);
using var fht = new FasterKV<SpanByte, long>(
size: 1L << 20,
new LogSettings { LogDevice = log, MemorySizeBits = 15, PageSizeBits = 12 });
using var session = fht.For(new MultiReadSpanByteKeyTestFunctions()).NewSession<MultiReadSpanByteKeyTestFunctions>();

for (int i = 0; i < 3000; i++)
{
var key = MemoryMarshal.Cast<char, byte>($"{i}".AsSpan());
fixed (byte* _ = key)
session.Upsert(SpanByte.FromFixedSpan(key), i);
}

// Evict all records to disk
fht.Log.FlushAndEvict(true);

s.Dispose();
fht.Dispose();
fht = null;
log.Dispose();
}
for (long key = 0; key < 50; key++)
{
// read each key multiple times
for (int i = 0; i < 10; i++)
Assert.AreEqual(key, ReadKey($"{key}"));
}

long ReadKey(string keyString)
{
Status status;

var key = MemoryMarshal.Cast<char, byte>(keyString.AsSpan());
fixed (byte* _ = key)
status = session.Read(key: SpanByte.FromFixedSpan(key), out var unused);

// All keys need to be fetched from disk
Assert.AreEqual(Status.PENDING, status);

session.CompletePendingWithOutputs(out var completedOutputs, wait: true);

var count = 0;
var value = 0L;
using (completedOutputs)
{
while (completedOutputs.Next())
{
count++;
Assert.AreEqual(Status.OK, completedOutputs.Current.Status);
value = completedOutputs.Current.Output;
}
}
Assert.AreEqual(1, count);
return value;
}
}

class MultiReadSpanByteKeyTestFunctions : FunctionsBase<SpanByte, long, long, long, Empty>
{
public override void SingleReader(ref SpanByte key, ref long input, ref long value, ref long dst) => dst = value;
public override void ConcurrentReader(ref SpanByte key, ref long input, ref long value, ref long dst) => dst = value;
}
}
}

0 comments on commit 660d1c9

Please sign in to comment.