Skip to content

Commit

Permalink
Deprecate HalfDuplexStream and copy to new SimplexStream type
Browse files Browse the repository at this point in the history
Fixes #144
  • Loading branch information
AArnott committed Feb 7, 2020
1 parent 60eb853 commit f4ad308
Show file tree
Hide file tree
Showing 10 changed files with 453 additions and 9 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Specialized .NET Stream classes

## Features

1. [`HalfDuplexStream`](doc/HalfDuplexStream.md) is meant to allow two parties to communicate *one direction*.
1. [`SimplexStream`](doc/SimplexStream.md) is meant to allow two parties to communicate *one direction*.
Anything written to the stream can subsequently be read from it. You can share this `Stream`
with any two parties (in the same AppDomain) and one can send messages to the other.
1. [`FullDuplexStream`](doc/FullDuplexStream.md) creates a pair of bidirectional streams for
Expand Down
File renamed without changes.
2 changes: 2 additions & 0 deletions src/Nerdbank.Streams.Tests/HalfDuplexStreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
using Xunit;
using Xunit.Abstractions;

#pragma warning disable CS0618 // Type or member is obsolete

public class HalfDuplexStreamTests : TestBase
{
private const int ResumeThreshold = 39;
Expand Down
6 changes: 3 additions & 3 deletions src/Nerdbank.Streams.Tests/PipeExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void UsePipeWriter_WebSocket_ThrowsOnNull()
[Fact]
public async Task UsePipe_Stream()
{
var ms = new HalfDuplexStream();
var ms = new SimplexStream();
IDuplexPipe pipe = ms.UsePipe(cancellationToken: this.TimeoutToken);
await pipe.Output.WriteAsync(new byte[] { 1, 2, 3 }, this.TimeoutToken);
var readResult = await pipe.Input.ReadAsync(this.TimeoutToken);
Expand All @@ -49,7 +49,7 @@ public async Task UsePipe_Stream()
[Fact]
public async Task UsePipe_Stream_Disposal()
{
var ms = new HalfDuplexStream();
var ms = new SimplexStream();
IDuplexPipe pipe = ms.UsePipe(cancellationToken: this.TimeoutToken);
pipe.Output.Complete();
pipe.Input.Complete();
Expand All @@ -60,7 +60,7 @@ public async Task UsePipe_Stream_Disposal()
[PairwiseData]
public async Task UsePipe_Stream_OneDirectionDoesNotDispose(bool completeOutput)
{
var ms = new HalfDuplexStream();
var ms = new SimplexStream();
IDuplexPipe pipe = ms.UsePipe(cancellationToken: this.TimeoutToken);
if (completeOutput)
{
Expand Down
275 changes: 275 additions & 0 deletions src/Nerdbank.Streams.Tests/SimplexStreamTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
// Copyright (c) Andrew Arnott. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.VisualStudio.Threading;
using Nerdbank.Streams;
using Xunit;
using Xunit.Abstractions;

public class SimplexStreamTests : TestBase
{
private const int ResumeThreshold = 39;

private const int PauseThreshold = 40;

private readonly Random random = new Random();

private SimplexStream stream = new SimplexStream(ResumeThreshold, PauseThreshold);

public SimplexStreamTests(ITestOutputHelper logger)
: base(logger)
{
}

[Fact]
public void DefaultCtor()
{
var stream = new SimplexStream();
stream.Dispose();
}

[Fact]
public void CanSeek() => Assert.False(this.stream.CanSeek);

[Fact]
public void Length()
{
Assert.Throws<NotSupportedException>(() => this.stream.Length);
this.stream.Dispose();
Assert.Throws<ObjectDisposedException>(() => this.stream.Length);
}

[Fact]
public void Position()
{
Assert.Throws<NotSupportedException>(() => this.stream.Position);
Assert.Throws<NotSupportedException>(() => this.stream.Position = 0);
this.stream.Dispose();
Assert.Throws<ObjectDisposedException>(() => this.stream.Position);
Assert.Throws<ObjectDisposedException>(() => this.stream.Position = 0);
}

[Fact]
public void IsDisposed()
{
Assert.False(this.stream.IsDisposed);
this.stream.Dispose();
Assert.True(this.stream.IsDisposed);
}

[Fact]
public void SetLength()
{
Assert.Throws<NotSupportedException>(() => this.stream.SetLength(0));
this.stream.Dispose();
Assert.Throws<ObjectDisposedException>(() => this.stream.SetLength(0));
}

[Fact]
public void Seek()
{
Assert.Throws<NotSupportedException>(() => this.stream.Seek(0, SeekOrigin.Begin));
this.stream.Dispose();
Assert.Throws<ObjectDisposedException>(() => this.stream.Seek(0, SeekOrigin.Begin));
}

[Fact]
public void CanRead()
{
Assert.True(this.stream.CanRead);
this.stream.Dispose();
Assert.False(this.stream.CanRead);
}

[Fact]
public void CanWrite()
{
Assert.True(this.stream.CanWrite);
this.stream.Dispose();
Assert.False(this.stream.CanWrite);
}

[Fact]
public void Flush()
{
this.stream.Flush();
Assert.True(this.stream.FlushAsync().IsCompleted);
}

[Theory]
[PairwiseData]
public async Task WriteThenRead(bool useAsync)
{
byte[] sendBuffer = this.GetRandomBuffer(20);
await this.WriteAsync(sendBuffer, 0, sendBuffer.Length, useAsync);
await this.stream.FlushAsync(this.TimeoutToken);
byte[] recvBuffer = new byte[sendBuffer.Length];
await this.ReadAsync(this.stream, recvBuffer, isAsync: useAsync);
Assert.Equal(sendBuffer, recvBuffer);
}

[Theory]
[PairwiseData]
public async Task Write_InputValidation(bool useAsync)
{
await Assert.ThrowsAsync<ArgumentNullException>(() => this.WriteAsync(null!, 0, 0, isAsync: useAsync));
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(() => this.WriteAsync(new byte[5], 0, 6, isAsync: useAsync));
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(() => this.WriteAsync(new byte[5], 5, 1, isAsync: useAsync));
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(() => this.WriteAsync(new byte[5], 3, 3, isAsync: useAsync));
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(() => this.WriteAsync(new byte[5], -1, 2, isAsync: useAsync));
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(() => this.WriteAsync(new byte[5], 2, -1, isAsync: useAsync));

await this.WriteAsync(new byte[5], 5, 0, useAsync);
}

[Theory]
[PairwiseData]
public async Task Write_ThrowsObjectDisposedException(bool useAsync)
{
this.stream.Dispose();
await Assert.ThrowsAsync<ObjectDisposedException>(() => this.WriteAsync(new byte[1], 0, 1, useAsync));
}

[Theory]
[CombinatorialData]
public async Task WriteManyThenRead([CombinatorialValues(PauseThreshold / 2, PauseThreshold - 1)] int bytes, [CombinatorialValues(1, 2, 3)] int steps, bool useAsync)
{
int typicalWriteSize = bytes / steps;
byte[] sendBuffer = this.GetRandomBuffer(bytes);
int bytesWritten = 0;
for (int i = 0; i < steps; i++)
{
await this.WriteAsync(sendBuffer, bytesWritten, typicalWriteSize, useAsync);
bytesWritten += typicalWriteSize;
}

// Write the balance of the bytes
await this.WriteAsync(sendBuffer, bytesWritten, bytes - bytesWritten, useAsync);
await this.stream.FlushAsync(this.TimeoutToken);

byte[] recvBuffer = new byte[sendBuffer.Length];
await this.ReadAsync(this.stream, recvBuffer, isAsync: useAsync);
Assert.Equal(sendBuffer, recvBuffer);
}

[Theory]
[CombinatorialData]
public async Task WriteWriteRead_Loop_WriteRead([CombinatorialValues(2, 2.1, 2.9, 3, 4, 5, 6)] float stepsPerBuffer)
{
bool useAsync = true;
const int maxBufferMultiplier = 3;
float steps = stepsPerBuffer * maxBufferMultiplier;
byte[] sendBuffer = this.GetRandomBuffer(PauseThreshold * maxBufferMultiplier);
byte[] recvBuffer = new byte[sendBuffer.Length];
int typicalWriteSize = (int)(sendBuffer.Length / steps);
typicalWriteSize = Math.Min(typicalWriteSize, (PauseThreshold / 2) - 1); // We need to be able to write twice in a row.
int bytesWritten = 0;
int bytesRead = 0;
for (int i = 0; i < Math.Floor(steps); i++)
{
await this.WriteAsync(sendBuffer, bytesWritten, typicalWriteSize, useAsync);
bytesWritten += typicalWriteSize;
await this.stream.FlushAsync();

if (i > 0)
{
await this.ReadAsync(this.stream, recvBuffer, typicalWriteSize, bytesRead, useAsync);
bytesRead += typicalWriteSize;
Assert.Equal(sendBuffer.Take(bytesRead), recvBuffer.Take(bytesRead));
}
}

// Write the balance of the bytes
await this.WriteAsync(sendBuffer, bytesWritten, sendBuffer.Length - bytesWritten, useAsync);
await this.stream.FlushAsync();

// Read the balance
await this.ReadAsync(this.stream, recvBuffer, recvBuffer.Length - bytesRead, bytesRead, useAsync);

Assert.Equal(sendBuffer, recvBuffer);
}

[Theory]
[CombinatorialData]
public async Task Write_ThenReadMore(bool useAsync)
{
byte[] sendBuffer = new byte[] { 0x1, 0x2 };
await this.WriteAsync(sendBuffer, 0, sendBuffer.Length, useAsync);
await this.stream.FlushAsync(this.TimeoutToken);
int bytesRead;
byte[] recvBuffer = new byte[5];
if (useAsync)
{
bytesRead = await this.stream.ReadAsync(recvBuffer, 0, recvBuffer.Length, this.TimeoutToken).WithCancellation(this.TimeoutToken);
}
else
{
bytesRead = this.stream.Read(recvBuffer, 0, recvBuffer.Length);
}

Assert.Equal(sendBuffer.Length, bytesRead);
Assert.Equal(sendBuffer, recvBuffer.Take(bytesRead));
}

[Fact]
public async Task ReadAsyncThenWriteAsync()
{
byte[] sendBuffer = this.GetRandomBuffer(20);
byte[] recvBuffer = new byte[sendBuffer.Length];
Task readTask = this.ReadAsync(this.stream, recvBuffer);
await this.stream.WriteAsync(sendBuffer, 0, sendBuffer.Length).WithCancellation(this.TimeoutToken);
await this.stream.FlushAsync(this.TimeoutToken);
await readTask.WithCancellation(this.TimeoutToken);
Assert.Equal(sendBuffer, recvBuffer);
}

[Theory]
[CombinatorialData]
public async Task CompleteWriting(bool useAsync)
{
await this.WriteAsync(new byte[3], 0, 3, useAsync);
this.stream.CompleteWriting();
byte[] recvbuffer = new byte[5];
await this.ReadAsync(this.stream, recvbuffer, count: 3, isAsync: useAsync);
Assert.Equal(0, await this.stream.ReadAsync(recvbuffer, 3, 2, this.TimeoutToken).WithCancellation(this.TimeoutToken));
Assert.Equal(0, this.stream.Read(recvbuffer, 3, 2));
}

[Fact]
public async Task StreamAsBufferWriter()
{
IBufferWriter<byte> writer = this.stream;
writer.Write(new byte[] { 1, 2, 3 });
writer.Write(new byte[] { 4, 5, 6, 7, 8, 9 });
await this.stream.FlushAsync(this.TimeoutToken);
var readBuffer = new byte[10];
int bytesRead = await this.stream.ReadAsync(readBuffer, 0, 10, this.TimeoutToken);
Assert.Equal(9, bytesRead);
Assert.Equal(Enumerable.Range(1, 9).Select(i => (byte)i), readBuffer.Take(bytesRead));
}

protected override void Dispose(bool disposing)
{
this.stream.Dispose();
base.Dispose(disposing);
}

private async Task WriteAsync(byte[] buffer, int offset, int count, bool isAsync)
{
if (isAsync)
{
await this.stream.WriteAsync(buffer, offset, count, this.TimeoutToken).WithCancellation(this.TimeoutToken);
}
else
{
this.stream.Write(buffer, offset, count);
}
}
}
6 changes: 3 additions & 3 deletions src/Nerdbank.Streams.Tests/StreamPipeReaderTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public async Task Stream()
public async Task ReadAsyncAfterExamining()
{
byte[] expectedBuffer = this.GetRandomBuffer(2048);
var stream = new HalfDuplexStream();
var stream = new SimplexStream();
stream.Write(expectedBuffer, 0, 50);
await stream.FlushAsync(this.TimeoutToken);
var reader = this.CreatePipeReader(stream, sizeHint: 50);
Expand Down Expand Up @@ -214,7 +214,7 @@ public async Task OnWriterCompleted()
[Fact]
public async Task CancelPendingRead_WithCancellationToken()
{
var stream = new HalfDuplexStream();
var stream = new SimplexStream();
var reader = this.CreatePipeReader(stream, sizeHint: 50);

var cts = new CancellationTokenSource();
Expand All @@ -226,7 +226,7 @@ public async Task CancelPendingRead_WithCancellationToken()
[Fact]
public async Task Complete_DoesNotCauseStreamDisposal()
{
var stream = new HalfDuplexStream();
var stream = new SimplexStream();
var reader = this.CreatePipeReader(stream);
reader.Complete();

Expand Down
2 changes: 1 addition & 1 deletion src/Nerdbank.Streams.Tests/StreamUsePipeReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public async Task StreamFails()
[Fact]
public async Task Complete_CausesWriterCompletion()
{
var stream = new HalfDuplexStream();
var stream = new SimplexStream();
var reader = this.CreatePipeReader(stream);
#pragma warning disable CS0618 // Type or member is obsolete
Task writerCompletion = reader.WaitForWriterCompletionAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public async Task StreamFails()
[Fact] // Bizarre behavior when using the built-in Pipe class: https://github.com/dotnet/corefx/issues/31696
public async Task CancelPendingRead()
{
var stream = new HalfDuplexStream();
var stream = new SimplexStream();
var reader = this.CreatePipeReader(stream, sizeHint: 50);

ValueTask<ReadResult> readTask = reader.ReadAsync(this.TimeoutToken);
Expand Down
1 change: 1 addition & 0 deletions src/Nerdbank.Streams/HalfDuplexStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Nerdbank.Streams
/// can then be read from it, in order.
/// This is actually a "simplex" stream -- not a half duplex stream. Naming bug.
/// </summary>
[Obsolete("Use " + nameof(SimplexStream) + " instead.")]
public class HalfDuplexStream : Stream, IBufferWriter<byte>, IDisposableObservable
{
/// <summary>
Expand Down
Loading

0 comments on commit f4ad308

Please sign in to comment.