diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index b0cf979118..04431fbd62 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -930,11 +930,16 @@ public abstract class LoadBalancer { * * @since 1.4.0 */ - // TODO(ejona): Allow passing a List here and to updateOobChannelAddresses, but want to - // wait until https://github.com/grpc/grpc-java/issues/4469 is done. - // https://github.com/grpc/grpc-java/issues/4618 public abstract ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority); + /** + * Accept a list of EAG for multiple authorities: https://github.com/grpc/grpc-java/issues/4618 + * */ + public ManagedChannel createOobChannel(List eag, + String authority) { + throw new UnsupportedOperationException(); + } + /** * Updates the addresses used for connections in the {@code Channel} that was created by {@link * #createOobChannel(EquivalentAddressGroup, String)}. This is superior to {@link @@ -949,6 +954,15 @@ public abstract class LoadBalancer { throw new UnsupportedOperationException(); } + /** + * Updates the addresses with a new EAG list. Connection is continued when old and new addresses + * overlap. + * */ + public void updateOobChannelAddresses(ManagedChannel channel, + List eag) { + throw new UnsupportedOperationException(); + } + /** * Creates an out-of-band channel for LoadBalancer's own RPC needs, e.g., talking to an external * load-balancer service, that is specified by a target string. See the documentation on diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 97be3f26e0..5ec8e9435a 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -339,12 +339,12 @@ final class InternalSubchannel implements InternalInstrumented, Tr Preconditions.checkNotNull(newAddressGroups, "newAddressGroups"); checkListHasNoNulls(newAddressGroups, "newAddressGroups contains null entry"); Preconditions.checkArgument(!newAddressGroups.isEmpty(), "newAddressGroups is empty"); + final List newImmutableAddressGroups = + Collections.unmodifiableList(new ArrayList<>(newAddressGroups)); syncContext.execute(new Runnable() { @Override public void run() { - List newImmutableAddressGroups = - Collections.unmodifiableList(new ArrayList<>(newAddressGroups)); ManagedClientTransport savedTransport = null; SocketAddress previousAddress = addressIndex.getCurrentAddress(); addressIndex.updateGroups(newImmutableAddressGroups); diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 0ab8d49b68..ac9c78ad2f 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -1465,6 +1465,12 @@ final class ManagedChannelImpl extends ManagedChannel implements @Override public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) { + return createOobChannel(Collections.singletonList(addressGroup), authority); + } + + @Override + public ManagedChannel createOobChannel(List addressGroup, + String authority) { // TODO(ejona): can we be even stricter? Like terminating? checkState(!terminated, "Channel is terminated"); long oobChannelCreationTime = timeProvider.currentTimeNanos(); @@ -1505,7 +1511,7 @@ final class ManagedChannelImpl extends ManagedChannel implements } final InternalSubchannel internalSubchannel = new InternalSubchannel( - Collections.singletonList(addressGroup), + addressGroup, authority, userAgent, backoffPolicyProvider, oobTransportFactory, oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext, // All callback methods are run from syncContext @@ -1625,6 +1631,12 @@ final class ManagedChannelImpl extends ManagedChannel implements @Override public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) { + updateOobChannelAddresses(channel, Collections.singletonList(eag)); + } + + @Override + public void updateOobChannelAddresses(ManagedChannel channel, + List eag) { checkArgument(channel instanceof OobChannel, "channel must have been returned from createOobChannel"); ((OobChannel) channel).updateAddresses(eag); diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java index 5e0dd12b2a..f69fd17e5c 100644 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -193,8 +193,8 @@ final class OobChannel extends ManagedChannel implements InternalInstrumented eag) { + subchannel.updateAddresses(eag); } @Override diff --git a/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java b/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java index 033524fa81..42299571f8 100644 --- a/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java +++ b/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java @@ -31,6 +31,7 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.NameResolver; import io.grpc.NameResolverRegistry; import io.grpc.SynchronizationContext; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771") @@ -50,11 +51,21 @@ public abstract class ForwardingLoadBalancerHelper extends LoadBalancer.Helper { return delegate().createOobChannel(eag, authority); } + @Override + public ManagedChannel createOobChannel(List eag, String authority) { + return delegate().createOobChannel(eag, authority); + } + @Override public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) { delegate().updateOobChannelAddresses(channel, eag); } + @Override + public void updateOobChannelAddresses(ManagedChannel channel, List eag) { + delegate().updateOobChannelAddresses(channel, eag); + } + @Deprecated @Override public ManagedChannelBuilder createResolvingOobChannelBuilder(String target) { diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java index a2a2925c4f..6a76f75c8b 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java @@ -402,7 +402,7 @@ public class ManagedChannelImplIdlenessTest { assertFalse(channel.inUseStateAggregator.isInUse()); // Now make an RPC on an OOB channel - ManagedChannel oob = helper.createOobChannel(servers.get(0), "oobauthority"); + ManagedChannel oob = helper.createOobChannel(servers, "oobauthority"); verify(mockTransportFactory, never()) .newClientTransport( any(SocketAddress.class), @@ -438,13 +438,13 @@ public class ManagedChannelImplIdlenessTest { verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture()); deliverResolutionResult(); Helper helper = helperCaptor.getValue(); - ManagedChannel oobChannel = helper.createOobChannel(servers.get(0), "localhost"); + ManagedChannel oobChannel = helper.createOobChannel(servers.subList(0,1), "localhost"); oobChannel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata()); MockClientTransportInfo t0 = newTransports.poll(); t0.listener.transportReady(); - helper.updateOobChannelAddresses(oobChannel, servers.get(1)); + helper.updateOobChannelAddresses(oobChannel, servers.subList(1,2)); oobChannel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata()); MockClientTransportInfo t1 = newTransports.poll(); @@ -462,7 +462,7 @@ public class ManagedChannelImplIdlenessTest { verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture()); Helper helper = helperCaptor.getValue(); deliverResolutionResult(); - ManagedChannel oobChannel = helper.createOobChannel(servers.get(0), "localhost"); + ManagedChannel oobChannel = helper.createOobChannel(servers.subList(0,1), "localhost"); oobChannel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata()); MockClientTransportInfo t0 = newTransports.poll(); @@ -470,7 +470,8 @@ public class ManagedChannelImplIdlenessTest { List changedList = new ArrayList<>(servers.get(0).getAddresses()); changedList.add(new FakeSocketAddress("aDifferentServer")); - helper.updateOobChannelAddresses(oobChannel, new EquivalentAddressGroup(changedList)); + helper.updateOobChannelAddresses(oobChannel, Collections.singletonList( + new EquivalentAddressGroup(changedList))); oobChannel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata()); assertNull(newTransports.poll()); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 9291363775..7f896892d0 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -704,7 +704,8 @@ public class ManagedChannelImplTest { @Test public void channelzMembership_oob() throws Exception { createChannel(); - OobChannel oob = (OobChannel) helper.createOobChannel(addressGroup, AUTHORITY); + OobChannel oob = (OobChannel) helper.createOobChannel( + Collections.singletonList(addressGroup), AUTHORITY); // oob channels are not root channels assertNull(channelz.getRootChannel(oob.getLogId().getId())); assertTrue(channelz.containsSubchannel(oob.getLogId())); @@ -1621,8 +1622,10 @@ public class ManagedChannelImplTest { public void oobchannels() { createChannel(); - ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1authority"); - ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2authority"); + ManagedChannel oob1 = helper.createOobChannel( + Collections.singletonList(addressGroup), "oob1authority"); + ManagedChannel oob2 = helper.createOobChannel( + Collections.singletonList(addressGroup), "oob2authority"); verify(balancerRpcExecutorPool, times(2)).getObject(); assertEquals("oob1authority", oob1.authority()); @@ -1755,7 +1758,8 @@ public class ManagedChannelImplTest { .containsExactly(channelCredValue, callCredValue).inOrder(); // Verify that the oob channel does not - ManagedChannel oob = helper.createOobChannel(addressGroup, "oobauthority"); + ManagedChannel oob = helper.createOobChannel( + Collections.singletonList(addressGroup), "oobauthority"); headers = new Metadata(); call = oob.newCall(method, callOptions); @@ -1886,8 +1890,10 @@ public class ManagedChannelImplTest { @Test public void oobChannelsWhenChannelShutdownNow() { createChannel(); - ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority"); - ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority"); + ManagedChannel oob1 = helper.createOobChannel( + Collections.singletonList(addressGroup), "oob1Authority"); + ManagedChannel oob2 = helper.createOobChannel( + Collections.singletonList(addressGroup), "oob2Authority"); oob1.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata()); oob2.newCall(method, CallOptions.DEFAULT).start(mockCallListener2, new Metadata()); @@ -1915,8 +1921,10 @@ public class ManagedChannelImplTest { @Test public void oobChannelsNoConnectionShutdown() { createChannel(); - ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority"); - ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority"); + ManagedChannel oob1 = helper.createOobChannel( + Collections.singletonList(addressGroup), "oob1Authority"); + ManagedChannel oob2 = helper.createOobChannel( + Collections.singletonList(addressGroup), "oob2Authority"); channel.shutdown(); verify(mockLoadBalancer).shutdown(); @@ -1934,8 +1942,8 @@ public class ManagedChannelImplTest { @Test public void oobChannelsNoConnectionShutdownNow() { createChannel(); - helper.createOobChannel(addressGroup, "oob1Authority"); - helper.createOobChannel(addressGroup, "oob2Authority"); + helper.createOobChannel(Collections.singletonList(addressGroup), "oob1Authority"); + helper.createOobChannel(Collections.singletonList(addressGroup), "oob2Authority"); channel.shutdownNow(); verify(mockLoadBalancer).shutdown(); @@ -2116,7 +2124,8 @@ public class ManagedChannelImplTest { channelBuilder.nameResolverFactory(nameResolverFactory); createChannel(); if (isOobChannel) { - OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobAuthority"); + OobChannel oobChannel = (OobChannel) helper.createOobChannel( + Collections.singletonList(addressGroup), "oobAuthority"); oobChannel.getSubchannel().requestConnection(); } else { Subchannel subchannel = @@ -3183,7 +3192,8 @@ public class ManagedChannelImplTest { public void channelTracing_oobChannelStateChangeEvent() throws Exception { channelBuilder.maxTraceEvents(10); createChannel(); - OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "authority"); + OobChannel oobChannel = (OobChannel) helper.createOobChannel( + Collections.singletonList(addressGroup), "authority"); timer.forwardNanos(1234); oobChannel.handleSubchannelStateChange( ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING)); @@ -3199,7 +3209,8 @@ public class ManagedChannelImplTest { channelBuilder.maxTraceEvents(10); createChannel(); timer.forwardNanos(1234); - OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "authority"); + OobChannel oobChannel = (OobChannel) helper.createOobChannel( + Collections.singletonList(addressGroup), "authority"); assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder() .setDescription("Child OobChannel created") .setSeverity(ChannelTrace.Event.Severity.CT_INFO) @@ -3207,13 +3218,13 @@ public class ManagedChannelImplTest { .setChannelRef(oobChannel) .build()); assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder() - .setDescription("OobChannel for [[test-addr]/{}] created") + .setDescription("OobChannel for [[[test-addr]/{}]] created") .setSeverity(ChannelTrace.Event.Severity.CT_INFO) .setTimestampNanos(timer.getTicker().read()) .build()); assertThat(getStats(oobChannel.getInternalSubchannel()).channelTrace.events).contains( new ChannelTrace.Event.Builder() - .setDescription("Subchannel for [[test-addr]/{}] created") + .setDescription("Subchannel for [[[test-addr]/{}]] created") .setSeverity(ChannelTrace.Event.Severity.CT_INFO) .setTimestampNanos(timer.getTicker().read()) .build()); @@ -3349,7 +3360,8 @@ public class ManagedChannelImplTest { ClientStream mockStream = mock(ClientStream.class); createChannel(); - OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority"); + OobChannel oobChannel = (OobChannel) helper.createOobChannel( + Collections.singletonList(addressGroup), "oobauthority"); AbstractSubchannel oobSubchannel = (AbstractSubchannel) oobChannel.getSubchannel(); FakeClock callExecutor = new FakeClock(); CallOptions options = @@ -3411,7 +3423,8 @@ public class ManagedChannelImplTest { createChannel(); String authority = "oobauthority"; - OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, authority); + OobChannel oobChannel = (OobChannel) helper.createOobChannel( + Collections.singletonList(addressGroup), authority); assertEquals(authority, getStats(oobChannel).target); } @@ -3419,7 +3432,8 @@ public class ManagedChannelImplTest { public void channelsAndSubchannels_oob_instrumented_state() throws Exception { createChannel(); - OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority"); + OobChannel oobChannel = (OobChannel) helper.createOobChannel( + Collections.singletonList(addressGroup), "oobauthority"); assertEquals(IDLE, getStats(oobChannel).state); oobChannel.getSubchannel().requestConnection(); diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java index ddb4170c46..1a8dec36e3 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java @@ -75,26 +75,28 @@ class GrpclbLoadBalancer extends LoadBalancer { public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { Attributes attributes = resolvedAddresses.getAttributes(); List newLbAddresses = attributes.get(GrpclbConstants.ATTR_LB_ADDRS); - if ((newLbAddresses == null || newLbAddresses.isEmpty()) - && resolvedAddresses.getAddresses().isEmpty()) { + if (newLbAddresses == null) { + newLbAddresses = Collections.emptyList(); + } + if (newLbAddresses.isEmpty() && resolvedAddresses.getAddresses().isEmpty()) { handleNameResolutionError( Status.UNAVAILABLE.withDescription("No backend or balancer addresses found")); return; } - List newLbAddressGroups = new ArrayList<>(); - - if (newLbAddresses != null) { - for (EquivalentAddressGroup lbAddr : newLbAddresses) { - String lbAddrAuthority = lbAddr.getAttributes().get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY); - if (lbAddrAuthority == null) { - throw new AssertionError( - "This is a bug: LB address " + lbAddr + " does not have an authority."); - } - newLbAddressGroups.add(new LbAddressGroup(lbAddr, lbAddrAuthority)); + List overrideAuthorityLbAddresses = + new ArrayList<>(newLbAddresses.size()); + for (EquivalentAddressGroup lbAddr : newLbAddresses) { + String lbAddrAuthority = lbAddr.getAttributes().get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY); + if (lbAddrAuthority == null) { + throw new AssertionError( + "This is a bug: LB address " + lbAddr + " does not have an authority."); } + Attributes attrs = lbAddr.getAttributes().toBuilder() + .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, lbAddrAuthority) + .build(); + overrideAuthorityLbAddresses.add(new EquivalentAddressGroup(lbAddr.getAddresses(), attrs)); } - newLbAddressGroups = Collections.unmodifiableList(newLbAddressGroups); List newBackendServers = Collections.unmodifiableList(resolvedAddresses.getAddresses()); GrpclbConfig newConfig = (GrpclbConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); @@ -106,7 +108,8 @@ class GrpclbLoadBalancer extends LoadBalancer { helper.getChannelLogger().log(ChannelLogLevel.INFO, "Config: " + newConfig); recreateStates(); } - grpclbState.handleAddresses(newLbAddressGroups, newBackendServers); + grpclbState.handleAddresses(Collections.unmodifiableList(overrideAuthorityLbAddresses), + newBackendServers); } @Override diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index a241453f86..b2d8b453de 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -63,7 +63,6 @@ import io.grpc.lb.v1.ServerList; import io.grpc.stub.StreamObserver; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; @@ -108,6 +107,8 @@ final class GrpclbState { return "BUFFER_ENTRY"; } }; + @VisibleForTesting + static final String NO_USE_AUTHORITY_SUFFIX = "-notIntendedToBeUsed"; enum Mode { ROUND_ROBIN, @@ -224,7 +225,8 @@ final class GrpclbState { * not yet connected. */ void handleAddresses( - List newLbAddressGroups, List newBackendServers) { + List newLbAddressGroups, + List newBackendServers) { logger.log( ChannelLogLevel.DEBUG, "[grpclb-<{0}>] Resolved addresses: lb addresses {0}, backends: {1}", @@ -237,8 +239,7 @@ final class GrpclbState { shutdownLbComm(); syncContext.execute(new FallbackModeTask()); } else { - LbAddressGroup newLbAddressGroup = flattenLbAddressGroups(newLbAddressGroups); - startLbComm(newLbAddressGroup); + startLbComm(newLbAddressGroups); // Avoid creating a new RPC just because the addresses were updated, as it can cause a // stampeding herd. The current RPC may be on a connection to an address not present in // newLbAddressGroups, but we're considering that "okay". If we detected the RPC is to an @@ -318,24 +319,20 @@ final class GrpclbState { } } - private void startLbComm(LbAddressGroup lbAddressGroup) { - checkNotNull(lbAddressGroup, "lbAddressGroup"); + private void startLbComm(List overrideAuthorityEags) { + checkNotNull(overrideAuthorityEags, "overrideAuthorityEags"); + assert !overrideAuthorityEags.isEmpty(); + String doNotUseAuthority = overrideAuthorityEags.get(0).getAttributes() + .get(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE) + NO_USE_AUTHORITY_SUFFIX; if (lbCommChannel == null) { - lbCommChannel = helper.createOobChannel( - lbAddressGroup.getAddresses(), lbAddressGroup.getAuthority()); + lbCommChannel = helper.createOobChannel(overrideAuthorityEags, doNotUseAuthority); logger.log( ChannelLogLevel.DEBUG, - "[grpclb-<{0}>] Created grpclb channel: address={1}, authority={2}", + "[grpclb-<{0}>] Created grpclb channel: EAG={1}", serviceName, - lbAddressGroup.getAddresses(), - lbAddressGroup.getAuthority()); - } else if (lbAddressGroup.getAuthority().equals(lbCommChannel.authority())) { - helper.updateOobChannelAddresses(lbCommChannel, lbAddressGroup.getAddresses()); + overrideAuthorityEags); } else { - // Full restart of channel - shutdownLbComm(); - lbCommChannel = helper.createOobChannel( - lbAddressGroup.getAddresses(), lbAddressGroup.getAuthority()); + helper.updateOobChannelAddresses(lbCommChannel, overrideAuthorityEags); } } @@ -867,47 +864,6 @@ final class GrpclbState { helper.updateBalancingState(state, picker); } - private LbAddressGroup flattenLbAddressGroups(List groupList) { - assert !groupList.isEmpty(); - List eags = new ArrayList<>(groupList.size()); - String authority = groupList.get(0).getAuthority(); - for (LbAddressGroup group : groupList) { - if (!authority.equals(group.getAuthority())) { - // TODO(ejona): Allow different authorities for different addresses. Requires support from - // Helper. - logger.log(ChannelLogLevel.WARNING, - "[grpclb-<{0}>] Multiple authorities found for LB. " - + "Skipping addresses for {1} in preference to {2}", - serviceName, - group.getAuthority(), - authority); - } else { - eags.add(group.getAddresses()); - } - } - // ALTS code can use the presence of ATTR_LB_ADDR_AUTHORITY to select ALTS instead of TLS, with - // Netty. - // TODO(ejona): The process here is a bit of a hack because ATTR_LB_ADDR_AUTHORITY isn't - // actually used in the normal case. https://github.com/grpc/grpc-java/issues/4618 should allow - // this to be more obvious. - Attributes attrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, authority) - .build(); - return new LbAddressGroup(flattenEquivalentAddressGroup(eags, attrs), authority); - } - - /** - * Flattens list of EquivalentAddressGroup objects into one EquivalentAddressGroup object. - */ - private static EquivalentAddressGroup flattenEquivalentAddressGroup( - List groupList, Attributes attrs) { - List addrs = new ArrayList<>(); - for (EquivalentAddressGroup group : groupList) { - addrs.addAll(group.getAddresses()); - } - return new EquivalentAddressGroup(addrs, attrs); - } - private static Attributes createSubchannelAttrs() { return Attributes.newBuilder() .set(STATE_INFO, diff --git a/grpclb/src/main/java/io/grpc/grpclb/LbAddressGroup.java b/grpclb/src/main/java/io/grpc/grpclb/LbAddressGroup.java deleted file mode 100644 index cca096f75a..0000000000 --- a/grpclb/src/main/java/io/grpc/grpclb/LbAddressGroup.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2016 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.grpclb; - -import static com.google.common.base.Preconditions.checkNotNull; - -import io.grpc.EquivalentAddressGroup; - -/** - * Represents a balancer address entry. - */ -final class LbAddressGroup { - private final EquivalentAddressGroup addresses; - private final String authority; - - LbAddressGroup(EquivalentAddressGroup addresses, String authority) { - this.addresses = checkNotNull(addresses, "addresses"); - this.authority = checkNotNull(authority, "authority"); - } - - EquivalentAddressGroup getAddresses() { - return addresses; - } - - String getAuthority() { - return authority; - } -} diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index c7ed8b1efd..e768cf018e 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -25,6 +25,7 @@ import static io.grpc.ConnectivityState.SHUTDOWN; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.grpclb.GrpclbState.BUFFER_ENTRY; import static io.grpc.grpclb.GrpclbState.DROP_PICK_RESULT; +import static io.grpc.grpclb.GrpclbState.NO_USE_AUTHORITY_SUFFIX; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -108,6 +109,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.AdditionalAnswers; import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.Mock; @@ -801,11 +803,11 @@ public class GrpclbLoadBalancerTest { // Recover with a subsequent success List grpclbBalancerList = createResolvedBalancerAddresses(1); - EquivalentAddressGroup eag = grpclbBalancerList.get(0); deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); - verify(helper).createOobChannel(eq(eag), eq(lbAuthority(0))); + verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), + eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); } @@ -816,7 +818,8 @@ public class GrpclbLoadBalancerTest { List grpclbBalancerList = createResolvedBalancerAddresses(1); deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); - verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); + verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), + eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); assertEquals(1, fakeOobChannels.size()); ManagedChannel oobChannel = fakeOobChannels.poll(); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); @@ -853,28 +856,27 @@ public class GrpclbLoadBalancerTest { List grpclbBalancerList = createResolvedBalancerAddresses(1); deliverResolvedAddresses(backendList, grpclbBalancerList); - verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); + verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), + eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); ManagedChannel oobChannel = fakeOobChannels.poll(); assertEquals(1, lbRequestObservers.size()); List backendList2 = createResolvedBackendAddresses(1); List grpclbBalancerList2 = createResolvedBalancerAddresses(2); - EquivalentAddressGroup combinedEag = new EquivalentAddressGroup(Arrays.asList( - grpclbBalancerList2.get(0).getAddresses().get(0), - grpclbBalancerList2.get(1).getAddresses().get(0)), - lbAttributes(lbAuthority(0))); deliverResolvedAddresses(backendList2, grpclbBalancerList2); - verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(combinedEag)); + verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(xattr(grpclbBalancerList2))); assertEquals(1, lbRequestObservers.size()); // No additional RPC } + @Test public void grpclbUpdatedAddresses_reconnectOnAuthorityChange() { List backendList = createResolvedBackendAddresses(1); List grpclbBalancerList = createResolvedBalancerAddresses(1); deliverResolvedAddresses(backendList, grpclbBalancerList); - verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); + verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), + eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); ManagedChannel oobChannel = fakeOobChannels.poll(); assertEquals(1, lbRequestObservers.size()); @@ -885,9 +887,8 @@ public class GrpclbLoadBalancerTest { new EquivalentAddressGroup( new FakeSocketAddress("somethingNew"), lbAttributes(newAuthority))); deliverResolvedAddresses(backendList2, grpclbBalancerList2); - assertTrue(oobChannel.isTerminated()); - verify(helper).createOobChannel(eq(grpclbBalancerList2.get(0)), eq(newAuthority)); - assertEquals(2, lbRequestObservers.size()); // An additional RPC + verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(xattr(grpclbBalancerList2))); + assertEquals(1, lbRequestObservers.size()); // No additional RPC } @Test @@ -899,7 +900,8 @@ public class GrpclbLoadBalancerTest { // Fallback timer is started as soon as the addresses are resolved. assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); + verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), + eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); assertEquals(1, fakeOobChannels.size()); ManagedChannel oobChannel = fakeOobChannels.poll(); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); @@ -1214,8 +1216,8 @@ public class GrpclbLoadBalancerTest { List grpclbBalancerList = createResolvedBalancerAddresses(1); deliverResolvedAddresses(backendList, grpclbBalancerList); - inOrder.verify(helper) - .createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); + inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), + eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); // Attempted to connect to balancer assertEquals(1, fakeOobChannels.size()); @@ -1275,12 +1277,7 @@ public class GrpclbLoadBalancerTest { // New addresses are updated to the OobChannel inOrder.verify(helper).updateOobChannelAddresses( - same(oobChannel), - eq(new EquivalentAddressGroup( - Arrays.asList( - grpclbBalancerList.get(0).getAddresses().get(0), - grpclbBalancerList.get(1).getAddresses().get(0)), - lbAttributes(lbAuthority(0))))); + same(oobChannel), eq(xattr(grpclbBalancerList))); if (timerExpires) { // Still in fallback logic, except that the backend list is empty @@ -1299,8 +1296,7 @@ public class GrpclbLoadBalancerTest { // New LB address is updated to the OobChannel inOrder.verify(helper).updateOobChannelAddresses( - same(oobChannel), - eq(grpclbBalancerList.get(0))); + same(oobChannel), eq(xattr(grpclbBalancerList))); if (timerExpires) { // New backend addresses are used for fallback @@ -1365,7 +1361,8 @@ public class GrpclbLoadBalancerTest { List grpclbBalancerList = createResolvedBalancerAddresses(1); deliverResolvedAddresses(backendList, grpclbBalancerList); - inOrder.verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); + inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), + eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); // Attempted to connect to balancer assertThat(fakeOobChannels).hasSize(1); @@ -1430,7 +1427,7 @@ public class GrpclbLoadBalancerTest { // No fallback timeout timer scheduled. assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); verify(helper, never()) - .createOobChannel(any(EquivalentAddressGroup.class), anyString()); + .createOobChannel(ArgumentMatchers.anyList(), anyString()); } @Test @@ -1459,7 +1456,8 @@ public class GrpclbLoadBalancerTest { List grpclbBalancerList = createResolvedBalancerAddresses(1); deliverResolvedAddresses(backendList, grpclbBalancerList); - inOrder.verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0))); + inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), + eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); // Attempted to connect to balancer assertEquals(1, fakeOobChannels.size()); @@ -1609,16 +1607,36 @@ public class GrpclbLoadBalancerTest { lbAttributes("fake-authority-2")), new EquivalentAddressGroup( new FakeSocketAddress("fake-address-3"), - lbAttributes("fake-authority-1"))); - final EquivalentAddressGroup goldenOobChannelEag = new EquivalentAddressGroup( - Arrays.asList( - new FakeSocketAddress("fake-address-1"), - new FakeSocketAddress("fake-address-3")), - lbAttributes("fake-authority-1")); // Supporting multiple authorities would be good, one day - + lbAttributes("fake-authority-1").toBuilder() + .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, "value").build() + )); deliverResolvedAddresses(backendList, grpclbBalancerList); - verify(helper).createOobChannel(goldenOobChannelEag, "fake-authority-1"); + List goldenOobEagList = + Arrays.asList( + new EquivalentAddressGroup( + new FakeSocketAddress("fake-address-1"), + Attributes.newBuilder() + .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, "fake-authority-1") + .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, "fake-authority-1") + .build()), + new EquivalentAddressGroup( + new FakeSocketAddress("fake-address-2"), + Attributes.newBuilder() + .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, "fake-authority-2") + .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, "fake-authority-2") + .build()), + new EquivalentAddressGroup( + new FakeSocketAddress("fake-address-3"), + Attributes.newBuilder() + .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, "fake-authority-1") + .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, "value") + .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, "fake-authority-1") + .build() + )); + + verify(helper).createOobChannel(eq(goldenOobEagList), + eq("fake-authority-1" + NO_USE_AUTHORITY_SUFFIX)); } @Test @@ -2323,13 +2341,8 @@ public class GrpclbLoadBalancerTest { // Fallback timer is started as soon as the addresses are resolved. assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - - List addrs = new ArrayList<>(); - addrs.addAll(grpclbBalancerList.get(0).getAddresses()); - addrs.addAll(grpclbBalancerList.get(1).getAddresses()); - Attributes attr = grpclbBalancerList.get(0).getAttributes(); - EquivalentAddressGroup oobChannelEag = new EquivalentAddressGroup(addrs, attr); - verify(helper).createOobChannel(eq(oobChannelEag), eq(lbAuthority(0))); + verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), + eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); assertEquals(1, fakeOobChannels.size()); ManagedChannel oobChannel = fakeOobChannels.poll(); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); @@ -2632,6 +2645,18 @@ public class GrpclbLoadBalancerTest { .build(); } + private List xattr(List lbAddr) { + List oobAddr = new ArrayList<>(lbAddr.size()); + for (EquivalentAddressGroup lb : lbAddr) { + String authority = lb.getAttributes().get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY); + Attributes attrs = lb.getAttributes().toBuilder() + .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, authority) + .build(); + oobAddr.add(new EquivalentAddressGroup(lb.getAddresses(), attrs)); + } + return oobAddr; + } + private static class ServerEntry { final InetSocketAddress addr; final String token; @@ -2699,7 +2724,7 @@ public class GrpclbLoadBalancerTest { } @Override - public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) { + public ManagedChannel createOobChannel(List eag, String authority) { ManagedChannel channel = InProcessChannelBuilder .forName("fakeLb") @@ -2711,6 +2736,11 @@ public class GrpclbLoadBalancerTest { return channel; } + @Override + public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) { + return createOobChannel(Collections.singletonList(eag), authority); + } + @Override public Subchannel createSubchannel(CreateSubchannelArgs args) { FakeSubchannel subchannel = @@ -2751,5 +2781,10 @@ public class GrpclbLoadBalancerTest { @Override public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) { } + + @Override + public void updateOobChannelAddresses(ManagedChannel channel, + List eag) { + } } }