From b38b1cc7a2bebc39f4b97826efa178a88dbac75d Mon Sep 17 00:00:00 2001 From: Jakob Andersen Date: Fri, 15 Dec 2017 09:30:56 +0100 Subject: [PATCH] Reorganize code for 0.2.0 release. (#41) Split the large client/server.dart files into smaller pieces. This is in preparation for splitting the HTTP/2 dependencies into a separate file and make it easier to implement other transports. --- CHANGELOG.md | 7 +- interop/pubspec.yaml | 2 +- lib/grpc.dart | 22 +- lib/src/client.dart | 723 ------------------- lib/src/client/call.dart | 284 ++++++++ lib/src/client/channel.dart | 69 ++ lib/src/client/client.dart | 25 + lib/src/{shared.dart => client/common.dart} | 63 +- lib/src/client/connection.dart | 253 +++++++ lib/src/client/method.dart | 12 + lib/src/client/options.dart | 129 ++++ lib/src/server/call.dart | 43 ++ lib/src/{server.dart => server/handler.dart} | 246 +------ lib/src/server/server.dart | 102 +++ lib/src/server/service.dart | 45 ++ lib/src/shared/security.dart | 15 + lib/src/{ => shared}/status.dart | 0 lib/src/{ => shared}/streams.dart | 0 lib/src/shared/timeout.dart | 52 ++ pubspec.yaml | 2 +- test/server_test.dart | 1 - test/src/client_utils.dart | 2 +- test/src/server_utils.dart | 2 +- test/src/utils.dart | 2 +- test/stream_test.dart | 3 +- test/timeout_test.dart | 2 - 26 files changed, 1099 insertions(+), 1007 deletions(-) delete mode 100644 lib/src/client.dart create mode 100644 lib/src/client/call.dart create mode 100644 lib/src/client/channel.dart create mode 100644 lib/src/client/client.dart rename lib/src/{shared.dart => client/common.dart} (52%) create mode 100644 lib/src/client/connection.dart create mode 100644 lib/src/client/method.dart create mode 100644 lib/src/client/options.dart create mode 100644 lib/src/server/call.dart rename lib/src/{server.dart => server/handler.dart} (54%) create mode 100644 lib/src/server/server.dart create mode 100644 lib/src/server/service.dart create mode 100644 lib/src/shared/security.dart rename lib/src/{ => shared}/status.dart (100%) rename lib/src/{ => shared}/streams.dart (100%) create mode 100644 lib/src/shared/timeout.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index 1669fb0..6ec5f13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.2.0 - 2017-12-14 + +* Implemented support for per-RPC metadata providers. This can be used for + authentication providers which may need to obtain or refresh a token before + the RPC is sent. + ## 0.1.0 - 2017-10-12 * Core gRPC functionality is implemented and passes @@ -5,7 +11,6 @@ The API is shaping up, but may still change as more advanced features are implemented. - ## 0.0.1 - 2017-07-05 * Initial version. diff --git a/interop/pubspec.yaml b/interop/pubspec.yaml index 0e5e211..8a9ee5c 100644 --- a/interop/pubspec.yaml +++ b/interop/pubspec.yaml @@ -4,7 +4,7 @@ version: 0.0.1 homepage: https://github.com/dart-lang/grpc-dart environment: - sdk: '>=1.20.1 <2.0.0' + sdk: '>=1.24.3 <2.0.0' dependencies: args: ^0.13.0 diff --git a/lib/grpc.dart b/lib/grpc.dart index 60866e7..d93df87 100644 --- a/lib/grpc.dart +++ b/lib/grpc.dart @@ -2,8 +2,20 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -export 'src/client.dart'; -export 'src/server.dart'; -export 'src/shared.dart'; -export 'src/status.dart'; -export 'src/streams.dart'; +export 'src/client/call.dart'; +export 'src/client/channel.dart'; +export 'src/client/client.dart'; +export 'src/client/common.dart'; +export 'src/client/connection.dart'; +export 'src/client/method.dart'; +export 'src/client/options.dart'; + +export 'src/server/call.dart'; +export 'src/server/handler.dart'; +export 'src/server/server.dart'; +export 'src/server/service.dart'; + +export 'src/shared/security.dart'; +export 'src/shared/status.dart'; +export 'src/shared/streams.dart'; +export 'src/shared/timeout.dart'; diff --git a/lib/src/client.dart b/lib/src/client.dart deleted file mode 100644 index 6f7376e..0000000 --- a/lib/src/client.dart +++ /dev/null @@ -1,723 +0,0 @@ -// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -/// Dart gRPC client implementation. - -import 'dart:async'; -import 'dart:io'; - -import 'dart:math'; -import 'package:http2/transport.dart'; -import 'package:meta/meta.dart'; - -import 'shared.dart'; -import 'status.dart'; -import 'streams.dart'; - -const _reservedHeaders = const [ - 'content-type', - 'te', - 'grpc-timeout', - 'grpc-accept-encoding', - 'user-agent', -]; - -const defaultIdleTimeout = const Duration(minutes: 5); - -typedef Duration BackoffStrategy(Duration lastBackoff); - -// Backoff algorithm from https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md -const _minConnectTimeout = const Duration(seconds: 20); -const _initialBackoff = const Duration(seconds: 1); -const _maxBackoff = const Duration(seconds: 120); -const _multiplier = 1.6; -const _jitter = 0.2; -final _random = new Random(); - -Duration defaultBackoffStrategy(Duration lastBackoff) { - if (lastBackoff == null) return _initialBackoff; - final jitter = _random.nextDouble() * 2 * _jitter - _jitter; - final nextBackoff = lastBackoff * (_multiplier + jitter); - return nextBackoff < _maxBackoff ? nextBackoff : _maxBackoff; -} - -/// Options controlling how connections are made on a [ClientChannel]. -class ChannelOptions { - final bool isSecure; - final List _certificateBytes; - final String _certificatePassword; - final String authority; - final Duration idleTimeout; - final BackoffStrategy backoffStrategy; - - const ChannelOptions._( - this.isSecure, - this._certificateBytes, - this._certificatePassword, - this.authority, - Duration idleTimeout, - BackoffStrategy backoffStrategy) - : this.idleTimeout = idleTimeout ?? defaultIdleTimeout, - this.backoffStrategy = backoffStrategy ?? defaultBackoffStrategy; - - /// Disable TLS. RPCs are sent in clear text. - const ChannelOptions.insecure( - {Duration idleTimeout, - BackoffStrategy backoffStrategy = - defaultBackoffStrategy}) // Remove when dart-lang/sdk#31066 is fixed. - : this._(false, null, null, null, idleTimeout, backoffStrategy); - - /// Enable TLS and optionally specify the [certificate]s to trust. If - /// [certificates] is not provided, the default trust store is used. - const ChannelOptions.secure( - {List certificate, - String password, - String authority, - Duration idleTimeout, - BackoffStrategy backoffStrategy = - defaultBackoffStrategy}) // Remove when dart-lang/sdk#31066 is fixed. - : this._(true, certificate, password, authority, idleTimeout, - backoffStrategy); - - SecurityContext get securityContext { - if (!isSecure) return null; - if (_certificateBytes != null) { - return createSecurityContext(false) - ..setTrustedCertificatesBytes(_certificateBytes, - password: _certificatePassword); - } - final context = new SecurityContext(withTrustedRoots: true); - if (SecurityContext.alpnSupported) { - context.setAlpnProtocols(supportedAlpnProtocols, false); - } - return context; - } -} - -enum ConnectionState { - /// Actively trying to connect. - connecting, - - /// Connection successfully established. - ready, - - /// Some transient failure occurred, waiting to re-connect. - transientFailure, - - /// Not currently connected, and no pending RPCs. - idle, - - /// Shutting down, no further RPCs allowed. - shutdown -} - -/// A connection to a single RPC endpoint. -/// -/// RPCs made on a connection are always sent to the same endpoint. -class ClientConnection { - static final _methodPost = new Header.ascii(':method', 'POST'); - static final _schemeHttp = new Header.ascii(':scheme', 'http'); - static final _schemeHttps = new Header.ascii(':scheme', 'https'); - static final _contentTypeGrpc = - new Header.ascii('content-type', 'application/grpc'); - static final _teTrailers = new Header.ascii('te', 'trailers'); - static final _grpcAcceptEncoding = - new Header.ascii('grpc-accept-encoding', 'identity'); - static final _userAgent = new Header.ascii('user-agent', 'dart-grpc/0.2.0'); - - final String host; - final int port; - final ChannelOptions options; - - ConnectionState _state = ConnectionState.idle; - void Function(ClientConnection connection) onStateChanged; - final _pendingCalls = []; - - ClientTransportConnection _transport; - - /// Used for idle and reconnect timeout, depending on [_state]. - Timer _timer; - Duration _currentReconnectDelay; - - ClientConnection(this.host, this.port, this.options); - - ConnectionState get state => _state; - - static List
createCallHeaders(bool useTls, String authority, - String path, Duration timeout, Map metadata) { - final headers = [ - _methodPost, - useTls ? _schemeHttps : _schemeHttp, - new Header.ascii(':path', path), - new Header.ascii(':authority', authority), - ]; - if (timeout != null) { - headers.add(new Header.ascii('grpc-timeout', toTimeoutString(timeout))); - } - headers.addAll([ - _contentTypeGrpc, - _teTrailers, - _grpcAcceptEncoding, - _userAgent, - ]); - metadata?.forEach((key, value) { - headers.add(new Header.ascii(key, value)); - }); - return headers; - } - - String get authority => options.authority ?? host; - - @visibleForTesting - Future connectTransport() async { - final securityContext = options.securityContext; - - var socket = await Socket.connect(host, port); - if (_state == ConnectionState.shutdown) { - socket.destroy(); - throw 'Shutting down'; - } - if (securityContext != null) { - socket = await SecureSocket.secure(socket, - host: authority, context: securityContext); - if (_state == ConnectionState.shutdown) { - socket.destroy(); - throw 'Shutting down'; - } - } - socket.done.then(_handleSocketClosed); - return new ClientTransportConnection.viaSocket(socket); - } - - void _connect() { - if (_state != ConnectionState.idle && - _state != ConnectionState.transientFailure) { - return; - } - _setState(ConnectionState.connecting); - connectTransport().then((transport) { - _currentReconnectDelay = null; - _transport = transport; - _transport.onActiveStateChanged = _handleActiveStateChanged; - _setState(ConnectionState.ready); - _pendingCalls.forEach(_startCall); - _pendingCalls.clear(); - }).catchError(_handleTransientFailure); - } - - void dispatchCall(ClientCall call) { - switch (_state) { - case ConnectionState.ready: - _startCall(call); - break; - case ConnectionState.shutdown: - _shutdownCall(call); - break; - default: - _pendingCalls.add(call); - if (_state == ConnectionState.idle) { - _connect(); - } - } - } - - ClientTransportStream makeRequest( - String path, Duration timeout, Map metadata) { - final headers = - createCallHeaders(options.isSecure, authority, path, timeout, metadata); - return _transport.makeRequest(headers); - } - - void _startCall(ClientCall call) { - if (call._isCancelled) return; - call._onConnectionReady(this); - } - - void _shutdownCall(ClientCall call) { - if (call._isCancelled) return; - call._onConnectionError( - new GrpcError.unavailable('Connection shutting down.')); - } - - /// Shuts down this connection. - /// - /// No further calls may be made on this connection, but existing calls - /// are allowed to finish. - Future shutdown() { - if (_state == ConnectionState.shutdown) return new Future.value(); - _setShutdownState(); - return _transport?.finish() ?? new Future.value(); - } - - /// Terminates this connection. - /// - /// All open calls are terminated immediately, and no further calls may be - /// made on this connection. - Future terminate() { - _setShutdownState(); - return _transport?.terminate() ?? new Future.value(); - } - - void _setShutdownState() { - _setState(ConnectionState.shutdown); - _cancelTimer(); - _pendingCalls.forEach(_shutdownCall); - _pendingCalls.clear(); - } - - void _setState(ConnectionState state) { - _state = state; - if (onStateChanged != null) { - onStateChanged(this); - } - } - - void _handleIdleTimeout() { - if (_timer == null || _state != ConnectionState.ready) return; - _cancelTimer(); - _transport?.finish()?.catchError((_) => {}); // TODO(jakobr): Log error. - _transport = null; - _setState(ConnectionState.idle); - } - - void _cancelTimer() { - _timer?.cancel(); - _timer = null; - } - - void _handleActiveStateChanged(bool isActive) { - if (isActive) { - _cancelTimer(); - } else { - if (options.idleTimeout != null) { - _timer ??= new Timer(options.idleTimeout, _handleIdleTimeout); - } - } - } - - bool _hasPendingCalls() { - // Get rid of pending calls that have timed out. - _pendingCalls.removeWhere((call) => call._isCancelled); - return _pendingCalls.isNotEmpty; - } - - void _handleTransientFailure(error) { - _transport = null; - if (_state == ConnectionState.shutdown || _state == ConnectionState.idle) { - return; - } - // TODO(jakobr): Log error. - _cancelTimer(); - if (!_hasPendingCalls()) { - _setState(ConnectionState.idle); - return; - } - _setState(ConnectionState.transientFailure); - _currentReconnectDelay = options.backoffStrategy(_currentReconnectDelay); - _timer = new Timer(_currentReconnectDelay, _handleReconnect); - } - - void _handleReconnect() { - if (_timer == null || _state != ConnectionState.transientFailure) return; - _cancelTimer(); - _connect(); - } - - void _handleSocketClosed(Socket _) { - _cancelTimer(); - if (_state != ConnectionState.idle && _state != ConnectionState.shutdown) { - // We were not planning to close the socket. - _handleTransientFailure('Socket closed'); - } - } -} - -/// A channel to a virtual RPC endpoint. -/// -/// For each RPC, the channel picks a [ClientConnection] to dispatch the call. -/// RPCs on the same channel may be sent to different connections, depending on -/// load balancing settings. -class ClientChannel { - final String host; - final int port; - final ChannelOptions options; - - // TODO(jakobr): Multiple connections, load balancing. - ClientConnection _connection; - - bool _isShutdown = false; - - ClientChannel(this.host, - {this.port = 443, this.options = const ChannelOptions.secure()}); - - /// Shuts down this channel. - /// - /// No further RPCs can be made on this channel. RPCs already in progress will - /// be allowed to complete. - Future shutdown() { - if (_isShutdown) return new Future.value(); - _isShutdown = true; - return _connection.shutdown(); - } - - /// Terminates this channel. - /// - /// RPCs already in progress will be terminated. No further RPCs can be made - /// on this channel. - Future terminate() { - _isShutdown = true; - return _connection.terminate(); - } - - /// Returns a connection to this [Channel]'s RPC endpoint. - /// - /// The connection may be shared between multiple RPCs. - Future getConnection() async { - if (_isShutdown) throw new GrpcError.unavailable('Channel shutting down.'); - return _connection ??= new ClientConnection(host, port, options); - } - - /// Initiates a new RPC on this connection. - ClientCall createCall( - ClientMethod method, Stream requests, CallOptions options) { - final call = new ClientCall(method, requests, options); - getConnection().then((connection) { - if (call._isCancelled) return; - connection.dispatchCall(call); - }, onError: call._onConnectionError); - return call; - } -} - -/// Description of a gRPC method. -class ClientMethod { - final String path; - final List Function(Q value) requestSerializer; - final R Function(List value) responseDeserializer; - - ClientMethod(this.path, this.requestSerializer, this.responseDeserializer); -} - -/// Provides per-RPC metadata. -/// -/// Metadata providers will be invoked for every RPC, and can add their own -/// metadata to the RPC. If the function returns a [Future], the RPC will await -/// completion of the returned [Future] before transmitting the request. -/// -/// The metadata provider is given the current metadata map (possibly modified -/// by previous metadata providers), and is expected to modify the map before -/// returning or before completing the returned [Future]. -typedef FutureOr MetadataProvider(Map metadata); - -/// Runtime options for an RPC. -class CallOptions { - final Map metadata; - final Duration timeout; - final List metadataProviders; - - CallOptions._(this.metadata, this.timeout, this.metadataProviders); - - /// Creates a [CallOptions] object. - /// - /// [CallOptions] can specify static [metadata], set the [timeout], and - /// configure per-RPC metadata [providers]. The metadata [providers] are - /// invoked in order for every RPC, and can modify the outgoing metadata - /// (including metadata provided by previous providers). - factory CallOptions( - {Map metadata, - Duration timeout, - List providers}) { - return new CallOptions._(new Map.unmodifiable(metadata ?? {}), timeout, - new List.unmodifiable(providers ?? [])); - } - - factory CallOptions.from(Iterable options) => - options.fold(new CallOptions(), (p, o) => p.mergedWith(o)); - - CallOptions mergedWith(CallOptions other) { - if (other == null) return this; - final mergedMetadata = new Map.from(metadata)..addAll(other.metadata); - final mergedTimeout = other.timeout ?? timeout; - final mergedProviders = new List.from(metadataProviders) - ..addAll(other.metadataProviders); - return new CallOptions._(new Map.unmodifiable(mergedMetadata), - mergedTimeout, new List.unmodifiable(mergedProviders)); - } -} - -/// Base class for client stubs. -class Client { - final ClientChannel _channel; - final CallOptions _options; - - Client(this._channel, {CallOptions options}) - : _options = options ?? new CallOptions(); - - ClientCall $createCall( - ClientMethod method, Stream requests, - {CallOptions options}) { - return _channel.createCall(method, requests, _options.mergedWith(options)); - } -} - -/// An active call to a gRPC endpoint. -class ClientCall implements Response { - final ClientMethod _method; - final Stream _requests; - final CallOptions options; - - final _headers = new Completer>(); - final _trailers = new Completer>(); - bool _hasReceivedResponses = false; - - Map _headerMetadata; - - TransportStream _stream; - StreamController _responses; - StreamSubscription _requestSubscription; - StreamSubscription _responseSubscription; - - bool _isCancelled = false; - Timer _timeoutTimer; - - ClientCall(this._method, this._requests, this.options) { - _responses = new StreamController(onListen: _onResponseListen); - if (options.timeout != null) { - _timeoutTimer = new Timer(options.timeout, _onTimedOut); - } - } - - String get path => _method.path; - - void _onConnectionError(error) { - _terminateWithError(new GrpcError.unavailable('Error connecting: $error')); - } - - void _terminateWithError(GrpcError error) { - if (!_responses.isClosed) { - _responses.addError(error); - } - _safeTerminate(); - } - - static Map _sanitizeMetadata(Map metadata) { - final sanitizedMetadata = {}; - metadata.forEach((String key, String value) { - final lowerCaseKey = key.toLowerCase(); - if (!lowerCaseKey.startsWith(':') && - !_reservedHeaders.contains(lowerCaseKey)) { - sanitizedMetadata[lowerCaseKey] = value; - } - }); - return sanitizedMetadata; - } - - void _onConnectionReady(ClientConnection connection) { - if (_isCancelled) return; - - if (options.metadataProviders.isEmpty) { - _sendRequest(connection, _sanitizeMetadata(options.metadata)); - } else { - final metadata = new Map.from(options.metadata); - Future - .forEach(options.metadataProviders, (provider) => provider(metadata)) - .then((_) => _sendRequest(connection, _sanitizeMetadata(metadata))) - .catchError(_onMetadataProviderError); - } - } - - void _onMetadataProviderError(error) { - _terminateWithError(new GrpcError.internal('Error making call: $error')); - } - - void _sendRequest(ClientConnection connection, Map metadata) { - _stream = connection.makeRequest(path, options.timeout, metadata); - _requestSubscription = _requests - .map(_method.requestSerializer) - .map(GrpcHttpEncoder.frame) - .map((bytes) => new DataStreamMessage(bytes)) - .handleError(_onRequestError) - .listen(_stream.outgoingMessages.add, - onError: _stream.outgoingMessages.addError, - onDone: _stream.outgoingMessages.close, - cancelOnError: true); - // The response stream might have been listened to before _stream was ready, - // so try setting up the subscription here as well. - _onResponseListen(); - } - - void _onTimedOut() { - _responses.addError(new GrpcError.deadlineExceeded('Deadline exceeded')); - _safeTerminate(); - } - - /// Subscribe to incoming response messages, once [_stream] is available, and - /// the caller has subscribed to the [_responses] stream. - void _onResponseListen() { - if (_stream != null && - _responses.hasListener && - _responseSubscription == null) { - _responseSubscription = _stream.incomingMessages - .transform(new GrpcHttpDecoder()) - .transform(grpcDecompressor()) - .listen(_onResponseData, - onError: _onResponseError, - onDone: _onResponseDone, - cancelOnError: true); - if (_responses.isPaused) { - _responseSubscription.pause(); - } - _responses.onPause = _responseSubscription.pause; - _responses.onResume = _responseSubscription.resume; - _responses.onCancel = _responseSubscription.cancel; - } - } - - /// Emit an error response to the user, and tear down this call. - void _responseError(GrpcError error) { - _responses.addError(error); - _timeoutTimer?.cancel(); - _requestSubscription?.cancel(); - _responseSubscription.cancel(); - _responses.close(); - _stream.terminate(); - } - - /// Data handler for responses coming from the server. Handles header/trailer - /// metadata, and forwards response objects to [_responses]. - void _onResponseData(GrpcMessage data) { - if (data is GrpcData) { - if (!_headers.isCompleted) { - _responseError( - new GrpcError.unimplemented('Received data before headers')); - return; - } - if (_trailers.isCompleted) { - _responseError( - new GrpcError.unimplemented('Received data after trailers')); - return; - } - _responses.add(_method.responseDeserializer(data.data)); - _hasReceivedResponses = true; - } else if (data is GrpcMetadata) { - if (!_headers.isCompleted) { - // TODO(jakobr): Parse, and extract common headers. - _headerMetadata = data.metadata; - _headers.complete(_headerMetadata); - return; - } - if (_trailers.isCompleted) { - _responseError( - new GrpcError.unimplemented('Received multiple trailers')); - return; - } - final metadata = data.metadata; - _trailers.complete(metadata); - // TODO(jakobr): Parse more! - if (metadata.containsKey('grpc-status')) { - final status = int.parse(metadata['grpc-status']); - final message = metadata['grpc-message']; - if (status != 0) { - _responseError(new GrpcError.custom(status, message)); - } - } - } else { - _responseError(new GrpcError.unimplemented('Unexpected frame received')); - } - } - - /// Handler for response errors. Forward the error to the [_responses] stream, - /// wrapped if necessary. - void _onResponseError(error) { - if (error is GrpcError) { - _responseError(error); - return; - } - _responseError(new GrpcError.unknown(error.toString())); - } - - /// Handles closure of the response stream. Verifies that server has sent - /// response messages and header/trailer metadata, as necessary. - void _onResponseDone() { - if (!_headers.isCompleted) { - _responseError(new GrpcError.unavailable('Did not receive anything')); - return; - } - if (!_trailers.isCompleted) { - if (_hasReceivedResponses) { - // Trailers are required after receiving data. - _responseError(new GrpcError.unavailable('Missing trailers')); - return; - } - - // Only received a header frame and no data frames, so the header - // should contain "trailers" as well (Trailers-Only). - _trailers.complete(_headerMetadata); - final status = _headerMetadata['grpc-status']; - // If status code is missing, we must treat it as '0'. As in 'success'. - final statusCode = status != null ? int.parse(status) : 0; - if (statusCode != 0) { - final message = _headerMetadata['grpc-message']; - _responseError(new GrpcError.custom(statusCode, message)); - } - } - _timeoutTimer?.cancel(); - _responses.close(); - _responseSubscription.cancel(); - } - - /// Error handler for the requests stream. Something went wrong while trying - /// to send the request to the server. Abort the request, and forward the - /// error to the user code on the [_responses] stream. - void _onRequestError(error) { - if (error is! GrpcError) { - error = new GrpcError.unknown(error.toString()); - } - - _responses.addError(error); - _timeoutTimer?.cancel(); - _responses.close(); - _requestSubscription?.cancel(); - _responseSubscription?.cancel(); - _stream.terminate(); - } - - Stream get response => _responses.stream; - - @override - Future> get headers => _headers.future; - - @override - Future> get trailers => _trailers.future; - - @override - Future cancel() { - if (!_responses.isClosed) { - _responses.addError(new GrpcError.cancelled('Cancelled by client.')); - } - return _terminate(); - } - - Future _terminate() async { - _isCancelled = true; - _timeoutTimer?.cancel(); - // Don't await _responses.close() here. It'll only complete once the done - // event has been delivered, and it's the caller of this function that is - // reading from responses as well, so we might end up deadlocked. - _responses.close(); - _stream?.terminate(); - final futures = []; - if (_requestSubscription != null) { - futures.add(_requestSubscription.cancel()); - } - if (_responseSubscription != null) { - futures.add(_responseSubscription.cancel()); - } - await Future.wait(futures); - } - - Future _safeTerminate() { - return _terminate().catchError((_) {}); - } -} diff --git a/lib/src/client/call.dart b/lib/src/client/call.dart new file mode 100644 index 0000000..c1d58f3 --- /dev/null +++ b/lib/src/client/call.dart @@ -0,0 +1,284 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:http2/transport.dart'; + +import '../shared/status.dart'; +import '../shared/streams.dart'; + +import 'common.dart'; +import 'connection.dart'; +import 'method.dart'; +import 'options.dart'; + +const _reservedHeaders = const [ + 'content-type', + 'te', + 'grpc-timeout', + 'grpc-accept-encoding', + 'user-agent', +]; + +/// An active call to a gRPC endpoint. +class ClientCall implements Response { + final ClientMethod _method; + final Stream _requests; + final CallOptions options; + + final _headers = new Completer>(); + final _trailers = new Completer>(); + bool _hasReceivedResponses = false; + + Map _headerMetadata; + + TransportStream _stream; + StreamController _responses; + StreamSubscription _requestSubscription; + StreamSubscription _responseSubscription; + + bool isCancelled = false; + Timer _timeoutTimer; + + ClientCall(this._method, this._requests, this.options) { + _responses = new StreamController(onListen: _onResponseListen); + if (options.timeout != null) { + _timeoutTimer = new Timer(options.timeout, _onTimedOut); + } + } + + String get path => _method.path; + + void onConnectionError(error) { + _terminateWithError(new GrpcError.unavailable('Error connecting: $error')); + } + + void _terminateWithError(GrpcError error) { + if (!_responses.isClosed) { + _responses.addError(error); + } + _safeTerminate(); + } + + static Map _sanitizeMetadata(Map metadata) { + final sanitizedMetadata = {}; + metadata.forEach((String key, String value) { + final lowerCaseKey = key.toLowerCase(); + if (!lowerCaseKey.startsWith(':') && + !_reservedHeaders.contains(lowerCaseKey)) { + sanitizedMetadata[lowerCaseKey] = value; + } + }); + return sanitizedMetadata; + } + + void onConnectionReady(ClientConnection connection) { + if (isCancelled) return; + + if (options.metadataProviders.isEmpty) { + _sendRequest(connection, _sanitizeMetadata(options.metadata)); + } else { + final metadata = new Map.from(options.metadata); + Future + .forEach(options.metadataProviders, (provider) => provider(metadata)) + .then((_) => _sendRequest(connection, _sanitizeMetadata(metadata))) + .catchError(_onMetadataProviderError); + } + } + + void _onMetadataProviderError(error) { + _terminateWithError(new GrpcError.internal('Error making call: $error')); + } + + void _sendRequest(ClientConnection connection, Map metadata) { + _stream = connection.makeRequest(path, options.timeout, metadata); + _requestSubscription = _requests + .map(_method.requestSerializer) + .map(GrpcHttpEncoder.frame) + .map((bytes) => new DataStreamMessage(bytes)) + .handleError(_onRequestError) + .listen(_stream.outgoingMessages.add, + onError: _stream.outgoingMessages.addError, + onDone: _stream.outgoingMessages.close, + cancelOnError: true); + // The response stream might have been listened to before _stream was ready, + // so try setting up the subscription here as well. + _onResponseListen(); + } + + void _onTimedOut() { + _responses.addError(new GrpcError.deadlineExceeded('Deadline exceeded')); + _safeTerminate(); + } + + /// Subscribe to incoming response messages, once [_stream] is available, and + /// the caller has subscribed to the [_responses] stream. + void _onResponseListen() { + if (_stream != null && + _responses.hasListener && + _responseSubscription == null) { + _responseSubscription = _stream.incomingMessages + .transform(new GrpcHttpDecoder()) + .transform(grpcDecompressor()) + .listen(_onResponseData, + onError: _onResponseError, + onDone: _onResponseDone, + cancelOnError: true); + if (_responses.isPaused) { + _responseSubscription.pause(); + } + _responses.onPause = _responseSubscription.pause; + _responses.onResume = _responseSubscription.resume; + _responses.onCancel = _responseSubscription.cancel; + } + } + + /// Emit an error response to the user, and tear down this call. + void _responseError(GrpcError error) { + _responses.addError(error); + _timeoutTimer?.cancel(); + _requestSubscription?.cancel(); + _responseSubscription.cancel(); + _responses.close(); + _stream.terminate(); + } + + /// Data handler for responses coming from the server. Handles header/trailer + /// metadata, and forwards response objects to [_responses]. + void _onResponseData(GrpcMessage data) { + if (data is GrpcData) { + if (!_headers.isCompleted) { + _responseError( + new GrpcError.unimplemented('Received data before headers')); + return; + } + if (_trailers.isCompleted) { + _responseError( + new GrpcError.unimplemented('Received data after trailers')); + return; + } + _responses.add(_method.responseDeserializer(data.data)); + _hasReceivedResponses = true; + } else if (data is GrpcMetadata) { + if (!_headers.isCompleted) { + // TODO(jakobr): Parse, and extract common headers. + _headerMetadata = data.metadata; + _headers.complete(_headerMetadata); + return; + } + if (_trailers.isCompleted) { + _responseError( + new GrpcError.unimplemented('Received multiple trailers')); + return; + } + final metadata = data.metadata; + _trailers.complete(metadata); + // TODO(jakobr): Parse more! + if (metadata.containsKey('grpc-status')) { + final status = int.parse(metadata['grpc-status']); + final message = metadata['grpc-message']; + if (status != 0) { + _responseError(new GrpcError.custom(status, message)); + } + } + } else { + _responseError(new GrpcError.unimplemented('Unexpected frame received')); + } + } + + /// Handler for response errors. Forward the error to the [_responses] stream, + /// wrapped if necessary. + void _onResponseError(error) { + if (error is GrpcError) { + _responseError(error); + return; + } + _responseError(new GrpcError.unknown(error.toString())); + } + + /// Handles closure of the response stream. Verifies that server has sent + /// response messages and header/trailer metadata, as necessary. + void _onResponseDone() { + if (!_headers.isCompleted) { + _responseError(new GrpcError.unavailable('Did not receive anything')); + return; + } + if (!_trailers.isCompleted) { + if (_hasReceivedResponses) { + // Trailers are required after receiving data. + _responseError(new GrpcError.unavailable('Missing trailers')); + return; + } + + // Only received a header frame and no data frames, so the header + // should contain "trailers" as well (Trailers-Only). + _trailers.complete(_headerMetadata); + final status = _headerMetadata['grpc-status']; + // If status code is missing, we must treat it as '0'. As in 'success'. + final statusCode = status != null ? int.parse(status) : 0; + if (statusCode != 0) { + final message = _headerMetadata['grpc-message']; + _responseError(new GrpcError.custom(statusCode, message)); + } + } + _timeoutTimer?.cancel(); + _responses.close(); + _responseSubscription.cancel(); + } + + /// Error handler for the requests stream. Something went wrong while trying + /// to send the request to the server. Abort the request, and forward the + /// error to the user code on the [_responses] stream. + void _onRequestError(error) { + if (error is! GrpcError) { + error = new GrpcError.unknown(error.toString()); + } + + _responses.addError(error); + _timeoutTimer?.cancel(); + _responses.close(); + _requestSubscription?.cancel(); + _responseSubscription?.cancel(); + _stream.terminate(); + } + + Stream get response => _responses.stream; + + @override + Future> get headers => _headers.future; + + @override + Future> get trailers => _trailers.future; + + @override + Future cancel() { + if (!_responses.isClosed) { + _responses.addError(new GrpcError.cancelled('Cancelled by client.')); + } + return _terminate(); + } + + Future _terminate() async { + isCancelled = true; + _timeoutTimer?.cancel(); + // Don't await _responses.close() here. It'll only complete once the done + // event has been delivered, and it's the caller of this function that is + // reading from responses as well, so we might end up deadlocked. + _responses.close(); + _stream?.terminate(); + final futures = []; + if (_requestSubscription != null) { + futures.add(_requestSubscription.cancel()); + } + if (_responseSubscription != null) { + futures.add(_responseSubscription.cancel()); + } + await Future.wait(futures); + } + + Future _safeTerminate() { + return _terminate().catchError((_) {}); + } +} diff --git a/lib/src/client/channel.dart b/lib/src/client/channel.dart new file mode 100644 index 0000000..c51439c --- /dev/null +++ b/lib/src/client/channel.dart @@ -0,0 +1,69 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import '../shared/status.dart'; + +import 'call.dart'; +import 'connection.dart'; +import 'method.dart'; +import 'options.dart'; + +/// A channel to a virtual RPC endpoint. +/// +/// For each RPC, the channel picks a [ClientConnection] to dispatch the call. +/// RPCs on the same channel may be sent to different connections, depending on +/// load balancing settings. +class ClientChannel { + final String host; + final int port; + final ChannelOptions options; + + // TODO(jakobr): Multiple connections, load balancing. + ClientConnection _connection; + + bool _isShutdown = false; + + ClientChannel(this.host, + {this.port = 443, this.options = const ChannelOptions.secure()}); + + /// Shuts down this channel. + /// + /// No further RPCs can be made on this channel. RPCs already in progress will + /// be allowed to complete. + Future shutdown() { + if (_isShutdown) return new Future.value(); + _isShutdown = true; + return _connection.shutdown(); + } + + /// Terminates this channel. + /// + /// RPCs already in progress will be terminated. No further RPCs can be made + /// on this channel. + Future terminate() { + _isShutdown = true; + return _connection.terminate(); + } + + /// Returns a connection to this [Channel]'s RPC endpoint. + /// + /// The connection may be shared between multiple RPCs. + Future getConnection() async { + if (_isShutdown) throw new GrpcError.unavailable('Channel shutting down.'); + return _connection ??= new ClientConnection(host, port, options); + } + + /// Initiates a new RPC on this connection. + ClientCall createCall( + ClientMethod method, Stream requests, CallOptions options) { + final call = new ClientCall(method, requests, options); + getConnection().then((connection) { + if (call.isCancelled) return; + connection.dispatchCall(call); + }, onError: call.onConnectionError); + return call; + } +} diff --git a/lib/src/client/client.dart b/lib/src/client/client.dart new file mode 100644 index 0000000..bfe2414 --- /dev/null +++ b/lib/src/client/client.dart @@ -0,0 +1,25 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'call.dart'; +import 'channel.dart'; +import 'method.dart'; +import 'options.dart'; + +/// Base class for client stubs. +class Client { + final ClientChannel _channel; + final CallOptions _options; + + Client(this._channel, {CallOptions options}) + : _options = options ?? new CallOptions(); + + ClientCall $createCall( + ClientMethod method, Stream requests, + {CallOptions options}) { + return _channel.createCall(method, requests, _options.mergedWith(options)); + } +} diff --git a/lib/src/shared.dart b/lib/src/client/common.dart similarity index 52% rename from lib/src/shared.dart rename to lib/src/client/common.dart index c8b6ee0..d47b92b 100644 --- a/lib/src/shared.dart +++ b/lib/src/client/common.dart @@ -3,60 +3,11 @@ // BSD-style license that can be found in the LICENSE file. import 'dart:async'; -import 'dart:io'; import 'package:async/async.dart'; -import 'package:grpc/src/client.dart'; -import 'package:grpc/src/status.dart'; -/// Convert [timeout] to grpc-timeout header string format. -// Mostly inspired by grpc-java implementation. -// TODO(jakobr): Modify to match grpc/core implementation instead. -String toTimeoutString(Duration duration) { - if (duration == null) return null; - const cutoff = 100000; - final timeout = duration.inMicroseconds; - if (timeout < 0) { - // Smallest possible timeout. - return '1n'; - } else if (timeout < cutoff) { - return '${timeout}u'; - } else if (timeout < cutoff * 1000) { - return '${timeout~/1000}m'; - } else if (timeout < cutoff * 1000 * 1000) { - return '${timeout~/1000000}S'; - } else if (timeout < cutoff * 1000 * 1000 * 60) { - return '${timeout~/60000000}M'; - } else { - return '${timeout~/3600000000}H'; - } -} - -/// Convert [timeout] from grpc-timeout header string format to [Duration]. -/// Returns [null] if [timeout] is not correctly formatted. -Duration fromTimeoutString(String timeout) { - if (timeout == null) return null; - if (timeout.length < 2) return null; - final value = - int.parse(timeout.substring(0, timeout.length - 1), onError: (_) => null); - if (value == null) return null; - switch (timeout[timeout.length - 1]) { - case 'n': - return new Duration(microseconds: value * 1000); - case 'u': - return new Duration(microseconds: value); - case 'm': - return new Duration(milliseconds: value); - case 'S': - return new Duration(seconds: value); - case 'M': - return new Duration(minutes: value); - case 'H': - return new Duration(hours: value); - default: - return null; - } -} +import '../shared/status.dart'; +import 'call.dart'; /// A gRPC response. abstract class Response { @@ -123,13 +74,3 @@ abstract class _ResponseMixin implements Response { @override Future cancel() => _call.cancel(); } - -const supportedAlpnProtocols = const ['grpc-exp', 'h2']; - -// TODO: Simplify once we have a stable Dart 1.25 release (update pubspec to -// require SDK >=1.25.0, and remove check for alpnSupported). -SecurityContext createSecurityContext(bool isServer) => - SecurityContext.alpnSupported - ? (new SecurityContext() - ..setAlpnProtocols(supportedAlpnProtocols, isServer)) - : new SecurityContext(); diff --git a/lib/src/client/connection.dart b/lib/src/client/connection.dart new file mode 100644 index 0000000..98df27f --- /dev/null +++ b/lib/src/client/connection.dart @@ -0,0 +1,253 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:io'; + +import 'package:http2/transport.dart'; +import 'package:meta/meta.dart'; + +import '../shared/status.dart'; +import '../shared/timeout.dart'; + +import 'call.dart'; +import 'options.dart'; + +enum ConnectionState { + /// Actively trying to connect. + connecting, + + /// Connection successfully established. + ready, + + /// Some transient failure occurred, waiting to re-connect. + transientFailure, + + /// Not currently connected, and no pending RPCs. + idle, + + /// Shutting down, no further RPCs allowed. + shutdown +} + +/// A connection to a single RPC endpoint. +/// +/// RPCs made on a connection are always sent to the same endpoint. +class ClientConnection { + static final _methodPost = new Header.ascii(':method', 'POST'); + static final _schemeHttp = new Header.ascii(':scheme', 'http'); + static final _schemeHttps = new Header.ascii(':scheme', 'https'); + static final _contentTypeGrpc = + new Header.ascii('content-type', 'application/grpc'); + static final _teTrailers = new Header.ascii('te', 'trailers'); + static final _grpcAcceptEncoding = + new Header.ascii('grpc-accept-encoding', 'identity'); + static final _userAgent = new Header.ascii('user-agent', 'dart-grpc/0.2.0'); + + final String host; + final int port; + final ChannelOptions options; + + ConnectionState _state = ConnectionState.idle; + void Function(ClientConnection connection) onStateChanged; + final _pendingCalls = []; + + ClientTransportConnection _transport; + + /// Used for idle and reconnect timeout, depending on [_state]. + Timer _timer; + Duration _currentReconnectDelay; + + ClientConnection(this.host, this.port, this.options); + + ConnectionState get state => _state; + + static List
createCallHeaders(bool useTls, String authority, + String path, Duration timeout, Map metadata) { + final headers = [ + _methodPost, + useTls ? _schemeHttps : _schemeHttp, + new Header.ascii(':path', path), + new Header.ascii(':authority', authority), + ]; + if (timeout != null) { + headers.add(new Header.ascii('grpc-timeout', toTimeoutString(timeout))); + } + headers.addAll([ + _contentTypeGrpc, + _teTrailers, + _grpcAcceptEncoding, + _userAgent, + ]); + metadata?.forEach((key, value) { + headers.add(new Header.ascii(key, value)); + }); + return headers; + } + + String get authority => options.authority ?? host; + + @visibleForTesting + Future connectTransport() async { + final securityContext = options.securityContext; + + var socket = await Socket.connect(host, port); + if (_state == ConnectionState.shutdown) { + socket.destroy(); + throw 'Shutting down'; + } + if (securityContext != null) { + socket = await SecureSocket.secure(socket, + host: authority, context: securityContext); + if (_state == ConnectionState.shutdown) { + socket.destroy(); + throw 'Shutting down'; + } + } + socket.done.then(_handleSocketClosed); + return new ClientTransportConnection.viaSocket(socket); + } + + void _connect() { + if (_state != ConnectionState.idle && + _state != ConnectionState.transientFailure) { + return; + } + _setState(ConnectionState.connecting); + connectTransport().then((transport) { + _currentReconnectDelay = null; + _transport = transport; + _transport.onActiveStateChanged = _handleActiveStateChanged; + _setState(ConnectionState.ready); + _pendingCalls.forEach(_startCall); + _pendingCalls.clear(); + }).catchError(_handleTransientFailure); + } + + void dispatchCall(ClientCall call) { + switch (_state) { + case ConnectionState.ready: + _startCall(call); + break; + case ConnectionState.shutdown: + _shutdownCall(call); + break; + default: + _pendingCalls.add(call); + if (_state == ConnectionState.idle) { + _connect(); + } + } + } + + ClientTransportStream makeRequest( + String path, Duration timeout, Map metadata) { + final headers = + createCallHeaders(options.isSecure, authority, path, timeout, metadata); + return _transport.makeRequest(headers); + } + + void _startCall(ClientCall call) { + if (call.isCancelled) return; + call.onConnectionReady(this); + } + + void _shutdownCall(ClientCall call) { + if (call.isCancelled) return; + call.onConnectionError( + new GrpcError.unavailable('Connection shutting down.')); + } + + /// Shuts down this connection. + /// + /// No further calls may be made on this connection, but existing calls + /// are allowed to finish. + Future shutdown() { + if (_state == ConnectionState.shutdown) return new Future.value(); + _setShutdownState(); + return _transport?.finish() ?? new Future.value(); + } + + /// Terminates this connection. + /// + /// All open calls are terminated immediately, and no further calls may be + /// made on this connection. + Future terminate() { + _setShutdownState(); + return _transport?.terminate() ?? new Future.value(); + } + + void _setShutdownState() { + _setState(ConnectionState.shutdown); + _cancelTimer(); + _pendingCalls.forEach(_shutdownCall); + _pendingCalls.clear(); + } + + void _setState(ConnectionState state) { + _state = state; + if (onStateChanged != null) { + onStateChanged(this); + } + } + + void _handleIdleTimeout() { + if (_timer == null || _state != ConnectionState.ready) return; + _cancelTimer(); + _transport?.finish()?.catchError((_) => {}); // TODO(jakobr): Log error. + _transport = null; + _setState(ConnectionState.idle); + } + + void _cancelTimer() { + _timer?.cancel(); + _timer = null; + } + + void _handleActiveStateChanged(bool isActive) { + if (isActive) { + _cancelTimer(); + } else { + if (options.idleTimeout != null) { + _timer ??= new Timer(options.idleTimeout, _handleIdleTimeout); + } + } + } + + bool _hasPendingCalls() { + // Get rid of pending calls that have timed out. + _pendingCalls.removeWhere((call) => call.isCancelled); + return _pendingCalls.isNotEmpty; + } + + void _handleTransientFailure(error) { + _transport = null; + if (_state == ConnectionState.shutdown || _state == ConnectionState.idle) { + return; + } + // TODO(jakobr): Log error. + _cancelTimer(); + if (!_hasPendingCalls()) { + _setState(ConnectionState.idle); + return; + } + _setState(ConnectionState.transientFailure); + _currentReconnectDelay = options.backoffStrategy(_currentReconnectDelay); + _timer = new Timer(_currentReconnectDelay, _handleReconnect); + } + + void _handleReconnect() { + if (_timer == null || _state != ConnectionState.transientFailure) return; + _cancelTimer(); + _connect(); + } + + void _handleSocketClosed(Socket _) { + _cancelTimer(); + if (_state != ConnectionState.idle && _state != ConnectionState.shutdown) { + // We were not planning to close the socket. + _handleTransientFailure('Socket closed'); + } + } +} diff --git a/lib/src/client/method.dart b/lib/src/client/method.dart new file mode 100644 index 0000000..5dc1ce5 --- /dev/null +++ b/lib/src/client/method.dart @@ -0,0 +1,12 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +/// Description of a gRPC method. +class ClientMethod { + final String path; + final List Function(Q value) requestSerializer; + final R Function(List value) responseDeserializer; + + ClientMethod(this.path, this.requestSerializer, this.responseDeserializer); +} diff --git a/lib/src/client/options.dart b/lib/src/client/options.dart new file mode 100644 index 0000000..65a76c2 --- /dev/null +++ b/lib/src/client/options.dart @@ -0,0 +1,129 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:io'; + +import 'dart:math'; + +import '../shared/security.dart'; + +const defaultIdleTimeout = const Duration(minutes: 5); + +typedef Duration BackoffStrategy(Duration lastBackoff); + +// Backoff algorithm from https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md +const _minConnectTimeout = const Duration(seconds: 20); +const _initialBackoff = const Duration(seconds: 1); +const _maxBackoff = const Duration(seconds: 120); +const _multiplier = 1.6; +const _jitter = 0.2; +final _random = new Random(); + +Duration defaultBackoffStrategy(Duration lastBackoff) { + if (lastBackoff == null) return _initialBackoff; + final jitter = _random.nextDouble() * 2 * _jitter - _jitter; + final nextBackoff = lastBackoff * (_multiplier + jitter); + return nextBackoff < _maxBackoff ? nextBackoff : _maxBackoff; +} + +/// Options controlling how connections are made on a [ClientChannel]. +class ChannelOptions { + final bool isSecure; + final List _certificateBytes; + final String _certificatePassword; + final String authority; + final Duration idleTimeout; + final BackoffStrategy backoffStrategy; + + const ChannelOptions._( + this.isSecure, + this._certificateBytes, + this._certificatePassword, + this.authority, + Duration idleTimeout, + BackoffStrategy backoffStrategy) + : this.idleTimeout = idleTimeout ?? defaultIdleTimeout, + this.backoffStrategy = backoffStrategy ?? defaultBackoffStrategy; + + /// Disable TLS. RPCs are sent in clear text. + const ChannelOptions.insecure( + {Duration idleTimeout, + BackoffStrategy backoffStrategy = + defaultBackoffStrategy}) // Remove when dart-lang/sdk#31066 is fixed. + : this._(false, null, null, null, idleTimeout, backoffStrategy); + + /// Enable TLS and optionally specify the [certificate]s to trust. If + /// [certificates] is not provided, the default trust store is used. + const ChannelOptions.secure( + {List certificate, + String password, + String authority, + Duration idleTimeout, + BackoffStrategy backoffStrategy = + defaultBackoffStrategy}) // Remove when dart-lang/sdk#31066 is fixed. + : this._(true, certificate, password, authority, idleTimeout, + backoffStrategy); + + SecurityContext get securityContext { + if (!isSecure) return null; + if (_certificateBytes != null) { + return createSecurityContext(false) + ..setTrustedCertificatesBytes(_certificateBytes, + password: _certificatePassword); + } + final context = new SecurityContext(withTrustedRoots: true); + if (SecurityContext.alpnSupported) { + context.setAlpnProtocols(supportedAlpnProtocols, false); + } + return context; + } +} + +/// Provides per-RPC metadata. +/// +/// Metadata providers will be invoked for every RPC, and can add their own +/// metadata to the RPC. If the function returns a [Future], the RPC will await +/// completion of the returned [Future] before transmitting the request. +/// +/// The metadata provider is given the current metadata map (possibly modified +/// by previous metadata providers), and is expected to modify the map before +/// returning or before completing the returned [Future]. +typedef FutureOr MetadataProvider(Map metadata); + +/// Runtime options for an RPC. +class CallOptions { + final Map metadata; + final Duration timeout; + final List metadataProviders; + + CallOptions._(this.metadata, this.timeout, this.metadataProviders); + + /// Creates a [CallOptions] object. + /// + /// [CallOptions] can specify static [metadata], set the [timeout], and + /// configure per-RPC metadata [providers]. The metadata [providers] are + /// invoked in order for every RPC, and can modify the outgoing metadata + /// (including metadata provided by previous providers). + factory CallOptions( + {Map metadata, + Duration timeout, + List providers}) { + return new CallOptions._(new Map.unmodifiable(metadata ?? {}), timeout, + new List.unmodifiable(providers ?? [])); + } + + factory CallOptions.from(Iterable options) => + options.fold(new CallOptions(), (p, o) => p.mergedWith(o)); + + CallOptions mergedWith(CallOptions other) { + if (other == null) return this; + final mergedMetadata = new Map.from(metadata)..addAll(other.metadata); + final mergedTimeout = other.timeout ?? timeout; + final mergedProviders = new List.from(metadataProviders) + ..addAll(other.metadataProviders); + return new CallOptions._(new Map.unmodifiable(mergedMetadata), + mergedTimeout, new List.unmodifiable(mergedProviders)); + } +} diff --git a/lib/src/server/call.dart b/lib/src/server/call.dart new file mode 100644 index 0000000..816a99f --- /dev/null +++ b/lib/src/server/call.dart @@ -0,0 +1,43 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +/// Server-side context for a gRPC call. +/// +/// Gives the method handler access to custom metadata from the client, and +/// ability to set custom metadata on the header/trailer sent to the client. +abstract class ServiceCall { + /// Custom metadata from the client. + Map get clientMetadata; + + /// Custom metadata to be sent to the client. Will be [null] once the headers + /// have been sent, either when [sendHeaders] is called, or when the first + /// response message is sent. + Map get headers; + + /// Custom metadata to be sent to the client after all response messages. + Map get trailers; + + /// Deadline for this call. If the call is still active after this time, then + /// the client or server may cancel it. + DateTime get deadline; + + /// Returns [true] if the [deadline] has been exceeded. + bool get isTimedOut; + + /// Returns [true] if the client has canceled this call. + bool get isCanceled; + + /// Send response headers. This is done automatically before sending the first + /// response message, but can be done manually before the first response is + /// ready, if necessary. + void sendHeaders(); + + /// Send response trailers. A trailer indicating success ([status] == 0) will + /// be sent automatically when all responses are sent. This method can be used + /// to send a different status code, if needed. + /// + /// The call will be closed after calling this method, and no further + /// responses can be sent. + void sendTrailers({int status, String message}); +} diff --git a/lib/src/server.dart b/lib/src/server/handler.dart similarity index 54% rename from lib/src/server.dart rename to lib/src/server/handler.dart index f80da36..d8ab16f 100644 --- a/lib/src/server.dart +++ b/lib/src/server/handler.dart @@ -3,190 +3,18 @@ // BSD-style license that can be found in the LICENSE file. import 'dart:async'; -import 'dart:io'; -import 'package:grpc/src/shared.dart'; import 'package:http2/transport.dart'; -import 'status.dart'; -import 'streams.dart'; +import '../shared/status.dart'; +import '../shared/streams.dart'; +import '../shared/timeout.dart'; -/// Definition of a gRPC service method. -class ServiceMethod { - final String name; - - final bool streamingRequest; - final bool streamingResponse; - - final Q Function(List request) requestDeserializer; - final List Function(R response) responseSerializer; - - final Function handler; - - ServiceMethod( - this.name, - this.handler, - this.streamingRequest, - this.streamingResponse, - this.requestDeserializer, - this.responseSerializer); -} - -/// Definition of a gRPC service. -abstract class Service { - final Map _$methods = {}; - - String get $name; - - void $addMethod(ServiceMethod method) { - _$methods[method.name] = method; - } - - /// Client metadata handler. - /// - /// Services can override this method to provide common handling of incoming - /// metadata from the client. - void $onMetadata(ServiceCall context) {} - - ServiceMethod $lookupMethod(String name) => _$methods[name]; -} - -/// A gRPC server. -/// -/// Listens for incoming RPCs, dispatching them to the right [Service] handler. -class Server { - final Map _services = {}; - final int port; - final SecurityContext _securityContext; - - ServerSocket _insecureServer; - SecureServerSocket _secureServer; - final _connections = []; - - Server._(List services, this.port, this._securityContext) { - for (final service in services) { - _services[service.$name] = service; - } - } - - /// Create a server for the given [services] with no transport security, - /// listening on [port]. - factory Server.insecure(List services, {int port}) { - return new Server._(services, port ?? 80, null); - } - - /// Create a secure server for the given [services], listening on [port]. - /// - /// If the [certificate] or [privateKey] is encrypted, the password must also - /// be provided. - factory Server.secure(List services, - {List certificate, - String certificatePassword, - List privateKey, - String privateKeyPassword, - int port}) { - final context = createSecurityContext(true); - if (privateKey != null) { - context.usePrivateKeyBytes(privateKey, password: privateKeyPassword); - } - if (certificate != null) { - context.useCertificateChainBytes(certificate, - password: certificatePassword); - } - return new Server._(services, port ?? 443, context); - } - - Service lookupService(String service) => _services[service]; - - Future serve() async { - // TODO(dart-lang/grpc-dart#9): Handle HTTP/1.1 upgrade to h2c, if allowed. - Stream server; - if (_securityContext != null) { - _secureServer = - await SecureServerSocket.bind('0.0.0.0', port, _securityContext); - server = _secureServer; - } else { - _insecureServer = await ServerSocket.bind('0.0.0.0', port); - server = _insecureServer; - } - server.listen((socket) { - final connection = new ServerTransportConnection.viaSocket(socket); - _connections.add(connection); - // TODO(jakobr): Set active state handlers, close connection after idle - // timeout. - connection.incomingStreams.listen(serveStream, onError: (error) { - print('Connection error: $error'); - }, onDone: () { - _connections.remove(connection); - }); - }, onError: (error) { - print('Socket error: $error'); - }); - } - - void serveStream(ServerTransportStream stream) { - new ServerHandler(lookupService, stream).handle(); - } - - Future shutdown() { - final done = _connections.map((connection) => connection.finish()).toList(); - if (_insecureServer != null) { - done.add(_insecureServer.close()); - } - if (_secureServer != null) { - done.add(_secureServer.close()); - } - return Future.wait(done); - } -} - -/// Server-side context for a gRPC call. -/// -/// Gives the method handler access to custom metadata from the client, and -/// ability to set custom metadata on the header/trailer sent to the client. -class ServiceCall { - final ServerHandler _handler; - - ServiceCall(this._handler); - - /// Custom metadata from the client. - Map get clientMetadata => _handler._clientMetadata; - - /// Custom metadata to be sent to the client. Will be [null] once the headers - /// have been sent, either when [sendHeaders] is called, or when the first - /// response message is sent. - Map get headers => _handler._customHeaders; - - /// Custom metadata to be sent to the client after all response messages. - Map get trailers => _handler._customTrailers; - - /// Deadline for this call. If the call is still active after this time, then - /// the client or server may cancel it. - DateTime get deadline => _handler._deadline; - - /// Returns [true] if the [deadline] has been exceeded. - bool get isTimedOut => _handler._isTimedOut; - - /// Returns [true] if the client has canceled this call. - bool get isCanceled => _handler._isCanceled; - - /// Send response headers. This is done automatically before sending the first - /// response message, but can be done manually before the first response is - /// ready, if necessary. - void sendHeaders() => _handler._sendHeaders(); - - /// Send response trailers. A trailer indicating success ([status] == 0) will - /// be sent automatically when all responses are sent. This method can be used - /// to send a different status code, if needed. - /// - /// The call will be closed after calling this method, and no further - /// responses can be sent. - void sendTrailer(int status, [String statusMessage]) => - _handler._sendTrailers(status: status, message: statusMessage); -} +import 'call.dart'; +import 'service.dart'; /// Handles an incoming gRPC call. -class ServerHandler { +class ServerHandler extends ServiceCall { final ServerTransportStream _stream; final Service Function(String service) _serviceLookup; @@ -214,7 +42,13 @@ class ServerHandler { ServerHandler(this._serviceLookup, this._stream); + DateTime get deadline => _deadline; bool get isCanceled => _isCanceled; + bool get isTimedOut => _isTimedOut; + + Map get clientMetadata => _clientMetadata; + Map get headers => _customHeaders; + Map get trailers => _customTrailers; void handle() { _stream.onTerminated = (int errorCode) { @@ -298,22 +132,20 @@ class ServerHandler { onResume: _incomingSubscription.resume); _incomingSubscription.onData(_onDataActive); - final context = new ServiceCall(this); - _service.$onMetadata(context); + _service.$onMetadata(this); if (_descriptor.streamingResponse) { if (_descriptor.streamingRequest) { - _responses = _descriptor.handler(context, _requests.stream); + _responses = _descriptor.handler(this, _requests.stream); } else { _responses = - _descriptor.handler(context, _toSingleFuture(_requests.stream)); + _descriptor.handler(this, _toSingleFuture(_requests.stream)); } } else { Future response; if (_descriptor.streamingRequest) { - response = _descriptor.handler(context, _requests.stream); + response = _descriptor.handler(this, _requests.stream); } else { - response = - _descriptor.handler(context, _toSingleFuture(_requests.stream)); + response = _descriptor.handler(this, _toSingleFuture(_requests.stream)); } _responses = response.asStream(); } @@ -390,7 +222,7 @@ class ServerHandler { try { final bytes = _descriptor.responseSerializer(response); if (!_headersSent) { - _sendHeaders(); + sendHeaders(); } _stream.sendData(GrpcHttpEncoder.frame(bytes)); } catch (error) { @@ -408,7 +240,7 @@ class ServerHandler { } void _onResponseDone() { - _sendTrailers(); + sendTrailers(); } void _onResponseError(error) { @@ -419,52 +251,52 @@ class ServerHandler { } } - void _sendHeaders() { + void sendHeaders() { if (_headersSent) throw new GrpcError.internal('Headers already sent'); _customHeaders..remove(':status')..remove('content-type'); // TODO(jakobr): Should come from package:http2? - final headersMap = { + final outgoingHeadersMap = { ':status': '200', 'content-type': 'application/grpc' }; - headersMap.addAll(_customHeaders); + outgoingHeadersMap.addAll(_customHeaders); _customHeaders = null; - final headers =
[]; - headersMap - .forEach((key, value) => headers.add(new Header.ascii(key, value))); - _stream.sendHeaders(headers); + final outgoingHeaders =
[]; + outgoingHeadersMap.forEach( + (key, value) => outgoingHeaders.add(new Header.ascii(key, value))); + _stream.sendHeaders(outgoingHeaders); _headersSent = true; } - void _sendTrailers({int status = 0, String message}) { + void sendTrailers({int status = 0, String message}) { _timeoutTimer?.cancel(); - final trailersMap = {}; + final outogingTrailersMap = {}; if (!_headersSent) { // TODO(jakobr): Should come from package:http2? - trailersMap[':status'] = '200'; - trailersMap['content-type'] = 'application/grpc'; + outogingTrailersMap[':status'] = '200'; + outogingTrailersMap['content-type'] = 'application/grpc'; _customHeaders..remove(':status')..remove('content-type'); - trailersMap.addAll(_customHeaders); + outogingTrailersMap.addAll(_customHeaders); _customHeaders = null; } _customTrailers..remove(':status')..remove('content-type'); - trailersMap.addAll(_customTrailers); + outogingTrailersMap.addAll(_customTrailers); _customTrailers = null; - trailersMap['grpc-status'] = status.toString(); + outogingTrailersMap['grpc-status'] = status.toString(); if (message != null) { - trailersMap['grpc-message'] = message; + outogingTrailersMap['grpc-message'] = message; } - final trailers =
[]; - trailersMap - .forEach((key, value) => trailers.add(new Header.ascii(key, value))); - _stream.sendHeaders(trailers, endStream: true); + final outgoingTrailers =
[]; + outogingTrailersMap.forEach( + (key, value) => outgoingTrailers.add(new Header.ascii(key, value))); + _stream.sendHeaders(outgoingTrailers, endStream: true); // We're done! _cancelResponseSubscription(); _sinkIncoming(); @@ -513,6 +345,6 @@ class ServerHandler { } void _sendError(GrpcError error) { - _sendTrailers(status: error.code, message: error.message); + sendTrailers(status: error.code, message: error.message); } } diff --git a/lib/src/server/server.dart b/lib/src/server/server.dart new file mode 100644 index 0000000..00cfec4 --- /dev/null +++ b/lib/src/server/server.dart @@ -0,0 +1,102 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:io'; + +import 'package:http2/transport.dart'; + +import '../shared/security.dart'; + +import 'handler.dart'; +import 'service.dart'; + +/// A gRPC server. +/// +/// Listens for incoming RPCs, dispatching them to the right [Service] handler. +class Server { + final Map _services = {}; + final int port; + final SecurityContext _securityContext; + + ServerSocket _insecureServer; + SecureServerSocket _secureServer; + final _connections = []; + + Server._(List services, this.port, this._securityContext) { + for (final service in services) { + _services[service.$name] = service; + } + } + + /// Create a server for the given [services] with no transport security, + /// listening on [port]. + factory Server.insecure(List services, {int port}) { + return new Server._(services, port ?? 80, null); + } + + /// Create a secure server for the given [services], listening on [port]. + /// + /// If the [certificate] or [privateKey] is encrypted, the password must also + /// be provided. + factory Server.secure(List services, + {List certificate, + String certificatePassword, + List privateKey, + String privateKeyPassword, + int port}) { + final context = createSecurityContext(true); + if (privateKey != null) { + context.usePrivateKeyBytes(privateKey, password: privateKeyPassword); + } + if (certificate != null) { + context.useCertificateChainBytes(certificate, + password: certificatePassword); + } + return new Server._(services, port ?? 443, context); + } + + Service lookupService(String service) => _services[service]; + + Future serve() async { + // TODO(dart-lang/grpc-dart#9): Handle HTTP/1.1 upgrade to h2c, if allowed. + Stream server; + if (_securityContext != null) { + _secureServer = + await SecureServerSocket.bind('0.0.0.0', port, _securityContext); + server = _secureServer; + } else { + _insecureServer = await ServerSocket.bind('0.0.0.0', port); + server = _insecureServer; + } + server.listen((socket) { + final connection = new ServerTransportConnection.viaSocket(socket); + _connections.add(connection); + // TODO(jakobr): Set active state handlers, close connection after idle + // timeout. + connection.incomingStreams.listen(serveStream, onError: (error) { + print('Connection error: $error'); + }, onDone: () { + _connections.remove(connection); + }); + }, onError: (error) { + print('Socket error: $error'); + }); + } + + void serveStream(ServerTransportStream stream) { + new ServerHandler(lookupService, stream).handle(); + } + + Future shutdown() { + final done = _connections.map((connection) => connection.finish()).toList(); + if (_insecureServer != null) { + done.add(_insecureServer.close()); + } + if (_secureServer != null) { + done.add(_secureServer.close()); + } + return Future.wait(done); + } +} diff --git a/lib/src/server/service.dart b/lib/src/server/service.dart new file mode 100644 index 0000000..e3439c5 --- /dev/null +++ b/lib/src/server/service.dart @@ -0,0 +1,45 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'call.dart'; + +/// Definition of a gRPC service method. +class ServiceMethod { + final String name; + + final bool streamingRequest; + final bool streamingResponse; + + final Q Function(List request) requestDeserializer; + final List Function(R response) responseSerializer; + + final Function handler; + + ServiceMethod( + this.name, + this.handler, + this.streamingRequest, + this.streamingResponse, + this.requestDeserializer, + this.responseSerializer); +} + +/// Definition of a gRPC service. +abstract class Service { + final Map _$methods = {}; + + String get $name; + + void $addMethod(ServiceMethod method) { + _$methods[method.name] = method; + } + + /// Client metadata handler. + /// + /// Services can override this method to provide common handling of incoming + /// metadata from the client. + void $onMetadata(ServiceCall context) {} + + ServiceMethod $lookupMethod(String name) => _$methods[name]; +} diff --git a/lib/src/shared/security.dart b/lib/src/shared/security.dart new file mode 100644 index 0000000..f48631a --- /dev/null +++ b/lib/src/shared/security.dart @@ -0,0 +1,15 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:io'; + +const supportedAlpnProtocols = const ['grpc-exp', 'h2']; + +// TODO: Simplify once we have a stable Dart 1.25 release (update pubspec to +// require SDK >=1.25.0, and remove check for alpnSupported). +SecurityContext createSecurityContext(bool isServer) => + SecurityContext.alpnSupported + ? (new SecurityContext() + ..setAlpnProtocols(supportedAlpnProtocols, isServer)) + : new SecurityContext(); diff --git a/lib/src/status.dart b/lib/src/shared/status.dart similarity index 100% rename from lib/src/status.dart rename to lib/src/shared/status.dart diff --git a/lib/src/streams.dart b/lib/src/shared/streams.dart similarity index 100% rename from lib/src/streams.dart rename to lib/src/shared/streams.dart diff --git a/lib/src/shared/timeout.dart b/lib/src/shared/timeout.dart new file mode 100644 index 0000000..1579948 --- /dev/null +++ b/lib/src/shared/timeout.dart @@ -0,0 +1,52 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +/// Convert [timeout] to grpc-timeout header string format. +// Mostly inspired by grpc-java implementation. +// TODO(jakobr): Modify to match grpc/core implementation instead. +String toTimeoutString(Duration duration) { + if (duration == null) return null; + const cutoff = 100000; + final timeout = duration.inMicroseconds; + if (timeout < 0) { + // Smallest possible timeout. + return '1n'; + } else if (timeout < cutoff) { + return '${timeout}u'; + } else if (timeout < cutoff * 1000) { + return '${timeout~/1000}m'; + } else if (timeout < cutoff * 1000 * 1000) { + return '${timeout~/1000000}S'; + } else if (timeout < cutoff * 1000 * 1000 * 60) { + return '${timeout~/60000000}M'; + } else { + return '${timeout~/3600000000}H'; + } +} + +/// Convert [timeout] from grpc-timeout header string format to [Duration]. +/// Returns [null] if [timeout] is not correctly formatted. +Duration fromTimeoutString(String timeout) { + if (timeout == null) return null; + if (timeout.length < 2) return null; + final value = + int.parse(timeout.substring(0, timeout.length - 1), onError: (_) => null); + if (value == null) return null; + switch (timeout[timeout.length - 1]) { + case 'n': + return new Duration(microseconds: value * 1000); + case 'u': + return new Duration(microseconds: value); + case 'm': + return new Duration(milliseconds: value); + case 'S': + return new Duration(seconds: value); + case 'M': + return new Duration(minutes: value); + case 'H': + return new Duration(hours: value); + default: + return null; + } +} diff --git a/pubspec.yaml b/pubspec.yaml index 7bc5491..f3efaf1 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: grpc description: Dart implementation of gRPC. -version: 0.1.0 +version: 0.2.0 author: Dart Team homepage: https://github.com/dart-lang/grpc-dart diff --git a/test/server_test.dart b/test/server_test.dart index 81cff54..ffaa329 100644 --- a/test/server_test.dart +++ b/test/server_test.dart @@ -5,7 +5,6 @@ import 'dart:async'; import 'package:grpc/grpc.dart'; -import 'package:grpc/src/status.dart'; import 'package:http2/transport.dart'; import 'package:test/test.dart'; diff --git a/test/src/client_utils.dart b/test/src/client_utils.dart index 31b7d14..2f21753 100644 --- a/test/src/client_utils.dart +++ b/test/src/client_utils.dart @@ -5,7 +5,7 @@ import 'dart:async'; import 'dart:io'; -import 'package:grpc/src/streams.dart'; +import 'package:grpc/src/shared/streams.dart'; import 'package:http2/transport.dart'; import 'package:test/test.dart'; import 'package:mockito/mockito.dart'; diff --git a/test/src/server_utils.dart b/test/src/server_utils.dart index fb17587..b01a1bb 100644 --- a/test/src/server_utils.dart +++ b/test/src/server_utils.dart @@ -4,7 +4,7 @@ import 'dart:async'; -import 'package:grpc/src/streams.dart'; +import 'package:grpc/src/shared/streams.dart'; import 'package:http2/transport.dart'; import 'package:test/test.dart'; diff --git a/test/src/utils.dart b/test/src/utils.dart index 92fd913..ed69a52 100644 --- a/test/src/utils.dart +++ b/test/src/utils.dart @@ -4,7 +4,7 @@ import 'dart:convert'; -import 'package:grpc/src/streams.dart'; +import 'package:grpc/src/shared/streams.dart'; import 'package:http2/transport.dart'; import 'package:test/test.dart'; diff --git a/test/stream_test.dart b/test/stream_test.dart index 63b8c0c..b455dcc 100644 --- a/test/stream_test.dart +++ b/test/stream_test.dart @@ -7,8 +7,7 @@ import 'dart:async'; import 'package:http2/transport.dart'; import 'package:test/test.dart'; -import 'package:grpc/src/streams.dart'; -import 'package:grpc/src/status.dart'; +import 'package:grpc/grpc.dart'; void main() { group('GrpcHttpDecoder', () { diff --git a/test/timeout_test.dart b/test/timeout_test.dart index f78d754..6f73bf6 100644 --- a/test/timeout_test.dart +++ b/test/timeout_test.dart @@ -9,8 +9,6 @@ import 'package:http2/transport.dart'; import 'package:test/src/backend/invoker.dart'; import 'package:test/test.dart'; -import 'package:grpc/src/client.dart'; - import 'src/client_utils.dart'; import 'src/server_utils.dart'; import 'src/utils.dart';