Skip to content

Commit e429c6f

Browse files
committed
Introduce IO dispatcher to offload blocking I/O-intensive tasks
Fixes #79
1 parent 4b0379f commit e429c6f

File tree

5 files changed

+64
-13
lines changed

5 files changed

+64
-13
lines changed

common/kotlinx-coroutines-core-common/src/CoroutineDispatcher.kt

+10-7
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,19 @@ import kotlin.coroutines.experimental.*
1010
* Base class that shall be extended by all coroutine dispatcher implementations.
1111
*
1212
* The following standard implementations are provided by `kotlinx.coroutines`:
13+
*
14+
* * [DefaultDispatcher] -- is used by all standard builder if no dispatcher nor any other [ContinuationInterceptor]
15+
* is specified in their context. It is currently equal to [CommonPool] (subject to change in the future).
16+
* This is an appropriate choice for compute-intensive coroutines that consume CPU resources.
17+
* * [CommonPool] -- schedules coroutine execution to a common pool of shared background threads designed
18+
* to be used for compute-intensive code.
19+
* * [IO] -- uses a shared pool of on-demand created threads and is designed for offloading of IO-intensive _blocking_
20+
* operations (like file I/O and blocking socket I/O).
1321
* * [Unconfined] -- starts coroutine execution in the current call-frame until the first suspension.
1422
* On first suspension the coroutine builder function returns.
15-
* The coroutine will resume in whatever thread that is used by the
23+
* The coroutine resumes in whatever thread that is used by the
1624
* corresponding suspending function, without confining it to any specific thread or pool.
17-
* This in an appropriate choice for IO-intensive coroutines that do not consume CPU resources.
18-
* * [DefaultDispatcher] -- is used by all standard builder if no dispatcher nor any other [ContinuationInterceptor]
19-
* is specified in their context. It is currently equal to [CommonPool] (subject to change).
20-
* * [CommonPool] -- immediately returns from the coroutine builder and schedules coroutine execution to
21-
* a common pool of shared background threads.
22-
* This is an appropriate choice for compute-intensive coroutines that consume a lot of CPU resources.
25+
* **Unconfined dispatcher should not be normally used in code**.
2326
* * Private thread pools can be created with [newSingleThreadContext] and [newFixedThreadPoolContext].
2427
* * An arbitrary [Executor][java.util.concurrent.Executor] can be converted to dispatcher with [asCoroutineDispatcher] extension function.
2528
*

core/kotlinx-coroutines-core/src/CoroutineContext.kt

+23-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import java.util.concurrent.atomic.*
1010
import kotlin.coroutines.experimental.*
1111

1212
/**
13-
* Name of the property that control coroutine debugging. See [newCoroutineContext].
13+
* Name of the property that controls coroutine debugging. See [newCoroutineContext].
1414
*/
1515
public const val DEBUG_PROPERTY_NAME = "kotlinx.coroutines.debug"
1616

@@ -56,14 +56,34 @@ internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_N
5656
}
5757

5858
/**
59-
* This is the default [CoroutineDispatcher] that is used by all standard builders like
59+
* The default [CoroutineDispatcher] that is used by all standard builders like
6060
* [launch], [async], etc if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
6161
*
6262
* It is currently equal to [CommonPool], but the value is subject to change in the future.
63+
* You can set system property "`kotlinx.coroutines.scheduler`" (either no value or to the value of "`on`")
64+
* to use an experimental coroutine dispatcher that shares threads with [IO] dispatcher and thus can switch to
65+
* [IO] context without performing an actual thread context switch.
6366
*/
6467
@Suppress("PropertyName")
6568
public actual val DefaultDispatcher: CoroutineDispatcher =
66-
if (useCoroutinesScheduler) ExperimentalCoroutineDispatcher() else CommonPool
69+
if (useCoroutinesScheduler) BackgroundDispatcher else CommonPool
70+
71+
/**
72+
* Name of the property that defines the maximal number of threads that are used by [IO] coroutines dispatcher.
73+
*/
74+
public const val IO_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.io.parallelism"
75+
76+
/**
77+
* The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads.
78+
*
79+
* Additional threads in this pool are created and are shutdown on demand.
80+
* The number of threads used by this dispatcher is limited by the value of
81+
* "`kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property.
82+
* It defaults to the limit of 64 threads or the number of cores (whichever is larger).
83+
*/
84+
public val IO by lazy {
85+
BackgroundDispatcher.blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
86+
}
6787

6888
/**
6989
* Creates context for the new coroutine. It installs [DefaultDispatcher] when no other dispatcher nor

core/kotlinx-coroutines-core/src/scheduling/ExperimentalCoroutineDispatcher.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,16 @@ import kotlinx.coroutines.experimental.*
99
import java.util.concurrent.*
1010
import kotlin.coroutines.experimental.*
1111

12+
/**
13+
* Default instance of coroutine dispatcher for background coroutines (as opposed to UI coroutines).
14+
*/
15+
internal object BackgroundDispatcher : ExperimentalCoroutineDispatcher()
16+
1217
/**
1318
* @suppress **This is unstable API and it is subject to change.**
1419
*/
1520
// TODO make internal (and rename) after complete integration
16-
class ExperimentalCoroutineDispatcher(
21+
open class ExperimentalCoroutineDispatcher(
1722
private val corePoolSize: Int,
1823
private val maxPoolSize: Int,
1924
private val idleWorkerKeepAliveNs: Long
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
import kotlinx.coroutines.experimental.*
6+
import org.junit.Test
7+
import kotlin.test.*
8+
9+
class IODispatcherTest : TestBase() {
10+
@Test
11+
fun testWithIOContext() = runTest {
12+
// just a very basic test that is dispatcher works and indeed uses background thread
13+
val mainThread = Thread.currentThread()
14+
expect(1)
15+
withContext(IO) {
16+
expect(2)
17+
assertNotSame(mainThread, Thread.currentThread())
18+
}
19+
expect(3)
20+
assertSame(mainThread, Thread.currentThread())
21+
finish(4)
22+
}
23+
}

core/kotlinx-coroutines-core/test/TestBase.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,12 @@ public actual open class TestBase actual constructor() {
117117

118118
fun initPoolsBeforeTest() {
119119
CommonPool.usePrivatePool()
120-
if (useCoroutinesScheduler) (DefaultDispatcher as ExperimentalCoroutineDispatcher).usePrivateScheduler()
120+
BackgroundDispatcher.usePrivateScheduler()
121121
}
122122

123123
fun shutdownPoolsAfterTest() {
124124
CommonPool.shutdown(SHUTDOWN_TIMEOUT)
125-
if (useCoroutinesScheduler) (DefaultDispatcher as ExperimentalCoroutineDispatcher).shutdown(SHUTDOWN_TIMEOUT)
125+
BackgroundDispatcher.shutdown(SHUTDOWN_TIMEOUT)
126126
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
127127
}
128128

0 commit comments

Comments
 (0)