blob: 26392acc733c4363d53fc56f650753a1775de2c9 [file] [log] [blame]
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME;
import static io.grpc.xds.XdsResourceType.ParsedResource;
import static io.grpc.xds.XdsResourceType.ValidatedResourceUpdate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Any;
import io.grpc.ChannelCredentials;
import io.grpc.Context;
import io.grpc.Grpc;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.TimeProvider;
import io.grpc.xds.Bootstrapper.AuthorityInfo;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats;
import io.grpc.xds.XdsClient.ResourceStore;
import io.grpc.xds.XdsClient.XdsResponseHandler;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* XdsClient implementation for client side usages.
*/
final class XdsClientImpl extends XdsClient implements XdsResponseHandler, ResourceStore {
private static boolean LOG_XDS_NODE_ID = Boolean.parseBoolean(
System.getenv("GRPC_LOG_XDS_NODE_ID"));
private static final Logger classLogger = Logger.getLogger(XdsClientImpl.class.getName());
// Longest time to wait, since the subscription to some resource, for concluding its absence.
@VisibleForTesting
static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.log(
XdsLogLevel.ERROR,
"Uncaught exception in XdsClient SynchronizationContext. Panic!",
e);
// TODO(chengyuanzhang): better error handling.
throw new AssertionError(e);
}
});
private final FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry();
private final LoadBalancerRegistry loadBalancerRegistry
= LoadBalancerRegistry.getDefaultRegistry();
private final Map<ServerInfo, AbstractXdsClient> serverChannelMap = new HashMap<>();
private final Map<XdsResourceType<? extends ResourceUpdate>,
Map<String, ResourceSubscriber<? extends ResourceUpdate>>>
resourceSubscribers = new HashMap<>();
private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls = new HashMap<>();
private final LoadStatsManager2 loadStatsManager;
private final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>();
private final XdsChannelFactory xdsChannelFactory;
private final Bootstrapper.BootstrapInfo bootstrapInfo;
private final Context context;
private final ScheduledExecutorService timeService;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Supplier<Stopwatch> stopwatchSupplier;
private final TimeProvider timeProvider;
private boolean reportingLoad;
private final TlsContextManager tlsContextManager;
private final InternalLogId logId;
private final XdsLogger logger;
private volatile boolean isShutdown;
XdsClientImpl(
XdsChannelFactory xdsChannelFactory,
Bootstrapper.BootstrapInfo bootstrapInfo,
Context context,
ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier,
TimeProvider timeProvider,
TlsContextManager tlsContextManager) {
this.xdsChannelFactory = xdsChannelFactory;
this.bootstrapInfo = bootstrapInfo;
this.context = context;
this.timeService = timeService;
loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
this.backoffPolicyProvider = backoffPolicyProvider;
this.stopwatchSupplier = stopwatchSupplier;
this.timeProvider = timeProvider;
this.tlsContextManager = checkNotNull(tlsContextManager, "tlsContextManager");
logId = InternalLogId.allocate("xds-client", null);
logger = XdsLogger.withLogId(logId);
logger.log(XdsLogLevel.INFO, "Created");
if (LOG_XDS_NODE_ID) {
classLogger.log(Level.INFO, "xDS node ID: {0}", bootstrapInfo.node().getId());
}
}
private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) {
syncContext.throwIfNotInThisSynchronizationContext();
if (serverChannelMap.containsKey(serverInfo)) {
return;
}
AbstractXdsClient xdsChannel = new AbstractXdsClient(
xdsChannelFactory,
serverInfo,
bootstrapInfo.node(),
this,
this,
context,
timeService,
syncContext,
backoffPolicyProvider,
stopwatchSupplier);
LoadReportClient lrsClient = new LoadReportClient(
loadStatsManager, xdsChannel.channel(), context, serverInfo.useProtocolV3(),
bootstrapInfo.node(), syncContext, timeService, backoffPolicyProvider, stopwatchSupplier);
serverChannelMap.put(serverInfo, xdsChannel);
serverLrsClientMap.put(serverInfo, lrsClient);
}
@Override
public void handleResourceResponse(
XdsResourceType<?> xdsResourceType, ServerInfo serverInfo, String versionInfo,
List<Any> resources, String nonce) {
syncContext.throwIfNotInThisSynchronizationContext();
if (xdsResourceType == null) {
logger.log(XdsLogLevel.WARNING, "Ignore an unknown type of DiscoveryResponse");
return;
}
Set<String> toParseResourceNames = null;
if (!(xdsResourceType == XdsListenerResource.getInstance()
|| xdsResourceType == XdsRouteConfigureResource.getInstance())
&& resourceSubscribers.containsKey(xdsResourceType)) {
toParseResourceNames = resourceSubscribers.get(xdsResourceType).keySet();
}
XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, versionInfo, nonce,
bootstrapInfo, filterRegistry, loadBalancerRegistry, tlsContextManager,
toParseResourceNames);
handleResourceUpdate(args, resources, xdsResourceType);
}
@Override
public void handleStreamClosed(Status error) {
syncContext.throwIfNotInThisSynchronizationContext();
cleanUpResourceTimers();
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
resourceSubscribers.values()) {
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
subscriber.onError(error);
}
}
}
@Override
public void handleStreamRestarted(ServerInfo serverInfo) {
syncContext.throwIfNotInThisSynchronizationContext();
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
resourceSubscribers.values()) {
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
if (subscriber.serverInfo.equals(serverInfo)) {
subscriber.restartTimer();
}
}
}
}
@Override
void shutdown() {
syncContext.execute(
new Runnable() {
@Override
public void run() {
if (isShutdown) {
return;
}
isShutdown = true;
for (AbstractXdsClient xdsChannel : serverChannelMap.values()) {
xdsChannel.shutdown();
}
if (reportingLoad) {
for (final LoadReportClient lrsClient : serverLrsClientMap.values()) {
lrsClient.stopLoadReporting();
}
}
cleanUpResourceTimers();
}
});
}
@Override
boolean isShutDown() {
return isShutdown;
}
@Override
public Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl() {
return Collections.unmodifiableMap(subscribedResourceTypeUrls);
}
@Nullable
@Override
public Collection<String> getSubscribedResources(ServerInfo serverInfo,
XdsResourceType<? extends ResourceUpdate> type) {
Map<String, ResourceSubscriber<? extends ResourceUpdate>> resources =
resourceSubscribers.getOrDefault(type, Collections.emptyMap());
ImmutableSet.Builder<String> builder = ImmutableSet.builder();
for (String key : resources.keySet()) {
if (resources.get(key).serverInfo.equals(serverInfo)) {
builder.add(key);
}
}
Collection<String> retVal = builder.build();
return retVal.isEmpty() ? null : retVal;
}
// As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic.
// ResourceTypes that do not have subscribers does not show up in the snapshot keys.
@Override
ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
getSubscribedResourcesMetadataSnapshot() {
final SettableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>> future =
SettableFuture.create();
syncContext.execute(new Runnable() {
@Override
public void run() {
// A map from a "resource type" to a map ("resource name": "resource metadata")
ImmutableMap.Builder<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataSnapshot =
ImmutableMap.builder();
for (XdsResourceType<?> resourceType: resourceSubscribers.keySet()) {
ImmutableMap.Builder<String, ResourceMetadata> metadataMap = ImmutableMap.builder();
for (Map.Entry<String, ResourceSubscriber<? extends ResourceUpdate>> resourceEntry
: resourceSubscribers.get(resourceType).entrySet()) {
metadataMap.put(resourceEntry.getKey(), resourceEntry.getValue().metadata);
}
metadataSnapshot.put(resourceType, metadataMap.buildOrThrow());
}
future.set(metadataSnapshot.buildOrThrow());
}
});
return future;
}
@Override
TlsContextManager getTlsContextManager() {
return tlsContextManager;
}
@Override
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher) {
syncContext.execute(new Runnable() {
@Override
@SuppressWarnings("unchecked")
public void run() {
if (!resourceSubscribers.containsKey(type)) {
resourceSubscribers.put(type, new HashMap<>());
subscribedResourceTypeUrls.put(type.typeUrl(), type);
subscribedResourceTypeUrls.put(type.typeUrlV2(), type);
}
ResourceSubscriber<T> subscriber =
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
if (subscriber == null) {
logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName);
subscriber = new ResourceSubscriber<>(type, resourceName);
resourceSubscribers.get(type).put(resourceName, subscriber);
if (subscriber.xdsChannel != null) {
subscriber.xdsChannel.adjustResourceSubscription(type);
}
}
subscriber.addWatcher(watcher);
}
});
}
@Override
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
syncContext.execute(new Runnable() {
@Override
@SuppressWarnings("unchecked")
public void run() {
ResourceSubscriber<T> subscriber =
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.cancelResourceWatch();
resourceSubscribers.get(type).remove(resourceName);
subscribedResourceTypeUrls.remove(type.typeUrl());
subscribedResourceTypeUrls.remove(type.typeUrlV2());
if (subscriber.xdsChannel != null) {
subscriber.xdsChannel.adjustResourceSubscription(type);
}
if (resourceSubscribers.get(type).isEmpty()) {
resourceSubscribers.remove(type);
}
}
}
});
}
@Override
ClusterDropStats addClusterDropStats(
final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName) {
ClusterDropStats dropCounter =
loadStatsManager.getClusterDropStats(clusterName, edsServiceName);
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!reportingLoad) {
serverLrsClientMap.get(serverInfo).startLoadReporting();
reportingLoad = true;
}
}
});
return dropCounter;
}
@Override
ClusterLocalityStats addClusterLocalityStats(
final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName,
Locality locality) {
ClusterLocalityStats loadCounter =
loadStatsManager.getClusterLocalityStats(clusterName, edsServiceName, locality);
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!reportingLoad) {
serverLrsClientMap.get(serverInfo).startLoadReporting();
reportingLoad = true;
}
}
});
return loadCounter;
}
@Override
Bootstrapper.BootstrapInfo getBootstrapInfo() {
return bootstrapInfo;
}
@Override
public String toString() {
return logId.toString();
}
private void cleanUpResourceTimers() {
for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
subscriber.stopTimer();
}
}
}
@SuppressWarnings("unchecked")
private <T extends ResourceUpdate> void handleResourceUpdate(XdsResourceType.Args args,
List<Any> resources,
XdsResourceType<T> xdsResourceType) {
ValidatedResourceUpdate<T> result = xdsResourceType.parse(args, resources);
logger.log(XdsLogger.XdsLogLevel.INFO,
"Received {0} Response version {1} nonce {2}. Parsed resources: {3}",
xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources);
Map<String, ParsedResource<T>> parsedResources = result.parsedResources;
Set<String> invalidResources = result.invalidResources;
List<String> errors = result.errors;
String errorDetail = null;
if (errors.isEmpty()) {
checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors");
serverChannelMap.get(args.serverInfo).ackResponse(xdsResourceType, args.versionInfo,
args.nonce);
} else {
errorDetail = Joiner.on('\n').join(errors);
logger.log(XdsLogLevel.WARNING,
"Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
xdsResourceType.typeName(), args.versionInfo, args.nonce, errorDetail);
serverChannelMap.get(args.serverInfo).nackResponse(xdsResourceType, args.nonce, errorDetail);
}
long updateTime = timeProvider.currentTimeNanos();
Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscribedResources =
resourceSubscribers.getOrDefault(xdsResourceType, Collections.emptyMap());
for (Map.Entry<String, ResourceSubscriber<?>> entry : subscribedResources.entrySet()) {
String resourceName = entry.getKey();
ResourceSubscriber<T> subscriber = (ResourceSubscriber<T>) entry.getValue();
if (parsedResources.containsKey(resourceName)) {
// Happy path: the resource updated successfully. Notify the watchers of the update.
subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime);
continue;
}
if (invalidResources.contains(resourceName)) {
// The resource update is invalid. Capture the error without notifying the watchers.
subscriber.onRejected(args.versionInfo, updateTime, errorDetail);
}
// Nothing else to do for incremental ADS resources.
if (!xdsResourceType.isFullStateOfTheWorld()) {
continue;
}
// Handle State of the World ADS: invalid resources.
if (invalidResources.contains(resourceName)) {
// The resource is missing. Reuse the cached resource if possible.
if (subscriber.data == null) {
// No cached data. Notify the watchers of an invalid update.
subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail));
}
continue;
}
// For State of the World services, notify watchers when their watched resource is missing
// from the ADS update.
subscriber.onAbsent();
}
}
/**
* Tracks a single subscribed resource.
*/
private final class ResourceSubscriber<T extends ResourceUpdate> {
@Nullable private final ServerInfo serverInfo;
@Nullable private final AbstractXdsClient xdsChannel;
private final XdsResourceType<T> type;
private final String resource;
private final Set<ResourceWatcher<T>> watchers = new HashSet<>();
@Nullable private T data;
private boolean absent;
// Tracks whether the deletion has been ignored per bootstrap server feature.
// See https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md
private boolean resourceDeletionIgnored;
@Nullable private ScheduledHandle respTimer;
@Nullable private ResourceMetadata metadata;
@Nullable private String errorDescription;
ResourceSubscriber(XdsResourceType<T> type, String resource) {
syncContext.throwIfNotInThisSynchronizationContext();
this.type = type;
this.resource = resource;
this.serverInfo = getServerInfo(resource);
if (serverInfo == null) {
this.errorDescription = "Wrong configuration: xds server does not exist for resource "
+ resource;
this.xdsChannel = null;
return;
}
// Initialize metadata in UNKNOWN state to cover the case when resource subscriber,
// is created but not yet requested because the client is in backoff.
this.metadata = ResourceMetadata.newResourceMetadataUnknown();
maybeCreateXdsChannelWithLrs(serverInfo);
this.xdsChannel = serverChannelMap.get(serverInfo);
if (xdsChannel.isInBackoff()) {
return;
}
restartTimer();
}
@Nullable
private ServerInfo getServerInfo(String resource) {
if (BootstrapperImpl.enableFederation && resource.startsWith(XDSTP_SCHEME)) {
URI uri = URI.create(resource);
String authority = uri.getAuthority();
if (authority == null) {
authority = "";
}
AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(authority);
if (authorityInfo == null || authorityInfo.xdsServers().isEmpty()) {
return null;
}
return authorityInfo.xdsServers().get(0);
}
return bootstrapInfo.servers().get(0); // use first server
}
void addWatcher(ResourceWatcher<T> watcher) {
checkArgument(!watchers.contains(watcher), "watcher %s already registered", watcher);
watchers.add(watcher);
if (errorDescription != null) {
watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription));
return;
}
if (data != null) {
notifyWatcher(watcher, data);
} else if (absent) {
watcher.onResourceDoesNotExist(resource);
}
}
void removeWatcher(ResourceWatcher<T> watcher) {
checkArgument(watchers.contains(watcher), "watcher %s not registered", watcher);
watchers.remove(watcher);
}
void restartTimer() {
if (data != null || absent) { // resource already resolved
return;
}
class ResourceNotFound implements Runnable {
@Override
public void run() {
logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout",
type, resource);
respTimer = null;
onAbsent();
}
@Override
public String toString() {
return type + this.getClass().getSimpleName();
}
}
// Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED.
metadata = ResourceMetadata.newResourceMetadataRequested();
respTimer = syncContext.schedule(
new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS,
timeService);
}
void stopTimer() {
if (respTimer != null && respTimer.isPending()) {
respTimer.cancel();
respTimer = null;
}
}
void cancelResourceWatch() {
if (isWatched()) {
throw new IllegalStateException("Can't cancel resource watch with active watchers present");
}
stopTimer();
String message = "Unsubscribing {0} resource {1} from server {2}";
XdsLogLevel logLevel = XdsLogLevel.INFO;
if (resourceDeletionIgnored) {
message += " for which we previously ignored a deletion";
logLevel = XdsLogLevel.FORCE_INFO;
}
logger.log(logLevel, message, type, resource,
serverInfo != null ? serverInfo.target() : "unknown");
}
boolean isWatched() {
return !watchers.isEmpty();
}
void onData(ParsedResource<T> parsedResource, String version, long updateTime) {
if (respTimer != null && respTimer.isPending()) {
respTimer.cancel();
respTimer = null;
}
this.metadata = ResourceMetadata
.newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime);
ResourceUpdate oldData = this.data;
this.data = parsedResource.getResourceUpdate();
absent = false;
if (resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
+ "of resource for which we previously ignored a deletion: type {1} name {2}",
serverInfo != null ? serverInfo.target() : "unknown", type, resource);
resourceDeletionIgnored = false;
}
if (!Objects.equals(oldData, data)) {
for (ResourceWatcher<T> watcher : watchers) {
notifyWatcher(watcher, data);
}
}
}
void onAbsent() {
if (respTimer != null && respTimer.isPending()) { // too early to conclude absence
return;
}
// Ignore deletion of State of the World resources when this feature is on,
// and the resource is reusable.
boolean ignoreResourceDeletionEnabled =
serverInfo != null && serverInfo.ignoreResourceDeletion();
if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) {
if (!resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_WARNING,
"xds server {0}: ignoring deletion for resource type {1} name {2}}",
serverInfo.target(), type, resource);
resourceDeletionIgnored = true;
}
return;
}
logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
if (!absent) {
data = null;
absent = true;
metadata = ResourceMetadata.newResourceMetadataDoesNotExist();
for (ResourceWatcher<T> watcher : watchers) {
watcher.onResourceDoesNotExist(resource);
}
}
}
void onError(Status error) {
if (respTimer != null && respTimer.isPending()) {
respTimer.cancel();
respTimer = null;
}
// Include node ID in xds failures to allow cross-referencing with control plane logs
// when debugging.
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
Status errorAugmented = Status.fromCode(error.getCode())
.withDescription(description + "nodeID: " + bootstrapInfo.node().getId())
.withCause(error.getCause());
for (ResourceWatcher<T> watcher : watchers) {
watcher.onError(errorAugmented);
}
}
void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails) {
metadata = ResourceMetadata
.newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails);
}
private void notifyWatcher(ResourceWatcher<T> watcher, T update) {
watcher.onChanged(update);
}
}
static final class ResourceInvalidException extends Exception {
private static final long serialVersionUID = 0L;
ResourceInvalidException(String message) {
super(message, null, false, false);
}
ResourceInvalidException(String message, Throwable cause) {
super(cause != null ? message + ": " + cause.getMessage() : message, cause, false, false);
}
}
abstract static class XdsChannelFactory {
static final XdsChannelFactory DEFAULT_XDS_CHANNEL_FACTORY = new XdsChannelFactory() {
@Override
ManagedChannel create(ServerInfo serverInfo) {
String target = serverInfo.target();
ChannelCredentials channelCredentials = serverInfo.channelCredentials();
return Grpc.newChannelBuilder(target, channelCredentials)
.keepAliveTime(5, TimeUnit.MINUTES)
.build();
}
};
abstract ManagedChannel create(ServerInfo serverInfo);
}
}