blob: cdbd7aa740013b4499bb0a0cc8a070c442109f1f [file] [log] [blame]
/*
* Copyright (C) 2011 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.squareup.okhttp.internal.spdy;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import okio.AsyncTimeout;
import okio.Buffer;
import okio.BufferedSource;
import okio.Sink;
import okio.Source;
import okio.Timeout;
import static com.squareup.okhttp.internal.spdy.Settings.DEFAULT_INITIAL_WINDOW_SIZE;
/** A logical bidirectional stream. */
public final class SpdyStream {
// Internal state is guarded by this. No long-running or potentially
// blocking operations are performed while the lock is held.
/**
* The total number of bytes consumed by the application (with {@link
* SpdyDataSource#read}), but not yet acknowledged by sending a {@code
* WINDOW_UPDATE} frame on this stream.
*/
// Visible for testing
long unacknowledgedBytesRead = 0;
/**
* Count of bytes that can be written on the stream before receiving a
* window update. Even if this is positive, writes will block until there
* are available bytes in {@code connection.bytesLeftInWriteWindow}.
*/
// guarded by this
long bytesLeftInWriteWindow;
/** Identifier of this stream, as passed to the constructor. */
private final int id;
/** The connection this stream belongs to. Non null (checked in the constructor). */
private final SpdyConnection connection;
/** Headers sent by the stream initiator. Immutable and non null. */
private final List<Header> requestHeaders;
/** Headers sent in the stream reply. Null if reply is either not sent or not sent yet. */
private List<Header> responseHeaders;
/** Receives this stream's incoming data; handed to callers by {@link #getSource}. */
private final SpdyDataSource source;
/** Sends this stream's outgoing data; handed to callers by {@link #getSink}. */
final SpdyDataSink sink;
/** Timeout applied while waiting for response headers or for readable data. */
private final SpdyTimeout readTimeout = new SpdyTimeout();
/** Timeout applied while waiting for write window and while writing data frames. */
private final SpdyTimeout writeTimeout = new SpdyTimeout();
/**
* The reason why this stream was abnormally closed. If there are multiple
* reasons to abnormally close this stream (such as both peers closing it
* near-simultaneously) then this is the first reason known to this peer.
*/
private ErrorCode errorCode = null;
SpdyStream(int id, SpdyConnection connection, boolean outFinished, boolean inFinished,
List<Header> requestHeaders) {
if (connection == null) throw new NullPointerException("connection == null");
if (requestHeaders == null) throw new NullPointerException("requestHeaders == null");
this.id = id;
this.connection = connection;
this.bytesLeftInWriteWindow =
connection.peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
this.source = new SpdyDataSource(
connection.okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE));
this.sink = new SpdyDataSink();
this.source.finished = inFinished;
this.sink.finished = outFinished;
this.requestHeaders = requestHeaders;
}
/** Returns the identifier this stream was created with. */
public int getId() {
  return this.id;
}
/**
 * Reports whether this stream is still open. The stream is no longer open
 * once either:
 * <ul>
 *   <li>an error code has been recorded for it, or
 *   <li>the input side is finished or closed, the output side is finished
 *       or closed, and response headers are present.
 * </ul>
 * Note that the input stream may continue to yield data even after a stream
 * reports itself as not open. This is because input data is buffered.
 */
public synchronized boolean isOpen() {
  if (errorCode != null) {
    return false;
  }
  boolean inputDone = source.finished || source.closed;
  boolean outputDone = sink.finished || sink.closed;
  boolean haveResponseHeaders = responseHeaders != null;
  return !(inputDone && outputDone && haveResponseHeaders);
}
/**
 * Returns true if this stream was created by this peer. A stream with an odd
 * id is treated as client-initiated; the result is whether that matches
 * {@code connection.client}.
 */
