blob: 0750b7b62ac3a03de6ea4884f4e64327b657b67d [file] [log] [blame]
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.binder.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import android.content.Context;
import android.os.Binder;
import android.os.DeadObjectException;
import android.os.IBinder;
import android.os.Parcel;
import android.os.Process;
import android.os.RemoteException;
import android.os.TransactionTooLargeException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Grpc;
import io.grpc.Internal;
import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalLogId;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.SecurityLevel;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.binder.AndroidComponentAddress;
import io.grpc.binder.ApiConstants;
import io.grpc.binder.BindServiceFlags;
import io.grpc.binder.InboundParcelablePolicy;
import io.grpc.binder.SecurityPolicy;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.FailingClientStream;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TimeProvider;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
/**
* Base class for binder-based gRPC transport.
*
* <p>This is used on both the client and service sides of the transport.
*
* <p>A note on synchronization. The nature of this class's interaction with each stream
* (bi-directional communication between gRPC calls and binder transactions) means that acquiring
* multiple locks in two different orders can happen easily. E.g. binder transactions will arrive in
* this class, and need to be passed to a stream instance, whereas gRPC calls on a stream instance will
* need to call into this class to send a transaction (possibly waiting for the transport to become
* ready).
*
* <p>The split between Outbound & Inbound helps reduce this risk, but not entirely remove it.
*
* <p>For this reason, while most state within this class is guarded by this instance, methods
* exposed to individual stream instances need to use atomic or volatile types, since those calls
* will already be synchronized on the individual RPC objects.
*
* <p><b>IMPORTANT</b>: This implementation must comply with this published wire format.
* https://github.com/grpc/proposal/blob/master/L73-java-binderchannel/wireformat.md
*/
@ThreadSafe
public abstract class BinderTransport
implements LeakSafeOneWayBinder.TransactionHandler, IBinder.DeathRecipient {
private static final Logger logger = Logger.getLogger(BinderTransport.class.getName());
/**
* Attribute used to store the Android UID of the remote app. This is guaranteed to be set on any
* active transport.
*/
@Internal
public static final Attributes.Key<Integer> REMOTE_UID = Attributes.Key.create("remote-uid");
/** The authority of the server. */
@Internal
public static final Attributes.Key<String> SERVER_AUTHORITY = Attributes.Key.create("server-authority");
/** A transport attribute to hold the {@link InboundParcelablePolicy}. */
@Internal
public static final Attributes.Key<InboundParcelablePolicy> INBOUND_PARCELABLE_POLICY =
Attributes.Key.create("inbound-parcelable-policy");
/**
* Version code for this wire format.
*
* <p>Should this change, we should still endeavor to support earlier wire-format versions. If
* that's not possible, {@link #EARLIEST_SUPPORTED_WIRE_FORMAT_VERSION} should be updated below.
*/
@Internal
public static final int WIRE_FORMAT_VERSION = 1;
/** The version code of the earliest wire format we support. */
@Internal
public static final int EARLIEST_SUPPORTED_WIRE_FORMAT_VERSION = 1;
/** The max number of "in-flight" bytes before we start buffering transactions. */
private static final int TRANSACTION_BYTES_WINDOW = 128 * 1024;
/** The number of in-flight bytes we should receive between sending acks to our peer. */
private static final int TRANSACTION_BYTES_WINDOW_FORCE_ACK = 16 * 1024;
/**
* Sent from the client to the host service binder to initiate a new transport, and from the host
* back to the client's callback binder to complete the setup.
*
* <p>Followed by: int wire_protocol_version, then the sender's IBinder on which it receives this
* transport's transactions.
*/
@Internal
public static final int SETUP_TRANSPORT = IBinder.FIRST_CALL_TRANSACTION;
/** Send to shutdown the transport from either end. */
@Internal
public static final int SHUTDOWN_TRANSPORT = IBinder.FIRST_CALL_TRANSACTION + 1;
/** Send to acknowledge receipt of rpc bytes, for flow control. */
static final int ACKNOWLEDGE_BYTES = IBinder.FIRST_CALL_TRANSACTION + 2;
/** A ping request. */
private static final int PING = IBinder.FIRST_CALL_TRANSACTION + 3;
/** A response to a ping. */
private static final int PING_RESPONSE = IBinder.FIRST_CALL_TRANSACTION + 4;
/** Reserved transaction IDs for any special events we might need. */
private static final int RESERVED_TRANSACTIONS = 1000;
/** The first call ID we can use. Transaction codes below this are transport-level messages. */
private static final int FIRST_CALL_ID = IBinder.FIRST_CALL_TRANSACTION + RESERVED_TRANSACTIONS;
/** The last call ID we can use. */
private static final int LAST_CALL_ID = IBinder.LAST_CALL_TRANSACTION;
/**
* The states of this transport.
*
* <p>The transitions accepted by {@code checkTransition} are: NOT_STARTED to SETUP, NOT_STARTED
* or SETUP to READY, any of NOT_STARTED/SETUP/READY to SHUTDOWN, and SHUTDOWN to
* SHUTDOWN_TERMINATED.
*/
protected enum TransportState {
NOT_STARTED, // We haven't been started yet.
SETUP, // We're setting up the connection.
READY, // The transport is ready.
SHUTDOWN, // We've been shutdown and won't accept any additional calls (though existing calls
// may continue).
SHUTDOWN_TERMINATED // We've been shutdown completely (or we failed to start). We can't send or
// receive any data.
}
// Pool the scheduled executor was borrowed from; it is handed back in releaseExecutors().
private final ObjectPool<ScheduledExecutorService> executorServicePool;
// Executor used for deferred shutdown work (see shutdownInternal and unregisterCall).
private final ScheduledExecutorService scheduledExecutorService;
private final InternalLogId logId;
// Our own binder, created with this transport as its transaction handler. It is written into
// the SETUP_TRANSPORT parcel for the peer and detached once the transport terminates.
private final LeakSafeOneWayBinder incomingBinder;
// Calls currently in progress, keyed by call id (which doubles as the transaction code).
protected final ConcurrentHashMap<Integer, Inbound<?>> ongoingCalls;
@GuardedBy("this")
protected Attributes attributes;
@GuardedBy("this")
private TransportState transportState = TransportState.NOT_STARTED;
// The status recorded by the first call to shutdownInternal; null until then.
@GuardedBy("this")
@Nullable
protected Status shutdownStatus;
// The peer's binder, used for every outgoing transaction; null until setOutgoingBinder is called.
@Nullable private IBinder outgoingBinder;
/** The number of outgoing bytes we've transmitted. */
private final AtomicLong numOutgoingBytes;
/** The number of incoming bytes we've received. */
private final AtomicLong numIncomingBytes;
/**
* The number of our outgoing bytes our peer has told us it received. Written in
* handleAcknowledgedBytes; also read by sendTransaction, which is not synchronized.
*/
private long acknowledgedOutgoingBytes;
/** The number of incoming bytes we've told our peer we've received. */
private long acknowledgedIncomingBytes;
/**
* Whether there are too many unacknowledged outgoing bytes to allow more RPCs right now. This is
* volatile because it'll be read without holding the lock.
*/
private volatile boolean transmitWindowFull;
/**
 * Initializes the state shared by the client and server transports.
 *
 * @param executorServicePool pool from which the scheduled executor is borrowed
 * @param attributes initial transport attributes
 * @param logId identifier reported by {@code getLogId()}
 */
private BinderTransport(
    ObjectPool<ScheduledExecutorService> executorServicePool,
    Attributes attributes,
    InternalLogId logId) {
  // Values supplied by the concrete subclass.
  this.logId = logId;
  this.attributes = attributes;
  this.executorServicePool = executorServicePool;

  // Call bookkeeping and flow-control counters start out empty.
  this.ongoingCalls = new ConcurrentHashMap<>();
  this.numIncomingBytes = new AtomicLong();
  this.numOutgoingBytes = new AtomicLong();

  // Borrow the executor now; releaseExecutors() returns it to the pool.
  this.scheduledExecutorService = executorServicePool.getObject();
  this.incomingBinder = new LeakSafeOneWayBinder(this);
}
/**
* Returns the scheduled executor borrowed from the pool at construction time.
*
* <p>This method is final, so subclasses cannot override it. NOTE(review): presumably it
* satisfies the same-named method of the transport interfaces the concrete subclasses implement
* - confirm.
*/
public final ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorService;
}
/**
* Returns an already-completed future whose value is {@code null}; no socket stats are reported.
*
* <p>This method is final, so subclasses cannot override it.
*/
public final ListenableFuture<SocketStats> getStats() {
return immediateFuture(null);
}
/**
* Returns the log id supplied to the constructor.
*
* <p>This method is final, so subclasses cannot override it.
*/
public final InternalLogId getLogId() {
return logId;
}
/**
* Returns the current transport attributes. Synchronized because {@code attributes} is guarded
* by this instance and is reassigned during setup.
*
* <p>This method is final, so subclasses cannot override it.
*/
public final synchronized Attributes getAttributes() {
return attributes;
}
/**
* Returns whether this transport is able to send rpc transactions. Intentionally unsynchronized
* since this will be called while Outbound is held.
*
* <p>The result reflects only the flow-control window ({@code transmitWindowFull}); the
* transport state is not consulted here.
*/
final boolean isReady() {
return !transmitWindowFull;
}
/**
* Hook invoked by {@code shutdownInternal} right after the state first becomes SHUTDOWN.
*
* @param shutdownStatus the status the transport was shut down with
*/
abstract void notifyShutdown(Status shutdownStatus);
/**
* Hook invoked once the transport has reached SHUTDOWN_TERMINATED. {@code shutdownInternal} calls
* it from a task on the scheduled executor, after the remaining calls have been closed.
*/
abstract void notifyTerminated();
/** Returns the scheduled executor to the pool it was borrowed from in the constructor. */
void releaseExecutors() {
executorServicePool.returnObject(scheduledExecutorService);
}
/** Returns whether the transport is currently in exactly the given state. */
@GuardedBy("this")
boolean inState(TransportState transportState) {
// The parameter shadows the field of the same name, hence the explicit "this.".
return this.transportState == transportState;
}
/**
 * Returns whether shutdown has begun, i.e. the state is either SHUTDOWN or SHUTDOWN_TERMINATED.
 */
