Merge pull request #1397 from murgatroid99/grpc-js_pre_xds_internal_changes

grpc-js: Some internal changes in preparation for implementing xDS Global LB
This commit is contained in:
Michael Lumish 2020-04-27 14:52:42 -07:00 committed by GitHub
commit 440d985f1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 70 additions and 24 deletions

View File

@ -19,8 +19,8 @@ import * as http2 from 'http2';
import { CallCredentials } from './call-credentials';
import { Status } from './constants';
import { Filter } from './filter';
import { FilterStackFactory } from './filter-stack';
import { Filter, FilterFactory } from './filter';
import { FilterStackFactory, FilterStack } from './filter-stack';
import { Metadata } from './metadata';
import { StreamDecoder } from './stream-decoder';
import { ChannelImplementation } from './channel';
@ -407,8 +407,15 @@ export class Http2CallStream implements Call {
attachHttp2Stream(
stream: http2.ClientHttp2Stream,
subchannel: Subchannel
subchannel: Subchannel,
extraFilterFactory?: FilterFactory<Filter>
): void {
if (extraFilterFactory !== undefined) {
this.filterStack = new FilterStack([
this.filterStack,
extraFilterFactory.createFilter(this),
]);
}
if (this.finalStatus !== null) {
stream.close(NGHTTP2_CANCEL);
} else {

View File

@ -174,7 +174,9 @@ export class ChannelImplementation implements Channel {
* resolver */
const defaultSchemeMapResult = mapUriDefaultScheme(originalTargetUri);
if (defaultSchemeMapResult === null) {
throw new Error(`Could not find a default scheme for target name "${target}"`);
throw new Error(
`Could not find a default scheme for target name "${target}"`
);
}
if (this.options['grpc.default_authority']) {
this.defaultAuthority = this.options['grpc.default_authority'] as string;
@ -300,8 +302,12 @@ export class ChannelImplementation implements Channel {
try {
pickResult.subchannel!.startCallStream(
finalMetadata,
callStream
callStream,
pickResult.extraFilterFactory ?? undefined
);
/* If we reach this point, the call stream has started
* successfully */
pickResult.onCallStarted?.();
} catch (error) {
if (
(error as NodeJS.ErrnoException).code ===

View File

@ -65,6 +65,8 @@ class PickFirstPicker implements Picker {
pickResultType: PickResultType.COMPLETE,
subchannel: this.subchannel,
status: null,
extraFilterFactory: null,
onCallStarted: null,
};
}
}

View File

@ -60,6 +60,8 @@ class RoundRobinPicker implements Picker {
pickResultType: PickResultType.COMPLETE,
subchannel: pickedSubchannel,
status: null,
extraFilterFactory: null,
onCallStarted: null,
};
}

View File

@ -67,7 +67,8 @@ export interface LoadBalancer {
*/
updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig | null
lbConfig: LoadBalancingConfig | null,
attributes: { [key: string]: unknown }
): void;
/**
* If the load balancer is currently in the IDLE state, start connecting.

View File

@ -20,6 +20,7 @@ import { StatusObject } from './call-stream';
import { Metadata } from './metadata';
import { Status } from './constants';
import { LoadBalancer } from './load-balancer';
import { FilterFactory, Filter } from './filter';
export enum PickResultType {
COMPLETE,
@ -40,24 +41,37 @@ export interface PickResult {
* `pickResultType` is TRANSIENT_FAILURE.
*/
status: StatusObject | null;
/**
* Extra FilterFactory (can be multiple encapsulated in a FilterStackFactory)
* provided by the load balancer to be used with the call. For technical
* reasons filters from this factory will not see sendMetadata events.
*/
extraFilterFactory: FilterFactory<Filter> | null;
onCallStarted: (() => void) | null;
}
export interface CompletePickResult extends PickResult {
pickResultType: PickResultType.COMPLETE;
subchannel: Subchannel | null;
status: null;
extraFilterFactory: FilterFactory<Filter> | null;
onCallStarted: (() => void) | null;
}
export interface QueuePickResult extends PickResult {
pickResultType: PickResultType.QUEUE;
subchannel: null;
status: null;
extraFilterFactory: null;
onCallStarted: null;
}
export interface TransientFailurePickResult extends PickResult {
pickResultType: PickResultType.TRANSIENT_FAILURE;
subchannel: null;
status: StatusObject;
extraFilterFactory: null;
onCallStarted: null;
}
export interface PickArgs {
@ -95,6 +109,8 @@ export class UnavailablePicker implements Picker {
pickResultType: PickResultType.TRANSIENT_FAILURE,
subchannel: null,
status: this.status,
extraFilterFactory: null,
onCallStarted: null,
};
}
}
@ -122,6 +138,8 @@ export class QueuePicker {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null,
extraFilterFactory: null,
onCallStarted: null,
};
}
}

View File

@ -124,7 +124,7 @@ class DnsResolver implements Resolver {
if (this.ipResult !== null) {
trace('Returning IP address for target ' + uriToString(this.target));
setImmediate(() => {
this.listener.onSuccessfulResolution(this.ipResult!, null, null);
this.listener.onSuccessfulResolution(this.ipResult!, null, null, {});
});
return;
}
@ -186,7 +186,8 @@ class DnsResolver implements Resolver {
this.listener.onSuccessfulResolution(
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError
this.latestServiceConfigError,
{}
);
},
(err) => {
@ -230,7 +231,8 @@ class DnsResolver implements Resolver {
this.listener.onSuccessfulResolution(
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError
this.latestServiceConfigError,
{}
);
}
},
@ -244,7 +246,8 @@ class DnsResolver implements Resolver {
this.listener.onSuccessfulResolution(
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError
this.latestServiceConfigError,
{}
);
}
}

View File

@ -14,11 +14,7 @@
* limitations under the License.
*/
import {
Resolver,
ResolverListener,
registerResolver,
} from './resolver';
import { Resolver, ResolverListener, registerResolver } from './resolver';
import { SubchannelAddress } from './subchannel';
import { GrpcUri } from './uri-parser';
@ -38,7 +34,8 @@ class UdsResolver implements Resolver {
this.listener.onSuccessfulResolution,
this.addresses,
null,
null
null,
{}
);
}

