diff --git a/netty/src/main/java/io/grpc/netty/MaxConnectionIdleManager.java b/core/src/main/java/io/grpc/internal/MaxConnectionIdleManager.java similarity index 78% rename from netty/src/main/java/io/grpc/netty/MaxConnectionIdleManager.java rename to core/src/main/java/io/grpc/internal/MaxConnectionIdleManager.java index 964ae44b17..a6e2a13913 100644 --- a/netty/src/main/java/io/grpc/netty/MaxConnectionIdleManager.java +++ b/core/src/main/java/io/grpc/internal/MaxConnectionIdleManager.java @@ -14,20 +14,19 @@ * limitations under the License. */ -package io.grpc.netty; +package io.grpc.internal; import com.google.common.annotations.VisibleForTesting; -import io.grpc.internal.LogExceptionRunnable; -import io.netty.channel.ChannelHandlerContext; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; + import javax.annotation.CheckForNull; /** * Monitors connection idle time; shutdowns the connection if the max connection idle is reached. */ -abstract class MaxConnectionIdleManager { +public class MaxConnectionIdleManager { private static final Ticker systemTicker = new Ticker() { @Override public long nanoTime() { @@ -46,23 +45,24 @@ abstract class MaxConnectionIdleManager { private boolean shutdownDelayed; private boolean isActive; - MaxConnectionIdleManager(long maxConnectionIdleInNanos) { + public MaxConnectionIdleManager(long maxConnectionIdleInNanos) { this(maxConnectionIdleInNanos, systemTicker); } @VisibleForTesting - MaxConnectionIdleManager(long maxConnectionIdleInNanos, Ticker ticker) { + public MaxConnectionIdleManager(long maxConnectionIdleInNanos, Ticker ticker) { this.maxConnectionIdleInNanos = maxConnectionIdleInNanos; this.ticker = ticker; } - /** A {@link NettyServerHandler} was added to the transport. */ - void start(ChannelHandlerContext ctx) { - start(ctx, ctx.executor()); - } - + /** + * Start the initial scheduled shutdown given the transport status reaches max connection idle. + * + * @param closeJob Closes the connection by sending GO_AWAY with status code NO_ERROR and ASCII + * debug data max_idle and then doing the graceful connection termination. + */ @VisibleForTesting - void start(final ChannelHandlerContext ctx, final ScheduledExecutorService scheduler) { + public void start(final Runnable closeJob, final ScheduledExecutorService scheduler) { this.scheduler = scheduler; nextIdleMonitorTime = ticker.nanoTime() + maxConnectionIdleInNanos; @@ -78,7 +78,7 @@ abstract class MaxConnectionIdleManager { } // if isActive, exit. Will schedule a new shutdownFuture once onTransportIdle } else { - close(ctx); + closeJob.run(); shutdownFuture = null; } } @@ -88,20 +88,15 @@ abstract class MaxConnectionIdleManager { scheduler.schedule(shutdownTask, maxConnectionIdleInNanos, TimeUnit.NANOSECONDS); } - /** - * Closes the connection by sending GO_AWAY with status code NO_ERROR and ASCII debug data - * max_idle and then doing the graceful connection termination. - */ - abstract void close(ChannelHandlerContext ctx); /** There are outstanding RPCs on the transport. */ - void onTransportActive() { + public void onTransportActive() { isActive = true; shutdownDelayed = true; } /** There are no outstanding RPCs on the transport. */ - void onTransportIdle() { + public void onTransportIdle() { isActive = false; if (shutdownFuture == null) { return; @@ -116,7 +111,7 @@ abstract class MaxConnectionIdleManager { } /** Transport is being terminated. */ - void onTransportTermination() { + public void onTransportTermination() { if (shutdownFuture != null) { shutdownFuture.cancel(false); shutdownFuture = null; @@ -124,7 +119,7 @@ abstract class MaxConnectionIdleManager { } @VisibleForTesting - interface Ticker { + public interface Ticker { long nanoTime(); } } diff --git a/netty/src/test/java/io/grpc/netty/MaxConnectionIdleManagerTest.java b/core/src/test/java/io/grpc/internal/MaxConnectionIdleManagerTest.java similarity index 62% rename from netty/src/test/java/io/grpc/netty/MaxConnectionIdleManagerTest.java rename to core/src/test/java/io/grpc/internal/MaxConnectionIdleManagerTest.java index d2ae98980d..53566054a6 100644 --- a/netty/src/test/java/io/grpc/netty/MaxConnectionIdleManagerTest.java +++ b/core/src/test/java/io/grpc/internal/MaxConnectionIdleManagerTest.java @@ -14,17 +14,11 @@ * limitations under the License. */ -package io.grpc.netty; +package io.grpc.internal; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; -import io.grpc.internal.FakeClock; -import io.grpc.netty.MaxConnectionIdleManager.Ticker; -import io.netty.channel.ChannelHandlerContext; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -36,7 +30,7 @@ import org.mockito.MockitoAnnotations; @RunWith(JUnit4.class) public class MaxConnectionIdleManagerTest { private final FakeClock fakeClock = new FakeClock(); - private final Ticker ticker = new Ticker() { + private final MaxConnectionIdleManager.Ticker ticker = new MaxConnectionIdleManager.Ticker() { @Override public long nanoTime() { return fakeClock.getTicker().read(); @@ -44,7 +38,7 @@ public class MaxConnectionIdleManagerTest { }; @Mock - private ChannelHandlerContext ctx; + private Runnable closure; @Before public void setUp() { @@ -54,21 +48,21 @@ public class MaxConnectionIdleManagerTest { @Test public void maxIdleReached() { MaxConnectionIdleManager maxConnectionIdleManager = - spy(new TestMaxConnectionIdleManager(123L, ticker)); + new MaxConnectionIdleManager(123L, ticker); - maxConnectionIdleManager.start(ctx, fakeClock.getScheduledExecutorService()); + maxConnectionIdleManager.start(closure, fakeClock.getScheduledExecutorService()); maxConnectionIdleManager.onTransportIdle(); fakeClock.forwardNanos(123L); - verify(maxConnectionIdleManager).close(eq(ctx)); + verify(closure).run(); } @Test public void maxIdleNotReachedAndReached() { MaxConnectionIdleManager maxConnectionIdleManager = - spy(new TestMaxConnectionIdleManager(123L, ticker)); + new MaxConnectionIdleManager(123L, ticker); - maxConnectionIdleManager.start(ctx, fakeClock.getScheduledExecutorService()); + maxConnectionIdleManager.start(closure, fakeClock.getScheduledExecutorService()); maxConnectionIdleManager.onTransportIdle(); fakeClock.forwardNanos(100L); // max idle not reached @@ -79,35 +73,25 @@ public class MaxConnectionIdleManagerTest { maxConnectionIdleManager.onTransportActive(); fakeClock.forwardNanos(100L); - verify(maxConnectionIdleManager, never()).close(any(ChannelHandlerContext.class)); + verify(closure, never()).run(); // max idle reached maxConnectionIdleManager.onTransportIdle(); fakeClock.forwardNanos(123L); - verify(maxConnectionIdleManager).close(eq(ctx)); + verify(closure).run(); } @Test public void shutdownThenMaxIdleReached() { MaxConnectionIdleManager maxConnectionIdleManager = - spy(new TestMaxConnectionIdleManager(123L, ticker)); + new MaxConnectionIdleManager(123L, ticker); - maxConnectionIdleManager.start(ctx, fakeClock.getScheduledExecutorService()); + maxConnectionIdleManager.start(closure, fakeClock.getScheduledExecutorService()); maxConnectionIdleManager.onTransportIdle(); maxConnectionIdleManager.onTransportTermination(); fakeClock.forwardNanos(123L); - verify(maxConnectionIdleManager, never()).close(any(ChannelHandlerContext.class)); - } - - private static class TestMaxConnectionIdleManager extends MaxConnectionIdleManager { - TestMaxConnectionIdleManager(long maxConnectionIdleInNanos, Ticker ticker) { - super(maxConnectionIdleInNanos, ticker); - } - - @Override - void close(ChannelHandlerContext ctx) { - } + verify(closure, never()).run(); } } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 1dc73fa5e0..fa21221a0a 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -46,6 +46,7 @@ import io.grpc.Status; import io.grpc.internal.GrpcUtil; import io.grpc.internal.KeepAliveManager; import io.grpc.internal.LogExceptionRunnable; +import io.grpc.internal.MaxConnectionIdleManager; import io.grpc.internal.ServerTransportListener; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.TransportTracer; @@ -284,16 +285,7 @@ class NettyServerHandler extends AbstractNettyHandler { if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) { maxConnectionIdleManager = null; } else { - maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos) { - @Override - void close(ChannelHandlerContext ctx) { - if (gracefulShutdown == null) { - gracefulShutdown = new GracefulShutdown("max_idle", null); - gracefulShutdown.start(ctx); - ctx.flush(); - } - } - }; + maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos); } connection.addListener(new Http2ConnectionAdapter() { @@ -364,7 +356,16 @@ class NettyServerHandler extends AbstractNettyHandler { } if (maxConnectionIdleManager != null) { - maxConnectionIdleManager.start(ctx); + maxConnectionIdleManager.start(new Runnable() { + @Override + public void run() { + if (gracefulShutdown == null) { + gracefulShutdown = new GracefulShutdown("max_idle", null); + gracefulShutdown.start(ctx); + ctx.flush(); + } + } + }, ctx.executor()); } if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {