[kernel] unified Isolate mechanism (#1077)
### Problem

Kyo provides two similar mechanisms to isolate the execution of
computations while forking fibers and other cases that require isolation
like `STM`. `Boundary` is in the kernel and provides isolation for
`ContextEffect` and `Isolate` is designed in prelude to enable forking
with effects that handle state via `ArrowEffect`. The current approach
has a few major issues:

- Two separate mechanisms make it more difficult to understand their
purpose.
- `Boundary` has limitations with nested invocations like
`Async.run(Async.run(..))`, which we work around by providing an
`inline`d user-facing method plus an internal implementation that
derives the `Boundary`.
- The use of inlined user-facing methods and the need to support both
`Boundary` and `Isolate` in different methods introduces significant
code complexity, with `Async` being the most symptomatic scenario.
- I hadn't noticed it before working on this change, but the `Boundary`
macro failed to disallow forking with composite effects defined via
`opaque type`s like `Parse`. It allowed forking in those cases, which
typically makes the computation hang indefinitely due to the missing
handlers.
- `Isolate` requires "manual" use without derivation, which increases
complexity for users.

### Solution

This PR introduces a new `Isolate` implementation in `kyo-kernel`. It
still provides two different isolation mechanisms. `Boundary` becomes
`Isolate.Contextual` and `Isolate` becomes `Isolate.Stateful`. Important
aspects:

- The two versions are still necessary because methods like
`Async.run` and `Fiber.*` can't perform the necessary transformations to
isolate and later restore state: the forked `Fiber`s created by these
methods are expected to run to completion and not suspend with some
effect. This expectation isn't present in other methods like
`Async.parallel`, which uses `Isolate.Stateful` by wiring the necessary
transformations and the final continuation to restore state.
- Automatic derivation is now provided for both cases but with an
important restriction: it can only derive for intersections and
requires each individual effect to provide its own evidence. This
makes it so `opaque type` effects like `Parse` can choose to support
isolation or not, even if all the effects they use provide isolation.
This approach is more sensible since the ability to provide isolation is
a characteristic of the specific effect. For example, running
computations in parallel with `Parse` doesn't make much sense since the
parsing consumes from a continuous stream of text that couldn't be
supported with parallel execution.
- Both `Contextual` and `Stateful` now provide a second type parameter
`Passthrough`. With this approach, `opaque type` effects like `Topic`
can isolate specific effects and leave other effects like `Async` and
`Abort` pending. The type parameter is contravariant so the evidences
can be used in contexts where a superset of `Passthrough` effects is
used.
- I've added implicit evidences only to effects that can provide
isolation without ambiguity. For example, `Var` and `Emit` don't offer
evidences because, depending on the specific use, values may need to be
merged when restoring the state. In those cases, I still kept the
`isolate` companion object like `Var.isolate.update[Int]`.
- The methods of `Stateful` were updated in comparison to the original
`Isolate`. There's a new `use` method designed to introduce an isolate
as an implicit evidence in a scope like
`Var.isolate.update[Int].use(Async.parallel(..))` and the isolation step
methods were renamed to `capture`/`isolate`/`restore`. There's also a
new `Transform` type that enables effects to have more control over the
packing and unpacking of state during isolation. For example,
`Stateful.noop` is able to avoid a tuple allocation now by using `type
Transform[A] = A`.
- Effects that short circuit would be problematic with the automatic
derivation since the ordering of the effect handling could change the
semantics of execution. For this reason, `Abort` and `Choice` don't
provide isolates, although it'd be possible to implement `Stateful` for
them with the new `Transform` mechanism. I've also added a warning
regarding this to the top scaladoc in `Isolate`.
- The issue with nested isolation like in `Async.run(Async.run(..))` is
worked around in this new encoding by taking the isolate evidence as the
first parameter. With this approach, the compiler is able to infer the
correct types. This has an important limitation: if there's an isolate
instance in the immediate scope of the method call, it can be
automatically selected even if it doesn't make sense. This shouldn't be
a problem in user code unless users are defining new isolates, which is an
advanced usage, but it's something we need to keep in mind when working
on Kyo's effects. For example, I had to add explicit type parameters in
`Resource` due to this aspect.
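
To make the new API shape concrete, here's a minimal sketch drawn from the usages visible in this PR's diff (the `Topic` evidence and the explicit `using` at the fork site). Signatures are illustrative, not exact; `Aeron`, `computation`, `taskA`, and `taskB` are placeholders:

```scala
import kyo.*

// An opaque-type effect opts into isolation by providing its own evidence.
// This mirrors the `Topic` change below: the first type parameter is the
// isolated effect, the second is the `Passthrough` set (`Any` = no effects
// left pending).
opaque type Topic <: Env[Aeron] = Env[Aeron]

object Topic:
    given isolate: Isolate.Contextual[Topic, Any] = Isolate.Contextual[Topic, Any]

// At a fork site, the evidence can be selected explicitly as the first
// (implicit) parameter, which is what lets the compiler infer the correct
// types even for nested invocations:
//
//     Async.run(using Topic.isolate)(computation)

// Effects without implicit evidence (e.g. `Var`) introduce a `Stateful`
// isolate in a scope via the new `use` method:
//
//     Var.isolate.update[Int].use(Async.parallel(taskA, taskB))
```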

### Notes

- This PR addresses one of the two main issues we have to fix before
1.0. The other main one is the limitations of `Tag` and effect handling
with variance.
- I'll follow up with another PR changing the `Async` effect to provide
methods that resemble the collection operations in `Kyo.*`. It's
something I've been wanting to do for some time but I wanted to address
the boundary/isolate issue first.
- Adding support for forking with algebraic effects is actually an open
problem in the field. OCaml's algebraic effects, for example, don't
support proper parallelism with effects and provide only concurrency.
Haskell libraries don't seem to provide async primitives like Kyo's, and
Unison only allows a pending `IO` when forking. It seems Kyo is the
first solution to encode this behavior.

---------

Co-authored-by: Adam Hearn <22334119+hearnadam@users.noreply.github.com>
fwbrasil and hearnadam authored Mar 2, 2025
1 parent c060b41 commit b083b22
Showing 40 changed files with 1,191 additions and 1,610 deletions.
9 changes: 6 additions & 3 deletions kyo-aeron/jvm/src/main/scala/kyo/Topic.scala
@@ -34,7 +34,7 @@ import upickle.default.*
* @see
* [[https://github.com/com-lihaoyi/upickle]] for documentation on serialization.
*/
opaque type Topic <: (Async & Env[Aeron]) = Async & Env[Aeron]
opaque type Topic <: Env[Aeron] = Env[Aeron]

object Topic:

@@ -130,7 +130,7 @@ object Topic:
def publish[A: ReadWriter](
aeronUri: String,
retrySchedule: Schedule = defaultRetrySchedule
)[S](stream: Stream[A, S])(using frame: Frame, tag: Tag[A]): Unit < (Topic & S & Abort[Closed | Backpressured]) =
)[S](stream: Stream[A, S])(using frame: Frame, tag: Tag[A]): Unit < (Topic & S & Abort[Closed | Backpressured] & Async) =
Env.use[Aeron] { aeron =>
IO {
// register the publication with Aeron using type's hash as stream ID
@@ -200,7 +200,7 @@ object Topic:
def stream[A: ReadWriter](
aeronUri: String,
retrySchedule: Schedule = defaultRetrySchedule
)(using tag: Tag[A], frame: Frame): Stream[A, Topic & Abort[Backpressured]] =
)(using tag: Tag[A], frame: Frame): Stream[A, Topic & Abort[Backpressured] & Async] =
Stream {
Env.use[Aeron] { aeron =>
IO {
@@ -260,4 +260,7 @@ object Topic:
}
}
end stream

given isolate: Isolate.Contextual[Topic, Any] = Isolate.Contextual[Topic, Any]

end Topic
5 changes: 3 additions & 2 deletions kyo-aeron/jvm/src/test/scala/kyo/TopicTest.scala
@@ -33,8 +33,9 @@ class TopicTest extends Test:
val messages = Seq(Message(1), Message(2), Message(3))
Topic.run {
for
started <- Latch.init(1)
fiber <- Async.run(started.release.andThen(Topic.stream[Message](uri).take(messages.size).run))
started <- Latch.init(1)
fiber <-
Async.run(using Topic.isolate)(started.release.andThen(Topic.stream[Message](uri).take(messages.size).run))
_ <- started.await
_ <- Async.run(Topic.publish(uri)(Stream.init(messages)))
received <- fiber.get
3 changes: 3 additions & 0 deletions kyo-caliban/src/main/scala/kyo/Resolvers.scala
@@ -159,4 +159,7 @@ object Resolvers:
}
)

given isolate: Isolate.Contextual[Resolvers, Abort[CalibanError] & Async] =
Isolate.Contextual.derive[Resolvers, Abort[CalibanError] & Async]

end Resolvers
69 changes: 22 additions & 47 deletions kyo-combinators/shared/src/main/scala/kyo/Combinators.scala
@@ -2,7 +2,6 @@ package kyo

import kyo.debug.Debug
import kyo.kernel.ArrowEffect
import kyo.kernel.Boundary
import scala.annotation.tailrec
import scala.annotation.targetName
import scala.util.NotGiven
@@ -989,7 +988,7 @@ extension [A, E, S](fiber: Fiber[E, A] < S)
fiber.map(_.getResult.unit)
end extension

extension [A, E, Ctx](effect: A < (Abort[E] & Async & Ctx))
extension [A, E, S](effect: A < (Abort[E] & Async & S))

/** Performs this computation and then the next one in parallel, discarding the result of this computation.
*
@@ -999,29 +998,21 @@ extension [A, E, Ctx](effect: A < (Abort[E] & Async & Ctx))
* A computation that produces the result of `next`
*/
@targetName("zipRightPar")
inline def &>[A1, E1, Ctx1](next: A1 < (Abort[E1] & Async & Ctx1))(
def &>[A1, E1, S1](
using
f: Flat[A],
f1: Flat[A1],
r: Reducible[Abort[E]],
r1: Reducible[Abort[E1]],
fr: Frame
): A1 < (r.SReduced & r1.SReduced & Async & Ctx & Ctx1) =
_zipRightPar(next)

private def _zipRightPar[A1, E1, Ctx1](next: A1 < (Abort[E1] & Async & Ctx1))(
s: Isolate.Contextual[S, IO],
s2: Isolate.Contextual[S1, IO]
)(next: A1 < (Abort[E1] & Async & S1))(
using
f: Flat[A],
f1: Flat[A1],
b: Boundary[Ctx, IO & Abort[E]],
b1: Boundary[Ctx1, IO & Abort[E1]],
r: Reducible[Abort[E]],
r1: Reducible[Abort[E1]],
fr: Frame
): A1 < (r.SReduced & r1.SReduced & Async & Ctx & Ctx1) =
): A1 < (r.SReduced & r1.SReduced & Async & S & S1) =
for
fiberA <- Async._run(effect)
fiberA1 <- Async._run(next)
fiberA <- Async.run(using s)(effect)
fiberA1 <- Async.run(using s2)(next)
_ <- fiberA.awaitCompletion
a1 <- fiberA1.join
yield a1
@@ -1034,29 +1025,21 @@
* A computation that produces the result of this computation
*/
@targetName("zipLeftPar")
inline def <&[A1, E1, Ctx1](next: A1 < (Abort[E1] & Async & Ctx1))(
def <&[A1, E1, S1](next: A1 < (Abort[E1] & Async & S1))(
using
f: Flat[A],
f1: Flat[A1],
r: Reducible[Abort[E]],
r1: Reducible[Abort[E1]],
fr: Frame
): A < (r.SReduced & r1.SReduced & Async & Ctx & Ctx1) =
_zipLeftPar(next)

private def _zipLeftPar[A1, E1, Ctx1](next: A1 < (Abort[E1] & Async & Ctx1))(
s: Isolate.Contextual[S, IO],
s2: Isolate.Contextual[S1, IO]
)(
using
f: Flat[A],
f1: Flat[A1],
b: Boundary[Ctx, IO & Abort[E]],
b1: Boundary[Ctx1, IO & Abort[E1]],
r: Reducible[Abort[E]],
r1: Reducible[Abort[E1]],
fr: Frame
): A < (r.SReduced & r1.SReduced & Async & Ctx & Ctx1) =
): A < (r.SReduced & r1.SReduced & Async & S & S1) =
for
fiberA <- Async._run(effect)
fiberA1 <- Async._run(next)
fiberA <- Async.run(using s)(effect)
fiberA1 <- Async.run(using s2)(next)
a <- fiberA.join
_ <- fiberA1.awaitCompletion
yield a
@@ -1069,29 +1052,21 @@
* A computation that produces a tuple of both results
*/
@targetName("zipPar")
inline def <&>[A1, E1, Ctx1](next: A1 < (Abort[E1] & Async & Ctx1))(
def <&>[A1, E1, S1](
using
f: Flat[A],
f1: Flat[A1],
r: Reducible[Abort[E]],
r1: Reducible[Abort[E1]],
fr: Frame
): (A, A1) < (r.SReduced & r1.SReduced & Async & Ctx & Ctx1) =
_zipPar(next)

private def _zipPar[A1, E1, Ctx1](next: A1 < (Abort[E1] & Async & Ctx1))(
s: Isolate.Contextual[S, IO],
s2: Isolate.Contextual[S1, IO]
)(next: A1 < (Abort[E1] & Async & S1))(
using
f: Flat[A],
f1: Flat[A1],
b: Boundary[Ctx, IO & Abort[E]],
b1: Boundary[Ctx1, IO & Abort[E1]],
r: Reducible[Abort[E]],
r1: Reducible[Abort[E1]],
fr: Frame
): (A, A1) < (r.SReduced & r1.SReduced & Async & Ctx & Ctx1) =
): (A, A1) < (r.SReduced & r1.SReduced & Async & S & S1) =
for
fiberA <- Async._run(effect)
fiberA1 <- Async._run(next)
fiberA <- Async.run(using s)(effect)
fiberA1 <- Async.run(using s2)(next)
a <- fiberA.join
a1 <- fiberA1.join
yield (a, a1)
