Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: optmize recoverWith to avoid some materialization #1775

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import pekko.stream.testkit.StreamSpec
import pekko.stream.testkit.Utils._
import pekko.stream.testkit.scaladsl.TestSink

import scala.concurrent.Future

@nowarn // tests deprecated APIs
class FlowRecoverWithSpec extends StreamSpec {

Expand Down Expand Up @@ -62,6 +64,32 @@ class FlowRecoverWithSpec extends StreamSpec {
.expectComplete()
}

"recover with a completed future source" in {
Source.failed(ex)
.recoverWith { case _: Throwable => Source.future(Future.successful(3)) }
.runWith(TestSink[Int]())
.request(1)
.expectNext(3)
.expectComplete()
}

"recover with a failed future source" in {
Source.failed(ex)
.recoverWith { case _: Throwable => Source.future(Future.failed(ex)) }
.runWith(TestSink[Int]())
.request(1)
.expectError(ex)
}

"recover with a java stream source" in {
Source.failed(ex)
.recoverWith { case _: Throwable => Source.fromJavaStream(() => java.util.stream.Stream.of(1, 2, 3)) }
.runWith(TestSink[Int]())
.request(3)
.expectNextN(1 to 3)
.expectComplete()
}

"recover with single source" in {
Source(1 to 4)
.map { a =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.util.{ Failure, Success, Try }
import scala.util.control.{ NoStackTrace, NonFatal }
import scala.util.control.Exception.Catcher

import org.apache.pekko
import pekko.actor.{ ActorRef, Terminated }
import pekko.annotation.InternalApi
Expand All @@ -36,9 +35,16 @@ import pekko.stream.Attributes.{ InputBuffer, LogLevels }
import pekko.stream.Attributes.SourceLocation
import pekko.stream.OverflowStrategies._
import pekko.stream.Supervision.Decider
import pekko.stream.impl.{ Buffer => BufferImpl, ContextPropagation, ReactiveStreamsCompliance, TraversalBuilder }
import pekko.stream.impl.{
Buffer => BufferImpl,
ContextPropagation,
FailedSource,
JavaStreamSource,
ReactiveStreamsCompliance,
TraversalBuilder
}
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SimpleLinearGraphStage, SingleSource }
import pekko.stream.scaladsl.{ DelayStrategy, Source }
import pekko.stream.stage._
import pekko.util.{ unused, ConstantFun, OptionVal }
Expand Down Expand Up @@ -2173,12 +2179,28 @@ private[pekko] object TakeWithin {
case _: NotApplied.type => failStage(ex)
case source: Graph[SourceShape[T] @unchecked, M @unchecked] if TraversalBuilder.isEmptySource(source) =>
completeStage()
case other: Graph[SourceShape[T] @unchecked, M @unchecked] =>
TraversalBuilder.getSingleSource(other) match {
case OptionVal.Some(singleSource) =>
emit(out, singleSource.elem.asInstanceOf[T], () => completeStage())
case source: Graph[SourceShape[T] @unchecked, M @unchecked] =>
TraversalBuilder.getValuePresentedSource(source) match {
case OptionVal.Some(graph) => graph match {
case singleSource: SingleSource[T @unchecked] => emit(out, singleSource.elem, () => completeStage())
case failed: FailedSource[T @unchecked] => failStage(failed.failure)
case futureSource: FutureSource[T @unchecked] => futureSource.future.value match {
case Some(Success(elem)) => emit(out, elem, () => completeStage())
case Some(Failure(ex)) => failStage(ex)
case None =>
switchTo(source)
attempt += 1
}
case iterableSource: IterableSource[T @unchecked] =>
emitMultiple(out, iterableSource.elements, () => completeStage())
case javaStreamSource: JavaStreamSource[T @unchecked, _] =>
emitMultiple(out, javaStreamSource.open().iterator(), () => completeStage())
case _ =>
switchTo(source)
attempt += 1
}
case _ =>
switchTo(other)
switchTo(source)
attempt += 1
}
case _ => throw new IllegalStateException() // won't happen, compiler exhaustiveness check pleaser
Expand Down