mirror of https://github.com/grpc/grpc-java.git
xds: ensure we shutdown XdsClientImpl when XDS Server is shut down (#6890)
This commit is contained in:
parent
1086ee89c1
commit
e68b1d2ed6
|
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
package io.grpc.xds.internal.sds;
|
package io.grpc.xds.internal.sds;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkArgument;
|
||||||
import static com.google.common.base.Preconditions.checkNotNull;
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
@ -40,7 +41,7 @@ import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
import io.netty.handler.ssl.SslContext;
|
import io.netty.handler.ssl.SslContext;
|
||||||
import io.netty.util.AsciiString;
|
import io.netty.util.AsciiString;
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
|
|
@ -51,7 +52,8 @@ import javax.annotation.Nullable;
|
||||||
* Provides client and server side gRPC {@link ProtocolNegotiator}s that use SDS to provide the SSL
|
* Provides client and server side gRPC {@link ProtocolNegotiator}s that use SDS to provide the SSL
|
||||||
* context.
|
* context.
|
||||||
*/
|
*/
|
||||||
final class SdsProtocolNegotiators {
|
@VisibleForTesting
|
||||||
|
public final class SdsProtocolNegotiators {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(SdsProtocolNegotiators.class.getName());
|
private static final Logger logger = Logger.getLogger(SdsProtocolNegotiators.class.getName());
|
||||||
|
|
||||||
|
|
@ -77,7 +79,14 @@ final class SdsProtocolNegotiators {
|
||||||
public static ProtocolNegotiator serverProtocolNegotiator(
|
public static ProtocolNegotiator serverProtocolNegotiator(
|
||||||
@Nullable DownstreamTlsContext downstreamTlsContext, int port,
|
@Nullable DownstreamTlsContext downstreamTlsContext, int port,
|
||||||
SynchronizationContext syncContext) {
|
SynchronizationContext syncContext) {
|
||||||
return new ServerSdsProtocolNegotiator(downstreamTlsContext, port, syncContext);
|
XdsClientWrapperForServerSds xdsClientWrapperForServerSds =
|
||||||
|
ServerSdsProtocolNegotiator.getXdsClientWrapperForServerSds(port, syncContext);
|
||||||
|
if (xdsClientWrapperForServerSds == null && downstreamTlsContext == null) {
|
||||||
|
logger.log(Level.INFO, "Fallback to plaintext for server at port {0}", port);
|
||||||
|
return InternalProtocolNegotiators.serverPlaintext();
|
||||||
|
} else {
|
||||||
|
return new ServerSdsProtocolNegotiator(downstreamTlsContext, xdsClientWrapperForServerSds);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class ClientSdsProtocolNegotiatorFactory
|
private static final class ClientSdsProtocolNegotiatorFactory
|
||||||
|
|
@ -251,23 +260,32 @@ final class SdsProtocolNegotiators {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class ServerSdsProtocolNegotiator implements ProtocolNegotiator {
|
@VisibleForTesting
|
||||||
|
public static final class ServerSdsProtocolNegotiator implements ProtocolNegotiator {
|
||||||
|
|
||||||
private DownstreamTlsContext downstreamTlsContext;
|
@Nullable private final DownstreamTlsContext downstreamTlsContext;
|
||||||
private final XdsClientWrapperForServerSds xdsClientWrapperForServerSds;
|
@Nullable private final XdsClientWrapperForServerSds xdsClientWrapperForServerSds;
|
||||||
|
|
||||||
ServerSdsProtocolNegotiator(
|
/** Constructor. */
|
||||||
DownstreamTlsContext downstreamTlsContext, int port, SynchronizationContext syncContext) {
|
@VisibleForTesting
|
||||||
|
public ServerSdsProtocolNegotiator(
|
||||||
|
@Nullable DownstreamTlsContext downstreamTlsContext,
|
||||||
|
@Nullable XdsClientWrapperForServerSds xdsClientWrapperForServerSds) {
|
||||||
|
checkArgument(downstreamTlsContext != null || xdsClientWrapperForServerSds != null,
|
||||||
|
"both downstreamTlsContext and xdsClientWrapperForServerSds cannot be null");
|
||||||
this.downstreamTlsContext = downstreamTlsContext;
|
this.downstreamTlsContext = downstreamTlsContext;
|
||||||
XdsClientWrapperForServerSds localXdsClientWrapperForServerSds;
|
this.xdsClientWrapperForServerSds = xdsClientWrapperForServerSds;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static XdsClientWrapperForServerSds getXdsClientWrapperForServerSds(
|
||||||
|
int port, SynchronizationContext syncContext) {
|
||||||
try {
|
try {
|
||||||
localXdsClientWrapperForServerSds =
|
return XdsClientWrapperForServerSds.newInstance(
|
||||||
XdsClientWrapperForServerSds.newInstance(port, Bootstrapper.getInstance(), syncContext);
|
port, Bootstrapper.getInstance(), syncContext);
|
||||||
} catch (Exception e) {
|
} catch (IOException e) {
|
||||||
logger.log(Level.WARNING, "Exception while creating the xDS client", e);
|
logger.log(Level.FINE, "Fallback to plaintext due to exception", e);
|
||||||
localXdsClientWrapperForServerSds = null;
|
return null;
|
||||||
}
|
}
|
||||||
this.xdsClientWrapperForServerSds = localXdsClientWrapperForServerSds;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -282,7 +300,11 @@ final class SdsProtocolNegotiators {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {}
|
public void close() {
|
||||||
|
if (xdsClientWrapperForServerSds != null) {
|
||||||
|
xdsClientWrapperForServerSds.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
@ -294,8 +316,8 @@ final class SdsProtocolNegotiators {
|
||||||
|
|
||||||
HandlerPickerHandler(
|
HandlerPickerHandler(
|
||||||
GrpcHttp2ConnectionHandler grpcHandler,
|
GrpcHttp2ConnectionHandler grpcHandler,
|
||||||
DownstreamTlsContext downstreamTlsContext,
|
@Nullable DownstreamTlsContext downstreamTlsContext,
|
||||||
XdsClientWrapperForServerSds xdsClientWrapperForServerSds) {
|
@Nullable XdsClientWrapperForServerSds xdsClientWrapperForServerSds) {
|
||||||
checkNotNull(grpcHandler, "grpcHandler");
|
checkNotNull(grpcHandler, "grpcHandler");
|
||||||
this.grpcHandler = grpcHandler;
|
this.grpcHandler = grpcHandler;
|
||||||
this.downstreamTlsContextFromBuilder = downstreamTlsContext;
|
this.downstreamTlsContextFromBuilder = downstreamTlsContext;
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
package io.grpc.xds.internal.sds;
|
package io.grpc.xds.internal.sds;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext;
|
import io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext;
|
||||||
import io.grpc.BindableService;
|
import io.grpc.BindableService;
|
||||||
import io.grpc.CompressorRegistry;
|
import io.grpc.CompressorRegistry;
|
||||||
|
|
@ -29,8 +30,8 @@ import io.grpc.ServerServiceDefinition;
|
||||||
import io.grpc.ServerStreamTracer;
|
import io.grpc.ServerStreamTracer;
|
||||||
import io.grpc.ServerTransportFilter;
|
import io.grpc.ServerTransportFilter;
|
||||||
import io.grpc.SynchronizationContext;
|
import io.grpc.SynchronizationContext;
|
||||||
|
import io.grpc.netty.InternalProtocolNegotiator;
|
||||||
import io.grpc.netty.NettyServerBuilder;
|
import io.grpc.netty.NettyServerBuilder;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
|
@ -171,9 +172,20 @@ public final class XdsServerBuilder extends ServerBuilder<XdsServerBuilder> {
|
||||||
panicMode = true;
|
panicMode = true;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
delegate.protocolNegotiator(
|
InternalProtocolNegotiator.ProtocolNegotiator serverProtocolNegotiator =
|
||||||
SdsProtocolNegotiators.serverProtocolNegotiator(
|
SdsProtocolNegotiators.serverProtocolNegotiator(
|
||||||
this.downstreamTlsContext, port, syncContext));
|
this.downstreamTlsContext, port, syncContext);
|
||||||
|
return buildServer(serverProtocolNegotiator);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a Server using the given serverSdsProtocolNegotiator: gets the
|
||||||
|
* getXdsClientWrapperForServerSds from the serverSdsProtocolNegotiator.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public Server buildServer(
|
||||||
|
InternalProtocolNegotiator.ProtocolNegotiator serverProtocolNegotiator) {
|
||||||
|
delegate.protocolNegotiator(serverProtocolNegotiator);
|
||||||
return delegate.build();
|
return delegate.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,61 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2019 The gRPC Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.grpc.xds;
|
||||||
|
|
||||||
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
import io.grpc.Server;
|
||||||
|
import io.grpc.xds.internal.sds.SdsProtocolNegotiators.ServerSdsProtocolNegotiator;
|
||||||
|
import io.grpc.xds.internal.sds.XdsServerBuilder;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.JUnit4;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit tests for {@link XdsServerBuilder}.
|
||||||
|
*/
|
||||||
|
@RunWith(JUnit4.class)
|
||||||
|
public class XdsServerBuilderTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void buildsXdsServerBuilder() {
|
||||||
|
XdsServerBuilder builder = XdsServerBuilder.forPort(8080);
|
||||||
|
assertThat(builder).isInstanceOf(XdsServerBuilder.class);
|
||||||
|
Server server = builder.build();
|
||||||
|
assertThat(server).isNotNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void xdsServer_callsShutdown() throws IOException, InterruptedException {
|
||||||
|
XdsServerBuilder builder = XdsServerBuilder.forPort(8080);
|
||||||
|
XdsClient mockXdsClient = mock(XdsClient.class);
|
||||||
|
XdsClientWrapperForServerSds xdsClientWrapperForServerSds =
|
||||||
|
new XdsClientWrapperForServerSds(8080, mockXdsClient, null);
|
||||||
|
ServerSdsProtocolNegotiator serverSdsProtocolNegotiator =
|
||||||
|
new ServerSdsProtocolNegotiator(null, xdsClientWrapperForServerSds);
|
||||||
|
Server xdsServer = builder.buildServer(serverSdsProtocolNegotiator);
|
||||||
|
xdsServer.start();
|
||||||
|
xdsServer.shutdown();
|
||||||
|
xdsServer.awaitTermination(500L, TimeUnit.MILLISECONDS);
|
||||||
|
verify(mockXdsClient, times(1)).shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -30,6 +30,7 @@ import io.envoyproxy.envoy.api.v2.core.DataSource;
|
||||||
import io.grpc.internal.testing.TestUtils;
|
import io.grpc.internal.testing.TestUtils;
|
||||||
import io.grpc.netty.GrpcHttp2ConnectionHandler;
|
import io.grpc.netty.GrpcHttp2ConnectionHandler;
|
||||||
import io.grpc.netty.InternalProtocolNegotiationEvent;
|
import io.grpc.netty.InternalProtocolNegotiationEvent;
|
||||||
|
import io.grpc.netty.InternalProtocolNegotiator;
|
||||||
import io.grpc.xds.internal.sds.SdsProtocolNegotiators.ClientSdsHandler;
|
import io.grpc.xds.internal.sds.SdsProtocolNegotiators.ClientSdsHandler;
|
||||||
import io.grpc.xds.internal.sds.SdsProtocolNegotiators.ClientSdsProtocolNegotiator;
|
import io.grpc.xds.internal.sds.SdsProtocolNegotiators.ClientSdsProtocolNegotiator;
|
||||||
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelHandler;
|
||||||
|
|
@ -257,6 +258,14 @@ public class SdsProtocolNegotiatorsTest {
|
||||||
assertTrue(channel.isOpen());
|
assertTrue(channel.isOpen());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void serverSdsProtocolNegotiator_passNulls_expectPlaintext() {
|
||||||
|
InternalProtocolNegotiator.ProtocolNegotiator protocolNegotiator =
|
||||||
|
SdsProtocolNegotiators.serverProtocolNegotiator(null, 7000,
|
||||||
|
null);
|
||||||
|
assertThat(protocolNegotiator.scheme().toString()).isEqualTo("http");
|
||||||
|
}
|
||||||
|
|
||||||
private static final class FakeGrpcHttp2ConnectionHandler extends GrpcHttp2ConnectionHandler {
|
private static final class FakeGrpcHttp2ConnectionHandler extends GrpcHttp2ConnectionHandler {
|
||||||
|
|
||||||
FakeGrpcHttp2ConnectionHandler(
|
FakeGrpcHttp2ConnectionHandler(
|
||||||
|
|
|
||||||
|
|
@ -1,39 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2019 The gRPC Authors
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.grpc.xds.internal.sds;
|
|
||||||
|
|
||||||
import static com.google.common.truth.Truth.assertThat;
|
|
||||||
|
|
||||||
import io.grpc.Server;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.junit.runners.JUnit4;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Unit tests for {@link XdsChannelBuilder}.
|
|
||||||
*/
|
|
||||||
@RunWith(JUnit4.class)
|
|
||||||
public class XdsServerBuilderTest {
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void buildsXdsServerBuilder() {
|
|
||||||
XdsServerBuilder builder = XdsServerBuilder.forPort(8080);
|
|
||||||
assertThat(builder).isInstanceOf(XdsServerBuilder.class);
|
|
||||||
Server server = builder.build();
|
|
||||||
assertThat(server).isNotNull();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
Reference in New Issue