Convenience layer over isolates for function computation (e.g. streaming computations) #48579
We could resurrect … I think we can do an …
I'd suggest something like `static Stream<T> stream<T>(Stream<T> Function() computation, {String? debugName}) => ...`, which also forwards back-pressure commands (pause/resume/cancel) in the other direction.

I don't want to include buffering in this function. I'd rather make a buffering abstraction that you can wrap each end in if you want to. (I don't think buffering is something that works in general for arbitrary streams; it needs specific knowledge about the relation between events.)

(It's possible to use something like `static Stream<T> streamEvents<T>(void Function(void Function(T) emit) computation)`, and then using isolate-based control would be reasonable. I don't think I'd want something like this in the SDK.)
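To make the emit-callback shape above concrete, here is a single-isolate stand-in sketch. The `streamEvents` helper below is hypothetical and runs the computation locally; the actual proposal would run it in a new isolate.

```dart
import 'dart:async';

// Hypothetical, single-isolate stand-in for the proposed emit-callback
// API: the computation pushes events through an `emit` callback instead
// of returning a Stream.
Stream<T> streamEvents<T>(void Function(void Function(T) emit) computation) {
  final controller = StreamController<T>();
  controller.onListen = () {
    computation(controller.add);
    controller.close();
  };
  return controller.stream;
}

void main() async {
  await for (final n in streamEvents<int>((emit) {
    for (var i = 0; i < 3; i++) {
      emit(i);
    }
  })) {
    print(n); // prints 0, 1, 2 on separate lines
  }
}
```

The emit-callback form avoids creating an intermediate `Stream` object inside the computation, which is part of why it pairs naturally with isolate-based control.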
Agree.
👍
The reason I've mentioned it is that I think one may often stream elements that are small and frequent, so the overhead may be dominated by communication rather than by the message sizes themselves.

On the other hand, if one doesn't wait for a received-signal from the consumer, there will be a buffer (namely the event loop), which we don't want to grow unboundedly. So for performance it would be good to avoid frequent pause/resume round-trips, which are significantly more expensive between isolates than inside one isolate, and at the same time avoid unbounded growth of the event-loop buffer. That's why I think it may be a good idea to consider buffering in the design here. The buffering should preferably be configurable, e.g. support for specialized buffer sizes.
That makes sense. Buffering to prevent eager pausing/resuming can be built into this operation, so that if you pause, the remote end is not notified until at least n elements have been received. That's still something that can be implemented entirely on the receiving side, like:

```dart
extension StreamBuffer<T> on Stream<T> {
  /// Delays pause/resume requests on this stream by buffering events.
  ///
  /// When the returned stream is paused, the source stream is not immediately
  /// paused. Only when the buffer has filled up will the pause be forwarded
  /// to the source.
  /// After that, if the returned stream is resumed while there are still
  /// buffered events, the source stream is not resumed until all buffered
  /// events have been delivered.
  Stream<T> buffer(int size) { ... }
}
```

It doesn't have to be built into the `stream` function itself. Or we can configure the buffering by introducing a parameter.
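For reference, here is a rough single-file sketch of how such a `buffer(size)` wrapper might be implemented entirely on the receiving side. This is not the thread authors' implementation; the exact pause semantics (queue events while the listener is paused, forward the pause only when the queue is full, drain before resuming the source) are an assumption based on the doc comment above.

```dart
import 'dart:async';
import 'dart:collection';

extension StreamBufferSketch<T> on Stream<T> {
  /// Buffers up to [size] events so that a pause on the returned stream
  /// is only forwarded to the source when the buffer fills up.
  Stream<T> buffer(int size) {
    late StreamController<T> controller;
    StreamSubscription<T>? sub;
    final queue = Queue<T>();
    var sourceDone = false;

    // Deliver queued events while the listener is not paused, then
    // close or resume the source as appropriate.
    void flush() {
      while (queue.isNotEmpty && !controller.isPaused) {
        controller.add(queue.removeFirst());
      }
      if (queue.isEmpty) {
        if (sourceDone) {
          controller.close();
        } else if (sub != null && sub!.isPaused && !controller.isPaused) {
          sub!.resume();
        }
      }
    }

    controller = StreamController<T>(
      onListen: () {
        sub = listen((event) {
          if (controller.isPaused) {
            queue.add(event);
            // Buffer full: only now forward the pause to the source.
            if (queue.length >= size) sub!.pause();
          } else {
            controller.add(event);
          }
        }, onError: controller.addError, onDone: () {
          sourceDone = true;
          if (queue.isEmpty) controller.close();
        });
      },
      // Don't forward pause immediately; events get buffered instead.
      onPause: () {},
      // Drain the buffer first; resume the source only once it's empty.
      onResume: flush,
      onCancel: () => sub?.cancel(),
    );
    return controller.stream;
  }
}
```

Usage would look like `isolateStream.buffer(64).listen(...)`, keeping the cross-isolate pause/resume traffic proportional to the buffer size rather than to individual events.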
Since it appears that …
(@kevmoo pointed to https://github.com/sourcegraph/conc for concurrency abstractions used in Go, which we could use as inspiration.)
I can definitely do an implementation. Something like:

```dart
/// Runs [computation] in a new isolate and sends the stream events back.
///
/// Returns a stream which gets events from [computation], which is running in
/// a new isolate. Pausing, resuming and cancelling the stream subscription
/// is forwarded to the isolate running [computation].
/// The isolate terminates when the stream ends or the subscription is
/// cancelled. The returned stream also ends if the isolate terminates early,
/// and it emits extra errors if the isolate of [computation] has any uncaught
/// errors.
Stream<T> stream(Stream<T> Function() computation);
```

I can see some possible variants of such a function, and there are also different ways to handle pause/resume/cancel.
I guess we can implement all combinations and provide flags to choose, or we can be opinionated and give you just one.

```dart
import "dart:isolate";
import "dart:async";

Stream<T> stream<T>(Stream<T> Function() computation) =>
    Stream<T>.multi((c) async {
      // Set if subscription.cancel is called.
      Completer<void>? cancelCompleter;
      var eventPort = RawReceivePort();
      SendPort? controlPortForCancel;
      c.onCancel = () {
        var completer = cancelCompleter = Completer<void>();
        controlPortForCancel?.send("cancel");
        return completer.future;
      };
      eventPort.handler = (firstMessage) {
        var controlPort = controlPortForCancel = firstMessage as SendPort;
        eventPort.handler = (message) {
          switch (message) {
            // Stream events.
            case (T value,):
              c.add(value);
            case (Object e, StackTrace s):
              c.addError(e, s);
            // Subscription cancel future results.
            case null:
              eventPort.close();
              cancelCompleter?.complete(null);
            case (Object e, StackTrace s, null):
              eventPort.close();
              cancelCompleter?.completeError(e, s);
            // Wat?
            default:
              c.addError(UnsupportedError("Unknown message: $message"));
          }
        };
        if (!c.hasListener) {
          assert(cancelCompleter != null);
          controlPort.send("cancel");
          return;
        }
        if (c.isPaused) {
          controlPort.send("pause");
        }
        c
          ..onPause = () {
            controlPort.send("pause");
          }
          ..onResume = () {
            controlPort.send("resume");
          }
          ..onCancel = () {
            var completer = cancelCompleter = Completer<void>();
            controlPort.send("cancel");
            return completer.future;
          };
      };
      try {
        // Remote run ends when stream completes or isolate dies.
        await Isolate.run(_remote<T>(computation, eventPort.sendPort));
      } catch (e, s) {
        // Isolate terminated with an error.
        c.addError(e, s);
      }
      eventPort.close();
      c.close();
    });

Future<void> Function() _remote<T>(
        Stream<T> Function() computation, SendPort eventPort) =>
    () {
      var done = Completer<void>();
      var stream = computation();
      var controlPort = RawReceivePort();
      eventPort.send(controlPort.sendPort);
      var subscription = stream.listen((value) {
        eventPort.send((value,));
      }, onError: (Object e, StackTrace s) {
        eventPort.send((e, s));
      }, onDone: () {
        controlPort.close();
        done.complete();
      });
      controlPort.handler = (message) {
        switch (message) {
          case "pause":
            subscription.pause();
          case "resume":
            subscription.resume();
          case "cancel":
            controlPort.close();
            unawaited(subscription.cancel().then<Never>((_) {
              Isolate.exit(eventPort, null);
            }, onError: (Object e, StackTrace s) {
              Isolate.exit(eventPort, (e, s, null));
            }));
        }
      };
      return done.future;
    };

void main() async {
  await for (var i in numbers()) {
    print(i);
    if (i == 5) break;
  }
}

Stream<int> numbers() =>
    Stream.periodic(const Duration(seconds: 1), (i) => i);
```

It's not a deep problem; it's just about figuring out which messages to send and what to do with them.
One feature which could make distributed message passing more viable would be receive ports which won't keep the isolate alive. (Maybe even be GC'able while open, if nobody keeps a reference to it.)

It'd probably still need one primary isolate to keep the buffer, and stay alive, to avoid multiple listeners consuming the same value.

Another useful feature would be to ask a send port whether its receive port is still listening, other than waiting for a timeout. Again, maybe something can be built to keep a bunch of ports aware of each other. It just sounds like something which can be very fragile if assumptions change.
Now that we have lightweight isolates (#36097) we should consider adding convenience abstraction layers on top of the bare-minimum `Isolate` class itself. The change in 05322f2 added one specific abstraction, namely `Isolate.run<T>()`, allowing one to compute one function and get the result back.

One may want other kinds of helpers, e.g.:

- `Isolate.stream()`, with cancellation support
- `ReceivePort`/`SendPort`
- like `StreamIterator`, but with a configurable buffer/pause signal

The question is: would we want such convenience layers as more methods on the `Isolate` class itself, or rather built on top, possibly in a package? If the latter: why have `Isolate.run()` as part of the core libraries?

/cc @mit-mit @lrhn
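For context, a minimal example of the `Isolate.run` abstraction mentioned above, which computes one function in a new isolate and returns its result:

```dart
import 'dart:isolate';

void main() async {
  // Isolate.run spawns a new isolate, runs the computation there,
  // and completes with the computed result.
  final sum = await Isolate.run(() {
    var total = 0;
    for (var i = 1; i <= 1000; i++) {
      total += i;
    }
    return total;
  });
  print(sum); // 500500
}
```

The streaming helpers discussed in this issue would generalize this one-shot pattern to a sequence of results with back-pressure.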