diff --git a/packages/grpc-js-xds/interop/xds-interop-client.ts b/packages/grpc-js-xds/interop/xds-interop-client.ts index f6beccbf..a414ffe7 100644 --- a/packages/grpc-js-xds/interop/xds-interop-client.ts +++ b/packages/grpc-js-xds/interop/xds-interop-client.ts @@ -196,6 +196,18 @@ const currentConfig: ClientConfiguration = { let anyCallSucceeded = false; const accumulatedStats: LoadBalancerAccumulatedStatsResponse = { + num_rpcs_started_by_method: { + EMPTY_CALL: 0, + UNARY_CALL: 0 + }, + num_rpcs_succeeded_by_method: { + EMPTY_CALL: 0, + UNARY_CALL: 0 + }, + num_rpcs_failed_by_method: { + EMPTY_CALL: 0, + UNARY_CALL: 0 + }, stats_per_method: { EMPTY_CALL: { rpcs_started: 0, @@ -208,14 +220,28 @@ const accumulatedStats: LoadBalancerAccumulatedStatsResponse = { } }; +function addAccumulatedCallStarted(callName: string) { + accumulatedStats.stats_per_method![callName].rpcs_started! += 1; + accumulatedStats.num_rpcs_started_by_method![callName] += 1; +} + +function addAccumulatedCallEnded(callName: string, result: grpc.status) { + accumulatedStats.stats_per_method![callName].result![result] = (accumulatedStats.stats_per_method![callName].result![result] ?? 0) + 1; + if (result === grpc.status.OK) { + accumulatedStats.num_rpcs_succeeded_by_method![callName] += 1; + } else { + accumulatedStats.num_rpcs_failed_by_method![callName] += 1; + } +} + const callTimeHistogram: {[callType: string]: {[status: number]: number[]}} = { UnaryCall: {}, EmptyCall: {} } function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker) { - const callTypeStats = accumulatedStats.stats_per_method![callTypeEnumMapReverse[type]]; - callTypeStats.rpcs_started! += 1; + const callEnumName = callTypeEnumMapReverse[type]; + addAccumulatedCallStarted(callEnumName); const notifier = callStatsTracker.startCall(); let gotMetadata: boolean = false; let hostname: string | null = null; @@ -235,7 +261,7 @@ function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFail } else { callTimeHistogram[type][statusCode][duration[0]] = 1; } - callTypeStats.result![statusCode] = (callTypeStats.result![statusCode] ?? 0) + 1; + addAccumulatedCallEnded(callEnumName, statusCode); if (error) { if (failOnFailedRpcs && anyCallSucceeded) { console.error('A call failed after a call succeeded'); diff --git a/packages/grpc-js-xds/src/load-balancer-cds.ts b/packages/grpc-js-xds/src/load-balancer-cds.ts index 6c57a410..4829edd4 100644 --- a/packages/grpc-js-xds/src/load-balancer-cds.ts +++ b/packages/grpc-js-xds/src/load-balancer-cds.ts @@ -80,11 +80,17 @@ export class CdsLoadBalancer implements LoadBalancer { this.watcher = { onValidUpdate: (update) => { this.latestCdsUpdate = update; + let maxConcurrentRequests: number | undefined = undefined; + for (const threshold of update.circuit_breakers?.thresholds ?? []) { + if (threshold.priority === 'DEFAULT') { + maxConcurrentRequests = threshold.max_requests?.value; + } + } /* the lrs_server.self field indicates that the same server should be * used for load reporting as for other xDS operations. Setting * lrsLoadReportingServerName to the empty string sets that behavior. * Otherwise, if the field is omitted, load reporting is disabled. */ - const edsConfig: EdsLoadBalancingConfig = new EdsLoadBalancingConfig(update.name, [], [], update.eds_cluster_config!.service_name === '' ? undefined : update.eds_cluster_config!.service_name, update.lrs_server?.self ? '' : undefined); + const edsConfig: EdsLoadBalancingConfig = new EdsLoadBalancingConfig(update.name, [], [], update.eds_cluster_config!.service_name === '' ? undefined : update.eds_cluster_config!.service_name, update.lrs_server?.self ? '' : undefined, maxConcurrentRequests); trace('Child update EDS config: ' + JSON.stringify(edsConfig)); this.childBalancer.updateAddressList( [], diff --git a/packages/grpc-js-xds/src/load-balancer-eds.ts b/packages/grpc-js-xds/src/load-balancer-eds.ts index 76bdd665..183e0d67 100644 --- a/packages/grpc-js-xds/src/load-balancer-eds.ts +++ b/packages/grpc-js-xds/src/load-balancer-eds.ts @@ -37,6 +37,7 @@ import { Watcher } from './xds-stream-state/xds-stream-state'; import Filter = experimental.Filter; import BaseFilter = experimental.BaseFilter; import FilterFactory = experimental.FilterFactory; +import FilterStackFactory = experimental.FilterStackFactory; import CallStream = experimental.CallStream; const TRACER_NAME = 'eds_balancer'; @@ -204,27 +205,42 @@ export class EdsLoadBalancer implements LoadBalancer { * Otherwise, delegate picking the subchannel to the child * balancer. */ if (dropCategory === null) { - return originalPicker.pick(pickArgs); + const originalPick = originalPicker.pick(pickArgs); + let extraFilterFactory: FilterFactory = new CallTrackingFilterFactory(() => { + this.concurrentRequests -= 1; + }); + if (originalPick.extraFilterFactory) { + extraFilterFactory = new FilterStackFactory([originalPick.extraFilterFactory, extraFilterFactory]); + } + return { + pickResultType: originalPick.pickResultType, + status: originalPick.status, + subchannel: originalPick.subchannel, + onCallStarted: () => { + originalPick.onCallStarted?.(); + this.concurrentRequests += 1; + }, + extraFilterFactory: extraFilterFactory + }; } else { + let details: string; if (dropCategory === true) { + details = 'Call dropped by load balancing policy.'; this.clusterDropStats?.addUncategorizedCallDropped(); } else { + details = `Call dropped by load balancing policy. Category: ${dropCategory}`; this.clusterDropStats?.addCallDropped(dropCategory); } return { pickResultType: PickResultType.DROP, status: { code: Status.UNAVAILABLE, - details: `Call dropped by load balancing policy. Category: ${dropCategory}`, + details: details, metadata: new Metadata(), }, subchannel: null, - extraFilterFactory: new CallTrackingFilterFactory(() => { - this.concurrentRequests -= 1; - }), - onCallStarted: () => { - this.concurrentRequests += 1; - }, + extraFilterFactory: null, + onCallStarted: null }; } }, @@ -265,15 +281,12 @@ export class EdsLoadBalancer implements LoadBalancer { * output, as a sentinel value indicating a drop with no category. */ private checkForDrop(): string | true | null { + if (this.lastestConfig && this.concurrentRequests >= this.lastestConfig.getMaxConcurrentRequests()) { + return true; + } if (!this.latestEdsUpdate?.policy) { return null; } - if (!this.lastestConfig) { - return null; - } - if (this.concurrentRequests >= this.lastestConfig.getMaxConcurrentRequests()) { - return true; - } /* The drop_overloads policy is a list of pairs of category names and * probabilities. For each one, if the random number is within that * probability range, we drop the call citing that category. Otherwise, the