| /* |
| * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. |
| * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| * |
| * This code is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License version 2 only, as |
| * published by the Free Software Foundation. |
| * |
| * This code is distributed in the hope that it will be useful, but WITHOUT |
| * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| * version 2 for more details (a copy is included in the LICENSE file that |
| * accompanied this code). |
| * |
| * You should have received a copy of the GNU General Public License version |
| * 2 along with this work; if not, write to the Free Software Foundation, |
| * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| * |
| * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
| * or visit www.oracle.com if you need additional information or have any |
| * questions. |
| */ |
| package java.util.stream; |
| |
| import java.io.PrintWriter; |
| import java.io.StringWriter; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumMap; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.Spliterator; |
| import java.util.function.BiConsumer; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| |
| import org.testng.annotations.Test; |
| |
| /** |
| * Base class for streams test cases. Provides 'exercise' methods for taking |
| * lambdas that construct and modify streams, and evaluates them in different |
| * ways and asserts that they produce equivalent results. |
| */ |
| @Test |
| public abstract class OpTestCase extends LoggingTestCase { |
| |
/** Stream test scenarios, keyed by stream shape. */
private final Map<StreamShape, Set<? extends BaseStreamTestScenario>> testScenarios;

/**
 * Populates the scenario table: for each of the four stream shapes, all
 * constants of the matching scenario enum are registered as an
 * unmodifiable set.
 */
protected OpTestCase() {
    Map<StreamShape, Set<? extends BaseStreamTestScenario>> byShape = new EnumMap<>(StreamShape.class);

    Set<StreamTestScenario> refScenarios = EnumSet.allOf(StreamTestScenario.class);
    byShape.put(StreamShape.REFERENCE, Collections.unmodifiableSet(refScenarios));

    Set<IntStreamTestScenario> intScenarios = EnumSet.allOf(IntStreamTestScenario.class);
    byShape.put(StreamShape.INT_VALUE, Collections.unmodifiableSet(intScenarios));

    Set<LongStreamTestScenario> longScenarios = EnumSet.allOf(LongStreamTestScenario.class);
    byShape.put(StreamShape.LONG_VALUE, Collections.unmodifiableSet(longScenarios));

    Set<DoubleStreamTestScenario> doubleScenarios = EnumSet.allOf(DoubleStreamTestScenario.class);
    byShape.put(StreamShape.DOUBLE_VALUE, Collections.unmodifiableSet(doubleScenarios));

    testScenarios = byShape;
}
| |
| @SuppressWarnings("rawtypes") |
| public static int getStreamFlags(BaseStream s) { |
| return ((AbstractPipeline) s).getStreamFlags(); |
| } |
| |
/**
 * Asserts on a result produced while exercising a stream test or a
 * terminal-operation test.
 *
 * <p>This is a functional interface: the builders in this file install
 * their default asserters as lambda expressions.
 *
 * @param <R> the type of result to assert on
 */
@FunctionalInterface
public interface ResultAsserter<R> {
    /**
     * Asserts that a result produced by a stream test or a
     * terminal-operation test matches the expected result.
     *
     * @param actual the actual result
     * @param expected the expected result
     * @param isOrdered true if the pipeline is ordered
     * @param isParallel true if the pipeline is parallel
     */
    void assertResult(R actual, R expected, boolean isOrdered, boolean isParallel);
}
| |
| // Exercise stream operations |
| |
| public interface BaseStreamTestScenario { |
| StreamShape getShape(); |
| |
| boolean isParallel(); |
| |
| boolean isOrdered(); |
| |
| <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> |
| void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m); |
| } |
| |
| protected <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> |
| Collection<U> exerciseOps(TestData<T, S_IN> data, Function<S_IN, S_OUT> m) { |
| return withData(data).stream(m).exercise(); |
| } |
| |
| // Run multiple versions of exercise(), returning the result of the first, and asserting that others return the same result |
| // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants |
| @SafeVarargs |
| protected final<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> |
| Collection<U> exerciseOpsMulti(TestData<T, S_IN> data, |
| Function<S_IN, S_OUT>... ms) { |
| Collection<U> result = null; |
| for (Function<S_IN, S_OUT> m : ms) { |
| if (result == null) |
| result = withData(data).stream(m).exercise(); |
| else { |
| Collection<U> r2 = withData(data).stream(m).exercise(); |
| assertEquals(result, r2); |
| } |
| } |
| return result; |
| } |
| |
| // Run multiple versions of exercise() for an Integer stream, returning the result of the first, and asserting that others return the same result |
| // Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same |
| // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams |
| protected final |
| Collection<Integer> exerciseOpsInt(TestData.OfRef<Integer> data, |
| Function<Stream<Integer>, Stream<Integer>> mRef, |
| Function<IntStream, IntStream> mInt, |
| Function<LongStream, LongStream> mLong, |
| Function<DoubleStream, DoubleStream> mDouble) { |
| @SuppressWarnings({ "rawtypes", "unchecked" }) |
| Function<Stream<Integer>, Stream<Integer>>[] ms = new Function[4]; |
| ms[0] = mRef; |
| ms[1] = s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e); |
| ms[2] = s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e); |
| ms[3] = s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e); |
| return exerciseOpsMulti(data, ms); |
| } |
| |
| // Run multiple versions of exercise() with multiple terminal operations for all kinds of stream, , and asserting against the expected result |
| // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants |
| protected final<T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> |
| void exerciseTerminalOpsMulti(TestData<T, S_IN> data, |
| R expected, |
| Map<String, Function<S_IN, S_OUT>> streams, |
| Map<String, Function<S_OUT, R>> terminals) { |
| for (Map.Entry<String, Function<S_IN, S_OUT>> se : streams.entrySet()) { |
| setContext("Intermediate stream", se.getKey()); |
| for (Map.Entry<String, Function<S_OUT, R>> te : terminals.entrySet()) { |
| setContext("Terminal stream", te.getKey()); |
| withData(data) |
| .terminal(se.getValue(), te.getValue()) |
| .expectedResult(expected) |
| .exercise(); |
| |
| } |
| } |
| } |
| |
| // Run multiple versions of exercise() with multiple terminal operation for all kinds of stream, and asserting against the expected result |
| // Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same |
| // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams |
| protected final |
| void exerciseTerminalOpsInt(TestData<Integer, Stream<Integer>> data, |
| Collection<Integer> expected, |
| String desc, |
| Function<Stream<Integer>, Stream<Integer>> mRef, |
| Function<IntStream, IntStream> mInt, |
| Function<LongStream, LongStream> mLong, |
| Function<DoubleStream, DoubleStream> mDouble, |
| Map<String, Function<Stream<Integer>, Collection<Integer>>> terminals) { |
| |
| Map<String, Function<Stream<Integer>, Stream<Integer>>> m = new HashMap<>(); |
| m.put("Ref " + desc, mRef); |
| m.put("Int " + desc, s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e)); |
| m.put("Long " + desc, s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e)); |
| m.put("Double " + desc, s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e)); |
| |
| exerciseTerminalOpsMulti(data, expected, m, terminals); |
| } |
| |
| |
| protected <T, U, S_OUT extends BaseStream<U, S_OUT>> |
| Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m) { |
| TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data); |
| return withData(data1).stream(m).exercise(); |
| } |
| |
| protected <T, U, S_OUT extends BaseStream<U, S_OUT>, I extends Iterable<U>> |
| Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m, I expected) { |
| TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data); |
| return withData(data1).stream(m).expectedResult(expected).exercise(); |
| } |
| |
| @SuppressWarnings("unchecked") |
| protected <U, S_OUT extends BaseStream<U, S_OUT>> |
| Collection<U> exerciseOps(int[] data, Function<IntStream, S_OUT> m) { |
| return withData(TestData.Factory.ofArray("int array", data)).stream(m).exercise(); |
| } |
| |
| protected Collection<Integer> exerciseOps(int[] data, Function<IntStream, IntStream> m, int[] expected) { |
| TestData.OfInt data1 = TestData.Factory.ofArray("int array", data); |
| return withData(data1).stream(m).expectedResult(expected).exercise(); |
| } |
| |
| protected <T, S_IN extends BaseStream<T, S_IN>> DataStreamBuilder<T, S_IN> withData(TestData<T, S_IN> data) { |
| Objects.requireNonNull(data); |
| return new DataStreamBuilder<>(data); |
| } |
| |
| @SuppressWarnings({"rawtypes", "unchecked"}) |
| public class DataStreamBuilder<T, S_IN extends BaseStream<T, S_IN>> { |
| final TestData<T, S_IN> data; |
| |
| private DataStreamBuilder(TestData<T, S_IN> data) { |
| this.data = Objects.requireNonNull(data); |
| } |
| |
| public <U, S_OUT extends BaseStream<U, S_OUT>> |
| ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> ops(IntermediateTestOp... ops) { |
| return new ExerciseDataStreamBuilder<>(data, (S_IN s) -> (S_OUT) chain(s, ops)); |
| } |
| |
| public <U, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> |
| stream(Function<S_IN, S_OUT> m) { |
| return new ExerciseDataStreamBuilder<>(data, m); |
| } |
| |
| public <U, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> |
| stream(Function<S_IN, S_OUT> m, IntermediateTestOp<U, U> additionalOp) { |
| return new ExerciseDataStreamBuilder<>(data, s -> (S_OUT) chain(m.apply(s), additionalOp)); |
| } |
| |
| public <R> ExerciseDataTerminalBuilder<T, T, R, S_IN, S_IN> |
| terminal(Function<S_IN, R> terminalF) { |
| return new ExerciseDataTerminalBuilder<>(data, s -> s, terminalF); |
| } |
| |
| public <U, R, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> |
| terminal(Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF) { |
| return new ExerciseDataTerminalBuilder<>(data, streamF, terminalF); |
| } |
| } |
| |
| @SuppressWarnings({"rawtypes", "unchecked"}) |
| public class ExerciseDataStreamBuilder<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> { |
| final TestData<T, S_IN> data; |
| final Function<S_IN, S_OUT> m; |
| final StreamShape shape; |
| |
| Set<BaseStreamTestScenario> testSet = new HashSet<>(); |
| |
| Collection<U> refResult; |
| |
| Consumer<TestData<T, S_IN>> before = LambdaTestHelpers.bEmpty; |
| |
| Consumer<TestData<T, S_IN>> after = LambdaTestHelpers.bEmpty; |
| |
| ResultAsserter<Iterable<U>> resultAsserter = (act, exp, ord, par) -> { |
| if (par & !ord) { |
| LambdaTestHelpers.assertContentsUnordered(act, exp); |
| } |
| else { |
| LambdaTestHelpers.assertContentsEqual(act, exp); |
| } |
| }; |
| |
| private ExerciseDataStreamBuilder(TestData<T, S_IN> data, Function<S_IN, S_OUT> m) { |
| this.data = data; |
| |
| this.m = Objects.requireNonNull(m); |
| |
| this.shape = ((AbstractPipeline<?, U, ?>) m.apply(data.stream())).getOutputShape(); |
| |
| // Have to initiate from the output shape of the last stream |
| // This means the stream mapper is required first rather than last |
| testSet.addAll(testScenarios.get(shape)); |
| } |
| |
| // |
| |
| public <I extends Iterable<U>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(I expectedResult) { |
| List<U> l = new ArrayList<>(); |
| expectedResult.forEach(l::add); |
| refResult = l; |
| return this; |
| } |
| |
| public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(int[] expectedResult) { |
| List l = new ArrayList(); |
| for (int anExpectedResult : expectedResult) { |
| l.add(anExpectedResult); |
| } |
| refResult = l; |
| return this; |
| } |
| |
| public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(long[] expectedResult) { |
| List l = new ArrayList(); |
| for (long anExpectedResult : expectedResult) { |
| l.add(anExpectedResult); |
| } |
| refResult = l; |
| return this; |
| } |
| |
| public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(double[] expectedResult) { |
| List l = new ArrayList(); |
| for (double anExpectedResult : expectedResult) { |
| l.add(anExpectedResult); |
| } |
| refResult = l; |
| return this; |
| } |
| |
| public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> before(Consumer<TestData<T, S_IN>> before) { |
| this.before = Objects.requireNonNull(before); |
| return this; |
| } |
| |
| public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> after(Consumer<TestData<T, S_IN>> after) { |
| this.after = Objects.requireNonNull(after); |
| return this; |
| } |
| |
| public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> without(BaseStreamTestScenario... tests) { |
| return without(Arrays.asList(tests)); |
| } |
| |
| public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> without(Collection<? extends BaseStreamTestScenario> tests) { |
| for (BaseStreamTestScenario ts : tests) { |
| if (ts.getShape() == shape) { |
| testSet.remove(ts); |
| } |
| } |
| |
| if (testSet.isEmpty()) { |
| throw new IllegalStateException("Test scenario set is empty"); |
| } |
| |
| return this; |
| } |
| |
| public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> with(BaseStreamTestScenario... tests) { |
| return with(Arrays.asList(tests)); |
| } |
| |
| public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> with(Collection<? extends BaseStreamTestScenario> tests) { |
| testSet = new HashSet<>(); |
| |
| for (BaseStreamTestScenario ts : tests) { |
| if (ts.getShape() == shape) { |
| testSet.add(ts); |
| } |
| } |
| |
| if (testSet.isEmpty()) { |
| throw new IllegalStateException("Test scenario set is empty"); |
| } |
| |
| return this; |
| } |
| |
| public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> resultAsserter(ResultAsserter<Iterable<U>> resultAsserter) { |
| this.resultAsserter = resultAsserter; |
| return this; |
| } |
| |
| // Build method |
| |
| public Collection<U> exercise() { |
| final boolean isStreamOrdered; |
| if (refResult == null) { |
| // Induce the reference result |
| before.accept(data); |
| S_OUT sOut = m.apply(data.stream()); |
| isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags()); |
| Node<U> refNodeResult = ((AbstractPipeline<?, U, ?>) sOut).evaluateToArrayNode(size -> (U[]) new Object[size]); |
| refResult = LambdaTestHelpers.toBoxedList(refNodeResult.spliterator()); |
| after.accept(data); |
| } |
| else { |
| S_OUT sOut = m.apply(data.stream()); |
| isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags()); |
| } |
| |
| List<Error> errors = new ArrayList<>(); |
| for (BaseStreamTestScenario test : testSet) { |
| try { |
| before.accept(data); |
| |
| List<U> result = new ArrayList<>(); |
| test.run(data, LambdaTestHelpers.<U>toBoxingConsumer(result::add), m); |
| |
| Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isStreamOrdered && test.isOrdered(), test.isParallel()); |
| |
| if (refResult.size() > 1000) { |
| LambdaTestHelpers.launderAssertion( |
| asserter, |
| () -> String.format("%n%s: [actual size=%d] != [expected size=%d]", test, result.size(), refResult.size())); |
| } |
| else { |
| LambdaTestHelpers.launderAssertion( |
| asserter, |
| () -> String.format("%n%s: [actual] %s != [expected] %s", test, result, refResult)); |
| } |
| |
| after.accept(data); |
| } catch (Throwable t) { |
| errors.add(new Error(String.format("%s: %s", test, t), t)); |
| } |
| } |
| |
| if (!errors.isEmpty()) { |
| StringBuilder sb = new StringBuilder(); |
| int i = 1; |
| for (Error t : errors) { |
| sb.append(i++).append(": "); |
| if (t instanceof AssertionError) { |
| sb.append(t).append("\n"); |
| } |
| else { |
| StringWriter sw = new StringWriter(); |
| PrintWriter pw = new PrintWriter(sw); |
| |
| t.getCause().printStackTrace(pw); |
| pw.flush(); |
| sb.append(t).append("\n").append(sw); |
| } |
| } |
| sb.append("--"); |
| |
| fail(String.format("%d failure(s) for test data: %s\n%s", i - 1, data.toString(), sb)); |
| } |
| |
| return refResult; |
| } |
| } |
| |
| // Exercise terminal operations |
| |
| interface BaseTerminalTestScenario<U, R, S_OUT extends BaseStream<U, S_OUT>> { |
| boolean requiresSingleStageSource(); |
| |
| boolean requiresParallelSource(); |
| |
| default R run(Function<S_OUT, R> terminalF, S_OUT source, StreamShape shape) { |
| return terminalF.apply(source); |
| } |
| } |
| |
| @SuppressWarnings({"rawtypes", "unchecked"}) |
| enum TerminalTestScenario implements BaseTerminalTestScenario { |
| SINGLE_SEQUENTIAL(true, false), |
| |
| SINGLE_SEQUENTIAL_SHORT_CIRCUIT(true, false) { |
| @Override |
| public Object run(Function terminalF, BaseStream source, StreamShape shape) { |
| source = (BaseStream) chain(source, new ShortCircuitOp(shape)); |
| return terminalF.apply(source); |
| } |
| }, |
| |
| SINGLE_PARALLEL(true, true), |
| |
| ALL_SEQUENTIAL(false, false), |
| |
| ALL_SEQUENTIAL_SHORT_CIRCUIT(false, false) { |
| @Override |
| public Object run(Function terminalF, BaseStream source, StreamShape shape) { |
| source = (BaseStream) chain(source, new ShortCircuitOp(shape)); |
| return terminalF.apply(source); |
| } |
| }, |
| |
| ALL_PARALLEL(false, true), |
| |
| ALL_PARALLEL_SEQUENTIAL(false, false) { |
| @Override |
| public Object run(Function terminalF, BaseStream source, StreamShape shape) { |
| return terminalF.apply(source.sequential()); |
| } |
| }, |
| ; |
| |
| private final boolean requiresSingleStageSource; |
| private final boolean isParallel; |
| |
| TerminalTestScenario(boolean requiresSingleStageSource, boolean isParallel) { |
| this.requiresSingleStageSource = requiresSingleStageSource; |
| this.isParallel = isParallel; |
| } |
| |
| @Override |
| public boolean requiresSingleStageSource() { |
| return requiresSingleStageSource; |
| } |
| |
| @Override |
| public boolean requiresParallelSource() { |
| return isParallel; |
| } |
| |
| } |
| |
| @SuppressWarnings({"rawtypes", "unchecked"}) |
| public class ExerciseDataTerminalBuilder<T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> { |
| final TestData<T, S_IN> data; |
| final Function<S_IN, S_OUT> streamF; |
| final Function<S_OUT, R> terminalF; |
| |
| R refResult; |
| |
| ResultAsserter<R> resultAsserter = (act, exp, ord, par) -> LambdaTestHelpers.assertContentsEqual(act, exp); |
| |
| private ExerciseDataTerminalBuilder(TestData<T, S_IN> data, Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF) { |
| this.data = data; |
| this.streamF = Objects.requireNonNull(streamF); |
| this.terminalF = Objects.requireNonNull(terminalF); |
| } |
| |
| // |
| |
| public ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> expectedResult(R expectedResult) { |
| this.refResult = expectedResult; |
| return this; |
| } |
| |
| public ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> equalator(BiConsumer<R, R> equalityAsserter) { |
| resultAsserter = (act, exp, ord, par) -> equalityAsserter.accept(act, exp); |
| return this; |
| } |
| |
| public ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> resultAsserter(ResultAsserter<R> resultAsserter) { |
| this.resultAsserter = resultAsserter; |
| return this; |
| } |
| |
| // Build method |
| |
| public R exercise() { |
| S_OUT out = streamF.apply(data.stream()).sequential(); |
| AbstractPipeline ap = (AbstractPipeline) out; |
| boolean isOrdered = StreamOpFlag.ORDERED.isKnown(ap.getStreamFlags()); |
| StreamShape shape = ap.getOutputShape(); |
| |
| EnumSet<TerminalTestScenario> tests = EnumSet.allOf(TerminalTestScenario.class); |
| // Sequentially collect the output that will be input to the terminal op |
| Node<U> node = ap.evaluateToArrayNode(size -> (U[]) new Object[size]); |
| if (refResult == null) { |
| // Induce the reference result |
| S_OUT source = (S_OUT) createPipeline(shape, node.spliterator(), |
| StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED, |
| false); |
| |
| refResult = (R) TerminalTestScenario.SINGLE_SEQUENTIAL.run(terminalF, source, shape); |
| tests.remove(TerminalTestScenario.SINGLE_SEQUENTIAL); |
| } |
| |
| for (BaseTerminalTestScenario test : tests) { |
| S_OUT source; |
| if (test.requiresSingleStageSource()) { |
| source = (S_OUT) createPipeline(shape, node.spliterator(), |
| StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED, |
| test.requiresParallelSource()); |
| } |
| else { |
| source = streamF.apply(test.requiresParallelSource() |
| ? data.parallelStream() : data.stream()); |
| } |
| |
| R result = (R) test.run(terminalF, source, shape); |
| |
| LambdaTestHelpers.launderAssertion( |
| () -> resultAsserter.assertResult(result, refResult, isOrdered, test.requiresParallelSource()), |
| () -> String.format("%s: %s != %s", test, refResult, result)); |
| } |
| |
| return refResult; |
| } |
| |
| AbstractPipeline createPipeline(StreamShape shape, Spliterator s, int flags, boolean parallel) { |
| switch (shape) { |
| case REFERENCE: return new ReferencePipeline.Head<>(s, flags, parallel); |
| case INT_VALUE: return new IntPipeline.Head(s, flags, parallel); |
| case LONG_VALUE: return new LongPipeline.Head(s, flags, parallel); |
| case DOUBLE_VALUE: return new DoublePipeline.Head(s, flags, parallel); |
| default: throw new IllegalStateException("Unknown shape: " + shape); |
| } |
| } |
| } |
| |
| protected <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) { |
| TestData.OfRef<T> data1 |
| = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data); |
| return withData(data1).terminal(m).expectedResult(expected).exercise(); |
| } |
| |
| protected <T, R, S_IN extends BaseStream<T, S_IN>> R |
| exerciseTerminalOps(TestData<T, S_IN> data, |
| Function<S_IN, R> terminalF) { |
| return withData(data).terminal(terminalF).exercise(); |
| } |
| |
| protected <T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> R |
| exerciseTerminalOps(TestData<T, S_IN> data, |
| Function<S_IN, S_OUT> streamF, |
| Function<S_OUT, R> terminalF) { |
| return withData(data).terminal(streamF, terminalF).exercise(); |
| } |
| |
| // |
| |
| @SuppressWarnings({"rawtypes", "unchecked"}) |
| private static <T> AbstractPipeline<?, T, ?> chain(AbstractPipeline upstream, IntermediateTestOp<?, T> op) { |
| return (AbstractPipeline<?, T, ?>) IntermediateTestOp.chain(upstream, op); |
| } |
| |
| @SuppressWarnings({"rawtypes", "unchecked"}) |
| private static AbstractPipeline<?, ?, ?> chain(AbstractPipeline pipe, IntermediateTestOp... ops) { |
| for (IntermediateTestOp op : ops) |
| pipe = chain(pipe, op); |
| return pipe; |
| } |
| |
| @SuppressWarnings("rawtypes") |
| private static <T> AbstractPipeline<?, T, ?> chain(BaseStream pipe, IntermediateTestOp<?, T> op) { |
| return chain((AbstractPipeline) pipe, op); |
| } |
| |
| @SuppressWarnings("rawtypes") |
| public static AbstractPipeline<?, ?, ?> chain(BaseStream pipe, IntermediateTestOp... ops) { |
| return chain((AbstractPipeline) pipe, ops); |
| } |
| |
| // Test data |
| |
| static class ShortCircuitOp<T> implements StatelessTestOp<T,T> { |
| private final StreamShape shape; |
| |
| ShortCircuitOp(StreamShape shape) { |
| this.shape = shape; |
| } |
| |
| @Override |
| public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) { |
| return sink; |
| } |
| |
| @Override |
| public int opGetFlags() { |
| return StreamOpFlag.IS_SHORT_CIRCUIT; |
| } |
| |
| @Override |
| public StreamShape outputShape() { |
| return shape; |
| } |
| |
| @Override |
| public StreamShape inputShape() { |
| return shape; |
| } |
| } |
| } |