Skip to content

Commit

Permalink
[C#] Add infrastructure to support more wire protocols (#496)
Browse files Browse the repository at this point in the history
* Add BackendProvider concept to support backend other than FasterKV. Add protocol byte to binary protocol to enable support for more protocols in the future.

* Fix some typos and add some comments

* add .idea to gitignore

* Add constructor overload to avoid API change, tweaks to serialization performance.

* remove unused import

* Cleanup & minor perf improvements

Co-authored-by: Tianyu Li <t-litianyu@microsoft.com>
Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
3 people authored Jun 18, 2021
1 parent 8c0c6e1 commit ef016eb
Show file tree
Hide file tree
Showing 14 changed files with 240 additions and 83 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,8 @@ packages/
*.lib
nativebin/
/cs/**/launchSettings.json

# JetBrains
cs/.idea/
cs/remote/.idea
cs/libdpr/.idea
8 changes: 5 additions & 3 deletions cs/remote/src/FASTER.client/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ public void Flush()
if (offset > sendObject.obj.bufferPtr + sizeof(int) + BatchHeader.Size)
{
int payloadSize = (int)(offset - sendObject.obj.bufferPtr);
((BatchHeader*)(sendObject.obj.bufferPtr + sizeof(int)))->numMessages = numMessages;

// seqNo and wire format are default (0)
((BatchHeader*)(sendObject.obj.bufferPtr + sizeof(int)))->NumMessages = numMessages;
Interlocked.Increment(ref numPendingBatches);

// Set packet size in header
Expand Down Expand Up @@ -255,8 +257,8 @@ internal void ProcessReplies(byte[] buf, int offset)
fixed (byte* b = &buf[offset])
{
var src = b;
var seqNo = ((BatchHeader*)src)->seqNo;
var count = ((BatchHeader*)src)->numMessages;
var seqNo = ((BatchHeader*)src)->SeqNo;
var count = ((BatchHeader*)src)->NumMessages;
if (seqNo != lastSeqNo + 1)
throw new Exception("Out of order message within session");
lastSeqNo = seqNo;
Expand Down
36 changes: 30 additions & 6 deletions cs/remote/src/FASTER.common/BatchHeader.cs
Original file line number Diff line number Diff line change
@@ -1,31 +1,55 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

namespace FASTER.common
{
/// <summary>
/// Header for message batch
/// Header for message batch (Little Endian server)
/// [4 byte seqNo][1 byte protocol][3 byte numMessages]
/// </summary>
[StructLayout(LayoutKind.Explicit, Size = 8)]
public unsafe struct BatchHeader
public struct BatchHeader
{
/// <summary>
/// Size
/// </summary>
public const int Size = 8;

/// <summary>
/// Sequence number for batch
/// Sequence number.
/// </summary>
[FieldOffset(0)]
public int seqNo;
public int SeqNo;

/// <summary>
/// Number of messsages packed in batch
/// Lower-order 8 bits are protocol type, higher-order 24 bits are num messages.
/// </summary>
[FieldOffset(4)]
public int numMessages;
private int numMessagesAndProtocolType;

/// <summary>
/// Number of messages packed in batch
/// </summary>
public int NumMessages
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get => (int)((uint)numMessagesAndProtocolType >> 8);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
set { numMessagesAndProtocolType = (value << 8) | (numMessagesAndProtocolType & 0xFF); }
}

/// <summary>
/// Wire protocol this batch is written in
/// </summary>
public WireFormat Protocol
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get => (WireFormat)(numMessagesAndProtocolType & 0xFF);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
set { numMessagesAndProtocolType = (numMessagesAndProtocolType & ~0xFF) | ((int)value & 0xFF); }
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

namespace FASTER.server
namespace FASTER.common
{
/// <summary>
/// Wire format
/// </summary>
public enum WireFormat
public enum WireFormat : byte
{
/// <summary>
/// Custom binary format
Expand Down
26 changes: 14 additions & 12 deletions cs/remote/src/FASTER.server/BinaryServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
namespace FASTER.server
{
internal unsafe sealed class BinaryServerSession<Key, Value, Input, Output, Functions, ParameterSerializer>
: ServerSessionBase<Key, Value, Input, Output, Functions, ParameterSerializer>
: FasterKVServerSessionBase<Key, Value, Input, Output, Functions, ParameterSerializer>
where Functions : IFunctions<Key, Value, Input, Output, long>
where ParameterSerializer : IServerSerializer<Key, Value, Input, Output>
{
Expand Down Expand Up @@ -48,7 +48,7 @@ public override int TryConsumeMessages(byte[] buf)
return bytesRead;
}

public override void CompleteRead(ref Output output, long ctx, core.Status status)
public override void CompleteRead(ref Output output, long ctx, Status status)
{
byte* d = responseObject.obj.bufferPtr;
var dend = d + responseObject.obj.buffer.Length;
Expand All @@ -60,12 +60,12 @@ public override void CompleteRead(ref Output output, long ctx, core.Status statu
hrw.Write((MessageType)(ctx >> 32), ref dcurr, (int)(dend - dcurr));
Write((int)(ctx & 0xffffffff), ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));
if (status != core.Status.NOTFOUND)
if (status != Status.NOTFOUND)
serializer.Write(ref output, ref dcurr, (int)(dend - dcurr));
msgnum++;
}

public override void CompleteRMW(long ctx, core.Status status)
public override void CompleteRMW(long ctx, Status status)
{
byte* d = responseObject.obj.bufferPtr;
var dend = d + responseObject.obj.buffer.Length;
Expand Down Expand Up @@ -113,13 +113,15 @@ private unsafe void ProcessBatch(byte[] buf, int offset)

var src = b;
ref var header = ref Unsafe.AsRef<BatchHeader>(src);
var num = header.NumMessages;
src += BatchHeader.Size;
core.Status status = default;
Status status = default;

dcurr += BatchHeader.Size;
start = 0;
msgnum = 0;
for (msgnum = 0; msgnum < header.numMessages; msgnum++)

for (msgnum = 0; msgnum < num; msgnum++)
{
var message = (MessageType)(*src++);
switch (message)
Expand All @@ -146,9 +148,9 @@ private unsafe void ProcessBatch(byte[] buf, int offset)
hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));

if (status == core.Status.PENDING)
if (status == Status.PENDING)
Write(pendingSeqNo++, ref dcurr, (int)(dend - dcurr));
else if (status == core.Status.OK)
else if (status == Status.OK)
serializer.SkipOutput(ref dcurr);
break;

Expand All @@ -162,7 +164,7 @@ private unsafe void ProcessBatch(byte[] buf, int offset)

hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));
if (status == core.Status.PENDING)
if (status == Status.PENDING)
Write(pendingSeqNo++, ref dcurr, (int)(dend - dcurr));
break;

Expand Down Expand Up @@ -193,7 +195,7 @@ private unsafe void ProcessBatch(byte[] buf, int offset)
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private unsafe bool Write(ref core.Status s, ref byte* dst, int length)
private unsafe bool Write(ref Status s, ref byte* dst, int length)
{
if (length < 1) return false;
*dst++ = (byte)s;
Expand Down Expand Up @@ -224,8 +226,8 @@ private void SendAndReset(ref byte* d, ref byte* dend)
private void Send(byte* d)
{
var dstart = d + sizeof(int);
Unsafe.AsRef<BatchHeader>(dstart).numMessages = msgnum - start;
Unsafe.AsRef<BatchHeader>(dstart).seqNo = seqNo++;
Unsafe.AsRef<BatchHeader>(dstart).NumMessages = msgnum - start;
Unsafe.AsRef<BatchHeader>(dstart).SeqNo = seqNo++;
int payloadSize = (int)(dcurr - d);
// Set packet size in header
*(int*)responseObject.obj.bufferPtr = -(payloadSize - sizeof(int));
Expand Down
12 changes: 3 additions & 9 deletions cs/remote/src/FASTER.server/ConnectionArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,15 @@
// Licensed under the MIT license.

using System.Net.Sockets;
using FASTER.core;
using FASTER.common;
using System;

namespace FASTER.server
{
class ConnectionArgs<Key, Value, Input, Output, Functions, ParameterSerializer>
where Functions : IFunctions<Key, Value, Input, Output, long>
where ParameterSerializer : IServerSerializer<Key, Value, Input, Output>
internal class ConnectionArgs
{
public Socket socket;
public FasterKV<Key, Value> store;
public Func<WireFormat, Functions> functionsGen;
public ParameterSerializer serializer;
public ServerSessionBase<Key, Value, Input, Output, Functions, ParameterSerializer> session;
public ServerSessionBase session;
public MaxSizeSettings maxSizeSettings;
public IFasterRemoteBackendProvider provider;
}
}
76 changes: 48 additions & 28 deletions cs/remote/src/FASTER.server/FasterKVServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System;
using System.Runtime.CompilerServices;
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Threading;

namespace FASTER.server
Expand All @@ -23,22 +24,20 @@ public sealed class FasterKVServer<Key, Value, Input, Output, Functions, Paramet
readonly SocketAsyncEventArgs acceptEventArg;
readonly Socket servSocket;
readonly MaxSizeSettings maxSizeSettings;
readonly ConcurrentDictionary<ServerSessionBase<Key, Value, Input, Output, Functions, ParameterSerializer>, byte> activeSessions;
readonly ConcurrentDictionary<ServerSessionBase, byte> activeSessions;
int activeSessionCount;
bool disposed;

/// <summary>
/// Constructor
/// </summary>
/// <param name="store">Instance of FasterKV store to use in server</param>
/// <param name="functionsGen">Functions generator (based on wire format)</param>
/// <param name="backendProvider"> Backend provider for this server</param>
/// <param name="address">IP address</param>
/// <param name="port">Port</param>
/// <param name="serializer">Parameter serializer</param>
/// <param name="maxSizeSettings">Max size settings</param>
public FasterKVServer(FasterKV<Key, Value> store, Func<WireFormat, Functions> functionsGen, string address, int port, ParameterSerializer serializer = default, MaxSizeSettings maxSizeSettings = default) : base()
public FasterKVServer(IFasterRemoteBackendProvider backendProvider, string address, int port, MaxSizeSettings maxSizeSettings = default)
{
activeSessions = new ConcurrentDictionary<ServerSessionBase<Key, Value, Input, Output, Functions, ParameterSerializer>, byte>();
activeSessions = new ConcurrentDictionary<ServerSessionBase, byte>();
activeSessionCount = 0;
disposed = false;

Expand All @@ -50,11 +49,26 @@ public FasterKVServer(FasterKV<Key, Value> store, Func<WireFormat, Functions> fu
servSocket.Listen(512);
acceptEventArg = new SocketAsyncEventArgs
{
UserToken = (store, functionsGen, serializer)
UserToken = backendProvider
};
acceptEventArg.Completed += AcceptEventArg_Completed;
}

/// <summary>
/// Constructor
/// </summary>
/// <param name="store">Instance of FasterKV store to use in server</param>
/// <param name="functionsGen">Functions generator (based on wire format)</param>
/// <param name="address">IP address</param>
/// <param name="port">Port</param>
/// <param name="serializer">Parameter serializer</param>
/// <param name="maxSizeSettings">Max size settings</param>
public FasterKVServer(FasterKV<Key, Value> store, Func<WireFormat, Functions> functionsGen, string address,
int port, ParameterSerializer serializer = default, MaxSizeSettings maxSizeSettings = default)
: this(new FasterKVBackendProvider<Key, Value, Functions, ParameterSerializer>(store, functionsGen, serializer), address, port, maxSizeSettings)
{
}

/// <summary>
/// Start server
/// </summary>
Expand Down Expand Up @@ -88,15 +102,13 @@ private bool HandleNewConnection(SocketAsyncEventArgs e)
var buffer = new byte[bufferSize];
receiveEventArgs.SetBuffer(buffer, 0, bufferSize);

var (store, functionsGen, serializer) = ((FasterKV<Key, Value>, Func<WireFormat, Functions>, ParameterSerializer))e.UserToken;
var backendProvider = (IFasterRemoteBackendProvider) e.UserToken;

var args = new ConnectionArgs<Key, Value, Input, Output, Functions, ParameterSerializer>
var args = new ConnectionArgs
{
socket = e.AcceptSocket,
store = store,
functionsGen = functionsGen,
serializer = serializer,
maxSizeSettings = maxSizeSettings
maxSizeSettings = maxSizeSettings,
provider = backendProvider
};

receiveEventArgs.UserToken = args;
Expand Down Expand Up @@ -141,7 +153,7 @@ private void DisposeActiveSessions()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool HandleReceiveCompletion(SocketAsyncEventArgs e)
{
var connArgs = (ConnectionArgs<Key, Value, Input, Output, Functions, ParameterSerializer>)e.UserToken;
var connArgs = (ConnectionArgs) e.UserToken;
if (e.BytesTransferred == 0 || e.SocketError != SocketError.Success || disposed)
{
DisposeConnectionSession(e);
Expand All @@ -160,22 +172,30 @@ private bool HandleReceiveCompletion(SocketAsyncEventArgs e)
return true;
}

private bool CreateSession(SocketAsyncEventArgs e)
private unsafe bool CreateSession(SocketAsyncEventArgs e)
{
var connArgs = (ConnectionArgs<Key, Value, Input, Output, Functions, ParameterSerializer>)e.UserToken;
var connArgs = (ConnectionArgs) e.UserToken;

if (e.BytesTransferred < 4)
{
e.SetBuffer(0, e.Buffer.Length);
return true;
}
if (e.BytesTransferred < 4) return false;

// FASTER's protocol family is identified by inverted size field in the start of a packet
if (e.Buffer[3] <= 127)
throw new FasterException("Unexpected wire format");

if (e.BytesTransferred < 4 + BatchHeader.Size) return false;

if (e.Buffer[3] > 127)
fixed (void* bh = &e.Buffer[4])
{
connArgs.session = new BinaryServerSession<Key, Value, Input, Output, Functions, ParameterSerializer>(connArgs.socket, connArgs.store, connArgs.functionsGen(WireFormat.Binary), connArgs.serializer, connArgs.maxSizeSettings);
switch (((BatchHeader *) bh)->Protocol)
{
case WireFormat.Binary:
var backend = connArgs.provider.GetBackendForProtocol<FasterKVBackend<Key, Value, Functions, ParameterSerializer>>(WireFormat.Binary);
connArgs.session = new BinaryServerSession<Key, Value, Input, Output, Functions, ParameterSerializer>(connArgs.socket, backend.store, backend.functionsGen(WireFormat.Binary), backend.serializer, connArgs.maxSizeSettings);
break;
default:
throw new FasterException("Unexpected wire format");
}
}
else
throw new FasterException("Unexpected wire format");

if (activeSessions.TryAdd(connArgs.session, default))
Interlocked.Increment(ref activeSessionCount);
Expand All @@ -192,7 +212,7 @@ private bool CreateSession(SocketAsyncEventArgs e)

private void DisposeConnectionSession(SocketAsyncEventArgs e)
{
var connArgs = (ConnectionArgs<Key, Value, Input, Output, Functions, ParameterSerializer>)e.UserToken;
var connArgs = (ConnectionArgs) e.UserToken;
connArgs.socket.Dispose();
e.Dispose();
var _session = connArgs.session;
Expand All @@ -208,7 +228,7 @@ private void DisposeConnectionSession(SocketAsyncEventArgs e)

private void RecvEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
var connArgs = (ConnectionArgs<Key, Value, Input, Output, Functions, ParameterSerializer>)e.UserToken;
var connArgs = (ConnectionArgs) e.UserToken;
do
{
// No more things to receive
Expand Down
Loading

0 comments on commit ef016eb

Please sign in to comment.