// blob: 04bff0badb536a9d082436a9c95c42fb7d61517a [file] [log] [blame]
/*
* Copyright (C) 2011 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.squareup.okhttp.internal.spdy;
import com.squareup.okhttp.Protocol;
import com.squareup.okhttp.internal.NamedRunnable;
import com.squareup.okhttp.internal.Util;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import okio.Buffer;
import okio.BufferedSource;
import okio.ByteString;
import okio.Okio;
import static com.squareup.okhttp.internal.spdy.Settings.DEFAULT_INITIAL_WINDOW_SIZE;
/**
* A socket connection to a remote peer. A connection hosts streams which can
* send and receive data.
*
* <p>Many methods in this API are <strong>synchronous:</strong> the call is
* completed before the method returns. This is typical for Java but atypical
* for SPDY. This is motivated by exception transparency: an IOException that
* was triggered by a certain caller can be caught and handled by that caller.
*/
public final class SpdyConnection implements Closeable {
// Internal state of this connection is guarded by 'this'. No blocking
// operations may be performed while holding this lock!
//
// Socket writes are guarded by frameWriter.
//
// Socket reads are unguarded but are only made by the reader thread.
//
// Certain operations (like SYN_STREAM) need to synchronize on both the
// frameWriter (to do blocking I/O) and this (to create streams). Such
// operations must synchronize on 'this' last. This ensures that we never
// wait for a blocking operation while holding 'this'.
/**
* Executor shared by every connection. It runs deferred frame writes (resets,
* window updates, ping replies, settings ACKs) and incoming-stream callbacks.
*/
private static final ExecutorService executor = new ThreadPoolExecutor(0,
Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
Util.threadFactory("OkHttp SpdyConnection", true));
/**
* The wire protocol of this connection. The constructor accepts only
* {@code HTTP_2} and {@code SPDY_3} and fails on anything else.
*/
final Protocol protocol;
/** True if this peer initiated the connection. */
final boolean client;
/**
* User code to run in response to an incoming stream. Callbacks must not be
* run on the callback executor.
*/
private final IncomingStreamHandler handler;
/** Streams currently registered on this connection, keyed by stream ID. Guarded by this. */
private final Map<Integer, SpdyStream> streams = new HashMap<>();
/** Host name taken from the builder; used to name threads and background tasks. */
private final String hostName;
/** ID of the most recent stream created in response to the peer; sent in GOAWAY. */
private int lastGoodStreamId;
/** ID that the next locally-initiated stream will receive; advances by two. */
private int nextStreamId;
/** Set once GOAWAY was sent or received; afterwards no new streams or pings are created. */
private boolean shutdown;
/** {@code System.nanoTime()} when this connection became idle, or Long.MAX_VALUE if not idle. */
private long idleStartTimeNs = System.nanoTime();
/**
* Ensures push promise callbacks events are sent in order per stream. Null
* for SPDY/3 connections.
*/
private final ExecutorService pushExecutor;
/** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */
private Map<Integer, Ping> pings;
/** User code to run in response to push promise events. */
private final PushObserver pushObserver;
/** ID that the next locally-initiated ping will receive; advances by two. */
private int nextPingId;
/**
* The total number of bytes consumed by the application, but not yet
* acknowledged by sending a {@code WINDOW_UPDATE} frame on this connection.
*/
// Visible for testing
long unacknowledgedBytesRead = 0;
/**
* Count of bytes that can be written on the connection before receiving a
* window update.
*/
// Visible for testing
long bytesLeftInWriteWindow;
/** Settings we communicate to the peer. */
// TODO: Do we want to dynamically adjust settings, or KISS and only set once?
final Settings okHttpSettings = new Settings();
// okHttpSettings.set(Settings.MAX_CONCURRENT_STREAMS, 0, max);
/** Initial window size (16 MiB) advertised in our settings when this peer is the client. */
private static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024;
/** Settings we receive from the peer. */
// TODO: MWS will need to guard on this setting before attempting to push.
final Settings peerSettings = new Settings();
/**
* True once a peer SETTINGS frame that changed the initial window size has
* been applied to the connection-level write window.
*/
private boolean receivedInitialPeerSettings = false;
/** Protocol-specific factory for the frame reader and writer. */
final Variant variant;
/** The underlying socket; closing it breaks the reader thread out of its loop. */
final Socket socket;
/** Writes frames to the socket; also the lock that guards socket writes. */
final FrameWriter frameWriter;
// Visible for testing
final Reader readerRunnable;
private SpdyConnection(Builder builder) throws IOException {
protocol = builder.protocol;
pushObserver = builder.pushObserver;
client = builder.client;
handler = builder.handler;
// http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-5.1.1
nextStreamId = builder.client ? 1 : 2;
if (builder.client && protocol == Protocol.HTTP_2) {
nextStreamId += 2; // In HTTP/2, 1 on client is reserved for Upgrade.
}
nextPingId = builder.client ? 1 : 2;
// Flow control was designed more for servers, or proxies than edge clients.
// If we are a client, set the flow control window to 16MiB. This avoids
// thrashing window updates every 64KiB, yet small enough to avoid blowing
// up the heap.
if (builder.client) {
okHttpSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, OKHTTP_CLIENT_WINDOW_SIZE);
}
hostName = builder.hostName;
if (protocol == Protocol.HTTP_2) {
variant = new Http2();
// Like newSingleThreadExecutor, except lazy creates the thread.
pushExecutor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
Util.threadFactory(String.format("OkHttp %s Push Observer", hostName), true));
// 1 less than SPDY http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.9.2
peerSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, 65535);
peerSettings.set(Settings.MAX_FRAME_SIZE, 0, Http2.INITIAL_MAX_FRAME_SIZE);
} else if (protocol == Protocol.SPDY_3) {
variant = new Spdy3();
pushExecutor = null;
} else {
throw new AssertionError(protocol);
}
bytesLeftInWriteWindow = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
socket = builder.socket;
frameWriter = variant.newWriter(Okio.buffer(Okio.sink(builder.socket)), client);
readerRunnable = new Reader();
new Thread(readerRunnable).start(); // Not a daemon thread.
}
/** Returns the protocol this connection speaks, as selected using ALPN. */
public Protocol getProtocol() {
  return this.protocol;
}
/**
 * Returns how many {@link SpdyStream#isOpen() open streams} are currently
 * registered on this connection.
 */
