xds: multiple changes needed for PSM security GA as discussed (#7777)

* xds: multiple changes needed for GA:
- check to allow XdsServerBuilder.build() only once
- add transportBuilder() to XdsServerBuilder
- remove "grpc/server" hardcoding
- reorder the shutdown of delegate and xdsClient as per new design
This commit is contained in:
sanjaypujare 2021-01-06 16:44:34 -08:00 committed by GitHub
parent 43d2e53a2a
commit f788eec9e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 34 additions and 5 deletions

View File

@ -76,7 +76,7 @@ final class ServerXdsClient extends AbstractXdsClient {
super(channel, useProtocolV3, node, timeService, backoffPolicyProvider, stopwatchSupplier); super(channel, useProtocolV3, node, timeService, backoffPolicyProvider, stopwatchSupplier);
this.useNewApiForListenerQuery = useProtocolV3 && useNewApiForListenerQuery; this.useNewApiForListenerQuery = useProtocolV3 && useNewApiForListenerQuery;
this.instanceIp = (instanceIp != null ? instanceIp : "0.0.0.0"); this.instanceIp = (instanceIp != null ? instanceIp : "0.0.0.0");
this.grpcServerResourceId = grpcServerResourceId != null ? grpcServerResourceId : "grpc/server"; this.grpcServerResourceId = grpcServerResourceId;
} }
@Override @Override

View File

@ -114,6 +114,12 @@ public final class XdsClientWrapperForServerSds {
.keepAliveTime(5, TimeUnit.MINUTES).build(); .keepAliveTime(5, TimeUnit.MINUTES).build();
timeService = SharedResourceHolder.get(timeServiceResource); timeService = SharedResourceHolder.get(timeServiceResource);
newServerApi = serverInfo.isUseProtocolV3() && experimentalNewServerApiEnvVar; newServerApi = serverInfo.isUseProtocolV3() && experimentalNewServerApiEnvVar;
String grpcServerResourceId = bootstrapInfo.getGrpcServerResourceId();
if (newServerApi && grpcServerResourceId == null) {
reportError(
Status.INVALID_ARGUMENT.withDescription("missing grpc_server_resource_name_id value"));
throw new IOException("missing grpc_server_resource_name_id value");
}
XdsClient xdsClientImpl = XdsClient xdsClientImpl =
new ServerXdsClient( new ServerXdsClient(
channel, channel,
@ -124,7 +130,7 @@ public final class XdsClientWrapperForServerSds {
GrpcUtil.STOPWATCH_SUPPLIER, GrpcUtil.STOPWATCH_SUPPLIER,
experimentalNewServerApiEnvVar, experimentalNewServerApiEnvVar,
"0.0.0.0", "0.0.0.0",
bootstrapInfo.getGrpcServerResourceId()); grpcServerResourceId);
start(xdsClientImpl); start(xdsClientImpl);
} }

View File