public boolean isLocallyInitiated() {
  boolean oddId = (id & 1) != 0;
  return oddId == connection.client;
}
/** Returns the connection this stream belongs to. */
public SpdyConnection getConnection() {
  return this.connection;
}
/** Returns the headers sent by the stream initiator. Never null. */
public List<Header> getRequestHeaders() {
  return this.requestHeaders;
}
/**
 * Returns the stream's response headers, waiting under the read timeout
 * until they are available or an error code has been recorded.
 *
 * @throws IOException if the wait ends because the stream was reset, if the
 *     read timeout fired, or if the waiting thread was interrupted.
 */
public synchronized List<Header> getResponseHeaders() throws IOException {
  readTimeout.enter();
  try {
    for (;;) {
      if (responseHeaders != null || errorCode != null) {
        break;
      }
      waitForIo();
    }
  } finally {
    readTimeout.exitAndThrowIfTimedOut();
  }
  if (responseHeaders == null) {
    throw new IOException("stream was reset: " + errorCode);
  }
  return responseHeaders;
}
/**
 * Returns the error code recorded for this stream, or null if none has been
 * recorded (the stream closed normally or has not yet been closed).
 */
public synchronized ErrorCode getErrorCode() {
  return this.errorCode;
}
/**
 * Sends a reply to an incoming stream.
 *
 * @param responseHeaders the headers to send; must not be null.
 * @param out true to create an output stream that we can use to send data
 *     to the remote peer. Corresponds to {@code FLAG_FIN}.
 * @throws NullPointerException if {@code responseHeaders} is null.
 * @throws IllegalStateException if response headers have already been set.
 */
public void reply(List<Header> responseHeaders, boolean out) throws IOException {
  assert (!Thread.holdsLock(SpdyStream.this));
  // When no output stream is requested, the reply itself finishes the output side.
  final boolean outFinished = !out;
  synchronized (this) {
    if (responseHeaders == null) {
      throw new NullPointerException("responseHeaders == null");
    }
    if (this.responseHeaders != null) {
      throw new IllegalStateException("reply already sent");
    }
    this.responseHeaders = responseHeaders;
    if (outFinished) {
      this.sink.finished = true;
    }
  }
  // The frame is written after releasing the stream lock.
  connection.writeSynReply(id, outFinished, responseHeaders);
  if (outFinished) {
    connection.flush();
  }
}
/** Returns the timeout applied while waiting for response headers or readable data. */
public Timeout readTimeout() {
  return this.readTimeout;
}
/** Returns the timeout applied while emitting outgoing data frames. */
public Timeout writeTimeout() {
  return this.writeTimeout;
}
/** Returns the source from which data sent by the peer can be read. */
public Source getSource() {
  return this.source;
}
/**
 * Returns a sink that can be used to write data to the peer.
 *
 * @throws IllegalStateException if this stream was initiated by the peer
 *     and a {@link #reply} has not yet been sent.
 */
public Sink getSink() {
  synchronized (this) {
    boolean replySent = responseHeaders != null;
    if (!replySent && !isLocallyInitiated()) {
      throw new IllegalStateException("reply before requesting the sink");
    }
  }
  return sink;
}
/**
 * Abnormally terminate this stream. This blocks until the {@code RST_STREAM}
 * frame has been transmitted. Does nothing if the stream was already closed.
 *
 * @param rstStatusCode the reason recorded for the stream and sent to the peer.
 */
public void close(ErrorCode rstStatusCode) throws IOException {
  boolean closedNow = closeInternal(rstStatusCode);
  if (closedNow) {
    connection.writeSynReset(id, rstStatusCode);
  }
}
/**
 * Abnormally terminate this stream. This enqueues a {@code RST_STREAM}
 * frame and returns immediately. Does nothing if the stream was already
 * closed.
 *
 * @param errorCode the reason recorded for the stream and sent to the peer.
 */
public void closeLater(ErrorCode errorCode) {
  boolean closedNow = closeInternal(errorCode);
  if (closedNow) {
    connection.writeSynResetLater(id, errorCode);
  }
}
/**
 * Records {@code errorCode} as the reason this stream closed, wakes waiting
 * threads and removes the stream from its connection.
 *
 * @return true if this call closed the stream; false if an error code was
 *     already recorded or both the source and the sink are finished.
 */
private boolean closeInternal(ErrorCode errorCode) {
  assert (!Thread.holdsLock(this));
  synchronized (this) {
    boolean alreadyReset = this.errorCode != null;
    boolean bothFinished = source.finished && sink.finished;
    if (alreadyReset || bothFinished) {
      return false;
    }
    this.errorCode = errorCode;
    notifyAll();
  }
  // Removal happens after the stream lock has been released.
  connection.removeStream(id);
  return true;
}
/**
 * Handles a block of headers received for this stream.
 *
 * <p>If no response headers are present yet, {@code headers} become the
 * response headers unless {@code headersMode} fails on absent headers, in
 * which case the stream is closed with {@code PROTOCOL_ERROR}. If response
 * headers are already present, {@code headers} are appended to a copy of
 * them unless {@code headersMode} fails on present headers, in which case
 * the stream is closed with {@code STREAM_IN_USE}.
 */
void receiveHeaders(List<Header> headers, HeadersMode headersMode) {
  assert (!Thread.holdsLock(SpdyStream.this));
  ErrorCode failure = null;
  boolean stillOpen = true;
  synchronized (this) {
    boolean firstHeaders = responseHeaders == null;
    if (firstHeaders && headersMode.failIfHeadersAbsent()) {
      failure = ErrorCode.PROTOCOL_ERROR;
    } else if (firstHeaders) {
      responseHeaders = headers;
      stillOpen = isOpen();
      notifyAll();
    } else if (headersMode.failIfHeadersPresent()) {
      failure = ErrorCode.STREAM_IN_USE;
    } else {
      // Build a fresh list rather than mutating the one already handed out.
      List<Header> combined = new ArrayList<>(responseHeaders);
      combined.addAll(headers);
      this.responseHeaders = combined;
    }
  }
  if (failure != null) {
    closeLater(failure);
    return;
  }
  if (!stillOpen) {
    connection.removeStream(id);
  }
}
/**
 * Passes {@code length} bytes of incoming data from {@code in} to this
 * stream's source. Must not be called while holding the stream lock.
 */
void receiveData(BufferedSource in, int length) throws IOException {
  assert (!Thread.holdsLock(SpdyStream.this));
  source.receive(in, length);
}
/**
 * Marks the source as finished, wakes waiting threads, and removes this
 * stream from the connection if it is no longer open as a result.
 */
void receiveFin() {
  assert (!Thread.holdsLock(SpdyStream.this));
  boolean stillOpen;
  synchronized (this) {
    source.finished = true;
    stillOpen = isOpen();
    notifyAll();
  }
  if (stillOpen) {
    return;
  }
  connection.removeStream(id);
}
/**
 * Records {@code errorCode} for this stream and wakes waiting threads. An
 * error code that was recorded earlier is kept and this call is ignored.
 */
