xds: Drain old server connections on Listener updates

This is necessary to make sure all connections are using the new
configuration.
This commit is contained in:
Eric Anderson 2021-09-14 16:40:14 -07:00
parent 5307b69c9e
commit 43b507160f
10 changed files with 508 additions and 144 deletions

View File

@ -17,7 +17,8 @@
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.InternalXdsAttributes.ATTR_FILTER_CHAIN_SELECTOR_REF;
import static io.grpc.xds.InternalXdsAttributes.ATTR_DRAIN_GRACE_NANOS;
import static io.grpc.xds.InternalXdsAttributes.ATTR_FILTER_CHAIN_SELECTOR_MANAGER;
import static io.grpc.xds.XdsServerWrapper.ATTR_SERVER_ROUTING_CONFIG;
import static io.grpc.xds.internal.sds.SdsProtocolNegotiators.ATTR_SERVER_SSL_CONTEXT_PROVIDER_SUPPLIER;
@ -28,6 +29,7 @@ import com.google.protobuf.UInt32Value;
import io.grpc.Attributes;
import io.grpc.internal.ObjectPool;
import io.grpc.netty.GrpcHttp2ConnectionHandler;
import io.grpc.netty.InternalGracefulServerCloseCommand;
import io.grpc.netty.InternalProtocolNegotiationEvent;
import io.grpc.netty.InternalProtocolNegotiator;
import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator;
@ -40,6 +42,8 @@ import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHan
import io.grpc.xds.XdsServerWrapper.ServerRoutingConfig;
import io.grpc.xds.internal.Matchers.CidrMatcher;
import io.grpc.xds.internal.sds.SslContextProviderSupplier;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@ -54,7 +58,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@ -77,14 +81,16 @@ final class FilterChainMatchingProtocolNegotiators {
static final class FilterChainMatchingHandler extends ChannelInboundHandlerAdapter {
private final GrpcHttp2ConnectionHandler grpcHandler;
private final FilterChainSelector selector;
private final FilterChainSelectorManager filterChainSelectorManager;
private final ProtocolNegotiator delegate;
FilterChainMatchingHandler(
GrpcHttp2ConnectionHandler grpcHandler, FilterChainSelector selector,
GrpcHttp2ConnectionHandler grpcHandler,
FilterChainSelectorManager filterChainSelectorManager,
ProtocolNegotiator delegate) {
this.grpcHandler = checkNotNull(grpcHandler, "grpcHandler");
this.selector = checkNotNull(selector, "selector");
this.filterChainSelectorManager =
checkNotNull(filterChainSelectorManager, "filterChainSelectorManager");
this.delegate = checkNotNull(delegate, "delegate");
}
@ -94,6 +100,19 @@ final class FilterChainMatchingProtocolNegotiators {
super.userEventTriggered(ctx, evt);
return;
}
long drainGraceTime = 0;
TimeUnit drainGraceTimeUnit = null;
Long drainGraceNanosObj = grpcHandler.getEagAttributes().get(ATTR_DRAIN_GRACE_NANOS);
if (drainGraceNanosObj != null) {
drainGraceTime = drainGraceNanosObj;
drainGraceTimeUnit = TimeUnit.NANOSECONDS;
}
FilterChainSelectorManager.Closer closer = new FilterChainSelectorManager.Closer(
new GracefullyShutdownChannelRunnable(ctx.channel(), drainGraceTime, drainGraceTimeUnit));
FilterChainSelector selector = filterChainSelectorManager.register(closer);
ctx.channel().closeFuture().addListener(
new FilterChainSelectorManagerDeregister(filterChainSelectorManager, closer));
checkNotNull(selector, "selector");
SelectedConfig config = selector.select(
(InetSocketAddress) ctx.channel().localAddress(),
(InetSocketAddress) ctx.channel().remoteAddress());
@ -354,10 +373,10 @@ final class FilterChainMatchingProtocolNegotiators {
@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
AtomicReference<FilterChainSelector> filterChainSelectorRef =
grpcHandler.getEagAttributes().get(ATTR_FILTER_CHAIN_SELECTOR_REF);
checkNotNull(filterChainSelectorRef, "filterChainSelectorRef");
return new FilterChainMatchingHandler(grpcHandler, filterChainSelectorRef.get(),
FilterChainSelectorManager filterChainSelectorManager =
grpcHandler.getEagAttributes().get(ATTR_FILTER_CHAIN_SELECTOR_MANAGER);
checkNotNull(filterChainSelectorManager, "filterChainSelectorManager");
return new FilterChainMatchingHandler(grpcHandler, filterChainSelectorManager,
delegate.newNegotiator(offloadExecutorPool));
}
@ -384,4 +403,42 @@ final class FilterChainMatchingProtocolNegotiators {
this.sslContextProviderSupplier = sslContextProviderSupplier;
}
}
private static class FilterChainSelectorManagerDeregister implements ChannelFutureListener {
private final FilterChainSelectorManager filterChainSelectorManager;
private final FilterChainSelectorManager.Closer closer;
public FilterChainSelectorManagerDeregister(
FilterChainSelectorManager filterChainSelectorManager,
FilterChainSelectorManager.Closer closer) {
this.filterChainSelectorManager =
checkNotNull(filterChainSelectorManager, "filterChainSelectorManager");
this.closer = checkNotNull(closer, "closer");
}
@Override public void operationComplete(ChannelFuture future) throws Exception {
filterChainSelectorManager.deregister(closer);
}
}
private static class GracefullyShutdownChannelRunnable implements Runnable {
private final Channel channel;
private final long drainGraceTime;
@Nullable
private final TimeUnit drainGraceTimeUnit;
public GracefullyShutdownChannelRunnable(
Channel channel, long drainGraceTime, @Nullable TimeUnit drainGraceTimeUnit) {
this.channel = checkNotNull(channel, "channel");
this.drainGraceTime = drainGraceTime;
this.drainGraceTimeUnit = drainGraceTimeUnit;
}
@Override public void run() {
Object gracefulCloseCommand = InternalGracefulServerCloseCommand.create(
"xds_drain", drainGraceTime, drainGraceTimeUnit);
channel.writeAndFlush(gracefulCloseCommand)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
}
}

View File

@ -0,0 +1,95 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector;
import java.util.Comparator;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.GuardedBy;
/**
* Maintains the current xDS selector and any resources using that selector. When the selector
* changes, old resources are closed to avoid old config usages.
*/
final class FilterChainSelectorManager {
private static final AtomicLong closerId = new AtomicLong();
private final Object lock = new Object();
@GuardedBy("lock")
private FilterChainSelector selector;
// Avoid HashSet since it does not decrease in size, forming a high water mark.
@GuardedBy("lock")
private TreeSet<Closer> closers = new TreeSet<Closer>(new CloserComparator());
public FilterChainSelector register(Closer closer) {
synchronized (lock) {
Preconditions.checkState(closers.add(closer), "closer already registered");
return selector;
}
}
public void deregister(Closer closer) {
synchronized (lock) {
closers.remove(closer);
}
}
/** Only safe to be called by code that is responsible for updating the selector. */
public FilterChainSelector getSelectorToUpdateSelector() {
synchronized (lock) {
return selector;
}
}
public void updateSelector(FilterChainSelector newSelector) {
TreeSet<Closer> oldClosers;
synchronized (lock) {
oldClosers = closers;
closers = new TreeSet<Closer>(closers.comparator());
selector = newSelector;
}
for (Closer closer : oldClosers) {
closer.closer.run();
}
}
@VisibleForTesting
int getRegisterCount() {
synchronized (lock) {
return closers.size();
}
}
public static final class Closer {
private final long id = closerId.getAndIncrement();
private final Runnable closer;
/** {@code closer} may be run multiple times. */
public Closer(Runnable closer) {
this.closer = Preconditions.checkNotNull(closer, "closer");
}
}
private static class CloserComparator implements Comparator<Closer> {
@Override public int compare(Closer c1, Closer c2) {
return Long.compare(c1.id, c2.id);
}
}
}

View File

@ -22,10 +22,8 @@ import io.grpc.Grpc;
import io.grpc.Internal;
import io.grpc.NameResolver;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector;
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
import io.grpc.xds.internal.sds.SslContextProviderSupplier;
import java.util.concurrent.atomic.AtomicReference;
/**
* Internal attributes used for xDS implementation. Do not use.
@ -81,9 +79,14 @@ public final class InternalXdsAttributes {
* Filter chain match for network filters.
*/
@Grpc.TransportAttr
static final Attributes.Key<AtomicReference<FilterChainSelector>>
ATTR_FILTER_CHAIN_SELECTOR_REF = Attributes.Key.create(
"io.grpc.xds.InternalXdsAttributes.filterChainSelectorRef");
static final Attributes.Key<FilterChainSelectorManager>
ATTR_FILTER_CHAIN_SELECTOR_MANAGER = Attributes.Key.create(
"io.grpc.xds.InternalXdsAttributes.filterChainSelectorManager");
/** Grace time to use when draining. Null for an infinite grace time. */
@Grpc.TransportAttr
static final Attributes.Key<Long> ATTR_DRAIN_GRACE_NANOS =
Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.drainGraceTime");
private InternalXdsAttributes() {}
}

View File

@ -16,9 +16,11 @@
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.xds.InternalXdsAttributes.ATTR_FILTER_CHAIN_SELECTOR_REF;
import static io.grpc.xds.InternalXdsAttributes.ATTR_DRAIN_GRACE_NANOS;
import static io.grpc.xds.InternalXdsAttributes.ATTR_FILTER_CHAIN_SELECTOR_MANAGER;
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.DoNotCall;
@ -33,11 +35,10 @@ import io.grpc.netty.InternalNettyServerBuilder;
import io.grpc.netty.InternalNettyServerCredentials;
import io.grpc.netty.InternalProtocolNegotiator;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector;
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingNegotiatorServerFactory;
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
/**
@ -45,6 +46,8 @@ import java.util.logging.Logger;
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/7514")
public final class XdsServerBuilder extends ForwardingServerBuilder<XdsServerBuilder> {
private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000);
private final NettyServerBuilder delegate;
private final int port;
private XdsServingStatusListener xdsServingStatusListener;
@ -52,6 +55,8 @@ public final class XdsServerBuilder extends ForwardingServerBuilder<XdsServerBui
private final FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry();
private XdsClientPoolFactory xdsClientPoolFactory =
SharedXdsClientPoolProvider.getDefaultProvider();
private long drainGraceTime = 10;
private TimeUnit drainGraceTimeUnit = TimeUnit.MINUTES;
private XdsServerBuilder(NettyServerBuilder nettyDelegate, int port) {
this.delegate = nettyDelegate;
@ -74,6 +79,26 @@ public final class XdsServerBuilder extends ForwardingServerBuilder<XdsServerBui
return this;
}
/**
* Sets the grace time when draining connections with outdated configuration. When an xDS config
* update changes connection configuration, pre-existing connections stop accepting new RPCs to be
* replaced by new connections. RPCs on those pre-existing connections have the grace time to
* complete. RPCs that do not complete in time will be cancelled, allowing the connection to
* terminate. {@code Long.MAX_VALUE} nano seconds or an unreasonably large value are considered
* infinite. The default is 10 minutes.
*/
public XdsServerBuilder drainGraceTime(long drainGraceTime, TimeUnit drainGraceTimeUnit) {
checkArgument(drainGraceTime >= 0, "drain grace time must be non-negative: %s",
drainGraceTime);
checkNotNull(drainGraceTimeUnit, "drainGraceTimeUnit");
if (drainGraceTimeUnit.toNanos(drainGraceTime) >= AS_LARGE_AS_INFINITE) {
drainGraceTimeUnit = null;
}
this.drainGraceTime = drainGraceTime;
this.drainGraceTimeUnit = drainGraceTimeUnit;
return this;
}
@DoNotCall("Unsupported. Use forPort(int, ServerCredentials) instead")
public static ServerBuilder<?> forPort(int port) {
throw new UnsupportedOperationException(
@ -94,12 +119,15 @@ public final class XdsServerBuilder extends ForwardingServerBuilder<XdsServerBui
@Override
public Server build() {
checkState(isServerBuilt.compareAndSet(false, true), "Server already built!");
AtomicReference<FilterChainSelector> filterChainSelectorRef = new AtomicReference<>();
InternalNettyServerBuilder.eagAttributes(delegate, Attributes.newBuilder()
.set(ATTR_FILTER_CHAIN_SELECTOR_REF, filterChainSelectorRef)
.build());
FilterChainSelectorManager filterChainSelectorManager = new FilterChainSelectorManager();
Attributes.Builder builder = Attributes.newBuilder()
.set(ATTR_FILTER_CHAIN_SELECTOR_MANAGER, filterChainSelectorManager);
if (drainGraceTimeUnit != null) {
builder.set(ATTR_DRAIN_GRACE_NANOS, drainGraceTimeUnit.toNanos(drainGraceTime));
}
InternalNettyServerBuilder.eagAttributes(delegate, builder.build());
return new XdsServerWrapper("0.0.0.0:" + port, delegate, xdsServingStatusListener,
filterChainSelectorRef, xdsClientPoolFactory, filterRegistry);
filterChainSelectorManager, xdsClientPoolFactory, filterRegistry);
}
@VisibleForTesting

View File

@ -100,7 +100,7 @@ final class XdsServerWrapper extends Server {
private final ThreadSafeRandom random = ThreadSafeRandomImpl.instance;
private final XdsClientPoolFactory xdsClientPoolFactory;
private final XdsServingStatusListener listener;
private final AtomicReference<FilterChainSelector> filterChainSelectorRef;
private final FilterChainSelectorManager filterChainSelectorManager;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private boolean isServing;
@ -117,11 +117,11 @@ final class XdsServerWrapper extends Server {
String listenerAddress,
ServerBuilder<?> delegateBuilder,
XdsServingStatusListener listener,
AtomicReference<FilterChainSelector> filterChainSelectorRef,
FilterChainSelectorManager filterChainSelectorManager,
XdsClientPoolFactory xdsClientPoolFactory,
FilterRegistry filterRegistry) {
this(listenerAddress, delegateBuilder, listener, filterChainSelectorRef, xdsClientPoolFactory,
filterRegistry, SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE));
this(listenerAddress, delegateBuilder, listener, filterChainSelectorManager,
xdsClientPoolFactory, filterRegistry, SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE));
sharedTimeService = true;
}
@ -130,7 +130,7 @@ final class XdsServerWrapper extends Server {
String listenerAddress,
ServerBuilder<?> delegateBuilder,
XdsServingStatusListener listener,
AtomicReference<FilterChainSelector> filterChainSelectorRef,
FilterChainSelectorManager filterChainSelectorManager,
XdsClientPoolFactory xdsClientPoolFactory,
FilterRegistry filterRegistry,
ScheduledExecutorService timeService) {
@ -138,7 +138,8 @@ final class XdsServerWrapper extends Server {
this.delegateBuilder = checkNotNull(delegateBuilder, "delegateBuilder");
this.delegateBuilder.intercept(new ConfigApplyingInterceptor());
this.listener = checkNotNull(listener, "listener");
this.filterChainSelectorRef = checkNotNull(filterChainSelectorRef, "filterChainSelectorRef");
this.filterChainSelectorManager
= checkNotNull(filterChainSelectorManager, "filterChainSelectorManager");
this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
this.timeService = checkNotNull(timeService, "timeService");
this.filterRegistry = checkNotNull(filterRegistry,"filterRegistry");
@ -361,8 +362,8 @@ final class XdsServerWrapper extends Server {
}
checkNotNull(update.listener(), "update");
if (!pendingRds.isEmpty()) {
// filter chain state has not yet been applied to filterChainSelectorRef and there are
// two sets of sslContextProviderSuppliers, so we release the old ones.
// filter chain state has not yet been applied to filterChainSelectorManager and there
// are two sets of sslContextProviderSuppliers, so we release the old ones.
releaseSuppliersInFlight();
pendingRds.clear();
}
@ -443,7 +444,7 @@ final class XdsServerWrapper extends Server {
logger.log(Level.FINE, "Stop watching LDS resource {0}", resourceName);
xdsClient.cancelLdsResourceWatch(resourceName, this);
List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
filterChainSelectorRef.set(FilterChainSelector.NO_FILTER_CHAIN);
filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
for (SslContextProviderSupplier s: toRelease) {
s.close();
}
@ -460,7 +461,7 @@ final class XdsServerWrapper extends Server {
defaultFilterChain == null ? null : defaultFilterChain.getSslContextProviderSupplier(),
defaultFilterChain == null ? null : generateRoutingConfig(defaultFilterChain));
List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
filterChainSelectorRef.set(selector);
filterChainSelectorManager.updateSelector(selector);
for (SslContextProviderSupplier e: toRelease) {
e.close();
}
@ -482,7 +483,7 @@ final class XdsServerWrapper extends Server {
private void handleConfigNotFound(StatusException exception) {
cleanUpRouteDiscoveryStates();
List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
filterChainSelectorRef.set(FilterChainSelector.NO_FILTER_CHAIN);
filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
for (SslContextProviderSupplier s: toRelease) {
s.close();
}
@ -511,7 +512,7 @@ final class XdsServerWrapper extends Server {
private List<SslContextProviderSupplier> getSuppliersInUse() {
List<SslContextProviderSupplier> toRelease = new ArrayList<>();
FilterChainSelector selector = filterChainSelectorRef.get();
FilterChainSelector selector = filterChainSelectorManager.getSelectorToUpdateSelector();
if (selector != null) {
for (FilterChain f: selector.getRoutingConfigs().keySet()) {
if (f.getSslContextProviderSupplier() != null) {

View File

@ -65,6 +65,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -87,6 +88,7 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
private ChannelHandlerContext channelHandlerCtx;
@Mock
private ProtocolNegotiator mockDelegate;
private FilterChainSelectorManager selectorManager = new FilterChainSelectorManager();
private static final HttpConnectionManager HTTP_CONNECTION_MANAGER = createRds("routing-config");
private static final String LOCAL_IP = "10.1.2.3"; // dest
private static final String REMOTE_IP = "10.4.2.3"; // source
@ -94,6 +96,16 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
private final ServerRoutingConfig noopConfig = ServerRoutingConfig.create(
new ArrayList<NamedFilterConfig>(), new AtomicReference<ImmutableList<VirtualHost>>());
@After
@SuppressWarnings("FutureReturnValueIgnored")
public void tearDown() {
if (channel.isActive()) {
channel.close();
channel.runPendingTasks();
}
assertThat(selectorManager.getRegisterCount()).isEqualTo(0);
}
@Test
public void nofilterChainMatch_defaultSslContext() throws Exception {
final SettableFuture<SslContextProviderSupplier> sslSet = SettableFuture.create();
@ -103,10 +115,10 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
SslContextProviderSupplier defaultSsl = new SslContextProviderSupplier(createTls(),
tlsContextManager);
FilterChainSelector selector = new FilterChainSelector(
new HashMap<FilterChain, ServerRoutingConfig>(), defaultSsl, noopConfig);
selectorManager.updateSelector(new FilterChainSelector(
new HashMap<FilterChain, ServerRoutingConfig>(), defaultSsl, noopConfig));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
setupChannel("172.168.1.1", "172.168.1.2", 80, filterChainMatchingHandler);
ChannelHandlerContext channelHandlerCtx = pipeline.context(filterChainMatchingHandler);
assertThat(channelHandlerCtx).isNotNull();
@ -125,10 +137,10 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
@Test
public void noFilterChainMatch_noDefaultSslContext() {
FilterChainSelector selector = new FilterChainSelector(
new HashMap<FilterChain, ServerRoutingConfig>(), null, null);
selectorManager.updateSelector(new FilterChainSelector(
new HashMap<FilterChain, ServerRoutingConfig>(), null, null));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
setupChannel("172.168.1.1", "172.168.2.2", 90, filterChainMatchingHandler);
channelHandlerCtx = pipeline.context(filterChainMatchingHandler);
assertThat(channelHandlerCtx).isNotNull();
@ -139,6 +151,33 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
assertThat(channel.closeFuture().isDone()).isTrue();
}
@Test
public void filterSelectorChange_drainsConnection() {
ChannelHandler next = new ChannelInboundHandlerAdapter();
when(mockDelegate.newHandler(grpcHandler)).thenReturn(next);
selectorManager.updateSelector(new FilterChainSelector(
new HashMap<FilterChain, ServerRoutingConfig>(), null, noopConfig));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
setupChannel("172.168.1.1", "172.168.2.2", 90, filterChainMatchingHandler);
channelHandlerCtx = pipeline.context(filterChainMatchingHandler);
assertThat(channelHandlerCtx).isNotNull();
pipeline.fireUserEventTriggered(event);
channelHandlerCtx = pipeline.context(filterChainMatchingHandler);
assertThat(channelHandlerCtx).isNull();
channel.runPendingTasks();
channelHandlerCtx = pipeline.context(next);
assertThat(channelHandlerCtx).isNotNull();
assertThat(channel.readOutbound()).isNull();
selectorManager.updateSelector(new FilterChainSelector(
new HashMap<FilterChain, ServerRoutingConfig>(), null, noopConfig));
assertThat(channel.readOutbound().getClass().getName())
.isEqualTo("io.grpc.netty.GracefulServerCloseCommand");
}
@Test
public void singleFilterChainWithoutAlpn() throws Exception {
EnvoyServerProtoData.FilterChainMatch filterChainMatch =
@ -157,10 +196,10 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
"filter-chain-foo", filterChainMatch, HTTP_CONNECTION_MANAGER, tlsContext,
tlsContextManager);
FilterChainSelector selector = new FilterChainSelector(ImmutableMap.of(filterChain, noopConfig),
null, null);
selectorManager.updateSelector(new FilterChainSelector(ImmutableMap.of(filterChain, noopConfig),
null, null));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
final SettableFuture<SslContextProviderSupplier> sslSet = SettableFuture.create();
final SettableFuture<ServerRoutingConfig> routingSettable = SettableFuture.create();
ChannelHandler next = captureAttrHandler(sslSet, routingSettable);
@ -196,11 +235,11 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain(
"filter-chain-bar", null, HTTP_CONNECTION_MANAGER, defaultTlsContext,
tlsContextManager);
FilterChainSelector selector = new FilterChainSelector(
selectorManager.updateSelector(new FilterChainSelector(
ImmutableMap.of(filterChain, randomConfig("no-match")),
defaultFilterChain.getSslContextProviderSupplier(), noopConfig);
defaultFilterChain.getSslContextProviderSupplier(), noopConfig));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
final SettableFuture<SslContextProviderSupplier> sslSet = SettableFuture.create();
final SettableFuture<ServerRoutingConfig> routingSettable = SettableFuture.create();
@ -242,12 +281,12 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
ServerRoutingConfig routingConfig = ServerRoutingConfig.create(
new ArrayList<NamedFilterConfig>(), new AtomicReference<>(
ImmutableList.of(createVirtualHost("virtual"))));
FilterChainSelector selector = new FilterChainSelector(
selectorManager.updateSelector(new FilterChainSelector(
ImmutableMap.of(filterChainWithDestPort, routingConfig),
defaultFilterChain.getSslContextProviderSupplier(), noopConfig);
defaultFilterChain.getSslContextProviderSupplier(), noopConfig));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
final SettableFuture<SslContextProviderSupplier> sslSet = SettableFuture.create();
final SettableFuture<ServerRoutingConfig> routingSettable = SettableFuture.create();
@ -285,12 +324,12 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
"filter-chain-bar", null, HTTP_CONNECTION_MANAGER,
tlsContextForDefaultFilterChain, tlsContextManager);
FilterChainSelector selector = new FilterChainSelector(
selectorManager.updateSelector(new FilterChainSelector(
ImmutableMap.of(filterChainWithMatch, noopConfig),
defaultFilterChain.getSslContextProviderSupplier(), randomConfig("no-match"));
defaultFilterChain.getSslContextProviderSupplier(), randomConfig("no-match")));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
final SettableFuture<SslContextProviderSupplier> sslSet = SettableFuture.create();
final SettableFuture<ServerRoutingConfig> routingSettable = SettableFuture.create();
@ -329,12 +368,12 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain(
"filter-chain-bar", null, HTTP_CONNECTION_MANAGER,
tlsContextForDefaultFilterChain, tlsContextManager);
FilterChainSelector selector = new FilterChainSelector(
selectorManager.updateSelector(new FilterChainSelector(
ImmutableMap.of(filterChainWithMismatch, randomConfig("no-match")),
defaultFilterChain.getSslContextProviderSupplier(), noopConfig);
defaultFilterChain.getSslContextProviderSupplier(), noopConfig));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
final SettableFuture<SslContextProviderSupplier> sslSet = SettableFuture.create();
final SettableFuture<ServerRoutingConfig> routingSettable = SettableFuture.create();
@ -374,11 +413,11 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
"filter-chain-bar", null, HTTP_CONNECTION_MANAGER,
tlsContextForDefaultFilterChain, tlsContextManager);
FilterChainSelector selector = new FilterChainSelector(
selectorManager.updateSelector(new FilterChainSelector(
ImmutableMap.of(filterChain0Length, noopConfig),
defaultFilterChain.getSslContextProviderSupplier(), null);
defaultFilterChain.getSslContextProviderSupplier(), null));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
final SettableFuture<SslContextProviderSupplier> sslSet = SettableFuture.create();
final SettableFuture<ServerRoutingConfig> routingSettable = SettableFuture.create();
@ -431,13 +470,13 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
tlsContextManager);
EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain(
"filter-chain-baz", null, HTTP_CONNECTION_MANAGER, null, tlsContextManager);
FilterChainSelector selector = new FilterChainSelector(
selectorManager.updateSelector(new FilterChainSelector(
ImmutableMap.of(filterChainLessSpecific, randomConfig("no-match"),
filterChainMoreSpecific, noopConfig),
defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default"));
defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default")));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
final SettableFuture<SslContextProviderSupplier> sslSet = SettableFuture.create();
final SettableFuture<ServerRoutingConfig> routingSettable = SettableFuture.create();
@ -490,12 +529,12 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
tlsContextManager);
EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain(
"filter-chain-baz", null, HTTP_CONNECTION_MANAGER, null, tlsContextManager);
FilterChainSelector selector = new FilterChainSelector(
selectorManager.updateSelector(new FilterChainSelector(
ImmutableMap.of(filterChainLessSpecific, randomConfig("no-match"),
filterChainMoreSpecific, noopConfig),
defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default"));
defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default")));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
final SettableFuture<SslContextProviderSupplier> sslSet = SettableFuture.create();
final SettableFuture<ServerRoutingConfig> routingSettable = SettableFuture.create();
@ -547,13 +586,13 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
tlsContextMoreSpecific, tlsContextManager);
EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain(
"filter-chain-baz", null, HTTP_CONNECTION_MANAGER, null, tlsContextManager);
FilterChainSelector selector = new FilterChainSelector(
selectorManager.updateSelector(new FilterChainSelector(
ImmutableMap.of(filterChainLessSpecific, randomConfig("no-match"),
filterChainMoreSpecific, noopConfig),
defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default"));
defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default")));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
final SettableFuture<SslContextProviderSupplier> sslSet = SettableFuture.create();
final SettableFuture<ServerRoutingConfig> routingSettable = SettableFuture.create();
@ -610,12 +649,12 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain(
"filter-chain-baz", null, HTTP_CONNECTION_MANAGER, null, tlsContextManager);
FilterChainSelector selector = new FilterChainSelector(
selectorManager.updateSelector(new FilterChainSelector(
ImmutableMap.of(filterChainMoreSpecificWith2, noopConfig,
filterChainLessSpecific, randomConfig("no-match")),
defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default"));
defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default")));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
final SettableFuture<SslContextProviderSupplier> sslSet = SettableFuture.create();
final SettableFuture<ServerRoutingConfig> routingSettable = SettableFuture.create();
@ -653,11 +692,11 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain(
"filter-chain-bar", null, HTTP_CONNECTION_MANAGER,tlsContextForDefaultFilterChain,
tlsContextManager);
FilterChainSelector selector = new FilterChainSelector(
selectorManager.updateSelector(new FilterChainSelector(
ImmutableMap.of(filterChainWithMismatch, randomConfig("no-match")),
defaultFilterChain.getSslContextProviderSupplier(), noopConfig);
defaultFilterChain.getSslContextProviderSupplier(), noopConfig));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
final SettableFuture<SslContextProviderSupplier> sslSet = SettableFuture.create();
final SettableFuture<ServerRoutingConfig> routingSettable = SettableFuture.create();
@ -698,11 +737,11 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
"filter-chain-bar", null, HTTP_CONNECTION_MANAGER, tlsContextForDefaultFilterChain,
tlsContextManager);
FilterChainSelector selector = new FilterChainSelector(
selectorManager.updateSelector(new FilterChainSelector(
ImmutableMap.of(filterChainWithMatch, noopConfig),
defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default"));
defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default")));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
setupChannel(LOCAL_IP, LOCAL_IP, 15000, filterChainMatchingHandler);
pipeline.fireUserEventTriggered(event);
channel.runPendingTasks();
@ -757,13 +796,13 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain(
"filter-chain-baz", null, HTTP_CONNECTION_MANAGER, null, tlsContextManager);
FilterChainSelector selector = new FilterChainSelector(
selectorManager.updateSelector(new FilterChainSelector(
ImmutableMap.of(filterChainMoreSpecificWith2, noopConfig,
filterChainLessSpecific, randomConfig("no-match")),
defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default"));
defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default")));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
setupChannel(LOCAL_IP, REMOTE_IP, 15000, filterChainMatchingHandler);
pipeline.fireUserEventTriggered(event);
channel.runPendingTasks();
@ -823,12 +862,12 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain(
"filter-chain-baz", null, HTTP_CONNECTION_MANAGER, null, null);
FilterChainSelector selector = new FilterChainSelector(
selectorManager.updateSelector(new FilterChainSelector(
ImmutableMap.of(filterChain1, noopConfig, filterChain2, noopConfig),
defaultFilterChain.getSslContextProviderSupplier(), noopConfig);
defaultFilterChain.getSslContextProviderSupplier(), noopConfig));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
setupChannel(LOCAL_IP, REMOTE_IP, 15000, filterChainMatchingHandler);
pipeline.fireUserEventTriggered(event);
channel.runPendingTasks();
@ -884,13 +923,13 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain(
"filter-chain-baz", null, HTTP_CONNECTION_MANAGER, null, tlsContextManager);
FilterChainSelector selector = new FilterChainSelector(
selectorManager.updateSelector(new FilterChainSelector(
ImmutableMap.of(filterChainEmptySourcePorts, randomConfig("no-match"),
filterChainSourcePortMatch, noopConfig),
defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default"));
defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default")));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
final SettableFuture<SslContextProviderSupplier> sslSet = SettableFuture.create();
final SettableFuture<ServerRoutingConfig> routingSettable = SettableFuture.create();
ChannelHandler next = captureAttrHandler(sslSet, routingSettable);
@ -1040,11 +1079,11 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
map.put(filterChain4, randomConfig("4"));
map.put(filterChain5, noopConfig);
map.put(filterChain6, randomConfig("6"));
FilterChainSelector selector = new FilterChainSelector(
map, defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default"));
selectorManager.updateSelector(new FilterChainSelector(
map, defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default")));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
final SettableFuture<SslContextProviderSupplier> sslSet = SettableFuture.create();
final SettableFuture<ServerRoutingConfig> routingSettable = SettableFuture.create();
@ -1114,12 +1153,12 @@ public class FilterChainMatchingProtocolNegotiatorsTest {
"filter-chain-baz", defaultFilterChainMatch, HTTP_CONNECTION_MANAGER, tlsContext3,
mock(TlsContextManager.class));
FilterChainSelector selector = new FilterChainSelector(
selectorManager.updateSelector(new FilterChainSelector(
ImmutableMap.of(filterChain1, randomConfig("1"), filterChain2, randomConfig("2")),
defaultFilterChain.getSslContextProviderSupplier(), noopConfig);
defaultFilterChain.getSslContextProviderSupplier(), noopConfig));
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate);
final SettableFuture<SslContextProviderSupplier> sslSet = SettableFuture.create();
final SettableFuture<ServerRoutingConfig> routingSettable = SettableFuture.create();
ChannelHandler next = captureAttrHandler(sslSet, routingSettable);

View File

@ -0,0 +1,107 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import com.google.common.collect.ImmutableList;
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
import io.grpc.xds.Filter.NamedFilterConfig;
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector;
import io.grpc.xds.FilterChainSelectorManager.Closer;
import io.grpc.xds.XdsServerWrapper.ServerRoutingConfig;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public final class FilterChainSelectorManagerTest {
private FilterChainSelectorManager manager = new FilterChainSelectorManager();
private ServerRoutingConfig noopConfig = ServerRoutingConfig.create(
Collections.<NamedFilterConfig>emptyList(),
new AtomicReference<ImmutableList<VirtualHost>>());
private FilterChainSelector selector1 = new FilterChainSelector(
Collections.<FilterChain,ServerRoutingConfig>emptyMap(), null, null);
private FilterChainSelector selector2 = new FilterChainSelector(
Collections.<FilterChain,ServerRoutingConfig>emptyMap(), null, noopConfig);
private CounterRunnable runnable1 = new CounterRunnable();
private CounterRunnable runnable2 = new CounterRunnable();
@Test
public void updateSelector_changesSelector() {
assertThat(manager.getSelectorToUpdateSelector()).isNull();
assertThat(manager.register(new Closer(runnable1))).isNull();
manager.updateSelector(selector1);
assertThat(runnable1.counter).isEqualTo(1);
assertThat(manager.getSelectorToUpdateSelector()).isSameInstanceAs(selector1);
assertThat(manager.register(new Closer(runnable2))).isSameInstanceAs(selector1);
assertThat(runnable2.counter).isEqualTo(0);
}
@Test
public void updateSelector_callsCloserOnce() {
assertThat(manager.register(new Closer(runnable1))).isNull();
manager.updateSelector(selector1);
manager.updateSelector(selector2);
assertThat(runnable1.counter).isEqualTo(1);
}
@Test
public void deregister_removesCloser() {
Closer closer1 = new Closer(runnable1);
manager.updateSelector(selector1);
assertThat(manager.register(closer1)).isSameInstanceAs(selector1);
assertThat(manager.getRegisterCount()).isEqualTo(1);
manager.deregister(closer1);
assertThat(manager.getRegisterCount()).isEqualTo(0);
manager.updateSelector(selector2);
assertThat(runnable1.counter).isEqualTo(0);
}
@Test
public void deregister_removesCorrectCloser() {
Closer closer1 = new Closer(runnable1);
Closer closer2 = new Closer(runnable2);
manager.updateSelector(selector1);
assertThat(manager.register(closer1)).isSameInstanceAs(selector1);
assertThat(manager.register(closer2)).isSameInstanceAs(selector1);
assertThat(manager.getRegisterCount()).isEqualTo(2);
manager.deregister(closer1);
assertThat(manager.getRegisterCount()).isEqualTo(1);
manager.updateSelector(selector2);
assertThat(runnable1.counter).isEqualTo(0);
assertThat(runnable2.counter).isEqualTo(1);
}
private static class CounterRunnable implements Runnable {
int counter;
@Override public void run() {
counter++;
}
}
}

View File

@ -72,7 +72,6 @@ import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -101,7 +100,7 @@ public class XdsClientWrapperForServerSdsTestMisc {
@Mock
private XdsServingStatusListener listener;
private FakeXdsClient xdsClient = new FakeXdsClient();
private AtomicReference<FilterChainSelector> selectorRef = new AtomicReference<>();
private FilterChainSelectorManager selectorManager = new FilterChainSelectorManager();
private XdsServerWrapper xdsServerWrapper;
@ -117,13 +116,14 @@ public class XdsClientWrapperForServerSdsTestMisc {
when(mockBuilder.build()).thenReturn(mockServer);
when(mockServer.isShutdown()).thenReturn(false);
xdsServerWrapper = new XdsServerWrapper("0.0.0.0:" + PORT, mockBuilder, listener,
selectorRef, new FakeXdsClientPoolFactory(xdsClient), FilterRegistry.newRegistry());
selectorManager, new FakeXdsClientPoolFactory(xdsClient), FilterRegistry.newRegistry());
}
@Test
public void nonInetSocketAddress_expectNull() throws Exception {
sendListenerUpdate(new InProcessSocketAddress("test1"), null, null, tlsContextManager);
assertThat(getSslContextProviderSupplier(selectorRef.get())).isNull();
assertThat(getSslContextProviderSupplier(selectorManager.getSelectorToUpdateSelector()))
.isNull();
}
@Test
@ -168,7 +168,7 @@ public class XdsClientWrapperForServerSdsTestMisc {
LdsUpdate listenerUpdate = LdsUpdate.forTcpListener(listener);
xdsClient.ldsWatcher.onChanged(listenerUpdate);
start.get(5, TimeUnit.SECONDS);
FilterChainSelector selector = selectorRef.get();
FilterChainSelector selector = selectorManager.getSelectorToUpdateSelector();
assertThat(getSslContextProviderSupplier(selector)).isNull();
}
@ -193,7 +193,7 @@ public class XdsClientWrapperForServerSdsTestMisc {
} catch (ExecutionException ex) {
assertThat(ex.getCause()).isInstanceOf(IOException.class);
}
assertThat(selectorRef.get()).isSameInstanceAs(NO_FILTER_CHAIN);
assertThat(selectorManager.getSelectorToUpdateSelector()).isSameInstanceAs(NO_FILTER_CHAIN);
}
@Test
@ -217,7 +217,7 @@ public class XdsClientWrapperForServerSdsTestMisc {
} catch (ExecutionException ex) {
assertThat(ex.getCause()).isInstanceOf(IOException.class);
}
assertThat(selectorRef.get()).isSameInstanceAs(NO_FILTER_CHAIN);
assertThat(selectorManager.getSelectorToUpdateSelector()).isSameInstanceAs(NO_FILTER_CHAIN);
}
@Test
@ -241,7 +241,7 @@ public class XdsClientWrapperForServerSdsTestMisc {
} catch (ExecutionException ex) {
assertThat(ex.getCause()).isInstanceOf(IOException.class);
}
assertThat(selectorRef.get()).isSameInstanceAs(NO_FILTER_CHAIN);
assertThat(selectorManager.getSelectorToUpdateSelector()).isSameInstanceAs(NO_FILTER_CHAIN);
}
@Test
@ -263,13 +263,14 @@ public class XdsClientWrapperForServerSdsTestMisc {
localAddress = new InetSocketAddress(ipLocalAddress, PORT);
sendListenerUpdate(localAddress, tlsContext1, null,
tlsContextManager);
SslContextProviderSupplier returnedSupplier = getSslContextProviderSupplier(selectorRef.get());
SslContextProviderSupplier returnedSupplier =
getSslContextProviderSupplier(selectorManager.getSelectorToUpdateSelector());
assertThat(returnedSupplier.getTlsContext()).isSameInstanceAs(tlsContext1);
callUpdateSslContext(returnedSupplier);
XdsServerTestHelper
.generateListenerUpdate(xdsClient, Arrays.<Integer>asList(1234), tlsContext2,
tlsContext3, tlsContextManager);
returnedSupplier = getSslContextProviderSupplier(selectorRef.get());
returnedSupplier = getSslContextProviderSupplier(selectorManager.getSelectorToUpdateSelector());
assertThat(returnedSupplier.getTlsContext()).isSameInstanceAs(tlsContext2);
verify(tlsContextManager, times(1)).releaseServerSslContextProvider(eq(sslContextProvider1));
reset(tlsContextManager);
@ -294,7 +295,7 @@ public class XdsClientWrapperForServerSdsTestMisc {
}
};
pipeline = channel.pipeline();
returnedSupplier = getSslContextProviderSupplier(selectorRef.get());
returnedSupplier = getSslContextProviderSupplier(selectorManager.getSelectorToUpdateSelector());
assertThat(returnedSupplier.getTlsContext()).isSameInstanceAs(tlsContext3);
callUpdateSslContext(returnedSupplier);
xdsServerWrapper.shutdown();
@ -314,7 +315,7 @@ public class XdsClientWrapperForServerSdsTestMisc {
sendListenerUpdate(localAddress, tlsContext1, null,
tlsContextManager);
SslContextProviderSupplier returnedSupplier =
getSslContextProviderSupplier(selectorRef.get());
getSslContextProviderSupplier(selectorManager.getSelectorToUpdateSelector());
assertThat(returnedSupplier.getTlsContext()).isSameInstanceAs(tlsContext1);
callUpdateSslContext(returnedSupplier);
xdsClient.ldsWatcher.onResourceDoesNotExist("not-found Error");
@ -331,7 +332,7 @@ public class XdsClientWrapperForServerSdsTestMisc {
sendListenerUpdate(localAddress, tlsContext1, null,
tlsContextManager);
SslContextProviderSupplier returnedSupplier =
getSslContextProviderSupplier(selectorRef.get());
getSslContextProviderSupplier(selectorManager.getSelectorToUpdateSelector());
assertThat(returnedSupplier.getTlsContext()).isSameInstanceAs(tlsContext1);
callUpdateSslContext(returnedSupplier);
xdsClient.ldsWatcher.onError(Status.PERMISSION_DENIED);
@ -348,7 +349,7 @@ public class XdsClientWrapperForServerSdsTestMisc {
sendListenerUpdate(localAddress, tlsContext1, null,
tlsContextManager);
SslContextProviderSupplier returnedSupplier =
getSslContextProviderSupplier(selectorRef.get());
getSslContextProviderSupplier(selectorManager.getSelectorToUpdateSelector());
assertThat(returnedSupplier.getTlsContext()).isSameInstanceAs(tlsContext1);
callUpdateSslContext(returnedSupplier);
xdsClient.ldsWatcher.onError(Status.CANCELLED);
@ -412,8 +413,10 @@ public class XdsClientWrapperForServerSdsTestMisc {
ProtocolNegotiator mockDelegate = mock(ProtocolNegotiator.class);
GrpcHttp2ConnectionHandler grpcHandler = FakeGrpcHttp2ConnectionHandler.newHandler();
when(mockDelegate.newHandler(grpcHandler)).thenReturn(next);
FilterChainSelectorManager manager = new FilterChainSelectorManager();
manager.updateSelector(selector);
FilterChainMatchingHandler filterChainMatchingHandler =
new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate);
new FilterChainMatchingHandler(grpcHandler, manager, mockDelegate);
pipeline.addLast(filterChainMatchingHandler);
ProtocolNegotiationEvent event = InternalProtocolNegotiationEvent.getDefault();
pipeline.fireUserEventTriggered(event);

View File

@ -281,4 +281,15 @@ public class XdsServerBuilderTest {
assertThat(expected).hasMessageThat().contains("Server already built!");
}
}
@Test
public void drainGraceTime_negativeThrows() throws IOException {
buildBuilder(null);
try {
builder.drainGraceTime(-1, TimeUnit.SECONDS);
fail("exception expected");
} catch (IllegalArgumentException expected) {
assertThat(expected).hasMessageThat().contains("drain grace time");
}
}
}

View File

@ -99,7 +99,7 @@ public class XdsServerWrapperTest {
@Mock
private XdsServingStatusListener listener;
private AtomicReference<FilterChainSelector> selectorRef = new AtomicReference<>();
private FilterChainSelectorManager selectorManager = new FilterChainSelectorManager();
private FakeClock executor = new FakeClock();
private FakeXdsClient xdsClient = new FakeXdsClient();
private FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry();
@ -109,7 +109,7 @@ public class XdsServerWrapperTest {
public void setup() {
when(mockBuilder.build()).thenReturn(mockServer);
xdsServerWrapper = new XdsServerWrapper("0.0.0.0:1", mockBuilder, listener,
selectorRef, new FakeXdsClientPoolFactory(xdsClient),
selectorManager, new FakeXdsClientPoolFactory(xdsClient),
filterRegistry, executor.getScheduledExecutorService());
}
@ -141,7 +141,7 @@ public class XdsServerWrapperTest {
XdsClient xdsClient = mock(XdsClient.class);
when(xdsClient.getBootstrapInfo()).thenReturn(b);
xdsServerWrapper = new XdsServerWrapper("0.0.0.0:1", mockBuilder, listener,
selectorRef, new FakeXdsClientPoolFactory(xdsClient), filterRegistry);
selectorManager, new FakeXdsClientPoolFactory(xdsClient), filterRegistry);
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
@ -377,8 +377,10 @@ public class XdsServerWrapperTest {
xdsClient.deliverLdsUpdate(Collections.singletonList(filterChain), null);
start.get(5000, TimeUnit.MILLISECONDS);
assertThat(ldsWatched).isEqualTo("grpc/server?udpa.resource.listening_address=0.0.0.0:1");
assertThat(selectorRef.get().getRoutingConfigs().size()).isEqualTo(1);
ServerRoutingConfig realConfig = selectorRef.get().getRoutingConfigs().get(filterChain);
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(1);
ServerRoutingConfig realConfig =
selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(filterChain);
assertThat(realConfig.virtualHosts().get()).isEqualTo(httpConnectionManager.virtualHosts());
assertThat(realConfig.httpFilterConfigs()).isEqualTo(httpConnectionManager.httpFilterConfigs());
verify(listener).onServing();
@ -408,7 +410,7 @@ public class XdsServerWrapperTest {
xdsClient.rdsCount = new CountDownLatch(3);
xdsClient.deliverLdsUpdate(Arrays.asList(f0, f1), null);
assertThat(start.isDone()).isFalse();
assertThat(selectorRef.get()).isNull();
assertThat(selectorManager.getSelectorToUpdateSelector()).isNull();
verify(mockServer, never()).start();
verify(listener, never()).onServing();
@ -426,23 +428,26 @@ public class XdsServerWrapperTest {
Collections.singletonList(createVirtualHost("virtual-host-2")));
start.get(5000, TimeUnit.MILLISECONDS);
verify(mockServer).start();
ServerRoutingConfig realConfig = selectorRef.get().getRoutingConfigs().get(f0);
ServerRoutingConfig realConfig =
selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f0);
assertThat(realConfig.virtualHosts().get()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-0")));
assertThat(realConfig.httpFilterConfigs()).isEqualTo(
f0.getHttpConnectionManager().httpFilterConfigs());
assertThat(selectorRef.get().getRoutingConfigs().size()).isEqualTo(2);
realConfig = selectorRef.get().getRoutingConfigs().get(f2);
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(2);
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f2);
assertThat(realConfig.virtualHosts().get()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-1")));
assertThat(realConfig.httpFilterConfigs()).isEqualTo(
f2.getHttpConnectionManager().httpFilterConfigs());
realConfig = selectorRef.get().getDefaultRoutingConfig();
realConfig = selectorManager.getSelectorToUpdateSelector().getDefaultRoutingConfig();
assertThat(realConfig.virtualHosts().get()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-2")));
assertThat(realConfig.httpFilterConfigs()).isEqualTo(
f3.getHttpConnectionManager().httpFilterConfigs());
assertThat(selectorRef.get().getDefaultSslContextProviderSupplier()).isEqualTo(
assertThat(selectorManager.getSelectorToUpdateSelector().getDefaultSslContextProviderSupplier())
.isEqualTo(
f3.getSslContextProviderSupplier());
}
@ -468,31 +473,32 @@ public class XdsServerWrapperTest {
xdsClient.rdsCount = new CountDownLatch(1);
xdsClient.deliverLdsUpdate(Arrays.asList(f0, f1), f2);
assertThat(start.isDone()).isFalse();
assertThat(selectorRef.get()).isNull();
assertThat(selectorManager.getSelectorToUpdateSelector()).isNull();
xdsClient.rdsCount.await(5, TimeUnit.SECONDS);
xdsClient.deliverRdsUpdate("r0",
Collections.singletonList(createVirtualHost("virtual-host-0")));
start.get(5000, TimeUnit.MILLISECONDS);
verify(mockServer, times(1)).start();
ServerRoutingConfig realConfig = selectorRef.get().getRoutingConfigs().get(f0);
ServerRoutingConfig realConfig =
selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f0);
assertThat(realConfig.virtualHosts().get()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-0")));
assertThat(realConfig.httpFilterConfigs()).isEqualTo(
f0.getHttpConnectionManager().httpFilterConfigs());
realConfig = selectorRef.get().getRoutingConfigs().get(f1);
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f1);
assertThat(realConfig.virtualHosts().get()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-0")));
assertThat(realConfig.httpFilterConfigs()).isEqualTo(
f1.getHttpConnectionManager().httpFilterConfigs());
realConfig = selectorRef.get().getDefaultRoutingConfig();
realConfig = selectorManager.getSelectorToUpdateSelector().getDefaultRoutingConfig();
assertThat(realConfig.virtualHosts().get()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-0")));
assertThat(realConfig.httpFilterConfigs()).isEqualTo(
f2.getHttpConnectionManager().httpFilterConfigs());
assertThat(selectorRef.get().getDefaultSslContextProviderSupplier()).isSameInstanceAs(
f2.getSslContextProviderSupplier());
assertThat(selectorManager.getSelectorToUpdateSelector().getDefaultSslContextProviderSupplier())
.isSameInstanceAs(f2.getSslContextProviderSupplier());
EnvoyServerProtoData.FilterChain f3 = createFilterChain("filter-chain-3", createRds("r0"));
EnvoyServerProtoData.FilterChain f4 = createFilterChain("filter-chain-4", createRds("r1"));
@ -505,24 +511,26 @@ public class XdsServerWrapperTest {
xdsClient.deliverRdsUpdate("r0",
Collections.singletonList(createVirtualHost("virtual-host-0")));
assertThat(selectorRef.get().getRoutingConfigs().size()).isEqualTo(2);
realConfig = selectorRef.get().getRoutingConfigs().get(f5);
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(2);
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f5);
assertThat(realConfig.virtualHosts().get()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-1")));
assertThat(realConfig.httpFilterConfigs()).isEqualTo(
f5.getHttpConnectionManager().httpFilterConfigs());
realConfig = selectorRef.get().getRoutingConfigs().get(f3);
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f3);
assertThat(realConfig.virtualHosts().get()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-0")));
assertThat(realConfig.httpFilterConfigs()).isEqualTo(
f3.getHttpConnectionManager().httpFilterConfigs());
realConfig = selectorRef.get().getDefaultRoutingConfig();
realConfig = selectorManager.getSelectorToUpdateSelector().getDefaultRoutingConfig();
assertThat(realConfig.virtualHosts().get()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-1")));
assertThat(realConfig.httpFilterConfigs()).isEqualTo(
f4.getHttpConnectionManager().httpFilterConfigs());
assertThat(selectorRef.get().getDefaultSslContextProviderSupplier()).isSameInstanceAs(
assertThat(selectorManager.getSelectorToUpdateSelector().getDefaultSslContextProviderSupplier())
.isSameInstanceAs(
f4.getSslContextProviderSupplier());
verify(mockServer, times(1)).start();
xdsServerWrapper.shutdown();
@ -556,33 +564,35 @@ public class XdsServerWrapperTest {
xdsClient.rdsCount.await();
xdsClient.rdsWatchers.get("r0").onError(Status.CANCELLED);
start.get(5000, TimeUnit.MILLISECONDS);
assertThat(selectorRef.get().getRoutingConfigs().size()).isEqualTo(2);
ServerRoutingConfig realConfig = selectorRef.get().getRoutingConfigs().get(f1);
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(2);
ServerRoutingConfig realConfig =
selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f1);
assertThat(realConfig.virtualHosts().get()).isNull();
assertThat(realConfig.httpFilterConfigs()).isEqualTo(
f1.getHttpConnectionManager().httpFilterConfigs());
realConfig = selectorRef.get().getRoutingConfigs().get(f0);
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f0);
assertThat(realConfig.virtualHosts().get()).isEqualTo(hcmVirtual.virtualHosts());
assertThat(realConfig.httpFilterConfigs()).isEqualTo(
f0.getHttpConnectionManager().httpFilterConfigs());
xdsClient.deliverRdsUpdate("r0",
Collections.singletonList(createVirtualHost("virtual-host-1")));
realConfig = selectorRef.get().getRoutingConfigs().get(f1);
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f1);
assertThat(realConfig.virtualHosts().get()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-1")));
assertThat(realConfig.httpFilterConfigs()).isEqualTo(
f1.getHttpConnectionManager().httpFilterConfigs());
xdsClient.rdsWatchers.get("r0").onError(Status.CANCELLED);
realConfig = selectorRef.get().getRoutingConfigs().get(f1);
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f1);
assertThat(realConfig.virtualHosts().get()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-1")));
assertThat(realConfig.httpFilterConfigs()).isEqualTo(
f1.getHttpConnectionManager().httpFilterConfigs());
xdsClient.rdsWatchers.get("r0").onResourceDoesNotExist("r0");
realConfig = selectorRef.get().getRoutingConfigs().get(f1);
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f1);
assertThat(realConfig.virtualHosts().get()).isNull();
assertThat(realConfig.httpFilterConfigs()).isEqualTo(
f1.getHttpConnectionManager().httpFilterConfigs());
@ -615,7 +625,8 @@ public class XdsServerWrapperTest {
SslContextProviderSupplier sslSupplier0 = filterChain0.getSslContextProviderSupplier();
xdsClient.deliverLdsUpdate(Collections.singletonList(filterChain0), null);
xdsClient.ldsWatcher.onError(Status.INTERNAL);
assertThat(selectorRef.get()).isSameInstanceAs(FilterChainSelector.NO_FILTER_CHAIN);
assertThat(selectorManager.getSelectorToUpdateSelector())
.isSameInstanceAs(FilterChainSelector.NO_FILTER_CHAIN);
assertThat(xdsClient.rdsWatchers).isEmpty();
verify(mockBuilder, times(1)).build();
verify(listener, times(2)).onNotServing(any(StatusException.class));
@ -634,8 +645,10 @@ public class XdsServerWrapperTest {
verify(mockBuilder, times(1)).build();
verify(mockServer, times(2)).start();
verify(listener, times(1)).onServing();
assertThat(selectorRef.get().getRoutingConfigs().size()).isEqualTo(1);
ServerRoutingConfig realConfig = selectorRef.get().getRoutingConfigs().get(filterChain1);
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(1);
ServerRoutingConfig realConfig =
selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(filterChain1);
assertThat(realConfig.virtualHosts().get()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-1")));
assertThat(realConfig.httpFilterConfigs()).isEqualTo(
@ -648,8 +661,10 @@ public class XdsServerWrapperTest {
verify(mockBuilder, times(1)).build();
verify(mockServer, times(2)).start();
verify(listener, times(2)).onNotServing(any(StatusException.class));
assertThat(selectorRef.get().getRoutingConfigs().size()).isEqualTo(1);
realConfig = selectorRef.get().getRoutingConfigs().get(filterChain1);
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(1);
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs()
.get(filterChain1);
assertThat(realConfig.virtualHosts().get()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-2")));
assertThat(realConfig.httpFilterConfigs()).isEqualTo(
@ -661,7 +676,8 @@ public class XdsServerWrapperTest {
assertThat(xdsClient.rdsWatchers).isEmpty();
verify(mockServer, times(3)).shutdown();
when(mockServer.isShutdown()).thenReturn(true);
assertThat(selectorRef.get()).isSameInstanceAs(FilterChainSelector.NO_FILTER_CHAIN);
assertThat(selectorManager.getSelectorToUpdateSelector())
.isSameInstanceAs(FilterChainSelector.NO_FILTER_CHAIN);
verify(listener, times(3)).onNotServing(any(StatusException.class));
assertThat(sslSupplier1.isShutdown()).isTrue();
// no op
@ -686,8 +702,10 @@ public class XdsServerWrapperTest {
verify(mockServer, times(3)).start();
verify(listener, times(1)).onServing();
verify(listener, times(3)).onNotServing(any(StatusException.class));
assertThat(selectorRef.get().getRoutingConfigs().size()).isEqualTo(1);
realConfig = selectorRef.get().getRoutingConfigs().get(filterChain2);
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(1);
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs()
.get(filterChain2);
assertThat(realConfig.virtualHosts().get()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-1")));
assertThat(realConfig.httpFilterConfigs()).isEqualTo(
@ -712,8 +730,10 @@ public class XdsServerWrapperTest {
when(mockServer.isShutdown()).thenReturn(false);
verify(listener, times(4)).onNotServing(any(StatusException.class));
assertThat(selectorRef.get().getRoutingConfigs().size()).isEqualTo(1);
realConfig = selectorRef.get().getRoutingConfigs().get(filterChain3);
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(1);
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs()
.get(filterChain3);
assertThat(realConfig.virtualHosts().get()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-1")));
assertThat(realConfig.httpFilterConfigs()).isEqualTo(