grpc-js: Add LrsLoadBalancer class

This commit is contained in:
Michael Lumish 2020-07-28 11:40:42 -07:00
parent 209c224094
commit 669d254045
5 changed files with 295 additions and 14 deletions

View File

@ -297,16 +297,33 @@ export class EdsLoadBalancer implements LoadBalancer {
WeightedTarget
>();
for (const localityObj of localityArray) {
/* Use the endpoint picking policy from the config, default to
* round_robin. */
const endpointPickingPolicy: LoadBalancingConfig[] = [
...this.lastestConfig.eds.endpointPickingPolicy,
{ name: 'round_robin', round_robin: {} },
];
let childPolicy: LoadBalancingConfig[];
if (this.lastestConfig.eds.lrsLoadReportingServerName) {
childPolicy = [
{
name: 'lrs',
lrs: {
cluster_name: this.lastestConfig.eds.cluster,
eds_service_name: this.lastestConfig.eds.edsServiceName ?? '',
lrs_load_reporting_server_name: this.lastestConfig.eds
.lrsLoadReportingServerName,
locality: localityObj.locality,
child_policy: endpointPickingPolicy,
},
},
];
} else {
childPolicy = endpointPickingPolicy;
}
childTargets.set(localityToName(localityObj.locality), {
weight: localityObj.weight,
/* TODO(murgatroid99): Insert an lrs config around the round_robin
* config after implementing lrs */
/* Use the endpoint picking policy from the config, default to
* round_robin. */
child_policy: [
...this.lastestConfig.eds.endpointPickingPolicy,
{ name: 'round_robin', round_robin: {} },
],
child_policy: childPolicy,
});
for (const address of localityObj.addresses) {
addressList.push({

View File

@ -0,0 +1,169 @@
/*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import {
LoadBalancer,
ChannelControlHelper,
registerLoadBalancerType,
getFirstUsableConfig,
} from './load-balancer';
import { SubchannelAddress } from './subchannel';
import {
LoadBalancingConfig,
isLrsLoadBalancingConfig,
} from './load-balancing-config';
import { ChildLoadBalancerHandler } from './load-balancer-child-handler';
import { ConnectivityState } from './channel';
import { Picker, PickArgs, PickResultType, PickResult } from './picker';
import { XdsClusterLocalityStats, XdsClient } from './xds-client';
import { Filter, BaseFilter, FilterFactory } from './filter';
import { StatusObject, Call } from './call-stream';
import { Status } from './constants';
import { FilterStackFactory } from './filter-stack';
const TYPE_NAME = 'lrs';
/**
* Filter class that reports when the call ends.
*/
class CallEndTrackingFilter extends BaseFilter implements Filter {
constructor(private localityStatsReporter: XdsClusterLocalityStats) {
super();
}
receiveTrailers(status: StatusObject) {
this.localityStatsReporter.addCallFinished(status.code !== Status.OK);
return status;
}
}
class CallEndTrackingFilterFactory
implements FilterFactory<CallEndTrackingFilter> {
constructor(private localityStatsReporter: XdsClusterLocalityStats) {}
createFilter(callStream: Call): CallEndTrackingFilter {
return new CallEndTrackingFilter(this.localityStatsReporter);
}
}
/**
* Picker that delegates picking to another picker, and reports when calls
* created using those picks start and end.
*/
class LoadReportingPicker implements Picker {
constructor(
private wrappedPicker: Picker,
private localityStatsReporter: XdsClusterLocalityStats
) {}
pick(pickArgs: PickArgs): PickResult {
const wrappedPick = this.wrappedPicker.pick(pickArgs);
if (wrappedPick.pickResultType === PickResultType.COMPLETE) {
const trackingFilterFactory = new CallEndTrackingFilterFactory(
this.localityStatsReporter
);
/* In the unlikely event that the wrappedPick already has an
* extraFilterFactory, preserve it in a FilterStackFactory. */
const extraFilterFactory = wrappedPick.extraFilterFactory
? new FilterStackFactory([
wrappedPick.extraFilterFactory,
trackingFilterFactory,
])
: trackingFilterFactory;
return {
pickResultType: PickResultType.COMPLETE,
subchannel: wrappedPick.subchannel,
status: null,
onCallStarted: () => {
wrappedPick.onCallStarted?.();
this.localityStatsReporter.addCallStarted();
},
extraFilterFactory: extraFilterFactory,
};
} else {
return wrappedPick;
}
}
}
/**
* "Load balancer" that delegates the actual load balancing logic to another
* LoadBalancer class and adds hooks to track when calls started using that
* LoadBalancer start and end, and uses the XdsClient to report that
* information back to the xDS server.
*/
export class LrsLoadBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;
private localityStatsReporter: XdsClusterLocalityStats | null = null;
constructor(private channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler({
createSubchannel: (subchannelAddress, subchannelArgs) =>
channelControlHelper.createSubchannel(
subchannelAddress,
subchannelArgs
),
requestReresolution: () => channelControlHelper.requestReresolution(),
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
if (this.localityStatsReporter !== null) {
picker = new LoadReportingPicker(picker, this.localityStatsReporter);
}
channelControlHelper.updateState(connectivityState, picker);
},
});
}
updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig,
attributes: { [key: string]: unknown }
): void {
if (!isLrsLoadBalancingConfig(lbConfig)) {
return;
}
if (!(attributes.xdsClient instanceof XdsClient)) {
return;
}
const lrsConfig = lbConfig.lrs;
this.localityStatsReporter = attributes.xdsClient.addClusterLocalityStats(
lrsConfig.lrs_load_reporting_server_name,
lrsConfig.cluster_name,
lrsConfig.eds_service_name,
lrsConfig.locality
);
const childPolicy: LoadBalancingConfig = getFirstUsableConfig(
lrsConfig.child_policy
) ?? { name: 'pick_first', pick_first: {} };
this.childBalancer.updateAddressList(addressList, childPolicy, attributes);
}
exitIdle(): void {
this.childBalancer.exitIdle();
}
resetBackoff(): void {
this.childBalancer.resetBackoff();
}
destroy(): void {
this.childBalancer.destroy();
}
getTypeName(): string {
return TYPE_NAME;
}
}
export function setup() {
registerLoadBalancerType(TYPE_NAME, LrsLoadBalancer);
}

View File

@ -26,6 +26,7 @@ import * as load_balancer_priority from './load-balancer-priority';
import * as load_balancer_weighted_target from './load-balancer-weighted-target';
import * as load_balancer_eds from './load-balancer-eds';
import * as load_balancer_cds from './load-balancer-cds';
import * as load_balancer_lrs from './load-balancer-lrs';
/**
* A collection of functions associated with a channel that a load balancer
@ -145,4 +146,5 @@ export function registerAll() {
load_balancer_weighted_target.setup();
load_balancer_eds.setup();
load_balancer_cds.setup();
load_balancer_lrs.setup();
}

View File

@ -15,6 +15,8 @@
*
*/
import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
/* This file is an implementation of gRFC A24:
* https://github.com/grpc/proposal/blob/master/A24-lb-policy-config.md. Each
* function here takes an object with unknown structure and returns its
@ -79,6 +81,14 @@ export interface CdsLbConfig {
cluster: string;
}
export interface LrsLbConfig {
cluster_name: string;
eds_service_name: string;
lrs_load_reporting_server_name: string;
locality: Locality__Output;
child_policy: LoadBalancingConfig[];
}
export interface PickFirstLoadBalancingConfig {
name: 'pick_first';
pick_first: PickFirstConfig;
@ -119,6 +129,11 @@ export interface CdsLoadBalancingConfig {
cds: CdsLbConfig;
}
export interface LrsLoadBalancingConfig {
name: 'lrs';
lrs: LrsLbConfig;
}
export type LoadBalancingConfig =
| PickFirstLoadBalancingConfig
| RoundRobinLoadBalancingConfig
@ -127,7 +142,8 @@ export type LoadBalancingConfig =
| PriorityLoadBalancingConfig
| WeightedTargetLoadBalancingConfig
| EdsLoadBalancingConfig
| CdsLoadBalancingConfig;
| CdsLoadBalancingConfig
| LrsLoadBalancingConfig;
export function isRoundRobinLoadBalancingConfig(
lbconfig: LoadBalancingConfig
@ -171,6 +187,12 @@ export function isCdsLoadBalancingConfig(
return lbconfig.name === 'cds';
}
export function isLrsLoadBalancingConfig(
lbconfig: LoadBalancingConfig
): lbconfig is LrsLoadBalancingConfig {
return lbconfig.name === 'lrs';
}
/* In these functions we assume the input came from a JSON object. Therefore we
* expect that the prototype is uninteresting and that `in` can be used
* effectively */

View File

@ -38,7 +38,10 @@ import { Cluster__Output } from './generated/envoy/api/v2/Cluster';
import { LoadReportingServiceClient } from './generated/envoy/service/load_stats/v2/LoadReportingService';
import { LoadStatsRequest } from './generated/envoy/service/load_stats/v2/LoadStatsRequest';
import { LoadStatsResponse__Output } from './generated/envoy/service/load_stats/v2/LoadStatsResponse';
import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
import {
Locality__Output,
Locality,
} from './generated/envoy/api/v2/core/Locality';
import {
ClusterStats,
_envoy_api_v2_endpoint_ClusterStats_DroppedRequests,
@ -99,6 +102,17 @@ function loadAdsProtos(): Promise<
return loadedProtos;
}
function localityEqual(
loc1: Locality__Output,
loc2: Locality__Output
): boolean {
return (
loc1.region === loc2.region &&
loc1.zone === loc2.zone &&
loc1.sub_zone === loc2.sub_zone
);
}
export interface Watcher<UpdateType> {
onValidUpdate(update: UpdateType): void;
onTransientError(error: StatusObject): void;
@ -109,6 +123,11 @@ export interface XdsClusterDropStats {
addCallDropped(category: string): void;
}
export interface XdsClusterLocalityStats {
addCallStarted(): void;
addCallFinished(fail: boolean): void;
}
interface ClusterLocalityStats {
locality: Locality__Output;
callsStarted: number;
@ -792,12 +811,12 @@ export class XdsClient {
}
/**
*
*
* @param lrsServer The target name of the server to send stats to. An empty
* string indicates that the default LRS client should be used. Currently
* only the empty string is supported here.
* @param clusterName
* @param edsServiceName
* @param clusterName
* @param edsServiceName
*/
addClusterDropStats(
lrsServer: string,
@ -806,7 +825,7 @@ export class XdsClient {
): XdsClusterDropStats {
if (lrsServer !== '') {
return {
addCallDropped: category => {}
addCallDropped: (category) => {},
};
}
const clusterStats = this.clusterStatsMap.getOrCreate(
@ -821,6 +840,58 @@ export class XdsClient {
};
}
addClusterLocalityStats(
lrsServer: string,
clusterName: string,
edsServiceName: string,
locality: Locality__Output
): XdsClusterLocalityStats {
if (lrsServer !== '') {
return {
addCallStarted: () => {},
addCallFinished: (fail) => {},
};
}
const clusterStats = this.clusterStatsMap.getOrCreate(
clusterName,
edsServiceName
);
let localityStats: ClusterLocalityStats | null = null;
for (const statsObj of clusterStats.localityStats) {
if (localityEqual(locality, statsObj.locality)) {
localityStats = statsObj;
break;
}
}
if (localityStats === null) {
localityStats = {
locality,
callsInProgress: 0,
callsStarted: 0,
callsSucceeded: 0,
callsFailed: 0,
};
clusterStats.localityStats.push(localityStats);
}
/* Help the compiler understand that this object is always non-null in the
* closure */
const finalLocalityStats: ClusterLocalityStats = localityStats;
return {
addCallStarted: () => {
finalLocalityStats.callsSucceeded += 1;
finalLocalityStats.callsInProgress += 1;
},
addCallFinished: (fail) => {
if (fail) {
finalLocalityStats.callsFailed += 1;
} else {
finalLocalityStats.callsSucceeded += 1;
}
finalLocalityStats.callsInProgress -= 1;
},
};
}
shutdown(): void {
this.adsCall?.cancel();
this.adsClient?.close();