Stream - Interface segregation #4724

Closed
benaadams opened this issue Nov 28, 2015 · 7 comments

@benaadams
Member

Stream - Interface segregation

Problem Statement

Stream is a leaky abstraction: its behavior is determined at runtime through CanRead, CanSeek, CanTimeout, CanWrite and NotSupportedException rather than through compile-time contracts.
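As a minimal sketch of the runtime discovery described above (the class and method names are illustrative, not part of the proposal): a MemoryStream created as non-writable still has the static type Stream, so the call to Write compiles and is only rejected when it runs.

```csharp
using System;
using System.IO;

// Hypothetical demo type; only Stream and MemoryStream are real BCL types here.
public static class RuntimeDiscoveryDemo
{
    // Returns true when the write is rejected at runtime rather than at compile time.
    public static bool WriteFailsAtRuntime()
    {
        // A MemoryStream over an existing array, explicitly created as read-only.
        Stream stream = new MemoryStream(new byte[4], writable: false);

        try
        {
            // Compiles without complaint: the type system knows nothing about CanWrite.
            stream.Write(new byte[] { 1 }, 0, 1);
            return false;
        }
        catch (NotSupportedException)
        {
            // The capability mismatch only surfaces here.
            return true;
        }
    }
}
```

With a compile-time contract, a method that needs to write would ask for a writer type and the mismatch would be caught by the compiler instead.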

This also means that adapter classes such as TextWriter and its subclasses, which don't share Stream's interface, are required to enforce consistency, which leads to more allocations. However, their constructors throw, so capability is still discovered at runtime.

The wide interface of Stream also causes ownership confusion, because it is passed around as an IDisposable; this leads to clarifying overloads such as

StreamWriter(Stream stream, Encoding encoding, int bufferSize, bool leaveOpen)

Its large Api surface is also problematic for testing, dependency injection and scope control: it is rarely desirable to expose a directly derived type in a public Api, so a secondary wrapper around the derived type is exposed instead.

These effects cause the Api surfaces to be defined at the fringes rather than in the BCL, reducing interoperability between 3rd party components.

Ideally the ownership of behavior would be brought back to the BCL which would allow greater component reuse and communication without requiring custom adapters or glue code for each interaction.

Inefficiencies

Creating an HttpContext that supports keep-alive, with Request, Response and Duplex streams from a socket, currently requires the creation of 4 NetworkStream objects, 3 per request, which may be several million extra objects created per second.

This is because disposable control of the socket can't be passed on, and Read shouldn't have write ability, and Write shouldn't have read ability.

public partial class HttpContext
{
    private Stream _stream;

    public HttpContext(Socket socket)
    {
        _stream = new NetworkStream(socket, FileAccess.ReadWrite, true);
        DuplexStream = new NetworkStream(socket, FileAccess.ReadWrite, false);
        RequestStream = new NetworkStream(socket, FileAccess.Read, false);
        ResponseStream = new NetworkStream(socket, FileAccess.Write, false);
    }

    public Stream DuplexStream { get; set; }
    public Stream RequestStream { get; set; }
    public Stream ResponseStream { get; set; }
}

This situation could be improved by the constructor taking a disposable read + read/write interface, which is also easier to mock for testing; for example, it could be a file input and a file output combined in a class that implements the interface. The stream could then be exposed through the properties by up-casting to its parent interfaces, which requires only a single stream object, as shown below.

public partial class HttpContext
{
    private IDuplexStreamAsync<byte> _stream;

    public HttpContext(IDuplexStreamAsync<byte> stream)
    {
        _stream = stream;
        DuplexStream = _stream;
        RequestStream = _stream;
        ResponseStream = _stream;
    }

    public IDuplexAsync<byte> DuplexStream { get; set; }
    public IReaderAsync<byte> RequestStream { get; set; }
    public IWriterAsync<byte> ResponseStream { get; set; }
}

This ends up being a 75% reduction in allocations (see aspnet/KestrelHttpServer#437) for the same behavior.

Background

This stemmed from a conversation on Twitter, which coincided with some frustrations I was having with Stream at the time.

@davkean tweet

If someone can come up with a better design for something like Stream, I'm all ears - but I've yet to see one.

@haacked tweet

how often has the core API for Stream changed in a way that couldn't be expressed by adding a new interface?

@benaadams (ben_a_adams on twitter) tweet

can do API prop if interested; non-breaking, but would need new methods to accept the ifaces else pointless?

@davkean tweet

Goal: Design stream using only interfaces to handle all the cases that we've change stream over the years.

It's somewhat of a pointless exercise, but it's the only way I'm take the statement seriously.

Which I addressed in this Gist in a non-breaking way that is compatible with Stream.

However, that is not my Api proposal: if Stream implemented these interfaces it would violate the interface contracts and would still need runtime checks, so it wouldn't be a step forward. Also, because these are new interfaces, some historical methods can be dropped, such as Close, which is now covered by Dispose.

Proposed API

Base Interfaces

// namespace System.IO
/* .NET Framework 1.1 */
public interface IBlockingReader<T>
{
    int Read(T[] buffer, int offset, int count);
}

public interface IBlockingWriter<T>
{
    void Flush();
    void Write(T value);
    void Write(T[] buffer, int offset, int count);
}

public interface ISeekable
{
    long Position { get; set; }
    long Length { get; }
    long Seek(long offset, SeekOrigin origin);
}

public interface ISizable
{
    void SetLength(long value);
}

/* .NET Framework 4.5 */
public interface IReaderAsync<T>
{
    ValueTask<int> ReadAsync(T[] buffer, int offset, int count);
    ValueTask<int> ReadAsync(T[] buffer, int offset, int count, CancellationToken cancellationToken);
}

public interface IWriterAsync<T>
{
    Task FlushAsync();
    Task FlushAsync(CancellationToken cancellationToken);
    Task WriteAsync(T[] buffer, int offset, int count);
    Task WriteAsync(T[] buffer, int offset, int count, CancellationToken cancellationToken);
}

Example use

This by itself is enough to replicate most of the general stream behavior using Generic Constraints; with the type "streamified" by including IDisposable as a constraint.

// Do something with generic sync Reading, Seeking, Disposable stream
// TStream is the stream type; T is the element type it reads.
public static void DoSomething<TStream, T>(TStream stream)
    where TStream : IBlockingReader<T>, ISeekable, ISizable, IDisposable
{
    T[] buffer = new T[7];
    var count = stream.Read(buffer, 0, buffer.Length);
    stream.SetLength(6);
    stream.Position = 5;
    stream.Dispose();
}

Convenience Extensions

The CopyTo convenience functions introduced at 4.0 and 4.5 can be covered by a set of Extension methods.

// namespace System.IO.Extensions
// public static partial class ReaderWriterExtensions
/* .NET Framework 4.0 */
private const int _DefaultCopyBufferSize = 81920; // Bit big?

public static void Write<T>(this IBlockingWriter<T> destination, T[] buffer)
{
    destination.Write(buffer, 0, buffer.Length);
}

public static void CopyTo<T>(this IBlockingReader<T> source, IBlockingWriter<T> destination)
{
    source.CopyTo(destination, _DefaultCopyBufferSize);
}

public static void CopyTo<T>(
    this IBlockingReader<T> source, 
    IBlockingWriter<T> destination, 
    int bufferSize)
{
    var buffer = new T[bufferSize];
    int read;
    while ((read = source.Read(buffer, 0, buffer.Length)) != 0)
    {
        destination.Write(buffer, 0, read);
    }
}

/* .NET Framework 4.5 */
public static Task CopyToAsync<T>(this IReaderAsync<T> source, IWriterAsync<T> destination)
{
    return source.CopyToAsync(destination, _DefaultCopyBufferSize, CancellationToken.None);
}

public static Task CopyToAsync<T>(
    this IReaderAsync<T> source, 
    IWriterAsync<T> destination, 
    int bufferSize)
{
    return source.CopyToAsync(destination, bufferSize, CancellationToken.None);
}

public static async Task CopyToAsync<T>(
    this IReaderAsync<T> source, 
    IWriterAsync<T> destination, 
    int bufferSize, 
    CancellationToken cancellationToken)
{
    var buffer = new T[bufferSize];
    int bytesRead;
    while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
    {
        await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
    }
}
//}

Convenience Mixins

However, most day-to-day use would combine several of the interfaces. For ease of use, to avoid Generic Constraints, and to allow consistent up-casting between the interfaces, the following pre-defined mixins should be declared:

// namespace System.IO.Mixin
/* .NET Framework 1.1 */
public interface IBlockingDuplex<T> : IBlockingReader<T>, IBlockingWriter<T> { }
public interface ISeekableReader<T> : IBlockingReader<T>, ISeekable { }
public interface ISeekableWriter<T> : IBlockingWriter<T>, ISeekable, ISizable { }
public interface ISeekableDuplex<T> : ISeekableReader<T>, ISeekableWriter<T> { }

/* .NET Framework 4.5 */
public interface IDuplexAsync<T> : IReaderAsync<T>, IWriterAsync<T> { }

Convenience Stream Mixins

Streams are essentially the above plus the ability to close them, that is, the IDisposable interface. They should also be able to be cast to the above interfaces so they can be passed to methods without allowing ownership change or disposal. These interfaces would be as follows:

// namespace System.IO.Mixin.Stream
/* .NET Framework 1.1 */
public interface IBlockingReadStream<T> : IBlockingReader<T>, IDisposable { }
public interface IBlockingWriteStream<T> : IBlockingWriter<T>, IDisposable { }
public interface IBlockingDuplexStream<T> : IBlockingDuplex<T>, IBlockingReadStream<T>, IBlockingWriteStream<T> { }

public interface ISeekableStream : ISeekable, IDisposable { }
public interface ISeekableReadStream<T> : IBlockingReadStream<T>, ISeekableStream, ISeekableReader<T> { }
public interface ISeekableWriteStream<T> : IBlockingWriteStream<T>, ISeekableStream, ISeekableWriter<T> { }
public interface ISeekableDuplexStream<T> : ISeekableReadStream<T>, ISeekableWriteStream<T>, IBlockingDuplexStream<T>, ISeekableDuplex<T> { }

/* .NET Framework 4.5 */
public interface IReadStreamAsync<T> : IReaderAsync<T>, IDisposable { }
public interface IWriteStreamAsync<T> : IWriterAsync<T>, IDisposable { }
public interface IDuplexStreamAsync<T> : IReadStreamAsync<T>, IWriteStreamAsync<T>, IDuplexAsync<T> { }

public interface IBlockingAsyncReader<T> : IReaderAsync<T>, IBlockingReader<T> { }
public interface IBlockingAsyncWriter<T> : IWriterAsync<T>, IBlockingWriter<T> { }
public interface IBlockingAsyncDuplex<T> : IDuplexAsync<T>, IBlockingDuplex<T> { }

public interface IBlockingAsyncReadStream<T> : IReadStreamAsync<T>, IBlockingReadStream<T>, IBlockingAsyncReader<T> { }
public interface IBlockingAsyncWriteStream<T> : IWriteStreamAsync<T>, IBlockingWriteStream<T>, IBlockingAsyncWriter<T> { }
public interface IBlockingAsyncDuplexStream<T> : IDuplexStreamAsync<T>, IBlockingDuplexStream<T>, IBlockingAsyncDuplex<T>, IBlockingAsyncReadStream<T>, IBlockingAsyncWriteStream<T> { }
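A small sketch of the ownership point made above, under the assumption that only two of the proposed interfaces are needed. ArrayReadStream and OwnershipDemo are hypothetical names invented for this illustration. The caller holds the disposable stream interface, while the consumer is handed the up-cast reader interface, which has no Dispose in its contract (a consumer could still down-cast at runtime, as noted under "Issues...").

```csharp
using System;

// Two of the proposed interfaces, repeated so the sketch is self-contained.
public interface IBlockingReader<T>
{
    int Read(T[] buffer, int offset, int count);
}

public interface IBlockingReadStream<T> : IBlockingReader<T>, IDisposable { }

// Hypothetical in-memory stream, used only for this sketch.
public sealed class ArrayReadStream : IBlockingReadStream<byte>
{
    private readonly byte[] _data;
    private int _position;

    public ArrayReadStream(byte[] data) { _data = data; }

    public bool IsDisposed { get; private set; }

    public int Read(byte[] buffer, int offset, int count)
    {
        if (IsDisposed) throw new ObjectDisposedException(nameof(ArrayReadStream));
        int toCopy = Math.Min(count, _data.Length - _position);
        Array.Copy(_data, _position, buffer, offset, toCopy);
        _position += toCopy;
        return toCopy;
    }

    public void Dispose() { IsDisposed = true; }
}

public static class OwnershipDemo
{
    // The consumer sees only IBlockingReader<byte>; Dispose is not in that contract.
    public static int CountBytes(IBlockingReader<byte> reader)
    {
        var buffer = new byte[3];
        int total = 0, read;
        while ((read = reader.Read(buffer, 0, buffer.Length)) != 0)
        {
            total += read;
        }
        return total;
    }

    public static int Run()
    {
        // The caller keeps the disposable view and therefore the ownership.
        using (IBlockingReadStream<byte> stream = new ArrayReadStream(new byte[10]))
        {
            return CountBytes(stream); // implicit up-cast, no wrapper object allocated
        }
    }
}
```

The up-cast is a reference conversion, so narrowing the contract in this way costs no extra allocation.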

Example use

// Do something with sync Reading, Seeking, Disposable stream (Mixin)
public static void DoSomething<T>(ISeekableDuplexStream<T> stream)
{
    T[] buffer = new T[7];
    var count = stream.Read(buffer, 0, buffer.Length);
    stream.SetLength(6);
    stream.Position = 5;
    stream.Dispose();
}

More importantly, methods can operate directly on concrete element types without generic indirection; for example, for a FileStream this would be

public static void DoSomething(ISeekableDuplexStream<byte> stream)
{
    byte[] buffer = new byte[7];
    var count = stream.Read(buffer, 0, buffer.Length);
    stream.SetLength(6);
    stream.Position = 5;
    stream.Dispose();
}

Suggested Integration into BCL

Stream

As stated in the opening problem statement, Stream does not necessarily obey the interfaces' contracts, depending on its runtime state; so Stream cannot implement these interfaces directly without inadvertently violating their contracts.

However, to get widespread adoption, using Stream as a source for these interfaced types needs to be supported. Adding a shim object adds unnecessary overhead, which we are trying to avoid. Luckily (unsure) the interfaces can be implemented via hidden internal interfaces for Stream.

internal interface IStreamSeekableDuplexStream : ISeekableDuplexStream<byte> { }
internal interface IStreamDuplexStreamAsync : IDuplexStreamAsync<byte> { }

#if FEATURE_REMOTING
    public abstract class Stream : MarshalByRefObject, IDisposable, IStreamDuplexStreamAsync, IStreamSeekableDuplexStream {
#else // FEATURE_REMOTING
    public abstract class Stream : IDisposable, IStreamDuplexStreamAsync, IStreamSeekableDuplexStream
    {
#endif // FEATURE_REMOTING

The addition of AsType methods with checks then gives an easy route to use the Stream abstract base class as the source while maintaining strong interface contracts.

// Missing method
void IBlockingWriter<byte>.Write(byte value)
{
    WriteByte(value);
}

// Interface conversion
public IReadStreamAsync<byte> AsReadStreamAsync()
{
    if (!CanRead) __Error.ReadNotSupported();
    return this;
}
public IWriteStreamAsync<byte> AsWriteStreamAsync()
{
    if (!CanWrite) __Error.WriteNotSupported();
    return this;
}
public IDuplexStreamAsync<byte> AsDuplexStreamAsync()
{
    if (!CanWrite) __Error.WriteNotSupported();
    if (!CanRead) __Error.ReadNotSupported();
    return this;
}
public IBlockingReadStream<byte> AsBlockingReadStream()
{
    if (!CanRead) __Error.ReadNotSupported();
    return this;
}
public IBlockingWriteStream<byte> AsBlockingWriteStream()
{
    if (!CanWrite) __Error.WriteNotSupported();
    return this;
}
public IBlockingDuplexStream<byte> AsBlockingDuplexStream()
{
    if (!CanWrite) __Error.WriteNotSupported();
    if (!CanRead) __Error.ReadNotSupported();
    return this;
}
public ISeekableReadStream<byte> AsSeekableReadStream()
{
    if (!CanSeek) __Error.SeekNotSupported();
    if (!CanRead) __Error.ReadNotSupported();
    return this;
}
public ISeekableWriteStream<byte> AsSeekableWriteStream()
{
    if (!CanSeek) __Error.SeekNotSupported();
    if (!CanWrite) __Error.WriteNotSupported();
    return this;
}
public ISeekableDuplexStream<byte> AsSeekableDuplexStream()
{
    if (!CanSeek) __Error.SeekNotSupported();
    if (!CanWrite) __Error.WriteNotSupported();
    if (!CanRead) __Error.ReadNotSupported();
    return this;
}

TextReader

TextReader : IBlockingReadStream<char>, IReadStreamAsync<char>

TextReader would need the following extra method defined to support CancellationToken parameter.

public virtual Task<int> ReadAsync(char[] buffer, int index, int count, CancellationToken cancellationToken)
{
    // ...
}

TextWriter

TextWriter : IBlockingWriteStream<char>, IWriteStreamAsync<char>, IBlockingWriteStream<string>, IWriteStreamAsync<string>

TextWriter would need the following extra methods defined to support CancellationToken parameter and the string based interface.

public virtual Task WriteAsync(string value, CancellationToken cancellationToken)
{
    // ...
}
public virtual async Task WriteAsync(string[] values)
{
    if (values == null || values.Length == 0) return;
    for (var i = 0; i < values.Length; i++)
    {
        await WriteAsync(values[i]).ConfigureAwait(false);
    }
    return;
}
public virtual async Task WriteAsync(string[] values, CancellationToken cancellationToken)
{
    if (values == null || values.Length == 0) return;
    for (var i = 0; i < values.Length; i++)
    {
        await WriteAsync(values[i], cancellationToken).ConfigureAwait(false);
    }
    return;
}

Use cases

CopyTo & CopyToAsync & Extensions

As shown above as part of the Api additions, CopyTo and CopyToAsync can have their main methods defined as extension methods; they only need the up-cast types IBlockingReader<T>, IBlockingWriter<T>, IReaderAsync<T> and IWriterAsync<T>, and do not need access to the full IDisposable type.

This also means they are automatically defined for any pair of types with the same T, so any Reader(Async) can copy to any Writer(Async), whether it is Stream derived, TextReader/TextWriter derived or any other future or 3rd party type.
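To illustrate the claim above, here is a minimal sketch in which two unrelated types copy to each other through the proposed CopyTo extension, using char rather than byte. ArrayReader<T>, ListWriter<T> and CopyDemo are hypothetical types invented for this example; the interfaces and the extension method are repeated from the proposal so that the block stands alone.

```csharp
using System;
using System.Collections.Generic;

public interface IBlockingReader<T>
{
    int Read(T[] buffer, int offset, int count);
}

public interface IBlockingWriter<T>
{
    void Flush();
    void Write(T value);
    void Write(T[] buffer, int offset, int count);
}

public static class ReaderWriterExtensions
{
    // Same loop as the proposed CopyTo: read until the source returns 0.
    public static void CopyTo<T>(this IBlockingReader<T> source, IBlockingWriter<T> destination, int bufferSize)
    {
        var buffer = new T[bufferSize];
        int read;
        while ((read = source.Read(buffer, 0, buffer.Length)) != 0)
        {
            destination.Write(buffer, 0, read);
        }
    }
}

// Hypothetical reader over an array of any element type.
public sealed class ArrayReader<T> : IBlockingReader<T>
{
    private readonly T[] _data;
    private int _position;

    public ArrayReader(T[] data) { _data = data; }

    public int Read(T[] buffer, int offset, int count)
    {
        int toCopy = Math.Min(count, _data.Length - _position);
        Array.Copy(_data, _position, buffer, offset, toCopy);
        _position += toCopy;
        return toCopy;
    }
}

// Hypothetical writer that collects everything it is given.
public sealed class ListWriter<T> : IBlockingWriter<T>
{
    public List<T> Items { get; } = new List<T>();

    public void Flush() { }
    public void Write(T value) { Items.Add(value); }
    public void Write(T[] buffer, int offset, int count)
    {
        for (int i = 0; i < count; i++) Items.Add(buffer[offset + i]);
    }
}

public static class CopyDemo
{
    public static string Run()
    {
        var reader = new ArrayReader<char>("hello".ToCharArray());
        var writer = new ListWriter<char>();
        reader.CopyTo(writer, 2); // deliberately small buffer to force several reads
        return new string(writer.Items.ToArray());
    }
}
```

Neither type derives from Stream or TextReader, yet the copy works because both sides agree only on T.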

Audio/Video Playback

Pass ISeekableReadStream to main control

Subcontrols only need up-cast interfaces and do not need access to properties and methods they shouldn't be using.

  • Close - IDisposable
  • Playback - IBlockingReader
  • Scrubbing - ISeekable

Multi-file bundling, Continuous log processing

Multi-file bundling or continuous log file processing could be implemented with the following class; adding a bunch of files to the ReadStream or appending new files over time.

public class MultiFileReader : IReadStreamAsync<byte>
{
    private readonly SemaphoreSlim _readSemaphore;
    private readonly SemaphoreSlim _writeSemaphore;

    private CancellationTokenSource _cts;
    private readonly byte[] _interalBuffer;
    private int _offset;
    private int _count;
    private bool _isDisposed;

    public MultiFileReader() : this(4096) { }
    public MultiFileReader(int bufferSize)
    {
        _interalBuffer = new byte[bufferSize];
        _readSemaphore = new SemaphoreSlim(0, 1);
        _writeSemaphore = new SemaphoreSlim(0, 1);
        _cts = new CancellationTokenSource();
    }

    public Task AddFilesAsync(string[] filePaths)
        => AddFilesAsync(filePaths, CancellationToken.None);

    public async Task AddFilesAsync(string[] filePaths, CancellationToken cancellationToken)
    {
        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, cancellationToken))
        {
            var token = cts.Token;
            token.ThrowIfCancellationRequested();
            for (var i = 0; i < filePaths.Length; i++)
            {
                using (var file = new FileReader(filePaths[i]))
                {
                    // Use the linked token so that Dispose() also cancels a pending file read.
                    while ((_count = await file.ReadAsync(_interalBuffer, 0, _interalBuffer.Length, token)) != 0)
                    {
                        _offset = 0;
                        _readSemaphore.Release();
                        await _writeSemaphore.WaitAsync(token);
                    }
                }
            }
        }
    }

    public ValueTask<int> ReadAsync(byte[] buffer, int offset, int count)
        => ReadAsync(buffer, offset, count, CancellationToken.None);

    public async ValueTask<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        if (_count == 0)
        {
            using (var cts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, cancellationToken))
            {
                await _readSemaphore.WaitAsync(cts.Token);
                if (_isDisposed)
                {
                    return 0;
                }
            }
        }

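        // Copy no more than the caller asked for and no more than is buffered,
        // continuing from where the previous read left off.
        var toCopy = _count > count ? count : _count;
        Buffer.BlockCopy(_interalBuffer, _offset, buffer, offset, toCopy);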

        _count -= toCopy;
        _offset += toCopy;

        if (_count == 0)
        {
            _writeSemaphore.Release();
        }

        return toCopy;
    }

    public void Dispose()
    {
        _cts.Cancel();
        _isDisposed = true;
    }
}

Null IO

All forms of Null IO for a datatype can be represented by a single object.

// namespace System.IO
public static class Null<T>
{
    private readonly static Task CompletedTask = Task.FromResult<object>(null);
    private readonly static NullStream s_nullStream = new NullStream();

    public readonly static IBlockingReader<T> BlockingReader = s_nullStream;
    public readonly static IBlockingWriter<T> BlockingWriter = s_nullStream;
    public readonly static IBlockingDuplex<T> BlockingDuplex = s_nullStream;

    public readonly static IReaderAsync<T> ReaderAsync = s_nullStream;
    public readonly static IWriterAsync<T> WriterAsync = s_nullStream;
    public readonly static IDuplexAsync<T> DuplexAsync = s_nullStream;

    public readonly static IBlockingReadStream<T> BlockingReadStream = s_nullStream;
    public readonly static IBlockingWriteStream<T> BlockingWriteStream = s_nullStream;
    public readonly static IBlockingDuplexStream<T> BlockingDuplexStream = s_nullStream;

    public readonly static IReadStreamAsync<T> ReadStreamAsync = s_nullStream;
    public readonly static IWriteStreamAsync<T> WriteStreamAsync = s_nullStream;
    public readonly static IDuplexStreamAsync<T> DuplexStreamAsync = s_nullStream;

    public readonly static IBlockingAsyncReader<T> BlockingAsyncReader = s_nullStream;
    public readonly static IBlockingAsyncWriter<T> BlockingAsyncWriter = s_nullStream;
    public readonly static IBlockingAsyncDuplex<T> BlockingAsyncDuplex = s_nullStream;

    public readonly static IBlockingAsyncReadStream<T> BlockingAsyncReadStream = s_nullStream;
    public readonly static IBlockingAsyncWriteStream<T> BlockingAsyncWriteStream = s_nullStream;
    public readonly static IBlockingAsyncDuplexStream<T> BlockingAsyncDuplexStream = s_nullStream;

    private class NullStream : IBlockingAsyncDuplexStream<T>
    {
        public void Dispose() { }
        public void Flush() { }
        public Task FlushAsync() => CompletedTask;
        public Task FlushAsync(CancellationToken cancellationToken) => CompletedTask;
        public int Read(T[] buffer, int offset, int count) => 0;
        public ValueTask<int> ReadAsync(T[] buffer, int offset, int count) => new ValueTask<int>(0);
        public ValueTask<int> ReadAsync(T[] buffer, int offset, int count, CancellationToken cancellationToken) => new ValueTask<int>(0);
        public void Write(T value) { }
        public void Write(T[] buffer, int offset, int count) { }
        public Task WriteAsync(T[] buffer, int offset, int count) => CompletedTask;
        public Task WriteAsync(T[] buffer, int offset, int count, CancellationToken cancellationToken) => CompletedTask;
    }
}

Example usage

public void DoNothing(IBlockingReader<byte> reader)
{
    reader.CopyTo(System.IO.Null<byte>.BlockingWriter);
}
public Task DoNothingAsync(IReaderAsync<byte> reader)
{
    return reader.CopyToAsync(System.IO.Null<byte>.WriterAsync);
}

Request & Response Streams

Returning to the Request & Response Streams shown in the inefficiencies section of the problem statement: that section demonstrated an improved pure interface approach. However, with the additional Stream AsType changes, the entry point can still be maintained and the same result achieved, as shown below.

public partial class HttpContext
{
    private IDuplexStreamAsync<byte> _stream;

    public HttpContext(Socket socket)
    {
        var networkStream = new NetworkStream(socket, FileAccess.ReadWrite, true);
        _stream = networkStream.AsDuplexStreamAsync();
        DuplexStream = _stream;
        RequestStream = _stream;
        ResponseStream = _stream;
    }

    public IDuplexAsync<byte> DuplexStream { get; set; }
    public IReaderAsync<byte> RequestStream { get; set; }
    public IWriterAsync<byte> ResponseStream { get; set; }
}

Encoding, Decoding

As shown above, TextReader and TextWriter can share the Stream Api, but this can be taken further. These interfaces would allow the stream Api to flow through all stages for efficient stream processing, runtime flow construction, compile time checking and a common Api.

interface Decryptor : IWriterAsync<byte>, IReaderAsync<byte> { }
interface TextDecoder : IWriterAsync<byte>, IReaderAsync<char> { }
interface DelimiterParser : IWriterAsync<char>, IReaderAsync<string> { }

interface ImageDecoder : IWriterAsync<byte>, IReaderAsync<byte> { }
interface Decompressor : IWriterAsync<byte>, IReaderAsync<byte> { }

partial class TlsDecryptor : Decryptor
{
    public void RegisterDecryptor(Decryptor decryptor) { /* ... */ } // return type assumed
}

partial class AsciiDecoder : TextDecoder { }
partial class Utf8Decoder : TextDecoder { }
partial class UnicodeDecoder : TextDecoder { }
partial class BigEndianUnicodeDecoder : TextDecoder { }
partial class Utf32Decoder : TextDecoder { }

partial class GifDecoder : ImageDecoder { }
partial class JpgDecoder : ImageDecoder { }
partial class PngDecoder : ImageDecoder { }
partial class WebPDecoder : ImageDecoder { }

partial class DeflateDecompressor : Decompressor { }
partial class GzipDecompressor : Decompressor { }
partial class Bzip2Decompressor : Decompressor { }
partial class SdchDecompressor : Decompressor { }
partial class BrotliDecompressor : Decompressor { }

interface Encryptor : IWriterAsync<byte>, IReaderAsync<byte> { }
interface TextEncoder : IWriterAsync<char>, IReaderAsync<byte> { }
interface Delimitor : IWriterAsync<string>, IReaderAsync<char> { }

interface ImageEncoder : IWriterAsync<byte>, IReaderAsync<byte> { }
interface Compressor : IWriterAsync<byte>, IReaderAsync<byte> { }

partial class TlsEncryptor : Encryptor
{
    public void RegisterEncryptor(Encryptor encryptor) { /* ... */ } // return type assumed
}

partial class AsciiEncoder : TextEncoder { }
partial class Utf8Encoder : TextEncoder { }
partial class UnicodeEncoder : TextEncoder { }
partial class BigEndianUnicodeEncoder : TextEncoder { }
partial class Utf32Encoder : TextEncoder { }

partial class GifEncoder : ImageEncoder { }
partial class JpgEncoder : ImageEncoder { }
partial class PngEncoder : ImageEncoder { }
partial class WebPEncoder : ImageEncoder { }

partial class DeflateCompressor : Compressor { }
partial class GzipCompressor : Compressor { }
partial class Bzip2Compressor : Compressor { }
partial class SdchCompressor : Compressor { }
partial class BrotliCompressor : Compressor { }

Execution Pipeline

interface ICommand
{
    Task RunAsync();
}

partial class FileReader : IReadStreamAsync<byte> { }
partial class Utf8Decoder : IWriterAsync<byte>, IReaderAsync<char> { }
partial class LineParser : IWriterAsync<char>, IReaderAsync<string> { }
partial class CommandParser : IWriterAsync<string>, IReaderAsync<ICommand> { }
partial class CommandExecutor : IWriterAsync<ICommand>, IReaderAsync<string> { }
partial class ConsoleOutput : IWriterAsync<string> { }

public Task DoThisThing(string path)
{
    FileReader fileReader = new FileReader(path);
    Utf8Decoder decoder = new Utf8Decoder();
    LineParser stringParser = new LineParser();
    CommandParser commandParser = new CommandParser();
    CommandExecutor executionEngine = new CommandExecutor();
    ConsoleOutput messageOutput = new ConsoleOutput();

    // Each stage copies its output into the next stage's input.
    var fileReadTask = fileReader.CopyToAsync(decoder);
    var decodeTask = decoder.CopyToAsync(stringParser);
    var delimiterGroupTask = stringParser.CopyToAsync(commandParser);
    var commandParseTask = commandParser.CopyToAsync(executionEngine);
    var commandProcessTask = executionEngine.CopyToAsync(messageOutput).ContinueWith((t) =>
    {
        // Release the file once the final stage has finished.
        fileReader.Dispose();
        if (t.IsFaulted)
        {
            throw t.Exception;
        }
    });

    return Task.WhenAll(
        fileReadTask,
        decodeTask,
        delimiterGroupTask,
        commandParseTask,
        commandProcessTask);
}

tl;dr

Add a compile-time contract to Streams that can be reduced in scope, in a way that introduces no breaking changes and gives greater flexibility for the future. Let the streams flow...

Issues...

Implementing an internal interface exposes the internal interface's public inheritance chain on the object. Is there a way to prevent direct casting at compile time?

Interfaces have Async at the end rather than the start, as IAsyncReadStream fits better with the BeginXXX/EndXXX style of async, in case that also needs support.

Main Api Summary

public interface IBlockingReader<T>
{
    int Read(T[] buffer, int offset, int count);
}

public interface IBlockingWriter<T>
{
    void Flush();
    void Write(T value);
    void Write(T[] buffer, int offset, int count);
}

public interface ISeekable
{
    long Position { get; set; }
    long Length { get; }
    long Seek(long offset, SeekOrigin origin);
}

public interface ISizable
{
    void SetLength(long value);
}

public interface IReaderAsync<T>
{
    ValueTask<int> ReadAsync(T[] buffer, int offset, int count);
    ValueTask<int> ReadAsync(T[] buffer, int offset, int count, CancellationToken cancellationToken);
}

public interface IWriterAsync<T>
{
    Task FlushAsync();
    Task FlushAsync(CancellationToken cancellationToken);
    Task WriteAsync(T[] buffer, int offset, int count);
    Task WriteAsync(T[] buffer, int offset, int count, CancellationToken cancellationToken);
}

public interface IBlockingDuplex<T> : IBlockingReader<T>, IBlockingWriter<T> { }
public interface ISeekableReader<T> : IBlockingReader<T>, ISeekable { }
public interface ISeekableWriter<T> : IBlockingWriter<T>, ISeekable, ISizable { }
public interface ISeekableDuplex<T> : ISeekableReader<T>, ISeekableWriter<T> { }

public interface IDuplexAsync<T> : IReaderAsync<T>, IWriterAsync<T> { }

public interface IBlockingReadStream<T> : IBlockingReader<T>, IDisposable { }
public interface IBlockingWriteStream<T> : IBlockingWriter<T>, IDisposable { }
public interface IBlockingDuplexStream<T> : IBlockingDuplex<T>, IBlockingReadStream<T>, IBlockingWriteStream<T> { }

public interface ISeekableStream : ISeekable, IDisposable { }
public interface ISeekableReadStream<T> : IBlockingReadStream<T>, ISeekableStream, ISeekableReader<T> { }
public interface ISeekableWriteStream<T> : IBlockingWriteStream<T>, ISeekableStream, ISeekableWriter<T> { }
public interface ISeekableDuplexStream<T> : ISeekableReadStream<T>, ISeekableWriteStream<T>, IBlockingDuplexStream<T>, ISeekableDuplex<T> { }

public interface IReadStreamAsync<T> : IReaderAsync<T>, IDisposable { }
public interface IWriteStreamAsync<T> : IWriterAsync<T>, IDisposable { }
public interface IDuplexStreamAsync<T> : IReadStreamAsync<T>, IWriteStreamAsync<T>, IDuplexAsync<T> { }

public interface IBlockingAsyncReader<T> : IReaderAsync<T>, IBlockingReader<T> { }
public interface IBlockingAsyncWriter<T> : IWriterAsync<T>, IBlockingWriter<T> { }
public interface IBlockingAsyncDuplex<T> : IDuplexAsync<T>, IBlockingDuplex<T> { }

public interface IBlockingAsyncReadStream<T> : IReadStreamAsync<T>, IBlockingReadStream<T>, IBlockingAsyncReader<T> { }
public interface IBlockingAsyncWriteStream<T> : IWriteStreamAsync<T>, IBlockingWriteStream<T>, IBlockingAsyncWriter<T> { }
public interface IBlockingAsyncDuplexStream<T> : IDuplexStreamAsync<T>, IBlockingDuplexStream<T>, IBlockingAsyncDuplex<T>, IBlockingAsyncReadStream<T>, IBlockingAsyncWriteStream<T> { }

Changes

  • Added Api Summary
  • Use ValueTask for Async Reads
  • Added Combined blocking+async interfaces
@weshaggard
Member

@KrzysztofCwalina can you please have a look.

@ghost

ghost commented Nov 28, 2015

This would be an awesome addition 👍

@GSPP

GSPP commented Nov 28, 2015

(1) Useful stuff here but this needs to be simplified considerably. In particular, signatures such as:

public static void DoSomething<T>(T stream)
    where T : IBlockingReader<T>, ISeekable, ISizable, IDisposable

suggest that the API design is too complicated. Generics tend to infect calling code. Generics should not be required to pass around streams. I think a bit of runtime discovery is not at all a problem.

(2) Maybe we could get most of the benefits with two simple interfaces IReadableStream and IWritableStream? Those interfaces should cover most use cases I think.

(3) In general I think that generalizing streams to non-byte use cases sounds useful but I personally have never needed that. Whenever I had a similar need I used IEnumerable<T[]> or similar.

(4) Personally, I have needed runtime discovery to see whether the Length property can be read. I have needed this in combination with some ZIP library which was repeatedly calling Length on some streaming data source and swallowing the exception, at great loss of performance and with debugger alert pollution. Can we have a bool CanReadLength member or something? This would need to be in a new interface for compatibility reasons.

(5) My idea always was to have an enum StreamCapabilities and expose that as a property. That way we can have fine-grained runtime capability discovery. This would be the second best solution in case we can't make static capabilities through typing work. We would need to have this:

interface IHasStreamCapabilities { StreamCapabilities Capabilities { get; } }

And maybe extensions on Stream to quickly test capabilities.
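
A minimal sketch of how the enum, the interface, and the extensions could fit together. The enum members and the names `GetCapabilities` and `Supports` are assumptions for illustration, not existing BCL types. Deriving `ReadLength` from `CanSeek` is also an assumption that only holds for streams that follow the usual convention.

```csharp
using System;
using System.IO;

// Hypothetical flags enum; the member names are assumptions, not a BCL type.
[Flags]
public enum StreamCapabilities
{
    None       = 0,
    Read       = 1 << 0,
    Write      = 1 << 1,
    Seek       = 1 << 2,
    Timeout    = 1 << 3,
    ReadLength = 1 << 4,
}

public interface IHasStreamCapabilities
{
    StreamCapabilities Capabilities { get; }
}

public static class StreamCapabilityExtensions
{
    // Prefer the stream's own report; otherwise fall back to the existing Can* properties.
    public static StreamCapabilities GetCapabilities(this Stream stream)
    {
        if (stream is IHasStreamCapabilities reporting)
            return reporting.Capabilities;

        var caps = StreamCapabilities.None;
        if (stream.CanRead) caps |= StreamCapabilities.Read;
        if (stream.CanWrite) caps |= StreamCapabilities.Write;
        // Assumption: a seekable stream can also report its Length.
        if (stream.CanSeek) caps |= StreamCapabilities.Seek | StreamCapabilities.ReadLength;
        if (stream.CanTimeout) caps |= StreamCapabilities.Timeout;
        return caps;
    }

    // True only when every requested capability is present.
    public static bool Supports(this Stream stream, StreamCapabilities required)
        => (stream.GetCapabilities() & required) == required;
}
```

A library could then check `stream.Supports(StreamCapabilities.ReadLength)` once instead of calling Length and catching the exception.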

Note, that static capabilities can never fully work in case the presence of a capability is dynamic. For example, the ability to write (CanWrite) might depend on the user logged in (e.g. permission checks). That makes me think that the StreamCapabilities idea is always a good thing.

@benaadams
Member Author

@GSPP I may have spread the Api out too much so I included a summary at the bottom

(1) Addressed further down so you'd just use

public static void DoSomething(ISeekableDuplexStream<byte> stream)

(2) doesn't cover RW or the ownership issue of "who calls Dispose". The destructuring with inheritance is there so that only the expected use gets passed on. Imagine if IDictionary inherited from IReadOnlyDictionary; then any dictionary could be presented as a read-only one without any casting, rather than currently having to cast it sideways into an unrelated type.

(3) This is a case of definition at the fringes; increasing glue code needed to get 3rd party components to work together. You'll use IEnumerable<T[]>, someone else will want IEnumerable<T> and they'll pass it on to something that wants Read(T). If it was brought into BCL then everyone could use the same type, increasing interoperability. Also IEnumerable suffers from boxing :(

(4) Here you'd want one of the Seekable interfaces probably ISeekableReader so as not to pass on the ability to dispose the stream as you would with ISeekableReadStream. With the upcasting you can open a ISeekableReadStream keep the ability to dispose yourself and just pass on ISeekableReader, leaving ownership clear.
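
A rough sketch of that ownership split. The two interfaces here are cut-down, non-generic stand-ins for the proposed ones, and `ArrayReadStream`, `CountBytes` and `Run` are hypothetical names made up for the example.

```csharp
using System;

// Simplified stand-ins for the proposed interfaces; the member shapes are assumptions.
public interface ISeekableReader
{
    long Position { get; set; }
    int Read(byte[] buffer, int offset, int count);
}

// The "stream" variant adds ownership (IDisposable) on top of the reader.
public interface ISeekableReadStream : ISeekableReader, IDisposable { }

// Hypothetical in-memory implementation, only here to make the sketch runnable.
public sealed class ArrayReadStream : ISeekableReadStream
{
    private readonly byte[] _data;
    public ArrayReadStream(byte[] data) { _data = data; }

    public bool IsDisposed { get; private set; }
    public long Position { get; set; }

    public int Read(byte[] buffer, int offset, int count)
    {
        if (IsDisposed) throw new ObjectDisposedException(nameof(ArrayReadStream));
        int n = (int)Math.Min(count, _data.Length - Position);
        if (n <= 0) return 0;
        Array.Copy(_data, (int)Position, buffer, offset, n);
        Position += n;
        return n;
    }

    public void Dispose() { IsDisposed = true; }
}

public static class OwnershipExample
{
    // The callee only sees ISeekableReader, so Dispose is not part of its contract.
    public static int CountBytes(ISeekableReader reader)
    {
        reader.Position = 0;
        var buffer = new byte[4];
        int total = 0, read;
        while ((read = reader.Read(buffer, 0, buffer.Length)) > 0) total += read;
        return total;
    }

    public static int Run()
    {
        // The caller opens the stream, keeps ownership, and disposes it itself.
        using (ISeekableReadStream stream = new ArrayReadStream(new byte[10]))
        {
            return CountBytes(stream); // implicit upcast, no wrapper object allocated
        }
    }
}
```

The callee could still cast to IDisposable at runtime, so this documents intent in the signature rather than enforcing it.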

(5) You can open as IDuplexStreamAsync; but when the user isn't logged in only call functions that take IReadStreamAsync and when they are logged in you can then use IDuplexStreamAsync functions.
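
Something along these lines, as a sketch only. The interfaces are non-generic simplifications of the proposed ones, and `InMemoryDuplex`, `RequestHandler` and its methods are hypothetical names.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Non-generic simplifications of the proposed interfaces; the member shapes are assumptions.
public interface IReadStreamAsync : IDisposable
{
    Task<int> ReadAsync(byte[] buffer, int offset, int count);
}

public interface IDuplexStreamAsync : IReadStreamAsync
{
    Task WriteAsync(byte[] buffer, int offset, int count);
}

// Hypothetical implementation backed by a MemoryStream, only to make the sketch runnable.
public sealed class InMemoryDuplex : IDuplexStreamAsync
{
    private readonly MemoryStream _inner = new MemoryStream();
    public Task<int> ReadAsync(byte[] buffer, int offset, int count) => _inner.ReadAsync(buffer, offset, count);
    public Task WriteAsync(byte[] buffer, int offset, int count) => _inner.WriteAsync(buffer, offset, count);
    public void Dispose() => _inner.Dispose();
}

public static class RequestHandler
{
    // Needs only read access, so it asks for only the read interface.
    public static Task<int> ServeReadOnlyAsync(IReadStreamAsync stream, byte[] buffer)
        => stream.ReadAsync(buffer, 0, buffer.Length);

    // Needs write access, so it asks for the duplex interface.
    public static Task ServeReadWriteAsync(IDuplexStreamAsync stream, byte[] payload)
        => stream.WriteAsync(payload, 0, payload.Length);

    // The permission check happens once, here; it picks which contract gets passed on.
    public static async Task<string> HandleAsync(IDuplexStreamAsync stream, bool isLoggedIn)
    {
        if (isLoggedIn)
        {
            await ServeReadWriteAsync(stream, new byte[] { 1, 2, 3 });
            return "read-write";
        }

        await ServeReadOnlyAsync(stream, new byte[16]); // implicit upcast to IReadStreamAsync
        return "read-only";
    }
}
```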

Does that help?

@GSPP

GSPP commented Nov 29, 2015

(1) yeah, that works but it's kind of a mess. Your implicit interface proposal would help address that but it does not remove the jungle of interfaces.

(3) That is true. Better to have a common standard.

(4) That does not work with dynamic capability discovery.

(5) This breaks down when I want to call a function that needs to behave differently depending on what runtime capabilities the stream has. I might not want the caller to make that decision.

(6) (New) Regarding ownership, what about a wrapping stream that allows you to suppress certain calls such as Close and Flush? I have needed both because libraries I was calling were closing and flushing my streams when they were not supposed to. Flushing, for example, can cause physical disk seeks when all you wanted was to flush intermediate streams into the Windows file cache.

I solved that with wrapper streams that suppress disposal etc.

Maybe we can solve all ownership issues by declaring by convention that streams are always owned and if you don't like that don't do new C(stream), instead do new C(new UnownedStream(stream)).
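
A minimal sketch of such a wrapper, assuming it only needs to suppress disposal. `UnownedStream` is the hypothetical name from above and `UnownedStreamDemo` is made up for the usage example. This version passes Flush through; a variant that suppresses it would make `Flush()` a no-op.

```csharp
using System;
using System.IO;

// Hypothetical wrapper: forwards everything to the inner stream except disposal.
public sealed class UnownedStream : Stream
{
    private readonly Stream _inner;

    public UnownedStream(Stream inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    public override bool CanRead => _inner.CanRead;
    public override bool CanSeek => _inner.CanSeek;
    public override bool CanWrite => _inner.CanWrite;
    public override long Length => _inner.Length;

    public override long Position
    {
        get => _inner.Position;
        set => _inner.Position = value;
    }

    public override void Flush() => _inner.Flush();
    public override int Read(byte[] buffer, int offset, int count) => _inner.Read(buffer, offset, count);
    public override long Seek(long offset, SeekOrigin origin) => _inner.Seek(offset, origin);
    public override void SetLength(long value) => _inner.SetLength(value);
    public override void Write(byte[] buffer, int offset, int count) => _inner.Write(buffer, offset, count);

    protected override void Dispose(bool disposing)
    {
        // Deliberately leave _inner open; the original owner disposes it.
        base.Dispose(disposing);
    }
}

public static class UnownedStreamDemo
{
    // StreamWriter closes the stream it was given; here that is only the wrapper.
    public static bool InnerSurvivesWriterDispose()
    {
        var inner = new MemoryStream();
        using (var writer = new StreamWriter(new UnownedStream(inner)))
        {
            writer.Write("hello");
        }
        return inner.CanWrite && inner.Length == 5;
    }
}
```

The cost is one extra object per wrap.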

Above I reported that some ZIP libraries accessed Length all the time. I solved that with a wrapper returning a bogus Length value. Nasty hack, but wrapping can go a long way.

For wrapping to work an interface based solution is not enough. Here, in order to pass information about capabilities you need data because you can't dynamically implement an interface.

@benaadams
Member Author

(1) Have to work with how the type system currently works and to get the derivations correct. To the implementer you implement one interface representing the maximal interface you support. The user uses one interface; which is the minimal interface they require - the compiler does the rest.

(4) & (5) just use Stream here; am not suggesting taking it away

(6) Trying to avoid wrapping; though conversely, for (4) & (5) you could create an abstract base class, add state properties to it that change at runtime for discovery, and have your methods take the abstract base class type - though you would be violating the compile time contract, so just using Stream would probably be better.

@benaadams
Member Author

Closing as Channels looks more appropriate https://github.com/davidfowl/Channels

@msftgits msftgits transferred this issue from dotnet/coreclr Jan 30, 2020
@msftgits msftgits added this to the Future milestone Jan 30, 2020
@ghost ghost locked as resolved and limited conversation to collaborators Jan 4, 2021