Skip to content

Commit

Permalink
Use zero-byte reads in StreamCopier (#1415)
Browse files Browse the repository at this point in the history
* Use zero-byte reads in StreamCopier

* Fix StreamCopier telemetry test

* Reset activity timeout on EOF

* Start with a zero-byte read

* Fix tests that expected a single read
  • Loading branch information
MihaZupan authored Nov 30, 2021
1 parent 8b13188 commit e7ed7ea
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 110 deletions.
239 changes: 135 additions & 104 deletions src/ReverseProxy/Forwarder/StreamCopier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

using System;
using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.Tracing;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -18,142 +20,171 @@ internal static class StreamCopier
// Based on performance investigations, see https://github.com/microsoft/reverse-proxy/pull/330#issuecomment-758851852.
private const int DefaultBufferSize = 65536;

private static readonly TimeSpan TimeBetweenTransferringEvents = TimeSpan.FromSeconds(1);

/// <inheritdoc/>
/// <remarks>
/// Based on <c>Microsoft.AspNetCore.Http.StreamCopyOperationInternal.CopyToAsync</c>.
/// See: <see href="https://github.com/dotnet/aspnetcore/blob/080660967b6043f731d4b7163af9e9e6047ef0c4/src/Http/Shared/StreamCopyOperationInternal.cs"/>.
/// </remarks>
public static async ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, IClock clock, ActivityCancellationTokenSource activityToken, CancellationToken cancellation)
public static ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, IClock clock, ActivityCancellationTokenSource activityToken, CancellationToken cancellation)
{
_ = input ?? throw new ArgumentNullException(nameof(input));
_ = output ?? throw new ArgumentNullException(nameof(output));

var telemetryEnabled = ForwarderTelemetry.Log.IsEnabled();
Debug.Assert(input is not null);
Debug.Assert(output is not null);
Debug.Assert(clock is not null);
Debug.Assert(activityToken is not null);

var buffer = ArrayPool<byte>.Shared.Rent(DefaultBufferSize);
var reading = true;
// Avoid capturing 'isRequest' and 'clock' in the state machine when telemetry is disabled
var telemetry = ForwarderTelemetry.Log.IsEnabled(EventLevel.Informational, EventKeywords.All)
? new StreamCopierTelemetry(isRequest, clock)
: null;

long contentLength = 0;
long iops = 0;
var readTime = TimeSpan.Zero;
var writeTime = TimeSpan.Zero;
var firstReadTime = TimeSpan.FromMilliseconds(-1);
return CopyAsync(input, output, telemetry, activityToken, cancellation);
}

