blob: eebfce66e0a65e79ed0109d0178c9834ce8b8db9 [file] [log] [blame]
/*
* Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.net.http;
import java.net.http.WSFrame.Opcode;
import java.net.http.WebSocket.MessagePart;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.String.format;
import static java.lang.System.Logger.Level.TRACE;
import static java.net.http.WSUtils.dump;
import static java.net.http.WSUtils.logger;
import static java.net.http.WebSocket.CloseCode.NOT_CONSISTENT;
import static java.net.http.WebSocket.CloseCode.of;
import static java.util.Objects.requireNonNull;
/*
* Consumes frame parts and notifies a message consumer, when there is
* sufficient data to produce a message, or part thereof.
*
* Data consumed but not yet translated is accumulated until it's sufficient to
* form a message.
*/
final class WSFrameConsumer implements WSFrame.Consumer {
private final AtomicInteger invocationOrder = new AtomicInteger();
private final WSMessageConsumer output;
private final WSCharsetToolkit.Decoder decoder = new WSCharsetToolkit.Decoder();
private boolean fin;
private Opcode opcode, originatingOpcode;
private MessagePart part = MessagePart.WHOLE;
private long payloadLen;
private WSShared<ByteBuffer> binaryData;
WSFrameConsumer(WSMessageConsumer output) {
this.output = requireNonNull(output);
}
@Override
public void fin(boolean value) {
assert invocationOrder.compareAndSet(0, 1) : dump(invocationOrder, value);
if (logger.isLoggable(TRACE)) {
// Checked for being loggable because of autoboxing of 'value'
logger.log(TRACE, "Reading fin: {0}", value);
}
fin = value;
}
@Override
public void rsv1(boolean value) {
assert invocationOrder.compareAndSet(1, 2) : dump(invocationOrder, value);
if (logger.isLoggable(TRACE)) {
logger.log(TRACE, "Reading rsv1: {0}", value);
}
if (value) {
throw new WSProtocolException("5.2.", "rsv1 bit is set unexpectedly");
}
}
@Override
public void rsv2(boolean value) {
assert invocationOrder.compareAndSet(2, 3) : dump(invocationOrder, value);
if (logger.isLoggable(TRACE)) {
logger.log(TRACE, "Reading rsv2: {0}", value);
}
if (value) {
throw new WSProtocolException("5.2.", "rsv2 bit is set unexpectedly");
}
}
@Override
public void rsv3(boolean value) {
assert invocationOrder.compareAndSet(3, 4) : dump(invocationOrder, value);
if (logger.isLoggable(TRACE)) {
logger.log(TRACE, "Reading rsv3: {0}", value);
}
if (value) {
throw new WSProtocolException("5.2.", "rsv3 bit is set unexpectedly");
}
}
@Override
public void opcode(Opcode v) {
assert invocationOrder.compareAndSet(4, 5) : dump(invocationOrder, v);
logger.log(TRACE, "Reading opcode: {0}", v);
if (v == Opcode.PING || v == Opcode.PONG || v == Opcode.CLOSE) {
if (!fin) {
throw new WSProtocolException("5.5.", "A fragmented control frame " + v);
}
opcode = v;
} else if (v == Opcode.TEXT || v == Opcode.BINARY) {
if (originatingOpcode != null) {
throw new WSProtocolException
("5.4.", format("An unexpected frame %s (fin=%s)", v, fin));
}
opcode = v;
if (!fin) {
originatingOpcode = v;
}
} else if (v == Opcode.CONTINUATION) {
if (originatingOpcode == null) {
throw new WSProtocolException
("5.4.", format("An unexpected frame %s (fin=%s)", v, fin));
}
opcode = v;
} else {
throw new WSProtocolException("5.2.", "An unknown opcode " + v);
}
}
@Override
public void mask(boolean value) {
assert invocationOrder.compareAndSet(5, 6) : dump(invocationOrder, value);
if (logger.isLoggable(TRACE)) {
logger.log(TRACE, "Reading mask: {0}", value);
}
if (value) {
throw new WSProtocolException
("5.1.", "Received a masked frame from the server");
}
}
@Override
public void payloadLen(long value) {
assert invocationOrder.compareAndSet(6, 7) : dump(invocationOrder, value);
if (logger.isLoggable(TRACE)) {
logger.log(TRACE, "Reading payloadLen: {0}", value);
}
if (opcode.isControl()) {
if (value > 125) {
throw new WSProtocolException
("5.5.", format("A control frame %s has a payload length of %s",
opcode, value));
}
assert Opcode.CLOSE.isControl();
if (opcode == Opcode.CLOSE && value == 1) {
throw new WSProtocolException
("5.5.1.", "A Close frame's status code is only 1 byte long");
}
}
payloadLen = value;
}
@Override
public void maskingKey(int value) {
assert false : dump(invocationOrder, value);
}
@Override
public void payloadData(WSShared<ByteBuffer> data, boolean isLast) {
assert invocationOrder.compareAndSet(7, isLast ? 8 : 7)
: dump(invocationOrder, data, isLast);
if (logger.isLoggable(TRACE)) {
logger.log(TRACE, "Reading payloadData: data={0}, isLast={1}", data, isLast);
}
if (opcode.isControl()) {
if (binaryData != null) {
binaryData.put(data);
data.dispose();
} else if (!isLast) {
// The first chunk of the message
int remaining = data.remaining();
// It shouldn't be 125, otherwise the next chunk will be of size
// 0, which is not what Reader promises to deliver (eager
// reading)
assert remaining < 125 : dump(remaining);
WSShared<ByteBuffer> b = WSShared.wrap(ByteBuffer.allocate(125)).put(data);
data.dispose();
binaryData = b; // Will be disposed by the user
} else {
// The only chunk; will be disposed by the user
binaryData = data.position(data.limit()); // FIXME: remove this hack
}
} else {
part = determinePart(isLast);
boolean text = opcode == Opcode.TEXT || originatingOpcode == Opcode.TEXT;
if (!text) {
output.onBinary(part, data);
} else {
boolean binaryNonEmpty = data.hasRemaining();
WSShared<CharBuffer> textData;
try {
textData = decoder.decode(data, part == MessagePart.WHOLE || part == MessagePart.LAST);
} catch (CharacterCodingException e) {
throw new WSProtocolException
("5.6.", "Invalid UTF-8 sequence in frame " + opcode, NOT_CONSISTENT, e);
}
if (!(binaryNonEmpty && !textData.hasRemaining())) {
// If there's a binary data, that result in no text, then we
// don't deliver anything
output.onText(part, textData);
}
}
}
}
@Override
public void endFrame() {
assert invocationOrder.compareAndSet(8, 0) : dump(invocationOrder);
if (opcode.isControl()) {
binaryData.flip();
}
switch (opcode) {
case CLOSE:
WebSocket.CloseCode cc;
String reason;
if (payloadLen == 0) {
cc = null;
reason = "";
} else {
ByteBuffer b = binaryData.buffer();
int len = b.remaining();
assert 2 <= len && len <= 125 : dump(len, payloadLen);
try {
cc = of(b.getChar());
reason = WSCharsetToolkit.decode(b).toString();
} catch (IllegalArgumentException e) {
throw new WSProtocolException
("5.5.1", "Incorrect status code", e);
} catch (CharacterCodingException e) {
throw new WSProtocolException
("5.5.1", "Close reason is a malformed UTF-8 sequence", e);
}
}
binaryData.dispose(); // Manual dispose
output.onClose(cc, reason);
break;
case PING:
output.onPing(binaryData);
binaryData = null;
break;
case PONG:
output.onPong(binaryData);
binaryData = null;
break;
default:
assert opcode == Opcode.TEXT || opcode == Opcode.BINARY
|| opcode == Opcode.CONTINUATION : dump(opcode);
if (fin) {
// It is always the last chunk:
// either TEXT(FIN=TRUE)/BINARY(FIN=TRUE) or CONT(FIN=TRUE)
originatingOpcode = null;
}
break;
}
payloadLen = 0;
opcode = null;
}
private MessagePart determinePart(boolean isLast) {
boolean lastChunk = fin && isLast;
switch (part) {
case LAST:
case WHOLE:
return lastChunk ? MessagePart.WHOLE : MessagePart.FIRST;
case FIRST:
case PART:
return lastChunk ? MessagePart.LAST : MessagePart.PART;
default:
throw new InternalError(String.valueOf(part));
}
}
}