Skip to content

Commit

Permalink
Allow "unsafe" creation of a Queue (zio#8871)
Browse files Browse the repository at this point in the history
* Allow creating a `Queue` unsafely

* Require a fiberId

* Make `Queue.unsafe` object public
  • Loading branch information
kyri-petrou authored May 19, 2024
1 parent 665e0be commit 08d157c
Showing 1 changed file with 35 additions and 19 deletions.
54 changes: 35 additions & 19 deletions core/shared/src/main/scala/zio/Queue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ object Queue {
* `UIO[Queue[A]]`
*/
def bounded[A](requestedCapacity: => Int)(implicit trace: Trace): UIO[Queue[A]] =
ZIO.succeed(MutableConcurrentQueue.bounded[A](requestedCapacity)).flatMap(createQueue(_, Strategy.BackPressure()))
ZIO.fiberId.map(unsafe.bounded(requestedCapacity, _)(Unsafe.unsafe))

/**
* Makes a new bounded queue with the dropping strategy. When the capacity of
Expand All @@ -79,7 +79,7 @@ object Queue {
* `UIO[Queue[A]]`
*/
def dropping[A](requestedCapacity: => Int)(implicit trace: Trace): UIO[Queue[A]] =
ZIO.succeed(MutableConcurrentQueue.bounded[A](requestedCapacity)).flatMap(createQueue(_, Strategy.Dropping()))
ZIO.fiberId.map(unsafe.dropping(requestedCapacity, _)(Unsafe.unsafe))

/**
* Makes a new bounded queue with sliding strategy. When the capacity of the
Expand All @@ -99,7 +99,7 @@ object Queue {
* `UIO[Queue[A]]`
*/
def sliding[A](requestedCapacity: => Int)(implicit trace: Trace): UIO[Queue[A]] =
ZIO.succeed(MutableConcurrentQueue.bounded[A](requestedCapacity)).flatMap(createQueue(_, Strategy.Sliding()))
ZIO.fiberId.map(unsafe.sliding(requestedCapacity, _)(Unsafe.unsafe))

/**
* Makes a new unbounded queue.
Expand All @@ -110,22 +110,38 @@ object Queue {
* `UIO[Queue[A]]`
*/
def unbounded[A](implicit trace: Trace): UIO[Queue[A]] =
ZIO.succeed(MutableConcurrentQueue.unbounded[A]).flatMap(createQueue(_, Strategy.Dropping()))

private def createQueue[A](queue: MutableConcurrentQueue[A], strategy: Strategy[A])(implicit
trace: Trace
): UIO[Queue[A]] =
Promise
.make[Nothing, Unit]
.map(p =>
unsafeCreate(
queue,
MutableConcurrentQueue.unbounded[Promise[Nothing, A]],
p,
new AtomicBoolean(false),
strategy
)
)
ZIO.fiberId.map(unsafe.unbounded(_)(Unsafe.unsafe))

object unsafe {

def bounded[A](requestedCapacity: Int, fiberId: FiberId)(implicit unsafe: Unsafe): Queue[A] =
createQueue(MutableConcurrentQueue.bounded[A](requestedCapacity), Strategy.BackPressure(), fiberId)

def dropping[A](requestedCapacity: Int, fiberId: FiberId)(implicit unsafe: Unsafe): Queue[A] =
createQueue(MutableConcurrentQueue.bounded[A](requestedCapacity), Strategy.Dropping(), fiberId)

def sliding[A](requestedCapacity: Int, fiberId: FiberId)(implicit unsafe: Unsafe): Queue[A] =
createQueue(MutableConcurrentQueue.bounded[A](requestedCapacity), Strategy.Sliding(), fiberId)

def unbounded[A](fiberId: FiberId)(implicit unsafe: Unsafe): Queue[A] =
createQueue(MutableConcurrentQueue.unbounded[A], Strategy.Dropping(), fiberId)

}

private def createQueue[A](
queue: MutableConcurrentQueue[A],
strategy: Strategy[A],
fiberId: FiberId
)(implicit unsafe: Unsafe): Queue[A] = {
val p = Promise.unsafe.make[Nothing, Unit](fiberId)
unsafeCreate(
queue,
MutableConcurrentQueue.unbounded[Promise[Nothing, A]],
p,
new AtomicBoolean(false),
strategy
)
}

private def unsafeCreate[A](
queue: MutableConcurrentQueue[A],
Expand Down

0 comments on commit 08d157c

Please sign in to comment.