/*
* Copyright 2015 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.benchmarks.qps;

import static io.grpc.benchmarks.Utils.HISTOGRAM_MAX_VALUE;
import static io.grpc.benchmarks.Utils.HISTOGRAM_PRECISION;
import static io.grpc.benchmarks.Utils.saveHistogram;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.ADDRESS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CHANNELS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CLIENT_PAYLOAD;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DIRECTEXECUTOR;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DURATION;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.FLOW_CONTROL_WINDOW;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.OUTSTANDING_RPCS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SAVE_HISTOGRAM;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SERVER_PAYLOAD;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.STREAMING_RPCS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TESTCA;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TLS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TRANSPORT;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.WARMUP_DURATION;

import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
import io.grpc.benchmarks.proto.BenchmarkServiceGrpc.BenchmarkServiceStub;
import io.grpc.benchmarks.proto.Messages.Payload;
import io.grpc.benchmarks.proto.Messages.SimpleRequest;
import io.grpc.benchmarks.proto.Messages.SimpleResponse;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramIterationValue;

/**
* QPS Client using the non-blocking API.
*/
public class AsyncClient {

private final ClientConfiguration config;

public AsyncClient(ClientConfiguration config) {
this.config = config;
}

/**
* Start the QPS Client.
*/
public void run() throws Exception {
if (config == null) {
return;
}
SimpleRequest req = newRequest();
List<ManagedChannel> channels = new ArrayList<>(config.channels);
for (int i = 0; i < config.channels; i++) {
channels.add(config.newChannel());
}
// Do a warmup first. It's the same as the actual benchmark, except that
// we ignore the statistics.
warmup(req, channels);
long startTime = System.nanoTime();
long endTime = startTime + TimeUnit.SECONDS.toNanos(config.duration);
List<Histogram> histograms = doBenchmark(req, channels, endTime);
long elapsedTime = System.nanoTime() - startTime;
Histogram merged = merge(histograms);
printStats(merged, elapsedTime);
if (config.histogramFile != null) {
saveHistogram(merged, config.histogramFile);
}
shutdown(channels);
}
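
/**
* Builds the request reused for every RPC: a zeroed payload of
* {@code clientPayload} bytes, asking the server for a response of
* {@code serverPayload} bytes.
*/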
private SimpleRequest newRequest() {
ByteString body = ByteString.copyFrom(new byte[config.clientPayload]);
Payload payload = Payload.newBuilder().setType(config.payloadType).setBody(body).build();
return SimpleRequest.newBuilder()
.setResponseType(config.payloadType)
.setResponseSize(config.serverPayload)
.setPayload(payload)
.build();
}
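
/**
* Runs the benchmark loop for {@code warmupDuration} seconds and discards the
* resulting histograms, so that JIT compilation and connection setup are less
* likely to skew the measured run.
*/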
private void warmup(SimpleRequest req, List<? extends Channel> channels) throws Exception {
long endTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(config.warmupDuration);
doBenchmark(req, channels, endTime);
// I don't know if this helps, but it doesn't hurt trying. We sometimes run warmups
// of several minutes at full load and it would be nice to start the actual benchmark
// with a clean heap.
System.gc();
}
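
/**
* Starts {@code channels * outstandingRpcsPerChannel} concurrent call chains
* and blocks until all of them have run past {@code endTime}, returning one
* latency histogram per chain.
*/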
private List<Histogram> doBenchmark(SimpleRequest req,
List<? extends Channel> channels,
long endTime) throws Exception {
// Initiate the concurrent calls
List<Future<Histogram>> futures =
new ArrayList<>(config.channels * config.outstandingRpcsPerChannel);
for (int i = 0; i < config.channels; i++) {
for (int j = 0; j < config.outstandingRpcsPerChannel; j++) {
Channel channel = channels.get(i);
futures.add(doRpcs(channel, req, endTime));
}
}
// Wait for completion
List<Histogram> histograms = new ArrayList<>(futures.size());
for (Future<Histogram> future : futures) {
histograms.add(future.get());
}
return histograms;
}
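
/** Starts one call chain on the given channel, unary or streaming per the configured RPC type. */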
private Future<Histogram> doRpcs(Channel channel, SimpleRequest request, long endTime) {
switch (config.rpcType) {
case UNARY:
return doUnaryCalls(channel, request, endTime);
case STREAMING:
return doStreamingCalls(channel, request, endTime);
default:
throw new IllegalStateException("unsupported rpc type: " + config.rpcType);
}
}
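
/**
* Runs back-to-back unary calls on the channel: each completion immediately
* issues the next call until {@code endTime}, recording per-call latency in
* microseconds into the returned future's histogram.
*/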
private Future<Histogram> doUnaryCalls(Channel channel, final SimpleRequest request,
final long endTime) {
final BenchmarkServiceStub stub = BenchmarkServiceGrpc.newStub(channel);
final Histogram histogram = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION);
final HistogramFuture future = new HistogramFuture(histogram);
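// The observer re-issues the call from onCompleted(), so each call chain keeps
// exactly one RPC in flight until endTime passes.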
stub.unaryCall(request, new StreamObserver<SimpleResponse>() {
long lastCall = System.nanoTime();
@Override
public void onNext(SimpleResponse value) {
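// The response body is ignored; only the round-trip latency is measured.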
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
System.err.println("Encountered an error in unaryCall. Status is " + status);
t.printStackTrace();
future.cancel(true);
}
@Override
public void onCompleted() {
long now = System.nanoTime();
// Record the latencies in microseconds
histogram.recordValue((now - lastCall) / 1000);
lastCall = now;
if (endTime - now > 0) {
stub.unaryCall(request, this);
} else {
future.done();
}
}
});
return future;
}
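
/**
* Runs a single bidirectional streaming call in ping-pong fashion: every
* response triggers the next request until {@code endTime}, after which the
* request stream is half-closed and the future completes.
*/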
private static Future<Histogram> doStreamingCalls(Channel channel, final SimpleRequest request,
final long endTime) {
final BenchmarkServiceStub stub = BenchmarkServiceGrpc.newStub(channel);
final Histogram histogram = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION);
final HistogramFuture future = new HistogramFuture(histogram);
ThisIsAHackStreamObserver responseObserver =
new ThisIsAHackStreamObserver(request, histogram, future, endTime);
StreamObserver<SimpleRequest> requestObserver = stub.streamingCall(responseObserver);
responseObserver.requestObserver = requestObserver;
requestObserver.onNext(request);
return future;
}

/**
* A named class is needed because the response observer must hold a reference
* to the request observer, and that reference only becomes available after
* {@code stub.streamingCall(...)} returns. The alternative would be to use the
* channel layer directly.
*/
private static class ThisIsAHackStreamObserver implements StreamObserver<SimpleResponse> {
final SimpleRequest request;
final Histogram histogram;
final HistogramFuture future;
final long endTime;
long lastCall = System.nanoTime();
StreamObserver<SimpleRequest> requestObserver;
ThisIsAHackStreamObserver(SimpleRequest request,
Histogram histogram,
HistogramFuture future,
long endTime) {
this.request = request;
this.histogram = histogram;
this.future = future;
this.endTime = endTime;
}
@Override
public void onNext(SimpleResponse value) {
long now = System.nanoTime();
// Record the latencies in microseconds
histogram.recordValue((now - lastCall) / 1000);
lastCall = now;
if (endTime - now > 0) {
requestObserver.onNext(request);
} else {
requestObserver.onCompleted();
}
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
System.err.println("Encountered an error in streamingCall. Status is " + status);
t.printStackTrace();
future.cancel(true);
}
@Override
public void onCompleted() {
future.done();
}
}
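
/**
* Merges the per-chain histograms into a single one by replaying every recorded
* value with its count. A sketch of an equivalent using HdrHistogram's built-in
* Histogram.add(), which is safe here because all histograms share the same
* range and precision settings:
*
*   Histogram merged = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION);
*   for (Histogram h : histograms) {
*     merged.add(h);
*   }
*/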
private static Histogram merge(List<Histogram> histograms) {
Histogram merged = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION);
for (Histogram histogram : histograms) {
for (HistogramIterationValue value : histogram.allValues()) {
long latency = value.getValueIteratedTo();
long count = value.getCountAtValueIteratedTo();
merged.recordValueWithCount(latency, count);
}
}
return merged;
}
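
/** Prints the run configuration, latency percentiles (in microseconds), and overall QPS to stdout. */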
private void printStats(Histogram histogram, long elapsedTime) {
long latency50 = histogram.getValueAtPercentile(50);
long latency90 = histogram.getValueAtPercentile(90);
long latency95 = histogram.getValueAtPercentile(95);
long latency99 = histogram.getValueAtPercentile(99);
long latency999 = histogram.getValueAtPercentile(99.9);
long latencyMax = histogram.getValueAtPercentile(100);
long queriesPerSecond = histogram.getTotalCount() * 1000000000L / elapsedTime;
StringBuilder values = new StringBuilder();
values.append("Channels: ").append(config.channels).append('\n')
.append("Outstanding RPCs per Channel: ")
.append(config.outstandingRpcsPerChannel).append('\n')
.append("Server Payload Size: ").append(config.serverPayload).append('\n')
.append("Client Payload Size: ").append(config.clientPayload).append('\n')
.append("50%ile Latency (in micros): ").append(latency50).append('\n')
.append("90%ile Latency (in micros): ").append(latency90).append('\n')
.append("95%ile Latency (in micros): ").append(latency95).append('\n')
.append("99%ile Latency (in micros): ").append(latency99).append('\n')
.append("99.9%ile Latency (in micros): ").append(latency999).append('\n')
.append("Maximum Latency (in micros): ").append(latencyMax).append('\n')
.append("QPS: ").append(queriesPerSecond).append('\n');
System.out.println(values);
}
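
/**
* Initiates a graceful shutdown on every channel. Note that shutdown() only
* starts the process; a caller that must block until channels have actually
* terminated could additionally call, e.g.,
* channel.awaitTermination(5, TimeUnit.SECONDS).
*/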
private static void shutdown(List<ManagedChannel> channels) {
for (ManagedChannel channel : channels) {
channel.shutdown();
}
}
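
/*
* Example invocation (flag values are illustrative; the accepted flag names and
* formats are defined by ClientConfiguration, not by this example):
*
*   java io.grpc.benchmarks.qps.AsyncClient --address=localhost:50051 \
*       --channels=4 --outstanding_rpcs=10 --duration=60
*/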
/**
* Runs the client from the command line, parsing the {@link ClientConfiguration}
* from {@code args} and printing usage information on invalid input.
*/
public static void main(String... args) throws Exception {
ClientConfiguration.Builder configBuilder = ClientConfiguration.newBuilder(
ADDRESS, CHANNELS, OUTSTANDING_RPCS, CLIENT_PAYLOAD, SERVER_PAYLOAD,
TLS, TESTCA, TRANSPORT, DURATION, WARMUP_DURATION, DIRECTEXECUTOR,
SAVE_HISTOGRAM, STREAMING_RPCS, FLOW_CONTROL_WINDOW);
ClientConfiguration config;
try {
config = configBuilder.build(args);
} catch (Exception e) {
System.out.println(e.getMessage());
configBuilder.printUsage();
return;
}
AsyncClient client = new AsyncClient(config);
client.run();
}
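
/**
* A minimal Future whose result is the histogram a call chain records into.
* get() blocks until the chain signals completion via done(), or throws
* CancellationException if the chain was canceled after an RPC error.
*/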
private static class HistogramFuture implements Future<Histogram> {
private final Histogram histogram;
private boolean canceled;
private boolean done;
HistogramFuture(Histogram histogram) {
Preconditions.checkNotNull(histogram, "histogram");
this.histogram = histogram;
}
@Override
public synchronized boolean cancel(boolean mayInterruptIfRunning) {
if (!done && !canceled) {
canceled = true;
notifyAll();
return true;
}
return false;
}
@Override
public synchronized boolean isCancelled() {
return canceled;
}
@Override
public synchronized boolean isDone() {
return done || canceled;
}
@Override
public synchronized Histogram get() throws InterruptedException {
while (!isDone()) {
wait();
}
if (isCancelled()) {
throw new CancellationException();
}
return histogram;
}
@Override
public Histogram get(long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}
private synchronized void done() {
done = true;
notifyAll();
}
}
}