blob: fdfd24582ef5a95b9beb5a3d9aa5faf33d18dd9b [file] [log] [blame]
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package org.reactivestreams.tck;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.TestEnvironment.BlackholeSubscriberWithSubscriptionSupport;
import org.reactivestreams.tck.TestEnvironment.Latch;
import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport;
import org.reactivestreams.tck.flow.support.Function;
import org.reactivestreams.tck.flow.support.Optional;
import org.reactivestreams.tck.flow.support.PublisherVerificationRules;
import org.testng.SkipException;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.lang.Override;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
/**
* Provides tests for verifying {@code Publisher} specification rules.
*
* @see org.reactivestreams.Publisher
*/
public abstract class PublisherVerification<T> implements PublisherVerificationRules {
private static final String PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV = "PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS";
private static final long DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS = 300L;
private final TestEnvironment env;
/**
* The amount of time after which a cancelled Subscriber reference should be dropped.
* See Rule 3.13 for details.
*/
private final long publisherReferenceGCTimeoutMillis;
/**
* Constructs a new verification class using the given env and configuration.
*
* @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
*/
public PublisherVerification(TestEnvironment env, long publisherReferenceGCTimeoutMillis) {
this.env = env;
this.publisherReferenceGCTimeoutMillis = publisherReferenceGCTimeoutMillis;
}
/**
* Constructs a new verification class using the given env and configuration.
*
* The value for {@code publisherReferenceGCTimeoutMillis} will be obtained by using {@link PublisherVerification#envPublisherReferenceGCTimeoutMillis()}.
*/
public PublisherVerification(TestEnvironment env) {
this.env = env;
this.publisherReferenceGCTimeoutMillis = envPublisherReferenceGCTimeoutMillis();
}
/**
* Tries to parse the env variable {@code PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS} as long and returns the value if present,
* OR its default value ({@link PublisherVerification#DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS}).
*
* This value is used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
*
* @throws java.lang.IllegalArgumentException when unable to parse the env variable
*/
public static long envPublisherReferenceGCTimeoutMillis() {
final String envMillis = System.getenv(PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV);
if (envMillis == null) return DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS;
else try {
return Long.parseLong(envMillis);
} catch (NumberFormatException ex) {
throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV, envMillis), ex);
}
}
/**
* This is the main method you must implement in your test incarnation.
* It must create a Publisher for a stream with exactly the given number of elements.
* If `elements` is `Long.MAX_VALUE` the produced stream must be infinite.
*/
public abstract Publisher<T> createPublisher(long elements);
/**
* By implementing this method, additional TCK tests concerning a "failed" publishers will be run.
*
* The expected behaviour of the {@link Publisher} returned by this method is hand out a subscription,
* followed by signalling {@code onError} on it, as specified by Rule 1.9.
*
* If you ignore these additional tests, return {@code null} from this method.
*/
public abstract Publisher<T> createFailedPublisher();
/**
* Override and return lower value if your Publisher is only able to produce a known number of elements.
* For example, if it is designed to return at-most-one element, return {@code 1} from this method.
*
* Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can be produce a huge but NOT an unbounded number of elements.
*
* To mark your Publisher will *never* signal an {@code onComplete} override this method and return {@code Long.MAX_VALUE},
* which will result in *skipping all tests which require an onComplete to be triggered* (!).
*/
public long maxElementsFromPublisher() {
return Long.MAX_VALUE - 1;
}
/**
* Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}.
* Stochastic in this case means that the Rule is impossible or infeasible to deterministically verify—
* usually this means that this test case can yield false positives ("be green") even if for some case,
* the given implementation may violate the tested behaviour.
*/
public boolean skipStochasticTests() {
return false;
}
/**
* In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a
* {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of
* recursive calls to exceed the number returned by this method.
*
* @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a>
* @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion()
*/
public long boundedDepthOfOnNextAndRequestRecursion() {
return 1;
}
////////////////////// TEST ENV CLEANUP /////////////////////////////////////
@BeforeMethod
public void setUp() throws Exception {
env.clearAsyncErrors();
}
////////////////////// TEST SETUP VERIFICATION //////////////////////////////
@Override @Test
public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable {
activePublisherTest(1, true, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws InterruptedException {
ManualSubscriber<T> sub = env.newManualSubscriber(pub);
assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub));
sub.requestEndOfStream();
}
Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException {
return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub));
}
});
}
@Override @Test
public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable {
activePublisherTest(3, true, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws InterruptedException {
ManualSubscriber<T> sub = env.newManualSubscriber(pub);
assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub));
assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 1 element", pub));
assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 2 elements", pub));
sub.requestEndOfStream();
}
Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException {
return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub));
}
});
}
@Override @Test
public void required_validate_maxElementsFromPublisher() throws Exception {
assertTrue(maxElementsFromPublisher() >= 0, "maxElementsFromPublisher MUST return a number >= 0");
}
@Override @Test
public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception {
assertTrue(boundedDepthOfOnNextAndRequestRecursion() >= 1, "boundedDepthOfOnNextAndRequestRecursion must return a number >= 1");
}
////////////////////// SPEC RULE VERIFICATION ///////////////////////////////
@Override @Test
public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable {
activePublisherTest(5, false, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws InterruptedException {
ManualSubscriber<T> sub = env.newManualSubscriber(pub);
try {
sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub));
sub.request(1);
sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub));
sub.expectNone(String.format("Publisher %s produced unrequested: ", pub));
sub.request(1);
sub.request(2);
sub.nextElements(3, env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub));
sub.expectNone(String.format("Publisher %sproduced unrequested ", pub));
} finally {
sub.cancel();
}
}
});
}
@Override @Test
public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable {
final int elements = 3;
final int requested = 10;
activePublisherTest(elements, true, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
sub.request(requested);
sub.nextElements(elements);
sub.expectCompletion();
}
});
}
@Override @Test
public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable {
final int iterations = 100;
final int elements = 10;
stochasticTest(iterations, new Function<Integer, Void>() {
@Override
public Void apply(final Integer runNumber) throws Throwable {
activePublisherTest(elements, true, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
final Latch completionLatch = new Latch(env);
final AtomicInteger gotElements = new AtomicInteger(0);
pub.subscribe(new Subscriber<T>() {
private Subscription subs;
private ConcurrentAccessBarrier concurrentAccessBarrier = new ConcurrentAccessBarrier();
/**
* Concept wise very similar to a {@link org.reactivestreams.tck.TestEnvironment.Latch}, serves to protect
* a critical section from concurrent access, with the added benefit of Thread tracking and same-thread-access awareness.
*
* Since a <i>Synchronous</i> Publisher may choose to synchronously (using the same {@link Thread}) call
* {@code onNext} directly from either {@code subscribe} or {@code request} a plain Latch is not enough
* to verify concurrent access safety - one needs to track if the caller is not still using the calling thread
* to enter subsequent critical sections ("nesting" them effectively).
*/
final class ConcurrentAccessBarrier {
private AtomicReference<Thread> currentlySignallingThread = new AtomicReference<Thread>(null);
private volatile String previousSignal = null;
public void enterSignal(String signalName) {
if((!currentlySignallingThread.compareAndSet(null, Thread.currentThread())) && !isSynchronousSignal()) {
env.flop(String.format(
"Illegal concurrent access detected (entering critical section)! " +
"%s emited %s signal, before %s finished its %s signal.",
Thread.currentThread(), signalName, currentlySignallingThread.get(), previousSignal));
}
this.previousSignal = signalName;
}
public void leaveSignal(String signalName) {
currentlySignallingThread.set(null);
this.previousSignal = signalName;
}
private boolean isSynchronousSignal() {
return (previousSignal != null) && Thread.currentThread().equals(currentlySignallingThread.get());
}
}
@Override
public void onSubscribe(Subscription s) {
final String signal = "onSubscribe()";
concurrentAccessBarrier.enterSignal(signal);
subs = s;
subs.request(1);
concurrentAccessBarrier.leaveSignal(signal);
}
@Override
public void onNext(T ignore) {
final String signal = String.format("onNext(%s)", ignore);
concurrentAccessBarrier.enterSignal(signal);
if (gotElements.incrementAndGet() <= elements) // requesting one more than we know are in the stream (some Publishers need this)
subs.request(1);
concurrentAccessBarrier.leaveSignal(signal);
}
@Override
public void onError(Throwable t) {
final String signal = String.format("onError(%s)", t.getMessage());
concurrentAccessBarrier.enterSignal(signal);
// ignore value
concurrentAccessBarrier.leaveSignal(signal);
}
@Override
public void onComplete() {
final String signal = "onComplete()";
concurrentAccessBarrier.enterSignal(signal);
// entering for completeness
concurrentAccessBarrier.leaveSignal(signal);
completionLatch.close();
}
});
completionLatch.expectClose(
elements * env.defaultTimeoutMillis(),
String.format("Failed in iteration %d of %d. Expected completion signal after signalling %d elements (signalled %d), yet did not receive it",
runNumber, iterations, elements, gotElements.get()));
}
});
return null;
}
});
}
@Override @Test
public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable {
try {
whenHasErrorPublisherTest(new PublisherTestRun<T>() {
@Override
public void run(final Publisher<T> pub) throws InterruptedException {
final Latch onErrorlatch = new Latch(env);
final Latch onSubscribeLatch = new Latch(env);
pub.subscribe(new TestEnvironment.TestSubscriber<T>(env) {
@Override
public void onSubscribe(Subscription subs) {
onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
onSubscribeLatch.close();
}
@Override
public void onError(Throwable cause) {
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
onErrorlatch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub));
onErrorlatch.close();
}
});
onSubscribeLatch.expectClose("Should have received onSubscribe");
onErrorlatch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub));
env.verifyNoAsyncErrors();
}
});
} catch (SkipException se) {
throw se;
} catch (Throwable ex) {
// we also want to catch AssertionErrors and anything the publisher may have thrown inside subscribe
// which was wrong of him - he should have signalled on error using onError
throw new RuntimeException(String.format("Publisher threw exception (%s) instead of signalling error via onError!", ex.getMessage()), ex);
}
}
@Override @Test
public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable {
activePublisherTest(3, true, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
ManualSubscriber<T> sub = env.newManualSubscriber(pub);
sub.requestNextElement();
sub.requestNextElement();
sub.requestNextElement();
sub.requestEndOfStream();
sub.expectNone();
}
});
}
@Override @Test
public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable {
optionalActivePublisherTest(0, true, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
ManualSubscriber<T> sub = env.newManualSubscriber(pub);
sub.request(1);
sub.expectCompletion();
sub.expectNone();
}
});
}
@Override @Test
public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable {
notVerified(); // not really testable without more control over the Publisher
}
@Override @Test
public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable {
activePublisherTest(1, true, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
ManualSubscriber<T> sub = env.newManualSubscriber(pub);
sub.request(10);
sub.nextElement();
sub.expectCompletion();
sub.request(10);
sub.expectNone();
}
});
}
@Override @Test
public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable {
notVerified(); // can we meaningfully test this, without more control over the publisher?
}
@Override @Test
public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable {
notVerified(); // can we meaningfully test this?
}
@Override @Test
public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
notVerified(); // can we meaningfully test this?
}
@Override @Test
public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
activePublisherTest(0, false, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
try {
pub.subscribe(null);
env.flop("Publisher did not throw a NullPointerException when given a null Subscribe in subscribe");
} catch (NullPointerException ignored) {
// valid behaviour
}
env.verifyNoAsyncErrorsNoDelay();
}
});
}
@Override @Test
public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable {
activePublisherTest(0, false, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
final Latch onSubscribeLatch = new Latch(env);
final AtomicReference<Subscription> cancel = new AtomicReference<Subscription>();
try {
pub.subscribe(new Subscriber<T>() {
@Override
public void onError(Throwable cause) {
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
}
@Override
public void onSubscribe(Subscription subs) {
cancel.set(subs);
onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
onSubscribeLatch.close();
}
@Override
public void onNext(T elem) {
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always");
}
@Override
public void onComplete() {
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always");
}
});
onSubscribeLatch.expectClose("Should have received onSubscribe");
env.verifyNoAsyncErrorsNoDelay();
} finally {
Subscription s = cancel.getAndSet(null);
if (s != null) {
s.cancel();
}
}
}
});
}
@Override @Test
public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable {
whenHasErrorPublisherTest(new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
final Latch onErrorLatch = new Latch(env);
final Latch onSubscribeLatch = new Latch(env);
ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
@Override
public void onError(Throwable cause) {
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
onErrorLatch.assertOpen("Only one onError call expected");
onErrorLatch.close();
}
@Override
public void onSubscribe(Subscription subs) {
onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
onSubscribeLatch.close();
}
};
pub.subscribe(sub);
onSubscribeLatch.expectClose("Should have received onSubscribe");
onErrorLatch.expectClose("Should have received onError");
env.verifyNoAsyncErrorsNoDelay();
}
});
}
@Override @Test
public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
notVerified(); // can we meaningfully test this?
}
// Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11
@Override @Test
public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
optionalActivePublisherTest(1, false, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
try {
env.verifyNoAsyncErrors();
} finally {
try {
sub1.cancel();
} finally {
sub2.cancel();
}
}
}
});
}
@Override @Test
public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() throws Throwable {
optionalActivePublisherTest(1, false, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
// Since we're testing the case when the Publisher DOES support the optional multi-subscribers scenario,
// and decides if it handles them uni-cast or multi-cast, we don't know which subscriber will receive an
// onNext (and optional onComplete) signal(s) and which just onComplete signal.
// Plus, even if subscription assumed to be unicast, it's implementation choice, which one will be signalled
// with onNext.
sub1.requestNextElementOrEndOfStream();
sub2.requestNextElementOrEndOfStream();
try {
env.verifyNoAsyncErrors();
} finally {
try {
sub1.cancel();
} finally {
sub2.cancel();
}
}
}
});
}
@Override @Test
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
optionalActivePublisherTest(5, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
@Override
public void run(Publisher<T> pub) throws InterruptedException {
ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
ManualSubscriber<T> sub3 = env.newManualSubscriber(pub);
sub1.request(1);
T x1 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));
sub2.request(2);
List<T> y1 = sub2.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 2nd subscriber", pub));
sub1.request(1);
T x2 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));
sub3.request(3);
List<T> z1 = sub3.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 3rd subscriber", pub));
sub3.request(1);
T z2 = sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub));
sub3.request(1);
T z3 = sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub));
sub3.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 3rd subscriber", pub));
sub2.request(3);
List<T> y2 = sub2.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 2nd subscriber", pub));
sub2.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 2nd subscriber", pub));
sub1.request(2);
List<T> x3 = sub1.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 1st subscriber", pub));
sub1.request(1);
T x4 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));
sub1.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 1st subscriber", pub));
@SuppressWarnings("unchecked")
List<T> r = new ArrayList<T>(Arrays.asList(x1, x2));
r.addAll(x3);
r.addAll(Collections.singleton(x4));
List<T> check1 = new ArrayList<T>(y1);
check1.addAll(y2);
//noinspection unchecked
List<T> check2 = new ArrayList<T>(z1);
check2.add(z2);
check2.add(z3);
assertEquals(r, check1, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 2", pub));
assertEquals(r, check2, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 3", pub));
}
});
}
@Override @Test
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
optionalActivePublisherTest(3, false, new PublisherTestRun<T>() { // This test is skipped if the publisher cannot produce enough elements
@Override
public void run(Publisher<T> pub) throws Throwable {
ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
ManualSubscriber<T> sub3 = env.newManualSubscriber(pub);
List<T> received1 = new ArrayList<T>();
List<T> received2 = new ArrayList<T>();
List<T> received3 = new ArrayList<T>();
// if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains...
// edgy edge case?
sub1.request(4);
sub2.request(4);
sub3.request(4);
received1.addAll(sub1.nextElements(3));
received2.addAll(sub2.nextElements(3));
received3.addAll(sub3.nextElements(3));
// NOTE: can't check completion, the Publisher may not be able to signal it
// a similar test *with* completion checking is implemented
assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers"));
assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers"));
}
});
}
@Override @Test
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
optionalActivePublisherTest(3, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
@Override
public void run(Publisher<T> pub) throws Throwable {
ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
ManualSubscriber<T> sub3 = env.newManualSubscriber(pub);
List<T> received1 = new ArrayList<T>();
List<T> received2 = new ArrayList<T>();
List<T> received3 = new ArrayList<T>();
// if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains...
// edgy edge case?
sub1.request(4);
sub2.request(4);
sub3.request(4);
received1.addAll(sub1.nextElements(3));
received2.addAll(sub2.nextElements(3));
received3.addAll(sub3.nextElements(3));
sub1.expectCompletion();
sub2.expectCompletion();
sub3.expectCompletion();
assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers"));
assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers"));
}
});
}
///////////////////// SUBSCRIPTION TESTS //////////////////////////////////
@Override @Test
public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable {
activePublisherTest(6, false, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
ManualSubscriber<T> sub = new ManualSubscriber<T>(env) {
@Override
public void onSubscribe(Subscription subs) {
this.subscription.completeImmediatly(subs);
subs.request(1);
subs.request(1);
subs.request(1);
}
@Override
public void onNext(T element) {
Subscription subs = this.subscription.value();
subs.request(1);
}
};
env.subscribe(pub, sub);
env.verifyNoAsyncErrors();
}
});
}
@Override @Test
public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable {
final long oneMoreThanBoundedLimit = boundedDepthOfOnNextAndRequestRecursion() + 1;
activePublisherTest(oneMoreThanBoundedLimit, false, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
final ThreadLocal<Long> stackDepthCounter = new ThreadLocal<Long>() {
@Override
protected Long initialValue() {
return 0L;
}
};
final Latch runCompleted = new Latch(env);
final ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
// counts the number of signals received, used to break out from possibly infinite request/onNext loops
long signalsReceived = 0L;
@Override
public void onNext(T element) {
// NOT calling super.onNext as this test only cares about stack depths, not the actual values of elements
// which also simplifies this test as we do not have to drain the test buffer, which would otherwise be in danger of overflowing
signalsReceived += 1;
stackDepthCounter.set(stackDepthCounter.get() + 1);
if (env.debugEnabled()) {
env.debug(String.format("%s(recursion depth: %d)::onNext(%s)", this, stackDepthCounter.get(), element));
}
final long callsUntilNow = stackDepthCounter.get();
if (callsUntilNow > boundedDepthOfOnNextAndRequestRecursion()) {
env.flop(String.format("Got %d onNext calls within thread: %s, yet expected recursive bound was %d",
callsUntilNow, Thread.currentThread(), boundedDepthOfOnNextAndRequestRecursion()));
// stop the recursive call chain
runCompleted.close();
return;
} else if (signalsReceived >= oneMoreThanBoundedLimit) {
// since max number of signals reached, and recursion depth not exceeded, we judge this as a success and
// stop the recursive call chain
runCompleted.close();
return;
}
// request more right away, the Publisher must break the recursion
subscription.value().request(1);
stackDepthCounter.set(stackDepthCounter.get() - 1);
}
@Override
public void onComplete() {
super.onComplete();
runCompleted.close();
}
@Override
public void onError(Throwable cause) {
super.onError(cause);
runCompleted.close();
}
};
try {
env.subscribe(pub, sub);
sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...`
final String msg = String.format("Unable to validate call stack depth safety, " +
"awaited at-most %s signals (`maxOnNextSignalsInRecursionTest()`) or completion",
oneMoreThanBoundedLimit);
runCompleted.expectClose(env.defaultTimeoutMillis(), msg);
env.verifyNoAsyncErrorsNoDelay();
} finally {
// since the request/onNext recursive calls may keep the publisher running "forever",
// we MUST cancel it manually before exiting this test case
sub.cancel();
}
}
});
}
@Override @Test
public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception {
notVerified(); // cannot be meaningfully tested, or can it?
}
@Override @Test
public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation() throws Exception {
notVerified(); // cannot be meaningfully tested, or can it?
}
@Override @Test
public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable {
activePublisherTest(3, false, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
// override ManualSubscriberWithSubscriptionSupport#cancel because by default a ManualSubscriber will drop the
// subscription once it's cancelled (as expected).
// In this test however it must keep the cancelled Subscription and keep issuing `request(long)` to it.
ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
@Override
public void cancel() {
if (subscription.isCompleted()) {
subscription.value().cancel();
} else {
env.flop("Cannot cancel a subscription before having received it");
}
}
};
env.subscribe(pub, sub);
sub.cancel();
sub.request(1);
sub.request(1);
sub.request(1);
sub.expectNone();
env.verifyNoAsyncErrorsNoDelay();
}
});
}
@Override @Test
public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable {
activePublisherTest(1, false, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
// leak the Subscription
final Subscription subs = sub.subscription.value();
subs.cancel();
subs.cancel();
subs.cancel();
sub.expectNone();
env.verifyNoAsyncErrorsNoDelay();
}
});
}
@Override @Test
public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable {
activePublisherTest(10, false, new PublisherTestRun<T>() {
@Override public void run(Publisher<T> pub) throws Throwable {
final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
sub.request(0);
sub.expectError(IllegalArgumentException.class);
}
});
}
@Override @Test
public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable {
activePublisherTest(10, false, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
final Random r = new Random();
sub.request(-r.nextInt(Integer.MAX_VALUE) - 1);
// we do require implementations to mention the rule number at the very least, or mentioning that the non-negative request is the problem
sub.expectError(IllegalArgumentException.class);
}
});
}
@Override @Test
public void optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage() throws Throwable {
optionalActivePublisherTest(10, false, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
final Random r = new Random();
sub.request(-r.nextInt(Integer.MAX_VALUE) - 1);
// we do require implementations to mention the rule number at the very least, or mentioning that the non-negative request is the problem
sub.expectErrorWithMessage(IllegalArgumentException.class, Arrays.asList("3.9", "non-positive subscription request", "negative subscription request"));
}
});
}
@Override @Test
public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
// the publisher is able to signal more elements than the subscriber will be requesting in total
final int publisherElements = 20;
final int demand1 = 10;
final int demand2 = 5;
final int totalDemand = demand1 + demand2;
activePublisherTest(publisherElements, false, new PublisherTestRun<T>() {
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void run(Publisher<T> pub) throws Throwable {
final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
sub.request(demand1);
sub.request(demand2);
/*
NOTE: The order of the nextElement/cancel calls below is very important (!)
If this ordering was reversed, given an asynchronous publisher,
the following scenario would be *legal* and would break this test:
> AsyncPublisher receives request(10) - it does not emit data right away, it's asynchronous
> AsyncPublisher receives request(5) - demand is now 15
! AsyncPublisher didn't emit any onNext yet (!)
> AsyncPublisher receives cancel() - handles it right away, by "stopping itself" for example
! cancel was handled hefore the AsyncPublisher ever got the chance to emit data
! the subscriber ends up never receiving even one element - the test is stuck (and fails, even on valid Publisher)
Which is why we must first expect an element, and then cancel, once the producing is "running".
*/
sub.nextElement();
sub.cancel();
int onNextsSignalled = 1;
boolean stillBeingSignalled;
do {
// put asyncError if onNext signal received
sub.expectNone();
Throwable error = env.dropAsyncError();
if (error == null) {
stillBeingSignalled = false;
} else {
onNextsSignalled += 1;
stillBeingSignalled = true;
}
// if the Publisher tries to emit more elements than was requested (and/or ignores cancellation) this will throw
assertTrue(onNextsSignalled <= totalDemand,
String.format("Publisher signalled [%d] elements, which is more than the signalled demand: %d",
onNextsSignalled, totalDemand));
} while (stillBeingSignalled);
}
});
env.verifyNoAsyncErrorsNoDelay();
}
@Override @Test
public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
final ReferenceQueue<ManualSubscriber<T>> queue = new ReferenceQueue<ManualSubscriber<T>>();
final Function<Publisher<T>, WeakReference<ManualSubscriber<T>>> run = new Function<Publisher<T>, WeakReference<ManualSubscriber<T>>>() {
@Override
public WeakReference<ManualSubscriber<T>> apply(Publisher<T> pub) throws Exception {
final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
final WeakReference<ManualSubscriber<T>> ref = new WeakReference<ManualSubscriber<T>>(sub, queue);
sub.request(1);
sub.nextElement();
sub.cancel();
return ref;
}
};
activePublisherTest(3, false, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
final WeakReference<ManualSubscriber<T>> ref = run.apply(pub);
// cancel may be run asynchronously so we add a sleep before running the GC
// to "resolve" the race
Thread.sleep(publisherReferenceGCTimeoutMillis);
System.gc();
if (!ref.equals(queue.remove(100))) {
env.flop(String.format("Publisher %s did not drop reference to test subscriber after subscription cancellation", pub));
}
env.verifyNoAsyncErrorsNoDelay();
}
});
}
@Override @Test
public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable {
final int totalElements = 3;
activePublisherTest(totalElements, true, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
ManualSubscriber<T> sub = env.newManualSubscriber(pub);
sub.request(Long.MAX_VALUE);
sub.nextElements(totalElements);
sub.expectCompletion();
env.verifyNoAsyncErrorsNoDelay();
}
});
}
@Override @Test
public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable {
final int totalElements = 3;
activePublisherTest(totalElements, true, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE / 2
sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE - 1
sub.request(1); // pending = Long.MAX_VALUE
sub.nextElements(totalElements);
sub.expectCompletion();
try {
env.verifyNoAsyncErrorsNoDelay();
} finally {
sub.cancel();
}
}
});
}
@Override @Test
public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
activePublisherTest(Integer.MAX_VALUE, false, new PublisherTestRun<T>() {
@Override public void run(Publisher<T> pub) throws Throwable {
final ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(env) {
// arbitrarily set limit on nuber of request calls signalled, we expect overflow after already 2 calls,
// so 10 is relatively high and safe even if arbitrarily chosen
int callsCounter = 10;
@Override
public void onNext(T element) {
if (env.debugEnabled()) {
env.debug(String.format("%s::onNext(%s)", this, element));
}
if (subscription.isCompleted()) {
if (callsCounter > 0) {
subscription.value().request(Long.MAX_VALUE - 1);
callsCounter--;
} else {
subscription.value().cancel();
}
} else {
env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
}
}
};
env.subscribe(pub, sub, env.defaultTimeoutMillis());
// eventually triggers `onNext`, which will then trigger up to `callsCounter` times `request(Long.MAX_VALUE - 1)`
// we're pretty sure to overflow from those
sub.request(1);
// no onError should be signalled
try {
env.verifyNoAsyncErrors();
} finally {
sub.cancel();
}
}
});
}
///////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////
///////////////////// TEST INFRASTRUCTURE /////////////////////////////////
public interface PublisherTestRun<T> {
public void run(Publisher<T> pub) throws Throwable;
}
/**
* Test for feature that SHOULD/MUST be implemented, using a live publisher.
*
* @param elements the number of elements the Publisher under test must be able to emit to run this test
* @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run.
* If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped.
* To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}.
*/
public void activePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable {
if (elements > maxElementsFromPublisher()) {
throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher()));
} else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) {
throw new SkipException("Unable to run this test, as it requires an onComplete signal, " +
"which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)");
} else {
Publisher<T> pub = createPublisher(elements);
body.run(pub);
env.verifyNoAsyncErrorsNoDelay();
}
}
/**
* Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails.
*
* @param elements the number of elements the Publisher under test must be able to emit to run this test
* @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run.
* If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped.
* To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}.
*/
public void optionalActivePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable {
if (elements > maxElementsFromPublisher()) {
throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher()));
} else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) {
throw new SkipException("Unable to run this test, as it requires an onComplete signal, " +
"which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)");
} else {
final Publisher<T> pub = createPublisher(elements);
final String skipMessage = "Skipped because tested publisher does NOT implement this OPTIONAL requirement.";
try {
potentiallyPendingTest(pub, body);
} catch (Exception ex) {
notVerified(skipMessage);
} catch (AssertionError ex) {
notVerified(skipMessage + " Reason for skipping was: " + ex.getMessage());
}
}
}
public static final String SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE =
"Skipping because no error state Publisher provided, and the test requires it. " +
"Please implement PublisherVerification#createFailedPublisher to run this test.";
public static final String SKIPPING_OPTIONAL_TEST_FAILED =
"Skipping, because provided Publisher does not pass this *additional* verification.";
/**
* Additional test for Publisher in error state
*/
public void whenHasErrorPublisherTest(PublisherTestRun<T> body) throws Throwable {
potentiallyPendingTest(createFailedPublisher(), body, SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE);
}
public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body) throws Throwable {
potentiallyPendingTest(pub, body, SKIPPING_OPTIONAL_TEST_FAILED);
}
public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body, String message) throws Throwable {
if (pub != null) {
body.run(pub);
} else {
throw new SkipException(message);
}
}
/**
* Executes a given test body {@code n} times.
* All the test runs must pass in order for the stochastic test to pass.
*/
public void stochasticTest(int n, Function<Integer, Void> body) throws Throwable {
if (skipStochasticTests()) {
notVerified("Skipping @Stochastic test because `skipStochasticTests()` returned `true`!");
}
for (int i = 0; i < n; i++) {
body.apply(i);
}
}
public void notVerified() {
throw new SkipException("Not verified by this TCK.");
}
/**
* Return this value from {@link PublisherVerification#maxElementsFromPublisher()} to mark that the given {@link org.reactivestreams.Publisher},
* is not able to signal completion. For example it is strictly a time-bound or unbounded source of data.
*
* <b>Returning this value from {@link PublisherVerification#maxElementsFromPublisher()} will result in skipping all TCK tests which require onComplete signals!</b>
*/
public long publisherUnableToSignalOnComplete() {
return Long.MAX_VALUE;
}
public void notVerified(String message) {
throw new SkipException(message);
}
}