Merge pull request #1346 from murgatroid99/grpc-js_max_message_size

grpc-js: Add max message size enforcement
This commit is contained in:
Michael Lumish 2020-04-13 15:45:00 -07:00 committed by GitHub
commit 227a35e899
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 297 additions and 9 deletions

View File

@ -41,5 +41,7 @@ In addition, all channel arguments defined in [this header file](https://github.
- `grpc.initial_reconnect_backoff_ms`
- `grpc.max_reconnect_backoff_ms`
- `grpc.use_local_subchannel_pool`
- `grpc.max_send_message_length`
- `grpc.max_receive_message_length`
- `channelOverride`
- `channelFactoryOverride`

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "0.7.9",
"version": "0.8.0",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

View File

@ -30,6 +30,8 @@ export interface ChannelOptions {
'grpc.initial_reconnect_backoff_ms'?: number;
'grpc.max_reconnect_backoff_ms'?: number;
'grpc.use_local_subchannel_pool'?: number;
'grpc.max_send_message_length'?: number;
'grpc.max_receive_message_length'?: number;
[key: string]: string | number | undefined;
}
@ -49,6 +51,8 @@ export const recognizedOptions = {
'grpc.initial_reconnect_backoff_ms': true,
'grpc.max_reconnect_backoff_ms': true,
'grpc.use_local_subchannel_pool': true,
'grpc.max_send_message_length': true,
'grpc.max_receive_message_length': true,
};
export function channelOptionsEqual(

View File

@ -37,6 +37,7 @@ import { getDefaultAuthority } from './resolver';
import { ServiceConfig, validateServiceConfig } from './service-config';
import { trace, log } from './logging';
import { SubchannelAddress } from './subchannel';
import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
export enum ConnectivityState {
CONNECTING,
@ -213,6 +214,7 @@ export class ChannelImplementation implements Channel {
this.filterStackFactory = new FilterStackFactory([
new CallCredentialsFilterFactory(this),
new DeadlineFilterFactory(this),
new MaxMessageSizeFilterFactory(this.options),
new CompressionFilterFactory(this),
]);
// TODO(murgatroid99): Add more centralized handling of channel options

View File

@ -40,3 +40,9 @@ export enum LogVerbosity {
INFO,
ERROR,
}
// -1 means unlimited
export const DEFAULT_MAX_SEND_MESSAGE_LENGTH = -1;
// 4 MB default
export const DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH = 4 * 1024 * 1024;

View File

@ -0,0 +1,78 @@
/*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { BaseFilter, Filter, FilterFactory } from "./filter";
import { Call, WriteObject } from "./call-stream";
import { Status, DEFAULT_MAX_SEND_MESSAGE_LENGTH, DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH } from "./constants";
import { ChannelOptions } from "./channel-options";
export class MaxMessageSizeFilter extends BaseFilter implements Filter {
private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
constructor(
private readonly options: ChannelOptions,
private readonly callStream: Call
) {
super();
if ('grpc.max_send_message_length' in options) {
this.maxSendMessageSize = options['grpc.max_send_message_length']!;
}
if ('grpc.max_receive_message_length' in options) {
this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
}
}
async sendMessage(message: Promise<WriteObject>): Promise<WriteObject> {
/* A configured size of -1 means that there is no limit, so skip the check
* entirely */
if (this.maxSendMessageSize === -1) {
return message;
} else {
const concreteMessage = await message;
if (concreteMessage.message.length > this.maxSendMessageSize) {
this.callStream.cancelWithStatus(Status.RESOURCE_EXHAUSTED, `Sent message larger than max (${concreteMessage.message.length} vs. ${this.maxSendMessageSize})`);
return Promise.reject<WriteObject>('Message too large');
} else {
return concreteMessage;
}
}
}
async receiveMessage(message: Promise<Buffer>): Promise<Buffer> {
/* A configured size of -1 means that there is no limit, so skip the check
* entirely */
if (this.maxReceiveMessageSize === -1) {
return message;
} else {
const concreteMessage = await message;
if (concreteMessage.length > this.maxReceiveMessageSize) {
this.callStream.cancelWithStatus(Status.RESOURCE_EXHAUSTED, `Received message larger than max (${concreteMessage.length} vs. ${this.maxReceiveMessageSize})`);
return Promise.reject<Buffer>('Message too large');
} else {
return concreteMessage;
}
}
}
}
export class MaxMessageSizeFilterFactory implements FilterFactory<MaxMessageSizeFilter> {
constructor(private readonly options: ChannelOptions) {}
createFilter(callStream: Call): MaxMessageSizeFilter {
return new MaxMessageSizeFilter(this.options, callStream);
}
}

View File

@ -20,11 +20,12 @@ import * as http2 from 'http2';
import { Duplex, Readable, Writable } from 'stream';
import { StatusObject } from './call-stream';
import { Status } from './constants';
import { Status, DEFAULT_MAX_SEND_MESSAGE_LENGTH, DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH } from './constants';
import { Deserialize, Serialize } from './make-client';
import { Metadata } from './metadata';
import { StreamDecoder } from './stream-decoder';
import { ObjectReadable, ObjectWritable } from './object-stream';
import { ChannelOptions } from './channel-options';
interface DeadlineUnitIndexSignature {
[name: string]: number;
@ -338,10 +339,13 @@ export class Http2ServerCallStream<
private isPushPending = false;
private bufferedMessages: Array<Buffer | null> = [];
private messagesToPush: Array<RequestType | null> = [];
private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
constructor(
private stream: http2.ServerHttp2Stream,
private handler: Handler<RequestType, ResponseType>
private handler: Handler<RequestType, ResponseType>,
private options: ChannelOptions
) {
super();
@ -361,6 +365,13 @@ export class Http2ServerCallStream<
this.stream.on('drain', () => {
this.emit('drain');
});
if ('grpc.max_send_message_length' in options) {
this.maxSendMessageSize = options['grpc.max_send_message_length']!;
}
if ('grpc.max_receive_message_length' in options) {
this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
}
}
private checkCancelled(): boolean {
@ -435,6 +446,13 @@ export class Http2ServerCallStream<
stream.once('end', async () => {
try {
const requestBytes = Buffer.concat(chunks, totalLength);
if (this.maxReceiveMessageSize !== -1 && requestBytes.length > this.maxReceiveMessageSize) {
this.sendError({
code: Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${requestBytes.length} vs. ${this.maxReceiveMessageSize})`
});
resolve();
}
resolve(await this.deserializeMessage(requestBytes));
} catch (err) {
@ -555,6 +573,14 @@ export class Http2ServerCallStream<
return;
}
if (this.maxSendMessageSize !== -1 && chunk.length > this.maxSendMessageSize) {
this.sendError({
code: Status.RESOURCE_EXHAUSTED,
details: `Sent message larger than max (${chunk.length} vs. ${this.maxSendMessageSize})`
});
return;
}
this.sendMetadata();
return this.stream.write(chunk);
}
@ -581,6 +607,13 @@ export class Http2ServerCallStream<
const messages = decoder.write(data);
for (const message of messages) {
if (this.maxReceiveMessageSize !== -1 && message.length > this.maxReceiveMessageSize) {
this.sendError({
code: Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${message.length} vs. ${this.maxReceiveMessageSize})`
});
return;
}
this.pushOrBufferMessage(readable, message);
}
});

View File

@ -524,7 +524,7 @@ export class Server {
throw getUnimplementedStatusResponse(path);
}
const call = new Http2ServerCallStream(stream, handler);
const call = new Http2ServerCallStream(stream, handler, this.options);
const metadata: Metadata = call.receiveMetadata(headers) as Metadata;
switch (handler.type) {
case 'unary':
@ -555,7 +555,7 @@ export class Server {
throw new Error(`Unknown handler type: ${handler.type}`);
}
} catch (err) {
const call = new Http2ServerCallStream(stream, null!);
const call = new Http2ServerCallStream(stream, null!, this.options);
if (err.code === undefined) {
err.code = Status.INTERNAL;

View File

@ -55,23 +55,34 @@ describe(`${anyGrpc.clientName} client -> ${anyGrpc.serverName} server`, functio
describe('Interop-adjacent tests', function() {
let server;
let client;
let port;
before(function(done) {
/* To make testing max message size enforcement easier, the we explicitly
* remove the limit on the size of messages the server can receive, and
* we expect that the size of messages it can send is unlimited by
* default. On the other side, we explicitly limit the size of messages
* the client can send to 4 MB, and we expect that the size of messages
* it can receive is limited to 4 MB by default */
interopServer.getServer(0, true, (err, serverObj) => {
if (err) {
done(err);
} else {
server = serverObj.server;
port = serverObj.port;
server.start();
const ca_path = path.join(__dirname, '../data/ca.pem');
const ca_data = fs.readFileSync(ca_path);
const creds = grpc.credentials.createSsl(ca_data);
const options = {
'grpc.ssl_target_name_override': 'foo.test.google.fr',
'grpc.default_authority': 'foo.test.google.fr'
'grpc.default_authority': 'foo.test.google.fr',
'grpc.max_send_message_length': 4*1024*1024
};
client = new testProto.TestService(`localhost:${serverObj.port}`, creds, options);
client = new testProto.TestService(`localhost:${port}`, creds, options);
done();
}
}, {
'grpc.max_receive_message_length': -1
});
});
after(function() {
@ -133,5 +144,153 @@ describe(`${anyGrpc.clientName} client -> ${anyGrpc.serverName} server`, functio
done();
});
});
describe('max message size', function() {
// A size that is larger than the default limit
const largeMessageSize = 8 * 1024 * 1024;
const largeMessage = Buffer.alloc(largeMessageSize);
it('should get an error when sending a large message', function(done) {
done = multiDone(done, 2);
const unaryMessage = {payload: {body: largeMessage}};
client.unaryCall(unaryMessage, (error, result) => {
assert(error);
assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED);
done();
});
const stream = client.fullDuplexCall();
stream.write({payload: {body: largeMessage}});
stream.end();
stream.on('data', () => {});
stream.on('status', (status) => {
assert.strictEqual(status.code, grpc.status.RESOURCE_EXHAUSTED);
done();
});
stream.on('error', (error) => {
});
});
it('should get an error when receiving a large message', function(done) {
done = multiDone(done, 2);
client.unaryCall({response_size: largeMessageSize}, (error, result) => {
assert(error);
assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED);
done();
});
const stream = client.fullDuplexCall();
stream.write({response_parameters: [{size: largeMessageSize}]});
stream.end();
stream.on('data', () => {});
stream.on('status', (status) => {
assert.strictEqual(status.code, grpc.status.RESOURCE_EXHAUSTED);
done();
});
stream.on('error', (error) => {
});
});
describe('with a client with no message size limits', function() {
let unrestrictedClient;
before(function() {
const ca_path = path.join(__dirname, '../data/ca.pem');
const ca_data = fs.readFileSync(ca_path);
const creds = grpc.credentials.createSsl(ca_data);
const options = {
'grpc.ssl_target_name_override': 'foo.test.google.fr',
'grpc.default_authority': 'foo.test.google.fr',
'grpc.max_send_message_length': -1,
'grpc.max_receive_message_length': -1
};
unrestrictedClient = new testProto.TestService(`localhost:${port}`, creds, options);
});
it('should not get an error when sending or receiving a large message', function(done) {
done = multiDone(done, 2);
const unaryRequestMessage = {
response_size: largeMessageSize,
payload: {
body: largeMessage
}
};
unrestrictedClient.unaryCall(unaryRequestMessage, (error, result) => {
assert.ifError(error);
assert.strictEqual(result.payload.body.length, largeMessageSize);
done();
});
const streamingRequestMessage = {
response_parameters: [{size: largeMessageSize}],
payload: {body: largeMessage}
};
const stream = unrestrictedClient.fullDuplexCall();
stream.write(streamingRequestMessage);
stream.end();
stream.on('data', (result) => {
assert.strictEqual(result.payload.body.length, largeMessageSize);
});
stream.on('status', () => {
done();
});
stream.on('error', (error) => {
assert.ifError(error);
});
});
});
describe('with a server with message size limits and a client without limits', function() {
let restrictedServer;
let restrictedServerClient;
before(function(done) {
interopServer.getServer(0, true, (err, serverObj) => {
if (err) {
done(err);
} else {
restrictedServer = serverObj.server;
restrictedServer.start();
const ca_path = path.join(__dirname, '../data/ca.pem');
const ca_data = fs.readFileSync(ca_path);
const creds = grpc.credentials.createSsl(ca_data);
const options = {
'grpc.ssl_target_name_override': 'foo.test.google.fr',
'grpc.default_authority': 'foo.test.google.fr',
'grpc.max_receive_message_length': -1
};
restrictedServerClient = new testProto.TestService(`localhost:${serverObj.port}`, creds, options);
done();
}
}, {'grpc.max_send_message_length': 4 * 1024 * 1024});
});
after(function() {
restrictedServer.forceShutdown();
});
it('should get an error when sending a large message', function(done) {
restrictedServerClient.unaryCall({payload: {body: largeMessage}}, (error, result) => {
assert(error);
assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED);
const stream = restrictedServerClient.fullDuplexCall();
stream.write({payload: {body: largeMessage}});
stream.end();
stream.on('data', () => {});
stream.on('status', (status) => {
assert.strictEqual(status.code, grpc.status.RESOURCE_EXHAUSTED);
done();
});
stream.on('error', (error) => {
});
});
});
it('should get an error when requesting a large message', function(done) {
done = multiDone(done, 2);
restrictedServerClient.unaryCall({response_size: largeMessageSize}, (error, result) => {
assert(error);
assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED);
done();
});
const stream = restrictedServerClient.fullDuplexCall();
stream.write({response_parameters: [{size: largeMessageSize}]});
stream.end();
stream.on('data', () => {});
stream.on('status', (status) => {
assert.strictEqual(status.code, grpc.status.RESOURCE_EXHAUSTED);
done();
});
stream.on('error', (error) => {
});
});
});
});
});
});

View File

@ -202,10 +202,14 @@ function handleHalfDuplex(call) {
* @param {boolean} tls Indicates that the bound port should use TLS
* @param {function(Error, {{server: Server, port: number}})} callback Callback
* to call with result or error
* @param {object?} options Optional additional options to use when
* constructing the server
*/
function getServer(port, tls, callback) {
function getServer(port, tls, callback, options) {
// TODO(mlumish): enable TLS functionality
var options = {};
if (!options) {
options = {};
}
var server_creds;
if (tls) {
var key_path = path.join(__dirname, '../data/server1.key');