// blob: f82d87cade0e3ac2ff1d4b23f9fa46d5b4fa7e72 [file] [log] [blame]
/*
* Copyright 2015 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITTER;
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Attributes;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
import io.grpc.Context;
import io.grpc.DecompressorRegistry;
import io.grpc.InternalDecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.Status;
import io.perfmark.PerfMark;
import io.perfmark.Tag;
import java.io.InputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
private static final Logger log = Logger.getLogger(ServerCallImpl.class.getName());

/** Status description used when a single-response method tries to send a second message. */
@VisibleForTesting
static final String TOO_MANY_RESPONSES = "Too many responses";

/** Status description used when a single-response method is closed OK without any message. */
@VisibleForTesting
static final String MISSING_RESPONSE = "Completed without a response";

/** Stream that every outbound operation of this call is forwarded to. */
private final ServerStream stream;
private final MethodDescriptor<ReqT, RespT> method;
/** PerfMark tag attached to every task recorded for this call. */
private final Tag tag;
private final Context.CancellableContext context;
/** Raw accept-encoding value read from the inbound headers; {@code null} if it was absent. */
private final byte[] messageAcceptEncoding;
private final DecompressorRegistry decompressorRegistry;
private final CompressorRegistry compressorRegistry;
// Assigned exactly once, in the constructor, so it is final like the other collaborators.
private final CallTracer serverCallTracer;

// state
// Volatile because it is also written by the context cancellation listener, which runs on
// whichever thread cancels the context.
private volatile boolean cancelled;
private boolean sendHeadersCalled;
private boolean closeCalled;
/** Compressor selected through setCompression(); {@code null} until one is selected. */
private Compressor compressor;
/** Whether sendMessage() has been called at least once on this call. */
private boolean messageSent;
/**
 * Creates a call bound to {@code stream} and reports its start to {@code serverCallTracer}.
 *
 * @param stream stream that outbound operations are forwarded to
 * @param method descriptor of the invoked method
 * @param inboundHeaders request headers; only the message-accept-encoding value is retained
 * @param context cancellable context associated with the call
 * @param decompressorRegistry source of the encodings advertised in the response headers
 * @param compressorRegistry registry used to resolve names passed to {@code setCompression}
 * @param serverCallTracer tracer notified when the call starts and ends
 * @param tag PerfMark tag attached to tasks recorded for this call
 */
ServerCallImpl(ServerStream stream, MethodDescriptor<ReqT, RespT> method,
    Metadata inboundHeaders, Context.CancellableContext context,
    DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
    CallTracer serverCallTracer, Tag tag) {
  // Plain collaborators first.
  this.stream = stream;
  this.method = method;
  this.context = context;
  this.tag = tag;
  this.decompressorRegistry = decompressorRegistry;
  this.compressorRegistry = compressorRegistry;

  // Values derived from the request.
  this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY);

  // Report the start only after everything else has been set up.
  this.serverCallTracer = serverCallTracer;
  serverCallTracer.reportCallStarted();
}
/**
 * Forwards the request for {@code numMessages} more messages to the stream, recorded as a
 * PerfMark task.
 */
@Override
public void request(int numMessages) {
  final String taskName = "ServerCall.request";
  PerfMark.startTask(taskName, tag);
  try {
    stream.request(numMessages);
  } finally {
    PerfMark.stopTask(taskName, tag);
  }
}
/**
 * Sends the response headers, recorded as a PerfMark task; the actual work is done by
 * {@code sendHeadersInternal}.
 */
@Override
public void sendHeaders(Metadata headers) {
  final String taskName = "ServerCall.sendHeaders";
  PerfMark.startTask(taskName, tag);
  try {
    sendHeadersInternal(headers);
  } finally {
    PerfMark.stopTask(taskName, tag);
  }
}
/**
 * Settles on the compressor for this call, rewrites the encoding-related entries of
 * {@code headers} and writes the headers to the stream.
 *
 * @throws IllegalStateException if headers were already sent or the call is closed
 */
