xds: ignore endpoints with health_status != HEALTHY or UNKNOWN

Let child balancer (currently is Round Robin) handle the address list with only healthy addresses for its locality. If no healthy address available, let the child balancer handleNameResolutionError.
diff --git a/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java b/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java
index 7eeb044..f498799 100644
--- a/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java
+++ b/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java
@@ -195,15 +195,8 @@
     private final int loadBalancingWeight;
     private final boolean isHealthy;
 
-    /** Must only be used for testing. */
-    // TODO(chengyuanzhang): migrate tests to use LbEndpoint(EquivalentAddressGroup, int, boolean)
-    //  and should add tests for LB policies dealing with unhealthy endpoints.
     @VisibleForTesting
-    LbEndpoint(EquivalentAddressGroup eag, int loadBalancingWeight) {
-      this(eag, loadBalancingWeight, true);
-    }
-
-    private LbEndpoint(EquivalentAddressGroup eag, int loadBalancingWeight, boolean isHealthy) {
+    LbEndpoint(EquivalentAddressGroup eag, int loadBalancingWeight, boolean isHealthy) {
       this.eag = eag;
       this.loadBalancingWeight = loadBalancingWeight;
       this.isHealthy = isHealthy;
diff --git a/xds/src/main/java/io/grpc/xds/LocalityStore.java b/xds/src/main/java/io/grpc/xds/LocalityStore.java
index fabefd3..797f9ae 100644
--- a/xds/src/main/java/io/grpc/xds/LocalityStore.java
+++ b/xds/src/main/java/io/grpc/xds/LocalityStore.java
@@ -210,22 +210,9 @@
       // TODO: put endPointWeights into attributes for WRR.
       for (Locality locality : newLocalities) {
         if (localityMap.containsKey(locality)) {
-          final LocalityLbInfo localityLbInfo = localityMap.get(locality);
-          LocalityLbEndpoints localityInfo = localityInfoMap.get(locality);
-          final List<EquivalentAddressGroup> eags = new ArrayList<>();
-          for (LbEndpoint endpoint: localityInfo.getEndpoints()) {
-            eags.add(endpoint.getAddress());
-          }
-          // In extreme case handleResolvedAddresses() may trigger updateBalancingState()
-          // immediately, so execute handleResolvedAddresses() after all the setup in this method is
-          // complete.
-          helper.getSynchronizationContext().execute(new Runnable() {
-            @Override
-            public void run() {
-              localityLbInfo.childBalancer.handleResolvedAddresses(
-                  ResolvedAddresses.newBuilder().setAddresses(eags).build());
-            }
-          });
+          LocalityLbInfo localityLbInfo = localityMap.get(locality);
+          LocalityLbEndpoints localityLbEndpoints = localityInfoMap.get(locality);
+          handleEagsOnChildBalancer(helper, localityLbInfo, localityLbEndpoints, locality);
         }
       }
 
@@ -586,33 +573,48 @@
         updatePriorityState(currentPriority);
       }
 
-      private void initLocality(final Locality locality) {
+      private void initLocality(Locality locality) {
         ChildHelper childHelper =
             new ChildHelper(locality, loadStatsStore.getLocalityCounter(locality),
                 orcaOobUtil);
-        final LocalityLbInfo localityLbInfo =
+        LocalityLbInfo localityLbInfo =
             new LocalityLbInfo(
                 loadBalancerProvider.newLoadBalancer(childHelper),
                 childHelper);
         localityMap.put(locality, localityLbInfo);
+        LocalityLbEndpoints localityLbEndpoints = localityInfoMap.get(locality);
+        handleEagsOnChildBalancer(childHelper, localityLbInfo, localityLbEndpoints, locality);
+      }
+    }
 
-        LocalityLbEndpoints localityInfo = localityInfoMap.get(locality);
-        final List<EquivalentAddressGroup> eags = new ArrayList<>();
-        for (LbEndpoint endpoint: localityInfo.getEndpoints()) {
+    private static void handleEagsOnChildBalancer(
+        Helper childHelper, final LocalityLbInfo localityLbInfo,
+        final LocalityLbEndpoints localityLbEndpoints, final Locality locality) {
+      final List<EquivalentAddressGroup> eags = new ArrayList<>();
+      for (LbEndpoint endpoint: localityLbEndpoints.getEndpoints()) {
+        if (endpoint.isHealthy()) {
           eags.add(endpoint.getAddress());
         }
-        // In extreme case handleResolvedAddresses() may trigger updateBalancingState() immediately,
-        // so execute handleResolvedAddresses() after all the setup in the caller is complete.
-        helper.getSynchronizationContext().execute(new Runnable() {
-          @Override
-          public void run() {
-            // TODO: put endPointWeights into attributes for WRR.
+      }
+      // In extreme case handleResolvedAddresses() may trigger updateBalancingState()
+      // immediately, so execute handleResolvedAddresses() after all the setup in the caller is
+      // complete.
+      childHelper.getSynchronizationContext().execute(new Runnable() {
+        @Override
+        public void run() {
+          if (eags.isEmpty()
+              && !localityLbInfo.childBalancer.canHandleEmptyAddressListFromNameResolution()) {
+            localityLbInfo.childBalancer.handleNameResolutionError(
+                Status.UNAVAILABLE.withDescription(
+                    "No healthy address available from EDS update '" + localityLbEndpoints
+                        + "' for locality '" + locality + "'"));
+          } else {
             localityLbInfo.childBalancer
                 .handleResolvedAddresses(ResolvedAddresses.newBuilder()
                     .setAddresses(eags).build());
           }
-        });
-      }
+        }
+      });
     }
   }
 }
