mirror of https://github.com/grpc/grpc-node.git
grpc-js: Implement retries
This commit is contained in:
parent
c4c321d37d
commit
035c260e36
|
@ -44,6 +44,14 @@ export interface ChannelOptions {
|
|||
'grpc.default_compression_algorithm'?: CompressionAlgorithms;
|
||||
'grpc.enable_channelz'?: number;
|
||||
'grpc.dns_min_time_between_resolutions_ms'?: number;
|
||||
'grpc.enable_retries'?: number;
|
||||
'grpc.per_rpc_retry_buffer_size'?: number;
|
||||
/* This option is pattered like a core option, but the core does not have
|
||||
* this option. It is closely related to the option
|
||||
* grpc.per_rpc_retry_buffer_size, which is in the core. The core will likely
|
||||
* implement this functionality using the ResourceQuota mechanism, so there
|
||||
* will probably not be any collision or other inconsistency. */
|
||||
'grpc.retry_buffer_size'?: number;
|
||||
'grpc-node.max_session_memory'?: number;
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
[key: string]: any;
|
||||
|
@ -71,6 +79,9 @@ export const recognizedOptions = {
|
|||
'grpc.enable_http_proxy': true,
|
||||
'grpc.enable_channelz': true,
|
||||
'grpc.dns_min_time_between_resolutions_ms': true,
|
||||
'grpc.enable_retries': true,
|
||||
'grpc.per_rpc_retry_buffer_size': true,
|
||||
'grpc.retry_buffer_size': true,
|
||||
'grpc-node.max_session_memory': true,
|
||||
};
|
||||
|
||||
|
|
|
@ -50,6 +50,7 @@ import { Deadline, getDeadlineTimeoutString } from './deadline';
|
|||
import { ResolvingCall } from './resolving-call';
|
||||
import { getNextCallNumber } from './call-number';
|
||||
import { restrictControlPlaneStatusCode } from './control-plane-status';
|
||||
import { MessageBufferTracker, RetryingCall, RetryThrottler } from './retrying-call';
|
||||
|
||||
/**
|
||||
* See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
|
||||
|
@ -78,6 +79,11 @@ interface ErrorConfigResult {
|
|||
|
||||
type GetConfigResult = NoneConfigResult | SuccessConfigResult | ErrorConfigResult;
|
||||
|
||||
const RETRY_THROTTLER_MAP: Map<string, RetryThrottler> = new Map();
|
||||
|
||||
const DEFAULT_RETRY_BUFFER_SIZE_BYTES = 1<<24; // 16 MB
|
||||
const DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES = 1<<20; // 1 MB
|
||||
|
||||
export class InternalChannel {
|
||||
|
||||
private resolvingLoadBalancer: ResolvingLoadBalancer;
|
||||
|
@ -111,6 +117,7 @@ export class InternalChannel {
|
|||
* than TRANSIENT_FAILURE.
|
||||
*/
|
||||
private currentResolutionError: StatusObject | null = null;
|
||||
private retryBufferTracker: MessageBufferTracker;
|
||||
|
||||
// Channelz info
|
||||
private readonly channelzEnabled: boolean = true;
|
||||
|
@ -179,6 +186,10 @@ export class InternalChannel {
|
|||
this.subchannelPool = getSubchannelPool(
|
||||
(options['grpc.use_local_subchannel_pool'] ?? 0) === 0
|
||||
);
|
||||
this.retryBufferTracker = new MessageBufferTracker(
|
||||
options['grpc.retry_buffer_size'] ?? DEFAULT_RETRY_BUFFER_SIZE_BYTES,
|
||||
options['grpc.per_rpc_retry_buffer_size'] ?? DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES
|
||||
);
|
||||
const channelControlHelper: ChannelControlHelper = {
|
||||
createSubchannel: (
|
||||
subchannelAddress: SubchannelAddress,
|
||||
|
@ -226,7 +237,12 @@ export class InternalChannel {
|
|||
this.target,
|
||||
channelControlHelper,
|
||||
options,
|
||||
(configSelector) => {
|
||||
(serviceConfig, configSelector) => {
|
||||
if (serviceConfig.retryThrottling) {
|
||||
RETRY_THROTTLER_MAP.set(this.getTarget(), new RetryThrottler(serviceConfig.retryThrottling.maxTokens, serviceConfig.retryThrottling.tokenRatio, RETRY_THROTTLER_MAP.get(this.getTarget())));
|
||||
} else {
|
||||
RETRY_THROTTLER_MAP.delete(this.getTarget());
|
||||
}
|
||||
if (this.channelzEnabled) {
|
||||
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
|
||||
}
|
||||
|
@ -243,6 +259,7 @@ export class InternalChannel {
|
|||
}
|
||||
this.configSelectionQueue = [];
|
||||
});
|
||||
|
||||
},
|
||||
(status) => {
|
||||
if (this.channelzEnabled) {
|
||||
|
@ -405,6 +422,24 @@ export class InternalChannel {
|
|||
return new LoadBalancingCall(this, callConfig, method, host, credentials, deadline, callNumber);
|
||||
}
|
||||
|
||||
createRetryingCall(
|
||||
callConfig: CallConfig,
|
||||
method: string,
|
||||
host: string,
|
||||
credentials: CallCredentials,
|
||||
deadline: Deadline
|
||||
): RetryingCall {
|
||||
const callNumber = getNextCallNumber();
|
||||
this.trace(
|
||||
'createRetryingCall [' +
|
||||
callNumber +
|
||||
'] method="' +
|
||||
method +
|
||||
'"'
|
||||
);
|
||||
return new RetryingCall(this, callConfig, method, host, credentials, deadline, callNumber, this.retryBufferTracker, RETRY_THROTTLER_MAP.get(this.getTarget()))
|
||||
}
|
||||
|
||||
createInnerCall(
|
||||
callConfig: CallConfig,
|
||||
method: string,
|
||||
|
@ -413,7 +448,11 @@ export class InternalChannel {
|
|||
deadline: Deadline
|
||||
): Call {
|
||||
// Create a RetryingCall if retries are enabled
|
||||
return this.createLoadBalancingCall(callConfig, method, host, credentials, deadline);
|
||||
if (this.options['grpc.enable_retries'] === 0) {
|
||||
return this.createLoadBalancingCall(callConfig, method, host, credentials, deadline);
|
||||
} else {
|
||||
return this.createRetryingCall(callConfig, method, host, credentials, deadline);
|
||||
}
|
||||
}
|
||||
|
||||
createResolvingCall(
|
||||
|
@ -439,7 +478,7 @@ export class InternalChannel {
|
|||
parentCall: parentCall,
|
||||
};
|
||||
|
||||
const call = new ResolvingCall(this, method, finalOptions, this.filterStackFactory, this.credentials._getCallCredentials(), getNextCallNumber());
|
||||
const call = new ResolvingCall(this, method, finalOptions, this.filterStackFactory, this.credentials._getCallCredentials(), callNumber);
|
||||
|
||||
if (this.channelzEnabled) {
|
||||
this.callTracker.addCallStarted();
|
||||
|
|
|
@ -29,6 +29,7 @@ import { CallConfig } from "./resolver";
|
|||
import { splitHostPort } from "./uri-parser";
|
||||
import * as logging from './logging';
|
||||
import { restrictControlPlaneStatusCode } from "./control-plane-status";
|
||||
import * as http2 from 'http2';
|
||||
|
||||
const TRACER_NAME = 'load_balancing_call';
|
||||
|
||||
|
@ -38,6 +39,10 @@ export interface StatusObjectWithProgress extends StatusObject {
|
|||
progress: RpcProgress;
|
||||
}
|
||||
|
||||
export interface LoadBalancingCallInterceptingListener extends InterceptingListener {
|
||||
onReceiveStatus(status: StatusObjectWithProgress): void;
|
||||
}
|
||||
|
||||
export class LoadBalancingCall implements Call {
|
||||
private child: SubchannelCall | null = null;
|
||||
private readPending = false;
|
||||
|
@ -151,7 +156,11 @@ export class LoadBalancingCall implements Call {
|
|||
this.listener!.onReceiveMessage(message);
|
||||
},
|
||||
onReceiveStatus: status => {
|
||||
this.outputStatus(status, 'PROCESSED');
|
||||
if (status.code === http2.constants.NGHTTP2_REFUSED_STREAM) {
|
||||
this.outputStatus(status, 'REFUSED');
|
||||
} else {
|
||||
this.outputStatus(status, 'PROCESSED');
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (error) {
|
||||
|
@ -226,7 +235,7 @@ export class LoadBalancingCall implements Call {
|
|||
getPeer(): string {
|
||||
return this.child?.getPeer() ?? this.channel.getTarget();
|
||||
}
|
||||
start(metadata: Metadata, listener: InterceptingListener): void {
|
||||
start(metadata: Metadata, listener: LoadBalancingCallInterceptingListener): void {
|
||||
this.trace('start called');
|
||||
this.listener = listener;
|
||||
this.metadata = metadata;
|
||||
|
|
|
@ -83,7 +83,7 @@ function getDefaultConfigSelector(
|
|||
}
|
||||
|
||||
export interface ResolutionCallback {
|
||||
(configSelector: ConfigSelector): void;
|
||||
(serviceConfig: ServiceConfig, configSelector: ConfigSelector): void;
|
||||
}
|
||||
|
||||
export interface ResolutionFailureCallback {
|
||||
|
@ -239,6 +239,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
|||
const finalServiceConfig =
|
||||
workingServiceConfig ?? this.defaultServiceConfig;
|
||||
this.onSuccessfulResolution(
|
||||
finalServiceConfig,
|
||||
configSelector ?? getDefaultConfigSelector(finalServiceConfig)
|
||||
);
|
||||
},
|
||||
|
|
|
@ -0,0 +1,590 @@
|
|||
/*
|
||||
* Copyright 2022 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
import { CallCredentials } from "./call-credentials";
|
||||
import { LogVerbosity, Status } from "./constants";
|
||||
import { Deadline } from "./deadline";
|
||||
import { Metadata } from "./metadata";
|
||||
import { CallConfig } from "./resolver";
|
||||
import * as logging from './logging';
|
||||
import { Call, InterceptingListener, MessageContext, StatusObject, WriteCallback, WriteObject } from "./call-interface";
|
||||
import { LoadBalancingCall, StatusObjectWithProgress } from "./load-balancing-call";
|
||||
import { InternalChannel } from "./internal-channel";
|
||||
|
||||
const TRACER_NAME = 'retrying_call';
|
||||
|
||||
export class RetryThrottler {
|
||||
private tokens: number;
|
||||
constructor(private readonly maxTokens: number, private readonly tokenRatio: number, previousRetryThrottler?: RetryThrottler) {
|
||||
if (previousRetryThrottler) {
|
||||
/* When carrying over tokens from a previous config, rescale them to the
|
||||
* new max value */
|
||||
this.tokens = previousRetryThrottler.tokens * (maxTokens / previousRetryThrottler.maxTokens);
|
||||
} else {
|
||||
this.tokens = maxTokens;
|
||||
}
|
||||
}
|
||||
|
||||
addCallSucceeded() {
|
||||
this.tokens = Math.max(this.tokens + this.tokenRatio, this.maxTokens);
|
||||
}
|
||||
|
||||
addCallFailed() {
|
||||
this.tokens = Math.min(this.tokens - 1, 0);
|
||||
}
|
||||
|
||||
canRetryCall() {
|
||||
return this.tokens > this.maxTokens / 2;
|
||||
}
|
||||
}
|
||||
|
||||
export class MessageBufferTracker {
|
||||
private totalAllocated: number = 0;
|
||||
private allocatedPerCall: Map<number, number> = new Map<number, number>();
|
||||
|
||||
constructor(private totalLimit: number, private limitPerCall: number) {}
|
||||
|
||||
allocate(size: number, callId: number): boolean {
|
||||
const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
|
||||
if (this.limitPerCall - currentPerCall < size || this.totalLimit - this.totalAllocated < size) {
|
||||
return false;
|
||||
}
|
||||
this.allocatedPerCall.set(callId, currentPerCall + size);
|
||||
this.totalAllocated += size;
|
||||
return true;
|
||||
}
|
||||
|
||||
free(size: number, callId: number) {
|
||||
if (this.totalAllocated < size) {
|
||||
throw new Error(`Invalid buffer allocation state: call ${callId} freed ${size} > total allocated ${this.totalAllocated}`);
|
||||
}
|
||||
this.totalAllocated -= size;
|
||||
const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
|
||||
if (currentPerCall < size) {
|
||||
throw new Error(`Invalid buffer allocation state: call ${callId} freed ${size} > allocated for call ${currentPerCall}`);
|
||||
}
|
||||
this.allocatedPerCall.set(callId, currentPerCall - size);
|
||||
}
|
||||
|
||||
freeAll(callId: number) {
|
||||
const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
|
||||
if (this.totalAllocated < currentPerCall) {
|
||||
throw new Error(`Invalid buffer allocation state: call ${callId} allocated ${currentPerCall} > total allocated ${this.totalAllocated}`);
|
||||
}
|
||||
this.totalAllocated -= currentPerCall;
|
||||
this.allocatedPerCall.delete(callId);
|
||||
}
|
||||
}
|
||||
|
||||
type UnderlyingCallState = 'ACTIVE' | 'COMPLETED';
|
||||
|
||||
interface UnderlyingCall {
|
||||
state: UnderlyingCallState;
|
||||
call: LoadBalancingCall;
|
||||
nextMessageToSend: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* A retrying call can be in one of these states:
|
||||
* RETRY: Retries are configured and new attempts may be sent
|
||||
* HEDGING: Hedging is configured and new attempts may be sent
|
||||
* TRANSPARENT_ONLY: Neither retries nor hedging are configured, and
|
||||
* transparent retry attempts may still be sent
|
||||
* COMMITTED: One attempt is committed, and no new attempts will be
|
||||
* sent
|
||||
*/
|
||||
type RetryingCallState = 'RETRY' | 'HEDGING' | 'TRANSPARENT_ONLY' | 'COMMITTED';
|
||||
|
||||
/**
|
||||
* The different types of objects that can be stored in the write buffer, with
|
||||
* the following meanings:
|
||||
* MESSAGE: This is a message to be sent.
|
||||
* HALF_CLOSE: When this entry is reached, the calls should send a half-close.
|
||||
* FREED: This slot previously contained a message that has been sent on all
|
||||
* child calls and is no longer needed.
|
||||
*/
|
||||
type WriteBufferEntryType = 'MESSAGE' | 'HALF_CLOSE' | 'FREED';
|
||||
|
||||
/**
|
||||
* Entry in the buffer of messages to send to the remote end.
|
||||
*/
|
||||
interface WriteBufferEntry {
|
||||
entryType: WriteBufferEntryType;
|
||||
/**
|
||||
* Message to send.
|
||||
* Only populated if entryType is MESSAGE.
|
||||
*/
|
||||
message?: WriteObject;
|
||||
/**
|
||||
* Callback to call after sending the message.
|
||||
* Only populated if entryType is MESSAGE and the call is in the COMMITTED
|
||||
* state.
|
||||
*/
|
||||
callback?: WriteCallback;
|
||||
}
|
||||
|
||||
const PREVIONS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts';
|
||||
|
||||
export class RetryingCall implements Call {
|
||||
private state: RetryingCallState;
|
||||
private listener: InterceptingListener | null = null;
|
||||
private initialMetadata: Metadata | null = null;
|
||||
private underlyingCalls: UnderlyingCall[] = [];
|
||||
private writeBuffer: WriteBufferEntry[] = [];
|
||||
private transparentRetryUsed: boolean = false;
|
||||
/**
|
||||
* Number of attempts so far
|
||||
*/
|
||||
private attempts: number = 0;
|
||||
private hedgingTimer: NodeJS.Timer | null = null;
|
||||
private committedCallIndex: number | null = null;
|
||||
private initialRetryBackoffSec = 0;
|
||||
private nextRetryBackoffSec = 0;
|
||||
constructor(
|
||||
private readonly channel: InternalChannel,
|
||||
private readonly callConfig: CallConfig,
|
||||
private readonly methodName: string,
|
||||
private readonly host: string,
|
||||
private readonly credentials: CallCredentials,
|
||||
private readonly deadline: Deadline,
|
||||
private readonly callNumber: number,
|
||||
private readonly bufferTracker: MessageBufferTracker,
|
||||
private readonly retryThrottler?: RetryThrottler
|
||||
) {
|
||||
if (callConfig.methodConfig.retryPolicy) {
|
||||
this.state = 'RETRY';
|
||||
const retryPolicy = callConfig.methodConfig.retryPolicy;
|
||||
this.nextRetryBackoffSec = this.initialRetryBackoffSec = Number(retryPolicy.initialBackoff.substring(0, retryPolicy.initialBackoff.length - 1));
|
||||
} else if (callConfig.methodConfig.hedgingPolicy) {
|
||||
this.state = 'HEDGING';
|
||||
} else {
|
||||
this.state = 'TRANSPARENT_ONLY';
|
||||
}
|
||||
}
|
||||
getCallNumber(): number {
|
||||
return this.callNumber;
|
||||
}
|
||||
|
||||
private trace(text: string): void {
|
||||
logging.trace(
|
||||
LogVerbosity.DEBUG,
|
||||
TRACER_NAME,
|
||||
'[' + this.callNumber + '] ' + text
|
||||
);
|
||||
}
|
||||
|
||||
private reportStatus(statusObject: StatusObject) {
|
||||
this.trace('ended with status: code=' + statusObject.code + ' details="' + statusObject.details + '"');
|
||||
process.nextTick(() => {
|
||||
this.listener?.onReceiveStatus(statusObject);
|
||||
});
|
||||
}
|
||||
|
||||
cancelWithStatus(status: Status, details: string): void {
|
||||
this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
|
||||
this.reportStatus({code: status, details, metadata: new Metadata()});
|
||||
for (const {call} of this.underlyingCalls) {
|
||||
call.cancelWithStatus(status, details);
|
||||
}
|
||||
}
|
||||
getPeer(): string {
|
||||
if (this.committedCallIndex !== null) {
|
||||
return this.underlyingCalls[this.committedCallIndex].call.getPeer();
|
||||
} else {
|
||||
return 'unknown';
|
||||
}
|
||||
}
|
||||
|
||||
private commitCall(index: number) {
|
||||
if (this.state === 'COMMITTED') {
|
||||
return;
|
||||
}
|
||||
if (this.underlyingCalls[index].state === 'COMPLETED') {
|
||||
return;
|
||||
}
|
||||
this.trace('Committing call [' + this.underlyingCalls[index].call.getCallNumber() + '] at index ' + index);
|
||||
this.state = 'COMMITTED';
|
||||
this.committedCallIndex = index;
|
||||
for (let i = 0; i < this.underlyingCalls.length; i++) {
|
||||
if (i === index) {
|
||||
continue;
|
||||
}
|
||||
if (this.underlyingCalls[i].state === 'COMPLETED') {
|
||||
continue;
|
||||
}
|
||||
this.underlyingCalls[i].state = 'COMPLETED';
|
||||
this.underlyingCalls[i].call.cancelWithStatus(Status.CANCELLED, 'Discarded in favor of other hedged attempt');
|
||||
}
|
||||
for (let messageIndex = 0; messageIndex < this.underlyingCalls[index].nextMessageToSend - 1; messageIndex += 1) {
|
||||
const bufferEntry = this.writeBuffer[messageIndex];
|
||||
if (bufferEntry.entryType === 'MESSAGE') {
|
||||
this.bufferTracker.free(bufferEntry.message!.message.length, this.callNumber);
|
||||
this.writeBuffer[messageIndex] = {
|
||||
entryType: 'FREED'
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private commitCallWithMostMessages() {
|
||||
let mostMessages = -1;
|
||||
let callWithMostMessages = -1;
|
||||
for (const [index, childCall] of this.underlyingCalls.entries()) {
|
||||
if (childCall.nextMessageToSend > mostMessages) {
|
||||
mostMessages = childCall.nextMessageToSend;
|
||||
callWithMostMessages = index;
|
||||
}
|
||||
}
|
||||
this.commitCall(callWithMostMessages);
|
||||
}
|
||||
|
||||
private isStatusCodeInList(list: (Status | string)[], code: Status) {
|
||||
return list.some((value => value === code || value.toString().toLowerCase() === Status[code].toLowerCase()));
|
||||
}
|
||||
|
||||
private getNextRetryBackoffMs() {
|
||||
const retryPolicy = this.callConfig?.methodConfig.retryPolicy;
|
||||
if (!retryPolicy) {
|
||||
return 0;
|
||||
}
|
||||
const nextBackoffMs = Math.random() * this.nextRetryBackoffSec * 1000;
|
||||
const maxBackoffSec = Number(retryPolicy.maxBackoff.substring(0, retryPolicy.maxBackoff.length - 1));
|
||||
this.nextRetryBackoffSec = Math.min(this.nextRetryBackoffSec * retryPolicy.backoffMultiplier, maxBackoffSec);
|
||||
return nextBackoffMs
|
||||
}
|
||||
|
||||
private maybeRetryCall(pushback: number | null, callback: (retried: boolean) => void) {
|
||||
if (this.state !== 'RETRY') {
|
||||
callback(false);
|
||||
return;
|
||||
}
|
||||
const retryPolicy = this.callConfig!.methodConfig.retryPolicy!;
|
||||
if (this.attempts >= retryPolicy.maxAttempts) {
|
||||
callback(false);
|
||||
return;
|
||||
}
|
||||
let retryDelayMs: number;
|
||||
if (pushback === null) {
|
||||
retryDelayMs = this.getNextRetryBackoffMs();
|
||||
} else if (pushback < 0) {
|
||||
this.state = 'TRANSPARENT_ONLY';
|
||||
callback(false);
|
||||
return;
|
||||
} else {
|
||||
retryDelayMs = pushback;
|
||||
this.nextRetryBackoffSec = this.initialRetryBackoffSec;
|
||||
}
|
||||
setTimeout(() => {
|
||||
if (this.state !== 'RETRY') {
|
||||
callback(false);
|
||||
return;
|
||||
}
|
||||
if (this.retryThrottler?.canRetryCall() ?? true) {
|
||||
callback(true);
|
||||
this.attempts += 1;
|
||||
this.startNewAttempt();
|
||||
}
|
||||
}, retryDelayMs);
|
||||
}
|
||||
|
||||
private countActiveCalls(): number {
|
||||
let count = 0;
|
||||
for (const call of this.underlyingCalls) {
|
||||
if (call?.state === 'ACTIVE') {
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
private handleProcessedStatus(status: StatusObject, callIndex: number, pushback: number | null) {
|
||||
switch (this.state) {
|
||||
case 'COMMITTED':
|
||||
case 'TRANSPARENT_ONLY':
|
||||
this.commitCall(callIndex);
|
||||
this.reportStatus(status);
|
||||
break;
|
||||
case 'HEDGING':
|
||||
if (this.isStatusCodeInList(this.callConfig!.methodConfig.hedgingPolicy!.nonFatalStatusCodes, status.code)) {
|
||||
this.retryThrottler?.addCallFailed();
|
||||
let delayMs: number;
|
||||
if (pushback === null) {
|
||||
delayMs = 0;
|
||||
} else if (pushback < 0) {
|
||||
this.state = 'TRANSPARENT_ONLY';
|
||||
this.commitCall(callIndex);
|
||||
this.reportStatus(status);
|
||||
return;
|
||||
} else {
|
||||
delayMs = pushback;
|
||||
}
|
||||
setTimeout(() => {
|
||||
this.maybeStartHedgingAttempt();
|
||||
// If after trying to start a call there are no active calls, this was the last one
|
||||
if (this.countActiveCalls() === 0) {
|
||||
this.commitCall(callIndex);
|
||||
this.reportStatus(status);
|
||||
}
|
||||
}, delayMs);
|
||||
} else {
|
||||
this.commitCall(callIndex);
|
||||
this.reportStatus(status);
|
||||
}
|
||||
break;
|
||||
case 'RETRY':
|
||||
if (this.isStatusCodeInList(this.callConfig!.methodConfig.retryPolicy!.retryableStatusCodes, status.code)) {
|
||||
this.retryThrottler?.addCallFailed();
|
||||
this.maybeRetryCall(pushback, (retried) => {
|
||||
if (!retried) {
|
||||
this.commitCall(callIndex);
|
||||
this.reportStatus(status);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
this.commitCall(callIndex);
|
||||
this.reportStatus(status);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private getPushback(metadata: Metadata): number | null {
|
||||
const mdValue = metadata.get('grpc-retry-pushback-ms');
|
||||
if (mdValue.length === 0) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return parseInt(mdValue[0] as string);
|
||||
} catch (e) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
private handleChildStatus(status: StatusObjectWithProgress, callIndex: number) {
|
||||
if (this.underlyingCalls[callIndex].state === 'COMPLETED') {
|
||||
return;
|
||||
}
|
||||
this.underlyingCalls[callIndex].state = 'COMPLETED';
|
||||
if (status.code === Status.OK) {
|
||||
this.retryThrottler?.addCallSucceeded();
|
||||
this.commitCall(callIndex);
|
||||
this.reportStatus(status);
|
||||
return;
|
||||
}
|
||||
if (this.state === 'COMMITTED') {
|
||||
this.reportStatus(status);
|
||||
return;
|
||||
}
|
||||
const pushback = this.getPushback(status.metadata);
|
||||
switch (status.progress) {
|
||||
case 'NOT_STARTED':
|
||||
// RPC never leaves the client, always safe to retry
|
||||
this.startNewAttempt();
|
||||
break;
|
||||
case 'REFUSED':
|
||||
// RPC reaches the server library, but not the server application logic
|
||||
if (this.transparentRetryUsed) {
|
||||
this.handleProcessedStatus(status, callIndex, pushback);
|
||||
} else {
|
||||
this.transparentRetryUsed = true;
|
||||
this.startNewAttempt();
|
||||
};
|
||||
break;
|
||||
case 'DROP':
|
||||
this.commitCall(callIndex);
|
||||
this.reportStatus(status);
|
||||
break;
|
||||
case 'PROCESSED':
|
||||
this.handleProcessedStatus(status, callIndex, pushback);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private maybeStartHedgingAttempt() {
|
||||
if (this.state !== 'HEDGING') {
|
||||
return;
|
||||
}
|
||||
if (!this.callConfig.methodConfig.hedgingPolicy) {
|
||||
return;
|
||||
}
|
||||
const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
|
||||
if (this.attempts >= hedgingPolicy.maxAttempts) {
|
||||
return;
|
||||
}
|
||||
this.attempts += 1;
|
||||
this.startNewAttempt();
|
||||
this.maybeStartHedgingTimer();
|
||||
}
|
||||
|
||||
private maybeStartHedgingTimer() {
|
||||
if (this.hedgingTimer) {
|
||||
clearTimeout(this.hedgingTimer);
|
||||
}
|
||||
if (this.state !== 'HEDGING') {
|
||||
return;
|
||||
}
|
||||
if (!this.callConfig.methodConfig.hedgingPolicy) {
|
||||
return;
|
||||
}
|
||||
const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
|
||||
if (this.attempts >= hedgingPolicy.maxAttempts) {
|
||||
return;
|
||||
}
|
||||
const hedgingDelayString = hedgingPolicy.hedgingDelay ?? '0s';
|
||||
const hedgingDelaySec = Number(hedgingDelayString.substring(0, hedgingDelayString.length - 1));
|
||||
this.hedgingTimer = setTimeout(() => {
|
||||
this.maybeStartHedgingAttempt();
|
||||
}, hedgingDelaySec * 1000);
|
||||
this.hedgingTimer.unref?.();
|
||||
}
|
||||
|
||||
private startNewAttempt() {
|
||||
const child = this.channel.createLoadBalancingCall(this.callConfig, this.methodName, this.host, this.credentials, this.deadline);
|
||||
this.trace('Created child call [' + child.getCallNumber() + '] for attempt ' + this.attempts);
|
||||
const index = this.underlyingCalls.length;
|
||||
this.underlyingCalls.push({state: 'ACTIVE', call: child, nextMessageToSend: 0});
|
||||
const previousAttempts = this.attempts - 1;
|
||||
const initialMetadata = this.initialMetadata!.clone();
|
||||
if (previousAttempts > 0) {
|
||||
initialMetadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
|
||||
}
|
||||
let receivedMetadata = false;
|
||||
child.start(initialMetadata, {
|
||||
onReceiveMetadata: metadata => {
|
||||
this.commitCall(index);
|
||||
receivedMetadata = true;
|
||||
if (previousAttempts > 0) {
|
||||
metadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
|
||||
}
|
||||
if (this.underlyingCalls[index].state === 'ACTIVE') {
|
||||
this.listener!.onReceiveMetadata(metadata);
|
||||
}
|
||||
},
|
||||
onReceiveMessage: message => {
|
||||
this.commitCall(index);
|
||||
if (this.underlyingCalls[index].state === 'ACTIVE') {
|
||||
this.listener!.onReceiveMessage(message);
|
||||
}
|
||||
},
|
||||
onReceiveStatus: status => {
|
||||
if (!receivedMetadata && previousAttempts > 0) {
|
||||
status.metadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
|
||||
}
|
||||
this.commitCall(index);
|
||||
this.handleChildStatus(status, index);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
start(metadata: Metadata, listener: InterceptingListener): void {
|
||||
this.trace('start called');
|
||||
this.listener = listener;
|
||||
this.initialMetadata = metadata;
|
||||
this.attempts += 1;
|
||||
this.startNewAttempt();
|
||||
this.maybeStartHedgingTimer();
|
||||
}
|
||||
|
||||
private sendNextChildMessage(childIndex: number) {
|
||||
const childCall = this.underlyingCalls[childIndex];
|
||||
if (childCall.state === 'COMPLETED') {
|
||||
return;
|
||||
}
|
||||
if (this.writeBuffer[childCall.nextMessageToSend]) {
|
||||
const bufferEntry = this.writeBuffer[childCall.nextMessageToSend];
|
||||
switch (bufferEntry.entryType) {
|
||||
case 'MESSAGE':
|
||||
childCall.call.sendMessageWithContext({
|
||||
callback: (error) => {
|
||||
// Ignore error
|
||||
childCall.nextMessageToSend += 1;
|
||||
this.sendNextChildMessage(childIndex);
|
||||
}
|
||||
}, bufferEntry.message!.message);
|
||||
break;
|
||||
case 'HALF_CLOSE':
|
||||
childCall.nextMessageToSend += 1;
|
||||
childCall.call.halfClose();
|
||||
break;
|
||||
case 'FREED':
|
||||
// Should not be possible
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sendMessageWithContext(context: MessageContext, message: Buffer): void {
|
||||
this.trace('write() called with message of length ' + message.length);
|
||||
const writeObj: WriteObject = {
|
||||
message,
|
||||
flags: context.flags,
|
||||
};
|
||||
const messageIndex = this.writeBuffer.length;
|
||||
const bufferEntry: WriteBufferEntry = {
|
||||
entryType: 'MESSAGE',
|
||||
message: writeObj
|
||||
};
|
||||
this.writeBuffer[messageIndex] = bufferEntry;
|
||||
if (this.bufferTracker.allocate(message.length, this.callNumber)) {
|
||||
context.callback?.();
|
||||
for (const [callIndex, call] of this.underlyingCalls.entries()) {
|
||||
if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
|
||||
call.call.sendMessageWithContext({
|
||||
callback: (error) => {
|
||||
// Ignore error
|
||||
call.nextMessageToSend += 1;
|
||||
this.sendNextChildMessage(callIndex);
|
||||
}
|
||||
}, message);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
this.commitCallWithMostMessages();
|
||||
bufferEntry.callback = context.callback;
|
||||
}
|
||||
}
|
||||
startRead(): void {
|
||||
this.trace('startRead called');
|
||||
for (const underlyingCall of this.underlyingCalls) {
|
||||
if (underlyingCall?.state === 'ACTIVE') {
|
||||
underlyingCall.call.startRead();
|
||||
}
|
||||
}
|
||||
}
|
||||
halfClose(): void {
|
||||
this.trace('halfClose called');
|
||||
const halfCloseIndex = this.writeBuffer.length;
|
||||
this.writeBuffer[halfCloseIndex] = {
|
||||
entryType: 'HALF_CLOSE'
|
||||
};
|
||||
for (const call of this.underlyingCalls) {
|
||||
if (call?.state === 'ACTIVE' && call.nextMessageToSend === halfCloseIndex) {
|
||||
call.nextMessageToSend += 1;
|
||||
call.call.halfClose();
|
||||
}
|
||||
}
|
||||
}
|
||||
setCredentials(newCredentials: CallCredentials): void {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
getMethod(): string {
|
||||
return this.methodName;
|
||||
}
|
||||
getHost(): string {
|
||||
return this.host;
|
||||
}
|
||||
}
|
|
@ -86,7 +86,7 @@ export interface ServiceConfigCanaryConfig {
|
|||
* Recognizes a number with up to 9 digits after the decimal point, followed by
|
||||
* an "s", representing a number of seconds.
|
||||
*/
|
||||
const TIMEOUT_REGEX = /^\d+(\.\d{1,9})?s$/;
|
||||
const DURATION_REGEX = /^\d+(\.\d{1,9})?s$/;
|
||||
|
||||
/**
|
||||
* Client language name used for determining whether this client matches a
|
||||
|
@ -111,6 +111,75 @@ function validateName(obj: any): MethodConfigName {
|
|||
return result;
|
||||
}
|
||||
|
||||
function validateRetryPolicy(obj: any): RetryPolicy {
|
||||
if (!('maxAttempts' in obj) || !Number.isInteger(obj.maxAttempts) || obj.maxAttempts < 2) {
|
||||
throw new Error('Invalid method config retry policy: maxAttempts must be an integer at least 2');
|
||||
}
|
||||
if (!('initialBackoff' in obj) || typeof obj.initialBackoff !== 'string' || !DURATION_REGEX.test(obj.initialBackoff)) {
|
||||
throw new Error('Invalid method config retry policy: initialBackoff must be a string consisting of a positive integer followed by s');
|
||||
}
|
||||
if (!('maxBackoff' in obj) || typeof obj.maxBackoff !== 'string' || !DURATION_REGEX.test(obj.maxBackoff)) {
|
||||
throw new Error('Invalid method config retry policy: maxBackoff must be a string consisting of a positive integer followed by s');
|
||||
}
|
||||
if (!('backoffMultiplier' in obj) || typeof obj.backoffMultiplier !== 'number' || obj.backoffMultiplier <= 0) {
|
||||
throw new Error('Invalid method config retry policy: backoffMultiplier must be a number greater than 0');
|
||||
}
|
||||
if (('retryableStatusCodes' in obj) && Array.isArray(obj.retryableStatusCodes)) {
|
||||
for (const value of obj.retryableStatusCodes) {
|
||||
if (typeof value === 'number') {
|
||||
if (!Object.values(Status).includes(value)) {
|
||||
throw new Error('Invlid method config retry policy: retryableStatusCodes value not in status code range');
|
||||
}
|
||||
} else if (typeof value === 'string') {
|
||||
if (!Object.values(Status).includes(value.toUpperCase())) {
|
||||
throw new Error('Invlid method config retry policy: retryableStatusCodes value not a status code name');
|
||||
}
|
||||
} else {
|
||||
throw new Error('Invlid method config retry policy: retryableStatusCodes value must be a string or number');
|
||||
}
|
||||
}
|
||||
}
|
||||
return {
|
||||
maxAttempts: obj.maxAttempts,
|
||||
initialBackoff: obj.initialBackoff,
|
||||
maxBackoff: obj.maxBackoff,
|
||||
backoffMultiplier: obj.backoffMultiplier,
|
||||
retryableStatusCodes: obj.retryableStatusCodes
|
||||
};
|
||||
}
|
||||
|
||||
function validateHedgingPolicy(obj: any): HedgingPolicy {
|
||||
if (!('maxAttempts' in obj) || !Number.isInteger(obj.maxAttempts) || obj.maxAttempts < 2) {
|
||||
throw new Error('Invalid method config hedging policy: maxAttempts must be an integer at least 2');
|
||||
}
|
||||
if (('hedgingDelay' in obj) && (typeof obj.hedgingDelay !== 'string' || !DURATION_REGEX.test(obj.hedgingDelay))) {
|
||||
throw new Error('Invalid method config hedging policy: hedgingDelay must be a string consisting of a positive integer followed by s');
|
||||
}
|
||||
if (('nonFatalStatusCodes' in obj) && Array.isArray(obj.nonFatalStatusCodes)) {
|
||||
for (const value of obj.nonFatalStatusCodes) {
|
||||
if (typeof value === 'number') {
|
||||
if (!Object.values(Status).includes(value)) {
|
||||
throw new Error('Invlid method config hedging policy: nonFatalStatusCodes value not in status code range');
|
||||
}
|
||||
} else if (typeof value === 'string') {
|
||||
if (!Object.values(Status).includes(value.toUpperCase())) {
|
||||
throw new Error('Invlid method config hedging policy: nonFatalStatusCodes value not a status code name');
|
||||
}
|
||||
} else {
|
||||
throw new Error('Invlid method config hedging policy: nonFatalStatusCodes value must be a string or number');
|
||||
}
|
||||
}
|
||||
}
|
||||
const result: HedgingPolicy = {
|
||||
maxAttempts: obj.maxAttempts,
|
||||
nonFatalStatusCodes: obj.nonFatalStatusCodes
|
||||
}
|
||||
if (obj.hedgingDelay) {
|
||||
result.hedgingDelay = obj.hedgingDelay;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
function validateMethodConfig(obj: any): MethodConfig {
|
||||
const result: MethodConfig = {
|
||||
name: [],
|
||||
|
@ -144,7 +213,7 @@ function validateMethodConfig(obj: any): MethodConfig {
|
|||
result.timeout = obj.timeout;
|
||||
} else if (
|
||||
typeof obj.timeout === 'string' &&
|
||||
TIMEOUT_REGEX.test(obj.timeout)
|
||||
DURATION_REGEX.test(obj.timeout)
|
||||
) {
|
||||
const timeoutParts = obj.timeout
|
||||
.substring(0, obj.timeout.length - 1)
|
||||
|
@ -169,9 +238,31 @@ function validateMethodConfig(obj: any): MethodConfig {
|
|||
}
|
||||
result.maxResponseBytes = obj.maxResponseBytes;
|
||||
}
|
||||
if ('retryPolicy' in obj) {
|
||||
if ('hedgingPolicy' in obj) {
|
||||
throw new Error('Invalid method config: retryPolicy and hedgingPolicy cannot both be specified');
|
||||
} else {
|
||||
result.retryPolicy = validateRetryPolicy(obj.retryPolicy);
|
||||
}
|
||||
} else if ('hedgingPolicy' in obj) {
|
||||
result.hedgingPolicy = validateHedgingPolicy(obj.hedgingPolicy);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
export function validateRetryThrottling(obj: any): RetryThrottling {
|
||||
if (!('maxTokens' in obj) || typeof obj.maxTokens !== 'number' || obj.maxTokens <=0 || obj.maxTokens > 1000) {
|
||||
throw new Error('Invalid retryThrottling: maxTokens must be a number in (0, 1000]');
|
||||
}
|
||||
if (!('tokenRatio' in obj) || typeof obj.tokenRatio !== 'number' || obj.tokenRatio <= 0) {
|
||||
throw new Error('Invalid retryThrottling: tokenRatio must be a number greater than 0');
|
||||
}
|
||||
return {
|
||||
maxTokens: +(obj.maxTokens as number).toFixed(3),
|
||||
tokenRatio: +(obj.tokenRatio as number).toFixed(3)
|
||||
};
|
||||
}
|
||||
|
||||
export function validateServiceConfig(obj: any): ServiceConfig {
|
||||
const result: ServiceConfig = {
|
||||
loadBalancingConfig: [],
|
||||
|
|
|
@ -75,6 +75,14 @@ export interface SubchannelCall {
|
|||
getCallNumber(): number;
|
||||
}
|
||||
|
||||
export interface StatusObjectWithRstCode extends StatusObject {
|
||||
rstCode?: number;
|
||||
}
|
||||
|
||||
export interface SubchannelCallInterceptingListener extends InterceptingListener {
|
||||
onReceiveStatus(status: StatusObjectWithRstCode): void;
|
||||
}
|
||||
|
||||
export class Http2SubchannelCall implements SubchannelCall {
|
||||
private decoder = new StreamDecoder();
|
||||
|
||||
|
@ -103,7 +111,7 @@ export class Http2SubchannelCall implements SubchannelCall {
|
|||
constructor(
|
||||
private readonly http2Stream: http2.ClientHttp2Stream,
|
||||
private readonly callStatsTracker: SubchannelCallStatsTracker,
|
||||
private readonly listener: InterceptingListener,
|
||||
private readonly listener: SubchannelCallInterceptingListener,
|
||||
private readonly subchannel: Subchannel,
|
||||
private readonly callId: number
|
||||
) {
|
||||
|
@ -257,7 +265,7 @@ export class Http2SubchannelCall implements SubchannelCall {
|
|||
// This is OK, because status codes emitted here correspond to more
|
||||
// catastrophic issues that prevent us from receiving trailers in the
|
||||
// first place.
|
||||
this.endCall({ code, details, metadata: new Metadata() });
|
||||
this.endCall({ code, details, metadata: new Metadata(), rstCode: http2Stream.rstCode });
|
||||
});
|
||||
});
|
||||
http2Stream.on('error', (err: SystemError) => {
|
||||
|
@ -329,7 +337,7 @@ export class Http2SubchannelCall implements SubchannelCall {
|
|||
* Subsequent calls are no-ops.
|
||||
* @param status The status of the call.
|
||||
*/
|
||||
private endCall(status: StatusObject): void {
|
||||
private endCall(status: StatusObjectWithRstCode): void {
|
||||
/* If the status is OK and a new status comes in (e.g. from a
|
||||
* deserialization failure), that new status takes priority */
|
||||
if (this.finalStatus === null || this.finalStatus.code === Status.OK) {
|
||||
|
|
|
@ -36,7 +36,7 @@ import {
|
|||
} from './subchannel-address';
|
||||
import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, SocketInfo, SocketRef, unregisterChannelzRef, registerChannelzSocket, TlsInfo } from './channelz';
|
||||
import { ConnectivityStateListener } from './subchannel-interface';
|
||||
import { Http2SubchannelCall } from './subchannel-call';
|
||||
import { Http2SubchannelCall, SubchannelCallInterceptingListener } from './subchannel-call';
|
||||
import { getNextCallNumber } from './call-number';
|
||||
import { SubchannelCall } from './subchannel-call';
|
||||
import { InterceptingListener, StatusObject } from './call-interface';
|
||||
|
@ -815,7 +815,7 @@ export class Subchannel {
|
|||
return false;
|
||||
}
|
||||
|
||||
createCall(metadata: Metadata, host: string, method: string, listener: InterceptingListener): SubchannelCall {
|
||||
createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener): SubchannelCall {
|
||||
const headers = metadata.toHttp2Headers();
|
||||
headers[HTTP2_HEADER_AUTHORITY] = host;
|
||||
headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;
|
||||
|
|
Loading…
Reference in New Issue