Merge remote-tracking branch 'upstream/@grpc/grpc-js@1.5.x' into v1.5.x_upmerge

This commit is contained in:
Michael Lumish 2022-03-24 10:11:52 -07:00
commit 312fb9b737
18 changed files with 277 additions and 37 deletions

View File

@ -36,7 +36,10 @@ can be set.
- `server` - Traces high-level server events
- `server_call` - Traces server handling of individual requests
- `subchannel` - Traces subchannel connectivity state and errors
- `subchannel_refcount` - Traces subchannel refcount changes
- `subchannel_refcount` - Traces subchannel refcount changes. Includes per-call logs.
- `subchannel_flowctrl` - Traces HTTP/2 flow control. Includes per-call logs.
- `subchannel_internals` - Traces HTTP/2 session state. Includes per-call logs.
- `channel_stacktrace` - Traces channel construction events with stack traces.
The following tracers are added by the `@grpc/grpc-js-xds` library:
- `cds_balancer` - Traces the CDS load balancing policy

View File

@ -26,4 +26,5 @@ const client = new MyServiceClient('xds:///example.com:123');
- [xDS v3 API](https://github.com/grpc/proposal/blob/master/A30-xds-v3.md)
- [xDS Timeouts](https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md)
- [xDS Circuit Breaking](https://github.com/grpc/proposal/blob/master/A32-xds-circuit-breaking.md)
- [xDS Client-Side Fault Injection](https://github.com/grpc/proposal/blob/master/A33-Fault-Injection.md)
- [xDS Client-Side Fault Injection](https://github.com/grpc/proposal/blob/master/A33-Fault-Injection.md)
- [Client Status Discovery Service](https://github.com/grpc/proposal/blob/master/A40-csds-support.md)

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js-xds",
"version": "1.4.0",
"version": "1.5.2",
"description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.",
"main": "build/src/index.js",
"scripts": {
@ -47,7 +47,7 @@
"re2-wasm": "^1.0.1"
},
"peerDependencies": {
"@grpc/grpc-js": "~1.4.0"
"@grpc/grpc-js": "~1.5.0"
},
"engines": {
"node": ">=10.10.0"
@ -55,6 +55,7 @@
"files": [
"src/**/*.ts",
"build/src/**/*.{js,d.ts,js.map}",
"deps/envoy-api/envoy/admin/v3/**/*.proto",
"deps/envoy-api/envoy/api/v2/**/*.proto",
"deps/envoy-api/envoy/config/**/*.proto",
"deps/envoy-api/envoy/service/**/*.proto",

View File

@ -56,6 +56,8 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`.
- `grpc.max_send_message_length`
- `grpc.max_receive_message_length`
- `grpc.enable_http_proxy`
- `grpc.default_compression_algorithm`
- `grpc.enable_channelz`
- `grpc-node.max_session_memory`
- `channelOverride`
- `channelFactoryOverride`

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.4.6",
"version": "1.5.9",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

View File

@ -312,6 +312,13 @@ export class Http2CallStream implements Call {
const filteredStatus = this.filterStack.receiveTrailers(
this.finalStatus!
);
this.trace(
'ended with status: code=' +
filteredStatus.code +
' details="' +
filteredStatus.details +
'"'
);
this.statusWatchers.forEach(watcher => watcher(filteredStatus));
/* We delay the actual action of bubbling up the status to insulate the
* cleanup code in this class from any errors that may be thrown in the
@ -346,13 +353,6 @@ export class Http2CallStream implements Call {
/* If the status is OK and a new status comes in (e.g. from a
* deserialization failure), that new status takes priority */
if (this.finalStatus === null || this.finalStatus.code === Status.OK) {
this.trace(
'ended with status: code=' +
status.code +
' details="' +
status.details +
'"'
);
this.finalStatus = status;
this.maybeOutputStatus();
}
@ -795,6 +795,10 @@ export class Http2CallStream implements Call {
this.filterStack.push(extraFilters);
}
getCallNumber() {
return this.callNumber;
}
startRead() {
/* If the stream has ended with an error, we should not emit any more
* messages and we should communicate that the stream has ended */

View File

@ -36,6 +36,9 @@ export interface ChannelOptions {
'grpc.max_send_message_length'?: number;
'grpc.max_receive_message_length'?: number;
'grpc.enable_http_proxy'?: number;
/* http_connect_target and http_connect_creds are used for passing data
* around internally, and should not be documented as public-facing options
*/
'grpc.http_connect_target'?: string;
'grpc.http_connect_creds'?: string;
'grpc.default_compression_algorithm'?: CompressionAlgorithms;

View File

@ -336,6 +336,8 @@ export class ChannelImplementation implements Channel {
new CompressionFilterFactory(this, this.options),
]);
this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2));
const error = new Error();
trace(LogVerbosity.DEBUG, 'channel_stacktrace', '(' + this.channelzRef.id + ') ' + 'Channel constructed \n' + error.stack?.substring(error.stack.indexOf('\n')+1));
}
private getChannelzInfo(): ChannelInfo {
@ -405,11 +407,16 @@ export class ChannelImplementation implements Channel {
metadata: callMetadata,
extraPickInfo: callConfig.pickInformation,
});
const subchannelString = pickResult.subchannel ?
'(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() :
'' + pickResult.subchannel;
this.trace(
'Pick result: ' +
'Pick result for call [' +
callStream.getCallNumber() +
']: ' +
PickResultType[pickResult.pickResultType] +
' subchannel: ' +
pickResult.subchannel?.getAddress() +
subchannelString +
' status: ' +
pickResult.status?.code +
' ' +
@ -434,7 +441,7 @@ export class ChannelImplementation implements Channel {
log(
LogVerbosity.ERROR,
'Error: COMPLETE pick result subchannel ' +
pickResult.subchannel!.getAddress() +
subchannelString +
' has state ' +
ConnectivityState[pickResult.subchannel!.getConnectivityState()]
);
@ -462,9 +469,9 @@ export class ChannelImplementation implements Channel {
callConfig.onCommitted?.();
pickResult.onCallStarted?.();
} catch (error) {
if (
(error as NodeJS.ErrnoException).code ===
'ERR_HTTP2_GOAWAY_SESSION'
const errorCode = (error as NodeJS.ErrnoException).code;
if (errorCode === 'ERR_HTTP2_GOAWAY_SESSION' ||
errorCode === 'ERR_HTTP2_INVALID_SESSION'
) {
/* An error here indicates that something went wrong with
* the picked subchannel's http2 stream right before we
@ -481,7 +488,7 @@ export class ChannelImplementation implements Channel {
* tryPick */
this.trace(
'Failed to start call on picked subchannel ' +
pickResult.subchannel!.getAddress() +
subchannelString +
' with error ' +
(error as Error).message +
'. Retrying pick',
@ -491,7 +498,7 @@ export class ChannelImplementation implements Channel {
} else {
this.trace(
'Failed to start call on picked subchanel ' +
pickResult.subchannel!.getAddress() +
subchannelString +
' with error ' +
(error as Error).message +
'. Ending call',
@ -510,7 +517,7 @@ export class ChannelImplementation implements Channel {
* block above */
this.trace(
'Picked subchannel ' +
pickResult.subchannel!.getAddress() +
subchannelString +
' has state ' +
ConnectivityState[subchannelState] +
' after metadata filters. Retrying pick',

View File

@ -125,9 +125,9 @@ export class ChildLoadBalancerHandler implements LoadBalancer {
}
exitIdle(): void {
if (this.currentChild) {
this.currentChild.resetBackoff();
this.currentChild.exitIdle();
if (this.pendingChild) {
this.pendingChild.resetBackoff();
this.pendingChild.exitIdle();
}
}
}

View File

@ -449,11 +449,15 @@ export class PickFirstLoadBalancer implements LoadBalancer {
destroy() {
this.resetSubchannelList();
if (this.currentPick !== null) {
this.currentPick.unref();
this.currentPick.removeConnectivityStateListener(
/* Unref can cause a state change, which can cause a change in the value
* of this.currentPick, so we hold a local reference to make sure that
* does not impact this function. */
const currentPick = this.currentPick;
currentPick.unref();
currentPick.removeConnectivityStateListener(
this.pickedSubchannelStateListener
);
this.channelControlHelper.removeChannelzChild(this.currentPick.getChannelzRef());
this.channelControlHelper.removeChannelzChild(currentPick.getChannelzRef());
}
}

View File

@ -108,10 +108,12 @@ export function trace(
tracer: string,
text: string
): void {
if (
!disabledTracers.has(tracer) &&
(allEnabled || enabledTracers.has(tracer))
) {
if (isTracerEnabled(tracer)) {
log(severity, new Date().toISOString() + ' | ' + tracer + ' | ' + text);
}
}
export function isTracerEnabled(tracer: string): boolean {
return !disabledTracers.has(tracer) &&
(allEnabled || enabledTracers.has(tracer));
}

View File

@ -32,6 +32,7 @@ import { SubchannelAddress, TcpSubchannelAddress } from './subchannel-address';
import { GrpcUri, uriToString, splitHostPort } from './uri-parser';
import { isIPv6, isIPv4 } from 'net';
import { ChannelOptions } from './channel-options';
import { BackoffOptions, BackoffTimeout } from './backoff-timeout';
const TRACER_NAME = 'dns_resolver';
@ -85,6 +86,8 @@ class DnsResolver implements Resolver {
private latestServiceConfigError: StatusObject | null = null;
private percentage: number;
private defaultResolutionError: StatusObject;
private backoff: BackoffTimeout;
private continueResolving = false;
constructor(
private target: GrpcUri,
private listener: ResolverListener,
@ -119,6 +122,18 @@ class DnsResolver implements Resolver {
details: `Name resolution failed for target ${uriToString(this.target)}`,
metadata: new Metadata(),
};
const backoffOptions: BackoffOptions = {
initialDelay: channelOptions['grpc.initial_reconnect_backoff_ms'],
maxDelay: channelOptions['grpc.max_reconnect_backoff_ms'],
};
this.backoff = new BackoffTimeout(() => {
if (this.continueResolving) {
this.startResolutionWithBackoff();
}
}, backoffOptions);
this.backoff.unref();
}
/**
@ -129,6 +144,7 @@ class DnsResolver implements Resolver {
if (this.ipResult !== null) {
trace('Returning IP address for target ' + uriToString(this.target));
setImmediate(() => {
this.backoff.reset();
this.listener.onSuccessfulResolution(
this.ipResult!,
null,
@ -140,6 +156,7 @@ class DnsResolver implements Resolver {
return;
}
if (this.dnsHostname === null) {
trace('Failed to parse DNS address ' + uriToString(this.target));
setImmediate(() => {
this.listener.onError({
code: Status.UNAVAILABLE,
@ -148,6 +165,7 @@ class DnsResolver implements Resolver {
});
});
} else {
trace('Looking up DNS hostname ' + this.dnsHostname);
/* We clear out latestLookupResult here to ensure that it contains the
* latest result since the last time we started resolving. That way, the
* TXT resolution handler can use it, but only if it finishes second. We
@ -164,6 +182,7 @@ class DnsResolver implements Resolver {
this.pendingLookupPromise.then(
(addressList) => {
this.pendingLookupPromise = null;
this.backoff.reset();
const ip4Addresses: dns.LookupAddress[] = addressList.filter(
(addr) => addr.family === 4
);
@ -263,10 +282,21 @@ class DnsResolver implements Resolver {
}
}
updateResolution() {
trace('Resolution update requested for target ' + uriToString(this.target));
if (this.pendingLookupPromise === null) {
private startResolutionWithBackoff() {
this.startResolution();
this.backoff.runOnce();
}
updateResolution() {
/* If there is a pending lookup, just let it finish. Otherwise, if the
* backoff timer is running, do another lookup when it ends, and if not,
* do another lookup immeidately. */
if (this.pendingLookupPromise === null) {
if (this.backoff.isRunning()) {
this.continueResolving = true;
} else {
this.startResolutionWithBackoff();
}
}
}

View File

@ -371,6 +371,13 @@ export class Server {
creds._getSettings()!
);
http2Server = http2.createSecureServer(secureServerOptions);
http2Server.on('secureConnection', (socket: TLSSocket) => {
/* These errors need to be handled by the user of Http2SecureServer,
* according to https://github.com/nodejs/node/issues/35824 */
socket.on('error', (e: Error) => {
this.trace('An incoming TLS connection closed with error: ' + e.message);
});
});
} else {
http2Server = http2.createServer(serverOptions);
}

View File

@ -42,6 +42,7 @@ import { ConnectivityStateListener } from './subchannel-interface';
const clientVersion = require('../../package.json').version;
const TRACER_NAME = 'subchannel';
const FLOW_CONTROL_TRACER_NAME = 'subchannel_flowctrl';
const MIN_CONNECT_TIMEOUT_MS = 20000;
const INITIAL_BACKOFF_MS = 1000;
@ -319,6 +320,14 @@ export class Subchannel {
logging.trace(LogVerbosity.DEBUG, 'subchannel_refcount', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}
private flowControlTrace(text: string): void {
logging.trace(LogVerbosity.DEBUG, FLOW_CONTROL_TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}
private internalsTrace(text: string): void {
logging.trace(LogVerbosity.DEBUG, 'subchannel_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}
private handleBackoffTimer() {
if (this.continueConnecting) {
this.transitionToState(
@ -550,6 +559,24 @@ export class Subchannel {
(error as Error).message
);
});
if (logging.isTracerEnabled(TRACER_NAME)) {
session.on('remoteSettings', (settings: http2.Settings) => {
this.trace(
'new settings received' +
(this.session !== session ? ' on the old connection' : '') +
': ' +
JSON.stringify(settings)
);
});
session.on('localSettings', (settings: http2.Settings) => {
this.trace(
'local settings acknowledged by remote' +
(this.session !== session ? ' on the old connection' : '') +
': ' +
JSON.stringify(settings)
);
});
}
}
private startConnectingInternal() {
@ -637,9 +664,15 @@ export class Subchannel {
switch (newState) {
case ConnectivityState.READY:
this.stopBackoff();
this.session!.socket.once('close', () => {
for (const listener of this.disconnectListeners) {
listener();
const session = this.session!;
session.socket.once('close', () => {
if (this.session === session) {
this.transitionToState(
[ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE);
for (const listener of this.disconnectListeners) {
listener();
}
}
});
if (this.keepaliveWithoutCalls) {
@ -819,13 +852,26 @@ export class Subchannel {
logging.trace(
LogVerbosity.DEBUG,
'call_stream',
'Starting stream on subchannel ' +
'Starting stream [' + callStream.getCallNumber() + '] on subchannel ' +
'(' + this.channelzRef.id + ') ' +
this.subchannelAddressString +
' with headers\n' +
headersString
);
this.flowControlTrace(
'local window size: ' +
this.session!.state.localWindowSize +
' remote window size: ' +
this.session!.state.remoteWindowSize
);
const streamSession = this.session;
this.internalsTrace(
'session.closed=' +
streamSession!.closed +
' session.destroyed=' +
streamSession!.destroyed +
' session.socket.destroyed=' +
streamSession!.socket.destroyed);
let statsTracker: SubchannelCallStatsTracker;
if (this.channelzEnabled) {
this.callTracker.addCallStarted();

View File

@ -0,0 +1,73 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as assert from 'assert';
import * as path from 'path';
import * as grpc from '../src';
import { sendUnaryData, Server, ServerCredentials, ServerUnaryCall, ServiceClientConstructor, ServiceError } from "../src";
import { loadProtoFile } from './common';
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService = loadProtoFile(protoFile)
.EchoService as ServiceClientConstructor;
describe('Local subchannel pool', () => {
let server: Server;
let serverPort: number;
before(done => {
server = new Server();
server.addService(echoService.service, {
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
callback(null, call.request);
},
});
server.bindAsync(
'localhost:0',
ServerCredentials.createInsecure(),
(err, port) => {
assert.ifError(err);
serverPort = port;
server.start();
done();
}
);
});
after(done => {
server.tryShutdown(done);
});
it('should complete the client lifecycle without error', done => {
const client = new echoService(
`localhost:${serverPort}`,
grpc.credentials.createInsecure(),
{'grpc.use_local_subchannel_pool': 1}
);
client.echo(
{ value: 'test value', value2: 3 },
(error: ServiceError, response: any) => {
assert.ifError(error);
assert.deepStrictEqual(response, { value: 'test value', value2: 3 });
client.close();
done();
}
);
});
});

View File

@ -75,6 +75,8 @@ do
# npm test calls nyc gulp test
npm test || FAILED="true"
./test/distrib/run-distrib-test.sh || FAILED="true"
done
set +ex

View File

@ -0,0 +1,22 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
const grpcJs = require('@grpc/grpc-js');
const grpcJsXds = require('@grpc/grpc-js-xds');
const protoLoader = require('@grpc/proto-loader');

View File

@ -0,0 +1,33 @@
#!/bin/bash
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -ex
cd $(dirname $0)
base=$(pwd)
cd ../../packages/grpc-js
npm pack
cd ../grpc-js-xds
npm pack
cd ../proto-loader
npm pack
cd $base
npm install ../../packages/grpc-js/grpc-grpc-js-*.tgz
npm install ../../packages/grpc-js-xds/grpc-grpc-js-xds-*.tgz
npm install ../../packages/proto-loader/grpc-proto-loader-*.tgz
node ./distrib-test.js