blob: a9c6ca48e5d64eb2f12771ace986b359a7a9ee68 [file] [log] [blame]
/*
* Copyright (C) 2017 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package android.arch.lifecycle;
import android.arch.core.executor.AppToolkitTaskExecutor;
import android.support.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.lang.ref.WeakReference;
/**
* Adapts {@link LiveData} input and output to the ReactiveStreams spec.
*/
@SuppressWarnings("WeakerAccess")
public final class LiveDataReactiveStreams {
private LiveDataReactiveStreams() {
}
/**
* Adapts the given {@link LiveData} stream to a ReactiveStreams {@link Publisher}.
*
* <p>
* By using a good publisher implementation such as RxJava 2.x Flowables, most consumers will
* be able to let the library deal with backpressure using operators and not need to worry about
* ever manually calling {@link Subscription#request}.
*
* <p>
* On subscription to the publisher, the observer will attach to the given {@link LiveData}.
* Once {@link Subscription#request) is called on the subscription object, an observer will be
* connected to the data stream. Calling request(Long.MAX_VALUE) is equivalent to creating an
* unbounded stream with no backpressure. If request with a finite count reaches 0, the observer
* will buffer the latest item and emit it to the subscriber when data is again requested. Any
* other items emitted during the time there was no backpressure requested will be dropped.
*/
public static <T> Publisher<T> toPublisher(
final LifecycleOwner lifecycle, final LiveData<T> liveData) {
return new Publisher<T>() {
boolean mObserving;
boolean mCanceled;
long mRequested;
@Nullable
T mLatest;
@Override
public void subscribe(final Subscriber<? super T> subscriber) {
final Observer<T> observer = new Observer<T>() {
@Override
public void onChanged(@Nullable T t) {
if (mCanceled) {
return;
}
if (mRequested > 0) {
mLatest = null;
subscriber.onNext(t);
if (mRequested != Long.MAX_VALUE) {
mRequested--;
}
} else {
mLatest = t;
}
}
};
subscriber.onSubscribe(new Subscription() {
@Override
public void request(final long n) {
if (n < 0 || mCanceled) {
return;
}
AppToolkitTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
@Override
public void run() {
if (mCanceled) {
return;
}
// Prevent overflowage.
mRequested = mRequested + n >= mRequested
? mRequested + n : Long.MAX_VALUE;
if (!mObserving) {
mObserving = true;
liveData.observe(lifecycle, observer);
} else if (mLatest != null) {
observer.onChanged(mLatest);
mLatest = null;
}
}
});
}
@Override
public void cancel() {
if (mCanceled) {
return;
}
AppToolkitTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
@Override
public void run() {
if (mCanceled) {
return;
}
if (mObserving) {
liveData.removeObserver(observer);
mObserving = false;
}
mLatest = null;
mCanceled = true;
}
});
}
});
}
};
}
/**
* Creates an Observable {@link LiveData} stream from a ReactiveStreams publisher.
*/
public static <T> LiveData<T> fromPublisher(final Publisher<T> publisher) {
LiveData<T> liveData = new LiveData<>();
// Since we don't have a way to directly observe cancels, weakly hold the live data.
final WeakReference<LiveData<T>> liveDataRef = new WeakReference<>(liveData);
publisher.subscribe(new Subscriber<T>() {
@Override
public void onSubscribe(Subscription s) {
// Don't worry about backpressure. If the stream is too noisy then backpressure can
// be handled upstream.
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(final T t) {
final LiveData<T> liveData = liveDataRef.get();
if (liveData != null) {
liveData.postValue(t);
}
}
@Override
public void onError(Throwable t) {
// Errors should be handled upstream, so propagate as a crash.
throw new RuntimeException(t);
}
@Override
public void onComplete() {
}
});
return liveData;
}
}