Merge 1.9.x into master

This commit is contained in:
Michael Lumish 2024-01-17 14:28:31 -08:00
commit 3915f579f2
30 changed files with 546 additions and 163 deletions

View File

@ -42,7 +42,6 @@ can be set.
- `subchannel_internals` - Traces HTTP/2 session state. Includes per-call logs.
- `channel_stacktrace` - Traces channel construction events with stack traces.
- `keepalive` - Traces gRPC keepalive pings
- `index` - Traces module loading
- `outlier_detection` - Traces outlier detection events
The following tracers are added by the `@grpc/grpc-js-xds` library:

View File

@ -1,6 +1,6 @@
# @grpc/grpc-js xDS plugin
This package provides support for the `xds://` URL scheme to the `@grpc/grpc-js` library. The latest version of this package is compatible with `@grpc/grpc-js` version 1.2.x.
This package provides support for the `xds://` URL scheme to the `@grpc/grpc-js` library. The latest version of this package is compatible with `@grpc/grpc-js` version 1.9.x.
## Installation
@ -30,3 +30,5 @@ const client = new MyServiceClient('xds:///example.com:123');
- [Client Status Discovery Service](https://github.com/grpc/proposal/blob/master/A40-csds-support.md)
- [Outlier Detection](https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md)
- [xDS Retry Support](https://github.com/grpc/proposal/blob/master/A44-xds-retry.md)
- [xDS Aggregate and Logical DNS Clusters](https://github.com/grpc/proposal/blob/master/A37-xds-aggregate-and-logical-dns-clusters.md)'
- [xDS Federation](https://github.com/grpc/proposal/blob/master/A47-xds-federation.md) (Currently experimental, enabled by environment variable `GRPC_EXPERIMENTAL_XDS_FEDERATION`)

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js-xds",
"version": "1.8.2",
"version": "1.9.2",
"description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.",
"main": "build/src/index.js",
"scripts": {
@ -9,7 +9,7 @@
"clean": "gts clean",
"compile": "tsc",
"fix": "gts fix",
"prepare": "npm run compile",
"prepare": "npm run generate-types && npm run compile",
"pretest": "npm run compile",
"posttest": "npm run check",
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs deps/envoy-api/ deps/xds/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib @grpc/grpc-js envoy/service/discovery/v3/ads.proto envoy/service/load_stats/v3/lrs.proto envoy/config/listener/v3/listener.proto envoy/config/route/v3/route.proto envoy/config/cluster/v3/cluster.proto envoy/config/endpoint/v3/endpoint.proto envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto udpa/type/v1/typed_struct.proto xds/type/v3/typed_struct.proto envoy/extensions/filters/http/fault/v3/fault.proto envoy/service/status/v3/csds.proto envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto envoy/extensions/load_balancing_policies/pick_first/v3/pick_first.proto",
@ -50,7 +50,7 @@
"xxhash-wasm": "^1.0.2"
},
"peerDependencies": {
"@grpc/grpc-js": "~1.8.0"
"@grpc/grpc-js": "~1.9.0"
},
"engines": {
"node": ">=10.10.0"

View File

@ -677,9 +677,11 @@ class XdsResolver implements Resolver {
destroy() {
if (this.listenerResourceName) {
ListenerResourceType.cancelWatch(this.xdsClient, this.listenerResourceName, this.ldsWatcher);
this.isLdsWatcherActive = false;
}
if (this.latestRouteConfigName) {
RouteConfigurationResourceType.cancelWatch(this.xdsClient, this.latestRouteConfigName, this.rdsWatcher);
this.latestRouteConfigName = null;
}
}

View File

@ -924,8 +924,8 @@ class XdsSingleServerClient {
}
onLrsStreamReceivedMessage() {
this.adsBackoff.stop();
this.adsBackoff.reset();
this.lrsBackoff.stop();
this.lrsBackoff.reset();
}
handleLrsStreamEnd() {

View File

@ -15,7 +15,7 @@
*
*/
import { credentials, loadPackageDefinition, ServiceError } from "@grpc/grpc-js";
import { ChannelOptions, credentials, loadPackageDefinition, ServiceError } from "@grpc/grpc-js";
import { loadSync } from "@grpc/proto-loader";
import { ProtoGrpcType } from "./generated/echo";
import { EchoTestServiceClient } from "./generated/grpc/testing/EchoTestService";
@ -44,14 +44,14 @@ export class XdsTestClient {
private client: EchoTestServiceClient;
private callInterval: NodeJS.Timer;
constructor(target: string, bootstrapInfo: string) {
this.client = new loadedProtos.grpc.testing.EchoTestService(target, credentials.createInsecure(), {[BOOTSTRAP_CONFIG_KEY]: bootstrapInfo});
constructor(target: string, bootstrapInfo: string, options?: ChannelOptions) {
this.client = new loadedProtos.grpc.testing.EchoTestService(target, credentials.createInsecure(), {...options, [BOOTSTRAP_CONFIG_KEY]: bootstrapInfo});
this.callInterval = setInterval(() => {}, 0);
clearInterval(this.callInterval);
}
static createFromServer(targetName: string, xdsServer: XdsServer) {
return new XdsTestClient(`xds:///${targetName}`, xdsServer.getBootstrapInfoString());
static createFromServer(targetName: string, xdsServer: XdsServer, options?: ChannelOptions) {
return new XdsTestClient(`xds:///${targetName}`, xdsServer.getBootstrapInfoString(), options);
}
startCalls(interval: number) {
@ -98,4 +98,8 @@ export class XdsTestClient {
}
sendInner(count, callback);
}
getConnectivityState() {
return this.client.getChannel().getConnectivityState(false);
}
}

View File

@ -22,6 +22,7 @@ import { XdsServer } from "./xds-server";
import { register } from "../src";
import assert = require("assert");
import { connectivityState } from "@grpc/grpc-js";
register();
@ -60,4 +61,34 @@ describe('core xDS functionality', () => {
}, reason => done(reason));
}, reason => done(reason));
});
it('should be able to enter and exit idle', function(done) {
this.timeout(5000);
const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}]);
const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]);
routeGroup.startAllBackends().then(() => {
xdsServer.setEdsResource(cluster.getEndpointConfig());
xdsServer.setCdsResource(cluster.getClusterConfig());
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
xdsServer.setLdsResource(routeGroup.getListener());
xdsServer.addResponseListener((typeUrl, responseState) => {
if (responseState.state === 'NACKED') {
client.stopCalls();
assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`);
}
})
client = XdsTestClient.createFromServer('listener1', xdsServer, {
'grpc.client_idle_timeout_ms': 1000,
});
client.sendOneCall(error => {
assert.ifError(error);
assert.strictEqual(client.getConnectivityState(), connectivityState.READY);
setTimeout(() => {
assert.strictEqual(client.getConnectivityState(), connectivityState.IDLE);
client.sendOneCall(error => {
done(error);
})
}, 1100);
});
}, reason => done(reason));
});
});

View File

@ -65,6 +65,7 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`.
- `grpc.service_config_disable_resolution`
- `grpc.client_idle_timeout_ms`
- `grpc-node.max_session_memory`
- `grpc-node.tls_enable_trace`
- `channelOverride`
- `channelFactoryOverride`

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.8.21",
"version": "1.9.14",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
@ -65,7 +65,7 @@
"generate-test-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --include-dirs test/fixtures/ -O test/generated/ --grpcLib ../../src/index test_service.proto"
},
"dependencies": {
"@grpc/proto-loader": "^0.7.0",
"@grpc/proto-loader": "^0.7.8",
"@types/node": ">=12.12.47"
},
"files": [

View File

@ -78,6 +78,11 @@ export class BackoffTimeout {
* running is true.
*/
private startTime: Date = new Date();
/**
* The approximate time that the currently running timer will end. Only valid
* if running is true.
*/
private endTime: Date = new Date();
constructor(private callback: () => void, options?: BackoffOptions) {
if (options) {
@ -100,6 +105,8 @@ export class BackoffTimeout {
}
private runTimer(delay: number) {
this.endTime = this.startTime;
this.endTime.setMilliseconds(this.endTime.getMilliseconds() + this.nextDelay);
clearTimeout(this.timerId);
this.timerId = setTimeout(() => {
this.callback();
@ -178,4 +185,12 @@ export class BackoffTimeout {
this.hasRef = false;
this.timerId.unref?.();
}
/**
* Get the approximate timestamp of when the timer will fire. Only valid if
* this.isRunning() is true.
*/
getEndTime() {
return this.endTime;
}
}

View File

@ -30,6 +30,7 @@ import {
import { ChannelOptions } from './channel-options';
import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser';
import { URL } from 'url';
import { DEFAULT_PORT } from './resolver-dns';
const TRACER_NAME = 'proxy';
@ -189,12 +190,19 @@ export function getProxiedConnection(
if (parsedTarget === null) {
return Promise.resolve<ProxyConnectionResult>({});
}
const splitHostPost = splitHostPort(parsedTarget.path);
if (splitHostPost === null) {
return Promise.resolve<ProxyConnectionResult>({});
}
const hostPort = `${splitHostPost.host}:${
splitHostPost.port ?? DEFAULT_PORT
}`;
const options: http.RequestOptions = {
method: 'CONNECT',
path: parsedTarget.path,
path: hostPort,
};
const headers: http.OutgoingHttpHeaders = {
Host: parsedTarget.path,
Host: hostPort,
};
// Connect to the subchannel address as a proxy
if (isTcpSubchannelAddress(address)) {

View File

@ -276,14 +276,7 @@ import * as load_balancer_outlier_detection from './load-balancer-outlier-detect
import * as channelz from './channelz';
import { Deadline } from './deadline';
const clientVersion = require('../../package.json').version;
(() => {
logging.trace(
LogVerbosity.DEBUG,
'index',
'Loading @grpc/grpc-js version ' + clientVersion
);
resolver_dns.setup();
resolver_uds.setup();
resolver_ip.setup();

View File

@ -184,6 +184,7 @@ export class InternalChannel {
private callCount = 0;
private idleTimer: NodeJS.Timeout | null = null;
private readonly idleTimeoutMs: number;
private lastActivityTimestamp: Date;
// Channelz info
private readonly channelzEnabled: boolean = true;
@ -305,7 +306,9 @@ export class InternalChannel {
this.currentPicker = picker;
const queueCopy = this.pickQueue.slice();
this.pickQueue = [];
if (queueCopy.length > 0) {
this.callRefTimerUnref();
}
for (const call of queueCopy) {
call.doPick();
}
@ -358,11 +361,12 @@ export class InternalChannel {
process.nextTick(() => {
const localQueue = this.configSelectionQueue;
this.configSelectionQueue = [];
if (localQueue.length > 0) {
this.callRefTimerUnref();
}
for (const call of localQueue) {
call.getConfig();
}
this.configSelectionQueue = [];
});
},
status => {
@ -389,7 +393,9 @@ export class InternalChannel {
}
const localQueue = this.configSelectionQueue;
this.configSelectionQueue = [];
if (localQueue.length > 0) {
this.callRefTimerUnref();
}
for (const call of localQueue) {
call.reportResolverError(status);
}
@ -413,6 +419,7 @@ export class InternalChannel {
'Channel constructed \n' +
error.stack?.substring(error.stack.indexOf('\n') + 1)
);
this.lastActivityTimestamp = new Date();
}
private getChannelzInfo(): ChannelInfo {
@ -486,9 +493,7 @@ export class InternalChannel {
if (this.channelzEnabled) {
this.channelzTrace.addTrace(
'CT_INFO',
ConnectivityState[this.connectivityState] +
' -> ' +
ConnectivityState[newState]
'Connectivity state change to ' + ConnectivityState[newState]
);
}
this.connectivityState = newState;
@ -562,20 +567,45 @@ export class InternalChannel {
this.resolvingLoadBalancer.destroy();
this.updateState(ConnectivityState.IDLE);
this.currentPicker = new QueuePicker(this.resolvingLoadBalancer);
if (this.idleTimer) {
clearTimeout(this.idleTimer);
this.idleTimer = null;
}
}
private maybeStartIdleTimer() {
if (this.callCount === 0) {
private startIdleTimeout(timeoutMs: number) {
this.idleTimer = setTimeout(() => {
if (this.callCount > 0) {
/* If there is currently a call, the channel will not go idle for a
* period of at least idleTimeoutMs, so check again after that time.
*/
this.startIdleTimeout(this.idleTimeoutMs);
return;
}
const now = new Date();
const timeSinceLastActivity = now.valueOf() - this.lastActivityTimestamp.valueOf();
if (timeSinceLastActivity >= this.idleTimeoutMs) {
this.trace(
'Idle timer triggered after ' +
this.idleTimeoutMs +
'ms of inactivity'
);
this.enterIdle();
}, this.idleTimeoutMs);
} else {
/* Whenever the timer fires with the latest activity being too recent,
* set the timer again for the time when the time since the last
* activity is equal to the timeout. This should result in the timer
* firing no more than once every idleTimeoutMs/2 on average. */
this.startIdleTimeout(this.idleTimeoutMs - timeSinceLastActivity);
}
}, timeoutMs);
this.idleTimer.unref?.();
}
private maybeStartIdleTimer() {
if (this.connectivityState !== ConnectivityState.SHUTDOWN && !this.idleTimer) {
this.startIdleTimeout(this.idleTimeoutMs);
}
}
private onCallStart() {
@ -583,10 +613,6 @@ export class InternalChannel {
this.callTracker.addCallStarted();
}
this.callCount += 1;
if (this.idleTimer) {
clearTimeout(this.idleTimer);
this.idleTimer = null;
}
}
private onCallEnd(status: StatusObject) {
@ -598,6 +624,7 @@ export class InternalChannel {
}
}
this.callCount -= 1;
this.lastActivityTimestamp = new Date();
this.maybeStartIdleTimer();
}
@ -717,6 +744,9 @@ export class InternalChannel {
this.resolvingLoadBalancer.destroy();
this.updateState(ConnectivityState.SHUTDOWN);
clearInterval(this.callRefTimer);
if (this.idleTimer) {
clearTimeout(this.idleTimer);
}
if (this.channelzEnabled) {
unregisterChannelzRef(this.channelzRef);
}
@ -732,6 +762,7 @@ export class InternalChannel {
const connectivityState = this.connectivityState;
if (tryToConnect) {
this.resolvingLoadBalancer.exitIdle();
this.lastActivityTimestamp = new Date();
this.maybeStartIdleTimer();
}
return connectivityState;

View File

@ -194,9 +194,11 @@ export class PickFirstLoadBalancer implements LoadBalancer {
private subchannelStateListener: ConnectivityStateListener = (
subchannel,
previousState,
newState
newState,
keepaliveTime,
errorMessage
) => {
this.onSubchannelStateUpdate(subchannel, previousState, newState);
this.onSubchannelStateUpdate(subchannel, previousState, newState, errorMessage);
};
private pickedSubchannelHealthListener: HealthListener = () =>
@ -218,6 +220,20 @@ export class PickFirstLoadBalancer implements LoadBalancer {
private reportHealthStatus: boolean;
/**
* Indicates whether we called channelControlHelper.requestReresolution since
* the last call to updateAddressList
*/
private requestedResolutionSinceLastUpdate = false;
/**
* The most recent error reported by any subchannel as it transitioned to
* TRANSIENT_FAILURE.
*/
private lastError: string | null = null;
private latestAddressList: SubchannelAddress[] | null = null;
/**
* Load balancer that attempts to connect to each backend in the address list
* in order, and picks the first one that connects, using it for every
@ -259,7 +275,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
if (this.stickyTransientFailureMode) {
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker()
new UnavailablePicker({details: `No connection established. Last error: ${this.lastError}`})
);
} else {
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
@ -267,15 +283,28 @@ export class PickFirstLoadBalancer implements LoadBalancer {
}
}
private maybeEnterStickyTransientFailureMode() {
if (this.stickyTransientFailureMode) {
return;
private requestReresolution() {
this.requestedResolutionSinceLastUpdate = true;
this.channelControlHelper.requestReresolution();
}
private maybeEnterStickyTransientFailureMode() {
if (!this.allChildrenHaveReportedTF()) {
return;
}
if (!this.requestedResolutionSinceLastUpdate) {
/* Each time we get an update we reset each subchannel's
* hasReportedTransientFailure flag, so the next time we get to this
* point after that, each subchannel has reported TRANSIENT_FAILURE
* at least once since then. That is the trigger for requesting
* reresolution, whether or not the LB policy is already in sticky TF
* mode. */
this.requestReresolution();
}
if (this.stickyTransientFailureMode) {
return;
}
this.stickyTransientFailureMode = true;
this.channelControlHelper.requestReresolution();
for (const { subchannel } of this.children) {
subchannel.startConnecting();
}
@ -305,13 +334,14 @@ export class PickFirstLoadBalancer implements LoadBalancer {
private onSubchannelStateUpdate(
subchannel: SubchannelInterface,
previousState: ConnectivityState,
newState: ConnectivityState
newState: ConnectivityState,
errorMessage?: string
) {
if (this.currentPick?.realSubchannelEquals(subchannel)) {
if (newState !== ConnectivityState.READY) {
this.removeCurrentPick();
this.calculateAndReportNewState();
this.channelControlHelper.requestReresolution();
this.requestReresolution();
}
return;
}
@ -322,6 +352,9 @@ export class PickFirstLoadBalancer implements LoadBalancer {
}
if (newState === ConnectivityState.TRANSIENT_FAILURE) {
child.hasReportedTransientFailure = true;
if (errorMessage) {
this.lastError = errorMessage;
}
this.maybeEnterStickyTransientFailureMode();
if (index === this.currentSubchannelIndex) {
this.startNextSubchannelConnecting(index + 1);
@ -335,7 +368,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
private startNextSubchannelConnecting(startIndex: number) {
clearTimeout(this.connectionDelayTimeout);
if (this.triedAllSubchannels || this.stickyTransientFailureMode) {
if (this.triedAllSubchannels) {
return;
}
for (const [index, child] of this.children.entries()) {
@ -408,7 +441,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
private resetSubchannelList() {
for (const child of this.children) {
if (child.subchannel !== this.currentPick) {
if (!(this.currentPick && child.subchannel.realSubchannelEquals(this.currentPick))) {
/* The connectivity state listener is the same whether the subchannel
* is in the list of children or it is the currentPick, so if it is in
* both, removing it here would cause problems. In particular, that
@ -429,28 +462,10 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.currentSubchannelIndex = 0;
this.children = [];
this.triedAllSubchannels = false;
this.requestedResolutionSinceLastUpdate = false;
}
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig
): void {
if (!(lbConfig instanceof PickFirstLoadBalancingConfig)) {
return;
}
/* Previously, an update would be discarded if it was identical to the
* previous update, to minimize churn. Now the DNS resolver is
* rate-limited, so that is less of a concern. */
if (lbConfig.getShuffleAddressList()) {
endpointList = shuffled(endpointList);
}
const rawAddressList = ([] as SubchannelAddress[]).concat(
...endpointList.map(endpoint => endpoint.addresses)
);
if (rawAddressList.length === 0) {
throw new Error('No addresses in endpoint list passed to pick_first');
}
const addressList = interleaveAddressFamilies(rawAddressList);
private connectToAddressList(addressList: SubchannelAddress[]) {
const newChildrenList = addressList.map(address => ({
subchannel: this.channelControlHelper.createSubchannel(address, {}),
hasReportedTransientFailure: false,
@ -483,10 +498,34 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.calculateAndReportNewState();
}
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig
): void {
if (!(lbConfig instanceof PickFirstLoadBalancingConfig)) {
return;
}
/* Previously, an update would be discarded if it was identical to the
* previous update, to minimize churn. Now the DNS resolver is
* rate-limited, so that is less of a concern. */
if (lbConfig.getShuffleAddressList()) {
endpointList = shuffled(endpointList);
}
const rawAddressList = ([] as SubchannelAddress[]).concat(
...endpointList.map(endpoint => endpoint.addresses)
);
if (rawAddressList.length === 0) {
throw new Error('No addresses in endpoint list passed to pick_first');
}
const addressList = interleaveAddressFamilies(rawAddressList);
this.latestAddressList = addressList;
this.connectToAddressList(addressList);
}
exitIdle() {
/* The pick_first LB policy is only in the IDLE state if it has no
* addresses to try to connect to and it has no picked subchannel.
* In that case, there is no meaningful action that can be taken here. */
if (this.currentState === ConnectivityState.IDLE && this.latestAddressList) {
this.connectToAddressList(this.latestAddressList);
}
}
resetBackoff() {

View File

@ -100,6 +100,8 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
private childChannelControlHelper: ChannelControlHelper;
private lastError: string | null = null;
constructor(
private readonly channelControlHelper: ChannelControlHelper,
private readonly options: ChannelOptions
@ -154,7 +156,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
) {
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker()
new UnavailablePicker({details: `No connection established. Last error: ${this.lastError}`})
);
} else {
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));

View File

@ -141,6 +141,13 @@ export class LoadBalancingCall implements Call {
.generateMetadata({ service_url: this.serviceUrl })
.then(
credsMetadata => {
/* If this call was cancelled (e.g. by the deadline) before
* metadata generation finished, we shouldn't do anything with
* it. */
if (this.ended) {
this.trace('Credentials metadata generation finished after call ended');
return;
}
finalMetadata.merge(credsMetadata);
if (finalMetadata.get('authorization').length > 1) {
this.outputStatus(

View File

@ -16,6 +16,9 @@
*/
import { LogVerbosity } from './constants';
import { pid } from 'process';
const clientVersion = require('../../package.json').version;
const DEFAULT_LOGGER: Partial<Console> = {
error: (message?: any, ...optionalParams: any[]) => {
@ -109,7 +112,7 @@ export function trace(
text: string
): void {
if (isTracerEnabled(tracer)) {
log(severity, new Date().toISOString() + ' | ' + tracer + ' | ' + text);
log(severity, new Date().toISOString() + ' | v' + clientVersion + ' ' + pid + ' | ' + tracer + ' | ' + text);
}
}

View File

@ -43,7 +43,7 @@ function trace(text: string): void {
/**
* The default TCP port to connect to if not explicitly specified in the target.
*/
const DEFAULT_PORT = 443;
export const DEFAULT_PORT = 443;
const DEFAULT_MIN_TIME_BETWEEN_RESOLUTIONS_MS = 30_000;
@ -75,6 +75,7 @@ class DnsResolver implements Resolver {
private nextResolutionTimer: NodeJS.Timeout;
private isNextResolutionTimerRunning = false;
private isServiceConfigEnabled = true;
private returnedIpResult = false;
constructor(
private target: GrpcUri,
private listener: ResolverListener,
@ -143,6 +144,7 @@ class DnsResolver implements Resolver {
*/
private startResolution() {
if (this.ipResult !== null) {
if (!this.returnedIpResult) {
trace('Returning IP address for target ' + uriToString(this.target));
setImmediate(() => {
this.listener.onSuccessfulResolution(
@ -153,8 +155,11 @@ class DnsResolver implements Resolver {
{}
);
});
this.returnedIpResult = true;
}
this.backoff.stop();
this.backoff.reset();
this.stopNextResolutionTimer();
return;
}
if (this.dnsHostname === null) {
@ -316,9 +321,9 @@ class DnsResolver implements Resolver {
private startResolutionWithBackoff() {
if (this.pendingLookupPromise === null) {
this.continueResolving = false;
this.startResolution();
this.backoff.runOnce();
this.startNextResolutionTimer();
this.startResolution();
}
}
@ -329,6 +334,11 @@ class DnsResolver implements Resolver {
* fires. Otherwise, start resolving immediately. */
if (this.pendingLookupPromise === null) {
if (this.isNextResolutionTimerRunning || this.backoff.isRunning()) {
if (this.isNextResolutionTimerRunning) {
trace('resolution update delayed by "min time between resolutions" rate limit');
} else {
trace('resolution update delayed by backoff timer until ' + this.backoff.getEndTime().toISOString());
}
this.continueResolving = true;
} else {
this.startResolutionWithBackoff();
@ -351,6 +361,7 @@ class DnsResolver implements Resolver {
this.latestLookupResult = null;
this.latestServiceConfig = null;
this.latestServiceConfigError = null;
this.returnedIpResult = false;
}
/**

View File

@ -41,6 +41,7 @@ const DEFAULT_PORT = 443;
class IpResolver implements Resolver {
private endpoints: Endpoint[] = [];
private error: StatusObject | null = null;
private hasReturnedResult = false;
constructor(
target: GrpcUri,
private listener: ResolverListener,
@ -87,6 +88,8 @@ class IpResolver implements Resolver {
trace('Parsed ' + target.scheme + ' address list ' + addresses);
}
updateResolution(): void {
if (!this.hasReturnedResult) {
this.hasReturnedResult = true;
process.nextTick(() => {
if (this.error) {
this.listener.onError(this.error);
@ -101,8 +104,9 @@ class IpResolver implements Resolver {
}
});
}
}
destroy(): void {
// This resolver owns no resources, so we do nothing here.
this.hasReturnedResult = false;
}
static getDefaultAuthority(target: GrpcUri): string {

View File

@ -20,6 +20,7 @@ import { GrpcUri } from './uri-parser';
import { ChannelOptions } from './channel-options';
class UdsResolver implements Resolver {
private hasReturnedResult = false;
private endpoints: Endpoint[] = [];
constructor(
target: GrpcUri,
@ -35,6 +36,8 @@ class UdsResolver implements Resolver {
this.endpoints = [{ addresses: [{ path }] }];
}
updateResolution(): void {
if (!this.hasReturnedResult) {
this.hasReturnedResult = true;
process.nextTick(
this.listener.onSuccessfulResolution,
this.endpoints,
@ -44,6 +47,7 @@ class UdsResolver implements Resolver {
{}
);
}
}
destroy() {
// This resolver owns no resources, so we do nothing here.

View File

@ -21,7 +21,11 @@ import {
TypedLoadBalancingConfig,
selectLbConfigFromList,
} from './load-balancer';
import { ServiceConfig, validateServiceConfig } from './service-config';
import {
MethodConfig,
ServiceConfig,
validateServiceConfig,
} from './service-config';
import { ConnectivityState } from './connectivity-state';
import { ConfigSelector, createResolver, Resolver } from './resolver';
import { ServiceError } from './call';
@ -43,6 +47,59 @@ function trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
type NameMatchLevel = 'EMPTY' | 'SERVICE' | 'SERVICE_AND_METHOD';
/**
* Name match levels in order from most to least specific. This is the order in
* which searches will be performed.
*/
const NAME_MATCH_LEVEL_ORDER: NameMatchLevel[] = [
'SERVICE_AND_METHOD',
'SERVICE',
'EMPTY',
];
function hasMatchingName(
service: string,
method: string,
methodConfig: MethodConfig,
matchLevel: NameMatchLevel
): boolean {
for (const name of methodConfig.name) {
switch (matchLevel) {
case 'EMPTY':
if (!name.service && !name.method) {
return true;
}
break;
case 'SERVICE':
if (name.service === service && !name.method) {
return true;
}
break;
case 'SERVICE_AND_METHOD':
if (name.service === service && name.method === method) {
return true;
}
}
}
return false;
}
function findMatchingConfig(
service: string,
method: string,
methodConfigs: MethodConfig[],
matchLevel: NameMatchLevel
): MethodConfig | null {
for (const config of methodConfigs) {
if (hasMatchingName(service, method, config, matchLevel)) {
return config;
}
}
return null;
}
function getDefaultConfigSelector(
serviceConfig: ServiceConfig | null
): ConfigSelector {
@ -54,14 +111,22 @@ function getDefaultConfigSelector(
const service = splitName[0] ?? '';
const method = splitName[1] ?? '';
if (serviceConfig && serviceConfig.methodConfig) {
for (const methodConfig of serviceConfig.methodConfig) {
for (const name of methodConfig.name) {
if (
name.service === service &&
(name.method === undefined || name.method === method)
) {
/* Check for the following in order, and return the first method
* config that matches:
* 1. A name that exactly matches the service and method
* 2. A name with no method set that matches the service
* 3. An empty name
*/
for (const matchLevel of NAME_MATCH_LEVEL_ORDER) {
const matchingConfig = findMatchingConfig(
service,
method,
serviceConfig.methodConfig,
matchLevel
);
if (matchingConfig) {
return {
methodConfig: methodConfig,
methodConfig: matchingConfig,
pickInformation: {},
status: Status.OK,
dynamicFilterFactories: [],
@ -69,7 +134,6 @@ function getDefaultConfigSelector(
}
}
}
}
return {
methodConfig: { name: [] },
pickInformation: {},
@ -159,6 +223,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
* In that case, the backoff timer callback will call
* updateResolution */
if (this.backoffTimeout.isRunning()) {
trace('requestReresolution delayed by backoff timer until ' + this.backoffTimeout.getEndTime().toISOString());
this.continueResolving = true;
} else {
this.updateResolution();
@ -186,6 +251,8 @@ export class ResolvingLoadBalancer implements LoadBalancer {
configSelector: ConfigSelector | null,
attributes: { [key: string]: unknown }
) => {
this.backoffTimeout.stop();
this.backoffTimeout.reset();
let workingServiceConfig: ServiceConfig | null = null;
/* This first group of conditionals implements the algorithm described
* in https://github.com/grpc/proposal/blob/master/A21-service-config-error-handling.md

View File

@ -558,7 +558,7 @@ export class Http2ServerCallStream<
return metadata;
}
receiveUnaryMessage(encoding: string): Promise<RequestType | void> {
receiveUnaryMessage(encoding: string): Promise<RequestType> {
return new Promise((resolve, reject) => {
const { stream } = this;

View File

@ -233,6 +233,7 @@ export class Server {
* it is called twice, as it did previously.
*/
private started = false;
private shutdown = false;
private options: ChannelOptions;
private serverAddressString = 'null';
@ -699,6 +700,9 @@ export class Server {
creds: ServerCredentials,
callback: (error: Error | null, port: number) => void
): void {
if (this.shutdown) {
throw new Error('bindAsync called after shutdown');
}
if (typeof port !== 'string') {
throw new TypeError('port must be a string');
}
@ -920,6 +924,8 @@ export class Server {
if (this.channelzEnabled) {
unregisterChannelzRef(this.channelzRef);
}
this.shutdown = true;
}
register<RequestType, ResponseType>(
@ -983,6 +989,7 @@ export class Server {
wrappedCallback();
}
}
this.shutdown = true;
for (const server of this.http2Servers.keys()) {
pendingChecks++;

View File

@ -31,7 +31,7 @@ import { Status } from './constants';
import { Duration } from './duration';
export interface MethodConfigName {
service: string;
service?: string;
method?: string;
}
@ -95,20 +95,36 @@ const DURATION_REGEX = /^\d+(\.\d{1,9})?s$/;
const CLIENT_LANGUAGE_STRING = 'node';
function validateName(obj: any): MethodConfigName {
if (!('service' in obj) || typeof obj.service !== 'string') {
throw new Error('Invalid method config name: invalid service');
// In this context, and unset field and '' are considered the same
if ('service' in obj && obj.service !== '') {
if (typeof obj.service !== 'string') {
throw new Error(
`Invalid method config name: invalid service: expected type string, got ${typeof obj.service}`
);
}
const result: MethodConfigName = {
if ('method' in obj && obj.method !== '') {
if (typeof obj.method !== 'string') {
throw new Error(
`Invalid method config name: invalid method: expected type string, got ${typeof obj.service}`
);
}
return {
service: obj.service,
method: obj.method,
};
} else {
return {
service: obj.service,
};
if ('method' in obj) {
if (typeof obj.method === 'string') {
result.method = obj.method;
}
} else {
throw new Error('Invalid method config name: invalid method');
if ('method' in obj && obj.method !== undefined) {
throw new Error(
`Invalid method config name: method set with empty or unset service`
);
}
return {};
}
return result;
}
function validateRetryPolicy(obj: any): RetryPolicy {

View File

@ -501,9 +501,14 @@ export class Http2SubchannelCall implements SubchannelCall {
sendMessageWithContext(context: MessageContext, message: Buffer) {
this.trace('write() called with message of length ' + message.length);
const cb: WriteCallback = (error?: Error | null) => {
/* nextTick here ensures that no stream action can be taken in the call
* stack of the write callback, in order to hopefully work around
* https://github.com/nodejs/node/issues/49147 */
process.nextTick(() => {
let code: Status = Status.UNAVAILABLE;
if (
(error as NodeJS.ErrnoException)?.code === 'ERR_STREAM_WRITE_AFTER_END'
(error as NodeJS.ErrnoException)?.code ===
'ERR_STREAM_WRITE_AFTER_END'
) {
code = Status.INTERNAL;
}
@ -511,6 +516,7 @@ export class Http2SubchannelCall implements SubchannelCall {
this.cancelWithStatus(code, `Write error: ${error.message}`);
}
context.callback?.();
});
};
this.trace('sending data chunk of length ' + message.length);
this.callEventTracker.addMessageSent();

View File

@ -23,7 +23,8 @@ export type ConnectivityStateListener = (
subchannel: SubchannelInterface,
previousState: ConnectivityState,
newState: ConnectivityState,
keepaliveTime: number
keepaliveTime: number,
errorMessage?: string
) => void;
export type HealthListener = (healthy: boolean) => void;

View File

@ -245,12 +245,17 @@ export class Subchannel {
);
}
});
} else {
/* If we can't transition from CONNECTING to READY here, we will
* not be using this transport, so release its resources. */
transport.shutdown();
}
},
error => {
this.transitionToState(
[ConnectivityState.CONNECTING],
ConnectivityState.TRANSIENT_FAILURE
ConnectivityState.TRANSIENT_FAILURE,
`${error}`
);
}
);
@ -265,7 +270,8 @@ export class Subchannel {
*/
private transitionToState(
oldStates: ConnectivityState[],
newState: ConnectivityState
newState: ConnectivityState,
errorMessage?: string
): boolean {
if (oldStates.indexOf(this.connectivityState) === -1) {
return false;
@ -278,9 +284,7 @@ export class Subchannel {
if (this.channelzEnabled) {
this.channelzTrace.addTrace(
'CT_INFO',
ConnectivityState[this.connectivityState] +
' -> ' +
ConnectivityState[newState]
'Connectivity state change to ' + ConnectivityState[newState]
);
}
const previousState = this.connectivityState;
@ -320,7 +324,7 @@ export class Subchannel {
throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
}
for (const listener of this.stateListeners) {
listener(this, previousState, newState, this.keepaliveTime);
listener(this, previousState, newState, this.keepaliveTime, errorMessage);
}
return true;
}

View File

@ -194,17 +194,18 @@ class Http2Transport implements Transport {
});
session.once(
'goaway',
(errorCode: number, lastStreamID: number, opaqueData: Buffer) => {
(errorCode: number, lastStreamID: number, opaqueData?: Buffer) => {
let tooManyPings = false;
/* See the last paragraph of
* https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#basic-keepalive */
if (
errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM &&
opaqueData &&
opaqueData.equals(tooManyPingsData)
) {
tooManyPings = true;
}
this.trace('connection closed by GOAWAY with code ' + errorCode);
this.trace('connection closed by GOAWAY with code ' + errorCode + ' and data ' + opaqueData?.toString());
this.reportDisconnectToOwner(tooManyPings);
}
);
@ -426,6 +427,10 @@ class Http2Transport implements Transport {
try {
this.session!.ping(
(err: Error | null, duration: number, payload: Buffer) => {
if (err) {
this.keepaliveTrace('Ping failed with error ' + err.message);
this.handleDisconnect();
}
this.keepaliveTrace('Received ping response');
this.clearKeepaliveTimeout();
this.maybeStartKeepalivePingTimer();
@ -737,6 +742,7 @@ export class Http2SubchannelConnector implements SubchannelConnector {
connectionOptions
);
this.session = session;
let errorMessage = 'Failed to connect';
session.unref();
session.once('connect', () => {
session.removeAllListeners();
@ -745,10 +751,14 @@ export class Http2SubchannelConnector implements SubchannelConnector {
});
session.once('close', () => {
this.session = null;
reject();
// Leave time for error event to happen before rejecting
setImmediate(() => {
reject(`${errorMessage} (${new Date().toISOString()})`);
});
});
session.once('error', error => {
this.trace('connection failed with error ' + (error as Error).message);
errorMessage = (error as Error).message;
this.trace('connection failed with error ' + errorMessage);
});
});
}

View File

@ -38,7 +38,11 @@ function updateStateCallBackForExpectedStateSequence(
) {
const actualStateSequence: ConnectivityState[] = [];
let lastPicker: Picker | null = null;
let finished = false;
return (connectivityState: ConnectivityState, picker: Picker) => {
if (finished) {
return;
}
// Ignore duplicate state transitions
if (
connectivityState === actualStateSequence[actualStateSequence.length - 1]
@ -57,6 +61,7 @@ function updateStateCallBackForExpectedStateSequence(
if (
expectedStateSequence[actualStateSequence.length] !== connectivityState
) {
finished = true;
done(
new Error(
`Unexpected state ${
@ -66,10 +71,12 @@ function updateStateCallBackForExpectedStateSequence(
)}]`
)
);
return;
}
actualStateSequence.push(connectivityState);
lastPicker = picker;
if (actualStateSequence.length === expectedStateSequence.length) {
finished = true;
done();
}
};
@ -537,6 +544,115 @@ describe('pick_first load balancing policy', () => {
});
});
});
it('Should request reresolution every time each child reports TF', done => {
let reresolutionRequestCount = 0;
const targetReresolutionRequestCount = 3;
const currentStartState = ConnectivityState.IDLE;
const channelControlHelper = createChildChannelControlHelper(
baseChannelControlHelper,
{
createSubchannel: (subchannelAddress, subchannelArgs) => {
const subchannel = new MockSubchannel(
subchannelAddressToString(subchannelAddress),
currentStartState
);
subchannels.push(subchannel);
return subchannel;
},
updateState: updateStateCallBackForExpectedStateSequence(
[ConnectivityState.CONNECTING, ConnectivityState.TRANSIENT_FAILURE],
err => setImmediate(() => {
assert.strictEqual(reresolutionRequestCount, targetReresolutionRequestCount);
done(err);
})
),
requestReresolution: () => {
reresolutionRequestCount += 1;
}
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 1 }]}], config);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
process.nextTick(() => {
pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 2 }]}], config);
process.nextTick(() => {
subchannels[1].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
process.nextTick(() => {
pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 3 }]}], config);
process.nextTick(() => {
subchannels[2].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
});
});
});
});
});
});
it('Should request reresolution if the new subchannels are already in TF', done => {
let reresolutionRequestCount = 0;
const targetReresolutionRequestCount = 3;
const currentStartState = ConnectivityState.TRANSIENT_FAILURE;
const channelControlHelper = createChildChannelControlHelper(
baseChannelControlHelper,
{
createSubchannel: (subchannelAddress, subchannelArgs) => {
const subchannel = new MockSubchannel(
subchannelAddressToString(subchannelAddress),
currentStartState
);
subchannels.push(subchannel);
return subchannel;
},
updateState: updateStateCallBackForExpectedStateSequence(
[ConnectivityState.TRANSIENT_FAILURE],
err => setImmediate(() => {
assert.strictEqual(reresolutionRequestCount, targetReresolutionRequestCount);
done(err);
})
),
requestReresolution: () => {
reresolutionRequestCount += 1;
}
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 1 }]}], config);
process.nextTick(() => {
pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 2 }]}], config);
process.nextTick(() => {
pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 2 }]}], config);
});
});
});
it('Should reconnect to the same address list if exitIdle is called', done => {
const currentStartState = ConnectivityState.READY;
const channelControlHelper = createChildChannelControlHelper(
baseChannelControlHelper,
{
createSubchannel: (subchannelAddress, subchannelArgs) => {
const subchannel = new MockSubchannel(
subchannelAddressToString(subchannelAddress),
currentStartState
);
subchannels.push(subchannel);
return subchannel;
},
updateState: updateStateCallBackForExpectedStateSequence(
[ConnectivityState.READY, ConnectivityState.IDLE, ConnectivityState.READY],
done
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 1 }]}], config);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.IDLE);
process.nextTick(() => {
pickFirst.exitIdle();
});
});
});
describe('Address list randomization', () => {
const shuffleConfig = new PickFirstLoadBalancingConfig(true);
it('Should pick different subchannels after multiple updates', done => {

View File

@ -211,7 +211,7 @@ describe('Server', () => {
client!.makeUnaryRequest('/math.Math/Div', x => x, x => x, Buffer.from('abc'), {deadline: deadline}, (callError2, result) => {
assert(callError2);
// DEADLINE_EXCEEDED means that the server is unreachable
assert.strictEqual(callError2.code, grpc.status.DEADLINE_EXCEEDED);
assert(callError2.code === grpc.status.DEADLINE_EXCEEDED || callError2.code === grpc.status.UNAVAILABLE);
done();
});
});
@ -228,7 +228,7 @@ describe('Server', () => {
});
});
describe.only('drain', () => {
describe('drain', () => {
let client: ServiceClient;
let portNumber: number;
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');