From 3d4a27e6cc3f19ae660a076b63077942721f8441 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 24 Apr 2020 14:00:54 -0700 Subject: [PATCH] Plumb through an extra filter from the load balancer to the call stream --- packages/grpc-js/src/call-stream.ts | 10 +++++++--- packages/grpc-js/src/channel.ts | 3 ++- packages/grpc-js/src/picker.ts | 12 ++++++++++++ packages/grpc-js/src/subchannel.ts | 5 +++-- 4 files changed, 24 insertions(+), 6 deletions(-) diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index ca3049e7..3a853315 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -19,8 +19,8 @@ import * as http2 from 'http2'; import { CallCredentials } from './call-credentials'; import { Status } from './constants'; -import { Filter } from './filter'; -import { FilterStackFactory } from './filter-stack'; +import { Filter, FilterFactory } from './filter'; +import { FilterStackFactory, FilterStack } from './filter-stack'; import { Metadata } from './metadata'; import { StreamDecoder } from './stream-decoder'; import { ChannelImplementation } from './channel'; @@ -407,8 +407,12 @@ export class Http2CallStream implements Call { attachHttp2Stream( stream: http2.ClientHttp2Stream, - subchannel: Subchannel + subchannel: Subchannel, + extraFilterFactory?: FilterFactory ): void { + if (extraFilterFactory !== undefined) { + this.filterStack = new FilterStack([this.filterStack, extraFilterFactory.createFilter(this)]); + } if (this.finalStatus !== null) { stream.close(NGHTTP2_CANCEL); } else { diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index f2a7bc09..f4df2fce 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -300,7 +300,8 @@ export class ChannelImplementation implements Channel { try { pickResult.subchannel!.startCallStream( finalMetadata, - callStream + callStream, + pickResult.extraFilterFactory ?? undefined ); } catch (error) { if ( diff --git a/packages/grpc-js/src/picker.ts b/packages/grpc-js/src/picker.ts index d908f026..9470f122 100644 --- a/packages/grpc-js/src/picker.ts +++ b/packages/grpc-js/src/picker.ts @@ -20,6 +20,7 @@ import { StatusObject } from './call-stream'; import { Metadata } from './metadata'; import { Status } from './constants'; import { LoadBalancer } from './load-balancer'; +import { FilterFactory, Filter } from './filter'; export enum PickResultType { COMPLETE, @@ -40,24 +41,33 @@ export interface PickResult { * `pickResultType` is TRANSIENT_FAILURE. */ status: StatusObject | null; + /** + * Extra FilterFactory (can be multiple encapsulated in a FilterStackFactory) + * provided by the load balancer to be used with the call. For technical + * reasons filters from this factory will not see sendMetadata events. + */ + extraFilterFactory: FilterFactory | null; } export interface CompletePickResult extends PickResult { pickResultType: PickResultType.COMPLETE; subchannel: Subchannel | null; status: null; + extraFilterFactory: FilterFactory | null; } export interface QueuePickResult extends PickResult { pickResultType: PickResultType.QUEUE; subchannel: null; status: null; + extraFilterFactory: null; } export interface TransientFailurePickResult extends PickResult { pickResultType: PickResultType.TRANSIENT_FAILURE; subchannel: null; status: StatusObject; + extraFilterFactory: null; } export interface PickArgs { @@ -95,6 +105,7 @@ export class UnavailablePicker implements Picker { pickResultType: PickResultType.TRANSIENT_FAILURE, subchannel: null, status: this.status, + extraFilterFactory: null }; } } @@ -122,6 +133,7 @@ export class QueuePicker { pickResultType: PickResultType.QUEUE, subchannel: null, status: null, + extraFilterFactory: null }; } } diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 2af7885e..713f0d9a 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -30,6 +30,7 @@ import { getProxiedConnection, ProxyConnectionResult } from './http_proxy'; import * as net from 'net'; import { GrpcUri } from './uri-parser'; import { ConnectionOptions } from 'tls'; +import { FilterFactory, Filter } from './filter'; const clientVersion = require('../../package.json').version; @@ -619,7 +620,7 @@ export class Subchannel { * @param metadata * @param callStream */ - startCallStream(metadata: Metadata, callStream: Http2CallStream) { + startCallStream(metadata: Metadata, callStream: Http2CallStream, extraFilterFactory?: FilterFactory) { const headers = metadata.toHttp2Headers(); headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost(); headers[HTTP2_HEADER_USER_AGENT] = this.userAgent; @@ -633,7 +634,7 @@ export class Subchannel { headersString += '\t\t' + header + ': ' + headers[header] + '\n'; } trace('Starting stream with headers\n' + headersString); - callStream.attachHttp2Stream(http2Stream, this); + callStream.attachHttp2Stream(http2Stream, this, extraFilterFactory); } /**