blob: e7f93868b1b3f0109d19c0b7d2b632f42dab9e63 [file] [log] [blame]
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.rx3
import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.disposables.*
import io.reactivex.rxjava3.plugins.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import java.util.concurrent.*
import kotlin.coroutines.*
/**
 * Returns a [CoroutineDispatcher] view of this [Scheduler].
 *
 * A scheduler that was itself produced by [CoroutineDispatcher.asScheduler] is unwrapped to the dispatcher
 * it was created from. Any other scheduler is wrapped in a [SchedulerCoroutineDispatcher], which provides
 * native support of [delay] and [withTimeout].
 */
public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher = when (this) {
    is DispatcherScheduler -> dispatcher
    else -> SchedulerCoroutineDispatcher(this)
}
// Hidden from source code; exposed to the JVM under the name `asCoroutineDispatcher` with the concrete
// return type. Unlike the visible overload, this one always wraps the scheduler and never unwraps it.
@Deprecated(message = "Since 1.4.2, binary compatibility with earlier versions", level = DeprecationLevel.HIDDEN)
@JvmName("asCoroutineDispatcher")
public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher {
    return SchedulerCoroutineDispatcher(this)
}
/**
 * Returns a [Scheduler] view of this [CoroutineDispatcher].
 *
 * A [SchedulerCoroutineDispatcher] is unwrapped to its underlying [SchedulerCoroutineDispatcher.scheduler].
 * Any other dispatcher is wrapped in a new [Scheduler] whose tasks are run as coroutines on this dispatcher.
 */
public fun CoroutineDispatcher.asScheduler(): Scheduler = when (this) {
    is SchedulerCoroutineDispatcher -> scheduler
    else -> DispatcherScheduler(this)
}
/**
 * A [Scheduler] whose tasks are executed as coroutines on the wrapped [dispatcher].
 */
private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() {
    /** Root job of this scheduler: worker jobs are created as its children, and [shutdown] cancels it. */
    private val schedulerJob = SupervisorJob()

    /**
     * The scope for everything happening in this [DispatcherScheduler].
     *
     * Running tasks are launched in this scope as well, so that [shutdown] cancels them too.
     */
    private val scope = CoroutineScope(schedulerJob + dispatcher)

    /**
     * Number handed to the next created worker; used only in the worker's string representation.
     */
    private val workerCounter = atomic(1L)

    /** Schedules [block] directly on the scheduler; when due, it runs in a coroutine of its own. */
    override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
        val delayMillis = unit.toMillis(delay)
        return scope.scheduleTask(block, delayMillis) { task ->
            Runnable { scope.launch { task() } }
        }
    }

    /** Creates a worker whose job is a child of this scheduler's job. */
    override fun createWorker(): Worker {
        val workerNumber = workerCounter.getAndIncrement()
        return DispatcherWorker(workerNumber, dispatcher, schedulerJob)
    }

    /** Cancels the scheduler job and, with it, everything launched under it. */
    override fun shutdown() {
        schedulerJob.cancel()
    }

    override fun toString(): String {
        return dispatcher.toString()
    }

    /**
     * A [Worker] that funnels its tasks through a channel drained by a single coroutine,
     * so the worker's tasks are executed one after another.
     */
    private class DispatcherWorker(
        private val counter: Long,
        private val dispatcher: CoroutineDispatcher,
        parentJob: Job
    ) : Worker() {
        private val workerJob = SupervisorJob(parentJob)
        private val workerScope = CoroutineScope(workerJob + dispatcher)

        // Unbounded queue of tasks that are due and waiting for the consumer coroutine.
        private val blockChannel = Channel<suspend () -> Unit>(Channel.UNLIMITED)

        init {
            // The only consumer of the queue: takes tasks as they arrive and runs each to completion.
            workerScope.launch {
                blockChannel.consumeEach { task -> task() }
            }
        }

        /** Schedules [block]; once due it is only enqueued, and the consumer coroutine executes it. */
        override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
            val delayMillis = unit.toMillis(delay)
            return workerScope.scheduleTask(block, delayMillis) { task ->
                Runnable { blockChannel.trySend(task) }
            }
        }

        override fun isDisposed(): Boolean {
            return !workerScope.isActive
        }

        /** Closes the task queue for new tasks, then cancels the worker job. */
        override fun dispose() {
            blockChannel.close()
            workerJob.cancel()
        }

        override fun toString(): String {
            val state = if (isDisposed) "disposed" else "active"
            return "$dispatcher (worker $counter, $state)"
        }
    }
}
/** A suspending unit of work with no arguments and no result; [scheduleTask] hands one to its adapter. */
private typealias Task = suspend () -> Unit
/**
 * Arranges for [block] to run after [delayMillis] milliseconds, or right away when the delay is not positive.
 *
 * The block is decorated with [RxJavaPlugins.onSchedule] and wrapped into a suspending [Task].
 * [adaptForScheduling] converts that task into the [Runnable] that is either run immediately (no delay)
 * or registered as a timeout callback with the [Delay] of this scope's context (positive delay).
 * Anything the block throws is passed to [handleUndeliverableException].
 *
 * Returns an already disposed [Disposable] if this scope is not active. Otherwise returns a [Disposable] which,
 * once disposed, makes a task that has not started yet do nothing and disposes the pending timeout, if any.
 */
