blob: 592b609cad57f60c582e64484ed90f2c10fc7371 [file] [log] [blame]
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.IntFunction;
/**
* Factory methods for transforming streams into sorted streams.
*
* @since 1.8
*/
final class SortedOps {
private SortedOps() { }
/**
* Appends a "sorted" operation to the provided stream.
*
* @param <T> the type of both input and output elements
* @param upstream a reference stream with element type T
*/
static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
return new OfRef<>(upstream);
}
/**
* Appends a "sorted" operation to the provided stream.
*
* @param <T> the type of both input and output elements
* @param upstream a reference stream with element type T
* @param comparator the comparator to order elements by
*/
static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
Comparator<? super T> comparator) {
return new OfRef<>(upstream, comparator);
}
/**
* Appends a "sorted" operation to the provided stream.
*
* @param <T> the type of both input and output elements
* @param upstream a reference stream with element type T
*/
static <T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) {
return new OfInt(upstream);
}
/**
* Appends a "sorted" operation to the provided stream.
*
* @param <T> the type of both input and output elements
* @param upstream a reference stream with element type T
*/
static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) {
return new OfLong(upstream);
}
/**
* Appends a "sorted" operation to the provided stream.
*
* @param <T> the type of both input and output elements
* @param upstream a reference stream with element type T
*/
static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) {
return new OfDouble(upstream);
}
/**
* Specialized subtype for sorting reference streams
*/
private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
/**
* Comparator used for sorting
*/
private final boolean isNaturalSort;
private final Comparator<? super T> comparator;
/**
* Sort using natural order of {@literal <T>} which must be
* {@code Comparable}.
*/
OfRef(AbstractPipeline<?, T, ?> upstream) {
super(upstream, StreamShape.REFERENCE,
StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
this.isNaturalSort = true;
// Will throw CCE when we try to sort if T is not Comparable
@SuppressWarnings("unchecked")
Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
this.comparator = comp;
}
/**
* Sort using the provided comparator.
*
* @param comparator The comparator to be used to evaluate ordering.
*/
OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
super(upstream, StreamShape.REFERENCE,
StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
this.isNaturalSort = false;
this.comparator = Objects.requireNonNull(comparator);
}
@Override
public Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
// If the input is already naturally sorted and this operation
// also naturally sorted then this is a no-op
if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
return sink;
else if (StreamOpFlag.SIZED.isKnown(flags))
return new SizedRefSortingSink<>(sink, comparator);
else
return new RefSortingSink<>(sink, comparator);
}
@Override
public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator,
IntFunction<T[]> generator) {
// If the input is already naturally sorted and this operation
// naturally sorts then collect the output
if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
return helper.evaluate(spliterator, false, generator);
}
else {
// @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
Arrays.parallelSort(flattenedData, comparator);
return Nodes.node(flattenedData);
}
}
}
/**
* Specialized subtype for sorting int streams.
*/
private static final class OfInt extends IntPipeline.StatefulOp<Integer> {
OfInt(AbstractPipeline<?, Integer, ?> upstream) {
super(upstream, StreamShape.INT_VALUE,
StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
}
@Override
public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
Objects.requireNonNull(sink);
if (StreamOpFlag.SORTED.isKnown(flags))
return sink;
else if (StreamOpFlag.SIZED.isKnown(flags))
return new SizedIntSortingSink(sink);
else
return new IntSortingSink(sink);
}
@Override
public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
Spliterator<P_IN> spliterator,
IntFunction<Integer[]> generator) {
if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
return helper.evaluate(spliterator, false, generator);
}
else {
Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);
int[] content = n.asPrimitiveArray();
Arrays.parallelSort(content);
return Nodes.node(content);
}
}
}
/**
* Specialized subtype for sorting long streams.
*/
private static final class OfLong extends LongPipeline.StatefulOp<Long> {
OfLong(AbstractPipeline<?, Long, ?> upstream) {
super(upstream, StreamShape.LONG_VALUE,
StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
}
@Override
public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
Objects.requireNonNull(sink);
if (StreamOpFlag.SORTED.isKnown(flags))
return sink;
else if (StreamOpFlag.SIZED.isKnown(flags))
return new SizedLongSortingSink(sink);
else
return new LongSortingSink(sink);
}
@Override
public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
Spliterator<P_IN> spliterator,
IntFunction<Long[]> generator) {
if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
return helper.evaluate(spliterator, false, generator);
}
else {
Node.OfLong n = (Node.OfLong) helper.evaluate(spliterator, true, generator);
long[] content = n.asPrimitiveArray();
Arrays.parallelSort(content);
return Nodes.node(content);
}
}
}
/**
* Specialized subtype for sorting double streams.
*/
private static final class OfDouble extends DoublePipeline.StatefulOp<Double> {
OfDouble(AbstractPipeline<?, Double, ?> upstream) {
super(upstream, StreamShape.DOUBLE_VALUE,
StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
}
@Override
public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
Objects.requireNonNull(sink);
if (StreamOpFlag.SORTED.isKnown(flags))
return sink;
else if (StreamOpFlag.SIZED.isKnown(flags))
return new SizedDoubleSortingSink(sink);
else
return new DoubleSortingSink(sink);
}
@Override
public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
Spliterator<P_IN> spliterator,
IntFunction<Double[]> generator) {
if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
return helper.evaluate(spliterator, false, generator);
}
else {
Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator);
double[] content = n.asPrimitiveArray();
Arrays.parallelSort(content);
return Nodes.node(content);
}
}
}
/**
* Abstract {@link Sink} for implementing sort on reference streams.
*
* <p>
* Note: documentation below applies to reference and all primitive sinks.
* <p>
* Sorting sinks first accept all elements, buffering then into an array
* or a re-sizable data structure, if the size of the pipeline is known or
* unknown respectively. At the end of the sink protocol those elements are
* sorted and then pushed downstream.
* This class records if {@link #cancellationRequested} is called. If so it
* can be inferred that the source pushing source elements into the pipeline
* knows that the pipeline is short-circuiting. In such cases sub-classes
* pushing elements downstream will preserve the short-circuiting protocol
* by calling {@code downstream.cancellationRequested()} and checking the
* result is {@code false} before an element is pushed.
* <p>
* Note that the above behaviour is an optimization for sorting with
* sequential streams. It is not an error that more elements, than strictly
* required to produce a result, may flow through the pipeline. This can
* occur, in general (not restricted to just sorting), for short-circuiting
* parallel pipelines.
*/
private static abstract class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> {
protected final Comparator<? super T> comparator;
// @@@ could be a lazy final value, if/when support is added
protected boolean cancellationWasRequested;
AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
super(downstream);
this.comparator = comparator;
}
/**
* Records is cancellation is requested so short-circuiting behaviour
* can be preserved when the sorted elements are pushed downstream.
*
* @return false, as this sink never short-circuits.
*/
@Override
public final boolean cancellationRequested() {
cancellationWasRequested = true;
return false;
}
}
/**
* {@link Sink} for implementing sort on SIZED reference streams.
*/
private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
private T[] array;
private int offset;
SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
super(sink, comparator);
}
@Override
@SuppressWarnings("unchecked")
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
array = (T[]) new Object[(int) size];
}
@Override
public void end() {
Arrays.sort(array, 0, offset, comparator);
downstream.begin(offset);
if (!cancellationWasRequested) {
for (int i = 0; i < offset; i++)
downstream.accept(array[i]);
}
else {
for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
downstream.accept(array[i]);
}
downstream.end();
array = null;
}
@Override
public void accept(T t) {
array[offset++] = t;
}
}
/**
* {@link Sink} for implementing sort on reference streams.
*/
private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
private ArrayList<T> list;
RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
super(sink, comparator);
}
@Override
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
}
@Override
public void end() {
list.sort(comparator);
downstream.begin(list.size());
if (!cancellationWasRequested) {
list.forEach(downstream::accept);
}
else {
for (T t : list) {
if (downstream.cancellationRequested()) break;
downstream.accept(t);
}
}
downstream.end();
list = null;
}
@Override
public void accept(T t) {
list.add(t);
}
}
/**
* Abstract {@link Sink} for implementing sort on int streams.
*/
private static abstract class AbstractIntSortingSink extends Sink.ChainedInt<Integer> {
protected boolean cancellationWasRequested;
AbstractIntSortingSink(Sink<? super Integer> downstream) {
super(downstream);
}
@Override
public final boolean cancellationRequested() {
cancellationWasRequested = true;
return false;
}
}
/**
* {@link Sink} for implementing sort on SIZED int streams.
*/
private static final class SizedIntSortingSink extends AbstractIntSortingSink {
private int[] array;
private int offset;
SizedIntSortingSink(Sink<? super Integer> downstream) {
super(downstream);
}
@Override
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
array = new int[(int) size];
}
@Override
public void end() {
Arrays.sort(array, 0, offset);
downstream.begin(offset);
if (!cancellationWasRequested) {
for (int i = 0; i < offset; i++)
downstream.accept(array[i]);
}
else {
for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
downstream.accept(array[i]);
}
downstream.end();
array = null;
}
@Override
public void accept(int t) {
array[offset++] = t;
}
}
/**
* {@link Sink} for implementing sort on int streams.
*/
private static final class IntSortingSink extends AbstractIntSortingSink {
private SpinedBuffer.OfInt b;
IntSortingSink(Sink<? super Integer> sink) {
super(sink);
}
@Override
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt();
}
@Override
public void end() {
int[] ints = b.asPrimitiveArray();
Arrays.sort(ints);
downstream.begin(ints.length);
if (!cancellationWasRequested) {
for (int anInt : ints)
downstream.accept(anInt);
}
else {
for (int anInt : ints) {
if (downstream.cancellationRequested()) break;
downstream.accept(anInt);
}
}
downstream.end();
}
@Override
public void accept(int t) {
b.accept(t);
}
}
/**
* Abstract {@link Sink} for implementing sort on long streams.
*/
private static abstract class AbstractLongSortingSink extends Sink.ChainedLong<Long> {
protected boolean cancellationWasRequested;
AbstractLongSortingSink(Sink<? super Long> downstream) {
super(downstream);
}
@Override
public final boolean cancellationRequested() {
cancellationWasRequested = true;
return false;
}
}
/**
* {@link Sink} for implementing sort on SIZED long streams.
*/
private static final class SizedLongSortingSink extends AbstractLongSortingSink {
private long[] array;
private int offset;
SizedLongSortingSink(Sink<? super Long> downstream) {
super(downstream);
}
@Override
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
array = new long[(int) size];
}
@Override
public void end() {
Arrays.sort(array, 0, offset);
downstream.begin(offset);
if (!cancellationWasRequested) {
for (int i = 0; i < offset; i++)
downstream.accept(array[i]);
}
else {
for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
downstream.accept(array[i]);
}
downstream.end();
array = null;
}
@Override
public void accept(long t) {
array[offset++] = t;
}
}
/**
* {@link Sink} for implementing sort on long streams.
*/
private static final class LongSortingSink extends AbstractLongSortingSink {
private SpinedBuffer.OfLong b;
LongSortingSink(Sink<? super Long> sink) {
super(sink);
}
@Override
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong();
}
@Override
public void end() {
long[] longs = b.asPrimitiveArray();
Arrays.sort(longs);
downstream.begin(longs.length);
if (!cancellationWasRequested) {
for (long aLong : longs)
downstream.accept(aLong);
}
else {
for (long aLong : longs) {
if (downstream.cancellationRequested()) break;
downstream.accept(aLong);
}
}
downstream.end();
}
@Override
public void accept(long t) {
b.accept(t);
}
}
/**
* Abstract {@link Sink} for implementing sort on long streams.
*/
private static abstract class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> {
protected boolean cancellationWasRequested;
AbstractDoubleSortingSink(Sink<? super Double> downstream) {
super(downstream);
}
@Override
public final boolean cancellationRequested() {
cancellationWasRequested = true;
return false;
}
}
/**
* {@link Sink} for implementing sort on SIZED double streams.
*/
private static final class SizedDoubleSortingSink extends AbstractDoubleSortingSink {
private double[] array;
private int offset;
SizedDoubleSortingSink(Sink<? super Double> downstream) {
super(downstream);
}
@Override
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
array = new double[(int) size];
}
@Override
public void end() {
Arrays.sort(array, 0, offset);
downstream.begin(offset);
if (!cancellationWasRequested) {
for (int i = 0; i < offset; i++)
downstream.accept(array[i]);
}
else {
for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
downstream.accept(array[i]);
}
downstream.end();
array = null;
}
@Override
public void accept(double t) {
array[offset++] = t;
}
}
/**
* {@link Sink} for implementing sort on double streams.
*/
private static final class DoubleSortingSink extends AbstractDoubleSortingSink {
private SpinedBuffer.OfDouble b;
DoubleSortingSink(Sink<? super Double> sink) {
super(sink);
}
@Override
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble();
}
@Override
public void end() {
double[] doubles = b.asPrimitiveArray();
Arrays.sort(doubles);
downstream.begin(doubles.length);
if (!cancellationWasRequested) {
for (double aDouble : doubles)
downstream.accept(aDouble);
}
else {
for (double aDouble : doubles) {
if (downstream.cancellationRequested()) break;
downstream.accept(aDouble);
}
}
downstream.end();
}
@Override
public void accept(double t) {
b.accept(t);
}
}
}