mirror of https://github.com/grpc/grpc-node.git
grpc-js-xds: Implement retry support
This commit is contained in:
parent
89e132ad3a
commit
edf612a56a
|
@ -16,4 +16,5 @@
|
|||
*/
|
||||
|
||||
export const EXPERIMENTAL_FAULT_INJECTION = (process.env.GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION ?? 'true') === 'true';
|
||||
export const EXPERIMENTAL_OUTLIER_DETECTION = (process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION ?? 'true') === 'true';
|
||||
export const EXPERIMENTAL_OUTLIER_DETECTION = (process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION ?? 'true') === 'true';
|
||||
export const EXPERIMENTAL_RETRY = process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY === 'true';
|
|
@ -44,9 +44,10 @@ import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from './resourc
|
|||
import Duration = experimental.Duration;
|
||||
import { Duration__Output } from './generated/google/protobuf/Duration';
|
||||
import { createHttpFilter, HttpFilterConfig, parseOverrideFilterConfig, parseTopLevelFilterConfig } from './http-filter';
|
||||
import { EXPERIMENTAL_FAULT_INJECTION } from './environment';
|
||||
import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_RETRY } from './environment';
|
||||
import Filter = experimental.Filter;
|
||||
import FilterFactory = experimental.FilterFactory;
|
||||
import RetryPolicy = experimental.RetryPolicy;
|
||||
|
||||
const TRACER_NAME = 'xds_resolver';
|
||||
|
||||
|
@ -199,6 +200,24 @@ function protoDurationToDuration(duration: Duration__Output): Duration {
|
|||
}
|
||||
}
|
||||
|
||||
function protoDurationToSecondsString(duration: Duration__Output): string {
|
||||
return `${duration.seconds + duration.nanos / 1_000_000_000}s`;
|
||||
}
|
||||
|
||||
const DEFAULT_RETRY_BASE_INTERVAL = '0.025s'
|
||||
|
||||
function getDefaultRetryMaxInterval(baseInterval: string): string {
|
||||
return `${Number.parseFloat(baseInterval.substring(0, baseInterval.length - 1)) * 10}s`;
|
||||
}
|
||||
|
||||
const RETRY_CODES: {[key: string]: status} = {
|
||||
'cancelled': status.CANCELLED,
|
||||
'deadline-exceeded': status.DEADLINE_EXCEEDED,
|
||||
'internal': status.INTERNAL,
|
||||
'resource-exhausted': status.RESOURCE_EXHAUSTED,
|
||||
'unavailable': status.UNAVAILABLE
|
||||
};
|
||||
|
||||
class XdsResolver implements Resolver {
|
||||
private hasReportedSuccess = false;
|
||||
|
||||
|
@ -363,6 +382,33 @@ class XdsResolver implements Resolver {
|
|||
}
|
||||
}
|
||||
}
|
||||
let retryPolicy: RetryPolicy | undefined = undefined;
|
||||
if (EXPERIMENTAL_RETRY) {
|
||||
const retryConfig = route.route!.retry_policy ?? virtualHost.retry_policy;
|
||||
if (retryConfig) {
|
||||
const retryableStatusCodes = [];
|
||||
for (const code of retryConfig.retry_on.split(',')) {
|
||||
if (RETRY_CODES[code]) {
|
||||
retryableStatusCodes.push(RETRY_CODES[code]);
|
||||
}
|
||||
}
|
||||
if (retryableStatusCodes.length > 0) {
|
||||
const baseInterval = retryConfig.retry_back_off?.base_interval ?
|
||||
protoDurationToSecondsString(retryConfig.retry_back_off.base_interval) :
|
||||
DEFAULT_RETRY_BASE_INTERVAL;
|
||||
const maxInterval = retryConfig.retry_back_off?.max_interval ?
|
||||
protoDurationToSecondsString(retryConfig.retry_back_off.max_interval) :
|
||||
getDefaultRetryMaxInterval(baseInterval);
|
||||
retryPolicy = {
|
||||
backoffMultiplier: 2,
|
||||
initialBackoff: baseInterval,
|
||||
maxBackoff: maxInterval,
|
||||
maxAttempts: (retryConfig.num_retries?.value ?? 1) + 1,
|
||||
retryableStatusCodes: retryableStatusCodes
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
switch (route.route!.cluster_specifier) {
|
||||
case 'cluster_header':
|
||||
continue;
|
||||
|
@ -390,7 +436,7 @@ class XdsResolver implements Resolver {
|
|||
}
|
||||
}
|
||||
}
|
||||
routeAction = new SingleClusterRouteAction(cluster, timeout, extraFilterFactories);
|
||||
routeAction = new SingleClusterRouteAction(cluster, {name: [], timeout: timeout, retryPolicy: retryPolicy}, extraFilterFactories);
|
||||
break;
|
||||
}
|
||||
case 'weighted_clusters': {
|
||||
|
@ -432,7 +478,7 @@ class XdsResolver implements Resolver {
|
|||
}
|
||||
weightedClusters.push({name: clusterWeight.name, weight: clusterWeight.weight?.value ?? 0, dynamicFilterFactories: extraFilterFactories});
|
||||
}
|
||||
routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, timeout);
|
||||
routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, {name: [], timeout: timeout, retryPolicy: retryPolicy});
|
||||
break;
|
||||
}
|
||||
default:
|
||||
|
@ -470,7 +516,7 @@ class XdsResolver implements Resolver {
|
|||
this.unrefCluster(clusterResult.name);
|
||||
}
|
||||
return {
|
||||
methodConfig: {name: [], timeout: action.getTimeout()},
|
||||
methodConfig: clusterResult.methodConfig,
|
||||
onCommitted: onCommitted,
|
||||
pickInformation: {cluster: clusterResult.name},
|
||||
status: status.OK,
|
||||
|
|
|
@ -18,16 +18,17 @@ import { experimental } from '@grpc/grpc-js';
|
|||
import Duration = experimental.Duration;
|
||||
import Filter = experimental.Filter;
|
||||
import FilterFactory = experimental.FilterFactory;
|
||||
import MethodConfig = experimental.MethodConfig;
|
||||
|
||||
export interface ClusterResult {
|
||||
name: string;
|
||||
methodConfig: MethodConfig;
|
||||
dynamicFilterFactories: FilterFactory<Filter>[];
|
||||
}
|
||||
|
||||
export interface RouteAction {
|
||||
toString(): string;
|
||||
getCluster(): ClusterResult;
|
||||
getTimeout(): Duration | undefined;
|
||||
}
|
||||
|
||||
function durationToLogString(duration: Duration) {
|
||||
|
@ -40,25 +41,18 @@ function durationToLogString(duration: Duration) {
|
|||
}
|
||||
|
||||
export class SingleClusterRouteAction implements RouteAction {
|
||||
constructor(private cluster: string, private timeout: Duration | undefined, private extraFilterFactories: FilterFactory<Filter>[]) {}
|
||||
constructor(private cluster: string, private methodConfig: MethodConfig, private extraFilterFactories: FilterFactory<Filter>[]) {}
|
||||
|
||||
getCluster() {
|
||||
return {
|
||||
name: this.cluster,
|
||||
methodConfig: this.methodConfig,
|
||||
dynamicFilterFactories: this.extraFilterFactories
|
||||
};
|
||||
}
|
||||
|
||||
toString() {
|
||||
if (this.timeout) {
|
||||
return 'SingleCluster(' + this.cluster + ', ' + 'timeout=' + durationToLogString(this.timeout) + 's)';
|
||||
} else {
|
||||
return 'SingleCluster(' + this.cluster + ')';
|
||||
}
|
||||
}
|
||||
|
||||
getTimeout() {
|
||||
return this.timeout;
|
||||
return 'SingleCluster(' + this.cluster + ', ' + JSON.stringify(this.methodConfig) + ')';
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -79,7 +73,7 @@ export class WeightedClusterRouteAction implements RouteAction {
|
|||
* The weighted cluster choices represented as a CDF
|
||||
*/
|
||||
private clusterChoices: ClusterChoice[];
|
||||
constructor(private clusters: WeightedCluster[], private totalWeight: number, private timeout: Duration | undefined) {
|
||||
constructor(private clusters: WeightedCluster[], private totalWeight: number, private methodConfig: MethodConfig) {
|
||||
this.clusterChoices = [];
|
||||
let lastNumerator = 0;
|
||||
for (const clusterWeight of clusters) {
|
||||
|
@ -94,24 +88,17 @@ export class WeightedClusterRouteAction implements RouteAction {
|
|||
if (randomNumber < choice.numerator) {
|
||||
return {
|
||||
name: choice.name,
|
||||
methodConfig: this.methodConfig,
|
||||
dynamicFilterFactories: choice.dynamicFilterFactories
|
||||
};
|
||||
}
|
||||
}
|
||||
// This should be prevented by the validation rules
|
||||
return {name: '', dynamicFilterFactories: []};
|
||||
return {name: '', methodConfig: this.methodConfig, dynamicFilterFactories: []};
|
||||
}
|
||||
|
||||
toString() {
|
||||
const clusterListString = this.clusters.map(({name, weight}) => '(' + name + ':' + weight + ')').join(', ')
|
||||
if (this.timeout) {
|
||||
return 'WeightedCluster(' + clusterListString + ', ' + 'timeout=' + durationToLogString(this.timeout) + 's)';
|
||||
} else {
|
||||
return 'WeightedCluster(' + clusterListString + ')';
|
||||
}
|
||||
}
|
||||
|
||||
getTimeout() {
|
||||
return this.timeout;
|
||||
return 'WeightedCluster(' + clusterListString + ', ' + JSON.stringify(this.methodConfig) + ')';
|
||||
}
|
||||
}
|
|
@ -15,8 +15,10 @@
|
|||
*
|
||||
*/
|
||||
|
||||
import { EXPERIMENTAL_FAULT_INJECTION } from "../environment";
|
||||
import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_RETRY } from "../environment";
|
||||
import { RetryPolicy__Output } from "../generated/envoy/config/route/v3/RetryPolicy";
|
||||
import { RouteConfiguration__Output } from "../generated/envoy/config/route/v3/RouteConfiguration";
|
||||
import { Duration__Output } from "../generated/google/protobuf/Duration";
|
||||
import { validateOverrideFilter } from "../http-filter";
|
||||
import { BaseXdsStreamState, XdsStreamState } from "./xds-stream-state";
|
||||
|
||||
|
@ -30,6 +32,13 @@ const SUPPPORTED_HEADER_MATCH_SPECIFIERS = [
|
|||
'suffix_match'];
|
||||
const SUPPORTED_CLUSTER_SPECIFIERS = ['cluster', 'weighted_clusters', 'cluster_header'];
|
||||
|
||||
function durationToMs(duration: Duration__Output | null): number | null {
|
||||
if (duration === null) {
|
||||
return null;
|
||||
}
|
||||
return (Number.parseInt(duration.seconds) * 1000 + duration.nanos / 1_000_000) | 0;
|
||||
}
|
||||
|
||||
export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> implements XdsStreamState<RouteConfiguration__Output> {
|
||||
protected isStateOfTheWorld(): boolean {
|
||||
return false;
|
||||
|
@ -40,6 +49,28 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
|
|||
protected getProtocolName(): string {
|
||||
return 'RDS';
|
||||
}
|
||||
|
||||
private validateRetryPolicy(policy: RetryPolicy__Output | null): boolean {
|
||||
if (policy === null) {
|
||||
return true;
|
||||
}
|
||||
const numRetries = policy.num_retries?.value ?? 1
|
||||
if (numRetries < 1) {
|
||||
return false;
|
||||
}
|
||||
if (policy.retry_back_off) {
|
||||
if (!policy.retry_back_off.base_interval) {
|
||||
return false;
|
||||
}
|
||||
const baseInterval = durationToMs(policy.retry_back_off.base_interval)!;
|
||||
const maxInterval = durationToMs(policy.retry_back_off.max_interval) ?? (10 * baseInterval);
|
||||
if (!(maxInterval >= baseInterval) && (baseInterval > 0)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
validateResponse(message: RouteConfiguration__Output): boolean {
|
||||
// https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#response-validation
|
||||
for (const virtualHost of message.virtual_hosts) {
|
||||
|
@ -62,6 +93,11 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
|
|||
}
|
||||
}
|
||||
}
|
||||
if (EXPERIMENTAL_RETRY) {
|
||||
if (!this.validateRetryPolicy(virtualHost.retry_policy)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
for (const route of virtualHost.routes) {
|
||||
const match = route.match;
|
||||
if (!match) {
|
||||
|
@ -88,6 +124,11 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
|
|||
}
|
||||
}
|
||||
}
|
||||
if (EXPERIMENTAL_RETRY) {
|
||||
if (!this.validateRetryPolicy(route.route.retry_policy)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if (route.route!.cluster_specifier === 'weighted_clusters') {
|
||||
if (route.route.weighted_clusters!.total_weight?.value === 0) {
|
||||
return false;
|
||||
|
|
|
@ -7,7 +7,7 @@ export {
|
|||
} from './resolver';
|
||||
export { GrpcUri, uriToString } from './uri-parser';
|
||||
export { Duration, durationToMs } from './duration';
|
||||
export { ServiceConfig } from './service-config';
|
||||
export { ServiceConfig, MethodConfig, RetryPolicy } from './service-config';
|
||||
export { BackoffTimeout } from './backoff-timeout';
|
||||
export {
|
||||
LoadBalancer,
|
||||
|
|
Loading…
Reference in New Issue