blob: f6779daa0253c04b62a325ef62cf056f09980d3d [file] [log] [blame]
/*
* Copyright 2016 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.benchmarks.driver;
import com.sun.management.OperatingSystemMXBean;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.benchmarks.Transport;
import io.grpc.benchmarks.Utils;
import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
import io.grpc.benchmarks.proto.Control;
import io.grpc.benchmarks.proto.Messages;
import io.grpc.benchmarks.proto.Payloads;
import io.grpc.benchmarks.proto.Stats;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.epoll.Epoll;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.LogarithmicIterator;
import org.HdrHistogram.Recorder;
import org.apache.commons.math3.distribution.ExponentialDistribution;
/**
* Implements the client-side contract for the load testing scenarios.
*
* <p>An instance is built from a {@code Control.ClientConfig}, started with {@code start()},
* polled for statistics with {@code getStats()} and stopped with {@code shutdownNow()}.
*/
class LoadClient {
private static final Logger log = Logger.getLogger(LoadClient.class.getName());
// Request payload used by the generic (ByteBuf) workers; only allocated by the constructor for
// the BYTEBUF_PARAMS payload case.
private ByteBuf genericRequest;
// Configuration this client was constructed with.
private final Control.ClientConfig config;
// Distribution sampled by delay() for POISSON load; null for CLOSED_LOOP / LOAD_NOT_SET, in
// which case delay() never parks.
private final ExponentialDistribution distribution;
// Set by shutdownNow(); polled by the workers to stop issuing new calls.
private volatile boolean shutdown;
// Size of the worker pool and number of worker tasks submitted by start().
private final int threadCount;
// One channel per config.getClientChannels().
ManagedChannel[] channels;
// One blocking stub per channel; created for every client type except ASYNC_CLIENT.
BenchmarkServiceGrpc.BenchmarkServiceBlockingStub[] blockingStubs;
// One async stub per channel; created only for ASYNC_CLIENT.
BenchmarkServiceGrpc.BenchmarkServiceStub[] asyncStubs;
// Latency recorder: delay() records into it, getStats() drains an interval histogram from it.
Recorder recorder;
// Fixed-size pool of daemon threads that runs the workers.
private ExecutorService fixedThreadPool;
// Request message used by the proto (SIMPLE_PARAMS) workers.
private Messages.SimpleRequest simpleRequest;
// Source of the process CPU time; null when the platform exposes no such bean.
private final OperatingSystemMXBean osBean;
// Process CPU time observed at the last start() / getStats() call.
private long lastMarkCpuTime;
/**
 * Builds a load client from the supplied configuration: opens the channels, creates one stub
 * per channel, sizes the worker pool, and prepares the load distribution, the request payload
 * and the latency recorder. No load is generated until {@code start()} is called.
 *
 * @param config the client configuration describing channels, load, payload and histogram
 * @throws IllegalArgumentException if the configured load or payload case is not implemented
 * @throws Exception declared by the signature; presumably raised by the channel/address
 *     helpers in {@code Utils} (confirm against their declarations)
 */
LoadClient(Control.ClientConfig config) throws Exception {
  log.log(Level.INFO, "Client Config \n" + config.toString());
  this.config = config;

  // Create the channels, assigning server targets to channels round-robin.
  channels = new ManagedChannel[config.getClientChannels()];
  for (int i = 0; i < config.getClientChannels(); i++) {
    channels[i] =
        Utils.newClientChannel(
            Epoll.isAvailable() ? Transport.NETTY_EPOLL : Transport.NETTY_NIO,
            Utils.parseSocketAddress(config.getServerTargets(i % config.getServerTargetsCount())),
            config.hasSecurityParams(),
            config.hasSecurityParams() && config.getSecurityParams().getUseTestCa(),
            config.hasSecurityParams()
                ? config.getSecurityParams().getServerHostOverride()
                : null,
            Utils.DEFAULT_FLOW_CONTROL_WINDOW,
            false);
  }

  // Create a stub per channel
  if (config.getClientType() == Control.ClientType.ASYNC_CLIENT) {
    asyncStubs = new BenchmarkServiceGrpc.BenchmarkServiceStub[channels.length];
    for (int i = 0; i < channels.length; i++) {
      asyncStubs[i] = BenchmarkServiceGrpc.newStub(channels[i]);
    }
  } else {
    blockingStubs = new BenchmarkServiceGrpc.BenchmarkServiceBlockingStub[channels.length];
    for (int i = 0; i < channels.length; i++) {
      blockingStubs[i] = BenchmarkServiceGrpc.newBlockingStub(channels[i]);
    }
  }

  // Determine no of threads: a sync client needs one thread per outstanding RPC, an async client
  // uses the configured thread count (or one per available processor when unset).
  if (config.getClientType() == Control.ClientType.SYNC_CLIENT) {
    threadCount = config.getOutstandingRpcsPerChannel() * config.getClientChannels();
  } else {
    threadCount = config.getAsyncClientThreads() == 0
        ? Runtime.getRuntime().availableProcessors()
        : config.getAsyncClientThreads();
  }

  // Use a fixed sized pool of daemon threads.
  fixedThreadPool = Executors.newFixedThreadPool(threadCount,
      new DefaultThreadFactory("client-worker", true));

  // Create the load distribution
  switch (config.getLoadParams().getLoadCase()) {
    case CLOSED_LOOP:
      distribution = null;
      break;
    case LOAD_NOT_SET:
      distribution = null;
      break;
    case POISSON:
      // Mean of exp distribution per thread is <no threads> / <offered load per second>
      distribution = new ExponentialDistribution(
          threadCount / config.getLoadParams().getPoisson().getOfferedLoad());
      break;
    default:
      throw new IllegalArgumentException("Scenario not implemented");
  }

  // Create payloads
  switch (config.getPayloadConfig().getPayloadCase()) {
    case SIMPLE_PARAMS: {
      Payloads.SimpleProtoParams simpleParams = config.getPayloadConfig().getSimpleParams();
      simpleRequest = Utils.makeRequest(Messages.PayloadType.COMPRESSABLE,
          simpleParams.getReqSize(), simpleParams.getRespSize());
      break;
    }
    case BYTEBUF_PARAMS: {
      PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
      // This buffer is what the client sends, so it is sized from the request size rather than
      // the response size.
      genericRequest = alloc.buffer(config.getPayloadConfig().getBytebufParams().getReqSize());
      if (genericRequest.capacity() > 0) {
        // NOTE(review): this exposes capacity - 1 readable bytes, one short of the configured
        // size -- confirm whether that is intended before changing it.
        genericRequest.writerIndex(genericRequest.capacity() - 1);
      }
      break;
    }
    default: {
      // Not implemented yet
      throw new IllegalArgumentException("Scenario not implemented");
    }
  }

  // The CPU-time bean is optional; without it no CPU time is reported.
  List<OperatingSystemMXBean> beans =
      ManagementFactory.getPlatformMXBeans(OperatingSystemMXBean.class);
  if (!beans.isEmpty()) {
    osBean = beans.get(0);
  } else {
    osBean = null;
  }

  // Create the histogram recorder
  recorder = new Recorder((long) config.getHistogramParams().getMaxPossible(), 3);
}
/**
 * Start the load scenario: submits {@code threadCount} workers to the thread pool and then
 * marks the current process CPU time as the baseline for the next statistics snapshot.
 *
 * @throws IllegalStateException if the configured RPC type is not supported for the configured
 *     client type
 */
void start() {
  for (int workerIndex = 0; workerIndex < threadCount; workerIndex++) {
    Runnable worker = createWorker(workerIndex);
    if (worker == null) {
      throw new IllegalStateException(config.getRpcType().name()
          + " not supported for client type "
          + config.getClientType());
    }
    fixedThreadPool.execute(worker);
  }
  if (osBean != null) {
    lastMarkCpuTime = osBean.getProcessCpuTime();
  }
}

/**
 * Picks the worker implementation for the configured payload, client type and RPC type.
 * Channels and stubs are assigned to workers round-robin by index.
 *
 * @return the worker, or {@code null} when the client/RPC type combination has no worker
 */
private Runnable createWorker(int index) {
  final Control.ClientType clientType = config.getClientType();
  final Control.RpcType rpcType = config.getRpcType();
  final boolean syncClient = clientType == Control.ClientType.SYNC_CLIENT;
  final boolean asyncClient = clientType == Control.ClientType.ASYNC_CLIENT;
  final boolean unary = rpcType == Control.RpcType.UNARY;
  final boolean streaming = rpcType == Control.RpcType.STREAMING;

  switch (config.getPayloadConfig().getPayloadCase()) {
    case SIMPLE_PARAMS:
      if (syncClient) {
        return unary
            ? new BlockingUnaryWorker(blockingStubs[index % blockingStubs.length])
            : null;
      }
      if (asyncClient && unary) {
        return new AsyncUnaryWorker(asyncStubs[index % asyncStubs.length]);
      }
      if (asyncClient && streaming) {
        return new AsyncPingPongWorker(asyncStubs[index % asyncStubs.length]);
      }
      return null;
    case BYTEBUF_PARAMS:
      if (syncClient) {
        return unary
            ? new GenericBlockingUnaryWorker(channels[index % channels.length])
            : null;
      }
      if (asyncClient && unary) {
        return new GenericAsyncUnaryWorker(channels[index % channels.length]);
      }
      if (asyncClient && streaming) {
        return new GenericAsyncPingPongWorker(channels[index % channels.length]);
      }
      return null;
    default:
      throw Status.UNIMPLEMENTED.withDescription(
          "Unknown payload case " + config.getPayloadConfig().getPayloadCase().name())
          .asRuntimeException();
  }
}
/**
 * Take a snapshot of the statistics which can be returned to the driver. The snapshot covers
 * the interval since the recorder's previous interval histogram was taken, and the CPU time
 * consumed since the last call to this method or to {@code start()}.
 *
 * @return the latency histogram, elapsed time and (when available) CPU time for the interval
 */
Stats.ClientStats getStats() {
  final Histogram snapshot = recorder.getIntervalHistogram();
  final Stats.ClientStats.Builder stats = Stats.ClientStats.newBuilder();
  final Stats.HistogramData.Builder latencies = stats.getLatenciesBuilder();

  // Bucket boundaries grow geometrically; the growth factor is never below 1.01.
  final double growthFactor =
      1.0 + Math.max(config.getHistogramParams().getResolution(), 0.01);
  double bucketBound = 1;
  for (LogarithmicIterator it = new LogarithmicIterator(snapshot, 1, growthFactor);
      it.hasNext(); ) {
    latencies.addBucket((int) it.next().getCountAddedInThisIterationStep());
    bucketBound *= growthFactor;
  }
  // Driver expects values for all buckets in the range, not just the range of buckets that
  // have values, so pad with empty buckets up to the maximum possible value.
  for (; bucketBound < config.getHistogramParams().getMaxPossible();
      bucketBound *= growthFactor) {
    latencies.addBucket(0);
  }

  latencies.setMaxSeen(snapshot.getMaxValue());
  latencies.setMinSeen(snapshot.getMinNonZeroValue());
  latencies.setCount(snapshot.getTotalCount());
  latencies.setSum(snapshot.getMean() * snapshot.getTotalCount());
  // TODO: No support for sum of squares

  final long intervalMillis = snapshot.getEndTimeStamp() - snapshot.getStartTimeStamp();
  stats.setTimeElapsed(intervalMillis / 1000.0);

  if (osBean != null) {
    // Report all the CPU time as user-time (which is intentionally incorrect)
    final long cpuNow = osBean.getProcessCpuTime();
    stats.setTimeUser(((double) cpuNow - lastMarkCpuTime) / 1000000000.0);
    lastMarkCpuTime = cpuNow;
  }
  return stats.build();
}
/**
 * Shutdown the scenario as cleanly as possible.
 *
 * <p>Signals the workers to stop, starts an orderly shutdown of every channel and gives each
 * channel up to one second to terminate. A channel that does not terminate in time is shut down
 * forcibly. If the calling thread is interrupted while waiting, the remaining channels are
 * shut down forcibly without further waiting and the interrupt status is restored before
 * returning. Finally the worker pool is shut down.
 */
void shutdownNow() {
  shutdown = true;
  for (ManagedChannel channel : channels) {
    // Initiate channel shutdown
    channel.shutdown();
  }
  boolean interrupted = false;
  for (ManagedChannel channel : channels) {
    if (interrupted) {
      // Already asked to stop waiting; do not block on the remaining channels.
      channel.shutdownNow();
      continue;
    }
    try {
      // Wait for channel termination, forcing it if the deadline passes.
      if (!channel.awaitTermination(1, TimeUnit.SECONDS)) {
        channel.shutdownNow();
      }
    } catch (InterruptedException ie) {
      interrupted = true;
      channel.shutdownNow();
    }
  }
  fixedThreadPool.shutdownNow();
  if (interrupted) {
    // Preserve the interrupt for the caller instead of swallowing it.
    Thread.currentThread().interrupt();
  }
}
/**
 * Record the event elapsed time to the histogram and delay initiation of the next event based
 * on the load distribution. Without a load distribution nothing is delayed.
 *
 * @param alreadyElapsed time the event took, in nanoseconds; it is recorded as the latency and
 *     counted against the sampled interval before the next event
 */
void delay(long alreadyElapsed) {
  recorder.recordValue(alreadyElapsed);
  if (distribution == null) {
    return;
  }
  // Scale the sampled interval to nanoseconds for parkNanos.
  final long sampledIntervalNanos = Math.round(distribution.sample() * 1000000000.0);
  if (sampledIntervalNanos > alreadyElapsed) {
    // Only the part of the interval not already consumed by the event is waited out.
    LockSupport.parkNanos(sampledIntervalNanos - alreadyElapsed);
  }
}
/**
 * Worker which issues blocking unary calls back-to-back until shutdown is requested. The
 * latency of an event is the time from sending the request to receiving the response.
 */
class BlockingUnaryWorker implements Runnable {
  final BenchmarkServiceGrpc.BenchmarkServiceBlockingStub stub;

