blob: 2170e1f829bb04f0503397635d56dd715eaa2a1f [file] [log] [blame]
/*
* Copyright 2015 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.netty;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import com.google.common.base.Preconditions;
import com.google.common.io.BaseEncoding;
import io.grpc.Attributes;
import io.grpc.InternalKnownTransport;
import io.grpc.InternalMethodDescriptor;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.AbstractClientStream;
import io.grpc.internal.Http2ClientStreamTransportState;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.WritableBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.AsciiString;
import javax.annotation.Nullable;
/**
* Client stream for a Netty transport. Must only be called from the sending application
* thread.
*/
class NettyClientStream extends AbstractClientStream {
private static final InternalMethodDescriptor methodDescriptorAccessor =
new InternalMethodDescriptor(InternalKnownTransport.NETTY);
private final Sink sink = new Sink();
private final TransportState state;
private final WriteQueue writeQueue;
private final MethodDescriptor<?, ?> method;
private final Channel channel;
private AsciiString authority;
private final AsciiString scheme;
private final AsciiString userAgent;
NettyClientStream(
TransportState state,
MethodDescriptor<?, ?> method,
Metadata headers,
Channel channel,
AsciiString authority,
AsciiString scheme,
AsciiString userAgent,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer) {
super(
new NettyWritableBufferAllocator(channel.alloc()),
statsTraceCtx,
transportTracer,
headers,
useGet(method));
this.state = checkNotNull(state, "transportState");
this.writeQueue = state.handler.getWriteQueue();
this.method = checkNotNull(method, "method");
this.channel = checkNotNull(channel, "channel");
this.authority = checkNotNull(authority, "authority");
this.scheme = checkNotNull(scheme, "scheme");
this.userAgent = userAgent;
}
@Override
protected TransportState transportState() {
return state;
}
@Override
protected Sink abstractClientStreamSink() {
return sink;
}
@Override
public void setAuthority(String authority) {
this.authority = AsciiString.of(checkNotNull(authority, "authority"));
}
@Override
public Attributes getAttributes() {
return state.handler.getAttributes();
}
private static boolean useGet(MethodDescriptor<?, ?> method) {
return method.isSafe();
}
private class Sink implements AbstractClientStream.Sink {
@SuppressWarnings("BetaApi") // BaseEncoding is stable in Guava 20.0
@Override
public void writeHeaders(Metadata headers, byte[] requestPayload) {
// Convert the headers into Netty HTTP/2 headers.
AsciiString defaultPath = (AsciiString) methodDescriptorAccessor.geRawMethodName(method);
if (defaultPath == null) {
defaultPath = new AsciiString("/" + method.getFullMethodName());
methodDescriptorAccessor.setRawMethodName(method, defaultPath);
}
boolean get = (requestPayload != null);
AsciiString httpMethod;
if (get) {
// Forge the query string
// TODO(ericgribkoff) Add the key back to the query string
defaultPath =
new AsciiString(defaultPath + "?" + BaseEncoding.base64().encode(requestPayload));
httpMethod = Utils.HTTP_GET_METHOD;
} else {
httpMethod = Utils.HTTP_METHOD;
}
Http2Headers http2Headers = Utils.convertClientHeaders(headers, scheme, defaultPath,
authority, httpMethod, userAgent);
ChannelFutureListener failureListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// Stream creation failed. Close the stream if not already closed.
// When the channel is shutdown, the lifecycle manager has a better view of the failure,
// especially before negotiation completes (because the negotiator commonly doesn't
// receive the execeptionCaught because NettyClientHandler does not propagate it).
Status s = transportState().handler.getLifecycleManager().getShutdownStatus();
if (s == null) {
s = transportState().statusFromFailedFuture(future);
}
transportState().transportReportStatus(s, true, new Metadata());
}
}
};
// Write the command requesting the creation of the stream.
writeQueue.enqueue(new CreateStreamCommand(http2Headers, transportState(), get),
!method.getType().clientSendsOneMessage() || get).addListener(failureListener);
}
@Override
public void writeFrame(
WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) {
Preconditions.checkArgument(numMessages >= 0);
ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf();
final int numBytes = bytebuf.readableBytes();
if (numBytes > 0) {
// Add the bytes to outbound flow control.
onSendingBytes(numBytes);
writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// If the future succeeds when http2stream is null, the stream has been cancelled
// before it began and Netty is purging pending writes from the flow-controller.
if (future.isSuccess() && transportState().http2Stream() != null) {
// Remove the bytes from outbound flow control, optionally notifying
// the client that they can send more bytes.
transportState().onSentBytes(numBytes);
NettyClientStream.this.getTransportTracer().reportMessageSent(numMessages);
}
}
});
} else {
// The frame is empty and will not impact outbound flow control. Just send it.
writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush);
}
}
@Override
public void request(final int numMessages) {
if (channel.eventLoop().inEventLoop()) {
// Processing data read in the event loop so can call into the deframer immediately
transportState().requestMessagesFromDeframer(numMessages);
} else {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
transportState().requestMessagesFromDeframer(numMessages);
}
});
}
}
@Override
public void cancel(Status status) {
writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true);
}
}
/** This should only called from the transport thread. */
public abstract static class TransportState extends Http2ClientStreamTransportState
implements StreamIdHolder {
private final NettyClientHandler handler;
private final EventLoop eventLoop;
private int id;
private Http2Stream http2Stream;
public TransportState(
NettyClientHandler handler,
EventLoop eventLoop,
int maxMessageSize,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer) {
super(maxMessageSize, statsTraceCtx, transportTracer);
this.handler = checkNotNull(handler, "handler");
this.eventLoop = checkNotNull(eventLoop, "eventLoop");
}
@Override
public int id() {
return id;
}
public void setId(int id) {
checkArgument(id > 0, "id must be positive");
this.id = id;
}
/**
* Sets the underlying Netty {@link Http2Stream} for this stream. This must be called in the
* context of the transport thread.
*/
public void setHttp2Stream(Http2Stream http2Stream) {
checkNotNull(http2Stream, "http2Stream");
checkState(this.http2Stream == null, "Can only set http2Stream once");
this.http2Stream = http2Stream;
// Now that the stream has actually been initialized, call the listener's onReady callback if
// appropriate.
onStreamAllocated();
getTransportTracer().reportLocalStreamStarted();
}
/**
* Gets the underlying Netty {@link Http2Stream} for this stream.
*/
@Nullable
public Http2Stream http2Stream() {
return http2Stream;
}
/**
* Intended to be overridden by NettyClientTransport, which has more information about failures.
* May only be called from event loop.
*/
protected abstract Status statusFromFailedFuture(ChannelFuture f);
@Override
protected void http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers) {
transportReportStatus(status, stopDelivery, trailers);
handler.getWriteQueue().enqueue(new CancelClientStreamCommand(this, status), true);
}
@Override
public void runOnTransportThread(final Runnable r) {
if (eventLoop.inEventLoop()) {
r.run();
} else {
eventLoop.execute(r);
}
}
@Override
public void bytesRead(int processedBytes) {
handler.returnProcessedBytes(http2Stream, processedBytes);
handler.getWriteQueue().scheduleFlush();
}
@Override
public void deframeFailed(Throwable cause) {
http2ProcessingFailed(Status.fromThrowable(cause), true, new Metadata());
}
void transportHeadersReceived(Http2Headers headers, boolean endOfStream) {
if (endOfStream) {
if (!isOutboundClosed()) {
handler.getWriteQueue().enqueue(new CancelClientStreamCommand(this, null), true);
}
transportTrailersReceived(Utils.convertTrailers(headers));
} else {
transportHeadersReceived(Utils.convertHeaders(headers));
}
}
void transportDataReceived(ByteBuf frame, boolean endOfStream) {
transportDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream);
}
}
}