From a2896051b880b91da677558ff2ae98a2cb6f51a2 Mon Sep 17 00:00:00 2001 From: sanjaypujare Date: Wed, 25 Mar 2020 10:14:29 -0700 Subject: [PATCH] xds: integration of XdsClientImpl with XdsServerBuilder to deliver Listener updates (#6838) --- .../io/grpc/xds/EnvoyServerProtoData.java | 9 +- .../xds/XdsClientWrapperForServerSds.java | 296 ++++++++++++++++++ .../internal/sds/SdsProtocolNegotiators.java | 106 +++++-- .../xds/internal/sds/XdsServerBuilder.java | 40 ++- .../xds/XdsClientWrapperForServerSdsTest.java | 225 +++++++++++++ .../XdsClientWrapperForServerSdsTestMisc.java | 125 ++++++++ .../sds/SdsProtocolNegotiatorsTest.java | 39 ++- 7 files changed, 806 insertions(+), 34 deletions(-) create mode 100644 xds/src/main/java/io/grpc/xds/XdsClientWrapperForServerSds.java create mode 100644 xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTest.java create mode 100644 xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java diff --git a/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java b/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java index 21bfe93732..34e03479ac 100644 --- a/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java +++ b/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java @@ -91,7 +91,8 @@ final class EnvoyServerProtoData { private final List prefixRanges; private final List applicationProtocols; - private FilterChainMatch(int destinationPort, + @VisibleForTesting + FilterChainMatch(int destinationPort, List prefixRanges, List applicationProtocols) { this.destinationPort = destinationPort; this.prefixRanges = Collections.unmodifiableList(prefixRanges); @@ -164,7 +165,8 @@ final class EnvoyServerProtoData { // TODO(sanjaypujare): remove dependency on envoy data type along with rest of the code. private final io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext downstreamTlsContext; - private FilterChain(FilterChainMatch filterChainMatch, + @VisibleForTesting + FilterChain(FilterChainMatch filterChainMatch, io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext downstreamTlsContext) { this.filterChainMatch = filterChainMatch; this.downstreamTlsContext = downstreamTlsContext; @@ -223,7 +225,8 @@ final class EnvoyServerProtoData { private final String address; private final List filterChains; - private Listener(String name, String address, + @VisibleForTesting + Listener(String name, String address, List filterChains) { this.name = name; this.address = address; diff --git a/xds/src/main/java/io/grpc/xds/XdsClientWrapperForServerSds.java b/xds/src/main/java/io/grpc/xds/XdsClientWrapperForServerSds.java new file mode 100644 index 0000000000..593294eb38 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/XdsClientWrapperForServerSds.java @@ -0,0 +1,296 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.annotations.VisibleForTesting; +import io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext; +import io.envoyproxy.envoy.api.v2.core.Node; +import io.grpc.Internal; +import io.grpc.Status; +import io.grpc.SynchronizationContext; +import io.grpc.internal.ExponentialBackoffPolicy; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.SharedResourceHolder; +import io.grpc.xds.EnvoyServerProtoData.CidrRange; +import io.grpc.xds.EnvoyServerProtoData.FilterChain; +import io.grpc.xds.EnvoyServerProtoData.FilterChainMatch; +import io.netty.channel.Channel; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** + * Serves as a wrapper for {@link XdsClientImpl} used on the server side by {@link + * io.grpc.xds.internal.sds.XdsServerBuilder}. + */ +@Internal +public final class XdsClientWrapperForServerSds { + private static final Logger logger = + Logger.getLogger(XdsClientWrapperForServerSds.class.getName()); + + private static final TimeServiceResource timeServiceResource = + new TimeServiceResource("GrpcServerXdsClient"); + + private EnvoyServerProtoData.Listener curListener; + // TODO(sanjaypujare): implement shutting down XdsServer which will need xdsClient reference + @SuppressWarnings("unused") + @Nullable private XdsClient xdsClient; + private final int port; + private final ScheduledExecutorService timeService; + + /** + * Factory method for creating a {@link XdsClientWrapperForServerSds}. + * + * @param port server's port for which listener config is needed. + * @param bootstrapper {@link Bootstrapper} instance to load bootstrap config. + * @param syncContext {@link SynchronizationContext} needed by {@link XdsClient}. + */ + public static XdsClientWrapperForServerSds newInstance( + int port, Bootstrapper bootstrapper, SynchronizationContext syncContext) throws IOException { + Bootstrapper.BootstrapInfo bootstrapInfo = bootstrapper.readBootstrap(); + final List serverList = bootstrapInfo.getServers(); + if (serverList.isEmpty()) { + throw new NoSuchElementException("No management server provided by bootstrap"); + } + final Node node = bootstrapInfo.getNode(); + ScheduledExecutorService timeService = SharedResourceHolder.get(timeServiceResource); + XdsClientImpl xdsClientImpl = + new XdsClientImpl( + "", + serverList, + XdsClient.XdsChannelFactory.getInstance(), + node, + syncContext, + timeService, + new ExponentialBackoffPolicy.Provider(), + GrpcUtil.STOPWATCH_SUPPLIER); + return new XdsClientWrapperForServerSds(port, xdsClientImpl, timeService); + } + + @VisibleForTesting + XdsClientWrapperForServerSds(int port, XdsClient xdsClient, + ScheduledExecutorService timeService) { + this.port = port; + this.xdsClient = xdsClient; + this.timeService = timeService; + xdsClient.watchListenerData( + port, + new XdsClient.ListenerWatcher() { + @Override + public void onListenerChanged(XdsClient.ListenerUpdate update) { + logger.log( + Level.INFO, + "Setting myListener from ConfigUpdate listener :{0}", + update.getListener().toString()); + curListener = update.getListener(); + } + + @Override + public void onError(Status error) { + // In order to distinguish between IO error and resource not found, set curListener + // to null in case of NOT_FOUND + if (error.getCode().equals(Status.Code.NOT_FOUND)) { + curListener = null; + } + // TODO(sanjaypujare): Implement logic for other cases based on final design. + logger.log(Level.SEVERE, "ListenerWatcher in XdsClientWrapperForServerSds:{0}", error); + } + }); + } + + /** + * Locates the best matching FilterChain to the channel from the current listener and if found + * returns the DownstreamTlsContext from that FilterChain, else null. + */ + @Nullable + public DownstreamTlsContext getDownstreamTlsContext(Channel channel) { + if (curListener != null && channel != null) { + SocketAddress localAddress = channel.localAddress(); + checkState( + localAddress instanceof InetSocketAddress, + "Channel localAddress is expected to be InetSocketAddress"); + InetSocketAddress localInetAddr = (InetSocketAddress) localAddress; + checkState( + port == localInetAddr.getPort(), + "Channel localAddress port does not match requested listener port"); + List filterChains = curListener.getFilterChains(); + FilterChainComparator comparator = new FilterChainComparator(localInetAddr); + FilterChain bestMatch = + filterChains.isEmpty() ? null : Collections.max(filterChains, comparator); + if (bestMatch != null && comparator.isMatching(bestMatch.getFilterChainMatch())) { + return bestMatch.getDownstreamTlsContext(); + } + } + return null; + } + + private static final class FilterChainComparator implements Comparator { + private InetSocketAddress localAddress; + + private enum Match { + NO_MATCH, + EMPTY_PREFIX_RANGE_MATCH, + IPANY_MATCH, + EXACT_ADDRESS_MATCH + } + + private FilterChainComparator(InetSocketAddress localAddress) { + checkNotNull(localAddress, "localAddress cannot be null"); + this.localAddress = localAddress; + } + + @Override + public int compare(FilterChain first, FilterChain second) { + checkNotNull(first, "first arg cannot be null"); + checkNotNull(second, "second arg cannot be null"); + FilterChainMatch firstMatch = first.getFilterChainMatch(); + FilterChainMatch secondMatch = second.getFilterChainMatch(); + + if (firstMatch == null) { + return (secondMatch == null) ? 0 : (isMatching(secondMatch) ? -1 : 1); + } else { + return (secondMatch == null) + ? (isMatching(firstMatch) ? 1 : -1) + : compare(firstMatch, secondMatch); + } + } + + private int compare(FilterChainMatch first, FilterChainMatch second) { + int channelPort = localAddress.getPort(); + + if (first.getDestinationPort() == channelPort) { + return (second.getDestinationPort() == channelPort) + ? compare(first.getPrefixRanges(), second.getPrefixRanges()) + : (isInetAddressMatching(first.getPrefixRanges()) ? 1 : 0); + } else { + return (second.getDestinationPort() == channelPort) + ? (isInetAddressMatching(second.getPrefixRanges()) ? -1 : 0) + : 0; + } + } + + private int compare(List first, List second) { + return getInetAddressMatch(first).ordinal() - getInetAddressMatch(second).ordinal(); + } + + private boolean isInetAddressMatching(List prefixRanges) { + return getInetAddressMatch(prefixRanges).ordinal() > Match.NO_MATCH.ordinal(); + } + + private Match getInetAddressMatch(List prefixRanges) { + if (prefixRanges == null || prefixRanges.isEmpty()) { + return Match.EMPTY_PREFIX_RANGE_MATCH; + } + InetAddress localInetAddress = localAddress.getAddress(); + for (CidrRange cidrRange : prefixRanges) { + if (cidrRange.getPrefixLen() == 32) { + try { + InetAddress cidrAddr = InetAddress.getByName(cidrRange.getAddressPrefix()); + if (cidrAddr.isAnyLocalAddress()) { + return Match.IPANY_MATCH; + } + if (cidrAddr.equals(localInetAddress)) { + return Match.EXACT_ADDRESS_MATCH; + } + } catch (UnknownHostException e) { + logger.log(Level.WARNING, "cidrRange address parsing", e); + // continue + } + } + // TODO(sanjaypujare): implement prefix match logic as needed + } + return Match.NO_MATCH; + } + + private boolean isMatching(FilterChainMatch filterChainMatch) { + if (filterChainMatch == null) { + return true; + } + int destPort = filterChainMatch.getDestinationPort(); + if (destPort != localAddress.getPort()) { + return false; + } + return isInetAddressMatching(filterChainMatch.getPrefixRanges()); + } + } + + /** Shutdown this instance and release resources. */ + public void shutdown() { + logger.log(Level.FINER, "Shutdown"); + if (xdsClient != null) { + xdsClient.shutdown(); + } + if (timeService != null) { + timeServiceResource.close(timeService); + } + } + + private static final class TimeServiceResource + implements SharedResourceHolder.Resource { + + private final String name; + + TimeServiceResource(String name) { + this.name = name; + } + + @Override + public ScheduledExecutorService create() { + // Use Netty's DefaultThreadFactory in order to get the benefit of FastThreadLocal. + ThreadFactory threadFactory = new DefaultThreadFactory(name, /* daemon= */ true); + if (Epoll.isAvailable()) { + return new EpollEventLoopGroup(1, threadFactory); + } else { + return Executors.newSingleThreadScheduledExecutor(threadFactory); + } + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public void close(ScheduledExecutorService instance) { + try { + if (instance instanceof EpollEventLoopGroup) { + ((EpollEventLoopGroup)instance).shutdownGracefully(0, 0, TimeUnit.SECONDS).sync(); + } else { + instance.shutdown(); + } + } catch (InterruptedException e) { + logger.log(Level.SEVERE, "Interrupted during shutdown", e); + Thread.currentThread().interrupt(); + } + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/sds/SdsProtocolNegotiators.java b/xds/src/main/java/io/grpc/xds/internal/sds/SdsProtocolNegotiators.java index 9f09aba546..0cd3315ff7 100644 --- a/xds/src/main/java/io/grpc/xds/internal/sds/SdsProtocolNegotiators.java +++ b/xds/src/main/java/io/grpc/xds/internal/sds/SdsProtocolNegotiators.java @@ -21,20 +21,26 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; import io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext; import io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext; +import io.grpc.SynchronizationContext; import io.grpc.netty.GrpcHttp2ConnectionHandler; import io.grpc.netty.InternalNettyChannelBuilder; import io.grpc.netty.InternalNettyChannelBuilder.ProtocolNegotiatorFactory; +import io.grpc.netty.InternalProtocolNegotiationEvent; import io.grpc.netty.InternalProtocolNegotiator; import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator; import io.grpc.netty.InternalProtocolNegotiators; import io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.ProtocolNegotiationEvent; +import io.grpc.xds.Bootstrapper; import io.grpc.xds.XdsAttributes; +import io.grpc.xds.XdsClientWrapperForServerSds; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.ssl.SslContext; import io.netty.util.AsciiString; + import java.util.ArrayList; import java.util.List; import java.util.logging.Level; @@ -55,7 +61,6 @@ final class SdsProtocolNegotiators { * Returns a {@link ProtocolNegotiatorFactory} to be used on {@link NettyChannelBuilder}. Passing * {@code null} for upstreamTlsContext will fall back to plaintext. */ - // TODO (sanjaypujare) integrate with xDS client to get upstreamTlsContext from CDS public static ProtocolNegotiatorFactory clientProtocolNegotiatorFactory( @Nullable UpstreamTlsContext upstreamTlsContext) { return new ClientSdsProtocolNegotiatorFactory(upstreamTlsContext); @@ -64,17 +69,20 @@ final class SdsProtocolNegotiators { /** * Creates an SDS based {@link ProtocolNegotiator} for a {@link io.grpc.netty.NettyServerBuilder}. * Passing {@code null} for downstreamTlsContext will fall back to plaintext. + * If xDS returns no DownstreamTlsContext, it will fall back to plaintext. + * + * @param downstreamTlsContext passed in {@link XdsServerBuilder#tlsContext}. + * @param port the listening port passed to {@link XdsServerBuilder#forPort(int)}. */ - // TODO (sanjaypujare) integrate with xDS client to get LDS public static ProtocolNegotiator serverProtocolNegotiator( - @Nullable DownstreamTlsContext downstreamTlsContext) { - return new ServerSdsProtocolNegotiator(downstreamTlsContext); + @Nullable DownstreamTlsContext downstreamTlsContext, int port, + SynchronizationContext syncContext) { + return new ServerSdsProtocolNegotiator(downstreamTlsContext, port, syncContext); } private static final class ClientSdsProtocolNegotiatorFactory implements InternalNettyChannelBuilder.ProtocolNegotiatorFactory { - // TODO (sanjaypujare) integrate with xDS client to get upstreamTlsContext from CDS private final UpstreamTlsContext upstreamTlsContext; ClientSdsProtocolNegotiatorFactory(UpstreamTlsContext upstreamTlsContext) { @@ -245,12 +253,21 @@ final class SdsProtocolNegotiators { private static final class ServerSdsProtocolNegotiator implements ProtocolNegotiator { - // TODO (sanjaypujare) integrate with xDS client to get LDS. LDS watcher will - // inject/update the downstreamTlsContext from LDS private DownstreamTlsContext downstreamTlsContext; + private final XdsClientWrapperForServerSds xdsClientWrapperForServerSds; - ServerSdsProtocolNegotiator(DownstreamTlsContext downstreamTlsContext) { + ServerSdsProtocolNegotiator( + DownstreamTlsContext downstreamTlsContext, int port, SynchronizationContext syncContext) { this.downstreamTlsContext = downstreamTlsContext; + XdsClientWrapperForServerSds localXdsClientWrapperForServerSds; + try { + localXdsClientWrapperForServerSds = + XdsClientWrapperForServerSds.newInstance(port, Bootstrapper.getInstance(), syncContext); + } catch (Exception e) { + logger.log(Level.WARNING, "Exception while creating the xDS client", e); + localXdsClientWrapperForServerSds = null; + } + this.xdsClientWrapperForServerSds = localXdsClientWrapperForServerSds; } @Override @@ -260,28 +277,77 @@ final class SdsProtocolNegotiators { @Override public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) { - if (isTlsContextEmpty(downstreamTlsContext)) { - return InternalProtocolNegotiators.serverPlaintext().newHandler(grpcHandler); - } - return new ServerSdsHandler(grpcHandler, downstreamTlsContext); - } - - private static boolean isTlsContextEmpty(DownstreamTlsContext downstreamTlsContext) { - return downstreamTlsContext == null || !downstreamTlsContext.hasCommonTlsContext(); + return new HandlerPickerHandler(grpcHandler, downstreamTlsContext, + xdsClientWrapperForServerSds); } @Override public void close() {} } + @VisibleForTesting + static final class HandlerPickerHandler + extends ChannelInboundHandlerAdapter { + private final GrpcHttp2ConnectionHandler grpcHandler; + private final DownstreamTlsContext downstreamTlsContextFromBuilder; + private final XdsClientWrapperForServerSds xdsClientWrapperForServerSds; + + HandlerPickerHandler( + GrpcHttp2ConnectionHandler grpcHandler, + DownstreamTlsContext downstreamTlsContext, + XdsClientWrapperForServerSds xdsClientWrapperForServerSds) { + checkNotNull(grpcHandler, "grpcHandler"); + this.grpcHandler = grpcHandler; + this.downstreamTlsContextFromBuilder = downstreamTlsContext; + this.xdsClientWrapperForServerSds = xdsClientWrapperForServerSds; + } + + private static boolean isTlsContextEmpty(DownstreamTlsContext downstreamTlsContext) { + return downstreamTlsContext == null || !downstreamTlsContext.hasCommonTlsContext(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof ProtocolNegotiationEvent) { + DownstreamTlsContext downstreamTlsContext = + xdsClientWrapperForServerSds == null + ? null + : xdsClientWrapperForServerSds.getDownstreamTlsContext(ctx.channel()); + if (isTlsContextEmpty(downstreamTlsContext)) { + downstreamTlsContext = downstreamTlsContextFromBuilder; + } + if (isTlsContextEmpty(downstreamTlsContext)) { + logger.log(Level.INFO, "Fallback to plaintext for {0}", ctx.channel().localAddress()); + ctx.pipeline() + .replace( + this, + null, + InternalProtocolNegotiators.serverPlaintext().newHandler(grpcHandler)); + ProtocolNegotiationEvent pne = InternalProtocolNegotiationEvent.getDefault(); + ctx.fireUserEventTriggered(pne); + return; + } else { + ctx.pipeline() + .replace(this, null, new ServerSdsHandler(grpcHandler, downstreamTlsContext)); + ProtocolNegotiationEvent pne = InternalProtocolNegotiationEvent.getDefault(); + ctx.fireUserEventTriggered(pne); + return; + } + } else { + super.userEventTriggered(ctx, evt); + } + } + } + @VisibleForTesting static final class ServerSdsHandler - extends InternalProtocolNegotiators.ProtocolNegotiationHandler { + extends InternalProtocolNegotiators.ProtocolNegotiationHandler { private final GrpcHttp2ConnectionHandler grpcHandler; private final DownstreamTlsContext downstreamTlsContext; ServerSdsHandler( - GrpcHttp2ConnectionHandler grpcHandler, DownstreamTlsContext downstreamTlsContext) { + GrpcHttp2ConnectionHandler grpcHandler, + DownstreamTlsContext downstreamTlsContext) { super( // superclass (InternalProtocolNegotiators.ProtocolNegotiationHandler) expects 'next' // handler but we don't have a next handler _yet_. So we "disable" superclass's behavior @@ -303,8 +369,8 @@ final class SdsProtocolNegotiators { ctx.pipeline().addBefore(ctx.name(), null, bufferReads); final SslContextProvider sslContextProvider = - TlsContextManagerImpl.getInstance() - .findOrCreateServerSslContextProvider(downstreamTlsContext); + TlsContextManagerImpl.getInstance() + .findOrCreateServerSslContextProvider(downstreamTlsContext); sslContextProvider.addCallback( new SslContextProvider.Callback() { diff --git a/xds/src/main/java/io/grpc/xds/internal/sds/XdsServerBuilder.java b/xds/src/main/java/io/grpc/xds/internal/sds/XdsServerBuilder.java index d3611f0c49..aa9d072c7d 100644 --- a/xds/src/main/java/io/grpc/xds/internal/sds/XdsServerBuilder.java +++ b/xds/src/main/java/io/grpc/xds/internal/sds/XdsServerBuilder.java @@ -21,17 +21,22 @@ import io.grpc.BindableService; import io.grpc.CompressorRegistry; import io.grpc.DecompressorRegistry; import io.grpc.HandlerRegistry; +import io.grpc.InternalLogId; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.ServerInterceptor; import io.grpc.ServerServiceDefinition; import io.grpc.ServerStreamTracer; import io.grpc.ServerTransportFilter; +import io.grpc.SynchronizationContext; import io.grpc.netty.NettyServerBuilder; + import java.io.File; import java.net.InetSocketAddress; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nullable; /** @@ -39,14 +44,18 @@ import javax.annotation.Nullable; * with peers. Note, this is not ready to use yet. */ public final class XdsServerBuilder extends ServerBuilder { + private static final Logger logger = + Logger.getLogger(XdsServerBuilder.class.getName()); private final NettyServerBuilder delegate; + private final int port; // TODO (sanjaypujare) integrate with xDS client to get downstreamTlsContext from LDS @Nullable private DownstreamTlsContext downstreamTlsContext; - private XdsServerBuilder(NettyServerBuilder nettyDelegate) { + private XdsServerBuilder(NettyServerBuilder nettyDelegate, int port) { this.delegate = nettyDelegate; + this.port = port; } @Override @@ -132,14 +141,39 @@ public final class XdsServerBuilder extends ServerBuilder { /** Creates a gRPC server builder for the given port. */ public static XdsServerBuilder forPort(int port) { NettyServerBuilder nettyDelegate = NettyServerBuilder.forAddress(new InetSocketAddress(port)); - return new XdsServerBuilder(nettyDelegate); + return new XdsServerBuilder(nettyDelegate, port); } @Override public Server build() { // note: doing it in build() will overwrite any previously set ProtocolNegotiator + final InternalLogId logId = InternalLogId.allocate("XdsServerBuilder", Integer.toString(port)); + SynchronizationContext syncContext = + new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + // needed by syncContext + private boolean panicMode; + + @Override + public void uncaughtException(Thread t, Throwable e) { + logger.log( + Level.SEVERE, + "[" + logId + "] Uncaught exception in the SynchronizationContext. Panic!", + e); + panic(e); + } + + void panic(final Throwable t) { + if (panicMode) { + // Preserve the first panic information + return; + } + panicMode = true; + } + }); delegate.protocolNegotiator( - SdsProtocolNegotiators.serverProtocolNegotiator(this.downstreamTlsContext)); + SdsProtocolNegotiators.serverProtocolNegotiator( + this.downstreamTlsContext, port, syncContext)); return delegate.build(); } } diff --git a/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTest.java b/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTest.java new file mode 100644 index 0000000000..5fd7bf055c --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTest.java @@ -0,0 +1,225 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.base.Strings; +import io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext; +import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil; +import io.netty.channel.Channel; +import java.io.IOException; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link XdsClientWrapperForServerSds}. */ +@RunWith(Parameterized.class) +public class XdsClientWrapperForServerSdsTest { + + private static final int PORT = 7000; + + /** Iterable of various configurations to use for tests. */ + @Parameterized.Parameters(name = "{6}") + public static Iterable data() { + return Arrays.asList( + new Object[][] { + { + -1, // creates null filterChainMatch for filter1 + null, + null, + "192.168.10.1", + "192.168.10.2", + 1, + "null filter chain match, expect filter1" + }, + { + PORT + 1, + "192.168.10.1", + "192.168.10.2", + null, + null, + 2, + "only dest port match, expect filter2" + }, + { + PORT, // matches dest port + "168.20.20.2", + "10.1.2.3", // matches local address + "192.168.10.1", + "192.168.10.2", + 1, + "dest port & address match, expect filter1" + }, + { + -1, // creates null filterChainMatch for filter1 + null, + null, + null, // empty address range for filter2 + null, // empty address range for filter2 + 2, + "empty address range over empty filterChainMatch, expect filter2" + }, + { + PORT, + null, + null, + "192.168.1.4", + "0.0.0.0", // IPANY for filter2 + 2, + "IPANY over empty address match, expect filter2" + }, + { + PORT, + "192.168.1.4", + "0.0.0.0", // IPANY for filter1 + "168.154.4.7", + "10.1.2.3", // matches local address + 2, + "exact IP over IPANY match, expect filter2" + }, + { + PORT,// matches dest port but no address match + "168.20.20.2", + "10.1.2.4", + "192.168.10.1", + "192.168.10.2", + 0, + "dest port match but no address match, expect null" + } + }); + } + + @Parameter(0) + public int destPort1; + @Parameter(1) + public String addressPrefix11; + @Parameter(2) + public String addressPrefix12; + @Parameter(3) + public String addressPrefix21; + @Parameter(4) + public String addressPrefix22; + @Parameter(5) + public int expectedIndex; + @Parameter(6) + public String testName; + + @Mock private XdsClient xdsClient; + @Mock private Channel channel; + + private XdsClientWrapperForServerSds xdsClientWrapperForServerSds; + private DownstreamTlsContext[] tlsContexts = new DownstreamTlsContext[3]; + + @Before + public void setUp() throws IOException { + MockitoAnnotations.initMocks(this); + xdsClientWrapperForServerSds = new XdsClientWrapperForServerSds(PORT, xdsClient, null); + tlsContexts[0] = null; + tlsContexts[1] = CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("CERT1", "VA1"); + tlsContexts[2] = CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("CERT2", "VA2"); + } + + /** + * Common method called by most tests. Creates 2 filterChains each with 2 addresses. First + * filterChain's destPort is always PORT. + */ + @Test + public void commonFilterChainMatchTest() + throws UnknownHostException { + ArgumentCaptor listenerWatcherCaptor = ArgumentCaptor.forClass(null); + verify(xdsClient).watchListenerData(eq(PORT), listenerWatcherCaptor.capture()); + XdsClient.ListenerWatcher registeredWatcher = listenerWatcherCaptor.getValue(); + InetAddress ipLocalAddress = Inet4Address.getByName("10.1.2.3"); + InetSocketAddress localAddress = new InetSocketAddress(ipLocalAddress, PORT); + when(channel.localAddress()).thenReturn(localAddress); + EnvoyServerProtoData.Listener listener = + buildTestListener( + "listener1", + "10.1.2.3", + destPort1, + PORT, + addressPrefix11, + addressPrefix12, + addressPrefix21, + addressPrefix22, + tlsContexts[1], + tlsContexts[2]); + XdsClient.ListenerUpdate listenerUpdate = + XdsClient.ListenerUpdate.newBuilder().setListener(listener).build(); + registeredWatcher.onListenerChanged(listenerUpdate); + DownstreamTlsContext downstreamTlsContext = + xdsClientWrapperForServerSds.getDownstreamTlsContext(channel); + assertThat(downstreamTlsContext).isSameInstanceAs(tlsContexts[expectedIndex]); + } + + static EnvoyServerProtoData.Listener buildTestListener( + String name, + String address, + int destPort1, + int destPort2, + String addressPrefix11, + String addressPrefix12, + String addressPrefix21, + String addressPrefix22, + DownstreamTlsContext tlsContext1, + DownstreamTlsContext tlsContext2) { + EnvoyServerProtoData.FilterChainMatch filterChainMatch1 = + destPort1 > 0 ? buildFilterChainMatch(destPort1, addressPrefix11, addressPrefix12) : null; + EnvoyServerProtoData.FilterChainMatch filterChainMatch2 = + destPort2 > 0 ? buildFilterChainMatch(destPort2, addressPrefix21, addressPrefix22) : null; + EnvoyServerProtoData.FilterChain filterChain1 = + new EnvoyServerProtoData.FilterChain(filterChainMatch1, tlsContext1); + EnvoyServerProtoData.FilterChain filterChain2 = + new EnvoyServerProtoData.FilterChain(filterChainMatch2, tlsContext2); + EnvoyServerProtoData.Listener listener = + new EnvoyServerProtoData.Listener(name, address, Arrays.asList(filterChain1, filterChain2)); + return listener; + } + + static EnvoyServerProtoData.FilterChainMatch buildFilterChainMatch( + int destPort, String... addressPrefix) { + ArrayList prefixRanges = new ArrayList<>(); + for (String address : addressPrefix) { + if (!Strings.isNullOrEmpty(address)) { + prefixRanges.add(new EnvoyServerProtoData.CidrRange(address, 32)); + } + } + return new EnvoyServerProtoData.FilterChainMatch( + destPort, prefixRanges, Arrays.asList()); + } + + static EnvoyServerProtoData.FilterChainMatch buildFilterChainMatch( + int destPort, EnvoyServerProtoData.CidrRange... prefixRanges) { + return new EnvoyServerProtoData.FilterChainMatch( + destPort, Arrays.asList(prefixRanges), Arrays.asList()); + } +} diff --git a/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java b/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java new file mode 100644 index 0000000000..86b9944997 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java @@ -0,0 +1,125 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +import io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext; +import io.grpc.inprocess.InProcessSocketAddress; +import io.netty.channel.Channel; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.util.Collections; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link XdsClientWrapperForServerSds}. */ +@RunWith(JUnit4.class) +public class XdsClientWrapperForServerSdsTestMisc { + + private static final int PORT = 7000; + + @Mock private XdsClient xdsClient; + @Mock private Channel channel; + + private XdsClientWrapperForServerSds xdsClientWrapperForServerSds; + + @Before + public void setUp() throws IOException { + MockitoAnnotations.initMocks(this); + xdsClientWrapperForServerSds = new XdsClientWrapperForServerSds(PORT, xdsClient, null); + } + + @Test + public void verifyListenerWatcherRegistered() { + verify(xdsClient, times(1)).watchListenerData(eq(PORT), any(XdsClient.ListenerWatcher.class)); + } + + @Test + public void nonInetSocketAddress_expectException() { + try { + DownstreamTlsContext unused = + commonTestPrep(new InProcessSocketAddress("test1")); + fail("exception expected"); + } catch (IllegalStateException expected) { + assertThat(expected) + .hasMessageThat() + .isEqualTo("Channel localAddress is expected to be InetSocketAddress"); + } + } + + @Test + public void nonMatchingPort_expectException() throws UnknownHostException { + try { + InetAddress ipLocalAddress = InetAddress.getByName("10.1.2.3"); + InetSocketAddress localAddress = new InetSocketAddress(ipLocalAddress, PORT + 1); + DownstreamTlsContext unused = commonTestPrep(localAddress); + fail("exception expected"); + } catch (IllegalStateException expected) { + assertThat(expected) + .hasMessageThat() + .isEqualTo("Channel localAddress port does not match requested listener port"); + } + } + + @Test + public void emptyFilterChain_expectNull() throws UnknownHostException { + InetAddress ipLocalAddress = InetAddress.getByName("10.1.2.3"); + InetSocketAddress localAddress = new InetSocketAddress(ipLocalAddress, PORT); + ArgumentCaptor listenerWatcherCaptor = ArgumentCaptor.forClass(null); + verify(xdsClient).watchListenerData(eq(PORT), listenerWatcherCaptor.capture()); + XdsClient.ListenerWatcher registeredWatcher = listenerWatcherCaptor.getValue(); + when(channel.localAddress()).thenReturn(localAddress); + EnvoyServerProtoData.Listener listener = + new EnvoyServerProtoData.Listener("listener1", + "10.1.2.3", Collections.emptyList()); + XdsClient.ListenerUpdate listenerUpdate = + XdsClient.ListenerUpdate.newBuilder().setListener(listener).build(); + registeredWatcher.onListenerChanged(listenerUpdate); + DownstreamTlsContext tlsContext = xdsClientWrapperForServerSds.getDownstreamTlsContext(channel); + assertThat(tlsContext).isNull(); + } + + private DownstreamTlsContext commonTestPrep(SocketAddress localAddress) { + ArgumentCaptor listenerWatcherCaptor = ArgumentCaptor.forClass(null); + verify(xdsClient).watchListenerData(eq(PORT), listenerWatcherCaptor.capture()); + XdsClient.ListenerWatcher registeredWatcher = listenerWatcherCaptor.getValue(); + when(channel.localAddress()).thenReturn(localAddress); + EnvoyServerProtoData.Listener listener = + XdsClientWrapperForServerSdsTest.buildTestListener( + "listener1", "10.1.2.3", PORT, PORT, null, null, null, null, null, null); + XdsClient.ListenerUpdate listenerUpdate = + XdsClient.ListenerUpdate.newBuilder().setListener(listener).build(); + registeredWatcher.onListenerChanged(listenerUpdate); + return xdsClientWrapperForServerSds.getDownstreamTlsContext(channel); + } +} diff --git a/xds/src/test/java/io/grpc/xds/internal/sds/SdsProtocolNegotiatorsTest.java b/xds/src/test/java/io/grpc/xds/internal/sds/SdsProtocolNegotiatorsTest.java index f1678fba85..a72ef5056d 100644 --- a/xds/src/test/java/io/grpc/xds/internal/sds/SdsProtocolNegotiatorsTest.java +++ b/xds/src/test/java/io/grpc/xds/internal/sds/SdsProtocolNegotiatorsTest.java @@ -189,19 +189,23 @@ public class SdsProtocolNegotiatorsTest { DownstreamTlsContext downstreamTlsContext = buildDownstreamTlsContextFromFilenames(SERVER_1_KEY_FILE, SERVER_1_PEM_FILE, CA_PEM_FILE); - SdsProtocolNegotiators.ServerSdsHandler serverSdsHandler = - new SdsProtocolNegotiators.ServerSdsHandler(grpcHandler, downstreamTlsContext); - pipeline.addLast(serverSdsHandler); - channelHandlerCtx = pipeline.context(serverSdsHandler); - assertNotNull(channelHandlerCtx); // serverSdsHandler ctx is non-null since we just added it + SdsProtocolNegotiators.HandlerPickerHandler handlerPickerHandler = + new SdsProtocolNegotiators.HandlerPickerHandler(grpcHandler, downstreamTlsContext, null); + pipeline.addLast(handlerPickerHandler); + channelHandlerCtx = pipeline.context(handlerPickerHandler); + assertThat(channelHandlerCtx).isNotNull(); // should find HandlerPickerHandler - // kick off protocol negotiation + // kick off protocol negotiation: should replace HandlerPickerHandler with ServerSdsHandler pipeline.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault()); + channelHandlerCtx = pipeline.context(handlerPickerHandler); + assertThat(channelHandlerCtx).isNull(); + channelHandlerCtx = pipeline.context(SdsProtocolNegotiators.ServerSdsHandler.class); + assertThat(channelHandlerCtx).isNotNull(); channel.runPendingTasks(); // need this for tasks to execute on eventLoop - channelHandlerCtx = pipeline.context(serverSdsHandler); + channelHandlerCtx = pipeline.context(SdsProtocolNegotiators.ServerSdsHandler.class); assertThat(channelHandlerCtx).isNull(); - // pipeline should have SslHandler and ServerTlsHandler + // pipeline should only have SslHandler and ServerTlsHandler Iterator> iterator = pipeline.iterator(); assertThat(iterator.next().getValue()).isInstanceOf(SslHandler.class); // ProtocolNegotiators.ServerTlsHandler.class is not accessible, get canonical name @@ -209,6 +213,25 @@ public class SdsProtocolNegotiatorsTest { .contains("ProtocolNegotiators.ServerTlsHandler"); } + @Test + public void serverSdsHandler_nullTlsContext_expectPlaintext() throws IOException { + SdsProtocolNegotiators.HandlerPickerHandler handlerPickerHandler = + new SdsProtocolNegotiators.HandlerPickerHandler(grpcHandler, null, null); + pipeline.addLast(handlerPickerHandler); + channelHandlerCtx = pipeline.context(handlerPickerHandler); + assertThat(channelHandlerCtx).isNotNull(); // should find HandlerPickerHandler + + // kick off protocol negotiation + pipeline.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault()); + channelHandlerCtx = pipeline.context(handlerPickerHandler); + assertThat(channelHandlerCtx).isNull(); + channel.runPendingTasks(); // need this for tasks to execute on eventLoop + Iterator> iterator = pipeline.iterator(); + assertThat(iterator.next().getValue()).isInstanceOf(FakeGrpcHttp2ConnectionHandler.class); + // no more handlers in the pipeline + assertThat(iterator.hasNext()).isFalse(); + } + @Test public void clientSdsProtocolNegotiatorNewHandler_fireProtocolNegotiationEvent() throws IOException, InterruptedException {