Merge pull request #2058 from murgatroid99/grpc-js_outlier_detection

grpc-js: Add outlier detection LB policy
This commit is contained in:
Michael Lumish 2022-03-14 12:51:07 -07:00 committed by GitHub
commit 157de1a0a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 841 additions and 30 deletions

View File

@ -67,6 +67,7 @@ const compile = checkTask(() => execNpmCommand('compile'));
const copyTestFixtures = checkTask(() => ncpP(`${jsCoreDir}/test/fixtures`, `${outDir}/test/fixtures`));
const runTests = checkTask(() => {
process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION = 'true';
return gulp.src(`${outDir}/test/**/*.js`)
.pipe(mocha({reporter: 'mocha-jenkins-reporter',
require: ['ts-node/register']}));

View File

@ -49,6 +49,7 @@ import { Filter } from './filter';
import { ConnectivityState } from './connectivity-state';
import { ChannelInfo, ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, SubchannelRef, unregisterChannelzRef } from './channelz';
import { Subchannel } from './subchannel';
/**
* See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
@ -451,7 +452,7 @@ export class ChannelImplementation implements Channel {
if (subchannelState === ConnectivityState.READY) {
try {
const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream));
pickResult.subchannel!.startCallStream(
pickResult.subchannel?.getRealSubchannel().startCallStream(
finalMetadata,
callStream,
[...dynamicFilters, ...pickExtraFilters]

View File

@ -0,0 +1,36 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
export interface Duration {
seconds: number;
nanos: number;
}
export function msToDuration(millis: number): Duration {
return {
seconds: (millis / 1000) | 0,
nanos: (millis % 1000) * 1_000_000 | 0
};
}
export function durationToMs(duration: Duration): number {
return (duration.seconds * 1000 + duration.nanos / 1_000_000) | 0;
}
export function isDuration(value: any): value is Duration {
return (typeof value.seconds === 'number') && (typeof value.nanos === 'number');
}

View File

@ -6,7 +6,8 @@ export {
ConfigSelector,
} from './resolver';
export { GrpcUri, uriToString } from './uri-parser';
export { ServiceConfig, Duration } from './service-config';
export { Duration } from './duration';
export { ServiceConfig } from './service-config';
export { BackoffTimeout } from './backoff-timeout';
export {
LoadBalancer,
@ -34,3 +35,4 @@ export { Call as CallStream } from './call-stream';
export { Filter, BaseFilter, FilterFactory } from './filter';
export { FilterStackFactory } from './filter-stack';
export { registerAdminService } from './admin';
export { SubchannelInterface, BaseSubchannelWrapper, ConnectivityStateListener } from './subchannel-interface'

View File

@ -273,6 +273,7 @@ import * as resolver_uds from './resolver-uds';
import * as resolver_ip from './resolver-ip';
import * as load_balancer_pick_first from './load-balancer-pick-first';
import * as load_balancer_round_robin from './load-balancer-round-robin';
import * as load_balancer_outlier_detection from './load-balancer-outlier-detection';
import * as channelz from './channelz';
const clientVersion = require('../../package.json').version;
@ -284,5 +285,6 @@ const clientVersion = require('../../package.json').version;
resolver_ip.setup();
load_balancer_pick_first.setup();
load_balancer_round_robin.setup();
load_balancer_outlier_detection.setup();
channelz.setup();
})();

View File

@ -21,12 +21,12 @@ import {
LoadBalancingConfig,
createLoadBalancer,
} from './load-balancer';
import { Subchannel } from './subchannel';
import { SubchannelAddress } from './subchannel-address';
import { ChannelOptions } from './channel-options';
import { ConnectivityState } from './connectivity-state';
import { Picker } from './picker';
import { ChannelRef, SubchannelRef } from './channelz';
import { SubchannelInterface } from './subchannel-interface';
const TYPE_NAME = 'child_load_balancer_helper';
@ -40,7 +40,7 @@ export class ChildLoadBalancerHandler implements LoadBalancer {
createSubchannel(
subchannelAddress: SubchannelAddress,
subchannelArgs: ChannelOptions
): Subchannel {
): SubchannelInterface {
return this.parent.channelControlHelper.createSubchannel(
subchannelAddress,
subchannelArgs

View File

@ -0,0 +1,569 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { ChannelOptions, connectivityState, StatusObject } from ".";
import { Call } from "./call-stream";
import { ConnectivityState } from "./connectivity-state";
import { Status } from "./constants";
import { durationToMs, isDuration, msToDuration } from "./duration";
import { ChannelControlHelper, createChildChannelControlHelper, registerLoadBalancerType } from "./experimental";
import { BaseFilter, Filter, FilterFactory } from "./filter";
import { getFirstUsableConfig, LoadBalancer, LoadBalancingConfig, validateLoadBalancingConfig } from "./load-balancer";
import { ChildLoadBalancerHandler } from "./load-balancer-child-handler";
import { PickArgs, Picker, PickResult, PickResultType, QueuePicker, UnavailablePicker } from "./picker";
import { Subchannel } from "./subchannel";
import { SubchannelAddress, subchannelAddressToString } from "./subchannel-address";
import { BaseSubchannelWrapper, ConnectivityStateListener, SubchannelInterface } from "./subchannel-interface";
const TYPE_NAME = 'outlier_detection';
const OUTLIER_DETECTION_ENABLED = process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION === 'true';
interface SuccessRateEjectionConfig {
readonly stdev_factor: number;
readonly enforcement_percentage: number;
readonly minimum_hosts: number;
readonly request_volume: number;
}
interface FailurePercentageEjectionConfig {
readonly threshold: number;
readonly enforcement_percentage: number;
readonly minimum_hosts: number;
readonly request_volume: number;
}
const defaultSuccessRateEjectionConfig: SuccessRateEjectionConfig = {
stdev_factor: 1900,
enforcement_percentage: 100,
minimum_hosts: 5,
request_volume: 100
};
const defaultFailurePercentageEjectionConfig: FailurePercentageEjectionConfig = {
threshold: 85,
enforcement_percentage: 100,
minimum_hosts: 5,
request_volume: 50
}
type TypeofValues = 'object' | 'boolean' | 'function' | 'number' | 'string' | 'undefined';
function validateFieldType(obj: any, fieldName: string, expectedType: TypeofValues, objectName?: string) {
if (fieldName in obj && typeof obj[fieldName] !== expectedType) {
const fullFieldName = objectName ? `${objectName}.${fieldName}` : fieldName;
throw new Error(`outlier detection config ${fullFieldName} parse error: expected ${expectedType}, got ${typeof obj[fieldName]}`);
}
}
function validatePositiveDuration(obj: any, fieldName: string, objectName?: string) {
const fullFieldName = objectName ? `${objectName}.${fieldName}` : fieldName;
if (fieldName in obj) {
if (!isDuration(obj[fieldName])) {
throw new Error(`outlier detection config ${fullFieldName} parse error: expected Duration, got ${typeof obj[fieldName]}`);
}
if (!(obj[fieldName].seconds >= 0 && obj[fieldName].seconds <= 315_576_000_000 && obj[fieldName].nanos >= 0 && obj[fieldName].nanos <= 999_999_999)) {
throw new Error(`outlier detection config ${fullFieldName} parse error: values out of range for non-negative Duaration`);
}
}
}
function validatePercentage(obj: any, fieldName: string, objectName?: string) {
const fullFieldName = objectName ? `${objectName}.${fieldName}` : fieldName;
validateFieldType(obj, fieldName, 'number', objectName);
if (fieldName in obj && !(obj[fieldName] >= 0 && obj[fieldName] <= 100)) {
throw new Error(`outlier detection config ${fullFieldName} parse error: value out of range for percentage (0-100)`);
}
}
export class OutlierDetectionLoadBalancingConfig implements LoadBalancingConfig {
constructor(
private readonly intervalMs: number,
private readonly baseEjectionTimeMs: number,
private readonly maxEjectionTimeMs: number,
private readonly maxEjectionPercent: number,
private readonly successRateEjection: SuccessRateEjectionConfig | null,
private readonly failurePercentageEjection: FailurePercentageEjectionConfig | null,
private readonly childPolicy: LoadBalancingConfig[]
) {}
getLoadBalancerName(): string {
return TYPE_NAME;
}
toJsonObject(): object {
return {
interval: msToDuration(this.intervalMs),
base_ejection_time: msToDuration(this.baseEjectionTimeMs),
max_ejection_time: msToDuration(this.maxEjectionTimeMs),
max_ejection_percent: this.maxEjectionPercent,
success_rate_ejection: this.successRateEjection,
failure_percentage_ejection: this.failurePercentageEjection,
child_policy: this.childPolicy.map(policy => policy.toJsonObject())
};
}
getIntervalMs(): number {
return this.intervalMs;
}
getBaseEjectionTimeMs(): number {
return this.baseEjectionTimeMs;
}
getMaxEjectionTimeMs(): number {
return this.maxEjectionTimeMs;
}
getMaxEjectionPercent(): number {
return this.maxEjectionPercent;
}
getSuccessRateEjectionConfig(): SuccessRateEjectionConfig | null {
return this.successRateEjection;
}
getFailurePercentageEjectionConfig(): FailurePercentageEjectionConfig | null {
return this.failurePercentageEjection;
}
getChildPolicy(): LoadBalancingConfig[] {
return this.childPolicy;
}
static createFromJson(obj: any): OutlierDetectionLoadBalancingConfig {
validatePositiveDuration(obj, 'interval');
validatePositiveDuration(obj, 'base_ejection_time');
validatePositiveDuration(obj, 'max_ejection_time');
validatePercentage(obj, 'max_ejection_percent');
if ('success_rate_ejection' in obj) {
if (typeof obj.success_rate_ejection !== 'object') {
throw new Error('outlier detection config success_rate_ejection must be an object');
}
validateFieldType(obj.success_rate_ejection, 'stdev_factor', 'number', 'success_rate_ejection');
validatePercentage(obj.success_rate_ejection, 'enforcement_percentage', 'success_rate_ejection');
validateFieldType(obj.success_rate_ejection, 'minimum_hosts', 'number', 'success_rate_ejection');
validateFieldType(obj.success_rate_ejection, 'request_volume', 'number', 'success_rate_ejection');
}
if ('failure_percentage_ejection' in obj) {
if (typeof obj.failure_percentage_ejection !== 'object') {
throw new Error('outlier detection config failure_percentage_ejection must be an object');
}
validatePercentage(obj.failure_percentage_ejection, 'threshold', 'failure_percentage_ejection');
validatePercentage(obj.failure_percentage_ejection, 'enforcement_percentage', 'failure_percentage_ejection');
validateFieldType(obj.failure_percentage_ejection, 'minimum_hosts', 'number', 'failure_percentage_ejection');
validateFieldType(obj.failure_percentage_ejection, 'request_volume', 'number', 'failure_percentage_ejection');
}
return new OutlierDetectionLoadBalancingConfig(
obj.interval ? durationToMs(obj.interval) : 10_000,
obj.base_ejection_time ? durationToMs(obj.base_ejection_time) : 30_000,
obj.max_ejection_time ? durationToMs(obj.max_ejection_time) : 300_000,
obj.max_ejection_percent ?? 10,
obj.success_rate_ejection ? {...defaultSuccessRateEjectionConfig, ...obj.success_rate_ejection} : null,
obj.failure_percentage_ejection ? {...defaultFailurePercentageEjectionConfig, ...obj.failure_percentage_ejection} : null,
obj.child_policy.map(validateLoadBalancingConfig)
);
}
}
class OutlierDetectionSubchannelWrapper extends BaseSubchannelWrapper implements SubchannelInterface {
private childSubchannelState: ConnectivityState = ConnectivityState.IDLE;
private stateListeners: ConnectivityStateListener[] = [];
private ejected: boolean = false;
private refCount: number = 0;
constructor(childSubchannel: SubchannelInterface, private mapEntry?: MapEntry) {
super(childSubchannel);
childSubchannel.addConnectivityStateListener((subchannel, previousState, newState) => {
this.childSubchannelState = newState;
if (!this.ejected) {
for (const listener of this.stateListeners) {
listener(this, previousState, newState);
}
}
});
}
/**
* Add a listener function to be called whenever the wrapper's
* connectivity state changes.
* @param listener
*/
addConnectivityStateListener(listener: ConnectivityStateListener) {
this.stateListeners.push(listener);
}
/**
* Remove a listener previously added with `addConnectivityStateListener`
* @param listener A reference to a function previously passed to
* `addConnectivityStateListener`
*/
removeConnectivityStateListener(listener: ConnectivityStateListener) {
const listenerIndex = this.stateListeners.indexOf(listener);
if (listenerIndex > -1) {
this.stateListeners.splice(listenerIndex, 1);
}
}
ref() {
this.child.ref();
this.refCount += 1;
}
unref() {
this.child.unref();
this.refCount -= 1;
if (this.refCount <= 0) {
if (this.mapEntry) {
const index = this.mapEntry.subchannelWrappers.indexOf(this);
if (index >= 0) {
this.mapEntry.subchannelWrappers.splice(index, 1);
}
}
}
}
eject() {
this.ejected = true;
for (const listener of this.stateListeners) {
listener(this, this.childSubchannelState, ConnectivityState.TRANSIENT_FAILURE);
}
}
uneject() {
this.ejected = false;
for (const listener of this.stateListeners) {
listener(this, ConnectivityState.TRANSIENT_FAILURE, this.childSubchannelState);
}
}
getMapEntry(): MapEntry | undefined {
return this.mapEntry;
}
getWrappedSubchannel(): SubchannelInterface {
return this.child;
}
}
interface CallCountBucket {
success: number;
failure: number;
}
function createEmptyBucket(): CallCountBucket {
return {
success: 0,
failure: 0
}
}
class CallCounter {
private activeBucket: CallCountBucket = createEmptyBucket();
private inactiveBucket: CallCountBucket = createEmptyBucket();
addSuccess() {
this.activeBucket.success += 1;
}
addFailure() {
this.activeBucket.failure += 1;
}
switchBuckets() {
this.inactiveBucket = this.activeBucket;
this.activeBucket = createEmptyBucket();
}
getLastSuccesses() {
return this.inactiveBucket.success;
}
getLastFailures() {
return this.inactiveBucket.failure;
}
}
interface MapEntry {
counter: CallCounter;
currentEjectionTimestamp: Date | null;
ejectionTimeMultiplier: number;
subchannelWrappers: OutlierDetectionSubchannelWrapper[];
}
class OutlierDetectionCounterFilter extends BaseFilter implements Filter {
constructor(private callCounter: CallCounter) {
super();
}
receiveTrailers(status: StatusObject): StatusObject {
if (status.code === Status.OK) {
this.callCounter.addSuccess();
} else {
this.callCounter.addFailure();
}
return status;
}
}
class OutlierDetectionCounterFilterFactory implements FilterFactory<OutlierDetectionCounterFilter> {
constructor(private callCounter: CallCounter) {}
createFilter(callStream: Call): OutlierDetectionCounterFilter {
return new OutlierDetectionCounterFilter(this.callCounter);
}
}
class OutlierDetectionPicker implements Picker {
constructor(private wrappedPicker: Picker) {}
pick(pickArgs: PickArgs): PickResult {
const wrappedPick = this.wrappedPicker.pick(pickArgs);
if (wrappedPick.pickResultType === PickResultType.COMPLETE) {
const subchannelWrapper = wrappedPick.subchannel as OutlierDetectionSubchannelWrapper;
const mapEntry = subchannelWrapper.getMapEntry();
if (mapEntry) {
return {
...wrappedPick,
subchannel: subchannelWrapper.getWrappedSubchannel(),
extraFilterFactories: [...wrappedPick.extraFilterFactories, new OutlierDetectionCounterFilterFactory(mapEntry.counter)]
};
} else {
return wrappedPick;
}
} else {
return wrappedPick;
}
}
}
export class OutlierDetectionLoadBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;
private addressMap: Map<string, MapEntry> = new Map<string, MapEntry>();
private latestConfig: OutlierDetectionLoadBalancingConfig | null = null;
private ejectionTimer: NodeJS.Timer;
constructor(channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
createSubchannel: (subchannelAddress: SubchannelAddress, subchannelArgs: ChannelOptions) => {
const originalSubchannel = channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs);
const mapEntry = this.addressMap.get(subchannelAddressToString(subchannelAddress));
const subchannelWrapper = new OutlierDetectionSubchannelWrapper(originalSubchannel, mapEntry);
mapEntry?.subchannelWrappers.push(subchannelWrapper);
return subchannelWrapper;
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
if (connectivityState === ConnectivityState.READY) {
channelControlHelper.updateState(connectivityState, new OutlierDetectionPicker(picker));
} else {
channelControlHelper.updateState(connectivityState, picker);
}
}
}));
this.ejectionTimer = setInterval(() => {}, 0);
clearInterval(this.ejectionTimer);
}
private getCurrentEjectionPercent() {
let ejectionCount = 0;
for (const mapEntry of this.addressMap.values()) {
if (mapEntry.currentEjectionTimestamp !== null) {
ejectionCount += 1;
}
}
return (ejectionCount * 100) / this.addressMap.size;
}
private runSuccessRateCheck(ejectionTimestamp: Date) {
if (!this.latestConfig) {
return;
}
const successRateConfig = this.latestConfig.getSuccessRateEjectionConfig();
if (!successRateConfig) {
return;
}
// Step 1
const targetRequestVolume = successRateConfig.request_volume;
let addresesWithTargetVolume = 0;
const successRates: number[] = []
for (const mapEntry of this.addressMap.values()) {
const successes = mapEntry.counter.getLastSuccesses();
const failures = mapEntry.counter.getLastFailures();
if (successes + failures >= targetRequestVolume) {
addresesWithTargetVolume += 1;
successRates.push(successes/(successes + failures));
}
}
if (addresesWithTargetVolume < successRateConfig.minimum_hosts) {
return;
}
// Step 2
const successRateMean = successRates.reduce((a, b) => a + b);
let successRateVariance = 0;
for (const rate of successRates) {
const deviation = rate - successRateMean;
successRateVariance += deviation * deviation;
}
const successRateStdev = Math.sqrt(successRateVariance);
const ejectionThreshold = successRateMean - successRateStdev * (successRateConfig.stdev_factor / 1000);
// Step 3
for (const mapEntry of this.addressMap.values()) {
// Step 3.i
if (this.getCurrentEjectionPercent() > this.latestConfig.getMaxEjectionPercent()) {
break;
}
// Step 3.ii
const successes = mapEntry.counter.getLastSuccesses();
const failures = mapEntry.counter.getLastFailures();
if (successes + failures < targetRequestVolume) {
continue;
}
// Step 3.iii
const successRate = successes / (successes + failures);
if (successRate < ejectionThreshold) {
const randomNumber = Math.random() * 100;
if (randomNumber < successRateConfig.enforcement_percentage) {
this.eject(mapEntry, ejectionTimestamp);
}
}
}
}
private runFailurePercentageCheck(ejectionTimestamp: Date) {
if (!this.latestConfig) {
return;
}
const failurePercentageConfig = this.latestConfig.getFailurePercentageEjectionConfig()
if (!failurePercentageConfig) {
return;
}
// Step 1
if (this.addressMap.size < failurePercentageConfig.minimum_hosts) {
return;
}
// Step 2
for (const mapEntry of this.addressMap.values()) {
// Step 2.i
if (this.getCurrentEjectionPercent() > this.latestConfig.getMaxEjectionPercent()) {
break;
}
// Step 2.ii
const successes = mapEntry.counter.getLastSuccesses();
const failures = mapEntry.counter.getLastFailures();
if (successes + failures < failurePercentageConfig.request_volume) {
continue;
}
// Step 2.iii
const failurePercentage = (failures * 100) / (failures + successes);
if (failurePercentage > failurePercentageConfig.threshold) {
const randomNumber = Math.random() * 100;
if (randomNumber < failurePercentageConfig.enforcement_percentage) {
this.eject(mapEntry, ejectionTimestamp);
}
}
}
}
private eject(mapEntry: MapEntry, ejectionTimestamp: Date) {
mapEntry.currentEjectionTimestamp = new Date();
mapEntry.ejectionTimeMultiplier += 1;
for (const subchannelWrapper of mapEntry.subchannelWrappers) {
subchannelWrapper.eject();
}
}
private uneject(mapEntry: MapEntry) {
mapEntry.currentEjectionTimestamp = null;
for (const subchannelWrapper of mapEntry.subchannelWrappers) {
subchannelWrapper.uneject();
}
}
private runChecks() {
const ejectionTimestamp = new Date();
for (const mapEntry of this.addressMap.values()) {
mapEntry.counter.switchBuckets();
}
if (!this.latestConfig) {
return;
}
this.runSuccessRateCheck(ejectionTimestamp);
this.runFailurePercentageCheck(ejectionTimestamp);
for (const mapEntry of this.addressMap.values()) {
if (mapEntry.currentEjectionTimestamp === null) {
if (mapEntry.ejectionTimeMultiplier > 0) {
mapEntry.ejectionTimeMultiplier -= 1;
}
} else {
const baseEjectionTimeMs = this.latestConfig.getBaseEjectionTimeMs();
const maxEjectionTimeMs = this.latestConfig.getMaxEjectionTimeMs();
const returnTime = new Date(mapEntry.currentEjectionTimestamp.getTime());
returnTime.setMilliseconds(returnTime.getMilliseconds() + Math.min(baseEjectionTimeMs * mapEntry.ejectionTimeMultiplier, Math.max(baseEjectionTimeMs, maxEjectionTimeMs)));
if (returnTime < new Date()) {
this.uneject(mapEntry);
}
}
}
}
updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
if (!(lbConfig instanceof OutlierDetectionLoadBalancingConfig)) {
return;
}
const subchannelAddresses = new Set<string>();
for (const address of addressList) {
subchannelAddresses.add(subchannelAddressToString(address));
}
for (const address of subchannelAddresses) {
if (!this.addressMap.has(address)) {
this.addressMap.set(address, {
counter: new CallCounter(),
currentEjectionTimestamp: null,
ejectionTimeMultiplier: 0,
subchannelWrappers: []
});
}
}
for (const key of this.addressMap.keys()) {
if (!subchannelAddresses.has(key)) {
this.addressMap.delete(key);
}
}
const childPolicy: LoadBalancingConfig = getFirstUsableConfig(
lbConfig.getChildPolicy(),
true
);
this.childBalancer.updateAddressList(addressList, childPolicy, attributes);
if (this.latestConfig === null || this.latestConfig.getIntervalMs() !== lbConfig.getIntervalMs()) {
clearInterval(this.ejectionTimer);
this.ejectionTimer = setInterval(() => this.runChecks(), lbConfig.getIntervalMs());
}
this.latestConfig = lbConfig;
}
exitIdle(): void {
this.childBalancer.exitIdle();
}
resetBackoff(): void {
this.childBalancer.resetBackoff();
}
destroy(): void {
this.childBalancer.destroy();
}
getTypeName(): string {
return TYPE_NAME;
}
}
export function setup() {
if (OUTLIER_DETECTION_ENABLED) {
registerLoadBalancerType(TYPE_NAME, OutlierDetectionLoadBalancer, OutlierDetectionLoadBalancingConfig);
}
}

