blob: cabe68835284781bb1efb65a32a616e079152889 [file] [log] [blame]
/*
* Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.Comparator;
import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.DoublePredicate;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.IntPredicate;
import java.util.function.LongConsumer;
import java.util.function.LongPredicate;
import java.util.function.Predicate;
/**
* Factory for instances of takeWhile and dropWhile operations
* that produce subsequences of their input stream.
*
* @since 9
*/
final class WhileOps {
// Op flags passed to every takeWhile stage created by this class: the
// output is not sized and the operation is short-circuiting.
static final int TAKE_FLAGS = StreamOpFlag.NOT_SIZED | StreamOpFlag.IS_SHORT_CIRCUIT;
// Op flags passed to every dropWhile stage created by this class: the
// output is not sized (dropWhile is not flagged as short-circuiting).
static final int DROP_FLAGS = StreamOpFlag.NOT_SIZED;
/**
 * Appends a "takeWhile" operation to the provided Stream.
 * <p>
 * When evaluated lazily in parallel, an ordered pipeline is evaluated
 * eagerly through {@link TakeWhileTask} and the spliterator of the resulting
 * node is returned; an unordered pipeline is wrapped in an
 * {@link UnorderedWhileSpliterator.OfRef.Taking} spliterator instead.
 * <p>
 * The sink created by this operation latches: once the predicate has
 * returned {@code false} no later element is tested or passed downstream,
 * even if an upstream stage keeps pushing elements after cancellation has
 * been requested.
 *
 * @param <T> the type of both input and output elements
 * @param upstream a reference stream with element type T
 * @param predicate the predicate that returns false to halt taking.
 * @return the new stream stage
 * @throws NullPointerException if {@code predicate} is null
 */
static <T> Stream<T> makeTakeWhileRef(AbstractPipeline<?, T, ?> upstream,
                                      Predicate<? super T> predicate) {
    Objects.requireNonNull(predicate);
    return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE, TAKE_FLAGS) {
        @Override
        <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper,
                                                     Spliterator<P_IN> spliterator) {
            if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                // Ordered: evaluate fully and expose the resulting node
                return opEvaluateParallel(helper, spliterator, Nodes.castingArray())
                        .spliterator();
            }
            else {
                // Unordered: take lazily, splitting permitted
                return new UnorderedWhileSpliterator.OfRef.Taking<>(
                        helper.wrapSpliterator(spliterator), false, predicate);
            }
        }

        @Override
        <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                          Spliterator<P_IN> spliterator,
                                          IntFunction<T[]> generator) {
            return new TakeWhileTask<>(this, helper, spliterator, generator)
                    .invoke();
        }

        @Override
        Sink<T> opWrapSink(int flags, Sink<T> sink) {
            return new Sink.ChainedReference<T, T>(sink) {
                // True until the predicate first returns false
                boolean take = true;

                @Override
                public void begin(long size) {
                    // The number of taken elements cannot be derived
                    // from the upstream size
                    downstream.begin(-1);
                }

                @Override
                public void accept(T t) {
                    // Latch on the first failed test: never consult the
                    // predicate again nor resume taking, even if upstream
                    // ignores cancellationRequested() and keeps pushing
                    if (take && (take = predicate.test(t))) {
                        downstream.accept(t);
                    }
                }

                @Override
                public boolean cancellationRequested() {
                    return !take || downstream.cancellationRequested();
                }
            };
        }
    };
}
/**
 * Appends a "takeWhile" operation to the provided IntStream.
 * <p>
 * When evaluated lazily in parallel, an ordered pipeline is evaluated
 * eagerly through {@link TakeWhileTask} and the spliterator of the resulting
 * node is returned; an unordered pipeline is wrapped in an
 * {@link UnorderedWhileSpliterator.OfInt.Taking} spliterator instead.
 * <p>
 * The sink created by this operation latches: once the predicate has
 * returned {@code false} no later element is tested or passed downstream,
 * even if an upstream stage keeps pushing elements after cancellation has
 * been requested.
 *
 * @param upstream the upstream pipeline stage whose output elements are
 *        {@code int} values
 * @param predicate the predicate that returns false to halt taking.
 * @return the new stream stage
 * @throws NullPointerException if {@code predicate} is null
 */
static IntStream makeTakeWhileInt(AbstractPipeline<?, Integer, ?> upstream,
                                  IntPredicate predicate) {
    Objects.requireNonNull(predicate);
    return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE, TAKE_FLAGS) {
        @Override
        <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper,
                                                           Spliterator<P_IN> spliterator) {
            if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                // Ordered: evaluate fully and expose the resulting node
                return opEvaluateParallel(helper, spliterator, Integer[]::new)
                        .spliterator();
            }
            else {
                // Unordered: take lazily, splitting permitted
                return new UnorderedWhileSpliterator.OfInt.Taking(
                        (Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate);
            }
        }

        @Override
        <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
                                                Spliterator<P_IN> spliterator,
                                                IntFunction<Integer[]> generator) {
            return new TakeWhileTask<>(this, helper, spliterator, generator)
                    .invoke();
        }

        @Override
        Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
            return new Sink.ChainedInt<Integer>(sink) {
                // True until the predicate first returns false
                boolean take = true;

                @Override
                public void begin(long size) {
                    // The number of taken elements cannot be derived
                    // from the upstream size
                    downstream.begin(-1);
                }

                @Override
                public void accept(int t) {
                    // Latch on the first failed test: never consult the
                    // predicate again nor resume taking, even if upstream
                    // ignores cancellationRequested() and keeps pushing
                    if (take && (take = predicate.test(t))) {
                        downstream.accept(t);
                    }
                }

                @Override
                public boolean cancellationRequested() {
                    return !take || downstream.cancellationRequested();
                }
            };
        }
    };
}
/**
 * Appends a "takeWhile" operation to the provided LongStream.
 * <p>
 * When evaluated lazily in parallel, an ordered pipeline is evaluated
 * eagerly through {@link TakeWhileTask} and the spliterator of the resulting
 * node is returned; an unordered pipeline is wrapped in an
 * {@link UnorderedWhileSpliterator.OfLong.Taking} spliterator instead.
 * <p>
 * The sink created by this operation latches: once the predicate has
 * returned {@code false} no later element is tested or passed downstream,
 * even if an upstream stage keeps pushing elements after cancellation has
 * been requested.
 *
 * @param upstream the upstream pipeline stage whose output elements are
 *        {@code long} values
 * @param predicate the predicate that returns false to halt taking.
 * @return the new stream stage
 * @throws NullPointerException if {@code predicate} is null
 */
