From afe883119d72451a81668f49b6dc4c5a83c20867 Mon Sep 17 00:00:00 2001 From: sanjaypujare Date: Thu, 11 Mar 2021 16:23:50 -0800 Subject: [PATCH] xds: add server side Listener processing to ClientXdsClient (#7955) * xds: add server side Listener processing to ClientXdsClient * address review comments * minor fixes for review comments --- .../java/io/grpc/xds/ClientXdsClient.java | 41 +++++++- .../io/grpc/xds/EnvoyServerProtoData.java | 4 + xds/src/main/java/io/grpc/xds/XdsClient.java | 21 ++++- .../io/grpc/xds/ClientXdsClientDataTest.java | 14 +++ .../io/grpc/xds/ClientXdsClientTestBase.java | 93 +++++++++++++++++++ .../io/grpc/xds/EnvoyServerProtoDataTest.java | 2 + .../java/io/grpc/xds/ServerXdsClientTest.java | 2 + 7 files changed, 171 insertions(+), 6 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index 0ace80bd00..40c473a70e 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -156,12 +156,17 @@ final class ClientXdsClient extends AbstractXdsClient { // Unpack HttpConnectionManager messages. Map httpConnectionManagers = new HashMap<>(listeners.size()); + Map serverSideListeners = new HashMap<>(listeners.size()); try { for (Listener listener : listeners) { - HttpConnectionManager hcm = unpackCompatibleType( - listener.getApiListener().getApiListener(), HttpConnectionManager.class, - TYPE_URL_HTTP_CONNECTION_MANAGER, TYPE_URL_HTTP_CONNECTION_MANAGER_V2); - httpConnectionManagers.put(listener.getName(), hcm); + if (listener.hasApiListener()) { + HttpConnectionManager hcm = unpackCompatibleType( + listener.getApiListener().getApiListener(), HttpConnectionManager.class, + TYPE_URL_HTTP_CONNECTION_MANAGER, TYPE_URL_HTTP_CONNECTION_MANAGER_V2); + httpConnectionManagers.put(listener.getName(), hcm); + } else { + serverSideListeners.put(listener.getName(), listener); + } } } catch (InvalidProtocolBufferException e) { getLogger().log( @@ -239,6 +244,21 @@ final class ClientXdsClient extends AbstractXdsClient { } ldsUpdates.put(listenerName, update); } + // process serverSideListeners if any + for (Map.Entry entry : serverSideListeners.entrySet()) { + String listenerName = entry.getKey(); + Listener listener = entry.getValue(); + LdsUpdate update; + + StructOrError convertedListener = + parseServerSideListener(listener); + if (convertedListener.getErrorDetail() != null) { + nackResponse(ResourceType.LDS, nonce, convertedListener.getErrorDetail()); + return; + } + update = new LdsUpdate(convertedListener.getStruct()); + ldsUpdates.put(listenerName, update); + } ackResponse(ResourceType.LDS, versionInfo, nonce); for (String resource : ldsResourceSubscribers.keySet()) { @@ -257,6 +277,19 @@ final class ClientXdsClient extends AbstractXdsClient { } } + @VisibleForTesting static StructOrError parseServerSideListener( + Listener listener) { + try { + return StructOrError.fromStruct( + EnvoyServerProtoData.Listener.fromEnvoyProtoListener(listener)); + } catch (InvalidProtocolBufferException e) { + return StructOrError.fromError( + "Failed to unpack Listener " + listener.getName() + ":" + e.getMessage()); + } catch (IllegalArgumentException e) { + return StructOrError.fromError(e.getMessage()); + } + } + private static StructOrError parseVirtualHost( io.envoyproxy.envoy.config.route.v3.VirtualHost proto) { String name = proto.getName(); diff --git a/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java b/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java index 1dab0db9fa..330867c6e4 100644 --- a/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java +++ b/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java @@ -22,6 +22,7 @@ import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; import io.envoyproxy.envoy.config.core.v3.Address; import io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.envoyproxy.envoy.config.core.v3.TrafficDirection; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext; import io.grpc.Internal; import java.net.InetAddress; @@ -455,6 +456,9 @@ public final class EnvoyServerProtoData { static Listener fromEnvoyProtoListener(io.envoyproxy.envoy.config.listener.v3.Listener proto) throws InvalidProtocolBufferException { + if (!proto.getTrafficDirection().equals(TrafficDirection.INBOUND)) { + throw new IllegalArgumentException("Listener " + proto.getName() + " is not INBOUND"); + } List filterChains = new ArrayList<>(proto.getFilterChainsCount()); for (io.envoyproxy.envoy.config.listener.v3.FilterChain filterChain : proto.getFilterChainsList()) { diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 4018770dec..bf29d5ebc1 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -59,6 +59,9 @@ abstract class XdsClient { final boolean hasFaultInjection; @Nullable // Can be null even if hasFaultInjection is true. final HttpFault httpFault; + // Server side Listener. + @Nullable + final Listener listener; LdsUpdate( long httpMaxStreamDurationNano, String rdsName, boolean hasFaultInjection, @@ -82,12 +85,22 @@ abstract class XdsClient { ? null : Collections.unmodifiableList(new ArrayList<>(virtualHosts)); this.hasFaultInjection = hasFaultInjection; this.httpFault = httpFault; + this.listener = null; + } + + LdsUpdate(Listener listener) { + this.listener = listener; + this.httpMaxStreamDurationNano = 0L; + this.rdsName = null; + this.virtualHosts = null; + this.hasFaultInjection = false; + this.httpFault = null; } @Override public int hashCode() { return Objects.hash( - httpMaxStreamDurationNano, rdsName, virtualHosts, hasFaultInjection, httpFault); + httpMaxStreamDurationNano, rdsName, virtualHosts, hasFaultInjection, httpFault, listener); } @Override @@ -103,7 +116,8 @@ abstract class XdsClient { && Objects.equals(rdsName, that.rdsName) && Objects.equals(virtualHosts, that.virtualHosts) && hasFaultInjection == that.hasFaultInjection - && Objects.equals(httpFault, that.httpFault); + && Objects.equals(httpFault, that.httpFault) + && Objects.equals(listener, that.listener); } @Override @@ -119,6 +133,9 @@ abstract class XdsClient { toStringHelper.add("faultInjectionEnabled", true) .add("httpFault", httpFault); } + if (listener != null) { + toStringHelper.add("listener", listener); + } return toStringHelper.toString(); } } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java index 7cffacdf5d..08adfdde9b 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java @@ -25,7 +25,9 @@ import io.envoyproxy.envoy.config.core.v3.Address; import io.envoyproxy.envoy.config.core.v3.Locality; import io.envoyproxy.envoy.config.core.v3.RuntimeFractionalPercent; import io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.envoyproxy.envoy.config.core.v3.TrafficDirection; import io.envoyproxy.envoy.config.endpoint.v3.Endpoint; +import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.config.route.v3.DirectResponseAction; import io.envoyproxy.envoy.config.route.v3.FilterAction; import io.envoyproxy.envoy.config.route.v3.RedirectAction; @@ -613,4 +615,16 @@ public class ClientXdsClientDataTest { StructOrError struct = ClientXdsClient.parseLocalityLbEndpoints(proto); assertThat(struct.getErrorDetail()).isEqualTo("negative priority"); } + + @Test + public void parseServerSideListener_invalidTrafficDirection() { + Listener listener = + Listener.newBuilder() + .setName("listener1") + .setTrafficDirection(TrafficDirection.OUTBOUND) + .build(); + StructOrError struct = + ClientXdsClient.parseServerSideListener(listener); + assertThat(struct.getErrorDetail()).isEqualTo("Listener listener1 is not INBOUND"); + } } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index 2ab702e5cc..cc9cd2402d 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -32,8 +32,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import com.google.protobuf.StringValue; +import io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.envoyproxy.envoy.config.core.v3.TrafficDirection; +import io.envoyproxy.envoy.config.listener.v3.FilterChain; +import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig; import io.grpc.BindableService; import io.grpc.ManagedChannel; @@ -64,6 +69,7 @@ import io.grpc.xds.XdsClient.LdsUpdate; import io.grpc.xds.XdsClient.RdsResourceWatcher; import io.grpc.xds.XdsClient.RdsUpdate; import io.grpc.xds.XdsClient.ResourceWatcher; +import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil; import java.io.IOException; import java.util.ArrayDeque; import java.util.Arrays; @@ -1284,6 +1290,71 @@ public abstract class ClientXdsClientTestBase { // See more test on LoadReportClientTest.java } + private static final String LISTENER_RESOURCE = + "grpc/server?xds.resource.listening_address=0.0.0.0:7000"; + + @Test + public void serverSideListenerFound() throws InvalidProtocolBufferException { + Assume.assumeTrue(useProtocolV3()); + ClientXdsClientTestBase.DiscoveryRpcCall call = + startResourceWatcher(LDS, LISTENER_RESOURCE, ldsResourceWatcher); + Listener listener = + buildListenerWithFilterChain( + LISTENER_RESOURCE, 7000, "0.0.0.0", "google-sds-config-default", "ROOTCA"); + List listeners = ImmutableList.of(Any.pack(listener)); + call.sendResponse(ResourceType.LDS, listeners, "0", "0000"); + // Client sends an ACK LDS request. + call.verifyRequest( + ResourceType.LDS, Collections.singletonList(LISTENER_RESOURCE), "0", "0000", NODE); + verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); + assertThat(ldsUpdateCaptor.getValue().listener) + .isEqualTo(EnvoyServerProtoData.Listener.fromEnvoyProtoListener(listener)); + + listener = + buildListenerWithFilterChain( + LISTENER_RESOURCE, 7000, "0.0.0.0", "CERT2", "ROOTCA2"); + listeners = ImmutableList.of(Any.pack(listener)); + call.sendResponse(ResourceType.LDS, listeners, "1", "0001"); + + // Client sends an ACK LDS request. + call.verifyRequest( + ResourceType.LDS, Collections.singletonList(LISTENER_RESOURCE), "1", "0001", NODE); + verify(ldsResourceWatcher, times(2)).onChanged(ldsUpdateCaptor.capture()); + assertThat(ldsUpdateCaptor.getValue().listener) + .isEqualTo(EnvoyServerProtoData.Listener.fromEnvoyProtoListener(listener)); + + assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + } + + @Test + public void serverSideListenerNotFound() { + Assume.assumeTrue(useProtocolV3()); + ClientXdsClientTestBase.DiscoveryRpcCall call = + startResourceWatcher(LDS, LISTENER_RESOURCE, ldsResourceWatcher); + final FilterChain filterChainInbound = + ServerXdsClientNewServerApiTest.buildFilterChain( + ServerXdsClientNewServerApiTest.buildFilterChainMatch("managed-mtls"), + CommonTlsContextTestsUtil.buildTestDownstreamTlsContext( + "google-sds-config-default", "ROOTCA"), + ServerXdsClientNewServerApiTest.buildTestFilter("envoy.http_connection_manager")); + Listener listener = + ServerXdsClientNewServerApiTest.buildListenerWithFilterChain( + "grpc/server?xds.resource.listening_address=0.0.0.0:8000", + 7000, + "0.0.0.0", + filterChainInbound); + List listeners = ImmutableList.of(Any.pack(listener)); + call.sendResponse(ResourceType.LDS, listeners, "0", "0000"); + // Client sends an ACK LDS request. + call.verifyRequest( + ResourceType.LDS, Collections.singletonList(LISTENER_RESOURCE), "0", "0000", NODE); + + verifyNoInteractions(ldsResourceWatcher); + fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); + verify(ldsResourceWatcher).onResourceDoesNotExist(LISTENER_RESOURCE); + assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + } + private DiscoveryRpcCall startResourceWatcher( ResourceType type, String name, ResourceWatcher watcher) { FakeClock.TaskFilter timeoutTaskFilter; @@ -1416,4 +1487,26 @@ public abstract class ClientXdsClientTestBase { protected abstract Message buildDropOverload(String category, int dropPerMillion); } + + static Listener buildListenerWithFilterChain( + String name, int portValue, String address, String certName, String validationContextName) { + FilterChain filterChain = + ServerXdsClientNewServerApiTest.buildFilterChain( + ServerXdsClientNewServerApiTest.buildFilterChainMatch(), + CommonTlsContextTestsUtil.buildTestDownstreamTlsContext( + certName, validationContextName), + ServerXdsClientNewServerApiTest.buildTestFilter("envoy.http_connection_manager")); + io.envoyproxy.envoy.config.core.v3.Address listenerAddress = + io.envoyproxy.envoy.config.core.v3.Address.newBuilder() + .setSocketAddress( + SocketAddress.newBuilder().setPortValue(portValue).setAddress(address)) + .build(); + return Listener.newBuilder() + .setName(name) + .setAddress(listenerAddress) + .setDefaultFilterChain(FilterChain.getDefaultInstance()) + .addAllFilterChains(Arrays.asList(filterChain)) + .setTrafficDirection(TrafficDirection.INBOUND) + .build(); + } } diff --git a/xds/src/test/java/io/grpc/xds/EnvoyServerProtoDataTest.java b/xds/src/test/java/io/grpc/xds/EnvoyServerProtoDataTest.java index 0ee203c09c..e725ca9f2e 100644 --- a/xds/src/test/java/io/grpc/xds/EnvoyServerProtoDataTest.java +++ b/xds/src/test/java/io/grpc/xds/EnvoyServerProtoDataTest.java @@ -24,6 +24,7 @@ import com.google.protobuf.UInt32Value; import io.envoyproxy.envoy.config.core.v3.Address; import io.envoyproxy.envoy.config.core.v3.CidrRange; import io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.envoyproxy.envoy.config.core.v3.TrafficDirection; import io.envoyproxy.envoy.config.core.v3.TransportSocket; import io.envoyproxy.envoy.config.listener.v3.Filter; import io.envoyproxy.envoy.config.listener.v3.FilterChain; @@ -58,6 +59,7 @@ public class EnvoyServerProtoDataTest { .addFilterChains(createOutFilter()) .addFilterChains(createInFilter()) .setDefaultFilterChain(createDefaultFilterChain()) + .setTrafficDirection(TrafficDirection.INBOUND) .build(); Listener xdsListener = Listener.fromEnvoyProtoListener(listener); diff --git a/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java b/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java index a76aa321e0..c92ee475a8 100644 --- a/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java +++ b/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java @@ -44,6 +44,7 @@ import io.envoyproxy.envoy.api.v2.Listener; import io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext; import io.envoyproxy.envoy.api.v2.core.CidrRange; import io.envoyproxy.envoy.api.v2.core.SocketAddress; +import io.envoyproxy.envoy.api.v2.core.TrafficDirection; import io.envoyproxy.envoy.api.v2.core.TransportSocket; import io.envoyproxy.envoy.api.v2.listener.Filter; import io.envoyproxy.envoy.api.v2.listener.FilterChain; @@ -775,6 +776,7 @@ public class ServerXdsClientTest { .setName(name) .setAddress(listenerAddress) .addAllFilterChains(Arrays.asList(filterChains)) + .setTrafficDirection(TrafficDirection.INBOUND) .build(); }