View File

@ -31,13 +31,13 @@ import {
PickResultType,
UnavailablePicker,
} from './picker';
import { Subchannel, ConnectivityStateListener } from './subchannel';
import {
SubchannelAddress,
subchannelAddressToString,
} from './subchannel-address';
import * as logging from './logging';
import { LogVerbosity } from './constants';
import { SubchannelInterface, ConnectivityStateListener } from './subchannel-interface';
const TRACER_NAME = 'pick_first';
@ -77,7 +77,7 @@ export class PickFirstLoadBalancingConfig implements LoadBalancingConfig {
* picked subchannel.
*/
class PickFirstPicker implements Picker {
constructor(private subchannel: Subchannel) {}
constructor(private subchannel: SubchannelInterface) {}
pick(pickArgs: PickArgs): CompletePickResult {
return {
@ -107,7 +107,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
* The list of subchannels this load balancer is currently attempting to
* connect to.
*/
private subchannels: Subchannel[] = [];
private subchannels: SubchannelInterface[] = [];
/**
* The current connectivity state of the load balancer.
*/
@ -124,7 +124,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
* and only if the load balancer's current state is READY. In that case,
* the subchannel's current state is also READY.
*/
private currentPick: Subchannel | null = null;
private currentPick: SubchannelInterface | null = null;
/**
* Listener callback attached to each subchannel in the `subchannels` list
* while establishing a connection.
@ -157,7 +157,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
[ConnectivityState.TRANSIENT_FAILURE]: 0,
};
this.subchannelStateListener = (
subchannel: Subchannel,
subchannel: SubchannelInterface,
previousState: ConnectivityState,
newState: ConnectivityState
) => {
@ -219,7 +219,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
}
};
this.pickedSubchannelStateListener = (
subchannel: Subchannel,
subchannel: SubchannelInterface,
previousState: ConnectivityState,
newState: ConnectivityState
) => {
@ -310,7 +310,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
}, CONNECTION_DELAY_INTERVAL_MS);
}
private pickSubchannel(subchannel: Subchannel) {
private pickSubchannel(subchannel: SubchannelInterface) {
trace('Pick subchannel with address ' + subchannel.getAddress());
if (this.currentPick !== null) {
this.currentPick.unref();

View File

@ -30,13 +30,13 @@ import {
PickResultType,
UnavailablePicker,
} from './picker';
import { Subchannel, ConnectivityStateListener } from './subchannel';
import {
SubchannelAddress,
subchannelAddressToString,
} from './subchannel-address';
import * as logging from './logging';
import { LogVerbosity } from './constants';
import { ConnectivityStateListener, SubchannelInterface } from './subchannel-interface';
const TRACER_NAME = 'round_robin';
@ -67,7 +67,7 @@ class RoundRobinLoadBalancingConfig implements LoadBalancingConfig {
class RoundRobinPicker implements Picker {
constructor(
private readonly subchannelList: Subchannel[],
private readonly subchannelList: SubchannelInterface[],
private nextIndex = 0
) {}
@ -88,7 +88,7 @@ class RoundRobinPicker implements Picker {
* balancer implementation to preserve this part of the picker state if
* possible when a subchannel connects or disconnects.
*/
peekNextSubchannel(): Subchannel {
peekNextSubchannel(): SubchannelInterface {
return this.subchannelList[this.nextIndex];
}
}
@ -102,7 +102,7 @@ interface ConnectivityStateCounts {
}
export class RoundRobinLoadBalancer implements LoadBalancer {
private subchannels: Subchannel[] = [];
private subchannels: SubchannelInterface[] = [];
private currentState: ConnectivityState = ConnectivityState.IDLE;
@ -121,7 +121,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
[ConnectivityState.TRANSIENT_FAILURE]: 0,
};
this.subchannelStateListener = (
subchannel: Subchannel,
subchannel: SubchannelInterface,
previousState: ConnectivityState,
newState: ConnectivityState
) => {

View File

@ -21,6 +21,7 @@ import { SubchannelAddress } from './subchannel-address';
import { ConnectivityState } from './connectivity-state';
import { Picker } from './picker';
import { ChannelRef, SubchannelRef } from './channelz';
import { SubchannelInterface } from './subchannel-interface';
/**
* A collection of functions associated with a channel that a load balancer
@ -35,7 +36,7 @@ export interface ChannelControlHelper {
createSubchannel(
subchannelAddress: SubchannelAddress,
subchannelArgs: ChannelOptions
): Subchannel;
): SubchannelInterface;
/**
* Passes a new subchannel picker up to the channel. This is called if either
* the connectivity state changes or if a different picker is needed for any

View File

@ -21,6 +21,7 @@ import { Metadata } from './metadata';
import { Status } from './constants';
import { LoadBalancer } from './load-balancer';
import { FilterFactory, Filter } from './filter';
import { SubchannelInterface } from './subchannel-interface';
export enum PickResultType {
COMPLETE,
@ -36,7 +37,7 @@ export interface PickResult {
* `pickResultType` is COMPLETE. If null, indicates that the call should be
* dropped.
*/
subchannel: Subchannel | null;
subchannel: SubchannelInterface | null;
/**
* The status object to end the call with. Populated if and only if
* `pickResultType` is TRANSIENT_FAILURE.
@ -53,7 +54,7 @@ export interface PickResult {
export interface CompletePickResult extends PickResult {
pickResultType: PickResultType.COMPLETE;
subchannel: Subchannel | null;
subchannel: SubchannelInterface | null;
status: null;
extraFilterFactories: FilterFactory<Filter>[];
onCallStarted: (() => void) | null;

View File

@ -27,6 +27,7 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import * as os from 'os';
import { Duration } from './duration';
import {
LoadBalancingConfig,
validateLoadBalancingConfig,
@ -37,11 +38,6 @@ export interface MethodConfigName {
method?: string;
}
export interface Duration {
seconds: number;
nanos: number;
}
export interface MethodConfig {
name: MethodConfigName[];
waitForReady?: boolean;

View File

@ -0,0 +1,82 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { SubchannelRef } from "./channelz";
import { ConnectivityState } from "./connectivity-state";
import { Subchannel } from "./subchannel";
export type ConnectivityStateListener = (
subchannel: SubchannelInterface,
previousState: ConnectivityState,
newState: ConnectivityState
) => void;
/**
* This is an interface for load balancing policies to use to interact with
* subchannels. This allows load balancing policies to wrap and unwrap
* subchannels.
*
* Any load balancing policy that wraps subchannels must unwrap the subchannel
* in the picker, so that other load balancing policies consistently have
* access to their own wrapper objects.
*/
export interface SubchannelInterface {
getConnectivityState(): ConnectivityState;
addConnectivityStateListener(listener: ConnectivityStateListener): void;
removeConnectivityStateListener(listener: ConnectivityStateListener): void;
startConnecting(): void;
getAddress(): string;
ref(): void;
unref(): void;
getChannelzRef(): SubchannelRef;
/**
* If this is a wrapper, return the wrapped subchannel, otherwise return this
*/
getRealSubchannel(): Subchannel;
}
export abstract class BaseSubchannelWrapper implements SubchannelInterface {
constructor(protected child: SubchannelInterface) {}
getConnectivityState(): ConnectivityState {
return this.child.getConnectivityState();
}
addConnectivityStateListener(listener: ConnectivityStateListener): void {
this.child.addConnectivityStateListener(listener);
}
removeConnectivityStateListener(listener: ConnectivityStateListener): void {
this.child.removeConnectivityStateListener(listener);
}
startConnecting(): void {
this.child.startConnecting();
}
getAddress(): string {
return this.child.getAddress();
}
ref(): void {
this.child.ref();
}
unref(): void {
this.child.unref();
}
getChannelzRef(): SubchannelRef {
return this.child.getChannelzRef();
}
getRealSubchannel(): Subchannel {
return this.child.getRealSubchannel();
}
}

View File

@ -37,6 +37,7 @@ import {
subchannelAddressToString,
} from './subchannel-address';
import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, SocketInfo, SocketRef, unregisterChannelzRef, registerChannelzSocket, TlsInfo } from './channelz';
import { ConnectivityStateListener } from './subchannel-interface';
const clientVersion = require('../../package.json').version;
@ -54,12 +55,6 @@ const BACKOFF_JITTER = 0.2;
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
const KEEPALIVE_TIMEOUT_MS = 20000;
export type ConnectivityStateListener = (
subchannel: Subchannel,
previousState: ConnectivityState,
newState: ConnectivityState
) => void;
export interface SubchannelCallStatsTracker {
addMessageSent(): void;
addMessageReceived(): void;
@ -949,4 +944,8 @@ export class Subchannel {
getChannelzRef(): SubchannelRef {
return this.channelzRef;
}
getRealSubchannel(): this {
return this;
}
}

View File

@ -0,0 +1,121 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as assert from 'assert';
import * as path from 'path';
import * as grpc from '../src';
import { loadProtoFile } from './common';
function multiDone(done: Mocha.Done, target: number) {
let count = 0;
return (error?: any) => {
if (error) {
done(error);
}
count++;
if (count >= target) {
done();
}
}
}
const defaultOutlierDetectionServiceConfig = {
methodConfig: [],
loadBalancingConfig: [
{
outlier_detection: {
success_rate_ejection: {},
failure_percentage_ejection: {},
child_policy: [{round_robin: {}}]
}
}
]
};
const defaultOutlierDetectionServiceConfigString = JSON.stringify(defaultOutlierDetectionServiceConfig);
const goodService = {
echo: (call: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryData<any>) => {
callback(null, call.request)
}
};
const badService = {
echo: (call: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryData<any>) => {
callback({
code: grpc.status.PERMISSION_DENIED,
details: 'Permission denied'
})
}
}
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const EchoService = loadProtoFile(protoFile)
.EchoService as grpc.ServiceClientConstructor;
describe('Outlier detection', () => {
const GOOD_PORTS = 4;
let goodServer: grpc.Server;
let badServer: grpc.Server;
const goodPorts: number[] = [];
let badPort: number;
before(done => {
const eachDone = multiDone(() => {
goodServer.start();
badServer.start();
done();
}, GOOD_PORTS + 1);
goodServer = new grpc.Server();
goodServer.addService(EchoService.service, goodService);
for (let i = 0; i < GOOD_PORTS; i++) {
goodServer.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
if (error) {
eachDone(error);
return;
}
goodPorts.push(port);
eachDone();
});
}
badServer = new grpc.Server();
badServer.addService(EchoService.service, badService);
badServer.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
if (error) {
eachDone(error);
return;
}
badPort = port;
eachDone();
});
});
after(() => {
goodServer.forceShutdown();
badServer.forceShutdown();
});
it('Should allow normal operation with one server', done => {
const client = new EchoService(`localhost:${goodPorts[0]}`, grpc.credentials.createInsecure(), {'grpc.service_config': defaultOutlierDetectionServiceConfigString});
client.echo(
{ value: 'test value', value2: 3 },
(error: grpc.ServiceError, response: any) => {
assert.ifError(error);
assert.deepStrictEqual(response, { value: 'test value', value2: 3 });
done();
}
);
});
});