okhttp: add maxConnectionAge and maxConnectionAgeGrace (#9649)

This commit is contained in:
yifeizhuang 2022-10-21 12:06:26 -07:00 committed by GitHub
parent 38311e8730
commit 47ddfa4f20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 90 additions and 1 deletions

View File

@ -68,6 +68,9 @@ public final class OkHttpServerBuilder extends ForwardingServerBuilder<OkHttpSer
static final long MAX_CONNECTION_IDLE_NANOS_DISABLED = Long.MAX_VALUE; static final long MAX_CONNECTION_IDLE_NANOS_DISABLED = Long.MAX_VALUE;
private static final long MIN_MAX_CONNECTION_IDLE_NANO = TimeUnit.SECONDS.toNanos(1L); private static final long MIN_MAX_CONNECTION_IDLE_NANO = TimeUnit.SECONDS.toNanos(1L);
static final long MAX_CONNECTION_AGE_NANOS_DISABLED = Long.MAX_VALUE;
static final long MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE = Long.MAX_VALUE;
private static final long MIN_MAX_CONNECTION_AGE_NANO = TimeUnit.SECONDS.toNanos(1L);
private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L); private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L);
private static final ObjectPool<Executor> DEFAULT_TRANSPORT_EXECUTOR_POOL = private static final ObjectPool<Executor> DEFAULT_TRANSPORT_EXECUTOR_POOL =
@ -120,6 +123,8 @@ public final class OkHttpServerBuilder extends ForwardingServerBuilder<OkHttpSer
long maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED; long maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED;
boolean permitKeepAliveWithoutCalls; boolean permitKeepAliveWithoutCalls;
long permitKeepAliveTimeInNanos = TimeUnit.MINUTES.toNanos(5); long permitKeepAliveTimeInNanos = TimeUnit.MINUTES.toNanos(5);
long maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED;
long maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
@VisibleForTesting @VisibleForTesting
OkHttpServerBuilder( OkHttpServerBuilder(
@ -209,6 +214,45 @@ public final class OkHttpServerBuilder extends ForwardingServerBuilder<OkHttpSer
return this; return this;
} }
/**
* Sets a custom max connection age, connection lasting longer than which will be gracefully
* terminated. An unreasonably small value might be increased. A random jitter of +/-10% will be
* added to it. {@code Long.MAX_VALUE} nano seconds or an unreasonably large value will disable
* max connection age.
*/
@Override
public OkHttpServerBuilder maxConnectionAge(long maxConnectionAge, TimeUnit timeUnit) {
checkArgument(maxConnectionAge > 0L, "max connection age must be positive: %s",
maxConnectionAge);
maxConnectionAgeInNanos = timeUnit.toNanos(maxConnectionAge);
if (maxConnectionAgeInNanos >= AS_LARGE_AS_INFINITE) {
maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED;
}
if (maxConnectionAgeInNanos < MIN_MAX_CONNECTION_AGE_NANO) {
maxConnectionAgeInNanos = MIN_MAX_CONNECTION_AGE_NANO;
}
return this;
}
/**
* Sets a custom grace time for the graceful connection termination. Once the max connection age
* is reached, RPCs have the grace time to complete. RPCs that do not complete in time will be
* cancelled, allowing the connection to terminate. {@code Long.MAX_VALUE} nano seconds or an
* unreasonably large value are considered infinite.
*
* @see #maxConnectionAge(long, TimeUnit)
*/
@Override
public OkHttpServerBuilder maxConnectionAgeGrace(long maxConnectionAgeGrace, TimeUnit timeUnit) {
checkArgument(maxConnectionAgeGrace >= 0L, "max connection age grace must be non-negative: %s",
maxConnectionAgeGrace);
maxConnectionAgeGraceInNanos = timeUnit.toNanos(maxConnectionAgeGrace);
if (maxConnectionAgeGraceInNanos >= AS_LARGE_AS_INFINITE) {
maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
}
return this;
}
/** /**
* Sets a time waiting for read activity after sending a keepalive ping. If the time expires * Sets a time waiting for read activity after sending a keepalive ping. If the time expires
* without any read activity on the connection, the connection is considered dead. An unreasonably * without any read activity on the connection, the connection is considered dead. An unreasonably

View File

@ -16,6 +16,7 @@
package io.grpc.okhttp; package io.grpc.okhttp;
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED; import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -31,6 +32,7 @@ import io.grpc.Status;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveEnforcer; import io.grpc.internal.KeepAliveEnforcer;
import io.grpc.internal.KeepAliveManager; import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.LogExceptionRunnable;
import io.grpc.internal.MaxConnectionIdleManager; import io.grpc.internal.MaxConnectionIdleManager;
import io.grpc.internal.ObjectPool; import io.grpc.internal.ObjectPool;
import io.grpc.internal.SerializingExecutor; import io.grpc.internal.SerializingExecutor;
@ -96,6 +98,7 @@ final class OkHttpServerTransport implements ServerTransport,
private Attributes attributes; private Attributes attributes;
private KeepAliveManager keepAliveManager; private KeepAliveManager keepAliveManager;
private MaxConnectionIdleManager maxConnectionIdleManager; private MaxConnectionIdleManager maxConnectionIdleManager;
private ScheduledFuture<?> maxConnectionAgeMonitor;
private final KeepAliveEnforcer keepAliveEnforcer; private final KeepAliveEnforcer keepAliveEnforcer;
private final Object lock = new Object(); private final Object lock = new Object();
@ -223,6 +226,15 @@ final class OkHttpServerTransport implements ServerTransport,
maxConnectionIdleManager.start(this::shutdown, scheduledExecutorService); maxConnectionIdleManager.start(this::shutdown, scheduledExecutorService);
} }
if (config.maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
long maxConnectionAgeInNanos =
(long) ((.9D + Math.random() * .2D) * config.maxConnectionAgeInNanos);
maxConnectionAgeMonitor = scheduledExecutorService.schedule(
new LogExceptionRunnable(() -> shutdown(config.maxConnectionAgeGraceInNanos)),
maxConnectionAgeInNanos,
TimeUnit.NANOSECONDS);
}
transportExecutor.execute( transportExecutor.execute(
new FrameHandler(variant.newReader(Okio.buffer(Okio.source(socket)), false))); new FrameHandler(variant.newReader(Okio.buffer(Okio.source(socket)), false)));
} catch (Error | IOException | RuntimeException ex) { } catch (Error | IOException | RuntimeException ex) {
@ -238,6 +250,10 @@ final class OkHttpServerTransport implements ServerTransport,
@Override @Override
public void shutdown() { public void shutdown() {
shutdown(TimeUnit.SECONDS.toNanos(1L));
}
private void shutdown(Long graceTimeInNanos) {
synchronized (lock) { synchronized (lock) {
if (gracefulShutdown || abruptShutdown) { if (gracefulShutdown || abruptShutdown) {
return; return;
@ -251,7 +267,7 @@ final class OkHttpServerTransport implements ServerTransport,
// we also set a timer to limit the upper bound in case the PING is excessively stalled or // we also set a timer to limit the upper bound in case the PING is excessively stalled or
// the client is malicious. // the client is malicious.
secondGoawayTimer = scheduledExecutorService.schedule( secondGoawayTimer = scheduledExecutorService.schedule(
this::triggerGracefulSecondGoaway, 1, TimeUnit.SECONDS); this::triggerGracefulSecondGoaway, graceTimeInNanos, TimeUnit.NANOSECONDS);
frameWriter.goAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, new byte[0]); frameWriter.goAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, new byte[0]);
frameWriter.ping(false, 0, GRACEFUL_SHUTDOWN_PING); frameWriter.ping(false, 0, GRACEFUL_SHUTDOWN_PING);
frameWriter.flush(); frameWriter.flush();
@ -348,6 +364,10 @@ final class OkHttpServerTransport implements ServerTransport,
if (maxConnectionIdleManager != null) { if (maxConnectionIdleManager != null) {
maxConnectionIdleManager.onTransportTermination(); maxConnectionIdleManager.onTransportTermination();
} }
if (maxConnectionAgeMonitor != null) {
maxConnectionAgeMonitor.cancel(false);
}
transportExecutor = config.transportExecutorPool.returnObject(transportExecutor); transportExecutor = config.transportExecutorPool.returnObject(transportExecutor);
scheduledExecutorService = scheduledExecutorService =
config.scheduledExecutorServicePool.returnObject(scheduledExecutorService); config.scheduledExecutorServicePool.returnObject(scheduledExecutorService);
@ -479,6 +499,8 @@ final class OkHttpServerTransport implements ServerTransport,
final long maxConnectionIdleNanos; final long maxConnectionIdleNanos;
final boolean permitKeepAliveWithoutCalls; final boolean permitKeepAliveWithoutCalls;
final long permitKeepAliveTimeInNanos; final long permitKeepAliveTimeInNanos;
final long maxConnectionAgeInNanos;
final long maxConnectionAgeGraceInNanos;
public Config( public Config(
OkHttpServerBuilder builder, OkHttpServerBuilder builder,
@ -501,6 +523,8 @@ final class OkHttpServerTransport implements ServerTransport,
maxConnectionIdleNanos = builder.maxConnectionIdleInNanos; maxConnectionIdleNanos = builder.maxConnectionIdleInNanos;
permitKeepAliveWithoutCalls = builder.permitKeepAliveWithoutCalls; permitKeepAliveWithoutCalls = builder.permitKeepAliveWithoutCalls;
permitKeepAliveTimeInNanos = builder.permitKeepAliveTimeInNanos; permitKeepAliveTimeInNanos = builder.permitKeepAliveTimeInNanos;
maxConnectionAgeInNanos = builder.maxConnectionAgeInNanos;
maxConnectionAgeGraceInNanos = builder.maxConnectionAgeGraceInNanos;
} }
} }

View File

@ -152,6 +152,27 @@ public class OkHttpServerTransportTest {
shutdownAndTerminate(/*lastStreamId=*/ 0); shutdownAndTerminate(/*lastStreamId=*/ 0);
} }
@Test
public void maxConnectionAge() throws Exception {
serverBuilder.maxConnectionAge(5, TimeUnit.SECONDS)
.maxConnectionAgeGrace(1, TimeUnit.SECONDS);
initTransport();
handshake();
clientFrameWriter.headers(1, Arrays.asList(
HTTP_SCHEME_HEADER,
METHOD_HEADER,
new Header(Header.TARGET_AUTHORITY, "example.com:80"),
new Header(Header.TARGET_PATH, "/com.example/SimpleService.doit"),
CONTENT_TYPE_HEADER,
TE_HEADER));
clientFrameWriter.synStream(true, false, 1, -1, Arrays.asList(
new Header("some-client-sent-trailer", "trailer-value")));
pingPong();
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(6)); // > 1.1 * 5
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1));
verifyGracefulShutdown(1);
}
@Test @Test
public void maxConnectionIdleTimer() throws Exception { public void maxConnectionIdleTimer() throws Exception {
initTransport(); initTransport();