Skip to content

Commit

Permalink
[core] introduce Async.memoize (#969)
Browse files Browse the repository at this point in the history
  • Loading branch information
fwbrasil committed Jan 9, 2025
1 parent d6cc7cc commit 4fefcae
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 3 deletions.
48 changes: 48 additions & 0 deletions kyo-core/shared/src/main/scala/kyo/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,54 @@ object Async:
(s(0).asInstanceOf[A1], s(1).asInstanceOf[A2], s(2).asInstanceOf[A3], s(3).asInstanceOf[A4])
}

/** Creates a memoized version of a computation.
*
* Returns a function that will cache the result of the first execution and return the cached result for subsequent calls. During
* initialization, only one fiber will execute the computation while others wait for the result. If the first execution fails, the
* cache is cleared and the computation will be retried on the next invocation. Note that successful results are cached indefinitely,
* so use this for stable values that won't need refreshing.
*
* Unlike `Memo`, this memoization is optimized for performance and can be safely used in hot paths. If you're memoizing global
* initialization code or need more control over cache isolation, consider using `Memo` instead.
*
* WARNING: If the initial computation never completes (e.g., hangs indefinitely), all subsequent calls will be permanently blocked
* waiting for the result. Ensure the computation can complete in a reasonable time or introduce a timeout via `Async.timeout`.
*
* @param v
* The computation to memoize
* @return
* A function that returns the memoized computation result
*/
def memoize[A: Flat, S](v: A < S)(using Frame): (() => A < (S & Async)) < Async =
IO.Unsafe {
val ref = AtomicRef.Unsafe.init(Maybe.empty[Promise.Unsafe[Nothing, A]])
() =>
@tailrec def loop(): A < (S & Async) =
ref.get() match
case Present(v) => v.safe.get
case Absent =>
val promise = Promise.Unsafe.init[Nothing, A]()
if ref.compareAndSet(Absent, Present(promise)) then
Abort.run(v).map { r =>
IO.Unsafe {
if !r.isSuccess then
ref.set(Absent)
promise.completeDiscard(r)
Abort.get(r)
}
}.pipe(IO.ensure {
IO.Unsafe {
if !promise.done() then
ref.set(Absent)
}
})
else
loop()
end if
loop()

}

/** Converts a Future to an asynchronous computation.
*
* This method allows integration of existing Future-based code with Kyo's asynchronous system. It handles successful completion and
Expand Down
107 changes: 107 additions & 0 deletions kyo-core/shared/src/test/scala/kyo/AsyncTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1069,4 +1069,111 @@ class AsyncTest extends Test:
}
}

"memoize" - {
"caches successful results" in run {
for
counter <- AtomicInt.init(0)
memoized <- Async.memoize {
counter.incrementAndGet.map(_ => 42)
}
v1 <- memoized()
v2 <- memoized()
v3 <- memoized()
count <- counter.get
yield
assert(v1 == 42)
assert(v2 == 42)
assert(v3 == 42)
assert(count == 1)
}

"retries after failure" in run {
for
counter <- AtomicInt.init(0)
memoized <- Async.memoize {
counter.incrementAndGet.map { count =>
if count == 1 then throw new RuntimeException("First attempt fails")
else 42
}
}
r1 <- Abort.run(memoized())
v2 <- memoized()
v3 <- memoized()
count <- counter.get
yield
assert(r1.isPanic)
assert(v2 == 42)
assert(v3 == 42)
assert(count == 2)
}

"works with async operations" in run {
for
counter <- AtomicInt.init(0)
memoized <- Async.memoize {
for
_ <- Async.sleep(1.millis)
count <- counter.incrementAndGet
yield count
}
v1 <- memoized()
v2 <- memoized()
v3 <- memoized()
count <- counter.get
yield
assert(v1 == 1)
assert(v2 == 1)
assert(v3 == 1)
assert(count == 1)
}

"handles concurrent access" in run {
for
counter <- AtomicInt.init(0)
memoized <- Async.memoize {
for
_ <- Async.sleep(1.millis)
count <- counter.incrementAndGet
yield count
}
results <- Async.parallel(
memoized(),
memoized(),
memoized()
)
count <- counter.get
yield
assert(results._1 == 1)
assert(results._2 == 1)
assert(results._3 == 1)
assert(count == 1)
}

"handles interruption during initialization" in run {
for
counter <- AtomicInt.init(0)
started <- Latch.init(1)
sleeping <- Latch.init(1)
done <- Latch.init(1)
memoized <- Async.memoize {
IO.ensure(done.release) {
for
_ <- started.release
_ <- Async.sleep(50.millis)
count <- counter.incrementAndGet
yield count
}
}
fiber <- Async.run(memoized())
_ <- started.await
_ <- fiber.interrupt
_ <- done.await
v2 <- memoized()
count <- counter.get
yield
assert(v2 == 1)
assert(count == 1)
}
}

end AsyncTest
7 changes: 4 additions & 3 deletions kyo-prelude/shared/src/main/scala/kyo/Memo.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package kyo

/** Represents a memoization effect.
/** Represents a memoization effect for global value initialization.
*
* Memo is used to cache the results of expensive computations, allowing them to be reused without re-computation.
*
* This effect is primarily intended for initializing global values or caching results of infrequent, expensive operations. It is not
* recommended for use in hot paths or frequently executed code due to potential performance overhead.
* This effect is specifically designed for initializing global values or caching results of infrequent, expensive operations. For
* memoization in performance-sensitive code or hot paths, consider using `Async.memoize` or `kyo-cache` instead, which have lower
* overhead.
*/
opaque type Memo <: Var[Memo.Cache] = Var[Memo.Cache]

Expand Down

0 comments on commit 4fefcae

Please sign in to comment.