[API Proposal]: Aggregate streams and treat them as one #83416

Open
jozkee opened this issue Mar 14, 2023 · 9 comments
Labels
api-suggestion Early API idea and discussion, it is NOT ready for implementation area-System.IO

@jozkee
Member

jozkee commented Mar 14, 2023

Background and motivation

Another follow-up from our stream backlog: a popular operation requested for streams is the ability to aggregate multiple streams and treat them as one super stream.

While I haven't found any application of aggregated streams in our libraries, there are plenty of posts requesting it:
https://stackoverflow.com/q/3879152/4231460
https://www.c-sharpcorner.com/article/combine-multiple-streams-in-a-single-net-framework-stream-o/

API Proposal

namespace System.IO
{
    public class AggregatedStream : Stream
    {
        public Stream? Current { get; }
        public AggregatedStream(Stream first, Stream second, bool leaveOpen = false) { }
        public AggregatedStream(Stream[] streams, bool leaveOpen = false) { }
        
        // all the necessary overrides...
    }
}

API Usage

Stream[] streams = { stream1, stream2 };
using AggregatedStream aggregatedStream = new(streams);

int bytesRead;
byte[] buffer = new byte[1024];

while ((bytesRead = aggregatedStream.Read(buffer)) > 0) 
{
    // Do something...
}

Notes

Writability:

It is likely that we should not allow writing. For example, when you create a new file (with File.Create, which returns a FileStream), the stream always starts with length zero. If that is the common case for most writable streams, it seems unintuitive to aggregate multiple streams for writing, unless you are using Streams with a fixed length as a buffer/cache.

Seekability:

There are three ways to approach seeking, IMO.

  • The combined capabilities of all the inner streams (seekable only if every inner stream is seekable): it may be restrictive for the inner streams that do support seeking.
  • The capabilities of the current inner stream: it may be inconsistent, because CanSeek would change as reading moves on to an inner stream that does not support it.
  • Do not support seeking ever.
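
To make the first option concrete, here is a minimal sketch of how the combined capabilities could be computed. The AggregateCapabilities class and its members are hypothetical names used only for illustration; they are not part of the proposal, and the sketch assumes every inner stream reports a stable Length.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Hypothetical helper, not part of the proposed API.
public static class AggregateCapabilities
{
    // Option 1: the aggregate is seekable only if every inner stream is seekable.
    public static bool CanSeekAll(IReadOnlyList<Stream> streams) =>
        streams.All(s => s.CanSeek);

    // With all inner streams seekable, the aggregate length is the sum of the inner lengths.
    public static long TotalLength(IReadOnlyList<Stream> streams) =>
        streams.Sum(s => s.Length);

    // Maps an absolute position in the aggregate to (inner stream index, offset in that stream).
    // A position at or past the end yields Index == streams.Count.
    public static (int Index, long Offset) Locate(IReadOnlyList<Stream> streams, long position)
    {
        if (position < 0) throw new ArgumentOutOfRangeException(nameof(position));
        for (int i = 0; i < streams.Count; i++)
        {
            long length = streams[i].Length;
            if (position < length) return (i, position);
            position -= length;
        }
        return (streams.Count, position);
    }
}
```

A Seek implementation under option 1 could call Locate and then set Position on the selected inner stream. Under option 2 the same mapping would not be available once a non-seekable inner stream is involved, which is the inconsistency mentioned above.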

Disposing:

If leaveOpen is false, we can dispose of all the inner Streams once we have finished reading them all. Keeping them alive until then also enables seeking across multiple inner streams when supported.

Alternatively, early disposing could still be achieved by using the Current property:

while ((bytesRead = aggregatedStream.Read(buffer)) > 0)
{
    // Do something...
    // ...
    // dispose all streams that we no longer need.
    int idx = Array.IndexOf(streams, aggregatedStream.Current);
    for (int i = 0; i < idx; i++)
    {
        streams[i].Dispose();
    }
}

Alternative Designs

Alt. design 1:
If we are not confident about needing a public Stream-derived type, we could just add a factory:

namespace System.IO
{
    public static class StreamFactory
    {
        public static Stream Aggregate(params Stream[] streams) { }
        public static Stream Aggregate(Stream[] streams, bool leaveOpen = false) { }
    }
}

Alt. design 2:
Use a different name. As prior art we have string.Concat; we could follow that pattern and call them ConcatStream and StreamFactory.Concat respectively.

Other words I have seen used interchangeably in the wild are concatenate, combine, chain, and merge.

Risks

No response

@jozkee jozkee added the api-suggestion Early API idea and discussion, it is NOT ready for implementation label Mar 14, 2023
@ghost ghost added the untriaged New issue has not been triaged by the area owner label Mar 14, 2023
@ghost

ghost commented Mar 14, 2023

Tagging subscribers to this area: @dotnet/area-system-io
See info in area-owners.md if you want to be subscribed.


@jozkee jozkee added this to the 8.0.0 milestone Mar 14, 2023
@ghost ghost removed the untriaged New issue has not been triaged by the area owner label Mar 14, 2023
@jozkee jozkee self-assigned this Mar 14, 2023
@jeffhandley
Member

One note on the naming. "Aggregate" does not communicate the way in which the streams are aggregated. A word like "Concatenate" can communicate more precisely what form of aggregation will be used.

Aggregate:

taking all units as a whole

Concatenate:

to link together in a series or chain

@vukovinski

If writing to multiple streams would not be the primary use case (and I don't understand why it wouldn't be), what would be the semantics of the obverse case, i.e. reading from multiple streams?

Could this be implemented as delegate based with asynchronous operations provided by the abstraction itself?

public class AggregatedReadStreamBuilder
{
    public AggregatedReadStreamBuilder(Action<Span<byte>,Span<byte>> onBothRead) { ... }
    public AggregatedReadStreamBuilder(Action<Span<byte>> onFirstRead, Action<Span<byte>> onSecondRead) { ... }
}

Just brainstorming. But I don't see the immediate value of an abstraction such as AggregatedStream. I do think some more derived versions with clearer use cases would be a better way forward, such as a read-only two-stream combinator that tries reading from the left stream and, if that fails, tries reading from the right stream. Or a three-stream variant of the same. However, for writing, those combinators don't make much sense because, most likely, you will want all writes to succeed. 🫤

@jozkee
Member Author

jozkee commented Mar 20, 2023

what would be the semantics of the obverse case, i.e. reading from multiple streams?

For both read and write, this type is a concatenation of the specified first and second Streams. As @jeffhandley pointed out, we need a name that better alludes to a concatenation operation.

Could this be implemented as delegate based with asynchronous operations provided by the abstraction itself?

For this type, I don't think that's desired if we want to keep it simple.
Having a type that reads/writes from multiple backing streams at once asynchronously and reports the results via a delegate is specific enough to have its own API proposal IMO.

But I don't see the immediate value of an abstraction such as AggregatedStream.

Can you please elaborate? In the StackOverflow post I linked, it seems to me that stream concatenation is quite a common thing needed by .NET developers. If this API has potential to be problematic (pit-of-failure), that I can buy, but I need you (or anyone) to state the reasons.

@vukovinski

vukovinski commented Mar 20, 2023

Looking at the article, yes, the below makes much more sense.

https://stackoverflow.com/q/3879152/4231460

It uses the IEnumerable interface in the constructor, which is much more flexible than the original proposal, and it has simple semantics: as both of you mentioned, concatenation, i.e. one stream after the other.

The source of my confusion might be that backlog on streams, which implies some bigger design work.

class ConcatenatedStream : Stream
{
    Queue<Stream> streams;

    public ConcatenatedStream(IEnumerable<Stream> streams)
    {
        this.streams = new Queue<Stream>(streams);
    }
}
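
For illustration only, the queue-based idea above could be fleshed out roughly as follows. This is a sketch under assumptions, not the proposed API: the ConcatenatedReadStream name and the leaveOpen parameter are hypothetical, the stream is read-only and never seekable (the third seekability option), and each inner stream is disposed as soon as it is exhausted, which differs from disposing everything at the end.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical sketch: a read-only, non-seekable concatenation of streams.
public sealed class ConcatenatedReadStream : Stream
{
    private readonly Queue<Stream> _streams;
    private readonly bool _leaveOpen;

    public ConcatenatedReadStream(IEnumerable<Stream> streams, bool leaveOpen = false)
    {
        _streams = new Queue<Stream>(streams ?? throw new ArgumentNullException(nameof(streams)));
        _leaveOpen = leaveOpen;
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();

    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // A zero-byte read must not be mistaken for end of the current inner stream.
        if (count == 0) return 0;

        while (_streams.Count > 0)
        {
            int read = _streams.Peek().Read(buffer, offset, count);
            if (read > 0) return read;

            // The current inner stream is exhausted: drop it and move to the next one.
            Stream finished = _streams.Dequeue();
            if (!_leaveOpen) finished.Dispose();
        }

        return 0; // every inner stream has been consumed
    }

    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    protected override void Dispose(bool disposing)
    {
        if (disposing && !_leaveOpen)
        {
            // Dispose any inner streams that were never fully read.
            while (_streams.Count > 0) _streams.Dequeue().Dispose();
        }
        base.Dispose(disposing);
    }
}
```

Because only Read(byte[], int, int) is overridden, the Span-based and async overloads fall back to the base Stream implementations; a production version would probably override those as well.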

@jeffhandley
Member

We're moving this out to Future and it will be considered again for .NET 9.

@jeffhandley jeffhandley modified the milestones: 8.0.0, Future May 16, 2023
@WeihanLi
Contributor

I think this could be related to TextWriter.CreateBroadcasting
#93623

Maybe we could use the name CreateBroadcasting to align with the new TextWriter.CreateBroadcasting API:

namespace System.IO;

public abstract class Stream
{
    public static Stream CreateBroadcasting(params Stream[] streams);
    public static Stream CreateBroadcasting(Stream[] streams, bool leaveOpen = false);
}

@jozkee
Member Author

jozkee commented Apr 2, 2024

@WeihanLi no, TextWriter.CreateBroadcasting writes the same input to all writers. In this case, you want to read or write sequentially to the inner streams and only jump to the next one when the current inner stream position reaches the end.
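
To illustrate the difference on the write side, here is a rough sketch of sequential writing. SequentialWriteSketch and WriteSequentially are hypothetical names, and the sketch assumes every inner stream is seekable with a fixed length (for example, a MemoryStream wrapping a byte array). A broadcasting writer would instead pass the whole input to every inner stream.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper, not part of the proposed API.
public static class SequentialWriteSketch
{
    // Fills each stream up to its remaining capacity before moving to the next one.
    // Returns the number of bytes written; any excess input is left unwritten.
    public static int WriteSequentially(IReadOnlyList<Stream> streams, ReadOnlySpan<byte> data)
    {
        int written = 0;
        foreach (Stream stream in streams)
        {
            if (data.IsEmpty) break;

            // Assumption: Length is fixed, so Length - Position is the space left.
            long remaining = stream.Length - stream.Position;
            int chunk = (int)Math.Min(remaining, data.Length);

            stream.Write(data.Slice(0, chunk));
            data = data.Slice(chunk);
            written += chunk;
        }
        return written;
    }
}
```

For example, writing four bytes across a 2-byte and a 3-byte fixed buffer puts the first two bytes in the first buffer and the remaining two at the start of the second.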

@WeihanLi
Contributor

WeihanLi commented Apr 3, 2024

@jozkee many thanks for the explanation

@jozkee jozkee removed their assignment Jan 22, 2025