blob: aac13a5f87f8764af43dc40344c49491d59aa5d3 [file] [log] [blame]
/*
* Copyright 2014 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.stub;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.Status;
/**
* Utility functions for adapting {@link ServerCallHandler}s to application service implementation,
* meant to be used by the generated code.
*/
public final class ServerCalls {
@VisibleForTesting
static final String TOO_MANY_REQUESTS = "Too many requests";
@VisibleForTesting
static final String MISSING_REQUEST = "Half-closed without a request";
private ServerCalls() {
}
/**
* Creates a {@link ServerCallHandler} for a unary call method of the service.
*
* @param method an adaptor to the actual method on the service implementation.
*/
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryCall(
UnaryMethod<ReqT, RespT> method) {
return asyncUnaryRequestCall(method);
}
/**
* Creates a {@link ServerCallHandler} for a server streaming method of the service.
*
* @param method an adaptor to the actual method on the service implementation.
*/
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncServerStreamingCall(
ServerStreamingMethod<ReqT, RespT> method) {
return asyncUnaryRequestCall(method);
}
/**
* Creates a {@link ServerCallHandler} for a client streaming method of the service.
*
* @param method an adaptor to the actual method on the service implementation.
*/
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncClientStreamingCall(
ClientStreamingMethod<ReqT, RespT> method) {
return asyncStreamingRequestCall(method);
}
/**
* Creates a {@link ServerCallHandler} for a bidi streaming method of the service.
*
* @param method an adaptor to the actual method on the service implementation.
*/
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncBidiStreamingCall(
BidiStreamingMethod<ReqT, RespT> method) {
return asyncStreamingRequestCall(method);
}
/**
* Adaptor to a unary call method.
*/
public interface UnaryMethod<ReqT, RespT> extends UnaryRequestMethod<ReqT, RespT> {}
/**
* Adaptor to a server streaming method.
*/
public interface ServerStreamingMethod<ReqT, RespT> extends UnaryRequestMethod<ReqT, RespT> {}
/**
* Adaptor to a client streaming method.
*/
public interface ClientStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> {}
/**
* Adaptor to a bidirectional streaming method.
*/
public interface BidiStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> {}
private static final class UnaryServerCallHandler<ReqT, RespT>
implements ServerCallHandler<ReqT, RespT> {
private final UnaryRequestMethod<ReqT, RespT> method;
// Non private to avoid synthetic class
UnaryServerCallHandler(UnaryRequestMethod<ReqT, RespT> method) {
this.method = method;
}
@Override
public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
Preconditions.checkArgument(
call.getMethodDescriptor().getType().clientSendsOneMessage(),
"asyncUnaryRequestCall is only for clientSendsOneMessage methods");
ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
new ServerCallStreamObserverImpl<ReqT, RespT>(call);
// We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
// sends more than 1 requests, ServerCall will catch it. Note that disabling auto
// inbound flow control has no effect on unary calls.
call.request(2);
return new UnaryServerCallListener(responseObserver, call);
}
private final class UnaryServerCallListener extends ServerCall.Listener<ReqT> {
private final ServerCall<ReqT, RespT> call;
private final ServerCallStreamObserverImpl<ReqT, RespT> responseObserver;
private boolean canInvoke = true;
private ReqT request;
// Non private to avoid synthetic class
UnaryServerCallListener(
ServerCallStreamObserverImpl<ReqT, RespT> responseObserver,
ServerCall<ReqT, RespT> call) {
this.call = call;
this.responseObserver = responseObserver;
}
@Override
public void onMessage(ReqT request) {
if (this.request != null) {
// Safe to close the call, because the application has not yet been invoked
call.close(
Status.INTERNAL.withDescription(TOO_MANY_REQUESTS),
new Metadata());
canInvoke = false;
return;
}
// We delay calling method.invoke() until onHalfClose() to make sure the client
// half-closes.
this.request = request;
}
@Override
public void onHalfClose() {
if (!canInvoke) {
return;
}
if (request == null) {
// Safe to close the call, because the application has not yet been invoked
call.close(
Status.INTERNAL.withDescription(MISSING_REQUEST),
new Metadata());
return;
}
method.invoke(request, responseObserver);
responseObserver.freeze();
if (call.isReady()) {
// Since we are calling invoke in halfClose we have missed the onReady
// event from the transport so recover it here.
onReady();
}
}
@Override
public void onCancel() {
responseObserver.cancelled = true;
if (responseObserver.onCancelHandler != null) {
responseObserver.onCancelHandler.run();
}
}
@Override
public void onReady() {
if (responseObserver.onReadyHandler != null) {
responseObserver.onReadyHandler.run();
}
}
}
}
/**
* Creates a {@link ServerCallHandler} for a unary request call method of the service.
*
* @param method an adaptor to the actual method on the service implementation.
*/
private static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryRequestCall(
UnaryRequestMethod<ReqT, RespT> method) {
return new UnaryServerCallHandler<ReqT, RespT>(method);
}
private static final class StreamingServerCallHandler<ReqT, RespT>
implements ServerCallHandler<ReqT, RespT> {
private final StreamingRequestMethod<ReqT, RespT> method;
// Non private to avoid synthetic class
StreamingServerCallHandler(StreamingRequestMethod<ReqT, RespT> method) {
this.method = method;
}
@Override
public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
new ServerCallStreamObserverImpl<ReqT, RespT>(call);
StreamObserver<ReqT> requestObserver = method.invoke(responseObserver);
responseObserver.freeze();
if (responseObserver.autoFlowControlEnabled) {
call.request(1);
}
return new StreamingServerCallListener(requestObserver, responseObserver, call);
}
private final class StreamingServerCallListener extends ServerCall.Listener<ReqT> {
private final StreamObserver<ReqT> requestObserver;
private final ServerCallStreamObserverImpl<ReqT, RespT> responseObserver;
private final ServerCall<ReqT, RespT> call;
private boolean halfClosed = false;
// Non private to avoid synthetic class
StreamingServerCallListener(
StreamObserver<ReqT> requestObserver,
ServerCallStreamObserverImpl<ReqT, RespT> responseObserver,
ServerCall<ReqT, RespT> call) {
this.requestObserver = requestObserver;
this.responseObserver = responseObserver;
this.call = call;
}
@Override
public void onMessage(ReqT request) {
requestObserver.onNext(request);
// Request delivery of the next inbound message.
if (responseObserver.autoFlowControlEnabled) {
call.request(1);
}
}
@Override
public void onHalfClose() {
halfClosed = true;
requestObserver.onCompleted();
}
@Override
public void onCancel() {
responseObserver.cancelled = true;
if (responseObserver.onCancelHandler != null) {
responseObserver.onCancelHandler.run();
}
if (!halfClosed) {
requestObserver.onError(
Status.CANCELLED
.withDescription("cancelled before receiving half close")
.asRuntimeException());
}
}
@Override
public void onReady() {
if (responseObserver.onReadyHandler != null) {
responseObserver.onReadyHandler.run();
}
}
}
}
/**
* Creates a {@link ServerCallHandler} for a streaming request call method of the service.
*
* @param method an adaptor to the actual method on the service implementation.
*/
private static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncStreamingRequestCall(
StreamingRequestMethod<ReqT, RespT> method) {
return new StreamingServerCallHandler<ReqT, RespT>(method);
}
private interface UnaryRequestMethod<ReqT, RespT> {
void invoke(ReqT request, StreamObserver<RespT> responseObserver);
}
private interface StreamingRequestMethod<ReqT, RespT> {
StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver);
}
private static final class ServerCallStreamObserverImpl<ReqT, RespT>
extends ServerCallStreamObserver<RespT> {
final ServerCall<ReqT, RespT> call;
volatile boolean cancelled;
private boolean frozen;
private boolean autoFlowControlEnabled = true;
private boolean sentHeaders;
private Runnable onReadyHandler;
private Runnable onCancelHandler;
// Non private to avoid synthetic class
ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call) {
this.call = call;
}
private void freeze() {
this.frozen = true;
}
@Override
public void setMessageCompression(boolean enable) {
call.setMessageCompression(enable);
}
@Override
public void setCompression(String compression) {
call.setCompression(compression);
}
@Override
public void onNext(RespT response) {
if (cancelled) {
throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
}
if (!sentHeaders) {
call.sendHeaders(new Metadata());
sentHeaders = true;
}
call.sendMessage(response);
}
@Override
public void onError(Throwable t) {
Metadata metadata = Status.trailersFromThrowable(t);
if (metadata == null) {
metadata = new Metadata();
}
call.close(Status.fromThrowable(t), metadata);
}
@Override
public void onCompleted() {
if (cancelled) {
throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
} else {
call.close(Status.OK, new Metadata());
}
}
@Override
public boolean isReady() {
return call.isReady();
}
@Override
public void setOnReadyHandler(Runnable r) {
checkState(!frozen, "Cannot alter onReadyHandler after initialization");
this.onReadyHandler = r;
}
@Override
public boolean isCancelled() {
return call.isCancelled();
}
@Override
public void setOnCancelHandler(Runnable onCancelHandler) {
checkState(!frozen, "Cannot alter onCancelHandler after initialization");
this.onCancelHandler = onCancelHandler;
}
@Override
public void disableAutoInboundFlowControl() {
checkState(!frozen, "Cannot disable auto flow control after initialization");
autoFlowControlEnabled = false;
}
@Override
public void request(int count) {
call.request(count);
}
}
/**
* Sets unimplemented status for method on given response stream for unary call.
*
* @param methodDescriptor of method for which error will be thrown.
* @param responseObserver on which error will be set.
*/
public static void asyncUnimplementedUnaryCall(
MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver) {
checkNotNull(methodDescriptor, "methodDescriptor");
checkNotNull(responseObserver, "responseObserver");
responseObserver.onError(Status.UNIMPLEMENTED
.withDescription(String.format("Method %s is unimplemented",
methodDescriptor.getFullMethodName()))
.asRuntimeException());
}
/**
* Sets unimplemented status for streaming call.
*
* @param methodDescriptor of method for which error will be thrown.
* @param responseObserver on which error will be set.
*/
public static <T> StreamObserver<T> asyncUnimplementedStreamingCall(
MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver) {
// NB: For streaming call we want to do the same as for unary call. Fail-fast by setting error
// on responseObserver and then return no-op observer.
asyncUnimplementedUnaryCall(methodDescriptor, responseObserver);
return new NoopStreamObserver<T>();
}
/**
* No-op implementation of StreamObserver. Used in abstract stubs for default implementations of
* methods which throws UNIMPLEMENTED error and tests.
*/
static class NoopStreamObserver<V> implements StreamObserver<V> {
@Override
public void onNext(V value) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
}
}