Compare commits

...

27 Commits

Author SHA1 Message Date
Michael Lumish a7dfb681b1
Merge pull request #3003 from murgatroid99/grpc-js-xds_register_wrr_locality
grpc-js-xds: Add wrr_locality to LB policy registry
2025-09-03 15:17:26 -07:00
Michael Lumish 296bd2fb6f grpc-js-xds: Add wrr_locality to LB policy registry 2025-09-03 13:58:42 -07:00
Michael Lumish cbece4238b
Merge pull request #3002 from murgatroid99/grpc-js-xds_interop_fix
grpc-js-xds: interop: Fix order of submodule update and npm install
2025-08-29 11:21:15 -07:00
Michael Lumish 196ceaadb2 grpc-js-xds: interop: Fix order of submodule update and npm install 2025-08-29 10:45:44 -07:00
Michael Lumish 5ceac29f3a
Merge pull request #2962 from murgatroid99/grpc-js-xds_light_test_job
grpc-js-xds: Add psm-light test job config
2025-08-27 15:41:03 -07:00
Michael Lumish 9d601da651
Merge pull request #3001 from murgatroid99/grpc-js-xds_wrr
grpc-js-xds: Register weighted_round_robin as a custom LB policy
2025-08-27 10:54:52 -07:00
Michael Lumish c67f7183b2 grpc-js: Properly unwrap subchannel in WRR picker 2025-08-22 15:44:58 -07:00
Michael Lumish 1ec5996769 Merge branch 'master' into grpc-js-xds_wrr 2025-08-22 15:15:39 -07:00
Michael Lumish f957004b07 grpc-js-xds: XdsClusterImpl: fix handling of non-complete pick result type 2025-08-22 14:40:06 -07:00
Michael Lumish 2121631243 grpc-js-xds: Register weighted_round_robin as a custom LB policy 2025-08-22 13:56:15 -07:00
Michael Lumish 479fa71321
Merge pull request #3000 from murgatroid99/grpc-js-xds_test_fix
grpc-js-xds: Remove .only from a test, switch port finding library
2025-08-19 12:57:47 -07:00
Michael Lumish 202648b334
Merge pull request #2999 from murgatroid99/grpc-js_client_oob_metrics_rework
grpc-js: Move client OOB metrics API to the subchannel
2025-08-18 14:55:00 -07:00
Michael Lumish 211be0f3c2 grpc-js-xds: Remove .only from a test, switch port finding library 2025-08-18 14:33:35 -07:00
Michael Lumish e975b856b9 Loosen test tolerance a little 2025-08-18 12:56:11 -07:00
Michael Lumish f0d3f81fec grpc-js: Move client OOB metrics API to the subchannel 2025-08-18 11:27:04 -07:00
Michael Lumish 1aaa3fde33
Merge pull request #2998 from murgatroid99/grpc-js_client_weighted_round_robin
grpc-js: Implement weighted_round_robin LB policy
2025-08-14 15:58:45 -07:00
Michael Lumish 2f74b88616 Give more leeway on different QPS tests 2025-08-14 14:47:43 -07:00
Michael Lumish e78322f717 Fix an issue with the test 2025-08-14 14:14:12 -07:00
Michael Lumish 13065ad5a1 grpc-js: Implement weighted_round_robin LB policy 2025-08-14 13:55:02 -07:00
Michael Lumish 6573a0ebe4
Merge pull request #2971 from KoenRijpstra/fix/early-connection-window-update
Fix: send connection-level WINDOW_UPDATE at session start
2025-08-11 15:54:59 -07:00
Koen Rijpstra a8d22dc906 fix(http2): set default initial window size to 65535 if not specified 2025-08-10 09:02:07 +02:00
Michael Lumish 83ece61c88 grpc-js: Implement weighted round robin 2025-08-06 10:50:05 -07:00
Koen Rijpstra 0e09b9cd59 fix(http2): move WINDOW_UPDATE handling to remoteSettings event 2025-06-27 21:38:50 +02:00
Koen Rijpstra b69bcad1bb fix(http2): rename connection flow control window option to grpc-node.flow_control_window 2025-06-27 15:11:56 +02:00
Koen Rijpstra aeb7a5fd52 fix(http2): handle default initial window size correctly 2025-06-27 14:35:21 +02:00
Koen Rijpstra d872278606 fix(http2): bump connection window immediately after connect()
If the user sets `grpc-node.connection_flow_control_window` to a value
> 65 535 B, we now send a WINDOW_UPDATE (or setLocalWindowSize) right
after `http2.connect()` returns. This removes the 65 KB start-window
stall that caused large initial backlogs on high-throughput streams,
especially when an H2 proxy (e.g. HAProxy) sat between client and
server. Behaviour now matches Go/Rust gRPC; no API changes.
2025-06-27 14:09:01 +02:00
Michael Lumish f8a801bcbe grpc-js-xds: Add psm-light test job config 2025-06-09 15:09:55 -07:00
43 changed files with 2144 additions and 286 deletions

View File

