mirror of https://github.com/grpc/grpc-dart.git
Don't use Transport for grpc-web (#177)
This commit is contained in:
parent
af63debc43
commit
839b2ca03c
|
@ -24,8 +24,9 @@ export 'src/auth/auth.dart'
|
|||
export 'src/client/call.dart' show ClientCall;
|
||||
export 'src/client/client.dart' show Client;
|
||||
export 'src/client/common.dart' show Response, ResponseStream, ResponseFuture;
|
||||
export 'src/client/connection.dart' show ConnectionState, ClientConnection;
|
||||
export 'src/client/connection.dart' show ConnectionState;
|
||||
export 'src/client/http2_channel.dart' show ClientChannel;
|
||||
export 'src/client/http2_connection.dart' show ClientConnection, Http2ClientConnection;
|
||||
export 'src/client/method.dart' show ClientMethod;
|
||||
export 'src/client/options.dart'
|
||||
show
|
||||
|
|
|
@ -24,7 +24,6 @@ export 'src/client/call.dart' show ClientCall;
|
|||
|
||||
export 'src/client/client.dart' show Client;
|
||||
export 'src/client/common.dart' show Response, ResponseStream, ResponseFuture;
|
||||
export 'src/client/connection.dart' show ClientConnection;
|
||||
export 'src/client/method.dart' show ClientMethod;
|
||||
export 'src/client/options.dart'
|
||||
show
|
||||
|
|
|
@ -39,176 +39,26 @@ enum ConnectionState {
|
|||
shutdown
|
||||
}
|
||||
|
||||
class ClientConnection {
|
||||
final ChannelOptions options;
|
||||
final Future<Transport> Function() _connectTransport;
|
||||
abstract class ClientConnection {
|
||||
String get authority;
|
||||
|
||||
ConnectionState _state = ConnectionState.idle;
|
||||
void Function(ClientConnection connection) onStateChanged;
|
||||
final _pendingCalls = <ClientCall>[];
|
||||
|
||||
Transport _transport;
|
||||
|
||||
/// Used for idle and reconnect timeout, depending on [_state].
|
||||
Timer _timer;
|
||||
Duration _currentReconnectDelay;
|
||||
String get authority => _transport.authority;
|
||||
|
||||
ClientConnection(this.options, this._connectTransport);
|
||||
|
||||
ConnectionState get state => _state;
|
||||
|
||||
void _connect() {
|
||||
if (_state != ConnectionState.idle &&
|
||||
_state != ConnectionState.transientFailure) {
|
||||
return;
|
||||
}
|
||||
_setState(ConnectionState.connecting);
|
||||
_connectTransport().then((transport) {
|
||||
_currentReconnectDelay = null;
|
||||
_transport = transport;
|
||||
_transport.onActiveStateChanged = _handleActiveStateChanged;
|
||||
_transport.onSocketClosed = _handleSocketClosed;
|
||||
_setState(ConnectionState.ready);
|
||||
_pendingCalls.forEach(_startCall);
|
||||
_pendingCalls.clear();
|
||||
}).catchError(_handleConnectionFailure);
|
||||
}
|
||||
|
||||
void dispatchCall(ClientCall call) {
|
||||
switch (_state) {
|
||||
case ConnectionState.ready:
|
||||
_startCall(call);
|
||||
break;
|
||||
case ConnectionState.shutdown:
|
||||
_shutdownCall(call);
|
||||
break;
|
||||
default:
|
||||
_pendingCalls.add(call);
|
||||
if (_state == ConnectionState.idle) {
|
||||
_connect();
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Put [call] on the queue to be dispatched when the connection is ready.
|
||||
void dispatchCall(ClientCall call);
|
||||
|
||||
/// Start a request for [path[ with [metadata].
|
||||
GrpcTransportStream makeRequest(String path, Duration timeout,
|
||||
Map<String, String> metadata, ErrorHandler onRequestFailure) {
|
||||
return _transport.makeRequest(path, timeout, metadata, onRequestFailure);
|
||||
}
|
||||
|
||||
void _startCall(ClientCall call) {
|
||||
if (call.isCancelled) return;
|
||||
call.onConnectionReady(this);
|
||||
}
|
||||
|
||||
void _failCall(ClientCall call, dynamic error) {
|
||||
if (call.isCancelled) return;
|
||||
call.onConnectionError(error);
|
||||
}
|
||||
|
||||
void _shutdownCall(ClientCall call) {
|
||||
_failCall(call, 'Connection shutting down.');
|
||||
}
|
||||
Map<String, String> metadata, ErrorHandler onRequestFailure);
|
||||
|
||||
/// Shuts down this connection.
|
||||
///
|
||||
/// No further calls may be made on this connection, but existing calls
|
||||
/// are allowed to finish.
|
||||
Future<void> shutdown() async {
|
||||
if (_state == ConnectionState.shutdown) return null;
|
||||
_setShutdownState();
|
||||
await _transport?.finish();
|
||||
}
|
||||
Future<void> shutdown();
|
||||
|
||||
/// Terminates this connection.
|
||||
///
|
||||
/// All open calls are terminated immediately, and no further calls may be
|
||||
/// made on this connection.
|
||||
Future<void> terminate() async {
|
||||
_setShutdownState();
|
||||
await _transport?.terminate();
|
||||
}
|
||||
|
||||
void _setShutdownState() {
|
||||
_setState(ConnectionState.shutdown);
|
||||
_cancelTimer();
|
||||
_pendingCalls.forEach(_shutdownCall);
|
||||
_pendingCalls.clear();
|
||||
}
|
||||
|
||||
void _setState(ConnectionState state) {
|
||||
_state = state;
|
||||
if (onStateChanged != null) {
|
||||
onStateChanged(this);
|
||||
}
|
||||
}
|
||||
|
||||
void _handleIdleTimeout() {
|
||||
if (_timer == null || _state != ConnectionState.ready) return;
|
||||
_cancelTimer();
|
||||
_transport?.finish()?.catchError((_) => {}); // TODO(jakobr): Log error.
|
||||
_transport = null;
|
||||
_setState(ConnectionState.idle);
|
||||
}
|
||||
|
||||
void _cancelTimer() {
|
||||
_timer?.cancel();
|
||||
_timer = null;
|
||||
}
|
||||
|
||||
void _handleActiveStateChanged(bool isActive) {
|
||||
if (isActive) {
|
||||
_cancelTimer();
|
||||
} else {
|
||||
if (options.idleTimeout != null) {
|
||||
_timer ??= new Timer(options.idleTimeout, _handleIdleTimeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool _hasPendingCalls() {
|
||||
// Get rid of pending calls that have timed out.
|
||||
_pendingCalls.removeWhere((call) => call.isCancelled);
|
||||
return _pendingCalls.isNotEmpty;
|
||||
}
|
||||
|
||||
void _handleConnectionFailure(error) {
|
||||
_transport = null;
|
||||
if (_state == ConnectionState.shutdown || _state == ConnectionState.idle) {
|
||||
return;
|
||||
}
|
||||
// TODO(jakobr): Log error.
|
||||
_cancelTimer();
|
||||
_pendingCalls.forEach((call) => _failCall(call, error));
|
||||
_pendingCalls.clear();
|
||||
_setState(ConnectionState.idle);
|
||||
}
|
||||
|
||||
void _handleReconnect() {
|
||||
if (_timer == null || _state != ConnectionState.transientFailure) return;
|
||||
_cancelTimer();
|
||||
_connect();
|
||||
}
|
||||
|
||||
void _handleSocketClosed() {
|
||||
_cancelTimer();
|
||||
_transport = null;
|
||||
|
||||
if (_state == ConnectionState.idle && _state == ConnectionState.shutdown) {
|
||||
// All good.
|
||||
return;
|
||||
}
|
||||
|
||||
// We were not planning to close the socket.
|
||||
if (!_hasPendingCalls()) {
|
||||
// No pending calls. Just hop to idle, and wait for a new RPC.
|
||||
_setState(ConnectionState.idle);
|
||||
return;
|
||||
}
|
||||
|
||||
// We have pending RPCs. Reconnect after backoff delay.
|
||||
_setState(ConnectionState.transientFailure);
|
||||
_currentReconnectDelay = options.backoffStrategy(_currentReconnectDelay);
|
||||
_timer = new Timer(_currentReconnectDelay, _handleReconnect);
|
||||
}
|
||||
Future<void> terminate();
|
||||
}
|
||||
|
||||
|
|
|
@ -17,13 +17,14 @@ import 'dart:async';
|
|||
|
||||
import 'channel.dart';
|
||||
import 'connection.dart';
|
||||
import 'http2_connection.dart' show Http2ClientConnection;
|
||||
import 'transport/http2_credentials.dart';
|
||||
import 'transport/http2_transport.dart';
|
||||
import 'transport/transport.dart';
|
||||
|
||||
/// A channel to a virtual gRPC endpoint.
|
||||
///
|
||||
/// For each RPC, the channel picks a [ClientConnection] to dispatch the call.
|
||||
/// For each RPC, the channel picks a [Http2ClientConnection] to dispatch the call.
|
||||
/// RPCs on the same channel may be sent to different connections, depending on
|
||||
/// load balancing settings.
|
||||
class ClientChannel extends ClientChannelBase {
|
||||
|
@ -43,6 +44,6 @@ class ClientChannel extends ClientChannelBase {
|
|||
|
||||
@override
|
||||
ClientConnection createConnection() {
|
||||
return ClientConnection(options, _connectTransport);
|
||||
return Http2ClientConnection(options, _connectTransport);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,193 @@
|
|||
// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file
|
||||
// for details. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
import 'dart:async';
|
||||
|
||||
import 'call.dart';
|
||||
import 'connection.dart' hide ClientConnection;
|
||||
import 'connection.dart' as connection;
|
||||
import 'options.dart';
|
||||
import 'transport/transport.dart';
|
||||
|
||||
@Deprecated('Use Http2ClientConnection. Will be removed in next breaking release')
|
||||
class ClientConnection extends Http2ClientConnection {
|
||||
ClientConnection(ChannelOptions options, Future<Transport> Function() _connectTransport) : super(options, _connectTransport);
|
||||
}
|
||||
|
||||
class Http2ClientConnection implements connection.ClientConnection {
|
||||
final ChannelOptions options;
|
||||
final Future<Transport> Function() _connectTransport;
|
||||
|
||||
connection.ConnectionState _state = ConnectionState.idle;
|
||||
void Function(Http2ClientConnection connection) onStateChanged;
|
||||
final _pendingCalls = <ClientCall>[];
|
||||
|
||||
Transport _transport;
|
||||
|
||||
/// Used for idle and reconnect timeout, depending on [_state].
|
||||
Timer _timer;
|
||||
Duration _currentReconnectDelay;
|
||||
String get authority => _transport.authority;
|
||||
|
||||
Http2ClientConnection(this.options, this._connectTransport);
|
||||
|
||||
ConnectionState get state => _state;
|
||||
|
||||
void _connect() {
|
||||
if (_state != ConnectionState.idle &&
|
||||
_state != ConnectionState.transientFailure) {
|
||||
return;
|
||||
}
|
||||
_setState(ConnectionState.connecting);
|
||||
_connectTransport().then((transport) {
|
||||
_currentReconnectDelay = null;
|
||||
_transport = transport;
|
||||
_transport.onActiveStateChanged = _handleActiveStateChanged;
|
||||
_transport.onSocketClosed = _handleSocketClosed;
|
||||
_setState(ConnectionState.ready);
|
||||
_pendingCalls.forEach(_startCall);
|
||||
_pendingCalls.clear();
|
||||
}).catchError(_handleConnectionFailure);
|
||||
}
|
||||
|
||||
void dispatchCall(ClientCall call) {
|
||||
switch (_state) {
|
||||
case ConnectionState.ready:
|
||||
_startCall(call);
|
||||
break;
|
||||
case ConnectionState.shutdown:
|
||||
_shutdownCall(call);
|
||||
break;
|
||||
default:
|
||||
_pendingCalls.add(call);
|
||||
if (_state == ConnectionState.idle) {
|
||||
_connect();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
GrpcTransportStream makeRequest(String path, Duration timeout,
|
||||
Map<String, String> metadata, ErrorHandler onRequestFailure) {
|
||||
return _transport.makeRequest(path, timeout, metadata, onRequestFailure);
|
||||
}
|
||||
|
||||
void _startCall(ClientCall call) {
|
||||
if (call.isCancelled) return;
|
||||
call.onConnectionReady(this);
|
||||
}
|
||||
|
||||
void _failCall(ClientCall call, dynamic error) {
|
||||
if (call.isCancelled) return;
|
||||
call.onConnectionError(error);
|
||||
}
|
||||
|
||||
void _shutdownCall(ClientCall call) {
|
||||
_failCall(call, 'Connection shutting down.');
|
||||
}
|
||||
|
||||
Future<void> shutdown() async {
|
||||
if (_state == ConnectionState.shutdown) return null;
|
||||
_setShutdownState();
|
||||
await _transport?.finish();
|
||||
}
|
||||
|
||||
Future<void> terminate() async {
|
||||
_setShutdownState();
|
||||
await _transport?.terminate();
|
||||
}
|
||||
|
||||
void _setShutdownState() {
|
||||
_setState(ConnectionState.shutdown);
|
||||
_cancelTimer();
|
||||
_pendingCalls.forEach(_shutdownCall);
|
||||
_pendingCalls.clear();
|
||||
}
|
||||
|
||||
void _setState(ConnectionState state) {
|
||||
_state = state;
|
||||
if (onStateChanged != null) {
|
||||
onStateChanged(this);
|
||||
}
|
||||
}
|
||||
|
||||
void _handleIdleTimeout() {
|
||||
if (_timer == null || _state != ConnectionState.ready) return;
|
||||
_cancelTimer();
|
||||
_transport?.finish()?.catchError((_) => {}); // TODO(jakobr): Log error.
|
||||
_transport = null;
|
||||
_setState(ConnectionState.idle);
|
||||
}
|
||||
|
||||
void _cancelTimer() {
|
||||
_timer?.cancel();
|
||||
_timer = null;
|
||||
}
|
||||
|
||||
void _handleActiveStateChanged(bool isActive) {
|
||||
if (isActive) {
|
||||
_cancelTimer();
|
||||
} else {
|
||||
if (options.idleTimeout != null) {
|
||||
_timer ??= new Timer(options.idleTimeout, _handleIdleTimeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool _hasPendingCalls() {
|
||||
// Get rid of pending calls that have timed out.
|
||||
_pendingCalls.removeWhere((call) => call.isCancelled);
|
||||
return _pendingCalls.isNotEmpty;
|
||||
}
|
||||
|
||||
void _handleConnectionFailure(error) {
|
||||
_transport = null;
|
||||
if (_state == ConnectionState.shutdown || _state == ConnectionState.idle) {
|
||||
return;
|
||||
}
|
||||
// TODO(jakobr): Log error.
|
||||
_cancelTimer();
|
||||
_pendingCalls.forEach((call) => _failCall(call, error));
|
||||
_pendingCalls.clear();
|
||||
_setState(ConnectionState.idle);
|
||||
}
|
||||
|
||||
void _handleReconnect() {
|
||||
if (_timer == null || _state != ConnectionState.transientFailure) return;
|
||||
_cancelTimer();
|
||||
_connect();
|
||||
}
|
||||
|
||||
void _handleSocketClosed() {
|
||||
_cancelTimer();
|
||||
_transport = null;
|
||||
|
||||
if (_state == ConnectionState.idle && _state == ConnectionState.shutdown) {
|
||||
// All good.
|
||||
return;
|
||||
}
|
||||
|
||||
// We were not planning to close the socket.
|
||||
if (!_hasPendingCalls()) {
|
||||
// No pending calls. Just hop to idle, and wait for a new RPC.
|
||||
_setState(ConnectionState.idle);
|
||||
return;
|
||||
}
|
||||
|
||||
// We have pending RPCs. Reconnect after backoff delay.
|
||||
_setState(ConnectionState.transientFailure);
|
||||
_currentReconnectDelay = options.backoffStrategy(_currentReconnectDelay);
|
||||
_timer = new Timer(_currentReconnectDelay, _handleReconnect);
|
||||
}
|
||||
}
|
|
@ -64,6 +64,7 @@ class Http2TransportStream extends GrpcTransportStream {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO(sigurdm): Fold this class into Http2ClientConnection
|
||||
class Http2Transport extends Transport {
|
||||
static final _methodPost = new Header.ascii(':method', 'POST');
|
||||
static final _schemeHttp = new Header.ascii(':scheme', 'http');
|
||||
|
|
|
@ -17,16 +17,19 @@ import 'dart:async';
|
|||
import 'dart:html';
|
||||
import 'dart:typed_data';
|
||||
|
||||
import 'package:grpc/src/shared/status.dart';
|
||||
import 'package:grpc/src/client/call.dart';
|
||||
import 'package:meta/meta.dart';
|
||||
|
||||
import '../../shared/message.dart';
|
||||
import '../../shared/status.dart';
|
||||
import '../connection.dart';
|
||||
import 'transport.dart';
|
||||
import 'web_streams.dart';
|
||||
|
||||
class XhrTransportStream implements GrpcTransportStream {
|
||||
final HttpRequest _request;
|
||||
final ErrorHandler _onError;
|
||||
final Function(XhrTransportStream stream) _onDone;
|
||||
int _requestBytesRead = 0;
|
||||
final StreamController<ByteBuffer> _incomingProcessor = StreamController();
|
||||
final StreamController<GrpcMessage> _incomingMessages = StreamController();
|
||||
|
@ -38,7 +41,9 @@ class XhrTransportStream implements GrpcTransportStream {
|
|||
@override
|
||||
StreamSink<List<int>> get outgoingMessages => _outgoingMessages.sink;
|
||||
|
||||
XhrTransportStream(this._request, this._onError) {
|
||||
XhrTransportStream(this._request, {onError, onDone})
|
||||
: _onError = onError,
|
||||
_onDone = onDone {
|
||||
_outgoingMessages.stream
|
||||
.map(frame)
|
||||
.listen((data) => _request.send(data), cancelOnError: true);
|
||||
|
@ -120,6 +125,7 @@ class XhrTransportStream implements GrpcTransportStream {
|
|||
_close() {
|
||||
_incomingProcessor.close();
|
||||
_outgoingMessages.close();
|
||||
_onDone(this);
|
||||
}
|
||||
|
||||
@override
|
||||
|
@ -129,23 +135,16 @@ class XhrTransportStream implements GrpcTransportStream {
|
|||
}
|
||||
}
|
||||
|
||||
class XhrTransport extends Transport {
|
||||
class XhrClientConnection extends ClientConnection {
|
||||
final Uri uri;
|
||||
|
||||
HttpRequest _request;
|
||||
final Set<XhrTransportStream> _requests = Set<XhrTransportStream>();
|
||||
|
||||
XhrTransport(this.uri);
|
||||
XhrClientConnection(this.uri);
|
||||
|
||||
String get authority => uri.authority;
|
||||
|
||||
@override
|
||||
Future<void> connect() async {}
|
||||
|
||||
@override
|
||||
Future<void> finish() async {}
|
||||
|
||||
@visibleForTesting
|
||||
void initializeRequest(HttpRequest request, Map<String, String> metadata) {
|
||||
void _initializeRequest(HttpRequest request, Map<String, String> metadata) {
|
||||
for (final header in metadata.keys) {
|
||||
request.setRequestHeader(header, metadata[header]);
|
||||
}
|
||||
|
@ -157,17 +156,40 @@ class XhrTransport extends Transport {
|
|||
request.responseType = 'text';
|
||||
}
|
||||
|
||||
@visibleForTesting
|
||||
HttpRequest createHttpRequest() => HttpRequest();
|
||||
|
||||
@override
|
||||
GrpcTransportStream makeRequest(String path, Duration timeout,
|
||||
Map<String, String> metadata, ErrorHandler onError) {
|
||||
_request = HttpRequest();
|
||||
_request.open('POST', uri.resolve(path).toString());
|
||||
final HttpRequest request = createHttpRequest();
|
||||
request.open('POST', uri.resolve(path).toString());
|
||||
|
||||
initializeRequest(_request, metadata);
|
||||
_initializeRequest(request, metadata);
|
||||
|
||||
return XhrTransportStream(_request, onError);
|
||||
final XhrTransportStream transportStream =
|
||||
XhrTransportStream(request, onError: onError, onDone: _removeStream);
|
||||
_requests.add(transportStream);
|
||||
return transportStream;
|
||||
}
|
||||
|
||||
void _removeStream(XhrTransportStream stream) {
|
||||
_requests.remove(stream);
|
||||
}
|
||||
|
||||
@override
|
||||
Future<void> terminate() async {}
|
||||
Future<void> terminate() async {
|
||||
for (XhrTransportStream request in _requests) {
|
||||
request.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
@override
|
||||
void dispatchCall(ClientCall call) {
|
||||
call.onConnectionReady(this);
|
||||
}
|
||||
|
||||
@override
|
||||
Future<void> shutdown() async {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@ import 'dart:async';
|
|||
import 'channel.dart';
|
||||
import 'connection.dart';
|
||||
import 'options.dart';
|
||||
import 'transport/transport.dart';
|
||||
import 'transport/xhr_transport.dart';
|
||||
|
||||
/// A channel to a grpc-web endpoint.
|
||||
|
@ -29,14 +28,8 @@ class GrpcWebClientChannel extends ClientChannelBase {
|
|||
GrpcWebClientChannel.xhr(this.uri, {this.options: const ChannelOptions()})
|
||||
: super();
|
||||
|
||||
Future<Transport> _connectXhrTransport() async {
|
||||
final result = XhrTransport(uri);
|
||||
await result.connect();
|
||||
return result;
|
||||
}
|
||||
|
||||
@override
|
||||
ClientConnection createConnection() {
|
||||
return ClientConnection(options, _connectXhrTransport);
|
||||
return XhrClientConnection(uri);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,41 +18,41 @@ import 'dart:async';
|
|||
|
||||
import 'dart:html';
|
||||
|
||||
import 'package:grpc/grpc_web.dart';
|
||||
import 'package:grpc/src/client/transport/xhr_transport.dart';
|
||||
import 'package:grpc/src/shared/message.dart';
|
||||
import 'package:mockito/mockito.dart';
|
||||
|
||||
import 'package:test/test.dart';
|
||||
|
||||
class MockHttpRequest extends Mock implements HttpRequest {}
|
||||
|
||||
class MockXhrTransport extends XhrTransport {
|
||||
final StreamController<Event> readyStateChangeStream =
|
||||
StreamController<Event>();
|
||||
final StreamController<ProgressEvent> progressStream =
|
||||
StreamController<ProgressEvent>();
|
||||
|
||||
MockHttpRequest mockRequest;
|
||||
|
||||
MockXhrTransport(this.mockRequest) : super(Uri.parse('test:8080'));
|
||||
class MockHttpRequest extends Mock implements HttpRequest {
|
||||
// ignore: close_sinks
|
||||
StreamController<Event> readyStateChangeController = StreamController<Event>();
|
||||
// ignore: close_sinks
|
||||
StreamController<ProgressEvent> progressController = StreamController<ProgressEvent>();
|
||||
|
||||
@override
|
||||
GrpcTransportStream makeRequest(String path, Duration timeout,
|
||||
Map<String, String> metadata, ErrorHandler onError) {
|
||||
when(mockRequest.onReadyStateChange)
|
||||
.thenAnswer((_) => readyStateChangeStream.stream);
|
||||
when(mockRequest.onProgress).thenAnswer((_) => progressStream.stream);
|
||||
|
||||
initializeRequest(mockRequest, metadata);
|
||||
|
||||
return XhrTransportStream(mockRequest, onError);
|
||||
}
|
||||
Stream<Event> get onReadyStateChange => readyStateChangeController.stream;
|
||||
|
||||
@override
|
||||
Future<void> terminate() async {
|
||||
readyStateChangeStream.close();
|
||||
progressStream.close();
|
||||
Stream<ProgressEvent> get onProgress => progressController.stream;
|
||||
|
||||
@override
|
||||
Stream<ProgressEvent> get onError => StreamController<ProgressEvent>().stream;
|
||||
|
||||
@override
|
||||
int status = 200;
|
||||
}
|
||||
|
||||
class MockXhrClientConnection extends XhrClientConnection {
|
||||
MockXhrClientConnection() : super(Uri.parse('test:8080'));
|
||||
|
||||
MockHttpRequest latestRequest;
|
||||
|
||||
@override
|
||||
createHttpRequest() {
|
||||
final request = MockHttpRequest();
|
||||
latestRequest = request;
|
||||
return request;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -63,18 +63,17 @@ void main() {
|
|||
'parameter_2': 'value_2'
|
||||
};
|
||||
|
||||
final mockRequest = MockHttpRequest();
|
||||
final transport = MockXhrTransport(mockRequest);
|
||||
final connection = MockXhrClientConnection();
|
||||
|
||||
transport.makeRequest('path', Duration(seconds: 10), metadata,
|
||||
connection.makeRequest('path', Duration(seconds: 10), metadata,
|
||||
(error) => fail(error.toString()));
|
||||
|
||||
verify(mockRequest.setRequestHeader(
|
||||
verify(connection.latestRequest.setRequestHeader(
|
||||
'Content-Type', 'application/grpc-web+proto'));
|
||||
verify(mockRequest.setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1'));
|
||||
verify(mockRequest.setRequestHeader('X-Grpc-Web', '1'));
|
||||
verify(mockRequest.overrideMimeType('text/plain; charset=x-user-defined'));
|
||||
verify(mockRequest.responseType = 'text');
|
||||
verify(connection.latestRequest.setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1'));
|
||||
verify(connection.latestRequest.setRequestHeader('X-Grpc-Web', '1'));
|
||||
verify(connection.latestRequest.overrideMimeType('text/plain; charset=x-user-defined'));
|
||||
verify(connection.latestRequest.responseType = 'text');
|
||||
});
|
||||
|
||||
test('Sent data converted to stream properly', () async {
|
||||
|
@ -83,10 +82,9 @@ void main() {
|
|||
'parameter_2': 'value_2'
|
||||
};
|
||||
|
||||
final mockRequest = MockHttpRequest();
|
||||
final transport = MockXhrTransport(mockRequest);
|
||||
final connection = MockXhrClientConnection();
|
||||
|
||||
final stream = transport.makeRequest('path', Duration(seconds: 10),
|
||||
final stream = connection.makeRequest('path', Duration(seconds: 10),
|
||||
metadata, (error) => fail(error.toString()));
|
||||
|
||||
final data = List.filled(10, 0);
|
||||
|
@ -94,7 +92,7 @@ void main() {
|
|||
await stream.terminate();
|
||||
|
||||
final expectedData = frame(data);
|
||||
expect(verify(mockRequest.send(captureAny)).captured.single, expectedData);
|
||||
expect(verify(connection.latestRequest.send(captureAny)).captured.single, expectedData);
|
||||
});
|
||||
|
||||
test('Stream handles headers properly', () async {
|
||||
|
@ -103,8 +101,7 @@ void main() {
|
|||
'parameter_2': 'value_2'
|
||||
};
|
||||
|
||||
final mockRequest = MockHttpRequest();
|
||||
final transport = MockXhrTransport(mockRequest);
|
||||
final transport = MockXhrClientConnection();
|
||||
|
||||
final stream = transport.makeRequest('test_path', Duration(seconds: 10),
|
||||
metadata, (error) => fail(error.toString()));
|
||||
|
@ -125,36 +122,27 @@ void main() {
|
|||
'parameter_2': 'value_2'
|
||||
};
|
||||
|
||||
final mockRequest = MockHttpRequest();
|
||||
final transport = MockXhrTransport(mockRequest);
|
||||
final connection = MockXhrClientConnection();
|
||||
|
||||
final stream = transport.makeRequest('test_path', Duration(seconds: 10),
|
||||
final stream = connection.makeRequest('test_path', Duration(seconds: 10),
|
||||
metadata, (error) => fail(error.toString()));
|
||||
final data = List<int>.filled(10, 224);
|
||||
final encoded = frame(data);
|
||||
final encodedString = String.fromCharCodes(encoded);
|
||||
|
||||
bool dataVerified = false;
|
||||
stream.incomingMessages.listen((message) {
|
||||
stream.incomingMessages.listen(expectAsync1((message) {
|
||||
if (message is GrpcData) {
|
||||
dataVerified = true;
|
||||
expect(message.data, equals(data));
|
||||
}
|
||||
});
|
||||
}, count: 2));
|
||||
|
||||
when(mockRequest.getResponseHeader('Content-Type'))
|
||||
when(connection.latestRequest.getResponseHeader('Content-Type'))
|
||||
.thenReturn('application/grpc+proto');
|
||||
when(mockRequest.responseHeaders).thenReturn(metadata);
|
||||
when(mockRequest.readyState).thenReturn(HttpRequest.HEADERS_RECEIVED);
|
||||
when(mockRequest.response).thenReturn(encodedString);
|
||||
transport.readyStateChangeStream.add(null);
|
||||
transport.progressStream.add(null);
|
||||
|
||||
// Wait for all streams to process
|
||||
await Future.sync(() {});
|
||||
|
||||
await stream.terminate();
|
||||
expect(dataVerified, true);
|
||||
when(connection.latestRequest.responseHeaders).thenReturn(metadata);
|
||||
when(connection.latestRequest.readyState).thenReturn(HttpRequest.HEADERS_RECEIVED);
|
||||
when(connection.latestRequest.response).thenReturn(encodedString);
|
||||
connection.latestRequest.readyStateChangeController.add(null);
|
||||
connection.latestRequest.progressController.add(null);
|
||||
});
|
||||
|
||||
test('Stream recieves multiple messages', () async {
|
||||
|
@ -163,10 +151,9 @@ void main() {
|
|||
'parameter_2': 'value_2'
|
||||
};
|
||||
|
||||
final mockRequest = MockHttpRequest();
|
||||
final transport = MockXhrTransport(mockRequest);
|
||||
final connection = MockXhrClientConnection();
|
||||
|
||||
final stream = transport.makeRequest('test_path', Duration(seconds: 10),
|
||||
final stream = connection.makeRequest('test_path', Duration(seconds: 10),
|
||||
metadata, (error) => fail(error.toString()));
|
||||
|
||||
final data = <List<int>>[
|
||||
|
@ -175,42 +162,35 @@ void main() {
|
|||
];
|
||||
final encoded = data.map((d) => frame(d));
|
||||
final encodedStrings = encoded.map((e) => String.fromCharCodes(e)).toList();
|
||||
// to start - expected response is the first message
|
||||
var expectedResponse = encodedStrings[0];
|
||||
|
||||
final expectedMessages = <GrpcMessage>[
|
||||
GrpcMetadata(metadata),
|
||||
GrpcData(data[0]),
|
||||
GrpcData(data[1])
|
||||
];
|
||||
stream.incomingMessages.listen((message) {
|
||||
final expectedMessage = expectedMessages.removeAt(0);
|
||||
int i = 0;
|
||||
stream.incomingMessages.listen(expectAsync1((message) {
|
||||
final expectedMessage = expectedMessages[i];
|
||||
i++;
|
||||
expect(message.runtimeType, expectedMessage.runtimeType);
|
||||
if (message is GrpcMetadata) {
|
||||
expect(message.metadata, (expectedMessage as GrpcMetadata).metadata);
|
||||
} else if (message is GrpcData) {
|
||||
expect(message.data, (expectedMessage as GrpcData).data);
|
||||
}
|
||||
});
|
||||
}, count: expectedMessages.length));
|
||||
|
||||
when(mockRequest.getResponseHeader('Content-Type'))
|
||||
when(connection.latestRequest.getResponseHeader('Content-Type'))
|
||||
.thenReturn('application/grpc+proto');
|
||||
when(mockRequest.responseHeaders).thenReturn(metadata);
|
||||
when(mockRequest.readyState).thenReturn(HttpRequest.HEADERS_RECEIVED);
|
||||
when(mockRequest.response).thenAnswer((_) => expectedResponse);
|
||||
transport.readyStateChangeStream.add(null);
|
||||
transport.progressStream.add(null);
|
||||
// Wait for all streams to process
|
||||
await Future.sync(() {});
|
||||
when(connection.latestRequest.responseHeaders).thenReturn(metadata);
|
||||
when(connection.latestRequest.readyState).thenReturn(HttpRequest.HEADERS_RECEIVED);
|
||||
// At first - expected response is the first message
|
||||
when(connection.latestRequest.response).thenAnswer((_) => encodedStrings[0]);
|
||||
connection.latestRequest.readyStateChangeController.add(null);
|
||||
connection.latestRequest.progressController.add(null);
|
||||
|
||||
// After the first call, expected response should now be both responses together
|
||||
expectedResponse = encodedStrings[0] + encodedStrings[1];
|
||||
transport.progressStream.add(null);
|
||||
|
||||
// Wait for all streams to process
|
||||
await Future.sync(() {});
|
||||
|
||||
await stream.terminate();
|
||||
expect(expectedMessages.isEmpty, isTrue);
|
||||
when(connection.latestRequest.response).thenAnswer((_) => encodedStrings[0] + encodedStrings[1]);
|
||||
connection.latestRequest.progressController.add(null);
|
||||
});
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import 'package:grpc/src/client/channel.dart';
|
|||
import 'package:grpc/src/client/client.dart';
|
||||
import 'package:grpc/src/client/common.dart';
|
||||
import 'package:grpc/src/client/connection.dart';
|
||||
import 'package:grpc/src/client/http2_connection.dart';
|
||||
import 'package:grpc/src/client/method.dart';
|
||||
import 'package:grpc/src/client/options.dart';
|
||||
import 'package:grpc/src/shared/message.dart';
|
||||
|
@ -44,7 +45,7 @@ class MockTransport extends Mock implements Transport {}
|
|||
|
||||
class MockStream extends Mock implements GrpcTransportStream {}
|
||||
|
||||
class FakeConnection extends ClientConnection {
|
||||
class FakeConnection extends Http2ClientConnection {
|
||||
var connectionError;
|
||||
|
||||
FakeConnection._(String host, Transport transport, ChannelOptions options,
|
||||
|
@ -70,13 +71,13 @@ class FakeChannelOptions implements ChannelOptions {
|
|||
}
|
||||
|
||||
class FakeChannel extends ClientChannelBase {
|
||||
final ClientConnection connection;
|
||||
final Http2ClientConnection connection;
|
||||
final FakeChannelOptions options;
|
||||
|
||||
FakeChannel(String host, this.connection, this.options);
|
||||
|
||||
@override
|
||||
ClientConnection createConnection() => connection;
|
||||
Http2ClientConnection createConnection() => connection;
|
||||
}
|
||||
|
||||
class TestClient extends Client {
|
||||
|
|
Loading…
Reference in New Issue