grpc-js: Implement deadline and cancellation propagation

This commit is contained in:
Michael Lumish 2020-10-30 11:38:30 -07:00
parent d8021d20d9
commit ae2b64bd65
6 changed files with 310 additions and 30 deletions

View File

@ -18,7 +18,7 @@
import * as http2 from 'http2';
import { CallCredentials } from './call-credentials';
import { Status } from './constants';
import { Propagate, Status } from './constants';
import { Filter, FilterFactory } from './filter';
import { FilterStackFactory, FilterStack } from './filter-stack';
import { Metadata } from './metadata';
@ -27,6 +27,7 @@ import { ChannelImplementation } from './channel';
import { Subchannel } from './subchannel';
import * as logging from './logging';
import { LogVerbosity } from './constants';
import { ServerSurfaceCall } from './server-call';
const TRACER_NAME = 'call_stream';
@ -42,7 +43,7 @@ export interface CallStreamOptions {
deadline: Deadline;
flags: number;
host: string;
parentCall: Call | null;
parentCall: ServerSurfaceCall | null;
}
export type PartialCallStreamOptions = Partial<CallStreamOptions>;
@ -218,6 +219,11 @@ export class Http2CallStream implements Call {
metadata: new Metadata(),
});
};
if (this.options.parentCall && this.options.flags & Propagate.CANCELLATION) {
this.options.parentCall.on('cancelled', () => {
this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call');
});
}
}
private outputStatus() {
@ -623,7 +629,11 @@ export class Http2CallStream implements Call {
}
getDeadline(): Deadline {
return this.options.deadline;
if (this.options.parentCall && this.options.flags & Propagate.DEADLINE) {
return this.options.parentCall.getDeadline();
} else {
return this.options.deadline;
}
}
getCredentials(): CallCredentials {

View File

@ -28,7 +28,7 @@ import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
import { ChannelControlHelper } from './load-balancer';
import { UnavailablePicker, Picker, PickResultType } from './picker';
import { Metadata } from './metadata';
import { Status, LogVerbosity } from './constants';
import { Status, LogVerbosity, Propagate } from './constants';
import { FilterStackFactory } from './filter-stack';
import { CallCredentialsFilterFactory } from './call-credentials-filter';
import { DeadlineFilterFactory } from './deadline-filter';
@ -39,6 +39,8 @@ import { SubchannelAddress } from './subchannel';
import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
import { mapProxyName } from './http_proxy';
import { GrpcUri, parseUri, uriToString } from './uri-parser';
import { ServerSurfaceCall } from './server-call';
import { SurfaceCall } from './call';
export enum ConnectivityState {
CONNECTING,
@ -118,7 +120,7 @@ export interface Channel {
method: string,
deadline: Deadline,
host: string | null | undefined,
parentCall: any, // eslint-disable-line @typescript-eslint/no-explicit-any
parentCall: ServerSurfaceCall | null,
propagateFlags: number | null | undefined
): Call;
}
@ -509,7 +511,7 @@ export class ChannelImplementation implements Channel {
method: string,
deadline: Deadline,
host: string | null | undefined,
parentCall: any, // eslint-disable-line @typescript-eslint/no-explicit-any
parentCall: ServerSurfaceCall | null,
propagateFlags: number | null | undefined
): Call {
if (typeof method !== 'string') {
@ -537,9 +539,9 @@ export class ChannelImplementation implements Channel {
);
const finalOptions: CallStreamOptions = {
deadline: deadline,
flags: propagateFlags || 0,
host: host || this.defaultAuthority,
parentCall: parentCall || null,
flags: propagateFlags ?? Propagate.DEFAULTS,
host: host ?? this.defaultAuthority,
parentCall: parentCall,
};
const stream: Http2CallStream = new Http2CallStream(
method,

View File

@ -311,21 +311,11 @@ export class InterceptingCall implements InterceptingCallInterface {
}
function getCall(channel: Channel, path: string, options: CallOptions): Call {
let deadline;
let host;
const parent = null;
let propagateFlags;
let credentials;
if (options) {
deadline = options.deadline;
host = options.host;
propagateFlags = options.propagate_flags;
credentials = options.credentials;
}
if (deadline === undefined) {
deadline = Infinity;
}
const deadline = options.deadline ?? Infinity;
const host = options.host;
const parent = options.parent ?? null;
const propagateFlags = options.propagate_flags;
const credentials = options.credentials;
const call = channel.createCall(path, deadline, host, parent, propagateFlags);
if (credentials) {
call.setCredentials(credentials);

View File

@ -50,7 +50,8 @@ export enum Propagate {
CENSUS_STATS_CONTEXT = 2,
CENSUS_TRACING_CONTEXT = 4,
CANCELLATION = 8,
DEFAULTS = 65536,
// https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/propagation_bits.h#L43
DEFAULTS = 0xffff | Propagate.DEADLINE | Propagate.CENSUS_STATS_CONTEXT | Propagate.CENSUS_TRACING_CONTEXT | Propagate.CANCELLATION,
}
// -1 means unlimited

View File

@ -19,7 +19,7 @@ import { EventEmitter } from 'events';
import * as http2 from 'http2';
import { Duplex, Readable, Writable } from 'stream';
import { StatusObject } from './call-stream';
import { Deadline, StatusObject } from './call-stream';
import {
Status,
DEFAULT_MAX_SEND_MESSAGE_LENGTH,
@ -78,6 +78,7 @@ export type ServerSurfaceCall = {
readonly metadata: Metadata;
getPeer(): string;
sendMetadata(responseMetadata: Metadata): void;
getDeadline(): Deadline;
} & EventEmitter;
export type ServerUnaryCall<RequestType, ResponseType> = ServerSurfaceCall & {
@ -120,6 +121,10 @@ export class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter
sendMetadata(responseMetadata: Metadata): void {
this.call.sendMetadata(responseMetadata);
}
getDeadline(): Deadline {
return this.call.getDeadline();
}
}
export class ServerReadableStreamImpl<RequestType, ResponseType>
@ -153,6 +158,10 @@ export class ServerReadableStreamImpl<RequestType, ResponseType>
sendMetadata(responseMetadata: Metadata): void {
this.call.sendMetadata(responseMetadata);
}
getDeadline(): Deadline {
return this.call.getDeadline();
}
}
export class ServerWritableStreamImpl<RequestType, ResponseType>
@ -186,6 +195,10 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
this.call.sendMetadata(responseMetadata);
}
getDeadline(): Deadline {
return this.call.getDeadline();
}
_write(
chunk: ResponseType,
encoding: string,
@ -257,6 +270,10 @@ export class ServerDuplexStreamImpl<RequestType, ResponseType> extends Duplex
this.call.sendMetadata(responseMetadata);
}
getDeadline(): Deadline {
return this.call.getDeadline();
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
end(metadata?: any) {
if (metadata) {
@ -357,7 +374,8 @@ export class Http2ServerCallStream<
ResponseType
> extends EventEmitter {
cancelled = false;
deadline: NodeJS.Timer = setTimeout(() => {}, 0);
deadlineTimer: NodeJS.Timer = setTimeout(() => {}, 0);
private deadline: Deadline = Infinity;
private wantTrailers = false;
private metadataSent = false;
private canPush = false;
@ -405,7 +423,7 @@ export class Http2ServerCallStream<
}
// Clear noop timer
clearTimeout(this.deadline);
clearTimeout(this.deadlineTimer);
}
private checkCancelled(): boolean {
@ -452,7 +470,9 @@ export class Http2ServerCallStream<
const timeout = (+match[1] * deadlineUnitsToMs[match[2]]) | 0;
this.deadline = setTimeout(handleExpiredDeadline, timeout, this);
const now = new Date();
this.deadline = now.setMilliseconds(now.getMilliseconds() + timeout);
this.deadlineTimer = setTimeout(handleExpiredDeadline, timeout, this);
metadata.remove(GRPC_TIMEOUT_HEADER);
}
@ -566,7 +586,7 @@ export class Http2ServerCallStream<
statusObj.details
);
clearTimeout(this.deadline);
clearTimeout(this.deadlineTimer);
if (!this.wantTrailers) {
this.wantTrailers = true;
@ -779,6 +799,10 @@ export class Http2ServerCallStream<
return 'unknown';
}
}
getDeadline(): Deadline {
return this.deadline;
}
}
/* eslint-disable @typescript-eslint/no-explicit-any */

View File

@ -0,0 +1,253 @@
/*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as assert from 'assert';
import * as grpc from '../src';
import { ServiceClient, ServiceClientConstructor } from '../src/make-client';
import { loadProtoFile } from './common';
function multiDone(done: () => void, target: number) {
let count = 0;
return () => {
count++;
if (count >= target) {
done();
}
}
}
describe('Call propagation', () => {
let server: grpc.Server;
let Client: ServiceClientConstructor;
let client: ServiceClient;
let proxyServer: grpc.Server;
let proxyClient: ServiceClient;
before((done) => {
Client = loadProtoFile(__dirname + '/fixtures/test_service.proto').TestService as ServiceClientConstructor;
server = new grpc.Server();
server.addService(Client.service, {
unary: () => {},
clientStream: () => {},
serverStream: () => {},
bidiStream: () => {}
});
proxyServer = new grpc.Server();
server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
if (error) {
done(error);
return;
}
server.start();
client = new Client(`localhost:${port}`, grpc.credentials.createInsecure());
proxyServer.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, proxyPort) => {
if (error) {
done(error);
return;
}
proxyServer.start();
proxyClient = new Client(`localhost:${proxyPort}`, grpc.credentials.createInsecure());
done();
});
});
});
afterEach(() => {
proxyServer.removeService(Client.service);
});
after(() => {
server.forceShutdown();
proxyServer.forceShutdown();
});
describe('Cancellation', () => {
it('should work with unary requests', (done) => {
done = multiDone(done, 2);
let call: grpc.ClientUnaryCall;
proxyServer.addService(Client.service, {
unary: (parent: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryData<any>) => {
client.unary(parent.request, {parent: parent}, (error: grpc.ServiceError, value: unknown) => {
callback(error, value);
assert(error);
assert.strictEqual(error.code, grpc.status.CANCELLED);
done();
});
/* Cancel the original call after the server starts processing it to
* ensure that it does reach the server. */
call.cancel();
}
});
call = proxyClient.unary({}, (error: grpc.ServiceError, value: unknown) => {
assert(error);
assert.strictEqual(error.code, grpc.status.CANCELLED);
done();
});
});
it('Should work with client streaming requests', (done) => {
done = multiDone(done, 2);
let call: grpc.ClientWritableStream<unknown>;
proxyServer.addService(Client.service, {
clientStream: (parent: grpc.ServerReadableStream<any, any>, callback: grpc.sendUnaryData<any>) => {
client.clientStream({parent: parent}, (error: grpc.ServiceError, value: unknown) => {
callback(error, value);
assert(error);
assert.strictEqual(error.code, grpc.status.CANCELLED);
done();
});
/* Cancel the original call after the server starts processing it to
* ensure that it does reach the server. */
call.cancel();
}
});
call = proxyClient.clientStream((error: grpc.ServiceError, value: unknown) => {
assert(error);
assert.strictEqual(error.code, grpc.status.CANCELLED);
done();
});
});
it('Should work with server streaming requests', (done) => {
done = multiDone(done, 2);
let call: grpc.ClientReadableStream<unknown>;
proxyServer.addService(Client.service, {
serverStream: (parent: grpc.ServerWritableStream<any, any>) => {
const child = client.serverStream(parent.request, {parent: parent});
child.on('error', () => {});
child.on('status', (status: grpc.StatusObject) => {
assert.strictEqual(status.code, grpc.status.CANCELLED);
done();
});
call.cancel();
}
});
call = proxyClient.serverStream({});
call.on('error', () => {});
call.on('status', (status: grpc.StatusObject) => {
assert.strictEqual(status.code, grpc.status.CANCELLED);
done();
});
});
it('Should work with bidi streaming requests', (done) => {
done = multiDone(done, 2);
let call: grpc.ClientDuplexStream<unknown, unknown>;
proxyServer.addService(Client.service, {
bidiStream: (parent: grpc.ServerDuplexStream<any, any>) => {
const child = client.bidiStream({parent: parent});
child.on('error', () => {});
child.on('status', (status: grpc.StatusObject) => {
assert.strictEqual(status.code, grpc.status.CANCELLED);
done();
});
call.cancel();
}
});
call = proxyClient.bidiStream();
call.on('error', () => {});
call.on('status', (status: grpc.StatusObject) => {
assert.strictEqual(status.code, grpc.status.CANCELLED);
done();
});
});
});
describe('Deadlines', () => {
it('should work with unary requests', (done) => {
done = multiDone(done, 2);
let call: grpc.ClientUnaryCall;
proxyServer.addService(Client.service, {
unary: (parent: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryData<any>) => {
client.unary(parent.request, {parent: parent, propagate_flags: grpc.propagate.DEADLINE}, (error: grpc.ServiceError, value: unknown) => {
callback(error, value);
assert(error);
assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED);
done();
});
}
});
const deadline = new Date();
deadline.setMilliseconds(deadline.getMilliseconds() + 100);
call = proxyClient.unary({}, {deadline}, (error: grpc.ServiceError, value: unknown) => {
assert(error);
assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED);
done();
});
});
it('Should work with client streaming requests', (done) => {
done = multiDone(done, 2);
let call: grpc.ClientWritableStream<unknown>;
proxyServer.addService(Client.service, {
clientStream: (parent: grpc.ServerReadableStream<any, any>, callback: grpc.sendUnaryData<any>) => {
client.clientStream({parent: parent, propagate_flags: grpc.propagate.DEADLINE}, (error: grpc.ServiceError, value: unknown) => {
callback(error, value);
assert(error);
assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED);
done();
});
}
});
const deadline = new Date();
deadline.setMilliseconds(deadline.getMilliseconds() + 100);
call = proxyClient.clientStream({deadline, propagate_flags: grpc.propagate.DEADLINE}, (error: grpc.ServiceError, value: unknown) => {
assert(error);
assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED);
done();
});
});
it('Should work with server streaming requests', (done) => {
done = multiDone(done, 2);
let call: grpc.ClientReadableStream<unknown>;
proxyServer.addService(Client.service, {
serverStream: (parent: grpc.ServerWritableStream<any, any>) => {
const child = client.serverStream(parent.request, {parent: parent, propagate_flags: grpc.propagate.DEADLINE});
child.on('error', () => {});
child.on('status', (status: grpc.StatusObject) => {
assert.strictEqual(status.code, grpc.status.DEADLINE_EXCEEDED);
done();
});
}
});
const deadline = new Date();
deadline.setMilliseconds(deadline.getMilliseconds() + 100);
call = proxyClient.serverStream({}, {deadline});
call.on('error', () => {});
call.on('status', (status: grpc.StatusObject) => {
assert.strictEqual(status.code, grpc.status.DEADLINE_EXCEEDED);
done();
});
});
it('Should work with bidi streaming requests', (done) => {
done = multiDone(done, 2);
let call: grpc.ClientDuplexStream<unknown, unknown>;
proxyServer.addService(Client.service, {
bidiStream: (parent: grpc.ServerDuplexStream<any, any>) => {
const child = client.bidiStream({parent: parent, propagate_flags: grpc.propagate.DEADLINE});
child.on('error', () => {});
child.on('status', (status: grpc.StatusObject) => {
assert.strictEqual(status.code, grpc.status.DEADLINE_EXCEEDED);
done();
});
}
});
const deadline = new Date();
deadline.setMilliseconds(deadline.getMilliseconds() + 100);
call = proxyClient.bidiStream({deadline});
call.on('error', () => {});
call.on('status', (status: grpc.StatusObject) => {
assert.strictEqual(status.code, grpc.status.DEADLINE_EXCEEDED);
done();
});
});
});
});