| /* |
| * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved. |
| * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| * |
| * This code is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License version 2 only, as |
| * published by the Free Software Foundation. Oracle designates this |
| * particular file as subject to the "Classpath" exception as provided |
| * by Oracle in the LICENSE file that accompanied this code. |
| * |
| * This code is distributed in the hope that it will be useful, but WITHOUT |
| * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| * version 2 for more details (a copy is included in the LICENSE file that |
| * accompanied this code). |
| * |
| * You should have received a copy of the GNU General Public License version |
| * 2 along with this work; if not, write to the Free Software Foundation, |
| * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| * |
| * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
| * or visit www.oracle.com if you need additional information or have any |
| * questions. |
| */ |
| package java.util.stream; |
| |
| import java.util.Comparator; |
| import java.util.Objects; |
| import java.util.Spliterator; |
| import java.util.concurrent.CountedCompleter; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.function.Consumer; |
| import java.util.function.DoubleConsumer; |
| import java.util.function.DoublePredicate; |
| import java.util.function.IntConsumer; |
| import java.util.function.IntFunction; |
| import java.util.function.IntPredicate; |
| import java.util.function.LongConsumer; |
| import java.util.function.LongPredicate; |
| import java.util.function.Predicate; |
| |
| /** |
| * Factory for instances of a takeWhile and dropWhile operations |
| * that produce subsequences of their input stream. |
| * |
| * @since 1.9 |
| */ |
| final class WhileOps { |
| |
| static final int TAKE_FLAGS = StreamOpFlag.NOT_SIZED | StreamOpFlag.IS_SHORT_CIRCUIT; |
| |
| static final int DROP_FLAGS = StreamOpFlag.NOT_SIZED; |
| |
| /** |
| * Appends a "takeWhile" operation to the provided Stream. |
| * |
| * @param <T> the type of both input and output elements |
| * @param upstream a reference stream with element type T |
| * @param predicate the predicate that returns false to halt taking. |
| */ |
| static <T> Stream<T> makeTakeWhileRef(AbstractPipeline<?, T, ?> upstream, |
| Predicate<? super T> predicate) { |
| Objects.requireNonNull(predicate); |
| return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE, TAKE_FLAGS) { |
| @Override |
| <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, |
| Spliterator<P_IN> spliterator) { |
| if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { |
| return opEvaluateParallel(helper, spliterator, Nodes.castingArray()) |
| .spliterator(); |
| } |
| else { |
| return new UnorderedWhileSpliterator.OfRef.Taking<>( |
| helper.wrapSpliterator(spliterator), false, predicate); |
| } |
| } |
| |
| @Override |
| <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, |
| Spliterator<P_IN> spliterator, |
| IntFunction<T[]> generator) { |
| return new TakeWhileTask<>(this, helper, spliterator, generator) |
| .invoke(); |
| } |
| |
| @Override |
| Sink<T> opWrapSink(int flags, Sink<T> sink) { |
| return new Sink.ChainedReference<T, T>(sink) { |
| boolean take = true; |
| |
| @Override |
| public void begin(long size) { |
| downstream.begin(-1); |
| } |
| |
| @Override |
| public void accept(T t) { |
| if (take = predicate.test(t)) { |
| downstream.accept(t); |
| } |
| } |
| |
| @Override |
| public boolean cancellationRequested() { |
| return !take || downstream.cancellationRequested(); |
| } |
| }; |
| } |
| }; |
| } |
| |
| /** |
| * Appends a "takeWhile" operation to the provided IntStream. |
| * |
| * @param upstream a reference stream with element type T |
| * @param predicate the predicate that returns false to halt taking. |
| */ |
| static IntStream makeTakeWhileInt(AbstractPipeline<?, Integer, ?> upstream, |
| IntPredicate predicate) { |
| Objects.requireNonNull(predicate); |
| return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE, TAKE_FLAGS) { |
| @Override |
| <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper, |
| Spliterator<P_IN> spliterator) { |
| if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { |
| return opEvaluateParallel(helper, spliterator, Integer[]::new) |
| .spliterator(); |
| } |
| else { |
| return new UnorderedWhileSpliterator.OfInt.Taking( |
| (Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate); |
| } |
| } |
| |
| @Override |
| <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, |
| Spliterator<P_IN> spliterator, |
| IntFunction<Integer[]> generator) { |
| return new TakeWhileTask<>(this, helper, spliterator, generator) |
| .invoke(); |
| } |
| |
| @Override |
| Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { |
| return new Sink.ChainedInt<Integer>(sink) { |
| boolean take = true; |
| |
| @Override |
| public void begin(long size) { |
| downstream.begin(-1); |
| } |
| |
| @Override |
| public void accept(int t) { |
| if (take = predicate.test(t)) { |
| downstream.accept(t); |
| } |
| } |
| |
| @Override |
| public boolean cancellationRequested() { |
| return !take || downstream.cancellationRequested(); |
| } |
| }; |
| } |
| }; |
| } |
| |
| /** |
| * Appends a "takeWhile" operation to the provided LongStream. |
| * |
| * @param upstream a reference stream with element type T |
| * @param predicate the predicate that returns false to halt taking. |
| */ |
| static LongStream makeTakeWhileLong(AbstractPipeline<?, Long, ?> upstream, |
| LongPredicate predicate) { |
| Objects.requireNonNull(predicate); |
| return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE, TAKE_FLAGS) { |
| @Override |
| <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper, |
| Spliterator<P_IN> spliterator) { |
| if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { |
| return opEvaluateParallel(helper, spliterator, Long[]::new) |
| .spliterator(); |
| } |
| else { |
| return new UnorderedWhileSpliterator.OfLong.Taking( |
| (Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate); |
| } |
| } |
| |
| @Override |
| <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, |
| Spliterator<P_IN> spliterator, |
| IntFunction<Long[]> generator) { |
| return new TakeWhileTask<>(this, helper, spliterator, generator) |
| .invoke(); |
| } |
| |
| @Override |
| Sink<Long> opWrapSink(int flags, Sink<Long> sink) { |
| return new Sink.ChainedLong<Long>(sink) { |
| boolean take = true; |
| |
| @Override |
| public void begin(long size) { |
| downstream.begin(-1); |
| } |
| |
| @Override |
| public void accept(long t) { |
| if (take = predicate.test(t)) { |
| downstream.accept(t); |
| } |
| } |
| |
| @Override |
| public boolean cancellationRequested() { |
| return !take || downstream.cancellationRequested(); |
| } |
| }; |
| } |
| }; |
| } |
| |
| /** |
| * Appends a "takeWhile" operation to the provided DoubleStream. |
| * |
| * @param upstream a reference stream with element type T |
| * @param predicate the predicate that returns false to halt taking. |
| */ |
| static DoubleStream makeTakeWhileDouble(AbstractPipeline<?, Double, ?> upstream, |
| DoublePredicate predicate) { |
| Objects.requireNonNull(predicate); |
| return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE, TAKE_FLAGS) { |
| @Override |
| <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper, |
| Spliterator<P_IN> spliterator) { |
| if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { |
| return opEvaluateParallel(helper, spliterator, Double[]::new) |
| .spliterator(); |
| } |
| else { |
| return new UnorderedWhileSpliterator.OfDouble.Taking( |
| (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate); |
| } |
| } |
| |
| @Override |
| <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, |
| Spliterator<P_IN> spliterator, |
| IntFunction<Double[]> generator) { |
| return new TakeWhileTask<>(this, helper, spliterator, generator) |
| .invoke(); |
| } |
| |
| @Override |
| Sink<Double> opWrapSink(int flags, Sink<Double> sink) { |
| return new Sink.ChainedDouble<Double>(sink) { |
| boolean take = true; |
| |
| @Override |
| public void begin(long size) { |
| downstream.begin(-1); |
| } |
| |
| @Override |
| public void accept(double t) { |
| if (take = predicate.test(t)) { |
| downstream.accept(t); |
| } |
| } |
| |
| @Override |
| public boolean cancellationRequested() { |
| return !take || downstream.cancellationRequested(); |
| } |
| }; |
| } |
| }; |
| } |
| |
| /** |
| * A specialization for the dropWhile operation that controls if |
| * elements to be dropped are counted and passed downstream. |
| * <p> |
| * This specialization is utilized by the {@link TakeWhileTask} for |
| * pipelines that are ordered. In such cases elements cannot be dropped |
| * until all elements have been collected. |
| * |
| * @param <T> the type of both input and output elements |
| */ |
| interface DropWhileOp<T> { |
| /** |
| * Accepts a {@code Sink} which will receive the results of this |
| * dropWhile operation, and return a {@code DropWhileSink} which |
| * accepts |
| * elements and which performs the dropWhile operation passing the |
| * results to the provided {@code Sink}. |
| * |
| * @param sink sink to which elements should be sent after processing |
| * @param retainAndCountDroppedElements true if elements to be dropped |
| * are counted and passed to the sink, otherwise such elements |
| * are actually dropped and not passed to the sink. |
| * @return a dropWhile sink |
| */ |
| DropWhileSink<T> opWrapSink(Sink<T> sink, boolean retainAndCountDroppedElements); |
| } |
| |
| /** |
| * A specialization for a dropWhile sink. |
| * |
| * @param <T> the type of both input and output elements |
| */ |
| interface DropWhileSink<T> extends Sink<T> { |
| /** |
| * @return the could of elements that would have been dropped and |
| * instead were passed downstream. |
| */ |
| long getDropCount(); |
| } |
| |
| /** |
| * Appends a "dropWhile" operation to the provided Stream. |
| * |
| * @param <T> the type of both input and output elements |
| * @param upstream a reference stream with element type T |
| * @param predicate the predicate that returns false to halt dropping. |
| */ |
| static <T> Stream<T> makeDropWhileRef(AbstractPipeline<?, T, ?> upstream, |
| Predicate<? super T> predicate) { |
| Objects.requireNonNull(predicate); |
| |
| class Op extends ReferencePipeline.StatefulOp<T, T> implements DropWhileOp<T> { |
| public Op(AbstractPipeline<?, T, ?> upstream, StreamShape inputShape, int opFlags) { |
| super(upstream, inputShape, opFlags); |
| } |
| |
| @Override |
| <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, |
| Spliterator<P_IN> spliterator) { |
| if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { |
| return opEvaluateParallel(helper, spliterator, Nodes.castingArray()) |
| .spliterator(); |
| } |
| else { |
| return new UnorderedWhileSpliterator.OfRef.Dropping<>( |
| helper.wrapSpliterator(spliterator), false, predicate); |
| } |
| } |
| |
| @Override |
| <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, |
| Spliterator<P_IN> spliterator, |
| IntFunction<T[]> generator) { |
| return new DropWhileTask<>(this, helper, spliterator, generator) |
| .invoke(); |
| } |
| |
| @Override |
| Sink<T> opWrapSink(int flags, Sink<T> sink) { |
| return opWrapSink(sink, false); |
| } |
| |
| public DropWhileSink<T> opWrapSink(Sink<T> sink, boolean retainAndCountDroppedElements) { |
| class OpSink extends Sink.ChainedReference<T, T> implements DropWhileSink<T> { |
| long dropCount; |
| boolean take; |
| |
| OpSink() { |
| super(sink); |
| } |
| |
| @Override |
| public void accept(T t) { |
| boolean takeElement = take || (take = !predicate.test(t)); |
| |
| // If ordered and element is dropped increment index |
| // for possible future truncation |
| if (retainAndCountDroppedElements && !takeElement) |
| dropCount++; |
| |
| // If ordered need to process element, otherwise |
| // skip if element is dropped |
| if (retainAndCountDroppedElements || takeElement) |
| downstream.accept(t); |
| } |
| |
| @Override |
| public long getDropCount() { |
| return dropCount; |
| } |
| } |
| return new OpSink(); |
| } |
| } |
| return new Op(upstream, StreamShape.REFERENCE, DROP_FLAGS); |
| } |
| |
| /** |
| * Appends a "dropWhile" operation to the provided IntStream. |
| * |
| * @param upstream a reference stream with element type T |
| * @param predicate the predicate that returns false to halt dropping. |
| */ |
| static IntStream makeDropWhileInt(AbstractPipeline<?, Integer, ?> upstream, |
| IntPredicate predicate) { |
| Objects.requireNonNull(predicate); |
| class Op extends IntPipeline.StatefulOp<Integer> implements DropWhileOp<Integer> { |
| public Op(AbstractPipeline<?, Integer, ?> upstream, StreamShape inputShape, int opFlags) { |
| super(upstream, inputShape, opFlags); |
| } |
| |
| @Override |
| <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper, |
| Spliterator<P_IN> spliterator) { |
| if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { |
| return opEvaluateParallel(helper, spliterator, Integer[]::new) |
| .spliterator(); |
| } |
| else { |
| return new UnorderedWhileSpliterator.OfInt.Dropping( |
| (Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate); |
| } |
| } |
| |
| @Override |
| <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, |
| Spliterator<P_IN> spliterator, |
| IntFunction<Integer[]> generator) { |
| return new DropWhileTask<>(this, helper, spliterator, generator) |
| .invoke(); |
| } |
| |
| @Override |
| Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { |
| return opWrapSink(sink, false); |
| } |
| |
| public DropWhileSink<Integer> opWrapSink(Sink<Integer> sink, boolean retainAndCountDroppedElements) { |
| class OpSink extends Sink.ChainedInt<Integer> implements DropWhileSink<Integer> { |
| long dropCount; |
| boolean take; |
| |
| OpSink() { |
| super(sink); |
| } |
| |
| @Override |
| public void accept(int t) { |
| boolean takeElement = take || (take = !predicate.test(t)); |
| |
| // If ordered and element is dropped increment index |
| // for possible future truncation |
| if (retainAndCountDroppedElements && !takeElement) |
| dropCount++; |
| |
| // If ordered need to process element, otherwise |
| // skip if element is dropped |
| if (retainAndCountDroppedElements || takeElement) |
| downstream.accept(t); |
| } |
| |
| @Override |
| public long getDropCount() { |
| return dropCount; |
| } |
| } |
| return new OpSink(); |
| } |
| } |
| return new Op(upstream, StreamShape.INT_VALUE, DROP_FLAGS); |
| } |
| |
| /** |
| * Appends a "dropWhile" operation to the provided LongStream. |
| * |
| * @param upstream a reference stream with element type T |
| * @param predicate the predicate that returns false to halt dropping. |
| */ |
| static LongStream makeDropWhileLong(AbstractPipeline<?, Long, ?> upstream, |
| LongPredicate predicate) { |
| Objects.requireNonNull(predicate); |
| class Op extends LongPipeline.StatefulOp<Long> implements DropWhileOp<Long> { |
| public Op(AbstractPipeline<?, Long, ?> upstream, StreamShape inputShape, int opFlags) { |
| super(upstream, inputShape, opFlags); |
| } |
| |
| @Override |
| <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper, |
| Spliterator<P_IN> spliterator) { |
| if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { |
| return opEvaluateParallel(helper, spliterator, Long[]::new) |
| .spliterator(); |
| } |
| else { |
| return new UnorderedWhileSpliterator.OfLong.Dropping( |
| (Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate); |
| } |
| } |
| |
| @Override |
| <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, |
| Spliterator<P_IN> spliterator, |
| IntFunction<Long[]> generator) { |
| return new DropWhileTask<>(this, helper, spliterator, generator) |
| .invoke(); |
| } |
| |
| @Override |
| Sink<Long> opWrapSink(int flags, Sink<Long> sink) { |
| return opWrapSink(sink, false); |
| } |
| |
| public DropWhileSink<Long> opWrapSink(Sink<Long> sink, boolean retainAndCountDroppedElements) { |
| class OpSink extends Sink.ChainedLong<Long> implements DropWhileSink<Long> { |
| long dropCount; |
| boolean take; |
| |
| OpSink() { |
| super(sink); |
| } |
| |
| @Override |
| public void accept(long t) { |
| boolean takeElement = take || (take = !predicate.test(t)); |
| |
| // If ordered and element is dropped increment index |
| // for possible future truncation |
| if (retainAndCountDroppedElements && !takeElement) |
| dropCount++; |
| |
| // If ordered need to process element, otherwise |
| // skip if element is dropped |
| if (retainAndCountDroppedElements || takeElement) |
| downstream.accept(t); |
| } |
| |
| @Override |
| public long getDropCount() { |
| return dropCount; |
| } |
| } |
| return new OpSink(); |
| } |
| } |
| return new Op(upstream, StreamShape.LONG_VALUE, DROP_FLAGS); |
| } |
| |
| /** |
| * Appends a "dropWhile" operation to the provided DoubleStream. |
| * |
| * @param upstream a reference stream with element type T |
| * @param predicate the predicate that returns false to halt dropping. |
| */ |
| static DoubleStream makeDropWhileDouble(AbstractPipeline<?, Double, ?> upstream, |
| DoublePredicate predicate) { |
| Objects.requireNonNull(predicate); |
| class Op extends DoublePipeline.StatefulOp<Double> implements DropWhileOp<Double> { |
| public Op(AbstractPipeline<?, Double, ?> upstream, StreamShape inputShape, int opFlags) { |
| super(upstream, inputShape, opFlags); |
| } |
| |
| @Override |
| <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper, |
| Spliterator<P_IN> spliterator) { |
| if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { |
| return opEvaluateParallel(helper, spliterator, Double[]::new) |
| .spliterator(); |
| } |
| else { |
| return new UnorderedWhileSpliterator.OfDouble.Dropping( |
| (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate); |
| } |
| } |
| |
| @Override |
| <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, |
| Spliterator<P_IN> spliterator, |
| IntFunction<Double[]> generator) { |
| return new DropWhileTask<>(this, helper, spliterator, generator) |
| .invoke(); |
| } |
| |
| @Override |
| Sink<Double> opWrapSink(int flags, Sink<Double> sink) { |
| return opWrapSink(sink, false); |
| } |
| |
| public DropWhileSink<Double> opWrapSink(Sink<Double> sink, boolean retainAndCountDroppedElements) { |
| class OpSink extends Sink.ChainedDouble<Double> implements DropWhileSink<Double> { |
| long dropCount; |
| boolean take; |
| |
| OpSink() { |
| super(sink); |
| } |
| |
| @Override |
| public void accept(double t) { |
| boolean takeElement = take || (take = !predicate.test(t)); |
| |
| // If ordered and element is dropped increment index |
| // for possible future truncation |
| if (retainAndCountDroppedElements && !takeElement) |
| dropCount++; |
| |
| // If ordered need to process element, otherwise |
| // skip if element is dropped |
| if (retainAndCountDroppedElements || takeElement) |
| downstream.accept(t); |
| } |
| |
| @Override |
| public long getDropCount() { |
| return dropCount; |
| } |
| } |
| return new OpSink(); |
| } |
| } |
| return new Op(upstream, StreamShape.DOUBLE_VALUE, DROP_FLAGS); |
| } |
| |
| // |
| |
| /** |
| * A spliterator supporting takeWhile and dropWhile operations over an |
| * underlying spliterator whose covered elements have no encounter order. |
| * <p> |
| * Concrete subclasses of this spliterator support reference and primitive |
| * types for takeWhile and dropWhile. |
| * <p> |
| * For the takeWhile operation if during traversal taking completes then |
| * taking is cancelled globally for the splitting and traversal of all |
| * related spliterators. |
| * Cancellation is governed by a shared {@link AtomicBoolean} instance. A |
| * spliterator in the process of taking when cancellation occurs will also |
| * be cancelled but not necessarily immediately. To reduce contention on |
| * the {@link AtomicBoolean} instance, cancellation make be acted on after |
| * a small number of additional elements have been traversed. |
| * <p> |
| * For the dropWhile operation if during traversal dropping completes for |
| * some, but not all elements, then it is cancelled globally for the |
| * traversal of all related spliterators (splitting is not cancelled). |
| * Cancellation is governed in the same manner as for the takeWhile |
| * operation. |
| * |
| * @param <T> the type of elements returned by this spliterator |
| * @param <T_SPLITR> the type of the spliterator |
| */ |
| abstract static class UnorderedWhileSpliterator<T, T_SPLITR extends Spliterator<T>> implements Spliterator<T> { |
| // Power of two constant minus one used for modulus of count |
| static final int CANCEL_CHECK_COUNT = (1 << 6) - 1; |
| |
| // The underlying spliterator |
| final T_SPLITR s; |
| // True if no splitting should be performed, if true then |
| // this spliterator may be used for an underlying spliterator whose |
| // covered elements have an encounter order |
| // See use in stream take/dropWhile default default methods |
| final boolean noSplitting; |
| // True when operations are cancelled for all related spliterators |
| // For taking, spliterators cannot split or traversed |
| // For dropping, spliterators cannot be traversed |
| final AtomicBoolean cancel; |
| // True while taking or dropping should be performed when traversing |
| boolean takeOrDrop = true; |
| // The count of elements traversed |
| int count; |
| |
| UnorderedWhileSpliterator(T_SPLITR s, boolean noSplitting) { |
| this.s = s; |
| this.noSplitting = noSplitting; |
| this.cancel = new AtomicBoolean(); |
| } |
| |
| UnorderedWhileSpliterator(T_SPLITR s, UnorderedWhileSpliterator<T, T_SPLITR> parent) { |
| this.s = s; |
| this.noSplitting = parent.noSplitting; |
| this.cancel = parent.cancel; |
| } |
| |
| @Override |
| public long estimateSize() { |
| return s.estimateSize(); |
| } |
| |
| @Override |
| public int characteristics() { |
| // Size is not known |
| return s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED); |
| } |
| |
| @Override |
| public long getExactSizeIfKnown() { |
| return -1L; |
| } |
| |
| @Override |
| public Comparator<? super T> getComparator() { |
| return s.getComparator(); |
| } |
| |
| @Override |
| public T_SPLITR trySplit() { |
| @SuppressWarnings("unchecked") |
| T_SPLITR ls = noSplitting ? null : (T_SPLITR) s.trySplit(); |
| return ls != null ? makeSpliterator(ls) : null; |
| } |
| |
| boolean checkCancelOnCount() { |
| return count != 0 || !cancel.get(); |
| } |
| |
| abstract T_SPLITR makeSpliterator(T_SPLITR s); |
| |
| abstract static class OfRef<T> extends UnorderedWhileSpliterator<T, Spliterator<T>> implements Consumer<T> { |
| final Predicate<? super T> p; |
| T t; |
| |
| OfRef(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) { |
| super(s, noSplitting); |
| this.p = p; |
| } |
| |
| OfRef(Spliterator<T> s, OfRef<T> parent) { |
| super(s, parent); |
| this.p = parent.p; |
| } |
| |
| @Override |
| public void accept(T t) { |
| count = (count + 1) & CANCEL_CHECK_COUNT; |
| this.t = t; |
| } |
| |
| static final class Taking<T> extends OfRef<T> { |
| Taking(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) { |
| super(s, noSplitting, p); |
| } |
| |
| Taking(Spliterator<T> s, Taking<T> parent) { |
| super(s, parent); |
| } |
| |
| @Override |
| public boolean tryAdvance(Consumer<? super T> action) { |
| boolean test = true; |
| if (takeOrDrop && // If can take |
| checkCancelOnCount() && // and if not cancelled |
| s.tryAdvance(this) && // and if advanced one element |
| (test = p.test(t))) { // and test on element passes |
| action.accept(t); // then accept element |
| return true; |
| } |
| else { |
| // Taking is finished |
| takeOrDrop = false; |
| // Cancel all further traversal and splitting operations |
| // only if test of element failed (short-circuited) |
| if (!test) |
| cancel.set(true); |
| return false; |
| } |
| } |
| |
| @Override |
| public Spliterator<T> trySplit() { |
| // Do not split if all operations are cancelled |
| return cancel.get() ? null : super.trySplit(); |
| } |
| |
| @Override |
| Spliterator<T> makeSpliterator(Spliterator<T> s) { |
| return new Taking<>(s, this); |
| } |
| } |
| |
| static final class Dropping<T> extends OfRef<T> { |
| Dropping(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) { |
| super(s, noSplitting, p); |
| } |
| |
| Dropping(Spliterator<T> s, Dropping<T> parent) { |
| super(s, parent); |
| } |
| |
| @Override |
| public boolean tryAdvance(Consumer<? super T> action) { |
| if (takeOrDrop) { |
| takeOrDrop = false; |
| boolean adv; |
| boolean dropped = false; |
| while ((adv = s.tryAdvance(this)) && // If advanced one element |
| checkCancelOnCount() && // and if not cancelled |
| p.test(t)) { // and test on element passes |
| dropped = true; // then drop element |
| } |
| |
| // Report advanced element, if any |
| if (adv) { |
| // Cancel all further dropping if one or more elements |
| // were previously dropped |
| if (dropped) |
| cancel.set(true); |
| action.accept(t); |
| } |
| return adv; |
| } |
| else { |
| return s.tryAdvance(action); |
| } |
| } |
| |
| @Override |
| Spliterator<T> makeSpliterator(Spliterator<T> s) { |
| return new Dropping<>(s, this); |
| } |
| } |
| } |
| |
| abstract static class OfInt extends UnorderedWhileSpliterator<Integer, Spliterator.OfInt> implements IntConsumer, Spliterator.OfInt { |
| final IntPredicate p; |
| int t; |
| |
| OfInt(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) { |
| super(s, noSplitting); |
| this.p = p; |
| } |
| |
| OfInt(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) { |
| super(s, parent); |
| this.p = parent.p; |
| } |
| |
| @Override |
| public void accept(int t) { |
| count = (count + 1) & CANCEL_CHECK_COUNT; |
| this.t = t; |
| } |
| |
| static final class Taking extends UnorderedWhileSpliterator.OfInt { |
| Taking(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) { |
| super(s, noSplitting, p); |
| } |
| |
| Taking(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) { |
| super(s, parent); |
| } |
| |
| @Override |
| public boolean tryAdvance(IntConsumer action) { |
| boolean test = true; |
| if (takeOrDrop && // If can take |
| checkCancelOnCount() && // and if not cancelled |
| s.tryAdvance(this) && // and if advanced one element |
| (test = p.test(t))) { // and test on element passes |
| action.accept(t); // then accept element |
| return true; |
| } |
| else { |
| // Taking is finished |
| takeOrDrop = false; |
| // Cancel all further traversal and splitting operations |
| // only if test of element failed (short-circuited) |
| if (!test) |
| cancel.set(true); |
| return false; |
| } |
| } |
| |
| @Override |
| public Spliterator.OfInt trySplit() { |
| // Do not split if all operations are cancelled |
| return cancel.get() ? null : super.trySplit(); |
| } |
| |
| @Override |
| Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) { |
| return new Taking(s, this); |
| } |
| } |
| |
| static final class Dropping extends UnorderedWhileSpliterator.OfInt { |
| Dropping(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) { |
| super(s, noSplitting, p); |
| } |
| |
| Dropping(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) { |
| super(s, parent); |
| } |
| |
| @Override |
| public boolean tryAdvance(IntConsumer action) { |
| if (takeOrDrop) { |
| takeOrDrop = false; |
| boolean adv; |
| boolean dropped = false; |
| while ((adv = s.tryAdvance(this)) && // If advanced one element |
| checkCancelOnCount() && // and if not cancelled |
| p.test(t)) { // and test on element passes |
| dropped = true; // then drop element |
| } |
| |
| // Report advanced element, if any |
| if (adv) { |
| // Cancel all further dropping if one or more elements |
| // were previously dropped |
| if (dropped) |
| cancel.set(true); |
| action.accept(t); |
| } |
| return adv; |
| } |
| else { |
| return s.tryAdvance(action); |
| } |
| } |
| |
| @Override |
| Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) { |
| return new Dropping(s, this); |
| } |
| } |
| } |
| |
| abstract static class OfLong extends UnorderedWhileSpliterator<Long, Spliterator.OfLong> implements LongConsumer, Spliterator.OfLong { |
| final LongPredicate p; |
| long t; |
| |
| OfLong(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) { |
| super(s, noSplitting); |
| this.p = p; |
| } |
| |
| OfLong(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) { |
| super(s, parent); |
| this.p = parent.p; |
| } |
| |
| @Override |
| public void accept(long t) { |
| count = (count + 1) & CANCEL_CHECK_COUNT; |
| this.t = t; |
| } |
| |
| static final class Taking extends UnorderedWhileSpliterator.OfLong { |
| Taking(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) { |
| super(s, noSplitting, p); |
| } |
| |
| Taking(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) { |
| super(s, parent); |
| } |
| |
| @Override |
| public boolean tryAdvance(LongConsumer action) { |
| boolean test = true; |
| if (takeOrDrop && // If can take |
| checkCancelOnCount() && // and if not cancelled |
| s.tryAdvance(this) && // and if advanced one element |
| (test = p.test(t))) { // and test on element passes |
| action.accept(t); // then accept element |
| return true; |
| } |
| else { |
| // Taking is finished |
| takeOrDrop = false; |
| // Cancel all further traversal and splitting operations |
| // only if test of element failed (short-circuited) |
| if (!test) |
| cancel.set(true); |
| return false; |
| } |
| } |
| |
| @Override |
| public Spliterator.OfLong trySplit() { |
| // Do not split if all operations are cancelled |
| return cancel.get() ? null : super.trySplit(); |
| } |
| |
| @Override |
| Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) { |
| return new Taking(s, this); |
| } |
| } |
| |
| static final class Dropping extends UnorderedWhileSpliterator.OfLong { |
| Dropping(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) { |
| super(s, noSplitting, p); |
| } |
| |
| Dropping(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) { |
| super(s, parent); |
| } |
| |
| @Override |
| public boolean tryAdvance(LongConsumer action) { |
| if (takeOrDrop) { |
| takeOrDrop = false; |
| boolean adv; |
| boolean dropped = false; |
| while ((adv = s.tryAdvance(this)) && // If advanced one element |
| checkCancelOnCount() && // and if not cancelled |
| p.test(t)) { // and test on element passes |
| dropped = true; // then drop element |
| } |
| |
| // Report advanced element, if any |
| if (adv) { |
| // Cancel all further dropping if one or more elements |
| // were previously dropped |
| if (dropped) |
| cancel.set(true); |
| action.accept(t); |
| } |
| return adv; |
| } |
| else { |
| return s.tryAdvance(action); |
| } |
| } |
| |
| @Override |
| Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) { |
| return new Dropping(s, this); |
| } |
| } |
| } |
| |
| abstract static class OfDouble extends UnorderedWhileSpliterator<Double, Spliterator.OfDouble> implements DoubleConsumer, Spliterator.OfDouble { |
| final DoublePredicate p; |
| double t; |
| |
| OfDouble(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) { |
| super(s, noSplitting); |
| this.p = p; |
| } |
| |
| OfDouble(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) { |
| super(s, parent); |
| this.p = parent.p; |
| } |
| |
| @Override |
| public void accept(double t) { |
| count = (count + 1) & CANCEL_CHECK_COUNT; |
| this.t = t; |
| } |
| |
| static final class Taking extends UnorderedWhileSpliterator.OfDouble { |
| Taking(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) { |
| super(s, noSplitting, p); |
| } |
| |
| Taking(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) { |
| super(s, parent); |
| } |
| |
| @Override |
| public boolean tryAdvance(DoubleConsumer action) { |
| boolean test = true; |
| if (takeOrDrop && // If can take |
| checkCancelOnCount() && // and if not cancelled |
| s.tryAdvance(this) && // and if advanced one element |
| (test = p.test(t))) { // and test on element passes |
| action.accept(t); // then accept element |
| return true; |
| } |
| else { |
| // Taking is finished |
| takeOrDrop = false; |
| // Cancel all further traversal and splitting operations |
| // only if test of element failed (short-circuited) |
| if (!test) |
| cancel.set(true); |
| return false; |
| } |
| } |
| |
| @Override |
| public Spliterator.OfDouble trySplit() { |
| // Do not split if all operations are cancelled |
| return cancel.get() ? null : super.trySplit(); |
| } |
| |
| @Override |
| Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) { |
| return new Taking(s, this); |
| } |
| } |
| |
| static final class Dropping extends UnorderedWhileSpliterator.OfDouble { |
| Dropping(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) { |
| super(s, noSplitting, p); |
| } |
| |
| Dropping(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) { |
| super(s, parent); |
| } |
| |
| @Override |
| public boolean tryAdvance(DoubleConsumer action) { |
| if (takeOrDrop) { |
| takeOrDrop = false; |
| boolean adv; |
| boolean dropped = false; |
| while ((adv = s.tryAdvance(this)) && // If advanced one element |
| checkCancelOnCount() && // and if not cancelled |
| p.test(t)) { // and test on element passes |
| dropped = true; // then drop element |
| } |
| |
| // Report advanced element, if any |
| if (adv) { |
| // Cancel all further dropping if one or more elements |
| // were previously dropped |
| if (dropped) |
| cancel.set(true); |
| action.accept(t); |
| } |
| return adv; |
| } |
| else { |
| return s.tryAdvance(action); |
| } |
| } |
| |
| @Override |
| Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) { |
| return new Dropping(s, this); |
| } |
| } |
| } |
| } |
| |
| |
| // |
| |
| /** |
| * {@code ForkJoinTask} implementing takeWhile computation. |
| * <p> |
| * If the pipeline has encounter order then all tasks to the right of |
| * a task where traversal was short-circuited are cancelled. |
| * The results of completed (and cancelled) tasks are discarded. |
| * The result of merging a short-circuited left task and right task (which |
| * may or may not be short-circuited) is that left task. |
| * <p> |
| * If the pipeline has no encounter order then all tasks to the right of |
| * a task where traversal was short-circuited are cancelled. |
| * The results of completed (and possibly cancelled) tasks are not |
| * discarded, as there is no need to throw away computed results. |
| * The result of merging does not change if a left task was |
| * short-circuited. |
| * No attempt is made, once a leaf task stopped taking, for it to cancel |
| * all other tasks, and further more, short-circuit the computation with its |
| * result. |
| * |
| * @param <P_IN> Input element type to the stream pipeline |
| * @param <P_OUT> Output element type from the stream pipeline |
| */ |
| @SuppressWarnings("serial") |
| private static final class TakeWhileTask<P_IN, P_OUT> |
| extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, TakeWhileTask<P_IN, P_OUT>> { |
| private final AbstractPipeline<P_OUT, P_OUT, ?> op; |
| private final IntFunction<P_OUT[]> generator; |
| private final boolean isOrdered; |
| private long thisNodeSize; |
| // True if a short-circuited |
| private boolean shortCircuited; |
| // True if completed, must be set after the local result |
| private volatile boolean completed; |
| |
| TakeWhileTask(AbstractPipeline<P_OUT, P_OUT, ?> op, |
| PipelineHelper<P_OUT> helper, |
| Spliterator<P_IN> spliterator, |
| IntFunction<P_OUT[]> generator) { |
| super(helper, spliterator); |
| this.op = op; |
| this.generator = generator; |
| this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags()); |
| } |
| |
| TakeWhileTask(TakeWhileTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) { |
| super(parent, spliterator); |
| this.op = parent.op; |
| this.generator = parent.generator; |
| this.isOrdered = parent.isOrdered; |
| } |
| |
| @Override |
| protected TakeWhileTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) { |
| return new TakeWhileTask<>(this, spliterator); |
| } |
| |
| @Override |
| protected final Node<P_OUT> getEmptyResult() { |
| return Nodes.emptyNode(op.getOutputShape()); |
| } |
| |
| @Override |
| protected final Node<P_OUT> doLeaf() { |
| Node.Builder<P_OUT> builder = helper.makeNodeBuilder(-1, generator); |
| Sink<P_OUT> s = op.opWrapSink(helper.getStreamAndOpFlags(), builder); |
| |
| if (shortCircuited = helper.copyIntoWithCancel(helper.wrapSink(s), spliterator)) { |
| // Cancel later nodes if the predicate returned false |
| // during traversal |
| cancelLaterNodes(); |
| } |
| |
| Node<P_OUT> node = builder.build(); |
| thisNodeSize = node.count(); |
| return node; |
| } |
| |
| @Override |
| public final void onCompletion(CountedCompleter<?> caller) { |
| if (!isLeaf()) { |
| Node<P_OUT> result; |
| shortCircuited = leftChild.shortCircuited | rightChild.shortCircuited; |
| if (isOrdered && canceled) { |
| thisNodeSize = 0; |
| result = getEmptyResult(); |
| } |
| else if (isOrdered && leftChild.shortCircuited) { |
| // If taking finished on the left node then |
| // use the left node result |
| thisNodeSize = leftChild.thisNodeSize; |
| result = leftChild.getLocalResult(); |
| } |
| else { |
| thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize; |
| result = merge(); |
| } |
| |
| setLocalResult(result); |
| } |
| |
| completed = true; |
| super.onCompletion(caller); |
| } |
| |
| Node<P_OUT> merge() { |
| if (leftChild.thisNodeSize == 0) { |
| // If the left node size is 0 then |
| // use the right node result |
| return rightChild.getLocalResult(); |
| } |
| else if (rightChild.thisNodeSize == 0) { |
| // If the right node size is 0 then |
| // use the left node result |
| return leftChild.getLocalResult(); |
| } |
| else { |
| // Combine the left and right nodes |
| return Nodes.conc(op.getOutputShape(), |
| leftChild.getLocalResult(), rightChild.getLocalResult()); |
| } |
| } |
| |
| @Override |
| protected void cancel() { |
| super.cancel(); |
| if (isOrdered && completed) |
| // If the task is completed then clear the result, if any |
| // to aid GC |
| setLocalResult(getEmptyResult()); |
| } |
| } |
| |
| /** |
| * {@code ForkJoinTask} implementing dropWhile computation. |
| * <p> |
| * If the pipeline has encounter order then each leaf task will not |
| * drop elements but will obtain a count of the elements that would have |
| * been otherwise dropped. That count is used as an index to track |
| * elements to be dropped. Merging will update the index so it corresponds |
| * to the index that is the end of the global prefix of elements to be |
| * dropped. The root is truncated according to that index. |
| * <p> |
| * If the pipeline has no encounter order then each leaf task will drop |
| * elements. Leaf tasks are ordinarily merged. No truncation of the root |
| * node is required. |
| * No attempt is made, once a leaf task stopped dropping, for it to cancel |
| * all other tasks, and further more, short-circuit the computation with |
| * its result. |
| * |
| * @param <P_IN> Input element type to the stream pipeline |
| * @param <P_OUT> Output element type from the stream pipeline |
| */ |
| @SuppressWarnings("serial") |
| private static final class DropWhileTask<P_IN, P_OUT> |
| extends AbstractTask<P_IN, P_OUT, Node<P_OUT>, DropWhileTask<P_IN, P_OUT>> { |
| private final AbstractPipeline<P_OUT, P_OUT, ?> op; |
| private final IntFunction<P_OUT[]> generator; |
| private final boolean isOrdered; |
| private long thisNodeSize; |
| // The index from which elements of the node should be taken |
| // i.e. the node should be truncated from [takeIndex, thisNodeSize) |
| // Equivalent to the count of dropped elements |
| private long index; |
| |
| DropWhileTask(AbstractPipeline<P_OUT, P_OUT, ?> op, |
| PipelineHelper<P_OUT> helper, |
| Spliterator<P_IN> spliterator, |
| IntFunction<P_OUT[]> generator) { |
| super(helper, spliterator); |
| assert op instanceof DropWhileOp; |
| this.op = op; |
| this.generator = generator; |
| this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags()); |
| } |
| |
| DropWhileTask(DropWhileTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) { |
| super(parent, spliterator); |
| this.op = parent.op; |
| this.generator = parent.generator; |
| this.isOrdered = parent.isOrdered; |
| } |
| |
| @Override |
| protected DropWhileTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) { |
| return new DropWhileTask<>(this, spliterator); |
| } |
| |
| @Override |
| protected final Node<P_OUT> doLeaf() { |
| boolean isChild = !isRoot(); |
| // If this not the root and pipeline is ordered and size is known |
| // then pre-size the builder |
| long sizeIfKnown = isChild && isOrdered && StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags) |
| ? op.exactOutputSizeIfKnown(spliterator) |
| : -1; |
| Node.Builder<P_OUT> builder = helper.makeNodeBuilder(sizeIfKnown, generator); |
| @SuppressWarnings("unchecked") |
| DropWhileOp<P_OUT> dropOp = (DropWhileOp<P_OUT>) op; |
| // If this leaf is the root then there is no merging on completion |
| // and there is no need to retain dropped elements |
| DropWhileSink<P_OUT> s = dropOp.opWrapSink(builder, isOrdered && isChild); |
| helper.wrapAndCopyInto(s, spliterator); |
| |
| Node<P_OUT> node = builder.build(); |
| thisNodeSize = node.count(); |
| index = s.getDropCount(); |
| return node; |
| } |
| |
| @Override |
| public final void onCompletion(CountedCompleter<?> caller) { |
| if (!isLeaf()) { |
| if (isOrdered) { |
| index = leftChild.index; |
| // If a contiguous sequence of dropped elements |
| // include those of the right node, if any |
| if (index == leftChild.thisNodeSize) |
| index += rightChild.index; |
| } |
| |
| thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize; |
| Node<P_OUT> result = merge(); |
| setLocalResult(isRoot() ? doTruncate(result) : result); |
| } |
| |
| super.onCompletion(caller); |
| } |
| |
| private Node<P_OUT> merge() { |
| if (leftChild.thisNodeSize == 0) { |
| // If the left node size is 0 then |
| // use the right node result |
| return rightChild.getLocalResult(); |
| } |
| else if (rightChild.thisNodeSize == 0) { |
| // If the right node size is 0 then |
| // use the left node result |
| return leftChild.getLocalResult(); |
| } |
| else { |
| // Combine the left and right nodes |
| return Nodes.conc(op.getOutputShape(), |
| leftChild.getLocalResult(), rightChild.getLocalResult()); |
| } |
| } |
| |
| private Node<P_OUT> doTruncate(Node<P_OUT> input) { |
| return isOrdered |
| ? input.truncate(index, input.count(), generator) |
| : input; |
| } |
| } |
| } |