blob: 542e84b9c8b28b6315ebbba17b72d64128bac400 [file] [log] [blame]
/*
* Copyright 2018 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ManagedChannel;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
/**
 * A {@link ManagedChannel} wrapper that notices when a wrapper instance has been garbage collected
 * without {@link #shutdown()} or {@link #shutdownNow()} having been called on it, and reports each
 * such orphan through the class logger at {@link Level#SEVERE}.
 *
 * <p>Detection is opportunistic: the reference queue is only drained when a new tracking reference
 * is created or when an existing one is cleared.
 */
final class ManagedChannelOrphanWrapper extends ForwardingManagedChannel {
  private static final ReferenceQueue<ManagedChannelOrphanWrapper> refqueue =
      new ReferenceQueue<>();
  // Keeps the reference objects themselves strongly reachable so they are not garbage collected.
  private static final ConcurrentMap<ManagedChannelReference, ManagedChannelReference> refs =
      new ConcurrentHashMap<>();
  private static final Logger logger =
      Logger.getLogger(ManagedChannelOrphanWrapper.class.getName());

  /** Weak reference to this wrapper; cleared explicitly once the channel is shut down. */
  private final ManagedChannelReference orphanRef;

  /** Wraps {@code delegate}, tracking it with the shared static queue and reference map. */
  ManagedChannelOrphanWrapper(ManagedChannel delegate) {
    this(delegate, refqueue, refs);
  }

  /**
   * Wraps {@code delegate}, tracking it with the supplied queue and reference map.
   *
   * @param delegate the channel every call is forwarded to
   * @param refqueue the queue the tracking reference is registered with
   * @param refs the map that retains the tracking reference until it is cleared
   */
  @VisibleForTesting
  ManagedChannelOrphanWrapper(
      ManagedChannel delegate,
      ReferenceQueue<ManagedChannelOrphanWrapper> refqueue,
      ConcurrentMap<ManagedChannelReference, ManagedChannelReference> refs) {
    super(delegate);
    this.orphanRef = new ManagedChannelReference(this, delegate, refqueue, refs);
  }

  /** Marks this wrapper as shut down and stops tracking it, then forwards to the delegate. */
  @Override
  public ManagedChannel shutdown() {
    orphanRef.clearSafely();
    return super.shutdown();
  }

  /** Marks this wrapper as shut down and stops tracking it, then forwards to the delegate. */
  @Override
  public ManagedChannel shutdownNow() {
    orphanRef.clearSafely();
    return super.shutdownNow();
  }

  /**
   * Weak reference to a wrapper, carrying what is needed to report the wrapper as orphaned after
   * it has been collected: the channel's string form, an optional allocation stack trace and a
   * flag recording whether shutdown was requested.
   */
  @VisibleForTesting
  static final class ManagedChannelReference extends WeakReference<ManagedChannelOrphanWrapper> {
    private static final String ALLOCATION_SITE_PROPERTY_NAME =
        "io.grpc.ManagedChannel.enableAllocationTracking";
    // Allocation tracking is on unless the system property is set to something other than "true".
    private static final boolean ENABLE_ALLOCATION_TRACKING =
        Boolean.parseBoolean(System.getProperty(ALLOCATION_SITE_PROPERTY_NAME, "true"));
    // Shared placeholder used in place of a real allocation site when tracking is disabled.
    private static final RuntimeException missingCallSite = missingCallSite();

    private final ReferenceQueue<ManagedChannelOrphanWrapper> refqueue;
    private final ConcurrentMap<ManagedChannelReference, ManagedChannelReference> refs;
    private final String channelStr;
    private final Reference<RuntimeException> allocationSite;
    private final AtomicBoolean shutdown = new AtomicBoolean();

