From 8e5f5bc18a0f7d8fac4b85e33ddd49c0f5efa3ff Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 1 Feb 2021 14:18:24 -0800 Subject: [PATCH] grpc-js: Add ConfigSelector to Resolver API and plumb it through the channel --- packages/grpc-js-xds/src/resolver-xds.ts | 2 +- packages/grpc-js/src/channel.ts | 66 +++++++++++++++---- packages/grpc-js/src/picker.ts | 1 + packages/grpc-js/src/resolver-dns.ts | 4 +- packages/grpc-js/src/resolver-uds.ts | 1 + packages/grpc-js/src/resolver.ts | 20 +++++- .../grpc-js/src/resolving-load-balancer.ts | 38 ++++++++++- 7 files changed, 113 insertions(+), 19 deletions(-) diff --git a/packages/grpc-js-xds/src/resolver-xds.ts b/packages/grpc-js-xds/src/resolver-xds.ts index 814294c8..e1a0af11 100644 --- a/packages/grpc-js-xds/src/resolver-xds.ts +++ b/packages/grpc-js-xds/src/resolver-xds.ts @@ -60,7 +60,7 @@ class XdsResolver implements Resolver { onValidUpdate: (update: ServiceConfig) => { trace('Resolved service config for target ' + uriToString(this.target) + ': ' + JSON.stringify(update)); this.hasReportedSuccess = true; - this.listener.onSuccessfulResolution([], update, null, { + this.listener.onSuccessfulResolution([], update, null, null, { xdsClient: xdsClient, }); }, diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index e1a76c09..7c61aa1f 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -33,7 +33,7 @@ import { FilterStackFactory } from './filter-stack'; import { CallCredentialsFilterFactory } from './call-credentials-filter'; import { DeadlineFilterFactory } from './deadline-filter'; import { CompressionFilterFactory } from './compression-filter'; -import { getDefaultAuthority, mapUriDefaultScheme } from './resolver'; +import { CallConfig, ConfigSelector, getDefaultAuthority, mapUriDefaultScheme } from './resolver'; import { trace, log } from './logging'; import { SubchannelAddress } from './subchannel'; import { MaxMessageSizeFilterFactory } from './max-message-size-filter'; @@ -136,9 +136,18 @@ export class ChannelImplementation implements Channel { private subchannelPool: SubchannelPool; private connectivityState: ConnectivityState = ConnectivityState.IDLE; private currentPicker: Picker = new UnavailablePicker(); + /** + * Calls queued up to get a call config. Should only be populated before the + * first time the resolver returns a result, which includes the ConfigSelector. + */ + private configSelectionQueue: Array<{ + callStream: Http2CallStream; + callMetadata: Metadata; + }> = []; private pickQueue: Array<{ callStream: Http2CallStream; callMetadata: Metadata; + callConfig: CallConfig; }> = []; private connectivityStateWatchers: ConnectivityStateWatcher[] = []; private defaultAuthority: string; @@ -152,6 +161,7 @@ export class ChannelImplementation implements Channel { * is non-empty. */ private callRefTimer: NodeJS.Timer; + private configSelector: ConfigSelector | null = null; constructor( target: string, private readonly credentials: ChannelCredentials, @@ -227,8 +237,8 @@ export class ChannelImplementation implements Channel { const queueCopy = this.pickQueue.slice(); this.callRefTimer.unref?.(); this.pickQueue = []; - for (const { callStream, callMetadata } of queueCopy) { - this.tryPick(callStream, callMetadata); + for (const { callStream, callMetadata, callConfig } of queueCopy) { + this.tryPick(callStream, callMetadata, callConfig); } this.updateState(connectivityState); }, @@ -242,7 +252,17 @@ export class ChannelImplementation implements Channel { this.resolvingLoadBalancer = new ResolvingLoadBalancer( this.target, channelControlHelper, - options + options, + (configSelector) => { + this.configSelector = configSelector; + /* We process the queue asynchronously to ensure that the corresponding + * load balancer update has completed. */ + process.nextTick(() => { + for (const {callStream, callMetadata} of this.configSelectionQueue) { + this.tryGetConfig(callStream, callMetadata); + } + }); + } ); this.filterStackFactory = new FilterStackFactory([ new CallCredentialsFilterFactory(this), @@ -252,9 +272,9 @@ export class ChannelImplementation implements Channel { ]); } - private pushPick(callStream: Http2CallStream, callMetadata: Metadata) { + private pushPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig) { this.callRefTimer.ref?.(); - this.pickQueue.push({ callStream, callMetadata }); + this.pickQueue.push({ callStream, callMetadata, callConfig }); } /** @@ -264,8 +284,8 @@ export class ChannelImplementation implements Channel { * @param callStream * @param callMetadata */ - private tryPick(callStream: Http2CallStream, callMetadata: Metadata) { - const pickResult = this.currentPicker.pick({ metadata: callMetadata }); + private tryPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig) { + const pickResult = this.currentPicker.pick({ metadata: callMetadata, extraPickInfo: callConfig.pickInformation }); trace( LogVerbosity.DEBUG, 'channel', @@ -301,7 +321,7 @@ export class ChannelImplementation implements Channel { ' has state ' + ConnectivityState[pickResult.subchannel!.getConnectivityState()] ); - this.pushPick(callStream, callMetadata); + this.pushPick(callStream, callMetadata, callConfig); break; } /* We need to clone the callMetadata here because the transparent @@ -321,6 +341,7 @@ export class ChannelImplementation implements Channel { ); /* If we reach this point, the call stream has started * successfully */ + callConfig.onCommitted?.(); pickResult.onCallStarted?.(); } catch (error) { if ( @@ -349,7 +370,7 @@ export class ChannelImplementation implements Channel { (error as Error).message + '. Retrying pick' ); - this.tryPick(callStream, callMetadata); + this.tryPick(callStream, callMetadata, callConfig); } else { trace( LogVerbosity.INFO, @@ -378,7 +399,7 @@ export class ChannelImplementation implements Channel { ConnectivityState[subchannelState] + ' after metadata filters. Retrying pick' ); - this.tryPick(callStream, callMetadata); + this.tryPick(callStream, callMetadata, callConfig); } }, (error: Error & { code: number }) => { @@ -392,11 +413,11 @@ export class ChannelImplementation implements Channel { } break; case PickResultType.QUEUE: - this.pushPick(callStream, callMetadata); + this.pushPick(callStream, callMetadata, callConfig); break; case PickResultType.TRANSIENT_FAILURE: if (callMetadata.getOptions().waitForReady) { - this.pushPick(callStream, callMetadata); + this.pushPick(callStream, callMetadata, callConfig); } else { callStream.cancelWithStatus( pickResult.status!.code, @@ -451,8 +472,25 @@ export class ChannelImplementation implements Channel { } } + private tryGetConfig(stream: Http2CallStream, metadata: Metadata) { + if (this.configSelector === null) { + this.callRefTimer.ref?.(); + this.configSelectionQueue.push({ + callStream: stream, + callMetadata: metadata + }); + } else { + const callConfig = this.configSelector(stream.getMethod(), metadata); + if (callConfig.status === Status.OK) { + this.tryPick(stream, metadata, callConfig); + } else { + stream.cancelWithStatus(callConfig.status, "Failed to route call to method " + stream.getMethod()); + } + } + } + _startCallStream(stream: Http2CallStream, metadata: Metadata) { - this.tryPick(stream, metadata.clone()); + this.tryGetConfig(stream, metadata.clone()); } close() { diff --git a/packages/grpc-js/src/picker.ts b/packages/grpc-js/src/picker.ts index 184047b2..6df61b59 100644 --- a/packages/grpc-js/src/picker.ts +++ b/packages/grpc-js/src/picker.ts @@ -85,6 +85,7 @@ export interface DropCallPickResult extends PickResult { export interface PickArgs { metadata: Metadata; + extraPickInfo: {[key: string]: string}; } /** diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index 2db8a5e4..96d78f8a 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -129,7 +129,7 @@ class DnsResolver implements Resolver { if (this.ipResult !== null) { trace('Returning IP address for target ' + uriToString(this.target)); setImmediate(() => { - this.listener.onSuccessfulResolution(this.ipResult!, null, null, {}); + this.listener.onSuccessfulResolution(this.ipResult!, null, null, null, {}); }); return; } @@ -192,6 +192,7 @@ class DnsResolver implements Resolver { this.latestLookupResult, this.latestServiceConfig, this.latestServiceConfigError, + null, {} ); }, @@ -237,6 +238,7 @@ class DnsResolver implements Resolver { this.latestLookupResult, this.latestServiceConfig, this.latestServiceConfigError, + null, {} ); } diff --git a/packages/grpc-js/src/resolver-uds.ts b/packages/grpc-js/src/resolver-uds.ts index 14bc0176..e7667c78 100644 --- a/packages/grpc-js/src/resolver-uds.ts +++ b/packages/grpc-js/src/resolver-uds.ts @@ -40,6 +40,7 @@ class UdsResolver implements Resolver { this.addresses, null, null, + null, {} ); } diff --git a/packages/grpc-js/src/resolver.ts b/packages/grpc-js/src/resolver.ts index 57c750ae..e9176234 100644 --- a/packages/grpc-js/src/resolver.ts +++ b/packages/grpc-js/src/resolver.ts @@ -15,13 +15,30 @@ * */ -import { ServiceConfig } from './service-config'; +import { MethodConfig, ServiceConfig } from './service-config'; import * as resolver_dns from './resolver-dns'; import * as resolver_uds from './resolver-uds'; import { StatusObject } from './call-stream'; import { SubchannelAddress } from './subchannel'; import { GrpcUri, uriToString } from './uri-parser'; import { ChannelOptions } from './channel-options'; +import { Metadata } from './metadata'; +import { Status } from './constants'; + +export interface CallConfig { + methodConfig: MethodConfig; + onCommitted?: () => void; + pickInformation: {[key: string]: string}; + status: Status; +} + +/** + * Selects a configuration for a method given the name and metadata. Defined in + * https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md#new-functionality-in-grpc + */ +export interface ConfigSelector { + (methodName: string, metadata: Metadata): CallConfig; +} /** * A listener object passed to the resolver's constructor that provides name @@ -41,6 +58,7 @@ export interface ResolverListener { addressList: SubchannelAddress[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null, + configSelector: ConfigSelector | null, attributes: { [key: string]: unknown } ): void; /** diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index 2ce59d0c..039a17b0 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -23,7 +23,7 @@ import { } from './load-balancer'; import { ServiceConfig, validateServiceConfig } from './service-config'; import { ConnectivityState } from './channel'; -import { createResolver, Resolver } from './resolver'; +import { ConfigSelector, createResolver, Resolver } from './resolver'; import { ServiceError } from './call'; import { Picker, UnavailablePicker, QueuePicker } from './picker'; import { BackoffTimeout } from './backoff-timeout'; @@ -46,6 +46,36 @@ function trace(text: string): void { const DEFAULT_LOAD_BALANCER_NAME = 'pick_first'; +function getDefaultConfigSelector(serviceConfig: ServiceConfig | null): ConfigSelector { + return function defaultConfigSelector(methodName: string, metadata: Metadata) { + const splitName = methodName.split('/').filter(x => x.length > 0); + const service = splitName[0] ?? ''; + const method = splitName[1] ?? ''; + if (serviceConfig && serviceConfig.methodConfig) { + for (const methodConfig of serviceConfig.methodConfig) { + for (const name of methodConfig.name) { + if (name.service === service && (name.method === undefined || name.method === method)) { + return { + methodConfig: methodConfig, + pickInformation: {}, + status: Status.OK + }; + } + } + } + } + return { + methodConfig: {name: []}, + pickInformation: {}, + status: Status.OK + }; + } +} + +export interface ResolutionCallback { + (configSelector: ConfigSelector): void; +} + export class ResolvingLoadBalancer implements LoadBalancer { /** * The resolver class constructed for the target address. @@ -93,7 +123,8 @@ export class ResolvingLoadBalancer implements LoadBalancer { constructor( private readonly target: GrpcUri, private readonly channelControlHelper: ChannelControlHelper, - private readonly channelOptions: ChannelOptions + private readonly channelOptions: ChannelOptions, + private readonly onSuccessfulResolution: ResolutionCallback ) { if (channelOptions['grpc.service_config']) { this.defaultServiceConfig = validateServiceConfig( @@ -134,6 +165,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { addressList: SubchannelAddress[], serviceConfig: ServiceConfig | null, serviceConfigError: ServiceError | null, + configSelector: ConfigSelector | null, attributes: { [key: string]: unknown } ) => { let workingServiceConfig: ServiceConfig | null = null; @@ -180,6 +212,8 @@ export class ResolvingLoadBalancer implements LoadBalancer { loadBalancingConfig, attributes ); + const finalServiceConfig = workingServiceConfig ?? this.defaultServiceConfig; + this.onSuccessfulResolution(configSelector ?? getDefaultConfigSelector(finalServiceConfig)); }, onError: (error: StatusObject) => { this.handleResolutionFailure(error);