grpc-js: support client cancellation

This commit adds client cancellation support and tests for
cancellation and deadlines.
This commit is contained in:
cjihrig 2019-05-17 14:20:17 -04:00
parent 2ab15fef4f
commit f30a5d8588
No known key found for this signature in database
GPG Key ID: 7434390BDBE9B9C5
2 changed files with 201 additions and 2 deletions

View File

@ -56,11 +56,11 @@ const defaultResponseOptions = {
waitForTrailers: true,
} as http2.ServerStreamResponseOptions;
export interface ServerSurfaceCall {
export type ServerSurfaceCall = {
cancelled: boolean;
getPeer(): string;
sendMetadata(responseMetadata: Metadata): void;
}
} & EventEmitter;
export type ServerUnaryCall<RequestType, ResponseType> = ServerSurfaceCall & {
request: RequestType | null;
@ -88,6 +88,7 @@ export class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter
super();
this.cancelled = false;
this.request = null;
this.call.setupSurfaceCall(this);
}
getPeer(): string {
@ -111,6 +112,7 @@ export class ServerReadableStreamImpl<RequestType, ResponseType>
) {
super({ objectMode: true });
this.cancelled = false;
this.call.setupSurfaceCall(this);
this.call.setupReadable(this);
}
@ -143,6 +145,7 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
this.cancelled = false;
this.request = null;
this.trailingMetadata = new Metadata();
this.call.setupSurfaceCall(this);
this.on('error', err => {
this.call.sendError(err as ServiceError);
@ -212,6 +215,7 @@ export class ServerDuplexStreamImpl<RequestType, ResponseType> extends Duplex
super({ objectMode: true });
this.cancelled = false;
this.trailingMetadata = new Metadata();
this.call.setupSurfaceCall(this);
this.call.setupReadable(this);
this.on('error', err => {
@ -513,6 +517,13 @@ export class Http2ServerCallStream<
this.stream.resume();
}
setupSurfaceCall(call: ServerSurfaceCall) {
this.once('cancelled', reason => {
call.cancelled = true;
call.emit('cancelled', reason);
});
}
setupReadable(
readable:
| ServerReadableStream<RequestType, ResponseType>

View File

@ -0,0 +1,188 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Allow `any` data type for testing runtime type checking.
// tslint:disable no-any
import * as assert from 'assert';
import * as path from 'path';
import * as grpc from '../src';
import { ServerCredentials } from '../src';
import { ServiceError } from '../src/call';
import { ServiceClient, ServiceClientConstructor } from '../src/make-client';
import { Server } from '../src/server';
import {
sendUnaryData,
ServerUnaryCall,
ServerWritableStream,
} from '../src/server-call';
import { loadProtoFile } from './common';
const clientInsecureCreds = grpc.credentials.createInsecure();
const serverInsecureCreds = ServerCredentials.createInsecure();
describe('Server deadlines', () => {
let server: Server;
let client: ServiceClient;
before(done => {
const protoFile = path.join(__dirname, 'fixtures', 'test_service.proto');
const testServiceDef = loadProtoFile(protoFile);
const testServiceClient = testServiceDef.TestService as ServiceClientConstructor;
server = new Server();
server.addService(testServiceClient.service, {
unary(call: ServerUnaryCall<any, any>, cb: sendUnaryData<any>) {
setTimeout(() => {
cb(null, {});
}, 2000);
},
});
server.bindAsync('localhost:0', serverInsecureCreds, (err, port) => {
assert.ifError(err);
client = new testServiceClient(`localhost:${port}`, clientInsecureCreds);
server.start();
done();
});
});
after(done => {
client.close();
server.tryShutdown(done);
});
it('works with deadlines', done => {
const metadata = new grpc.Metadata();
const {
path,
requestSerialize: serialize,
responseDeserialize: deserialize,
} = client.unary as any;
metadata.set('grpc-timeout', '100m');
client.makeUnaryRequest(
path,
serialize,
deserialize,
{},
metadata,
{},
(error: any, response: any) => {
assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED);
assert.strictEqual(error.details, 'Deadline exceeded');
assert.strictEqual(error.message, 'Deadline exceeded');
done();
}
);
});
it('rejects invalid deadline', done => {
const metadata = new grpc.Metadata();
const {
path,
requestSerialize: serialize,
responseDeserialize: deserialize,
} = client.unary as any;
metadata.set('grpc-timeout', 'Infinity');
client.makeUnaryRequest(
path,
serialize,
deserialize,
{},
metadata,
{},
(error: any, response: any) => {
assert.strictEqual(error.code, grpc.status.OUT_OF_RANGE);
assert.strictEqual(error.details, 'Invalid deadline');
assert.strictEqual(error.message, 'Invalid deadline');
done();
}
);
});
});
describe('Cancellation', () => {
let server: Server;
let client: ServiceClient;
let inHandler = false;
let cancelledInServer = false;
before(done => {
const protoFile = path.join(__dirname, 'fixtures', 'test_service.proto');
const testServiceDef = loadProtoFile(protoFile);
const testServiceClient = testServiceDef.TestService as ServiceClientConstructor;
server = new Server();
server.addService(testServiceClient.service, {
serverStream(stream: ServerWritableStream<any, any>) {
inHandler = true;
stream.on('cancelled', () => {
stream.write({});
stream.end();
cancelledInServer = true;
});
},
});
server.bindAsync('localhost:0', serverInsecureCreds, (err, port) => {
assert.ifError(err);
client = new testServiceClient(`localhost:${port}`, clientInsecureCreds);
server.start();
done();
});
});
after(done => {
client.close();
server.tryShutdown(done);
});
it('handles requests cancelled by the client', done => {
const call = client.serverStream({});
call.on('data', assert.ifError);
call.on('error', (error: ServiceError) => {
assert.strictEqual(error.code, grpc.status.CANCELLED);
assert.strictEqual(error.details, 'Cancelled on client');
assert.strictEqual(error.message, 'Cancelled on client');
waitForServerCancel();
});
function waitForHandler() {
if (inHandler === true) {
call.cancel();
return;
}
setImmediate(waitForHandler);
}
function waitForServerCancel() {
if (cancelledInServer === true) {
done();
return;
}
setImmediate(waitForServerCancel);
}
waitForHandler();
});
});