grpclb: support multiple authorities in lb backends for all SRV records (#7951)
diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java
index b0cf979..04431fb 100644
--- a/api/src/main/java/io/grpc/LoadBalancer.java
+++ b/api/src/main/java/io/grpc/LoadBalancer.java
@@ -930,12 +930,17 @@
*
* @since 1.4.0
*/
- // TODO(ejona): Allow passing a List<EAG> here and to updateOobChannelAddresses, but want to
- // wait until https://github.com/grpc/grpc-java/issues/4469 is done.
- // https://github.com/grpc/grpc-java/issues/4618
public abstract ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority);
/**
+ * Accept a list of EAG for multiple authorities: https://github.com/grpc/grpc-java/issues/4618
+ * */
+ public ManagedChannel createOobChannel(List<EquivalentAddressGroup> eag,
+ String authority) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
* Updates the addresses used for connections in the {@code Channel} that was created by {@link
* #createOobChannel(EquivalentAddressGroup, String)}. This is superior to {@link
* #createOobChannel(EquivalentAddressGroup, String)} when the old and new addresses overlap,
@@ -950,6 +955,15 @@
}
/**
+ * Updates the addresses with a new EAG list. Connection is continued when old and new addresses
+ * overlap.
+ * */
+ public void updateOobChannelAddresses(ManagedChannel channel,
+ List<EquivalentAddressGroup> eag) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
* Creates an out-of-band channel for LoadBalancer's own RPC needs, e.g., talking to an external
* load-balancer service, that is specified by a target string. See the documentation on
* {@link ManagedChannelBuilder#forTarget} for the format of a target string.
diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java
index 97be3f2..5ec8e94 100644
--- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java
+++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java
@@ -339,12 +339,12 @@
Preconditions.checkNotNull(newAddressGroups, "newAddressGroups");
checkListHasNoNulls(newAddressGroups, "newAddressGroups contains null entry");
Preconditions.checkArgument(!newAddressGroups.isEmpty(), "newAddressGroups is empty");
+ final List<EquivalentAddressGroup> newImmutableAddressGroups =
+ Collections.unmodifiableList(new ArrayList<>(newAddressGroups));
syncContext.execute(new Runnable() {
@Override
public void run() {
- List<EquivalentAddressGroup> newImmutableAddressGroups =
- Collections.unmodifiableList(new ArrayList<>(newAddressGroups));
ManagedClientTransport savedTransport = null;
SocketAddress previousAddress = addressIndex.getCurrentAddress();
addressIndex.updateGroups(newImmutableAddressGroups);
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 0ab8d49..ac9c78a 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -1465,6 +1465,12 @@
@Override
public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
+ return createOobChannel(Collections.singletonList(addressGroup), authority);
+ }
+
+ @Override
+ public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
+ String authority) {
// TODO(ejona): can we be even stricter? Like terminating?
checkState(!terminated, "Channel is terminated");
long oobChannelCreationTime = timeProvider.currentTimeNanos();
@@ -1505,7 +1511,7 @@
}
final InternalSubchannel internalSubchannel = new InternalSubchannel(
- Collections.singletonList(addressGroup),
+ addressGroup,
authority, userAgent, backoffPolicyProvider, oobTransportFactory,
oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
// All callback methods are run from syncContext
@@ -1625,6 +1631,12 @@
@Override
public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
+ updateOobChannelAddresses(channel, Collections.singletonList(eag));
+ }
+
+ @Override
+ public void updateOobChannelAddresses(ManagedChannel channel,
+ List<EquivalentAddressGroup> eag) {
checkArgument(channel instanceof OobChannel,
"channel must have been returned from createOobChannel");
((OobChannel) channel).updateAddresses(eag);
diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java
index 5e0dd12..f69fd17 100644
--- a/core/src/main/java/io/grpc/internal/OobChannel.java
+++ b/core/src/main/java/io/grpc/internal/OobChannel.java
@@ -193,8 +193,8 @@
delayedTransport.reprocess(subchannelPicker);
}
- void updateAddresses(EquivalentAddressGroup eag) {
- subchannel.updateAddresses(Collections.singletonList(eag));
+ void updateAddresses(List<EquivalentAddressGroup> eag) {
+ subchannel.updateAddresses(eag);
}
@Override
diff --git a/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java b/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java
index 033524f..4229957 100644
--- a/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java
+++ b/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java
@@ -31,6 +31,7 @@
import io.grpc.NameResolver;
import io.grpc.NameResolverRegistry;
import io.grpc.SynchronizationContext;
+import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
@@ -51,10 +52,20 @@
}
@Override
+ public ManagedChannel createOobChannel(List<EquivalentAddressGroup> eag, String authority) {
+ return delegate().createOobChannel(eag, authority);
+ }
+
+ @Override
public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
delegate().updateOobChannelAddresses(channel, eag);
}
+ @Override
+ public void updateOobChannelAddresses(ManagedChannel channel, List<EquivalentAddressGroup> eag) {
+ delegate().updateOobChannelAddresses(channel, eag);
+ }
+
@Deprecated
@Override
public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java
index a2a2925..6a76f75 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java
@@ -402,7 +402,7 @@
assertFalse(channel.inUseStateAggregator.isInUse());
// Now make an RPC on an OOB channel
- ManagedChannel oob = helper.createOobChannel(servers.get(0), "oobauthority");
+ ManagedChannel oob = helper.createOobChannel(servers, "oobauthority");
verify(mockTransportFactory, never())
.newClientTransport(
any(SocketAddress.class),
@@ -438,13 +438,13 @@
verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture());
deliverResolutionResult();
Helper helper = helperCaptor.getValue();
- ManagedChannel oobChannel = helper.createOobChannel(servers.get(0), "localhost");
+ ManagedChannel oobChannel = helper.createOobChannel(servers.subList(0,1), "localhost");
oobChannel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
MockClientTransportInfo t0 = newTransports.poll();
t0.listener.transportReady();
- helper.updateOobChannelAddresses(oobChannel, servers.get(1));
+ helper.updateOobChannelAddresses(oobChannel, servers.subList(1,2));
oobChannel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
MockClientTransportInfo t1 = newTransports.poll();
@@ -462,7 +462,7 @@
verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture());
Helper helper = helperCaptor.getValue();
deliverResolutionResult();
- ManagedChannel oobChannel = helper.createOobChannel(servers.get(0), "localhost");
+ ManagedChannel oobChannel = helper.createOobChannel(servers.subList(0,1), "localhost");
oobChannel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
MockClientTransportInfo t0 = newTransports.poll();
@@ -470,7 +470,8 @@
List<SocketAddress> changedList = new ArrayList<>(servers.get(0).getAddresses());
changedList.add(new FakeSocketAddress("aDifferentServer"));
- helper.updateOobChannelAddresses(oobChannel, new EquivalentAddressGroup(changedList));
+ helper.updateOobChannelAddresses(oobChannel, Collections.singletonList(
+ new EquivalentAddressGroup(changedList)));
oobChannel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
assertNull(newTransports.poll());
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index 9291363..7f89689 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -704,7 +704,8 @@
@Test
public void channelzMembership_oob() throws Exception {
createChannel();
- OobChannel oob = (OobChannel) helper.createOobChannel(addressGroup, AUTHORITY);
+ OobChannel oob = (OobChannel) helper.createOobChannel(
+ Collections.singletonList(addressGroup), AUTHORITY);
// oob channels are not root channels
assertNull(channelz.getRootChannel(oob.getLogId().getId()));
assertTrue(channelz.containsSubchannel(oob.getLogId()));
@@ -1621,8 +1622,10 @@
public void oobchannels() {
createChannel();
- ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1authority");
- ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2authority");
+ ManagedChannel oob1 = helper.createOobChannel(
+ Collections.singletonList(addressGroup), "oob1authority");
+ ManagedChannel oob2 = helper.createOobChannel(
+ Collections.singletonList(addressGroup), "oob2authority");
verify(balancerRpcExecutorPool, times(2)).getObject();
assertEquals("oob1authority", oob1.authority());
@@ -1755,7 +1758,8 @@
.containsExactly(channelCredValue, callCredValue).inOrder();
// Verify that the oob channel does not
- ManagedChannel oob = helper.createOobChannel(addressGroup, "oobauthority");
+ ManagedChannel oob = helper.createOobChannel(
+ Collections.singletonList(addressGroup), "oobauthority");
headers = new Metadata();
call = oob.newCall(method, callOptions);
@@ -1886,8 +1890,10 @@
@Test
public void oobChannelsWhenChannelShutdownNow() {
createChannel();
- ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority");
- ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority");
+ ManagedChannel oob1 = helper.createOobChannel(
+ Collections.singletonList(addressGroup), "oob1Authority");
+ ManagedChannel oob2 = helper.createOobChannel(
+ Collections.singletonList(addressGroup), "oob2Authority");
oob1.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
oob2.newCall(method, CallOptions.DEFAULT).start(mockCallListener2, new Metadata());
@@ -1915,8 +1921,10 @@
@Test
public void oobChannelsNoConnectionShutdown() {
createChannel();
- ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority");
- ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority");
+ ManagedChannel oob1 = helper.createOobChannel(
+ Collections.singletonList(addressGroup), "oob1Authority");
+ ManagedChannel oob2 = helper.createOobChannel(
+ Collections.singletonList(addressGroup), "oob2Authority");
channel.shutdown();
verify(mockLoadBalancer).shutdown();
@@ -1934,8 +1942,8 @@
@Test
public void oobChannelsNoConnectionShutdownNow() {
createChannel();
- helper.createOobChannel(addressGroup, "oob1Authority");
- helper.createOobChannel(addressGroup, "oob2Authority");
+ helper.createOobChannel(Collections.singletonList(addressGroup), "oob1Authority");
+ helper.createOobChannel(Collections.singletonList(addressGroup), "oob2Authority");
channel.shutdownNow();
verify(mockLoadBalancer).shutdown();
@@ -2116,7 +2124,8 @@
channelBuilder.nameResolverFactory(nameResolverFactory);
createChannel();
if (isOobChannel) {
- OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobAuthority");
+ OobChannel oobChannel = (OobChannel) helper.createOobChannel(
+ Collections.singletonList(addressGroup), "oobAuthority");
oobChannel.getSubchannel().requestConnection();
} else {
Subchannel subchannel =
@@ -3183,7 +3192,8 @@
public void channelTracing_oobChannelStateChangeEvent() throws Exception {
channelBuilder.maxTraceEvents(10);
createChannel();
- OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "authority");
+ OobChannel oobChannel = (OobChannel) helper.createOobChannel(
+ Collections.singletonList(addressGroup), "authority");
timer.forwardNanos(1234);
oobChannel.handleSubchannelStateChange(
ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING));
@@ -3199,7 +3209,8 @@
channelBuilder.maxTraceEvents(10);
createChannel();
timer.forwardNanos(1234);
- OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "authority");
+ OobChannel oobChannel = (OobChannel) helper.createOobChannel(
+ Collections.singletonList(addressGroup), "authority");
assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
.setDescription("Child OobChannel created")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
@@ -3207,13 +3218,13 @@
.setChannelRef(oobChannel)
.build());
assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
- .setDescription("OobChannel for [[test-addr]/{}] created")
+ .setDescription("OobChannel for [[[test-addr]/{}]] created")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timer.getTicker().read())
.build());
assertThat(getStats(oobChannel.getInternalSubchannel()).channelTrace.events).contains(
new ChannelTrace.Event.Builder()
- .setDescription("Subchannel for [[test-addr]/{}] created")
+ .setDescription("Subchannel for [[[test-addr]/{}]] created")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timer.getTicker().read())
.build());
@@ -3349,7 +3360,8 @@
ClientStream mockStream = mock(ClientStream.class);
createChannel();
- OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority");
+ OobChannel oobChannel = (OobChannel) helper.createOobChannel(
+ Collections.singletonList(addressGroup), "oobauthority");
AbstractSubchannel oobSubchannel = (AbstractSubchannel) oobChannel.getSubchannel();
FakeClock callExecutor = new FakeClock();
CallOptions options =
@@ -3411,7 +3423,8 @@
createChannel();
String authority = "oobauthority";
- OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, authority);
+ OobChannel oobChannel = (OobChannel) helper.createOobChannel(
+ Collections.singletonList(addressGroup), authority);
assertEquals(authority, getStats(oobChannel).target);
}
@@ -3419,7 +3432,8 @@
public void channelsAndSubchannels_oob_instrumented_state() throws Exception {
createChannel();
- OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority");
+ OobChannel oobChannel = (OobChannel) helper.createOobChannel(
+ Collections.singletonList(addressGroup), "oobauthority");
assertEquals(IDLE, getStats(oobChannel).state);
oobChannel.getSubchannel().requestConnection();
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
index ddb4170..1a8dec3 100644
--- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
+++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
@@ -75,26 +75,28 @@
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
Attributes attributes = resolvedAddresses.getAttributes();
List<EquivalentAddressGroup> newLbAddresses = attributes.get(GrpclbConstants.ATTR_LB_ADDRS);
- if ((newLbAddresses == null || newLbAddresses.isEmpty())
- && resolvedAddresses.getAddresses().isEmpty()) {
+ if (newLbAddresses == null) {
+ newLbAddresses = Collections.emptyList();
+ }
+ if (newLbAddresses.isEmpty() && resolvedAddresses.getAddresses().isEmpty()) {
handleNameResolutionError(
Status.UNAVAILABLE.withDescription("No backend or balancer addresses found"));
return;
}
- List<LbAddressGroup> newLbAddressGroups = new ArrayList<>();
-
- if (newLbAddresses != null) {
- for (EquivalentAddressGroup lbAddr : newLbAddresses) {
- String lbAddrAuthority = lbAddr.getAttributes().get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY);
- if (lbAddrAuthority == null) {
- throw new AssertionError(
- "This is a bug: LB address " + lbAddr + " does not have an authority.");
- }
- newLbAddressGroups.add(new LbAddressGroup(lbAddr, lbAddrAuthority));
+ List<EquivalentAddressGroup> overrideAuthorityLbAddresses =
+ new ArrayList<>(newLbAddresses.size());
+ for (EquivalentAddressGroup lbAddr : newLbAddresses) {
+ String lbAddrAuthority = lbAddr.getAttributes().get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY);
+ if (lbAddrAuthority == null) {
+ throw new AssertionError(
+ "This is a bug: LB address " + lbAddr + " does not have an authority.");
}
+ Attributes attrs = lbAddr.getAttributes().toBuilder()
+ .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, lbAddrAuthority)
+ .build();
+ overrideAuthorityLbAddresses.add(new EquivalentAddressGroup(lbAddr.getAddresses(), attrs));
}
- newLbAddressGroups = Collections.unmodifiableList(newLbAddressGroups);
List<EquivalentAddressGroup> newBackendServers =
Collections.unmodifiableList(resolvedAddresses.getAddresses());
GrpclbConfig newConfig = (GrpclbConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
@@ -106,7 +108,8 @@
helper.getChannelLogger().log(ChannelLogLevel.INFO, "Config: " + newConfig);
recreateStates();
}
- grpclbState.handleAddresses(newLbAddressGroups, newBackendServers);
+ grpclbState.handleAddresses(Collections.unmodifiableList(overrideAuthorityLbAddresses),
+ newBackendServers);
}
@Override
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
index a241453..b2d8b45 100644
--- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
+++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
@@ -63,7 +63,6 @@
import io.grpc.stub.StreamObserver;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -108,6 +107,8 @@
return "BUFFER_ENTRY";
}
};
+ @VisibleForTesting
+ static final String NO_USE_AUTHORITY_SUFFIX = "-notIntendedToBeUsed";
enum Mode {
ROUND_ROBIN,
@@ -224,7 +225,8 @@
* not yet connected.
*/
void handleAddresses(
- List<LbAddressGroup> newLbAddressGroups, List<EquivalentAddressGroup> newBackendServers) {
+ List<EquivalentAddressGroup> newLbAddressGroups,
+ List<EquivalentAddressGroup> newBackendServers) {
logger.log(
ChannelLogLevel.DEBUG,
"[grpclb-<{0}>] Resolved addresses: lb addresses {0}, backends: {1}",
@@ -237,8 +239,7 @@
shutdownLbComm();
syncContext.execute(new FallbackModeTask());
} else {
- LbAddressGroup newLbAddressGroup = flattenLbAddressGroups(newLbAddressGroups);
- startLbComm(newLbAddressGroup);
+ startLbComm(newLbAddressGroups);
// Avoid creating a new RPC just because the addresses were updated, as it can cause a
// stampeding herd. The current RPC may be on a connection to an address not present in
// newLbAddressGroups, but we're considering that "okay". If we detected the RPC is to an
@@ -318,24 +319,20 @@
}
}
- private void startLbComm(LbAddressGroup lbAddressGroup) {
- checkNotNull(lbAddressGroup, "lbAddressGroup");
+ private void startLbComm(List<EquivalentAddressGroup> overrideAuthorityEags) {
+ checkNotNull(overrideAuthorityEags, "overrideAuthorityEags");
+ assert !overrideAuthorityEags.isEmpty();
+ String doNotUseAuthority = overrideAuthorityEags.get(0).getAttributes()
+ .get(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE) + NO_USE_AUTHORITY_SUFFIX;
if (lbCommChannel == null) {
- lbCommChannel = helper.createOobChannel(
- lbAddressGroup.getAddresses(), lbAddressGroup.getAuthority());
+ lbCommChannel = helper.createOobChannel(overrideAuthorityEags, doNotUseAuthority);
logger.log(
ChannelLogLevel.DEBUG,
- "[grpclb-<{0}>] Created grpclb channel: address={1}, authority={2}",
+ "[grpclb-<{0}>] Created grpclb channel: EAG={1}",
serviceName,
- lbAddressGroup.getAddresses(),
- lbAddressGroup.getAuthority());
- } else if (lbAddressGroup.getAuthority().equals(lbCommChannel.authority())) {
- helper.updateOobChannelAddresses(lbCommChannel, lbAddressGroup.getAddresses());
+ overrideAuthorityEags);
} else {
- // Full restart of channel
- shutdownLbComm();
- lbCommChannel = helper.createOobChannel(
- lbAddressGroup.getAddresses(), lbAddressGroup.getAuthority());
+ helper.updateOobChannelAddresses(lbCommChannel, overrideAuthorityEags);
}
}
@@ -867,47 +864,6 @@
helper.updateBalancingState(state, picker);
}
- private LbAddressGroup flattenLbAddressGroups(List<LbAddressGroup> groupList) {
- assert !groupList.isEmpty();
- List<EquivalentAddressGroup> eags = new ArrayList<>(groupList.size());
- String authority = groupList.get(0).getAuthority();
- for (LbAddressGroup group : groupList) {
- if (!authority.equals(group.getAuthority())) {
- // TODO(ejona): Allow different authorities for different addresses. Requires support from
- // Helper.
- logger.log(ChannelLogLevel.WARNING,
- "[grpclb-<{0}>] Multiple authorities found for LB. "
- + "Skipping addresses for {1} in preference to {2}",
- serviceName,
- group.getAuthority(),
- authority);
- } else {
- eags.add(group.getAddresses());
- }
- }
- // ALTS code can use the presence of ATTR_LB_ADDR_AUTHORITY to select ALTS instead of TLS, with
- // Netty.
- // TODO(ejona): The process here is a bit of a hack because ATTR_LB_ADDR_AUTHORITY isn't
- // actually used in the normal case. https://github.com/grpc/grpc-java/issues/4618 should allow
- // this to be more obvious.
- Attributes attrs = Attributes.newBuilder()
- .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, authority)
- .build();
- return new LbAddressGroup(flattenEquivalentAddressGroup(eags, attrs), authority);
- }
-
- /**
- * Flattens list of EquivalentAddressGroup objects into one EquivalentAddressGroup object.
- */
- private static EquivalentAddressGroup flattenEquivalentAddressGroup(
- List<EquivalentAddressGroup> groupList, Attributes attrs) {
- List<SocketAddress> addrs = new ArrayList<>();
- for (EquivalentAddressGroup group : groupList) {
- addrs.addAll(group.getAddresses());
- }
- return new EquivalentAddressGroup(addrs, attrs);
- }
-
private static Attributes createSubchannelAttrs() {
return Attributes.newBuilder()
.set(STATE_INFO,
diff --git a/grpclb/src/main/java/io/grpc/grpclb/LbAddressGroup.java b/grpclb/src/main/java/io/grpc/grpclb/LbAddressGroup.java
deleted file mode 100644
index cca096f..0000000
--- a/grpclb/src/main/java/io/grpc/grpclb/LbAddressGroup.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright 2016 The gRPC Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.grpc.grpclb;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import io.grpc.EquivalentAddressGroup;
-
-/**
- * Represents a balancer address entry.
- */
-final class LbAddressGroup {
- private final EquivalentAddressGroup addresses;
- private final String authority;
-
- LbAddressGroup(EquivalentAddressGroup addresses, String authority) {
- this.addresses = checkNotNull(addresses, "addresses");
- this.authority = checkNotNull(authority, "authority");
- }
-
- EquivalentAddressGroup getAddresses() {
- return addresses;
- }
-
- String getAuthority() {
- return authority;
- }
-}
diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
index c7ed8b1..e768cf0 100644
--- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
+++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
@@ -25,6 +25,7 @@
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.grpclb.GrpclbState.BUFFER_ENTRY;
import static io.grpc.grpclb.GrpclbState.DROP_PICK_RESULT;
+import static io.grpc.grpclb.GrpclbState.NO_USE_AUTHORITY_SUFFIX;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -108,6 +109,7 @@
import org.junit.runners.JUnit4;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
@@ -801,11 +803,11 @@
// Recover with a subsequent success
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
- EquivalentAddressGroup eag = grpclbBalancerList.get(0);
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
- verify(helper).createOobChannel(eq(eag), eq(lbAuthority(0)));
+ verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
+ eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
}
@@ -816,7 +818,8 @@
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
- verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0)));
+ verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
+ eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
@@ -853,28 +856,27 @@
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(backendList, grpclbBalancerList);
- verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0)));
+ verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
+ eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
ManagedChannel oobChannel = fakeOobChannels.poll();
assertEquals(1, lbRequestObservers.size());
List<EquivalentAddressGroup> backendList2 = createResolvedBackendAddresses(1);
List<EquivalentAddressGroup> grpclbBalancerList2 = createResolvedBalancerAddresses(2);
- EquivalentAddressGroup combinedEag = new EquivalentAddressGroup(Arrays.asList(
- grpclbBalancerList2.get(0).getAddresses().get(0),
- grpclbBalancerList2.get(1).getAddresses().get(0)),
- lbAttributes(lbAuthority(0)));
deliverResolvedAddresses(backendList2, grpclbBalancerList2);
- verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(combinedEag));
+ verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(xattr(grpclbBalancerList2)));
assertEquals(1, lbRequestObservers.size()); // No additional RPC
}
+
@Test
public void grpclbUpdatedAddresses_reconnectOnAuthorityChange() {
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(1);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(backendList, grpclbBalancerList);
- verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0)));
+ verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
+ eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
ManagedChannel oobChannel = fakeOobChannels.poll();
assertEquals(1, lbRequestObservers.size());
@@ -885,9 +887,8 @@
new EquivalentAddressGroup(
new FakeSocketAddress("somethingNew"), lbAttributes(newAuthority)));
deliverResolvedAddresses(backendList2, grpclbBalancerList2);
- assertTrue(oobChannel.isTerminated());
- verify(helper).createOobChannel(eq(grpclbBalancerList2.get(0)), eq(newAuthority));
- assertEquals(2, lbRequestObservers.size()); // An additional RPC
+ verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(xattr(grpclbBalancerList2)));
+ assertEquals(1, lbRequestObservers.size()); // No additional RPC
}
@Test
@@ -899,7 +900,8 @@
// Fallback timer is started as soon as the addresses are resolved.
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
- verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0)));
+ verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
+ eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
@@ -1214,8 +1216,8 @@
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(backendList, grpclbBalancerList);
- inOrder.verify(helper)
- .createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0)));
+ inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
+ eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
// Attempted to connect to balancer
assertEquals(1, fakeOobChannels.size());
@@ -1275,12 +1277,7 @@
// New addresses are updated to the OobChannel
inOrder.verify(helper).updateOobChannelAddresses(
- same(oobChannel),
- eq(new EquivalentAddressGroup(
- Arrays.asList(
- grpclbBalancerList.get(0).getAddresses().get(0),
- grpclbBalancerList.get(1).getAddresses().get(0)),
- lbAttributes(lbAuthority(0)))));
+ same(oobChannel), eq(xattr(grpclbBalancerList)));
if (timerExpires) {
// Still in fallback logic, except that the backend list is empty
@@ -1299,8 +1296,7 @@
// New LB address is updated to the OobChannel
inOrder.verify(helper).updateOobChannelAddresses(
- same(oobChannel),
- eq(grpclbBalancerList.get(0)));
+ same(oobChannel), eq(xattr(grpclbBalancerList)));
if (timerExpires) {
// New backend addresses are used for fallback
@@ -1365,7 +1361,8 @@
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(backendList, grpclbBalancerList);
- inOrder.verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0)));
+ inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
+ eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
// Attempted to connect to balancer
assertThat(fakeOobChannels).hasSize(1);
@@ -1430,7 +1427,7 @@
// No fallback timeout timer scheduled.
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
verify(helper, never())
- .createOobChannel(any(EquivalentAddressGroup.class), anyString());
+ .createOobChannel(ArgumentMatchers.<EquivalentAddressGroup>anyList(), anyString());
}
@Test
@@ -1459,7 +1456,8 @@
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(backendList, grpclbBalancerList);
- inOrder.verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0)));
+ inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
+ eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
// Attempted to connect to balancer
assertEquals(1, fakeOobChannels.size());
@@ -1609,16 +1607,36 @@
lbAttributes("fake-authority-2")),
new EquivalentAddressGroup(
new FakeSocketAddress("fake-address-3"),
- lbAttributes("fake-authority-1")));
- final EquivalentAddressGroup goldenOobChannelEag = new EquivalentAddressGroup(
- Arrays.<SocketAddress>asList(
- new FakeSocketAddress("fake-address-1"),
- new FakeSocketAddress("fake-address-3")),
- lbAttributes("fake-authority-1")); // Supporting multiple authorities would be good, one day
-
+ lbAttributes("fake-authority-1").toBuilder()
+ .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, "value").build()
+ ));
deliverResolvedAddresses(backendList, grpclbBalancerList);
- verify(helper).createOobChannel(goldenOobChannelEag, "fake-authority-1");
+ List<EquivalentAddressGroup> goldenOobEagList =
+ Arrays.asList(
+ new EquivalentAddressGroup(
+ new FakeSocketAddress("fake-address-1"),
+ Attributes.newBuilder()
+ .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, "fake-authority-1")
+ .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, "fake-authority-1")
+ .build()),
+ new EquivalentAddressGroup(
+ new FakeSocketAddress("fake-address-2"),
+ Attributes.newBuilder()
+ .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, "fake-authority-2")
+ .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, "fake-authority-2")
+ .build()),
+ new EquivalentAddressGroup(
+ new FakeSocketAddress("fake-address-3"),
+ Attributes.newBuilder()
+ .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, "fake-authority-1")
+ .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, "value")
+ .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, "fake-authority-1")
+ .build()
+ ));
+
+ verify(helper).createOobChannel(eq(goldenOobEagList),
+ eq("fake-authority-1" + NO_USE_AUTHORITY_SUFFIX));
}
@Test
@@ -2323,13 +2341,8 @@
// Fallback timer is started as soon as the addresses are resolved.
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
-
- List<SocketAddress> addrs = new ArrayList<>();
- addrs.addAll(grpclbBalancerList.get(0).getAddresses());
- addrs.addAll(grpclbBalancerList.get(1).getAddresses());
- Attributes attr = grpclbBalancerList.get(0).getAttributes();
- EquivalentAddressGroup oobChannelEag = new EquivalentAddressGroup(addrs, attr);
- verify(helper).createOobChannel(eq(oobChannelEag), eq(lbAuthority(0)));
+ verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
+ eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
@@ -2632,6 +2645,18 @@
.build();
}
+ private List<EquivalentAddressGroup> xattr(List<EquivalentAddressGroup> lbAddr) {
+ List<EquivalentAddressGroup> oobAddr = new ArrayList<>(lbAddr.size());
+ for (EquivalentAddressGroup lb : lbAddr) {
+ String authority = lb.getAttributes().get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY);
+ Attributes attrs = lb.getAttributes().toBuilder()
+ .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, authority)
+ .build();
+ oobAddr.add(new EquivalentAddressGroup(lb.getAddresses(), attrs));
+ }
+ return oobAddr;
+ }
+
private static class ServerEntry {
final InetSocketAddress addr;
final String token;
@@ -2699,7 +2724,7 @@
}
@Override
- public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
+ public ManagedChannel createOobChannel(List<EquivalentAddressGroup> eag, String authority) {
ManagedChannel channel =
InProcessChannelBuilder
.forName("fakeLb")
@@ -2712,6 +2737,11 @@
}
@Override
+ public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
+ return createOobChannel(Collections.singletonList(eag), authority);
+ }
+
+ @Override
public Subchannel createSubchannel(CreateSubchannelArgs args) {
FakeSubchannel subchannel =
mock(
@@ -2751,5 +2781,10 @@
@Override
public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
}
+
+ @Override
+ public void updateOobChannelAddresses(ManagedChannel channel,
+ List<EquivalentAddressGroup> eag) {
+ }
}
}