xds: refactor XdsClient test to cover protocol version v2 and v3 (#7577)

This change refactors client side XdsClient's unit test. The main testing logic (test cases) will being the abstract class while the extended classes will be providing xDS version-specific services and messages. With this approach, we do not suffer from maintaining two copies of test logics in order to cover both v2 and v3 xDS protocols. So every time making changes to XdsClient's own logic, we only need to modify the corresponding test logic in the abstract class. Also, this approach could be sustainable for future xDS protocol version upgrades without necessity to re-implement test logics.
This commit is contained in:
Chengyuan Zhang 2020-11-04 13:47:27 -08:00 committed by GitHub
parent d7764d7e32
commit 8020a735f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 1392 additions and 1058 deletions

View File

@ -1,313 +0,0 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.xds.XdsClientTestHelper.buildClusterV2;
import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryRequestV2;
import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryResponseV2;
import static io.grpc.xds.XdsClientTestHelper.buildSecureClusterV2;
import static io.grpc.xds.XdsClientTestHelper.buildUpstreamTlsContextV2;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig;
import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceImplBase;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
import io.grpc.Context;
import io.grpc.Context.CancellationListener;
import io.grpc.ManagedChannel;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.FakeClock;
import io.grpc.internal.FakeClock.ScheduledTask;
import io.grpc.internal.FakeClock.TaskFilter;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.CdsResourceWatcher;
import io.grpc.xds.XdsClient.CdsUpdate;
import io.grpc.xds.XdsClient.EdsResourceWatcher;
import io.grpc.xds.XdsClient.LdsResourceWatcher;
import io.grpc.xds.XdsClient.RdsResourceWatcher;
import io.grpc.xds.XdsClient.ResourceWatcher;
import io.grpc.xds.XdsClient.XdsChannel;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
* Tests for {@link ClientXdsClient} for xDS v2.
*/
@RunWith(JUnit4.class)
public class ClientXdsClientTestV2 {
private static final String CDS_RESOURCE = "cluster.googleapis.com";
private static final Node NODE = Node.newBuilder().build();
private static final TaskFilter LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER =
new TaskFilter() {
@Override
public boolean shouldAccept(Runnable command) {
return command.toString().contains(ResourceType.LDS.toString());
}
};
private static final TaskFilter RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER =
new TaskFilter() {
@Override
public boolean shouldAccept(Runnable command) {
return command.toString().contains(ResourceType.RDS.toString());
}
};
private static final TaskFilter CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER =
new TaskFilter() {
@Override
public boolean shouldAccept(Runnable command) {
return command.toString().contains(ResourceType.CDS.toString());
}
};
private static final TaskFilter EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER =
new TaskFilter() {
@Override
public boolean shouldAccept(Runnable command) {
return command.toString().contains(ResourceType.EDS.toString());
}
};
@Rule
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
private final FakeClock fakeClock = new FakeClock();
private final Queue<RpcCall<DiscoveryRequest, DiscoveryResponse>> resourceDiscoveryCalls =
new ArrayDeque<>();
private final Queue<RpcCall<LoadStatsRequest, LoadStatsResponse>> loadReportCalls =
new ArrayDeque<>();
private final AtomicBoolean adsEnded = new AtomicBoolean(true);
private final AtomicBoolean lrsEnded = new AtomicBoolean(true);
@Captor
private ArgumentCaptor<CdsUpdate> cdsUpdateCaptor;
@Mock
private BackoffPolicy.Provider backoffPolicyProvider;
@Mock
private BackoffPolicy backoffPolicy1;
@Mock
private BackoffPolicy backoffPolicy2;
@Mock
private CdsResourceWatcher cdsResourceWatcher;
private ManagedChannel channel;
private ClientXdsClient xdsClient;
@Before
public void setUp() throws IOException {
MockitoAnnotations.initMocks(this);
when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L);
when(backoffPolicy2.nextBackoffNanos()).thenReturn(20L, 200L);
final String serverName = InProcessServerBuilder.generateName();
AggregatedDiscoveryServiceImplBase adsServiceImpl = new AggregatedDiscoveryServiceImplBase() {
@Override
public StreamObserver<DiscoveryRequest> streamAggregatedResources(
final StreamObserver<DiscoveryResponse> responseObserver) {
assertThat(adsEnded.get()).isTrue(); // ensure previous call was ended
adsEnded.set(false);
@SuppressWarnings("unchecked")
StreamObserver<DiscoveryRequest> requestObserver = mock(StreamObserver.class);
RpcCall<DiscoveryRequest, DiscoveryResponse> call =
new RpcCall<>(requestObserver, responseObserver);
resourceDiscoveryCalls.offer(call);
Context.current().addListener(
new CancellationListener() {
@Override
public void cancelled(Context context) {
adsEnded.set(true);
}
}, MoreExecutors.directExecutor());
return requestObserver;
}
};
LoadReportingServiceImplBase lrsServiceImpl = new LoadReportingServiceImplBase() {
@Override
public StreamObserver<LoadStatsRequest> streamLoadStats(
StreamObserver<LoadStatsResponse> responseObserver) {
assertThat(lrsEnded.get()).isTrue();
lrsEnded.set(false);
@SuppressWarnings("unchecked")
StreamObserver<LoadStatsRequest> requestObserver = mock(StreamObserver.class);
RpcCall<LoadStatsRequest, LoadStatsResponse> call =
new RpcCall<>(requestObserver, responseObserver);
Context.current().addListener(
new CancellationListener() {
@Override
public void cancelled(Context context) {
lrsEnded.set(true);
}
}, MoreExecutors.directExecutor());
loadReportCalls.offer(call);
return requestObserver;
}
};
cleanupRule.register(
InProcessServerBuilder
.forName(serverName)
.addService(adsServiceImpl)
.addService(lrsServiceImpl)
.directExecutor()
.build()
.start());
channel =
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
xdsClient =
new ClientXdsClient(
new XdsChannel(channel, /* useProtocolV3= */ false),
EnvoyProtoData.Node.newBuilder().build(),
fakeClock.getScheduledExecutorService(),
backoffPolicyProvider,
fakeClock.getStopwatchSupplier());
assertThat(resourceDiscoveryCalls).isEmpty();
assertThat(loadReportCalls).isEmpty();
}
@After
public void tearDown() {
xdsClient.shutdown();
assertThat(adsEnded.get()).isTrue();
assertThat(lrsEnded.get()).isTrue();
assertThat(channel.isShutdown()).isTrue();
assertThat(fakeClock.getPendingTasks()).isEmpty();
}
/**
* CDS response containing UpstreamTlsContext for a cluster.
*/
@Test
public void cdsResponseV2WithUpstreamTlsContext() {
RpcCall<DiscoveryRequest, DiscoveryResponse> call =
startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher);
// Management server sends back CDS response with UpstreamTlsContext.
io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext testUpstreamTlsContext =
buildUpstreamTlsContextV2("secret1", "unix:/var/uds2");
List<Any> clusters = ImmutableList.of(
Any.pack(buildClusterV2("cluster-bar.googleapis.com", null, false)),
Any.pack(buildSecureClusterV2(CDS_RESOURCE,
"eds-cluster-foo.googleapis.com", true, testUpstreamTlsContext)),
Any.pack(buildClusterV2("cluster-baz.googleapis.com", null, false)));
DiscoveryResponse response =
buildDiscoveryResponseV2("0", clusters, AbstractXdsClient.ADS_TYPE_URL_CDS_V2, "0000");
call.responseObserver.onNext(response);
// Client sent an ACK CDS request.
verify(call.requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", CDS_RESOURCE,
AbstractXdsClient.ADS_TYPE_URL_CDS_V2, "0000")));
verify(cdsResourceWatcher, times(1)).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = cdsUpdate
.getUpstreamTlsContext();
SdsSecretConfig validationContextSdsSecretConfig = upstreamTlsContext.getCommonTlsContext()
.getValidationContextSdsSecretConfig();
assertThat(validationContextSdsSecretConfig.getName()).isEqualTo("secret1");
assertThat(
Iterables.getOnlyElement(
validationContextSdsSecretConfig
.getSdsConfig()
.getApiConfigSource()
.getGrpcServicesList())
.getGoogleGrpc()
.getTargetUri())
.isEqualTo("unix:/var/uds2");
}
private RpcCall<DiscoveryRequest, DiscoveryResponse> startResourceWatcher(
ResourceType type, String name, ResourceWatcher watcher) {
TaskFilter timeoutTaskFilter;
switch (type) {
case LDS:
timeoutTaskFilter = LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER;
xdsClient.watchLdsResource(name, (LdsResourceWatcher) watcher);
break;
case RDS:
timeoutTaskFilter = RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER;
xdsClient.watchRdsResource(name, (RdsResourceWatcher) watcher);
break;
case CDS:
timeoutTaskFilter = CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER;
xdsClient.watchCdsResource(name, (CdsResourceWatcher) watcher);
break;
case EDS:
timeoutTaskFilter = EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER;
xdsClient.watchEdsResource(name, (EdsResourceWatcher) watcher);
break;
case UNKNOWN:
default:
throw new AssertionError("should never be here");
}
RpcCall<DiscoveryRequest, DiscoveryResponse> call = resourceDiscoveryCalls.poll();
verify(call.requestObserver).onNext(
eq(buildDiscoveryRequestV2(NODE, "", name, type.typeUrlV2(), "")));
ScheduledTask timeoutTask =
Iterables.getOnlyElement(fakeClock.getPendingTasks(timeoutTaskFilter));
assertThat(timeoutTask.getDelay(TimeUnit.SECONDS))
.isEqualTo(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC);
return call;
}
private static class RpcCall<ReqT, RespT> {
private final StreamObserver<ReqT> requestObserver;
private final StreamObserver<RespT> responseObserver;
RpcCall(StreamObserver<ReqT> requestObserver, StreamObserver<RespT> responseObserver) {
this.requestObserver = requestObserver;
this.responseObserver = responseObserver;
}
}
}

