blob: 7998021d80ad556298a343695c3ee7433c0249b6 [file] [log] [blame]
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.observability.interceptors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Durations;
import io.grpc.Attributes;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.Status;
import io.grpc.internal.TimeProvider;
import io.grpc.observability.interceptors.LogHelper.PayloadBuilder;
import io.grpc.observability.logging.GcpLogSink;
import io.grpc.observability.logging.Sink;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.Address;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.observabilitylog.v1.GrpcLogRecord.LogLevel;
import io.grpc.observabilitylog.v1.GrpcLogRecord.MetadataEntry;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Tests for {@link LogHelper}.
*/
@RunWith(JUnit4.class)
public class LogHelperTest {
private static final Charset US_ASCII = Charset.forName("US-ASCII");
public static final Marshaller<byte[]> BYTEARRAY_MARSHALLER = new ByteArrayMarshaller();
private static final String DATA_A = "aaaaaaaaa";
private static final String DATA_B = "bbbbbbbbb";
private static final String DATA_C = "ccccccccc";
private static final Metadata.Key<String> KEY_A =
Metadata.Key.of("a", Metadata.ASCII_STRING_MARSHALLER);
private static final Metadata.Key<String> KEY_B =
Metadata.Key.of("b", Metadata.ASCII_STRING_MARSHALLER);
private static final Metadata.Key<String> KEY_C =
Metadata.Key.of("c", Metadata.ASCII_STRING_MARSHALLER);
private static final MetadataEntry ENTRY_A =
MetadataEntry
.newBuilder()
.setKey(KEY_A.name())
.setValue(ByteString.copyFrom(DATA_A.getBytes(US_ASCII)))
.build();
private static final MetadataEntry ENTRY_B =
MetadataEntry
.newBuilder()
.setKey(KEY_B.name())
.setValue(ByteString.copyFrom(DATA_B.getBytes(US_ASCII)))
.build();
private static final MetadataEntry ENTRY_C =
MetadataEntry
.newBuilder()
.setKey(KEY_C.name())
.setValue(ByteString.copyFrom(DATA_C.getBytes(US_ASCII)))
.build();
private final Metadata nonEmptyMetadata = new Metadata();
private final int nonEmptyMetadataSize = 30;
private final Sink sink = mock(GcpLogSink.class);
private final Timestamp timestamp
= Timestamp.newBuilder().setSeconds(9876).setNanos(54321).build();
private final TimeProvider timeProvider = new TimeProvider() {
@Override
public long currentTimeNanos() {
return TimeUnit.SECONDS.toNanos(9876) + 54321;
}
};
private final LogHelper logHelper =
new LogHelper(
sink,
timeProvider);
@Before
public void setUp() throws Exception {
nonEmptyMetadata.put(KEY_A, DATA_A);
nonEmptyMetadata.put(KEY_B, DATA_B);
nonEmptyMetadata.put(KEY_C, DATA_C);
}
@Test
public void socketToProto_ipv4() throws Exception {
InetAddress address = InetAddress.getByName("127.0.0.1");
int port = 12345;
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
assertEquals(
Address
.newBuilder()
.setType(Address.Type.TYPE_IPV4)
.setAddress("127.0.0.1")
.setIpPort(12345)
.build(),
LogHelper.socketAddressToProto(socketAddress));
}
@Test
public void socketToProto_ipv6() throws Exception {
// this is a ipv6 link local address
InetAddress address = InetAddress.getByName("2001:db8:0:0:0:0:2:1");
int port = 12345;
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
assertEquals(
Address
.newBuilder()
.setType(Address.Type.TYPE_IPV6)
.setAddress("2001:db8::2:1") // RFC 5952 section 4: ipv6 canonical form required
.setIpPort(12345)
.build(),
LogHelper.socketAddressToProto(socketAddress));
}
@Test
public void socketToProto_unknown() throws Exception {
SocketAddress unknownSocket = new SocketAddress() {
@Override
public String toString() {
return "some-socket-address";
}
};
assertEquals(
Address.newBuilder()
.setType(Address.Type.TYPE_UNKNOWN)
.setAddress("some-socket-address")
.build(),
LogHelper.socketAddressToProto(unknownSocket));
}
@Test
public void metadataToProto_empty() throws Exception {
assertEquals(
GrpcLogRecord.newBuilder()
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER)
.setMetadata(
GrpcLogRecord.Metadata.getDefaultInstance())
.build(),
metadataToProtoTestHelper(
EventType.GRPC_CALL_REQUEST_HEADER, new Metadata()));
}
@Test
public void metadataToProto() throws Exception {
assertEquals(
GrpcLogRecord.newBuilder()
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER)
.setMetadata(
GrpcLogRecord.Metadata
.newBuilder()
.addEntry(ENTRY_A)
.addEntry(ENTRY_B)
.addEntry(ENTRY_C)
.build())
.setPayloadSize(nonEmptyMetadataSize)
.build(),
metadataToProtoTestHelper(
EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata));
}
@Test
public void logRequestHeader() throws Exception {
long seqId = 1;
String serviceName = "service";
String methodName = "method";
String authority = "authority";
Duration timeout = Durations.fromMillis(1234);
String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f";
InetAddress address = InetAddress.getByName("127.0.0.1");
int port = 12345;
InetSocketAddress peerAddress = new InetSocketAddress(address, port);
GrpcLogRecord.Builder builder =
metadataToProtoTestHelper(EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata)
.toBuilder()
.setTimestamp(timestamp)
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER)
.setEventLogger(EventLogger.LOGGER_CLIENT)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setRpcId(rpcId);
builder.setAuthority(authority)
.setTimeout(timeout);
GrpcLogRecord base = builder.build();
// logged on client
{
logHelper.logRequestHeader(
seqId,
serviceName,
methodName,
authority,
timeout,
nonEmptyMetadata,
EventLogger.LOGGER_CLIENT,
rpcId,
null);
verify(sink).write(base);
}
// logged on server
{
logHelper.logRequestHeader(
seqId,
serviceName,
methodName,
authority,
timeout,
nonEmptyMetadata,
EventLogger.LOGGER_SERVER,
rpcId,
peerAddress);
verify(sink).write(
base.toBuilder()
.setPeerAddress(LogHelper.socketAddressToProto(peerAddress))
.setEventLogger(EventLogger.LOGGER_SERVER)
.build());
}
// timeout is null
{
logHelper.logRequestHeader(
seqId,
serviceName,
methodName,
authority,
null,
nonEmptyMetadata,
EventLogger.LOGGER_CLIENT,
rpcId,
null);
verify(sink).write(
base.toBuilder()
.clearTimeout()
.build());
}
// peerAddress is not null (error on client)
try {
logHelper.logRequestHeader(
seqId,
serviceName,
methodName,
authority,
timeout,
nonEmptyMetadata,
EventLogger.LOGGER_CLIENT,
rpcId,
peerAddress);
fail();
} catch (IllegalArgumentException expected) {
assertThat(expected).hasMessageThat().contains("peerAddress can only be specified by server");
}
}
@Test
public void logResponseHeader() throws Exception {
long seqId = 1;
String serviceName = "service";
String methodName = "method";
String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f";
InetAddress address = InetAddress.getByName("127.0.0.1");
int port = 12345;
InetSocketAddress peerAddress = new InetSocketAddress(address, port);
GrpcLogRecord.Builder builder =
metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata)
.toBuilder()
.setTimestamp(timestamp)
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_RESPONSE_HEADER)
.setEventLogger(EventLogger.LOGGER_CLIENT)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setRpcId(rpcId);
builder.setPeerAddress(LogHelper.socketAddressToProto(peerAddress));
GrpcLogRecord base = builder.build();
// logged on client
{
logHelper.logResponseHeader(
seqId,
serviceName,
methodName,
nonEmptyMetadata,
EventLogger.LOGGER_CLIENT,
rpcId,
peerAddress);
verify(sink).write(base);
}
// logged on server
{
logHelper.logResponseHeader(
seqId,
serviceName,
methodName,
nonEmptyMetadata,
EventLogger.LOGGER_SERVER,
rpcId,
null);
verify(sink).write(
base.toBuilder()
.setEventLogger(EventLogger.LOGGER_SERVER)
.clearPeerAddress()
.build());
}
// peerAddress is not null (error on server)
try {
logHelper.logResponseHeader(
seqId,
serviceName,
methodName,
nonEmptyMetadata,
EventLogger.LOGGER_SERVER,
rpcId,
peerAddress);
fail();
} catch (IllegalArgumentException expected) {
assertThat(expected).hasMessageThat()
.contains("peerAddress can only be specified for client");
}
}
@Test
public void logTrailer() throws Exception {
long seqId = 1;
String serviceName = "service";
String methodName = "method";
String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f";
InetAddress address = InetAddress.getByName("127.0.0.1");
int port = 12345;
InetSocketAddress peerAddress = new InetSocketAddress(address, port);
Status statusDescription = Status.INTERNAL.withDescription("test description");
GrpcLogRecord.Builder builder =
metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata)
.toBuilder()
.setTimestamp(timestamp)
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_TRAILER)
.setEventLogger(EventLogger.LOGGER_CLIENT)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setStatusCode(Status.INTERNAL.getCode().value())
.setStatusMessage("test description")
.setRpcId(rpcId);
builder.setPeerAddress(LogHelper.socketAddressToProto(peerAddress));
GrpcLogRecord base = builder.build();
// logged on client
{
logHelper.logTrailer(
seqId,
serviceName,
methodName,
statusDescription,
nonEmptyMetadata,
EventLogger.LOGGER_CLIENT,
rpcId,
peerAddress);
verify(sink).write(base);
}
// logged on server
{
logHelper.logTrailer(
seqId,
serviceName,
methodName,
statusDescription,
nonEmptyMetadata,
EventLogger.LOGGER_SERVER,
rpcId,
null);
verify(sink).write(
base.toBuilder()
.clearPeerAddress()
.setEventLogger(EventLogger.LOGGER_SERVER)
.build());
}
// peer address is null
{
logHelper.logTrailer(
seqId,
serviceName,
methodName,
statusDescription,
nonEmptyMetadata,
EventLogger.LOGGER_CLIENT,
rpcId,
null);
verify(sink).write(
base.toBuilder()
.clearPeerAddress()
.build());
}
// status description is null
{
logHelper.logTrailer(
seqId,
serviceName,
methodName,
statusDescription.getCode().toStatus(),
nonEmptyMetadata,
EventLogger.LOGGER_CLIENT,
rpcId,
peerAddress);
verify(sink).write(
base.toBuilder()
.clearStatusMessage()
.build());
}
}
@Test
public void logRpcMessage() throws Exception {
long seqId = 1;
String serviceName = "service";
String methodName = "method";
String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f";
byte[] message = new byte[100];
GrpcLogRecord.Builder builder = messageTestHelper(message)
.toBuilder()
.setTimestamp(timestamp)
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_REQUEST_MESSAGE)
.setEventLogger(EventLogger.LOGGER_CLIENT)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setRpcId(rpcId);
GrpcLogRecord base = builder.build();
// request message
{
logHelper.logRpcMessage(
seqId,
serviceName,
methodName,
EventType.GRPC_CALL_REQUEST_MESSAGE,
message,
EventLogger.LOGGER_CLIENT,
rpcId);
verify(sink).write(base);
}
// response message, logged on client
{
logHelper.logRpcMessage(
seqId,
serviceName,
methodName,
EventType.GRPC_CALL_RESPONSE_MESSAGE,
message,
EventLogger.LOGGER_CLIENT,
rpcId);
verify(sink).write(
base.toBuilder()
.setEventType(EventType.GRPC_CALL_RESPONSE_MESSAGE)
.build());
}
// request message, logged on server
{
logHelper.logRpcMessage(
seqId,
serviceName,
methodName,
EventType.GRPC_CALL_REQUEST_MESSAGE,
message,
EventLogger.LOGGER_SERVER,
rpcId);
verify(sink).write(
base.toBuilder()
.setEventLogger(EventLogger.LOGGER_SERVER)
.build());
}
// response message, logged on server
{
logHelper.logRpcMessage(
seqId,
serviceName,
methodName,
EventType.GRPC_CALL_RESPONSE_MESSAGE,
message,
EventLogger.LOGGER_SERVER,
rpcId);
verify(sink).write(
base.toBuilder()
.setEventType(EventType.GRPC_CALL_RESPONSE_MESSAGE)
.setEventLogger(EventLogger.LOGGER_SERVER)
.build());
}
}
@Test
public void getPeerAddressTest() throws Exception {
SocketAddress peer = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234);
assertNull(LogHelper.getPeerAddress(Attributes.EMPTY));
assertSame(
peer,
LogHelper.getPeerAddress(
Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build()));
}
private static GrpcLogRecord metadataToProtoTestHelper(
EventType type, Metadata metadata) {
GrpcLogRecord.Builder builder = GrpcLogRecord.newBuilder();
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair
= LogHelper.createMetadataProto(metadata);
builder.setMetadata(pair.payload);
builder.setPayloadSize(pair.size);
builder.setEventType(type);
return builder.build();
}
private static GrpcLogRecord messageTestHelper(byte[] message) {
GrpcLogRecord.Builder builder = GrpcLogRecord.newBuilder();
PayloadBuilder<ByteString> pair
= LogHelper.createMesageProto(message);
builder.setMessage(pair.payload);
builder.setPayloadSize(pair.size);
return builder.build();
}
// Used only in tests
// Copied from internal
static final class ByteArrayMarshaller implements Marshaller<byte[]> {
@Override
public InputStream stream(byte[] value) {
return new ByteArrayInputStream(value);
}
@Override
public byte[] parse(InputStream stream) {
try {
return parseHelper(stream);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private byte[] parseHelper(InputStream stream) throws IOException {
try {
return IoUtils.toByteArray(stream);
} finally {
stream.close();
}
}
}
// Copied from internal
static final class IoUtils {
/** maximum buffer to be read is 16 KB. */
private static final int MAX_BUFFER_LENGTH = 16384;
/** Returns the byte array. */
public static byte[] toByteArray(InputStream in) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
copy(in, out);
return out.toByteArray();
}
/** Copies the data from input stream to output stream. */
public static long copy(InputStream from, OutputStream to) throws IOException {
// Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
checkNotNull(from);
checkNotNull(to);
byte[] buf = new byte[MAX_BUFFER_LENGTH];
long total = 0;
while (true) {
int r = from.read(buf);
if (r == -1) {
break;
}
to.write(buf, 0, r);
total += r;
}
return total;
}
}
}