grpc-js: Refactor FilterStack usage

This commit is contained in:
Michael Lumish 2021-06-10 14:48:33 -07:00
parent 47ac924abe
commit 8a38cd8549
10 changed files with 28 additions and 38 deletions

View File

@ -37,7 +37,6 @@ import { Watcher } from './xds-stream-state/xds-stream-state';
import Filter = experimental.Filter;
import BaseFilter = experimental.BaseFilter;
import FilterFactory = experimental.FilterFactory;
import FilterStackFactory = experimental.FilterStackFactory;
import CallStream = experimental.CallStream;
const TRACER_NAME = 'eds_balancer';
@ -206,12 +205,9 @@ export class EdsLoadBalancer implements LoadBalancer {
* balancer. */
if (dropCategory === null) {
const originalPick = originalPicker.pick(pickArgs);
let extraFilterFactory: FilterFactory<Filter> = new CallTrackingFilterFactory(() => {
const trackingFilterFactory: FilterFactory<Filter> = new CallTrackingFilterFactory(() => {
this.concurrentRequests -= 1;
});
if (originalPick.extraFilterFactory) {
extraFilterFactory = new FilterStackFactory([originalPick.extraFilterFactory, extraFilterFactory]);
}
return {
pickResultType: originalPick.pickResultType,
status: originalPick.status,
@ -220,7 +216,7 @@ export class EdsLoadBalancer implements LoadBalancer {
originalPick.onCallStarted?.();
this.concurrentRequests += 1;
},
extraFilterFactory: extraFilterFactory
extraFilterFactories: originalPick.extraFilterFactories.concat(trackingFilterFactory)
};
} else {
let details: string;
@ -239,7 +235,7 @@ export class EdsLoadBalancer implements LoadBalancer {
metadata: new Metadata(),
},
subchannel: null,
extraFilterFactory: null,
extraFilterFactories: [],
onCallStarted: null
};
}

View File

@ -32,7 +32,6 @@ import PickResult = experimental.PickResult;
import Filter = experimental.Filter;
import BaseFilter = experimental.BaseFilter;
import FilterFactory = experimental.FilterFactory;
import FilterStackFactory = experimental.FilterStackFactory;
import Call = experimental.CallStream;
import validateLoadBalancingConfig = experimental.validateLoadBalancingConfig
@ -148,14 +147,6 @@ class LoadReportingPicker implements Picker {
const trackingFilterFactory = new CallEndTrackingFilterFactory(
this.localityStatsReporter
);
/* In the unlikely event that the wrappedPick already has an
* extraFilterFactory, preserve it in a FilterStackFactory. */
const extraFilterFactory = wrappedPick.extraFilterFactory
? new FilterStackFactory([
wrappedPick.extraFilterFactory,
trackingFilterFactory,
])
: trackingFilterFactory;
return {
pickResultType: PickResultType.COMPLETE,
subchannel: wrappedPick.subchannel,
@ -164,7 +155,7 @@ class LoadReportingPicker implements Picker {
wrappedPick.onCallStarted?.();
this.localityStatsReporter.addCallStarted();
},
extraFilterFactory: extraFilterFactory,
extraFilterFactories: wrappedPick.extraFilterFactories.concat(trackingFilterFactory),
};
} else {
return wrappedPick;

View File

@ -107,7 +107,7 @@ class XdsClusterManagerPicker implements Picker {
metadata: new Metadata(),
},
subchannel: null,
extraFilterFactory: null,
extraFilterFactories: [],
onCallStarted: null
};
}

View File

@ -210,7 +210,7 @@ export interface Call {
export class Http2CallStream implements Call {
credentials: CallCredentials;
filterStack: Filter;
filterStack: FilterStack;
private http2Stream: http2.ClientHttp2Stream | null = null;
private pendingRead = false;
private isWriteFilterPending = false;
@ -462,14 +462,9 @@ export class Http2CallStream implements Call {
attachHttp2Stream(
stream: http2.ClientHttp2Stream,
subchannel: Subchannel,
extraFilterFactory?: FilterFactory<Filter>
extraFilters: FilterFactory<Filter>[]
): void {
if (extraFilterFactory !== undefined) {
this.filterStack = new FilterStack([
this.filterStack,
extraFilterFactory.createFilter(this),
]);
}
this.filterStack.push(extraFilters.map(filterFactory => filterFactory.createFilter(this)));
if (this.finalStatus !== null) {
stream.close(NGHTTP2_CANCEL);
} else {

View File

@ -373,7 +373,7 @@ export class ChannelImplementation implements Channel {
pickResult.subchannel!.startCallStream(
finalMetadata,
callStream,
pickResult.extraFilterFactory ?? undefined
pickResult.extraFilterFactories
);
/* If we reach this point, the call stream has started
* successfully */

View File

@ -77,11 +77,19 @@ export class FilterStack implements Filter {
filter.refresh();
}
}
push(filters: Filter[]) {
this.filters.unshift(...filters);
}
}
export class FilterStackFactory implements FilterFactory<FilterStack> {
constructor(private readonly factories: Array<FilterFactory<Filter>>) {}
push(filterFactories: FilterFactory<Filter>[]) {
this.factories.unshift(...filterFactories);
}
createFilter(callStream: Call): FilterStack {
return new FilterStack(
this.factories.map((factory) => factory.createFilter(callStream))

View File

@ -83,7 +83,7 @@ class PickFirstPicker implements Picker {
pickResultType: PickResultType.COMPLETE,
subchannel: this.subchannel,
status: null,
extraFilterFactory: null,
extraFilterFactories: [],
onCallStarted: null,
};
}

View File

@ -78,7 +78,7 @@ class RoundRobinPicker implements Picker {
pickResultType: PickResultType.COMPLETE,
subchannel: pickedSubchannel,
status: null,
extraFilterFactory: null,
extraFilterFactories: [],
onCallStarted: null,
};
}

View File

@ -47,7 +47,7 @@ export interface PickResult {
* provided by the load balancer to be used with the call. For technical
* reasons filters from this factory will not see sendMetadata events.
*/
extraFilterFactory: FilterFactory<Filter> | null;
extraFilterFactories: FilterFactory<Filter>[];
onCallStarted: (() => void) | null;
}
@ -55,7 +55,7 @@ export interface CompletePickResult extends PickResult {
pickResultType: PickResultType.COMPLETE;
subchannel: Subchannel | null;
status: null;
extraFilterFactory: FilterFactory<Filter> | null;
extraFilterFactories: FilterFactory<Filter>[];
onCallStarted: (() => void) | null;
}
@ -63,7 +63,7 @@ export interface QueuePickResult extends PickResult {
pickResultType: PickResultType.QUEUE;
subchannel: null;
status: null;
extraFilterFactory: null;
extraFilterFactories: [];
onCallStarted: null;
}
@ -71,7 +71,7 @@ export interface TransientFailurePickResult extends PickResult {
pickResultType: PickResultType.TRANSIENT_FAILURE;
subchannel: null;
status: StatusObject;
extraFilterFactory: null;
extraFilterFactories: [];
onCallStarted: null;
}
@ -79,7 +79,7 @@ export interface DropCallPickResult extends PickResult {
pickResultType: PickResultType.DROP;
subchannel: null;
status: StatusObject;
extraFilterFactory: null;
extraFilterFactories: [];
onCallStarted: null;
}
@ -119,7 +119,7 @@ export class UnavailablePicker implements Picker {
pickResultType: PickResultType.TRANSIENT_FAILURE,
subchannel: null,
status: this.status,
extraFilterFactory: null,
extraFilterFactories: [],
onCallStarted: null,
};
}
@ -148,7 +148,7 @@ export class QueuePicker {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null,
extraFilterFactory: null,
extraFilterFactories: [],
onCallStarted: null,
};
}

View File

@ -688,7 +688,7 @@ export class Subchannel {
startCallStream(
metadata: Metadata,
callStream: Http2CallStream,
extraFilterFactory?: FilterFactory<Filter>
extraFilterFactories: FilterFactory<Filter>[]
) {
const headers = metadata.toHttp2Headers();
headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost();
@ -720,7 +720,7 @@ export class Subchannel {
headersString += '\t\t' + header + ': ' + headers[header] + '\n';
}
logging.trace(LogVerbosity.DEBUG, 'call_stream', 'Starting stream on subchannel ' + this.subchannelAddressString + ' with headers\n' + headersString);
callStream.attachHttp2Stream(http2Stream, this, extraFilterFactory);
callStream.attachHttp2Stream(http2Stream, this, extraFilterFactories);
}
/**