blob: 4afa08bc7163e36ebb8d0fa61830ee459da98728 [file] [log] [blame]
/*
* Copyright 2016 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.census;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
import static io.grpc.census.internal.ObservabilityCensusConstants.CLIENT_TRACE_SPAN_CONTEXT_KEY;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientStreamTracer;
import io.grpc.Context;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerStreamTracer;
import io.grpc.StreamTracer;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.BlankSpan;
import io.opencensus.trace.EndSpanOptions;
import io.opencensus.trace.MessageEvent;
import io.opencensus.trace.Span;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.Status;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.propagation.BinaryFormat;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* Provides factories for {@link StreamTracer} that records traces to Census.
*
* <p>On the client-side, a factory is created for each call, because ClientCall starts earlier than
* the ClientStream, and in some cases may even not create a ClientStream at all. Therefore, it's
* the factory that reports the summary to Census.
*
* <p>On the server-side, there is only one ServerStream per each ServerCall, and ServerStream
* starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call and
* it's the tracer that reports the summary to Census.
*/
final class CensusTracingModule {
private static final Logger logger = Logger.getLogger(CensusTracingModule.class.getName());
@Nullable
private static final AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> callEndedUpdater;
@Nullable private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
/*
* When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their JDK
* reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
* (potentially racy) direct updates of the volatile variables.
*/
static {
AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> tmpCallEndedUpdater;
AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
try {
tmpCallEndedUpdater =
AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded");
tmpStreamClosedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
} catch (Throwable t) {
logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
tmpCallEndedUpdater = null;
tmpStreamClosedUpdater = null;
}
callEndedUpdater = tmpCallEndedUpdater;
streamClosedUpdater = tmpStreamClosedUpdater;
}
private final Tracer censusTracer;
@VisibleForTesting
final Metadata.Key<SpanContext> tracingHeader;
private final TracingClientInterceptor clientInterceptor = new TracingClientInterceptor();
private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory();
CensusTracingModule(
Tracer censusTracer, final BinaryFormat censusPropagationBinaryFormat) {
this.censusTracer = checkNotNull(censusTracer, "censusTracer");
checkNotNull(censusPropagationBinaryFormat, "censusPropagationBinaryFormat");
this.tracingHeader =
Metadata.Key.of("grpc-trace-bin", new Metadata.BinaryMarshaller<SpanContext>() {
@Override
public byte[] toBytes(SpanContext context) {
return censusPropagationBinaryFormat.toByteArray(context);
}
@Override
public SpanContext parseBytes(byte[] serialized) {
try {
return censusPropagationBinaryFormat.fromByteArray(serialized);
} catch (Exception e) {
logger.log(Level.FINE, "Failed to parse tracing header", e);
return SpanContext.INVALID;
}
}
});
}
/**
* Creates a {@link CallAttemptsTracerFactory} for a new call.
*/
@VisibleForTesting
CallAttemptsTracerFactory newClientCallTracer(
@Nullable Span clientSpan, MethodDescriptor<?, ?> method) {
return new CallAttemptsTracerFactory(clientSpan, method);
}
/**
* Returns the server tracer factory.
*/
ServerStreamTracer.Factory getServerTracerFactory() {
return serverTracerFactory;
}
/**
* Returns the client interceptor that facilitates Census-based stats reporting.
*/
ClientInterceptor getClientInterceptor() {
return clientInterceptor;
}
@VisibleForTesting
static Status convertStatus(io.grpc.Status grpcStatus) {
Status status;
switch (grpcStatus.getCode()) {
case OK:
status = Status.OK;
break;
case CANCELLED:
status = Status.CANCELLED;
break;
case UNKNOWN:
status = Status.UNKNOWN;
break;
case INVALID_ARGUMENT:
status = Status.INVALID_ARGUMENT;
break;
case DEADLINE_EXCEEDED:
status = Status.DEADLINE_EXCEEDED;
break;
case NOT_FOUND:
status = Status.NOT_FOUND;
break;
case ALREADY_EXISTS:
status = Status.ALREADY_EXISTS;
break;
case PERMISSION_DENIED:
status = Status.PERMISSION_DENIED;
break;
case RESOURCE_EXHAUSTED:
status = Status.RESOURCE_EXHAUSTED;
break;
case FAILED_PRECONDITION:
status = Status.FAILED_PRECONDITION;
break;
case ABORTED:
status = Status.ABORTED;
break;
case OUT_OF_RANGE:
status = Status.OUT_OF_RANGE;
break;
case UNIMPLEMENTED:
status = Status.UNIMPLEMENTED;
break;
case INTERNAL:
status = Status.INTERNAL;
break;
case UNAVAILABLE:
status = Status.UNAVAILABLE;
break;
case DATA_LOSS:
status = Status.DATA_LOSS;
break;
case UNAUTHENTICATED:
status = Status.UNAUTHENTICATED;
break;
default:
throw new AssertionError("Unhandled status code " + grpcStatus.getCode());
}
if (grpcStatus.getDescription() != null) {
status = status.withDescription(grpcStatus.getDescription());
}
return status;
}
private static EndSpanOptions createEndSpanOptions(
io.grpc.Status status, boolean sampledToLocalTracing) {
return EndSpanOptions.builder()
.setStatus(convertStatus(status))
.setSampleToLocalSpanStore(sampledToLocalTracing)
.build();
}
private void recordMessageEvent(
Span span, MessageEvent.Type type,
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
MessageEvent.Builder eventBuilder = MessageEvent.builder(type, seqNo);
if (optionalUncompressedSize != -1) {
eventBuilder.setUncompressedMessageSize(optionalUncompressedSize);
}
if (optionalWireSize != -1) {
eventBuilder.setCompressedMessageSize(optionalWireSize);
}
span.addMessageEvent(eventBuilder.build());
}
private void recordAnnotation(
Span span, MessageEvent.Type type, int seqNo, boolean isCompressed, long size) {
String messageType = isCompressed ? "compressed" : "uncompressed";
Map<String, AttributeValue> attributes = new HashMap<>();
attributes.put("id", AttributeValue.longAttributeValue(seqNo));
attributes.put("type", AttributeValue.stringAttributeValue(messageType));
String messageDirection = type == MessageEvent.Type.SENT ? "↗ " : "↘ ";
String inlineDescription =
messageDirection + size + " bytes " + type.name().toLowerCase(Locale.US);
span.addAnnotation(inlineDescription, attributes);
}
@VisibleForTesting
final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
volatile int callEnded;
private final boolean isSampledToLocalTracing;
private final Span span;
private final String fullMethodName;
CallAttemptsTracerFactory(@Nullable Span clientSpan, MethodDescriptor<?, ?> method) {
checkNotNull(method, "method");
this.isSampledToLocalTracing = method.isSampledToLocalTracing();
this.fullMethodName = method.getFullMethodName();
this.span = clientSpan;
}
@Override
public ClientStreamTracer newClientStreamTracer(
ClientStreamTracer.StreamInfo info, Metadata headers) {
Span attemptSpan = censusTracer
.spanBuilderWithExplicitParent(
"Attempt." + fullMethodName.replace('/', '.'),
span)
.setRecordEvents(true)
.startSpan();
attemptSpan.putAttribute(
"previous-rpc-attempts", AttributeValue.longAttributeValue(info.getPreviousAttempts()));
attemptSpan.putAttribute(
"transparent-retry", AttributeValue.booleanAttributeValue(info.isTransparentRetry()));
if (info.getCallOptions().getOption(NAME_RESOLUTION_DELAYED)) {
span.addAnnotation("Delayed name resolution complete");
}
return new ClientTracer(attemptSpan, span, tracingHeader, isSampledToLocalTracing);
}
/**
* Record a finished call and mark the current time as the end time.
*
* <p>Can be called from any thread without synchronization. Calling it the second time or more
* is a no-op.
*/
void callEnded(io.grpc.Status status) {
if (callEndedUpdater != null) {
if (callEndedUpdater.getAndSet(this, 1) != 0) {
return;
}
} else {
if (callEnded != 0) {
return;
}
callEnded = 1;
}
span.end(createEndSpanOptions(status, isSampledToLocalTracing));
}
}
private final class ClientTracer extends ClientStreamTracer {
private final Span span;
private final Span parentSpan;
final Metadata.Key<SpanContext> tracingHeader;
final boolean isSampledToLocalTracing;
volatile int seqNo;
boolean isPendingStream;
ClientTracer(
Span span, Span parentSpan, Metadata.Key<SpanContext> tracingHeader,
boolean isSampledToLocalTracing) {
this.span = checkNotNull(span, "span");
this.parentSpan = checkNotNull(parentSpan, "parent span");
this.tracingHeader = tracingHeader;
this.isSampledToLocalTracing = isSampledToLocalTracing;
}
@Override
public void streamCreated(Attributes transportAtts, Metadata headers) {
if (span != BlankSpan.INSTANCE) {
headers.discardAll(tracingHeader);
headers.put(tracingHeader, span.getContext());
}
if (isPendingStream) {
span.addAnnotation("Delayed LB pick complete");
}
}
@Override
public void createPendingStream() {
isPendingStream = true;
}
@Override
public void outboundMessageSent(
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
recordMessageEvent(
span, MessageEvent.Type.SENT, seqNo, optionalWireSize, optionalUncompressedSize);
}
@Override
public void inboundMessageRead(
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
recordAnnotation(
span, MessageEvent.Type.RECEIVED, seqNo, true, optionalWireSize);
}
@Override
public void inboundMessage(int seqNo) {
this.seqNo = seqNo;
}
@Override
public void inboundUncompressedSize(long bytes) {
recordAnnotation(
parentSpan, MessageEvent.Type.RECEIVED, seqNo, false, bytes);
}
@Override
public void streamClosed(io.grpc.Status status) {
span.end(createEndSpanOptions(status, isSampledToLocalTracing));
}
}
private final class ServerTracer extends ServerStreamTracer {
private final Span span;
volatile boolean isSampledToLocalTracing;
volatile int streamClosed;
private int seqNo;
ServerTracer(String fullMethodName, @Nullable SpanContext remoteSpan) {
checkNotNull(fullMethodName, "fullMethodName");
this.span =
censusTracer
.spanBuilderWithRemoteParent(
generateTraceSpanName(true, fullMethodName),
remoteSpan)
.setRecordEvents(true)
.startSpan();
}
@Override
public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing();
}
/**
* Record a finished stream and mark the current time as the end time.
*
* <p>Can be called from any thread without synchronization. Calling it the second time or more
* is a no-op.
*/
@Override
public void streamClosed(io.grpc.Status status) {
if (streamClosedUpdater != null) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
}
} else {
if (streamClosed != 0) {
return;
}
streamClosed = 1;
}
span.end(createEndSpanOptions(status, isSampledToLocalTracing));
}
/*
TODO(dnvindhya): Replace deprecated ContextUtils usage with ContextHandleUtils to interact
with io.grpc.Context as described in {@link io.opencensus.trace.unsafeContextUtils} to remove
SuppressWarnings annotation.
*/
@SuppressWarnings("deprecation")
@Override
public Context filterContext(Context context) {
// Access directly the unsafe trace API to create the new Context. This is a safe usage
// because gRPC always creates a new Context for each of the server calls and does not
// inherit from the parent Context.
return io.opencensus.trace.unsafe.ContextUtils.withValue(context, span);
}
@Override
public void outboundMessageSent(
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
recordMessageEvent(
span, MessageEvent.Type.SENT, seqNo, optionalWireSize, optionalUncompressedSize);
}
@Override
public void inboundMessageRead(
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
recordAnnotation(
span, MessageEvent.Type.RECEIVED, seqNo, true, optionalWireSize);
}
@Override
public void inboundMessage(int seqNo) {
this.seqNo = seqNo;
}
@Override
public void inboundUncompressedSize(long bytes) {
recordAnnotation(
span, MessageEvent.Type.RECEIVED, seqNo, false, bytes);
}
}
@VisibleForTesting
final class ServerTracerFactory extends ServerStreamTracer.Factory {
@SuppressWarnings("ReferenceEquality")
@Override
public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
SpanContext remoteSpan = headers.get(tracingHeader);
if (remoteSpan == SpanContext.INVALID) {
remoteSpan = null;
}
return new ServerTracer(fullMethodName, remoteSpan);
}
}
@VisibleForTesting
final class TracingClientInterceptor implements ClientInterceptor {
@SuppressWarnings("deprecation")
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
// New RPCs on client-side inherit the tracing context from the current Context.
// Safe usage of the unsafe trace API because CONTEXT_SPAN_KEY.get() returns the same value
// as Tracer.getCurrentSpan() except when no value available when the return value is null
// for the direct access and BlankSpan when Tracer API is used.
Span parentSpan = io.opencensus.trace.unsafe.ContextUtils.getValue(Context.current());
Span clientSpan = censusTracer
.spanBuilderWithExplicitParent(
generateTraceSpanName(false, method.getFullMethodName()),
parentSpan)
.setRecordEvents(true)
.startSpan();
final CallAttemptsTracerFactory tracerFactory = newClientCallTracer(clientSpan, method);
ClientCall<ReqT, RespT> call =
next.newCall(
method,
callOptions.withStreamTracerFactory(tracerFactory)
.withOption(CLIENT_TRACE_SPAN_CONTEXT_KEY, clientSpan.getContext()));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
delegate().start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onClose(io.grpc.Status status, Metadata trailers) {
tracerFactory.callEnded(status);
super.onClose(status, trailers);
}
},
headers);
}
};
}
}
/**
* Convert a full method name to a tracing span name.
*
* @param isServer {@code false} if the span is on the client-side, {@code true} if on the
* server-side
* @param fullMethodName the method name as returned by
* {@link MethodDescriptor#getFullMethodName}.
*/
@VisibleForTesting
static String generateTraceSpanName(boolean isServer, String fullMethodName) {
String prefix = isServer ? "Recv" : "Sent";
return prefix + "." + fullMethodName.replace('/', '.');
}
}