  private BlockingUnaryWorker(BenchmarkServiceGrpc.BenchmarkServiceBlockingStub stub) {
    this.stub = stub;
  }

  @Override
  public void run() {
    for (;;) {
      if (shutdown) {
        return;
      }
      final long startNanos = System.nanoTime();
      stub.unaryCall(simpleRequest);
      final long elapsedNanos = System.nanoTime() - startNanos;
      delay(elapsedNanos);
    }
  }
}
/**
* Worker which executes async unary calls. Event timing is the duration between sending the
* request and receiving the response.
*/
private class AsyncUnaryWorker implements Runnable {
final BenchmarkServiceGrpc.BenchmarkServiceStub stub;
final Semaphore maxOutstanding = new Semaphore(config.getOutstandingRpcsPerChannel());
AsyncUnaryWorker(BenchmarkServiceGrpc.BenchmarkServiceStub stub) {
this.stub = stub;
}
@Override
public void run() {
while (true) {
maxOutstanding.acquireUninterruptibly();
if (shutdown) {
maxOutstanding.release();
return;
}
stub.unaryCall(simpleRequest, new StreamObserver<Messages.SimpleResponse>() {
long now = System.nanoTime();
@Override
public void onNext(Messages.SimpleResponse value) {
}
@Override
public void onError(Throwable t) {
maxOutstanding.release();
Level level = shutdown ? Level.FINE : Level.INFO;
log.log(level, "Error in AsyncUnary call", t);
}
@Override
public void onCompleted() {
delay(System.nanoTime() - now);
maxOutstanding.release();
}
});
}
}
}
/**
 * Worker which executes streaming ping-pong calls, keeping at most
 * {@code config.getOutstandingRpcsPerChannel()} streams open. Event timing is the duration
 * between sending the ping and receiving the pong.
 */
private class AsyncPingPongWorker implements Runnable {
  final BenchmarkServiceGrpc.BenchmarkServiceStub stub;
  final Semaphore maxOutstanding = new Semaphore(config.getOutstandingRpcsPerChannel());

  AsyncPingPongWorker(BenchmarkServiceGrpc.BenchmarkServiceStub stub) {
    this.stub = stub;
  }

  @Override
  public void run() {
    while (true) {
      // Blocks until a stream finishes and hands its permit back.
      maxOutstanding.acquireUninterruptibly();
      // Shutdown may have been requested while waiting for the permit; re-check so that no new
      // stream is opened on a channel that is already shutting down (same protocol as the other
      // async workers).
      if (shutdown) {
        maxOutstanding.release();
        return;
      }
      // Holder that lets the response observer reach the request observer, which only exists
      // once streamingCall() has returned.
      final AtomicReference<StreamObserver<Messages.SimpleRequest>> requestObserver =
          new AtomicReference<StreamObserver<Messages.SimpleRequest>>();
      requestObserver.set(stub.streamingCall(
          new StreamObserver<Messages.SimpleResponse>() {
            long now = System.nanoTime();

            @Override
            public void onNext(Messages.SimpleResponse value) {
              delay(System.nanoTime() - now);
              if (shutdown) {
                requestObserver.get().onCompleted();
                // Must not send another request.
                return;
              }
              requestObserver.get().onNext(simpleRequest);
              // Restart the clock for the next ping.
              now = System.nanoTime();
            }

            @Override
            public void onError(Throwable t) {
              maxOutstanding.release();
              // Errors are expected while shutting down, so they are logged more quietly then.
              Level level = shutdown ? Level.FINE : Level.INFO;
              log.log(level, "Error in Async Ping-Pong call", t);
            }

            @Override
            public void onCompleted() {
              maxOutstanding.release();
            }
          }));
      // First ping; subsequent pings are sent from onNext.
      requestObserver.get().onNext(simpleRequest);
    }
  }
}
/**
 * Worker which issues generic (ByteBuf payload) blocking unary calls back-to-back until
 * shutdown is requested. The latency of an event is the time from sending the request to
 * receiving the response.
 */
private class GenericBlockingUnaryWorker implements Runnable {
  final Channel channel;