static LongStream makeTakeWhileLong(AbstractPipeline<?, Long, ?> upstream,
                                    LongPredicate predicate) {
    Objects.requireNonNull(predicate);
    return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE, TAKE_FLAGS) {
        @Override
        <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper,
                                                        Spliterator<P_IN> spliterator) {
            if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                // Ordered: evaluate fully and expose the resulting node
                return opEvaluateParallel(helper, spliterator, Long[]::new)
                        .spliterator();
            }
            else {
                // Unordered: take lazily, splitting permitted
                return new UnorderedWhileSpliterator.OfLong.Taking(
                        (Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate);
            }
        }

        @Override
        <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
                                             Spliterator<P_IN> spliterator,
                                             IntFunction<Long[]> generator) {
            return new TakeWhileTask<>(this, helper, spliterator, generator)
                    .invoke();
        }

        @Override
        Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
            return new Sink.ChainedLong<Long>(sink) {
                // True until the predicate first returns false
                boolean take = true;

                @Override
                public void begin(long size) {
                    // The number of taken elements cannot be derived
                    // from the upstream size
                    downstream.begin(-1);
                }

                @Override
                public void accept(long t) {
                    // Latch on the first failed test: never consult the
                    // predicate again nor resume taking, even if upstream
                    // ignores cancellationRequested() and keeps pushing
                    if (take && (take = predicate.test(t))) {
                        downstream.accept(t);
                    }
                }

                @Override
                public boolean cancellationRequested() {
                    return !take || downstream.cancellationRequested();
                }
            };
        }
    };
}
/**
 * Appends a "takeWhile" operation to the provided DoubleStream.
 * <p>
 * When evaluated lazily in parallel, an ordered pipeline is evaluated
 * eagerly through {@link TakeWhileTask} and the spliterator of the resulting
 * node is returned; an unordered pipeline is wrapped in an
 * {@link UnorderedWhileSpliterator.OfDouble.Taking} spliterator instead.
 * <p>
 * The sink created by this operation latches: once the predicate has
 * returned {@code false} no later element is tested or passed downstream,
 * even if an upstream stage keeps pushing elements after cancellation has
 * been requested.
 *
 * @param upstream the upstream pipeline stage whose output elements are
 *        {@code double} values
 * @param predicate the predicate that returns false to halt taking.
 * @return the new stream stage
 * @throws NullPointerException if {@code predicate} is null
 */
static DoubleStream makeTakeWhileDouble(AbstractPipeline<?, Double, ?> upstream,
                                        DoublePredicate predicate) {
    Objects.requireNonNull(predicate);
    return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE, TAKE_FLAGS) {
        @Override
        <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper,
                                                          Spliterator<P_IN> spliterator) {
            if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                // Ordered: evaluate fully and expose the resulting node
                return opEvaluateParallel(helper, spliterator, Double[]::new)
                        .spliterator();
            }
            else {
                // Unordered: take lazily, splitting permitted
                return new UnorderedWhileSpliterator.OfDouble.Taking(
                        (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate);
            }
        }

        @Override
        <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
                                               Spliterator<P_IN> spliterator,
                                               IntFunction<Double[]> generator) {
            return new TakeWhileTask<>(this, helper, spliterator, generator)
                    .invoke();
        }

        @Override
        Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
            return new Sink.ChainedDouble<Double>(sink) {
                // True until the predicate first returns false
                boolean take = true;

                @Override
                public void begin(long size) {
                    // The number of taken elements cannot be derived
                    // from the upstream size
                    downstream.begin(-1);
                }

                @Override
                public void accept(double t) {
                    // Latch on the first failed test: never consult the
                    // predicate again nor resume taking, even if upstream
                    // ignores cancellationRequested() and keeps pushing
                    if (take && (take = predicate.test(t))) {
                        downstream.accept(t);
                    }
                }

                @Override
                public boolean cancellationRequested() {
                    return !take || downstream.cancellationRequested();
                }
            };
        }
    };
}
/**
* A specialization for the dropWhile operation that controls if
* elements to be dropped are counted and passed downstream.
* <p>
* This specialization is utilized by the {@link TakeWhileTask} for
* pipelines that are ordered. In such cases elements cannot be dropped
* until all elements have been collected.
* <p>
* NOTE(review): the link above names {@code TakeWhileTask}, but in this file
* the implementations of this interface are the dropWhile stages, which pass
* themselves to {@code DropWhileTask}; {@code TakeWhileTask} only calls the
* regular two-argument {@code opWrapSink(int, Sink)}. The link presumably
* should read {@code DropWhileTask} -- confirm.
*
* @param <T> the type of both input and output elements
*/
interface DropWhileOp<T> {
/**
* Accepts a {@code Sink} which will receive the results of this
* dropWhile operation, and return a {@code DropWhileSink} which
* accepts elements and which performs the dropWhile operation passing
* the results to the provided {@code Sink}.
*
* @param sink sink to which elements should be sent after processing
* @param retainAndCountDroppedElements true if elements to be dropped
* are counted and passed to the sink, otherwise such elements
* are actually dropped and not passed to the sink.
* @return a dropWhile sink
*/
DropWhileSink<T> opWrapSink(Sink<T> sink, boolean retainAndCountDroppedElements);
}
/**
* A specialization for a dropWhile sink that additionally reports how many
* elements it counted as dropped.
*
* @param <T> the type of both input and output elements
*/
interface DropWhileSink<T> extends Sink<T> {
/**
* @return the count of elements that would have been dropped and
* instead were passed downstream.
*/
long getDropCount();
}
/**
 * Appends a "dropWhile" operation to the provided Stream.
 * <p>
 * The returned stage also implements {@link DropWhileOp}, so a caller can
 * request a sink that counts and forwards the elements it would otherwise
 * have dropped.
 *
 * @param <T> the type of both input and output elements
 * @param upstream a reference stream with element type T
 * @param predicate the predicate that returns false to halt dropping.
 * @return the new stream stage
 * @throws NullPointerException if {@code predicate} is null
 */
