Skip to content

Commit 7ca7ffc

Browse files
Francesco Vascofvasco
Francesco Vasco
authored andcommitted
Support for blocking calls inside coroutines (#79)
1 parent d28cc87 commit 7ca7ffc

File tree

4 files changed

+73
-2
lines changed

4 files changed

+73
-2
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

+19
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,32 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19+
import java.util.concurrent.Executor
1920
import java.util.concurrent.locks.LockSupport
2021
import kotlin.coroutines.experimental.*
2122
import kotlin.coroutines.experimental.intrinsics.startCoroutineUninterceptedOrReturn
2223
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
2324

2425
// --------------- basic coroutine builders ---------------
2526

27+
/**
28+
* Suspend current coroutine and execute the [block] blocking code in [executor].
29+
* The current coroutine is resumed after [block] execution.
30+
*
31+
* @param executor the executor for blocking code
32+
* @param block the blocking code
33+
*/
34+
suspend fun <T> blocking(executor: Executor = IOExecutor, block: () -> T) =
35+
suspendCoroutine<T> { cont ->
36+
executor.execute {
37+
try {
38+
cont.resume(block())
39+
} catch (t: Throwable) {
40+
cont.resumeWithException(t)
41+
}
42+
}
43+
}
44+
2645
/**
2746
* Launches new coroutine without blocking current thread and returns a reference to the coroutine as a [Job].
2847
* The coroutine is cancelled when the resulting job is [cancelled][Job.cancel].

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

+17-1
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
2020
import kotlinx.coroutines.experimental.selects.SelectBuilder
2121
import kotlinx.coroutines.experimental.selects.SelectInstance
2222
import kotlinx.coroutines.experimental.selects.select
23+
import java.util.concurrent.Executor
2324
import kotlin.coroutines.experimental.CoroutineContext
24-
import kotlin.coroutines.experimental.startCoroutine
2525

2626
/**
2727
* Deferred value is a non-blocking cancellable future.
@@ -137,6 +137,22 @@ public fun <T> async(
137137
public fun <T> async(context: CoroutineContext, start: Boolean, block: suspend CoroutineScope.() -> T): Deferred<T> =
138138
async(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block)
139139

140+
/**
141+
* Execute the blocking code [block] in the [executor] without blocking current coroutine.
142+
* [block] code is immediately scheduled for execution unless you set [start] as [CoroutineStart.LAZY], see [async] for more details.
143+
*
144+
* @param executor the executor for blocking code
145+
* @param start start option, [CoroutineStart.UNDISPATCHED] is invalid
146+
* @param block the blocking code
147+
* @see async
148+
*/
149+
public fun <T> blockingAsync(executor: Executor = IOExecutor,
150+
start: CoroutineStart = CoroutineStart.DEFAULT,
151+
block: () -> T): Deferred<T> {
152+
require(start != CoroutineStart.UNDISPATCHED) { "Start blocking code undispatched is not supported" }
153+
return async(executor.asCoroutineDispatcher(), start) { block() }
154+
}
155+
140156
/**
141157
* @suppress **Deprecated**: `defer` was renamed to `async`.
142158
*/

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt

+19
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,29 @@ package kotlinx.coroutines.experimental
1818

1919
import java.util.concurrent.Executor
2020
import java.util.concurrent.ScheduledExecutorService
21+
import java.util.concurrent.SynchronousQueue
22+
import java.util.concurrent.ThreadFactory
23+
import java.util.concurrent.ThreadPoolExecutor
2124
import java.util.concurrent.TimeUnit
2225
import kotlin.coroutines.experimental.Continuation
2326
import kotlin.coroutines.experimental.CoroutineContext
2427

28+
/**
29+
* Create a shared Executor for blocking method invocation (ie: I/O, synchronized block, lock, etc...).
30+
*
31+
* You can tune the pool size and the thread lifetime setting the properties
32+
* `kotlinx.coroutines.experimental.IOExecutor.maximumPoolSize` and
33+
* `kotlinx.coroutines.experimental.IOExecutor.keepAliveMillis` respectively.
34+
*/
35+
public val IOExecutor: Executor = ThreadPoolExecutor(
36+
0,
37+
System.getProperty("kotlinx.coroutines.experimental.IOExecutor.maximumPoolSize").toIntOrNull()
38+
?: Runtime.getRuntime().availableProcessors() * 64,
39+
System.getProperty("kotlinx.coroutines.experimental.IOExecutor.keepAliveMillis").toLongOrNull()
40+
?: 60_000L, TimeUnit.MILLISECONDS,
41+
SynchronousQueue<Runnable>(),
42+
ThreadFactory { r -> Thread(r, "IOExecutor-${Math.abs(System.identityHashCode(r)).toString(16)}") })
43+
2544
/**
2645
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
2746
* @suppress **Deprecated**: Renamed to [asCoroutineDispatcher].

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/AsyncTest.kt

+18-1
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,23 @@ class AsyncTest : TestBase() {
110110
override fun toString(): String = error("toString")
111111
}
112112

113+
@Test
114+
fun blockingAsync() = runBlocking {
115+
val d = blockingAsync { 42 }
116+
check(d.await() == 42)
117+
}
118+
119+
@Test
120+
fun blockingAsyncLazy() = runBlocking {
121+
val d = blockingAsync(start = CoroutineStart.LAZY) { 42 }
122+
check(d.await() == 42)
123+
}
124+
125+
@Test(expected = IllegalArgumentException::class)
126+
fun blockingAsyncUndispatched() = runBlocking {
127+
blockingAsync(start = CoroutineStart.UNDISPATCHED) { 42 }
128+
}
129+
113130
@Test
114131
fun testDeferBadClass() = runBlocking {
115132
val bad = BadClass()
@@ -120,4 +137,4 @@ class AsyncTest : TestBase() {
120137
assertTrue(d.await() === bad)
121138
finish(2)
122139
}
123-
}
140+
}

0 commit comments

Comments
 (0)