View File

@ -39,7 +39,8 @@ export interface ResolverListener {
onSuccessfulResolution(
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
serviceConfigError: StatusObject | null,
attributes: { [key: string]: unknown }
): void;
/**
* Called whenever a name resolution attempt fails.
@ -137,7 +138,7 @@ export function mapUriDefaultScheme(target: GrpcUri): GrpcUri | null {
return {
scheme: defaultScheme,
authority: undefined,
path: uriToString(target)
path: uriToString(target),
};
} else {
return null;

View File

@ -136,7 +136,8 @@ export class ResolvingLoadBalancer implements LoadBalancer {
onSuccessfulResolution: (
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: ServiceError | null
serviceConfigError: ServiceError | null,
attributes: { [key: string]: unknown }
) => {
let workingServiceConfig: ServiceConfig | null = null;
/* This first group of conditionals implements the algorithm described
@ -211,12 +212,14 @@ export class ResolvingLoadBalancer implements LoadBalancer {
)!;
this.innerLoadBalancer.updateAddressList(
addressList,
loadBalancingConfig
loadBalancingConfig,
attributes
);
} else if (this.innerLoadBalancer.getTypeName() === loadBalancerName) {
this.innerLoadBalancer.updateAddressList(
addressList,
loadBalancingConfig
loadBalancingConfig,
attributes
);
} else {
if (
@ -234,7 +237,8 @@ export class ResolvingLoadBalancer implements LoadBalancer {
}
this.pendingReplacementLoadBalancer.updateAddressList(
addressList,
loadBalancingConfig
loadBalancingConfig,
attributes
);
}
},

View File

@ -30,6 +30,7 @@ import { getProxiedConnection, ProxyConnectionResult } from './http_proxy';
import * as net from 'net';
import { GrpcUri } from './uri-parser';
import { ConnectionOptions } from 'tls';
import { FilterFactory, Filter } from './filter';
const clientVersion = require('../../package.json').version;
@ -619,7 +620,11 @@ export class Subchannel {
* @param metadata
* @param callStream
*/
startCallStream(metadata: Metadata, callStream: Http2CallStream) {
startCallStream(
metadata: Metadata,
callStream: Http2CallStream,
extraFilterFactory?: FilterFactory<Filter>
) {
const headers = metadata.toHttp2Headers();
headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost();
headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;
@ -633,7 +638,7 @@ export class Subchannel {
headersString += '\t\t' + header + ': ' + headers[header] + '\n';
}
trace('Starting stream with headers\n' + headersString);
callStream.attachHttp2Stream(http2Stream, this);
callStream.attachHttp2Stream(http2Stream, this, extraFilterFactory);
}
/**