Skip to content

Commit

Permalink
Reintroduce the initial proposal, better being safe than sorry
Browse files Browse the repository at this point in the history
  • Loading branch information
pgreze committed Aug 4, 2021
1 parent 39e153d commit 246c46d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 21 deletions.
9 changes: 7 additions & 2 deletions src/main/kotlin/com/github/pgreze/process/Process.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
Expand Down Expand Up @@ -72,8 +73,12 @@ suspend fun process(
stderr == Redirect.CAPTURE ->
process.errorStream
else -> null
}?.lineFlow { f -> f.map { it.also { consumer(it) } }.toList() }
?: emptyList()
}?.lineFlow { f ->
f.map {
yield()
it.also { consumer(it) }
}.toList()
} ?: emptyList()
}

val input = async {
Expand Down
45 changes: 26 additions & 19 deletions src/test/kotlin/com/github/pgreze/process/ProcessKtTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.io.ByteArrayOutputStream
import java.io.File
import java.io.InputStream
import java.io.PrintStream
import java.nio.file.Path
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -166,27 +165,35 @@ class ProcessKtTest {
stdout shouldBeEqualTo OUT.toList()
}

@Test
@Timeout(value = 3, unit = TimeUnit.SECONDS)
fun `job cancellation should destroy the process`() = runSuspendTest {
var visitedCancelledBlock = false
val job = launch(Dispatchers.IO) {
try {
val ret = process("cat") // cat without args is an endless process.
throw AssertionError("Process completed despite being cancelled: $ret")
} catch (e: CancellationException) {
visitedCancelledBlock = true
@Nested
@DisplayName("print to console or not")
inner class Cancellation {
@ParameterizedTest
@ValueSource(booleans = [true, false])
@Timeout(value = 3, unit = TimeUnit.SECONDS)
fun `job cancellation should destroy the process`(captureStdout: Boolean) = runSuspendTest {
var visitedCancelledBlock = false
val job = launch(Dispatchers.IO) {
try {
val ret = process(
"cat", // cat without args is an endless process.
stdout = if (captureStdout) CAPTURE else SILENT
)
throw AssertionError("Process completed despite being cancelled: $ret")
} catch (e: CancellationException) {
visitedCancelledBlock = true
}
}
}

// Introduce delays to be sure the job was started before being cancelled.
delay(500L)
job.cancel()
delay(500L)
// Introduce delays to be sure the job was started before being cancelled.
delay(500L)
job.cancel()
delay(500L)

job.isCancelled shouldBeEqualTo true
job.isCompleted shouldBeEqualTo true
visitedCancelledBlock shouldBeEqualTo true
job.isCancelled shouldBeEqualTo true
job.isCompleted shouldBeEqualTo true
visitedCancelledBlock shouldBeEqualTo true
}
}

@Nested
Expand Down

0 comments on commit 246c46d

Please sign in to comment.