grpc-js: Add ConfigSelector to Resolver API and plumb it through the channel

This commit is contained in:
Michael Lumish 2021-02-01 14:18:24 -08:00
parent 3a094f0171
commit 8e5f5bc18a
7 changed files with 113 additions and 19 deletions

View File

@ -60,7 +60,7 @@ class XdsResolver implements Resolver {
onValidUpdate: (update: ServiceConfig) => {
trace('Resolved service config for target ' + uriToString(this.target) + ': ' + JSON.stringify(update));
this.hasReportedSuccess = true;
this.listener.onSuccessfulResolution([], update, null, {
this.listener.onSuccessfulResolution([], update, null, null, {
xdsClient: xdsClient,
});
},

View File

@ -33,7 +33,7 @@ import { FilterStackFactory } from './filter-stack';
import { CallCredentialsFilterFactory } from './call-credentials-filter';
import { DeadlineFilterFactory } from './deadline-filter';
import { CompressionFilterFactory } from './compression-filter';
import { getDefaultAuthority, mapUriDefaultScheme } from './resolver';
import { CallConfig, ConfigSelector, getDefaultAuthority, mapUriDefaultScheme } from './resolver';
import { trace, log } from './logging';
import { SubchannelAddress } from './subchannel';
import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
@ -136,9 +136,18 @@ export class ChannelImplementation implements Channel {
private subchannelPool: SubchannelPool;
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
private currentPicker: Picker = new UnavailablePicker();
/**
* Calls queued up to get a call config. Should only be populated before the
* first time the resolver returns a result, which includes the ConfigSelector.
*/
private configSelectionQueue: Array<{
callStream: Http2CallStream;
callMetadata: Metadata;
}> = [];
private pickQueue: Array<{
callStream: Http2CallStream;
callMetadata: Metadata;
callConfig: CallConfig;
}> = [];
private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
private defaultAuthority: string;
@ -152,6 +161,7 @@ export class ChannelImplementation implements Channel {
* is non-empty.
*/
private callRefTimer: NodeJS.Timer;
private configSelector: ConfigSelector | null = null;
constructor(
target: string,
private readonly credentials: ChannelCredentials,
@ -227,8 +237,8 @@ export class ChannelImplementation implements Channel {
const queueCopy = this.pickQueue.slice();
this.callRefTimer.unref?.();
this.pickQueue = [];
for (const { callStream, callMetadata } of queueCopy) {
this.tryPick(callStream, callMetadata);
for (const { callStream, callMetadata, callConfig } of queueCopy) {
this.tryPick(callStream, callMetadata, callConfig);
}
this.updateState(connectivityState);
},
@ -242,7 +252,17 @@ export class ChannelImplementation implements Channel {
this.resolvingLoadBalancer = new ResolvingLoadBalancer(
this.target,
channelControlHelper,
options
options,
(configSelector) => {
this.configSelector = configSelector;
/* We process the queue asynchronously to ensure that the corresponding
* load balancer update has completed. */
process.nextTick(() => {
for (const {callStream, callMetadata} of this.configSelectionQueue) {
this.tryGetConfig(callStream, callMetadata);
}
});
}
);
this.filterStackFactory = new FilterStackFactory([
new CallCredentialsFilterFactory(this),
@ -252,9 +272,9 @@ export class ChannelImplementation implements Channel {
]);
}
private pushPick(callStream: Http2CallStream, callMetadata: Metadata) {
private pushPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig) {
this.callRefTimer.ref?.();
this.pickQueue.push({ callStream, callMetadata });
this.pickQueue.push({ callStream, callMetadata, callConfig });
}
/**
@ -264,8 +284,8 @@ export class ChannelImplementation implements Channel {
* @param callStream
* @param callMetadata
*/
private tryPick(callStream: Http2CallStream, callMetadata: Metadata) {
const pickResult = this.currentPicker.pick({ metadata: callMetadata });
private tryPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig) {
const pickResult = this.currentPicker.pick({ metadata: callMetadata, extraPickInfo: callConfig.pickInformation });
trace(
LogVerbosity.DEBUG,
'channel',
@ -301,7 +321,7 @@ export class ChannelImplementation implements Channel {
' has state ' +
ConnectivityState[pickResult.subchannel!.getConnectivityState()]
);
this.pushPick(callStream, callMetadata);
this.pushPick(callStream, callMetadata, callConfig);
break;
}
/* We need to clone the callMetadata here because the transparent
@ -321,6 +341,7 @@ export class ChannelImplementation implements Channel {
);
/* If we reach this point, the call stream has started
* successfully */
callConfig.onCommitted?.();
pickResult.onCallStarted?.();
} catch (error) {
if (
@ -349,7 +370,7 @@ export class ChannelImplementation implements Channel {
(error as Error).message +
'. Retrying pick'
);
this.tryPick(callStream, callMetadata);
this.tryPick(callStream, callMetadata, callConfig);
} else {
trace(
LogVerbosity.INFO,
@ -378,7 +399,7 @@ export class ChannelImplementation implements Channel {
ConnectivityState[subchannelState] +
' after metadata filters. Retrying pick'
);
this.tryPick(callStream, callMetadata);
this.tryPick(callStream, callMetadata, callConfig);
}
},
(error: Error & { code: number }) => {
@ -392,11 +413,11 @@ export class ChannelImplementation implements Channel {
}
break;
case PickResultType.QUEUE:
this.pushPick(callStream, callMetadata);
this.pushPick(callStream, callMetadata, callConfig);
break;
case PickResultType.TRANSIENT_FAILURE:
if (callMetadata.getOptions().waitForReady) {
this.pushPick(callStream, callMetadata);
this.pushPick(callStream, callMetadata, callConfig);
} else {
callStream.cancelWithStatus(
pickResult.status!.code,
@ -451,8 +472,25 @@ export class ChannelImplementation implements Channel {
}
}
private tryGetConfig(stream: Http2CallStream, metadata: Metadata) {
if (this.configSelector === null) {
this.callRefTimer.ref?.();
this.configSelectionQueue.push({
callStream: stream,
callMetadata: metadata
});
} else {
const callConfig = this.configSelector(stream.getMethod(), metadata);
if (callConfig.status === Status.OK) {
this.tryPick(stream, metadata, callConfig);
} else {
stream.cancelWithStatus(callConfig.status, "Failed to route call to method " + stream.getMethod());
}
}
}
_startCallStream(stream: Http2CallStream, metadata: Metadata) {
this.tryPick(stream, metadata.clone());
this.tryGetConfig(stream, metadata.clone());
}
close() {

View File

@ -85,6 +85,7 @@ export interface DropCallPickResult extends PickResult {
export interface PickArgs {
metadata: Metadata;
extraPickInfo: {[key: string]: string};
}
/**

View File

@ -129,7 +129,7 @@ class DnsResolver implements Resolver {
if (this.ipResult !== null) {
trace('Returning IP address for target ' + uriToString(this.target));
setImmediate(() => {
this.listener.onSuccessfulResolution(this.ipResult!, null, null, {});
this.listener.onSuccessfulResolution(this.ipResult!, null, null, null, {});
});
return;
}
@ -192,6 +192,7 @@ class DnsResolver implements Resolver {
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError,
null,
{}
);
},
@ -237,6 +238,7 @@ class DnsResolver implements Resolver {
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError,
null,
{}
);
}

View File

@ -40,6 +40,7 @@ class UdsResolver implements Resolver {
this.addresses,
null,
null,
null,
{}
);
}

View File

@ -15,13 +15,30 @@
*
*/
import { ServiceConfig } from './service-config';
import { MethodConfig, ServiceConfig } from './service-config';
import * as resolver_dns from './resolver-dns';
import * as resolver_uds from './resolver-uds';
import { StatusObject } from './call-stream';
import { SubchannelAddress } from './subchannel';
import { GrpcUri, uriToString } from './uri-parser';
import { ChannelOptions } from './channel-options';
import { Metadata } from './metadata';
import { Status } from './constants';
export interface CallConfig {
methodConfig: MethodConfig;
onCommitted?: () => void;
pickInformation: {[key: string]: string};
status: Status;
}
/**
* Selects a configuration for a method given the name and metadata. Defined in
* https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md#new-functionality-in-grpc
*/
export interface ConfigSelector {
(methodName: string, metadata: Metadata): CallConfig;
}
/**
* A listener object passed to the resolver's constructor that provides name
@ -41,6 +58,7 @@ export interface ResolverListener {
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null,
configSelector: ConfigSelector | null,
attributes: { [key: string]: unknown }
): void;
/**

View File

@ -23,7 +23,7 @@ import {
} from './load-balancer';
import { ServiceConfig, validateServiceConfig } from './service-config';
import { ConnectivityState } from './channel';
import { createResolver, Resolver } from './resolver';
import { ConfigSelector, createResolver, Resolver } from './resolver';
import { ServiceError } from './call';
import { Picker, UnavailablePicker, QueuePicker } from './picker';
import { BackoffTimeout } from './backoff-timeout';
@ -46,6 +46,36 @@ function trace(text: string): void {
const DEFAULT_LOAD_BALANCER_NAME = 'pick_first';
function getDefaultConfigSelector(serviceConfig: ServiceConfig | null): ConfigSelector {
return function defaultConfigSelector(methodName: string, metadata: Metadata) {
const splitName = methodName.split('/').filter(x => x.length > 0);
const service = splitName[0] ?? '';
const method = splitName[1] ?? '';
if (serviceConfig && serviceConfig.methodConfig) {
for (const methodConfig of serviceConfig.methodConfig) {
for (const name of methodConfig.name) {
if (name.service === service && (name.method === undefined || name.method === method)) {
return {
methodConfig: methodConfig,
pickInformation: {},
status: Status.OK
};
}
}
}
}
return {
methodConfig: {name: []},
pickInformation: {},
status: Status.OK
};
}
}
export interface ResolutionCallback {
(configSelector: ConfigSelector): void;
}
export class ResolvingLoadBalancer implements LoadBalancer {
/**
* The resolver class constructed for the target address.
@ -93,7 +123,8 @@ export class ResolvingLoadBalancer implements LoadBalancer {
constructor(
private readonly target: GrpcUri,
private readonly channelControlHelper: ChannelControlHelper,
private readonly channelOptions: ChannelOptions
private readonly channelOptions: ChannelOptions,
private readonly onSuccessfulResolution: ResolutionCallback
) {
if (channelOptions['grpc.service_config']) {
this.defaultServiceConfig = validateServiceConfig(
@ -134,6 +165,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: ServiceError | null,
configSelector: ConfigSelector | null,
attributes: { [key: string]: unknown }
) => {
let workingServiceConfig: ServiceConfig | null = null;
@ -180,6 +212,8 @@ export class ResolvingLoadBalancer implements LoadBalancer {
loadBalancingConfig,
attributes
);
const finalServiceConfig = workingServiceConfig ?? this.defaultServiceConfig;
this.onSuccessfulResolution(configSelector ?? getDefaultConfigSelector(finalServiceConfig));
},
onError: (error: StatusObject) => {
this.handleResolutionFailure(error);