@GuardedBy("this")
boolean isShutdown() {
  if (inState(TransportState.SHUTDOWN)) {
    return true;
  }
  return inState(TransportState.SHUTDOWN_TERMINATED);
}
/**
* Moves the transport to {@code newState}. The transition is validated by
* {@code checkTransition} first, which fails (via {@code checkState}) on an illegal transition.
*/
@GuardedBy("this")
final void setState(TransportState newState) {
checkTransition(transportState, newState);
transportState = newState;
}
/**
 * Records the peer's binder and registers this transport as its death recipient.
 *
 * <p>The binder is stored even when the registration fails.
 *
 * @param binder the binder used for outgoing transactions
 * @return {@code true} if {@code linkToDeath} succeeded, {@code false} if it threw a
 *     {@link RemoteException}
 */
@GuardedBy("this")
protected boolean setOutgoingBinder(IBinder binder) {
  this.outgoingBinder = binder;
  boolean linked;
  try {
    binder.linkToDeath(this, 0);
    linked = true;
  } catch (RemoteException re) {
    linked = false;
  }
  return linked;
}
/**
* {@link IBinder.DeathRecipient} callback. Force-terminates the transport with an UNAVAILABLE
* status.
*/
@Override
public synchronized void binderDied() {
shutdownInternal(Status.UNAVAILABLE.withDescription("binderDied"), true);
}
/**
 * Drives the transport towards shutdown, and to termination when forced or when no calls remain.
 *
 * <p>On the first call the status is recorded, the state becomes SHUTDOWN and
 * {@code notifyShutdown} is invoked. If the transport is not yet terminated and either
 * {@code forceTerminate} is set or there are no ongoing calls, the incoming binder is detached,
 * the state becomes SHUTDOWN_TERMINATED, a shutdown transaction is sent to the peer, and a task
 * is scheduled that closes the remaining calls, invokes {@code notifyTerminated} and releases
 * the executors.
 *
 * @param shutdownStatus the status to record (first call only) and to close remaining calls with
 * @param forceTerminate whether to terminate even if calls are still in progress
 */
