blob: aac13a5f87f8764af43dc40344c49491d59aa5d3 [file] [log] [blame]
/*
 * Copyright 2014 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.stub;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.Status;

/**
 * Utility functions for adapting {@link ServerCallHandler}s to application service implementation,
 * meant to be used by the generated code.
 */
public final class ServerCalls {
// Status description text; package-private (no access modifier).
// NOTE(review): presumably reported when a single-request call receives a second message - the
// statement that uses it is not visible in this view; confirm against the unary listener.
static final String TOO_MANY_REQUESTS = "Too many requests";
// Status description text; package-private (no access modifier).
// NOTE(review): presumably reported when the client half-closes before sending any request -
// confirm against the unary listener.
static final String MISSING_REQUEST = "Half-closed without a request";

// Every member of this class is static; the private constructor prevents instantiation.
private ServerCalls() {
}

* Creates a {@link ServerCallHandler} for a unary call method of the service.
* @param method an adaptor to the actual method on the service implementation.
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryCall(
UnaryMethod<ReqT, RespT> method) {
return asyncUnaryRequestCall(method);
* Creates a {@link ServerCallHandler} for a server streaming method of the service.
* @param method an adaptor to the actual method on the service implementation.
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncServerStreamingCall(
ServerStreamingMethod<ReqT, RespT> method) {
return asyncUnaryRequestCall(method);
* Creates a {@link ServerCallHandler} for a client streaming method of the service.
* @param method an adaptor to the actual method on the service implementation.
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncClientStreamingCall(
ClientStreamingMethod<ReqT, RespT> method) {
return asyncStreamingRequestCall(method);
* Creates a {@link ServerCallHandler} for a bidi streaming method of the service.
* @param method an adaptor to the actual method on the service implementation.
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncBidiStreamingCall(
BidiStreamingMethod<ReqT, RespT> method) {
return asyncStreamingRequestCall(method);
* Adaptor to a unary call method.
public interface UnaryMethod<ReqT, RespT> extends UnaryRequestMethod<ReqT, RespT> {}
* Adaptor to a server streaming method.
public interface ServerStreamingMethod<ReqT, RespT> extends UnaryRequestMethod<ReqT, RespT> {}
* Adaptor to a client streaming method.
public interface ClientStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> {}
* Adaptor to a bidirectional streaming method.
public interface BidiStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> {}
private static final class UnaryServerCallHandler<ReqT, RespT>
implements ServerCallHandler<ReqT, RespT> {
private final UnaryRequestMethod<ReqT, RespT> method;
// Non private to avoid synthetic class
UnaryServerCallHandler(UnaryRequestMethod<ReqT, RespT> method) {
this.method = method;
public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
"asyncUnaryRequestCall is only for clientSendsOneMessage methods");
ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
new ServerCallStreamObserverImpl<ReqT, RespT>(call);
// We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
// sends more than 1 requests, ServerCall will catch it. Note that disabling auto
// inbound flow control has no effect on unary calls.
return new UnaryServerCallListener(responseObserver, call);
private final class UnaryServerCallListener extends ServerCall.Listener<ReqT> {
private final ServerCall<ReqT, RespT> call;
private final ServerCallStreamObserverImpl<ReqT, RespT> responseObserver;
private boolean canInvoke = true;
private ReqT request;
// Non private to avoid synthetic class
ServerCallStreamObserverImpl<ReqT, RespT> responseObserver,
ServerCall<ReqT, RespT> call) { = call;
this.responseObserver = responseObserver;
public void onMessage(ReqT request) {
if (this.request != null) {
// Safe to close the call, because the application has not yet been invoked
new Metadata());
canInvoke = false;
// We delay calling method.invoke() until onHalfClose() to make sure the client
// half-closes.
this.request = request;
public void onHalfClose() {
if (!canInvoke) {
if (request == null) {
// Safe to close the call, because the application has not yet been invoked
new Metadata());
method.invoke(request, responseObserver);
if (call.isReady()) {
// Since we are calling invoke in halfClose we have missed the onReady
// event from the transport so recover it here.
public void onCancel() {
responseObserver.cancelled = true;
if (responseObserver.onCancelHandler != null) {;
public void onReady() {
if (responseObserver.onReadyHandler != null) {;
* Creates a {@link ServerCallHandler} for a unary request call method of the service.
* @param method an adaptor to the actual method on the service implementation.
private static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryRequestCall(
UnaryRequestMethod<ReqT, RespT> method) {
return new UnaryServerCallHandler<ReqT, RespT>(method);
private static final class StreamingServerCallHandler<ReqT, RespT>
implements ServerCallHandler<ReqT, RespT> {
private final StreamingRequestMethod<ReqT, RespT> method;
// Non private to avoid synthetic class
StreamingServerCallHandler(StreamingRequestMethod<ReqT, RespT> method) {
this.method = method;
public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
new ServerCallStreamObserverImpl<ReqT, RespT>(call);
StreamObserver<ReqT> requestObserver = method.invoke(responseObserver);
if (responseObserver.autoFlowControlEnabled) {
return new StreamingServerCallListener(requestObserver, responseObserver, call);
private final class StreamingServerCallListener extends ServerCall.Listener<ReqT> {
private final StreamObserver<ReqT> requestObserver;
private final ServerCallStreamObserverImpl<ReqT, RespT> responseObserver;
private final ServerCall<ReqT, RespT> call;
private boolean halfClosed = false;
// Non private to avoid synthetic class
StreamObserver<ReqT> requestObserver,
ServerCallStreamObserverImpl<ReqT, RespT> responseObserver,
ServerCall<ReqT, RespT> call) {
this.requestObserver = requestObserver;
this.responseObserver = responseObserver; = call;
public void onMessage(ReqT request) {
// Request delivery of the next inbound message.
if (responseObserver.autoFlowControlEnabled) {
public void onHalfClose() {
halfClosed = true;
public void onCancel() {
responseObserver.cancelled = true;
if (responseObserver.onCancelHandler != null) {;
if (!halfClosed) {
.withDescription("cancelled before receiving half close")
public void onReady() {
if (responseObserver.onReadyHandler != null) {;
* Creates a {@link ServerCallHandler} for a streaming request call method of the service.
* @param method an adaptor to the actual method on the service implementation.
private static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncStreamingRequestCall(
StreamingRequestMethod<ReqT, RespT> method) {
return new StreamingServerCallHandler<ReqT, RespT>(method);
private interface UnaryRequestMethod<ReqT, RespT> {
void invoke(ReqT request, StreamObserver<RespT> responseObserver);
private interface StreamingRequestMethod<ReqT, RespT> {
StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver);
private static final class ServerCallStreamObserverImpl<ReqT, RespT>
extends ServerCallStreamObserver<RespT> {
final ServerCall<ReqT, RespT> call;
volatile boolean cancelled;
private boolean frozen;
private boolean autoFlowControlEnabled = true;
private boolean sentHeaders;
private Runnable onReadyHandler;
private Runnable onCancelHandler;
// Non private to avoid synthetic class
ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call) { = call;
private void freeze() {
this.frozen = true;
public void setMessageCompression(boolean enable) {
public void setCompression(String compression) {
public void onNext(RespT response) {
if (cancelled) {
throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
if (!sentHeaders) {
call.sendHeaders(new Metadata());
sentHeaders = true;
public void onError(Throwable t) {
Metadata metadata = Status.trailersFromThrowable(t);
if (metadata == null) {
metadata = new Metadata();
call.close(Status.fromThrowable(t), metadata);
public void onCompleted() {
if (cancelled) {
throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
} else {
call.close(Status.OK, new Metadata());
public boolean isReady() {
return call.isReady();
public void setOnReadyHandler(Runnable r) {
checkState(!frozen, "Cannot alter onReadyHandler after initialization");
this.onReadyHandler = r;
public boolean isCancelled() {
return call.isCancelled();
public void setOnCancelHandler(Runnable onCancelHandler) {
checkState(!frozen, "Cannot alter onCancelHandler after initialization");
this.onCancelHandler = onCancelHandler;
public void disableAutoInboundFlowControl() {
checkState(!frozen, "Cannot disable auto flow control after initialization");
autoFlowControlEnabled = false;
public void request(int count) {
* Sets unimplemented status for method on given response stream for unary call.
* @param methodDescriptor of method for which error will be thrown.
* @param responseObserver on which error will be set.
public static void asyncUnimplementedUnaryCall(
MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver) {
checkNotNull(methodDescriptor, "methodDescriptor");
checkNotNull(responseObserver, "responseObserver");
.withDescription(String.format("Method %s is unimplemented",
* Sets unimplemented status for streaming call.
* @param methodDescriptor of method for which error will be thrown.
* @param responseObserver on which error will be set.
public static <T> StreamObserver<T> asyncUnimplementedStreamingCall(
MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver) {
// NB: For streaming call we want to do the same as for unary call. Fail-fast by setting error
// on responseObserver and then return no-op observer.
asyncUnimplementedUnaryCall(methodDescriptor, responseObserver);
return new NoopStreamObserver<T>();
* No-op implementation of StreamObserver. Used in abstract stubs for default implementations of
* methods which throws UNIMPLEMENTED error and tests.
static class NoopStreamObserver<V> implements StreamObserver<V> {
public void onNext(V value) {
public void onError(Throwable t) {
public void onCompleted() {