private void sendHeadersInternal(Metadata headers) {
  checkState(!sendHeadersCalled, "sendHeaders has already been called");
  checkState(!closeCalled, "call is closed");

  headers.discardAll(MESSAGE_ENCODING_KEY);

  // The requested compressor is kept only when the accept-encoding value from the request lists
  // it; in every other case resort to using no compression.
  boolean acceptedByPeer = false;
  if (compressor != null && messageAcceptEncoding != null) {
    // TODO(carl-mastrangelo): remove the string allocation.
    String acceptedEncodings = new String(messageAcceptEncoding, GrpcUtil.US_ASCII);
    acceptedByPeer = GrpcUtil.iterableContains(
        ACCEPT_ENCODING_SPLITTER.split(acceptedEncodings),
        compressor.getMessageEncoding());
  }
  if (!acceptedByPeer) {
    compressor = Codec.Identity.NONE;
  }

  // Always put compressor, even if it's identity.
  headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
  stream.setCompressor(compressor);

  headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
  byte[] advertised =
      InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
  if (advertised.length > 0) {
    headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertised);
  }

  // Don't check if sendMessage has been called, since it requires that sendHeaders was already
  // called.
  sendHeadersCalled = true;
  stream.writeHeaders(headers);
}
/**
 * Sends one response message, recorded as a PerfMark task; the actual work is done by
 * {@code sendMessageInternal}.
 */
@Override
public void sendMessage(RespT message) {
  final String taskName = "ServerCall.sendMessage";
  PerfMark.startTask(taskName, tag);
  try {
    sendMessageInternal(message);
  } finally {
    PerfMark.stopTask(taskName, tag);
  }
}
/**
 * Serializes {@code message} with the method descriptor, writes it to the stream and flushes.
 *
 * <p>A second message on a method whose server sends only one message cancels the stream with
 * an INTERNAL status instead of being written. A {@link RuntimeException} during serialization
 * or writing closes the call with the status derived from it; an {@link Error} closes the call
 * as CANCELLED and is then rethrown.
 *
 * @throws IllegalStateException if headers have not been sent yet or the call is closed
 */
private void sendMessageInternal(RespT message) {
  checkState(sendHeadersCalled, "sendHeaders has not been called");
  checkState(!closeCalled, "call is closed");

  if (method.getType().serverSendsOneMessage() && messageSent) {
    Status tooMany = Status.INTERNAL.withDescription(TOO_MANY_RESPONSES);
    internalClose(tooMany);
    return;
  }

  messageSent = true;
  try {
    stream.writeMessage(method.streamResponse(message));
    stream.flush();
  } catch (RuntimeException failure) {
    Status failureStatus = Status.fromThrowable(failure);
    close(failureStatus, new Metadata());
  } catch (Error fatal) {
    Status cancelledStatus =
        Status.CANCELLED.withDescription("Server sendMessage() failed with Error");
    close(cancelledStatus, new Metadata());
    throw fatal;
  }
}
/** Forwards the per-message compression switch to the underlying stream. */
@Override
public void setMessageCompression(boolean enable) {
stream.setMessageCompression(enable);
}
/**
 * Selects the compressor to use for outbound messages by name.
 *
 * <p>The name is resolved and validated before the field is updated, so a failed lookup leaves
 * any previously selected compressor in place.
 *
 * @param compressorName name looked up in the call's {@link CompressorRegistry}
 * @throws IllegalStateException if headers have already been sent
 * @throws IllegalArgumentException if no compressor is registered under {@code compressorName}
 */
@Override
public void setCompression(String compressorName) {
  // Added here to give a better error message.
  checkState(!sendHeadersCalled, "sendHeaders has been called");
  Compressor requested = compressorRegistry.lookupCompressor(compressorName);
  checkArgument(requested != null, "Unable to find compressor by name %s", compressorName);
  compressor = requested;
}
/** Returns the readiness reported by the underlying stream. */
@Override
public boolean isReady() {
return stream.isReady();
}
/**
 * Closes the call with {@code status} and {@code trailers}, recorded as a PerfMark task; the
 * actual work is done by {@code closeInternal}.
 */
