mirror of https://github.com/grpc/grpc-java.git
This reverts commit 7291ad44c6.
This commit is contained in:
parent
53a2d50695
commit
95b9d6db29
|
|
@ -16,8 +16,6 @@
|
|||
|
||||
package io.grpc.okhttp;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.errorprone.annotations.DoNotCall;
|
||||
|
|
@ -64,10 +62,6 @@ import javax.net.ssl.TrustManager;
|
|||
public final class OkHttpServerBuilder extends ForwardingServerBuilder<OkHttpServerBuilder> {
|
||||
private static final Logger log = Logger.getLogger(OkHttpServerBuilder.class.getName());
|
||||
private static final int DEFAULT_FLOW_CONTROL_WINDOW = 65535;
|
||||
|
||||
static final long MAX_CONNECTION_IDLE_NANOS_DISABLED = Long.MAX_VALUE;
|
||||
private static final long MIN_MAX_CONNECTION_IDLE_NANO = TimeUnit.SECONDS.toNanos(1L);
|
||||
|
||||
private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L);
|
||||
private static final ObjectPool<Executor> DEFAULT_TRANSPORT_EXECUTOR_POOL =
|
||||
OkHttpChannelBuilder.DEFAULT_TRANSPORT_EXECUTOR_POOL;
|
||||
|
|
@ -116,7 +110,6 @@ public final class OkHttpServerBuilder extends ForwardingServerBuilder<OkHttpSer
|
|||
int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
|
||||
int maxInboundMetadataSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
|
||||
int maxInboundMessageSize = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
|
||||
long maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED;
|
||||
|
||||
@VisibleForTesting
|
||||
OkHttpServerBuilder(
|
||||
|
|
@ -185,27 +178,6 @@ public final class OkHttpServerBuilder extends ForwardingServerBuilder<OkHttpSer
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets a custom max connection idle time, connection being idle for longer than which will be
|
||||
* gracefully terminated. Idleness duration is defined since the most recent time the number of
|
||||
* outstanding RPCs became zero or the connection establishment. An unreasonably small value might
|
||||
* be increased. {@code Long.MAX_VALUE} nano seconds or an unreasonably large value will disable
|
||||
* max connection idle.
|
||||
*/
|
||||
@Override
|
||||
public OkHttpServerBuilder maxConnectionIdle(long maxConnectionIdle, TimeUnit timeUnit) {
|
||||
checkArgument(maxConnectionIdle > 0L, "max connection idle must be positive: %s",
|
||||
maxConnectionIdle);
|
||||
maxConnectionIdleInNanos = timeUnit.toNanos(maxConnectionIdle);
|
||||
if (maxConnectionIdleInNanos >= AS_LARGE_AS_INFINITE) {
|
||||
maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED;
|
||||
}
|
||||
if (maxConnectionIdleInNanos < MIN_MAX_CONNECTION_IDLE_NANO) {
|
||||
maxConnectionIdleInNanos = MIN_MAX_CONNECTION_IDLE_NANO;
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets a time waiting for read activity after sending a keepalive ping. If the time expires
|
||||
* without any read activity on the connection, the connection is considered dead. An unreasonably
|
||||
|
|
|
|||
|
|
@ -16,8 +16,6 @@
|
|||
|
||||
package io.grpc.okhttp;
|
||||
|
||||
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
|
@ -30,7 +28,6 @@ import io.grpc.ServerStreamTracer;
|
|||
import io.grpc.Status;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
import io.grpc.internal.KeepAliveManager;
|
||||
import io.grpc.internal.MaxConnectionIdleManager;
|
||||
import io.grpc.internal.ObjectPool;
|
||||
import io.grpc.internal.SerializingExecutor;
|
||||
import io.grpc.internal.ServerTransport;
|
||||
|
|
@ -94,7 +91,6 @@ final class OkHttpServerTransport implements ServerTransport,
|
|||
private ScheduledExecutorService scheduledExecutorService;
|
||||
private Attributes attributes;
|
||||
private KeepAliveManager keepAliveManager;
|
||||
private MaxConnectionIdleManager maxConnectionIdleManager;
|
||||
|
||||
private final Object lock = new Object();
|
||||
@GuardedBy("lock")
|
||||
|
|
@ -193,11 +189,6 @@ final class OkHttpServerTransport implements ServerTransport,
|
|||
keepAliveManager.onTransportStarted();
|
||||
}
|
||||
|
||||
if (config.maxConnectionIdleNanos != MAX_CONNECTION_IDLE_NANOS_DISABLED) {
|
||||
maxConnectionIdleManager = new MaxConnectionIdleManager(config.maxConnectionIdleNanos);
|
||||
maxConnectionIdleManager.start(this::shutdown, scheduledExecutorService);
|
||||
}
|
||||
|
||||
transportExecutor.execute(
|
||||
new FrameHandler(variant.newReader(Okio.buffer(Okio.source(socket)), false)));
|
||||
} catch (Error | IOException | RuntimeException ex) {
|
||||
|
|
@ -320,9 +311,6 @@ final class OkHttpServerTransport implements ServerTransport,
|
|||
if (keepAliveManager != null) {
|
||||
keepAliveManager.onTransportTermination();
|
||||
}
|
||||
if (maxConnectionIdleManager != null) {
|
||||
maxConnectionIdleManager.onTransportTermination();
|
||||
}
|
||||
transportExecutor = config.transportExecutorPool.returnObject(transportExecutor);
|
||||
scheduledExecutorService =
|
||||
config.scheduledExecutorServicePool.returnObject(scheduledExecutorService);
|
||||
|
|
@ -381,9 +369,6 @@ final class OkHttpServerTransport implements ServerTransport,
|
|||
void streamClosed(int streamId, boolean flush) {
|
||||
synchronized (lock) {
|
||||
streams.remove(streamId);
|
||||
if (maxConnectionIdleManager != null && streams.isEmpty()) {
|
||||
maxConnectionIdleManager.onTransportIdle();
|
||||
}
|
||||
if (gracefulShutdown && streams.isEmpty()) {
|
||||
frameWriter.close();
|
||||
} else {
|
||||
|
|
@ -448,7 +433,6 @@ final class OkHttpServerTransport implements ServerTransport,
|
|||
final int flowControlWindow;
|
||||
final int maxInboundMessageSize;
|
||||
final int maxInboundMetadataSize;
|
||||
final long maxConnectionIdleNanos;
|
||||
|
||||
public Config(
|
||||
OkHttpServerBuilder builder,
|
||||
|
|
@ -468,7 +452,6 @@ final class OkHttpServerTransport implements ServerTransport,
|
|||
flowControlWindow = builder.flowControlWindow;
|
||||
maxInboundMessageSize = builder.maxInboundMessageSize;
|
||||
maxInboundMetadataSize = builder.maxInboundMetadataSize;
|
||||
maxConnectionIdleNanos = builder.maxConnectionIdleInNanos;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -714,9 +697,6 @@ final class OkHttpServerTransport implements ServerTransport,
|
|||
authority == null ? null : asciiString(authority),
|
||||
statsTraceCtx,
|
||||
tracer);
|
||||
if (maxConnectionIdleManager != null && streams.isEmpty()) {
|
||||
maxConnectionIdleManager.onTransportActive();
|
||||
}
|
||||
streams.put(streamId, stream);
|
||||
listener.streamCreated(streamForApp, method, metadata);
|
||||
stream.onStreamAllocated();
|
||||
|
|
@ -973,9 +953,6 @@ final class OkHttpServerTransport implements ServerTransport,
|
|||
synchronized (lock) {
|
||||
Http2ErrorStreamState stream =
|
||||
new Http2ErrorStreamState(streamId, lock, outboundFlow, config.flowControlWindow);
|
||||
if (maxConnectionIdleManager != null && streams.isEmpty()) {
|
||||
maxConnectionIdleManager.onTransportActive();
|
||||
}
|
||||
streams.put(streamId, stream);
|
||||
if (inFinished) {
|
||||
stream.inboundDataReceived(new Buffer(), 0, true);
|
||||
|
|
|
|||
|
|
@ -90,7 +90,6 @@ import org.mockito.ArgumentCaptor;
|
|||
public class OkHttpServerTransportTest {
|
||||
private static final int TIME_OUT_MS = 2000;
|
||||
private static final int INITIAL_WINDOW_SIZE = 65535;
|
||||
private static final long MAX_CONNECTION_IDLE = TimeUnit.SECONDS.toNanos(1);
|
||||
|
||||
private MockServerTransportListener mockTransportListener = new MockServerTransportListener();
|
||||
private ServerTransportListener transportListener
|
||||
|
|
@ -106,11 +105,10 @@ public class OkHttpServerTransportTest {
|
|||
private ExecutorService threadPool = Executors.newCachedThreadPool();
|
||||
private HandshakerSocketFactory handshakerSocketFactory
|
||||
= mock(HandshakerSocketFactory.class, delegatesTo(new PlaintextHandshakerSocketFactory()));
|
||||
private final FakeClock fakeClock = new FakeClock();
|
||||
private OkHttpServerBuilder serverBuilder
|
||||
= new OkHttpServerBuilder(new InetSocketAddress(1234), handshakerSocketFactory)
|
||||
.executor(new FakeClock().getScheduledExecutorService()) // Executor unused
|
||||
.scheduledExecutorService(fakeClock.getScheduledExecutorService())
|
||||
.scheduledExecutorService(new FakeClock().getScheduledExecutorService())
|
||||
.transportExecutor(new Executor() {
|
||||
@Override public void execute(Runnable runnable) {
|
||||
if (runnable instanceof OkHttpServerTransport.FrameHandler) {
|
||||
|
|
@ -121,8 +119,7 @@ public class OkHttpServerTransportTest {
|
|||
}
|
||||
}
|
||||
})
|
||||
.flowControlWindow(INITIAL_WINDOW_SIZE)
|
||||
.maxConnectionIdle(MAX_CONNECTION_IDLE, TimeUnit.NANOSECONDS);
|
||||
.flowControlWindow(INITIAL_WINDOW_SIZE);
|
||||
|
||||
@Rule public final Timeout globalTimeout = Timeout.seconds(10);
|
||||
|
||||
|
|
@ -149,63 +146,6 @@ public class OkHttpServerTransportTest {
|
|||
shutdownAndTerminate(/*lastStreamId=*/ 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void maxConnectionIdleTimer() throws Exception {
|
||||
initTransport();
|
||||
handshake();
|
||||
clientFrameWriter.headers(1, Arrays.asList(
|
||||
HTTP_SCHEME_HEADER,
|
||||
METHOD_HEADER,
|
||||
new Header(Header.TARGET_AUTHORITY, "example.com:80"),
|
||||
new Header(Header.TARGET_PATH, "/com.example/SimpleService.doit"),
|
||||
CONTENT_TYPE_HEADER,
|
||||
TE_HEADER));
|
||||
clientFrameWriter.synStream(true, false, 1, -1, Arrays.asList(
|
||||
new Header("some-client-sent-trailer", "trailer-value")));
|
||||
pingPong();
|
||||
|
||||
MockStreamListener streamListener = mockTransportListener.newStreams.pop();
|
||||
assertThat(streamListener.messages.peek()).isNull();
|
||||
assertThat(streamListener.halfClosedCalled).isTrue();
|
||||
|
||||
streamListener.stream.close(Status.OK, new Metadata());
|
||||
|
||||
List<Header> responseTrailers = Arrays.asList(
|
||||
new Header(":status", "200"),
|
||||
CONTENT_TYPE_HEADER,
|
||||
new Header("grpc-status", "0"));
|
||||
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
|
||||
verify(clientFramesRead)
|
||||
.headers(false, true, 1, -1, responseTrailers, HeadersMode.HTTP_20_HEADERS);
|
||||
|
||||
fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
|
||||
fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
|
||||
verifyGracefulShutdown(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void maxConnectionIdleTimer_respondWithError() throws Exception {
|
||||
initTransport();
|
||||
handshake();
|
||||
|
||||
clientFrameWriter.headers(1, Arrays.asList(
|
||||
HTTP_SCHEME_HEADER,
|
||||
METHOD_HEADER,
|
||||
new Header(Header.TARGET_PATH, "/com.example/SimpleService.doit"),
|
||||
CONTENT_TYPE_HEADER,
|
||||
TE_HEADER,
|
||||
new Header("host", "example.com:80"),
|
||||
new Header("host", "example.com:80")));
|
||||
clientFrameWriter.flush();
|
||||
|
||||
verifyHttpError(
|
||||
1, 400, Status.Code.INTERNAL, "Multiple host headers disallowed. RFC7230 section 5.4");
|
||||
|
||||
fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
|
||||
fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
|
||||
verifyGracefulShutdown(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void startThenShutdownTwice() throws Exception {
|
||||
initTransport();
|
||||
|
|
@ -376,8 +316,7 @@ public class OkHttpServerTransportTest {
|
|||
clientFrameWriter.data(true, 1, requestMessageFrame, (int) requestMessageFrame.size());
|
||||
pingPong();
|
||||
|
||||
serverTransport.shutdown();
|
||||
verifyGracefulShutdown(1);
|
||||
shutdownAndVerifyGraceful(1);
|
||||
verify(transportListener, never()).transportTerminated();
|
||||
|
||||
MockStreamListener streamListener = mockTransportListener.newStreams.pop();
|
||||
|
|
@ -1099,8 +1038,8 @@ public class OkHttpServerTransportTest {
|
|||
return metadata;
|
||||
}
|
||||
|
||||
private void verifyGracefulShutdown(int lastStreamId)
|
||||
throws IOException {
|
||||
private void shutdownAndVerifyGraceful(int lastStreamId) throws IOException {
|
||||
serverTransport.shutdown();
|
||||
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
|
||||
verify(clientFramesRead).goAway(2147483647, ErrorCode.NO_ERROR, ByteString.EMPTY);
|
||||
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
|
||||
|
|
@ -1113,8 +1052,7 @@ public class OkHttpServerTransportTest {
|
|||
|
||||
private void shutdownAndTerminate(int lastStreamId) throws IOException {
|
||||
assertThat(serverTransport.getActiveStreams().length).isEqualTo(0);
|
||||
serverTransport.shutdown();
|
||||
verifyGracefulShutdown(lastStreamId);
|
||||
shutdownAndVerifyGraceful(lastStreamId);
|
||||
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isFalse();
|
||||
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue