blob: 8f5f4964f414796b15f0731dea8ec281e5a858e4 [file] [log] [blame]
/*
* Copyright 2014 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
/**
* A {@link ReadableBuffer} that is composed of 0 or more {@link ReadableBuffer}s. This provides a
* facade that allows multiple buffers to be treated as one.
*
* <p>When a buffer is added to a composite, its life cycle is controlled by the composite. Once
* the composite has read past the end of a given buffer, that buffer is automatically closed and
* removed from the composite.
*/
public class CompositeReadableBuffer extends AbstractReadableBuffer {

  /** Running total of the unread bytes held by all buffers currently in {@link #buffers}. */
  private int readableBytes;

  /** Component buffers in read order; the head of the queue is the one currently being read. */
  private final Queue<ReadableBuffer> buffers = new ArrayDeque<ReadableBuffer>();

  /**
   * Adds a new {@link ReadableBuffer} at the end of the buffer list. After a buffer is added, it is
   * expected that this {@code CompositeBuffer} has complete ownership. Any attempt to modify the
   * buffer (i.e. modifying the readable bytes) may result in corruption of the internal state of
   * this {@code CompositeBuffer}.
   *
   * <p>If {@code buffer} is itself a {@code CompositeReadableBuffer}, its component buffers are
   * moved into this composite (so composites are never nested) and the emptied source is closed.
   *
   * @param buffer the buffer to append
   */
  public void addBuffer(ReadableBuffer buffer) {
    if (!(buffer instanceof CompositeReadableBuffer)) {
      buffers.add(buffer);
      readableBytes += buffer.readableBytes();
      return;
    }

    // Flatten: steal the components instead of nesting one composite inside another.
    CompositeReadableBuffer compositeBuffer = (CompositeReadableBuffer) buffer;
    while (!compositeBuffer.buffers.isEmpty()) {
      buffers.add(compositeBuffer.buffers.remove());
    }
    readableBytes += compositeBuffer.readableBytes;
    compositeBuffer.readableBytes = 0;
    compositeBuffer.close();
  }

  /** Returns the number of unread bytes remaining across all component buffers. */
  @Override
  public int readableBytes() {
    return readableBytes;
  }

  /** Reads one unsigned byte from the first component buffer that still has data. */
  @Override
  public int readUnsignedByte() {
    ReadOperation op = new ReadOperation() {
      @Override
      int readInternal(ReadableBuffer buffer, int length) {
        return buffer.readUnsignedByte();
      }
    };
    execute(op, 1);
    return op.value;
  }

  /** Skips {@code length} bytes, crossing component buffer boundaries as needed. */
  @Override
  public void skipBytes(int length) {
    execute(new ReadOperation() {
      @Override
      public int readInternal(ReadableBuffer buffer, int length) {
        buffer.skipBytes(length);
        return 0;
      }
    }, length);
  }

  /**
   * Copies {@code length} bytes into {@code dest}, starting at index {@code destOffset}.
   *
   * @param dest the array to fill
   * @param destOffset index in {@code dest} at which the first byte is written
   * @param length number of bytes to copy
   */
  @Override
  public void readBytes(final byte[] dest, final int destOffset, int length) {
    execute(new ReadOperation() {
      // Next write position in dest; advances as each component contributes its share.
      int currentOffset = destOffset;

      @Override
      public int readInternal(ReadableBuffer buffer, int length) {
        buffer.readBytes(dest, currentOffset, length);
        currentOffset += length;
        return 0;
      }
    }, length);
  }

  /**
   * Fills the remaining space of {@code dest} with bytes from this buffer.
   *
   * @param dest the destination; {@code dest.remaining()} bytes are requested
   */
  @Override
  public void readBytes(final ByteBuffer dest) {
    execute(new ReadOperation() {
      @Override
      public int readInternal(ReadableBuffer buffer, int length) {
        // Narrow the limit so that only `length` bytes can be written by this component.
        int prevLimit = dest.limit();
        dest.limit(dest.position() + length);
        try {
          buffer.readBytes(dest);
        } finally {
          // Always give the caller its original limit back, even if the component read fails.
          dest.limit(prevLimit);
        }
        return 0;
      }
    }, dest.remaining());
  }

