From d68d94a5f47582419390e2e8f08a10339a866026 Mon Sep 17 00:00:00 2001 From: Robert Date: Mon, 25 Oct 2021 20:21:48 -0700 Subject: [PATCH] re-enable NoCompress flag behavior and check Compressed Flag byte on server --- packages/grpc-js/src/compression-filter.ts | 10 ++++----- packages/grpc-js/src/server-call.ts | 8 +++++-- packages/grpc-js/test/test-server.ts | 26 ++++++++++++++++++++++ 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/packages/grpc-js/src/compression-filter.ts b/packages/grpc-js/src/compression-filter.ts index b7e5e5d2..ae507d33 100644 --- a/packages/grpc-js/src/compression-filter.ts +++ b/packages/grpc-js/src/compression-filter.ts @@ -17,7 +17,7 @@ import * as zlib from 'zlib'; -import { Call, WriteObject } from './call-stream'; +import { Call, WriteObject, WriteFlags } from './call-stream'; import { Channel } from './channel'; import { BaseFilter, Filter, FilterFactory } from './filter'; import { Metadata, MetadataValue } from './metadata'; @@ -238,10 +238,10 @@ export class CompressionFilter extends BaseFilter implements Filter { * and the output is a framed and possibly compressed message. For this * reason, this filter should be at the bottom of the filter stack */ const resolvedMessage: WriteObject = await message; - const compress = !(this.sendCompression instanceof IdentityHandler); - // resolvedMessage.flags === undefined - // ? false - // : (resolvedMessage.flags & WriteFlags.NoCompress) === 0; + const compress = + resolvedMessage.flags === undefined + ? !(this.sendCompression instanceof IdentityHandler) + : (resolvedMessage.flags & WriteFlags.NoCompress) === 0; return { message: await this.sendCompression.writeMessage( resolvedMessage.message, diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 8ea488a4..f1d3a8ab 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -561,7 +561,9 @@ export class Http2ServerCallStream< this.emit('receiveMessage'); - const decompressedMessage = await this.getDecompressedMessage(requestBytes, encoding); + const compressed = requestBytes.readUInt8(0) === 1; + const compressedMessageEncoding = compressed ? encoding : undefined; + const decompressedMessage = await this.getDecompressedMessage(requestBytes, compressedMessageEncoding); // Encountered an error with decompression; it'll already have been propogated back // Just return early @@ -748,7 +750,9 @@ export class Http2ServerCallStream< } this.emit('receiveMessage'); - const decompressedMessage = await this.getDecompressedMessage(message, encoding); + const compressed = message.readUInt8(0) === 1; + const compressedMessageEncoding = compressed ? encoding : undefined; + const decompressedMessage = await this.getDecompressedMessage(message, compressedMessageEncoding); // Encountered an error with decompression; it'll already have been propogated back // Just return early diff --git a/packages/grpc-js/test/test-server.ts b/packages/grpc-js/test/test-server.ts index c8171e79..b6cffd81 100644 --- a/packages/grpc-js/test/test-server.ts +++ b/packages/grpc-js/test/test-server.ts @@ -849,5 +849,31 @@ describe('Compressed requests', () => { }) }); }); + + it('Should not compress requests when the NoCompress write flag is used', done => { + const bidiStream = client.bidiStream(); + let timesRequested = 0; + let timesResponded = 0; + + bidiStream.on('data', () => { + timesResponded += 1; + }); + + bidiStream.on('error', (err) => { + assert.ifError(err); + done(); + }); + + bidiStream.on('end', () => { + assert.equal(timesResponded, timesRequested); + done(); + }); + + bidiStream._write({ message: 'foo' }, '2', (err: any) => { + assert.ifError(err); + timesRequested += 1; + setTimeout(() => bidiStream.end(), 10); + }); + }); }); });