diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java index 5928a2e38c..0e1273c7f2 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java @@ -16,6 +16,8 @@ package io.grpc.okhttp; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.errorprone.annotations.DoNotCall; @@ -62,6 +64,10 @@ import javax.net.ssl.TrustManager; public final class OkHttpServerBuilder extends ForwardingServerBuilder { private static final Logger log = Logger.getLogger(OkHttpServerBuilder.class.getName()); private static final int DEFAULT_FLOW_CONTROL_WINDOW = 65535; + + static final long MAX_CONNECTION_IDLE_NANOS_DISABLED = Long.MAX_VALUE; + private static final long MIN_MAX_CONNECTION_IDLE_NANO = TimeUnit.SECONDS.toNanos(1L); + private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L); private static final ObjectPool DEFAULT_TRANSPORT_EXECUTOR_POOL = OkHttpChannelBuilder.DEFAULT_TRANSPORT_EXECUTOR_POOL; @@ -110,6 +116,7 @@ public final class OkHttpServerBuilder extends ForwardingServerBuilder 0L, "max connection idle must be positive: %s", + maxConnectionIdle); + maxConnectionIdleInNanos = timeUnit.toNanos(maxConnectionIdle); + if (maxConnectionIdleInNanos >= AS_LARGE_AS_INFINITE) { + maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED; + } + if (maxConnectionIdleInNanos < MIN_MAX_CONNECTION_IDLE_NANO) { + maxConnectionIdleInNanos = MIN_MAX_CONNECTION_IDLE_NANO; + } + return this; + } + /** * Sets a time waiting for read activity after sending a keepalive ping. If the time expires * without any read activity on the connection, the connection is considered dead. An unreasonably diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java index 3a4ed86f5d..b2b581d715 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java @@ -16,6 +16,8 @@ package io.grpc.okhttp; +import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED; + import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -28,6 +30,7 @@ import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.internal.GrpcUtil; import io.grpc.internal.KeepAliveManager; +import io.grpc.internal.MaxConnectionIdleManager; import io.grpc.internal.ObjectPool; import io.grpc.internal.SerializingExecutor; import io.grpc.internal.ServerTransport; @@ -91,6 +94,7 @@ final class OkHttpServerTransport implements ServerTransport, private ScheduledExecutorService scheduledExecutorService; private Attributes attributes; private KeepAliveManager keepAliveManager; + private MaxConnectionIdleManager maxConnectionIdleManager; private final Object lock = new Object(); @GuardedBy("lock") @@ -189,6 +193,11 @@ final class OkHttpServerTransport implements ServerTransport, keepAliveManager.onTransportStarted(); } + if (config.maxConnectionIdleNanos != MAX_CONNECTION_IDLE_NANOS_DISABLED) { + maxConnectionIdleManager = new MaxConnectionIdleManager(config.maxConnectionIdleNanos); + maxConnectionIdleManager.start(this::shutdown, scheduledExecutorService); + } + transportExecutor.execute( new FrameHandler(variant.newReader(Okio.buffer(Okio.source(socket)), false))); } catch (Error | IOException | RuntimeException ex) { @@ -311,6 +320,9 @@ final class OkHttpServerTransport implements ServerTransport, if (keepAliveManager != null) { keepAliveManager.onTransportTermination(); } + if (maxConnectionIdleManager != null) { + maxConnectionIdleManager.onTransportTermination(); + } transportExecutor = config.transportExecutorPool.returnObject(transportExecutor); scheduledExecutorService = config.scheduledExecutorServicePool.returnObject(scheduledExecutorService); @@ -369,6 +381,9 @@ final class OkHttpServerTransport implements ServerTransport, void streamClosed(int streamId, boolean flush) { synchronized (lock) { streams.remove(streamId); + if (maxConnectionIdleManager != null && streams.isEmpty()) { + maxConnectionIdleManager.onTransportIdle(); + } if (gracefulShutdown && streams.isEmpty()) { frameWriter.close(); } else { @@ -433,6 +448,7 @@ final class OkHttpServerTransport implements ServerTransport, final int flowControlWindow; final int maxInboundMessageSize; final int maxInboundMetadataSize; + final long maxConnectionIdleNanos; public Config( OkHttpServerBuilder builder, @@ -452,6 +468,7 @@ final class OkHttpServerTransport implements ServerTransport, flowControlWindow = builder.flowControlWindow; maxInboundMessageSize = builder.maxInboundMessageSize; maxInboundMetadataSize = builder.maxInboundMetadataSize; + maxConnectionIdleNanos = builder.maxConnectionIdleInNanos; } } @@ -697,6 +714,9 @@ final class OkHttpServerTransport implements ServerTransport, authority == null ? null : asciiString(authority), statsTraceCtx, tracer); + if (maxConnectionIdleManager != null && streams.isEmpty()) { + maxConnectionIdleManager.onTransportActive(); + } streams.put(streamId, stream); listener.streamCreated(streamForApp, method, metadata); stream.onStreamAllocated(); @@ -953,6 +973,9 @@ final class OkHttpServerTransport implements ServerTransport, synchronized (lock) { Http2ErrorStreamState stream = new Http2ErrorStreamState(streamId, lock, outboundFlow, config.flowControlWindow); + if (maxConnectionIdleManager != null && streams.isEmpty()) { + maxConnectionIdleManager.onTransportActive(); + } streams.put(streamId, stream); if (inFinished) { stream.inboundDataReceived(new Buffer(), 0, true); diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java index 201829d38a..2711978bb0 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java @@ -90,6 +90,7 @@ import org.mockito.ArgumentCaptor; public class OkHttpServerTransportTest { private static final int TIME_OUT_MS = 2000; private static final int INITIAL_WINDOW_SIZE = 65535; + private static final long MAX_CONNECTION_IDLE = TimeUnit.SECONDS.toNanos(1); private MockServerTransportListener mockTransportListener = new MockServerTransportListener(); private ServerTransportListener transportListener @@ -105,10 +106,11 @@ public class OkHttpServerTransportTest { private ExecutorService threadPool = Executors.newCachedThreadPool(); private HandshakerSocketFactory handshakerSocketFactory = mock(HandshakerSocketFactory.class, delegatesTo(new PlaintextHandshakerSocketFactory())); + private final FakeClock fakeClock = new FakeClock(); private OkHttpServerBuilder serverBuilder = new OkHttpServerBuilder(new InetSocketAddress(1234), handshakerSocketFactory) .executor(new FakeClock().getScheduledExecutorService()) // Executor unused - .scheduledExecutorService(new FakeClock().getScheduledExecutorService()) + .scheduledExecutorService(fakeClock.getScheduledExecutorService()) .transportExecutor(new Executor() { @Override public void execute(Runnable runnable) { if (runnable instanceof OkHttpServerTransport.FrameHandler) { @@ -119,7 +121,8 @@ public class OkHttpServerTransportTest { } } }) - .flowControlWindow(INITIAL_WINDOW_SIZE); + .flowControlWindow(INITIAL_WINDOW_SIZE) + .maxConnectionIdle(MAX_CONNECTION_IDLE, TimeUnit.NANOSECONDS); @Rule public final Timeout globalTimeout = Timeout.seconds(10); @@ -146,6 +149,64 @@ public class OkHttpServerTransportTest { shutdownAndTerminate(/*lastStreamId=*/ 0); } + @Test + public void maxConnectionIdleTimer() throws Exception { + initTransport(); + handshake(); + clientFrameWriter.headers(1, Arrays.asList( + HTTP_SCHEME_HEADER, + METHOD_HEADER, + new Header(Header.TARGET_AUTHORITY, "example.com:80"), + new Header(Header.TARGET_PATH, "/com.example/SimpleService.doit"), + CONTENT_TYPE_HEADER, + TE_HEADER)); + clientFrameWriter.synStream(true, false, 1, -1, Arrays.asList( + new Header("some-client-sent-trailer", "trailer-value"))); + pingPong(); + + MockStreamListener streamListener = mockTransportListener.newStreams.pop(); + assertThat(streamListener.messages.peek()).isNull(); + assertThat(streamListener.halfClosedCalled).isTrue(); + + streamListener.stream.close(Status.OK, new Metadata()); + + List
responseTrailers = Arrays.asList( + new Header(":status", "200"), + CONTENT_TYPE_HEADER, + new Header("grpc-status", "0")); + assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue(); + verify(clientFramesRead) + .headers(false, true, 1, -1, responseTrailers, HeadersMode.HTTP_20_HEADERS); + + fakeClock.forwardNanos(MAX_CONNECTION_IDLE); + fakeClock.forwardNanos(MAX_CONNECTION_IDLE); + verifyGracefulShutdown(1); + } + + @Test + public void maxConnectionIdleTimer_respondWithError() throws Exception { + initTransport(); + handshake(); + + clientFrameWriter.headers(1, Arrays.asList( + HTTP_SCHEME_HEADER, + METHOD_HEADER, + new Header(Header.TARGET_PATH, "/com.example/SimpleService.doit"), + CONTENT_TYPE_HEADER, + TE_HEADER, + new Header("host", "example.com:80"), + new Header("host", "example.com:80"))); + clientFrameWriter.flush(); + + verifyHttpError( + 1, 400, Status.Code.INTERNAL, "Multiple host headers disallowed. RFC7230 section 5.4"); + + pingPong(); + fakeClock.forwardNanos(MAX_CONNECTION_IDLE); + fakeClock.forwardNanos(MAX_CONNECTION_IDLE); + verifyGracefulShutdown(1); + } + @Test public void startThenShutdownTwice() throws Exception { initTransport(); @@ -316,7 +377,8 @@ public class OkHttpServerTransportTest { clientFrameWriter.data(true, 1, requestMessageFrame, (int) requestMessageFrame.size()); pingPong(); - shutdownAndVerifyGraceful(1); + serverTransport.shutdown(); + verifyGracefulShutdown(1); verify(transportListener, never()).transportTerminated(); MockStreamListener streamListener = mockTransportListener.newStreams.pop(); @@ -1038,8 +1100,8 @@ public class OkHttpServerTransportTest { return metadata; } - private void shutdownAndVerifyGraceful(int lastStreamId) throws IOException { - serverTransport.shutdown(); + private void verifyGracefulShutdown(int lastStreamId) + throws IOException { assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue(); verify(clientFramesRead).goAway(2147483647, ErrorCode.NO_ERROR, ByteString.EMPTY); assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue(); @@ -1052,7 +1114,8 @@ public class OkHttpServerTransportTest { private void shutdownAndTerminate(int lastStreamId) throws IOException { assertThat(serverTransport.getActiveStreams().length).isEqualTo(0); - shutdownAndVerifyGraceful(lastStreamId); + serverTransport.shutdown(); + verifyGracefulShutdown(lastStreamId); assertThat(clientFrameReader.nextFrame(clientFramesRead)).isFalse(); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); }