synchronized void receiveRstStream(ErrorCode errorCode) {
  if (this.errorCode != null) {
    return;
  }
  this.errorCode = errorCode;
  notifyAll();
}
/**
* A source that reads the incoming data frames of a stream. Although this
* class uses synchronization to safely receive incoming data frames, it is
* not intended for use by multiple readers.
*/
private final class SpdyDataSource implements Source {
/** Buffer to receive data from the network into. Only accessed by the reader thread. */
private final Buffer receiveBuffer = new Buffer();
/** Buffer with readable data. Guarded by SpdyStream.this. */
private final Buffer readBuffer = new Buffer();
/** Maximum number of bytes to buffer before reporting a flow control error. */
private final long maxByteCount;
/** True if the caller has closed this stream. */
private boolean closed;
/**
* True if either side has cleanly shut down this stream. We will
* receive no more bytes beyond those already in the buffer.
*/
private boolean finished;
/** Creates a source that buffers at most {@code maxByteCount} unread bytes. */
private SpdyDataSource(long maxByteCount) {
this.maxByteCount = maxByteCount;
}
/**
* Moves up to {@code byteCount} buffered bytes into {@code sink}, waiting
* under the read timeout until data is readable or the source is finished,
* closed or reset.
*
* @return the number of bytes moved, or -1 if the read buffer is empty
*     after waiting.
* @throws IllegalArgumentException if {@code byteCount} is negative.
* @throws IOException if this source was closed or the stream was reset.
*/
@Override public long read(Buffer sink, long byteCount)
throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
long read;
synchronized (SpdyStream.this) {
waitUntilReadable();
checkNotClosed();
if (readBuffer.size() == 0) return -1; // This source is exhausted.
// Move bytes from the read buffer into the caller's buffer.
read = readBuffer.read(sink, Math.min(byteCount, readBuffer.size()));
// Flow control: notify the peer that we're ready for more data!
// A stream-level window update is queued once at least half of the
// initial window size has been consumed without acknowledgement.
unacknowledgedBytesRead += read;
if (unacknowledgedBytesRead
>= connection.okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE) / 2) {
connection.writeWindowUpdateLater(id, unacknowledgedBytesRead);
unacknowledgedBytesRead = 0;
}
}
// Update connection.unacknowledgedBytesRead outside the stream lock.
// The same half-window rule is applied, with the update sent for id 0.
synchronized (connection) { // Multiple application threads may hit this section.
connection.unacknowledgedBytesRead += read;
if (connection.unacknowledgedBytesRead
>= connection.okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE) / 2) {
connection.writeWindowUpdateLater(0, connection.unacknowledgedBytesRead);
connection.unacknowledgedBytesRead = 0;
}
}
return read;
}
/**
* Returns once the source is either readable or finished. Also returns if
* the source was closed or an error code was recorded. Throws if the read
* timeout fired while waiting.
*/
private void waitUntilReadable() throws IOException {
readTimeout.enter();
try {
while (readBuffer.size() == 0 && !finished && !closed && errorCode == null) {
waitForIo();
}
} finally {
readTimeout.exitAndThrowIfTimedOut();
}
}
/**
* Reads {@code byteCount} bytes from {@code in} into the read buffer,
* waking a waiting reader when the buffer goes from empty to non-empty.
* Must not be called while holding the stream lock.
*
* @throws EOFException if {@code in} is exhausted before {@code byteCount}
*     bytes have been read.
*/
void receive(BufferedSource in, long byteCount) throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
while (byteCount > 0) {
boolean finished;
boolean flowControlError;
// Take a snapshot of the state under the lock; the I/O below runs unlocked.
synchronized (SpdyStream.this) {
finished = this.finished;
flowControlError = byteCount + readBuffer.size() > maxByteCount;
}
// If the peer sends more data than we can handle, discard it and close the stream.
if (flowControlError) {
in.skip(byteCount);
closeLater(ErrorCode.FLOW_CONTROL_ERROR);
return;
}
// Discard data received after the stream is finished. It's probably a benign race.
if (finished) {
in.skip(byteCount);
return;
}
// Fill the receive buffer without holding any locks.
long read = in.read(receiveBuffer, byteCount);
if (read == -1) throw new EOFException();
byteCount -= read;
// Move the received data to the read buffer so the reader can read it.
synchronized (SpdyStream.this) {
boolean wasEmpty = readBuffer.size() == 0;
readBuffer.writeAll(receiveBuffer);
if (wasEmpty) {
SpdyStream.this.notifyAll();
}
}
}
}
/** Returns the stream's read timeout. */
@Override public Timeout timeout() {
return readTimeout;
}
/**
* Marks this source closed, discards any buffered data, wakes waiting
* threads and then cancels or removes the stream if appropriate.
*/
@Override public void close() throws IOException {
synchronized (SpdyStream.this) {
closed = true;
readBuffer.clear();
SpdyStream.this.notifyAll();
}
cancelStreamIfNecessary();
}
/** Throws if this source has been closed or an error code was recorded for the stream. */
private void checkNotClosed() throws IOException {
if (closed) {
throw new IOException("stream closed");
}
if (errorCode != null) {
throw new IOException("stream was reset: " + errorCode);
}
}
}
/**
 * Called after the source or sink is closed. Resets the stream with
 * {@code CANCEL} when the source was closed before it finished and the sink
 * is finished or closed; otherwise removes the stream from the connection if
 * it is no longer open.
 */
