all: add max message size to client calls

This commit is contained in:
Carl Mastrangelo 2017-01-05 17:23:34 -08:00 committed by GitHub
parent 6ed3cbb143
commit 8d49df28ee
18 changed files with 334 additions and 34 deletions

View File

@ -31,6 +31,8 @@
package io.grpc;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
@ -77,6 +79,12 @@ public final class CallOptions {
*/
private boolean waitForReady;
@Nullable
private Integer maxInboundMessageSize;
@Nullable
private Integer maxOutboundMessageSize;
/**
* Override the HTTP/2 authority the channel claims to be connecting to. <em>This is not
* generally safe.</em> Overriding allows advanced users to re-use a single Channel for multiple
@ -362,6 +370,47 @@ public final class CallOptions {
return waitForReady;
}
/**
* Sets the maximum allowed message size acceptable from the remote peer. If unset, this will
* default to the value set on the {@link ManagedChannelBuilder#maxInboundMessageSize(int)}.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563")
public CallOptions withMaxInboundMessageSize(int maxSize) {
checkArgument(maxSize >= 0, "invalid maxsize %s", maxSize);
CallOptions newOptions = new CallOptions(this);
newOptions.maxInboundMessageSize = maxSize;
return newOptions;
}
/**
* Sets the maximum allowed message size acceptable sent to the remote peer.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563")
public CallOptions withMaxOutboundMessageSize(int maxSize) {
checkArgument(maxSize >= 0, "invalid maxsize %s", maxSize);
CallOptions newOptions = new CallOptions(this);
newOptions.maxOutboundMessageSize = maxSize;
return newOptions;
}
/**
* Gets the maximum allowed message size acceptable from the remote peer.
*/
@Nullable
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563")
public Integer getMaxInboundMessageSize() {
return maxInboundMessageSize;
}
/**
* Gets the maximum allowed message size acceptable to send the remote peer.
*/
@Nullable
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563")
public Integer getMaxOutboundMessageSize() {
return maxOutboundMessageSize;
}
/**
* Copy constructor.
*/
@ -374,20 +423,23 @@ public final class CallOptions {
compressorName = other.compressorName;
customOptions = other.customOptions;
waitForReady = other.waitForReady;
maxInboundMessageSize = other.maxInboundMessageSize;
maxOutboundMessageSize = other.maxOutboundMessageSize;
}
@Override
public String toString() {
MoreObjects.ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
toStringHelper.add("deadline", deadline);
toStringHelper.add("authority", authority);
toStringHelper.add("callCredentials", credentials);
toStringHelper.add("affinity", affinity);
toStringHelper.add("executor", executor != null ? executor.getClass() : null);
toStringHelper.add("compressorName", compressorName);
toStringHelper.add("customOptions", Arrays.deepToString(customOptions));
toStringHelper.add("waitForReady", isWaitForReady());
return toStringHelper.toString();
return MoreObjects.toStringHelper(this)
.add("deadline", deadline)
.add("authority", authority)
.add("callCredentials", credentials)
.add("affinity", affinity)
.add("executor", executor != null ? executor.getClass() : null)
.add("compressorName", compressorName)
.add("customOptions", Arrays.deepToString(customOptions))
.add("waitForReady", isWaitForReady())
.add("maxInboundMessageSize", maxInboundMessageSize)
.add("maxOutboundMessageSize", maxOutboundMessageSize)
.toString();
}
}

View File

@ -574,6 +574,12 @@ class InProcessTransport implements ServerTransport, ConnectionClientTransport {
@Override
public void setDecompressor(Decompressor decompressor) {}
@Override
public void setMaxInboundMessageSize(int maxSize) {}
@Override
public void setMaxOutboundMessageSize(int maxSize) {}
}
}

View File