View File

@ -0,0 +1,505 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import com.google.protobuf.Message;
import com.google.protobuf.UInt32Value;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.api.v2.Cluster;
import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType;
import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig;
import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
import io.envoyproxy.envoy.api.v2.Listener;
import io.envoyproxy.envoy.api.v2.RouteConfiguration;
import io.envoyproxy.envoy.api.v2.auth.CommonTlsContext;
import io.envoyproxy.envoy.api.v2.auth.SdsSecretConfig;
import io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext;
import io.envoyproxy.envoy.api.v2.cluster.CircuitBreakers;
import io.envoyproxy.envoy.api.v2.cluster.CircuitBreakers.Thresholds;
import io.envoyproxy.envoy.api.v2.core.Address;
import io.envoyproxy.envoy.api.v2.core.AggregatedConfigSource;
import io.envoyproxy.envoy.api.v2.core.ApiConfigSource;
import io.envoyproxy.envoy.api.v2.core.ConfigSource;
import io.envoyproxy.envoy.api.v2.core.GrpcService;
import io.envoyproxy.envoy.api.v2.core.GrpcService.GoogleGrpc;
import io.envoyproxy.envoy.api.v2.core.HealthStatus;
import io.envoyproxy.envoy.api.v2.core.Locality;
import io.envoyproxy.envoy.api.v2.core.Node;
import io.envoyproxy.envoy.api.v2.core.RoutingPriority;
import io.envoyproxy.envoy.api.v2.core.SelfConfigSource;
import io.envoyproxy.envoy.api.v2.core.SocketAddress;
import io.envoyproxy.envoy.api.v2.core.TransportSocket;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
import io.envoyproxy.envoy.api.v2.endpoint.Endpoint;
import io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint;
import io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints;
import io.envoyproxy.envoy.api.v2.listener.FilterChain;
import io.envoyproxy.envoy.api.v2.route.Route;
import io.envoyproxy.envoy.api.v2.route.RouteAction;
import io.envoyproxy.envoy.api.v2.route.VirtualHost;
import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager;
import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds;
import io.envoyproxy.envoy.config.listener.v2.ApiListener;
import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc.LoadReportingServiceImplBase;
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse;
import io.envoyproxy.envoy.type.FractionalPercent;
import io.envoyproxy.envoy.type.FractionalPercent.DenominatorType;
import io.grpc.BindableService;
import io.grpc.Context;
import io.grpc.Context.CancellationListener;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
/**
* Tests for {@link ClientXdsClient} with protocol version v2.
*/
@RunWith(JUnit4.class)
public class ClientXdsClientV2Test extends ClientXdsClientTestBase {
@Override
protected BindableService createAdsService() {
return new AggregatedDiscoveryServiceImplBase() {
@Override
public StreamObserver<DiscoveryRequest> streamAggregatedResources(
final StreamObserver<DiscoveryResponse> responseObserver) {
assertThat(adsEnded.get()).isTrue(); // ensure previous call was ended
adsEnded.set(false);
@SuppressWarnings("unchecked")
StreamObserver<DiscoveryRequest> requestObserver = mock(StreamObserver.class);
DiscoveryRpcCall call = new DiscoveryRpcCallV2(requestObserver, responseObserver);
resourceDiscoveryCalls.offer(call);
Context.current().addListener(
new CancellationListener() {
@Override
public void cancelled(Context context) {
adsEnded.set(true);
}
}, MoreExecutors.directExecutor());
return requestObserver;
}
};
}
@Override
protected BindableService createLrsService() {
return new LoadReportingServiceImplBase() {
@Override
public StreamObserver<LoadStatsRequest> streamLoadStats(
StreamObserver<LoadStatsResponse> responseObserver) {
assertThat(lrsEnded.get()).isTrue();
lrsEnded.set(false);
@SuppressWarnings("unchecked")
StreamObserver<LoadStatsRequest> requestObserver = mock(StreamObserver.class);
LrsRpcCall call = new LrsRpcCallV2(requestObserver, responseObserver);
Context.current().addListener(
new CancellationListener() {
@Override
public void cancelled(Context context) {
lrsEnded.set(true);
}
}, MoreExecutors.directExecutor());
loadReportCalls.offer(call);
return requestObserver;
}
};
}
@Override
protected MessageFactory createMessageFactory() {
return new MessageFactoryV2();
}
@Override
protected boolean useProtocolV3() {
return false;
}
private static class DiscoveryRpcCallV2 extends DiscoveryRpcCall {
StreamObserver<DiscoveryRequest> requestObserver;
StreamObserver<DiscoveryResponse> responseObserver;
private DiscoveryRpcCallV2(StreamObserver<DiscoveryRequest> requestObserver,
StreamObserver<DiscoveryResponse> responseObserver) {
this.requestObserver = requestObserver;
this.responseObserver = responseObserver;
}
@Override
protected void verifyRequest(EnvoyProtoData.Node node, String versionInfo,
List<String> resources, ResourceType type, String nonce) {
verify(requestObserver).onNext(argThat(new DiscoveryRequestMatcher(
node.toEnvoyProtoNodeV2(), versionInfo, resources, type.typeUrlV2(), nonce)));
}
@Override
protected void verifyNoMoreRequest() {
verifyNoMoreInteractions(requestObserver);
}
@Override
protected void sendResponse(String versionInfo, List<Any> resources, ResourceType type,
String nonce) {
DiscoveryResponse response =
DiscoveryResponse.newBuilder()
.setVersionInfo(versionInfo)
.addAllResources(resources)
.setTypeUrl(type.typeUrl())
.setNonce(nonce)
.build();
responseObserver.onNext(response);
}
@Override
protected void sendError(Throwable t) {
responseObserver.onError(t);
}
@Override
protected void sendCompleted() {
responseObserver.onCompleted();
}
}
private static class LrsRpcCallV2 extends LrsRpcCall {
private final StreamObserver<LoadStatsRequest> requestObserver;
private final StreamObserver<LoadStatsResponse> responseObserver;
private final InOrder inOrder;
private LrsRpcCallV2(StreamObserver<LoadStatsRequest> requestObserver,
StreamObserver<LoadStatsResponse> responseObserver) {
this.requestObserver = requestObserver;
this.responseObserver = responseObserver;
inOrder = inOrder(requestObserver);
}
@Override
protected void verifyNextReportClusters(List<String[]> clusters) {
inOrder.verify(requestObserver).onNext(argThat(new LrsRequestMatcher(clusters)));
}
@Override
protected void sendResponse(List<String> clusters, long loadReportIntervalNano) {
LoadStatsResponse response =
LoadStatsResponse.newBuilder()
.addAllClusters(clusters)
.setLoadReportingInterval(Durations.fromNanos(loadReportIntervalNano))
.build();
responseObserver.onNext(response);
}
}
private static class MessageFactoryV2 extends MessageFactory {
@Override
protected Message buildListener(String name, Message routeConfiguration) {
return Listener.newBuilder()
.setName(name)
.setAddress(Address.getDefaultInstance())
.addFilterChains(FilterChain.getDefaultInstance())
.setApiListener(
ApiListener.newBuilder().setApiListener(Any.pack(
HttpConnectionManager.newBuilder()
.setRouteConfig((RouteConfiguration) routeConfiguration).build())))
.build();
}
@Override
protected Message buildListenerForRds(String name, String rdsResourceName) {
return Listener.newBuilder()
.setName(name)
.setAddress(Address.getDefaultInstance())
.addFilterChains(FilterChain.getDefaultInstance())
.setApiListener(
ApiListener.newBuilder().setApiListener(Any.pack(
HttpConnectionManager.newBuilder()
.setRds(
Rds.newBuilder()
.setRouteConfigName(rdsResourceName)
.setConfigSource(
ConfigSource.newBuilder()
.setAds(AggregatedConfigSource.getDefaultInstance())))
.build())))
.build();
}
@Override
protected Message buildRouteConfiguration(String name, List<Message> virtualHostList) {
RouteConfiguration.Builder builder = RouteConfiguration.newBuilder();
builder.setName(name);
for (Message virtualHost : virtualHostList) {
builder.addVirtualHosts((VirtualHost) virtualHost);
}
return builder.build();
}
@Override
protected List<Message> buildOpaqueVirtualHosts(int num) {
List<Message> virtualHosts = new ArrayList<>(num);
for (int i = 0; i < num; i++) {
VirtualHost virtualHost =
VirtualHost.newBuilder()
.setName(num + ": do not care")
.addDomains("do not care")
.addRoutes(
Route.newBuilder()
.setRoute(RouteAction.newBuilder().setCluster("do not care"))
.setMatch(io.envoyproxy.envoy.api.v2.route.RouteMatch.newBuilder()
.setPrefix("do not care")))
.build();
virtualHosts.add(virtualHost);
}
return virtualHosts;
}
@Override
protected Message buildCluster(String clusterName, @Nullable String edsServiceName,
boolean enableLrs, @Nullable Message upstreamTlsContext,
@Nullable Message circuitBreakers) {
Cluster.Builder builder = Cluster.newBuilder();
builder.setName(clusterName);
builder.setType(DiscoveryType.EDS);
EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder();
edsClusterConfigBuilder.setEdsConfig(
ConfigSource.newBuilder()
.setAds(AggregatedConfigSource.getDefaultInstance()));
if (edsServiceName != null) {
edsClusterConfigBuilder.setServiceName(edsServiceName);
}
builder.setEdsClusterConfig(edsClusterConfigBuilder);
builder.setLbPolicy(LbPolicy.ROUND_ROBIN);
if (enableLrs) {
builder.setLrsServer(
ConfigSource.newBuilder()
.setSelf(SelfConfigSource.getDefaultInstance()));
}
if (upstreamTlsContext != null) {
builder.setTransportSocket(
TransportSocket.newBuilder()
.setName("envoy.transport_sockets.tls")
.setTypedConfig(Any.pack(upstreamTlsContext)));
}
if (circuitBreakers != null) {
builder.setCircuitBreakers((CircuitBreakers) circuitBreakers);
}
return builder.build();
}
@Override
protected Message buildUpstreamTlsContext(String secretName, String targetUri) {
GrpcService grpcService =
GrpcService.newBuilder()
.setGoogleGrpc(GoogleGrpc.newBuilder().setTargetUri(targetUri))
.build();
ConfigSource sdsConfig =
ConfigSource.newBuilder()
.setApiConfigSource(ApiConfigSource.newBuilder().addGrpcServices(grpcService))
.build();
SdsSecretConfig validationContextSdsSecretConfig =
SdsSecretConfig.newBuilder()
.setName(secretName)
.setSdsConfig(sdsConfig)
.build();
return UpstreamTlsContext.newBuilder()
.setCommonTlsContext(
CommonTlsContext.newBuilder()
.setValidationContextSdsSecretConfig(validationContextSdsSecretConfig))
.build();
}
@Override
protected Message buildCircuitBreakers(int highPriorityMaxRequests,
int defaultPriorityMaxRequests) {
return CircuitBreakers.newBuilder()
.addThresholds(
Thresholds.newBuilder()
.setPriority(RoutingPriority.HIGH)
.setMaxRequests(UInt32Value.newBuilder().setValue(highPriorityMaxRequests)))
.addThresholds(
Thresholds.newBuilder()
.setPriority(RoutingPriority.DEFAULT)
.setMaxRequests(UInt32Value.newBuilder().setValue(defaultPriorityMaxRequests)))
.build();
}
@Override
protected Message buildClusterLoadAssignment(String cluster,
List<Message> localityLbEndpointsList, List<Message> dropOverloadList) {
ClusterLoadAssignment.Builder builder = ClusterLoadAssignment.newBuilder();
builder.setClusterName(cluster);
for (Message localityLbEndpoints : localityLbEndpointsList) {
builder.addEndpoints((LocalityLbEndpoints) localityLbEndpoints);
}
Policy.Builder policyBuilder = Policy.newBuilder();
for (Message dropOverload : dropOverloadList) {
policyBuilder.addDropOverloads((DropOverload) dropOverload);
}
builder.setPolicy(policyBuilder);
return builder.build();
}
@Override
protected Message buildLocalityLbEndpoints(String region, String zone, String subZone,
List<Message> lbEndpointList, int loadBalancingWeight, int priority) {
LocalityLbEndpoints.Builder builder = LocalityLbEndpoints.newBuilder();
builder.setLocality(
Locality.newBuilder().setRegion(region).setZone(zone).setSubZone(subZone));
for (Message lbEndpoint : lbEndpointList) {
builder.addLbEndpoints((LbEndpoint) lbEndpoint);
}
builder.setLoadBalancingWeight(UInt32Value.of(loadBalancingWeight));
builder.setPriority(priority);
return builder.build();
}
@Override
protected Message buildLbEndpoint(String address, int port, String healthStatus,
int lbWeight) {
HealthStatus status;
switch (healthStatus) {
case "unknown":
status = HealthStatus.UNKNOWN;
break;
case "healthy":
status = HealthStatus.HEALTHY;
break;
case "unhealthy":
status = HealthStatus.UNHEALTHY;
break;
case "draining":
status = HealthStatus.DRAINING;
break;
case "timeout":
status = HealthStatus.TIMEOUT;
break;
case "degraded":
status = HealthStatus.DEGRADED;
break;
default:
status = HealthStatus.UNRECOGNIZED;
}
return LbEndpoint.newBuilder()
.setEndpoint(
Endpoint.newBuilder().setAddress(
Address.newBuilder().setSocketAddress(
SocketAddress.newBuilder().setAddress(address).setPortValue(port))))
.setHealthStatus(status)
.setLoadBalancingWeight(UInt32Value.of(lbWeight))
.build();
}
@Override
protected Message buildDropOverload(String category, int dropPerMillion) {
return DropOverload.newBuilder()
.setCategory(category)
.setDropPercentage(
FractionalPercent.newBuilder()
.setNumerator(dropPerMillion)
.setDenominator(DenominatorType.MILLION))
.build();
}
}
/**
* Matches a {@link DiscoveryRequest} with the same node metadata, versionInfo, typeUrl,
* response nonce and collection of resource names regardless of order.
*/
private static class DiscoveryRequestMatcher implements ArgumentMatcher<DiscoveryRequest> {
private final Node node;
private final String versionInfo;
private final String typeUrl;
private final Set<String> resources;
private final String responseNonce;
private DiscoveryRequestMatcher(Node node, String versionInfo, List<String> resources,
String typeUrl, String responseNonce) {
this.node = node;
this.versionInfo = versionInfo;
this.resources = new HashSet<>(resources);
this.typeUrl = typeUrl;
this.responseNonce = responseNonce;
}
@Override
public boolean matches(DiscoveryRequest argument) {
if (!typeUrl.equals(argument.getTypeUrl())) {
return false;
}
if (!versionInfo.equals(argument.getVersionInfo())) {
return false;
}
if (!responseNonce.equals(argument.getResponseNonce())) {
return false;
}
if (!resources.equals(new HashSet<>(argument.getResourceNamesList()))) {
return false;
}
return node.equals(argument.getNode());
}
}
/**
* Matches a {@link LoadStatsRequest} containing a collection of {@link ClusterStats} with
* the same list of clusterName:clusterServiceName pair.
*/
private static class LrsRequestMatcher implements ArgumentMatcher<LoadStatsRequest> {
private final List<String> expected;
private LrsRequestMatcher(List<String[]> clusterNames) {
expected = new ArrayList<>();
for (String[] pair : clusterNames) {
expected.add(pair[0] + ":" + (pair[1] == null ? "" : pair[1]));
}
Collections.sort(expected);
}
@Override
public boolean matches(LoadStatsRequest argument) {
List<String> actual = new ArrayList<>();
for (ClusterStats clusterStats : argument.getClusterStatsList()) {
actual.add(clusterStats.getClusterName() + ":" + clusterStats.getClusterServiceName());
}
Collections.sort(actual);
return actual.equals(expected);
}
}
}

