alts: convert handshaker service channel to SharedResourceHolder (#4802)

alts: convert handshaker service channel to SharedResourceHolder
This commit is contained in:
Jiangtao Li 2018-08-29 10:09:09 -07:00 committed by GitHub
parent 87513d8e83
commit 433ac00de4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 70 additions and 39 deletions

View File

@ -37,7 +37,9 @@ import io.grpc.alts.internal.RpcProtocolVersionsUtil;
import io.grpc.alts.internal.TsiHandshaker;
import io.grpc.alts.internal.TsiHandshakerFactory;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ProxyParameters;
import io.grpc.internal.SharedResourcePool;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.InternalNettyChannelBuilder.TransportCreationParamsFilter;
import io.grpc.netty.InternalNettyChannelBuilder.TransportCreationParamsFilterFactory;
@ -60,6 +62,8 @@ public final class AltsChannelBuilder extends ForwardingChannelBuilder<AltsChann
private final NettyChannelBuilder delegate;
private final AltsClientOptions.Builder handshakerOptionsBuilder =
new AltsClientOptions.Builder();
private ObjectPool<ManagedChannel> handshakerChannelPool =
SharedResourcePool.forResource(HandshakerServiceChannel.SHARED_HANDSHAKER_CHANNEL);
private TcpfFactory tcpfFactoryForTest;
private boolean enableUntrustedAlts;
@ -109,7 +113,10 @@ public final class AltsChannelBuilder extends ForwardingChannelBuilder<AltsChann
/** Sets a new handshaker service address for testing. */
public AltsChannelBuilder setHandshakerAddressForTesting(String handshakerAddress) {
HandshakerServiceChannel.setHandshakerAddressForTesting(handshakerAddress);
// Instead of using the default shared channel to the handshaker service, create a fix object
// pool of handshaker service channel for testing.
handshakerChannelPool =
HandshakerServiceChannel.getHandshakerChannelPoolForTesting(handshakerAddress);
return this;
}
@ -164,9 +171,11 @@ public final class AltsChannelBuilder extends ForwardingChannelBuilder<AltsChann
@Override
public TsiHandshaker newHandshaker() {
// Used the shared grpc channel to connecting to the ALTS handshaker service.
ManagedChannel channel = HandshakerServiceChannel.get();
// TODO: Release the channel if it is not used.
// https://github.com/grpc/grpc-java/issues/4755.
return AltsTsiHandshaker.newClient(
HandshakerServiceGrpc.newStub(channel), handshakerOptions);
HandshakerServiceGrpc.newStub(handshakerChannelPool.getObject()),
handshakerOptions);
}
};

View File