@GuardedBy("this")
final void shutdownInternal(Status shutdownStatus, boolean forceTerminate) {
  if (!isShutdown()) {
    this.shutdownStatus = shutdownStatus;
    setState(TransportState.SHUTDOWN);
    notifyShutdown(shutdownStatus);
  }
  if (!inState(TransportState.SHUTDOWN_TERMINATED)
      && (forceTerminate || ongoingCalls.isEmpty())) {
    incomingBinder.detach();
    setState(TransportState.SHUTDOWN_TERMINATED);
    sendShutdownTransaction();
    // Snapshot the calls before clearing the map, so the task below owns a stable list.
    ArrayList<Inbound<?>> calls = new ArrayList<>(ongoingCalls.values());
    ongoingCalls.clear();
    // Calls are closed asynchronously: the caller holds the transport lock, and each inbound
    // must be locked on its own while it is closed.
    scheduledExecutorService.execute(
        () -> {
          for (Inbound<?> inbound : calls) {
            synchronized (inbound) {
              inbound.closeAbnormal(shutdownStatus);
            }
          }
          // The notifyTerminated implementations are @GuardedBy("this") and read listener
          // fields that are written under the transport lock, so take the lock here. No
          // inbound lock is held at this point.
          synchronized (this) {
            notifyTerminated();
          }
          releaseExecutors();
        });
  }
}
/**
* Sends the setup transaction to the already-recorded outgoing binder. Fails with a
* NullPointerException (from {@code checkNotNull}) if no outgoing binder has been set.
*/
@GuardedBy("this")
final void sendSetupTransaction() {
sendSetupTransaction(checkNotNull(outgoingBinder));
}
/**
 * Sends a SETUP_TRANSPORT transaction carrying our wire format version and our incoming binder.
 *
 * <p>If the transaction is rejected or throws a {@link RemoteException}, the transport is
 * force-terminated. The parcel is recycled on every path, matching {@code sendPing}.
 *
 * @param iBinder the binder to send the setup transaction to
 */
@GuardedBy("this")
final void sendSetupTransaction(IBinder iBinder) {
  Parcel parcel = Parcel.obtain();
  try {
    parcel.writeInt(WIRE_FORMAT_VERSION);
    parcel.writeStrongBinder(incomingBinder);
    if (!iBinder.transact(SETUP_TRANSPORT, parcel, null, IBinder.FLAG_ONEWAY)) {
      shutdownInternal(
          Status.UNAVAILABLE.withDescription("Failed sending SETUP_TRANSPORT transaction"), true);
    }
  } catch (RemoteException re) {
    shutdownInternal(statusFromRemoteException(re), true);
  } finally {
    // Recycle even if an unchecked exception escapes.
    parcel.recycle();
  }
}
/**
 * Best-effort notification to the peer that this transport is shutting down.
 *
 * <p>Does nothing if no outgoing binder was ever set. Otherwise the death link is removed and a
 * SHUTDOWN_TRANSPORT transaction is sent; failures of either step are deliberately ignored. The
 * parcel is recycled on every path, matching {@code sendPing}.
 */
@GuardedBy("this")
private final void sendShutdownTransaction() {
  if (outgoingBinder == null) {
    return;
  }
  try {
    outgoingBinder.unlinkToDeath(this, 0);
  } catch (NoSuchElementException e) {
    // Ignore: we were not linked (for example linkToDeath failed earlier).
  }
  Parcel parcel = Parcel.obtain();
  try {
    outgoingBinder.transact(SHUTDOWN_TRANSPORT, parcel, null, IBinder.FLAG_ONEWAY);
  } catch (RemoteException re) {
    // Ignore: we are shutting down regardless of whether the peer hears about it.
  } finally {
    parcel.recycle();
  }
}
/**
 * Sends a PING transaction carrying the given id to the peer.
 *
 * @param id the ping id written into the transaction
 * @throws StatusException with the shutdown status if the transport is terminated, with
 *     FAILED_PRECONDITION if there is no outgoing binder yet, or with the status derived from a
 *     {@link RemoteException} thrown while sending
 */
protected synchronized void sendPing(int id) throws StatusException {
  if (inState(TransportState.SHUTDOWN_TERMINATED)) {
    throw shutdownStatus.asException();
  }
  if (outgoingBinder == null) {
    throw Status.FAILED_PRECONDITION.withDescription("Transport not ready.").asException();
  }
  Parcel pingParcel = Parcel.obtain();
  pingParcel.writeInt(id);
  try {
    outgoingBinder.transact(PING, pingParcel, null, IBinder.FLAG_ONEWAY);
  } catch (RemoteException remoteFailure) {
    throw statusFromRemoteException(remoteFailure).asException();
  } finally {
    pingParcel.recycle();
  }
}
/** Removes the given inbound from the ongoing calls, using its call id. */
protected void unregisterInbound(Inbound<?> inbound) {
unregisterCall(inbound.callId);
}
/**
 * Removes the call with the given id from the ongoing calls.
 *
 * <p>If that removal emptied the map, a task is scheduled which completes a pending shutdown
 * (state SHUTDOWN) by force-terminating the transport.
 *
 * @param callId the id of the call to remove
 */
final void unregisterCall(int callId) {
  if (ongoingCalls.remove(callId) == null) {
    // Nothing was registered under this id.
    return;
  }
  if (!ongoingCalls.isEmpty()) {
    return;
  }
  // Possibly shutdown (not synchronously, since inbound is held).
  scheduledExecutorService.execute(
      () -> {
        synchronized (this) {
          if (!inState(TransportState.SHUTDOWN)) {
            return;
          }
          // No more ongoing calls, and we're shutdown. Finish the shutdown.
          shutdownInternal(shutdownStatus, true);
        }
      });
}
/**
 * Sends the parcel to the peer as a one-way transaction for the given call, and accounts for
 * its size in the outgoing flow-control window.
 *
 * <p>If the unacknowledged outgoing bytes then exceed the window, the transmit window is marked
 * full so that {@code isReady()} returns false.
 *
 * @param callId the transaction code, which is the call id
 * @param parcel the data to send; not recycled here
 * @throws StatusException if the transaction is rejected or throws a {@link RemoteException}
 */
final void sendTransaction(int callId, Parcel parcel) throws StatusException {
  // Capture the size up front; it is what gets counted against the window.
  int dataSize = parcel.dataSize();
  try {
    if (!outgoingBinder.transact(callId, parcel, null, IBinder.FLAG_ONEWAY)) {
      throw Status.UNAVAILABLE.withDescription("Failed sending transaction").asException();
    }
  } catch (RemoteException re) {
    throw statusFromRemoteException(re).asException();
  }
  long nob = numOutgoingBytes.addAndGet(dataSize);
  if ((nob - acknowledgedOutgoingBytes) > TRANSACTION_BYTES_WINDOW) {
    // Only build the message (which includes this.toString()) when FINE is enabled.
    if (logger.isLoggable(Level.FINE)) {
      logger.log(Level.FINE, "transmist window full. Outgoing=" + nob + " Ack'd Outgoing=" +
          acknowledgedOutgoingBytes + " " + this);
    }
    transmitWindowFull = true;
  }
}
/**
 * Sends an out-of-band close for the given call, carrying the given status.
 *
 * <p>A failure to send is logged at WARNING and otherwise ignored. The parcel is recycled on
 * every path, matching {@code sendPing}.
 *
 * @param callId the id of the call being closed
 * @param status the status to report to the peer
 */
final void sendOutOfBandClose(int callId, Status status) {
  Parcel parcel = Parcel.obtain();
  try {
    parcel.writeInt(0); // Placeholder for flags. Will be filled in below.
    int flags = TransactionUtils.writeStatus(parcel, status);
    TransactionUtils.fillInFlags(parcel, flags | TransactionUtils.FLAG_OUT_OF_BAND_CLOSE);
    sendTransaction(callId, parcel);
  } catch (StatusException e) {
    logger.log(Level.WARNING, "Failed sending oob close transaction", e);
  } finally {
    // Recycle even if an unchecked exception escapes.
    parcel.recycle();
  }
}
/**
* Entry point for every transaction received on our incoming binder.
*
* <p>Codes below {@code FIRST_CALL_ID} are transport-level messages and are handled while
* holding the transport lock. All other codes are call ids and are routed to the matching
* {@link Inbound}, creating one via {@code createInbound} if none is registered and the
* transport is not shut down.
*
* @param code the transaction code
* @param parcel the transaction data
* @return {@code false} only for an unrecognized transport-level code, {@code true} otherwise
*/
@Override
public final boolean handleTransaction(int code, Parcel parcel) {
if (code < FIRST_CALL_ID) {
synchronized (this) {
switch (code) {
case ACKNOWLEDGE_BYTES:
handleAcknowledgedBytes(parcel.readLong());
break;
case SHUTDOWN_TRANSPORT:
shutdownInternal(
Status.UNAVAILABLE.withDescription("transport shutdown by peer"), true);
break;
case SETUP_TRANSPORT:
handleSetupTransport(parcel);
break;
case PING:
handlePing(parcel);
break;
case PING_RESPONSE:
handlePingResponse(parcel);
break;
default:
return false;
}
return true;
}
} else {
// Capture the size before the inbound consumes the parcel.
int size = parcel.dataSize();
Inbound<?> inbound = ongoingCalls.get(code);
if (inbound == null) {
synchronized (this) {
if (!isShutdown()) {
// Create a new inbound. Strictly speaking we could end up doing this twice on
// two threads, hence the need to use putIfAbsent, and check its result.
inbound = createInbound(code);
if (inbound != null) {
Inbound<?> inbound2 = ongoingCalls.putIfAbsent(code, inbound);
if (inbound2 != null) {
inbound = inbound2;
}
}
}
}
}
// A transaction with no matching inbound is dropped, but its bytes are still counted below.
if (inbound != null) {
inbound.handleTransaction(parcel);
}
long nib = numIncomingBytes.addAndGet(size);
// acknowledgedIncomingBytes is read here without the lock; it is written in
// sendAcknowledgeBytes under the lock.
if ((nib - acknowledgedIncomingBytes) > TRANSACTION_BYTES_WINDOW_FORCE_ACK) {
synchronized (this) {
// NOTE(review): checkNotNull throws if no outgoing binder has been set yet - confirm call
// transactions cannot arrive before setOutgoingBinder.
sendAcknowledgeBytes(checkNotNull(outgoingBinder));
}
}
return true;
}
}
/**
* Creates the inbound for a call id that has no registered call yet. This base implementation
* returns {@code null}, in which case {@code handleTransaction} drops the transaction.
*/
@Nullable
@GuardedBy("this")
protected Inbound<?> createInbound(int callId) {
return null;
}
/** Handles a received SETUP_TRANSPORT transaction. This base implementation does nothing. */
@GuardedBy("this")
protected void handleSetupTransport(Parcel parcel) {}
/**
 * Answers a PING by sending the received parcel back to the peer as a PING_RESPONSE.
 *
 * <p>Pings are only answered while the transport is READY, and a failure to send the response
 * is ignored.
 *
 * @param parcel the received ping data, which is echoed unchanged
 */
