diff --git a/xds/src/main/java/io/grpc/xds/FilterChainMatchingProtocolNegotiators.java b/xds/src/main/java/io/grpc/xds/FilterChainMatchingProtocolNegotiators.java index 0c8780fe74..24cd4e9ae7 100644 --- a/xds/src/main/java/io/grpc/xds/FilterChainMatchingProtocolNegotiators.java +++ b/xds/src/main/java/io/grpc/xds/FilterChainMatchingProtocolNegotiators.java @@ -17,7 +17,8 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; -import static io.grpc.xds.InternalXdsAttributes.ATTR_FILTER_CHAIN_SELECTOR_REF; +import static io.grpc.xds.InternalXdsAttributes.ATTR_DRAIN_GRACE_NANOS; +import static io.grpc.xds.InternalXdsAttributes.ATTR_FILTER_CHAIN_SELECTOR_MANAGER; import static io.grpc.xds.XdsServerWrapper.ATTR_SERVER_ROUTING_CONFIG; import static io.grpc.xds.internal.sds.SdsProtocolNegotiators.ATTR_SERVER_SSL_CONTEXT_PROVIDER_SUPPLIER; @@ -28,6 +29,7 @@ import com.google.protobuf.UInt32Value; import io.grpc.Attributes; import io.grpc.internal.ObjectPool; import io.grpc.netty.GrpcHttp2ConnectionHandler; +import io.grpc.netty.InternalGracefulServerCloseCommand; import io.grpc.netty.InternalProtocolNegotiationEvent; import io.grpc.netty.InternalProtocolNegotiator; import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator; @@ -40,6 +42,8 @@ import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHan import io.grpc.xds.XdsServerWrapper.ServerRoutingConfig; import io.grpc.xds.internal.Matchers.CidrMatcher; import io.grpc.xds.internal.sds.SslContextProviderSupplier; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -54,7 +58,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -77,14 +81,16 @@ final class FilterChainMatchingProtocolNegotiators { static final class FilterChainMatchingHandler extends ChannelInboundHandlerAdapter { private final GrpcHttp2ConnectionHandler grpcHandler; - private final FilterChainSelector selector; + private final FilterChainSelectorManager filterChainSelectorManager; private final ProtocolNegotiator delegate; FilterChainMatchingHandler( - GrpcHttp2ConnectionHandler grpcHandler, FilterChainSelector selector, + GrpcHttp2ConnectionHandler grpcHandler, + FilterChainSelectorManager filterChainSelectorManager, ProtocolNegotiator delegate) { this.grpcHandler = checkNotNull(grpcHandler, "grpcHandler"); - this.selector = checkNotNull(selector, "selector"); + this.filterChainSelectorManager = + checkNotNull(filterChainSelectorManager, "filterChainSelectorManager"); this.delegate = checkNotNull(delegate, "delegate"); } @@ -94,6 +100,19 @@ final class FilterChainMatchingProtocolNegotiators { super.userEventTriggered(ctx, evt); return; } + long drainGraceTime = 0; + TimeUnit drainGraceTimeUnit = null; + Long drainGraceNanosObj = grpcHandler.getEagAttributes().get(ATTR_DRAIN_GRACE_NANOS); + if (drainGraceNanosObj != null) { + drainGraceTime = drainGraceNanosObj; + drainGraceTimeUnit = TimeUnit.NANOSECONDS; + } + FilterChainSelectorManager.Closer closer = new FilterChainSelectorManager.Closer( + new GracefullyShutdownChannelRunnable(ctx.channel(), drainGraceTime, drainGraceTimeUnit)); + FilterChainSelector selector = filterChainSelectorManager.register(closer); + ctx.channel().closeFuture().addListener( + new FilterChainSelectorManagerDeregister(filterChainSelectorManager, closer)); + checkNotNull(selector, "selector"); SelectedConfig config = selector.select( (InetSocketAddress) ctx.channel().localAddress(), (InetSocketAddress) ctx.channel().remoteAddress()); @@ -354,10 +373,10 @@ final class FilterChainMatchingProtocolNegotiators { @Override public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) { - AtomicReference filterChainSelectorRef = - grpcHandler.getEagAttributes().get(ATTR_FILTER_CHAIN_SELECTOR_REF); - checkNotNull(filterChainSelectorRef, "filterChainSelectorRef"); - return new FilterChainMatchingHandler(grpcHandler, filterChainSelectorRef.get(), + FilterChainSelectorManager filterChainSelectorManager = + grpcHandler.getEagAttributes().get(ATTR_FILTER_CHAIN_SELECTOR_MANAGER); + checkNotNull(filterChainSelectorManager, "filterChainSelectorManager"); + return new FilterChainMatchingHandler(grpcHandler, filterChainSelectorManager, delegate.newNegotiator(offloadExecutorPool)); } @@ -384,4 +403,42 @@ final class FilterChainMatchingProtocolNegotiators { this.sslContextProviderSupplier = sslContextProviderSupplier; } } + + private static class FilterChainSelectorManagerDeregister implements ChannelFutureListener { + private final FilterChainSelectorManager filterChainSelectorManager; + private final FilterChainSelectorManager.Closer closer; + + public FilterChainSelectorManagerDeregister( + FilterChainSelectorManager filterChainSelectorManager, + FilterChainSelectorManager.Closer closer) { + this.filterChainSelectorManager = + checkNotNull(filterChainSelectorManager, "filterChainSelectorManager"); + this.closer = checkNotNull(closer, "closer"); + } + + @Override public void operationComplete(ChannelFuture future) throws Exception { + filterChainSelectorManager.deregister(closer); + } + } + + private static class GracefullyShutdownChannelRunnable implements Runnable { + private final Channel channel; + private final long drainGraceTime; + @Nullable + private final TimeUnit drainGraceTimeUnit; + + public GracefullyShutdownChannelRunnable( + Channel channel, long drainGraceTime, @Nullable TimeUnit drainGraceTimeUnit) { + this.channel = checkNotNull(channel, "channel"); + this.drainGraceTime = drainGraceTime; + this.drainGraceTimeUnit = drainGraceTimeUnit; + } + + @Override public void run() { + Object gracefulCloseCommand = InternalGracefulServerCloseCommand.create( + "xds_drain", drainGraceTime, drainGraceTimeUnit); + channel.writeAndFlush(gracefulCloseCommand) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + } + } } diff --git a/xds/src/main/java/io/grpc/xds/FilterChainSelectorManager.java b/xds/src/main/java/io/grpc/xds/FilterChainSelectorManager.java new file mode 100644 index 0000000000..4295d75f59 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/FilterChainSelectorManager.java @@ -0,0 +1,95 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector; +import java.util.Comparator; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.concurrent.GuardedBy; + +/** + * Maintains the current xDS selector and any resources using that selector. When the selector + * changes, old resources are closed to avoid old config usages. + */ +final class FilterChainSelectorManager { + private static final AtomicLong closerId = new AtomicLong(); + + private final Object lock = new Object(); + @GuardedBy("lock") + private FilterChainSelector selector; + // Avoid HashSet since it does not decrease in size, forming a high water mark. + @GuardedBy("lock") + private TreeSet closers = new TreeSet(new CloserComparator()); + + public FilterChainSelector register(Closer closer) { + synchronized (lock) { + Preconditions.checkState(closers.add(closer), "closer already registered"); + return selector; + } + } + + public void deregister(Closer closer) { + synchronized (lock) { + closers.remove(closer); + } + } + + /** Only safe to be called by code that is responsible for updating the selector. */ + public FilterChainSelector getSelectorToUpdateSelector() { + synchronized (lock) { + return selector; + } + } + + public void updateSelector(FilterChainSelector newSelector) { + TreeSet oldClosers; + synchronized (lock) { + oldClosers = closers; + closers = new TreeSet(closers.comparator()); + selector = newSelector; + } + for (Closer closer : oldClosers) { + closer.closer.run(); + } + } + + @VisibleForTesting + int getRegisterCount() { + synchronized (lock) { + return closers.size(); + } + } + + public static final class Closer { + private final long id = closerId.getAndIncrement(); + private final Runnable closer; + + /** {@code closer} may be run multiple times. */ + public Closer(Runnable closer) { + this.closer = Preconditions.checkNotNull(closer, "closer"); + } + } + + private static class CloserComparator implements Comparator { + @Override public int compare(Closer c1, Closer c2) { + return Long.compare(c1.id, c2.id); + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java b/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java index 82eddd355a..410a64df9c 100644 --- a/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java +++ b/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java @@ -22,10 +22,8 @@ import io.grpc.Grpc; import io.grpc.Internal; import io.grpc.NameResolver; import io.grpc.internal.ObjectPool; -import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector; import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider; import io.grpc.xds.internal.sds.SslContextProviderSupplier; -import java.util.concurrent.atomic.AtomicReference; /** * Internal attributes used for xDS implementation. Do not use. @@ -81,9 +79,14 @@ public final class InternalXdsAttributes { * Filter chain match for network filters. */ @Grpc.TransportAttr - static final Attributes.Key> - ATTR_FILTER_CHAIN_SELECTOR_REF = Attributes.Key.create( - "io.grpc.xds.InternalXdsAttributes.filterChainSelectorRef"); + static final Attributes.Key + ATTR_FILTER_CHAIN_SELECTOR_MANAGER = Attributes.Key.create( + "io.grpc.xds.InternalXdsAttributes.filterChainSelectorManager"); + + /** Grace time to use when draining. Null for an infinite grace time. */ + @Grpc.TransportAttr + static final Attributes.Key ATTR_DRAIN_GRACE_NANOS = + Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.drainGraceTime"); private InternalXdsAttributes() {} } diff --git a/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java b/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java index 34879fd8cd..c95c1e6d48 100644 --- a/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java +++ b/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java @@ -16,9 +16,11 @@ package io.grpc.xds; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import static io.grpc.xds.InternalXdsAttributes.ATTR_FILTER_CHAIN_SELECTOR_REF; +import static io.grpc.xds.InternalXdsAttributes.ATTR_DRAIN_GRACE_NANOS; +import static io.grpc.xds.InternalXdsAttributes.ATTR_FILTER_CHAIN_SELECTOR_MANAGER; import com.google.common.annotations.VisibleForTesting; import com.google.errorprone.annotations.DoNotCall; @@ -33,11 +35,10 @@ import io.grpc.netty.InternalNettyServerBuilder; import io.grpc.netty.InternalNettyServerCredentials; import io.grpc.netty.InternalProtocolNegotiator; import io.grpc.netty.NettyServerBuilder; -import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector; import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingNegotiatorServerFactory; import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; /** @@ -45,6 +46,8 @@ import java.util.logging.Logger; */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/7514") public final class XdsServerBuilder extends ForwardingServerBuilder { + private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000); + private final NettyServerBuilder delegate; private final int port; private XdsServingStatusListener xdsServingStatusListener; @@ -52,6 +55,8 @@ public final class XdsServerBuilder extends ForwardingServerBuilder= 0, "drain grace time must be non-negative: %s", + drainGraceTime); + checkNotNull(drainGraceTimeUnit, "drainGraceTimeUnit"); + if (drainGraceTimeUnit.toNanos(drainGraceTime) >= AS_LARGE_AS_INFINITE) { + drainGraceTimeUnit = null; + } + this.drainGraceTime = drainGraceTime; + this.drainGraceTimeUnit = drainGraceTimeUnit; + return this; + } + @DoNotCall("Unsupported. Use forPort(int, ServerCredentials) instead") public static ServerBuilder forPort(int port) { throw new UnsupportedOperationException( @@ -94,12 +119,15 @@ public final class XdsServerBuilder extends ForwardingServerBuilder filterChainSelectorRef = new AtomicReference<>(); - InternalNettyServerBuilder.eagAttributes(delegate, Attributes.newBuilder() - .set(ATTR_FILTER_CHAIN_SELECTOR_REF, filterChainSelectorRef) - .build()); + FilterChainSelectorManager filterChainSelectorManager = new FilterChainSelectorManager(); + Attributes.Builder builder = Attributes.newBuilder() + .set(ATTR_FILTER_CHAIN_SELECTOR_MANAGER, filterChainSelectorManager); + if (drainGraceTimeUnit != null) { + builder.set(ATTR_DRAIN_GRACE_NANOS, drainGraceTimeUnit.toNanos(drainGraceTime)); + } + InternalNettyServerBuilder.eagAttributes(delegate, builder.build()); return new XdsServerWrapper("0.0.0.0:" + port, delegate, xdsServingStatusListener, - filterChainSelectorRef, xdsClientPoolFactory, filterRegistry); + filterChainSelectorManager, xdsClientPoolFactory, filterRegistry); } @VisibleForTesting diff --git a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java index faa6e9d34b..29821f2cba 100644 --- a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java +++ b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java @@ -100,7 +100,7 @@ final class XdsServerWrapper extends Server { private final ThreadSafeRandom random = ThreadSafeRandomImpl.instance; private final XdsClientPoolFactory xdsClientPoolFactory; private final XdsServingStatusListener listener; - private final AtomicReference filterChainSelectorRef; + private final FilterChainSelectorManager filterChainSelectorManager; private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean shutdown = new AtomicBoolean(false); private boolean isServing; @@ -117,11 +117,11 @@ final class XdsServerWrapper extends Server { String listenerAddress, ServerBuilder delegateBuilder, XdsServingStatusListener listener, - AtomicReference filterChainSelectorRef, + FilterChainSelectorManager filterChainSelectorManager, XdsClientPoolFactory xdsClientPoolFactory, FilterRegistry filterRegistry) { - this(listenerAddress, delegateBuilder, listener, filterChainSelectorRef, xdsClientPoolFactory, - filterRegistry, SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE)); + this(listenerAddress, delegateBuilder, listener, filterChainSelectorManager, + xdsClientPoolFactory, filterRegistry, SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE)); sharedTimeService = true; } @@ -130,7 +130,7 @@ final class XdsServerWrapper extends Server { String listenerAddress, ServerBuilder delegateBuilder, XdsServingStatusListener listener, - AtomicReference filterChainSelectorRef, + FilterChainSelectorManager filterChainSelectorManager, XdsClientPoolFactory xdsClientPoolFactory, FilterRegistry filterRegistry, ScheduledExecutorService timeService) { @@ -138,7 +138,8 @@ final class XdsServerWrapper extends Server { this.delegateBuilder = checkNotNull(delegateBuilder, "delegateBuilder"); this.delegateBuilder.intercept(new ConfigApplyingInterceptor()); this.listener = checkNotNull(listener, "listener"); - this.filterChainSelectorRef = checkNotNull(filterChainSelectorRef, "filterChainSelectorRef"); + this.filterChainSelectorManager + = checkNotNull(filterChainSelectorManager, "filterChainSelectorManager"); this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory"); this.timeService = checkNotNull(timeService, "timeService"); this.filterRegistry = checkNotNull(filterRegistry,"filterRegistry"); @@ -361,8 +362,8 @@ final class XdsServerWrapper extends Server { } checkNotNull(update.listener(), "update"); if (!pendingRds.isEmpty()) { - // filter chain state has not yet been applied to filterChainSelectorRef and there are - // two sets of sslContextProviderSuppliers, so we release the old ones. + // filter chain state has not yet been applied to filterChainSelectorManager and there + // are two sets of sslContextProviderSuppliers, so we release the old ones. releaseSuppliersInFlight(); pendingRds.clear(); } @@ -443,7 +444,7 @@ final class XdsServerWrapper extends Server { logger.log(Level.FINE, "Stop watching LDS resource {0}", resourceName); xdsClient.cancelLdsResourceWatch(resourceName, this); List toRelease = getSuppliersInUse(); - filterChainSelectorRef.set(FilterChainSelector.NO_FILTER_CHAIN); + filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN); for (SslContextProviderSupplier s: toRelease) { s.close(); } @@ -460,7 +461,7 @@ final class XdsServerWrapper extends Server { defaultFilterChain == null ? null : defaultFilterChain.getSslContextProviderSupplier(), defaultFilterChain == null ? null : generateRoutingConfig(defaultFilterChain)); List toRelease = getSuppliersInUse(); - filterChainSelectorRef.set(selector); + filterChainSelectorManager.updateSelector(selector); for (SslContextProviderSupplier e: toRelease) { e.close(); } @@ -482,7 +483,7 @@ final class XdsServerWrapper extends Server { private void handleConfigNotFound(StatusException exception) { cleanUpRouteDiscoveryStates(); List toRelease = getSuppliersInUse(); - filterChainSelectorRef.set(FilterChainSelector.NO_FILTER_CHAIN); + filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN); for (SslContextProviderSupplier s: toRelease) { s.close(); } @@ -511,7 +512,7 @@ final class XdsServerWrapper extends Server { private List getSuppliersInUse() { List toRelease = new ArrayList<>(); - FilterChainSelector selector = filterChainSelectorRef.get(); + FilterChainSelector selector = filterChainSelectorManager.getSelectorToUpdateSelector(); if (selector != null) { for (FilterChain f: selector.getRoutingConfigs().keySet()) { if (f.getSslContextProviderSupplier() != null) { diff --git a/xds/src/test/java/io/grpc/xds/FilterChainMatchingProtocolNegotiatorsTest.java b/xds/src/test/java/io/grpc/xds/FilterChainMatchingProtocolNegotiatorsTest.java index 167f3f03c6..891dec322c 100644 --- a/xds/src/test/java/io/grpc/xds/FilterChainMatchingProtocolNegotiatorsTest.java +++ b/xds/src/test/java/io/grpc/xds/FilterChainMatchingProtocolNegotiatorsTest.java @@ -65,6 +65,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -87,6 +88,7 @@ public class FilterChainMatchingProtocolNegotiatorsTest { private ChannelHandlerContext channelHandlerCtx; @Mock private ProtocolNegotiator mockDelegate; + private FilterChainSelectorManager selectorManager = new FilterChainSelectorManager(); private static final HttpConnectionManager HTTP_CONNECTION_MANAGER = createRds("routing-config"); private static final String LOCAL_IP = "10.1.2.3"; // dest private static final String REMOTE_IP = "10.4.2.3"; // source @@ -94,6 +96,16 @@ public class FilterChainMatchingProtocolNegotiatorsTest { private final ServerRoutingConfig noopConfig = ServerRoutingConfig.create( new ArrayList(), new AtomicReference>()); + @After + @SuppressWarnings("FutureReturnValueIgnored") + public void tearDown() { + if (channel.isActive()) { + channel.close(); + channel.runPendingTasks(); + } + assertThat(selectorManager.getRegisterCount()).isEqualTo(0); + } + @Test public void nofilterChainMatch_defaultSslContext() throws Exception { final SettableFuture sslSet = SettableFuture.create(); @@ -103,10 +115,10 @@ public class FilterChainMatchingProtocolNegotiatorsTest { SslContextProviderSupplier defaultSsl = new SslContextProviderSupplier(createTls(), tlsContextManager); - FilterChainSelector selector = new FilterChainSelector( - new HashMap(), defaultSsl, noopConfig); + selectorManager.updateSelector(new FilterChainSelector( + new HashMap(), defaultSsl, noopConfig)); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); setupChannel("172.168.1.1", "172.168.1.2", 80, filterChainMatchingHandler); ChannelHandlerContext channelHandlerCtx = pipeline.context(filterChainMatchingHandler); assertThat(channelHandlerCtx).isNotNull(); @@ -125,10 +137,10 @@ public class FilterChainMatchingProtocolNegotiatorsTest { @Test public void noFilterChainMatch_noDefaultSslContext() { - FilterChainSelector selector = new FilterChainSelector( - new HashMap(), null, null); + selectorManager.updateSelector(new FilterChainSelector( + new HashMap(), null, null)); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); setupChannel("172.168.1.1", "172.168.2.2", 90, filterChainMatchingHandler); channelHandlerCtx = pipeline.context(filterChainMatchingHandler); assertThat(channelHandlerCtx).isNotNull(); @@ -139,6 +151,33 @@ public class FilterChainMatchingProtocolNegotiatorsTest { assertThat(channel.closeFuture().isDone()).isTrue(); } + @Test + public void filterSelectorChange_drainsConnection() { + ChannelHandler next = new ChannelInboundHandlerAdapter(); + when(mockDelegate.newHandler(grpcHandler)).thenReturn(next); + selectorManager.updateSelector(new FilterChainSelector( + new HashMap(), null, noopConfig)); + FilterChainMatchingHandler filterChainMatchingHandler = + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); + setupChannel("172.168.1.1", "172.168.2.2", 90, filterChainMatchingHandler); + channelHandlerCtx = pipeline.context(filterChainMatchingHandler); + assertThat(channelHandlerCtx).isNotNull(); + + pipeline.fireUserEventTriggered(event); + channelHandlerCtx = pipeline.context(filterChainMatchingHandler); + assertThat(channelHandlerCtx).isNull(); + + channel.runPendingTasks(); + channelHandlerCtx = pipeline.context(next); + assertThat(channelHandlerCtx).isNotNull(); + assertThat(channel.readOutbound()).isNull(); + + selectorManager.updateSelector(new FilterChainSelector( + new HashMap(), null, noopConfig)); + assertThat(channel.readOutbound().getClass().getName()) + .isEqualTo("io.grpc.netty.GracefulServerCloseCommand"); + } + @Test public void singleFilterChainWithoutAlpn() throws Exception { EnvoyServerProtoData.FilterChainMatch filterChainMatch = @@ -157,10 +196,10 @@ public class FilterChainMatchingProtocolNegotiatorsTest { "filter-chain-foo", filterChainMatch, HTTP_CONNECTION_MANAGER, tlsContext, tlsContextManager); - FilterChainSelector selector = new FilterChainSelector(ImmutableMap.of(filterChain, noopConfig), - null, null); + selectorManager.updateSelector(new FilterChainSelector(ImmutableMap.of(filterChain, noopConfig), + null, null)); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); final SettableFuture sslSet = SettableFuture.create(); final SettableFuture routingSettable = SettableFuture.create(); ChannelHandler next = captureAttrHandler(sslSet, routingSettable); @@ -196,11 +235,11 @@ public class FilterChainMatchingProtocolNegotiatorsTest { EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain( "filter-chain-bar", null, HTTP_CONNECTION_MANAGER, defaultTlsContext, tlsContextManager); - FilterChainSelector selector = new FilterChainSelector( + selectorManager.updateSelector(new FilterChainSelector( ImmutableMap.of(filterChain, randomConfig("no-match")), - defaultFilterChain.getSslContextProviderSupplier(), noopConfig); + defaultFilterChain.getSslContextProviderSupplier(), noopConfig)); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); final SettableFuture sslSet = SettableFuture.create(); final SettableFuture routingSettable = SettableFuture.create(); @@ -242,12 +281,12 @@ public class FilterChainMatchingProtocolNegotiatorsTest { ServerRoutingConfig routingConfig = ServerRoutingConfig.create( new ArrayList(), new AtomicReference<>( ImmutableList.of(createVirtualHost("virtual")))); - FilterChainSelector selector = new FilterChainSelector( + selectorManager.updateSelector(new FilterChainSelector( ImmutableMap.of(filterChainWithDestPort, routingConfig), - defaultFilterChain.getSslContextProviderSupplier(), noopConfig); + defaultFilterChain.getSslContextProviderSupplier(), noopConfig)); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); final SettableFuture sslSet = SettableFuture.create(); final SettableFuture routingSettable = SettableFuture.create(); @@ -285,12 +324,12 @@ public class FilterChainMatchingProtocolNegotiatorsTest { "filter-chain-bar", null, HTTP_CONNECTION_MANAGER, tlsContextForDefaultFilterChain, tlsContextManager); - FilterChainSelector selector = new FilterChainSelector( + selectorManager.updateSelector(new FilterChainSelector( ImmutableMap.of(filterChainWithMatch, noopConfig), - defaultFilterChain.getSslContextProviderSupplier(), randomConfig("no-match")); + defaultFilterChain.getSslContextProviderSupplier(), randomConfig("no-match"))); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); final SettableFuture sslSet = SettableFuture.create(); final SettableFuture routingSettable = SettableFuture.create(); @@ -329,12 +368,12 @@ public class FilterChainMatchingProtocolNegotiatorsTest { EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain( "filter-chain-bar", null, HTTP_CONNECTION_MANAGER, tlsContextForDefaultFilterChain, tlsContextManager); - FilterChainSelector selector = new FilterChainSelector( + selectorManager.updateSelector(new FilterChainSelector( ImmutableMap.of(filterChainWithMismatch, randomConfig("no-match")), - defaultFilterChain.getSslContextProviderSupplier(), noopConfig); + defaultFilterChain.getSslContextProviderSupplier(), noopConfig)); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); final SettableFuture sslSet = SettableFuture.create(); final SettableFuture routingSettable = SettableFuture.create(); @@ -374,11 +413,11 @@ public class FilterChainMatchingProtocolNegotiatorsTest { "filter-chain-bar", null, HTTP_CONNECTION_MANAGER, tlsContextForDefaultFilterChain, tlsContextManager); - FilterChainSelector selector = new FilterChainSelector( + selectorManager.updateSelector(new FilterChainSelector( ImmutableMap.of(filterChain0Length, noopConfig), - defaultFilterChain.getSslContextProviderSupplier(), null); + defaultFilterChain.getSslContextProviderSupplier(), null)); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); final SettableFuture sslSet = SettableFuture.create(); final SettableFuture routingSettable = SettableFuture.create(); @@ -431,13 +470,13 @@ public class FilterChainMatchingProtocolNegotiatorsTest { tlsContextManager); EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain( "filter-chain-baz", null, HTTP_CONNECTION_MANAGER, null, tlsContextManager); - FilterChainSelector selector = new FilterChainSelector( + selectorManager.updateSelector(new FilterChainSelector( ImmutableMap.of(filterChainLessSpecific, randomConfig("no-match"), filterChainMoreSpecific, noopConfig), - defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default")); + defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default"))); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); final SettableFuture sslSet = SettableFuture.create(); final SettableFuture routingSettable = SettableFuture.create(); @@ -490,12 +529,12 @@ public class FilterChainMatchingProtocolNegotiatorsTest { tlsContextManager); EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain( "filter-chain-baz", null, HTTP_CONNECTION_MANAGER, null, tlsContextManager); - FilterChainSelector selector = new FilterChainSelector( + selectorManager.updateSelector(new FilterChainSelector( ImmutableMap.of(filterChainLessSpecific, randomConfig("no-match"), filterChainMoreSpecific, noopConfig), - defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default")); + defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default"))); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); final SettableFuture sslSet = SettableFuture.create(); final SettableFuture routingSettable = SettableFuture.create(); @@ -547,13 +586,13 @@ public class FilterChainMatchingProtocolNegotiatorsTest { tlsContextMoreSpecific, tlsContextManager); EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain( "filter-chain-baz", null, HTTP_CONNECTION_MANAGER, null, tlsContextManager); - FilterChainSelector selector = new FilterChainSelector( + selectorManager.updateSelector(new FilterChainSelector( ImmutableMap.of(filterChainLessSpecific, randomConfig("no-match"), filterChainMoreSpecific, noopConfig), - defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default")); + defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default"))); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); final SettableFuture sslSet = SettableFuture.create(); final SettableFuture routingSettable = SettableFuture.create(); @@ -610,12 +649,12 @@ public class FilterChainMatchingProtocolNegotiatorsTest { EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain( "filter-chain-baz", null, HTTP_CONNECTION_MANAGER, null, tlsContextManager); - FilterChainSelector selector = new FilterChainSelector( + selectorManager.updateSelector(new FilterChainSelector( ImmutableMap.of(filterChainMoreSpecificWith2, noopConfig, filterChainLessSpecific, randomConfig("no-match")), - defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default")); + defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default"))); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); final SettableFuture sslSet = SettableFuture.create(); final SettableFuture routingSettable = SettableFuture.create(); @@ -653,11 +692,11 @@ public class FilterChainMatchingProtocolNegotiatorsTest { EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain( "filter-chain-bar", null, HTTP_CONNECTION_MANAGER,tlsContextForDefaultFilterChain, tlsContextManager); - FilterChainSelector selector = new FilterChainSelector( + selectorManager.updateSelector(new FilterChainSelector( ImmutableMap.of(filterChainWithMismatch, randomConfig("no-match")), - defaultFilterChain.getSslContextProviderSupplier(), noopConfig); + defaultFilterChain.getSslContextProviderSupplier(), noopConfig)); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); final SettableFuture sslSet = SettableFuture.create(); final SettableFuture routingSettable = SettableFuture.create(); @@ -698,11 +737,11 @@ public class FilterChainMatchingProtocolNegotiatorsTest { "filter-chain-bar", null, HTTP_CONNECTION_MANAGER, tlsContextForDefaultFilterChain, tlsContextManager); - FilterChainSelector selector = new FilterChainSelector( + selectorManager.updateSelector(new FilterChainSelector( ImmutableMap.of(filterChainWithMatch, noopConfig), - defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default")); + defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default"))); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); setupChannel(LOCAL_IP, LOCAL_IP, 15000, filterChainMatchingHandler); pipeline.fireUserEventTriggered(event); channel.runPendingTasks(); @@ -757,13 +796,13 @@ public class FilterChainMatchingProtocolNegotiatorsTest { EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain( "filter-chain-baz", null, HTTP_CONNECTION_MANAGER, null, tlsContextManager); - FilterChainSelector selector = new FilterChainSelector( + selectorManager.updateSelector(new FilterChainSelector( ImmutableMap.of(filterChainMoreSpecificWith2, noopConfig, filterChainLessSpecific, randomConfig("no-match")), - defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default")); + defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default"))); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); setupChannel(LOCAL_IP, REMOTE_IP, 15000, filterChainMatchingHandler); pipeline.fireUserEventTriggered(event); channel.runPendingTasks(); @@ -823,12 +862,12 @@ public class FilterChainMatchingProtocolNegotiatorsTest { EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain( "filter-chain-baz", null, HTTP_CONNECTION_MANAGER, null, null); - FilterChainSelector selector = new FilterChainSelector( + selectorManager.updateSelector(new FilterChainSelector( ImmutableMap.of(filterChain1, noopConfig, filterChain2, noopConfig), - defaultFilterChain.getSslContextProviderSupplier(), noopConfig); + defaultFilterChain.getSslContextProviderSupplier(), noopConfig)); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); setupChannel(LOCAL_IP, REMOTE_IP, 15000, filterChainMatchingHandler); pipeline.fireUserEventTriggered(event); channel.runPendingTasks(); @@ -884,13 +923,13 @@ public class FilterChainMatchingProtocolNegotiatorsTest { EnvoyServerProtoData.FilterChain defaultFilterChain = new EnvoyServerProtoData.FilterChain( "filter-chain-baz", null, HTTP_CONNECTION_MANAGER, null, tlsContextManager); - FilterChainSelector selector = new FilterChainSelector( + selectorManager.updateSelector(new FilterChainSelector( ImmutableMap.of(filterChainEmptySourcePorts, randomConfig("no-match"), filterChainSourcePortMatch, noopConfig), - defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default")); + defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default"))); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); final SettableFuture sslSet = SettableFuture.create(); final SettableFuture routingSettable = SettableFuture.create(); ChannelHandler next = captureAttrHandler(sslSet, routingSettable); @@ -1040,11 +1079,11 @@ public class FilterChainMatchingProtocolNegotiatorsTest { map.put(filterChain4, randomConfig("4")); map.put(filterChain5, noopConfig); map.put(filterChain6, randomConfig("6")); - FilterChainSelector selector = new FilterChainSelector( - map, defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default")); + selectorManager.updateSelector(new FilterChainSelector( + map, defaultFilterChain.getSslContextProviderSupplier(), randomConfig("default"))); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); final SettableFuture sslSet = SettableFuture.create(); final SettableFuture routingSettable = SettableFuture.create(); @@ -1114,12 +1153,12 @@ public class FilterChainMatchingProtocolNegotiatorsTest { "filter-chain-baz", defaultFilterChainMatch, HTTP_CONNECTION_MANAGER, tlsContext3, mock(TlsContextManager.class)); - FilterChainSelector selector = new FilterChainSelector( + selectorManager.updateSelector(new FilterChainSelector( ImmutableMap.of(filterChain1, randomConfig("1"), filterChain2, randomConfig("2")), - defaultFilterChain.getSslContextProviderSupplier(), noopConfig); + defaultFilterChain.getSslContextProviderSupplier(), noopConfig)); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, selectorManager, mockDelegate); final SettableFuture sslSet = SettableFuture.create(); final SettableFuture routingSettable = SettableFuture.create(); ChannelHandler next = captureAttrHandler(sslSet, routingSettable); diff --git a/xds/src/test/java/io/grpc/xds/FilterChainSelectorManagerTest.java b/xds/src/test/java/io/grpc/xds/FilterChainSelectorManagerTest.java new file mode 100644 index 0000000000..d7b883f194 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/FilterChainSelectorManagerTest.java @@ -0,0 +1,107 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.common.collect.ImmutableList; +import io.grpc.xds.EnvoyServerProtoData.FilterChain; +import io.grpc.xds.Filter.NamedFilterConfig; +import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector; +import io.grpc.xds.FilterChainSelectorManager.Closer; +import io.grpc.xds.XdsServerWrapper.ServerRoutingConfig; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public final class FilterChainSelectorManagerTest { + private FilterChainSelectorManager manager = new FilterChainSelectorManager(); + private ServerRoutingConfig noopConfig = ServerRoutingConfig.create( + Collections.emptyList(), + new AtomicReference>()); + private FilterChainSelector selector1 = new FilterChainSelector( + Collections.emptyMap(), null, null); + private FilterChainSelector selector2 = new FilterChainSelector( + Collections.emptyMap(), null, noopConfig); + private CounterRunnable runnable1 = new CounterRunnable(); + private CounterRunnable runnable2 = new CounterRunnable(); + + @Test + public void updateSelector_changesSelector() { + assertThat(manager.getSelectorToUpdateSelector()).isNull(); + assertThat(manager.register(new Closer(runnable1))).isNull(); + + manager.updateSelector(selector1); + + assertThat(runnable1.counter).isEqualTo(1); + assertThat(manager.getSelectorToUpdateSelector()).isSameInstanceAs(selector1); + assertThat(manager.register(new Closer(runnable2))).isSameInstanceAs(selector1); + assertThat(runnable2.counter).isEqualTo(0); + } + + @Test + public void updateSelector_callsCloserOnce() { + assertThat(manager.register(new Closer(runnable1))).isNull(); + + manager.updateSelector(selector1); + manager.updateSelector(selector2); + + assertThat(runnable1.counter).isEqualTo(1); + } + + @Test + public void deregister_removesCloser() { + Closer closer1 = new Closer(runnable1); + manager.updateSelector(selector1); + assertThat(manager.register(closer1)).isSameInstanceAs(selector1); + assertThat(manager.getRegisterCount()).isEqualTo(1); + + manager.deregister(closer1); + + assertThat(manager.getRegisterCount()).isEqualTo(0); + manager.updateSelector(selector2); + assertThat(runnable1.counter).isEqualTo(0); + } + + @Test + public void deregister_removesCorrectCloser() { + Closer closer1 = new Closer(runnable1); + Closer closer2 = new Closer(runnable2); + manager.updateSelector(selector1); + assertThat(manager.register(closer1)).isSameInstanceAs(selector1); + assertThat(manager.register(closer2)).isSameInstanceAs(selector1); + assertThat(manager.getRegisterCount()).isEqualTo(2); + + manager.deregister(closer1); + + assertThat(manager.getRegisterCount()).isEqualTo(1); + manager.updateSelector(selector2); + assertThat(runnable1.counter).isEqualTo(0); + assertThat(runnable2.counter).isEqualTo(1); + } + + private static class CounterRunnable implements Runnable { + int counter; + + @Override public void run() { + counter++; + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java b/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java index 532cb282b2..1871cb7977 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java @@ -72,7 +72,6 @@ import java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -101,7 +100,7 @@ public class XdsClientWrapperForServerSdsTestMisc { @Mock private XdsServingStatusListener listener; private FakeXdsClient xdsClient = new FakeXdsClient(); - private AtomicReference selectorRef = new AtomicReference<>(); + private FilterChainSelectorManager selectorManager = new FilterChainSelectorManager(); private XdsServerWrapper xdsServerWrapper; @@ -117,13 +116,14 @@ public class XdsClientWrapperForServerSdsTestMisc { when(mockBuilder.build()).thenReturn(mockServer); when(mockServer.isShutdown()).thenReturn(false); xdsServerWrapper = new XdsServerWrapper("0.0.0.0:" + PORT, mockBuilder, listener, - selectorRef, new FakeXdsClientPoolFactory(xdsClient), FilterRegistry.newRegistry()); + selectorManager, new FakeXdsClientPoolFactory(xdsClient), FilterRegistry.newRegistry()); } @Test public void nonInetSocketAddress_expectNull() throws Exception { sendListenerUpdate(new InProcessSocketAddress("test1"), null, null, tlsContextManager); - assertThat(getSslContextProviderSupplier(selectorRef.get())).isNull(); + assertThat(getSslContextProviderSupplier(selectorManager.getSelectorToUpdateSelector())) + .isNull(); } @Test @@ -168,7 +168,7 @@ public class XdsClientWrapperForServerSdsTestMisc { LdsUpdate listenerUpdate = LdsUpdate.forTcpListener(listener); xdsClient.ldsWatcher.onChanged(listenerUpdate); start.get(5, TimeUnit.SECONDS); - FilterChainSelector selector = selectorRef.get(); + FilterChainSelector selector = selectorManager.getSelectorToUpdateSelector(); assertThat(getSslContextProviderSupplier(selector)).isNull(); } @@ -193,7 +193,7 @@ public class XdsClientWrapperForServerSdsTestMisc { } catch (ExecutionException ex) { assertThat(ex.getCause()).isInstanceOf(IOException.class); } - assertThat(selectorRef.get()).isSameInstanceAs(NO_FILTER_CHAIN); + assertThat(selectorManager.getSelectorToUpdateSelector()).isSameInstanceAs(NO_FILTER_CHAIN); } @Test @@ -217,7 +217,7 @@ public class XdsClientWrapperForServerSdsTestMisc { } catch (ExecutionException ex) { assertThat(ex.getCause()).isInstanceOf(IOException.class); } - assertThat(selectorRef.get()).isSameInstanceAs(NO_FILTER_CHAIN); + assertThat(selectorManager.getSelectorToUpdateSelector()).isSameInstanceAs(NO_FILTER_CHAIN); } @Test @@ -241,7 +241,7 @@ public class XdsClientWrapperForServerSdsTestMisc { } catch (ExecutionException ex) { assertThat(ex.getCause()).isInstanceOf(IOException.class); } - assertThat(selectorRef.get()).isSameInstanceAs(NO_FILTER_CHAIN); + assertThat(selectorManager.getSelectorToUpdateSelector()).isSameInstanceAs(NO_FILTER_CHAIN); } @Test @@ -263,13 +263,14 @@ public class XdsClientWrapperForServerSdsTestMisc { localAddress = new InetSocketAddress(ipLocalAddress, PORT); sendListenerUpdate(localAddress, tlsContext1, null, tlsContextManager); - SslContextProviderSupplier returnedSupplier = getSslContextProviderSupplier(selectorRef.get()); + SslContextProviderSupplier returnedSupplier = + getSslContextProviderSupplier(selectorManager.getSelectorToUpdateSelector()); assertThat(returnedSupplier.getTlsContext()).isSameInstanceAs(tlsContext1); callUpdateSslContext(returnedSupplier); XdsServerTestHelper .generateListenerUpdate(xdsClient, Arrays.asList(1234), tlsContext2, tlsContext3, tlsContextManager); - returnedSupplier = getSslContextProviderSupplier(selectorRef.get()); + returnedSupplier = getSslContextProviderSupplier(selectorManager.getSelectorToUpdateSelector()); assertThat(returnedSupplier.getTlsContext()).isSameInstanceAs(tlsContext2); verify(tlsContextManager, times(1)).releaseServerSslContextProvider(eq(sslContextProvider1)); reset(tlsContextManager); @@ -294,7 +295,7 @@ public class XdsClientWrapperForServerSdsTestMisc { } }; pipeline = channel.pipeline(); - returnedSupplier = getSslContextProviderSupplier(selectorRef.get()); + returnedSupplier = getSslContextProviderSupplier(selectorManager.getSelectorToUpdateSelector()); assertThat(returnedSupplier.getTlsContext()).isSameInstanceAs(tlsContext3); callUpdateSslContext(returnedSupplier); xdsServerWrapper.shutdown(); @@ -314,7 +315,7 @@ public class XdsClientWrapperForServerSdsTestMisc { sendListenerUpdate(localAddress, tlsContext1, null, tlsContextManager); SslContextProviderSupplier returnedSupplier = - getSslContextProviderSupplier(selectorRef.get()); + getSslContextProviderSupplier(selectorManager.getSelectorToUpdateSelector()); assertThat(returnedSupplier.getTlsContext()).isSameInstanceAs(tlsContext1); callUpdateSslContext(returnedSupplier); xdsClient.ldsWatcher.onResourceDoesNotExist("not-found Error"); @@ -331,7 +332,7 @@ public class XdsClientWrapperForServerSdsTestMisc { sendListenerUpdate(localAddress, tlsContext1, null, tlsContextManager); SslContextProviderSupplier returnedSupplier = - getSslContextProviderSupplier(selectorRef.get()); + getSslContextProviderSupplier(selectorManager.getSelectorToUpdateSelector()); assertThat(returnedSupplier.getTlsContext()).isSameInstanceAs(tlsContext1); callUpdateSslContext(returnedSupplier); xdsClient.ldsWatcher.onError(Status.PERMISSION_DENIED); @@ -348,7 +349,7 @@ public class XdsClientWrapperForServerSdsTestMisc { sendListenerUpdate(localAddress, tlsContext1, null, tlsContextManager); SslContextProviderSupplier returnedSupplier = - getSslContextProviderSupplier(selectorRef.get()); + getSslContextProviderSupplier(selectorManager.getSelectorToUpdateSelector()); assertThat(returnedSupplier.getTlsContext()).isSameInstanceAs(tlsContext1); callUpdateSslContext(returnedSupplier); xdsClient.ldsWatcher.onError(Status.CANCELLED); @@ -412,8 +413,10 @@ public class XdsClientWrapperForServerSdsTestMisc { ProtocolNegotiator mockDelegate = mock(ProtocolNegotiator.class); GrpcHttp2ConnectionHandler grpcHandler = FakeGrpcHttp2ConnectionHandler.newHandler(); when(mockDelegate.newHandler(grpcHandler)).thenReturn(next); + FilterChainSelectorManager manager = new FilterChainSelectorManager(); + manager.updateSelector(selector); FilterChainMatchingHandler filterChainMatchingHandler = - new FilterChainMatchingHandler(grpcHandler, selector, mockDelegate); + new FilterChainMatchingHandler(grpcHandler, manager, mockDelegate); pipeline.addLast(filterChainMatchingHandler); ProtocolNegotiationEvent event = InternalProtocolNegotiationEvent.getDefault(); pipeline.fireUserEventTriggered(event); diff --git a/xds/src/test/java/io/grpc/xds/XdsServerBuilderTest.java b/xds/src/test/java/io/grpc/xds/XdsServerBuilderTest.java index 476dc10a16..0d15c1f660 100644 --- a/xds/src/test/java/io/grpc/xds/XdsServerBuilderTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsServerBuilderTest.java @@ -281,4 +281,15 @@ public class XdsServerBuilderTest { assertThat(expected).hasMessageThat().contains("Server already built!"); } } + + @Test + public void drainGraceTime_negativeThrows() throws IOException { + buildBuilder(null); + try { + builder.drainGraceTime(-1, TimeUnit.SECONDS); + fail("exception expected"); + } catch (IllegalArgumentException expected) { + assertThat(expected).hasMessageThat().contains("drain grace time"); + } + } } diff --git a/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java b/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java index 876b091374..d442136115 100644 --- a/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java @@ -99,7 +99,7 @@ public class XdsServerWrapperTest { @Mock private XdsServingStatusListener listener; - private AtomicReference selectorRef = new AtomicReference<>(); + private FilterChainSelectorManager selectorManager = new FilterChainSelectorManager(); private FakeClock executor = new FakeClock(); private FakeXdsClient xdsClient = new FakeXdsClient(); private FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry(); @@ -109,7 +109,7 @@ public class XdsServerWrapperTest { public void setup() { when(mockBuilder.build()).thenReturn(mockServer); xdsServerWrapper = new XdsServerWrapper("0.0.0.0:1", mockBuilder, listener, - selectorRef, new FakeXdsClientPoolFactory(xdsClient), + selectorManager, new FakeXdsClientPoolFactory(xdsClient), filterRegistry, executor.getScheduledExecutorService()); } @@ -141,7 +141,7 @@ public class XdsServerWrapperTest { XdsClient xdsClient = mock(XdsClient.class); when(xdsClient.getBootstrapInfo()).thenReturn(b); xdsServerWrapper = new XdsServerWrapper("0.0.0.0:1", mockBuilder, listener, - selectorRef, new FakeXdsClientPoolFactory(xdsClient), filterRegistry); + selectorManager, new FakeXdsClientPoolFactory(xdsClient), filterRegistry); final SettableFuture start = SettableFuture.create(); Executors.newSingleThreadExecutor().execute(new Runnable() { @Override @@ -377,8 +377,10 @@ public class XdsServerWrapperTest { xdsClient.deliverLdsUpdate(Collections.singletonList(filterChain), null); start.get(5000, TimeUnit.MILLISECONDS); assertThat(ldsWatched).isEqualTo("grpc/server?udpa.resource.listening_address=0.0.0.0:1"); - assertThat(selectorRef.get().getRoutingConfigs().size()).isEqualTo(1); - ServerRoutingConfig realConfig = selectorRef.get().getRoutingConfigs().get(filterChain); + assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size()) + .isEqualTo(1); + ServerRoutingConfig realConfig = + selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(filterChain); assertThat(realConfig.virtualHosts().get()).isEqualTo(httpConnectionManager.virtualHosts()); assertThat(realConfig.httpFilterConfigs()).isEqualTo(httpConnectionManager.httpFilterConfigs()); verify(listener).onServing(); @@ -408,7 +410,7 @@ public class XdsServerWrapperTest { xdsClient.rdsCount = new CountDownLatch(3); xdsClient.deliverLdsUpdate(Arrays.asList(f0, f1), null); assertThat(start.isDone()).isFalse(); - assertThat(selectorRef.get()).isNull(); + assertThat(selectorManager.getSelectorToUpdateSelector()).isNull(); verify(mockServer, never()).start(); verify(listener, never()).onServing(); @@ -426,23 +428,26 @@ public class XdsServerWrapperTest { Collections.singletonList(createVirtualHost("virtual-host-2"))); start.get(5000, TimeUnit.MILLISECONDS); verify(mockServer).start(); - ServerRoutingConfig realConfig = selectorRef.get().getRoutingConfigs().get(f0); + ServerRoutingConfig realConfig = + selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f0); assertThat(realConfig.virtualHosts().get()).isEqualTo( Collections.singletonList(createVirtualHost("virtual-host-0"))); assertThat(realConfig.httpFilterConfigs()).isEqualTo( f0.getHttpConnectionManager().httpFilterConfigs()); - assertThat(selectorRef.get().getRoutingConfigs().size()).isEqualTo(2); - realConfig = selectorRef.get().getRoutingConfigs().get(f2); + assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size()) + .isEqualTo(2); + realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f2); assertThat(realConfig.virtualHosts().get()).isEqualTo( Collections.singletonList(createVirtualHost("virtual-host-1"))); assertThat(realConfig.httpFilterConfigs()).isEqualTo( f2.getHttpConnectionManager().httpFilterConfigs()); - realConfig = selectorRef.get().getDefaultRoutingConfig(); + realConfig = selectorManager.getSelectorToUpdateSelector().getDefaultRoutingConfig(); assertThat(realConfig.virtualHosts().get()).isEqualTo( Collections.singletonList(createVirtualHost("virtual-host-2"))); assertThat(realConfig.httpFilterConfigs()).isEqualTo( f3.getHttpConnectionManager().httpFilterConfigs()); - assertThat(selectorRef.get().getDefaultSslContextProviderSupplier()).isEqualTo( + assertThat(selectorManager.getSelectorToUpdateSelector().getDefaultSslContextProviderSupplier()) + .isEqualTo( f3.getSslContextProviderSupplier()); } @@ -468,31 +473,32 @@ public class XdsServerWrapperTest { xdsClient.rdsCount = new CountDownLatch(1); xdsClient.deliverLdsUpdate(Arrays.asList(f0, f1), f2); assertThat(start.isDone()).isFalse(); - assertThat(selectorRef.get()).isNull(); + assertThat(selectorManager.getSelectorToUpdateSelector()).isNull(); xdsClient.rdsCount.await(5, TimeUnit.SECONDS); xdsClient.deliverRdsUpdate("r0", Collections.singletonList(createVirtualHost("virtual-host-0"))); start.get(5000, TimeUnit.MILLISECONDS); verify(mockServer, times(1)).start(); - ServerRoutingConfig realConfig = selectorRef.get().getRoutingConfigs().get(f0); + ServerRoutingConfig realConfig = + selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f0); assertThat(realConfig.virtualHosts().get()).isEqualTo( Collections.singletonList(createVirtualHost("virtual-host-0"))); assertThat(realConfig.httpFilterConfigs()).isEqualTo( f0.getHttpConnectionManager().httpFilterConfigs()); - realConfig = selectorRef.get().getRoutingConfigs().get(f1); + realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f1); assertThat(realConfig.virtualHosts().get()).isEqualTo( Collections.singletonList(createVirtualHost("virtual-host-0"))); assertThat(realConfig.httpFilterConfigs()).isEqualTo( f1.getHttpConnectionManager().httpFilterConfigs()); - realConfig = selectorRef.get().getDefaultRoutingConfig(); + realConfig = selectorManager.getSelectorToUpdateSelector().getDefaultRoutingConfig(); assertThat(realConfig.virtualHosts().get()).isEqualTo( Collections.singletonList(createVirtualHost("virtual-host-0"))); assertThat(realConfig.httpFilterConfigs()).isEqualTo( f2.getHttpConnectionManager().httpFilterConfigs()); - assertThat(selectorRef.get().getDefaultSslContextProviderSupplier()).isSameInstanceAs( - f2.getSslContextProviderSupplier()); + assertThat(selectorManager.getSelectorToUpdateSelector().getDefaultSslContextProviderSupplier()) + .isSameInstanceAs(f2.getSslContextProviderSupplier()); EnvoyServerProtoData.FilterChain f3 = createFilterChain("filter-chain-3", createRds("r0")); EnvoyServerProtoData.FilterChain f4 = createFilterChain("filter-chain-4", createRds("r1")); @@ -505,24 +511,26 @@ public class XdsServerWrapperTest { xdsClient.deliverRdsUpdate("r0", Collections.singletonList(createVirtualHost("virtual-host-0"))); - assertThat(selectorRef.get().getRoutingConfigs().size()).isEqualTo(2); - realConfig = selectorRef.get().getRoutingConfigs().get(f5); + assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size()) + .isEqualTo(2); + realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f5); assertThat(realConfig.virtualHosts().get()).isEqualTo( Collections.singletonList(createVirtualHost("virtual-host-1"))); assertThat(realConfig.httpFilterConfigs()).isEqualTo( f5.getHttpConnectionManager().httpFilterConfigs()); - realConfig = selectorRef.get().getRoutingConfigs().get(f3); + realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f3); assertThat(realConfig.virtualHosts().get()).isEqualTo( Collections.singletonList(createVirtualHost("virtual-host-0"))); assertThat(realConfig.httpFilterConfigs()).isEqualTo( f3.getHttpConnectionManager().httpFilterConfigs()); - realConfig = selectorRef.get().getDefaultRoutingConfig(); + realConfig = selectorManager.getSelectorToUpdateSelector().getDefaultRoutingConfig(); assertThat(realConfig.virtualHosts().get()).isEqualTo( Collections.singletonList(createVirtualHost("virtual-host-1"))); assertThat(realConfig.httpFilterConfigs()).isEqualTo( f4.getHttpConnectionManager().httpFilterConfigs()); - assertThat(selectorRef.get().getDefaultSslContextProviderSupplier()).isSameInstanceAs( + assertThat(selectorManager.getSelectorToUpdateSelector().getDefaultSslContextProviderSupplier()) + .isSameInstanceAs( f4.getSslContextProviderSupplier()); verify(mockServer, times(1)).start(); xdsServerWrapper.shutdown(); @@ -556,33 +564,35 @@ public class XdsServerWrapperTest { xdsClient.rdsCount.await(); xdsClient.rdsWatchers.get("r0").onError(Status.CANCELLED); start.get(5000, TimeUnit.MILLISECONDS); - assertThat(selectorRef.get().getRoutingConfigs().size()).isEqualTo(2); - ServerRoutingConfig realConfig = selectorRef.get().getRoutingConfigs().get(f1); + assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size()) + .isEqualTo(2); + ServerRoutingConfig realConfig = + selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f1); assertThat(realConfig.virtualHosts().get()).isNull(); assertThat(realConfig.httpFilterConfigs()).isEqualTo( f1.getHttpConnectionManager().httpFilterConfigs()); - realConfig = selectorRef.get().getRoutingConfigs().get(f0); + realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f0); assertThat(realConfig.virtualHosts().get()).isEqualTo(hcmVirtual.virtualHosts()); assertThat(realConfig.httpFilterConfigs()).isEqualTo( f0.getHttpConnectionManager().httpFilterConfigs()); xdsClient.deliverRdsUpdate("r0", Collections.singletonList(createVirtualHost("virtual-host-1"))); - realConfig = selectorRef.get().getRoutingConfigs().get(f1); + realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f1); assertThat(realConfig.virtualHosts().get()).isEqualTo( Collections.singletonList(createVirtualHost("virtual-host-1"))); assertThat(realConfig.httpFilterConfigs()).isEqualTo( f1.getHttpConnectionManager().httpFilterConfigs()); xdsClient.rdsWatchers.get("r0").onError(Status.CANCELLED); - realConfig = selectorRef.get().getRoutingConfigs().get(f1); + realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f1); assertThat(realConfig.virtualHosts().get()).isEqualTo( Collections.singletonList(createVirtualHost("virtual-host-1"))); assertThat(realConfig.httpFilterConfigs()).isEqualTo( f1.getHttpConnectionManager().httpFilterConfigs()); xdsClient.rdsWatchers.get("r0").onResourceDoesNotExist("r0"); - realConfig = selectorRef.get().getRoutingConfigs().get(f1); + realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f1); assertThat(realConfig.virtualHosts().get()).isNull(); assertThat(realConfig.httpFilterConfigs()).isEqualTo( f1.getHttpConnectionManager().httpFilterConfigs()); @@ -615,7 +625,8 @@ public class XdsServerWrapperTest { SslContextProviderSupplier sslSupplier0 = filterChain0.getSslContextProviderSupplier(); xdsClient.deliverLdsUpdate(Collections.singletonList(filterChain0), null); xdsClient.ldsWatcher.onError(Status.INTERNAL); - assertThat(selectorRef.get()).isSameInstanceAs(FilterChainSelector.NO_FILTER_CHAIN); + assertThat(selectorManager.getSelectorToUpdateSelector()) + .isSameInstanceAs(FilterChainSelector.NO_FILTER_CHAIN); assertThat(xdsClient.rdsWatchers).isEmpty(); verify(mockBuilder, times(1)).build(); verify(listener, times(2)).onNotServing(any(StatusException.class)); @@ -634,8 +645,10 @@ public class XdsServerWrapperTest { verify(mockBuilder, times(1)).build(); verify(mockServer, times(2)).start(); verify(listener, times(1)).onServing(); - assertThat(selectorRef.get().getRoutingConfigs().size()).isEqualTo(1); - ServerRoutingConfig realConfig = selectorRef.get().getRoutingConfigs().get(filterChain1); + assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size()) + .isEqualTo(1); + ServerRoutingConfig realConfig = + selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(filterChain1); assertThat(realConfig.virtualHosts().get()).isEqualTo( Collections.singletonList(createVirtualHost("virtual-host-1"))); assertThat(realConfig.httpFilterConfigs()).isEqualTo( @@ -648,8 +661,10 @@ public class XdsServerWrapperTest { verify(mockBuilder, times(1)).build(); verify(mockServer, times(2)).start(); verify(listener, times(2)).onNotServing(any(StatusException.class)); - assertThat(selectorRef.get().getRoutingConfigs().size()).isEqualTo(1); - realConfig = selectorRef.get().getRoutingConfigs().get(filterChain1); + assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size()) + .isEqualTo(1); + realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs() + .get(filterChain1); assertThat(realConfig.virtualHosts().get()).isEqualTo( Collections.singletonList(createVirtualHost("virtual-host-2"))); assertThat(realConfig.httpFilterConfigs()).isEqualTo( @@ -661,7 +676,8 @@ public class XdsServerWrapperTest { assertThat(xdsClient.rdsWatchers).isEmpty(); verify(mockServer, times(3)).shutdown(); when(mockServer.isShutdown()).thenReturn(true); - assertThat(selectorRef.get()).isSameInstanceAs(FilterChainSelector.NO_FILTER_CHAIN); + assertThat(selectorManager.getSelectorToUpdateSelector()) + .isSameInstanceAs(FilterChainSelector.NO_FILTER_CHAIN); verify(listener, times(3)).onNotServing(any(StatusException.class)); assertThat(sslSupplier1.isShutdown()).isTrue(); // no op @@ -686,8 +702,10 @@ public class XdsServerWrapperTest { verify(mockServer, times(3)).start(); verify(listener, times(1)).onServing(); verify(listener, times(3)).onNotServing(any(StatusException.class)); - assertThat(selectorRef.get().getRoutingConfigs().size()).isEqualTo(1); - realConfig = selectorRef.get().getRoutingConfigs().get(filterChain2); + assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size()) + .isEqualTo(1); + realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs() + .get(filterChain2); assertThat(realConfig.virtualHosts().get()).isEqualTo( Collections.singletonList(createVirtualHost("virtual-host-1"))); assertThat(realConfig.httpFilterConfigs()).isEqualTo( @@ -712,8 +730,10 @@ public class XdsServerWrapperTest { when(mockServer.isShutdown()).thenReturn(false); verify(listener, times(4)).onNotServing(any(StatusException.class)); - assertThat(selectorRef.get().getRoutingConfigs().size()).isEqualTo(1); - realConfig = selectorRef.get().getRoutingConfigs().get(filterChain3); + assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size()) + .isEqualTo(1); + realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs() + .get(filterChain3); assertThat(realConfig.virtualHosts().get()).isEqualTo( Collections.singletonList(createVirtualHost("virtual-host-1"))); assertThat(realConfig.httpFilterConfigs()).isEqualTo(