View File

@ -0,0 +1,505 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import com.google.protobuf.Message;
import com.google.protobuf.UInt32Value;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers;
import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy;
import io.envoyproxy.envoy.config.core.v3.Address;
import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource;
import io.envoyproxy.envoy.config.core.v3.ApiConfigSource;
import io.envoyproxy.envoy.config.core.v3.ConfigSource;
import io.envoyproxy.envoy.config.core.v3.GrpcService;
import io.envoyproxy.envoy.config.core.v3.GrpcService.GoogleGrpc;
import io.envoyproxy.envoy.config.core.v3.HealthStatus;
import io.envoyproxy.envoy.config.core.v3.Locality;
import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.config.core.v3.RoutingPriority;
import io.envoyproxy.envoy.config.core.v3.SelfConfigSource;
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
import io.envoyproxy.envoy.config.core.v3.TransportSocket;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy.DropOverload;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats;
import io.envoyproxy.envoy.config.endpoint.v3.Endpoint;
import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint;
import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints;
import io.envoyproxy.envoy.config.listener.v3.ApiListener;
import io.envoyproxy.envoy.config.listener.v3.FilterChain;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.envoyproxy.envoy.config.route.v3.Route;
import io.envoyproxy.envoy.config.route.v3.RouteAction;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.config.route.v3.RouteMatch;
import io.envoyproxy.envoy.config.route.v3.VirtualHost;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceImplBase;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
import io.envoyproxy.envoy.type.v3.FractionalPercent;
import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType;
import io.grpc.BindableService;
import io.grpc.Context;
import io.grpc.Context.CancellationListener;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
/**
* Tests for {@link ClientXdsClient} with protocol version v3.
*/
@RunWith(JUnit4.class)
public class ClientXdsClientV3Test extends ClientXdsClientTestBase {
@Override
protected BindableService createAdsService() {
return new AggregatedDiscoveryServiceImplBase() {
@Override
public StreamObserver<DiscoveryRequest> streamAggregatedResources(
final StreamObserver<DiscoveryResponse> responseObserver) {
assertThat(adsEnded.get()).isTrue(); // ensure previous call was ended
adsEnded.set(false);
@SuppressWarnings("unchecked")
StreamObserver<DiscoveryRequest> requestObserver = mock(StreamObserver.class);
DiscoveryRpcCall call = new DiscoveryRpcCallV3(requestObserver, responseObserver);
resourceDiscoveryCalls.offer(call);
Context.current().addListener(
new CancellationListener() {
@Override
public void cancelled(Context context) {
adsEnded.set(true);
}
}, MoreExecutors.directExecutor());
return requestObserver;
}
};
}
@Override
protected BindableService createLrsService() {
return new LoadReportingServiceImplBase() {
@Override
public StreamObserver<LoadStatsRequest> streamLoadStats(
StreamObserver<LoadStatsResponse> responseObserver) {
assertThat(lrsEnded.get()).isTrue();
lrsEnded.set(false);
@SuppressWarnings("unchecked")
StreamObserver<LoadStatsRequest> requestObserver = mock(StreamObserver.class);
LrsRpcCall call = new LrsRpcCallV3(requestObserver, responseObserver);
Context.current().addListener(
new CancellationListener() {
@Override
public void cancelled(Context context) {
lrsEnded.set(true);
}
}, MoreExecutors.directExecutor());
loadReportCalls.offer(call);
return requestObserver;
}
};
}
@Override
protected MessageFactory createMessageFactory() {
return new MessageFactoryV3();
}
@Override
protected boolean useProtocolV3() {
return true;
}
private static class DiscoveryRpcCallV3 extends DiscoveryRpcCall {
StreamObserver<DiscoveryRequest> requestObserver;
StreamObserver<DiscoveryResponse> responseObserver;
private DiscoveryRpcCallV3(StreamObserver<DiscoveryRequest> requestObserver,
StreamObserver<DiscoveryResponse> responseObserver) {
this.requestObserver = requestObserver;
this.responseObserver = responseObserver;
}
@Override
protected void verifyRequest(EnvoyProtoData.Node node, String versionInfo,
List<String> resources, ResourceType type, String nonce) {
verify(requestObserver).onNext(argThat(new DiscoveryRequestMatcher(
node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce)));
}
@Override
protected void verifyNoMoreRequest() {
verifyNoMoreInteractions(requestObserver);
}
@Override
protected void sendResponse(String versionInfo, List<Any> resources, ResourceType type,
String nonce) {
DiscoveryResponse response =
DiscoveryResponse.newBuilder()
.setVersionInfo(versionInfo)
.addAllResources(resources)
.setTypeUrl(type.typeUrl())
.setNonce(nonce)
.build();
responseObserver.onNext(response);
}
@Override
protected void sendError(Throwable t) {
responseObserver.onError(t);
}
@Override
protected void sendCompleted() {
responseObserver.onCompleted();
}
}
private static class LrsRpcCallV3 extends LrsRpcCall {
private final StreamObserver<LoadStatsRequest> requestObserver;
private final StreamObserver<LoadStatsResponse> responseObserver;
private final InOrder inOrder;
private LrsRpcCallV3(StreamObserver<LoadStatsRequest> requestObserver,
StreamObserver<LoadStatsResponse> responseObserver) {
this.requestObserver = requestObserver;
this.responseObserver = responseObserver;
inOrder = inOrder(requestObserver);
}
@Override
protected void verifyNextReportClusters(List<String[]> clusters) {
inOrder.verify(requestObserver).onNext(argThat(new LrsRequestMatcher(clusters)));
}
@Override
protected void sendResponse(List<String> clusters, long loadReportIntervalNano) {
LoadStatsResponse response =
LoadStatsResponse.newBuilder()
.addAllClusters(clusters)
.setLoadReportingInterval(Durations.fromNanos(loadReportIntervalNano))
.build();
responseObserver.onNext(response);
}
}
private static class MessageFactoryV3 extends MessageFactory {
@Override
protected Message buildListener(String name, Message routeConfiguration) {
return Listener.newBuilder()
.setName(name)
.setAddress(Address.getDefaultInstance())
.addFilterChains(FilterChain.getDefaultInstance())
.setApiListener(
ApiListener.newBuilder().setApiListener(Any.pack(
HttpConnectionManager.newBuilder()
.setRouteConfig((RouteConfiguration) routeConfiguration).build())))
.build();
}
@Override
protected Message buildListenerForRds(String name, String rdsResourceName) {
return Listener.newBuilder()
.setName(name)
.setAddress(Address.getDefaultInstance())
.addFilterChains(FilterChain.getDefaultInstance())
.setApiListener(
ApiListener.newBuilder().setApiListener(Any.pack(
HttpConnectionManager.newBuilder()
.setRds(
Rds.newBuilder()
.setRouteConfigName(rdsResourceName)
.setConfigSource(
ConfigSource.newBuilder()
.setAds(AggregatedConfigSource.getDefaultInstance())))
.build())))
.build();
}
@Override
protected Message buildRouteConfiguration(String name, List<Message> virtualHostList) {
RouteConfiguration.Builder builder = RouteConfiguration.newBuilder();
builder.setName(name);
for (Message virtualHost : virtualHostList) {
builder.addVirtualHosts((VirtualHost) virtualHost);
}
return builder.build();
}
@Override
protected List<Message> buildOpaqueVirtualHosts(int num) {
List<Message> virtualHosts = new ArrayList<>(num);
for (int i = 0; i < num; i++) {
VirtualHost virtualHost =
VirtualHost.newBuilder()
.setName(num + ": do not care")
.addDomains("do not care")
.addRoutes(
Route.newBuilder()
.setRoute(RouteAction.newBuilder().setCluster("do not care"))
.setMatch(RouteMatch.newBuilder().setPrefix("do not care")))
.build();
virtualHosts.add(virtualHost);
}
return virtualHosts;
}
@Override
protected Message buildCluster(String clusterName, @Nullable String edsServiceName,
boolean enableLrs, @Nullable Message upstreamTlsContext,
@Nullable Message circuitBreakers) {
Cluster.Builder builder = Cluster.newBuilder();
builder.setName(clusterName);
builder.setType(DiscoveryType.EDS);
EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder();
edsClusterConfigBuilder.setEdsConfig(
ConfigSource.newBuilder()
.setAds(AggregatedConfigSource.getDefaultInstance()));
if (edsServiceName != null) {
edsClusterConfigBuilder.setServiceName(edsServiceName);
}
builder.setEdsClusterConfig(edsClusterConfigBuilder);
builder.setLbPolicy(LbPolicy.ROUND_ROBIN);
if (enableLrs) {
builder.setLrsServer(
ConfigSource.newBuilder()
.setSelf(SelfConfigSource.getDefaultInstance()));
}
if (upstreamTlsContext != null) {
builder.setTransportSocket(
TransportSocket.newBuilder()
.setName("envoy.transport_sockets.tls")
.setTypedConfig(Any.pack(upstreamTlsContext)));
}
if (circuitBreakers != null) {
builder.setCircuitBreakers((CircuitBreakers) circuitBreakers);
}
return builder.build();
}
@Override
protected Message buildUpstreamTlsContext(String secretName, String targetUri) {
GrpcService grpcService =
GrpcService.newBuilder()
.setGoogleGrpc(GoogleGrpc.newBuilder().setTargetUri(targetUri))
.build();
ConfigSource sdsConfig =
ConfigSource.newBuilder()
.setApiConfigSource(ApiConfigSource.newBuilder().addGrpcServices(grpcService))
.build();
SdsSecretConfig validationContextSdsSecretConfig =
SdsSecretConfig.newBuilder()
.setName(secretName)
.setSdsConfig(sdsConfig)
.build();
return UpstreamTlsContext.newBuilder()
.setCommonTlsContext(
CommonTlsContext.newBuilder()
.setValidationContextSdsSecretConfig(validationContextSdsSecretConfig))
.build();
}
@Override
protected Message buildCircuitBreakers(int highPriorityMaxRequests,
int defaultPriorityMaxRequests) {
return CircuitBreakers.newBuilder()
.addThresholds(
Thresholds.newBuilder()
.setPriority(RoutingPriority.HIGH)
.setMaxRequests(UInt32Value.newBuilder().setValue(highPriorityMaxRequests)))
.addThresholds(
Thresholds.newBuilder()
.setPriority(RoutingPriority.DEFAULT)
.setMaxRequests(UInt32Value.newBuilder().setValue(defaultPriorityMaxRequests)))
.build();
}
@Override
protected Message buildClusterLoadAssignment(String cluster,
List<Message> localityLbEndpointsList, List<Message> dropOverloadList) {
ClusterLoadAssignment.Builder builder = ClusterLoadAssignment.newBuilder();
builder.setClusterName(cluster);
for (Message localityLbEndpoints : localityLbEndpointsList) {
builder.addEndpoints((LocalityLbEndpoints) localityLbEndpoints);
}
Policy.Builder policyBuilder = Policy.newBuilder();
for (Message dropOverload : dropOverloadList) {
policyBuilder.addDropOverloads((DropOverload) dropOverload);
}
builder.setPolicy(policyBuilder);
return builder.build();
}
@Override
protected Message buildLocalityLbEndpoints(String region, String zone, String subZone,
List<Message> lbEndpointList, int loadBalancingWeight, int priority) {
LocalityLbEndpoints.Builder builder = LocalityLbEndpoints.newBuilder();
builder.setLocality(
Locality.newBuilder().setRegion(region).setZone(zone).setSubZone(subZone));
for (Message lbEndpoint : lbEndpointList) {
builder.addLbEndpoints((LbEndpoint) lbEndpoint);
}
builder.setLoadBalancingWeight(UInt32Value.of(loadBalancingWeight));
builder.setPriority(priority);
return builder.build();
}
@Override
protected Message buildLbEndpoint(String address, int port, String healthStatus,
int lbWeight) {
HealthStatus status;
switch (healthStatus) {
case "unknown":
status = HealthStatus.UNKNOWN;
break;
case "healthy":
status = HealthStatus.HEALTHY;
break;
case "unhealthy":
status = HealthStatus.UNHEALTHY;
break;
case "draining":
status = HealthStatus.DRAINING;
break;
case "timeout":
status = HealthStatus.TIMEOUT;
break;
case "degraded":
status = HealthStatus.DEGRADED;
break;
default:
status = HealthStatus.UNRECOGNIZED;
}
return LbEndpoint.newBuilder()
.setEndpoint(
Endpoint.newBuilder().setAddress(
Address.newBuilder().setSocketAddress(
SocketAddress.newBuilder().setAddress(address).setPortValue(port))))
.setHealthStatus(status)
.setLoadBalancingWeight(UInt32Value.of(lbWeight))
.build();
}
@Override
protected Message buildDropOverload(String category, int dropPerMillion) {
return DropOverload.newBuilder()
.setCategory(category)
.setDropPercentage(
FractionalPercent.newBuilder()
.setNumerator(dropPerMillion)
.setDenominator(DenominatorType.MILLION))
.build();
}
}
/**
* Matches a {@link DiscoveryRequest} with the same node metadata, versionInfo, typeUrl,
* response nonce and collection of resource names regardless of order.
*/
private static class DiscoveryRequestMatcher implements ArgumentMatcher<DiscoveryRequest> {
private final Node node;
private final String versionInfo;
private final String typeUrl;
private final Set<String> resources;
private final String responseNonce;
private DiscoveryRequestMatcher(Node node, String versionInfo, List<String> resources,
String typeUrl, String responseNonce) {
this.node = node;
this.versionInfo = versionInfo;
this.resources = new HashSet<>(resources);
this.typeUrl = typeUrl;
this.responseNonce = responseNonce;
}
@Override
public boolean matches(DiscoveryRequest argument) {
if (!typeUrl.equals(argument.getTypeUrl())) {
return false;
}
if (!versionInfo.equals(argument.getVersionInfo())) {
return false;
}
if (!responseNonce.equals(argument.getResponseNonce())) {
return false;
}
if (!resources.equals(new HashSet<>(argument.getResourceNamesList()))) {
return false;
}
return node.equals(argument.getNode());
}
}
/**
* Matches a {@link LoadStatsRequest} containing a collection of {@link ClusterStats} with
* the same list of clusterName:clusterServiceName pair.
*/
private static class LrsRequestMatcher implements ArgumentMatcher<LoadStatsRequest> {
private final List<String> expected;
private LrsRequestMatcher(List<String[]> clusterNames) {
expected = new ArrayList<>();
for (String[] pair : clusterNames) {
expected.add(pair[0] + ":" + (pair[1] == null ? "" : pair[1]));
}
Collections.sort(expected);
}
@Override
public boolean matches(LoadStatsRequest argument) {
List<String> actual = new ArrayList<>();
for (ClusterStats clusterStats : argument.getClusterStatsList()) {
actual.add(clusterStats.getClusterName() + ":" + clusterStats.getClusterServiceName());
}
Collections.sort(actual);
return actual.equals(expected);
}
}
}

