core: Propagate EAG Attributes in RoundRobinLoadBalancer

This allows plumbing information to the subchannel via Attributes, like
an authority override. RR still does not support multiple EAGs that only
differ by attributes.
This commit is contained in:
Eric Anderson 2019-09-09 16:39:59 -07:00
parent 7f693941f8
commit b092a29c5d
3 changed files with 50 additions and 32 deletions

View File

@ -42,6 +42,7 @@ import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.ServiceConfigUtil; import io.grpc.internal.ServiceConfigUtil;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -88,9 +89,8 @@ final class RoundRobinLoadBalancer extends LoadBalancer {
List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses(); List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
Attributes attributes = resolvedAddresses.getAttributes(); Attributes attributes = resolvedAddresses.getAttributes();
Set<EquivalentAddressGroup> currentAddrs = subchannels.keySet(); Set<EquivalentAddressGroup> currentAddrs = subchannels.keySet();
Set<EquivalentAddressGroup> latestAddrs = stripAttrs(servers); Map<EquivalentAddressGroup, EquivalentAddressGroup> latestAddrs = stripAttrs(servers);
Set<EquivalentAddressGroup> addedAddrs = setsDifference(latestAddrs, currentAddrs); Set<EquivalentAddressGroup> removedAddrs = setsDifference(currentAddrs, latestAddrs.keySet());
Set<EquivalentAddressGroup> removedAddrs = setsDifference(currentAddrs, latestAddrs);
Map<String, ?> serviceConfig = attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG); Map<String, ?> serviceConfig = attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
if (serviceConfig != null) { if (serviceConfig != null) {
@ -109,8 +109,18 @@ final class RoundRobinLoadBalancer extends LoadBalancer {
} }
} }
for (Map.Entry<EquivalentAddressGroup, EquivalentAddressGroup> latestEntry :
latestAddrs.entrySet()) {
EquivalentAddressGroup strippedAddressGroup = latestEntry.getKey();
EquivalentAddressGroup originalAddressGroup = latestEntry.getValue();
Subchannel existingSubchannel = subchannels.get(strippedAddressGroup);
if (existingSubchannel != null) {
// EAG's Attributes may have changed.
existingSubchannel.updateAddresses(Collections.singletonList(originalAddressGroup));
continue;
}
// Create new subchannels for new addresses. // Create new subchannels for new addresses.
for (EquivalentAddressGroup addressGroup : addedAddrs) {
// NB(lukaszx0): we don't merge `attributes` with `subchannelAttr` because subchannel // NB(lukaszx0): we don't merge `attributes` with `subchannelAttr` because subchannel
// doesn't need them. They're describing the resolved server list but we're not taking // doesn't need them. They're describing the resolved server list but we're not taking
// any action based on this information. // any action based on this information.
@ -128,7 +138,7 @@ final class RoundRobinLoadBalancer extends LoadBalancer {
final Subchannel subchannel = checkNotNull( final Subchannel subchannel = checkNotNull(
helper.createSubchannel(CreateSubchannelArgs.newBuilder() helper.createSubchannel(CreateSubchannelArgs.newBuilder()
.setAddresses(addressGroup) .setAddresses(originalAddressGroup)
.setAttributes(subchannelAttrs.build()) .setAttributes(subchannelAttrs.build())
.build()), .build()),
"subchannel"); "subchannel");
@ -141,7 +151,7 @@ final class RoundRobinLoadBalancer extends LoadBalancer {
if (stickyRef != null) { if (stickyRef != null) {
stickyRef.value = subchannel; stickyRef.value = subchannel;
} }
subchannels.put(addressGroup, subchannel); subchannels.put(strippedAddressGroup, subchannel);
subchannel.requestConnection(); subchannel.requestConnection();
} }
@ -168,7 +178,7 @@ final class RoundRobinLoadBalancer extends LoadBalancer {
} }
private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
if (subchannels.get(subchannel.getAddresses()) != subchannel) { if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) {
return; return;
} }
if (stateInfo.getState() == SHUTDOWN && stickinessState != null) { if (stateInfo.getState() == SHUTDOWN && stickinessState != null) {
@ -257,16 +267,21 @@ final class RoundRobinLoadBalancer extends LoadBalancer {
/** /**
* Converts list of {@link EquivalentAddressGroup} to {@link EquivalentAddressGroup} set and * Converts list of {@link EquivalentAddressGroup} to {@link EquivalentAddressGroup} set and
* remove all attributes. * remove all attributes. The values are the original EAGs.
*/ */
private static Set<EquivalentAddressGroup> stripAttrs(List<EquivalentAddressGroup> groupList) { private static Map<EquivalentAddressGroup, EquivalentAddressGroup> stripAttrs(
Set<EquivalentAddressGroup> addrs = new HashSet<>(groupList.size()); List<EquivalentAddressGroup> groupList) {
Map<EquivalentAddressGroup, EquivalentAddressGroup> addrs = new HashMap<>(groupList.size() * 2);
for (EquivalentAddressGroup group : groupList) { for (EquivalentAddressGroup group : groupList) {
addrs.add(new EquivalentAddressGroup(group.getAddresses())); addrs.put(stripAttrs(group), group);
} }
return addrs; return addrs;
} }
private static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) {
return new EquivalentAddressGroup(eag.getAddresses());
}
@VisibleForTesting @VisibleForTesting
Collection<Subchannel> getSubchannels() { Collection<Subchannel> getSubchannels() {
return subchannels.values(); return subchannels.values();

View File

@ -34,6 +34,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import com.google.common.base.Preconditions;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.ChannelLogger; import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel; import io.grpc.ChannelLogger.ChannelLogLevel;
@ -851,7 +852,7 @@ public class AutoConfiguredLoadBalancerFactoryTest {
this.attrs = args.getAttributes(); this.attrs = args.getAttributes();
} }
final List<EquivalentAddressGroup> addrs; List<EquivalentAddressGroup> addrs;
final Attributes attrs; final Attributes attrs;
@Override @Override
@ -875,6 +876,12 @@ public class AutoConfiguredLoadBalancerFactoryTest {
public Attributes getAttributes() { public Attributes getAttributes() {
return attrs; return attrs;
} }
@Override
public void updateAddresses(List<EquivalentAddressGroup> addrs) {
Preconditions.checkNotNull(addrs, "addrs");
this.addrs = addrs;
}
} }
private static final class FakeLoadBalancerProvider extends LoadBalancerProvider { private static final class FakeLoadBalancerProvider extends LoadBalancerProvider {

View File

@ -195,25 +195,22 @@ public class RoundRobinLoadBalancerTest {
Subchannel oldSubchannel = mock(Subchannel.class); Subchannel oldSubchannel = mock(Subchannel.class);
Subchannel newSubchannel = mock(Subchannel.class); Subchannel newSubchannel = mock(Subchannel.class);
Attributes.Key<String> key = Attributes.Key.create("check-that-it-is-propagated");
FakeSocketAddress removedAddr = new FakeSocketAddress("removed"); FakeSocketAddress removedAddr = new FakeSocketAddress("removed");
EquivalentAddressGroup removedEag = new EquivalentAddressGroup(removedAddr);
FakeSocketAddress oldAddr = new FakeSocketAddress("old"); FakeSocketAddress oldAddr = new FakeSocketAddress("old");
EquivalentAddressGroup oldEag1 = new EquivalentAddressGroup(oldAddr);
EquivalentAddressGroup oldEag2 = new EquivalentAddressGroup(
oldAddr, Attributes.newBuilder().set(key, "oldattr").build());
FakeSocketAddress newAddr = new FakeSocketAddress("new"); FakeSocketAddress newAddr = new FakeSocketAddress("new");
EquivalentAddressGroup newEag = new EquivalentAddressGroup(
newAddr, Attributes.newBuilder().set(key, "newattr").build());
List<Subchannel> allSubchannels = subchannels.put(Collections.singletonList(removedEag), removedSubchannel);
Lists.newArrayList(removedSubchannel, oldSubchannel, newSubchannel); subchannels.put(Collections.singletonList(oldEag1), oldSubchannel);
List<FakeSocketAddress> allAddrs = subchannels.put(Collections.singletonList(newEag), newSubchannel);
Lists.newArrayList(removedAddr, oldAddr, newAddr);
for (int i = 0; i < allSubchannels.size(); i++) {
Subchannel subchannel = allSubchannels.get(i);
List<EquivalentAddressGroup> eagList =
Arrays.asList(new EquivalentAddressGroup(allAddrs.get(i)));
subchannels.put(eagList, subchannel);
}
List<EquivalentAddressGroup> currentServers = List<EquivalentAddressGroup> currentServers = Lists.newArrayList(removedEag, oldEag1);
Lists.newArrayList(
new EquivalentAddressGroup(removedAddr),
new EquivalentAddressGroup(oldAddr));
InOrder inOrder = inOrder(mockHelper); InOrder inOrder = inOrder(mockHelper);
@ -236,15 +233,14 @@ public class RoundRobinLoadBalancerTest {
assertThat(loadBalancer.getSubchannels()).containsExactly(removedSubchannel, assertThat(loadBalancer.getSubchannels()).containsExactly(removedSubchannel,
oldSubchannel); oldSubchannel);
List<EquivalentAddressGroup> latestServers = // This time with Attributes
Lists.newArrayList( List<EquivalentAddressGroup> latestServers = Lists.newArrayList(oldEag2, newEag);
new EquivalentAddressGroup(oldAddr),
new EquivalentAddressGroup(newAddr));
;
loadBalancer.handleResolvedAddresses( loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(latestServers).setAttributes(affinity).build()); ResolvedAddresses.newBuilder().setAddresses(latestServers).setAttributes(affinity).build());
verify(newSubchannel, times(1)).requestConnection(); verify(newSubchannel, times(1)).requestConnection();
verify(oldSubchannel, times(1)).updateAddresses(Arrays.asList(oldEag2));
verify(removedSubchannel, times(1)).shutdown(); verify(removedSubchannel, times(1)).shutdown();
deliverSubchannelState(removedSubchannel, ConnectivityStateInfo.forNonError(SHUTDOWN)); deliverSubchannelState(removedSubchannel, ConnectivityStateInfo.forNonError(SHUTDOWN));