blob: 163662b538525a79163bbaf3ac7fb2357bddebe6 [file] [log] [blame]
class BugExample {
public static void main(String... args) {
Observable<String> obs = create((Observer<? super String> o) -> {
o.onNext("one");
o.onNext("two");
o.onCompleted();
});
obs.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Error");
}
@Override
public void onNext(String v) {
System.out.println("Value: " + v);
}
});
Observable<String> obs2 = create(new OnSubscribeFunc<String>() {
@Override
public void onSubscribe(Observer<? super String> o) {
o.onNext("one");
o.onNext("two");
o.onCompleted();
}
});
obs2.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Error");
}
@Override
public void onNext(String v) {
System.out.println("Value: " + v);
}
});
}
public static class Observable<T> {
private final OnSubscribeFunc<T> f;
public Observable(OnSubscribeFunc<T> f) {
this.f = f;
}
public void subscribe(Observer<T> o) {
f.onSubscribe(o);
}
}
public static <T> Observable<T> create(OnSubscribeFunc<T> func) {
return new Observable<T>(func);
}
public static interface OnSubscribeFunc<T> {
public void onSubscribe(Observer<? super T> t1);
}
public interface Observer<T> {
public void onCompleted();
public void onError(Throwable e);
public void onNext(T args);
}
}