| /* |
| * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| */ |
| |
| package kotlinx.coroutines.rx3 |
| |
| import io.reactivex.rxjava3.core.* |
| import io.reactivex.rxjava3.exceptions.* |
| import kotlinx.atomicfu.* |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.channels.* |
| import kotlinx.coroutines.selects.* |
| import kotlinx.coroutines.sync.* |
| import kotlin.coroutines.* |
| import kotlinx.coroutines.internal.* |
| import kotlinx.coroutines.intrinsics.* |
| |
| /** |
| * Creates cold [observable][Observable] that will run a given [block] in a coroutine. |
| * Every time the returned observable is subscribed, it starts a new coroutine. |
| * |
| * Coroutine emits ([ObservableEmitter.onNext]) values with `send`, completes ([ObservableEmitter.onComplete]) |
| * when the coroutine completes or channel is explicitly closed and emits error ([ObservableEmitter.onError]) |
| * if coroutine throws an exception or closes channel with a cause. |
| * Unsubscribing cancels running coroutine. |
| * |
| * Invocations of `send` are suspended appropriately to ensure that `onNext` is not invoked concurrently. |
| * Note that Rx 2.x [Observable] **does not support backpressure**. |
| * |
| * Coroutine context can be specified with [context] argument. |
| * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. |
| * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. |
| */ |
| public fun <T : Any> rxObservable( |
| context: CoroutineContext = EmptyCoroutineContext, |
| @BuilderInference block: suspend ProducerScope<T>.() -> Unit |
| ): Observable<T> { |
| require(context[Job] === null) { "Observable context cannot contain job in it." + |
| "Its lifecycle should be managed via Disposable handle. Had $context" } |
| return rxObservableInternal(GlobalScope, context, block) |
| } |
| |
| private fun <T : Any> rxObservableInternal( |
| scope: CoroutineScope, // support for legacy rxObservable in scope |
| context: CoroutineContext, |
| block: suspend ProducerScope<T>.() -> Unit |
| ): Observable<T> = Observable.create { subscriber -> |
| val newContext = scope.newCoroutineContext(context) |
| val coroutine = RxObservableCoroutine(newContext, subscriber) |
| subscriber.setCancellable(RxCancellable(coroutine)) // do it first (before starting coroutine), to await unnecessary suspensions |
| coroutine.start(CoroutineStart.DEFAULT, coroutine, block) |
| } |
| |
| private const val OPEN = 0 // open channel, still working |
| private const val CLOSED = -1 // closed, but have not signalled onCompleted/onError yet |
| private const val SIGNALLED = -2 // already signalled subscriber onCompleted/onError |
| |
| private class RxObservableCoroutine<T : Any>( |
| parentContext: CoroutineContext, |
| private val subscriber: ObservableEmitter<T> |
| ) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> { |
| override val channel: SendChannel<T> get() = this |
| |
| // Mutex is locked while subscriber.onXXX is being invoked |
| private val mutex = Mutex() |
| |
| private val _signal = atomic(OPEN) |
| |
| override val isClosedForSend: Boolean get() = !isActive |
| override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause) |
| override fun invokeOnClose(handler: (Throwable?) -> Unit) = |
| throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose") |
| |
| override fun trySend(element: T): ChannelResult<Unit> = |
| if (!mutex.tryLock()) { |
| ChannelResult.failure() |
| } else { |
| when (val throwable = doLockedNext(element)) { |
| null -> ChannelResult.success(Unit) |
| else -> ChannelResult.closed(throwable) |
| } |
| } |
| |
| public override suspend fun send(element: T) { |
| mutex.lock() |
| doLockedNext(element)?.let { throw it } |
| } |
| |
| override val onSend: SelectClause2<T, SendChannel<T>> |
| get() = this |
| |
| // registerSelectSend |
| @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE") |
| override fun <R> registerSelectClause2( |
| select: SelectInstance<R>, |
| element: T, |
| block: suspend (SendChannel<T>) -> R |
| ) { |
| val clause = suspend { |
| doLockedNext(element)?.let { throw it } |
| block(this) |
| } |
| |
| // This is the default replacement proposed in onLock replacement |
| launch(start = CoroutineStart.UNDISPATCHED) { |
| mutex.lock() |
| // Already selected -- bail out |
| if (!select.trySelect()) { |
| mutex.unlock() |
| return@launch |
| } |
| |
| clause.startCoroutineCancellable(select.completion) |
| } |
| } |
| |
| // assert: mutex.isLocked() |
| private fun doLockedNext(elem: T): Throwable? { |
| // check if already closed for send |
| if (!isActive) { |
| doLockedSignalCompleted(completionCause, completionCauseHandled) |
| return getCancellationException() |
| } |
| // notify subscriber |
| try { |
| subscriber.onNext(elem) |
| } catch (e: Throwable) { |
| val cause = UndeliverableException(e) |
| val causeDelivered = close(cause) |
| unlockAndCheckCompleted() |
| return if (causeDelivered) { |
| // `cause` is the reason this channel is closed |
| cause |
| } else { |
| // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception. |
| handleUndeliverableException(cause, context) |
| getCancellationException() |
| } |
| } |
| /* |
| * There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might |
| * happen after this check and before `unlock` (see signalCompleted that does not do anything |
| * if it fails to acquire the lock that we are still holding). |
| * We have to recheck `isCompleted` after `unlock` anyway. |
| */ |
| unlockAndCheckCompleted() |
| return null |
| } |
| |
| private fun unlockAndCheckCompleted() { |
| mutex.unlock() |
| // recheck isActive |
| if (!isActive && mutex.tryLock()) |
| doLockedSignalCompleted(completionCause, completionCauseHandled) |
| } |
| |
| // assert: mutex.isLocked() |
| private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) { |
| // cancellation failures |
| try { |
| if (_signal.value == SIGNALLED) |
| return |
| _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) |
| @Suppress("INVISIBLE_MEMBER") |
| val unwrappedCause = cause?.let { unwrap(it) } |
| if (unwrappedCause == null) { |
| try { |
| subscriber.onComplete() |
| } catch (e: Exception) { |
| handleUndeliverableException(e, context) |
| } |
| } else if (unwrappedCause is UndeliverableException && !handled) { |
| /** Such exceptions are not reported to `onError`, as, according to the reactive specifications, |
| * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already |
| * cancelled. */ |
| handleUndeliverableException(cause, context) |
| } else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) { |
| try { |
| /** If the subscriber is already in a terminal state, the error will be signalled to |
| * `RxJavaPlugins.onError`. */ |
| subscriber.onError(cause) |
| } catch (e: Exception) { |
| cause.addSuppressed(e) |
| handleUndeliverableException(cause, context) |
| } |
| } |
| } finally { |
| mutex.unlock() |
| } |
| } |
| |
| private fun signalCompleted(cause: Throwable?, handled: Boolean) { |
| if (!_signal.compareAndSet(OPEN, CLOSED)) return // abort, other thread invoked doLockedSignalCompleted |
| if (mutex.tryLock()) // if we can acquire the lock |
| doLockedSignalCompleted(cause, handled) |
| } |
| |
| override fun onCompleted(value: Unit) { |
| signalCompleted(null, false) |
| } |
| |
| override fun onCancelled(cause: Throwable, handled: Boolean) { |
| signalCompleted(cause, handled) |
| } |
| } |
| |