mirror of https://github.com/grpc/grpc-node.git
Merge pull request #1785 from murgatroid99/grpc-js_service_config_timeout
grpc-js: Apply timeouts from service configs
This commit is contained in:
commit
663fe77e72
|
@ -70,6 +70,17 @@ function getSystemErrorName(errno: number): string {
|
|||
|
||||
export type Deadline = Date | number;
|
||||
|
||||
function getMinDeadline(deadlineList: Deadline[]): Deadline {
|
||||
let minValue: number = Infinity;
|
||||
for (const deadline of deadlineList) {
|
||||
const deadlineMsecs = deadline instanceof Date ? deadline.getTime() : deadline;
|
||||
if (deadlineMsecs < minValue) {
|
||||
minValue = deadlineMsecs;
|
||||
}
|
||||
}
|
||||
return minValue;
|
||||
}
|
||||
|
||||
export interface CallStreamOptions {
|
||||
deadline: Deadline;
|
||||
flags: number;
|
||||
|
@ -235,6 +246,8 @@ export class Http2CallStream implements Call {
|
|||
|
||||
private internalError: SystemError | null = null;
|
||||
|
||||
private configDeadline: Deadline = Infinity;
|
||||
|
||||
constructor(
|
||||
private readonly methodName: string,
|
||||
private readonly channel: ChannelImplementation,
|
||||
|
@ -675,15 +688,14 @@ export class Http2CallStream implements Call {
|
|||
}
|
||||
|
||||
getDeadline(): Deadline {
|
||||
const deadlineList = [this.options.deadline];
|
||||
if (this.options.parentCall && this.options.flags & Propagate.DEADLINE) {
|
||||
const parentDeadline = this.options.parentCall.getDeadline();
|
||||
const selfDeadline = this.options.deadline;
|
||||
const parentDeadlineMsecs = parentDeadline instanceof Date ? parentDeadline.getTime() : parentDeadline;
|
||||
const selfDeadlineMsecs = selfDeadline instanceof Date ? selfDeadline.getTime() : selfDeadline;
|
||||
return Math.min(parentDeadlineMsecs, selfDeadlineMsecs);
|
||||
} else {
|
||||
return this.options.deadline;
|
||||
deadlineList.push(this.options.parentCall.getDeadline());
|
||||
}
|
||||
if (this.configDeadline) {
|
||||
deadlineList.push(this.configDeadline);
|
||||
}
|
||||
return getMinDeadline(deadlineList);
|
||||
}
|
||||
|
||||
getCredentials(): CallCredentials {
|
||||
|
@ -710,6 +722,10 @@ export class Http2CallStream implements Call {
|
|||
return this.options.host;
|
||||
}
|
||||
|
||||
setConfigDeadline(configDeadline: Deadline) {
|
||||
this.configDeadline = configDeadline;
|
||||
}
|
||||
|
||||
startRead() {
|
||||
/* If the stream has ended with an error, we should not emit any more
|
||||
* messages and we should communicate that the stream has ended */
|
||||
|
|
|
@ -509,6 +509,11 @@ export class ChannelImplementation implements Channel {
|
|||
}
|
||||
|
||||
private tryGetConfig(stream: Http2CallStream, metadata: Metadata) {
|
||||
if (stream.getStatus() !== null) {
|
||||
/* If the stream has a status, it has already finished and we don't need
|
||||
* to take any more actions on it. */
|
||||
return;
|
||||
}
|
||||
if (this.configSelector === null) {
|
||||
/* This branch will only be taken at the beginning of the channel's life,
|
||||
* before the resolver ever returns a result. So, the
|
||||
|
@ -523,6 +528,14 @@ export class ChannelImplementation implements Channel {
|
|||
} else {
|
||||
const callConfig = this.configSelector(stream.getMethod(), metadata);
|
||||
if (callConfig.status === Status.OK) {
|
||||
if (callConfig.methodConfig.timeout) {
|
||||
const deadline = new Date();
|
||||
deadline.setSeconds(deadline.getSeconds() + callConfig.methodConfig.timeout.seconds);
|
||||
deadline.setMilliseconds(deadline.getMilliseconds() + callConfig.methodConfig.timeout.nanos / 1_000_000);
|
||||
stream.setConfigDeadline(deadline);
|
||||
// Refreshing the filters makes the deadline filter pick up the new deadline
|
||||
stream.filterStack.refresh();
|
||||
}
|
||||
this.tryPick(stream, metadata, callConfig);
|
||||
} else {
|
||||
stream.cancelWithStatus(callConfig.status, "Failed to route call to method " + stream.getMethod());
|
||||
|
|
|
@ -42,30 +42,41 @@ function getDeadline(deadline: number) {
|
|||
|
||||
export class DeadlineFilter extends BaseFilter implements Filter {
|
||||
private timer: NodeJS.Timer | null = null;
|
||||
private deadline: number;
|
||||
private deadline: number = Infinity;
|
||||
constructor(
|
||||
private readonly channel: Channel,
|
||||
private readonly callStream: Call
|
||||
) {
|
||||
super();
|
||||
const callDeadline = callStream.getDeadline();
|
||||
this.retreiveDeadline();
|
||||
this.runTimer();
|
||||
}
|
||||
|
||||
private retreiveDeadline() {
|
||||
const callDeadline = this.callStream.getDeadline();
|
||||
if (callDeadline instanceof Date) {
|
||||
this.deadline = callDeadline.getTime();
|
||||
} else {
|
||||
this.deadline = callDeadline;
|
||||
}
|
||||
}
|
||||
|
||||
private runTimer() {
|
||||
if (this.timer) {
|
||||
clearTimeout(this.timer);
|
||||
}
|
||||
const now: number = new Date().getTime();
|
||||
let timeout = this.deadline - now;
|
||||
if (timeout <= 0) {
|
||||
process.nextTick(() => {
|
||||
callStream.cancelWithStatus(
|
||||
this.callStream.cancelWithStatus(
|
||||
Status.DEADLINE_EXCEEDED,
|
||||
'Deadline exceeded'
|
||||
);
|
||||
});
|
||||
} else if (this.deadline !== Infinity) {
|
||||
this.timer = setTimeout(() => {
|
||||
callStream.cancelWithStatus(
|
||||
this.callStream.cancelWithStatus(
|
||||
Status.DEADLINE_EXCEEDED,
|
||||
'Deadline exceeded'
|
||||
);
|
||||
|
@ -74,6 +85,11 @@ export class DeadlineFilter extends BaseFilter implements Filter {
|
|||
}
|
||||
}
|
||||
|
||||
refresh() {
|
||||
this.retreiveDeadline();
|
||||
this.runTimer();
|
||||
}
|
||||
|
||||
async sendMetadata(metadata: Promise<Metadata>) {
|
||||
if (this.deadline === Infinity) {
|
||||
return metadata;
|
||||
|
|
|
@ -71,6 +71,12 @@ export class FilterStack implements Filter {
|
|||
|
||||
return result;
|
||||
}
|
||||
|
||||
refresh(): void {
|
||||
for (const filter of this.filters) {
|
||||
filter.refresh();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export class FilterStackFactory implements FilterFactory<FilterStack> {
|
||||
|
|
|
@ -32,6 +32,8 @@ export interface Filter {
|
|||
receiveMessage(message: Promise<Buffer>): Promise<Buffer>;
|
||||
|
||||
receiveTrailers(status: StatusObject): StatusObject;
|
||||
|
||||
refresh(): void;
|
||||
}
|
||||
|
||||
export abstract class BaseFilter implements Filter {
|
||||
|
@ -54,6 +56,9 @@ export abstract class BaseFilter implements Filter {
|
|||
receiveTrailers(status: StatusObject): StatusObject {
|
||||
return status;
|
||||
}
|
||||
|
||||
refresh(): void {
|
||||
}
|
||||
}
|
||||
|
||||
export interface FilterFactory<T extends Filter> {
|
||||
|
|
|
@ -34,10 +34,15 @@ export interface MethodConfigName {
|
|||
method?: string;
|
||||
}
|
||||
|
||||
export interface Duration {
|
||||
seconds: number;
|
||||
nanos: number;
|
||||
}
|
||||
|
||||
export interface MethodConfig {
|
||||
name: MethodConfigName[];
|
||||
waitForReady?: boolean;
|
||||
timeout?: string;
|
||||
timeout?: Duration;
|
||||
maxRequestBytes?: number;
|
||||
maxResponseBytes?: number;
|
||||
}
|
||||
|
@ -101,13 +106,25 @@ function validateMethodConfig(obj: any): MethodConfig {
|
|||
result.waitForReady = obj.waitForReady;
|
||||
}
|
||||
if ('timeout' in obj) {
|
||||
if (
|
||||
!(typeof obj.timeout === 'string') ||
|
||||
!TIMEOUT_REGEX.test(obj.timeout)
|
||||
if (typeof obj.timeout === 'object') {
|
||||
if (!('seconds' in obj.timeout) || !(typeof obj.timeout.seconds === 'number')) {
|
||||
throw new Error('Invalid method config: invalid timeout.seconds');
|
||||
}
|
||||
if (!('nanos' in obj.timeout) || !(typeof obj.timeout.nanos === 'number')) {
|
||||
throw new Error('Invalid method config: invalid timeout.nanos');
|
||||
}
|
||||
result.timeout = obj.timeout;
|
||||
} else if (
|
||||
(typeof obj.timeout === 'string') && TIMEOUT_REGEX.test(obj.timeout)
|
||||
) {
|
||||
const timeoutParts = obj.timeout.substring(0, obj.timeout.length - 1).split('.');
|
||||
result.timeout = {
|
||||
seconds: timeoutParts[0] | 0,
|
||||
nanos: (timeoutParts[1] ?? 0) | 0
|
||||
}
|
||||
} else {
|
||||
throw new Error('Invalid method config: invalid timeout');
|
||||
}
|
||||
result.timeout = obj.timeout;
|
||||
}
|
||||
if ('maxRequestBytes' in obj) {
|
||||
if (typeof obj.maxRequestBytes !== 'number') {
|
||||
|
|
Loading…
Reference in New Issue