Skip to content

Commit

Permalink
Bug fixes and api updates (#48)
Browse files Browse the repository at this point in the history
* reduce visibility of FlowControlledInboundStreamObserver to internal

* default server rpc method invocation to CoroutineStart.ATOMIC

* revise coroutine context call option builders

* add tests for atomic server method invocation

* remove message propagation from rpc exception mapper

* add tests for new stub context extensions

* refactor atomic server tests
  • Loading branch information
marcoferrer authored Jun 18, 2019
1 parent b6b5d94 commit 1b9bd38
Show file tree
Hide file tree
Showing 19 changed files with 457 additions and 96 deletions.
9 changes: 7 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
language: java
jdk:
- oraclejdk8
dist: trusty
sudo: false
addons:
apt:
Expand All @@ -16,7 +17,11 @@ before_cache:
- rm -rf $HOME/.gradle/caches/*/fileHashes/
jobs:
include:
- stage: "Tests"
script: ./gradlew build && cd example-project && ./gradlew test
- name: Gradle Check
install:
- ./gradlew assemble
script:
- ./gradlew check
- cd example-project && ./gradlew test
after_success:
- bash <(curl -s https://codecov.io/bash)
9 changes: 7 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## Version 0.3.1
## Version 0.4.0
_\*\*-\*\*\-\*\*_
* New: Update to kotlin `1.3.31`
* New: Update to kotlin Coroutines `1.2.1`
Expand All @@ -9,7 +9,12 @@ _\*\*-\*\*\-\*\*_
* Fix: Address bug in parallelization of generator execution

#### Coroutines
* Fix: Disable auto flow control for inbound client and server streams during bidi calls
* New: Default server method execution to `CoroutineStart.ATOMIC`
* New: Introduce abstract stub ext for concatenating coroutine contexts, `AbstactStub.plusContext`.
* Fix: Don't propagate message for `UNKNOWN` exceptions in rpc exception mapper
* Fix: Disable auto flow control for inbound client and server streams during bidi calls
* Fix: Reduce visibility of `FlowControlledInboundStreamObserver` to `internal`
* Deprecated: `AbstractStub.coroutineContext` ext in favor of `AbstractStub.context`


## Version 0.3.0
Expand Down
1 change: 1 addition & 0 deletions kroto-plus-coroutines/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {
testImplementation project(':test-api:grpc')
testImplementation project(':test-api:java')
testImplementation "io.mockk:mockk:${Versions.mockk}"
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:${Versions.coroutines}"
}

tasks.withType(JavaCompile) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,33 @@ public val CALL_OPTION_COROUTINE_CONTEXT: CallOptions.Key<CoroutineContext> =
/**
* Get the coroutineContext the receiving stub is using for cooperative cancellation.
*/
@Deprecated("Use extension property context instead", ReplaceWith("context"))
public val <T : AbstractStub<T>> T.coroutineContext: CoroutineContext
get() = callOptions.getOption(CALL_OPTION_COROUTINE_CONTEXT)


public val <T : AbstractStub<T>> T.context: CoroutineContext
get() = callOptions.getOption(CALL_OPTION_COROUTINE_CONTEXT)

/**
* Returns a new stub with the value of [coroutineContext] attached as a [CallOptions].
* Any rpcs invoked on the resulting stub will use this context to participate in cooperative cancellation.
*/
public fun <T : AbstractStub<T>> T.withCoroutineContext(context: CoroutineContext): T{
val newContext = this.coroutineContext + context
return this.withOption(CALL_OPTION_COROUTINE_CONTEXT, newContext)
}
public fun <T : AbstractStub<T>> T.withCoroutineContext(context: CoroutineContext): T =
withOption(CALL_OPTION_COROUTINE_CONTEXT, context)

public fun <T : AbstractStub<T>> T.plusCoroutineContext(context: CoroutineContext): T =
withOption(CALL_OPTION_COROUTINE_CONTEXT, this.context + context)

/**
* Returns a new stub with the 'coroutineContext' from the current suspension attached as a [CallOptions].
* Any rpcs invoked on the resulting stub will use this context to participate in cooperative cancellation.
*/
public suspend fun <T : AbstractStub<T>> T.withCoroutineContext(): T {
val newContext = this.coroutineContext + kotlin.coroutines.coroutineContext
return this.withOption(CALL_OPTION_COROUTINE_CONTEXT, newContext)
}
public suspend fun <T : AbstractStub<T>> T.withCoroutineContext(): T =
withOption(CALL_OPTION_COROUTINE_CONTEXT, kotlin.coroutines.coroutineContext)

public suspend fun <T : AbstractStub<T>> T.plusCoroutineContext(): T =
withOption(CALL_OPTION_COROUTINE_CONTEXT, context + kotlin.coroutines.coroutineContext)

internal fun CallOptions.withCoroutineContext(coroutineContext: CoroutineContext): CallOptions =
this.withOption(CALL_OPTION_COROUTINE_CONTEXT, coroutineContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ internal fun Throwable.toRpcException(): Throwable =
statusFromThrowable
}

status
.withDescription(this.message)
.asRuntimeException(Status.trailersFromThrowable(this))
status.asRuntimeException(Status.trailersFromThrowable(this))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import kotlinx.coroutines.launch
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

interface FlowControlledInboundStreamObserver<T> : StreamObserver<T>, CoroutineScope {
internal interface FlowControlledInboundStreamObserver<T> : StreamObserver<T>, CoroutineScope {

val inboundChannel: Channel<T>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import io.grpc.StatusRuntimeException
import io.grpc.stub.ServerCallStreamObserver
import io.grpc.stub.StreamObserver
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
Expand All @@ -45,7 +46,7 @@ public fun <ReqT, RespT> ServiceScope.serverCallUnary(
) {
with(newRpcScope(initialContext, methodDescriptor)) rpcScope@ {
bindToClientCancellation(responseObserver as ServerCallStreamObserver<*>)
launch {
launch(start = CoroutineStart.ATOMIC) {
try{
responseObserver.onNext(block())
responseObserver.onCompleted()
Expand All @@ -66,7 +67,7 @@ public fun <ReqT, RespT> ServiceScope.serverCallServerStreaming(
with(newRpcScope(initialContext, methodDescriptor)) {
bindToClientCancellation(serverCallObserver)
applyOutboundFlowControl(serverCallObserver,responseChannel)
launch {
launch(start = CoroutineStart.ATOMIC) {
try{
block(responseChannel)
responseChannel.close()
Expand Down Expand Up @@ -112,7 +113,7 @@ public fun <ReqT, RespT> ServiceScope.serverCallClientStreaming(
}
)

launch {
launch(start = CoroutineStart.ATOMIC) {
try {
responseObserver.onNext(block(requestChannel))
responseObserver.onCompleted()
Expand Down Expand Up @@ -162,7 +163,7 @@ public fun <ReqT, RespT> ServiceScope.serverCallBidiStreaming(
}
)

launch {
launch(start = CoroutineStart.ATOMIC) {
serverCallObserver.request(1)
try {
block(requestChannel, responseChannel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package com.github.marcoferrer.krotoplus.coroutines
import io.grpc.CallOptions
import io.grpc.Channel
import io.mockk.mockk
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.test.Test
import kotlin.test.assertEquals
Expand All @@ -29,43 +31,87 @@ import kotlin.test.assertNotEquals
class CallOptionsTest {

@Test
fun `Test coroutine context call option defaults to EmptyCoroutineContext`(){
fun `Coroutine context call option defaults to EmptyCoroutineContext`(){
assertEquals(EmptyCoroutineContext, CallOptions.DEFAULT.getOption(CALL_OPTION_COROUTINE_CONTEXT))
}

@Test
fun `Test stub coroutineContext is populated via call option value`(){
fun `Stub coroutineContext is populated via call option value`(){
val channel = mockk<Channel>()
val stub = TestStub(channel)
assertEquals(EmptyCoroutineContext, stub.coroutineContext)
assertEquals(EmptyCoroutineContext, stub.context)
}

@Test
fun `Test attaching coroutineContext to stub explicitly`(){
fun `Attaching coroutineContext to stub explicitly`(){
val channel = mockk<Channel>()
val stub = TestStub(channel)
assertEquals(EmptyCoroutineContext, stub.coroutineContext)
assertEquals(
Dispatchers.Default,
stub.withCoroutineContext(Dispatchers.Default).coroutineContext
)

assertEquals(EmptyCoroutineContext, stub.context)
assertEquals(
Dispatchers.Default,
stub.withCoroutineContext(Dispatchers.Default).context
)
}

@Test
fun `Test attaching coroutineContext to stub implicitly`(){
fun `Attaching coroutineContext to stub implicitly`(){
val channel = mockk<Channel>()
val stub = TestStub(channel)
assertEquals(EmptyCoroutineContext, stub.coroutineContext)
assertEquals(EmptyCoroutineContext, stub.context)

runBlocking {
val newStub = stub.withCoroutineContext()

assertEquals(coroutineContext, newStub.coroutineContext)
assertEquals(coroutineContext, newStub.context)

assertNotEquals(stub.coroutineContext, newStub.coroutineContext)
assertNotEquals(stub.context, newStub.context)
}
}

@Test
fun `Test attaching coroutineContext to call options explicitly`(){
fun `Merging scope context with stub context implicitly`(){
val channel = mockk<Channel>()

val coroutineName = CoroutineName("testing")
val stub = TestStub(channel)
.withCoroutineContext(coroutineName)

runBlocking(Dispatchers.IO) {
val newStub = stub.plusCoroutineContext()

assertEquals(coroutineName.name,newStub.context[CoroutineName]?.name)
assertEquals(Dispatchers.IO,newStub.context[ContinuationInterceptor])
assertNotEquals(stub.context, newStub.context)
}
}

@Test
fun `Merging context with stub context explicitly`(){
val channel = mockk<Channel>()

val coroutineName = CoroutineName("testing")
val stub = TestStub(channel)
.withCoroutineContext(coroutineName)

val newStub = stub.plusCoroutineContext(Dispatchers.IO)

assertEquals(coroutineName.name,newStub.context[CoroutineName]?.name)
assertEquals(Dispatchers.IO,newStub.context[ContinuationInterceptor])
assertNotEquals(stub.context, newStub.context)
}

@Test
fun `Attaching coroutineContext to call options explicitly`(){
val callOptions = CallOptions.DEFAULT.withCoroutineContext(Dispatchers.Default)
assertEquals(
Dispatchers.Default,
Expand All @@ -74,7 +120,7 @@ class CallOptionsTest {
}

@Test
fun `Test attaching coroutineContext to call options implicitly`() = runBlocking {
fun `Attaching coroutineContext to call options implicitly`() = runBlocking {
val callOptions = CallOptions.DEFAULT.withCoroutineContext()
assertEquals(
coroutineContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,30 @@ package com.github.marcoferrer.krotoplus.coroutines.client

import com.github.marcoferrer.krotoplus.coroutines.utils.assertFails
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFailsWithStatus
import com.github.marcoferrer.krotoplus.coroutines.utils.matchStatus
import com.github.marcoferrer.krotoplus.coroutines.withCoroutineContext
import io.grpc.*
import io.grpc.CallOptions
import io.grpc.ClientCall
import io.grpc.Status
import io.grpc.examples.helloworld.GreeterGrpc
import io.grpc.examples.helloworld.HelloReply
import io.grpc.examples.helloworld.HelloRequest
import io.grpc.stub.StreamObserver
import io.grpc.testing.GrpcServerRule
import io.mockk.*
import kotlinx.coroutines.*
import io.mockk.every
import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.map
import kotlinx.coroutines.channels.toList
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Rule
import org.junit.Test
import kotlin.test.BeforeTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,26 @@ package com.github.marcoferrer.krotoplus.coroutines.client
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFails
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFailsWithStatus
import com.github.marcoferrer.krotoplus.coroutines.withCoroutineContext
import io.grpc.*
import io.grpc.CallOptions
import io.grpc.ClientCall
import io.grpc.Status
import io.grpc.examples.helloworld.GreeterGrpc
import io.grpc.examples.helloworld.HelloReply
import io.grpc.examples.helloworld.HelloRequest
import io.grpc.stub.StreamObserver
import io.grpc.testing.GrpcServerRule
import io.mockk.*
import kotlinx.coroutines.*
import io.mockk.every
import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Rule
import org.junit.Test
import kotlin.test.BeforeTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,30 @@ package com.github.marcoferrer.krotoplus.coroutines.client
import com.github.marcoferrer.krotoplus.coroutines.CALL_OPTION_COROUTINE_CONTEXT
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFailsWithStatus
import com.github.marcoferrer.krotoplus.coroutines.withCoroutineContext
import io.grpc.*
import io.grpc.CallOptions
import io.grpc.Channel
import io.grpc.ClientCall
import io.grpc.ClientInterceptor
import io.grpc.MethodDescriptor
import io.grpc.Status
import io.grpc.examples.helloworld.GreeterGrpc
import io.grpc.examples.helloworld.HelloReply
import io.grpc.examples.helloworld.HelloRequest
import io.grpc.stub.StreamObserver
import io.grpc.testing.GrpcServerRule
import io.mockk.*
import kotlinx.coroutines.*
import io.mockk.Runs
import io.mockk.every
import io.mockk.just
import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Rule
import org.junit.Test
import kotlin.test.BeforeTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,26 @@ package com.github.marcoferrer.krotoplus.coroutines.client

import com.github.marcoferrer.krotoplus.coroutines.utils.assertFailsWithStatus
import com.github.marcoferrer.krotoplus.coroutines.withCoroutineContext
import io.grpc.*
import io.grpc.CallOptions
import io.grpc.ClientCall
import io.grpc.Status
import io.grpc.examples.helloworld.GreeterGrpc
import io.grpc.examples.helloworld.HelloReply
import io.grpc.examples.helloworld.HelloRequest
import io.grpc.stub.StreamObserver
import io.grpc.testing.GrpcServerRule
import io.mockk.*
import kotlinx.coroutines.*
import io.mockk.Runs
import io.mockk.every
import io.mockk.just
import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Rule
import org.junit.Test
import kotlin.test.BeforeTest
Expand Down
Loading

0 comments on commit 1b9bd38

Please sign in to comment.