| /* |
| * Copyright 2015 The gRPC Authors |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package io.grpc.benchmarks; |
| |
| import static io.grpc.benchmarks.Utils.pickUnusedPort; |
| |
| import com.google.protobuf.ByteString; |
| import io.grpc.ManagedChannel; |
| import io.grpc.Server; |
| import io.grpc.benchmarks.proto.BenchmarkServiceGrpc; |
| import io.grpc.benchmarks.proto.Messages.Payload; |
| import io.grpc.benchmarks.proto.Messages.SimpleRequest; |
| import io.grpc.benchmarks.proto.Messages.SimpleResponse; |
| import io.grpc.benchmarks.qps.AsyncServer; |
| import io.grpc.inprocess.InProcessChannelBuilder; |
| import io.grpc.inprocess.InProcessServerBuilder; |
| import io.grpc.internal.AbstractManagedChannelImplBuilder; |
| import io.grpc.internal.AbstractServerImplBuilder; |
| import io.grpc.netty.NegotiationType; |
| import io.grpc.netty.NettyChannelBuilder; |
| import io.grpc.netty.NettyServerBuilder; |
| import io.grpc.okhttp.OkHttpChannelBuilder; |
| import io.grpc.stub.StreamObserver; |
| import io.netty.channel.Channel; |
| import io.netty.channel.DefaultEventLoopGroup; |
| import io.netty.channel.EventLoopGroup; |
| import io.netty.channel.ServerChannel; |
| import io.netty.channel.local.LocalAddress; |
| import io.netty.channel.local.LocalChannel; |
| import io.netty.channel.local.LocalServerChannel; |
| import java.net.InetSocketAddress; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import org.openjdk.jmh.annotations.Benchmark; |
| import org.openjdk.jmh.annotations.BenchmarkMode; |
| import org.openjdk.jmh.annotations.Mode; |
| import org.openjdk.jmh.annotations.OperationsPerInvocation; |
| import org.openjdk.jmh.annotations.OutputTimeUnit; |
| import org.openjdk.jmh.annotations.Param; |
| import org.openjdk.jmh.annotations.Scope; |
| import org.openjdk.jmh.annotations.Setup; |
| import org.openjdk.jmh.annotations.State; |
| import org.openjdk.jmh.annotations.TearDown; |
| import org.openjdk.jmh.annotations.Threads; |
| |
| /** Some text. */ |
| @State(Scope.Benchmark) |
| public class TransportBenchmark { |
| public enum Transport { |
| INPROCESS, NETTY, NETTY_LOCAL, NETTY_EPOLL, OKHTTP |
| } |
| |
| @Param({"INPROCESS", "NETTY", "NETTY_LOCAL", "OKHTTP"}) |
| public Transport transport; |
| @Param({"true", "false"}) |
| public boolean direct; |
| |
| private ManagedChannel channel; |
| private Server server; |
| private BenchmarkServiceGrpc.BenchmarkServiceBlockingStub stub; |
| private BenchmarkServiceGrpc.BenchmarkServiceStub asyncStub; |
| private EventLoopGroup groupToShutdown; |
| |
| @Setup |
| @SuppressWarnings("LiteralClassName") // Epoll is not available on windows |
| public void setUp() throws Exception { |
| AbstractServerImplBuilder<?> serverBuilder; |
| AbstractManagedChannelImplBuilder<?> channelBuilder; |
| switch (transport) { |
| case INPROCESS: |
| { |
| String name = "bench" + Math.random(); |
| serverBuilder = InProcessServerBuilder.forName(name); |
| channelBuilder = InProcessChannelBuilder.forName(name); |
| break; |
| } |
| case NETTY: |
| { |
| InetSocketAddress address = new InetSocketAddress("localhost", pickUnusedPort()); |
| serverBuilder = NettyServerBuilder.forAddress(address); |
| channelBuilder = NettyChannelBuilder.forAddress(address) |
| .negotiationType(NegotiationType.PLAINTEXT); |
| break; |
| } |
| case NETTY_LOCAL: |
| { |
| String name = "bench" + Math.random(); |
| LocalAddress address = new LocalAddress(name); |
| EventLoopGroup group = new DefaultEventLoopGroup(); |
| serverBuilder = NettyServerBuilder.forAddress(address) |
| .bossEventLoopGroup(group) |
| .workerEventLoopGroup(group) |
| .channelType(LocalServerChannel.class); |
| channelBuilder = NettyChannelBuilder.forAddress(address) |
| .eventLoopGroup(group) |
| .channelType(LocalChannel.class) |
| .negotiationType(NegotiationType.PLAINTEXT); |
| groupToShutdown = group; |
| break; |
| } |
| case NETTY_EPOLL: |
| { |
| InetSocketAddress address = new InetSocketAddress("localhost", pickUnusedPort()); |
| |
| // Reflection used since they are only available on linux. |
| Class<?> groupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup"); |
| EventLoopGroup group = (EventLoopGroup) groupClass.getConstructor().newInstance(); |
| |
| @SuppressWarnings("unchecked") |
| Class<? extends ServerChannel> serverChannelClass = (Class<? extends ServerChannel>) |
| Class.forName("io.netty.channel.epoll.EpollServerSocketChannel"); |
| serverBuilder = NettyServerBuilder.forAddress(address) |
| .bossEventLoopGroup(group) |
| .workerEventLoopGroup(group) |
| .channelType(serverChannelClass); |
| @SuppressWarnings("unchecked") |
| Class<? extends Channel> channelClass = (Class<? extends Channel>) |
| Class.forName("io.netty.channel.epoll.EpollSocketChannel"); |
| channelBuilder = NettyChannelBuilder.forAddress(address) |
| .eventLoopGroup(group) |
| .channelType(channelClass) |
| .negotiationType(NegotiationType.PLAINTEXT); |
| groupToShutdown = group; |
| break; |
| } |
| case OKHTTP: |
| { |
| int port = pickUnusedPort(); |
| InetSocketAddress address = new InetSocketAddress("localhost", port); |
| serverBuilder = NettyServerBuilder.forAddress(address); |
| channelBuilder = OkHttpChannelBuilder.forAddress("localhost", port).usePlaintext(); |
| break; |
| } |
| default: |
| throw new Exception("Unknown transport: " + transport); |
| } |
| |
| if (direct) { |
| serverBuilder.directExecutor(); |
| // Because blocking stubs avoid the executor, this doesn't do much. |
| channelBuilder.directExecutor(); |
| } |
| |
| server = serverBuilder |
| .addService(new AsyncServer.BenchmarkServiceImpl()) |
| .build(); |
| server.start(); |
| channel = channelBuilder.build(); |
| stub = BenchmarkServiceGrpc.newBlockingStub(channel); |
| asyncStub = BenchmarkServiceGrpc.newStub(channel); |
| // Wait for channel to start |
| stub.unaryCall(SimpleRequest.getDefaultInstance()); |
| } |
| |
| @TearDown |
| public void tearDown() throws Exception { |
| channel.shutdown(); |
| server.shutdown(); |
| channel.awaitTermination(1, TimeUnit.SECONDS); |
| server.awaitTermination(1, TimeUnit.SECONDS); |
| if (!channel.isTerminated()) { |
| throw new Exception("failed to shut down channel"); |
| } |
| if (!server.isTerminated()) { |
| throw new Exception("failed to shut down server"); |
| } |
| if (groupToShutdown != null) { |
| Future<?> unused = groupToShutdown.shutdownGracefully(0, 1, TimeUnit.SECONDS); |
| groupToShutdown.awaitTermination(1, TimeUnit.SECONDS); |
| if (!groupToShutdown.isTerminated()) { |
| throw new Exception("failed to shut down event loop group."); |
| } |
| } |
| } |
| |
| private static final SimpleRequest UNARY_CALL_1024_REQUEST = SimpleRequest.newBuilder() |
| .setResponseSize(1024) |
| .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[1024]))) |
| .build(); |
| |
| @Benchmark |
| @BenchmarkMode(Mode.SampleTime) |
| @OutputTimeUnit(TimeUnit.NANOSECONDS) |
| public SimpleResponse unaryCall1024Latency() { |
| return stub.unaryCall(UNARY_CALL_1024_REQUEST); |
| } |
| |
| private static final int BYTE_THROUGHPUT_RESPONSE_SIZE = 1048576; |
| private static final SimpleRequest BYTE_THROUGHPUT_REQUEST = SimpleRequest.newBuilder() |
| .setResponseSize(BYTE_THROUGHPUT_RESPONSE_SIZE) |
| .build(); |
| |
| @Benchmark |
| @BenchmarkMode(Mode.Throughput) |
| @OperationsPerInvocation(BYTE_THROUGHPUT_RESPONSE_SIZE) |
| @Threads(10) |
| public SimpleResponse unaryCallsByteThroughput() { |
| return stub.unaryCall(BYTE_THROUGHPUT_REQUEST); |
| } |
| |
| private static final Throwable OK_THROWABLE = new RuntimeException("OK"); |
| |
| @State(Scope.Thread) |
| public static class PingPongStreamState { |
| private final ThreadlessExecutor executor = new ThreadlessExecutor(); |
| private StreamObserver<SimpleRequest> requestObserver; |
| private SimpleResponse response; |
| private Throwable status; |
| |
| @Setup |
| public void setUp(TransportBenchmark bench) { |
| requestObserver = bench.asyncStub |
| .withExecutor(executor) |
| .streamingCall(new StreamObserver<SimpleResponse>() { |
| @Override public void onNext(SimpleResponse next) { |
| assert response == null; |
| response = next; |
| } |
| |
| @Override public void onError(Throwable t) { |
| status = t; |
| } |
| |
| @Override public void onCompleted() { |
| status = OK_THROWABLE; |
| } |
| }); |
| } |
| |
| /** Issues request and waits for response. */ |
| public SimpleResponse pingPong(SimpleRequest request) throws InterruptedException { |
| requestObserver.onNext(request); |
| while (true) { |
| executor.waitAndDrain(); |
| if (response != null) { |
| SimpleResponse savedResponse = response; |
| response = null; |
| return savedResponse; |
| } |
| if (status != null) { |
| throw new RuntimeException("Unexpected stream termination", status); |
| } |
| } |
| } |
| |
| @TearDown |
| public void tearDown() throws InterruptedException { |
| requestObserver.onCompleted(); |
| while (status == null) { |
| executor.waitAndDrain(); |
| } |
| if (status != OK_THROWABLE) { |
| throw new RuntimeException("Non-graceful stream shutdown", status); |
| } |
| } |
| } |
| |
| @Benchmark |
| @BenchmarkMode(Mode.Throughput) |
| @OperationsPerInvocation(BYTE_THROUGHPUT_RESPONSE_SIZE) |
| @Threads(10) |
| public SimpleResponse streamingCallsByteThroughput(PingPongStreamState state) |
| throws InterruptedException { |
| return state.pingPong(BYTE_THROUGHPUT_REQUEST); |
| } |
| } |