Plumb through an extra filter from the load balancer to the call stream

This commit is contained in:
Michael Lumish 2020-04-24 14:00:54 -07:00
parent 08dd114951
commit 3d4a27e6cc
4 changed files with 24 additions and 6 deletions

View File

@ -19,8 +19,8 @@ import * as http2 from 'http2';
import { CallCredentials } from './call-credentials';
import { Status } from './constants';
import { Filter } from './filter';
import { FilterStackFactory } from './filter-stack';
import { Filter, FilterFactory } from './filter';
import { FilterStackFactory, FilterStack } from './filter-stack';
import { Metadata } from './metadata';
import { StreamDecoder } from './stream-decoder';
import { ChannelImplementation } from './channel';
@ -407,8 +407,12 @@ export class Http2CallStream implements Call {
attachHttp2Stream(
stream: http2.ClientHttp2Stream,
subchannel: Subchannel
subchannel: Subchannel,
extraFilterFactory?: FilterFactory<Filter>
): void {
if (extraFilterFactory !== undefined) {
this.filterStack = new FilterStack([this.filterStack, extraFilterFactory.createFilter(this)]);
}
if (this.finalStatus !== null) {
stream.close(NGHTTP2_CANCEL);
} else {

View File

@ -300,7 +300,8 @@ export class ChannelImplementation implements Channel {
try {
pickResult.subchannel!.startCallStream(
finalMetadata,
callStream
callStream,
pickResult.extraFilterFactory ?? undefined
);
} catch (error) {
if (

View File

@ -20,6 +20,7 @@ import { StatusObject } from './call-stream';
import { Metadata } from './metadata';
import { Status } from './constants';
import { LoadBalancer } from './load-balancer';
import { FilterFactory, Filter } from './filter';
export enum PickResultType {
COMPLETE,
@ -40,24 +41,33 @@ export interface PickResult {
* `pickResultType` is TRANSIENT_FAILURE.
*/
status: StatusObject | null;
/**
* Extra FilterFactory (can be multiple encapsulated in a FilterStackFactory)
* provided by the load balancer to be used with the call. For technical
* reasons filters from this factory will not see sendMetadata events.
*/
extraFilterFactory: FilterFactory<Filter> | null;
}
export interface CompletePickResult extends PickResult {
pickResultType: PickResultType.COMPLETE;
subchannel: Subchannel | null;
status: null;
extraFilterFactory: FilterFactory<Filter> | null;
}
export interface QueuePickResult extends PickResult {
pickResultType: PickResultType.QUEUE;
subchannel: null;
status: null;
extraFilterFactory: null;
}
export interface TransientFailurePickResult extends PickResult {
pickResultType: PickResultType.TRANSIENT_FAILURE;
subchannel: null;
status: StatusObject;
extraFilterFactory: null;
}
export interface PickArgs {
@ -95,6 +105,7 @@ export class UnavailablePicker implements Picker {
pickResultType: PickResultType.TRANSIENT_FAILURE,
subchannel: null,
status: this.status,
extraFilterFactory: null
};
}
}
@ -122,6 +133,7 @@ export class QueuePicker {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null,
extraFilterFactory: null
};
}
}

View File

@ -30,6 +30,7 @@ import { getProxiedConnection, ProxyConnectionResult } from './http_proxy';
import * as net from 'net';
import { GrpcUri } from './uri-parser';
import { ConnectionOptions } from 'tls';
import { FilterFactory, Filter } from './filter';
const clientVersion = require('../../package.json').version;
@ -619,7 +620,7 @@ export class Subchannel {
* @param metadata
* @param callStream
*/
startCallStream(metadata: Metadata, callStream: Http2CallStream) {
startCallStream(metadata: Metadata, callStream: Http2CallStream, extraFilterFactory?: FilterFactory<Filter>) {
const headers = metadata.toHttp2Headers();
headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost();
headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;
@ -633,7 +634,7 @@ export class Subchannel {
headersString += '\t\t' + header + ': ' + headers[header] + '\n';
}
trace('Starting stream with headers\n' + headersString);
callStream.attachHttp2Stream(http2Stream, this);
callStream.attachHttp2Stream(http2Stream, this, extraFilterFactory);
}
/**