diff --git a/packages/grpc-js-xds/interop/xds-interop-client.ts b/packages/grpc-js-xds/interop/xds-interop-client.ts index f6beccbf..a414ffe7 100644 --- a/packages/grpc-js-xds/interop/xds-interop-client.ts +++ b/packages/grpc-js-xds/interop/xds-interop-client.ts @@ -196,6 +196,18 @@ const currentConfig: ClientConfiguration = { let anyCallSucceeded = false; const accumulatedStats: LoadBalancerAccumulatedStatsResponse = { + num_rpcs_started_by_method: { + EMPTY_CALL: 0, + UNARY_CALL: 0 + }, + num_rpcs_succeeded_by_method: { + EMPTY_CALL: 0, + UNARY_CALL: 0 + }, + num_rpcs_failed_by_method: { + EMPTY_CALL: 0, + UNARY_CALL: 0 + }, stats_per_method: { EMPTY_CALL: { rpcs_started: 0, @@ -208,14 +220,28 @@ const accumulatedStats: LoadBalancerAccumulatedStatsResponse = { } }; +function addAccumulatedCallStarted(callName: string) { + accumulatedStats.stats_per_method![callName].rpcs_started! += 1; + accumulatedStats.num_rpcs_started_by_method![callName] += 1; +} + +function addAccumulatedCallEnded(callName: string, result: grpc.status) { + accumulatedStats.stats_per_method![callName].result![result] = (accumulatedStats.stats_per_method![callName].result![result] ?? 0) + 1; + if (result === grpc.status.OK) { + accumulatedStats.num_rpcs_succeeded_by_method![callName] += 1; + } else { + accumulatedStats.num_rpcs_failed_by_method![callName] += 1; + } +} + const callTimeHistogram: {[callType: string]: {[status: number]: number[]}} = { UnaryCall: {}, EmptyCall: {} } function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker) { - const callTypeStats = accumulatedStats.stats_per_method![callTypeEnumMapReverse[type]]; - callTypeStats.rpcs_started! += 1; + const callEnumName = callTypeEnumMapReverse[type]; + addAccumulatedCallStarted(callEnumName); const notifier = callStatsTracker.startCall(); let gotMetadata: boolean = false; let hostname: string | null = null; @@ -235,7 +261,7 @@ function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFail } else { callTimeHistogram[type][statusCode][duration[0]] = 1; } - callTypeStats.result![statusCode] = (callTypeStats.result![statusCode] ?? 0) + 1; + addAccumulatedCallEnded(callEnumName, statusCode); if (error) { if (failOnFailedRpcs && anyCallSucceeded) { console.error('A call failed after a call succeeded'); diff --git a/packages/grpc-js-xds/scripts/xds.sh b/packages/grpc-js-xds/scripts/xds.sh index 68225fae..66fe4d56 100755 --- a/packages/grpc-js-xds/scripts/xds.sh +++ b/packages/grpc-js-xds/scripts/xds.sh @@ -52,7 +52,7 @@ GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weigh GRPC_NODE_VERBOSITY=DEBUG \ NODE_XDS_INTEROP_VERBOSITY=1 \ python3 grpc/tools/run_tests/run_xds_tests.py \ - --test_case="all,timeout" \ + --test_case="all,timeout,circuit_breaking" \ --project_id=grpc-testing \ --source_image=projects/grpc-testing/global/images/xds-test-server-4 \ --path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \ diff --git a/packages/grpc-js-xds/src/load-balancer-cds.ts b/packages/grpc-js-xds/src/load-balancer-cds.ts index 6c57a410..4829edd4 100644 --- a/packages/grpc-js-xds/src/load-balancer-cds.ts +++ b/packages/grpc-js-xds/src/load-balancer-cds.ts @@ -80,11 +80,17 @@ export class CdsLoadBalancer implements LoadBalancer { this.watcher = { onValidUpdate: (update) => { this.latestCdsUpdate = update; + let maxConcurrentRequests: number | undefined = undefined; + for (const threshold of update.circuit_breakers?.thresholds ?? []) { + if (threshold.priority === 'DEFAULT') { + maxConcurrentRequests = threshold.max_requests?.value; + } + } /* the lrs_server.self field indicates that the same server should be * used for load reporting as for other xDS operations. Setting * lrsLoadReportingServerName to the empty string sets that behavior. * Otherwise, if the field is omitted, load reporting is disabled. */ - const edsConfig: EdsLoadBalancingConfig = new EdsLoadBalancingConfig(update.name, [], [], update.eds_cluster_config!.service_name === '' ? undefined : update.eds_cluster_config!.service_name, update.lrs_server?.self ? '' : undefined); + const edsConfig: EdsLoadBalancingConfig = new EdsLoadBalancingConfig(update.name, [], [], update.eds_cluster_config!.service_name === '' ? undefined : update.eds_cluster_config!.service_name, update.lrs_server?.self ? '' : undefined, maxConcurrentRequests); trace('Child update EDS config: ' + JSON.stringify(edsConfig)); this.childBalancer.updateAddressList( [], diff --git a/packages/grpc-js-xds/src/load-balancer-eds.ts b/packages/grpc-js-xds/src/load-balancer-eds.ts index ae9a781b..183e0d67 100644 --- a/packages/grpc-js-xds/src/load-balancer-eds.ts +++ b/packages/grpc-js-xds/src/load-balancer-eds.ts @@ -15,7 +15,7 @@ * */ -import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity as LogVerbosity, experimental } from '@grpc/grpc-js'; +import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity as LogVerbosity, experimental, StatusObject } from '@grpc/grpc-js'; import { getSingletonXdsClient, XdsClient, XdsClusterDropStats } from './xds-client'; import { ClusterLoadAssignment__Output } from './generated/envoy/config/endpoint/v3/ClusterLoadAssignment'; import { Locality__Output } from './generated/envoy/api/v2/core/Locality'; @@ -34,6 +34,11 @@ import { validateLoadBalancingConfig } from '@grpc/grpc-js/build/src/experimenta import { WeightedTarget, WeightedTargetLoadBalancingConfig } from './load-balancer-weighted-target'; import { LrsLoadBalancingConfig } from './load-balancer-lrs'; import { Watcher } from './xds-stream-state/xds-stream-state'; +import Filter = experimental.Filter; +import BaseFilter = experimental.BaseFilter; +import FilterFactory = experimental.FilterFactory; +import FilterStackFactory = experimental.FilterStackFactory; +import CallStream = experimental.CallStream; const TRACER_NAME = 'eds_balancer'; @@ -47,7 +52,10 @@ function localityToName(locality: Locality__Output) { return `{region=${locality.region},zone=${locality.zone},sub_zone=${locality.sub_zone}}`; } +const DEFAULT_MAX_CONCURRENT_REQUESTS = 1024; + export class EdsLoadBalancingConfig implements LoadBalancingConfig { + private maxConcurrentRequests: number; getLoadBalancerName(): string { return TYPE_NAME; } @@ -55,7 +63,8 @@ export class EdsLoadBalancingConfig implements LoadBalancingConfig { const jsonObj: {[key: string]: any} = { cluster: this.cluster, locality_picking_policy: this.localityPickingPolicy.map(policy => policy.toJsonObject()), - endpoint_picking_policy: this.endpointPickingPolicy.map(policy => policy.toJsonObject()) + endpoint_picking_policy: this.endpointPickingPolicy.map(policy => policy.toJsonObject()), + max_concurrent_requests: this.maxConcurrentRequests }; if (this.edsServiceName !== undefined) { jsonObj.eds_service_name = this.edsServiceName; @@ -68,8 +77,8 @@ export class EdsLoadBalancingConfig implements LoadBalancingConfig { }; } - constructor(private cluster: string, private localityPickingPolicy: LoadBalancingConfig[], private endpointPickingPolicy: LoadBalancingConfig[], private edsServiceName?: string, private lrsLoadReportingServerName?: string) { - + constructor(private cluster: string, private localityPickingPolicy: LoadBalancingConfig[], private endpointPickingPolicy: LoadBalancingConfig[], private edsServiceName?: string, private lrsLoadReportingServerName?: string, maxConcurrentRequests?: number) { + this.maxConcurrentRequests = maxConcurrentRequests ?? DEFAULT_MAX_CONCURRENT_REQUESTS; } getCluster() { @@ -92,6 +101,10 @@ export class EdsLoadBalancingConfig implements LoadBalancingConfig { return this.lrsLoadReportingServerName; } + getMaxConcurrentRequests() { + return this.maxConcurrentRequests; + } + static createFromJson(obj: any): EdsLoadBalancingConfig { if (!('cluster' in obj && typeof obj.cluster === 'string')) { throw new Error('eds config must have a string field cluster'); @@ -108,7 +121,28 @@ export class EdsLoadBalancingConfig implements LoadBalancingConfig { if ('lrs_load_reporting_server_name' in obj && (!obj.lrs_load_reporting_server_name === undefined || typeof obj.lrs_load_reporting_server_name === 'string')) { throw new Error('eds config lrs_load_reporting_server_name must be a string if provided'); } - return new EdsLoadBalancingConfig(obj.cluster, obj.locality_picking_policy.map(validateLoadBalancingConfig), obj.endpoint_picking_policy.map(validateLoadBalancingConfig), obj.eds_service_name, obj.lrs_load_reporting_server_name); + if ('max_concurrent_requests' in obj && (!obj.max_concurrent_requests === undefined || typeof obj.max_concurrent_requests === 'number')) { + throw new Error('eds config max_concurrent_requests must be a number if provided'); + } + return new EdsLoadBalancingConfig(obj.cluster, obj.locality_picking_policy.map(validateLoadBalancingConfig), obj.endpoint_picking_policy.map(validateLoadBalancingConfig), obj.eds_service_name, obj.lrs_load_reporting_server_name, obj.max_concurrent_requests); + } +} + +class CallEndTrackingFilter extends BaseFilter implements Filter { + constructor(private onCallEnd: () => void) { + super(); + } + receiveTrailers(status: StatusObject) { + this.onCallEnd(); + return status; + } +} + +class CallTrackingFilterFactory implements FilterFactory { + constructor(private onCallEnd: () => void) {} + + createFilter(callStream: CallStream) { + return new CallEndTrackingFilter(this.onCallEnd); } } @@ -149,6 +183,8 @@ export class EdsLoadBalancer implements LoadBalancer { private clusterDropStats: XdsClusterDropStats | null = null; + private concurrentRequests: number = 0; + constructor(private readonly channelControlHelper: ChannelControlHelper) { this.childBalancer = new ChildLoadBalancerHandler({ createSubchannel: (subchannelAddress, subchannelArgs) => @@ -169,19 +205,42 @@ export class EdsLoadBalancer implements LoadBalancer { * Otherwise, delegate picking the subchannel to the child * balancer. */ if (dropCategory === null) { - return originalPicker.pick(pickArgs); + const originalPick = originalPicker.pick(pickArgs); + let extraFilterFactory: FilterFactory = new CallTrackingFilterFactory(() => { + this.concurrentRequests -= 1; + }); + if (originalPick.extraFilterFactory) { + extraFilterFactory = new FilterStackFactory([originalPick.extraFilterFactory, extraFilterFactory]); + } + return { + pickResultType: originalPick.pickResultType, + status: originalPick.status, + subchannel: originalPick.subchannel, + onCallStarted: () => { + originalPick.onCallStarted?.(); + this.concurrentRequests += 1; + }, + extraFilterFactory: extraFilterFactory + }; } else { - this.clusterDropStats?.addCallDropped(dropCategory); + let details: string; + if (dropCategory === true) { + details = 'Call dropped by load balancing policy.'; + this.clusterDropStats?.addUncategorizedCallDropped(); + } else { + details = `Call dropped by load balancing policy. Category: ${dropCategory}`; + this.clusterDropStats?.addCallDropped(dropCategory); + } return { pickResultType: PickResultType.DROP, status: { code: Status.UNAVAILABLE, - details: `Call dropped by load balancing policy. Category: ${dropCategory}`, + details: details, metadata: new Metadata(), }, subchannel: null, extraFilterFactory: null, - onCallStarted: null, + onCallStarted: null }; } }, @@ -218,9 +277,13 @@ export class EdsLoadBalancer implements LoadBalancer { /** * Check whether a single call should be dropped according to the current * policy, based on randomly chosen numbers. Returns the drop category if - * the call should be dropped, and null otherwise. + * the call should be dropped, and null otherwise. true is a valid + * output, as a sentinel value indicating a drop with no category. */ - private checkForDrop(): string | null { + private checkForDrop(): string | true | null { + if (this.lastestConfig && this.concurrentRequests >= this.lastestConfig.getMaxConcurrentRequests()) { + return true; + } if (!this.latestEdsUpdate?.policy) { return null; } diff --git a/packages/grpc-js-xds/src/xds-client.ts b/packages/grpc-js-xds/src/xds-client.ts index adc54ed4..ddca9284 100644 --- a/packages/grpc-js-xds/src/xds-client.ts +++ b/packages/grpc-js-xds/src/xds-client.ts @@ -114,6 +114,7 @@ function localityEqual( } export interface XdsClusterDropStats { + addUncategorizedCallDropped(): void; addCallDropped(category: string): void; } @@ -158,6 +159,7 @@ interface ClusterLocalityStats { interface ClusterLoadReport { callsDropped: Map; + uncategorizedCallsDropped: number; localityStats: ClusterLocalityStats[]; intervalStart: [number, number]; } @@ -195,6 +197,7 @@ class ClusterLoadReportMap { } const newStats: ClusterLoadReport = { callsDropped: new Map(), + uncategorizedCallsDropped: 0, localityStats: [], intervalStart: process.hrtime(), }; @@ -871,8 +874,10 @@ export class XdsClient { totalDroppedRequests += count; } } + totalDroppedRequests += stats.uncategorizedCallsDropped; // Clear out dropped call stats after sending them stats.callsDropped.clear(); + stats.uncategorizedCallsDropped = 0; const interval = process.hrtime(stats.intervalStart); stats.intervalStart = process.hrtime(); // Skip clusters with 0 requests @@ -957,6 +962,7 @@ export class XdsClient { trace('addClusterDropStats(lrsServer=' + lrsServer + ', clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ')'); if (lrsServer !== '') { return { + addUncategorizedCallDropped: () => {}, addCallDropped: (category) => {}, }; } @@ -965,6 +971,9 @@ export class XdsClient { edsServiceName ); return { + addUncategorizedCallDropped: () => { + clusterStats.uncategorizedCallsDropped += 1; + }, addCallDropped: (category) => { const prevCount = clusterStats.callsDropped.get(category) ?? 0; clusterStats.callsDropped.set(category, prevCount + 1);