  /**
   * Writes {@code length} bytes from this buffer to {@code dest}.
   *
   * @param dest the stream to write to
   * @param length number of bytes to transfer
   * @throws IOException if a component buffer fails while writing to {@code dest}
   */
  @Override
  public void readBytes(final OutputStream dest, int length) throws IOException {
    ReadOperation op = new ReadOperation() {
      @Override
      public int readInternal(ReadableBuffer buffer, int length) throws IOException {
        buffer.readBytes(dest, length);
        return 0;
      }
    };
    execute(op, length);
    // execute() captures the IOException instead of propagating it; rethrow it here.
    if (op.isError()) {
      throw op.ex;
    }
  }

  /**
   * Splits off the next {@code length} bytes into a new composite buffer. Component buffers that
   * are consumed entirely are moved into the result; a component that is only partially consumed
   * contributes the result of its own {@code readBytes(length)}.
   *
   * @param length number of bytes to move into the returned buffer
   * @return a new composite holding the requested bytes
   */
  @Override
  public CompositeReadableBuffer readBytes(int length) {
    checkReadable(length);
    CompositeReadableBuffer newBuffer = new CompositeReadableBuffer();
    while (length > 0) {
      ReadableBuffer buffer = buffers.peek();
      int available = buffer.readableBytes();
      int taken;
      if (available > length) {
        // Only part of the head buffer is needed; it stays queued with the remainder.
        newBuffer.addBuffer(buffer.readBytes(length));
        taken = length;
      } else {
        // The whole head buffer is consumed; hand it over without copying.
        newBuffer.addBuffer(buffers.poll());
        taken = available;
      }
      // Account per piece so the counter always matches what actually left this composite
      // (a non-positive length therefore leaves the counter untouched).
      length -= taken;
      readableBytes -= taken;
    }
    return newBuffer;
  }

  /** Closes and removes every component buffer, leaving this composite empty. */
  @Override
  public void close() {
    while (!buffers.isEmpty()) {
      buffers.remove().close();
    }
    // Keep the counter in step with the (now empty) queue.
    readableBytes = 0;
  }

  /**
   * Executes the given {@link ReadOperation} against the {@link ReadableBuffer}s required to
   * satisfy the requested {@code length}.
   *
   * <p>If the operation records an {@link IOException}, processing stops immediately and the
   * caller is expected to inspect {@link ReadOperation#isError()}.
   */
  private void execute(ReadOperation op, int length) {
    checkReadable(length);

    // Make sure the operation is never handed an exhausted buffer while data remains behind it.
    advanceBufferIfNecessary();

    while (length > 0 && !buffers.isEmpty()) {
      ReadableBuffer buffer = buffers.peek();
      int available = buffer.readableBytes();
      int lengthToCopy = Math.min(length, available);

      // Perform the read operation for this buffer.
      op.read(buffer, lengthToCopy);
      if (op.isError()) {
        // The failing buffer may already have consumed some bytes; reconcile the counter with
        // what the buffer itself reports so later reads see a consistent state.
        readableBytes -= available - buffer.readableBytes();
        return;
      }

      length -= lengthToCopy;
      readableBytes -= lengthToCopy;
      advanceBufferIfNecessary();
    }

    if (length > 0) {
      // Should never get here.
      throw new AssertionError("Failed executing read operation");
    }
  }

  /**
   * Removes and closes every exhausted buffer at the head of the queue, stopping at the first
   * buffer that still has readable bytes (or when the queue is empty).
   */
  private void advanceBufferIfNecessary() {
    while (!buffers.isEmpty() && buffers.peek().readableBytes() == 0) {
      buffers.remove().close();
    }
  }

  /**
   * A simple read operation to perform on a single {@link ReadableBuffer}. All state management for
   * the buffers is done by {@link CompositeReadableBuffer#execute(ReadOperation, int)}.
   */
  private abstract static class ReadOperation {
    /**
     * Only used by {@link CompositeReadableBuffer#readUnsignedByte()}.
     */
    int value;

    /**
     * Only used by {@link CompositeReadableBuffer#readBytes(OutputStream, int)}.
     */
    IOException ex;

    /**
     * Runs {@link #readInternal} and stores its result in {@link #value}; an {@link IOException}
     * is captured in {@link #ex} rather than propagated.
     */
    final void read(ReadableBuffer buffer, int length) {
      try {
        value = readInternal(buffer, length);
      } catch (IOException e) {
        ex = e;
      }
    }

    /** Returns {@code true} once an {@link IOException} has been captured. */
    final boolean isError() {
      return ex != null;
    }

    /**
     * Performs the operation on one component buffer.
     *
     * @param buffer the component buffer to read from
     * @param length number of bytes this component should contribute
     * @return a value to expose through {@link #value}
     */
    abstract int readInternal(ReadableBuffer buffer, int length) throws IOException;
  }
}