From 8d49df28ee7639dd383a00c1b2449c18924a141f Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Thu, 5 Jan 2017 17:23:34 -0800 Subject: [PATCH] all: add max message size to client calls --- core/src/main/java/io/grpc/CallOptions.java | 74 ++++++++++++++++--- .../io/grpc/inprocess/InProcessTransport.java | 6 ++ .../grpc/internal/AbstractClientStream.java | 10 +++ .../grpc/internal/AbstractClientStream2.java | 10 +++ .../java/io/grpc/internal/AbstractStream.java | 8 ++ .../io/grpc/internal/AbstractStream2.java | 4 + .../java/io/grpc/internal/ClientCallImpl.java | 6 ++ .../java/io/grpc/internal/ClientStream.java | 10 +++ .../java/io/grpc/internal/DelayedStream.java | 28 +++++++ .../io/grpc/internal/MessageDeframer.java | 20 ++--- .../java/io/grpc/internal/MessageFramer.java | 29 ++++++++ .../io/grpc/internal/NoopClientStream.java | 6 ++ .../test/java/io/grpc/CallOptionsTest.java | 29 +++++--- .../internal/AbstractClientStream2Test.java | 6 ++ .../io/grpc/internal/ClientCallImplTest.java | 19 +++++ .../integration/AbstractInteropTest.java | 72 +++++++++++++++++- .../testing/integration/InProcessTest.java | 12 ++- .../main/java/io/grpc/stub/AbstractStub.java | 19 +++++ 18 files changed, 334 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/io/grpc/CallOptions.java b/core/src/main/java/io/grpc/CallOptions.java index 0d9919d81c..5c2bef7378 100644 --- a/core/src/main/java/io/grpc/CallOptions.java +++ b/core/src/main/java/io/grpc/CallOptions.java @@ -31,6 +31,8 @@ package io.grpc; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; @@ -77,6 +79,12 @@ public final class CallOptions { */ private boolean waitForReady; + @Nullable + private Integer maxInboundMessageSize; + @Nullable + private Integer maxOutboundMessageSize; + + /** * Override the HTTP/2 authority the channel claims to be connecting to. This is not * generally safe. Overriding allows advanced users to re-use a single Channel for multiple @@ -362,6 +370,47 @@ public final class CallOptions { return waitForReady; } + /** + * Sets the maximum allowed message size acceptable from the remote peer. If unset, this will + * default to the value set on the {@link ManagedChannelBuilder#maxInboundMessageSize(int)}. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563") + public CallOptions withMaxInboundMessageSize(int maxSize) { + checkArgument(maxSize >= 0, "invalid maxsize %s", maxSize); + CallOptions newOptions = new CallOptions(this); + newOptions.maxInboundMessageSize = maxSize; + return newOptions; + } + + /** + * Sets the maximum allowed message size acceptable sent to the remote peer. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563") + public CallOptions withMaxOutboundMessageSize(int maxSize) { + checkArgument(maxSize >= 0, "invalid maxsize %s", maxSize); + CallOptions newOptions = new CallOptions(this); + newOptions.maxOutboundMessageSize = maxSize; + return newOptions; + } + + /** + * Gets the maximum allowed message size acceptable from the remote peer. + */ + @Nullable + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563") + public Integer getMaxInboundMessageSize() { + return maxInboundMessageSize; + } + + /** + * Gets the maximum allowed message size acceptable to send the remote peer. + */ + @Nullable + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563") + public Integer getMaxOutboundMessageSize() { + return maxOutboundMessageSize; + } + /** * Copy constructor. */ @@ -374,20 +423,23 @@ public final class CallOptions { compressorName = other.compressorName; customOptions = other.customOptions; waitForReady = other.waitForReady; + maxInboundMessageSize = other.maxInboundMessageSize; + maxOutboundMessageSize = other.maxOutboundMessageSize; } @Override public String toString() { - MoreObjects.ToStringHelper toStringHelper = MoreObjects.toStringHelper(this); - toStringHelper.add("deadline", deadline); - toStringHelper.add("authority", authority); - toStringHelper.add("callCredentials", credentials); - toStringHelper.add("affinity", affinity); - toStringHelper.add("executor", executor != null ? executor.getClass() : null); - toStringHelper.add("compressorName", compressorName); - toStringHelper.add("customOptions", Arrays.deepToString(customOptions)); - toStringHelper.add("waitForReady", isWaitForReady()); - - return toStringHelper.toString(); + return MoreObjects.toStringHelper(this) + .add("deadline", deadline) + .add("authority", authority) + .add("callCredentials", credentials) + .add("affinity", affinity) + .add("executor", executor != null ? executor.getClass() : null) + .add("compressorName", compressorName) + .add("customOptions", Arrays.deepToString(customOptions)) + .add("waitForReady", isWaitForReady()) + .add("maxInboundMessageSize", maxInboundMessageSize) + .add("maxOutboundMessageSize", maxOutboundMessageSize) + .toString(); } } diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index 051586632f..3bdf0f1182 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -574,6 +574,12 @@ class InProcessTransport implements ServerTransport, ConnectionClientTransport { @Override public void setDecompressor(Decompressor decompressor) {} + + @Override + public void setMaxInboundMessageSize(int maxSize) {} + + @Override + public void setMaxOutboundMessageSize(int maxSize) {} } } diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index ffdc6caf3f..c0036ca77f 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -68,6 +68,16 @@ public abstract class AbstractClientStream extends AbstractStream super(bufferAllocator, maxMessageSize, statsTraceCtx); } + @Override + public void setMaxInboundMessageSize(int maxSize) { + setMaxInboundMessageSizeProtected(maxSize); + } + + @Override + public void setMaxOutboundMessageSize(int maxSize) { + setMaxOutboundMessageSizeProtected(maxSize); + } + @Override protected final ClientStreamListener listener() { return listener; diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream2.java b/core/src/main/java/io/grpc/internal/AbstractClientStream2.java index a0745dfbd9..35de0fd5a7 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream2.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream2.java @@ -99,6 +99,16 @@ public abstract class AbstractClientStream2 extends AbstractStream2 framer = new MessageFramer(this, bufferAllocator, statsTraceCtx); } + @Override + public void setMaxOutboundMessageSize(int maxSize) { + framer.setMaxOutboundMessageSize(maxSize); + } + + @Override + public void setMaxInboundMessageSize(int maxSize) { + transportState().setMaxInboundMessageSize(maxSize); + } + /** {@inheritDoc} */ @Override protected abstract TransportState transportState(); diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java index 49f07497b0..b9cc5316a3 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream.java @@ -138,6 +138,14 @@ public abstract class AbstractStream implements Stream { statsTraceCtx); } + protected final void setMaxInboundMessageSizeProtected(int maxSize) { + deframer.setMaxInboundMessageSize(maxSize); + } + + protected final void setMaxOutboundMessageSizeProtected(int maxSize) { + framer.setMaxOutboundMessageSize(maxSize); + } + @VisibleForTesting AbstractStream(MessageFramer framer, MessageDeframer deframer) { this.framer = framer; diff --git a/core/src/main/java/io/grpc/internal/AbstractStream2.java b/core/src/main/java/io/grpc/internal/AbstractStream2.java index 155d43ae07..c4cef5ade9 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream2.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream2.java @@ -155,6 +155,10 @@ public abstract class AbstractStream2 implements Stream { this.deframer = deframer; } + final void setMaxInboundMessageSize(int maxSize) { + deframer.setMaxInboundMessageSize(maxSize); + } + /** * Override this method to provide a stream listener. */ diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 5a964614e3..5c0c1877ea 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -229,6 +229,12 @@ final class ClientCallImpl extends ClientCall if (callOptions.getAuthority() != null) { stream.setAuthority(callOptions.getAuthority()); } + if (callOptions.getMaxInboundMessageSize() != null) { + stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize()); + } + if (callOptions.getMaxOutboundMessageSize() != null) { + stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize()); + } stream.setCompressor(compressor); stream.start(new ClientStreamListenerImpl(observer)); diff --git a/core/src/main/java/io/grpc/internal/ClientStream.java b/core/src/main/java/io/grpc/internal/ClientStream.java index c639ea0bc4..9ab9e0cda3 100644 --- a/core/src/main/java/io/grpc/internal/ClientStream.java +++ b/core/src/main/java/io/grpc/internal/ClientStream.java @@ -74,4 +74,14 @@ public interface ClientStream extends Stream { * @param listener non-{@code null} listener of stream events */ void start(ClientStreamListener listener); + + /** + * Sets the max size accepted from the remote endpoint. + */ + void setMaxInboundMessageSize(int maxSize); + + /** + * Sets the max size sent to the remote endpoint. + */ + void setMaxOutboundMessageSize(int maxSize); } diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index a36f289ed7..a06cbc5546 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -72,6 +72,34 @@ class DelayedStream implements ClientStream { @GuardedBy("this") private DelayedStreamListener delayedListener; + @Override + public void setMaxInboundMessageSize(final int maxSize) { + if (passThrough) { + realStream.setMaxInboundMessageSize(maxSize); + } else { + delayOrExecute(new Runnable() { + @Override + public void run() { + realStream.setMaxInboundMessageSize(maxSize); + } + }); + } + } + + @Override + public void setMaxOutboundMessageSize(final int maxSize) { + if (passThrough) { + realStream.setMaxOutboundMessageSize(maxSize); + } else { + delayOrExecute(new Runnable() { + @Override + public void run() { + realStream.setMaxOutboundMessageSize(maxSize); + } + }); + } + } + /** * Transfers all pending and future requests and mutations to the given stream. * diff --git a/core/src/main/java/io/grpc/internal/MessageDeframer.java b/core/src/main/java/io/grpc/internal/MessageDeframer.java index d1f5e2c5c6..8492c8045e 100644 --- a/core/src/main/java/io/grpc/internal/MessageDeframer.java +++ b/core/src/main/java/io/grpc/internal/MessageDeframer.java @@ -97,7 +97,7 @@ public class MessageDeframer implements Closeable { } private final Listener listener; - private final int maxMessageSize; + private int maxInboundMessageSize; private final StatsTraceContext statsTraceCtx; private Decompressor decompressor; private State state = State.HEADER; @@ -122,10 +122,14 @@ public class MessageDeframer implements Closeable { StatsTraceContext statsTraceCtx) { this.listener = Preconditions.checkNotNull(listener, "sink"); this.decompressor = Preconditions.checkNotNull(decompressor, "decompressor"); - this.maxMessageSize = maxMessageSize; + this.maxInboundMessageSize = maxMessageSize; this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); } + void setMaxInboundMessageSize(int messageSize) { + maxInboundMessageSize = messageSize; + } + /** * Sets the decompressor available to use. The message encoding for the stream comes later in * time, and thus will not be available at the time of construction. This should only be set @@ -338,10 +342,9 @@ public class MessageDeframer implements Closeable { // Update the required length to include the length of the frame. requiredLength = nextFrame.readInt(); - if (requiredLength < 0 || requiredLength > maxMessageSize) { - throw Status.INTERNAL.withDescription(String.format("Frame size %d exceeds maximum: %d. " - + "If this is normal, increase the maxMessageSize in the channel/server builder", - requiredLength, maxMessageSize)).asRuntimeException(); + if (requiredLength < 0 || requiredLength > maxInboundMessageSize) { + throw Status.INTERNAL.withDescription(String.format("Frame size %d exceeds maximum: %d. ", + requiredLength, maxInboundMessageSize)).asRuntimeException(); } // Continue reading the frame body. @@ -377,7 +380,7 @@ public class MessageDeframer implements Closeable { // Enforce the maxMessageSize limit on the returned stream. InputStream unlimitedStream = decompressor.decompress(ReadableBuffers.openStream(nextFrame, true)); - return new SizeEnforcingInputStream(unlimitedStream, maxMessageSize, statsTraceCtx); + return new SizeEnforcingInputStream(unlimitedStream, maxInboundMessageSize, statsTraceCtx); } catch (IOException e) { throw new RuntimeException(e); } @@ -461,8 +464,7 @@ public class MessageDeframer implements Closeable { private void verifySize() { if (count > maxMessageSize) { throw Status.INTERNAL.withDescription(String.format( - "Compressed frame exceeds maximum frame size: %d. Bytes read: %d. " - + "If this is normal, increase the maxMessageSize in the channel/server builder", + "Compressed frame exceeds maximum frame size: %d. Bytes read: %d. ", maxMessageSize, count)).asRuntimeException(); } } diff --git a/core/src/main/java/io/grpc/internal/MessageFramer.java b/core/src/main/java/io/grpc/internal/MessageFramer.java index d6090e3651..15b9ca9cdb 100644 --- a/core/src/main/java/io/grpc/internal/MessageFramer.java +++ b/core/src/main/java/io/grpc/internal/MessageFramer.java @@ -33,6 +33,7 @@ package io.grpc.internal; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import static java.lang.Math.min; import com.google.common.io.ByteStreams; @@ -58,6 +59,9 @@ import javax.annotation.Nullable; * MessageFramer.Sink}. */ public class MessageFramer { + + private static final int NO_MAX_OUTBOUND_MESSAGE_SIZE = -1; + /** * Sink implemented by the transport layer to receive frames and forward them to their * destination. @@ -79,6 +83,8 @@ public class MessageFramer { private static final byte COMPRESSED = 1; private final Sink sink; + // effectively final. Can only be set once. + private int maxOutboundMessageSize = NO_MAX_OUTBOUND_MESSAGE_SIZE; private WritableBuffer buffer; private Compressor compressor = Codec.Identity.NONE; private boolean messageCompression = true; @@ -111,6 +117,11 @@ public class MessageFramer { return this; } + void setMaxOutboundMessageSize(int maxSize) { + checkState(maxOutboundMessageSize == NO_MAX_OUTBOUND_MESSAGE_SIZE, "max size already set"); + maxOutboundMessageSize = maxSize; + } + /** * Writes out a payload message. * @@ -155,6 +166,12 @@ public class MessageFramer { } BufferChainOutputStream bufferChain = new BufferChainOutputStream(); int written = writeToOutputStream(message, bufferChain); + if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) { + throw Status.INTERNAL + .withDescription( + String.format("message too large %d > %d", written , maxOutboundMessageSize)) + .asRuntimeException(); + } writeBufferChain(bufferChain, false); return written; } @@ -169,6 +186,12 @@ public class MessageFramer { } finally { compressingStream.close(); } + if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) { + throw Status.CANCELLED + .withDescription( + String.format("message too large %d > %d", written , maxOutboundMessageSize)) + .asRuntimeException(); + } writeBufferChain(bufferChain, true); return written; @@ -186,6 +209,12 @@ public class MessageFramer { */ private int writeKnownLengthUncompressed(InputStream message, int messageLength) throws IOException { + if (maxOutboundMessageSize >= 0 && messageLength > maxOutboundMessageSize) { + throw Status.CANCELLED + .withDescription( + String.format("message too large %d > %d", messageLength , maxOutboundMessageSize)) + .asRuntimeException(); + } ByteBuffer header = ByteBuffer.wrap(headerScratch); header.put(UNCOMPRESSED); header.putInt(messageLength); diff --git a/core/src/main/java/io/grpc/internal/NoopClientStream.java b/core/src/main/java/io/grpc/internal/NoopClientStream.java index 4924ccd91c..fabd3a0096 100644 --- a/core/src/main/java/io/grpc/internal/NoopClientStream.java +++ b/core/src/main/java/io/grpc/internal/NoopClientStream.java @@ -79,4 +79,10 @@ public class NoopClientStream implements ClientStream { @Override public void setDecompressor(Decompressor decompressor) {} + + @Override + public void setMaxInboundMessageSize(int maxSize) {} + + @Override + public void setMaxOutboundMessageSize(int maxSize) {} } diff --git a/core/src/test/java/io/grpc/CallOptionsTest.java b/core/src/test/java/io/grpc/CallOptionsTest.java index baa3cdb0cb..1f51b084a9 100644 --- a/core/src/test/java/io/grpc/CallOptionsTest.java +++ b/core/src/test/java/io/grpc/CallOptionsTest.java @@ -160,25 +160,30 @@ public class CallOptionsTest { @Test public void toStringMatches_noDeadline_default() { - String expected = "CallOptions{deadline=null, authority=authority, callCredentials=null, " - + "affinity={sample=blah}, " - + "executor=class io.grpc.internal.SerializingExecutor, compressorName=compressor, " - + "customOptions=[[option1, value1], [option2, value2]], waitForReady=true}"; String actual = allSet .withDeadline(null) .withExecutor(new SerializingExecutor(directExecutor())) .withCallCredentials(null) + .withMaxInboundMessageSize(44) + .withMaxOutboundMessageSize(55) .toString(); - assertThat(actual).isEqualTo(expected); + assertThat(actual).contains("deadline=null"); + assertThat(actual).contains("authority=authority"); + assertThat(actual).contains("callCredentials=null"); + assertThat(actual).contains("affinity={sample=blah}"); + assertThat(actual).contains("executor=class io.grpc.internal.SerializingExecutor"); + assertThat(actual).contains("compressorName=compressor"); + assertThat(actual).contains("customOptions=[[option1, value1], [option2, value2]]"); + assertThat(actual).contains("waitForReady=true"); + assertThat(actual).contains("maxInboundMessageSize=44"); + assertThat(actual).contains("maxOutboundMessageSize=55"); } @Test public void toStringMatches_noDeadline() { - assertThat("CallOptions{deadline=null, authority=null, callCredentials=null, " - + "affinity={}, executor=null, compressorName=null, customOptions=[], " - + "waitForReady=false}") - .isEqualTo(CallOptions.DEFAULT.toString()); + String actual = CallOptions.DEFAULT.toString(); + assertThat(actual).contains("deadline=null"); } @Test @@ -207,19 +212,19 @@ public class CallOptionsTest { CallOptions opts = CallOptions.DEFAULT; assertThat(opts.getOption(option1)).isEqualTo("default"); } - + @Test public void withCustomOption() { CallOptions opts = CallOptions.DEFAULT.withOption(option1, "v1"); assertThat(opts.getOption(option1)).isEqualTo("v1"); } - + @Test public void withCustomOptionLastOneWins() { CallOptions opts = CallOptions.DEFAULT.withOption(option1, "v1").withOption(option1, "v2"); assertThat(opts.getOption(option1)).isEqualTo("v2"); } - + @Test public void withMultipleCustomOption() { CallOptions opts = CallOptions.DEFAULT.withOption(option1, "v1").withOption(option2, "v2"); diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStream2Test.java b/core/src/test/java/io/grpc/internal/AbstractClientStream2Test.java index 9ba9fc2650..9a77d69f25 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStream2Test.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStream2Test.java @@ -254,6 +254,12 @@ public class AbstractClientStream2Test { @Override public void setAuthority(String authority) {} + + @Override + public void setMaxInboundMessageSize(int maxSize) {} + + @Override + public void setMaxOutboundMessageSize(int maxSize) {} } private static class BaseSink implements AbstractClientStream2.Sink { diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index 48d375e54c..876429575f 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -824,6 +824,25 @@ public class ClientCallImplTest { assertSame(cause, status.getCause()); } + @Test + public void startAddsMaxSize() { + CallOptions callOptions = + CallOptions.DEFAULT.withMaxInboundMessageSize(1).withMaxOutboundMessageSize(2); + ClientCallImpl call = new ClientCallImpl( + DESCRIPTOR, + new SerializingExecutor(Executors.newSingleThreadExecutor()), + callOptions, + statsTraceCtx, + provider, + deadlineCancellationExecutor) + .setDecompressorRegistry(decompressorRegistry); + + call.start(callListener, new Metadata()); + + verify(stream).setMaxInboundMessageSize(1); + verify(stream).setMaxOutboundMessageSize(2); + } + private void assertStatusInStats(Status.Code statusCode) { StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); assertNotNull(record); diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index 145795b7f6..9dd4d0c8dc 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -31,6 +31,7 @@ package io.grpc.testing.integration; +import static com.google.common.truth.Truth.assertThat; import static io.grpc.testing.integration.Messages.PayloadType.COMPRESSABLE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -41,6 +42,7 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; +import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.ComputeEngineCredentials; import com.google.auth.oauth2.GoogleCredentials; @@ -785,6 +787,74 @@ public abstract class AbstractInteropTest { } } + @Test(timeout = 10000) + public void maxInboundSize_exact() { + StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() + .addResponseParameters(ResponseParameters.newBuilder().setSize(1)) + .build(); + int size = blockingStub.streamingOutputCall(request).next().getSerializedSize(); + + TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel) + .withMaxInboundMessageSize(size); + + stub.streamingOutputCall(request).next(); + } + + @Test(timeout = 10000) + public void maxInboundSize_tooBig() { + StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() + .addResponseParameters(ResponseParameters.newBuilder().setSize(1)) + .build(); + int size = blockingStub.streamingOutputCall(request).next().getSerializedSize(); + + TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel) + .withMaxInboundMessageSize(size - 1); + + try { + stub.streamingOutputCall(request).next(); + fail(); + } catch (StatusRuntimeException ex) { + Status s = ex.getStatus(); + assertThat(s.getCode()).named(s.toString()).isEqualTo(Status.Code.INTERNAL); + assertThat(Throwables.getStackTraceAsString(ex)).contains("exceeds maximum"); + } + } + + @Test(timeout = 10000) + public void maxOutboundSize_exact() { + // warm up the channel and JVM + blockingStub.emptyCall(Empty.getDefaultInstance()); + + // set at least one field to ensure the size is non-zero. + StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() + .addResponseParameters(ResponseParameters.newBuilder().setSize(1)) + .build(); + TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel) + .withMaxOutboundMessageSize(request.getSerializedSize()); + + stub.streamingOutputCall(request).next(); + } + + @Test(timeout = 10000) + public void maxOutboundSize_tooBig() { + // warm up the channel and JVM + blockingStub.emptyCall(Empty.getDefaultInstance()); + // set at least one field to ensure the size is non-zero. + StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() + .addResponseParameters(ResponseParameters.newBuilder().setSize(1)) + .build(); + TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel) + .withMaxOutboundMessageSize(request.getSerializedSize() - 1); + try { + stub.streamingOutputCall(request).next(); + fail(); + } catch (StatusRuntimeException ex) { + Status s = ex.getStatus(); + assertThat(s.getCode()).named(s.toString()).isEqualTo(Status.Code.CANCELLED); + assertThat(Throwables.getStackTraceAsString(ex)).contains("message too large"); + } + } + protected int unaryPayloadLength() { // 10MiB. return 10485760; @@ -955,7 +1025,7 @@ public abstract class AbstractInteropTest { // Test FullDuplexCall @SuppressWarnings("unchecked") StreamObserver responseObserver = - (StreamObserver) mock(StreamObserver.class); + mock(StreamObserver.class); StreamObserver requestObserver = asyncStub.fullDuplexCall(responseObserver); requestObserver.onNext(streamingRequest); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java index 9e26f2abca..d1db91a146 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java @@ -65,8 +65,18 @@ public class InProcessTest extends AbstractInteropTest { @Override protected boolean metricsExpected() { - // TODO(zhangkun83): InProcessTransport by-passes framer and deframer, thus message sizses are + // TODO(zhangkun83): InProcessTransport by-passes framer and deframer, thus message sizes are // not counted. (https://github.com/grpc/grpc-java/issues/2284) return false; } + + @Override + public void maxInboundSize_tooBig() { + // noop, not enforced. + } + + @Override + public void maxOutboundSize_tooBig() { + // noop, not enforced. + } } diff --git a/stub/src/main/java/io/grpc/stub/AbstractStub.java b/stub/src/main/java/io/grpc/stub/AbstractStub.java index 87f0e56482..f66ccb4260 100644 --- a/stub/src/main/java/io/grpc/stub/AbstractStub.java +++ b/stub/src/main/java/io/grpc/stub/AbstractStub.java @@ -40,6 +40,7 @@ import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; import io.grpc.Deadline; import io.grpc.ExperimentalApi; +import io.grpc.ManagedChannelBuilder; import java.util.concurrent.TimeUnit; @@ -189,4 +190,22 @@ public abstract class AbstractStub> { public final S withWaitForReady() { return build(channel, callOptions.withWaitForReady()); } + + /** + * Returns a new stub that limits the maximum acceptable message size from a remote peer. + * + *

If unset, the {@link ManagedChannelBuilder#maxInboundMessageSize(int)} limit is used. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563") + public final S withMaxInboundMessageSize(int maxSize) { + return build(channel, callOptions.withMaxInboundMessageSize(maxSize)); + } + + /** + * Returns a new stub that limits the maximum acceptable message size to send a remote peer. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563") + public final S withMaxOutboundMessageSize(int maxSize) { + return build(channel, callOptions.withMaxOutboundMessageSize(maxSize)); + } }