Preparation for RPC multiplexing (#31)

First stage of separating Connection from Channel. A Channel manages
multiple Connections, and chooses which Connection to send an RPC on.

In this change, the Channel still creates a Connection for each RPC.
Managing the Connection life-cycle comes in a later change.
This commit is contained in:
Jakob Andersen 2017-09-25 13:51:40 +02:00 committed by GitHub
parent 05bb6a5d08
commit 2f118ea043
18 changed files with 322 additions and 220 deletions

View File

@ -22,10 +22,13 @@ class Client {
await runAddOneCancel(); await runAddOneCancel();
await runFibonacciCancel(); await runFibonacciCancel();
await runFibonacciTimeout(); await runFibonacciTimeout();
await channel.close(); await channel.shutdown();
} }
// Run the echo demo. ... /// Run the echo demo.
///
/// Send custom metadata with a RPC, and print out the received response and
/// metadata.
Future<Null> runEcho() async { Future<Null> runEcho() async {
final request = new Record()..value = 'Kaj'; final request = new Record()..value = 'Kaj';
final call = stub.echo(request, final call = stub.echo(request,
@ -40,7 +43,11 @@ class Client {
print('Echo response: ${response.value}'); print('Echo response: ${response.value}');
} }
// Run the echo with delay cancel demo. ... /// Run the echo with delay cancel demo.
///
/// Same as the echo demo, but demonstrating per-client custom metadata, as
/// well as a per-call metadata. The server will delay the response for the
/// requested duration, during which the client will cancel the RPC.
Future<Null> runEchoDelayCancel() async { Future<Null> runEchoDelayCancel() async {
final stubWithCustomOptions = new MetadataClient(channel, final stubWithCustomOptions = new MetadataClient(channel,
options: new CallOptions(metadata: {'peer': 'Verner'})); options: new CallOptions(metadata: {'peer': 'Verner'}));
@ -63,7 +70,10 @@ class Client {
} }
} }
// Run the addOne cancel demo. /// Run the addOne cancel demo.
///
/// Makes a bi-directional RPC, sends 4 requests, and cancels the RPC after
/// receiving 3 responses.
Future<Null> runAddOneCancel() async { Future<Null> runAddOneCancel() async {
final numbers = new StreamController<int>(); final numbers = new StreamController<int>();
final call = final call =
@ -74,7 +84,7 @@ class Client {
if (number.value == 3) { if (number.value == 3) {
receivedThree.complete(true); receivedThree.complete(true);
} }
}); }, onError: (e) => print('Caught: $e'));
numbers.add(1); numbers.add(1);
numbers.add(2); numbers.add(2);
numbers.add(3); numbers.add(3);
@ -84,23 +94,43 @@ class Client {
await Future.wait([sub.cancel(), numbers.close()]); await Future.wait([sub.cancel(), numbers.close()]);
} }
/// Run the Fibonacci demo.
///
/// Call an RPC that returns a stream of Fibonacci numbers. Cancel the call /// Call an RPC that returns a stream of Fibonacci numbers. Cancel the call
/// after receiving more than 5 responses. /// after receiving more than 5 responses.
Future<Null> runFibonacciCancel() async { Future<Null> runFibonacciCancel() async {
final call = stub.fibonacci(new Empty()); final call = stub.fibonacci(new Empty());
int count = 0; int count = 0;
await for (var number in call) { try {
count++; await for (var number in call) {
print('Received ${number.value} (count=$count)'); count++;
if (count > 5) { print('Received ${number.value} (count=$count)');
await call.cancel(); if (count > 5) {
await call.cancel();
}
} }
} on GrpcError catch (e) {
print('Caught: $e');
} }
print('Final count: $count'); print('Final count: $count');
} }
// Run the timeout demo. ... /// Run the timeout demo.
///
/// Call an RPC that returns a stream of Fibonacci numbers, and specify an RPC
/// timeout of 2 seconds.
Future<Null> runFibonacciTimeout() async { Future<Null> runFibonacciTimeout() async {
// TODO(jakobr): Implement timeouts. final call = stub.fibonacci(new Empty(),
options: new CallOptions(timeout: new Duration(seconds: 2)));
int count = 0;
try {
await for (var number in call) {
count++;
print('Received ${number.value} (count=$count)');
}
} on GrpcError catch (e) {
print('Caught: $e');
}
print('Final count: $count');
} }
} }

View File

@ -1,8 +1,7 @@
/// ///
// Generated code. Do not modify. // Generated code. Do not modify.
/// ///
// ignore_for_file: non_constant_identifier_names // ignore_for_file: non_constant_identifier_names,library_prefixes
// ignore_for_file: library_prefixes
library grpc_metadata; library grpc_metadata;
// ignore: UNUSED_SHOWN_NAME // ignore: UNUSED_SHOWN_NAME

View File

@ -1,8 +1,7 @@
/// ///
// Generated code. Do not modify. // Generated code. Do not modify.
/// ///
// ignore_for_file: non_constant_identifier_names // ignore_for_file: non_constant_identifier_names,library_prefixes
// ignore_for_file: library_prefixes
library grpc_metadata_pbgrpc; library grpc_metadata_pbgrpc;
import 'dart:async'; import 'dart:async';
@ -30,24 +29,19 @@ class MetadataClient extends Client {
: super(channel, options: options); : super(channel, options: options);
ResponseFuture<Record> echo(Record request, {CallOptions options}) { ResponseFuture<Record> echo(Record request, {CallOptions options}) {
final call = $createCall(_$echo, options: options); final call = $createCall(_$echo, new Stream.fromIterable([request]),
call.request options: options);
..add(request)
..close();
return new ResponseFuture(call); return new ResponseFuture(call);
} }
ResponseStream<Number> addOne(Stream<Number> request, {CallOptions options}) { ResponseStream<Number> addOne(Stream<Number> request, {CallOptions options}) {
final call = $createCall(_$addOne, options: options); final call = $createCall(_$addOne, request, options: options);
request.pipe(call.request);
return new ResponseStream(call); return new ResponseStream(call);
} }
ResponseStream<Number> fibonacci(Empty request, {CallOptions options}) { ResponseStream<Number> fibonacci(Empty request, {CallOptions options}) {
final call = $createCall(_$fibonacci, options: options); final call = $createCall(_$fibonacci, new Stream.fromIterable([request]),
call.request options: options);
..add(request)
..close();
return new ResponseStream(call); return new ResponseStream(call);
} }
} }

View File

@ -24,7 +24,7 @@ class Client {
await runListFeatures(); await runListFeatures();
await runRecordRoute(); await runRecordRoute();
await runRouteChat(); await runRouteChat();
await channel.close(); await channel.shutdown();
} }
void printFeature(Feature feature) { void printFeature(Feature feature) {

View File

@ -34,33 +34,27 @@ class RouteGuideClient extends Client {
: super(channel, options: options); : super(channel, options: options);
ResponseFuture<Feature> getFeature(Point request, {CallOptions options}) { ResponseFuture<Feature> getFeature(Point request, {CallOptions options}) {
final call = $createCall(_$getFeature, options: options); final call = $createCall(_$getFeature, new Stream.fromIterable([request]),
call.request options: options);
..add(request)
..close();
return new ResponseFuture(call); return new ResponseFuture(call);
} }
ResponseStream<Feature> listFeatures(Rectangle request, ResponseStream<Feature> listFeatures(Rectangle request,
{CallOptions options}) { {CallOptions options}) {
final call = $createCall(_$listFeatures, options: options); final call = $createCall(_$listFeatures, new Stream.fromIterable([request]),
call.request options: options);
..add(request)
..close();
return new ResponseStream(call); return new ResponseStream(call);
} }
ResponseFuture<RouteSummary> recordRoute(Stream<Point> request, ResponseFuture<RouteSummary> recordRoute(Stream<Point> request,
{CallOptions options}) { {CallOptions options}) {
final call = $createCall(_$recordRoute, options: options); final call = $createCall(_$recordRoute, request, options: options);
request.pipe(call.request);
return new ResponseFuture(call); return new ResponseFuture(call);
} }
ResponseStream<RouteNote> routeChat(Stream<RouteNote> request, ResponseStream<RouteNote> routeChat(Stream<RouteNote> request,
{CallOptions options}) { {CallOptions options}) {
final call = $createCall(_$routeChat, options: options); final call = $createCall(_$routeChat, request, options: options);
request.pipe(call.request);
return new ResponseStream(call); return new ResponseStream(call);
} }
} }

View File

@ -84,8 +84,11 @@ class TestService extends TestServiceBase {
throw new GrpcError.custom( throw new GrpcError.custom(
request.responseStatus.code, request.responseStatus.message); request.responseStatus.code, request.responseStatus.message);
} }
return new StreamingOutputCallResponse() final response = new StreamingOutputCallResponse();
..payload = _payloadForRequest(request.responseParameters[0]); if (request.responseParameters.isNotEmpty) {
response.payload = _payloadForRequest(request.responseParameters[0]);
}
return response;
} }
@override @override

View File

@ -1,8 +1,7 @@
/// ///
// Generated code. Do not modify. // Generated code. Do not modify.
/// ///
// ignore_for_file: non_constant_identifier_names // ignore_for_file: non_constant_identifier_names,library_prefixes
// ignore_for_file: library_prefixes
library grpc.testing_empty; library grpc.testing_empty;
// ignore: UNUSED_SHOWN_NAME // ignore: UNUSED_SHOWN_NAME

View File

@ -1,8 +1,7 @@
/// ///
// Generated code. Do not modify. // Generated code. Do not modify.
/// ///
// ignore_for_file: non_constant_identifier_names // ignore_for_file: non_constant_identifier_names,library_prefixes
// ignore_for_file: library_prefixes
library grpc.testing_messages; library grpc.testing_messages;
// ignore: UNUSED_SHOWN_NAME // ignore: UNUSED_SHOWN_NAME

View File

@ -1,11 +1,10 @@
/// ///
// Generated code. Do not modify. // Generated code. Do not modify.
/// ///
// ignore_for_file: non_constant_identifier_names // ignore_for_file: non_constant_identifier_names,library_prefixes
// ignore_for_file: library_prefixes
library grpc.testing_messages_pbenum; library grpc.testing_messages_pbenum;
// ignore: UNUSED_SHOWN_NAME // ignore_for_file: UNDEFINED_SHOWN_NAME,UNUSED_SHOWN_NAME
import 'dart:core' show int, dynamic, String, List, Map; import 'dart:core' show int, dynamic, String, List, Map;
import 'package:protobuf/protobuf.dart'; import 'package:protobuf/protobuf.dart';

View File

@ -1,8 +1,7 @@
/// ///
// Generated code. Do not modify. // Generated code. Do not modify.
/// ///
// ignore_for_file: non_constant_identifier_names // ignore_for_file: non_constant_identifier_names,library_prefixes
// ignore_for_file: library_prefixes
library grpc.testing_test; library grpc.testing_test;
// ignore: UNUSED_SHOWN_NAME // ignore: UNUSED_SHOWN_NAME

View File

@ -1,8 +1,7 @@
/// ///
// Generated code. Do not modify. // Generated code. Do not modify.
/// ///
// ignore_for_file: non_constant_identifier_names // ignore_for_file: non_constant_identifier_names,library_prefixes
// ignore_for_file: library_prefixes
library grpc.testing_test_pbgrpc; library grpc.testing_test_pbgrpc;
import 'dart:async'; import 'dart:async';
@ -60,71 +59,61 @@ class TestServiceClient extends Client {
: super(channel, options: options); : super(channel, options: options);
ResponseFuture<Empty> emptyCall(Empty request, {CallOptions options}) { ResponseFuture<Empty> emptyCall(Empty request, {CallOptions options}) {
final call = $createCall(_$emptyCall, options: options); final call = $createCall(_$emptyCall, new Stream.fromIterable([request]),
call.request options: options);
..add(request)
..close();
return new ResponseFuture(call); return new ResponseFuture(call);
} }
ResponseFuture<SimpleResponse> unaryCall(SimpleRequest request, ResponseFuture<SimpleResponse> unaryCall(SimpleRequest request,
{CallOptions options}) { {CallOptions options}) {
final call = $createCall(_$unaryCall, options: options); final call = $createCall(_$unaryCall, new Stream.fromIterable([request]),
call.request options: options);
..add(request)
..close();
return new ResponseFuture(call); return new ResponseFuture(call);
} }
ResponseFuture<SimpleResponse> cacheableUnaryCall(SimpleRequest request, ResponseFuture<SimpleResponse> cacheableUnaryCall(SimpleRequest request,
{CallOptions options}) { {CallOptions options}) {
final call = $createCall(_$cacheableUnaryCall, options: options); final call = $createCall(
call.request _$cacheableUnaryCall, new Stream.fromIterable([request]),
..add(request) options: options);
..close();
return new ResponseFuture(call); return new ResponseFuture(call);
} }
ResponseStream<StreamingOutputCallResponse> streamingOutputCall( ResponseStream<StreamingOutputCallResponse> streamingOutputCall(
StreamingOutputCallRequest request, StreamingOutputCallRequest request,
{CallOptions options}) { {CallOptions options}) {
final call = $createCall(_$streamingOutputCall, options: options); final call = $createCall(
call.request _$streamingOutputCall, new Stream.fromIterable([request]),
..add(request) options: options);
..close();
return new ResponseStream(call); return new ResponseStream(call);
} }
ResponseFuture<StreamingInputCallResponse> streamingInputCall( ResponseFuture<StreamingInputCallResponse> streamingInputCall(
Stream<StreamingInputCallRequest> request, Stream<StreamingInputCallRequest> request,
{CallOptions options}) { {CallOptions options}) {
final call = $createCall(_$streamingInputCall, options: options); final call = $createCall(_$streamingInputCall, request, options: options);
request.pipe(call.request);
return new ResponseFuture(call); return new ResponseFuture(call);
} }
ResponseStream<StreamingOutputCallResponse> fullDuplexCall( ResponseStream<StreamingOutputCallResponse> fullDuplexCall(
Stream<StreamingOutputCallRequest> request, Stream<StreamingOutputCallRequest> request,
{CallOptions options}) { {CallOptions options}) {
final call = $createCall(_$fullDuplexCall, options: options); final call = $createCall(_$fullDuplexCall, request, options: options);
request.pipe(call.request);
return new ResponseStream(call); return new ResponseStream(call);
} }
ResponseStream<StreamingOutputCallResponse> halfDuplexCall( ResponseStream<StreamingOutputCallResponse> halfDuplexCall(
Stream<StreamingOutputCallRequest> request, Stream<StreamingOutputCallRequest> request,
{CallOptions options}) { {CallOptions options}) {
final call = $createCall(_$halfDuplexCall, options: options); final call = $createCall(_$halfDuplexCall, request, options: options);
request.pipe(call.request);
return new ResponseStream(call); return new ResponseStream(call);
} }
ResponseFuture<Empty> unimplementedCall(Empty request, ResponseFuture<Empty> unimplementedCall(Empty request,
{CallOptions options}) { {CallOptions options}) {
final call = $createCall(_$unimplementedCall, options: options); final call = $createCall(
call.request _$unimplementedCall, new Stream.fromIterable([request]),
..add(request) options: options);
..close();
return new ResponseFuture(call); return new ResponseFuture(call);
} }
} }
@ -228,10 +217,9 @@ class UnimplementedServiceClient extends Client {
ResponseFuture<Empty> unimplementedCall(Empty request, ResponseFuture<Empty> unimplementedCall(Empty request,
{CallOptions options}) { {CallOptions options}) {
final call = $createCall(_$unimplementedCall, options: options); final call = $createCall(
call.request _$unimplementedCall, new Stream.fromIterable([request]),
..add(request) options: options);
..close();
return new ResponseFuture(call); return new ResponseFuture(call);
} }
} }
@ -271,18 +259,14 @@ class ReconnectServiceClient extends Client {
: super(channel, options: options); : super(channel, options: options);
ResponseFuture<Empty> start(ReconnectParams request, {CallOptions options}) { ResponseFuture<Empty> start(ReconnectParams request, {CallOptions options}) {
final call = $createCall(_$start, options: options); final call = $createCall(_$start, new Stream.fromIterable([request]),
call.request options: options);
..add(request)
..close();
return new ResponseFuture(call); return new ResponseFuture(call);
} }
ResponseFuture<ReconnectInfo> stop(Empty request, {CallOptions options}) { ResponseFuture<ReconnectInfo> stop(Empty request, {CallOptions options}) {
final call = $createCall(_$stop, options: options); final call = $createCall(_$stop, new Stream.fromIterable([request]),
call.request options: options);
..add(request)
..close();
return new ResponseFuture(call); return new ResponseFuture(call);
} }
} }

