Merge pull request #1552 from murgatroid99/grpc-js_xds_interop_client

grpc-js: Enable the xds URL scheme and add an interop test for it
This commit is contained in:
Michael Lumish 2020-09-10 10:25:59 -07:00 committed by GitHub
commit 5e0feb5d74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 1948 additions and 109 deletions

View File

@ -0,0 +1,26 @@
// Original file: proto/grpc/testing/messages.proto
/**
* TODO(dgq): Go back to using well-known types once
* https://github.com/grpc/grpc/issues/6980 has been fixed.
* import "google/protobuf/wrappers.proto";
*/
export interface BoolValue {
/**
* The bool value.
*/
'value'?: (boolean);
}
/**
* TODO(dgq): Go back to using well-known types once
* https://github.com/grpc/grpc/issues/6980 has been fixed.
* import "google/protobuf/wrappers.proto";
*/
export interface BoolValue__Output {
/**
* The bool value.
*/
'value': (boolean);
}

View File

@ -0,0 +1,20 @@
// Original file: proto/grpc/testing/messages.proto
/**
* A protobuf representation for grpc status. This is used by test
* clients to specify a status that the server should attempt to return.
*/
export interface EchoStatus {
'code'?: (number);
'message'?: (string);
}
/**
* A protobuf representation for grpc status. This is used by test
* clients to specify a status that the server should attempt to return.
*/
export interface EchoStatus__Output {
'code': (number);
'message': (string);
}

View File

@ -0,0 +1,26 @@
// Original file: proto/grpc/testing/empty.proto
/**
* An empty message that you can re-use to avoid defining duplicated empty
* messages in your project. A typical example is to use it as argument or the
* return value of a service API. For instance:
*
* service Foo {
* rpc Bar (grpc.testing.Empty) returns (grpc.testing.Empty) { };
* };
*/
export interface Empty {
}
/**
* An empty message that you can re-use to avoid defining duplicated empty
* messages in your project. A typical example is to use it as argument or the
* return value of a service API. For instance:
*
* service Foo {
* rpc Bar (grpc.testing.Empty) returns (grpc.testing.Empty) { };
* };
*/
export interface Empty__Output {
}

View File

@ -0,0 +1,24 @@
// Original file: proto/grpc/testing/messages.proto
/**
* The type of route that a client took to reach a server w.r.t. gRPCLB.
* The server must fill in "fallback" if it detects that the RPC reached
* the server via the "gRPCLB fallback" path, and "backend" if it detects
* that the RPC reached the server via "gRPCLB backend" path (i.e. if it got
* the address of this server from the gRPCLB server BalanceLoad RPC). Exactly
* how this detection is done is context and server dependent.
*/
export enum GrpclbRouteType {
/**
* Server didn't detect the route that a client took to reach it.
*/
GRPCLB_ROUTE_TYPE_UNKNOWN = 0,
/**
* Indicates that a client reached a server via gRPCLB fallback.
*/
GRPCLB_ROUTE_TYPE_FALLBACK = 1,
/**
* Indicates that a client reached a server as a gRPCLB-given backend.
*/
GRPCLB_ROUTE_TYPE_BACKEND = 2,
}

View File

@ -0,0 +1,24 @@
// Original file: proto/grpc/testing/messages.proto
export interface LoadBalancerStatsRequest {
/**
* Request stats for the next num_rpcs sent by client.
*/
'num_rpcs'?: (number);
/**
* If num_rpcs have not completed within timeout_sec, return partial results.
*/
'timeout_sec'?: (number);
}
export interface LoadBalancerStatsRequest__Output {
/**
* Request stats for the next num_rpcs sent by client.
*/
'num_rpcs': (number);
/**
* If num_rpcs have not completed within timeout_sec, return partial results.
*/
'timeout_sec': (number);
}

View File

@ -0,0 +1,40 @@
// Original file: proto/grpc/testing/messages.proto
export interface _grpc_testing_LoadBalancerStatsResponse_RpcsByPeer {
/**
* The number of completed RPCs for each peer.
*/
'rpcs_by_peer'?: ({[key: string]: number});
}
export interface _grpc_testing_LoadBalancerStatsResponse_RpcsByPeer__Output {
/**
* The number of completed RPCs for each peer.
*/
'rpcs_by_peer': ({[key: string]: number});
}
export interface LoadBalancerStatsResponse {
/**
* The number of completed RPCs for each peer.
*/
'rpcs_by_peer'?: ({[key: string]: number});
/**
* The number of RPCs that failed to record a remote peer.
*/
'num_failures'?: (number);
'rpcs_by_method'?: ({[key: string]: _grpc_testing_LoadBalancerStatsResponse_RpcsByPeer});
}
export interface LoadBalancerStatsResponse__Output {
/**
* The number of completed RPCs for each peer.
*/
'rpcs_by_peer': ({[key: string]: number});
/**
* The number of RPCs that failed to record a remote peer.
*/
'num_failures': (number);
'rpcs_by_method'?: ({[key: string]: _grpc_testing_LoadBalancerStatsResponse_RpcsByPeer__Output});
}

View File

@ -0,0 +1,37 @@
// Original file: proto/grpc/testing/test.proto
import * as grpc from '../../../../src'
import { LoadBalancerStatsRequest as _grpc_testing_LoadBalancerStatsRequest, LoadBalancerStatsRequest__Output as _grpc_testing_LoadBalancerStatsRequest__Output } from '../../grpc/testing/LoadBalancerStatsRequest';
import { LoadBalancerStatsResponse as _grpc_testing_LoadBalancerStatsResponse, LoadBalancerStatsResponse__Output as _grpc_testing_LoadBalancerStatsResponse__Output } from '../../grpc/testing/LoadBalancerStatsResponse';
/**
* A service used to obtain stats for verifying LB behavior.
*/
export interface LoadBalancerStatsServiceClient extends grpc.Client {
/**
* Gets the backend distribution for RPCs sent by a test client.
*/
GetClientStats(argument: _grpc_testing_LoadBalancerStatsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerStatsResponse__Output) => void): grpc.ClientUnaryCall;
GetClientStats(argument: _grpc_testing_LoadBalancerStatsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerStatsResponse__Output) => void): grpc.ClientUnaryCall;
GetClientStats(argument: _grpc_testing_LoadBalancerStatsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerStatsResponse__Output) => void): grpc.ClientUnaryCall;
GetClientStats(argument: _grpc_testing_LoadBalancerStatsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerStatsResponse__Output) => void): grpc.ClientUnaryCall;
/**
* Gets the backend distribution for RPCs sent by a test client.
*/
getClientStats(argument: _grpc_testing_LoadBalancerStatsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerStatsResponse__Output) => void): grpc.ClientUnaryCall;
getClientStats(argument: _grpc_testing_LoadBalancerStatsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerStatsResponse__Output) => void): grpc.ClientUnaryCall;
getClientStats(argument: _grpc_testing_LoadBalancerStatsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerStatsResponse__Output) => void): grpc.ClientUnaryCall;
getClientStats(argument: _grpc_testing_LoadBalancerStatsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerStatsResponse__Output) => void): grpc.ClientUnaryCall;
}
/**
* A service used to obtain stats for verifying LB behavior.
*/
export interface LoadBalancerStatsServiceHandlers extends grpc.UntypedServiceImplementation {
/**
* Gets the backend distribution for RPCs sent by a test client.
*/
GetClientStats(call: grpc.ServerUnaryCall<_grpc_testing_LoadBalancerStatsRequest__Output, _grpc_testing_LoadBalancerStatsResponse>, callback: grpc.sendUnaryData<_grpc_testing_LoadBalancerStatsResponse>): void;
}

View File

@ -0,0 +1,31 @@
// Original file: proto/grpc/testing/messages.proto
import { PayloadType as _grpc_testing_PayloadType } from '../../grpc/testing/PayloadType';
/**
* A block of data, to simply increase gRPC message size.
*/
export interface Payload {
/**
* The type of data in body.
*/
'type'?: (_grpc_testing_PayloadType | keyof typeof _grpc_testing_PayloadType);
/**
* Primary contents of payload.
*/
'body'?: (Buffer | Uint8Array | string);
}
/**
* A block of data, to simply increase gRPC message size.
*/
export interface Payload__Output {
/**
* The type of data in body.
*/
'type': (keyof typeof _grpc_testing_PayloadType);
/**
* Primary contents of payload.
*/
'body': (Buffer);
}

View File

@ -0,0 +1,11 @@
// Original file: proto/grpc/testing/messages.proto
/**
* The type of payload that should be returned.
*/
export enum PayloadType {
/**
* Compressable text format.
*/
COMPRESSABLE = 0,
}

View File

@ -0,0 +1,22 @@
// Original file: proto/grpc/testing/messages.proto
/**
* For reconnect interop test only.
* Server tells client whether its reconnects are following the spec and the
* reconnect backoffs it saw.
*/
export interface ReconnectInfo {
'passed'?: (boolean);
'backoff_ms'?: (number)[];
}
/**
* For reconnect interop test only.
* Server tells client whether its reconnects are following the spec and the
* reconnect backoffs it saw.
*/
export interface ReconnectInfo__Output {
'passed': (boolean);
'backoff_ms': (number)[];
}

View File

@ -0,0 +1,18 @@
// Original file: proto/grpc/testing/messages.proto
/**
* For reconnect interop test only.
* Client tells server what reconnection parameters it used.
*/
export interface ReconnectParams {
'max_reconnect_backoff_ms'?: (number);
}
/**
* For reconnect interop test only.
* Client tells server what reconnection parameters it used.
*/
export interface ReconnectParams__Output {
'max_reconnect_backoff_ms': (number);
}

View File

@ -0,0 +1,40 @@
// Original file: proto/grpc/testing/test.proto
import * as grpc from '../../../../src'
import { Empty as _grpc_testing_Empty, Empty__Output as _grpc_testing_Empty__Output } from '../../grpc/testing/Empty';
import { ReconnectInfo as _grpc_testing_ReconnectInfo, ReconnectInfo__Output as _grpc_testing_ReconnectInfo__Output } from '../../grpc/testing/ReconnectInfo';
import { ReconnectParams as _grpc_testing_ReconnectParams, ReconnectParams__Output as _grpc_testing_ReconnectParams__Output } from '../../grpc/testing/ReconnectParams';
/**
* A service used to control reconnect server.
*/
export interface ReconnectServiceClient extends grpc.Client {
Start(argument: _grpc_testing_ReconnectParams, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
Start(argument: _grpc_testing_ReconnectParams, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
Start(argument: _grpc_testing_ReconnectParams, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
Start(argument: _grpc_testing_ReconnectParams, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
start(argument: _grpc_testing_ReconnectParams, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
start(argument: _grpc_testing_ReconnectParams, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
start(argument: _grpc_testing_ReconnectParams, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
start(argument: _grpc_testing_ReconnectParams, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
Stop(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ReconnectInfo__Output) => void): grpc.ClientUnaryCall;
Stop(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ReconnectInfo__Output) => void): grpc.ClientUnaryCall;
Stop(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ReconnectInfo__Output) => void): grpc.ClientUnaryCall;
Stop(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ReconnectInfo__Output) => void): grpc.ClientUnaryCall;
stop(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ReconnectInfo__Output) => void): grpc.ClientUnaryCall;
stop(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ReconnectInfo__Output) => void): grpc.ClientUnaryCall;
stop(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ReconnectInfo__Output) => void): grpc.ClientUnaryCall;
stop(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ReconnectInfo__Output) => void): grpc.ClientUnaryCall;
}
/**
* A service used to control reconnect server.
*/
export interface ReconnectServiceHandlers extends grpc.UntypedServiceImplementation {
Start(call: grpc.ServerUnaryCall<_grpc_testing_ReconnectParams__Output, _grpc_testing_Empty>, callback: grpc.sendUnaryData<_grpc_testing_Empty>): void;
Stop(call: grpc.ServerUnaryCall<_grpc_testing_Empty__Output, _grpc_testing_ReconnectInfo>, callback: grpc.sendUnaryData<_grpc_testing_ReconnectInfo>): void;
}

View File

@ -0,0 +1,47 @@
// Original file: proto/grpc/testing/messages.proto
import { BoolValue as _grpc_testing_BoolValue, BoolValue__Output as _grpc_testing_BoolValue__Output } from '../../grpc/testing/BoolValue';
/**
* Configuration for a particular response.
*/
export interface ResponseParameters {
/**
* Desired payload sizes in responses from the server.
*/
'size'?: (number);
/**
* Desired interval between consecutive responses in the response stream in
* microseconds.
*/
'interval_us'?: (number);
/**
* Whether to request the server to compress the response. This field is
* "nullable" in order to interoperate seamlessly with clients not able to
* implement the full compression tests by introspecting the call to verify
* the response's compression status.
*/
'compressed'?: (_grpc_testing_BoolValue);
}
/**
* Configuration for a particular response.
*/
export interface ResponseParameters__Output {
/**
* Desired payload sizes in responses from the server.
*/
'size': (number);
/**
* Desired interval between consecutive responses in the response stream in
* microseconds.
*/
'interval_us': (number);
/**
* Whether to request the server to compress the response. This field is
* "nullable" in order to interoperate seamlessly with clients not able to
* implement the full compression tests by introspecting the call to verify
* the response's compression status.
*/
'compressed'?: (_grpc_testing_BoolValue__Output);
}

View File

@ -0,0 +1,106 @@
// Original file: proto/grpc/testing/messages.proto
import { PayloadType as _grpc_testing_PayloadType } from '../../grpc/testing/PayloadType';
import { Payload as _grpc_testing_Payload, Payload__Output as _grpc_testing_Payload__Output } from '../../grpc/testing/Payload';
import { BoolValue as _grpc_testing_BoolValue, BoolValue__Output as _grpc_testing_BoolValue__Output } from '../../grpc/testing/BoolValue';
import { EchoStatus as _grpc_testing_EchoStatus, EchoStatus__Output as _grpc_testing_EchoStatus__Output } from '../../grpc/testing/EchoStatus';
/**
* Unary request.
*/
export interface SimpleRequest {
/**
* Desired payload type in the response from the server.
* If response_type is RANDOM, server randomly chooses one from other formats.
*/
'response_type'?: (_grpc_testing_PayloadType | keyof typeof _grpc_testing_PayloadType);
/**
* Desired payload size in the response from the server.
*/
'response_size'?: (number);
/**
* Optional input payload sent along with the request.
*/
'payload'?: (_grpc_testing_Payload);
/**
* Whether SimpleResponse should include username.
*/
'fill_username'?: (boolean);
/**
* Whether SimpleResponse should include OAuth scope.
*/
'fill_oauth_scope'?: (boolean);
/**
* Whether to request the server to compress the response. This field is
* "nullable" in order to interoperate seamlessly with clients not able to
* implement the full compression tests by introspecting the call to verify
* the response's compression status.
*/
'response_compressed'?: (_grpc_testing_BoolValue);
/**
* Whether server should return a given status
*/
'response_status'?: (_grpc_testing_EchoStatus);
/**
* Whether the server should expect this request to be compressed.
*/
'expect_compressed'?: (_grpc_testing_BoolValue);
/**
* Whether SimpleResponse should include server_id.
*/
'fill_server_id'?: (boolean);
/**
* Whether SimpleResponse should include grpclb_route_type.
*/
'fill_grpclb_route_type'?: (boolean);
}
/**
* Unary request.
*/
export interface SimpleRequest__Output {
/**
* Desired payload type in the response from the server.
* If response_type is RANDOM, server randomly chooses one from other formats.
*/
'response_type': (keyof typeof _grpc_testing_PayloadType);
/**
* Desired payload size in the response from the server.
*/
'response_size': (number);
/**
* Optional input payload sent along with the request.
*/
'payload'?: (_grpc_testing_Payload__Output);
/**
* Whether SimpleResponse should include username.
*/
'fill_username': (boolean);
/**
* Whether SimpleResponse should include OAuth scope.
*/
'fill_oauth_scope': (boolean);
/**
* Whether to request the server to compress the response. This field is
* "nullable" in order to interoperate seamlessly with clients not able to
* implement the full compression tests by introspecting the call to verify
* the response's compression status.
*/
'response_compressed'?: (_grpc_testing_BoolValue__Output);
/**
* Whether server should return a given status
*/
'response_status'?: (_grpc_testing_EchoStatus__Output);
/**
* Whether the server should expect this request to be compressed.
*/
'expect_compressed'?: (_grpc_testing_BoolValue__Output);
/**
* Whether SimpleResponse should include server_id.
*/
'fill_server_id': (boolean);
/**
* Whether SimpleResponse should include grpclb_route_type.
*/
'fill_grpclb_route_type': (boolean);
}

View File

@ -0,0 +1,68 @@
// Original file: proto/grpc/testing/messages.proto
import { Payload as _grpc_testing_Payload, Payload__Output as _grpc_testing_Payload__Output } from '../../grpc/testing/Payload';
import { GrpclbRouteType as _grpc_testing_GrpclbRouteType } from '../../grpc/testing/GrpclbRouteType';
/**
* Unary response, as configured by the request.
*/
export interface SimpleResponse {
/**
* Payload to increase message size.
*/
'payload'?: (_grpc_testing_Payload);
/**
* The user the request came from, for verifying authentication was
* successful when the client expected it.
*/
'username'?: (string);
/**
* OAuth scope.
*/
'oauth_scope'?: (string);
/**
* Server ID. This must be unique among different server instances,
* but the same across all RPC's made to a particular server instance.
*/
'server_id'?: (string);
/**
* gRPCLB Path.
*/
'grpclb_route_type'?: (_grpc_testing_GrpclbRouteType | keyof typeof _grpc_testing_GrpclbRouteType);
/**
* Server hostname.
*/
'hostname'?: (string);
}
/**
* Unary response, as configured by the request.
*/
export interface SimpleResponse__Output {
/**
* Payload to increase message size.
*/
'payload'?: (_grpc_testing_Payload__Output);
/**
* The user the request came from, for verifying authentication was
* successful when the client expected it.
*/
'username': (string);
/**
* OAuth scope.
*/
'oauth_scope': (string);
/**
* Server ID. This must be unique among different server instances,
* but the same across all RPC's made to a particular server instance.
*/
'server_id': (string);
/**
* gRPCLB Path.
*/
'grpclb_route_type': (keyof typeof _grpc_testing_GrpclbRouteType);
/**
* Server hostname.
*/
'hostname': (string);
}

View File

@ -0,0 +1,38 @@
// Original file: proto/grpc/testing/messages.proto
import { Payload as _grpc_testing_Payload, Payload__Output as _grpc_testing_Payload__Output } from '../../grpc/testing/Payload';
import { BoolValue as _grpc_testing_BoolValue, BoolValue__Output as _grpc_testing_BoolValue__Output } from '../../grpc/testing/BoolValue';
/**
* Client-streaming request.
*/
export interface StreamingInputCallRequest {
/**
* Optional input payload sent along with the request.
*/
'payload'?: (_grpc_testing_Payload);
/**
* Whether the server should expect this request to be compressed. This field
* is "nullable" in order to interoperate seamlessly with servers not able to
* implement the full compression tests by introspecting the call to verify
* the request's compression status.
*/
'expect_compressed'?: (_grpc_testing_BoolValue);
}
/**
* Client-streaming request.
*/
export interface StreamingInputCallRequest__Output {
/**
* Optional input payload sent along with the request.
*/
'payload'?: (_grpc_testing_Payload__Output);
/**
* Whether the server should expect this request to be compressed. This field
* is "nullable" in order to interoperate seamlessly with servers not able to
* implement the full compression tests by introspecting the call to verify
* the request's compression status.
*/
'expect_compressed'?: (_grpc_testing_BoolValue__Output);
}

View File

@ -0,0 +1,22 @@
// Original file: proto/grpc/testing/messages.proto
/**
* Client-streaming response.
*/
export interface StreamingInputCallResponse {
/**
* Aggregated size of payloads received from the client.
*/
'aggregated_payload_size'?: (number);
}
/**
* Client-streaming response.
*/
export interface StreamingInputCallResponse__Output {
/**
* Aggregated size of payloads received from the client.
*/
'aggregated_payload_size': (number);
}

View File

@ -0,0 +1,56 @@
// Original file: proto/grpc/testing/messages.proto
import { PayloadType as _grpc_testing_PayloadType } from '../../grpc/testing/PayloadType';
import { ResponseParameters as _grpc_testing_ResponseParameters, ResponseParameters__Output as _grpc_testing_ResponseParameters__Output } from '../../grpc/testing/ResponseParameters';
import { Payload as _grpc_testing_Payload, Payload__Output as _grpc_testing_Payload__Output } from '../../grpc/testing/Payload';
import { EchoStatus as _grpc_testing_EchoStatus, EchoStatus__Output as _grpc_testing_EchoStatus__Output } from '../../grpc/testing/EchoStatus';
/**
* Server-streaming request.
*/
export interface StreamingOutputCallRequest {
/**
* Desired payload type in the response from the server.
* If response_type is RANDOM, the payload from each response in the stream
* might be of different types. This is to simulate a mixed type of payload
* stream.
*/
'response_type'?: (_grpc_testing_PayloadType | keyof typeof _grpc_testing_PayloadType);
/**
* Configuration for each expected response message.
*/
'response_parameters'?: (_grpc_testing_ResponseParameters)[];
/**
* Optional input payload sent along with the request.
*/
'payload'?: (_grpc_testing_Payload);
/**
* Whether server should return a given status
*/
'response_status'?: (_grpc_testing_EchoStatus);
}
/**
* Server-streaming request.
*/
export interface StreamingOutputCallRequest__Output {
/**
* Desired payload type in the response from the server.
* If response_type is RANDOM, the payload from each response in the stream
* might be of different types. This is to simulate a mixed type of payload
* stream.
*/
'response_type': (keyof typeof _grpc_testing_PayloadType);
/**
* Configuration for each expected response message.
*/
'response_parameters': (_grpc_testing_ResponseParameters__Output)[];
/**
* Optional input payload sent along with the request.
*/
'payload'?: (_grpc_testing_Payload__Output);
/**
* Whether server should return a given status
*/
'response_status'?: (_grpc_testing_EchoStatus__Output);
}

View File

@ -0,0 +1,23 @@
// Original file: proto/grpc/testing/messages.proto
import { Payload as _grpc_testing_Payload, Payload__Output as _grpc_testing_Payload__Output } from '../../grpc/testing/Payload';
/**
* Server-streaming response, as configured by the request and parameters.
*/
export interface StreamingOutputCallResponse {
/**
* Payload to increase response size.
*/
'payload'?: (_grpc_testing_Payload);
}
/**
* Server-streaming response, as configured by the request and parameters.
*/
export interface StreamingOutputCallResponse__Output {
/**
* Payload to increase response size.
*/
'payload'?: (_grpc_testing_Payload__Output);
}

View File

