Merge pull request #2211 from murgatroid99/merge_1.6.x

Merge the 1.6.x branch into master
This commit is contained in:
Michael Lumish 2022-08-29 15:30:29 -07:00 committed by GitHub
commit 3f7102084a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 47 additions and 16 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@grpc/grpc-js", "name": "@grpc/grpc-js",
"version": "1.6.9", "version": "1.6.11",
"description": "gRPC Library for Node - pure JS implementation", "description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/", "homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

View File

@ -329,6 +329,11 @@ export class Http2CallStream implements Call {
process.nextTick(() => { process.nextTick(() => {
this.listener?.onReceiveStatus(filteredStatus); this.listener?.onReceiveStatus(filteredStatus);
}); });
/* Leave the http2 stream in flowing state to drain incoming messages, to
* ensure that the stream closure completes. The call stream already does
* not push more messages after the status is output, so the messages go
* nowhere either way. */
this.http2Stream?.resume();
if (this.subchannel) { if (this.subchannel) {
this.subchannel.callUnref(); this.subchannel.callUnref();
this.subchannel.removeDisconnectListener(this.disconnectListener); this.subchannel.removeDisconnectListener(this.disconnectListener);
@ -483,7 +488,11 @@ export class Http2CallStream implements Call {
} }
let details = ''; let details = '';
if (typeof metadataMap['grpc-message'] === 'string') { if (typeof metadataMap['grpc-message'] === 'string') {
details = decodeURI(metadataMap['grpc-message']); try {
details = decodeURI(metadataMap['grpc-message']);
} catch (e) {
details = metadataMap['grpc-messages'] as string;
}
metadata.remove('grpc-message'); metadata.remove('grpc-message');
this.trace( this.trace(
'received status details string "' + details + '" from server' 'received status details string "' + details + '" from server'
@ -573,8 +582,15 @@ export class Http2CallStream implements Call {
} }
} }
}); });
stream.on('trailers', this.handleTrailers.bind(this)); stream.on('trailers', (headers: http2.IncomingHttpHeaders) => {
this.handleTrailers(headers);
});
stream.on('data', (data: Buffer) => { stream.on('data', (data: Buffer) => {
/* If the status has already been output, allow the http2 stream to
* drain without processing the data. */
if (this.statusOutput) {
return;
}
this.trace('receive HTTP/2 data frame of length ' + data.length); this.trace('receive HTTP/2 data frame of length ' + data.length);
const messages = this.decoder.write(data); const messages = this.decoder.write(data);
@ -686,9 +702,6 @@ export class Http2CallStream implements Call {
} }
this.streamEndWatchers.forEach(watcher => watcher(false)); this.streamEndWatchers.forEach(watcher => watcher(false));
}); });
if (!this.pendingRead) {
stream.pause();
}
if (this.pendingWrite) { if (this.pendingWrite) {
if (!this.pendingWriteCallback) { if (!this.pendingWriteCallback) {
throw new Error('Invalid state in write handling code'); throw new Error('Invalid state in write handling code');

View File

@ -467,7 +467,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
// Step 3 // Step 3
for (const [address, mapEntry] of this.addressMap.entries()) { for (const [address, mapEntry] of this.addressMap.entries()) {
// Step 3.i // Step 3.i
if (this.getCurrentEjectionPercent() > this.latestConfig.getMaxEjectionPercent()) { if (this.getCurrentEjectionPercent() >= this.latestConfig.getMaxEjectionPercent()) {
break; break;
} }
// Step 3.ii // Step 3.ii
@ -500,14 +500,22 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
} }
trace('Running failure percentage check. threshold=' + failurePercentageConfig.threshold + ' request volume threshold=' + failurePercentageConfig.request_volume); trace('Running failure percentage check. threshold=' + failurePercentageConfig.threshold + ' request volume threshold=' + failurePercentageConfig.request_volume);
// Step 1 // Step 1
if (this.addressMap.size < failurePercentageConfig.minimum_hosts) { let addressesWithTargetVolume = 0;
for (const mapEntry of this.addressMap.values()) {
const successes = mapEntry.counter.getLastSuccesses();
const failures = mapEntry.counter.getLastFailures();
if (successes + failures >= failurePercentageConfig.request_volume) {
addressesWithTargetVolume += 1;
}
}
if (addressesWithTargetVolume < failurePercentageConfig.minimum_hosts) {
return; return;
} }
// Step 2 // Step 2
for (const [address, mapEntry] of this.addressMap.entries()) { for (const [address, mapEntry] of this.addressMap.entries()) {
// Step 2.i // Step 2.i
if (this.getCurrentEjectionPercent() > this.latestConfig.getMaxEjectionPercent()) { if (this.getCurrentEjectionPercent() >= this.latestConfig.getMaxEjectionPercent()) {
break; break;
} }
// Step 2.ii // Step 2.ii

View File

@ -361,12 +361,21 @@ export class Subchannel {
this.handleDisconnect(); this.handleDisconnect();
}, this.keepaliveTimeoutMs); }, this.keepaliveTimeoutMs);
this.keepaliveTimeoutId.unref?.(); this.keepaliveTimeoutId.unref?.();
this.session!.ping( try {
(err: Error | null, duration: number, payload: Buffer) => { this.session!.ping(
this.keepaliveTrace('Received ping response'); (err: Error | null, duration: number, payload: Buffer) => {
clearTimeout(this.keepaliveTimeoutId); this.keepaliveTrace('Received ping response');
} clearTimeout(this.keepaliveTimeoutId);
); }
);
} catch (e) {
/* If we fail to send a ping, the connection is no longer functional, so
* we should discard it. */
this.transitionToState(
[ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE
);
}
} }
private startKeepalivePings() { private startKeepalivePings() {

View File

@ -6,7 +6,8 @@
"target": "es2017", "target": "es2017",
"module": "commonjs", "module": "commonjs",
"resolveJsonModule": true, "resolveJsonModule": true,
"incremental": true "incremental": true,
"types": ["mocha"]
}, },
"include": [ "include": [
"src/**/*.ts", "src/**/*.ts",