Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] SpanByteFasterKVProvider #520

Merged
merged 3 commits into from
Jul 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cs/remote/samples/VarLenServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ static void VarLenServer(string[] args)
if (opts.Recover) store.Recover();

// This variable-length session provider can be used with compatible clients such as VarLenClient
var provider = new FasterKVProvider<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, SpanByteFunctionsForServer<long>, SpanByteSerializer>(store, wp => new SpanByteFunctionsForServer<long>(wp), new SpanByteSerializer());
var provider = new SpanByteFasterKVProvider(store);

// Create server
var server = new FasterServer(opts.Address, opts.Port);
Expand Down
47 changes: 47 additions & 0 deletions cs/remote/src/FASTER.server/SpanByteClientSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using FASTER.common;
using FASTER.core;
using System.Buffers;

namespace FASTER.client
{
/// <summary>
/// Serializer for SpanByte (can be used on client side)
/// </summary>
public unsafe class SpanByteClientSerializer : IClientSerializer<SpanByte, SpanByte, SpanByte, SpanByteAndMemory>
{
readonly MemoryPool<byte> memoryPool;

/// <summary>
/// Constructor
/// </summary>
/// <param name="memoryPool"></param>
public SpanByteClientSerializer(MemoryPool<byte> memoryPool = default)
{
this.memoryPool = memoryPool ?? MemoryPool<byte>.Shared;
}

/// <inheritdoc />
public SpanByteAndMemory ReadOutput(ref byte* src)
{
int length = *(int*)src;
var mem = memoryPool.Rent(length);
new ReadOnlySpan<byte>(src + sizeof(int), length).CopyTo(mem.Memory.Span);
src += length + sizeof(int);
return new SpanByteAndMemory(mem, length);
}

/// <inheritdoc />
public bool Write(ref SpanByte k, ref byte* dst, int length)
{
var len = k.TotalSize;
if (length < len) return false;
k.CopyTo(dst);
dst += len;
return true;
}
}
}
47 changes: 47 additions & 0 deletions cs/remote/src/FASTER.server/SpanByteFasterKVProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net.Sockets;
using System.Threading;
using FASTER.common;
using FASTER.core;