@GuardedBy("this")
private final void handlePing(Parcel parcel) {
  if (transportState != TransportState.READY) {
    return;
  }
  try {
    outgoingBinder.transact(PING_RESPONSE, parcel, null, IBinder.FLAG_ONEWAY);
  } catch (RemoteException re) {
    // Ignore.
  }
}
/** Handles a received PING_RESPONSE transaction. This base implementation does nothing. */
@GuardedBy("this")
protected void handlePingResponse(Parcel parcel) {}
/**
 * Tells the peer how many incoming bytes we have received so far, for flow control.
 *
 * <p>The acknowledged count is recorded before sending. If the transaction is rejected or
 * throws a {@link RemoteException}, the transport is force-terminated. The parcel is recycled
 * on every path, matching {@code sendPing}.
 *
 * @param iBinder the binder to send the acknowledgement to
 */
@GuardedBy("this")
private void sendAcknowledgeBytes(IBinder iBinder) {
  // Send a transaction to acknowledge reception of incoming data.
  long n = numIncomingBytes.get();
  acknowledgedIncomingBytes = n;
  Parcel parcel = Parcel.obtain();
  try {
    parcel.writeLong(n);
    if (!iBinder.transact(ACKNOWLEDGE_BYTES, parcel, null, IBinder.FLAG_ONEWAY)) {
      shutdownInternal(
          Status.UNAVAILABLE.withDescription("Failed sending ack bytes transaction"), true);
    }
  } catch (RemoteException re) {
    shutdownInternal(statusFromRemoteException(re), true);
  } finally {
    // Recycle even if an unchecked exception escapes.
    parcel.recycle();
  }
}
/**
 * Records the peer's acknowledgement of received bytes and reopens the transmit window if the
 * unacknowledged bytes have dropped below it.
 *
 * @param numBytes the total number of our outgoing bytes the peer reports having received
 */
@GuardedBy("this")
final void handleAcknowledgedBytes(long numBytes) {
  // The remote side has acknowledged reception of rpc data. Keep the later of the two counts
  // (wrap-aware), in case transactions are delivered out of order.
  acknowledgedOutgoingBytes = wrapAwareMax(acknowledgedOutgoingBytes, numBytes);
  long unacknowledged = numOutgoingBytes.get() - acknowledgedOutgoingBytes;
  if (unacknowledged >= TRANSACTION_BYTES_WINDOW || !transmitWindowFull) {
    // Either still over the window, or the window was never marked full.
    return;
  }
  logger.log(Level.FINE,
      "handleAcknowledgedBytes: Transmit Window No-Longer Full. Unblock calls: " + this);
  // We're ready again, and need to poke any waiting transactions.
  transmitWindowFull = false;
  for (Inbound<?> waiting : ongoingCalls.values()) {
    waiting.onTransportReady();
  }
}
/**
 * Returns the later of two counters, comparing by the sign of their difference so that the
 * result stays correct when the counters wrap around.
 *
 * @return {@code b} if {@code a - b} is negative, otherwise {@code a}
 */
