From bc138b658e324b85bf18945daaa60beac75b85d7 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sat, 22 Feb 2025 17:07:38 +0800 Subject: [PATCH] feat: Add emitMulti with Spliterator support --- .../stream/impl/GraphStageLogicSpec.scala | 20 +++++++++++ .../pekko/stream/stage/GraphStage.scala | 36 ++++++++++++++++++- 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/GraphStageLogicSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/GraphStageLogicSpec.scala index 787bbd029eb..607e13c00e2 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/GraphStageLogicSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/GraphStageLogicSpec.scala @@ -121,6 +121,20 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit with S override def toString = "GraphStageLogicSpec.emitEmptyIterable" } + object emitSplitIterator extends GraphStage[SourceShape[Int]] { + val out = Outlet[Int]("out") + override val shape = SourceShape(out) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + setHandler(out, + new OutHandler { + override def onPull(): Unit = emitMultiple( + out, + java.util.stream.Stream.of(1, 2, 3).spliterator(), () => emit(out, 42, () => completeStage())) + }) + } + override def toString = "GraphStageLogicSpec.emitEmptyIterable" + } + private case class ReadNEmitN(n: Int) extends GraphStage[FlowShape[Int, Int]] { override val shape = FlowShape(Inlet[Int]("readN.in"), Outlet[Int]("readN.out")) @@ -196,6 +210,12 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit with S } + "emit properly when using split iterator" in { + + Source.fromGraph(emitSplitIterator).runWith(Sink.seq).futureValue should ===(List(1, 2, 3, 42)) + + } + "invoke lifecycle hooks in the right order" in { val g = new GraphStage[FlowShape[Int, Int]] { val in = Inlet[Int]("in") diff --git a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala index b18630843fc..16ca4642fa2 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala @@ -21,7 +21,6 @@ import scala.annotation.tailrec import scala.collection.{ immutable, mutable } import scala.concurrent.{ Future, Promise } import scala.concurrent.duration.FiniteDuration - import org.apache.pekko import pekko.{ Done, NotUsed } import pekko.actor._ @@ -37,6 +36,8 @@ import pekko.stream.stage.ConcurrentAsyncCallbackState.{ NoPendingEvents, State import pekko.util.OptionVal import pekko.util.unused +import java.util.Spliterator + /** * Scala API: A GraphStage represents a reusable graph stream processing operator. * @@ -979,6 +980,26 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: } } else andThen() + /** + * Emit a sequence of elements through the given outlet and continue with the given thunk + * afterwards, suspending execution if necessary. + * This action replaces the [[OutHandler]] for the given outlet if suspension + * is needed and reinstalls the current handler upon receiving an `onPull()` + * signal (before invoking the `andThen` function). + */ + final protected def emitMultiple[T](out: Outlet[T], elems: Spliterator[T], andThen: () => Unit): Unit = { + val iter = new EmittingSpliterator[T](out, elems, getNonEmittingHandler(out), andThen) + if (isAvailable(out)) { + if (!iter.tryPush()) { + andThen() + } else { + setOrAddEmitting(out, iter) + } + } else { + setOrAddEmitting(out, iter) + } + } + /** * Emit a sequence of elements through the given outlet, suspending execution if necessary. * This action replaces the [[OutHandler]] for the given outlet if suspension @@ -1118,6 +1139,19 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: } } + private final class EmittingSpliterator[T](_out: Outlet[T], elems: Spliterator[T], _previous: OutHandler, + _andThen: () => Unit) + extends Emitting[T](_out, _previous, _andThen) with java.util.function.Consumer[T] { + + override def onPull(): Unit = if (!elems.tryAdvance(this)) { + followUp() + } + + def tryPush(): Boolean = elems.tryAdvance(this) + + override def accept(elem: T): Unit = push(out, elem) + } + private class EmittingCompletion[T](_out: Outlet[T], _previous: OutHandler) extends Emitting[T](_out, _previous, DoNothing) { override def onPull(): Unit = complete(out)