xds: integration of XdsClientImpl with XdsServerBuilder to deliver Listener updates (#6838)

This commit is contained in:
sanjaypujare 2020-03-25 10:14:29 -07:00 committed by GitHub
parent 47b6b390bb
commit a2896051b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 806 additions and 34 deletions

View File

@ -91,7 +91,8 @@ final class EnvoyServerProtoData {
private final List<CidrRange> prefixRanges;
private final List<String> applicationProtocols;
private FilterChainMatch(int destinationPort,
@VisibleForTesting
FilterChainMatch(int destinationPort,
List<CidrRange> prefixRanges, List<String> applicationProtocols) {
this.destinationPort = destinationPort;
this.prefixRanges = Collections.unmodifiableList(prefixRanges);
@ -164,7 +165,8 @@ final class EnvoyServerProtoData {
// TODO(sanjaypujare): remove dependency on envoy data type along with rest of the code.
private final io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext downstreamTlsContext;
private FilterChain(FilterChainMatch filterChainMatch,
@VisibleForTesting
FilterChain(FilterChainMatch filterChainMatch,
io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext downstreamTlsContext) {
this.filterChainMatch = filterChainMatch;
this.downstreamTlsContext = downstreamTlsContext;
@ -223,7 +225,8 @@ final class EnvoyServerProtoData {
private final String address;
private final List<FilterChain> filterChains;
private Listener(String name, String address,
@VisibleForTesting
Listener(String name, String address,
List<FilterChain> filterChains) {
this.name = name;
this.address = address;

View File

@ -0,0 +1,296 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext;
import io.envoyproxy.envoy.api.v2.core.Node;
import io.grpc.Internal;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.xds.EnvoyServerProtoData.CidrRange;
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
import io.grpc.xds.EnvoyServerProtoData.FilterChainMatch;
import io.netty.channel.Channel;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* Serves as a wrapper for {@link XdsClientImpl} used on the server side by {@link
* io.grpc.xds.internal.sds.XdsServerBuilder}.
*/
@Internal
public final class XdsClientWrapperForServerSds {
private static final Logger logger =
Logger.getLogger(XdsClientWrapperForServerSds.class.getName());
private static final TimeServiceResource timeServiceResource =
new TimeServiceResource("GrpcServerXdsClient");
private EnvoyServerProtoData.Listener curListener;
// TODO(sanjaypujare): implement shutting down XdsServer which will need xdsClient reference
@SuppressWarnings("unused")
@Nullable private XdsClient xdsClient;
private final int port;
private final ScheduledExecutorService timeService;
/**
* Factory method for creating a {@link XdsClientWrapperForServerSds}.
*
* @param port server's port for which listener config is needed.
* @param bootstrapper {@link Bootstrapper} instance to load bootstrap config.
* @param syncContext {@link SynchronizationContext} needed by {@link XdsClient}.
*/
public static XdsClientWrapperForServerSds newInstance(
int port, Bootstrapper bootstrapper, SynchronizationContext syncContext) throws IOException {
Bootstrapper.BootstrapInfo bootstrapInfo = bootstrapper.readBootstrap();
final List<Bootstrapper.ServerInfo> serverList = bootstrapInfo.getServers();
if (serverList.isEmpty()) {
throw new NoSuchElementException("No management server provided by bootstrap");
}
final Node node = bootstrapInfo.getNode();
ScheduledExecutorService timeService = SharedResourceHolder.get(timeServiceResource);
XdsClientImpl xdsClientImpl =
new XdsClientImpl(
"",
serverList,
XdsClient.XdsChannelFactory.getInstance(),
node,
syncContext,
timeService,
new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER);
return new XdsClientWrapperForServerSds(port, xdsClientImpl, timeService);
}
@VisibleForTesting
XdsClientWrapperForServerSds(int port, XdsClient xdsClient,
ScheduledExecutorService timeService) {
this.port = port;
this.xdsClient = xdsClient;
this.timeService = timeService;
xdsClient.watchListenerData(
port,
new XdsClient.ListenerWatcher() {
@Override
public void onListenerChanged(XdsClient.ListenerUpdate update) {
logger.log(
Level.INFO,
"Setting myListener from ConfigUpdate listener :{0}",
update.getListener().toString());
curListener = update.getListener();
}
@Override
public void onError(Status error) {
// In order to distinguish between IO error and resource not found, set curListener
// to null in case of NOT_FOUND
if (error.getCode().equals(Status.Code.NOT_FOUND)) {
curListener = null;
}
// TODO(sanjaypujare): Implement logic for other cases based on final design.
logger.log(Level.SEVERE, "ListenerWatcher in XdsClientWrapperForServerSds:{0}", error);
}
});
}
/**
* Locates the best matching FilterChain to the channel from the current listener and if found
* returns the DownstreamTlsContext from that FilterChain, else null.
*/
@Nullable
public DownstreamTlsContext getDownstreamTlsContext(Channel channel) {
if (curListener != null && channel != null) {
SocketAddress localAddress = channel.localAddress();
checkState(
localAddress instanceof InetSocketAddress,
"Channel localAddress is expected to be InetSocketAddress");
InetSocketAddress localInetAddr = (InetSocketAddress) localAddress;
checkState(
port == localInetAddr.getPort(),
"Channel localAddress port does not match requested listener port");
List<FilterChain> filterChains = curListener.getFilterChains();
FilterChainComparator comparator = new FilterChainComparator(localInetAddr);
FilterChain bestMatch =
filterChains.isEmpty() ? null : Collections.max(filterChains, comparator);
if (bestMatch != null && comparator.isMatching(bestMatch.getFilterChainMatch())) {
return bestMatch.getDownstreamTlsContext();
}
}
return null;
}
private static final class FilterChainComparator implements Comparator<FilterChain> {
private InetSocketAddress localAddress;
private enum Match {
NO_MATCH,
EMPTY_PREFIX_RANGE_MATCH,
IPANY_MATCH,
EXACT_ADDRESS_MATCH
}
private FilterChainComparator(InetSocketAddress localAddress) {
checkNotNull(localAddress, "localAddress cannot be null");
this.localAddress = localAddress;
}
@Override
public int compare(FilterChain first, FilterChain second) {
checkNotNull(first, "first arg cannot be null");
checkNotNull(second, "second arg cannot be null");
FilterChainMatch firstMatch = first.getFilterChainMatch();
FilterChainMatch secondMatch = second.getFilterChainMatch();
if (firstMatch == null) {
return (secondMatch == null) ? 0 : (isMatching(secondMatch) ? -1 : 1);
} else {
return (secondMatch == null)
? (isMatching(firstMatch) ? 1 : -1)
: compare(firstMatch, secondMatch);
}
}
private int compare(FilterChainMatch first, FilterChainMatch second) {
int channelPort = localAddress.getPort();
if (first.getDestinationPort() == channelPort) {
return (second.getDestinationPort() == channelPort)
? compare(first.getPrefixRanges(), second.getPrefixRanges())
: (isInetAddressMatching(first.getPrefixRanges()) ? 1 : 0);
} else {
return (second.getDestinationPort() == channelPort)
? (isInetAddressMatching(second.getPrefixRanges()) ? -1 : 0)
: 0;
}
}
private int compare(List<CidrRange> first, List<CidrRange> second) {
return getInetAddressMatch(first).ordinal() - getInetAddressMatch(second).ordinal();
}
private boolean isInetAddressMatching(List<CidrRange> prefixRanges) {
return getInetAddressMatch(prefixRanges).ordinal() > Match.NO_MATCH.ordinal();
}
private Match getInetAddressMatch(List<CidrRange> prefixRanges) {
if (prefixRanges == null || prefixRanges.isEmpty()) {
return Match.EMPTY_PREFIX_RANGE_MATCH;
}
InetAddress localInetAddress = localAddress.getAddress();
for (CidrRange cidrRange : prefixRanges) {
if (cidrRange.getPrefixLen() == 32) {
try {
InetAddress cidrAddr = InetAddress.getByName(cidrRange.getAddressPrefix());
if (cidrAddr.isAnyLocalAddress()) {
return Match.IPANY_MATCH;
}
if (cidrAddr.equals(localInetAddress)) {
return Match.EXACT_ADDRESS_MATCH;
}
} catch (UnknownHostException e) {
logger.log(Level.WARNING, "cidrRange address parsing", e);
// continue
}
}
// TODO(sanjaypujare): implement prefix match logic as needed
}
return Match.NO_MATCH;
}
private boolean isMatching(FilterChainMatch filterChainMatch) {
if (filterChainMatch == null) {
return true;
}
int destPort = filterChainMatch.getDestinationPort();
if (destPort != localAddress.getPort()) {
return false;
}
return isInetAddressMatching(filterChainMatch.getPrefixRanges());
}
}
/** Shutdown this instance and release resources. */
public void shutdown() {
logger.log(Level.FINER, "Shutdown");
if (xdsClient != null) {
xdsClient.shutdown();
}
if (timeService != null) {
timeServiceResource.close(timeService);
}
}
private static final class TimeServiceResource
implements SharedResourceHolder.Resource<ScheduledExecutorService> {
private final String name;
TimeServiceResource(String name) {
this.name = name;
}
@Override
public ScheduledExecutorService create() {
// Use Netty's DefaultThreadFactory in order to get the benefit of FastThreadLocal.
ThreadFactory threadFactory = new DefaultThreadFactory(name, /* daemon= */ true);
if (Epoll.isAvailable()) {
return new EpollEventLoopGroup(1, threadFactory);
} else {
return Executors.newSingleThreadScheduledExecutor(threadFactory);
}
}
@SuppressWarnings("FutureReturnValueIgnored")
@Override
public void close(ScheduledExecutorService instance) {
try {
if (instance instanceof EpollEventLoopGroup) {
((EpollEventLoopGroup)instance).shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
} else {
instance.shutdown();
}
} catch (InterruptedException e) {
logger.log(Level.SEVERE, "Interrupted during shutdown", e);
Thread.currentThread().interrupt();
}
}
}
}

View File

@ -21,20 +21,26 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext;
import io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext;
import io.grpc.SynchronizationContext;
import io.grpc.netty.GrpcHttp2ConnectionHandler;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.InternalNettyChannelBuilder.ProtocolNegotiatorFactory;
import io.grpc.netty.InternalProtocolNegotiationEvent;
import io.grpc.netty.InternalProtocolNegotiator;
import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator;
import io.grpc.netty.InternalProtocolNegotiators;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.ProtocolNegotiationEvent;
import io.grpc.xds.Bootstrapper;
import io.grpc.xds.XdsAttributes;
import io.grpc.xds.XdsClientWrapperForServerSds;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.ssl.SslContext;
import io.netty.util.AsciiString;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
@ -55,7 +61,6 @@ final class SdsProtocolNegotiators {
* Returns a {@link ProtocolNegotiatorFactory} to be used on {@link NettyChannelBuilder}. Passing
* {@code null} for upstreamTlsContext will fall back to plaintext.
*/
// TODO (sanjaypujare) integrate with xDS client to get upstreamTlsContext from CDS
public static ProtocolNegotiatorFactory clientProtocolNegotiatorFactory(
@Nullable UpstreamTlsContext upstreamTlsContext) {
return new ClientSdsProtocolNegotiatorFactory(upstreamTlsContext);
@ -64,17 +69,20 @@ final class SdsProtocolNegotiators {
/**
* Creates an SDS based {@link ProtocolNegotiator} for a {@link io.grpc.netty.NettyServerBuilder}.
* Passing {@code null} for downstreamTlsContext will fall back to plaintext.
* If xDS returns no DownstreamTlsContext, it will fall back to plaintext.
*
* @param downstreamTlsContext passed in {@link XdsServerBuilder#tlsContext}.
* @param port the listening port passed to {@link XdsServerBuilder#forPort(int)}.
*/
// TODO (sanjaypujare) integrate with xDS client to get LDS
public static ProtocolNegotiator serverProtocolNegotiator(
@Nullable DownstreamTlsContext downstreamTlsContext) {
return new ServerSdsProtocolNegotiator(downstreamTlsContext);
@Nullable DownstreamTlsContext downstreamTlsContext, int port,
SynchronizationContext syncContext) {
return new ServerSdsProtocolNegotiator(downstreamTlsContext, port, syncContext);
}
private static final class ClientSdsProtocolNegotiatorFactory
implements InternalNettyChannelBuilder.ProtocolNegotiatorFactory {
// TODO (sanjaypujare) integrate with xDS client to get upstreamTlsContext from CDS
private final UpstreamTlsContext upstreamTlsContext;
ClientSdsProtocolNegotiatorFactory(UpstreamTlsContext upstreamTlsContext) {
@ -245,12 +253,21 @@ final class SdsProtocolNegotiators {
private static final class ServerSdsProtocolNegotiator implements ProtocolNegotiator {
// TODO (sanjaypujare) integrate with xDS client to get LDS. LDS watcher will
// inject/update the downstreamTlsContext from LDS
private DownstreamTlsContext downstreamTlsContext;
private final XdsClientWrapperForServerSds xdsClientWrapperForServerSds;
ServerSdsProtocolNegotiator(DownstreamTlsContext downstreamTlsContext) {
ServerSdsProtocolNegotiator(
DownstreamTlsContext downstreamTlsContext, int port, SynchronizationContext syncContext) {
this.downstreamTlsContext = downstreamTlsContext;
XdsClientWrapperForServerSds localXdsClientWrapperForServerSds;
try {
localXdsClientWrapperForServerSds =
XdsClientWrapperForServerSds.newInstance(port, Bootstrapper.getInstance(), syncContext);
} catch (Exception e) {
logger.log(Level.WARNING, "Exception while creating the xDS client", e);
localXdsClientWrapperForServerSds = null;
}
this.xdsClientWrapperForServerSds = localXdsClientWrapperForServerSds;
}
@Override
@ -260,10 +277,29 @@ final class SdsProtocolNegotiators {
@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
if (isTlsContextEmpty(downstreamTlsContext)) {
return InternalProtocolNegotiators.serverPlaintext().newHandler(grpcHandler);
return new HandlerPickerHandler(grpcHandler, downstreamTlsContext,
xdsClientWrapperForServerSds);
}
return new ServerSdsHandler(grpcHandler, downstreamTlsContext);
@Override
public void close() {}
}
@VisibleForTesting
static final class HandlerPickerHandler
extends ChannelInboundHandlerAdapter {
private final GrpcHttp2ConnectionHandler grpcHandler;
private final DownstreamTlsContext downstreamTlsContextFromBuilder;
private final XdsClientWrapperForServerSds xdsClientWrapperForServerSds;
HandlerPickerHandler(
GrpcHttp2ConnectionHandler grpcHandler,
DownstreamTlsContext downstreamTlsContext,
XdsClientWrapperForServerSds xdsClientWrapperForServerSds) {
checkNotNull(grpcHandler, "grpcHandler");
this.grpcHandler = grpcHandler;
this.downstreamTlsContextFromBuilder = downstreamTlsContext;
this.xdsClientWrapperForServerSds = xdsClientWrapperForServerSds;
}
private static boolean isTlsContextEmpty(DownstreamTlsContext downstreamTlsContext) {
@ -271,7 +307,36 @@ final class SdsProtocolNegotiators {
}
@Override
public void close() {}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof ProtocolNegotiationEvent) {
DownstreamTlsContext downstreamTlsContext =
xdsClientWrapperForServerSds == null
? null
: xdsClientWrapperForServerSds.getDownstreamTlsContext(ctx.channel());
if (isTlsContextEmpty(downstreamTlsContext)) {
downstreamTlsContext = downstreamTlsContextFromBuilder;
}
if (isTlsContextEmpty(downstreamTlsContext)) {
logger.log(Level.INFO, "Fallback to plaintext for {0}", ctx.channel().localAddress());
ctx.pipeline()
.replace(
this,
null,
InternalProtocolNegotiators.serverPlaintext().newHandler(grpcHandler));
ProtocolNegotiationEvent pne = InternalProtocolNegotiationEvent.getDefault();
ctx.fireUserEventTriggered(pne);
return;
} else {
ctx.pipeline()
.replace(this, null, new ServerSdsHandler(grpcHandler, downstreamTlsContext));
ProtocolNegotiationEvent pne = InternalProtocolNegotiationEvent.getDefault();
ctx.fireUserEventTriggered(pne);
return;
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
@VisibleForTesting
@ -281,7 +346,8 @@ final class SdsProtocolNegotiators {
private final DownstreamTlsContext downstreamTlsContext;
ServerSdsHandler(
GrpcHttp2ConnectionHandler grpcHandler, DownstreamTlsContext downstreamTlsContext) {
GrpcHttp2ConnectionHandler grpcHandler,
DownstreamTlsContext downstreamTlsContext) {
super(
// superclass (InternalProtocolNegotiators.ProtocolNegotiationHandler) expects 'next'
// handler but we don't have a next handler _yet_. So we "disable" superclass's behavior

View File

@ -21,17 +21,22 @@ import io.grpc.BindableService;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.HandlerRegistry;
import io.grpc.InternalLogId;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerStreamTracer;
import io.grpc.ServerTransportFilter;
import io.grpc.SynchronizationContext;
import io.grpc.netty.NettyServerBuilder;
import java.io.File;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
@ -39,14 +44,18 @@ import javax.annotation.Nullable;
* with peers. Note, this is not ready to use yet.
*/
public final class XdsServerBuilder extends ServerBuilder<XdsServerBuilder> {
private static final Logger logger =
Logger.getLogger(XdsServerBuilder.class.getName());
private final NettyServerBuilder delegate;
private final int port;
// TODO (sanjaypujare) integrate with xDS client to get downstreamTlsContext from LDS
@Nullable private DownstreamTlsContext downstreamTlsContext;
private XdsServerBuilder(NettyServerBuilder nettyDelegate) {
private XdsServerBuilder(NettyServerBuilder nettyDelegate, int port) {
this.delegate = nettyDelegate;
this.port = port;
}
@Override
@ -132,14 +141,39 @@ public final class XdsServerBuilder extends ServerBuilder<XdsServerBuilder> {
/** Creates a gRPC server builder for the given port. */
public static XdsServerBuilder forPort(int port) {
NettyServerBuilder nettyDelegate = NettyServerBuilder.forAddress(new InetSocketAddress(port));
return new XdsServerBuilder(nettyDelegate);
return new XdsServerBuilder(nettyDelegate, port);
}
@Override
public Server build() {
// note: doing it in build() will overwrite any previously set ProtocolNegotiator
final InternalLogId logId = InternalLogId.allocate("XdsServerBuilder", Integer.toString(port));
SynchronizationContext syncContext =
new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
// needed by syncContext
private boolean panicMode;
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.log(
Level.SEVERE,
"[" + logId + "] Uncaught exception in the SynchronizationContext. Panic!",
e);
panic(e);
}
void panic(final Throwable t) {
if (panicMode) {
// Preserve the first panic information
return;
}
panicMode = true;
}
});
delegate.protocolNegotiator(
SdsProtocolNegotiators.serverProtocolNegotiator(this.downstreamTlsContext));
SdsProtocolNegotiators.serverProtocolNegotiator(
this.downstreamTlsContext, port, syncContext));
return delegate.build();
}
}

View File

@ -0,0 +1,225 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.base.Strings;
import io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext;
import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
import io.netty.channel.Channel;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/** Tests for {@link XdsClientWrapperForServerSds}. */
@RunWith(Parameterized.class)
public class XdsClientWrapperForServerSdsTest {
private static final int PORT = 7000;
/** Iterable of various configurations to use for tests. */
@Parameterized.Parameters(name = "{6}")
public static Iterable<Object[]> data() {
return Arrays.asList(
new Object[][] {
{
-1, // creates null filterChainMatch for filter1
null,
null,
"192.168.10.1",
"192.168.10.2",
1,
"null filter chain match, expect filter1"
},
{
PORT + 1,
"192.168.10.1",
"192.168.10.2",
null,
null,
2,
"only dest port match, expect filter2"
},
{
PORT, // matches dest port
"168.20.20.2",
"10.1.2.3", // matches local address
"192.168.10.1",
"192.168.10.2",
1,
"dest port & address match, expect filter1"
},
{
-1, // creates null filterChainMatch for filter1
null,
null,
null, // empty address range for filter2
null, // empty address range for filter2
2,
"empty address range over empty filterChainMatch, expect filter2"
},
{
PORT,
null,
null,
"192.168.1.4",
"0.0.0.0", // IPANY for filter2
2,
"IPANY over empty address match, expect filter2"
},
{
PORT,
"192.168.1.4",
"0.0.0.0", // IPANY for filter1
"168.154.4.7",
"10.1.2.3", // matches local address
2,
"exact IP over IPANY match, expect filter2"
},
{
PORT,// matches dest port but no address match
"168.20.20.2",
"10.1.2.4",
"192.168.10.1",
"192.168.10.2",
0,
"dest port match but no address match, expect null"
}
});
}
@Parameter(0)
public int destPort1;
@Parameter(1)
public String addressPrefix11;
@Parameter(2)
public String addressPrefix12;
@Parameter(3)
public String addressPrefix21;
@Parameter(4)
public String addressPrefix22;
@Parameter(5)
public int expectedIndex;
@Parameter(6)
public String testName;
@Mock private XdsClient xdsClient;
@Mock private Channel channel;
private XdsClientWrapperForServerSds xdsClientWrapperForServerSds;
private DownstreamTlsContext[] tlsContexts = new DownstreamTlsContext[3];
@Before
public void setUp() throws IOException {
MockitoAnnotations.initMocks(this);
xdsClientWrapperForServerSds = new XdsClientWrapperForServerSds(PORT, xdsClient, null);
tlsContexts[0] = null;
tlsContexts[1] = CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("CERT1", "VA1");
tlsContexts[2] = CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("CERT2", "VA2");
}
/**
* Common method called by most tests. Creates 2 filterChains each with 2 addresses. First
* filterChain's destPort is always PORT.
*/
@Test
public void commonFilterChainMatchTest()
throws UnknownHostException {
ArgumentCaptor<XdsClient.ListenerWatcher> listenerWatcherCaptor = ArgumentCaptor.forClass(null);
verify(xdsClient).watchListenerData(eq(PORT), listenerWatcherCaptor.capture());
XdsClient.ListenerWatcher registeredWatcher = listenerWatcherCaptor.getValue();
InetAddress ipLocalAddress = Inet4Address.getByName("10.1.2.3");
InetSocketAddress localAddress = new InetSocketAddress(ipLocalAddress, PORT);
when(channel.localAddress()).thenReturn(localAddress);
EnvoyServerProtoData.Listener listener =
buildTestListener(
"listener1",
"10.1.2.3",
destPort1,
PORT,
addressPrefix11,
addressPrefix12,
addressPrefix21,
addressPrefix22,
tlsContexts[1],
tlsContexts[2]);
XdsClient.ListenerUpdate listenerUpdate =
XdsClient.ListenerUpdate.newBuilder().setListener(listener).build();
registeredWatcher.onListenerChanged(listenerUpdate);
DownstreamTlsContext downstreamTlsContext =
xdsClientWrapperForServerSds.getDownstreamTlsContext(channel);
assertThat(downstreamTlsContext).isSameInstanceAs(tlsContexts[expectedIndex]);
}
static EnvoyServerProtoData.Listener buildTestListener(
String name,
String address,
int destPort1,
int destPort2,
String addressPrefix11,
String addressPrefix12,
String addressPrefix21,
String addressPrefix22,
DownstreamTlsContext tlsContext1,
DownstreamTlsContext tlsContext2) {
EnvoyServerProtoData.FilterChainMatch filterChainMatch1 =
destPort1 > 0 ? buildFilterChainMatch(destPort1, addressPrefix11, addressPrefix12) : null;
EnvoyServerProtoData.FilterChainMatch filterChainMatch2 =
destPort2 > 0 ? buildFilterChainMatch(destPort2, addressPrefix21, addressPrefix22) : null;
EnvoyServerProtoData.FilterChain filterChain1 =
new EnvoyServerProtoData.FilterChain(filterChainMatch1, tlsContext1);
EnvoyServerProtoData.FilterChain filterChain2 =
new EnvoyServerProtoData.FilterChain(filterChainMatch2, tlsContext2);
EnvoyServerProtoData.Listener listener =
new EnvoyServerProtoData.Listener(name, address, Arrays.asList(filterChain1, filterChain2));
return listener;
}
static EnvoyServerProtoData.FilterChainMatch buildFilterChainMatch(
int destPort, String... addressPrefix) {
ArrayList<EnvoyServerProtoData.CidrRange> prefixRanges = new ArrayList<>();
for (String address : addressPrefix) {
if (!Strings.isNullOrEmpty(address)) {
prefixRanges.add(new EnvoyServerProtoData.CidrRange(address, 32));
}
}
return new EnvoyServerProtoData.FilterChainMatch(
destPort, prefixRanges, Arrays.<String>asList());
}
static EnvoyServerProtoData.FilterChainMatch buildFilterChainMatch(
int destPort, EnvoyServerProtoData.CidrRange... prefixRanges) {
return new EnvoyServerProtoData.FilterChainMatch(
destPort, Arrays.asList(prefixRanges), Arrays.<String>asList());
}
}

View File

@ -0,0 +1,125 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext;
import io.grpc.inprocess.InProcessSocketAddress;
import io.netty.channel.Channel;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/** Tests for {@link XdsClientWrapperForServerSds}. */
@RunWith(JUnit4.class)
public class XdsClientWrapperForServerSdsTestMisc {
private static final int PORT = 7000;
@Mock private XdsClient xdsClient;
@Mock private Channel channel;
private XdsClientWrapperForServerSds xdsClientWrapperForServerSds;
@Before
public void setUp() throws IOException {
MockitoAnnotations.initMocks(this);
xdsClientWrapperForServerSds = new XdsClientWrapperForServerSds(PORT, xdsClient, null);
}
@Test
public void verifyListenerWatcherRegistered() {
verify(xdsClient, times(1)).watchListenerData(eq(PORT), any(XdsClient.ListenerWatcher.class));
}
@Test
public void nonInetSocketAddress_expectException() {
try {
DownstreamTlsContext unused =
commonTestPrep(new InProcessSocketAddress("test1"));
fail("exception expected");
} catch (IllegalStateException expected) {
assertThat(expected)
.hasMessageThat()
.isEqualTo("Channel localAddress is expected to be InetSocketAddress");
}
}
@Test
public void nonMatchingPort_expectException() throws UnknownHostException {
try {
InetAddress ipLocalAddress = InetAddress.getByName("10.1.2.3");
InetSocketAddress localAddress = new InetSocketAddress(ipLocalAddress, PORT + 1);
DownstreamTlsContext unused = commonTestPrep(localAddress);
fail("exception expected");
} catch (IllegalStateException expected) {
assertThat(expected)
.hasMessageThat()
.isEqualTo("Channel localAddress port does not match requested listener port");
}
}
@Test
public void emptyFilterChain_expectNull() throws UnknownHostException {
InetAddress ipLocalAddress = InetAddress.getByName("10.1.2.3");
InetSocketAddress localAddress = new InetSocketAddress(ipLocalAddress, PORT);
ArgumentCaptor<XdsClient.ListenerWatcher> listenerWatcherCaptor = ArgumentCaptor.forClass(null);
verify(xdsClient).watchListenerData(eq(PORT), listenerWatcherCaptor.capture());
XdsClient.ListenerWatcher registeredWatcher = listenerWatcherCaptor.getValue();
when(channel.localAddress()).thenReturn(localAddress);
EnvoyServerProtoData.Listener listener =
new EnvoyServerProtoData.Listener("listener1",
"10.1.2.3", Collections.<EnvoyServerProtoData.FilterChain>emptyList());
XdsClient.ListenerUpdate listenerUpdate =
XdsClient.ListenerUpdate.newBuilder().setListener(listener).build();
registeredWatcher.onListenerChanged(listenerUpdate);
DownstreamTlsContext tlsContext = xdsClientWrapperForServerSds.getDownstreamTlsContext(channel);
assertThat(tlsContext).isNull();
}
private DownstreamTlsContext commonTestPrep(SocketAddress localAddress) {
ArgumentCaptor<XdsClient.ListenerWatcher> listenerWatcherCaptor = ArgumentCaptor.forClass(null);
verify(xdsClient).watchListenerData(eq(PORT), listenerWatcherCaptor.capture());
XdsClient.ListenerWatcher registeredWatcher = listenerWatcherCaptor.getValue();
when(channel.localAddress()).thenReturn(localAddress);
EnvoyServerProtoData.Listener listener =
XdsClientWrapperForServerSdsTest.buildTestListener(
"listener1", "10.1.2.3", PORT, PORT, null, null, null, null, null, null);
XdsClient.ListenerUpdate listenerUpdate =
XdsClient.ListenerUpdate.newBuilder().setListener(listener).build();
registeredWatcher.onListenerChanged(listenerUpdate);
return xdsClientWrapperForServerSds.getDownstreamTlsContext(channel);
}
}

View File

@ -189,19 +189,23 @@ public class SdsProtocolNegotiatorsTest {
DownstreamTlsContext downstreamTlsContext =
buildDownstreamTlsContextFromFilenames(SERVER_1_KEY_FILE, SERVER_1_PEM_FILE, CA_PEM_FILE);
SdsProtocolNegotiators.ServerSdsHandler serverSdsHandler =
new SdsProtocolNegotiators.ServerSdsHandler(grpcHandler, downstreamTlsContext);
pipeline.addLast(serverSdsHandler);
channelHandlerCtx = pipeline.context(serverSdsHandler);
assertNotNull(channelHandlerCtx); // serverSdsHandler ctx is non-null since we just added it
SdsProtocolNegotiators.HandlerPickerHandler handlerPickerHandler =
new SdsProtocolNegotiators.HandlerPickerHandler(grpcHandler, downstreamTlsContext, null);
pipeline.addLast(handlerPickerHandler);
channelHandlerCtx = pipeline.context(handlerPickerHandler);
assertThat(channelHandlerCtx).isNotNull(); // should find HandlerPickerHandler
// kick off protocol negotiation
// kick off protocol negotiation: should replace HandlerPickerHandler with ServerSdsHandler
pipeline.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
channelHandlerCtx = pipeline.context(handlerPickerHandler);
assertThat(channelHandlerCtx).isNull();
channelHandlerCtx = pipeline.context(SdsProtocolNegotiators.ServerSdsHandler.class);
assertThat(channelHandlerCtx).isNotNull();
channel.runPendingTasks(); // need this for tasks to execute on eventLoop
channelHandlerCtx = pipeline.context(serverSdsHandler);
channelHandlerCtx = pipeline.context(SdsProtocolNegotiators.ServerSdsHandler.class);
assertThat(channelHandlerCtx).isNull();
// pipeline should have SslHandler and ServerTlsHandler
// pipeline should only have SslHandler and ServerTlsHandler
Iterator<Map.Entry<String, ChannelHandler>> iterator = pipeline.iterator();
assertThat(iterator.next().getValue()).isInstanceOf(SslHandler.class);
// ProtocolNegotiators.ServerTlsHandler.class is not accessible, get canonical name
@ -209,6 +213,25 @@ public class SdsProtocolNegotiatorsTest {
.contains("ProtocolNegotiators.ServerTlsHandler");
}
@Test
public void serverSdsHandler_nullTlsContext_expectPlaintext() throws IOException {
SdsProtocolNegotiators.HandlerPickerHandler handlerPickerHandler =
new SdsProtocolNegotiators.HandlerPickerHandler(grpcHandler, null, null);
pipeline.addLast(handlerPickerHandler);
channelHandlerCtx = pipeline.context(handlerPickerHandler);
assertThat(channelHandlerCtx).isNotNull(); // should find HandlerPickerHandler
// kick off protocol negotiation
pipeline.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
channelHandlerCtx = pipeline.context(handlerPickerHandler);
assertThat(channelHandlerCtx).isNull();
channel.runPendingTasks(); // need this for tasks to execute on eventLoop
Iterator<Map.Entry<String, ChannelHandler>> iterator = pipeline.iterator();
assertThat(iterator.next().getValue()).isInstanceOf(FakeGrpcHttp2ConnectionHandler.class);
// no more handlers in the pipeline
assertThat(iterator.hasNext()).isFalse();
}
@Test
public void clientSdsProtocolNegotiatorNewHandler_fireProtocolNegotiationEvent()
throws IOException, InterruptedException {