From 2f118ea043310b32c4e8c9a01a11c61a89fc601f Mon Sep 17 00:00:00 2001 From: Jakob Andersen Date: Mon, 25 Sep 2017 13:51:40 +0200 Subject: [PATCH] Preparation for RPC multiplexing (#31) First stage of separating Connection from Channel. A Channel manages multiple Connections, and chooses which Connection to send an RPC on. In this change, the Channel still creates a Connection for each RPC. Managing the Connection life-cycle comes in a later change. --- example/metadata/lib/src/client.dart | 54 +++- .../lib/src/generated/metadata.pb.dart | 3 +- .../lib/src/generated/metadata.pbgrpc.dart | 18 +- example/route_guide/lib/src/client.dart | 2 +- .../lib/src/generated/route_guide.pbgrpc.dart | 18 +- interop/bin/server.dart | 7 +- interop/lib/src/generated/empty.pb.dart | 3 +- interop/lib/src/generated/messages.pb.dart | 3 +- .../lib/src/generated/messages.pbenum.dart | 5 +- interop/lib/src/generated/test.pb.dart | 3 +- interop/lib/src/generated/test.pbgrpc.dart | 64 ++-- lib/src/client.dart | 283 ++++++++++++------ lib/src/server.dart | 6 +- test/client_test.dart | 3 +- test/src/client_utils.dart | 56 ++-- test/src/server_utils.dart | 9 +- test/src/utils.dart | 2 +- test/timeout_test.dart | 3 +- 18 files changed, 322 insertions(+), 220 deletions(-) diff --git a/example/metadata/lib/src/client.dart b/example/metadata/lib/src/client.dart index 1ee0d98..f1aa66d 100644 --- a/example/metadata/lib/src/client.dart +++ b/example/metadata/lib/src/client.dart @@ -22,10 +22,13 @@ class Client { await runAddOneCancel(); await runFibonacciCancel(); await runFibonacciTimeout(); - await channel.close(); + await channel.shutdown(); } - // Run the echo demo. ... + /// Run the echo demo. + /// + /// Send custom metadata with a RPC, and print out the received response and + /// metadata. Future runEcho() async { final request = new Record()..value = 'Kaj'; final call = stub.echo(request, @@ -40,7 +43,11 @@ class Client { print('Echo response: ${response.value}'); } - // Run the echo with delay cancel demo. ... + /// Run the echo with delay cancel demo. + /// + /// Same as the echo demo, but demonstrating per-client custom metadata, as + /// well as a per-call metadata. The server will delay the response for the + /// requested duration, during which the client will cancel the RPC. Future runEchoDelayCancel() async { final stubWithCustomOptions = new MetadataClient(channel, options: new CallOptions(metadata: {'peer': 'Verner'})); @@ -63,7 +70,10 @@ class Client { } } - // Run the addOne cancel demo. + /// Run the addOne cancel demo. + /// + /// Makes a bi-directional RPC, sends 4 requests, and cancels the RPC after + /// receiving 3 responses. Future runAddOneCancel() async { final numbers = new StreamController(); final call = @@ -74,7 +84,7 @@ class Client { if (number.value == 3) { receivedThree.complete(true); } - }); + }, onError: (e) => print('Caught: $e')); numbers.add(1); numbers.add(2); numbers.add(3); @@ -84,23 +94,43 @@ class Client { await Future.wait([sub.cancel(), numbers.close()]); } + /// Run the Fibonacci demo. + /// /// Call an RPC that returns a stream of Fibonacci numbers. Cancel the call /// after receiving more than 5 responses. Future runFibonacciCancel() async { final call = stub.fibonacci(new Empty()); int count = 0; - await for (var number in call) { - count++; - print('Received ${number.value} (count=$count)'); - if (count > 5) { - await call.cancel(); + try { + await for (var number in call) { + count++; + print('Received ${number.value} (count=$count)'); + if (count > 5) { + await call.cancel(); + } } + } on GrpcError catch (e) { + print('Caught: $e'); } print('Final count: $count'); } - // Run the timeout demo. ... + /// Run the timeout demo. + /// + /// Call an RPC that returns a stream of Fibonacci numbers, and specify an RPC + /// timeout of 2 seconds. Future runFibonacciTimeout() async { - // TODO(jakobr): Implement timeouts. + final call = stub.fibonacci(new Empty(), + options: new CallOptions(timeout: new Duration(seconds: 2))); + int count = 0; + try { + await for (var number in call) { + count++; + print('Received ${number.value} (count=$count)'); + } + } on GrpcError catch (e) { + print('Caught: $e'); + } + print('Final count: $count'); } } diff --git a/example/metadata/lib/src/generated/metadata.pb.dart b/example/metadata/lib/src/generated/metadata.pb.dart index 00940d8..a42a431 100644 --- a/example/metadata/lib/src/generated/metadata.pb.dart +++ b/example/metadata/lib/src/generated/metadata.pb.dart @@ -1,8 +1,7 @@ /// // Generated code. Do not modify. /// -// ignore_for_file: non_constant_identifier_names -// ignore_for_file: library_prefixes +// ignore_for_file: non_constant_identifier_names,library_prefixes library grpc_metadata; // ignore: UNUSED_SHOWN_NAME diff --git a/example/metadata/lib/src/generated/metadata.pbgrpc.dart b/example/metadata/lib/src/generated/metadata.pbgrpc.dart index 7430dcf..8c77e07 100644 --- a/example/metadata/lib/src/generated/metadata.pbgrpc.dart +++ b/example/metadata/lib/src/generated/metadata.pbgrpc.dart @@ -1,8 +1,7 @@ /// // Generated code. Do not modify. /// -// ignore_for_file: non_constant_identifier_names -// ignore_for_file: library_prefixes +// ignore_for_file: non_constant_identifier_names,library_prefixes library grpc_metadata_pbgrpc; import 'dart:async'; @@ -30,24 +29,19 @@ class MetadataClient extends Client { : super(channel, options: options); ResponseFuture echo(Record request, {CallOptions options}) { - final call = $createCall(_$echo, options: options); - call.request - ..add(request) - ..close(); + final call = $createCall(_$echo, new Stream.fromIterable([request]), + options: options); return new ResponseFuture(call); } ResponseStream addOne(Stream request, {CallOptions options}) { - final call = $createCall(_$addOne, options: options); - request.pipe(call.request); + final call = $createCall(_$addOne, request, options: options); return new ResponseStream(call); } ResponseStream fibonacci(Empty request, {CallOptions options}) { - final call = $createCall(_$fibonacci, options: options); - call.request - ..add(request) - ..close(); + final call = $createCall(_$fibonacci, new Stream.fromIterable([request]), + options: options); return new ResponseStream(call); } } diff --git a/example/route_guide/lib/src/client.dart b/example/route_guide/lib/src/client.dart index b89cb38..c154510 100644 --- a/example/route_guide/lib/src/client.dart +++ b/example/route_guide/lib/src/client.dart @@ -24,7 +24,7 @@ class Client { await runListFeatures(); await runRecordRoute(); await runRouteChat(); - await channel.close(); + await channel.shutdown(); } void printFeature(Feature feature) { diff --git a/example/route_guide/lib/src/generated/route_guide.pbgrpc.dart b/example/route_guide/lib/src/generated/route_guide.pbgrpc.dart index 46d5072..17dae33 100644 --- a/example/route_guide/lib/src/generated/route_guide.pbgrpc.dart +++ b/example/route_guide/lib/src/generated/route_guide.pbgrpc.dart @@ -34,33 +34,27 @@ class RouteGuideClient extends Client { : super(channel, options: options); ResponseFuture getFeature(Point request, {CallOptions options}) { - final call = $createCall(_$getFeature, options: options); - call.request - ..add(request) - ..close(); + final call = $createCall(_$getFeature, new Stream.fromIterable([request]), + options: options); return new ResponseFuture(call); } ResponseStream listFeatures(Rectangle request, {CallOptions options}) { - final call = $createCall(_$listFeatures, options: options); - call.request - ..add(request) - ..close(); + final call = $createCall(_$listFeatures, new Stream.fromIterable([request]), + options: options); return new ResponseStream(call); } ResponseFuture recordRoute(Stream request, {CallOptions options}) { - final call = $createCall(_$recordRoute, options: options); - request.pipe(call.request); + final call = $createCall(_$recordRoute, request, options: options); return new ResponseFuture(call); } ResponseStream routeChat(Stream request, {CallOptions options}) { - final call = $createCall(_$routeChat, options: options); - request.pipe(call.request); + final call = $createCall(_$routeChat, request, options: options); return new ResponseStream(call); } } diff --git a/interop/bin/server.dart b/interop/bin/server.dart index 1ce3c40..3c35546 100644 --- a/interop/bin/server.dart +++ b/interop/bin/server.dart @@ -84,8 +84,11 @@ class TestService extends TestServiceBase { throw new GrpcError.custom( request.responseStatus.code, request.responseStatus.message); } - return new StreamingOutputCallResponse() - ..payload = _payloadForRequest(request.responseParameters[0]); + final response = new StreamingOutputCallResponse(); + if (request.responseParameters.isNotEmpty) { + response.payload = _payloadForRequest(request.responseParameters[0]); + } + return response; } @override diff --git a/interop/lib/src/generated/empty.pb.dart b/interop/lib/src/generated/empty.pb.dart index 4db05ee..4b040c7 100644 --- a/interop/lib/src/generated/empty.pb.dart +++ b/interop/lib/src/generated/empty.pb.dart @@ -1,8 +1,7 @@ /// // Generated code. Do not modify. /// -// ignore_for_file: non_constant_identifier_names -// ignore_for_file: library_prefixes +// ignore_for_file: non_constant_identifier_names,library_prefixes library grpc.testing_empty; // ignore: UNUSED_SHOWN_NAME diff --git a/interop/lib/src/generated/messages.pb.dart b/interop/lib/src/generated/messages.pb.dart index def9e61..3f98eae 100644 --- a/interop/lib/src/generated/messages.pb.dart +++ b/interop/lib/src/generated/messages.pb.dart @@ -1,8 +1,7 @@ /// // Generated code. Do not modify. /// -// ignore_for_file: non_constant_identifier_names -// ignore_for_file: library_prefixes +// ignore_for_file: non_constant_identifier_names,library_prefixes library grpc.testing_messages; // ignore: UNUSED_SHOWN_NAME diff --git a/interop/lib/src/generated/messages.pbenum.dart b/interop/lib/src/generated/messages.pbenum.dart index 545cbfa..964d000 100644 --- a/interop/lib/src/generated/messages.pbenum.dart +++ b/interop/lib/src/generated/messages.pbenum.dart @@ -1,11 +1,10 @@ /// // Generated code. Do not modify. /// -// ignore_for_file: non_constant_identifier_names -// ignore_for_file: library_prefixes +// ignore_for_file: non_constant_identifier_names,library_prefixes library grpc.testing_messages_pbenum; -// ignore: UNUSED_SHOWN_NAME +// ignore_for_file: UNDEFINED_SHOWN_NAME,UNUSED_SHOWN_NAME import 'dart:core' show int, dynamic, String, List, Map; import 'package:protobuf/protobuf.dart'; diff --git a/interop/lib/src/generated/test.pb.dart b/interop/lib/src/generated/test.pb.dart index 584edd8..d64b754 100644 --- a/interop/lib/src/generated/test.pb.dart +++ b/interop/lib/src/generated/test.pb.dart @@ -1,8 +1,7 @@ /// // Generated code. Do not modify. /// -// ignore_for_file: non_constant_identifier_names -// ignore_for_file: library_prefixes +// ignore_for_file: non_constant_identifier_names,library_prefixes library grpc.testing_test; // ignore: UNUSED_SHOWN_NAME diff --git a/interop/lib/src/generated/test.pbgrpc.dart b/interop/lib/src/generated/test.pbgrpc.dart index 84be978..8a5dda1 100644 --- a/interop/lib/src/generated/test.pbgrpc.dart +++ b/interop/lib/src/generated/test.pbgrpc.dart @@ -1,8 +1,7 @@ /// // Generated code. Do not modify. /// -// ignore_for_file: non_constant_identifier_names -// ignore_for_file: library_prefixes +// ignore_for_file: non_constant_identifier_names,library_prefixes library grpc.testing_test_pbgrpc; import 'dart:async'; @@ -60,71 +59,61 @@ class TestServiceClient extends Client { : super(channel, options: options); ResponseFuture emptyCall(Empty request, {CallOptions options}) { - final call = $createCall(_$emptyCall, options: options); - call.request - ..add(request) - ..close(); + final call = $createCall(_$emptyCall, new Stream.fromIterable([request]), + options: options); return new ResponseFuture(call); } ResponseFuture unaryCall(SimpleRequest request, {CallOptions options}) { - final call = $createCall(_$unaryCall, options: options); - call.request - ..add(request) - ..close(); + final call = $createCall(_$unaryCall, new Stream.fromIterable([request]), + options: options); return new ResponseFuture(call); } ResponseFuture cacheableUnaryCall(SimpleRequest request, {CallOptions options}) { - final call = $createCall(_$cacheableUnaryCall, options: options); - call.request - ..add(request) - ..close(); + final call = $createCall( + _$cacheableUnaryCall, new Stream.fromIterable([request]), + options: options); return new ResponseFuture(call); } ResponseStream streamingOutputCall( StreamingOutputCallRequest request, {CallOptions options}) { - final call = $createCall(_$streamingOutputCall, options: options); - call.request - ..add(request) - ..close(); + final call = $createCall( + _$streamingOutputCall, new Stream.fromIterable([request]), + options: options); return new ResponseStream(call); } ResponseFuture streamingInputCall( Stream request, {CallOptions options}) { - final call = $createCall(_$streamingInputCall, options: options); - request.pipe(call.request); + final call = $createCall(_$streamingInputCall, request, options: options); return new ResponseFuture(call); } ResponseStream fullDuplexCall( Stream request, {CallOptions options}) { - final call = $createCall(_$fullDuplexCall, options: options); - request.pipe(call.request); + final call = $createCall(_$fullDuplexCall, request, options: options); return new ResponseStream(call); } ResponseStream halfDuplexCall( Stream request, {CallOptions options}) { - final call = $createCall(_$halfDuplexCall, options: options); - request.pipe(call.request); + final call = $createCall(_$halfDuplexCall, request, options: options); return new ResponseStream(call); } ResponseFuture unimplementedCall(Empty request, {CallOptions options}) { - final call = $createCall(_$unimplementedCall, options: options); - call.request - ..add(request) - ..close(); + final call = $createCall( + _$unimplementedCall, new Stream.fromIterable([request]), + options: options); return new ResponseFuture(call); } } @@ -228,10 +217,9 @@ class UnimplementedServiceClient extends Client { ResponseFuture unimplementedCall(Empty request, {CallOptions options}) { - final call = $createCall(_$unimplementedCall, options: options); - call.request - ..add(request) - ..close(); + final call = $createCall( + _$unimplementedCall, new Stream.fromIterable([request]), + options: options); return new ResponseFuture(call); } } @@ -271,18 +259,14 @@ class ReconnectServiceClient extends Client { : super(channel, options: options); ResponseFuture start(ReconnectParams request, {CallOptions options}) { - final call = $createCall(_$start, options: options); - call.request - ..add(request) - ..close(); + final call = $createCall(_$start, new Stream.fromIterable([request]), + options: options); return new ResponseFuture(call); } ResponseFuture stop(Empty request, {CallOptions options}) { - final call = $createCall(_$stop, options: options); - call.request - ..add(request) - ..close(); + final call = $createCall(_$stop, new Stream.fromIterable([request]), + options: options); return new ResponseFuture(call); } } diff --git a/lib/src/client.dart b/lib/src/client.dart index f940af6..b87f04c 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -26,9 +26,10 @@ class ChannelOptions { final bool _useTls; final List _certificateBytes; final String _certificatePassword; + final String authority; const ChannelOptions._(this._useTls, - [this._certificateBytes, this._certificatePassword]); + [this._certificateBytes, this._certificatePassword, this.authority]); /// Enable TLS using the default trust store. const ChannelOptions() : this._(true); @@ -37,8 +38,9 @@ class ChannelOptions { const ChannelOptions.insecure() : this._(false); /// Enable TLS and specify the [certificate]s to trust. - ChannelOptions.secure({List certificate, String password}) - : this._(true, certificate, password); + ChannelOptions.secure( + {List certificate, String password, String authority}) + : this._(true, certificate, password, authority); SecurityContext get securityContext { if (!_useTls) return null; @@ -51,42 +53,157 @@ class ChannelOptions { } } -/// A channel to an RPC endpoint. +/// A connection to a single RPC endpoint. +/// +/// RPCs made on a connection are always sent to the same endpoint. +class ClientConnection { + static final _methodPost = new Header.ascii(':method', 'POST'); + static final _schemeHttp = new Header.ascii(':scheme', 'http'); + static final _schemeHttps = new Header.ascii(':scheme', 'https'); + static final _contentTypeGrpc = + new Header.ascii('content-type', 'application/grpc'); + static final _teTrailers = new Header.ascii('te', 'trailers'); + static final _grpcAcceptEncoding = + new Header.ascii('grpc-accept-encoding', 'identity'); + static final _userAgent = new Header.ascii('user-agent', 'dart-grpc/0.2.0'); + + final ClientTransportConnection _transport; + + ClientConnection(this._transport); + + static List
createCallHeaders( + bool useTls, String authority, String path, CallOptions options) { + final headers = [ + _methodPost, + useTls ? _schemeHttps : _schemeHttp, + new Header.ascii(':path', path), + new Header.ascii(':authority', authority), + ]; + if (options.timeout != null) { + headers.add( + new Header.ascii('grpc-timeout', toTimeoutString(options.timeout))); + } + headers.addAll([ + _contentTypeGrpc, + _teTrailers, + _grpcAcceptEncoding, + _userAgent, + ]); + options.metadata.forEach((key, value) { + headers.add(new Header.ascii(key, value)); + }); + return headers; + } + + /// Shuts down this connection. + /// + /// No further calls may be made on this connection, but existing calls + /// are allowed to finish. + Future shutdown() { + // TODO(jakobr): Manage streams, close [_transport] when all are done. + return _transport.finish(); + } + + /// Terminates this connection. + /// + /// All open calls are terminated immediately, and no further calls may be + /// made on this connection. + Future terminate() { + // TODO(jakobr): Manage streams, close them immediately. + return _transport.terminate(); + } + + /// Starts a new RPC on this connection. + /// + /// Creates a new transport stream on this connection, and sends initial call + /// metadata. + ClientTransportStream sendRequest( + bool useTls, String authority, String path, CallOptions options) { + final headers = createCallHeaders(useTls, authority, path, options); + final stream = _transport.makeRequest(headers); + // TODO(jakobr): Manage streams. Subscribe to stream state changes. + return stream; + } +} + +/// A channel to a virtual RPC endpoint. +/// +/// For each RPC, the channel picks a [ClientConnection] to dispatch the call. +/// RPCs on the same channel may be sent to different connections, depending on +/// load balancing settings. class ClientChannel { final String host; final int port; final ChannelOptions options; - final List _sockets = []; - final List _connections = []; + final _connections = []; + + bool _isShutdown = false; ClientChannel(this.host, {this.port = 443, this.options = const ChannelOptions()}); - /// Returns a connection to this [Channel]'s RPC endpoint. The connection may - /// be shared between multiple RPCs. - Future connect() async { + String get authority => options.authority ?? host; + + void _shutdownCheck([Function() cleanup]) { + if (!_isShutdown) return; + if (cleanup != null) cleanup(); + throw new GrpcError.unavailable('Channel shutting down.'); + } + + /// Shuts down this channel. + /// + /// No further RPCs can be made on this channel. RPCs already in progress will + /// be allowed to complete. + Future shutdown() { + if (_isShutdown) return new Future.value(); + _isShutdown = true; + return Future.wait(_connections.map((c) => c.shutdown())); + } + + /// Terminates this channel. + /// + /// RPCs already in progress will be terminated. No further RPCs can be made + /// on this channel. + Future terminate() { + _isShutdown = true; + return Future.wait(_connections.map((c) => c.terminate())); + } + + /// Returns a connection to this [Channel]'s RPC endpoint. + /// + /// The connection may be shared between multiple RPCs. + Future connect() async { + _shutdownCheck(); final securityContext = options.securityContext; - Socket socket; - if (securityContext == null) { - socket = await Socket.connect(host, port); - } else { - socket = await SecureSocket.connect(host, port, - context: securityContext, supportedProtocols: ['grpc-exp', 'h2']); + var socket = await Socket.connect(host, port); + _shutdownCheck(socket.destroy); + if (securityContext != null) { + socket = await SecureSocket.secure(socket, + host: authority, context: securityContext); + _shutdownCheck(socket.destroy); } - _sockets.add(socket); - final connection = new ClientTransportConnection.viaSocket(socket); + final connection = + new ClientConnection(new ClientTransportConnection.viaSocket(socket)); _connections.add(connection); return connection; } - /// Close all connections made on this [ClientChannel]. - Future close() async { - await Future.wait(_connections.map((c) => c.finish())); - _connections.clear(); - await Future.wait(_sockets.map((s) => s.close())); - _sockets.clear(); + /// Initiates a new RPC on this connection. + ClientCall createCall( + ClientMethod method, Stream requests, CallOptions options) { + final call = new ClientCall(method, requests, options.timeout); + connect().then((connection) { + // TODO(jakobr): Check if deadline is exceeded. + if (call._isCancelled) return; + final stream = connection.sendRequest( + this.options._useTls, authority, method.path, options); + call._onConnectedStream(stream); + }, onError: (error) { + call._onConnectError(error); + }); + return call; } } @@ -135,105 +252,73 @@ class Client { Client(this._channel, {CallOptions options}) : _options = options ?? new CallOptions(); - ClientCall $createCall(ClientMethod method, + ClientCall $createCall( + ClientMethod method, Stream requests, {CallOptions options}) { - return new ClientCall(_channel, method, - options: _options.mergedWith(options)); + return _channel.createCall(method, requests, _options.mergedWith(options)); } } /// An active call to a gRPC endpoint. class ClientCall implements Response { - static final _methodPost = new Header.ascii(':method', 'POST'); - static final _schemeHttp = new Header.ascii(':scheme', 'http'); - static final _contentTypeGrpc = - new Header.ascii('content-type', 'application/grpc'); - static final _teTrailers = new Header.ascii('te', 'trailers'); - static final _grpcAcceptEncoding = - new Header.ascii('grpc-accept-encoding', 'identity'); - static final _userAgent = new Header.ascii('user-agent', 'dart-grpc/0.2.0'); - - final ClientChannel _channel; final ClientMethod _method; + final Stream _requests; - final Completer> _headers = new Completer(); - final Completer> _trailers = new Completer(); + final _headers = new Completer>(); + final _trailers = new Completer>(); bool _hasReceivedResponses = false; Map _headerMetadata; TransportStream _stream; - final _requests = new StreamController(); StreamController _responses; + StreamSubscription _requestSubscription; StreamSubscription _responseSubscription; - final CallOptions options; - - Future _callSetup; + bool _isCancelled = false; Timer _timeoutTimer; - ClientCall(this._channel, this._method, {this.options}) { + ClientCall(this._method, this._requests, Duration timeout) { _responses = new StreamController(onListen: _onResponseListen); - final timeout = options?.timeout; if (timeout != null) { _timeoutTimer = new Timer(timeout, _onTimedOut); } - _callSetup = _initiateCall(timeout).catchError((error) { - _responses.addError( - new GrpcError.unavailable('Error connecting: ${error.toString()}')); - _timeoutTimer?.cancel(); - }); } - void _onTimedOut() { - _responses.addError(new GrpcError.deadlineExceeded('Deadline exceeded')); - cancel().catchError((_) {}); - } - - static List
createCallHeaders(String path, String authority, - {String timeout, Map metadata}) { - // TODO(jakobr): Populate HTTP-specific headers in connection? - final headers =
[ - _methodPost, - _schemeHttp, - new Header.ascii(':path', path), - new Header.ascii(':authority', authority), - ]; - if (timeout != null) { - headers.add(new Header.ascii('grpc-timeout', timeout)); + void _onConnectError(error) { + if (!_responses.isClosed) { + _responses + .addError(new GrpcError.unavailable('Error connecting: $error')); } - headers.addAll([ - _contentTypeGrpc, - _teTrailers, - _grpcAcceptEncoding, - _userAgent, - ]); - metadata?.forEach((key, value) { - headers.add(new Header.ascii(key, value)); - }); - return headers; + _safeTerminate(); } - Future _initiateCall(Duration timeout) async { - final connection = await _channel.connect(); - final timeoutString = toTimeoutString(timeout); - // TODO(jakobr): Flip this around, and have the Channel create the call - // object and apply options (including the above TODO). - final headers = createCallHeaders(_method.path, _channel.host, - timeout: timeoutString, metadata: options?.metadata); - _stream = connection.makeRequest(headers); - _requests.stream + void _onConnectedStream(ClientTransportStream stream) { + if (_isCancelled) { + // Should not happen, but just in case. + stream.terminate(); + return; + } + _stream = stream; + _requestSubscription = _requests .map(_method.requestSerializer) .map(GrpcHttpEncoder.frame) .map((bytes) => new DataStreamMessage(bytes)) .handleError(_onRequestError) - .pipe(_stream.outgoingMessages) - .catchError(_onRequestError); + .listen(_stream.outgoingMessages.add, + onError: _stream.outgoingMessages.addError, + onDone: _stream.outgoingMessages.close, + cancelOnError: true); // The response stream might have been listened to before _stream was ready, // so try setting up the subscription here as well. _onResponseListen(); } + void _onTimedOut() { + _responses.addError(new GrpcError.deadlineExceeded('Deadline exceeded')); + _safeTerminate(); + } + /// Subscribe to incoming response messages, once [_stream] is available, and /// the caller has subscribed to the [_responses] stream. void _onResponseListen() { @@ -260,6 +345,7 @@ class ClientCall implements Response { void _responseError(GrpcError error) { _responses.addError(error); _timeoutTimer?.cancel(); + _requestSubscription?.cancel(); _responseSubscription.cancel(); _responses.close(); _stream.terminate(); @@ -359,11 +445,11 @@ class ClientCall implements Response { _responses.addError(error); _timeoutTimer?.cancel(); _responses.close(); + _requestSubscription?.cancel(); _responseSubscription?.cancel(); _stream.terminate(); } - StreamSink get request => _requests.sink; Stream get response => _responses.stream; @override @@ -373,21 +459,32 @@ class ClientCall implements Response { Future> get trailers => _trailers.future; @override - Future cancel() async { + Future cancel() { + if (!_responses.isClosed) { + _responses.addError(new GrpcError.cancelled('Cancelled by client.')); + } + return _terminate(); + } + + Future _terminate() async { + _isCancelled = true; _timeoutTimer?.cancel(); - _callSetup.whenComplete(() { - // Terminate the stream if the call connects after being canceled. - _stream?.terminate(); - }); // Don't await _responses.close() here. It'll only complete once the done // event has been delivered, and it's the caller of this function that is // reading from responses as well, so we might end up deadlocked. _responses.close(); _stream?.terminate(); - final futures = [_requests.close()]; + final futures = []; + if (_requestSubscription != null) { + futures.add(_requestSubscription.cancel()); + } if (_responseSubscription != null) { futures.add(_responseSubscription.cancel()); } await Future.wait(futures); } + + Future _safeTerminate() { + return _terminate().catchError((_) {}); + } } diff --git a/lib/src/server.dart b/lib/src/server.dart index 516ccfe..cf51a2f 100644 --- a/lib/src/server.dart +++ b/lib/src/server.dart @@ -217,6 +217,7 @@ class ServerHandler { void handle() { _stream.onTerminated = (int errorCode) { _isCanceled = true; + _timeoutTimer?.cancel(); _cancelResponseSubscription(); }; @@ -330,15 +331,16 @@ class ServerHandler { } void _onTimedOut() { + if (_isCanceled) return; _isTimedOut = true; _isCanceled = true; final error = new GrpcError.deadlineExceeded('Deadline exceeded'); + _sendError(error); if (!_requests.isClosed) { _requests ..addError(error) ..close(); } - _sendError(error); } // -- Active state, incoming data -- @@ -473,7 +475,7 @@ class ServerHandler { // client, so we treat it as such. _timeoutTimer?.cancel(); _isCanceled = true; - if (!_requests.isClosed) { + if (_requests != null && !_requests.isClosed) { _requests.addError(new GrpcError.cancelled('Cancelled')); } _cancelResponseSubscription(); diff --git a/test/client_test.dart b/test/client_test.dart index 9ea682d..f1b2e43 100644 --- a/test/client_test.dart +++ b/test/client_test.dart @@ -276,8 +276,7 @@ void main() { }); test('Connection errors are reported', () async { - reset(harness.channel); - when(harness.channel.connect()).thenThrow('Connection error'); + harness.channel.connectionError = 'Connection error'; final expectedError = new GrpcError.unavailable('Error connecting: Connection error'); harness.expectThrows(harness.client.unary(dummyValue), expectedError); diff --git a/test/src/client_utils.dart b/test/src/client_utils.dart index a361402..d5d8fbc 100644 --- a/test/src/client_utils.dart +++ b/test/src/client_utils.dart @@ -13,17 +13,27 @@ import 'package:grpc/grpc.dart'; import 'utils.dart'; -class MockConnection extends Mock implements ClientTransportConnection {} +class MockTransport extends Mock implements ClientTransportConnection {} class MockStream extends Mock implements ClientTransportStream {} -class MockChannel extends Mock implements ClientChannel {} +class MockChannel extends ClientChannel { + final ClientConnection connection; + + var connectionError; + + MockChannel(String host, this.connection) : super(host); + + @override + Future connect() async { + if (connectionError != null) throw connectionError; + return connection; + } +} typedef ServerMessageHandler = void Function(StreamMessage message); -class TestClient { - final ClientChannel _channel; - +class TestClient extends Client { static final _$unary = new ClientMethod('/Test/Unary', mockEncode, mockDecode); static final _$clientStreaming = new ClientMethod( @@ -33,41 +43,37 @@ class TestClient { static final _$bidirectional = new ClientMethod('/Test/Bidirectional', mockEncode, mockDecode); - TestClient(this._channel); + TestClient(ClientChannel channel, {CallOptions options}) + : super(channel, options: options); ResponseFuture unary(int request, {CallOptions options}) { - final call = new ClientCall(_channel, _$unary, options: options); - call.request - ..add(request) - ..close(); + final call = $createCall(_$unary, new Stream.fromIterable([request]), + options: options); return new ResponseFuture(call); } ResponseFuture clientStreaming(Stream request, {CallOptions options}) { - final call = new ClientCall(_channel, _$clientStreaming, options: options); - request.pipe(call.request); + final call = $createCall(_$clientStreaming, request, options: options); return new ResponseFuture(call); } ResponseStream serverStreaming(int request, {CallOptions options}) { - final call = new ClientCall(_channel, _$serverStreaming, options: options); - call.request - ..add(request) - ..close(); + final call = $createCall( + _$serverStreaming, new Stream.fromIterable([request]), + options: options); return new ResponseStream(call); } ResponseStream bidirectional(Stream request, {CallOptions options}) { - final call = new ClientCall(_channel, _$bidirectional, options: options); - request.pipe(call.request); + final call = $createCall(_$bidirectional, request, options: options); return new ResponseStream(call); } } class ClientHarness { - MockConnection connection; + MockTransport transport; MockChannel channel; MockStream stream; @@ -77,14 +83,12 @@ class ClientHarness { TestClient client; void setUp() { - connection = new MockConnection(); - channel = new MockChannel(); + transport = new MockTransport(); + channel = new MockChannel('test', new ClientConnection(transport)); stream = new MockStream(); fromClient = new StreamController(); toClient = new StreamController(); - when(channel.host).thenReturn('test'); - when(channel.connect()).thenReturn(connection); - when(connection.makeRequest(any)).thenReturn(stream); + when(transport.makeRequest(any)).thenReturn(stream); when(stream.outgoingMessages).thenReturn(fromClient.sink); when(stream.incomingMessages).thenReturn(toClient.stream); client = new TestClient(channel); @@ -134,10 +138,8 @@ class ClientHarness { expect(result, expectedResult); } - verify(channel.connect()).called(1); - final List
capturedHeaders = - verify(connection.makeRequest(captureAny)).captured.single; + verify(transport.makeRequest(captureAny)).captured.single; validateRequestHeaders(capturedHeaders, path: expectedPath, timeout: toTimeoutString(expectedTimeout), diff --git a/test/src/server_utils.dart b/test/src/server_utils.dart index 254053e..ac5a4f7 100644 --- a/test/src/server_utils.dart +++ b/test/src/server_utils.dart @@ -143,10 +143,11 @@ class ServerHarness { void sendRequestHeader(String path, {String authority = 'test', - String timeout, - Map metadata}) { - final headers = ClientCall.createCallHeaders(path, authority, - timeout: timeout, metadata: metadata); + Map metadata, + Duration timeout}) { + final options = new CallOptions(metadata: metadata, timeout: timeout); + final headers = + ClientConnection.createCallHeaders(true, authority, path, options); toServer.add(new HeadersStreamMessage(headers)); } diff --git a/test/src/utils.dart b/test/src/utils.dart index 54a8f08..92fd913 100644 --- a/test/src/utils.dart +++ b/test/src/utils.dart @@ -25,7 +25,7 @@ void validateRequestHeaders(List
headers, Map customHeaders}) { final headerMap = headersToMap(headers); expect(headerMap[':method'], 'POST'); - expect(headerMap[':scheme'], 'http'); + expect(headerMap[':scheme'], 'https'); if (path != null) { expect(headerMap[':path'], path); } diff --git a/test/timeout_test.dart b/test/timeout_test.dart index 0394bbe..f78d754 100644 --- a/test/timeout_test.dart +++ b/test/timeout_test.dart @@ -132,7 +132,8 @@ void main() { harness ..service.unaryHandler = methodHandler ..expectErrorResponse(StatusCode.deadlineExceeded, 'Deadline exceeded') - ..sendRequestHeader('/Test/Unary', metadata: {'grpc-timeout': '1u'}); + ..sendRequestHeader('/Test/Unary', + timeout: new Duration(microseconds: 1)); await harness.fromServer.done; }); });