mirror of https://github.com/grpc/grpc-java.git
xds: fix xdsClient resource not exist for invalid resource, fix xdsServerWrapper start on resource not exist (#8660)
Fix bugs: 1. Invalid resource at xdsClient, the watcher should have been delivered an error instead of resource not found. 2. If the resource is properly determined to not exist, it shouldn't cause start() to fail. From A36 xDS for Servers: "XdsServer's start must not fail due to transient xDS issues, like missing xDS configuration from the xDS server."
This commit is contained in:
parent
ab7f867a4a
commit
0b0079c8a1
|
|
@ -2209,13 +2209,15 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
|
||||||
}
|
}
|
||||||
retainedResources.add(edsName);
|
retainedResources.add(edsName);
|
||||||
}
|
}
|
||||||
continue;
|
} else if (invalidResources.contains(resourceName)) {
|
||||||
}
|
subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail));
|
||||||
|
} else {
|
||||||
// For State of the World services, notify watchers when their watched resource is missing
|
// For State of the World services, notify watchers when their watched resource is missing
|
||||||
// from the ADS update.
|
// from the ADS update.
|
||||||
subscriber.onAbsent();
|
subscriber.onAbsent();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// LDS/CDS responses represents the state of the world, RDS/EDS resources not referenced in
|
// LDS/CDS responses represents the state of the world, RDS/EDS resources not referenced in
|
||||||
// LDS/CDS resources should be deleted.
|
// LDS/CDS resources should be deleted.
|
||||||
if (type == ResourceType.LDS || type == ResourceType.CDS) {
|
if (type == ResourceType.LDS || type == ResourceType.CDS) {
|
||||||
|
|
|
||||||
|
|
@ -571,10 +571,6 @@ final class XdsServerWrapper extends Server {
|
||||||
for (SslContextProviderSupplier s: toRelease) {
|
for (SslContextProviderSupplier s: toRelease) {
|
||||||
s.close();
|
s.close();
|
||||||
}
|
}
|
||||||
if (!initialStarted) {
|
|
||||||
initialStarted = true;
|
|
||||||
initialStartFuture.set(exception);
|
|
||||||
}
|
|
||||||
if (restartTimer != null) {
|
if (restartTimer != null) {
|
||||||
restartTimer.cancel();
|
restartTimer.cancel();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1563,12 +1563,16 @@ public abstract class ClientXdsClientTestBase {
|
||||||
call.sendResponse(CDS, clusters, VERSION_1, "0000");
|
call.sendResponse(CDS, clusters, VERSION_1, "0000");
|
||||||
|
|
||||||
// The response NACKed with errors indicating indices of the failed resources.
|
// The response NACKed with errors indicating indices of the failed resources.
|
||||||
call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(
|
String errorMsg = "CDS response Cluster 'cluster.googleapis.com' validation error: "
|
||||||
"CDS response Cluster 'cluster.googleapis.com' validation error: "
|
|
||||||
+ "Cluster cluster.googleapis.com: malformed UpstreamTlsContext: "
|
+ "Cluster cluster.googleapis.com: malformed UpstreamTlsContext: "
|
||||||
+ "io.grpc.xds.ClientXdsClient$ResourceInvalidException: "
|
+ "io.grpc.xds.ClientXdsClient$ResourceInvalidException: "
|
||||||
+ "ca_certificate_provider_instance is required in upstream-tls-context"));
|
+ "ca_certificate_provider_instance is required in upstream-tls-context";
|
||||||
verifyNoInteractions(cdsResourceWatcher);
|
call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg));
|
||||||
|
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
|
||||||
|
verify(cdsResourceWatcher).onError(captor.capture());
|
||||||
|
Status errorStatus = captor.getValue();
|
||||||
|
assertThat(errorStatus.getCode()).isEqualTo(Status.UNAVAILABLE.getCode());
|
||||||
|
assertThat(errorStatus.getDescription()).isEqualTo(errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -1587,10 +1591,14 @@ public abstract class ClientXdsClientTestBase {
|
||||||
call.sendResponse(CDS, clusters, VERSION_1, "0000");
|
call.sendResponse(CDS, clusters, VERSION_1, "0000");
|
||||||
|
|
||||||
// The response NACKed with errors indicating indices of the failed resources.
|
// The response NACKed with errors indicating indices of the failed resources.
|
||||||
call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(
|
String errorMsg = "CDS response Cluster 'cluster.googleapis.com' validation error: "
|
||||||
"CDS response Cluster 'cluster.googleapis.com' validation error: "
|
+ "transport-socket with name envoy.transport_sockets.bad not supported.";
|
||||||
+ "transport-socket with name envoy.transport_sockets.bad not supported."));
|
call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg));
|
||||||
verifyNoInteractions(cdsResourceWatcher);
|
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
|
||||||
|
verify(cdsResourceWatcher).onError(captor.capture());
|
||||||
|
Status errorStatus = captor.getValue();
|
||||||
|
assertThat(errorStatus.getCode()).isEqualTo(Status.UNAVAILABLE.getCode());
|
||||||
|
assertThat(errorStatus.getDescription()).isEqualTo(errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -2438,10 +2446,15 @@ public abstract class ClientXdsClientTestBase {
|
||||||
List<Any> listeners = ImmutableList.of(Any.pack(listener));
|
List<Any> listeners = ImmutableList.of(Any.pack(listener));
|
||||||
call.sendResponse(ResourceType.LDS, listeners, "0", "0000");
|
call.sendResponse(ResourceType.LDS, listeners, "0", "0000");
|
||||||
// The response NACKed with errors indicating indices of the failed resources.
|
// The response NACKed with errors indicating indices of the failed resources.
|
||||||
call.verifyRequestNack(LDS, LISTENER_RESOURCE, "", "0000", NODE, ImmutableList.of(
|
String errorMsg = "LDS response Listener \'grpc/server?xds.resource.listening_address="
|
||||||
"LDS response Listener \'grpc/server?xds.resource.listening_address=0.0.0.0:7000\' "
|
+ "0.0.0.0:7000\' validation error: "
|
||||||
+ "validation error: common-tls-context is required in downstream-tls-context"));
|
+ "common-tls-context is required in downstream-tls-context";
|
||||||
verifyNoInteractions(ldsResourceWatcher);
|
call.verifyRequestNack(LDS, LISTENER_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg));
|
||||||
|
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
|
||||||
|
verify(ldsResourceWatcher).onError(captor.capture());
|
||||||
|
Status errorStatus = captor.getValue();
|
||||||
|
assertThat(errorStatus.getCode()).isEqualTo(Status.UNAVAILABLE.getCode());
|
||||||
|
assertThat(errorStatus.getDescription()).isEqualTo(errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -2462,11 +2475,16 @@ public abstract class ClientXdsClientTestBase {
|
||||||
List<Any> listeners = ImmutableList.of(Any.pack(listener));
|
List<Any> listeners = ImmutableList.of(Any.pack(listener));
|
||||||
call.sendResponse(ResourceType.LDS, listeners, "0", "0000");
|
call.sendResponse(ResourceType.LDS, listeners, "0", "0000");
|
||||||
// The response NACKed with errors indicating indices of the failed resources.
|
// The response NACKed with errors indicating indices of the failed resources.
|
||||||
|
String errorMsg = "LDS response Listener \'grpc/server?xds.resource.listening_address="
|
||||||
|
+ "0.0.0.0:7000\' validation error: "
|
||||||
|
+ "transport-socket with name envoy.transport_sockets.bad1 not supported.";
|
||||||
call.verifyRequestNack(LDS, LISTENER_RESOURCE, "", "0000", NODE, ImmutableList.of(
|
call.verifyRequestNack(LDS, LISTENER_RESOURCE, "", "0000", NODE, ImmutableList.of(
|
||||||
"LDS response Listener \'grpc/server?xds.resource.listening_address=0.0.0.0:7000\' "
|
errorMsg));
|
||||||
+ "validation error: "
|
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
|
||||||
+ "transport-socket with name envoy.transport_sockets.bad1 not supported."));
|
verify(ldsResourceWatcher).onError(captor.capture());
|
||||||
verifyNoInteractions(ldsResourceWatcher);
|
Status errorStatus = captor.getValue();
|
||||||
|
assertThat(errorStatus.getCode()).isEqualTo(Status.UNAVAILABLE.getCode());
|
||||||
|
assertThat(errorStatus.getDescription()).isEqualTo(errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
private DiscoveryRpcCall startResourceWatcher(
|
private DiscoveryRpcCall startResourceWatcher(
|
||||||
|
|
|
||||||
|
|
@ -63,15 +63,15 @@ import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
|
||||||
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
|
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
|
||||||
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
|
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
|
||||||
import io.netty.handler.codec.http2.Http2Settings;
|
import io.netty.handler.codec.http2.Http2Settings;
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
|
@ -190,8 +190,8 @@ public class XdsClientWrapperForServerSdsTestMisc {
|
||||||
try {
|
try {
|
||||||
start.get(5, TimeUnit.SECONDS);
|
start.get(5, TimeUnit.SECONDS);
|
||||||
fail("Start should throw exception");
|
fail("Start should throw exception");
|
||||||
} catch (ExecutionException ex) {
|
} catch (TimeoutException ex) {
|
||||||
assertThat(ex.getCause()).isInstanceOf(IOException.class);
|
assertThat(start.isDone()).isFalse();
|
||||||
}
|
}
|
||||||
assertThat(selectorManager.getSelectorToUpdateSelector()).isSameInstanceAs(NO_FILTER_CHAIN);
|
assertThat(selectorManager.getSelectorToUpdateSelector()).isSameInstanceAs(NO_FILTER_CHAIN);
|
||||||
}
|
}
|
||||||
|
|
@ -214,8 +214,8 @@ public class XdsClientWrapperForServerSdsTestMisc {
|
||||||
try {
|
try {
|
||||||
start.get(5, TimeUnit.SECONDS);
|
start.get(5, TimeUnit.SECONDS);
|
||||||
fail("Start should throw exception");
|
fail("Start should throw exception");
|
||||||
} catch (ExecutionException ex) {
|
} catch (TimeoutException ex) {
|
||||||
assertThat(ex.getCause()).isInstanceOf(IOException.class);
|
assertThat(start.isDone()).isFalse();
|
||||||
}
|
}
|
||||||
assertThat(selectorManager.getSelectorToUpdateSelector()).isSameInstanceAs(NO_FILTER_CHAIN);
|
assertThat(selectorManager.getSelectorToUpdateSelector()).isSameInstanceAs(NO_FILTER_CHAIN);
|
||||||
}
|
}
|
||||||
|
|
@ -238,8 +238,8 @@ public class XdsClientWrapperForServerSdsTestMisc {
|
||||||
try {
|
try {
|
||||||
start.get(5, TimeUnit.SECONDS);
|
start.get(5, TimeUnit.SECONDS);
|
||||||
fail("Start should throw exception");
|
fail("Start should throw exception");
|
||||||
} catch (ExecutionException ex) {
|
} catch (TimeoutException ex) {
|
||||||
assertThat(ex.getCause()).isInstanceOf(IOException.class);
|
assertThat(start.isDone()).isFalse();
|
||||||
}
|
}
|
||||||
assertThat(selectorManager.getSelectorToUpdateSelector()).isSameInstanceAs(NO_FILTER_CHAIN);
|
assertThat(selectorManager.getSelectorToUpdateSelector()).isSameInstanceAs(NO_FILTER_CHAIN);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -74,6 +74,7 @@ import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
|
|
@ -261,9 +262,10 @@ public class XdsServerWrapperTest {
|
||||||
xdsClient.ldsWatcher.onResourceDoesNotExist(ldsResource);
|
xdsClient.ldsWatcher.onResourceDoesNotExist(ldsResource);
|
||||||
try {
|
try {
|
||||||
start.get(5000, TimeUnit.MILLISECONDS);
|
start.get(5000, TimeUnit.MILLISECONDS);
|
||||||
fail("Start should throw exception");
|
fail("server should not start() successfully.");
|
||||||
} catch (ExecutionException ex) {
|
} catch (TimeoutException ex) {
|
||||||
assertThat(ex.getCause()).isInstanceOf(IOException.class);
|
// expect to block here.
|
||||||
|
assertThat(start.isDone()).isFalse();
|
||||||
}
|
}
|
||||||
verify(mockBuilder, times(1)).build();
|
verify(mockBuilder, times(1)).build();
|
||||||
verify(mockServer, never()).start();
|
verify(mockServer, never()).start();
|
||||||
|
|
@ -602,9 +604,10 @@ public class XdsServerWrapperTest {
|
||||||
xdsClient.ldsWatcher.onResourceDoesNotExist(ldsResource);
|
xdsClient.ldsWatcher.onResourceDoesNotExist(ldsResource);
|
||||||
try {
|
try {
|
||||||
start.get(5000, TimeUnit.MILLISECONDS);
|
start.get(5000, TimeUnit.MILLISECONDS);
|
||||||
fail("Start should throw exception");
|
fail("server should not start()");
|
||||||
} catch (ExecutionException ex) {
|
} catch (TimeoutException ex) {
|
||||||
assertThat(ex.getCause()).isInstanceOf(IOException.class);
|
// expect to block here.
|
||||||
|
assertThat(start.isDone()).isFalse();
|
||||||
}
|
}
|
||||||
verify(listener, times(1)).onNotServing(any(StatusException.class));
|
verify(listener, times(1)).onNotServing(any(StatusException.class));
|
||||||
verify(mockBuilder, times(1)).build();
|
verify(mockBuilder, times(1)).build();
|
||||||
|
|
@ -627,6 +630,13 @@ public class XdsServerWrapperTest {
|
||||||
assertThat(sslSupplier0.isShutdown()).isTrue();
|
assertThat(sslSupplier0.isShutdown()).isTrue();
|
||||||
xdsClient.deliverRdsUpdate("rds",
|
xdsClient.deliverRdsUpdate("rds",
|
||||||
Collections.singletonList(createVirtualHost("virtual-host-1")));
|
Collections.singletonList(createVirtualHost("virtual-host-1")));
|
||||||
|
try {
|
||||||
|
start.get(5000, TimeUnit.MILLISECONDS);
|
||||||
|
fail("Start should throw exception");
|
||||||
|
} catch (ExecutionException ex) {
|
||||||
|
assertThat(ex.getCause()).isInstanceOf(IOException.class);
|
||||||
|
assertThat(ex.getCause().getMessage()).isEqualTo("error!");
|
||||||
|
}
|
||||||
RdsResourceWatcher saveRdsWatcher = xdsClient.rdsWatchers.get("rds");
|
RdsResourceWatcher saveRdsWatcher = xdsClient.rdsWatchers.get("rds");
|
||||||
assertThat(executor.forwardNanos(RETRY_DELAY_NANOS)).isEqualTo(1);
|
assertThat(executor.forwardNanos(RETRY_DELAY_NANOS)).isEqualTo(1);
|
||||||
verify(mockBuilder, times(1)).build();
|
verify(mockBuilder, times(1)).build();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue