Merge pull request #2363 from murgatroid99/grpc-js_channel_keepalive_throttling

grpc-js: Propagate keepalive throttling throughout channel
This commit is contained in:
Michael Lumish 2023-02-15 15:44:48 -08:00 committed by GitHub
commit 72b99a1413
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 81 additions and 18 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.8.8",
"version": "1.8.9",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

View File

@ -51,6 +51,7 @@ import { ResolvingCall } from './resolving-call';
import { getNextCallNumber } from './call-number';
import { restrictControlPlaneStatusCode } from './control-plane-status';
import { MessageBufferTracker, RetryingCall, RetryThrottler } from './retrying-call';
import { BaseSubchannelWrapper, ConnectivityStateListener, SubchannelInterface } from './subchannel-interface';
/**
* See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
@ -84,6 +85,33 @@ const RETRY_THROTTLER_MAP: Map<string, RetryThrottler> = new Map();
const DEFAULT_RETRY_BUFFER_SIZE_BYTES = 1<<24; // 16 MB
const DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES = 1<<20; // 1 MB
class ChannelSubchannelWrapper extends BaseSubchannelWrapper implements SubchannelInterface {
private stateListeners: ConnectivityStateListener[] = [];
private refCount = 0;
constructor(childSubchannel: SubchannelInterface, private channel: InternalChannel) {
super(childSubchannel);
childSubchannel.addConnectivityStateListener((subchannel, previousState, newState, keepaliveTime) => {
channel.throttleKeepalive(keepaliveTime);
for (const listener of this.stateListeners) {
listener(this, previousState, newState, keepaliveTime);
}
});
}
ref(): void {
this.child.ref();
this.refCount += 1;
}
unref(): void {
this.child.unref();
this.refCount -= 1;
if (this.refCount <= 0) {
this.channel.removeWrappedSubchannel(this);
}
}
}
export class InternalChannel {
private resolvingLoadBalancer: ResolvingLoadBalancer;
@ -116,8 +144,10 @@ export class InternalChannel {
* configSelector becomes set or the channel state becomes anything other
* than TRANSIENT_FAILURE.
*/
private currentResolutionError: StatusObject | null = null;
private retryBufferTracker: MessageBufferTracker;
private currentResolutionError: StatusObject | null = null;
private retryBufferTracker: MessageBufferTracker;
private keepaliveTime: number;
private wrappedSubchannels: Set<ChannelSubchannelWrapper> = new Set();
// Channelz info
private readonly channelzEnabled: boolean = true;
@ -190,6 +220,7 @@ export class InternalChannel {
options['grpc.retry_buffer_size'] ?? DEFAULT_RETRY_BUFFER_SIZE_BYTES,
options['grpc.per_rpc_retry_buffer_size'] ?? DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES
);
this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;
const channelControlHelper: ChannelControlHelper = {
createSubchannel: (
subchannelAddress: SubchannelAddress,
@ -201,10 +232,13 @@ export class InternalChannel {
Object.assign({}, this.options, subchannelArgs),
this.credentials
);
subchannel.throttleKeepalive(this.keepaliveTime);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
}
return subchannel;
const wrappedSubchannel = new ChannelSubchannelWrapper(subchannel, this);
this.wrappedSubchannels.add(wrappedSubchannel);
return wrappedSubchannel;
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.currentPicker = picker;
@ -369,6 +403,19 @@ export class InternalChannel {
}
}
throttleKeepalive(newKeepaliveTime: number) {
if (newKeepaliveTime > this.keepaliveTime) {
this.keepaliveTime = newKeepaliveTime;
for (const wrappedSubchannel of this.wrappedSubchannels) {
wrappedSubchannel.throttleKeepalive(newKeepaliveTime);
}
}
}
removeWrappedSubchannel(wrappedSubchannel: ChannelSubchannelWrapper) {
this.wrappedSubchannels.delete(wrappedSubchannel);
}
doPick(metadata: Metadata, extraPickInfo: {[key: string]: string}) {
return this.currentPicker.pick({metadata: metadata, extraPickInfo: extraPickInfo});
}

View File

