blob: d73ffdf96be3d1e0f4ee84f783265da2ff49b249 [file] [log] [blame]
/*
* Copyright 2015 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.benchmarks.qps;
import com.google.common.util.concurrent.UncaughtExceptionHandlers;
import com.google.protobuf.ByteString;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.benchmarks.Utils;
import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
import io.grpc.benchmarks.proto.Messages;
import io.grpc.internal.testing.TestUtils;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.grpc.stub.StreamObservers;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
/**
* QPS server using the non-blocking API.
*/
public class AsyncServer {
private static final Logger log = Logger.getLogger(AsyncServer.class.getName());
/**
* checkstyle complains if there is no javadoc comment here.
*/
public static void main(String... args) throws Exception {
new AsyncServer().run(args);
}
/** Equivalent of "main", but non-static. */
public void run(String[] args) throws Exception {
ServerConfiguration.Builder configBuilder = ServerConfiguration.newBuilder();
ServerConfiguration config;
try {
config = configBuilder.build(args);
} catch (Exception e) {
System.out.println(e.getMessage());
configBuilder.printUsage();
return;
}
final Server server = newServer(config);
server.start();
System.out.println("QPS Server started on " + config.address);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
System.out.println("QPS Server shutting down");
server.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
});
server.awaitTermination();
}
@SuppressWarnings("LiteralClassName") // Epoll is not available on windows
static Server newServer(ServerConfiguration config) throws IOException {
final EventLoopGroup boss;
final EventLoopGroup worker;
final Class<? extends ServerChannel> channelType;
ThreadFactory tf = new DefaultThreadFactory("server-elg-", true /*daemon */);
switch (config.transport) {
case NETTY_NIO: {
boss = new NioEventLoopGroup(1, tf);
worker = new NioEventLoopGroup(0, tf);
channelType = NioServerSocketChannel.class;
break;
}
case NETTY_EPOLL: {
try {
// These classes are only available on linux.
Class<?> groupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup");
@SuppressWarnings("unchecked")
Class<? extends ServerChannel> channelClass = (Class<? extends ServerChannel>)
Class.forName("io.netty.channel.epoll.EpollServerSocketChannel");
boss =
(EventLoopGroup)
(groupClass
.getConstructor(int.class, ThreadFactory.class)
.newInstance(1, tf));
worker =
(EventLoopGroup)
(groupClass
.getConstructor(int.class, ThreadFactory.class)
.newInstance(0, tf));
channelType = channelClass;
break;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
case NETTY_UNIX_DOMAIN_SOCKET: {
try {
// These classes are only available on linux.
Class<?> groupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup");
@SuppressWarnings("unchecked")
Class<? extends ServerChannel> channelClass = (Class<? extends ServerChannel>)
Class.forName("io.netty.channel.epoll.EpollServerDomainSocketChannel");
boss =
(EventLoopGroup)
(groupClass
.getConstructor(int.class, ThreadFactory.class)
.newInstance(1, tf));
worker =
(EventLoopGroup)
(groupClass
.getConstructor(int.class, ThreadFactory.class)
.newInstance(0, tf));
channelType = channelClass;
break;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
default: {
// Should never get here.
throw new IllegalArgumentException("Unsupported transport: " + config.transport);
}
}
NettyServerBuilder builder = NettyServerBuilder
.forAddress(config.address)
.bossEventLoopGroup(boss)
.workerEventLoopGroup(worker)
.channelType(channelType)
.addService(new BenchmarkServiceImpl())
.flowControlWindow(config.flowControlWindow);
if (config.tls) {
System.out.println("Using fake CA for TLS certificate.\n"
+ "Run the Java client with --tls --testca");
File cert = TestUtils.loadCert("server1.pem");
File key = TestUtils.loadCert("server1.key");
builder.useTransportSecurity(cert, key);
}
if (config.directExecutor) {
builder.directExecutor();
} else {
// TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be
// put. Move it somewhere else, or remove it if no longer necessary.
// See: https://github.com/grpc/grpc-java/issues/2119
builder.executor(new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
new ForkJoinWorkerThreadFactory() {
final AtomicInteger num = new AtomicInteger();
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
ForkJoinWorkerThread thread =
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
thread.setDaemon(true);
thread.setName("grpc-server-app-" + "-" + num.getAndIncrement());
return thread;
}
}, UncaughtExceptionHandlers.systemExit(), true /* async */));
}
return builder.build();
}
public static class BenchmarkServiceImpl extends BenchmarkServiceGrpc.BenchmarkServiceImplBase {
// Always use the same canned response for bidi. This is allowed by the spec.
private static final int BIDI_RESPONSE_BYTES = 100;
private static final Messages.SimpleResponse BIDI_RESPONSE = Messages.SimpleResponse
.newBuilder()
.setPayload(Messages.Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[BIDI_RESPONSE_BYTES])).build())
.build();
private final AtomicBoolean shutdown = new AtomicBoolean();
public BenchmarkServiceImpl() {
}
public void shutdown() {
shutdown.set(true);
}
@Override
public void unaryCall(Messages.SimpleRequest request,
StreamObserver<Messages.SimpleResponse> responseObserver) {
responseObserver.onNext(Utils.makeResponse(request));
responseObserver.onCompleted();
}
@Override
public StreamObserver<Messages.SimpleRequest> streamingCall(
final StreamObserver<Messages.SimpleResponse> observer) {
final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
(ServerCallStreamObserver<Messages.SimpleResponse>) observer;
// TODO(spencerfang): flow control to stop reading when !responseObserver.isReady
return new StreamObserver<Messages.SimpleRequest>() {
@Override
public void onNext(Messages.SimpleRequest value) {
if (shutdown.get()) {
responseObserver.onCompleted();
return;
}
responseObserver.onNext(Utils.makeResponse(value));
}
@Override
public void onError(Throwable t) {
// other side closed with non OK
responseObserver.onError(t);
}
@Override
public void onCompleted() {
// other side closed with OK
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<Messages.SimpleRequest> streamingFromClient(
final StreamObserver<Messages.SimpleResponse> responseObserver) {
return new StreamObserver<Messages.SimpleRequest>() {
Messages.SimpleRequest lastSeen = null;
@Override
public void onNext(Messages.SimpleRequest value) {
if (shutdown.get()) {
responseObserver.onCompleted();
return;
}
lastSeen = value;
}
@Override
public void onError(Throwable t) {
// other side closed with non OK
responseObserver.onError(t);
}
@Override
public void onCompleted() {
if (lastSeen != null) {
responseObserver.onNext(Utils.makeResponse(lastSeen));
responseObserver.onCompleted();
} else {
responseObserver.onError(
Status.FAILED_PRECONDITION
.withDescription("never received any requests").asException());
}
}
};
}
@Override
public void streamingFromServer(
final Messages.SimpleRequest request,
final StreamObserver<Messages.SimpleResponse> observer) {
// send forever, until the client cancels or we shut down
final Messages.SimpleResponse response = Utils.makeResponse(request);
final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
(ServerCallStreamObserver<Messages.SimpleResponse>) observer;
// If the client cancels, copyWithFlowControl takes care of calling
// responseObserver.onCompleted() for us
StreamObservers.copyWithFlowControl(
new Iterator<Messages.SimpleResponse>() {
@Override
public boolean hasNext() {
return !shutdown.get() && !responseObserver.isCancelled();
}
@Override
public Messages.SimpleResponse next() {
return response;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
},
responseObserver);
}
@Override
public StreamObserver<Messages.SimpleRequest> streamingBothWays(
final StreamObserver<Messages.SimpleResponse> observer) {
// receive data forever and send data forever until client cancels or we shut down.
final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
(ServerCallStreamObserver<Messages.SimpleResponse>) observer;
// If the client cancels, copyWithFlowControl takes care of calling
// responseObserver.onCompleted() for us
StreamObservers.copyWithFlowControl(
new Iterator<Messages.SimpleResponse>() {
@Override
public boolean hasNext() {
return !shutdown.get() && !responseObserver.isCancelled();
}
@Override
public Messages.SimpleResponse next() {
return BIDI_RESPONSE;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
},
responseObserver
);
return new StreamObserver<Messages.SimpleRequest>() {
@Override
public void onNext(final Messages.SimpleRequest request) {
// noop
}
@Override
public void onError(Throwable t) {
// other side cancelled
}
@Override
public void onCompleted() {
// Should never happen, because clients should cancel this call in order to stop
// the operation. Also because copyWithFlowControl hogs the inbound network thread
// via the handler for onReady, we would never expect this callback to be able to
// run anyways.
log.severe("clients should CANCEL the call to stop bidi streaming");
}
};
}
}
}