Apply upstream changes for timeout / pooled connection issue

This change contains two upstream commits:

Okio:

Upstream commit: a4aee7d9594a2eae48cd274163003e0566719b91

Author: jwilson <jwilson@squareup.com>
Date:   Sat May 16 08:19:39 2015 -0400

    Change Timeout.throwIfReached() to throw InterruptedIOException

    Previously this was throwing IOException, but nothing was anticipated
    that. This is slightly semantically incorrect; the thread wasn't
    interrupted. But it's much more convenient to use a single exception
    type for both timeouts.

    Also add a new type, ForwardingTimeout.

OkHttp:

Upstream commit: 4df674f8c5e2c07d881b4f2780922c7d15940814

Author: jwilson <jwilson@squareup.com>
Date:   Sat May 16 10:35:59 2015 -0400

    Don't share timeouts between pooled connections.

    This was causing crashes.

    Closes https://github.com/square/okio/issues/133

Bug: 21799272
Change-Id: I6f38fa8aae7dfaa74361500b75ecd09f41eb1f91
diff --git a/okhttp-tests/src/test/java/com/squareup/okhttp/CallTest.java b/okhttp-tests/src/test/java/com/squareup/okhttp/CallTest.java
index 93f2b5b..9b908cc 100644
--- a/okhttp-tests/src/test/java/com/squareup/okhttp/CallTest.java
+++ b/okhttp-tests/src/test/java/com/squareup/okhttp/CallTest.java
@@ -700,6 +700,72 @@
     }
   }
 
+  @Test public void reusedSinksGetIndependentTimeoutInstances() throws Exception {
+    server.enqueue(new MockResponse());
+    server.enqueue(new MockResponse());
+
+    // Call 1: set a deadline on the request body.
+    RequestBody requestBody1 = new RequestBody() {
+      @Override public MediaType contentType() {
+        return MediaType.parse("text/plain");
+      }
+      @Override public void writeTo(BufferedSink sink) throws IOException {
+        sink.writeUtf8("abc");
+        sink.timeout().deadline(5, TimeUnit.SECONDS);
+      }
+    };
+    Request request1 = new Request.Builder()
+        .url(server.getUrl("/"))
+        .method("POST", requestBody1)
+        .build();
+    Response response1 = client.newCall(request1).execute();
+    assertEquals(200, response1.code());
+
+    // Call 2: check for the absence of a deadline on the request body.
+    RequestBody requestBody2 = new RequestBody() {
+      @Override public MediaType contentType() {
+        return MediaType.parse("text/plain");
+      }
+      @Override public void writeTo(BufferedSink sink) throws IOException {
+        assertFalse(sink.timeout().hasDeadline());
+        sink.writeUtf8("def");
+      }
+    };
+    Request request2 = new Request.Builder()
+        .url(server.getUrl("/"))
+        .method("POST", requestBody2)
+        .build();
+    Response response2 = client.newCall(request2).execute();
+    assertEquals(200, response2.code());
+
+    // Use sequence numbers to confirm the connection was pooled.
+    assertEquals(0, server.takeRequest().getSequenceNumber());
+    assertEquals(1, server.takeRequest().getSequenceNumber());
+  }
+
+  @Test public void reusedSourcesGetIndependentTimeoutInstances() throws Exception {
+    server.enqueue(new MockResponse().setBody("abc"));
+    server.enqueue(new MockResponse().setBody("def"));
+
+    // Call 1: set a deadline on the response body.
+    Request request1 = new Request.Builder().url(server.getUrl("/")).build();
+    Response response1 = client.newCall(request1).execute();
+    BufferedSource body1 = response1.body().source();
+    assertEquals("abc", body1.readUtf8());
+    body1.timeout().deadline(5, TimeUnit.SECONDS);
+
+    // Call 2: check for the absence of a deadline on the request body.
+    Request request2 = new Request.Builder().url(server.getUrl("/")).build();
+    Response response2 = client.newCall(request2).execute();
+    BufferedSource body2 = response2.body().source();
+    assertEquals("def", body2.readUtf8());
+    assertFalse(body2.timeout().hasDeadline());
+
+    // Use sequence numbers to confirm the connection was pooled.
+    assertEquals(0, server.takeRequest().getSequenceNumber());
+    assertEquals(1, server.takeRequest().getSequenceNumber());
+  }
+
   @Test public void tls() throws Exception {
     server.get().useHttps(sslContext.getSocketFactory(), false);
     server.enqueue(new MockResponse()
diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpConnection.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpConnection.java
index d07b8b7..0d7d4e5 100644
--- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpConnection.java
+++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpConnection.java
@@ -30,6 +30,7 @@
 import okio.Buffer;
 import okio.BufferedSink;
 import okio.BufferedSource;
+import okio.ForwardingTimeout;
 import okio.Okio;
 import okio.Sink;
 import okio.Source;
@@ -265,8 +266,21 @@
     return source;
   }
 
+  /**
+   * Sets the delegate of {@code timeout} to {@link Timeout#NONE} and resets its underlying timeout
+   * to the default configuration. Use this to avoid unexpected sharing of timeouts between pooled
+   * connections.
+   */
+  private void detachTimeout(ForwardingTimeout timeout) {
+    Timeout oldDelegate = timeout.delegate();
+    timeout.setDelegate(Timeout.NONE);
+    oldDelegate.clearDeadline();
+    oldDelegate.clearTimeout();
+  }
+
   /** An HTTP body with a fixed length known in advance. */
   private final class FixedLengthSink implements Sink {
+    private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout());
     private boolean closed;
     private long bytesRemaining;
 
@@ -275,7 +289,7 @@
     }
 
     @Override public Timeout timeout() {
-      return sink.timeout();
+      return timeout;
     }
 
     @Override public void write(Buffer source, long byteCount) throws IOException {
@@ -298,6 +312,7 @@
       if (closed) return;
       closed = true;
       if (bytesRemaining > 0) throw new ProtocolException("unexpected end of stream");
+      detachTimeout(timeout);
       state = STATE_READ_RESPONSE_HEADERS;
     }
   }