public synchronized int openStreamCount() {
  final int registered = streams.size();
  return registered;
}
/** Returns the registered stream with the given ID, or null if there is none. */
synchronized SpdyStream getStream(int id) {
  final SpdyStream found = streams.get(id);
  return found;
}
/**
 * Unregisters and returns the stream with the given ID, or returns null if no such
 * stream is registered. Removing the last stream marks the connection idle.
 */
synchronized SpdyStream removeStream(int streamId) {
  final SpdyStream removed = streams.remove(streamId);
  final boolean lastStreamGone = removed != null && streams.isEmpty();
  if (lastStreamGone) {
    setIdle(true);
  }
  return removed;
}
/**
 * Records the idle state: the current {@code System.nanoTime()} when becoming idle,
 * or the {@code Long.MAX_VALUE} sentinel when the connection is in use.
 */
private synchronized void setIdle(boolean value) {
  if (value) {
    idleStartTimeNs = System.nanoTime();
  } else {
    idleStartTimeNs = Long.MAX_VALUE;
  }
}
/** Returns true if this connection is idle, i.e. an idle start time is recorded. */
public synchronized boolean isIdle() {
  final boolean inUse = idleStartTimeNs == Long.MAX_VALUE;
  return !inUse;
}
/**
 * Returns the {@code System.nanoTime()} reading taken when this connection became
 * idle, or Long.MAX_VALUE if the connection is not idle.
 */
public synchronized long getIdleStartTimeNs() {
  return this.idleStartTimeNs;
}
/**
 * Returns a new server-initiated stream. Only servers speaking HTTP/2 may push.
 *
 * @param associatedStreamId the stream that triggered the sender to create
 *     this stream.
 * @param out true to create an output stream that we can use to send data
 *     to the remote peer. Corresponds to {@code FLAG_FIN}.
 * @throws IllegalStateException if this peer is the client or the protocol
 *     is not HTTP/2.
 */
public SpdyStream pushStream(int associatedStreamId, List<Header> requestHeaders, boolean out)
    throws IOException {
  if (client) {
    throw new IllegalStateException("Client cannot push requests.");
  }
  if (protocol != Protocol.HTTP_2) {
    throw new IllegalStateException("protocol != HTTP_2");
  }
  final boolean in = false; // The stream is created without an input side.
  return newStream(associatedStreamId, requestHeaders, out, in);
}
/**
 * Returns a new locally-initiated stream that is not associated with any other stream.
 *
 * @param out true to create an output stream that we can use to send data to the remote peer.
 *     Corresponds to {@code FLAG_FIN}.
 * @param in true to create an input stream that the remote peer can use to send data to us.
 *     Corresponds to {@code FLAG_UNIDIRECTIONAL}.
 */
public SpdyStream newStream(List<Header> requestHeaders, boolean out, boolean in)
    throws IOException {
  final int noAssociatedStream = 0;
  return newStream(noAssociatedStream, requestHeaders, out, in);
}
/**
 * Allocates the next local stream ID, registers the stream if it is open, and announces
 * it to the peer: SYN_STREAM when {@code associatedStreamId} is 0, PUSH_PROMISE otherwise.
 *
 * <p>The frame writer lock is taken first and 'this' last so that no blocking I/O happens
 * while 'this' is held.
 *
 * @throws IOException if the connection is shut down or the write fails.
 */
private SpdyStream newStream(int associatedStreamId, List<Header> requestHeaders, boolean out,
    boolean in) throws IOException {
  final boolean outFinished = !out;
  final boolean inFinished = !in;
  final SpdyStream created;
  final int newId;

  synchronized (frameWriter) {
    synchronized (this) {
      if (shutdown) {
        throw new IOException("shutdown");
      }
      newId = nextStreamId;
      nextStreamId = newId + 2;
      created = new SpdyStream(newId, this, outFinished, inFinished, requestHeaders);
      if (created.isOpen()) {
        streams.put(newId, created);
        setIdle(false);
      }
    }

    if (associatedStreamId != 0) {
      if (client) {
        throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
      }
      // HTTP/2 has a PUSH_PROMISE frame.
      frameWriter.pushPromise(associatedStreamId, newId, requestHeaders);
    } else {
      frameWriter.synStream(outFinished, inFinished, newId, associatedStreamId,
          requestHeaders);
    }
  }

  // With no output side, nothing else will flush the frame just written.
  if (!out) {
    frameWriter.flush();
  }
  return created;
}
/** Writes a SYN_REPLY carrying {@code alternating} headers for the given stream. */
void writeSynReply(int streamId, boolean outFinished, List<Header> alternating)
    throws IOException {
  this.frameWriter.synReply(outFinished, streamId, alternating);
}
/**
 * Callers of this method are not thread safe, and sometimes on application
 * threads. Most often, this method will be called to send a buffer worth of
 * data to the peer.
 * <p>
 * Writes are subject to the write window of the stream and the connection.
 * Until there is a window sufficient to send {@code byteCount}, the caller
 * will block. For example, a user of {@code HttpURLConnection} who flushes
 * more bytes to the output stream than the connection's write window will
 * block.
 * <p>
 * Zero {@code byteCount} writes are not subject to flow control and
 * will not block. The only use case for zero {@code byteCount} is closing
 * a flushed output stream.
 *
 * @param outFinished true if the final frame written should end the stream's output.
 * @throws InterruptedIOException if the calling thread is interrupted while waiting for
 *     the write window to open. The thread's interrupt status is preserved.
 */
public void writeData(int streamId, boolean outFinished, Buffer buffer, long byteCount)
    throws IOException {
  if (byteCount == 0) { // Empty data frames are not flow-controlled.
    frameWriter.data(outFinished, streamId, buffer, 0);
    return;
  }

  while (byteCount > 0) {
    int toWrite;
    synchronized (SpdyConnection.this) {
      try {
        while (bytesLeftInWriteWindow <= 0) {
          SpdyConnection.this.wait(); // Wait until we receive a WINDOW_UPDATE.
        }
      } catch (InterruptedException e) {
        // Retain the interrupted status so code further up the stack can observe it.
        Thread.currentThread().interrupt();
        throw new InterruptedIOException();
      }

      // Send no more than the window allows and no more than fits in one frame.
      toWrite = (int) Math.min(byteCount, bytesLeftInWriteWindow);
      toWrite = Math.min(toWrite, frameWriter.maxDataLength());
      bytesLeftInWriteWindow -= toWrite;
    }

    // The blocking write happens outside of 'this'; only the last frame carries the FIN.
    byteCount -= toWrite;
    frameWriter.data(outFinished && byteCount == 0, streamId, buffer, toWrite);
  }
}
/**
 * Adjusts the connection-level write window by {@code delta} and wakes blocked writers
 * when the window grew. {@code delta} will be negative if a settings frame initial
 * window is smaller than the last.
 *
 * <p>The caller must hold this connection's monitor, because {@code notifyAll()}
 * requires it.
 */
void addBytesToWriteWindow(long delta) {
  bytesLeftInWriteWindow = bytesLeftInWriteWindow + delta;
  if (delta <= 0) {
    return;
  }
  notifyAll();
}
/**
 * Schedules a RST_STREAM for {@code streamId} on the shared executor so that the calling
 * thread (typically the reader) never blocks on the frame writer. Write failures are
 * ignored.
 */
void writeSynResetLater(final int streamId, final ErrorCode errorCode) {
  // Use execute() like the other deferred writes: submit() would wrap the task in a Future
  // that nobody reads, silently hiding any unexpected runtime failure.
  executor.execute(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
    @Override public void execute() {
      try {
        writeSynReset(streamId, errorCode);
      } catch (IOException ignored) {
      }
    }
  });
}
/** Writes a RST_STREAM frame for {@code streamId} with the given error code. */
void writeSynReset(int streamId, ErrorCode statusCode) throws IOException {
  this.frameWriter.rstStream(streamId, statusCode);
}
/**
 * Schedules a WINDOW_UPDATE of {@code unacknowledgedBytesRead} bytes for {@code streamId}
 * on the shared executor. Write failures are ignored.
 */
void writeWindowUpdateLater(final int streamId, final long unacknowledgedBytesRead) {
  NamedRunnable task =
      new NamedRunnable("OkHttp Window Update %s stream %d", hostName, streamId) {
        @Override public void execute() {
          try {
            frameWriter.windowUpdate(streamId, unacknowledgedBytesRead);
          } catch (IOException ignored) {
          }
        }
      };
  executor.execute(task);
}
/**
 * Sends a ping frame to the peer and returns the object used to await the ping's
 * response and observe its round trip time.
 *
 * @throws IOException if this connection is shut down or the write fails.
 */
public Ping ping() throws IOException {
  final Ping pending = new Ping();
  final int id;
  synchronized (this) {
    if (shutdown) {
      throw new IOException("shutdown");
    }
    id = nextPingId;
    nextPingId = id + 2;
    if (pings == null) {
      pings = new HashMap<>();
    }
    pings.put(id, pending);
  }
  // The blocking write is performed after 'this' has been released.
  writePing(false, id, 0x4f4b6f6b /* ASCII "OKok" */, pending);
  return pending;
}
/** Schedules a ping frame on the shared executor. Write failures are ignored. */
private void writePingLater(
    final boolean reply, final int payload1, final int payload2, final Ping ping) {
  NamedRunnable task = new NamedRunnable("OkHttp %s ping %08x%08x",
      hostName, payload1, payload2) {
    @Override public void execute() {
      try {
        writePing(reply, payload1, payload2, ping);
      } catch (IOException ignored) {
      }
    }
  };
  executor.execute(task);
}
/**
 * Writes a ping frame while holding the frame writer lock.
 *
 * @param ping the ping being sent, or null when there is nothing to time (as for replies).
 */
private void writePing(boolean reply, int payload1, int payload2, Ping ping) throws IOException {
  synchronized (frameWriter) {
    // Observe the sent time immediately before performing I/O.
    if (ping != null) {
      ping.send();
    }
    frameWriter.ping(reply, payload1, payload2);
  }
}
/** Removes and returns the in-flight ping with the given ID, or null if there is none. */
private synchronized Ping removePing(int id) {
  if (pings == null) {
    return null;
  }
  return pings.remove(id);
}
/** Flushes the frame writer. */
public void flush() throws IOException {
  this.frameWriter.flush();
}
/**
 * Degrades this connection such that new streams can neither be created
 * locally, nor accepted from the remote peer. Existing streams are not
 * impacted. This is intended to permit an endpoint to gracefully stop
 * accepting new requests without harming previously established streams.
 *
 * <p>Calling this on a connection that is already shut down does nothing.
 */
public void shutdown(ErrorCode statusCode) throws IOException {
  synchronized (frameWriter) {
    final int goAwayStreamId;
    synchronized (this) {
      if (shutdown) return;
      shutdown = true;
      goAwayStreamId = lastGoodStreamId;
    }
    // TODO: propagate exception message into debugData
    frameWriter.goAway(goAwayStreamId, statusCode, Util.EMPTY_BYTE_ARRAY);
  }
}
/**
 * Closes this connection, cancelling all open streams and unanswered pings.
 * The frame writer and the underlying socket are closed as well; closing the
 * socket ends the reader thread.
 */