static <T> Stream<T> makeDropWhileRef(AbstractPipeline<?, T, ?> upstream,
                                      Predicate<? super T> predicate) {
    Objects.requireNonNull(predicate);

    class Op extends ReferencePipeline.StatefulOp<T, T> implements DropWhileOp<T> {
        public Op(AbstractPipeline<?, T, ?> source, StreamShape shape, int flags) {
            super(source, shape, flags);
        }

        @Override
        <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper,
                                                     Spliterator<P_IN> spliterator) {
            boolean ordered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags());
            if (!ordered) {
                // No encounter order: drop lazily, splitting permitted
                return new UnorderedWhileSpliterator.OfRef.Dropping<>(
                        helper.wrapSpliterator(spliterator), false, predicate);
            }
            // Encounter order: evaluate fully and expose the resulting node
            Node<T> evaluated = opEvaluateParallel(helper, spliterator, Nodes.castingArray());
            return evaluated.spliterator();
        }

        @Override
        <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                          Spliterator<P_IN> spliterator,
                                          IntFunction<T[]> generator) {
            DropWhileTask<P_IN, T> task = new DropWhileTask<>(this, helper, spliterator, generator);
            return task.invoke();
        }

        @Override
        Sink<T> opWrapSink(int flags, Sink<T> sink) {
            // Plain evaluation really drops the leading elements
            return opWrapSink(sink, false);
        }

        @Override
        public DropWhileSink<T> opWrapSink(Sink<T> sink, boolean retainAndCountDroppedElements) {
            class OpSink extends Sink.ChainedReference<T, T> implements DropWhileSink<T> {
                long dropCount;
                boolean take;

                OpSink() {
                    super(sink);
                }

                @Override
                public void accept(T t) {
                    // The predicate is consulted only while still dropping;
                    // the first failed test switches to taking for good
                    if (!take) {
                        take = !predicate.test(t);
                    }
                    if (take) {
                        downstream.accept(t);
                    }
                    else if (retainAndCountDroppedElements) {
                        // Count the element as dropped but still forward
                        // it, so that it can be truncated later
                        dropCount++;
                        downstream.accept(t);
                    }
                }

                @Override
                public long getDropCount() {
                    return dropCount;
                }
            }
            return new OpSink();
        }
    }

    return new Op(upstream, StreamShape.REFERENCE, DROP_FLAGS);
}
/**
 * Appends a "dropWhile" operation to the provided IntStream.
 * <p>
 * The returned stage also implements {@link DropWhileOp}, so a caller can
 * request a sink that counts and forwards the elements it would otherwise
 * have dropped.
 *
 * @param upstream the upstream pipeline stage whose output elements are
 *        {@code int} values
 * @param predicate the predicate that returns false to halt dropping.
 * @return the new stream stage
 * @throws NullPointerException if {@code predicate} is null
 */
static IntStream makeDropWhileInt(AbstractPipeline<?, Integer, ?> upstream,
                                  IntPredicate predicate) {
    Objects.requireNonNull(predicate);

    class Op extends IntPipeline.StatefulOp<Integer> implements DropWhileOp<Integer> {
        public Op(AbstractPipeline<?, Integer, ?> source, StreamShape shape, int flags) {
            super(source, shape, flags);
        }

        @Override
        <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper,
                                                           Spliterator<P_IN> spliterator) {
            boolean ordered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags());
            if (!ordered) {
                // No encounter order: drop lazily, splitting permitted
                return new UnorderedWhileSpliterator.OfInt.Dropping(
                        (Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate);
            }
            // Encounter order: evaluate fully and expose the resulting node
            Node<Integer> evaluated = opEvaluateParallel(helper, spliterator, Integer[]::new);
            return evaluated.spliterator();
        }

        @Override
        <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
                                                Spliterator<P_IN> spliterator,
                                                IntFunction<Integer[]> generator) {
            DropWhileTask<P_IN, Integer> task = new DropWhileTask<>(this, helper, spliterator, generator);
            return task.invoke();
        }

        @Override
        Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
            // Plain evaluation really drops the leading elements
            return opWrapSink(sink, false);
        }

        @Override
        public DropWhileSink<Integer> opWrapSink(Sink<Integer> sink, boolean retainAndCountDroppedElements) {
            class OpSink extends Sink.ChainedInt<Integer> implements DropWhileSink<Integer> {
                long dropCount;
                boolean take;

                OpSink() {
                    super(sink);
                }

                @Override
                public void accept(int t) {
                    // The predicate is consulted only while still dropping;
                    // the first failed test switches to taking for good
                    if (!take) {
                        take = !predicate.test(t);
                    }
                    if (take) {
                        downstream.accept(t);
                    }
                    else if (retainAndCountDroppedElements) {
                        // Count the element as dropped but still forward
                        // it, so that it can be truncated later
                        dropCount++;
                        downstream.accept(t);
                    }
                }

                @Override
                public long getDropCount() {
                    return dropCount;
                }
            }
            return new OpSink();
        }
    }

    return new Op(upstream, StreamShape.INT_VALUE, DROP_FLAGS);
}
/**
 * Appends a "dropWhile" operation to the provided LongStream.
 * <p>
 * The returned stage also implements {@link DropWhileOp}, so a caller can
 * request a sink that counts and forwards the elements it would otherwise
 * have dropped.
 *
 * @param upstream the upstream pipeline stage whose output elements are
 *        {@code long} values
 * @param predicate the predicate that returns false to halt dropping.
 * @return the new stream stage
 * @throws NullPointerException if {@code predicate} is null
 */
static LongStream makeDropWhileLong(AbstractPipeline<?, Long, ?> upstream,
                                    LongPredicate predicate) {
    Objects.requireNonNull(predicate);

    class Op extends LongPipeline.StatefulOp<Long> implements DropWhileOp<Long> {
        public Op(AbstractPipeline<?, Long, ?> source, StreamShape shape, int flags) {
            super(source, shape, flags);
        }

        @Override
        <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper,
                                                        Spliterator<P_IN> spliterator) {
            boolean ordered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags());
            if (!ordered) {
                // No encounter order: drop lazily, splitting permitted
                return new UnorderedWhileSpliterator.OfLong.Dropping(
                        (Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate);
            }
            // Encounter order: evaluate fully and expose the resulting node
            Node<Long> evaluated = opEvaluateParallel(helper, spliterator, Long[]::new);
            return evaluated.spliterator();
        }

        @Override
        <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
                                             Spliterator<P_IN> spliterator,
                                             IntFunction<Long[]> generator) {
            DropWhileTask<P_IN, Long> task = new DropWhileTask<>(this, helper, spliterator, generator);
            return task.invoke();
        }

        @Override
        Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
            // Plain evaluation really drops the leading elements
            return opWrapSink(sink, false);
        }

        @Override
        public DropWhileSink<Long> opWrapSink(Sink<Long> sink, boolean retainAndCountDroppedElements) {
            class OpSink extends Sink.ChainedLong<Long> implements DropWhileSink<Long> {
                long dropCount;
                boolean take;

                OpSink() {
                    super(sink);
                }

                @Override
                public void accept(long t) {
                    // The predicate is consulted only while still dropping;
                    // the first failed test switches to taking for good
                    if (!take) {
                        take = !predicate.test(t);
                    }
                    if (take) {
                        downstream.accept(t);
                    }
                    else if (retainAndCountDroppedElements) {
                        // Count the element as dropped but still forward
                        // it, so that it can be truncated later
                        dropCount++;
                        downstream.accept(t);
                    }
                }

                @Override
                public long getDropCount() {
                    return dropCount;
                }
            }
            return new OpSink();
        }
    }

    return new Op(upstream, StreamShape.LONG_VALUE, DROP_FLAGS);
}
/**
 * Appends a "dropWhile" operation to the provided DoubleStream.
 * <p>
 * The returned stage also implements {@link DropWhileOp}, so a caller can
 * request a sink that counts and forwards the elements it would otherwise
 * have dropped.
 *
 * @param upstream the upstream pipeline stage whose output elements are
 *        {@code double} values
 * @param predicate the predicate that returns false to halt dropping.
 * @return the new stream stage
 * @throws NullPointerException if {@code predicate} is null
 */
static DoubleStream makeDropWhileDouble(AbstractPipeline<?, Double, ?> upstream,
                                        DoublePredicate predicate) {
    Objects.requireNonNull(predicate);

    class Op extends DoublePipeline.StatefulOp<Double> implements DropWhileOp<Double> {
        public Op(AbstractPipeline<?, Double, ?> source, StreamShape shape, int flags) {
            super(source, shape, flags);
        }

        @Override
        <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper,
                                                          Spliterator<P_IN> spliterator) {
            boolean ordered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags());
            if (!ordered) {
                // No encounter order: drop lazily, splitting permitted
                return new UnorderedWhileSpliterator.OfDouble.Dropping(
                        (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate);
            }
            // Encounter order: evaluate fully and expose the resulting node
            Node<Double> evaluated = opEvaluateParallel(helper, spliterator, Double[]::new);
            return evaluated.spliterator();
        }

        @Override
        <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
                                               Spliterator<P_IN> spliterator,
                                               IntFunction<Double[]> generator) {
            DropWhileTask<P_IN, Double> task = new DropWhileTask<>(this, helper, spliterator, generator);
            return task.invoke();
        }

        @Override
        Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
            // Plain evaluation really drops the leading elements
            return opWrapSink(sink, false);
        }

        @Override
        public DropWhileSink<Double> opWrapSink(Sink<Double> sink, boolean retainAndCountDroppedElements) {
            class OpSink extends Sink.ChainedDouble<Double> implements DropWhileSink<Double> {
                long dropCount;
                boolean take;

                OpSink() {
                    super(sink);
                }

                @Override
                public void accept(double t) {
                    // The predicate is consulted only while still dropping;
                    // the first failed test switches to taking for good
                    if (!take) {
                        take = !predicate.test(t);
                    }
                    if (take) {
                        downstream.accept(t);
                    }
                    else if (retainAndCountDroppedElements) {
                        // Count the element as dropped but still forward
                        // it, so that it can be truncated later
                        dropCount++;
                        downstream.accept(t);
                    }
                }

                @Override
                public long getDropCount() {
                    return dropCount;
                }
            }
            return new OpSink();
        }
    }

    return new Op(upstream, StreamShape.DOUBLE_VALUE, DROP_FLAGS);
}
//
/**
* A spliterator supporting takeWhile and dropWhile operations over an
* underlying spliterator whose covered elements have no encounter order.
* <p>
* Concrete subclasses of this spliterator support reference and primitive
* types for takeWhile and dropWhile.
* <p>
* For the takeWhile operation if during traversal taking completes then
* taking is cancelled globally for the splitting and traversal of all
* related spliterators.
* Cancellation is governed by a shared {@link AtomicBoolean} instance. A
* spliterator in the process of taking when cancellation occurs will also
* be cancelled but not necessarily immediately. To reduce contention on
* the {@link AtomicBoolean} instance, cancellation may be acted on after
* a small number of additional elements have been traversed.
* <p>
* For the dropWhile operation if during traversal dropping completes for
* some, but not all elements, then it is cancelled globally for the
* traversal of all related spliterators (splitting is not cancelled).
* Cancellation is governed in the same manner as for the takeWhile
* operation.
*
* @param <T> the type of elements returned by this spliterator
* @param <T_SPLITR> the type of the spliterator
*/
abstract static class UnorderedWhileSpliterator<T, T_SPLITR extends Spliterator<T>> implements Spliterator<T> {
// Power of two constant minus one used for modulus of count
// (63, so the traversal count wraps to 0 every 64 elements)
static final int CANCEL_CHECK_COUNT = (1 << 6) - 1;
// The underlying spliterator
final T_SPLITR s;
// True if no splitting should be performed, if true then
// this spliterator may be used for an underlying spliterator whose
// covered elements have an encounter order
// See use in stream take/dropWhile default methods
final boolean noSplitting;
// True when operations are cancelled for all related spliterators
// For taking, spliterators cannot be split or traversed
// For dropping, spliterators cannot be traversed
// The same instance is shared by a spliterator and everything split from it
final AtomicBoolean cancel;
// True while taking or dropping should be performed when traversing
boolean takeOrDrop = true;
// The count of elements traversed, modulo (CANCEL_CHECK_COUNT + 1)
int count;
// Creates a root spliterator with its own, initially unset, cancel flag
UnorderedWhileSpliterator(T_SPLITR s, boolean noSplitting) {
this.s = s;
this.noSplitting = noSplitting;
this.cancel = new AtomicBoolean();
}
// Creates a spliterator over a split-off portion; it inherits the
// splitting policy and shares the cancel flag of its parent
UnorderedWhileSpliterator(T_SPLITR s, UnorderedWhileSpliterator<T, T_SPLITR> parent) {
this.s = s;
this.noSplitting = parent.noSplitting;
this.cancel = parent.cancel;
}
// Delegates to the underlying spliterator; this is only an upper bound
// since taking/dropping may remove elements
@Override
public long estimateSize() {
return s.estimateSize();
}
@Override
public int characteristics() {
// Size is not known
return s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED);
}
// Always -1, consistent with SIZED being masked out of characteristics()
@Override
public long getExactSizeIfKnown() {
return -1L;
}
@Override
public Comparator<? super T> getComparator() {
return s.getComparator();
}
// Splits the underlying spliterator, unless splitting is disabled, and
// wraps the split-off portion in a spliterator of the same kind
@Override
public T_SPLITR trySplit() {
@SuppressWarnings("unchecked")
T_SPLITR ls = noSplitting ? null : (T_SPLITR) s.trySplit();
return ls != null ? makeSpliterator(ls) : null;
}
// Returns true if traversal may proceed. The shared cancel flag is only
// read when count is 0, i.e. before the first element has been accepted
// and then each time count wraps around (every 64 accepted elements);
// in between, true is returned without touching the flag
boolean checkCancelOnCount() {
return count != 0 || !cancel.get();
}
// Wraps a split-off underlying spliterator in a new spliterator of the
// same concrete kind that shares this spliterator's state (see the
// parent-taking constructors)
abstract T_SPLITR makeSpliterator(T_SPLITR s);
// Reference specialization. The spliterator itself is the Consumer handed
// to the underlying spliterator: accept() stashes the advanced element in
// field t so that tryAdvance can test it
abstract static class OfRef<T> extends UnorderedWhileSpliterator<T, Spliterator<T>> implements Consumer<T> {
// The take/drop predicate
final Predicate<? super T> p;
// The element most recently advanced from the underlying spliterator
T t;
OfRef(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) {
super(s, noSplitting);
this.p = p;
}
OfRef(Spliterator<T> s, OfRef<T> parent) {
super(s, parent);
this.p = parent.p;
}
// Records the advanced element and bumps the wrapping traversal count
@Override
public void accept(T t) {
count = (count + 1) & CANCEL_CHECK_COUNT;
this.t = t;
}
// takeWhile over references
static final class Taking<T> extends OfRef<T> {
Taking(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) {
super(s, noSplitting, p);
}
Taking(Spliterator<T> s, Taking<T> parent) {
super(s, parent);
}
// Advances one element while taking. The conditions below are evaluated
// strictly left to right; 'test' stays true unless the predicate was
// actually evaluated and failed, which is what distinguishes "predicate
// failed" from "already finished / cancelled / exhausted"
@Override
public boolean tryAdvance(Consumer<? super T> action) {
boolean test = true;
if (takeOrDrop && // If can take
checkCancelOnCount() && // and if not cancelled
s.tryAdvance(this) && // and if advanced one element
(test = p.test(t))) { // and test on element passes
action.accept(t); // then accept element
return true;
}
else {
// Taking is finished
takeOrDrop = false;
// Cancel all further traversal and splitting operations
// only if test of element failed (short-circuited)
if (!test)
cancel.set(true);
return false;
}
}
@Override
public Spliterator<T> trySplit() {
// Do not split if all operations are cancelled
return cancel.get() ? null : super.trySplit();
}
@Override
Spliterator<T> makeSpliterator(Spliterator<T> s) {
return new Taking<>(s, this);
}
}
// dropWhile over references
static final class Dropping<T> extends OfRef<T> {
Dropping(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) {
super(s, noSplitting, p);
}
Dropping(Spliterator<T> s, Dropping<T> parent) {
super(s, parent);
}
// The first call performs all of the dropping for this spliterator: it
// skips elements while the predicate passes and cancellation has not
// been observed, then reports the first element that ended the loop.
// If the loop ended because cancellation was observed, that element is
// reported without being tested. Later calls delegate directly to the
// underlying spliterator
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (takeOrDrop) {
takeOrDrop = false;
boolean adv;
boolean dropped = false;
while ((adv = s.tryAdvance(this)) && // If advanced one element
checkCancelOnCount() && // and if not cancelled
p.test(t)) { // and test on element passes
dropped = true; // then drop element
}
// Report advanced element, if any
if (adv) {
// Cancel all further dropping if one or more elements
// were previously dropped
if (dropped)
cancel.set(true);
action.accept(t);
}
return adv;
}
else {
return s.tryAdvance(action);
}
}
@Override
Spliterator<T> makeSpliterator(Spliterator<T> s) {
return new Dropping<>(s, this);
}
}
}
// int specialization; same structure as OfRef with the spliterator acting
// as the IntConsumer that captures the advanced element
abstract static class OfInt extends UnorderedWhileSpliterator<Integer, Spliterator.OfInt> implements IntConsumer, Spliterator.OfInt {
// The take/drop predicate
final IntPredicate p;
// The element most recently advanced from the underlying spliterator
int t;
OfInt(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) {
super(s, noSplitting);
this.p = p;
}
OfInt(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) {
super(s, parent);
this.p = parent.p;
}
// Records the advanced element and bumps the wrapping traversal count
@Override
public void accept(int t) {
count = (count + 1) & CANCEL_CHECK_COUNT;
this.t = t;
}
// takeWhile over ints
static final class Taking extends UnorderedWhileSpliterator.OfInt {
Taking(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) {
super(s, noSplitting, p);
}
Taking(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) {
super(s, parent);
}
// Same logic as OfRef.Taking.tryAdvance: 'test' is false only if the
// predicate was evaluated and failed
@Override
public boolean tryAdvance(IntConsumer action) {
boolean test = true;
if (takeOrDrop && // If can take
checkCancelOnCount() && // and if not cancelled
s.tryAdvance(this) && // and if advanced one element
(test = p.test(t))) { // and test on element passes
action.accept(t); // then accept element
return true;
}
else {
// Taking is finished
takeOrDrop = false;
// Cancel all further traversal and splitting operations
// only if test of element failed (short-circuited)
if (!test)
cancel.set(true);
return false;
}
}
@Override
public Spliterator.OfInt trySplit() {
// Do not split if all operations are cancelled
return cancel.get() ? null : super.trySplit();
}
@Override
Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) {
return new Taking(s, this);
}
}
// dropWhile over ints
static final class Dropping extends UnorderedWhileSpliterator.OfInt {
Dropping(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) {
super(s, noSplitting, p);
}
Dropping(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) {
super(s, parent);
}
// Same logic as OfRef.Dropping.tryAdvance: all dropping happens on the
// first call, later calls delegate to the underlying spliterator
@Override
public boolean tryAdvance(IntConsumer action) {
if (takeOrDrop) {
takeOrDrop = false;
boolean adv;
boolean dropped = false;
while ((adv = s.tryAdvance(this)) && // If advanced one element
checkCancelOnCount() && // and if not cancelled
p.test(t)) { // and test on element passes
dropped = true; // then drop element
}
// Report advanced element, if any
if (adv) {
// Cancel all further dropping if one or more elements
// were previously dropped
if (dropped)
cancel.set(true);
action.accept(t);
}
return adv;
}
else {
return s.tryAdvance(action);
}
}
@Override
Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) {
return new Dropping(s, this);
}
}
}
// long specialization; same structure as OfRef with the spliterator acting
// as the LongConsumer that captures the advanced element
abstract static class OfLong extends UnorderedWhileSpliterator<Long, Spliterator.OfLong> implements LongConsumer, Spliterator.OfLong {
// The take/drop predicate
final LongPredicate p;
// The element most recently advanced from the underlying spliterator
long t;
OfLong(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
super(s, noSplitting);
this.p = p;
}
OfLong(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
super(s, parent);
this.p = parent.p;
}
// Records the advanced element and bumps the wrapping traversal count
@Override
public void accept(long t) {
count = (count + 1) & CANCEL_CHECK_COUNT;
this.t = t;
}
// takeWhile over longs
static final class Taking extends UnorderedWhileSpliterator.OfLong {
Taking(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
super(s, noSplitting, p);
}
Taking(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
super(s, parent);
}
// Same logic as OfRef.Taking.tryAdvance: 'test' is false only if the
// predicate was evaluated and failed
@Override
public boolean tryAdvance(LongConsumer action) {
boolean test = true;
if (takeOrDrop && // If can take
checkCancelOnCount() && // and if not cancelled
s.tryAdvance(this) && // and if advanced one element
(test = p.test(t))) { // and test on element passes
action.accept(t); // then accept element
return true;
}
else {
// Taking is finished
takeOrDrop = false;
// Cancel all further traversal and splitting operations
// only if test of element failed (short-circuited)
if (!test)
cancel.set(true);
return false;
}
}
@Override
public Spliterator.OfLong trySplit() {
// Do not split if all operations are cancelled
return cancel.get() ? null : super.trySplit();
}
@Override
Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) {
return new Taking(s, this);
}
}
// dropWhile over longs
static final class Dropping extends UnorderedWhileSpliterator.OfLong {
Dropping(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
super(s, noSplitting, p);
}
Dropping(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
super(s, parent);
}
// Same logic as OfRef.Dropping.tryAdvance: all dropping happens on the
// first call, later calls delegate to the underlying spliterator
@Override
public boolean tryAdvance(LongConsumer action) {
if (takeOrDrop) {
takeOrDrop = false;
boolean adv;
boolean dropped = false;
while ((adv = s.tryAdvance(this)) && // If advanced one element
checkCancelOnCount() && // and if not cancelled
p.test(t)) { // and test on element passes
dropped = true; // then drop element
}
// Report advanced element, if any
if (adv) {
// Cancel all further dropping if one or more elements
// were previously dropped
if (dropped)
cancel.set(true);
action.accept(t);
}
return adv;
}
else {
return s.tryAdvance(action);
}
}
@Override
Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) {
return new Dropping(s, this);
}
}
}
// double specialization; same structure as OfRef with the spliterator
// acting as the DoubleConsumer that captures the advanced element
abstract static class OfDouble extends UnorderedWhileSpliterator<Double, Spliterator.OfDouble> implements DoubleConsumer, Spliterator.OfDouble {
// The take/drop predicate
final DoublePredicate p;
// The element most recently advanced from the underlying spliterator
double t;
OfDouble(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
super(s, noSplitting);
this.p = p;
}
OfDouble(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
super(s, parent);
this.p = parent.p;
}
// Records the advanced element and bumps the wrapping traversal count
@Override
public void accept(double t) {
count = (count + 1) & CANCEL_CHECK_COUNT;
this.t = t;
}
// takeWhile over doubles
static final class Taking extends UnorderedWhileSpliterator.OfDouble {
Taking(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
super(s, noSplitting, p);
}
Taking(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
super(s, parent);
}
// Same logic as OfRef.Taking.tryAdvance: 'test' is false only if the
// predicate was evaluated and failed
@Override
public boolean tryAdvance(DoubleConsumer action) {
boolean test = true;
if (takeOrDrop && // If can take
checkCancelOnCount() && // and if not cancelled
s.tryAdvance(this) && // and if advanced one element
(test = p.test(t))) { // and test on element passes
action.accept(t); // then accept element
return true;
}
else {
// Taking is finished
takeOrDrop = false;
// Cancel all further traversal and splitting operations
// only if test of element failed (short-circuited)
if (!test)
cancel.set(true);
return false;
}
}
@Override
public Spliterator.OfDouble trySplit() {
// Do not split if all operations are cancelled
return cancel.get() ? null : super.trySplit();
}
@Override
Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) {
return new Taking(s, this);
}
}
// dropWhile over doubles
static final class Dropping extends UnorderedWhileSpliterator.OfDouble {
Dropping(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
super(s, noSplitting, p);
}
Dropping(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
super(s, parent);
}
// Same logic as OfRef.Dropping.tryAdvance: all dropping happens on the
// first call, later calls delegate to the underlying spliterator
@Override
public boolean tryAdvance(DoubleConsumer action) {
if (takeOrDrop) {
takeOrDrop = false;
boolean adv;
boolean dropped = false;
while ((adv = s.tryAdvance(this)) && // If advanced one element
checkCancelOnCount() && // and if not cancelled
p.test(t)) { // and test on element passes
dropped = true; // then drop element
}
// Report advanced element, if any
if (adv) {
// Cancel all further dropping if one or more elements
// were previously dropped
if (dropped)
cancel.set(true);
action.accept(t);
}
return adv;
}
else {
return s.tryAdvance(action);
}
}
@Override
Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) {
return new Dropping(s, this);
}
}
}
}
//
/**
 * {@code ForkJoinTask} implementing takeWhile computation.
 * <p>
 * Each leaf pushes its portion of the input through the takeWhile sink of
 * the operation into a node builder and, if traversal was short-circuited,
 * cancels the tasks for later portions of the input.
 * <p>
 * If the pipeline has encounter order, the result of a cancelled internal
 * task is discarded (replaced by the empty node), and the result of merging
 * a short-circuited left task with its right sibling (which may or may not
 * be short-circuited) is the left task's result alone.
 * <p>
 * If the pipeline has no encounter order, the results of completed (and
 * possibly cancelled) tasks are not discarded, as there is no need to throw
 * away computed results, and merging does not change if the left task was
 * short-circuited.
 * No attempt is made, once a leaf task stopped taking, for it to cancel all
 * other tasks and, furthermore, short-circuit the computation with its
 * result.
 *
 * @param <P_IN> Input element type to the stream pipeline
 * @param <P_OUT> Output element type from the stream pipeline
 */
@SuppressWarnings("serial")
private static final class TakeWhileTask<P_IN, P_OUT>
        extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, TakeWhileTask<P_IN, P_OUT>> {
    // The takeWhile stage whose sink performs the taking
    private final AbstractPipeline<P_OUT, P_OUT, ?> op;
    // Array factory handed to the node builder of each leaf
    private final IntFunction<P_OUT[]> generator;
    // Whether the pipeline is known to have an encounter order
    private final boolean isOrdered;

    // Element count of this task's result node
    private long thisNodeSize;
    // True if traversal was short-circuited in this task or, for an
    // internal task, in either of its children
    private boolean shortCircuited;
    // True if completed, must be set after the local result
    private volatile boolean completed;

    /** Creates the root task. */
    TakeWhileTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
                  PipelineHelper<P_OUT> helper,
                  Spliterator<P_IN> spliterator,
                  IntFunction<P_OUT[]> generator) {
        super(helper, spliterator);
        this.op = op;
        this.generator = generator;
        this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags());
    }

    /** Creates a child task that shares the configuration of its parent. */
    TakeWhileTask(TakeWhileTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
        super(parent, spliterator);
        this.op = parent.op;
        this.generator = parent.generator;
        this.isOrdered = parent.isOrdered;
    }

    @Override
    protected TakeWhileTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
        return new TakeWhileTask<>(this, spliterator);
    }

    @Override
    protected final Node<P_OUT> getEmptyResult() {
        return Nodes.emptyNode(op.getOutputShape());
    }

    @Override
    protected final Node<P_OUT> doLeaf() {
        Node.Builder<P_OUT> nodeBuilder = helper.makeNodeBuilder(-1, generator);
        Sink<P_OUT> takeSink = op.opWrapSink(helper.getStreamAndOpFlags(), nodeBuilder);

        shortCircuited = helper.copyIntoWithCancel(helper.wrapSink(takeSink), spliterator);
        if (shortCircuited) {
            // The predicate returned false during traversal, so
            // nothing from later nodes is needed
            cancelLaterNodes();
        }

        Node<P_OUT> leafResult = nodeBuilder.build();
        thisNodeSize = leafResult.count();
        return leafResult;
    }

    @Override
    public final void onCompletion(CountedCompleter<?> caller) {
        if (!isLeaf()) {
            shortCircuited = leftChild.shortCircuited | rightChild.shortCircuited;

            Node<P_OUT> combined;
            if (isOrdered && canceled) {
                // Ordered and cancelled: the result is thrown away
                thisNodeSize = 0;
                combined = getEmptyResult();
            }
            else if (!isOrdered || !leftChild.shortCircuited) {
                // Either unordered, or taking did not finish on the
                // left: both children contribute
                thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize;
                combined = merge();
            }
            else {
                // Ordered and taking finished on the left node:
                // only the left node result counts
                thisNodeSize = leftChild.thisNodeSize;
                combined = leftChild.getLocalResult();
            }
            setLocalResult(combined);
        }

        // Publish completion only after the local result is in place
        completed = true;
        super.onCompletion(caller);
    }

    /**
     * Combines the results of both children, avoiding a concatenation
     * node when one side is empty.
     */
    Node<P_OUT> merge() {
        if (leftChild.thisNodeSize == 0) {
            // Nothing on the left, the right result stands alone
            return rightChild.getLocalResult();
        }
        if (rightChild.thisNodeSize == 0) {
            // Nothing on the right, the left result stands alone
            return leftChild.getLocalResult();
        }
        // Both sides hold elements, concatenate them
        return Nodes.conc(op.getOutputShape(),
                          leftChild.getLocalResult(),
                          rightChild.getLocalResult());
    }

    @Override
    protected void cancel() {
        super.cancel();
        if (isOrdered && completed) {
            // An already completed task drops its result, if any,
            // to aid GC
            setLocalResult(getEmptyResult());
        }
    }
}
/**
 * {@code ForkJoinTask} implementing dropWhile computation.
 * <p>
 * If the pipeline has encounter order then a leaf task does not drop
 * elements; instead it obtains a count of the elements that would
 * otherwise have been dropped. That count serves as an index tracking the
 * elements to be dropped. Merging updates the index so that it corresponds
 * to the end of the global prefix of elements to be dropped, and the root
 * is truncated according to that index.
 * <p>
 * If the pipeline has no encounter order then each leaf task drops
 * elements and leaf tasks are merged in the ordinary way. No truncation
 * of the root node is required.
 * No attempt is made, once a leaf task stopped dropping, for it to cancel
 * all other tasks and, furthermore, short-circuit the computation with
 * its result.
 *
 * @param <P_IN> Input element type to the stream pipeline
 * @param <P_OUT> Output element type from the stream pipeline
 */
@SuppressWarnings("serial")
private static final class DropWhileTask<P_IN, P_OUT>
        extends AbstractTask<P_IN, P_OUT, Node<P_OUT>, DropWhileTask<P_IN, P_OUT>> {

    // The pipeline stage; asserted to be a DropWhileOp by the root constructor
    private final AbstractPipeline<P_OUT, P_OUT, ?> op;

    // Array factory passed to the node builder and to truncation
    private final IntFunction<P_OUT[]> generator;

    // True if the ORDERED flag is known for the helper's combined flags
    private final boolean isOrdered;

    // Element count of the node held as this task's result
    private long thisNodeSize;

    // The index from which elements of the node should be taken
    // i.e. the node should be truncated to [index, thisNodeSize)
    // Equivalent to the count of dropped elements
    private long index;

    /**
     * Creates the root task.
     *
     * @param op the pipeline stage, which must be a {@code DropWhileOp}
     * @param helper the pipeline helper
     * @param spliterator the source spliterator covered by this task
     * @param generator the array factory for node builders
     */
    DropWhileTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
                  PipelineHelper<P_OUT> helper,
                  Spliterator<P_IN> spliterator,
                  IntFunction<P_OUT[]> generator) {
        super(helper, spliterator);
        assert op instanceof DropWhileOp;
        this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags());
        this.generator = generator;
        this.op = op;
    }

    /**
     * Creates a child task that shares the configuration of its parent.
     *
     * @param parent the parent task
     * @param spliterator the portion of the source covered by the child
     */
    DropWhileTask(DropWhileTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
        super(parent, spliterator);
        this.isOrdered = parent.isOrdered;
        this.generator = parent.generator;
        this.op = parent.op;
    }

    @Override
    protected DropWhileTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
        return new DropWhileTask<>(this, spliterator);
    }

    /**
     * Copies this leaf's elements through a drop-while sink into a node
     * builder, then records the size of the built node and the sink's
     * drop count.
     */
    @Override
    protected final Node<P_OUT> doLeaf() {
        final boolean hasParent = !isRoot();

        // Pre-size the builder only for a non-root leaf of an ordered
        // pipeline whose size is known; otherwise the size is unknown (-1)
        long presize = -1;
        if (hasParent && isOrdered && StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags)) {
            presize = op.exactOutputSizeIfKnown(spliterator);
        }
        Node.Builder<P_OUT> nodeBuilder = helper.makeNodeBuilder(presize, generator);

        @SuppressWarnings("unchecked")
        DropWhileOp<P_OUT> dropOp = (DropWhileOp<P_OUT>) op;
        // A root leaf is never merged on completion, so it has no need
        // to retain the elements it drops
        DropWhileSink<P_OUT> dropSink = dropOp.opWrapSink(nodeBuilder, isOrdered && hasParent);
        helper.wrapAndCopyInto(dropSink, spliterator);

        Node<P_OUT> leafNode = nodeBuilder.build();
        thisNodeSize = leafNode.count();
        index = dropSink.getDropCount();
        return leafNode;
    }

    /**
     * For an internal task, merges the children's indices, sizes and
     * nodes, truncating the merged node when this task is the root; then
     * delegates to the superclass.
     */
    @Override
    public final void onCompletion(CountedCompleter<?> caller) {
        if (isLeaf()) {
            super.onCompletion(caller);
            return;
        }

        if (isOrdered) {
            long dropped = leftChild.index;
            // When the whole left node is a contiguous run of dropped
            // elements, the run continues into the right node, if any
            if (dropped == leftChild.thisNodeSize) {
                dropped += rightChild.index;
            }
            index = dropped;
        }

        thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize;
        Node<P_OUT> merged = merge();
        if (isRoot()) {
            setLocalResult(doTruncate(merged));
        } else {
            setLocalResult(merged);
        }
        super.onCompletion(caller);
    }

    /**
     * Returns the non-empty child's result when the other child's size is
     * 0, otherwise the concatenation of the left and right results.
     */
    private Node<P_OUT> merge() {
        if (leftChild.thisNodeSize == 0) {
            // Nothing on the left, the right result stands alone
            return rightChild.getLocalResult();
        }
        if (rightChild.thisNodeSize == 0) {
            // Nothing on the right, the left result stands alone
            return leftChild.getLocalResult();
        }
        // Both sides contribute
        return Nodes.conc(op.getOutputShape(),
                          leftChild.getLocalResult(), rightChild.getLocalResult());
    }

    /**
     * For an ordered pipeline returns {@code input} truncated to the range
     * from {@code index} to {@code input.count()}; otherwise returns
     * {@code input} unchanged.
     */
    private Node<P_OUT> doTruncate(Node<P_OUT> input) {
        if (!isOrdered) {
            return input;
        }
        return input.truncate(index, input.count(), generator);
    }
}
}