Skip to content

Commit

Permalink
[prelude] make Stream lazier (#851)
Browse files Browse the repository at this point in the history
- stop using class member, enforcing accidental eagerness
- add new method: Stream.range to simplify creation of streams
- init/range: accept `chunkSize` to limit size of Chunks- Chunk.from
(Array): remove AnyRef type bound

The first PR to address #679. This change is important to ensure the
laziness of streams initialized from non-effectful sources.
  • Loading branch information
hearnadam authored Nov 20, 2024
1 parent 25ba284 commit 11f2447
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 22 deletions.
7 changes: 5 additions & 2 deletions kyo-data/shared/src/main/scala/kyo/Chunk.scala
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,11 @@ object Chunk:
* @return
* a new Chunk.Indexed containing the elements from the Array
*/
def from[A <: AnyRef](values: Array[A]): Chunk.Indexed[A] =
Compact(Arrays.copyOf(values, values.length))
def from[A](values: Array[A]): Chunk.Indexed[A] =
if values.isEmpty then cachedEmpty.asInstanceOf[Chunk.Indexed[A]]
else
Compact(Array.copyAs(values, values.length)(using ClassTag.AnyRef).asInstanceOf[Array[A]])
end from

/** Creates a Chunk from a Seq of elements.
*
Expand Down
107 changes: 87 additions & 20 deletions kyo-prelude/shared/src/main/scala/kyo/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import kyo.Emit.Ack
import kyo.Emit.Ack.*
import kyo.Tag
import kyo.kernel.ArrowEffect
import scala.annotation.nowarn
import scala.annotation.targetName

/** Represents a stream of values of type `V` with effects of type `S`.
Expand All @@ -19,13 +20,13 @@ import scala.annotation.targetName
* @param v
* The effect that produces acknowledgments and emits chunks of values
*/
final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
sealed abstract class Stream[V, -S]:

/** Returns the effect that produces acknowledgments and emits chunks of values. */
def emit: Ack < (Emit[Chunk[V]] & S) = v
def emit: Ack < (Emit[Chunk[V]] & S)

private def continue[S2](f: Int => Ack < (Emit[Chunk[V]] & S & S2))(using Frame): Stream[V, S & S2] =
Stream(v.map {
Stream(emit.map {
case Stop => Stop
case Continue(n) => f(n)
})
Expand Down Expand Up @@ -63,7 +64,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
tagV2: Tag[Emit[Chunk[V2]]],
frame: Frame
): Stream[V2, S & S2] =
Stream[V2, S & S2](ArrowEffect.handleState(tagV, (), v)(
Stream[V2, S & S2](ArrowEffect.handleState(tagV, (), emit)(
[C] =>
(input, _, cont) =>
if input.isEmpty then
Expand All @@ -85,7 +86,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
tagV2: Tag[Emit[Chunk[V2]]],
frame: Frame
): Stream[V2, S & S2 & S3] =
Stream[V2, S & S2 & S3](ArrowEffect.handleState(tagV, (), v)(
Stream[V2, S & S2 & S3](ArrowEffect.handleState(tagV, (), emit)(
[C] =>
(input, _, cont) =>
Kyo.foldLeft(input)(Continue(): Ack) { (ack, v) =>
Expand All @@ -108,7 +109,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
tagV2: Tag[Emit[Chunk[V2]]],
frame: Frame
): Stream[V2, S & S2 & S3] =
Stream[V2, S & S2 & S3](ArrowEffect.handleState(tagV, (), v)(
Stream[V2, S & S2 & S3](ArrowEffect.handleState(tagV, (), emit)(
[C] =>
(input, _, cont) =>
if input.isEmpty then
Expand All @@ -118,7 +119,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
))

private def discard(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S] =
Stream(ArrowEffect.handle(tag, v)(
Stream(ArrowEffect.handle(tag, emit)(
[C] => (input, cont) => cont(Stop)
))

Expand All @@ -132,7 +133,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
def take(n: Int)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S] =
if n <= 0 then discard
else
Stream[V, S](ArrowEffect.handleState(tag, n, v)(
Stream[V, S](ArrowEffect.handleState(tag, n, emit)(
[C] =>
(input, state, cont) =>
if state == 0 then
Expand All @@ -153,7 +154,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
def drop(n: Int)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S] =
if n <= 0 then this
else
Stream[V, S](ArrowEffect.handleState(tag, n, v)(
Stream[V, S](ArrowEffect.handleState(tag, n, emit)(
[C] =>
(input, state, cont) =>
if state == 0 then
Expand All @@ -172,7 +173,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
* A new stream containing elements that satisfy the predicate
*/
def takeWhile[S2](f: V => Boolean < S2)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S & S2] =
Stream[V, S & S2](ArrowEffect.handleState(tag, true, v)(
Stream[V, S & S2](ArrowEffect.handleState(tag, true, emit)(
[C] =>
(input, state, cont) =>
if !state then (false, cont(Stop))
Expand All @@ -191,7 +192,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
* A new stream with initial elements that satisfy the predicate removed
*/
def dropWhile[S2](f: V => Boolean < S2)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S & S2] =
Stream[V, S & S2](ArrowEffect.handleState(tag, true, v)(
Stream[V, S & S2](ArrowEffect.handleState(tag, true, emit)(
[C] =>
(input, state, cont) =>
if state then
Expand All @@ -211,7 +212,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
* A new stream containing only elements that satisfy the predicate
*/
def filter[S2](f: V => Boolean < S2)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S & S2] =
Stream[V, S & S2](ArrowEffect.handleState(tag, (), v)(
Stream[V, S & S2](ArrowEffect.handleState(tag, (), emit)(
[C] =>
(input, _, cont) =>
Kyo.filter(input)(f).map { c =>
Expand Down Expand Up @@ -247,7 +248,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
*/
@targetName("changesMaybe")
def changes(first: Maybe[V])(using tag: Tag[Emit[Chunk[V]]], frame: Frame, ce: CanEqual[V, V]): Stream[V, S] =
Stream[V, S](ArrowEffect.handleState(tag, first, v)(
Stream[V, S](ArrowEffect.handleState(tag, first, emit)(
[C] =>
(input, state, cont) =>
val c = input.changes(state)
Expand All @@ -264,7 +265,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
* A unit effect that runs the stream without collecting results
*/
def runDiscard(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Unit < S =
ArrowEffect.handle(tag, v.unit)(
ArrowEffect.handle(tag, emit.unit)(
[C] => (input, cont) => cont(Continue())
)

Expand All @@ -286,7 +287,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
* A unit effect that runs the stream and applies f to each chunk
*/
def runForeachChunk[S2](f: Chunk[V] => Unit < S2)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Unit < (S & S2) =
ArrowEffect.handle(tag, v.unit)(
ArrowEffect.handle(tag, emit.unit)(
[C] =>
(input, cont) =>
if !input.isEmpty then
Expand All @@ -305,7 +306,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
* The final accumulated value
*/
def runFold[A, S2](acc: A)(f: (A, V) => A < S2)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): A < (S & S2) =
ArrowEffect.handleState(tag, acc, v)(
ArrowEffect.handleState(tag, acc, emit)(
handle = [C] =>
(input, state, cont) =>
Kyo.foldLeft(input)(state)(f).map((_, cont(Continue()))),
Expand All @@ -318,7 +319,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
* A chunk containing all values emitted by the stream
*/
def run(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Chunk[V] < S =
ArrowEffect.handleState(tag, Chunk.empty[Chunk[V]], v)(
ArrowEffect.handleState(tag, Chunk.empty[Chunk[V]], emit)(
handle = [C] =>
(input, state, cont) =>
(state.append(input), cont(Continue())),
Expand All @@ -328,25 +329,91 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
end Stream

object Stream:
@nowarn("msg=anonymous")
inline def apply[V, S](inline v: => Ack < (Emit[Chunk[V]] & S)): Stream[V, S] =
new Stream[V, S]:
def emit: Ack < (Emit[Chunk[V]] & S) = v

private val _empty = Stream(Stop)
def empty[V]: Stream[V, Any] = _empty.asInstanceOf[Stream[V, Any]]

def init[V, S](seq: Seq[V] < S)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S] =
/** The default chunk size for streams. */
inline def DefaultChunkSize: Int = 4096

/** Creates a stream from a sequence of values.
*
* @param v
* The effect returning a sequence of values
* @param chunkSize
* The size of chunks to emit (default: 4096). Supplying a negative value will result in a chunk size of 1.
* @return
* A stream of values from the sequence
*/
def init[V, S](v: => Seq[V] < S, chunkSize: Int = DefaultChunkSize)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S] =
Stream[V, S](
seq.map { seq =>
v.map { seq =>
val chunk: Chunk[V] = Chunk.from(seq)
val _chunkSize = chunkSize max 1
Emit.andMap(Chunk.empty[V]) { ack =>
Loop(chunk, ack) { (c, ack) =>
ack match
case Stop =>
Loop.done(Stop)
case Continue(n) =>
if c.isEmpty then Loop.done(Ack.Continue())
else Emit.andMap(c.take(n))(ack => Loop.continue(c.dropLeft(n), ack))
else
val i = n min _chunkSize
Emit.andMap(c.take(i))(ack => Loop.continue(c.dropLeft(i), ack))
}
}
}
)

/** Creates a stream of integers from start (inclusive) to end (exclusive).
*
* @param start
* The starting value (inclusive)
* @param end
* The ending value (exclusive)
* @param step
* The step size (default: 1)
* @param chunkSize
* The size of chunks to emit (default: 4096)
* @return
* A stream of integers within the specified range
*/
def range[S](start: Int, end: Int, step: Int = 1, chunkSize: Int = DefaultChunkSize)(using
tag: Tag[Emit[Chunk[Int]]],
frame: Frame
): Stream[Int, S] =
if step == 0 || (start < end && step < 0) || (start > end && step > 0) then empty
else
Stream[Int, S] {
val _chunkSize = chunkSize max 1

Emit.andMap(Chunk.empty[Int]) { ack =>
Loop(start, ack) { (current, ack) =>
ack match
case Stop =>
Loop.done(Stop)
case Continue(n) =>
val continue =
if step > 0 then current < end
else current > end

if !continue then Loop.done(Stop)
else
val remaining =
if step > 0 then
((end - current - 1) / step).abs + 1
else
((current - end - 1) / step.abs).abs + 1
val size = (n min _chunkSize) min remaining
val chunk = Chunk.from(Range(current, current + size * step, step))
Emit.andMap(chunk)(ack => Loop.continue(current + step * size, ack))
end if
}
}
}

end Stream
70 changes: 70 additions & 0 deletions kyo-prelude/shared/src/test/scala/kyo/StreamTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,30 @@ class StreamTest extends Test:
Stream.init(Seq(1, 2, 3)).run.eval == Seq(1, 2, 3)
)
}

"lazy" in {
var i = 0
val _ = Stream.init {
i += 1
Seq.empty[Int]
}

assert(i == 0)
}

"chunk size" in {
def size(n: Int, c: Int): Chunk[Int] =
Var.runTuple(Chunk.empty[Int])(
Stream
.init(Seq.fill(n)(""), chunkSize = c)
.mapChunk(chunk => Var.update[Chunk[Int]](_.append(chunk.size)))
.runDiscard
).eval._1

assert(size(10240, 4096) == Chunk(4096, 4096, 2048))
assert(size(301, 100) == Chunk(100, 100, 100, 1))
assert(size(5, 0) == Chunk(1, 1, 1, 1, 1))
}
}

"initChunk" - {
Expand All @@ -42,6 +66,52 @@ class StreamTest extends Test:
}
}

"range" - {
"empty" in {
assert(Stream.range(0, 0).run.eval == Seq.empty)
}

"negative" in {
assert(Stream.range(0, -10).run.eval == (0 until -10))
assert(Stream.range(0, -10, step = -1).run.eval == (0 until -10 by -1))
}

"positive" in {
assert(Stream.range(0, 1024).run.eval == (0 until 1024))
}

"step" - {
"zero" in {
assert(Stream.range(0, 1024, 0).run.eval == Seq.empty)
}

"positive" in {
assert(Stream.range(0, 1024, 2).run.eval == (0 until 1024 by 2))
}

"negative" in {
assert(Stream.range(0, -10, -2).run.eval == (0 until -10 by -2))
}
}

"chunk size" in {
def size(n: Int, c: Int): Chunk[Int] =
Var.runTuple(Chunk.empty[Int])(
Stream
.range(0, n, chunkSize = c)
.mapChunk(chunk => Var.update[Chunk[Int]](_.append(chunk.size)))
.runDiscard
).eval._1

assert(size(10240, 4096) == Chunk(4096, 4096, 2048))
assert(size(301, 100) == Chunk(100, 100, 100, 1))
}

"stack safety" in {
assert(Stream.range(0, 1000000).take(5).run.eval == (0 until 5))
}
}

"take" - {
"zero" in {
assert(
Expand Down

0 comments on commit 11f2447

Please sign in to comment.