grpc-js: Handle filters in ResolvingCall instead of LoadBalancingCall

This commit is contained in:
Michael Lumish 2022-10-17 11:32:22 -07:00
parent aaa568fc01
commit c4c321d37d
3 changed files with 83 additions and 74 deletions

View File

@ -402,7 +402,7 @@ export class InternalChannel {
method +
'"'
);
return new LoadBalancingCall(this, callConfig, method, host, credentials, deadline, this.filterStackFactory, callNumber);
return new LoadBalancingCall(this, callConfig, method, host, credentials, deadline, callNumber);
}
createInnerCall(

View File

@ -41,14 +41,11 @@ export interface StatusObjectWithProgress extends StatusObject {
export class LoadBalancingCall implements Call {
private child: SubchannelCall | null = null;
private readPending = false;
private writeFilterPending = false;
private pendingMessage: {context: MessageContext, message: Buffer} | null = null;
private pendingHalfClose = false;
private readFilterPending = false;
private pendingChildStatus: StatusObject | null = null;
private ended = false;
private serviceUrl: string;
private filterStack: FilterStack;
private metadata: Metadata | null = null;
private listener: InterceptingListener | null = null;
private onCallEnded: ((statusCode: Status) => void) | null = null;
@ -59,11 +56,8 @@ export class LoadBalancingCall implements Call {
private readonly host : string,
private readonly credentials: CallCredentials,
private readonly deadline: Deadline,
filterStackFactory: FilterStackFactory,
private readonly callNumber: number
) {
this.filterStack = filterStackFactory.createFilter();
const splitPath: string[] = this.methodName.split('/');
let serviceName = '';
/* The standard path format is "/{serviceName}/{methodName}", so if we split
@ -90,8 +84,7 @@ export class LoadBalancingCall implements Call {
if (!this.ended) {
this.ended = true;
this.trace('ended with status: code=' + status.code + ' details="' + status.details + '"');
const filteredStatus = this.filterStack.receiveTrailers(status);
const finalStatus = {...filteredStatus, progress};
const finalStatus = {...status, progress};
this.listener?.onReceiveStatus(finalStatus);
this.onCallEnded?.(finalStatus.code);
}
@ -152,26 +145,13 @@ export class LoadBalancingCall implements Call {
try {
this.child = pickResult.subchannel!.getRealSubchannel().createCall(finalMetadata, this.host, this.methodName, {
onReceiveMetadata: metadata => {
this.listener!.onReceiveMetadata(this.filterStack.receiveMetadata(metadata));
this.listener!.onReceiveMetadata(metadata);
},
onReceiveMessage: message => {
this.readFilterPending = true;
this.filterStack.receiveMessage(message).then(filteredMesssage => {
this.readFilterPending = false;
this.listener!.onReceiveMessage(filteredMesssage);
if (this.pendingChildStatus) {
this.outputStatus(this.pendingChildStatus, 'PROCESSED');
}
}, (status: StatusObject) => {
this.cancelWithStatus(status.code, status.details);
});
this.listener!.onReceiveMessage(message);
},
onReceiveStatus: status => {
if (this.readFilterPending) {
this.pendingChildStatus = status;
} else {
this.outputStatus(status, 'PROCESSED');
}
this.outputStatus(status, 'PROCESSED');
}
});
} catch (error) {
@ -201,7 +181,7 @@ export class LoadBalancingCall implements Call {
if (this.pendingMessage) {
this.child.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
}
if (this.pendingHalfClose && !this.writeFilterPending) {
if (this.pendingHalfClose) {
this.child.halfClose();
}
}, (error: Error & { code: number }) => {
@ -249,29 +229,16 @@ export class LoadBalancingCall implements Call {
start(metadata: Metadata, listener: InterceptingListener): void {
this.trace('start called');
this.listener = listener;
this.filterStack.sendMetadata(Promise.resolve(metadata)).then(filteredMetadata => {
this.metadata = filteredMetadata;
this.doPick();
}, (status: StatusObject) => {
this.outputStatus(status, 'PROCESSED');
});
this.metadata = metadata;
this.doPick();
}
sendMessageWithContext(context: MessageContext, message: Buffer): void {
this.trace('write() called with message of length ' + message.length);
this.writeFilterPending = true;
this.filterStack.sendMessage(Promise.resolve({message: message, flags: context.flags})).then((filteredMessage) => {
this.writeFilterPending = false;
if (this.child) {
this.child.sendMessageWithContext(context, filteredMessage.message);
if (this.pendingHalfClose) {
this.child.halfClose();
}
} else {
this.pendingMessage = {context, message: filteredMessage.message};
}
}, (status: StatusObject) => {
this.cancelWithStatus(status.code, status.details);
})
if (this.child) {
this.child.sendMessageWithContext(context, message);
} else {
this.pendingMessage = {context, message};
}
}
startRead(): void {
this.trace('startRead called');
@ -283,7 +250,7 @@ export class LoadBalancingCall implements Call {
}
halfClose(): void {
this.trace('halfClose called');
if (this.child && !this.writeFilterPending) {
if (this.child) {
this.child.halfClose();
} else {
this.pendingHalfClose = true;

View File

@ -19,7 +19,7 @@ import { CallCredentials } from "./call-credentials";
import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from "./call-interface";
import { LogVerbosity, Propagate, Status } from "./constants";
import { Deadline, getDeadlineTimeoutString, getRelativeTimeout, minDeadline } from "./deadline";
import { FilterStackFactory } from "./filter-stack";
import { FilterStack, FilterStackFactory } from "./filter-stack";
import { InternalChannel } from "./internal-channel";
import { Metadata } from "./metadata";
import * as logging from './logging';
@ -33,12 +33,16 @@ export class ResolvingCall implements Call {
private pendingMessage: {context: MessageContext, message: Buffer} | null = null;
private pendingHalfClose = false;
private ended = false;
private readFilterPending = false;
private writeFilterPending = false;
private pendingChildStatus: StatusObject | null = null;
private metadata: Metadata | null = null;
private listener: InterceptingListener | null = null;
private deadline: Deadline;
private host: string;
private statusWatchers: ((status: StatusObject) => void)[] = [];
private deadlineTimer: NodeJS.Timer = setTimeout(() => {}, 0);
private filterStack: FilterStack | null = null;
constructor(
private readonly channel: InternalChannel,
@ -96,14 +100,35 @@ export class ResolvingCall implements Call {
private outputStatus(status: StatusObject) {
if (!this.ended) {
this.ended = true;
this.trace('ended with status: code=' + status.code + ' details="' + status.details + '"');
this.statusWatchers.forEach(watcher => watcher(status));
if (!this.filterStack) {
this.filterStack = this.filterStackFactory.createFilter();
}
const filteredStatus = this.filterStack.receiveTrailers(status);
this.trace('ended with status: code=' + filteredStatus.code + ' details="' + filteredStatus.details + '"');
this.statusWatchers.forEach(watcher => watcher(filteredStatus));
process.nextTick(() => {
this.listener?.onReceiveStatus(status);
this.listener?.onReceiveStatus(filteredStatus);
});
}
}
private sendMessageOnChild(context: MessageContext, message: Buffer): void {
if (!this.child) {
throw new Error('sendMessageonChild called with child not populated');
}
const child = this.child;
this.writeFilterPending = true;
this.filterStack!.sendMessage(Promise.resolve({message: message, flags: context.flags})).then((filteredMessage) => {
this.writeFilterPending = false;
child.sendMessageWithContext(context, filteredMessage.message);
if (this.pendingHalfClose) {
child.halfClose();
}
}, (status: StatusObject) => {
this.cancelWithStatus(status.code, status.details);
});
}
getConfig(): void {
if (this.ended) {
return;
@ -148,29 +173,46 @@ export class ResolvingCall implements Call {
}
this.filterStackFactory.push(config.dynamicFilterFactories);
this.child = this.channel.createInnerCall(config, this.method, this.host, this.credentials, this.deadline);
this.child.start(this.metadata, {
onReceiveMetadata: metadata => {
this.listener!.onReceiveMetadata(metadata);
},
onReceiveMessage: message => {
this.listener!.onReceiveMessage(message);
},
onReceiveStatus: status => {
this.outputStatus(status);
this.filterStack = this.filterStackFactory.createFilter();
this.filterStack.sendMetadata(Promise.resolve(this.metadata)).then(filteredMetadata => {
this.child = this.channel.createInnerCall(config, this.method, this.host, this.credentials, this.deadline);
this.child.start(filteredMetadata, {
onReceiveMetadata: metadata => {
this.listener!.onReceiveMetadata(this.filterStack!.receiveMetadata(metadata));
},
onReceiveMessage: message => {
this.readFilterPending = true;
this.filterStack!.receiveMessage(message).then(filteredMesssage => {
this.readFilterPending = false;
this.listener!.onReceiveMessage(filteredMesssage);
if (this.pendingChildStatus) {
this.outputStatus(this.pendingChildStatus);
}
}, (status: StatusObject) => {
this.cancelWithStatus(status.code, status.details);
});
},
onReceiveStatus: status => {
if (this.readFilterPending) {
this.pendingChildStatus = status;
} else {
this.outputStatus(status);
}
}
});
if (this.readPending) {
this.child.startRead();
}
});
if (this.readPending) {
this.child.startRead();
}
if (this.pendingMessage) {
this.child.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
}
if (this.pendingHalfClose) {
this.child.halfClose();
}
if (this.pendingMessage) {
this.sendMessageOnChild(this.pendingMessage.context, this.pendingMessage.message);
} else if (this.pendingHalfClose) {
this.child.halfClose();
}
}, (status: StatusObject) => {
this.outputStatus(status);
})
}
reportResolverError(status: StatusObject) {
if (this.metadata?.getOptions().waitForReady) {
this.channel.queueCallForConfig(this);
@ -195,7 +237,7 @@ export class ResolvingCall implements Call {
sendMessageWithContext(context: MessageContext, message: Buffer): void {
this.trace('write() called with message of length ' + message.length);
if (this.child) {
this.child.sendMessageWithContext(context, message);
this.sendMessageOnChild(context, message);
} else {
this.pendingMessage = {context, message};
}
@ -210,7 +252,7 @@ export class ResolvingCall implements Call {
}
halfClose(): void {
this.trace('halfClose called');
if (this.child) {
if (this.child && !this.writeFilterPending) {
this.child.halfClose();
} else {
this.pendingHalfClose = true;