@ -21,6 +21,7 @@ import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.ExperimentalApi;
import io.grpc.HandlerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
@ -39,6 +40,8 @@ import io.grpc.alts.internal.HandshakerServiceGrpc;
import io.grpc.alts.internal.RpcProtocolVersionsUtil;
import io.grpc.alts.internal.TsiHandshaker;
import io.grpc.alts.internal.TsiHandshakerFactory;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import io.grpc.netty.NettyServerBuilder;
import java.io.File;
import java.net.InetSocketAddress;
@ -56,6 +59,8 @@ public final class AltsServerBuilder extends ServerBuilder<AltsServerBuilder> {
private static final Logger logger = Logger.getLogger(AltsServerBuilder.class.getName());
private final NettyServerBuilder delegate;
private ObjectPool<ManagedChannel> handshakerChannelPool =
SharedResourcePool.forResource(HandshakerServiceChannel.SHARED_HANDSHAKER_CHANNEL);
private boolean enableUntrustedAlts;
private AltsServerBuilder(NettyServerBuilder nettyDelegate) {
@ -85,7 +90,10 @@ public final class AltsServerBuilder extends ServerBuilder<AltsServerBuilder> {
/** Sets a new handshaker service address for testing. */
public AltsServerBuilder setHandshakerAddressForTesting(String handshakerAddress) {
HandshakerServiceChannel.setHandshakerAddressForTesting(handshakerAddress);
// Instead of using the default shared channel to the handshaker service, create a fix object
// pool of handshaker service channel for testing.
handshakerChannelPool =
HandshakerServiceChannel.getHandshakerChannelPoolForTesting(handshakerAddress);
return this;
}
@ -194,8 +202,10 @@ public final class AltsServerBuilder extends ServerBuilder<AltsServerBuilder> {
@Override
public TsiHandshaker newHandshaker() {
// Used the shared grpc channel to connecting to the ALTS handshaker service.
// TODO: Release the channel if it is not used.
// https://github.com/grpc/grpc-java/issues/4755.
return AltsTsiHandshaker.newServer(
HandshakerServiceGrpc.newStub(HandshakerServiceChannel.get()),
HandshakerServiceGrpc.newStub(handshakerChannelPool.getObject()),
new AltsHandshakerOptions(RpcProtocolVersionsUtil.getRpcProtocolVersions()));
}
}));

View File

@ -40,6 +40,7 @@ import io.grpc.alts.internal.TsiHandshakerFactory;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ProxyParameters;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.InternalNettyChannelBuilder.TransportCreationParamsFilter;
@ -115,7 +116,10 @@ public final class GoogleDefaultChannelBuilder
@Override
public TsiHandshaker newHandshaker() {
// Used the shared grpc channel to connecting to the ALTS handshaker service.
ManagedChannel channel = HandshakerServiceChannel.get();
// TODO: Release the channel if it is not used.
// https://github.com/grpc/grpc-java/issues/4755.
ManagedChannel channel =
SharedResourceHolder.get(HandshakerServiceChannel.SHARED_HANDSHAKER_CHANNEL);
return AltsTsiHandshaker.newClient(
HandshakerServiceGrpc.newStub(channel), handshakerOptions);
}

View File

@ -16,43 +16,58 @@
package io.grpc.alts;
import com.google.common.base.Preconditions;
import io.grpc.ManagedChannel;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.SharedResourceHolder.Resource;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ThreadFactory;
/**
* Class for creating a single shared grpc channel to the ALTS Handshaker Service. The channel to
* the handshaker service is local and is over plaintext. Each application will have at most one
* connection to the handshaker service.
*
* <p>TODO: Release the channel if it is not used. https://github.com/grpc/grpc-java/issues/4755.
* Class for creating a single shared gRPC channel to the ALTS Handshaker Service using
* SharedResourceHolder. The channel to the handshaker service is local and is over plaintext. Each
* application will have at most one connection to the handshaker service.
*/
final class HandshakerServiceChannel {
// Default handshaker service address.
private static String handshakerAddress = "metadata.google.internal:8080";
// Shared channel to ALTS handshaker service.
private static ManagedChannel channel = null;
// Construct me not!
private HandshakerServiceChannel() {}
static final Resource<ManagedChannel> SHARED_HANDSHAKER_CHANNEL =
new Resource<ManagedChannel>() {
// Sets handshaker service address for testing and creates the channel to the handshaker service.
public static synchronized void setHandshakerAddressForTesting(String handshakerAddress) {
Preconditions.checkState(
channel == null || HandshakerServiceChannel.handshakerAddress.equals(handshakerAddress),
"HandshakerServiceChannel already created with a different handshakerAddress");
HandshakerServiceChannel.handshakerAddress = handshakerAddress;
if (channel == null) {
channel = createChannel();
}
}
private EventLoopGroup eventGroup = null;
/** Create a new channel to ALTS handshaker service, if it has not been created yet. */
private static ManagedChannel createChannel() {
@Override
public ManagedChannel create() {
/* Use its own event loop thread pool to avoid blocking. */
if (eventGroup == null) {
eventGroup =
new NioEventLoopGroup(1, new DefaultThreadFactory("handshaker pool", true));
}
return NettyChannelBuilder.forTarget("metadata.google.internal:8080")
.directExecutor()
.eventLoopGroup(eventGroup)
.usePlaintext()
.build();
}
@Override
public void close(ManagedChannel instance) {
instance.shutdownNow();
if (eventGroup != null) {
eventGroup.shutdownGracefully();
}
}
@Override
public String toString() {
return "grpc-alts-handshaker-service-channel";
}
};
/** Returns a fixed object pool of handshaker service channel for testing only. */
static FixedObjectPool<ManagedChannel> getHandshakerChannelPoolForTesting(
String handshakerAddress) {
ThreadFactory clientThreadFactory = new DefaultThreadFactory("handshaker pool", true);
ManagedChannel channel =
NettyChannelBuilder.forTarget(handshakerAddress)
@ -60,13 +75,6 @@ final class HandshakerServiceChannel {
.eventLoopGroup(new NioEventLoopGroup(1, clientThreadFactory))
.usePlaintext()
.build();
return channel;
}
public static synchronized ManagedChannel get() {
if (channel == null) {
channel = createChannel();
}
return channel;
return new FixedObjectPool<ManagedChannel>(channel);
}
}