Merge pull request #1514 from murgatroid99/grpc-js_eds_call_dropping

grpc-js: xDS: add support for dropping calls and reporting drops
Michael Lumish 2020-07-27 14:32:33 -07:00 committed by GitHub
commit 209c224094
13 changed files with 1124 additions and 22 deletions

View File

@@ -48,7 +48,7 @@
"clean": "node -e 'require(\"rimraf\")(\"./build\", () => {})'",
"compile": "tsc -p .",
"format": "clang-format -i -style=\"{Language: JavaScript, BasedOnStyle: Google, ColumnLimit: 80}\" src/*.ts test/*.ts",
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs deps/envoy-api/ deps/udpa/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib ../index envoy/service/discovery/v2/ads.proto envoy/api/v2/listener.proto envoy/api/v2/route.proto envoy/api/v2/cluster.proto envoy/api/v2/endpoint.proto",
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs deps/envoy-api/ deps/udpa/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib ../index envoy/service/discovery/v2/ads.proto envoy/service/load_stats/v2/lrs.proto envoy/api/v2/listener.proto envoy/api/v2/route.proto envoy/api/v2/cluster.proto envoy/api/v2/endpoint.proto",
"lint": "npm run check",
"prepare": "npm run compile",
"test": "gulp test",

View File

@@ -390,6 +390,12 @@ export class ChannelImplementation implements Channel {
);
}
break;
case PickResultType.DROP:
callStream.cancelWithStatus(
pickResult.status!.code,
pickResult.status!.details
);
break;
default:
throw new Error(
`Invalid state: unknown pickResultType ${pickResult.pickResultType}`
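
For context on the DROP branch above: the pick attaches a status object and the channel cancels the call with it, so an application simply sees a failed call. A minimal sketch of what that looks like from the caller's side, using a hypothetical service and method:

import * as grpc from '@grpc/grpc-js';

// Hypothetical client; in practice this comes from a loaded service definition.
declare const client: {
  sayHello(request: {}, callback: (error: grpc.ServiceError | null) => void): void;
};

client.sayHello({}, (error) => {
  if (error?.code === grpc.status.UNAVAILABLE) {
    // A dropped call surfaces as UNAVAILABLE; the details string names the
    // drop category, e.g. "Call dropped by load balancing policy. Category: throttle".
    console.log(error.details);
  }
});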

View File

@@ -0,0 +1,117 @@
// Original file: deps/envoy-api/envoy/api/v2/endpoint/load_report.proto
import { UpstreamLocalityStats as _envoy_api_v2_endpoint_UpstreamLocalityStats, UpstreamLocalityStats__Output as _envoy_api_v2_endpoint_UpstreamLocalityStats__Output } from '../../../../envoy/api/v2/endpoint/UpstreamLocalityStats';
import { Duration as _google_protobuf_Duration, Duration__Output as _google_protobuf_Duration__Output } from '../../../../google/protobuf/Duration';
import { Long } from '@grpc/proto-loader';
export interface _envoy_api_v2_endpoint_ClusterStats_DroppedRequests {
/**
* Identifier for the policy specifying the drop.
*/
'category'?: (string);
/**
* Total number of deliberately dropped requests for the category.
*/
'dropped_count'?: (number | string | Long);
}
export interface _envoy_api_v2_endpoint_ClusterStats_DroppedRequests__Output {
/**
* Identifier for the policy specifying the drop.
*/
'category': (string);
/**
* Total number of deliberately dropped requests for the category.
*/
'dropped_count': (string);
}
/**
* Per cluster load stats. Envoy reports these stats to a management server in a
* :ref:`LoadStatsRequest<envoy_api_msg_service.load_stats.v2.LoadStatsRequest>`
* [#not-implemented-hide:] Not configuration. TBD how to doc proto APIs.
* Next ID: 7
* [#next-free-field: 7]
*/
export interface ClusterStats {
/**
* The name of the cluster.
*/
'cluster_name'?: (string);
/**
* Need at least one.
*/
'upstream_locality_stats'?: (_envoy_api_v2_endpoint_UpstreamLocalityStats)[];
/**
* Cluster-level stats such as total_successful_requests may be computed by
* summing upstream_locality_stats. In addition, below there are additional
* cluster-wide stats.
*
* The total number of dropped requests. This covers requests
* deliberately dropped by the drop_overload policy and circuit breaking.
*/
'total_dropped_requests'?: (number | string | Long);
/**
* Period over which the actual load report occurred. This will be guaranteed to include every
* request reported. Due to system load and delays between the *LoadStatsRequest* sent from Envoy
* and the *LoadStatsResponse* message sent from the management server, this may be longer than
* the requested load reporting interval in the *LoadStatsResponse*.
*/
'load_report_interval'?: (_google_protobuf_Duration);
/**
* Information about deliberately dropped requests for each category specified
* in the DropOverload policy.
*/
'dropped_requests'?: (_envoy_api_v2_endpoint_ClusterStats_DroppedRequests)[];
/**
* The eds_cluster_config service_name of the cluster.
* It's possible that two clusters send the same service_name to EDS;
* in that case, the management server is supposed to do aggregation on the load reports.
*/
'cluster_service_name'?: (string);
}
/**
* Per cluster load stats. Envoy reports these stats to a management server in a
* :ref:`LoadStatsRequest<envoy_api_msg_service.load_stats.v2.LoadStatsRequest>`
* [#not-implemented-hide:] Not configuration. TBD how to doc proto APIs.
* Next ID: 7
* [#next-free-field: 7]
*/
export interface ClusterStats__Output {
/**
* The name of the cluster.
*/
'cluster_name': (string);
/**
* Need at least one.
*/
'upstream_locality_stats': (_envoy_api_v2_endpoint_UpstreamLocalityStats__Output)[];
/**
* Cluster-level stats such as total_successful_requests may be computed by
* summing upstream_locality_stats. In addition, below there are additional
* cluster-wide stats.
*
* The total number of dropped requests. This covers requests
* deliberately dropped by the drop_overload policy and circuit breaking.
*/
'total_dropped_requests': (string);
/**
* Period over which the actual load report occurred. This will be guaranteed to include every
* request reported. Due to system load and delays between the *LoadStatsRequest* sent from Envoy
* and the *LoadStatsResponse* message sent from the management server, this may be longer than
* the requested load reporting interval in the *LoadStatsResponse*.
*/
'load_report_interval'?: (_google_protobuf_Duration__Output);
/**
* Information about deliberately dropped requests for each category specified
* in the DropOverload policy.
*/
'dropped_requests': (_envoy_api_v2_endpoint_ClusterStats_DroppedRequests__Output)[];
/**
* The eds_cluster_config service_name of the cluster.
* It's possible that two clusters send the same service_name to EDS;
* in that case, the management server is supposed to do aggregation on the load reports.
*/
'cluster_service_name': (string);
}
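
A note on reading these generated types: request-side interfaces accept number | string | Long for 64-bit counters, while the corresponding __Output interfaces carry them as decimal strings, because the generate-types script above passes --longs String to proto-loader. A minimal sketch, with made-up values and an illustrative import path:

import { ClusterStats } from './generated/envoy/api/v2/endpoint/ClusterStats';

// Request-side message: 64-bit counters may be plain numbers, strings, or Longs.
const stats: ClusterStats = {
  cluster_name: 'example-cluster',
  total_dropped_requests: 3,
  dropped_requests: [{ category: 'throttle', dropped_count: 3 }],
  load_report_interval: { seconds: 10, nanos: 0 },
};

// In the ClusterStats__Output form the same counters arrive as decimal strings,
// e.g. output.total_dropped_requests === '3'.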

View File

@@ -0,0 +1,41 @@
// Original file: deps/envoy-api/envoy/api/v2/endpoint/load_report.proto
import { Long } from '@grpc/proto-loader';
/**
* [#not-implemented-hide:] Not configuration. TBD how to doc proto APIs.
*/
export interface EndpointLoadMetricStats {
/**
* Name of the metric; may be empty.
*/
'metric_name'?: (string);
/**
* Number of calls that finished and included this metric.
*/
'num_requests_finished_with_metric'?: (number | string | Long);
/**
* Sum of metric values across all calls that finished with this metric for
* load_reporting_interval.
*/
'total_metric_value'?: (number | string);
}
/**
* [#not-implemented-hide:] Not configuration. TBD how to doc proto APIs.
*/
export interface EndpointLoadMetricStats__Output {
/**
* Name of the metric; may be empty.
*/
'metric_name': (string);
/**
* Number of calls that finished and included this metric.
*/
'num_requests_finished_with_metric': (string);
/**
* Sum of metric values across all calls that finished with this metric for
* load_reporting_interval.
*/
'total_metric_value': (number | string);
}

View File

@@ -0,0 +1,106 @@
// Original file: deps/envoy-api/envoy/api/v2/endpoint/load_report.proto
import { Address as _envoy_api_v2_core_Address, Address__Output as _envoy_api_v2_core_Address__Output } from '../../../../envoy/api/v2/core/Address';
import { EndpointLoadMetricStats as _envoy_api_v2_endpoint_EndpointLoadMetricStats, EndpointLoadMetricStats__Output as _envoy_api_v2_endpoint_EndpointLoadMetricStats__Output } from '../../../../envoy/api/v2/endpoint/EndpointLoadMetricStats';
import { Struct as _google_protobuf_Struct, Struct__Output as _google_protobuf_Struct__Output } from '../../../../google/protobuf/Struct';
import { Long } from '@grpc/proto-loader';
/**
* [#not-implemented-hide:] Not configuration. TBD how to doc proto APIs.
* [#next-free-field: 8]
*/
export interface UpstreamEndpointStats {
/**
* Upstream host address.
*/
'address'?: (_envoy_api_v2_core_Address);
/**
* The total number of requests successfully completed by the endpoints in the
* locality. These include non-5xx responses for HTTP, where errors
* originate at the client and the endpoint responded successfully. For gRPC,
* the grpc-status values are those not covered by total_error_requests below.
*/
'total_successful_requests'?: (number | string | Long);
/**
* The total number of unfinished requests for this endpoint.
*/
'total_requests_in_progress'?: (number | string | Long);
/**
* The total number of requests that failed due to errors at the endpoint.
* For HTTP these are responses with 5xx status codes and for gRPC the
* grpc-status values:
*
* - DeadlineExceeded
* - Unimplemented
* - Internal
* - Unavailable
* - Unknown
* - DataLoss
*/
'total_error_requests'?: (number | string | Long);
/**
* Stats for multi-dimensional load balancing.
*/
'load_metric_stats'?: (_envoy_api_v2_endpoint_EndpointLoadMetricStats)[];
/**
* Opaque and implementation dependent metadata of the
* endpoint. Envoy will pass this directly to the management server.
*/
'metadata'?: (_google_protobuf_Struct);
/**
* The total number of requests that were issued to this endpoint
* since the last report. A single TCP connection, HTTP or gRPC
* request or stream is counted as one request.
*/
'total_issued_requests'?: (number | string | Long);
}
/**
* [#not-implemented-hide:] Not configuration. TBD how to doc proto APIs.
* [#next-free-field: 8]
*/
export interface UpstreamEndpointStats__Output {
/**
* Upstream host address.
*/
'address'?: (_envoy_api_v2_core_Address__Output);
/**
* The total number of requests successfully completed by the endpoints in the
* locality. These include non-5xx responses for HTTP, where errors
* originate at the client and the endpoint responded successfully. For gRPC,
* the grpc-status values are those not covered by total_error_requests below.
*/
'total_successful_requests': (string);
/**
* The total number of unfinished requests for this endpoint.
*/
'total_requests_in_progress': (string);
/**
* The total number of requests that failed due to errors at the endpoint.
* For HTTP these are responses with 5xx status codes and for gRPC the
* grpc-status values:
*
* - DeadlineExceeded
* - Unimplemented
* - Internal
* - Unavailable
* - Unknown
* - DataLoss
*/
'total_error_requests': (string);
/**
* Stats for multi-dimensional load balancing.
*/
'load_metric_stats': (_envoy_api_v2_endpoint_EndpointLoadMetricStats__Output)[];
/**
* Opaque and implementation dependent metadata of the
* endpoint. Envoy will pass this directly to the management server.
*/
'metadata'?: (_google_protobuf_Struct__Output);
/**
* The total number of requests that were issued to this endpoint
* since the last report. A single TCP connection, HTTP or gRPC
* request or stream is counted as one request.
*/
'total_issued_requests': (string);
}

View File

@@ -0,0 +1,108 @@
// Original file: deps/envoy-api/envoy/api/v2/endpoint/load_report.proto
import { Locality as _envoy_api_v2_core_Locality, Locality__Output as _envoy_api_v2_core_Locality__Output } from '../../../../envoy/api/v2/core/Locality';
import { EndpointLoadMetricStats as _envoy_api_v2_endpoint_EndpointLoadMetricStats, EndpointLoadMetricStats__Output as _envoy_api_v2_endpoint_EndpointLoadMetricStats__Output } from '../../../../envoy/api/v2/endpoint/EndpointLoadMetricStats';
import { UpstreamEndpointStats as _envoy_api_v2_endpoint_UpstreamEndpointStats, UpstreamEndpointStats__Output as _envoy_api_v2_endpoint_UpstreamEndpointStats__Output } from '../../../../envoy/api/v2/endpoint/UpstreamEndpointStats';
import { Long } from '@grpc/proto-loader';
/**
* These are stats Envoy reports to GLB every so often. Report frequency is
* defined by
* :ref:`LoadStatsResponse.load_reporting_interval<envoy_api_field_service.load_stats.v2.LoadStatsResponse.load_reporting_interval>`.
* Stats per upstream region/zone and optionally per subzone.
* [#not-implemented-hide:] Not configuration. TBD how to doc proto APIs.
* [#next-free-field: 9]
*/
export interface UpstreamLocalityStats {
/**
* Name of zone, region and optionally endpoint group these metrics were
* collected from. Zone and region names could be empty if unknown.
*/
'locality'?: (_envoy_api_v2_core_Locality);
/**
* The total number of requests successfully completed by the endpoints in the
* locality.
*/
'total_successful_requests'?: (number | string | Long);
/**
* The total number of unfinished requests
*/
'total_requests_in_progress'?: (number | string | Long);
/**
* The total number of requests that failed due to errors at the endpoint,
* aggregated over all endpoints in the locality.
*/
'total_error_requests'?: (number | string | Long);
/**
* Stats for multi-dimensional load balancing.
*/
'load_metric_stats'?: (_envoy_api_v2_endpoint_EndpointLoadMetricStats)[];
/**
* [#not-implemented-hide:] The priority of the endpoint group these metrics
* were collected from.
*/
'priority'?: (number);
/**
* Endpoint granularity stats information for this locality. This information
* is populated if the Server requests it by setting
* :ref:`LoadStatsResponse.report_endpoint_granularity<envoy_api_field_service.load_stats.v2.LoadStatsResponse.report_endpoint_granularity>`.
*/
'upstream_endpoint_stats'?: (_envoy_api_v2_endpoint_UpstreamEndpointStats)[];
/**
* The total number of requests that were issued by this Envoy since
* the last report. This information is aggregated over all the
* upstream endpoints in the locality.
*/
'total_issued_requests'?: (number | string | Long);
}
/**
* These are stats Envoy reports to GLB every so often. Report frequency is
* defined by
* :ref:`LoadStatsResponse.load_reporting_interval<envoy_api_field_service.load_stats.v2.LoadStatsResponse.load_reporting_interval>`.
* Stats per upstream region/zone and optionally per subzone.
* [#not-implemented-hide:] Not configuration. TBD how to doc proto APIs.
* [#next-free-field: 9]
*/
export interface UpstreamLocalityStats__Output {
/**
* Name of zone, region and optionally endpoint group these metrics were
* collected from. Zone and region names could be empty if unknown.
*/
'locality'?: (_envoy_api_v2_core_Locality__Output);
/**
* The total number of requests successfully completed by the endpoints in the
* locality.
*/
'total_successful_requests': (string);
/**
* The total number of unfinished requests
*/
'total_requests_in_progress': (string);
/**
* The total number of requests that failed due to errors at the endpoint,
* aggregated over all endpoints in the locality.
*/
'total_error_requests': (string);
/**
* Stats for multi-dimensional load balancing.
*/
'load_metric_stats': (_envoy_api_v2_endpoint_EndpointLoadMetricStats__Output)[];
/**
* [#not-implemented-hide:] The priority of the endpoint group these metrics
* were collected from.
*/
'priority': (number);
/**
* Endpoint granularity stats information for this locality. This information
* is populated if the Server requests it by setting
* :ref:`LoadStatsResponse.report_endpoint_granularity<envoy_api_field_service.load_stats.v2.LoadStatsResponse.report_endpoint_granularity>`.
*/
'upstream_endpoint_stats': (_envoy_api_v2_endpoint_UpstreamEndpointStats__Output)[];
/**
* The total number of requests that were issued by this Envoy since
* the last report. This information is aggregated over all the
* upstream endpoints in the locality.
*/
'total_issued_requests': (string);
}

View File

@@ -0,0 +1,108 @@
// Original file: deps/envoy-api/envoy/service/load_stats/v2/lrs.proto
import * as grpc from '../../../../../index'
import { LoadStatsRequest as _envoy_service_load_stats_v2_LoadStatsRequest, LoadStatsRequest__Output as _envoy_service_load_stats_v2_LoadStatsRequest__Output } from '../../../../envoy/service/load_stats/v2/LoadStatsRequest';
import { LoadStatsResponse as _envoy_service_load_stats_v2_LoadStatsResponse, LoadStatsResponse__Output as _envoy_service_load_stats_v2_LoadStatsResponse__Output } from '../../../../envoy/service/load_stats/v2/LoadStatsResponse';
export interface LoadReportingServiceClient extends grpc.Client {
/**
* Advanced API to allow for multi-dimensional load balancing by remote
* server. For receiving LB assignments, the steps are:
* 1. The management server is configured with per cluster/zone/load metric
* capacity configuration. The capacity configuration definition is
* outside of the scope of this document.
* 2. Envoy issues a standard {Stream,Fetch}Endpoints request for the clusters
* to balance.
*
* Independently, Envoy will initiate a StreamLoadStats bidi stream with a
* management server:
* 1. Once a connection is established, the management server publishes a
* LoadStatsResponse for all clusters it is interested in learning load
* stats about.
* 2. For each cluster, Envoy load balances incoming traffic to upstream hosts
* based on per-zone weights and/or per-instance weights (if specified)
* based on intra-zone LbPolicy. This information comes from the above
* {Stream,Fetch}Endpoints.
* 3. When upstream hosts reply, they optionally add header <define header
* name> with ASCII representation of EndpointLoadMetricStats.
* 4. Envoy aggregates load reports over the period of time given to it in
* LoadStatsResponse.load_reporting_interval. This includes aggregation
* stats Envoy maintains by itself (total_requests, rpc_errors etc.) as
* well as load metrics from upstream hosts.
* 5. When the timer of load_reporting_interval expires, Envoy sends new
* LoadStatsRequest filled with load reports for each cluster.
* 6. The management server uses the load reports from all reported Envoys
* from around the world, computes global assignment and prepares traffic
* assignment destined for each zone Envoys are located in. Goto 2.
*/
StreamLoadStats(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_envoy_service_load_stats_v2_LoadStatsRequest, _envoy_service_load_stats_v2_LoadStatsResponse__Output>;
StreamLoadStats(options?: grpc.CallOptions): grpc.ClientDuplexStream<_envoy_service_load_stats_v2_LoadStatsRequest, _envoy_service_load_stats_v2_LoadStatsResponse__Output>;
/**
* Advanced API to allow for multi-dimensional load balancing by remote
* server. For receiving LB assignments, the steps are:
* 1. The management server is configured with per cluster/zone/load metric
* capacity configuration. The capacity configuration definition is
* outside of the scope of this document.
* 2. Envoy issues a standard {Stream,Fetch}Endpoints request for the clusters
* to balance.
*
* Independently, Envoy will initiate a StreamLoadStats bidi stream with a
* management server:
* 1. Once a connection is established, the management server publishes a
* LoadStatsResponse for all clusters it is interested in learning load
* stats about.
* 2. For each cluster, Envoy load balances incoming traffic to upstream hosts
* based on per-zone weights and/or per-instance weights (if specified)
* based on intra-zone LbPolicy. This information comes from the above
* {Stream,Fetch}Endpoints.
* 3. When upstream hosts reply, they optionally add header <define header
* name> with ASCII representation of EndpointLoadMetricStats.
* 4. Envoy aggregates load reports over the period of time given to it in
* LoadStatsResponse.load_reporting_interval. This includes aggregation
* stats Envoy maintains by itself (total_requests, rpc_errors etc.) as
* well as load metrics from upstream hosts.
* 5. When the timer of load_reporting_interval expires, Envoy sends new
* LoadStatsRequest filled with load reports for each cluster.
* 6. The management server uses the load reports from all reported Envoys
* from around the world, computes global assignment and prepares traffic
* assignment destined for each zone Envoys are located in. Goto 2.
*/
streamLoadStats(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_envoy_service_load_stats_v2_LoadStatsRequest, _envoy_service_load_stats_v2_LoadStatsResponse__Output>;
streamLoadStats(options?: grpc.CallOptions): grpc.ClientDuplexStream<_envoy_service_load_stats_v2_LoadStatsRequest, _envoy_service_load_stats_v2_LoadStatsResponse__Output>;
}
export interface LoadReportingServiceHandlers {
/**
* Advanced API to allow for multi-dimensional load balancing by remote
* server. For receiving LB assignments, the steps are:
* 1. The management server is configured with per cluster/zone/load metric
* capacity configuration. The capacity configuration definition is
* outside of the scope of this document.
* 2. Envoy issues a standard {Stream,Fetch}Endpoints request for the clusters
* to balance.
*
* Independently, Envoy will initiate a StreamLoadStats bidi stream with a
* management server:
* 1. Once a connection is established, the management server publishes a
* LoadStatsResponse for all clusters it is interested in learning load
* stats about.
* 2. For each cluster, Envoy load balances incoming traffic to upstream hosts
* based on per-zone weights and/or per-instance weights (if specified)
* based on intra-zone LbPolicy. This information comes from the above
* {Stream,Fetch}Endpoints.
* 3. When upstream hosts reply, they optionally add header <define header
* name> with ASCII representation of EndpointLoadMetricStats.
* 4. Envoy aggregates load reports over the period of time given to it in
* LoadStatsResponse.load_reporting_interval. This includes aggregation
* stats Envoy maintains by itself (total_requests, rpc_errors etc.) as
* well as load metrics from upstream hosts.
* 5. When the timer of load_reporting_interval expires, Envoy sends new
* LoadStatsRequest filled with load reports for each cluster.
* 6. The management server uses the load reports from all reported Envoys
* from around the world, computes global assignment and prepares traffic
* assignment destined for each zone Envoys are located in. Goto 2.
*/
StreamLoadStats(call: grpc.ServerDuplexStream<_envoy_service_load_stats_v2_LoadStatsRequest, _envoy_service_load_stats_v2_LoadStatsResponse__Output>): void;
}
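
A minimal usage sketch for the client interface above, assuming a LoadReportingServiceClient has already been constructed from the loaded protos; the node id and import paths are placeholders:

import { LoadReportingServiceClient } from './generated/envoy/service/load_stats/v2/LoadReportingService';
import { LoadStatsResponse__Output } from './generated/envoy/service/load_stats/v2/LoadStatsResponse';

// Assume an already-constructed client, e.g.
// new protos.envoy.service.load_stats.v2.LoadReportingService(address, credentials).
declare const lrsClient: LoadReportingServiceClient;

const call = lrsClient.streamLoadStats();

// The server replies with the clusters it wants stats for and the reporting interval.
call.on('data', (response: LoadStatsResponse__Output) => {
  console.log('report interval (s):', response.load_reporting_interval?.seconds);
});

// The first request identifies this client; later requests carry cluster_stats.
call.write({ node: { id: 'example-node' } });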

View File

@@ -0,0 +1,34 @@
// Original file: deps/envoy-api/envoy/service/load_stats/v2/lrs.proto
import { Node as _envoy_api_v2_core_Node, Node__Output as _envoy_api_v2_core_Node__Output } from '../../../../envoy/api/v2/core/Node';
import { ClusterStats as _envoy_api_v2_endpoint_ClusterStats, ClusterStats__Output as _envoy_api_v2_endpoint_ClusterStats__Output } from '../../../../envoy/api/v2/endpoint/ClusterStats';
/**
* A load report Envoy sends to the management server.
* [#not-implemented-hide:] Not configuration. TBD how to doc proto APIs.
*/
export interface LoadStatsRequest {
/**
* Node identifier for Envoy instance.
*/
'node'?: (_envoy_api_v2_core_Node);
/**
* A list of load stats to report.
*/
'cluster_stats'?: (_envoy_api_v2_endpoint_ClusterStats)[];
}
/**
* A load report Envoy sends to the management server.
* [#not-implemented-hide:] Not configuration. TBD how to doc proto APIs.
*/
export interface LoadStatsRequest__Output {
/**
* Node identifier for Envoy instance.
*/
'node'?: (_envoy_api_v2_core_Node__Output);
/**
* A list of load stats to report.
*/
'cluster_stats': (_envoy_api_v2_endpoint_ClusterStats__Output)[];
}

View File

@@ -0,0 +1,71 @@
// Original file: deps/envoy-api/envoy/service/load_stats/v2/lrs.proto
import { Duration as _google_protobuf_Duration, Duration__Output as _google_protobuf_Duration__Output } from '../../../../google/protobuf/Duration';
/**
* The management server sends envoy a LoadStatsResponse with all clusters it
* is interested in learning load stats about.
* [#not-implemented-hide:] Not configuration. TBD how to doc proto APIs.
*/
export interface LoadStatsResponse {
/**
* Clusters to report stats for.
* Not populated if *send_all_clusters* is true.
*/
'clusters'?: (string)[];
/**
* The minimum interval of time to collect stats over. This is only a minimum for two reasons:
* 1. There may be some delay from when the timer fires until stats sampling occurs.
* 2. For clusters that were already featured in the previous *LoadStatsResponse*, any traffic
* that is observed in between the corresponding previous *LoadStatsRequest* and this
* *LoadStatsResponse* will also be accumulated and billed to the cluster. This avoids a period
* of inobservability that might otherwise exist between the messages. New clusters are not
* subject to this consideration.
*/
'load_reporting_interval'?: (_google_protobuf_Duration);
/**
* Set to *true* if the management server supports endpoint granularity
* report.
*/
'report_endpoint_granularity'?: (boolean);
/**
* If true, the client should send all clusters it knows about.
* Only clients that advertise the "envoy.lrs.supports_send_all_clusters" capability in their
* :ref:`client_features<envoy_api_field_core.Node.client_features>` field will honor this field.
*/
'send_all_clusters'?: (boolean);
}
/**
* The management server sends envoy a LoadStatsResponse with all clusters it
* is interested in learning load stats about.
* [#not-implemented-hide:] Not configuration. TBD how to doc proto APIs.
*/
export interface LoadStatsResponse__Output {
/**
* Clusters to report stats for.
* Not populated if *send_all_clusters* is true.
*/
'clusters': (string)[];
/**
* The minimum interval of time to collect stats over. This is only a minimum for two reasons:
* 1. There may be some delay from when the timer fires until stats sampling occurs.
* 2. For clusters that were already featured in the previous *LoadStatsResponse*, any traffic
* that is observed in between the corresponding previous *LoadStatsRequest* and this
* *LoadStatsResponse* will also be accumulated and billed to the cluster. This avoids a period
* of inobservability that might otherwise exist between the messages. New clusters are not
* subject to this consideration.
*/
'load_reporting_interval'?: (_google_protobuf_Duration__Output);
/**
* Set to *true* if the management server supports endpoint granularity
* report.
*/
'report_endpoint_granularity': (boolean);
/**
* If true, the client should send all clusters it knows about.
* Only clients that advertise the "envoy.lrs.supports_send_all_clusters" capability in their
* :ref:`client_features<envoy_api_field_core.Node.client_features>` field will honor this field.
*/
'send_all_clusters': (boolean);
}
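
Because load_reporting_interval is a google.protobuf.Duration whose seconds field arrives as a decimal string in the __Output form (another effect of --longs String), converting it into a setInterval period looks like the sketch below; the xds-client change later in this diff does the equivalent inline. The import path is illustrative.

import { Duration__Output } from './generated/google/protobuf/Duration';

// Convert a protobuf Duration (seconds as a decimal string, nanos as a number)
// into a number of milliseconds for use with setInterval.
function durationToMs(duration: Duration__Output): number {
  return Number.parseInt(duration.seconds, 10) * 1000 + duration.nanos / 1_000_000;
}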

View File

@@ -0,0 +1,146 @@
import * as grpc from '../index';
import { ServiceDefinition, EnumTypeDefinition, MessageTypeDefinition } from '@grpc/proto-loader';
import { LoadReportingServiceClient as _envoy_service_load_stats_v2_LoadReportingServiceClient } from './envoy/service/load_stats/v2/LoadReportingService';
type ConstructorArguments<Constructor> = Constructor extends new (...args: infer Args) => any ? Args: never;
type SubtypeConstructor<Constructor, Subtype> = {
new(...args: ConstructorArguments<Constructor>): Subtype;
}
export interface ProtoGrpcType {
envoy: {
api: {
v2: {
core: {
Address: MessageTypeDefinition
AsyncDataSource: MessageTypeDefinition
BackoffStrategy: MessageTypeDefinition
BindConfig: MessageTypeDefinition
BuildVersion: MessageTypeDefinition
CidrRange: MessageTypeDefinition
ControlPlane: MessageTypeDefinition
DataSource: MessageTypeDefinition
Extension: MessageTypeDefinition
HeaderMap: MessageTypeDefinition
HeaderValue: MessageTypeDefinition
HeaderValueOption: MessageTypeDefinition
HttpUri: MessageTypeDefinition
Locality: MessageTypeDefinition
Metadata: MessageTypeDefinition
Node: MessageTypeDefinition
Pipe: MessageTypeDefinition
RemoteDataSource: MessageTypeDefinition
RequestMethod: EnumTypeDefinition
RetryPolicy: MessageTypeDefinition
RoutingPriority: EnumTypeDefinition
RuntimeDouble: MessageTypeDefinition
RuntimeFeatureFlag: MessageTypeDefinition
RuntimeFractionalPercent: MessageTypeDefinition
RuntimeUInt32: MessageTypeDefinition
SocketAddress: MessageTypeDefinition
SocketOption: MessageTypeDefinition
TcpKeepalive: MessageTypeDefinition
TrafficDirection: EnumTypeDefinition
TransportSocket: MessageTypeDefinition
}
endpoint: {
ClusterStats: MessageTypeDefinition
EndpointLoadMetricStats: MessageTypeDefinition
UpstreamEndpointStats: MessageTypeDefinition
UpstreamLocalityStats: MessageTypeDefinition
}
}
}
service: {
load_stats: {
v2: {
LoadReportingService: SubtypeConstructor<typeof grpc.Client, _envoy_service_load_stats_v2_LoadReportingServiceClient> & { service: ServiceDefinition }
LoadStatsRequest: MessageTypeDefinition
LoadStatsResponse: MessageTypeDefinition
}
}
}
type: {
FractionalPercent: MessageTypeDefinition
Percent: MessageTypeDefinition
SemanticVersion: MessageTypeDefinition
}
}
google: {
protobuf: {
Any: MessageTypeDefinition
BoolValue: MessageTypeDefinition
BytesValue: MessageTypeDefinition
DescriptorProto: MessageTypeDefinition
DoubleValue: MessageTypeDefinition
Duration: MessageTypeDefinition
EnumDescriptorProto: MessageTypeDefinition
EnumOptions: MessageTypeDefinition
EnumValueDescriptorProto: MessageTypeDefinition
EnumValueOptions: MessageTypeDefinition
FieldDescriptorProto: MessageTypeDefinition
FieldOptions: MessageTypeDefinition
FileDescriptorProto: MessageTypeDefinition
FileDescriptorSet: MessageTypeDefinition
FileOptions: MessageTypeDefinition
FloatValue: MessageTypeDefinition
GeneratedCodeInfo: MessageTypeDefinition
Int32Value: MessageTypeDefinition
Int64Value: MessageTypeDefinition
ListValue: MessageTypeDefinition
MessageOptions: MessageTypeDefinition
MethodDescriptorProto: MessageTypeDefinition
MethodOptions: MessageTypeDefinition
NullValue: EnumTypeDefinition
OneofDescriptorProto: MessageTypeDefinition
OneofOptions: MessageTypeDefinition
ServiceDescriptorProto: MessageTypeDefinition
ServiceOptions: MessageTypeDefinition
SourceCodeInfo: MessageTypeDefinition
StringValue: MessageTypeDefinition
Struct: MessageTypeDefinition
Timestamp: MessageTypeDefinition
UInt32Value: MessageTypeDefinition
UInt64Value: MessageTypeDefinition
UninterpretedOption: MessageTypeDefinition
Value: MessageTypeDefinition
}
}
udpa: {
annotations: {
FieldMigrateAnnotation: MessageTypeDefinition
FileMigrateAnnotation: MessageTypeDefinition
MigrateAnnotation: MessageTypeDefinition
PackageVersionStatus: EnumTypeDefinition
StatusAnnotation: MessageTypeDefinition
}
}
validate: {
AnyRules: MessageTypeDefinition
BoolRules: MessageTypeDefinition
BytesRules: MessageTypeDefinition
DoubleRules: MessageTypeDefinition
DurationRules: MessageTypeDefinition
EnumRules: MessageTypeDefinition
FieldRules: MessageTypeDefinition
Fixed32Rules: MessageTypeDefinition
Fixed64Rules: MessageTypeDefinition
FloatRules: MessageTypeDefinition
Int32Rules: MessageTypeDefinition
Int64Rules: MessageTypeDefinition
KnownRegex: EnumTypeDefinition
MapRules: MessageTypeDefinition
MessageRules: MessageTypeDefinition
RepeatedRules: MessageTypeDefinition
SFixed32Rules: MessageTypeDefinition
SFixed64Rules: MessageTypeDefinition
SInt32Rules: MessageTypeDefinition
SInt64Rules: MessageTypeDefinition
StringRules: MessageTypeDefinition
TimestampRules: MessageTypeDefinition
UInt32Rules: MessageTypeDefinition
UInt64Rules: MessageTypeDefinition
}
}
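
The ProtoGrpcType above is intended as the type of the object returned by loadPackageDefinition. A sketch of that flow, mirroring the options in the generate-types script and the cast used in xds-client.ts further down; file and import paths are illustrative:

import * as protoLoader from '@grpc/proto-loader';
import * as grpc from '@grpc/grpc-js';
import { ProtoGrpcType } from './lrs';

const packageDefinition = protoLoader.loadSync(
  'envoy/service/load_stats/v2/lrs.proto',
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true,
    includeDirs: ['deps/envoy-api/', 'deps/udpa/', 'deps/googleapis/', 'deps/protoc-gen-validate/'],
  }
);
const protos = (grpc.loadPackageDefinition(
  packageDefinition
) as unknown) as ProtoGrpcType;
const LoadReportingService =
  protos.envoy.service.load_stats.v2.LoadReportingService;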

View File

@@ -32,10 +32,10 @@ import {
PriorityLoadBalancingConfig,
} from './load-balancing-config';
import { ChildLoadBalancerHandler } from './load-balancer-child-handler';
import { XdsClient, Watcher } from './xds-client';
import { XdsClient, Watcher, XdsClusterDropStats } from './xds-client';
import { ClusterLoadAssignment__Output } from './generated/envoy/api/v2/ClusterLoadAssignment';
import { ConnectivityState } from './channel';
import { UnavailablePicker } from './picker';
import { UnavailablePicker, Picker, PickResultType } from './picker';
import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
import { LocalitySubchannelAddress } from './load-balancer-priority';
import { Status } from './constants';
@@ -83,8 +83,48 @@ export class EdsLoadBalancer implements LoadBalancer {
private nextPriorityChildNumber = 0;
private clusterDropStats: XdsClusterDropStats | null = null;
constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper);
this.childBalancer = new ChildLoadBalancerHandler({
createSubchannel: (subchannelAddress, subchannelArgs) =>
this.channelControlHelper.createSubchannel(
subchannelAddress,
subchannelArgs
),
requestReresolution: () =>
this.channelControlHelper.requestReresolution(),
updateState: (connectivityState, originalPicker) => {
if (this.latestEdsUpdate === null) {
return;
}
const edsPicker: Picker = {
pick: (pickArgs) => {
const dropCategory = this.checkForDrop();
/* If we drop the call, it ends with an UNAVAILABLE status.
* Otherwise, delegate picking the subchannel to the child
* balancer. */
if (dropCategory === null) {
return originalPicker.pick(pickArgs);
} else {
this.clusterDropStats?.addCallDropped(dropCategory);
return {
pickResultType: PickResultType.DROP,
status: {
code: Status.UNAVAILABLE,
details: `Call dropped by load balancing policy. Category: ${dropCategory}`,
metadata: new Metadata(),
},
subchannel: null,
extraFilterFactory: null,
onCallStarted: null,
};
}
},
};
this.channelControlHelper.updateState(connectivityState, edsPicker);
},
});
this.watcher = {
onValidUpdate: (update) => {
this.latestEdsUpdate = update;
@@ -112,6 +152,44 @@
};
}
/**
* Check whether a single call should be dropped according to the current
* policy, based on randomly chosen numbers. Returns the drop category if
* the call should be dropped, and null otherwise.
*/
private checkForDrop(): string | null {
if (!this.latestEdsUpdate?.policy) {
return null;
}
/* The drop_overloads policy is a list of pairs of category names and
* probabilities. For each one, if the random number is within that
* probability range, we drop the call citing that category. Otherwise, the
* call proceeds as usual. */
for (const dropOverload of this.latestEdsUpdate.policy.drop_overloads) {
if (!dropOverload.drop_percentage) {
continue;
}
let randNum: number;
switch (dropOverload.drop_percentage.denominator) {
case 'HUNDRED':
randNum = Math.random() * 100;
break;
case 'TEN_THOUSAND':
randNum = Math.random() * 10_000;
break;
case 'MILLION':
randNum = Math.random() * 1_000_000;
break;
default:
continue;
}
if (randNum < dropOverload.drop_percentage.numerator) {
return dropOverload.category;
}
}
return null;
}
/**
* Should be called when this balancer gets a new config and when the
* XdsClient returns a new ClusterLoadAssignment.
@@ -307,6 +385,14 @@
this.isWatcherActive = true;
}
if (lbConfig.eds.lrsLoadReportingServerName) {
this.clusterDropStats = this.xdsClient.addClusterDropStats(
lbConfig.eds.lrsLoadReportingServerName,
lbConfig.eds.cluster,
lbConfig.eds.edsServiceName ?? ''
);
}
/* If updateAddressList is called after receiving an update and the update
* is still valid, we want to update the child config with the information
* in the new EdsLoadBalancingConfig. */
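
To make the drop_overloads arithmetic in checkForDrop above concrete: an entry such as { category: 'throttle', drop_percentage: { numerator: 250, denominator: 'TEN_THOUSAND' } } (made-up values) drops a call whenever Math.random() * 10_000 < 250, i.e. with probability 250 / 10 000 = 2.5%. A self-contained sketch of the same per-entry check:

// Standalone version of the per-entry check performed in checkForDrop.
type FractionalPercent = {
  numerator: number;
  denominator: 'HUNDRED' | 'TEN_THOUSAND' | 'MILLION';
};

const DENOMINATOR_VALUES = {
  HUNDRED: 100,
  TEN_THOUSAND: 10_000,
  MILLION: 1_000_000,
} as const;

function shouldDrop(percentage: FractionalPercent): boolean {
  const randNum = Math.random() * DENOMINATOR_VALUES[percentage.denominator];
  return randNum < percentage.numerator;
}

// Roughly 2.5% of calls drop for this hypothetical entry.
shouldDrop({ numerator: 250, denominator: 'TEN_THOUSAND' });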

View File

@@ -26,6 +26,7 @@ export enum PickResultType {
COMPLETE,
QUEUE,
TRANSIENT_FAILURE,
DROP,
}
export interface PickResult {
@@ -74,6 +75,14 @@ export interface TransientFailurePickResult extends PickResult {
onCallStarted: null;
}
export interface DropCallPickResult extends PickResult {
pickResultType: PickResultType.DROP;
subchannel: null;
status: StatusObject;
extraFilterFactory: null;
onCallStarted: null;
}
export interface PickArgs {
metadata: Metadata;
}

View File

@@ -18,6 +18,7 @@
import * as protoLoader from '@grpc/proto-loader';
import { loadPackageDefinition } from './make-client';
import * as adsTypes from './generated/ads';
import * as lrsTypes from './generated/lrs';
import { createGoogleDefaultCredentials } from './channel-credentials';
import { loadBootstrapInfo } from './xds-bootstrap';
import { ClientDuplexStream, ServiceError } from './call';
@@ -34,6 +35,15 @@ import { DiscoveryRequest } from './generated/envoy/api/v2/DiscoveryRequest';
import { DiscoveryResponse__Output } from './generated/envoy/api/v2/DiscoveryResponse';
import { ClusterLoadAssignment__Output } from './generated/envoy/api/v2/ClusterLoadAssignment';
import { Cluster__Output } from './generated/envoy/api/v2/Cluster';
import { LoadReportingServiceClient } from './generated/envoy/service/load_stats/v2/LoadReportingService';
import { LoadStatsRequest } from './generated/envoy/service/load_stats/v2/LoadStatsRequest';
import { LoadStatsResponse__Output } from './generated/envoy/service/load_stats/v2/LoadStatsResponse';
import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
import {
ClusterStats,
_envoy_api_v2_endpoint_ClusterStats_DroppedRequests,
} from './generated/envoy/api/v2/endpoint/ClusterStats';
import { UpstreamLocalityStats } from './generated/envoy/api/v2/endpoint/UpstreamLocalityStats';
const TRACER_NAME = 'xds_client';
@@ -46,9 +56,13 @@ const clientVersion = require('../../package.json').version;
const EDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment';
const CDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.Cluster';
let loadedProtos: Promise<adsTypes.ProtoGrpcType> | null = null;
let loadedProtos: Promise<
adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType
> | null = null;
function loadAdsProtos(): Promise<adsTypes.ProtoGrpcType> {
function loadAdsProtos(): Promise<
adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType
> {
if (loadedProtos !== null) {
return loadedProtos;
}
@@ -80,7 +94,7 @@ function loadAdsProtos(): Promise<adsTypes.ProtoGrpcType> {
(packageDefinition) =>
(loadPackageDefinition(
packageDefinition
) as unknown) as adsTypes.ProtoGrpcType
) as unknown) as adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType
);
return loadedProtos;
}
@@ -91,14 +105,102 @@ export interface Watcher<UpdateType> {
onResourceDoesNotExist(): void;
}
export interface XdsClusterDropStats {
addCallDropped(category: string): void;
}
interface ClusterLocalityStats {
locality: Locality__Output;
callsStarted: number;
callsSucceeded: number;
callsFailed: number;
callsInProgress: number;
}
interface ClusterLoadReport {
callsDropped: Map<string, number>;
localityStats: ClusterLocalityStats[];
intervalStart: [number, number];
}
class ClusterLoadReportMap {
private statsMap: {
clusterName: string;
edsServiceName: string;
stats: ClusterLoadReport;
}[] = [];
get(
clusterName: string,
edsServiceName: string
): ClusterLoadReport | undefined {
for (const statsObj of this.statsMap) {
if (
statsObj.clusterName === clusterName &&
statsObj.edsServiceName === edsServiceName
) {
return statsObj.stats;
}
}
return undefined;
}
getOrCreate(clusterName: string, edsServiceName: string): ClusterLoadReport {
for (const statsObj of this.statsMap) {
if (
statsObj.clusterName === clusterName &&
statsObj.edsServiceName === edsServiceName
) {
return statsObj.stats;
}
}
const newStats: ClusterLoadReport = {
callsDropped: new Map<string, number>(),
localityStats: [],
intervalStart: process.hrtime(),
};
this.statsMap.push({
clusterName,
edsServiceName,
stats: newStats,
});
return newStats;
}
*entries(): IterableIterator<
[{ clusterName: string; edsServiceName: string }, ClusterLoadReport]
> {
for (const statsEntry of this.statsMap) {
yield [
{
clusterName: statsEntry.clusterName,
edsServiceName: statsEntry.edsServiceName,
},
statsEntry.stats,
];
}
}
}
export class XdsClient {
private node: Node | null = null;
private client: AggregatedDiscoveryServiceClient | null = null;
private adsNode: Node | null = null;
private adsClient: AggregatedDiscoveryServiceClient | null = null;
private adsCall: ClientDuplexStream<
DiscoveryRequest,
DiscoveryResponse__Output
> | null = null;
private lrsNode: Node | null = null;
private lrsClient: LoadReportingServiceClient | null = null;
private lrsCall: ClientDuplexStream<
LoadStatsRequest,
LoadStatsResponse__Output
> | null = null;
private latestLrsSettings: LoadStatsResponse__Output | null = null;
private clusterStatsMap: ClusterLoadReportMap = new ClusterLoadReportMap();
private statsTimer: NodeJS.Timer;
private hasShutdown = false;
private endpointWatchers: Map<
@@ -146,17 +248,32 @@ export class XdsClient {
if (this.hasShutdown) {
return;
}
this.node = {
const node: Node = {
...bootstrapInfo.node,
build_version: `gRPC Node Pure JS ${clientVersion}`,
user_agent_name: 'gRPC Node Pure JS',
};
this.client = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(
this.adsNode = {
...node,
client_features: ['envoy.lb.does_not_support_overprovisioning'],
};
this.lrsNode = {
...node,
client_features: ['envoy.lrs.supports_send_all_clusters'],
};
this.adsClient = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(
bootstrapInfo.xdsServers[0].serverUri,
createGoogleDefaultCredentials(),
channelArgs
);
this.maybeStartAdsStream();
this.lrsClient = new protoDefinitions.envoy.service.load_stats.v2.LoadReportingService(
bootstrapInfo.xdsServers[0].serverUri,
createGoogleDefaultCredentials(),
channelArgs
);
this.maybeStartLrsStream();
},
(error) => {
trace('Failed to initialize xDS Client. ' + error.message);
@@ -168,6 +285,8 @@
});
}
);
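/* Initialize statsTimer with a dummy timer that is cleared immediately, so
 * that the field always holds a Timer value; the real timer is created once
 * the LRS stream provides a load reporting interval. */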
this.statsTimer = setInterval(() => {}, 0);
clearInterval(this.statsTimer);
}
/**
@@ -175,7 +294,7 @@
* existing stream, and there
*/
private maybeStartAdsStream() {
if (this.client === null) {
if (this.adsClient === null) {
return;
}
if (this.adsCall !== null) {
@@ -184,7 +303,7 @@
if (this.hasShutdown) {
return;
}
this.adsCall = this.client.StreamAggregatedResources();
this.adsCall = this.adsClient.StreamAggregatedResources();
this.adsCall.on('data', (message: DiscoveryResponse__Output) => {
switch (message.type_url) {
case EDS_TYPE_URL: {
@@ -276,7 +395,7 @@
const endpointWatcherNames = Array.from(this.endpointWatchers.keys());
if (endpointWatcherNames.length > 0) {
this.adsCall.write({
node: this.node!,
node: this.adsNode!,
type_url: EDS_TYPE_URL,
resource_names: endpointWatcherNames,
});
@@ -288,7 +407,7 @@
return;
}
this.adsCall.write({
node: this.node!,
node: this.adsNode!,
type_url: typeUrl,
version_info: versionInfo,
response_nonce: nonce,
@@ -307,7 +426,7 @@
return;
}
this.adsCall.write({
node: this.node!,
node: this.adsNode!,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
@@ -320,7 +439,7 @@
return;
}
this.adsCall.write({
node: this.node!,
node: this.adsNode!,
type_url: CDS_TYPE_URL,
resource_names: Array.from(this.clusterWatchers.keys()),
response_nonce: this.lastCdsNonce,
@@ -337,7 +456,7 @@
return;
}
this.adsCall.write({
node: this.node!,
node: this.adsNode!,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
@@ -353,7 +472,7 @@
return;
}
this.adsCall.write({
node: this.node!,
node: this.adsNode!,
type_url: CDS_TYPE_URL,
resource_names: Array.from(this.clusterWatchers.keys()),
response_nonce: this.lastCdsNonce,
@@ -422,7 +541,7 @@
private updateEdsNames() {
if (this.adsCall) {
this.adsCall.write({
node: this.node!,
node: this.adsNode!,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
@@ -434,7 +553,7 @@
private updateCdsNames() {
if (this.adsCall) {
this.adsCall.write({
node: this.node!,
node: this.adsNode!,
type_url: CDS_TYPE_URL,
resource_names: Array.from(this.clusterWatchers.keys()),
response_nonce: this.lastCdsNonce,
@@ -455,6 +574,125 @@
// Also do the same for other types of watchers when those are implemented
}
private maybeStartLrsStream() {
if (!this.lrsClient) {
return;
}
if (this.lrsCall) {
return;
}
if (this.hasShutdown) {
return;
}
this.lrsCall = this.lrsClient.streamLoadStats();
this.lrsCall.on('data', (message: LoadStatsResponse__Output) => {
if (
message.load_reporting_interval?.seconds !==
this.latestLrsSettings?.load_reporting_interval?.seconds ||
message.load_reporting_interval?.nanos !==
this.latestLrsSettings?.load_reporting_interval?.nanos
) {
/* Only reset the timer if the interval has changed or was not set
* before. */
clearInterval(this.statsTimer);
/* Convert a google.protobuf.Duration to a number of milliseconds for
* use with setInterval. */
const loadReportingIntervalMs =
Number.parseInt(message.load_reporting_interval!.seconds) * 1000 +
message.load_reporting_interval!.nanos / 1_000_000;
this.statsTimer = setInterval(() => {
this.sendStats();
}, loadReportingIntervalMs);
}
this.latestLrsSettings = message;
});
this.lrsCall.on('error', (error: ServiceError) => {
trace(
'LRS stream ended. code=' + error.code + ' details= ' + error.details
);
this.lrsCall = null;
clearInterval(this.statsTimer);
/* Connection backoff is handled by the client object, so we can
* immediately start a new request to indicate that it should try to
* reconnect */
this.maybeStartLrsStream();
});
this.lrsCall.write({
node: this.lrsNode!,
});
}
private sendStats() {
if (!this.lrsCall) {
return;
}
const clusterStats: ClusterStats[] = [];
for (const [
{ clusterName, edsServiceName },
stats,
] of this.clusterStatsMap.entries()) {
if (
this.latestLrsSettings!.send_all_clusters ||
this.latestLrsSettings!.clusters.indexOf(clusterName) >= 0
) {
const upstreamLocalityStats: UpstreamLocalityStats[] = [];
for (const localityStats of stats.localityStats) {
// Skip localities with 0 requests
if (
localityStats.callsStarted > 0 ||
localityStats.callsSucceeded > 0 ||
localityStats.callsFailed > 0
) {
upstreamLocalityStats.push({
locality: localityStats.locality,
total_issued_requests: localityStats.callsStarted,
total_successful_requests: localityStats.callsSucceeded,
total_error_requests: localityStats.callsFailed,
total_requests_in_progress: localityStats.callsInProgress,
});
localityStats.callsStarted = 0;
localityStats.callsSucceeded = 0;
localityStats.callsFailed = 0;
}
}
const droppedRequests: _envoy_api_v2_endpoint_ClusterStats_DroppedRequests[] = [];
let totalDroppedRequests = 0;
for (const [category, count] of stats.callsDropped.entries()) {
if (count > 0) {
droppedRequests.push({
category,
dropped_count: count,
});
totalDroppedRequests += count;
}
}
// Clear out dropped call stats after sending them
stats.callsDropped.clear();
const interval = process.hrtime(stats.intervalStart);
stats.intervalStart = process.hrtime();
// Skip clusters with 0 requests
if (upstreamLocalityStats.length > 0 || totalDroppedRequests > 0) {
clusterStats.push({
cluster_name: clusterName,
cluster_service_name: edsServiceName,
dropped_requests: droppedRequests,
total_dropped_requests: totalDroppedRequests,
upstream_locality_stats: upstreamLocalityStats,
load_report_interval: {
seconds: interval[0],
nanos: interval[1],
},
});
}
}
}
this.lrsCall.write({
node: this.lrsNode!,
cluster_stats: clusterStats,
});
}
addEndpointWatcher(
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
@@ -553,9 +791,41 @@
}
}
/**
* Returns an object that can be used to report dropped calls for a particular
* cluster; those counts are included in the load reports sent by sendStats.
* @param lrsServer The target name of the server to send stats to. An empty
* string indicates that the default LRS client should be used. Currently
* only the empty string is supported here.
* @param clusterName The name of the cluster the drop stats are reported for.
* @param edsServiceName The EDS service name of the cluster, or an empty
* string if it has none.
*/
addClusterDropStats(
lrsServer: string,
clusterName: string,
edsServiceName: string
): XdsClusterDropStats {
if (lrsServer !== '') {
return {
addCallDropped: category => {}
};
}
const clusterStats = this.clusterStatsMap.getOrCreate(
clusterName,
edsServiceName
);
return {
addCallDropped: (category) => {
const prevCount = clusterStats.callsDropped.get(category) ?? 0;
clusterStats.callsDropped.set(category, prevCount + 1);
},
};
}
shutdown(): void {
this.adsCall?.cancel();
this.client?.close();
this.adsClient?.close();
this.lrsCall?.cancel();
this.lrsClient?.close();
this.hasShutdown = true;
}
}
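
Finally, a minimal sketch of how the new addClusterDropStats API is consumed; the EDS balancer change earlier in this diff does essentially this, and the cluster and service names here are placeholders:

declare const xdsClient: XdsClient;

// Only the default LRS server (identified by the empty string) is supported.
const dropStats = xdsClient.addClusterDropStats(
  '',
  'example-cluster',
  'example-eds-service'
);

// Each drop is counted per category and flushed by sendStats() on the next
// load reporting interval.
dropStats.addCallDropped('throttle');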