@Override public void close() throws IOException {
  final ErrorCode connectionCode = ErrorCode.NO_ERROR;
  final ErrorCode streamCode = ErrorCode.CANCEL;
  close(connectionCode, streamCode);
}
/**
 * Shuts the connection down with {@code connectionCode}, closes every registered stream
 * with {@code streamCode}, cancels outstanding pings and finally closes the frame writer
 * and the socket. All steps are attempted even if an earlier one fails.
 *
 * @throws IOException a failure encountered along the way, rethrown after cleanup.
 */
private void close(ErrorCode connectionCode, ErrorCode streamCode) throws IOException {
  assert (!Thread.holdsLock(this));
  IOException thrown = null;
  try {
    shutdown(connectionCode);
  } catch (IOException e) {
    thrown = e;
  }

  // Snapshot and clear the shared state under the lock; do the slow work outside of it.
  SpdyStream[] streamsToClose = null;
  Ping[] pingsToCancel = null;
  synchronized (this) {
    if (!streams.isEmpty()) {
      streamsToClose = streams.values().toArray(new SpdyStream[streams.size()]);
      streams.clear();
      setIdle(false);
    }
    if (pings != null) {
      pingsToCancel = pings.values().toArray(new Ping[pings.size()]);
      pings = null;
    }
  }

  if (streamsToClose != null) {
    for (SpdyStream stream : streamsToClose) {
      try {
        stream.close(streamCode);
      } catch (IOException e) {
        // Remember the first failure only; this check used to be inverted, which dropped
        // a stream failure when it was the first error and replaced an earlier one otherwise.
        if (thrown == null) thrown = e;
      }
    }
  }

  if (pingsToCancel != null) {
    for (Ping ping : pingsToCancel) {
      ping.cancel();
    }
  }

  // Close the writer to release its resources (such as deflaters).
  try {
    frameWriter.close();
  } catch (IOException e) {
    if (thrown == null) thrown = e;
  }

  // Close the socket to break out the reader thread, which will clean up after itself.
  try {
    socket.close();
  } catch (IOException e) {
    thrown = e;
  }

  if (thrown != null) throw thrown;
}
/**
 * Sends a connection header if the current variant requires it, followed by our
 * settings. If our initial window size differs from the default, a connection-level
 * WINDOW_UPDATE for the difference is sent too. This should be called after
 * {@link Builder#build} for all new connections.
 */
public void sendConnectionPreface() throws IOException {
  frameWriter.connectionPreface();
  frameWriter.settings(okHttpSettings);

  final int ourWindow = okHttpSettings.getInitialWindowSize(Settings.DEFAULT_INITIAL_WINDOW_SIZE);
  final int increase = ourWindow - Settings.DEFAULT_INITIAL_WINDOW_SIZE;
  if (increase != 0) {
    frameWriter.windowUpdate(0, increase);
  }
}
/** Collects the configuration for a {@link SpdyConnection} before it is created. */
public static class Builder {
  private final String hostName;
  private final Socket socket;
  private final boolean client;
  private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS;
  private Protocol protocol = Protocol.SPDY_3;
  private PushObserver pushObserver = PushObserver.CANCEL;

  /**
   * Uses the host name of the socket's remote address as the connection's host name.
   *
   * @param client true if this peer initiated the connection; false if this
   *     peer accepted the connection.
   */
  public Builder(boolean client, Socket socket) throws IOException {
    this(((InetSocketAddress) socket.getRemoteSocketAddress()).getHostName(), client, socket);
  }

  /**
   * @param hostName name used to label this connection's threads and tasks.
   * @param client true if this peer initiated the connection; false if this
   *     peer accepted the connection.
   */
  public Builder(String hostName, boolean client, Socket socket) throws IOException {
    this.socket = socket;
    this.client = client;
    this.hostName = hostName;
  }

  /** Sets the handler for incoming streams; by default they are refused. */
  public Builder handler(IncomingStreamHandler handler) {
    this.handler = handler;
    return this;
  }

  /** Sets the protocol to speak; the default is SPDY/3. */
  public Builder protocol(Protocol protocol) {
    this.protocol = protocol;
    return this;
  }

  /** Sets the observer for push promise events; the default is {@code PushObserver.CANCEL}. */
  public Builder pushObserver(PushObserver pushObserver) {
    this.pushObserver = pushObserver;
    return this;
  }

