xds: promote XdsClientImpl2 (#7484)

Replace the old XdsClient implementation with the new one that supports watching multiple LDS/RDS resources separately.
This commit is contained in:
Chengyuan Zhang 2020-10-08 00:57:26 -07:00 committed by GitHub
parent 460ca75684
commit 18e7e2ddca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 42 additions and 5764 deletions

File diff suppressed because it is too large Load Diff

View File

@ -983,7 +983,8 @@ final class XdsClientImpl2 extends XdsClient {
}
}
private String typeUrlV2() {
@VisibleForTesting
String typeUrlV2() {
switch (this) {
case LDS:
return ADS_TYPE_URL_LDS_V2;

View File

@ -57,7 +57,7 @@ import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* Serves as a wrapper for {@link XdsClientImpl} used on the server side by {@link
* Serves as a wrapper for {@link XdsClient} used on the server side by {@link
* io.grpc.xds.internal.sds.XdsServerBuilder}.
*/
@Internal
@ -135,8 +135,8 @@ public final class XdsClientWrapperForServerSds {
}
Node node = bootstrapInfo.getNode();
timeService = SharedResourceHolder.get(timeServiceResource);
XdsClientImpl xdsClientImpl =
new XdsClientImpl(
XdsClientImpl2 xdsClientImpl =
new XdsClientImpl2(
"",
channel,
node,

File diff suppressed because it is too large Load Diff

View File

@ -66,10 +66,10 @@ import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.EnvoyProtoData.Address;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.ConfigWatcher;
import io.grpc.xds.XdsClient.ListenerUpdate;
import io.grpc.xds.XdsClient.ListenerWatcher;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsClientImpl2.ResourceType;
import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
import java.io.IOException;
import java.util.ArrayDeque;
@ -94,7 +94,7 @@ import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
/**
* Tests for {@link XdsClientImpl for server side Listeners}.
* Tests for {@link XdsClientImpl2 for server side Listeners}.
*/
@RunWith(JUnit4.class)
public class XdsClientImplTestForListener {
@ -111,7 +111,7 @@ public class XdsClientImplTestForListener {
new FakeClock.TaskFilter() {
@Override
public boolean shouldAccept(Runnable command) {
return command.toString().contains(XdsClientImpl.RpcRetryTask.class.getSimpleName());
return command.toString().contains(XdsClientImpl2.RpcRetryTask.class.getSimpleName());
}
};
private static final TaskFilter LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER =
@ -119,7 +119,7 @@ public class XdsClientImplTestForListener {
@Override
public boolean shouldAccept(Runnable command) {
return command.toString()
.contains(XdsClientImpl.ListenerResourceFetchTimeoutTask.class.getSimpleName());
.contains(XdsClientImpl2.ListenerResourceFetchTimeoutTask.class.getSimpleName());
}
};
private static final String LISTENER_NAME = "INBOUND_LISTENER";
@ -149,12 +149,10 @@ public class XdsClientImplTestForListener {
@Mock
private BackoffPolicy backoffPolicy2;
@Mock
private ConfigWatcher configWatcher;
@Mock
private ListenerWatcher listenerWatcher;
private ManagedChannel channel;
private XdsClientImpl xdsClient;
private XdsClientImpl2 xdsClient;
@Before
public void setUp() throws IOException {
@ -198,7 +196,7 @@ public class XdsClientImplTestForListener {
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
xdsClient =
new XdsClientImpl("", new XdsChannel(channel, /* useProtocolV3= */ false), NODE,
new XdsClientImpl2("", new XdsChannel(channel, /* useProtocolV3= */ false), NODE,
syncContext, fakeClock.getScheduledExecutorService(), backoffPolicyProvider,
fakeClock.getStopwatchSupplier());
// Only the connection to management server is established, no RPC request is sent until at
@ -238,34 +236,6 @@ public class XdsClientImplTestForListener {
.build();
}
/** Error when ConfigWatcher and then ListenerWatcher registered. */
@Test
public void ldsResponse_configAndListenerWatcher_expectError() {
xdsClient.watchConfigData("somehost:80", configWatcher);
try {
xdsClient.watchListenerData(PORT, listenerWatcher);
fail("expected exception");
} catch (IllegalStateException expected) {
assertThat(expected)
.hasMessageThat()
.isEqualTo("ListenerWatcher cannot be set when ConfigWatcher set");
}
}
/** Error when ListenerWatcher and then ConfigWatcher registered. */
@Test
public void ldsResponse_listenerAndConfigWatcher_expectError() {
xdsClient.watchListenerData(PORT, listenerWatcher);
try {
xdsClient.watchConfigData("somehost:80", configWatcher);
fail("expected exception");
} catch (IllegalStateException expected) {
assertThat(expected)
.hasMessageThat()
.isEqualTo("ListenerWatcher already registered");
}
}
/** Error when 2 ListenerWatchers registered. */
@Test
public void ldsResponse_2listenerWatchers_expectError() {
@ -292,7 +262,7 @@ public class XdsClientImplTestForListener {
// Client sends an LDS request with null in lds resource name
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
ResourceType.LDS.typeUrlV2(), "")));
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
List<Any> listeners = ImmutableList.of(
@ -315,18 +285,18 @@ public class XdsClientImplTestForListener {
"cluster-baz.googleapis.com"))))
.build()))));
DiscoveryResponse response =
buildDiscoveryResponseV2("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0000");
buildDiscoveryResponseV2("0", listeners, ResourceType.LDS.typeUrlV2(), "0000");
responseObserver.onNext(response);
// Client sends an ACK LDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0000")));
ResourceType.LDS.typeUrlV2(), "0000")));
verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class));
verify(listenerWatcher, never()).onResourceDoesNotExist(":" + PORT);
verify(listenerWatcher, never()).onError(any(Status.class));
fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
fakeClock.forwardTime(XdsClientImpl2.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
verify(listenerWatcher).onResourceDoesNotExist(":" + PORT);
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
}
@ -341,7 +311,7 @@ public class XdsClientImplTestForListener {
// Client sends an LDS request with null in lds resource name
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
ResourceType.LDS.typeUrlV2(), "")));
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
@ -367,18 +337,18 @@ public class XdsClientImplTestForListener {
filterChainInbound
)));
DiscoveryResponse response =
buildDiscoveryResponseV2("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0000");
buildDiscoveryResponseV2("0", listeners, ResourceType.LDS.typeUrlV2(), "0000");
responseObserver.onNext(response);
// Client sends an ACK LDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0000")));
ResourceType.LDS.typeUrlV2(), "0000")));
verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class));
verify(listenerWatcher, never()).onResourceDoesNotExist(":" + PORT);
verify(listenerWatcher, never()).onError(any(Status.class));
fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
fakeClock.forwardTime(XdsClientImpl2.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
verify(listenerWatcher).onResourceDoesNotExist(":" + PORT);
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
}
@ -393,7 +363,7 @@ public class XdsClientImplTestForListener {
// Client sends an LDS request with null in lds resource name
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
ResourceType.LDS.typeUrlV2(), "")));
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
@ -419,13 +389,13 @@ public class XdsClientImplTestForListener {
filterChainInbound
)));
DiscoveryResponse response =
buildDiscoveryResponseV2("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0000");
buildDiscoveryResponseV2("0", listeners, ResourceType.LDS.typeUrlV2(), "0000");
responseObserver.onNext(response);
// Client sends an ACK LDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0000")));
ResourceType.LDS.typeUrlV2(), "0000")));
ArgumentCaptor<ListenerUpdate> listenerUpdateCaptor = ArgumentCaptor.forClass(null);
verify(listenerWatcher, times(1)).onListenerChanged(listenerUpdateCaptor.capture());
@ -468,7 +438,7 @@ public class XdsClientImplTestForListener {
// Client sends an LDS request with null in lds resource name
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
ResourceType.LDS.typeUrlV2(), "")));
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
@ -494,13 +464,13 @@ public class XdsClientImplTestForListener {
filterChainInbound
)));
DiscoveryResponse response =
buildDiscoveryResponseV2("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0000");
buildDiscoveryResponseV2("0", listeners, ResourceType.LDS.typeUrlV2(), "0000");
responseObserver.onNext(response);
// Client sends an ACK LDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0000")));
ResourceType.LDS.typeUrlV2(), "0000")));
ArgumentCaptor<ListenerUpdate> listenerUpdateCaptor = ArgumentCaptor.forClass(null);
verify(listenerWatcher, times(1)).onListenerChanged(listenerUpdateCaptor.capture());
@ -517,13 +487,13 @@ public class XdsClientImplTestForListener {
filterChainNewInbound
)));
DiscoveryResponse response1 =
buildDiscoveryResponseV2("1", listeners1, XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0001");
buildDiscoveryResponseV2("1", listeners1, ResourceType.LDS.typeUrlV2(), "0001");
responseObserver.onNext(response1);
// Client sends an ACK LDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0001")));
ResourceType.LDS.typeUrlV2(), "0001")));
// Updated listener is notified to config watcher.
listenerUpdateCaptor = ArgumentCaptor.forClass(null);
@ -564,7 +534,7 @@ public class XdsClientImplTestForListener {
// Client sends an LDS request with null in lds resource name
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
ResourceType.LDS.typeUrlV2(), "")));
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(8000), null);
@ -590,13 +560,13 @@ public class XdsClientImplTestForListener {
filterChainOutbound
)));
DiscoveryResponse response =
buildDiscoveryResponseV2("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0000");
buildDiscoveryResponseV2("0", listeners, ResourceType.LDS.typeUrlV2(), "0000");
responseObserver.onNext(response);
// Client sends an ACK LDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0000")));
ResourceType.LDS.typeUrlV2(), "0000")));
verify(listenerWatcher, never()).onError(any(Status.class));
verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class));
@ -612,7 +582,7 @@ public class XdsClientImplTestForListener {
// Client sends an LDS request with null in lds resource name
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
ResourceType.LDS.typeUrlV2(), "")));
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(8000), null);
@ -639,18 +609,18 @@ public class XdsClientImplTestForListener {
filterChainOutbound
)));
DiscoveryResponse response =
buildDiscoveryResponseV2("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0000");
buildDiscoveryResponseV2("0", listeners, ResourceType.LDS.typeUrlV2(), "0000");
responseObserver.onNext(response);
// Client sends an ACK LDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0000")));
ResourceType.LDS.typeUrlV2(), "0000")));
verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class));
verify(listenerWatcher, never()).onResourceDoesNotExist(":" + PORT);
verify(listenerWatcher, never()).onError(any(Status.class));
fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
fakeClock.forwardTime(XdsClientImpl2.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
verify(listenerWatcher).onResourceDoesNotExist(":" + PORT);
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
}
@ -674,7 +644,7 @@ public class XdsClientImplTestForListener {
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
ResourceType.LDS.typeUrlV2(), "")));
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
@ -690,7 +660,7 @@ public class XdsClientImplTestForListener {
filterChainInbound
)));
DiscoveryResponse response =
buildDiscoveryResponseV2("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0000");
buildDiscoveryResponseV2("0", listeners, ResourceType.LDS.typeUrlV2(), "0000");
responseObserver.onNext(response);
// Client sent an ACK CDS request (Omitted).
@ -713,7 +683,7 @@ public class XdsClientImplTestForListener {
// Retry resumes requests for all wanted resources.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
ResourceType.LDS.typeUrlV2(), "")));
// Management server becomes unreachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
@ -732,7 +702,7 @@ public class XdsClientImplTestForListener {
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
ResourceType.LDS.typeUrlV2(), "")));
// Management server is still not reachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
@ -751,11 +721,11 @@ public class XdsClientImplTestForListener {
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
ResourceType.LDS.typeUrlV2(), "")));
// Management server sends back a LDS response.
response = buildDiscoveryResponseV2("1", listeners,
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0001");
ResourceType.LDS.typeUrlV2(), "0001");
responseObserver.onNext(response);
// Client sent an LDS ACK request (Omitted).
@ -774,7 +744,7 @@ public class XdsClientImplTestForListener {
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
ResourceType.LDS.typeUrlV2(), "")));
// Management server becomes unreachable again.
responseObserver.onError(Status.UNAVAILABLE.asException());
@ -792,7 +762,7 @@ public class XdsClientImplTestForListener {
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
ResourceType.LDS.typeUrlV2(), "")));
verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
backoffPolicy2);