blob: 62c899a7847e6993683f9752a3944d99377f193f [file] [log] [blame]
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package org.reactivestreams.tck;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.TestEnvironment.ManualPublisher;
import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
import org.reactivestreams.tck.flow.support.Optional;
import org.reactivestreams.tck.flow.support.SubscriberBlackboxVerificationRules;
import org.reactivestreams.tck.flow.support.TestException;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy;
import static org.testng.Assert.assertTrue;
/**
* Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription}
* specification rules, without any modifications to the tested implementation (also known as "Black Box" testing).
*
* This verification is NOT able to check many of the rules of the spec, and if you want more
* verification of your implementation you'll have to implement {@code org.reactivestreams.tck.SubscriberWhiteboxVerification}
* instead.
*
* @see org.reactivestreams.Subscriber
* @see org.reactivestreams.Subscription
*/
public abstract class SubscriberBlackboxVerification<T> extends WithHelperPublisher<T>
implements SubscriberBlackboxVerificationRules {
protected final TestEnvironment env;
protected SubscriberBlackboxVerification(TestEnvironment env) {
this.env = env;
}
// USER API
/**
* This is the main method you must implement in your test incarnation.
* It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
*/
public abstract Subscriber<T> createSubscriber();
/**
* Override this method if the Subscriber implementation you are verifying
* needs an external signal before it signals demand to its Publisher.
*
* By default this method does nothing.
*/
public void triggerRequest(final Subscriber<? super T> subscriber) {
// this method is intentionally left blank
}
// ENV SETUP
/**
* Executor service used by the default provided asynchronous Publisher.
* @see #createHelperPublisher(long)
*/
private ExecutorService publisherExecutor;
@BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
@AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
@Override public ExecutorService publisherExecutorService() { return publisherExecutor; }
////////////////////// TEST ENV CLEANUP /////////////////////////////////////
@BeforeMethod
public void setUp() throws Exception {
env.clearAsyncErrors();
}
////////////////////// SPEC RULE VERIFICATION ///////////////////////////////
@Override @Test
public void required_spec201_blackbox_mustSignalDemandViaSubscriptionRequest() throws Throwable {
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
@Override
public void run(BlackboxTestStage stage) throws InterruptedException {
triggerRequest(stage.subProxy().sub());
final long requested = stage.expectRequest();// assuming subscriber wants to consume elements...
final long signalsToEmit = Math.min(requested, 512); // protecting against Subscriber which sends ridiculous large demand
// should cope with up to requested number of elements
for (int i = 0; i < signalsToEmit && sampleIsCancelled(stage, i, 10); i++)
stage.signalNext();
// we complete after `signalsToEmit` (which can be less than `requested`),
// which is legal under https://github.com/reactive-streams/reactive-streams-jvm#1.2
stage.sendCompletion();
}
/**
* In order to allow some "skid" and not check state on each iteration,
* only check {@code stage.isCancelled} every {@code checkInterval}'th iteration.
*/
private boolean sampleIsCancelled(BlackboxTestStage stage, int i, int checkInterval) throws InterruptedException {
if (i % checkInterval == 0) return stage.isCancelled();
else return false;
}
});
}
@Override @Test
public void untested_spec202_blackbox_shouldAsynchronouslyDispatch() throws Exception {
notVerified(); // cannot be meaningfully tested, or can it?
}
@Override @Test
public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
@Override
public void run(BlackboxTestStage stage) throws Throwable {
final Subscription subs = new Subscription() {
@Override
public void request(long n) {
final Optional<StackTraceElement> onCompleteStackTraceElement = env.findCallerMethodInStackTrace("onComplete");
if (onCompleteStackTraceElement.isDefined()) {
final StackTraceElement stackElem = onCompleteStackTraceElement.get();
env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
}
}
@Override
public void cancel() {
final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace("onComplete");
if (onCompleteStackElement.isDefined()) {
final StackTraceElement stackElem = onCompleteStackElement.get();
env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
}
}
};
final Subscriber<T> sub = createSubscriber();
sub.onSubscribe(subs);
sub.onComplete();
env.verifyNoAsyncErrorsNoDelay();
}
});
}
@Override @Test
public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
@Override
public void run(BlackboxTestStage stage) throws Throwable {
final Subscription subs = new Subscription() {
@Override
public void request(long n) {
Throwable thr = new Throwable();
for (StackTraceElement stackElem : thr.getStackTrace()) {
if (stackElem.getMethodName().equals("onError")) {
env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
}
}
}
@Override
public void cancel() {
Throwable thr = new Throwable();
for (StackTraceElement stackElem : thr.getStackTrace()) {
if (stackElem.getMethodName().equals("onError")) {
env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
}
}
}
};
final Subscriber<T> sub = createSubscriber();
sub.onSubscribe(subs);
sub.onError(new TestException());
env.verifyNoAsyncErrorsNoDelay();
}
});
}
@Override @Test
public void untested_spec204_blackbox_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
notVerified(); // cannot be meaningfully tested, or can it?
}
@Override @Test
public void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception {
new BlackboxTestStage(env) {{
// try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
final TestEnvironment.Latch secondSubscriptionCancelled = new TestEnvironment.Latch(env);
sub().onSubscribe(
new Subscription() {
@Override
public void request(long elements) {
env.flop(String.format("Subscriber %s illegally called `subscription.request(%s)`!", sub(), elements));
}
@Override
public void cancel() {
secondSubscriptionCancelled.close();
}
@Override
public String toString() {
return "SecondSubscription(should get cancelled)";
}
});
secondSubscriptionCancelled.expectClose("Expected SecondSubscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called.");
env.verifyNoAsyncErrorsNoDelay();
sendCompletion(); // we're done, complete the subscriber under test
}};
}
@Override @Test
public void untested_spec206_blackbox_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
notVerified(); // cannot be meaningfully tested, or can it?
}
@Override @Test
public void untested_spec207_blackbox_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
notVerified(); // cannot be meaningfully tested, or can it?
// the same thread part of the clause can be verified but that is not very useful, or is it?
}
@Override @Test
public void untested_spec208_blackbox_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
notVerified(); // cannot be meaningfully tested as black box, or can it?
}
@Override @Test
public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void run(BlackboxTestStage stage) throws Throwable {
triggerRequest(stage.subProxy().sub());
final long notUsed = stage.expectRequest(); // received request signal
stage.sub().onComplete();
stage.subProxy().expectCompletion();
}
});
}
@Override @Test
public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void run(BlackboxTestStage stage) throws Throwable {
final Subscriber<? super T> sub = stage.sub();
sub.onComplete();
stage.subProxy().expectCompletion();
}
});
}
@Override @Test
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void run(BlackboxTestStage stage) throws Throwable {
triggerRequest(stage.subProxy().sub());
final long notUsed = stage.expectRequest(); // received request signal
stage.sub().onError(new TestException()); // in response to that, we fail
stage.subProxy().expectError(Throwable.class);
}
});
}
// Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
@Override @Test
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void run(BlackboxTestStage stage) throws Throwable {
stage.sub().onError(new TestException());
stage.subProxy().expectError(Throwable.class);
}
});
}
@Override @Test
public void untested_spec211_blackbox_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
notVerified(); // cannot be meaningfully tested, or can it?
}
@Override @Test
public void untested_spec212_blackbox_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality() throws Throwable {
notVerified(); // cannot be meaningfully tested as black box, or can it?
}
@Override @Test
public void untested_spec213_blackbox_failingOnSignalInvocation() throws Exception {
notVerified(); // cannot be meaningfully tested, or can it?
}
@Override @Test
public void required_spec213_blackbox_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
@Override
public void run(BlackboxTestStage stage) throws Throwable {
{
final Subscriber<T> sub = createSubscriber();
boolean gotNPE = false;
try {
sub.onSubscribe(null);
} catch(final NullPointerException expected) {
gotNPE = true;
}
assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
}
env.verifyNoAsyncErrorsNoDelay();
}
});
}
@Override @Test
public void required_spec213_blackbox_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
@Override
public void run(BlackboxTestStage stage) throws Throwable {
final Subscription subscription = new Subscription() {
@Override public void request(final long elements) {}
@Override public void cancel() {}
};
{
final Subscriber<T> sub = createSubscriber();
boolean gotNPE = false;
sub.onSubscribe(subscription);
try {
sub.onNext(null);
} catch(final NullPointerException expected) {
gotNPE = true;
}
assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
}
env.verifyNoAsyncErrorsNoDelay();
}
});
}
@Override @Test
public void required_spec213_blackbox_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
@Override
public void run(BlackboxTestStage stage) throws Throwable {
final Subscription subscription = new Subscription() {
@Override public void request(final long elements) {}
@Override public void cancel() {}
};
{
final Subscriber<T> sub = createSubscriber();
boolean gotNPE = false;
sub.onSubscribe(subscription);
try {
sub.onError(null);
} catch(final NullPointerException expected) {
gotNPE = true;
}
assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
}
env.verifyNoAsyncErrorsNoDelay();
}
});
}
////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////
@Override @Test
public void untested_spec301_blackbox_mustNotBeCalledOutsideSubscriberContext() throws Exception {
notVerified(); // cannot be meaningfully tested, or can it?
}
@Override @Test
public void untested_spec308_blackbox_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
notVerified(); // cannot be meaningfully tested as black box, or can it?
}
@Override @Test
public void untested_spec310_blackbox_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
notVerified(); // cannot be meaningfully tested, or can it?
}
@Override @Test
public void untested_spec311_blackbox_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
notVerified(); // cannot be meaningfully tested, or can it?
}
@Override @Test
public void untested_spec314_blackbox_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
notVerified(); // cannot be meaningfully tested, or can it?
}
@Override @Test
public void untested_spec315_blackbox_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
notVerified(); // cannot be meaningfully tested, or can it?
}
@Override @Test
public void untested_spec316_blackbox_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
notVerified(); // cannot be meaningfully tested, or can it?
}
/////////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////
/////////////////////// TEST INFRASTRUCTURE /////////////////////////////////
abstract class BlackboxTestStageTestRun {
public abstract void run(BlackboxTestStage stage) throws Throwable;
}
public void blackboxSubscriberTest(BlackboxTestStageTestRun body) throws Throwable {
BlackboxTestStage stage = new BlackboxTestStage(env, true);
body.run(stage);
}
public void blackboxSubscriberWithoutSetupTest(BlackboxTestStageTestRun body) throws Throwable {
BlackboxTestStage stage = new BlackboxTestStage(env, false);
body.run(stage);
}
public class BlackboxTestStage extends ManualPublisher<T> {
public Publisher<T> pub;
public ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
public T lastT = null;
private Optional<BlackboxSubscriberProxy<T>> subProxy = Optional.empty();
public BlackboxTestStage(TestEnvironment env) throws InterruptedException {
this(env, true);
}
public BlackboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException {
super(env);
if (runDefaultInit) {
pub = this.createHelperPublisher(Long.MAX_VALUE);
tees = env.newManualSubscriber(pub);
Subscriber<T> sub = createSubscriber();
subProxy = Optional.of(createBlackboxSubscriberProxy(env, sub));
subscribe(subProxy.get());
}
}
public Subscriber<? super T> sub() {
return subscriber.value();
}
/**
* Proxy for the {@link #sub()} {@code Subscriber}, providing certain assertions on methods being called on the Subscriber.
*/
public BlackboxSubscriberProxy<T> subProxy() {
return subProxy.get();
}
public Publisher<T> createHelperPublisher(long elements) {
return SubscriberBlackboxVerification.this.createHelperPublisher(elements);
}
public BlackboxSubscriberProxy<T> createBlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> sub) {
return new BlackboxSubscriberProxy<T>(env, sub);
}
public T signalNext() throws InterruptedException {
T element = nextT();
sendNext(element);
return element;
}
public T nextT() throws InterruptedException {
lastT = tees.requestNextElement();
return lastT;
}
}
public void notVerified() {
throw new SkipException("Not verified using this TCK.");
}
}