private static final long wrapAwareMax(long a, long b) {
  long delta = a - b;
  if (delta < 0) {
    return b;
  }
  return a;
}
/** Concrete client-side transport implementation. */
@ThreadSafe
@Internal
public static final class BinderClientTransport extends BinderTransport
implements ConnectionClientTransport, Bindable.Observer {
// Pool the offload executor was borrowed from; it is handed back in releaseExecutors().
private final ObjectPool<? extends Executor> offloadExecutorPool;
// Runs the security policy check (see handleSetupTransport).
private final Executor offloadExecutor;
private final SecurityPolicy securityPolicy;
// The binding to the remote service; bound from the Runnable returned by start() and unbound in
// notifyTerminated().
private final Bindable serviceBinding;
/** Number of ongoing calls which keep this transport "in-use". */
private final AtomicInteger numInUseStreams;
private final PingTracker pingTracker;
// Set in start(); null before that.
@Nullable private ManagedClientTransport.Listener clientTransportListener;
// The id handed to the next stream created by newStream().
@GuardedBy("this")
private int latestCallId = FIRST_CALL_ID;
/**
 * Creates a client transport that will bind to the service at {@code targetAddress}.
 *
 * <p>No binding happens here; it is started by the Runnable returned from {@code start}.
 *
 * @param sourceContext the context used for binding, and for the local address attribute
 * @param targetAddress the address of the service to bind to
 * @param bindServiceFlags the flags passed to the service binding
 * @param mainThreadExecutor the executor handed to the service binding
 * @param executorServicePool pool for the scheduled executor
 * @param offloadExecutorPool pool for the executor that runs the security policy check
 * @param securityPolicy the policy used to authorize the remote UID
 * @param inboundParcelablePolicy stored as a transport attribute
 * @param eagAttrs stored as the client EAG attributes
 */
public BinderClientTransport(
    Context sourceContext,
    AndroidComponentAddress targetAddress,
    BindServiceFlags bindServiceFlags,
    Executor mainThreadExecutor,
    ObjectPool<ScheduledExecutorService> executorServicePool,
    ObjectPool<? extends Executor> offloadExecutorPool,
    SecurityPolicy securityPolicy,
    InboundParcelablePolicy inboundParcelablePolicy,
    Attributes eagAttrs) {
  super(
      executorServicePool,
      buildClientAttributes(eagAttrs, sourceContext, targetAddress, inboundParcelablePolicy),
      buildLogId(sourceContext, targetAddress));
  this.offloadExecutorPool = offloadExecutorPool;
  this.securityPolicy = securityPolicy;
  // Borrow the offload executor now; releaseExecutors() returns it.
  this.offloadExecutor = offloadExecutorPool.getObject();
  this.numInUseStreams = new AtomicInteger();
  this.pingTracker =
      new PingTracker(TimeProvider.SYSTEM_TIME_PROVIDER, (pingId) -> sendPing(pingId));
  // This transport is passed as the binding's observer (see onBound / onUnbound).
  this.serviceBinding =
      new ServiceBinding(
          mainThreadExecutor,
          sourceContext,
          targetAddress.asBindIntent(),
          bindServiceFlags.toInteger(),
          this);
}
/** Returns both the scheduled executor (via the superclass) and the offload executor. */
@Override
void releaseExecutors() {
super.releaseExecutors();
offloadExecutorPool.returnObject(offloadExecutor);
}
/**
* {@link Bindable.Observer} callback. Sends the setup transaction to the given binder.
*
* @param binder the binder delivered by the service binding
*/
@Override
public synchronized void onBound(IBinder binder) {
sendSetupTransaction(binder);
}
/**
* {@link Bindable.Observer} callback. Force-terminates the transport with the given status.
*
* @param reason the status the transport is shut down with
*/
@Override
public synchronized void onUnbound(Status reason) {
shutdownInternal(reason, true);
}
/**
 * Records the listener and returns the task that actually starts the transport.
 *
 * <p>When run, the returned task moves a not-yet-started transport to SETUP and binds to the
 * service. It does nothing if the transport has already left NOT_STARTED.
 *
 * @param clientTransportListener the listener to notify of transport events; must not be null
 * @return the task that begins binding
 */
@CheckReturnValue
@Override
public synchronized Runnable start(ManagedClientTransport.Listener clientTransportListener) {
  this.clientTransportListener = checkNotNull(clientTransportListener);
  return () -> {
    synchronized (BinderClientTransport.this) {
      if (!inState(TransportState.NOT_STARTED)) {
        return;
      }
      setState(TransportState.SETUP);
      serviceBinding.bind();
    }
  };
}
/**
 * Creates a new client stream for the given method.
 *
 * <p>If the transport is shut down, or the newly assigned call id is already in use, a failing
 * stream is returned instead. A call id clash also force-terminates the transport.
 *
 * @param method the method being called
 * @param headers the request headers
 * @param callOptions the options for this call
 * @return the new stream, or a failing stream carrying the relevant status
 */
@Override
public synchronized ClientStream newStream(
    final MethodDescriptor<?, ?> method,
    final Metadata headers,
    final CallOptions callOptions) {
  if (isShutdown()) {
    return newFailingClientStream(shutdownStatus, callOptions, attributes, headers);
  }

  // Take the next call id, wrapping back to the first one when the counter hits the end.
  int callId = latestCallId++;
  if (latestCallId == LAST_CALL_ID) {
    latestCallId = FIRST_CALL_ID;
  }

  boolean countsForInUse = GrpcUtil.shouldBeCountedForInUse(callOptions);
  Inbound.ClientInbound inbound =
      new Inbound.ClientInbound(this, attributes, callId, countsForInUse);
  if (ongoingCalls.putIfAbsent(callId, inbound) != null) {
    Status failure = Status.INTERNAL.withDescription("Clashing call IDs");
    shutdownInternal(failure, true);
    return newFailingClientStream(failure, callOptions, attributes, headers);
  }

  // The first in-use stream flips the transport to "in use".
  if (inbound.countsForInUse() && numInUseStreams.getAndIncrement() == 0) {
    clientTransportListener.transportInUse(true);
  }
  StatsTraceContext tracer =
      StatsTraceContext.newClientContext(callOptions, attributes, headers);
  Outbound.ClientOutbound outbound =
      new Outbound.ClientOutbound(this, callId, method, headers, tracer);
  return method.getType().clientSendsOneMessage()
      ? new SingleMessageClientStream(inbound, outbound, attributes)
      : new MultiMessageClientStream(inbound, outbound, attributes);
}
/**
 * Unregisters the inbound, first reporting the transport as no longer in use if this was the
 * last stream that counted towards in-use.
 *
 * @param inbound the inbound whose call has finished
 */
@Override
protected void unregisterInbound(Inbound<?> inbound) {
  boolean wasLastInUse = inbound.countsForInUse() && numInUseStreams.decrementAndGet() == 0;
  if (wasLastInUse) {
    clientTransportListener.transportInUse(false);
  }
  super.unregisterInbound(inbound);
}
/**
* Starts a ping by delegating to the ping tracker.
*
* @param callback the callback handed to the ping tracker
* @param executor the executor handed to the ping tracker
*/
@Override
public void ping(final PingCallback callback, Executor executor) {
pingTracker.startPing(callback, executor);
}
/**
* Shuts the transport down without forcing termination; it terminates immediately only if no
* calls are in progress.
*
* @param reason the shutdown status; must not be null
*/
@Override
public synchronized void shutdown(Status reason) {
checkNotNull(reason, "reason");
shutdownInternal(reason, false);
}
/**
* Shuts the transport down and forces termination, even if calls are still in progress.
*
* @param reason the shutdown status; must not be null
*/
@Override
public synchronized void shutdownNow(Status reason) {
checkNotNull(reason, "reason");
shutdownInternal(reason, true);
}
/** Forwards the shutdown status to the client transport listener. */
@Override
@GuardedBy("this")
public void notifyShutdown(Status status) {
clientTransportListener.transportShutdown(status);
}
/**
* Reports termination to the listener. The in-use count is reset first, reporting "not in use"
* if any streams were still counted; the service is then unbound before the listener is told.
*/
@Override
@GuardedBy("this")
public void notifyTerminated() {
if (numInUseStreams.getAndSet(0) > 0) {
clientTransportListener.transportInUse(false);
}
serviceBinding.unbind();
clientTransportListener.transportTerminated();
}
/**
 * Handles the server's SETUP_TRANSPORT reply.
 *
 * <p>The calling UID is always recorded in the attributes. While in SETUP, the wire format
 * version and the server's binder are read; a version mismatch or a missing binder
 * force-terminates the transport, otherwise the security policy check is handed to the offload
 * executor.
 *
 * @param parcel the setup data: an int version followed by a strong binder
 */
@Override
@GuardedBy("this")
protected void handleSetupTransport(Parcel parcel) {
  // Add the remote uid to our attributes.
  attributes = setSecurityAttrs(attributes, Binder.getCallingUid());
  if (!inState(TransportState.SETUP)) {
    return;
  }
  int version = parcel.readInt();
  IBinder binder = parcel.readStrongBinder();
  if (version != WIRE_FORMAT_VERSION) {
    shutdownInternal(
        Status.UNAVAILABLE.withDescription("Wire format version mismatch"), true);
    return;
  }
  if (binder == null) {
    shutdownInternal(
        Status.UNAVAILABLE.withDescription("Malformed SETUP_TRANSPORT data"), true);
    return;
  }
  offloadExecutor.execute(() -> checkSecurityPolicy(binder));
}
/**
 * Authorizes the remote UID and, on success, completes setup with the given binder.
 *
 * <p>The policy is consulted without holding the transport lock. The result is only acted on
 * if the transport is still in SETUP afterwards.
 *
 * @param binder the server's binder, which becomes the outgoing binder on success
 */
private void checkSecurityPolicy(IBinder binder) {
  Integer remoteUid;
  synchronized (this) {
    remoteUid = attributes.get(REMOTE_UID);
  }
  Status authorization =
      remoteUid == null
          ? Status.UNAUTHENTICATED.withDescription("No remote UID available")
          : securityPolicy.checkAuthorization(remoteUid);
  synchronized (this) {
    if (!inState(TransportState.SETUP)) {
      return;
    }
    if (!authorization.isOk()) {
      shutdownInternal(authorization, true);
      return;
    }
    if (!setOutgoingBinder(binder)) {
      shutdownInternal(
          Status.UNAVAILABLE.withDescription("Failed to observe outgoing binder"), true);
      return;
    }
    // Check state again, since a failure inside setOutgoingBinder (or a callback it
    // triggers), could have shut us down.
    if (!isShutdown()) {
      setState(TransportState.READY);
      clientTransportListener.transportReady();
    }
  }
}
/** Reads the ping id from the response and passes it to the ping tracker. */
@GuardedBy("this")
@Override
protected void handlePingResponse(Parcel parcel) {
pingTracker.onPingResponse(parcel.readInt());
}
/**
 * Creates a stream that fails with the given status, after recording the outbound headers and
 * the stream closure on a fresh stats trace context.
 *
 * @param failure the status the stream fails with
 * @return a {@link FailingClientStream} for {@code failure}
 */
private static ClientStream newFailingClientStream(
    Status failure, CallOptions callOptions, Attributes attributes, Metadata headers) {
  StatsTraceContext tracer =
      StatsTraceContext.newClientContext(callOptions, attributes, headers);
  tracer.clientOutboundHeaders();
  tracer.streamClosed(failure);
  return new FailingClientStream(failure);
}
/**
 * Allocates the log id for a client transport, described as
 * {@code <source context class>-><target component>}.
 */
private static InternalLogId buildLogId(
    Context sourceContext, AndroidComponentAddress targetAddress) {
  String source = sourceContext.getClass().getSimpleName();
  String target = targetAddress.getComponent().toShortString();
  return InternalLogId.allocate(BinderClientTransport.class, source + "->" + target);
}
/**
 * Builds the initial attributes of a client transport: security level NONE, the EAG attributes,
 * the local and remote addresses, and the inbound parcelable policy.
 */
private static Attributes buildClientAttributes(
    Attributes eagAttrs,
    Context sourceContext,
    AndroidComponentAddress targetAddress,
    InboundParcelablePolicy inboundParcelablePolicy) {
  Attributes.Builder builder = Attributes.newBuilder();
  builder.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE); // Trust noone for now.
  builder.set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs);
  builder.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, AndroidComponentAddress.forContext(sourceContext));
  builder.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, targetAddress);
  builder.set(INBOUND_PARCELABLE_POLICY, inboundParcelablePolicy);
  return builder.build();
}
/**
 * Returns a copy of the attributes with the remote UID set and the security level updated:
 * PRIVACY_AND_INTEGRITY when the peer has our own UID, INTEGRITY otherwise.
 *
 * @param attributes the attributes to copy
 * @param uid the UID of the remote app
 */
