blob: d68436cc14fd973790f3f125a357dd1e898cb523 [file] [log] [blame]
/*
* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.incubator.http;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import jdk.incubator.http.HttpConnection.Mode;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Formatter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import jdk.incubator.http.internal.common.*;
import jdk.incubator.http.internal.frame.*;
import jdk.incubator.http.internal.hpack.Encoder;
import jdk.incubator.http.internal.hpack.Decoder;
import jdk.incubator.http.internal.hpack.DecodingCallback;
import static jdk.incubator.http.internal.frame.SettingsFrame.*;
/**
* An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
* over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
*
* Http2Connections belong to a Http2ClientImpl, (one of) which belongs
* to a HttpClientImpl.
*
* Creation cases:
* 1) upgraded HTTP/1.1 plain tcp connection
* 2) prior knowledge directly created plain tcp connection
* 3) directly created HTTP/2 SSL connection which uses ALPN.
*
* Sending is done by writing directly to underlying HttpConnection object which
* is operating in async mode. No flow control applies on output at this level
* and all writes are just executed as puts to an output Q belonging to HttpConnection
* Flow control is implemented by HTTP/2 protocol itself.
*
* Hpack header compression
* and outgoing stream creation is also done here, because these operations
* must be synchronized at the socket level. Stream objects send frames simply
* by placing them on the connection's output Queue. sendFrame() is called
* from a higher level (Stream) thread.
*
* asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
* incoming Http2Frames, and directs them to the appropriate Stream.incoming()
* or handles them directly itself. This thread performs hpack decompression
* and incoming stream creation (Server push). Incoming frames destined for a
* stream are provided by calling Stream.incoming().
*/
class Http2Connection {
/*
* ByteBuffer pooling strategy for HTTP/2 protocol:
*
* In general there are 4 points where ByteBuffers are used:
* - incoming/outgoing frames from/to ByteBufers plus incoming/outgoing encrypted data
* in case of SSL connection.
*
* 1. Outgoing frames encoded to ByteBuffers.
* Outgoing ByteBuffers are created with requited size and frequently small (except DataFrames, etc)
* At this place no pools at all. All outgoing buffers should be collected by GC.
*
* 2. Incoming ByteBuffers (decoded to frames).
* Here, total elimination of BB pool is not a good idea.
* We don't know how many bytes we will receive through network.
* So here we allocate buffer of reasonable size. The following life of the BB:
* - If all frames decoded from the BB are other than DataFrame and HeaderFrame (and HeaderFrame subclasses)
* BB is returned to pool,
* - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer obtained by slice() method.
* Such BB is never returned to pool and will be GCed.
* - If we decoded HeadersFrame from the BB. Then header decoding is performed inside processFrame method and
* the buffer could be release to pool.
*
* 3. SLL encrypted buffers. Here another pool was introduced and all net buffers are to/from the pool,
* because of we can't predict size encrypted packets.
*
*/
// A small class that allows to control the state of
// the connection preface. This is just a thin wrapper
// over a CountDownLatch.
private final class PrefaceController {
volatile boolean prefaceSent;
private final CountDownLatch latch = new CountDownLatch(1);
// This method returns immediately if the preface is sent,
// and blocks until the preface is sent if not.
// In the common case this where the preface is already sent
// this will cost not more than a volatile read.
void waitUntilPrefaceSent() {
if (!prefaceSent) {
try {
// If the preface is not sent then await on the latch
Log.logTrace("Waiting until connection preface is sent");
latch.await();
Log.logTrace("Preface sent: resuming reading");
assert prefaceSent;
} catch (InterruptedException e) {
String msg = Utils.stackTrace(e);
Log.logTrace(msg);
shutdown(e);
}
}
}
// Mark that the connection preface is sent
void markPrefaceSent() {
assert !prefaceSent;
prefaceSent = true;
// Release the latch. If asyncReceive was scheduled it will
// be waiting for the release and will be woken up by this
// call. If not, then the semaphore will no longer be used after
// this.
latch.countDown();
}
boolean isPrefaceSent() {
return prefaceSent;
}
}
volatile boolean closed;
//-------------------------------------
final HttpConnection connection;
private final HttpClientImpl client;
private final Http2ClientImpl client2;
private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
private int nextstreamid;
private int nextPushStream = 2;
private final Encoder hpackOut;
private final Decoder hpackIn;
final SettingsFrame clientSettings;
private volatile SettingsFrame serverSettings;
private final String key; // for HttpClientImpl.connections map
private final FramesDecoder framesDecoder;
private final FramesEncoder framesEncoder = new FramesEncoder();
/**
* Send Window controller for both connection and stream windows.
* Each of this connection's Streams MUST use this controller.
*/
private final WindowController windowController = new WindowController();
private final PrefaceController prefaceController = new PrefaceController();
final WindowUpdateSender windowUpdater;
static final int DEFAULT_FRAME_SIZE = 16 * 1024;
// TODO: need list of control frames from other threads
// that need to be sent
private Http2Connection(HttpConnection connection,
Http2ClientImpl client2,
int nextstreamid,
String key) {
this.connection = connection;
this.client = client2.client();
this.client2 = client2;
this.nextstreamid = nextstreamid;
this.key = key;
this.clientSettings = this.client2.getClientSettings();
this.framesDecoder = new FramesDecoder(this::processFrame, clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE));
// serverSettings will be updated by server
this.serverSettings = SettingsFrame.getDefaultSettings();
this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
this.windowUpdater = new ConnectionWindowUpdateSender(this, client.getReceiveBufferSize());
}
/**
* Case 1) Create from upgraded HTTP/1.1 connection.
* Is ready to use. Will not be SSL. exchange is the Exchange
* that initiated the connection, whose response will be delivered
* on a Stream.
*/
Http2Connection(HttpConnection connection,
Http2ClientImpl client2,
Exchange<?> exchange,
ByteBuffer initial)
throws IOException, InterruptedException
{
this(connection,
client2,
3, // stream 1 is registered during the upgrade
keyFor(connection));
assert !(connection instanceof SSLConnection);
Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
Stream<?> initialStream = createStream(exchange);
initialStream.registerStream(1);
windowController.registerStream(1, getInitialSendWindowSize());
initialStream.requestSent();
sendConnectionPreface();
// start reading and writing
// start reading
AsyncConnection asyncConn = (AsyncConnection)connection;
asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
asyncReceive(ByteBufferReference.of(initial));
asyncConn.startReading();
}
// async style but completes immediately
static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
Http2ClientImpl client2,
Exchange<?> exchange,
ByteBuffer initial) {
CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
try {
Http2Connection c = new Http2Connection(connection, client2, exchange, initial);
cf.complete(c);
} catch (IOException | InterruptedException e) {
cf.completeExceptionally(e);
}
return cf;
}
/**
* Cases 2) 3)
*
* request is request to be sent.
*/
Http2Connection(HttpRequestImpl request, Http2ClientImpl h2client)
throws IOException, InterruptedException
{
this(HttpConnection.getConnection(request.getAddress(h2client.client()), h2client.client(), request, true),
h2client,
1,
keyFor(request.uri(), request.proxy(h2client.client())));
Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
connection.connect();
// start reading
AsyncConnection asyncConn = (AsyncConnection)connection;
asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
asyncConn.startReading();
sendConnectionPreface();
}
static String keyFor(HttpConnection connection) {
boolean isProxy = connection.isProxied();
boolean isSecure = connection.isSecure();
InetSocketAddress addr = connection.address();
return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
}
static String keyFor(URI uri, InetSocketAddress proxy) {
boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
boolean isProxy = proxy != null;
String host;
int port;
if (isProxy) {
host = proxy.getHostString();
port = proxy.getPort();
} else {
host = uri.getHost();
port = uri.getPort();
}
return keyString(isSecure, isProxy, host, port);
}
// {C,S}:{H:P}:host:port
// C indicates clear text connection "http"
// S indicates secure "https"
// H indicates host (direct) connection
// P indicates proxy
// Eg: "S:H:foo.com:80"
static String keyString(boolean secure, boolean proxy, String host, int port) {
return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port;
}
String key() {
return this.key;
}
void putConnection() {
client2.putConnection(this);
}
private static String toHexdump1(ByteBuffer bb) {
bb.mark();
StringBuilder sb = new StringBuilder(512);
Formatter f = new Formatter(sb);
while (bb.hasRemaining()) {
int i = Byte.toUnsignedInt(bb.get());
f.format("%02x:", i);
}
sb.deleteCharAt(sb.length()-1);
bb.reset();
return sb.toString();
}
private static String toHexdump(ByteBuffer bb) {
List<String> words = new ArrayList<>();
int i = 0;
bb.mark();
while (bb.hasRemaining()) {
if (i % 2 == 0) {
words.add("");
}
byte b = bb.get();
String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1);
words.set(i / 2, words.get(i / 2) + hex);
i++;
}
bb.reset();
return words.stream().collect(Collectors.joining(" "));
}
private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) {
boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
ByteBufferReference[] buffers = frame.getHeaderBlock();
for (int i = 0; i < buffers.length; i++) {
hpackIn.decode(buffers[i].get(), endOfHeaders && (i == buffers.length - 1), decoder);
}
}
int getInitialSendWindowSize() {
return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
}
void close() {
GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes());
// TODO: set last stream. For now zero ok.
sendFrame(f);
}
private ByteBufferPool readBufferPool = new ByteBufferPool();
// provides buffer to read data (default size)
public ByteBufferReference getReadBuffer() {
return readBufferPool.get(getMaxReceiveFrameSize() + Http2Frame.FRAME_HEADER_SIZE);
}
private final Object readlock = new Object();
public void asyncReceive(ByteBufferReference buffer) {
// We don't need to read anything and
// we don't want to send anything back to the server
// until the connection preface has been sent.
// Therefore we're going to wait if needed before reading
// (and thus replying) to anything.
// Starting to reply to something (e.g send an ACK to a
// SettingsFrame sent by the server) before the connection
// preface is fully sent might result in the server
// sending a GOAWAY frame with 'invalid_preface'.
prefaceController.waitUntilPrefaceSent();
synchronized (readlock) {
assert prefaceController.isPrefaceSent();
try {
framesDecoder.decode(buffer);
} catch (Throwable e) {
String msg = Utils.stackTrace(e);
Log.logTrace(msg);
shutdown(e);
}
}
}
void shutdown(Throwable t) {
Log.logError(t);
closed = true;
client2.deleteConnection(this);
List<Stream<?>> c = new LinkedList<>(streams.values());
for (Stream<?> s : c) {
s.cancelImpl(t);
}
connection.close();
}
/**
* Handles stream 0 (common) frames that apply to whole connection and passes
* other stream specific frames to that Stream object.
*
* Invokes Stream.incoming() which is expected to process frame without
* blocking.
*/
void processFrame(Http2Frame frame) throws IOException {
Log.logFrames(frame, "IN");
int streamid = frame.streamid();
if (frame instanceof MalformedFrame) {
Log.logError(((MalformedFrame) frame).getMessage());
if (streamid == 0) {
protocolError(((MalformedFrame) frame).getErrorCode());
} else {
resetStream(streamid, ((MalformedFrame) frame).getErrorCode());
}
return;
}
if (streamid == 0) {
handleConnectionFrame(frame);
} else {
if (frame instanceof SettingsFrame) {
// The stream identifier for a SETTINGS frame MUST be zero
protocolError(GoAwayFrame.PROTOCOL_ERROR);
return;
}
Stream<?> stream = getStream(streamid);
if (stream == null) {
// Should never receive a frame with unknown stream id
// To avoid looping, an endpoint MUST NOT send a RST_STREAM in
// response to a RST_STREAM frame.
if (!(frame instanceof ResetFrame)) {
resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
}
return;
}
if (frame instanceof PushPromiseFrame) {
PushPromiseFrame pp = (PushPromiseFrame)frame;
handlePushPromise(stream, pp);
} else if (frame instanceof HeaderFrame) {
// decode headers (or continuation)
decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
stream.incoming(frame);
} else {
stream.incoming(frame);
}
}
}
private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
throws IOException
{
HttpRequestImpl parentReq = parent.request;
int promisedStreamid = pp.getPromisedStream();
if (promisedStreamid != nextPushStream) {
resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
return;
} else {
nextPushStream += 2;
}
HeaderDecoder decoder = new HeaderDecoder();
decodeHeaders(pp, decoder);
HttpHeadersImpl headers = decoder.headers();
HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch);
pushExch.exchImpl = pushStream;
pushStream.registerStream(promisedStreamid);
parent.incoming_pushPromise(pushReq, pushStream);
}
private void handleConnectionFrame(Http2Frame frame)
throws IOException
{
switch (frame.type()) {
case SettingsFrame.TYPE:
handleSettings((SettingsFrame)frame);
break;
case PingFrame.TYPE:
handlePing((PingFrame)frame);
break;
case GoAwayFrame.TYPE:
handleGoAway((GoAwayFrame)frame);
break;
case WindowUpdateFrame.TYPE:
handleWindowUpdate((WindowUpdateFrame)frame);
break;
default:
protocolError(ErrorFrame.PROTOCOL_ERROR);
}
}
void resetStream(int streamid, int code) throws IOException {
Log.logError(
"Resetting stream {0,number,integer} with error code {1,number,integer}",
streamid, code);
ResetFrame frame = new ResetFrame(streamid, code);
sendFrame(frame);
closeStream(streamid);
}
void closeStream(int streamid) {
Stream<?> s = streams.remove(streamid);
// ## Remove s != null. It is a hack for delayed cancellation,reset
if (s != null && !(s instanceof Stream.PushedStream)) {
// Since PushStreams have no request body, then they have no
// corresponding entry in the window controller.
windowController.removeStream(streamid);
}
}
/**
* Increments this connection's send Window by the amount in the given frame.
*/
private void handleWindowUpdate(WindowUpdateFrame f)
throws IOException
{
int amount = f.getUpdate();
if (amount <= 0) {
// ## temporarily disable to workaround a bug in Jetty where it
// ## sends Window updates with a 0 update value.
//protocolError(ErrorFrame.PROTOCOL_ERROR);
} else {
boolean success = windowController.increaseConnectionWindow(amount);
if (!success) {
protocolError(ErrorFrame.FLOW_CONTROL_ERROR); // overflow
}
}
}
private void protocolError(int errorCode)
throws IOException
{
GoAwayFrame frame = new GoAwayFrame(0, errorCode);
sendFrame(frame);
shutdown(new IOException("protocol error"));
}
private void handleSettings(SettingsFrame frame)
throws IOException
{
assert frame.streamid() == 0;
if (!frame.getFlag(SettingsFrame.ACK)) {
int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE);
int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE);
int diff = newWindowSize - oldWindowSize;
if (diff != 0) {
windowController.adjustActiveStreams(diff);
}
serverSettings = frame;
sendFrame(new SettingsFrame(SettingsFrame.ACK));
}
}
private void handlePing(PingFrame frame)
throws IOException
{
frame.setFlag(PingFrame.ACK);
sendUnorderedFrame(frame);
}
private void handleGoAway(GoAwayFrame frame)
throws IOException
{
shutdown(new IOException(
String.valueOf(connection.channel().getLocalAddress())
+": GOAWAY received"));
}
/**
* Max frame size we are allowed to send
*/
public int getMaxSendFrameSize() {
int param = serverSettings.getParameter(MAX_FRAME_SIZE);
if (param == -1) {
param = DEFAULT_FRAME_SIZE;
}
return param;
}
/**
* Max frame size we will receive
*/
public int getMaxReceiveFrameSize() {
return clientSettings.getParameter(MAX_FRAME_SIZE);
}
// Not sure how useful this is.
public int getMaxHeadersSize() {
return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
}
private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
private static final byte[] PREFACE_BYTES =
CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
/**
* Sends Connection preface and Settings frame with current preferred
* values
*/
private void sendConnectionPreface() throws IOException {
Log.logTrace("{0}: start sending connection preface to {1}",
connection.channel().getLocalAddress(),
connection.address());
SettingsFrame sf = client2.getClientSettings();
ByteBufferReference ref = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
Log.logFrames(sf, "OUT");
// send preface bytes and SettingsFrame together
connection.write(ref.get());
Log.logTrace("PREFACE_BYTES sent");
Log.logTrace("Settings Frame sent");
// send a Window update for the receive buffer we are using
// minus the initial 64 K specified in protocol
final int len = client2.client().getReceiveBufferSize() - (64 * 1024 - 1);
windowUpdater.sendWindowUpdate(len);
Log.logTrace("finished sending connection preface");
prefaceController.markPrefaceSent();
}
/**
* Returns an existing Stream with given id, or null if doesn't exist
*/
@SuppressWarnings("unchecked")
<T> Stream<T> getStream(int streamid) {
return (Stream<T>)streams.get(streamid);
}
/**
* Creates Stream with given id.
*/
<T> Stream<T> createStream(Exchange<T> exchange) {
Stream<T> stream = new Stream<>(client, this, exchange, windowController);
return stream;
}
<T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
PushGroup<?,T> pg = parent.exchange.getPushGroup();
return new Stream.PushedStream<>(pg, client, this, parent, pushEx);
}
<T> void putStream(Stream<T> stream, int streamid) {
streams.put(streamid, stream);
}
void deleteStream(int streamid) {
streams.remove(streamid);
windowController.removeStream(streamid);
}
/**
* Encode the headers into a List<ByteBuffer> and then create HEADERS
* and CONTINUATION frames from the list and return the List<Http2Frame>.
*/
private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) {
List<ByteBufferReference> buffers = encodeHeadersImpl(
getMaxSendFrameSize(),
frame.getAttachment().getRequestPseudoHeaders(),
frame.getUserHeaders(),
frame.getSystemHeaders());
List<HeaderFrame> frames = new ArrayList<>(buffers.size());
Iterator<ByteBufferReference> bufIterator = buffers.iterator();
HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next());
frames.add(oframe);
while(bufIterator.hasNext()) {
oframe = new ContinuationFrame(frame.streamid(), bufIterator.next());
frames.add(oframe);
}
oframe.setFlag(HeaderFrame.END_HEADERS);
return frames;
}
// Dedicated cache for headers encoding ByteBuffer.
// There can be no concurrent access to this buffer as all access to this buffer
// and its content happen within a single critical code block section protected
// by the sendLock. / (see sendFrame())
private ByteBufferPool headerEncodingPool = new ByteBufferPool();
private ByteBufferReference getHeaderBuffer(int maxFrameSize) {
ByteBufferReference ref = headerEncodingPool.get(maxFrameSize);
ref.get().limit(maxFrameSize);
return ref;
}
/*
* Encodes all the headers from the given HttpHeaders into the given List
* of buffers.
*
* From https://tools.ietf.org/html/rfc7540#section-8.1.2 :
*
* ...Just as in HTTP/1.x, header field names are strings of ASCII
* characters that are compared in a case-insensitive fashion. However,
* header field names MUST be converted to lowercase prior to their
* encoding in HTTP/2...
*/
private List<ByteBufferReference> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) {
ByteBufferReference buffer = getHeaderBuffer(maxFrameSize);
List<ByteBufferReference> buffers = new ArrayList<>();
for(HttpHeaders header : headers) {
for (Map.Entry<String, List<String>> e : header.map().entrySet()) {
String lKey = e.getKey().toLowerCase();
List<String> values = e.getValue();
for (String value : values) {
hpackOut.header(lKey, value);
while (!hpackOut.encode(buffer.get())) {
buffer.get().flip();
buffers.add(buffer);
buffer = getHeaderBuffer(maxFrameSize);
}
}
}
}
buffer.get().flip();
buffers.add(buffer);
return buffers;
}
private ByteBufferReference[] encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) {
oh.streamid(stream.streamid);
if (Log.headers()) {
StringBuilder sb = new StringBuilder("HEADERS FRAME (stream=");
sb.append(stream.streamid).append(")\n");
Log.dumpHeaders(sb, " ", oh.getAttachment().getRequestPseudoHeaders());
Log.dumpHeaders(sb, " ", oh.getSystemHeaders());
Log.dumpHeaders(sb, " ", oh.getUserHeaders());
Log.logHeaders(sb.toString());
}
List<HeaderFrame> frames = encodeHeaders(oh);
return encodeFrames(frames);
}
private ByteBufferReference[] encodeFrames(List<HeaderFrame> frames) {
if (Log.frames()) {
frames.forEach(f -> Log.logFrames(f, "OUT"));
}
return framesEncoder.encodeFrames(frames);
}
static Throwable getExceptionFrom(CompletableFuture<?> cf) {
try {
cf.get();
return null;
} catch (Throwable e) {
if (e.getCause() != null) {
return e.getCause();
} else {
return e;
}
}
}
private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
Stream<?> stream = oh.getAttachment();
int streamid = nextstreamid;
nextstreamid += 2;
stream.registerStream(streamid);
// set outgoing window here. This allows thread sending
// body to proceed.
windowController.registerStream(streamid, getInitialSendWindowSize());
return stream;
}
private final Object sendlock = new Object();
void sendFrame(Http2Frame frame) {
try {
synchronized (sendlock) {
if (frame instanceof OutgoingHeaders) {
@SuppressWarnings("unchecked")
OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
Stream<?> stream = registerNewStream(oh);
// provide protection from inserting unordered frames between Headers and Continuation
connection.writeAsync(encodeHeaders(oh, stream));
} else {
connection.writeAsync(encodeFrame(frame));
}
}
connection.flushAsync();
} catch (IOException e) {
if (!closed) {
Log.logError(e);
shutdown(e);
}
}
}
private ByteBufferReference[] encodeFrame(Http2Frame frame) {
Log.logFrames(frame, "OUT");
return framesEncoder.encodeFrame(frame);
}
void sendDataFrame(DataFrame frame) {
try {
connection.writeAsync(encodeFrame(frame));
connection.flushAsync();
} catch (IOException e) {
if (!closed) {
Log.logError(e);
shutdown(e);
}
}
}
/*
* Direct call of the method bypasses synchronization on "sendlock" and
* allowed only of control frames: WindowUpdateFrame, PingFrame and etc.
* prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame.
*/
void sendUnorderedFrame(Http2Frame frame) {
try {
connection.writeAsyncUnordered(encodeFrame(frame));
connection.flushAsync();
} catch (IOException e) {
if (!closed) {
Log.logError(e);
shutdown(e);
}
}
}
static class HeaderDecoder implements DecodingCallback {
HttpHeadersImpl headers;
HeaderDecoder() {
this.headers = new HttpHeadersImpl();
}
@Override
public void onDecoded(CharSequence name, CharSequence value) {
headers.addHeader(name.toString(), value.toString());
}
HttpHeadersImpl headers() {
return headers;
}
}
static final class ConnectionWindowUpdateSender extends WindowUpdateSender {
public ConnectionWindowUpdateSender(Http2Connection connection,
int initialWindowSize) {
super(connection, initialWindowSize);
}
@Override
int getStreamId() {
return 0;
}
}
}