Add subchannel interface

This commit is contained in:
Michael Lumish 2022-01-11 11:07:19 -08:00
parent 81ef5e33c4
commit 4b3c26382b
9 changed files with 109 additions and 24 deletions

View File

@ -49,6 +49,7 @@ import { Filter } from './filter';
import { ConnectivityState } from './connectivity-state'; import { ConnectivityState } from './connectivity-state';
import { ChannelInfo, ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, SubchannelRef, unregisterChannelzRef } from './channelz'; import { ChannelInfo, ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, SubchannelRef, unregisterChannelzRef } from './channelz';
import { Subchannel } from './subchannel';
/** /**
* See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
@ -451,7 +452,7 @@ export class ChannelImplementation implements Channel {
if (subchannelState === ConnectivityState.READY) { if (subchannelState === ConnectivityState.READY) {
try { try {
const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream)); const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream));
pickResult.subchannel!.startCallStream( pickResult.subchannel?.getRealSubchannel().startCallStream(
finalMetadata, finalMetadata,
callStream, callStream,
[...dynamicFilters, ...pickExtraFilters] [...dynamicFilters, ...pickExtraFilters]

View File

@ -34,3 +34,4 @@ export { Call as CallStream } from './call-stream';
export { Filter, BaseFilter, FilterFactory } from './filter'; export { Filter, BaseFilter, FilterFactory } from './filter';
export { FilterStackFactory } from './filter-stack'; export { FilterStackFactory } from './filter-stack';
export { registerAdminService } from './admin'; export { registerAdminService } from './admin';
export { SubchannelInterface, BaseSubchannelWrapper, ConnectivityStateListener } from './subchannel-interface'

View File

@ -21,12 +21,12 @@ import {
LoadBalancingConfig, LoadBalancingConfig,
createLoadBalancer, createLoadBalancer,
} from './load-balancer'; } from './load-balancer';
import { Subchannel } from './subchannel';
import { SubchannelAddress } from './subchannel-address'; import { SubchannelAddress } from './subchannel-address';
import { ChannelOptions } from './channel-options'; import { ChannelOptions } from './channel-options';
import { ConnectivityState } from './connectivity-state'; import { ConnectivityState } from './connectivity-state';
import { Picker } from './picker'; import { Picker } from './picker';
import { ChannelRef, SubchannelRef } from './channelz'; import { ChannelRef, SubchannelRef } from './channelz';
import { SubchannelInterface } from './subchannel-interface';
const TYPE_NAME = 'child_load_balancer_helper'; const TYPE_NAME = 'child_load_balancer_helper';
@ -40,7 +40,7 @@ export class ChildLoadBalancerHandler implements LoadBalancer {
createSubchannel( createSubchannel(
subchannelAddress: SubchannelAddress, subchannelAddress: SubchannelAddress,
subchannelArgs: ChannelOptions subchannelArgs: ChannelOptions
): Subchannel { ): SubchannelInterface {
return this.parent.channelControlHelper.createSubchannel( return this.parent.channelControlHelper.createSubchannel(
subchannelAddress, subchannelAddress,
subchannelArgs subchannelArgs

View File

@ -31,13 +31,13 @@ import {
PickResultType, PickResultType,
UnavailablePicker, UnavailablePicker,
} from './picker'; } from './picker';
import { Subchannel, ConnectivityStateListener } from './subchannel';
import { import {
SubchannelAddress, SubchannelAddress,
subchannelAddressToString, subchannelAddressToString,
} from './subchannel-address'; } from './subchannel-address';
import * as logging from './logging'; import * as logging from './logging';
import { LogVerbosity } from './constants'; import { LogVerbosity } from './constants';
import { SubchannelInterface, ConnectivityStateListener } from './subchannel-interface';
const TRACER_NAME = 'pick_first'; const TRACER_NAME = 'pick_first';
@ -77,7 +77,7 @@ export class PickFirstLoadBalancingConfig implements LoadBalancingConfig {
* picked subchannel. * picked subchannel.
*/ */
class PickFirstPicker implements Picker { class PickFirstPicker implements Picker {
constructor(private subchannel: Subchannel) {} constructor(private subchannel: SubchannelInterface) {}
pick(pickArgs: PickArgs): CompletePickResult { pick(pickArgs: PickArgs): CompletePickResult {
return { return {
@ -107,7 +107,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
* The list of subchannels this load balancer is currently attempting to * The list of subchannels this load balancer is currently attempting to
* connect to. * connect to.
*/ */
private subchannels: Subchannel[] = []; private subchannels: SubchannelInterface[] = [];
/** /**
* The current connectivity state of the load balancer. * The current connectivity state of the load balancer.
*/ */
@ -124,7 +124,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
* and only if the load balancer's current state is READY. In that case, * and only if the load balancer's current state is READY. In that case,
* the subchannel's current state is also READY. * the subchannel's current state is also READY.
*/ */
private currentPick: Subchannel | null = null; private currentPick: SubchannelInterface | null = null;
/** /**
* Listener callback attached to each subchannel in the `subchannels` list * Listener callback attached to each subchannel in the `subchannels` list
* while establishing a connection. * while establishing a connection.
@ -157,7 +157,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
[ConnectivityState.TRANSIENT_FAILURE]: 0, [ConnectivityState.TRANSIENT_FAILURE]: 0,
}; };
this.subchannelStateListener = ( this.subchannelStateListener = (
subchannel: Subchannel, subchannel: SubchannelInterface,
previousState: ConnectivityState, previousState: ConnectivityState,
newState: ConnectivityState newState: ConnectivityState
) => { ) => {
@ -219,7 +219,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
} }
}; };
this.pickedSubchannelStateListener = ( this.pickedSubchannelStateListener = (
subchannel: Subchannel, subchannel: SubchannelInterface,
previousState: ConnectivityState, previousState: ConnectivityState,
newState: ConnectivityState newState: ConnectivityState
) => { ) => {
@ -310,7 +310,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
}, CONNECTION_DELAY_INTERVAL_MS); }, CONNECTION_DELAY_INTERVAL_MS);
} }
private pickSubchannel(subchannel: Subchannel) { private pickSubchannel(subchannel: SubchannelInterface) {
trace('Pick subchannel with address ' + subchannel.getAddress()); trace('Pick subchannel with address ' + subchannel.getAddress());
if (this.currentPick !== null) { if (this.currentPick !== null) {
this.currentPick.unref(); this.currentPick.unref();

View File

@ -30,13 +30,13 @@ import {
PickResultType, PickResultType,
UnavailablePicker, UnavailablePicker,
} from './picker'; } from './picker';
import { Subchannel, ConnectivityStateListener } from './subchannel';
import { import {
SubchannelAddress, SubchannelAddress,
subchannelAddressToString, subchannelAddressToString,
} from './subchannel-address'; } from './subchannel-address';
import * as logging from './logging'; import * as logging from './logging';
import { LogVerbosity } from './constants'; import { LogVerbosity } from './constants';
import { ConnectivityStateListener, SubchannelInterface } from './subchannel-interface';
const TRACER_NAME = 'round_robin'; const TRACER_NAME = 'round_robin';
@ -67,7 +67,7 @@ class RoundRobinLoadBalancingConfig implements LoadBalancingConfig {
class RoundRobinPicker implements Picker { class RoundRobinPicker implements Picker {
constructor( constructor(
private readonly subchannelList: Subchannel[], private readonly subchannelList: SubchannelInterface[],
private nextIndex = 0 private nextIndex = 0
) {} ) {}
@ -88,7 +88,7 @@ class RoundRobinPicker implements Picker {
* balancer implementation to preserve this part of the picker state if * balancer implementation to preserve this part of the picker state if
* possible when a subchannel connects or disconnects. * possible when a subchannel connects or disconnects.
*/ */
peekNextSubchannel(): Subchannel { peekNextSubchannel(): SubchannelInterface {
return this.subchannelList[this.nextIndex]; return this.subchannelList[this.nextIndex];
} }
} }
@ -102,7 +102,7 @@ interface ConnectivityStateCounts {
} }
export class RoundRobinLoadBalancer implements LoadBalancer { export class RoundRobinLoadBalancer implements LoadBalancer {
private subchannels: Subchannel[] = []; private subchannels: SubchannelInterface[] = [];
private currentState: ConnectivityState = ConnectivityState.IDLE; private currentState: ConnectivityState = ConnectivityState.IDLE;
@ -121,7 +121,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
[ConnectivityState.TRANSIENT_FAILURE]: 0, [ConnectivityState.TRANSIENT_FAILURE]: 0,
}; };
this.subchannelStateListener = ( this.subchannelStateListener = (
subchannel: Subchannel, subchannel: SubchannelInterface,
previousState: ConnectivityState, previousState: ConnectivityState,
newState: ConnectivityState newState: ConnectivityState
) => { ) => {

View File

@ -21,6 +21,7 @@ import { SubchannelAddress } from './subchannel-address';
import { ConnectivityState } from './connectivity-state'; import { ConnectivityState } from './connectivity-state';
import { Picker } from './picker'; import { Picker } from './picker';
import { ChannelRef, SubchannelRef } from './channelz'; import { ChannelRef, SubchannelRef } from './channelz';
import { SubchannelInterface } from './subchannel-interface';
/** /**
* A collection of functions associated with a channel that a load balancer * A collection of functions associated with a channel that a load balancer
@ -35,7 +36,7 @@ export interface ChannelControlHelper {
createSubchannel( createSubchannel(
subchannelAddress: SubchannelAddress, subchannelAddress: SubchannelAddress,
subchannelArgs: ChannelOptions subchannelArgs: ChannelOptions
): Subchannel; ): SubchannelInterface;
/** /**
* Passes a new subchannel picker up to the channel. This is called if either * Passes a new subchannel picker up to the channel. This is called if either
* the connectivity state changes or if a different picker is needed for any * the connectivity state changes or if a different picker is needed for any

View File

@ -21,6 +21,7 @@ import { Metadata } from './metadata';
import { Status } from './constants'; import { Status } from './constants';
import { LoadBalancer } from './load-balancer'; import { LoadBalancer } from './load-balancer';
import { FilterFactory, Filter } from './filter'; import { FilterFactory, Filter } from './filter';
import { SubchannelInterface } from './subchannel-interface';
export enum PickResultType { export enum PickResultType {
COMPLETE, COMPLETE,
@ -36,7 +37,7 @@ export interface PickResult {
* `pickResultType` is COMPLETE. If null, indicates that the call should be * `pickResultType` is COMPLETE. If null, indicates that the call should be
* dropped. * dropped.
*/ */
subchannel: Subchannel | null; subchannel: SubchannelInterface | null;
/** /**
* The status object to end the call with. Populated if and only if * The status object to end the call with. Populated if and only if
* `pickResultType` is TRANSIENT_FAILURE. * `pickResultType` is TRANSIENT_FAILURE.
@ -53,7 +54,7 @@ export interface PickResult {
export interface CompletePickResult extends PickResult { export interface CompletePickResult extends PickResult {
pickResultType: PickResultType.COMPLETE; pickResultType: PickResultType.COMPLETE;
subchannel: Subchannel | null; subchannel: SubchannelInterface | null;
status: null; status: null;
extraFilterFactories: FilterFactory<Filter>[]; extraFilterFactories: FilterFactory<Filter>[];
onCallStarted: (() => void) | null; onCallStarted: (() => void) | null;

View File

@ -0,0 +1,82 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { SubchannelRef } from "./channelz";
import { ConnectivityState } from "./connectivity-state";
import { Subchannel } from "./subchannel";
export type ConnectivityStateListener = (
subchannel: SubchannelInterface,
previousState: ConnectivityState,
newState: ConnectivityState
) => void;
/**
* This is an interface for load balancing policies to use to interact with
* subchannels. This allows load balancing policies to wrap and unwrap
* subchannels.
*
* Any load balancing policy that wraps subchannels must unwrap the subchannel
* in the picker, so that other load balancing policies consistently have
* access to their own wrapper objects.
*/
export interface SubchannelInterface {
getConnectivityState(): ConnectivityState;
addConnectivityStateListener(listener: ConnectivityStateListener): void;
removeConnectivityStateListener(listener: ConnectivityStateListener): void;
startConnecting(): void;
getAddress(): string;
ref(): void;
unref(): void;
getChannelzRef(): SubchannelRef;
/**
* If this is a wrapper, return the wrapped subchannel, otherwise return this
*/
getRealSubchannel(): Subchannel;
}
export abstract class BaseSubchannelWrapper implements SubchannelInterface {
constructor(private child: SubchannelInterface) {}
getConnectivityState(): ConnectivityState {
return this.child.getConnectivityState();
}
addConnectivityStateListener(listener: ConnectivityStateListener): void {
this.child.addConnectivityStateListener(listener);
}
removeConnectivityStateListener(listener: ConnectivityStateListener): void {
this.child.removeConnectivityStateListener(listener);
}
startConnecting(): void {
this.child.startConnecting();
}
getAddress(): string {
return this.child.getAddress();
}
ref(): void {
this.child.ref();
}
unref(): void {
this.child.unref();
}
getChannelzRef(): SubchannelRef {
return this.child.getChannelzRef();
}
getRealSubchannel(): Subchannel {
return this.child.getRealSubchannel();
}
}

View File

@ -37,6 +37,7 @@ import {
subchannelAddressToString, subchannelAddressToString,
} from './subchannel-address'; } from './subchannel-address';
import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, SocketInfo, SocketRef, unregisterChannelzRef, registerChannelzSocket, TlsInfo } from './channelz'; import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, SocketInfo, SocketRef, unregisterChannelzRef, registerChannelzSocket, TlsInfo } from './channelz';
import { ConnectivityStateListener } from './subchannel-interface';
const clientVersion = require('../../package.json').version; const clientVersion = require('../../package.json').version;
@ -54,12 +55,6 @@ const BACKOFF_JITTER = 0.2;
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31); const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
const KEEPALIVE_TIMEOUT_MS = 20000; const KEEPALIVE_TIMEOUT_MS = 20000;
export type ConnectivityStateListener = (
subchannel: Subchannel,
previousState: ConnectivityState,
newState: ConnectivityState
) => void;
export interface SubchannelCallStatsTracker { export interface SubchannelCallStatsTracker {
addMessageSent(): void; addMessageSent(): void;
addMessageReceived(): void; addMessageReceived(): void;
@ -949,4 +944,8 @@ export class Subchannel {
getChannelzRef(): SubchannelRef { getChannelzRef(): SubchannelRef {
return this.channelzRef; return this.channelzRef;
} }
getRealSubchannel(): this {
return this;
}
} }