private fun CoroutineScope.scheduleTask(
    block: Runnable,
    delayMillis: Long,
    adaptForScheduling: (Task) -> Runnable
): Disposable {
    val ctx = coroutineContext
    // Stays null unless a timeout is registered below, i.e. it is null when delayMillis <= 0.
    var timeoutHandle: DisposableHandle? = null
    val disposable = Disposable.fromRunnable { timeoutHandle?.dispose() }
    val decoratedBlock = RxJavaPlugins.onSchedule(block)

    val task: Task = {
        // Disposal may have happened between scheduling and execution; in that case do nothing.
        if (!disposable.isDisposed) {
            try {
                runInterruptible { decoratedBlock.run() }
            } catch (e: Throwable) {
                handleUndeliverableException(e, ctx)
            }
        }
    }
    val toSchedule = adaptForScheduling(task)

    // The scope is checked only after the task has been adapted, immediately before scheduling it.
    if (!isActive) return Disposable.disposed()
    if (delayMillis > 0) {
        @Suppress("INVISIBLE_MEMBER")
        ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).also { timeoutHandle = it }
    } else {
        toSchedule.run()
    }
    return disposable
}
/**
 * A [CoroutineDispatcher] backed by an arbitrary RxJava [Scheduler].
 *
 * Dispatching and delays are both implemented with [Scheduler.scheduleDirect].
 */
public class SchedulerCoroutineDispatcher(
    /**
     * The scheduler that this [CoroutineDispatcher] delegates to.
     */
    public val scheduler: Scheduler
) : CoroutineDispatcher(), Delay {
    /** @suppress */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // The Disposable returned by the scheduler is intentionally not kept.
        scheduler.scheduleDirect(block)
    }

    /** @suppress */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val resumeTask = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        val scheduled = scheduler.scheduleDirect(resumeTask, timeMillis, TimeUnit.MILLISECONDS)
        // If the continuation is cancelled first, the scheduled resumption is disposed.
        continuation.disposeOnCancellation(scheduled)
    }

    /** @suppress */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
        scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS).let { scheduled ->
            DisposableHandle { scheduled.dispose() }
        }

    /** @suppress */
    override fun toString(): String {
        return scheduler.toString()
    }

    /** @suppress */
    override fun equals(other: Any?): Boolean {
        // Two dispatchers are equal only when they wrap the very same scheduler instance.
        if (other !is SchedulerCoroutineDispatcher) return false
        return other.scheduler === scheduler
    }

    /** @suppress */
    override fun hashCode(): Int {
        return System.identityHashCode(scheduler)
    }
}