Multiplex RPCs on a single connection. (#34)

A Channel will now multiplex RPCs on a single managed connection. The
connection will attempt to reconnect on failure, and will close the
underlying transport connection if no RPCs have been made for a while.

Part of #5.
This commit is contained in:
Jakob Andersen 2017-10-12 15:37:29 +02:00 committed by GitHub
parent a9b919a5e9
commit 7b2ff3e571
8 changed files with 393 additions and 97 deletions

View File

@ -18,12 +18,17 @@ class Client {
Future<Null> main(List<String> args) async {
channel = new ClientChannel('127.0.0.1',
port: 8080, options: const ChannelOptions.insecure());
stub = new RouteGuideClient(channel);
stub = new RouteGuideClient(channel,
options: new CallOptions(timeout: new Duration(seconds: 30)));
// Run all of the demos in order.
await runGetFeature();
await runListFeatures();
await runRecordRoute();
await runRouteChat();
try {
await runGetFeature();
await runListFeatures();
await runRecordRoute();
await runRouteChat();
} catch (e) {
print('Caught error: $e');
}
await channel.shutdown();
}

View File

@ -7,7 +7,9 @@
import 'dart:async';
import 'dart:io';
import 'dart:math';
import 'package:http2/transport.dart';
import 'package:meta/meta.dart';
import 'shared.dart';
import 'status.dart';
@ -21,38 +23,95 @@ const _reservedHeaders = const [
'user-agent',
];
const defaultIdleTimeout = const Duration(minutes: 5);
typedef Duration BackoffStrategy(Duration lastBackoff);
// Backoff algorithm from https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md
const _minConnectTimeout = const Duration(seconds: 20);
const _initialBackoff = const Duration(seconds: 1);
const _maxBackoff = const Duration(seconds: 120);
const _multiplier = 1.6;
const _jitter = 0.2;
final _random = new Random();
Duration defaultBackoffStrategy(Duration lastBackoff) {
if (lastBackoff == null) return _initialBackoff;
final jitter = _random.nextDouble() * 2 * _jitter - _jitter;
final nextBackoff = lastBackoff * (_multiplier + jitter);
return nextBackoff < _maxBackoff ? nextBackoff : _maxBackoff;
}
/// Options controlling how connections are made on a [ClientChannel].
class ChannelOptions {
final bool _useTls;
final bool isSecure;
final List<int> _certificateBytes;
final String _certificatePassword;
final String authority;
final Duration idleTimeout;
final BackoffStrategy backoffStrategy;
const ChannelOptions._(this._useTls,
[this._certificateBytes, this._certificatePassword, this.authority]);
/// Enable TLS using the default trust store.
const ChannelOptions() : this._(true);
const ChannelOptions._(
this.isSecure,
this._certificateBytes,
this._certificatePassword,
this.authority,
Duration idleTimeout,
BackoffStrategy backoffStrategy)
: this.idleTimeout = idleTimeout ?? defaultIdleTimeout,
this.backoffStrategy = backoffStrategy ?? defaultBackoffStrategy;
/// Disable TLS. RPCs are sent in clear text.
const ChannelOptions.insecure() : this._(false);
const ChannelOptions.insecure(
{Duration idleTimeout,
BackoffStrategy backoffStrategy =
defaultBackoffStrategy}) // Remove when dart-lang/sdk#31066 is fixed.
: this._(false, null, null, null, idleTimeout, backoffStrategy);
/// Enable TLS and specify the [certificate]s to trust.
ChannelOptions.secure(
{List<int> certificate, String password, String authority})
: this._(true, certificate, password, authority);
/// Enable TLS and optionally specify the [certificate]s to trust. If
/// [certificates] is not provided, the default trust store is used.
const ChannelOptions.secure(
{List<int> certificate,
String password,
String authority,
Duration idleTimeout,
BackoffStrategy backoffStrategy =
defaultBackoffStrategy}) // Remove when dart-lang/sdk#31066 is fixed.
: this._(true, certificate, password, authority, idleTimeout,
backoffStrategy);
SecurityContext get securityContext {
if (!_useTls) return null;
final context = createSecurityContext(false);
if (!isSecure) return null;
if (_certificateBytes != null) {
context.setTrustedCertificatesBytes(_certificateBytes,
password: _certificatePassword);
return createSecurityContext(false)
..setTrustedCertificatesBytes(_certificateBytes,
password: _certificatePassword);
}
final context = SecurityContext.defaultContext;
if (SecurityContext.alpnSupported) {
context.setAlpnProtocols(supportedAlpnProtocols, false);
}
return context;
}
}
enum ConnectionState {
/// Actively trying to connect.
connecting,
/// Connection successfully established.
ready,
/// Some transient failure occurred, waiting to re-connect.
transientFailure,
/// Not currently connected, and no pending RPCs.
idle,
/// Shutting down, no further RPCs allowed.
shutdown
}
/// A connection to a single RPC endpoint.
///
/// RPCs made on a connection are always sent to the same endpoint.
@ -67,9 +126,23 @@ class ClientConnection {
new Header.ascii('grpc-accept-encoding', 'identity');
static final _userAgent = new Header.ascii('user-agent', 'dart-grpc/0.2.0');
final ClientTransportConnection _transport;
final String host;
final int port;
final ChannelOptions options;
ClientConnection(this._transport);
ConnectionState _state = ConnectionState.idle;
void Function(ClientConnection connection) onStateChanged;
final _pendingCalls = <ClientCall>[];
ClientTransportConnection _transport;
/// Used for idle and reconnect timeout, depending on [_state].
Timer _timer;
Duration _currentReconnectDelay;
ClientConnection(this.host, this.port, this.options);
ConnectionState get state => _state;
static List<Header> createCallHeaders(
bool useTls, String authority, String path, CallOptions options) {
@ -95,13 +168,83 @@ class ClientConnection {
return headers;
}
String get authority => options.authority ?? host;
@visibleForTesting
Future<ClientTransportConnection> connectTransport() async {
final securityContext = options.securityContext;
var socket = await Socket.connect(host, port);
if (_state == ConnectionState.shutdown) {
socket.destroy();
throw 'Shutting down';
}
if (securityContext != null) {
socket = await SecureSocket.secure(socket,
host: authority, context: securityContext);
if (_state == ConnectionState.shutdown) {
socket.destroy();
throw 'Shutting down';
}
}
socket.done.then(_handleSocketClosed);
return new ClientTransportConnection.viaSocket(socket);
}
void _connect() {
if (_state != ConnectionState.idle &&
_state != ConnectionState.transientFailure) {
return;
}
_setState(ConnectionState.connecting);
connectTransport().then((transport) {
_currentReconnectDelay = null;
_transport = transport;
_transport.onActiveStateChanged = _handleActiveStateChanged;
_setState(ConnectionState.ready);
_pendingCalls.forEach(_startCall);
_pendingCalls.clear();
}).catchError(_handleTransientFailure);
}
void dispatchCall(ClientCall call) {
switch (_state) {
case ConnectionState.ready:
_startCall(call);
break;
case ConnectionState.shutdown:
_shutdownCall(call);
break;
default:
_pendingCalls.add(call);
if (_state == ConnectionState.idle) {
_connect();
}
}
}
void _startCall(ClientCall call) {
if (call._isCancelled) return;
final headers =
createCallHeaders(options.isSecure, authority, call.path, call.options);
final stream = _transport.makeRequest(headers);
call._onConnectedStream(stream);
}
void _shutdownCall(ClientCall call) {
if (call._isCancelled) return;
call._onConnectError(
new GrpcError.unavailable('Connection shutting down.'));
}
/// Shuts down this connection.
///
/// No further calls may be made on this connection, but existing calls
/// are allowed to finish.
Future<Null> shutdown() {
// TODO(jakobr): Manage streams, close [_transport] when all are done.
return _transport.finish();
if (_state == ConnectionState.shutdown) return new Future.value();
_setShutdownState();
return _transport?.finish() ?? new Future.value();
}
/// Terminates this connection.
@ -109,20 +252,81 @@ class ClientConnection {
/// All open calls are terminated immediately, and no further calls may be
/// made on this connection.
Future<Null> terminate() {
// TODO(jakobr): Manage streams, close them immediately.
return _transport.terminate();
_setShutdownState();
return _transport?.terminate() ?? new Future.value();
}
/// Starts a new RPC on this connection.
///
/// Creates a new transport stream on this connection, and sends initial call
/// metadata.
ClientTransportStream sendRequest(
bool useTls, String authority, String path, CallOptions options) {
final headers = createCallHeaders(useTls, authority, path, options);
final stream = _transport.makeRequest(headers);
// TODO(jakobr): Manage streams. Subscribe to stream state changes.
return stream;
void _setShutdownState() {
_setState(ConnectionState.shutdown);
_cancelTimer();
_pendingCalls.forEach(_shutdownCall);
_pendingCalls.clear();
}
void _setState(ConnectionState state) {
_state = state;
if (onStateChanged != null) {
onStateChanged(this);
}
}
void _handleIdleTimeout() {
if (_timer == null || _state != ConnectionState.ready) return;
_cancelTimer();
_transport?.finish()?.catchError((_) => {}); // TODO(jakobr): Log error.
_transport = null;
_setState(ConnectionState.idle);
}
void _cancelTimer() {
_timer?.cancel();
_timer = null;
}
void _handleActiveStateChanged(bool isActive) {
if (isActive) {
_cancelTimer();
} else {
if (options.idleTimeout != null) {
_timer ??= new Timer(options.idleTimeout, _handleIdleTimeout);
}
}
}
bool _hasPendingCalls() {
// Get rid of pending calls that have timed out.
_pendingCalls.removeWhere((call) => call._isCancelled);
return _pendingCalls.isNotEmpty;
}
void _handleTransientFailure(error) {
_transport = null;
if (_state == ConnectionState.shutdown || _state == ConnectionState.idle) {
return;
}
// TODO(jakobr): Log error.
_cancelTimer();
if (!_hasPendingCalls()) {
_setState(ConnectionState.idle);
return;
}
_setState(ConnectionState.transientFailure);
_currentReconnectDelay = options.backoffStrategy(_currentReconnectDelay);
_timer = new Timer(_currentReconnectDelay, _handleReconnect);
}
void _handleReconnect() {
if (_timer == null || _state != ConnectionState.transientFailure) return;
_cancelTimer();
_connect();
}
void _handleSocketClosed(Socket _) {
_cancelTimer();
if (_state != ConnectionState.idle && _state != ConnectionState.shutdown) {
// We were not planning to close the socket.
_handleTransientFailure('Socket closed');
}
}
}
@ -136,20 +340,13 @@ class ClientChannel {
final int port;
final ChannelOptions options;
final _connections = <ClientConnection>[];
// TODO(jakobr): Multiple connections, load balancing.
ClientConnection _connection;
bool _isShutdown = false;
ClientChannel(this.host,
{this.port = 443, this.options = const ChannelOptions()});
String get authority => options.authority ?? host;
void _shutdownCheck([Function() cleanup]) {
if (!_isShutdown) return;
if (cleanup != null) cleanup();
throw new GrpcError.unavailable('Channel shutting down.');
}
{this.port = 443, this.options = const ChannelOptions.secure()});
/// Shuts down this channel.
///
@ -158,7 +355,7 @@ class ClientChannel {
Future<Null> shutdown() {
if (_isShutdown) return new Future.value();
_isShutdown = true;
return Future.wait(_connections.map((c) => c.shutdown()));
return _connection.shutdown();
}
/// Terminates this channel.
@ -167,42 +364,25 @@ class ClientChannel {
/// on this channel.
Future<Null> terminate() {
_isShutdown = true;
return Future.wait(_connections.map((c) => c.terminate()));
return _connection.terminate();
}
/// Returns a connection to this [Channel]'s RPC endpoint.
///
/// The connection may be shared between multiple RPCs.
Future<ClientConnection> connect() async {
_shutdownCheck();
final securityContext = options.securityContext;
var socket = await Socket.connect(host, port);
_shutdownCheck(socket.destroy);
if (securityContext != null) {
socket = await SecureSocket.secure(socket,
host: authority, context: securityContext);
_shutdownCheck(socket.destroy);
}
final connection =
new ClientConnection(new ClientTransportConnection.viaSocket(socket));
_connections.add(connection);
return connection;
Future<ClientConnection> getConnection() async {
if (_isShutdown) throw new GrpcError.unavailable('Channel shutting down.');
return _connection ??= new ClientConnection(host, port, options);
}
/// Initiates a new RPC on this connection.
ClientCall<Q, R> createCall<Q, R>(
ClientMethod<Q, R> method, Stream<Q> requests, CallOptions options) {
final call = new ClientCall(method, requests, options.timeout);
connect().then((connection) {
// TODO(jakobr): Check if deadline is exceeded.
final call = new ClientCall(method, requests, options);
getConnection().then((connection) {
if (call._isCancelled) return;
final stream = connection.sendRequest(
this.options._useTls, authority, method.path, options);
call._onConnectedStream(stream);
}, onError: (error) {
call._onConnectError(error);
});
connection.dispatchCall(call);
}, onError: call._onConnectError);
return call;
}
}
@ -263,6 +443,7 @@ class Client {
class ClientCall<Q, R> implements Response {
final ClientMethod<Q, R> _method;
final Stream<Q> _requests;
final CallOptions options;
final _headers = new Completer<Map<String, String>>();
final _trailers = new Completer<Map<String, String>>();
@ -278,13 +459,15 @@ class ClientCall<Q, R> implements Response {
bool _isCancelled = false;
Timer _timeoutTimer;
ClientCall(this._method, this._requests, Duration timeout) {
ClientCall(this._method, this._requests, this.options) {
_responses = new StreamController(onListen: _onResponseListen);
if (timeout != null) {
_timeoutTimer = new Timer(timeout, _onTimedOut);
if (options.timeout != null) {
_timeoutTimer = new Timer(options.timeout, _onTimedOut);
}
}
String get path => _method.path;
void _onConnectError(error) {
if (!_responses.isClosed) {
_responses

View File

@ -112,6 +112,8 @@ class Server {
server.listen((socket) {
final connection = new ServerTransportConnection.viaSocket(socket);
_connections.add(connection);
// TODO(jakobr): Set active state handlers, close connection after idle
// timeout.
connection.incomingStreams.listen(serveStream, onError: (error) {
print('Connection error: $error');
}, onDone: () {

View File

@ -124,10 +124,12 @@ abstract class _ResponseMixin<Q, R> implements Response {
Future<Null> cancel() => _call.cancel();
}
const supportedAlpnProtocols = const ['grpc-exp', 'h2'];
// TODO: Simplify once we have a stable Dart 1.25 release (update pubspec to
// require SDK >=1.25.0, and remove check for alpnSupported).
SecurityContext createSecurityContext(bool isServer) =>
SecurityContext.alpnSupported
? (new SecurityContext()
..setAlpnProtocols(['grpc-exp', 'h2'], isServer))
..setAlpnProtocols(supportedAlpnProtocols, isServer))
: new SecurityContext();

View File

@ -148,6 +148,7 @@ class _GrpcMessageConversionSink extends ChunkedConversionSink<StreamMessage> {
// TODO(jakobr): Handle duplicate header names correctly.
headers[ASCII.decode(header.name)] = ASCII.decode(header.value);
}
// TODO(jakobr): Check :status, go to error mode if not 2xx.
_out.add(new GrpcMetadata(headers));
}

View File

@ -4,10 +4,9 @@
import 'dart:async';
import 'package:grpc/src/status.dart';
import 'package:grpc/grpc.dart';
import 'package:http2/transport.dart';
import 'package:test/test.dart';
import 'package:mockito/mockito.dart';
import 'src/client_utils.dart';
import 'src/utils.dart';
@ -275,15 +274,6 @@ void main() {
);
});
test('Connection errors are reported', () async {
harness.channel.connectionError = 'Connection error';
final expectedError =
new GrpcError.unavailable('Error connecting: Connection error');
harness.expectThrows(harness.client.unary(dummyValue), expectedError);
harness.expectThrows(
harness.client.serverStreaming(dummyValue).toList(), expectedError);
});
test('Known request errors are reported', () async {
final expectedException = new GrpcError.deadlineExceeded('Too late!');
@ -310,4 +300,85 @@ void main() {
expectDone: false,
);
});
Future<Null> makeUnaryCall() async {
void handleRequest(StreamMessage message) {
harness
..sendResponseHeader()
..sendResponseValue(1)
..sendResponseTrailer();
}
await harness.runTest(
clientCall: harness.client.unary(1),
expectedResult: 1,
expectedPath: '/Test/Unary',
serverHandlers: [handleRequest],
);
}
test('Reconnect on connection error', () async {
final connectionStates = <ConnectionState>[];
harness.connection.connectionError = 'Connection error';
int failureCount = 0;
harness.connection.onStateChanged = (connection) {
final state = connection.state;
connectionStates.add(state);
if (state == ConnectionState.transientFailure) failureCount++;
if (failureCount == 2) {
harness.connection.connectionError = null;
}
};
await makeUnaryCall();
expect(failureCount, 2);
expect(connectionStates, [
ConnectionState.connecting,
ConnectionState.transientFailure,
ConnectionState.connecting,
ConnectionState.transientFailure,
ConnectionState.connecting,
ConnectionState.ready
]);
});
test('Connections time out if idle', () async {
final done = new Completer();
final connectionStates = <ConnectionState>[];
harness.connection.onStateChanged = (connection) {
final state = connection.state;
connectionStates.add(state);
if (state == ConnectionState.idle) done.complete();
};
harness.channelOptions.idleTimeout = const Duration(microseconds: 10);
await makeUnaryCall();
harness.signalIdle();
expect(
connectionStates, [ConnectionState.connecting, ConnectionState.ready]);
await done.future;
expect(connectionStates, [
ConnectionState.connecting,
ConnectionState.ready,
ConnectionState.idle
]);
});
test('Default reconnect backoff backs off', () {
Duration lastBackoff = defaultBackoffStrategy(null);
expect(lastBackoff, const Duration(seconds: 1));
for (int i = 0; i < 12; i++) {
final minNext = lastBackoff * (1.6 - 0.2);
final maxNext = lastBackoff * (1.6 + 0.2);
lastBackoff = defaultBackoffStrategy(lastBackoff);
if (lastBackoff != const Duration(minutes: 2)) {
expect(lastBackoff, greaterThanOrEqualTo(minNext));
expect(lastBackoff, lessThanOrEqualTo(maxNext));
}
}
expect(lastBackoff, const Duration(minutes: 2));
expect(defaultBackoffStrategy(lastBackoff), const Duration(minutes: 2));
});
}

View File

@ -26,9 +26,6 @@ void main() {
final correctPassword = new ChannelOptions.secure(
certificate: certificate, password: 'correct');
expect(correctPassword.securityContext, isNotNull);
final channel = new ClientChannel('localhost', options: missingPassword);
expect(channel.connect(), throwsA(isTlsException));
});
});
}

View File

@ -4,6 +4,7 @@
import 'dart:async';
import 'dart:io';
import 'package:grpc/src/streams.dart';
import 'package:http2/transport.dart';
import 'package:test/test.dart';
@ -17,20 +18,42 @@ class MockTransport extends Mock implements ClientTransportConnection {}
class MockStream extends Mock implements ClientTransportStream {}
class MockChannel extends ClientChannel {
final ClientConnection connection;
class FakeConnection extends ClientConnection {
final ClientTransportConnection transport;
var connectionError;
MockChannel(String host, this.connection) : super(host);
FakeConnection(String host, this.transport, ChannelOptions options)
: super(host, 443, options);
@override
Future<ClientConnection> connect() async {
Future<ClientTransportConnection> connectTransport() async {
if (connectionError != null) throw connectionError;
return connection;
return transport;
}
}
Duration testBackoff(Duration lastBackoff) => const Duration(milliseconds: 1);
class FakeChannelOptions implements ChannelOptions {
String authority;
Duration idleTimeout = const Duration(seconds: 1);
BackoffStrategy backoffStrategy = testBackoff;
SecurityContext securityContext = new SecurityContext();
bool isSecure = true;
}
class FakeChannel extends ClientChannel {
final ClientConnection connection;
final FakeChannelOptions options;
FakeChannel(String host, this.connection, this.options)
: super(host, options: options);
@override
Future<ClientConnection> getConnection() async => connection;
}
typedef ServerMessageHandler = void Function(StreamMessage message);
class TestClient extends Client {
@ -74,7 +97,9 @@ class TestClient extends Client {
class ClientHarness {
MockTransport transport;
MockChannel channel;
FakeConnection connection;
FakeChannel channel;
FakeChannelOptions channelOptions;
MockStream stream;
StreamController<StreamMessage> fromClient;
@ -84,11 +109,14 @@ class ClientHarness {
void setUp() {
transport = new MockTransport();
channel = new MockChannel('test', new ClientConnection(transport));
channelOptions = new FakeChannelOptions();
connection = new FakeConnection('test', transport, channelOptions);
channel = new FakeChannel('test', connection, channelOptions);
stream = new MockStream();
fromClient = new StreamController();
toClient = new StreamController();
when(transport.makeRequest(any)).thenReturn(stream);
when(transport.onActiveStateChanged = captureAny).thenReturn(null);
when(stream.outgoingMessages).thenReturn(fromClient.sink);
when(stream.incomingMessages).thenReturn(toClient.stream);
client = new TestClient(channel);
@ -114,6 +142,13 @@ class ClientHarness {
if (closeStream) toClient.close();
}
void signalIdle() {
final ActiveStateHandler handler =
verify(transport.onActiveStateChanged = captureAny).captured.single;
expect(handler, isNotNull);
handler(false);
}
Future<Null> runTest(
{Future clientCall,
dynamic expectedResult,