    /**
     * Creates the reference, registers it in {@code refs} and drains {@code refqueue}.
     *
     * @param orphanable the wrapper whose reachability is being tracked
     * @param channel the wrapped channel; only its {@code toString()} value is retained
     * @param refqueue the queue this reference is registered with
     * @param refs the map that retains this reference until it is cleared
     */
    ManagedChannelReference(
        ManagedChannelOrphanWrapper orphanable,
        ManagedChannel channel,
        ReferenceQueue<ManagedChannelOrphanWrapper> refqueue,
        ConcurrentMap<ManagedChannelReference, ManagedChannelReference> refs) {
      super(orphanable, refqueue);
      RuntimeException site;
      if (ENABLE_ALLOCATION_TRACKING) {
        site = new RuntimeException("ManagedChannel allocation site");
      } else {
        site = missingCallSite;
      }
      this.allocationSite = new SoftReference<>(site);
      this.channelStr = channel.toString();
      this.refqueue = refqueue;
      this.refs = refs;
      this.refs.put(this, this);
      // Every construction doubles as an opportunity to report previously orphaned channels.
      cleanQueue(refqueue);
    }

    /**
     * Clears this reference, drops it from {@link #refs} and then drains the reference queue.
     *
     * <p>The JVM does *not* invoke this method itself. Being a weak reference, the referent is
     * cleared by the JVM automatically, but that alone does not remove the entry from
     * {@link #refs}. Clearing here keeps this reference from ending up on the reference queue.
     */
    @Override
    public void clear() {
      clearInternal();
      // Draining here cleans the queue periodically as long as at least some channels are being
      // shut down properly.
      cleanQueue(refqueue);
    }

    /**
     * Sets the shutdown flag and clears the reference, clearing only on the first call. Safe to
     * call concurrently.
     */
    private void clearSafely() {
      boolean alreadyShutdown = shutdown.getAndSet(true);
      if (alreadyShutdown) {
        return;
      }
      clear();
    }

    /** Clears without draining the queue, so that {@link #cleanQueue} can call it safely. */
    private void clearInternal() {
      super.clear();
      refs.remove(this);
      allocationSite.clear();
    }

    /** Builds the stack-trace-free placeholder used when allocation tracking is disabled. */
    private static RuntimeException missingCallSite() {
      String message =
          "ManagedChannel allocation site not recorded. Set -D"
              + ALLOCATION_SITE_PROPERTY_NAME + "=true to enable it";
      RuntimeException placeholder = new RuntimeException(message);
      placeholder.setStackTrace(new StackTraceElement[0]);
      return placeholder;
    }

    /**
     * Drains {@code refqueue}, clearing every polled reference and logging those whose shutdown
     * flag was never set.
     *
     * @param refqueue the queue to drain
     * @return the number of polled references whose shutdown flag was not set, counted whether or
     *     not the logger accepted the record
     */
    @VisibleForTesting
    static int cleanQueue(ReferenceQueue<ManagedChannelOrphanWrapper> refqueue) {
      int orphanedChannels = 0;
      for (Reference<? extends ManagedChannelOrphanWrapper> polled = refqueue.poll();
          polled != null;
          polled = refqueue.poll()) {
        ManagedChannelReference ref = (ManagedChannelReference) polled;
        // Read the allocation site first: clearInternal() clears the soft reference holding it.
        RuntimeException maybeAllocationSite = ref.allocationSite.get();
        ref.clearInternal(); // technically the reference is gone already.
        if (ref.shutdown.get()) {
          continue;
        }
        orphanedChannels++;
        Level level = Level.SEVERE;
        if (!logger.isLoggable(level)) {
          continue;
        }
        String fmt =
            "*~*~*~ Channel {0} was not shutdown properly!!! ~*~*~*"
                + System.getProperty("line.separator")
                + " Make sure to call shutdown()/shutdownNow() and wait "
                + "until awaitTermination() returns true.";
        LogRecord lr = new LogRecord(level, fmt);
        lr.setLoggerName(logger.getName());
        lr.setParameters(new Object[] {ref.channelStr});
        lr.setThrown(maybeAllocationSite);
        // Logged from this method so the record's inferred source method is unchanged.
        logger.log(lr);
      }
      return orphanedChannels;
    }
  }
}