tsan, xds: fix XdsClientWrapperForServerSds data races (#8107)

This commit is contained in:
yifeizhuang 2021-04-26 14:37:11 -07:00 committed by GitHub
parent 8468b5c42f
commit 6755cfed34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 16 additions and 12 deletions

View File

@ -54,6 +54,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -70,7 +71,7 @@ public final class XdsClientWrapperForServerSds {
private static final TimeServiceResource timeServiceResource = private static final TimeServiceResource timeServiceResource =
new TimeServiceResource("GrpcServerXdsClient"); new TimeServiceResource("GrpcServerXdsClient");
private EnvoyServerProtoData.Listener curListener; private AtomicReference<EnvoyServerProtoData.Listener> curListener = new AtomicReference<>();
@SuppressWarnings("unused") @SuppressWarnings("unused")
@Nullable private XdsClient xdsClient; @Nullable private XdsClient xdsClient;
private final int port; private final int port;
@ -137,14 +138,14 @@ public final class XdsClientWrapperForServerSds {
new XdsClient.LdsResourceWatcher() { new XdsClient.LdsResourceWatcher() {
@Override @Override
public void onChanged(XdsClient.LdsUpdate update) { public void onChanged(XdsClient.LdsUpdate update) {
curListener = update.listener; curListener.set(update.listener);
reportSuccess(); reportSuccess();
} }
@Override @Override
public void onResourceDoesNotExist(String resourceName) { public void onResourceDoesNotExist(String resourceName) {
logger.log(Level.WARNING, "Resource {0} is unavailable", resourceName); logger.log(Level.WARNING, "Resource {0} is unavailable", resourceName);
curListener = null; curListener.set(null);
reportError(Status.NOT_FOUND.asException(), true); reportError(Status.NOT_FOUND.asException(), true);
} }
@ -180,7 +181,8 @@ public final class XdsClientWrapperForServerSds {
*/ */
@Nullable @Nullable
public DownstreamTlsContext getDownstreamTlsContext(Channel channel) { public DownstreamTlsContext getDownstreamTlsContext(Channel channel) {
if (curListener != null && channel != null) { EnvoyServerProtoData.Listener copyListener = curListener.get();
if (copyListener != null && channel != null) {
SocketAddress localAddress = channel.localAddress(); SocketAddress localAddress = channel.localAddress();
SocketAddress remoteAddress = channel.remoteAddress(); SocketAddress remoteAddress = channel.remoteAddress();
if (localAddress instanceof InetSocketAddress && remoteAddress instanceof InetSocketAddress) { if (localAddress instanceof InetSocketAddress && remoteAddress instanceof InetSocketAddress) {
@ -189,7 +191,7 @@ public final class XdsClientWrapperForServerSds {
checkState( checkState(
port == localInetAddr.getPort(), port == localInetAddr.getPort(),
"Channel localAddress port does not match requested listener port"); "Channel localAddress port does not match requested listener port");
return getDownstreamTlsContext(localInetAddr, remoteInetAddr); return getDownstreamTlsContext(localInetAddr, remoteInetAddr, copyListener);
} }
} }
return null; return null;
@ -204,9 +206,10 @@ public final class XdsClientWrapperForServerSds {
* @param localInetAddr dest address of the inbound connection * @param localInetAddr dest address of the inbound connection
* @param remoteInetAddr source address of the inbound connection * @param remoteInetAddr source address of the inbound connection
*/ */
private DownstreamTlsContext getDownstreamTlsContext( private static DownstreamTlsContext getDownstreamTlsContext(
InetSocketAddress localInetAddr, InetSocketAddress remoteInetAddr) { InetSocketAddress localInetAddr, InetSocketAddress remoteInetAddr,
List<FilterChain> filterChains = curListener.getFilterChains(); EnvoyServerProtoData.Listener listener) {
List<FilterChain> filterChains = listener.getFilterChains();
filterChains = filterOnDestinationPort(filterChains); filterChains = filterOnDestinationPort(filterChains);
filterChains = filterOnIpAddress(filterChains, localInetAddr.getAddress(), true); filterChains = filterOnIpAddress(filterChains, localInetAddr.getAddress(), true);
@ -221,7 +224,7 @@ public final class XdsClientWrapperForServerSds {
} else if (filterChains.size() == 1) { } else if (filterChains.size() == 1) {
return filterChains.get(0).getDownstreamTlsContext(); return filterChains.get(0).getDownstreamTlsContext();
} }
return curListener.getDefaultFilterChain().getDownstreamTlsContext(); return listener.getDefaultFilterChain().getDownstreamTlsContext();
} }
// destination_port present => Always fail match // destination_port present => Always fail match
@ -255,7 +258,7 @@ public final class XdsClientWrapperForServerSds {
return filteredOnMatch.isEmpty() ? filteredOnEmpty : filteredOnMatch; return filteredOnMatch.isEmpty() ? filteredOnEmpty : filteredOnMatch;
} }
private List<FilterChain> filterOnSourceType( private static List<FilterChain> filterOnSourceType(
List<FilterChain> filterChains, InetAddress sourceAddress, InetAddress destAddress) { List<FilterChain> filterChains, InetAddress sourceAddress, InetAddress destAddress) {
ArrayList<FilterChain> filtered = new ArrayList<>(filterChains.size()); ArrayList<FilterChain> filtered = new ArrayList<>(filterChains.size());
for (FilterChain filterChain : filterChains) { for (FilterChain filterChain : filterChains) {
@ -350,7 +353,7 @@ public final class XdsClientWrapperForServerSds {
} }
// use prefix_ranges (CIDR) and get the most specific matches // use prefix_ranges (CIDR) and get the most specific matches
private List<FilterChain> filterOnIpAddress( private static List<FilterChain> filterOnIpAddress(
List<FilterChain> filterChains, InetAddress address, boolean forDestination) { List<FilterChain> filterChains, InetAddress address, boolean forDestination) {
PriorityQueue<QueueElement> heap = new PriorityQueue<>(10, new QueueElementComparator()); PriorityQueue<QueueElement> heap = new PriorityQueue<>(10, new QueueElementComparator());
@ -384,7 +387,8 @@ public final class XdsClientWrapperForServerSds {
synchronized (serverWatchers) { synchronized (serverWatchers) {
serverWatchers.add(serverWatcher); serverWatchers.add(serverWatcher);
} }
if (curListener != null) { EnvoyServerProtoData.Listener copyListener = curListener.get();
if (copyListener != null) {
serverWatcher.onListenerUpdate(); serverWatcher.onListenerUpdate();
} }
} }