private void cancelStreamIfNecessary() throws IOException {
  assert (!Thread.holdsLock(SpdyStream.this));
  boolean shouldCancel;
  boolean stillOpen;
  synchronized (this) {
    boolean outputDone = sink.finished || sink.closed;
    shouldCancel = source.closed && !source.finished && outputDone;
    stillOpen = isOpen();
  }
  if (shouldCancel) {
    // RST this stream to prevent additional data from being sent. This
    // is safe because the input stream is closed (we won't use any
    // further bytes) and the output stream is either finished or closed
    // (so RSTing both streams doesn't cause harm).
    close(ErrorCode.CANCEL);
    return;
  }
  if (!stillOpen) {
    connection.removeStream(id);
  }
}
/**
* A sink that writes outgoing data frames of a stream. This class is not
* thread safe.
*/
final class SpdyDataSink implements Sink {
/** Number of buffered bytes at which {@link #write} starts emitting data frames. */
private static final long EMIT_BUFFER_SIZE = 16384;
/**
* Buffer of outgoing data. This batches small writes into this sink as large
* frames written to the outgoing connection. Batching saves the (small) framing overhead.
*/
private final Buffer sendBuffer = new Buffer();
/** True once {@link #close} has completed on this sink. */
private boolean closed;
/**
* True if either side has cleanly shut down this stream. We shall send
* no more bytes.
*/
private boolean finished;
/**
* Appends {@code byteCount} bytes from {@code source} to the send buffer,
* then emits data frames for as long as at least {@code EMIT_BUFFER_SIZE}
* bytes remain buffered. Must not be called while holding the stream lock.
*/
@Override public void write(Buffer source, long byteCount) throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
sendBuffer.write(source, byteCount);
while (sendBuffer.size() >= EMIT_BUFFER_SIZE) {
emitDataFrame(false);
}
}
/**
* Emit a single data frame to the connection. The frame's size will be limited by this
* stream's write window. This method will block until the write window is nonempty.
*
* @param outFinished true to set the final-frame flag, which is only applied when this
*     frame drains the whole send buffer.
* @throws IOException if the sink is closed or finished, the stream was reset, or the
*     write timeout fired.
*/
private void emitDataFrame(boolean outFinished) throws IOException {
long toWrite;
synchronized (SpdyStream.this) {
writeTimeout.enter();
try {
while (bytesLeftInWriteWindow <= 0 && !finished && !closed && errorCode == null) {
waitForIo(); // Wait until we receive a WINDOW_UPDATE for this stream.
}
} finally {
writeTimeout.exitAndThrowIfTimedOut();
}
checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting.
// Reserve the bytes from the stream's write window before releasing the lock.
toWrite = Math.min(bytesLeftInWriteWindow, sendBuffer.size());
bytesLeftInWriteWindow -= toWrite;
}
// The write itself runs outside the stream lock, under its own timeout scope.
writeTimeout.enter();
try {
connection.writeData(id, outFinished && toWrite == sendBuffer.size(), sendBuffer, toWrite);
} finally {
writeTimeout.exitAndThrowIfTimedOut();
}
}
/**
* Emits all buffered data as data frames, flushing the connection after each frame.
*
* @throws IOException if the sink is closed or finished, or the stream was reset.
*/
@Override public void flush() throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
synchronized (SpdyStream.this) {
checkOutNotClosed();
}
while (sendBuffer.size() > 0) {
emitDataFrame(false);
connection.flush();
}
}
/** Returns the stream's write timeout. */
@Override public Timeout timeout() {
return writeTimeout;
}
/**
* Closes this sink. Does nothing if it is already closed. Unless the sink is already
* finished, the remaining buffered data is emitted with the final-frame flag requested,
* or an empty final frame is written if nothing is buffered. Afterwards the sink is
* marked closed, the connection is flushed, and the stream is cancelled or removed if
* appropriate.
*/
@Override public void close() throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
synchronized (SpdyStream.this) {
if (closed) return;
}
if (!sink.finished) {
// Emit the remaining data, setting the END_STREAM flag on the last frame.
if (sendBuffer.size() > 0) {
while (sendBuffer.size() > 0) {
emitDataFrame(true);
}
} else {
// Send an empty frame just so we can set the END_STREAM flag.
connection.writeData(id, true, null, 0);
}
}
synchronized (SpdyStream.this) {
closed = true;
}
connection.flush();
cancelStreamIfNecessary();
}
}
/**
 * Adjusts this stream's write window by {@code delta} and wakes waiting
 * threads when the window grew. {@code delta} will be negative if a settings
 * frame initial window is smaller than the last.
 *
 * <p>This method is not synchronized itself. Because it calls
 * {@link Object#notifyAll()} for a positive {@code delta}, the caller must
 * hold this stream's monitor in that case.
 */
