Merge pull request #2568 from murgatroid99/grpc-js-xds_ring_hash

grpc-js-xds: Implement ring_hash LB policy
Michael Lumish 2023-09-13 12:56:39 -07:00 committed by GitHub
commit f68ceaa44e
43 changed files with 1746 additions and 218 deletions

View File

@@ -63,6 +63,9 @@ const compile = checkTask(() => execNpmCommand('compile'));
const runTests = checkTask(() => {
process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION = 'true';
process.env.GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG = 'true';
if (Number(process.versions.node.split('.')[0]) > 14) {
process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH = 'true';
}
return gulp.src(`${outDir}/test/**/*.js`)
.pipe(mocha({reporter: 'mocha-jenkins-reporter',
require: ['ts-node/register']}));

View File

@@ -33,6 +33,6 @@ COPY --from=build /node/src/grpc-node/packages/grpc-js ./packages/grpc-js/
COPY --from=build /node/src/grpc-node/packages/grpc-js-xds ./packages/grpc-js-xds/
ENV GRPC_VERBOSITY="DEBUG"
ENV GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager,cds_balancer,xds_cluster_resolver,xds_cluster_impl,priority,weighted_target,round_robin,resolving_load_balancer,subchannel,keepalive,dns_resolver,fault_injection,http_filter,csds,outlier_detection,server,server_call
ENV GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager,cds_balancer,xds_cluster_resolver,xds_cluster_impl,priority,weighted_target,round_robin,resolving_load_balancer,subchannel,keepalive,dns_resolver,fault_injection,http_filter,csds,outlier_detection,server,server_call,ring_hash
ENTRYPOINT [ "/nodejs/bin/node", "/node/src/grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client" ]

View File

@@ -467,9 +467,11 @@ function sendConstantQps(client: TestServiceClient, qps: number, failOnFailedRpc
makeSingleRequest(client, callType, failOnFailedRpcs, callStatsTracker, callStartTimestampsTrackers[callType]);
}
}, 1000/qps);
setInterval(() => {
console.log(`Accumulated stats: ${JSON.stringify(accumulatedStats, undefined, 2)}`);
}, 1000);
if (VERBOSITY >= 2) {
setInterval(() => {
console.log(`Accumulated stats: ${JSON.stringify(accumulatedStats, undefined, 2)}`);
}, 1000);
}
}
const callTypeEnumMap = {

View File

@@ -12,7 +12,7 @@
"prepare": "npm run compile",
"pretest": "npm run compile",
"posttest": "npm run check",
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs deps/envoy-api/ deps/xds/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib @grpc/grpc-js envoy/service/discovery/v3/ads.proto envoy/service/load_stats/v3/lrs.proto envoy/config/listener/v3/listener.proto envoy/config/route/v3/route.proto envoy/config/cluster/v3/cluster.proto envoy/config/endpoint/v3/endpoint.proto envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto udpa/type/v1/typed_struct.proto xds/type/v3/typed_struct.proto envoy/extensions/filters/http/fault/v3/fault.proto envoy/service/status/v3/csds.proto envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto",
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs deps/envoy-api/ deps/xds/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib @grpc/grpc-js envoy/service/discovery/v3/ads.proto envoy/service/load_stats/v3/lrs.proto envoy/config/listener/v3/listener.proto envoy/config/route/v3/route.proto envoy/config/cluster/v3/cluster.proto envoy/config/endpoint/v3/endpoint.proto envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto udpa/type/v1/typed_struct.proto xds/type/v3/typed_struct.proto envoy/extensions/filters/http/fault/v3/fault.proto envoy/service/status/v3/csds.proto envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto",
"generate-interop-types": "proto-loader-gen-types --keep-case --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs proto/ -O interop/generated --grpcLib @grpc/grpc-js grpc/testing/test.proto",
"generate-test-types": "proto-loader-gen-types --keep-case --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs proto/ -O test/generated --grpcLib @grpc/grpc-js grpc/testing/echo.proto"
},
@@ -38,15 +38,16 @@
"@types/mocha": "^5.2.6",
"@types/node": "^13.11.1",
"@types/yargs": "^15.0.5",
"gts": "^2.0.2",
"typescript": "^3.8.3",
"gts": "^5.0.1",
"typescript": "^4.9.5",
"yargs": "^15.4.1"
},
"dependencies": {
"@grpc/proto-loader": "^0.6.0",
"google-auth-library": "^7.0.2",
"re2-wasm": "^1.0.1",
"vscode-uri": "^3.0.7"
"vscode-uri": "^3.0.7",
"xxhash-wasm": "^1.0.2"
},
"peerDependencies": {
"@grpc/grpc-js": "~1.8.0"

View File

@@ -166,6 +166,7 @@ main() {
cd "${TEST_DRIVER_FULL_DIR}"
local failed_tests=0
test_suites=(
"affinity_test"
"api_listener_test"
"baseline_test"
"change_backend_service_test"

View File

@@ -20,3 +20,4 @@ export const EXPERIMENTAL_OUTLIER_DETECTION = (process.env.GRPC_EXPERIMENTAL_ENA
export const EXPERIMENTAL_RETRY = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY ?? 'true') === 'true';
export const EXPERIMENTAL_FEDERATION = (process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION ?? 'false') === 'true';
export const EXPERIMENTAL_CUSTOM_LB_CONFIG = (process.env.GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG ?? 'false') === 'true';
export const EXPERIMENTAL_RING_HASH = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH ?? 'false') === 'true';
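The new flag follows the same opt-in pattern as the other experimental switches in this file: it is read once when the module is loaded, so GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH has to be set before @grpc/grpc-js-xds is imported. A minimal sketch of opting in from a client process (the file name and invocation are illustrative, not part of this change):

// client.ts -- run as: GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH=true node client.js
import * as grpcJsXds from '@grpc/grpc-js-xds';

// register() installs the xds components; the ring_hash policy is only registered
// when the environment variable above evaluates to 'true'.
grpcJsXds.register();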

View File

@@ -0,0 +1,67 @@
// Original file: deps/envoy-api/envoy/extensions/load_balancing_policies/common/v3/common.proto
import type { UInt32Value as _google_protobuf_UInt32Value, UInt32Value__Output as _google_protobuf_UInt32Value__Output } from '../../../../../google/protobuf/UInt32Value';
/**
* Common Configuration for all consistent hashing load balancers (MaglevLb, RingHashLb, etc.)
*/
export interface ConsistentHashingLbConfig {
/**
* If set to ``true``, the cluster will use hostname instead of the resolved
* address as the key to consistently hash to an upstream host. Only valid for StrictDNS clusters with hostnames which resolve to a single IP address.
*/
'use_hostname_for_hashing'?: (boolean);
/**
* Configures percentage of average cluster load to bound per upstream host. For example, with a value of 150
* no upstream host will get a load more than 1.5 times the average load of all the hosts in the cluster.
* If not specified, the load is not bounded for any upstream host. Typical value for this parameter is between 120 and 200.
* Minimum is 100.
*
* Applies to both Ring Hash and Maglev load balancers.
*
* This is implemented based on the method described in the paper https://arxiv.org/abs/1608.01350. For the specified
* ``hash_balance_factor``, requests to any upstream host are capped at ``hash_balance_factor/100`` times the average number of requests
* across the cluster. When a request arrives for an upstream host that is currently serving at its max capacity, linear probing
* is used to identify an eligible host. Further, the linear probe is implemented using a random jump in hosts ring/table to identify
* the eligible host (this technique is as described in the paper https://arxiv.org/abs/1908.08762 - the random jump avoids the
* cascading overflow effect when choosing the next host in the ring/table).
*
* If weights are specified on the hosts, they are respected.
*
* This is an O(N) algorithm, unlike other load balancers. Using a lower ``hash_balance_factor`` results in more hosts
* being probed, so use a higher value if you require better performance.
*/
'hash_balance_factor'?: (_google_protobuf_UInt32Value | null);
}
/**
* Common Configuration for all consistent hashing load balancers (MaglevLb, RingHashLb, etc.)
*/
export interface ConsistentHashingLbConfig__Output {
/**
* If set to ``true``, the cluster will use hostname instead of the resolved
* address as the key to consistently hash to an upstream host. Only valid for StrictDNS clusters with hostnames which resolve to a single IP address.
*/
'use_hostname_for_hashing': (boolean);
/**
* Configures percentage of average cluster load to bound per upstream host. For example, with a value of 150
* no upstream host will get a load more than 1.5 times the average load of all the hosts in the cluster.
* If not specified, the load is not bounded for any upstream host. Typical value for this parameter is between 120 and 200.
* Minimum is 100.
*
* Applies to both Ring Hash and Maglev load balancers.
*
* This is implemented based on the method described in the paper https://arxiv.org/abs/1608.01350. For the specified
* ``hash_balance_factor``, requests to any upstream host are capped at ``hash_balance_factor/100`` times the average number of requests
* across the cluster. When a request arrives for an upstream host that is currently serving at its max capacity, linear probing
* is used to identify an eligible host. Further, the linear probe is implemented using a random jump in hosts ring/table to identify
* the eligible host (this technique is as described in the paper https://arxiv.org/abs/1908.08762 - the random jump avoids the
* cascading overflow effect when choosing the next host in the ring/table).
*
* If weights are specified on the hosts, they are respected.
*
* This is an O(N) algorithm, unlike other load balancers. Using a lower ``hash_balance_factor`` results in more hosts
* being probed, so use a higher value if you require better performance.
*/
'hash_balance_factor': (_google_protobuf_UInt32Value__Output | null);
}

View File

@@ -0,0 +1,101 @@
// Original file: deps/envoy-api/envoy/extensions/load_balancing_policies/common/v3/common.proto
import type { Percent as _envoy_type_v3_Percent, Percent__Output as _envoy_type_v3_Percent__Output } from '../../../../../envoy/type/v3/Percent';
import type { UInt64Value as _google_protobuf_UInt64Value, UInt64Value__Output as _google_protobuf_UInt64Value__Output } from '../../../../../google/protobuf/UInt64Value';
import type { Long } from '@grpc/proto-loader';
/**
* Configuration for :ref:`locality weighted load balancing
* <arch_overview_load_balancing_locality_weighted_lb>`
*/
export interface _envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_LocalityWeightedLbConfig {
}
/**
* Configuration for :ref:`locality weighted load balancing
* <arch_overview_load_balancing_locality_weighted_lb>`
*/
export interface _envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_LocalityWeightedLbConfig__Output {
}
/**
* Configuration for :ref:`zone aware routing
* <arch_overview_load_balancing_zone_aware_routing>`.
*/
export interface _envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_ZoneAwareLbConfig {
/**
* Configures percentage of requests that will be considered for zone aware routing
* if zone aware routing is configured. If not specified, the default is 100%.
* * :ref:`runtime values <config_cluster_manager_cluster_runtime_zone_routing>`.
* * :ref:`Zone aware routing support <arch_overview_load_balancing_zone_aware_routing>`.
*/
'routing_enabled'?: (_envoy_type_v3_Percent | null);
/**
* Configures minimum upstream cluster size required for zone aware routing.
* If upstream cluster size is less than specified, zone aware routing is not performed
* even if zone aware routing is configured. If not specified, the default is 6.
* * :ref:`runtime values <config_cluster_manager_cluster_runtime_zone_routing>`.
* * :ref:`Zone aware routing support <arch_overview_load_balancing_zone_aware_routing>`.
*/
'min_cluster_size'?: (_google_protobuf_UInt64Value | null);
/**
* If set to true, Envoy will not consider any hosts when the cluster is in :ref:`panic
* mode<arch_overview_load_balancing_panic_threshold>`. Instead, the cluster will fail all
* requests as if all hosts are unhealthy. This can help avoid potentially overwhelming a
* failing service.
*/
'fail_traffic_on_panic'?: (boolean);
}
/**
* Configuration for :ref:`zone aware routing
* <arch_overview_load_balancing_zone_aware_routing>`.
*/
export interface _envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_ZoneAwareLbConfig__Output {
/**
* Configures percentage of requests that will be considered for zone aware routing
* if zone aware routing is configured. If not specified, the default is 100%.
* * :ref:`runtime values <config_cluster_manager_cluster_runtime_zone_routing>`.
* * :ref:`Zone aware routing support <arch_overview_load_balancing_zone_aware_routing>`.
*/
'routing_enabled': (_envoy_type_v3_Percent__Output | null);
/**
* Configures minimum upstream cluster size required for zone aware routing.
* If upstream cluster size is less than specified, zone aware routing is not performed
* even if zone aware routing is configured. If not specified, the default is 6.
* * :ref:`runtime values <config_cluster_manager_cluster_runtime_zone_routing>`.
* * :ref:`Zone aware routing support <arch_overview_load_balancing_zone_aware_routing>`.
*/
'min_cluster_size': (_google_protobuf_UInt64Value__Output | null);
/**
* If set to true, Envoy will not consider any hosts when the cluster is in :ref:`panic
* mode<arch_overview_load_balancing_panic_threshold>`. Instead, the cluster will fail all
* requests as if all hosts are unhealthy. This can help avoid potentially overwhelming a
* failing service.
*/
'fail_traffic_on_panic': (boolean);
}
export interface LocalityLbConfig {
/**
* Configuration for local zone aware load balancing.
*/
'zone_aware_lb_config'?: (_envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_ZoneAwareLbConfig | null);
/**
* Enable locality weighted load balancing.
*/
'locality_weighted_lb_config'?: (_envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_LocalityWeightedLbConfig | null);
'locality_config_specifier'?: "zone_aware_lb_config"|"locality_weighted_lb_config";
}
export interface LocalityLbConfig__Output {
/**
* Configuration for local zone aware load balancing.
*/
'zone_aware_lb_config'?: (_envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_ZoneAwareLbConfig__Output | null);
/**
* Enable locality weighted load balancing.
*/
'locality_weighted_lb_config'?: (_envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_LocalityWeightedLbConfig__Output | null);
'locality_config_specifier': "zone_aware_lb_config"|"locality_weighted_lb_config";
}

View File

@@ -0,0 +1,71 @@
// Original file: deps/envoy-api/envoy/extensions/load_balancing_policies/common/v3/common.proto
import type { Duration as _google_protobuf_Duration, Duration__Output as _google_protobuf_Duration__Output } from '../../../../../google/protobuf/Duration';
import type { RuntimeDouble as _envoy_config_core_v3_RuntimeDouble, RuntimeDouble__Output as _envoy_config_core_v3_RuntimeDouble__Output } from '../../../../../envoy/config/core/v3/RuntimeDouble';
import type { Percent as _envoy_type_v3_Percent, Percent__Output as _envoy_type_v3_Percent__Output } from '../../../../../envoy/type/v3/Percent';
/**
* Configuration for :ref:`slow start mode <arch_overview_load_balancing_slow_start>`.
*/
export interface SlowStartConfig {
/**
* Represents the size of slow start window.
* If set, the newly created host remains in slow start mode starting from its creation time
* for the duration of slow start window.
*/
'slow_start_window'?: (_google_protobuf_Duration | null);
/**
* This parameter controls the speed of traffic increase over the slow start window. Defaults to 1.0,
* so that the endpoint gets a linearly increasing amount of traffic.
* When increasing the value for this parameter, the speed of traffic ramp-up increases non-linearly.
* The value of the aggression parameter should be greater than 0.0.
* By tuning the parameter, it is possible to achieve a polynomial or exponential shape of the ramp-up curve.
*
* During slow start window, effective weight of an endpoint would be scaled with time factor and aggression:
* ``new_weight = weight * max(min_weight_percent, time_factor ^ (1 / aggression))``,
* where ``time_factor=(time_since_start_seconds / slow_start_time_seconds)``.
*
* As time progresses, more and more traffic is sent to an endpoint that is in the slow start window.
* Once the host exits slow start, time_factor and aggression no longer affect its weight.
*/
'aggression'?: (_envoy_config_core_v3_RuntimeDouble | null);
/**
* Configures the minimum percentage of the original weight, which prevents the new weight from becoming so small
* that endpoints in slow start mode receive no traffic during the slow start window.
* If not specified, the default is 10%.
*/
'min_weight_percent'?: (_envoy_type_v3_Percent | null);
}
/**
* Configuration for :ref:`slow start mode <arch_overview_load_balancing_slow_start>`.
*/
export interface SlowStartConfig__Output {
/**
* Represents the size of slow start window.
* If set, the newly created host remains in slow start mode starting from its creation time
* for the duration of slow start window.
*/
'slow_start_window': (_google_protobuf_Duration__Output | null);
/**
* This parameter controls the speed of traffic increase over the slow start window. Defaults to 1.0,
* so that the endpoint gets a linearly increasing amount of traffic.
* When increasing the value for this parameter, the speed of traffic ramp-up increases non-linearly.
* The value of the aggression parameter should be greater than 0.0.
* By tuning the parameter, it is possible to achieve a polynomial or exponential shape of the ramp-up curve.
*
* During slow start window, effective weight of an endpoint would be scaled with time factor and aggression:
* ``new_weight = weight * max(min_weight_percent, time_factor ^ (1 / aggression))``,
* where ``time_factor=(time_since_start_seconds / slow_start_time_seconds)``.
*
* As time progresses, more and more traffic is sent to an endpoint that is in the slow start window.
* Once the host exits slow start, time_factor and aggression no longer affect its weight.
*/
'aggression': (_envoy_config_core_v3_RuntimeDouble__Output | null);
/**
* Configures the minimum percentage of the original weight, which prevents the new weight from becoming so small
* that endpoints in slow start mode receive no traffic during the slow start window.
* If not specified, the default is 10%.
*/
'min_weight_percent': (_envoy_type_v3_Percent__Output | null);
}

View File

@@ -0,0 +1,163 @@
// Original file: deps/envoy-api/envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto
import type { UInt64Value as _google_protobuf_UInt64Value, UInt64Value__Output as _google_protobuf_UInt64Value__Output } from '../../../../../google/protobuf/UInt64Value';
import type { UInt32Value as _google_protobuf_UInt32Value, UInt32Value__Output as _google_protobuf_UInt32Value__Output } from '../../../../../google/protobuf/UInt32Value';
import type { ConsistentHashingLbConfig as _envoy_extensions_load_balancing_policies_common_v3_ConsistentHashingLbConfig, ConsistentHashingLbConfig__Output as _envoy_extensions_load_balancing_policies_common_v3_ConsistentHashingLbConfig__Output } from '../../../../../envoy/extensions/load_balancing_policies/common/v3/ConsistentHashingLbConfig';
import type { _envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_LocalityWeightedLbConfig, _envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_LocalityWeightedLbConfig__Output } from '../../../../../envoy/extensions/load_balancing_policies/common/v3/LocalityLbConfig';
import type { Long } from '@grpc/proto-loader';
// Original file: deps/envoy-api/envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto
/**
* The hash function used to hash hosts onto the ketama ring.
*/
export enum _envoy_extensions_load_balancing_policies_ring_hash_v3_RingHash_HashFunction {
/**
* Currently defaults to XX_HASH.
*/
DEFAULT_HASH = 0,
/**
* Use `xxHash <https://github.com/Cyan4973/xxHash>`_.
*/
XX_HASH = 1,
/**
* Use `MurmurHash2 <https://sites.google.com/site/murmurhash/>`_, which is compatible with
* std::hash<string> in GNU libstdc++ 3.4.20 or above. This is typically the case when compiled
* on Linux and not macOS.
*/
MURMUR_HASH_2 = 2,
}
/**
* This configuration allows the built-in RING_HASH LB policy to be configured via the LB policy
* extension point. See the :ref:`load balancing architecture overview
* <arch_overview_load_balancing_types>` for more information.
* [#next-free-field: 8]
*/
export interface RingHash {
/**
* The hash function used to hash hosts onto the ketama ring. The value defaults to
* :ref:`XX_HASH<envoy_v3_api_enum_value_config.cluster.v3.Cluster.RingHashLbConfig.HashFunction.XX_HASH>`.
*/
'hash_function'?: (_envoy_extensions_load_balancing_policies_ring_hash_v3_RingHash_HashFunction | keyof typeof _envoy_extensions_load_balancing_policies_ring_hash_v3_RingHash_HashFunction);
/**
* Minimum hash ring size. The larger the ring is (that is, the more hashes there are for each
* provided host) the better the request distribution will reflect the desired weights. Defaults
* to 1024 entries, and limited to 8M entries. See also
* :ref:`maximum_ring_size<envoy_v3_api_field_config.cluster.v3.Cluster.RingHashLbConfig.maximum_ring_size>`.
*/
'minimum_ring_size'?: (_google_protobuf_UInt64Value | null);
/**
* Maximum hash ring size. Defaults to 8M entries, and limited to 8M entries, but can be lowered
* to further constrain resource use. See also
* :ref:`minimum_ring_size<envoy_v3_api_field_config.cluster.v3.Cluster.RingHashLbConfig.minimum_ring_size>`.
*/
'maximum_ring_size'?: (_google_protobuf_UInt64Value | null);
/**
* If set to `true`, the cluster will use hostname instead of the resolved
* address as the key to consistently hash to an upstream host. Only valid for StrictDNS clusters with hostnames which resolve to a single IP address.
*
* .. note::
* This is deprecated; please use :ref:`consistent_hashing_lb_config
* <envoy_v3_api_field_extensions.load_balancing_policies.ring_hash.v3.RingHash.consistent_hashing_lb_config>` instead.
*/
'use_hostname_for_hashing'?: (boolean);
/**
* Configures percentage of average cluster load to bound per upstream host. For example, with a value of 150
* no upstream host will get a load more than 1.5 times the average load of all the hosts in the cluster.
* If not specified, the load is not bounded for any upstream host. Typical value for this parameter is between 120 and 200.
* Minimum is 100.
*
* This is implemented based on the method described in the paper https://arxiv.org/abs/1608.01350. For the specified
* `hash_balance_factor`, requests to any upstream host are capped at `hash_balance_factor/100` times the average number of requests
* across the cluster. When a request arrives for an upstream host that is currently serving at its max capacity, linear probing
* is used to identify an eligible host. Further, the linear probe is implemented using a random jump in hosts ring/table to identify
* the eligible host (this technique is as described in the paper https://arxiv.org/abs/1908.08762 - the random jump avoids the
* cascading overflow effect when choosing the next host in the ring/table).
*
* If weights are specified on the hosts, they are respected.
*
* This is an O(N) algorithm, unlike other load balancers. Using a lower `hash_balance_factor` results in more hosts
* being probed, so use a higher value if you require better performance.
*
* .. note::
* This is deprecated; please use :ref:`consistent_hashing_lb_config
* <envoy_v3_api_field_extensions.load_balancing_policies.ring_hash.v3.RingHash.consistent_hashing_lb_config>` instead.
*/
'hash_balance_factor'?: (_google_protobuf_UInt32Value | null);
/**
* Common configuration for hashing-based load balancing policies.
*/
'consistent_hashing_lb_config'?: (_envoy_extensions_load_balancing_policies_common_v3_ConsistentHashingLbConfig | null);
/**
* Enable locality weighted load balancing for ring hash lb explicitly.
*/
'locality_weighted_lb_config'?: (_envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_LocalityWeightedLbConfig | null);
}
/**
* This configuration allows the built-in RING_HASH LB policy to be configured via the LB policy
* extension point. See the :ref:`load balancing architecture overview
* <arch_overview_load_balancing_types>` for more information.
* [#next-free-field: 8]
*/
export interface RingHash__Output {
/**
* The hash function used to hash hosts onto the ketama ring. The value defaults to
* :ref:`XX_HASH<envoy_v3_api_enum_value_config.cluster.v3.Cluster.RingHashLbConfig.HashFunction.XX_HASH>`.
*/
'hash_function': (keyof typeof _envoy_extensions_load_balancing_policies_ring_hash_v3_RingHash_HashFunction);
/**
* Minimum hash ring size. The larger the ring is (that is, the more hashes there are for each
* provided host) the better the request distribution will reflect the desired weights. Defaults
* to 1024 entries, and limited to 8M entries. See also
* :ref:`maximum_ring_size<envoy_v3_api_field_config.cluster.v3.Cluster.RingHashLbConfig.maximum_ring_size>`.
*/
'minimum_ring_size': (_google_protobuf_UInt64Value__Output | null);
/**
* Maximum hash ring size. Defaults to 8M entries, and limited to 8M entries, but can be lowered
* to further constrain resource use. See also
* :ref:`minimum_ring_size<envoy_v3_api_field_config.cluster.v3.Cluster.RingHashLbConfig.minimum_ring_size>`.
*/
'maximum_ring_size': (_google_protobuf_UInt64Value__Output | null);
/**
* If set to `true`, the cluster will use hostname instead of the resolved
* address as the key to consistently hash to an upstream host. Only valid for StrictDNS clusters with hostnames which resolve to a single IP address.
*
* .. note::
* This is deprecated; please use :ref:`consistent_hashing_lb_config
* <envoy_v3_api_field_extensions.load_balancing_policies.ring_hash.v3.RingHash.consistent_hashing_lb_config>` instead.
*/
'use_hostname_for_hashing': (boolean);
/**
* Configures percentage of average cluster load to bound per upstream host. For example, with a value of 150
* no upstream host will get a load more than 1.5 times the average load of all the hosts in the cluster.
* If not specified, the load is not bounded for any upstream host. Typical value for this parameter is between 120 and 200.
* Minimum is 100.
*
* This is implemented based on the method described in the paper https://arxiv.org/abs/1608.01350. For the specified
* `hash_balance_factor`, requests to any upstream host are capped at `hash_balance_factor/100` times the average number of requests
* across the cluster. When a request arrives for an upstream host that is currently serving at its max capacity, linear probing
* is used to identify an eligible host. Further, the linear probe is implemented using a random jump in hosts ring/table to identify
* the eligible host (this technique is as described in the paper https://arxiv.org/abs/1908.08762 - the random jump avoids the
* cascading overflow effect when choosing the next host in the ring/table).
*
* If weights are specified on the hosts, they are respected.
*
* This is an O(N) algorithm, unlike other load balancers. Using a lower `hash_balance_factor` results in more hosts
* being probed, so use a higher value if you require better performance.
*
* .. note::
* This is deprecated; please use :ref:`consistent_hashing_lb_config
* <envoy_v3_api_field_extensions.load_balancing_policies.ring_hash.v3.RingHash.consistent_hashing_lb_config>` instead.
*/
'hash_balance_factor': (_google_protobuf_UInt32Value__Output | null);
/**
* Common configuration for hashing-based load balancing policies.
*/
'consistent_hashing_lb_config': (_envoy_extensions_load_balancing_policies_common_v3_ConsistentHashingLbConfig__Output | null);
/**
* Enable locality weighted load balancing for ring hash lb explicitly.
*/
'locality_weighted_lb_config': (_envoy_extensions_load_balancing_policies_common_v3_LocalityLbConfig_LocalityWeightedLbConfig__Output | null);
}

View File

@@ -2,7 +2,6 @@
import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption';
import type { FieldRules as _validate_FieldRules, FieldRules__Output as _validate_FieldRules__Output } from '../../validate/FieldRules';
import type { FieldSecurityAnnotation as _udpa_annotations_FieldSecurityAnnotation, FieldSecurityAnnotation__Output as _udpa_annotations_FieldSecurityAnnotation__Output } from '../../udpa/annotations/FieldSecurityAnnotation';
import type { FieldMigrateAnnotation as _udpa_annotations_FieldMigrateAnnotation, FieldMigrateAnnotation__Output as _udpa_annotations_FieldMigrateAnnotation__Output } from '../../udpa/annotations/FieldMigrateAnnotation';
import type { FieldStatusAnnotation as _xds_annotations_v3_FieldStatusAnnotation, FieldStatusAnnotation__Output as _xds_annotations_v3_FieldStatusAnnotation__Output } from '../../xds/annotations/v3/FieldStatusAnnotation';
@@ -31,8 +30,6 @@ export interface FieldOptions {
'weak'?: (boolean);
'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[];
'.validate.rules'?: (_validate_FieldRules | null);
'.udpa.annotations.security'?: (_udpa_annotations_FieldSecurityAnnotation | null);
'.udpa.annotations.sensitive'?: (boolean);
'.envoy.annotations.deprecated_at_minor_version'?: (string);
'.udpa.annotations.field_migrate'?: (_udpa_annotations_FieldMigrateAnnotation | null);
'.envoy.annotations.disallowed_by_default'?: (boolean);
@@ -48,8 +45,6 @@ export interface FieldOptions__Output {
'weak': (boolean);
'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[];
'.validate.rules': (_validate_FieldRules__Output | null);
'.udpa.annotations.security': (_udpa_annotations_FieldSecurityAnnotation__Output | null);
'.udpa.annotations.sensitive': (boolean);
'.envoy.annotations.deprecated_at_minor_version': (string);
'.udpa.annotations.field_migrate': (_udpa_annotations_FieldMigrateAnnotation__Output | null);
'.envoy.annotations.disallowed_by_default': (boolean);

View File

@@ -0,0 +1,172 @@
import type * as grpc from '@grpc/grpc-js';
import type { EnumTypeDefinition, MessageTypeDefinition } from '@grpc/proto-loader';
type SubtypeConstructor<Constructor extends new (...args: any) => any, Subtype> = {
new(...args: ConstructorParameters<Constructor>): Subtype;
};
export interface ProtoGrpcType {
envoy: {
annotations: {
}
config: {
core: {
v3: {
Address: MessageTypeDefinition
AsyncDataSource: MessageTypeDefinition
BackoffStrategy: MessageTypeDefinition
BindConfig: MessageTypeDefinition
BuildVersion: MessageTypeDefinition
CidrRange: MessageTypeDefinition
ControlPlane: MessageTypeDefinition
DataSource: MessageTypeDefinition
EnvoyInternalAddress: MessageTypeDefinition
Extension: MessageTypeDefinition
ExtraSourceAddress: MessageTypeDefinition
HeaderMap: MessageTypeDefinition
HeaderValue: MessageTypeDefinition
HeaderValueOption: MessageTypeDefinition
HttpUri: MessageTypeDefinition
Locality: MessageTypeDefinition
Metadata: MessageTypeDefinition
Node: MessageTypeDefinition
Pipe: MessageTypeDefinition
QueryParameter: MessageTypeDefinition
RemoteDataSource: MessageTypeDefinition
RequestMethod: EnumTypeDefinition
RetryPolicy: MessageTypeDefinition
RoutingPriority: EnumTypeDefinition
RuntimeDouble: MessageTypeDefinition
RuntimeFeatureFlag: MessageTypeDefinition
RuntimeFractionalPercent: MessageTypeDefinition
RuntimePercent: MessageTypeDefinition
RuntimeUInt32: MessageTypeDefinition
SocketAddress: MessageTypeDefinition
SocketOption: MessageTypeDefinition
SocketOptionsOverride: MessageTypeDefinition
TcpKeepalive: MessageTypeDefinition
TrafficDirection: EnumTypeDefinition
TransportSocket: MessageTypeDefinition
WatchedDirectory: MessageTypeDefinition
}
}
}
extensions: {
load_balancing_policies: {
common: {
v3: {
ConsistentHashingLbConfig: MessageTypeDefinition
LocalityLbConfig: MessageTypeDefinition
SlowStartConfig: MessageTypeDefinition
}
}
ring_hash: {
v3: {
RingHash: MessageTypeDefinition
}
}
}
}
type: {
v3: {
FractionalPercent: MessageTypeDefinition
Percent: MessageTypeDefinition
SemanticVersion: MessageTypeDefinition
}
}
}
google: {
protobuf: {
Any: MessageTypeDefinition
BoolValue: MessageTypeDefinition
BytesValue: MessageTypeDefinition
DescriptorProto: MessageTypeDefinition
DoubleValue: MessageTypeDefinition
Duration: MessageTypeDefinition
EnumDescriptorProto: MessageTypeDefinition
EnumOptions: MessageTypeDefinition
EnumValueDescriptorProto: MessageTypeDefinition
EnumValueOptions: MessageTypeDefinition
FieldDescriptorProto: MessageTypeDefinition
FieldOptions: MessageTypeDefinition
FileDescriptorProto: MessageTypeDefinition
FileDescriptorSet: MessageTypeDefinition
FileOptions: MessageTypeDefinition
FloatValue: MessageTypeDefinition
GeneratedCodeInfo: MessageTypeDefinition
Int32Value: MessageTypeDefinition
Int64Value: MessageTypeDefinition
ListValue: MessageTypeDefinition
MessageOptions: MessageTypeDefinition
MethodDescriptorProto: MessageTypeDefinition
MethodOptions: MessageTypeDefinition
NullValue: EnumTypeDefinition
OneofDescriptorProto: MessageTypeDefinition
OneofOptions: MessageTypeDefinition
ServiceDescriptorProto: MessageTypeDefinition
ServiceOptions: MessageTypeDefinition
SourceCodeInfo: MessageTypeDefinition
StringValue: MessageTypeDefinition
Struct: MessageTypeDefinition
Timestamp: MessageTypeDefinition
UInt32Value: MessageTypeDefinition
UInt64Value: MessageTypeDefinition
UninterpretedOption: MessageTypeDefinition
Value: MessageTypeDefinition
}
}
udpa: {
annotations: {
FieldMigrateAnnotation: MessageTypeDefinition
FileMigrateAnnotation: MessageTypeDefinition
MigrateAnnotation: MessageTypeDefinition
PackageVersionStatus: EnumTypeDefinition
StatusAnnotation: MessageTypeDefinition
VersioningAnnotation: MessageTypeDefinition
}
}
validate: {
AnyRules: MessageTypeDefinition
BoolRules: MessageTypeDefinition
BytesRules: MessageTypeDefinition
DoubleRules: MessageTypeDefinition
DurationRules: MessageTypeDefinition
EnumRules: MessageTypeDefinition
FieldRules: MessageTypeDefinition
Fixed32Rules: MessageTypeDefinition
Fixed64Rules: MessageTypeDefinition
FloatRules: MessageTypeDefinition
Int32Rules: MessageTypeDefinition
Int64Rules: MessageTypeDefinition
KnownRegex: EnumTypeDefinition
MapRules: MessageTypeDefinition
MessageRules: MessageTypeDefinition
RepeatedRules: MessageTypeDefinition
SFixed32Rules: MessageTypeDefinition
SFixed64Rules: MessageTypeDefinition
SInt32Rules: MessageTypeDefinition
SInt64Rules: MessageTypeDefinition
StringRules: MessageTypeDefinition
TimestampRules: MessageTypeDefinition
UInt32Rules: MessageTypeDefinition
UInt64Rules: MessageTypeDefinition
}
xds: {
annotations: {
v3: {
FieldStatusAnnotation: MessageTypeDefinition
FileStatusAnnotation: MessageTypeDefinition
MessageStatusAnnotation: MessageTypeDefinition
PackageVersionStatus: EnumTypeDefinition
StatusAnnotation: MessageTypeDefinition
}
}
core: {
v3: {
ContextParams: MessageTypeDefinition
}
}
}
}

View File

@@ -116,7 +116,7 @@ export function validateTopLevelFilter(httpFilter: HttpFilter__Output): boolean
try {
typeUrl = getTopLevelFilterUrl(encodedConfig);
} catch (e) {
trace(httpFilter.name + ' validation failed with error ' + e.message);
trace(httpFilter.name + ' validation failed with error ' + (e as Error).message);
return false;
}
const registryEntry = FILTER_REGISTRY.get(typeUrl);
@@ -243,4 +243,4 @@ export function createHttpFilter(config: HttpFilterConfig, overrideConfig?: Http
} else {
return null;
}
}
}

View File

@@ -23,6 +23,7 @@ import * as load_balancer_priority from './load-balancer-priority';
import * as load_balancer_weighted_target from './load-balancer-weighted-target';
import * as load_balancer_xds_cluster_manager from './load-balancer-xds-cluster-manager';
import * as xds_wrr_locality from './load-balancer-xds-wrr-locality';
import * as ring_hash from './load-balancer-ring-hash';
import * as router_filter from './http-filter/router-filter';
import * as fault_injection_filter from './http-filter/fault-injection-filter';
import * as csds from './csds';
@@ -41,6 +42,7 @@ export function register() {
load_balancer_weighted_target.setup();
load_balancer_xds_cluster_manager.setup();
xds_wrr_locality.setup();
ring_hash.setup();
router_filter.setup();
fault_injection_filter.setup();
csds.setup();

View File

@@ -41,9 +41,26 @@ const DEFAULT_FAILOVER_TIME_MS = 10_000;
const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000;
export interface LocalityEndpoint extends Endpoint {
/**
* A sequence of strings that determines how to divide endpoints up in priority and
* weighted_target.
*/
localityPath: string[];
/**
* The locality this endpoint is in. Used in wrr_locality and xds_cluster_impl.
*/
locality: Locality__Output;
weight: number;
/**
* The load balancing weight for the entire locality that contains this
* endpoint. Used in xds_wrr_locality.
*/
localityWeight: number;
/**
* The overall load balancing weight for this endpoint, calculated as the
* product of the load balancing weight for this endpoint within its locality
* and the load balancing weight of the locality. Used in ring_hash.
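* For example (illustrative numbers), an endpoint with weight 3 in a locality with
* weight 10 gets an endpointWeight of 30.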
*/
endpointWeight: number;
};
export function isLocalityEndpoint(
@@ -317,7 +334,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
* so that when the picker calls exitIdle, that in turn calls exitIdle on
* the PriorityChildImpl, which will start the failover timer. */
if (state === ConnectivityState.IDLE) {
picker = new QueuePicker(this);
picker = new QueuePicker(this, picker);
}
this.channelControlHelper.updateState(state, picker);
}

View File

@@ -0,0 +1,507 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { experimental, logVerbosity, connectivityState, status, Metadata, ChannelOptions, LoadBalancingConfig } from '@grpc/grpc-js';
import { isLocalityEndpoint } from './load-balancer-priority';
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import LeafLoadBalancer = experimental.LeafLoadBalancer;
import Endpoint = experimental.Endpoint;
import Picker = experimental.Picker;
import PickArgs = experimental.PickArgs;
import PickResult = experimental.PickResult;
import PickResultType = experimental.PickResultType;
import LoadBalancer = experimental.LoadBalancer;
import ChannelControlHelper = experimental.ChannelControlHelper;
import createChildChannelControlHelper = experimental.createChildChannelControlHelper;
import UnavailablePicker = experimental.UnavailablePicker;
import subchannelAddressToString = experimental.subchannelAddressToString;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import EndpointMap = experimental.EndpointMap;
import { loadXxhashApi, xxhashApi } from './xxhash';
import { EXPERIMENTAL_RING_HASH } from './environment';
import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
import { RingHash__Output } from './generated/envoy/extensions/load_balancing_policies/ring_hash/v3/RingHash';
import { Any__Output } from './generated/google/protobuf/Any';
import { TypedExtensionConfig__Output } from './generated/envoy/config/core/v3/TypedExtensionConfig';
import { LoadBalancingPolicy__Output } from './generated/envoy/config/cluster/v3/LoadBalancingPolicy';
import { registerLbPolicy } from './lb-policy-registry';
const TRACER_NAME = 'ring_hash';
function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
const TYPE_NAME = 'ring_hash';
const DEFAULT_MIN_RING_SIZE = 1024;
const DEFAULT_MAX_RING_SIZE = 4096;
const ABSOLUTE_MAX_RING_SIZE = 8_388_608;
const DEFAULT_RING_SIZE_CAP = 4096;
class RingHashLoadBalancingConfig implements TypedLoadBalancingConfig {
private minRingSize: number;
private maxRingSize: number;
constructor(minRingSize?: number, maxRingSize?: number) {
this.minRingSize = Math.min(
minRingSize ?? DEFAULT_MIN_RING_SIZE,
ABSOLUTE_MAX_RING_SIZE
);
this.maxRingSize = Math.min(
maxRingSize ?? DEFAULT_MAX_RING_SIZE,
ABSOLUTE_MAX_RING_SIZE
);
}
getLoadBalancerName(): string {
return TYPE_NAME;
}
toJsonObject(): object {
return {
[TYPE_NAME]: {
min_ring_size: this.minRingSize,
max_ring_size: this.maxRingSize,
}
};
}
getMinRingSize() {
return this.minRingSize;
}
getMaxRingSize() {
return this.maxRingSize;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
static createFromJson(obj: any): TypedLoadBalancingConfig {
if ('min_ring_size' in obj) {
if (typeof obj.min_ring_size === 'number') {
if (obj.min_ring_size > ABSOLUTE_MAX_RING_SIZE) {
throw new Error(`ring_hash config field min_ring_size exceeds the cap of ${ABSOLUTE_MAX_RING_SIZE}: ${obj.min_ring_size}`);
}
} else {
throw new Error(
'ring_hash config field min_ring_size must be a number if provided'
);
}
}
if ('max_ring_size' in obj) {
if (typeof obj.max_ring_size === 'number') {
if (obj.max_ring_size > ABSOLUTE_MAX_RING_SIZE) {
throw new Error(`ring_hash config field max_ring_size exceeds the cap of ${ABSOLUTE_MAX_RING_SIZE}: ${obj.max_ring_size}`);
}
} else {
throw new Error(
'ring_hash config field max_ring_size must be a number if provided'
);
}
}
return new RingHashLoadBalancingConfig(
obj.min_ring_size,
obj.max_ring_size
);
}
}
interface RingEntry {
leafBalancer: LeafLoadBalancer;
hash: bigint;
}
interface EndpointWeight {
endpoint: Endpoint;
weight: number;
normalizedWeight: number;
}
class RingHashPicker implements Picker {
constructor(private ring: RingEntry[]) {}
/**
* Find the least index in the ring with a hash greater than or equal to the
* hash parameter, or 0 if no such index exists.
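* For example (illustrative values), with a sorted ring of hashes [10n, 20n, 30n], a hash
* of 15n maps to index 1, and a hash of 35n, which is larger than every entry, wraps to
* index 0.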
* @param hash
*/
private findIndexForHash(hash: bigint): number {
// Binary search to find the target index
let low = 0;
let high = this.ring.length;
let index = 0;
while (low <= high) {
/* Commonly in binary search, this operation can overflow and result in
* the wrong value. However, in this case the ring size is absolutely
* limited to 1<<23, so low+high < MAX_SAFE_INTEGER */
index = Math.floor((low + high) / 2);
if (index === this.ring.length) {
index = 0;
break;
}
const midval = this.ring[index].hash;
const midval1 = index === 0 ? 0n : this.ring[index - 1].hash;
if (hash <= midval && hash > midval1) {
break;
}
if (midval < hash) {
low = index + 1;
} else {
high = index - 1;
}
if (low > high) {
index = 0;
break;
}
}
return index;
}
pick(pickArgs: PickArgs): PickResult {
trace('Pick called. Hash=' + pickArgs.extraPickInfo.hash);
const firstIndex = this.findIndexForHash(
BigInt(pickArgs.extraPickInfo.hash)
);
for (let i = 0; i < this.ring.length; i++) {
const index = (firstIndex + i) % this.ring.length;
const entryState = this.ring[index].leafBalancer.getConnectivityState();
if (entryState === connectivityState.READY) {
return this.ring[index].leafBalancer.getPicker().pick(pickArgs);
}
if (entryState === connectivityState.IDLE) {
this.ring[index].leafBalancer.startConnecting();
return {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null,
onCallStarted: null,
onCallEnded: null,
};
}
if (entryState === connectivityState.CONNECTING) {
return {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null,
onCallStarted: null,
onCallEnded: null,
};
}
}
return {
pickResultType: PickResultType.TRANSIENT_FAILURE,
status: {
code: status.UNAVAILABLE,
details:
'ring_hash: invalid state: all child balancers in TRANSIENT_FAILURE',
metadata: new Metadata(),
},
subchannel: null,
onCallStarted: null,
onCallEnded: null,
};
}
}
class RingHashLoadBalancer implements LoadBalancer {
/**
* Tracks endpoint repetition across address updates, to use an appropriate
* existing leaf load balancer for the same endpoint when possible.
*/
private leafMap = new EndpointMap<LeafLoadBalancer>();
/**
* Tracks endpoints from a single address update, with their associated
* weights aggregated from all weights associated with that endpoint in that
* update.
*/
private leafWeightMap = new EndpointMap<number>();
private childChannelControlHelper: ChannelControlHelper;
private updatesPaused = false;
private currentState: connectivityState = connectivityState.IDLE;
private ring: RingEntry[] = [];
private ringHashSizeCap = DEFAULT_RING_SIZE_CAP;
constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {
this.childChannelControlHelper = createChildChannelControlHelper(
channelControlHelper,
{
updateState: (state, picker) => {
this.calculateAndUpdateState();
/* If this LB policy is in the TRANSIENT_FAILURE state, requests will
* not trigger new connections, so we need to explicitly try connecting
* to other endpoints that are currently IDLE to try to eventually
* connect to something. */
if (
state === connectivityState.TRANSIENT_FAILURE &&
this.currentState === connectivityState.TRANSIENT_FAILURE
) {
for (const leaf of this.leafMap.values()) {
const leafState = leaf.getConnectivityState();
if (leafState === connectivityState.CONNECTING) {
break;
}
if (leafState === connectivityState.IDLE) {
leaf.startConnecting();
break;
}
}
}
},
}
);
if (options['grpc.lb.ring_hash.ring_size_cap'] !== undefined) {
this.ringHashSizeCap = options['grpc.lb.ring_hash.ring_size_cap'];
}
}
private calculateAndUpdateState() {
if (this.updatesPaused) {
return;
}
const stateCounts = {
[connectivityState.READY]: 0,
[connectivityState.TRANSIENT_FAILURE]: 0,
[connectivityState.CONNECTING]: 0,
[connectivityState.IDLE]: 0,
[connectivityState.SHUTDOWN]: 0,
};
for (const leaf of this.leafMap.values()) {
stateCounts[leaf.getConnectivityState()] += 1;
}
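/* Aggregation order, as implemented below: READY if any leaf is READY; TRANSIENT_FAILURE
 * if two or more leaves are in TRANSIENT_FAILURE; CONNECTING if any leaf is CONNECTING, or
 * if a single leaf is in TRANSIENT_FAILURE while more than one leaf exists; IDLE if any
 * leaf is IDLE; otherwise TRANSIENT_FAILURE. */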
if (stateCounts[connectivityState.READY] > 0) {
this.updateState(connectivityState.READY, new RingHashPicker(this.ring));
// REPORT READY
} else if (stateCounts[connectivityState.TRANSIENT_FAILURE] > 1) {
this.updateState(
connectivityState.TRANSIENT_FAILURE,
new UnavailablePicker()
);
} else if (stateCounts[connectivityState.CONNECTING] > 0) {
this.updateState(
connectivityState.CONNECTING,
new RingHashPicker(this.ring)
);
} else if (
stateCounts[connectivityState.TRANSIENT_FAILURE] > 0 &&
this.leafMap.size > 1
) {
this.updateState(
connectivityState.CONNECTING,
new RingHashPicker(this.ring)
);
} else if (stateCounts[connectivityState.IDLE] > 0) {
this.updateState(connectivityState.IDLE, new RingHashPicker(this.ring));
} else {
this.updateState(
connectivityState.TRANSIENT_FAILURE,
new UnavailablePicker()
);
}
}
private updateState(newState: connectivityState, picker: Picker) {
trace(
connectivityState[this.currentState] +
' -> ' +
connectivityState[newState]
);
this.currentState = newState;
this.channelControlHelper.updateState(newState, picker);
}
private constructRing(
endpointList: Endpoint[],
config: RingHashLoadBalancingConfig
) {
this.ring = [];
const endpointWeights: EndpointWeight[] = [];
let weightSum = 0;
for (const endpoint of endpointList) {
const weight = this.leafWeightMap.get(endpoint) ?? 1;
endpointWeights.push({ endpoint, weight, normalizedWeight: 0 });
weightSum += weight;
}
/* The normalized weights sum to 1, with some small potential error due to
* the limitation of floating point precision. */
let minNormalizedWeight = 1;
for (const endpointWeight of endpointWeights) {
endpointWeight.normalizedWeight = endpointWeight.weight / weightSum;
minNormalizedWeight = Math.min(
endpointWeight.normalizedWeight,
minNormalizedWeight
);
}
const minRingSize = Math.min(config.getMinRingSize(), this.ringHashSizeCap);
const maxRingSize = Math.min(config.getMaxRingSize(), this.ringHashSizeCap);
/* Calculate a scale factor that meets the following conditions:
* 1. The result is between minRingSize and maxRingSize, inclusive
* 2. The smallest normalized weight is scaled to a whole number, if it
* does not violate the previous condition.
* The size of the ring is ceil(scale)
*/
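/* Illustrative example (not part of this change): three endpoints with weights 1, 2, and 3
 * have normalized weights 1/6, 1/3, and 1/2. With the default minRingSize of 1024,
 * ceil((1/6) * 1024) / (1/6) = 171 * 6 = 1026, which is under the default maxRingSize of
 * 4096, so scale is 1026 and the ring gets 1026 entries, roughly 171/342/513 per endpoint. */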
const scale = Math.min(
Math.ceil(minNormalizedWeight * minRingSize) / minNormalizedWeight,
maxRingSize
);
trace('Creating a ring with size ' + Math.ceil(scale));
/* For each endpoint, create a number of entries proportional to its
* weight, such that the total number of entries is equal to ceil(scale).
*/
let currentHashes = 0;
let targetHashes = 0;
for (const endpointWeight of endpointWeights) {
const addressString = subchannelAddressToString(
endpointWeight.endpoint.addresses[0]
);
targetHashes += scale * endpointWeight.normalizedWeight;
const leafBalancer = this.leafMap.get(endpointWeight.endpoint);
if (!leafBalancer) {
throw new Error(
'ring_hash: Invalid state: endpoint found in leafWeightMap but not in leafMap'
);
}
let count = 0;
while (currentHashes < targetHashes) {
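// Each ring position is derived by hashing the endpoint's first address plus a
// per-endpoint counter with 64-bit xxHash (seed 0n).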
const hashKey = `${addressString}_${count}`;
const hash = xxhashApi!.h64(hashKey, 0n);
this.ring.push({ hash, leafBalancer });
currentHashes++;
count++;
}
}
/* The ring is sorted by the hash so that it can be efficiently searched
* for a hash that is closest to any arbitrary hash. */
this.ring.sort((a, b) => {
if (a.hash > b.hash) {
return 1;
} else if (a.hash < b.hash) {
return -1;
} else {
return 0;
}
});
}
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
): void {
if (!(lbConfig instanceof RingHashLoadBalancingConfig)) {
trace('Discarding address update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
}
trace('Received update with config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
this.updatesPaused = true;
this.leafWeightMap.clear();
const dedupedEndpointList: Endpoint[] = [];
for (const endpoint of endpointList) {
const leafBalancer = this.leafMap.get(endpoint);
if (leafBalancer) {
leafBalancer.updateEndpoint(endpoint);
} else {
this.leafMap.set(
endpoint,
new LeafLoadBalancer(endpoint, this.childChannelControlHelper, this.options)
);
}
const weight = this.leafWeightMap.get(endpoint);
if (weight === undefined) {
dedupedEndpointList.push(endpoint);
}
this.leafWeightMap.set(endpoint, (weight ?? 0) + (isLocalityEndpoint(endpoint) ? endpoint.endpointWeight : 1));
}
const removedLeaves = this.leafMap.deleteMissing(endpointList);
for (const leaf of removedLeaves) {
leaf.destroy();
}
loadXxhashApi().then(() => {
this.constructRing(dedupedEndpointList, lbConfig);
this.updatesPaused = false;
this.calculateAndUpdateState();
});
}
exitIdle(): void {
/* This operation does not make sense here. We don't want to make the whole
* balancer exit idle, and instead propagate that to individual children as
* relevant. */
}
resetBackoff(): void {
// There is no backoff to reset here
}
destroy(): void {
this.ring = [];
for (const child of this.leafMap.values()) {
child.destroy();
}
this.leafMap.clear();
this.leafWeightMap.clear();
}
getTypeName(): string {
return TYPE_NAME;
}
}
const RING_HASH_TYPE_URL = 'type.googleapis.com/envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash';
const resourceRoot = loadProtosWithOptionsSync([
'envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto'], {
keepCase: true,
includeDirs: [
// Paths are relative to src/build
__dirname + '/../../deps/envoy-api/',
__dirname + '/../../deps/xds/',
__dirname + '/../../deps/protoc-gen-validate'
],
}
);
const toObjectOptions = {
longs: String,
enums: String,
defaults: true,
oneofs: true
}
function decodeRingHash(message: Any__Output): RingHash__Output {
const name = message.type_url.substring(message.type_url.lastIndexOf('/') + 1);
const type = resourceRoot.lookup(name);
if (type) {
const decodedMessage = (type as any).decode(message.value);
return decodedMessage.$type.toObject(decodedMessage, toObjectOptions) as RingHash__Output;
} else {
throw new Error(`RingHash parsing error: unexpected type URL ${message.type_url}`);
}
}
function convertToLoadBalancingPolicy(protoPolicy: TypedExtensionConfig__Output, selectChildPolicy: (childPolicy: LoadBalancingPolicy__Output) => LoadBalancingConfig): LoadBalancingConfig {
if (protoPolicy.typed_config?.type_url !== RING_HASH_TYPE_URL) {
throw new Error(`Ring Hash LB policy parsing error: unexpected type URL ${protoPolicy.typed_config?.type_url}`);
}
const ringHashMessage = decodeRingHash(protoPolicy.typed_config);
if (ringHashMessage.hash_function !== 'XX_HASH') {
throw new Error(`Ring Hash LB policy parsing error: unexpected hash function ${ringHashMessage.hash_function}`);
}
return {
[TYPE_NAME]: {
min_ring_size: ringHashMessage.minimum_ring_size?.value ?? 1024,
max_ring_size: ringHashMessage.maximum_ring_size?.value ?? 8_388_608
}
};
}
export function setup() {
if (EXPERIMENTAL_RING_HASH) {
registerLoadBalancerType(
TYPE_NAME,
RingHashLoadBalancer,
RingHashLoadBalancingConfig
);
registerLbPolicy(RING_HASH_TYPE_URL, convertToLoadBalancingPolicy);
}
}
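For reference, the JSON shape that RingHashLoadBalancingConfig.createFromJson accepts mirrors what toJsonObject emits, so a service config fragment selecting this policy could look roughly like the following sketch (illustrative values; both fields are optional, and anything above 8,388,608 is rejected):

const ringHashServiceConfig = {
  loadBalancingConfig: [
    {
      ring_hash: {
        min_ring_size: 1024, // defaults to 1024 when omitted
        max_ring_size: 4096  // defaults to 4096 when omitted
      }
    }
  ]
};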

View File

@@ -204,35 +204,7 @@ class XdsClusterManager implements LoadBalancer {
} else {
connectivityState = ConnectivityState.TRANSIENT_FAILURE;
}
/* For each of the states CONNECTING, IDLE, and TRANSIENT_FAILURE, there is
* exactly one corresponding picker, so if the state is one of those and
* that does not change, no new information is provided by passing the
* new state upward. */
if (connectivityState === this.currentState && connectivityState !== ConnectivityState.READY) {
return;
}
let picker: Picker;
switch (connectivityState) {
case ConnectivityState.READY:
picker = new XdsClusterManagerPicker(pickerMap);
break;
case ConnectivityState.CONNECTING:
case ConnectivityState.IDLE:
picker = new QueuePicker(this);
break;
default:
picker = new UnavailablePicker({
code: Status.UNAVAILABLE,
details: 'xds_cluster_manager: all children report state TRANSIENT_FAILURE',
metadata: new Metadata()
});
}
trace(
'Transitioning to ' +
ConnectivityState[connectivityState]
);
this.channelControlHelper.updateState(connectivityState, picker);
this.channelControlHelper.updateState(connectivityState, new XdsClusterManagerPicker(pickerMap));
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {

View File

@@ -115,10 +115,15 @@ class XdsClusterResolverLoadBalancingConfig implements TypedLoadBalancingConfig
}
}
interface WeightedEndpoint {
endpoint: Endpoint;
weight: number;
}
interface LocalityEntry {
locality: Locality__Output;
weight: number;
endpoints: Endpoint[];
endpoints: WeightedEndpoint[];
}
interface PriorityEntry {
@@ -166,16 +171,19 @@ function getEdsPriorities(edsUpdate: ClusterLoadAssignment__Output): PriorityEnt
if (!endpoint.load_balancing_weight) {
continue;
}
const endpoints: Endpoint[] = endpoint.lb_endpoints.filter(lbEndpoint => lbEndpoint.health_status === 'UNKNOWN' || lbEndpoint.health_status === 'HEALTHY').map(
const endpoints: WeightedEndpoint[] = endpoint.lb_endpoints.filter(lbEndpoint => lbEndpoint.health_status === 'UNKNOWN' || lbEndpoint.health_status === 'HEALTHY').map(
(lbEndpoint) => {
/* The validator in the XdsClient class ensures that each endpoint has
* a socket_address with an IP address and a port_value. */
const socketAddress = lbEndpoint.endpoint!.address!.socket_address!;
return {
addresses: [{
host: socketAddress.address!,
port: socketAddress.port_value!,
}]
endpoint: {
addresses: [{
host: socketAddress.address!,
port: socketAddress.port_value!,
}]
},
weight: lbEndpoint.load_balancing_weight?.value ?? 1
};
}
);
@@ -211,7 +219,7 @@ function getDnsPriorities(endpoints: Endpoint[]): PriorityEntry[] {
sub_zone: ''
},
weight: 1,
endpoints: endpoints
endpoints: endpoints.map(endpoint => ({endpoint: endpoint, weight: 1}))
}],
dropCategories: []
}];
@@ -295,15 +303,16 @@ export class XdsClusterResolver implements LoadBalancer {
newPriorityNames[priority] = newPriorityName;
for (const localityObj of priorityEntry.localities) {
for (const endpoint of localityObj.endpoints) {
for (const weightedEndpoint of localityObj.endpoints) {
endpointList.push({
localityPath: [
newPriorityName,
localityToName(localityObj.locality),
],
locality: localityObj.locality,
weight: localityObj.weight,
...endpoint
localityWeight: localityObj.weight,
endpointWeight: localityObj.weight * weightedEndpoint.weight,
...weightedEndpoint.endpoint
});
}
newLocalityPriorities.set(localityToName(localityObj.locality), priority);

View File

@@ -90,7 +90,7 @@ class XdsWrrLocalityLoadBalancer implements LoadBalancer {
if (!(localityName in targets)) {
targets[localityName] = {
child_policy: lbConfig.getChildPolicy(),
weight: address.weight
weight: address.localityWeight
};
}
}

View File

@@ -71,7 +71,7 @@ export class SafeRegexValueMatcher implements ValueMatcher {
const numberRegex = new RE2(/^-?\d+$/u);
export class RangeValueMatcher implements ValueMatcher {
constructor(private start: BigInt, private end: BigInt) {}
constructor(private start: bigint, private end: bigint) {}
apply(value: string) {
if (!numberRegex.test(value)) {
@ -264,4 +264,4 @@ export class FullMatcher implements Matcher {
headers: ${this.headerMatchers.map(matcher => matcher.toString()).join('\n\t')}
fraction: ${this.fraction ? fractionToString(this.fraction): 'none'}`;
}
}
}

View File

@ -34,18 +34,19 @@ import { HeaderMatcher__Output } from './generated/envoy/config/route/v3/HeaderM
import ConfigSelector = experimental.ConfigSelector;
import { ContainsValueMatcher, ExactValueMatcher, FullMatcher, HeaderMatcher, Matcher, PathExactValueMatcher, PathPrefixValueMatcher, PathSafeRegexValueMatcher, PrefixValueMatcher, PresentValueMatcher, RangeValueMatcher, RejectValueMatcher, SafeRegexValueMatcher, SuffixValueMatcher, ValueMatcher } from './matcher';
import { envoyFractionToFraction, Fraction } from "./fraction";
import { RouteAction, SingleClusterRouteAction, WeightedCluster, WeightedClusterRouteAction } from './route-action';
import { HashPolicy, RouteAction, SingleClusterRouteAction, WeightedCluster, WeightedClusterRouteAction } from './route-action';
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from './resources';
import Duration = experimental.Duration;
import { Duration__Output } from './generated/google/protobuf/Duration';
import { createHttpFilter, HttpFilterConfig, parseOverrideFilterConfig, parseTopLevelFilterConfig } from './http-filter';
import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_FEDERATION, EXPERIMENTAL_RETRY } from './environment';
import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_FEDERATION, EXPERIMENTAL_RETRY, EXPERIMENTAL_RING_HASH } from './environment';
import Filter = experimental.Filter;
import FilterFactory = experimental.FilterFactory;
import { BootstrapInfo, loadBootstrapInfo, validateBootstrapConfig } from './xds-bootstrap';
import { ListenerResourceType } from './xds-resource-type/listener-resource-type';
import { RouteConfigurationResourceType } from './xds-resource-type/route-config-resource-type';
import { protoDurationToDuration } from './duration';
import { loadXxhashApi } from './xxhash';
const TRACER_NAME = 'xds_resolver';
@ -381,7 +382,11 @@ class XdsResolver implements Resolver {
}
}
private handleRouteConfig(routeConfig: RouteConfiguration__Output) {
private async handleRouteConfig(routeConfig: RouteConfiguration__Output) {
/* We need to load the xxhash API before this function finishes, because
 * it is used in the config selector, which can be called immediately
 * after this function returns. */
await loadXxhashApi();
this.latestRouteConfig = routeConfig;
/* Select the virtual host using the default authority override if it
* exists, and the channel target otherwise. */
@ -456,6 +461,26 @@ class XdsResolver implements Resolver {
}
}
}
const hashPolicies: HashPolicy[] = [];
if (EXPERIMENTAL_RING_HASH) {
for (const routeHashPolicy of route.route!.hash_policy) {
if (routeHashPolicy.policy_specifier === 'header') {
const headerPolicy = routeHashPolicy.header!;
hashPolicies.push({
type: 'HEADER',
terminal: routeHashPolicy.terminal,
headerName: headerPolicy.header_name,
regex: headerPolicy.regex_rewrite?.pattern ? new RE2(headerPolicy.regex_rewrite.pattern.regex, 'ug') : undefined,
regexSubstitution: headerPolicy.regex_rewrite?.substitution
});
} else if (routeHashPolicy.policy_specifier === 'filter_state' && routeHashPolicy.filter_state!.key === 'io.grpc.channel_id') {
hashPolicies.push({
type: 'CHANNEL_ID',
terminal: routeHashPolicy.terminal
});
}
}
}
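For reference, the loop above consumes RouteAction.HashPolicy entries from the route resource. A header-based policy with a regex rewrite, in the shape this code reads, might look like the following sketch (the header name and pattern are hypothetical):
const exampleRouteAction = {
  cluster: 'cluster1',
  hash_policy: [{
    header: {
      header_name: 'x-user-id',                             // hash on this request header
      regex_rewrite: {
        pattern: { google_re2: {}, regex: '^user-(\\d+)$' },
        substitution: '\\1'                                 // keep only the numeric part
      }
    },
    terminal: true                                          // stop combining once this policy produces a hash
  }]
};
With EXPERIMENTAL_RING_HASH enabled, this parses to a HashPolicy of type 'HEADER' with a compiled RE2 and the substitution string, while a filter_state policy with key 'io.grpc.channel_id' parses to type 'CHANNEL_ID'.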
switch (route.route!.cluster_specifier) {
case 'cluster_header':
continue;
@ -483,7 +508,7 @@ class XdsResolver implements Resolver {
}
}
}
routeAction = new SingleClusterRouteAction(cluster, {name: [], timeout: timeout, retryPolicy: retryPolicy}, extraFilterFactories);
routeAction = new SingleClusterRouteAction(cluster, {name: [], timeout: timeout, retryPolicy: retryPolicy}, extraFilterFactories, hashPolicies);
break;
}
case 'weighted_clusters': {
@ -525,7 +550,7 @@ class XdsResolver implements Resolver {
}
weightedClusters.push({name: clusterWeight.name, weight: clusterWeight.weight?.value ?? 0, dynamicFilterFactories: extraFilterFactories});
}
routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, {name: [], timeout: timeout, retryPolicy: retryPolicy});
routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, {name: [], timeout: timeout, retryPolicy: retryPolicy}, hashPolicies);
break;
}
default:
@ -554,7 +579,7 @@ class XdsResolver implements Resolver {
this.clusterRefcounts.set(name, {inLastConfig: true, refCount: 0});
}
}
const configSelector: ConfigSelector = (methodName, metadata) => {
const configSelector: ConfigSelector = (methodName, metadata, channelId) => {
for (const {matcher, action} of matchList) {
if (matcher.apply(methodName, metadata)) {
const clusterResult = action.getCluster();
@ -562,10 +587,16 @@ class XdsResolver implements Resolver {
const onCommitted = () => {
this.unrefCluster(clusterResult.name);
}
let hash: string;
if (EXPERIMENTAL_RING_HASH) {
hash = `${action.getHash(metadata, channelId)}`;
} else {
hash = '';
}
return {
methodConfig: clusterResult.methodConfig,
onCommitted: onCommitted,
pickInformation: {cluster: clusterResult.name},
pickInformation: {cluster: clusterResult.name, hash: hash},
status: status.OK,
dynamicFilterFactories: clusterResult.dynamicFilterFactories
};
@ -573,8 +604,8 @@ class XdsResolver implements Resolver {
}
return {
methodConfig: {name: []},
// cluster won't be used here, but it's set because of some TypeScript weirdness
pickInformation: {cluster: ''},
// These fields won't be used here, but they're set because of some TypeScript weirdness
pickInformation: {cluster: '', hash: ''},
status: status.UNAVAILABLE,
dynamicFilterFactories: []
};

View File

@ -14,10 +14,12 @@
* limitations under the License.
*/
import { MethodConfig, experimental } from '@grpc/grpc-js';
import { Metadata, MethodConfig, experimental } from '@grpc/grpc-js';
import Duration = experimental.Duration;
import Filter = experimental.Filter;
import FilterFactory = experimental.FilterFactory;
import { RE2 } from 're2-wasm';
import { xxhashApi } from './xxhash';
export interface ClusterResult {
name: string;
@ -28,6 +30,7 @@ export interface ClusterResult {
export interface RouteAction {
toString(): string;
getCluster(): ClusterResult;
getHash(metadata: Metadata, channelId: number): bigint;
}
function durationToLogString(duration: Duration) {
@ -39,8 +42,83 @@ function durationToLogString(duration: Duration) {
}
}
export interface HashPolicy {
type: 'HEADER' | 'CHANNEL_ID';
terminal: boolean;
headerName?: string;
regex?: RE2;
regexSubstitution?: string;
}
/**
 * Computes the request hash from a route's hash policies. Must be called
 * only after xxhash.loadXxhashApi() resolves.
 * @param hashPolicies Hash policies from the matched route, in order
 * @param metadata Outgoing metadata for the call
 * @param channelId The channel's randomly generated ID
*/
function getHash(hashPolicies: HashPolicy[], metadata: Metadata, channelId: number): bigint {
let hash: bigint | null = null;
for (const policy of hashPolicies) {
let newHash: bigint | null = null;
switch (policy.type) {
case 'CHANNEL_ID':
newHash = xxhashApi!.h64(`${channelId}`, 0n);
break;
case 'HEADER': {
if (!policy.headerName) {
break;
}
if (policy.headerName.endsWith('-bin')) {
break;
}
let headerString: string;
if (policy.headerName === 'content-type') {
headerString = 'application/grpc';
} else {
const headerValues = metadata.get(policy.headerName);
if (headerValues.length === 0) {
break;
}
headerString = headerValues.join(',');
}
let rewrittenHeaderString = headerString;
if (policy.regex && policy.regexSubstitution) {
/* The JS string replace method uses $-prefixed patterns to produce
* other strings. See
* https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/String/replace#specifying_a_string_as_the_replacement
* RE2-based regex substitutions use \n where n is a number to refer
* to capture group n, and they otherwise have no special replacement
* patterns. See
* https://github.com/envoyproxy/envoy/blob/2443032526cf6e50d63d35770df9473dd0460fc0/api/envoy/type/matcher/v3/regex.proto#L79-L87
 * We convert an RE2 regex substitution into a string substitution by
 * first replacing each "$" with "$$" (which produces a literal "$" in the
 * output), and then replacing each "\n", for any whole number n, with
 * "$n". */
const regexSubstitution = policy.regexSubstitution.replace(/\$/g, '$$$$').replace(/\\(\d+)/g, '$$$1');
rewrittenHeaderString = headerString.replace(policy.regex, regexSubstitution);
}
newHash = xxhashApi!.h64(rewrittenHeaderString, 0n);
break;
}
}
if (hash === null) {
hash = newHash;
} else if (newHash !== null) {
hash = ((hash << 1n) | (hash >> 63n)) ^ newHash;
}
if (policy.terminal && hash !== null) {
break;
}
}
if (hash === null) {
return xxhashApi!.h64(`${Math.random()}`, 0n);
} else {
return hash;
}
}
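As a worked example of the substitution conversion described in the comment above (the pattern, substitution, and input string are made up for illustration):
// RE2 substitution "\1.example" refers to capture group 1.
const re2Substitution = '\\1.example';
const jsSubstitution = re2Substitution
  .replace(/\$/g, '$$$$')        // "$" -> "$$", so a literal "$" passes through
  .replace(/\\(\d+)/g, '$$$1');  // "\n" -> "$n", the JS capture group reference
// jsSubstitution === '$1.example'
// 'user-42'.replace(/user-(\d+)/g, jsSubstitution) === '42.example'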
export class SingleClusterRouteAction implements RouteAction {
constructor(private cluster: string, private methodConfig: MethodConfig, private extraFilterFactories: FilterFactory<Filter>[]) {}
constructor(private cluster: string, private methodConfig: MethodConfig, private extraFilterFactories: FilterFactory<Filter>[], private hashPolicies: HashPolicy[]) {}
getCluster() {
return {
@ -50,6 +128,10 @@ export class SingleClusterRouteAction implements RouteAction {
};
}
getHash(metadata: Metadata, channelId: number): bigint {
return getHash(this.hashPolicies, metadata, channelId);
}
toString() {
return 'SingleCluster(' + this.cluster + ', ' + JSON.stringify(this.methodConfig) + ')';
}
@ -72,7 +154,7 @@ export class WeightedClusterRouteAction implements RouteAction {
* The weighted cluster choices represented as a CDF
*/
private clusterChoices: ClusterChoice[];
constructor(private clusters: WeightedCluster[], private totalWeight: number, private methodConfig: MethodConfig) {
constructor(private clusters: WeightedCluster[], private totalWeight: number, private methodConfig: MethodConfig, private hashPolicies: HashPolicy[]) {
this.clusterChoices = [];
let lastNumerator = 0;
for (const clusterWeight of clusters) {
@ -96,6 +178,10 @@ export class WeightedClusterRouteAction implements RouteAction {
return {name: '', methodConfig: this.methodConfig, dynamicFilterFactories: []};
}
getHash(metadata: Metadata, channelId: number): bigint {
return getHash(this.hashPolicies, metadata, channelId);
}
toString() {
const clusterListString = this.clusters.map(({name, weight}) => '(' + name + ':' + weight + ')').join(', ')
return 'WeightedCluster(' + clusterListString + ', ' + JSON.stringify(this.methodConfig) + ')';

View File

@ -357,14 +357,14 @@ export function loadBootstrapInfo(): BootstrapInfo {
try {
rawBootstrap = fs.readFileSync(bootstrapPath, { encoding: 'utf8'});
} catch (e) {
throw new Error(`Failed to read xDS bootstrap file from path ${bootstrapPath} with error ${e.message}`);
throw new Error(`Failed to read xDS bootstrap file from path ${bootstrapPath} with error ${(e as Error).message}`);
}
try {
const parsedFile = JSON.parse(rawBootstrap);
loadedBootstrapInfo = validateBootstrapConfig(parsedFile);
return loadedBootstrapInfo;
} catch (e) {
throw new Error(`Failed to parse xDS bootstrap file at path ${bootstrapPath} with error ${e.message}`)
throw new Error(`Failed to parse xDS bootstrap file at path ${bootstrapPath} with error ${(e as Error).message}`)
}
}
@ -383,7 +383,7 @@ export function loadBootstrapInfo(): BootstrapInfo {
loadedBootstrapInfo = validateBootstrapConfig(parsedConfig);
} catch (e) {
throw new Error(
`Failed to parse xDS bootstrap config from environment variable GRPC_XDS_BOOTSTRAP_CONFIG with error ${e.message}`
`Failed to parse xDS bootstrap config from environment variable GRPC_XDS_BOOTSTRAP_CONFIG with error ${(e as Error).message}`
);
}

View File

@ -208,14 +208,14 @@ class AdsResponseParser {
try {
decodeResult = this.result.type.decode(decodeContext, resource);
} catch (e) {
this.result.errors.push(`${errorPrefix} ${e.message}`);
this.result.errors.push(`${errorPrefix} ${(e as Error).message}`);
return;
}
let parsedName: XdsResourceName;
try {
parsedName = parseXdsResourceName(decodeResult.name, this.result.type!.getTypeUrl());
} catch (e) {
this.result.errors.push(`${errorPrefix} ${e.message}`);
this.result.errors.push(`${errorPrefix} ${(e as Error).message}`);
return;
}
this.adsCallState.typeStates.get(this.result.type!)?.subscribedResources.get(parsedName.authority)?.get(parsedName.key)?.markSeen();
@ -250,7 +250,7 @@ class AdsResponseParser {
if (!decodeResult.value) {
return;
}
this.adsCallState.client.trace('Parsed resource of type ' + this.result.type.getTypeUrl() + ': ' + JSON.stringify(decodeResult.value, undefined, 2));
this.adsCallState.client.trace('Parsed resource of type ' + this.result.type.getTypeUrl() + ': ' + JSON.stringify(decodeResult.value, (key, value) => (value && value.type === 'Buffer' && Array.isArray(value.data)) ? (value.data as Number[]).map(n => n.toString(16)).join('') : value, 2));
this.result.haveValidResources = true;
if (this.result.type.resourcesEqual(resourceState.cachedResource, decodeResult.value)) {
return;

View File

@ -21,7 +21,7 @@ import { LoadBalancingConfig, experimental, logVerbosity } from "@grpc/grpc-js";
import { XdsServerConfig } from "../xds-bootstrap";
import { Duration__Output } from "../generated/google/protobuf/Duration";
import { OutlierDetection__Output } from "../generated/envoy/config/cluster/v3/OutlierDetection";
import { EXPERIMENTAL_CUSTOM_LB_CONFIG, EXPERIMENTAL_OUTLIER_DETECTION } from "../environment";
import { EXPERIMENTAL_CUSTOM_LB_CONFIG, EXPERIMENTAL_OUTLIER_DETECTION, EXPERIMENTAL_RING_HASH } from "../environment";
import { Cluster__Output } from "../generated/envoy/config/cluster/v3/Cluster";
import { UInt32Value__Output } from "../generated/google/protobuf/UInt32Value";
import { Any__Output } from "../generated/google/protobuf/Any";
@ -150,6 +150,24 @@ export class ClusterResourceType extends XdsResourceType {
child_policy: [{round_robin: {}}]
}
};
} else if(EXPERIMENTAL_RING_HASH && message.lb_policy === 'RING_HASH') {
if (message.ring_hash_lb_config && message.ring_hash_lb_config.hash_function !== 'XX_HASH') {
return null;
}
const minRingSize = message.ring_hash_lb_config?.minimum_ring_size ? Number(message.ring_hash_lb_config.minimum_ring_size.value) : 1024;
if (minRingSize > 8_388_608) {
return null;
}
const maxRingSize = message.ring_hash_lb_config?.maximum_ring_size ? Number(message.ring_hash_lb_config.maximum_ring_size.value) : 8_388_608;
if (maxRingSize > 8_388_608) {
return null;
}
lbPolicyConfig = {
ring_hash: {
min_ring_size: minRingSize,
max_ring_size: maxRingSize
}
};
} else {
return null;
}
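As a concrete data point for this branch: a cluster that sets lb_policy to RING_HASH with no ring_hash_lb_config passes validation and gets the default ring bounds shown above, i.e.:
const defaultRingHashConfig = {
  ring_hash: {
    min_ring_size: 1024,      // default when minimum_ring_size is unset
    max_ring_size: 8_388_608  // default when maximum_ring_size is unset; also the validation cap
  }
};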
@ -264,6 +282,7 @@ export class ClusterResourceType extends XdsResourceType {
);
}
const message = decodeSingleResource(CDS_TYPE_URL, resource.value);
trace('Decoded raw resource of type ' + CDS_TYPE_URL + ': ' + JSON.stringify(message, undefined, 2));
const validatedMessage = this.validateResource(context, message);
if (validatedMessage) {
return {

View File

@ -101,6 +101,7 @@ export class EndpointResourceType extends XdsResourceType {
);
}
const message = decodeSingleResource(EDS_TYPE_URL, resource.value);
trace('Decoded raw resource of type ' + EDS_TYPE_URL + ': ' + JSON.stringify(message, undefined, 2));
const validatedMessage = this.validateResource(message);
if (validatedMessage) {
return {

View File

@ -106,6 +106,7 @@ export class ListenerResourceType extends XdsResourceType {
);
}
const message = decodeSingleResource(LDS_TYPE_URL, resource.value);
trace('Decoded raw resource of type ' + LDS_TYPE_URL + ': ' + JSON.stringify(message, (key, value) => (value && value.type === 'Buffer' && Array.isArray(value.data)) ? (value.data as Number[]).map(n => n.toString(16)).join('') : value, 2));
const validatedMessage = this.validateResource(message);
if (validatedMessage) {
return {

View File

@ -15,6 +15,7 @@
*
*/
import { experimental, logVerbosity } from "@grpc/grpc-js";
import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_RETRY } from "../environment";
import { RetryPolicy__Output } from "../generated/envoy/config/route/v3/RetryPolicy";
import { RouteConfiguration__Output } from "../generated/envoy/config/route/v3/RouteConfiguration";
@ -24,6 +25,11 @@ import { validateOverrideFilter } from "../http-filter";
import { RDS_TYPE_URL, decodeSingleResource } from "../resources";
import { Watcher, XdsClient } from "../xds-client";
import { XdsDecodeContext, XdsDecodeResult, XdsResourceType } from "./xds-resource-type";
const TRACER_NAME = 'xds_client';
function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
const SUPPORTED_PATH_SPECIFIERS = ['prefix', 'path', 'safe_regex'];
const SUPPPORTED_HEADER_MATCH_SPECIFIERS = [
@ -169,6 +175,7 @@ export class RouteConfigurationResourceType extends XdsResourceType {
);
}
const message = decodeSingleResource(RDS_TYPE_URL, resource.value);
trace('Decoded raw resource of type ' + RDS_TYPE_URL + ': ' + JSON.stringify(message, undefined, 2));
const validatedMessage = this.validateResource(message);
if (validatedMessage) {
return {

View File

@ -0,0 +1,31 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/* The simpler `import xxhash from 'xxhash-wasm';` doesn't compile correctly
* to CommonJS require calls for some reason, so we use this import to get
* the type, and then an explicit require call to get the actual value. */
import xxhashImport from 'xxhash-wasm';
const xxhash: typeof xxhashImport = require('xxhash-wasm');
export let xxhashApi: Awaited<ReturnType<typeof xxhash>> | null = null;
export async function loadXxhashApi() {
if (!xxhashApi) {
xxhashApi = await xxhash();
}
return xxhashApi;
}
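A usage sketch for this helper, assuming a caller inside the same package: await loadXxhashApi() once, then use the cached API synchronously, as the config selector does.
import { loadXxhashApi, xxhashApi } from './xxhash';
async function hashKey(key: string): Promise<bigint> {
  await loadXxhashApi();           // loads the wasm module on first use, then caches it
  return xxhashApi!.h64(key, 0n);  // 64-bit xxHash of the key with seed 0
}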

View File

@ -70,7 +70,7 @@ export interface FakeCluster {
}
export class FakeEdsCluster implements FakeCluster {
constructor(private clusterName: string, private endpointName: string, private endpoints: Endpoint[], private loadBalancingPolicyOverride?: Any) {}
constructor(private clusterName: string, private endpointName: string, private endpoints: Endpoint[], private loadBalancingPolicyOverride?: Any | 'RING_HASH') {}
getEndpointConfig(): ClusterLoadAssignment {
return {
@ -94,7 +94,9 @@ export class FakeEdsCluster implements FakeCluster {
]
}
};
if (this.loadBalancingPolicyOverride) {
if (this.loadBalancingPolicyOverride === 'RING_HASH') {
result.lb_policy = 'RING_HASH';
} else if (this.loadBalancingPolicyOverride) {
result.load_balancing_policy = {
policies: [
{
@ -257,8 +259,14 @@ function createRouteConfig(route: FakeRoute): Route {
prefix: ''
},
route: {
cluster: route.cluster.getName()
}
cluster: route.cluster.getName(),
// Hash on the channel ID by default, so every request from one test client goes to the same backend
hash_policy: [{
filter_state: {
key: 'io.grpc.channel_id'
}
}]
},
};
} else {
return {
@ -271,7 +279,13 @@ function createRouteConfig(route: FakeRoute): Route {
name: clusterWeight.cluster.getName(),
weight: {value: clusterWeight.weight}
}))
}
},
// Hash on the channel ID by default, so every request from one test client goes to the same backend
hash_policy: [{
filter_state: {
key: 'io.grpc.channel_id'
}
}]
}
}
}

View File

@ -19,6 +19,7 @@ import { experimental, LoadBalancingConfig } from "@grpc/grpc-js";
import { register } from "../src";
import assert = require("assert");
import parseLoadbalancingConfig = experimental.parseLoadBalancingConfig;
import { EXPERIMENTAL_RING_HASH } from "../src/environment";
register();
@ -34,6 +35,7 @@ interface TestCase {
input: object,
output?: object;
error?: RegExp;
skipIf?: boolean;
}
/* The main purpose of these tests is to verify that configs that are expected
@ -311,6 +313,41 @@ const allTestCases: {[lbPolicyName: string]: TestCase[]} = {
}
}
}
],
ring_hash: [
{
name: 'empty config',
input: {},
output: {
min_ring_size: 1024,
max_ring_size: 4096
},
skipIf: !EXPERIMENTAL_RING_HASH
},
{
name: 'populated config',
input: {
min_ring_size: 2048,
max_ring_size: 8192
},
skipIf: !EXPERIMENTAL_RING_HASH
},
{
name: 'min_ring_size too large',
input: {
min_ring_size: 8_388_609
},
error: /min_ring_size/,
skipIf: !EXPERIMENTAL_RING_HASH
},
{
name: 'max_ring_size too large',
input: {
max_ring_size: 8_388_609
},
error: /max_ring_size/,
skipIf: !EXPERIMENTAL_RING_HASH
}
]
}
@ -318,7 +355,10 @@ describe('Load balancing policy config parsing', () => {
for (const [lbPolicyName, testCases] of Object.entries(allTestCases)) {
describe(lbPolicyName, () => {
for (const testCase of testCases) {
it(testCase.name, () => {
it(testCase.name, function() {
if (testCase.skipIf) {
this.skip();
}
const lbConfigInput = {[lbPolicyName]: testCase.input};
if (testCase.error) {
assert.throws(() => {

View File

@ -0,0 +1,173 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { Backend } from "./backend";
import { XdsTestClient } from "./client";
import { FakeEdsCluster, FakeRouteGroup } from "./framework";
import { XdsServer } from "./xds-server";
import { register } from "../src";
import assert = require("assert");
import { Any } from "../src/generated/google/protobuf/Any";
import { AnyExtension } from "@grpc/proto-loader";
import { RingHash } from "../src/generated/envoy/extensions/load_balancing_policies/ring_hash/v3/RingHash";
import { EXPERIMENTAL_RING_HASH } from "../src/environment";
register();
describe('Ring hash LB policy', () => {
let xdsServer: XdsServer;
let client: XdsTestClient;
beforeEach(done => {
xdsServer = new XdsServer();
xdsServer.startServer(error => {
done(error);
});
});
afterEach(() => {
client?.close();
xdsServer?.shutdownServer();
});
it('Should route requests to the single backend with the old lbPolicy field', function(done) {
if (!EXPERIMENTAL_RING_HASH) {
this.skip();
}
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}], 'RING_HASH');
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendOneCall(done);
}, reason => done(reason));
});
it('Should route requests to the single backend with the new load_balancing_policy field', function(done) {
if (!EXPERIMENTAL_RING_HASH) {
this.skip();
}
const lbPolicy: AnyExtension & RingHash = {
'@type': 'type.googleapis.com/envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash',
hash_function: 'XX_HASH'
};
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}], lbPolicy);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendOneCall(done);
}, reason => done(reason));
});
it('Should route all identical requests to the same backend', function(done) {
if (!EXPERIMENTAL_RING_HASH) {
this.skip();
}
const backend1 = new Backend();
const backend2 = new Backend()
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend1, backend2], locality:{region: 'region1'}}], 'RING_HASH');
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendNCalls(10, error => {
assert.ifError(error);
assert((backend1.getCallCount() === 0) !== (backend2.getCallCount() === 0));
done();
})
}, reason => done(reason));
});
it('Should fallback to a second backend if the first one goes down', function(done) {
if (!EXPERIMENTAL_RING_HASH) {
this.skip();
}
const backends = [new Backend(), new Backend(), new Backend()];
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: backends, locality:{region: 'region1'}}], 'RING_HASH');
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.sendNCalls(100, error => {
assert.ifError(error);
let backendWithTraffic: number | null = null;
for (let i = 0; i < backends.length; i++) {
if (backendWithTraffic === null) {
if (backends[i].getCallCount() > 0) {
backendWithTraffic = i;
}
} else {
assert.strictEqual(backends[i].getCallCount(), 0, `Backends ${backendWithTraffic} and ${i} both got traffic`);
}
}
assert.notStrictEqual(backendWithTraffic, null, 'No backend got traffic');
backends[backendWithTraffic!].shutdown(error => {
assert.ifError(error);
backends[backendWithTraffic!].resetCallCount();
client.sendNCalls(100, error => {
assert.ifError(error);
let backendWithTraffic2: number | null = null;
for (let i = 0; i < backends.length; i++) {
if (backendWithTraffic2 === null) {
if (backends[i].getCallCount() > 0) {
backendWithTraffic2 = i;
}
} else {
assert.strictEqual(backends[i].getCallCount(), 0, `Backends ${backendWithTraffic2} and ${i} both got traffic`);
}
}
assert.notStrictEqual(backendWithTraffic2, null, 'No backend got traffic');
assert.notStrictEqual(backendWithTraffic2, backendWithTraffic, `Traffic went to the same backend ${backendWithTraffic} after shutdown`);
done();
});
});
});
}, reason => done(reason));
})
});

View File

@ -44,6 +44,7 @@ const loadedProtos = loadPackageDefinition(loadSync(
'envoy/extensions/clusters/aggregate/v3/cluster.proto',
'envoy/extensions/load_balancing_policies/round_robin/v3/round_robin.proto',
'envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto',
'envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto',
'xds/type/v3/typed_struct.proto'
],
{

View File

@ -3,8 +3,8 @@
"compilerOptions": {
"rootDir": ".",
"outDir": "build",
"target": "es2017",
"lib": ["es2017"],
"target": "es2020",
"lib": ["es2020"],
"module": "commonjs",
"incremental": true
},

View File

@ -61,6 +61,7 @@ export interface ChannelOptions {
* Set the enableTrace option in TLS clients and servers
*/
'grpc-node.tls_enable_trace'?: number;
'grpc.lb.ring_hash.ring_size_cap'?: number;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[key: string]: any;
}
@ -96,6 +97,7 @@ export const recognizedOptions = {
'grpc.service_config_disable_resolution': true,
'grpc.client_idle_timeout_ms': true,
'grpc-node.tls_enable_trace': true,
'grpc.lb.ring_hash.ring_size_cap': true,
};
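The new grpc.lb.ring_hash.ring_size_cap option can be passed like any other channel option when constructing a client; a hedged sketch (the target and cap value are hypothetical, and the xDS plugin must be registered for a ring_hash config to be in effect):
import * as grpc from '@grpc/grpc-js';
// Caps the ring_hash ring size for this channel at 4096 entries.
const client = new grpc.Client(
  'xds:///example-service',
  grpc.credentials.createInsecure(),
  { 'grpc.lb.ring_hash.ring_size_cap': 4096 }
);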
export function channelOptionsEqual(

View File

@ -26,6 +26,7 @@ export {
Endpoint,
endpointToString,
endpointHasAddress,
EndpointMap,
} from './subchannel-address';
export { ChildLoadBalancerHandler } from './load-balancer-child-handler';
export {

View File

@ -193,6 +193,15 @@ export class InternalChannel {
private readonly callTracker = new ChannelzCallTracker();
private readonly childrenTracker = new ChannelzChildrenTracker();
/**
* Randomly generated ID to be passed to the config selector, for use by
* ring_hash in xDS. An integer distributed approximately uniformly between
* 0 and MAX_SAFE_INTEGER.
*/
private readonly randomChannelId = Math.floor(
Math.random() * Number.MAX_SAFE_INTEGER
);
constructor(
target: string,
private readonly credentials: ChannelCredentials,
@ -528,7 +537,7 @@ export class InternalChannel {
if (this.configSelector) {
return {
type: 'SUCCESS',
config: this.configSelector(method, metadata),
config: this.configSelector(method, metadata, this.randomChannelId),
};
} else {
if (this.currentResolutionError) {

View File

@ -33,10 +33,9 @@ import { ChildLoadBalancerHandler } from './load-balancer-child-handler';
import { PickArgs, Picker, PickResult, PickResultType } from './picker';
import {
Endpoint,
EndpointMap,
SubchannelAddress,
endpointHasAddress,
endpointToString,
subchannelAddressEqual,
} from './subchannel-address';
import {
BaseSubchannelWrapper,
@ -461,126 +460,9 @@ interface MapEntry {
subchannelWrappers: OutlierDetectionSubchannelWrapper[];
}
interface EndpointMapEntry {
key: Endpoint;
value: MapEntry;
}
function endpointEqualUnordered(
endpoint1: Endpoint,
endpoint2: Endpoint
): boolean {
if (endpoint1.addresses.length !== endpoint2.addresses.length) {
return false;
}
for (const address1 of endpoint1.addresses) {
let matchFound = false;
for (const address2 of endpoint2.addresses) {
if (subchannelAddressEqual(address1, address2)) {
matchFound = true;
break;
}
}
if (!matchFound) {
return false;
}
}
return true;
}
class EndpointMap {
private map: Set<EndpointMapEntry> = new Set();
get size() {
return this.map.size;
}
getForSubchannelAddress(address: SubchannelAddress): MapEntry | undefined {
for (const entry of this.map) {
if (endpointHasAddress(entry.key, address)) {
return entry.value;
}
}
return undefined;
}
/**
* Delete any entries in this map with keys that are not in endpoints
* @param endpoints
*/
deleteMissing(endpoints: Endpoint[]) {
for (const entry of this.map) {
let foundEntry = false;
for (const endpoint of endpoints) {
if (endpointEqualUnordered(endpoint, entry.key)) {
foundEntry = true;
}
}
if (!foundEntry) {
this.map.delete(entry);
}
}
}
get(endpoint: Endpoint): MapEntry | undefined {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
return entry.value;
}
}
return undefined;
}
set(endpoint: Endpoint, mapEntry: MapEntry) {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
entry.value = mapEntry;
return;
}
}
this.map.add({ key: endpoint, value: mapEntry });
}
delete(endpoint: Endpoint) {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
this.map.delete(entry);
return;
}
}
}
has(endpoint: Endpoint): boolean {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
return true;
}
}
return false;
}
*keys(): IterableIterator<Endpoint> {
for (const entry of this.map) {
yield entry.key;
}
}
*values(): IterableIterator<MapEntry> {
for (const entry of this.map) {
yield entry.value;
}
}
*entries(): IterableIterator<[Endpoint, MapEntry]> {
for (const entry of this.map) {
yield [entry.key, entry.value];
}
}
}
export class OutlierDetectionLoadBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;
private entryMap = new EndpointMap();
private entryMap = new EndpointMap<MapEntry>();
private latestConfig: OutlierDetectionLoadBalancingConfig | null = null;
private ejectionTimer: NodeJS.Timeout;
private timerStartTime: Date | null = null;

View File

@ -541,6 +541,19 @@ export class LeafLoadBalancer {
this.pickFirstBalancer.updateAddressList([this.endpoint], LEAF_CONFIG);
}
/**
* Update the endpoint associated with this LeafLoadBalancer to a new
* endpoint. Does not trigger connection establishment if a connection
* attempt is not already in progress.
* @param newEndpoint
*/
updateEndpoint(newEndpoint: Endpoint) {
this.endpoint = newEndpoint;
if (this.latestState !== ConnectivityState.IDLE) {
this.startConnecting();
}
}
getConnectivityState() {
return this.latestState;
}

View File

@ -122,25 +122,34 @@ export class UnavailablePicker implements Picker {
* indicating that the pick should be tried again with the next `Picker`. Also
* reports back to the load balancer that a connection should be established
* once any pick is attempted.
 * If a childPicker is provided, pick delegates to it instead of returning the
 * hardcoded QUEUE pick result, but it still calls exitIdle.
*/
export class QueuePicker {
private calledExitIdle = false;
// Constructed with a load balancer. Calls exitIdle on it the first time pick is called
constructor(private loadBalancer: LoadBalancer) {}
constructor(
private loadBalancer: LoadBalancer,
private childPicker?: Picker
) {}
pick(pickArgs: PickArgs): QueuePickResult {
pick(pickArgs: PickArgs): PickResult {
if (!this.calledExitIdle) {
process.nextTick(() => {
this.loadBalancer.exitIdle();
});
this.calledExitIdle = true;
}
return {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null,
onCallStarted: null,
onCallEnded: null,
};
if (this.childPicker) {
return this.childPicker.pick(pickArgs);
} else {
return {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null,
onCallStarted: null,
onCallEnded: null,
};
}
}
}

View File

@ -37,7 +37,7 @@ export interface CallConfig {
* https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md#new-functionality-in-grpc
*/
export interface ConfigSelector {
(methodName: string, metadata: Metadata): CallConfig;
(methodName: string, metadata: Metadata, channelId: number): CallConfig;
}
/**

View File

@ -279,7 +279,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
);
// Ensure that this.exitIdle() is called by the picker
if (connectivityState === ConnectivityState.IDLE) {
picker = new QueuePicker(this);
picker = new QueuePicker(this, picker);
}
this.currentState = connectivityState;
this.channelControlHelper.updateState(connectivityState, picker);

View File

@ -122,3 +122,127 @@ export function endpointHasAddress(
}
return false;
}
interface EndpointMapEntry<ValueType> {
key: Endpoint;
value: ValueType;
}
function endpointEqualUnordered(
endpoint1: Endpoint,
endpoint2: Endpoint
): boolean {
if (endpoint1.addresses.length !== endpoint2.addresses.length) {
return false;
}
for (const address1 of endpoint1.addresses) {
let matchFound = false;
for (const address2 of endpoint2.addresses) {
if (subchannelAddressEqual(address1, address2)) {
matchFound = true;
break;
}
}
if (!matchFound) {
return false;
}
}
return true;
}
export class EndpointMap<ValueType> {
private map: Set<EndpointMapEntry<ValueType>> = new Set();
get size() {
return this.map.size;
}
getForSubchannelAddress(address: SubchannelAddress): ValueType | undefined {
for (const entry of this.map) {
if (endpointHasAddress(entry.key, address)) {
return entry.value;
}
}
return undefined;
}
/**
 * Delete any entries in this map with keys that are not in endpoints, and
 * return the values of the deleted entries.
 * @param endpoints The endpoints whose map entries should be kept
*/
deleteMissing(endpoints: Endpoint[]): ValueType[] {
const removedValues: ValueType[] = [];
for (const entry of this.map) {
let foundEntry = false;
for (const endpoint of endpoints) {
if (endpointEqualUnordered(endpoint, entry.key)) {
foundEntry = true;
}
}
if (!foundEntry) {
removedValues.push(entry.value);
this.map.delete(entry);
}
}
return removedValues;
}
get(endpoint: Endpoint): ValueType | undefined {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
return entry.value;
}
}
return undefined;
}
set(endpoint: Endpoint, mapEntry: ValueType) {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
entry.value = mapEntry;
return;
}
}
this.map.add({ key: endpoint, value: mapEntry });
}
delete(endpoint: Endpoint) {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
this.map.delete(entry);
return;
}
}
}
has(endpoint: Endpoint): boolean {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
return true;
}
}
return false;
}
clear() {
this.map.clear();
}
*keys(): IterableIterator<Endpoint> {
for (const entry of this.map) {
yield entry.key;
}
}
*values(): IterableIterator<ValueType> {
for (const entry of this.map) {
yield entry.value;
}
}
*entries(): IterableIterator<[Endpoint, ValueType]> {
for (const entry of this.map) {
yield [entry.key, entry.value];
}
}
}
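A brief illustration of this generic map's semantics, with hypothetical addresses: keys are compared by unordered address equality, and deleteMissing now reports the values it removed.
const map = new EndpointMap<string>();
const endpoint = { addresses: [{ host: '10.0.0.1', port: 443 }, { host: '10.0.0.2', port: 443 }] };
map.set(endpoint, 'entry');
// The same addresses in a different order find the same entry.
const reordered = { addresses: [{ host: '10.0.0.2', port: 443 }, { host: '10.0.0.1', port: 443 }] };
map.get(reordered);       // 'entry'
// Entries whose keys are absent from the argument are removed and their values returned.
map.deleteMissing([]);    // ['entry']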