diff --git a/PACKAGE-COMPARISON.md b/PACKAGE-COMPARISON.md index 6d748e65..8a62c914 100644 --- a/PACKAGE-COMPARISON.md +++ b/PACKAGE-COMPARISON.md @@ -36,5 +36,6 @@ In addition, all channel arguments defined in [this header file](https://github. - `grpc.default_authority` - `grpc.keepalive_time_ms` - `grpc.keepalive_timeout_ms` + - `grpc.service_config` - `channelOverride` - `channelFactoryOverride` diff --git a/packages/grpc-js/src/channel-options.ts b/packages/grpc-js/src/channel-options.ts index b7145970..ad905b9f 100644 --- a/packages/grpc-js/src/channel-options.ts +++ b/packages/grpc-js/src/channel-options.ts @@ -25,6 +25,7 @@ export interface ChannelOptions { 'grpc.default_authority'?: string; 'grpc.keepalive_time_ms'?: number; 'grpc.keepalive_timeout_ms'?: number; + 'grpc.service_config'?: string; [key: string]: string | number | undefined; } @@ -39,6 +40,7 @@ export const recognizedOptions = { 'grpc.default_authority': true, 'grpc.keepalive_time_ms': true, 'grpc.keepalive_timeout_ms': true, + 'grpc.service_config': true, }; export function channelOptionsEqual( diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index b2fdb654..c06c3978 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -36,7 +36,7 @@ import { MetadataStatusFilterFactory } from './metadata-status-filter'; import { CompressionFilterFactory } from './compression-filter'; import { getDefaultAuthority } from './resolver'; import { LoadBalancingConfig } from './load-balancing-config'; -import { ServiceConfig } from './service-config'; +import { ServiceConfig, validateServiceConfig } from './service-config'; export enum ConnectivityState { CONNECTING, @@ -159,10 +159,13 @@ export class ChannelImplementation implements Channel { }, }; // TODO(murgatroid99): check channel arg for default service config - const defaultServiceConfig: ServiceConfig = { + let defaultServiceConfig: ServiceConfig = { loadBalancingConfig: [], methodConfig: [], }; + if (options['grpc.service_config']) { + defaultServiceConfig = validateServiceConfig(JSON.parse(options['grpc.service_config']!)); + } this.resolvingLoadBalancer = new ResolvingLoadBalancer( target, channelControlHelper, diff --git a/packages/grpc-js/src/load-balancer-round-robin.ts b/packages/grpc-js/src/load-balancer-round-robin.ts new file mode 100644 index 00000000..72af705d --- /dev/null +++ b/packages/grpc-js/src/load-balancer-round-robin.ts @@ -0,0 +1,187 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { + LoadBalancer, + ChannelControlHelper, + registerLoadBalancerType, +} from './load-balancer'; +import { ConnectivityState } from './channel'; +import { + QueuePicker, + Picker, + PickArgs, + CompletePickResult, + PickResultType, + UnavailablePicker, +} from './picker'; +import { LoadBalancingConfig } from './load-balancing-config'; +import { Subchannel, ConnectivityStateListener } from './subchannel'; + +const TYPE_NAME = 'round_robin'; + +class RoundRobinPicker implements Picker { + constructor(private readonly subchannelList: Subchannel[], private nextIndex = 0) {} + + pick(pickArgs: PickArgs): CompletePickResult { + const pickedSubchannel = this.subchannelList[this.nextIndex]; + this.nextIndex = (this.nextIndex + 1) % this.subchannelList.length; + return { + pickResultType: PickResultType.COMPLETE, + subchannel: pickedSubchannel, + status: null + }; + } + + /** + * Check what the next subchannel returned would be. Used by the load + * balancer implementation to preserve this part of the picker state if + * possible when a subchannel connects or disconnects. + */ + peekNextSubchannel(): Subchannel { + return this.subchannelList[this.nextIndex]; + } +} + +interface ConnectivityStateCounts { + [ConnectivityState.CONNECTING]: number, + [ConnectivityState.IDLE]: number, + [ConnectivityState.READY]: number, + [ConnectivityState.SHUTDOWN]: number, + [ConnectivityState.TRANSIENT_FAILURE]: number +} + +export class RoundRobinLoadBalancer implements LoadBalancer { + + private subchannels: Subchannel[] = []; + + private currentState: ConnectivityState = ConnectivityState.IDLE; + + private subchannelStateListener: ConnectivityStateListener; + + private subchannelStateCounts: ConnectivityStateCounts; + + private currentReadyPicker: RoundRobinPicker | null = null; + + constructor(private channelControlHelper: ChannelControlHelper) { + this.updateState(ConnectivityState.IDLE, new QueuePicker(this)); + this.subchannelStateCounts = { + [ConnectivityState.CONNECTING]: 0, + [ConnectivityState.IDLE]: 0, + [ConnectivityState.READY]: 0, + [ConnectivityState.SHUTDOWN]: 0, + [ConnectivityState.TRANSIENT_FAILURE]: 0 + }; + this.subchannelStateListener = ( + subchannel: Subchannel, + previousState: ConnectivityState, + newState: ConnectivityState + ) => { + this.subchannelStateCounts[previousState] -= 1; + this.subchannelStateCounts[newState] += 1; + this.calculateAndUpdateState(); + + if (newState === ConnectivityState.TRANSIENT_FAILURE) { + this.channelControlHelper.requestReresolution(); + } + if (newState === ConnectivityState.TRANSIENT_FAILURE || newState === ConnectivityState.IDLE) { + subchannel.startConnecting(); + } + } + } + + private calculateAndUpdateState() { + if (this.subchannelStateCounts[ConnectivityState.READY] > 0) { + const readySubchannels = this.subchannels.filter(subchannel => subchannel.getConnectivityState() === ConnectivityState.READY); + let index: number = 0; + if (this.currentReadyPicker !== null) { + index = readySubchannels.indexOf(this.currentReadyPicker.peekNextSubchannel()); + if (index < 0) { + index = 0; + } + } + this.updateState(ConnectivityState.READY, new RoundRobinPicker(readySubchannels, index)); + } else if (this.subchannelStateCounts[ConnectivityState.CONNECTING] > 0) { + this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this)); + } else if (this.subchannelStateCounts[ConnectivityState.TRANSIENT_FAILURE] > 0) { + this.updateState(ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker()); + } else { + this.updateState(ConnectivityState.IDLE, new QueuePicker(this)); + } + } + + private updateState(newState: ConnectivityState, picker: Picker) { + if (newState === ConnectivityState.READY) { + this.currentReadyPicker = picker as RoundRobinPicker; + } else { + this.currentReadyPicker = null; + } + this.currentState = newState; + this.channelControlHelper.updateState(newState, picker); + } + + private resetSubchannelList() { + for (const subchannel of this.subchannels) { + subchannel.removeConnectivityStateListener(this.subchannelStateListener); + subchannel.unref(); + } + this.subchannelStateCounts = { + [ConnectivityState.CONNECTING]: 0, + [ConnectivityState.IDLE]: 0, + [ConnectivityState.READY]: 0, + [ConnectivityState.SHUTDOWN]: 0, + [ConnectivityState.TRANSIENT_FAILURE]: 0 + }; + this.subchannels = []; + } + + updateAddressList(addressList: string[], lbConfig: LoadBalancingConfig | null): void { + this.resetSubchannelList(); + this.subchannels = addressList.map(address => this.channelControlHelper.createSubchannel(address, {})); + for (const subchannel of this.subchannels) { + const subchannelState = subchannel.getConnectivityState(); + this.subchannelStateCounts[subchannelState] += 1; + if (subchannelState === ConnectivityState.IDLE || subchannelState === ConnectivityState.TRANSIENT_FAILURE) { + subchannel.startConnecting(); + } + } + this.calculateAndUpdateState(); + } + + exitIdle(): void { + for (const subchannel of this.subchannels) { + subchannel.startConnecting(); + } + } + resetBackoff(): void { + /* The pick first load balancer does not have a connection backoff, so this + * does nothing */ + } + destroy(): void { + this.resetSubchannelList(); + } + getTypeName(): string { + return TYPE_NAME; + } + replaceChannelControlHelper(channelControlHelper: ChannelControlHelper): void { + this.channelControlHelper = channelControlHelper; + } +} + +export function setup() { + registerLoadBalancerType(TYPE_NAME, RoundRobinLoadBalancer); +} diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index a74deea7..69114521 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -21,6 +21,7 @@ import { ConnectivityState } from './channel'; import { Picker } from './picker'; import { LoadBalancingConfig } from './load-balancing-config'; import * as load_balancer_pick_first from './load-balancer-pick-first'; +import * as load_balancer_round_robin from './load-balancer-round-robin'; /** * A collection of functions associated with a channel that a load balancer @@ -128,4 +129,5 @@ export function isLoadBalancerNameRegistered(typeName: string): boolean { export function registerAll() { load_balancer_pick_first.setup(); + load_balancer_round_robin.setup(); } diff --git a/packages/grpc-js/src/service-config.ts b/packages/grpc-js/src/service-config.ts index d9b60812..fc8cda04 100644 --- a/packages/grpc-js/src/service-config.ts +++ b/packages/grpc-js/src/service-config.ts @@ -124,7 +124,7 @@ function validateMethodConfig(obj: any): MethodConfig { return result; } -function validateServiceConfig(obj: any): ServiceConfig { +export function validateServiceConfig(obj: any): ServiceConfig { const result: ServiceConfig = { loadBalancingConfig: [], methodConfig: [],