namespace FASTER.server
{

/// <summary>
/// Session provider for FasterKV store based on
/// [K, V, I, O, C] = [SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long]
/// </summary>
public sealed class SpanByteFasterKVProvider : ISessionProvider, IDisposable
{
readonly FasterKV<SpanByte, SpanByte> store;
readonly SpanByteServerSerializer serializer;
readonly MaxSizeSettings maxSizeSettings;

/// <summary>
/// Create SpanByte FasterKV backend
/// </summary>
/// <param name="store"></param>
/// <param name="maxSizeSettings"></param>
public SpanByteFasterKVProvider(FasterKV<SpanByte, SpanByte> store, MaxSizeSettings maxSizeSettings = default)
{
this.store = store;
this.serializer = new SpanByteServerSerializer();
this.maxSizeSettings = maxSizeSettings ?? new MaxSizeSettings();
}

/// <inheritdoc />
public IServerSession GetSession(WireFormat wireFormat, Socket socket)
{
return new BinaryServerSession<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, SpanByteFunctionsForServer<long>, SpanByteServerSerializer>
(socket, store, new SpanByteFunctionsForServer<long>(wireFormat), serializer, maxSizeSettings);
}

/// <inheritdoc />
public void Dispose()
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@
namespace FASTER.server
{
/// <summary>
/// Serializer for SpanByte
/// Used only on server-side, but we add client-side serializer API for completeness
/// Serializer for SpanByte. Used only on server-side.
/// </summary>
public unsafe sealed class SpanByteSerializer : IServerSerializer<SpanByte, SpanByte, SpanByte, SpanByteAndMemory>, IClientSerializer<SpanByte, SpanByte, SpanByte, SpanByteAndMemory>
public unsafe sealed class SpanByteServerSerializer : IServerSerializer<SpanByte, SpanByte, SpanByte, SpanByteAndMemory>
{
readonly SpanByteVarLenStruct settings;
readonly int keyLength;
readonly int valueLength;

[ThreadStatic]
static SpanByteAndMemory output;

Expand All @@ -26,9 +24,8 @@ public unsafe sealed class SpanByteSerializer : IServerSerializer<SpanByte, Span
/// </summary>
/// <param name="maxKeyLength">Max key length</param>
/// <param name="maxValueLength">Max value length</param>
public SpanByteSerializer(int maxKeyLength = 512, int maxValueLength = 512)
public SpanByteServerSerializer(int maxKeyLength = 512, int maxValueLength = 512)
{
settings = new SpanByteVarLenStruct();
keyLength = maxKeyLength;
valueLength = maxValueLength;
}
Expand All @@ -38,7 +35,7 @@ public SpanByteSerializer(int maxKeyLength = 512, int maxValueLength = 512)
public ref SpanByte ReadKeyByRef(ref byte* src)
{
ref var ret = ref Unsafe.AsRef<SpanByte>(src);
src += settings.GetLength(ref ret);
src += ret.TotalSize;
return ref ret;
}

Expand All @@ -47,7 +44,7 @@ public ref SpanByte ReadKeyByRef(ref byte* src)
public ref SpanByte ReadValueByRef(ref byte* src)
{
ref var ret = ref Unsafe.AsRef<SpanByte>(src);
src += settings.GetLength(ref ret);
src += ret.TotalSize;
return ref ret;
}

Expand All @@ -56,20 +53,10 @@ public ref SpanByte ReadValueByRef(ref byte* src)
public ref SpanByte ReadInputByRef(ref byte* src)
{
ref var ret = ref Unsafe.AsRef<SpanByte>(src);
src += settings.GetLength(ref ret);
src += ret.TotalSize;
return ref ret;
}

/// <inheritdoc />
public bool Write(ref SpanByte k, ref byte* dst, int length)
{
var len = settings.GetLength(ref k);
if (length < len) return false;
Buffer.MemoryCopy(Unsafe.AsPointer(ref k), dst, len, len);
dst += len;
return true;
}

/// <inheritdoc />
public bool Write(ref SpanByteAndMemory k, ref byte* dst, int length)
{
Expand All @@ -94,15 +81,6 @@ public ref SpanByteAndMemory AsRefOutput(byte* src, int length)
/// <inheritdoc />
public void SkipOutput(ref byte* src) => src += (*(int*)src) + sizeof(int);

/// <inheritdoc />
public SpanByteAndMemory ReadOutput(ref byte* src)
{
int length = *(int*)src;
var _output = SpanByteAndMemory.FromFixedSpan(new Span<byte>(src, length + sizeof(int)));
src += length + sizeof(int);
return _output;
}

/// <inheritdoc />
public int GetLength(ref SpanByteAndMemory o) => o.Length;
}
Expand Down
2 changes: 1 addition & 1 deletion cs/remote/test/FASTER.remote.test/VarLenServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public VarLenServer(string folderName, string address = "127.0.0.1", int port =
store = new FasterKV<SpanByte, SpanByte>(indexSize, logSettings, checkpointSettings);

// Create session provider for VarLen
var provider = new FasterKVProvider<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, SpanByteFunctionsForServer<long>, SpanByteSerializer>(store, wp => new SpanByteFunctionsForServer<long>(wp), new SpanByteSerializer());
var provider = new SpanByteFasterKVProvider(store);

server = new FasterServer(address, port);
server.Register(WireFormat.DefaultVarLenKV, provider);
Expand Down
13 changes: 13 additions & 0 deletions cs/src/core/VarLen/SpanByteAndMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ public SpanByteAndMemory(IMemoryOwner<byte> memory)
Memory = memory;
}

/// <summary>
/// Constructor using given IMemoryOwner and length
/// </summary>
/// <param name="memory"></param>
/// <param name="length"></param>
public SpanByteAndMemory(IMemoryOwner<byte> memory, int length)
{
SpanByte = default;
SpanByte.Invalid = true;
Memory = memory;
SpanByte.Length = length;
}

/// <summary>
/// View a fixed Span&lt;byte&gt; as a SpanByteAndMemory
/// </summary>
Expand Down