/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package android.arch.lifecycle; |
| |
| import static android.arch.lifecycle.Lifecycle.State.RESUMED; |
| |
| import static org.hamcrest.CoreMatchers.is; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| |
| import android.arch.core.executor.AppToolkitTaskExecutor; |
| import android.arch.core.executor.TaskExecutor; |
| import android.support.annotation.Nullable; |
| import android.support.test.filters.SmallTest; |
| |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.reactivestreams.Subscriber; |
| import org.reactivestreams.Subscription; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| |
| import io.reactivex.Flowable; |
| import io.reactivex.disposables.Disposable; |
| import io.reactivex.functions.Consumer; |
| import io.reactivex.processors.PublishProcessor; |
| import io.reactivex.processors.ReplayProcessor; |
| import io.reactivex.schedulers.TestScheduler; |
| import io.reactivex.subjects.AsyncSubject; |
| |
| @SmallTest |
| public class LiveDataReactiveStreamsTest { |
| private static final Lifecycle sLifecycle = new Lifecycle() { |
| @Override |
| public void addObserver(LifecycleObserver observer) { |
| } |
| |
| @Override |
| public void removeObserver(LifecycleObserver observer) { |
| } |
| |
| @Override |
| public State getCurrentState() { |
| return RESUMED; |
| } |
| }; |
| private static final LifecycleOwner S_LIFECYCLE_OWNER = new LifecycleOwner() { |
| |
| @Override |
| public Lifecycle getLifecycle() { |
| return sLifecycle; |
| } |
| |
| }; |
| |
| private final List<String> mLiveDataOutput = new ArrayList<>(); |
| private final Observer<String> mObserver = new Observer<String>() { |
| @Override |
| public void onChanged(@Nullable String s) { |
| mLiveDataOutput.add(s); |
| } |
| }; |
| |
| private final ReplayProcessor<String> mOutputProcessor = ReplayProcessor.create(); |
| |
| private static final TestScheduler sBackgroundScheduler = new TestScheduler(); |
| private Thread mTestThread; |
| |
| @Before |
| public void init() { |
| mTestThread = Thread.currentThread(); |
| AppToolkitTaskExecutor.getInstance().setDelegate(new TaskExecutor() { |
| |
| @Override |
| public void executeOnDiskIO(Runnable runnable) { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public void postToMainThread(Runnable runnable) { |
| // Wrong implementation, but it is fine for test |
| runnable.run(); |
| } |
| |
| @Override |
| public boolean isMainThread() { |
| return Thread.currentThread() == mTestThread; |
| } |
| |
| }); |
| } |
| |
| @After |
| public void removeExecutorDelegate() { |
| AppToolkitTaskExecutor.getInstance().setDelegate(null); |
| } |
| |
| @Test |
| public void convertsFromPublisher() { |
| PublishProcessor<String> processor = PublishProcessor.create(); |
| LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor); |
| |
| liveData.observe(S_LIFECYCLE_OWNER, mObserver); |
| |
| processor.onNext("foo"); |
| processor.onNext("bar"); |
| processor.onNext("baz"); |
| |
| assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz"))); |
| } |
| |
| @Test |
| public void convertsFromPublisherWithMultipleObservers() { |
| final List<String> output2 = new ArrayList<>(); |
| PublishProcessor<String> processor = PublishProcessor.create(); |
| LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor); |
| |
| liveData.observe(S_LIFECYCLE_OWNER, mObserver); |
| |
| processor.onNext("foo"); |
| processor.onNext("bar"); |
| |
| // The second mObserver should only get the newest value and any later values. |
| liveData.observe(S_LIFECYCLE_OWNER, new Observer<String>() { |
| @Override |
| public void onChanged(@Nullable String s) { |
| output2.add(s); |
| } |
| }); |
| |
| processor.onNext("baz"); |
| |
| assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz"))); |
| assertThat(output2, is(Arrays.asList("bar", "baz"))); |
| } |
| |
| @Test |
| public void convertsFromAsyncPublisher() { |
| Flowable<String> input = Flowable.just("foo") |
| .concatWith(Flowable.just("bar", "baz").observeOn(sBackgroundScheduler)); |
| LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(input); |
| |
| liveData.observe(S_LIFECYCLE_OWNER, mObserver); |
| |
| assertThat(mLiveDataOutput, is(Collections.singletonList("foo"))); |
| sBackgroundScheduler.triggerActions(); |
| assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz"))); |
| } |
| |
| @Test |
| public void convertsToPublisherWithSyncData() { |
| MutableLiveData<String> liveData = new MutableLiveData<>(); |
| liveData.setValue("foo"); |
| assertThat(liveData.getValue(), is("foo")); |
| |
| Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(S_LIFECYCLE_OWNER, liveData)) |
| .subscribe(mOutputProcessor); |
| |
| liveData.setValue("bar"); |
| liveData.setValue("baz"); |
| |
| assertThat( |
| mOutputProcessor.getValues(new String[]{}), |
| is(new String[] {"foo", "bar", "baz"})); |
| } |
| |
| @Test |
| public void convertingToPublisherIsCancelable() { |
| MutableLiveData<String> liveData = new MutableLiveData<>(); |
| liveData.setValue("foo"); |
| assertThat(liveData.getValue(), is("foo")); |
| |
| Disposable disposable = Flowable |
| .fromPublisher(LiveDataReactiveStreams.toPublisher(S_LIFECYCLE_OWNER, liveData)) |
| .subscribe(new Consumer<String>() { |
| @Override |
| public void accept(String s) throws Exception { |
| mLiveDataOutput.add(s); |
| } |
| }); |
| |
| liveData.setValue("bar"); |
| liveData.setValue("baz"); |
| |
| assertThat(liveData.hasObservers(), is(true)); |
| disposable.dispose(); |
| |
| liveData.setValue("fizz"); |
| liveData.setValue("buzz"); |
| |
| assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz"))); |
| // Canceling disposable should also remove livedata mObserver. |
| assertThat(liveData.hasObservers(), is(false)); |
| } |
| |
| @Test |
| public void convertsToPublisherWithBackpressure() { |
| MutableLiveData<String> liveData = new MutableLiveData<>(); |
| |
| final AsyncSubject<Subscription> subscriptionSubject = AsyncSubject.create(); |
| |
| Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(S_LIFECYCLE_OWNER, liveData)) |
| .subscribe(new Subscriber<String>() { |
| @Override |
| public void onSubscribe(Subscription s) { |
| subscriptionSubject.onNext(s); |
| subscriptionSubject.onComplete(); |
| } |
| |
| @Override |
| public void onNext(String s) { |
| mOutputProcessor.onNext(s); |
| } |
| |
| @Override |
| public void onError(Throwable t) { |
| throw new RuntimeException(t); |
| } |
| |
| @Override |
| public void onComplete() { |
| } |
| }); |
| |
| // Subscription should have happened synchronously. If it didn't, this will deadlock. |
| final Subscription subscription = subscriptionSubject.blockingSingle(); |
| |
| subscription.request(1); |
| assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[] {})); |
| |
| liveData.setValue("foo"); |
| assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[] {"foo"})); |
| |
| subscription.request(2); |
| liveData.setValue("baz"); |
| liveData.setValue("fizz"); |
| |
| assertThat( |
| mOutputProcessor.getValues(new String[]{}), |
| is(new String[] {"foo", "baz", "fizz"})); |
| |
| // 'nyan' will be dropped as there is nothing currently requesting a stream. |
| liveData.setValue("nyan"); |
| liveData.setValue("cat"); |
| |
| assertThat( |
| mOutputProcessor.getValues(new String[]{}), |
| is(new String[] {"foo", "baz", "fizz"})); |
| |
| // When a new request comes in, the latest value will be pushed. |
| subscription.request(1); |
| assertThat( |
| mOutputProcessor.getValues(new String[]{}), |
| is(new String[] {"foo", "baz", "fizz", "cat"})); |
| } |
| |
| @Test |
| public void convertsToPublisherWithAsyncData() { |
| MutableLiveData<String> liveData = new MutableLiveData<>(); |
| |
| Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(S_LIFECYCLE_OWNER, liveData)) |
| .observeOn(sBackgroundScheduler) |
| .subscribe(mOutputProcessor); |
| |
| liveData.setValue("foo"); |
| |
| assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[] {})); |
| sBackgroundScheduler.triggerActions(); |
| assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[] {"foo"})); |
| |
| liveData.setValue("bar"); |
| liveData.setValue("baz"); |
| |
| assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[] {"foo"})); |
| sBackgroundScheduler.triggerActions(); |
| assertThat(mOutputProcessor.getValues( |
| new String[]{}), |
| is(new String[] {"foo", "bar", "baz"})); |
| } |
| } |