mirror of https://github.com/grpc/grpc-java.git
netty: move buffering logic from ProtocolNegotiators
This is part 1 of a larger change to simplify channel initialization. Part 2 will let protocol negotiators install themselves in a deterministic manner and delegate error handling to the exception handler.

Changes:

1. Copied most of AbstractBufferingHandler to WriteBufferingAndExceptionHandler. Unlike AbstractBufferingHandler, WBAEH only supports installing a single next handler. Eventually, pipeline initialization will happen in the protocol negotiator rather than in each handler.
2. Added tests for error handling.
3. WBAEH is now always added to the pipeline by NettyClientTransport. This means that, for a brief period, there will be double buffering on the pipeline. The extra buffering should go away after part 2.
parent bcd2372280
commit 659b78b06c
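The core mechanism behind the change is a duplex handler that installs its delegate the moment it is added to the pipeline, so it can buffer outbound writes from the very first instant. The sketch below is illustrative only, not grpc-java code; the class name InstallingWrapper is hypothetical, while the real handler (WriteBufferingAndExceptionHandler, shown in full in the diff below) layers write buffering and error propagation on top of this pattern.

```java
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;

// Hypothetical minimal sketch of the self-installing wrapper pattern.
final class InstallingWrapper extends ChannelDuplexHandler {
  private final ChannelHandler next;

  InstallingWrapper(ChannelHandler next) {
    this.next = next;
  }

  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    // Install the delegate ahead of this handler. Outbound writes travel
    // tail-to-head, so this handler sees (and can buffer) them before the
    // delegate does.
    ctx.pipeline().addBefore(ctx.name(), null, next);
    super.handlerAdded(ctx);
  }
}
```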
NettyClientHandler.java:

```diff
@@ -371,7 +371,6 @@ class NettyClientHandler extends AbstractNettyHandler {
     }
   }
 
-
   /**
    * Handler for an inbound HTTP/2 RST_STREAM frame, terminating a stream.
    */
@@ -443,6 +442,12 @@ class NettyClientHandler extends AbstractNettyHandler {
     this.attributes = attributes;
     this.securityInfo = securityInfo;
     super.handleProtocolNegotiationCompleted(attributes, securityInfo);
+    // Once protocol negotiation is complete, release all writes and remove the buffer.
+    ChannelHandlerContext handlerCtx =
+        ctx().pipeline().context(WriteBufferingAndExceptionHandler.class);
+    if (handlerCtx != null) {
+      ((WriteBufferingAndExceptionHandler) handlerCtx.handler()).writeBufferedAndRemove(handlerCtx);
+    }
   }
 
   @Override
@@ -459,7 +464,6 @@ class NettyClientHandler extends AbstractNettyHandler {
     return securityInfo;
   }
 
-
   @Override
   protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
       Http2Exception http2Ex) {
```
NettyClientTransport.java:

```diff
@@ -45,6 +45,7 @@ import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
@@ -222,12 +223,14 @@ class NettyClientTransport implements ConnectionClientTransport {
       b.option((ChannelOption<Object>) entry.getKey(), entry.getValue());
     }
 
+    ChannelHandler bufferingHandler = new WriteBufferingAndExceptionHandler(negotiationHandler);
+
     /**
      * We don't use a ChannelInitializer in the client bootstrap because its "initChannel" method
      * is executed in the event loop and we need this handler to be in the pipeline immediately so
      * that it may begin buffering writes.
      */
-    b.handler(negotiationHandler);
+    b.handler(bufferingHandler);
     ChannelFuture regFuture = b.register();
     if (regFuture.isDone() && !regFuture.isSuccess()) {
       channel = null;
```
ProtocolNegotiators.java:

```diff
@@ -601,9 +601,7 @@ public final class ProtocolNegotiators {
       bufferedWrites = null;
     }
 
-    // In case something goes wrong ensure that the channel gets closed as the
-    // NettyClientTransport relies on the channel's close future to get completed.
-    ctx.close();
+    ctx.fireExceptionCaught(cause);
   }
 
   @SuppressWarnings("FutureReturnValueIgnored")
```
WriteBufferingAndExceptionHandler.java (new file, +189 lines):

```java
/*
 * Copyright 2019 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.netty;

import static com.google.common.base.Preconditions.checkNotNull;

import io.grpc.Status;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or
 * {@link #failWrites(Throwable)} is called. This handler allows us to write to a
 * {@link io.netty.channel.Channel} before we are officially allowed to write to it,
 * i.e. before it's active or the TLS handshake is complete.
 */
final class WriteBufferingAndExceptionHandler extends ChannelDuplexHandler {

  private final Queue<ChannelWrite> bufferedWrites = new ArrayDeque<>();
  private final ChannelHandler next;
  private boolean writing;
  private boolean flushRequested;
  private Throwable failCause;

  WriteBufferingAndExceptionHandler(ChannelHandler next) {
    this.next = checkNotNull(next, "next");
  }

  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    ctx.pipeline().addBefore(ctx.name(), null, next);
    super.handlerAdded(ctx);
  }

  /**
   * If this channel becomes inactive, then notify all buffered writes that we failed.
   */
  @Override
  public void channelInactive(ChannelHandlerContext ctx) {
    Status status = Status.UNAVAILABLE.withDescription(
        "Connection closed while performing protocol negotiation for " + ctx.pipeline().names());
    failWrites(status.asRuntimeException());
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    Status status = Utils.statusFromThrowable(cause);
    failWrites(status.asRuntimeException());
    if (ctx.channel().isActive()) {
      ctx.close();
    }
  }

  /**
   * Buffers the write until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} is
   * called, or we have somehow failed. If we have already failed in the past, then the write
   * will fail immediately.
   */
  @Override
  @SuppressWarnings("FutureReturnValueIgnored")
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    if (failCause != null) {
      promise.setFailure(failCause);
      ReferenceCountUtil.release(msg);
    } else {
      bufferedWrites.add(new ChannelWrite(msg, promise));
    }
  }

  /**
   * Connect failures do not show up as {@link #channelInactive} or {@link #exceptionCaught}, so
   * the connect promise needs to be watched.
   */
  @Override
  public void connect(
      ChannelHandlerContext ctx,
      SocketAddress remoteAddress,
      SocketAddress localAddress,
      ChannelPromise promise) throws Exception {
    final class ConnectListener implements ChannelFutureListener {
      @Override
      public void operationComplete(ChannelFuture future) {
        if (!future.isSuccess()) {
          failWrites(future.cause());
        }
      }
    }

    super.connect(ctx, remoteAddress, localAddress, promise);
    promise.addListener(new ConnectListener());
  }

  /**
   * Calls to this method will not trigger an immediate flush. The flush will be deferred until
   * {@link #writeBufferedAndRemove(ChannelHandlerContext)}.
   */
  @Override
  public void flush(ChannelHandlerContext ctx) {
    /**
     * Swallowing any flushes is not only an optimization but also required
     * for the SslHandler to work correctly. If the SslHandler receives multiple
     * flushes while the handshake is still ongoing, then the handshake "randomly"
     * times out. Not sure at this point why this is happening. Doing a single flush
     * seems to work but multiple flushes don't ...
     */
    flushRequested = true;
  }

  /**
   * If we are still performing protocol negotiation, then this will propagate failures to all
   * buffered writes.
   */
  @Override
  public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
    Status status = Status.UNAVAILABLE.withDescription(
        "Connection closing while performing protocol negotiation for " + ctx.pipeline().names());
    failWrites(status.asRuntimeException());
    super.close(ctx, future);
  }

  @SuppressWarnings("FutureReturnValueIgnored")
  final void writeBufferedAndRemove(ChannelHandlerContext ctx) {
    // TODO(carl-mastrangelo): remove the isActive check and just fail if not yet ready.
    if (!ctx.channel().isActive() || writing) {
      return;
    }
    // Make sure that method can't be reentered, so that the ordering
    // in the queue can't be messed up.
    writing = true;
    while (!bufferedWrites.isEmpty()) {
      ChannelWrite write = bufferedWrites.poll();
      ctx.write(write.msg, write.promise);
    }
    if (flushRequested) {
      ctx.flush();
    }
    // Removal has to happen last as the above writes will likely trigger
    // new writes that have to be added to the end of queue in order to not
    // mess up the ordering.
    ctx.pipeline().remove(this);
  }

  /**
   * Propagate failures to all buffered writes.
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  private void failWrites(Throwable cause) {
    if (failCause == null) {
      failCause = cause;
    }
    while (!bufferedWrites.isEmpty()) {
      ChannelWrite write = bufferedWrites.poll();
      write.promise.setFailure(cause);
      ReferenceCountUtil.release(write.msg);
    }
  }

  private static final class ChannelWrite {
    final Object msg;
    final ChannelPromise promise;

    ChannelWrite(Object msg, ChannelPromise promise) {
      this.msg = msg;
      this.promise = promise;
    }
  }
}
```
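To make the buffering contract concrete, here is a small usage sketch with Netty's EmbeddedChannel. It is not part of the commit: it assumes the demo class is placed in the io.grpc.netty package (the handler is package-private) and simply mirrors what the tests below assert — writes are held until writeBufferedAndRemove releases them and the handler removes itself.

```java
package io.grpc.netty;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;

// Illustrative demo only (run with -ea for the asserts); not grpc-java code.
public final class WriteBufferingDemo {
  public static void main(String[] args) {
    EmbeddedChannel chan = new EmbeddedChannel();
    WriteBufferingAndExceptionHandler handler =
        new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {});
    chan.pipeline().addLast(handler);

    // The write (and its flush) are buffered inside the handler, so nothing
    // reaches the channel yet.
    chan.writeAndFlush("hello");
    assert chan.outboundMessages().isEmpty();

    // Releasing the buffer writes everything through, performs the deferred
    // flush, and removes the handler from the pipeline.
    ChannelHandlerContext ctx = chan.pipeline().context(handler);
    handler.writeBufferedAndRemove(ctx);
    assert "hello".equals(chan.<String>readOutbound());
    assert chan.pipeline().context(handler) == null;
  }
}
```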
WriteBufferingAndExceptionHandlerTest.java (new file, +271 lines):

```java
/*
 * Copyright 2019 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.netty;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import io.grpc.Status;
import io.grpc.Status.Code;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoop;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import java.net.ConnectException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link WriteBufferingAndExceptionHandler}.
 */
@RunWith(JUnit4.class)
public class WriteBufferingAndExceptionHandlerTest {

  private static final long TIMEOUT_SECONDS = 10;

  @Rule
  public final TestRule timeout = new DisableOnDebug(Timeout.seconds(TIMEOUT_SECONDS));

  private final EventLoop group = new DefaultEventLoop();
  private Channel chan;
  private Channel server;

  @After
  public void tearDown() {
    if (server != null) {
      server.close();
    }
    if (chan != null) {
      chan.close();
    }
    group.shutdownGracefully();
  }

  @Test
  public void connectionFailuresPropagated() throws Exception {
    WriteBufferingAndExceptionHandler handler =
        new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {});
    ChannelFuture cf = new Bootstrap()
        .channel(LocalChannel.class)
        .handler(handler)
        .group(group)
        .register();
    chan = cf.channel();
    cf.sync();
    // Write before connect. In the event connect fails, the pipeline is torn down and the
    // handler won't be able to fail the writes with the correct exception.
    ChannelFuture wf = chan.writeAndFlush(new Object());
    chan.connect(new LocalAddress("bogus"));

    try {
      wf.sync();
      fail();
    } catch (Exception e) {
      assertThat(e).isInstanceOf(ConnectException.class);
      assertThat(e).hasMessageThat().contains("connection refused");
    }
  }

  @Test
  public void channelInactiveFailuresPropagated() throws Exception {
    WriteBufferingAndExceptionHandler handler =
        new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {});
    LocalAddress addr = new LocalAddress("local");
    ChannelFuture cf = new Bootstrap()
        .channel(LocalChannel.class)
        .handler(handler)
        .group(group)
        .register();
    chan = cf.channel();
    cf.sync();
    ChannelFuture sf = new ServerBootstrap()
        .channel(LocalServerChannel.class)
        .childHandler(new ChannelHandlerAdapter() {})
        .group(group)
        .bind(addr);
    server = sf.channel();
    sf.sync();

    ChannelFuture wf = chan.writeAndFlush(new Object());
    chan.connect(addr);
    chan.pipeline().fireChannelInactive();

    try {
      wf.sync();
      fail();
    } catch (Exception e) {
      Status status = Status.fromThrowable(e);
      assertThat(status.getCode()).isEqualTo(Code.UNAVAILABLE);
      assertThat(status.getDescription())
          .contains("Connection closed while performing protocol negotiation");
    }
  }

  @Test
  public void channelCloseFailuresPropagated() throws Exception {
    WriteBufferingAndExceptionHandler handler =
        new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {});
    LocalAddress addr = new LocalAddress("local");
    ChannelFuture cf = new Bootstrap()
        .channel(LocalChannel.class)
        .handler(handler)
        .group(group)
        .register();
    chan = cf.channel();
    cf.sync();
    ChannelFuture sf = new ServerBootstrap()
        .channel(LocalServerChannel.class)
        .childHandler(new ChannelHandlerAdapter() {})
        .group(group)
        .bind(addr);
    server = sf.channel();
    sf.sync();

    ChannelFuture wf = chan.writeAndFlush(new Object());
    chan.connect(addr);
    chan.close();

    try {
      wf.sync();
      fail();
    } catch (Exception e) {
      Status status = Status.fromThrowable(e);
      assertThat(status.getCode()).isEqualTo(Code.UNAVAILABLE);
      assertThat(status.getDescription())
          .contains("Connection closing while performing protocol negotiation");
    }
  }

  @Test
  public void uncaughtExceptionFailuresPropagated() throws Exception {
    WriteBufferingAndExceptionHandler handler =
        new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {});
    LocalAddress addr = new LocalAddress("local");
    ChannelFuture cf = new Bootstrap()
        .channel(LocalChannel.class)
        .handler(handler)
        .group(group)
        .register();
    chan = cf.channel();
    cf.sync();
    ChannelFuture sf = new ServerBootstrap()
        .channel(LocalServerChannel.class)
        .childHandler(new ChannelHandlerAdapter() {})
        .group(group)
        .bind(addr);
    server = sf.channel();
    sf.sync();

    ChannelFuture wf = chan.writeAndFlush(new Object());
    chan.connect(addr);
    chan.pipeline().fireExceptionCaught(Status.ABORTED.withDescription("zap").asRuntimeException());

    try {
      wf.sync();
      fail();
    } catch (Exception e) {
      Status status = Status.fromThrowable(e);
      assertThat(status.getCode()).isEqualTo(Code.ABORTED);
      assertThat(status.getDescription()).contains("zap");
    }
  }

  @Test
  public void writesBuffered() throws Exception {
    final AtomicBoolean handlerAdded = new AtomicBoolean();
    final AtomicBoolean flush = new AtomicBoolean();
    final AtomicReference<Object> write = new AtomicReference<>();
    final WriteBufferingAndExceptionHandler handler =
        new WriteBufferingAndExceptionHandler(new ChannelOutboundHandlerAdapter() {
          @Override
          public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            assertFalse(handlerAdded.getAndSet(true));
            super.handlerAdded(ctx);
          }

          @Override
          public void flush(ChannelHandlerContext ctx) throws Exception {
            assertFalse(flush.getAndSet(true));
            super.flush(ctx);
          }

          @Override
          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            assertNull(write.getAndSet(msg));
            promise.setSuccess();
          }
        });
    LocalAddress addr = new LocalAddress("local");
    ChannelFuture cf = new Bootstrap()
        .channel(LocalChannel.class)
        .handler(handler)
        .group(group)
        .register();
    chan = cf.channel();
    cf.sync();
    ChannelFuture sf = new ServerBootstrap()
        .channel(LocalServerChannel.class)
        .childHandler(new ChannelHandlerAdapter() {})
        .group(group)
        .bind(addr);
    server = sf.channel();
    sf.sync();

    assertTrue(handlerAdded.get());

    chan.write(new Object());
    chan.connect(addr).sync();
    assertNull(write.get());

    chan.flush();
    assertNull(write.get());
    assertFalse(flush.get());

    assertThat(chan.pipeline().context(handler)).isNotNull();
    chan.eventLoop().submit(new Runnable() {
      @Override
      public void run() {
        handler.writeBufferedAndRemove(chan.pipeline().context(handler));
      }
    }).sync();

    assertThat(chan.pipeline().context(handler)).isNull();
    assertThat(write.get().getClass()).isSameAs(Object.class);
    assertTrue(flush.get());
    assertThat(chan.pipeline()).doesNotContain(handler);
  }
}
```