From 1161ad9ed83d290b5561f91abaad6e2c287ac581 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Thu, 11 Feb 2021 15:25:27 -0800 Subject: [PATCH] alts: Make concurrent handshake limit part of ALTS instead of TSI The handshake limit is more a property of ALTS than TSI. This allows other TSI implementations to accept a high connection rate (b/179376431) --- .../alts/internal/AltsProtocolNegotiator.java | 16 +++-- .../io/grpc/alts/internal/AsyncSemaphore.java | 61 ++++++++++++++++++ .../alts/internal/TsiHandshakeHandler.java | 64 +++++++------------ 3 files changed, 95 insertions(+), 46 deletions(-) create mode 100644 alts/src/main/java/io/grpc/alts/internal/AsyncSemaphore.java diff --git a/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java b/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java index ce718f46f6..0ea1624812 100644 --- a/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java +++ b/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java @@ -50,6 +50,10 @@ import javax.annotation.Nullable; // TODO(carl-mastrangelo): rename this AltsProtocolNegotiators. public final class AltsProtocolNegotiator { private static final Logger logger = Logger.getLogger(AltsProtocolNegotiator.class.getName()); + // Avoid performing too many handshakes in parallel, as it may cause queuing in the handshake + // server and cause unbounded blocking on the event loop (b/168808426). This is a workaround until + // there is an async TSI handshaking API to avoid the blocking. + private static final AsyncSemaphore handshakeSemaphore = new AsyncSemaphore(32); @Grpc.TransportAttr public static final Attributes.Key TSI_PEER_KEY = Attributes.Key.create("TSI_PEER"); @@ -110,8 +114,8 @@ public final class AltsProtocolNegotiator { TsiHandshaker handshaker = handshakerFactory.newHandshaker(grpcHandler.getAuthority()); NettyTsiHandshaker nettyHandshaker = new NettyTsiHandshaker(handshaker); ChannelHandler gnh = InternalProtocolNegotiators.grpcNegotiationHandler(grpcHandler); - ChannelHandler thh = - new TsiHandshakeHandler(gnh, nettyHandshaker, new AltsHandshakeValidator()); + ChannelHandler thh = new TsiHandshakeHandler( + gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore); ChannelHandler wuah = InternalProtocolNegotiators.waitUntilActiveHandler(thh); return wuah; } @@ -165,8 +169,8 @@ public final class AltsProtocolNegotiator { TsiHandshaker handshaker = handshakerFactory.newHandshaker(/* authority= */ null); NettyTsiHandshaker nettyHandshaker = new NettyTsiHandshaker(handshaker); ChannelHandler gnh = InternalProtocolNegotiators.grpcNegotiationHandler(grpcHandler); - ChannelHandler thh = - new TsiHandshakeHandler(gnh, nettyHandshaker, new AltsHandshakeValidator()); + ChannelHandler thh = new TsiHandshakeHandler( + gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore); ChannelHandler wuah = InternalProtocolNegotiators.waitUntilActiveHandler(thh); return wuah; } @@ -259,8 +263,8 @@ public final class AltsProtocolNegotiator { || isXdsDirectPath) { TsiHandshaker handshaker = handshakerFactory.newHandshaker(grpcHandler.getAuthority()); NettyTsiHandshaker nettyHandshaker = new NettyTsiHandshaker(handshaker); - securityHandler = - new TsiHandshakeHandler(gnh, nettyHandshaker, new AltsHandshakeValidator()); + securityHandler = new TsiHandshakeHandler( + gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore); } else { securityHandler = InternalProtocolNegotiators.clientTlsHandler( gnh, sslContext, grpcHandler.getAuthority()); diff --git a/alts/src/main/java/io/grpc/alts/internal/AsyncSemaphore.java b/alts/src/main/java/io/grpc/alts/internal/AsyncSemaphore.java new file mode 100644 index 0000000000..3ccdcfc763 --- /dev/null +++ b/alts/src/main/java/io/grpc/alts/internal/AsyncSemaphore.java @@ -0,0 +1,61 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.alts.internal; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import java.util.LinkedList; +import java.util.Queue; +import javax.annotation.concurrent.GuardedBy; + +/** Provides a semaphore primitive, without blocking waiting on permits. */ +final class AsyncSemaphore { + private final Object lock = new Object(); + @SuppressWarnings("JdkObsolete") // LinkedList avoids high watermark memory issues + private final Queue queue = new LinkedList<>(); + @GuardedBy("lock") + private int permits; + + public AsyncSemaphore(int permits) { + this.permits = permits; + } + + public ChannelFuture acquire(ChannelHandlerContext ctx) { + synchronized (lock) { + if (permits > 0) { + permits--; + return ctx.newSucceededFuture(); + } + ChannelPromise promise = ctx.newPromise(); + queue.add(promise); + return promise; + } + } + + public void release() { + ChannelPromise next; + synchronized (lock) { + next = queue.poll(); + if (next == null) { + permits++; + return; + } + } + next.setSuccess(); + } +} diff --git a/alts/src/main/java/io/grpc/alts/internal/TsiHandshakeHandler.java b/alts/src/main/java/io/grpc/alts/internal/TsiHandshakeHandler.java index 1b4737f3cd..f2e19d3ded 100644 --- a/alts/src/main/java/io/grpc/alts/internal/TsiHandshakeHandler.java +++ b/alts/src/main/java/io/grpc/alts/internal/TsiHandshakeHandler.java @@ -35,12 +35,9 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; import io.netty.handler.codec.ByteToMessageDecoder; import java.security.GeneralSecurityException; -import java.util.LinkedList; import java.util.List; -import java.util.Queue; import javax.annotation.Nullable; /** @@ -82,14 +79,11 @@ public final class TsiHandshakeHandler extends ByteToMessageDecoder { } private static final int HANDSHAKE_FRAME_SIZE = 1024; - // Avoid performing too many handshakes in parallel, as it may cause queuing in the handshake - // server and cause unbounded blocking on the event loop (b/168808426). This is a workaround until - // there is an async TSI handshaking API to avoid the blocking. - private static final AsyncSemaphore semaphore = new AsyncSemaphore(32); private final NettyTsiHandshaker handshaker; private final HandshakeValidator handshakeValidator; private final ChannelHandler next; + private final AsyncSemaphore semaphore; private ProtocolNegotiationEvent pne; private boolean semaphoreAcquired; @@ -99,9 +93,20 @@ public final class TsiHandshakeHandler extends ByteToMessageDecoder { */ public TsiHandshakeHandler( ChannelHandler next, NettyTsiHandshaker handshaker, HandshakeValidator handshakeValidator) { + this(next, handshaker, handshakeValidator, null); + } + + /** + * Constructs a TsHandshakeHandler. If a semaphore is provided, a permit from the semaphore is + * required to start the handshake and is returned when the handshake ends. + */ + public TsiHandshakeHandler( + ChannelHandler next, NettyTsiHandshaker handshaker, HandshakeValidator handshakeValidator, + AsyncSemaphore semaphore) { this.handshaker = checkNotNull(handshaker, "handshaker"); this.handshakeValidator = checkNotNull(handshakeValidator, "handshakeValidator"); this.next = checkNotNull(next, "next"); + this.semaphore = semaphore; } @Override @@ -152,7 +157,7 @@ public final class TsiHandshakeHandler extends ByteToMessageDecoder { pne = (ProtocolNegotiationEvent) evt; InternalProtocolNegotiators.negotiationLogger(ctx) .log(ChannelLogLevel.INFO, "TsiHandshake started"); - ChannelFuture acquire = semaphore.acquire(ctx); + ChannelFuture acquire = semaphoreAcquire(ctx); if (acquire.isSuccess()) { semaphoreAcquired = true; sendHandshake(ctx); @@ -164,7 +169,7 @@ public final class TsiHandshakeHandler extends ByteToMessageDecoder { return; } if (ctx.isRemoved()) { - semaphore.release(); + semaphoreRelease(); return; } semaphoreAcquired = true; @@ -222,44 +227,23 @@ public final class TsiHandshakeHandler extends ByteToMessageDecoder { @Override protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { if (semaphoreAcquired) { - semaphore.release(); + semaphoreRelease(); semaphoreAcquired = false; } handshaker.close(); } - private static class AsyncSemaphore { - private final Object lock = new Object(); - @SuppressWarnings("JdkObsolete") // LinkedList avoids high watermark memory issues - private final Queue queue = new LinkedList<>(); - private int permits; - - public AsyncSemaphore(int permits) { - this.permits = permits; + private ChannelFuture semaphoreAcquire(ChannelHandlerContext ctx) { + if (semaphore == null) { + return ctx.newSucceededFuture(); + } else { + return semaphore.acquire(ctx); } + } - public ChannelFuture acquire(ChannelHandlerContext ctx) { - synchronized (lock) { - if (permits > 0) { - permits--; - return ctx.newSucceededFuture(); - } - ChannelPromise promise = ctx.newPromise(); - queue.add(promise); - return promise; - } - } - - public void release() { - ChannelPromise next; - synchronized (lock) { - next = queue.poll(); - if (next == null) { - permits++; - return; - } - } - next.setSuccess(); + private void semaphoreRelease() { + if (semaphore != null) { + semaphore.release(); } } }