Skip to content

Commit f94b7a1

Browse files
committed
Do not use DefaultExecutor for cleanup work
Before this change, DefaultExecutor was occasionally used for executing the work of dispatchers that no longer function. This is no longer the case: instead, Dispatchers.IO is used for that on our multithreaded targets.
1 parent f39e482 commit f94b7a1

20 files changed

+114
-96
lines changed

kotlinx-coroutines-core/common/src/Dispatchers.common.kt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,27 @@ public expect object Dispatchers {
7171
*/
7272
public val Unconfined: CoroutineDispatcher
7373
}
74+
75+
/**
76+
* If a task can no longer run because its dispatcher is closed, it is rescheduled to another dispatcher.
77+
*
78+
* This is required to avoid a situation where some finalizers do not run:
79+
* ```
80+
* val dispatcher = newSingleThreadContext("test")
81+
* launch(dispatcher) {
82+
* val resource = Resource()
83+
* try {
84+
* // do something `suspending` with resource
85+
* } finally {
86+
* resource.close()
87+
* }
88+
* }
89+
* dispatcher.close()
90+
* ```
91+
*
92+
* `close` needs to run somewhere, but it can't run on the closed dispatcher.
93+
*
94+
* On the JVM and Native, we reschedule to the thread pool backing `Dispatchers.IO`,
95+
* because an arbitrary task may well have blocking behavior.
96+
*/
97+
internal expect fun rescheduleTaskFromClosedDispatcher(task: Runnable)

kotlinx-coroutines-core/common/src/EventLoop.common.kt

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,6 @@ private typealias Queue<T> = LockFreeTaskQueueCore<T>
169169
internal expect abstract class EventLoopImplPlatform() : EventLoop {
170170
// Called to unpark this event loop's thread
171171
protected fun unpark()
172-
173-
// Called to reschedule to DefaultExecutor when this event loop is complete
174-
protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
175172
}
176173

177174
internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
@@ -275,7 +272,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
275272
// todo: we should unpark only when this delayed task became first in the queue
276273
unpark()
277274
} else {
278-
DefaultExecutor.enqueue(task)
275+
rescheduleTaskFromClosedDispatcher(task)
279276
}
280277
}
281278

@@ -408,6 +405,14 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
408405
}
409406
}
410407

