diff --git a/api/src/main/java/io/grpc/CallOptions.java b/api/src/main/java/io/grpc/CallOptions.java index 87493d2ba0..9c54bf5026 100644 --- a/api/src/main/java/io/grpc/CallOptions.java +++ b/api/src/main/java/io/grpc/CallOptions.java @@ -79,6 +79,8 @@ public final class CallOptions { private final Integer maxInboundMessageSize; @Nullable private final Integer maxOutboundMessageSize; + @Nullable + private final Integer onReadyThreshold; private CallOptions(Builder builder) { this.deadline = builder.deadline; @@ -91,6 +93,7 @@ public final class CallOptions { this.waitForReady = builder.waitForReady; this.maxInboundMessageSize = builder.maxInboundMessageSize; this.maxOutboundMessageSize = builder.maxOutboundMessageSize; + this.onReadyThreshold = builder.onReadyThreshold; } static class Builder { @@ -105,6 +108,7 @@ public final class CallOptions { Boolean waitForReady; Integer maxInboundMessageSize; Integer maxOutboundMessageSize; + Integer onReadyThreshold; private CallOptions build() { return new CallOptions(this); @@ -203,6 +207,46 @@ public final class CallOptions { return builder.build(); } + /** + * Specifies how many bytes must be queued before the call is + * considered not ready to send more messages. + * + * @param numBytes The number of bytes that must be queued. Must be a + * positive integer. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021") + public CallOptions withOnReadyThreshold(int numBytes) { + checkArgument(numBytes > 0, "numBytes must be positive: %s", numBytes); + Builder builder = toBuilder(this); + builder.onReadyThreshold = numBytes; + return builder.build(); + } + + /** + * Resets to the default number of bytes that must be queued before the + * call will leave the + * 'wait for ready' state. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021") + public CallOptions clearOnReadyThreshold() { + Builder builder = toBuilder(this); + builder.onReadyThreshold = null; + return builder.build(); + } + + /** + * Returns to the default number of bytes that must be queued before the + * call will leave the + * 'wait for ready' state. + * + * @return null if the default threshold is used. + */ + @Nullable + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021") + public Integer getOnReadyThreshold() { + return onReadyThreshold; + } + /** * Returns the compressor's name. */ diff --git a/api/src/main/java/io/grpc/PartialForwardingServerCall.java b/api/src/main/java/io/grpc/PartialForwardingServerCall.java index a7da647308..a313407b23 100644 --- a/api/src/main/java/io/grpc/PartialForwardingServerCall.java +++ b/api/src/main/java/io/grpc/PartialForwardingServerCall.java @@ -58,6 +58,12 @@ abstract class PartialForwardingServerCall extends ServerCall { // noop } + /** + * A hint to the call that specifies how many bytes must be queued before + * {@link #isReady()} will return false. A call may ignore this property if + * unsupported. This may only be set before any messages are sent. + * + * @param numBytes The number of bytes that must be queued. Must be a + * positive integer. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021") + public void setOnReadyThreshold(int numBytes) { + checkArgument(numBytes > 0, "numBytes must be positive: %s", numBytes); + } + /** * Returns the level of security guarantee in communications * diff --git a/binder/src/main/java/io/grpc/binder/internal/MultiMessageServerStream.java b/binder/src/main/java/io/grpc/binder/internal/MultiMessageServerStream.java index cba18ba5b2..f54769caef 100644 --- a/binder/src/main/java/io/grpc/binder/internal/MultiMessageServerStream.java +++ b/binder/src/main/java/io/grpc/binder/internal/MultiMessageServerStream.java @@ -64,6 +64,11 @@ final class MultiMessageServerStream implements ServerStream { } } + @Override + public void setOnReadyThreshold(int numBytes) { + // No-op + } + @Override public boolean isReady() { return outbound.isReady(); diff --git a/binder/src/main/java/io/grpc/binder/internal/SingleMessageServerStream.java b/binder/src/main/java/io/grpc/binder/internal/SingleMessageServerStream.java index 92e9ff4477..383bd7a259 100644 --- a/binder/src/main/java/io/grpc/binder/internal/SingleMessageServerStream.java +++ b/binder/src/main/java/io/grpc/binder/internal/SingleMessageServerStream.java @@ -67,6 +67,11 @@ final class SingleMessageServerStream implements ServerStream { } } + @Override + public void setOnReadyThreshold(int numBytes) { + // No-op + } + @Override public boolean isReady() { return outbound.isReady(); diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index a4ebfa52d6..e929716e8c 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -243,9 +243,13 @@ public abstract class AbstractClientStream extends AbstractStream protected TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, - TransportTracer transportTracer) { + TransportTracer transportTracer, + CallOptions options) { super(maxMessageSize, statsTraceCtx, transportTracer); this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); + if (options.getOnReadyThreshold() != null) { + this.setOnReadyThreshold(options.getOnReadyThreshold()); + } } private void setFullStreamDecompression(boolean fullStreamDecompression) { diff --git a/core/src/main/java/io/grpc/internal/AbstractServerStream.java b/core/src/main/java/io/grpc/internal/AbstractServerStream.java index d781cfa9b8..e2a0829200 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerStream.java @@ -177,6 +177,19 @@ public abstract class AbstractServerStream extends AbstractStream return statsTraceCtx; } + /** + * A hint to the stream that specifies how many bytes must be queued before + * {@link #isReady()} will return false. A stream may ignore this property + * if unsupported. This may only be set before any messages are sent. + * + * @param numBytes The number of bytes that must be queued. Must be a + * positive integer. + */ + @Override + public void setOnReadyThreshold(int numBytes) { + super.setOnReadyThreshold(numBytes); + } + /** * This should only be called from the transport thread (except for private interactions with * {@code AbstractServerStream}). @@ -243,6 +256,8 @@ public abstract class AbstractServerStream extends AbstractStream } } + + @Override protected ServerStreamListener listener() { return listener; diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java index cda08576ea..4b7486e466 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream.java @@ -77,6 +77,19 @@ public abstract class AbstractStream implements Stream { } } + /** + * A hint to the stream that specifies how many bytes must be queued before + * {@link #isReady()} will return false. A stream may ignore this property if + * unsupported. This may only be set during stream initialization before + * any messages are set. + * + * @param numBytes The number of bytes that must be queued. Must be a + * positive integer. + */ + protected void setOnReadyThreshold(int numBytes) { + transportState().setOnReadyThreshold(numBytes); + } + /** * Closes the underlying framer. Should be called when the outgoing stream is gracefully closed * (half closure on client; closure on server). @@ -143,6 +156,9 @@ public abstract class AbstractStream implements Stream { @GuardedBy("onReadyLock") private boolean deallocated; + @GuardedBy("onReadyLock") + private int onReadyThreshold; + protected TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, @@ -157,6 +173,7 @@ public abstract class AbstractStream implements Stream { transportTracer); // TODO(#7168): use MigratingThreadDeframer when enabling retry doesn't break. deframer = rawDeframer; + onReadyThreshold = DEFAULT_ONREADY_THRESHOLD; } final void optimizeForDirectExecutor() { @@ -178,6 +195,20 @@ public abstract class AbstractStream implements Stream { */ protected abstract StreamListener listener(); + /** + * A hint to the stream that specifies how many bytes must be queued before + * {@link #isReady()} will return false. A stream may ignore this property if + * unsupported. This may only be set before any messages are sent. + * + * @param numBytes The number of bytes that must be queued. Must be a + * positive integer. + */ + void setOnReadyThreshold(int numBytes) { + synchronized (onReadyLock) { + this.onReadyThreshold = numBytes; + } + } + @Override public void messagesAvailable(StreamListener.MessageProducer producer) { listener().messagesAvailable(producer); @@ -259,7 +290,7 @@ public abstract class AbstractStream implements Stream { private boolean isReady() { synchronized (onReadyLock) { - return allocated && numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD && !deallocated; + return allocated && numSentBytesQueued < onReadyThreshold && !deallocated; } } @@ -316,9 +347,9 @@ public abstract class AbstractStream implements Stream { synchronized (onReadyLock) { checkState(allocated, "onStreamAllocated was not called, but it seems the stream is active"); - boolean belowThresholdBefore = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD; + boolean belowThresholdBefore = numSentBytesQueued < onReadyThreshold; numSentBytesQueued -= numBytes; - boolean belowThresholdAfter = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD; + boolean belowThresholdAfter = numSentBytesQueued < onReadyThreshold; doNotify = !belowThresholdBefore && belowThresholdAfter; } if (doNotify) { diff --git a/core/src/main/java/io/grpc/internal/Http2ClientStreamTransportState.java b/core/src/main/java/io/grpc/internal/Http2ClientStreamTransportState.java index c2bcb350a9..cc36ed3c0b 100644 --- a/core/src/main/java/io/grpc/internal/Http2ClientStreamTransportState.java +++ b/core/src/main/java/io/grpc/internal/Http2ClientStreamTransportState.java @@ -18,6 +18,7 @@ package io.grpc.internal; import com.google.common.base.Charsets; import com.google.common.base.Preconditions; +import io.grpc.CallOptions; import io.grpc.InternalMetadata; import io.grpc.InternalStatus; import io.grpc.Metadata; @@ -67,8 +68,9 @@ public abstract class Http2ClientStreamTransportState extends AbstractClientStre protected Http2ClientStreamTransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, - TransportTracer transportTracer) { - super(maxMessageSize, statsTraceCtx, transportTracer); + TransportTracer transportTracer, + CallOptions options) { + super(maxMessageSize, statsTraceCtx, transportTracer, options); } /** diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index 1bfee21e05..dda36258e7 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -184,6 +184,11 @@ final class ServerCallImpl extends ServerCall { stream.setMessageCompression(enable); } + @Override + public void setOnReadyThreshold(int numBytes) { + stream.setOnReadyThreshold(numBytes); + } + @Override public void setCompression(String compressorName) { // Added here to give a better error message. diff --git a/core/src/main/java/io/grpc/internal/ServerStream.java b/core/src/main/java/io/grpc/internal/ServerStream.java index 861d5f36cc..aa5ba10329 100644 --- a/core/src/main/java/io/grpc/internal/ServerStream.java +++ b/core/src/main/java/io/grpc/internal/ServerStream.java @@ -96,4 +96,15 @@ public interface ServerStream extends Stream { * The HTTP/2 stream id, or {@code -1} if not supported. */ int streamId(); + + /** + * A hint to the stream that specifies how many bytes must be queued before + * {@link #isReady()} will return false. A stream may ignore this property if + * unsupported. This may only be set during stream initialization before + * any messages are set. + * + * @param numBytes The number of bytes that must be queued. Must be a + * positive integer. + */ + void setOnReadyThreshold(int numBytes); } diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java index 4ce8a467d9..ad3b59030d 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java @@ -20,6 +20,7 @@ import static com.google.common.truth.Truth.assertThat; import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -482,6 +483,41 @@ public class AbstractClientStreamTest { assertThat(insight.toString()).isEqualTo("[remote_addr=fake_server_addr]"); } + @Test + public void overrideOnReadyThreshold() { + AbstractClientStream.Sink sink = mock(AbstractClientStream.Sink.class); + BaseTransportState state = new BaseTransportState(statsTraceCtx, transportTracer); + AbstractClientStream stream = new BaseAbstractClientStream( + allocator, + state, + sink, + statsTraceCtx, + transportTracer, + CallOptions.DEFAULT.withOnReadyThreshold(10), + true); + ClientStreamListener listener = new NoopClientStreamListener(); + stream.start(listener); + state.onStreamAllocated(); + + // Stream should be ready. 0 bytes are queued. + assertTrue(stream.isReady()); + + // Queue some bytes above the custom threshold and check that the stream is not ready. + stream.onSendingBytes(100); + assertFalse(stream.isReady()); + + // Simulate a flush and verify ready now. + stream.transportState().onSentBytes(91); + assertTrue(stream.isReady()); + } + + @Test + public void resetOnReadyThreshold() { + CallOptions options = CallOptions.DEFAULT.withOnReadyThreshold(10); + assertEquals(Integer.valueOf(10), options.getOnReadyThreshold()); + assertNull(options.clearOnReadyThreshold().getOnReadyThreshold()); + } + /** * No-op base class for testing. */ @@ -517,9 +553,23 @@ public class AbstractClientStreamTest { StatsTraceContext statsTraceCtx, TransportTracer transportTracer, boolean useGet) { - super(allocator, statsTraceCtx, transportTracer, new Metadata(), CallOptions.DEFAULT, useGet); + this(allocator, state, sink, statsTraceCtx, transportTracer, CallOptions.DEFAULT, useGet); + } + + public BaseAbstractClientStream( + WritableBufferAllocator allocator, + TransportState state, + Sink sink, + StatsTraceContext statsTraceCtx, + TransportTracer transportTracer, + CallOptions callOptions, + boolean useGet) { + super(allocator, statsTraceCtx, transportTracer, new Metadata(), callOptions, useGet); this.state = state; this.sink = sink; + if (callOptions.getOnReadyThreshold() != null) { + this.transportState().setOnReadyThreshold(callOptions.getOnReadyThreshold()); + } } @Override @@ -567,7 +617,7 @@ public class AbstractClientStreamTest { } public BaseTransportState(StatsTraceContext statsTraceCtx, TransportTracer transportTracer) { - super(DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, transportTracer); + super(DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, transportTracer, CallOptions.DEFAULT); } @Override diff --git a/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java index 618af766c0..66fa92b1cf 100644 --- a/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java @@ -18,6 +18,7 @@ package io.grpc.internal; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -371,6 +372,15 @@ public class AbstractServerStreamTest { assertEquals("bad", metadataCaptor.getValue().get(InternalStatus.MESSAGE_KEY)); } + @Test + public void changeOnReadyThreshold() { + stream.setListener(new ServerStreamListenerBase()); + stream.transportState().onStreamAllocated(); + stream.setOnReadyThreshold(Integer.MAX_VALUE); + stream.onSendingBytes(Integer.MAX_VALUE - 1); + assertTrue(stream.isReady()); + } + private static class ServerStreamListenerBase implements ServerStreamListener { @Override public void messagesAvailable(MessageProducer producer) { diff --git a/core/src/test/java/io/grpc/internal/Http2ClientStreamTransportStateTest.java b/core/src/test/java/io/grpc/internal/Http2ClientStreamTransportStateTest.java index d49e41a4f4..e587a7709e 100644 --- a/core/src/test/java/io/grpc/internal/Http2ClientStreamTransportStateTest.java +++ b/core/src/test/java/io/grpc/internal/Http2ClientStreamTransportStateTest.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import io.grpc.CallOptions; import io.grpc.InternalMetadata; import io.grpc.Metadata; import io.grpc.Status; @@ -347,9 +348,22 @@ public class Http2ClientStreamTransportStateTest { assertEquals(Code.UNKNOWN, statusCaptor.getValue().getCode()); } + @Test + public void transportStateWithOnReadyThreshold() { + BaseTransportState state = new BaseTransportState(transportTracer, + CallOptions.DEFAULT.withOnReadyThreshold(Integer.MAX_VALUE)); + assertEquals(Integer.MAX_VALUE, state.onReadyThreshold); + } + private static class BaseTransportState extends Http2ClientStreamTransportState { + private int onReadyThreshold; + + public BaseTransportState(TransportTracer transportTracer, CallOptions options) { + super(DEFAULT_MAX_MESSAGE_SIZE, StatsTraceContext.NOOP, transportTracer, options); + } + public BaseTransportState(TransportTracer transportTracer) { - super(DEFAULT_MAX_MESSAGE_SIZE, StatsTraceContext.NOOP, transportTracer); + this(transportTracer, CallOptions.DEFAULT); } @Override @@ -367,5 +381,11 @@ public class Http2ClientStreamTransportStateTest { public void runOnTransportThread(Runnable r) { r.run(); } + + @Override + void setOnReadyThreshold(int numBytes) { + onReadyThreshold = numBytes; + super.setOnReadyThreshold(numBytes); + } } } diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java index 833f5109e3..c3c2ab15e2 100644 --- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java @@ -147,6 +147,12 @@ public class ServerCallImplTest { verify(stream).request(10); } + @Test + public void setOnReadyThreshold() { + call.setOnReadyThreshold(10); + verify(stream).setOnReadyThreshold(10); + } + @Test public void sendHeader_firstCall() { Metadata headers = new Metadata(); diff --git a/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java b/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java index d44b716146..5bf7118d2f 100644 --- a/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java +++ b/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java @@ -124,7 +124,8 @@ class CronetClientStream extends AbstractClientStream { this.delayRequestHeader = (method.getType() == MethodDescriptor.MethodType.UNARY); this.annotation = callOptions.getOption(CRONET_ANNOTATION_KEY); this.annotations = callOptions.getOption(CRONET_ANNOTATIONS_KEY); - this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer); + this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer, + callOptions); // Tests expect the "plain" deframer behavior, not MigratingDeframer // https://github.com/grpc/grpc-java/issues/7140 @@ -270,8 +271,8 @@ class CronetClientStream extends AbstractClientStream { public TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock, - TransportTracer transportTracer) { - super(maxMessageSize, statsTraceCtx, transportTracer); + TransportTracer transportTracer, CallOptions options) { + super(maxMessageSize, statsTraceCtx, transportTracer, options); this.lock = Preconditions.checkNotNull(lock, "lock"); } diff --git a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java index 91e519f9ef..f091714874 100644 --- a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -697,6 +697,11 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans public int streamId() { return -1; } + + @Override + public void setOnReadyThreshold(int numBytes) { + // noop + } } private class InProcessClientStream implements ClientStream { diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java index 0c0bb7eeb8..d88520a749 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java @@ -237,8 +237,9 @@ class NettyClientStream extends AbstractClientStream { int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer, - String methodName) { - super(maxMessageSize, statsTraceCtx, transportTracer); + String methodName, + CallOptions options) { + super(maxMessageSize, statsTraceCtx, transportTracer, options); this.methodName = checkNotNull(methodName, "methodName"); this.handler = checkNotNull(handler, "handler"); this.eventLoop = checkNotNull(eventLoop, "eventLoop"); diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 689dd847d5..a82470cacb 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -188,7 +188,8 @@ class NettyClientTransport implements ConnectionClientTransport { maxMessageSize, statsTraceCtx, transportTracer, - method.getFullMethodName()) { + method.getFullMethodName(), + callOptions) { @Override protected Status statusFromFailedFuture(ChannelFuture f) { return NettyClientTransport.this.statusFromFailedFuture(f); diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index 9cb2c043e5..bbe9fab923 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -1000,7 +1000,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase extends CallStreamObserver */ public abstract void setOnCancelHandler(Runnable onCancelHandler); + + /** + * A hint to the call that specifies how many bytes must be queued before + * {@link #isReady()} will return false. A call may ignore this property if + * unsupported. This may only be set during stream initialization before + * any messages are set. + * + * @param numBytes The number of bytes that must be queued. Must be a + * positive integer. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021") + public abstract void setOnReadyThreshold(int numBytes); + /** * Sets the compression algorithm to use for the call. May only be called before sending any * messages. Default gRPC servers support the "gzip" compressor. diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index 83954af967..7990a5b34c 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -395,7 +395,7 @@ public final class ServerCalls { call.close(Status.OK, new Metadata()); completed = true; } - + @Override public boolean isReady() { return call.isReady(); @@ -422,6 +422,14 @@ public final class ServerCalls { this.onCancelHandler = onCancelHandler; } + @Override + public void setOnReadyThreshold(int numBytes) { + checkState(!frozen, "Cannot alter setOnReadyThreshold after initialization. May only be " + + "called during the initial call to the application, before the service returns its " + + "StreamObserver"); + call.setOnReadyThreshold(numBytes); + } + @Override public void disableAutoInboundFlowControl() { disableAutoRequest(); diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java index 7227d26c5b..1e51ac1011 100644 --- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java @@ -451,6 +451,31 @@ public class ServerCallsTest { assertEquals(2, onReadyCalled.get()); } + @Test + public void setOnReadyThreshold() throws Exception { + final int testThreshold = Integer.MAX_VALUE; + ServerCallHandler callHandler = + ServerCalls.asyncServerStreamingCall( + new ServerCalls.ServerStreamingMethod() { + @Override + public void invoke(Integer req, StreamObserver responseObserver) { + ServerCallStreamObserver serverCallObserver = + (ServerCallStreamObserver) responseObserver; + serverCallObserver.setOnReadyThreshold(req); + } + }); + ServerCall.Listener callListener = + callHandler.startCall(serverCall, new Metadata()); + serverCall.isReady = true; + serverCall.isCancelled = false; + callListener.onReady(); + callListener.onMessage(testThreshold); + // half-closing triggers the unary request delivery and onReady + callListener.onHalfClose(); + + assertEquals(testThreshold, serverCall.getOnReadyThreshold()); + } + @Test public void clientSendsOne_errorMissingRequest_unary() { ServerCallRecorder serverCall = new ServerCallRecorder(UNARY_METHOD); @@ -626,6 +651,7 @@ public class ServerCallsTest { private Status status; private boolean isCancelled; private boolean isReady; + private int onReadyThreshold; public ServerCallRecorder(MethodDescriptor methodDescriptor) { this.methodDescriptor = methodDescriptor; @@ -660,9 +686,19 @@ public class ServerCallsTest { return isReady; } + @Override + public void setOnReadyThreshold(int numBytes) { + super.setOnReadyThreshold(numBytes); + onReadyThreshold = numBytes; + } + @Override public MethodDescriptor getMethodDescriptor() { return methodDescriptor; } + + public int getOnReadyThreshold() { + return onReadyThreshold; + } } } diff --git a/util/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java b/util/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java index bead2be4e9..b477ae1fdf 100644 --- a/util/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java +++ b/util/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java @@ -219,6 +219,17 @@ public final class TransmitStatusRuntimeExceptionInterceptor implements ServerIn }); } + @Override + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021") + public void setOnReadyThreshold(final int numBytes) { + serializingExecutor.execute(new Runnable() { + @Override + public void run() { + SerializingServerCall.super.setOnReadyThreshold(numBytes); + } + }); + } + @Override public void setCompression(final String compressor) { serializingExecutor.execute(new Runnable() {