private static async ValueTask<(StreamCopyResult, Exception?)> CopyAsync(Stream input, Stream output, StreamCopierTelemetry? telemetry, ActivityCancellationTokenSource activityToken, CancellationToken cancellation)
{
var buffer = ArrayPool<byte>.Shared.Rent(DefaultBufferSize);
var read = 0;
try
{
var lastTime = TimeSpan.Zero;
var nextTransferringEvent = TimeSpan.Zero;

if (telemetryEnabled)
{
ForwarderTelemetry.Log.ForwarderStage(isRequest ? ForwarderStage.RequestContentTransferStart : ForwarderStage.ResponseContentTransferStart);

lastTime = clock.GetStopwatchTime();
nextTransferringEvent = lastTime + TimeBetweenTransferringEvents;
}

while (true)
{
if (cancellation.IsCancellationRequested)
{
return (StreamCopyResult.Canceled, new OperationCanceledException(cancellation));
}
read = 0;

reading = true;
var read = 0;
try
// Issue a zero-byte read to the input stream to defer buffer allocation until data is available.
// Note that if the underlying stream does not supporting blocking on zero byte reads, then this will
// complete immediately and won't save any memory, but will still function correctly.
var zeroByteReadTask = input.ReadAsync(Memory<byte>.Empty, cancellation);
if (zeroByteReadTask.IsCompletedSuccessfully)
{
read = await input.ReadAsync(buffer.AsMemory(), cancellation);

// Success, reset the activity monitor.
activityToken.ResetTimeout();
// Consume the ValueTask's result in case it is backed by an IValueTaskSource
_ = zeroByteReadTask.Result;
}
finally
else
{
if (telemetryEnabled)
{
contentLength += read;
iops++;

var readStop = clock.GetStopwatchTime();
var currentReadTime = readStop - lastTime;
lastTime = readStop;
readTime += currentReadTime;
if (firstReadTime.Ticks < 0)
{
firstReadTime = currentReadTime;
}
}
// Take care not to return the same buffer to the pool twice in case zeroByteReadTask throws
var bufferToReturn = buffer;
buffer = null;
ArrayPool<byte>.Shared.Return(bufferToReturn);

await zeroByteReadTask;

buffer = ArrayPool<byte>.Shared.Rent(DefaultBufferSize);
}

read = await input.ReadAsync(buffer.AsMemory(), cancellation);

telemetry?.AfterRead(read);

// Success, reset the activity monitor.
activityToken.ResetTimeout();

// End of the source stream.
if (read == 0)
{
return (StreamCopyResult.Success, null);
}

if (cancellation.IsCancellationRequested)
{
return (StreamCopyResult.Canceled, new OperationCanceledException(cancellation));
}
await output.WriteAsync(buffer.AsMemory(0, read), cancellation);

reading = false;
try
{
await output.WriteAsync(buffer.AsMemory(0, read), cancellation);
telemetry?.AfterWrite();

// Success, reset the activity monitor.
activityToken.ResetTimeout();
}
finally
{
if (telemetryEnabled)
{
var writeStop = clock.GetStopwatchTime();
writeTime += writeStop - lastTime;
lastTime = writeStop;
if (lastTime >= nextTransferringEvent)
{
ForwarderTelemetry.Log.ContentTransferring(
isRequest,
contentLength,
iops,
readTime.Ticks,
writeTime.Ticks);

// Avoid attributing the time taken by logging ContentTransferring to the next read call
lastTime = clock.GetStopwatchTime();
nextTransferringEvent = lastTime + TimeBetweenTransferringEvents;
}
}
}
// Success, reset the activity monitor.
activityToken.ResetTimeout();
}
}
catch (OperationCanceledException oex)
{
return (StreamCopyResult.Canceled, oex);
}
catch (Exception ex)
{
return (reading ? StreamCopyResult.InputError : StreamCopyResult.OutputError, ex);
if (read == 0)
{
telemetry?.AfterRead(0);
}
else
{
telemetry?.AfterWrite();
}

var result = ex is OperationCanceledException ? StreamCopyResult.Canceled :
(read == 0 ? StreamCopyResult.InputError : StreamCopyResult.OutputError);

return (result, ex);
}
finally
{
// We can afford the perf impact of clearArray == true since we only do this twice per request.
ArrayPool<byte>.Shared.Return(buffer, clearArray: true);
if (buffer is not null)
{
ArrayPool<byte>.Shared.Return(buffer);
}

telemetry?.Stop();
}
}

private sealed class StreamCopierTelemetry
{
private static readonly TimeSpan _timeBetweenTransferringEvents = TimeSpan.FromSeconds(1);

private readonly bool _isRequest;
private readonly IClock _clock;
private long _contentLength;
private long _iops;
private TimeSpan _readTime;
private TimeSpan _writeTime;
private TimeSpan _firstReadTime;
private TimeSpan _lastTime;
private TimeSpan _nextTransferringEvent;

public StreamCopierTelemetry(bool isRequest, IClock clock)
{
_isRequest = isRequest;
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
_firstReadTime = new TimeSpan(-1);

if (telemetryEnabled)
ForwarderTelemetry.Log.ForwarderStage(isRequest ? ForwarderStage.RequestContentTransferStart : ForwarderStage.ResponseContentTransferStart);

_lastTime = clock.GetStopwatchTime();
_nextTransferringEvent = _lastTime + _timeBetweenTransferringEvents;
}

public void AfterRead(int read)
{
_contentLength += read;
_iops++;

var readStop = _clock.GetStopwatchTime();
var currentReadTime = readStop - _lastTime;
_lastTime = readStop;
_readTime += currentReadTime;
if (_firstReadTime.Ticks < 0)
{
ForwarderTelemetry.Log.ContentTransferred(
isRequest,
contentLength,
iops,
readTime.Ticks,
writeTime.Ticks,
Math.Max(0, firstReadTime.Ticks));
_firstReadTime = currentReadTime;
}
}

public void AfterWrite()
{
var writeStop = _clock.GetStopwatchTime();
_writeTime += writeStop - _lastTime;
_lastTime = writeStop;

if (writeStop >= _nextTransferringEvent)
{
ForwarderTelemetry.Log.ContentTransferring(
_isRequest,
_contentLength,
_iops,
_readTime.Ticks,
_writeTime.Ticks);

// Avoid attributing the time taken by logging ContentTransferring to the next read call
_lastTime = _clock.GetStopwatchTime();
_nextTransferringEvent = _lastTime + _timeBetweenTransferringEvents;
}
}

public void Stop()
{
ForwarderTelemetry.Log.ContentTransferred(
_isRequest,
_contentLength,
_iops,
_readTime.Ticks,
_writeTime.Ticks,
Math.Max(0, _firstReadTime.Ticks));
}
}
}
16 changes: 12 additions & 4 deletions test/ReverseProxy.Tests/Forwarder/HttpForwarderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,7 @@ public async Task RequestWithBody_KeptAliveByActivity()
httpContext.Request.Method = "POST";
httpContext.Request.Body = new CallbackReadStream(async (memory, ct) =>
{
if (reads >= expectedReads)
if (memory.Length == 0 || reads >= expectedReads)
{
return 0;
}
Expand Down Expand Up @@ -1501,7 +1501,7 @@ public async Task ResponseBodyCancelled_502()
Assert.Empty(httpContext.Response.Headers);
var errorFeature = httpContext.Features.Get<IForwarderErrorFeature>();
Assert.Equal(ForwarderError.ResponseBodyCanceled, errorFeature.Error);
Assert.IsType<OperationCanceledException>(errorFeature.Exception);
Assert.IsType<TaskCanceledException>(errorFeature.Exception);

AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error);
events.AssertContainProxyStages(hasRequestContent: false);
Expand Down Expand Up @@ -1542,7 +1542,7 @@ public async Task ResponseBodyCancelledAfterStart_Aborted()
Assert.Equal("bytes", httpContext.Response.Headers[HeaderNames.AcceptRanges]);
var errorFeature = httpContext.Features.Get<IForwarderErrorFeature>();
Assert.Equal(ForwarderError.ResponseBodyCanceled, errorFeature.Error);
Assert.IsType<OperationCanceledException>(errorFeature.Exception);
Assert.IsType<TaskCanceledException>(errorFeature.Exception);

AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error);
events.AssertContainProxyStages(hasRequestContent: false);
Expand Down Expand Up @@ -2299,6 +2299,11 @@ public ThrowStream(bool throwOnFirstRead = true)