void addBytesToWriteWindow(long delta) {
  bytesLeftInWriteWindow = bytesLeftInWriteWindow + delta;
  if (delta <= 0) {
    return;
  }
  notifyAll();
}
/**
 * Verifies that the output side can still be written to.
 *
 * @throws IOException if the sink is closed, the sink is finished, or an
 *     error code was recorded for the stream (checked in that order).
 */
private void checkOutNotClosed() throws IOException {
  if (sink.closed) {
    throw new IOException("stream closed");
  }
  if (sink.finished) {
    throw new IOException("stream finished");
  }
  if (errorCode != null) {
    throw new IOException("stream was reset: " + errorCode);
  }
}
/**
 * Like {@link #wait}, but throws an {@code InterruptedIOException} when
 * interrupted instead of the more awkward {@link InterruptedException}.
 *
 * <p>The thread's interrupt status is restored before the exception is
 * thrown, so code further up the stack can still observe the interruption,
 * and the original {@code InterruptedException} is attached as the cause.
 * The caller must hold this stream's monitor, as {@link Object#wait()}
 * requires.
 *
 * @throws InterruptedIOException if the thread is interrupted while waiting.
 */
private void waitForIo() throws InterruptedIOException {
  try {
    wait();
  } catch (InterruptedException e) {
    // wait() cleared the interrupt flag when it threw; set it again.
    Thread.currentThread().interrupt();
    InterruptedIOException interrupted = new InterruptedIOException();
    interrupted.initCause(e);
    throw interrupted;
  }
}
/**
 * The Okio timeout watchdog will call {@link #timedOut} if the timeout is
 * reached. In that case we close the stream (asynchronously) which will
 * notify the waiting thread.
 */
class SpdyTimeout extends AsyncTimeout {
  /** Resets the stream with {@code CANCEL} without blocking the caller. */
  @Override protected void timedOut() {
    closeLater(ErrorCode.CANCEL);
  }

  /**
   * Builds the exception reported for a timeout: a {@code SocketTimeoutException}
   * carrying {@code cause} when one is given.
   */
  @Override protected IOException newTimeoutException(IOException cause) {
    SocketTimeoutException timeout = new SocketTimeoutException("timeout");
    if (cause == null) {
      return timeout;
    }
    timeout.initCause(cause);
    return timeout;
  }

  /** Exits the timeout scope and throws a timeout exception if {@code exit()} returns true. */
  public void exitAndThrowIfTimedOut() throws IOException {
    boolean timedOut = exit();
    if (!timedOut) {
      return;
    }
    throw newTimeoutException(null /* cause */);
  }
}
}