blob: 38b871d33ad8620269cd39b6f2e842594d6a56a2 [file] [log] [blame]
/*
* Copyright 2016 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.cronet;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalLogId;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.SecurityLevel;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/**
* A cronet-based {@link ConnectionClientTransport} implementation.
*/
class CronetClientTransport implements ConnectionClientTransport {
private final InternalLogId logId = InternalLogId.allocate(getClass().getName());
private final InetSocketAddress address;
private final String authority;
private final String userAgent;
private Listener listener;
private final Object lock = new Object();
@GuardedBy("lock")
private final Set<CronetClientStream> streams =
new HashSet<CronetClientStream>();
private final Executor executor;
private final int maxMessageSize;
private final boolean alwaysUsePut;
private final TransportTracer transportTracer;
private final Attributes attrs;
// Indicates the transport is in go-away state: no new streams will be processed,
// but existing streams may continue.
@GuardedBy("lock")
private boolean goAway;
// Used to indicate the special phase while we are going to enter go-away state but before
// goAway is turned to true, see the comment at where this is set about why it is needed.
@GuardedBy("lock")
private boolean startedGoAway;
@GuardedBy("lock")
private Status goAwayStatus;
@GuardedBy("lock")
private boolean stopped;
@GuardedBy("lock")
// Whether this transport has started.
private boolean started;
private StreamBuilderFactory streamFactory;
CronetClientTransport(
StreamBuilderFactory streamFactory,
InetSocketAddress address,
String authority,
@Nullable String userAgent,
Executor executor,
int maxMessageSize,
boolean alwaysUsePut,
TransportTracer transportTracer) {
this.address = Preconditions.checkNotNull(address, "address");
this.authority = authority;
this.userAgent = GrpcUtil.getGrpcUserAgent("cronet", userAgent);
this.maxMessageSize = maxMessageSize;
this.alwaysUsePut = alwaysUsePut;
this.executor = Preconditions.checkNotNull(executor, "executor");
this.streamFactory = Preconditions.checkNotNull(streamFactory, "streamFactory");
this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
this.attrs = Attributes.newBuilder()
.set(CallCredentials.ATTR_AUTHORITY, authority)
.set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
.build();
}
@Override
public ListenableFuture<SocketStats> getStats() {
SettableFuture<SocketStats> f = SettableFuture.create();
f.set(null);
return f;
}
@Override
public CronetClientStream newStream(final MethodDescriptor<?, ?> method, final Metadata headers,
final CallOptions callOptions) {
Preconditions.checkNotNull(method, "method");
Preconditions.checkNotNull(headers, "headers");
final String defaultPath = "/" + method.getFullMethodName();
final String url = "https://" + authority + defaultPath;
final StatsTraceContext statsTraceCtx =
StatsTraceContext.newClientContext(callOptions, headers);
class StartCallback implements Runnable {
final CronetClientStream clientStream = new CronetClientStream(
url, userAgent, executor, headers, CronetClientTransport.this, this, lock, maxMessageSize,
alwaysUsePut, method, statsTraceCtx, callOptions, transportTracer);
@Override
public void run() {
synchronized (lock) {
if (goAway) {
clientStream.transportState().transportReportStatus(goAwayStatus, true, new Metadata());
} else if (started) {
startStream(clientStream);
} else {
throw new AssertionError("Transport is not started");
}
}
}
}
return new StartCallback().clientStream;
}
@GuardedBy("lock")
private void startStream(CronetClientStream stream) {
streams.add(stream);
stream.transportState().start(streamFactory);
}
@Override
public Runnable start(Listener listener) {
this.listener = Preconditions.checkNotNull(listener, "listener");
synchronized (lock) {
started = true;
}
return new Runnable() {
@Override
public void run() {
// Listener callbacks should not be called simultaneously
CronetClientTransport.this.listener.transportReady();
}
};
}
@Override
public String toString() {
return super.toString() + "(" + address + ")";
}
public void shutdown() {
shutdown(Status.UNAVAILABLE.withDescription("Transport stopped"));
}
@Override
public void shutdown(Status status) {
synchronized (lock) {
if (goAway) {
return;
}
}
startGoAway(status);
}
@Override
public void shutdownNow(Status status) {
shutdown(status);
ArrayList<CronetClientStream> streamsCopy;
synchronized (lock) {
// A copy is always necessary since cancel() can call finishStream() which calls
// streams.remove()
streamsCopy = new ArrayList<>(streams);
}
for (int i = 0; i < streamsCopy.size(); i++) {
// Avoid deadlock by calling into stream without lock held
streamsCopy.get(i).cancel(status);
}
stopIfNecessary();
}
@Override
public Attributes getAttributes() {
return attrs;
}
private void startGoAway(Status status) {
synchronized (lock) {
if (startedGoAway) {
// Another go-away is in progress, ignore this one.
return;
}
// We use startedGoAway here instead of goAway, because once the goAway becomes true, other
// thread in stopIfNecessary() may stop the transport and cause the
// listener.transportTerminated() be called before listener.transportShutdown().
startedGoAway = true;
}
listener.transportShutdown(status);
synchronized (lock) {
goAway = true;
goAwayStatus = status;
}
stopIfNecessary();
}
@Override
public void ping(final PingCallback callback, Executor executor) {
// TODO(ericgribkoff): depend on cronet implemenetation
throw new UnsupportedOperationException();
}
@Override
public InternalLogId getLogId() {
return logId;
}
/**
* When the transport is in goAway state, we should stop it once all active streams finish.
*/
void stopIfNecessary() {
synchronized (lock) {
if (goAway && !stopped && streams.size() == 0) {
stopped = true;
} else {
return;
}
}
listener.transportTerminated();
}
void finishStream(CronetClientStream stream, Status status) {
synchronized (lock) {
if (streams.remove(stream)) {
boolean isCancelled = (status.getCode() == Code.CANCELLED
|| status.getCode() == Code.DEADLINE_EXCEEDED);
stream.transportState().transportReportStatus(status, isCancelled, new Metadata());
} else {
return;
}
}
stopIfNecessary();
}
}