Skip to content

Experimenting with flatMap #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ enablePlugins(ScalaJSBundlerPlugin)

libraryDependencies ++= Seq(
"org.scala-js" %%% "scalajs-dom" % "0.9.6", // This has no runtime cost. We only use it for `Debug.log` // @TODO[Elegance] Reconsider
"org.scalatest" %%% "scalatest" % "3.0.5" % Test
"org.scalatest" %%% "scalatest" % "3.0.6" % Test
)

useYarn := true
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
logLevel := Level.Warn

addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.25")
addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.26")

addSbtPlugin("ch.epfl.scala" % "sbt-scalajs-bundler" % "0.12.0")

Expand Down
2 changes: 1 addition & 1 deletion release.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ normalizedName := "airstream"

organization := "com.raquo"

scalaVersion := "2.12.6"
scalaVersion := "2.12.8"

// Scala 2.11 does not include Try.fold method which we use heavily.
// I don't think anyone would want to use Airstream / Laminar with 2.11 anyway.
Expand Down
32 changes: 16 additions & 16 deletions src/main/scala/com/raquo/airstream/core/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,14 @@ object Observable {
implicit val switchStreamStrategy: FlattenStrategy[Observable, EventStream, EventStream] = SwitchStreamStrategy

// @TODO[Elegance] Maybe use implicit evidence on a method instead?
implicit class MetaObservable[A, Outer[+_] <: Observable[_], Inner[_]](
val parent: Outer[Inner[A]]
implicit class MetaObservable[A, Inner[_]](
val parent: Observable[Inner[A]]
) extends AnyVal {

@inline def flatten[Output[+_] <: Observable[_]](
implicit strategy: FlattenStrategy[Outer, Inner, Output]
@inline def flatten[Output[_]](
implicit strategy: FlattenStrategy[Observable, Inner, Output]
): Output[A] = {
strategy.flatten(parent)
strategy.flatMap[Inner[A], A](parent, identity)
}

// @TODO one of the problems is that Outer.map(compose) is not proven to return the same Outer container, as Observable.map returns a Self type.
Expand All @@ -218,16 +218,16 @@ object Observable {
// }
}

// implicit class MetaLazyObservable[A, Inner[_]](
// val parent: Observable[Inner[A]]
// ) extends AnyVal {
//
// // @TODO Does this even work? Seems like type inference is broken
// /** @param compose Note: guarded against exceptions */
// @inline def flatMap[B, Inner2[_], Output[+_] <: Observable[_]](compose: Inner[A] => Inner2[B])(
// implicit strategy: FlattenStrategy[Observable, Inner2, Output]
// ): Output[B] = {
implicit class MetaLazyObservable[A](
val parent: Observable[A]
) extends AnyVal {

/** @param compose Note: guarded against exceptions */
@inline def flatMap[B, Inner[_], Output[_]](compose: A => Inner[B])(
implicit strategy: FlattenStrategy[Observable, Inner, Output]
): Output[B] = {
strategy.flatMap(parent, compose)
// strategy.flatten(parent.map(compose))
// }
// }
}
}
}
26 changes: 13 additions & 13 deletions src/main/scala/com/raquo/airstream/features/FlattenStrategy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,42 @@ import com.raquo.airstream.eventstream.{ConcurrentFutureStream, EventStream, Fut
import scala.concurrent.Future

/** [[Observable.MetaObservable.flatten]] needs an instance of this trait to know how exactly to do the flattening. */
trait FlattenStrategy[-Outer[+_] <: Observable[_], -Inner[_], Output[+_] <: Observable[_]] {
trait FlattenStrategy[Outer[_], Inner[_], Output[_]] {

/** Must not throw */
def flatten[A](parent: Outer[Inner[A]]): Output[A]
def flatMap[A, B](outer: Outer[A], project: A => Inner[B]): Output[B]
}

object FlattenStrategy {

/** See docs for [[SwitchEventStream]] */
object SwitchStreamStrategy extends FlattenStrategy[Observable, EventStream, EventStream] {
override def flatten[A](parent: Observable[EventStream[A]]): EventStream[A] = {
new SwitchEventStream[EventStream[A], A](parent = parent, makeStream = identity)
override def flatMap[A, B](outer: Observable[A], project: A => EventStream[B]): EventStream[B] = {
new SwitchEventStream[EventStream[B], B](parent = outer.map(project), makeStream = identity)
}
}

/** See docs for [[SwitchEventStream]] */
object SwitchFutureStrategy extends FlattenStrategy[Observable, Future, EventStream] {
override def flatten[A](parent: Observable[Future[A]]): EventStream[A] = {
new SwitchEventStream[Future[A], A](
parent = parent,
makeStream = new FutureEventStream(_, emitIfFutureCompleted = true)
override def flatMap[A, B](outer: Observable[A], project: A => Future[B]): EventStream[B] = {
new SwitchEventStream[A, B](
parent = outer,
makeStream = a => new FutureEventStream(project(a), emitIfFutureCompleted = true)
)
}
}

/** See docs for [[ConcurrentFutureStream]] */
object ConcurrentFutureStrategy extends FlattenStrategy[Observable, Future, EventStream] {
override def flatten[A](parent: Observable[Future[A]]): EventStream[A] = {
new ConcurrentFutureStream[A](parent, dropPreviousValues = false, emitIfFutureCompleted = true)
override def flatMap[A, B](outer: Observable[A], project: A => Future[B]): EventStream[B] = {
new ConcurrentFutureStream[B](outer.map(project), dropPreviousValues = false, emitIfFutureCompleted = true)
}
}

// @TODO[Naming] this strategy needs a better name
/** See docs for [[ConcurrentFutureStream]] */
object OverwriteFutureStrategy extends FlattenStrategy[Observable, Future, EventStream] {
override def flatten[A](parent: Observable[Future[A]]): EventStream[A] = {
new ConcurrentFutureStream[A](parent, dropPreviousValues = true, emitIfFutureCompleted = true)
override def flatMap[A, B](outer: Observable[A], project: A => Future[B]): EventStream[B] = {
new ConcurrentFutureStream[B](outer.map(project), dropPreviousValues = true, emitIfFutureCompleted = true)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class EventStreamFlattenFutureSpec extends AsyncSpec {
val promise5 = makePromise()

val futureBus = new EventBus[Future[Int]]()
val stream = futureBus.events.flatten(OverwriteFutureStrategy)
val stream: EventStream[Int] = futureBus.events.flatten(OverwriteFutureStrategy)

stream.addObserver(obs)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
package com.raquo.airstream.eventstream

import com.raquo.airstream.AsyncSpec
import com.raquo.airstream.eventbus.EventBus
import com.raquo.airstream.fixtures.{Effect, TestableOwner}
import com.raquo.airstream.ownership.Owner

import scala.collection.mutable

class EventStreamFlattenSpec extends AsyncSpec {

it("sync map-flatten works") {

implicit val owner: Owner = new TestableOwner

val range = 0 to 3
val stream = EventStream.fromSeq(range)
val flatStream =
stream
.map { v =>
EventStream.fromSeq(Seq(v * 3))
}
.flatten

val effects = mutable.Buffer[Effect[_]]()
val subscription0 = flatStream.foreach(newValue => effects += Effect("obs0", newValue))

subscription0.kill()
effects.toList shouldBe range.map(i => Effect("obs0", i*3))
}

it("sync three-level map-flatten works") {

implicit val owner: Owner = new TestableOwner

val range = 0 to 3
val stream = EventStream.fromSeq(range)
val flatStream =
stream
.map { v =>
EventStream.fromSeq(Seq(v * 3)).map { vv =>
EventStream.fromSeq(Seq(vv * 7))
}.flatten
}
.flatten

val effects = mutable.Buffer[Effect[_]]()
val subscription0 = flatStream.foreach(newValue => effects += Effect("obs0", newValue))

subscription0.kill()
effects.toList shouldBe range.map(i => Effect("obs0", i * 3 * 7))
}

private def delayedStream(points: Seq[Int], interval: Int, valueF: Int => Int = identity): EventStream[Int] = {
val bus = new EventBus[Int]()
points.foreach { i =>
delay(i * interval) {
bus.writer.onNext(valueF(i))
}
}
bus.events
}

/** Stability: make sure the outer delayedStream interval is large enough to ensure all events
* emitted by the inner delayedStream are processed. Just because the interval is set to 6ms
* does not mean that this is what it will be. It's merely the lower bound.
*/
it("from-future map-flatten works") {
implicit val owner: Owner = new TestableOwner

val range1 = 1 to 3
val range2 = 1 to 2
val stream = delayedStream(range1, interval = 30)

val flatStream =
stream
.map { v =>
delayedStream(range2, interval = 6, _ * v)
}
.flatten

val effects = mutable.Buffer[Effect[_]]()
val subscription0 = flatStream.foreach(newValue => effects += Effect("obs0", newValue))

delay(150) {
subscription0.kill()
effects.toList shouldBe range1.flatMap(i =>
range2.map(j =>
Effect("obs0", i * j)
)

)
}
}

/** Stability: make sure the outer delayedStream interval is large enough to ensure all events
* emitted by the inner delayedStream are processed. Just because the interval is set to 6ms
* does not mean that this is what it will be. It's merely the lower bound.
*/
it("three-level from-future map-flatten works") {
implicit val owner: Owner = new TestableOwner

val range1 = 1 to 3
val range2 = 1 to 2
val stream = delayedStream(range1, interval = 30)

val flatStream =
stream
.map { v =>
delayedStream(range2, interval = 6, _ * v).map { vv =>
EventStream.fromFuture(delay(1)(vv * 7))
}.flatten
}
.flatten

val effects = mutable.Buffer[Effect[_]]()
val subscription0 = flatStream.foreach(newValue => effects += Effect("obs0", newValue))

delay(150) {
subscription0.kill()
effects.toList shouldBe range1.flatMap(i =>
range2.map(j => Effect("obs0", i * j * 7))
)
}
}

it("sync flatMap works") {

implicit val owner: Owner = new TestableOwner

val range = 0 to 3
val stream = EventStream.fromSeq(range)
val flatStream =
stream
.flatMap { v =>
EventStream.fromSeq(Seq(v * 3))
}

val effects = mutable.Buffer[Effect[_]]()
val subscription0 = flatStream.foreach(newValue => effects += Effect("obs0", newValue))

subscription0.kill()
effects.toList shouldBe range.map(i => Effect("obs0", i*3))
}

it("sync three-level flatMap works") {

implicit val owner: Owner = new TestableOwner

val range = 0 to 3
val stream = EventStream.fromSeq(range)
val flatStream =
stream
.flatMap { v =>
EventStream.fromSeq(Seq(v * 3)).flatMap { vv =>
EventStream.fromSeq(Seq(vv * 7))
}
}

val effects = mutable.Buffer[Effect[_]]()
val subscription0 = flatStream.foreach(newValue => effects += Effect("obs0", newValue))

subscription0.kill()
effects.toList shouldBe range.map(i => Effect("obs0", i * 3 * 7))
}

/** Stability: make sure the outer delayedStream interval is large enough to ensure all events
* emitted by the inner delayedStream are processed. Just because the interval is set to 6ms
* does not mean that this is what it will be. It's merely the lower bound.
*/
it("from-future flatMap works") {
implicit val owner: Owner = new TestableOwner

val range1 = 1 to 3
val range2 = 1 to 2
val stream = delayedStream(range1, interval = 30)

val flatStream =
stream
.flatMap { v =>
delayedStream(range2, interval = 6, _ * v)
}

val effects = mutable.Buffer[Effect[_]]()
val subscription0 = flatStream.foreach(newValue => effects += Effect("obs0", newValue))

delay(150) {
subscription0.kill()
effects.toList shouldBe range1.flatMap(i =>
range2.map(j =>
Effect("obs0", i * j)
)

)
}
}

/** Stability: make sure the outer delayedStream interval is large enough to ensure all events
* emitted by the inner delayedStream are processed. Just because the interval is set to 6ms
* does not mean that this is what it will be. It's merely the lower bound.
*/
it("three-level from-future flatMap works") {
implicit val owner: Owner = new TestableOwner

val range1 = 1 to 3
val range2 = 1 to 2
val stream = delayedStream(range1, interval = 30)

val flatStream =
stream
.flatMap { v =>
delayedStream(range2, interval = 6, _ * v).flatMap { vv =>
EventStream.fromFuture(delay(1)(vv * 7))
}
}

val effects = mutable.Buffer[Effect[_]]()
val subscription0 = flatStream.foreach(newValue => effects += Effect("obs0", newValue))

delay(150) {
subscription0.kill()
effects.toList shouldBe range1.flatMap(i =>
range2.map(j => Effect("obs0", i * j * 7))
)
}
}

}
Loading