From 669d25404587806c20483efaa46aa787d495876f Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 28 Jul 2020 11:40:42 -0700 Subject: [PATCH 1/2] grpc-js: Add LrsLoadBalancer class --- packages/grpc-js/src/load-balancer-eds.ts | 33 +++- packages/grpc-js/src/load-balancer-lrs.ts | 169 ++++++++++++++++++ packages/grpc-js/src/load-balancer.ts | 2 + packages/grpc-js/src/load-balancing-config.ts | 24 ++- packages/grpc-js/src/xds-client.ts | 81 ++++++++- 5 files changed, 295 insertions(+), 14 deletions(-) create mode 100644 packages/grpc-js/src/load-balancer-lrs.ts diff --git a/packages/grpc-js/src/load-balancer-eds.ts b/packages/grpc-js/src/load-balancer-eds.ts index bc812492..57aac25b 100644 --- a/packages/grpc-js/src/load-balancer-eds.ts +++ b/packages/grpc-js/src/load-balancer-eds.ts @@ -297,16 +297,33 @@ export class EdsLoadBalancer implements LoadBalancer { WeightedTarget >(); for (const localityObj of localityArray) { + /* Use the endpoint picking policy from the config, default to + * round_robin. */ + const endpointPickingPolicy: LoadBalancingConfig[] = [ + ...this.lastestConfig.eds.endpointPickingPolicy, + { name: 'round_robin', round_robin: {} }, + ]; + let childPolicy: LoadBalancingConfig[]; + if (this.lastestConfig.eds.lrsLoadReportingServerName) { + childPolicy = [ + { + name: 'lrs', + lrs: { + cluster_name: this.lastestConfig.eds.cluster, + eds_service_name: this.lastestConfig.eds.edsServiceName ?? '', + lrs_load_reporting_server_name: this.lastestConfig.eds + .lrsLoadReportingServerName, + locality: localityObj.locality, + child_policy: endpointPickingPolicy, + }, + }, + ]; + } else { + childPolicy = endpointPickingPolicy; + } childTargets.set(localityToName(localityObj.locality), { weight: localityObj.weight, - /* TODO(murgatroid99): Insert an lrs config around the round_robin - * config after implementing lrs */ - /* Use the endpoint picking policy from the config, default to - * round_robin. */ - child_policy: [ - ...this.lastestConfig.eds.endpointPickingPolicy, - { name: 'round_robin', round_robin: {} }, - ], + child_policy: childPolicy, }); for (const address of localityObj.addresses) { addressList.push({ diff --git a/packages/grpc-js/src/load-balancer-lrs.ts b/packages/grpc-js/src/load-balancer-lrs.ts new file mode 100644 index 00000000..da09593b --- /dev/null +++ b/packages/grpc-js/src/load-balancer-lrs.ts @@ -0,0 +1,169 @@ +/* + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { + LoadBalancer, + ChannelControlHelper, + registerLoadBalancerType, + getFirstUsableConfig, +} from './load-balancer'; +import { SubchannelAddress } from './subchannel'; +import { + LoadBalancingConfig, + isLrsLoadBalancingConfig, +} from './load-balancing-config'; +import { ChildLoadBalancerHandler } from './load-balancer-child-handler'; +import { ConnectivityState } from './channel'; +import { Picker, PickArgs, PickResultType, PickResult } from './picker'; +import { XdsClusterLocalityStats, XdsClient } from './xds-client'; +import { Filter, BaseFilter, FilterFactory } from './filter'; +import { StatusObject, Call } from './call-stream'; +import { Status } from './constants'; +import { FilterStackFactory } from './filter-stack'; + +const TYPE_NAME = 'lrs'; + +/** + * Filter class that reports when the call ends. + */ +class CallEndTrackingFilter extends BaseFilter implements Filter { + constructor(private localityStatsReporter: XdsClusterLocalityStats) { + super(); + } + + receiveTrailers(status: StatusObject) { + this.localityStatsReporter.addCallFinished(status.code !== Status.OK); + return status; + } +} + +class CallEndTrackingFilterFactory + implements FilterFactory { + constructor(private localityStatsReporter: XdsClusterLocalityStats) {} + + createFilter(callStream: Call): CallEndTrackingFilter { + return new CallEndTrackingFilter(this.localityStatsReporter); + } +} + +/** + * Picker that delegates picking to another picker, and reports when calls + * created using those picks start and end. + */ +class LoadReportingPicker implements Picker { + constructor( + private wrappedPicker: Picker, + private localityStatsReporter: XdsClusterLocalityStats + ) {} + + pick(pickArgs: PickArgs): PickResult { + const wrappedPick = this.wrappedPicker.pick(pickArgs); + if (wrappedPick.pickResultType === PickResultType.COMPLETE) { + const trackingFilterFactory = new CallEndTrackingFilterFactory( + this.localityStatsReporter + ); + /* In the unlikely event that the wrappedPick already has an + * extraFilterFactory, preserve it in a FilterStackFactory. */ + const extraFilterFactory = wrappedPick.extraFilterFactory + ? new FilterStackFactory([ + wrappedPick.extraFilterFactory, + trackingFilterFactory, + ]) + : trackingFilterFactory; + return { + pickResultType: PickResultType.COMPLETE, + subchannel: wrappedPick.subchannel, + status: null, + onCallStarted: () => { + wrappedPick.onCallStarted?.(); + this.localityStatsReporter.addCallStarted(); + }, + extraFilterFactory: extraFilterFactory, + }; + } else { + return wrappedPick; + } + } +} + +/** + * "Load balancer" that delegates the actual load balancing logic to another + * LoadBalancer class and adds hooks to track when calls started using that + * LoadBalancer start and end, and uses the XdsClient to report that + * information back to the xDS server. + */ +export class LrsLoadBalancer implements LoadBalancer { + private childBalancer: ChildLoadBalancerHandler; + private localityStatsReporter: XdsClusterLocalityStats | null = null; + + constructor(private channelControlHelper: ChannelControlHelper) { + this.childBalancer = new ChildLoadBalancerHandler({ + createSubchannel: (subchannelAddress, subchannelArgs) => + channelControlHelper.createSubchannel( + subchannelAddress, + subchannelArgs + ), + requestReresolution: () => channelControlHelper.requestReresolution(), + updateState: (connectivityState: ConnectivityState, picker: Picker) => { + if (this.localityStatsReporter !== null) { + picker = new LoadReportingPicker(picker, this.localityStatsReporter); + } + channelControlHelper.updateState(connectivityState, picker); + }, + }); + } + + updateAddressList( + addressList: SubchannelAddress[], + lbConfig: LoadBalancingConfig, + attributes: { [key: string]: unknown } + ): void { + if (!isLrsLoadBalancingConfig(lbConfig)) { + return; + } + if (!(attributes.xdsClient instanceof XdsClient)) { + return; + } + const lrsConfig = lbConfig.lrs; + this.localityStatsReporter = attributes.xdsClient.addClusterLocalityStats( + lrsConfig.lrs_load_reporting_server_name, + lrsConfig.cluster_name, + lrsConfig.eds_service_name, + lrsConfig.locality + ); + const childPolicy: LoadBalancingConfig = getFirstUsableConfig( + lrsConfig.child_policy + ) ?? { name: 'pick_first', pick_first: {} }; + this.childBalancer.updateAddressList(addressList, childPolicy, attributes); + } + exitIdle(): void { + this.childBalancer.exitIdle(); + } + resetBackoff(): void { + this.childBalancer.resetBackoff(); + } + destroy(): void { + this.childBalancer.destroy(); + } + getTypeName(): string { + return TYPE_NAME; + } +} + +export function setup() { + registerLoadBalancerType(TYPE_NAME, LrsLoadBalancer); +} diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index 227bbe9c..56dd06f3 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -26,6 +26,7 @@ import * as load_balancer_priority from './load-balancer-priority'; import * as load_balancer_weighted_target from './load-balancer-weighted-target'; import * as load_balancer_eds from './load-balancer-eds'; import * as load_balancer_cds from './load-balancer-cds'; +import * as load_balancer_lrs from './load-balancer-lrs'; /** * A collection of functions associated with a channel that a load balancer @@ -145,4 +146,5 @@ export function registerAll() { load_balancer_weighted_target.setup(); load_balancer_eds.setup(); load_balancer_cds.setup(); + load_balancer_lrs.setup(); } diff --git a/packages/grpc-js/src/load-balancing-config.ts b/packages/grpc-js/src/load-balancing-config.ts index 92c4b2b9..d4d7792e 100644 --- a/packages/grpc-js/src/load-balancing-config.ts +++ b/packages/grpc-js/src/load-balancing-config.ts @@ -15,6 +15,8 @@ * */ +import { Locality__Output } from './generated/envoy/api/v2/core/Locality'; + /* This file is an implementation of gRFC A24: * https://github.com/grpc/proposal/blob/master/A24-lb-policy-config.md. Each * function here takes an object with unknown structure and returns its @@ -79,6 +81,14 @@ export interface CdsLbConfig { cluster: string; } +export interface LrsLbConfig { + cluster_name: string; + eds_service_name: string; + lrs_load_reporting_server_name: string; + locality: Locality__Output; + child_policy: LoadBalancingConfig[]; +} + export interface PickFirstLoadBalancingConfig { name: 'pick_first'; pick_first: PickFirstConfig; @@ -119,6 +129,11 @@ export interface CdsLoadBalancingConfig { cds: CdsLbConfig; } +export interface LrsLoadBalancingConfig { + name: 'lrs'; + lrs: LrsLbConfig; +} + export type LoadBalancingConfig = | PickFirstLoadBalancingConfig | RoundRobinLoadBalancingConfig @@ -127,7 +142,8 @@ export type LoadBalancingConfig = | PriorityLoadBalancingConfig | WeightedTargetLoadBalancingConfig | EdsLoadBalancingConfig - | CdsLoadBalancingConfig; + | CdsLoadBalancingConfig + | LrsLoadBalancingConfig; export function isRoundRobinLoadBalancingConfig( lbconfig: LoadBalancingConfig @@ -171,6 +187,12 @@ export function isCdsLoadBalancingConfig( return lbconfig.name === 'cds'; } +export function isLrsLoadBalancingConfig( + lbconfig: LoadBalancingConfig +): lbconfig is LrsLoadBalancingConfig { + return lbconfig.name === 'lrs'; +} + /* In these functions we assume the input came from a JSON object. Therefore we * expect that the prototype is uninteresting and that `in` can be used * effectively */ diff --git a/packages/grpc-js/src/xds-client.ts b/packages/grpc-js/src/xds-client.ts index 165e545b..5460c52b 100644 --- a/packages/grpc-js/src/xds-client.ts +++ b/packages/grpc-js/src/xds-client.ts @@ -38,7 +38,10 @@ import { Cluster__Output } from './generated/envoy/api/v2/Cluster'; import { LoadReportingServiceClient } from './generated/envoy/service/load_stats/v2/LoadReportingService'; import { LoadStatsRequest } from './generated/envoy/service/load_stats/v2/LoadStatsRequest'; import { LoadStatsResponse__Output } from './generated/envoy/service/load_stats/v2/LoadStatsResponse'; -import { Locality__Output } from './generated/envoy/api/v2/core/Locality'; +import { + Locality__Output, + Locality, +} from './generated/envoy/api/v2/core/Locality'; import { ClusterStats, _envoy_api_v2_endpoint_ClusterStats_DroppedRequests, @@ -99,6 +102,17 @@ function loadAdsProtos(): Promise< return loadedProtos; } +function localityEqual( + loc1: Locality__Output, + loc2: Locality__Output +): boolean { + return ( + loc1.region === loc2.region && + loc1.zone === loc2.zone && + loc1.sub_zone === loc2.sub_zone + ); +} + export interface Watcher { onValidUpdate(update: UpdateType): void; onTransientError(error: StatusObject): void; @@ -109,6 +123,11 @@ export interface XdsClusterDropStats { addCallDropped(category: string): void; } +export interface XdsClusterLocalityStats { + addCallStarted(): void; + addCallFinished(fail: boolean): void; +} + interface ClusterLocalityStats { locality: Locality__Output; callsStarted: number; @@ -792,12 +811,12 @@ export class XdsClient { } /** - * + * * @param lrsServer The target name of the server to send stats to. An empty * string indicates that the default LRS client should be used. Currently * only the empty string is supported here. - * @param clusterName - * @param edsServiceName + * @param clusterName + * @param edsServiceName */ addClusterDropStats( lrsServer: string, @@ -806,7 +825,7 @@ export class XdsClient { ): XdsClusterDropStats { if (lrsServer !== '') { return { - addCallDropped: category => {} + addCallDropped: (category) => {}, }; } const clusterStats = this.clusterStatsMap.getOrCreate( @@ -821,6 +840,58 @@ export class XdsClient { }; } + addClusterLocalityStats( + lrsServer: string, + clusterName: string, + edsServiceName: string, + locality: Locality__Output + ): XdsClusterLocalityStats { + if (lrsServer !== '') { + return { + addCallStarted: () => {}, + addCallFinished: (fail) => {}, + }; + } + const clusterStats = this.clusterStatsMap.getOrCreate( + clusterName, + edsServiceName + ); + let localityStats: ClusterLocalityStats | null = null; + for (const statsObj of clusterStats.localityStats) { + if (localityEqual(locality, statsObj.locality)) { + localityStats = statsObj; + break; + } + } + if (localityStats === null) { + localityStats = { + locality, + callsInProgress: 0, + callsStarted: 0, + callsSucceeded: 0, + callsFailed: 0, + }; + clusterStats.localityStats.push(localityStats); + } + /* Help the compiler understand that this object is always non-null in the + * closure */ + const finalLocalityStats: ClusterLocalityStats = localityStats; + return { + addCallStarted: () => { + finalLocalityStats.callsSucceeded += 1; + finalLocalityStats.callsInProgress += 1; + }, + addCallFinished: (fail) => { + if (fail) { + finalLocalityStats.callsFailed += 1; + } else { + finalLocalityStats.callsSucceeded += 1; + } + finalLocalityStats.callsInProgress -= 1; + }, + }; + } + shutdown(): void { this.adsCall?.cancel(); this.adsClient?.close(); From a3b27be21158daff53dc2a3802969cbf12022903 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 28 Jul 2020 15:43:39 -0700 Subject: [PATCH 2/2] Address a couple of comments --- packages/grpc-js/src/xds-client.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/grpc-js/src/xds-client.ts b/packages/grpc-js/src/xds-client.ts index 5460c52b..fc2aa17f 100644 --- a/packages/grpc-js/src/xds-client.ts +++ b/packages/grpc-js/src/xds-client.ts @@ -865,7 +865,7 @@ export class XdsClient { } if (localityStats === null) { localityStats = { - locality, + locality: locality, callsInProgress: 0, callsStarted: 0, callsSucceeded: 0, @@ -878,7 +878,7 @@ export class XdsClient { const finalLocalityStats: ClusterLocalityStats = localityStats; return { addCallStarted: () => { - finalLocalityStats.callsSucceeded += 1; + finalLocalityStats.callsStarted += 1; finalLocalityStats.callsInProgress += 1; }, addCallFinished: (fail) => {