blob: 9baec34b1896a2abd5a597cd5435991a17297d16 [file] [log] [blame]
/*
* Copyright 2014 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.InvalidMarkException;
import java.util.ArrayDeque;
import java.util.Deque;
import javax.annotation.Nullable;
/**
* A {@link ReadableBuffer} that is composed of 0 or more {@link ReadableBuffer}s. This provides a
* facade that allows multiple buffers to be treated as one.
*
* <p>When a buffer is added to a composite, its life cycle is controlled by the composite. Once
* the composite has read past the end of a given buffer, that buffer is automatically closed and
* removed from the composite.
*/
public class CompositeReadableBuffer extends AbstractReadableBuffer {
private final Deque<ReadableBuffer> readableBuffers;
private Deque<ReadableBuffer> rewindableBuffers;
private int readableBytes;
private boolean marked;
public CompositeReadableBuffer(int initialCapacity) {
readableBuffers = new ArrayDeque<>(initialCapacity);
}
public CompositeReadableBuffer() {
readableBuffers = new ArrayDeque<>();
}
/**
* Adds a new {@link ReadableBuffer} at the end of the buffer list. After a buffer is added, it is
* expected that this {@code CompositeBuffer} has complete ownership. Any attempt to modify the
* buffer (i.e. modifying the readable bytes) may result in corruption of the internal state of
* this {@code CompositeBuffer}.
*/
public void addBuffer(ReadableBuffer buffer) {
boolean markHead = marked && readableBuffers.isEmpty();
enqueueBuffer(buffer);
if (markHead) {
readableBuffers.peek().mark();
}
}
private void enqueueBuffer(ReadableBuffer buffer) {
if (!(buffer instanceof CompositeReadableBuffer)) {
readableBuffers.add(buffer);
readableBytes += buffer.readableBytes();
return;
}
CompositeReadableBuffer compositeBuffer = (CompositeReadableBuffer) buffer;
while (!compositeBuffer.readableBuffers.isEmpty()) {
ReadableBuffer subBuffer = compositeBuffer.readableBuffers.remove();
readableBuffers.add(subBuffer);
}
readableBytes += compositeBuffer.readableBytes;
compositeBuffer.readableBytes = 0;
compositeBuffer.close();
}
@Override
public int readableBytes() {
return readableBytes;
}
private static final NoThrowReadOperation<Void> UBYTE_OP =
new NoThrowReadOperation<Void>() {
@Override
public int read(ReadableBuffer buffer, int length, Void unused, int value) {
return buffer.readUnsignedByte();
}
};
@Override
public int readUnsignedByte() {
return executeNoThrow(UBYTE_OP, 1, null, 0);
}
private static final NoThrowReadOperation<Void> SKIP_OP =
new NoThrowReadOperation<Void>() {
@Override
public int read(ReadableBuffer buffer, int length, Void unused, int unused2) {
buffer.skipBytes(length);
return 0;
}
};
@Override
public void skipBytes(int length) {
executeNoThrow(SKIP_OP, length, null, 0);
}
private static final NoThrowReadOperation<byte[]> BYTE_ARRAY_OP =
new NoThrowReadOperation<byte[]>() {
@Override
public int read(ReadableBuffer buffer, int length, byte[] dest, int offset) {
buffer.readBytes(dest, offset, length);
return offset + length;
}
};
@Override
public void readBytes(byte[] dest, int destOffset, int length) {
executeNoThrow(BYTE_ARRAY_OP, length, dest, destOffset);
}
private static final NoThrowReadOperation<ByteBuffer> BYTE_BUF_OP =
new NoThrowReadOperation<ByteBuffer>() {
@Override
public int read(ReadableBuffer buffer, int length, ByteBuffer dest, int unused) {
// Change the limit so that only lengthToCopy bytes are available.
int prevLimit = dest.limit();
((Buffer) dest).limit(dest.position() + length);
// Write the bytes and restore the original limit.
buffer.readBytes(dest);
((Buffer) dest).limit(prevLimit);
return 0;
}
};
@Override
public void readBytes(ByteBuffer dest) {
executeNoThrow(BYTE_BUF_OP, dest.remaining(), dest, 0);
}
private static final ReadOperation<OutputStream> STREAM_OP =
new ReadOperation<OutputStream>() {
@Override
public int read(ReadableBuffer buffer, int length, OutputStream dest, int unused)
throws IOException {
buffer.readBytes(dest, length);
return 0;
}
};
@Override
public void readBytes(OutputStream dest, int length) throws IOException {
execute(STREAM_OP, length, dest, 0);
}
@Override
public ReadableBuffer readBytes(int length) {
if (length <= 0) {
return ReadableBuffers.empty();
}
checkReadable(length);
readableBytes -= length;
ReadableBuffer newBuffer = null;
CompositeReadableBuffer newComposite = null;
do {
ReadableBuffer buffer = readableBuffers.peek();
int readable = buffer.readableBytes();
ReadableBuffer readBuffer;
if (readable > length) {
readBuffer = buffer.readBytes(length);
length = 0;
} else {
if (marked) {
readBuffer = buffer.readBytes(readable);
advanceBuffer();
} else {
readBuffer = readableBuffers.poll();
}
length -= readable;
}
if (newBuffer == null) {
newBuffer = readBuffer;
} else {
if (newComposite == null) {
newComposite = new CompositeReadableBuffer(
length == 0 ? 2 : Math.min(readableBuffers.size() + 2, 16));
newComposite.addBuffer(newBuffer);
newBuffer = newComposite;
}
newComposite.addBuffer(readBuffer);
}
} while (length > 0);
return newBuffer;
}
@Override
public boolean markSupported() {
for (ReadableBuffer buffer : readableBuffers) {
if (!buffer.markSupported()) {
return false;
}
}
return true;
}
@Override
public void mark() {
if (rewindableBuffers == null) {
rewindableBuffers = new ArrayDeque<>(Math.min(readableBuffers.size(), 16));
}
while (!rewindableBuffers.isEmpty()) {
rewindableBuffers.remove().close();
}
marked = true;
ReadableBuffer buffer = readableBuffers.peek();
if (buffer != null) {
buffer.mark();
}
}
@Override
public void reset() {
if (!marked) {
throw new InvalidMarkException();
}
ReadableBuffer buffer;
if ((buffer = readableBuffers.peek()) != null) {
int currentRemain = buffer.readableBytes();
buffer.reset();
readableBytes += (buffer.readableBytes() - currentRemain);
}
while ((buffer = rewindableBuffers.pollLast()) != null) {
buffer.reset();
readableBuffers.addFirst(buffer);
readableBytes += buffer.readableBytes();
}
}
@Override
public boolean byteBufferSupported() {
for (ReadableBuffer buffer : readableBuffers) {
if (!buffer.byteBufferSupported()) {
return false;
}
}
return true;
}
@Nullable
@Override
public ByteBuffer getByteBuffer() {
if (readableBuffers.isEmpty()) {
return null;
}
return readableBuffers.peek().getByteBuffer();
}
@Override
public void close() {
while (!readableBuffers.isEmpty()) {
readableBuffers.remove().close();
}
if (rewindableBuffers != null) {
while (!rewindableBuffers.isEmpty()) {
rewindableBuffers.remove().close();
}
}
}
/**
* Executes the given {@link ReadOperation} against the {@link ReadableBuffer}s required to
* satisfy the requested {@code length}.
*/
private <T> int execute(ReadOperation<T> op, int length, T dest, int value) throws IOException {
checkReadable(length);
if (!readableBuffers.isEmpty()) {
advanceBufferIfNecessary();
}
for (; length > 0 && !readableBuffers.isEmpty(); advanceBufferIfNecessary()) {
ReadableBuffer buffer = readableBuffers.peek();
int lengthToCopy = Math.min(length, buffer.readableBytes());
// Perform the read operation for this buffer.
value = op.read(buffer, lengthToCopy, dest, value);
length -= lengthToCopy;
readableBytes -= lengthToCopy;
}
if (length > 0) {
// Should never get here.
throw new AssertionError("Failed executing read operation");
}
return value;
}
private <T> int executeNoThrow(NoThrowReadOperation<T> op, int length, T dest, int value) {
try {
return execute(op, length, dest, value);
} catch (IOException e) {
throw new AssertionError(e); // shouldn't happen
}
}
/**
* If the current buffer is exhausted, removes and closes it.
*/
private void advanceBufferIfNecessary() {
ReadableBuffer buffer = readableBuffers.peek();
if (buffer.readableBytes() == 0) {
advanceBuffer();
}
}
/**
* Removes one buffer from the front and closes it.
*/
private void advanceBuffer() {
if (marked) {
rewindableBuffers.add(readableBuffers.remove());
ReadableBuffer next = readableBuffers.peek();
if (next != null) {
next.mark();
}
} else {
readableBuffers.remove().close();
}
}
/**
* A simple read operation to perform on a single {@link ReadableBuffer}.
* All state management for the buffers is done by
* {@link CompositeReadableBuffer#execute(ReadOperation, int, Object, int)}.
*/
private interface ReadOperation<T> {
/**
* This method can also be used to simultaneously perform operation-specific int-valued
* aggregation over the sequence of buffers in a {@link CompositeReadableBuffer}.
* {@code value} is the return value from the prior buffer, or the "initial" value passed
* to {@code execute()} in the case of the first buffer. {@code execute()} returns the value
* returned by the operation called on the last buffer.
*/
int read(ReadableBuffer buffer, int length, T dest, int value) throws IOException;
}
private interface NoThrowReadOperation<T> extends ReadOperation<T> {
@Override
int read(ReadableBuffer buffer, int length, T dest, int value);
}
}