Improve connection handling (#231)

* Improve connection handling

* Address review. Add round-trip-test
This commit is contained in:
Sigurd Meldgaard 2019-08-19 15:31:16 +02:00 committed by GitHub
parent 5ac5d6a1e4
commit 992e2dcc29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 322 additions and 29 deletions

View File

@ -1,3 +1,12 @@
## 2.1.0
* Do a health check of the http2-connection before making request.
* Introduce `ChannelOptions.connectionLimit` the longest time a single connection is used for new
requests.
* Use Tcp.nodelay to improve client call speed.
* Use SecureSocket supportedProtocols to save a round trip when establishing a secure connection.
* Allow passing http2 `ServerSettings` to `Server.serve`.
## 2.0.3
* GrpcError now implements Exception to indicate it can be reasonably handled.

View File

@ -53,6 +53,10 @@ class Http2ClientConnection implements connection.ClientConnection {
/// Used for idle and reconnect timeout, depending on [_state].
Timer _timer;
/// Used for making sure a single connection is not kept alive too long.
final Stopwatch _connectionLifeTimer = Stopwatch();
Duration _currentReconnectDelay;
final String host;
@ -69,25 +73,38 @@ class Http2ClientConnection implements connection.ClientConnection {
ConnectionState get state => _state;
static const _estimatedRoundTripTime = const Duration(milliseconds: 20);
Future<ClientTransportConnection> connectTransport() async {
var socket = await Socket.connect(host, port);
final securityContext = credentials.securityContext;
Socket socket;
if (securityContext == null) {
socket = await Socket.connect(host, port);
} else {
socket = await SecureSocket.connect(host, port,
supportedProtocols: ['h2'],
context: securityContext,
onBadCertificate: _validateBadCertificate);
if ((socket as SecureSocket).selectedProtocol != 'h2') {
socket.destroy();
throw (TransportException(
'Endpoint $host:$port does not support http/2 via ALPN'));
}
}
final connection = ClientTransportConnection.viaSocket(socket);
socket.done.then((_) => _abandonConnection());
// Give the settings settings-frame a bit of time to arrive.
// TODO(sigurdm): This is a hack. The http2 package should expose a way of
// waiting for the settings frame to arrive.
await new Future.delayed(_estimatedRoundTripTime);
if (_state == ConnectionState.shutdown) {
socket.destroy();
throw _ShutdownException();
}
final securityContext = credentials.securityContext;
if (securityContext != null) {
socket = await SecureSocket.secure(socket,
host: authority,
context: securityContext,
onBadCertificate: _validateBadCertificate);
if (_state == ConnectionState.shutdown) {
socket.destroy();
throw _ShutdownException();
}
}
socket.done.then((_) => _handleSocketClosed());
return ClientTransportConnection.viaSocket(socket);
return connection;
}
void _connect() {
@ -96,17 +113,44 @@ class Http2ClientConnection implements connection.ClientConnection {
return;
}
_setState(ConnectionState.connecting);
connectTransport().then((transport) {
connectTransport().then((transport) async {
_currentReconnectDelay = null;
_transportConnection = transport;
_connectionLifeTimer
..reset()
..start();
transport.onActiveStateChanged = _handleActiveStateChanged;
_setState(ConnectionState.ready);
_pendingCalls.forEach(_startCall);
_pendingCalls.clear();
if (_hasPendingCalls()) {
// Take all pending calls out, and reschedule.
final pendingCalls = _pendingCalls.toList();
_pendingCalls.clear();
pendingCalls.forEach(dispatchCall);
}
}).catchError(_handleConnectionFailure);
}
/// Abandons the current connection if it is unhealthy or has been open for
/// too long.
///
/// Assumes [_transportConnection] is not `null`.
void _refreshConnectionIfUnhealthy() {
final bool isHealthy = _transportConnection.isOpen;
final bool shouldRefresh =
_connectionLifeTimer.elapsed > options.connectionTimeout;
if (shouldRefresh) {
_transportConnection.finish();
}
if (!isHealthy || shouldRefresh) {
_abandonConnection();
}
}
void dispatchCall(ClientCall call) {
if (_transportConnection != null) {
_refreshConnectionIfUnhealthy();
}
switch (_state) {
case ConnectionState.ready:
_startCall(call);
@ -219,7 +263,7 @@ class Http2ClientConnection implements connection.ClientConnection {
_connect();
}
void _handleSocketClosed() {
void _abandonConnection() {
_cancelTimer();
_transportConnection = null;

View File

@ -17,6 +17,11 @@ import 'dart:math';
import 'transport/http2_credentials.dart';
const defaultIdleTimeout = Duration(minutes: 5);
/// It seems like Google's gRPC endpoints will forcefully close the
/// connection after precisely 1 hour. So we *proactively* refresh our
/// connection after 50 minutes. This will avoid one failed RPC call.
const defaultConnectionTimeOut = Duration(minutes: 50);
const defaultUserAgent = 'dart-grpc/2.0.0';
typedef Duration BackoffStrategy(Duration lastBackoff);
@ -39,16 +44,17 @@ Duration defaultBackoffStrategy(Duration lastBackoff) {
class ChannelOptions {
final ChannelCredentials credentials;
final Duration idleTimeout;
/// The maximum time a single connection will be used for new requests.
final Duration connectionTimeout;
final BackoffStrategy backoffStrategy;
final String userAgent;
const ChannelOptions({
ChannelCredentials credentials,
Duration idleTimeout,
String userAgent,
BackoffStrategy backoffStrategy,
}) : this.credentials = credentials ?? const ChannelCredentials.secure(),
this.idleTimeout = idleTimeout ?? defaultIdleTimeout,
this.userAgent = userAgent ?? defaultUserAgent,
this.backoffStrategy = backoffStrategy ?? defaultBackoffStrategy;
this.credentials = const ChannelCredentials.secure(),
this.idleTimeout = defaultIdleTimeout,
this.userAgent = defaultUserAgent,
this.backoffStrategy = defaultBackoffStrategy,
this.connectionTimeout = defaultConnectionTimeOut,
});
}

View File

@ -85,7 +85,10 @@ class Server {
Service lookupService(String service) => _services[service];
Future<void> serve(
{dynamic address, int port, ServerTlsCredentials security}) async {
{dynamic address,
int port,
ServerTlsCredentials security,
ServerSettings http2ServerSettings}) async {
// TODO(dart-lang/grpc-dart#9): Handle HTTP/1.1 upgrade to h2c, if allowed.
Stream<Socket> server;
if (security != null) {
@ -100,7 +103,8 @@ class Server {
server = _insecureServer;
}
server.listen((socket) {
final connection = ServerTransportConnection.viaSocket(socket);
final connection = ServerTransportConnection.viaSocket(socket,
settings: http2ServerSettings);
_connections.add(connection);
ServerHandler_ handler;
// TODO(jakobr): Set active state handlers, close connection after idle

View File

@ -1,7 +1,7 @@
name: grpc
description: Dart implementation of gRPC, a high performance, open-source universal RPC framework.
version: 2.0.3
version: 2.1.0
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/grpc-dart

View File

@ -0,0 +1,98 @@
@TestOn('vm')
import 'dart:async';
import 'package:grpc/grpc.dart' as grpc;
import 'package:grpc/service_api.dart';
import 'package:grpc/src/client/channel.dart';
import 'package:grpc/src/client/connection.dart';
import 'package:grpc/src/client/http2_connection.dart';
import 'package:http2/http2.dart';
import 'package:test/test.dart';
class TestClient extends grpc.Client {
static final _$stream = grpc.ClientMethod<int, int>(
'/test.TestService/stream',
(int value) => [value],
(List<int> value) => value[0]);
TestClient(ClientChannel channel) : super(channel);
grpc.ResponseStream<int> stream(int request, {grpc.CallOptions options}) {
final call =
$createCall(_$stream, Stream.fromIterable([request]), options: options);
return grpc.ResponseStream(call);
}
}
class TestService extends grpc.Service {
String get $name => 'test.TestService';
TestService() {
$addMethod(grpc.ServiceMethod<int, int>('stream', stream, false, true,
(List<int> value) => value[0], (int value) => [value]));
}
Stream<int> stream(grpc.ServiceCall call, Future request) async* {
yield 1;
yield 2;
yield 3;
}
}
class FixedConnectionClientChannel extends ClientChannelBase {
final Http2ClientConnection clientConnection;
List<grpc.ConnectionState> states = <grpc.ConnectionState>[];
FixedConnectionClientChannel(this.clientConnection) {
clientConnection.onStateChanged = (c) => states.add(c.state);
}
@override
ClientConnection createConnection() => clientConnection;
}
main() async {
test('client reconnects after the connection gets old', () async {
final grpc.Server server = grpc.Server([TestService()]);
await server.serve(port: 0);
final channel = FixedConnectionClientChannel(Http2ClientConnection(
'localhost',
server.port,
grpc.ChannelOptions(
idleTimeout: Duration(minutes: 1),
// Short delay to test that it will time out.
connectionTimeout: Duration(milliseconds: 100),
credentials: grpc.ChannelCredentials.insecure(),
),
));
final testClient = TestClient(channel);
expect(await testClient.stream(1).toList(), [1, 2, 3]);
await Future.delayed(Duration(milliseconds: 200));
expect(await testClient.stream(1).toList(), [1, 2, 3]);
expect(
channel.states.where((x) => x == grpc.ConnectionState.ready).length, 2);
server.shutdown();
});
test('client reconnects when stream limit is used', () async {
final grpc.Server server = grpc.Server([TestService()]);
await server.serve(
port: 0, http2ServerSettings: ServerSettings(concurrentStreamLimit: 2));
final channel = FixedConnectionClientChannel(Http2ClientConnection(
'localhost',
server.port,
grpc.ChannelOptions(credentials: grpc.ChannelCredentials.insecure())));
final states = <grpc.ConnectionState>[];
channel.clientConnection.onStateChanged =
(Http2ClientConnection connection) => states.add(connection.state);
final testClient = TestClient(channel);
await Future.wait(<Future>[
expectLater(testClient.stream(1).toList(), completion([1, 2, 3])),
expectLater(testClient.stream(1).toList(), completion([1, 2, 3])),
expectLater(testClient.stream(1).toList(), completion([1, 2, 3])),
expectLater(testClient.stream(1).toList(), completion([1, 2, 3])),
]);
expect(states.where((x) => x == grpc.ConnectionState.ready).length, 2);
server.shutdown();
});
}

18
test/data/localhost.crt Normal file
View File

@ -0,0 +1,18 @@
-----BEGIN CERTIFICATE-----
MIIC8DCCAdigAwIBAgIULb6gVjdq3rIlwftMU+sIJPwdnvowDQYJKoZIhvcNAQEL
BQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTE5MDgxOTEwNDEwN1oXDTE5MDkx
ODEwNDEwN1owFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEF
AAOCAQ8AMIIBCgKCAQEAsUyU0FsJ49FotYbJUxNMXmXN7G6RpMDRBSsiap3xPbeF
LOdfSe+haK65GkYjDJeI5drWnfs9HiN5UVtUAsNKf4om9gNjdPi23+a0IKJD2d3l
45uuKAwjo+LDmIP7FCP53L3JHjCvgVo5pm+VwshUQcR/nFWc/oVgUBEhTPYOjiFm
qkAQQj87cauxEhRZ1irgwuA+ysdExkDX27BWXXQQc8rnLFOhJ4mey/M7+RoVxJQI
75eluhS9xmv67pi2HQzOEmvn/+snQ8uzMl8CqKt3MYn7pj4icW/cwApwUaGFEKui
tZA8G5zbKxHc0dgYeyNajTOlO+dTz4iQbQlROok32wIDAQABozowODAUBgNVHREE
DTALgglsb2NhbGhvc3QwCwYDVR0PBAQDAgeAMBMGA1UdJQQMMAoGCCsGAQUFBwMB
MA0GCSqGSIb3DQEBCwUAA4IBAQBcMw6K6hfaQGeSYDxFJo/ZirEFPB8fNG5X6ywv
Iy8IG5StQePTmAtPe/qitAhi4PmRrQRLlQ2mEIuRg9HyE1O45389drDfPS3lAGqh
5FYqHhwOE+9ZVYlhACdxPqhtpWgZRri0IU6S1h0CG1EQZIDWMZiIWGgkOGn0XVGh
WqOayRIW2R2X5TVnemTF/46jlOpC3Hapxnx4eXw7BUBiig7gJx9PRGCQl7CtxMAI
eIbsvMfhW30zyOr25ao09QcR2A+ewtRmuo3PtIH3FcAHL67opULQc+G23r5aoC1k
aHZDOreQ+2rOQBfMSzl1XGbL7kMT7wwyJPDMLGvLgt3n8wMY
-----END CERTIFICATE-----

28
test/data/localhost.key Normal file
View File

@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCxTJTQWwnj0Wi1
hslTE0xeZc3sbpGkwNEFKyJqnfE9t4Us519J76ForrkaRiMMl4jl2tad+z0eI3lR
W1QCw0p/iib2A2N0+Lbf5rQgokPZ3eXjm64oDCOj4sOYg/sUI/ncvckeMK+BWjmm
b5XCyFRBxH+cVZz+hWBQESFM9g6OIWaqQBBCPztxq7ESFFnWKuDC4D7Kx0TGQNfb
sFZddBBzyucsU6EniZ7L8zv5GhXElAjvl6W6FL3Ga/rumLYdDM4Sa+f/6ydDy7My
XwKoq3cxifumPiJxb9zACnBRoYUQq6K1kDwbnNsrEdzR2Bh7I1qNM6U751PPiJBt
CVE6iTfbAgMBAAECggEARQE8TheQstVW/oe2JZo2N+tBiUrDbq8I6w0NuRc9xDqA
H6jxglI8rQSL0HkJvSXhRyy0KQqWj/tYhVyZRvYBMcBwR4GsHOOMMXqWErl01P+z
MLHvx3BqEqf4XozHlOAnqE1JUHG8bQjTtT5quEPF307+J7d+geUhRihUoKKHqbMZ
7e1wZowCIbf233wI6vcx9MOnXEBgJIdlAX31aiTp0Wvk2ef4OqBNDfjOiUnbs7K2
aC6LYj+HL3bf+gu0EE2m0LmXQxw23z965Hu6Gw/oLb+6FoglCo9RosTnYOwiJA3X
1EvqHKVg146NGGHUbRIZdw3m0xI9JH6NK8smjxbb4QKBgQDmTHSID6QH4ZSigIxG
frPt5VikB7gEXJhYfZK8pC61AoX9Bxr6kY89CxIrUeEMYk2Pg8jgaQrziLxPV3V+
5dGOJQo38lieJXCDrwRSHxbtuENF3BRCMF1yZyRbZxPwD38KAE91SIeFreQ+59mc
aiYG3daAKAvoLbfRNgnC3PTQEQKBgQDFFfNLDkKXvVA6izMQDLjULJxjiEIz6IN/
bdKzBkwm+oAoORCvJqiJp9mAo5sQF7r3aO1/Ke1SfvuaCZOVMnT6bliVs0UDoQcC
Jg8jpdAKb4ghIIZleJmdV3VHyvQA5Qg5vdtap+6CDS/6mfbPBMo8Zh00KaK2WJ9h
hWE6N6r1KwKBgQC5u9mTzkF1VbohINmBFTiZ2YkWqV8ArYj0fTn1x9gfhgx3194r
TW+fRKl/pIaDDVkOMLO2QSFi7dkpiBiroj/Siw7lth9AVGOc4G70qDw+togS9H6m
Lwl+da69xLEwv96uOzfaGAesiWT2UtiPLJDEou8W5rVLqGuCYDmZHciXcQKBgHiN
7LxEhMd8rc6hxyJSJdzjTOY1Owm1eHpCG1gWyg4tvKbeAS6iXwWU/p6JdRhq65rb
PCtE4j5MHmsi4Huq2ZM2XEl11wlZPog575jGnHNFtedNlegL1StBjCPWKVtCvb1U
PRE/F83Fc0u/UhFfxLUdYU+/CCCyJQvqIocR9ijxAoGAJwSGJBlWLC8MwOg+t5jn
gGO504ezpQUwr3/cWoP1Fj1mUihMLVi9A4+t2w/qqBHt8Lybx1lXDWY6Rth/8nqx
oV0LqrhkxLgMMjDWNRTDJdPeDKFm55GQlKgqi7jPEssIsSK+EXVxc9vVHsIAn6Hl
+VDVwHJUU2cABkO0BpSLU7w=
-----END PRIVATE KEY-----

84
test/round_trip_test.dart Normal file
View File

@ -0,0 +1,84 @@
@TestOn('vm')
import 'dart:async';
import 'dart:io';
import 'package:grpc/grpc.dart';
import 'package:grpc/service_api.dart' as api;
import 'package:grpc/src/client/channel.dart' hide ClientChannel;
import 'package:grpc/src/client/connection.dart';
import 'package:grpc/src/client/http2_connection.dart';
import 'package:http2/http2.dart';
import 'package:test/test.dart';
class TestClient extends Client {
static final _$stream = ClientMethod<int, int>('/test.TestService/stream',
(int value) => [value], (List<int> value) => value[0]);
TestClient(api.ClientChannel channel) : super(channel);
ResponseStream<int> stream(int request, {CallOptions options}) {
final call =
$createCall(_$stream, Stream.fromIterable([request]), options: options);
return ResponseStream(call);
}
}
class TestService extends Service {
String get $name => 'test.TestService';
TestService() {
$addMethod(ServiceMethod<int, int>('stream', stream, false, true,
(List<int> value) => value[0], (int value) => [value]));
}
Stream<int> stream(ServiceCall call, Future request) async* {
yield 1;
yield 2;
yield 3;
}
}
class FixedConnectionClientChannel extends ClientChannelBase {
final Http2ClientConnection clientConnection;
List<ConnectionState> states = <ConnectionState>[];
FixedConnectionClientChannel(this.clientConnection) {
clientConnection.onStateChanged = (c) => states.add(c.state);
}
@override
ClientConnection createConnection() => clientConnection;
}
main() async {
test('round trip insecure connection', () async {
final Server server = Server([TestService()]);
await server.serve(port: 0);
final channel = FixedConnectionClientChannel(Http2ClientConnection(
'localhost',
server.port,
ChannelOptions(credentials: ChannelCredentials.insecure()),
));
final testClient = TestClient(channel);
expect(await testClient.stream(1).toList(), [1, 2, 3]);
server.shutdown();
});
test('round trip secure connection', () async {
final Server server = Server([TestService()]);
await server.serve(
port: 0,
security: ServerTlsCredentials(
certificate: File('test/data/localhost.crt').readAsBytesSync(),
privateKey: File('test/data/localhost.key').readAsBytesSync()));
final channel = FixedConnectionClientChannel(Http2ClientConnection(
'localhost',
server.port,
ChannelOptions(
credentials: ChannelCredentials.secure(
certificates: File('test/data/localhost.crt').readAsBytesSync(),
authority: 'localhost')),
));
final testClient = TestClient(channel);
expect(await testClient.stream(1).toList(), [1, 2, 3]);
server.shutdown();
});
}

View File

@ -50,6 +50,7 @@ Duration testBackoff(Duration lastBackoff) => const Duration(milliseconds: 1);
class FakeChannelOptions implements ChannelOptions {
ChannelCredentials credentials = const ChannelCredentials.secure();
Duration idleTimeout = const Duration(seconds: 1);
Duration connectionTimeout = const Duration(seconds: 10);
String userAgent = 'dart-grpc/1.0.0 test';
BackoffStrategy backoffStrategy = testBackoff;
}
@ -128,6 +129,7 @@ class ClientHarness {
when(transport.makeRequest(any, endStream: anyNamed('endStream')))
.thenReturn(stream);
when(transport.onActiveStateChanged = captureAny).thenReturn(null);
when(transport.isOpen).thenReturn(true);
when(stream.outgoingMessages).thenReturn(fromClient.sink);
when(stream.incomingMessages).thenAnswer((_) => toClient.stream);
client = TestClient(channel);