mirror of https://github.com/grpc/grpc-java.git
netty: change server to new protocol negotiator model
Changes: * PlaintextProtocolNegotiator is the same between client and server * ServerTlsHandler is rewritten to not handle errors * Also, it now sets the security level attribute, which I don't think it did previously * NettyServerTransport now uses WBAEH, similar to the client. I don't think the buffer is needed, but it does correctly handle errors during the startup
This commit is contained in:
parent
bacb741aaf
commit
f94b77c87f
|
|
@ -512,6 +512,8 @@ class NettyServerHandler extends AbstractNettyHandler {
|
||||||
Attributes attrs, InternalChannelz.Security securityInfo) {
|
Attributes attrs, InternalChannelz.Security securityInfo) {
|
||||||
negotiationAttributes = attrs;
|
negotiationAttributes = attrs;
|
||||||
this.securityInfo = securityInfo;
|
this.securityInfo = securityInfo;
|
||||||
|
super.handleProtocolNegotiationCompleted(attrs, securityInfo);
|
||||||
|
NettyClientHandler.writeBufferingAndRemove(ctx().channel());
|
||||||
}
|
}
|
||||||
|
|
||||||
InternalChannelz.Security getSecurityInfo() {
|
InternalChannelz.Security getSecurityInfo() {
|
||||||
|
|
|
||||||
|
|
@ -137,12 +137,14 @@ class NettyServerTransport implements ServerTransport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ChannelHandler negotiationHandler = protocolNegotiator.newHandler(grpcHandler);
|
||||||
|
ChannelHandler bufferingHandler = new WriteBufferingAndExceptionHandler(negotiationHandler);
|
||||||
|
|
||||||
ChannelFutureListener terminationNotifier = new TerminationNotifier();
|
ChannelFutureListener terminationNotifier = new TerminationNotifier();
|
||||||
channelUnused.addListener(terminationNotifier);
|
channelUnused.addListener(terminationNotifier);
|
||||||
channel.closeFuture().addListener(terminationNotifier);
|
channel.closeFuture().addListener(terminationNotifier);
|
||||||
|
|
||||||
ChannelHandler negotiationHandler = protocolNegotiator.newHandler(grpcHandler);
|
channel.pipeline().addLast(bufferingHandler);
|
||||||
channel.pipeline().addLast(negotiationHandler);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,6 @@ import io.grpc.Attributes;
|
||||||
import io.grpc.ChannelLogger;
|
import io.grpc.ChannelLogger;
|
||||||
import io.grpc.ChannelLogger.ChannelLogLevel;
|
import io.grpc.ChannelLogger.ChannelLogLevel;
|
||||||
import io.grpc.Grpc;
|
import io.grpc.Grpc;
|
||||||
import io.grpc.InternalChannelz;
|
|
||||||
import io.grpc.InternalChannelz.Security;
|
import io.grpc.InternalChannelz.Security;
|
||||||
import io.grpc.InternalChannelz.Tls;
|
import io.grpc.InternalChannelz.Tls;
|
||||||
import io.grpc.SecurityLevel;
|
import io.grpc.SecurityLevel;
|
||||||
|
|
@ -36,11 +35,9 @@ import io.netty.channel.ChannelDuplexHandler;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelFutureListener;
|
||||||
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelHandler;
|
||||||
import io.netty.channel.ChannelHandlerAdapter;
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelInboundHandler;
|
import io.netty.channel.ChannelInboundHandler;
|
||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
import io.netty.channel.ChannelPipeline;
|
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.handler.codec.http.DefaultHttpRequest;
|
import io.netty.handler.codec.http.DefaultHttpRequest;
|
||||||
import io.netty.handler.codec.http.HttpClientCodec;
|
import io.netty.handler.codec.http.HttpClientCodec;
|
||||||
|
|
@ -109,34 +106,7 @@ final class ProtocolNegotiators {
|
||||||
* Create a server plaintext handler for gRPC.
|
* Create a server plaintext handler for gRPC.
|
||||||
*/
|
*/
|
||||||
public static ProtocolNegotiator serverPlaintext() {
|
public static ProtocolNegotiator serverPlaintext() {
|
||||||
return new ProtocolNegotiator() {
|
return new PlaintextProtocolNegotiator();
|
||||||
@Override
|
|
||||||
public ChannelHandler newHandler(final GrpcHttp2ConnectionHandler handler) {
|
|
||||||
class PlaintextHandler extends ChannelHandlerAdapter {
|
|
||||||
@Override
|
|
||||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
// Set sttributes before replace to be sure we pass it before accepting any requests.
|
|
||||||
handler.handleProtocolNegotiationCompleted(Attributes.newBuilder()
|
|
||||||
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
|
|
||||||
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
|
|
||||||
.build(),
|
|
||||||
/*securityInfo=*/ null);
|
|
||||||
// Just replace this handler with the gRPC handler.
|
|
||||||
ctx.pipeline().replace(this, null, handler);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return new PlaintextHandler();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public AsciiString scheme() {
|
|
||||||
return Utils.HTTP;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -147,7 +117,10 @@ final class ProtocolNegotiators {
|
||||||
return new ProtocolNegotiator() {
|
return new ProtocolNegotiator() {
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler handler) {
|
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler handler) {
|
||||||
return new ServerTlsHandler(sslContext, handler);
|
ChannelHandler gnh = new GrpcNegotiationHandler(handler);
|
||||||
|
ChannelHandler sth = new ServerTlsHandler(gnh, sslContext);
|
||||||
|
ChannelHandler wauh = new WaitUntilActiveHandler(sth);
|
||||||
|
return wauh;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -161,67 +134,56 @@ final class ProtocolNegotiators {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
static final class ServerTlsHandler extends ChannelInboundHandlerAdapter {
|
static final class ServerTlsHandler extends ChannelInboundHandlerAdapter {
|
||||||
private final GrpcHttp2ConnectionHandler grpcHandler;
|
private final ChannelHandler next;
|
||||||
private final SslContext sslContext;
|
private final SslContext sslContext;
|
||||||
|
|
||||||
ServerTlsHandler(SslContext sslContext, GrpcHttp2ConnectionHandler grpcHandler) {
|
private ProtocolNegotiationEvent pne = ProtocolNegotiationEvent.DEFAULT;
|
||||||
this.sslContext = sslContext;
|
|
||||||
this.grpcHandler = grpcHandler;
|
ServerTlsHandler(ChannelHandler next, SslContext sslContext) {
|
||||||
|
this.sslContext = checkNotNull(sslContext, "sslContext");
|
||||||
|
this.next = checkNotNull(next, "next");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||||
super.handlerAdded(ctx);
|
super.handlerAdded(ctx);
|
||||||
|
|
||||||
SSLEngine sslEngine = sslContext.newEngine(ctx.alloc());
|
SSLEngine sslEngine = sslContext.newEngine(ctx.alloc());
|
||||||
ctx.pipeline().addFirst(new SslHandler(sslEngine, false));
|
ctx.pipeline().addBefore(ctx.name(), null, new SslHandler(sslEngine, false));
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
|
||||||
fail(ctx, cause);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||||
if (evt instanceof SslHandshakeCompletionEvent) {
|
if (evt instanceof ProtocolNegotiationEvent) {
|
||||||
|
pne = (ProtocolNegotiationEvent) evt;
|
||||||
|
} else if (evt instanceof SslHandshakeCompletionEvent) {
|
||||||
SslHandshakeCompletionEvent handshakeEvent = (SslHandshakeCompletionEvent) evt;
|
SslHandshakeCompletionEvent handshakeEvent = (SslHandshakeCompletionEvent) evt;
|
||||||
if (handshakeEvent.isSuccess()) {
|
if (!handshakeEvent.isSuccess()) {
|
||||||
if (NEXT_PROTOCOL_VERSIONS.contains(sslHandler(ctx.pipeline()).applicationProtocol())) {
|
logSslEngineDetails(Level.FINE, ctx, "TLS negotiation failed for new client.", null);
|
||||||
SSLSession session = sslHandler(ctx.pipeline()).engine().getSession();
|
ctx.fireExceptionCaught(handshakeEvent.cause());
|
||||||
// Successfully negotiated the protocol.
|
return;
|
||||||
// Notify about completion and pass down SSLSession in attributes.
|
|
||||||
grpcHandler.handleProtocolNegotiationCompleted(
|
|
||||||
Attributes.newBuilder()
|
|
||||||
.set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session)
|
|
||||||
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
|
|
||||||
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
|
|
||||||
.build(),
|
|
||||||
new InternalChannelz.Security(new InternalChannelz.Tls(session)));
|
|
||||||
// Replace this handler with the GRPC handler.
|
|
||||||
ctx.pipeline().replace(this, null, grpcHandler);
|
|
||||||
} else {
|
|
||||||
fail(ctx,
|
|
||||||
unavailableException(
|
|
||||||
"Failed protocol negotiation: Unable to find compatible protocol"));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
fail(ctx, handshakeEvent.cause());
|
|
||||||
}
|
}
|
||||||
|
SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
|
||||||
|
if (!NEXT_PROTOCOL_VERSIONS.contains(sslHandler.applicationProtocol())) {
|
||||||
|
logSslEngineDetails(Level.FINE, ctx, "TLS negotiation failed for new client.", null);
|
||||||
|
ctx.fireExceptionCaught(unavailableException(
|
||||||
|
"Failed protocol negotiation: Unable to find compatible protocol"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ctx.pipeline().replace(ctx.name(), null, next);
|
||||||
|
fireProtocolNegotiationEvent(ctx, sslHandler.engine().getSession());
|
||||||
|
} else {
|
||||||
|
super.userEventTriggered(ctx, evt);
|
||||||
}
|
}
|
||||||
super.userEventTriggered(ctx, evt);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private SslHandler sslHandler(ChannelPipeline pipeline) {
|
private void fireProtocolNegotiationEvent(ChannelHandlerContext ctx, SSLSession session) {
|
||||||
return pipeline.get(SslHandler.class);
|
Security security = new Security(new Tls(session));
|
||||||
}
|
Attributes attrs = pne.getAttributes().toBuilder()
|
||||||
|
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
|
||||||
@SuppressWarnings("FutureReturnValueIgnored")
|
.set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session)
|
||||||
private void fail(ChannelHandlerContext ctx, Throwable exception) {
|
.build();
|
||||||
logSslEngineDetails(Level.FINE, ctx, "TLS negotiation failed for new client.", exception);
|
ctx.fireUserEventTriggered(pne.withAttributes(attrs).withSecurity(security));
|
||||||
ctx.close();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -238,21 +238,10 @@ public class ProtocolNegotiatorsTest {
|
||||||
Object unused = ProtocolNegotiators.serverTls(null);
|
Object unused = ProtocolNegotiators.serverTls(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void tlsAdapter_exceptionClosesChannel() throws Exception {
|
|
||||||
ChannelHandler handler = new ServerTlsHandler(sslContext, grpcHandler);
|
|
||||||
|
|
||||||
// Use addFirst due to the funny error handling in EmbeddedChannel.
|
|
||||||
pipeline.addFirst(handler);
|
|
||||||
|
|
||||||
pipeline.fireExceptionCaught(new Exception("bad"));
|
|
||||||
|
|
||||||
assertFalse(channel.isOpen());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void tlsHandler_handlerAddedAddsSslHandler() throws Exception {
|
public void tlsHandler_handlerAddedAddsSslHandler() throws Exception {
|
||||||
ChannelHandler handler = new ServerTlsHandler(sslContext, grpcHandler);
|
ChannelHandler handler = new ServerTlsHandler(grpcHandler, sslContext);
|
||||||
|
|
||||||
pipeline.addLast(handler);
|
pipeline.addLast(handler);
|
||||||
|
|
||||||
|
|
@ -261,7 +250,7 @@ public class ProtocolNegotiatorsTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void tlsHandler_userEventTriggeredNonSslEvent() throws Exception {
|
public void tlsHandler_userEventTriggeredNonSslEvent() throws Exception {
|
||||||
ChannelHandler handler = new ServerTlsHandler(sslContext, grpcHandler);
|
ChannelHandler handler = new ServerTlsHandler(grpcHandler, sslContext);
|
||||||
pipeline.addLast(handler);
|
pipeline.addLast(handler);
|
||||||
channelHandlerCtx = pipeline.context(handler);
|
channelHandlerCtx = pipeline.context(handler);
|
||||||
Object nonSslEvent = new Object();
|
Object nonSslEvent = new Object();
|
||||||
|
|
@ -282,32 +271,52 @@ public class ProtocolNegotiatorsTest {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
ChannelHandler handler = new ServerTlsHandler(sslContext, grpcHandler);
|
ChannelHandler handler = new ServerTlsHandler(grpcHandler, sslContext);
|
||||||
pipeline.addLast(handler);
|
pipeline.addLast(handler);
|
||||||
|
|
||||||
|
final AtomicReference<Throwable> error = new AtomicReference<>();
|
||||||
|
ChannelHandler errorCapture = new ChannelInboundHandlerAdapter() {
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||||
|
error.set(cause);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
pipeline.addLast(errorCapture);
|
||||||
|
|
||||||
pipeline.replace(SslHandler.class, null, badSslHandler);
|
pipeline.replace(SslHandler.class, null, badSslHandler);
|
||||||
channelHandlerCtx = pipeline.context(handler);
|
channelHandlerCtx = pipeline.context(handler);
|
||||||
Object sslEvent = SslHandshakeCompletionEvent.SUCCESS;
|
Object sslEvent = SslHandshakeCompletionEvent.SUCCESS;
|
||||||
|
|
||||||
pipeline.fireUserEventTriggered(sslEvent);
|
pipeline.fireUserEventTriggered(sslEvent);
|
||||||
|
|
||||||
// No h2 protocol was specified, so this should be closed.
|
// No h2 protocol was specified, so there should be an error, (normally handled by WBAEH)
|
||||||
assertFalse(channel.isOpen());
|
assertThat(error.get()).hasMessageThat().contains("Unable to find compatible protocol");
|
||||||
ChannelHandlerContext grpcHandlerCtx = pipeline.context(grpcHandler);
|
ChannelHandlerContext grpcHandlerCtx = pipeline.context(grpcHandler);
|
||||||
assertNull(grpcHandlerCtx);
|
assertNull(grpcHandlerCtx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void tlsHandler_userEventTriggeredSslEvent_handshakeFailure() throws Exception {
|
public void tlsHandler_userEventTriggeredSslEvent_handshakeFailure() throws Exception {
|
||||||
ChannelHandler handler = new ServerTlsHandler(sslContext, grpcHandler);
|
ChannelHandler handler = new ServerTlsHandler(grpcHandler, sslContext);
|
||||||
pipeline.addLast(handler);
|
pipeline.addLast(handler);
|
||||||
channelHandlerCtx = pipeline.context(handler);
|
channelHandlerCtx = pipeline.context(handler);
|
||||||
Object sslEvent = new SslHandshakeCompletionEvent(new RuntimeException("bad"));
|
Object sslEvent = new SslHandshakeCompletionEvent(new RuntimeException("bad"));
|
||||||
|
|
||||||
|
final AtomicReference<Throwable> error = new AtomicReference<>();
|
||||||
|
ChannelHandler errorCapture = new ChannelInboundHandlerAdapter() {
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||||
|
error.set(cause);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
pipeline.addLast(errorCapture);
|
||||||
|
|
||||||
pipeline.fireUserEventTriggered(sslEvent);
|
pipeline.fireUserEventTriggered(sslEvent);
|
||||||
|
|
||||||
// No h2 protocol was specified, so this should be closed.
|
// No h2 protocol was specified, so there should be an error, (normally handled by WBAEH)
|
||||||
assertFalse(channel.isOpen());
|
assertThat(error.get()).hasMessageThat().contains("bad");
|
||||||
ChannelHandlerContext grpcHandlerCtx = pipeline.context(grpcHandler);
|
ChannelHandlerContext grpcHandlerCtx = pipeline.context(grpcHandler);
|
||||||
assertNull(grpcHandlerCtx);
|
assertNull(grpcHandlerCtx);
|
||||||
}
|
}
|
||||||
|
|
@ -321,7 +330,7 @@ public class ProtocolNegotiatorsTest {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
ChannelHandler handler = new ServerTlsHandler(sslContext, grpcHandler);
|
ChannelHandler handler = new ServerTlsHandler(grpcHandler, sslContext);
|
||||||
pipeline.addLast(handler);
|
pipeline.addLast(handler);
|
||||||
|
|
||||||
pipeline.replace(SslHandler.class, null, goodSslHandler);
|
pipeline.replace(SslHandler.class, null, goodSslHandler);
|
||||||
|
|
@ -344,7 +353,7 @@ public class ProtocolNegotiatorsTest {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
ChannelHandler handler = new ServerTlsHandler(sslContext, grpcHandler);
|
ChannelHandler handler = new ServerTlsHandler(grpcHandler, sslContext);
|
||||||
pipeline.addLast(handler);
|
pipeline.addLast(handler);
|
||||||
|
|
||||||
pipeline.replace(SslHandler.class, null, goodSslHandler);
|
pipeline.replace(SslHandler.class, null, goodSslHandler);
|
||||||
|
|
@ -360,7 +369,7 @@ public class ProtocolNegotiatorsTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void engineLog() {
|
public void engineLog() {
|
||||||
ChannelHandler handler = new ServerTlsHandler(sslContext, grpcHandler);
|
ChannelHandler handler = new ServerTlsHandler(grpcHandler, sslContext);
|
||||||
pipeline.addLast(handler);
|
pipeline.addLast(handler);
|
||||||
channelHandlerCtx = pipeline.context(handler);
|
channelHandlerCtx = pipeline.context(handler);
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue