blob: 61503f0e235059ce7aede2be0e378d18979d8569 [file] [log] [blame]
/*
* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.internal.net.http.websocket;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import static jdk.internal.net.http.websocket.MessageQueue.effectiveCapacityOf;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
/*
* A unit test for MessageQueue. The test is aware of the details of the queue
* implementation.
*/
public class MessageQueueTest {
private static final Random r = new SecureRandom();
@DataProvider(name = "illegalCapacities")
public static Object[][] illegalCapacities() {
return new Object[][]{
new Object[]{Integer.MIN_VALUE},
new Object[]{-2},
new Object[]{-1},
new Object[]{ 0},
};
}
@Test(dataProvider = "illegalCapacities")
public void illegalCapacity(int n) {
assertThrows(IllegalArgumentException.class, () -> new MessageQueue(n));
}
@Test(dataProvider = "capacities")
public void emptiness(int n) {
assertTrue(new MessageQueue(n).isEmpty());
}
@Test(dataProvider = "capacities")
public void fullness(int n) throws IOException {
MessageQueue q = new MessageQueue(n);
int cap = effectiveCapacityOf(n);
Adder adder = new Adder();
Queue<Message> referenceQueue = new LinkedList<>();
for (int i = 0; i < cap; i++) {
Message m = createRandomMessage();
referenceQueue.add(m);
adder.add(q, m);
}
for (int i = 0; i < cap + 1; i++) {
Message m = createRandomMessage();
assertThrows(IOException.class, () -> adder.add(q, m));
}
for (int i = 0; i < cap; i++) {
Message expected = referenceQueue.remove();
Message actual = new Remover().removeFrom(q);
assertEquals(actual, expected);
}
}
private Message createRandomMessage() {
Message.Type[] values = Message.Type.values();
Message.Type type = values[r.nextInt(values.length)];
Supplier<? extends ByteBuffer> binarySupplier = null;
ByteBuffer binary = null;
CharBuffer text = null;
boolean isLast = false;
int statusCode = -1;
switch (type) {
case TEXT:
text = CharBuffer.allocate(r.nextInt(17));
isLast = r.nextBoolean();
break;
case BINARY:
binary = ByteBuffer.allocate(r.nextInt(19));
isLast = r.nextBoolean();
break;
case PING:
binary = ByteBuffer.allocate(r.nextInt(19));
break;
case PONG:
if (r.nextBoolean()) {
binary = ByteBuffer.allocate(r.nextInt(19));
} else {
binarySupplier = () -> ByteBuffer.allocate(r.nextInt(19));
}
break;
case CLOSE:
text = CharBuffer.allocate(r.nextInt(17));
statusCode = r.nextInt();
break;
default:
throw new AssertionError();
}
BiConsumer<Integer, Throwable> action = new BiConsumer<>() {
@Override
public void accept(Integer o, Throwable throwable) { }
};
CompletableFuture<Integer> future = new CompletableFuture<>();
return new Message(type, binarySupplier, binary, text, isLast, statusCode, r.nextInt(),
action, future);
}
@Test(dataProvider = "capacities")
public void caterpillarWalk(int n) throws IOException {
// System.out.println("n: " + n);
int cap = effectiveCapacityOf(n);
for (int p = 1; p <= cap; p++) { // pace
// System.out.println(" pace: " + p);
MessageQueue q = new MessageQueue(n);
Queue<Message> referenceQueue = new LinkedList<>();
Adder adder = new Adder();
for (int k = 0; k < (cap / p) + 1; k++) {
// System.out.println(" cycle: " + k);
for (int i = 0; i < p; i++) {
Message m = createRandomMessage();
referenceQueue.add(m);
adder.add(q, m);
}
Remover remover = new Remover();
for (int i = 0; i < p; i++) {
Message expected = referenceQueue.remove();
Message actual = remover.removeFrom(q);
assertEquals(actual, expected);
}
assertTrue(q.isEmpty());
}
}
}
/* Exercises only concurrent additions */
@Test
public void halfConcurrency() throws ExecutionException, InterruptedException {
int n = Runtime.getRuntime().availableProcessors() + 2;
ExecutorService executorService = Executors.newFixedThreadPool(n);
CyclicBarrier start = new CyclicBarrier(n);
Adder adder = new Adder();
List<Future<?>> futures = new ArrayList<>(n);
try {
for (int k = 0; k < 1024; k++) {
MessageQueue q = new MessageQueue(n);
for (int i = 0; i < n; i++) {
Message m = createRandomMessage();
Future<Void> f = executorService.submit(() -> {
start.await();
adder.add(q, m);
return null;
});
futures.add(f);
}
for (Future<?> f : futures) {
f.get(); // Just to check for exceptions
}
futures.clear();
}
} finally {
executorService.shutdownNow();
}
}
// TODO: same message; different messages; a mix thereof
@Test
public void concurrency() throws ExecutionException, InterruptedException {
int nProducers = Runtime.getRuntime().availableProcessors() + 2;
int nThreads = nProducers + 1;
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
CyclicBarrier start = new CyclicBarrier(nThreads);
MessageQueue q = new MessageQueue(nProducers);
Adder adder = new Adder();
Remover remover = new Remover();
List<Message> expectedList = new ArrayList<>(nProducers);
List<Message> actualList = new ArrayList<>(nProducers);
List<Future<?>> futures = new ArrayList<>(nProducers);
try {
for (int k = 0; k < 1024; k++) {
for (int i = 0; i < nProducers; i++) {
Message m = createRandomMessage();
expectedList.add(m);
Future<Void> f = executorService.submit(() -> {
start.await();
adder.add(q, m);
return null;
});
futures.add(f);
}
Future<Void> consumer = executorService.submit(() -> {
int i = 0;
start.await();
while (i < nProducers) {
Message m = remover.removeFrom(q);
if (m != null) {
actualList.add(m);
i++;
}
}
return null;
});
for (Future<?> f : futures) {
f.get(); // Just to check for exceptions
}
consumer.get(); // Waiting for consumer to collect all the messages
assertEquals(actualList.size(), expectedList.size());
for (Message m : expectedList) {
assertTrue(actualList.remove(m));
}
assertTrue(actualList.isEmpty());
assertTrue(q.isEmpty());
expectedList.clear();
futures.clear();
}
} finally {
executorService.shutdownNow();
}
}
@Test(dataProvider = "capacities")
public void testSingleThreaded(int n) throws IOException {
Queue<Message> referenceQueue = new LinkedList<>();
MessageQueue q = new MessageQueue(n);
int cap = effectiveCapacityOf(n);
Adder adder = new Adder();
for (int i = 0; i < cap; i++) {
Message m = createRandomMessage();
referenceQueue.add(m);
adder.add(q, m);
}
for (int i = 0; i < cap; i++) {
Message expected = referenceQueue.remove();
Message actual = new Remover().removeFrom(q);
assertEquals(actual, expected);
}
assertTrue(q.isEmpty());
}
@DataProvider(name = "capacities")
public Object[][] capacities() {
return new Object[][]{
new Object[]{ 1},
new Object[]{ 2},
new Object[]{ 3},
new Object[]{ 4},
new Object[]{ 5},
new Object[]{ 6},
new Object[]{ 7},
new Object[]{ 8},
new Object[]{ 9},
new Object[]{128},
new Object[]{256},
};
}
// -- auxiliary test infrastructure --
static class Adder {
@SuppressWarnings("unchecked")
void add(MessageQueue q, Message m) throws IOException {
switch (m.type) {
case TEXT:
q.addText(m.text, m.isLast, m.attachment, m.action, m.future);
break;
case BINARY:
q.addBinary(m.binary, m.isLast, m.attachment, m.action, m.future);
break;
case PING:
q.addPing(m.binary, m.attachment, m.action, m.future);
break;
case PONG:
if (m.binarySupplier != null) {
q.addPong(m.binarySupplier, m.attachment, m.action, m.future);
} else {
q.addPong(m.binary, m.attachment, m.action, m.future);
}
break;
case CLOSE:
q.addClose(m.statusCode, m.text, m.attachment, m.action, m.future);
break;
default:
throw new InternalError();
}
}
}
static class Remover {
Message removeFrom(MessageQueue q) {
Message m = q.peek(new MessageQueue.QueueCallback<>() {
boolean called;
@Override
public <T> Message onText(CharBuffer message,
boolean isLast,
T attachment,
BiConsumer<? super T, ? super Throwable> action,
CompletableFuture<? super T> future) {
assertFalse(called);
called = true;
return new Message(Message.Type.TEXT, null, null, message, isLast,
-1, attachment, action, future);
}
@Override
public <T> Message onBinary(ByteBuffer message,
boolean isLast,
T attachment,
BiConsumer<? super T, ? super Throwable> action,
CompletableFuture<? super T> future) {
assertFalse(called);
called = true;
return new Message(Message.Type.BINARY, null, message, null, isLast,
-1, attachment, action, future);
}
@Override
public <T> Message onPing(ByteBuffer message,
T attachment,
BiConsumer<? super T, ? super Throwable> action,
CompletableFuture<? super T> future) {
assertFalse(called);
called = true;
return new Message(Message.Type.PING, null, message, null, false,
-1, attachment, action, future);
}
@Override
public <T> Message onPong(ByteBuffer message,
T attachment,
BiConsumer<? super T, ? super Throwable> action,
CompletableFuture<? super T> future) {
assertFalse(called);
called = true;
return new Message(Message.Type.PONG, null, message, null, false,
-1, attachment, action, future);
}
@Override
public <T> Message onPong(Supplier<? extends ByteBuffer> message,
T attachment,
BiConsumer<? super T, ? super Throwable> action,
CompletableFuture<? super T> future) {
assertFalse(called);
called = true;
return new Message(Message.Type.PONG, message, null, null, false,
-1, attachment, action, future);
}
@Override
public <T> Message onClose(int statusCode,
CharBuffer reason,
T attachment,
BiConsumer<? super T, ? super Throwable> action,
CompletableFuture<? super T> future) {
assertFalse(called);
called = true;
return new Message(Message.Type.CLOSE, null, null, reason, false,
statusCode, attachment, action, future);
}
@Override
public Message onEmpty() throws RuntimeException {
return null;
}
});
if (m != null) {
q.remove();
}
return m;
}
}
static class Message {
private final Type type;
private final Supplier<? extends ByteBuffer> binarySupplier;
private final ByteBuffer binary;
private final CharBuffer text;
private final boolean isLast;
private final int statusCode;
private final Object attachment;
@SuppressWarnings("rawtypes")
private final BiConsumer action;
@SuppressWarnings("rawtypes")
private final CompletableFuture future;
<T> Message(Type type,
Supplier<? extends ByteBuffer> binarySupplier,
ByteBuffer binary,
CharBuffer text,
boolean isLast,
int statusCode,
T attachment,
BiConsumer<? super T, ? super Throwable> action,
CompletableFuture<? super T> future) {
this.type = type;
this.binarySupplier = binarySupplier;
this.binary = binary;
this.text = text;
this.isLast = isLast;
this.statusCode = statusCode;
this.attachment = attachment;
this.action = action;
this.future = future;
}
@Override
public int hashCode() {
return Objects.hash(type, binarySupplier, binary, text, isLast, statusCode, attachment, action, future);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Message message = (Message) o;
return isLast == message.isLast &&
statusCode == message.statusCode &&
type == message.type &&
Objects.equals(binarySupplier, message.binarySupplier) &&
Objects.equals(binary, message.binary) &&
Objects.equals(text, message.text) &&
Objects.equals(attachment, message.attachment) &&
Objects.equals(action, message.action) &&
Objects.equals(future, message.future);
}
enum Type {
TEXT,
BINARY,
PING,
PONG,
CLOSE
}
}
}