Skip to content

Commit 9fccedc

Browse files
author
Francesco Vasco
committed
Support for blocking calls inside coroutines (#79)
1 parent 3a78aa2 commit 9fccedc

File tree

4 files changed

+97
-1
lines changed

4 files changed

+97
-1
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

+19
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19+
import java.util.concurrent.Executor
1920
import java.util.concurrent.locks.LockSupport
2021
import kotlin.coroutines.experimental.*
2122
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
@@ -24,6 +25,24 @@ import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
2425

2526
// --------------- basic coroutine builders ---------------
2627

28+
/**
29+
* Suspend current coroutine and execute the [block] blocking code in [executor].
30+
* The current coroutine is resumed after [block] execution.
31+
*
32+
* @param executor the executor for blocking code
33+
* @param block the blocking code
34+
*/
35+
suspend fun <T> blocking(executor: Executor = DefaultBlockingExecutor, block: () -> T) =
36+
suspendCoroutine<T> { cont ->
37+
executor.execute {
38+
try {
39+
cont.resume(block())
40+
} catch (t: Throwable) {
41+
cont.resumeWithException(t)
42+
}
43+
}
44+
}
45+
2746
/**
2847
* Launches new coroutine without blocking current thread and returns a reference to the coroutine as a [Job].
2948
* The coroutine is cancelled when the resulting job is [cancelled][Job.cancel].
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import java.util.concurrent.*
20+
21+
private const val DEFAULT_CORE_POOL_SIZE = 1
22+
private const val DEFAULT_KEEP_ALIVE = 60_000L // in milliseconds
23+
private const val DEFAULT_MAXIMUM_POOL_SIZE = 256
24+
25+
private val KEEP_ALIVE = try {
26+
java.lang.Long.getLong("kotlinx.coroutines.DefaultBlockingExecutor.keepAlive", DEFAULT_KEEP_ALIVE)
27+
} catch (e: SecurityException) {
28+
DEFAULT_KEEP_ALIVE
29+
}
30+
31+
private val MAXIMUM_POOL_SIZE = try {
32+
java.lang.Integer.getInteger("kotlinx.coroutines.DefaultBlockingExecutor.maximumPoolSize", DEFAULT_MAXIMUM_POOL_SIZE)
33+
} catch (e: SecurityException) {
34+
DEFAULT_MAXIMUM_POOL_SIZE
35+
}
36+
37+
internal object DefaultBlockingExecutor : Executor by ThreadPoolExecutor(
38+
DEFAULT_CORE_POOL_SIZE,
39+
MAXIMUM_POOL_SIZE,
40+
KEEP_ALIVE, TimeUnit.MILLISECONDS,
41+
LinkedBlockingQueue<Runnable>(),
42+
ThreadFactory { Thread(it, "kotlinx.coroutines.DefaultBlockingExecutor").apply { isDaemon = true } }
43+
)

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

+16
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package kotlinx.coroutines.experimental
1919
import kotlinx.coroutines.experimental.selects.SelectBuilder
2020
import kotlinx.coroutines.experimental.selects.SelectInstance
2121
import kotlinx.coroutines.experimental.selects.select
22+
import java.util.concurrent.Executor
2223
import kotlin.coroutines.experimental.CoroutineContext
2324

2425
/**
@@ -160,6 +161,21 @@ public fun <T> async(
160161
public fun <T> async(context: CoroutineContext, start: Boolean, block: suspend CoroutineScope.() -> T): Deferred<T> =
161162
async(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block)
162163

164+
/**
165+
* Execute the blocking code [block] in the [executor] without blocking current coroutine.
166+
* [start] policy is same as [async] method, but [CoroutineStart.UNDISPATCHED] is an invalid option
167+
*
168+
* @param executor the executor for blocking code
169+
* @param start start option, [CoroutineStart.UNDISPATCHED] is invalid
170+
* @param block the blocking code
171+
*/
172+
public fun <T> blockingAsync(executor: Executor = DefaultBlockingExecutor,
173+
start: CoroutineStart = CoroutineStart.DEFAULT,
174+
block: () -> T): Deferred<T> {
175+
require(start != CoroutineStart.UNDISPATCHED) { "Start blocking code undispatched is not supported" }
176+
return async(executor.asCoroutineDispatcher(), start) { block() }
177+
}
178+
163179
/**
164180
* @suppress **Deprecated**: `defer` was renamed to `async`.
165181
*/

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/AsyncTest.kt

+19-1
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,24 @@ class AsyncTest : TestBase() {
144144
override fun toString(): String = error("toString")
145145
}
146146

147+
148+
@Test
149+
fun blockingAsync() = runBlocking {
150+
val d = blockingAsync { 42 }
151+
check(d.await() == 42)
152+
}
153+
154+
@Test
155+
fun blockingAsyncLazy() = runBlocking {
156+
val d = blockingAsync(start = CoroutineStart.LAZY) { 42 }
157+
check(d.await() == 42)
158+
}
159+
160+
@Test(expected = IllegalArgumentException::class)
161+
fun blockingAsyncUndispatched() = runBlocking {
162+
blockingAsync(start = CoroutineStart.UNDISPATCHED) { 42 }
163+
}
164+
147165
@Test
148166
fun testDeferBadClass() = runBlocking {
149167
val bad = BadClass()
@@ -154,4 +172,4 @@ class AsyncTest : TestBase() {
154172
assertTrue(d.await() === bad)
155173
finish(2)
156174
}
157-
}
175+
}

0 commit comments

Comments
 (0)