private static Attributes setSecurityAttrs(Attributes attributes, int uid) {
  // TODO: Have the SecrityPolicy decide this.
  SecurityLevel level;
  if (uid == Process.myUid()) {
    level = SecurityLevel.PRIVACY_AND_INTEGRITY;
  } else {
    level = SecurityLevel.INTEGRITY;
  }
  return attributes.toBuilder()
      .set(REMOTE_UID, uid)
      .set(GrpcAttributes.ATTR_SECURITY_LEVEL, level)
      .build();
}
}
/** Concrete server-side transport implementation. */
@Internal
public static final class BinderServerTransport extends BinderTransport implements ServerTransport {
// Passed to StatsTraceContext.newServerContext for every new stream.
private final List<ServerStreamTracer.Factory> streamTracerFactories;
// Null until setServerTransportListener is called.
@Nullable private ServerTransportListener serverTransportListener;
/**
* Creates a server-side transport that sends its transactions to {@code callbackBinder}.
*
* @param executorServicePool pool for the scheduled executor
* @param attributes initial transport attributes; the remote address is used for the log id
* @param streamTracerFactories tracer factories applied to every new stream
* @param callbackBinder the client's binder, recorded as the outgoing binder
*/
public BinderServerTransport(
ObjectPool<ScheduledExecutorService> executorServicePool,
Attributes attributes,
List<ServerStreamTracer.Factory> streamTracerFactories,
IBinder callbackBinder) {
super(executorServicePool, attributes, buildLogId(attributes));
this.streamTracerFactories = streamTracerFactories;
// NOTE(review): the boolean result (false when linkToDeath fails) is ignored here - confirm
// that a dead callback binder is detected elsewhere, e.g. by the setup transaction failing.
setOutgoingBinder(callbackBinder);
}
/**
* Installs the listener and completes the server side of the setup.
*
* <p>If the transport is not shut down, the setup transaction is sent to the client and, unless
* that shut the transport down, the state becomes READY and the listener is told. The attributes
* are replaced by whatever the listener's {@code transportReady} returns.
*
* @param serverTransportListener the listener to notify of transport events
*/
public synchronized void setServerTransportListener(ServerTransportListener serverTransportListener) {
this.serverTransportListener = serverTransportListener;
if (isShutdown()) {
// NOTE(review): isShutdown() is also true in SHUTDOWN_TERMINATED, while checkTransition only
// accepts SHUTDOWN -> SHUTDOWN_TERMINATED. If shutdownInternal already terminated the
// transport (e.g. via binderDied), this setState looks like it would fail its checkState, and
// the executors are already released by the task shutdownInternal scheduled - confirm
// whether that path is reachable.
setState(TransportState.SHUTDOWN_TERMINATED);
notifyTerminated();
releaseExecutors();
} else {
sendSetupTransaction();
// Check we're not shutdown again, since a failure inside sendSetupTransaction (or a
// callback it triggers), could have shut us down.
if (!isShutdown()) {
setState(TransportState.READY);
attributes = serverTransportListener.transportReady(attributes);
}
}
}
/**
* Creates a server stats trace context for a new stream, using this transport's tracer
* factories.
*
* @param methodName the name of the method being called
* @param headers the request headers
*/
StatsTraceContext createStatsTraceContext(String methodName, Metadata headers) {
return StatsTraceContext.newServerContext(streamTracerFactories, methodName, headers);
}
/**
 * Announces a newly created stream to the server transport listener.
 *
 * @param stream the new stream
 * @param methodName the name of the method being called
 * @param headers the request headers
 * @return {@code Status.OK} if the listener was told about the stream, or an UNAVAILABLE
 *     status if the transport is shut down
 */
synchronized Status startStream(ServerStream stream, String methodName, Metadata headers) {
  if (isShutdown()) {
    return Status.UNAVAILABLE.withDescription("transport is shutdown");
  }
  serverTransportListener.streamCreated(stream, methodName, headers);
  return Status.OK;
}
/** Shutdown hook; the server transport has no work to do at this point. */
@Override
@GuardedBy("this")
public void notifyShutdown(Status status) {
// Nothing to do.
}
/**
* Reports termination to the listener, if one has been set. Nothing is reported when the
* transport terminates before setServerTransportListener was called.
*/
@Override
@GuardedBy("this")
public void notifyTerminated() {
if (serverTransportListener != null) {
serverTransportListener.transportTerminated();
}
}
/**
* Shuts the transport down with an OK status, without forcing termination; it terminates
* immediately only if no calls are in progress.
*/
@Override
public synchronized void shutdown() {
shutdownInternal(Status.OK, false);
}
/**
* Shuts the transport down and forces termination, even if calls are still in progress.
*
* @param reason the shutdown status
*/
@Override
public synchronized void shutdownNow(Status reason) {
shutdownInternal(reason, true);
}
/**
* Creates the server-side inbound for a call id seen for the first time, using the current
* transport attributes.
*/
@Override
@Nullable
@GuardedBy("this")
protected Inbound<?> createInbound(int callId) {
return new Inbound.ServerInbound(this, attributes, callId);
}
/**
 * Allocates the log id for a server transport, described by the remote address attribute.
 */
private static InternalLogId buildLogId(Attributes attributes) {
  String details = "from " + attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
  return InternalLogId.allocate(BinderServerTransport.class, details);
}
}
/**
 * Verifies that moving from {@code current} to {@code next} is a legal state transition.
 *
 * <p>An illegal transition fails the {@code checkState} precondition. Requesting NOT_STARTED
 * as the next state is never valid and raises an {@link AssertionError}.
 *
 * @param current the state the transport is in
 * @param next the state being requested
 */
