From 9aeca2f01af3132c904f2098abc77839c45f6dbd Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 6 Jun 2019 09:41:44 -0700 Subject: [PATCH 1/2] Pure JS: Fixed two bugs with goaway handling --- packages/grpc-js/src/call-stream.ts | 12 ++-- packages/grpc-js/src/subchannel.ts | 4 ++ test/api/connectivity_test.js | 99 +++++++++++++++++++++++++++++ 3 files changed, 108 insertions(+), 7 deletions(-) create mode 100644 test/api/connectivity_test.js diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index 6ddd4414..5cbc8e34 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -298,10 +298,10 @@ export class Http2CallStream extends Duplex implements Call { stream.on('end', () => { this.tryPush(null); }); - stream.on('close', async errorCode => { + stream.on('close', async () => { let code: Status; let details = ''; - switch (errorCode) { + switch (stream.rstCode) { case http2.constants.NGHTTP2_REFUSED_STREAM: code = Status.UNAVAILABLE; break; @@ -329,11 +329,9 @@ export class Http2CallStream extends Duplex implements Call { this.endCall({ code, details, metadata: new Metadata() }); }); stream.on('error', (err: Error) => { - this.endCall({ - code: Status.INTERNAL, - details: 'Internal HTTP2 error', - metadata: new Metadata(), - }); + /* We need an error handler here to stop "Uncaught Error" exceptions + * from bubbling up. However, errors here should all correspond to + * "close" events, where we will handle the error more granularly */ }); if (!this.pendingRead) { stream.pause(); diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 80abf781..ac681d9f 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -78,6 +78,10 @@ export class Http2SubChannel extends EventEmitter implements SubChannel { this.stopKeepalivePings(); this.emit('close'); }); + this.session.on('goaway', () => { + this.stopKeepalivePings(); + this.emit('close'); + }); this.userAgent = userAgent; if (channelArgs['grpc.keepalive_time_ms']) { diff --git a/test/api/connectivity_test.js b/test/api/connectivity_test.js new file mode 100644 index 00000000..eb94e412 --- /dev/null +++ b/test/api/connectivity_test.js @@ -0,0 +1,99 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +const options = { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true +}; +const path = require('path'); +const fs = require('fs'); +const assert = require('assert'); +const _ = require('lodash'); +const anyGrpc = require('../any_grpc'); +const clientGrpc = anyGrpc.client; +const serverGrpc = anyGrpc.server; +const protoLoader = require('../../packages/proto-loader', options); +const testServiceDef = protoLoader.loadSync(__dirname + '/../proto/test_service.proto'); +const TestService = serverGrpc.loadPackageDefinition(testServiceDef).TestService.service; +const TestServiceClient = clientGrpc.loadPackageDefinition(testServiceDef).TestService; + +const clientCreds = clientGrpc.credentials.createInsecure(); +const serverCreds = serverGrpc.ServerCredentials.createInsecure(); + +const serviceImpl = { + unary: function(call, cb) { + cb(null, {}); + }, + clientStream: function(stream, cb){ + stream.on('data', function(data) {}); + stream.on('end', function() { + cb(null, {}); + }); + }, + serverStream: function(stream) { + stream.end(); + }, + bidiStream: function(stream) { + stream.on('data', function(data) {}); + stream.on('end', function() { + stream.end(); + }); + } +}; + +describe('Reconnection', function() { + let server1; + let server2; + let port; + before(function() { + server1 = new serverGrpc.Server(); + server1.addService(TestService, serviceImpl); + server2 = new serverGrpc.Server(); + server2.addService(TestService, serviceImpl); + port = server1.bind('localhost:0', serverCreds); + server1.start(); + client = new TestServiceClient(`localhost:${port}`, clientCreds); + }); + after(function() { + server1.forceShutdown(); + server2.forceShutdown(); + }); + it('Should end with either OK or UNAVAILABLE when querying a server that is shutting down', function(done) { + client.unary({}, (err, data) => { + assert.ifError(err); + server1.tryShutdown(() => { + server2.bind(`localhost:${port}`, serverCreds); + server2.start(); + client.unary({}, (err, data) => { + assert.ifError(err); + clearInterval(callInterval); + done(); + }); + }); + let callInterval = setInterval(() => { + client.unary({}, (err, data) => { + if (err) { + assert.strictEqual(err.code, clientGrpc.status.UNAVAILABLE); + } + }); + }, 0); + }); + }); +}); \ No newline at end of file From 1ee218c8bd67a7961ec03d3101d5f4b1dcfa658d Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 6 Jun 2019 10:38:28 -0700 Subject: [PATCH 2/2] Fix tests for fixed code, also fix another issue --- packages/grpc-js/src/call-stream.ts | 11 ++++++++++- packages/grpc-js/test/test-call-stream.ts | 3 ++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index 5cbc8e34..35b957b3 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -130,7 +130,16 @@ export class Http2CallStream extends Duplex implements Call { private endCall(status: StatusObject): void { if (this.finalStatus === null) { this.finalStatus = status; - this.emit('status', status); + /* We do this asynchronously to ensure that no async function is in the + * call stack when we return control to the application. If an async + * function is in the call stack, any exception thrown by the application + * (or our tests) will bubble up and turn into promise rejection, which + * will result in an UnhandledPromiseRejectionWarning. Because that is + * a warning, the error will be effectively swallowed and execution will + * continue */ + process.nextTick(() => { + this.emit('status', status); + }); } } diff --git a/packages/grpc-js/test/test-call-stream.ts b/packages/grpc-js/test/test-call-stream.ts index a5f7908b..7903de76 100644 --- a/packages/grpc-js/test/test-call-stream.ts +++ b/packages/grpc-js/test/test-call-stream.ts @@ -196,7 +196,8 @@ describe('CallStream', () => { reject(e); } }); - http2Stream.emit('close', Number(key)); + http2Stream.rstCode = Number(key); + http2Stream.emit('close'); }); }); });