Merge pull request #2067 from murgatroid99/v1.5.x_upmerge

Merge 1.5.x into master
This commit is contained in:
Michael Lumish 2022-03-25 12:53:31 -07:00 committed by GitHub
commit 86aa9b51f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 371 additions and 90 deletions

View File

@ -36,7 +36,10 @@ can be set.
- `server` - Traces high-level server events - `server` - Traces high-level server events
- `server_call` - Traces server handling of individual requests - `server_call` - Traces server handling of individual requests
- `subchannel` - Traces subchannel connectivity state and errors - `subchannel` - Traces subchannel connectivity state and errors
- `subchannel_refcount` - Traces subchannel refcount changes - `subchannel_refcount` - Traces subchannel refcount changes. Includes per-call logs.
- `subchannel_flowctrl` - Traces HTTP/2 flow control. Includes per-call logs.
- `subchannel_internals` - Traces HTTP/2 session state. Includes per-call logs.
- `channel_stacktrace` - Traces channel construction events with stack traces.
The following tracers are added by the `@grpc/grpc-js-xds` library: The following tracers are added by the `@grpc/grpc-js-xds` library:
- `cds_balancer` - Traces the CDS load balancing policy - `cds_balancer` - Traces the CDS load balancing policy

View File

@ -26,4 +26,5 @@ const client = new MyServiceClient('xds:///example.com:123');
- [xDS v3 API](https://github.com/grpc/proposal/blob/master/A30-xds-v3.md) - [xDS v3 API](https://github.com/grpc/proposal/blob/master/A30-xds-v3.md)
- [xDS Timeouts](https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md) - [xDS Timeouts](https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md)
- [xDS Circuit Breaking](https://github.com/grpc/proposal/blob/master/A32-xds-circuit-breaking.md) - [xDS Circuit Breaking](https://github.com/grpc/proposal/blob/master/A32-xds-circuit-breaking.md)
- [xDS Client-Side Fault Injection](https://github.com/grpc/proposal/blob/master/A33-Fault-Injection.md) - [xDS Client-Side Fault Injection](https://github.com/grpc/proposal/blob/master/A33-Fault-Injection.md)
- [Client Status Discovery Service](https://github.com/grpc/proposal/blob/master/A40-csds-support.md)

View File

@ -1,6 +1,6 @@
{ {
"name": "@grpc/grpc-js-xds", "name": "@grpc/grpc-js-xds",
"version": "1.4.0", "version": "1.5.2",
"description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.", "description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.",
"main": "build/src/index.js", "main": "build/src/index.js",
"scripts": { "scripts": {
@ -47,7 +47,7 @@
"re2-wasm": "^1.0.1" "re2-wasm": "^1.0.1"
}, },
"peerDependencies": { "peerDependencies": {
"@grpc/grpc-js": "~1.4.0" "@grpc/grpc-js": "~1.5.0"
}, },
"engines": { "engines": {
"node": ">=10.10.0" "node": ">=10.10.0"
@ -55,6 +55,7 @@
"files": [ "files": [
"src/**/*.ts", "src/**/*.ts",
"build/src/**/*.{js,d.ts,js.map}", "build/src/**/*.{js,d.ts,js.map}",
"deps/envoy-api/envoy/admin/v3/**/*.proto",
"deps/envoy-api/envoy/api/v2/**/*.proto", "deps/envoy-api/envoy/api/v2/**/*.proto",
"deps/envoy-api/envoy/config/**/*.proto", "deps/envoy-api/envoy/config/**/*.proto",
"deps/envoy-api/envoy/service/**/*.proto", "deps/envoy-api/envoy/service/**/*.proto",

View File

@ -56,6 +56,8 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`.
- `grpc.max_send_message_length` - `grpc.max_send_message_length`
- `grpc.max_receive_message_length` - `grpc.max_receive_message_length`
- `grpc.enable_http_proxy` - `grpc.enable_http_proxy`
- `grpc.default_compression_algorithm`
- `grpc.enable_channelz`
- `grpc-node.max_session_memory` - `grpc-node.max_session_memory`
- `channelOverride` - `channelOverride`
- `channelFactoryOverride` - `channelFactoryOverride`

View File

@ -1,6 +1,6 @@
{ {
"name": "@grpc/grpc-js", "name": "@grpc/grpc-js",
"version": "1.4.6", "version": "1.5.10",
"description": "gRPC Library for Node - pure JS implementation", "description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/", "homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

View File

@ -312,6 +312,13 @@ export class Http2CallStream implements Call {
const filteredStatus = this.filterStack.receiveTrailers( const filteredStatus = this.filterStack.receiveTrailers(
this.finalStatus! this.finalStatus!
); );
this.trace(
'ended with status: code=' +
filteredStatus.code +
' details="' +
filteredStatus.details +
'"'
);
this.statusWatchers.forEach(watcher => watcher(filteredStatus)); this.statusWatchers.forEach(watcher => watcher(filteredStatus));
/* We delay the actual action of bubbling up the status to insulate the /* We delay the actual action of bubbling up the status to insulate the
* cleanup code in this class from any errors that may be thrown in the * cleanup code in this class from any errors that may be thrown in the
@ -346,13 +353,6 @@ export class Http2CallStream implements Call {
/* If the status is OK and a new status comes in (e.g. from a /* If the status is OK and a new status comes in (e.g. from a
* deserialization failure), that new status takes priority */ * deserialization failure), that new status takes priority */
if (this.finalStatus === null || this.finalStatus.code === Status.OK) { if (this.finalStatus === null || this.finalStatus.code === Status.OK) {
this.trace(
'ended with status: code=' +
status.code +
' details="' +
status.details +
'"'
);
this.finalStatus = status; this.finalStatus = status;
this.maybeOutputStatus(); this.maybeOutputStatus();
} }
@ -795,6 +795,10 @@ export class Http2CallStream implements Call {
this.filterStack.push(extraFilters); this.filterStack.push(extraFilters);
} }
getCallNumber() {
return this.callNumber;
}
startRead() { startRead() {
/* If the stream has ended with an error, we should not emit any more /* If the stream has ended with an error, we should not emit any more
* messages and we should communicate that the stream has ended */ * messages and we should communicate that the stream has ended */

View File

@ -36,6 +36,9 @@ export interface ChannelOptions {
'grpc.max_send_message_length'?: number; 'grpc.max_send_message_length'?: number;
'grpc.max_receive_message_length'?: number; 'grpc.max_receive_message_length'?: number;
'grpc.enable_http_proxy'?: number; 'grpc.enable_http_proxy'?: number;
/* http_connect_target and http_connect_creds are used for passing data
* around internally, and should not be documented as public-facing options
*/
'grpc.http_connect_target'?: string; 'grpc.http_connect_target'?: string;
'grpc.http_connect_creds'?: string; 'grpc.http_connect_creds'?: string;
'grpc.default_compression_algorithm'?: CompressionAlgorithms; 'grpc.default_compression_algorithm'?: CompressionAlgorithms;

View File

@ -336,6 +336,8 @@ export class ChannelImplementation implements Channel {
new CompressionFilterFactory(this, this.options), new CompressionFilterFactory(this, this.options),
]); ]);
this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2)); this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2));
const error = new Error();
trace(LogVerbosity.DEBUG, 'channel_stacktrace', '(' + this.channelzRef.id + ') ' + 'Channel constructed \n' + error.stack?.substring(error.stack.indexOf('\n')+1));
} }
private getChannelzInfo(): ChannelInfo { private getChannelzInfo(): ChannelInfo {
@ -405,11 +407,16 @@ export class ChannelImplementation implements Channel {
metadata: callMetadata, metadata: callMetadata,
extraPickInfo: callConfig.pickInformation, extraPickInfo: callConfig.pickInformation,
}); });
const subchannelString = pickResult.subchannel ?
'(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() :
'' + pickResult.subchannel;
this.trace( this.trace(
'Pick result: ' + 'Pick result for call [' +
callStream.getCallNumber() +
']: ' +
PickResultType[pickResult.pickResultType] + PickResultType[pickResult.pickResultType] +
' subchannel: ' + ' subchannel: ' +
pickResult.subchannel?.getAddress() + subchannelString +
' status: ' + ' status: ' +
pickResult.status?.code + pickResult.status?.code +
' ' + ' ' +
@ -434,7 +441,7 @@ export class ChannelImplementation implements Channel {
log( log(
LogVerbosity.ERROR, LogVerbosity.ERROR,
'Error: COMPLETE pick result subchannel ' + 'Error: COMPLETE pick result subchannel ' +
pickResult.subchannel!.getAddress() + subchannelString +
' has state ' + ' has state ' +
ConnectivityState[pickResult.subchannel!.getConnectivityState()] ConnectivityState[pickResult.subchannel!.getConnectivityState()]
); );
@ -462,9 +469,9 @@ export class ChannelImplementation implements Channel {
callConfig.onCommitted?.(); callConfig.onCommitted?.();
pickResult.onCallStarted?.(); pickResult.onCallStarted?.();
} catch (error) { } catch (error) {
if ( const errorCode = (error as NodeJS.ErrnoException).code;
(error as NodeJS.ErrnoException).code === if (errorCode === 'ERR_HTTP2_GOAWAY_SESSION' ||
'ERR_HTTP2_GOAWAY_SESSION' errorCode === 'ERR_HTTP2_INVALID_SESSION'
) { ) {
/* An error here indicates that something went wrong with /* An error here indicates that something went wrong with
* the picked subchannel's http2 stream right before we * the picked subchannel's http2 stream right before we
@ -481,7 +488,7 @@ export class ChannelImplementation implements Channel {
* tryPick */ * tryPick */
this.trace( this.trace(
'Failed to start call on picked subchannel ' + 'Failed to start call on picked subchannel ' +
pickResult.subchannel!.getAddress() + subchannelString +
' with error ' + ' with error ' +
(error as Error).message + (error as Error).message +
'. Retrying pick', '. Retrying pick',
@ -491,7 +498,7 @@ export class ChannelImplementation implements Channel {
} else { } else {
this.trace( this.trace(
'Failed to start call on picked subchanel ' + 'Failed to start call on picked subchanel ' +
pickResult.subchannel!.getAddress() + subchannelString +
' with error ' + ' with error ' +
(error as Error).message + (error as Error).message +
'. Ending call', '. Ending call',
@ -510,7 +517,7 @@ export class ChannelImplementation implements Channel {
* block above */ * block above */
this.trace( this.trace(
'Picked subchannel ' + 'Picked subchannel ' +
pickResult.subchannel!.getAddress() + subchannelString +
' has state ' + ' has state ' +
ConnectivityState[subchannelState] + ConnectivityState[subchannelState] +
' after metadata filters. Retrying pick', ' after metadata filters. Retrying pick',

View File

@ -125,9 +125,9 @@ export class ChildLoadBalancerHandler implements LoadBalancer {
} }
exitIdle(): void { exitIdle(): void {
if (this.currentChild) { if (this.currentChild) {
this.currentChild.resetBackoff(); this.currentChild.exitIdle();
if (this.pendingChild) { if (this.pendingChild) {
this.pendingChild.resetBackoff(); this.pendingChild.exitIdle();
} }
} }
} }

View File

@ -449,11 +449,15 @@ export class PickFirstLoadBalancer implements LoadBalancer {
destroy() { destroy() {
this.resetSubchannelList(); this.resetSubchannelList();
if (this.currentPick !== null) { if (this.currentPick !== null) {
this.currentPick.unref(); /* Unref can cause a state change, which can cause a change in the value
this.currentPick.removeConnectivityStateListener( * of this.currentPick, so we hold a local reference to make sure that
* does not impact this function. */
const currentPick = this.currentPick;
currentPick.unref();
currentPick.removeConnectivityStateListener(
this.pickedSubchannelStateListener this.pickedSubchannelStateListener
); );
this.channelControlHelper.removeChannelzChild(this.currentPick.getChannelzRef()); this.channelControlHelper.removeChannelzChild(currentPick.getChannelzRef());
} }
} }

View File

@ -108,10 +108,12 @@ export function trace(
tracer: string, tracer: string,
text: string text: string
): void { ): void {
if ( if (isTracerEnabled(tracer)) {
!disabledTracers.has(tracer) &&
(allEnabled || enabledTracers.has(tracer))
) {
log(severity, new Date().toISOString() + ' | ' + tracer + ' | ' + text); log(severity, new Date().toISOString() + ' | ' + tracer + ' | ' + text);
} }
} }
export function isTracerEnabled(tracer: string): boolean {
return !disabledTracers.has(tracer) &&
(allEnabled || enabledTracers.has(tracer));
}

View File

@ -32,6 +32,7 @@ import { SubchannelAddress, TcpSubchannelAddress } from './subchannel-address';
import { GrpcUri, uriToString, splitHostPort } from './uri-parser'; import { GrpcUri, uriToString, splitHostPort } from './uri-parser';
import { isIPv6, isIPv4 } from 'net'; import { isIPv6, isIPv4 } from 'net';
import { ChannelOptions } from './channel-options'; import { ChannelOptions } from './channel-options';
import { BackoffOptions, BackoffTimeout } from './backoff-timeout';
const TRACER_NAME = 'dns_resolver'; const TRACER_NAME = 'dns_resolver';
@ -85,6 +86,8 @@ class DnsResolver implements Resolver {
private latestServiceConfigError: StatusObject | null = null; private latestServiceConfigError: StatusObject | null = null;
private percentage: number; private percentage: number;
private defaultResolutionError: StatusObject; private defaultResolutionError: StatusObject;
private backoff: BackoffTimeout;
private continueResolving = false;
constructor( constructor(
private target: GrpcUri, private target: GrpcUri,
private listener: ResolverListener, private listener: ResolverListener,
@ -119,6 +122,18 @@ class DnsResolver implements Resolver {
details: `Name resolution failed for target ${uriToString(this.target)}`, details: `Name resolution failed for target ${uriToString(this.target)}`,
metadata: new Metadata(), metadata: new Metadata(),
}; };
const backoffOptions: BackoffOptions = {
initialDelay: channelOptions['grpc.initial_reconnect_backoff_ms'],
maxDelay: channelOptions['grpc.max_reconnect_backoff_ms'],
};
this.backoff = new BackoffTimeout(() => {
if (this.continueResolving) {
this.startResolutionWithBackoff();
}
}, backoffOptions);
this.backoff.unref();
} }
/** /**
@ -129,6 +144,7 @@ class DnsResolver implements Resolver {
if (this.ipResult !== null) { if (this.ipResult !== null) {
trace('Returning IP address for target ' + uriToString(this.target)); trace('Returning IP address for target ' + uriToString(this.target));
setImmediate(() => { setImmediate(() => {
this.backoff.reset();
this.listener.onSuccessfulResolution( this.listener.onSuccessfulResolution(
this.ipResult!, this.ipResult!,
null, null,
@ -140,6 +156,7 @@ class DnsResolver implements Resolver {
return; return;
} }
if (this.dnsHostname === null) { if (this.dnsHostname === null) {
trace('Failed to parse DNS address ' + uriToString(this.target));
setImmediate(() => { setImmediate(() => {
this.listener.onError({ this.listener.onError({
code: Status.UNAVAILABLE, code: Status.UNAVAILABLE,
@ -148,6 +165,7 @@ class DnsResolver implements Resolver {
}); });
}); });
} else { } else {
trace('Looking up DNS hostname ' + this.dnsHostname);
/* We clear out latestLookupResult here to ensure that it contains the /* We clear out latestLookupResult here to ensure that it contains the
* latest result since the last time we started resolving. That way, the * latest result since the last time we started resolving. That way, the
* TXT resolution handler can use it, but only if it finishes second. We * TXT resolution handler can use it, but only if it finishes second. We
@ -164,6 +182,7 @@ class DnsResolver implements Resolver {
this.pendingLookupPromise.then( this.pendingLookupPromise.then(
(addressList) => { (addressList) => {
this.pendingLookupPromise = null; this.pendingLookupPromise = null;
this.backoff.reset();
const ip4Addresses: dns.LookupAddress[] = addressList.filter( const ip4Addresses: dns.LookupAddress[] = addressList.filter(
(addr) => addr.family === 4 (addr) => addr.family === 4
); );
@ -263,10 +282,21 @@ class DnsResolver implements Resolver {
} }
} }
updateResolution() { private startResolutionWithBackoff() {
trace('Resolution update requested for target ' + uriToString(this.target));
if (this.pendingLookupPromise === null) {
this.startResolution(); this.startResolution();
this.backoff.runOnce();
}
updateResolution() {
/* If there is a pending lookup, just let it finish. Otherwise, if the
* backoff timer is running, do another lookup when it ends, and if not,
* do another lookup immeidately. */
if (this.pendingLookupPromise === null) {
if (this.backoff.isRunning()) {
this.continueResolving = true;
} else {
this.startResolutionWithBackoff();
}
} }
} }

View File

@ -371,6 +371,13 @@ export class Server {
creds._getSettings()! creds._getSettings()!
); );
http2Server = http2.createSecureServer(secureServerOptions); http2Server = http2.createSecureServer(secureServerOptions);
http2Server.on('secureConnection', (socket: TLSSocket) => {
/* These errors need to be handled by the user of Http2SecureServer,
* according to https://github.com/nodejs/node/issues/35824 */
socket.on('error', (e: Error) => {
this.trace('An incoming TLS connection closed with error: ' + e.message);
});
});
} else { } else {
http2Server = http2.createServer(serverOptions); http2Server = http2.createServer(serverOptions);
} }
@ -423,27 +430,36 @@ export class Server {
port: boundAddress.port port: boundAddress.port
} }
} }
const channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => { let channelzRef: SocketRef;
return { if (this.channelzEnabled) {
localAddress: boundSubchannelAddress, channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => {
remoteAddress: null, return {
security: null, localAddress: boundSubchannelAddress,
remoteName: null, remoteAddress: null,
streamsStarted: 0, security: null,
streamsSucceeded: 0, remoteName: null,
streamsFailed: 0, streamsStarted: 0,
messagesSent: 0, streamsSucceeded: 0,
messagesReceived: 0, streamsFailed: 0,
keepAlivesSent: 0, messagesSent: 0,
lastLocalStreamCreatedTimestamp: null, messagesReceived: 0,
lastRemoteStreamCreatedTimestamp: null, keepAlivesSent: 0,
lastMessageSentTimestamp: null, lastLocalStreamCreatedTimestamp: null,
lastMessageReceivedTimestamp: null, lastRemoteStreamCreatedTimestamp: null,
localFlowControlWindow: null, lastMessageSentTimestamp: null,
remoteFlowControlWindow: null lastMessageReceivedTimestamp: null,
localFlowControlWindow: null,
remoteFlowControlWindow: null
};
});
this.listenerChildrenTracker.refChild(channelzRef);
} else {
channelzRef = {
kind: 'socket',
id: -1,
name: ''
}; };
}); }
this.listenerChildrenTracker.refChild(channelzRef);
this.http2ServerList.push({server: http2Server, channelzRef: channelzRef}); this.http2ServerList.push({server: http2Server, channelzRef: channelzRef});
this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress)); this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress));
resolve('port' in boundSubchannelAddress ? boundSubchannelAddress.port : portNum); resolve('port' in boundSubchannelAddress ? boundSubchannelAddress.port : portNum);
@ -492,27 +508,36 @@ export class Server {
host: boundAddress.address, host: boundAddress.address,
port: boundAddress.port port: boundAddress.port
}; };
const channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => { let channelzRef: SocketRef;
return { if (this.channelzEnabled) {
localAddress: boundSubchannelAddress, channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => {
remoteAddress: null, return {
security: null, localAddress: boundSubchannelAddress,
remoteName: null, remoteAddress: null,
streamsStarted: 0, security: null,
streamsSucceeded: 0, remoteName: null,
streamsFailed: 0, streamsStarted: 0,
messagesSent: 0, streamsSucceeded: 0,
messagesReceived: 0, streamsFailed: 0,
keepAlivesSent: 0, messagesSent: 0,
lastLocalStreamCreatedTimestamp: null, messagesReceived: 0,
lastRemoteStreamCreatedTimestamp: null, keepAlivesSent: 0,
lastMessageSentTimestamp: null, lastLocalStreamCreatedTimestamp: null,
lastMessageReceivedTimestamp: null, lastRemoteStreamCreatedTimestamp: null,
localFlowControlWindow: null, lastMessageSentTimestamp: null,
remoteFlowControlWindow: null lastMessageReceivedTimestamp: null,
localFlowControlWindow: null,
remoteFlowControlWindow: null
};
});
this.listenerChildrenTracker.refChild(channelzRef);
} else {
channelzRef = {
kind: 'socket',
id: -1,
name: ''
}; };
}); }
this.listenerChildrenTracker.refChild(channelzRef);
this.http2ServerList.push({server: http2Server, channelzRef: channelzRef}); this.http2ServerList.push({server: http2Server, channelzRef: channelzRef});
this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress)); this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress));
resolve( resolve(
@ -592,8 +617,10 @@ export class Server {
for (const {server: http2Server, channelzRef: ref} of this.http2ServerList) { for (const {server: http2Server, channelzRef: ref} of this.http2ServerList) {
if (http2Server.listening) { if (http2Server.listening) {
http2Server.close(() => { http2Server.close(() => {
this.listenerChildrenTracker.unrefChild(ref); if (this.channelzEnabled) {
unregisterChannelzRef(ref); this.listenerChildrenTracker.unrefChild(ref);
unregisterChannelzRef(ref);
}
}); });
} }
} }
@ -609,7 +636,9 @@ export class Server {
session.destroy(http2.constants.NGHTTP2_CANCEL as any); session.destroy(http2.constants.NGHTTP2_CANCEL as any);
}); });
this.sessions.clear(); this.sessions.clear();
unregisterChannelzRef(this.channelzRef); if (this.channelzEnabled) {
unregisterChannelzRef(this.channelzRef);
}
} }
register<RequestType, ResponseType>( register<RequestType, ResponseType>(
@ -658,7 +687,9 @@ export class Server {
tryShutdown(callback: (error?: Error) => void): void { tryShutdown(callback: (error?: Error) => void): void {
const wrappedCallback = (error?: Error) => { const wrappedCallback = (error?: Error) => {
unregisterChannelzRef(this.channelzRef); if (this.channelzEnabled) {
unregisterChannelzRef(this.channelzRef);
}
callback(error); callback(error);
}; };
let pendingChecks = 0; let pendingChecks = 0;
@ -678,8 +709,10 @@ export class Server {
if (http2Server.listening) { if (http2Server.listening) {
pendingChecks++; pendingChecks++;
http2Server.close(() => { http2Server.close(() => {
this.listenerChildrenTracker.unrefChild(ref); if (this.channelzEnabled) {
unregisterChannelzRef(ref); this.listenerChildrenTracker.unrefChild(ref);
unregisterChannelzRef(ref);
}
maybeCallback(); maybeCallback();
}); });
} }
@ -720,8 +753,10 @@ export class Server {
'stream', 'stream',
(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) => { (stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) => {
const channelzSessionInfo = this.sessions.get(stream.session as http2.ServerHttp2Session); const channelzSessionInfo = this.sessions.get(stream.session as http2.ServerHttp2Session);
this.callTracker.addCallStarted(); if (this.channelzEnabled) {
channelzSessionInfo?.streamTracker.addCallStarted(); this.callTracker.addCallStarted();
channelzSessionInfo?.streamTracker.addCallStarted();
}
const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE]; const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
if ( if (
@ -736,7 +771,9 @@ export class Server {
{ endStream: true } { endStream: true }
); );
this.callTracker.addCallFailed(); this.callTracker.addCallFailed();
channelzSessionInfo?.streamTracker.addCallFailed(); if (this.channelzEnabled) {
channelzSessionInfo?.streamTracker.addCallFailed();
}
return; return;
} }
@ -779,7 +816,7 @@ export class Server {
this.callTracker.addCallFailed(); this.callTracker.addCallFailed();
} }
}); });
if (channelzSessionInfo) { if (this.channelzEnabled && channelzSessionInfo) {
call.once('streamEnd', (success: boolean) => { call.once('streamEnd', (success: boolean) => {
if (success) { if (success) {
channelzSessionInfo.streamTracker.addCallSucceeded(); channelzSessionInfo.streamTracker.addCallSucceeded();
@ -834,8 +871,10 @@ export class Server {
} catch (err) { } catch (err) {
if (!call) { if (!call) {
call = new Http2ServerCallStream(stream, null!, this.options); call = new Http2ServerCallStream(stream, null!, this.options);
this.callTracker.addCallFailed(); if (this.channelzEnabled) {
channelzSessionInfo?.streamTracker.addCallFailed() this.callTracker.addCallFailed();
channelzSessionInfo?.streamTracker.addCallFailed()
}
} }
if (err.code === undefined) { if (err.code === undefined) {
@ -853,7 +892,16 @@ export class Server {
return; return;
} }
const channelzRef = registerChannelzSocket(session.socket.remoteAddress ?? 'unknown', this.getChannelzSessionInfoGetter(session)); let channelzRef: SocketRef;
if (this.channelzEnabled) {
channelzRef = registerChannelzSocket(session.socket.remoteAddress ?? 'unknown', this.getChannelzSessionInfoGetter(session));
} else {
channelzRef = {
kind: 'socket',
id: -1,
name: ''
}
}
const channelzSessionInfo: ChannelzSessionInfo = { const channelzSessionInfo: ChannelzSessionInfo = {
ref: channelzRef, ref: channelzRef,

View File

@ -42,6 +42,7 @@ import { ConnectivityStateListener } from './subchannel-interface';
const clientVersion = require('../../package.json').version; const clientVersion = require('../../package.json').version;
const TRACER_NAME = 'subchannel'; const TRACER_NAME = 'subchannel';
const FLOW_CONTROL_TRACER_NAME = 'subchannel_flowctrl';
const MIN_CONNECT_TIMEOUT_MS = 20000; const MIN_CONNECT_TIMEOUT_MS = 20000;
const INITIAL_BACKOFF_MS = 1000; const INITIAL_BACKOFF_MS = 1000;
@ -319,6 +320,14 @@ export class Subchannel {
logging.trace(LogVerbosity.DEBUG, 'subchannel_refcount', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); logging.trace(LogVerbosity.DEBUG, 'subchannel_refcount', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
} }
private flowControlTrace(text: string): void {
logging.trace(LogVerbosity.DEBUG, FLOW_CONTROL_TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}
private internalsTrace(text: string): void {
logging.trace(LogVerbosity.DEBUG, 'subchannel_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}
private handleBackoffTimer() { private handleBackoffTimer() {
if (this.continueConnecting) { if (this.continueConnecting) {
this.transitionToState( this.transitionToState(
@ -550,6 +559,24 @@ export class Subchannel {
(error as Error).message (error as Error).message
); );
}); });
if (logging.isTracerEnabled(TRACER_NAME)) {
session.on('remoteSettings', (settings: http2.Settings) => {
this.trace(
'new settings received' +
(this.session !== session ? ' on the old connection' : '') +
': ' +
JSON.stringify(settings)
);
});
session.on('localSettings', (settings: http2.Settings) => {
this.trace(
'local settings acknowledged by remote' +
(this.session !== session ? ' on the old connection' : '') +
': ' +
JSON.stringify(settings)
);
});
}
} }
private startConnectingInternal() { private startConnectingInternal() {
@ -637,9 +664,15 @@ export class Subchannel {
switch (newState) { switch (newState) {
case ConnectivityState.READY: case ConnectivityState.READY:
this.stopBackoff(); this.stopBackoff();
this.session!.socket.once('close', () => { const session = this.session!;
for (const listener of this.disconnectListeners) { session.socket.once('close', () => {
listener(); if (this.session === session) {
this.transitionToState(
[ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE);
for (const listener of this.disconnectListeners) {
listener();
}
} }
}); });
if (this.keepaliveWithoutCalls) { if (this.keepaliveWithoutCalls) {
@ -819,13 +852,26 @@ export class Subchannel {
logging.trace( logging.trace(
LogVerbosity.DEBUG, LogVerbosity.DEBUG,
'call_stream', 'call_stream',
'Starting stream on subchannel ' + 'Starting stream [' + callStream.getCallNumber() + '] on subchannel ' +
'(' + this.channelzRef.id + ') ' + '(' + this.channelzRef.id + ') ' +
this.subchannelAddressString + this.subchannelAddressString +
' with headers\n' + ' with headers\n' +
headersString headersString
); );
this.flowControlTrace(
'local window size: ' +
this.session!.state.localWindowSize +
' remote window size: ' +
this.session!.state.remoteWindowSize
);
const streamSession = this.session; const streamSession = this.session;
this.internalsTrace(
'session.closed=' +
streamSession!.closed +
' session.destroyed=' +
streamSession!.destroyed +
' session.socket.destroyed=' +
streamSession!.socket.destroyed);
let statsTracker: SubchannelCallStatsTracker; let statsTracker: SubchannelCallStatsTracker;
if (this.channelzEnabled) { if (this.channelzEnabled) {
this.callTracker.addCallStarted(); this.callTracker.addCallStarted();

View File

@ -0,0 +1,73 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as assert from 'assert';
import * as path from 'path';
import * as grpc from '../src';
import { sendUnaryData, Server, ServerCredentials, ServerUnaryCall, ServiceClientConstructor, ServiceError } from "../src";
import { loadProtoFile } from './common';
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService = loadProtoFile(protoFile)
.EchoService as ServiceClientConstructor;
describe('Local subchannel pool', () => {
let server: Server;
let serverPort: number;
before(done => {
server = new Server();
server.addService(echoService.service, {
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
callback(null, call.request);
},
});
server.bindAsync(
'localhost:0',
ServerCredentials.createInsecure(),
(err, port) => {
assert.ifError(err);
serverPort = port;
server.start();
done();
}
);
});
after(done => {
server.tryShutdown(done);
});
it('should complete the client lifecycle without error', done => {
const client = new echoService(
`localhost:${serverPort}`,
grpc.credentials.createInsecure(),
{'grpc.use_local_subchannel_pool': 1}
);
client.echo(
{ value: 'test value', value2: 3 },
(error: ServiceError, response: any) => {
assert.ifError(error);
assert.deepStrictEqual(response, { value: 'test value', value2: 3 });
client.close();
done();
}
);
});
});

View File

@ -75,6 +75,8 @@ do
# npm test calls nyc gulp test # npm test calls nyc gulp test
npm test || FAILED="true" npm test || FAILED="true"
./test/distrib/run-distrib-test.sh || FAILED="true"
done done
set +ex set +ex

View File

@ -0,0 +1,22 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
const grpcJs = require('@grpc/grpc-js');
const grpcJsXds = require('@grpc/grpc-js-xds');
const protoLoader = require('@grpc/proto-loader');

View File

@ -0,0 +1,33 @@
#!/bin/bash
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -ex
cd $(dirname $0)
base=$(pwd)
cd ../../packages/grpc-js
npm pack
cd ../grpc-js-xds
npm pack
cd ../proto-loader
npm pack
cd $base
npm install ../../packages/grpc-js/grpc-grpc-js-*.tgz
npm install ../../packages/grpc-js-xds/grpc-grpc-js-xds-*.tgz
npm install ../../packages/proto-loader/grpc-proto-loader-*.tgz
node ./distrib-test.js