// blob: 8d3337057c600e1f084a9503ce78fbbfecd7da5e [file] [log] [blame]
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.observability.interceptors;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener;
import io.grpc.Internal;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.internal.TimeProvider;
import io.grpc.observability.logging.Sink;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import java.net.SocketAddress;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * A logging interceptor for {@code LoggingServerProvider}.
 *
 * <p>For every intercepted call that {@link LogHelper} reports as loggable, this interceptor
 * records the request header, each request/response message, the response header, half-close,
 * cancel and trailer events through the {@link LogHelper}. A failure while logging an event is
 * itself logged at {@link Level#SEVERE} and never propagated, so the RPC is not aborted because
 * of a logging problem.
 */
@Internal
public final class InternalLoggingServerInterceptor implements ServerInterceptor {
  private static final Logger logger =
      Logger.getLogger(InternalLoggingServerInterceptor.class.getName());

  private final LogHelper helper;

  /** Creates {@link ServerInterceptor} instances. */
  public interface Factory {
    /** Returns a server interceptor. */
    ServerInterceptor create();
  }

  /** Default {@link Factory} whose interceptors all share one {@link LogHelper}. */
  public static class FactoryImpl implements Factory {
    private final Sink sink;
    private final LogHelper helper;

    /** Builds the {@link LogHelper} that writes to {@code sink} using {@code provider}. */
    static LogHelper createLogHelper(Sink sink, TimeProvider provider) {
      return new LogHelper(sink, provider);
    }

    /**
     * Creates a factory backed by the given sink and the system time provider.
     *
     * @param sink the sink handed to the {@link LogHelper}; also closed by {@link #close()}
     */
    public FactoryImpl(Sink sink) {
      this.sink = sink;
      this.helper = createLogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
    }

    /** Returns a new interceptor that logs through this factory's shared helper. */
    @Override
    public ServerInterceptor create() {
      return new InternalLoggingServerInterceptor(helper);
    }

    /**
     * Closes the sink instance.
     */
    public void close() {
      if (sink != null) {
        sink.close();
      }
    }
  }

  private InternalLoggingServerInterceptor(LogHelper helper) {
    this.helper = helper;
  }

  /**
   * Wraps the call and its listener so that each call event is logged before being forwarded.
   *
   * <p>If the helper reports that the method is not to be logged, the call is passed to
   * {@code next} untouched and no per-call logging state is created.
   *
   * @param call the call being intercepted
   * @param headers the request headers of the call
   * @param next the next handler in the interceptor chain
   * @return the listener returned by {@code next}, wrapped with logging when the method is logged
   */
  @Override
  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
      Metadata headers, ServerCallHandler<ReqT, RespT> next) {
    // TODO (dnvindhya): implement isMethodToBeLogged() to check for methods to be logged
    // according to config. Until then always return true.
    // Checked before anything else so that calls which are not logged do not pay for the
    // random UUID, the peer address lookup or the deadline conversion below.
    if (!helper.isMethodToBeLogged(call.getMethodDescriptor().getFullMethodName())) {
      return next.startCall(call, headers);
    }

    // Per-call state shared by every event logged for this RPC. The sequence id starts at 1
    // and is incremented for each logged event.
    final AtomicLong seq = new AtomicLong(1);
    final String rpcId = UUID.randomUUID().toString();
    final String authority = call.getAuthority();
    final String serviceName = call.getMethodDescriptor().getServiceName();
    final String methodName = call.getMethodDescriptor().getBareMethodName();
    final SocketAddress peerAddress = LogHelper.getPeerAddress(call.getAttributes());
    // The timeout is the time remaining on the current context's deadline, or null if the
    // context has no deadline.
    Deadline deadline = Context.current().getDeadline();
    final Duration timeout = deadline == null ? null
        : Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS));

    // Event: EventType.GRPC_CALL_REQUEST_HEADER
    try {
      helper.logRequestHeader(
          seq.getAndIncrement(),
          serviceName,
          methodName,
          authority,
          timeout,
          headers,
          EventLogger.LOGGER_SERVER,
          rpcId,
          peerAddress);
    } catch (Exception e) {
      // Catching generic exceptions instead of specific ones for all the events.
      // This way we can catch both expected and unexpected exceptions instead of re-throwing
      // exceptions to callers which will lead to RPC getting aborted.
      // Expected exceptions to be caught:
      // 1. IllegalArgumentException
      // 2. NullPointerException
      logger.log(Level.SEVERE, "Unable to log request header", e);
    }

    // Outbound side: log response header, response messages and trailer, then forward.
    ServerCall<ReqT, RespT> wrapperCall =
        new SimpleForwardingServerCall<ReqT, RespT>(call) {
          @Override
          public void sendHeaders(Metadata headers) {
            // Event: EventType.GRPC_CALL_RESPONSE_HEADER
            try {
              helper.logResponseHeader(
                  seq.getAndIncrement(),
                  serviceName,
                  methodName,
                  headers,
                  EventLogger.LOGGER_SERVER,
                  rpcId,
                  null);
            } catch (Exception e) {
              logger.log(Level.SEVERE, "Unable to log response header", e);
            }
            super.sendHeaders(headers);
          }

          @Override
          public void sendMessage(RespT message) {
            // Event: EventType.GRPC_CALL_RESPONSE_MESSAGE
            try {
              helper.logRpcMessage(
                  seq.getAndIncrement(),
                  serviceName,
                  methodName,
                  EventType.GRPC_CALL_RESPONSE_MESSAGE,
                  message,
                  EventLogger.LOGGER_SERVER,
                  rpcId);
            } catch (Exception e) {
              logger.log(Level.SEVERE, "Unable to log response message", e);
            }
            super.sendMessage(message);
          }

          @Override
          public void close(Status status, Metadata trailers) {
            // Event: EventType.GRPC_CALL_TRAILER
            try {
              helper.logTrailer(
                  seq.getAndIncrement(),
                  serviceName,
                  methodName,
                  status,
                  trailers,
                  EventLogger.LOGGER_SERVER,
                  rpcId,
                  null);
            } catch (Exception e) {
              logger.log(Level.SEVERE, "Unable to log trailer", e);
            }
            super.close(status, trailers);
          }
        };

    // Inbound side: log request messages, half-close and cancel, then forward.
    ServerCall.Listener<ReqT> listener = next.startCall(wrapperCall, headers);
    return new SimpleForwardingServerCallListener<ReqT>(listener) {
      @Override
      public void onMessage(ReqT message) {
        // Event: EventType.GRPC_CALL_REQUEST_MESSAGE
        try {
          helper.logRpcMessage(
              seq.getAndIncrement(),
              serviceName,
              methodName,
              EventType.GRPC_CALL_REQUEST_MESSAGE,
              message,
              EventLogger.LOGGER_SERVER,
              rpcId);
        } catch (Exception e) {
          logger.log(Level.SEVERE, "Unable to log request message", e);
        }
        super.onMessage(message);
      }

      @Override
      public void onHalfClose() {
        // Event: EventType.GRPC_CALL_HALF_CLOSE
        try {
          helper.logHalfClose(
              seq.getAndIncrement(),
              serviceName,
              methodName,
              EventLogger.LOGGER_SERVER,
              rpcId);
        } catch (Exception e) {
          logger.log(Level.SEVERE, "Unable to log half close", e);
        }
        super.onHalfClose();
      }

      @Override
      public void onCancel() {
        // Event: EventType.GRPC_CALL_CANCEL
        try {
          helper.logCancel(
              seq.getAndIncrement(),
              serviceName,
              methodName,
              EventLogger.LOGGER_SERVER,
              rpcId);
        } catch (Exception e) {
          logger.log(Level.SEVERE, "Unable to log cancel", e);
        }
        super.onCancel();
      }
    };
  }
}