diff --git a/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java b/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java
index ab2ae65..a60681e 100644
--- a/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java
+++ b/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java
@@ -56,6 +56,7 @@
 import io.grpc.LoadBalancer.SubchannelPicker;
 import io.grpc.LoadBalancerProvider;
 import io.grpc.LoadBalancerRegistry;
+import io.grpc.Status;
 import io.grpc.SynchronizationContext;
 import io.grpc.internal.FakeClock;
 import io.grpc.internal.FakeClock.ScheduledTask;
@@ -203,14 +204,14 @@
   private final EquivalentAddressGroup eag42 =
       new EquivalentAddressGroup(new InetSocketAddress("addr42", 42));
 
-  private final LbEndpoint lbEndpoint11 = new LbEndpoint(eag11, 11);
-  private final LbEndpoint lbEndpoint12 = new LbEndpoint(eag12, 12);
-  private final LbEndpoint lbEndpoint21 = new LbEndpoint(eag21, 21);
-  private final LbEndpoint lbEndpoint22 = new LbEndpoint(eag22, 22);
-  private final LbEndpoint lbEndpoint31 = new LbEndpoint(eag31, 31);
-  private final LbEndpoint lbEndpoint32 = new LbEndpoint(eag32, 32);
-  private final LbEndpoint lbEndpoint41 = new LbEndpoint(eag41, 41);
-  private final LbEndpoint lbEndpoint42 = new LbEndpoint(eag42, 42);
+  private final LbEndpoint lbEndpoint11 = new LbEndpoint(eag11, 11, true);
+  private final LbEndpoint lbEndpoint12 = new LbEndpoint(eag12, 12, true);
+  private final LbEndpoint lbEndpoint21 = new LbEndpoint(eag21, 21, true);
+  private final LbEndpoint lbEndpoint22 = new LbEndpoint(eag22, 22, true);
+  private final LbEndpoint lbEndpoint31 = new LbEndpoint(eag31, 31, true);
+  private final LbEndpoint lbEndpoint32 = new LbEndpoint(eag32, 32, true);
+  private final LbEndpoint lbEndpoint41 = new LbEndpoint(eag41, 41, true);
+  private final LbEndpoint lbEndpoint42 = new LbEndpoint(eag42, 42, true);
 
   private final Map<String, Locality> namedLocalities
       = ImmutableMap.of("sz1", locality1, "sz2", locality2, "sz3", locality3, "sz4", locality4);
