From 992e2dcc2980b72794be3735e6c35fbf9a4781ac Mon Sep 17 00:00:00 2001 From: Sigurd Meldgaard Date: Mon, 19 Aug 2019 15:31:16 +0200 Subject: [PATCH] Improve connection handling (#231) * Improve connection handling * Address review. Add round-trip-test --- CHANGELOG.md | 9 ++ lib/src/client/http2_connection.dart | 80 +++++++++++---- lib/src/client/options.dart | 22 +++-- lib/src/server/server.dart | 8 +- pubspec.yaml | 2 +- test/client_handles_bad_connections_test.dart | 98 +++++++++++++++++++ test/data/localhost.crt | 18 ++++ test/data/localhost.key | 28 ++++++ test/round_trip_test.dart | 84 ++++++++++++++++ test/src/client_utils.dart | 2 + 10 files changed, 322 insertions(+), 29 deletions(-) create mode 100644 test/client_handles_bad_connections_test.dart create mode 100644 test/data/localhost.crt create mode 100644 test/data/localhost.key create mode 100644 test/round_trip_test.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index 36a4186..11313c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +## 2.1.0 + +* Do a health check of the http2-connection before making request. +* Introduce `ChannelOptions.connectionLimit` the longest time a single connection is used for new + requests. +* Use Tcp.nodelay to improve client call speed. +* Use SecureSocket supportedProtocols to save a round trip when establishing a secure connection. +* Allow passing http2 `ServerSettings` to `Server.serve`. + ## 2.0.3 * GrpcError now implements Exception to indicate it can be reasonably handled. diff --git a/lib/src/client/http2_connection.dart b/lib/src/client/http2_connection.dart index 1698555..303f823 100644 --- a/lib/src/client/http2_connection.dart +++ b/lib/src/client/http2_connection.dart @@ -53,6 +53,10 @@ class Http2ClientConnection implements connection.ClientConnection { /// Used for idle and reconnect timeout, depending on [_state]. Timer _timer; + + /// Used for making sure a single connection is not kept alive too long. + final Stopwatch _connectionLifeTimer = Stopwatch(); + Duration _currentReconnectDelay; final String host; @@ -69,25 +73,38 @@ class Http2ClientConnection implements connection.ClientConnection { ConnectionState get state => _state; + static const _estimatedRoundTripTime = const Duration(milliseconds: 20); + Future connectTransport() async { - var socket = await Socket.connect(host, port); + final securityContext = credentials.securityContext; + Socket socket; + if (securityContext == null) { + socket = await Socket.connect(host, port); + } else { + socket = await SecureSocket.connect(host, port, + supportedProtocols: ['h2'], + context: securityContext, + onBadCertificate: _validateBadCertificate); + if ((socket as SecureSocket).selectedProtocol != 'h2') { + socket.destroy(); + throw (TransportException( + 'Endpoint $host:$port does not support http/2 via ALPN')); + } + } + + final connection = ClientTransportConnection.viaSocket(socket); + socket.done.then((_) => _abandonConnection()); + + // Give the settings settings-frame a bit of time to arrive. + // TODO(sigurdm): This is a hack. The http2 package should expose a way of + // waiting for the settings frame to arrive. + await new Future.delayed(_estimatedRoundTripTime); + if (_state == ConnectionState.shutdown) { socket.destroy(); throw _ShutdownException(); } - final securityContext = credentials.securityContext; - if (securityContext != null) { - socket = await SecureSocket.secure(socket, - host: authority, - context: securityContext, - onBadCertificate: _validateBadCertificate); - if (_state == ConnectionState.shutdown) { - socket.destroy(); - throw _ShutdownException(); - } - } - socket.done.then((_) => _handleSocketClosed()); - return ClientTransportConnection.viaSocket(socket); + return connection; } void _connect() { @@ -96,17 +113,44 @@ class Http2ClientConnection implements connection.ClientConnection { return; } _setState(ConnectionState.connecting); - connectTransport().then((transport) { + connectTransport().then((transport) async { _currentReconnectDelay = null; _transportConnection = transport; + _connectionLifeTimer + ..reset() + ..start(); transport.onActiveStateChanged = _handleActiveStateChanged; _setState(ConnectionState.ready); - _pendingCalls.forEach(_startCall); - _pendingCalls.clear(); + + if (_hasPendingCalls()) { + // Take all pending calls out, and reschedule. + final pendingCalls = _pendingCalls.toList(); + _pendingCalls.clear(); + pendingCalls.forEach(dispatchCall); + } }).catchError(_handleConnectionFailure); } + /// Abandons the current connection if it is unhealthy or has been open for + /// too long. + /// + /// Assumes [_transportConnection] is not `null`. + void _refreshConnectionIfUnhealthy() { + final bool isHealthy = _transportConnection.isOpen; + final bool shouldRefresh = + _connectionLifeTimer.elapsed > options.connectionTimeout; + if (shouldRefresh) { + _transportConnection.finish(); + } + if (!isHealthy || shouldRefresh) { + _abandonConnection(); + } + } + void dispatchCall(ClientCall call) { + if (_transportConnection != null) { + _refreshConnectionIfUnhealthy(); + } switch (_state) { case ConnectionState.ready: _startCall(call); @@ -219,7 +263,7 @@ class Http2ClientConnection implements connection.ClientConnection { _connect(); } - void _handleSocketClosed() { + void _abandonConnection() { _cancelTimer(); _transportConnection = null; diff --git a/lib/src/client/options.dart b/lib/src/client/options.dart index f9a5413..2700ee7 100644 --- a/lib/src/client/options.dart +++ b/lib/src/client/options.dart @@ -17,6 +17,11 @@ import 'dart:math'; import 'transport/http2_credentials.dart'; const defaultIdleTimeout = Duration(minutes: 5); + +/// It seems like Google's gRPC endpoints will forcefully close the +/// connection after precisely 1 hour. So we *proactively* refresh our +/// connection after 50 minutes. This will avoid one failed RPC call. +const defaultConnectionTimeOut = Duration(minutes: 50); const defaultUserAgent = 'dart-grpc/2.0.0'; typedef Duration BackoffStrategy(Duration lastBackoff); @@ -39,16 +44,17 @@ Duration defaultBackoffStrategy(Duration lastBackoff) { class ChannelOptions { final ChannelCredentials credentials; final Duration idleTimeout; + + /// The maximum time a single connection will be used for new requests. + final Duration connectionTimeout; final BackoffStrategy backoffStrategy; final String userAgent; const ChannelOptions({ - ChannelCredentials credentials, - Duration idleTimeout, - String userAgent, - BackoffStrategy backoffStrategy, - }) : this.credentials = credentials ?? const ChannelCredentials.secure(), - this.idleTimeout = idleTimeout ?? defaultIdleTimeout, - this.userAgent = userAgent ?? defaultUserAgent, - this.backoffStrategy = backoffStrategy ?? defaultBackoffStrategy; + this.credentials = const ChannelCredentials.secure(), + this.idleTimeout = defaultIdleTimeout, + this.userAgent = defaultUserAgent, + this.backoffStrategy = defaultBackoffStrategy, + this.connectionTimeout = defaultConnectionTimeOut, + }); } diff --git a/lib/src/server/server.dart b/lib/src/server/server.dart index dccb28d..bc63077 100644 --- a/lib/src/server/server.dart +++ b/lib/src/server/server.dart @@ -85,7 +85,10 @@ class Server { Service lookupService(String service) => _services[service]; Future serve( - {dynamic address, int port, ServerTlsCredentials security}) async { + {dynamic address, + int port, + ServerTlsCredentials security, + ServerSettings http2ServerSettings}) async { // TODO(dart-lang/grpc-dart#9): Handle HTTP/1.1 upgrade to h2c, if allowed. Stream server; if (security != null) { @@ -100,7 +103,8 @@ class Server { server = _insecureServer; } server.listen((socket) { - final connection = ServerTransportConnection.viaSocket(socket); + final connection = ServerTransportConnection.viaSocket(socket, + settings: http2ServerSettings); _connections.add(connection); ServerHandler_ handler; // TODO(jakobr): Set active state handlers, close connection after idle diff --git a/pubspec.yaml b/pubspec.yaml index d597835..eda2b10 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,7 +1,7 @@ name: grpc description: Dart implementation of gRPC, a high performance, open-source universal RPC framework. -version: 2.0.3 +version: 2.1.0 author: Dart Team homepage: https://github.com/dart-lang/grpc-dart diff --git a/test/client_handles_bad_connections_test.dart b/test/client_handles_bad_connections_test.dart new file mode 100644 index 0000000..a6d104b --- /dev/null +++ b/test/client_handles_bad_connections_test.dart @@ -0,0 +1,98 @@ +@TestOn('vm') +import 'dart:async'; +import 'package:grpc/grpc.dart' as grpc; +import 'package:grpc/service_api.dart'; +import 'package:grpc/src/client/channel.dart'; +import 'package:grpc/src/client/connection.dart'; +import 'package:grpc/src/client/http2_connection.dart'; +import 'package:http2/http2.dart'; +import 'package:test/test.dart'; + +class TestClient extends grpc.Client { + static final _$stream = grpc.ClientMethod( + '/test.TestService/stream', + (int value) => [value], + (List value) => value[0]); + + TestClient(ClientChannel channel) : super(channel); + grpc.ResponseStream stream(int request, {grpc.CallOptions options}) { + final call = + $createCall(_$stream, Stream.fromIterable([request]), options: options); + return grpc.ResponseStream(call); + } +} + +class TestService extends grpc.Service { + String get $name => 'test.TestService'; + + TestService() { + $addMethod(grpc.ServiceMethod('stream', stream, false, true, + (List value) => value[0], (int value) => [value])); + } + + Stream stream(grpc.ServiceCall call, Future request) async* { + yield 1; + yield 2; + yield 3; + } +} + +class FixedConnectionClientChannel extends ClientChannelBase { + final Http2ClientConnection clientConnection; + List states = []; + FixedConnectionClientChannel(this.clientConnection) { + clientConnection.onStateChanged = (c) => states.add(c.state); + } + @override + ClientConnection createConnection() => clientConnection; +} + +main() async { + test('client reconnects after the connection gets old', () async { + final grpc.Server server = grpc.Server([TestService()]); + await server.serve(port: 0); + + final channel = FixedConnectionClientChannel(Http2ClientConnection( + 'localhost', + server.port, + grpc.ChannelOptions( + idleTimeout: Duration(minutes: 1), + // Short delay to test that it will time out. + connectionTimeout: Duration(milliseconds: 100), + credentials: grpc.ChannelCredentials.insecure(), + ), + )); + final testClient = TestClient(channel); + + expect(await testClient.stream(1).toList(), [1, 2, 3]); + await Future.delayed(Duration(milliseconds: 200)); + expect(await testClient.stream(1).toList(), [1, 2, 3]); + expect( + channel.states.where((x) => x == grpc.ConnectionState.ready).length, 2); + server.shutdown(); + }); + + test('client reconnects when stream limit is used', () async { + final grpc.Server server = grpc.Server([TestService()]); + await server.serve( + port: 0, http2ServerSettings: ServerSettings(concurrentStreamLimit: 2)); + + final channel = FixedConnectionClientChannel(Http2ClientConnection( + 'localhost', + server.port, + grpc.ChannelOptions(credentials: grpc.ChannelCredentials.insecure()))); + final states = []; + channel.clientConnection.onStateChanged = + (Http2ClientConnection connection) => states.add(connection.state); + final testClient = TestClient(channel); + + await Future.wait([ + expectLater(testClient.stream(1).toList(), completion([1, 2, 3])), + expectLater(testClient.stream(1).toList(), completion([1, 2, 3])), + expectLater(testClient.stream(1).toList(), completion([1, 2, 3])), + expectLater(testClient.stream(1).toList(), completion([1, 2, 3])), + ]); + expect(states.where((x) => x == grpc.ConnectionState.ready).length, 2); + server.shutdown(); + }); +} diff --git a/test/data/localhost.crt b/test/data/localhost.crt new file mode 100644 index 0000000..7e7b55f --- /dev/null +++ b/test/data/localhost.crt @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC8DCCAdigAwIBAgIULb6gVjdq3rIlwftMU+sIJPwdnvowDQYJKoZIhvcNAQEL +BQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTE5MDgxOTEwNDEwN1oXDTE5MDkx +ODEwNDEwN1owFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEF +AAOCAQ8AMIIBCgKCAQEAsUyU0FsJ49FotYbJUxNMXmXN7G6RpMDRBSsiap3xPbeF +LOdfSe+haK65GkYjDJeI5drWnfs9HiN5UVtUAsNKf4om9gNjdPi23+a0IKJD2d3l +45uuKAwjo+LDmIP7FCP53L3JHjCvgVo5pm+VwshUQcR/nFWc/oVgUBEhTPYOjiFm +qkAQQj87cauxEhRZ1irgwuA+ysdExkDX27BWXXQQc8rnLFOhJ4mey/M7+RoVxJQI +75eluhS9xmv67pi2HQzOEmvn/+snQ8uzMl8CqKt3MYn7pj4icW/cwApwUaGFEKui +tZA8G5zbKxHc0dgYeyNajTOlO+dTz4iQbQlROok32wIDAQABozowODAUBgNVHREE +DTALgglsb2NhbGhvc3QwCwYDVR0PBAQDAgeAMBMGA1UdJQQMMAoGCCsGAQUFBwMB +MA0GCSqGSIb3DQEBCwUAA4IBAQBcMw6K6hfaQGeSYDxFJo/ZirEFPB8fNG5X6ywv +Iy8IG5StQePTmAtPe/qitAhi4PmRrQRLlQ2mEIuRg9HyE1O45389drDfPS3lAGqh +5FYqHhwOE+9ZVYlhACdxPqhtpWgZRri0IU6S1h0CG1EQZIDWMZiIWGgkOGn0XVGh +WqOayRIW2R2X5TVnemTF/46jlOpC3Hapxnx4eXw7BUBiig7gJx9PRGCQl7CtxMAI +eIbsvMfhW30zyOr25ao09QcR2A+ewtRmuo3PtIH3FcAHL67opULQc+G23r5aoC1k +aHZDOreQ+2rOQBfMSzl1XGbL7kMT7wwyJPDMLGvLgt3n8wMY +-----END CERTIFICATE----- diff --git a/test/data/localhost.key b/test/data/localhost.key new file mode 100644 index 0000000..2471b98 --- /dev/null +++ b/test/data/localhost.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCxTJTQWwnj0Wi1 +hslTE0xeZc3sbpGkwNEFKyJqnfE9t4Us519J76ForrkaRiMMl4jl2tad+z0eI3lR +W1QCw0p/iib2A2N0+Lbf5rQgokPZ3eXjm64oDCOj4sOYg/sUI/ncvckeMK+BWjmm +b5XCyFRBxH+cVZz+hWBQESFM9g6OIWaqQBBCPztxq7ESFFnWKuDC4D7Kx0TGQNfb +sFZddBBzyucsU6EniZ7L8zv5GhXElAjvl6W6FL3Ga/rumLYdDM4Sa+f/6ydDy7My +XwKoq3cxifumPiJxb9zACnBRoYUQq6K1kDwbnNsrEdzR2Bh7I1qNM6U751PPiJBt +CVE6iTfbAgMBAAECggEARQE8TheQstVW/oe2JZo2N+tBiUrDbq8I6w0NuRc9xDqA +H6jxglI8rQSL0HkJvSXhRyy0KQqWj/tYhVyZRvYBMcBwR4GsHOOMMXqWErl01P+z +MLHvx3BqEqf4XozHlOAnqE1JUHG8bQjTtT5quEPF307+J7d+geUhRihUoKKHqbMZ +7e1wZowCIbf233wI6vcx9MOnXEBgJIdlAX31aiTp0Wvk2ef4OqBNDfjOiUnbs7K2 +aC6LYj+HL3bf+gu0EE2m0LmXQxw23z965Hu6Gw/oLb+6FoglCo9RosTnYOwiJA3X +1EvqHKVg146NGGHUbRIZdw3m0xI9JH6NK8smjxbb4QKBgQDmTHSID6QH4ZSigIxG +frPt5VikB7gEXJhYfZK8pC61AoX9Bxr6kY89CxIrUeEMYk2Pg8jgaQrziLxPV3V+ +5dGOJQo38lieJXCDrwRSHxbtuENF3BRCMF1yZyRbZxPwD38KAE91SIeFreQ+59mc +aiYG3daAKAvoLbfRNgnC3PTQEQKBgQDFFfNLDkKXvVA6izMQDLjULJxjiEIz6IN/ +bdKzBkwm+oAoORCvJqiJp9mAo5sQF7r3aO1/Ke1SfvuaCZOVMnT6bliVs0UDoQcC +Jg8jpdAKb4ghIIZleJmdV3VHyvQA5Qg5vdtap+6CDS/6mfbPBMo8Zh00KaK2WJ9h +hWE6N6r1KwKBgQC5u9mTzkF1VbohINmBFTiZ2YkWqV8ArYj0fTn1x9gfhgx3194r +TW+fRKl/pIaDDVkOMLO2QSFi7dkpiBiroj/Siw7lth9AVGOc4G70qDw+togS9H6m +Lwl+da69xLEwv96uOzfaGAesiWT2UtiPLJDEou8W5rVLqGuCYDmZHciXcQKBgHiN +7LxEhMd8rc6hxyJSJdzjTOY1Owm1eHpCG1gWyg4tvKbeAS6iXwWU/p6JdRhq65rb +PCtE4j5MHmsi4Huq2ZM2XEl11wlZPog575jGnHNFtedNlegL1StBjCPWKVtCvb1U +PRE/F83Fc0u/UhFfxLUdYU+/CCCyJQvqIocR9ijxAoGAJwSGJBlWLC8MwOg+t5jn +gGO504ezpQUwr3/cWoP1Fj1mUihMLVi9A4+t2w/qqBHt8Lybx1lXDWY6Rth/8nqx +oV0LqrhkxLgMMjDWNRTDJdPeDKFm55GQlKgqi7jPEssIsSK+EXVxc9vVHsIAn6Hl ++VDVwHJUU2cABkO0BpSLU7w= +-----END PRIVATE KEY----- diff --git a/test/round_trip_test.dart b/test/round_trip_test.dart new file mode 100644 index 0000000..3a70a30 --- /dev/null +++ b/test/round_trip_test.dart @@ -0,0 +1,84 @@ +@TestOn('vm') +import 'dart:async'; +import 'dart:io'; +import 'package:grpc/grpc.dart'; +import 'package:grpc/service_api.dart' as api; +import 'package:grpc/src/client/channel.dart' hide ClientChannel; +import 'package:grpc/src/client/connection.dart'; +import 'package:grpc/src/client/http2_connection.dart'; +import 'package:http2/http2.dart'; +import 'package:test/test.dart'; + +class TestClient extends Client { + static final _$stream = ClientMethod('/test.TestService/stream', + (int value) => [value], (List value) => value[0]); + + TestClient(api.ClientChannel channel) : super(channel); + ResponseStream stream(int request, {CallOptions options}) { + final call = + $createCall(_$stream, Stream.fromIterable([request]), options: options); + return ResponseStream(call); + } +} + +class TestService extends Service { + String get $name => 'test.TestService'; + + TestService() { + $addMethod(ServiceMethod('stream', stream, false, true, + (List value) => value[0], (int value) => [value])); + } + + Stream stream(ServiceCall call, Future request) async* { + yield 1; + yield 2; + yield 3; + } +} + +class FixedConnectionClientChannel extends ClientChannelBase { + final Http2ClientConnection clientConnection; + List states = []; + FixedConnectionClientChannel(this.clientConnection) { + clientConnection.onStateChanged = (c) => states.add(c.state); + } + @override + ClientConnection createConnection() => clientConnection; +} + +main() async { + test('round trip insecure connection', () async { + final Server server = Server([TestService()]); + await server.serve(port: 0); + + final channel = FixedConnectionClientChannel(Http2ClientConnection( + 'localhost', + server.port, + ChannelOptions(credentials: ChannelCredentials.insecure()), + )); + final testClient = TestClient(channel); + expect(await testClient.stream(1).toList(), [1, 2, 3]); + server.shutdown(); + }); + + test('round trip secure connection', () async { + final Server server = Server([TestService()]); + await server.serve( + port: 0, + security: ServerTlsCredentials( + certificate: File('test/data/localhost.crt').readAsBytesSync(), + privateKey: File('test/data/localhost.key').readAsBytesSync())); + + final channel = FixedConnectionClientChannel(Http2ClientConnection( + 'localhost', + server.port, + ChannelOptions( + credentials: ChannelCredentials.secure( + certificates: File('test/data/localhost.crt').readAsBytesSync(), + authority: 'localhost')), + )); + final testClient = TestClient(channel); + expect(await testClient.stream(1).toList(), [1, 2, 3]); + server.shutdown(); + }); +} diff --git a/test/src/client_utils.dart b/test/src/client_utils.dart index d9cce32..1dd629b 100644 --- a/test/src/client_utils.dart +++ b/test/src/client_utils.dart @@ -50,6 +50,7 @@ Duration testBackoff(Duration lastBackoff) => const Duration(milliseconds: 1); class FakeChannelOptions implements ChannelOptions { ChannelCredentials credentials = const ChannelCredentials.secure(); Duration idleTimeout = const Duration(seconds: 1); + Duration connectionTimeout = const Duration(seconds: 10); String userAgent = 'dart-grpc/1.0.0 test'; BackoffStrategy backoffStrategy = testBackoff; } @@ -128,6 +129,7 @@ class ClientHarness { when(transport.makeRequest(any, endStream: anyNamed('endStream'))) .thenReturn(stream); when(transport.onActiveStateChanged = captureAny).thenReturn(null); + when(transport.isOpen).thenReturn(true); when(stream.outgoingMessages).thenReturn(fromClient.sink); when(stream.incomingMessages).thenAnswer((_) => toClient.stream); client = TestClient(channel);