408+
// Called to reschedule when this event loop is complete
409+
protected open fun reschedule(now: Long, delayedTask: DelayedTask) {
410+
val delayTimeMillis = delayNanosToMillis(delayedTask.nanoTime - now)
411+
DefaultDelay.invokeOnTimeout(delayTimeMillis, Runnable {
412+
rescheduleTaskFromClosedDispatcher(delayedTask)
413+
}, EmptyCoroutineContext)
414+
}
415+
411416
internal abstract class DelayedTask(
412417
/**
413418
* This field can be only modified in [scheduleTask] before putting this DelayedTask
@@ -530,10 +535,6 @@ internal expect fun createEventLoop(): EventLoop
530535

531536
internal expect fun nanoTime(): Long
532537

533-
internal expect object DefaultExecutor {
534-
fun enqueue(task: Runnable)
535-
}
536-
537538
/**
538539
* Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and
539540
* non-Darwin native targets.

kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,8 @@ import kotlin.coroutines.*
2020
*
2121
* Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will
2222
* block, potentially leading to thread starvation issues.
23+
*
24+
* If new tasks are submitted to the dispatcher created by [runBlocking] after this function returns,
25+
* they are resubmitted to [Dispatchers.IO].
2326
*/
2427
public expect fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T

kotlinx-coroutines-core/concurrent/src/Dispatchers.kt renamed to kotlinx-coroutines-core/concurrent/src/Dispatchers.concurrent.kt

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,26 @@ package kotlinx.coroutines
3939
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
4040
public expect val Dispatchers.IO: CoroutineDispatcher
4141

42-
42+
internal actual fun rescheduleTaskFromClosedDispatcher(task: Runnable) {
43+
/**
44+
* We do not create a separate view of [Dispatchers.IO] for the cleanup needs.
45+
*
46+
* If [Dispatchers.IO] is not flooded with other tasks + the cleanup view does not have more threads than
47+
* [Dispatchers.IO], there is no difference between sending cleanup tasks to [Dispatchers.IO] and creating
48+
* a separate view of [Dispatchers.IO] for cleanup.
49+
*
50+
* If [Dispatchers.IO] is flooded with other tasks, we are at a crossroads:
51+
* - On the one hand, we do not want to create too many threads.
52+
* Some clients are carefully monitoring the number of threads and are relying on it not being larger than
53+
* the system property + the sum of explicit `limitedParallelism` arguments in the system.
54+
* - On the other hand, we don't want to delay productive tasks in favor of cleanup tasks.
55+
*
56+
* The first consideration wins on two accounts:
57+
* - As of writing this, [Dispatchers.IO] has been in use as the cleanup dispatcher for dispatchers obtained
58+
* from JVM executors for years, and this has not caused any issues that we know of.
59+
* - On the other hand, some people likely rely on the number of threads not exceeding the number they control.
60+
* If we were to create a separate view of [Dispatchers.IO] for cleanup, this number would be exceeded, which
61+
* is a regression.
62+
*/
63+
Dispatchers.IO.dispatch(Dispatchers.IO, task)
64+
}

kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,13 @@ internal class UnconfinedEventLoop : EventLoop() {
1212

1313
internal actual abstract class EventLoopImplPlatform : EventLoop() {
1414
protected actual fun unpark(): Unit = unsupported()
15-
protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask): Unit = unsupported()
16-
}
17-
18-
internal actual object DefaultExecutor {
19-
public actual fun enqueue(task: Runnable): Unit = unsupported()
2015
}
2116

2217
private fun unsupported(): Nothing =
2318
throw UnsupportedOperationException("runBlocking event loop is not supported")
2419

2520
internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block()
21+
22+
internal actual fun rescheduleTaskFromClosedDispatcher(task: Runnable) {
23+
Dispatchers.Default.dispatch(Dispatchers.Default, task)
24+
}

kotlinx-coroutines-core/jvm/src/Builders.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
package kotlinx.coroutines
66

7-
import java.util.concurrent.locks.*
87
import kotlin.contracts.*
98
import kotlin.coroutines.*
109

@@ -38,6 +37,9 @@ import kotlin.coroutines.*
3837
* If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and
3938
* this `runBlocking` invocation throws [InterruptedException].
4039
*
40+
* If new tasks are submitted to the dispatcher created by [runBlocking] after this function returns,
41+
* they are resubmitted to [Dispatchers.IO].
42+
*
4143
* See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available
4244
* for a newly created coroutine.
4345
*

kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,27 @@ internal actual val DefaultDelay: Delay = initializeDefaultDelay()
1111

1212
private fun initializeDefaultDelay(): Delay {
1313
// Opt-out flag
14-
if (!defaultMainDelayOptIn) return DefaultExecutor
14+
if (!defaultMainDelayOptIn) return DefaultDelayImpl
1515
val main = Dispatchers.Main
1616
/*
1717
* When we already are working with UI and Main threads, it makes
1818
* no sense to create a separate thread with timer that cannot be controller
1919
* by the UI runtime.
2020
*/
21-
return if (main.isMissing() || main !is Delay) DefaultExecutor else main
21+
return if (main.isMissing() || main !is Delay) DefaultDelayImpl else main
2222
}
2323

24-
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
25-
internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
24+
internal object DefaultExecutor {
25+
fun shutdown() = DefaultDelayImpl.shutdown()
26+
27+
fun ensureStarted() = DefaultDelayImpl.ensureStarted()
28+
29+
fun shutdownForTests(timeout: Long) = DefaultDelayImpl.shutdownForTests(timeout)
30+
31+
val isThreadPresent: Boolean get() = DefaultDelayImpl.isThreadPresent
32+
}
33+
34+
private object DefaultDelayImpl : EventLoopImplBase(), Runnable {
2635
const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor"
2736

2837
init {
@@ -61,7 +70,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
6170
return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
6271
}
6372

64-
actual override fun enqueue(task: Runnable) {
73+
override fun enqueue(task: Runnable) {
6574
if (isShutDown) shutdownError()
6675
super.enqueue(task)
6776
}
@@ -137,7 +146,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
137146
* the singleton itself instead of using parent' thread one
138147
* in order not to accidentally capture temporary application classloader.
139148
*/
140-
contextClassLoader = this@DefaultExecutor.javaClass.classLoader
149+
contextClassLoader = this@DefaultDelayImpl.javaClass.classLoader
141150
isDaemon = true
142151
start()
143152
}

kotlinx-coroutines-core/jvm/src/Dispatchers.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package kotlinx.coroutines
22

33
import kotlinx.coroutines.internal.*
44
import kotlinx.coroutines.scheduling.*
5-
import kotlin.coroutines.*
65

76
/**
87
* Name of the property that defines the maximal number of threads that are used by [Dispatchers.IO] coroutines dispatcher.

kotlinx-coroutines-core/jvm/src/EventLoop.kt

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kotlinx.coroutines
33
import kotlinx.coroutines.Runnable
44
import kotlinx.coroutines.scheduling.*
55
import kotlinx.coroutines.scheduling.CoroutineScheduler
6+
import kotlin.coroutines.EmptyCoroutineContext
67

78
internal actual abstract class EventLoopImplPlatform: EventLoop() {
89

@@ -14,9 +15,6 @@ internal actual abstract class EventLoopImplPlatform: EventLoop() {
1415
unpark(thread)
1516
}
1617

17-
protected actual open fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) {
18-
DefaultExecutor.schedule(now, delayedTask)
19-
}
2018
}
2119

2220
internal class BlockingEventLoop(
@@ -122,4 +120,3 @@ internal fun Thread.isIoDispatcherThread(): Boolean {
122120
if (this !is CoroutineScheduler.Worker) return false
123121
return isIo()
124122
}
125-

kotlinx-coroutines-core/jvm/src/Executors.kt

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor)
131131
} catch (e: RejectedExecutionException) {
132132
unTrackTask()
133133
cancelJobOnRejection(context, e)
134-
Dispatchers.IO.dispatch(context, block)
134+
rescheduleTaskFromClosedDispatcher(block)
135135
}
136136
}
137137

@@ -146,15 +146,15 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor)
146146
continuation.invokeOnCancellation(CancelFutureOnCancel(future))
147147
return
148148
}
149-
// Otherwise fallback to default executor
150-
DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
149+
// Otherwise fallback to default delay
150+
DefaultDelay.scheduleResumeAfterDelay(timeMillis, continuation)
151151
}
152152

153153
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
154154
val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis)
155155
return when {
156156
future != null -> DisposableFutureHandle(future)
157-
else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
157+
else -> DefaultDelay.invokeOnTimeout(timeMillis, block, context)
158158
}
159159
}
160160

@@ -189,6 +189,14 @@ private class ResumeUndispatchedRunnable(
189189
}
190190
}
191191

192+
private class ResumeDispatchedRunnable(
193+
private val continuation: CancellableContinuation<Unit>
194+
) : Runnable {
195+
override fun run() {
196+
continuation.resume(Unit)
197+
}
198+
}
199+
192200
/**
193201
* An implementation of [DisposableHandle] that cancels the specified future on dispose.
194202
* @suppress **This is unstable API and it is subject to change.**

0 commit comments

Comments
 (0)