diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs index 619b493f3..ad50b9ef7 100644 --- a/cs/src/core/Index/FASTER/FASTERThread.cs +++ b/cs/src/core/Index/FASTER/FASTERThread.cs @@ -329,12 +329,10 @@ internal void InternalCompletePendingRequest output = stackalloc byte[20]; SpanByte input = default; - FasterKV fht; - IDevice log; - log = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "/hlog1.log", deleteOnClose: true); - fht = new FasterKV + using var log = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "/hlog1.log", deleteOnClose: true); + using var fht = new FasterKV (128, new LogSettings { LogDevice = log, MemorySizeBits = 17, PageSizeBits = 12 }); - - var s = fht.NewSession(new SpanByteFunctions()); + using var s = fht.NewSession(new SpanByteFunctions()); var key1 = MemoryMarshal.Cast("key1".AsSpan()); var value1 = MemoryMarshal.Cast("value1".AsSpan()); @@ -54,12 +46,68 @@ public unsafe void SpanByteTest1() Assert.IsTrue(!output2.IsSpanByte); Assert.IsTrue(output2.Memory.Memory.Span.Slice(0, output2.Length).SequenceEqual(value2)); + } + + [Test] + [Category("FasterKV")] + public unsafe void MultiReadSpanByteKeyTest() + { + using var log = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "/MultiReadSpanByteKeyTest.log", deleteOnClose: true); + using var fht = new FasterKV( + size: 1L << 20, + new LogSettings { LogDevice = log, MemorySizeBits = 15, PageSizeBits = 12 }); + using var session = fht.For(new MultiReadSpanByteKeyTestFunctions()).NewSession(); + + for (int i = 0; i < 3000; i++) + { + var key = MemoryMarshal.Cast($"{i}".AsSpan()); + fixed (byte* _ = key) + session.Upsert(SpanByte.FromFixedSpan(key), i); + } + // Evict all records to disk + fht.Log.FlushAndEvict(true); - s.Dispose(); - fht.Dispose(); - fht = null; - log.Dispose(); - } + for (long key = 0; key < 50; key++) + { + // read each key multiple times + for (int i = 0; i < 10; i++) + Assert.AreEqual(key, ReadKey($"{key}")); + } + + long ReadKey(string keyString) + { + Status status; + + var key = MemoryMarshal.Cast(keyString.AsSpan()); + fixed (byte* _ = key) + status = session.Read(key: SpanByte.FromFixedSpan(key), out var unused); + + // All keys need to be fetched from disk + Assert.AreEqual(Status.PENDING, status); + + session.CompletePendingWithOutputs(out var completedOutputs, wait: true); + + var count = 0; + var value = 0L; + using (completedOutputs) + { + while (completedOutputs.Next()) + { + count++; + Assert.AreEqual(Status.OK, completedOutputs.Current.Status); + value = completedOutputs.Current.Output; + } + } + Assert.AreEqual(1, count); + return value; + } + } + + class MultiReadSpanByteKeyTestFunctions : FunctionsBase + { + public override void SingleReader(ref SpanByte key, ref long input, ref long value, ref long dst) => dst = value; + public override void ConcurrentReader(ref SpanByte key, ref long input, ref long value, ref long dst) => dst = value; + } } }