core/grpclb: resolve TXT records in DNS name resolver and include balancer addresses

This commit is contained in:
Carl Mastrangelo 2017-12-19 10:01:53 -08:00 committed by GitHub
parent 04420dfdf7
commit 6eaae5d081
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 192 additions and 50 deletions

View File

@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
@ -27,7 +28,9 @@ import io.grpc.Status;
import io.grpc.internal.SharedResourceHolder.Resource;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -38,6 +41,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.naming.NamingEnumeration;
@ -49,7 +53,7 @@ import javax.naming.directory.InitialDirContext;
* A DNS-based {@link NameResolver}.
*
* <p>Each {@code A} or {@code AAAA} record emits an {@link EquivalentAddressGroup} in the list
* passed to {@link NameResolver.Listener#onUpdate}
* passed to {@link NameResolver.Listener#onAddresses(List, Attributes)}
*
* @see DnsNameResolverProvider
*/
@ -59,8 +63,16 @@ final class DnsNameResolver extends NameResolver {
private static final boolean JNDI_AVAILABLE = jndiAvailable();
// From https://github.com/grpc/proposal/blob/master/A2-service-configs-in-dns.md
private static final String SERVICE_CONFIG_NAME_PREFIX = "_grpc_config.";
// From https://github.com/grpc/proposal/blob/master/A5-grpclb-in-dns.md
private static final String GRPCLB_NAME_PREFIX = "_grpclb._tcp.";
private static final String jndiProperty =
System.getProperty("io.grpc.internal.DnsNameResolverProvider.enable_jndi", "false");
@VisibleForTesting
static boolean enableJndi = false;
static boolean enableJndi = Boolean.parseBoolean(jndiProperty);
private DelegateResolver delegateResolver = pickDelegateResolver();
@ -174,11 +186,19 @@ final class DnsNameResolver extends NameResolver {
return;
}
// Each address forms an EAG
ArrayList<EquivalentAddressGroup> servers = new ArrayList<EquivalentAddressGroup>();
List<EquivalentAddressGroup> servers = new ArrayList<EquivalentAddressGroup>();
for (InetAddress inetAddr : resolvedInetAddrs.addresses) {
servers.add(new EquivalentAddressGroup(new InetSocketAddress(inetAddr, port)));
}
savedListener.onAddresses(servers, Attributes.EMPTY);
servers.addAll(resolvedInetAddrs.balancerAddresses);
Attributes.Builder attrs = Attributes.newBuilder();
if (!resolvedInetAddrs.txtRecords.isEmpty()) {
attrs.set(
GrpcAttributes.NAME_RESOLVER_ATTR_DNS_TXT,
Collections.unmodifiableList(new ArrayList<String>(resolvedInetAddrs.txtRecords)));
}
savedListener.onAddresses(servers, attrs.build());
} finally {
synchronized (DnsNameResolver.this) {
resolving = false;
@ -278,10 +298,16 @@ final class DnsNameResolver extends NameResolver {
static final class ResolutionResults {
final List<InetAddress> addresses;
final List<String> txtRecords;
final List<EquivalentAddressGroup> balancerAddresses;
ResolutionResults(List<InetAddress> addresses, List<String> txtRecords) {
ResolutionResults(
List<InetAddress> addresses,
List<String> txtRecords,
List<EquivalentAddressGroup> balancerAddresses) {
this.addresses = Collections.unmodifiableList(checkNotNull(addresses, "addresses"));
this.txtRecords = Collections.unmodifiableList(checkNotNull(txtRecords, "txtRecords"));
this.balancerAddresses =
Collections.unmodifiableList(checkNotNull(balancerAddresses, "balancerAddresses"));
}
}
@ -305,14 +331,16 @@ final class DnsNameResolver extends NameResolver {
ResolutionResults jdkResults = jdkResovler.resolve(host);
List<InetAddress> addresses = jdkResults.addresses;
List<String> txtRecords = Collections.emptyList();
List<EquivalentAddressGroup> balancerAddresses = Collections.emptyList();
try {
ResolutionResults jdniResults = jndiResovler.resolve(host);
txtRecords = jdniResults.txtRecords;
balancerAddresses = jdniResults.balancerAddresses;
} catch (Exception e) {
logger.log(Level.SEVERE, "Failed to resolve TXT results", e);
}
return new ResolutionResults(addresses, txtRecords);
return new ResolutionResults(addresses, txtRecords, balancerAddresses);
}
}
@ -328,7 +356,8 @@ final class DnsNameResolver extends NameResolver {
ResolutionResults resolve(String host) throws Exception {
return new ResolutionResults(
Arrays.asList(InetAddress.getAllByName(host)),
Collections.<String>emptyList());
Collections.<String>emptyList(),
Collections.<EquivalentAddressGroup>emptyList());
}
}
@ -340,26 +369,84 @@ final class DnsNameResolver extends NameResolver {
@VisibleForTesting
static final class JndiResolver extends DelegateResolver {
private static final String[] rrTypes = new String[]{"TXT"};
private static final Pattern whitespace = Pattern.compile("\\s+");
@Override
ResolutionResults resolve(String host) throws NamingException {
List<String> serviceConfigTxtRecords = Collections.emptyList();
String serviceConfigHostname = SERVICE_CONFIG_NAME_PREFIX + host;
if (logger.isLoggable(Level.FINER)) {
logger.log(
Level.FINER, "About to query TXT records for {0}", new Object[]{serviceConfigHostname});
}
try {
serviceConfigTxtRecords = getAllRecords("TXT", "dns:///" + serviceConfigHostname);
} catch (NamingException e) {
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "Unable to look up " + serviceConfigHostname, e);
}
}
String grpclbHostname = GRPCLB_NAME_PREFIX + host;
if (logger.isLoggable(Level.FINER)) {
logger.log(
Level.FINER, "About to query SRV records for {0}", new Object[]{grpclbHostname});
}
List<EquivalentAddressGroup> balancerAddresses = Collections.emptyList();
try {
List<String> grpclbSrvRecords = getAllRecords("SRV", "dns:///" + grpclbHostname);
balancerAddresses = new ArrayList<EquivalentAddressGroup>(grpclbSrvRecords.size());
for (String srvRecord : grpclbSrvRecords) {
try {
String[] parts = whitespace.split(srvRecord);
Verify.verify(parts.length == 4, "Bad SRV Record: %s, ", srvRecord);
String srvHostname = parts[3];
int port = Integer.parseInt(parts[2]);
InetAddress[] addrs = InetAddress.getAllByName(srvHostname);
List<SocketAddress> sockaddrs = new ArrayList<SocketAddress>(addrs.length);
for (InetAddress addr : addrs) {
sockaddrs.add(new InetSocketAddress(addr, port));
}
Attributes attrs = Attributes.newBuilder()
.set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, srvHostname)
.build();
balancerAddresses.add(
new EquivalentAddressGroup(Collections.unmodifiableList(sockaddrs), attrs));
} catch (UnknownHostException e) {
logger.log(Level.WARNING, "Can't find address for SRV record" + srvRecord, e);
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Failed to construct SRV record" + srvRecord, e);
}
}
} catch (NamingException e) {
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "Unable to look up " + serviceConfigHostname, e);
}
}
return new ResolutionResults(
/*addresses=*/ Collections.<InetAddress>emptyList(),
serviceConfigTxtRecords,
Collections.unmodifiableList(balancerAddresses));
}
private List<String> getAllRecords(String recordType, String name) throws NamingException {
InitialDirContext dirContext = new InitialDirContext();
javax.naming.directory.Attributes attrs = dirContext.getAttributes("dns:///" + host, rrTypes);
List<InetAddress> addresses = new ArrayList<InetAddress>();
List<String> txtRecords = new ArrayList<String>();
String[] rrType = new String[]{recordType};
javax.naming.directory.Attributes attrs = dirContext.getAttributes(name, rrType);
List<String> records = new ArrayList<String>();
NamingEnumeration<? extends Attribute> rrGroups = attrs.getAll();
try {
while (rrGroups.hasMore()) {
Attribute rrEntry = rrGroups.next();
assert Arrays.asList(rrTypes).contains(rrEntry.getID());
assert Arrays.asList(rrType).contains(rrEntry.getID());
NamingEnumeration<?> rrValues = rrEntry.getAll();
try {
while (rrValues.hasMore()) {
String rrValue = (String) rrValues.next();
txtRecords.add(rrValue);
records.add(String.valueOf(rrValues.next()));
}
} finally {
rrValues.close();
@ -368,8 +455,7 @@ final class DnsNameResolver extends NameResolver {
} finally {
rrGroups.close();
}
return new ResolutionResults(addresses, txtRecords);
return records;
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 2017, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import io.grpc.Attributes;
import java.util.List;
/**
* Special attributes that are only useful to gRPC.
*/
public final class GrpcAttributes {
/**
* Attribute key TXT DNS records.
*/
public static final Attributes.Key<List<String>> NAME_RESOLVER_ATTR_DNS_TXT =
Attributes.Key.of("dns-txt");
/**
* The naming authority of a gRPC LB server address. It is an address-group-level attribute,
* present when the address group is a LoadBalancer.
*/
public static final Attributes.Key<String> ATTR_LB_ADDR_AUTHORITY =
Attributes.Key.of("io.grpc.grpclb.lbAddrAuthority");
private GrpcAttributes() {}
}

View File

@ -906,27 +906,32 @@ public final class ManagedChannelImpl extends ManagedChannel {
onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list"));
return;
}
logger.log(Level.FINE, "[{0}] resolved address: {1}, config={2}",
new Object[] {getLogId(), servers, config});
helper.runSerialized(new Runnable() {
@Override
public void run() {
// Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
return;
}
try {
balancer.handleResolvedAddressGroups(servers, config);
} catch (Throwable e) {
logger.log(
Level.WARNING, "[" + getLogId() + "] Unexpected exception from LoadBalancer", e);
// It must be a bug! Push the exception back to LoadBalancer in the hope that it may
// be propagated to the application.
balancer.handleNameResolutionError(Status.INTERNAL.withCause(e)
.withDescription("Thrown from handleResolvedAddresses(): " + e));
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "[{0}] resolved address: {1}, config={2}",
new Object[]{getLogId(), servers, config});
}
final class NamesResolved implements Runnable {
@Override
public void run() {
// Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
return;
}
});
try {
balancer.handleResolvedAddressGroups(servers, config);
} catch (Throwable e) {
logger.log(
Level.WARNING, "[" + getLogId() + "] Unexpected exception from LoadBalancer", e);
// It must be a bug! Push the exception back to LoadBalancer in the hope that it may
// be propagated to the application.
balancer.handleNameResolutionError(Status.INTERNAL.withCause(e)
.withDescription("Thrown from handleResolvedAddresses(): " + e));
}
}
}
helper.runSerialized(new NamesResolved());
}
@Override

View File

@ -323,15 +323,25 @@ public class DnsNameResolverTest {
DelegateResolver resolver = new DnsNameResolver.CompositeResolver(jdkDelegate, jndiDelegate);
List<InetAddress> jdkAnswer = createAddressList(2);
jdkDelegate.addAnswer(jdkAnswer, Arrays.asList("jdktxt"));
jdkDelegate.addAnswer(
jdkAnswer,
Arrays.asList("jdktxt"),
Collections.<EquivalentAddressGroup>emptyList());
List<InetAddress> jdniAnswer = createAddressList(2);
jndiDelegate.addAnswer(jdniAnswer, Arrays.asList("jnditxt"));
jndiDelegate.addAnswer(
jdniAnswer,
Arrays.asList("jnditxt"),
Collections.singletonList(
new EquivalentAddressGroup(
Collections.<SocketAddress>singletonList(new SocketAddress() {}),
Attributes.EMPTY)));
ResolutionResults results = resolver.resolve("abc");
assertThat(results.addresses).containsExactlyElementsIn(jdkAnswer).inOrder();
assertThat(results.txtRecords).containsExactly("jnditxt");
assertThat(results.balancerAddresses).hasSize(1);
}
@Test
@ -418,14 +428,19 @@ public class DnsNameResolverTest {
private final Queue<String> invocations = new LinkedList<String>();
MockResolver addAnswer(List<InetAddress> addresses) {
return addAnswer(addresses, null);
return addAnswer(addresses, null, null);
}
MockResolver addAnswer(List<InetAddress> addresses, List<String> txtRecords) {
MockResolver addAnswer(
List<InetAddress> addresses,
List<String> txtRecords,
List<EquivalentAddressGroup> balancerAddresses) {
answers.add(
new ResolutionResults(
addresses,
MoreObjects.firstNonNull(txtRecords, Collections.<String>emptyList())));
MoreObjects.firstNonNull(txtRecords, Collections.<String>emptyList()),
MoreObjects.firstNonNull(
balancerAddresses, Collections.<EquivalentAddressGroup>emptyList())));
return this;
}

View File

@ -40,13 +40,6 @@ public final class GrpclbConstants {
public static final Attributes.Key<LbPolicy> ATTR_LB_POLICY =
Attributes.Key.of("io.grpc.grpclb.lbPolicy");
/**
* The naming authority of an LB server address. It is an address-group-level attribute, present
* when the address group is a LoadBalancer.
*/
public static final Attributes.Key<String> ATTR_LB_ADDR_AUTHORITY =
Attributes.Key.of("io.grpc.grpclb.lbAddrAuthority");
/**
* The opaque token given by the remote balancer for each returned server address. The client
* will send this token with any requests sent to the associated server.

View File

@ -27,6 +27,7 @@ import io.grpc.InternalWithLogId;
import io.grpc.LoadBalancer;
import io.grpc.Status;
import io.grpc.grpclb.GrpclbConstants.LbPolicy;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.ObjectPool;
import java.util.ArrayList;
import java.util.Collections;
@ -104,7 +105,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements InternalWithLogId {
List<LbAddressGroup> newLbAddressGroups = new ArrayList<LbAddressGroup>();
List<EquivalentAddressGroup> newBackendServers = new ArrayList<EquivalentAddressGroup>();
for (EquivalentAddressGroup server : updatedServers) {
String lbAddrAuthority = server.getAttributes().get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY);
String lbAddrAuthority = server.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY);
if (lbAddrAuthority != null) {
newLbAddressGroups.add(new LbAddressGroup(server, lbAddrAuthority));
} else {

View File

@ -71,6 +71,7 @@ import io.grpc.grpclb.GrpclbState.RoundRobinPicker;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.FakeClock;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SerializingExecutor;
import io.grpc.stub.StreamObserver;
@ -1623,7 +1624,7 @@ public class GrpclbLoadBalancerTest {
private static Attributes lbAttributes(String authority) {
return Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, authority)
.set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, authority)
.build();
}