census: add delayed name resolution tracing annotation (#10044)

diff --git a/api/src/main/java/io/grpc/ClientStreamTracer.java b/api/src/main/java/io/grpc/ClientStreamTracer.java
index 8b3520a..5d55ae1 100644
--- a/api/src/main/java/io/grpc/ClientStreamTracer.java
+++ b/api/src/main/java/io/grpc/ClientStreamTracer.java
@@ -27,6 +27,12 @@
 @ExperimentalApi("https://github.com/grpc/grpc-java/issues/2861")
 @ThreadSafe
 public abstract class ClientStreamTracer extends StreamTracer {
+  /**
+   * The call was delayed due to waiting for name resolution result.
+   */
+  public static final CallOptions.Key<Boolean> NAME_RESOLUTION_DELAYED =
+      CallOptions.Key.createWithDefault("io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED",
+          false);
 
   /**
    * The stream is being created on a ready transport.
diff --git a/census/src/main/java/io/grpc/census/CensusTracingModule.java b/census/src/main/java/io/grpc/census/CensusTracingModule.java
index dfe4377..4afa08b 100644
--- a/census/src/main/java/io/grpc/census/CensusTracingModule.java
+++ b/census/src/main/java/io/grpc/census/CensusTracingModule.java
@@ -17,6 +17,7 @@
 package io.grpc.census;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
 import static io.grpc.census.internal.ObservabilityCensusConstants.CLIENT_TRACE_SPAN_CONTEXT_KEY;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -269,6 +270,9 @@
           "previous-rpc-attempts", AttributeValue.longAttributeValue(info.getPreviousAttempts()));
       attemptSpan.putAttribute(
           "transparent-retry", AttributeValue.booleanAttributeValue(info.isTransparentRetry()));
+      if (info.getCallOptions().getOption(NAME_RESOLUTION_DELAYED)) {
+        span.addAnnotation("Delayed name resolution complete");
+      }
       return new ClientTracer(attemptSpan, span, tracingHeader, isSampledToLocalTracing);
     }
 
diff --git a/census/src/test/java/io/grpc/census/CensusModulesTest.java b/census/src/test/java/io/grpc/census/CensusModulesTest.java
index 2447b2c..12c71d7 100644
--- a/census/src/test/java/io/grpc/census/CensusModulesTest.java
+++ b/census/src/test/java/io/grpc/census/CensusModulesTest.java
@@ -18,6 +18,7 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.common.truth.Truth.assertWithMessage;
+import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
 import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRIES_PER_CALL;
 import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRY_DELAY_PER_CALL;
 import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.TRANSPARENT_RETRIES_PER_CALL;
@@ -132,7 +133,8 @@
   private static final CallOptions CALL_OPTIONS =
       CallOptions.DEFAULT.withOption(CUSTOM_OPTION, "customvalue");
   private static final ClientStreamTracer.StreamInfo STREAM_INFO =
-      ClientStreamTracer.StreamInfo.newBuilder().build();
+      ClientStreamTracer.StreamInfo.newBuilder()
+          .setCallOptions(CallOptions.DEFAULT.withOption(NAME_RESOLUTION_DELAYED, true)).build();
 
   private static class StringInputStream extends InputStream {
     final String string;
@@ -768,6 +770,7 @@
         .putAttribute("previous-rpc-attempts", AttributeValue.longAttributeValue(0));
     inOrder.verify(spyAttemptSpan)
         .putAttribute("transparent-retry", AttributeValue.booleanAttributeValue(false));
+    inOrder.verify(spyClientSpan).addAnnotation("Delayed name resolution complete");
     inOrder.verify(spyAttemptSpan).addAnnotation("Delayed LB pick complete");
     inOrder.verify(spyAttemptSpan, times(2)).addMessageEvent(messageEventCaptor.capture());
     List<MessageEvent> events = messageEventCaptor.getAllValues();
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 4ab29d2..696a338 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -19,6 +19,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
+import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
 import static io.grpc.ConnectivityState.IDLE;
 import static io.grpc.ConnectivityState.SHUTDOWN;
 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
@@ -1085,7 +1086,8 @@
         ClientCall<ReqT, RespT> realCall;
         Context previous = context.attach();
         try {
-          realCall = newClientCall(method, callOptions);
+          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED, true);
+          realCall = newClientCall(method, delayResolutionOption);
         } finally {
           context.detach(previous);
         }
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index 3491eab..055b648 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -19,6 +19,7 @@
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.common.truth.Truth.assertWithMessage;
+import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
 import static io.grpc.ConnectivityState.CONNECTING;
 import static io.grpc.ConnectivityState.IDLE;
 import static io.grpc.ConnectivityState.READY;
@@ -1072,6 +1073,50 @@
   }
 
   @Test
+  public void delayedNameResolution() {
+    ClientStream mockStream = mock(ClientStream.class);
+    final ClientStreamTracer tracer = new ClientStreamTracer() {};
+    ClientStreamTracer.Factory factory = new ClientStreamTracer.Factory() {
+      @Override
+      public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
+        return tracer;
+      }
+    };
+    FakeNameResolverFactory nsFactory = new FakeNameResolverFactory.Builder(expectedUri)
+        .setResolvedAtStart(false).build();
+    channelBuilder.nameResolverFactory(nsFactory);
+    createChannel();
+
+    CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory);
+    ClientCall<String, Integer> call = channel.newCall(method, callOptions);
+    call.start(mockCallListener, new Metadata());
+
+    nsFactory.allResolved();
+    Subchannel subchannel =
+        createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
+    requestConnectionSafely(helper, subchannel);
+    MockClientTransportInfo transportInfo = transports.poll();
+    transportInfo.listener.transportReady();
+    ClientTransport mockTransport = transportInfo.transport;
+    when(mockTransport.newStream(
+        any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class),
+        ArgumentMatchers.<ClientStreamTracer[]>any()))
+        .thenReturn(mockStream);
+    when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
+        PickResult.withSubchannel(subchannel));
+
+    updateBalancingStateSafely(helper, READY, mockPicker);
+    assertEquals(2, executor.runDueTasks());
+
+    verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
+    verify(mockTransport).newStream(
+        same(method), any(Metadata.class), callOptionsCaptor.capture(),
+        tracersCaptor.capture());
+    assertThat(Arrays.asList(tracersCaptor.getValue()).contains(tracer)).isTrue();
+    assertThat(callOptionsCaptor.getValue().getOption(NAME_RESOLUTION_DELAYED)).isTrue();
+  }
+
+  @Test
   public void nameResolvedAfterChannelShutdown() {
     // Delay the success of name resolution until allResolved() is called.
     FakeNameResolverFactory nameResolverFactory =