@ -0,0 +1,202 @@
// Original file: proto/grpc/testing/test.proto
import * as grpc from '../../../../src'
import { Empty as _grpc_testing_Empty, Empty__Output as _grpc_testing_Empty__Output } from '../../grpc/testing/Empty';
import { SimpleRequest as _grpc_testing_SimpleRequest, SimpleRequest__Output as _grpc_testing_SimpleRequest__Output } from '../../grpc/testing/SimpleRequest';
import { SimpleResponse as _grpc_testing_SimpleResponse, SimpleResponse__Output as _grpc_testing_SimpleResponse__Output } from '../../grpc/testing/SimpleResponse';
import { StreamingInputCallRequest as _grpc_testing_StreamingInputCallRequest, StreamingInputCallRequest__Output as _grpc_testing_StreamingInputCallRequest__Output } from '../../grpc/testing/StreamingInputCallRequest';
import { StreamingInputCallResponse as _grpc_testing_StreamingInputCallResponse, StreamingInputCallResponse__Output as _grpc_testing_StreamingInputCallResponse__Output } from '../../grpc/testing/StreamingInputCallResponse';
import { StreamingOutputCallRequest as _grpc_testing_StreamingOutputCallRequest, StreamingOutputCallRequest__Output as _grpc_testing_StreamingOutputCallRequest__Output } from '../../grpc/testing/StreamingOutputCallRequest';
import { StreamingOutputCallResponse as _grpc_testing_StreamingOutputCallResponse, StreamingOutputCallResponse__Output as _grpc_testing_StreamingOutputCallResponse__Output } from '../../grpc/testing/StreamingOutputCallResponse';
/**
* A simple service to test the various types of RPCs and experiment with
* performance with various types of payload.
*/
export interface TestServiceClient extends grpc.Client {
/**
* One request followed by one response. Response has cache control
* headers set such that a caching HTTP proxy (such as GFE) can
* satisfy subsequent requests.
*/
CacheableUnaryCall(argument: _grpc_testing_SimpleRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall;
CacheableUnaryCall(argument: _grpc_testing_SimpleRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall;
CacheableUnaryCall(argument: _grpc_testing_SimpleRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall;
CacheableUnaryCall(argument: _grpc_testing_SimpleRequest, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall;
/**
* One request followed by one response. Response has cache control
* headers set such that a caching HTTP proxy (such as GFE) can
* satisfy subsequent requests.
*/
cacheableUnaryCall(argument: _grpc_testing_SimpleRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall;
cacheableUnaryCall(argument: _grpc_testing_SimpleRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall;
cacheableUnaryCall(argument: _grpc_testing_SimpleRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall;
cacheableUnaryCall(argument: _grpc_testing_SimpleRequest, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall;
/**
* One empty request followed by one empty response.
*/
EmptyCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
EmptyCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
EmptyCall(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
EmptyCall(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
/**
* One empty request followed by one empty response.
*/
emptyCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
emptyCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
emptyCall(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
emptyCall(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
/**
* A sequence of requests with each request served by the server immediately.
* As one request could lead to multiple responses, this interface
* demonstrates the idea of full duplexing.
*/
FullDuplexCall(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_grpc_testing_StreamingOutputCallRequest, _grpc_testing_StreamingOutputCallResponse__Output>;
FullDuplexCall(options?: grpc.CallOptions): grpc.ClientDuplexStream<_grpc_testing_StreamingOutputCallRequest, _grpc_testing_StreamingOutputCallResponse__Output>;
/**
* A sequence of requests with each request served by the server immediately.
* As one request could lead to multiple responses, this interface
* demonstrates the idea of full duplexing.
*/
fullDuplexCall(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_grpc_testing_StreamingOutputCallRequest, _grpc_testing_StreamingOutputCallResponse__Output>;
fullDuplexCall(options?: grpc.CallOptions): grpc.ClientDuplexStream<_grpc_testing_StreamingOutputCallRequest, _grpc_testing_StreamingOutputCallResponse__Output>;
/**
* A sequence of requests followed by a sequence of responses.
* The server buffers all the client requests and then serves them in order. A
* stream of responses are returned to the client when the server starts with
* first request.
*/
HalfDuplexCall(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_grpc_testing_StreamingOutputCallRequest, _grpc_testing_StreamingOutputCallResponse__Output>;
HalfDuplexCall(options?: grpc.CallOptions): grpc.ClientDuplexStream<_grpc_testing_StreamingOutputCallRequest, _grpc_testing_StreamingOutputCallResponse__Output>;
/**
* A sequence of requests followed by a sequence of responses.
* The server buffers all the client requests and then serves them in order. A
* stream of responses are returned to the client when the server starts with
* first request.
*/
halfDuplexCall(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_grpc_testing_StreamingOutputCallRequest, _grpc_testing_StreamingOutputCallResponse__Output>;
halfDuplexCall(options?: grpc.CallOptions): grpc.ClientDuplexStream<_grpc_testing_StreamingOutputCallRequest, _grpc_testing_StreamingOutputCallResponse__Output>;
/**
* A sequence of requests followed by one response (streamed upload).
* The server returns the aggregated size of client payload as the result.
*/
StreamingInputCall(metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_StreamingInputCallResponse__Output) => void): grpc.ClientWritableStream<_grpc_testing_StreamingInputCallResponse__Output>;
StreamingInputCall(metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_StreamingInputCallResponse__Output) => void): grpc.ClientWritableStream<_grpc_testing_StreamingInputCallResponse__Output>;
StreamingInputCall(options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_StreamingInputCallResponse__Output) => void): grpc.ClientWritableStream<_grpc_testing_StreamingInputCallResponse__Output>;
StreamingInputCall(callback: (error?: grpc.ServiceError, result?: _grpc_testing_StreamingInputCallResponse__Output) => void): grpc.ClientWritableStream<_grpc_testing_StreamingInputCallResponse__Output>;
/**
* A sequence of requests followed by one response (streamed upload).
* The server returns the aggregated size of client payload as the result.
*/
streamingInputCall(metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_StreamingInputCallResponse__Output) => void): grpc.ClientWritableStream<_grpc_testing_StreamingInputCallResponse__Output>;
streamingInputCall(metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_StreamingInputCallResponse__Output) => void): grpc.ClientWritableStream<_grpc_testing_StreamingInputCallResponse__Output>;
streamingInputCall(options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_StreamingInputCallResponse__Output) => void): grpc.ClientWritableStream<_grpc_testing_StreamingInputCallResponse__Output>;
streamingInputCall(callback: (error?: grpc.ServiceError, result?: _grpc_testing_StreamingInputCallResponse__Output) => void): grpc.ClientWritableStream<_grpc_testing_StreamingInputCallResponse__Output>;
/**
* One request followed by a sequence of responses (streamed download).
* The server returns the payload with client desired type and sizes.
*/
StreamingOutputCall(argument: _grpc_testing_StreamingOutputCallRequest, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_grpc_testing_StreamingOutputCallResponse__Output>;
StreamingOutputCall(argument: _grpc_testing_StreamingOutputCallRequest, options?: grpc.CallOptions): grpc.ClientReadableStream<_grpc_testing_StreamingOutputCallResponse__Output>;
/**
* One request followed by a sequence of responses (streamed download).
* The server returns the payload with client desired type and sizes.
*/
streamingOutputCall(argument: _grpc_testing_StreamingOutputCallRequest, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_grpc_testing_StreamingOutputCallResponse__Output>;
streamingOutputCall(argument: _grpc_testing_StreamingOutputCallRequest, options?: grpc.CallOptions): grpc.ClientReadableStream<_grpc_testing_StreamingOutputCallResponse__Output>;
/**
* One request followed by one response.
*/
UnaryCall(argument: _grpc_testing_SimpleRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall;
UnaryCall(argument: _grpc_testing_SimpleRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall;
UnaryCall(argument: _grpc_testing_SimpleRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall;
UnaryCall(argument: _grpc_testing_SimpleRequest, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall;
/**
* One request followed by one response.
*/
unaryCall(argument: _grpc_testing_SimpleRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall;
unaryCall(argument: _grpc_testing_SimpleRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall;
unaryCall(argument: _grpc_testing_SimpleRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall;
unaryCall(argument: _grpc_testing_SimpleRequest, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall;
/**
* The test server will not implement this method. It will be used
* to test the behavior when clients call unimplemented methods.
*/
UnimplementedCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
UnimplementedCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
UnimplementedCall(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
UnimplementedCall(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
/**
* The test server will not implement this method. It will be used
* to test the behavior when clients call unimplemented methods.
*/
unimplementedCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
unimplementedCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
unimplementedCall(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
unimplementedCall(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
}
/**
* A simple service to test the various types of RPCs and experiment with
* performance with various types of payload.
*/
export interface TestServiceHandlers extends grpc.UntypedServiceImplementation {
/**
* One request followed by one response. Response has cache control
* headers set such that a caching HTTP proxy (such as GFE) can
* satisfy subsequent requests.
*/
CacheableUnaryCall(call: grpc.ServerUnaryCall<_grpc_testing_SimpleRequest__Output, _grpc_testing_SimpleResponse>, callback: grpc.sendUnaryData<_grpc_testing_SimpleResponse>): void;
/**
* One empty request followed by one empty response.
*/
EmptyCall(call: grpc.ServerUnaryCall<_grpc_testing_Empty__Output, _grpc_testing_Empty>, callback: grpc.sendUnaryData<_grpc_testing_Empty>): void;
/**
* A sequence of requests with each request served by the server immediately.
* As one request could lead to multiple responses, this interface
* demonstrates the idea of full duplexing.
*/
FullDuplexCall(call: grpc.ServerDuplexStream<_grpc_testing_StreamingOutputCallRequest__Output, _grpc_testing_StreamingOutputCallResponse>): void;
/**
* A sequence of requests followed by a sequence of responses.
* The server buffers all the client requests and then serves them in order. A
* stream of responses are returned to the client when the server starts with
* first request.
*/
HalfDuplexCall(call: grpc.ServerDuplexStream<_grpc_testing_StreamingOutputCallRequest__Output, _grpc_testing_StreamingOutputCallResponse>): void;
/**
* A sequence of requests followed by one response (streamed upload).
* The server returns the aggregated size of client payload as the result.
*/
StreamingInputCall(call: grpc.ServerReadableStream<_grpc_testing_StreamingInputCallRequest__Output, _grpc_testing_StreamingInputCallResponse>, callback: grpc.sendUnaryData<_grpc_testing_StreamingInputCallResponse>): void;
/**
* One request followed by a sequence of responses (streamed download).
* The server returns the payload with client desired type and sizes.
*/
StreamingOutputCall(call: grpc.ServerWritableStream<_grpc_testing_StreamingOutputCallRequest__Output, _grpc_testing_StreamingOutputCallResponse>): void;
/**
* One request followed by one response.
*/
UnaryCall(call: grpc.ServerUnaryCall<_grpc_testing_SimpleRequest__Output, _grpc_testing_SimpleResponse>, callback: grpc.sendUnaryData<_grpc_testing_SimpleResponse>): void;
/**
* The test server will not implement this method. It will be used
* to test the behavior when clients call unimplemented methods.
*/
UnimplementedCall(call: grpc.ServerUnaryCall<_grpc_testing_Empty__Output, _grpc_testing_Empty>, callback: grpc.sendUnaryData<_grpc_testing_Empty>): void;
}

View File

@ -0,0 +1,38 @@
// Original file: proto/grpc/testing/test.proto
import * as grpc from '../../../../src'
import { Empty as _grpc_testing_Empty, Empty__Output as _grpc_testing_Empty__Output } from '../../grpc/testing/Empty';
/**
* A simple service NOT implemented at servers so clients can test for
* that case.
*/
export interface UnimplementedServiceClient extends grpc.Client {
/**
* A call that no server should implement
*/
UnimplementedCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
UnimplementedCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
UnimplementedCall(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
UnimplementedCall(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
/**
* A call that no server should implement
*/
unimplementedCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
unimplementedCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
unimplementedCall(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
unimplementedCall(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
}
/**
* A simple service NOT implemented at servers so clients can test for
* that case.
*/
export interface UnimplementedServiceHandlers extends grpc.UntypedServiceImplementation {
/**
* A call that no server should implement
*/
UnimplementedCall(call: grpc.ServerUnaryCall<_grpc_testing_Empty__Output, _grpc_testing_Empty>, callback: grpc.sendUnaryData<_grpc_testing_Empty>): void;
}

View File

@ -0,0 +1,38 @@
// Original file: proto/grpc/testing/test.proto
import * as grpc from '../../../../src'
import { Empty as _grpc_testing_Empty, Empty__Output as _grpc_testing_Empty__Output } from '../../grpc/testing/Empty';
/**
* A service to remotely control health status of an xDS test server.
*/
export interface XdsUpdateHealthServiceClient extends grpc.Client {
SetNotServing(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
SetNotServing(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
SetNotServing(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
SetNotServing(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
setNotServing(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
setNotServing(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
setNotServing(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
setNotServing(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
SetServing(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
SetServing(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
SetServing(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
SetServing(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
setServing(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
setServing(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
setServing(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
setServing(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall;
}
/**
* A service to remotely control health status of an xDS test server.
*/
export interface XdsUpdateHealthServiceHandlers extends grpc.UntypedServiceImplementation {
SetNotServing(call: grpc.ServerUnaryCall<_grpc_testing_Empty__Output, _grpc_testing_Empty>, callback: grpc.sendUnaryData<_grpc_testing_Empty>): void;
SetServing(call: grpc.ServerUnaryCall<_grpc_testing_Empty__Output, _grpc_testing_Empty>, callback: grpc.sendUnaryData<_grpc_testing_Empty>): void;
}

View File

@ -0,0 +1,60 @@
import * as grpc from '../../src';
import { ServiceDefinition, EnumTypeDefinition, MessageTypeDefinition } from '@grpc/proto-loader';
import { LoadBalancerStatsServiceClient as _grpc_testing_LoadBalancerStatsServiceClient } from './grpc/testing/LoadBalancerStatsService';
import { ReconnectServiceClient as _grpc_testing_ReconnectServiceClient } from './grpc/testing/ReconnectService';
import { TestServiceClient as _grpc_testing_TestServiceClient } from './grpc/testing/TestService';
import { UnimplementedServiceClient as _grpc_testing_UnimplementedServiceClient } from './grpc/testing/UnimplementedService';
import { XdsUpdateHealthServiceClient as _grpc_testing_XdsUpdateHealthServiceClient } from './grpc/testing/XdsUpdateHealthService';
type ConstructorArguments<Constructor> = Constructor extends new (...args: infer Args) => any ? Args: never;
type SubtypeConstructor<Constructor, Subtype> = {
new(...args: ConstructorArguments<Constructor>): Subtype;
};
export interface ProtoGrpcType {
grpc: {
testing: {
BoolValue: MessageTypeDefinition
EchoStatus: MessageTypeDefinition
Empty: MessageTypeDefinition
GrpclbRouteType: EnumTypeDefinition
LoadBalancerStatsRequest: MessageTypeDefinition
LoadBalancerStatsResponse: MessageTypeDefinition
/**
* A service used to obtain stats for verifying LB behavior.
*/
LoadBalancerStatsService: SubtypeConstructor<typeof grpc.Client, _grpc_testing_LoadBalancerStatsServiceClient> & { service: ServiceDefinition }
Payload: MessageTypeDefinition
PayloadType: EnumTypeDefinition
ReconnectInfo: MessageTypeDefinition
ReconnectParams: MessageTypeDefinition
/**
* A service used to control reconnect server.
*/
ReconnectService: SubtypeConstructor<typeof grpc.Client, _grpc_testing_ReconnectServiceClient> & { service: ServiceDefinition }
ResponseParameters: MessageTypeDefinition
SimpleRequest: MessageTypeDefinition
SimpleResponse: MessageTypeDefinition
StreamingInputCallRequest: MessageTypeDefinition
StreamingInputCallResponse: MessageTypeDefinition
StreamingOutputCallRequest: MessageTypeDefinition
StreamingOutputCallResponse: MessageTypeDefinition
/**
* A simple service to test the various types of RPCs and experiment with
* performance with various types of payload.
*/
TestService: SubtypeConstructor<typeof grpc.Client, _grpc_testing_TestServiceClient> & { service: ServiceDefinition }
/**
* A simple service NOT implemented at servers so clients can test for
* that case.
*/
UnimplementedService: SubtypeConstructor<typeof grpc.Client, _grpc_testing_UnimplementedServiceClient> & { service: ServiceDefinition }
/**
* A service to remotely control health status of an xDS test server.
*/
XdsUpdateHealthService: SubtypeConstructor<typeof grpc.Client, _grpc_testing_XdsUpdateHealthServiceClient> & { service: ServiceDefinition }
}
}
}

View File

@ -0,0 +1,249 @@
/*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as grpc from '../src';
import { ProtoGrpcType } from './generated/test';
import * as protoLoader from '@grpc/proto-loader';
import { TestServiceClient } from './generated/grpc/testing/TestService';
import { LoadBalancerStatsResponse } from './generated/grpc/testing/LoadBalancerStatsResponse';
import * as yargs from 'yargs';
import { LoadBalancerStatsServiceHandlers } from './generated/grpc/testing/LoadBalancerStatsService';
const packageDefinition = protoLoader.loadSync('grpc/testing/test.proto', {
keepCase: true,
defaults: true,
oneofs: true,
json: true,
longs: String,
enums: String,
includeDirs: [__dirname + '/../../proto']
});
const loadedProto = grpc.loadPackageDefinition(packageDefinition) as unknown as ProtoGrpcType;
const REQUEST_TIMEOUT_SEC = 20;
const VERBOSITY = Number.parseInt(process.env.NODE_XDS_INTEROP_VERBOSITY ?? '0');
interface CallEndNotifier {
onCallSucceeded(peerName: string): void;
onCallFailed(message: string): void;
}
class CallSubscriber {
private callsStarted = 0;
private callsSucceededByPeer: {[key: string]: number} = {};
private callsSucceeded = 0;
private callsFinished = 0;
private failureMessageCount: Map<string, number> = new Map<string, number>();
constructor(private callGoal: number, private onFinished: () => void) {}
addCallStarted(): void {
if (VERBOSITY >= 2) {
console.log('Call started');
}
this.callsStarted += 1;
}
private maybeOnFinished() {
if (this.callsFinished == this.callGoal) {
this.onFinished();
}
}
addCallSucceeded(peerName: string): void {
if (VERBOSITY >= 2) {
console.log(`Call to ${peerName} succeeded`);
}
if (peerName in this.callsSucceededByPeer) {
this.callsSucceededByPeer[peerName] += 1;
} else {
this.callsSucceededByPeer[peerName] = 1;
}
this.callsSucceeded += 1;
this.callsFinished += 1;
this.maybeOnFinished();
}
addCallFailed(message: string): void {
if (VERBOSITY >= 2) {
console.log(`Call failed with message ${message}`);
}
this.callsFinished += 1;
this.failureMessageCount.set(message, (this.failureMessageCount.get(message) ?? 0) + 1);
this.maybeOnFinished();
}
needsMoreCalls(): boolean {
return this.callsStarted < this.callGoal;
}
getFinalStats(): LoadBalancerStatsResponse {
if (VERBOSITY >= 1) {
console.log(`Out of a total of ${this.callGoal} calls requested, ${this.callsFinished} finished. ${this.callsSucceeded} succeeded`);
for (const [message, count] of this.failureMessageCount) {
console.log(`${count} failed with the message ${message}`);
}
}
return {
rpcs_by_peer: this.callsSucceededByPeer,
num_failures: this.callsStarted - this.callsSucceeded
};
}
}
class CallStatsTracker {
private subscribers: CallSubscriber[] = [];
getCallStats(callCount: number, timeoutSec: number): Promise<LoadBalancerStatsResponse> {
return new Promise((resolve, reject) => {
let finished = false;
const subscriber = new CallSubscriber(callCount, () => {
if (!finished) {
finished = true;
resolve(subscriber.getFinalStats());
}
});
setTimeout(() => {
if (!finished) {
finished = true;
this.subscribers.splice(this.subscribers.indexOf(subscriber), 1);
resolve(subscriber.getFinalStats());
}
}, timeoutSec * 1000)
this.subscribers.push(subscriber);
})
}
startCall(): CallEndNotifier {
const callSubscribers = this.subscribers.slice();
for (const subscriber of callSubscribers) {
subscriber.addCallStarted();
if (!subscriber.needsMoreCalls()) {
this.subscribers.splice(this.subscribers.indexOf(subscriber), 1);
}
}
return {
onCallSucceeded: (peerName: string) => {
for (const subscriber of callSubscribers) {
subscriber.addCallSucceeded(peerName);
}
},
onCallFailed: (message: string) => {
for (const subscriber of callSubscribers) {
subscriber.addCallFailed(message);
}
}
}
}
}
function sendConstantQps(client: TestServiceClient, qps: number, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker) {
let anyCallSucceeded: boolean = false;
setInterval(() => {
const notifier = callStatsTracker.startCall();
let gotMetadata: boolean = false;
let hostname: string | null = null;
let completed: boolean = false;
let completedWithError: boolean = false;
const deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + REQUEST_TIMEOUT_SEC);
const call = client.emptyCall({}, {deadline}, (error, value) => {
if (error) {
if (failOnFailedRpcs && anyCallSucceeded) {
console.error('A call failed after a call succeeded');
process.exit(1);
}
completed = true;
completedWithError = true;
notifier.onCallFailed(error.message);
} else {
anyCallSucceeded = true;
if (gotMetadata) {
if (hostname === null) {
notifier.onCallFailed('Hostname omitted from call metadata');
} else {
notifier.onCallSucceeded(hostname);
}
}
}
});
call.on('metadata', (metadata) => {
hostname = (metadata.get('hostname') as string[])[0] ?? null;
gotMetadata = true;
if (completed && !completedWithError) {
if (hostname === null) {
notifier.onCallFailed('Hostname omitted from call metadata');
} else {
notifier.onCallSucceeded(hostname);
}
}
})
}, 1000/qps);
}
function main() {
const argv = yargs
.string(['fail_on_failed_rpcs', 'server', 'stats_port'])
.number(['num_channels', 'qps'])
.require(['qps', 'server', 'stats_port'])
.default('num_channels', 1)
.argv;
console.log('Starting xDS interop client. Args: ', argv);
const callStatsTracker = new CallStatsTracker();
for (let i = 0; i < argv.num_channels; i++) {
/* The 'unique' channel argument is there solely to ensure that the
* channels do not share any subchannels. It does not have any
* inherent function. */
console.log(`Interop client channel ${i} starting sending ${argv.qps} QPS to ${argv.server}`);
sendConstantQps(new loadedProto.grpc.testing.TestService(argv.server, grpc.credentials.createInsecure(), {'unique': i}),
argv.qps,
argv.fail_on_failed_rpcs === 'true',
callStatsTracker);
}
const loadBalancerStatsServiceImpl: LoadBalancerStatsServiceHandlers = {
GetClientStats: (call, callback) => {
console.log(`Received stats request with num_rpcs=${call.request.num_rpcs} and timeout_sec=${call.request.num_rpcs}`);
callStatsTracker.getCallStats(call.request.num_rpcs, call.request.timeout_sec).then((value) => {
console.log(`Sending stats response: ${JSON.stringify(value)}`);
callback(null, value);
}, (error) => {
callback({code: grpc.status.ABORTED, details: 'Call stats collection failed'});
});
}
}
const server = new grpc.Server();
server.addService(loadedProto.grpc.testing.LoadBalancerStatsService.service, loadBalancerStatsServiceImpl);
server.bindAsync(`0.0.0.0:${argv.stats_port}`, grpc.ServerCredentials.createInsecure(), (error, port) => {
if (error) {
throw error;
}
console.log(`Starting stats service server bound to port ${port}`);
server.start();
});
}
if (require.main === module) {
main();
}

View File

@ -22,6 +22,7 @@
"@types/ncp": "^2.0.1",
"@types/pify": "^3.0.2",
"@types/semver": "^6.0.1",
"@types/yargs": "^15.0.5",
"clang-format": "^1.0.55",
"execa": "^2.0.3",
"gts": "^2.0.0",
@ -33,7 +34,8 @@
"pify": "^4.0.1",
"rimraf": "^3.0.2",
"ts-node": "^8.3.0",
"typescript": "^3.7.2"
"typescript": "^3.7.2",
"yargs": "^15.4.1"
},
"contributors": [
{
@ -46,6 +48,7 @@
"compile": "tsc -p .",
"format": "clang-format -i -style=\"{Language: JavaScript, BasedOnStyle: Google, ColumnLimit: 80}\" src/*.ts test/*.ts",
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs deps/envoy-api/ deps/udpa/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib ../index envoy/service/discovery/v2/ads.proto envoy/service/load_stats/v2/lrs.proto envoy/api/v2/listener.proto envoy/api/v2/route.proto envoy/api/v2/cluster.proto envoy/api/v2/endpoint.proto envoy/config/filter/network/http_connection_manager/v2/http_connection_manager.proto",
"generate-interop-types": "proto-loader-gen-types --keep-case --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs proto/ -O interop/generated --grpcLib ../../src grpc/testing/test.proto",
"lint": "npm run check",
"prepare": "npm run compile",
"test": "gulp test",
@ -57,7 +60,7 @@
"dependencies": {
"@grpc/proto-loader": "^0.6.0-pre14",
"@types/node": "^12.12.47",
"google-auth-library": "^6.0.0",
"google-auth-library": "^5.10.1",
"semver": "^6.2.0"
},
"files": [

View File

@ -0,0 +1,28 @@
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package grpc.testing;
// An empty message that you can re-use to avoid defining duplicated empty
// messages in your project. A typical example is to use it as argument or the
// return value of a service API. For instance:
//
// service Foo {
// rpc Bar (grpc.testing.Empty) returns (grpc.testing.Empty) { };
// };
//
message Empty {}

View File

@ -0,0 +1,214 @@
// Copyright 2015-2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Message definitions to be used by integration test service definitions.
syntax = "proto3";
package grpc.testing;
// TODO(dgq): Go back to using well-known types once
// https://github.com/grpc/grpc/issues/6980 has been fixed.
// import "google/protobuf/wrappers.proto";
message BoolValue {
// The bool value.
bool value = 1;
}
// The type of payload that should be returned.
enum PayloadType {
// Compressable text format.
COMPRESSABLE = 0;
}
// A block of data, to simply increase gRPC message size.
message Payload {
// The type of data in body.
PayloadType type = 1;
// Primary contents of payload.
bytes body = 2;
}
// A protobuf representation for grpc status. This is used by test
// clients to specify a status that the server should attempt to return.
message EchoStatus {
int32 code = 1;
string message = 2;
}
// The type of route that a client took to reach a server w.r.t. gRPCLB.
// The server must fill in "fallback" if it detects that the RPC reached
// the server via the "gRPCLB fallback" path, and "backend" if it detects
// that the RPC reached the server via "gRPCLB backend" path (i.e. if it got
// the address of this server from the gRPCLB server BalanceLoad RPC). Exactly
// how this detection is done is context and server dependent.
enum GrpclbRouteType {
// Server didn't detect the route that a client took to reach it.
GRPCLB_ROUTE_TYPE_UNKNOWN = 0;
// Indicates that a client reached a server via gRPCLB fallback.
GRPCLB_ROUTE_TYPE_FALLBACK = 1;
// Indicates that a client reached a server as a gRPCLB-given backend.
GRPCLB_ROUTE_TYPE_BACKEND = 2;
}
// Unary request.
message SimpleRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, server randomly chooses one from other formats.
PayloadType response_type = 1;
// Desired payload size in the response from the server.
int32 response_size = 2;
// Optional input payload sent along with the request.
Payload payload = 3;
// Whether SimpleResponse should include username.
bool fill_username = 4;
// Whether SimpleResponse should include OAuth scope.
bool fill_oauth_scope = 5;
// Whether to request the server to compress the response. This field is
// "nullable" in order to interoperate seamlessly with clients not able to
// implement the full compression tests by introspecting the call to verify
// the response's compression status.
BoolValue response_compressed = 6;
// Whether server should return a given status
EchoStatus response_status = 7;
// Whether the server should expect this request to be compressed.
BoolValue expect_compressed = 8;
// Whether SimpleResponse should include server_id.
bool fill_server_id = 9;
// Whether SimpleResponse should include grpclb_route_type.
bool fill_grpclb_route_type = 10;
}
// Unary response, as configured by the request.
message SimpleResponse {
// Payload to increase message size.
Payload payload = 1;
// The user the request came from, for verifying authentication was
// successful when the client expected it.
string username = 2;
// OAuth scope.
string oauth_scope = 3;
// Server ID. This must be unique among different server instances,
// but the same across all RPC's made to a particular server instance.
string server_id = 4;
// gRPCLB Path.
GrpclbRouteType grpclb_route_type = 5;
// Server hostname.
string hostname = 6;
}
// Client-streaming request.
message StreamingInputCallRequest {
// Optional input payload sent along with the request.
Payload payload = 1;
// Whether the server should expect this request to be compressed. This field
// is "nullable" in order to interoperate seamlessly with servers not able to
// implement the full compression tests by introspecting the call to verify
// the request's compression status.
BoolValue expect_compressed = 2;
// Not expecting any payload from the response.
}
// Client-streaming response.
message StreamingInputCallResponse {
// Aggregated size of payloads received from the client.
int32 aggregated_payload_size = 1;
}
// Configuration for a particular response.
message ResponseParameters {
// Desired payload sizes in responses from the server.
int32 size = 1;
// Desired interval between consecutive responses in the response stream in
// microseconds.
int32 interval_us = 2;
// Whether to request the server to compress the response. This field is
// "nullable" in order to interoperate seamlessly with clients not able to
// implement the full compression tests by introspecting the call to verify
// the response's compression status.
BoolValue compressed = 3;
}
// Server-streaming request.
message StreamingOutputCallRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, the payload from each response in the stream
// might be of different types. This is to simulate a mixed type of payload
// stream.
PayloadType response_type = 1;
// Configuration for each expected response message.
repeated ResponseParameters response_parameters = 2;
// Optional input payload sent along with the request.
Payload payload = 3;
// Whether server should return a given status
EchoStatus response_status = 7;
}
// Server-streaming response, as configured by the request and parameters.
message StreamingOutputCallResponse {
// Payload to increase response size.
Payload payload = 1;
}
// For reconnect interop test only.
// Client tells server what reconnection parameters it used.
message ReconnectParams {
int32 max_reconnect_backoff_ms = 1;
}
// For reconnect interop test only.
// Server tells client whether its reconnects are following the spec and the
// reconnect backoffs it saw.
message ReconnectInfo {
bool passed = 1;
repeated int32 backoff_ms = 2;
}
message LoadBalancerStatsRequest {
// Request stats for the next num_rpcs sent by client.
int32 num_rpcs = 1;
// If num_rpcs have not completed within timeout_sec, return partial results.
int32 timeout_sec = 2;
}
message LoadBalancerStatsResponse {
message RpcsByPeer {
// The number of completed RPCs for each peer.
map<string, int32> rpcs_by_peer = 1;
}
// The number of completed RPCs for each peer.
map<string, int32> rpcs_by_peer = 1;
// The number of RPCs that failed to record a remote peer.
int32 num_failures = 2;
map<string, RpcsByPeer> rpcs_by_method = 3;
}

View File

@ -0,0 +1,92 @@
// Copyright 2015-2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// An integration test service that covers all the method signature permutations
// of unary/streaming requests/responses.
syntax = "proto3";
import "grpc/testing/empty.proto";
import "grpc/testing/messages.proto";
package grpc.testing;
// A simple service to test the various types of RPCs and experiment with
// performance with various types of payload.
service TestService {
// One empty request followed by one empty response.
rpc EmptyCall(grpc.testing.Empty) returns (grpc.testing.Empty);
// One request followed by one response.
rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
// One request followed by one response. Response has cache control
// headers set such that a caching HTTP proxy (such as GFE) can
// satisfy subsequent requests.
rpc CacheableUnaryCall(SimpleRequest) returns (SimpleResponse);
// One request followed by a sequence of responses (streamed download).
// The server returns the payload with client desired type and sizes.
rpc StreamingOutputCall(StreamingOutputCallRequest)
returns (stream StreamingOutputCallResponse);
// A sequence of requests followed by one response (streamed upload).
// The server returns the aggregated size of client payload as the result.
rpc StreamingInputCall(stream StreamingInputCallRequest)
returns (StreamingInputCallResponse);
// A sequence of requests with each request served by the server immediately.
// As one request could lead to multiple responses, this interface
// demonstrates the idea of full duplexing.
rpc FullDuplexCall(stream StreamingOutputCallRequest)
returns (stream StreamingOutputCallResponse);
// A sequence of requests followed by a sequence of responses.
// The server buffers all the client requests and then serves them in order. A
// stream of responses are returned to the client when the server starts with
// first request.
rpc HalfDuplexCall(stream StreamingOutputCallRequest)
returns (stream StreamingOutputCallResponse);
// The test server will not implement this method. It will be used
// to test the behavior when clients call unimplemented methods.
rpc UnimplementedCall(grpc.testing.Empty) returns (grpc.testing.Empty);
}
// A simple service NOT implemented at servers so clients can test for
// that case.
service UnimplementedService {
// A call that no server should implement
rpc UnimplementedCall(grpc.testing.Empty) returns (grpc.testing.Empty);
}
// A service used to control reconnect server.
service ReconnectService {
rpc Start(grpc.testing.ReconnectParams) returns (grpc.testing.Empty);
rpc Stop(grpc.testing.Empty) returns (grpc.testing.ReconnectInfo);
}
// A service used to obtain stats for verifying LB behavior.
service LoadBalancerStatsService {
// Gets the backend distribution for RPCs sent by a test client.
rpc GetClientStats(LoadBalancerStatsRequest)
returns (LoadBalancerStatsResponse) {}
}
// A service to remotely control health status of an xDS test server.
service XdsUpdateHealthService {
rpc SetServing(grpc.testing.Empty) returns (grpc.testing.Empty);
rpc SetNotServing(grpc.testing.Empty) returns (grpc.testing.Empty);
}

53
packages/grpc-js/scripts/xds.sh Executable file
View File

@ -0,0 +1,53 @@
#!/bin/bash
# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Install NVM
cd ~
export NVM_DIR=`pwd`/.nvm
curl -o- https://raw.githubusercontent.com/creationix/nvm/v0.33.4/install.sh | bash
# Load NVM
. $NVM_DIR/nvm.sh
nvm install 12
set -exu -o pipefail
[[ -f /VERSION ]] && cat /VERSION
cd $(dirname $0)/..
base=$(pwd)
npm run compile
cd ../../..
git clone -b master --single-branch --depth=1 https://github.com/grpc/grpc.git
grpc/tools/run_tests/helper_scripts/prep_xds.sh
GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weighted_target,round_robin,resolving_load_balancer,subchannel,keepalive,dns_resolver GRPC_NODE_VERBOSITY=DEBUG \
python3 grpc/tools/run_tests/run_xds_tests.py \
--test_case="backends_restart,change_backend_service,gentle_failover,ping_pong,remove_instance_group,round_robin,secondary_locality_gets_no_requests_on_partial_primary_failure,secondary_locality_gets_requests_on_primary_failure" \
--project_id=grpc-testing \
--source_image=projects/grpc-testing/global/images/xds-test-server-2 \
--path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \
--gcp_suffix=$(date '+%s') \
--verbose \
--client_cmd="node grpc-node/packages/grpc-js/build/interop/xds-interop-client \
--server=xds:///{server_uri} \
--stats_port={stats_port} \
--qps={qps} \
{fail_on_failed_rpc} \
{rpcs_to_send} \
{metadata_to_send}"

View File

@ -34,6 +34,14 @@ import { ConnectivityState } from './channel';
import { UnavailablePicker } from './picker';
import { Status } from './constants';
import { Metadata } from './metadata';
import * as logging from './logging';
import { LogVerbosity } from './constants';
const TRACER_NAME = 'cds_balancer';
function trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
const TYPE_NAME = 'cds';
@ -70,6 +78,7 @@ export class CdsLoadBalancer implements LoadBalancer {
* Otherwise, if the field is omitted, load reporting is disabled. */
edsConfig.lrsLoadReportingServerName = '';
}
trace('Child update EDS config: ' + JSON.stringify(edsConfig));
this.childBalancer.updateAddressList(
[],
{ name: 'eds', eds: edsConfig },
@ -82,6 +91,8 @@ export class CdsLoadBalancer implements LoadBalancer {
this.watcher
);
this.isWatcherActive = false;
this.channelControlHelper.updateState(ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: Status.UNAVAILABLE, details: 'CDS resource does not exist', metadata: new Metadata()}));
this.childBalancer.destroy();
},
onTransientError: (status) => {
if (this.latestCdsUpdate === null) {
@ -104,11 +115,14 @@ export class CdsLoadBalancer implements LoadBalancer {
attributes: { [key: string]: unknown }
): void {
if (!isCdsLoadBalancingConfig(lbConfig)) {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig));
return;
}
if (!(attributes.xdsClient instanceof XdsClient)) {
trace('Discarding address list update missing xdsClient attribute');
return;
}
trace('Received update with config ' + JSON.stringify(lbConfig));
this.xdsClient = attributes.xdsClient;
this.latestAttributes = attributes;

View File

@ -133,9 +133,11 @@ export class ChildLoadBalancerHandler implements LoadBalancer {
destroy(): void {
if (this.currentChild) {
this.currentChild.destroy();
this.currentChild = null;
}
if (this.pendingChild) {
this.pendingChild.destroy();
this.pendingChild = null;
}
}
getTypeName(): string {

View File

@ -21,7 +21,7 @@ import {
registerLoadBalancerType,
getFirstUsableConfig,
} from './load-balancer';
import { SubchannelAddress } from './subchannel';
import { SubchannelAddress, subchannelAddressToString } from './subchannel';
import {
LoadBalancingConfig,
isEdsLoadBalancingConfig,
@ -40,6 +40,14 @@ import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
import { LocalitySubchannelAddress } from './load-balancer-priority';
import { Status } from './constants';
import { Metadata } from './metadata';
import * as logging from './logging';
import { LogVerbosity } from './constants';
const TRACER_NAME = 'eds_balancer';
function trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
const TYPE_NAME = 'eds';
@ -127,6 +135,7 @@ export class EdsLoadBalancer implements LoadBalancer {
});
this.watcher = {
onValidUpdate: (update) => {
trace('Received EDS update for ' + this.edsServiceName + ': ' + JSON.stringify(update));
this.latestEdsUpdate = update;
this.updateChild();
},
@ -136,6 +145,8 @@ export class EdsLoadBalancer implements LoadBalancer {
this.watcher
);
this.isWatcherActive = false;
this.channelControlHelper.updateState(ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: Status.UNAVAILABLE, details: 'EDS resource does not exist', metadata: new Metadata()}));
this.childBalancer.destroy();
},
onTransientError: (status) => {
if (this.latestEdsUpdate === null) {
@ -207,6 +218,10 @@ export class EdsLoadBalancer implements LoadBalancer {
weight: number;
addresses: SubchannelAddress[];
}[][] = [];
/**
* New replacement for this.localityPriorities, mapping locality names to
* priority values. The replacement occurrs at the end of this method.
*/
const newLocalityPriorities: Map<string, number> = new Map<
string,
number
@ -215,12 +230,10 @@ export class EdsLoadBalancer implements LoadBalancer {
* loop consolidates localities into buckets by priority, while also
* simplifying the data structure to make the later steps simpler */
for (const endpoint of this.latestEdsUpdate.endpoints) {
let localityArray = priorityList[endpoint.priority];
if (localityArray === undefined) {
localityArray = [];
priorityList[endpoint.priority] = localityArray;
if (!endpoint.load_balancing_weight) {
continue;
}
const addresses: SubchannelAddress[] = endpoint.lb_endpoints.map(
const addresses: SubchannelAddress[] = endpoint.lb_endpoints.filter(lbEndpoint => lbEndpoint.health_status === 'UNKNOWN' || lbEndpoint.health_status === 'HEALTHY').map(
(lbEndpoint) => {
/* The validator in the XdsClient class ensures that each endpoint has
* a socket_address with an IP address and a port_value. */
@ -231,15 +244,22 @@ export class EdsLoadBalancer implements LoadBalancer {
};
}
);
localityArray.push({
locality: endpoint.locality!,
addresses: addresses,
weight: endpoint.load_balancing_weight?.value ?? 0,
});
newLocalityPriorities.set(
localityToName(endpoint.locality!),
endpoint.priority
);
if (addresses.length > 0) {
let localityArray = priorityList[endpoint.priority];
if (localityArray === undefined) {
localityArray = [];
priorityList[endpoint.priority] = localityArray;
}
localityArray.push({
locality: endpoint.locality!,
addresses: addresses,
weight: endpoint.load_balancing_weight.value,
});
newLocalityPriorities.set(
localityToName(endpoint.locality!),
endpoint.priority
);
}
}
const newPriorityNames: string[] = [];
@ -256,6 +276,7 @@ export class EdsLoadBalancer implements LoadBalancer {
* - Otherwise, construct a new name using this.nextPriorityChildNumber.
*/
for (const [priority, localityArray] of priorityList.entries()) {
// Skip priorities that have no localities with healthy endpoints
if (localityArray === undefined) {
continue;
}
@ -356,6 +377,11 @@ export class EdsLoadBalancer implements LoadBalancer {
priorities: newPriorityNames.filter((value) => value !== undefined),
},
};
trace('Child update addresses: ' + addressList.map(address => '(' + subchannelAddressToString(address) + ' path=' + address.localityPath + ')'));
trace('Child update priority list: ' + childConfig.priority.priorities);
for (const [childName, child] of childConfig.priority.children) {
trace('Child update priority config: ' + childName + ' -> ' + JSON.stringify(child));
}
this.childBalancer.updateAddressList(
addressList,
childConfig,
@ -372,11 +398,14 @@ export class EdsLoadBalancer implements LoadBalancer {
attributes: { [key: string]: unknown }
): void {
if (!isEdsLoadBalancingConfig(lbConfig)) {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig));
return;
}
if (!(attributes.xdsClient instanceof XdsClient)) {
trace('Discarding address list update missing xdsClient attribute');
return;
}
trace('Received update with config: ' + JSON.stringify(lbConfig));
this.lastestConfig = lbConfig;
this.latestAttributes = attributes;
this.xdsClient = attributes.xdsClient;

View File

@ -21,7 +21,7 @@ import {
getFirstUsableConfig,
registerLoadBalancerType,
} from './load-balancer';
import { SubchannelAddress } from './subchannel';
import { SubchannelAddress, subchannelAddressToString } from './subchannel';
import {
LoadBalancingConfig,
isPriorityLoadBalancingConfig,
@ -32,6 +32,14 @@ import { ChildLoadBalancerHandler } from './load-balancer-child-handler';
import { ChannelOptions } from './channel-options';
import { Status } from './constants';
import { Metadata } from './metadata';
import * as logging from './logging';
import { LogVerbosity } from './constants';
const TRACER_NAME = 'priority';
function trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
const TYPE_NAME = 'priority';
@ -103,6 +111,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
}
private updateState(connectivityState: ConnectivityState, picker: Picker) {
trace('Child ' + this.name + ' ' + ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[connectivityState]);
this.connectivityState = connectivityState;
this.picker = picker;
this.parent.onChildStateChange(this);
@ -110,7 +119,9 @@ export class PriorityLoadBalancer implements LoadBalancer {
private startFailoverTimer() {
if (this.failoverTimer === null) {
trace('Starting failover timer for child ' + this.name);
this.failoverTimer = setTimeout(() => {
trace('Failover timer triggered for child ' + this.name);
this.failoverTimer = null;
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
@ -222,6 +233,10 @@ export class PriorityLoadBalancer implements LoadBalancer {
constructor(private channelControlHelper: ChannelControlHelper) {}
private updateState(state: ConnectivityState, picker: Picker) {
trace(
'Transitioning to ' +
ConnectivityState[state]
);
/* If switching to IDLE, use a QueuePicker attached to this load balancer
* so that when the picker calls exitIdle, that in turn calls exitIdle on
* the PriorityChildImpl, which will start the failover timer. */
@ -233,6 +248,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
private onChildStateChange(child: PriorityChildBalancer) {
const childState = child.getConnectivityState();
trace('Child ' + child.getName() + ' transitioning to ' + ConnectivityState[childState]);
if (child === this.currentChildFromBeforeUpdate) {
if (
childState === ConnectivityState.READY ||
@ -295,6 +311,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
private selectPriority(priority: number) {
this.currentPriority = priority;
const chosenChild = this.children.get(this.priorities[priority])!;
chosenChild.cancelFailoverTimer();
this.updateState(
chosenChild.getConnectivityState(),
chosenChild.getPicker()
@ -369,6 +386,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
): void {
if (!isPriorityLoadBalancingConfig(lbConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig));
return;
}
const priorityConfig = lbConfig.priority;
@ -416,6 +434,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
const chosenChildConfig = getFirstUsableConfig(childConfig.config);
if (chosenChildConfig !== null) {
const childAddresses = childAddressMap.get(childName) ?? [];
trace('Assigning child ' + childName + ' address list ' + childAddresses.map(address => '(' + subchannelAddressToString(address) + ' path=' + address.localityPath + ')'))
this.latestUpdates.set(childName, {
subchannelAddress: childAddresses,
lbConfig: chosenChildConfig,
@ -433,6 +452,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
// Deactivate all children that are no longer in the priority list
for (const [childName, child] of this.children) {
if (this.priorities.indexOf(childName) < 0) {
trace('Deactivating child ' + childName);
child.deactivate();
}
}

View File

@ -16,7 +16,7 @@
*/
import { LoadBalancer, ChannelControlHelper, getFirstUsableConfig, registerLoadBalancerType } from "./load-balancer";
import { SubchannelAddress } from "./subchannel";
import { SubchannelAddress, subchannelAddressToString } from "./subchannel";
import { LoadBalancingConfig, WeightedTarget, isWeightedTargetLoadBalancingConfig } from "./load-balancing-config";
import { Picker, PickResult, PickArgs, QueuePicker, UnavailablePicker } from "./picker";
import { ConnectivityState } from "./channel";
@ -24,6 +24,14 @@ import { ChildLoadBalancerHandler } from "./load-balancer-child-handler";
import { Status } from "./constants";
import { Metadata } from "./metadata";
import { isLocalitySubchannelAddress, LocalitySubchannelAddress } from "./load-balancer-priority";
import * as logging from './logging';
import { LogVerbosity } from './constants';
const TRACER_NAME = 'weighted_target';
function trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
const TYPE_NAME = 'weighted_target';
@ -116,6 +124,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
}
private updateState(connectivityState: ConnectivityState, picker: Picker) {
trace('Target ' + this.name + ' ' + ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[connectivityState]);
this.connectivityState = connectivityState;
this.picker = picker;
this.parent.updateState();
@ -229,7 +238,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
picker = new WeightedTargetPicker(pickerList);
break;
case ConnectivityState.CONNECTING:
case ConnectivityState.READY:
case ConnectivityState.IDLE:
picker = new QueuePicker(this);
break;
default:
@ -239,12 +248,17 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
metadata: new Metadata()
});
}
trace(
'Transitioning to ' +
ConnectivityState[connectivityState]
);
this.channelControlHelper.updateState(connectivityState, picker);
}
updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
if (!isWeightedTargetLoadBalancingConfig(lbConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig));
return;
}
@ -252,7 +266,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
* which child it belongs to. So we bucket those addresses by that first
* element, and pass along the rest of the localityPath for that child
* to use. */
const childAddressMap = new Map<string, SubchannelAddress[]>();
const childAddressMap = new Map<string, LocalitySubchannelAddress[]>();
for (const address of addressList) {
if (!isLocalitySubchannelAddress(address)) {
// Reject address that cannot be associated with targets
@ -284,12 +298,15 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
} else {
target.maybeReactivate();
}
target.updateAddressList(childAddressMap.get(targetName) ?? [], targetConfig, attributes);
const targetAddresses = childAddressMap.get(targetName) ?? [];
trace('Assigning target ' + targetName + ' address list ' + targetAddresses.map(address => '(' + subchannelAddressToString(address) + ' path=' + address.localityPath + ')'));
target.updateAddressList(targetAddresses, targetConfig, attributes);
}
// Deactivate targets that are not in the new config
for (const [targetName, target] of this.targets) {
if (this.targetList.indexOf(targetName) < 0) {
trace('Deactivating target ' + targetName);
target.deactivate();
}
}

View File

@ -20,20 +20,20 @@ import { LogVerbosity } from './constants';
let _logger: Partial<Console> = console;
let _logVerbosity: LogVerbosity = LogVerbosity.ERROR;
if (process.env.GRPC_VERBOSITY) {
switch (process.env.GRPC_VERBOSITY) {
case 'DEBUG':
_logVerbosity = LogVerbosity.DEBUG;
break;
case 'INFO':
_logVerbosity = LogVerbosity.INFO;
break;
case 'ERROR':
_logVerbosity = LogVerbosity.ERROR;
break;
default:
// Ignore any other values
}
const verbosityString = process.env.GRPC_NODE_VERBOSITY ?? process.env.GRPC_VERBOSITY ?? '';
switch (verbosityString) {
case 'DEBUG':
_logVerbosity = LogVerbosity.DEBUG;
break;
case 'INFO':
_logVerbosity = LogVerbosity.INFO;
break;
case 'ERROR':
_logVerbosity = LogVerbosity.ERROR;
break;
default:
// Ignore any other values
}
export const getLogger = (): Partial<Console> => {
@ -55,9 +55,8 @@ export const log = (severity: LogVerbosity, ...args: any[]): void => {
}
};
const enabledTracers = process.env.GRPC_TRACE
? process.env.GRPC_TRACE.split(',')
: [];
const tracersString = process.env.GRPC_NODE_TRACE ?? process.env.GRPC_TRACE ?? '';
const enabledTracers = tracersString.split(',');
const allEnabled = enabledTracers.includes('all');
export function trace(

View File

@ -19,9 +19,16 @@ import { GrpcUri, uriToString } from './uri-parser';
import { XdsClient } from './xds-client';
import { ServiceConfig } from './service-config';
import { StatusObject } from './call-stream';
import { Status } from './constants';
import { Status, LogVerbosity } from './constants';
import { Metadata } from './metadata';
import { ChannelOptions } from './channel-options';
import * as logging from './logging';
const TRACER_NAME = 'xds_resolver';
function trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
class XdsResolver implements Resolver {
private resolutionStarted = false;
@ -47,10 +54,12 @@ class XdsResolver implements Resolver {
// Wait until updateResolution is called once to start the xDS requests
if (!this.resolutionStarted) {
this.resolutionStarted = true;
trace('Starting resolution for target ' + uriToString(this.target));
const xdsClient = new XdsClient(
this.target.path,
{
onValidUpdate: (update: ServiceConfig) => {
trace('Resolved service config for target ' + uriToString(this.target) + ': ' + JSON.stringify(update));
this.hasReportedSuccess = true;
this.listener.onSuccessfulResolution([], update, null, {
xdsClient: xdsClient,
@ -60,10 +69,12 @@ class XdsResolver implements Resolver {
/* A transient error only needs to bubble up as a failure if we have
* not already provided a ServiceConfig for the upper layer to use */
if (!this.hasReportedSuccess) {
trace('Resolution error for target ' + uriToString(this.target) + ' due to xDS client transient error ' + error.details);
this.reportResolutionError();
}
},
onResourceDoesNotExist: () => {
trace('Resolution error for target ' + uriToString(this.target) + ': resource does not exist');
this.reportResolutionError();
},
},

View File

@ -18,6 +18,7 @@
import { ServiceConfig } from './service-config';
import * as resolver_dns from './resolver-dns';
import * as resolver_uds from './resolver-uds';
import * as resolver_xds from './resolver-xds';
import { StatusObject } from './call-stream';
import { SubchannelAddress } from './subchannel';
import { GrpcUri, uriToString } from './uri-parser';
@ -156,4 +157,5 @@ export function mapUriDefaultScheme(target: GrpcUri): GrpcUri | null {
export function registerAll() {
resolver_dns.setup();
resolver_uds.setup();
resolver_xds.setup();
}

View File

@ -81,7 +81,7 @@ export type ServerSurfaceCall = {
} & EventEmitter;
export type ServerUnaryCall<RequestType, ResponseType> = ServerSurfaceCall & {
request: RequestType | null;
request: RequestType;
};
export type ServerReadableStream<
RequestType,
@ -92,7 +92,7 @@ export type ServerWritableStream<
ResponseType
> = ServerSurfaceCall &
ObjectWritable<ResponseType> & {
request: RequestType | null;
request: RequestType;
end: (metadata?: Metadata) => void;
};
export type ServerDuplexStream<RequestType, ResponseType> = ServerSurfaceCall &
@ -102,15 +102,14 @@ export type ServerDuplexStream<RequestType, ResponseType> = ServerSurfaceCall &
export class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter
implements ServerUnaryCall<RequestType, ResponseType> {
cancelled: boolean;
request: RequestType | null;
constructor(
private call: Http2ServerCallStream<RequestType, ResponseType>,
public metadata: Metadata
public metadata: Metadata,
public request: RequestType
) {
super();
this.cancelled = false;
this.request = null;
this.call.setupSurfaceCall(this);
}
@ -160,17 +159,16 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
extends Writable
implements ServerWritableStream<RequestType, ResponseType> {
cancelled: boolean;
request: RequestType | null;
private trailingMetadata: Metadata;
constructor(
private call: Http2ServerCallStream<RequestType, ResponseType>,
public metadata: Metadata,
public serialize: Serialize<ResponseType>
public serialize: Serialize<ResponseType>,
public request: RequestType
) {
super({ objectMode: true });
this.cancelled = false;
this.request = null;
this.trailingMetadata = new Metadata();
this.call.setupSurfaceCall(this);
@ -280,7 +278,7 @@ ServerDuplexStreamImpl.prototype.end = ServerWritableStreamImpl.prototype.end;
// Unary response callback signature.
export type sendUnaryData<ResponseType> = (
error: ServerErrorResponse | ServerStatusResponse | null,
value: ResponseType | null,
value?: ResponseType | null,
trailer?: Metadata,
flags?: number
) => void;
@ -524,7 +522,7 @@ export class Http2ServerCallStream<
async sendUnaryMessage(
err: ServerErrorResponse | ServerStatusResponse | null,
value: ResponseType | null,
value?: ResponseType | null,
metadata?: Metadata,
flags?: number
) {

View File

@ -632,22 +632,23 @@ async function handleUnary<RequestType, ResponseType>(
handler: UnaryHandler<RequestType, ResponseType>,
metadata: Metadata
): Promise<void> {
const emitter = new ServerUnaryCallImpl<RequestType, ResponseType>(
call,
metadata
);
const request = await call.receiveUnaryMessage();
if (request === undefined || call.cancelled) {
return;
}
const emitter = new ServerUnaryCallImpl<RequestType, ResponseType>(
call,
metadata,
request
);
emitter.request = request;
handler.func(
emitter,
(
err: ServerErrorResponse | ServerStatusResponse | null,
value: ResponseType | null,
value?: ResponseType | null,
trailer?: Metadata,
flags?: number
) => {
@ -669,7 +670,7 @@ function handleClientStreaming<RequestType, ResponseType>(
function respond(
err: ServerErrorResponse | ServerStatusResponse | null,
value: ResponseType | null,
value?: ResponseType | null,
trailer?: Metadata,
flags?: number
) {
@ -699,10 +700,10 @@ async function handleServerStreaming<RequestType, ResponseType>(
const stream = new ServerWritableStreamImpl<RequestType, ResponseType>(
call,
metadata,
handler.serialize
handler.serialize,
request
);
stream.request = request;
handler.func(stream);
}

View File

@ -28,7 +28,7 @@ import * as logging from './logging';
import { LogVerbosity } from './constants';
import { getProxiedConnection, ProxyConnectionResult } from './http_proxy';
import * as net from 'net';
import { GrpcUri, parseUri, splitHostPort } from './uri-parser';
import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser';
import { ConnectionOptions } from 'tls';
import { FilterFactory, Filter } from './filter';
@ -40,6 +40,10 @@ function trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
function refTrace(text: string): void {
logging.trace(LogVerbosity.DEBUG, 'subchannel_refcount', text);
}
const MIN_CONNECT_TIMEOUT_MS = 20000;
const INITIAL_BACKOFF_MS = 1000;
const BACKOFF_MULTIPLIER = 1.6;
@ -263,6 +267,7 @@ export class Subchannel {
}
private sendPing() {
logging.trace(LogVerbosity.DEBUG, 'keepalive', 'Sending ping to ' + this.subchannelAddressString);
this.keepaliveTimeoutId = setTimeout(() => {
this.transitionToState([ConnectivityState.READY], ConnectivityState.IDLE);
}, this.keepaliveTimeoutMs);
@ -277,7 +282,8 @@ export class Subchannel {
this.keepaliveIntervalId = setInterval(() => {
this.sendPing();
}, this.keepaliveTimeMs);
this.sendPing();
/* Don't send a ping immediately because whatever caused us to start
* sending pings should also involve some network activity. */
}
private stopKeepalivePings() {
@ -405,14 +411,14 @@ export class Subchannel {
errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM &&
opaqueData.equals(tooManyPingsData)
) {
logging.log(
LogVerbosity.ERROR,
`Connection to ${this.channelTarget} rejected by server because of excess pings`
);
this.keepaliveTimeMs = Math.min(
2 * this.keepaliveTimeMs,
KEEPALIVE_MAX_TIME_MS
);
logging.log(
LogVerbosity.ERROR,
`Connection to ${uriToString(this.channelTarget)} at ${this.subchannelAddressString} rejected by server because of excess pings. Increasing ping interval to ${this.keepaliveTimeMs} ms`
);
}
trace(
this.subchannelAddressString +
@ -585,7 +591,7 @@ export class Subchannel {
}
callRef() {
trace(
refTrace(
this.subchannelAddressString +
' callRefcount ' +
this.callRefcount +
@ -602,7 +608,7 @@ export class Subchannel {
}
callUnref() {
trace(
refTrace(
this.subchannelAddressString +
' callRefcount ' +
this.callRefcount +
@ -620,7 +626,7 @@ export class Subchannel {
}
ref() {
trace(
refTrace(
this.subchannelAddressString +
' refcount ' +
this.refcount +
@ -631,7 +637,7 @@ export class Subchannel {
}
unref() {
trace(
refTrace(
this.subchannelAddressString +
' refcount ' +
this.refcount +
@ -691,7 +697,7 @@ export class Subchannel {
for (const header of Object.keys(headers)) {
headersString += '\t\t' + header + ': ' + headers[header] + '\n';
}
trace('Starting stream with headers\n' + headersString);
logging.trace(LogVerbosity.DEBUG, 'call_stream', 'Starting stream on subchannel ' + this.subchannelAddressString + ' with headers\n' + headersString);
callStream.attachHttp2Stream(http2Stream, this, extraFilterFactory);
}

View File

@ -157,15 +157,6 @@ function validateNode(obj: any): Node {
throw new Error(`node.id field: expected string, got ${typeof obj.id}`);
}
result.id = obj.id;
if (!('cluster' in obj)) {
throw new Error('cluster field missing in node element');
}
if (typeof obj.cluster !== 'string') {
throw new Error(
`node.cluster field: expected string, got ${typeof obj.cluster}`
);
}
result.cluster = obj.cluster;
if (!('locality' in obj)) {
throw new Error('locality field missing in node element');
}
@ -180,7 +171,7 @@ function validateNode(obj: any): Node {
result.locality.region = obj.locality.region;
}
if ('zone' in obj.locality) {
if (typeof obj.locality.region !== 'string') {
if (typeof obj.locality.zone !== 'string') {
throw new Error(
`node.locality.zone field: expected string, got ${typeof obj.locality
.zone}`
@ -197,6 +188,14 @@ function validateNode(obj: any): Node {
}
result.locality.sub_zone = obj.locality.sub_zone;
}
if ('cluster' in obj) {
if (typeof obj.cluster !== 'string') {
throw new Error(
`node.cluster field: expected string, got ${typeof obj.cluster}`
);
}
result.cluster = obj.cluster;
}
if ('metadata' in obj) {
result.metadata = getStructFromJson(obj.metadata);
}

View File

@ -54,6 +54,7 @@ import { Listener__Output } from './generated/envoy/api/v2/Listener';
import { HttpConnectionManager__Output } from './generated/envoy/config/filter/network/http_connection_manager/v2/HttpConnectionManager';
import { RouteConfiguration__Output } from './generated/envoy/api/v2/RouteConfiguration';
import { Any__Output } from './generated/google/protobuf/Any';
import { BackoffTimeout } from './backoff-timeout';
const TRACER_NAME = 'xds_client';
@ -105,6 +106,7 @@ function loadAdsProtos(): Promise<
enums: String,
defaults: true,
oneofs: true,
json: true,
includeDirs: [
// Paths are relative to src/build
__dirname + '/../../deps/envoy-api/',
@ -259,6 +261,7 @@ class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
trace('Adding EDS watcher for edsServiceName ' + edsServiceName);
let watchersEntry = this.watchers.get(edsServiceName);
let addedServiceName = false;
if (watchersEntry === undefined) {
@ -275,6 +278,7 @@ class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing EDS update for new watcher for edsServiceName ' + edsServiceName);
watcher.onValidUpdate(message);
});
}
@ -288,6 +292,7 @@ class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
trace('Removing EDS watcher for edsServiceName ' + edsServiceName);
const watchersEntry = this.watchers.get(edsServiceName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
@ -341,6 +346,7 @@ class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
handleMissingNames(allEdsServiceNames: Set<string>) {
for (const [edsServiceName, watcherList] of this.watchers.entries()) {
if (!allEdsServiceNames.has(edsServiceName)) {
trace('Reporting EDS resource does not exist for edsServiceName ' + edsServiceName);
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
@ -351,7 +357,8 @@ class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
handleResponses(responses: ClusterLoadAssignment__Output[]) {
for (const message of responses) {
if (!this.validateResponse(message)) {
return 'ClusterLoadAssignment validation failed';
trace('EDS validation failed for message ' + JSON.stringify(message));
return 'EDS Error: ClusterLoadAssignment validation failed';
}
}
this.latestResponses = responses;
@ -363,6 +370,7 @@ class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
watcher.onValidUpdate(message);
}
}
trace('Received EDS updates for cluster names ' + Array.from(allClusterNames));
this.handleMissingNames(allClusterNames);
return null;
}
@ -399,6 +407,7 @@ class CdsState implements XdsStreamState<Cluster__Output> {
* @param watcher
*/
addWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
trace('Adding CDS watcher for clusterName ' + clusterName);
let watchersEntry = this.watchers.get(clusterName);
let addedServiceName = false;
if (watchersEntry === undefined) {
@ -415,6 +424,7 @@ class CdsState implements XdsStreamState<Cluster__Output> {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing CDS update for new watcher for clusterName ' + clusterName);
watcher.onValidUpdate(message);
});
}
@ -425,6 +435,7 @@ class CdsState implements XdsStreamState<Cluster__Output> {
}
removeWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
trace('Removing CDS watcher for clusterName ' + clusterName);
const watchersEntry = this.watchers.get(clusterName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
@ -465,14 +476,15 @@ class CdsState implements XdsStreamState<Cluster__Output> {
}
/**
* Given a list of edsServiceNames (which may actually be the cluster name),
* Given a list of clusterNames (which may actually be the cluster name),
* for each watcher watching a name not on the list, call that watcher's
* onResourceDoesNotExist method.
* @param allClusterNames
*/
private handleMissingNames(allClusterNames: Set<string>) {
for (const [edsServiceName, watcherList] of this.watchers.entries()) {
if (!allClusterNames.has(edsServiceName)) {
for (const [clusterName, watcherList] of this.watchers.entries()) {
if (!allClusterNames.has(clusterName)) {
trace('Reporting CDS resource does not exist for clusterName ' + clusterName);
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
@ -483,7 +495,8 @@ class CdsState implements XdsStreamState<Cluster__Output> {
handleResponses(responses: Cluster__Output[]): string | null {
for (const message of responses) {
if (!this.validateResponse(message)) {
return 'Cluster validation failed';
trace('CDS validation failed for message ' + JSON.stringify(message));
return 'CDS Error: Cluster validation failed';
}
}
this.latestResponses = responses;
@ -500,6 +513,7 @@ class CdsState implements XdsStreamState<Cluster__Output> {
watcher.onValidUpdate(message);
}
}
trace('Received CDS updates for cluster names ' + Array.from(allClusterNames));
this.handleMissingNames(allClusterNames);
this.edsState.handleMissingNames(allEdsServiceNames);
return null;
@ -521,6 +535,7 @@ class RdsState implements XdsStreamState<RouteConfiguration__Output> {
private routeConfigName: string | null = null;
constructor(
private targetName: string,
private watcher: Watcher<ServiceConfig>,
private updateResouceNames: () => void
) {}
@ -531,9 +546,10 @@ class RdsState implements XdsStreamState<RouteConfiguration__Output> {
handleSingleMessage(message: RouteConfiguration__Output) {
for (const virtualHost of message.virtual_hosts) {
if (virtualHost.domains.indexOf(this.routeConfigName!) >= 0) {
if (virtualHost.domains.indexOf(this.targetName) >= 0) {
const route = virtualHost.routes[virtualHost.routes.length - 1];
if (route.match?.prefix === '' && route.route?.cluster) {
trace('Reporting RDS update for host ' + this.targetName + ' with cluster ' + route.route.cluster);
this.watcher.onValidUpdate({
methodConfig: [],
loadBalancingConfig: [
@ -545,16 +561,20 @@ class RdsState implements XdsStreamState<RouteConfiguration__Output> {
},
],
});
break;
return;
} else {
trace('Discarded matching route with prefix ' + route.match?.prefix + ' and cluster ' + route.route?.cluster);
}
}
}
trace('Reporting RDS resource does not exist from domain lists ' + message.virtual_hosts.map(virtualHost => virtualHost.domains));
/* If none of the routes match the one we are looking for, bubble up an
* error. */
this.watcher.onResourceDoesNotExist();
}
handleResponses(responses: RouteConfiguration__Output[]): string | null {
trace('Received RDS response with route config names ' + responses.map(message => message.name));
if (this.routeConfigName !== null) {
for (const message of responses) {
if (message.name === this.routeConfigName) {
@ -613,6 +633,7 @@ class LdsState implements XdsStreamState<Listener__Output> {
}
handleResponses(responses: Listener__Output[]): string | null {
trace('Received LDS update with names ' + responses.map(message => message.name));
for (const message of responses) {
if (message.name === this.targetName) {
if (this.validateResponse(message)) {
@ -622,11 +643,13 @@ class LdsState implements XdsStreamState<Listener__Output> {
HttpConnectionManager__Output;
switch (httpConnectionManager.route_specifier) {
case 'rds':
trace('Received LDS update with RDS route config name ' + httpConnectionManager.rds!.route_config_name);
this.rdsState.setRouteConfigName(
httpConnectionManager.rds!.route_config_name
);
break;
case 'route_config':
trace('Received LDS update with route configuration');
this.rdsState.setRouteConfigName(null);
this.rdsState.handleSingleMessage(
httpConnectionManager.route_config!
@ -636,11 +659,12 @@ class LdsState implements XdsStreamState<Listener__Output> {
// The validation rules should prevent this
}
} else {
return 'Listener validation failed';
trace('LRS validation error for message ' + JSON.stringify(message));
return 'LRS Error: Listener validation failed';
}
}
}
throw new Error('Method not implemented.');
return null;
}
reportStreamError(status: StatusObject): void {
@ -676,11 +700,11 @@ function getResponseMessages<T extends AdsTypeUrl>(
result.push(resource as protoLoader.AnyExtension & OutputType<T>);
} else {
throw new Error(
`Invalid resource type ${
`ADS Error: Invalid resource type ${
protoLoader.isAnyExtension(resource)
? resource['@type']
: resource.type_url
}`
}, expected ${typeUrl}`
);
}
}
@ -710,6 +734,9 @@ export class XdsClient {
private adsState: AdsState;
private adsBackoff: BackoffTimeout;
private lrsBackoff: BackoffTimeout;
constructor(
targetName: string,
serviceConfigWatcher: Watcher<ServiceConfig>,
@ -721,7 +748,7 @@ export class XdsClient {
const cdsState = new CdsState(edsState, () => {
this.updateNames(CDS_TYPE_URL);
});
const rdsState = new RdsState(serviceConfigWatcher, () => {
const rdsState = new RdsState(targetName, serviceConfigWatcher, () => {
this.updateNames(RDS_TYPE_URL);
});
const ldsState = new LdsState(targetName, rdsState);
@ -750,7 +777,16 @@ export class XdsClient {
for (const arg of channelArgsToRemove) {
delete channelArgs[arg];
}
channelArgs['grpc.keepalive_time_ms'] = 5000;
// 5 minutes
channelArgs['grpc.keepalive_time_ms'] = 5 * 60 * 1000;
this.adsBackoff = new BackoffTimeout(() => {
this.maybeStartAdsStream();
});
this.lrsBackoff = new BackoffTimeout(() => {
this.maybeStartLrsStream();
})
Promise.all([loadBootstrapInfo(), loadAdsProtos()]).then(
([bootstrapInfo, protoDefinitions]) => {
if (this.hasShutdown) {
@ -769,6 +805,7 @@ export class XdsClient {
...node,
client_features: ['envoy.lrs.supports_send_all_clusters'],
};
trace('Starting xDS client connected to server URI ' + bootstrapInfo.xdsServers[0].serverUri);
this.adsClient = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(
bootstrapInfo.xdsServers[0].serverUri,
createGoogleDefaultCredentials(),
@ -779,7 +816,7 @@ export class XdsClient {
this.lrsClient = new protoDefinitions.envoy.service.load_stats.v2.LoadReportingService(
bootstrapInfo.xdsServers[0].serverUri,
createGoogleDefaultCredentials(),
channelArgs
{channelOverride: this.adsClient.getChannel()}
);
this.maybeStartLrsStream();
},
@ -827,6 +864,7 @@ export class XdsClient {
errorString = `Unknown type_url ${message.type_url}`;
}
if (errorString === null) {
trace('Acking message with type URL ' + message.type_url);
/* errorString can only be null in one of the first 4 cases, which
* implies that message.type_url is one of the 4 known type URLs, which
* means that this type assertion is valid. */
@ -835,6 +873,7 @@ export class XdsClient {
this.adsState[typeUrl].versionInfo = message.version_info;
this.ack(typeUrl);
} else {
trace('Nacking message with type URL ' + message.type_url + ': "' + errorString + '"');
this.nack(message.type_url, errorString);
}
}
@ -853,6 +892,9 @@ export class XdsClient {
if (this.hasShutdown) {
return;
}
trace('Starting ADS stream');
// Backoff relative to when we start the request
this.adsBackoff.runOnce();
this.adsCall = this.adsClient.StreamAggregatedResources();
this.adsCall.on('data', (message: DiscoveryResponse__Output) => {
this.handleAdsResponse(message);
@ -863,10 +905,11 @@ export class XdsClient {
);
this.adsCall = null;
this.reportStreamError(error);
/* Connection backoff is handled by the client object, so we can
* immediately start a new request to indicate that it should try to
* reconnect */
this.maybeStartAdsStream();
/* If the backoff timer is no longer running, we do not need to wait any
* more to start the new call. */
if (!this.adsBackoff.isRunning()) {
this.maybeStartAdsStream();
}
});
const allTypeUrls: AdsTypeUrl[] = [
@ -877,8 +920,6 @@ export class XdsClient {
];
for (const typeUrl of allTypeUrls) {
const state = this.adsState[typeUrl];
state.nonce = '';
state.versionInfo = '';
if (state.getResourceNames().length > 0) {
this.updateNames(typeUrl);
}
@ -890,6 +931,11 @@ export class XdsClient {
* version info are updated so that it sends the post-update values.
*/
ack(typeUrl: AdsTypeUrl) {
/* An ack is the best indication of a successful interaction between the
* client and the server, so we can reset the backoff timer here. */
this.adsBackoff.stop();
this.adsBackoff.reset();
this.updateNames(typeUrl);
}
@ -928,6 +974,7 @@ export class XdsClient {
}
private updateNames(typeUrl: AdsTypeUrl) {
trace('Sending update for type URL ' + typeUrl + ' with names ' + this.adsState[typeUrl].getResourceNames());
this.adsCall?.write({
node: this.adsNode!,
type_url: typeUrl,
@ -954,8 +1001,17 @@ export class XdsClient {
if (this.hasShutdown) {
return;
}
trace('Starting LRS stream');
this.lrsBackoff.runOnce();
this.lrsCall = this.lrsClient.streamLoadStats();
this.lrsCall.on('metadata', () => {
/* Once we get any response from the server, we assume that the stream is
* in a good state, so we can reset the backoff timer. */
this.lrsBackoff.stop();
this.lrsBackoff.reset();
});
this.lrsCall.on('data', (message: LoadStatsResponse__Output) => {
if (
message.load_reporting_interval?.seconds !==
@ -971,7 +1027,8 @@ export class XdsClient {
const loadReportingIntervalMs =
Number.parseInt(message.load_reporting_interval!.seconds) * 1000 +
message.load_reporting_interval!.nanos / 1_000_000;
setInterval(() => {
trace('Received LRS request with load reporting interval ' + loadReportingIntervalMs + ' ms');
this.statsTimer = setInterval(() => {
this.sendStats();
}, loadReportingIntervalMs);
}
@ -982,21 +1039,24 @@ export class XdsClient {
'LRS stream ended. code=' + error.code + ' details= ' + error.details
);
this.lrsCall = null;
this.latestLrsSettings = null;
clearInterval(this.statsTimer);
/* Connection backoff is handled by the client object, so we can
* immediately start a new request to indicate that it should try to
* reconnect */
this.maybeStartAdsStream();
});
this.lrsCall.write({
node: this.lrsNode!,
/* If the backoff timer is no longer running, we do not need to wait any
* more to start the new call. */
if (!this.lrsBackoff.isRunning()) {
this.maybeStartLrsStream();
}
});
/* Send buffered stats information when starting LRS stream. If there is no
* buffered stats information, it will still send the node field. */
this.sendStats();
}
private sendStats() {
if (!this.lrsCall) {
return;
}
trace('Sending LRS stats');
const clusterStats: ClusterStats[] = [];
for (const [
{ clusterName, edsServiceName },

View File

@ -10,6 +10,7 @@
},
"include": [
"src/**/*.ts",
"test/**/*.ts"
"test/**/*.ts",
"interop/**/*.ts"
]
}

View File

@ -0,0 +1,24 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Config file for Kokoro (in protobuf text format)
# Location of the continuous shell script in repository.
build_file: "grpc-node/packages/grpc-js/scripts/xds.sh"
timeout_mins: 60
action {
define_artifacts {
regex: "github/grpc/reports/**"
}
}