blob: 51d71092facbe95c6c1eb047f7ca5f249d3a3afc [file] [log] [blame]
/*
* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
*/
package java.net.http;
import sun.net.httpclient.hpack.DecodingCallback;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.LongConsumer;
/**
* Http/2 Stream handling.
*
* REQUESTS
*
* sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
*
* sendRequest() -- sendHeadersOnly() + sendBody()
*
* sendBody() -- in calling thread: obeys all flow control (so may block)
* obtains data from request body processor and places on connection
* outbound Q.
*
* sendBodyAsync() -- calls sendBody() in an executor thread.
*
* sendHeadersAsync() -- calls sendHeadersOnly() which does not block
*
* sendRequestAsync() -- calls sendRequest() in an executor thread
*
* RESPONSES
*
* Multiple responses can be received per request. Responses are queued up on
* a LinkedList of CF<HttpResponse> and the the first one on the list is completed
* with the next response
*
* getResponseAsync() -- queries list of response CFs and returns first one
* if one exists. Otherwise, creates one and adds it to list
* and returns it. Completion is achieved through the
* incoming() upcall from connection reader thread.
*
* getResponse() -- calls getResponseAsync() and waits for CF to complete
*
* responseBody() -- in calling thread: blocks for incoming DATA frames on
* stream inputQ. Obeys remote and local flow control so may block.
* Calls user response body processor with data buffers.
*
* responseBodyAsync() -- calls responseBody() in an executor thread.
*
* incoming() -- entry point called from connection reader thread. Frames are
* either handled immediately without blocking or for data frames
* placed on the stream's inputQ which is consumed by the stream's
* reader thread.
*
* PushedStream sub class
* ======================
* Sending side methods are not used because the request comes from a PUSH_PROMISE
* frame sent by the server. When a PUSH_PROMISE is received the PushedStream
* is created. PushedStream does not use responseCF list as there can be only
* one response. The CF is created when the object created and when the response
* HEADERS frame is received the object is completed.
*/
class Stream extends ExchangeImpl {
final Queue<Http2Frame> inputQ;
volatile int streamid;
long responseContentLen = -1;
long responseBytesProcessed = 0;
long requestContentLen;
Http2Connection connection;
HttpClientImpl client;
final HttpRequestImpl request;
final DecodingCallback rspHeadersConsumer;
HttpHeadersImpl responseHeaders;
final HttpHeadersImpl requestHeaders;
final HttpHeadersImpl requestPseudoHeaders;
HttpResponse.BodyProcessor<?> responseProcessor;
final HttpRequest.BodyProcessor requestProcessor;
HttpResponse response;
// state flags
boolean requestSent, responseReceived;
final FlowController userRequestFlowController =
new FlowController();
final FlowController remoteRequestFlowController =
new FlowController();
final FlowController responseFlowController =
new FlowController();
final ExecutorWrapper executor;
@Override
@SuppressWarnings("unchecked")
<T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
this.responseProcessor = processor;
CompletableFuture<T> cf;
try {
T body = processor.onResponseBodyStart(
responseContentLen, responseHeaders,
responseFlowController); // TODO: filter headers
if (body != null) {
cf = CompletableFuture.completedFuture(body);
receiveDataAsync(processor);
} else
cf = receiveDataAsync(processor);
} catch (IOException e) {
cf = CompletableFuture.failedFuture(e);
}
PushGroup<?> pg = request.pushGroup();
if (pg != null) {
// if an error occurs make sure it is recorded in the PushGroup
cf = cf.whenComplete((t,e) -> pg.pushError(e));
}
return cf;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("streamid: ")
.append(streamid);
return sb.toString();
}
// pushes entire response body into response processor
// blocking when required by local or remote flow control
void receiveData() throws IOException {
Http2Frame frame;
DataFrame df = null;
try {
do {
frame = inputQ.take();
if (!(frame instanceof DataFrame)) {
assert false;
continue;
}
df = (DataFrame) frame;
int len = df.getDataLength();
ByteBuffer[] buffers = df.getData();
for (ByteBuffer b : buffers) {
responseFlowController.take();
responseProcessor.onResponseBodyChunk(b);
}
sendWindowUpdate(len);
} while (!df.getFlag(DataFrame.END_STREAM));
} catch (InterruptedException e) {
throw new IOException(e);
}
}
private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) {
CompletableFuture<T> cf = new CompletableFuture<>();
executor.execute(() -> {
try {
receiveData();
T body = processor.onResponseComplete();
cf.complete(body);
responseReceived();
} catch (Throwable t) {
cf.completeExceptionally(t);
}
}, null);
return cf;
}
private void sendWindowUpdate(int increment)
throws IOException, InterruptedException {
if (increment == 0)
return;
LinkedList<Http2Frame> list = new LinkedList<>();
WindowUpdateFrame frame = new WindowUpdateFrame();
frame.streamid(streamid);
frame.setUpdate(increment);
list.add(frame);
frame = new WindowUpdateFrame();
frame.streamid(0);
frame.setUpdate(increment);
list.add(frame);
connection.sendFrames(list);
}
@Override
CompletableFuture<Void> sendBodyAsync() {
final CompletableFuture<Void> cf = new CompletableFuture<>();
executor.execute(() -> {
try {
sendBodyImpl();
cf.complete(null);
} catch (IOException | InterruptedException e) {
cf.completeExceptionally(e);
}
}, null);
return cf;
}
@SuppressWarnings("unchecked")
Stream(HttpClientImpl client, Http2Connection connection, Exchange e) {
super(e);
this.client = client;
this.connection = connection;
this.request = e.request();
this.requestProcessor = request.requestProcessor();
responseHeaders = new HttpHeadersImpl();
requestHeaders = new HttpHeadersImpl();
rspHeadersConsumer = (name, value) -> {
responseHeaders.addHeader(name.toString(), value.toString());
};
this.executor = client.executorWrapper();
//this.response_cf = new CompletableFuture<HttpResponseImpl>();
this.requestPseudoHeaders = new HttpHeadersImpl();
// NEW
this.inputQ = new Queue<>();
}
@SuppressWarnings("unchecked")
Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) {
super(null);
this.client = client;
this.connection = connection;
this.request = req;
this.requestProcessor = null;
responseHeaders = new HttpHeadersImpl();
requestHeaders = new HttpHeadersImpl();
rspHeadersConsumer = (name, value) -> {
responseHeaders.addHeader(name.toString(), value.toString());
};
this.executor = client.executorWrapper();
//this.response_cf = new CompletableFuture<HttpResponseImpl>();
this.requestPseudoHeaders = new HttpHeadersImpl();
// NEW
this.inputQ = new Queue<>();
}
/**
* Entry point from Http2Connection reader thread.
*
* Data frames will be removed by response body thread.
*
* @param frame
* @throws IOException
*/
void incoming(Http2Frame frame) throws IOException, InterruptedException {
if ((frame instanceof HeaderFrame) && ((HeaderFrame)frame).endHeaders()) {
// Complete headers accumulated. handle response.
// It's okay if there are multiple HeaderFrames.
handleResponse();
} else if (frame instanceof DataFrame) {
inputQ.put(frame);
} else {
otherFrame(frame);
}
}
void otherFrame(Http2Frame frame) throws IOException {
switch (frame.type()) {
case WindowUpdateFrame.TYPE:
incoming_windowUpdate((WindowUpdateFrame) frame);
break;
case ResetFrame.TYPE:
incoming_reset((ResetFrame) frame);
break;
case PriorityFrame.TYPE:
incoming_priority((PriorityFrame) frame);
break;
default:
String msg = "Unexpected frame: " + frame.toString();
throw new IOException(msg);
}
}
// The Hpack decoder decodes into one of these consumers of name,value pairs
DecodingCallback rspHeadersConsumer() {
return rspHeadersConsumer;
}
// create and return the HttpResponseImpl
protected void handleResponse() throws IOException {
HttpConnection c = connection.connection; // TODO: improve
long statusCode = responseHeaders
.firstValueAsLong(":status")
.orElseThrow(() -> new IOException("no statuscode in response"));
this.response = new HttpResponseImpl((int)statusCode, exchange, responseHeaders, null,
c.sslParameters(), HttpClient.Version.HTTP_2, c);
this.responseContentLen = responseHeaders
.firstValueAsLong("content-length")
.orElse(-1L);
// different implementations for normal streams and pushed streams
completeResponse(response);
}
void incoming_reset(ResetFrame frame) {
// TODO: implement reset
int error = frame.getErrorCode();
IOException e = new IOException(ErrorFrame.stringForCode(error));
completeResponseExceptionally(e);
throw new UnsupportedOperationException("Not implemented");
}
void incoming_priority(PriorityFrame frame) {
// TODO: implement priority
throw new UnsupportedOperationException("Not implemented");
}
void incoming_windowUpdate(WindowUpdateFrame frame) {
int amount = frame.getUpdate();
if (amount > 0)
remoteRequestFlowController.accept(amount);
}
void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException {
if (Log.requests()) {
Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
}
PushGroup<?> pushGroup = request.pushGroup();
if (pushGroup == null) {
cancelImpl(new IllegalStateException("unexpected push promise"));
}
// get the handler and call it.
BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph =
pushGroup.pushHandler();
CompletableFuture<HttpResponse> pushCF = pushStream
.getResponseAsync(null)
.thenApply(r -> (HttpResponse)r);
boolean accept = ph.apply(pushReq, pushCF);
if (!accept) {
IOException ex = new IOException("Stream cancelled by user");
cancelImpl(ex);
pushCF.completeExceptionally(ex);
} else {
pushStream.requestSent();
pushGroup.addPush();
}
}
private OutgoingHeaders headerFrame(long contentLength) {
HttpHeadersImpl h = request.getSystemHeaders();
if (contentLength > 0) {
h.setHeader("content-length", Long.toString(contentLength));
}
setPseudoHeaderFields();
OutgoingHeaders f = new OutgoingHeaders(h, request.getUserHeaders(), this);
if (contentLength == 0) {
f.setFlag(HeadersFrame.END_STREAM);
}
return f;
}
private void setPseudoHeaderFields() {
HttpHeadersImpl hdrs = requestPseudoHeaders;
String method = request.method();
hdrs.setHeader(":method", method);
URI uri = request.uri();
hdrs.setHeader(":scheme", uri.getScheme());
// TODO: userinfo deprecated. Needs to be removed
hdrs.setHeader(":authority", uri.getAuthority());
// TODO: ensure header names beginning with : not in user headers
String query = uri.getQuery();
String path = uri.getPath();
if (path == null) {
if (method.equalsIgnoreCase("OPTIONS")) {
path = "*";
} else {
path = "/";
}
}
if (query != null) {
path += "?" + query;
}
hdrs.setHeader(":path", path);
}
HttpHeadersImpl getRequestPseudoHeaders() {
return requestPseudoHeaders;
}
@Override
HttpResponseImpl getResponse() throws IOException {
try {
if (request.timeval() > 0) {
return getResponseAsync(null).get(
request.timeval(), TimeUnit.MILLISECONDS);
} else {
return getResponseAsync(null).join();
}
} catch (TimeoutException e) {
throw new HttpTimeoutException("Response timed out");
} catch (InterruptedException | ExecutionException | CompletionException e) {
Throwable t = e.getCause();
if (t instanceof IOException) {
throw (IOException)t;
}
throw new IOException(e);
}
}
@Override
void sendRequest() throws IOException, InterruptedException {
sendHeadersOnly();
sendBody();
}
/**
* A simple general purpose blocking flow controller
*/
class FlowController implements LongConsumer {
int permits;
FlowController() {
this.permits = 0;
}
@Override
public synchronized void accept(long n) {
if (n < 1) {
throw new InternalError("FlowController.accept called with " + n);
}
if (permits == 0) {
permits += n;
notifyAll();
} else {
permits += n;
}
}
public synchronized void take() throws InterruptedException {
take(1);
}
public synchronized void take(int amount) throws InterruptedException {
assert permits >= 0;
while (permits < amount) {
int n = Math.min(amount, permits);
permits -= n;
amount -= n;
if (amount > 0)
wait();
}
}
}
@Override
void sendHeadersOnly() throws IOException, InterruptedException {
if (Log.requests() && request != null) {
Log.logRequest(request.toString());
}
requestContentLen = requestProcessor.onRequestStart(request, userRequestFlowController);
OutgoingHeaders f = headerFrame(requestContentLen);
connection.sendFrame(f);
}
@Override
void sendBody() throws IOException, InterruptedException {
sendBodyImpl();
}
void registerStream(int id) {
this.streamid = id;
connection.putStream(this, streamid);
}
DataFrame getDataFrame() throws IOException, InterruptedException {
userRequestFlowController.take();
int maxpayloadLen = connection.getMaxSendFrameSize() - 9;
ByteBuffer buffer = connection.getBuffer();
buffer.limit(maxpayloadLen);
boolean complete = requestProcessor.onRequestBodyChunk(buffer);
buffer.flip();
int amount = buffer.remaining();
// wait for flow control if necessary. Following method will block
// until after headers frame is sent, so correct streamid is set.
remoteRequestFlowController.take(amount);
connection.obtainSendWindow(amount);
DataFrame df = new DataFrame();
df.streamid(streamid);
if (complete) {
df.setFlag(DataFrame.END_STREAM);
}
df.setData(buffer);
df.computeLength();
return df;
}
@Override
CompletableFuture<Void> sendHeadersAsync() {
try {
sendHeadersOnly();
return CompletableFuture.completedFuture(null);
} catch (IOException | InterruptedException ex) {
return CompletableFuture.failedFuture(ex);
}
}
/**
* A List of responses relating to this stream. Normally there is only
* one response, but intermediate responses like 100 are allowed
* and must be passed up to higher level before continuing. Deals with races
* such as if responses are returned before the CFs get created by
* getResponseAsync()
*/
final List<CompletableFuture<HttpResponseImpl>> response_cfs = new LinkedList<>();
@Override
CompletableFuture<HttpResponseImpl> getResponseAsync(Void v) {
CompletableFuture<HttpResponseImpl> cf;
synchronized (response_cfs) {
if (!response_cfs.isEmpty()) {
cf = response_cfs.remove(0);
} else {
cf = new CompletableFuture<>();
response_cfs.add(cf);
}
}
PushGroup<?> pg = request.pushGroup();
if (pg != null) {
// if an error occurs make sure it is recorded in the PushGroup
cf = cf.whenComplete((t,e) -> pg.pushError(e));
}
return cf;
}
/**
* Completes the first uncompleted CF on list, and removes it. If there is no
* uncompleted CF then creates one (completes it) and adds to list
*/
void completeResponse(HttpResponse r) {
HttpResponseImpl resp = (HttpResponseImpl)r;
synchronized (response_cfs) {
for (CompletableFuture<HttpResponseImpl> cf : response_cfs) {
if (!cf.isDone()) {
cf.complete(resp);
response_cfs.remove(cf);
//responseHeaders = new HttpHeadersImpl(); // for any following header blocks
return;
} else
System.err.println("Stream: " + this + " ALREADY DONE");
}
response_cfs.add(CompletableFuture.completedFuture(resp));
//responseHeaders = new HttpHeadersImpl(); // for any following header blocks
}
}
// methods to update state and remove stream when finished
synchronized void requestSent() {
requestSent = true;
if (responseReceived)
connection.deleteStream(this);
}
synchronized void responseReceived() {
responseReceived = true;
if (requestSent)
connection.deleteStream(this);
PushGroup<?> pg = request.pushGroup();
if (pg != null)
pg.noMorePushes();
}
/**
* same as above but for errors
*
* @param t
*/
void completeResponseExceptionally(Throwable t) {
synchronized (response_cfs) {
for (CompletableFuture<HttpResponseImpl> cf : response_cfs) {
if (!cf.isDone()) {
cf.completeExceptionally(t);
response_cfs.remove(cf);
return;
}
}
response_cfs.add(CompletableFuture.failedFuture(t));
}
}
void sendBodyImpl() throws IOException, InterruptedException {
if (requestContentLen == 0) {
// no body
return;
}
DataFrame df;
do {
df = getDataFrame();
// TODO: check accumulated content length (if not checked below)
connection.sendFrame(df);
} while (!df.getFlag(DataFrame.END_STREAM));
requestSent();
}
@Override
void cancel() {
cancelImpl(new Exception("Cancelled"));
}
void cancelImpl(Throwable e) {
Log.logTrace("cancelling stream: {0}\n", e.toString());
inputQ.close();
completeResponseExceptionally(e);
try {
connection.resetStream(streamid, ResetFrame.CANCEL);
} catch (IOException | InterruptedException ex) {
Log.logError(ex);
}
}
@Override
CompletableFuture<Void> sendRequestAsync() {
CompletableFuture<Void> cf = new CompletableFuture<>();
executor.execute(() -> {
try {
sendRequest();
cf.complete(null);
} catch (IOException |InterruptedException e) {
cf.completeExceptionally(e);
}
}, null);
return cf;
}
@Override
<T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
this.responseProcessor = processor;
T body = processor.onResponseBodyStart(
responseContentLen, responseHeaders,
responseFlowController); // TODO: filter headers
if (body == null) {
receiveData();
return processor.onResponseComplete();
} else
receiveDataAsync(processor);
responseReceived();
return body;
}
// called from Http2Connection reader thread
synchronized void updateOutgoingWindow(int update) {
remoteRequestFlowController.accept(update);
}
void close(String msg) {
cancel();
}
static class PushedStream extends Stream {
final PushGroup<?> pushGroup;
final private Stream parent; // used by server push streams
// push streams need the response CF allocated up front as it is
// given directly to user via the multi handler callback function.
final CompletableFuture<HttpResponseImpl> pushCF;
final HttpRequestImpl pushReq;
PushedStream(PushGroup<?> pushGroup, HttpClientImpl client,
Http2Connection connection, Stream parent,
HttpRequestImpl pushReq) {
super(client, connection, pushReq);
this.pushGroup = pushGroup;
this.pushReq = pushReq;
this.pushCF = new CompletableFuture<>();
this.parent = parent;
}
// Following methods call the super class but in case of
// error record it in the PushGroup. The error method is called
// with a null value when no error occurred (is a no-op)
@Override
CompletableFuture<Void> sendBodyAsync() {
return super.sendBodyAsync()
.whenComplete((v, t) -> pushGroup.pushError(t));
}
@Override
CompletableFuture<Void> sendHeadersAsync() {
return super.sendHeadersAsync()
.whenComplete((v, t) -> pushGroup.pushError(t));
}
@Override
CompletableFuture<Void> sendRequestAsync() {
return super.sendRequestAsync()
.whenComplete((v, t) -> pushGroup.pushError(t));
}
@Override
CompletableFuture<HttpResponseImpl> getResponseAsync(Void vo) {
return pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
}
@Override
<T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
return super.responseBodyAsync(processor)
.whenComplete((v, t) -> pushGroup.pushError(t));
}
@Override
void completeResponse(HttpResponse r) {
HttpResponseImpl resp = (HttpResponseImpl)r;
Utils.logResponse(resp);
pushCF.complete(resp);
}
@Override
void completeResponseExceptionally(Throwable t) {
pushCF.completeExceptionally(t);
}
@Override
synchronized void responseReceived() {
super.responseReceived();
pushGroup.pushCompleted();
}
// create and return the PushResponseImpl
@Override
protected void handleResponse() {
HttpConnection c = connection.connection; // TODO: improve
long statusCode = responseHeaders
.firstValueAsLong(":status")
.orElse(-1L);
if (statusCode == -1L)
completeResponseExceptionally(new IOException("No status code"));
ImmutableHeaders h = new ImmutableHeaders(responseHeaders, Utils.ALL_HEADERS);
this.response = new HttpResponseImpl((int)statusCode, pushReq, h, this,
c.sslParameters());
this.responseContentLen = responseHeaders
.firstValueAsLong("content-length")
.orElse(-1L);
// different implementations for normal streams and pushed streams
completeResponse(response);
}
}
/**
* One PushGroup object is associated with the parent Stream of
* the pushed Streams. This keeps track of all common state associated
* with the pushes.
*/
static class PushGroup<T> {
// the overall completion object, completed when all pushes are done.
final CompletableFuture<T> resultCF;
Throwable error; // any exception that occured during pushes
// CF for main response
final CompletableFuture<HttpResponse> mainResponse;
// user's processor object
final HttpResponse.MultiProcessor<T> multiProcessor;
// per push handler function provided by processor
final private BiFunction<HttpRequest,
CompletableFuture<HttpResponse>,
Boolean> pushHandler;
int numberOfPushes;
int remainingPushes;
boolean noMorePushes = false;
PushGroup(HttpResponse.MultiProcessor<T> multiProcessor, HttpRequestImpl req) {
this.resultCF = new CompletableFuture<>();
this.mainResponse = new CompletableFuture<>();
this.multiProcessor = multiProcessor;
this.pushHandler = multiProcessor.onStart(req, mainResponse);
}
CompletableFuture<T> groupResult() {
return resultCF;
}
CompletableFuture<HttpResponse> mainResponse() {
return mainResponse;
}
private BiFunction<HttpRequest,
CompletableFuture<HttpResponse>, Boolean> pushHandler()
{
return pushHandler;
}
synchronized void addPush() {
numberOfPushes++;
remainingPushes++;
}
synchronized int numberOfPushes() {
return numberOfPushes;
}
// This is called when the main body response completes because it means
// no more PUSH_PROMISEs are possible
synchronized void noMorePushes() {
noMorePushes = true;
checkIfCompleted();
}
synchronized void pushCompleted() {
remainingPushes--;
checkIfCompleted();
}
synchronized void checkIfCompleted() {
if (remainingPushes == 0 && error == null && noMorePushes) {
T overallResult = multiProcessor.onComplete();
resultCF.complete(overallResult);
}
}
synchronized void pushError(Throwable t) {
if (t == null)
return;
this.error = t;
resultCF.completeExceptionally(t);
}
}
}