blob: 1b5571b5deafe46ad244b33421033b30f118d4b6 [file] [log] [blame]
/*
* Copyright (C) 2014 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.squareup.okhttp.benchmarks;
import com.squareup.okhttp.HttpUrl;
import com.squareup.okhttp.internal.SslContextBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.ssl.SslHandler;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
/** Netty isn't an HTTP client, but it's almost one. */
class NettyHttpClient implements HttpClient {
private static final boolean VERBOSE = false;
// Guarded by this. Real apps need more capable connection management.
private final Deque<HttpChannel> freeChannels = new ArrayDeque<>();
private final Deque<HttpUrl> backlog = new ArrayDeque<>();
private int totalChannels = 0;
private int concurrencyLevel;
private int targetBacklog;
private Bootstrap bootstrap;
@Override public void prepare(final Benchmark benchmark) {
this.concurrencyLevel = benchmark.concurrencyLevel;
this.targetBacklog = benchmark.targetBacklog;
ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() {
@Override public void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
if (benchmark.tls) {
SSLContext sslContext = SslContextBuilder.localhost();
SSLEngine engine = sslContext.createSSLEngine();
engine.setUseClientMode(true);
pipeline.addLast("ssl", new SslHandler(engine));
}
pipeline.addLast("codec", new HttpClientCodec());
pipeline.addLast("inflater", new HttpContentDecompressor());
pipeline.addLast("handler", new HttpChannel(channel));
}
};
bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup(concurrencyLevel))
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.channel(NioSocketChannel.class)
.handler(channelInitializer);
}
@Override public void enqueue(HttpUrl url) throws Exception {
HttpChannel httpChannel = null;
synchronized (this) {
if (!freeChannels.isEmpty()) {
httpChannel = freeChannels.pop();
} else if (totalChannels < concurrencyLevel) {
totalChannels++; // Create a new channel. (outside of the synchronized block).
} else {
backlog.add(url); // Enqueue this for later, to be picked up when another request completes.
return;
}
}
if (httpChannel == null) {
Channel channel = bootstrap.connect(url.host(), url.port())
.sync().channel();
httpChannel = (HttpChannel) channel.pipeline().last();
}
httpChannel.sendRequest(url);
}
@Override public synchronized boolean acceptingJobs() {
return backlog.size() < targetBacklog || hasFreeChannels();
}
private boolean hasFreeChannels() {
int activeChannels = totalChannels - freeChannels.size();
return activeChannels < concurrencyLevel;
}
private void release(HttpChannel httpChannel) {
HttpUrl url;
synchronized (this) {
url = backlog.pop();
if (url == null) {
// There were no URLs in the backlog. Pool this channel for later.
freeChannels.push(httpChannel);
return;
}
}
// We removed a URL from the backlog. Schedule it right away.
httpChannel.sendRequest(url);
}
class HttpChannel extends SimpleChannelInboundHandler<HttpObject> {
private final SocketChannel channel;
byte[] buffer = new byte[1024];
int total;
long start;
public HttpChannel(SocketChannel channel) {
this.channel = channel;
}
private void sendRequest(HttpUrl url) {
start = System.nanoTime();
total = 0;
HttpRequest request = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.GET, url.encodedPath());
request.headers().set(HttpHeaders.Names.HOST, url.host());
request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
channel.writeAndFlush(request);
}
@Override protected void channelRead0(
ChannelHandlerContext context, HttpObject message) throws Exception {
if (message instanceof HttpResponse) {
receive((HttpResponse) message);
}
if (message instanceof HttpContent) {
receive((HttpContent) message);
if (message instanceof LastHttpContent) {
release(this);
}
}
}
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}
void receive(HttpResponse response) {
// Don't do anything with headers.
}
void receive(HttpContent content) {
// Consume the response body.
ByteBuf byteBuf = content.content();
for (int toRead; (toRead = byteBuf.readableBytes()) > 0; ) {
byteBuf.readBytes(buffer, 0, Math.min(buffer.length, toRead));
total += toRead;
}
if (VERBOSE && content instanceof LastHttpContent) {
long finish = System.nanoTime();
System.out.println(String.format("Transferred % 8d bytes in %4d ms",
total, TimeUnit.NANOSECONDS.toMillis(finish - start)));
}
}
@Override public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
System.out.println("Failed: " + cause);
}
}
}