Skip to content

Commit

Permalink
Implement outbound flow control (#42)
Browse files Browse the repository at this point in the history
* prevent outbound buffering with flow control

* add coroutine start param to launchProducerJob

* remove redundant call to disableAutoInboundFlowControl

* add assertion for deferred response completion in onComplete
  • Loading branch information
marcoferrer committed Apr 3, 2019
1 parent 59726e3 commit 202e1f4
Show file tree
Hide file tree
Showing 23 changed files with 445 additions and 1,068 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ import jojo.bizarre.adventure.character.CharacterProto
import jojo.bizarre.adventure.stand.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.CoroutineContext

class StandServiceCoroutineImpl : StandServiceCoroutineGrpc.StandServiceImplBase(){

override val initialContext: CoroutineContext
get() = Dispatchers.Unconfined

override suspend fun getStandByName(
request: StandServiceProto.GetStandByNameRequest
): StandProto.Stand = coroutineScope {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@ package com.github.marcoferrer.krotoplus.coroutines

import com.github.marcoferrer.krotoplus.coroutines.call.newProducerScope
import com.github.marcoferrer.krotoplus.coroutines.call.toRpcException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

Expand All @@ -42,9 +39,10 @@ import kotlin.coroutines.EmptyCoroutineContext
public fun <T> CoroutineScope.launchProducerJob(
channel: SendChannel<T>,
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend ProducerScope<T>.()->Unit
): Job =
launch(context) {
launch(context, start) {
newProducerScope(channel).block()
}.apply {
invokeOnCompletion {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,65 +26,30 @@ import io.grpc.stub.StreamObserver
import io.grpc.ClientCall
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext


internal fun <RespT> CoroutineScope.newSendChannelFromObserver(
observer: StreamObserver<RespT>,
capacity: Int = 1
): SendChannel<RespT> =
actor<RespT>(
context = observer.exceptionHandler + Dispatchers.Unconfined,
capacity = capacity,
start = CoroutineStart.LAZY
) {
try {
consumeEach { observer.onNext(it) }
channel.close()
}catch (e:Throwable){
channel.close(e)
}
}.apply{
invokeOnClose(observer.completionHandler)
}


internal fun <ReqT, RespT> CoroutineScope.newManagedServerResponseChannel(
responseObserver: ServerCallStreamObserver<RespT>,
isMessagePreloaded: AtomicBoolean,
requestChannel: Channel<ReqT> = Channel(capacity = 1)
): SendChannel<RespT> {

val responseChannel = newSendChannelFromObserver(responseObserver)

responseObserver.enableManualFlowControl(requestChannel,isMessagePreloaded)

return responseChannel
}

internal fun CoroutineScope.bindToClientCancellation(observer: ServerCallStreamObserver<*>){
internal fun CoroutineScope.bindToClientCancellation(observer: ServerCallStreamObserver<*>) {
observer.setOnCancelHandler {
this@bindToClientCancellation.cancel()
}
}

internal fun CoroutineScope.bindScopeCancellationToCall(call: ClientCall<*, *>){
internal fun CoroutineScope.bindScopeCancellationToCall(call: ClientCall<*, *>) {

val job = coroutineContext[Job]
?: error("Unable to bind cancellation to call because scope does not have a job: $this")

job.apply {
invokeOnCompletion {
if(isCancelled){
call.cancel(it?.message,it?.cause ?: it)
if (isCancelled) {
call.cancel(it?.message, it?.cause ?: it)
}
}
}
}

internal fun StreamObserver<*>.completeSafely(error: Throwable? = null){
internal fun StreamObserver<*>.completeSafely(error: Throwable? = null) {
// If the call was cancelled already
// the stream observer will throw
kotlin.runCatching {
Expand All @@ -94,26 +59,24 @@ internal fun StreamObserver<*>.completeSafely(error: Throwable? = null){
}
}

internal val StreamObserver<*>.exceptionHandler: CoroutineExceptionHandler
get() = CoroutineExceptionHandler { _, e ->
completeSafely(e)
}

internal val StreamObserver<*>.completionHandler: CompletionHandler
get() = { completeSafely(it) }

internal fun Throwable.toRpcException(): Throwable =
when (this) {
is StatusException,
is StatusRuntimeException -> this
else -> {
val error = Status.fromThrowable(this)
.asRuntimeException(Status.trailersFromThrowable(this))

if(error.status.code == Status.Code.UNKNOWN && this is CancellationException)
val statusFromThrowable = Status.fromThrowable(this)
val status = if (
statusFromThrowable.code == Status.UNKNOWN.code &&
this is CancellationException
) {
Status.CANCELLED
.withDescription(this.message)
.asRuntimeException() else error
} else {
statusFromThrowable
}

status
.withDescription(this.message)
.asRuntimeException(Status.trailersFromThrowable(this))
}
}

Expand All @@ -130,43 +93,11 @@ internal fun newRpcScope(
methodDescriptor.getCoroutineName()
)

@ExperimentalCoroutinesApi
internal fun <T> CoroutineScope.newProducerScope(channel: SendChannel<T>): ProducerScope<T> =
object : ProducerScope<T>,
CoroutineScope by this,
SendChannel<T> by channel {

override val channel: SendChannel<T>
get() = channel
}

internal inline fun <T> StreamObserver<T>.handleUnaryRpc(block: ()->T){
try{
onNext(block())
onCompleted()
}catch (e: Throwable){
completeSafely(e)
}
}

internal inline fun <T> SendChannel<T>.handleStreamingRpc(block: (SendChannel<T>)->Unit){
try{
block(this)
close()
}catch (e: Throwable){
close(e.toRpcException())
}
}

internal inline fun <ReqT, RespT> handleBidiStreamingRpc(
requestChannel: ReceiveChannel<ReqT>,
responseChannel: SendChannel<RespT>,
block: (ReceiveChannel<ReqT>, SendChannel<RespT>) -> Unit
) {
try{
block(requestChannel,responseChannel)
responseChannel.close()
}catch (e:Throwable){
responseChannel.close(e.toRpcException())
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,64 +20,59 @@ import io.grpc.stub.CallStreamObserver
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

internal interface FlowControlledObserver {

@ExperimentalCoroutinesApi
fun <T, T2> CoroutineScope.nextValueWithBackPressure(
value: T,
channel: Channel<T>,
callStreamObserver: CallStreamObserver<T2>,
isMessagePreloaded: AtomicBoolean
) {
try {
when {
!channel.isClosedForSend && channel.offer(value) -> callStreamObserver.request(1)

!channel.isClosedForSend -> {
// We are setting isMessagePreloaded to true to prevent the
// onReadyHandler from requesting a new message while we have
// a message preloaded.
isMessagePreloaded.set(true)

// Using [CoroutineStart.UNDISPATCHED] ensures that
// values are sent in the proper order (FIFO).
// This also prevents a race between [StreamObserver.onNext] and
// [StreamObserver.onComplete] by making sure all preloaded messages
// have been submitted before invoking [Channel.close]
launch(start = CoroutineStart.UNDISPATCHED) {
try {
channel.send(value)
callStreamObserver.request(1)

// Allow the onReadyHandler to begin requesting messages again.
isMessagePreloaded.set(false)
}catch (e: Throwable){
channel.close(e)
}
}
}
}
} catch (e: Throwable) {
channel.close(e)
}
}
}

@ExperimentalCoroutinesApi
internal fun <T, T2> CallStreamObserver<T>.enableManualFlowControl(
targetChannel: Channel<T2>,
isMessagePreloaded: AtomicBoolean
internal fun <T> CallStreamObserver<*>.applyInboundFlowControl(
targetChannel: Channel<T>,
transientInboundMessageCount: AtomicInteger
) {
disableAutoInboundFlowControl()
setOnReadyHandler {
if (
isReady &&
!targetChannel.isFull &&
!targetChannel.isClosedForSend &&
isMessagePreloaded.compareAndSet(false, true)
!targetChannel.isClosedForReceive &&
transientInboundMessageCount.get() == 0
) {
request(1)
}
}
}

internal fun <T> CoroutineScope.applyOutboundFlowControl(
streamObserver: CallStreamObserver<T>,
targetChannel: Channel<T>
){
val isOutboundJobRunning = AtomicBoolean()
val channelIterator = targetChannel.iterator()
streamObserver.setOnReadyHandler {
if(targetChannel.isClosedForReceive){
streamObserver.completeSafely()
}else if(
streamObserver.isReady &&
!targetChannel.isClosedForReceive &&
isOutboundJobRunning.compareAndSet(false, true)
){
launch(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
streamObserver.completeSafely(e)
targetChannel.close(e)
}) {
try{
while(
streamObserver.isReady &&
!targetChannel.isClosedForReceive &&
channelIterator.hasNext()
){
val value = channelIterator.next()
streamObserver.onNext(value)
}
if(targetChannel.isClosedForReceive){
streamObserver.onCompleted()
}
} finally {
isOutboundJobRunning.set(false)
}
}
}
}
}
Loading

0 comments on commit 202e1f4

Please sign in to comment.