diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index fd015aff..6f614233 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -235,8 +235,8 @@ export class ChannelImplementation implements Channel { updateState: (connectivityState: ConnectivityState, picker: Picker) => { this.currentPicker = picker; const queueCopy = this.pickQueue.slice(); - this.callRefTimer.unref?.(); this.pickQueue = []; + this.callRefTimerUnref(); for (const { callStream, callMetadata, callConfig } of queueCopy) { this.tryPick(callStream, callMetadata, callConfig); } @@ -258,11 +258,30 @@ export class ChannelImplementation implements Channel { /* We process the queue asynchronously to ensure that the corresponding * load balancer update has completed. */ process.nextTick(() => { - for (const {callStream, callMetadata} of this.configSelectionQueue) { + const localQueue = this.configSelectionQueue; + this.configSelectionQueue = []; + this.callRefTimerUnref() + for (const {callStream, callMetadata} of localQueue) { this.tryGetConfig(callStream, callMetadata); } this.configSelectionQueue = []; }); + }, + (status) => { + if (this.configSelectionQueue.length > 0) { + trace(LogVerbosity.DEBUG, 'channel', 'Name resolution failed for target ' + uriToString(this.target) + ' with calls queued for config selection'); + } + const localQueue = this.configSelectionQueue; + this.configSelectionQueue = []; + this.callRefTimerUnref(); + for (const {callStream, callMetadata} of localQueue) { + if (callMetadata.getOptions().waitForReady) { + this.callRefTimerRef(); + this.configSelectionQueue.push({callStream, callMetadata}); + } else { + callStream.cancelWithStatus(status.code, status.details); + } + } } ); this.filterStackFactory = new FilterStackFactory([ @@ -273,9 +292,23 @@ export class ChannelImplementation implements Channel { ]); } + private callRefTimerRef() { + if (!this.callRefTimer.hasRef()) { + trace(LogVerbosity.DEBUG, 'channel', 'callRefTimer.ref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length); + this.callRefTimer.ref?.(); + } + } + + private callRefTimerUnref() { + if (this.callRefTimer.hasRef()) { + trace(LogVerbosity.DEBUG, 'channel', 'callRefTimer.unref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length); + this.callRefTimer.unref?.(); + } + } + private pushPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig) { - this.callRefTimer.ref?.(); this.pickQueue.push({ callStream, callMetadata, callConfig }); + this.callRefTimerRef(); } /** @@ -480,11 +513,11 @@ export class ChannelImplementation implements Channel { * ResolvingLoadBalancer may be idle and if so it needs to be kicked * because it now has a pending request. */ this.resolvingLoadBalancer.exitIdle(); - this.callRefTimer.ref?.(); this.configSelectionQueue.push({ callStream: stream, callMetadata: metadata }); + this.callRefTimerRef(); } else { const callConfig = this.configSelector(stream.getMethod(), metadata); if (callConfig.status === Status.OK) { diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index 7cf7e36d..84fe4ae1 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -76,6 +76,10 @@ export interface ResolutionCallback { (configSelector: ConfigSelector): void; } +export interface ResolutionFailureCallback { + (status: StatusObject): void; +} + export class ResolvingLoadBalancer implements LoadBalancer { /** * The resolver class constructed for the target address. @@ -124,7 +128,8 @@ export class ResolvingLoadBalancer implements LoadBalancer { private readonly target: GrpcUri, private readonly channelControlHelper: ChannelControlHelper, private readonly channelOptions: ChannelOptions, - private readonly onSuccessfulResolution: ResolutionCallback + private readonly onSuccessfulResolution: ResolutionCallback, + private readonly onFailedResolution: ResolutionFailureCallback ) { if (channelOptions['grpc.service_config']) { this.defaultServiceConfig = validateServiceConfig( @@ -261,6 +266,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker(error) ); + this.onFailedResolution(error); } this.backoffTimeout.runOnce(); }