blob: 74d1445fc29701eaacdd63ad8fb7dd4e24a516c0 [file] [log] [blame]
/*
* Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.incubator.http.internal.websocket;
import jdk.incubator.http.WebSocket;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.Pair;
import jdk.incubator.http.internal.websocket.OpeningHandshake.Result;
import jdk.incubator.http.internal.websocket.OutgoingMessage.Binary;
import jdk.incubator.http.internal.websocket.OutgoingMessage.Close;
import jdk.incubator.http.internal.websocket.OutgoingMessage.Context;
import jdk.incubator.http.internal.websocket.OutgoingMessage.Ping;
import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong;
import jdk.incubator.http.internal.websocket.OutgoingMessage.Text;
import java.io.IOException;
import java.net.ProtocolException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static jdk.incubator.http.internal.common.Pair.pair;
import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY;
import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient;
/*
* A WebSocket client.
*/
final class WebSocketImpl implements WebSocket {
private final URI uri;
private final String subprotocol;
private final RawChannel channel;
private final Listener listener;
/*
* Whether or not Listener.onClose or Listener.onError has been already
* invoked. We keep track of this since only one of these methods is invoked
* and it is invoked at most once.
*/
private boolean lastMethodInvoked;
private final AtomicBoolean outstandingSend = new AtomicBoolean();
private final CooperativeHandler sendHandler =
new CooperativeHandler(this::sendFirst);
private final Queue<Pair<OutgoingMessage, CompletableFuture<WebSocket>>>
queue = new ConcurrentLinkedQueue<>();
private final Context context = new OutgoingMessage.Context();
private final Transmitter transmitter;
private final Receiver receiver;
/*
* Whether or not the WebSocket has been closed. When a WebSocket has been
* closed it means that no further messages can be sent or received.
* A closure can be triggered by:
*
* 1. abort()
* 2. "Failing the WebSocket Connection" (i.e. a fatal error)
* 3. Completion of the Closing handshake
*/
private final AtomicBoolean closed = new AtomicBoolean();
/*
* This lock is enforcing sequential ordering of invocations to listener's
* methods. It is supposed to be uncontended. The only contention that can
* happen is when onOpen, an asynchronous onError (not related to reading
* from the channel, e.g. an error from automatic Pong reply) or onClose
* (related to abort) happens. Since all of the above are one-shot actions,
* the said contention is insignificant.
*/
private final Object lock = new Object();
private final CompletableFuture<?> closeReceived = new CompletableFuture<>();
private final CompletableFuture<?> closeSent = new CompletableFuture<>();
static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
Function<Result, WebSocket> newWebSocket = r -> {
WebSocketImpl ws = new WebSocketImpl(b.getUri(),
r.subprotocol,
r.channel,
b.getListener());
// The order of calls might cause a subtle effects, like CF will be
// returned from the buildAsync _after_ onOpen has been signalled.
// This means if onOpen is lengthy, it might cause some problems.
ws.signalOpen();
return ws;
};
OpeningHandshake h;
try {
h = new OpeningHandshake(b);
} catch (IllegalArgumentException e) {
return failedFuture(e);
}
return h.send().thenApply(newWebSocket);
}
WebSocketImpl(URI uri,
String subprotocol,
RawChannel channel,
Listener listener)
{
this.uri = requireNonNull(uri);
this.subprotocol = requireNonNull(subprotocol);
this.channel = requireNonNull(channel);
this.listener = requireNonNull(listener);
this.transmitter = new Transmitter(channel);
this.receiver = new Receiver(messageConsumerOf(listener), channel);
// Set up the Closing Handshake action
CompletableFuture.allOf(closeReceived, closeSent)
.whenComplete((result, error) -> {
try {
channel.close();
} catch (IOException e) {
Log.logError(e);
} finally {
closed.set(true);
}
});
}
/*
* This initialisation is outside of the constructor for the sake of
* safe publication.
*/
private void signalOpen() {
synchronized (lock) {
// TODO: might hold lock longer than needed causing prolonged
// contention? substitute lock for ConcurrentLinkedQueue<Runnable>?
try {
listener.onOpen(this);
} catch (Exception e) {
signalError(e);
}
}
}
private void signalError(Throwable error) {
synchronized (lock) {
if (lastMethodInvoked) {
Log.logError(error);
} else {
lastMethodInvoked = true;
receiver.close();
try {
listener.onError(this, error);
} catch (Exception e) {
Log.logError(e);
}
}
}
}
/*
* Processes a Close event that came from the channel. Invoked at most once.
*/
private void processClose(int statusCode, String reason) {
receiver.close();
try {
channel.shutdownInput();
} catch (IOException e) {
Log.logError(e);
}
boolean alreadyCompleted = !closeReceived.complete(null);
if (alreadyCompleted) {
// This CF is supposed to be completed only once, the first time a
// Close message is received. No further messages are pulled from
// the socket.
throw new InternalError();
}
int code;
if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
code = NORMAL_CLOSURE;
} else {
code = statusCode;
}
CompletionStage<?> readyToClose = signalClose(statusCode, reason);
if (readyToClose == null) {
readyToClose = CompletableFuture.completedFuture(null);
}
readyToClose.whenComplete((r, error) -> {
enqueueClose(new Close(code, ""))
.whenComplete((r1, error1) -> {
if (error1 != null) {
Log.logError(error1);
}
});
});
}
/*
* Signals a Close event (might not correspond to anything happened on the
* channel, e.g. `abort()`).
*/
private CompletionStage<?> signalClose(int statusCode, String reason) {
synchronized (lock) {
if (lastMethodInvoked) {
Log.logTrace("Close: {0}, ''{1}''", statusCode, reason);
} else {
lastMethodInvoked = true;
receiver.close();
try {
return listener.onClose(this, statusCode, reason);
} catch (Exception e) {
Log.logError(e);
}
}
}
return null;
}
@Override
public CompletableFuture<WebSocket> sendText(CharSequence message,
boolean isLast)
{
return enqueueExclusively(new Text(message, isLast));
}
@Override
public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
boolean isLast)
{
return enqueueExclusively(new Binary(message, isLast));
}
@Override
public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
return enqueueExclusively(new Ping(message));
}
@Override
public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
return enqueueExclusively(new Pong(message));
}
@Override
public CompletableFuture<WebSocket> sendClose(int statusCode,
String reason) {
if (!isLegalToSendFromClient(statusCode)) {
return failedFuture(
new IllegalArgumentException("statusCode: " + statusCode));
}
Close msg;
try {
msg = new Close(statusCode, reason);
} catch (IllegalArgumentException e) {
return failedFuture(e);
}
return enqueueClose(msg);
}
/*
* Sends a Close message with the given contents and then shuts down the
* channel for writing since no more messages are expected to be sent after
* this. Invoked at most once.
*/
private CompletableFuture<WebSocket> enqueueClose(Close m) {
return enqueue(m).whenComplete((r, error) -> {
try {
channel.shutdownOutput();
} catch (IOException e) {
Log.logError(e);
}
boolean alreadyCompleted = !closeSent.complete(null);
if (alreadyCompleted) {
// Shouldn't happen as this callback must run at most once
throw new InternalError();
}
});
}
/*
* Accepts the given message into the outgoing queue in a mutually-exclusive
* fashion in respect to other messages accepted through this method. No
* further messages will be accepted until the returned CompletableFuture
* completes. This method is used to enforce "one outstanding send
* operation" policy.
*/
private CompletableFuture<WebSocket> enqueueExclusively(OutgoingMessage m)
{
if (closed.get()) {
return failedFuture(new IllegalStateException("Closed"));
}
if (!outstandingSend.compareAndSet(false, true)) {
return failedFuture(new IllegalStateException("Outstanding send"));
}
return enqueue(m).whenComplete((r, e) -> outstandingSend.set(false));
}
private CompletableFuture<WebSocket> enqueue(OutgoingMessage m) {
CompletableFuture<WebSocket> cf = new CompletableFuture<>();
boolean added = queue.add(pair(m, cf));
if (!added) {
// The queue is supposed to be unbounded
throw new InternalError();
}
sendHandler.handle();
return cf;
}
/*
* This is the main sending method. It may be run in different threads,
* but never concurrently.
*/
private void sendFirst(Runnable whenSent) {
Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll();
if (p == null) {
whenSent.run();
return;
}
OutgoingMessage message = p.first;
CompletableFuture<WebSocket> cf = p.second;
try {
message.contextualize(context);
Consumer<Exception> h = e -> {
if (e == null) {
cf.complete(WebSocketImpl.this);
} else {
cf.completeExceptionally(e);
}
sendHandler.handle();
whenSent.run();
};
transmitter.send(message, h);
} catch (Exception t) {
cf.completeExceptionally(t);
}
}
@Override
public void request(long n) {
receiver.request(n);
}
@Override
public String getSubprotocol() {
return subprotocol;
}
@Override
public boolean isClosed() {
return closed.get();
}
@Override
public void abort() throws IOException {
try {
channel.close();
} finally {
closed.set(true);
signalClose(CLOSED_ABNORMALLY, "");
}
}
@Override
public String toString() {
return super.toString()
+ "[" + (closed.get() ? "CLOSED" : "OPEN") + "]: " + uri
+ (!subprotocol.isEmpty() ? ", subprotocol=" + subprotocol : "");
}
private MessageStreamConsumer messageConsumerOf(Listener listener) {
// Synchronization performed here in every method is not for the sake of
// ordering invocations to this consumer, after all they are naturally
// ordered in the channel. The reason is to avoid an interference with
// any unrelated to the channel calls to onOpen, onClose and onError.
return new MessageStreamConsumer() {
@Override
public void onText(MessagePart part, CharSequence data) {
receiver.acknowledge();
synchronized (WebSocketImpl.this.lock) {
try {
listener.onText(WebSocketImpl.this, data, part);
} catch (Exception e) {
signalError(e);
}
}
}
@Override
public void onBinary(MessagePart part, ByteBuffer data) {
receiver.acknowledge();
synchronized (WebSocketImpl.this.lock) {
try {
listener.onBinary(WebSocketImpl.this, data.slice(), part);
} catch (Exception e) {
signalError(e);
}
}
}
@Override
public void onPing(ByteBuffer data) {
receiver.acknowledge();
// Let's make a full copy of this tiny data. What we want here
// is to rule out a possibility the shared data we send might be
// corrupted the by processing in the listener.
ByteBuffer slice = data.slice();
ByteBuffer copy = ByteBuffer.allocate(data.remaining())
.put(data)
.flip();
// Non-exclusive send;
CompletableFuture<WebSocket> pongSent = enqueue(new Pong(copy));
pongSent.whenComplete(
(r, error) -> {
if (error != null) {
WebSocketImpl.this.signalError(error);
}
}
);
synchronized (WebSocketImpl.this.lock) {
try {
listener.onPing(WebSocketImpl.this, slice);
} catch (Exception e) {
signalError(e);
}
}
}
@Override
public void onPong(ByteBuffer data) {
receiver.acknowledge();
synchronized (WebSocketImpl.this.lock) {
try {
listener.onPong(WebSocketImpl.this, data.slice());
} catch (Exception e) {
signalError(e);
}
}
}
@Override
public void onClose(int statusCode, CharSequence reason) {
receiver.acknowledge();
processClose(statusCode, reason.toString());
}
@Override
public void onError(Exception error) {
// An signalError doesn't necessarily mean we must signalClose
// the WebSocket. However, if it's something the WebSocket
// Specification recognizes as a reason for "Failing the
// WebSocket Connection", then we must do so, but BEFORE
// notifying the Listener.
if (!(error instanceof FailWebSocketException)) {
signalError(error);
} else {
Exception ex = (Exception) new ProtocolException().initCause(error);
int code = ((FailWebSocketException) error).getStatusCode();
enqueueClose(new Close(code, ""))
.whenComplete((r, e) -> {
if (e != null) {
ex.addSuppressed(e);
}
try {
channel.close();
} catch (IOException e1) {
ex.addSuppressed(e1);
} finally {
closed.set(true);
}
signalError(ex);
});
}
}
@Override
public void onComplete() {
processClose(CLOSED_ABNORMALLY, "");
}
};
}
}