diff --git a/README.md b/README.md index 769c9e06..5a6e633c 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ Specialized .NET Stream classes ## Features -1. [`HalfDuplexStream`](doc/HalfDuplexStream.md) is meant to allow two parties to communicate *one direction*. +1. [`SimplexStream`](doc/SimplexStream.md) is meant to allow two parties to communicate *one direction*. Anything written to the stream can subsequently be read from it. You can share this `Stream` with any two parties (in the same AppDomain) and one can send messages to the other. 1. [`FullDuplexStream`](doc/FullDuplexStream.md) creates a pair of bidirectional streams for diff --git a/doc/HalfDuplexStream.md b/doc/SimplexStream.md similarity index 100% rename from doc/HalfDuplexStream.md rename to doc/SimplexStream.md diff --git a/src/Nerdbank.Streams.Tests/HalfDuplexStreamTests.cs b/src/Nerdbank.Streams.Tests/HalfDuplexStreamTests.cs index f92dc094..f6bd851c 100644 --- a/src/Nerdbank.Streams.Tests/HalfDuplexStreamTests.cs +++ b/src/Nerdbank.Streams.Tests/HalfDuplexStreamTests.cs @@ -13,6 +13,8 @@ using Xunit; using Xunit.Abstractions; +#pragma warning disable CS0618 // Type or member is obsolete + public class HalfDuplexStreamTests : TestBase { private const int ResumeThreshold = 39; diff --git a/src/Nerdbank.Streams.Tests/PipeExtensionsTests.cs b/src/Nerdbank.Streams.Tests/PipeExtensionsTests.cs index 202cb1f7..99535aa2 100644 --- a/src/Nerdbank.Streams.Tests/PipeExtensionsTests.cs +++ b/src/Nerdbank.Streams.Tests/PipeExtensionsTests.cs @@ -38,7 +38,7 @@ public void UsePipeWriter_WebSocket_ThrowsOnNull() [Fact] public async Task UsePipe_Stream() { - var ms = new HalfDuplexStream(); + var ms = new SimplexStream(); IDuplexPipe pipe = ms.UsePipe(cancellationToken: this.TimeoutToken); await pipe.Output.WriteAsync(new byte[] { 1, 2, 3 }, this.TimeoutToken); var readResult = await pipe.Input.ReadAsync(this.TimeoutToken); @@ -49,7 +49,7 @@ public async Task UsePipe_Stream() [Fact] public async Task UsePipe_Stream_Disposal() { - var ms = new HalfDuplexStream(); + var ms = new SimplexStream(); IDuplexPipe pipe = ms.UsePipe(cancellationToken: this.TimeoutToken); pipe.Output.Complete(); pipe.Input.Complete(); @@ -60,7 +60,7 @@ public async Task UsePipe_Stream_Disposal() [PairwiseData] public async Task UsePipe_Stream_OneDirectionDoesNotDispose(bool completeOutput) { - var ms = new HalfDuplexStream(); + var ms = new SimplexStream(); IDuplexPipe pipe = ms.UsePipe(cancellationToken: this.TimeoutToken); if (completeOutput) { diff --git a/src/Nerdbank.Streams.Tests/SimplexStreamTests.cs b/src/Nerdbank.Streams.Tests/SimplexStreamTests.cs new file mode 100644 index 00000000..869fa45b --- /dev/null +++ b/src/Nerdbank.Streams.Tests/SimplexStreamTests.cs @@ -0,0 +1,275 @@ +// Copyright (c) Andrew Arnott. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.VisualStudio.Threading; +using Nerdbank.Streams; +using Xunit; +using Xunit.Abstractions; + +public class SimplexStreamTests : TestBase +{ + private const int ResumeThreshold = 39; + + private const int PauseThreshold = 40; + + private readonly Random random = new Random(); + + private SimplexStream stream = new SimplexStream(ResumeThreshold, PauseThreshold); + + public SimplexStreamTests(ITestOutputHelper logger) + : base(logger) + { + } + + [Fact] + public void DefaultCtor() + { + var stream = new SimplexStream(); + stream.Dispose(); + } + + [Fact] + public void CanSeek() => Assert.False(this.stream.CanSeek); + + [Fact] + public void Length() + { + Assert.Throws(() => this.stream.Length); + this.stream.Dispose(); + Assert.Throws(() => this.stream.Length); + } + + [Fact] + public void Position() + { + Assert.Throws(() => this.stream.Position); + Assert.Throws(() => this.stream.Position = 0); + this.stream.Dispose(); + Assert.Throws(() => this.stream.Position); + Assert.Throws(() => this.stream.Position = 0); + } + + [Fact] + public void IsDisposed() + { + Assert.False(this.stream.IsDisposed); + this.stream.Dispose(); + Assert.True(this.stream.IsDisposed); + } + + [Fact] + public void SetLength() + { + Assert.Throws(() => this.stream.SetLength(0)); + this.stream.Dispose(); + Assert.Throws(() => this.stream.SetLength(0)); + } + + [Fact] + public void Seek() + { + Assert.Throws(() => this.stream.Seek(0, SeekOrigin.Begin)); + this.stream.Dispose(); + Assert.Throws(() => this.stream.Seek(0, SeekOrigin.Begin)); + } + + [Fact] + public void CanRead() + { + Assert.True(this.stream.CanRead); + this.stream.Dispose(); + Assert.False(this.stream.CanRead); + } + + [Fact] + public void CanWrite() + { + Assert.True(this.stream.CanWrite); + this.stream.Dispose(); + Assert.False(this.stream.CanWrite); + } + + [Fact] + public void Flush() + { + this.stream.Flush(); + Assert.True(this.stream.FlushAsync().IsCompleted); + } + + [Theory] + [PairwiseData] + public async Task WriteThenRead(bool useAsync) + { + byte[] sendBuffer = this.GetRandomBuffer(20); + await this.WriteAsync(sendBuffer, 0, sendBuffer.Length, useAsync); + await this.stream.FlushAsync(this.TimeoutToken); + byte[] recvBuffer = new byte[sendBuffer.Length]; + await this.ReadAsync(this.stream, recvBuffer, isAsync: useAsync); + Assert.Equal(sendBuffer, recvBuffer); + } + + [Theory] + [PairwiseData] + public async Task Write_InputValidation(bool useAsync) + { + await Assert.ThrowsAsync(() => this.WriteAsync(null!, 0, 0, isAsync: useAsync)); + await Assert.ThrowsAsync(() => this.WriteAsync(new byte[5], 0, 6, isAsync: useAsync)); + await Assert.ThrowsAsync(() => this.WriteAsync(new byte[5], 5, 1, isAsync: useAsync)); + await Assert.ThrowsAsync(() => this.WriteAsync(new byte[5], 3, 3, isAsync: useAsync)); + await Assert.ThrowsAsync(() => this.WriteAsync(new byte[5], -1, 2, isAsync: useAsync)); + await Assert.ThrowsAsync(() => this.WriteAsync(new byte[5], 2, -1, isAsync: useAsync)); + + await this.WriteAsync(new byte[5], 5, 0, useAsync); + } + + [Theory] + [PairwiseData] + public async Task Write_ThrowsObjectDisposedException(bool useAsync) + { + this.stream.Dispose(); + await Assert.ThrowsAsync(() => this.WriteAsync(new byte[1], 0, 1, useAsync)); + } + + [Theory] + [CombinatorialData] + public async Task WriteManyThenRead([CombinatorialValues(PauseThreshold / 2, PauseThreshold - 1)] int bytes, [CombinatorialValues(1, 2, 3)] int steps, bool useAsync) + { + int typicalWriteSize = bytes / steps; + byte[] sendBuffer = this.GetRandomBuffer(bytes); + int bytesWritten = 0; + for (int i = 0; i < steps; i++) + { + await this.WriteAsync(sendBuffer, bytesWritten, typicalWriteSize, useAsync); + bytesWritten += typicalWriteSize; + } + + // Write the balance of the bytes + await this.WriteAsync(sendBuffer, bytesWritten, bytes - bytesWritten, useAsync); + await this.stream.FlushAsync(this.TimeoutToken); + + byte[] recvBuffer = new byte[sendBuffer.Length]; + await this.ReadAsync(this.stream, recvBuffer, isAsync: useAsync); + Assert.Equal(sendBuffer, recvBuffer); + } + + [Theory] + [CombinatorialData] + public async Task WriteWriteRead_Loop_WriteRead([CombinatorialValues(2, 2.1, 2.9, 3, 4, 5, 6)] float stepsPerBuffer) + { + bool useAsync = true; + const int maxBufferMultiplier = 3; + float steps = stepsPerBuffer * maxBufferMultiplier; + byte[] sendBuffer = this.GetRandomBuffer(PauseThreshold * maxBufferMultiplier); + byte[] recvBuffer = new byte[sendBuffer.Length]; + int typicalWriteSize = (int)(sendBuffer.Length / steps); + typicalWriteSize = Math.Min(typicalWriteSize, (PauseThreshold / 2) - 1); // We need to be able to write twice in a row. + int bytesWritten = 0; + int bytesRead = 0; + for (int i = 0; i < Math.Floor(steps); i++) + { + await this.WriteAsync(sendBuffer, bytesWritten, typicalWriteSize, useAsync); + bytesWritten += typicalWriteSize; + await this.stream.FlushAsync(); + + if (i > 0) + { + await this.ReadAsync(this.stream, recvBuffer, typicalWriteSize, bytesRead, useAsync); + bytesRead += typicalWriteSize; + Assert.Equal(sendBuffer.Take(bytesRead), recvBuffer.Take(bytesRead)); + } + } + + // Write the balance of the bytes + await this.WriteAsync(sendBuffer, bytesWritten, sendBuffer.Length - bytesWritten, useAsync); + await this.stream.FlushAsync(); + + // Read the balance + await this.ReadAsync(this.stream, recvBuffer, recvBuffer.Length - bytesRead, bytesRead, useAsync); + + Assert.Equal(sendBuffer, recvBuffer); + } + + [Theory] + [CombinatorialData] + public async Task Write_ThenReadMore(bool useAsync) + { + byte[] sendBuffer = new byte[] { 0x1, 0x2 }; + await this.WriteAsync(sendBuffer, 0, sendBuffer.Length, useAsync); + await this.stream.FlushAsync(this.TimeoutToken); + int bytesRead; + byte[] recvBuffer = new byte[5]; + if (useAsync) + { + bytesRead = await this.stream.ReadAsync(recvBuffer, 0, recvBuffer.Length, this.TimeoutToken).WithCancellation(this.TimeoutToken); + } + else + { + bytesRead = this.stream.Read(recvBuffer, 0, recvBuffer.Length); + } + + Assert.Equal(sendBuffer.Length, bytesRead); + Assert.Equal(sendBuffer, recvBuffer.Take(bytesRead)); + } + + [Fact] + public async Task ReadAsyncThenWriteAsync() + { + byte[] sendBuffer = this.GetRandomBuffer(20); + byte[] recvBuffer = new byte[sendBuffer.Length]; + Task readTask = this.ReadAsync(this.stream, recvBuffer); + await this.stream.WriteAsync(sendBuffer, 0, sendBuffer.Length).WithCancellation(this.TimeoutToken); + await this.stream.FlushAsync(this.TimeoutToken); + await readTask.WithCancellation(this.TimeoutToken); + Assert.Equal(sendBuffer, recvBuffer); + } + + [Theory] + [CombinatorialData] + public async Task CompleteWriting(bool useAsync) + { + await this.WriteAsync(new byte[3], 0, 3, useAsync); + this.stream.CompleteWriting(); + byte[] recvbuffer = new byte[5]; + await this.ReadAsync(this.stream, recvbuffer, count: 3, isAsync: useAsync); + Assert.Equal(0, await this.stream.ReadAsync(recvbuffer, 3, 2, this.TimeoutToken).WithCancellation(this.TimeoutToken)); + Assert.Equal(0, this.stream.Read(recvbuffer, 3, 2)); + } + + [Fact] + public async Task StreamAsBufferWriter() + { + IBufferWriter writer = this.stream; + writer.Write(new byte[] { 1, 2, 3 }); + writer.Write(new byte[] { 4, 5, 6, 7, 8, 9 }); + await this.stream.FlushAsync(this.TimeoutToken); + var readBuffer = new byte[10]; + int bytesRead = await this.stream.ReadAsync(readBuffer, 0, 10, this.TimeoutToken); + Assert.Equal(9, bytesRead); + Assert.Equal(Enumerable.Range(1, 9).Select(i => (byte)i), readBuffer.Take(bytesRead)); + } + + protected override void Dispose(bool disposing) + { + this.stream.Dispose(); + base.Dispose(disposing); + } + + private async Task WriteAsync(byte[] buffer, int offset, int count, bool isAsync) + { + if (isAsync) + { + await this.stream.WriteAsync(buffer, offset, count, this.TimeoutToken).WithCancellation(this.TimeoutToken); + } + else + { + this.stream.Write(buffer, offset, count); + } + } +} diff --git a/src/Nerdbank.Streams.Tests/StreamPipeReaderTestBase.cs b/src/Nerdbank.Streams.Tests/StreamPipeReaderTestBase.cs index 44b3b1c6..546dedb4 100644 --- a/src/Nerdbank.Streams.Tests/StreamPipeReaderTestBase.cs +++ b/src/Nerdbank.Streams.Tests/StreamPipeReaderTestBase.cs @@ -93,7 +93,7 @@ public async Task Stream() public async Task ReadAsyncAfterExamining() { byte[] expectedBuffer = this.GetRandomBuffer(2048); - var stream = new HalfDuplexStream(); + var stream = new SimplexStream(); stream.Write(expectedBuffer, 0, 50); await stream.FlushAsync(this.TimeoutToken); var reader = this.CreatePipeReader(stream, sizeHint: 50); @@ -214,7 +214,7 @@ public async Task OnWriterCompleted() [Fact] public async Task CancelPendingRead_WithCancellationToken() { - var stream = new HalfDuplexStream(); + var stream = new SimplexStream(); var reader = this.CreatePipeReader(stream, sizeHint: 50); var cts = new CancellationTokenSource(); @@ -226,7 +226,7 @@ public async Task CancelPendingRead_WithCancellationToken() [Fact] public async Task Complete_DoesNotCauseStreamDisposal() { - var stream = new HalfDuplexStream(); + var stream = new SimplexStream(); var reader = this.CreatePipeReader(stream); reader.Complete(); diff --git a/src/Nerdbank.Streams.Tests/StreamUsePipeReaderTests.cs b/src/Nerdbank.Streams.Tests/StreamUsePipeReaderTests.cs index 90440be0..51fbbd82 100644 --- a/src/Nerdbank.Streams.Tests/StreamUsePipeReaderTests.cs +++ b/src/Nerdbank.Streams.Tests/StreamUsePipeReaderTests.cs @@ -46,7 +46,7 @@ public async Task StreamFails() [Fact] public async Task Complete_CausesWriterCompletion() { - var stream = new HalfDuplexStream(); + var stream = new SimplexStream(); var reader = this.CreatePipeReader(stream); #pragma warning disable CS0618 // Type or member is obsolete Task writerCompletion = reader.WaitForWriterCompletionAsync(); diff --git a/src/Nerdbank.Streams.Tests/StreamUseStrictPipeReaderTests.cs b/src/Nerdbank.Streams.Tests/StreamUseStrictPipeReaderTests.cs index 9c14a00d..c993f14e 100644 --- a/src/Nerdbank.Streams.Tests/StreamUseStrictPipeReaderTests.cs +++ b/src/Nerdbank.Streams.Tests/StreamUseStrictPipeReaderTests.cs @@ -44,7 +44,7 @@ public async Task StreamFails() [Fact] // Bizarre behavior when using the built-in Pipe class: https://github.com/dotnet/corefx/issues/31696 public async Task CancelPendingRead() { - var stream = new HalfDuplexStream(); + var stream = new SimplexStream(); var reader = this.CreatePipeReader(stream, sizeHint: 50); ValueTask readTask = reader.ReadAsync(this.TimeoutToken); diff --git a/src/Nerdbank.Streams/HalfDuplexStream.cs b/src/Nerdbank.Streams/HalfDuplexStream.cs index bbfc063f..bdc6cb5f 100644 --- a/src/Nerdbank.Streams/HalfDuplexStream.cs +++ b/src/Nerdbank.Streams/HalfDuplexStream.cs @@ -19,6 +19,7 @@ namespace Nerdbank.Streams /// can then be read from it, in order. /// This is actually a "simplex" stream -- not a half duplex stream. Naming bug. /// + [Obsolete("Use " + nameof(SimplexStream) + " instead.")] public class HalfDuplexStream : Stream, IBufferWriter, IDisposableObservable { /// diff --git a/src/Nerdbank.Streams/SimplexStream.cs b/src/Nerdbank.Streams/SimplexStream.cs new file mode 100644 index 00000000..ada29632 --- /dev/null +++ b/src/Nerdbank.Streams/SimplexStream.cs @@ -0,0 +1,166 @@ +// Copyright (c) Andrew Arnott. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Nerdbank.Streams +{ + using System; + using System.Buffers; + using System.Collections.Generic; + using System.IO; + using System.IO.Pipelines; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + using Microsoft; + using Microsoft.VisualStudio.Threading; + + /// + /// A that acts as a queue for bytes, in that what gets written to it + /// can then be read from it, in order. + /// + public class SimplexStream : Stream, IBufferWriter, IDisposableObservable + { + /// + /// The pipe that does all the hard work. + /// + private readonly Pipe pipe; + + /// + /// Initializes a new instance of the class. + /// + public SimplexStream() + : this(16 * 1024, 32 * 1024) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The size the buffer must shrink to after hitting before writing is allowed to resume. + /// The maximum size the buffer is allowed to grow before write calls are blocked (pending a read that will release buffer space. + public SimplexStream(int resumeWriterThreshold, int pauseWriterThreshold) + { + PipeOptions options = new PipeOptions( + pauseWriterThreshold: pauseWriterThreshold, + resumeWriterThreshold: resumeWriterThreshold, + useSynchronizationContext: false); + this.pipe = new Pipe(options); + } + + /// + public bool IsDisposed { get; private set; } + + /// + public override bool CanRead => !this.IsDisposed; + + /// + public override bool CanSeek => false; + + /// + public override bool CanWrite => !this.IsDisposed; + + /// + public override long Length => throw this.ThrowDisposedOr(new NotSupportedException()); + + /// + public override long Position + { + get => throw this.ThrowDisposedOr(new NotSupportedException()); + set => throw this.ThrowDisposedOr(new NotSupportedException()); + } + + /// + /// Signals that no more writing will take place, causing readers to receive 0 bytes when asking for any more data. + /// + public void CompleteWriting() => this.pipe.Writer.Complete(); + + /// + public override async Task FlushAsync(CancellationToken cancellationToken) => await this.pipe.Writer.FlushAsync(cancellationToken).ConfigureAwait(false); + + /// + public override long Seek(long offset, SeekOrigin origin) => throw this.ThrowDisposedOr(new NotSupportedException()); + + /// + public override void SetLength(long value) => throw this.ThrowDisposedOr(new NotSupportedException()); + + /// + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + Requires.NotNull(buffer, nameof(buffer)); + Requires.Range(offset + count <= buffer.Length, nameof(count)); + Requires.Range(offset >= 0, nameof(offset)); + Requires.Range(count > 0, nameof(count)); + + ReadResult readResult = await this.pipe.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); + int bytesRead = 0; + ReadOnlySequence slice = readResult.Buffer.Slice(0, Math.Min(count, readResult.Buffer.Length)); + foreach (ReadOnlyMemory span in slice) + { + int bytesToCopy = Math.Min(count, span.Length); + span.CopyTo(new Memory(buffer, offset, bytesToCopy)); + offset += bytesToCopy; + count -= bytesToCopy; + bytesRead += bytesToCopy; + } + + this.pipe.Reader.AdvanceTo(slice.End); + return bytesRead; + } + + /// + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + this.Write(buffer, offset, count); + return Task.CompletedTask; + } + + /// + void IBufferWriter.Advance(int count) => this.pipe.Writer.Advance(count); + + /// + Memory IBufferWriter.GetMemory(int sizeHint) => this.pipe.Writer.GetMemory(sizeHint); + + /// + Span IBufferWriter.GetSpan(int sizeHint) => this.pipe.Writer.GetSpan(sizeHint); + +#pragma warning disable VSTHRD002 // Avoid problematic synchronous waits + + /// + public override int Read(byte[] buffer, int offset, int count) => this.ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); + + /// + public override void Write(byte[] buffer, int offset, int count) + { + Requires.NotNull(buffer, nameof(buffer)); + Requires.Range(offset + count <= buffer.Length, nameof(count)); + Requires.Range(offset >= 0, nameof(offset)); + Requires.Range(count >= 0, nameof(count)); + Verify.NotDisposed(this); + + var memory = this.pipe.Writer.GetMemory(count); + buffer.AsMemory(offset, count).CopyTo(memory); + this.pipe.Writer.Advance(count); + } + + /// + public override void Flush() => this.FlushAsync().GetAwaiter().GetResult(); + +#pragma warning restore VSTHRD002 // Avoid problematic synchronous waits + + /// + protected override void Dispose(bool disposing) + { + this.IsDisposed = true; + this.pipe.Writer.Complete(); + this.pipe.Reader.Complete(); + base.Dispose(disposing); + } + + private Exception ThrowDisposedOr(Exception ex) + { + Verify.NotDisposed(this); + throw ex; + } + } +}