@ -16,8 +16,11 @@
package io.grpc.xds; package io.grpc.xds;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.ExperimentalApi;
import io.grpc.ForwardingServerBuilder; import io.grpc.ForwardingServerBuilder;
import io.grpc.Internal; import io.grpc.Internal;
import io.grpc.Server; import io.grpc.Server;
@ -28,16 +31,19 @@ import io.grpc.netty.InternalNettyServerBuilder;
import io.grpc.netty.NettyServerBuilder; import io.grpc.netty.NettyServerBuilder;
import io.grpc.xds.internal.sds.SdsProtocolNegotiators; import io.grpc.xds.internal.sds.SdsProtocolNegotiators;
import io.grpc.xds.internal.sds.ServerWrapperForXds; import io.grpc.xds.internal.sds.ServerWrapperForXds;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* A version of {@link ServerBuilder} to create xDS managed servers that will use SDS to set up SSL * A version of {@link ServerBuilder} to create xDS managed servers that will use SDS to set up SSL
* with peers. Note, this is not ready to use yet. * with peers. Note, this is not ready to use yet.
*/ */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/7514")
public final class XdsServerBuilder extends ForwardingServerBuilder<XdsServerBuilder> { public final class XdsServerBuilder extends ForwardingServerBuilder<XdsServerBuilder> {
private final NettyServerBuilder delegate; private final NettyServerBuilder delegate;
private final int port; private final int port;
private ErrorNotifier errorNotifier; private ErrorNotifier errorNotifier;
private AtomicBoolean isServerBuilt = new AtomicBoolean(false);
private XdsServerBuilder(NettyServerBuilder nettyDelegate, int port) { private XdsServerBuilder(NettyServerBuilder nettyDelegate, int port) {
this.delegate = nettyDelegate; this.delegate = nettyDelegate;
@ -81,12 +87,17 @@ public final class XdsServerBuilder extends ForwardingServerBuilder<XdsServerBui
@VisibleForTesting @VisibleForTesting
ServerWrapperForXds buildServer( ServerWrapperForXds buildServer(
XdsClientWrapperForServerSds xdsClient) { XdsClientWrapperForServerSds xdsClient) {
checkState(isServerBuilt.compareAndSet(false, true), "Server already built!");
InternalNettyServerBuilder.eagAttributes(delegate, Attributes.newBuilder() InternalNettyServerBuilder.eagAttributes(delegate, Attributes.newBuilder()
.set(SdsProtocolNegotiators.SERVER_XDS_CLIENT, xdsClient) .set(SdsProtocolNegotiators.SERVER_XDS_CLIENT, xdsClient)
.build()); .build());
return new ServerWrapperForXds(delegate.build(), xdsClient, errorNotifier); return new ServerWrapperForXds(delegate.build(), xdsClient, errorNotifier);
} }
public ServerBuilder<?> transportBuilder() {
return delegate;
}
/** Watcher to receive error notifications from xDS control plane during {@code start()}. */ /** Watcher to receive error notifications from xDS control plane during {@code start()}. */
public interface ErrorNotifier { public interface ErrorNotifier {

View File

@ -113,15 +113,15 @@ public final class ServerWrapperForXds extends Server {
@Override @Override
public Server shutdown() { public Server shutdown() {
xdsClientWrapperForServerSds.shutdown();
delegate.shutdown(); delegate.shutdown();
xdsClientWrapperForServerSds.shutdown();
return this; return this;
} }
@Override @Override
public Server shutdownNow() { public Server shutdownNow() {
xdsClientWrapperForServerSds.shutdown();
delegate.shutdownNow(); delegate.shutdownNow();
xdsClientWrapperForServerSds.shutdown();
return this; return this;
} }

View File

@ -61,7 +61,7 @@ public class XdsServerBuilderTest {
private int port; private int port;
private XdsClientWrapperForServerSds xdsClientWrapperForServerSds; private XdsClientWrapperForServerSds xdsClientWrapperForServerSds;
private void buildServer( private XdsServerBuilder buildServer(
XdsServerBuilder.ErrorNotifier errorNotifier, boolean injectMockXdsClient) XdsServerBuilder.ErrorNotifier errorNotifier, boolean injectMockXdsClient)
throws IOException { throws IOException {
port = XdsServerTestHelper.findFreePort(); port = XdsServerTestHelper.findFreePort();
@ -78,6 +78,7 @@ public class XdsServerBuilderTest {
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds, mockXdsClient, port); XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds, mockXdsClient, port);
} }
xdsServer = cleanupRule.register(builder.buildServer(xdsClientWrapperForServerSds)); xdsServer = cleanupRule.register(builder.buildServer(xdsClientWrapperForServerSds));
return builder;
} }
private void verifyServer( private void verifyServer(
@ -273,4 +274,15 @@ public class XdsServerBuilderTest {
verifyShutdown(); verifyShutdown();
} }
@Test
public void xdsServer_2ndBuild_expectException() throws IOException {
XdsServerBuilder.ErrorNotifier mockErrorNotifier = mock(XdsServerBuilder.ErrorNotifier.class);
XdsServerBuilder builder = buildServer(mockErrorNotifier, true);
try {
builder.build();
fail("exception expected");
} catch (IllegalStateException expected) {
assertThat(expected).hasMessageThat().contains("Server already built!");
}
}
} }