@ -68,6 +68,16 @@ public abstract class AbstractClientStream extends AbstractStream
super(bufferAllocator, maxMessageSize, statsTraceCtx);
}
@Override
public void setMaxInboundMessageSize(int maxSize) {
setMaxInboundMessageSizeProtected(maxSize);
}
@Override
public void setMaxOutboundMessageSize(int maxSize) {
setMaxOutboundMessageSizeProtected(maxSize);
}
@Override
protected final ClientStreamListener listener() {
return listener;

View File

@ -99,6 +99,16 @@ public abstract class AbstractClientStream2 extends AbstractStream2
framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
}
@Override
public void setMaxOutboundMessageSize(int maxSize) {
framer.setMaxOutboundMessageSize(maxSize);
}
@Override
public void setMaxInboundMessageSize(int maxSize) {
transportState().setMaxInboundMessageSize(maxSize);
}
/** {@inheritDoc} */
@Override
protected abstract TransportState transportState();

View File

@ -138,6 +138,14 @@ public abstract class AbstractStream implements Stream {
statsTraceCtx);
}
protected final void setMaxInboundMessageSizeProtected(int maxSize) {
deframer.setMaxInboundMessageSize(maxSize);
}
protected final void setMaxOutboundMessageSizeProtected(int maxSize) {
framer.setMaxOutboundMessageSize(maxSize);
}
@VisibleForTesting
AbstractStream(MessageFramer framer, MessageDeframer deframer) {
this.framer = framer;

View File

@ -155,6 +155,10 @@ public abstract class AbstractStream2 implements Stream {
this.deframer = deframer;
}
final void setMaxInboundMessageSize(int maxSize) {
deframer.setMaxInboundMessageSize(maxSize);
}
/**
* Override this method to provide a stream listener.
*/

View File

@ -229,6 +229,12 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
if (callOptions.getAuthority() != null) {
stream.setAuthority(callOptions.getAuthority());
}
if (callOptions.getMaxInboundMessageSize() != null) {
stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
}
if (callOptions.getMaxOutboundMessageSize() != null) {
stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
}
stream.setCompressor(compressor);
stream.start(new ClientStreamListenerImpl(observer));

View File

@ -74,4 +74,14 @@ public interface ClientStream extends Stream {
* @param listener non-{@code null} listener of stream events
*/
void start(ClientStreamListener listener);
/**
* Sets the max size accepted from the remote endpoint.
*/
void setMaxInboundMessageSize(int maxSize);
/**
* Sets the max size sent to the remote endpoint.
*/
void setMaxOutboundMessageSize(int maxSize);
}

View File

@ -72,6 +72,34 @@ class DelayedStream implements ClientStream {
@GuardedBy("this")
private DelayedStreamListener delayedListener;
@Override
public void setMaxInboundMessageSize(final int maxSize) {
if (passThrough) {
realStream.setMaxInboundMessageSize(maxSize);
} else {
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.setMaxInboundMessageSize(maxSize);
}
});
}
}
@Override
public void setMaxOutboundMessageSize(final int maxSize) {
if (passThrough) {
realStream.setMaxOutboundMessageSize(maxSize);
} else {
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.setMaxOutboundMessageSize(maxSize);
}
});
}
}
/**
* Transfers all pending and future requests and mutations to the given stream.
*

View File

@ -97,7 +97,7 @@ public class MessageDeframer implements Closeable {
}
private final Listener listener;
private final int maxMessageSize;
private int maxInboundMessageSize;
private final StatsTraceContext statsTraceCtx;
private Decompressor decompressor;
private State state = State.HEADER;
@ -122,10 +122,14 @@ public class MessageDeframer implements Closeable {
StatsTraceContext statsTraceCtx) {
this.listener = Preconditions.checkNotNull(listener, "sink");
this.decompressor = Preconditions.checkNotNull(decompressor, "decompressor");
this.maxMessageSize = maxMessageSize;
this.maxInboundMessageSize = maxMessageSize;
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
}
void setMaxInboundMessageSize(int messageSize) {
maxInboundMessageSize = messageSize;
}
/**
* Sets the decompressor available to use. The message encoding for the stream comes later in
* time, and thus will not be available at the time of construction. This should only be set
@ -338,10 +342,9 @@ public class MessageDeframer implements Closeable {
// Update the required length to include the length of the frame.
requiredLength = nextFrame.readInt();
if (requiredLength < 0 || requiredLength > maxMessageSize) {
throw Status.INTERNAL.withDescription(String.format("Frame size %d exceeds maximum: %d. "
+ "If this is normal, increase the maxMessageSize in the channel/server builder",
requiredLength, maxMessageSize)).asRuntimeException();
if (requiredLength < 0 || requiredLength > maxInboundMessageSize) {
throw Status.INTERNAL.withDescription(String.format("Frame size %d exceeds maximum: %d. ",
requiredLength, maxInboundMessageSize)).asRuntimeException();
}
// Continue reading the frame body.
@ -377,7 +380,7 @@ public class MessageDeframer implements Closeable {
// Enforce the maxMessageSize limit on the returned stream.
InputStream unlimitedStream =
decompressor.decompress(ReadableBuffers.openStream(nextFrame, true));
return new SizeEnforcingInputStream(unlimitedStream, maxMessageSize, statsTraceCtx);
return new SizeEnforcingInputStream(unlimitedStream, maxInboundMessageSize, statsTraceCtx);
} catch (IOException e) {
throw new RuntimeException(e);
}
@ -461,8 +464,7 @@ public class MessageDeframer implements Closeable {
private void verifySize() {
if (count > maxMessageSize) {
throw Status.INTERNAL.withDescription(String.format(
"Compressed frame exceeds maximum frame size: %d. Bytes read: %d. "
+ "If this is normal, increase the maxMessageSize in the channel/server builder",
"Compressed frame exceeds maximum frame size: %d. Bytes read: %d. ",
maxMessageSize, count)).asRuntimeException();
}
}

View File

@ -33,6 +33,7 @@ package io.grpc.internal;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.min;
import com.google.common.io.ByteStreams;
@ -58,6 +59,9 @@ import javax.annotation.Nullable;
* MessageFramer.Sink}.
*/
public class MessageFramer {
private static final int NO_MAX_OUTBOUND_MESSAGE_SIZE = -1;
/**
* Sink implemented by the transport layer to receive frames and forward them to their
* destination.
@ -79,6 +83,8 @@ public class MessageFramer {
private static final byte COMPRESSED = 1;
private final Sink sink;
// effectively final. Can only be set once.
private int maxOutboundMessageSize = NO_MAX_OUTBOUND_MESSAGE_SIZE;
private WritableBuffer buffer;
private Compressor compressor = Codec.Identity.NONE;
private boolean messageCompression = true;
@ -111,6 +117,11 @@ public class MessageFramer {
return this;
}
void setMaxOutboundMessageSize(int maxSize) {
checkState(maxOutboundMessageSize == NO_MAX_OUTBOUND_MESSAGE_SIZE, "max size already set");
maxOutboundMessageSize = maxSize;
}
/**
* Writes out a payload message.
*
@ -155,6 +166,12 @@ public class MessageFramer {
}
BufferChainOutputStream bufferChain = new BufferChainOutputStream();
int written = writeToOutputStream(message, bufferChain);
if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) {
throw Status.INTERNAL
.withDescription(
String.format("message too large %d > %d", written , maxOutboundMessageSize))
.asRuntimeException();
}
writeBufferChain(bufferChain, false);
return written;
}
@ -169,6 +186,12 @@ public class MessageFramer {
} finally {
compressingStream.close();
}
if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) {
throw Status.CANCELLED
.withDescription(
String.format("message too large %d > %d", written , maxOutboundMessageSize))
.asRuntimeException();
}
writeBufferChain(bufferChain, true);
return written;
@ -186,6 +209,12 @@ public class MessageFramer {
*/
private int writeKnownLengthUncompressed(InputStream message, int messageLength)
throws IOException {
if (maxOutboundMessageSize >= 0 && messageLength > maxOutboundMessageSize) {
throw Status.CANCELLED
.withDescription(
String.format("message too large %d > %d", messageLength , maxOutboundMessageSize))
.asRuntimeException();
}
ByteBuffer header = ByteBuffer.wrap(headerScratch);
header.put(UNCOMPRESSED);
header.putInt(messageLength);

View File

@ -79,4 +79,10 @@ public class NoopClientStream implements ClientStream {
@Override
public void setDecompressor(Decompressor decompressor) {}
@Override
public void setMaxInboundMessageSize(int maxSize) {}
@Override
public void setMaxOutboundMessageSize(int maxSize) {}
}

View File

@ -160,25 +160,30 @@ public class CallOptionsTest {
@Test
public void toStringMatches_noDeadline_default() {
String expected = "CallOptions{deadline=null, authority=authority, callCredentials=null, "
+ "affinity={sample=blah}, "
+ "executor=class io.grpc.internal.SerializingExecutor, compressorName=compressor, "
+ "customOptions=[[option1, value1], [option2, value2]], waitForReady=true}";
String actual = allSet
.withDeadline(null)
.withExecutor(new SerializingExecutor(directExecutor()))
.withCallCredentials(null)
.withMaxInboundMessageSize(44)
.withMaxOutboundMessageSize(55)
.toString();
assertThat(actual).isEqualTo(expected);
assertThat(actual).contains("deadline=null");
assertThat(actual).contains("authority=authority");
assertThat(actual).contains("callCredentials=null");
assertThat(actual).contains("affinity={sample=blah}");
assertThat(actual).contains("executor=class io.grpc.internal.SerializingExecutor");
assertThat(actual).contains("compressorName=compressor");
assertThat(actual).contains("customOptions=[[option1, value1], [option2, value2]]");
assertThat(actual).contains("waitForReady=true");
assertThat(actual).contains("maxInboundMessageSize=44");
assertThat(actual).contains("maxOutboundMessageSize=55");
}
@Test
public void toStringMatches_noDeadline() {
assertThat("CallOptions{deadline=null, authority=null, callCredentials=null, "
+ "affinity={}, executor=null, compressorName=null, customOptions=[], "
+ "waitForReady=false}")
.isEqualTo(CallOptions.DEFAULT.toString());
String actual = CallOptions.DEFAULT.toString();
assertThat(actual).contains("deadline=null");
}
@Test
@ -207,19 +212,19 @@ public class CallOptionsTest {
CallOptions opts = CallOptions.DEFAULT;
assertThat(opts.getOption(option1)).isEqualTo("default");
}
@Test
public void withCustomOption() {
CallOptions opts = CallOptions.DEFAULT.withOption(option1, "v1");
assertThat(opts.getOption(option1)).isEqualTo("v1");
}
@Test
public void withCustomOptionLastOneWins() {
CallOptions opts = CallOptions.DEFAULT.withOption(option1, "v1").withOption(option1, "v2");
assertThat(opts.getOption(option1)).isEqualTo("v2");
}
@Test
public void withMultipleCustomOption() {
CallOptions opts = CallOptions.DEFAULT.withOption(option1, "v1").withOption(option2, "v2");

View File

@ -254,6 +254,12 @@ public class AbstractClientStream2Test {
@Override
public void setAuthority(String authority) {}
@Override
public void setMaxInboundMessageSize(int maxSize) {}
@Override
public void setMaxOutboundMessageSize(int maxSize) {}
}
private static class BaseSink implements AbstractClientStream2.Sink {

View File

@ -824,6 +824,25 @@ public class ClientCallImplTest {
assertSame(cause, status.getCause());
}
@Test
public void startAddsMaxSize() {
CallOptions callOptions =
CallOptions.DEFAULT.withMaxInboundMessageSize(1).withMaxOutboundMessageSize(2);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
DESCRIPTOR,
new SerializingExecutor(Executors.newSingleThreadExecutor()),
callOptions,
statsTraceCtx,
provider,
deadlineCancellationExecutor)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
verify(stream).setMaxInboundMessageSize(1);
verify(stream).setMaxOutboundMessageSize(2);
}
private void assertStatusInStats(Status.Code statusCode) {
StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
assertNotNull(record);

View File

@ -31,6 +31,7 @@
package io.grpc.testing.integration;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.testing.integration.Messages.PayloadType.COMPRESSABLE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -41,6 +42,7 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.ComputeEngineCredentials;
import com.google.auth.oauth2.GoogleCredentials;
@ -785,6 +787,74 @@ public abstract class AbstractInteropTest {
}
}
@Test(timeout = 10000)
public void maxInboundSize_exact() {
StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
.addResponseParameters(ResponseParameters.newBuilder().setSize(1))
.build();
int size = blockingStub.streamingOutputCall(request).next().getSerializedSize();
TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel)
.withMaxInboundMessageSize(size);
stub.streamingOutputCall(request).next();
}
@Test(timeout = 10000)
public void maxInboundSize_tooBig() {
StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
.addResponseParameters(ResponseParameters.newBuilder().setSize(1))
.build();
int size = blockingStub.streamingOutputCall(request).next().getSerializedSize();
TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel)
.withMaxInboundMessageSize(size - 1);
try {
stub.streamingOutputCall(request).next();
fail();
} catch (StatusRuntimeException ex) {
Status s = ex.getStatus();
assertThat(s.getCode()).named(s.toString()).isEqualTo(Status.Code.INTERNAL);
assertThat(Throwables.getStackTraceAsString(ex)).contains("exceeds maximum");
}
}
@Test(timeout = 10000)
public void maxOutboundSize_exact() {
// warm up the channel and JVM
blockingStub.emptyCall(Empty.getDefaultInstance());
// set at least one field to ensure the size is non-zero.
StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
.addResponseParameters(ResponseParameters.newBuilder().setSize(1))
.build();
TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel)
.withMaxOutboundMessageSize(request.getSerializedSize());
stub.streamingOutputCall(request).next();
}
@Test(timeout = 10000)
public void maxOutboundSize_tooBig() {
// warm up the channel and JVM
blockingStub.emptyCall(Empty.getDefaultInstance());
// set at least one field to ensure the size is non-zero.
StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
.addResponseParameters(ResponseParameters.newBuilder().setSize(1))
.build();
TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel)
.withMaxOutboundMessageSize(request.getSerializedSize() - 1);
try {
stub.streamingOutputCall(request).next();
fail();
} catch (StatusRuntimeException ex) {
Status s = ex.getStatus();
assertThat(s.getCode()).named(s.toString()).isEqualTo(Status.Code.CANCELLED);
assertThat(Throwables.getStackTraceAsString(ex)).contains("message too large");
}
}
protected int unaryPayloadLength() {
// 10MiB.
return 10485760;
@ -955,7 +1025,7 @@ public abstract class AbstractInteropTest {
// Test FullDuplexCall
@SuppressWarnings("unchecked")
StreamObserver<StreamingOutputCallResponse> responseObserver =
(StreamObserver<StreamingOutputCallResponse>) mock(StreamObserver.class);
mock(StreamObserver.class);
StreamObserver<StreamingOutputCallRequest> requestObserver
= asyncStub.fullDuplexCall(responseObserver);
requestObserver.onNext(streamingRequest);

View File

@ -65,8 +65,18 @@ public class InProcessTest extends AbstractInteropTest {
@Override
protected boolean metricsExpected() {
// TODO(zhangkun83): InProcessTransport by-passes framer and deframer, thus message sizses are
// TODO(zhangkun83): InProcessTransport by-passes framer and deframer, thus message sizes are
// not counted. (https://github.com/grpc/grpc-java/issues/2284)
return false;
}
@Override
public void maxInboundSize_tooBig() {
// noop, not enforced.
}
@Override
public void maxOutboundSize_tooBig() {
// noop, not enforced.
}
}

View File

@ -40,6 +40,7 @@ import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.Deadline;
import io.grpc.ExperimentalApi;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;
@ -189,4 +190,22 @@ public abstract class AbstractStub<S extends AbstractStub<S>> {
public final S withWaitForReady() {
return build(channel, callOptions.withWaitForReady());
}
/**
* Returns a new stub that limits the maximum acceptable message size from a remote peer.
*
* <p>If unset, the {@link ManagedChannelBuilder#maxInboundMessageSize(int)} limit is used.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563")
public final S withMaxInboundMessageSize(int maxSize) {
return build(channel, callOptions.withMaxInboundMessageSize(maxSize));
}
/**
* Returns a new stub that limits the maximum acceptable message size to send a remote peer.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563")
public final S withMaxOutboundMessageSize(int maxSize) {
return build(channel, callOptions.withMaxOutboundMessageSize(maxSize));
}
}