@ -72,6 +72,7 @@ const runTests = checkTask(() => {
process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION = 'true';
process.env.GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG = 'true';
process.env.GRPC_XDS_EXPERIMENTAL_RBAC = 'true';
process.env.GRPC_EXPERIMENTAL_XDS_WRR_LB = 'true';
if (Number(process.versions.node.split('.')[0]) <= 14) {
process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH = 'false';
}

View File

@ -12,7 +12,7 @@
"prepare": "npm run generate-types && npm run generate-interop-types && npm run generate-test-types && npm run compile",
"pretest": "npm run compile",
"posttest": "npm run check",
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs deps/envoy-api/ deps/xds/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib @grpc/grpc-js envoy/service/discovery/v3/ads.proto envoy/service/load_stats/v3/lrs.proto envoy/config/listener/v3/listener.proto envoy/config/route/v3/route.proto envoy/config/cluster/v3/cluster.proto envoy/config/endpoint/v3/endpoint.proto envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto udpa/type/v1/typed_struct.proto xds/type/v3/typed_struct.proto envoy/extensions/filters/http/fault/v3/fault.proto envoy/service/status/v3/csds.proto envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto envoy/extensions/load_balancing_policies/pick_first/v3/pick_first.proto envoy/extensions/clusters/aggregate/v3/cluster.proto envoy/extensions/transport_sockets/tls/v3/tls.proto envoy/config/rbac/v3/rbac.proto envoy/extensions/filters/http/rbac/v3/rbac.proto",
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs deps/envoy-api/ deps/xds/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib @grpc/grpc-js envoy/service/discovery/v3/ads.proto envoy/service/load_stats/v3/lrs.proto envoy/config/listener/v3/listener.proto envoy/config/route/v3/route.proto envoy/config/cluster/v3/cluster.proto envoy/config/endpoint/v3/endpoint.proto envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto udpa/type/v1/typed_struct.proto xds/type/v3/typed_struct.proto envoy/extensions/filters/http/fault/v3/fault.proto envoy/service/status/v3/csds.proto envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto envoy/extensions/load_balancing_policies/pick_first/v3/pick_first.proto envoy/extensions/clusters/aggregate/v3/cluster.proto envoy/extensions/transport_sockets/tls/v3/tls.proto envoy/config/rbac/v3/rbac.proto envoy/extensions/filters/http/rbac/v3/rbac.proto envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3/client_side_weighted_round_robin.proto envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto",
"generate-interop-types": "proto-loader-gen-types --keep-case --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs proto/ -O interop/generated --grpcLib @grpc/grpc-js grpc/testing/test.proto",
"generate-test-types": "proto-loader-gen-types --keep-case --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs proto/ -O test/generated --grpcLib @grpc/grpc-js grpc/testing/echo.proto"
},
@ -34,16 +34,16 @@
"devDependencies": {
"@grpc/grpc-js": "file:../grpc-js",
"@grpc/proto-loader": "file:../proto-loader",
"@grpc/reflection": "file:../grpc-reflection",
"@types/gulp": "^4.0.6",
"@types/gulp-mocha": "0.0.32",
"@types/mocha": "^5.2.6",
"@types/node": ">=20.11.20",
"@grpc/reflection": "file:../grpc-reflection",
"@types/yargs": "^15.0.5",
"find-free-ports": "^3.1.1",
"grpc-health-check": "file:../grpc-health-check",
"gts": "^5.0.1",
"ncp": "^2.0.0",
"portfinder": "^1.0.37",
"typescript": "^5.1.3",
"yargs": "^15.4.1"
},

View File

@ -34,6 +34,9 @@ echo "source $NVM_DIR/nvm.sh" > ~/.profile
echo "source $NVM_DIR/nvm.sh" > ~/.shrc
export ENV=~/.shrc
cd $base
git submodule update --init --recursive
cd $base/../proto-loader
npm install
@ -47,8 +50,6 @@ cd $base/../grpc-reflection
npm install
# grpc-js-xds has a dev dependency on "../grpc-js", so it should pull that in automatically
cd $base
git submodule update --init --recursive
npm install
cd ../../..

View File

@ -28,3 +28,4 @@ export const EXPERIMENTAL_PICK_FIRST = (process.env.GRPC_EXPERIMENTAL_PICKFIRST_
export const EXPERIMENTAL_DUALSTACK_ENDPOINTS = (process.env.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS ?? 'true') === 'true';
export const AGGREGATE_CLUSTER_BACKWARDS_COMPAT = (process.env.GRPC_XDS_AGGREGATE_CLUSTER_BACKWARD_COMPAT ?? 'false') === 'true';
export const EXPERIMENTAL_RBAC = (process.env.GRPC_XDS_EXPERIMENTAL_RBAC ?? 'false') === 'true';
export const EXPERIMENTAL_WRR_LB = (process.env.GRPC_EXPERIMENTAL_XDS_WRR_LB ?? 'false') === 'true';

View File

@ -0,0 +1,153 @@
import type * as grpc from '@grpc/grpc-js';
import type { EnumTypeDefinition, MessageTypeDefinition } from '@grpc/proto-loader';
import type { ClientSideWeightedRoundRobin as _envoy_extensions_load_balancing_policies_client_side_weighted_round_robin_v3_ClientSideWeightedRoundRobin, ClientSideWeightedRoundRobin__Output as _envoy_extensions_load_balancing_policies_client_side_weighted_round_robin_v3_ClientSideWeightedRoundRobin__Output } from './envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3/ClientSideWeightedRoundRobin';
import type { BoolValue as _google_protobuf_BoolValue, BoolValue__Output as _google_protobuf_BoolValue__Output } from './google/protobuf/BoolValue';
import type { BytesValue as _google_protobuf_BytesValue, BytesValue__Output as _google_protobuf_BytesValue__Output } from './google/protobuf/BytesValue';
import type { DescriptorProto as _google_protobuf_DescriptorProto, DescriptorProto__Output as _google_protobuf_DescriptorProto__Output } from './google/protobuf/DescriptorProto';
import type { DoubleValue as _google_protobuf_DoubleValue, DoubleValue__Output as _google_protobuf_DoubleValue__Output } from './google/protobuf/DoubleValue';
import type { Duration as _google_protobuf_Duration, Duration__Output as _google_protobuf_Duration__Output } from './google/protobuf/Duration';
import type { EnumDescriptorProto as _google_protobuf_EnumDescriptorProto, EnumDescriptorProto__Output as _google_protobuf_EnumDescriptorProto__Output } from './google/protobuf/EnumDescriptorProto';
import type { EnumOptions as _google_protobuf_EnumOptions, EnumOptions__Output as _google_protobuf_EnumOptions__Output } from './google/protobuf/EnumOptions';
import type { EnumValueDescriptorProto as _google_protobuf_EnumValueDescriptorProto, EnumValueDescriptorProto__Output as _google_protobuf_EnumValueDescriptorProto__Output } from './google/protobuf/EnumValueDescriptorProto';
import type { EnumValueOptions as _google_protobuf_EnumValueOptions, EnumValueOptions__Output as _google_protobuf_EnumValueOptions__Output } from './google/protobuf/EnumValueOptions';
import type { ExtensionRangeOptions as _google_protobuf_ExtensionRangeOptions, ExtensionRangeOptions__Output as _google_protobuf_ExtensionRangeOptions__Output } from './google/protobuf/ExtensionRangeOptions';
import type { FeatureSet as _google_protobuf_FeatureSet, FeatureSet__Output as _google_protobuf_FeatureSet__Output } from './google/protobuf/FeatureSet';
import type { FeatureSetDefaults as _google_protobuf_FeatureSetDefaults, FeatureSetDefaults__Output as _google_protobuf_FeatureSetDefaults__Output } from './google/protobuf/FeatureSetDefaults';
import type { FieldDescriptorProto as _google_protobuf_FieldDescriptorProto, FieldDescriptorProto__Output as _google_protobuf_FieldDescriptorProto__Output } from './google/protobuf/FieldDescriptorProto';
import type { FieldOptions as _google_protobuf_FieldOptions, FieldOptions__Output as _google_protobuf_FieldOptions__Output } from './google/protobuf/FieldOptions';
import type { FileDescriptorProto as _google_protobuf_FileDescriptorProto, FileDescriptorProto__Output as _google_protobuf_FileDescriptorProto__Output } from './google/protobuf/FileDescriptorProto';
import type { FileDescriptorSet as _google_protobuf_FileDescriptorSet, FileDescriptorSet__Output as _google_protobuf_FileDescriptorSet__Output } from './google/protobuf/FileDescriptorSet';
import type { FileOptions as _google_protobuf_FileOptions, FileOptions__Output as _google_protobuf_FileOptions__Output } from './google/protobuf/FileOptions';
import type { FloatValue as _google_protobuf_FloatValue, FloatValue__Output as _google_protobuf_FloatValue__Output } from './google/protobuf/FloatValue';
import type { GeneratedCodeInfo as _google_protobuf_GeneratedCodeInfo, GeneratedCodeInfo__Output as _google_protobuf_GeneratedCodeInfo__Output } from './google/protobuf/GeneratedCodeInfo';
import type { Int32Value as _google_protobuf_Int32Value, Int32Value__Output as _google_protobuf_Int32Value__Output } from './google/protobuf/Int32Value';
import type { Int64Value as _google_protobuf_Int64Value, Int64Value__Output as _google_protobuf_Int64Value__Output } from './google/protobuf/Int64Value';
import type { MessageOptions as _google_protobuf_MessageOptions, MessageOptions__Output as _google_protobuf_MessageOptions__Output } from './google/protobuf/MessageOptions';
import type { MethodDescriptorProto as _google_protobuf_MethodDescriptorProto, MethodDescriptorProto__Output as _google_protobuf_MethodDescriptorProto__Output } from './google/protobuf/MethodDescriptorProto';
import type { MethodOptions as _google_protobuf_MethodOptions, MethodOptions__Output as _google_protobuf_MethodOptions__Output } from './google/protobuf/MethodOptions';
import type { OneofDescriptorProto as _google_protobuf_OneofDescriptorProto, OneofDescriptorProto__Output as _google_protobuf_OneofDescriptorProto__Output } from './google/protobuf/OneofDescriptorProto';
import type { OneofOptions as _google_protobuf_OneofOptions, OneofOptions__Output as _google_protobuf_OneofOptions__Output } from './google/protobuf/OneofOptions';
import type { ServiceDescriptorProto as _google_protobuf_ServiceDescriptorProto, ServiceDescriptorProto__Output as _google_protobuf_ServiceDescriptorProto__Output } from './google/protobuf/ServiceDescriptorProto';
import type { ServiceOptions as _google_protobuf_ServiceOptions, ServiceOptions__Output as _google_protobuf_ServiceOptions__Output } from './google/protobuf/ServiceOptions';
import type { SourceCodeInfo as _google_protobuf_SourceCodeInfo, SourceCodeInfo__Output as _google_protobuf_SourceCodeInfo__Output } from './google/protobuf/SourceCodeInfo';
import type { StringValue as _google_protobuf_StringValue, StringValue__Output as _google_protobuf_StringValue__Output } from './google/protobuf/StringValue';
import type { Timestamp as _google_protobuf_Timestamp, Timestamp__Output as _google_protobuf_Timestamp__Output } from './google/protobuf/Timestamp';
import type { UInt32Value as _google_protobuf_UInt32Value, UInt32Value__Output as _google_protobuf_UInt32Value__Output } from './google/protobuf/UInt32Value';
import type { UInt64Value as _google_protobuf_UInt64Value, UInt64Value__Output as _google_protobuf_UInt64Value__Output } from './google/protobuf/UInt64Value';
import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from './google/protobuf/UninterpretedOption';
import type { StatusAnnotation as _udpa_annotations_StatusAnnotation, StatusAnnotation__Output as _udpa_annotations_StatusAnnotation__Output } from './udpa/annotations/StatusAnnotation';
import type { AnyRules as _validate_AnyRules, AnyRules__Output as _validate_AnyRules__Output } from './validate/AnyRules';
import type { BoolRules as _validate_BoolRules, BoolRules__Output as _validate_BoolRules__Output } from './validate/BoolRules';
import type { BytesRules as _validate_BytesRules, BytesRules__Output as _validate_BytesRules__Output } from './validate/BytesRules';
import type { DoubleRules as _validate_DoubleRules, DoubleRules__Output as _validate_DoubleRules__Output } from './validate/DoubleRules';
import type { DurationRules as _validate_DurationRules, DurationRules__Output as _validate_DurationRules__Output } from './validate/DurationRules';
import type { EnumRules as _validate_EnumRules, EnumRules__Output as _validate_EnumRules__Output } from './validate/EnumRules';
import type { FieldRules as _validate_FieldRules, FieldRules__Output as _validate_FieldRules__Output } from './validate/FieldRules';
import type { Fixed32Rules as _validate_Fixed32Rules, Fixed32Rules__Output as _validate_Fixed32Rules__Output } from './validate/Fixed32Rules';
import type { Fixed64Rules as _validate_Fixed64Rules, Fixed64Rules__Output as _validate_Fixed64Rules__Output } from './validate/Fixed64Rules';
import type { FloatRules as _validate_FloatRules, FloatRules__Output as _validate_FloatRules__Output } from './validate/FloatRules';
import type { Int32Rules as _validate_Int32Rules, Int32Rules__Output as _validate_Int32Rules__Output } from './validate/Int32Rules';
import type { Int64Rules as _validate_Int64Rules, Int64Rules__Output as _validate_Int64Rules__Output } from './validate/Int64Rules';
import type { MapRules as _validate_MapRules, MapRules__Output as _validate_MapRules__Output } from './validate/MapRules';
import type { MessageRules as _validate_MessageRules, MessageRules__Output as _validate_MessageRules__Output } from './validate/MessageRules';
import type { RepeatedRules as _validate_RepeatedRules, RepeatedRules__Output as _validate_RepeatedRules__Output } from './validate/RepeatedRules';
import type { SFixed32Rules as _validate_SFixed32Rules, SFixed32Rules__Output as _validate_SFixed32Rules__Output } from './validate/SFixed32Rules';
import type { SFixed64Rules as _validate_SFixed64Rules, SFixed64Rules__Output as _validate_SFixed64Rules__Output } from './validate/SFixed64Rules';
import type { SInt32Rules as _validate_SInt32Rules, SInt32Rules__Output as _validate_SInt32Rules__Output } from './validate/SInt32Rules';
import type { SInt64Rules as _validate_SInt64Rules, SInt64Rules__Output as _validate_SInt64Rules__Output } from './validate/SInt64Rules';
import type { StringRules as _validate_StringRules, StringRules__Output as _validate_StringRules__Output } from './validate/StringRules';
import type { TimestampRules as _validate_TimestampRules, TimestampRules__Output as _validate_TimestampRules__Output } from './validate/TimestampRules';
import type { UInt32Rules as _validate_UInt32Rules, UInt32Rules__Output as _validate_UInt32Rules__Output } from './validate/UInt32Rules';
import type { UInt64Rules as _validate_UInt64Rules, UInt64Rules__Output as _validate_UInt64Rules__Output } from './validate/UInt64Rules';
type SubtypeConstructor<Constructor extends new (...args: any) => any, Subtype> = {
new(...args: ConstructorParameters<Constructor>): Subtype;
};
export interface ProtoGrpcType {
envoy: {
extensions: {
load_balancing_policies: {
client_side_weighted_round_robin: {
v3: {
ClientSideWeightedRoundRobin: MessageTypeDefinition<_envoy_extensions_load_balancing_policies_client_side_weighted_round_robin_v3_ClientSideWeightedRoundRobin, _envoy_extensions_load_balancing_policies_client_side_weighted_round_robin_v3_ClientSideWeightedRoundRobin__Output>
}
}
}
}
}
google: {
protobuf: {
BoolValue: MessageTypeDefinition<_google_protobuf_BoolValue, _google_protobuf_BoolValue__Output>
BytesValue: MessageTypeDefinition<_google_protobuf_BytesValue, _google_protobuf_BytesValue__Output>
DescriptorProto: MessageTypeDefinition<_google_protobuf_DescriptorProto, _google_protobuf_DescriptorProto__Output>
DoubleValue: MessageTypeDefinition<_google_protobuf_DoubleValue, _google_protobuf_DoubleValue__Output>
Duration: MessageTypeDefinition<_google_protobuf_Duration, _google_protobuf_Duration__Output>
Edition: EnumTypeDefinition
EnumDescriptorProto: MessageTypeDefinition<_google_protobuf_EnumDescriptorProto, _google_protobuf_EnumDescriptorProto__Output>
EnumOptions: MessageTypeDefinition<_google_protobuf_EnumOptions, _google_protobuf_EnumOptions__Output>
EnumValueDescriptorProto: MessageTypeDefinition<_google_protobuf_EnumValueDescriptorProto, _google_protobuf_EnumValueDescriptorProto__Output>
EnumValueOptions: MessageTypeDefinition<_google_protobuf_EnumValueOptions, _google_protobuf_EnumValueOptions__Output>
ExtensionRangeOptions: MessageTypeDefinition<_google_protobuf_ExtensionRangeOptions, _google_protobuf_ExtensionRangeOptions__Output>
FeatureSet: MessageTypeDefinition<_google_protobuf_FeatureSet, _google_protobuf_FeatureSet__Output>
FeatureSetDefaults: MessageTypeDefinition<_google_protobuf_FeatureSetDefaults, _google_protobuf_FeatureSetDefaults__Output>
FieldDescriptorProto: MessageTypeDefinition<_google_protobuf_FieldDescriptorProto, _google_protobuf_FieldDescriptorProto__Output>
FieldOptions: MessageTypeDefinition<_google_protobuf_FieldOptions, _google_protobuf_FieldOptions__Output>
FileDescriptorProto: MessageTypeDefinition<_google_protobuf_FileDescriptorProto, _google_protobuf_FileDescriptorProto__Output>
FileDescriptorSet: MessageTypeDefinition<_google_protobuf_FileDescriptorSet, _google_protobuf_FileDescriptorSet__Output>
FileOptions: MessageTypeDefinition<_google_protobuf_FileOptions, _google_protobuf_FileOptions__Output>
FloatValue: MessageTypeDefinition<_google_protobuf_FloatValue, _google_protobuf_FloatValue__Output>
GeneratedCodeInfo: MessageTypeDefinition<_google_protobuf_GeneratedCodeInfo, _google_protobuf_GeneratedCodeInfo__Output>
Int32Value: MessageTypeDefinition<_google_protobuf_Int32Value, _google_protobuf_Int32Value__Output>
Int64Value: MessageTypeDefinition<_google_protobuf_Int64Value, _google_protobuf_Int64Value__Output>
MessageOptions: MessageTypeDefinition<_google_protobuf_MessageOptions, _google_protobuf_MessageOptions__Output>
MethodDescriptorProto: MessageTypeDefinition<_google_protobuf_MethodDescriptorProto, _google_protobuf_MethodDescriptorProto__Output>
MethodOptions: MessageTypeDefinition<_google_protobuf_MethodOptions, _google_protobuf_MethodOptions__Output>
OneofDescriptorProto: MessageTypeDefinition<_google_protobuf_OneofDescriptorProto, _google_protobuf_OneofDescriptorProto__Output>
OneofOptions: MessageTypeDefinition<_google_protobuf_OneofOptions, _google_protobuf_OneofOptions__Output>
ServiceDescriptorProto: MessageTypeDefinition<_google_protobuf_ServiceDescriptorProto, _google_protobuf_ServiceDescriptorProto__Output>
ServiceOptions: MessageTypeDefinition<_google_protobuf_ServiceOptions, _google_protobuf_ServiceOptions__Output>
SourceCodeInfo: MessageTypeDefinition<_google_protobuf_SourceCodeInfo, _google_protobuf_SourceCodeInfo__Output>
StringValue: MessageTypeDefinition<_google_protobuf_StringValue, _google_protobuf_StringValue__Output>
SymbolVisibility: EnumTypeDefinition
Timestamp: MessageTypeDefinition<_google_protobuf_Timestamp, _google_protobuf_Timestamp__Output>
UInt32Value: MessageTypeDefinition<_google_protobuf_UInt32Value, _google_protobuf_UInt32Value__Output>
UInt64Value: MessageTypeDefinition<_google_protobuf_UInt64Value, _google_protobuf_UInt64Value__Output>
UninterpretedOption: MessageTypeDefinition<_google_protobuf_UninterpretedOption, _google_protobuf_UninterpretedOption__Output>
}
}
udpa: {
annotations: {
PackageVersionStatus: EnumTypeDefinition
StatusAnnotation: MessageTypeDefinition<_udpa_annotations_StatusAnnotation, _udpa_annotations_StatusAnnotation__Output>
}
}
validate: {
AnyRules: MessageTypeDefinition<_validate_AnyRules, _validate_AnyRules__Output>
BoolRules: MessageTypeDefinition<_validate_BoolRules, _validate_BoolRules__Output>
BytesRules: MessageTypeDefinition<_validate_BytesRules, _validate_BytesRules__Output>
DoubleRules: MessageTypeDefinition<_validate_DoubleRules, _validate_DoubleRules__Output>
DurationRules: MessageTypeDefinition<_validate_DurationRules, _validate_DurationRules__Output>
EnumRules: MessageTypeDefinition<_validate_EnumRules, _validate_EnumRules__Output>
FieldRules: MessageTypeDefinition<_validate_FieldRules, _validate_FieldRules__Output>
Fixed32Rules: MessageTypeDefinition<_validate_Fixed32Rules, _validate_Fixed32Rules__Output>
Fixed64Rules: MessageTypeDefinition<_validate_Fixed64Rules, _validate_Fixed64Rules__Output>
FloatRules: MessageTypeDefinition<_validate_FloatRules, _validate_FloatRules__Output>
Int32Rules: MessageTypeDefinition<_validate_Int32Rules, _validate_Int32Rules__Output>
Int64Rules: MessageTypeDefinition<_validate_Int64Rules, _validate_Int64Rules__Output>
KnownRegex: EnumTypeDefinition
MapRules: MessageTypeDefinition<_validate_MapRules, _validate_MapRules__Output>
MessageRules: MessageTypeDefinition<_validate_MessageRules, _validate_MessageRules__Output>
RepeatedRules: MessageTypeDefinition<_validate_RepeatedRules, _validate_RepeatedRules__Output>
SFixed32Rules: MessageTypeDefinition<_validate_SFixed32Rules, _validate_SFixed32Rules__Output>
SFixed64Rules: MessageTypeDefinition<_validate_SFixed64Rules, _validate_SFixed64Rules__Output>
SInt32Rules: MessageTypeDefinition<_validate_SInt32Rules, _validate_SInt32Rules__Output>
SInt64Rules: MessageTypeDefinition<_validate_SInt64Rules, _validate_SInt64Rules__Output>
StringRules: MessageTypeDefinition<_validate_StringRules, _validate_StringRules__Output>
TimestampRules: MessageTypeDefinition<_validate_TimestampRules, _validate_TimestampRules__Output>
UInt32Rules: MessageTypeDefinition<_validate_UInt32Rules, _validate_UInt32Rules__Output>
UInt64Rules: MessageTypeDefinition<_validate_UInt64Rules, _validate_UInt64Rules__Output>
}
}

View File

@ -0,0 +1,129 @@
// Original file: deps/envoy-api/envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3/client_side_weighted_round_robin.proto
import type { BoolValue as _google_protobuf_BoolValue, BoolValue__Output as _google_protobuf_BoolValue__Output } from '../../../../../google/protobuf/BoolValue';
import type { Duration as _google_protobuf_Duration, Duration__Output as _google_protobuf_Duration__Output } from '../../../../../google/protobuf/Duration';
import type { FloatValue as _google_protobuf_FloatValue, FloatValue__Output as _google_protobuf_FloatValue__Output } from '../../../../../google/protobuf/FloatValue';
/**
* Configuration for the client_side_weighted_round_robin LB policy.
*
* This policy differs from the built-in ROUND_ROBIN policy in terms of
* how the endpoint weights are determined. In the ROUND_ROBIN policy,
* the endpoint weights are sent by the control plane via EDS. However,
* in this policy, the endpoint weights are instead determined via qps (queries
* per second), eps (errors per second), and utilization metrics sent by the
* endpoint using the Open Request Cost Aggregation (ORCA) protocol. Utilization
* is determined by using the ORCA application_utilization field, if set, or
* else falling back to the cpu_utilization field. All queries count toward qps,
* regardless of result. Only failed queries count toward eps. A config
* parameter error_utilization_penalty controls the penalty to adjust endpoint
* weights using eps and qps. The weight of a given endpoint is computed as:
* qps / (utilization + eps/qps * error_utilization_penalty)
*
* See the :ref:`load balancing architecture overview<arch_overview_load_balancing_types>` for more information.
*
* [#next-free-field: 7]
*/
export interface ClientSideWeightedRoundRobin {
/**
* Whether to enable out-of-band utilization reporting collection from
* the endpoints. By default, per-request utilization reporting is used.
*/
'enable_oob_load_report'?: (_google_protobuf_BoolValue | null);
/**
* Load reporting interval to request from the server. Note that the
* server may not provide reports as frequently as the client requests.
* Used only when enable_oob_load_report is true. Default is 10 seconds.
*/
'oob_reporting_period'?: (_google_protobuf_Duration | null);
/**
* A given endpoint must report load metrics continuously for at least
* this long before the endpoint weight will be used. This avoids
* churn when the set of endpoint addresses changes. Takes effect
* both immediately after we establish a connection to an endpoint and
* after weight_expiration_period has caused us to stop using the most
* recent load metrics. Default is 10 seconds.
*/
'blackout_period'?: (_google_protobuf_Duration | null);
/**
* If a given endpoint has not reported load metrics in this long,
* then we stop using the reported weight. This ensures that we do
* not continue to use very stale weights. Once we stop using a stale
* value, if we later start seeing fresh reports again, the
* blackout_period applies. Defaults to 3 minutes.
*/
'weight_expiration_period'?: (_google_protobuf_Duration | null);
/**
* How often endpoint weights are recalculated. Values less than 100ms are
* capped at 100ms. Default is 1 second.
*/
'weight_update_period'?: (_google_protobuf_Duration | null);
/**
* The multiplier used to adjust endpoint weights with the error rate
* calculated as eps/qps. Configuration is rejected if this value is negative.
* Default is 1.0.
*/
'error_utilization_penalty'?: (_google_protobuf_FloatValue | null);
}
/**
* Configuration for the client_side_weighted_round_robin LB policy.
*
* This policy differs from the built-in ROUND_ROBIN policy in terms of
* how the endpoint weights are determined. In the ROUND_ROBIN policy,
* the endpoint weights are sent by the control plane via EDS. However,
* in this policy, the endpoint weights are instead determined via qps (queries
* per second), eps (errors per second), and utilization metrics sent by the
* endpoint using the Open Request Cost Aggregation (ORCA) protocol. Utilization
* is determined by using the ORCA application_utilization field, if set, or
* else falling back to the cpu_utilization field. All queries count toward qps,
* regardless of result. Only failed queries count toward eps. A config
* parameter error_utilization_penalty controls the penalty to adjust endpoint
* weights using eps and qps. The weight of a given endpoint is computed as:
* qps / (utilization + eps/qps * error_utilization_penalty)
*
* See the :ref:`load balancing architecture overview<arch_overview_load_balancing_types>` for more information.
*
* [#next-free-field: 7]
*/
export interface ClientSideWeightedRoundRobin__Output {
/**
* Whether to enable out-of-band utilization reporting collection from
* the endpoints. By default, per-request utilization reporting is used.
*/
'enable_oob_load_report': (_google_protobuf_BoolValue__Output | null);
/**
* Load reporting interval to request from the server. Note that the
* server may not provide reports as frequently as the client requests.
* Used only when enable_oob_load_report is true. Default is 10 seconds.
*/
'oob_reporting_period': (_google_protobuf_Duration__Output | null);
/**
* A given endpoint must report load metrics continuously for at least
* this long before the endpoint weight will be used. This avoids
* churn when the set of endpoint addresses changes. Takes effect
* both immediately after we establish a connection to an endpoint and
* after weight_expiration_period has caused us to stop using the most
* recent load metrics. Default is 10 seconds.
*/
'blackout_period': (_google_protobuf_Duration__Output | null);
/**
* If a given endpoint has not reported load metrics in this long,
* then we stop using the reported weight. This ensures that we do
* not continue to use very stale weights. Once we stop using a stale
* value, if we later start seeing fresh reports again, the
* blackout_period applies. Defaults to 3 minutes.
*/
'weight_expiration_period': (_google_protobuf_Duration__Output | null);
/**
* How often endpoint weights are recalculated. Values less than 100ms are
* capped at 100ms. Default is 1 second.
*/
'weight_update_period': (_google_protobuf_Duration__Output | null);
/**
* The multiplier used to adjust endpoint weights with the error rate
* calculated as eps/qps. Configuration is rejected if this value is negative.
* Default is 1.0.
*/
'error_utilization_penalty': (_google_protobuf_FloatValue__Output | null);
}

View File

@ -2,7 +2,6 @@
import type { FeatureSet as _google_protobuf_FeatureSet, FeatureSet__Output as _google_protobuf_FeatureSet__Output } from '../../google/protobuf/FeatureSet';
import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption';
import type { MigrateAnnotation as _udpa_annotations_MigrateAnnotation, MigrateAnnotation__Output as _udpa_annotations_MigrateAnnotation__Output } from '../../udpa/annotations/MigrateAnnotation';
export interface EnumOptions {
'allowAlias'?: (boolean);
@ -13,7 +12,6 @@ export interface EnumOptions {
'deprecatedLegacyJsonFieldConflicts'?: (boolean);
'features'?: (_google_protobuf_FeatureSet | null);
'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[];
'.udpa.annotations.enum_migrate'?: (_udpa_annotations_MigrateAnnotation | null);
}
export interface EnumOptions__Output {
@ -25,5 +23,4 @@ export interface EnumOptions__Output {
'deprecatedLegacyJsonFieldConflicts': (boolean);
'features': (_google_protobuf_FeatureSet__Output | null);
'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[];
'.udpa.annotations.enum_migrate': (_udpa_annotations_MigrateAnnotation__Output | null);
}

View File

@ -3,7 +3,6 @@
import type { FeatureSet as _google_protobuf_FeatureSet, FeatureSet__Output as _google_protobuf_FeatureSet__Output } from '../../google/protobuf/FeatureSet';
import type { _google_protobuf_FieldOptions_FeatureSupport, _google_protobuf_FieldOptions_FeatureSupport__Output } from '../../google/protobuf/FieldOptions';
import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption';
import type { MigrateAnnotation as _udpa_annotations_MigrateAnnotation, MigrateAnnotation__Output as _udpa_annotations_MigrateAnnotation__Output } from '../../udpa/annotations/MigrateAnnotation';
export interface EnumValueOptions {
'deprecated'?: (boolean);
@ -11,9 +10,6 @@ export interface EnumValueOptions {
'debugRedact'?: (boolean);
'featureSupport'?: (_google_protobuf_FieldOptions_FeatureSupport | null);
'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[];
'.envoy.annotations.disallowed_by_default_enum'?: (boolean);
'.udpa.annotations.enum_value_migrate'?: (_udpa_annotations_MigrateAnnotation | null);
'.envoy.annotations.deprecated_at_minor_version_enum'?: (string);
}
export interface EnumValueOptions__Output {
@ -22,7 +18,4 @@ export interface EnumValueOptions__Output {
'debugRedact': (boolean);
'featureSupport': (_google_protobuf_FieldOptions_FeatureSupport__Output | null);
'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[];
'.envoy.annotations.disallowed_by_default_enum': (boolean);
'.udpa.annotations.enum_value_migrate': (_udpa_annotations_MigrateAnnotation__Output | null);
'.envoy.annotations.deprecated_at_minor_version_enum': (string);
}

View File

@ -3,8 +3,6 @@
import type { FeatureSet as _google_protobuf_FeatureSet, FeatureSet__Output as _google_protobuf_FeatureSet__Output } from '../../google/protobuf/FeatureSet';
import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption';
import type { FieldRules as _validate_FieldRules, FieldRules__Output as _validate_FieldRules__Output } from '../../validate/FieldRules';
import type { FieldMigrateAnnotation as _udpa_annotations_FieldMigrateAnnotation, FieldMigrateAnnotation__Output as _udpa_annotations_FieldMigrateAnnotation__Output } from '../../udpa/annotations/FieldMigrateAnnotation';
import type { FieldStatusAnnotation as _xds_annotations_v3_FieldStatusAnnotation, FieldStatusAnnotation__Output as _xds_annotations_v3_FieldStatusAnnotation__Output } from '../../xds/annotations/v3/FieldStatusAnnotation';
import type { Edition as _google_protobuf_Edition, Edition__Output as _google_protobuf_Edition__Output } from '../../google/protobuf/Edition';
// Original file: null
@ -143,10 +141,6 @@ export interface FieldOptions {
'featureSupport'?: (_google_protobuf_FieldOptions_FeatureSupport | null);
'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[];
'.validate.rules'?: (_validate_FieldRules | null);
'.envoy.annotations.deprecated_at_minor_version'?: (string);
'.udpa.annotations.field_migrate'?: (_udpa_annotations_FieldMigrateAnnotation | null);
'.envoy.annotations.disallowed_by_default'?: (boolean);
'.xds.annotations.v3.field_status'?: (_xds_annotations_v3_FieldStatusAnnotation | null);
}
export interface FieldOptions__Output {
@ -168,8 +162,4 @@ export interface FieldOptions__Output {
'featureSupport': (_google_protobuf_FieldOptions_FeatureSupport__Output | null);
'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[];
'.validate.rules': (_validate_FieldRules__Output | null);
'.envoy.annotations.deprecated_at_minor_version': (string);
'.udpa.annotations.field_migrate': (_udpa_annotations_FieldMigrateAnnotation__Output | null);
'.envoy.annotations.disallowed_by_default': (boolean);
'.xds.annotations.v3.field_status': (_xds_annotations_v3_FieldStatusAnnotation__Output | null);
}

View File

@ -2,9 +2,7 @@
import type { FeatureSet as _google_protobuf_FeatureSet, FeatureSet__Output as _google_protobuf_FeatureSet__Output } from '../../google/protobuf/FeatureSet';
import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption';
import type { FileMigrateAnnotation as _udpa_annotations_FileMigrateAnnotation, FileMigrateAnnotation__Output as _udpa_annotations_FileMigrateAnnotation__Output } from '../../udpa/annotations/FileMigrateAnnotation';
import type { StatusAnnotation as _udpa_annotations_StatusAnnotation, StatusAnnotation__Output as _udpa_annotations_StatusAnnotation__Output } from '../../udpa/annotations/StatusAnnotation';
import type { FileStatusAnnotation as _xds_annotations_v3_FileStatusAnnotation, FileStatusAnnotation__Output as _xds_annotations_v3_FileStatusAnnotation__Output } from '../../xds/annotations/v3/FileStatusAnnotation';
// Original file: null
@ -49,9 +47,7 @@ export interface FileOptions {
'rubyPackage'?: (string);
'features'?: (_google_protobuf_FeatureSet | null);
'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[];
'.udpa.annotations.file_migrate'?: (_udpa_annotations_FileMigrateAnnotation | null);
'.udpa.annotations.file_status'?: (_udpa_annotations_StatusAnnotation | null);
'.xds.annotations.v3.file_status'?: (_xds_annotations_v3_FileStatusAnnotation | null);
}
export interface FileOptions__Output {
@ -79,7 +75,5 @@ export interface FileOptions__Output {
'rubyPackage': (string);
'features': (_google_protobuf_FeatureSet__Output | null);
'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[];
'.udpa.annotations.file_migrate': (_udpa_annotations_FileMigrateAnnotation__Output | null);
'.udpa.annotations.file_status': (_udpa_annotations_StatusAnnotation__Output | null);
'.xds.annotations.v3.file_status': (_xds_annotations_v3_FileStatusAnnotation__Output | null);
}

View File

@ -2,9 +2,6 @@
import type { FeatureSet as _google_protobuf_FeatureSet, FeatureSet__Output as _google_protobuf_FeatureSet__Output } from '../../google/protobuf/FeatureSet';
import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption';
import type { VersioningAnnotation as _udpa_annotations_VersioningAnnotation, VersioningAnnotation__Output as _udpa_annotations_VersioningAnnotation__Output } from '../../udpa/annotations/VersioningAnnotation';
import type { MigrateAnnotation as _udpa_annotations_MigrateAnnotation, MigrateAnnotation__Output as _udpa_annotations_MigrateAnnotation__Output } from '../../udpa/annotations/MigrateAnnotation';
import type { MessageStatusAnnotation as _xds_annotations_v3_MessageStatusAnnotation, MessageStatusAnnotation__Output as _xds_annotations_v3_MessageStatusAnnotation__Output } from '../../xds/annotations/v3/MessageStatusAnnotation';
export interface MessageOptions {
'messageSetWireFormat'?: (boolean);
@ -18,9 +15,6 @@ export interface MessageOptions {
'features'?: (_google_protobuf_FeatureSet | null);
'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[];
'.validate.disabled'?: (boolean);
'.udpa.annotations.versioning'?: (_udpa_annotations_VersioningAnnotation | null);
'.udpa.annotations.message_migrate'?: (_udpa_annotations_MigrateAnnotation | null);
'.xds.annotations.v3.message_status'?: (_xds_annotations_v3_MessageStatusAnnotation | null);
}
export interface MessageOptions__Output {
@ -35,7 +29,4 @@ export interface MessageOptions__Output {
'features': (_google_protobuf_FeatureSet__Output | null);
'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[];
'.validate.disabled': (boolean);
'.udpa.annotations.versioning': (_udpa_annotations_VersioningAnnotation__Output | null);
'.udpa.annotations.message_migrate': (_udpa_annotations_MigrateAnnotation__Output | null);
'.xds.annotations.v3.message_status': (_xds_annotations_v3_MessageStatusAnnotation__Output | null);
}

View File

@ -1,58 +1,10 @@
import type * as grpc from '@grpc/grpc-js';
import type { EnumTypeDefinition, MessageTypeDefinition } from '@grpc/proto-loader';
import type { DescriptorProto as _google_protobuf_DescriptorProto, DescriptorProto__Output as _google_protobuf_DescriptorProto__Output } from './google/protobuf/DescriptorProto';
import type { Duration as _google_protobuf_Duration, Duration__Output as _google_protobuf_Duration__Output } from './google/protobuf/Duration';
import type { EnumDescriptorProto as _google_protobuf_EnumDescriptorProto, EnumDescriptorProto__Output as _google_protobuf_EnumDescriptorProto__Output } from './google/protobuf/EnumDescriptorProto';
import type { EnumOptions as _google_protobuf_EnumOptions, EnumOptions__Output as _google_protobuf_EnumOptions__Output } from './google/protobuf/EnumOptions';
import type { EnumValueDescriptorProto as _google_protobuf_EnumValueDescriptorProto, EnumValueDescriptorProto__Output as _google_protobuf_EnumValueDescriptorProto__Output } from './google/protobuf/EnumValueDescriptorProto';
import type { EnumValueOptions as _google_protobuf_EnumValueOptions, EnumValueOptions__Output as _google_protobuf_EnumValueOptions__Output } from './google/protobuf/EnumValueOptions';
import type { ExtensionRangeOptions as _google_protobuf_ExtensionRangeOptions, ExtensionRangeOptions__Output as _google_protobuf_ExtensionRangeOptions__Output } from './google/protobuf/ExtensionRangeOptions';
import type { FeatureSet as _google_protobuf_FeatureSet, FeatureSet__Output as _google_protobuf_FeatureSet__Output } from './google/protobuf/FeatureSet';
import type { FeatureSetDefaults as _google_protobuf_FeatureSetDefaults, FeatureSetDefaults__Output as _google_protobuf_FeatureSetDefaults__Output } from './google/protobuf/FeatureSetDefaults';
import type { FieldDescriptorProto as _google_protobuf_FieldDescriptorProto, FieldDescriptorProto__Output as _google_protobuf_FieldDescriptorProto__Output } from './google/protobuf/FieldDescriptorProto';
import type { FieldOptions as _google_protobuf_FieldOptions, FieldOptions__Output as _google_protobuf_FieldOptions__Output } from './google/protobuf/FieldOptions';
import type { FileDescriptorProto as _google_protobuf_FileDescriptorProto, FileDescriptorProto__Output as _google_protobuf_FileDescriptorProto__Output } from './google/protobuf/FileDescriptorProto';
import type { FileDescriptorSet as _google_protobuf_FileDescriptorSet, FileDescriptorSet__Output as _google_protobuf_FileDescriptorSet__Output } from './google/protobuf/FileDescriptorSet';
import type { FileOptions as _google_protobuf_FileOptions, FileOptions__Output as _google_protobuf_FileOptions__Output } from './google/protobuf/FileOptions';
import type { GeneratedCodeInfo as _google_protobuf_GeneratedCodeInfo, GeneratedCodeInfo__Output as _google_protobuf_GeneratedCodeInfo__Output } from './google/protobuf/GeneratedCodeInfo';
import type { ListValue as _google_protobuf_ListValue, ListValue__Output as _google_protobuf_ListValue__Output } from './google/protobuf/ListValue';
import type { MessageOptions as _google_protobuf_MessageOptions, MessageOptions__Output as _google_protobuf_MessageOptions__Output } from './google/protobuf/MessageOptions';
import type { MethodDescriptorProto as _google_protobuf_MethodDescriptorProto, MethodDescriptorProto__Output as _google_protobuf_MethodDescriptorProto__Output } from './google/protobuf/MethodDescriptorProto';
import type { MethodOptions as _google_protobuf_MethodOptions, MethodOptions__Output as _google_protobuf_MethodOptions__Output } from './google/protobuf/MethodOptions';
import type { OneofDescriptorProto as _google_protobuf_OneofDescriptorProto, OneofDescriptorProto__Output as _google_protobuf_OneofDescriptorProto__Output } from './google/protobuf/OneofDescriptorProto';
import type { OneofOptions as _google_protobuf_OneofOptions, OneofOptions__Output as _google_protobuf_OneofOptions__Output } from './google/protobuf/OneofOptions';
import type { ServiceDescriptorProto as _google_protobuf_ServiceDescriptorProto, ServiceDescriptorProto__Output as _google_protobuf_ServiceDescriptorProto__Output } from './google/protobuf/ServiceDescriptorProto';
import type { ServiceOptions as _google_protobuf_ServiceOptions, ServiceOptions__Output as _google_protobuf_ServiceOptions__Output } from './google/protobuf/ServiceOptions';
import type { SourceCodeInfo as _google_protobuf_SourceCodeInfo, SourceCodeInfo__Output as _google_protobuf_SourceCodeInfo__Output } from './google/protobuf/SourceCodeInfo';
import type { Struct as _google_protobuf_Struct, Struct__Output as _google_protobuf_Struct__Output } from './google/protobuf/Struct';
import type { Timestamp as _google_protobuf_Timestamp, Timestamp__Output as _google_protobuf_Timestamp__Output } from './google/protobuf/Timestamp';
import type { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from './google/protobuf/UninterpretedOption';
import type { Value as _google_protobuf_Value, Value__Output as _google_protobuf_Value__Output } from './google/protobuf/Value';
import type { TypedStruct as _udpa_type_v1_TypedStruct, TypedStruct__Output as _udpa_type_v1_TypedStruct__Output } from './udpa/type/v1/TypedStruct';
import type { AnyRules as _validate_AnyRules, AnyRules__Output as _validate_AnyRules__Output } from './validate/AnyRules';
import type { BoolRules as _validate_BoolRules, BoolRules__Output as _validate_BoolRules__Output } from './validate/BoolRules';
import type { BytesRules as _validate_BytesRules, BytesRules__Output as _validate_BytesRules__Output } from './validate/BytesRules';
import type { DoubleRules as _validate_DoubleRules, DoubleRules__Output as _validate_DoubleRules__Output } from './validate/DoubleRules';
import type { DurationRules as _validate_DurationRules, DurationRules__Output as _validate_DurationRules__Output } from './validate/DurationRules';
import type { EnumRules as _validate_EnumRules, EnumRules__Output as _validate_EnumRules__Output } from './validate/EnumRules';
import type { FieldRules as _validate_FieldRules, FieldRules__Output as _validate_FieldRules__Output } from './validate/FieldRules';
import type { Fixed32Rules as _validate_Fixed32Rules, Fixed32Rules__Output as _validate_Fixed32Rules__Output } from './validate/Fixed32Rules';
import type { Fixed64Rules as _validate_Fixed64Rules, Fixed64Rules__Output as _validate_Fixed64Rules__Output } from './validate/Fixed64Rules';
import type { FloatRules as _validate_FloatRules, FloatRules__Output as _validate_FloatRules__Output } from './validate/FloatRules';
import type { Int32Rules as _validate_Int32Rules, Int32Rules__Output as _validate_Int32Rules__Output } from './validate/Int32Rules';
import type { Int64Rules as _validate_Int64Rules, Int64Rules__Output as _validate_Int64Rules__Output } from './validate/Int64Rules';
import type { MapRules as _validate_MapRules, MapRules__Output as _validate_MapRules__Output } from './validate/MapRules';
import type { MessageRules as _validate_MessageRules, MessageRules__Output as _validate_MessageRules__Output } from './validate/MessageRules';
import type { RepeatedRules as _validate_RepeatedRules, RepeatedRules__Output as _validate_RepeatedRules__Output } from './validate/RepeatedRules';
import type { SFixed32Rules as _validate_SFixed32Rules, SFixed32Rules__Output as _validate_SFixed32Rules__Output } from './validate/SFixed32Rules';
import type { SFixed64Rules as _validate_SFixed64Rules, SFixed64Rules__Output as _validate_SFixed64Rules__Output } from './validate/SFixed64Rules';
import type { SInt32Rules as _validate_SInt32Rules, SInt32Rules__Output as _validate_SInt32Rules__Output } from './validate/SInt32Rules';
import type { SInt64Rules as _validate_SInt64Rules, SInt64Rules__Output as _validate_SInt64Rules__Output } from './validate/SInt64Rules';
import type { StringRules as _validate_StringRules, StringRules__Output as _validate_StringRules__Output } from './validate/StringRules';
import type { TimestampRules as _validate_TimestampRules, TimestampRules__Output as _validate_TimestampRules__Output } from './validate/TimestampRules';
import type { UInt32Rules as _validate_UInt32Rules, UInt32Rules__Output as _validate_UInt32Rules__Output } from './validate/UInt32Rules';
import type { UInt64Rules as _validate_UInt64Rules, UInt64Rules__Output as _validate_UInt64Rules__Output } from './validate/UInt64Rules';
import type { TypedStruct as _xds_type_v3_TypedStruct, TypedStruct__Output as _xds_type_v3_TypedStruct__Output } from './xds/type/v3/TypedStruct';
type SubtypeConstructor<Constructor extends new (...args: any) => any, Subtype> = {
@ -62,36 +14,9 @@ type SubtypeConstructor<Constructor extends new (...args: any) => any, Subtype>
export interface ProtoGrpcType {
google: {
protobuf: {
DescriptorProto: MessageTypeDefinition<_google_protobuf_DescriptorProto, _google_protobuf_DescriptorProto__Output>
Duration: MessageTypeDefinition<_google_protobuf_Duration, _google_protobuf_Duration__Output>
Edition: EnumTypeDefinition
EnumDescriptorProto: MessageTypeDefinition<_google_protobuf_EnumDescriptorProto, _google_protobuf_EnumDescriptorProto__Output>
EnumOptions: MessageTypeDefinition<_google_protobuf_EnumOptions, _google_protobuf_EnumOptions__Output>
EnumValueDescriptorProto: MessageTypeDefinition<_google_protobuf_EnumValueDescriptorProto, _google_protobuf_EnumValueDescriptorProto__Output>
EnumValueOptions: MessageTypeDefinition<_google_protobuf_EnumValueOptions, _google_protobuf_EnumValueOptions__Output>
ExtensionRangeOptions: MessageTypeDefinition<_google_protobuf_ExtensionRangeOptions, _google_protobuf_ExtensionRangeOptions__Output>
FeatureSet: MessageTypeDefinition<_google_protobuf_FeatureSet, _google_protobuf_FeatureSet__Output>
FeatureSetDefaults: MessageTypeDefinition<_google_protobuf_FeatureSetDefaults, _google_protobuf_FeatureSetDefaults__Output>
FieldDescriptorProto: MessageTypeDefinition<_google_protobuf_FieldDescriptorProto, _google_protobuf_FieldDescriptorProto__Output>
FieldOptions: MessageTypeDefinition<_google_protobuf_FieldOptions, _google_protobuf_FieldOptions__Output>
FileDescriptorProto: MessageTypeDefinition<_google_protobuf_FileDescriptorProto, _google_protobuf_FileDescriptorProto__Output>
FileDescriptorSet: MessageTypeDefinition<_google_protobuf_FileDescriptorSet, _google_protobuf_FileDescriptorSet__Output>
FileOptions: MessageTypeDefinition<_google_protobuf_FileOptions, _google_protobuf_FileOptions__Output>
GeneratedCodeInfo: MessageTypeDefinition<_google_protobuf_GeneratedCodeInfo, _google_protobuf_GeneratedCodeInfo__Output>
ListValue: MessageTypeDefinition<_google_protobuf_ListValue, _google_protobuf_ListValue__Output>
MessageOptions: MessageTypeDefinition<_google_protobuf_MessageOptions, _google_protobuf_MessageOptions__Output>
MethodDescriptorProto: MessageTypeDefinition<_google_protobuf_MethodDescriptorProto, _google_protobuf_MethodDescriptorProto__Output>
MethodOptions: MessageTypeDefinition<_google_protobuf_MethodOptions, _google_protobuf_MethodOptions__Output>
NullValue: EnumTypeDefinition
OneofDescriptorProto: MessageTypeDefinition<_google_protobuf_OneofDescriptorProto, _google_protobuf_OneofDescriptorProto__Output>
OneofOptions: MessageTypeDefinition<_google_protobuf_OneofOptions, _google_protobuf_OneofOptions__Output>
ServiceDescriptorProto: MessageTypeDefinition<_google_protobuf_ServiceDescriptorProto, _google_protobuf_ServiceDescriptorProto__Output>
ServiceOptions: MessageTypeDefinition<_google_protobuf_ServiceOptions, _google_protobuf_ServiceOptions__Output>
SourceCodeInfo: MessageTypeDefinition<_google_protobuf_SourceCodeInfo, _google_protobuf_SourceCodeInfo__Output>
Struct: MessageTypeDefinition<_google_protobuf_Struct, _google_protobuf_Struct__Output>
SymbolVisibility: EnumTypeDefinition
Timestamp: MessageTypeDefinition<_google_protobuf_Timestamp, _google_protobuf_Timestamp__Output>
UninterpretedOption: MessageTypeDefinition<_google_protobuf_UninterpretedOption, _google_protobuf_UninterpretedOption__Output>
Value: MessageTypeDefinition<_google_protobuf_Value, _google_protobuf_Value__Output>
}
}
@ -102,32 +27,6 @@ export interface ProtoGrpcType {
}
}
}
validate: {
AnyRules: MessageTypeDefinition<_validate_AnyRules, _validate_AnyRules__Output>
BoolRules: MessageTypeDefinition<_validate_BoolRules, _validate_BoolRules__Output>
BytesRules: MessageTypeDefinition<_validate_BytesRules, _validate_BytesRules__Output>
DoubleRules: MessageTypeDefinition<_validate_DoubleRules, _validate_DoubleRules__Output>
DurationRules: MessageTypeDefinition<_validate_DurationRules, _validate_DurationRules__Output>
EnumRules: MessageTypeDefinition<_validate_EnumRules, _validate_EnumRules__Output>
FieldRules: MessageTypeDefinition<_validate_FieldRules, _validate_FieldRules__Output>
Fixed32Rules: MessageTypeDefinition<_validate_Fixed32Rules, _validate_Fixed32Rules__Output>
Fixed64Rules: MessageTypeDefinition<_validate_Fixed64Rules, _validate_Fixed64Rules__Output>
FloatRules: MessageTypeDefinition<_validate_FloatRules, _validate_FloatRules__Output>
Int32Rules: MessageTypeDefinition<_validate_Int32Rules, _validate_Int32Rules__Output>
Int64Rules: MessageTypeDefinition<_validate_Int64Rules, _validate_Int64Rules__Output>
KnownRegex: EnumTypeDefinition
MapRules: MessageTypeDefinition<_validate_MapRules, _validate_MapRules__Output>
MessageRules: MessageTypeDefinition<_validate_MessageRules, _validate_MessageRules__Output>
RepeatedRules: MessageTypeDefinition<_validate_RepeatedRules, _validate_RepeatedRules__Output>
SFixed32Rules: MessageTypeDefinition<_validate_SFixed32Rules, _validate_SFixed32Rules__Output>
SFixed64Rules: MessageTypeDefinition<_validate_SFixed64Rules, _validate_SFixed64Rules__Output>
SInt32Rules: MessageTypeDefinition<_validate_SInt32Rules, _validate_SInt32Rules__Output>
SInt64Rules: MessageTypeDefinition<_validate_SInt64Rules, _validate_SInt64Rules__Output>
StringRules: MessageTypeDefinition<_validate_StringRules, _validate_StringRules__Output>
TimestampRules: MessageTypeDefinition<_validate_TimestampRules, _validate_TimestampRules__Output>
UInt32Rules: MessageTypeDefinition<_validate_UInt32Rules, _validate_UInt32Rules__Output>
UInt64Rules: MessageTypeDefinition<_validate_UInt64Rules, _validate_UInt64Rules__Output>
}
xds: {
type: {
v3: {

View File

@ -8,8 +8,8 @@ import type { Any as _google_protobuf_Any, Any__Output as _google_protobuf_Any__
*/
export interface _xds_core_v3_CollectionEntry_InlineEntry {
/**
* Optional name to describe the inlined resource. Resource names must
* [a-zA-Z0-9_-\./]+ (TODO(htuch): turn this into a PGV constraint once
* Optional name to describe the inlined resource. Resource names must match
* ``[a-zA-Z0-9_-\./]+`` (TODO(htuch): turn this into a PGV constraint once
* finalized, probably should be a RFC3986 pchar). This name allows
* reference via the #entry directive in ResourceLocator.
*/
@ -30,8 +30,8 @@ export interface _xds_core_v3_CollectionEntry_InlineEntry {
*/
export interface _xds_core_v3_CollectionEntry_InlineEntry__Output {
/**
* Optional name to describe the inlined resource. Resource names must
* [a-zA-Z0-9_-\./]+ (TODO(htuch): turn this into a PGV constraint once
* Optional name to describe the inlined resource. Resource names must match
* ``[a-zA-Z0-9_-\./]+`` (TODO(htuch): turn this into a PGV constraint once
* finalized, probably should be a RFC3986 pchar). This name allows
* reference via the #entry directive in ResourceLocator.
*/
@ -52,6 +52,8 @@ export interface _xds_core_v3_CollectionEntry_InlineEntry__Output {
* appearing inside a list collection resource. List collection resources are
* regular Resource messages of type:
*
* .. code-block:: proto
*
* message <T>Collection {
* repeated CollectionEntry resources = 1;
* }
@ -73,6 +75,8 @@ export interface CollectionEntry {
* appearing inside a list collection resource. List collection resources are
* regular Resource messages of type:
*
* .. code-block:: proto
*
* message <T>Collection {
* repeated CollectionEntry resources = 1;
* }

View File

@ -6,6 +6,7 @@
* global context parameters, per-resource type client feature capabilities and per-resource
* type functional attributes. All per-resource type attributes will be `xds.resource.`
* prefixed and some of these are documented below:
*
* `xds.resource.listening_address`: The value is "IP:port" (e.g. "10.1.1.3:8080") which is
* the listening address of a Listener. Used in a Listener resource query.
*/
@ -18,6 +19,7 @@ export interface ContextParams {
* global context parameters, per-resource type client feature capabilities and per-resource
* type functional attributes. All per-resource type attributes will be `xds.resource.`
* prefixed and some of these are documented below:
*
* `xds.resource.listening_address`: The value is "IP:port" (e.g. "10.1.1.3:8080") which is
* the listening address of a Listener. Used in a Listener resource query.
*/

View File

@ -271,10 +271,9 @@ export interface Matcher {
*/
'matcher_tree'?: (_xds_type_matcher_v3_Matcher_MatcherTree | null);
/**
* Optional OnMatch to use if the matcher failed.
* If specified, the OnMatch is used, and the matcher is considered
* to have matched.
* If not specified, the matcher is considered not to have matched.
* Optional OnMatch to use if no matcher above matched (e.g., if there are no matchers specified
* above, or if none of the matches specified above succeeded).
* If no matcher above matched and this field is not populated, the match will be considered unsuccessful.
*/
'on_no_match'?: (_xds_type_matcher_v3_Matcher_OnMatch | null);
'matcher_type'?: "matcher_list"|"matcher_tree";
@ -297,10 +296,9 @@ export interface Matcher__Output {
*/
'matcher_tree'?: (_xds_type_matcher_v3_Matcher_MatcherTree__Output | null);
/**
* Optional OnMatch to use if the matcher failed.
* If specified, the OnMatch is used, and the matcher is considered
* to have matched.
* If not specified, the matcher is considered not to have matched.
* Optional OnMatch to use if no matcher above matched (e.g., if there are no matchers specified
* above, or if none of the matches specified above succeeded).
* If no matcher above matched and this field is not populated, the match will be considered unsuccessful.
*/
'on_no_match': (_xds_type_matcher_v3_Matcher_OnMatch__Output | null);
'matcher_type'?: "matcher_list"|"matcher_tree";

View File

@ -13,6 +13,7 @@ import type { Struct as _google_protobuf_Struct, Struct__Output as _google_proto
* When packing an opaque extension config, packing the expected type into Any is preferred
* wherever possible for its efficiency. TypedStruct should be used only if a proto descriptor
* is not available, for example if:
*
* - A control plane sends opaque message that is originally from external source in human readable
* format such as JSON or YAML.
* - The control plane doesn't have the knowledge of the protocol buffer schema hence it cannot
@ -23,6 +24,7 @@ import type { Struct as _google_protobuf_Struct, Struct__Output as _google_proto
* When a DPLB receives a TypedStruct in Any, it should:
* - Check if the type_url of the TypedStruct matches the type the extension expects.
* - Convert value to the type described in type_url and perform validation.
*
* TODO(lizan): Figure out how TypeStruct should be used with DPLB extensions that doesn't link
* protobuf descriptor with DPLB itself, (e.g. gRPC LB Plugin, Envoy WASM extensions).
*/
@ -50,6 +52,7 @@ export interface TypedStruct {
* When packing an opaque extension config, packing the expected type into Any is preferred
* wherever possible for its efficiency. TypedStruct should be used only if a proto descriptor
* is not available, for example if:
*
* - A control plane sends opaque message that is originally from external source in human readable
* format such as JSON or YAML.
* - The control plane doesn't have the knowledge of the protocol buffer schema hence it cannot
@ -60,6 +63,7 @@ export interface TypedStruct {
* When a DPLB receives a TypedStruct in Any, it should:
* - Check if the type_url of the TypedStruct matches the type the extension expects.
* - Convert value to the type described in type_url and perform validation.
*
* TODO(lizan): Figure out how TypeStruct should be used with DPLB extensions that doesn't link
* protobuf descriptor with DPLB itself, (e.g. gRPC LB Plugin, Envoy WASM extensions).
*/

View File

@ -30,6 +30,8 @@ import * as csds from './csds';
import * as round_robin_lb from './lb-policy-registry/round-robin';
import * as typed_struct_lb from './lb-policy-registry/typed-struct';
import * as pick_first_lb from './lb-policy-registry/pick-first';
import * as weighted_round_robin_lb from './lb-policy-registry/weighted-round-robin';
import * as wrr_locality from './lb-policy-registry/wrr-locality';
export { XdsServer } from './server';
export { XdsChannelCredentials, XdsServerCredentials } from './xds-credentials';
@ -59,4 +61,6 @@ export function register() {
round_robin_lb.setup();
typed_struct_lb.setup();
pick_first_lb.setup();
weighted_round_robin_lb.setup();
wrr_locality.setup();
}

View File

@ -0,0 +1,80 @@
/*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { LoadBalancingConfig } from "@grpc/grpc-js";
import { LoadBalancingPolicy__Output } from "../generated/envoy/config/cluster/v3/LoadBalancingPolicy";
import { TypedExtensionConfig__Output } from "../generated/envoy/config/core/v3/TypedExtensionConfig";
import { loadProtosWithOptionsSync } from "@grpc/proto-loader/build/src/util";
import { Any__Output } from "../generated/google/protobuf/Any";
import { ClientSideWeightedRoundRobin__Output } from "../generated/envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3/ClientSideWeightedRoundRobin";
import { EXPERIMENTAL_WRR_LB } from "../environment";
import { registerLbPolicy } from "../lb-policy-registry";
const WEIGHTED_ROUND_ROBIN_TYPE_URL = 'type.googleapis.com/envoy.extensions.load_balancing_policies.client_side_weighted_round_robin.v3.ClientSideWeightedRoundRobin';
const resourceRoot = loadProtosWithOptionsSync([
'envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3/client_side_weighted_round_robin.proto'], {
keepCase: true,
includeDirs: [
// Paths are relative to src/build/lb-policy-registry
__dirname + '/../../../deps/envoy-api/',
__dirname + '/../../../deps/xds/',
__dirname + '/../../../deps/protoc-gen-validate'
],
}
);
const toObjectOptions = {
longs: String,
enums: String,
defaults: true,
oneofs: true
};
function decodeWeightedRoundRobinConfig(message: Any__Output): ClientSideWeightedRoundRobin__Output {
const name = message.type_url.substring(message.type_url.lastIndexOf('/') + 1);
const type = resourceRoot.lookup(name);
if (type) {
const decodedMessage = (type as any).decode(message.value);
return decodedMessage.$type.toObject(decodedMessage, toObjectOptions) as ClientSideWeightedRoundRobin__Output;
} else {
throw new Error(`TypedStruct parsing error: unexpected type URL ${message.type_url}`);
}
}
function convertToLoadBalancingPolicy(protoPolicy: TypedExtensionConfig__Output, selectChildPolicy: (childPolicy: LoadBalancingPolicy__Output) => LoadBalancingConfig): LoadBalancingConfig | null {
if (protoPolicy.typed_config?.type_url !== WEIGHTED_ROUND_ROBIN_TYPE_URL) {
throw new Error(`Pick first LB policy parsing error: unexpected type URL ${protoPolicy.typed_config?.type_url}`);
}
const wrrMessage = decodeWeightedRoundRobinConfig(protoPolicy.typed_config);
return {
weighted_round_robin: {
enable_oob_load_report: wrrMessage.enable_oob_load_report?.value,
oob_load_reporting_period: wrrMessage.oob_reporting_period,
blackout_period: wrrMessage.blackout_period,
weight_expiration_period: wrrMessage.weight_expiration_period,
weight_update_period: wrrMessage.weight_update_period,
error_utilization_penalty: wrrMessage.error_utilization_penalty?.value
}
}
}
export function setup() {
if (EXPERIMENTAL_WRR_LB) {
registerLbPolicy(WEIGHTED_ROUND_ROBIN_TYPE_URL, convertToLoadBalancingPolicy);
}
}

View File

@ -0,0 +1,75 @@
/*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { LoadBalancingConfig } from "@grpc/grpc-js";
import { LoadBalancingPolicy__Output } from "../generated/envoy/config/cluster/v3/LoadBalancingPolicy";
import { TypedExtensionConfig__Output } from "../generated/envoy/config/core/v3/TypedExtensionConfig";
import { loadProtosWithOptionsSync } from "@grpc/proto-loader/build/src/util";
import { Any__Output } from "../generated/google/protobuf/Any";
import { registerLbPolicy } from "../lb-policy-registry";
import { WrrLocality__Output } from "../generated/envoy/extensions/load_balancing_policies/wrr_locality/v3/WrrLocality";
const WRR_LOCALITY_TYPE_URL = 'envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality';
const resourceRoot = loadProtosWithOptionsSync([
'envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto'], {
keepCase: true,
includeDirs: [
// Paths are relative to src/build/lb-policy-registry
__dirname + '/../../../deps/envoy-api/',
__dirname + '/../../../deps/xds/',
__dirname + '/../../../deps/protoc-gen-validate'
],
}
);
const toObjectOptions = {
longs: String,
enums: String,
defaults: true,
oneofs: true
}
function decodePickFirstConfig(message: Any__Output): WrrLocality__Output {
const name = message.type_url.substring(message.type_url.lastIndexOf('/') + 1);
const type = resourceRoot.lookup(name);
if (type) {
const decodedMessage = (type as any).decode(message.value);
return decodedMessage.$type.toObject(decodedMessage, toObjectOptions) as WrrLocality__Output;
} else {
throw new Error(`WRR Locality parsing error: unexpected type URL ${message.type_url}`);
}
}
function convertToLoadBalancingPolicy(protoPolicy: TypedExtensionConfig__Output, selectChildPolicy: (childPolicy: LoadBalancingPolicy__Output) => LoadBalancingConfig): LoadBalancingConfig | null {
if (protoPolicy.typed_config?.type_url !== WRR_LOCALITY_TYPE_URL) {
throw new Error(`WRR Locality LB policy parsing error: unexpected type URL ${protoPolicy.typed_config?.type_url}`);
}
const wrrLocalityMessage = decodePickFirstConfig(protoPolicy.typed_config);
if (!wrrLocalityMessage.endpoint_picking_policy) {
throw new Error('WRR Locality LB policy parsing error: no endpoint_picking_policy set');
}
return {
wrr_locality: {
shuffleAddressList: selectChildPolicy(wrrLocalityMessage.endpoint_picking_policy)
}
};
}
export function setup() {
registerLbPolicy(WRR_LOCALITY_TYPE_URL, convertToLoadBalancingPolicy);
}

View File

@ -173,7 +173,7 @@ class XdsClusterImplPicker implements Picker {
return {
pickResultType: originalPick.pickResultType,
status: originalPick.status,
subchannel: pickSubchannel?.getWrappedSubchannel() ?? null,
subchannel: pickSubchannel?.getWrappedSubchannel?.() ?? null,
onCallStarted: () => {
originalPick.onCallStarted?.();
pickSubchannel?.getStatsObject()?.addCallStarted();

View File

@ -21,11 +21,9 @@ import { ProtoGrpcType } from "./generated/echo";
import { EchoRequest__Output } from "./generated/grpc/testing/EchoRequest";
import { EchoResponse } from "./generated/grpc/testing/EchoResponse";
import * as net from 'net';
import { XdsServer } from "../src";
import { ControlPlaneServer } from "./xds-server";
import { findFreePorts } from 'find-free-ports';
import { XdsServerCredentials } from "../src/xds-credentials";
import { getPortsPromise } from 'portfinder';
const loadedProtos = loadPackageDefinition(loadSync(
[
@ -148,6 +146,6 @@ export class Backend {
}
export async function createBackends(count: number, useXdsServer?: boolean, creds?: ServerCredentials | undefined, serverOptions?: ServerOptions): Promise<Backend[]> {
const ports = await findFreePorts(count);
const ports = await getPortsPromise(count);
return ports.map(port => new Backend(port, useXdsServer ?? true, creds, serverOptions));
}

View File

@ -40,6 +40,7 @@ import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import StatusOr = experimental.StatusOr;
import { PickFirst } from "../src/generated/envoy/extensions/load_balancing_policies/pick_first/v3/PickFirst";
import { ClientSideWeightedRoundRobin } from "../src/generated/envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3/ClientSideWeightedRoundRobin";
const LB_POLICY_NAME = 'test.RpcBehaviorLoadBalancer';
@ -344,5 +345,70 @@ describe('Custom LB policies', () => {
const error = await client.sendOneCallAsync();
assert.strictEqual(error, null);
});
it('Should handle weighted_round_robin', async () => {
const lbPolicy: ClientSideWeightedRoundRobin & AnyExtension = {
'@type': 'type.googleapis.com/envoy.extensions.load_balancing_policies.client_side_weighted_round_robin.v3.ClientSideWeightedRoundRobin',
enable_oob_load_report: { value: true },
oob_reporting_period: { seconds: 1 },
blackout_period: { seconds: 1 },
weight_expiration_period: { seconds: 1 },
weight_update_period: { seconds: 1 },
error_utilization_penalty: { value: 0.5 }
};
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client?.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
const [backend] = await createBackends(1);
const serverRoute = new FakeServerRoute(backend.getPort(), 'serverRoute');
xdsServer.setRdsResource(serverRoute.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute.getListener());
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend], locality:{region: 'region1'}}], lbPolicy);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
await routeGroup.startAllBackends(xdsServer);
client = XdsTestClient.createFromServer('listener1', xdsServer);
const error = await client.sendOneCallAsync();
assert.strictEqual(error, null);
});
it('Should distribute traffic among backends with weighted_round_robin', async () => {
const lbPolicy: ClientSideWeightedRoundRobin & AnyExtension = {
'@type': 'type.googleapis.com/envoy.extensions.load_balancing_policies.client_side_weighted_round_robin.v3.ClientSideWeightedRoundRobin',
enable_oob_load_report: { value: true },
oob_reporting_period: { seconds: 1 },
blackout_period: { seconds: 1 },
weight_expiration_period: { seconds: 1 },
weight_update_period: { seconds: 1 },
error_utilization_penalty: { value: 0.5 }
};
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client?.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
});
const [backend1, backend2] = await createBackends(2);
const serverRoute1 = new FakeServerRoute(backend1.getPort(), 'serverRoute');
const serverRoute2 = new FakeServerRoute(backend2.getPort(), 'serverRoute2');
xdsServer.setRdsResource(serverRoute1.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute1.getListener());
xdsServer.setRdsResource(serverRoute2.getRouteConfiguration());
xdsServer.setLdsResource(serverRoute2.getListener());
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [backend1, backend2], locality:{region: 'region1'}}], lbPolicy);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
await routeGroup.startAllBackends(xdsServer);
client = XdsTestClient.createFromServer('listener1', xdsServer);
client.startCalls(100);
await routeGroup.waitForAllBackendsToReceiveTraffic();
client.stopCalls();
});
});

View File

@ -24,7 +24,7 @@ import { AnyExtension } from '@grpc/proto-loader';
import { RBAC } from '../src/generated/envoy/extensions/filters/http/rbac/v3/RBAC';
import { status } from '@grpc/grpc-js';
describe.only('RBAC HTTP filter', () => {
describe('RBAC HTTP filter', () => {
let xdsServer: ControlPlaneServer;
let client: XdsTestClient;
beforeEach(done => {

View File

@ -53,6 +53,7 @@ const loadedProtos = loadPackageDefinition(loadSync(
'envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto',
'envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto',
'envoy/extensions/load_balancing_policies/pick_first/v3/pick_first.proto',
'envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3/client_side_weighted_round_robin.proto',
'envoy/extensions/transport_sockets/tls/v3/tls.proto',
'xds/type/v3/typed_struct.proto',
'envoy/extensions/filters/http/router/v3/router.proto',

View File

@ -64,7 +64,7 @@
"pretest": "npm run generate-types && npm run generate-test-types && npm run compile",
"posttest": "npm run check && madge -c ./build/src",
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs proto/ --include-dirs proto/ proto/xds/ proto/protoc-gen-validate/ -O src/generated/ --grpcLib ../index channelz.proto xds/service/orca/v3/orca.proto",
"generate-test-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --include-dirs test/fixtures/ -O test/generated/ --grpcLib ../../src/index test_service.proto",
"generate-test-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --include-dirs test/fixtures/ -O test/generated/ --grpcLib ../../src/index test_service.proto echo_service.proto",
"copy-protos": "node ./copy-protos"
},
"dependencies": {

View File

@ -47,6 +47,10 @@ export function isDuration(value: any): value is Duration {
return typeof value.seconds === 'number' && typeof value.nanos === 'number';
}
export function isDurationMessage(value: any): value is DurationMessage {
return typeof value.seconds === 'string' && typeof value.nanos === 'number';
}
const durationRegex = /^(\d+)(?:\.(\d+))?s$/;
export function parseDuration(value: string): Duration | null {
const match = value.match(durationRegex);
@ -58,3 +62,18 @@ export function parseDuration(value: string): Duration | null {
nanos: match[2] ? Number.parseInt(match[2].padEnd(9, '0'), 10) : 0
};
}
export function durationToString(duration: Duration): string {
if (duration.nanos === 0) {
return `${duration.seconds}s`;
}
let scaleFactor: number;
if (duration.nanos % 1_000_000 === 0) {
scaleFactor = 1_000_000;
} else if (duration.nanos % 1_000 === 0) {
scaleFactor = 1_000;
} else {
scaleFactor = 1;
}
return `${duration.seconds}.${duration.nanos/scaleFactor}s`;
}

View File

@ -296,6 +296,7 @@ import * as resolver_ip from './resolver-ip';
import * as load_balancer_pick_first from './load-balancer-pick-first';
import * as load_balancer_round_robin from './load-balancer-round-robin';
import * as load_balancer_outlier_detection from './load-balancer-outlier-detection';
import * as load_balancer_weighted_round_robin from './load-balancer-weighted-round-robin';
import * as channelz from './channelz';
import { Deadline } from './deadline';
@ -306,5 +307,6 @@ import { Deadline } from './deadline';
load_balancer_pick_first.setup();
load_balancer_round_robin.setup();
load_balancer_outlier_detection.setup();
load_balancer_weighted_round_robin.setup();
channelz.setup();
})();

View File

@ -34,7 +34,7 @@ import {
} from './picker';
import { Endpoint, SubchannelAddress, subchannelAddressToString } from './subchannel-address';
import * as logging from './logging';
import { LogVerbosity, Status } from './constants';
import { LogVerbosity } from './constants';
import {
SubchannelInterface,
ConnectivityStateListener,
@ -44,12 +44,6 @@ import { isTcpSubchannelAddress } from './subchannel-address';
import { isIPv6 } from 'net';
import { ChannelOptions } from './channel-options';
import { StatusOr, statusOrFromValue } from './call-interface';
import { OrcaLoadReport__Output } from './generated/xds/data/orca/v3/OrcaLoadReport';
import { OpenRcaServiceClient } from './generated/xds/service/orca/v3/OpenRcaService';
import { ClientReadableStream, ServiceError } from './call';
import { createOrcaClient, MetricsListener } from './orca';
import { msToDuration } from './duration';
import { BackoffTimeout } from './backoff-timeout';
const TRACER_NAME = 'pick_first';
@ -245,13 +239,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
private latestResolutionNote: string = '';
private metricsListeners: Map<MetricsListener, number> = new Map();
private orcaClient: OpenRcaServiceClient | null = null;
private metricsCall: ClientReadableStream<OrcaLoadReport__Output> | null = null;
private currentMetricsIntervalMs: number = Infinity;
private orcaUnsupported = false;
private metricsBackoffTimer = new BackoffTimeout(() => this.updateMetricsSubscription());
/**
* Load balancer that attempts to connect to each backend in the address list
* in order, and picks the first one that connects, using it for every
@ -349,12 +336,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.currentPick.removeHealthStateWatcher(
this.pickedSubchannelHealthListener
);
this.orcaClient?.close();
this.orcaClient = null;
this.metricsCall?.cancel();
this.metricsCall = null;
this.metricsBackoffTimer.stop();
this.metricsBackoffTimer.reset();
// Unref last, to avoid triggering listeners
this.currentPick.unref();
this.currentPick = null;
@ -458,7 +439,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.currentPick = subchannel;
clearTimeout(this.connectionDelayTimeout);
this.calculateAndReportNewState();
this.updateMetricsSubscription();
}
private updateState(newState: ConnectivityState, picker: Picker, errorMessage: string | null) {
@ -593,67 +573,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
getTypeName(): string {
return TYPE_NAME;
}
private getOrCreateOrcaClient(): OpenRcaServiceClient | null {
if (this.orcaClient) {
return this.orcaClient;
}
if (this.currentPick) {
const channel = this.currentPick.getChannel();
this.orcaClient = createOrcaClient(channel);
return this.orcaClient;
}
return null;
}
private updateMetricsSubscription() {
if (this.orcaUnsupported) {
return;
}
if (this.metricsListeners.size > 0) {
const newInterval = Math.min(...Array.from(this.metricsListeners.values()));
if (!this.metricsCall || newInterval !== this.currentMetricsIntervalMs) {
const orcaClient = this.getOrCreateOrcaClient();
if (!orcaClient) {
return;
}
this.metricsCall?.cancel();
this.currentMetricsIntervalMs = newInterval;
const metricsCall = orcaClient.streamCoreMetrics({report_interval: msToDuration(newInterval)});
this.metricsCall = metricsCall;
metricsCall.on('data', (report: OrcaLoadReport__Output) => {
this.metricsListeners.forEach((interval, listener) => {
listener(report);
});
});
metricsCall.on('error', (error: ServiceError) => {
this.metricsCall = null;
if (error.code === Status.UNIMPLEMENTED) {
this.orcaUnsupported = true;
return;
}
if (error.code === Status.CANCELLED) {
return;
}
this.metricsBackoffTimer.runOnce();
});
}
} else {
this.metricsCall?.cancel();
this.metricsCall = null;
this.currentMetricsIntervalMs = Infinity;
}
}
addMetricsSubscription(listener: MetricsListener, intervalMs: number): void {
this.metricsListeners.set(listener, intervalMs);
this.updateMetricsSubscription();
}
removeMetricsSubscription(listener: MetricsListener): void {
this.metricsListeners.delete(listener);
this.updateMetricsSubscription();
}
}
const LEAF_CONFIG = new PickFirstLoadBalancingConfig(false);
@ -731,14 +650,6 @@ export class LeafLoadBalancer {
destroy() {
this.pickFirstBalancer.destroy();
}
addMetricsSubscription(listener: MetricsListener, intervalMs: number): void {
this.pickFirstBalancer.addMetricsSubscription(listener, intervalMs);
}
removeMetricsSubscription(listener: MetricsListener): void {
this.pickFirstBalancer.removeMetricsSubscription(listener);
}
}
export function setup(): void {

View File

@ -0,0 +1,494 @@
/*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { StatusOr } from './call-interface';
import { ChannelOptions } from './channel-options';
import { ConnectivityState } from './connectivity-state';
import { LogVerbosity } from './constants';
import { Duration, durationMessageToDuration, durationToMs, durationToString, isDuration, isDurationMessage, msToDuration, parseDuration } from './duration';
import { OrcaLoadReport__Output } from './generated/xds/data/orca/v3/OrcaLoadReport';
import { ChannelControlHelper, createChildChannelControlHelper, LoadBalancer, registerLoadBalancerType, TypedLoadBalancingConfig } from './load-balancer';
import { LeafLoadBalancer } from './load-balancer-pick-first';
import * as logging from './logging';
import { createMetricsReader, MetricsListener, OrcaOobMetricsSubchannelWrapper } from './orca';
import { PickArgs, Picker, PickResult, PickResultType, QueuePicker, UnavailablePicker } from './picker';
import { PriorityQueue } from './priority-queue';
import { Endpoint, endpointToString } from './subchannel-address';
const TRACER_NAME = 'weighted_round_robin';
function trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
const TYPE_NAME = 'weighted_round_robin';
const DEFAULT_OOB_REPORTING_PERIOD_MS = 10_000;
const DEFAULT_BLACKOUT_PERIOD_MS = 10_000;
const DEFAULT_WEIGHT_EXPIRATION_PERIOD_MS = 3 * 60_000;
const DEFAULT_WEIGHT_UPDATE_PERIOD_MS = 1_000;
const DEFAULT_ERROR_UTILIZATION_PENALTY = 1;
type TypeofValues =
| 'object'
| 'boolean'
| 'function'
| 'number'
| 'string'
| 'undefined';
function validateFieldType(
obj: any,
fieldName: string,
expectedType: TypeofValues
) {
if (
fieldName in obj &&
obj[fieldName] !== undefined &&
typeof obj[fieldName] !== expectedType
) {
throw new Error(
`weighted round robin config ${fieldName} parse error: expected ${expectedType}, got ${typeof obj[
fieldName
]}`
);
}
}
function parseDurationField(obj: any, fieldName: string): number | null {
if (fieldName in obj && obj[fieldName] !== undefined && obj[fieldName] !== null) {
let durationObject: Duration;
if (isDuration(obj[fieldName])) {
durationObject = obj[fieldName];
} else if (isDurationMessage(obj[fieldName])) {
durationObject = durationMessageToDuration(obj[fieldName]);
} else if (typeof obj[fieldName] === 'string') {
const parsedDuration = parseDuration(obj[fieldName]);
if (!parsedDuration) {
throw new Error(`weighted round robin config ${fieldName}: failed to parse duration string ${obj[fieldName]}`);
}
durationObject = parsedDuration;
} else {
throw new Error(`weighted round robin config ${fieldName}: expected duration, got ${typeof obj[fieldName]}`);
}
return durationToMs(durationObject);
}
return null;
}
export class WeightedRoundRobinLoadBalancingConfig implements TypedLoadBalancingConfig {
private readonly enableOobLoadReport: boolean;
private readonly oobLoadReportingPeriodMs: number;
private readonly blackoutPeriodMs: number;
private readonly weightExpirationPeriodMs: number;
private readonly weightUpdatePeriodMs: number;
private readonly errorUtilizationPenalty: number;
constructor(
enableOobLoadReport: boolean | null,
oobLoadReportingPeriodMs: number | null,
blackoutPeriodMs: number | null,
weightExpirationPeriodMs: number | null,
weightUpdatePeriodMs: number | null,
errorUtilizationPenalty: number | null
) {
this.enableOobLoadReport = enableOobLoadReport ?? false;
this.oobLoadReportingPeriodMs = oobLoadReportingPeriodMs ?? DEFAULT_OOB_REPORTING_PERIOD_MS;
this.blackoutPeriodMs = blackoutPeriodMs ?? DEFAULT_BLACKOUT_PERIOD_MS;
this.weightExpirationPeriodMs = weightExpirationPeriodMs ?? DEFAULT_WEIGHT_EXPIRATION_PERIOD_MS;
this.weightUpdatePeriodMs = Math.max(weightUpdatePeriodMs ?? DEFAULT_WEIGHT_UPDATE_PERIOD_MS, 100);
this.errorUtilizationPenalty = errorUtilizationPenalty ?? DEFAULT_ERROR_UTILIZATION_PENALTY;
}
getLoadBalancerName(): string {
return TYPE_NAME;
}
toJsonObject(): object {
return {
enable_oob_load_report: this.enableOobLoadReport,
oob_load_reporting_period: durationToString(msToDuration(this.oobLoadReportingPeriodMs)),
blackout_period: durationToString(msToDuration(this.blackoutPeriodMs)),
weight_expiration_period: durationToString(msToDuration(this.weightExpirationPeriodMs)),
weight_update_period: durationToString(msToDuration(this.weightUpdatePeriodMs)),
error_utilization_penalty: this.errorUtilizationPenalty
};
}
static createFromJson(obj: any): WeightedRoundRobinLoadBalancingConfig {
validateFieldType(obj, 'enable_oob_load_report', 'boolean');
validateFieldType(obj, 'error_utilization_penalty', 'number');
if (obj.error_utilization_penalty < 0) {
throw new Error('weighted round robin config error_utilization_penalty < 0');
}
return new WeightedRoundRobinLoadBalancingConfig(
obj.enable_oob_load_report,
parseDurationField(obj, 'oob_load_reporting_period'),
parseDurationField(obj, 'blackout_period'),
parseDurationField(obj, 'weight_expiration_period'),
parseDurationField(obj, 'weight_update_period'),
obj.error_utilization_penalty
)
}
getEnableOobLoadReport() {
return this.enableOobLoadReport;
}
getOobLoadReportingPeriodMs() {
return this.oobLoadReportingPeriodMs;
}
getBlackoutPeriodMs() {
return this.blackoutPeriodMs;
}
getWeightExpirationPeriodMs() {
return this.weightExpirationPeriodMs;
}
getWeightUpdatePeriodMs() {
return this.weightUpdatePeriodMs;
}
getErrorUtilizationPenalty() {
return this.errorUtilizationPenalty;
}
}
interface WeightedPicker {
endpointName: string;
picker: Picker;
weight: number;
}
interface QueueEntry {
endpointName: string;
picker: Picker;
period: number;
deadline: number;
}
type MetricsHandler = (loadReport: OrcaLoadReport__Output, endpointName: string) => void;
class WeightedRoundRobinPicker implements Picker {
private queue: PriorityQueue<QueueEntry> = new PriorityQueue((a, b) => a.deadline < b.deadline);
constructor(children: WeightedPicker[], private readonly metricsHandler: MetricsHandler | null) {
const positiveWeight = children.filter(picker => picker.weight > 0);
let averageWeight: number;
if (positiveWeight.length < 2) {
averageWeight = 1;
} else {
let weightSum: number = 0;
for (const { weight } of positiveWeight) {
weightSum += weight;
}
averageWeight = weightSum / positiveWeight.length;
}
for (const child of children) {
const period = child.weight > 0 ? 1 / child.weight : averageWeight;
this.queue.push({
endpointName: child.endpointName,
picker: child.picker,
period: period,
deadline: Math.random() * period
});
}
}
pick(pickArgs: PickArgs): PickResult {
const entry = this.queue.pop()!;
this.queue.push({
...entry,
deadline: entry.deadline + entry.period
})
const childPick = entry.picker.pick(pickArgs);
if (childPick.pickResultType === PickResultType.COMPLETE) {
if (this.metricsHandler) {
return {
...childPick,
onCallEnded: createMetricsReader(loadReport => this.metricsHandler!(loadReport, entry.endpointName), childPick.onCallEnded)
};
} else {
const subchannelWrapper = childPick.subchannel as OrcaOobMetricsSubchannelWrapper;
return {
...childPick,
subchannel: subchannelWrapper.getWrappedSubchannel()
}
}
} else {
return childPick;
}
}
}
interface ChildEntry {
child: LeafLoadBalancer;
lastUpdated: Date;
nonEmptySince: Date | null;
weight: number;
oobMetricsListener: MetricsListener | null;
}
class WeightedRoundRobinLoadBalancer implements LoadBalancer {
private latestConfig: WeightedRoundRobinLoadBalancingConfig | null = null;
private children: Map<string, ChildEntry> = new Map();
private currentState: ConnectivityState = ConnectivityState.IDLE;
private updatesPaused = false;
private lastError: string | null = null;
private weightUpdateTimer: NodeJS.Timeout | null = null;
constructor(private readonly channelControlHelper: ChannelControlHelper) {}
private countChildrenWithState(state: ConnectivityState) {
let count = 0;
for (const entry of this.children.values()) {
if (entry.child.getConnectivityState() === state) {
count += 1;
}
}
return count;
}
updateWeight(entry: ChildEntry, loadReport: OrcaLoadReport__Output): void {
const qps = loadReport.rps_fractional;
let utilization = loadReport.application_utilization;
if (utilization > 0 && qps > 0) {
utilization += (loadReport.eps / qps) * (this.latestConfig?.getErrorUtilizationPenalty() ?? 0);
}
const newWeight = utilization === 0 ? 0 : qps / utilization;
if (newWeight === 0) {
return;
}
const now = new Date();
if (entry.nonEmptySince === null) {
entry.nonEmptySince = now;
}
entry.lastUpdated = now;
entry.weight = newWeight;
}
getWeight(entry: ChildEntry): number {
if (!this.latestConfig) {
return 0;
}
const now = new Date().getTime();
if (now - entry.lastUpdated.getTime() >= this.latestConfig.getWeightExpirationPeriodMs()) {
entry.nonEmptySince = null;
return 0;
}
const blackoutPeriod = this.latestConfig.getBlackoutPeriodMs();
if (blackoutPeriod > 0 && (entry.nonEmptySince === null || now - entry.nonEmptySince.getTime() < blackoutPeriod)) {
return 0;
}
return entry.weight;
}
private calculateAndUpdateState() {
if (this.updatesPaused || !this.latestConfig) {
return;
}
if (this.countChildrenWithState(ConnectivityState.READY) > 0) {
const weightedPickers: WeightedPicker[] = [];
for (const [endpoint, entry] of this.children) {
if (entry.child.getConnectivityState() !== ConnectivityState.READY) {
continue;
}
weightedPickers.push({
endpointName: endpoint,
picker: entry.child.getPicker(),
weight: this.getWeight(entry)
});
}
trace('Created picker with weights: ' + weightedPickers.map(entry => entry.endpointName + ':' + entry.weight).join(','));
let metricsHandler: MetricsHandler | null;
if (!this.latestConfig.getEnableOobLoadReport()) {
metricsHandler = (loadReport, endpointName) => {
const childEntry = this.children.get(endpointName);
if (childEntry) {
this.updateWeight(childEntry, loadReport);
}
};
} else {
metricsHandler = null;
}
this.updateState(
ConnectivityState.READY,
new WeightedRoundRobinPicker(
weightedPickers,
metricsHandler
),
null
);
} else if (this.countChildrenWithState(ConnectivityState.CONNECTING) > 0) {
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this), null);
} else if (
this.countChildrenWithState(ConnectivityState.TRANSIENT_FAILURE) > 0
) {
const errorMessage = `weighted_round_robin: No connection established. Last error: ${this.lastError}`;
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker({
details: errorMessage,
}),
errorMessage
);
} else {
this.updateState(ConnectivityState.IDLE, new QueuePicker(this), null);
}
/* round_robin should keep all children connected, this is how we do that.
* We can't do this more efficiently in the individual child's updateState
* callback because that doesn't have a reference to which child the state
* change is associated with. */
for (const {child} of this.children.values()) {
if (child.getConnectivityState() === ConnectivityState.IDLE) {
child.exitIdle();
}
}
}
private updateState(newState: ConnectivityState, picker: Picker, errorMessage: string | null) {
trace(
ConnectivityState[this.currentState] +
' -> ' +
ConnectivityState[newState]
);
this.currentState = newState;
this.channelControlHelper.updateState(newState, picker, errorMessage);
}
updateAddressList(maybeEndpointList: StatusOr<Endpoint[]>, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean {
if (!(lbConfig instanceof WeightedRoundRobinLoadBalancingConfig)) {
return false;
}
if (!maybeEndpointList.ok) {
if (this.children.size === 0) {
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker(maybeEndpointList.error),
maybeEndpointList.error.details
);
}
return true;
}
if (maybeEndpointList.value.length === 0) {
const errorMessage = `No addresses resolved. Resolution note: ${resolutionNote}`;
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker({details: errorMessage}),
errorMessage
);
return false;
}
trace('Connect to endpoint list ' + maybeEndpointList.value.map(endpointToString));
const now = new Date();
const seenEndpointNames = new Set<string>();
this.updatesPaused = true;
this.latestConfig = lbConfig;
for (const endpoint of maybeEndpointList.value) {
const name = endpointToString(endpoint);
seenEndpointNames.add(name);
let entry = this.children.get(name);
if (!entry) {
entry = {
child: new LeafLoadBalancer(endpoint, createChildChannelControlHelper(this.channelControlHelper, {
updateState: (connectivityState, picker, errorMessage) => {
/* Ensure that name resolution is requested again after active
* connections are dropped. This is more aggressive than necessary to
* accomplish that, so we are counting on resolvers to have
* reasonable rate limits. */
if (this.currentState === ConnectivityState.READY && connectivityState !== ConnectivityState.READY) {
this.channelControlHelper.requestReresolution();
}
if (connectivityState === ConnectivityState.READY) {
entry!.nonEmptySince = null;
}
if (errorMessage) {
this.lastError = errorMessage;
}
this.calculateAndUpdateState();
},
createSubchannel: (subchannelAddress, subchannelArgs) => {
const subchannel = this.channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs);
if (entry?.oobMetricsListener) {
return new OrcaOobMetricsSubchannelWrapper(subchannel, entry.oobMetricsListener, this.latestConfig!.getOobLoadReportingPeriodMs());
} else {
return subchannel;
}
}
}), options, resolutionNote),
lastUpdated: now,
nonEmptySince: null,
weight: 0,
oobMetricsListener: null
};
this.children.set(name, entry);
}
if (lbConfig.getEnableOobLoadReport()) {
entry.oobMetricsListener = loadReport => {
this.updateWeight(entry!, loadReport);
};
} else {
entry.oobMetricsListener = null;
}
}
for (const [endpointName, entry] of this.children) {
if (seenEndpointNames.has(endpointName)) {
entry.child.startConnecting();
} else {
entry.child.destroy();
this.children.delete(endpointName);
}
}
this.updatesPaused = false;
this.calculateAndUpdateState();
if (this.weightUpdateTimer) {
clearInterval(this.weightUpdateTimer);
}
this.weightUpdateTimer = setInterval(() => {
if (this.currentState === ConnectivityState.READY) {
this.calculateAndUpdateState();
}
}, lbConfig.getWeightUpdatePeriodMs()).unref?.();
return true;
}
exitIdle(): void {
/* The weighted_round_robin LB policy is only in the IDLE state if it has
* no addresses to try to connect to and it has no picked subchannel.
* In that case, there is no meaningful action that can be taken here. */
}
resetBackoff(): void {
// This LB policy has no backoff to reset
}
destroy(): void {
for (const entry of this.children.values()) {
entry.child.destroy();
}
this.children.clear();
if (this.weightUpdateTimer) {
clearInterval(this.weightUpdateTimer);
}
}
getTypeName(): string {
return TYPE_NAME;
}
}
export function setup() {
registerLoadBalancerType(
TYPE_NAME,
WeightedRoundRobinLoadBalancer,
WeightedRoundRobinLoadBalancingConfig
);
}

View File

@ -21,11 +21,17 @@ import type { loadSync } from '@grpc/proto-loader';
import { ProtoGrpcType as OrcaProtoGrpcType } from "./generated/orca";
import { loadPackageDefinition } from "./make-client";
import { OpenRcaServiceClient, OpenRcaServiceHandlers } from "./generated/xds/service/orca/v3/OpenRcaService";
import { durationMessageToDuration, durationToMs } from "./duration";
import { durationMessageToDuration, durationToMs, msToDuration } from "./duration";
import { Server } from "./server";
import { ChannelCredentials } from "./channel-credentials";
import { Channel } from "./channel";
import { OnCallEnded } from "./picker";
import { DataProducer, Subchannel } from "./subchannel";
import { BaseSubchannelWrapper, DataWatcher, SubchannelInterface } from "./subchannel-interface";
import { ClientReadableStream, ServiceError } from "./call";
import { Status } from "./constants";
import { BackoffTimeout } from "./backoff-timeout";
import { ConnectivityState } from "./connectivity-state";
const loadedOrcaProto: OrcaProtoGrpcType | null = null;
function loadOrcaProto(): OrcaProtoGrpcType {
@ -246,3 +252,98 @@ export function createMetricsReader(listener: MetricsListener, previousOnCallEnd
}
}
}
const DATA_PRODUCER_KEY = 'orca_oob_metrics';
class OobMetricsDataWatcher implements DataWatcher {
private dataProducer: DataProducer | null = null;
constructor(private metricsListener: MetricsListener, private intervalMs: number) {}
setSubchannel(subchannel: Subchannel): void {
const producer = subchannel.getOrCreateDataProducer(DATA_PRODUCER_KEY, createOobMetricsDataProducer);
this.dataProducer = producer;
producer.addDataWatcher(this);
}
destroy(): void {
this.dataProducer?.removeDataWatcher(this);
}
getInterval(): number {
return this.intervalMs;
}
onMetricsUpdate(metrics: OrcaLoadReport__Output) {
this.metricsListener(metrics);
}
}
class OobMetricsDataProducer implements DataProducer {
private dataWatchers: Set<OobMetricsDataWatcher> = new Set();
private orcaSupported = true;
private client: OpenRcaServiceClient;
private metricsCall: ClientReadableStream<OrcaLoadReport__Output> | null = null;
private currentInterval = Infinity;
private backoffTimer = new BackoffTimeout(() => this.updateMetricsSubscription());
private subchannelStateListener = () => this.updateMetricsSubscription();
constructor(private subchannel: Subchannel) {
const channel = subchannel.getChannel();
this.client = createOrcaClient(channel);
subchannel.addConnectivityStateListener(this.subchannelStateListener);
}
addDataWatcher(dataWatcher: OobMetricsDataWatcher): void {
this.dataWatchers.add(dataWatcher);
this.updateMetricsSubscription();
}
removeDataWatcher(dataWatcher: OobMetricsDataWatcher): void {
this.dataWatchers.delete(dataWatcher);
if (this.dataWatchers.size === 0) {
this.subchannel.removeDataProducer(DATA_PRODUCER_KEY);
this.metricsCall?.cancel();
this.metricsCall = null;
this.client.close();
this.subchannel.removeConnectivityStateListener(this.subchannelStateListener);
} else {
this.updateMetricsSubscription();
}
}
private updateMetricsSubscription() {
if (this.dataWatchers.size === 0 || !this.orcaSupported || this.subchannel.getConnectivityState() !== ConnectivityState.READY) {
return;
}
const newInterval = Math.min(...Array.from(this.dataWatchers).map(watcher => watcher.getInterval()));
if (!this.metricsCall || newInterval !== this.currentInterval) {
this.metricsCall?.cancel();
this.currentInterval = newInterval;
const metricsCall = this.client.streamCoreMetrics({report_interval: msToDuration(newInterval)});
this.metricsCall = metricsCall;
metricsCall.on('data', (report: OrcaLoadReport__Output) => {
this.dataWatchers.forEach(watcher => {
watcher.onMetricsUpdate(report);
});
});
metricsCall.on('error', (error: ServiceError) => {
this.metricsCall = null;
if (error.code === Status.UNIMPLEMENTED) {
this.orcaSupported = false;
return;
}
if (error.code === Status.CANCELLED) {
return;
}
this.backoffTimer.runOnce();
});
}
}
}
export class OrcaOobMetricsSubchannelWrapper extends BaseSubchannelWrapper {
constructor(child: SubchannelInterface, metricsListener: MetricsListener, intervalMs: number) {
super(child);
this.addDataWatcher(new OobMetricsDataWatcher(metricsListener, intervalMs));
}
getWrappedSubchannel(): SubchannelInterface {
return this.child;
}
}
function createOobMetricsDataProducer(subchannel: Subchannel) {
return new OobMetricsDataProducer(subchannel);
}

View File

@ -0,0 +1,118 @@
/*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
const top = 0;
const parent = (i: number) => Math.floor(i / 2);
const left = (i: number) => i * 2 + 1;
const right = (i: number) => i * 2 + 2;
/**
* A generic priority queue implemented as an array-based binary heap.
* Adapted from https://stackoverflow.com/a/42919752/159388
*/
export class PriorityQueue<T=number> {
private readonly heap: T[] = [];
/**
*
* @param comparator Returns true if the first argument should precede the
* second in the queue. Defaults to `(a, b) => a > b`
*/
constructor(private readonly comparator = (a: T, b: T) => a > b) {}
/**
* @returns The number of items currently in the queue
*/
size(): number {
return this.heap.length;
}
/**
* @returns True if there are no items in the queue, false otherwise
*/
isEmpty(): boolean {
return this.size() == 0;
}
/**
* Look at the front item that would be popped, without modifying the contents
* of the queue
* @returns The front item in the queue, or undefined if the queue is empty
*/
peek(): T | undefined {
return this.heap[top];
}
/**
* Add the items to the queue
* @param values The items to add
* @returns The new size of the queue after adding the items
*/
push(...values: T[]): number {
values.forEach(value => {
this.heap.push(value);
this.siftUp();
});
return this.size();
}
/**
* Remove the front item in the queue and return it
* @returns The front item in the queue, or undefined if the queue is empty
*/
pop(): T | undefined {
const poppedValue = this.peek();
const bottom = this.size() - 1;
if (bottom > top) {
this.swap(top, bottom);
}
this.heap.pop();
this.siftDown();
return poppedValue;
}
/**
* Simultaneously remove the front item in the queue and add the provided
* item.
* @param value The item to add
* @returns The front item in the queue, or undefined if the queue is empty
*/
replace(value: T): T | undefined {
const replacedValue = this.peek();
this.heap[top] = value;
this.siftDown();
return replacedValue;
}
private greater(i: number, j: number): boolean {
return this.comparator(this.heap[i], this.heap[j]);
}
private swap(i: number, j: number): void {
[this.heap[i], this.heap[j]] = [this.heap[j], this.heap[i]];
}
private siftUp(): void {
let node = this.size() - 1;
while (node > top && this.greater(node, parent(node))) {
this.swap(node, parent(node));
node = parent(node);
}
}
private siftDown(): void {
let node = top;
while (
(left(node) < this.size() && this.greater(left(node), node)) ||
(right(node) < this.size() && this.greater(right(node), node))
) {
let maxChild = (right(node) < this.size() && this.greater(right(node), left(node))) ? right(node) : left(node);
this.swap(node, maxChild);
node = maxChild;
}
}
}

View File

@ -20,7 +20,7 @@ import { ChannelOptions } from './channel-options';
import { LogVerbosity, Status } from './constants';
import { Metadata } from './metadata';
import { registerResolver, Resolver, ResolverListener } from './resolver';
import { Endpoint, SubchannelAddress } from './subchannel-address';
import { Endpoint, SubchannelAddress, subchannelAddressToString } from './subchannel-address';
import { GrpcUri, splitHostPort, uriToString } from './uri-parser';
import * as logging from './logging';
@ -85,7 +85,7 @@ class IpResolver implements Resolver {
});
}
this.endpoints = addresses.map(address => ({ addresses: [address] }));
trace('Parsed ' + target.scheme + ' address list ' + addresses);
trace('Parsed ' + target.scheme + ' address list ' + addresses.map(subchannelAddressToString));
}
updateResolution(): void {
if (!this.hasReturnedResult) {

View File

@ -22,10 +22,12 @@ import { getNextCallNumber } from "./call-number";
import { Channel } from "./channel";
import { ChannelOptions } from "./channel-options";
import { ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, unregisterChannelzRef } from "./channelz";
import { CompressionFilterFactory } from "./compression-filter";
import { ConnectivityState } from "./connectivity-state";
import { Propagate, Status } from "./constants";
import { restrictControlPlaneStatusCode } from "./control-plane-status";
import { Deadline, getRelativeTimeout } from "./deadline";
import { FilterStack, FilterStackFactory } from "./filter-stack";
import { Metadata } from "./metadata";
import { getDefaultAuthority } from "./resolver";
import { Subchannel } from "./subchannel";
@ -40,7 +42,10 @@ class SubchannelCallWrapper implements Call {
private halfClosePending = false;
private pendingStatus: StatusObject | null = null;
private serviceUrl: string;
constructor(private subchannel: Subchannel, private method: string, private options: CallStreamOptions, private callNumber: number) {
private filterStack: FilterStack;
private readFilterPending = false;
private writeFilterPending = false;
constructor(private subchannel: Subchannel, private method: string, filterStackFactory: FilterStackFactory, private options: CallStreamOptions, private callNumber: number) {
const splitPath: string[] = this.method.split('/');
let serviceName = '';
/* The standard path format is "/{serviceName}/{methodName}", so if we split
@ -63,6 +68,7 @@ class SubchannelCallWrapper implements Call {
}, timeout);
}
}
this.filterStack = filterStackFactory.createFilter();
}
cancelWithStatus(status: Status, details: string): void {
@ -80,7 +86,7 @@ class SubchannelCallWrapper implements Call {
getPeer(): string {
return this.childCall?.getPeer() ?? this.subchannel.getAddress();
}
start(metadata: Metadata, listener: InterceptingListener): void {
async start(metadata: Metadata, listener: InterceptingListener): Promise<void> {
if (this.pendingStatus) {
listener.onReceiveStatus(this.pendingStatus);
return;
@ -93,38 +99,71 @@ class SubchannelCallWrapper implements Call {
});
return;
}
this.subchannel.getCallCredentials()
.generateMetadata({method_name: this.method, service_url: this.serviceUrl})
.then(credsMetadata => {
this.childCall = this.subchannel.createCall(credsMetadata, this.options.host, this.method, listener);
if (this.readPending) {
this.childCall.startRead();
const filteredMetadata = await this.filterStack.sendMetadata(Promise.resolve(metadata));
let credsMetadata: Metadata;
try {
credsMetadata = await this.subchannel.getCallCredentials()
.generateMetadata({method_name: this.method, service_url: this.serviceUrl});
} catch (e) {
const error = e as (Error & { code: number });
const { code, details } = restrictControlPlaneStatusCode(
typeof error.code === 'number' ? error.code : Status.UNKNOWN,
`Getting metadata from plugin failed with error: ${error.message}`
);
listener.onReceiveStatus(
{
code: code,
details: details,
metadata: new Metadata(),
}
if (this.pendingMessage) {
this.childCall.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
);
return;
}
credsMetadata.merge(filteredMetadata);
const childListener: InterceptingListener = {
onReceiveMetadata: async metadata => {
listener.onReceiveMetadata(await this.filterStack.receiveMetadata(metadata));
},
onReceiveMessage: async message => {
this.readFilterPending = true;
const filteredMessage = await this.filterStack.receiveMessage(message);
this.readFilterPending = false;
listener.onReceiveMessage(filteredMessage);
if (this.pendingStatus) {
listener.onReceiveStatus(this.pendingStatus);
}
if (this.halfClosePending) {
this.childCall.halfClose();
},
onReceiveStatus: async status => {
const filteredStatus = await this.filterStack.receiveTrailers(status);
if (this.readFilterPending) {
this.pendingStatus = filteredStatus;
} else {
listener.onReceiveStatus(filteredStatus);
}
}, (error: Error & { code: number }) => {
const { code, details } = restrictControlPlaneStatusCode(
typeof error.code === 'number' ? error.code : Status.UNKNOWN,
`Getting metadata from plugin failed with error: ${error.message}`
);
listener.onReceiveStatus(
{
code: code,
details: details,
metadata: new Metadata(),
}
);
});
}
}
this.childCall = this.subchannel.createCall(credsMetadata, this.options.host, this.method, childListener);
if (this.readPending) {
this.childCall.startRead();
}
if (this.pendingMessage) {
this.childCall.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
}
if (this.halfClosePending && !this.writeFilterPending) {
this.childCall.halfClose();
}
}
sendMessageWithContext(context: MessageContext, message: Buffer): void {
async sendMessageWithContext(context: MessageContext, message: Buffer): Promise<void> {
this.writeFilterPending = true;
const filteredMessage = await this.filterStack.sendMessage(Promise.resolve({message: message, flags: context.flags}));
this.writeFilterPending = false;
if (this.childCall) {
this.childCall.sendMessageWithContext(context, message);
this.childCall.sendMessageWithContext(context, filteredMessage.message);
if (this.halfClosePending) {
this.childCall.halfClose();
}
} else {
this.pendingMessage = { context, message };
this.pendingMessage = { context, message: filteredMessage.message };
}
}
startRead(): void {
@ -135,7 +174,7 @@ class SubchannelCallWrapper implements Call {
}
}
halfClose(): void {
if (this.childCall) {
if (this.childCall && !this.writeFilterPending) {
this.childCall.halfClose();
} else {
this.halfClosePending = true;
@ -162,6 +201,7 @@ export class SingleSubchannelChannel implements Channel {
private channelzTrace = new ChannelzTrace();
private callTracker = new ChannelzCallTracker();
private childrenTracker = new ChannelzChildrenTracker();
private filterStackFactory: FilterStackFactory;
constructor(private subchannel: Subchannel, private target: GrpcUri, options: ChannelOptions) {
this.channelzEnabled = options['grpc.enable_channelz'] !== 0;
this.channelzRef = registerChannelzChannel(uriToString(target), () => ({
@ -174,6 +214,7 @@ export class SingleSubchannelChannel implements Channel {
if (this.channelzEnabled) {
this.childrenTracker.refChild(subchannel.getChannelzRef());
}
this.filterStackFactory = new FilterStackFactory([new CompressionFilterFactory(this, options)]);
}
close(): void {
@ -202,6 +243,6 @@ export class SingleSubchannelChannel implements Channel {
flags: Propagate.DEFAULTS,
parentCall: null
};
return new SubchannelCallWrapper(this.subchannel, method, callOptions, getNextCallNumber());
return new SubchannelCallWrapper(this.subchannel, method, this.filterStackFactory, callOptions, getNextCallNumber());
}
}

View File

@ -31,6 +31,11 @@ export type ConnectivityStateListener = (
export type HealthListener = (healthy: boolean) => void;
export interface DataWatcher {
setSubchannel(subchannel: Subchannel): void;
destroy(): void;
}
/**
* This is an interface for load balancing policies to use to interact with
* subchannels. This allows load balancing policies to wrap and unwrap
@ -53,6 +58,7 @@ export interface SubchannelInterface {
isHealthy(): boolean;
addHealthStateWatcher(listener: HealthListener): void;
removeHealthStateWatcher(listener: HealthListener): void;
addDataWatcher(dataWatcher: DataWatcher): void;
/**
* If this is a wrapper, return the wrapped subchannel, otherwise return this
*/
@ -77,6 +83,8 @@ export interface SubchannelInterface {
export abstract class BaseSubchannelWrapper implements SubchannelInterface {
private healthy = true;
private healthListeners: Set<HealthListener> = new Set();
private refcount = 0;
private dataWatchers: Set<DataWatcher> = new Set();
constructor(protected child: SubchannelInterface) {
child.addHealthStateWatcher(childHealthy => {
/* A change to the child health state only affects this wrapper's overall
@ -113,9 +121,19 @@ export abstract class BaseSubchannelWrapper implements SubchannelInterface {
}
ref(): void {
this.child.ref();
this.refcount += 1;
}
unref(): void {
this.child.unref();
this.refcount -= 1;
if (this.refcount === 0) {
this.destroy();
}
}
protected destroy() {
for (const watcher of this.dataWatchers) {
watcher.destroy();
}
}
getChannelzRef(): SubchannelRef {
return this.child.getChannelzRef();
@ -129,6 +147,10 @@ export abstract class BaseSubchannelWrapper implements SubchannelInterface {
removeHealthStateWatcher(listener: HealthListener): void {
this.healthListeners.delete(listener);
}
addDataWatcher(dataWatcher: DataWatcher): void {
dataWatcher.setSubchannel(this.getRealSubchannel());
this.dataWatchers.add(dataWatcher);
}
protected setHealthy(healthy: boolean): void {
if (healthy !== this.healthy) {
this.healthy = healthy;

View File

@ -41,6 +41,7 @@ import {
} from './channelz';
import {
ConnectivityStateListener,
DataWatcher,
SubchannelInterface,
} from './subchannel-interface';
import { SubchannelCallInterceptingListener } from './subchannel-call';
@ -57,6 +58,11 @@ const TRACER_NAME = 'subchannel';
* to calculate it */
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
export interface DataProducer {
addDataWatcher(dataWatcher: DataWatcher): void;
removeDataWatcher(dataWatcher: DataWatcher): void;
}
export class Subchannel implements SubchannelInterface {
/**
* The subchannel's current connectivity state. Invariant: `session` === `null`
@ -107,6 +113,10 @@ export class Subchannel implements SubchannelInterface {
private secureConnector: SecureConnector;
private dataProducers: Map<string, DataProducer> = new Map();
private subchannelChannel: Channel | null = null;
/**
* A class representing a connection to a single backend.
* @param channelTarget The target string for the channel as a whole
@ -523,6 +533,27 @@ export class Subchannel implements SubchannelInterface {
}
getChannel(): Channel {
return new SingleSubchannelChannel(this, this.channelTarget, this.options);
if (!this.subchannelChannel) {
this.subchannelChannel = new SingleSubchannelChannel(this, this.channelTarget, this.options);
}
return this.subchannelChannel;
}
addDataWatcher(dataWatcher: DataWatcher): void {
throw new Error('Not implemented');
}
getOrCreateDataProducer(name: string, createDataProducer: (subchannel: Subchannel) => DataProducer): DataProducer {
const existingProducer = this.dataProducers.get(name);
if (existingProducer){
return existingProducer;
}
const newProducer = createDataProducer(this);
this.dataProducers.set(name, newProducer);
return newProducer;
}
removeDataProducer(name: string) {
this.dataProducers.delete(name);
}
}

View File

@ -719,14 +719,33 @@ export class Http2SubchannelConnector implements SubchannelConnector {
settings: {
initialWindowSize:
options['grpc-node.flow_control_window'] ??
http2.getDefaultSettings().initialWindowSize,
http2.getDefaultSettings().initialWindowSize ?? 65535,
}
});
// Prepare window size configuration for remoteSettings handler
const defaultWin = http2.getDefaultSettings().initialWindowSize ?? 65535; // 65 535 B
const connWin = options[
'grpc-node.flow_control_window'
] as number | undefined;
this.session = session;
let errorMessage = 'Failed to connect';
let reportedError = false;
session.unref();
session.once('remoteSettings', () => {
// Send WINDOW_UPDATE now to avoid 65 KB start-window stall.
if (connWin && connWin > defaultWin) {
try {
// Node ≥ 14.18
(session as any).setLocalWindowSize(connWin);
} catch {
// Older Node: bump by the delta
const delta = connWin - (session.state.localWindowSize ?? defaultWin);
if (delta > 0) (session as any).incrementWindowSize(delta);
}
}
session.removeAllListeners();
secureConnectResult.socket.removeListener('close', closeHandler);
secureConnectResult.socket.removeListener('error', errorHandler);

View File

@ -30,6 +30,7 @@ import {
} from '../src/make-client';
import { readFileSync } from 'fs';
import {
DataWatcher,
HealthListener,
SubchannelInterface,
} from '../src/subchannel-interface';
@ -205,6 +206,9 @@ export class MockSubchannel implements SubchannelInterface {
) {
this.state = initialState;
}
addDataWatcher(dataWatcher: DataWatcher): void {
throw new Error('Method not implemented.');
}
getConnectivityState(): grpc.connectivityState {
return this.state;
}

View File

@ -0,0 +1,12 @@
// Original file: test/fixtures/echo_service.proto
export interface EchoMessage {
'value'?: (string);
'value2'?: (number);
}
export interface EchoMessage__Output {
'value': (string);
'value2': (number);
}

View File

@ -0,0 +1,54 @@
// Original file: test/fixtures/echo_service.proto
import type * as grpc from './../../src/index'
import type { MethodDefinition } from '@grpc/proto-loader'
import type { EchoMessage as _EchoMessage, EchoMessage__Output as _EchoMessage__Output } from './EchoMessage';
export interface EchoServiceClient extends grpc.Client {
Echo(argument: _EchoMessage, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_EchoMessage__Output>): grpc.ClientUnaryCall;
Echo(argument: _EchoMessage, metadata: grpc.Metadata, callback: grpc.requestCallback<_EchoMessage__Output>): grpc.ClientUnaryCall;
Echo(argument: _EchoMessage, options: grpc.CallOptions, callback: grpc.requestCallback<_EchoMessage__Output>): grpc.ClientUnaryCall;
Echo(argument: _EchoMessage, callback: grpc.requestCallback<_EchoMessage__Output>): grpc.ClientUnaryCall;
echo(argument: _EchoMessage, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_EchoMessage__Output>): grpc.ClientUnaryCall;
echo(argument: _EchoMessage, metadata: grpc.Metadata, callback: grpc.requestCallback<_EchoMessage__Output>): grpc.ClientUnaryCall;
echo(argument: _EchoMessage, options: grpc.CallOptions, callback: grpc.requestCallback<_EchoMessage__Output>): grpc.ClientUnaryCall;
echo(argument: _EchoMessage, callback: grpc.requestCallback<_EchoMessage__Output>): grpc.ClientUnaryCall;
EchoBidiStream(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_EchoMessage, _EchoMessage__Output>;
EchoBidiStream(options?: grpc.CallOptions): grpc.ClientDuplexStream<_EchoMessage, _EchoMessage__Output>;
echoBidiStream(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_EchoMessage, _EchoMessage__Output>;
echoBidiStream(options?: grpc.CallOptions): grpc.ClientDuplexStream<_EchoMessage, _EchoMessage__Output>;
EchoClientStream(metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_EchoMessage__Output>): grpc.ClientWritableStream<_EchoMessage>;
EchoClientStream(metadata: grpc.Metadata, callback: grpc.requestCallback<_EchoMessage__Output>): grpc.ClientWritableStream<_EchoMessage>;
EchoClientStream(options: grpc.CallOptions, callback: grpc.requestCallback<_EchoMessage__Output>): grpc.ClientWritableStream<_EchoMessage>;
EchoClientStream(callback: grpc.requestCallback<_EchoMessage__Output>): grpc.ClientWritableStream<_EchoMessage>;
echoClientStream(metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_EchoMessage__Output>): grpc.ClientWritableStream<_EchoMessage>;
echoClientStream(metadata: grpc.Metadata, callback: grpc.requestCallback<_EchoMessage__Output>): grpc.ClientWritableStream<_EchoMessage>;
echoClientStream(options: grpc.CallOptions, callback: grpc.requestCallback<_EchoMessage__Output>): grpc.ClientWritableStream<_EchoMessage>;
echoClientStream(callback: grpc.requestCallback<_EchoMessage__Output>): grpc.ClientWritableStream<_EchoMessage>;
EchoServerStream(argument: _EchoMessage, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_EchoMessage__Output>;
EchoServerStream(argument: _EchoMessage, options?: grpc.CallOptions): grpc.ClientReadableStream<_EchoMessage__Output>;
echoServerStream(argument: _EchoMessage, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_EchoMessage__Output>;
echoServerStream(argument: _EchoMessage, options?: grpc.CallOptions): grpc.ClientReadableStream<_EchoMessage__Output>;
}
export interface EchoServiceHandlers extends grpc.UntypedServiceImplementation {
Echo: grpc.handleUnaryCall<_EchoMessage__Output, _EchoMessage>;
EchoBidiStream: grpc.handleBidiStreamingCall<_EchoMessage__Output, _EchoMessage>;
EchoClientStream: grpc.handleClientStreamingCall<_EchoMessage__Output, _EchoMessage>;
EchoServerStream: grpc.handleServerStreamingCall<_EchoMessage__Output, _EchoMessage>;
}
export interface EchoServiceDefinition extends grpc.ServiceDefinition {
Echo: MethodDefinition<_EchoMessage, _EchoMessage, _EchoMessage__Output, _EchoMessage__Output>
EchoBidiStream: MethodDefinition<_EchoMessage, _EchoMessage, _EchoMessage__Output, _EchoMessage__Output>
EchoClientStream: MethodDefinition<_EchoMessage, _EchoMessage, _EchoMessage__Output, _EchoMessage__Output>
EchoServerStream: MethodDefinition<_EchoMessage, _EchoMessage, _EchoMessage__Output, _EchoMessage__Output>
}

View File

@ -0,0 +1,15 @@
import type * as grpc from '../../src/index';
import type { MessageTypeDefinition } from '@grpc/proto-loader';
import type { EchoMessage as _EchoMessage, EchoMessage__Output as _EchoMessage__Output } from './EchoMessage';
import type { EchoServiceClient as _EchoServiceClient, EchoServiceDefinition as _EchoServiceDefinition } from './EchoService';
type SubtypeConstructor<Constructor extends new (...args: any) => any, Subtype> = {
new(...args: ConstructorParameters<Constructor>): Subtype;
};
export interface ProtoGrpcType {
EchoMessage: MessageTypeDefinition<_EchoMessage, _EchoMessage__Output>
EchoService: SubtypeConstructor<typeof grpc.Client, _EchoServiceClient> & { service: _EchoServiceDefinition }
}

View File

@ -0,0 +1,168 @@
/*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as assert from 'assert';
import { PriorityQueue } from '../src/priority-queue';
describe('PriorityQueue', () => {
describe('size', () => {
it('Should be 0 initially', () => {
const queue = new PriorityQueue();
assert.strictEqual(queue.size(), 0);
});
it('Should be 1 after pushing one item', () => {
const queue = new PriorityQueue();
queue.push(1);
assert.strictEqual(queue.size(), 1);
});
it('Should be 0 after pushing and popping one item', () => {
const queue = new PriorityQueue();
queue.push(1);
queue.pop();
assert.strictEqual(queue.size(), 0);
});
});
describe('isEmpty', () => {
it('Should be true initially', () => {
const queue = new PriorityQueue();
assert.strictEqual(queue.isEmpty(), true);
});
it('Should be false after pushing one item', () => {
const queue = new PriorityQueue();
queue.push(1);
assert.strictEqual(queue.isEmpty(), false);
});
it('Should be 0 after pushing and popping one item', () => {
const queue = new PriorityQueue();
queue.push(1);
queue.pop();
assert.strictEqual(queue.isEmpty(), true);
});
});
describe('peek', () => {
it('Should return undefined initially', () => {
const queue = new PriorityQueue();
assert.strictEqual(queue.peek(), undefined);
});
it('Should return the same value multiple times', () => {
const queue = new PriorityQueue();
queue.push(1);
assert.strictEqual(queue.peek(), 1);
assert.strictEqual(queue.peek(), 1);
});
it('Should return the maximum of multiple values', () => {
const queue = new PriorityQueue();
queue.push(1, 3, 8, 5, 6);
assert.strictEqual(queue.peek(), 8);
});
it('Should return undefined after popping the last item', () => {
const queue = new PriorityQueue();
queue.push(1);
queue.pop();
assert.strictEqual(queue.peek(), undefined);
});
});
describe('pop', () => {
it('Should return undefined initially', () => {
const queue = new PriorityQueue();
assert.strictEqual(queue.pop(), undefined);
});
it('Should return a pushed item', () => {
const queue = new PriorityQueue();
queue.push(1);
assert.strictEqual(queue.pop(), 1);
});
it('Should return pushed items in decreasing order', () => {
const queue = new PriorityQueue();
queue.push(1, 3, 8, 5, 6);
assert.strictEqual(queue.pop(), 8);
assert.strictEqual(queue.pop(), 6);
assert.strictEqual(queue.pop(), 5);
assert.strictEqual(queue.pop(), 3);
assert.strictEqual(queue.pop(), 1);
});
it('Should return undefined after popping the last item', () => {
const queue = new PriorityQueue();
queue.push(1);
queue.pop();
assert.strictEqual(queue.pop(), undefined);
});
});
describe('replace', () => {
it('should return undefined initially', () => {
const queue = new PriorityQueue();
assert.strictEqual(queue.replace(1), undefined);
});
it('Should return a pushed item', () => {
const queue = new PriorityQueue();
queue.push(1);
assert.strictEqual(queue.replace(2), 1);
});
it('Should replace the max value if providing the new max', () => {
const queue = new PriorityQueue();
queue.push(1, 3, 8, 5, 6);
assert.strictEqual(queue.replace(10), 8);
assert.strictEqual(queue.peek(), 10);
});
it('Should not replace the max value if providing a lower value', () => {
const queue = new PriorityQueue();
queue.push(1, 3, 8, 5, 6);
assert.strictEqual(queue.replace(4), 8);
assert.strictEqual(queue.peek(), 6);
});
});
describe('push', () => {
it('Should would the same with one call or multiple', () => {
const queue1 = new PriorityQueue();
queue1.push(1, 3, 8, 5, 6);
assert.strictEqual(queue1.pop(), 8);
assert.strictEqual(queue1.pop(), 6);
assert.strictEqual(queue1.pop(), 5);
assert.strictEqual(queue1.pop(), 3);
assert.strictEqual(queue1.pop(), 1);
const queue2 = new PriorityQueue();
queue2.push(1);
queue2.push(3);
queue2.push(8);
queue2.push(5);
queue2.push(6);
assert.strictEqual(queue2.pop(), 8);
assert.strictEqual(queue2.pop(), 6);
assert.strictEqual(queue2.pop(), 5);
assert.strictEqual(queue2.pop(), 3);
assert.strictEqual(queue2.pop(), 1);
});
});
describe('custom comparator', () => {
it('Should produce items in the reverse order with a reversed comparator', () => {
const queue = new PriorityQueue((a, b) => a < b);
queue.push(1, 3, 8, 5, 6);
assert.strictEqual(queue.pop(), 1);
assert.strictEqual(queue.pop(), 3);
assert.strictEqual(queue.pop(), 5);
assert.strictEqual(queue.pop(), 6);
assert.strictEqual(queue.pop(), 8);
});
it('Should support other types', () => {
const queue = new PriorityQueue<string>((a, b) => a.localeCompare(b) > 0);
queue.push('a', 'c', 'b');
assert.strictEqual(queue.pop(), 'c');
assert.strictEqual(queue.pop(), 'b');
assert.strictEqual(queue.pop(), 'a');
});
});
});

View File

@ -0,0 +1,436 @@
/*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as assert from 'assert';
import * as path from 'path';
import * as grpc from '../src';
import { loadProtoFile } from './common';
import { EchoServiceClient } from './generated/EchoService';
import { ProtoGrpcType } from './generated/echo_service'
import { WeightedRoundRobinLoadBalancingConfig } from '../src/load-balancer-weighted-round-robin';
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const EchoService = (loadProtoFile(protoFile) as unknown as ProtoGrpcType).EchoService;
function makeNCalls(client: EchoServiceClient, count: number): Promise<{[serverId: string]: number}> {
return new Promise((resolve, reject) => {
const result: {[serverId: string]: number} = {};
function makeOneCall(callsLeft: number) {
if (callsLeft <= 0) {
resolve(result);
} else {
const deadline = new Date();
deadline.setMilliseconds(deadline.getMilliseconds() + 100);
const call= client.echo({}, {deadline}, (error, value) => {
if (error) {
reject(error);
return;
}
makeOneCall(callsLeft - 1);
});
call.on('metadata', metadata => {
const serverEntry = metadata.get('server');
if (serverEntry.length > 0) {
const serverId = serverEntry[0] as string;
if (!(serverId in result)) {
result[serverId] = 0;
}
result[serverId] += 1;
}
});
}
}
makeOneCall(count);
});
}
function createServiceConfig(wrrConfig: object): grpc.ServiceConfig {
return {
methodConfig: [],
loadBalancingConfig: [
{'weighted_round_robin': wrrConfig}
]
};
}
function createClient(ports: number[], serviceConfig: grpc.ServiceConfig) {
return new EchoService(`ipv4:${ports.map(port => `127.0.0.1:${port}`).join(',')}`, grpc.credentials.createInsecure(), {'grpc.service_config': JSON.stringify(serviceConfig)});
}
function asyncTimeout(delay: number): Promise<void> {
return new Promise(resolve => {
setTimeout(resolve, delay);
});
}
describe('Weighted round robin LB policy', () => {
describe('Config parsing', () => {
it('Should have default values with an empty object', () => {
const config = WeightedRoundRobinLoadBalancingConfig.createFromJson({});
assert.strictEqual(config.getEnableOobLoadReport(), false);
assert.strictEqual(config.getBlackoutPeriodMs(), 10_000);
assert.strictEqual(config.getErrorUtilizationPenalty(), 1);
assert.strictEqual(config.getOobLoadReportingPeriodMs(), 10_000);
assert.strictEqual(config.getWeightExpirationPeriodMs(), 180_000);
assert.strictEqual(config.getWeightUpdatePeriodMs(), 1_000);
});
it('Should handle enable_oob_load_report', () => {
const config = WeightedRoundRobinLoadBalancingConfig.createFromJson({
enable_oob_load_report: true
});
assert.strictEqual(config.getEnableOobLoadReport(), true);
});
it('Should handle error_utilization_penalty', () => {
const config = WeightedRoundRobinLoadBalancingConfig.createFromJson({
error_utilization_penalty: 0.5
});
assert.strictEqual(config.getErrorUtilizationPenalty(), 0.5);
});
it('Should reject negative error_utilization_penalty', () => {
const loadBalancingConfig = {
error_utilization_penalty: -1
};
assert.throws(() => {
WeightedRoundRobinLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /error_utilization_penalty < 0/);
});
it('Should handle blackout_period as a string', () => {
const config = WeightedRoundRobinLoadBalancingConfig.createFromJson({
blackout_period: '1s'
});
assert.strictEqual(config.getBlackoutPeriodMs(), 1_000);
});
it('Should handle blackout_period as an object', () => {
const config = WeightedRoundRobinLoadBalancingConfig.createFromJson({
blackout_period: {
seconds: 1,
nanos: 0
}
});
assert.strictEqual(config.getBlackoutPeriodMs(), 1_000);
});
it('Should handle oob_load_reporting_period as a string', () => {
const config = WeightedRoundRobinLoadBalancingConfig.createFromJson({
oob_load_reporting_period: '1s'
});
assert.strictEqual(config.getOobLoadReportingPeriodMs(), 1_000);
});
it('Should handle oob_load_reporting_period as an object', () => {
const config = WeightedRoundRobinLoadBalancingConfig.createFromJson({
oob_load_reporting_period: {
seconds: 1,
nanos: 0
}
});
assert.strictEqual(config.getOobLoadReportingPeriodMs(), 1_000);
});
it('Should handle weight_expiration_period as a string', () => {
const config = WeightedRoundRobinLoadBalancingConfig.createFromJson({
weight_expiration_period: '1s'
});
assert.strictEqual(config.getWeightExpirationPeriodMs(), 1_000);
});
it('Should handle weight_expiration_period as an object', () => {
const config = WeightedRoundRobinLoadBalancingConfig.createFromJson({
weight_expiration_period: {
seconds: 1,
nanos: 0
}
});
assert.strictEqual(config.getWeightExpirationPeriodMs(), 1_000);
});
it('Should handle weight_update_period as a string', () => {
const config = WeightedRoundRobinLoadBalancingConfig.createFromJson({
weight_update_period: '2s'
});
assert.strictEqual(config.getWeightUpdatePeriodMs(), 2_000);
});
it('Should handle weight_update_period as an object', () => {
const config = WeightedRoundRobinLoadBalancingConfig.createFromJson({
weight_update_period: {
seconds: 2,
nanos: 0
}
});
assert.strictEqual(config.getWeightUpdatePeriodMs(), 2_000);
});
it('Should cap weight_update_period to a minimum of 0.1s', () => {
const config = WeightedRoundRobinLoadBalancingConfig.createFromJson({
weight_update_period: '0.01s'
});
assert.strictEqual(config.getWeightUpdatePeriodMs(), 100);
});
});
describe('Per-call metrics', () => {
const server1Metrics = {
qps: 0,
utilization: 0,
eps: 0
};
const server2Metrics = {
qps: 0,
utilization: 0,
eps: 0
};
const server1 = new grpc.Server({'grpc.server_call_metric_recording': 1});
const server2 = new grpc.Server({'grpc.server_call_metric_recording': 1});
const server1Impl = {
echo: (
call: grpc.ServerUnaryCall<any, any>,
callback: grpc.sendUnaryData<any>
) => {
const metricsRecorder = call.getMetricsRecorder();
metricsRecorder.recordQpsMetric(server1Metrics.qps);
metricsRecorder.recordApplicationUtilizationMetric(server1Metrics.utilization);
metricsRecorder.recordEpsMetric(server1Metrics.eps);
const metadata = new grpc.Metadata();
metadata.set('server', '1');
call.sendMetadata(metadata);
callback(null, call.request);
},
};
const server2Impl = {
echo: (
call: grpc.ServerUnaryCall<any, any>,
callback: grpc.sendUnaryData<any>
) => {
const metricsRecorder = call.getMetricsRecorder();
metricsRecorder.recordQpsMetric(server2Metrics.qps);
metricsRecorder.recordApplicationUtilizationMetric(server2Metrics.utilization);
metricsRecorder.recordEpsMetric(server2Metrics.eps);
const metadata = new grpc.Metadata();
metadata.set('server', '2');
call.sendMetadata(metadata);
callback(null, call.request);
},
};
let port1: number;
let port2: number;
let client: EchoServiceClient | null = null;
before(done => {
const creds = grpc.ServerCredentials.createInsecure();
server1.addService(EchoService.service, server1Impl);
server2.addService(EchoService.service, server2Impl);
server1.bindAsync('localhost:0', creds, (error1, server1Port) => {
if (error1) {
done(error1);
return;
}
port1 = server1Port;
server2.bindAsync('localhost:0', creds, (error2, server2Port) => {
if (error2) {
done(error2);
return;
}
port2 = server2Port;
done();
});
});
});
beforeEach(() => {
server1Metrics.qps = 0;
server1Metrics.utilization = 0;
server1Metrics.eps = 0;
server2Metrics.qps = 0;
server2Metrics.utilization = 0;
server2Metrics.eps = 0;
});
afterEach(() => {
client?.close();
client = null;
});
after(() => {
server1.forceShutdown();
server2.forceShutdown();
});
it('Should evenly balance among endpoints with no weight', async () => {
const serviceConfig = createServiceConfig({});
client = createClient([port1, port2], serviceConfig);
await makeNCalls(client, 10);
const result = await makeNCalls(client, 30);
assert(Math.abs(result['1'] - result['2']) < 3, `server1: ${result['1']}, server2: ${result[2]}`);
});
it('Should send more requests to endpoints with higher QPS', async () => {
const serviceConfig = createServiceConfig({
blackout_period: '0.01s',
weight_update_period: '0.1s'
});
client = createClient([port1, port2], serviceConfig);
server1Metrics.qps = 3;
server1Metrics.utilization = 1;
server2Metrics.qps = 1;
server2Metrics.utilization = 1;
await makeNCalls(client, 10);
await asyncTimeout(200);
const result = await makeNCalls(client, 40);
assert(Math.abs(result['1'] - 30) < 3, `server1: ${result['1']}, server2: ${result['2']}`);
});
// Calls aren't fast enough for this to work consistently
it.skip('Should wait for the blackout period to apply weights', async () => {
const serviceConfig = createServiceConfig({
blackout_period: '0.5s'
});
client = createClient([port1, port2], serviceConfig);
server1Metrics.qps = 3;
server1Metrics.utilization = 1;
server2Metrics.qps = 1;
server2Metrics.utilization = 1;
await makeNCalls(client, 10);
await asyncTimeout(100);
const result1 = await makeNCalls(client, 20);
assert(Math.abs(result1['1'] - result1['2']) < 3, `result1: server1: ${result1['1']}, server2: ${result1[2]}`);
await asyncTimeout(400);
const result2 = await makeNCalls(client, 40);
assert(Math.abs(result2['1'] - 30) < 2, `result2: server1: ${result2['1']}, server2: ${result2['2']}`);
})
// Calls aren't fast enough for this to work consistently
it.skip('Should wait for the weight update period to apply weights', async () => {
const serviceConfig = createServiceConfig({
blackout_period: '0.01s',
weight_update_period: '1s'
});
client = createClient([port1, port2], serviceConfig);
server1Metrics.qps = 3;
server1Metrics.utilization = 1;
server2Metrics.qps = 1;
server2Metrics.utilization = 1;
await makeNCalls(client, 10);
await asyncTimeout(100);
const result1 = await makeNCalls(client, 20);
assert(Math.abs(result1['1'] - result1['2']) < 3, `result1: server1: ${result1['1']}, server2: ${result1[2]}`);
await asyncTimeout(400);
const result2 = await makeNCalls(client, 40);
assert(Math.abs(result2['1'] - 30) < 2, `result2: server1: ${result2['1']}, server2: ${result2['2']}`);
})
it('Should send more requests to endpoints with lower EPS', async () => {
const serviceConfig = createServiceConfig({
blackout_period: '0.01s',
weight_update_period: '0.1s',
error_utilization_penalty: 1
});
client = createClient([port1, port2], serviceConfig);
server1Metrics.qps = 2;
server1Metrics.utilization = 1;
server1Metrics.eps = 0;
server2Metrics.qps = 2;
server2Metrics.utilization = 1;
server2Metrics.eps = 2;
await makeNCalls(client, 10);
await asyncTimeout(100);
const result = await makeNCalls(client, 30);
assert(Math.abs(result['1'] - 20) < 3, `server1: ${result['1']}, server2: ${result['2']}`);
});
});
describe('Out of band metrics', () => {
const server1MetricRecorder = new grpc.ServerMetricRecorder();
const server2MetricRecorder = new grpc.ServerMetricRecorder();
const server1 = new grpc.Server();
const server2 = new grpc.Server();
const server1Impl = {
echo: (
call: grpc.ServerUnaryCall<any, any>,
callback: grpc.sendUnaryData<any>
) => {
const metadata = new grpc.Metadata();
metadata.set('server', '1');
call.sendMetadata(metadata);
callback(null, call.request);
},
};
const server2Impl = {
echo: (
call: grpc.ServerUnaryCall<any, any>,
callback: grpc.sendUnaryData<any>
) => {
const metadata = new grpc.Metadata();
metadata.set('server', '2');
call.sendMetadata(metadata);
callback(null, call.request);
},
};
let port1: number;
let port2: number;
let client: EchoServiceClient | null = null;
before(done => {
const creds = grpc.ServerCredentials.createInsecure();
server1.addService(EchoService.service, server1Impl);
server2.addService(EchoService.service, server2Impl);
server1MetricRecorder.addToServer(server1);
server2MetricRecorder.addToServer(server2);
server1.bindAsync('localhost:0', creds, (error1, server1Port) => {
if (error1) {
done(error1);
return;
}
port1 = server1Port;
server2.bindAsync('localhost:0', creds, (error2, server2Port) => {
if (error2) {
done(error2);
return;
}
port2 = server2Port;
done();
});
});
});
beforeEach(() => {
server1MetricRecorder.deleteQpsMetric();
server1MetricRecorder.deleteEpsMetric();
server1MetricRecorder.deleteApplicationUtilizationMetric();
server2MetricRecorder.deleteQpsMetric();
server2MetricRecorder.deleteEpsMetric();
server2MetricRecorder.deleteApplicationUtilizationMetric();
});
afterEach(() => {
client?.close();
client = null;
});
after(() => {
server1.forceShutdown();
server2.forceShutdown();
});
it('Should evenly balance among endpoints with no weight', async () => {
const serviceConfig = createServiceConfig({
enable_oob_load_report: true,
oob_load_reporting_period: '0.01s',
blackout_period: '0.01s'
});
client = createClient([port1, port2], serviceConfig);
await makeNCalls(client, 10);
const result = await makeNCalls(client, 30);
assert(Math.abs(result['1'] - result['2']) < 3, `server1: ${result['1']}, server2: ${result[2]}`);
});
it('Should send more requests to endpoints with higher QPS', async () => {
const serviceConfig = createServiceConfig({
enable_oob_load_report: true,
oob_load_reporting_period: '0.01s',
blackout_period: '0.01s',
weight_update_period: '0.1s'
});
client = createClient([port1, port2], serviceConfig);
server1MetricRecorder.setQpsMetric(3);
server1MetricRecorder.setApplicationUtilizationMetric(1);
server2MetricRecorder.setQpsMetric(1);
server2MetricRecorder.setApplicationUtilizationMetric(1);
await makeNCalls(client, 10);
await asyncTimeout(200);
const result = await makeNCalls(client, 40);
assert(Math.abs(result['1'] - 30) < 3, `server1: ${result['1']}, server2: ${result['2']}`);
});
});
});

30
test/kokoro/psm-light.cfg Normal file
View File

@ -0,0 +1,30 @@
# Copyright 2024 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Config file for Kokoro (in protobuf text format)
# Location of the continuous shell script in repository.
build_file: "grpc-node/packages/grpc-js-xds/scripts/psm-interop-test-node.sh"
timeout_mins: 360
action {
define_artifacts {
regex: "artifacts/**/*sponge_log.xml"
regex: "artifacts/**/*.log"
strip_prefix: "artifacts"
}
}
env_vars {
key: "PSM_TEST_SUITE"
value: "light"
}