  /** Creates the connection from the values collected so far. */
  public SpdyConnection build() throws IOException {
    return new SpdyConnection(this);
  }
}
/**
* Methods in this class must not lock FrameWriter. If a method needs to
* write a frame, create an async task to do so.
*/
class Reader extends NamedRunnable implements FrameReader.Handler {
FrameReader frameReader;
/** Creates the reader task, named after the connection's host. */
private Reader() {
  super("OkHttp %s", hostName);
}
/**
 * Reads frames until the peer stops sending them or reading fails, then closes the
 * connection. The error codes used for closing depend on how the loop ended: a clean end
 * uses NO_ERROR/CANCEL, an IOException uses PROTOCOL_ERROR, anything else INTERNAL_ERROR.
 */
@Override protected void execute() {
  ErrorCode connectionCode = ErrorCode.INTERNAL_ERROR;
  ErrorCode streamCode = ErrorCode.INTERNAL_ERROR;
  try {
    frameReader = variant.newReader(Okio.buffer(Okio.source(socket)), client);
    if (!client) {
      frameReader.readConnectionPreface();
    }
    boolean more;
    do {
      more = frameReader.nextFrame(this);
    } while (more);
    connectionCode = ErrorCode.NO_ERROR;
    streamCode = ErrorCode.CANCEL;
  } catch (IOException e) {
    connectionCode = ErrorCode.PROTOCOL_ERROR;
    streamCode = ErrorCode.PROTOCOL_ERROR;
  } finally {
    try {
      close(connectionCode, streamCode);
    } catch (IOException ignored) {
    }
    Util.closeQuietly(frameReader);
  }
}
/**
 * Handles a DATA frame. Data for pushed streams goes to the push observer; data for
 * an unknown stream is skipped and answered with a reset; otherwise it is delivered
 * to the stream.
 */
@Override public void data(boolean inFinished, int streamId, BufferedSource source, int length)
    throws IOException {
  if (pushedStream(streamId)) {
    pushDataLater(streamId, source, length, inFinished);
    return;
  }

  SpdyStream target = getStream(streamId);
  if (target != null) {
    target.receiveData(source, length);
    if (inFinished) {
      target.receiveFin();
    }
    return;
  }

  // Unknown stream: reset it and discard the payload so the next frame can be read.
  writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
  source.skip(length);
}
/**
* Handles a headers frame. Depending on the state of the connection this either
* forwards the headers to the push observer, creates a new peer-initiated stream
* and hands it to the incoming stream handler, or updates an existing stream.
*/
@Override public void headers(boolean outFinished, boolean inFinished, int streamId,
int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode) {
// Headers on pushed (even-numbered HTTP/2) streams are delivered to the push observer.
if (pushedStream(streamId)) {
pushHeadersLater(streamId, headerBlock, inFinished);
return;
}
SpdyStream stream;
synchronized (SpdyConnection.this) {
// If we're shutdown, don't bother with this stream.
if (shutdown) return;
stream = getStream(streamId);
if (stream == null) {
// The headers claim to be for an existing stream, but we don't have one.
if (headersMode.failIfStreamAbsent()) {
writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
return;
}
// If the stream ID is not greater than the last peer stream we created, assume it's
// already closed.
if (streamId <= lastGoodStreamId) return;
// If the stream ID has the same parity as our own locally-initiated IDs, the peer
// cannot have created it; assume it's already closed.
if (streamId % 2 == nextStreamId % 2) return;
// Create a stream.
final SpdyStream newStream = new SpdyStream(streamId, SpdyConnection.this, outFinished,
inFinished, headerBlock);
lastGoodStreamId = streamId;
streams.put(streamId, newStream);
// Run the user's handler on the shared executor, not on the reader thread.
executor.execute(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
@Override public void execute() {
try {
handler.receive(newStream);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
return;
}
}
// The headers claim to be for a new stream, but we already have one.
if (headersMode.failIfStreamPresent()) {
stream.closeLater(ErrorCode.PROTOCOL_ERROR);
removeStream(streamId);
return;
}
// Update an existing stream.
stream.receiveHeaders(headerBlock, headersMode);
if (inFinished) stream.receiveFin();
}
/**
 * Handles a RST_STREAM frame: pushed streams are reported to the push observer, other
 * streams are unregistered and told about the reset. Unknown streams are ignored.
 */
@Override public void rstStream(int streamId, ErrorCode errorCode) {
  if (!pushedStream(streamId)) {
    SpdyStream removed = removeStream(streamId);
    if (removed != null) {
      removed.receiveRstStream(errorCode);
    }
    return;
  }
  pushResetLater(streamId, errorCode);
}
/**
* Handles a SETTINGS frame from the peer: merges it into {@code peerSettings},
* schedules an ACK on HTTP/2 and, if the initial window size changed, applies the
* difference to the write windows.
*
* @param clearPrevious true to discard previously received peer settings first.
*/
@Override public void settings(boolean clearPrevious, Settings newSettings) {
long delta = 0;
SpdyStream[] streamsToNotify = null;
synchronized (SpdyConnection.this) {
// Remember the window size in effect before the new settings are merged in.
int priorWriteWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
if (clearPrevious) peerSettings.clear();
peerSettings.merge(newSettings);
if (getProtocol() == Protocol.HTTP_2) {
ackSettingsLater(newSettings);
}
int peerInitialWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
if (peerInitialWindowSize != -1 && peerInitialWindowSize != priorWriteWindowSize) {
delta = peerInitialWindowSize - priorWriteWindowSize;
// The connection-level window is adjusted only for the first such change.
if (!receivedInitialPeerSettings) {
addBytesToWriteWindow(delta);
receivedInitialPeerSettings = true;
}
// Snapshot the streams so they can be updated after 'this' is released.
if (!streams.isEmpty()) {
streamsToNotify = streams.values().toArray(new SpdyStream[streams.size()]);
}
}
}
// Each stream's window is adjusted under that stream's own lock.
if (streamsToNotify != null && delta != 0) {
for (SpdyStream stream : streamsToNotify) {
synchronized (stream) {
stream.addBytesToWriteWindow(delta);
}
}
}
}
/**
 * Schedules an acknowledgement of the given peer settings on the shared executor.
 * Write failures are ignored.
 */
private void ackSettingsLater(final Settings peerSettings) {
  NamedRunnable task = new NamedRunnable("OkHttp %s ACK Settings", hostName) {
    @Override public void execute() {
      try {
        frameWriter.ackSettings(peerSettings);
      } catch (IOException ignored) {
      }
    }
  };
  executor.execute(task);
}
/** Called when the peer acknowledges our settings. Currently a no-op. */
@Override public void ackSettings() {
// TODO: If we don't get this callback after sending settings to the peer, SETTINGS_TIMEOUT.
}
/**
 * Handles a PING frame. A reply completes the matching in-flight ping, identified by
 * {@code payload1}; any other ping is echoed back to the peer asynchronously.
 */
@Override public void ping(boolean reply, int payload1, int payload2) {
  if (!reply) {
    // Send a reply to a client ping if this is a server and vice versa.
    writePingLater(true, payload1, payload2, null);
    return;
  }
  Ping pending = removePing(payload1);
  if (pending != null) {
    pending.receive();
  }
}
/**
 * Handles a GOAWAY frame: marks the connection as shut down and refuses every
 * locally-initiated stream whose ID is greater than {@code lastGoodStreamId}.
 */
@Override public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
  if (debugData.size() > 0) { // TODO: log the debugData
  }

  // Copy the streams first. We don't want to hold a lock when we call receiveRstStream().
  final SpdyStream[] snapshot;
  synchronized (SpdyConnection.this) {
    snapshot = streams.values().toArray(new SpdyStream[streams.size()]);
    shutdown = true;
  }

  // Fail all streams created after the last good stream ID.
  for (SpdyStream candidate : snapshot) {
    if (candidate.getId() <= lastGoodStreamId || !candidate.isLocallyInitiated()) {
      continue;
    }
    candidate.receiveRstStream(ErrorCode.REFUSED_STREAM);
    removeStream(candidate.getId());
  }
}
/**
 * Handles a WINDOW_UPDATE frame. Stream 0 widens the connection-level write window and
 * wakes blocked writers; any other ID widens that stream's window if the stream exists.
 */
@Override public void windowUpdate(int streamId, long windowSizeIncrement) {
  if (streamId != 0) {
    SpdyStream target = getStream(streamId);
    if (target == null) {
      return;
    }
    synchronized (target) {
      target.addBytesToWriteWindow(windowSizeIncrement);
    }
    return;
  }

  synchronized (SpdyConnection.this) {
    bytesLeftInWriteWindow += windowSizeIncrement;
    SpdyConnection.this.notifyAll();
  }
}
/** Called for a priority frame. Currently a no-op: priority information is ignored. */
@Override public void priority(int streamId, int streamDependency, int weight,
boolean exclusive) {
// TODO: honor priority.
}
/**
 * Handles a PUSH_PROMISE frame by reporting the promised stream's request headers
 * to the push observer asynchronously.
 */
@Override
public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) {
  SpdyConnection.this.pushRequestLater(promisedStreamId, requestHeaders);
}
/** Called for an alternate service frame. Currently a no-op: the frame is ignored. */
@Override public void alternateService(int streamId, String origin, ByteString protocol,
String host, int port, long maxAge) {
// TODO: register alternate service.
}
}
/** Returns true for even, non-zero stream IDs on HTTP/2, which identify pushed streams. */
private boolean pushedStream(int streamId) {
  if (protocol != Protocol.HTTP_2) {
    return false;
  }
  if (streamId == 0) {
    return false;
  }
  return (streamId & 1) == 0;
}
// IDs of promised streams whose push is still in progress. An ID is added when the
// push request arrives and removed when the push is cancelled, finished or reset.
// Guarded by this.
private final Set<Integer> currentPushRequests = new LinkedHashSet<>();
/**
 * Records a newly promised stream and notifies the push observer on the push executor.
 * A promise for a stream that is already in progress is answered with a PROTOCOL_ERROR
 * reset. If the observer cancels, the stream is reset and forgotten.
 */
private void pushRequestLater(final int streamId, final List<Header> requestHeaders) {
  synchronized (this) {
    // add() reports false when the ID is already present and leaves the set untouched.
    if (!currentPushRequests.add(streamId)) {
      writeSynResetLater(streamId, ErrorCode.PROTOCOL_ERROR);
      return;
    }
  }

  NamedRunnable task = new NamedRunnable("OkHttp %s Push Request[%s]", hostName, streamId) {
    @Override public void execute() {
      boolean cancel = pushObserver.onRequest(streamId, requestHeaders);
      if (!cancel) {
        return;
      }
      try {
        frameWriter.rstStream(streamId, ErrorCode.CANCEL);
        synchronized (SpdyConnection.this) {
          currentPushRequests.remove(streamId);
        }
      } catch (IOException ignored) {
      }
    }
  };
  pushExecutor.execute(task);
}
/**
 * Delivers the headers of a pushed stream to the push observer on the push executor.
 * The stream is reset if the observer cancels, and forgotten once it is cancelled or
 * finished.
 */
private void pushHeadersLater(final int streamId, final List<Header> requestHeaders,
    final boolean inFinished) {
  NamedRunnable task = new NamedRunnable("OkHttp %s Push Headers[%s]", hostName, streamId) {
    @Override public void execute() {
      boolean cancel = pushObserver.onHeaders(streamId, requestHeaders, inFinished);
      try {
        if (cancel) {
          frameWriter.rstStream(streamId, ErrorCode.CANCEL);
        }
        if (!cancel && !inFinished) {
          return;
        }
        synchronized (SpdyConnection.this) {
          currentPushRequests.remove(streamId);
        }
      } catch (IOException ignored) {
      }
    }
  };
  pushExecutor.execute(task);
}
/**
 * Copies {@code byteCount} bytes out of the source right away, then hands the copy to
 * the push observer on the push executor. Reading eagerly avoids corrupting the stream,
 * because the reader thread moves on to the next frame immediately.
 *
 * @throws IOException if fewer than {@code byteCount} bytes could be read.
 */
private void pushDataLater(final int streamId, final BufferedSource source, final int byteCount,
    final boolean inFinished) throws IOException {
  final Buffer buffer = new Buffer();
  source.require(byteCount); // Eagerly read the frame before firing client thread.
  source.read(buffer, byteCount);
  final long copied = buffer.size();
  if (copied != byteCount) {
    throw new IOException(copied + " != " + byteCount);
  }

  NamedRunnable task = new NamedRunnable("OkHttp %s Push Data[%s]", hostName, streamId) {
    @Override public void execute() {
      try {
        boolean cancel = pushObserver.onData(streamId, buffer, byteCount, inFinished);
        if (cancel) {
          frameWriter.rstStream(streamId, ErrorCode.CANCEL);
        }
        if (!cancel && !inFinished) {
          return;
        }
        synchronized (SpdyConnection.this) {
          currentPushRequests.remove(streamId);
        }
      } catch (IOException ignored) {
      }
    }
  };
  pushExecutor.execute(task);
}
/**
 * Reports the reset of a pushed stream to the push observer on the push executor and
 * then forgets the stream.
 */
private void pushResetLater(final int streamId, final ErrorCode errorCode) {
  NamedRunnable task = new NamedRunnable("OkHttp %s Push Reset[%s]", hostName, streamId) {
    @Override public void execute() {
      pushObserver.onReset(streamId, errorCode);
      synchronized (SpdyConnection.this) {
        currentPushRequests.remove(streamId);
      }
    }
  };
  pushExecutor.execute(task);
}
}