// blob: eca5957bc0d5aa28b922bc60b57f2302d2e5503c [file] [log] [blame]
/*
* Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.incubator.http;
import java.io.IOException;
import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import jdk.incubator.http.internal.common.Utils;
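/**
* Implements chunked/fixed transfer encodings of HTTP/1.1 responses.
*
* Call getBodyParser() to obtain a parser for the response body. The parser
* consumes the received byte buffers and forwards the body data to the
* BodySubscriber. When the parser is done it invokes the completion handler
* given to getBodyParser(), passing null on success or the exception that
* occurred.
*/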
class ResponseContent {
// Passed to Utils.getDebugLogger when the nested parsers create their loggers.
static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
// The subscriber supplied to the constructor; the nested body parsers hand it
// the parsed body buffers (onNext) and signal completion to it (onComplete).
final HttpResponse.BodySubscriber<?> pusher;
// Body length as supplied to the constructor. contentChunked() only looks at
// the Transfer-Encoding header when this is -1. getBodyParser() passes this
// value to FixedLengthBodyParser when the content is not chunked.
final int contentLength;
// Response headers; consulted by contentChunked() for "Transfer-Encoding".
final HttpHeaders headers;
// this needs to run before we complete the body
// so that connection can be returned to pool
private final Runnable onFinished;
// Debug tag: the connection's dbgString() followed by "/ResponseContent".
// The nested parsers append their own suffix to it.
private final String dbgTag;
/**
 * Creates the response content handler for one HTTP/1.1 response.
 *
 * @param connection     the connection; only its dbgString() is used here,
 *                       to build the debug tag
 * @param contentLength  the body length, or -1 if it must be derived from
 *                       the Transfer-Encoding header
 * @param h              the response headers
 * @param userSubscriber the subscriber that receives the body
 * @param onFinished     callback run by the parsers right before the
 *                       subscriber is completed
 */
ResponseContent(HttpConnection connection,
                int contentLength,
                HttpHeaders h,
                HttpResponse.BodySubscriber<?> userSubscriber,
                Runnable onFinished)
{
    this.dbgTag = connection.dbgString() + "/ResponseContent";
    this.onFinished = onFinished;
    this.headers = h;
    this.contentLength = contentLength;
    this.pusher = userSubscriber;
}
// ASCII codes of line feed and carriage return, used by the chunk parser.
static final int LF = 10;
static final int CR = 13;
// chunkedContent caches the result of contentChunked(); it is only meaningful
// once chunkedContentInitialized has been set to true.
private boolean chunkedContent, chunkedContentInitialized;
/**
 * Tells whether the response body uses the chunked transfer encoding.
 * The answer is computed on the first successful call and cached.
 *
 * The Transfer-Encoding header is only examined when contentLength is -1;
 * with any other content length the body is treated as not chunked.
 *
 * @return true if the body is chunked
 * @throws IOException if a Transfer-Encoding other than "chunked"
 *         (compared ignoring case) is present
 */
boolean contentChunked() throws IOException {
    if (!chunkedContentInitialized) {
        if (contentLength == -1) {
            String encoding = headers.firstValue("Transfer-Encoding").orElse("");
            if (encoding.isEmpty()) {
                chunkedContent = false;
            } else if (encoding.equalsIgnoreCase("chunked")) {
                chunkedContent = true;
            } else {
                // Nothing is cached in this case: the next call fails again.
                throw new IOException("invalid content");
            }
        }
        chunkedContentInitialized = true;
    }
    return chunkedContent;
}
/**
* A parser for the response body. Received byte buffers are supplied
* through accept(ByteBuffer). The subscription is supplied through
* onSubscribe; both implementations in this file forward it to the
* BodySubscriber.
*/
interface BodyParser extends Consumer<ByteBuffer> {
void onSubscribe(AbstractSubscription sub);
}
/**
 * Returns a parser that will take care of parsing the received byte
 * buffers and forward them to the BodySubscriber.
 * A ChunkedBodyParser is returned if contentChunked() is true, a
 * FixedLengthBodyParser for contentLength otherwise.
 *
 * Note: revisit: it might be better to use a CompletableFuture than
 * a completion handler.
 *
 * @param onComplete called when the parser is done; the argument is null
 *        if parsing was successful, otherwise the exception that occurred
 * @return the body parser
 * @throws IOException if contentChunked() rejects the transfer encoding
 */
BodyParser getBodyParser(Consumer<Throwable> onComplete) throws IOException {
    return contentChunked()
            ? new ChunkedBodyParser(onComplete)
            : new FixedLengthBodyParser(contentLength, onComplete);
}
// States of ChunkedBodyParser: READING_LENGTH while the chunk-size line is
// being parsed (and while the CRLF after the final, empty chunk is consumed),
// READING_DATA while chunk data and its trailing CRLF are consumed, and DONE
// once the final chunk has been parsed.
static enum ChunkState {READING_LENGTH, READING_DATA, DONE}
class ChunkedBodyParser implements BodyParser {
// Sentinel returned by tryReadOneHunk when more input is needed.
final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER;
// Completion handler: called with null after the final chunk was delivered,
// or with the Throwable that interrupted parsing.
final Consumer<Throwable> onComplete;
final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser";
// Set when accept() caught a Throwable; later buffers are then ignored.
volatile Throwable closedExceptionally;
volatile int partialChunklen = 0; // partially read chunk len
volatile int chunklen = -1; // number of data bytes in chunk, -1 if not yet known
volatile int bytesremaining; // number of data bytes in chunk left to be read (the trailing CRLF is tracked by bytesToConsume)
volatile boolean cr = false; // tryReadChunkLen has found CR
volatile int bytesToConsume; // number of bytes that still need to be consumed before proceeding
volatile ChunkState state = ChunkState.READING_LENGTH; // current state
// Subscription received in onSubscribe; accept() decrements its demand
// each time buffers are pushed to the subscriber.
volatile AbstractSubscription sub;
// Creates a parser that reports its outcome to the given handler.
ChunkedBodyParser(Consumer<Throwable> onComplete) {
this.onComplete = onComplete;
}
// Returns the tag used by the debug logger.
String dbgString() {
return dbgTag;
}
/**
 * Records the subscription and forwards it to the BodySubscriber.
 *
 * @param sub the subscription whose demand is consumed by accept()
 */
@Override
public void onSubscribe(AbstractSubscription sub) {
    debug.log(Level.DEBUG, () -> "onSubscribe: "
              + pusher.getClass().getName());
    this.sub = sub;
    pusher.onSubscribe(sub);
}
/**
 * Parses one received buffer. All the hunks found in the buffer are
 * collected and pushed to the BodySubscriber in a single onNext call.
 * If the final chunk was parsed, onFinished is run, the subscriber is
 * completed and the completion handler is called.
 *
 * Any Throwable raised on the way is recorded in closedExceptionally and,
 * unless the subscriber was already completed, reported to the completion
 * handler. Once that happened further buffers are ignored.
 *
 * @param b the received bytes
 */
@Override
public void accept(ByteBuffer b) {
    if (closedExceptionally != null) {
        debug.log(Level.DEBUG, () -> "already closed: "
                  + closedExceptionally);
        return;
    }
    boolean completed = false;
    try {
        List<ByteBuffer> hunks = new ArrayList<>();
        boolean finalChunkParsed;
        // the buffer may contain several hunks, and therefore
        // we must loop while it's not exhausted.
        do {
            finalChunkParsed = tryPushOneHunk(b, hunks);
        } while (!finalChunkParsed && b.hasRemaining());

        if (!hunks.isEmpty()) {
            // push what we have.
            // only reduce demand if we actually push something.
            // we would not have come here if there was no
            // demand.
            boolean hasDemand = sub.demand().tryDecrement();
            assert hasDemand;
            pusher.onNext(hunks);
        }
        if (finalChunkParsed) {
            // We're done! The final chunk was parsed.
            debug.log(Level.DEBUG, () -> "done!");
            assert closedExceptionally == null;
            assert state == ChunkState.DONE;
            onFinished.run();
            pusher.onComplete();
            completed = true;
            onComplete.accept(closedExceptionally); // should be null
        }
        assert state == ChunkState.DONE || !b.hasRemaining();
    } catch (Throwable t) {
        closedExceptionally = t;
        if (!completed) {
            onComplete.accept(t);
        }
    }
}
/**
 * Parses the chunk-size line (hexadecimal digits followed by CR LF),
 * resuming from what previous calls have already parsed: the digits read
 * so far are accumulated in partialChunklen, and cr records that the CR
 * has been seen.
 *
 * On success the position of chunkbuf is the first byte after the LF.
 * The returned length is the number parsed from the header; the caller
 * accounts separately for the CR LF that follows the chunk data.
 *
 * @param chunkbuf the buffer to read the chunk-size line from
 * @return the chunk length, or -1 if more bytes are needed
 * @throws IOException if a byte is not a hexadecimal digit, if CR is not
 *         followed by LF, or if the length does not fit in an int
 */
private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException {
    assert state == ChunkState.READING_LENGTH;
    while (chunkbuf.hasRemaining()) {
        int c = chunkbuf.get();
        if (cr) {
            if (c == LF) {
                return partialChunklen;
            } else {
                throw new IOException("invalid chunk header");
            }
        }
        if (c == CR) {
            cr = true;
        } else {
            int digit = toDigit(c);
            int len = partialChunklen;
            // Reject lengths that do not fit in an int instead of letting
            // the multiplication wrap around: a wrapped value would be
            // taken for a bogus (possibly negative, or -1 "need more
            // bytes") chunk length by the caller.
            if (len > (Integer.MAX_VALUE - digit) / 16) {
                throw new IOException("invalid chunk header: chunk length too large");
            }
            partialChunklen = len * 16 + digit;
        }
    }
    return -1;
}
/**
 * Tries to consume as many bytes as specified by bytesToConsume, and
 * updates bytesToConsume accordingly.
 * In practice this method is only called to consume one CRLF pair
 * with bytesToConsume set to 2, so it will only return 0 (if completed),
 * 1, or 2 (if chunkbuf doesn't have the 2 chars).
 *
 * The consumed bytes are only checked to be CR and LF when assertions
 * are enabled; otherwise they are skipped without being examined.
 *
 * @param chunkbuf the buffer to consume bytes from
 * @return the number of bytes that still need to be consumed
 * @throws IOException declared for callers; not thrown by this body
 */
private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException {
    int n = bytesToConsume;
    if (n > 0) {
        int e = Math.min(chunkbuf.remaining(), n);

        // verifies some assertions
        // this method is called only to consume CRLF
        if (Utils.ASSERTIONSENABLED) {
            assert n <= 2 && e <= 2;
            ByteBuffer tmp = chunkbuf.slice();
            // if n == 2 and at least one byte is available, assert that
            // we will first consume CR
            assert (n == 2 && e > 0) ? tmp.get() == CR : true;
            // assert that we then consume LF, but only if this call really
            // consumes it (n == 1 && e == 1, or e == 2): peeking into an
            // empty buffer would raise BufferUnderflowException.
            assert ((n == 1 && e == 1) || e == 2) ? tmp.get() == LF : true;
        }

        chunkbuf.position(chunkbuf.position() + e);
        n -= e;
        bytesToConsume = n;
    }
    assert n >= 0;
    return n;
}
/**
* Returns a ByteBuffer containing a chunk of data or a "hunk" of data
* (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
* If the given buffer does not have enough data this method returns
* an empty ByteBuffer (READMORE).
* If we encounter the final chunk (an empty chunk) this method
* returns null, once the two bytes following it have been consumed.
*
* Parsing progress is kept in the fields of this parser (state, chunklen,
* bytesremaining, bytesToConsume, cr, partialChunklen), so that a chunk
* header, its data, or its trailing CRLF may span several calls.
*
* @param chunk the buffer to parse
* @return a buffer holding chunk data, READMORE if more input is needed,
*         or null if the final chunk was parsed
* @throws IOException if the chunk header cannot be parsed
*/
ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException {
// Snapshot the (volatile) parsing state into locals.
int unfulfilled = bytesremaining;
int toconsume = bytesToConsume;
ChunkState st = state;
// Step 1: no chunk length known yet - parse the chunk-size line.
if (st == ChunkState.READING_LENGTH && chunklen == -1) {
debug.log(Level.DEBUG, () -> "Trying to read chunk len"
+ " (remaining in buffer:"+chunk.remaining()+")");
int clen = chunklen = tryReadChunkLen(chunk);
if (clen == -1) return READMORE;
debug.log(Level.DEBUG, "Got chunk len %d", clen);
// The header is complete: reset the header parsing state.
cr = false; partialChunklen = 0;
unfulfilled = bytesremaining = clen;
if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk
else st = state = ChunkState.READING_DATA; // read the data
}
// Step 2: finish consuming a pending CRLF (after the last chunk, or left
// over from a previous call) before doing anything else.
if (toconsume > 0) {
debug.log(Level.DEBUG,
"Trying to consume bytes: %d (remaining in buffer: %s)",
toconsume, chunk.remaining());
if (tryConsumeBytes(chunk) > 0) {
return READMORE;
}
}
toconsume = bytesToConsume;
assert toconsume == 0;
if (st == ChunkState.READING_LENGTH) {
// we will come here only if chunklen was 0, after having
// consumed the trailing CRLF
int clen = chunklen;
assert clen == 0;
debug.log(Level.DEBUG, "No more chunks: %d", clen);
// the DONE state is not really needed but it helps with
// assertions...
state = ChunkState.DONE;
return null;
}
// Step 3: hand out as much chunk data as the buffer holds.
int clen = chunklen;
assert clen > 0;
assert st == ChunkState.READING_DATA;
ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk
if (unfulfilled > 0) {
int bytesread = chunk.remaining();
debug.log(Level.DEBUG, "Reading chunk: available %d, needed %d",
bytesread, unfulfilled);
int bytes2return = Math.min(bytesread, unfulfilled);
debug.log(Level.DEBUG, "Returning chunk bytes: %d", bytes2return);
returnBuffer = Utils.sliceWithLimitedCapacity(chunk, bytes2return);
unfulfilled = bytesremaining -= bytes2return;
// All the data of this chunk was handed out: its CRLF comes next.
if (unfulfilled == 0) bytesToConsume = 2;
}
assert unfulfilled >= 0;
if (unfulfilled == 0) {
// NOTE(review): the value logged here is unfulfilled, which is always 0
// at this point; bytesToConsume may have been intended - confirm.
debug.log(Level.DEBUG,
"No more bytes to read - %d yet to consume.",
unfulfilled);
// check whether the trailing CRLF is consumed, try to
// consume it if not. If tryConsumeBytes needs more bytes
// then we will come back here later - skipping the block
// that reads data because remaining==0, and finding
// that the two bytes are now consumed.
if (tryConsumeBytes(chunk) == 0) {
// we're done for this chunk! reset all states and
// prepare to read the next chunk.
chunklen = -1;
partialChunklen = 0;
cr = false;
state = ChunkState.READING_LENGTH;
debug.log(Level.DEBUG, "Ready to read next chunk");
}
}
if (returnBuffer == READMORE) {
debug.log(Level.DEBUG, "Need more data");
}
return returnBuffer;
}
/**
 * Attempts to parse one hunk from the buffer and, if it holds any data,
 * appends it to the given list.
 *
 * @param b   the buffer to parse
 * @param out the list collecting the hunks to push to the subscriber
 * @return true if the final chunk was parsed, false if more chunks are
 *         expected
 * @throws IOException if the chunk header cannot be parsed
 */
private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out)
        throws IOException {
    assert state != ChunkState.DONE;
    ByteBuffer hunk = tryReadOneHunk(b);
    if (hunk == null) {
        // we're done! the final chunk was parsed.
        return true;
    }
    // An empty buffer (such as READMORE) carries nothing to deliver.
    if (hunk.hasRemaining()) {
        debug.log(Level.DEBUG, "Sending chunk to consumer (%d)",
                  hunk.remaining());
        out.add(hunk);
        debug.log(Level.DEBUG, "Chunk sent.");
    }
    // we haven't parsed the final chunk yet.
    return false;
}
/**
 * Converts the ASCII code of a hexadecimal digit to its numeric value.
 *
 * @param b the byte read from the chunk header
 * @return the value of the digit, from 0 to 15
 * @throws IOException if b is not one of 0-9, A-F or a-f
 */
private int toDigit(int b) throws IOException {
    if ('0' <= b && b <= '9') {
        return b - '0';
    }
    if ('A' <= b && b <= 'F') {
        return 10 + (b - 'A');
    }
    if ('a' <= b && b <= 'f') {
        return 10 + (b - 'a');
    }
    throw new IOException("Invalid chunk header byte " + b);
}
}
class FixedLengthBodyParser implements BodyParser {
// Total number of body bytes expected, as given to the constructor.
final int contentLength;
// Completion handler: called with null once all the bytes were delivered,
// or with the Throwable that interrupted parsing.
final Consumer<Throwable> onComplete;
final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser";
// Number of body bytes not yet handed to the subscriber; starts at
// contentLength and is decremented by accept().
volatile int remaining;
// Set when a Throwable was caught; later buffers are then ignored.
volatile Throwable closedExceptionally;
// Subscription received in onSubscribe; accept() decrements its demand
// each time a buffer is pushed to the subscriber.
volatile AbstractSubscription sub;
// Creates a parser expecting contentLength bytes and reporting its
// outcome to the given handler.
FixedLengthBodyParser(int contentLength, Consumer<Throwable> onComplete) {
this.contentLength = this.remaining = contentLength;
this.onComplete = onComplete;
}
// Returns the tag used by the debug logger.
String dbgString() {
return dbgTag;
}
/**
 * Records the subscription and forwards it to the BodySubscriber.
 * When the content length is 0 there is no body to wait for: onFinished
 * is run, the subscriber is completed and the completion handler is
 * called right away. If any of that fails, the failure is recorded and
 * reported to the subscriber and then to the completion handler.
 *
 * @param sub the subscription whose demand is consumed by accept()
 */
@Override
public void onSubscribe(AbstractSubscription sub) {
    debug.log(Level.DEBUG, () -> "length="
              + contentLength +", onSubscribe: "
              + pusher.getClass().getName());
    this.sub = sub;
    pusher.onSubscribe(sub);
    if (contentLength != 0) {
        return;
    }
    try {
        onFinished.run();
        pusher.onComplete();
        onComplete.accept(null);
    } catch (Throwable t) {
        closedExceptionally = t;
        try {
            pusher.onError(t);
        } finally {
            onComplete.accept(t);
        }
    }
}
/**
 * Handles one received buffer: pushes at most the number of bytes still
 * expected to the BodySubscriber and, once nothing remains, runs
 * onFinished, completes the subscriber and calls the completion handler.
 *
 * Any Throwable raised on the way is recorded in closedExceptionally and,
 * unless the subscriber was already completed, reported to the completion
 * handler. Once that happened further buffers are ignored.
 *
 * @param b the received bytes
 */
@Override
public void accept(ByteBuffer b) {
    if (closedExceptionally != null) {
        debug.log(Level.DEBUG, () -> "already closed: "
                  + closedExceptionally);
        return;
    }
    boolean completed = false;
    try {
        int left = remaining;
        debug.log(Level.DEBUG, "Parser got %d bytes (%d remaining / %d)",
                  b.remaining(), left, contentLength);
        assert left != 0 || contentLength == 0 || b.remaining() == 0;
        if (left == 0 && contentLength > 0) {
            // A non-empty body was already fully delivered.
            return;
        }

        if (left > 0 && b.hasRemaining()) {
            // only reduce demand if we actually push something.
            // we would not have come here if there was no
            // demand.
            boolean hasDemand = sub.demand().tryDecrement();
            assert hasDemand;
            int amount = Math.min(b.remaining(), left);
            left = remaining - amount;
            remaining = left;
            ByteBuffer slice = Utils.sliceWithLimitedCapacity(b, amount);
            pusher.onNext(List.of(slice));
        }

        if (left != 0) {
            // More bytes are expected: this buffer must be used up.
            assert b.remaining() == 0;
            return;
        }

        // We're done! All data has been received.
        assert closedExceptionally == null;
        onFinished.run();
        pusher.onComplete();
        completed = true;
        onComplete.accept(closedExceptionally); // should be null
    } catch (Throwable t) {
        debug.log(Level.DEBUG, "Unexpected exception", t);
        closedExceptionally = t;
        if (!completed) {
            onComplete.accept(t);
        }
    }
}
}
}