From b092a29c5dee537c382cdd8ba1e1ac62419c7e43 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Mon, 9 Sep 2019 16:39:59 -0700 Subject: [PATCH] core: Propagate EAG Attributes in RoundRobinLoadBalancer This allows plumbing information to the subchannel via Attributes, like an authority override. RR still does not support multiple EAGs that only differ by attributes. --- .../io/grpc/util/RoundRobinLoadBalancer.java | 39 +++++++++++++------ ...AutoConfiguredLoadBalancerFactoryTest.java | 9 ++++- .../grpc/util/RoundRobinLoadBalancerTest.java | 34 +++++++--------- 3 files changed, 50 insertions(+), 32 deletions(-) diff --git a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java index cab48b9e5c..093a4c9377 100644 --- a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java +++ b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java @@ -42,6 +42,7 @@ import io.grpc.internal.GrpcAttributes; import io.grpc.internal.ServiceConfigUtil; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -88,9 +89,8 @@ final class RoundRobinLoadBalancer extends LoadBalancer { List servers = resolvedAddresses.getAddresses(); Attributes attributes = resolvedAddresses.getAttributes(); Set currentAddrs = subchannels.keySet(); - Set latestAddrs = stripAttrs(servers); - Set addedAddrs = setsDifference(latestAddrs, currentAddrs); - Set removedAddrs = setsDifference(currentAddrs, latestAddrs); + Map latestAddrs = stripAttrs(servers); + Set removedAddrs = setsDifference(currentAddrs, latestAddrs.keySet()); Map serviceConfig = attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG); if (serviceConfig != null) { @@ -109,8 +109,18 @@ final class RoundRobinLoadBalancer extends LoadBalancer { } } - // Create new subchannels for new addresses. - for (EquivalentAddressGroup addressGroup : addedAddrs) { + for (Map.Entry latestEntry : + latestAddrs.entrySet()) { + EquivalentAddressGroup strippedAddressGroup = latestEntry.getKey(); + EquivalentAddressGroup originalAddressGroup = latestEntry.getValue(); + Subchannel existingSubchannel = subchannels.get(strippedAddressGroup); + if (existingSubchannel != null) { + // EAG's Attributes may have changed. + existingSubchannel.updateAddresses(Collections.singletonList(originalAddressGroup)); + continue; + } + // Create new subchannels for new addresses. + // NB(lukaszx0): we don't merge `attributes` with `subchannelAttr` because subchannel // doesn't need them. They're describing the resolved server list but we're not taking // any action based on this information. @@ -128,7 +138,7 @@ final class RoundRobinLoadBalancer extends LoadBalancer { final Subchannel subchannel = checkNotNull( helper.createSubchannel(CreateSubchannelArgs.newBuilder() - .setAddresses(addressGroup) + .setAddresses(originalAddressGroup) .setAttributes(subchannelAttrs.build()) .build()), "subchannel"); @@ -141,7 +151,7 @@ final class RoundRobinLoadBalancer extends LoadBalancer { if (stickyRef != null) { stickyRef.value = subchannel; } - subchannels.put(addressGroup, subchannel); + subchannels.put(strippedAddressGroup, subchannel); subchannel.requestConnection(); } @@ -168,7 +178,7 @@ final class RoundRobinLoadBalancer extends LoadBalancer { } private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { - if (subchannels.get(subchannel.getAddresses()) != subchannel) { + if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) { return; } if (stateInfo.getState() == SHUTDOWN && stickinessState != null) { @@ -257,16 +267,21 @@ final class RoundRobinLoadBalancer extends LoadBalancer { /** * Converts list of {@link EquivalentAddressGroup} to {@link EquivalentAddressGroup} set and - * remove all attributes. + * remove all attributes. The values are the original EAGs. */ - private static Set stripAttrs(List groupList) { - Set addrs = new HashSet<>(groupList.size()); + private static Map stripAttrs( + List groupList) { + Map addrs = new HashMap<>(groupList.size() * 2); for (EquivalentAddressGroup group : groupList) { - addrs.add(new EquivalentAddressGroup(group.getAddresses())); + addrs.put(stripAttrs(group), group); } return addrs; } + private static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) { + return new EquivalentAddressGroup(eag.getAddresses()); + } + @VisibleForTesting Collection getSubchannels() { return subchannels.values(); diff --git a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java index 251bc5641a..496d2ca50e 100644 --- a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java +++ b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java @@ -34,6 +34,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; +import com.google.common.base.Preconditions; import io.grpc.Attributes; import io.grpc.ChannelLogger; import io.grpc.ChannelLogger.ChannelLogLevel; @@ -851,7 +852,7 @@ public class AutoConfiguredLoadBalancerFactoryTest { this.attrs = args.getAttributes(); } - final List addrs; + List addrs; final Attributes attrs; @Override @@ -875,6 +876,12 @@ public class AutoConfiguredLoadBalancerFactoryTest { public Attributes getAttributes() { return attrs; } + + @Override + public void updateAddresses(List addrs) { + Preconditions.checkNotNull(addrs, "addrs"); + this.addrs = addrs; + } } private static final class FakeLoadBalancerProvider extends LoadBalancerProvider { diff --git a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index a9895df954..90c9acb2fc 100644 --- a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -195,25 +195,22 @@ public class RoundRobinLoadBalancerTest { Subchannel oldSubchannel = mock(Subchannel.class); Subchannel newSubchannel = mock(Subchannel.class); + Attributes.Key key = Attributes.Key.create("check-that-it-is-propagated"); FakeSocketAddress removedAddr = new FakeSocketAddress("removed"); + EquivalentAddressGroup removedEag = new EquivalentAddressGroup(removedAddr); FakeSocketAddress oldAddr = new FakeSocketAddress("old"); + EquivalentAddressGroup oldEag1 = new EquivalentAddressGroup(oldAddr); + EquivalentAddressGroup oldEag2 = new EquivalentAddressGroup( + oldAddr, Attributes.newBuilder().set(key, "oldattr").build()); FakeSocketAddress newAddr = new FakeSocketAddress("new"); + EquivalentAddressGroup newEag = new EquivalentAddressGroup( + newAddr, Attributes.newBuilder().set(key, "newattr").build()); - List allSubchannels = - Lists.newArrayList(removedSubchannel, oldSubchannel, newSubchannel); - List allAddrs = - Lists.newArrayList(removedAddr, oldAddr, newAddr); - for (int i = 0; i < allSubchannels.size(); i++) { - Subchannel subchannel = allSubchannels.get(i); - List eagList = - Arrays.asList(new EquivalentAddressGroup(allAddrs.get(i))); - subchannels.put(eagList, subchannel); - } + subchannels.put(Collections.singletonList(removedEag), removedSubchannel); + subchannels.put(Collections.singletonList(oldEag1), oldSubchannel); + subchannels.put(Collections.singletonList(newEag), newSubchannel); - List currentServers = - Lists.newArrayList( - new EquivalentAddressGroup(removedAddr), - new EquivalentAddressGroup(oldAddr)); + List currentServers = Lists.newArrayList(removedEag, oldEag1); InOrder inOrder = inOrder(mockHelper); @@ -236,15 +233,14 @@ public class RoundRobinLoadBalancerTest { assertThat(loadBalancer.getSubchannels()).containsExactly(removedSubchannel, oldSubchannel); - List latestServers = - Lists.newArrayList( - new EquivalentAddressGroup(oldAddr), - new EquivalentAddressGroup(newAddr)); -; + // This time with Attributes + List latestServers = Lists.newArrayList(oldEag2, newEag); + loadBalancer.handleResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(latestServers).setAttributes(affinity).build()); verify(newSubchannel, times(1)).requestConnection(); + verify(oldSubchannel, times(1)).updateAddresses(Arrays.asList(oldEag2)); verify(removedSubchannel, times(1)).shutdown(); deliverSubchannelState(removedSubchannel, ConnectivityStateInfo.forNonError(SHUTDOWN));