From f30a5d8588ac552e4119172e2028dfff6eab162f Mon Sep 17 00:00:00 2001 From: cjihrig Date: Fri, 17 May 2019 14:20:17 -0400 Subject: [PATCH] grpc-js: support client cancellation This commit adds client cancellation support and tests for cancellation and deadlines. --- packages/grpc-js/src/server-call.ts | 15 +- .../grpc-js/test/test-server-deadlines.ts | 188 ++++++++++++++++++ 2 files changed, 201 insertions(+), 2 deletions(-) create mode 100644 packages/grpc-js/test/test-server-deadlines.ts diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 6c3f7375..8c295def 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -56,11 +56,11 @@ const defaultResponseOptions = { waitForTrailers: true, } as http2.ServerStreamResponseOptions; -export interface ServerSurfaceCall { +export type ServerSurfaceCall = { cancelled: boolean; getPeer(): string; sendMetadata(responseMetadata: Metadata): void; -} +} & EventEmitter; export type ServerUnaryCall = ServerSurfaceCall & { request: RequestType | null; @@ -88,6 +88,7 @@ export class ServerUnaryCallImpl extends EventEmitter super(); this.cancelled = false; this.request = null; + this.call.setupSurfaceCall(this); } getPeer(): string { @@ -111,6 +112,7 @@ export class ServerReadableStreamImpl ) { super({ objectMode: true }); this.cancelled = false; + this.call.setupSurfaceCall(this); this.call.setupReadable(this); } @@ -143,6 +145,7 @@ export class ServerWritableStreamImpl this.cancelled = false; this.request = null; this.trailingMetadata = new Metadata(); + this.call.setupSurfaceCall(this); this.on('error', err => { this.call.sendError(err as ServiceError); @@ -212,6 +215,7 @@ export class ServerDuplexStreamImpl extends Duplex super({ objectMode: true }); this.cancelled = false; this.trailingMetadata = new Metadata(); + this.call.setupSurfaceCall(this); this.call.setupReadable(this); this.on('error', err => { @@ -513,6 +517,13 @@ export class Http2ServerCallStream< this.stream.resume(); } + setupSurfaceCall(call: ServerSurfaceCall) { + this.once('cancelled', reason => { + call.cancelled = true; + call.emit('cancelled', reason); + }); + } + setupReadable( readable: | ServerReadableStream diff --git a/packages/grpc-js/test/test-server-deadlines.ts b/packages/grpc-js/test/test-server-deadlines.ts new file mode 100644 index 00000000..ef54ef3f --- /dev/null +++ b/packages/grpc-js/test/test-server-deadlines.ts @@ -0,0 +1,188 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Allow `any` data type for testing runtime type checking. +// tslint:disable no-any +import * as assert from 'assert'; +import * as path from 'path'; + +import * as grpc from '../src'; +import { ServerCredentials } from '../src'; +import { ServiceError } from '../src/call'; +import { ServiceClient, ServiceClientConstructor } from '../src/make-client'; +import { Server } from '../src/server'; +import { + sendUnaryData, + ServerUnaryCall, + ServerWritableStream, +} from '../src/server-call'; + +import { loadProtoFile } from './common'; + +const clientInsecureCreds = grpc.credentials.createInsecure(); +const serverInsecureCreds = ServerCredentials.createInsecure(); + +describe('Server deadlines', () => { + let server: Server; + let client: ServiceClient; + + before(done => { + const protoFile = path.join(__dirname, 'fixtures', 'test_service.proto'); + const testServiceDef = loadProtoFile(protoFile); + const testServiceClient = testServiceDef.TestService as ServiceClientConstructor; + + server = new Server(); + server.addService(testServiceClient.service, { + unary(call: ServerUnaryCall, cb: sendUnaryData) { + setTimeout(() => { + cb(null, {}); + }, 2000); + }, + }); + + server.bindAsync('localhost:0', serverInsecureCreds, (err, port) => { + assert.ifError(err); + client = new testServiceClient(`localhost:${port}`, clientInsecureCreds); + server.start(); + done(); + }); + }); + + after(done => { + client.close(); + server.tryShutdown(done); + }); + + it('works with deadlines', done => { + const metadata = new grpc.Metadata(); + const { + path, + requestSerialize: serialize, + responseDeserialize: deserialize, + } = client.unary as any; + + metadata.set('grpc-timeout', '100m'); + client.makeUnaryRequest( + path, + serialize, + deserialize, + {}, + metadata, + {}, + (error: any, response: any) => { + assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED); + assert.strictEqual(error.details, 'Deadline exceeded'); + assert.strictEqual(error.message, 'Deadline exceeded'); + done(); + } + ); + }); + + it('rejects invalid deadline', done => { + const metadata = new grpc.Metadata(); + const { + path, + requestSerialize: serialize, + responseDeserialize: deserialize, + } = client.unary as any; + + metadata.set('grpc-timeout', 'Infinity'); + client.makeUnaryRequest( + path, + serialize, + deserialize, + {}, + metadata, + {}, + (error: any, response: any) => { + assert.strictEqual(error.code, grpc.status.OUT_OF_RANGE); + assert.strictEqual(error.details, 'Invalid deadline'); + assert.strictEqual(error.message, 'Invalid deadline'); + done(); + } + ); + }); +}); + +describe('Cancellation', () => { + let server: Server; + let client: ServiceClient; + let inHandler = false; + let cancelledInServer = false; + + before(done => { + const protoFile = path.join(__dirname, 'fixtures', 'test_service.proto'); + const testServiceDef = loadProtoFile(protoFile); + const testServiceClient = testServiceDef.TestService as ServiceClientConstructor; + + server = new Server(); + server.addService(testServiceClient.service, { + serverStream(stream: ServerWritableStream) { + inHandler = true; + stream.on('cancelled', () => { + stream.write({}); + stream.end(); + cancelledInServer = true; + }); + }, + }); + + server.bindAsync('localhost:0', serverInsecureCreds, (err, port) => { + assert.ifError(err); + client = new testServiceClient(`localhost:${port}`, clientInsecureCreds); + server.start(); + done(); + }); + }); + + after(done => { + client.close(); + server.tryShutdown(done); + }); + + it('handles requests cancelled by the client', done => { + const call = client.serverStream({}); + + call.on('data', assert.ifError); + call.on('error', (error: ServiceError) => { + assert.strictEqual(error.code, grpc.status.CANCELLED); + assert.strictEqual(error.details, 'Cancelled on client'); + assert.strictEqual(error.message, 'Cancelled on client'); + waitForServerCancel(); + }); + + function waitForHandler() { + if (inHandler === true) { + call.cancel(); + return; + } + + setImmediate(waitForHandler); + } + + function waitForServerCancel() { + if (cancelledInServer === true) { + done(); + return; + } + + setImmediate(waitForServerCancel); + } + + waitForHandler(); + }); +});