@Override
public void close(Status status, Metadata trailers) {
  final String taskName = "ServerCall.close";
  PerfMark.startTask(taskName, tag);
  try {
    closeInternal(status, trailers);
  } finally {
    PerfMark.stopTask(taskName, tag);
  }
}
/**
 * Marks the call closed and closes the stream with {@code status} and {@code trailers}.
 *
 * <p>An OK close of a method whose server must send exactly one message, when no message was
 * sent, is turned into a stream cancellation with an INTERNAL status.
 *
 * <p>The end of the call is reported to the tracer exactly once from this method: by
 * {@code internalClose} (as failed) on the missing-response path, otherwise here with the
 * outcome of {@code status}.
 *
 * @throws IllegalStateException if the call was already closed
 */
private void closeInternal(Status status, Metadata trailers) {
  checkState(!closeCalled, "call already closed");
  closeCalled = true;

  if (status.isOk() && method.getType().serverSendsOneMessage() && !messageSent) {
    // internalClose() already reports the call as ended (failed). Reporting again here with the
    // original OK status would count the same call a second time, as a success.
    internalClose(Status.INTERNAL.withDescription(MISSING_RESPONSE));
    return;
  }

  try {
    stream.close(status, trailers);
  } finally {
    // Report even if closing the stream throws.
    serverCallTracer.reportCallEnded(status.isOk());
  }
}
/**
 * Returns the call's cancelled flag. Within this file the flag is set when the stream is closed
 * with a non-OK status, or when the call's context is cancelled with a non-null cause.
 */
@Override
public boolean isCancelled() {
return cancelled;
}
/**
 * Creates the stream listener that forwards stream events for this call to {@code listener},
 * sharing this call's context.
 */
ServerStreamListener newServerStreamListener(ServerCall.Listener<ReqT> listener) {
return new ServerStreamListenerImpl<>(this, listener, context);
}
/** Returns the attributes reported by the underlying stream. */
@Override
public Attributes getAttributes() {
return stream.getAttributes();
}
/** Returns the authority reported by the underlying stream. */
@Override
public String getAuthority() {
return stream.getAuthority();
}
/** Returns the method descriptor this call was created with. */
@Override
public MethodDescriptor<ReqT, RespT> getMethodDescriptor() {
return method;
}
/**
 * Close the {@link ServerStream} because an internal error occurred. Allow the application to
 * run until completion, but silently ignore interactions with the {@link ServerStream} from now
 * on.
 *
 * <p>Logs a warning, cancels the stream with {@code internalError} and reports the end of the
 * call to the tracer.
 */
private void internalClose(Status internalError) {
  log.log(Level.WARNING, "Cancelling the stream with status {0}", internalError);
  stream.cancel(internalError);
  boolean succeeded = internalError.isOk(); // error so always false
  serverCallTracer.reportCallEnded(succeeded);
}
/**
 * Forwards {@link ServerStreamListener} callbacks to an application {@link ServerCall.Listener}.
 *
 * <p>All of these callbacks are assumed to be called on an application thread, and the caller is
 * responsible for handling thrown exceptions.
 */