@@ -793,6 +794,105 @@
   }
 
   @Test
+  public void updateLoaclityStore_withUnHealthyEndPoints() {
+    LbEndpoint lbEndpoint11 = new LbEndpoint(eag11, 11, true);
+    LbEndpoint lbEndpoint12 = new LbEndpoint(eag12, 12, true);
+    LbEndpoint lbEndpoint21 = new LbEndpoint(eag21, 21, false); // unhealthy
+    LbEndpoint lbEndpoint22 = new LbEndpoint(eag22, 22, true);
+    LbEndpoint lbEndpoint31 = new LbEndpoint(eag31, 31, false); // unhealthy
+    LbEndpoint lbEndpoint32 = new LbEndpoint(eag32, 32, false); // unhealthy
+    LbEndpoint lbEndpoint41 = new LbEndpoint(eag41, 41, true);
+    LbEndpoint lbEndpoint42 = new LbEndpoint(eag42, 42, true);
+    LocalityLbEndpoints localityInfo1 =
+        new LocalityLbEndpoints(ImmutableList.of(lbEndpoint11, lbEndpoint12), 1, 0);
+    LocalityLbEndpoints localityInfo2 =
+        new LocalityLbEndpoints(ImmutableList.of(lbEndpoint21, lbEndpoint22), 2, 0);
+    LocalityLbEndpoints localityInfo3 =
+        new LocalityLbEndpoints(ImmutableList.of(lbEndpoint31, lbEndpoint32), 3, 0);
+    LocalityLbEndpoints localityInfo4 =
+        new LocalityLbEndpoints(ImmutableList.of(lbEndpoint41, lbEndpoint42), 4, 0);
+    ImmutableMap<Locality, LocalityLbEndpoints> localityInfoMap = ImmutableMap.of(
+        locality1, localityInfo1, locality2, localityInfo2, locality3, localityInfo3,
+        locality4, localityInfo4);
+    localityStore.updateLocalityStore(localityInfoMap);
+    verify(helper).updateBalancingState(CONNECTING, BUFFER_PICKER);
+
+    assertThat(loadBalancers).hasSize(4);
+    assertThat(loadBalancers.keySet()).containsExactly("sz1", "sz2", "sz3", "sz4");
+    ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor1 =
+        ArgumentCaptor.forClass(ResolvedAddresses.class);
+    verify(loadBalancers.get("sz1")).handleResolvedAddresses(resolvedAddressesCaptor1.capture());
+    assertThat(resolvedAddressesCaptor1.getValue().getAddresses()).containsExactly(eag11, eag12);
+    ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor2 =
+        ArgumentCaptor.forClass(ResolvedAddresses.class);
+    verify(loadBalancers.get("sz2")).handleResolvedAddresses(resolvedAddressesCaptor2.capture());
+    assertThat(resolvedAddressesCaptor2.getValue().getAddresses()).containsExactly(eag22);
+    verify(loadBalancers.get("sz3"), never()).handleResolvedAddresses(any(ResolvedAddresses.class));
+    ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
+    verify(loadBalancers.get("sz3")).handleNameResolutionError(statusCaptor.capture());
+    assertThat(statusCaptor.getValue().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
+    ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor4 =
+        ArgumentCaptor.forClass(ResolvedAddresses.class);
+    verify(loadBalancers.get("sz4")).handleResolvedAddresses(resolvedAddressesCaptor4.capture());
+    assertThat(resolvedAddressesCaptor4.getValue().getAddresses()).containsExactly(eag41, eag42);
+    // verify no more updateBalancingState except the initial CONNECTING state
+    verify(helper, times(1)).updateBalancingState(
+        any(ConnectivityState.class), any(SubchannelPicker.class));
+
+    // update with different healthy status
+    lbEndpoint11 = new LbEndpoint(eag11, 11, false); // unhealthy
+    lbEndpoint12 = new LbEndpoint(eag12, 12, false); // unhealthy
+    lbEndpoint21 = new LbEndpoint(eag21, 21, true);
+    lbEndpoint22 = new LbEndpoint(eag22, 22, false); // unhealthy
+    lbEndpoint31 = new LbEndpoint(eag31, 31, true);
+    lbEndpoint32 = new LbEndpoint(eag32, 32, false); // unhealthy
+    localityInfo1 =
+        new LocalityLbEndpoints(ImmutableList.of(lbEndpoint11, lbEndpoint12), 1, 0);
+    localityInfo2 =
+        new LocalityLbEndpoints(ImmutableList.of(lbEndpoint21, lbEndpoint22), 2, 0);
+    localityInfo3 =
+        new LocalityLbEndpoints(ImmutableList.of(lbEndpoint31, lbEndpoint32), 3, 0);
+    localityInfoMap = ImmutableMap.of(
+        locality1, localityInfo1, locality2, localityInfo2, locality3, localityInfo3);
+    localityStore.updateLocalityStore(localityInfoMap);
+
+    verify(loadBalancers.get("sz1"), times(1))
+        .handleResolvedAddresses(any(ResolvedAddresses.class));
+    verify(loadBalancers.get("sz1"), times(1)).handleNameResolutionError(statusCaptor.capture());
+    assertThat(statusCaptor.getValue().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
+    verify(loadBalancers.get("sz2"), times(2))
+        .handleResolvedAddresses(resolvedAddressesCaptor2.capture());
+    assertThat(resolvedAddressesCaptor2.getValue().getAddresses()).containsExactly(eag21);
+    ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor3 =
+        ArgumentCaptor.forClass(ResolvedAddresses.class);
+    verify(loadBalancers.get("sz3")).handleResolvedAddresses(resolvedAddressesCaptor3.capture());
+    assertThat(resolvedAddressesCaptor3.getValue().getAddresses()).containsExactly(eag31);
+    verify(helper, times(2)).updateBalancingState(CONNECTING, BUFFER_PICKER);
+
+    // update with all endpoints unhealthy
+    lbEndpoint11 = new LbEndpoint(eag11, 11, false); // unhealthy
+    lbEndpoint12 = new LbEndpoint(eag12, 12, false); // unhealthy
+    lbEndpoint31 = new LbEndpoint(eag31, 31, false); // unhealthy
+    lbEndpoint32 = new LbEndpoint(eag32, 32, false); // unhealthy
+    localityInfo1 =
+        new LocalityLbEndpoints(ImmutableList.of(lbEndpoint11, lbEndpoint12), 1, 0);
+    localityInfo3 =
+        new LocalityLbEndpoints(ImmutableList.of(lbEndpoint31, lbEndpoint32), 3, 0);
+    localityInfoMap = ImmutableMap.of(locality1, localityInfo1, locality3, localityInfo3);
+    localityStore.updateLocalityStore(localityInfoMap);
+
+    verify(loadBalancers.get("sz1"), times(2)).handleNameResolutionError(statusCaptor.capture());
+    assertThat(statusCaptor.getValue().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
+    verify(loadBalancers.get("sz3"), times(2)).handleNameResolutionError(statusCaptor.capture());
+    assertThat(statusCaptor.getValue().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
+
+    // mimic child balancers update subchannels to TRANSIENT_FAILURE after handleNameResolutionError
+    childHelpers.get("sz1").updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(UNAVAILABLE));
+    childHelpers.get("sz3").updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(UNAVAILABLE));
+    verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
+  }
+
+  @Test
   public void updateLocalityStore_OnlyUpdatingWeightsStillUpdatesPicker() {
     LocalityLbEndpoints localityInfo1 =
         new LocalityLbEndpoints(ImmutableList.of(lbEndpoint11, lbEndpoint12), 1, 0);