Skip to content

Commit

Permalink
Merge pull request #163 from AArnott/fix162
Browse files Browse the repository at this point in the history
Fix Stream disposal when ReadAsync ignores CancellationToken
  • Loading branch information
AArnott authored Feb 9, 2020
2 parents 82ff5e9 + a84bbe6 commit 3cfffad
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 68 deletions.
12 changes: 0 additions & 12 deletions src/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -982,18 +982,6 @@ private static void AssertNoFault(MultiplexingStream? stream)
}
}

private static Task<T[]> WhenAllSucceedOrAnyFail<T>(params Task<T>[] tasks)
{
var tcs = new TaskCompletionSource<T[]>();
Task.WhenAll(tasks).ApplyResultTo(tcs);
foreach (var task in tasks)
{
task.ContinueWith(t => tcs.TrySetException(t.Exception), CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default).Forget();
}

return tcs.Task;
}

private async Task WaitForEphemeralChannelOfferToPropagateAsync()
{
// Propagation of ephemeral channel offers must occur before the remote end can accept it.
Expand Down
55 changes: 54 additions & 1 deletion src/Nerdbank.Streams.Tests/PipeExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.IO.Pipes;
using System.Linq;
using System.Net.WebSockets;
using System.Threading;
Expand All @@ -15,6 +16,7 @@
using Nerdbank.Streams;
using Xunit;
using Xunit.Abstractions;
using IPC = System.IO.Pipes;

public partial class PipeExtensionsTests : TestBase
{
Expand Down Expand Up @@ -56,6 +58,54 @@ public async Task UsePipe_Stream_Disposal()
await this.AssertStreamClosesAsync(ms);
}

/// <summary>
/// Verify that completing the <see cref="PipeReader"/> and <see cref="PipeWriter"/> lead to the disposal of the
/// IPC <see cref="Stream"/> on both sides.
/// </summary>
/// <remarks>
/// This is worth a special test because on .NET Framework, IPC stream reads are not cancelable.
/// </remarks>
[Fact]
public async Task UsePipe_IpcPipeStream_Disposal()
{
var guid = Guid.NewGuid().ToString();

var ipcServerTask = Task.Run(async delegate
{
using var ipcServerPipe = new NamedPipeServerStream(guid, PipeDirection.InOut, 1, PipeTransmissionMode.Byte, IPC.PipeOptions.Asynchronous);
await ipcServerPipe.WaitForConnectionAsync(this.TimeoutToken);
int bytesRead = await ipcServerPipe.ReadAsync(new byte[1], 0, 1, this.TimeoutToken).WithCancellation(this.TimeoutToken);
ipcServerPipe.Dispose();
this.Logger.WriteLine("The server stream closed.");
});
var ipcClientTask = Task.Run(async delegate
{
using var ipcClientPipe = new NamedPipeClientStream(".", guid, IPC.PipeDirection.InOut, IPC.PipeOptions.Asynchronous);
await ipcClientPipe.ConnectAsync(this.TimeoutToken);

// We need to time this so that we don't call Complete() until reading from the PipeStream has already started.
// Use our MonitoringStream for this purpose, and also to know when Dispose is called.
var monitoredStream = new MonitoringStream(ipcClientPipe);
var disposed = new TaskCompletionSource<object?>();
var readStarted = new TaskCompletionSource<object?>();
monitoredStream.Disposed += (s, e) => disposed.SetResult(null);
monitoredStream.WillRead += (s, e) => readStarted.SetResult(null);
monitoredStream.WillReadMemory += (s, e) => readStarted.SetResult(null);
monitoredStream.WillReadSpan += (s, e) => readStarted.SetResult(null);

IDuplexPipe pipe = monitoredStream.UsePipe(cancellationToken: this.TimeoutToken);
await readStarted.Task.WithCancellation(this.TimeoutToken);

pipe.Output.Complete();
pipe.Input.Complete();

await disposed.Task.WithCancellation(this.TimeoutToken);
this.Logger.WriteLine("The client stream closed.");
});

await WhenAllSucceedOrAnyFail(ipcClientTask, ipcServerTask);
}

[Theory]
[PairwiseData]
public async Task UsePipe_Stream_OneDirectionDoesNotDispose(bool completeOutput)
Expand Down Expand Up @@ -237,7 +287,10 @@ private async Task AssertStreamClosesAsync(Stream stream)
{
Requires.NotNull(stream, nameof(stream));

Func<bool> isDisposed = stream is IDisposableObservable observableStream ? new Func<bool>(() => observableStream.IsDisposed) : new Func<bool>(() => !stream.CanRead && !stream.CanWrite);
Func<bool> isDisposed =
stream is IDisposableObservable observableStream ? new Func<bool>(() => observableStream.IsDisposed) :
stream is PipeStream pipeStream ? new Func<bool>(() => !pipeStream.IsConnected) :
new Func<bool>(() => !stream.CanRead && !stream.CanWrite);

while (!this.TimeoutToken.IsCancellationRequested && !isDisposed())
{
Expand Down
24 changes: 24 additions & 0 deletions src/Nerdbank.Streams.Tests/TestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,30 @@ internal Task<bool> ExecuteInIsolationAsync(object testClass, string testMethodN
return this.ExecuteInIsolationAsync(testClass.GetType().FullName, testMethodName, logger);
}

protected static Task WhenAllSucceedOrAnyFail(params Task[] tasks)
{
var tcs = new TaskCompletionSource<int>();
Task.WhenAll(tasks).ApplyResultTo(tcs);
foreach (var task in tasks)
{
task.ContinueWith(t => tcs.TrySetException(t.Exception!), CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default).Forget();
}

return tcs.Task;
}

protected static Task<T[]> WhenAllSucceedOrAnyFail<T>(params Task<T>[] tasks)
{
var tcs = new TaskCompletionSource<T[]>();
Task.WhenAll(tasks).ApplyResultTo(tcs);
foreach (var task in tasks)
{
task.ContinueWith(t => tcs.TrySetException(t.Exception!), CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default).Forget();
}

return tcs.Task;
}

/// <summary>
/// Executes the specified test method in its own process, offering maximum isolation from ambient noise from other threads
/// and GC.
Expand Down
144 changes: 89 additions & 55 deletions src/Nerdbank.Streams/PipeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,56 +73,7 @@ public static partial class PipeExtensions
/// </remarks>
public static PipeReader UsePipeReader(this Stream stream, int sizeHint = 0, PipeOptions? pipeOptions = null, CancellationToken cancellationToken = default)
{
Requires.NotNull(stream, nameof(stream));
Requires.Argument(stream.CanRead, nameof(stream), "Stream must be readable.");

var pipe = new Pipe(pipeOptions ?? PipeOptions.Default);

// Notice when the pipe reader isn't listening any more, and terminate our loop that reads from the stream.
var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
pipe.Writer.OnReaderCompleted((ex, state) => ((CancellationTokenSource)state).Cancel(), combinedTokenSource);

Task.Run(async delegate
{
while (!combinedTokenSource.Token.IsCancellationRequested)
{
Memory<byte> memory = pipe.Writer.GetMemory(sizeHint);
try
{
int bytesRead = await stream.ReadAsync(memory, combinedTokenSource.Token).ConfigureAwait(false);
if (bytesRead == 0)
{
break;
}

pipe.Writer.Advance(bytesRead);
}
catch (OperationCanceledException)
{
break;
}
catch (ObjectDisposedException)
{
break;
}
catch (Exception ex)
{
// Propagate the exception to the reader.
pipe.Writer.Complete(ex);
return;
}

FlushResult result = await pipe.Writer.FlushAsync().ConfigureAwait(false);
if (result.IsCompleted)
{
break;
}
}

// Tell the PipeReader that there's no more data coming
pipe.Writer.Complete();
}).Forget();
return pipe.Reader;
return UsePipeReader(stream, sizeHint, pipeOptions, null, cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -244,14 +195,14 @@ public static IDuplexPipe UsePipe(this Stream stream, bool allowUnwrap, int size
return new DuplexPipe(pipeStream.UnderlyingPipeReader, pipeStream.UnderlyingPipeWriter);
}

PipeReader? input = stream.CanRead ? stream.UsePipeReader(sizeHint, pipeOptions, cancellationToken) : null;
PipeWriter? output = stream.CanWrite ? stream.UsePipeWriter(pipeOptions, cancellationToken) : null;
PipeReader? input = stream.CanRead ? stream.UsePipeReader(sizeHint, pipeOptions, output?.WaitForReaderCompletionAsync(), cancellationToken) : null;

Task closeStreamAntecedent;
Task? closeStreamAntecedent;
if (input != null && output != null)
{
// Arrange for closing the stream when *both* input/output are completed.
closeStreamAntecedent = Task.WhenAll(input.WaitForWriterCompletionAsync(), output.WaitForReaderCompletionAsync());
// The UsePipeReader function will be responsible for disposing the stream in this case.
closeStreamAntecedent = null;
}
else if (input != null)
{
Expand All @@ -263,7 +214,7 @@ public static IDuplexPipe UsePipe(this Stream stream, bool allowUnwrap, int size
closeStreamAntecedent = output.WaitForReaderCompletionAsync();
}

closeStreamAntecedent.ContinueWith((_, state) => ((Stream)state).Dispose(), stream, cancellationToken, TaskContinuationOptions.None, TaskScheduler.Default).Forget();
closeStreamAntecedent?.ContinueWith((_, state) => ((Stream)state).Dispose(), stream, cancellationToken, TaskContinuationOptions.None, TaskScheduler.Default).Forget();
return new DuplexPipe(input, output);
}

Expand Down Expand Up @@ -463,6 +414,89 @@ internal static Task LinkToAsync(this PipeReader reader, PipeWriter writer, bool
});
}

/// <summary>
/// Enables efficiently reading a stream using <see cref="PipeReader"/>.
/// </summary>
/// <param name="stream">The stream to read from using a pipe.</param>
/// <param name="sizeHint">A hint at the size of messages that are commonly transferred. Use 0 for a commonly reasonable default.</param>
/// <param name="pipeOptions">Optional pipe options to use.</param>
/// <param name="disposeWhenReaderCompleted">A task which, when complete, signals that this method should dispose of the <paramref name="stream"/>.</param>
/// <param name="cancellationToken">A cancellation token that aborts reading from the <paramref name="stream"/>.</param>
/// <returns>A <see cref="PipeReader"/>.</returns>
/// <remarks>
/// When the caller invokes <see cref="PipeReader.Complete(Exception)"/> on the result value,
/// this leads to the associated <see cref="PipeWriter.Complete(Exception)"/> to be automatically called as well.
/// </remarks>
private static PipeReader UsePipeReader(this Stream stream, int sizeHint = 0, PipeOptions? pipeOptions = null, Task? disposeWhenReaderCompleted = null, CancellationToken cancellationToken = default)
{
Requires.NotNull(stream, nameof(stream));
Requires.Argument(stream.CanRead, nameof(stream), "Stream must be readable.");

var pipe = new Pipe(pipeOptions ?? PipeOptions.Default);

// Notice when the pipe reader isn't listening any more, and terminate our loop that reads from the stream.
var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
pipe.Writer.OnReaderCompleted((ex, state) => ((CancellationTokenSource)state).Cancel(), combinedTokenSource);

// When this argument is provided, it provides a means to ensure we don't hang while reading from an I/O pipe
// that doesn't respect the CancellationToken. Disposing a Stream while reading is a means to terminate the ReadAsync operation.
if (disposeWhenReaderCompleted is object)
{
disposeWhenReaderCompleted.ContinueWith(
(_, s1) =>
{
var tuple = (Tuple<Pipe, Stream>)s1;
tuple.Item1.Writer.OnReaderCompleted((ex, s2) => ((Stream)s2).Dispose(), tuple.Item2);
},
Tuple.Create(pipe, stream),
cancellationToken,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default).Forget();
}

Task.Run(async delegate
{
while (!combinedTokenSource.Token.IsCancellationRequested)
{
Memory<byte> memory = pipe.Writer.GetMemory(sizeHint);
try
{
int bytesRead = await stream.ReadAsync(memory, combinedTokenSource.Token).ConfigureAwait(false);
if (bytesRead == 0)
{
break;
}

pipe.Writer.Advance(bytesRead);
}
catch (OperationCanceledException)
{
break;
}
catch (ObjectDisposedException)
{
break;
}
catch (Exception ex)
{
// Propagate the exception to the reader.
pipe.Writer.Complete(ex);
return;
}

FlushResult result = await pipe.Writer.FlushAsync().ConfigureAwait(false);
if (result.IsCompleted)
{
break;
}
}

// Tell the PipeReader that there's no more data coming
pipe.Writer.Complete();
}).Forget();
return pipe.Reader;
}

/// <summary>
/// Copies a sequence of bytes to a <see cref="PipeWriter"/>.
/// </summary>
Expand Down

0 comments on commit 3cfffad

Please sign in to comment.