@@ -308,10 +323,11 @@
    * sink with this sink.
    */
   private final class ChunkedSink implements Sink {
+    private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout());
     private boolean closed;
 
     @Override public Timeout timeout() {
-      return sink.timeout();
+      return timeout;
     }
 
     @Override public void write(Buffer source, long byteCount) throws IOException {
@@ -333,15 +349,17 @@
       if (closed) return;
       closed = true;
       sink.writeUtf8("0\r\n\r\n");
+      detachTimeout(timeout);
       state = STATE_READ_RESPONSE_HEADERS;
     }
   }
 
   private abstract class AbstractSource implements Source {
+    protected final ForwardingTimeout timeout = new ForwardingTimeout(source.timeout());
     protected boolean closed;
 
     @Override public Timeout timeout() {
-      return source.timeout();
+      return timeout;
     }
 
     /**
@@ -351,6 +369,8 @@
     protected final void endOfInput(boolean recyclable) throws IOException {
       if (state != STATE_READING_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
 
+      detachTimeout(timeout);
+
       state = STATE_IDLE;
       if (recyclable && onIdle == ON_IDLE_POOL) {
         onIdle = ON_IDLE_HOLD; // Set the on idle policy back to the default.
diff --git a/okio/okio/src/main/java/okio/ForwardingTimeout.java b/okio/okio/src/main/java/okio/ForwardingTimeout.java
new file mode 100644
index 0000000..e8d384b
--- /dev/null
+++ b/okio/okio/src/main/java/okio/ForwardingTimeout.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright (C) 2015 Square, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package okio;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/** A {@link Timeout} which forwards calls to another. Useful for subclassing. */
+public class ForwardingTimeout extends Timeout {
+  private Timeout delegate;
+
+  public ForwardingTimeout(Timeout delegate) {
+    if (delegate == null) throw new IllegalArgumentException("delegate == null");
+    this.delegate = delegate;
+  }
+
+  /** {@link Timeout} instance to which this instance is currently delegating. */
+  public final Timeout delegate() {
+    return delegate;
+  }
+
+  public final ForwardingTimeout setDelegate(Timeout delegate) {
+    if (delegate == null) throw new IllegalArgumentException("delegate == null");
+    this.delegate = delegate;
+    return this;
+  }
+
+  @Override public Timeout timeout(long timeout, TimeUnit unit) {
+    return delegate.timeout(timeout, unit);
+  }
+
+  @Override public long timeoutNanos() {
+    return delegate.timeoutNanos();
+  }
+
+  @Override public boolean hasDeadline() {
+    return delegate.hasDeadline();
+  }
+
+  @Override public long deadlineNanoTime() {
+    return delegate.deadlineNanoTime();
+  }
+
+  @Override public Timeout deadlineNanoTime(long deadlineNanoTime) {
+    return delegate.deadlineNanoTime(deadlineNanoTime);
+  }
+
+  @Override public Timeout clearTimeout() {
+    return delegate.clearTimeout();
+  }
+
+  @Override public Timeout clearDeadline() {
+    return delegate.clearDeadline();
+  }
+
+  @Override public void throwIfReached() throws IOException {
+    delegate.throwIfReached();
+  }
+}
diff --git a/okio/okio/src/main/java/okio/Timeout.java b/okio/okio/src/main/java/okio/Timeout.java
index ce220ec..6f539f9 100644
--- a/okio/okio/src/main/java/okio/Timeout.java
+++ b/okio/okio/src/main/java/okio/Timeout.java
@@ -136,18 +136,17 @@
   }
 
   /**
-   * Throws an {@link IOException} if the deadline has been reached or if the
-   * current thread has been interrupted. This method doesn't detect timeouts;
-   * that should be implemented to asynchronously abort an in-progress
-   * operation.
+   * Throws an {@link InterruptedIOException} if the deadline has been reached or if the current
+   * thread has been interrupted. This method doesn't detect timeouts; that should be implemented to
+   * asynchronously abort an in-progress operation.
    */
   public void throwIfReached() throws IOException {
     if (Thread.interrupted()) {
-      throw new InterruptedIOException();
+      throw new InterruptedIOException("thread interrupted");
     }
 
-    if (hasDeadline && System.nanoTime() > deadlineNanoTime) {
-      throw new IOException("deadline reached");
+    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
+      throw new InterruptedIOException("deadline reached");
     }
   }
 }