This commit is contained in:
Andrey Melnik 2021-04-01 10:22:06 +03:00
commit 65d4de8e6a
50 changed files with 3871 additions and 707 deletions

3
SECURITY.md Normal file
View File

@ -0,0 +1,3 @@
# Security Policy
For information on gRPC Security Policy and reporting potentional security issues, please see [gRPC CVE Process](https://github.com/grpc/proposal/blob/master/P4-grpc-cve-process.md).

View File

@ -0,0 +1,68 @@
// Original file: proto/grpc/testing/messages.proto
/**
* Metadata to be attached for the given type of RPCs.
*/
export interface _grpc_testing_ClientConfigureRequest_Metadata {
'type'?: (_grpc_testing_ClientConfigureRequest_RpcType | keyof typeof _grpc_testing_ClientConfigureRequest_RpcType);
'key'?: (string);
'value'?: (string);
}
/**
* Metadata to be attached for the given type of RPCs.
*/
export interface _grpc_testing_ClientConfigureRequest_Metadata__Output {
'type': (keyof typeof _grpc_testing_ClientConfigureRequest_RpcType);
'key': (string);
'value': (string);
}
// Original file: proto/grpc/testing/messages.proto
/**
* Type of RPCs to send.
*/
export enum _grpc_testing_ClientConfigureRequest_RpcType {
EMPTY_CALL = 0,
UNARY_CALL = 1,
}
/**
* Configurations for a test client.
*/
export interface ClientConfigureRequest {
/**
* The types of RPCs the client sends.
*/
'types'?: (_grpc_testing_ClientConfigureRequest_RpcType | keyof typeof _grpc_testing_ClientConfigureRequest_RpcType)[];
/**
* The collection of custom metadata to be attached to RPCs sent by the client.
*/
'metadata'?: (_grpc_testing_ClientConfigureRequest_Metadata)[];
/**
* The deadline to use, in seconds, for all RPCs. If unset or zero, the
* client will use the default from the command-line.
*/
'timeout_sec'?: (number);
}
/**
* Configurations for a test client.
*/
export interface ClientConfigureRequest__Output {
/**
* The types of RPCs the client sends.
*/
'types': (keyof typeof _grpc_testing_ClientConfigureRequest_RpcType)[];
/**
* The collection of custom metadata to be attached to RPCs sent by the client.
*/
'metadata': (_grpc_testing_ClientConfigureRequest_Metadata__Output)[];
/**
* The deadline to use, in seconds, for all RPCs. If unset or zero, the
* client will use the default from the command-line.
*/
'timeout_sec': (number);
}

View File

@ -0,0 +1,14 @@
// Original file: proto/grpc/testing/messages.proto
/**
* Response for updating a test client's configuration.
*/
export interface ClientConfigureResponse {
}
/**
* Response for updating a test client's configuration.
*/
export interface ClientConfigureResponse__Output {
}

View File

@ -0,0 +1,14 @@
// Original file: proto/grpc/testing/messages.proto
/**
* Request for retrieving a test client's accumulated stats.
*/
export interface LoadBalancerAccumulatedStatsRequest {
}
/**
* Request for retrieving a test client's accumulated stats.
*/
export interface LoadBalancerAccumulatedStatsRequest__Output {
}

View File

@ -0,0 +1,78 @@
// Original file: proto/grpc/testing/messages.proto
export interface _grpc_testing_LoadBalancerAccumulatedStatsResponse_MethodStats {
/**
* The number of RPCs that were started for this method.
*/
'rpcs_started'?: (number);
/**
* The number of RPCs that completed with each status for this method. The
* key is the integral value of a google.rpc.Code; the value is the count.
*/
'result'?: ({[key: number]: number});
}
export interface _grpc_testing_LoadBalancerAccumulatedStatsResponse_MethodStats__Output {
/**
* The number of RPCs that were started for this method.
*/
'rpcs_started': (number);
/**
* The number of RPCs that completed with each status for this method. The
* key is the integral value of a google.rpc.Code; the value is the count.
*/
'result': ({[key: number]: number});
}
/**
* Accumulated stats for RPCs sent by a test client.
*/
export interface LoadBalancerAccumulatedStatsResponse {
/**
* The total number of RPCs have ever issued for each type.
* Deprecated: use stats_per_method.rpcs_started instead.
*/
'num_rpcs_started_by_method'?: ({[key: string]: number});
/**
* The total number of RPCs have ever completed successfully for each type.
* Deprecated: use stats_per_method.result instead.
*/
'num_rpcs_succeeded_by_method'?: ({[key: string]: number});
/**
* The total number of RPCs have ever failed for each type.
* Deprecated: use stats_per_method.result instead.
*/
'num_rpcs_failed_by_method'?: ({[key: string]: number});
/**
* Per-method RPC statistics. The key is the RpcType in string form; e.g.
* 'EMPTY_CALL' or 'UNARY_CALL'
*/
'stats_per_method'?: ({[key: string]: _grpc_testing_LoadBalancerAccumulatedStatsResponse_MethodStats});
}
/**
* Accumulated stats for RPCs sent by a test client.
*/
export interface LoadBalancerAccumulatedStatsResponse__Output {
/**
* The total number of RPCs have ever issued for each type.
* Deprecated: use stats_per_method.rpcs_started instead.
*/
'num_rpcs_started_by_method': ({[key: string]: number});
/**
* The total number of RPCs have ever completed successfully for each type.
* Deprecated: use stats_per_method.result instead.
*/
'num_rpcs_succeeded_by_method': ({[key: string]: number});
/**
* The total number of RPCs have ever failed for each type.
* Deprecated: use stats_per_method.result instead.
*/
'num_rpcs_failed_by_method': ({[key: string]: number});
/**
* Per-method RPC statistics. The key is the RpcType in string form; e.g.
* 'EMPTY_CALL' or 'UNARY_CALL'
*/
'stats_per_method'?: ({[key: string]: _grpc_testing_LoadBalancerAccumulatedStatsResponse_MethodStats__Output});
}

View File

@ -1,6 +1,8 @@
// Original file: proto/grpc/testing/test.proto
import * as grpc from '@grpc/grpc-js'
import { LoadBalancerAccumulatedStatsRequest as _grpc_testing_LoadBalancerAccumulatedStatsRequest, LoadBalancerAccumulatedStatsRequest__Output as _grpc_testing_LoadBalancerAccumulatedStatsRequest__Output } from '../../grpc/testing/LoadBalancerAccumulatedStatsRequest';
import { LoadBalancerAccumulatedStatsResponse as _grpc_testing_LoadBalancerAccumulatedStatsResponse, LoadBalancerAccumulatedStatsResponse__Output as _grpc_testing_LoadBalancerAccumulatedStatsResponse__Output } from '../../grpc/testing/LoadBalancerAccumulatedStatsResponse';
import { LoadBalancerStatsRequest as _grpc_testing_LoadBalancerStatsRequest, LoadBalancerStatsRequest__Output as _grpc_testing_LoadBalancerStatsRequest__Output } from '../../grpc/testing/LoadBalancerStatsRequest';
import { LoadBalancerStatsResponse as _grpc_testing_LoadBalancerStatsResponse, LoadBalancerStatsResponse__Output as _grpc_testing_LoadBalancerStatsResponse__Output } from '../../grpc/testing/LoadBalancerStatsResponse';
@ -8,6 +10,21 @@ import { LoadBalancerStatsResponse as _grpc_testing_LoadBalancerStatsResponse, L
* A service used to obtain stats for verifying LB behavior.
*/
export interface LoadBalancerStatsServiceClient extends grpc.Client {
/**
* Gets the accumulated stats for RPCs sent by a test client.
*/
GetClientAccumulatedStats(argument: _grpc_testing_LoadBalancerAccumulatedStatsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerAccumulatedStatsResponse__Output) => void): grpc.ClientUnaryCall;
GetClientAccumulatedStats(argument: _grpc_testing_LoadBalancerAccumulatedStatsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerAccumulatedStatsResponse__Output) => void): grpc.ClientUnaryCall;
GetClientAccumulatedStats(argument: _grpc_testing_LoadBalancerAccumulatedStatsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerAccumulatedStatsResponse__Output) => void): grpc.ClientUnaryCall;
GetClientAccumulatedStats(argument: _grpc_testing_LoadBalancerAccumulatedStatsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerAccumulatedStatsResponse__Output) => void): grpc.ClientUnaryCall;
/**
* Gets the accumulated stats for RPCs sent by a test client.
*/
getClientAccumulatedStats(argument: _grpc_testing_LoadBalancerAccumulatedStatsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerAccumulatedStatsResponse__Output) => void): grpc.ClientUnaryCall;
getClientAccumulatedStats(argument: _grpc_testing_LoadBalancerAccumulatedStatsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerAccumulatedStatsResponse__Output) => void): grpc.ClientUnaryCall;
getClientAccumulatedStats(argument: _grpc_testing_LoadBalancerAccumulatedStatsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerAccumulatedStatsResponse__Output) => void): grpc.ClientUnaryCall;
getClientAccumulatedStats(argument: _grpc_testing_LoadBalancerAccumulatedStatsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerAccumulatedStatsResponse__Output) => void): grpc.ClientUnaryCall;
/**
* Gets the backend distribution for RPCs sent by a test client.
*/
@ -29,6 +46,11 @@ export interface LoadBalancerStatsServiceClient extends grpc.Client {
* A service used to obtain stats for verifying LB behavior.
*/
export interface LoadBalancerStatsServiceHandlers extends grpc.UntypedServiceImplementation {
/**
* Gets the accumulated stats for RPCs sent by a test client.
*/
GetClientAccumulatedStats(call: grpc.ServerUnaryCall<_grpc_testing_LoadBalancerAccumulatedStatsRequest__Output, _grpc_testing_LoadBalancerAccumulatedStatsResponse>, callback: grpc.sendUnaryData<_grpc_testing_LoadBalancerAccumulatedStatsResponse>): void;
/**
* Gets the backend distribution for RPCs sent by a test client.
*/

View File

@ -0,0 +1,37 @@
// Original file: proto/grpc/testing/test.proto
import * as grpc from '@grpc/grpc-js'
import { ClientConfigureRequest as _grpc_testing_ClientConfigureRequest, ClientConfigureRequest__Output as _grpc_testing_ClientConfigureRequest__Output } from '../../grpc/testing/ClientConfigureRequest';
import { ClientConfigureResponse as _grpc_testing_ClientConfigureResponse, ClientConfigureResponse__Output as _grpc_testing_ClientConfigureResponse__Output } from '../../grpc/testing/ClientConfigureResponse';
/**
* A service to dynamically update the configuration of an xDS test client.
*/
export interface XdsUpdateClientConfigureServiceClient extends grpc.Client {
/**
* Update the tes client's configuration.
*/
Configure(argument: _grpc_testing_ClientConfigureRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ClientConfigureResponse__Output) => void): grpc.ClientUnaryCall;
Configure(argument: _grpc_testing_ClientConfigureRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ClientConfigureResponse__Output) => void): grpc.ClientUnaryCall;
Configure(argument: _grpc_testing_ClientConfigureRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ClientConfigureResponse__Output) => void): grpc.ClientUnaryCall;
Configure(argument: _grpc_testing_ClientConfigureRequest, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ClientConfigureResponse__Output) => void): grpc.ClientUnaryCall;
/**
* Update the tes client's configuration.
*/
configure(argument: _grpc_testing_ClientConfigureRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ClientConfigureResponse__Output) => void): grpc.ClientUnaryCall;
configure(argument: _grpc_testing_ClientConfigureRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ClientConfigureResponse__Output) => void): grpc.ClientUnaryCall;
configure(argument: _grpc_testing_ClientConfigureRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ClientConfigureResponse__Output) => void): grpc.ClientUnaryCall;
configure(argument: _grpc_testing_ClientConfigureRequest, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ClientConfigureResponse__Output) => void): grpc.ClientUnaryCall;
}
/**
* A service to dynamically update the configuration of an xDS test client.
*/
export interface XdsUpdateClientConfigureServiceHandlers extends grpc.UntypedServiceImplementation {
/**
* Update the tes client's configuration.
*/
Configure(call: grpc.ServerUnaryCall<_grpc_testing_ClientConfigureRequest__Output, _grpc_testing_ClientConfigureResponse>, callback: grpc.sendUnaryData<_grpc_testing_ClientConfigureResponse>): void;
}

View File

@ -5,6 +5,7 @@ import { LoadBalancerStatsServiceClient as _grpc_testing_LoadBalancerStatsServic
import { ReconnectServiceClient as _grpc_testing_ReconnectServiceClient } from './grpc/testing/ReconnectService';
import { TestServiceClient as _grpc_testing_TestServiceClient } from './grpc/testing/TestService';
import { UnimplementedServiceClient as _grpc_testing_UnimplementedServiceClient } from './grpc/testing/UnimplementedService';
import { XdsUpdateClientConfigureServiceClient as _grpc_testing_XdsUpdateClientConfigureServiceClient } from './grpc/testing/XdsUpdateClientConfigureService';
import { XdsUpdateHealthServiceClient as _grpc_testing_XdsUpdateHealthServiceClient } from './grpc/testing/XdsUpdateHealthService';
type ConstructorArguments<Constructor> = Constructor extends new (...args: infer Args) => any ? Args: never;
@ -16,9 +17,13 @@ export interface ProtoGrpcType {
grpc: {
testing: {
BoolValue: MessageTypeDefinition
ClientConfigureRequest: MessageTypeDefinition
ClientConfigureResponse: MessageTypeDefinition
EchoStatus: MessageTypeDefinition
Empty: MessageTypeDefinition
GrpclbRouteType: EnumTypeDefinition
LoadBalancerAccumulatedStatsRequest: MessageTypeDefinition
LoadBalancerAccumulatedStatsResponse: MessageTypeDefinition
LoadBalancerStatsRequest: MessageTypeDefinition
LoadBalancerStatsResponse: MessageTypeDefinition
/**
@ -50,6 +55,10 @@ export interface ProtoGrpcType {
* that case.
*/
UnimplementedService: SubtypeConstructor<typeof grpc.Client, _grpc_testing_UnimplementedServiceClient> & { service: ServiceDefinition }
/**
* A service to dynamically update the configuration of an xDS test client.
*/
XdsUpdateClientConfigureService: SubtypeConstructor<typeof grpc.Client, _grpc_testing_XdsUpdateClientConfigureServiceClient> & { service: ServiceDefinition }
/**
* A service to remotely control health status of an xDS test server.
*/

View File

@ -26,6 +26,9 @@ import { TestServiceClient } from './generated/grpc/testing/TestService';
import { LoadBalancerStatsResponse } from './generated/grpc/testing/LoadBalancerStatsResponse';
import * as yargs from 'yargs';
import { LoadBalancerStatsServiceHandlers } from './generated/grpc/testing/LoadBalancerStatsService';
import { XdsUpdateClientConfigureServiceHandlers } from './generated/grpc/testing/XdsUpdateClientConfigureService';
import { Empty__Output } from './generated/grpc/testing/Empty';
import { LoadBalancerAccumulatedStatsResponse } from './generated/grpc/testing/LoadBalancerAccumulatedStatsResponse';
grpc_xds.register();
@ -159,47 +162,95 @@ class CallStatsTracker {
}
}
function sendConstantQps(client: TestServiceClient, qps: number, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker) {
let anyCallSucceeded: boolean = false;
setInterval(() => {
const notifier = callStatsTracker.startCall();
let gotMetadata: boolean = false;
let hostname: string | null = null;
let completed: boolean = false;
let completedWithError: boolean = false;
const deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + REQUEST_TIMEOUT_SEC);
const call = client.emptyCall({}, {deadline}, (error, value) => {
if (error) {
if (failOnFailedRpcs && anyCallSucceeded) {
console.error('A call failed after a call succeeded');
process.exit(1);
}
completed = true;
completedWithError = true;
notifier.onCallFailed(error.message);
} else {
anyCallSucceeded = true;
if (gotMetadata) {
if (hostname === null) {
notifier.onCallFailed('Hostname omitted from call metadata');
} else {
notifier.onCallSucceeded(hostname);
}
}
type CallType = 'EMPTY_CALL' | 'UNARY_CALL';
interface ClientConfiguration {
callTypes: (CallType)[];
metadata: {
EMPTY_CALL: grpc.Metadata,
UNARY_CALL: grpc.Metadata
},
timeoutSec: number
}
const currentConfig: ClientConfiguration = {
callTypes: ['EMPTY_CALL'],
metadata: {
EMPTY_CALL: new grpc.Metadata(),
UNARY_CALL: new grpc.Metadata()
},
timeoutSec: REQUEST_TIMEOUT_SEC
};
let anyCallSucceeded = false;
const accumulatedStats: LoadBalancerAccumulatedStatsResponse = {
stats_per_method: {
'EMPTY_CALL': {
rpcs_started: 0,
result: {}
},
'UNARY_CALL': {
rpcs_started: 0,
result: {}
}
}
};
function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker) {
const callTypeStats = accumulatedStats.stats_per_method![type];
callTypeStats.rpcs_started! += 1;
const notifier = callStatsTracker.startCall();
let gotMetadata: boolean = false;
let hostname: string | null = null;
let completed: boolean = false;
let completedWithError: boolean = false;
const deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + currentConfig.timeoutSec);
const callback = (error: grpc.ServiceError | undefined, value: Empty__Output | undefined) => {
const statusCode = error?.code ?? grpc.status.OK;
callTypeStats.result![statusCode] = (callTypeStats.result![statusCode] ?? 0) + 1;
if (error) {
if (failOnFailedRpcs && anyCallSucceeded) {
console.error('A call failed after a call succeeded');
process.exit(1);
}
});
call.on('metadata', (metadata) => {
hostname = (metadata.get('hostname') as string[])[0] ?? null;
gotMetadata = true;
if (completed && !completedWithError) {
completed = true;
completedWithError = true;
notifier.onCallFailed(error.message);
} else {
anyCallSucceeded = true;
if (gotMetadata) {
if (hostname === null) {
notifier.onCallFailed('Hostname omitted from call metadata');
} else {
notifier.onCallSucceeded(hostname);
}
}
})
}
};
const method = (type === 'EMPTY_CALL' ? client.emptyCall : client.unaryCall).bind(client);
const call = method({}, currentConfig.metadata[type], {deadline}, callback);
call.on('metadata', (metadata) => {
hostname = (metadata.get('hostname') as string[])[0] ?? null;
gotMetadata = true;
if (completed && !completedWithError) {
if (hostname === null) {
notifier.onCallFailed('Hostname omitted from call metadata');
} else {
notifier.onCallSucceeded(hostname);
}
}
});
}
function sendConstantQps(client: TestServiceClient, qps: number, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker) {
setInterval(() => {
for (const callType of currentConfig.callTypes) {
makeSingleRequest(client, callType, failOnFailedRpcs, callStatsTracker);
}
}, 1000/qps);
}
@ -234,11 +285,30 @@ function main() {
}, (error) => {
callback({code: grpc.status.ABORTED, details: 'Call stats collection failed'});
});
},
GetClientAccumulatedStats: (call, callback) => {
callback(null, accumulatedStats);
}
}
const xdsUpdateClientConfigureServiceImpl: XdsUpdateClientConfigureServiceHandlers = {
Configure: (call, callback) => {
const callMetadata = {
EMPTY_CALL: new grpc.Metadata(),
UNARY_CALL: new grpc.Metadata()
}
for (const metadataItem of call.request.metadata) {
callMetadata[metadataItem.type].add(metadataItem.key, metadataItem.value);
}
currentConfig.callTypes = call.request.types;
currentConfig.metadata = callMetadata;
currentConfig.timeoutSec = call.request.timeout_sec
}
}
const server = new grpc.Server();
server.addService(loadedProto.grpc.testing.LoadBalancerStatsService.service, loadBalancerStatsServiceImpl);
server.addService(loadedProto.grpc.testing.XdsUpdateClientConfigureService.service, xdsUpdateClientConfigureServiceImpl);
server.bindAsync(`0.0.0.0:${argv.stats_port}`, grpc.ServerCredentials.createInsecure(), (error, port) => {
if (error) {
throw error;

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js-xds",
"version": "1.0.0",
"version": "1.2.1",
"description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.",
"main": "build/src/index.js",
"scripts": {
@ -32,22 +32,38 @@
"homepage": "https://github.com/grpc/grpc-node#readme",
"devDependencies": {
"@grpc/grpc-js": "file:../grpc-js",
"gts": "^2.0.2",
"typescript": "^3.8.3",
"@types/gulp": "^4.0.6",
"@types/gulp-mocha": "0.0.32",
"@types/mocha": "^5.2.6",
"@types/node": "^13.11.1",
"@types/yargs": "^15.0.5",
"gts": "^2.0.2",
"typescript": "^3.8.3",
"yargs": "^15.4.1"
},
"dependencies": {
"@grpc/proto-loader": "^0.6.0-pre14"
"@grpc/proto-loader": "^0.6.0-pre14",
"google-auth-library": "^7.0.2",
"re2-wasm": "^1.0.1"
},
"peerDependencies": {
"@grpc/grpc-js": "~1.2.0"
"@grpc/grpc-js": "~1.2.2"
},
"engines": {
"node": ">=10.10.0"
}
},
"files": [
"src/**/*.ts",
"build/src/**/*.{js,d.ts,js.map}",
"deps/envoy-api/envoy/api/v2/**/*.proto",
"deps/envoy-api/envoy/config/**/*.proto",
"deps/envoy-api/envoy/service/**/*.proto",
"deps/envoy-api/envoy/type/**/*.proto",
"deps/envoy-api/envoy/annotations/**/*.proto",
"deps/googleapis/google/api/**/*.proto",
"deps/googleapis/google/protobuf/**/*.proto",
"deps/googleapis/google/rpc/**/*.proto",
"deps/udpa/udpa/annotations/**/*.proto",
"deps/protoc-gen-validate/validate/**/*.proto"
]
}

View File

@ -212,3 +212,59 @@ message LoadBalancerStatsResponse {
int32 num_failures = 2;
map<string, RpcsByPeer> rpcs_by_method = 3;
}
// Request for retrieving a test client's accumulated stats.
message LoadBalancerAccumulatedStatsRequest {}
// Accumulated stats for RPCs sent by a test client.
message LoadBalancerAccumulatedStatsResponse {
// The total number of RPCs have ever issued for each type.
// Deprecated: use stats_per_method.rpcs_started instead.
map<string, int32> num_rpcs_started_by_method = 1 [deprecated = true];
// The total number of RPCs have ever completed successfully for each type.
// Deprecated: use stats_per_method.result instead.
map<string, int32> num_rpcs_succeeded_by_method = 2 [deprecated = true];
// The total number of RPCs have ever failed for each type.
// Deprecated: use stats_per_method.result instead.
map<string, int32> num_rpcs_failed_by_method = 3 [deprecated = true];
message MethodStats {
// The number of RPCs that were started for this method.
int32 rpcs_started = 1;
// The number of RPCs that completed with each status for this method. The
// key is the integral value of a google.rpc.Code; the value is the count.
map<int32, int32> result = 2;
}
// Per-method RPC statistics. The key is the RpcType in string form; e.g.
// 'EMPTY_CALL' or 'UNARY_CALL'
map<string, MethodStats> stats_per_method = 4;
}
// Configurations for a test client.
message ClientConfigureRequest {
// Type of RPCs to send.
enum RpcType {
EMPTY_CALL = 0;
UNARY_CALL = 1;
}
// Metadata to be attached for the given type of RPCs.
message Metadata {
RpcType type = 1;
string key = 2;
string value = 3;
}
// The types of RPCs the client sends.
repeated RpcType types = 1;
// The collection of custom metadata to be attached to RPCs sent by the client.
repeated Metadata metadata = 2;
// The deadline to use, in seconds, for all RPCs. If unset or zero, the
// client will use the default from the command-line.
int32 timeout_sec = 3;
}
// Response for updating a test client's configuration.
message ClientConfigureResponse {}

View File

@ -83,6 +83,10 @@ service LoadBalancerStatsService {
// Gets the backend distribution for RPCs sent by a test client.
rpc GetClientStats(LoadBalancerStatsRequest)
returns (LoadBalancerStatsResponse) {}
// Gets the accumulated stats for RPCs sent by a test client.
rpc GetClientAccumulatedStats(LoadBalancerAccumulatedStatsRequest)
returns (LoadBalancerAccumulatedStatsResponse) {}
}
// A service to remotely control health status of an xDS test server.
@ -90,3 +94,9 @@ service XdsUpdateHealthService {
rpc SetServing(grpc.testing.Empty) returns (grpc.testing.Empty);
rpc SetNotServing(grpc.testing.Empty) returns (grpc.testing.Empty);
}
// A service to dynamically update the configuration of an xDS test client.
service XdsUpdateClientConfigureService {
// Update the tes client's configuration.
rpc Configure(ClientConfigureRequest) returns (ClientConfigureResponse);
}

View File

@ -51,8 +51,9 @@ grpc/tools/run_tests/helper_scripts/prep_xds.sh
GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weighted_target,round_robin,resolving_load_balancer,subchannel,keepalive,dns_resolver \
GRPC_NODE_VERBOSITY=DEBUG \
NODE_XDS_INTEROP_VERBOSITY=1 \
GRPC_XDS_EXPERIMENTAL_ROUTING=true \
python3 grpc/tools/run_tests/run_xds_tests.py \
--test_case="backends_restart,change_backend_service,gentle_failover,ping_pong,remove_instance_group,round_robin,secondary_locality_gets_no_requests_on_partial_primary_failure,secondary_locality_gets_requests_on_primary_failure" \
--test_case="all" \
--project_id=grpc-testing \
--source_image=projects/grpc-testing/global/images/xds-test-server-2 \
--path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \

View File

@ -0,0 +1,22 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/**
* Environment variable protection for traffic splitting and routing
* https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#xds-resolver-and-xds-client
*/
export const GRPC_XDS_EXPERIMENTAL_ROUTING = (process.env.GRPC_XDS_EXPERIMENTAL_ROUTING === 'true');

View File

@ -0,0 +1,27 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { GoogleAuth } from 'google-auth-library';
import { ChannelCredentials, CallCredentials } from '@grpc/grpc-js';
export function createGoogleDefaultCredentials(): ChannelCredentials {
const sslCreds = ChannelCredentials.createSsl();
const googleAuthCreds = CallCredentials.createFromGoogleCredential(
new GoogleAuth()
);
return sslCreds.compose(googleAuthCreds);
}

View File

@ -21,6 +21,7 @@ import * as load_balancer_eds from './load-balancer-eds';
import * as load_balancer_lrs from './load-balancer-lrs';
import * as load_balancer_priority from './load-balancer-priority';
import * as load_balancer_weighted_target from './load-balancer-weighted-target';
import * as load_balancer_xds_cluster_manager from './load-balancer-xds-cluster-manager';
/**
* Register the "xds:" name scheme with the @grpc/grpc-js library.
@ -32,4 +33,5 @@ export function register() {
load_balancer_lrs.setup();
load_balancer_priority.setup();
load_balancer_weighted_target.setup();
load_balancer_xds_cluster_manager.setup();
}

View File

@ -16,7 +16,7 @@
*/
import { connectivityState, status, Metadata, logVerbosity, experimental } from '@grpc/grpc-js';
import { XdsClient, Watcher } from './xds-client';
import { getSingletonXdsClient, XdsClient } from './xds-client';
import { Cluster__Output } from './generated/envoy/api/v2/Cluster';
import SubchannelAddress = experimental.SubchannelAddress;
import UnavailablePicker = experimental.UnavailablePicker;
@ -26,6 +26,7 @@ import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import LoadBalancingConfig = experimental.LoadBalancingConfig;
import { EdsLoadBalancingConfig } from './load-balancer-eds';
import { Watcher } from './xds-stream-state/xds-stream-state';
const TRACER_NAME = 'cds_balancer';
@ -65,7 +66,6 @@ export class CdsLoadBalancingConfig implements LoadBalancingConfig {
export class CdsLoadBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;
private xdsClient: XdsClient | null = null;
private watcher: Watcher<Cluster__Output>;
private isWatcherActive = false;
@ -121,12 +121,7 @@ export class CdsLoadBalancer implements LoadBalancer {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2));
return;
}
if (!(attributes.xdsClient instanceof XdsClient)) {
trace('Discarding address list update missing xdsClient attribute');
return;
}
trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2));
this.xdsClient = attributes.xdsClient;
this.latestAttributes = attributes;
/* If the cluster is changing, disable the old watcher before adding the new
@ -136,7 +131,7 @@ export class CdsLoadBalancer implements LoadBalancer {
this.latestConfig?.getCluster() !== lbConfig.getCluster()
) {
trace('Removing old cluster watcher for cluster name ' + this.latestConfig!.getCluster());
this.xdsClient.removeClusterWatcher(
getSingletonXdsClient().removeClusterWatcher(
this.latestConfig!.getCluster(),
this.watcher
);
@ -152,7 +147,7 @@ export class CdsLoadBalancer implements LoadBalancer {
if (!this.isWatcherActive) {
trace('Adding new cluster watcher for cluster name ' + lbConfig.getCluster());
this.xdsClient.addClusterWatcher(lbConfig.getCluster(), this.watcher);
getSingletonXdsClient().addClusterWatcher(lbConfig.getCluster(), this.watcher);
this.isWatcherActive = true;
}
}
@ -166,7 +161,7 @@ export class CdsLoadBalancer implements LoadBalancer {
trace('Destroying load balancer with cluster name ' + this.latestConfig?.getCluster());
this.childBalancer.destroy();
if (this.isWatcherActive) {
this.xdsClient?.removeClusterWatcher(
getSingletonXdsClient().removeClusterWatcher(
this.latestConfig!.getCluster(),
this.watcher
);

View File

@ -16,7 +16,7 @@
*/
import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity as LogVerbosity, experimental } from '@grpc/grpc-js';
import { XdsClient, Watcher, XdsClusterDropStats } from './xds-client';
import { getSingletonXdsClient, XdsClient, XdsClusterDropStats } from './xds-client';
import { ClusterLoadAssignment__Output } from './generated/envoy/api/v2/ClusterLoadAssignment';
import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
import { LocalitySubchannelAddress, PriorityChild, PriorityLoadBalancingConfig } from './load-balancer-priority';
@ -33,6 +33,7 @@ import PickResultType = experimental.PickResultType;
import { validateLoadBalancingConfig } from '@grpc/grpc-js/build/src/experimental';
import { WeightedTarget, WeightedTargetLoadBalancingConfig } from './load-balancer-weighted-target';
import { LrsLoadBalancingConfig } from './load-balancer-lrs';
import { Watcher } from './xds-stream-state/xds-stream-state';
const TRACER_NAME = 'eds_balancer';
@ -122,11 +123,10 @@ export class EdsLoadBalancer implements LoadBalancer {
* requests.
*/
private childBalancer: ChildLoadBalancerHandler;
private xdsClient: XdsClient | null = null;
private edsServiceName: string | null = null;
private watcher: Watcher<ClusterLoadAssignment__Output>;
/**
* Indicates whether the watcher has already been passed to this.xdsClient
* Indicates whether the watcher has already been passed to the xdsClient
* and is getting updates.
*/
private isWatcherActive = false;
@ -377,7 +377,7 @@ export class EdsLoadBalancer implements LoadBalancer {
validateLoadBalancingConfig({ round_robin: {} }),
];
let childPolicy: LoadBalancingConfig[];
if (this.lastestConfig.getLrsLoadReportingServerName()) {
if (this.lastestConfig.getLrsLoadReportingServerName() !== undefined) {
childPolicy = [new LrsLoadBalancingConfig(this.lastestConfig.getCluster(), this.lastestConfig.getEdsServiceName() ?? '', this.lastestConfig.getLrsLoadReportingServerName()!, localityObj.locality, endpointPickingPolicy)];
} else {
childPolicy = endpointPickingPolicy;
@ -427,21 +427,16 @@ export class EdsLoadBalancer implements LoadBalancer {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
}
if (!(attributes.xdsClient instanceof XdsClient)) {
trace('Discarding address list update missing xdsClient attribute');
return;
}
trace('Received update with config: ' + JSON.stringify(lbConfig, undefined, 2));
this.lastestConfig = lbConfig;
this.latestAttributes = attributes;
this.xdsClient = attributes.xdsClient;
const newEdsServiceName = lbConfig.getEdsServiceName() ?? lbConfig.getCluster();
/* If the name is changing, disable the old watcher before adding the new
* one */
if (this.isWatcherActive && this.edsServiceName !== newEdsServiceName) {
trace('Removing old endpoint watcher for edsServiceName ' + this.edsServiceName)
this.xdsClient.removeEndpointWatcher(this.edsServiceName!, this.watcher);
getSingletonXdsClient().removeEndpointWatcher(this.edsServiceName!, this.watcher);
/* Setting isWatcherActive to false here lets us have one code path for
* calling addEndpointWatcher */
this.isWatcherActive = false;
@ -454,12 +449,12 @@ export class EdsLoadBalancer implements LoadBalancer {
if (!this.isWatcherActive) {
trace('Adding new endpoint watcher for edsServiceName ' + this.edsServiceName);
this.xdsClient.addEndpointWatcher(this.edsServiceName, this.watcher);
getSingletonXdsClient().addEndpointWatcher(this.edsServiceName, this.watcher);
this.isWatcherActive = true;
}
if (lbConfig.getLrsLoadReportingServerName()) {
this.clusterDropStats = this.xdsClient.addClusterDropStats(
this.clusterDropStats = getSingletonXdsClient().addClusterDropStats(
lbConfig.getLrsLoadReportingServerName()!,
lbConfig.getCluster(),
lbConfig.getEdsServiceName() ?? ''
@ -480,7 +475,7 @@ export class EdsLoadBalancer implements LoadBalancer {
destroy(): void {
trace('Destroying load balancer with edsServiceName ' + this.edsServiceName);
if (this.edsServiceName) {
this.xdsClient?.removeEndpointWatcher(this.edsServiceName, this.watcher);
getSingletonXdsClient().removeEndpointWatcher(this.edsServiceName, this.watcher);
}
this.childBalancer.destroy();
}

View File

@ -16,9 +16,8 @@
*/
import { connectivityState as ConnectivityState, StatusObject, status as Status, experimental } from '@grpc/grpc-js';
import { type } from 'os';
import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
import { XdsClusterLocalityStats, XdsClient } from './xds-client';
import { XdsClusterLocalityStats, XdsClient, getSingletonXdsClient } from './xds-client';
import LoadBalancer = experimental.LoadBalancer;
import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
@ -208,10 +207,7 @@ export class LrsLoadBalancer implements LoadBalancer {
if (!(lbConfig instanceof LrsLoadBalancingConfig)) {
return;
}
if (!(attributes.xdsClient instanceof XdsClient)) {
return;
}
this.localityStatsReporter = attributes.xdsClient.addClusterLocalityStats(
this.localityStatsReporter = getSingletonXdsClient().addClusterLocalityStats(
lbConfig.getLrsLoadReportingServerName(),
lbConfig.getClusterName(),
lbConfig.getEdsServiceName(),

View File

@ -0,0 +1,295 @@
/*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { connectivityState as ConnectivityState, status as Status, experimental, logVerbosity, Metadata, status } from "@grpc/grpc-js/";
import LoadBalancingConfig = experimental.LoadBalancingConfig;
import validateLoadBalancingConfig = experimental.validateLoadBalancingConfig;
import LoadBalancer = experimental.LoadBalancer;
import Picker = experimental.Picker;
import PickResult = experimental.PickResult;
import PickArgs = experimental.PickArgs;
import PickResultType = experimental.PickResultType;
import UnavailablePicker = experimental.UnavailablePicker;
import QueuePicker = experimental.QueuePicker;
import SubchannelAddress = experimental.SubchannelAddress;
import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
import getFirstUsableConfig = experimental.getFirstUsableConfig;
import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
const TRACER_NAME = 'weighted_target';
function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
const TYPE_NAME = 'xds_cluster_manager';
interface ClusterManagerChild {
child_policy: LoadBalancingConfig[];
}
export class XdsClusterManagerLoadBalancingConfig implements LoadBalancingConfig {
getLoadBalancerName(): string {
return TYPE_NAME;
}
constructor(private children: Map<string, ClusterManagerChild>) {}
getChildren() {
return this.children;
}
toJsonObject(): object {
const childrenField: {[key: string]: object} = {};
for (const [childName, childValue] of this.children.entries()) {
childrenField[childName] = {
child_policy: childValue.child_policy.map(policy => policy.toJsonObject())
};
}
return {
[TYPE_NAME]: {
children: childrenField
}
}
}
static createFromJson(obj: any): XdsClusterManagerLoadBalancingConfig {
const childrenMap: Map<string, ClusterManagerChild> = new Map<string, ClusterManagerChild>();
if (!('children' in obj && obj.children !== null && typeof obj.children === 'object')) {
throw new Error('xds_cluster_manager config must have a children map');
}
for (const key of obj.children) {
const childObj = obj.children[key];
if (!('child_policy' in childObj && Array.isArray(childObj.child_policy))) {
throw new Error(`xds_cluster_manager child ${key} must have a child_policy array`);
}
const validatedChild = {
child_policy: childObj.child_policy.map(validateLoadBalancingConfig)
};
childrenMap.set(key, validatedChild);
}
return new XdsClusterManagerLoadBalancingConfig(childrenMap);
}
}
class XdsClusterManagerPicker implements Picker {
constructor(private childPickers: Map<string, Picker>) {}
pick(pickArgs: PickArgs): PickResult {
/* extraPickInfo.cluster should be set for all calls by the config selector
* corresponding to the service config that specified the use of this LB
* policy. */
const cluster = pickArgs.extraPickInfo.cluster ?? '';
if (this.childPickers.has(cluster)) {
return this.childPickers.get(cluster)!.pick(pickArgs);
} else {
return {
pickResultType: PickResultType.TRANSIENT_FAILURE,
status: {
code: status.INTERNAL,
details: `Requested cluster ${cluster} not found`,
metadata: new Metadata(),
},
subchannel: null,
extraFilterFactory: null,
onCallStarted: null
};
}
}
}
interface XdsClusterManagerChild {
updateAddressList(addressList: SubchannelAddress[], lbConfig: ClusterManagerChild, attributes: { [key: string]: unknown; }): void;
exitIdle(): void;
resetBackoff(): void;
destroy(): void;
getConnectivityState(): ConnectivityState;
getPicker(): Picker;
}
class XdsClusterManager implements LoadBalancer {
private XdsClusterManagerChildImpl = class implements XdsClusterManagerChild {
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
private picker: Picker;
private childBalancer: ChildLoadBalancerHandler;
constructor(private parent: XdsClusterManager, private name: string) {
this.childBalancer = new ChildLoadBalancerHandler({
createSubchannel: (subchannelAddress, subchannelOptions) => {
return this.parent.channelControlHelper.createSubchannel(subchannelAddress, subchannelOptions);
},
updateState: (connectivityState, picker) => {
this.updateState(connectivityState, picker);
},
requestReresolution: () => {
this.parent.channelControlHelper.requestReresolution();
}
});
this.picker = new QueuePicker(this.childBalancer);
}
private updateState(connectivityState: ConnectivityState, picker: Picker) {
trace('Child ' + this.name + ' ' + ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[connectivityState]);
this.connectivityState = connectivityState;
this.picker = picker;
this.parent.updateState();
}
updateAddressList(addressList: SubchannelAddress[], lbConfig: ClusterManagerChild, attributes: { [key: string]: unknown; }): void {
const childConfig = getFirstUsableConfig(lbConfig.child_policy);
if (childConfig !== null) {
this.childBalancer.updateAddressList(addressList, childConfig, attributes);
}
}
exitIdle(): void {
this.childBalancer.exitIdle();
}
resetBackoff(): void {
this.childBalancer.resetBackoff();
}
destroy(): void {
this.childBalancer.destroy();
}
getConnectivityState(): ConnectivityState {
return this.connectivityState;
}
getPicker(): Picker {
return this.picker;
}
}
// End of XdsClusterManagerChildImpl
private children: Map<string, XdsClusterManagerChild> = new Map<string, XdsClusterManagerChild>();
// Shutdown is a placeholder value that will never appear in normal operation.
private currentState: ConnectivityState = ConnectivityState.SHUTDOWN;
constructor(private channelControlHelper: ChannelControlHelper) {}
private updateState() {
const pickerMap: Map<string, Picker> = new Map<string, Picker>();
let anyReady = false;
let anyConnecting = false;
let anyIdle = false;
for (const [name, child] of this.children.entries()) {
pickerMap.set(name, child.getPicker());
switch (child.getConnectivityState()) {
case ConnectivityState.READY:
anyReady = true;
break;
case ConnectivityState.CONNECTING:
anyConnecting = true;
break;
case ConnectivityState.IDLE:
anyIdle = true;
break;
}
}
let connectivityState: ConnectivityState;
if (anyReady) {
connectivityState = ConnectivityState.READY;
} else if (anyConnecting) {
connectivityState = ConnectivityState.CONNECTING;
} else if (anyIdle) {
connectivityState = ConnectivityState.IDLE;
} else {
connectivityState = ConnectivityState.TRANSIENT_FAILURE;
}
/* For each of the states CONNECTING, IDLE, and TRANSIENT_FAILURE, there is
* exactly one corresponding picker, so if the state is one of those and
* that does not change, no new information is provided by passing the
* new state upward. */
if (connectivityState === this.currentState && connectivityState !== ConnectivityState.READY) {
return;
}
let picker: Picker;
switch (connectivityState) {
case ConnectivityState.READY:
picker = new XdsClusterManagerPicker(pickerMap);
break;
case ConnectivityState.CONNECTING:
case ConnectivityState.IDLE:
picker = new QueuePicker(this);
break;
default:
picker = new UnavailablePicker({
code: Status.UNAVAILABLE,
details: 'xds_cluster_manager: all children report state TRANSIENT_FAILURE',
metadata: new Metadata()
});
}
trace(
'Transitioning to ' +
ConnectivityState[connectivityState]
);
this.channelControlHelper.updateState(connectivityState, picker);
}
updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
if (!(lbConfig instanceof XdsClusterManagerLoadBalancingConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
}
const configChildren = lbConfig.getChildren();
// Delete children that are not in the new config
const namesToRemove: string[] = [];
for (const name of this.children.keys()) {
if (!configChildren.has(name)) {
namesToRemove.push(name);
}
}
for (const name of namesToRemove) {
this.children.get(name)!.destroy();
this.children.delete(name);
}
// Add new children that were not in the previous config
for (const [name, childConfig] of configChildren.entries()) {
if (!this.children.has(name)) {
const newChild = new this.XdsClusterManagerChildImpl(this, name);
newChild.updateAddressList(addressList, childConfig, attributes);
this.children.set(name, newChild);
}
}
this.updateState();
}
exitIdle(): void {
for (const child of this.children.values()) {
child.exitIdle();
}
}
resetBackoff(): void {
for (const child of this.children.values()) {
child.resetBackoff();
}
}
destroy(): void {
for (const child of this.children.values()) {
child.destroy();
}
this.children.clear();
}
getTypeName(): string {
return TYPE_NAME;
}
}
export function setup() {
registerLoadBalancerType(TYPE_NAME, XdsClusterManager, XdsClusterManagerLoadBalancingConfig);
}

View File

@ -14,7 +14,11 @@
* limitations under the License.
*/
import { XdsClient } from './xds-client';
import * as protoLoader from '@grpc/proto-loader';
import { RE2 } from 're2-wasm';
import { getSingletonXdsClient, XdsClient } from './xds-client';
import { StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions } from '@grpc/grpc-js';
import Resolver = experimental.Resolver;
import GrpcUri = experimental.GrpcUri;
@ -22,6 +26,18 @@ import ResolverListener = experimental.ResolverListener;
import uriToString = experimental.uriToString;
import ServiceConfig = experimental.ServiceConfig;
import registerResolver = experimental.registerResolver;
import { Listener__Output } from './generated/envoy/api/v2/Listener';
import { Watcher } from './xds-stream-state/xds-stream-state';
import { RouteConfiguration__Output } from './generated/envoy/api/v2/RouteConfiguration';
import { HttpConnectionManager__Output } from './generated/envoy/config/filter/network/http_connection_manager/v2/HttpConnectionManager';
import { GRPC_XDS_EXPERIMENTAL_ROUTING } from './environment';
import { CdsLoadBalancingConfig } from './load-balancer-cds';
import { VirtualHost__Output } from './generated/envoy/api/v2/route/VirtualHost';
import { RouteMatch__Output } from './generated/envoy/api/v2/route/RouteMatch';
import { HeaderMatcher__Output } from './generated/envoy/api/v2/route/HeaderMatcher';
import ConfigSelector = experimental.ConfigSelector;
import LoadBalancingConfig = experimental.LoadBalancingConfig;
import { XdsClusterManagerLoadBalancingConfig } from './load-balancer-xds-cluster-manager';
const TRACER_NAME = 'xds_resolver';
@ -29,56 +45,436 @@ function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
// Better match type has smaller value.
enum MatchType {
EXACT_MATCH,
SUFFIX_MATCH,
PREFIX_MATCH,
UNIVERSE_MATCH,
INVALID_MATCH,
};
function domainPatternMatchType(domainPattern: string): MatchType {
if (domainPattern.length === 0) {
return MatchType.INVALID_MATCH;
}
if (domainPattern.indexOf('*') < 0) {
return MatchType.EXACT_MATCH;
}
if (domainPattern === '*') {
return MatchType.UNIVERSE_MATCH;
}
if (domainPattern.startsWith('*')) {
return MatchType.SUFFIX_MATCH;
}
if (domainPattern.endsWith('*')) {
return MatchType.PREFIX_MATCH;
}
return MatchType.INVALID_MATCH;
}
function domainMatch(matchType: MatchType, domainPattern: string, expectedHostName: string) {
switch (matchType) {
case MatchType.EXACT_MATCH:
return expectedHostName === domainPattern;
case MatchType.SUFFIX_MATCH:
return expectedHostName.endsWith(domainPattern.substring(1));
case MatchType.PREFIX_MATCH:
return expectedHostName.startsWith(domainPattern.substring(0, domainPattern.length - 1));
case MatchType.UNIVERSE_MATCH:
return true;
case MatchType.INVALID_MATCH:
return false;
}
}
function findVirtualHostForDomain(virutalHostList: VirtualHost__Output[], domain: string): VirtualHost__Output | null {
let targetVhost: VirtualHost__Output | null = null;
let bestMatchType: MatchType = MatchType.INVALID_MATCH;
let longestMatch = 0;
for (const virtualHost of virutalHostList) {
for (const domainPattern of virtualHost.domains) {
const matchType = domainPatternMatchType(domainPattern);
// If we already have a match of a better type, skip this one
if (matchType > bestMatchType) {
continue;
}
// If we already have a longer match of the same type, skip this one
if (matchType === bestMatchType && domainPattern.length <= longestMatch) {
continue;
}
if (domainMatch(matchType, domainPattern, domain)) {
targetVhost = virtualHost;
bestMatchType = matchType;
longestMatch = domainPattern.length;
}
if (bestMatchType === MatchType.EXACT_MATCH) {
break;
}
}
if (bestMatchType === MatchType.EXACT_MATCH) {
break;
}
}
return targetVhost;
}
interface Matcher {
(methodName: string, metadata: Metadata): boolean;
}
const numberRegex = new RE2(/^-?\d+$/u);
function getPredicateForHeaderMatcher(headerMatch: HeaderMatcher__Output): Matcher {
let valueChecker: (value: string) => boolean;
switch (headerMatch.header_match_specifier) {
case 'exact_match':
valueChecker = value => value === headerMatch.exact_match;
break;
case 'safe_regex_match':
const regex = new RE2(`^${headerMatch.safe_regex_match}$`, 'u');
valueChecker = value => regex.test(value);
break;
case 'range_match':
const start = BigInt(headerMatch.range_match!.start);
const end = BigInt(headerMatch.range_match!.end);
valueChecker = value => {
if (!numberRegex.test(value)) {
return false;
}
const numberValue = BigInt(value);
return start <= numberValue && numberValue < end;
}
break;
case 'present_match':
valueChecker = value => true;
break;
case 'prefix_match':
valueChecker = value => value.startsWith(headerMatch.prefix_match!);
break;
case 'suffix_match':
valueChecker = value => value.endsWith(headerMatch.suffix_match!);
break;
default:
// Should be prevented by validation rules
return (methodName, metadata) => false;
}
const headerMatcher: Matcher = (methodName, metadata) => {
if (headerMatch.name.endsWith('-bin')) {
return false;
}
let value: string;
if (headerMatch.name === 'content-type') {
value = 'application/grpc';
} else {
const valueArray = metadata.get(headerMatch.name);
if (valueArray.length === 0) {
return false;
} else {
value = valueArray.join(',');
}
}
return valueChecker(value);
}
if (headerMatch.invert_match) {
return (methodName, metadata) => !headerMatcher(methodName, metadata);
} else {
return headerMatcher;
}
}
const RUNTIME_FRACTION_DENOMINATOR_VALUES = {
HUNDRED: 100,
TEN_THOUSAND: 10_000,
MILLION: 1_000_000
}
function getPredicateForMatcher(routeMatch: RouteMatch__Output): Matcher {
let pathMatcher: Matcher;
switch (routeMatch.path_specifier) {
case 'prefix':
if (routeMatch.case_sensitive?.value === false) {
const prefix = routeMatch.prefix!.toLowerCase();
pathMatcher = (methodName, metadata) => (methodName.toLowerCase().startsWith(prefix));
} else {
const prefix = routeMatch.prefix!;
pathMatcher = (methodName, metadata) => (methodName.startsWith(prefix));
}
break;
case 'path':
if (routeMatch.case_sensitive?.value === false) {
const path = routeMatch.path!.toLowerCase();
pathMatcher = (methodName, metadata) => (methodName.toLowerCase() === path);
} else {
const path = routeMatch.path!;
pathMatcher = (methodName, metadata) => (methodName === path);
}
break;
case 'safe_regex':
const flags = routeMatch.case_sensitive?.value === false ? 'ui' : 'u';
const regex = new RE2(`^${routeMatch.safe_regex!.regex!}$`, flags);
pathMatcher = (methodName, metadata) => (regex.test(methodName));
break;
default:
// Should be prevented by validation rules
return (methodName, metadata) => false;
}
const headerMatchers: Matcher[] = routeMatch.headers.map(getPredicateForHeaderMatcher);
let runtimeFractionHandler: () => boolean;
if (!routeMatch.runtime_fraction?.default_value) {
runtimeFractionHandler = () => true;
} else {
const numerator = routeMatch.runtime_fraction.default_value.numerator;
const denominator = RUNTIME_FRACTION_DENOMINATOR_VALUES[routeMatch.runtime_fraction.default_value.denominator];
runtimeFractionHandler = () => {
const randomNumber = Math.random() * denominator;
return randomNumber < numerator;
}
}
return (methodName, metadata) => pathMatcher(methodName, metadata) && headerMatchers.every(matcher => matcher(methodName, metadata)) && runtimeFractionHandler();
}
class XdsResolver implements Resolver {
private resolutionStarted = false;
private hasReportedSuccess = false;
private ldsWatcher: Watcher<Listener__Output>;
private rdsWatcher: Watcher<RouteConfiguration__Output>
private isLdsWatcherActive = false;
/**
* The latest route config name from an LDS response. The RDS watcher is
* actively watching that name if and only if this is not null.
*/
private latestRouteConfigName: string | null = null;
private latestRouteConfig: RouteConfiguration__Output | null = null;
private clusterRefcounts = new Map<string, {inLastConfig: boolean, refCount: number}>();
constructor(
private target: GrpcUri,
private listener: ResolverListener,
private channelOptions: ChannelOptions
) {}
) {
this.ldsWatcher = {
onValidUpdate: (update: Listener__Output) => {
const httpConnectionManager = update.api_listener!
.api_listener as protoLoader.AnyExtension &
HttpConnectionManager__Output;
switch (httpConnectionManager.route_specifier) {
case 'rds': {
const routeConfigName = httpConnectionManager.rds!.route_config_name;
if (this.latestRouteConfigName !== routeConfigName) {
if (this.latestRouteConfigName !== null) {
getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
}
getSingletonXdsClient().addRouteWatcher(httpConnectionManager.rds!.route_config_name, this.rdsWatcher);
this.latestRouteConfigName = routeConfigName;
}
break;
}
case 'route_config':
if (this.latestRouteConfigName) {
getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
}
this.handleRouteConfig(httpConnectionManager.route_config!);
break;
default:
// This is prevented by the validation rules
}
},
onTransientError: (error: StatusObject) => {
/* A transient error only needs to bubble up as a failure if we have
* not already provided a ServiceConfig for the upper layer to use */
if (!this.hasReportedSuccess) {
trace('Resolution error for target ' + uriToString(this.target) + ' due to xDS client transient error ' + error.details);
this.reportResolutionError(error.details);
}
},
onResourceDoesNotExist: () => {
trace('Resolution error for target ' + uriToString(this.target) + ': LDS resource does not exist');
this.reportResolutionError(`Listener ${this.target} does not exist`);
}
};
this.rdsWatcher = {
onValidUpdate: (update: RouteConfiguration__Output) => {
this.handleRouteConfig(update);
},
onTransientError: (error: StatusObject) => {
/* A transient error only needs to bubble up as a failure if we have
* not already provided a ServiceConfig for the upper layer to use */
if (!this.hasReportedSuccess) {
trace('Resolution error for target ' + uriToString(this.target) + ' due to xDS client transient error ' + error.details);
this.reportResolutionError(error.details);
}
},
onResourceDoesNotExist: () => {
trace('Resolution error for target ' + uriToString(this.target) + ' and route config ' + this.latestRouteConfigName + ': RDS resource does not exist');
this.reportResolutionError(`Route config ${this.latestRouteConfigName} does not exist`);
}
}
}
private reportResolutionError() {
private refCluster(clusterName: string) {
const refCount = this.clusterRefcounts.get(clusterName);
if (refCount) {
refCount.refCount += 1;
}
}
private unrefCluster(clusterName: string) {
const refCount = this.clusterRefcounts.get(clusterName);
if (refCount) {
refCount.refCount -= 1;
if (!refCount.inLastConfig && refCount.refCount === 0) {
this.clusterRefcounts.delete(clusterName);
this.handleRouteConfig(this.latestRouteConfig!);
}
}
}
private handleRouteConfig(routeConfig: RouteConfiguration__Output) {
this.latestRouteConfig = routeConfig;
if (GRPC_XDS_EXPERIMENTAL_ROUTING) {
const virtualHost = findVirtualHostForDomain(routeConfig.virtual_hosts, this.target.path);
if (virtualHost === null) {
this.reportResolutionError('No matching route found');
return;
}
const allConfigClusters = new Set<string>();
const matchList: {matcher: Matcher, action: () => string}[] = [];
for (const route of virtualHost.routes) {
let routeAction: () => string;
switch (route.route!.cluster_specifier) {
case 'cluster_header':
continue;
case 'cluster':{
const cluster = route.route!.cluster!;
allConfigClusters.add(cluster);
routeAction = () => cluster;
break;
}
case 'weighted_clusters': {
let lastNumerator = 0;
// clusterChoices is essentially the weighted choices represented as a CDF
const clusterChoices: {cluster: string, numerator: number}[] = [];
for (const clusterWeight of route.route!.weighted_clusters!.clusters) {
allConfigClusters.add(clusterWeight.name);
lastNumerator = lastNumerator + (clusterWeight.weight?.value ?? 0);
clusterChoices.push({cluster: clusterWeight.name, numerator: lastNumerator});
}
routeAction = () => {
const randomNumber = Math.random() * (route.route!.weighted_clusters!.total_weight?.value ?? 100);
for (const choice of clusterChoices) {
if (randomNumber < choice.numerator) {
return choice.cluster;
}
}
// This should be prevented by the validation rules
return '';
}
}
}
const routeMatcher = getPredicateForMatcher(route.match!);
matchList.push({matcher: routeMatcher, action: routeAction});
}
/* Mark clusters that are not in this route config, and remove ones with
* no references */
for (const [name, refCount] of Array.from(this.clusterRefcounts.entries())) {
if (!allConfigClusters.has(name)) {
refCount.inLastConfig = false;
if (refCount.refCount === 0) {
this.clusterRefcounts.delete(name);
}
}
}
// Add any new clusters from this route config
for (const name of allConfigClusters) {
if (this.clusterRefcounts.has(name)) {
this.clusterRefcounts.get(name)!.inLastConfig = true;
} else {
this.clusterRefcounts.set(name, {inLastConfig: true, refCount: 0});
}
}
const configSelector: ConfigSelector = (methodName, metadata) => {
for (const {matcher, action} of matchList) {
if (matcher(methodName, metadata)) {
const clusterName = action();
this.refCluster(clusterName);
const onCommitted = () => {
this.unrefCluster(clusterName);
}
return {
methodConfig: {name: []},
onCommitted: onCommitted,
pickInformation: {cluster: clusterName},
status: status.OK
};
}
}
return {
methodConfig: {name: []},
// cluster won't be used here, but it's set because of some TypeScript weirdness
pickInformation: {cluster: ''},
status: status.UNAVAILABLE
};
};
const clusterConfigMap = new Map<string, {child_policy: LoadBalancingConfig[]}>();
for (const clusterName of this.clusterRefcounts.keys()) {
clusterConfigMap.set(clusterName, {child_policy: [new CdsLoadBalancingConfig(clusterName)]});
}
const lbPolicyConfig = new XdsClusterManagerLoadBalancingConfig(clusterConfigMap);
const serviceConfig: ServiceConfig = {
methodConfig: [],
loadBalancingConfig: [lbPolicyConfig]
}
this.listener.onSuccessfulResolution([], serviceConfig, null, configSelector, {});
} else {
// !GRPC_XDS_EXPERIMENTAL_ROUTING
for (const virtualHost of routeConfig.virtual_hosts) {
if (virtualHost.domains.indexOf(this.target.path) >= 0) {
const route = virtualHost.routes[virtualHost.routes.length - 1];
if (route.match?.prefix === '' && route.route?.cluster) {
trace('Reporting RDS update for host ' + uriToString(this.target) + ' with cluster ' + route.route.cluster);
this.listener.onSuccessfulResolution([], {
methodConfig: [],
loadBalancingConfig: [
new CdsLoadBalancingConfig(route.route.cluster)
],
}, null, null, {});
this.hasReportedSuccess = true;
return;
} else {
trace('Discarded matching route with prefix ' + route.match?.prefix + ' and cluster ' + route.route?.cluster);
}
}
}
this.reportResolutionError('No matching route found');
}
}
private reportResolutionError(reason: string) {
this.listener.onError({
code: status.UNAVAILABLE,
details: `xDS name resolution failed for target ${uriToString(
this.target
)}`,
)}: ${reason}`,
metadata: new Metadata(),
});
}
updateResolution(): void {
// Wait until updateResolution is called once to start the xDS requests
if (!this.resolutionStarted) {
this.resolutionStarted = true;
if (!this.isLdsWatcherActive) {
trace('Starting resolution for target ' + uriToString(this.target));
const xdsClient = new XdsClient(
this.target.path,
{
onValidUpdate: (update: ServiceConfig) => {
trace('Resolved service config for target ' + uriToString(this.target) + ': ' + JSON.stringify(update));
this.hasReportedSuccess = true;
this.listener.onSuccessfulResolution([], update, null, {
xdsClient: xdsClient,
});
},
onTransientError: (error: StatusObject) => {
/* A transient error only needs to bubble up as a failure if we have
* not already provided a ServiceConfig for the upper layer to use */
if (!this.hasReportedSuccess) {
trace('Resolution error for target ' + uriToString(this.target) + ' due to xDS client transient error ' + error.details);
this.reportResolutionError();
}
},
onResourceDoesNotExist: () => {
trace('Resolution error for target ' + uriToString(this.target) + ': resource does not exist');
this.reportResolutionError();
},
},
this.channelOptions
);
getSingletonXdsClient().addListenerWatcher(this.target.path, this.ldsWatcher);
this.isLdsWatcherActive = true;
}
}
destroy() {
getSingletonXdsClient().removeListenerWatcher(this.target.path, this.ldsWatcher);
if (this.latestRouteConfigName) {
getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
}
}

View File

@ -48,8 +48,13 @@ import { RouteConfiguration__Output } from './generated/envoy/api/v2/RouteConfig
import { Any__Output } from './generated/google/protobuf/Any';
import BackoffTimeout = experimental.BackoffTimeout;
import ServiceConfig = experimental.ServiceConfig;
import createGoogleDefaultCredentials = experimental.createGoogleDefaultCredentials;
import { createGoogleDefaultCredentials } from './google-default-credentials';
import { CdsLoadBalancingConfig } from './load-balancer-cds';
import { EdsState } from './xds-stream-state/eds-state';
import { CdsState } from './xds-stream-state/cds-state';
import { RdsState } from './xds-stream-state/rds-state';
import { LdsState } from './xds-stream-state/lds-state';
import { Watcher } from './xds-stream-state/xds-stream-state';
const TRACER_NAME = 'xds_client';
@ -131,12 +136,6 @@ function localityEqual(
);
}
export interface Watcher<UpdateType> {
onValidUpdate(update: UpdateType): void;
onTransientError(error: StatusObject): void;
onResourceDoesNotExist(): void;
}
export interface XdsClusterDropStats {
addCallDropped(category: string): void;
}
@ -219,450 +218,6 @@ class ClusterLoadReportMap {
}
}
interface XdsStreamState<ResponseType> {
versionInfo: string;
nonce: string;
getResourceNames(): string[];
/**
* Returns a string containing the error details if the message should be nacked,
* or null if it should be acked.
* @param responses
*/
handleResponses(responses: ResponseType[]): string | null;
reportStreamError(status: StatusObject): void;
}
class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
public versionInfo = '';
public nonce = '';
private watchers: Map<
string,
Watcher<ClusterLoadAssignment__Output>[]
> = new Map<string, Watcher<ClusterLoadAssignment__Output>[]>();
private latestResponses: ClusterLoadAssignment__Output[] = [];
constructor(private updateResourceNames: () => void) {}
/**
* Add the watcher to the watcher list. Returns true if the list of resource
* names has changed, and false otherwise.
* @param edsServiceName
* @param watcher
*/
addWatcher(
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
let watchersEntry = this.watchers.get(edsServiceName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(edsServiceName, watchersEntry);
}
trace('Adding EDS watcher (' + watchersEntry.length + ' ->' + (watchersEntry.length + 1) + ') for edsServiceName ' + edsServiceName);
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
for (const message of this.latestResponses) {
if (message.cluster_name === edsServiceName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing EDS update for new watcher for edsServiceName ' + edsServiceName);
watcher.onValidUpdate(message);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
}
removeWatcher(
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
trace('Removing EDS watcher for edsServiceName ' + edsServiceName);
const watchersEntry = this.watchers.get(edsServiceName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
trace('Removed EDS watcher (' + watchersEntry.length + ' -> ' + (watchersEntry.length - 1) + ') for edsServiceName ' + edsServiceName);
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(edsServiceName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
}
getResourceNames(): string[] {
return Array.from(this.watchers.keys());
}
/**
* Validate the ClusterLoadAssignment object by these rules:
* https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto
* @param message
*/
private validateResponse(message: ClusterLoadAssignment__Output) {
for (const endpoint of message.endpoints) {
for (const lb of endpoint.lb_endpoints) {
const socketAddress = lb.endpoint?.address?.socket_address;
if (!socketAddress) {
return false;
}
if (socketAddress.port_specifier !== 'port_value') {
return false;
}
if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) {
return false;
}
}
}
return true;
}
/**
* Given a list of edsServiceNames (which may actually be the cluster name),
* for each watcher watching a name not on the list, call that watcher's
* onResourceDoesNotExist method.
* @param allClusterNames
*/
handleMissingNames(allEdsServiceNames: Set<string>) {
for (const [edsServiceName, watcherList] of this.watchers.entries()) {
if (!allEdsServiceNames.has(edsServiceName)) {
trace('Reporting EDS resource does not exist for edsServiceName ' + edsServiceName);
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
}
handleResponses(responses: ClusterLoadAssignment__Output[]) {
for (const message of responses) {
if (!this.validateResponse(message)) {
trace('EDS validation failed for message ' + JSON.stringify(message));
return 'EDS Error: ClusterLoadAssignment validation failed';
}
}
this.latestResponses = responses;
const allClusterNames: Set<string> = new Set<string>();
for (const message of responses) {
allClusterNames.add(message.cluster_name);
const watchers = this.watchers.get(message.cluster_name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
}
trace('Received EDS updates for cluster names ' + Array.from(allClusterNames));
this.handleMissingNames(allClusterNames);
return null;
}
reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}
class CdsState implements XdsStreamState<Cluster__Output> {
versionInfo = '';
nonce = '';
private watchers: Map<string, Watcher<Cluster__Output>[]> = new Map<
string,
Watcher<Cluster__Output>[]
>();
private latestResponses: Cluster__Output[] = [];
constructor(
private edsState: EdsState,
private updateResourceNames: () => void
) {}
/**
* Add the watcher to the watcher list. Returns true if the list of resource
* names has changed, and false otherwise.
* @param clusterName
* @param watcher
*/
addWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
trace('Adding CDS watcher for clusterName ' + clusterName);
let watchersEntry = this.watchers.get(clusterName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(clusterName, watchersEntry);
}
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
for (const message of this.latestResponses) {
if (message.name === clusterName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing CDS update for new watcher for clusterName ' + clusterName);
watcher.onValidUpdate(message);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
}
removeWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
trace('Removing CDS watcher for clusterName ' + clusterName);
const watchersEntry = this.watchers.get(clusterName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(clusterName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
}
getResourceNames(): string[] {
return Array.from(this.watchers.keys());
}
private validateResponse(message: Cluster__Output): boolean {
if (message.type !== 'EDS') {
return false;
}
if (!message.eds_cluster_config?.eds_config?.ads) {
return false;
}
if (message.lb_policy !== 'ROUND_ROBIN') {
return false;
}
if (message.lrs_server) {
if (!message.lrs_server.self) {
return false;
}
}
return true;
}
/**
* Given a list of clusterNames (which may actually be the cluster name),
* for each watcher watching a name not on the list, call that watcher's
* onResourceDoesNotExist method.
* @param allClusterNames
*/
private handleMissingNames(allClusterNames: Set<string>) {
for (const [clusterName, watcherList] of this.watchers.entries()) {
if (!allClusterNames.has(clusterName)) {
trace('Reporting CDS resource does not exist for clusterName ' + clusterName);
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
}
handleResponses(responses: Cluster__Output[]): string | null {
for (const message of responses) {
if (!this.validateResponse(message)) {
trace('CDS validation failed for message ' + JSON.stringify(message));
return 'CDS Error: Cluster validation failed';
}
}
this.latestResponses = responses;
const allEdsServiceNames: Set<string> = new Set<string>();
const allClusterNames: Set<string> = new Set<string>();
for (const message of responses) {
allClusterNames.add(message.name);
const edsServiceName = message.eds_cluster_config?.service_name ?? '';
allEdsServiceNames.add(
edsServiceName === '' ? message.name : edsServiceName
);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
}
trace('Received CDS updates for cluster names ' + Array.from(allClusterNames));
this.handleMissingNames(allClusterNames);
this.edsState.handleMissingNames(allEdsServiceNames);
return null;
}
reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}
class RdsState implements XdsStreamState<RouteConfiguration__Output> {
versionInfo = '';
nonce = '';
private routeConfigName: string | null = null;
constructor(
private targetName: string,
private watcher: Watcher<ServiceConfig>,
private updateResouceNames: () => void
) {}
getResourceNames(): string[] {
return this.routeConfigName ? [this.routeConfigName] : [];
}
handleSingleMessage(message: RouteConfiguration__Output) {
for (const virtualHost of message.virtual_hosts) {
if (virtualHost.domains.indexOf(this.targetName) >= 0) {
const route = virtualHost.routes[virtualHost.routes.length - 1];
if (route.match?.prefix === '' && route.route?.cluster) {
trace('Reporting RDS update for host ' + this.targetName + ' with cluster ' + route.route.cluster);
this.watcher.onValidUpdate({
methodConfig: [],
loadBalancingConfig: [
new CdsLoadBalancingConfig(route.route.cluster)
],
});
return;
} else {
trace('Discarded matching route with prefix ' + route.match?.prefix + ' and cluster ' + route.route?.cluster);
}
}
}
trace('Reporting RDS resource does not exist from domain lists ' + message.virtual_hosts.map(virtualHost => virtualHost.domains));
/* If none of the routes match the one we are looking for, bubble up an
* error. */
this.watcher.onResourceDoesNotExist();
}
handleResponses(responses: RouteConfiguration__Output[]): string | null {
trace('Received RDS response with route config names ' + responses.map(message => message.name));
if (this.routeConfigName !== null) {
for (const message of responses) {
if (message.name === this.routeConfigName) {
this.handleSingleMessage(message);
return null;
}
}
}
return null;
}
setRouteConfigName(name: string | null) {
const oldName = this.routeConfigName;
this.routeConfigName = name;
if (name !== oldName) {
this.updateResouceNames();
}
}
reportStreamError(status: StatusObject): void {
this.watcher.onTransientError(status);
}
}
class LdsState implements XdsStreamState<Listener__Output> {
versionInfo = '';
nonce = '';
constructor(private targetName: string, private rdsState: RdsState) {}
getResourceNames(): string[] {
return [this.targetName];
}
private validateResponse(message: Listener__Output): boolean {
if (
!(
message.api_listener?.api_listener &&
protoLoader.isAnyExtension(message.api_listener.api_listener) &&
message.api_listener?.api_listener['@type'] ===
HTTP_CONNECTION_MANGER_TYPE_URL
)
) {
return false;
}
const httpConnectionManager = message.api_listener
?.api_listener as protoLoader.AnyExtension &
HttpConnectionManager__Output;
switch (httpConnectionManager.route_specifier) {
case 'rds':
return !!httpConnectionManager.rds?.config_source?.ads;
case 'route_config':
return true;
}
return false;
}
handleResponses(responses: Listener__Output[]): string | null {
trace('Received LDS update with names ' + responses.map(message => message.name));
for (const message of responses) {
if (message.name === this.targetName) {
if (this.validateResponse(message)) {
// The validation step ensures that this is correct
const httpConnectionManager = message.api_listener!
.api_listener as protoLoader.AnyExtension &
HttpConnectionManager__Output;
switch (httpConnectionManager.route_specifier) {
case 'rds':
trace('Received LDS update with RDS route config name ' + httpConnectionManager.rds!.route_config_name);
this.rdsState.setRouteConfigName(
httpConnectionManager.rds!.route_config_name
);
break;
case 'route_config':
trace('Received LDS update with route configuration');
this.rdsState.setRouteConfigName(null);
this.rdsState.handleSingleMessage(
httpConnectionManager.route_config!
);
break;
default:
// The validation rules should prevent this
}
} else {
trace('LRS validation error for message ' + JSON.stringify(message));
return 'LRS Error: Listener validation failed';
}
}
}
return null;
}
reportStreamError(status: StatusObject): void {
// Nothing to do here
}
}
interface AdsState {
[EDS_TYPE_URL]: EdsState;
[CDS_TYPE_URL]: CdsState;
@ -728,21 +283,19 @@ export class XdsClient {
private adsBackoff: BackoffTimeout;
private lrsBackoff: BackoffTimeout;
constructor(
targetName: string,
serviceConfigWatcher: Watcher<ServiceConfig>,
channelOptions: ChannelOptions
) {
constructor() {
const edsState = new EdsState(() => {
this.updateNames(EDS_TYPE_URL);
});
const cdsState = new CdsState(edsState, () => {
this.updateNames(CDS_TYPE_URL);
});
const rdsState = new RdsState(targetName, serviceConfigWatcher, () => {
const rdsState = new RdsState(() => {
this.updateNames(RDS_TYPE_URL);
});
const ldsState = new LdsState(targetName, rdsState);
const ldsState = new LdsState(rdsState, () => {
this.updateNames(LDS_TYPE_URL);
});
this.adsState = {
[EDS_TYPE_URL]: edsState,
[CDS_TYPE_URL]: cdsState,
@ -750,26 +303,10 @@ export class XdsClient {
[LDS_TYPE_URL]: ldsState,
};
const channelArgs = { ...channelOptions };
const channelArgsToRemove = [
/* The SSL target name override corresponds to the target, and this
* client has its own target */
'grpc.ssl_target_name_override',
/* The default authority also corresponds to the target */
'grpc.default_authority',
/* This client will have its own specific keepalive time setting */
'grpc.keepalive_time_ms',
/* The service config specifies the load balancing policy. This channel
* needs its own separate load balancing policy setting. In particular,
* recursively using an xDS load balancer for the xDS client would be
* bad */
'grpc.service_config',
];
for (const arg of channelArgsToRemove) {
delete channelArgs[arg];
const channelArgs = {
// 5 minutes
'grpc.keepalive_time_ms': 5 * 60 * 1000
}
// 5 minutes
channelArgs['grpc.keepalive_time_ms'] = 5 * 60 * 1000;
this.adsBackoff = new BackoffTimeout(() => {
this.maybeStartAdsStream();
@ -904,6 +441,12 @@ export class XdsClient {
if (this.hasShutdown) {
return;
}
if (this.adsState[EDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[CDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[RDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[LDS_TYPE_URL].getResourceNames().length === 0) {
return;
}
trace('Starting ADS stream');
// Backoff relative to when we start the request
this.adsBackoff.runOnce();
@ -986,6 +529,16 @@ export class XdsClient {
}
private updateNames(typeUrl: AdsTypeUrl) {
if (this.adsState[EDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[CDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[RDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[LDS_TYPE_URL].getResourceNames().length === 0) {
this.adsCall?.end();
this.lrsCall?.end();
return;
}
this.maybeStartAdsStream();
this.maybeStartLrsStream();
trace('Sending update for type URL ' + typeUrl + ' with names ' + this.adsState[typeUrl].getResourceNames());
this.adsCall?.write({
node: this.adsNode!,
@ -1013,18 +566,22 @@ export class XdsClient {
if (this.hasShutdown) {
return;
}
if (this.adsState[EDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[CDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[RDS_TYPE_URL].getResourceNames().length === 0 &&
this.adsState[LDS_TYPE_URL].getResourceNames().length === 0) {
return;
}
trace('Starting LRS stream');
this.lrsBackoff.runOnce();
this.lrsCall = this.lrsClient.streamLoadStats();
this.lrsCall.on('metadata', () => {
this.lrsCall.on('data', (message: LoadStatsResponse__Output) => {
/* Once we get any response from the server, we assume that the stream is
* in a good state, so we can reset the backoff timer. */
this.lrsBackoff.stop();
this.lrsBackoff.reset();
});
this.lrsCall.on('data', (message: LoadStatsResponse__Output) => {
if (
message.load_reporting_interval?.seconds !==
this.latestLrsSettings?.load_reporting_interval?.seconds ||
@ -1068,7 +625,6 @@ export class XdsClient {
if (!this.lrsCall) {
return;
}
trace('Sending LRS stats');
const clusterStats: ClusterStats[] = [];
for (const [
{ clusterName, edsServiceName },
@ -1129,6 +685,7 @@ export class XdsClient {
}
}
}
trace('Sending LRS stats ' + JSON.stringify(clusterStats, undefined, 2));
this.lrsCall.write({
node: this.lrsNode!,
cluster_stats: clusterStats,
@ -1157,10 +714,30 @@ export class XdsClient {
}
removeClusterWatcher(clusterName: string, watcher: Watcher<Cluster__Output>) {
trace('Watcher removed for endpoint ' + clusterName);
trace('Watcher removed for cluster ' + clusterName);
this.adsState[CDS_TYPE_URL].removeWatcher(clusterName, watcher);
}
addRouteWatcher(routeConfigName: string, watcher: Watcher<RouteConfiguration__Output>) {
trace('Watcher added for route ' + routeConfigName);
this.adsState[RDS_TYPE_URL].addWatcher(routeConfigName, watcher);
}
removeRouteWatcher(routeConfigName: string, watcher: Watcher<RouteConfiguration__Output>) {
trace('Watcher removed for route ' + routeConfigName);
this.adsState[RDS_TYPE_URL].removeWatcher(routeConfigName, watcher);
}
addListenerWatcher(targetName: string, watcher: Watcher<Listener__Output>) {
trace('Watcher added for listener ' + targetName);
this.adsState[LDS_TYPE_URL].addWatcher(targetName, watcher);
}
removeListenerWatcher(targetName: string, watcher: Watcher<Listener__Output>) {
trace('Watcher removed for listener ' + targetName);
this.adsState[LDS_TYPE_URL].removeWatcher(targetName, watcher);
}
/**
*
* @param lrsServer The target name of the server to send stats to. An empty
@ -1174,6 +751,7 @@ export class XdsClient {
clusterName: string,
edsServiceName: string
): XdsClusterDropStats {
trace('addClusterDropStats(lrsServer=' + lrsServer + ', clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ')');
if (lrsServer !== '') {
return {
addCallDropped: (category) => {},
@ -1197,6 +775,7 @@ export class XdsClient {
edsServiceName: string,
locality: Locality__Output
): XdsClusterLocalityStats {
trace('addClusterLocalityStats(lrsServer=' + lrsServer + ', clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ', locality=' + JSON.stringify(locality) + ')');
if (lrsServer !== '') {
return {
addCallStarted: () => {},
@ -1243,7 +822,7 @@ export class XdsClient {
};
}
shutdown(): void {
private shutdown(): void {
this.adsCall?.cancel();
this.adsClient?.close();
this.lrsCall?.cancel();
@ -1251,3 +830,12 @@ export class XdsClient {
this.hasShutdown = true;
}
}
let singletonXdsClient: XdsClient | null = null;
export function getSingletonXdsClient(): XdsClient {
if (singletonXdsClient === null) {
singletonXdsClient = new XdsClient();
}
return singletonXdsClient;
}

View File

@ -0,0 +1,171 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { Cluster__Output } from "../generated/envoy/api/v2/Cluster";
import { EdsState } from "./eds-state";
import { Watcher, XdsStreamState } from "./xds-stream-state";
const TRACER_NAME = 'xds_client';
function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
export class CdsState implements XdsStreamState<Cluster__Output> {
versionInfo = '';
nonce = '';
private watchers: Map<string, Watcher<Cluster__Output>[]> = new Map<
string,
Watcher<Cluster__Output>[]
>();
private latestResponses: Cluster__Output[] = [];
constructor(
private edsState: EdsState,
private updateResourceNames: () => void
) {}
/**
* Add the watcher to the watcher list. Returns true if the list of resource
* names has changed, and false otherwise.
* @param clusterName
* @param watcher
*/
addWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
trace('Adding CDS watcher for clusterName ' + clusterName);
let watchersEntry = this.watchers.get(clusterName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(clusterName, watchersEntry);
}
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
for (const message of this.latestResponses) {
if (message.name === clusterName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing CDS update for new watcher for clusterName ' + clusterName);
watcher.onValidUpdate(message);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
}
removeWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
trace('Removing CDS watcher for clusterName ' + clusterName);
const watchersEntry = this.watchers.get(clusterName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(clusterName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
}
getResourceNames(): string[] {
return Array.from(this.watchers.keys());
}
private validateResponse(message: Cluster__Output): boolean {
if (message.type !== 'EDS') {
return false;
}
if (!message.eds_cluster_config?.eds_config?.ads) {
return false;
}
if (message.lb_policy !== 'ROUND_ROBIN') {
return false;
}
if (message.lrs_server) {
if (!message.lrs_server.self) {
return false;
}
}
return true;
}
/**
* Given a list of clusterNames (which may actually be the cluster name),
* for each watcher watching a name not on the list, call that watcher's
* onResourceDoesNotExist method.
* @param allClusterNames
*/
private handleMissingNames(allClusterNames: Set<string>) {
for (const [clusterName, watcherList] of this.watchers.entries()) {
if (!allClusterNames.has(clusterName)) {
trace('Reporting CDS resource does not exist for clusterName ' + clusterName);
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
}
handleResponses(responses: Cluster__Output[]): string | null {
for (const message of responses) {
if (!this.validateResponse(message)) {
trace('CDS validation failed for message ' + JSON.stringify(message));
return 'CDS Error: Cluster validation failed';
}
}
this.latestResponses = responses;
const allEdsServiceNames: Set<string> = new Set<string>();
const allClusterNames: Set<string> = new Set<string>();
for (const message of responses) {
allClusterNames.add(message.name);
const edsServiceName = message.eds_cluster_config?.service_name ?? '';
allEdsServiceNames.add(
edsServiceName === '' ? message.name : edsServiceName
);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
}
trace('Received CDS updates for cluster names ' + Array.from(allClusterNames));
this.handleMissingNames(allClusterNames);
this.edsState.handleMissingNames(allEdsServiceNames);
return null;
}
reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}

View File

@ -0,0 +1,174 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { isIPv4, isIPv6 } from "net";
import { ClusterLoadAssignment__Output } from "../generated/envoy/api/v2/ClusterLoadAssignment";
import { Watcher, XdsStreamState } from "./xds-stream-state";
const TRACER_NAME = 'xds_client';
function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
public versionInfo = '';
public nonce = '';
private watchers: Map<
string,
Watcher<ClusterLoadAssignment__Output>[]
> = new Map<string, Watcher<ClusterLoadAssignment__Output>[]>();
private latestResponses: ClusterLoadAssignment__Output[] = [];
constructor(private updateResourceNames: () => void) {}
/**
* Add the watcher to the watcher list. Returns true if the list of resource
* names has changed, and false otherwise.
* @param edsServiceName
* @param watcher
*/
addWatcher(
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
let watchersEntry = this.watchers.get(edsServiceName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(edsServiceName, watchersEntry);
}
trace('Adding EDS watcher (' + watchersEntry.length + ' ->' + (watchersEntry.length + 1) + ') for edsServiceName ' + edsServiceName);
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
for (const message of this.latestResponses) {
if (message.cluster_name === edsServiceName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing EDS update for new watcher for edsServiceName ' + edsServiceName);
watcher.onValidUpdate(message);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
}
removeWatcher(
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
trace('Removing EDS watcher for edsServiceName ' + edsServiceName);
const watchersEntry = this.watchers.get(edsServiceName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
trace('Removed EDS watcher (' + watchersEntry.length + ' -> ' + (watchersEntry.length - 1) + ') for edsServiceName ' + edsServiceName);
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(edsServiceName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
}
getResourceNames(): string[] {
return Array.from(this.watchers.keys());
}
/**
* Validate the ClusterLoadAssignment object by these rules:
* https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto
* @param message
*/
private validateResponse(message: ClusterLoadAssignment__Output) {
for (const endpoint of message.endpoints) {
for (const lb of endpoint.lb_endpoints) {
const socketAddress = lb.endpoint?.address?.socket_address;
if (!socketAddress) {
return false;
}
if (socketAddress.port_specifier !== 'port_value') {
return false;
}
if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) {
return false;
}
}
}
return true;
}
/**
* Given a list of edsServiceNames (which may actually be the cluster name),
* for each watcher watching a name not on the list, call that watcher's
* onResourceDoesNotExist method.
* @param allClusterNames
*/
handleMissingNames(allEdsServiceNames: Set<string>) {
for (const [edsServiceName, watcherList] of this.watchers.entries()) {
if (!allEdsServiceNames.has(edsServiceName)) {
trace('Reporting EDS resource does not exist for edsServiceName ' + edsServiceName);
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
}
handleResponses(responses: ClusterLoadAssignment__Output[]) {
for (const message of responses) {
if (!this.validateResponse(message)) {
trace('EDS validation failed for message ' + JSON.stringify(message));
return 'EDS Error: ClusterLoadAssignment validation failed';
}
}
this.latestResponses = responses;
const allClusterNames: Set<string> = new Set<string>();
for (const message of responses) {
allClusterNames.add(message.cluster_name);
const watchers = this.watchers.get(message.cluster_name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
}
trace('Received EDS updates for cluster names ' + Array.from(allClusterNames));
this.handleMissingNames(allClusterNames);
return null;
}
reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}

View File

@ -0,0 +1,155 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as protoLoader from '@grpc/proto-loader';
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { Listener__Output } from "../generated/envoy/api/v2/Listener";
import { RdsState } from "./rds-state";
import { Watcher, XdsStreamState } from "./xds-stream-state";
import { HttpConnectionManager__Output } from '../generated/envoy/config/filter/network/http_connection_manager/v2/HttpConnectionManager';
const TRACER_NAME = 'xds_client';
function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
const HTTP_CONNECTION_MANGER_TYPE_URL =
'type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager';
export class LdsState implements XdsStreamState<Listener__Output> {
versionInfo = '';
nonce = '';
private watchers: Map<string, Watcher<Listener__Output>[]> = new Map<string, Watcher<Listener__Output>[]>();
private latestResponses: Listener__Output[] = [];
constructor(private rdsState: RdsState, private updateResourceNames: () => void) {}
addWatcher(targetName: string, watcher: Watcher<Listener__Output>) {
trace('Adding RDS watcher for targetName ' + targetName);
let watchersEntry = this.watchers.get(targetName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(targetName, watchersEntry);
}
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
for (const message of this.latestResponses) {
if (message.name === targetName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing RDS update for new watcher for targetName ' + targetName);
watcher.onValidUpdate(message);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
}
removeWatcher(targetName: string, watcher: Watcher<Listener__Output>): void {
trace('Removing RDS watcher for targetName ' + targetName);
const watchersEntry = this.watchers.get(targetName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(targetName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
}
getResourceNames(): string[] {
return Array.from(this.watchers.keys());
}
private validateResponse(message: Listener__Output): boolean {
if (
!(
message.api_listener?.api_listener &&
protoLoader.isAnyExtension(message.api_listener.api_listener) &&
message.api_listener?.api_listener['@type'] ===
HTTP_CONNECTION_MANGER_TYPE_URL
)
) {
return false;
}
const httpConnectionManager = message.api_listener
?.api_listener as protoLoader.AnyExtension &
HttpConnectionManager__Output;
switch (httpConnectionManager.route_specifier) {
case 'rds':
return !!httpConnectionManager.rds?.config_source?.ads;
case 'route_config':
return this.rdsState.validateResponse(httpConnectionManager.route_config!);
}
return false;
}
private handleMissingNames(allTargetNames: Set<string>) {
for (const [targetName, watcherList] of this.watchers.entries()) {
if (!allTargetNames.has(targetName)) {
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
}
handleResponses(responses: Listener__Output[]): string | null {
for (const message of responses) {
if (!this.validateResponse(message)) {
trace('LDS validation failed for message ' + JSON.stringify(message));
return 'LDS Error: Route validation failed';
}
}
this.latestResponses = responses;
const allTargetNames = new Set<string>();
for (const message of responses) {
allTargetNames.add(message.name);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
}
trace('Received RDS response with route config names ' + Array.from(allTargetNames));
this.handleMissingNames(allTargetNames);
return null;
}
reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}

View File

@ -0,0 +1,191 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { GRPC_XDS_EXPERIMENTAL_ROUTING } from "../environment";
import { RouteConfiguration__Output } from "../generated/envoy/api/v2/RouteConfiguration";
import { CdsLoadBalancingConfig } from "../load-balancer-cds";
import { Watcher, XdsStreamState } from "./xds-stream-state";
import ServiceConfig = experimental.ServiceConfig;
const TRACER_NAME = 'xds_client';
function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
const SUPPORTED_PATH_SPECIFIERS = ['prefix', 'path', 'safe_regex'];
const SUPPPORTED_HEADER_MATCH_SPECIFIERS = [
'exact_match',
'safe_regex_match',
'range_match',
'present_match',
'prefix_match',
'suffix_match'];
const SUPPORTED_CLUSTER_SPECIFIERS = ['cluster', 'weighted_clusters', 'cluster_header'];
export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
versionInfo = '';
nonce = '';
private watchers: Map<string, Watcher<RouteConfiguration__Output>[]> = new Map<string, Watcher<RouteConfiguration__Output>[]>();
private latestResponses: RouteConfiguration__Output[] = [];
constructor(private updateResourceNames: () => void) {}
addWatcher(routeConfigName: string, watcher: Watcher<RouteConfiguration__Output>) {
trace('Adding RDS watcher for routeConfigName ' + routeConfigName);
let watchersEntry = this.watchers.get(routeConfigName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(routeConfigName, watchersEntry);
}
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
for (const message of this.latestResponses) {
if (message.name === routeConfigName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing RDS update for new watcher for routeConfigName ' + routeConfigName);
watcher.onValidUpdate(message);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
}
removeWatcher(routeConfigName: string, watcher: Watcher<RouteConfiguration__Output>): void {
trace('Removing RDS watcher for routeConfigName ' + routeConfigName);
const watchersEntry = this.watchers.get(routeConfigName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(routeConfigName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
}
getResourceNames(): string[] {
return Array.from(this.watchers.keys());
}
validateResponse(message: RouteConfiguration__Output): boolean {
if (GRPC_XDS_EXPERIMENTAL_ROUTING) {
// https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#response-validation
for (const virtualHost of message.virtual_hosts) {
for (const domainPattern of virtualHost.domains) {
const starIndex = domainPattern.indexOf('*');
const lastStarIndex = domainPattern.lastIndexOf('*');
// A domain pattern can have at most one wildcard *
if (starIndex !== lastStarIndex) {
return false;
}
// A wildcard * can either be absent or at the beginning or end of the pattern
if (!(starIndex === -1 || starIndex === 0 || starIndex === domainPattern.length - 1)) {
return false;
}
}
for (const route of virtualHost.routes) {
const match = route.match;
if (!match) {
return false;
}
if (SUPPORTED_PATH_SPECIFIERS.indexOf(match.path_specifier) < 0) {
return false;
}
for (const headers of match.headers) {
if (SUPPPORTED_HEADER_MATCH_SPECIFIERS.indexOf(headers.header_match_specifier) < 0) {
return false;
}
}
if (route.action !== 'route') {
return false;
}
if ((route.route === undefined) || SUPPORTED_CLUSTER_SPECIFIERS.indexOf(route.route.cluster_specifier) < 0) {
return false;
}
if (route.route!.cluster_specifier === 'weighted_clusters') {
let weightSum = 0;
for (const clusterWeight of route.route.weighted_clusters!.clusters) {
weightSum += clusterWeight.weight?.value ?? 0;
}
if (weightSum !== route.route.weighted_clusters!.total_weight?.value ?? 100) {
return false;
}
}
}
}
return true;
} else {
return true;
}
}
private handleMissingNames(allRouteConfigNames: Set<string>) {
for (const [routeConfigName, watcherList] of this.watchers.entries()) {
if (!allRouteConfigNames.has(routeConfigName)) {
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
}
handleResponses(responses: RouteConfiguration__Output[]): string | null {
for (const message of responses) {
if (!this.validateResponse(message)) {
trace('RDS validation failed for message ' + JSON.stringify(message));
return 'RDS Error: Route validation failed';
}
}
this.latestResponses = responses;
const allRouteConfigNames = new Set<string>();
for (const message of responses) {
allRouteConfigNames.add(message.name);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
}
trace('Received RDS response with route config names ' + Array.from(allRouteConfigNames));
this.handleMissingNames(allRouteConfigNames);
return null;
}
reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { StatusObject } from "@grpc/grpc-js";
export interface Watcher<UpdateType> {
onValidUpdate(update: UpdateType): void;
onTransientError(error: StatusObject): void;
onResourceDoesNotExist(): void;
}
export interface XdsStreamState<ResponseType> {
versionInfo: string;
nonce: string;
getResourceNames(): string[];
/**
* Returns a string containing the error details if the message should be nacked,
* or null if it should be acked.
* @param responses
*/
handleResponses(responses: ResponseType[]): string | null;
reportStreamError(status: StatusObject): void;
}

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.2.0",
"version": "1.2.5",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
@ -58,7 +58,6 @@
},
"dependencies": {
"@types/node": "^12.12.47",
"google-auth-library": "^6.1.1",
"semver": "^6.2.0"
},
"files": [

View File

@ -37,6 +37,10 @@ const {
NGHTTP2_CANCEL,
} = http2.constants;
interface NodeError extends Error {
code: string;
}
export type Deadline = Date | number;
export interface CallStreamOptions {
@ -202,6 +206,8 @@ export class Http2CallStream implements Call {
private listener: InterceptingListener | null = null;
private internalErrorMessage: string | null = null;
constructor(
private readonly methodName: string,
private readonly channel: ChannelImplementation,
@ -409,21 +415,8 @@ export class Http2CallStream implements Call {
);
}
const status: StatusObject = { code, details, metadata };
let finalStatus;
try {
// Attempt to assign final status.
finalStatus = this.filterStack.receiveTrailers(status);
} catch (error) {
// This is a no-op if the call was already ended when handling headers.
this.endCall({
code: Status.INTERNAL,
details: 'Failed to process received status',
metadata: new Metadata(),
});
return;
}
// This is a no-op if the call was already ended when handling headers.
this.endCall(finalStatus);
this.endCall(status);
}
attachHttp2Stream(
@ -518,66 +511,86 @@ export class Http2CallStream implements Call {
this.maybeOutputStatus();
});
stream.on('close', () => {
this.trace('HTTP/2 stream closed with code ' + stream.rstCode);
/* If we have a final status with an OK status code, that means that
* we have received all of the messages and we have processed the
* trailers and the call completed successfully, so it doesn't matter
* how the stream ends after that */
if (this.finalStatus?.code === Status.OK) {
return;
}
let code: Status;
let details = '';
switch (stream.rstCode) {
case http2.constants.NGHTTP2_NO_ERROR:
/* If we get a NO_ERROR code and we already have a status, the
* stream completed properly and we just haven't fully processed
* it yet */
if (this.finalStatus !== null) {
return;
}
code = Status.INTERNAL;
details = `Received RST_STREAM with code ${stream.rstCode}`;
break;
case http2.constants.NGHTTP2_REFUSED_STREAM:
code = Status.UNAVAILABLE;
details = 'Stream refused by server';
break;
case http2.constants.NGHTTP2_CANCEL:
code = Status.CANCELLED;
details = 'Call cancelled';
break;
case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
code = Status.RESOURCE_EXHAUSTED;
details = 'Bandwidth exhausted';
break;
case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
code = Status.PERMISSION_DENIED;
details = 'Protocol not secure enough';
break;
case http2.constants.NGHTTP2_INTERNAL_ERROR:
code = Status.INTERNAL;
/* This error code was previously handled in the default case, and
* there are several instances of it online, so I wanted to
* preserve the original error message so that people find existing
* information in searches, but also include the more recognizable
* "Internal server error" message. */
details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`;
break;
default:
code = Status.INTERNAL;
details = `Received RST_STREAM with code ${stream.rstCode}`;
}
// This is a no-op if trailers were received at all.
// This is OK, because status codes emitted here correspond to more
// catastrophic issues that prevent us from receiving trailers in the
// first place.
this.endCall({ code, details, metadata: new Metadata() });
/* Use process.next tick to ensure that this code happens after any
* "error" event that may be emitted at about the same time, so that
* we can bubble up the error message from that event. */
process.nextTick(() => {
this.trace('HTTP/2 stream closed with code ' + stream.rstCode);
/* If we have a final status with an OK status code, that means that
* we have received all of the messages and we have processed the
* trailers and the call completed successfully, so it doesn't matter
* how the stream ends after that */
if (this.finalStatus?.code === Status.OK) {
return;
}
let code: Status;
let details = '';
switch (stream.rstCode) {
case http2.constants.NGHTTP2_NO_ERROR:
/* If we get a NO_ERROR code and we already have a status, the
* stream completed properly and we just haven't fully processed
* it yet */
if (this.finalStatus !== null) {
return;
}
code = Status.INTERNAL;
details = `Received RST_STREAM with code ${stream.rstCode}`;
break;
case http2.constants.NGHTTP2_REFUSED_STREAM:
code = Status.UNAVAILABLE;
details = 'Stream refused by server';
break;
case http2.constants.NGHTTP2_CANCEL:
code = Status.CANCELLED;
details = 'Call cancelled';
break;
case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
code = Status.RESOURCE_EXHAUSTED;
details = 'Bandwidth exhausted';
break;
case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
code = Status.PERMISSION_DENIED;
details = 'Protocol not secure enough';
break;
case http2.constants.NGHTTP2_INTERNAL_ERROR:
code = Status.INTERNAL;
if (this.internalErrorMessage === null) {
/* This error code was previously handled in the default case, and
* there are several instances of it online, so I wanted to
* preserve the original error message so that people find existing
* information in searches, but also include the more recognizable
* "Internal server error" message. */
details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`;
} else {
/* The "Received RST_STREAM with code ..." error is preserved
* here for continuity with errors reported online, but the
* error message at the end will probably be more relevant in
* most cases. */
details = `Received RST_STREAM with code ${stream.rstCode} triggered by internal client error: ${this.internalErrorMessage}`;
}
break;
default:
code = Status.INTERNAL;
details = `Received RST_STREAM with code ${stream.rstCode}`;
}
// This is a no-op if trailers were received at all.
// This is OK, because status codes emitted here correspond to more
// catastrophic issues that prevent us from receiving trailers in the
// first place.
this.endCall({ code, details, metadata: new Metadata() });
});
});
stream.on('error', (err: Error) => {
stream.on('error', (err: NodeError) => {
/* We need an error handler here to stop "Uncaught Error" exceptions
* from bubbling up. However, errors here should all correspond to
* "close" events, where we will handle the error more granularly */
/* Specifically looking for stream errors that were *not* constructed
* from a RST_STREAM response here:
* https://github.com/nodejs/node/blob/8b8620d580314050175983402dfddf2674e8e22a/lib/internal/http2/core.js#L2267
*/
if (err.code !== 'ERR_HTTP2_STREAM_ERROR') {
this.internalErrorMessage = err.message;
}
});
if (!this.pendingRead) {
stream.pause();
@ -630,7 +643,11 @@ export class Http2CallStream implements Call {
getDeadline(): Deadline {
if (this.options.parentCall && this.options.flags & Propagate.DEADLINE) {
return this.options.parentCall.getDeadline();
const parentDeadline = this.options.parentCall.getDeadline();
const selfDeadline = this.options.deadline;
const parentDeadlineMsecs = parentDeadline instanceof Date ? parentDeadline.getTime() : parentDeadline;
const selfDeadlineMsecs = selfDeadline instanceof Date ? selfDeadline.getTime() : selfDeadline;
return Math.min(parentDeadlineMsecs, selfDeadlineMsecs);
} else {
return this.options.deadline;
}

View File

@ -19,7 +19,6 @@ import { ConnectionOptions, createSecureContext, PeerCertificate } from 'tls';
import { CallCredentials } from './call-credentials';
import { CIPHER_SUITES, getDefaultRootsData } from './tls-helpers';
import { GoogleAuth as GoogleAuthType } from 'google-auth-library';
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function verifyIsBufferOrNull(obj: any, friendlyName: string): void {
@ -279,13 +278,3 @@ class ComposedChannelCredentialsImpl extends ChannelCredentials {
}
}
}
export function createGoogleDefaultCredentials(): ChannelCredentials {
const GoogleAuth = require('google-auth-library')
.GoogleAuth as typeof GoogleAuthType;
const sslCreds = ChannelCredentials.createSsl();
const googleAuthCreds = CallCredentials.createFromGoogleCredential(
new GoogleAuth()
);
return sslCreds.compose(googleAuthCreds);
}

View File

@ -33,7 +33,7 @@ import { FilterStackFactory } from './filter-stack';
import { CallCredentialsFilterFactory } from './call-credentials-filter';
import { DeadlineFilterFactory } from './deadline-filter';
import { CompressionFilterFactory } from './compression-filter';
import { getDefaultAuthority, mapUriDefaultScheme } from './resolver';
import { CallConfig, ConfigSelector, getDefaultAuthority, mapUriDefaultScheme } from './resolver';
import { trace, log } from './logging';
import { SubchannelAddress } from './subchannel';
import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
@ -136,9 +136,18 @@ export class ChannelImplementation implements Channel {
private subchannelPool: SubchannelPool;
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
private currentPicker: Picker = new UnavailablePicker();
/**
* Calls queued up to get a call config. Should only be populated before the
* first time the resolver returns a result, which includes the ConfigSelector.
*/
private configSelectionQueue: Array<{
callStream: Http2CallStream;
callMetadata: Metadata;
}> = [];
private pickQueue: Array<{
callStream: Http2CallStream;
callMetadata: Metadata;
callConfig: CallConfig;
}> = [];
private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
private defaultAuthority: string;
@ -152,6 +161,7 @@ export class ChannelImplementation implements Channel {
* is non-empty.
*/
private callRefTimer: NodeJS.Timer;
private configSelector: ConfigSelector | null = null;
constructor(
target: string,
private readonly credentials: ChannelCredentials,
@ -225,10 +235,10 @@ export class ChannelImplementation implements Channel {
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.currentPicker = picker;
const queueCopy = this.pickQueue.slice();
this.callRefTimer.unref?.();
this.pickQueue = [];
for (const { callStream, callMetadata } of queueCopy) {
this.tryPick(callStream, callMetadata);
this.callRefTimerUnref();
for (const { callStream, callMetadata, callConfig } of queueCopy) {
this.tryPick(callStream, callMetadata, callConfig);
}
this.updateState(connectivityState);
},
@ -242,7 +252,37 @@ export class ChannelImplementation implements Channel {
this.resolvingLoadBalancer = new ResolvingLoadBalancer(
this.target,
channelControlHelper,
options
options,
(configSelector) => {
this.configSelector = configSelector;
/* We process the queue asynchronously to ensure that the corresponding
* load balancer update has completed. */
process.nextTick(() => {
const localQueue = this.configSelectionQueue;
this.configSelectionQueue = [];
this.callRefTimerUnref()
for (const {callStream, callMetadata} of localQueue) {
this.tryGetConfig(callStream, callMetadata);
}
this.configSelectionQueue = [];
});
},
(status) => {
if (this.configSelectionQueue.length > 0) {
trace(LogVerbosity.DEBUG, 'channel', 'Name resolution failed for target ' + uriToString(this.target) + ' with calls queued for config selection');
}
const localQueue = this.configSelectionQueue;
this.configSelectionQueue = [];
this.callRefTimerUnref();
for (const {callStream, callMetadata} of localQueue) {
if (callMetadata.getOptions().waitForReady) {
this.callRefTimerRef();
this.configSelectionQueue.push({callStream, callMetadata});
} else {
callStream.cancelWithStatus(status.code, status.details);
}
}
}
);
this.filterStackFactory = new FilterStackFactory([
new CallCredentialsFilterFactory(this),
@ -252,9 +292,25 @@ export class ChannelImplementation implements Channel {
]);
}
private pushPick(callStream: Http2CallStream, callMetadata: Metadata) {
this.callRefTimer.ref?.();
this.pickQueue.push({ callStream, callMetadata });
private callRefTimerRef() {
// If the hasRef function does not exist, always run the code
if (!this.callRefTimer.hasRef?.()) {
trace(LogVerbosity.DEBUG, 'channel', 'callRefTimer.ref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length);
this.callRefTimer.ref?.();
}
}
private callRefTimerUnref() {
// If the hasRef function does not exist, always run the code
if ((!this.callRefTimer.hasRef) || (this.callRefTimer.hasRef())) {
trace(LogVerbosity.DEBUG, 'channel', 'callRefTimer.unref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length);
this.callRefTimer.unref?.();
}
}
private pushPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig) {
this.pickQueue.push({ callStream, callMetadata, callConfig });
this.callRefTimerRef();
}
/**
@ -264,8 +320,8 @@ export class ChannelImplementation implements Channel {
* @param callStream
* @param callMetadata
*/
private tryPick(callStream: Http2CallStream, callMetadata: Metadata) {
const pickResult = this.currentPicker.pick({ metadata: callMetadata });
private tryPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig) {
const pickResult = this.currentPicker.pick({ metadata: callMetadata, extraPickInfo: callConfig.pickInformation });
trace(
LogVerbosity.DEBUG,
'channel',
@ -301,7 +357,7 @@ export class ChannelImplementation implements Channel {
' has state ' +
ConnectivityState[pickResult.subchannel!.getConnectivityState()]
);
this.pushPick(callStream, callMetadata);
this.pushPick(callStream, callMetadata, callConfig);
break;
}
/* We need to clone the callMetadata here because the transparent
@ -321,6 +377,7 @@ export class ChannelImplementation implements Channel {
);
/* If we reach this point, the call stream has started
* successfully */
callConfig.onCommitted?.();
pickResult.onCallStarted?.();
} catch (error) {
if (
@ -349,7 +406,7 @@ export class ChannelImplementation implements Channel {
(error as Error).message +
'. Retrying pick'
);
this.tryPick(callStream, callMetadata);
this.tryPick(callStream, callMetadata, callConfig);
} else {
trace(
LogVerbosity.INFO,
@ -378,7 +435,7 @@ export class ChannelImplementation implements Channel {
ConnectivityState[subchannelState] +
' after metadata filters. Retrying pick'
);
this.tryPick(callStream, callMetadata);
this.tryPick(callStream, callMetadata, callConfig);
}
},
(error: Error & { code: number }) => {
@ -392,11 +449,11 @@ export class ChannelImplementation implements Channel {
}
break;
case PickResultType.QUEUE:
this.pushPick(callStream, callMetadata);
this.pushPick(callStream, callMetadata, callConfig);
break;
case PickResultType.TRANSIENT_FAILURE:
if (callMetadata.getOptions().waitForReady) {
this.pushPick(callStream, callMetadata);
this.pushPick(callStream, callMetadata, callConfig);
} else {
callStream.cancelWithStatus(
pickResult.status!.code,
@ -451,8 +508,30 @@ export class ChannelImplementation implements Channel {
}
}
private tryGetConfig(stream: Http2CallStream, metadata: Metadata) {
if (this.configSelector === null) {
/* This branch will only be taken at the beginning of the channel's life,
* before the resolver ever returns a result. So, the
* ResolvingLoadBalancer may be idle and if so it needs to be kicked
* because it now has a pending request. */
this.resolvingLoadBalancer.exitIdle();
this.configSelectionQueue.push({
callStream: stream,
callMetadata: metadata
});
this.callRefTimerRef();
} else {
const callConfig = this.configSelector(stream.getMethod(), metadata);
if (callConfig.status === Status.OK) {
this.tryPick(stream, metadata, callConfig);
} else {
stream.cancelWithStatus(callConfig.status, "Failed to route call to method " + stream.getMethod());
}
}
}
_startCallStream(stream: Http2CallStream, metadata: Metadata) {
this.tryPick(stream, metadata.clone());
this.tryGetConfig(stream, metadata.clone());
}
close() {

View File

@ -347,10 +347,11 @@ class BaseInterceptingCall implements InterceptingCallInterface {
let serialized: Buffer;
try {
serialized = this.methodDefinition.requestSerialize(message);
this.call.sendMessageWithContext(context, serialized);
} catch (e) {
this.call.cancelWithStatus(Status.INTERNAL, `Request message serialization failure: ${e.message}`);
return;
}
this.call.sendMessageWithContext(context, serialized);
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sendMessage(message: any) {
@ -370,7 +371,6 @@ class BaseInterceptingCall implements InterceptingCallInterface {
let deserialized: any;
try {
deserialized = this.methodDefinition.responseDeserialize(message);
interceptingListener?.onReceiveMessage?.(deserialized);
} catch (e) {
readError = {
code: Status.INTERNAL,
@ -378,7 +378,9 @@ class BaseInterceptingCall implements InterceptingCallInterface {
metadata: new Metadata(),
};
this.call.cancelWithStatus(readError.code, readError.details);
return;
}
interceptingListener?.onReceiveMessage?.(deserialized);
},
onReceiveStatus: (status) => {
if (readError) {

View File

@ -56,10 +56,14 @@ export class DeadlineFilter extends BaseFilter implements Filter {
}
const now: number = new Date().getTime();
let timeout = this.deadline - now;
if (timeout < 0) {
timeout = 0;
}
if (this.deadline !== Infinity) {
if (timeout <= 0) {
process.nextTick(() => {
callStream.cancelWithStatus(
Status.DEADLINE_EXCEEDED,
'Deadline exceeded'
);
});
} else if (this.deadline !== Infinity) {
this.timer = setTimeout(() => {
callStream.cancelWithStatus(
Status.DEADLINE_EXCEEDED,

View File

@ -1,8 +1,7 @@
export { trace } from './logging';
export { Resolver, ResolverListener, registerResolver } from './resolver';
export { Resolver, ResolverListener, registerResolver, ConfigSelector } from './resolver';
export { GrpcUri, uriToString } from './uri-parser';
export { ServiceConfig } from './service-config';
export { createGoogleDefaultCredentials } from './channel-credentials';
export { BackoffTimeout } from './backoff-timeout';
export { LoadBalancer, LoadBalancingConfig, ChannelControlHelper, registerLoadBalancerType, getFirstUsableConfig, validateLoadBalancingConfig } from './load-balancer';
export { SubchannelAddress, subchannelAddressToString } from './subchannel';

View File

@ -128,14 +128,12 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
this.subchannelStateCounts[previousState] -= 1;
this.subchannelStateCounts[newState] += 1;
this.calculateAndUpdateState();
if (newState === ConnectivityState.TRANSIENT_FAILURE) {
this.channelControlHelper.requestReresolution();
}
if (
newState === ConnectivityState.TRANSIENT_FAILURE ||
newState === ConnectivityState.IDLE
) {
this.channelControlHelper.requestReresolution();
subchannel.startConnecting();
}
};

View File

@ -93,6 +93,15 @@ export interface ServiceClientConstructor {
service: ServiceDefinition;
}
/**
* Returns true, if given key is included in the blacklisted
* keys.
* @param key key for check, string.
*/
function isPrototypePolluted(key: string): Boolean {
return ['__proto__', 'prototype', 'constructor'].includes(key);
}
/**
* Creates a constructor for a client with the given methods, as specified in
* the methods argument. The resulting class will have an instance method for
@ -122,7 +131,7 @@ export function makeClientConstructor(
}
Object.keys(methods).forEach((name) => {
if (name === '__proto__') {
if (isPrototypePolluted(name)) {
return;
}
const attrs = methods[name];
@ -155,7 +164,7 @@ export function makeClientConstructor(
ServiceClientImpl.prototype[name] = methodFunc;
// Associate all provided attributes with the method
Object.assign(ServiceClientImpl.prototype[name], attrs);
if (attrs.originalName && attrs.originalName !== '__proto__') {
if (attrs.originalName && !isPrototypePolluted(attrs.originalName)) {
ServiceClientImpl.prototype[attrs.originalName] =
ServiceClientImpl.prototype[name];
}
@ -204,7 +213,7 @@ export function loadPackageDefinition(
if (Object.prototype.hasOwnProperty.call(packageDef, serviceFqn)) {
const service = packageDef[serviceFqn];
const nameComponents = serviceFqn.split('.');
if (nameComponents.some(comp => comp === '__proto__')) {
if (nameComponents.some((comp: string) => isPrototypePolluted(comp))) {
continue;
}
const serviceName = nameComponents[nameComponents.length - 1];

View File

@ -85,6 +85,7 @@ export interface DropCallPickResult extends PickResult {
export interface PickArgs {
metadata: Metadata;
extraPickInfo: {[key: string]: string};
}
/**

View File

@ -129,7 +129,7 @@ class DnsResolver implements Resolver {
if (this.ipResult !== null) {
trace('Returning IP address for target ' + uriToString(this.target));
setImmediate(() => {
this.listener.onSuccessfulResolution(this.ipResult!, null, null, {});
this.listener.onSuccessfulResolution(this.ipResult!, null, null, null, {});
});
return;
}
@ -192,6 +192,7 @@ class DnsResolver implements Resolver {
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError,
null,
{}
);
},
@ -237,6 +238,7 @@ class DnsResolver implements Resolver {
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError,
null,
{}
);
}
@ -262,6 +264,12 @@ class DnsResolver implements Resolver {
}
}
destroy() {
/* Do nothing. There is not a practical way to cancel in-flight DNS
* requests, and after this function is called we can expect that
* updateResolution will not be called again. */
}
/**
* Get the default authority for the given target. For IP targets, that is
* the IP address. For DNS targets, it is the hostname.

View File

@ -40,10 +40,15 @@ class UdsResolver implements Resolver {
this.addresses,
null,
null,
null,
{}
);
}
destroy() {
// This resolver owns no resources, so we do nothing here.
}
static getDefaultAuthority(target: GrpcUri): string {
return 'localhost';
}

View File

@ -15,13 +15,30 @@
*
*/
import { ServiceConfig } from './service-config';
import { MethodConfig, ServiceConfig } from './service-config';
import * as resolver_dns from './resolver-dns';
import * as resolver_uds from './resolver-uds';
import { StatusObject } from './call-stream';
import { SubchannelAddress } from './subchannel';
import { GrpcUri, uriToString } from './uri-parser';
import { ChannelOptions } from './channel-options';
import { Metadata } from './metadata';
import { Status } from './constants';
export interface CallConfig {
methodConfig: MethodConfig;
onCommitted?: () => void;
pickInformation: {[key: string]: string};
status: Status;
}
/**
* Selects a configuration for a method given the name and metadata. Defined in
* https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md#new-functionality-in-grpc
*/
export interface ConfigSelector {
(methodName: string, metadata: Metadata): CallConfig;
}
/**
* A listener object passed to the resolver's constructor that provides name
@ -41,6 +58,7 @@ export interface ResolverListener {
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null,
configSelector: ConfigSelector | null,
attributes: { [key: string]: unknown }
): void;
/**
@ -62,6 +80,11 @@ export interface Resolver {
* called synchronously with the constructor or updateResolution.
*/
updateResolution(): void;
/**
* Destroy the resolver. Should be called when the owning channel shuts down.
*/
destroy(): void;
}
export interface ResolverConstructor {

View File

@ -23,7 +23,7 @@ import {
} from './load-balancer';
import { ServiceConfig, validateServiceConfig } from './service-config';
import { ConnectivityState } from './channel';
import { createResolver, Resolver } from './resolver';
import { ConfigSelector, createResolver, Resolver } from './resolver';
import { ServiceError } from './call';
import { Picker, UnavailablePicker, QueuePicker } from './picker';
import { BackoffTimeout } from './backoff-timeout';
@ -46,6 +46,40 @@ function trace(text: string): void {
const DEFAULT_LOAD_BALANCER_NAME = 'pick_first';
function getDefaultConfigSelector(serviceConfig: ServiceConfig | null): ConfigSelector {
return function defaultConfigSelector(methodName: string, metadata: Metadata) {
const splitName = methodName.split('/').filter(x => x.length > 0);
const service = splitName[0] ?? '';
const method = splitName[1] ?? '';
if (serviceConfig && serviceConfig.methodConfig) {
for (const methodConfig of serviceConfig.methodConfig) {
for (const name of methodConfig.name) {
if (name.service === service && (name.method === undefined || name.method === method)) {
return {
methodConfig: methodConfig,
pickInformation: {},
status: Status.OK
};
}
}
}
}
return {
methodConfig: {name: []},
pickInformation: {},
status: Status.OK
};
}
}
export interface ResolutionCallback {
(configSelector: ConfigSelector): void;
}
export interface ResolutionFailureCallback {
(status: StatusObject): void;
}
export class ResolvingLoadBalancer implements LoadBalancer {
/**
* The resolver class constructed for the target address.
@ -93,7 +127,9 @@ export class ResolvingLoadBalancer implements LoadBalancer {
constructor(
private readonly target: GrpcUri,
private readonly channelControlHelper: ChannelControlHelper,
private readonly channelOptions: ChannelOptions
private readonly channelOptions: ChannelOptions,
private readonly onSuccessfulResolution: ResolutionCallback,
private readonly onFailedResolution: ResolutionFailureCallback
) {
if (channelOptions['grpc.service_config']) {
this.defaultServiceConfig = validateServiceConfig(
@ -134,6 +170,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: ServiceError | null,
configSelector: ConfigSelector | null,
attributes: { [key: string]: unknown }
) => {
let workingServiceConfig: ServiceConfig | null = null;
@ -180,6 +217,8 @@ export class ResolvingLoadBalancer implements LoadBalancer {
loadBalancingConfig,
attributes
);
const finalServiceConfig = workingServiceConfig ?? this.defaultServiceConfig;
this.onSuccessfulResolution(configSelector ?? getDefaultConfigSelector(finalServiceConfig));
},
onError: (error: StatusObject) => {
this.handleResolutionFailure(error);
@ -227,6 +266,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker(error)
);
this.onFailedResolution(error);
}
this.backoffTimeout.runOnce();
}
@ -257,6 +297,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
destroy() {
this.childLoadBalancer.destroy();
this.innerResolver.destroy();
this.updateState(ConnectivityState.SHUTDOWN, new UnavailablePicker());
}

View File

@ -24,4 +24,8 @@ describe('loadPackageDefinition', () => {
loadPackageDefinition({'__proto__.polluted': true} as any);
assert.notStrictEqual(({} as any).polluted, true);
});
it('Should not allow prototype pollution #2', () => {
loadPackageDefinition({'constructor.prototype.polluted': true} as any);
assert.notStrictEqual(({} as any).polluted, true);
});
});

View File

@ -394,6 +394,9 @@ describe('Name Resolver', () => {
return [];
}
destroy() {
}
static getDefaultAuthority(target: GrpcUri): string {
return 'other';
}

@ -1 +1 @@
Subproject commit 2514f0bd7da7e2af1bed4c5d1b84f031c4d12c10
Subproject commit 6aa539bf0195f188ff86efe6fb8bfa2b676cdd46

View File

@ -1,6 +1,6 @@
{
"name": "grpc-tools",
"version": "1.10.0",
"version": "1.11.1",
"author": "Google Inc.",
"description": "Tools for developing with gRPC on Node.js",
"homepage": "https://grpc.io/",

View File

@ -65,6 +65,10 @@ class NodeGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator {
coded_out.WriteRaw(code.data(), code.size());
return true;
}
uint64_t GetSupportedFeatures() const override {
return FEATURE_PROTO3_OPTIONAL;
}
};
int main(int argc, char* argv[]) {

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/proto-loader",
"version": "0.5.5",
"version": "0.5.6",
"author": "Google Inc.",
"contributors": [
{

View File

@ -395,6 +395,16 @@ export function loadSync(
return createPackageDefinition(root, options!);
}
export function fromJSON(
json: Protobuf.INamespace,
options?: Options
): PackageDefinition {
options = options || {};
const loadedRoot = Protobuf.Root.fromJSON(json);
loadedRoot.resolveAll();
return createPackageDefinition(loadedRoot, options!);
}
export function loadFileDescriptorSetFromBuffer(
descriptorSet: Buffer,
options?: Options

View File

@ -102,6 +102,15 @@ describe('Descriptor types', () => {
proto_loader.loadSync(`${TEST_PROTO_DIR}/well_known.proto`);
});
it('Can load JSON descriptors', () => {
// This is protobuf.js JSON descriptor
// https://github.com/protobufjs/protobuf.js#using-json-descriptors
const buffer = readFileSync(`${TEST_PROTO_DIR}/rpc.proto.json`);
const json = JSON.parse(buffer.toString());
// This will throw if the rpc descriptor JSON cannot be decoded
proto_loader.fromJSON(json);
});
it('Can load binary-encoded proto file descriptor sets', () => {
const buffer = readFileSync(`${TEST_PROTO_DIR}/rpc.desc.bin`);
// This will throw if the rpc descriptor cannot be decoded

File diff suppressed because it is too large Load Diff