blob: 94ddc9b2bdb0df0313dd90babe7f2acb769a8c43 [file] [log] [blame]
/*
* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
*/
package java.net.http;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpConnection.Mode;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import sun.net.httpclient.hpack.Encoder;
import sun.net.httpclient.hpack.Decoder;
import static java.net.http.SettingsFrame.*;
import static java.net.http.Utils.BUFSIZE;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Formatter;
import java.util.stream.Collectors;
import sun.net.httpclient.hpack.DecodingCallback;
/**
* An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
* over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
*
* Http2Connections belong to a Http2ClientImpl, (one of) which belongs
* to a HttpClientImpl.
*
* Creation cases:
* 1) upgraded HTTP/1.1 plain tcp connection
* 2) prior knowledge directly created plain tcp connection
* 3) directly created HTTP/2 SSL connection which uses ALPN.
*
* Sending is done by writing directly to underlying HttpConnection object which
* is operating in async mode. No flow control applies on output at this level
* and all writes are just executed as puts to an output Q belonging to HttpConnection
* Flow control is implemented by HTTP/2 protocol itself.
*
* Hpack header compression
* and outgoing stream creation is also done here, because these operations
* must be synchronized at the socket level. Stream objects send frames simply
* by placing them on the connection's output Queue. sendFrame() is called
* from a higher level (Stream) thread.
*
* asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
* incoming Http2Frames, and directs them to the appropriate Stream.incoming()
* or handles them directly itself. This thread performs hpack decompression
* and incoming stream creation (Server push). Incoming frames destined for a
* stream are provided by calling Stream.incoming().
*/
class Http2Connection implements BufferHandler {
final Queue<Http2Frame> outputQ;
volatile boolean closed;
//-------------------------------------
final HttpConnection connection;
HttpClientImpl client;
final Http2ClientImpl client2;
Map<Integer,Stream> streams;
int nextstreamid = 3; // stream 1 is registered separately
int nextPushStream = 2;
Encoder hpackOut;
Decoder hpackIn;
SettingsFrame clientSettings, serverSettings;
ByteBufferConsumer bbc;
final LinkedList<ByteBuffer> freeList;
final String key; // for HttpClientImpl.connections map
FrameReader reader;
// Connection level flow control windows
int sendWindow = INITIAL_WINDOW_SIZE;
final static int DEFAULT_FRAME_SIZE = 16 * 1024;
private static ByteBuffer[] empty = Utils.EMPTY_BB_ARRAY;
final ExecutorWrapper executor;
/**
* This is established by the protocol spec and the peer will update it with
* WINDOW_UPDATEs, which affects the sendWindow.
*/
final static int INITIAL_WINDOW_SIZE = 64 * 1024 - 1;
// TODO: need list of control frames from other threads
// that need to be sent
/**
* Case 1) Create from upgraded HTTP/1.1 connection.
* Is ready to use. Will not be SSL. exchange is the Exchange
* that initiated the connection, whose response will be delivered
* on a Stream.
*/
Http2Connection(HttpConnection connection, Http2ClientImpl client2,
Exchange exchange) throws IOException, InterruptedException {
this.outputQ = new Queue<>();
String msg = "Connection send window size " + Integer.toString(sendWindow);
Log.logTrace(msg);
//this.initialExchange = exchange;
assert !(connection instanceof SSLConnection);
this.connection = connection;
this.client = client2.client();
this.client2 = client2;
this.executor = client.executorWrapper();
this.freeList = new LinkedList<>();
this.key = keyFor(connection);
streams = Collections.synchronizedMap(new HashMap<>());
initCommon();
//sendConnectionPreface();
Stream initialStream = createStream(exchange);
initialStream.registerStream(1);
initialStream.requestSent();
sendConnectionPreface();
connection.configureMode(Mode.ASYNC);
// start reading and writing
// start reading
AsyncConnection asyncConn = (AsyncConnection)connection;
asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown);
asyncReceive(connection.getRemaining());
asyncConn.startReading();
}
// async style but completes immediately
static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
Http2ClientImpl client2, Exchange exchange) {
CompletableFuture<Http2Connection> cf = new CompletableFuture<>();
try {
Http2Connection c = new Http2Connection(connection, client2, exchange);
cf.complete(c);
} catch (IOException | InterruptedException e) {
cf.completeExceptionally(e);
}
return cf;
}
/**
* Cases 2) 3)
*
* request is request to be sent.
*/
Http2Connection(HttpRequestImpl request) throws IOException, InterruptedException {
InetSocketAddress proxy = request.proxy();
URI uri = request.uri();
InetSocketAddress addr = Utils.getAddress(request);
String msg = "Connection send window size " + Integer.toString(sendWindow);
Log.logTrace(msg);
this.key = keyFor(uri, proxy);
this.connection = HttpConnection.getConnection(addr, request, this);
streams = Collections.synchronizedMap(new HashMap<>());
this.client = request.client();
this.client2 = client.client2();
this.executor = client.executorWrapper();
this.freeList = new LinkedList<>();
this.outputQ = new Queue<>();
nextstreamid = 1;
initCommon();
connection.connect();
connection.configureMode(Mode.ASYNC);
// start reading
AsyncConnection asyncConn = (AsyncConnection)connection;
asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown);
sendConnectionPreface();
asyncConn.startReading();
}
// NEW
synchronized void obtainSendWindow(int amount) throws InterruptedException {
while (amount > 0) {
int n = Math.min(amount, sendWindow);
sendWindow -= n;
amount -= n;
if (amount > 0)
wait();
}
}
synchronized void updateSendWindow(int amount) {
if (sendWindow == 0) {
sendWindow += amount;
notifyAll();
} else
sendWindow += amount;
}
synchronized int sendWindow() {
return sendWindow;
}
static String keyFor(HttpConnection connection) {
boolean isProxy = connection.isProxied();
boolean isSecure = connection.isSecure();
InetSocketAddress addr = connection.address();
return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
}
static String keyFor(URI uri, InetSocketAddress proxy) {
boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
boolean isProxy = proxy != null;
String host;
int port;
if (isProxy) {
host = proxy.getHostString();
port = proxy.getPort();
} else {
host = uri.getHost();
port = uri.getPort();
}
return keyString(isSecure, isProxy, host, port);
}
// {C,S}:{H:P}:host:port
// C indicates clear text connection "http"
// S indicates secure "https"
// H indicates host (direct) connection
// P indicates proxy
// Eg: "S:H:foo.com:80"
static String keyString(boolean secure, boolean proxy, String host, int port) {
char c1 = secure ? 'S' : 'C';
char c2 = proxy ? 'P' : 'H';
StringBuilder sb = new StringBuilder(128);
sb.append(c1).append(':').append(c2).append(':')
.append(host).append(':').append(port);
return sb.toString();
}
String key() {
return this.key;
}
void putConnection() {
client2.putConnection(this);
}
private static String toHexdump1(ByteBuffer bb) {
bb.mark();
StringBuilder sb = new StringBuilder(512);
Formatter f = new Formatter(sb);
while (bb.hasRemaining()) {
int i = Byte.toUnsignedInt(bb.get());
f.format("%02x:", i);
}
sb.deleteCharAt(sb.length()-1);
bb.reset();
return sb.toString();
}
private static String toHexdump(ByteBuffer bb) {
List<String> words = new ArrayList<>();
int i = 0;
bb.mark();
while (bb.hasRemaining()) {
if (i % 2 == 0) {
words.add("");
}
byte b = bb.get();
String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1);
words.set(i / 2, words.get(i / 2) + hex);
i++;
}
bb.reset();
return words.stream().collect(Collectors.joining(" "));
}
private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) {
boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
ByteBuffer[] buffers = frame.getHeaderBlock();
for (int i = 0; i < buffers.length; i++) {
hpackIn.decode(buffers[i], endOfHeaders, decoder);
}
}
int getInitialSendWindowSize() {
return serverSettings.getParameter(SettingsFrame.INITIAL_WINDOW_SIZE);
}
void close() {
GoAwayFrame f = new GoAwayFrame();
f.setDebugData("Requested by user".getBytes());
// TODO: set last stream. For now zero ok.
sendFrame(f);
}
// BufferHandler methods
@Override
public ByteBuffer getBuffer(int n) {
return client.getBuffer(n);
}
@Override
public void returnBuffer(ByteBuffer buf) {
client.returnBuffer(buf);
}
@Override
public void setMinBufferSize(int n) {
client.setMinBufferSize(n);
}
private final Object readlock = new Object();
void asyncReceive(ByteBuffer buffer) {
synchronized (readlock) {
try {
if (reader == null) {
reader = new FrameReader(buffer);
} else {
reader.input(buffer);
}
while (true) {
if (reader.haveFrame()) {
List<ByteBuffer> buffers = reader.frame();
ByteBufferConsumer bbc = new ByteBufferConsumer(buffers, this::getBuffer);
processFrame(bbc);
if (bbc.consumed()) {
reader = new FrameReader();
return;
} else {
reader = new FrameReader(reader);
}
} else
return;
}
} catch (Throwable e) {
String msg = Utils.stackTrace(e);
Log.logTrace(msg);
shutdown(e);
}
}
}
void shutdown(Throwable t) {
Log.logError(t);
closed = true;
client2.deleteConnection(this);
Collection<Stream> c = streams.values();
for (Stream s : c) {
s.cancelImpl(t);
}
connection.close();
}
/**
* Handles stream 0 (common) frames that apply to whole connection and passes
* other stream specific frames to that Stream object.
*
* Invokes Stream.incoming() which is expected to process frame without
* blocking.
*/
void processFrame(ByteBufferConsumer bbc) throws IOException, InterruptedException {
Http2Frame frame = Http2Frame.readIncoming(bbc);
Log.logFrames(frame, "IN");
int streamid = frame.streamid();
if (streamid == 0) {
handleCommonFrame(frame);
} else {
Stream stream = getStream(streamid);
if (stream == null) {
// should never receive a frame with unknown stream id
resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
}
if (frame instanceof PushPromiseFrame) {
PushPromiseFrame pp = (PushPromiseFrame)frame;
handlePushPromise(stream, pp);
} else if (frame instanceof HeaderFrame) {
// decode headers (or continuation)
decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
stream.incoming(frame);
} else
stream.incoming(frame);
}
}
private void handlePushPromise(Stream parent, PushPromiseFrame pp)
throws IOException, InterruptedException {
HttpRequestImpl parentReq = parent.request;
int promisedStreamid = pp.getPromisedStream();
if (promisedStreamid != nextPushStream) {
resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
return;
} else {
nextPushStream += 2;
}
HeaderDecoder decoder = new HeaderDecoder();
decodeHeaders(pp, decoder);
HttpHeadersImpl headers = decoder.headers();
HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
Stream.PushedStream pushStream = createPushStream(parent, pushReq);
pushStream.registerStream(promisedStreamid);
parent.incoming_pushPromise(pushReq, pushStream);
}
private void handleCommonFrame(Http2Frame frame)
throws IOException, InterruptedException {
switch (frame.type()) {
case SettingsFrame.TYPE:
{ SettingsFrame f = (SettingsFrame)frame;
handleSettings(f);}
break;
case PingFrame.TYPE:
{ PingFrame f = (PingFrame)frame;
handlePing(f);}
break;
case GoAwayFrame.TYPE:
{ GoAwayFrame f = (GoAwayFrame)frame;
handleGoAway(f);}
break;
case WindowUpdateFrame.TYPE:
{ WindowUpdateFrame f = (WindowUpdateFrame)frame;
handleWindowUpdate(f);}
break;
default:
protocolError(ErrorFrame.PROTOCOL_ERROR);
}
}
void resetStream(int streamid, int code) throws IOException, InterruptedException {
Log.logError(
"Resetting stream {0,number,integer} with error code {1,number,integer}",
streamid, code);
ResetFrame frame = new ResetFrame();
frame.streamid(streamid);
frame.setErrorCode(code);
sendFrame(frame);
streams.remove(streamid);
}
private void handleWindowUpdate(WindowUpdateFrame f)
throws IOException, InterruptedException {
updateSendWindow(f.getUpdate());
}
private void protocolError(int errorCode)
throws IOException, InterruptedException {
GoAwayFrame frame = new GoAwayFrame();
frame.setErrorCode(errorCode);
sendFrame(frame);
String msg = "Error code: " + errorCode;
shutdown(new IOException("protocol error"));
}
private void handleSettings(SettingsFrame frame)
throws IOException, InterruptedException {
if (frame.getFlag(SettingsFrame.ACK)) {
// ignore ack frames for now.
return;
}
serverSettings = frame;
SettingsFrame ack = getAckFrame(frame.streamid());
sendFrame(ack);
}
private void handlePing(PingFrame frame)
throws IOException, InterruptedException {
frame.setFlag(PingFrame.ACK);
sendFrame(frame);
}
private void handleGoAway(GoAwayFrame frame)
throws IOException, InterruptedException {
//System.err.printf("GoAWAY: %s\n", ErrorFrame.stringForCode(frame.getErrorCode()));
shutdown(new IOException("GOAWAY received"));
}
private void initCommon() {
clientSettings = client2.getClientSettings();
// serverSettings will be updated by server
serverSettings = SettingsFrame.getDefaultSettings();
hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
}
/**
* Max frame size we are allowed to send
*/
public int getMaxSendFrameSize() {
int param = serverSettings.getParameter(MAX_FRAME_SIZE);
if (param == -1) {
param = DEFAULT_FRAME_SIZE;
}
return param;
}
/**
* Max frame size we will receive
*/
public int getMaxReceiveFrameSize() {
return clientSettings.getParameter(MAX_FRAME_SIZE);
}
// Not sure how useful this is.
public int getMaxHeadersSize() {
return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
}
private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
private static final byte[] PREFACE_BYTES =
CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
/**
* Sends Connection preface and Settings frame with current preferred
* values
*/
private void sendConnectionPreface() throws IOException {
ByteBufferGenerator bg = new ByteBufferGenerator(this);
bg.getBuffer(PREFACE_BYTES.length).put(PREFACE_BYTES);
ByteBuffer[] ba = bg.getBufferArray();
connection.write(ba, 0, ba.length);
bg = new ByteBufferGenerator(this);
SettingsFrame sf = client2.getClientSettings();
Log.logFrames(sf, "OUT");
sf.writeOutgoing(bg);
WindowUpdateFrame wup = new WindowUpdateFrame();
wup.streamid(0);
// send a Window update for the receive buffer we are using
// minus the initial 64 K specified in protocol
wup.setUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1));
wup.computeLength();
wup.writeOutgoing(bg);
Log.logFrames(wup, "OUT");
ba = bg.getBufferArray();
connection.write(ba, 0, ba.length);
}
/**
* Returns an existing Stream with given id, or null if doesn't exist
*/
Stream getStream(int streamid) {
return streams.get(streamid);
}
/**
* Creates Stream with given id.
*/
Stream createStream(Exchange exchange) {
Stream stream = new Stream(client, this, exchange);
return stream;
}
Stream.PushedStream createPushStream(Stream parent, HttpRequestImpl pushReq) {
Stream.PushGroup<?> pg = parent.request.pushGroup();
return new Stream.PushedStream(pg, client, this, parent, pushReq);
}
void putStream(Stream stream, int streamid) {
streams.put(streamid, stream);
}
void deleteStream(Stream stream) {
streams.remove(stream.streamid);
}
static final int MAX_STREAM = Integer.MAX_VALUE - 2;
// Number of header bytes in a Headers Frame
final static int HEADERS_HEADER_SIZE = 15;
// Number of header bytes in a Continuation frame
final static int CONTIN_HEADER_SIZE = 9;
/**
* Encode the headers into a List<ByteBuffer> and then create HEADERS
* and CONTINUATION frames from the list and return the List<Http2Frame>.
*
* @param frame
* @return
*/
private LinkedList<Http2Frame> encodeHeaders(OutgoingHeaders frame) {
LinkedList<ByteBuffer> buffers = new LinkedList<>();
ByteBuffer buf = getBuffer();
buffers.add(buf);
encodeHeadersImpl(frame.stream.getRequestPseudoHeaders(), buffers);
encodeHeadersImpl(frame.getUserHeaders(), buffers);
encodeHeadersImpl(frame.getSystemHeaders(), buffers);
for (ByteBuffer b : buffers) {
b.flip();
}
LinkedList<Http2Frame> frames = new LinkedList<>();
int maxframesize = getMaxSendFrameSize();
HeadersFrame oframe = new HeadersFrame();
oframe.setFlags(frame.getFlags());
oframe.streamid(frame.streamid());
oframe.setHeaderBlock(getBufferArray(buffers, maxframesize));
frames.add(oframe);
// Any buffers left?
boolean done = buffers.isEmpty();
if (done) {
oframe.setFlag(HeaderFrame.END_HEADERS);
} else {
ContinuationFrame cf = null;
while (!done) {
cf = new ContinuationFrame();
cf.streamid(frame.streamid());
cf.setHeaderBlock(getBufferArray(buffers, maxframesize));
frames.add(cf);
done = buffers.isEmpty();
}
cf.setFlag(HeaderFrame.END_HEADERS);
}
return frames;
}
// should always return at least one buffer
private static ByteBuffer[] getBufferArray(LinkedList<ByteBuffer> list, int maxsize) {
assert maxsize >= BUFSIZE;
LinkedList<ByteBuffer> newlist = new LinkedList<>();
int size = list.size();
int nbytes = 0;
for (int i=0; i<size; i++) {
ByteBuffer buf = list.getFirst();
if (nbytes + buf.remaining() <= maxsize) {
nbytes += buf.remaining();
newlist.add(buf);
list.remove();
} else {
break;
}
}
return newlist.toArray(empty);
}
/**
* Encode all the headers from the given HttpHeadersImpl into the given List.
*/
private void encodeHeadersImpl(HttpHeaders hdrs, LinkedList<ByteBuffer> buffers) {
ByteBuffer buffer;
if (!(buffer = buffers.getLast()).hasRemaining()) {
buffer = getBuffer();
buffers.add(buffer);
}
for (Map.Entry<String, List<String>> e : hdrs.map().entrySet()) {
String key = e.getKey();
String lkey = key.toLowerCase();
List<String> values = e.getValue();
for (String value : values) {
hpackOut.header(lkey, value);
boolean encoded = false;
do {
encoded = hpackOut.encode(buffer);
if (!encoded) {
buffer = getBuffer();
buffers.add(buffer);
}
} while (!encoded);
}
}
}
public void sendFrames(List<Http2Frame> frames) throws IOException, InterruptedException {
for (Http2Frame frame : frames) {
sendFrame(frame);
}
}
static Throwable getExceptionFrom(CompletableFuture<?> cf) {
try {
cf.get();
return null;
} catch (Throwable e) {
if (e.getCause() != null)
return e.getCause();
else
return e;
}
}
void execute(Runnable r) {
executor.execute(r, null);
}
private final Object sendlock = new Object();
/**
*
*/
void sendFrame(Http2Frame frame) {
synchronized (sendlock) {
try {
if (frame instanceof OutgoingHeaders) {
OutgoingHeaders oh = (OutgoingHeaders) frame;
Stream stream = oh.getStream();
stream.registerStream(nextstreamid);
oh.streamid(nextstreamid);
nextstreamid += 2;
// set outgoing window here. This allows thread sending
// body to proceed.
stream.updateOutgoingWindow(getInitialSendWindowSize());
LinkedList<Http2Frame> frames = encodeHeaders(oh);
for (Http2Frame f : frames) {
sendOneFrame(f);
}
} else {
sendOneFrame(frame);
}
} catch (IOException e) {
if (!closed) {
Log.logError(e);
shutdown(e);
}
}
}
}
/**
* Send a frame.
*
* @param frame
* @throws IOException
*/
private void sendOneFrame(Http2Frame frame) throws IOException {
ByteBufferGenerator bbg = new ByteBufferGenerator(this);
frame.computeLength();
Log.logFrames(frame, "OUT");
frame.writeOutgoing(bbg);
ByteBuffer[] currentBufs = bbg.getBufferArray();
connection.write(currentBufs, 0, currentBufs.length);
}
private SettingsFrame getAckFrame(int streamid) {
SettingsFrame frame = new SettingsFrame();
frame.setFlag(SettingsFrame.ACK);
frame.streamid(streamid);
return frame;
}
static class HeaderDecoder implements DecodingCallback {
HttpHeadersImpl headers;
HeaderDecoder() {
this.headers = new HttpHeadersImpl();
}
@Override
public void onDecoded(CharSequence name, CharSequence value) {
headers.addHeader(name.toString(), value.toString());
}
HttpHeadersImpl headers() {
return headers;
}
}
}