8156742: Miscellaneous WebSocket API improvements

Reviewed-by: chegar, rriggs
diff --git a/jdk/src/java.httpclient/share/classes/java/net/http/WS.java b/jdk/src/java.httpclient/share/classes/java/net/http/WS.java
index 7d51f64..ac327a4 100644
--- a/jdk/src/java.httpclient/share/classes/java/net/http/WS.java
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/WS.java
@@ -34,7 +34,6 @@
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
-import java.util.stream.Stream;
 
 import static java.lang.System.Logger.Level.ERROR;
 import static java.lang.System.Logger.Level.WARNING;
@@ -104,15 +103,6 @@
     }
 
     @Override
-    public CompletableFuture<WebSocket> sendText(Stream<? extends CharSequence> message) {
-        requireNonNull(message, "message");
-        synchronized (stateLock) {
-            checkState();
-            return transmitter.sendText(message);
-        }
-    }
-
-    @Override
     public CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean isLast) {
         requireNonNull(message, "message");
         synchronized (stateLock) {
@@ -179,11 +169,11 @@
     }
 
     @Override
-    public long request(long n) {
+    public void request(long n) {
         if (n < 0L) {
             throw new IllegalArgumentException("The number must not be negative: " + n);
         }
-        return receiver.request(n);
+        receiver.request(n);
     }
 
     @Override
diff --git a/jdk/src/java.httpclient/share/classes/java/net/http/WSBuilder.java b/jdk/src/java.httpclient/share/classes/java/net/http/WSBuilder.java
index 90d2d79..42a166e 100644
--- a/jdk/src/java.httpclient/share/classes/java/net/http/WSBuilder.java
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSBuilder.java
@@ -25,6 +25,7 @@
 package java.net.http;
 
 import java.net.URI;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -59,8 +60,7 @@
     private final LinkedHashMap<String, List<String>> headers = new LinkedHashMap<>();
     private final WebSocket.Listener listener;
     private Collection<String> subprotocols = Collections.emptyList();
-    private long timeout;
-    private TimeUnit timeUnit;
+    private Duration timeout;
 
     WSBuilder(URI uri, HttpClient client, WebSocket.Listener listener) {
         checkURI(requireNonNull(uri, "uri"));
@@ -93,13 +93,8 @@
     }
 
     @Override
-    public WebSocket.Builder connectTimeout(long timeout, TimeUnit unit) {
-        if (timeout < 0) {
-            throw new IllegalArgumentException("Negative timeout: " + timeout);
-        }
-        requireNonNull(unit, "unit");
-        this.timeout = timeout;
-        this.timeUnit = unit;
+    public WebSocket.Builder connectTimeout(Duration timeout) {
+        this.timeout = requireNonNull(timeout, "timeout");
         return this;
     }
 
@@ -139,9 +134,7 @@
         return new ArrayList<>(subprotocols);
     }
 
-    long getTimeout() { return timeout; }
-
-    TimeUnit getTimeUnit() { return timeUnit; }
+    Duration getConnectTimeout() { return timeout; }
 
     private static Collection<String> checkSubprotocols(String mostPreferred,
                                                         String... lesserPreferred) {
diff --git a/jdk/src/java.httpclient/share/classes/java/net/http/WSFrameConsumer.java b/jdk/src/java.httpclient/share/classes/java/net/http/WSFrameConsumer.java
index 9d0521e..eebfce6 100644
--- a/jdk/src/java.httpclient/share/classes/java/net/http/WSFrameConsumer.java
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSFrameConsumer.java
@@ -206,7 +206,7 @@
                 boolean binaryNonEmpty = data.hasRemaining();
                 WSShared<CharBuffer> textData;
                 try {
-                    textData = decoder.decode(data, part.isLast());
+                    textData = decoder.decode(data, part == MessagePart.WHOLE || part == MessagePart.LAST);
                 } catch (CharacterCodingException e) {
                     throw new WSProtocolException
                             ("5.6.", "Invalid UTF-8 sequence in frame " + opcode, NOT_CONSISTENT, e);
diff --git a/jdk/src/java.httpclient/share/classes/java/net/http/WSMessageSender.java b/jdk/src/java.httpclient/share/classes/java/net/http/WSMessageSender.java
index 95a9b90..46f8060 100644
--- a/jdk/src/java.httpclient/share/classes/java/net/http/WSMessageSender.java
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSMessageSender.java
@@ -30,7 +30,6 @@
 import java.net.http.WSOutgoingMessage.Close;
 import java.net.http.WSOutgoingMessage.Ping;
 import java.net.http.WSOutgoingMessage.Pong;
-import java.net.http.WSOutgoingMessage.StreamedText;
 import java.net.http.WSOutgoingMessage.Text;
 import java.net.http.WSOutgoingMessage.Visitor;
 import java.nio.ByteBuffer;
@@ -123,11 +122,6 @@
         }
 
         @Override
-        public void visit(StreamedText streamedText) {
-            throw new IllegalArgumentException("Not yet implemented");
-        }
-
-        @Override
         public void visit(Binary message) {
             buffers[1] = message.bytes;
             int mask = random.nextInt();
diff --git a/jdk/src/java.httpclient/share/classes/java/net/http/WSOpeningHandshake.java b/jdk/src/java.httpclient/share/classes/java/net/http/WSOpeningHandshake.java
index d3cc7da..9d0a9c4 100644
--- a/jdk/src/java.httpclient/share/classes/java/net/http/WSOpeningHandshake.java
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSOpeningHandshake.java
@@ -32,11 +32,14 @@
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -77,8 +80,12 @@
     WSOpeningHandshake(WSBuilder b) {
         URI httpURI = createHttpUri(b.getUri());
         HttpRequest.Builder requestBuilder = b.getClient().request(httpURI);
-        if (b.getTimeUnit() != null) {
-            requestBuilder.timeout(b.getTimeUnit(), b.getTimeout());
+        Duration connectTimeout = b.getConnectTimeout();
+        if (connectTimeout != null) {
+            requestBuilder.timeout(
+                    TimeUnit.of(ChronoUnit.MILLIS),
+                    connectTimeout.get(ChronoUnit.MILLIS)
+            );
         }
         Collection<String> s = b.getSubprotocols();
         if (!s.isEmpty()) {
diff --git a/jdk/src/java.httpclient/share/classes/java/net/http/WSOutgoingMessage.java b/jdk/src/java.httpclient/share/classes/java/net/http/WSOutgoingMessage.java
index bc3eabd..34e91f8 100644
--- a/jdk/src/java.httpclient/share/classes/java/net/http/WSOutgoingMessage.java
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSOutgoingMessage.java
@@ -25,13 +25,11 @@
 package java.net.http;
 
 import java.nio.ByteBuffer;
-import java.util.stream.Stream;
 
 abstract class WSOutgoingMessage {
 
     interface Visitor {
         void visit(Text message);
-        void visit(StreamedText message);
         void visit(Binary message);
         void visit(Ping message);
         void visit(Pong message);
@@ -64,25 +62,6 @@
         }
     }
 
-    static final class StreamedText extends WSOutgoingMessage {
-
-        public final Stream<? extends CharSequence> characters;
-
-        StreamedText(Stream<? extends CharSequence> characters) {
-            this.characters = characters;
-        }
-
-        @Override
-        void accept(Visitor visitor) {
-            visitor.visit(this);
-        }
-
-        @Override
-        public String toString() {
-            return WSUtils.toStringSimple(this) + "[characters=" + characters + "]";
-        }
-    }
-
     static final class Binary extends WSOutgoingMessage {
 
         public final boolean isLast;
diff --git a/jdk/src/java.httpclient/share/classes/java/net/http/WSReceiver.java b/jdk/src/java.httpclient/share/classes/java/net/http/WSReceiver.java
index 7ce089b..9b4da82 100644
--- a/jdk/src/java.httpclient/share/classes/java/net/http/WSReceiver.java
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSReceiver.java
@@ -101,11 +101,10 @@
         }
     }
 
-    long request(long n) {
+    void request(long n) {
         long newDemand = demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i);
         handler.signal();
         assert newDemand >= 0 : newDemand;
-        return newDemand;
     }
 
     private boolean getData() throws IOException {
diff --git a/jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java b/jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java
index a9a8287..62219d0 100644
--- a/jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java
@@ -28,7 +28,6 @@
 import java.net.http.WSOutgoingMessage.Close;
 import java.net.http.WSOutgoingMessage.Ping;
 import java.net.http.WSOutgoingMessage.Pong;
-import java.net.http.WSOutgoingMessage.StreamedText;
 import java.net.http.WSOutgoingMessage.Text;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
@@ -40,7 +39,6 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Consumer;
-import java.util.stream.Stream;
 
 import static java.lang.String.format;
 import static java.net.http.Pair.pair;
@@ -83,11 +81,6 @@
         return acceptMessage(new Text(isLast, message));
     }
 
-    CompletableFuture<WebSocket> sendText(Stream<? extends CharSequence> message) {
-        checkAndUpdateText(true);
-        return acceptMessage(new StreamedText(message));
-    }
-
     CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean isLast) {
         checkAndUpdateBinary(isLast);
         return acceptMessage(new Binary(isLast, message));
diff --git a/jdk/src/java.httpclient/share/classes/java/net/http/WebSocket.java b/jdk/src/java.httpclient/share/classes/java/net/http/WebSocket.java
index dfc7f71..6d14525 100644
--- a/jdk/src/java.httpclient/share/classes/java/net/http/WebSocket.java
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/WebSocket.java
@@ -28,13 +28,11 @@
 import java.net.ProtocolException;
 import java.net.URI;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 
 /**
  * A WebSocket client conforming to RFC&nbsp;6455.
@@ -47,8 +45,9 @@
  * receive messages. When the {@code WebSocket} is no longer
  * needed it must be closed: a Close message must both be {@linkplain
  * #sendClose() sent} and {@linkplain Listener#onClose(WebSocket, Optional,
- * String) received}. Or to close abruptly, {@link #abort()} is called. Once
- * closed it remains closed, cannot be reopened.
+ * String) received}. Otherwise, invoke {@link #abort() abort} to close abruptly.
+ *
+ * <p> Once closed the {@code WebSocket} remains closed and cannot be reopened.
  *
  * <p> Messages of type {@code X} are sent through the {@code WebSocket.sendX}
  * methods and received through {@link WebSocket.Listener}{@code .onX} methods
@@ -71,10 +70,6 @@
  * arranged from the {@code buffer}'s {@link ByteBuffer#position() position} to
  * the {@code buffer}'s {@link ByteBuffer#limit() limit}.
  *
- * <p> All message exchange is run by the threads belonging to the {@linkplain
- * HttpClient#executorService() executor service} of {@code WebSocket}'s {@link
- * HttpClient}.
- *
  * <p> Unless otherwise noted, passing a {@code null} argument to a constructor
  * or method of this type will cause a {@link NullPointerException
  * NullPointerException} to be thrown.
@@ -217,22 +212,17 @@
          * Sets a timeout for the opening handshake.
          *
          * <p> If the opening handshake is not finished within the specified
-         * timeout then {@link #buildAsync()} completes exceptionally with a
-         * {@code HttpTimeoutException}.
+         * amount of time then {@link #buildAsync()} completes exceptionally
+         * with a {@code HttpTimeoutException}.
          *
-         * <p> If the timeout is not specified then it's deemed infinite.
+         * <p> If this method is not invoked then the timeout is deemed infinite.
          *
          * @param timeout
-         *         the maximum time to wait
-         * @param unit
-         *         the time unit of the timeout argument
+         *         the timeout
          *
          * @return this builder
-         *
-         * @throws IllegalArgumentException
-         *         if the {@code timeout} is negative
          */
-        Builder connectTimeout(long timeout, TimeUnit unit);
+        Builder connectTimeout(Duration timeout);
 
         /**
          * Builds a {@code WebSocket}.
@@ -506,7 +496,7 @@
          * <p> Once a Close message is received, the server will not send any
          * more messages.
          *
-         * <p> A Close message may consist of a close code and a reason for
+         * <p> A Close message may consist of a status code and a reason for
          * closing. The reason will have a UTF-8 representation not longer than
          * {@code 123} bytes. The reason may be useful for debugging or passing
          * information relevant to the connection but is not necessarily human
@@ -545,12 +535,8 @@
          * the time {@code onError} is invoked, no more messages can be sent on
          * this {@code WebSocket}.
          *
-         * @apiNote Errors associated with send operations ({@link
-         * WebSocket#sendText(CharSequence, boolean) sendText}, {@link
-         * #sendBinary(ByteBuffer, boolean) sendBinary}, {@link
-         * #sendPing(ByteBuffer) sendPing}, {@link #sendPong(ByteBuffer)
-         * sendPong} and {@link #sendClose(CloseCode, CharSequence) sendClose})
-         * are reported to the {@code CompletionStage} operations return.
+         * @apiNote Errors associated with {@code sendX} methods are reported to
+         * the {@code CompletableFuture} these methods return.
          *
          * @implSpec The default implementation does nothing.
          *
@@ -563,8 +549,8 @@
     }
 
     /**
-     * A marker used by {@link WebSocket.Listener} for partial message
-     * receiving.
+     * A marker used by {@link WebSocket.Listener} in cases where a partial
+     * message may be received.
      *
      * @since 9
      */
@@ -586,19 +572,9 @@
         LAST,
 
         /**
-         * A whole message. The message consists of a single part.
+         * A whole message consisting of a single part.
          */
-        WHOLE;
-
-        /**
-         * Tells whether a part of a message received with this marker is the
-         * last part.
-         *
-         * @return {@code true} if LAST or WHOLE, {@code false} otherwise
-         */
-        public boolean isLast() {
-            return this == LAST || this == WHOLE;
-        }
+        WHOLE
     }
 
     /**
@@ -630,7 +606,7 @@
      * @param message
      *         the message
      * @param isLast
-     *         {@code true} if this is the final part of the message,
+     *         {@code true} if this is the last part of the message,
      *         {@code false} otherwise
      *
      * @return a CompletableFuture with this WebSocket
@@ -679,43 +655,6 @@
     }
 
     /**
-     * Sends a whole Text message with characters from {@code
-     * CharacterSequence}s provided by the given {@code Stream}.
-     *
-     * <p> This is a convenience method. For the general case use {@link
-     * #sendText(CharSequence, boolean)}.
-     *
-     * <p> Returns a {@code CompletableFuture<WebSocket>} which completes
-     * normally when the message has been sent or completes exceptionally if an
-     * error occurs.
-     *
-     * <p> Streamed character sequences should not be modified until the
-     * returned {@code CompletableFuture} completes (either normally or
-     * exceptionally).
-     *
-     * <p> The returned {@code CompletableFuture} can complete exceptionally
-     * with:
-     * <ul>
-     * <li> {@link IOException}
-     *          if an I/O error occurs during this operation
-     * <li> {@link IllegalStateException}
-     *          if the {@code WebSocket} closes while this operation is in progress;
-     *          or if a Close message has been sent already;
-     *          or if there is an outstanding send operation;
-     *          or if a previous Binary message was not sent with {@code isLast == true}
-     * </ul>
-     *
-     * @param message
-     *         the message
-     *
-     * @return a CompletableFuture with this WebSocket
-     *
-     * @throws IllegalArgumentException
-     *         if {@code message} is a malformed (or an incomplete) UTF-16 sequence
-     */
-    CompletableFuture<WebSocket> sendText(Stream<? extends CharSequence> message);
-
-    /**
      * Sends a Binary message with bytes from the given {@code ByteBuffer}.
      *
      * <p> Returns a {@code CompletableFuture<WebSocket>} which completes
@@ -737,7 +676,7 @@
      * @param message
      *         the message
      * @param isLast
-     *         {@code true} if this is the final part of the message,
+     *         {@code true} if this is the last part of the message,
      *         {@code false} otherwise
      *
      * @return a CompletableFuture with this WebSocket
@@ -745,43 +684,6 @@
     CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean isLast);
 
     /**
-     * Sends a Binary message with bytes from the given {@code byte[]}.
-     *
-     * <p> Returns a {@code CompletableFuture<WebSocket>} which completes
-     * normally when the message has been sent or completes exceptionally if an
-     * error occurs.
-     *
-     * <p> The returned {@code CompletableFuture} can complete exceptionally
-     * with:
-     * <ul>
-     * <li> {@link IOException}
-     *          if an I/O error occurs during this operation
-     * <li> {@link IllegalStateException}
-     *          if the {@code WebSocket} closes while this operation is in progress;
-     *          or if a Close message has been sent already;
-     *          or if there is an outstanding send operation;
-     *          or if a previous Text message was not sent with {@code isLast == true}
-     * </ul>
-     *
-     * @implSpec This is equivalent to:
-     * <pre>{@code
-     *     sendBinary(ByteBuffer.wrap(message), isLast)
-     * }</pre>
-     *
-     * @param message
-     *         the message
-     * @param isLast
-     *         {@code true} if this is the final part of the message,
-     *         {@code false} otherwise
-     *
-     * @return a CompletableFuture with this WebSocket
-     */
-    default CompletableFuture<WebSocket> sendBinary(byte[] message, boolean isLast) {
-        Objects.requireNonNull(message, "message");
-        return sendBinary(ByteBuffer.wrap(message), isLast);
-    }
-
-    /**
      * Sends a Ping message.
      *
      * <p> Returns a {@code CompletableFuture<WebSocket>} which completes
@@ -858,10 +760,11 @@
      * normally when the message has been sent or completes exceptionally if an
      * error occurs.
      *
-     * <p> A Close message may consist of a close code and a reason for closing.
-     * The reason must have a valid UTF-8 representation not longer than {@code
-     * 123} bytes. The reason may be useful for debugging or passing information
-     * relevant to the connection but is not necessarily human readable.
+     * <p> A Close message may consist of a status code and a reason for
+     * closing. The reason must have a UTF-8 representation not longer than
+     * {@code 123} bytes. The reason may be useful for debugging or passing
+     * information relevant to the connection but is not necessarily human
+     * readable.
      *
      * <p> The returned {@code CompletableFuture} can complete exceptionally
      * with:
@@ -910,24 +813,21 @@
     CompletableFuture<WebSocket> sendClose();
 
     /**
-     * Requests {@code n} more messages to be received by the {@link Listener
+     * Allows {@code n} more messages to be received by the {@link Listener
      * Listener}.
      *
-     * <p> The actual number might be fewer if either of the endpoints decide to
-     * close the connection before that or an error occurs.
+     * <p> The actual number of received messages might be fewer if a Close
+     * message is received, the connection closes or an error occurs.
      *
      * <p> A {@code WebSocket} that has just been created, hasn't requested
      * anything yet. Usually the initial request for messages is done in {@link
      * Listener#onOpen(java.net.http.WebSocket) Listener.onOpen}.
      *
-     * If all requested messages have been received, and the server sends more,
-     * then these messages are queued.
-     *
      * @implNote This implementation does not distinguish between partial and
      * whole messages, because it's not known beforehand how a message will be
      * received.
      *
-     * <p> If a server sends more messages than requested, the implementation
+     * <p> If a server sends more messages than requested, this implementation
      * queues up these messages on the TCP connection and may eventually force
      * the sender to stop sending through TCP flow control.
      *
@@ -936,12 +836,8 @@
      *
      * @throws IllegalArgumentException
      *         if {@code n < 0}
-     *
-     * @return resulting unfulfilled demand with this request taken into account
      */
-    // TODO return void as it's breaking encapsulation (leaking info when exactly something deemed delivered)
-    // or demand behaves after LONG.MAX_VALUE
-    long request(long n);
+    void request(long n);
 
     /**
      * Returns a {@linkplain Builder#subprotocols(String, String...) subprotocol}
diff --git a/jdk/test/java/net/httpclient/BasicWebSocketAPITest.java b/jdk/test/java/net/httpclient/BasicWebSocketAPITest.java
index 0ee4124..89a3738 100644
--- a/jdk/test/java/net/httpclient/BasicWebSocketAPITest.java
+++ b/jdk/test/java/net/httpclient/BasicWebSocketAPITest.java
@@ -32,10 +32,9 @@
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.nio.channels.SocketChannel;
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import java.util.stream.Stream;
 
 /*
  * @test
@@ -83,13 +82,7 @@
                 (ws) ->
                         TestKit.assertThrows(NullPointerException.class,
                                 "message",
-                                () -> ws.sendBinary((byte[]) null, true))
-        );
-        checkAndClose(
-                (ws) ->
-                        TestKit.assertThrows(NullPointerException.class,
-                                "message",
-                                () -> ws.sendBinary((ByteBuffer) null, true))
+                                () -> ws.sendBinary(null, true))
         );
         checkAndClose(
                 (ws) ->
@@ -125,13 +118,7 @@
                 (ws) ->
                         TestKit.assertThrows(NullPointerException.class,
                                 "message",
-                                () -> ws.sendText((CharSequence) null))
-        );
-        checkAndClose(
-                (ws) ->
-                        TestKit.assertThrows(NullPointerException.class,
-                                "message",
-                                () -> ws.sendText((Stream<? extends CharSequence>) null))
+                                () -> ws.sendText(null))
         );
         checkAndClose(
                 (ws) ->
@@ -214,17 +201,7 @@
         // FIXME: check timeout works
         // (i.e. it directly influences the time WebSocket waits for connection + opening handshake)
         TestKit.assertNotThrows(
-                () -> WebSocket.newBuilder(ws, defaultListener()).connectTimeout(1, TimeUnit.SECONDS)
-        );
-        WebSocket.Builder builder = WebSocket.newBuilder(ws, defaultListener());
-        TestKit.assertThrows(IllegalArgumentException.class,
-                "(?i).*\\bnegative\\b.*",
-                () -> builder.connectTimeout(-1, TimeUnit.SECONDS)
-        );
-        WebSocket.Builder builder1 = WebSocket.newBuilder(ws, defaultListener());
-        TestKit.assertThrows(NullPointerException.class,
-                "unit",
-                () -> builder1.connectTimeout(1, null)
+                () -> WebSocket.newBuilder(ws, defaultListener()).connectTimeout(Duration.ofSeconds(1))
         );
         // FIXME: check these headers are actually received by the server
         TestKit.assertNotThrows(