Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write unary content with single Stream.WriteAsync #901

Merged
merged 3 commits into from
May 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/Grpc.Net.Client.Web/Internal/Base64RequestStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,20 @@ private static void EnsureSuccess(OperationStatus status, OperationStatus expect
}

public override async Task FlushAsync(CancellationToken cancellationToken)
{
await WriteRemainderAsync(cancellationToken).ConfigureAwait(false);
await _inner.FlushAsync(cancellationToken).ConfigureAwait(false);
}

internal async Task WriteRemainderAsync(CancellationToken cancellationToken)
{
if (_remainder > 0)
{
EnsureSuccess(Base64.EncodeToUtf8InPlace(_buffer, _remainder, out var bytesWritten));

await _inner.WriteAsync(_buffer.AsMemory(0, bytesWritten), cancellationToken);
await _inner.WriteAsync(_buffer.AsMemory(0, bytesWritten), cancellationToken).ConfigureAwait(false);
_remainder = 0;
}

await _inner.FlushAsync(cancellationToken).ConfigureAwait(false);
}

protected override void Dispose(bool disposing)
Expand Down
30 changes: 13 additions & 17 deletions src/Grpc.Net.Client.Web/Internal/GrpcWebRequestContent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,28 @@ public GrpcWebRequestContent(HttpContent inner, GrpcWebMode mode)
_mode = mode;
foreach (var header in inner.Headers)
{
Headers.Add(header.Key, header.Value);
Headers.TryAddWithoutValidation(header.Key, header.Value);
}

Headers.ContentType = (mode == GrpcWebMode.GrpcWebText)
? GrpcWebProtocolConstants.GrpcWebTextHeader
: GrpcWebProtocolConstants.GrpcWebHeader;
}

protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
Base64RequestStream? base64RequestStream = null;
protected override Task SerializeToStreamAsync(Stream stream, TransportContext context) =>
_mode == GrpcWebMode.GrpcWebText
? SerializeTextToStreamAsync(stream)
: _inner.CopyToAsync(stream);

try
{
if (_mode == GrpcWebMode.GrpcWebText)
{
base64RequestStream = new Base64RequestStream(stream);
stream = base64RequestStream;
}
private async Task SerializeTextToStreamAsync(Stream stream)
{
using var base64RequestStream = new Base64RequestStream(stream);
await _inner.CopyToAsync(base64RequestStream).ConfigureAwait(false);

await _inner.CopyToAsync(stream).ConfigureAwait(false);
}
finally
{
base64RequestStream?.Dispose();
}
// Any remaining content needs to be written when SerializeToStreamAsync finishes.
// We want to avoid unnecessary flush calls so a custom method is used to write
// ramining content rather than calling FlushAsync.
await base64RequestStream.WriteRemainderAsync(CancellationToken.None).ConfigureAwait(false);
}

protected override bool TryComputeLength(out long length)
Expand Down
23 changes: 0 additions & 23 deletions src/Grpc.Net.Client/Internal/DefaultSerializationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,28 +135,5 @@ public override void Complete()
break;
}
}

public Memory<byte> GetHeader(bool isCompressed, int length)
{
// TODO(JamesNK): We can optimize header allocation when IBufferWriter is being used.
// IBufferWriter can be used to provide a buffer, either before or after message content.
// https://github.com/grpc/grpc-dotnet/issues/784
var buffer = new byte[GrpcProtocolConstants.HeaderSize];

// Compression flag
buffer[0] = isCompressed ? (byte)1 : (byte)0;

// Message length
EncodeMessageLength(length, buffer.AsSpan(1, 4));

return buffer;
}

private static void EncodeMessageLength(int messageLength, Span<byte> destination)
{
Debug.Assert(destination.Length >= GrpcProtocolConstants.MessageDelimiterSize, "Buffer too small to encode message length.");

BinaryPrimitives.WriteUInt32BigEndian(destination, (uint)messageLength);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ private async Task WriteAsyncCore(TRequest message)
callOptions = callOptions.WithWriteOptions(WriteOptions);
}

await _call.WriteMessageAsync(writeStream, message, _grpcEncoding, callOptions);
await _call.WriteMessageAsync(writeStream, message, _grpcEncoding, callOptions).ConfigureAwait(false);

// Flush stream to ensure messages are sent immediately
await writeStream.FlushAsync(callOptions.CancellationToken).ConfigureAwait(false);

GrpcEventSource.Log.MessageSent();
}
Expand Down
36 changes: 33 additions & 3 deletions src/Grpc.Net.Client/Internal/StreamExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,23 @@ public static async ValueTask WriteMessageAsync<TMessage>(
data);
}

await stream.WriteAsync(serializationContext.GetHeader(isCompressed, data.Length), callOptions.CancellationToken).ConfigureAwait(false);
await stream.WriteAsync(data, callOptions.CancellationToken).ConfigureAwait(false);
await stream.FlushAsync(callOptions.CancellationToken).ConfigureAwait(false);
var totalSize = data.Length + GrpcProtocolConstants.HeaderSize;
var completeData = ArrayPool<byte>.Shared.Rent(totalSize);
try
{
var buffer = completeData.AsMemory(0, totalSize);

WriteHeader(buffer.Span.Slice(0, GrpcProtocolConstants.HeaderSize), isCompressed, data.Length);
data.CopyTo(buffer.Slice(GrpcProtocolConstants.HeaderSize));

// Sending the header+content in a single WriteAsync call has significant performance benefits
// https://github.com/dotnet/runtime/issues/35184#issuecomment-626304981
await stream.WriteAsync(buffer, callOptions.CancellationToken).ConfigureAwait(false);
}
finally
{
ArrayPool<byte>.Shared.Return(completeData);
}

GrpcCallLog.MessageSent(logger);
}
Expand Down Expand Up @@ -328,5 +342,21 @@ private static ReadOnlyMemory<byte> CompressMessage(ILogger logger, string compr
// Should never reach here
throw new InvalidOperationException($"Could not find compression provider for '{compressionEncoding}'.");
}

private static void WriteHeader(Span<byte> headerData, bool isCompressed, int length)
{
// Compression flag
headerData[0] = isCompressed ? (byte)1 : (byte)0;

// Message length
EncodeMessageLength(length, headerData.Slice(1, 4));
}

private static void EncodeMessageLength(int messageLength, Span<byte> destination)
{
Debug.Assert(destination.Length >= GrpcProtocolConstants.MessageDelimiterSize, "Buffer too small to encode message length.");

BinaryPrimitives.WriteUInt32BigEndian(destination, (uint)messageLength);
}
}
}