View File

@ -25,10 +25,7 @@ import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy;
import io.envoyproxy.envoy.config.core.v3.Address;
import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource;
import io.envoyproxy.envoy.config.core.v3.ApiConfigSource;
import io.envoyproxy.envoy.config.core.v3.ConfigSource;
import io.envoyproxy.envoy.config.core.v3.GrpcService;
import io.envoyproxy.envoy.config.core.v3.GrpcService.GoogleGrpc;
import io.envoyproxy.envoy.config.core.v3.HealthStatus;
import io.envoyproxy.envoy.config.core.v3.Locality;
import io.envoyproxy.envoy.config.core.v3.SelfConfigSource;
@ -48,21 +45,19 @@ import io.envoyproxy.envoy.config.route.v3.RouteAction;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.config.route.v3.RouteMatch;
import io.envoyproxy.envoy.config.route.v3.VirtualHost;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.envoyproxy.envoy.type.v3.FractionalPercent;
import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType;
import io.grpc.xds.EnvoyProtoData.Node;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
/**
* Helper methods for building protobuf messages with custom data for xDS protocols.
*/
// TODO(chengyuanzhang, sanjaypujare): delete this class, should not dump everything here.
class XdsClientTestHelper {
static DiscoveryResponse buildDiscoveryResponse(String versionInfo,
List<Any> resources, String typeUrl, String nonce) {
@ -86,11 +81,6 @@ class XdsClientTestHelper {
.build();
}
static DiscoveryRequest buildDiscoveryRequest(Node node, String versionInfo,
String resourceName, String typeUrl, String nonce) {
return buildDiscoveryRequest(node, versionInfo, ImmutableList.of(resourceName), typeUrl, nonce);
}
static DiscoveryRequest buildDiscoveryRequest(Node node, String versionInfo,
List<String> resourceNames, String typeUrl, String nonce) {
return
@ -161,23 +151,6 @@ class XdsClientTestHelper {
.build();
}
static List<VirtualHost> buildVirtualHosts(int num) {
List<VirtualHost> virtualHosts = new ArrayList<>(num);
for (int i = 0; i < num; i++) {
VirtualHost virtualHost =
VirtualHost.newBuilder()
.setName(num + ": do not care")
.addDomains("do not care")
.addRoutes(
Route.newBuilder()
.setRoute(RouteAction.newBuilder().setCluster("do not care"))
.setMatch(RouteMatch.newBuilder().setPrefix("do not care")))
.build();
virtualHosts.add(virtualHost);
}
return virtualHosts;
}
static VirtualHost buildVirtualHost(List<String> domains, String clusterName) {
return VirtualHost.newBuilder()
.setName("virtualhost00.googleapis.com") // don't care
@ -283,22 +256,6 @@ class XdsClientTestHelper {
.build();
}
@SuppressWarnings("deprecation") // disableOverprovisioning is deprecated by needed for v2
static io.envoyproxy.envoy.api.v2.ClusterLoadAssignment buildClusterLoadAssignmentV2(
String clusterName,
List<io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints> localityLbEndpoints,
List<io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload> dropOverloads) {
return
io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.newBuilder()
.setClusterName(clusterName)
.addAllEndpoints(localityLbEndpoints)
.setPolicy(
io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.newBuilder()
.setDisableOverprovisioning(true)
.addAllDropOverloads(dropOverloads))
.build();
}
static DropOverload buildDropOverload(String category, int dropPerMillion) {
return
DropOverload.newBuilder()
@ -384,49 +341,4 @@ class XdsClientTestHelper {
.setLoadBalancingWeight(UInt32Value.of(loadbalancingWeight))
.build();
}
static UpstreamTlsContext buildUpstreamTlsContext(String secretName, String targetUri) {
GrpcService grpcService =
GrpcService.newBuilder()
.setGoogleGrpc(GoogleGrpc.newBuilder().setTargetUri(targetUri))
.build();
ConfigSource sdsConfig =
ConfigSource.newBuilder()
.setApiConfigSource(ApiConfigSource.newBuilder().addGrpcServices(grpcService))
.build();
SdsSecretConfig validationContextSdsSecretConfig =
SdsSecretConfig.newBuilder()
.setName(secretName)
.setSdsConfig(sdsConfig)
.build();
return UpstreamTlsContext.newBuilder()
.setCommonTlsContext(
CommonTlsContext.newBuilder()
.setValidationContextSdsSecretConfig(validationContextSdsSecretConfig))
.build();
}
static io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext buildUpstreamTlsContextV2(
String secretName, String targetUri) {
io.envoyproxy.envoy.api.v2.core.GrpcService grpcService =
io.envoyproxy.envoy.api.v2.core.GrpcService.newBuilder()
.setGoogleGrpc(io.envoyproxy.envoy.api.v2.core.GrpcService.GoogleGrpc.newBuilder()
.setTargetUri(targetUri))
.build();
io.envoyproxy.envoy.api.v2.core.ConfigSource sdsConfig =
io.envoyproxy.envoy.api.v2.core.ConfigSource.newBuilder()
.setApiConfigSource(io.envoyproxy.envoy.api.v2.core.ApiConfigSource.newBuilder()
.addGrpcServices(grpcService))
.build();
io.envoyproxy.envoy.api.v2.auth.SdsSecretConfig validationContextSdsSecretConfig =
io.envoyproxy.envoy.api.v2.auth.SdsSecretConfig.newBuilder()
.setName(secretName)
.setSdsConfig(sdsConfig)
.build();
return io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext.newBuilder()
.setCommonTlsContext(
io.envoyproxy.envoy.api.v2.auth.CommonTlsContext.newBuilder()
.setValidationContextSdsSecretConfig(validationContextSdsSecretConfig))
.build();
}
}