move max connection idle manager to core

This commit is contained in:
yifeizhuang 2022-08-23 11:51:30 -07:00
parent d3331d953d
commit 221ee494d9
3 changed files with 42 additions and 62 deletions

View File

@ -14,20 +14,19 @@
* limitations under the License. * limitations under the License.
*/ */
package io.grpc.netty; package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import io.grpc.internal.LogExceptionRunnable;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.annotation.CheckForNull; import javax.annotation.CheckForNull;
/** /**
* Monitors connection idle time; shutdowns the connection if the max connection idle is reached. * Monitors connection idle time; shutdowns the connection if the max connection idle is reached.
*/ */
abstract class MaxConnectionIdleManager { public class MaxConnectionIdleManager {
private static final Ticker systemTicker = new Ticker() { private static final Ticker systemTicker = new Ticker() {
@Override @Override
public long nanoTime() { public long nanoTime() {
@ -46,23 +45,24 @@ abstract class MaxConnectionIdleManager {
private boolean shutdownDelayed; private boolean shutdownDelayed;
private boolean isActive; private boolean isActive;
MaxConnectionIdleManager(long maxConnectionIdleInNanos) { public MaxConnectionIdleManager(long maxConnectionIdleInNanos) {
this(maxConnectionIdleInNanos, systemTicker); this(maxConnectionIdleInNanos, systemTicker);
} }
@VisibleForTesting @VisibleForTesting
MaxConnectionIdleManager(long maxConnectionIdleInNanos, Ticker ticker) { public MaxConnectionIdleManager(long maxConnectionIdleInNanos, Ticker ticker) {
this.maxConnectionIdleInNanos = maxConnectionIdleInNanos; this.maxConnectionIdleInNanos = maxConnectionIdleInNanos;
this.ticker = ticker; this.ticker = ticker;
} }
/** A {@link NettyServerHandler} was added to the transport. */ /**
void start(ChannelHandlerContext ctx) { * Start the initial scheduled shutdown given the transport status reaches max connection idle.
start(ctx, ctx.executor()); *
} * @param closeJob Closes the connection by sending GO_AWAY with status code NO_ERROR and ASCII
* debug data max_idle and then doing the graceful connection termination.
*/
@VisibleForTesting @VisibleForTesting
void start(final ChannelHandlerContext ctx, final ScheduledExecutorService scheduler) { public void start(final Runnable closeJob, final ScheduledExecutorService scheduler) {
this.scheduler = scheduler; this.scheduler = scheduler;
nextIdleMonitorTime = ticker.nanoTime() + maxConnectionIdleInNanos; nextIdleMonitorTime = ticker.nanoTime() + maxConnectionIdleInNanos;
@ -78,7 +78,7 @@ abstract class MaxConnectionIdleManager {
} }
// if isActive, exit. Will schedule a new shutdownFuture once onTransportIdle // if isActive, exit. Will schedule a new shutdownFuture once onTransportIdle
} else { } else {
close(ctx); closeJob.run();
shutdownFuture = null; shutdownFuture = null;
} }
} }
@ -88,20 +88,15 @@ abstract class MaxConnectionIdleManager {
scheduler.schedule(shutdownTask, maxConnectionIdleInNanos, TimeUnit.NANOSECONDS); scheduler.schedule(shutdownTask, maxConnectionIdleInNanos, TimeUnit.NANOSECONDS);
} }
/**
* Closes the connection by sending GO_AWAY with status code NO_ERROR and ASCII debug data
* max_idle and then doing the graceful connection termination.
*/
abstract void close(ChannelHandlerContext ctx);
/** There are outstanding RPCs on the transport. */ /** There are outstanding RPCs on the transport. */
void onTransportActive() { public void onTransportActive() {
isActive = true; isActive = true;
shutdownDelayed = true; shutdownDelayed = true;
} }
/** There are no outstanding RPCs on the transport. */ /** There are no outstanding RPCs on the transport. */
void onTransportIdle() { public void onTransportIdle() {
isActive = false; isActive = false;
if (shutdownFuture == null) { if (shutdownFuture == null) {
return; return;
@ -116,7 +111,7 @@ abstract class MaxConnectionIdleManager {
} }
/** Transport is being terminated. */ /** Transport is being terminated. */
void onTransportTermination() { public void onTransportTermination() {
if (shutdownFuture != null) { if (shutdownFuture != null) {
shutdownFuture.cancel(false); shutdownFuture.cancel(false);
shutdownFuture = null; shutdownFuture = null;
@ -124,7 +119,7 @@ abstract class MaxConnectionIdleManager {
} }
@VisibleForTesting @VisibleForTesting
interface Ticker { public interface Ticker {
long nanoTime(); long nanoTime();
} }
} }

View File

@ -14,17 +14,11 @@
* limitations under the License. * limitations under the License.
*/ */
package io.grpc.netty; package io.grpc.internal;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import io.grpc.internal.FakeClock;
import io.grpc.netty.MaxConnectionIdleManager.Ticker;
import io.netty.channel.ChannelHandlerContext;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -36,7 +30,7 @@ import org.mockito.MockitoAnnotations;
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public class MaxConnectionIdleManagerTest { public class MaxConnectionIdleManagerTest {
private final FakeClock fakeClock = new FakeClock(); private final FakeClock fakeClock = new FakeClock();
private final Ticker ticker = new Ticker() { private final MaxConnectionIdleManager.Ticker ticker = new MaxConnectionIdleManager.Ticker() {
@Override @Override
public long nanoTime() { public long nanoTime() {
return fakeClock.getTicker().read(); return fakeClock.getTicker().read();
@ -44,7 +38,7 @@ public class MaxConnectionIdleManagerTest {
}; };
@Mock @Mock
private ChannelHandlerContext ctx; private Runnable closure;
@Before @Before
public void setUp() { public void setUp() {
@ -54,21 +48,21 @@ public class MaxConnectionIdleManagerTest {
@Test @Test
public void maxIdleReached() { public void maxIdleReached() {
MaxConnectionIdleManager maxConnectionIdleManager = MaxConnectionIdleManager maxConnectionIdleManager =
spy(new TestMaxConnectionIdleManager(123L, ticker)); new MaxConnectionIdleManager(123L, ticker);
maxConnectionIdleManager.start(ctx, fakeClock.getScheduledExecutorService()); maxConnectionIdleManager.start(closure, fakeClock.getScheduledExecutorService());
maxConnectionIdleManager.onTransportIdle(); maxConnectionIdleManager.onTransportIdle();
fakeClock.forwardNanos(123L); fakeClock.forwardNanos(123L);
verify(maxConnectionIdleManager).close(eq(ctx)); verify(closure).run();
} }
@Test @Test
public void maxIdleNotReachedAndReached() { public void maxIdleNotReachedAndReached() {
MaxConnectionIdleManager maxConnectionIdleManager = MaxConnectionIdleManager maxConnectionIdleManager =
spy(new TestMaxConnectionIdleManager(123L, ticker)); new MaxConnectionIdleManager(123L, ticker);
maxConnectionIdleManager.start(ctx, fakeClock.getScheduledExecutorService()); maxConnectionIdleManager.start(closure, fakeClock.getScheduledExecutorService());
maxConnectionIdleManager.onTransportIdle(); maxConnectionIdleManager.onTransportIdle();
fakeClock.forwardNanos(100L); fakeClock.forwardNanos(100L);
// max idle not reached // max idle not reached
@ -79,35 +73,25 @@ public class MaxConnectionIdleManagerTest {
maxConnectionIdleManager.onTransportActive(); maxConnectionIdleManager.onTransportActive();
fakeClock.forwardNanos(100L); fakeClock.forwardNanos(100L);
verify(maxConnectionIdleManager, never()).close(any(ChannelHandlerContext.class)); verify(closure, never()).run();
// max idle reached // max idle reached
maxConnectionIdleManager.onTransportIdle(); maxConnectionIdleManager.onTransportIdle();
fakeClock.forwardNanos(123L); fakeClock.forwardNanos(123L);
verify(maxConnectionIdleManager).close(eq(ctx)); verify(closure).run();
} }
@Test @Test
public void shutdownThenMaxIdleReached() { public void shutdownThenMaxIdleReached() {
MaxConnectionIdleManager maxConnectionIdleManager = MaxConnectionIdleManager maxConnectionIdleManager =
spy(new TestMaxConnectionIdleManager(123L, ticker)); new MaxConnectionIdleManager(123L, ticker);
maxConnectionIdleManager.start(ctx, fakeClock.getScheduledExecutorService()); maxConnectionIdleManager.start(closure, fakeClock.getScheduledExecutorService());
maxConnectionIdleManager.onTransportIdle(); maxConnectionIdleManager.onTransportIdle();
maxConnectionIdleManager.onTransportTermination(); maxConnectionIdleManager.onTransportTermination();
fakeClock.forwardNanos(123L); fakeClock.forwardNanos(123L);
verify(maxConnectionIdleManager, never()).close(any(ChannelHandlerContext.class)); verify(closure, never()).run();
}
private static class TestMaxConnectionIdleManager extends MaxConnectionIdleManager {
TestMaxConnectionIdleManager(long maxConnectionIdleInNanos, Ticker ticker) {
super(maxConnectionIdleInNanos, ticker);
}
@Override
void close(ChannelHandlerContext ctx) {
}
} }
} }

View File

@ -46,6 +46,7 @@ import io.grpc.Status;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager; import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.LogExceptionRunnable; import io.grpc.internal.LogExceptionRunnable;
import io.grpc.internal.MaxConnectionIdleManager;
import io.grpc.internal.ServerTransportListener; import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer; import io.grpc.internal.TransportTracer;
@ -284,16 +285,7 @@ class NettyServerHandler extends AbstractNettyHandler {
if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) { if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {
maxConnectionIdleManager = null; maxConnectionIdleManager = null;
} else { } else {
maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos) { maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos);
@Override
void close(ChannelHandlerContext ctx) {
if (gracefulShutdown == null) {
gracefulShutdown = new GracefulShutdown("max_idle", null);
gracefulShutdown.start(ctx);
ctx.flush();
}
}
};
} }
connection.addListener(new Http2ConnectionAdapter() { connection.addListener(new Http2ConnectionAdapter() {
@ -364,7 +356,16 @@ class NettyServerHandler extends AbstractNettyHandler {
} }
if (maxConnectionIdleManager != null) { if (maxConnectionIdleManager != null) {
maxConnectionIdleManager.start(ctx); maxConnectionIdleManager.start(new Runnable() {
@Override
public void run() {
if (gracefulShutdown == null) {
gracefulShutdown = new GracefulShutdown("max_idle", null);
gracefulShutdown.start(ctx);
ctx.flush();
}
}
}, ctx.executor());
} }
if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) { if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {