Merge branch 'grpc:master' into master

This commit is contained in:
natiz 2022-11-27 21:34:34 +02:00 committed by GitHub
commit d209a34d6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 2589 additions and 614 deletions

View File

@ -180,6 +180,27 @@ class CallStatsTracker {
}
}
class RecentTimestampList {
private timeList: bigint[] = [];
private nextIndex = 0;
constructor(private readonly size: number) {}
isFull() {
return this.timeList.length === this.size;
}
insertTimestamp(timestamp: bigint) {
this.timeList[this.nextIndex] = timestamp;
this.nextIndex = (this.nextIndex + 1) % this.size;
}
getSpan(): bigint {
const lastIndex = (this.nextIndex + this.size - 1) % this.size;
return this.timeList[lastIndex] - this.timeList[this.nextIndex];
}
}
type CallType = 'EmptyCall' | 'UnaryCall';
interface ClientConfiguration {
@ -246,7 +267,13 @@ const callTimeHistogram: {[callType: string]: {[status: number]: number[]}} = {
EmptyCall: {}
}
function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker) {
/**
* Timestamps output by process.hrtime.bigint() are a bigint number of
* nanoseconds. This is the representation of 1 second in that context.
*/
const TIMESTAMP_ONE_SECOND = BigInt(1e9);
function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker, callStartTimestamps: RecentTimestampList) {
const callEnumName = callTypeEnumMapReverse[type];
addAccumulatedCallStarted(callEnumName);
const notifier = callStatsTracker.startCall();
@ -254,19 +281,20 @@ function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFail
let hostname: string | null = null;
let completed: boolean = false;
let completedWithError: boolean = false;
const startTime = process.hrtime();
const startTime = process.hrtime.bigint();
const deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + currentConfig.timeoutSec);
const callback = (error: grpc.ServiceError | undefined, value: Empty__Output | undefined) => {
const statusCode = error?.code ?? grpc.status.OK;
const duration = process.hrtime(startTime);
const duration = process.hrtime.bigint() - startTime;
const durationSeconds = Number(duration / TIMESTAMP_ONE_SECOND) | 0;
if (!callTimeHistogram[type][statusCode]) {
callTimeHistogram[type][statusCode] = [];
}
if (callTimeHistogram[type][statusCode][duration[0]]) {
callTimeHistogram[type][statusCode][duration[0]] += 1;
if (callTimeHistogram[type][statusCode][durationSeconds]) {
callTimeHistogram[type][statusCode][durationSeconds] += 1;
} else {
callTimeHistogram[type][statusCode][duration[0]] = 1;
callTimeHistogram[type][statusCode][durationSeconds] = 1;
}
addAccumulatedCallEnded(callEnumName, statusCode);
if (error) {
@ -301,13 +329,28 @@ function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFail
}
}
});
/* callStartTimestamps tracks the last N timestamps of started calls, where N
* is the target QPS. If the measured span of time between the first and last
* of those N calls is greater than 1 second, we make another call
* ~immediately to correct for that. */
callStartTimestamps.insertTimestamp(startTime);
if (callStartTimestamps.isFull()) {
if (callStartTimestamps.getSpan() > TIMESTAMP_ONE_SECOND) {
setImmediate(() => {
makeSingleRequest(client, type, failOnFailedRpcs, callStatsTracker, callStartTimestamps);
});
}
}
}
function sendConstantQps(client: TestServiceClient, qps: number, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker) {
const callStartTimestampsTrackers: {[callType: string]: RecentTimestampList} = {};
for (const callType of ['EmptyCall', 'UnaryCall']) {
callStartTimestampsTrackers[callType] = new RecentTimestampList(qps);
}
setInterval(() => {
for (const callType of currentConfig.callTypes) {
makeSingleRequest(client, callType, failOnFailedRpcs, callStatsTracker);
makeSingleRequest(client, callType, failOnFailedRpcs, callStatsTracker, callStartTimestampsTrackers[callType]);
}
}, 1000/qps);
setInterval(() => {

View File

@ -48,6 +48,8 @@ git clone -b master --single-branch --depth=1 https://github.com/grpc/grpc.git
grpc/tools/run_tests/helper_scripts/prep_xds.sh
mkdir -p "${KOKORO_ARTIFACTS_DIR}/github/grpc/reports"
GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weighted_target,round_robin,resolving_load_balancer,subchannel,keepalive,dns_resolver,fault_injection,http_filter,csds \
GRPC_NODE_VERBOSITY=DEBUG \
NODE_XDS_INTEROP_VERBOSITY=1 \
@ -59,7 +61,7 @@ GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weigh
--gcp_suffix=$(date '+%s') \
--verbose \
${XDS_V3_OPT-} \
--client_cmd="$(which node) --enable-source-maps grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client \
--client_cmd="$(which node) --enable-source-maps --prof --logfile=${KOKORO_ARTIFACTS_DIR}/github/grpc/reports/prof.log grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client \
--server=xds:///{server_uri} \
--stats_port={stats_port} \
--qps={qps} \

View File

@ -18,7 +18,7 @@
import * as protoLoader from '@grpc/proto-loader';
// This is a non-public, unstable API, but it's very convenient
import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
import { loadPackageDefinition, StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions, ClientDuplexStream, ServiceError, ChannelCredentials, Channel } from '@grpc/grpc-js';
import { loadPackageDefinition, StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions, ClientDuplexStream, ServiceError, ChannelCredentials, Channel, connectivityState } from '@grpc/grpc-js';
import * as adsTypes from './generated/ads';
import * as lrsTypes from './generated/lrs';
import { loadBootstrapInfo } from './xds-bootstrap';
@ -255,6 +255,7 @@ export class XdsClient {
DiscoveryRequest,
DiscoveryResponse__Output
> | null = null;
private receivedAdsResponseOnCurrentStream = false;
private lrsNode: Node | null = null;
private lrsClient: LoadReportingServiceClient | null = null;
@ -373,6 +374,9 @@ export class XdsClient {
{channelOverride: channel}
);
this.maybeStartAdsStream();
channel.watchConnectivityState(channel.getConnectivityState(false), Infinity, () => {
this.handleAdsConnectivityStateUpdate();
})
this.lrsClient = new protoDefinitions.envoy.service.load_stats.v3.LoadReportingService(
serverUri,
@ -394,7 +398,29 @@ export class XdsClient {
clearInterval(this.statsTimer);
}
private handleAdsConnectivityStateUpdate() {
if (!this.adsClient) {
return;
}
const state = this.adsClient.getChannel().getConnectivityState(false);
if (state === connectivityState.READY && this.adsCall) {
this.reportAdsStreamStarted();
}
if (state === connectivityState.TRANSIENT_FAILURE) {
this.reportStreamError({
code: status.UNAVAILABLE,
details: 'No connection established to xDS server',
metadata: new Metadata()
});
}
this.adsClient.getChannel().watchConnectivityState(state, Infinity, () => {
this.handleAdsConnectivityStateUpdate();
});
}
private handleAdsResponse(message: DiscoveryResponse__Output) {
this.receivedAdsResponseOnCurrentStream = true;
this.adsBackoff.reset();
let handleResponseResult: {
result: HandleResponseResult;
serviceKind: AdsServiceKind;
@ -466,7 +492,7 @@ export class XdsClient {
'ADS stream ended. code=' + streamStatus.code + ' details= ' + streamStatus.details
);
this.adsCall = null;
if (streamStatus.code !== status.OK) {
if (streamStatus.code !== status.OK && !this.receivedAdsResponseOnCurrentStream) {
this.reportStreamError(streamStatus);
}
/* If the backoff timer is no longer running, we do not need to wait any
@ -496,7 +522,9 @@ export class XdsClient {
if (this.adsCall !== null) {
return;
}
this.adsCall = this.adsClient.StreamAggregatedResources();
this.receivedAdsResponseOnCurrentStream = false;
const metadata = new Metadata({waitForReady: true});
this.adsCall = this.adsClient.StreamAggregatedResources(metadata);
this.adsCall.on('data', (message: DiscoveryResponse__Output) => {
this.handleAdsResponse(message);
});
@ -515,7 +543,9 @@ export class XdsClient {
this.updateNames(service);
}
}
this.reportAdsStreamStarted();
if (this.adsClient.getChannel().getConnectivityState(false) === connectivityState.READY) {
this.reportAdsStreamStarted();
}
}
private maybeSendAdsMessage(typeUrl: string, resourceNames: string[], responseNonce: string, versionInfo: string, errorMessage?: string) {
@ -547,10 +577,6 @@ export class XdsClient {
* version info are updated so that it sends the post-update values.
*/
ack(serviceKind: AdsServiceKind) {
/* An ack is the best indication of a successful interaction between the
* client and the server, so we can reset the backoff timer here. */
this.adsBackoff.reset();
this.updateNames(serviceKind);
}

View File

@ -213,6 +213,9 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
}
reportAdsStreamStart() {
if (this.isAdsStreamRunning) {
return;
}
this.isAdsStreamRunning = true;
for (const subscriptionEntry of this.subscriptions.values()) {
if (subscriptionEntry.cachedResponse === null) {

View File

@ -59,6 +59,9 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`.
- `grpc.default_compression_algorithm`
- `grpc.enable_channelz`
- `grpc.dns_min_time_between_resolutions_ms`
- `grpc.enable_retries`
- `grpc.per_rpc_retry_buffer_size`
- `grpc.retry_buffer_size`
- `grpc-node.max_session_memory`
- `channelOverride`
- `channelFactoryOverride`

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.7.2",
"version": "1.7.3",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
@ -17,14 +17,14 @@
"devDependencies": {
"@types/gulp": "^4.0.6",
"@types/gulp-mocha": "0.0.32",
"@types/lodash": "^4.14.108",
"@types/lodash": "^4.14.186",
"@types/mocha": "^5.2.6",
"@types/ncp": "^2.0.1",
"@types/pify": "^3.0.2",
"@types/semver": "^7.3.9",
"clang-format": "^1.0.55",
"execa": "^2.0.3",
"gts": "^2.0.0",
"gts": "^3.1.1",
"gulp": "^4.0.2",
"gulp-mocha": "^6.0.0",
"lodash": "^4.17.4",
@ -35,7 +35,7 @@
"rimraf": "^3.0.2",
"semver": "^7.3.5",
"ts-node": "^8.3.0",
"typescript": "^3.7.2"
"typescript": "^4.8.4"
},
"contributors": [
{

View File

@ -115,6 +115,10 @@ export abstract class CallCredentials {
reject(err);
return;
}
if (!headers) {
reject(new Error('Headers not set by metadata plugin'));
return;
}
resolve(headers);
}
);

View File

@ -36,6 +36,10 @@ export interface StatusObject {
metadata: Metadata;
}
export type PartialStatusObject = Pick<StatusObject, 'code' | 'details'> & {
metadata: Metadata | null;
}
export const enum WriteFlags {
BufferHint = 1,
NoCompress = 2,

View File

@ -44,6 +44,16 @@ export interface ChannelOptions {
'grpc.default_compression_algorithm'?: CompressionAlgorithms;
'grpc.enable_channelz'?: number;
'grpc.dns_min_time_between_resolutions_ms'?: number;
'grpc.enable_retries'?: number;
'grpc.per_rpc_retry_buffer_size'?: number;
/* This option is pattered like a core option, but the core does not have
* this option. It is closely related to the option
* grpc.per_rpc_retry_buffer_size, which is in the core. The core will likely
* implement this functionality using the ResourceQuota mechanism, so there
* will probably not be any collision or other inconsistency. */
'grpc.retry_buffer_size'?: number;
'grpc.max_connection_age_ms'?: number;
'grpc.max_connection_age_grace_ms'?: number;
'grpc-node.max_session_memory'?: number;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[key: string]: any;
@ -71,6 +81,11 @@ export const recognizedOptions = {
'grpc.enable_http_proxy': true,
'grpc.enable_channelz': true,
'grpc.dns_min_time_between_resolutions_ms': true,
'grpc.enable_retries': true,
'grpc.per_rpc_retry_buffer_size': true,
'grpc.retry_buffer_size': true,
'grpc.max_connection_age_ms': true,
'grpc.max_connection_age_grace_ms': true,
'grpc-node.max_session_memory': true,
};

View File

@ -34,6 +34,7 @@ import { Channel } from './channel';
import { CallOptions } from './client';
import { CallCredentials } from './call-credentials';
import { ClientMethodDefinition } from './make-client';
import { getErrorMessage } from './error';
/**
* Error class associated with passing both interceptors and interceptor
@ -374,7 +375,7 @@ class BaseInterceptingCall implements InterceptingCallInterface {
} catch (e) {
this.call.cancelWithStatus(
Status.INTERNAL,
`Request message serialization failure: ${e.message}`
`Request message serialization failure: ${getErrorMessage(e)}`
);
return;
}
@ -401,7 +402,7 @@ class BaseInterceptingCall implements InterceptingCallInterface {
} catch (e) {
readError = {
code: Status.INTERNAL,
details: `Response message parsing error: ${e.message}`,
details: `Response message parsing error: ${getErrorMessage(e)}`,
metadata: new Metadata(),
};
this.call.cancelWithStatus(readError.code, readError.details);

View File

@ -0,0 +1,37 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
export function getErrorMessage(error: unknown): string {
if (error instanceof Error) {
return error.message;
} else {
return String(error);
}
}
export function getErrorCode(error: unknown): number | null {
if (
typeof error === 'object' &&
error !== null &&
'code' in error &&
typeof (error as Record<string, unknown>).code === 'number'
) {
return (error as Record<string, number>).code;
} else {
return null;
}
}

View File

@ -88,6 +88,10 @@ export class FilterStackFactory implements FilterFactory<FilterStack> {
this.factories.unshift(...filterFactories);
}
clone(): FilterStackFactory {
return new FilterStackFactory([...this.factories]);
}
createFilter(): FilterStack {
return new FilterStack(
this.factories.map((factory) => factory.createFilter())

View File

@ -25,102 +25,102 @@ export interface ChannelzClient extends grpc.Client {
/**
* Returns a single Channel, or else a NOT_FOUND code.
*/
GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetChannelResponse__Output) => void): grpc.ClientUnaryCall;
GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetChannelResponse__Output) => void): grpc.ClientUnaryCall;
GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetChannelResponse__Output) => void): grpc.ClientUnaryCall;
GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetChannelResponse__Output) => void): grpc.ClientUnaryCall;
GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetChannelResponse__Output>): grpc.ClientUnaryCall;
GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetChannelResponse__Output>): grpc.ClientUnaryCall;
GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetChannelResponse__Output>): grpc.ClientUnaryCall;
GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetChannelResponse__Output>): grpc.ClientUnaryCall;
/**
* Returns a single Server, or else a NOT_FOUND code.
*/
GetServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall;
GetServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall;
GetServer(argument: _grpc_channelz_v1_GetServerRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall;
GetServer(argument: _grpc_channelz_v1_GetServerRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall;
GetServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall;
GetServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall;
GetServer(argument: _grpc_channelz_v1_GetServerRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall;
GetServer(argument: _grpc_channelz_v1_GetServerRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall;
/**
* Returns a single Server, or else a NOT_FOUND code.
*/
getServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall;
getServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall;
getServer(argument: _grpc_channelz_v1_GetServerRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall;
getServer(argument: _grpc_channelz_v1_GetServerRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall;
getServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall;
getServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall;
getServer(argument: _grpc_channelz_v1_GetServerRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall;
getServer(argument: _grpc_channelz_v1_GetServerRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall;
/**
* Gets all server sockets that exist in the process.
*/
GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall;
GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall;
GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall;
GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall;
GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall;
GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall;
GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall;
GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall;
/**
* Gets all server sockets that exist in the process.
*/
getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall;
getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall;
getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall;
getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall;
getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall;
getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall;
getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall;
getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall;
/**
* Gets all servers that exist in the process.
*/
GetServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall;
GetServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall;
GetServers(argument: _grpc_channelz_v1_GetServersRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall;
GetServers(argument: _grpc_channelz_v1_GetServersRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall;
GetServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall;
GetServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall;
GetServers(argument: _grpc_channelz_v1_GetServersRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall;
GetServers(argument: _grpc_channelz_v1_GetServersRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall;
/**
* Gets all servers that exist in the process.
*/
getServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall;
getServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall;
getServers(argument: _grpc_channelz_v1_GetServersRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall;
getServers(argument: _grpc_channelz_v1_GetServersRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall;
getServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall;
getServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall;
getServers(argument: _grpc_channelz_v1_GetServersRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall;
getServers(argument: _grpc_channelz_v1_GetServersRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall;
/**
* Returns a single Socket or else a NOT_FOUND code.
*/
GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall;
GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall;
GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall;
GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall;
GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall;
GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall;
GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall;
GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall;
/**
* Returns a single Socket or else a NOT_FOUND code.
*/
getSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall;
getSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall;
getSocket(argument: _grpc_channelz_v1_GetSocketRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall;
getSocket(argument: _grpc_channelz_v1_GetSocketRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall;
getSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall;
getSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall;
getSocket(argument: _grpc_channelz_v1_GetSocketRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall;
getSocket(argument: _grpc_channelz_v1_GetSocketRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall;
/**
* Returns a single Subchannel, or else a NOT_FOUND code.
*/
GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall;
GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall;
GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall;
GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall;
GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall;
GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall;
GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall;
GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall;
/**
* Returns a single Subchannel, or else a NOT_FOUND code.
*/
getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall;
getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall;
getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall;
getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall;
getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall;
getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall;
getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall;
getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall;
/**
* Gets all root channels (i.e. channels the application has directly
* created). This does not include subchannels nor non-top level channels.
*/
GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall;
GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall;
GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall;
GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall;
GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall;
GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall;
GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall;
GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall;
/**
* Gets all root channels (i.e. channels the application has directly
* created). This does not include subchannels nor non-top level channels.
*/
getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall;
getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall;
getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall;
getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall;
getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall;
getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall;
getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall;
getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall;
}

View File

@ -50,6 +50,7 @@ import { Deadline, getDeadlineTimeoutString } from './deadline';
import { ResolvingCall } from './resolving-call';
import { getNextCallNumber } from './call-number';
import { restrictControlPlaneStatusCode } from './control-plane-status';
import { MessageBufferTracker, RetryingCall, RetryThrottler } from './retrying-call';
/**
* See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
@ -78,6 +79,11 @@ interface ErrorConfigResult {
type GetConfigResult = NoneConfigResult | SuccessConfigResult | ErrorConfigResult;
const RETRY_THROTTLER_MAP: Map<string, RetryThrottler> = new Map();
const DEFAULT_RETRY_BUFFER_SIZE_BYTES = 1<<24; // 16 MB
const DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES = 1<<20; // 1 MB
export class InternalChannel {
private resolvingLoadBalancer: ResolvingLoadBalancer;
@ -111,6 +117,7 @@ export class InternalChannel {
* than TRANSIENT_FAILURE.
*/
private currentResolutionError: StatusObject | null = null;
private retryBufferTracker: MessageBufferTracker;
// Channelz info
private readonly channelzEnabled: boolean = true;
@ -179,6 +186,10 @@ export class InternalChannel {
this.subchannelPool = getSubchannelPool(
(options['grpc.use_local_subchannel_pool'] ?? 0) === 0
);
this.retryBufferTracker = new MessageBufferTracker(
options['grpc.retry_buffer_size'] ?? DEFAULT_RETRY_BUFFER_SIZE_BYTES,
options['grpc.per_rpc_retry_buffer_size'] ?? DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES
);
const channelControlHelper: ChannelControlHelper = {
createSubchannel: (
subchannelAddress: SubchannelAddress,
@ -226,7 +237,12 @@ export class InternalChannel {
this.target,
channelControlHelper,
options,
(configSelector) => {
(serviceConfig, configSelector) => {
if (serviceConfig.retryThrottling) {
RETRY_THROTTLER_MAP.set(this.getTarget(), new RetryThrottler(serviceConfig.retryThrottling.maxTokens, serviceConfig.retryThrottling.tokenRatio, RETRY_THROTTLER_MAP.get(this.getTarget())));
} else {
RETRY_THROTTLER_MAP.delete(this.getTarget());
}
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
}
@ -243,6 +259,7 @@ export class InternalChannel {
}
this.configSelectionQueue = [];
});
},
(status) => {
if (this.channelzEnabled) {
@ -402,7 +419,25 @@ export class InternalChannel {
method +
'"'
);
return new LoadBalancingCall(this, callConfig, method, host, credentials, deadline, this.filterStackFactory, callNumber);
return new LoadBalancingCall(this, callConfig, method, host, credentials, deadline, callNumber);
}
createRetryingCall(
callConfig: CallConfig,
method: string,
host: string,
credentials: CallCredentials,
deadline: Deadline
): RetryingCall {
const callNumber = getNextCallNumber();
this.trace(
'createRetryingCall [' +
callNumber +
'] method="' +
method +
'"'
);
return new RetryingCall(this, callConfig, method, host, credentials, deadline, callNumber, this.retryBufferTracker, RETRY_THROTTLER_MAP.get(this.getTarget()))
}
createInnerCall(
@ -413,7 +448,11 @@ export class InternalChannel {
deadline: Deadline
): Call {
// Create a RetryingCall if retries are enabled
return this.createLoadBalancingCall(callConfig, method, host, credentials, deadline);
if (this.options['grpc.enable_retries'] === 0) {
return this.createLoadBalancingCall(callConfig, method, host, credentials, deadline);
} else {
return this.createRetryingCall(callConfig, method, host, credentials, deadline);
}
}
createResolvingCall(
@ -439,7 +478,7 @@ export class InternalChannel {
parentCall: parentCall,
};
const call = new ResolvingCall(this, method, finalOptions, this.filterStackFactory, this.credentials._getCallCredentials(), getNextCallNumber());
const call = new ResolvingCall(this, method, finalOptions, this.filterStackFactory.clone(), this.credentials._getCallCredentials(), callNumber);
if (this.channelzEnabled) {
this.callTracker.addCallStarted();

View File

@ -29,6 +29,7 @@ import { CallConfig } from "./resolver";
import { splitHostPort } from "./uri-parser";
import * as logging from './logging';
import { restrictControlPlaneStatusCode } from "./control-plane-status";
import * as http2 from 'http2';
const TRACER_NAME = 'load_balancing_call';
@ -38,17 +39,18 @@ export interface StatusObjectWithProgress extends StatusObject {
progress: RpcProgress;
}
export interface LoadBalancingCallInterceptingListener extends InterceptingListener {
onReceiveStatus(status: StatusObjectWithProgress): void;
}
export class LoadBalancingCall implements Call {
private child: SubchannelCall | null = null;
private readPending = false;
private writeFilterPending = false;
private pendingMessage: {context: MessageContext, message: Buffer} | null = null;
private pendingHalfClose = false;
private readFilterPending = false;
private pendingChildStatus: StatusObject | null = null;
private ended = false;
private serviceUrl: string;
private filterStack: FilterStack;
private metadata: Metadata | null = null;
private listener: InterceptingListener | null = null;
private onCallEnded: ((statusCode: Status) => void) | null = null;
@ -59,11 +61,8 @@ export class LoadBalancingCall implements Call {
private readonly host : string,
private readonly credentials: CallCredentials,
private readonly deadline: Deadline,
filterStackFactory: FilterStackFactory,
private readonly callNumber: number
) {
this.filterStack = filterStackFactory.createFilter();
const splitPath: string[] = this.methodName.split('/');
let serviceName = '';
/* The standard path format is "/{serviceName}/{methodName}", so if we split
@ -90,8 +89,7 @@ export class LoadBalancingCall implements Call {
if (!this.ended) {
this.ended = true;
this.trace('ended with status: code=' + status.code + ' details="' + status.details + '"');
const filteredStatus = this.filterStack.receiveTrailers(status);
const finalStatus = {...filteredStatus, progress};
const finalStatus = {...status, progress};
this.listener?.onReceiveStatus(finalStatus);
this.onCallEnded?.(finalStatus.code);
}
@ -152,23 +150,17 @@ export class LoadBalancingCall implements Call {
try {
this.child = pickResult.subchannel!.getRealSubchannel().createCall(finalMetadata, this.host, this.methodName, {
onReceiveMetadata: metadata => {
this.listener!.onReceiveMetadata(this.filterStack.receiveMetadata(metadata));
this.trace('Received metadata');
this.listener!.onReceiveMetadata(metadata);
},
onReceiveMessage: message => {
this.readFilterPending = true;
this.filterStack.receiveMessage(message).then(filteredMesssage => {
this.readFilterPending = false;
this.listener!.onReceiveMessage(filteredMesssage);
if (this.pendingChildStatus) {
this.outputStatus(this.pendingChildStatus, 'PROCESSED');
}
}, (status: StatusObject) => {
this.cancelWithStatus(status.code, status.details);
});
this.trace('Received message');
this.listener!.onReceiveMessage(message);
},
onReceiveStatus: status => {
if (this.readFilterPending) {
this.pendingChildStatus = status;
this.trace('Received status');
if (status.rstCode === http2.constants.NGHTTP2_REFUSED_STREAM) {
this.outputStatus(status, 'REFUSED');
} else {
this.outputStatus(status, 'PROCESSED');
}
@ -201,7 +193,7 @@ export class LoadBalancingCall implements Call {
if (this.pendingMessage) {
this.child.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
}
if (this.pendingHalfClose && !this.writeFilterPending) {
if (this.pendingHalfClose) {
this.child.halfClose();
}
}, (error: Error & { code: number }) => {
@ -246,32 +238,19 @@ export class LoadBalancingCall implements Call {
getPeer(): string {
return this.child?.getPeer() ?? this.channel.getTarget();
}
start(metadata: Metadata, listener: InterceptingListener): void {
start(metadata: Metadata, listener: LoadBalancingCallInterceptingListener): void {
this.trace('start called');
this.listener = listener;
this.filterStack.sendMetadata(Promise.resolve(metadata)).then(filteredMetadata => {
this.metadata = filteredMetadata;
this.doPick();
}, (status: StatusObject) => {
this.outputStatus(status, 'PROCESSED');
});
this.metadata = metadata;
this.doPick();
}
sendMessageWithContext(context: MessageContext, message: Buffer): void {
this.trace('write() called with message of length ' + message.length);
this.writeFilterPending = true;
this.filterStack.sendMessage(Promise.resolve({message: message, flags: context.flags})).then((filteredMessage) => {
this.writeFilterPending = false;
if (this.child) {
this.child.sendMessageWithContext(context, filteredMessage.message);
if (this.pendingHalfClose) {
this.child.halfClose();
}
} else {
this.pendingMessage = {context, message: filteredMessage.message};
}
}, (status: StatusObject) => {
this.cancelWithStatus(status.code, status.details);
})
if (this.child) {
this.child.sendMessageWithContext(context, message);
} else {
this.pendingMessage = {context, message};
}
}
startRead(): void {
this.trace('startRead called');
@ -283,7 +262,7 @@ export class LoadBalancingCall implements Call {
}
halfClose(): void {
this.trace('halfClose called');
if (this.child && !this.writeFilterPending) {
if (this.child) {
this.child.halfClose();
} else {
this.pendingHalfClose = true;

View File

@ -18,6 +18,7 @@
import * as http2 from 'http2';
import { log } from './logging';
import { LogVerbosity } from './constants';
import { getErrorMessage } from './error';
const LEGAL_KEY_REGEX = /^[0-9a-z_.-]+$/;
const LEGAL_NON_BINARY_VALUE_REGEX = /^[ -~]*$/;
@ -48,13 +49,14 @@ function validate(key: string, value?: MetadataValue): void {
if (!isLegalKey(key)) {
throw new Error('Metadata key "' + key + '" contains illegal characters');
}
if (value !== null && value !== undefined) {
if (isBinaryKey(key)) {
if (!(value instanceof Buffer)) {
if (!Buffer.isBuffer(value)) {
throw new Error("keys that end with '-bin' must have Buffer values");
}
} else {
if (value instanceof Buffer) {
if (Buffer.isBuffer(value)) {
throw new Error(
"keys that don't end with '-bin' must have String values"
);
@ -88,12 +90,8 @@ export class Metadata {
protected internalRepr: MetadataObject = new Map<string, MetadataValue[]>();
private options: MetadataOptions;
constructor(options?: MetadataOptions) {
if (options === undefined) {
this.options = {};
} else {
this.options = options;
}
constructor(options: MetadataOptions = {}) {
this.options = options;
}
/**
@ -120,9 +118,7 @@ export class Metadata {
key = normalizeKey(key);
validate(key, value);
const existingValue: MetadataValue[] | undefined = this.internalRepr.get(
key
);
const existingValue: MetadataValue[] | undefined = this.internalRepr.get(key);
if (existingValue === undefined) {
this.internalRepr.set(key, [value]);
@ -137,7 +133,7 @@ export class Metadata {
*/
remove(key: string): void {
key = normalizeKey(key);
validate(key);
// validate(key);
this.internalRepr.delete(key);
}
@ -148,7 +144,7 @@ export class Metadata {
*/
get(key: string): MetadataValue[] {
key = normalizeKey(key);
validate(key);
// validate(key);
return this.internalRepr.get(key) || [];
}
@ -160,12 +156,12 @@ export class Metadata {
getMap(): { [key: string]: MetadataValue } {
const result: { [key: string]: MetadataValue } = {};
this.internalRepr.forEach((values, key) => {
for (const [key, values] of this.internalRepr) {
if (values.length > 0) {
const v = values[0];
result[key] = v instanceof Buffer ? v.slice() : v;
result[key] = Buffer.isBuffer(v) ? Buffer.from(v) : v;
}
});
}
return result;
}
@ -177,9 +173,9 @@ export class Metadata {
const newMetadata = new Metadata(this.options);
const newInternalRepr = newMetadata.internalRepr;
this.internalRepr.forEach((value, key) => {
for (const [key, value] of this.internalRepr) {
const clonedValue: MetadataValue[] = value.map((v) => {
if (v instanceof Buffer) {
if (Buffer.isBuffer(v)) {
return Buffer.from(v);
} else {
return v;
@ -187,7 +183,7 @@ export class Metadata {
});
newInternalRepr.set(key, clonedValue);
});
}
return newMetadata;
}
@ -200,13 +196,13 @@ export class Metadata {
* @param other A Metadata object.
*/
merge(other: Metadata): void {
other.internalRepr.forEach((values, key) => {
for (const [key, values] of other.internalRepr) {
const mergedValue: MetadataValue[] = (
this.internalRepr.get(key) || []
).concat(values);
this.internalRepr.set(key, mergedValue);
});
}
}
setOptions(options: MetadataOptions) {
@ -223,17 +219,13 @@ export class Metadata {
toHttp2Headers(): http2.OutgoingHttpHeaders {
// NOTE: Node <8.9 formats http2 headers incorrectly.
const result: http2.OutgoingHttpHeaders = {};
this.internalRepr.forEach((values, key) => {
for (const [key, values] of this.internalRepr) {
// We assume that the user's interaction with this object is limited to
// through its public API (i.e. keys and values are already validated).
result[key] = values.map((value) => {
if (value instanceof Buffer) {
return value.toString('base64');
} else {
return value;
}
});
});
result[key] = values.map(bufToString);
}
return result;
}
@ -248,7 +240,7 @@ export class Metadata {
*/
toJSON() {
const result: { [key: string]: MetadataValue[] } = {};
for (const [key, values] of this.internalRepr.entries()) {
for (const [key, values] of this.internalRepr) {
result[key] = values;
}
return result;
@ -261,10 +253,10 @@ export class Metadata {
*/
static fromHttp2Headers(headers: http2.IncomingHttpHeaders): Metadata {
const result = new Metadata();
Object.keys(headers).forEach((key) => {
for (const key of Object.keys(headers)) {
// Reserved headers (beginning with `:`) are not valid keys.
if (key.charAt(0) === ':') {
return;
continue;
}
const values = headers[key];
@ -294,10 +286,15 @@ export class Metadata {
}
}
} catch (error) {
const message = `Failed to add metadata entry ${key}: ${values}. ${error.message}. For more information see https://github.com/grpc/grpc-node/issues/1173`;
const message = `Failed to add metadata entry ${key}: ${values}. ${getErrorMessage(error)}. For more information see https://github.com/grpc/grpc-node/issues/1173`;
log(LogVerbosity.ERROR, message);
}
});
}
return result;
}
}
const bufToString = (val: string | Buffer): string => {
return Buffer.isBuffer(val) ? val.toString('base64') : val
};

View File

@ -19,7 +19,7 @@ import { CallCredentials } from "./call-credentials";
import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from "./call-interface";
import { LogVerbosity, Propagate, Status } from "./constants";
import { Deadline, getDeadlineTimeoutString, getRelativeTimeout, minDeadline } from "./deadline";
import { FilterStackFactory } from "./filter-stack";
import { FilterStack, FilterStackFactory } from "./filter-stack";
import { InternalChannel } from "./internal-channel";
import { Metadata } from "./metadata";
import * as logging from './logging';
@ -33,12 +33,16 @@ export class ResolvingCall implements Call {
private pendingMessage: {context: MessageContext, message: Buffer} | null = null;
private pendingHalfClose = false;
private ended = false;
private readFilterPending = false;
private writeFilterPending = false;
private pendingChildStatus: StatusObject | null = null;
private metadata: Metadata | null = null;
private listener: InterceptingListener | null = null;
private deadline: Deadline;
private host: string;
private statusWatchers: ((status: StatusObject) => void)[] = [];
private deadlineTimer: NodeJS.Timer = setTimeout(() => {}, 0);
private filterStack: FilterStack | null = null;
constructor(
private readonly channel: InternalChannel,
@ -96,14 +100,35 @@ export class ResolvingCall implements Call {
private outputStatus(status: StatusObject) {
if (!this.ended) {
this.ended = true;
this.trace('ended with status: code=' + status.code + ' details="' + status.details + '"');
this.statusWatchers.forEach(watcher => watcher(status));
if (!this.filterStack) {
this.filterStack = this.filterStackFactory.createFilter();
}
const filteredStatus = this.filterStack.receiveTrailers(status);
this.trace('ended with status: code=' + filteredStatus.code + ' details="' + filteredStatus.details + '"');
this.statusWatchers.forEach(watcher => watcher(filteredStatus));
process.nextTick(() => {
this.listener?.onReceiveStatus(status);
this.listener?.onReceiveStatus(filteredStatus);
});
}
}
private sendMessageOnChild(context: MessageContext, message: Buffer): void {
if (!this.child) {
throw new Error('sendMessageonChild called with child not populated');
}
const child = this.child;
this.writeFilterPending = true;
this.filterStack!.sendMessage(Promise.resolve({message: message, flags: context.flags})).then((filteredMessage) => {
this.writeFilterPending = false;
child.sendMessageWithContext(context, filteredMessage.message);
if (this.pendingHalfClose) {
child.halfClose();
}
}, (status: StatusObject) => {
this.cancelWithStatus(status.code, status.details);
});
}
getConfig(): void {
if (this.ended) {
return;
@ -145,32 +170,50 @@ export class ResolvingCall implements Call {
config.methodConfig.timeout.nanos / 1_000_000
);
this.deadline = minDeadline(this.deadline, configDeadline);
this.runDeadlineTimer();
}
this.filterStackFactory.push(config.dynamicFilterFactories);
this.child = this.channel.createInnerCall(config, this.method, this.host, this.credentials, this.deadline);
this.child.start(this.metadata, {
onReceiveMetadata: metadata => {
this.listener!.onReceiveMetadata(metadata);
},
onReceiveMessage: message => {
this.listener!.onReceiveMessage(message);
},
onReceiveStatus: status => {
this.outputStatus(status);
this.filterStack = this.filterStackFactory.createFilter();
this.filterStack.sendMetadata(Promise.resolve(this.metadata)).then(filteredMetadata => {
this.child = this.channel.createInnerCall(config, this.method, this.host, this.credentials, this.deadline);
this.child.start(filteredMetadata, {
onReceiveMetadata: metadata => {
this.listener!.onReceiveMetadata(this.filterStack!.receiveMetadata(metadata));
},
onReceiveMessage: message => {
this.readFilterPending = true;
this.filterStack!.receiveMessage(message).then(filteredMesssage => {
this.readFilterPending = false;
this.listener!.onReceiveMessage(filteredMesssage);
if (this.pendingChildStatus) {
this.outputStatus(this.pendingChildStatus);
}
}, (status: StatusObject) => {
this.cancelWithStatus(status.code, status.details);
});
},
onReceiveStatus: status => {
if (this.readFilterPending) {
this.pendingChildStatus = status;
} else {
this.outputStatus(status);
}
}
});
if (this.readPending) {
this.child.startRead();
}
});
if (this.readPending) {
this.child.startRead();
}
if (this.pendingMessage) {
this.child.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
}
if (this.pendingHalfClose) {
this.child.halfClose();
}
if (this.pendingMessage) {
this.sendMessageOnChild(this.pendingMessage.context, this.pendingMessage.message);
} else if (this.pendingHalfClose) {
this.child.halfClose();
}
}, (status: StatusObject) => {
this.outputStatus(status);
})
}
reportResolverError(status: StatusObject) {
if (this.metadata?.getOptions().waitForReady) {
this.channel.queueCallForConfig(this);
@ -195,7 +238,7 @@ export class ResolvingCall implements Call {
sendMessageWithContext(context: MessageContext, message: Buffer): void {
this.trace('write() called with message of length ' + message.length);
if (this.child) {
this.child.sendMessageWithContext(context, message);
this.sendMessageOnChild(context, message);
} else {
this.pendingMessage = {context, message};
}
@ -210,7 +253,7 @@ export class ResolvingCall implements Call {
}
halfClose(): void {
this.trace('halfClose called');
if (this.child) {
if (this.child && !this.writeFilterPending) {
this.child.halfClose();
} else {
this.pendingHalfClose = true;

View File

@ -83,7 +83,7 @@ function getDefaultConfigSelector(
}
export interface ResolutionCallback {
(configSelector: ConfigSelector): void;
(serviceConfig: ServiceConfig, configSelector: ConfigSelector): void;
}
export interface ResolutionFailureCallback {
@ -239,6 +239,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
const finalServiceConfig =
workingServiceConfig ?? this.defaultServiceConfig;
this.onSuccessfulResolution(
finalServiceConfig,
configSelector ?? getDefaultConfigSelector(finalServiceConfig)
);
},

View File

@ -0,0 +1,639 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { CallCredentials } from "./call-credentials";
import { LogVerbosity, Status } from "./constants";
import { Deadline } from "./deadline";
import { Metadata } from "./metadata";
import { CallConfig } from "./resolver";
import * as logging from './logging';
import { Call, InterceptingListener, MessageContext, StatusObject, WriteCallback, WriteObject } from "./call-interface";
import { LoadBalancingCall, StatusObjectWithProgress } from "./load-balancing-call";
import { InternalChannel } from "./internal-channel";
const TRACER_NAME = 'retrying_call';
export class RetryThrottler {
private tokens: number;
constructor(private readonly maxTokens: number, private readonly tokenRatio: number, previousRetryThrottler?: RetryThrottler) {
if (previousRetryThrottler) {
/* When carrying over tokens from a previous config, rescale them to the
* new max value */
this.tokens = previousRetryThrottler.tokens * (maxTokens / previousRetryThrottler.maxTokens);
} else {
this.tokens = maxTokens;
}
}
addCallSucceeded() {
this.tokens = Math.max(this.tokens + this.tokenRatio, this.maxTokens);
}
addCallFailed() {
this.tokens = Math.min(this.tokens - 1, 0);
}
canRetryCall() {
return this.tokens > this.maxTokens / 2;
}
}
export class MessageBufferTracker {
private totalAllocated: number = 0;
private allocatedPerCall: Map<number, number> = new Map<number, number>();
constructor(private totalLimit: number, private limitPerCall: number) {}
allocate(size: number, callId: number): boolean {
const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
if (this.limitPerCall - currentPerCall < size || this.totalLimit - this.totalAllocated < size) {
return false;
}
this.allocatedPerCall.set(callId, currentPerCall + size);
this.totalAllocated += size;
return true;
}
free(size: number, callId: number) {
if (this.totalAllocated < size) {
throw new Error(`Invalid buffer allocation state: call ${callId} freed ${size} > total allocated ${this.totalAllocated}`);
}
this.totalAllocated -= size;
const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
if (currentPerCall < size) {
throw new Error(`Invalid buffer allocation state: call ${callId} freed ${size} > allocated for call ${currentPerCall}`);
}
this.allocatedPerCall.set(callId, currentPerCall - size);
}
freeAll(callId: number) {
const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
if (this.totalAllocated < currentPerCall) {
throw new Error(`Invalid buffer allocation state: call ${callId} allocated ${currentPerCall} > total allocated ${this.totalAllocated}`);
}
this.totalAllocated -= currentPerCall;
this.allocatedPerCall.delete(callId);
}
}
type UnderlyingCallState = 'ACTIVE' | 'COMPLETED';
interface UnderlyingCall {
state: UnderlyingCallState;
call: LoadBalancingCall;
nextMessageToSend: number;
}
/**
* A retrying call can be in one of these states:
* RETRY: Retries are configured and new attempts may be sent
* HEDGING: Hedging is configured and new attempts may be sent
* TRANSPARENT_ONLY: Neither retries nor hedging are configured, and
* transparent retry attempts may still be sent
* COMMITTED: One attempt is committed, and no new attempts will be
* sent
*/
type RetryingCallState = 'RETRY' | 'HEDGING' | 'TRANSPARENT_ONLY' | 'COMMITTED';
/**
* The different types of objects that can be stored in the write buffer, with
* the following meanings:
* MESSAGE: This is a message to be sent.
* HALF_CLOSE: When this entry is reached, the calls should send a half-close.
* FREED: This slot previously contained a message that has been sent on all
* child calls and is no longer needed.
*/
type WriteBufferEntryType = 'MESSAGE' | 'HALF_CLOSE' | 'FREED';
/**
* Entry in the buffer of messages to send to the remote end.
*/
interface WriteBufferEntry {
entryType: WriteBufferEntryType;
/**
* Message to send.
* Only populated if entryType is MESSAGE.
*/
message?: WriteObject;
/**
* Callback to call after sending the message.
* Only populated if entryType is MESSAGE and the call is in the COMMITTED
* state.
*/
callback?: WriteCallback;
/**
* Indicates whether the message is allocated in the buffer tracker. Ignored
* if entryType is not MESSAGE. Should be the return value of
* bufferTracker.allocate.
*/
allocated: boolean;
}
const PREVIONS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts';
export class RetryingCall implements Call {
private state: RetryingCallState;
private listener: InterceptingListener | null = null;
private initialMetadata: Metadata | null = null;
private underlyingCalls: UnderlyingCall[] = [];
private writeBuffer: WriteBufferEntry[] = [];
/**
* Tracks whether a read has been started, so that we know whether to start
* reads on new child calls. This only matters for the first read, because
* once a message comes in the child call becomes committed and there will
* be no new child calls.
*/
private readStarted = false;
private transparentRetryUsed: boolean = false;
/**
* Number of attempts so far
*/
private attempts: number = 0;
private hedgingTimer: NodeJS.Timer | null = null;
private committedCallIndex: number | null = null;
private initialRetryBackoffSec = 0;
private nextRetryBackoffSec = 0;
constructor(
private readonly channel: InternalChannel,
private readonly callConfig: CallConfig,
private readonly methodName: string,
private readonly host: string,
private readonly credentials: CallCredentials,
private readonly deadline: Deadline,
private readonly callNumber: number,
private readonly bufferTracker: MessageBufferTracker,
private readonly retryThrottler?: RetryThrottler
) {
if (callConfig.methodConfig.retryPolicy) {
this.state = 'RETRY';
const retryPolicy = callConfig.methodConfig.retryPolicy;
this.nextRetryBackoffSec = this.initialRetryBackoffSec = Number(retryPolicy.initialBackoff.substring(0, retryPolicy.initialBackoff.length - 1));
} else if (callConfig.methodConfig.hedgingPolicy) {
this.state = 'HEDGING';
} else {
this.state = 'TRANSPARENT_ONLY';
}
}
getCallNumber(): number {
return this.callNumber;
}
private trace(text: string): void {
logging.trace(
LogVerbosity.DEBUG,
TRACER_NAME,
'[' + this.callNumber + '] ' + text
);
}
private reportStatus(statusObject: StatusObject) {
this.trace('ended with status: code=' + statusObject.code + ' details="' + statusObject.details + '"');
process.nextTick(() => {
this.listener?.onReceiveStatus(statusObject);
});
}
cancelWithStatus(status: Status, details: string): void {
this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
this.reportStatus({code: status, details, metadata: new Metadata()});
for (const {call} of this.underlyingCalls) {
call.cancelWithStatus(status, details);
}
}
getPeer(): string {
if (this.committedCallIndex !== null) {
return this.underlyingCalls[this.committedCallIndex].call.getPeer();
} else {
return 'unknown';
}
}
private maybefreeMessageBufferEntry(messageIndex: number) {
if (this.state !== 'COMMITTED') {
return;
}
const bufferEntry = this.writeBuffer[messageIndex];
if (bufferEntry.entryType === 'MESSAGE') {
if (bufferEntry.allocated) {
this.bufferTracker.free(bufferEntry.message!.message.length, this.callNumber);
}
this.writeBuffer[messageIndex] = {
entryType: 'FREED',
allocated: false
};
}
}
private commitCall(index: number) {
if (this.state === 'COMMITTED') {
return;
}
if (this.underlyingCalls[index].state === 'COMPLETED') {
return;
}
this.trace('Committing call [' + this.underlyingCalls[index].call.getCallNumber() + '] at index ' + index);
this.state = 'COMMITTED';
this.committedCallIndex = index;
for (let i = 0; i < this.underlyingCalls.length; i++) {
if (i === index) {
continue;
}
if (this.underlyingCalls[i].state === 'COMPLETED') {
continue;
}
this.underlyingCalls[i].state = 'COMPLETED';
this.underlyingCalls[i].call.cancelWithStatus(Status.CANCELLED, 'Discarded in favor of other hedged attempt');
}
for (let messageIndex = 0; messageIndex < this.underlyingCalls[index].nextMessageToSend - 1; messageIndex += 1) {
this.maybefreeMessageBufferEntry(messageIndex);
}
}
private commitCallWithMostMessages() {
let mostMessages = -1;
let callWithMostMessages = -1;
for (const [index, childCall] of this.underlyingCalls.entries()) {
if (childCall.nextMessageToSend > mostMessages) {
mostMessages = childCall.nextMessageToSend;
callWithMostMessages = index;
}
}
this.commitCall(callWithMostMessages);
}
private isStatusCodeInList(list: (Status | string)[], code: Status) {
return list.some((value => value === code || value.toString().toLowerCase() === Status[code].toLowerCase()));
}
private getNextRetryBackoffMs() {
const retryPolicy = this.callConfig?.methodConfig.retryPolicy;
if (!retryPolicy) {
return 0;
}
const nextBackoffMs = Math.random() * this.nextRetryBackoffSec * 1000;
const maxBackoffSec = Number(retryPolicy.maxBackoff.substring(0, retryPolicy.maxBackoff.length - 1));
this.nextRetryBackoffSec = Math.min(this.nextRetryBackoffSec * retryPolicy.backoffMultiplier, maxBackoffSec);
return nextBackoffMs
}
private maybeRetryCall(pushback: number | null, callback: (retried: boolean) => void) {
if (this.state !== 'RETRY') {
callback(false);
return;
}
const retryPolicy = this.callConfig!.methodConfig.retryPolicy!;
if (this.attempts >= Math.min(retryPolicy.maxAttempts, 5)) {
callback(false);
return;
}
let retryDelayMs: number;
if (pushback === null) {
retryDelayMs = this.getNextRetryBackoffMs();
} else if (pushback < 0) {
this.state = 'TRANSPARENT_ONLY';
callback(false);
return;
} else {
retryDelayMs = pushback;
this.nextRetryBackoffSec = this.initialRetryBackoffSec;
}
setTimeout(() => {
if (this.state !== 'RETRY') {
callback(false);
return;
}
if (this.retryThrottler?.canRetryCall() ?? true) {
callback(true);
this.attempts += 1;
this.startNewAttempt();
}
}, retryDelayMs);
}
private countActiveCalls(): number {
let count = 0;
for (const call of this.underlyingCalls) {
if (call?.state === 'ACTIVE') {
count += 1;
}
}
return count;
}
private handleProcessedStatus(status: StatusObject, callIndex: number, pushback: number | null) {
switch (this.state) {
case 'COMMITTED':
case 'TRANSPARENT_ONLY':
this.commitCall(callIndex);
this.reportStatus(status);
break;
case 'HEDGING':
if (this.isStatusCodeInList(this.callConfig!.methodConfig.hedgingPolicy!.nonFatalStatusCodes ?? [], status.code)) {
this.retryThrottler?.addCallFailed();
let delayMs: number;
if (pushback === null) {
delayMs = 0;
} else if (pushback < 0) {
this.state = 'TRANSPARENT_ONLY';
this.commitCall(callIndex);
this.reportStatus(status);
return;
} else {
delayMs = pushback;
}
setTimeout(() => {
this.maybeStartHedgingAttempt();
// If after trying to start a call there are no active calls, this was the last one
if (this.countActiveCalls() === 0) {
this.commitCall(callIndex);
this.reportStatus(status);
}
}, delayMs);
} else {
this.commitCall(callIndex);
this.reportStatus(status);
}
break;
case 'RETRY':
if (this.isStatusCodeInList(this.callConfig!.methodConfig.retryPolicy!.retryableStatusCodes, status.code)) {
this.retryThrottler?.addCallFailed();
this.maybeRetryCall(pushback, (retried) => {
if (!retried) {
this.commitCall(callIndex);
this.reportStatus(status);
}
});
} else {
this.commitCall(callIndex);
this.reportStatus(status);
}
break;
}
}
private getPushback(metadata: Metadata): number | null {
const mdValue = metadata.get('grpc-retry-pushback-ms');
if (mdValue.length === 0) {
return null;
}
try {
return parseInt(mdValue[0] as string);
} catch (e) {
return -1;
}
}
private handleChildStatus(status: StatusObjectWithProgress, callIndex: number) {
if (this.underlyingCalls[callIndex].state === 'COMPLETED') {
return;
}
this.trace('state=' + this.state + ' handling status with progress ' + status.progress + ' from child [' + this.underlyingCalls[callIndex].call.getCallNumber() + '] in state ' + this.underlyingCalls[callIndex].state);
this.underlyingCalls[callIndex].state = 'COMPLETED';
if (status.code === Status.OK) {
this.retryThrottler?.addCallSucceeded();
this.commitCall(callIndex);
this.reportStatus(status);
return;
}
if (this.state === 'COMMITTED') {
this.reportStatus(status);
return;
}
const pushback = this.getPushback(status.metadata);
switch (status.progress) {
case 'NOT_STARTED':
// RPC never leaves the client, always safe to retry
this.startNewAttempt();
break;
case 'REFUSED':
// RPC reaches the server library, but not the server application logic
if (this.transparentRetryUsed) {
this.handleProcessedStatus(status, callIndex, pushback);
} else {
this.transparentRetryUsed = true;
this.startNewAttempt();
};
break;
case 'DROP':
this.commitCall(callIndex);
this.reportStatus(status);
break;
case 'PROCESSED':
this.handleProcessedStatus(status, callIndex, pushback);
break;
}
}
private maybeStartHedgingAttempt() {
if (this.state !== 'HEDGING') {
return;
}
if (!this.callConfig.methodConfig.hedgingPolicy) {
return;
}
const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
return;
}
this.attempts += 1;
this.startNewAttempt();
this.maybeStartHedgingTimer();
}
private maybeStartHedgingTimer() {
if (this.hedgingTimer) {
clearTimeout(this.hedgingTimer);
}
if (this.state !== 'HEDGING') {
return;
}
if (!this.callConfig.methodConfig.hedgingPolicy) {
return;
}
const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
return;
}
const hedgingDelayString = hedgingPolicy.hedgingDelay ?? '0s';
const hedgingDelaySec = Number(hedgingDelayString.substring(0, hedgingDelayString.length - 1));
this.hedgingTimer = setTimeout(() => {
this.maybeStartHedgingAttempt();
}, hedgingDelaySec * 1000);
this.hedgingTimer.unref?.();
}
private startNewAttempt() {
const child = this.channel.createLoadBalancingCall(this.callConfig, this.methodName, this.host, this.credentials, this.deadline);
this.trace('Created child call [' + child.getCallNumber() + '] for attempt ' + this.attempts);
const index = this.underlyingCalls.length;
this.underlyingCalls.push({state: 'ACTIVE', call: child, nextMessageToSend: 0});
const previousAttempts = this.attempts - 1;
const initialMetadata = this.initialMetadata!.clone();
if (previousAttempts > 0) {
initialMetadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
}
let receivedMetadata = false;
child.start(initialMetadata, {
onReceiveMetadata: metadata => {
this.trace('Received metadata from child [' + child.getCallNumber() + ']');
this.commitCall(index);
receivedMetadata = true;
if (previousAttempts > 0) {
metadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
}
if (this.underlyingCalls[index].state === 'ACTIVE') {
this.listener!.onReceiveMetadata(metadata);
}
},
onReceiveMessage: message => {
this.trace('Received message from child [' + child.getCallNumber() + ']');
this.commitCall(index);
if (this.underlyingCalls[index].state === 'ACTIVE') {
this.listener!.onReceiveMessage(message);
}
},
onReceiveStatus: status => {
this.trace('Received status from child [' + child.getCallNumber() + ']');
if (!receivedMetadata && previousAttempts > 0) {
status.metadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
}
this.handleChildStatus(status, index);
}
});
this.sendNextChildMessage(index);
if (this.readStarted) {
child.startRead();
}
}
start(metadata: Metadata, listener: InterceptingListener): void {
this.trace('start called');
this.listener = listener;
this.initialMetadata = metadata;
this.attempts += 1;
this.startNewAttempt();
this.maybeStartHedgingTimer();
}
private handleChildWriteCompleted(childIndex: number) {
const childCall = this.underlyingCalls[childIndex];
const messageIndex = childCall.nextMessageToSend;
this.writeBuffer[messageIndex].callback?.();
this.maybefreeMessageBufferEntry(messageIndex);
childCall.nextMessageToSend += 1;
this.sendNextChildMessage(childIndex);
}
private sendNextChildMessage(childIndex: number) {
const childCall = this.underlyingCalls[childIndex];
if (childCall.state === 'COMPLETED') {
return;
}
if (this.writeBuffer[childCall.nextMessageToSend]) {
const bufferEntry = this.writeBuffer[childCall.nextMessageToSend];
switch (bufferEntry.entryType) {
case 'MESSAGE':
childCall.call.sendMessageWithContext({
callback: (error) => {
// Ignore error
this.handleChildWriteCompleted(childIndex);
}
}, bufferEntry.message!.message);
break;
case 'HALF_CLOSE':
childCall.nextMessageToSend += 1;
childCall.call.halfClose();
break;
case 'FREED':
// Should not be possible
break;
}
}
}
sendMessageWithContext(context: MessageContext, message: Buffer): void {
this.trace('write() called with message of length ' + message.length);
const writeObj: WriteObject = {
message,
flags: context.flags,
};
const messageIndex = this.writeBuffer.length;
const bufferEntry: WriteBufferEntry = {
entryType: 'MESSAGE',
message: writeObj,
allocated: this.bufferTracker.allocate(message.length, this.callNumber)
};
this.writeBuffer[messageIndex] = bufferEntry;
if (bufferEntry.allocated) {
context.callback?.();
for (const [callIndex, call] of this.underlyingCalls.entries()) {
if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
call.call.sendMessageWithContext({
callback: (error) => {
// Ignore error
this.handleChildWriteCompleted(callIndex);
}
}, message);
}
}
} else {
this.commitCallWithMostMessages();
const call = this.underlyingCalls[this.committedCallIndex!];
bufferEntry.callback = context.callback;
if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
call.call.sendMessageWithContext({
callback: (error) => {
// Ignore error
this.handleChildWriteCompleted(this.committedCallIndex!);
}
}, message);
}
}
}
startRead(): void {
this.trace('startRead called');
this.readStarted = true;
for (const underlyingCall of this.underlyingCalls) {
if (underlyingCall?.state === 'ACTIVE') {
underlyingCall.call.startRead();
}
}
}
halfClose(): void {
this.trace('halfClose called');
const halfCloseIndex = this.writeBuffer.length;
this.writeBuffer[halfCloseIndex] = {
entryType: 'HALF_CLOSE',
allocated: false
};
for (const call of this.underlyingCalls) {
if (call?.state === 'ACTIVE' && call.nextMessageToSend === halfCloseIndex) {
call.nextMessageToSend += 1;
call.call.halfClose();
}
}
}
setCredentials(newCredentials: CallCredentials): void {
throw new Error("Method not implemented.");
}
getMethod(): string {
return this.methodName;
}
getHost(): string {
return this.host;
}
}

View File

@ -19,6 +19,7 @@ import { EventEmitter } from 'events';
import * as http2 from 'http2';
import { Duplex, Readable, Writable } from 'stream';
import * as zlib from 'zlib';
import { promisify } from 'util';
import {
Status,
@ -32,10 +33,13 @@ import { StreamDecoder } from './stream-decoder';
import { ObjectReadable, ObjectWritable } from './object-stream';
import { ChannelOptions } from './channel-options';
import * as logging from './logging';
import { StatusObject } from './call-interface';
import { StatusObject, PartialStatusObject } from './call-interface';
import { Deadline } from './deadline';
import { getErrorCode, getErrorMessage } from './error';
const TRACER_NAME = 'server_call';
const unzip = promisify(zlib.unzip);
const inflate = promisify(zlib.inflate);
function trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
@ -87,25 +91,22 @@ export type ServerSurfaceCall = {
export type ServerUnaryCall<RequestType, ResponseType> = ServerSurfaceCall & {
request: RequestType;
};
export type ServerReadableStream<
RequestType,
ResponseType
> = ServerSurfaceCall & ObjectReadable<RequestType>;
export type ServerWritableStream<
RequestType,
ResponseType
> = ServerSurfaceCall &
ObjectWritable<ResponseType> & {
request: RequestType;
end: (metadata?: Metadata) => void;
};
export type ServerReadableStream<RequestType, ResponseType> =
ServerSurfaceCall & ObjectReadable<RequestType>;
export type ServerWritableStream<RequestType, ResponseType> =
ServerSurfaceCall &
ObjectWritable<ResponseType> & {
request: RequestType;
end: (metadata?: Metadata) => void;
};
export type ServerDuplexStream<RequestType, ResponseType> = ServerSurfaceCall &
ObjectReadable<RequestType> &
ObjectWritable<ResponseType> & { end: (metadata?: Metadata) => void };
export class ServerUnaryCallImpl<RequestType, ResponseType>
extends EventEmitter
implements ServerUnaryCall<RequestType, ResponseType> {
implements ServerUnaryCall<RequestType, ResponseType>
{
cancelled: boolean;
constructor(
@ -137,7 +138,8 @@ export class ServerUnaryCallImpl<RequestType, ResponseType>
export class ServerReadableStreamImpl<RequestType, ResponseType>
extends Readable
implements ServerReadableStream<RequestType, ResponseType> {
implements ServerReadableStream<RequestType, ResponseType>
{
cancelled: boolean;
constructor(
@ -179,7 +181,8 @@ export class ServerReadableStreamImpl<RequestType, ResponseType>
export class ServerWritableStreamImpl<RequestType, ResponseType>
extends Writable
implements ServerWritableStream<RequestType, ResponseType> {
implements ServerWritableStream<RequestType, ResponseType>
{
cancelled: boolean;
private trailingMetadata: Metadata;
@ -230,8 +233,10 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
return;
}
} catch (err) {
err.code = Status.INTERNAL;
this.emit('error', err);
this.emit('error', {
details: getErrorMessage(err),
code: Status.INTERNAL
});
}
callback();
@ -258,7 +263,8 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
export class ServerDuplexStreamImpl<RequestType, ResponseType>
extends Duplex
implements ServerDuplexStream<RequestType, ResponseType> {
implements ServerDuplexStream<RequestType, ResponseType>
{
cancelled: boolean;
private trailingMetadata: Metadata;
@ -396,7 +402,8 @@ export class Http2ServerCallStream<
ResponseType
> extends EventEmitter {
cancelled = false;
deadlineTimer: NodeJS.Timer = setTimeout(() => {}, 0);
deadlineTimer: NodeJS.Timer | null = null;
private statusSent = false;
private deadline: Deadline = Infinity;
private wantTrailers = false;
private metadataSent = false;
@ -429,10 +436,17 @@ export class Http2ServerCallStream<
' stream closed with rstCode ' +
this.stream.rstCode
);
this.cancelled = true;
this.emit('cancelled', 'cancelled');
this.emit('streamEnd', false);
this.sendStatus({code: Status.CANCELLED, details: 'Cancelled by client', metadata: new Metadata()});
if (!this.statusSent) {
this.cancelled = true;
this.emit('cancelled', 'cancelled');
this.emit('streamEnd', false);
this.sendStatus({
code: Status.CANCELLED,
details: 'Cancelled by client',
metadata: null,
});
}
});
this.stream.on('drain', () => {
@ -445,9 +459,6 @@ export class Http2ServerCallStream<
if ('grpc.max_receive_message_length' in options) {
this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
}
// Clear noop timer
clearTimeout(this.deadlineTimer);
}
private checkCancelled(): boolean {
@ -459,52 +470,22 @@ export class Http2ServerCallStream<
return this.cancelled;
}
private getDecompressedMessage(message: Buffer, encoding: string) {
switch (encoding) {
case 'deflate': {
return new Promise<Buffer | undefined>((resolve, reject) => {
zlib.inflate(message.slice(5), (err, output) => {
if (err) {
this.sendError({
code: Status.INTERNAL,
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
});
resolve();
} else {
resolve(output);
}
});
});
}
case 'gzip': {
return new Promise<Buffer | undefined>((resolve, reject) => {
zlib.unzip(message.slice(5), (err, output) => {
if (err) {
this.sendError({
code: Status.INTERNAL,
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
});
resolve();
} else {
resolve(output);
}
});
});
}
case 'identity': {
return Promise.resolve(message.slice(5));
}
default: {
this.sendError({
code: Status.UNIMPLEMENTED,
details: `Received message compressed with unsupported encoding "${encoding}"`,
});
return Promise.resolve();
}
private getDecompressedMessage(
message: Buffer,
encoding: string
): Buffer | Promise<Buffer> {
if (encoding === 'deflate') {
return inflate(message.subarray(5));
} else if (encoding === 'gzip') {
return unzip(message.subarray(5));
} else if (encoding === 'identity') {
return message.subarray(5);
}
return Promise.reject({
code: Status.UNIMPLEMENTED,
details: `Received message compressed with unsupported encoding "${encoding}"`,
});
}
sendMetadata(customMetadata?: Metadata) {
@ -519,14 +500,21 @@ export class Http2ServerCallStream<
this.metadataSent = true;
const custom = customMetadata ? customMetadata.toHttp2Headers() : null;
// TODO(cjihrig): Include compression headers.
const headers = Object.assign({}, defaultResponseHeaders, custom);
const headers = { ...defaultResponseHeaders, ...custom };
this.stream.respond(headers, defaultResponseOptions);
}
receiveMetadata(headers: http2.IncomingHttpHeaders) {
const metadata = Metadata.fromHttp2Headers(headers);
trace('Request to ' + this.handler.path + ' received headers ' + JSON.stringify(metadata.toJSON()));
if (logging.isTracerEnabled(TRACER_NAME)) {
trace(
'Request to ' +
this.handler.path +
' received headers ' +
JSON.stringify(metadata.toJSON())
);
}
// TODO(cjihrig): Receive compression metadata.
@ -559,52 +547,97 @@ export class Http2ServerCallStream<
return metadata;
}
receiveUnaryMessage(encoding: string): Promise<RequestType> {
return new Promise((resolve, reject) => {
const stream = this.stream;
const chunks: Buffer[] = [];
let totalLength = 0;
receiveUnaryMessage(
encoding: string,
next: (
err: Partial<ServerStatusResponse> | null,
request?: RequestType
) => void
): void {
const { stream } = this;
stream.on('data', (data: Buffer) => {
chunks.push(data);
totalLength += data.byteLength;
let receivedLength = 0;
const call = this;
const body: Buffer[] = [];
const limit = this.maxReceiveMessageSize;
stream.on('data', onData);
stream.on('end', onEnd);
stream.on('error', onEnd);
function onData(chunk: Buffer) {
receivedLength += chunk.byteLength;
if (limit !== -1 && receivedLength > limit) {
stream.removeListener('data', onData);
stream.removeListener('end', onEnd);
stream.removeListener('error', onEnd);
next({
code: Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${receivedLength} vs. ${limit})`,
});
return;
}
body.push(chunk);
}
function onEnd(err?: Error) {
stream.removeListener('data', onData);
stream.removeListener('end', onEnd);
stream.removeListener('error', onEnd);
if (err !== undefined) {
next({ code: Status.INTERNAL, details: err.message });
return;
}
if (receivedLength === 0) {
next({ code: Status.INTERNAL, details: 'received empty unary message' })
return;
}
call.emit('receiveMessage');
const requestBytes = Buffer.concat(body, receivedLength);
const compressed = requestBytes.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = call.getDecompressedMessage(
requestBytes,
compressedMessageEncoding
);
if (Buffer.isBuffer(decompressedMessage)) {
call.safeDeserializeMessage(decompressedMessage, next);
return;
}
decompressedMessage.then(
(decompressed) => call.safeDeserializeMessage(decompressed, next),
(err: any) => next(
err.code
? err
: {
code: Status.INTERNAL,
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
}
)
)
}
}
private safeDeserializeMessage(
buffer: Buffer,
next: (err: Partial<ServerStatusResponse> | null, request?: RequestType) => void
) {
try {
next(null, this.deserializeMessage(buffer));
} catch (err) {
next({
details: getErrorMessage(err),
code: Status.INTERNAL
});
stream.once('end', async () => {
try {
const requestBytes = Buffer.concat(chunks, totalLength);
if (
this.maxReceiveMessageSize !== -1 &&
requestBytes.length > this.maxReceiveMessageSize
) {
this.sendError({
code: Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${requestBytes.length} vs. ${this.maxReceiveMessageSize})`,
});
resolve();
}
this.emit('receiveMessage');
const compressed = requestBytes.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = await this.getDecompressedMessage(requestBytes, compressedMessageEncoding);
// Encountered an error with decompression; it'll already have been propogated back
// Just return early
if (!decompressedMessage) {
resolve();
}
else {
resolve(this.deserializeMessage(decompressedMessage));
}
} catch (err) {
err.code = Status.INTERNAL;
this.sendError(err);
resolve();
}
});
});
}
}
serializeMessage(value: ResponseType) {
@ -626,18 +659,19 @@ export class Http2ServerCallStream<
async sendUnaryMessage(
err: ServerErrorResponse | ServerStatusResponse | null,
value?: ResponseType | null,
metadata?: Metadata,
metadata?: Metadata | null,
flags?: number
) {
if (this.checkCancelled()) {
return;
}
if (!metadata) {
metadata = new Metadata();
if (metadata === undefined) {
metadata = null;
}
if (err) {
if (!Object.prototype.hasOwnProperty.call(err, 'metadata')) {
if (!Object.prototype.hasOwnProperty.call(err, 'metadata') && metadata) {
err.metadata = metadata;
}
this.sendError(err);
@ -650,12 +684,14 @@ export class Http2ServerCallStream<
this.write(response);
this.sendStatus({ code: Status.OK, details: 'OK', metadata });
} catch (err) {
err.code = Status.INTERNAL;
this.sendError(err);
this.sendError({
details: getErrorMessage(err),
code: Status.INTERNAL
});
}
}
sendStatus(statusObj: StatusObject) {
sendStatus(statusObj: PartialStatusObject) {
this.emit('callEnd', statusObj.code);
this.emit('streamEnd', statusObj.code === Status.OK);
if (this.checkCancelled()) {
@ -671,34 +707,42 @@ export class Http2ServerCallStream<
statusObj.details
);
clearTimeout(this.deadlineTimer);
if (this.deadlineTimer) clearTimeout(this.deadlineTimer);
if (!this.wantTrailers) {
this.wantTrailers = true;
this.stream.once('wantTrailers', () => {
const trailersToSend = Object.assign(
{
if (this.stream.headersSent) {
if (!this.wantTrailers) {
this.wantTrailers = true;
this.stream.once('wantTrailers', () => {
const trailersToSend = {
[GRPC_STATUS_HEADER]: statusObj.code,
[GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details as string),
},
statusObj.metadata.toHttp2Headers()
);
this.stream.sendTrailers(trailersToSend);
});
this.sendMetadata();
this.stream.end();
[GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details),
...statusObj.metadata?.toHttp2Headers(),
};
this.stream.sendTrailers(trailersToSend);
this.statusSent = true;
});
this.stream.end();
}
} else {
const trailersToSend = {
[GRPC_STATUS_HEADER]: statusObj.code,
[GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details),
...statusObj.metadata?.toHttp2Headers(),
};
this.stream.respond(trailersToSend, {endStream: true});
this.statusSent = true;
}
}
sendError(error: ServerErrorResponse | ServerStatusResponse) {
const status: StatusObject = {
const status: PartialStatusObject = {
code: Status.UNKNOWN,
details: 'message' in error ? error.message : 'Unknown Error',
metadata:
'metadata' in error && error.metadata !== undefined
? error.metadata
: new Metadata(),
: null,
};
if (
@ -769,7 +813,7 @@ export class Http2ServerCallStream<
pushedEnd = true;
this.pushOrBufferMessage(readable, null);
}
}
};
this.stream.on('data', async (data: Buffer) => {
const messages = decoder.write(data);
@ -791,12 +835,15 @@ export class Http2ServerCallStream<
const compressed = message.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = await this.getDecompressedMessage(message, compressedMessageEncoding);
const decompressedMessage = await this.getDecompressedMessage(
message,
compressedMessageEncoding
);
// Encountered an error with decompression; it'll already have been propogated back
// Just return early
if (!decompressedMessage) return;
this.pushOrBufferMessage(readable, decompressedMessage);
}
pendingMessageProcessing = false;
@ -878,21 +925,15 @@ export class Http2ServerCallStream<
} catch (error) {
// Ignore any remaining messages when errors occur.
this.bufferedMessages.length = 0;
if (
!(
'code' in error &&
typeof error.code === 'number' &&
Number.isInteger(error.code) &&
error.code >= Status.OK &&
error.code <= Status.UNAUTHENTICATED
)
) {
// The error code is not a valid gRPC code so its being overwritten.
error.code = Status.INTERNAL;
let code = getErrorCode(error);
if (code === null || code < Status.OK || code > Status.UNAUTHENTICATED) {
code = Status.INTERNAL
}
readable.emit('error', error);
readable.emit('error', {
details: getErrorMessage(error),
code: code
});
}
this.isPushPending = false;

View File

@ -61,6 +61,15 @@ import {
import { parseUri } from './uri-parser';
import { ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzServer, registerChannelzSocket, ServerInfo, ServerRef, SocketInfo, SocketRef, TlsInfo, unregisterChannelzRef } from './channelz';
import { CipherNameAndProtocol, TLSSocket } from 'tls';
import { getErrorCode, getErrorMessage } from './error';
const UNLIMITED_CONNECTION_AGE_MS = ~(1<<31);
const KEEPALIVE_MAX_TIME_MS = ~(1<<31);
const KEEPALIVE_TIMEOUT_MS = 20000;
const {
HTTP2_HEADER_PATH
} = http2.constants
const TRACER_NAME = 'server';
@ -77,7 +86,6 @@ function getUnimplementedStatusResponse(
return {
code: Status.UNIMPLEMENTED,
details: `The server does not implement the method ${methodName}`,
metadata: new Metadata(),
};
}
@ -147,6 +155,7 @@ export class Server {
private sessions = new Map<http2.ServerHttp2Session, ChannelzSessionInfo>();
private started = false;
private options: ChannelOptions;
private serverAddressString: string = 'null'
// Channelz Info
private readonly channelzEnabled: boolean = true;
@ -156,6 +165,12 @@ export class Server {
private listenerChildrenTracker = new ChannelzChildrenTracker();
private sessionChildrenTracker = new ChannelzChildrenTracker();
private readonly maxConnectionAgeMs: number;
private readonly maxConnectionAgeGraceMs: number;
private readonly keepaliveTimeMs: number;
private readonly keepaliveTimeoutMs: number;
constructor(options?: ChannelOptions) {
this.options = options ?? {};
if (this.options['grpc.enable_channelz'] === 0) {
@ -165,6 +180,10 @@ export class Server {
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Server created');
}
this.maxConnectionAgeMs = this.options['grpc.max_connection_age_ms'] ?? UNLIMITED_CONNECTION_AGE_MS;
this.maxConnectionAgeGraceMs = this.options['grpc.max_connection_age_grace_ms'] ?? UNLIMITED_CONNECTION_AGE_MS;
this.keepaliveTimeMs = this.options['grpc.keepalive_time_ms'] ?? KEEPALIVE_MAX_TIME_MS;
this.keepaliveTimeoutMs = this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS;
this.trace('Server constructed');
}
@ -730,6 +749,192 @@ export class Server {
return this.channelzRef;
}
private _verifyContentType(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders): boolean {
const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
if (
typeof contentType !== 'string' ||
!contentType.startsWith('application/grpc')
) {
stream.respond(
{
[http2.constants.HTTP2_HEADER_STATUS]:
http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
},
{ endStream: true }
);
return false
}
return true
}
private _retrieveHandler(headers: http2.IncomingHttpHeaders): Handler<any, any> {
const path = headers[HTTP2_HEADER_PATH] as string
this.trace(
'Received call to method ' +
path +
' at address ' +
this.serverAddressString
);
const handler = this.handlers.get(path);
if (handler === undefined) {
this.trace(
'No handler registered for method ' +
path +
'. Sending UNIMPLEMENTED status.'
);
throw getUnimplementedStatusResponse(path);
}
return handler
}
private _respondWithError<T extends Partial<ServiceError>>(
err: T,
stream: http2.ServerHttp2Stream,
channelzSessionInfo: ChannelzSessionInfo | null = null
) {
const call = new Http2ServerCallStream(stream, null!, this.options);
if (err.code === undefined) {
err.code = Status.INTERNAL;
}
if (this.channelzEnabled) {
this.callTracker.addCallFailed();
channelzSessionInfo?.streamTracker.addCallFailed()
}
call.sendError(err);
}
private _channelzHandler(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) {
const channelzSessionInfo = this.sessions.get(stream.session as http2.ServerHttp2Session);
this.callTracker.addCallStarted();
channelzSessionInfo?.streamTracker.addCallStarted();
if (!this._verifyContentType(stream, headers)) {
this.callTracker.addCallFailed();
channelzSessionInfo?.streamTracker.addCallFailed();
return
}
let handler: Handler<any, any>
try {
handler = this._retrieveHandler(headers)
} catch (err) {
this._respondWithError({
details: getErrorMessage(err),
code: getErrorCode(err) ?? undefined
}, stream, channelzSessionInfo)
return
}
const call = new Http2ServerCallStream(stream, handler, this.options);
call.once('callEnd', (code: Status) => {
if (code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
});
if (channelzSessionInfo) {
call.once('streamEnd', (success: boolean) => {
if (success) {
channelzSessionInfo.streamTracker.addCallSucceeded();
} else {
channelzSessionInfo.streamTracker.addCallFailed();
}
});
call.on('sendMessage', () => {
channelzSessionInfo.messagesSent += 1;
channelzSessionInfo.lastMessageSentTimestamp = new Date();
});
call.on('receiveMessage', () => {
channelzSessionInfo.messagesReceived += 1;
channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
});
}
if (!this._runHandlerForCall(call, handler, headers)) {
this.callTracker.addCallFailed();
channelzSessionInfo?.streamTracker.addCallFailed()
call.sendError({
code: Status.INTERNAL,
details: `Unknown handler type: ${handler.type}`
});
}
}
private _streamHandler(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) {
if (this._verifyContentType(stream, headers) !== true) {
return
}
let handler: Handler<any, any>
try {
handler = this._retrieveHandler(headers)
} catch (err) {
this._respondWithError({
details: getErrorMessage(err),
code: getErrorCode(err) ?? undefined
}, stream, null)
return
}
const call = new Http2ServerCallStream(stream, handler, this.options)
if (!this._runHandlerForCall(call, handler, headers)) {
call.sendError({
code: Status.INTERNAL,
details: `Unknown handler type: ${handler.type}`
});
}
}
private _runHandlerForCall(call: Http2ServerCallStream<any, any>, handler: Handler<any, any>, headers: http2.IncomingHttpHeaders): boolean {
const metadata = call.receiveMetadata(headers);
const encoding = (metadata.get('grpc-encoding')[0] as string | undefined) ?? 'identity';
metadata.remove('grpc-encoding');
const { type } = handler
if (type === 'unary') {
handleUnary(call, handler as UntypedUnaryHandler, metadata, encoding);
} else if (type === 'clientStream') {
handleClientStreaming(
call,
handler as UntypedClientStreamingHandler,
metadata,
encoding
);
} else if (type === 'serverStream') {
handleServerStreaming(
call,
handler as UntypedServerStreamingHandler,
metadata,
encoding
);
} else if (type === 'bidi') {
handleBidiStreaming(
call,
handler as UntypedBidiStreamingHandler,
metadata,
encoding
);
} else {
return false
}
return true
}
private _setupHandlers(
http2Server: http2.Http2Server | http2.Http2SecureServer
): void {
@ -737,143 +942,23 @@ export class Server {
return;
}
http2Server.on(
'stream',
(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) => {
const channelzSessionInfo = this.sessions.get(stream.session as http2.ServerHttp2Session);
if (this.channelzEnabled) {
this.callTracker.addCallStarted();
channelzSessionInfo?.streamTracker.addCallStarted();
}
const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
if (
typeof contentType !== 'string' ||
!contentType.startsWith('application/grpc')
) {
stream.respond(
{
[http2.constants.HTTP2_HEADER_STATUS]:
http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
},
{ endStream: true }
);
this.callTracker.addCallFailed();
if (this.channelzEnabled) {
channelzSessionInfo?.streamTracker.addCallFailed();
}
return;
}
let call: Http2ServerCallStream<any, any> | null = null;
try {
const path = headers[http2.constants.HTTP2_HEADER_PATH] as string;
const serverAddress = http2Server.address();
let serverAddressString = 'null';
if (serverAddress) {
if (typeof serverAddress === 'string') {
serverAddressString = serverAddress;
} else {
serverAddressString =
serverAddress.address + ':' + serverAddress.port;
}
}
this.trace(
'Received call to method ' +
path +
' at address ' +
serverAddressString
);
const handler = this.handlers.get(path);
if (handler === undefined) {
this.trace(
'No handler registered for method ' +
path +
'. Sending UNIMPLEMENTED status.'
);
throw getUnimplementedStatusResponse(path);
}
call = new Http2ServerCallStream(stream, handler, this.options);
call.once('callEnd', (code: Status) => {
if (code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
});
if (this.channelzEnabled && channelzSessionInfo) {
call.once('streamEnd', (success: boolean) => {
if (success) {
channelzSessionInfo.streamTracker.addCallSucceeded();
} else {
channelzSessionInfo.streamTracker.addCallFailed();
}
});
call.on('sendMessage', () => {
channelzSessionInfo.messagesSent += 1;
channelzSessionInfo.lastMessageSentTimestamp = new Date();
});
call.on('receiveMessage', () => {
channelzSessionInfo.messagesReceived += 1;
channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
});
}
const metadata = call.receiveMetadata(headers);
const encoding = (metadata.get('grpc-encoding')[0] as string | undefined) ?? 'identity';
metadata.remove('grpc-encoding');
switch (handler.type) {
case 'unary':
handleUnary(call, handler as UntypedUnaryHandler, metadata, encoding);
break;
case 'clientStream':
handleClientStreaming(
call,
handler as UntypedClientStreamingHandler,
metadata,
encoding
);
break;
case 'serverStream':
handleServerStreaming(
call,
handler as UntypedServerStreamingHandler,
metadata,
encoding
);
break;
case 'bidi':
handleBidiStreaming(
call,
handler as UntypedBidiStreamingHandler,
metadata,
encoding
);
break;
default:
throw new Error(`Unknown handler type: ${handler.type}`);
}
} catch (err) {
if (!call) {
call = new Http2ServerCallStream(stream, null!, this.options);
if (this.channelzEnabled) {
this.callTracker.addCallFailed();
channelzSessionInfo?.streamTracker.addCallFailed()
}
}
if (err.code === undefined) {
err.code = Status.INTERNAL;
}
call.sendError(err);
}
const serverAddress = http2Server.address();
let serverAddressString = 'null'
if (serverAddress) {
if (typeof serverAddress === 'string') {
serverAddressString = serverAddress
} else {
serverAddressString =
serverAddress.address + ':' + serverAddress.port
}
);
}
this.serverAddressString = serverAddressString
const handler = this.channelzEnabled
? this._channelzHandler
: this._streamHandler
http2Server.on('stream', handler.bind(this))
http2Server.on('session', (session) => {
if (!this.started) {
session.destroy();
@ -898,47 +983,109 @@ export class Server {
this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress);
this.sessionChildrenTracker.refChild(channelzRef);
}
let connectionAgeTimer: NodeJS.Timer | null = null;
let connectionAgeGraceTimer: NodeJS.Timer | null = null;
let sessionClosedByServer = false;
if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) {
// Apply a random jitter within a +/-10% range
const jitterMagnitude = this.maxConnectionAgeMs / 10;
const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude;
connectionAgeTimer = setTimeout(() => {
sessionClosedByServer = true;
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by max connection age from ' + clientAddress);
}
try {
session.goaway(http2.constants.NGHTTP2_NO_ERROR, ~(1<<31), Buffer.from('max_age'));
} catch (e) {
// The goaway can't be sent because the session is already closed
session.destroy();
return;
}
session.close();
/* Allow a grace period after sending the GOAWAY before forcibly
* closing the connection. */
if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
connectionAgeGraceTimer = setTimeout(() => {
session.destroy();
}, this.maxConnectionAgeGraceMs).unref?.();
}
}, this.maxConnectionAgeMs + jitter).unref?.();
}
const keeapliveTimeTimer: NodeJS.Timer | null = setInterval(() => {
const timeoutTImer = setTimeout(() => {
sessionClosedByServer = true;
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by keepalive timeout from ' + clientAddress);
}
session.close();
}, this.keepaliveTimeoutMs).unref?.();
try {
session.ping((err: Error | null, duration: number, payload: Buffer) => {
clearTimeout(timeoutTImer);
});
} catch (e) {
// The ping can't be sent because the session is already closed
session.destroy();
}
}, this.keepaliveTimeMs).unref?.();
session.on('close', () => {
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
if (!sessionClosedByServer) {
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
}
this.sessionChildrenTracker.unrefChild(channelzRef);
unregisterChannelzRef(channelzRef);
}
if (connectionAgeTimer) {
clearTimeout(connectionAgeTimer);
}
if (connectionAgeGraceTimer) {
clearTimeout(connectionAgeGraceTimer);
}
if (keeapliveTimeTimer) {
clearTimeout(keeapliveTimeTimer);
}
this.sessions.delete(session);
});
});
}
}
async function handleUnary<RequestType, ResponseType>(
function handleUnary<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>,
handler: UnaryHandler<RequestType, ResponseType>,
metadata: Metadata,
encoding: string
): Promise<void> {
const request = await call.receiveUnaryMessage(encoding);
if (request === undefined || call.cancelled) {
return;
}
const emitter = new ServerUnaryCallImpl<RequestType, ResponseType>(
call,
metadata,
request
);
handler.func(
emitter,
(
err: ServerErrorResponse | ServerStatusResponse | null,
value?: ResponseType | null,
trailer?: Metadata,
flags?: number
) => {
call.sendUnaryMessage(err, value, trailer, flags);
): void {
call.receiveUnaryMessage(encoding, (err, request) => {
if (err) {
call.sendError(err)
return
}
);
if (request === undefined || call.cancelled) {
return;
}
const emitter = new ServerUnaryCallImpl<RequestType, ResponseType>(
call,
metadata,
request
);
handler.func(
emitter,
(
err: ServerErrorResponse | ServerStatusResponse | null,
value?: ResponseType | null,
trailer?: Metadata,
flags?: number
) => {
call.sendUnaryMessage(err, value, trailer, flags);
}
);
});
}
function handleClientStreaming<RequestType, ResponseType>(
@ -972,26 +1119,31 @@ function handleClientStreaming<RequestType, ResponseType>(
handler.func(stream, respond);
}
async function handleServerStreaming<RequestType, ResponseType>(
function handleServerStreaming<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>,
handler: ServerStreamingHandler<RequestType, ResponseType>,
metadata: Metadata,
encoding: string
): Promise<void> {
const request = await call.receiveUnaryMessage(encoding);
): void {
call.receiveUnaryMessage(encoding, (err, request) => {
if (err) {
call.sendError(err)
return
}
if (request === undefined || call.cancelled) {
return;
}
if (request === undefined || call.cancelled) {
return;
}
const stream = new ServerWritableStreamImpl<RequestType, ResponseType>(
call,
metadata,
handler.serialize,
request
);
const stream = new ServerWritableStreamImpl<RequestType, ResponseType>(
call,
metadata,
handler.serialize,
request
);
handler.func(stream);
handler.func(stream);
});
}
function handleBidiStreaming<RequestType, ResponseType>(

View File

@ -50,7 +50,7 @@ export interface RetryPolicy {
export interface HedgingPolicy {
maxAttempts: number;
hedgingDelay?: string;
nonFatalStatusCodes: (Status | string)[];
nonFatalStatusCodes?: (Status | string)[];
}
export interface MethodConfig {
@ -86,7 +86,7 @@ export interface ServiceConfigCanaryConfig {
* Recognizes a number with up to 9 digits after the decimal point, followed by
* an "s", representing a number of seconds.
*/
const TIMEOUT_REGEX = /^\d+(\.\d{1,9})?s$/;
const DURATION_REGEX = /^\d+(\.\d{1,9})?s$/;
/**
* Client language name used for determining whether this client matches a
@ -111,6 +111,81 @@ function validateName(obj: any): MethodConfigName {
return result;
}
function validateRetryPolicy(obj: any): RetryPolicy {
if (!('maxAttempts' in obj) || !Number.isInteger(obj.maxAttempts) || obj.maxAttempts < 2) {
throw new Error('Invalid method config retry policy: maxAttempts must be an integer at least 2');
}
if (!('initialBackoff' in obj) || typeof obj.initialBackoff !== 'string' || !DURATION_REGEX.test(obj.initialBackoff)) {
throw new Error('Invalid method config retry policy: initialBackoff must be a string consisting of a positive integer followed by s');
}
if (!('maxBackoff' in obj) || typeof obj.maxBackoff !== 'string' || !DURATION_REGEX.test(obj.maxBackoff)) {
throw new Error('Invalid method config retry policy: maxBackoff must be a string consisting of a positive integer followed by s');
}
if (!('backoffMultiplier' in obj) || typeof obj.backoffMultiplier !== 'number' || obj.backoffMultiplier <= 0) {
throw new Error('Invalid method config retry policy: backoffMultiplier must be a number greater than 0');
}
if (!(('retryableStatusCodes' in obj) && Array.isArray(obj.retryableStatusCodes))) {
throw new Error('Invalid method config retry policy: retryableStatusCodes is required');
}
if (obj.retryableStatusCodes.length === 0) {
throw new Error('Invalid method config retry policy: retryableStatusCodes must be non-empty');
}
for (const value of obj.retryableStatusCodes) {
if (typeof value === 'number') {
if (!Object.values(Status).includes(value)) {
throw new Error('Invalid method config retry policy: retryableStatusCodes value not in status code range');
}
} else if (typeof value === 'string') {
if (!Object.values(Status).includes(value.toUpperCase())) {
throw new Error('Invalid method config retry policy: retryableStatusCodes value not a status code name');
}
} else {
throw new Error('Invalid method config retry policy: retryableStatusCodes value must be a string or number');
}
}
return {
maxAttempts: obj.maxAttempts,
initialBackoff: obj.initialBackoff,
maxBackoff: obj.maxBackoff,
backoffMultiplier: obj.backoffMultiplier,
retryableStatusCodes: obj.retryableStatusCodes
};
}
function validateHedgingPolicy(obj: any): HedgingPolicy {
if (!('maxAttempts' in obj) || !Number.isInteger(obj.maxAttempts) || obj.maxAttempts < 2) {
throw new Error('Invalid method config hedging policy: maxAttempts must be an integer at least 2');
}
if (('hedgingDelay' in obj) && (typeof obj.hedgingDelay !== 'string' || !DURATION_REGEX.test(obj.hedgingDelay))) {
throw new Error('Invalid method config hedging policy: hedgingDelay must be a string consisting of a positive integer followed by s');
}
if (('nonFatalStatusCodes' in obj) && Array.isArray(obj.nonFatalStatusCodes)) {
for (const value of obj.nonFatalStatusCodes) {
if (typeof value === 'number') {
if (!Object.values(Status).includes(value)) {
throw new Error('Invlid method config hedging policy: nonFatalStatusCodes value not in status code range');
}
} else if (typeof value === 'string') {
if (!Object.values(Status).includes(value.toUpperCase())) {
throw new Error('Invlid method config hedging policy: nonFatalStatusCodes value not a status code name');
}
} else {
throw new Error('Invlid method config hedging policy: nonFatalStatusCodes value must be a string or number');
}
}
}
const result: HedgingPolicy = {
maxAttempts: obj.maxAttempts
}
if (obj.hedgingDelay) {
result.hedgingDelay = obj.hedgingDelay;
}
if (obj.nonFatalStatusCodes) {
result.nonFatalStatusCodes = obj.nonFatalStatusCodes;
}
return result;
}
function validateMethodConfig(obj: any): MethodConfig {
const result: MethodConfig = {
name: [],
@ -144,7 +219,7 @@ function validateMethodConfig(obj: any): MethodConfig {
result.timeout = obj.timeout;
} else if (
typeof obj.timeout === 'string' &&
TIMEOUT_REGEX.test(obj.timeout)
DURATION_REGEX.test(obj.timeout)
) {
const timeoutParts = obj.timeout
.substring(0, obj.timeout.length - 1)
@ -169,9 +244,31 @@ function validateMethodConfig(obj: any): MethodConfig {
}
result.maxResponseBytes = obj.maxResponseBytes;
}
if ('retryPolicy' in obj) {
if ('hedgingPolicy' in obj) {
throw new Error('Invalid method config: retryPolicy and hedgingPolicy cannot both be specified');
} else {
result.retryPolicy = validateRetryPolicy(obj.retryPolicy);
}
} else if ('hedgingPolicy' in obj) {
result.hedgingPolicy = validateHedgingPolicy(obj.hedgingPolicy);
}
return result;
}
export function validateRetryThrottling(obj: any): RetryThrottling {
if (!('maxTokens' in obj) || typeof obj.maxTokens !== 'number' || obj.maxTokens <=0 || obj.maxTokens > 1000) {
throw new Error('Invalid retryThrottling: maxTokens must be a number in (0, 1000]');
}
if (!('tokenRatio' in obj) || typeof obj.tokenRatio !== 'number' || obj.tokenRatio <= 0) {
throw new Error('Invalid retryThrottling: tokenRatio must be a number greater than 0');
}
return {
maxTokens: +(obj.maxTokens as number).toFixed(3),
tokenRatio: +(obj.tokenRatio as number).toFixed(3)
};
}
export function validateServiceConfig(obj: any): ServiceConfig {
const result: ServiceConfig = {
loadBalancingConfig: [],
@ -200,6 +297,9 @@ export function validateServiceConfig(obj: any): ServiceConfig {
}
}
}
if ('retryThrottling' in obj) {
result.retryThrottling = validateRetryThrottling(obj.retryThrottling);
}
// Validate method name uniqueness
const seenMethodNames: MethodConfigName[] = [];
for (const methodConfig of result.methodConfig) {

View File

@ -75,6 +75,14 @@ export interface SubchannelCall {
getCallNumber(): number;
}
export interface StatusObjectWithRstCode extends StatusObject {
rstCode?: number;
}
export interface SubchannelCallInterceptingListener extends InterceptingListener {
onReceiveStatus(status: StatusObjectWithRstCode): void;
}
export class Http2SubchannelCall implements SubchannelCall {
private decoder = new StreamDecoder();
@ -103,7 +111,7 @@ export class Http2SubchannelCall implements SubchannelCall {
constructor(
private readonly http2Stream: http2.ClientHttp2Stream,
private readonly callStatsTracker: SubchannelCallStatsTracker,
private readonly listener: InterceptingListener,
private readonly listener: SubchannelCallInterceptingListener,
private readonly subchannel: Subchannel,
private readonly callId: number
) {
@ -257,7 +265,7 @@ export class Http2SubchannelCall implements SubchannelCall {
// This is OK, because status codes emitted here correspond to more
// catastrophic issues that prevent us from receiving trailers in the
// first place.
this.endCall({ code, details, metadata: new Metadata() });
this.endCall({ code, details, metadata: new Metadata(), rstCode: http2Stream.rstCode });
});
});
http2Stream.on('error', (err: SystemError) => {
@ -329,7 +337,7 @@ export class Http2SubchannelCall implements SubchannelCall {
* Subsequent calls are no-ops.
* @param status The status of the call.
*/
private endCall(status: StatusObject): void {
private endCall(status: StatusObjectWithRstCode): void {
/* If the status is OK and a new status comes in (e.g. from a
* deserialization failure), that new status takes priority */
if (this.finalStatus === null || this.finalStatus.code === Status.OK) {

View File

@ -36,7 +36,7 @@ import {
} from './subchannel-address';
import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, SocketInfo, SocketRef, unregisterChannelzRef, registerChannelzSocket, TlsInfo } from './channelz';
import { ConnectivityStateListener } from './subchannel-interface';
import { Http2SubchannelCall } from './subchannel-call';
import { Http2SubchannelCall, SubchannelCallInterceptingListener } from './subchannel-call';
import { getNextCallNumber } from './call-number';
import { SubchannelCall } from './subchannel-call';
import { InterceptingListener, StatusObject } from './call-interface';
@ -815,7 +815,7 @@ export class Subchannel {
return false;
}
createCall(metadata: Metadata, host: string, method: string, listener: InterceptingListener): SubchannelCall {
createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener): SubchannelCall {
const headers = metadata.toHttp2Headers();
headers[HTTP2_HEADER_AUTHORITY] = host;
headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;

View File

@ -0,0 +1,292 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import assert = require("assert");
import { validateServiceConfig } from "../src/service-config";
function createRetryServiceConfig(retryConfig: object): object {
return {
loadBalancingConfig: [],
methodConfig: [
{
name: [{
service: 'A',
method: 'B'
}],
retryPolicy: retryConfig
}
]
};
}
function createHedgingServiceConfig(hedgingConfig: object): object {
return {
loadBalancingConfig: [],
methodConfig: [
{
name: [{
service: 'A',
method: 'B'
}],
hedgingPolicy: hedgingConfig
}
]
};
}
function createThrottlingServiceConfig(retryThrottling: object): object {
return {
loadBalancingConfig: [],
methodConfig: [],
retryThrottling: retryThrottling
};
}
interface TestCase {
description: string;
config: object;
error: RegExp;
}
const validRetryConfig = {
maxAttempts: 2,
initialBackoff: '1s',
maxBackoff: '1s',
backoffMultiplier: 1,
retryableStatusCodes: [14, 'RESOURCE_EXHAUSTED']
};
const RETRY_TEST_CASES: TestCase[] = [
{
description: 'omitted maxAttempts',
config: {
initialBackoff: '1s',
maxBackoff: '1s',
backoffMultiplier: 1,
retryableStatusCodes: [14]
},
error: /retry policy: maxAttempts must be an integer at least 2/
},
{
description: 'a low maxAttempts',
config: {...validRetryConfig, maxAttempts: 1},
error: /retry policy: maxAttempts must be an integer at least 2/
},
{
description: 'omitted initialBackoff',
config: {
maxAttempts: 2,
maxBackoff: '1s',
backoffMultiplier: 1,
retryableStatusCodes: [14]
},
error: /retry policy: initialBackoff must be a string consisting of a positive integer followed by s/
},
{
description: 'a non-numeric initialBackoff',
config: {...validRetryConfig, initialBackoff: 'abcs'},
error: /retry policy: initialBackoff must be a string consisting of a positive integer followed by s/
},
{
description: 'an initialBackoff without an s',
config: {...validRetryConfig, initialBackoff: '123'},
error: /retry policy: initialBackoff must be a string consisting of a positive integer followed by s/
},
{
description: 'omitted maxBackoff',
config: {
maxAttempts: 2,
initialBackoff: '1s',
backoffMultiplier: 1,
retryableStatusCodes: [14]
},
error: /retry policy: maxBackoff must be a string consisting of a positive integer followed by s/
},
{
description: 'a non-numeric maxBackoff',
config: {...validRetryConfig, maxBackoff: 'abcs'},
error: /retry policy: maxBackoff must be a string consisting of a positive integer followed by s/
},
{
description: 'an maxBackoff without an s',
config: {...validRetryConfig, maxBackoff: '123'},
error: /retry policy: maxBackoff must be a string consisting of a positive integer followed by s/
},
{
description: 'omitted backoffMultiplier',
config: {
maxAttempts: 2,
initialBackoff: '1s',
maxBackoff: '1s',
retryableStatusCodes: [14]
},
error: /retry policy: backoffMultiplier must be a number greater than 0/
},
{
description: 'a negative backoffMultiplier',
config: {...validRetryConfig, backoffMultiplier: -1},
error: /retry policy: backoffMultiplier must be a number greater than 0/
},
{
description: 'omitted retryableStatusCodes',
config: {
maxAttempts: 2,
initialBackoff: '1s',
maxBackoff: '1s',
backoffMultiplier: 1
},
error: /retry policy: retryableStatusCodes is required/
},
{
description: 'empty retryableStatusCodes',
config: {...validRetryConfig, retryableStatusCodes: []},
error: /retry policy: retryableStatusCodes must be non-empty/
},
{
description: 'unknown status code name',
config: {...validRetryConfig, retryableStatusCodes: ['abcd']},
error: /retry policy: retryableStatusCodes value not a status code name/
},
{
description: 'out of range status code number',
config: {...validRetryConfig, retryableStatusCodes: [12345]},
error: /retry policy: retryableStatusCodes value not in status code range/
}
];
const validHedgingConfig = {
maxAttempts: 2
};
const HEDGING_TEST_CASES: TestCase[] = [
{
description: 'omitted maxAttempts',
config: {},
error: /hedging policy: maxAttempts must be an integer at least 2/
},
{
description: 'a low maxAttempts',
config: {...validHedgingConfig, maxAttempts: 1},
error: /hedging policy: maxAttempts must be an integer at least 2/
},
{
description: 'a non-numeric hedgingDelay',
config: {...validHedgingConfig, hedgingDelay: 'abcs'},
error: /hedging policy: hedgingDelay must be a string consisting of a positive integer followed by s/
},
{
description: 'a hedgingDelay without an s',
config: {...validHedgingConfig, hedgingDelay: '123'},
error: /hedging policy: hedgingDelay must be a string consisting of a positive integer followed by s/
},
{
description: 'unknown status code name',
config: {...validHedgingConfig, nonFatalStatusCodes: ['abcd']},
error: /hedging policy: nonFatalStatusCodes value not a status code name/
},
{
description: 'out of range status code number',
config: {...validHedgingConfig, nonFatalStatusCodes: [12345]},
error: /hedging policy: nonFatalStatusCodes value not in status code range/
}
];
const validThrottlingConfig = {
maxTokens: 100,
tokenRatio: 0.1
};
const THROTTLING_TEST_CASES: TestCase[] = [
{
description: 'omitted maxTokens',
config: {tokenRatio: 0.1},
error: /retryThrottling: maxTokens must be a number in \(0, 1000\]/
},
{
description: 'a large maxTokens',
config: {...validThrottlingConfig, maxTokens: 1001},
error: /retryThrottling: maxTokens must be a number in \(0, 1000\]/
},
{
description: 'zero maxTokens',
config: {...validThrottlingConfig, maxTokens: 0},
error: /retryThrottling: maxTokens must be a number in \(0, 1000\]/
},
{
description: 'omitted tokenRatio',
config: {maxTokens: 100},
error: /retryThrottling: tokenRatio must be a number greater than 0/
},
{
description: 'zero tokenRatio',
config: {...validThrottlingConfig, tokenRatio: 0},
error: /retryThrottling: tokenRatio must be a number greater than 0/
}
];
describe('Retry configs', () => {
describe('Retry', () => {
it('Should accept a valid config', () => {
assert.doesNotThrow(() => {
validateServiceConfig(createRetryServiceConfig(validRetryConfig));
});
});
for (const testCase of RETRY_TEST_CASES) {
it(`Should reject ${testCase.description}`, () => {
assert.throws(() => {
validateServiceConfig(createRetryServiceConfig(testCase.config));
}, testCase.error);
});
}
});
describe('Hedging', () => {
it('Should accept valid configs', () => {
assert.doesNotThrow(() => {
validateServiceConfig(createHedgingServiceConfig(validHedgingConfig));
});
assert.doesNotThrow(() => {
validateServiceConfig(createHedgingServiceConfig({...validHedgingConfig, hedgingDelay: '1s'}));
});
assert.doesNotThrow(() => {
validateServiceConfig(createHedgingServiceConfig({...validHedgingConfig, nonFatalStatusCodes: [14, 'RESOURCE_EXHAUSTED']}));
});
});
for (const testCase of HEDGING_TEST_CASES) {
it(`Should reject ${testCase.description}`, () => {
assert.throws(() => {
validateServiceConfig(createHedgingServiceConfig(testCase.config));
}, testCase.error);
});
}
});
describe('Throttling', () => {
it('Should accept a valid config', () => {
assert.doesNotThrow(() => {
validateServiceConfig(createThrottlingServiceConfig(validThrottlingConfig));
});
});
for (const testCase of THROTTLING_TEST_CASES) {
it(`Should reject ${testCase.description}`, () => {
assert.throws(() => {
validateServiceConfig(createThrottlingServiceConfig(testCase.config));
}, testCase.error);
});
}
});
});

View File

@ -0,0 +1,364 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as assert from 'assert';
import * as path from 'path';
import * as grpc from '../src';
import { loadProtoFile } from './common';
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const EchoService = loadProtoFile(protoFile)
.EchoService as grpc.ServiceClientConstructor;
const serviceImpl = {
echo: (call: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryData<any>) => {
const succeedOnRetryAttempt = call.metadata.get('succeed-on-retry-attempt');
const previousAttempts = call.metadata.get('grpc-previous-rpc-attempts');
if (succeedOnRetryAttempt.length === 0 || (previousAttempts.length > 0 && previousAttempts[0] === succeedOnRetryAttempt[0])) {
callback(null, call.request);
} else {
const statusCode = call.metadata.get('respond-with-status');
const code = statusCode[0] ? Number.parseInt(statusCode[0] as string) : grpc.status.UNKNOWN;
callback({
code: code,
details: `Failed on retry ${previousAttempts[0] ?? 0}`
});
}
}
}
describe('Retries', () => {
let server: grpc.Server;
let port: number;
before((done) => {
server = new grpc.Server();
server.addService(EchoService.service, serviceImpl);
server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, portNumber) => {
if (error) {
done(error);
return;
}
port = portNumber;
server.start();
done();
});
});
after(() => {
server.forceShutdown();
});
describe('Client with retries disabled', () => {
let client: InstanceType<grpc.ServiceClientConstructor>;
before(() => {
client = new EchoService(`localhost:${port}`, grpc.credentials.createInsecure(), {'grpc.enable_retries': 0});
});
after(() =>{
client.close();
});
it('Should be able to make a basic request', (done) => {
client.echo(
{ value: 'test value', value2: 3 },
(error: grpc.ServiceError, response: any) => {
assert.ifError(error);
assert.deepStrictEqual(response, { value: 'test value', value2: 3 });
done();
}
);
});
it('Should fail if the server fails the first request', (done) =>{
const metadata = new grpc.Metadata();
metadata.set('succeed-on-retry-attempt', '1');
client.echo(
{ value: 'test value', value2: 3 },
metadata,
(error: grpc.ServiceError, response: any) => {
assert(error);
assert.strictEqual(error.details, 'Failed on retry 0');
done();
}
);
});
});
describe('Client with retries enabled but not configured', () => {
let client: InstanceType<grpc.ServiceClientConstructor>;
before(() => {
client = new EchoService(`localhost:${port}`, grpc.credentials.createInsecure());
});
after(() =>{
client.close();
});
it('Should be able to make a basic request', (done) => {
client.echo(
{ value: 'test value', value2: 3 },
(error: grpc.ServiceError, response: any) => {
assert.ifError(error);
assert.deepStrictEqual(response, { value: 'test value', value2: 3 });
done();
}
);
});
it('Should fail if the server fails the first request', (done) =>{
const metadata = new grpc.Metadata();
metadata.set('succeed-on-retry-attempt', '1');
client.echo(
{ value: 'test value', value2: 3 },
metadata,
(error: grpc.ServiceError, response: any) => {
assert(error);
assert.strictEqual(error.details, 'Failed on retry 0');
done();
}
);
});
});
describe('Client with retries configured', () => {
let client: InstanceType<grpc.ServiceClientConstructor>;
before(() => {
const serviceConfig = {
loadBalancingConfig: [],
methodConfig: [
{
name: [{
service: 'EchoService'
}],
retryPolicy: {
maxAttempts: 3,
initialBackoff: '0.1s',
maxBackoff: '10s',
backoffMultiplier: 1.2,
retryableStatusCodes: [14, 'RESOURCE_EXHAUSTED']
}
}
]
}
client = new EchoService(`localhost:${port}`, grpc.credentials.createInsecure(), {'grpc.service_config': JSON.stringify(serviceConfig)});
});
after(() =>{
client.close();
});
it('Should be able to make a basic request', (done) => {
client.echo(
{ value: 'test value', value2: 3 },
(error: grpc.ServiceError, response: any) => {
assert.ifError(error);
assert.deepStrictEqual(response, { value: 'test value', value2: 3 });
done();
}
);
});
it('Should succeed with few required attempts', (done) => {
const metadata = new grpc.Metadata();
metadata.set('succeed-on-retry-attempt', '2');
metadata.set('respond-with-status', `${grpc.status.RESOURCE_EXHAUSTED}`);
client.echo(
{ value: 'test value', value2: 3 },
metadata,
(error: grpc.ServiceError, response: any) => {
assert.ifError(error);
assert.deepStrictEqual(response, { value: 'test value', value2: 3 });
done();
}
);
});
it('Should fail with many required attempts', (done) => {
const metadata = new grpc.Metadata();
metadata.set('succeed-on-retry-attempt', '4');
metadata.set('respond-with-status', `${grpc.status.RESOURCE_EXHAUSTED}`);
client.echo(
{ value: 'test value', value2: 3 },
metadata,
(error: grpc.ServiceError, response: any) => {
assert(error);
assert.strictEqual(error.details, 'Failed on retry 2');
done();
}
);
});
it('Should fail with a fatal status code', (done) => {
const metadata = new grpc.Metadata();
metadata.set('succeed-on-retry-attempt', '2');
metadata.set('respond-with-status', `${grpc.status.NOT_FOUND}`);
client.echo(
{ value: 'test value', value2: 3 },
metadata,
(error: grpc.ServiceError, response: any) => {
assert(error);
assert.strictEqual(error.details, 'Failed on retry 0');
done();
}
);
});
it('Should not be able to make more than 5 attempts', (done) => {
const serviceConfig = {
loadBalancingConfig: [],
methodConfig: [
{
name: [{
service: 'EchoService'
}],
retryPolicy: {
maxAttempts: 10,
initialBackoff: '0.1s',
maxBackoff: '10s',
backoffMultiplier: 1.2,
retryableStatusCodes: [14, 'RESOURCE_EXHAUSTED']
}
}
]
}
const client2 = new EchoService(`localhost:${port}`, grpc.credentials.createInsecure(), {'grpc.service_config': JSON.stringify(serviceConfig)});
const metadata = new grpc.Metadata();
metadata.set('succeed-on-retry-attempt', '6');
metadata.set('respond-with-status', `${grpc.status.RESOURCE_EXHAUSTED}`);
client2.echo(
{ value: 'test value', value2: 3 },
metadata,
(error: grpc.ServiceError, response: any) => {
assert(error);
assert.strictEqual(error.details, 'Failed on retry 4');
done();
}
);
})
});
describe('Client with hedging configured', () => {
let client: InstanceType<grpc.ServiceClientConstructor>;
before(() => {
const serviceConfig = {
loadBalancingConfig: [],
methodConfig: [
{
name: [{
service: 'EchoService'
}],
hedgingPolicy: {
maxAttempts: 3,
nonFatalStatusCodes: [14, 'RESOURCE_EXHAUSTED']
}
}
]
}
client = new EchoService(`localhost:${port}`, grpc.credentials.createInsecure(), {'grpc.service_config': JSON.stringify(serviceConfig)});
});
after(() =>{
client.close();
});
it('Should be able to make a basic request', (done) => {
client.echo(
{ value: 'test value', value2: 3 },
(error: grpc.ServiceError, response: any) => {
assert.ifError(error);
assert.deepStrictEqual(response, { value: 'test value', value2: 3 });
done();
}
);
});
it('Should succeed with few required attempts', (done) => {
const metadata = new grpc.Metadata();
metadata.set('succeed-on-retry-attempt', '2');
metadata.set('respond-with-status', `${grpc.status.RESOURCE_EXHAUSTED}`);
client.echo(
{ value: 'test value', value2: 3 },
metadata,
(error: grpc.ServiceError, response: any) => {
assert.ifError(error);
assert.deepStrictEqual(response, { value: 'test value', value2: 3 });
done();
}
);
});
it('Should fail with many required attempts', (done) => {
const metadata = new grpc.Metadata();
metadata.set('succeed-on-retry-attempt', '4');
metadata.set('respond-with-status', `${grpc.status.RESOURCE_EXHAUSTED}`);
client.echo(
{ value: 'test value', value2: 3 },
metadata,
(error: grpc.ServiceError, response: any) => {
assert(error);
assert(error.details.startsWith('Failed on retry'));
done();
}
);
});
it('Should fail with a fatal status code', (done) => {
const metadata = new grpc.Metadata();
metadata.set('succeed-on-retry-attempt', '2');
metadata.set('respond-with-status', `${grpc.status.NOT_FOUND}`);
client.echo(
{ value: 'test value', value2: 3 },
metadata,
(error: grpc.ServiceError, response: any) => {
assert(error);
assert(error.details.startsWith('Failed on retry'));
done();
}
);
});
it('Should not be able to make more than 5 attempts', (done) => {
const serviceConfig = {
loadBalancingConfig: [],
methodConfig: [
{
name: [{
service: 'EchoService'
}],
hedgingPolicy: {
maxAttempts: 10,
nonFatalStatusCodes: [14, 'RESOURCE_EXHAUSTED']
}
}
]
}
const client2 = new EchoService(`localhost:${port}`, grpc.credentials.createInsecure(), {'grpc.service_config': JSON.stringify(serviceConfig)});
const metadata = new grpc.Metadata();
metadata.set('succeed-on-retry-attempt', '6');
metadata.set('respond-with-status', `${grpc.status.RESOURCE_EXHAUSTED}`);
client2.echo(
{ value: 'test value', value2: 3 },
metadata,
(error: grpc.ServiceError, response: any) => {
assert(error);
assert(error.details.startsWith('Failed on retry'));
done();
}
);
})
});
});

View File

@ -135,20 +135,16 @@ function getImportLine(dependency: Protobuf.Type | Protobuf.Enum | Protobuf.Serv
/* If the dependency is defined within a message, it will be generated in that
* message's file and exported using its typeInterfaceName. */
if (dependency.parent instanceof Protobuf.Type) {
if (dependency instanceof Protobuf.Type) {
if (dependency instanceof Protobuf.Type || dependency instanceof Protobuf.Enum) {
importedTypes = `${inputName(typeInterfaceName)}, ${outputName(typeInterfaceName)}`;
} else if (dependency instanceof Protobuf.Enum) {
importedTypes = `${typeInterfaceName}`;
} else if (dependency instanceof Protobuf.Service) {
importedTypes = `${typeInterfaceName}Client, ${typeInterfaceName}Definition`;
} else {
throw new Error('Invalid object passed to getImportLine');
}
} else {
if (dependency instanceof Protobuf.Type) {
if (dependency instanceof Protobuf.Type || dependency instanceof Protobuf.Enum) {
importedTypes = `${inputName(dependency.name)} as ${inputName(typeInterfaceName)}, ${outputName(dependency.name)} as ${outputName(typeInterfaceName)}`;
} else if (dependency instanceof Protobuf.Enum) {
importedTypes = `${dependency.name} as ${typeInterfaceName}`;
} else if (dependency instanceof Protobuf.Service) {
importedTypes = `${dependency.name}Client as ${typeInterfaceName}Client, ${dependency.name}Definition as ${typeInterfaceName}Definition`;
} else {
@ -220,7 +216,8 @@ function getTypeNamePermissive(fieldType: string, resolvedType: Protobuf.Type |
return `${inputName(typeInterfaceName)} | null`;
}
} else {
return `${typeInterfaceName} | keyof typeof ${typeInterfaceName}`;
// Enum
return inputName(typeInterfaceName);
}
}
}
@ -324,11 +321,8 @@ function getTypeNameRestricted(fieldType: string, resolvedType: Protobuf.Type |
return `${outputName(typeInterfaceName)}`;
}
} else {
if (options.enums == String) {
return `keyof typeof ${typeInterfaceName}`;
} else {
return typeInterfaceName;
}
// Enum
return outputName(typeInterfaceName);
}
}
}
@ -455,21 +449,46 @@ function generateMessageInterfaces(formatter: TextFormatter, messageType: Protob
}
function generateEnumInterface(formatter: TextFormatter, enumType: Protobuf.Enum, options: GeneratorOptions, nameOverride?: string) {
const {inputName, outputName} = useNameFmter(options);
const name = nameOverride ?? enumType.name;
formatter.writeLine(`// Original file: ${(enumType.filename ?? 'null')?.replace(/\\/g, '/')}`);
formatter.writeLine('');
if (options.includeComments) {
formatComment(formatter, enumType.comment);
}
formatter.writeLine(`export enum ${nameOverride ?? enumType.name} {`);
formatter.writeLine(`export const ${name} = {`);
formatter.indent();
for (const key of Object.keys(enumType.values)) {
if (options.includeComments) {
formatComment(formatter, enumType.comments[key]);
}
formatter.writeLine(`${key} = ${enumType.values[key]},`);
formatter.writeLine(`${key}: ${options.enums == String ? `'${key}'` : enumType.values[key]},`);
}
formatter.unindent();
formatter.writeLine('}');
formatter.writeLine('} as const;');
// Permissive Type
formatter.writeLine('');
if (options.includeComments) {
formatComment(formatter, enumType.comment);
}
formatter.writeLine(`export type ${inputName(name)} =`)
formatter.indent();
for (const key of Object.keys(enumType.values)) {
if (options.includeComments) {
formatComment(formatter, enumType.comments[key]);
}
formatter.writeLine(`| '${key}'`);
formatter.writeLine(`| ${enumType.values[key]}`);
}
formatter.unindent();
// Restrictive Type
formatter.writeLine('');
if (options.includeComments) {
formatComment(formatter, enumType.comment);
}
formatter.writeLine(`export type ${outputName(name)} = typeof ${name}[keyof typeof ${name}]`)
}
/**

View File

@ -8,40 +8,101 @@
*
* Note: This enum **may** receive new values in the future.
*/
export enum FieldBehavior {
export const FieldBehavior = {
/**
* Conventional default for enums. Do not use this.
*/
FIELD_BEHAVIOR_UNSPECIFIED = 0,
FIELD_BEHAVIOR_UNSPECIFIED: 'FIELD_BEHAVIOR_UNSPECIFIED',
/**
* Specifically denotes a field as optional.
* While all fields in protocol buffers are optional, this may be specified
* for emphasis if appropriate.
*/
OPTIONAL = 1,
OPTIONAL: 'OPTIONAL',
/**
* Denotes a field as required.
* This indicates that the field **must** be provided as part of the request,
* and failure to do so will cause an error (usually `INVALID_ARGUMENT`).
*/
REQUIRED = 2,
REQUIRED: 'REQUIRED',
/**
* Denotes a field as output only.
* This indicates that the field is provided in responses, but including the
* field in a request does nothing (the server *must* ignore it and
* *must not* throw an error as a result of the field's presence).
*/
OUTPUT_ONLY = 3,
OUTPUT_ONLY: 'OUTPUT_ONLY',
/**
* Denotes a field as input only.
* This indicates that the field is provided in requests, and the
* corresponding field is not included in output.
*/
INPUT_ONLY = 4,
INPUT_ONLY: 'INPUT_ONLY',
/**
* Denotes a field as immutable.
* This indicates that the field may be set once in a request to create a
* resource, but may not be changed thereafter.
*/
IMMUTABLE = 5,
}
IMMUTABLE: 'IMMUTABLE',
} as const;
/**
* An indicator of the behavior of a given field (for example, that a field
* is required in requests, or given as output but ignored as input).
* This **does not** change the behavior in protocol buffers itself; it only
* denotes the behavior and may affect how API tooling handles the field.
*
* Note: This enum **may** receive new values in the future.
*/
export type IFieldBehavior =
/**
* Conventional default for enums. Do not use this.
*/
| 'FIELD_BEHAVIOR_UNSPECIFIED'
| 0
/**
* Specifically denotes a field as optional.
* While all fields in protocol buffers are optional, this may be specified
* for emphasis if appropriate.
*/
| 'OPTIONAL'
| 1
/**
* Denotes a field as required.
* This indicates that the field **must** be provided as part of the request,
* and failure to do so will cause an error (usually `INVALID_ARGUMENT`).
*/
| 'REQUIRED'
| 2
/**
* Denotes a field as output only.
* This indicates that the field is provided in responses, but including the
* field in a request does nothing (the server *must* ignore it and
* *must not* throw an error as a result of the field's presence).
*/
| 'OUTPUT_ONLY'
| 3
/**
* Denotes a field as input only.
* This indicates that the field is provided in requests, and the
* corresponding field is not included in output.
*/
| 'INPUT_ONLY'
| 4
/**
* Denotes a field as immutable.
* This indicates that the field may be set once in a request to create a
* resource, but may not be changed thereafter.
*/
| 'IMMUTABLE'
| 5
/**
* An indicator of the behavior of a given field (for example, that a field
* is required in requests, or given as output but ignored as input).
* This **does not** change the behavior in protocol buffers itself; it only
* denotes the behavior and may affect how API tooling handles the field.
*
* Note: This enum **may** receive new values in the future.
*/
export type OFieldBehavior = typeof FieldBehavior[keyof typeof FieldBehavior]

View File

@ -4,41 +4,91 @@ import type { IFieldOptions as I_google_protobuf_FieldOptions, OFieldOptions as
// Original file: null
export enum _google_protobuf_FieldDescriptorProto_Label {
LABEL_OPTIONAL = 1,
LABEL_REQUIRED = 2,
LABEL_REPEATED = 3,
}
export const _google_protobuf_FieldDescriptorProto_Label = {
LABEL_OPTIONAL: 'LABEL_OPTIONAL',
LABEL_REQUIRED: 'LABEL_REQUIRED',
LABEL_REPEATED: 'LABEL_REPEATED',
} as const;
export type I_google_protobuf_FieldDescriptorProto_Label =
| 'LABEL_OPTIONAL'
| 1
| 'LABEL_REQUIRED'
| 2
| 'LABEL_REPEATED'
| 3
export type O_google_protobuf_FieldDescriptorProto_Label = typeof _google_protobuf_FieldDescriptorProto_Label[keyof typeof _google_protobuf_FieldDescriptorProto_Label]
// Original file: null
export enum _google_protobuf_FieldDescriptorProto_Type {
TYPE_DOUBLE = 1,
TYPE_FLOAT = 2,
TYPE_INT64 = 3,
TYPE_UINT64 = 4,
TYPE_INT32 = 5,
TYPE_FIXED64 = 6,
TYPE_FIXED32 = 7,
TYPE_BOOL = 8,
TYPE_STRING = 9,
TYPE_GROUP = 10,
TYPE_MESSAGE = 11,
TYPE_BYTES = 12,
TYPE_UINT32 = 13,
TYPE_ENUM = 14,
TYPE_SFIXED32 = 15,
TYPE_SFIXED64 = 16,
TYPE_SINT32 = 17,
TYPE_SINT64 = 18,
}
export const _google_protobuf_FieldDescriptorProto_Type = {
TYPE_DOUBLE: 'TYPE_DOUBLE',
TYPE_FLOAT: 'TYPE_FLOAT',
TYPE_INT64: 'TYPE_INT64',
TYPE_UINT64: 'TYPE_UINT64',
TYPE_INT32: 'TYPE_INT32',
TYPE_FIXED64: 'TYPE_FIXED64',
TYPE_FIXED32: 'TYPE_FIXED32',
TYPE_BOOL: 'TYPE_BOOL',
TYPE_STRING: 'TYPE_STRING',
TYPE_GROUP: 'TYPE_GROUP',
TYPE_MESSAGE: 'TYPE_MESSAGE',
TYPE_BYTES: 'TYPE_BYTES',
TYPE_UINT32: 'TYPE_UINT32',
TYPE_ENUM: 'TYPE_ENUM',
TYPE_SFIXED32: 'TYPE_SFIXED32',
TYPE_SFIXED64: 'TYPE_SFIXED64',
TYPE_SINT32: 'TYPE_SINT32',
TYPE_SINT64: 'TYPE_SINT64',
} as const;
export type I_google_protobuf_FieldDescriptorProto_Type =
| 'TYPE_DOUBLE'
| 1
| 'TYPE_FLOAT'
| 2
| 'TYPE_INT64'
| 3
| 'TYPE_UINT64'
| 4
| 'TYPE_INT32'
| 5
| 'TYPE_FIXED64'
| 6
| 'TYPE_FIXED32'
| 7
| 'TYPE_BOOL'
| 8
| 'TYPE_STRING'
| 9
| 'TYPE_GROUP'
| 10
| 'TYPE_MESSAGE'
| 11
| 'TYPE_BYTES'
| 12
| 'TYPE_UINT32'
| 13
| 'TYPE_ENUM'
| 14
| 'TYPE_SFIXED32'
| 15
| 'TYPE_SFIXED64'
| 16
| 'TYPE_SINT32'
| 17
| 'TYPE_SINT64'
| 18
export type O_google_protobuf_FieldDescriptorProto_Type = typeof _google_protobuf_FieldDescriptorProto_Type[keyof typeof _google_protobuf_FieldDescriptorProto_Type]
export interface IFieldDescriptorProto {
'name'?: (string);
'extendee'?: (string);
'number'?: (number);
'label'?: (_google_protobuf_FieldDescriptorProto_Label | keyof typeof _google_protobuf_FieldDescriptorProto_Label);
'type'?: (_google_protobuf_FieldDescriptorProto_Type | keyof typeof _google_protobuf_FieldDescriptorProto_Type);
'label'?: (I_google_protobuf_FieldDescriptorProto_Label);
'type'?: (I_google_protobuf_FieldDescriptorProto_Type);
'typeName'?: (string);
'defaultValue'?: (string);
'options'?: (I_google_protobuf_FieldOptions | null);
@ -50,8 +100,8 @@ export interface OFieldDescriptorProto {
'name': (string);
'extendee': (string);
'number': (number);
'label': (keyof typeof _google_protobuf_FieldDescriptorProto_Label);
'type': (keyof typeof _google_protobuf_FieldDescriptorProto_Type);
'label': (O_google_protobuf_FieldDescriptorProto_Label);
'type': (O_google_protobuf_FieldDescriptorProto_Type);
'typeName': (string);
'defaultValue': (string);
'options': (O_google_protobuf_FieldOptions | null);

View File

@ -1,42 +1,62 @@
// Original file: null
import type { IUninterpretedOption as I_google_protobuf_UninterpretedOption, OUninterpretedOption as O_google_protobuf_UninterpretedOption } from '../../google/protobuf/UninterpretedOption';
import type { FieldBehavior as _google_api_FieldBehavior } from '../../google/api/FieldBehavior';
import type { IFieldBehavior as I_google_api_FieldBehavior, OFieldBehavior as O_google_api_FieldBehavior } from '../../google/api/FieldBehavior';
// Original file: null
export enum _google_protobuf_FieldOptions_CType {
STRING = 0,
CORD = 1,
STRING_PIECE = 2,
}
export const _google_protobuf_FieldOptions_CType = {
STRING: 'STRING',
CORD: 'CORD',
STRING_PIECE: 'STRING_PIECE',
} as const;
export type I_google_protobuf_FieldOptions_CType =
| 'STRING'
| 0
| 'CORD'
| 1
| 'STRING_PIECE'
| 2
export type O_google_protobuf_FieldOptions_CType = typeof _google_protobuf_FieldOptions_CType[keyof typeof _google_protobuf_FieldOptions_CType]
// Original file: null
export enum _google_protobuf_FieldOptions_JSType {
JS_NORMAL = 0,
JS_STRING = 1,
JS_NUMBER = 2,
}
export const _google_protobuf_FieldOptions_JSType = {
JS_NORMAL: 'JS_NORMAL',
JS_STRING: 'JS_STRING',
JS_NUMBER: 'JS_NUMBER',
} as const;
export type I_google_protobuf_FieldOptions_JSType =
| 'JS_NORMAL'
| 0
| 'JS_STRING'
| 1
| 'JS_NUMBER'
| 2
export type O_google_protobuf_FieldOptions_JSType = typeof _google_protobuf_FieldOptions_JSType[keyof typeof _google_protobuf_FieldOptions_JSType]
export interface IFieldOptions {
'ctype'?: (_google_protobuf_FieldOptions_CType | keyof typeof _google_protobuf_FieldOptions_CType);
'ctype'?: (I_google_protobuf_FieldOptions_CType);
'packed'?: (boolean);
'deprecated'?: (boolean);
'lazy'?: (boolean);
'jstype'?: (_google_protobuf_FieldOptions_JSType | keyof typeof _google_protobuf_FieldOptions_JSType);
'jstype'?: (I_google_protobuf_FieldOptions_JSType);
'weak'?: (boolean);
'uninterpretedOption'?: (I_google_protobuf_UninterpretedOption)[];
'.google.api.field_behavior'?: (_google_api_FieldBehavior | keyof typeof _google_api_FieldBehavior)[];
'.google.api.field_behavior'?: (I_google_api_FieldBehavior)[];
}
export interface OFieldOptions {
'ctype': (keyof typeof _google_protobuf_FieldOptions_CType);
'ctype': (O_google_protobuf_FieldOptions_CType);
'packed': (boolean);
'deprecated': (boolean);
'lazy': (boolean);
'jstype': (keyof typeof _google_protobuf_FieldOptions_JSType);
'jstype': (O_google_protobuf_FieldOptions_JSType);
'weak': (boolean);
'uninterpretedOption': (O_google_protobuf_UninterpretedOption)[];
'.google.api.field_behavior': (keyof typeof _google_api_FieldBehavior)[];
'.google.api.field_behavior': (O_google_api_FieldBehavior)[];
}

View File

@ -4,16 +4,26 @@ import type { IUninterpretedOption as I_google_protobuf_UninterpretedOption, OUn
// Original file: null
export enum _google_protobuf_FileOptions_OptimizeMode {
SPEED = 1,
CODE_SIZE = 2,
LITE_RUNTIME = 3,
}
export const _google_protobuf_FileOptions_OptimizeMode = {
SPEED: 'SPEED',
CODE_SIZE: 'CODE_SIZE',
LITE_RUNTIME: 'LITE_RUNTIME',
} as const;
export type I_google_protobuf_FileOptions_OptimizeMode =
| 'SPEED'
| 1
| 'CODE_SIZE'
| 2
| 'LITE_RUNTIME'
| 3
export type O_google_protobuf_FileOptions_OptimizeMode = typeof _google_protobuf_FileOptions_OptimizeMode[keyof typeof _google_protobuf_FileOptions_OptimizeMode]
export interface IFileOptions {
'javaPackage'?: (string);
'javaOuterClassname'?: (string);
'optimizeFor'?: (_google_protobuf_FileOptions_OptimizeMode | keyof typeof _google_protobuf_FileOptions_OptimizeMode);
'optimizeFor'?: (I_google_protobuf_FileOptions_OptimizeMode);
'javaMultipleFiles'?: (boolean);
'goPackage'?: (string);
'ccGenericServices'?: (boolean);
@ -31,7 +41,7 @@ export interface IFileOptions {
export interface OFileOptions {
'javaPackage': (string);
'javaOuterClassname': (string);
'optimizeFor': (keyof typeof _google_protobuf_FileOptions_OptimizeMode);
'optimizeFor': (O_google_protobuf_FileOptions_OptimizeMode);
'javaMultipleFiles': (boolean);
'goPackage': (string);
'ccGenericServices': (boolean);

View File

@ -1,7 +1,7 @@
// Original file: deps/gapic-showcase/schema/google/showcase/v1beta1/echo.proto
import type { IStatus as I_google_rpc_Status, OStatus as O_google_rpc_Status } from '../../../google/rpc/Status';
import type { Severity as _google_showcase_v1beta1_Severity } from '../../../google/showcase/v1beta1/Severity';
import type { ISeverity as I_google_showcase_v1beta1_Severity, OSeverity as O_google_showcase_v1beta1_Severity } from '../../../google/showcase/v1beta1/Severity';
/**
* The request message used for the Echo, Collect and Chat methods.
@ -21,7 +21,7 @@ export interface IEchoRequest {
/**
* The severity to be echoed by the server.
*/
'severity'?: (_google_showcase_v1beta1_Severity | keyof typeof _google_showcase_v1beta1_Severity);
'severity'?: (I_google_showcase_v1beta1_Severity);
'response'?: "content"|"error";
}
@ -43,6 +43,6 @@ export interface OEchoRequest {
/**
* The severity to be echoed by the server.
*/
'severity': (keyof typeof _google_showcase_v1beta1_Severity);
'severity': (O_google_showcase_v1beta1_Severity);
'response': "content"|"error";
}

View File

@ -1,6 +1,6 @@
// Original file: deps/gapic-showcase/schema/google/showcase/v1beta1/echo.proto
import type { Severity as _google_showcase_v1beta1_Severity } from '../../../google/showcase/v1beta1/Severity';
import type { ISeverity as I_google_showcase_v1beta1_Severity, OSeverity as O_google_showcase_v1beta1_Severity } from '../../../google/showcase/v1beta1/Severity';
/**
* The response message for the Echo methods.
@ -13,7 +13,7 @@ export interface IEchoResponse {
/**
* The severity specified in the request.
*/
'severity'?: (_google_showcase_v1beta1_Severity | keyof typeof _google_showcase_v1beta1_Severity);
'severity'?: (I_google_showcase_v1beta1_Severity);
}
/**
@ -27,5 +27,5 @@ export interface OEchoResponse {
/**
* The severity specified in the request.
*/
'severity': (keyof typeof _google_showcase_v1beta1_Severity);
'severity': (O_google_showcase_v1beta1_Severity);
}

View File

@ -3,9 +3,27 @@
/**
* A severity enum used to test enum capabilities in GAPIC surfaces
*/
export enum Severity {
UNNECESSARY = 0,
NECESSARY = 1,
URGENT = 2,
CRITICAL = 3,
}
export const Severity = {
UNNECESSARY: 'UNNECESSARY',
NECESSARY: 'NECESSARY',
URGENT: 'URGENT',
CRITICAL: 'CRITICAL',
} as const;
/**
* A severity enum used to test enum capabilities in GAPIC surfaces
*/
export type ISeverity =
| 'UNNECESSARY'
| 0
| 'NECESSARY'
| 1
| 'URGENT'
| 2
| 'CRITICAL'
| 3
/**
* A severity enum used to test enum capabilities in GAPIC surfaces
*/
export type OSeverity = typeof Severity[keyof typeof Severity]