blob: 24c3f11834cccd0b0b0b12112de4c33436b23c32 [file] [log] [blame]
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.rx3
import io.reactivex.rxjava3.core.Scheduler
import kotlinx.coroutines.*
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext
/**
* Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher]
* and provides native support of [delay] and [withTimeout].
*/
public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this)
/**
* Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
*/
public class SchedulerCoroutineDispatcher(
/**
* Underlying scheduler of current [CoroutineDispatcher].
*/
public val scheduler: Scheduler
) : CoroutineDispatcher(), Delay {
/** @suppress */
override fun dispatch(context: CoroutineContext, block: Runnable) {
scheduler.scheduleDirect(block)
}
/** @suppress */
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val disposable = scheduler.scheduleDirect({
with(continuation) { resumeUndispatched(Unit) }
}, timeMillis, TimeUnit.MILLISECONDS)
continuation.disposeOnCancellation(disposable)
}
/** @suppress */
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS)
return DisposableHandle { disposable.dispose() }
}
/** @suppress */
override fun toString(): String = scheduler.toString()
/** @suppress */
override fun equals(other: Any?): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler === scheduler
/** @suppress */
override fun hashCode(): Int = System.identityHashCode(scheduler)
}