From 8a38cd8549dd804b2bf070e3ec4620c446d411e8 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 10 Jun 2021 14:48:33 -0700 Subject: [PATCH] grpc-js: Refactor FilterStack usage --- packages/grpc-js-xds/src/load-balancer-eds.ts | 10 +++------- packages/grpc-js-xds/src/load-balancer-lrs.ts | 11 +---------- .../src/load-balancer-xds-cluster-manager.ts | 2 +- packages/grpc-js/src/call-stream.ts | 11 +++-------- packages/grpc-js/src/channel.ts | 2 +- packages/grpc-js/src/filter-stack.ts | 8 ++++++++ packages/grpc-js/src/load-balancer-pick-first.ts | 2 +- packages/grpc-js/src/load-balancer-round-robin.ts | 2 +- packages/grpc-js/src/picker.ts | 14 +++++++------- packages/grpc-js/src/subchannel.ts | 4 ++-- 10 files changed, 28 insertions(+), 38 deletions(-) diff --git a/packages/grpc-js-xds/src/load-balancer-eds.ts b/packages/grpc-js-xds/src/load-balancer-eds.ts index 183e0d67..efb05eb7 100644 --- a/packages/grpc-js-xds/src/load-balancer-eds.ts +++ b/packages/grpc-js-xds/src/load-balancer-eds.ts @@ -37,7 +37,6 @@ import { Watcher } from './xds-stream-state/xds-stream-state'; import Filter = experimental.Filter; import BaseFilter = experimental.BaseFilter; import FilterFactory = experimental.FilterFactory; -import FilterStackFactory = experimental.FilterStackFactory; import CallStream = experimental.CallStream; const TRACER_NAME = 'eds_balancer'; @@ -206,12 +205,9 @@ export class EdsLoadBalancer implements LoadBalancer { * balancer. */ if (dropCategory === null) { const originalPick = originalPicker.pick(pickArgs); - let extraFilterFactory: FilterFactory = new CallTrackingFilterFactory(() => { + const trackingFilterFactory: FilterFactory = new CallTrackingFilterFactory(() => { this.concurrentRequests -= 1; }); - if (originalPick.extraFilterFactory) { - extraFilterFactory = new FilterStackFactory([originalPick.extraFilterFactory, extraFilterFactory]); - } return { pickResultType: originalPick.pickResultType, status: originalPick.status, @@ -220,7 +216,7 @@ export class EdsLoadBalancer implements LoadBalancer { originalPick.onCallStarted?.(); this.concurrentRequests += 1; }, - extraFilterFactory: extraFilterFactory + extraFilterFactories: originalPick.extraFilterFactories.concat(trackingFilterFactory) }; } else { let details: string; @@ -239,7 +235,7 @@ export class EdsLoadBalancer implements LoadBalancer { metadata: new Metadata(), }, subchannel: null, - extraFilterFactory: null, + extraFilterFactories: [], onCallStarted: null }; } diff --git a/packages/grpc-js-xds/src/load-balancer-lrs.ts b/packages/grpc-js-xds/src/load-balancer-lrs.ts index 0792b11c..566aa04c 100644 --- a/packages/grpc-js-xds/src/load-balancer-lrs.ts +++ b/packages/grpc-js-xds/src/load-balancer-lrs.ts @@ -32,7 +32,6 @@ import PickResult = experimental.PickResult; import Filter = experimental.Filter; import BaseFilter = experimental.BaseFilter; import FilterFactory = experimental.FilterFactory; -import FilterStackFactory = experimental.FilterStackFactory; import Call = experimental.CallStream; import validateLoadBalancingConfig = experimental.validateLoadBalancingConfig @@ -148,14 +147,6 @@ class LoadReportingPicker implements Picker { const trackingFilterFactory = new CallEndTrackingFilterFactory( this.localityStatsReporter ); - /* In the unlikely event that the wrappedPick already has an - * extraFilterFactory, preserve it in a FilterStackFactory. */ - const extraFilterFactory = wrappedPick.extraFilterFactory - ? new FilterStackFactory([ - wrappedPick.extraFilterFactory, - trackingFilterFactory, - ]) - : trackingFilterFactory; return { pickResultType: PickResultType.COMPLETE, subchannel: wrappedPick.subchannel, @@ -164,7 +155,7 @@ class LoadReportingPicker implements Picker { wrappedPick.onCallStarted?.(); this.localityStatsReporter.addCallStarted(); }, - extraFilterFactory: extraFilterFactory, + extraFilterFactories: wrappedPick.extraFilterFactories.concat(trackingFilterFactory), }; } else { return wrappedPick; diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts index 920a43db..7df79e26 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts @@ -107,7 +107,7 @@ class XdsClusterManagerPicker implements Picker { metadata: new Metadata(), }, subchannel: null, - extraFilterFactory: null, + extraFilterFactories: [], onCallStarted: null }; } diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index 3a81cbe9..ae634260 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -210,7 +210,7 @@ export interface Call { export class Http2CallStream implements Call { credentials: CallCredentials; - filterStack: Filter; + filterStack: FilterStack; private http2Stream: http2.ClientHttp2Stream | null = null; private pendingRead = false; private isWriteFilterPending = false; @@ -462,14 +462,9 @@ export class Http2CallStream implements Call { attachHttp2Stream( stream: http2.ClientHttp2Stream, subchannel: Subchannel, - extraFilterFactory?: FilterFactory + extraFilters: FilterFactory[] ): void { - if (extraFilterFactory !== undefined) { - this.filterStack = new FilterStack([ - this.filterStack, - extraFilterFactory.createFilter(this), - ]); - } + this.filterStack.push(extraFilters.map(filterFactory => filterFactory.createFilter(this))); if (this.finalStatus !== null) { stream.close(NGHTTP2_CANCEL); } else { diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 8b2658f3..cd2fc94a 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -373,7 +373,7 @@ export class ChannelImplementation implements Channel { pickResult.subchannel!.startCallStream( finalMetadata, callStream, - pickResult.extraFilterFactory ?? undefined + pickResult.extraFilterFactories ); /* If we reach this point, the call stream has started * successfully */ diff --git a/packages/grpc-js/src/filter-stack.ts b/packages/grpc-js/src/filter-stack.ts index 4fd88854..4e0ccf07 100644 --- a/packages/grpc-js/src/filter-stack.ts +++ b/packages/grpc-js/src/filter-stack.ts @@ -77,11 +77,19 @@ export class FilterStack implements Filter { filter.refresh(); } } + + push(filters: Filter[]) { + this.filters.unshift(...filters); + } } export class FilterStackFactory implements FilterFactory { constructor(private readonly factories: Array>) {} + push(filterFactories: FilterFactory[]) { + this.factories.unshift(...filterFactories); + } + createFilter(callStream: Call): FilterStack { return new FilterStack( this.factories.map((factory) => factory.createFilter(callStream)) diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 31dc1784..688f9556 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -83,7 +83,7 @@ class PickFirstPicker implements Picker { pickResultType: PickResultType.COMPLETE, subchannel: this.subchannel, status: null, - extraFilterFactory: null, + extraFilterFactories: [], onCallStarted: null, }; } diff --git a/packages/grpc-js/src/load-balancer-round-robin.ts b/packages/grpc-js/src/load-balancer-round-robin.ts index daba4594..2159e64c 100644 --- a/packages/grpc-js/src/load-balancer-round-robin.ts +++ b/packages/grpc-js/src/load-balancer-round-robin.ts @@ -78,7 +78,7 @@ class RoundRobinPicker implements Picker { pickResultType: PickResultType.COMPLETE, subchannel: pickedSubchannel, status: null, - extraFilterFactory: null, + extraFilterFactories: [], onCallStarted: null, }; } diff --git a/packages/grpc-js/src/picker.ts b/packages/grpc-js/src/picker.ts index 6df61b59..25929884 100644 --- a/packages/grpc-js/src/picker.ts +++ b/packages/grpc-js/src/picker.ts @@ -47,7 +47,7 @@ export interface PickResult { * provided by the load balancer to be used with the call. For technical * reasons filters from this factory will not see sendMetadata events. */ - extraFilterFactory: FilterFactory | null; + extraFilterFactories: FilterFactory[]; onCallStarted: (() => void) | null; } @@ -55,7 +55,7 @@ export interface CompletePickResult extends PickResult { pickResultType: PickResultType.COMPLETE; subchannel: Subchannel | null; status: null; - extraFilterFactory: FilterFactory | null; + extraFilterFactories: FilterFactory[]; onCallStarted: (() => void) | null; } @@ -63,7 +63,7 @@ export interface QueuePickResult extends PickResult { pickResultType: PickResultType.QUEUE; subchannel: null; status: null; - extraFilterFactory: null; + extraFilterFactories: []; onCallStarted: null; } @@ -71,7 +71,7 @@ export interface TransientFailurePickResult extends PickResult { pickResultType: PickResultType.TRANSIENT_FAILURE; subchannel: null; status: StatusObject; - extraFilterFactory: null; + extraFilterFactories: []; onCallStarted: null; } @@ -79,7 +79,7 @@ export interface DropCallPickResult extends PickResult { pickResultType: PickResultType.DROP; subchannel: null; status: StatusObject; - extraFilterFactory: null; + extraFilterFactories: []; onCallStarted: null; } @@ -119,7 +119,7 @@ export class UnavailablePicker implements Picker { pickResultType: PickResultType.TRANSIENT_FAILURE, subchannel: null, status: this.status, - extraFilterFactory: null, + extraFilterFactories: [], onCallStarted: null, }; } @@ -148,7 +148,7 @@ export class QueuePicker { pickResultType: PickResultType.QUEUE, subchannel: null, status: null, - extraFilterFactory: null, + extraFilterFactories: [], onCallStarted: null, }; } diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 772bdb22..c50a68e7 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -688,7 +688,7 @@ export class Subchannel { startCallStream( metadata: Metadata, callStream: Http2CallStream, - extraFilterFactory?: FilterFactory + extraFilterFactories: FilterFactory[] ) { const headers = metadata.toHttp2Headers(); headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost(); @@ -720,7 +720,7 @@ export class Subchannel { headersString += '\t\t' + header + ': ' + headers[header] + '\n'; } logging.trace(LogVerbosity.DEBUG, 'call_stream', 'Starting stream on subchannel ' + this.subchannelAddressString + ' with headers\n' + headersString); - callStream.attachHttp2Stream(http2Stream, this, extraFilterFactory); + callStream.attachHttp2Stream(http2Stream, this, extraFilterFactories); } /**