netty: fix keepalive test flakiness

In `NettyHandlerTestBase` class, extended Netty's `EmbeddedChannel` by overriding`eventLoop()` to return an `eventLoop` that uses `FakeClock.getScheduledExecutorService() to schedule tasks.

Resolves #3326
This commit is contained in:
ZHANG Dapeng 2017-08-15 10:49:01 -07:00 committed by GitHub
parent e4cef9d12d
commit 577bbefd1a
2 changed files with 118 additions and 38 deletions

View File

@ -18,13 +18,18 @@ package io.grpc.netty;
import static com.google.common.base.Charsets.UTF_8; import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import com.google.errorprone.annotations.CanIgnoreReturnValue; import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.internal.FakeClock;
import io.grpc.internal.MessageFramer; import io.grpc.internal.MessageFramer;
import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.WritableBuffer; import io.grpc.internal.WritableBuffer;
@ -35,6 +40,7 @@ import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
@ -51,12 +57,18 @@ import io.netty.handler.codec.http2.Http2HeadersDecoder;
import io.netty.handler.codec.http2.Http2LocalFlowController; import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2Stream; import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.JUnit4; import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode; import org.mockito.verification.VerificationMode;
/** /**
@ -84,6 +96,12 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> {
*/ */
protected void manualSetUp() throws Exception {} protected void manualSetUp() throws Exception {}
private final FakeClock fakeClock = new FakeClock();
FakeClock fakeClock() {
return fakeClock;
}
/** /**
* Must be called by subclasses to initialize the handler and channel. * Must be called by subclasses to initialize the handler and channel.
*/ */
@ -94,12 +112,91 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> {
handler = newHandler(); handler = newHandler();
channel = new EmbeddedChannel(handler); channel = new FakeClockSupportedChanel(handler);
ctx = channel.pipeline().context(handler); ctx = channel.pipeline().context(handler);
writeQueue = initWriteQueue(); writeQueue = initWriteQueue();
} }
private final class FakeClockSupportedChanel extends EmbeddedChannel {
EventLoop eventLoop;
FakeClockSupportedChanel(ChannelHandler... handlers) {
super(handlers);
}
@Override
public EventLoop eventLoop() {
if (eventLoop == null) {
createEventLoop();
}
return eventLoop;
}
void createEventLoop() {
EventLoop realEventLoop = super.eventLoop();
if (realEventLoop == null) {
return;
}
eventLoop = mock(EventLoop.class, delegatesTo(realEventLoop));
doAnswer(
new Answer<ScheduledFuture<Void>>() {
@Override
public ScheduledFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
Runnable command = (Runnable) invocation.getArguments()[0];
Long delay = (Long) invocation.getArguments()[1];
TimeUnit timeUnit = (TimeUnit) invocation.getArguments()[2];
return new FakeClockScheduledNettyFuture(eventLoop, command, delay, timeUnit);
}
}).when(eventLoop).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
}
}
private final class FakeClockScheduledNettyFuture extends DefaultPromise<Void>
implements ScheduledFuture<Void> {
final java.util.concurrent.ScheduledFuture<?> future;
FakeClockScheduledNettyFuture(
EventLoop eventLoop, final Runnable command, long delay, TimeUnit timeUnit) {
super(eventLoop);
Runnable wrap = new Runnable() {
@Override
public void run() {
try {
command.run();
} catch (Throwable t) {
setFailure(t);
return;
}
if (!isDone()) {
Promise<Void> unused = setSuccess(null);
}
// else: The command itself, such as a shutdown task, might have cancelled all the
// scheduled tasks already.
}
};
future = fakeClock.getScheduledExecutorService().schedule(wrap, delay, timeUnit);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (future.cancel(mayInterruptIfRunning)) {
return super.cancel(mayInterruptIfRunning);
}
return false;
}
@Override
public long getDelay(TimeUnit unit) {
return Math.max(future.getDelay(unit), 1L); // never return zero or negative delay.
}
@Override
public int compareTo(Delayed o) {
return future.compareTo(o);
}
}
protected final T handler() { protected final T handler() {
return handler; return handler;
} }
@ -221,9 +318,9 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> {
} }
protected final ChannelHandlerContext newMockContext() { protected final ChannelHandlerContext newMockContext() {
ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class); ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT); when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
EventLoop eventLoop = Mockito.mock(EventLoop.class); EventLoop eventLoop = mock(EventLoop.class);
when(ctx.executor()).thenReturn(eventLoop); when(ctx.executor()).thenReturn(eventLoop);
return ctx; return ctx;
} }

View File