  GenericBlockingUnaryWorker(Channel channel) {
    this.channel = channel;
  }

  @Override
  public void run() {
    for (;;) {
      if (shutdown) {
        return;
      }
      final long startNanos = System.nanoTime();
      // Each call sends its own slice of the shared request buffer.
      ClientCalls.blockingUnaryCall(
          channel,
          LoadServer.GENERIC_UNARY_METHOD,
          CallOptions.DEFAULT,
          genericRequest.slice());
      final long elapsedNanos = System.nanoTime() - startNanos;
      delay(elapsedNanos);
    }
  }
}
/**
* Worker which executes generic async unary calls. Event timing is the duration between
* sending the request and receiving the response.
*/
private class GenericAsyncUnaryWorker implements Runnable {
final Channel channel;
final Semaphore maxOutstanding = new Semaphore(config.getOutstandingRpcsPerChannel());
GenericAsyncUnaryWorker(Channel channel) {
this.channel = channel;
}
@Override
public void run() {
while (true) {
maxOutstanding.acquireUninterruptibly();
if (shutdown) {
maxOutstanding.release();
return;
}
ClientCalls.asyncUnaryCall(
channel.newCall(LoadServer.GENERIC_UNARY_METHOD, CallOptions.DEFAULT),
genericRequest.slice(),
new StreamObserver<ByteBuf>() {
long now = System.nanoTime();
@Override
public void onNext(ByteBuf value) {
}
@Override
public void onError(Throwable t) {
maxOutstanding.release();
Level level = shutdown ? Level.FINE : Level.INFO;
log.log(level, "Error in Generic Async Unary call", t);
}
@Override
public void onCompleted() {
delay(System.nanoTime() - now);
maxOutstanding.release();
}
});
}
}
}
/**
* Worker which executes a streaming ping-pong call. Event timing is the duration between
* sending the ping and receiving the pong.
*/
private class GenericAsyncPingPongWorker implements Runnable {
final Semaphore maxOutstanding = new Semaphore(config.getOutstandingRpcsPerChannel());
final Channel channel;
GenericAsyncPingPongWorker(Channel channel) {
this.channel = channel;
}
@Override
public void run() {
while (true) {
maxOutstanding.acquireUninterruptibly();
if (shutdown) {
maxOutstanding.release();
return;
}
final ClientCall<ByteBuf, ByteBuf> call =
channel.newCall(LoadServer.GENERIC_STREAMING_PING_PONG_METHOD, CallOptions.DEFAULT);
call.start(new ClientCall.Listener<ByteBuf>() {
long now = System.nanoTime();
@Override
public void onMessage(ByteBuf message) {
delay(System.nanoTime() - now);
if (shutdown) {
call.cancel("Shutting down", null);
return;
}
call.request(1);
call.sendMessage(genericRequest.slice());
now = System.nanoTime();
}
@Override
public void onClose(Status status, Metadata trailers) {
maxOutstanding.release();
Level level = shutdown ? Level.FINE : Level.INFO;
if (!status.isOk() && status.getCode() != Status.Code.CANCELLED) {
log.log(level, "Error in Generic Async Ping-Pong call", status.getCause());
}
}
}, new Metadata());
call.request(1);
call.sendMessage(genericRequest.slice());
}
}
}
}