@VisibleForTesting
static final class ServerStreamListenerImpl<ReqT> implements ServerStreamListener {
// Call whose cancelled flag, method descriptor and PerfMark tag the callbacks use.
private final ServerCallImpl<ReqT, ?> call;
// Application listener that receives the forwarded events.
private final ServerCall.Listener<ReqT> listener;
// Context that is cancelled after the stream's closure has been delivered to the listener.
private final Context.CancellableContext context;
/**
 * Creates the listener and registers a cancellation hook on {@code context} that mirrors an
 * exceptional context cancellation into the call's cancelled flag.
 *
 * @throws NullPointerException if any argument is {@code null}
 */
public ServerStreamListenerImpl(
    ServerCallImpl<ReqT, ?> call, ServerCall.Listener<ReqT> listener,
    Context.CancellableContext context) {
  this.call = checkNotNull(call, "call");
  this.listener = checkNotNull(listener, "listener must not be null");
  this.context = checkNotNull(context, "context");

  // Wire ourselves up so that if the context is cancelled, our flag call.cancelled also
  // reflects the new state.
  Context.CancellationListener markCallCancelled =
      new Context.CancellationListener() {
        @Override
        public void cancelled(Context cancelledContext) {
          // If the context has a cancellation cause then something exceptional happened
          // and we should also mark the call as cancelled.
          if (cancelledContext.cancellationCause() == null) {
            return;
          }
          ServerStreamListenerImpl.this.call.cancelled = true;
        }
      };
  // Use a DirectExecutor so that it happens in the same thread as the caller of
  // {@link Context#cancel}.
  this.context.addListener(markCallCancelled, MoreExecutors.directExecutor());
}
/**
 * Delivers the messages available from {@code producer}, recorded as a PerfMark task; the
 * actual work is done by {@code messagesAvailableInternal}.
 */
@Override
public void messagesAvailable(MessageProducer producer) {
  final String taskName = "ServerStreamListener.messagesAvailable";
  PerfMark.startTask(taskName, call.tag);
  try {
    messagesAvailableInternal(producer);
  } finally {
    PerfMark.stopTask(taskName, call.tag);
  }
}
/**
 * Drains {@code producer}, parsing each message with the method descriptor and handing it to
 * the application listener.
 *
 * <p>If the call is already cancelled the producer is closed quietly and nothing is delivered.
 * On any failure the current message and the producer are closed quietly and the failure is
 * rethrown, wrapped in a {@link RuntimeException} if it is a checked exception.
 */
@SuppressWarnings("Finally") // The code avoids suppressing the exception thrown from try
private void messagesAvailableInternal(final MessageProducer producer) {
  if (call.cancelled) {
    GrpcUtil.closeQuietly(producer);
    return;
  }

  try {
    for (InputStream request = producer.next();
        request != null;
        request = producer.next()) {
      try {
        listener.onMessage(call.method.parseRequest(request));
      } catch (Throwable deliveryFailure) {
        GrpcUtil.closeQuietly(request);
        throw deliveryFailure;
      }
      request.close();
    }
  } catch (Throwable failure) {
    GrpcUtil.closeQuietly(producer);
    Throwables.throwIfUnchecked(failure);
    throw new RuntimeException(failure);
  }
}
/**
 * Notifies the application listener of the half-close unless the call is already cancelled;
 * recorded as a PerfMark task.
 */
@Override
public void halfClosed() {
  final String taskName = "ServerStreamListener.halfClosed";
  PerfMark.startTask(taskName, call.tag);
  try {
    if (!call.cancelled) {
      listener.onHalfClose();
    }
  } finally {
    PerfMark.stopTask(taskName, call.tag);
  }
}
/**
 * Handles closure of the stream with {@code status}, recorded as a PerfMark task; the actual
 * work is done by {@code closedInternal}.
 */
@Override
public void closed(Status status) {
  final String taskName = "ServerStreamListener.closed";
  PerfMark.startTask(taskName, call.tag);
  try {
    closedInternal(status);
  } finally {
    PerfMark.stopTask(taskName, call.tag);
  }
}
/**
 * Tells the application listener how the call ended: {@code onComplete} for an OK status,
 * otherwise the call is flagged as cancelled and {@code onCancel} is invoked. The context is
 * cancelled afterwards, even if the listener throws.
 */
private void closedInternal(Status status) {
  try {
    if (!status.isOk()) {
      call.cancelled = true;
      listener.onCancel();
    } else {
      listener.onComplete();
    }
  } finally {
    // Cancel context after delivering RPC closure notification to allow the application to
    // clean up and update any state based on whether onComplete or onCancel was called.
    // Note that in failure situations JumpToApplicationThreadServerStreamListener has already
    // closed the context. In these situations this cancel() call will be a no-op.
    context.cancel(null);
  }
}
/**
 * Notifies the application listener that the call is ready, unless the call is already
 * cancelled; recorded as a PerfMark task.
 */
@Override
public void onReady() {
  // A single name is shared by start and stop so the PerfMark task pair always matches.
  final String taskName = "ServerStreamListener.onReady";
  PerfMark.startTask(taskName, call.tag);
  try {
    if (call.cancelled) {
      return;
    }
    listener.onReady();
  } finally {
    PerfMark.stopTask(taskName, call.tag);
  }
}
}
}