blob: d454c84c9846e7224a7a442045a67f832e1b8035 [file] [log] [blame]
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.observability.interceptors;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.Charsets;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Timestamps;
import io.grpc.Attributes;
import io.grpc.Deadline;
import io.grpc.Grpc;
import io.grpc.InternalMetadata;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.TimeProvider;
import io.grpc.observability.logging.Sink;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.Address;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.observabilitylog.v1.GrpcLogRecord.LogLevel;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* Helper class for GCP observability logging.
*/
class LogHelper {
private static final Logger logger = Logger.getLogger(LogHelper.class.getName());
// TODO(dnvindhya): Define it in one places(TBD) to make it easily accessible from everywhere
static final Metadata.Key<byte[]> STATUS_DETAILS_KEY =
Metadata.Key.of(
"grpc-status-details-bin",
Metadata.BINARY_BYTE_MARSHALLER);
private final Sink sink;
private final TimeProvider timeProvider;
LogHelper(Sink sink, TimeProvider timeProvider) {
this.sink = sink;
this.timeProvider = timeProvider;
}
/**
* Logs the request header. Binary logging equivalent of logClientHeader.
*/
void logRequestHeader(
long seqId,
String serviceName,
String methodName,
String authority,
@Nullable Duration timeout,
Metadata metadata,
GrpcLogRecord.EventLogger eventLogger,
String rpcId,
// null on client side
@Nullable SocketAddress peerAddress) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(rpcId, "rpcId");
checkArgument(
peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_SERVER,
"peerAddress can only be specified by server");
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair = createMetadataProto(metadata);
GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setAuthority(authority)
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER)
.setEventLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setMetadata(pair.payload)
.setPayloadSize(pair.size)
.setRpcId(rpcId);
if (timeout != null) {
logEntryBuilder.setTimeout(timeout);
}
if (peerAddress != null) {
logEntryBuilder.setPeerAddress(socketAddressToProto(peerAddress));
}
sink.write(logEntryBuilder.build());
}
/**
* Logs the reponse header. Binary logging equivalent of logServerHeader.
*/
void logResponseHeader(
long seqId,
String serviceName,
String methodName,
Metadata metadata,
GrpcLogRecord.EventLogger eventLogger,
String rpcId,
@Nullable SocketAddress peerAddress) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(rpcId, "rpcId");
// Logging peer address only on the first incoming event. On server side, peer address will
// of logging request header
checkArgument(
peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_CLIENT,
"peerAddress can only be specified for client");
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair = createMetadataProto(metadata);
GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_RESPONSE_HEADER)
.setEventLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setMetadata(pair.payload)
.setPayloadSize(pair.size)
.setRpcId(rpcId);
if (peerAddress != null) {
logEntryBuilder.setPeerAddress(socketAddressToProto(peerAddress));
}
sink.write(logEntryBuilder.build());
}
/**
* Logs the server trailer.
*/
void logTrailer(
long seqId,
String serviceName,
String methodName,
Status status,
Metadata metadata,
GrpcLogRecord.EventLogger eventLogger,
String rpcId,
@Nullable SocketAddress peerAddress) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(status, "status");
checkNotNull(rpcId, "rpcId");
checkArgument(
peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_CLIENT,
"peerAddress can only be specified for client");
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair = createMetadataProto(metadata);
GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_TRAILER)
.setEventLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setMetadata(pair.payload)
.setPayloadSize(pair.size)
.setStatusCode(status.getCode().value())
.setRpcId(rpcId);
String statusDescription = status.getDescription();
if (statusDescription != null) {
logEntryBuilder.setStatusMessage(statusDescription);
}
byte[] statusDetailBytes = metadata.get(STATUS_DETAILS_KEY);
if (statusDetailBytes != null) {
logEntryBuilder.setStatusDetails(ByteString.copyFrom(statusDetailBytes));
}
if (peerAddress != null) {
logEntryBuilder.setPeerAddress(socketAddressToProto(peerAddress));
}
sink.write(logEntryBuilder.build());
}
/**
* Logs the RPC message.
*/
<T> void logRpcMessage(
long seqId,
String serviceName,
String methodName,
EventType eventType,
T message,
EventLogger eventLogger,
String rpcId) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(rpcId, "rpcId");
checkArgument(
eventType == EventType.GRPC_CALL_REQUEST_MESSAGE
|| eventType == EventType.GRPC_CALL_RESPONSE_MESSAGE,
"event type must correspond to client message or server message");
checkNotNull(message, "message");
// TODO(dnvindhya): Implement conversion of generics to ByteString
// Following is a temporary workaround to log if message is of following types :
// 1. com.google.protobuf.Message
// 2. byte[]
byte[] messageBytesArray = null;
if (message instanceof com.google.protobuf.Message) {
messageBytesArray = ((com.google.protobuf.Message)message).toByteArray();
} else if (message instanceof byte[]) {
messageBytesArray = (byte[]) message;
} else {
logger.log(Level.WARNING, "message is of UNKNOWN type, message and payload_size fields"
+ "of GrpcLogRecord proto will not be logged");
}
PayloadBuilder<ByteString> pair = null;
if (messageBytesArray != null) {
pair = createMesageProto(messageBytesArray);
}
GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(eventType)
.setEventLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setRpcId(rpcId);
if (pair != null && pair.size != 0) {
logEntryBuilder.setPayloadSize(pair.size);
}
if (pair != null && pair.payload != null) {
logEntryBuilder.setMessage(pair.payload);
}
sink.write(logEntryBuilder.build());
}
/**
* Logs half close.
*/
void logHalfClose(
long seqId,
String serviceName,
String methodName,
GrpcLogRecord.EventLogger eventLogger,
String rpcId) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(rpcId, "rpcId");
GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_HALF_CLOSE)
.setEventLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setRpcId(rpcId);
sink.write(logEntryBuilder.build());
}
/**
* Logs cancellation.
*/
void logCancel(
long seqId,
String serviceName,
String methodName,
GrpcLogRecord.EventLogger eventLogger,
String rpcId) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(rpcId, "rpcId");
GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_CANCEL)
.setEventLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setRpcId(rpcId);
sink.write(logEntryBuilder.build());
}
GrpcLogRecord.Builder createTimestamp() {
long nanos = timeProvider.currentTimeNanos();
return GrpcLogRecord.newBuilder().setTimestamp(Timestamps.fromNanos(nanos));
}
static final class PayloadBuilder<T> {
T payload;
int size;
private PayloadBuilder(T payload, int size) {
this.payload = payload;
this.size = size;
}
}
// TODO(dnvindhya): Create a unit test for the metadata conversion
static PayloadBuilder<GrpcLogRecord.Metadata.Builder> createMetadataProto(Metadata metadata) {
checkNotNull(metadata, "metadata");
GrpcLogRecord.Metadata.Builder metadataBuilder = GrpcLogRecord.Metadata.newBuilder();
// This code is tightly coupled with io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata's
// implementation
byte[][] serialized = InternalMetadata.serialize(metadata);
int totalMetadataBytes = 0;
if (serialized != null) {
int singleMetadataEntryBytes = 0;
// Calculate bytes for each GrpcLogRecord.Metadata.MetadataEntry
for (int i = 0; i < serialized.length; i += 2) {
String key = new String(serialized[i], Charsets.UTF_8);
byte[] value = serialized[i + 1];
singleMetadataEntryBytes = totalMetadataBytes + key.length() + value.length;
metadataBuilder.addEntryBuilder()
.setKey(key)
.setValue(ByteString.copyFrom(value));
totalMetadataBytes = singleMetadataEntryBytes;
}
}
return new PayloadBuilder<>(metadataBuilder, totalMetadataBytes);
}
static PayloadBuilder<ByteString> createMesageProto(byte[] message) {
int messageLength = message.length;
ByteString messageData =
ByteString.copyFrom(message, 0, messageLength);
return new PayloadBuilder<ByteString>(messageData, messageLength);
}
static Address socketAddressToProto(SocketAddress address) {
checkNotNull(address, "address");
Address.Builder builder = Address.newBuilder();
if (address instanceof InetSocketAddress) {
InetAddress inetAddress = ((InetSocketAddress) address).getAddress();
if (inetAddress instanceof Inet4Address) {
builder.setType(Address.Type.TYPE_IPV4)
.setAddress(InetAddressUtil.toAddrString(inetAddress));
} else if (inetAddress instanceof Inet6Address) {
builder.setType(Address.Type.TYPE_IPV6)
.setAddress(InetAddressUtil.toAddrString(inetAddress));
} else {
logger.log(Level.SEVERE, "unknown type of InetSocketAddress: {}", address);
builder.setAddress(address.toString());
}
builder.setIpPort(((InetSocketAddress) address).getPort());
} else if (address.getClass().getName().equals("io.netty.channel.unix.DomainSocketAddress")) {
// To avoid a compile time dependency on grpc-netty, we check against the
// runtime class name.
builder.setType(Address.Type.TYPE_UNIX)
.setAddress(address.toString());
} else {
builder.setType(Address.Type.TYPE_UNKNOWN).setAddress(address.toString());
}
return builder.build();
}
/**
* Retrieves socket address.
*/
static SocketAddress getPeerAddress(Attributes streamAttributes) {
return streamAttributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
}
/**
* Checks deadline for timeout.
*/
static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
if (deadline0 == null) {
return deadline1;
}
if (deadline1 == null) {
return deadline0;
}
return deadline0.minimum(deadline1);
}
// TODO (dnvindhya) : Implement service and method name filtering
// Add unit tests for the method as part of filtering implementation
boolean isMethodToBeLogged(String fullMethodName) {
return true;
}
}