alts: Make concurrent handshake limit part of ALTS instead of TSI

The handshake limit is more a property of ALTS than TSI. This allows
other TSI implementations to accept a high connection rate (b/179376431)
This commit is contained in:
Eric Anderson 2021-02-11 15:25:27 -08:00 committed by Eric Anderson
parent 3752b9e365
commit 1161ad9ed8
3 changed files with 95 additions and 46 deletions

View File

@ -50,6 +50,10 @@ import javax.annotation.Nullable;
// TODO(carl-mastrangelo): rename this AltsProtocolNegotiators.
public final class AltsProtocolNegotiator {
private static final Logger logger = Logger.getLogger(AltsProtocolNegotiator.class.getName());
// Avoid performing too many handshakes in parallel, as it may cause queuing in the handshake
// server and cause unbounded blocking on the event loop (b/168808426). This is a workaround until
// there is an async TSI handshaking API to avoid the blocking.
private static final AsyncSemaphore handshakeSemaphore = new AsyncSemaphore(32);
@Grpc.TransportAttr
public static final Attributes.Key<TsiPeer> TSI_PEER_KEY = Attributes.Key.create("TSI_PEER");
@ -110,8 +114,8 @@ public final class AltsProtocolNegotiator {
TsiHandshaker handshaker = handshakerFactory.newHandshaker(grpcHandler.getAuthority());
NettyTsiHandshaker nettyHandshaker = new NettyTsiHandshaker(handshaker);
ChannelHandler gnh = InternalProtocolNegotiators.grpcNegotiationHandler(grpcHandler);
ChannelHandler thh =
new TsiHandshakeHandler(gnh, nettyHandshaker, new AltsHandshakeValidator());
ChannelHandler thh = new TsiHandshakeHandler(
gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore);
ChannelHandler wuah = InternalProtocolNegotiators.waitUntilActiveHandler(thh);
return wuah;
}
@ -165,8 +169,8 @@ public final class AltsProtocolNegotiator {
TsiHandshaker handshaker = handshakerFactory.newHandshaker(/* authority= */ null);
NettyTsiHandshaker nettyHandshaker = new NettyTsiHandshaker(handshaker);
ChannelHandler gnh = InternalProtocolNegotiators.grpcNegotiationHandler(grpcHandler);
ChannelHandler thh =
new TsiHandshakeHandler(gnh, nettyHandshaker, new AltsHandshakeValidator());
ChannelHandler thh = new TsiHandshakeHandler(
gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore);
ChannelHandler wuah = InternalProtocolNegotiators.waitUntilActiveHandler(thh);
return wuah;
}
@ -259,8 +263,8 @@ public final class AltsProtocolNegotiator {
|| isXdsDirectPath) {
TsiHandshaker handshaker = handshakerFactory.newHandshaker(grpcHandler.getAuthority());
NettyTsiHandshaker nettyHandshaker = new NettyTsiHandshaker(handshaker);
securityHandler =
new TsiHandshakeHandler(gnh, nettyHandshaker, new AltsHandshakeValidator());
securityHandler = new TsiHandshakeHandler(
gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore);
} else {
securityHandler = InternalProtocolNegotiators.clientTlsHandler(
gnh, sslContext, grpcHandler.getAuthority());

View File

@ -0,0 +1,61 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.alts.internal;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.util.LinkedList;
import java.util.Queue;
import javax.annotation.concurrent.GuardedBy;
/** Provides a semaphore primitive, without blocking waiting on permits. */
final class AsyncSemaphore {
private final Object lock = new Object();
@SuppressWarnings("JdkObsolete") // LinkedList avoids high watermark memory issues
private final Queue<ChannelPromise> queue = new LinkedList<>();
@GuardedBy("lock")
private int permits;
public AsyncSemaphore(int permits) {
this.permits = permits;
}
public ChannelFuture acquire(ChannelHandlerContext ctx) {
synchronized (lock) {
if (permits > 0) {
permits--;
return ctx.newSucceededFuture();
}
ChannelPromise promise = ctx.newPromise();
queue.add(promise);
return promise;
}
}
public void release() {
ChannelPromise next;
synchronized (lock) {
next = queue.poll();
if (next == null) {
permits++;
return;
}
}
next.setSuccess();
}
}

View File

@ -35,12 +35,9 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.security.GeneralSecurityException;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import javax.annotation.Nullable;
/**
@ -82,14 +79,11 @@ public final class TsiHandshakeHandler extends ByteToMessageDecoder {
}
private static final int HANDSHAKE_FRAME_SIZE = 1024;
// Avoid performing too many handshakes in parallel, as it may cause queuing in the handshake
// server and cause unbounded blocking on the event loop (b/168808426). This is a workaround until
// there is an async TSI handshaking API to avoid the blocking.
private static final AsyncSemaphore semaphore = new AsyncSemaphore(32);
private final NettyTsiHandshaker handshaker;
private final HandshakeValidator handshakeValidator;
private final ChannelHandler next;
private final AsyncSemaphore semaphore;
private ProtocolNegotiationEvent pne;
private boolean semaphoreAcquired;
@ -99,9 +93,20 @@ public final class TsiHandshakeHandler extends ByteToMessageDecoder {
*/
public TsiHandshakeHandler(
ChannelHandler next, NettyTsiHandshaker handshaker, HandshakeValidator handshakeValidator) {
this(next, handshaker, handshakeValidator, null);
}
/**
* Constructs a TsHandshakeHandler. If a semaphore is provided, a permit from the semaphore is
* required to start the handshake and is returned when the handshake ends.
*/
public TsiHandshakeHandler(
ChannelHandler next, NettyTsiHandshaker handshaker, HandshakeValidator handshakeValidator,
AsyncSemaphore semaphore) {
this.handshaker = checkNotNull(handshaker, "handshaker");
this.handshakeValidator = checkNotNull(handshakeValidator, "handshakeValidator");
this.next = checkNotNull(next, "next");
this.semaphore = semaphore;
}
@Override
@ -152,7 +157,7 @@ public final class TsiHandshakeHandler extends ByteToMessageDecoder {
pne = (ProtocolNegotiationEvent) evt;
InternalProtocolNegotiators.negotiationLogger(ctx)
.log(ChannelLogLevel.INFO, "TsiHandshake started");
ChannelFuture acquire = semaphore.acquire(ctx);
ChannelFuture acquire = semaphoreAcquire(ctx);
if (acquire.isSuccess()) {
semaphoreAcquired = true;
sendHandshake(ctx);
@ -164,7 +169,7 @@ public final class TsiHandshakeHandler extends ByteToMessageDecoder {
return;
}
if (ctx.isRemoved()) {
semaphore.release();
semaphoreRelease();
return;
}
semaphoreAcquired = true;
@ -222,44 +227,23 @@ public final class TsiHandshakeHandler extends ByteToMessageDecoder {
@Override
protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
if (semaphoreAcquired) {
semaphore.release();
semaphoreRelease();
semaphoreAcquired = false;
}
handshaker.close();
}
private static class AsyncSemaphore {
private final Object lock = new Object();
@SuppressWarnings("JdkObsolete") // LinkedList avoids high watermark memory issues
private final Queue<ChannelPromise> queue = new LinkedList<>();
private int permits;
public AsyncSemaphore(int permits) {
this.permits = permits;
private ChannelFuture semaphoreAcquire(ChannelHandlerContext ctx) {
if (semaphore == null) {
return ctx.newSucceededFuture();
} else {
return semaphore.acquire(ctx);
}
}
public ChannelFuture acquire(ChannelHandlerContext ctx) {
synchronized (lock) {
if (permits > 0) {
permits--;
return ctx.newSucceededFuture();
}
ChannelPromise promise = ctx.newPromise();
queue.add(promise);
return promise;
}
}
public void release() {
ChannelPromise next;
synchronized (lock) {
next = queue.poll();
if (next == null) {
permits++;
return;
}
}
next.setSuccess();
private void semaphoreRelease() {
if (semaphore != null) {
semaphore.release();
}
}
}