View File

@ -26,9 +26,10 @@ class ChannelOptions {
final bool _useTls; final bool _useTls;
final List<int> _certificateBytes; final List<int> _certificateBytes;
final String _certificatePassword; final String _certificatePassword;
final String authority;
const ChannelOptions._(this._useTls, const ChannelOptions._(this._useTls,
[this._certificateBytes, this._certificatePassword]); [this._certificateBytes, this._certificatePassword, this.authority]);
/// Enable TLS using the default trust store. /// Enable TLS using the default trust store.
const ChannelOptions() : this._(true); const ChannelOptions() : this._(true);
@ -37,8 +38,9 @@ class ChannelOptions {
const ChannelOptions.insecure() : this._(false); const ChannelOptions.insecure() : this._(false);
/// Enable TLS and specify the [certificate]s to trust. /// Enable TLS and specify the [certificate]s to trust.
ChannelOptions.secure({List<int> certificate, String password}) ChannelOptions.secure(
: this._(true, certificate, password); {List<int> certificate, String password, String authority})
: this._(true, certificate, password, authority);
SecurityContext get securityContext { SecurityContext get securityContext {
if (!_useTls) return null; if (!_useTls) return null;
@ -51,42 +53,157 @@ class ChannelOptions {
} }
} }
/// A channel to an RPC endpoint. /// A connection to a single RPC endpoint.
///
/// RPCs made on a connection are always sent to the same endpoint.
class ClientConnection {
static final _methodPost = new Header.ascii(':method', 'POST');
static final _schemeHttp = new Header.ascii(':scheme', 'http');
static final _schemeHttps = new Header.ascii(':scheme', 'https');
static final _contentTypeGrpc =
new Header.ascii('content-type', 'application/grpc');
static final _teTrailers = new Header.ascii('te', 'trailers');
static final _grpcAcceptEncoding =
new Header.ascii('grpc-accept-encoding', 'identity');
static final _userAgent = new Header.ascii('user-agent', 'dart-grpc/0.2.0');
final ClientTransportConnection _transport;
ClientConnection(this._transport);
static List<Header> createCallHeaders(
bool useTls, String authority, String path, CallOptions options) {
final headers = [
_methodPost,
useTls ? _schemeHttps : _schemeHttp,
new Header.ascii(':path', path),
new Header.ascii(':authority', authority),
];
if (options.timeout != null) {
headers.add(
new Header.ascii('grpc-timeout', toTimeoutString(options.timeout)));
}
headers.addAll([
_contentTypeGrpc,
_teTrailers,
_grpcAcceptEncoding,
_userAgent,
]);
options.metadata.forEach((key, value) {
headers.add(new Header.ascii(key, value));
});
return headers;
}
/// Shuts down this connection.
///
/// No further calls may be made on this connection, but existing calls
/// are allowed to finish.
Future<Null> shutdown() {
// TODO(jakobr): Manage streams, close [_transport] when all are done.
return _transport.finish();
}
/// Terminates this connection.
///
/// All open calls are terminated immediately, and no further calls may be
/// made on this connection.
Future<Null> terminate() {
// TODO(jakobr): Manage streams, close them immediately.
return _transport.terminate();
}
/// Starts a new RPC on this connection.
///
/// Creates a new transport stream on this connection, and sends initial call
/// metadata.
ClientTransportStream sendRequest(
bool useTls, String authority, String path, CallOptions options) {
final headers = createCallHeaders(useTls, authority, path, options);
final stream = _transport.makeRequest(headers);
// TODO(jakobr): Manage streams. Subscribe to stream state changes.
return stream;
}
}
/// A channel to a virtual RPC endpoint.
///
/// For each RPC, the channel picks a [ClientConnection] to dispatch the call.
/// RPCs on the same channel may be sent to different connections, depending on
/// load balancing settings.
class ClientChannel { class ClientChannel {
final String host; final String host;
final int port; final int port;
final ChannelOptions options; final ChannelOptions options;
final List<Socket> _sockets = []; final _connections = <ClientConnection>[];
final List<TransportConnection> _connections = [];
bool _isShutdown = false;
ClientChannel(this.host, ClientChannel(this.host,
{this.port = 443, this.options = const ChannelOptions()}); {this.port = 443, this.options = const ChannelOptions()});
/// Returns a connection to this [Channel]'s RPC endpoint. The connection may String get authority => options.authority ?? host;
/// be shared between multiple RPCs.
Future<ClientTransportConnection> connect() async { void _shutdownCheck([Function() cleanup]) {
if (!_isShutdown) return;
if (cleanup != null) cleanup();
throw new GrpcError.unavailable('Channel shutting down.');
}
/// Shuts down this channel.
///
/// No further RPCs can be made on this channel. RPCs already in progress will
/// be allowed to complete.
Future<Null> shutdown() {
if (_isShutdown) return new Future.value();
_isShutdown = true;
return Future.wait(_connections.map((c) => c.shutdown()));
}
/// Terminates this channel.
///
/// RPCs already in progress will be terminated. No further RPCs can be made
/// on this channel.
Future<Null> terminate() {
_isShutdown = true;
return Future.wait(_connections.map((c) => c.terminate()));
}
/// Returns a connection to this [Channel]'s RPC endpoint.
///
/// The connection may be shared between multiple RPCs.
Future<ClientConnection> connect() async {
_shutdownCheck();
final securityContext = options.securityContext; final securityContext = options.securityContext;
Socket socket; var socket = await Socket.connect(host, port);
if (securityContext == null) { _shutdownCheck(socket.destroy);
socket = await Socket.connect(host, port); if (securityContext != null) {
} else { socket = await SecureSocket.secure(socket,
socket = await SecureSocket.connect(host, port, host: authority, context: securityContext);
context: securityContext, supportedProtocols: ['grpc-exp', 'h2']); _shutdownCheck(socket.destroy);
} }
_sockets.add(socket); final connection =
final connection = new ClientTransportConnection.viaSocket(socket); new ClientConnection(new ClientTransportConnection.viaSocket(socket));
_connections.add(connection); _connections.add(connection);
return connection; return connection;
} }
/// Close all connections made on this [ClientChannel]. /// Initiates a new RPC on this connection.
Future<Null> close() async { ClientCall<Q, R> createCall<Q, R>(
await Future.wait(_connections.map((c) => c.finish())); ClientMethod<Q, R> method, Stream<Q> requests, CallOptions options) {
_connections.clear(); final call = new ClientCall(method, requests, options.timeout);
await Future.wait(_sockets.map((s) => s.close())); connect().then((connection) {
_sockets.clear(); // TODO(jakobr): Check if deadline is exceeded.
if (call._isCancelled) return;
final stream = connection.sendRequest(
this.options._useTls, authority, method.path, options);
call._onConnectedStream(stream);
}, onError: (error) {
call._onConnectError(error);
});
return call;
} }
} }
@ -135,105 +252,73 @@ class Client {
Client(this._channel, {CallOptions options}) Client(this._channel, {CallOptions options})
: _options = options ?? new CallOptions(); : _options = options ?? new CallOptions();
ClientCall<Q, R> $createCall<Q, R>(ClientMethod<Q, R> method, ClientCall<Q, R> $createCall<Q, R>(
ClientMethod<Q, R> method, Stream<Q> requests,
{CallOptions options}) { {CallOptions options}) {
return new ClientCall(_channel, method, return _channel.createCall(method, requests, _options.mergedWith(options));
options: _options.mergedWith(options));
} }
} }
/// An active call to a gRPC endpoint. /// An active call to a gRPC endpoint.
class ClientCall<Q, R> implements Response { class ClientCall<Q, R> implements Response {
static final _methodPost = new Header.ascii(':method', 'POST');
static final _schemeHttp = new Header.ascii(':scheme', 'http');
static final _contentTypeGrpc =
new Header.ascii('content-type', 'application/grpc');
static final _teTrailers = new Header.ascii('te', 'trailers');
static final _grpcAcceptEncoding =
new Header.ascii('grpc-accept-encoding', 'identity');
static final _userAgent = new Header.ascii('user-agent', 'dart-grpc/0.2.0');
final ClientChannel _channel;
final ClientMethod<Q, R> _method; final ClientMethod<Q, R> _method;
final Stream<Q> _requests;
final Completer<Map<String, String>> _headers = new Completer(); final _headers = new Completer<Map<String, String>>();
final Completer<Map<String, String>> _trailers = new Completer(); final _trailers = new Completer<Map<String, String>>();
bool _hasReceivedResponses = false; bool _hasReceivedResponses = false;
Map<String, String> _headerMetadata; Map<String, String> _headerMetadata;
TransportStream _stream; TransportStream _stream;
final _requests = new StreamController<Q>();
StreamController<R> _responses; StreamController<R> _responses;
StreamSubscription<StreamMessage> _requestSubscription;
StreamSubscription<GrpcMessage> _responseSubscription; StreamSubscription<GrpcMessage> _responseSubscription;
final CallOptions options; bool _isCancelled = false;
Future<Null> _callSetup;
Timer _timeoutTimer; Timer _timeoutTimer;
ClientCall(this._channel, this._method, {this.options}) { ClientCall(this._method, this._requests, Duration timeout) {
_responses = new StreamController(onListen: _onResponseListen); _responses = new StreamController(onListen: _onResponseListen);
final timeout = options?.timeout;
if (timeout != null) { if (timeout != null) {
_timeoutTimer = new Timer(timeout, _onTimedOut); _timeoutTimer = new Timer(timeout, _onTimedOut);
} }
_callSetup = _initiateCall(timeout).catchError((error) {
_responses.addError(
new GrpcError.unavailable('Error connecting: ${error.toString()}'));
_timeoutTimer?.cancel();
});
} }
void _onTimedOut() { void _onConnectError(error) {
_responses.addError(new GrpcError.deadlineExceeded('Deadline exceeded')); if (!_responses.isClosed) {
cancel().catchError((_) {}); _responses
} .addError(new GrpcError.unavailable('Error connecting: $error'));
static List<Header> createCallHeaders(String path, String authority,
{String timeout, Map<String, String> metadata}) {
// TODO(jakobr): Populate HTTP-specific headers in connection?
final headers = <Header>[
_methodPost,
_schemeHttp,
new Header.ascii(':path', path),
new Header.ascii(':authority', authority),
];
if (timeout != null) {
headers.add(new Header.ascii('grpc-timeout', timeout));
} }
headers.addAll([ _safeTerminate();
_contentTypeGrpc,
_teTrailers,
_grpcAcceptEncoding,
_userAgent,
]);
metadata?.forEach((key, value) {
headers.add(new Header.ascii(key, value));
});
return headers;
} }
Future<Null> _initiateCall(Duration timeout) async { void _onConnectedStream(ClientTransportStream stream) {
final connection = await _channel.connect(); if (_isCancelled) {
final timeoutString = toTimeoutString(timeout); // Should not happen, but just in case.
// TODO(jakobr): Flip this around, and have the Channel create the call stream.terminate();
// object and apply options (including the above TODO). return;
final headers = createCallHeaders(_method.path, _channel.host, }
timeout: timeoutString, metadata: options?.metadata); _stream = stream;
_stream = connection.makeRequest(headers); _requestSubscription = _requests
_requests.stream
.map(_method.requestSerializer) .map(_method.requestSerializer)
.map(GrpcHttpEncoder.frame) .map(GrpcHttpEncoder.frame)
.map<StreamMessage>((bytes) => new DataStreamMessage(bytes)) .map<StreamMessage>((bytes) => new DataStreamMessage(bytes))
.handleError(_onRequestError) .handleError(_onRequestError)
.pipe(_stream.outgoingMessages) .listen(_stream.outgoingMessages.add,
.catchError(_onRequestError); onError: _stream.outgoingMessages.addError,
onDone: _stream.outgoingMessages.close,
cancelOnError: true);
// The response stream might have been listened to before _stream was ready, // The response stream might have been listened to before _stream was ready,
// so try setting up the subscription here as well. // so try setting up the subscription here as well.
_onResponseListen(); _onResponseListen();
} }
void _onTimedOut() {
_responses.addError(new GrpcError.deadlineExceeded('Deadline exceeded'));
_safeTerminate();
}
/// Subscribe to incoming response messages, once [_stream] is available, and /// Subscribe to incoming response messages, once [_stream] is available, and
/// the caller has subscribed to the [_responses] stream. /// the caller has subscribed to the [_responses] stream.
void _onResponseListen() { void _onResponseListen() {
@ -260,6 +345,7 @@ class ClientCall<Q, R> implements Response {
void _responseError(GrpcError error) { void _responseError(GrpcError error) {
_responses.addError(error); _responses.addError(error);
_timeoutTimer?.cancel(); _timeoutTimer?.cancel();
_requestSubscription?.cancel();
_responseSubscription.cancel(); _responseSubscription.cancel();
_responses.close(); _responses.close();
_stream.terminate(); _stream.terminate();
@ -359,11 +445,11 @@ class ClientCall<Q, R> implements Response {
_responses.addError(error); _responses.addError(error);
_timeoutTimer?.cancel(); _timeoutTimer?.cancel();
_responses.close(); _responses.close();
_requestSubscription?.cancel();
_responseSubscription?.cancel(); _responseSubscription?.cancel();
_stream.terminate(); _stream.terminate();
} }
StreamSink<Q> get request => _requests.sink;
Stream<R> get response => _responses.stream; Stream<R> get response => _responses.stream;
@override @override
@ -373,21 +459,32 @@ class ClientCall<Q, R> implements Response {
Future<Map<String, String>> get trailers => _trailers.future; Future<Map<String, String>> get trailers => _trailers.future;
@override @override
Future<Null> cancel() async { Future<Null> cancel() {
if (!_responses.isClosed) {
_responses.addError(new GrpcError.cancelled('Cancelled by client.'));
}
return _terminate();
}
Future<Null> _terminate() async {
_isCancelled = true;
_timeoutTimer?.cancel(); _timeoutTimer?.cancel();
_callSetup.whenComplete(() {
// Terminate the stream if the call connects after being canceled.
_stream?.terminate();
});
// Don't await _responses.close() here. It'll only complete once the done // Don't await _responses.close() here. It'll only complete once the done
// event has been delivered, and it's the caller of this function that is // event has been delivered, and it's the caller of this function that is
// reading from responses as well, so we might end up deadlocked. // reading from responses as well, so we might end up deadlocked.
_responses.close(); _responses.close();
_stream?.terminate(); _stream?.terminate();
final futures = <Future>[_requests.close()]; final futures = <Future>[];
if (_requestSubscription != null) {
futures.add(_requestSubscription.cancel());
}
if (_responseSubscription != null) { if (_responseSubscription != null) {
futures.add(_responseSubscription.cancel()); futures.add(_responseSubscription.cancel());
} }
await Future.wait(futures); await Future.wait(futures);
} }
Future<Null> _safeTerminate() {
return _terminate().catchError((_) {});
}
} }

View File

@ -217,6 +217,7 @@ class ServerHandler {
void handle() { void handle() {
_stream.onTerminated = (int errorCode) { _stream.onTerminated = (int errorCode) {
_isCanceled = true; _isCanceled = true;
_timeoutTimer?.cancel();
_cancelResponseSubscription(); _cancelResponseSubscription();
}; };
@ -330,15 +331,16 @@ class ServerHandler {
} }
void _onTimedOut() { void _onTimedOut() {
if (_isCanceled) return;
_isTimedOut = true; _isTimedOut = true;
_isCanceled = true; _isCanceled = true;
final error = new GrpcError.deadlineExceeded('Deadline exceeded'); final error = new GrpcError.deadlineExceeded('Deadline exceeded');
_sendError(error);
if (!_requests.isClosed) { if (!_requests.isClosed) {
_requests _requests
..addError(error) ..addError(error)
..close(); ..close();
} }
_sendError(error);
} }
// -- Active state, incoming data -- // -- Active state, incoming data --
@ -473,7 +475,7 @@ class ServerHandler {
// client, so we treat it as such. // client, so we treat it as such.
_timeoutTimer?.cancel(); _timeoutTimer?.cancel();
_isCanceled = true; _isCanceled = true;
if (!_requests.isClosed) { if (_requests != null && !_requests.isClosed) {
_requests.addError(new GrpcError.cancelled('Cancelled')); _requests.addError(new GrpcError.cancelled('Cancelled'));
} }
_cancelResponseSubscription(); _cancelResponseSubscription();

View File

@ -276,8 +276,7 @@ void main() {
}); });
test('Connection errors are reported', () async { test('Connection errors are reported', () async {
reset(harness.channel); harness.channel.connectionError = 'Connection error';
when(harness.channel.connect()).thenThrow('Connection error');
final expectedError = final expectedError =
new GrpcError.unavailable('Error connecting: Connection error'); new GrpcError.unavailable('Error connecting: Connection error');
harness.expectThrows(harness.client.unary(dummyValue), expectedError); harness.expectThrows(harness.client.unary(dummyValue), expectedError);

View File

@ -13,17 +13,27 @@ import 'package:grpc/grpc.dart';
import 'utils.dart'; import 'utils.dart';
class MockConnection extends Mock implements ClientTransportConnection {} class MockTransport extends Mock implements ClientTransportConnection {}
class MockStream extends Mock implements ClientTransportStream {} class MockStream extends Mock implements ClientTransportStream {}
class MockChannel extends Mock implements ClientChannel {} class MockChannel extends ClientChannel {
final ClientConnection connection;
var connectionError;
MockChannel(String host, this.connection) : super(host);
@override
Future<ClientConnection> connect() async {
if (connectionError != null) throw connectionError;
return connection;
}
}
typedef ServerMessageHandler = void Function(StreamMessage message); typedef ServerMessageHandler = void Function(StreamMessage message);
class TestClient { class TestClient extends Client {
final ClientChannel _channel;
static final _$unary = static final _$unary =
new ClientMethod<int, int>('/Test/Unary', mockEncode, mockDecode); new ClientMethod<int, int>('/Test/Unary', mockEncode, mockDecode);
static final _$clientStreaming = new ClientMethod<int, int>( static final _$clientStreaming = new ClientMethod<int, int>(
@ -33,41 +43,37 @@ class TestClient {
static final _$bidirectional = static final _$bidirectional =
new ClientMethod<int, int>('/Test/Bidirectional', mockEncode, mockDecode); new ClientMethod<int, int>('/Test/Bidirectional', mockEncode, mockDecode);
TestClient(this._channel); TestClient(ClientChannel channel, {CallOptions options})
: super(channel, options: options);
ResponseFuture<int> unary(int request, {CallOptions options}) { ResponseFuture<int> unary(int request, {CallOptions options}) {
final call = new ClientCall(_channel, _$unary, options: options); final call = $createCall(_$unary, new Stream.fromIterable([request]),
call.request options: options);
..add(request)
..close();
return new ResponseFuture(call); return new ResponseFuture(call);
} }
ResponseFuture<int> clientStreaming(Stream<int> request, ResponseFuture<int> clientStreaming(Stream<int> request,
{CallOptions options}) { {CallOptions options}) {
final call = new ClientCall(_channel, _$clientStreaming, options: options); final call = $createCall(_$clientStreaming, request, options: options);
request.pipe(call.request);
return new ResponseFuture(call); return new ResponseFuture(call);
} }
ResponseStream<int> serverStreaming(int request, {CallOptions options}) { ResponseStream<int> serverStreaming(int request, {CallOptions options}) {
final call = new ClientCall(_channel, _$serverStreaming, options: options); final call = $createCall(
call.request _$serverStreaming, new Stream.fromIterable([request]),
..add(request) options: options);
..close();
return new ResponseStream(call); return new ResponseStream(call);
} }
ResponseStream<int> bidirectional(Stream<int> request, ResponseStream<int> bidirectional(Stream<int> request,
{CallOptions options}) { {CallOptions options}) {
final call = new ClientCall(_channel, _$bidirectional, options: options); final call = $createCall(_$bidirectional, request, options: options);
request.pipe(call.request);
return new ResponseStream(call); return new ResponseStream(call);
} }
} }
class ClientHarness { class ClientHarness {
MockConnection connection; MockTransport transport;
MockChannel channel; MockChannel channel;
MockStream stream; MockStream stream;
@ -77,14 +83,12 @@ class ClientHarness {
TestClient client; TestClient client;
void setUp() { void setUp() {
connection = new MockConnection(); transport = new MockTransport();
channel = new MockChannel(); channel = new MockChannel('test', new ClientConnection(transport));
stream = new MockStream(); stream = new MockStream();
fromClient = new StreamController(); fromClient = new StreamController();
toClient = new StreamController(); toClient = new StreamController();
when(channel.host).thenReturn('test'); when(transport.makeRequest(any)).thenReturn(stream);
when(channel.connect()).thenReturn(connection);
when(connection.makeRequest(any)).thenReturn(stream);
when(stream.outgoingMessages).thenReturn(fromClient.sink); when(stream.outgoingMessages).thenReturn(fromClient.sink);
when(stream.incomingMessages).thenReturn(toClient.stream); when(stream.incomingMessages).thenReturn(toClient.stream);
client = new TestClient(channel); client = new TestClient(channel);
@ -134,10 +138,8 @@ class ClientHarness {
expect(result, expectedResult); expect(result, expectedResult);
} }
verify(channel.connect()).called(1);
final List<Header> capturedHeaders = final List<Header> capturedHeaders =
verify(connection.makeRequest(captureAny)).captured.single; verify(transport.makeRequest(captureAny)).captured.single;
validateRequestHeaders(capturedHeaders, validateRequestHeaders(capturedHeaders,
path: expectedPath, path: expectedPath,
timeout: toTimeoutString(expectedTimeout), timeout: toTimeoutString(expectedTimeout),

View File

@ -143,10 +143,11 @@ class ServerHarness {
void sendRequestHeader(String path, void sendRequestHeader(String path,
{String authority = 'test', {String authority = 'test',
String timeout, Map<String, String> metadata,
Map<String, String> metadata}) { Duration timeout}) {
final headers = ClientCall.createCallHeaders(path, authority, final options = new CallOptions(metadata: metadata, timeout: timeout);
timeout: timeout, metadata: metadata); final headers =
ClientConnection.createCallHeaders(true, authority, path, options);
toServer.add(new HeadersStreamMessage(headers)); toServer.add(new HeadersStreamMessage(headers));
} }

View File

@ -25,7 +25,7 @@ void validateRequestHeaders(List<Header> headers,
Map<String, String> customHeaders}) { Map<String, String> customHeaders}) {
final headerMap = headersToMap(headers); final headerMap = headersToMap(headers);
expect(headerMap[':method'], 'POST'); expect(headerMap[':method'], 'POST');
expect(headerMap[':scheme'], 'http'); expect(headerMap[':scheme'], 'https');
if (path != null) { if (path != null) {
expect(headerMap[':path'], path); expect(headerMap[':path'], path);
} }

View File

@ -132,7 +132,8 @@ void main() {
harness harness
..service.unaryHandler = methodHandler ..service.unaryHandler = methodHandler
..expectErrorResponse(StatusCode.deadlineExceeded, 'Deadline exceeded') ..expectErrorResponse(StatusCode.deadlineExceeded, 'Deadline exceeded')
..sendRequestHeader('/Test/Unary', metadata: {'grpc-timeout': '1u'}); ..sendRequestHeader('/Test/Unary',
timeout: new Duration(microseconds: 1));
await harness.fromServer.done; await harness.fromServer.done;
}); });
}); });