blob: 3ca5de65558f7128af5d111b221d74cb9d41ce30 [file] [log] [blame]
/*
* Copyright 2017 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.services;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.BinaryLog;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.Internal;
import io.grpc.InternalClientInterceptors;
import io.grpc.InternalServerInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerMethodDefinition;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import javax.annotation.Nullable;
// TODO(zpencer): rename class to AbstractBinaryLog
@Internal
public abstract class BinaryLogProvider extends BinaryLog {
@VisibleForTesting
public static final Marshaller<byte[]> BYTEARRAY_MARSHALLER = new ByteArrayMarshaller();
private final ClientInterceptor binaryLogShim = new BinaryLogShim();
/**
* Wraps a channel to provide binary logging on {@link ClientCall}s as needed.
*/
@Override
public final Channel wrapChannel(Channel channel) {
return ClientInterceptors.intercept(channel, binaryLogShim);
}
private static MethodDescriptor<byte[], byte[]> toByteBufferMethod(
MethodDescriptor<?, ?> method) {
return method.toBuilder(BYTEARRAY_MARSHALLER, BYTEARRAY_MARSHALLER).build();
}
/**
* Wraps a {@link ServerMethodDefinition} such that it performs binary logging if needed.
*/
@Override
public final <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition(
ServerMethodDefinition<ReqT, RespT> oMethodDef) {
ServerInterceptor binlogInterceptor =
getServerInterceptor(oMethodDef.getMethodDescriptor().getFullMethodName());
if (binlogInterceptor == null) {
return oMethodDef;
}
MethodDescriptor<byte[], byte[]> binMethod =
BinaryLogProvider.toByteBufferMethod(oMethodDef.getMethodDescriptor());
ServerMethodDefinition<byte[], byte[]> binDef =
InternalServerInterceptors.wrapMethod(oMethodDef, binMethod);
ServerCallHandler<byte[], byte[]> binlogHandler =
InternalServerInterceptors.interceptCallHandlerCreate(
binlogInterceptor, binDef.getServerCallHandler());
return ServerMethodDefinition.create(binMethod, binlogHandler);
}
/**
* Returns a {@link ServerInterceptor} for binary logging. gRPC is free to cache the interceptor,
* so the interceptor must be reusable across calls. At runtime, the request and response
* marshallers are always {@code Marshaller<InputStream>}.
* Returns {@code null} if this method is not binary logged.
*/
// TODO(zpencer): ensure the interceptor properly handles retries and hedging
@Nullable
protected abstract ServerInterceptor getServerInterceptor(String fullMethodName);
/**
* Returns a {@link ClientInterceptor} for binary logging. gRPC is free to cache the interceptor,
* so the interceptor must be reusable across calls. At runtime, the request and response
* marshallers are always {@code Marshaller<InputStream>}.
* Returns {@code null} if this method is not binary logged.
*/
// TODO(zpencer): ensure the interceptor properly handles retries and hedging
@Nullable
protected abstract ClientInterceptor getClientInterceptor(
String fullMethodName, CallOptions callOptions);
@Override
public void close() throws IOException {
// default impl: noop
// TODO(zpencer): make BinaryLogProvider provide a BinaryLog, and this method belongs there
}
// Creating a named class makes debugging easier
private static final class ByteArrayMarshaller implements Marshaller<byte[]> {
@Override
public InputStream stream(byte[] value) {
return new ByteArrayInputStream(value);
}
@Override
public byte[] parse(InputStream stream) {
try {
return parseHelper(stream);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private byte[] parseHelper(InputStream stream) throws IOException {
try {
return IoUtils.toByteArray(stream);
} finally {
stream.close();
}
}
}
/**
* The pipeline of interceptors is hard coded when the {@link ManagedChannel} is created.
* This shim interceptor should always be installed as a placeholder. When a call starts,
* this interceptor checks with the {@link BinaryLogProvider} to see if logging should happen
* for this particular {@link ClientCall}'s method.
*/
private final class BinaryLogShim implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {
ClientInterceptor binlogInterceptor = getClientInterceptor(
method.getFullMethodName(), callOptions);
if (binlogInterceptor == null) {
return next.newCall(method, callOptions);
} else {
return InternalClientInterceptors
.wrapClientInterceptor(
binlogInterceptor,
BYTEARRAY_MARSHALLER,
BYTEARRAY_MARSHALLER)
.interceptCall(method, callOptions, next);
}
}
}
// Copied from internal
private static final class IoUtils {
/** maximum buffer to be read is 16 KB. */
private static final int MAX_BUFFER_LENGTH = 16384;
/** Returns the byte array. */
public static byte[] toByteArray(InputStream in) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
copy(in, out);
return out.toByteArray();
}
/** Copies the data from input stream to output stream. */
public static long copy(InputStream from, OutputStream to) throws IOException {
// Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
Preconditions.checkNotNull(from);
Preconditions.checkNotNull(to);
byte[] buf = new byte[MAX_BUFFER_LENGTH];
long total = 0;
while (true) {
int r = from.read(buf);
if (r == -1) {
break;
}
to.write(buf, 0, r);
total += r;
}
return total;
}
}
}