@ -20,7 +20,6 @@ import static com.google.common.base.Charsets.UTF_8;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS; import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS;
import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS; import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
import static io.grpc.internal.testing.TestUtils.sleepAtLeast;
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE; import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED; import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED; import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
@ -488,14 +487,12 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
keepAliveTimeoutInNanos = TimeUnit.MINUTES.toNanos(30L); keepAliveTimeoutInNanos = TimeUnit.MINUTES.toNanos(30L);
manualSetUp(); manualSetUp();
sleepAtLeast(10L); fakeClock().forwardNanos(keepAliveTimeInNanos);
channel().runPendingTasks();
verifyWrite().writePing(eq(ctx()), eq(false), eq(pingBuf), any(ChannelPromise.class)); verifyWrite().writePing(eq(ctx()), eq(false), eq(pingBuf), any(ChannelPromise.class));
spyKeepAliveManager.onDataReceived(); spyKeepAliveManager.onDataReceived();
sleepAtLeast(10L); fakeClock().forwardTime(10L, TimeUnit.MILLISECONDS);
channel().runPendingTasks();
verifyWrite(times(2)) verifyWrite(times(2))
.writePing(eq(ctx()), eq(false), eq(pingBuf), any(ChannelPromise.class)); .writePing(eq(ctx()), eq(false), eq(pingBuf), any(ChannelPromise.class));
@ -504,17 +501,15 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
@Test @Test
public void keepAliveManager_pingTimeout() throws Exception { public void keepAliveManager_pingTimeout() throws Exception {
keepAliveTimeInNanos = TimeUnit.MILLISECONDS.toNanos(10L); keepAliveTimeInNanos = TimeUnit.NANOSECONDS.toNanos(123L);
keepAliveTimeoutInNanos = TimeUnit.MILLISECONDS.toNanos(10L); keepAliveTimeoutInNanos = TimeUnit.NANOSECONDS.toNanos(456L);
manualSetUp(); manualSetUp();
sleepAtLeast(10L); fakeClock().forwardNanos(keepAliveTimeInNanos);
channel().runPendingTasks();
assertTrue(channel().isOpen()); assertTrue(channel().isOpen());
sleepAtLeast(10L); fakeClock().forwardNanos(keepAliveTimeoutInNanos);
channel().runPendingTasks();
assertTrue(!channel().isOpen()); assertTrue(!channel().isOpen());
} }
@ -619,8 +614,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
maxConnectionIdleInNanos = TimeUnit.MINUTES.toNanos(30L); maxConnectionIdleInNanos = TimeUnit.MINUTES.toNanos(30L);
manualSetUp(); manualSetUp();
sleepAtLeast(10L); fakeClock().forwardTime(20, TimeUnit.MINUTES);
channel().runPendingTasks();
// GO_AWAY not sent yet // GO_AWAY not sent yet
verifyWrite(never()).writeGoAway( verifyWrite(never()).writeGoAway(
@ -635,8 +629,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
manualSetUp(); manualSetUp();
assertTrue(channel().isOpen()); assertTrue(channel().isOpen());
sleepAtLeast(10L); fakeClock().forwardNanos(maxConnectionIdleInNanos);
channel().runPendingTasks();
// GO_AWAY sent // GO_AWAY sent
verifyWrite().writeGoAway( verifyWrite().writeGoAway(
@ -653,8 +646,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
manualSetUp(); manualSetUp();
createStream(); createStream();
sleepAtLeast(10L); fakeClock().forwardNanos(maxConnectionIdleInNanos);
channel().runPendingTasks();
// GO_AWAY not sent when active // GO_AWAY not sent when active
verifyWrite(never()).writeGoAway( verifyWrite(never()).writeGoAway(
@ -664,8 +656,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
channelRead(rstStreamFrame(STREAM_ID, (int) Http2Error.CANCEL.code())); channelRead(rstStreamFrame(STREAM_ID, (int) Http2Error.CANCEL.code()));
sleepAtLeast(10L); fakeClock().forwardNanos(maxConnectionIdleInNanos);
channel().runPendingTasks();
// GO_AWAY sent // GO_AWAY sent
verifyWrite().writeGoAway( verifyWrite().writeGoAway(
@ -681,8 +672,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
maxConnectionAgeInNanos = TimeUnit.MINUTES.toNanos(30L); maxConnectionAgeInNanos = TimeUnit.MINUTES.toNanos(30L);
manualSetUp(); manualSetUp();
sleepAtLeast(10L); fakeClock().forwardTime(20, TimeUnit.MINUTES);
channel().runPendingTasks();
// GO_AWAY not sent yet // GO_AWAY not sent yet
verifyWrite(never()).writeGoAway( verifyWrite(never()).writeGoAway(
@ -697,8 +687,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
manualSetUp(); manualSetUp();
assertTrue(channel().isOpen()); assertTrue(channel().isOpen());
sleepAtLeast(10L); fakeClock().forwardNanos(maxConnectionAgeInNanos);
channel().runPendingTasks();
// GO_AWAY sent // GO_AWAY sent
verifyWrite().writeGoAway( verifyWrite().writeGoAway(
@ -716,15 +705,13 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
manualSetUp(); manualSetUp();
createStream(); createStream();
sleepAtLeast(10L); fakeClock().forwardNanos(maxConnectionAgeInNanos);
channel().runPendingTasks();
verifyWrite().writeGoAway( verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class)); any(ChannelPromise.class));
sleepAtLeast(10L); fakeClock().forwardTime(20, TimeUnit.MINUTES);
channel().runPendingTasks();
// channel not closed yet // channel not closed yet
assertTrue(channel().isOpen()); assertTrue(channel().isOpen());
@ -733,22 +720,18 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
@Test @Test
public void maxConnectionAgeGrace_channelClosedAfterGracePeriod() throws Exception { public void maxConnectionAgeGrace_channelClosedAfterGracePeriod() throws Exception {
maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L); maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
maxConnectionAgeGraceInNanos = TimeUnit.MILLISECONDS.toNanos(10L); maxConnectionAgeGraceInNanos = TimeUnit.MINUTES.toNanos(30L);
manualSetUp(); manualSetUp();
createStream(); createStream();
// runPendingTasks so that GO_AWAY is sent and the forceful shutdown is scheduled fakeClock().forwardNanos(maxConnectionAgeInNanos);
sleepAtLeast(10L);
channel().runPendingTasks();
verifyWrite().writeGoAway( verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class)); any(ChannelPromise.class));
assertTrue(channel().isOpen()); assertTrue(channel().isOpen());
// need runPendingTasks again so that the forceful shutdown can be executed fakeClock().forwardNanos(maxConnectionAgeGraceInNanos);
sleepAtLeast(10L);
channel().runPendingTasks();
// channel closed // channel closed
assertTrue(!channel().isOpen()); assertTrue(!channel().isOpen());