private static void checkTransition(TransportState current, TransportState next) {
  boolean legal;
  switch (next) {
    case SETUP:
      legal = current == TransportState.NOT_STARTED;
      break;
    case READY:
      legal = current == TransportState.NOT_STARTED || current == TransportState.SETUP;
      break;
    case SHUTDOWN:
      legal =
          current == TransportState.NOT_STARTED
              || current == TransportState.SETUP
              || current == TransportState.READY;
      break;
    case SHUTDOWN_TERMINATED:
      legal = current == TransportState.SHUTDOWN;
      break;
    default:
      throw new AssertionError();
  }
  checkState(legal);
}
/** Returns the live map of ongoing calls (not a copy), keyed by call id. For tests only. */
@VisibleForTesting
Map<Integer, Inbound<?>> getOngoingCalls() {
return ongoingCalls;
}
/**
 * Maps a {@link RemoteException} thrown by a binder transaction to a gRPC status, keeping the
 * exception as the cause.
 *
 * @return UNAVAILABLE for {@link DeadObjectException} and {@link TransactionTooLargeException},
 *     INTERNAL for anything else
 */
private static Status statusFromRemoteException(RemoteException e) {
  // These are to be expected from time to time and can simply be retried.
  boolean retryable =
      e instanceof DeadObjectException || e instanceof TransactionTooLargeException;
  // Otherwise, this exception from transact is unexpected.
  Status base = retryable ? Status.UNAVAILABLE : Status.INTERNAL;
  return base.withCause(e);
}
}