public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
if (buffer.Length == 0)
{
return new ValueTask<int>(0);
}

cancellationToken.ThrowIfCancellationRequested();
if (_firstRead && !ThrowOnFirstRead)
{
Expand Down Expand Up @@ -2456,7 +2461,10 @@ public OnCompletedReadStream(Action onCompleted)

public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
OnCompleted();
if (buffer.Length != 0)
{
OnCompleted();
}
return new ValueTask<int>(0);
}
}
Expand Down
9 changes: 7 additions & 2 deletions test/ReverseProxy.Tests/Forwarder/StreamCopierTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ public async Task Cancelled_Reported(bool isRequest)

using var cts = ActivityCancellationTokenSource.Rent(TimeSpan.FromSeconds(10), CancellationToken.None);
cts.Cancel();
var (result, error) = await StreamCopier.CopyAsync(isRequest, source, destination, new Clock(), cts, cts.Token);
var (result, error) = await StreamCopier.CopyAsync(isRequest, source, destination, new ManualClock(), cts, cts.Token);
Assert.Equal(StreamCopyResult.Canceled, result);
Assert.IsAssignableFrom<OperationCanceledException>(error);

AssertContentTransferred(events, isRequest,
contentLength: 0,
iops: 0,
iops: 1,
firstReadTime: TimeSpan.Zero,
readTime: TimeSpan.Zero,
writeTime: TimeSpan.Zero);
Expand Down Expand Up @@ -306,6 +306,11 @@ public SlowStream(Stream innerStream, ManualClock clock, TimeSpan waitTime)

public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
if (buffer.Length == 0)
{
return new ValueTask<int>(0);
}

_clock.AdvanceClockBy(_waitTime);
return base.ReadAsync(buffer.Slice(0, Math.Min(buffer.Length, MaxBytesPerRead)), cancellationToken);
}
Expand Down
10 changes: 10 additions & 0 deletions test/ReverseProxy.Tests/Forwarder/StreamCopyHttpContentTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ public async Task SerializeToStreamAsync_RespectsContentCancellation()

var source = new ReadDelegatingStream(new MemoryStream(), async (buffer, cancellation) =>
{
if (buffer.Length == 0)
{
return 0;
}

Assert.False(cancellation.IsCancellationRequested);
await tcs.Task;
Assert.True(cancellation.IsCancellationRequested);
Expand All @@ -160,6 +165,11 @@ public async Task SerializeToStreamAsync_CanBeCanceledExternally()

var source = new ReadDelegatingStream(new MemoryStream(), async (buffer, cancellation) =>
{
if (buffer.Length == 0)
{
return 0;
}

Assert.False(cancellation.IsCancellationRequested);
await tcs.Task;
Assert.True(cancellation.IsCancellationRequested);
Expand Down

0 comments on commit e7ed7ea

Please sign in to comment.