Improve interfaces (#163)

ChannelCredentials is now a http2-only-thing

ClientCall now asks the Transport about the authority.

The Xhr client-channel now takes a Uri.
This commit is contained in:
Sigurd Meldgaard 2019-04-02 15:15:40 +02:00 committed by GitHub
parent afe0aea7f5
commit edc0c19073
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 189 additions and 181 deletions

View File

@ -19,8 +19,7 @@ import 'package:grpc_web/app.dart';
import 'package:grpc_web/src/generated/echo.pbgrpc.dart'; import 'package:grpc_web/src/generated/echo.pbgrpc.dart';
void main() { void main() {
final channel = new GrpcWebClientChannel.xhr('http://localhost', final channel = new GrpcWebClientChannel.xhr(Uri.parse('http://localhost:8080'));
port: 8080);
final service = EchoServiceClient(channel); final service = EchoServiceClient(channel);
final app = EchoApp(service); final app = EchoApp(service);

View File

@ -95,7 +95,7 @@ class Tester {
if (_useTestCA) { if (_useTestCA) {
trustedRoot = new File('ca.pem').readAsBytesSync(); trustedRoot = new File('ca.pem').readAsBytesSync();
} }
credentials = new Http2ChannelCredentials.secure( credentials = new ChannelCredentials.secure(
certificates: trustedRoot, authority: serverHostOverride); certificates: trustedRoot, authority: serverHostOverride);
} else { } else {
credentials = const ChannelCredentials.insecure(); credentials = const ChannelCredentials.insecure();
@ -472,8 +472,7 @@ class Tester {
responses.map((response) => response.payload.body.length).toList(); responses.map((response) => response.payload.body.length).toList();
if (!new ListEquality().equals(responseLengths, expectedResponses)) { if (!new ListEquality().equals(responseLengths, expectedResponses)) {
throw 'Incorrect response lengths received (${responseLengths.join( throw 'Incorrect response lengths received (${responseLengths.join(', ')} != ${expectedResponses.join(', ')})';
', ')} != ${expectedResponses.join(', ')})';
} }
} }
@ -586,8 +585,7 @@ class Tester {
requests.add(index); requests.add(index);
await for (final response in responses) { await for (final response in responses) {
if (index >= expectedResponses.length) { if (index >= expectedResponses.length) {
throw 'Received too many responses. $index > ${expectedResponses throw 'Received too many responses. $index > ${expectedResponses.length}.';
.length}.';
} }
if (response.payload.body.length != expectedResponses[index]) { if (response.payload.body.length != expectedResponses[index]) {
throw 'Response mismatch for response $index: ' throw 'Response mismatch for response $index: '

View File

@ -32,14 +32,15 @@ export 'src/client/options.dart'
defaultIdleTimeout, defaultIdleTimeout,
BackoffStrategy, BackoffStrategy,
defaultBackoffStrategy, defaultBackoffStrategy,
ChannelCredentials,
ChannelOptions,
MetadataProvider, MetadataProvider,
CallOptions; CallOptions;
// TODO(sigurdm): Get rid of Http2ChannelCredentials.
export 'src/client/transport/http2_credentials.dart' export 'src/client/transport/http2_credentials.dart'
show BadCertificateHandler, allowBadCertificates, Http2ChannelCredentials; show
BadCertificateHandler,
allowBadCertificates,
ChannelCredentials,
ChannelOptions;
export 'src/server/call.dart' show ServiceCall; export 'src/server/call.dart' show ServiceCall;
export 'src/server/handler.dart' show ServerHandler; export 'src/server/handler.dart' show ServerHandler;

View File

@ -31,7 +31,6 @@ export 'src/client/options.dart'
defaultIdleTimeout, defaultIdleTimeout,
BackoffStrategy, BackoffStrategy,
defaultBackoffStrategy, defaultBackoffStrategy,
ChannelCredentials,
ChannelOptions, ChannelOptions,
MetadataProvider, MetadataProvider,
CallOptions; CallOptions;

View File

@ -59,8 +59,6 @@ class ClientCall<Q, R> implements Response {
} }
} }
String get path => _method.path;
void onConnectionError(error) { void onConnectionError(error) {
_terminateWithError(new GrpcError.unavailable('Error connecting: $error')); _terminateWithError(new GrpcError.unavailable('Error connecting: $error'));
} }
@ -91,16 +89,10 @@ class ClientCall<Q, R> implements Response {
_sendRequest(connection, _sanitizeMetadata(options.metadata)); _sendRequest(connection, _sanitizeMetadata(options.metadata));
} else { } else {
final metadata = new Map<String, String>.from(options.metadata); final metadata = new Map<String, String>.from(options.metadata);
String audience; Future.forEach(
if (connection.options.credentials.isSecure) { options.metadataProviders,
final port = connection.port != 443 ? ':${connection.port}' : ''; (provider) => provider(
final lastSlashPos = path.lastIndexOf('/'); metadata, "${connection.authority}${audiencePath(_method)}"))
final audiencePath =
lastSlashPos == -1 ? path : path.substring(0, lastSlashPos);
audience = 'https://${connection.authority}$port$audiencePath';
}
Future.forEach(options.metadataProviders,
(provider) => provider(metadata, audience))
.then((_) => _sendRequest(connection, _sanitizeMetadata(metadata))) .then((_) => _sendRequest(connection, _sanitizeMetadata(metadata)))
.catchError(_onMetadataProviderError); .catchError(_onMetadataProviderError);
} }
@ -113,7 +105,7 @@ class ClientCall<Q, R> implements Response {
void _sendRequest(ClientConnection connection, Map<String, String> metadata) { void _sendRequest(ClientConnection connection, Map<String, String> metadata) {
try { try {
_stream = connection.makeRequest( _stream = connection.makeRequest(
path, options.timeout, metadata, _onRequestError); _method.path, options.timeout, metadata, _onRequestError);
} catch (e) { } catch (e) {
_terminateWithError(new GrpcError.unavailable('Error making call: $e')); _terminateWithError(new GrpcError.unavailable('Error making call: $e'));
return; return;

View File

@ -21,59 +21,58 @@ import 'call.dart';
import 'connection.dart'; import 'connection.dart';
import 'method.dart'; import 'method.dart';
import 'options.dart'; import 'options.dart';
import 'transport/transport.dart';
typedef ConnectTransport = Future<Transport> Function(
String host, int port, ChannelOptions options);
/// A channel to a virtual RPC endpoint. /// A channel to a virtual RPC endpoint.
///
/// For each RPC, the channel picks a [ClientConnection] to dispatch the call.
/// RPCs on the same channel may be sent to different connections, depending on
/// load balancing settings.
abstract class ClientChannel { abstract class ClientChannel {
final String host; /// Shuts down this channel.
final int port; ///
final ChannelOptions options; /// No further RPCs can be made on this channel. RPCs already in progress will
final ConnectTransport connectTransport; /// be allowed to complete.
Future<void> shutdown();
/// Terminates this channel.
///
/// RPCs already in progress will be terminated. No further RPCs can be made
/// on this channel.
Future<void> terminate();
/// Initiates a new RPC on this connection.
ClientCall<Q, R> createCall<Q, R>(
ClientMethod<Q, R> method, Stream<Q> requests, CallOptions options);
}
/// Auxiliary base class implementing much of ClientChannel.
abstract class ClientChannelBase implements ClientChannel {
// TODO(jakobr): Multiple connections, load balancing. // TODO(jakobr): Multiple connections, load balancing.
ClientConnection _connection; ClientConnection _connection;
bool _isShutdown = false; bool _isShutdown = false;
ClientChannel(this.host, this.connectTransport, ClientChannelBase();
{this.port = 443, this.options = const ChannelOptions()});
/// Shuts down this channel. @override
///
/// No further RPCs can be made on this channel. RPCs already in progress will
/// be allowed to complete.
Future<void> shutdown() async { Future<void> shutdown() async {
if (_isShutdown) return; if (_isShutdown) return;
_isShutdown = true; _isShutdown = true;
if (_connection != null) await _connection.shutdown(); if (_connection != null) await _connection.shutdown();
} }
/// Terminates this channel. @override
///
/// RPCs already in progress will be terminated. No further RPCs can be made
/// on this channel.
Future<void> terminate() async { Future<void> terminate() async {
_isShutdown = true; _isShutdown = true;
if (_connection != null) await _connection.terminate(); if (_connection != null) await _connection.terminate();
} }
ClientConnection createConnection();
/// Returns a connection to this [Channel]'s RPC endpoint. /// Returns a connection to this [Channel]'s RPC endpoint.
/// ///
/// The connection may be shared between multiple RPCs. /// The connection may be shared between multiple RPCs.
Future<ClientConnection> getConnection() async { Future<ClientConnection> getConnection() async {
if (_isShutdown) throw new GrpcError.unavailable('Channel shutting down.'); if (_isShutdown) throw new GrpcError.unavailable('Channel shutting down.');
return _connection ??= return _connection ??= createConnection();
new ClientConnection(host, port, options, connectTransport);
} }
/// Initiates a new RPC on this connection.
ClientCall<Q, R> createCall<Q, R>( ClientCall<Q, R> createCall<Q, R>(
ClientMethod<Q, R> method, Stream<Q> requests, CallOptions options) { ClientMethod<Q, R> method, Stream<Q> requests, CallOptions options) {
final call = new ClientCall(method, requests, options); final call = new ClientCall(method, requests, options);

View File

@ -39,14 +39,9 @@ enum ConnectionState {
shutdown shutdown
} }
/// A connection to a single RPC endpoint.
///
/// RPCs made on a connection are always sent to the same endpoint.
class ClientConnection { class ClientConnection {
final String host;
final int port;
final ChannelOptions options; final ChannelOptions options;
final ConnectTransport connectTransport; final Future<Transport> Function() _connectTransport;
ConnectionState _state = ConnectionState.idle; ConnectionState _state = ConnectionState.idle;
void Function(ClientConnection connection) onStateChanged; void Function(ClientConnection connection) onStateChanged;
@ -57,20 +52,19 @@ class ClientConnection {
/// Used for idle and reconnect timeout, depending on [_state]. /// Used for idle and reconnect timeout, depending on [_state].
Timer _timer; Timer _timer;
Duration _currentReconnectDelay; Duration _currentReconnectDelay;
String get authority => _transport.authority;
ClientConnection(this.host, this.port, this.options, this.connectTransport); ClientConnection(this.options, this._connectTransport);
ConnectionState get state => _state; ConnectionState get state => _state;
String get authority => options.credentials.authority ?? host;
void _connect() { void _connect() {
if (_state != ConnectionState.idle && if (_state != ConnectionState.idle &&
_state != ConnectionState.transientFailure) { _state != ConnectionState.transientFailure) {
return; return;
} }
_setState(ConnectionState.connecting); _setState(ConnectionState.connecting);
connectTransport(host, port, options).then((transport) { _connectTransport().then((transport) {
_currentReconnectDelay = null; _currentReconnectDelay = null;
_transport = transport; _transport = transport;
_transport.onActiveStateChanged = _handleActiveStateChanged; _transport.onActiveStateChanged = _handleActiveStateChanged;

View File

@ -14,21 +14,35 @@
// limitations under the License. // limitations under the License.
import 'dart:async'; import 'dart:async';
import 'package:meta/meta.dart';
import 'channel.dart' as channel; import 'channel.dart';
import 'options.dart'; import 'connection.dart';
import 'transport/http2_credentials.dart';
import 'transport/http2_transport.dart'; import 'transport/http2_transport.dart';
import 'transport/transport.dart'; import 'transport/transport.dart';
@visibleForTesting /// A channel to a virtual gRPC endpoint.
Future<Transport> connectTransport( ///
String host, int port, ChannelOptions options) async { /// For each RPC, the channel picks a [ClientConnection] to dispatch the call.
return Http2Transport(host, port, options)..connect(); /// RPCs on the same channel may be sent to different connections, depending on
} /// load balancing settings.
class ClientChannel extends ClientChannelBase {
final String host;
final int port;
final ChannelOptions options;
class ClientChannel extends channel.ClientChannel { ClientChannel(this.host,
ClientChannel(String host, {this.port = 443, this.options = const ChannelOptions()})
{int port = 443, ChannelOptions options = const ChannelOptions()}) : super();
: super(host, connectTransport, port: port, options: options);
Future<Transport> _connectTransport() async {
final result = Http2Transport(host, port, options.credentials);
await result.connect();
return result;
}
@override
ClientConnection createConnection() {
return ClientConnection(options, _connectTransport);
}
} }

View File

@ -21,3 +21,11 @@ class ClientMethod<Q, R> {
ClientMethod(this.path, this.requestSerializer, this.responseDeserializer); ClientMethod(this.path, this.requestSerializer, this.responseDeserializer);
} }
// TODO(sigurdm): Find out why we do this.
String audiencePath(ClientMethod method) {
final lastSlashPos = method.path.lastIndexOf('/');
return lastSlashPos == -1
? method.path
: method.path.substring(0, lastSlashPos);
}

View File

@ -16,8 +16,6 @@
import 'dart:async'; import 'dart:async';
import 'dart:math'; import 'dart:math';
import 'package:meta/meta.dart';
const defaultIdleTimeout = const Duration(minutes: 5); const defaultIdleTimeout = const Duration(minutes: 5);
typedef Duration BackoffStrategy(Duration lastBackoff); typedef Duration BackoffStrategy(Duration lastBackoff);
@ -36,29 +34,15 @@ Duration defaultBackoffStrategy(Duration lastBackoff) {
return nextBackoff < _maxBackoff ? nextBackoff : _maxBackoff; return nextBackoff < _maxBackoff ? nextBackoff : _maxBackoff;
} }
/// Options controlling TLS security settings on a [ClientChannel].
class ChannelCredentials {
final bool isSecure;
final String authority;
@visibleForOverriding
const ChannelCredentials(this.isSecure, this.authority);
/// Disable TLS. RPCs are sent in clear text.
const ChannelCredentials.insecure() : this(false, null);
}
/// Options controlling how connections are made on a [ClientChannel]. /// Options controlling how connections are made on a [ClientChannel].
class ChannelOptions { class ChannelOptions {
final ChannelCredentials credentials;
final Duration idleTimeout; final Duration idleTimeout;
final BackoffStrategy backoffStrategy; final BackoffStrategy backoffStrategy;
const ChannelOptions({ const ChannelOptions({
ChannelCredentials credentials,
this.idleTimeout = defaultIdleTimeout, this.idleTimeout = defaultIdleTimeout,
this.backoffStrategy = defaultBackoffStrategy, this.backoffStrategy = defaultBackoffStrategy,
}) : this.credentials = credentials ?? const ChannelCredentials.insecure(); });
} }
/// Provides per-RPC metadata. /// Provides per-RPC metadata.

View File

@ -16,7 +16,7 @@
import 'dart:io'; import 'dart:io';
import '../../shared/security.dart'; import '../../shared/security.dart';
import '../options.dart'; import '../options.dart' as options;
/// Handler for checking certificates that fail validation. If this handler /// Handler for checking certificates that fail validation. If this handler
/// returns `true`, the bad certificate is allowed, and the TLS handshake can /// returns `true`, the bad certificate is allowed, and the TLS handshake can
@ -30,24 +30,38 @@ typedef bool BadCertificateHandler(X509Certificate certificate, String host);
/// certificates, etc. /// certificates, etc.
bool allowBadCertificates(X509Certificate certificate, String host) => true; bool allowBadCertificates(X509Certificate certificate, String host) => true;
class Http2ChannelCredentials extends ChannelCredentials { class git ci ChannelOptions extends options.ChannelOptions {
final ChannelCredentials credentials;
const ChannelOptions({
this.credentials,
Duration idleTimeout = options.defaultIdleTimeout,
options.BackoffStrategy backoffStrategy = options.defaultBackoffStrategy,
}) : super(idleTimeout: idleTimeout, backoffStrategy: backoffStrategy);
}
class ChannelCredentials {
final bool isSecure;
final String authority;
final List<int> _certificateBytes; final List<int> _certificateBytes;
final String _certificatePassword; final String _certificatePassword;
final BadCertificateHandler onBadCertificate; final BadCertificateHandler onBadCertificate;
const Http2ChannelCredentials._(bool isSecure, String authority, const ChannelCredentials._(this.isSecure, this.authority,
this._certificateBytes, this._certificatePassword, this.onBadCertificate) this._certificateBytes, this._certificatePassword, this.onBadCertificate);
: super(isSecure, authority);
/// Enable TLS and optionally specify the [certificates] to trust. If /// Enable TLS and optionally specify the [certificates] to trust. If
/// [certificates] is not provided, the default trust store is used. /// [certificates] is not provided, the default trust store is used.
const Http2ChannelCredentials.secure( const ChannelCredentials.secure(
{List<int> certificates, {List<int> certificates,
String password, String password,
String authority, String authority,
BadCertificateHandler onBadCertificate}) BadCertificateHandler onBadCertificate})
: this._(true, authority, certificates, password, onBadCertificate); : this._(true, authority, certificates, password, onBadCertificate);
const ChannelCredentials.insecure() : this._(false, null, null, null, null);
SecurityContext get securityContext { SecurityContext get securityContext {
if (!isSecure) return null; if (!isSecure) return null;
if (_certificateBytes != null) { if (_certificateBytes != null) {

View File

@ -24,9 +24,7 @@ import '../../shared/message.dart';
import '../../shared/streams.dart'; import '../../shared/streams.dart';
import '../../shared/timeout.dart'; import '../../shared/timeout.dart';
import '../options.dart'; import 'http2_credentials.dart' as http2_credentials;
import 'http2_credentials.dart';
import 'transport.dart'; import 'transport.dart';
class Http2TransportStream extends GrpcTransportStream { class Http2TransportStream extends GrpcTransportStream {
@ -79,14 +77,14 @@ class Http2Transport extends Transport {
final String host; final String host;
final int port; final int port;
final ChannelOptions options; final http2_credentials.ChannelCredentials credentials;
@visibleForTesting @visibleForTesting
ClientTransportConnection transportConnection; ClientTransportConnection transportConnection;
Http2Transport(this.host, this.port, this.options); Http2Transport(this.host, this.port, this.credentials);
String get authority => options.credentials.authority ?? host; String get authority => credentials.authority ?? "$host:$port";
static List<Header> createCallHeaders(bool useTls, String authority, static List<Header> createCallHeaders(bool useTls, String authority,
String path, Duration timeout, Map<String, String> metadata) { String path, Duration timeout, Map<String, String> metadata) {
@ -115,15 +113,12 @@ class Http2Transport extends Transport {
Future<void> connect() async { Future<void> connect() async {
var socket = await Socket.connect(host, port); var socket = await Socket.connect(host, port);
final credentials = options.credentials; final securityContext = credentials.securityContext;
if (credentials is Http2ChannelCredentials) { if (securityContext != null) {
final securityContext = credentials.securityContext; socket = await SecureSocket.secure(socket,
if (securityContext != null) { host: authority,
socket = await SecureSocket.secure(socket, context: securityContext,
host: authority, onBadCertificate: _validateBadCertificate);
context: securityContext,
onBadCertificate: _validateBadCertificate);
}
} }
socket.done.then(_handleSocketClosed); socket.done.then(_handleSocketClosed);
transportConnection = ClientTransportConnection.viaSocket(socket); transportConnection = ClientTransportConnection.viaSocket(socket);
@ -133,7 +128,7 @@ class Http2Transport extends Transport {
GrpcTransportStream makeRequest(String path, Duration timeout, GrpcTransportStream makeRequest(String path, Duration timeout,
Map<String, String> metadata, ErrorHandler onError) { Map<String, String> metadata, ErrorHandler onError) {
final headers = createCallHeaders( final headers = createCallHeaders(
options.credentials.isSecure, authority, path, timeout, metadata); credentials.isSecure, authority, path, timeout, metadata);
final stream = transportConnection.makeRequest(headers); final stream = transportConnection.makeRequest(headers);
return new Http2TransportStream(stream, onError); return new Http2TransportStream(stream, onError);
} }
@ -149,14 +144,11 @@ class Http2Transport extends Transport {
} }
bool _validateBadCertificate(X509Certificate certificate) { bool _validateBadCertificate(X509Certificate certificate) {
final credentials = options.credentials; final credentials = this.credentials;
if (credentials is Http2ChannelCredentials) { final validator = credentials.onBadCertificate;
final validator = credentials.onBadCertificate;
if (validator == null) return false; if (validator == null) return false;
return validator(certificate, authority); return validator(certificate, authority);
}
return false;
} }
void _handleSocketClosed(_) { void _handleSocketClosed(_) {

View File

@ -32,6 +32,7 @@ abstract class Transport {
ActiveStateHandler onActiveStateChanged; ActiveStateHandler onActiveStateChanged;
SocketClosedHandler onSocketClosed; SocketClosedHandler onSocketClosed;
String get authority;
Future<void> connect(); Future<void> connect();
GrpcTransportStream makeRequest(String path, Duration timeout, GrpcTransportStream makeRequest(String path, Duration timeout,
Map<String, String> metadata, ErrorHandler onRequestFailure); Map<String, String> metadata, ErrorHandler onRequestFailure);

View File

@ -130,12 +130,13 @@ class XhrTransportStream implements GrpcTransportStream {
} }
class XhrTransport extends Transport { class XhrTransport extends Transport {
final String host; final Uri uri;
final int port;
HttpRequest _request; HttpRequest _request;
XhrTransport(this.host, this.port); XhrTransport(this.uri);
String get authority => uri.authority;
@override @override
Future<void> connect() async {} Future<void> connect() async {}
@ -160,7 +161,7 @@ class XhrTransport extends Transport {
GrpcTransportStream makeRequest(String path, Duration timeout, GrpcTransportStream makeRequest(String path, Duration timeout,
Map<String, String> metadata, ErrorHandler onError) { Map<String, String> metadata, ErrorHandler onError) {
_request = HttpRequest(); _request = HttpRequest();
_request.open('POST', '${host}:${port}${path}'); _request.open('POST', uri.resolve(path).toString());
initializeRequest(_request, metadata); initializeRequest(_request, metadata);

View File

@ -15,20 +15,28 @@
import 'dart:async'; import 'dart:async';
import 'package:grpc/src/client/options.dart';
import 'package:grpc/src/client/transport/transport.dart';
import 'package:grpc/src/client/transport/xhr_transport.dart';
import 'package:meta/meta.dart';
import 'channel.dart'; import 'channel.dart';
import 'connection.dart';
import 'options.dart';
import 'transport/transport.dart';
import 'transport/xhr_transport.dart';
@visibleForTesting /// A channel to a grpc-web endpoint.
Future<Transport> connectXhrTransport( class GrpcWebClientChannel extends ClientChannelBase {
String host, int port, ChannelOptions _) async { final Uri uri;
return XhrTransport(host, port)..connect(); ChannelOptions options;
}
class GrpcWebClientChannel extends ClientChannel { GrpcWebClientChannel.xhr(this.uri, {this.options: const ChannelOptions()})
GrpcWebClientChannel.xhr(String host, {int port = 443}) : super();
: super(host, connectXhrTransport, port: port);
Future<Transport> _connectXhrTransport() async {
final result = XhrTransport(uri);
await result.connect();
return result;
}
@override
ClientConnection createConnection() {
return ClientConnection(options, _connectXhrTransport);
}
} }

View File

@ -39,8 +39,8 @@ class MockHttp2Transport extends Http2Transport {
StreamController<StreamMessage> fromClient; StreamController<StreamMessage> fromClient;
StreamController<StreamMessage> toClient; StreamController<StreamMessage> toClient;
MockHttp2Transport(String host, int port, ChannelOptions options) MockHttp2Transport(String host, int port, ChannelCredentials credentials)
: super(host, port, options); : super(host, port, credentials);
@override @override
Future<void> connect() async { Future<void> connect() async {
@ -71,10 +71,7 @@ class MockHttp2Transport extends Http2Transport {
void main() { void main() {
final MockHttp2Transport transport = new MockHttp2Transport( final MockHttp2Transport transport = new MockHttp2Transport(
'host', 'host', 9999, ChannelCredentials.secure(authority: 'test'));
9999,
ChannelOptions(
credentials: new Http2ChannelCredentials.secure(authority: 'test')));
setUp(() { setUp(() {
transport.connect(); transport.connect();
@ -98,7 +95,8 @@ void main() {
timeout: toTimeoutString(Duration(seconds: 10))); timeout: toTimeoutString(Duration(seconds: 10)));
}; };
transport.makeRequest('test_path', Duration(seconds: 10), metadata); transport.makeRequest('test_path', Duration(seconds: 10), metadata,
(error) => fail(error.toString()));
}); });
test('Sent data converted to StreamMessages properly', () async { test('Sent data converted to StreamMessages properly', () async {
@ -107,8 +105,8 @@ void main() {
"parameter_2": "value_2" "parameter_2": "value_2"
}; };
final stream = final stream = transport.makeRequest('test_path', Duration(seconds: 10),
transport.makeRequest('test_path', Duration(seconds: 10), metadata); metadata, (error) => fail(error.toString()));
transport.fromClient.stream.listen((message) { transport.fromClient.stream.listen((message) {
final dataMessage = validateDataMessage(message); final dataMessage = validateDataMessage(message);
@ -124,8 +122,8 @@ void main() {
"parameter_2": "value_2" "parameter_2": "value_2"
}; };
final stream = final stream = transport.makeRequest('test_path', Duration(seconds: 10),
transport.makeRequest('test_path', Duration(seconds: 10), metadata); metadata, (error) => fail(error.toString()));
stream.incomingMessages.listen((message) { stream.incomingMessages.listen((message) {
expect(message, TypeMatcher<GrpcMetadata>()); expect(message, TypeMatcher<GrpcMetadata>());
@ -146,8 +144,8 @@ void main() {
"parameter_2": "value_2" "parameter_2": "value_2"
}; };
final stream = final stream = transport.makeRequest('test_path', Duration(seconds: 10),
transport.makeRequest('test_path', Duration(seconds: 10), metadata); metadata, (error) => fail(error.toString()));
final data = List<int>.filled(10, 0); final data = List<int>.filled(10, 0);
stream.incomingMessages.listen((message) { stream.incomingMessages.listen((message) {
expect(message, TypeMatcher<GrpcData>()); expect(message, TypeMatcher<GrpcData>());

View File

@ -28,24 +28,25 @@ import 'package:test/test.dart';
class MockHttpRequest extends Mock implements HttpRequest {} class MockHttpRequest extends Mock implements HttpRequest {}
class MockXhrTransport extends XhrTransport { class MockXhrTransport extends XhrTransport {
StreamController<Event> readyStateChangeStream = StreamController<Event>(); final StreamController<Event> readyStateChangeStream =
StreamController<ProgressEvent> progressStream = StreamController<Event>();
final StreamController<ProgressEvent> progressStream =
StreamController<ProgressEvent>(); StreamController<ProgressEvent>();
MockHttpRequest mockRequest; MockHttpRequest mockRequest;
MockXhrTransport(this.mockRequest) : super('test', 8080) {} MockXhrTransport(this.mockRequest) : super(Uri.parse('test:8080'));
@override @override
GrpcTransportStream makeRequest( GrpcTransportStream makeRequest(String path, Duration timeout,
String path, Duration timeout, Map<String, String> metadata) { Map<String, String> metadata, ErrorHandler onError) {
when(mockRequest.onReadyStateChange) when(mockRequest.onReadyStateChange)
.thenAnswer((_) => readyStateChangeStream.stream); .thenAnswer((_) => readyStateChangeStream.stream);
when(mockRequest.onProgress).thenAnswer((_) => progressStream.stream); when(mockRequest.onProgress).thenAnswer((_) => progressStream.stream);
initializeRequest(mockRequest, metadata); initializeRequest(mockRequest, metadata);
return XhrTransportStream(mockRequest); return XhrTransportStream(mockRequest, onError);
} }
@override @override
@ -65,7 +66,8 @@ void main() {
final mockRequest = MockHttpRequest(); final mockRequest = MockHttpRequest();
final transport = MockXhrTransport(mockRequest); final transport = MockXhrTransport(mockRequest);
transport.makeRequest('path', Duration(seconds: 10), metadata); transport.makeRequest('path', Duration(seconds: 10), metadata,
(error) => fail(error.toString()));
verify(mockRequest.setRequestHeader( verify(mockRequest.setRequestHeader(
'Content-Type', 'application/grpc-web+proto')); 'Content-Type', 'application/grpc-web+proto'));
@ -84,8 +86,8 @@ void main() {
final mockRequest = MockHttpRequest(); final mockRequest = MockHttpRequest();
final transport = MockXhrTransport(mockRequest); final transport = MockXhrTransport(mockRequest);
final stream = final stream = transport.makeRequest('path', Duration(seconds: 10),
transport.makeRequest('path', Duration(seconds: 10), metadata); metadata, (error) => fail(error.toString()));
final data = List.filled(10, 0); final data = List.filled(10, 0);
stream.outgoingMessages.add(data); stream.outgoingMessages.add(data);
@ -104,8 +106,8 @@ void main() {
final mockRequest = MockHttpRequest(); final mockRequest = MockHttpRequest();
final transport = MockXhrTransport(mockRequest); final transport = MockXhrTransport(mockRequest);
final stream = final stream = transport.makeRequest('test_path', Duration(seconds: 10),
transport.makeRequest('test_path', Duration(seconds: 10), metadata); metadata, (error) => fail(error.toString()));
stream.incomingMessages.listen((message) { stream.incomingMessages.listen((message) {
expect(message, TypeMatcher<GrpcMetadata>()); expect(message, TypeMatcher<GrpcMetadata>());
@ -126,8 +128,8 @@ void main() {
final mockRequest = MockHttpRequest(); final mockRequest = MockHttpRequest();
final transport = MockXhrTransport(mockRequest); final transport = MockXhrTransport(mockRequest);
final stream = final stream = transport.makeRequest('test_path', Duration(seconds: 10),
transport.makeRequest('test_path', Duration(seconds: 10), metadata); metadata, (error) => fail(error.toString()));
final data = List<int>.filled(10, 224); final data = List<int>.filled(10, 224);
final encoded = frame(data); final encoded = frame(data);
final encodedString = String.fromCharCodes(encoded); final encodedString = String.fromCharCodes(encoded);
@ -164,8 +166,8 @@ void main() {
final mockRequest = MockHttpRequest(); final mockRequest = MockHttpRequest();
final transport = MockXhrTransport(mockRequest); final transport = MockXhrTransport(mockRequest);
final stream = final stream = transport.makeRequest('test_path', Duration(seconds: 10),
transport.makeRequest('test_path', Duration(seconds: 10), metadata); metadata, (error) => fail(error.toString()));
final data = <List<int>>[ final data = <List<int>>[
List<int>.filled(10, 224), List<int>.filled(10, 224),

View File

@ -16,7 +16,7 @@
import 'dart:io'; import 'dart:io';
import 'package:grpc/grpc.dart'; import 'package:grpc/src/client/transport/http2_credentials.dart';
import 'package:test/test.dart'; import 'package:test/test.dart';
const isTlsException = const TypeMatcher<TlsException>(); const isTlsException = const TypeMatcher<TlsException>();
@ -28,14 +28,14 @@ void main() {
await new File('test/data/certstore.p12').readAsBytes(); await new File('test/data/certstore.p12').readAsBytes();
final missingPassword = final missingPassword =
new Http2ChannelCredentials.secure(certificates: certificates); ChannelCredentials.secure(certificates: certificates);
expect(() => missingPassword.securityContext, throwsA(isTlsException)); expect(() => missingPassword.securityContext, throwsA(isTlsException));
final wrongPassword = new Http2ChannelCredentials.secure( final wrongPassword = ChannelCredentials.secure(
certificates: certificates, password: 'wrong'); certificates: certificates, password: 'wrong');
expect(() => wrongPassword.securityContext, throwsA(isTlsException)); expect(() => wrongPassword.securityContext, throwsA(isTlsException));
final correctPassword = new Http2ChannelCredentials.secure( final correctPassword = ChannelCredentials.secure(
certificates: certificates, password: 'correct'); certificates: certificates, password: 'correct');
expect(correctPassword.securityContext, isNotNull); expect(correctPassword.securityContext, isNotNull);
}); });

View File

@ -15,13 +15,19 @@
import 'dart:async'; import 'dart:async';
import 'package:grpc/src/client/call.dart';
import 'package:grpc/src/client/channel.dart';
import 'package:grpc/src/client/channel.dart';
import 'package:grpc/src/client/client.dart';
import 'package:grpc/src/client/common.dart';
import 'package:grpc/src/client/connection.dart';
import 'package:grpc/src/client/method.dart';
import 'package:grpc/src/client/options.dart';
import 'package:grpc/src/shared/message.dart'; import 'package:grpc/src/shared/message.dart';
import 'package:test/test.dart'; import 'package:test/test.dart';
import 'package:mockito/mockito.dart'; import 'package:mockito/mockito.dart';
import 'package:grpc/src/client/transport/transport.dart'; import 'package:grpc/src/client/transport/transport.dart';
import 'package:grpc/src/client/channel.dart' show ConnectTransport;
import 'package:grpc/grpc.dart';
import 'utils.dart'; import 'utils.dart';
@ -42,13 +48,13 @@ class FakeConnection extends ClientConnection {
var connectionError; var connectionError;
FakeConnection._(String host, Transport transport, ChannelOptions options, FakeConnection._(String host, Transport transport, ChannelOptions options,
ConnectTransport connectTransport) Future<Transport> Function() connectTransport)
: super(host, 443, options, connectTransport); : super(options, connectTransport);
factory FakeConnection( factory FakeConnection(
String host, Transport transport, ChannelOptions options) { String host, Transport transport, ChannelOptions options) {
FakeConnection f; FakeConnection f;
f = FakeConnection._(host, transport, options, (_, _1, _2) async { f = FakeConnection._(host, transport, options, () async {
if (f.connectionError != null) throw f.connectionError; if (f.connectionError != null) throw f.connectionError;
return transport; return transport;
}); });
@ -59,20 +65,18 @@ class FakeConnection extends ClientConnection {
Duration testBackoff(Duration lastBackoff) => const Duration(milliseconds: 1); Duration testBackoff(Duration lastBackoff) => const Duration(milliseconds: 1);
class FakeChannelOptions implements ChannelOptions { class FakeChannelOptions implements ChannelOptions {
ChannelCredentials credentials = const Http2ChannelCredentials.secure();
Duration idleTimeout = const Duration(seconds: 1); Duration idleTimeout = const Duration(seconds: 1);
BackoffStrategy backoffStrategy = testBackoff; BackoffStrategy backoffStrategy = testBackoff;
} }
class FakeChannel extends ClientChannel { class FakeChannel extends ClientChannelBase {
final ClientConnection connection; final ClientConnection connection;
final FakeChannelOptions options; final FakeChannelOptions options;
FakeChannel(String host, this.connection, this.options) FakeChannel(String host, this.connection, this.options);
: super(host, options: options);
@override @override
Future<ClientConnection> getConnection() async => connection; ClientConnection createConnection() => connection;
} }
class TestClient extends Client { class TestClient extends Client {
@ -134,7 +138,7 @@ class ClientHarness {
stream = new MockStream(); stream = new MockStream();
fromClient = new StreamController(); fromClient = new StreamController();
toClient = new StreamController(); toClient = new StreamController();
when(transport.makeRequest(any, any, any)).thenReturn(stream); when(transport.makeRequest(any, any, any, any)).thenReturn(stream);
when(transport.onActiveStateChanged = captureAny).thenReturn(null); when(transport.onActiveStateChanged = captureAny).thenReturn(null);
when(stream.outgoingMessages).thenReturn(fromClient.sink); when(stream.outgoingMessages).thenReturn(fromClient.sink);
when(stream.incomingMessages).thenAnswer((_) => toClient.stream); when(stream.incomingMessages).thenAnswer((_) => toClient.stream);
@ -191,9 +195,9 @@ class ClientHarness {
expect(result, expectedResult); expect(result, expectedResult);
} }
final capturedParameters = final capturedParameters = verify(transport.makeRequest(
verify(transport.makeRequest(captureAny, captureAny, captureAny)) captureAny, captureAny, captureAny, captureAny))
.captured; .captured;
if (expectedPath != null) { if (expectedPath != null) {
expect(capturedParameters[0], expectedPath); expect(capturedParameters[0], expectedPath);
} }