Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MonitoringStream.DidFlush event #149

Merged
merged 1 commit into from
Dec 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/Nerdbank.Streams.Tests/MonitoringStreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,10 @@ public void Flush()
var mockedUnderlyingStream = new Mock<Stream>(MockBehavior.Strict);
mockedUnderlyingStream.Setup(s => s.Flush());
var monitoringStream = new MonitoringStream(mockedUnderlyingStream.Object);
bool didFlushRaised = false;
monitoringStream.DidFlush += (s, e) => didFlushRaised = true;
monitoringStream.Flush();
Assert.True(didFlushRaised);
mockedUnderlyingStream.VerifyAll();
}

Expand All @@ -402,7 +405,10 @@ public async Task FlushAsync()
var mockedUnderlyingStream = new Mock<Stream>(MockBehavior.Strict);
mockedUnderlyingStream.Setup(s => s.FlushAsync(It.IsAny<CancellationToken>())).Returns(Task.CompletedTask);
var monitoringStream = new MonitoringStream(mockedUnderlyingStream.Object);
bool didFlushRaised = false;
monitoringStream.DidFlush += (s, e) => didFlushRaised = true;
await monitoringStream.FlushAsync();
Assert.True(didFlushRaised);
mockedUnderlyingStream.VerifyAll();
}

Expand Down
26 changes: 2 additions & 24 deletions src/Nerdbank.Streams.Tests/SubstreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public async Task Flush_RepeatedlyDoesNotWriteMore(bool async)
[PairwiseData]
public async Task Dispose_FlushesFinalBytes(bool async)
{
var monitoredStream = new InstrumentedStream(this.underlyingStream);
var monitoredStream = new MonitoringStream(this.underlyingStream);
int lastOperation = 0;
monitoredStream.DidWrite += (s, e) => lastOperation = 1;
monitoredStream.DidWriteMemory += (s, e) => lastOperation = 1;
Expand All @@ -312,7 +312,7 @@ public async Task Dispose_FlushesFinalBytes(bool async)
[PairwiseData]
public async Task Flush_FlushesUnderlyingStream(bool async)
{
var monitoredStream = new InstrumentedStream(this.underlyingStream);
var monitoredStream = new MonitoringStream(this.underlyingStream);
int flushed = 0;
monitoredStream.DidFlush += (s, e) => flushed++;

Expand Down Expand Up @@ -382,26 +382,4 @@ private async Task DisposeSyncOrAsync(Substream stream, bool async)
stream.Dispose();
}
}

private class InstrumentedStream : MonitoringStream
{
public InstrumentedStream(Stream inner)
: base(inner)
{
}

public event EventHandler? DidFlush;

public override void Flush()
{
base.Flush();
this.DidFlush?.Invoke(this, EventArgs.Empty);
}

public override async Task FlushAsync(CancellationToken cancellationToken)
{
await base.FlushAsync(cancellationToken);
this.DidFlush?.Invoke(this, EventArgs.Empty);
}
}
}
17 changes: 15 additions & 2 deletions src/Nerdbank.Streams/MonitoringStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ public MonitoringStream(Stream inner)
/// </summary>
public event EventHandler<byte>? DidWriteByte;

/// <summary>
/// Occurs after <see cref="Flush"/> or <see cref="FlushAsync(CancellationToken)"/> is invoked.
/// </summary>
public event EventHandler? DidFlush;

/// <summary>
/// Occurs when <see cref="Stream.Dispose()"/> is invoked.
/// </summary>
Expand Down Expand Up @@ -204,10 +209,18 @@ public override int WriteTimeout
public override bool CanTimeout => this.inner.CanTimeout;

/// <inheritdoc/>
public override void Flush() => this.inner.Flush();
public override void Flush()
{
this.inner.Flush();
this.DidFlush?.Invoke(this, EventArgs.Empty);
}

/// <inheritdoc/>
public override Task FlushAsync(CancellationToken cancellationToken) => this.inner.FlushAsync(cancellationToken);
public override async Task FlushAsync(CancellationToken cancellationToken)
{
await this.inner.FlushAsync(cancellationToken).ConfigureAwait(false);
this.DidFlush?.Invoke(this, EventArgs.Empty);
}

/// <inheritdoc/>
public override int Read(byte[] buffer, int offset, int count)
Expand Down