@ -205,11 +205,11 @@ class OutlierDetectionSubchannelWrapper extends BaseSubchannelWrapper implements
constructor(childSubchannel: SubchannelInterface, private mapEntry?: MapEntry) {
super(childSubchannel);
this.childSubchannelState = childSubchannel.getConnectivityState();
childSubchannel.addConnectivityStateListener((subchannel, previousState, newState) => {
childSubchannel.addConnectivityStateListener((subchannel, previousState, newState, keepaliveTime) => {
this.childSubchannelState = newState;
if (!this.ejected) {
for (const listener of this.stateListeners) {
listener(this, previousState, newState);
listener(this, previousState, newState, keepaliveTime);
}
}
});
@ -265,14 +265,14 @@ class OutlierDetectionSubchannelWrapper extends BaseSubchannelWrapper implements
eject() {
this.ejected = true;
for (const listener of this.stateListeners) {
listener(this, this.childSubchannelState, ConnectivityState.TRANSIENT_FAILURE);
listener(this, this.childSubchannelState, ConnectivityState.TRANSIENT_FAILURE, -1);
}
}
uneject() {
this.ejected = false;
for (const listener of this.stateListeners) {
listener(this, ConnectivityState.TRANSIENT_FAILURE, this.childSubchannelState);
listener(this, ConnectivityState.TRANSIENT_FAILURE, this.childSubchannelState, -1);
}
}

View File

@ -22,7 +22,8 @@ import { Subchannel } from "./subchannel";
export type ConnectivityStateListener = (
subchannel: SubchannelInterface,
previousState: ConnectivityState,
newState: ConnectivityState
newState: ConnectivityState,
keepaliveTime: number
) => void;
/**
@ -40,6 +41,7 @@ export interface SubchannelInterface {
removeConnectivityStateListener(listener: ConnectivityStateListener): void;
startConnecting(): void;
getAddress(): string;
throttleKeepalive(newKeepaliveTime: number): void;
ref(): void;
unref(): void;
getChannelzRef(): SubchannelRef;
@ -67,6 +69,9 @@ export abstract class BaseSubchannelWrapper implements SubchannelInterface {
getAddress(): string {
return this.child.getAddress();
}
throttleKeepalive(newKeepaliveTime: number): void {
this.child.throttleKeepalive(newKeepaliveTime);
}
ref(): void {
this.child.ref();
}

View File

@ -64,7 +64,7 @@ export class Subchannel {
private backoffTimeout: BackoffTimeout;
private keepaliveTimeMultiplier = 1;
private keepaliveTime: number;
/**
* Tracks channels and subchannel pools with references to this subchannel
*/
@ -111,6 +111,8 @@ export class Subchannel {
}, backoffOptions);
this.subchannelAddressString = subchannelAddressToString(subchannelAddress);
this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;
if (options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}
@ -169,7 +171,7 @@ export class Subchannel {
private startConnectingInternal() {
let options = this.options;
if (options['grpc.keepalive_time_ms']) {
const adjustedKeepaliveTime = Math.min(options['grpc.keepalive_time_ms'] * this.keepaliveTimeMultiplier, KEEPALIVE_MAX_TIME_MS);
const adjustedKeepaliveTime = Math.min(this.keepaliveTime, KEEPALIVE_MAX_TIME_MS);
options = {...options, 'grpc.keepalive_time_ms': adjustedKeepaliveTime};
}
this.connector.connect(this.subchannelAddress, this.credentials, options).then(
@ -181,14 +183,14 @@ export class Subchannel {
}
transport.addDisconnectListener((tooManyPings) => {
this.transitionToState([ConnectivityState.READY], ConnectivityState.IDLE);
if (tooManyPings) {
this.keepaliveTimeMultiplier *= 2;
if (tooManyPings && this.keepaliveTime > 0) {
this.keepaliveTime *= 2;
logging.log(
LogVerbosity.ERROR,
`Connection to ${uriToString(this.channelTarget)} at ${
this.subchannelAddressString
} rejected by server because of excess pings. Increasing ping interval multiplier to ${
this.keepaliveTimeMultiplier
} rejected by server because of excess pings. Increasing ping interval to ${
this.keepaliveTime
} ms`
);
}
@ -262,7 +264,7 @@ export class Subchannel {
/* We use a shallow copy of the stateListeners array in case a listener
* is removed during this iteration */
for (const listener of [...this.stateListeners]) {
listener(this, previousState, newState);
listener(this, previousState, newState, this.keepaliveTime);
}
return true;
}
@ -403,4 +405,10 @@ export class Subchannel {
getRealSubchannel(): this {
return this;
}
throttleKeepalive(newKeepaliveTime: number) {
if (newKeepaliveTime > this.keepaliveTime) {
this.keepaliveTime = newKeepaliveTime;
}
}
}

View File

@ -77,7 +77,7 @@ class Http2Transport implements Transport {
/**
* The amount of time in between sending pings
*/
private keepaliveTimeMs: number = KEEPALIVE_MAX_TIME_MS;
private keepaliveTimeMs: number = -1;
/**
* The amount of time to wait for an acknowledgement after sending a ping
*/
@ -133,7 +133,7 @@ class Http2Transport implements Transport {
]
.filter((e) => e)
.join(' '); // remove falsey values first
if ('grpc.keepalive_time_ms' in options) {
this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!;
}
@ -334,6 +334,9 @@ class Http2Transport implements Transport {
}
private startKeepalivePings() {
if (this.keepaliveTimeMs < 0) {
return;
}
this.keepaliveIntervalId = setInterval(() => {
this.sendPing();
}, this.keepaliveTimeMs);