diff --git a/packages/grpc-js/interop/generated/grpc/testing/BoolValue.ts b/packages/grpc-js/interop/generated/grpc/testing/BoolValue.ts new file mode 100644 index 00000000..a1e31ab3 --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/BoolValue.ts @@ -0,0 +1,26 @@ +// Original file: proto/grpc/testing/messages.proto + + +/** + * TODO(dgq): Go back to using well-known types once + * https://github.com/grpc/grpc/issues/6980 has been fixed. + * import "google/protobuf/wrappers.proto"; + */ +export interface BoolValue { + /** + * The bool value. + */ + 'value'?: (boolean); +} + +/** + * TODO(dgq): Go back to using well-known types once + * https://github.com/grpc/grpc/issues/6980 has been fixed. + * import "google/protobuf/wrappers.proto"; + */ +export interface BoolValue__Output { + /** + * The bool value. + */ + 'value': (boolean); +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/EchoStatus.ts b/packages/grpc-js/interop/generated/grpc/testing/EchoStatus.ts new file mode 100644 index 00000000..d5da7501 --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/EchoStatus.ts @@ -0,0 +1,20 @@ +// Original file: proto/grpc/testing/messages.proto + + +/** + * A protobuf representation for grpc status. This is used by test + * clients to specify a status that the server should attempt to return. + */ +export interface EchoStatus { + 'code'?: (number); + 'message'?: (string); +} + +/** + * A protobuf representation for grpc status. This is used by test + * clients to specify a status that the server should attempt to return. + */ +export interface EchoStatus__Output { + 'code': (number); + 'message': (string); +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/Empty.ts b/packages/grpc-js/interop/generated/grpc/testing/Empty.ts new file mode 100644 index 00000000..d79db52b --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/Empty.ts @@ -0,0 +1,26 @@ +// Original file: proto/grpc/testing/empty.proto + + +/** + * An empty message that you can re-use to avoid defining duplicated empty + * messages in your project. A typical example is to use it as argument or the + * return value of a service API. For instance: + * + * service Foo { + * rpc Bar (grpc.testing.Empty) returns (grpc.testing.Empty) { }; + * }; + */ +export interface Empty { +} + +/** + * An empty message that you can re-use to avoid defining duplicated empty + * messages in your project. A typical example is to use it as argument or the + * return value of a service API. For instance: + * + * service Foo { + * rpc Bar (grpc.testing.Empty) returns (grpc.testing.Empty) { }; + * }; + */ +export interface Empty__Output { +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/GrpclbRouteType.ts b/packages/grpc-js/interop/generated/grpc/testing/GrpclbRouteType.ts new file mode 100644 index 00000000..8ab0146b --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/GrpclbRouteType.ts @@ -0,0 +1,24 @@ +// Original file: proto/grpc/testing/messages.proto + +/** + * The type of route that a client took to reach a server w.r.t. gRPCLB. + * The server must fill in "fallback" if it detects that the RPC reached + * the server via the "gRPCLB fallback" path, and "backend" if it detects + * that the RPC reached the server via "gRPCLB backend" path (i.e. if it got + * the address of this server from the gRPCLB server BalanceLoad RPC). Exactly + * how this detection is done is context and server dependent. + */ +export enum GrpclbRouteType { + /** + * Server didn't detect the route that a client took to reach it. + */ + GRPCLB_ROUTE_TYPE_UNKNOWN = 0, + /** + * Indicates that a client reached a server via gRPCLB fallback. + */ + GRPCLB_ROUTE_TYPE_FALLBACK = 1, + /** + * Indicates that a client reached a server as a gRPCLB-given backend. + */ + GRPCLB_ROUTE_TYPE_BACKEND = 2, +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/LoadBalancerStatsRequest.ts b/packages/grpc-js/interop/generated/grpc/testing/LoadBalancerStatsRequest.ts new file mode 100644 index 00000000..189d871b --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/LoadBalancerStatsRequest.ts @@ -0,0 +1,24 @@ +// Original file: proto/grpc/testing/messages.proto + + +export interface LoadBalancerStatsRequest { + /** + * Request stats for the next num_rpcs sent by client. + */ + 'num_rpcs'?: (number); + /** + * If num_rpcs have not completed within timeout_sec, return partial results. + */ + 'timeout_sec'?: (number); +} + +export interface LoadBalancerStatsRequest__Output { + /** + * Request stats for the next num_rpcs sent by client. + */ + 'num_rpcs': (number); + /** + * If num_rpcs have not completed within timeout_sec, return partial results. + */ + 'timeout_sec': (number); +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/LoadBalancerStatsResponse.ts b/packages/grpc-js/interop/generated/grpc/testing/LoadBalancerStatsResponse.ts new file mode 100644 index 00000000..184a6e25 --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/LoadBalancerStatsResponse.ts @@ -0,0 +1,40 @@ +// Original file: proto/grpc/testing/messages.proto + + +export interface _grpc_testing_LoadBalancerStatsResponse_RpcsByPeer { + /** + * The number of completed RPCs for each peer. + */ + 'rpcs_by_peer'?: ({[key: string]: number}); +} + +export interface _grpc_testing_LoadBalancerStatsResponse_RpcsByPeer__Output { + /** + * The number of completed RPCs for each peer. + */ + 'rpcs_by_peer': ({[key: string]: number}); +} + +export interface LoadBalancerStatsResponse { + /** + * The number of completed RPCs for each peer. + */ + 'rpcs_by_peer'?: ({[key: string]: number}); + /** + * The number of RPCs that failed to record a remote peer. + */ + 'num_failures'?: (number); + 'rpcs_by_method'?: ({[key: string]: _grpc_testing_LoadBalancerStatsResponse_RpcsByPeer}); +} + +export interface LoadBalancerStatsResponse__Output { + /** + * The number of completed RPCs for each peer. + */ + 'rpcs_by_peer': ({[key: string]: number}); + /** + * The number of RPCs that failed to record a remote peer. + */ + 'num_failures': (number); + 'rpcs_by_method'?: ({[key: string]: _grpc_testing_LoadBalancerStatsResponse_RpcsByPeer__Output}); +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/LoadBalancerStatsService.ts b/packages/grpc-js/interop/generated/grpc/testing/LoadBalancerStatsService.ts new file mode 100644 index 00000000..eece848b --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/LoadBalancerStatsService.ts @@ -0,0 +1,37 @@ +// Original file: proto/grpc/testing/test.proto + +import * as grpc from '../../../../src' +import { LoadBalancerStatsRequest as _grpc_testing_LoadBalancerStatsRequest, LoadBalancerStatsRequest__Output as _grpc_testing_LoadBalancerStatsRequest__Output } from '../../grpc/testing/LoadBalancerStatsRequest'; +import { LoadBalancerStatsResponse as _grpc_testing_LoadBalancerStatsResponse, LoadBalancerStatsResponse__Output as _grpc_testing_LoadBalancerStatsResponse__Output } from '../../grpc/testing/LoadBalancerStatsResponse'; + +/** + * A service used to obtain stats for verifying LB behavior. + */ +export interface LoadBalancerStatsServiceClient extends grpc.Client { + /** + * Gets the backend distribution for RPCs sent by a test client. + */ + GetClientStats(argument: _grpc_testing_LoadBalancerStatsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerStatsResponse__Output) => void): grpc.ClientUnaryCall; + GetClientStats(argument: _grpc_testing_LoadBalancerStatsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerStatsResponse__Output) => void): grpc.ClientUnaryCall; + GetClientStats(argument: _grpc_testing_LoadBalancerStatsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerStatsResponse__Output) => void): grpc.ClientUnaryCall; + GetClientStats(argument: _grpc_testing_LoadBalancerStatsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerStatsResponse__Output) => void): grpc.ClientUnaryCall; + /** + * Gets the backend distribution for RPCs sent by a test client. + */ + getClientStats(argument: _grpc_testing_LoadBalancerStatsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerStatsResponse__Output) => void): grpc.ClientUnaryCall; + getClientStats(argument: _grpc_testing_LoadBalancerStatsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerStatsResponse__Output) => void): grpc.ClientUnaryCall; + getClientStats(argument: _grpc_testing_LoadBalancerStatsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerStatsResponse__Output) => void): grpc.ClientUnaryCall; + getClientStats(argument: _grpc_testing_LoadBalancerStatsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_testing_LoadBalancerStatsResponse__Output) => void): grpc.ClientUnaryCall; + +} + +/** + * A service used to obtain stats for verifying LB behavior. + */ +export interface LoadBalancerStatsServiceHandlers extends grpc.UntypedServiceImplementation { + /** + * Gets the backend distribution for RPCs sent by a test client. + */ + GetClientStats(call: grpc.ServerUnaryCall<_grpc_testing_LoadBalancerStatsRequest__Output, _grpc_testing_LoadBalancerStatsResponse>, callback: grpc.sendUnaryData<_grpc_testing_LoadBalancerStatsResponse>): void; + +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/Payload.ts b/packages/grpc-js/interop/generated/grpc/testing/Payload.ts new file mode 100644 index 00000000..87fc0cf3 --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/Payload.ts @@ -0,0 +1,31 @@ +// Original file: proto/grpc/testing/messages.proto + +import { PayloadType as _grpc_testing_PayloadType } from '../../grpc/testing/PayloadType'; + +/** + * A block of data, to simply increase gRPC message size. + */ +export interface Payload { + /** + * The type of data in body. + */ + 'type'?: (_grpc_testing_PayloadType | keyof typeof _grpc_testing_PayloadType); + /** + * Primary contents of payload. + */ + 'body'?: (Buffer | Uint8Array | string); +} + +/** + * A block of data, to simply increase gRPC message size. + */ +export interface Payload__Output { + /** + * The type of data in body. + */ + 'type': (keyof typeof _grpc_testing_PayloadType); + /** + * Primary contents of payload. + */ + 'body': (Buffer); +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/PayloadType.ts b/packages/grpc-js/interop/generated/grpc/testing/PayloadType.ts new file mode 100644 index 00000000..3cf9d375 --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/PayloadType.ts @@ -0,0 +1,11 @@ +// Original file: proto/grpc/testing/messages.proto + +/** + * The type of payload that should be returned. + */ +export enum PayloadType { + /** + * Compressable text format. + */ + COMPRESSABLE = 0, +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/ReconnectInfo.ts b/packages/grpc-js/interop/generated/grpc/testing/ReconnectInfo.ts new file mode 100644 index 00000000..616de9eb --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/ReconnectInfo.ts @@ -0,0 +1,22 @@ +// Original file: proto/grpc/testing/messages.proto + + +/** + * For reconnect interop test only. + * Server tells client whether its reconnects are following the spec and the + * reconnect backoffs it saw. + */ +export interface ReconnectInfo { + 'passed'?: (boolean); + 'backoff_ms'?: (number)[]; +} + +/** + * For reconnect interop test only. + * Server tells client whether its reconnects are following the spec and the + * reconnect backoffs it saw. + */ +export interface ReconnectInfo__Output { + 'passed': (boolean); + 'backoff_ms': (number)[]; +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/ReconnectParams.ts b/packages/grpc-js/interop/generated/grpc/testing/ReconnectParams.ts new file mode 100644 index 00000000..1337b568 --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/ReconnectParams.ts @@ -0,0 +1,18 @@ +// Original file: proto/grpc/testing/messages.proto + + +/** + * For reconnect interop test only. + * Client tells server what reconnection parameters it used. + */ +export interface ReconnectParams { + 'max_reconnect_backoff_ms'?: (number); +} + +/** + * For reconnect interop test only. + * Client tells server what reconnection parameters it used. + */ +export interface ReconnectParams__Output { + 'max_reconnect_backoff_ms': (number); +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/ReconnectService.ts b/packages/grpc-js/interop/generated/grpc/testing/ReconnectService.ts new file mode 100644 index 00000000..3829506b --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/ReconnectService.ts @@ -0,0 +1,40 @@ +// Original file: proto/grpc/testing/test.proto + +import * as grpc from '../../../../src' +import { Empty as _grpc_testing_Empty, Empty__Output as _grpc_testing_Empty__Output } from '../../grpc/testing/Empty'; +import { ReconnectInfo as _grpc_testing_ReconnectInfo, ReconnectInfo__Output as _grpc_testing_ReconnectInfo__Output } from '../../grpc/testing/ReconnectInfo'; +import { ReconnectParams as _grpc_testing_ReconnectParams, ReconnectParams__Output as _grpc_testing_ReconnectParams__Output } from '../../grpc/testing/ReconnectParams'; + +/** + * A service used to control reconnect server. + */ +export interface ReconnectServiceClient extends grpc.Client { + Start(argument: _grpc_testing_ReconnectParams, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + Start(argument: _grpc_testing_ReconnectParams, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + Start(argument: _grpc_testing_ReconnectParams, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + Start(argument: _grpc_testing_ReconnectParams, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + start(argument: _grpc_testing_ReconnectParams, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + start(argument: _grpc_testing_ReconnectParams, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + start(argument: _grpc_testing_ReconnectParams, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + start(argument: _grpc_testing_ReconnectParams, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + + Stop(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ReconnectInfo__Output) => void): grpc.ClientUnaryCall; + Stop(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ReconnectInfo__Output) => void): grpc.ClientUnaryCall; + Stop(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ReconnectInfo__Output) => void): grpc.ClientUnaryCall; + Stop(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ReconnectInfo__Output) => void): grpc.ClientUnaryCall; + stop(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ReconnectInfo__Output) => void): grpc.ClientUnaryCall; + stop(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ReconnectInfo__Output) => void): grpc.ClientUnaryCall; + stop(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ReconnectInfo__Output) => void): grpc.ClientUnaryCall; + stop(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_ReconnectInfo__Output) => void): grpc.ClientUnaryCall; + +} + +/** + * A service used to control reconnect server. + */ +export interface ReconnectServiceHandlers extends grpc.UntypedServiceImplementation { + Start(call: grpc.ServerUnaryCall<_grpc_testing_ReconnectParams__Output, _grpc_testing_Empty>, callback: grpc.sendUnaryData<_grpc_testing_Empty>): void; + + Stop(call: grpc.ServerUnaryCall<_grpc_testing_Empty__Output, _grpc_testing_ReconnectInfo>, callback: grpc.sendUnaryData<_grpc_testing_ReconnectInfo>): void; + +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/ResponseParameters.ts b/packages/grpc-js/interop/generated/grpc/testing/ResponseParameters.ts new file mode 100644 index 00000000..9bd24ee3 --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/ResponseParameters.ts @@ -0,0 +1,47 @@ +// Original file: proto/grpc/testing/messages.proto + +import { BoolValue as _grpc_testing_BoolValue, BoolValue__Output as _grpc_testing_BoolValue__Output } from '../../grpc/testing/BoolValue'; + +/** + * Configuration for a particular response. + */ +export interface ResponseParameters { + /** + * Desired payload sizes in responses from the server. + */ + 'size'?: (number); + /** + * Desired interval between consecutive responses in the response stream in + * microseconds. + */ + 'interval_us'?: (number); + /** + * Whether to request the server to compress the response. This field is + * "nullable" in order to interoperate seamlessly with clients not able to + * implement the full compression tests by introspecting the call to verify + * the response's compression status. + */ + 'compressed'?: (_grpc_testing_BoolValue); +} + +/** + * Configuration for a particular response. + */ +export interface ResponseParameters__Output { + /** + * Desired payload sizes in responses from the server. + */ + 'size': (number); + /** + * Desired interval between consecutive responses in the response stream in + * microseconds. + */ + 'interval_us': (number); + /** + * Whether to request the server to compress the response. This field is + * "nullable" in order to interoperate seamlessly with clients not able to + * implement the full compression tests by introspecting the call to verify + * the response's compression status. + */ + 'compressed'?: (_grpc_testing_BoolValue__Output); +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/SimpleRequest.ts b/packages/grpc-js/interop/generated/grpc/testing/SimpleRequest.ts new file mode 100644 index 00000000..b03f6f6d --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/SimpleRequest.ts @@ -0,0 +1,106 @@ +// Original file: proto/grpc/testing/messages.proto + +import { PayloadType as _grpc_testing_PayloadType } from '../../grpc/testing/PayloadType'; +import { Payload as _grpc_testing_Payload, Payload__Output as _grpc_testing_Payload__Output } from '../../grpc/testing/Payload'; +import { BoolValue as _grpc_testing_BoolValue, BoolValue__Output as _grpc_testing_BoolValue__Output } from '../../grpc/testing/BoolValue'; +import { EchoStatus as _grpc_testing_EchoStatus, EchoStatus__Output as _grpc_testing_EchoStatus__Output } from '../../grpc/testing/EchoStatus'; + +/** + * Unary request. + */ +export interface SimpleRequest { + /** + * Desired payload type in the response from the server. + * If response_type is RANDOM, server randomly chooses one from other formats. + */ + 'response_type'?: (_grpc_testing_PayloadType | keyof typeof _grpc_testing_PayloadType); + /** + * Desired payload size in the response from the server. + */ + 'response_size'?: (number); + /** + * Optional input payload sent along with the request. + */ + 'payload'?: (_grpc_testing_Payload); + /** + * Whether SimpleResponse should include username. + */ + 'fill_username'?: (boolean); + /** + * Whether SimpleResponse should include OAuth scope. + */ + 'fill_oauth_scope'?: (boolean); + /** + * Whether to request the server to compress the response. This field is + * "nullable" in order to interoperate seamlessly with clients not able to + * implement the full compression tests by introspecting the call to verify + * the response's compression status. + */ + 'response_compressed'?: (_grpc_testing_BoolValue); + /** + * Whether server should return a given status + */ + 'response_status'?: (_grpc_testing_EchoStatus); + /** + * Whether the server should expect this request to be compressed. + */ + 'expect_compressed'?: (_grpc_testing_BoolValue); + /** + * Whether SimpleResponse should include server_id. + */ + 'fill_server_id'?: (boolean); + /** + * Whether SimpleResponse should include grpclb_route_type. + */ + 'fill_grpclb_route_type'?: (boolean); +} + +/** + * Unary request. + */ +export interface SimpleRequest__Output { + /** + * Desired payload type in the response from the server. + * If response_type is RANDOM, server randomly chooses one from other formats. + */ + 'response_type': (keyof typeof _grpc_testing_PayloadType); + /** + * Desired payload size in the response from the server. + */ + 'response_size': (number); + /** + * Optional input payload sent along with the request. + */ + 'payload'?: (_grpc_testing_Payload__Output); + /** + * Whether SimpleResponse should include username. + */ + 'fill_username': (boolean); + /** + * Whether SimpleResponse should include OAuth scope. + */ + 'fill_oauth_scope': (boolean); + /** + * Whether to request the server to compress the response. This field is + * "nullable" in order to interoperate seamlessly with clients not able to + * implement the full compression tests by introspecting the call to verify + * the response's compression status. + */ + 'response_compressed'?: (_grpc_testing_BoolValue__Output); + /** + * Whether server should return a given status + */ + 'response_status'?: (_grpc_testing_EchoStatus__Output); + /** + * Whether the server should expect this request to be compressed. + */ + 'expect_compressed'?: (_grpc_testing_BoolValue__Output); + /** + * Whether SimpleResponse should include server_id. + */ + 'fill_server_id': (boolean); + /** + * Whether SimpleResponse should include grpclb_route_type. + */ + 'fill_grpclb_route_type': (boolean); +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/SimpleResponse.ts b/packages/grpc-js/interop/generated/grpc/testing/SimpleResponse.ts new file mode 100644 index 00000000..7a96e7df --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/SimpleResponse.ts @@ -0,0 +1,68 @@ +// Original file: proto/grpc/testing/messages.proto + +import { Payload as _grpc_testing_Payload, Payload__Output as _grpc_testing_Payload__Output } from '../../grpc/testing/Payload'; +import { GrpclbRouteType as _grpc_testing_GrpclbRouteType } from '../../grpc/testing/GrpclbRouteType'; + +/** + * Unary response, as configured by the request. + */ +export interface SimpleResponse { + /** + * Payload to increase message size. + */ + 'payload'?: (_grpc_testing_Payload); + /** + * The user the request came from, for verifying authentication was + * successful when the client expected it. + */ + 'username'?: (string); + /** + * OAuth scope. + */ + 'oauth_scope'?: (string); + /** + * Server ID. This must be unique among different server instances, + * but the same across all RPC's made to a particular server instance. + */ + 'server_id'?: (string); + /** + * gRPCLB Path. + */ + 'grpclb_route_type'?: (_grpc_testing_GrpclbRouteType | keyof typeof _grpc_testing_GrpclbRouteType); + /** + * Server hostname. + */ + 'hostname'?: (string); +} + +/** + * Unary response, as configured by the request. + */ +export interface SimpleResponse__Output { + /** + * Payload to increase message size. + */ + 'payload'?: (_grpc_testing_Payload__Output); + /** + * The user the request came from, for verifying authentication was + * successful when the client expected it. + */ + 'username': (string); + /** + * OAuth scope. + */ + 'oauth_scope': (string); + /** + * Server ID. This must be unique among different server instances, + * but the same across all RPC's made to a particular server instance. + */ + 'server_id': (string); + /** + * gRPCLB Path. + */ + 'grpclb_route_type': (keyof typeof _grpc_testing_GrpclbRouteType); + /** + * Server hostname. + */ + 'hostname': (string); +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/StreamingInputCallRequest.ts b/packages/grpc-js/interop/generated/grpc/testing/StreamingInputCallRequest.ts new file mode 100644 index 00000000..db9d8d40 --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/StreamingInputCallRequest.ts @@ -0,0 +1,38 @@ +// Original file: proto/grpc/testing/messages.proto + +import { Payload as _grpc_testing_Payload, Payload__Output as _grpc_testing_Payload__Output } from '../../grpc/testing/Payload'; +import { BoolValue as _grpc_testing_BoolValue, BoolValue__Output as _grpc_testing_BoolValue__Output } from '../../grpc/testing/BoolValue'; + +/** + * Client-streaming request. + */ +export interface StreamingInputCallRequest { + /** + * Optional input payload sent along with the request. + */ + 'payload'?: (_grpc_testing_Payload); + /** + * Whether the server should expect this request to be compressed. This field + * is "nullable" in order to interoperate seamlessly with servers not able to + * implement the full compression tests by introspecting the call to verify + * the request's compression status. + */ + 'expect_compressed'?: (_grpc_testing_BoolValue); +} + +/** + * Client-streaming request. + */ +export interface StreamingInputCallRequest__Output { + /** + * Optional input payload sent along with the request. + */ + 'payload'?: (_grpc_testing_Payload__Output); + /** + * Whether the server should expect this request to be compressed. This field + * is "nullable" in order to interoperate seamlessly with servers not able to + * implement the full compression tests by introspecting the call to verify + * the request's compression status. + */ + 'expect_compressed'?: (_grpc_testing_BoolValue__Output); +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/StreamingInputCallResponse.ts b/packages/grpc-js/interop/generated/grpc/testing/StreamingInputCallResponse.ts new file mode 100644 index 00000000..1703e755 --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/StreamingInputCallResponse.ts @@ -0,0 +1,22 @@ +// Original file: proto/grpc/testing/messages.proto + + +/** + * Client-streaming response. + */ +export interface StreamingInputCallResponse { + /** + * Aggregated size of payloads received from the client. + */ + 'aggregated_payload_size'?: (number); +} + +/** + * Client-streaming response. + */ +export interface StreamingInputCallResponse__Output { + /** + * Aggregated size of payloads received from the client. + */ + 'aggregated_payload_size': (number); +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/StreamingOutputCallRequest.ts b/packages/grpc-js/interop/generated/grpc/testing/StreamingOutputCallRequest.ts new file mode 100644 index 00000000..0d7bff2f --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/StreamingOutputCallRequest.ts @@ -0,0 +1,56 @@ +// Original file: proto/grpc/testing/messages.proto + +import { PayloadType as _grpc_testing_PayloadType } from '../../grpc/testing/PayloadType'; +import { ResponseParameters as _grpc_testing_ResponseParameters, ResponseParameters__Output as _grpc_testing_ResponseParameters__Output } from '../../grpc/testing/ResponseParameters'; +import { Payload as _grpc_testing_Payload, Payload__Output as _grpc_testing_Payload__Output } from '../../grpc/testing/Payload'; +import { EchoStatus as _grpc_testing_EchoStatus, EchoStatus__Output as _grpc_testing_EchoStatus__Output } from '../../grpc/testing/EchoStatus'; + +/** + * Server-streaming request. + */ +export interface StreamingOutputCallRequest { + /** + * Desired payload type in the response from the server. + * If response_type is RANDOM, the payload from each response in the stream + * might be of different types. This is to simulate a mixed type of payload + * stream. + */ + 'response_type'?: (_grpc_testing_PayloadType | keyof typeof _grpc_testing_PayloadType); + /** + * Configuration for each expected response message. + */ + 'response_parameters'?: (_grpc_testing_ResponseParameters)[]; + /** + * Optional input payload sent along with the request. + */ + 'payload'?: (_grpc_testing_Payload); + /** + * Whether server should return a given status + */ + 'response_status'?: (_grpc_testing_EchoStatus); +} + +/** + * Server-streaming request. + */ +export interface StreamingOutputCallRequest__Output { + /** + * Desired payload type in the response from the server. + * If response_type is RANDOM, the payload from each response in the stream + * might be of different types. This is to simulate a mixed type of payload + * stream. + */ + 'response_type': (keyof typeof _grpc_testing_PayloadType); + /** + * Configuration for each expected response message. + */ + 'response_parameters': (_grpc_testing_ResponseParameters__Output)[]; + /** + * Optional input payload sent along with the request. + */ + 'payload'?: (_grpc_testing_Payload__Output); + /** + * Whether server should return a given status + */ + 'response_status'?: (_grpc_testing_EchoStatus__Output); +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/StreamingOutputCallResponse.ts b/packages/grpc-js/interop/generated/grpc/testing/StreamingOutputCallResponse.ts new file mode 100644 index 00000000..9b8f49e3 --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/StreamingOutputCallResponse.ts @@ -0,0 +1,23 @@ +// Original file: proto/grpc/testing/messages.proto + +import { Payload as _grpc_testing_Payload, Payload__Output as _grpc_testing_Payload__Output } from '../../grpc/testing/Payload'; + +/** + * Server-streaming response, as configured by the request and parameters. + */ +export interface StreamingOutputCallResponse { + /** + * Payload to increase response size. + */ + 'payload'?: (_grpc_testing_Payload); +} + +/** + * Server-streaming response, as configured by the request and parameters. + */ +export interface StreamingOutputCallResponse__Output { + /** + * Payload to increase response size. + */ + 'payload'?: (_grpc_testing_Payload__Output); +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/TestService.ts b/packages/grpc-js/interop/generated/grpc/testing/TestService.ts new file mode 100644 index 00000000..b95b7a97 --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/TestService.ts @@ -0,0 +1,202 @@ +// Original file: proto/grpc/testing/test.proto + +import * as grpc from '../../../../src' +import { Empty as _grpc_testing_Empty, Empty__Output as _grpc_testing_Empty__Output } from '../../grpc/testing/Empty'; +import { SimpleRequest as _grpc_testing_SimpleRequest, SimpleRequest__Output as _grpc_testing_SimpleRequest__Output } from '../../grpc/testing/SimpleRequest'; +import { SimpleResponse as _grpc_testing_SimpleResponse, SimpleResponse__Output as _grpc_testing_SimpleResponse__Output } from '../../grpc/testing/SimpleResponse'; +import { StreamingInputCallRequest as _grpc_testing_StreamingInputCallRequest, StreamingInputCallRequest__Output as _grpc_testing_StreamingInputCallRequest__Output } from '../../grpc/testing/StreamingInputCallRequest'; +import { StreamingInputCallResponse as _grpc_testing_StreamingInputCallResponse, StreamingInputCallResponse__Output as _grpc_testing_StreamingInputCallResponse__Output } from '../../grpc/testing/StreamingInputCallResponse'; +import { StreamingOutputCallRequest as _grpc_testing_StreamingOutputCallRequest, StreamingOutputCallRequest__Output as _grpc_testing_StreamingOutputCallRequest__Output } from '../../grpc/testing/StreamingOutputCallRequest'; +import { StreamingOutputCallResponse as _grpc_testing_StreamingOutputCallResponse, StreamingOutputCallResponse__Output as _grpc_testing_StreamingOutputCallResponse__Output } from '../../grpc/testing/StreamingOutputCallResponse'; + +/** + * A simple service to test the various types of RPCs and experiment with + * performance with various types of payload. + */ +export interface TestServiceClient extends grpc.Client { + /** + * One request followed by one response. Response has cache control + * headers set such that a caching HTTP proxy (such as GFE) can + * satisfy subsequent requests. + */ + CacheableUnaryCall(argument: _grpc_testing_SimpleRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall; + CacheableUnaryCall(argument: _grpc_testing_SimpleRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall; + CacheableUnaryCall(argument: _grpc_testing_SimpleRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall; + CacheableUnaryCall(argument: _grpc_testing_SimpleRequest, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall; + /** + * One request followed by one response. Response has cache control + * headers set such that a caching HTTP proxy (such as GFE) can + * satisfy subsequent requests. + */ + cacheableUnaryCall(argument: _grpc_testing_SimpleRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall; + cacheableUnaryCall(argument: _grpc_testing_SimpleRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall; + cacheableUnaryCall(argument: _grpc_testing_SimpleRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall; + cacheableUnaryCall(argument: _grpc_testing_SimpleRequest, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall; + + /** + * One empty request followed by one empty response. + */ + EmptyCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + EmptyCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + EmptyCall(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + EmptyCall(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + /** + * One empty request followed by one empty response. + */ + emptyCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + emptyCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + emptyCall(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + emptyCall(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + + /** + * A sequence of requests with each request served by the server immediately. + * As one request could lead to multiple responses, this interface + * demonstrates the idea of full duplexing. + */ + FullDuplexCall(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_grpc_testing_StreamingOutputCallRequest, _grpc_testing_StreamingOutputCallResponse__Output>; + FullDuplexCall(options?: grpc.CallOptions): grpc.ClientDuplexStream<_grpc_testing_StreamingOutputCallRequest, _grpc_testing_StreamingOutputCallResponse__Output>; + /** + * A sequence of requests with each request served by the server immediately. + * As one request could lead to multiple responses, this interface + * demonstrates the idea of full duplexing. + */ + fullDuplexCall(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_grpc_testing_StreamingOutputCallRequest, _grpc_testing_StreamingOutputCallResponse__Output>; + fullDuplexCall(options?: grpc.CallOptions): grpc.ClientDuplexStream<_grpc_testing_StreamingOutputCallRequest, _grpc_testing_StreamingOutputCallResponse__Output>; + + /** + * A sequence of requests followed by a sequence of responses. + * The server buffers all the client requests and then serves them in order. A + * stream of responses are returned to the client when the server starts with + * first request. + */ + HalfDuplexCall(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_grpc_testing_StreamingOutputCallRequest, _grpc_testing_StreamingOutputCallResponse__Output>; + HalfDuplexCall(options?: grpc.CallOptions): grpc.ClientDuplexStream<_grpc_testing_StreamingOutputCallRequest, _grpc_testing_StreamingOutputCallResponse__Output>; + /** + * A sequence of requests followed by a sequence of responses. + * The server buffers all the client requests and then serves them in order. A + * stream of responses are returned to the client when the server starts with + * first request. + */ + halfDuplexCall(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_grpc_testing_StreamingOutputCallRequest, _grpc_testing_StreamingOutputCallResponse__Output>; + halfDuplexCall(options?: grpc.CallOptions): grpc.ClientDuplexStream<_grpc_testing_StreamingOutputCallRequest, _grpc_testing_StreamingOutputCallResponse__Output>; + + /** + * A sequence of requests followed by one response (streamed upload). + * The server returns the aggregated size of client payload as the result. + */ + StreamingInputCall(metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_StreamingInputCallResponse__Output) => void): grpc.ClientWritableStream<_grpc_testing_StreamingInputCallResponse__Output>; + StreamingInputCall(metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_StreamingInputCallResponse__Output) => void): grpc.ClientWritableStream<_grpc_testing_StreamingInputCallResponse__Output>; + StreamingInputCall(options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_StreamingInputCallResponse__Output) => void): grpc.ClientWritableStream<_grpc_testing_StreamingInputCallResponse__Output>; + StreamingInputCall(callback: (error?: grpc.ServiceError, result?: _grpc_testing_StreamingInputCallResponse__Output) => void): grpc.ClientWritableStream<_grpc_testing_StreamingInputCallResponse__Output>; + /** + * A sequence of requests followed by one response (streamed upload). + * The server returns the aggregated size of client payload as the result. + */ + streamingInputCall(metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_StreamingInputCallResponse__Output) => void): grpc.ClientWritableStream<_grpc_testing_StreamingInputCallResponse__Output>; + streamingInputCall(metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_StreamingInputCallResponse__Output) => void): grpc.ClientWritableStream<_grpc_testing_StreamingInputCallResponse__Output>; + streamingInputCall(options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_StreamingInputCallResponse__Output) => void): grpc.ClientWritableStream<_grpc_testing_StreamingInputCallResponse__Output>; + streamingInputCall(callback: (error?: grpc.ServiceError, result?: _grpc_testing_StreamingInputCallResponse__Output) => void): grpc.ClientWritableStream<_grpc_testing_StreamingInputCallResponse__Output>; + + /** + * One request followed by a sequence of responses (streamed download). + * The server returns the payload with client desired type and sizes. + */ + StreamingOutputCall(argument: _grpc_testing_StreamingOutputCallRequest, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_grpc_testing_StreamingOutputCallResponse__Output>; + StreamingOutputCall(argument: _grpc_testing_StreamingOutputCallRequest, options?: grpc.CallOptions): grpc.ClientReadableStream<_grpc_testing_StreamingOutputCallResponse__Output>; + /** + * One request followed by a sequence of responses (streamed download). + * The server returns the payload with client desired type and sizes. + */ + streamingOutputCall(argument: _grpc_testing_StreamingOutputCallRequest, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_grpc_testing_StreamingOutputCallResponse__Output>; + streamingOutputCall(argument: _grpc_testing_StreamingOutputCallRequest, options?: grpc.CallOptions): grpc.ClientReadableStream<_grpc_testing_StreamingOutputCallResponse__Output>; + + /** + * One request followed by one response. + */ + UnaryCall(argument: _grpc_testing_SimpleRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall; + UnaryCall(argument: _grpc_testing_SimpleRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall; + UnaryCall(argument: _grpc_testing_SimpleRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall; + UnaryCall(argument: _grpc_testing_SimpleRequest, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall; + /** + * One request followed by one response. + */ + unaryCall(argument: _grpc_testing_SimpleRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall; + unaryCall(argument: _grpc_testing_SimpleRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall; + unaryCall(argument: _grpc_testing_SimpleRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall; + unaryCall(argument: _grpc_testing_SimpleRequest, callback: (error?: grpc.ServiceError, result?: _grpc_testing_SimpleResponse__Output) => void): grpc.ClientUnaryCall; + + /** + * The test server will not implement this method. It will be used + * to test the behavior when clients call unimplemented methods. + */ + UnimplementedCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + UnimplementedCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + UnimplementedCall(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + UnimplementedCall(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + /** + * The test server will not implement this method. It will be used + * to test the behavior when clients call unimplemented methods. + */ + unimplementedCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + unimplementedCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + unimplementedCall(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + unimplementedCall(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + +} + +/** + * A simple service to test the various types of RPCs and experiment with + * performance with various types of payload. + */ +export interface TestServiceHandlers extends grpc.UntypedServiceImplementation { + /** + * One request followed by one response. Response has cache control + * headers set such that a caching HTTP proxy (such as GFE) can + * satisfy subsequent requests. + */ + CacheableUnaryCall(call: grpc.ServerUnaryCall<_grpc_testing_SimpleRequest__Output, _grpc_testing_SimpleResponse>, callback: grpc.sendUnaryData<_grpc_testing_SimpleResponse>): void; + + /** + * One empty request followed by one empty response. + */ + EmptyCall(call: grpc.ServerUnaryCall<_grpc_testing_Empty__Output, _grpc_testing_Empty>, callback: grpc.sendUnaryData<_grpc_testing_Empty>): void; + + /** + * A sequence of requests with each request served by the server immediately. + * As one request could lead to multiple responses, this interface + * demonstrates the idea of full duplexing. + */ + FullDuplexCall(call: grpc.ServerDuplexStream<_grpc_testing_StreamingOutputCallRequest__Output, _grpc_testing_StreamingOutputCallResponse>): void; + + /** + * A sequence of requests followed by a sequence of responses. + * The server buffers all the client requests and then serves them in order. A + * stream of responses are returned to the client when the server starts with + * first request. + */ + HalfDuplexCall(call: grpc.ServerDuplexStream<_grpc_testing_StreamingOutputCallRequest__Output, _grpc_testing_StreamingOutputCallResponse>): void; + + /** + * A sequence of requests followed by one response (streamed upload). + * The server returns the aggregated size of client payload as the result. + */ + StreamingInputCall(call: grpc.ServerReadableStream<_grpc_testing_StreamingInputCallRequest__Output, _grpc_testing_StreamingInputCallResponse>, callback: grpc.sendUnaryData<_grpc_testing_StreamingInputCallResponse>): void; + + /** + * One request followed by a sequence of responses (streamed download). + * The server returns the payload with client desired type and sizes. + */ + StreamingOutputCall(call: grpc.ServerWritableStream<_grpc_testing_StreamingOutputCallRequest__Output, _grpc_testing_StreamingOutputCallResponse>): void; + + /** + * One request followed by one response. + */ + UnaryCall(call: grpc.ServerUnaryCall<_grpc_testing_SimpleRequest__Output, _grpc_testing_SimpleResponse>, callback: grpc.sendUnaryData<_grpc_testing_SimpleResponse>): void; + + /** + * The test server will not implement this method. It will be used + * to test the behavior when clients call unimplemented methods. + */ + UnimplementedCall(call: grpc.ServerUnaryCall<_grpc_testing_Empty__Output, _grpc_testing_Empty>, callback: grpc.sendUnaryData<_grpc_testing_Empty>): void; + +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/UnimplementedService.ts b/packages/grpc-js/interop/generated/grpc/testing/UnimplementedService.ts new file mode 100644 index 00000000..afbf9117 --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/UnimplementedService.ts @@ -0,0 +1,38 @@ +// Original file: proto/grpc/testing/test.proto + +import * as grpc from '../../../../src' +import { Empty as _grpc_testing_Empty, Empty__Output as _grpc_testing_Empty__Output } from '../../grpc/testing/Empty'; + +/** + * A simple service NOT implemented at servers so clients can test for + * that case. + */ +export interface UnimplementedServiceClient extends grpc.Client { + /** + * A call that no server should implement + */ + UnimplementedCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + UnimplementedCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + UnimplementedCall(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + UnimplementedCall(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + /** + * A call that no server should implement + */ + unimplementedCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + unimplementedCall(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + unimplementedCall(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + unimplementedCall(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + +} + +/** + * A simple service NOT implemented at servers so clients can test for + * that case. + */ +export interface UnimplementedServiceHandlers extends grpc.UntypedServiceImplementation { + /** + * A call that no server should implement + */ + UnimplementedCall(call: grpc.ServerUnaryCall<_grpc_testing_Empty__Output, _grpc_testing_Empty>, callback: grpc.sendUnaryData<_grpc_testing_Empty>): void; + +} diff --git a/packages/grpc-js/interop/generated/grpc/testing/XdsUpdateHealthService.ts b/packages/grpc-js/interop/generated/grpc/testing/XdsUpdateHealthService.ts new file mode 100644 index 00000000..f27a461e --- /dev/null +++ b/packages/grpc-js/interop/generated/grpc/testing/XdsUpdateHealthService.ts @@ -0,0 +1,38 @@ +// Original file: proto/grpc/testing/test.proto + +import * as grpc from '../../../../src' +import { Empty as _grpc_testing_Empty, Empty__Output as _grpc_testing_Empty__Output } from '../../grpc/testing/Empty'; + +/** + * A service to remotely control health status of an xDS test server. + */ +export interface XdsUpdateHealthServiceClient extends grpc.Client { + SetNotServing(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + SetNotServing(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + SetNotServing(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + SetNotServing(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + setNotServing(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + setNotServing(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + setNotServing(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + setNotServing(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + + SetServing(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + SetServing(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + SetServing(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + SetServing(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + setServing(argument: _grpc_testing_Empty, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + setServing(argument: _grpc_testing_Empty, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + setServing(argument: _grpc_testing_Empty, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + setServing(argument: _grpc_testing_Empty, callback: (error?: grpc.ServiceError, result?: _grpc_testing_Empty__Output) => void): grpc.ClientUnaryCall; + +} + +/** + * A service to remotely control health status of an xDS test server. + */ +export interface XdsUpdateHealthServiceHandlers extends grpc.UntypedServiceImplementation { + SetNotServing(call: grpc.ServerUnaryCall<_grpc_testing_Empty__Output, _grpc_testing_Empty>, callback: grpc.sendUnaryData<_grpc_testing_Empty>): void; + + SetServing(call: grpc.ServerUnaryCall<_grpc_testing_Empty__Output, _grpc_testing_Empty>, callback: grpc.sendUnaryData<_grpc_testing_Empty>): void; + +} diff --git a/packages/grpc-js/interop/generated/test.ts b/packages/grpc-js/interop/generated/test.ts new file mode 100644 index 00000000..a5c95d95 --- /dev/null +++ b/packages/grpc-js/interop/generated/test.ts @@ -0,0 +1,60 @@ +import * as grpc from '../../src'; +import { ServiceDefinition, EnumTypeDefinition, MessageTypeDefinition } from '@grpc/proto-loader'; + +import { LoadBalancerStatsServiceClient as _grpc_testing_LoadBalancerStatsServiceClient } from './grpc/testing/LoadBalancerStatsService'; +import { ReconnectServiceClient as _grpc_testing_ReconnectServiceClient } from './grpc/testing/ReconnectService'; +import { TestServiceClient as _grpc_testing_TestServiceClient } from './grpc/testing/TestService'; +import { UnimplementedServiceClient as _grpc_testing_UnimplementedServiceClient } from './grpc/testing/UnimplementedService'; +import { XdsUpdateHealthServiceClient as _grpc_testing_XdsUpdateHealthServiceClient } from './grpc/testing/XdsUpdateHealthService'; + +type ConstructorArguments = Constructor extends new (...args: infer Args) => any ? Args: never; +type SubtypeConstructor = { + new(...args: ConstructorArguments): Subtype; +}; + +export interface ProtoGrpcType { + grpc: { + testing: { + BoolValue: MessageTypeDefinition + EchoStatus: MessageTypeDefinition + Empty: MessageTypeDefinition + GrpclbRouteType: EnumTypeDefinition + LoadBalancerStatsRequest: MessageTypeDefinition + LoadBalancerStatsResponse: MessageTypeDefinition + /** + * A service used to obtain stats for verifying LB behavior. + */ + LoadBalancerStatsService: SubtypeConstructor & { service: ServiceDefinition } + Payload: MessageTypeDefinition + PayloadType: EnumTypeDefinition + ReconnectInfo: MessageTypeDefinition + ReconnectParams: MessageTypeDefinition + /** + * A service used to control reconnect server. + */ + ReconnectService: SubtypeConstructor & { service: ServiceDefinition } + ResponseParameters: MessageTypeDefinition + SimpleRequest: MessageTypeDefinition + SimpleResponse: MessageTypeDefinition + StreamingInputCallRequest: MessageTypeDefinition + StreamingInputCallResponse: MessageTypeDefinition + StreamingOutputCallRequest: MessageTypeDefinition + StreamingOutputCallResponse: MessageTypeDefinition + /** + * A simple service to test the various types of RPCs and experiment with + * performance with various types of payload. + */ + TestService: SubtypeConstructor & { service: ServiceDefinition } + /** + * A simple service NOT implemented at servers so clients can test for + * that case. + */ + UnimplementedService: SubtypeConstructor & { service: ServiceDefinition } + /** + * A service to remotely control health status of an xDS test server. + */ + XdsUpdateHealthService: SubtypeConstructor & { service: ServiceDefinition } + } + } +} + diff --git a/packages/grpc-js/interop/xds-interop-client.ts b/packages/grpc-js/interop/xds-interop-client.ts new file mode 100644 index 00000000..3009541c --- /dev/null +++ b/packages/grpc-js/interop/xds-interop-client.ts @@ -0,0 +1,249 @@ +/* + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as grpc from '../src'; + +import { ProtoGrpcType } from './generated/test'; + +import * as protoLoader from '@grpc/proto-loader'; +import { TestServiceClient } from './generated/grpc/testing/TestService'; +import { LoadBalancerStatsResponse } from './generated/grpc/testing/LoadBalancerStatsResponse'; +import * as yargs from 'yargs'; +import { LoadBalancerStatsServiceHandlers } from './generated/grpc/testing/LoadBalancerStatsService'; + +const packageDefinition = protoLoader.loadSync('grpc/testing/test.proto', { + keepCase: true, + defaults: true, + oneofs: true, + json: true, + longs: String, + enums: String, + includeDirs: [__dirname + '/../../proto'] +}); + +const loadedProto = grpc.loadPackageDefinition(packageDefinition) as unknown as ProtoGrpcType; + +const REQUEST_TIMEOUT_SEC = 20; + +const VERBOSITY = Number.parseInt(process.env.NODE_XDS_INTEROP_VERBOSITY ?? '0'); + +interface CallEndNotifier { + onCallSucceeded(peerName: string): void; + onCallFailed(message: string): void; +} + +class CallSubscriber { + private callsStarted = 0; + private callsSucceededByPeer: {[key: string]: number} = {}; + private callsSucceeded = 0; + private callsFinished = 0; + private failureMessageCount: Map = new Map(); + + constructor(private callGoal: number, private onFinished: () => void) {} + + addCallStarted(): void { + if (VERBOSITY >= 2) { + console.log('Call started'); + } + this.callsStarted += 1; + } + + private maybeOnFinished() { + if (this.callsFinished == this.callGoal) { + this.onFinished(); + } + } + + addCallSucceeded(peerName: string): void { + if (VERBOSITY >= 2) { + console.log(`Call to ${peerName} succeeded`); + } + if (peerName in this.callsSucceededByPeer) { + this.callsSucceededByPeer[peerName] += 1; + } else { + this.callsSucceededByPeer[peerName] = 1; + } + this.callsSucceeded += 1; + this.callsFinished += 1; + this.maybeOnFinished(); + } + addCallFailed(message: string): void { + if (VERBOSITY >= 2) { + console.log(`Call failed with message ${message}`); + } + this.callsFinished += 1; + this.failureMessageCount.set(message, (this.failureMessageCount.get(message) ?? 0) + 1); + this.maybeOnFinished(); + } + + needsMoreCalls(): boolean { + return this.callsStarted < this.callGoal; + } + + getFinalStats(): LoadBalancerStatsResponse { + if (VERBOSITY >= 1) { + console.log(`Out of a total of ${this.callGoal} calls requested, ${this.callsFinished} finished. ${this.callsSucceeded} succeeded`); + for (const [message, count] of this.failureMessageCount) { + console.log(`${count} failed with the message ${message}`); + } + } + return { + rpcs_by_peer: this.callsSucceededByPeer, + num_failures: this.callsStarted - this.callsSucceeded + }; + } +} + +class CallStatsTracker { + + private subscribers: CallSubscriber[] = []; + + getCallStats(callCount: number, timeoutSec: number): Promise { + return new Promise((resolve, reject) => { + let finished = false; + const subscriber = new CallSubscriber(callCount, () => { + if (!finished) { + finished = true; + resolve(subscriber.getFinalStats()); + } + }); + setTimeout(() => { + if (!finished) { + finished = true; + this.subscribers.splice(this.subscribers.indexOf(subscriber), 1); + resolve(subscriber.getFinalStats()); + } + }, timeoutSec * 1000) + this.subscribers.push(subscriber); + }) + } + + startCall(): CallEndNotifier { + const callSubscribers = this.subscribers.slice(); + for (const subscriber of callSubscribers) { + subscriber.addCallStarted(); + if (!subscriber.needsMoreCalls()) { + this.subscribers.splice(this.subscribers.indexOf(subscriber), 1); + } + } + return { + onCallSucceeded: (peerName: string) => { + for (const subscriber of callSubscribers) { + subscriber.addCallSucceeded(peerName); + } + }, + onCallFailed: (message: string) => { + for (const subscriber of callSubscribers) { + subscriber.addCallFailed(message); + } + } + } + } +} + +function sendConstantQps(client: TestServiceClient, qps: number, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker) { + let anyCallSucceeded: boolean = false; + setInterval(() => { + const notifier = callStatsTracker.startCall(); + let gotMetadata: boolean = false; + let hostname: string | null = null; + let completed: boolean = false; + let completedWithError: boolean = false; + const deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + REQUEST_TIMEOUT_SEC); + const call = client.emptyCall({}, {deadline}, (error, value) => { + if (error) { + if (failOnFailedRpcs && anyCallSucceeded) { + console.error('A call failed after a call succeeded'); + process.exit(1); + } + completed = true; + completedWithError = true; + notifier.onCallFailed(error.message); + } else { + anyCallSucceeded = true; + if (gotMetadata) { + if (hostname === null) { + notifier.onCallFailed('Hostname omitted from call metadata'); + } else { + notifier.onCallSucceeded(hostname); + } + } + } + }); + call.on('metadata', (metadata) => { + hostname = (metadata.get('hostname') as string[])[0] ?? null; + gotMetadata = true; + if (completed && !completedWithError) { + if (hostname === null) { + notifier.onCallFailed('Hostname omitted from call metadata'); + } else { + notifier.onCallSucceeded(hostname); + } + } + }) + }, 1000/qps); +} + + + +function main() { + const argv = yargs + .string(['fail_on_failed_rpcs', 'server', 'stats_port']) + .number(['num_channels', 'qps']) + .require(['qps', 'server', 'stats_port']) + .default('num_channels', 1) + .argv; + console.log('Starting xDS interop client. Args: ', argv); + const callStatsTracker = new CallStatsTracker(); + for (let i = 0; i < argv.num_channels; i++) { + /* The 'unique' channel argument is there solely to ensure that the + * channels do not share any subchannels. It does not have any + * inherent function. */ + console.log(`Interop client channel ${i} starting sending ${argv.qps} QPS to ${argv.server}`); + sendConstantQps(new loadedProto.grpc.testing.TestService(argv.server, grpc.credentials.createInsecure(), {'unique': i}), + argv.qps, + argv.fail_on_failed_rpcs === 'true', + callStatsTracker); + } + + const loadBalancerStatsServiceImpl: LoadBalancerStatsServiceHandlers = { + GetClientStats: (call, callback) => { + console.log(`Received stats request with num_rpcs=${call.request.num_rpcs} and timeout_sec=${call.request.num_rpcs}`); + callStatsTracker.getCallStats(call.request.num_rpcs, call.request.timeout_sec).then((value) => { + console.log(`Sending stats response: ${JSON.stringify(value)}`); + callback(null, value); + }, (error) => { + callback({code: grpc.status.ABORTED, details: 'Call stats collection failed'}); + }); + } + } + + const server = new grpc.Server(); + server.addService(loadedProto.grpc.testing.LoadBalancerStatsService.service, loadBalancerStatsServiceImpl); + server.bindAsync(`0.0.0.0:${argv.stats_port}`, grpc.ServerCredentials.createInsecure(), (error, port) => { + if (error) { + throw error; + } + console.log(`Starting stats service server bound to port ${port}`); + server.start(); + }); +} + +if (require.main === module) { + main(); +} \ No newline at end of file diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index a8428b24..a5217815 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -22,6 +22,7 @@ "@types/ncp": "^2.0.1", "@types/pify": "^3.0.2", "@types/semver": "^6.0.1", + "@types/yargs": "^15.0.5", "clang-format": "^1.0.55", "execa": "^2.0.3", "gts": "^2.0.0", @@ -33,7 +34,8 @@ "pify": "^4.0.1", "rimraf": "^3.0.2", "ts-node": "^8.3.0", - "typescript": "^3.7.2" + "typescript": "^3.7.2", + "yargs": "^15.4.1" }, "contributors": [ { @@ -46,6 +48,7 @@ "compile": "tsc -p .", "format": "clang-format -i -style=\"{Language: JavaScript, BasedOnStyle: Google, ColumnLimit: 80}\" src/*.ts test/*.ts", "generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs deps/envoy-api/ deps/udpa/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib ../index envoy/service/discovery/v2/ads.proto envoy/service/load_stats/v2/lrs.proto envoy/api/v2/listener.proto envoy/api/v2/route.proto envoy/api/v2/cluster.proto envoy/api/v2/endpoint.proto envoy/config/filter/network/http_connection_manager/v2/http_connection_manager.proto", + "generate-interop-types": "proto-loader-gen-types --keep-case --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs proto/ -O interop/generated --grpcLib ../../src grpc/testing/test.proto", "lint": "npm run check", "prepare": "npm run compile", "test": "gulp test", @@ -57,7 +60,7 @@ "dependencies": { "@grpc/proto-loader": "^0.6.0-pre14", "@types/node": "^12.12.47", - "google-auth-library": "^6.0.0", + "google-auth-library": "^5.10.1", "semver": "^6.2.0" }, "files": [ diff --git a/packages/grpc-js/proto/grpc/testing/empty.proto b/packages/grpc-js/proto/grpc/testing/empty.proto new file mode 100644 index 00000000..6a0aa88d --- /dev/null +++ b/packages/grpc-js/proto/grpc/testing/empty.proto @@ -0,0 +1,28 @@ + +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package grpc.testing; + +// An empty message that you can re-use to avoid defining duplicated empty +// messages in your project. A typical example is to use it as argument or the +// return value of a service API. For instance: +// +// service Foo { +// rpc Bar (grpc.testing.Empty) returns (grpc.testing.Empty) { }; +// }; +// +message Empty {} diff --git a/packages/grpc-js/proto/grpc/testing/messages.proto b/packages/grpc-js/proto/grpc/testing/messages.proto new file mode 100644 index 00000000..70e34277 --- /dev/null +++ b/packages/grpc-js/proto/grpc/testing/messages.proto @@ -0,0 +1,214 @@ + +// Copyright 2015-2016 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Message definitions to be used by integration test service definitions. + +syntax = "proto3"; + +package grpc.testing; + +// TODO(dgq): Go back to using well-known types once +// https://github.com/grpc/grpc/issues/6980 has been fixed. +// import "google/protobuf/wrappers.proto"; +message BoolValue { + // The bool value. + bool value = 1; +} + +// The type of payload that should be returned. +enum PayloadType { + // Compressable text format. + COMPRESSABLE = 0; +} + +// A block of data, to simply increase gRPC message size. +message Payload { + // The type of data in body. + PayloadType type = 1; + // Primary contents of payload. + bytes body = 2; +} + +// A protobuf representation for grpc status. This is used by test +// clients to specify a status that the server should attempt to return. +message EchoStatus { + int32 code = 1; + string message = 2; +} + +// The type of route that a client took to reach a server w.r.t. gRPCLB. +// The server must fill in "fallback" if it detects that the RPC reached +// the server via the "gRPCLB fallback" path, and "backend" if it detects +// that the RPC reached the server via "gRPCLB backend" path (i.e. if it got +// the address of this server from the gRPCLB server BalanceLoad RPC). Exactly +// how this detection is done is context and server dependent. +enum GrpclbRouteType { + // Server didn't detect the route that a client took to reach it. + GRPCLB_ROUTE_TYPE_UNKNOWN = 0; + // Indicates that a client reached a server via gRPCLB fallback. + GRPCLB_ROUTE_TYPE_FALLBACK = 1; + // Indicates that a client reached a server as a gRPCLB-given backend. + GRPCLB_ROUTE_TYPE_BACKEND = 2; +} + +// Unary request. +message SimpleRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, server randomly chooses one from other formats. + PayloadType response_type = 1; + + // Desired payload size in the response from the server. + int32 response_size = 2; + + // Optional input payload sent along with the request. + Payload payload = 3; + + // Whether SimpleResponse should include username. + bool fill_username = 4; + + // Whether SimpleResponse should include OAuth scope. + bool fill_oauth_scope = 5; + + // Whether to request the server to compress the response. This field is + // "nullable" in order to interoperate seamlessly with clients not able to + // implement the full compression tests by introspecting the call to verify + // the response's compression status. + BoolValue response_compressed = 6; + + // Whether server should return a given status + EchoStatus response_status = 7; + + // Whether the server should expect this request to be compressed. + BoolValue expect_compressed = 8; + + // Whether SimpleResponse should include server_id. + bool fill_server_id = 9; + + // Whether SimpleResponse should include grpclb_route_type. + bool fill_grpclb_route_type = 10; +} + +// Unary response, as configured by the request. +message SimpleResponse { + // Payload to increase message size. + Payload payload = 1; + // The user the request came from, for verifying authentication was + // successful when the client expected it. + string username = 2; + // OAuth scope. + string oauth_scope = 3; + + // Server ID. This must be unique among different server instances, + // but the same across all RPC's made to a particular server instance. + string server_id = 4; + // gRPCLB Path. + GrpclbRouteType grpclb_route_type = 5; + + // Server hostname. + string hostname = 6; +} + +// Client-streaming request. +message StreamingInputCallRequest { + // Optional input payload sent along with the request. + Payload payload = 1; + + // Whether the server should expect this request to be compressed. This field + // is "nullable" in order to interoperate seamlessly with servers not able to + // implement the full compression tests by introspecting the call to verify + // the request's compression status. + BoolValue expect_compressed = 2; + + // Not expecting any payload from the response. +} + +// Client-streaming response. +message StreamingInputCallResponse { + // Aggregated size of payloads received from the client. + int32 aggregated_payload_size = 1; +} + +// Configuration for a particular response. +message ResponseParameters { + // Desired payload sizes in responses from the server. + int32 size = 1; + + // Desired interval between consecutive responses in the response stream in + // microseconds. + int32 interval_us = 2; + + // Whether to request the server to compress the response. This field is + // "nullable" in order to interoperate seamlessly with clients not able to + // implement the full compression tests by introspecting the call to verify + // the response's compression status. + BoolValue compressed = 3; +} + +// Server-streaming request. +message StreamingOutputCallRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, the payload from each response in the stream + // might be of different types. This is to simulate a mixed type of payload + // stream. + PayloadType response_type = 1; + + // Configuration for each expected response message. + repeated ResponseParameters response_parameters = 2; + + // Optional input payload sent along with the request. + Payload payload = 3; + + // Whether server should return a given status + EchoStatus response_status = 7; +} + +// Server-streaming response, as configured by the request and parameters. +message StreamingOutputCallResponse { + // Payload to increase response size. + Payload payload = 1; +} + +// For reconnect interop test only. +// Client tells server what reconnection parameters it used. +message ReconnectParams { + int32 max_reconnect_backoff_ms = 1; +} + +// For reconnect interop test only. +// Server tells client whether its reconnects are following the spec and the +// reconnect backoffs it saw. +message ReconnectInfo { + bool passed = 1; + repeated int32 backoff_ms = 2; +} + +message LoadBalancerStatsRequest { + // Request stats for the next num_rpcs sent by client. + int32 num_rpcs = 1; + // If num_rpcs have not completed within timeout_sec, return partial results. + int32 timeout_sec = 2; +} + +message LoadBalancerStatsResponse { + message RpcsByPeer { + // The number of completed RPCs for each peer. + map rpcs_by_peer = 1; + } + // The number of completed RPCs for each peer. + map rpcs_by_peer = 1; + // The number of RPCs that failed to record a remote peer. + int32 num_failures = 2; + map rpcs_by_method = 3; +} diff --git a/packages/grpc-js/proto/grpc/testing/test.proto b/packages/grpc-js/proto/grpc/testing/test.proto new file mode 100644 index 00000000..9d0fadd9 --- /dev/null +++ b/packages/grpc-js/proto/grpc/testing/test.proto @@ -0,0 +1,92 @@ + +// Copyright 2015-2016 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// An integration test service that covers all the method signature permutations +// of unary/streaming requests/responses. + +syntax = "proto3"; + +import "grpc/testing/empty.proto"; +import "grpc/testing/messages.proto"; + +package grpc.testing; + +// A simple service to test the various types of RPCs and experiment with +// performance with various types of payload. +service TestService { + // One empty request followed by one empty response. + rpc EmptyCall(grpc.testing.Empty) returns (grpc.testing.Empty); + + // One request followed by one response. + rpc UnaryCall(SimpleRequest) returns (SimpleResponse); + + // One request followed by one response. Response has cache control + // headers set such that a caching HTTP proxy (such as GFE) can + // satisfy subsequent requests. + rpc CacheableUnaryCall(SimpleRequest) returns (SimpleResponse); + + // One request followed by a sequence of responses (streamed download). + // The server returns the payload with client desired type and sizes. + rpc StreamingOutputCall(StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by one response (streamed upload). + // The server returns the aggregated size of client payload as the result. + rpc StreamingInputCall(stream StreamingInputCallRequest) + returns (StreamingInputCallResponse); + + // A sequence of requests with each request served by the server immediately. + // As one request could lead to multiple responses, this interface + // demonstrates the idea of full duplexing. + rpc FullDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by a sequence of responses. + // The server buffers all the client requests and then serves them in order. A + // stream of responses are returned to the client when the server starts with + // first request. + rpc HalfDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // The test server will not implement this method. It will be used + // to test the behavior when clients call unimplemented methods. + rpc UnimplementedCall(grpc.testing.Empty) returns (grpc.testing.Empty); +} + +// A simple service NOT implemented at servers so clients can test for +// that case. +service UnimplementedService { + // A call that no server should implement + rpc UnimplementedCall(grpc.testing.Empty) returns (grpc.testing.Empty); +} + +// A service used to control reconnect server. +service ReconnectService { + rpc Start(grpc.testing.ReconnectParams) returns (grpc.testing.Empty); + rpc Stop(grpc.testing.Empty) returns (grpc.testing.ReconnectInfo); +} + +// A service used to obtain stats for verifying LB behavior. +service LoadBalancerStatsService { + // Gets the backend distribution for RPCs sent by a test client. + rpc GetClientStats(LoadBalancerStatsRequest) + returns (LoadBalancerStatsResponse) {} +} + +// A service to remotely control health status of an xDS test server. +service XdsUpdateHealthService { + rpc SetServing(grpc.testing.Empty) returns (grpc.testing.Empty); + rpc SetNotServing(grpc.testing.Empty) returns (grpc.testing.Empty); +} diff --git a/packages/grpc-js/scripts/xds.sh b/packages/grpc-js/scripts/xds.sh new file mode 100755 index 00000000..bd30702c --- /dev/null +++ b/packages/grpc-js/scripts/xds.sh @@ -0,0 +1,53 @@ +#!/bin/bash +# Copyright 2020 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Install NVM +cd ~ +export NVM_DIR=`pwd`/.nvm +curl -o- https://raw.githubusercontent.com/creationix/nvm/v0.33.4/install.sh | bash + +# Load NVM +. $NVM_DIR/nvm.sh + +nvm install 12 + +set -exu -o pipefail +[[ -f /VERSION ]] && cat /VERSION + +cd $(dirname $0)/.. +base=$(pwd) +npm run compile + +cd ../../.. + +git clone -b master --single-branch --depth=1 https://github.com/grpc/grpc.git + +grpc/tools/run_tests/helper_scripts/prep_xds.sh + +GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weighted_target,round_robin,resolving_load_balancer,subchannel,keepalive,dns_resolver GRPC_NODE_VERBOSITY=DEBUG \ + python3 grpc/tools/run_tests/run_xds_tests.py \ + --test_case="backends_restart,change_backend_service,gentle_failover,ping_pong,remove_instance_group,round_robin,secondary_locality_gets_no_requests_on_partial_primary_failure,secondary_locality_gets_requests_on_primary_failure" \ + --project_id=grpc-testing \ + --source_image=projects/grpc-testing/global/images/xds-test-server-2 \ + --path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \ + --gcp_suffix=$(date '+%s') \ + --verbose \ + --client_cmd="node grpc-node/packages/grpc-js/build/interop/xds-interop-client \ + --server=xds:///{server_uri} \ + --stats_port={stats_port} \ + --qps={qps} \ + {fail_on_failed_rpc} \ + {rpcs_to_send} \ + {metadata_to_send}" diff --git a/packages/grpc-js/src/load-balancer-cds.ts b/packages/grpc-js/src/load-balancer-cds.ts index 82346978..01cc4c62 100644 --- a/packages/grpc-js/src/load-balancer-cds.ts +++ b/packages/grpc-js/src/load-balancer-cds.ts @@ -34,6 +34,14 @@ import { ConnectivityState } from './channel'; import { UnavailablePicker } from './picker'; import { Status } from './constants'; import { Metadata } from './metadata'; +import * as logging from './logging'; +import { LogVerbosity } from './constants'; + +const TRACER_NAME = 'cds_balancer'; + +function trace(text: string): void { + logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); +} const TYPE_NAME = 'cds'; @@ -70,6 +78,7 @@ export class CdsLoadBalancer implements LoadBalancer { * Otherwise, if the field is omitted, load reporting is disabled. */ edsConfig.lrsLoadReportingServerName = ''; } + trace('Child update EDS config: ' + JSON.stringify(edsConfig)); this.childBalancer.updateAddressList( [], { name: 'eds', eds: edsConfig }, @@ -82,6 +91,8 @@ export class CdsLoadBalancer implements LoadBalancer { this.watcher ); this.isWatcherActive = false; + this.channelControlHelper.updateState(ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: Status.UNAVAILABLE, details: 'CDS resource does not exist', metadata: new Metadata()})); + this.childBalancer.destroy(); }, onTransientError: (status) => { if (this.latestCdsUpdate === null) { @@ -104,11 +115,14 @@ export class CdsLoadBalancer implements LoadBalancer { attributes: { [key: string]: unknown } ): void { if (!isCdsLoadBalancingConfig(lbConfig)) { + trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig)); return; } if (!(attributes.xdsClient instanceof XdsClient)) { + trace('Discarding address list update missing xdsClient attribute'); return; } + trace('Received update with config ' + JSON.stringify(lbConfig)); this.xdsClient = attributes.xdsClient; this.latestAttributes = attributes; diff --git a/packages/grpc-js/src/load-balancer-child-handler.ts b/packages/grpc-js/src/load-balancer-child-handler.ts index 158108f0..b0044d12 100644 --- a/packages/grpc-js/src/load-balancer-child-handler.ts +++ b/packages/grpc-js/src/load-balancer-child-handler.ts @@ -133,9 +133,11 @@ export class ChildLoadBalancerHandler implements LoadBalancer { destroy(): void { if (this.currentChild) { this.currentChild.destroy(); + this.currentChild = null; } if (this.pendingChild) { this.pendingChild.destroy(); + this.pendingChild = null; } } getTypeName(): string { diff --git a/packages/grpc-js/src/load-balancer-eds.ts b/packages/grpc-js/src/load-balancer-eds.ts index 57aac25b..ccc56e9f 100644 --- a/packages/grpc-js/src/load-balancer-eds.ts +++ b/packages/grpc-js/src/load-balancer-eds.ts @@ -21,7 +21,7 @@ import { registerLoadBalancerType, getFirstUsableConfig, } from './load-balancer'; -import { SubchannelAddress } from './subchannel'; +import { SubchannelAddress, subchannelAddressToString } from './subchannel'; import { LoadBalancingConfig, isEdsLoadBalancingConfig, @@ -40,6 +40,14 @@ import { Locality__Output } from './generated/envoy/api/v2/core/Locality'; import { LocalitySubchannelAddress } from './load-balancer-priority'; import { Status } from './constants'; import { Metadata } from './metadata'; +import * as logging from './logging'; +import { LogVerbosity } from './constants'; + +const TRACER_NAME = 'eds_balancer'; + +function trace(text: string): void { + logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); +} const TYPE_NAME = 'eds'; @@ -127,6 +135,7 @@ export class EdsLoadBalancer implements LoadBalancer { }); this.watcher = { onValidUpdate: (update) => { + trace('Received EDS update for ' + this.edsServiceName + ': ' + JSON.stringify(update)); this.latestEdsUpdate = update; this.updateChild(); }, @@ -136,6 +145,8 @@ export class EdsLoadBalancer implements LoadBalancer { this.watcher ); this.isWatcherActive = false; + this.channelControlHelper.updateState(ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: Status.UNAVAILABLE, details: 'EDS resource does not exist', metadata: new Metadata()})); + this.childBalancer.destroy(); }, onTransientError: (status) => { if (this.latestEdsUpdate === null) { @@ -207,6 +218,10 @@ export class EdsLoadBalancer implements LoadBalancer { weight: number; addresses: SubchannelAddress[]; }[][] = []; + /** + * New replacement for this.localityPriorities, mapping locality names to + * priority values. The replacement occurrs at the end of this method. + */ const newLocalityPriorities: Map = new Map< string, number @@ -215,12 +230,10 @@ export class EdsLoadBalancer implements LoadBalancer { * loop consolidates localities into buckets by priority, while also * simplifying the data structure to make the later steps simpler */ for (const endpoint of this.latestEdsUpdate.endpoints) { - let localityArray = priorityList[endpoint.priority]; - if (localityArray === undefined) { - localityArray = []; - priorityList[endpoint.priority] = localityArray; + if (!endpoint.load_balancing_weight) { + continue; } - const addresses: SubchannelAddress[] = endpoint.lb_endpoints.map( + const addresses: SubchannelAddress[] = endpoint.lb_endpoints.filter(lbEndpoint => lbEndpoint.health_status === 'UNKNOWN' || lbEndpoint.health_status === 'HEALTHY').map( (lbEndpoint) => { /* The validator in the XdsClient class ensures that each endpoint has * a socket_address with an IP address and a port_value. */ @@ -231,15 +244,22 @@ export class EdsLoadBalancer implements LoadBalancer { }; } ); - localityArray.push({ - locality: endpoint.locality!, - addresses: addresses, - weight: endpoint.load_balancing_weight?.value ?? 0, - }); - newLocalityPriorities.set( - localityToName(endpoint.locality!), - endpoint.priority - ); + if (addresses.length > 0) { + let localityArray = priorityList[endpoint.priority]; + if (localityArray === undefined) { + localityArray = []; + priorityList[endpoint.priority] = localityArray; + } + localityArray.push({ + locality: endpoint.locality!, + addresses: addresses, + weight: endpoint.load_balancing_weight.value, + }); + newLocalityPriorities.set( + localityToName(endpoint.locality!), + endpoint.priority + ); + } } const newPriorityNames: string[] = []; @@ -256,6 +276,7 @@ export class EdsLoadBalancer implements LoadBalancer { * - Otherwise, construct a new name using this.nextPriorityChildNumber. */ for (const [priority, localityArray] of priorityList.entries()) { + // Skip priorities that have no localities with healthy endpoints if (localityArray === undefined) { continue; } @@ -356,6 +377,11 @@ export class EdsLoadBalancer implements LoadBalancer { priorities: newPriorityNames.filter((value) => value !== undefined), }, }; + trace('Child update addresses: ' + addressList.map(address => '(' + subchannelAddressToString(address) + ' path=' + address.localityPath + ')')); + trace('Child update priority list: ' + childConfig.priority.priorities); + for (const [childName, child] of childConfig.priority.children) { + trace('Child update priority config: ' + childName + ' -> ' + JSON.stringify(child)); + } this.childBalancer.updateAddressList( addressList, childConfig, @@ -372,11 +398,14 @@ export class EdsLoadBalancer implements LoadBalancer { attributes: { [key: string]: unknown } ): void { if (!isEdsLoadBalancingConfig(lbConfig)) { + trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig)); return; } if (!(attributes.xdsClient instanceof XdsClient)) { + trace('Discarding address list update missing xdsClient attribute'); return; } + trace('Received update with config: ' + JSON.stringify(lbConfig)); this.lastestConfig = lbConfig; this.latestAttributes = attributes; this.xdsClient = attributes.xdsClient; diff --git a/packages/grpc-js/src/load-balancer-priority.ts b/packages/grpc-js/src/load-balancer-priority.ts index d3070715..74f2e978 100644 --- a/packages/grpc-js/src/load-balancer-priority.ts +++ b/packages/grpc-js/src/load-balancer-priority.ts @@ -21,7 +21,7 @@ import { getFirstUsableConfig, registerLoadBalancerType, } from './load-balancer'; -import { SubchannelAddress } from './subchannel'; +import { SubchannelAddress, subchannelAddressToString } from './subchannel'; import { LoadBalancingConfig, isPriorityLoadBalancingConfig, @@ -32,6 +32,14 @@ import { ChildLoadBalancerHandler } from './load-balancer-child-handler'; import { ChannelOptions } from './channel-options'; import { Status } from './constants'; import { Metadata } from './metadata'; +import * as logging from './logging'; +import { LogVerbosity } from './constants'; + +const TRACER_NAME = 'priority'; + +function trace(text: string): void { + logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); +} const TYPE_NAME = 'priority'; @@ -103,6 +111,7 @@ export class PriorityLoadBalancer implements LoadBalancer { } private updateState(connectivityState: ConnectivityState, picker: Picker) { + trace('Child ' + this.name + ' ' + ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[connectivityState]); this.connectivityState = connectivityState; this.picker = picker; this.parent.onChildStateChange(this); @@ -110,7 +119,9 @@ export class PriorityLoadBalancer implements LoadBalancer { private startFailoverTimer() { if (this.failoverTimer === null) { + trace('Starting failover timer for child ' + this.name); this.failoverTimer = setTimeout(() => { + trace('Failover timer triggered for child ' + this.name); this.failoverTimer = null; this.updateState( ConnectivityState.TRANSIENT_FAILURE, @@ -222,6 +233,10 @@ export class PriorityLoadBalancer implements LoadBalancer { constructor(private channelControlHelper: ChannelControlHelper) {} private updateState(state: ConnectivityState, picker: Picker) { + trace( + 'Transitioning to ' + + ConnectivityState[state] + ); /* If switching to IDLE, use a QueuePicker attached to this load balancer * so that when the picker calls exitIdle, that in turn calls exitIdle on * the PriorityChildImpl, which will start the failover timer. */ @@ -233,6 +248,7 @@ export class PriorityLoadBalancer implements LoadBalancer { private onChildStateChange(child: PriorityChildBalancer) { const childState = child.getConnectivityState(); + trace('Child ' + child.getName() + ' transitioning to ' + ConnectivityState[childState]); if (child === this.currentChildFromBeforeUpdate) { if ( childState === ConnectivityState.READY || @@ -295,6 +311,7 @@ export class PriorityLoadBalancer implements LoadBalancer { private selectPriority(priority: number) { this.currentPriority = priority; const chosenChild = this.children.get(this.priorities[priority])!; + chosenChild.cancelFailoverTimer(); this.updateState( chosenChild.getConnectivityState(), chosenChild.getPicker() @@ -369,6 +386,7 @@ export class PriorityLoadBalancer implements LoadBalancer { ): void { if (!isPriorityLoadBalancingConfig(lbConfig)) { // Reject a config of the wrong type + trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig)); return; } const priorityConfig = lbConfig.priority; @@ -416,6 +434,7 @@ export class PriorityLoadBalancer implements LoadBalancer { const chosenChildConfig = getFirstUsableConfig(childConfig.config); if (chosenChildConfig !== null) { const childAddresses = childAddressMap.get(childName) ?? []; + trace('Assigning child ' + childName + ' address list ' + childAddresses.map(address => '(' + subchannelAddressToString(address) + ' path=' + address.localityPath + ')')) this.latestUpdates.set(childName, { subchannelAddress: childAddresses, lbConfig: chosenChildConfig, @@ -433,6 +452,7 @@ export class PriorityLoadBalancer implements LoadBalancer { // Deactivate all children that are no longer in the priority list for (const [childName, child] of this.children) { if (this.priorities.indexOf(childName) < 0) { + trace('Deactivating child ' + childName); child.deactivate(); } } diff --git a/packages/grpc-js/src/load-balancer-weighted-target.ts b/packages/grpc-js/src/load-balancer-weighted-target.ts index 04145914..2da67e7f 100644 --- a/packages/grpc-js/src/load-balancer-weighted-target.ts +++ b/packages/grpc-js/src/load-balancer-weighted-target.ts @@ -16,7 +16,7 @@ */ import { LoadBalancer, ChannelControlHelper, getFirstUsableConfig, registerLoadBalancerType } from "./load-balancer"; -import { SubchannelAddress } from "./subchannel"; +import { SubchannelAddress, subchannelAddressToString } from "./subchannel"; import { LoadBalancingConfig, WeightedTarget, isWeightedTargetLoadBalancingConfig } from "./load-balancing-config"; import { Picker, PickResult, PickArgs, QueuePicker, UnavailablePicker } from "./picker"; import { ConnectivityState } from "./channel"; @@ -24,6 +24,14 @@ import { ChildLoadBalancerHandler } from "./load-balancer-child-handler"; import { Status } from "./constants"; import { Metadata } from "./metadata"; import { isLocalitySubchannelAddress, LocalitySubchannelAddress } from "./load-balancer-priority"; +import * as logging from './logging'; +import { LogVerbosity } from './constants'; + +const TRACER_NAME = 'weighted_target'; + +function trace(text: string): void { + logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); +} const TYPE_NAME = 'weighted_target'; @@ -116,6 +124,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { } private updateState(connectivityState: ConnectivityState, picker: Picker) { + trace('Target ' + this.name + ' ' + ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[connectivityState]); this.connectivityState = connectivityState; this.picker = picker; this.parent.updateState(); @@ -229,7 +238,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { picker = new WeightedTargetPicker(pickerList); break; case ConnectivityState.CONNECTING: - case ConnectivityState.READY: + case ConnectivityState.IDLE: picker = new QueuePicker(this); break; default: @@ -239,12 +248,17 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { metadata: new Metadata() }); } + trace( + 'Transitioning to ' + + ConnectivityState[connectivityState] + ); this.channelControlHelper.updateState(connectivityState, picker); } updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown; }): void { if (!isWeightedTargetLoadBalancingConfig(lbConfig)) { // Reject a config of the wrong type + trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig)); return; } @@ -252,7 +266,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { * which child it belongs to. So we bucket those addresses by that first * element, and pass along the rest of the localityPath for that child * to use. */ - const childAddressMap = new Map(); + const childAddressMap = new Map(); for (const address of addressList) { if (!isLocalitySubchannelAddress(address)) { // Reject address that cannot be associated with targets @@ -284,12 +298,15 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { } else { target.maybeReactivate(); } - target.updateAddressList(childAddressMap.get(targetName) ?? [], targetConfig, attributes); + const targetAddresses = childAddressMap.get(targetName) ?? []; + trace('Assigning target ' + targetName + ' address list ' + targetAddresses.map(address => '(' + subchannelAddressToString(address) + ' path=' + address.localityPath + ')')); + target.updateAddressList(targetAddresses, targetConfig, attributes); } // Deactivate targets that are not in the new config for (const [targetName, target] of this.targets) { if (this.targetList.indexOf(targetName) < 0) { + trace('Deactivating target ' + targetName); target.deactivate(); } } diff --git a/packages/grpc-js/src/logging.ts b/packages/grpc-js/src/logging.ts index 91b4e8f0..1140e8d8 100644 --- a/packages/grpc-js/src/logging.ts +++ b/packages/grpc-js/src/logging.ts @@ -20,20 +20,20 @@ import { LogVerbosity } from './constants'; let _logger: Partial = console; let _logVerbosity: LogVerbosity = LogVerbosity.ERROR; -if (process.env.GRPC_VERBOSITY) { - switch (process.env.GRPC_VERBOSITY) { - case 'DEBUG': - _logVerbosity = LogVerbosity.DEBUG; - break; - case 'INFO': - _logVerbosity = LogVerbosity.INFO; - break; - case 'ERROR': - _logVerbosity = LogVerbosity.ERROR; - break; - default: - // Ignore any other values - } +const verbosityString = process.env.GRPC_NODE_VERBOSITY ?? process.env.GRPC_VERBOSITY ?? ''; + +switch (verbosityString) { + case 'DEBUG': + _logVerbosity = LogVerbosity.DEBUG; + break; + case 'INFO': + _logVerbosity = LogVerbosity.INFO; + break; + case 'ERROR': + _logVerbosity = LogVerbosity.ERROR; + break; + default: + // Ignore any other values } export const getLogger = (): Partial => { @@ -55,9 +55,8 @@ export const log = (severity: LogVerbosity, ...args: any[]): void => { } }; -const enabledTracers = process.env.GRPC_TRACE - ? process.env.GRPC_TRACE.split(',') - : []; +const tracersString = process.env.GRPC_NODE_TRACE ?? process.env.GRPC_TRACE ?? ''; +const enabledTracers = tracersString.split(','); const allEnabled = enabledTracers.includes('all'); export function trace( diff --git a/packages/grpc-js/src/resolver-xds.ts b/packages/grpc-js/src/resolver-xds.ts index e92fddff..297c6c3f 100644 --- a/packages/grpc-js/src/resolver-xds.ts +++ b/packages/grpc-js/src/resolver-xds.ts @@ -19,9 +19,16 @@ import { GrpcUri, uriToString } from './uri-parser'; import { XdsClient } from './xds-client'; import { ServiceConfig } from './service-config'; import { StatusObject } from './call-stream'; -import { Status } from './constants'; +import { Status, LogVerbosity } from './constants'; import { Metadata } from './metadata'; import { ChannelOptions } from './channel-options'; +import * as logging from './logging'; + +const TRACER_NAME = 'xds_resolver'; + +function trace(text: string): void { + logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); +} class XdsResolver implements Resolver { private resolutionStarted = false; @@ -47,10 +54,12 @@ class XdsResolver implements Resolver { // Wait until updateResolution is called once to start the xDS requests if (!this.resolutionStarted) { this.resolutionStarted = true; + trace('Starting resolution for target ' + uriToString(this.target)); const xdsClient = new XdsClient( this.target.path, { onValidUpdate: (update: ServiceConfig) => { + trace('Resolved service config for target ' + uriToString(this.target) + ': ' + JSON.stringify(update)); this.hasReportedSuccess = true; this.listener.onSuccessfulResolution([], update, null, { xdsClient: xdsClient, @@ -60,10 +69,12 @@ class XdsResolver implements Resolver { /* A transient error only needs to bubble up as a failure if we have * not already provided a ServiceConfig for the upper layer to use */ if (!this.hasReportedSuccess) { + trace('Resolution error for target ' + uriToString(this.target) + ' due to xDS client transient error ' + error.details); this.reportResolutionError(); } }, onResourceDoesNotExist: () => { + trace('Resolution error for target ' + uriToString(this.target) + ': resource does not exist'); this.reportResolutionError(); }, }, diff --git a/packages/grpc-js/src/resolver.ts b/packages/grpc-js/src/resolver.ts index 57c750ae..0c4c0d6b 100644 --- a/packages/grpc-js/src/resolver.ts +++ b/packages/grpc-js/src/resolver.ts @@ -18,6 +18,7 @@ import { ServiceConfig } from './service-config'; import * as resolver_dns from './resolver-dns'; import * as resolver_uds from './resolver-uds'; +import * as resolver_xds from './resolver-xds'; import { StatusObject } from './call-stream'; import { SubchannelAddress } from './subchannel'; import { GrpcUri, uriToString } from './uri-parser'; @@ -156,4 +157,5 @@ export function mapUriDefaultScheme(target: GrpcUri): GrpcUri | null { export function registerAll() { resolver_dns.setup(); resolver_uds.setup(); + resolver_xds.setup(); } diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index fad0eddb..2c62b206 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -81,7 +81,7 @@ export type ServerSurfaceCall = { } & EventEmitter; export type ServerUnaryCall = ServerSurfaceCall & { - request: RequestType | null; + request: RequestType; }; export type ServerReadableStream< RequestType, @@ -92,7 +92,7 @@ export type ServerWritableStream< ResponseType > = ServerSurfaceCall & ObjectWritable & { - request: RequestType | null; + request: RequestType; end: (metadata?: Metadata) => void; }; export type ServerDuplexStream = ServerSurfaceCall & @@ -102,15 +102,14 @@ export type ServerDuplexStream = ServerSurfaceCall & export class ServerUnaryCallImpl extends EventEmitter implements ServerUnaryCall { cancelled: boolean; - request: RequestType | null; constructor( private call: Http2ServerCallStream, - public metadata: Metadata + public metadata: Metadata, + public request: RequestType ) { super(); this.cancelled = false; - this.request = null; this.call.setupSurfaceCall(this); } @@ -160,17 +159,16 @@ export class ServerWritableStreamImpl extends Writable implements ServerWritableStream { cancelled: boolean; - request: RequestType | null; private trailingMetadata: Metadata; constructor( private call: Http2ServerCallStream, public metadata: Metadata, - public serialize: Serialize + public serialize: Serialize, + public request: RequestType ) { super({ objectMode: true }); this.cancelled = false; - this.request = null; this.trailingMetadata = new Metadata(); this.call.setupSurfaceCall(this); @@ -280,7 +278,7 @@ ServerDuplexStreamImpl.prototype.end = ServerWritableStreamImpl.prototype.end; // Unary response callback signature. export type sendUnaryData = ( error: ServerErrorResponse | ServerStatusResponse | null, - value: ResponseType | null, + value?: ResponseType | null, trailer?: Metadata, flags?: number ) => void; @@ -524,7 +522,7 @@ export class Http2ServerCallStream< async sendUnaryMessage( err: ServerErrorResponse | ServerStatusResponse | null, - value: ResponseType | null, + value?: ResponseType | null, metadata?: Metadata, flags?: number ) { diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 8d8952e8..5ac25c7c 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -632,22 +632,23 @@ async function handleUnary( handler: UnaryHandler, metadata: Metadata ): Promise { - const emitter = new ServerUnaryCallImpl( - call, - metadata - ); const request = await call.receiveUnaryMessage(); if (request === undefined || call.cancelled) { return; } + + const emitter = new ServerUnaryCallImpl( + call, + metadata, + request + ); - emitter.request = request; handler.func( emitter, ( err: ServerErrorResponse | ServerStatusResponse | null, - value: ResponseType | null, + value?: ResponseType | null, trailer?: Metadata, flags?: number ) => { @@ -669,7 +670,7 @@ function handleClientStreaming( function respond( err: ServerErrorResponse | ServerStatusResponse | null, - value: ResponseType | null, + value?: ResponseType | null, trailer?: Metadata, flags?: number ) { @@ -699,10 +700,10 @@ async function handleServerStreaming( const stream = new ServerWritableStreamImpl( call, metadata, - handler.serialize + handler.serialize, + request ); - stream.request = request; handler.func(stream); } diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 3be6dd0a..b9535769 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -28,7 +28,7 @@ import * as logging from './logging'; import { LogVerbosity } from './constants'; import { getProxiedConnection, ProxyConnectionResult } from './http_proxy'; import * as net from 'net'; -import { GrpcUri, parseUri, splitHostPort } from './uri-parser'; +import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser'; import { ConnectionOptions } from 'tls'; import { FilterFactory, Filter } from './filter'; @@ -40,6 +40,10 @@ function trace(text: string): void { logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); } +function refTrace(text: string): void { + logging.trace(LogVerbosity.DEBUG, 'subchannel_refcount', text); +} + const MIN_CONNECT_TIMEOUT_MS = 20000; const INITIAL_BACKOFF_MS = 1000; const BACKOFF_MULTIPLIER = 1.6; @@ -263,6 +267,7 @@ export class Subchannel { } private sendPing() { + logging.trace(LogVerbosity.DEBUG, 'keepalive', 'Sending ping to ' + this.subchannelAddressString); this.keepaliveTimeoutId = setTimeout(() => { this.transitionToState([ConnectivityState.READY], ConnectivityState.IDLE); }, this.keepaliveTimeoutMs); @@ -277,7 +282,8 @@ export class Subchannel { this.keepaliveIntervalId = setInterval(() => { this.sendPing(); }, this.keepaliveTimeMs); - this.sendPing(); + /* Don't send a ping immediately because whatever caused us to start + * sending pings should also involve some network activity. */ } private stopKeepalivePings() { @@ -405,14 +411,14 @@ export class Subchannel { errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM && opaqueData.equals(tooManyPingsData) ) { - logging.log( - LogVerbosity.ERROR, - `Connection to ${this.channelTarget} rejected by server because of excess pings` - ); this.keepaliveTimeMs = Math.min( 2 * this.keepaliveTimeMs, KEEPALIVE_MAX_TIME_MS ); + logging.log( + LogVerbosity.ERROR, + `Connection to ${uriToString(this.channelTarget)} at ${this.subchannelAddressString} rejected by server because of excess pings. Increasing ping interval to ${this.keepaliveTimeMs} ms` + ); } trace( this.subchannelAddressString + @@ -585,7 +591,7 @@ export class Subchannel { } callRef() { - trace( + refTrace( this.subchannelAddressString + ' callRefcount ' + this.callRefcount + @@ -602,7 +608,7 @@ export class Subchannel { } callUnref() { - trace( + refTrace( this.subchannelAddressString + ' callRefcount ' + this.callRefcount + @@ -620,7 +626,7 @@ export class Subchannel { } ref() { - trace( + refTrace( this.subchannelAddressString + ' refcount ' + this.refcount + @@ -631,7 +637,7 @@ export class Subchannel { } unref() { - trace( + refTrace( this.subchannelAddressString + ' refcount ' + this.refcount + @@ -691,7 +697,7 @@ export class Subchannel { for (const header of Object.keys(headers)) { headersString += '\t\t' + header + ': ' + headers[header] + '\n'; } - trace('Starting stream with headers\n' + headersString); + logging.trace(LogVerbosity.DEBUG, 'call_stream', 'Starting stream on subchannel ' + this.subchannelAddressString + ' with headers\n' + headersString); callStream.attachHttp2Stream(http2Stream, this, extraFilterFactory); } diff --git a/packages/grpc-js/src/xds-bootstrap.ts b/packages/grpc-js/src/xds-bootstrap.ts index b8e446b2..00e13d09 100644 --- a/packages/grpc-js/src/xds-bootstrap.ts +++ b/packages/grpc-js/src/xds-bootstrap.ts @@ -157,15 +157,6 @@ function validateNode(obj: any): Node { throw new Error(`node.id field: expected string, got ${typeof obj.id}`); } result.id = obj.id; - if (!('cluster' in obj)) { - throw new Error('cluster field missing in node element'); - } - if (typeof obj.cluster !== 'string') { - throw new Error( - `node.cluster field: expected string, got ${typeof obj.cluster}` - ); - } - result.cluster = obj.cluster; if (!('locality' in obj)) { throw new Error('locality field missing in node element'); } @@ -180,7 +171,7 @@ function validateNode(obj: any): Node { result.locality.region = obj.locality.region; } if ('zone' in obj.locality) { - if (typeof obj.locality.region !== 'string') { + if (typeof obj.locality.zone !== 'string') { throw new Error( `node.locality.zone field: expected string, got ${typeof obj.locality .zone}` @@ -197,6 +188,14 @@ function validateNode(obj: any): Node { } result.locality.sub_zone = obj.locality.sub_zone; } + if ('cluster' in obj) { + if (typeof obj.cluster !== 'string') { + throw new Error( + `node.cluster field: expected string, got ${typeof obj.cluster}` + ); + } + result.cluster = obj.cluster; + } if ('metadata' in obj) { result.metadata = getStructFromJson(obj.metadata); } diff --git a/packages/grpc-js/src/xds-client.ts b/packages/grpc-js/src/xds-client.ts index d29d29b8..0b1fa7b9 100644 --- a/packages/grpc-js/src/xds-client.ts +++ b/packages/grpc-js/src/xds-client.ts @@ -54,6 +54,7 @@ import { Listener__Output } from './generated/envoy/api/v2/Listener'; import { HttpConnectionManager__Output } from './generated/envoy/config/filter/network/http_connection_manager/v2/HttpConnectionManager'; import { RouteConfiguration__Output } from './generated/envoy/api/v2/RouteConfiguration'; import { Any__Output } from './generated/google/protobuf/Any'; +import { BackoffTimeout } from './backoff-timeout'; const TRACER_NAME = 'xds_client'; @@ -105,6 +106,7 @@ function loadAdsProtos(): Promise< enums: String, defaults: true, oneofs: true, + json: true, includeDirs: [ // Paths are relative to src/build __dirname + '/../../deps/envoy-api/', @@ -259,6 +261,7 @@ class EdsState implements XdsStreamState { edsServiceName: string, watcher: Watcher ): void { + trace('Adding EDS watcher for edsServiceName ' + edsServiceName); let watchersEntry = this.watchers.get(edsServiceName); let addedServiceName = false; if (watchersEntry === undefined) { @@ -275,6 +278,7 @@ class EdsState implements XdsStreamState { /* These updates normally occur asynchronously, so we ensure that * the same happens here */ process.nextTick(() => { + trace('Reporting existing EDS update for new watcher for edsServiceName ' + edsServiceName); watcher.onValidUpdate(message); }); } @@ -288,6 +292,7 @@ class EdsState implements XdsStreamState { edsServiceName: string, watcher: Watcher ): void { + trace('Removing EDS watcher for edsServiceName ' + edsServiceName); const watchersEntry = this.watchers.get(edsServiceName); let removedServiceName = false; if (watchersEntry !== undefined) { @@ -341,6 +346,7 @@ class EdsState implements XdsStreamState { handleMissingNames(allEdsServiceNames: Set) { for (const [edsServiceName, watcherList] of this.watchers.entries()) { if (!allEdsServiceNames.has(edsServiceName)) { + trace('Reporting EDS resource does not exist for edsServiceName ' + edsServiceName); for (const watcher of watcherList) { watcher.onResourceDoesNotExist(); } @@ -351,7 +357,8 @@ class EdsState implements XdsStreamState { handleResponses(responses: ClusterLoadAssignment__Output[]) { for (const message of responses) { if (!this.validateResponse(message)) { - return 'ClusterLoadAssignment validation failed'; + trace('EDS validation failed for message ' + JSON.stringify(message)); + return 'EDS Error: ClusterLoadAssignment validation failed'; } } this.latestResponses = responses; @@ -363,6 +370,7 @@ class EdsState implements XdsStreamState { watcher.onValidUpdate(message); } } + trace('Received EDS updates for cluster names ' + Array.from(allClusterNames)); this.handleMissingNames(allClusterNames); return null; } @@ -399,6 +407,7 @@ class CdsState implements XdsStreamState { * @param watcher */ addWatcher(clusterName: string, watcher: Watcher): void { + trace('Adding CDS watcher for clusterName ' + clusterName); let watchersEntry = this.watchers.get(clusterName); let addedServiceName = false; if (watchersEntry === undefined) { @@ -415,6 +424,7 @@ class CdsState implements XdsStreamState { /* These updates normally occur asynchronously, so we ensure that * the same happens here */ process.nextTick(() => { + trace('Reporting existing CDS update for new watcher for clusterName ' + clusterName); watcher.onValidUpdate(message); }); } @@ -425,6 +435,7 @@ class CdsState implements XdsStreamState { } removeWatcher(clusterName: string, watcher: Watcher): void { + trace('Removing CDS watcher for clusterName ' + clusterName); const watchersEntry = this.watchers.get(clusterName); let removedServiceName = false; if (watchersEntry !== undefined) { @@ -465,14 +476,15 @@ class CdsState implements XdsStreamState { } /** - * Given a list of edsServiceNames (which may actually be the cluster name), + * Given a list of clusterNames (which may actually be the cluster name), * for each watcher watching a name not on the list, call that watcher's * onResourceDoesNotExist method. * @param allClusterNames */ private handleMissingNames(allClusterNames: Set) { - for (const [edsServiceName, watcherList] of this.watchers.entries()) { - if (!allClusterNames.has(edsServiceName)) { + for (const [clusterName, watcherList] of this.watchers.entries()) { + if (!allClusterNames.has(clusterName)) { + trace('Reporting CDS resource does not exist for clusterName ' + clusterName); for (const watcher of watcherList) { watcher.onResourceDoesNotExist(); } @@ -483,7 +495,8 @@ class CdsState implements XdsStreamState { handleResponses(responses: Cluster__Output[]): string | null { for (const message of responses) { if (!this.validateResponse(message)) { - return 'Cluster validation failed'; + trace('CDS validation failed for message ' + JSON.stringify(message)); + return 'CDS Error: Cluster validation failed'; } } this.latestResponses = responses; @@ -500,6 +513,7 @@ class CdsState implements XdsStreamState { watcher.onValidUpdate(message); } } + trace('Received CDS updates for cluster names ' + Array.from(allClusterNames)); this.handleMissingNames(allClusterNames); this.edsState.handleMissingNames(allEdsServiceNames); return null; @@ -521,6 +535,7 @@ class RdsState implements XdsStreamState { private routeConfigName: string | null = null; constructor( + private targetName: string, private watcher: Watcher, private updateResouceNames: () => void ) {} @@ -531,9 +546,10 @@ class RdsState implements XdsStreamState { handleSingleMessage(message: RouteConfiguration__Output) { for (const virtualHost of message.virtual_hosts) { - if (virtualHost.domains.indexOf(this.routeConfigName!) >= 0) { + if (virtualHost.domains.indexOf(this.targetName) >= 0) { const route = virtualHost.routes[virtualHost.routes.length - 1]; if (route.match?.prefix === '' && route.route?.cluster) { + trace('Reporting RDS update for host ' + this.targetName + ' with cluster ' + route.route.cluster); this.watcher.onValidUpdate({ methodConfig: [], loadBalancingConfig: [ @@ -545,16 +561,20 @@ class RdsState implements XdsStreamState { }, ], }); - break; + return; + } else { + trace('Discarded matching route with prefix ' + route.match?.prefix + ' and cluster ' + route.route?.cluster); } } } + trace('Reporting RDS resource does not exist from domain lists ' + message.virtual_hosts.map(virtualHost => virtualHost.domains)); /* If none of the routes match the one we are looking for, bubble up an * error. */ this.watcher.onResourceDoesNotExist(); } handleResponses(responses: RouteConfiguration__Output[]): string | null { + trace('Received RDS response with route config names ' + responses.map(message => message.name)); if (this.routeConfigName !== null) { for (const message of responses) { if (message.name === this.routeConfigName) { @@ -613,6 +633,7 @@ class LdsState implements XdsStreamState { } handleResponses(responses: Listener__Output[]): string | null { + trace('Received LDS update with names ' + responses.map(message => message.name)); for (const message of responses) { if (message.name === this.targetName) { if (this.validateResponse(message)) { @@ -622,11 +643,13 @@ class LdsState implements XdsStreamState { HttpConnectionManager__Output; switch (httpConnectionManager.route_specifier) { case 'rds': + trace('Received LDS update with RDS route config name ' + httpConnectionManager.rds!.route_config_name); this.rdsState.setRouteConfigName( httpConnectionManager.rds!.route_config_name ); break; case 'route_config': + trace('Received LDS update with route configuration'); this.rdsState.setRouteConfigName(null); this.rdsState.handleSingleMessage( httpConnectionManager.route_config! @@ -636,11 +659,12 @@ class LdsState implements XdsStreamState { // The validation rules should prevent this } } else { - return 'Listener validation failed'; + trace('LRS validation error for message ' + JSON.stringify(message)); + return 'LRS Error: Listener validation failed'; } } } - throw new Error('Method not implemented.'); + return null; } reportStreamError(status: StatusObject): void { @@ -676,11 +700,11 @@ function getResponseMessages( result.push(resource as protoLoader.AnyExtension & OutputType); } else { throw new Error( - `Invalid resource type ${ + `ADS Error: Invalid resource type ${ protoLoader.isAnyExtension(resource) ? resource['@type'] : resource.type_url - }` + }, expected ${typeUrl}` ); } } @@ -710,6 +734,9 @@ export class XdsClient { private adsState: AdsState; + private adsBackoff: BackoffTimeout; + private lrsBackoff: BackoffTimeout; + constructor( targetName: string, serviceConfigWatcher: Watcher, @@ -721,7 +748,7 @@ export class XdsClient { const cdsState = new CdsState(edsState, () => { this.updateNames(CDS_TYPE_URL); }); - const rdsState = new RdsState(serviceConfigWatcher, () => { + const rdsState = new RdsState(targetName, serviceConfigWatcher, () => { this.updateNames(RDS_TYPE_URL); }); const ldsState = new LdsState(targetName, rdsState); @@ -750,7 +777,16 @@ export class XdsClient { for (const arg of channelArgsToRemove) { delete channelArgs[arg]; } - channelArgs['grpc.keepalive_time_ms'] = 5000; + // 5 minutes + channelArgs['grpc.keepalive_time_ms'] = 5 * 60 * 1000; + + this.adsBackoff = new BackoffTimeout(() => { + this.maybeStartAdsStream(); + }); + this.lrsBackoff = new BackoffTimeout(() => { + this.maybeStartLrsStream(); + }) + Promise.all([loadBootstrapInfo(), loadAdsProtos()]).then( ([bootstrapInfo, protoDefinitions]) => { if (this.hasShutdown) { @@ -769,6 +805,7 @@ export class XdsClient { ...node, client_features: ['envoy.lrs.supports_send_all_clusters'], }; + trace('Starting xDS client connected to server URI ' + bootstrapInfo.xdsServers[0].serverUri); this.adsClient = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService( bootstrapInfo.xdsServers[0].serverUri, createGoogleDefaultCredentials(), @@ -779,7 +816,7 @@ export class XdsClient { this.lrsClient = new protoDefinitions.envoy.service.load_stats.v2.LoadReportingService( bootstrapInfo.xdsServers[0].serverUri, createGoogleDefaultCredentials(), - channelArgs + {channelOverride: this.adsClient.getChannel()} ); this.maybeStartLrsStream(); }, @@ -827,6 +864,7 @@ export class XdsClient { errorString = `Unknown type_url ${message.type_url}`; } if (errorString === null) { + trace('Acking message with type URL ' + message.type_url); /* errorString can only be null in one of the first 4 cases, which * implies that message.type_url is one of the 4 known type URLs, which * means that this type assertion is valid. */ @@ -835,6 +873,7 @@ export class XdsClient { this.adsState[typeUrl].versionInfo = message.version_info; this.ack(typeUrl); } else { + trace('Nacking message with type URL ' + message.type_url + ': "' + errorString + '"'); this.nack(message.type_url, errorString); } } @@ -853,6 +892,9 @@ export class XdsClient { if (this.hasShutdown) { return; } + trace('Starting ADS stream'); + // Backoff relative to when we start the request + this.adsBackoff.runOnce(); this.adsCall = this.adsClient.StreamAggregatedResources(); this.adsCall.on('data', (message: DiscoveryResponse__Output) => { this.handleAdsResponse(message); @@ -863,10 +905,11 @@ export class XdsClient { ); this.adsCall = null; this.reportStreamError(error); - /* Connection backoff is handled by the client object, so we can - * immediately start a new request to indicate that it should try to - * reconnect */ - this.maybeStartAdsStream(); + /* If the backoff timer is no longer running, we do not need to wait any + * more to start the new call. */ + if (!this.adsBackoff.isRunning()) { + this.maybeStartAdsStream(); + } }); const allTypeUrls: AdsTypeUrl[] = [ @@ -877,8 +920,6 @@ export class XdsClient { ]; for (const typeUrl of allTypeUrls) { const state = this.adsState[typeUrl]; - state.nonce = ''; - state.versionInfo = ''; if (state.getResourceNames().length > 0) { this.updateNames(typeUrl); } @@ -890,6 +931,11 @@ export class XdsClient { * version info are updated so that it sends the post-update values. */ ack(typeUrl: AdsTypeUrl) { + /* An ack is the best indication of a successful interaction between the + * client and the server, so we can reset the backoff timer here. */ + this.adsBackoff.stop(); + this.adsBackoff.reset(); + this.updateNames(typeUrl); } @@ -928,6 +974,7 @@ export class XdsClient { } private updateNames(typeUrl: AdsTypeUrl) { + trace('Sending update for type URL ' + typeUrl + ' with names ' + this.adsState[typeUrl].getResourceNames()); this.adsCall?.write({ node: this.adsNode!, type_url: typeUrl, @@ -954,8 +1001,17 @@ export class XdsClient { if (this.hasShutdown) { return; } + + trace('Starting LRS stream'); + this.lrsBackoff.runOnce(); this.lrsCall = this.lrsClient.streamLoadStats(); + this.lrsCall.on('metadata', () => { + /* Once we get any response from the server, we assume that the stream is + * in a good state, so we can reset the backoff timer. */ + this.lrsBackoff.stop(); + this.lrsBackoff.reset(); + }); this.lrsCall.on('data', (message: LoadStatsResponse__Output) => { if ( message.load_reporting_interval?.seconds !== @@ -971,7 +1027,8 @@ export class XdsClient { const loadReportingIntervalMs = Number.parseInt(message.load_reporting_interval!.seconds) * 1000 + message.load_reporting_interval!.nanos / 1_000_000; - setInterval(() => { + trace('Received LRS request with load reporting interval ' + loadReportingIntervalMs + ' ms'); + this.statsTimer = setInterval(() => { this.sendStats(); }, loadReportingIntervalMs); } @@ -982,21 +1039,24 @@ export class XdsClient { 'LRS stream ended. code=' + error.code + ' details= ' + error.details ); this.lrsCall = null; + this.latestLrsSettings = null; clearInterval(this.statsTimer); - /* Connection backoff is handled by the client object, so we can - * immediately start a new request to indicate that it should try to - * reconnect */ - this.maybeStartAdsStream(); - }); - this.lrsCall.write({ - node: this.lrsNode!, + /* If the backoff timer is no longer running, we do not need to wait any + * more to start the new call. */ + if (!this.lrsBackoff.isRunning()) { + this.maybeStartLrsStream(); + } }); + /* Send buffered stats information when starting LRS stream. If there is no + * buffered stats information, it will still send the node field. */ + this.sendStats(); } private sendStats() { if (!this.lrsCall) { return; } + trace('Sending LRS stats'); const clusterStats: ClusterStats[] = []; for (const [ { clusterName, edsServiceName }, diff --git a/packages/grpc-js/tsconfig.json b/packages/grpc-js/tsconfig.json index ba675db7..65ebf089 100644 --- a/packages/grpc-js/tsconfig.json +++ b/packages/grpc-js/tsconfig.json @@ -10,6 +10,7 @@ }, "include": [ "src/**/*.ts", - "test/**/*.ts" + "test/**/*.ts", + "interop/**/*.ts" ] } diff --git a/test/kokoro/xds-interop.cfg b/test/kokoro/xds-interop.cfg new file mode 100644 index 00000000..81a03873 --- /dev/null +++ b/test/kokoro/xds-interop.cfg @@ -0,0 +1,24 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Config file for Kokoro (in protobuf text format) + +# Location of the continuous shell script in repository. +build_file: "grpc-node/packages/grpc-js/scripts/xds.sh" +timeout_mins: 60 +action { + define_artifacts { + regex: "github/grpc/reports/**" + } +}