blob: 63e30f2617f7d4f4379564baebe127f27dd1e822 [file] [log] [blame]
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.rx3
import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.disposables.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*
/**
* Converts this job to the hot reactive completable that signals
* with [onCompleted][CompletableObserver.onComplete] when the corresponding job completes.
*
* Every subscriber gets the signal at the same time.
* Unsubscribing from the resulting completable **does not** affect the original job in any way.
*
* **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
* in the future to account for the concept of structured concurrency.
*
* @param context -- the coroutine context from which the resulting completable is going to be signalled
*/
@ExperimentalCoroutinesApi
public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) {
this@asCompletable.join()
}
/**
* Converts this deferred value to the hot reactive maybe that signals
* [onComplete][MaybeEmitter.onComplete], [onSuccess][MaybeEmitter.onSuccess] or [onError][MaybeEmitter.onError].
*
* Every subscriber gets the same completion value.
* Unsubscribing from the resulting maybe **does not** affect the original deferred value in any way.
*
* **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
* in the future to account for the concept of structured concurrency.
*
* @param context -- the coroutine context from which the resulting maybe is going to be signalled
*/
@ExperimentalCoroutinesApi
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) {
this@asMaybe.await()
}
/**
* Converts this deferred value to the hot reactive single that signals either
* [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
*
* Every subscriber gets the same completion value.
* Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
*
* **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
* in the future to account for the concept of structured concurrency.
*
* @param context -- the coroutine context from which the resulting single is going to be signalled
*/
@ExperimentalCoroutinesApi
public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle(context) {
this@asSingle.await()
}
/**
* Transforms given cold [ObservableSource] into cold [Flow].
*
* The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator
* is applied to the resulting flow.
*
* A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the
* resulting flow to specify a user-defined value and to control what happens when data is produced faster
* than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
*/
@ExperimentalCoroutinesApi
public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
val disposableRef = AtomicReference<Disposable>()
val observer = object : Observer<T> {
override fun onComplete() { close() }
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
override fun onNext(t: T) {
try {
sendBlocking(t)
} catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
// Is handled by the downstream flow
}
}
override fun onError(e: Throwable) { close(e) }
}
subscribe(observer)
awaitClose { disposableRef.getAndSet(Disposable.disposed())?.dispose() }
}
/**
* Converts the given flow to a cold observable.
* The original flow is cancelled when the observable subscriber is disposed.
*
* An optional [context] can be specified to control the execution context of calls to [Observer] methods.
* You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
/*
* ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
* asObservable is already invoked from unconfined
*/
val job = GlobalScope.launch(Dispatchers.Unconfined + context, start = CoroutineStart.ATOMIC) {
try {
collect { value -> emitter.onNext(value) }
emitter.onComplete()
} catch (e: Throwable) {
// 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete`
if (e !is CancellationException) {
if (!emitter.tryOnError(e)) {
handleUndeliverableException(e, coroutineContext)
}
} else {
emitter.onComplete()
}
}
}
emitter.setCancellable(RxCancellable(job))
}
/**
* Converts the given flow to a cold flowable.
* The original flow is cancelled when the flowable subscriber is disposed.
*
* An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
* You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
Flowable.fromPublisher(asPublisher(context))
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
asFlowable(context)
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = asObservable(context)