This commit is contained in:
Steve Browne 2025-04-15 17:10:08 -03:00 committed by GitHub
commit 5e4b0a7539
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 887 additions and 2 deletions

View File

@ -0,0 +1,453 @@
// Copyright (c) 2022, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import 'dart:convert';
import 'dart:html';
import 'dart:js_util' as js_util;
import 'dart:typed_data';
import 'package:async/async.dart';
import 'package:js/js.dart';
import 'package:meta/meta.dart';
import '../../client/call.dart';
import '../../shared/message.dart';
import '../../shared/status.dart';
import '../connection.dart';
import 'cors.dart' as cors;
import 'transport.dart';
import 'web_streams.dart';
const _contentTypeKey = 'Content-Type';
@JS()
class AbortSignal {
external factory AbortSignal();
external bool get aborted;
}
@JS()
class AbortController {
external factory AbortController();
external void abort([dynamic reason]);
external AbortSignal get signal;
}
@anonymous
// ignore: missing_js_lib_annotation
@JS()
class RequestInit {
external factory RequestInit(
{required String method,
Object? headers,
List<int>? body,
AbortSignal? signal,
required String referrerPolicy,
required String mode,
required String credentials,
required String cache,
required String redirect,
required String integrity,
required bool keepalive});
external String get method;
external set method(String newValue);
external Object? get headers;
external set headers(Object? newValue);
external Uint8List? get body;
external set body(Uint8List? newValue);
external AbortSignal? get signal;
external set signal(AbortSignal? newValue);
external String get referrerPolicy;
external set referrerPolicy(String newValue);
external String get mode;
external set mode(String newValue);
external String get credentials;
external set credentials(String newValue);
external String get cache;
external set cache(String newValue);
external String get redirect;
external set redirect(String newValue);
external String get integrity;
external set integrity(String newValue);
external bool get keepalive;
external set keepalive(bool newValue);
}
/// Implementation of Fetch API simulating @HttpRequest for minimal changes
class FetchHttpRequest {
// Request parameters
var method = 'GET';
var uri = '';
var referrerPolicy = 'origin';
var mode = 'cors';
var credentials = 'omit';
var cache = 'default';
var redirect = 'follow';
var integrity = '';
var keepAlive = true;
var headers = <String, String>{};
var readyState = HttpRequest.UNSENT;
set withCredentials(bool value) => credentials = value ? 'include' : 'omit';
set responseType(String unused) {}
// Streams and controllers
final onReadyStateChangeController = StreamController<int>.broadcast();
Stream<int> get onReadyStateChange => onReadyStateChangeController.stream;
final onProgressController = StreamController<Uint8List>.broadcast();
Stream<Uint8List> get onProgress => onProgressController.stream;
final onErrorController = StreamController<int>.broadcast();
Stream<int> get onError => onErrorController.stream;
// Response information
AbortController? _abortController;
CancelableOperation<dynamic>? _cancelableSend;
dynamic _response;
Uint8List? _lastResponse;
String? get response => responseText;
int get status =>
_response != null ? js_util.getProperty(_response, 'status') : 0;
Map<String, String> get responseHeaders => _response != null
? toDartMap(js_util.getProperty(_response, 'headers'))
: <String, String>{};
String? get responseText => _lastResponse != null
? utf8.decode(_lastResponse!, allowMalformed: true)
: null;
dynamic get body =>
_response != null ? js_util.getProperty(_response, 'body') : null;
static Map<String, String> toDartMap(Headers obj) =>
Map.fromIterable(getObjectKeys(obj),
value: (key) => js_util.callMethod(obj, 'get', [key]).toString());
static List<String> getObjectKeys(Headers obj) {
final keys = js_util.callMethod(obj, 'keys', []);
// This used to work prior to flutter 3.0 now we type check to see if supported
if (keys is Iterable) {
return List<String>.from(keys);
}
// Otherwise we have to fall back and manually iterate through the javascript iterator
final res = List<String>.empty(growable: true);
while (true) {
final next = js_util.callMethod(keys, 'next', []);
if (js_util.getProperty(next, 'done')) {
break;
}
res.add(js_util.getProperty(next, 'value').toString());
}
return res;
}
Future send([List<int>? data]) async {
final doSend = _doSend(data);
_cancelableSend = CancelableOperation.fromFuture(doSend);
await doSend;
}
Future _doSend([List<int>? data]) async {
final wgs = WorkerGlobalScope.instance;
_setReadyState(HttpRequest.LOADING);
_abortController = AbortController();
final init = RequestInit(
cache: cache,
credentials: credentials,
integrity: integrity,
keepalive: keepAlive,
method: method,
mode: mode,
redirect: redirect,
referrerPolicy: referrerPolicy,
signal: _abortController?.signal,
body: data,
headers: js_util.jsify(headers));
_response = await js_util
.promiseToFuture(js_util.callMethod(wgs, 'fetch', [uri, init]))
.onError((error, stackTrace) => null,
test: (error) => _abortController?.signal.aborted ?? false);
if (_response == null || (_cancelableSend?.isCanceled ?? false)) {
return;
}
_setReadyState(HttpRequest.HEADERS_RECEIVED);
if (status < 200 || status >= 300) {
onErrorController.add(status);
}
final stream = body;
final reader =
stream != null ? js_util.callMethod(stream, 'getReader', []) : null;
if (reader == null) {
onErrorController.add(0);
return;
}
while (true) {
final result = await js_util
.promiseToFuture(js_util.callMethod(reader, 'read', []))
.onError((error, stackTrace) => null,
test: (error) => _abortController?.signal.aborted ?? false);
if (result == null || (_cancelableSend?.isCanceled ?? false)) {
return;
}
final value = js_util.getProperty(result, 'value');
if (value != null) {
_lastResponse = value;
onProgressController.add(value as Uint8List);
}
if (js_util.getProperty(result, 'done')) {
_lastResponse ??= Uint8List(0);
_setReadyState(HttpRequest.DONE);
break;
}
}
}
void _setReadyState(int state) {
readyState = state;
onReadyStateChangeController.add(state);
if (state == HttpRequest.DONE) {}
}
void open(String method, String uri) {
this.method = method;
this.uri = uri;
_setReadyState(HttpRequest.OPENED);
}
void abort() async {
_abortController?.abort();
await _cancelableSend?.cancel();
close();
}
void close() {
onReadyStateChangeController.close();
onProgressController.close();
onErrorController.close();
_response = null;
}
void setRequestHeader(String name, String value) {
headers[name] = value;
}
void overrideMimeType(String mimeType) {}
}
class FetchTransportStream implements GrpcTransportStream {
final FetchHttpRequest _request;
final ErrorHandler _onError;
final Function(FetchTransportStream stream) _onDone;
bool _headersReceived = false;
final StreamController<ByteBuffer> _incomingProcessor = StreamController();
final StreamController<GrpcMessage> _incomingMessages = StreamController();
final StreamController<List<int>> _outgoingMessages = StreamController();
@override
Stream<GrpcMessage> get incomingMessages => _incomingMessages.stream;
@override
StreamSink<List<int>> get outgoingMessages => _outgoingMessages.sink;
FetchTransportStream(this._request,
{required ErrorHandler onError, required onDone})
: _onError = onError,
_onDone = onDone {
_outgoingMessages.stream
.map(frame)
.listen((data) => _request.send(data), cancelOnError: true);
_request.onReadyStateChange.listen((data) {
if (_incomingProcessor.isClosed) {
return;
}
switch (_request.readyState) {
case HttpRequest.HEADERS_RECEIVED:
_onHeadersReceived();
break;
case HttpRequest.DONE:
_onRequestDone();
_close();
break;
}
});
_request.onError.listen((_) {
if (_incomingProcessor.isClosed) {
return;
}
_onError(GrpcError.unavailable('FetchTransportStream connection-error'),
StackTrace.current);
terminate();
});
_request.onProgress.listen((bytes) {
if (_incomingProcessor.isClosed) {
return;
}
_incomingProcessor.add(bytes.buffer);
});
_incomingProcessor.stream
.transform(GrpcWebDecoder())
.transform(grpcDecompressor())
.listen(_incomingMessages.add,
onError: _onError, onDone: _incomingMessages.close);
}
bool _validateResponseState() {
try {
validateHttpStatusAndContentType(
_request.status, _request.responseHeaders,
rawResponse: _request.responseText);
return true;
} catch (e, st) {
_onError(e, st);
return false;
}
}
void _onHeadersReceived() {
_headersReceived = true;
if (!_validateResponseState()) {
return;
}
_incomingMessages.add(GrpcMetadata(_request.responseHeaders));
}
void _onRequestDone() {
if (!_headersReceived && !_validateResponseState()) {
return;
}
if (_request.response == null) {
_onError(
GrpcError.unavailable('FetchTransportStream request null response',
null, _request.responseText),
StackTrace.current);
return;
}
}
void _close() {
_incomingProcessor.close();
_outgoingMessages.close();
_onDone(this);
}
@override
Future<void> terminate() async {
_close();
_request.abort();
}
}
class FetchClientConnection extends ClientConnection {
final Uri uri;
final _requests = <FetchTransportStream>{};
FetchClientConnection(this.uri);
@override
String get authority => uri.authority;
@override
String get scheme => uri.scheme;
void _initializeRequest(
FetchHttpRequest request, Map<String, String> metadata) {
for (final header in metadata.keys) {
request.setRequestHeader(header, metadata[header]!);
}
// Overriding the mimetype allows us to stream and parse the data
request.overrideMimeType('text/plain; charset=x-user-defined');
request.responseType = 'text';
}
@visibleForTesting
FetchHttpRequest createRequest() => FetchHttpRequest();
@override
GrpcTransportStream makeRequest(String path, Duration? timeout,
Map<String, String> metadata, ErrorHandler onError,
{CallOptions? callOptions}) {
// gRPC-web headers.
if (_getContentTypeHeader(metadata) == null) {
metadata['Content-Type'] = 'application/grpc-web+proto';
metadata['X-User-Agent'] = 'grpc-web-dart/0.1';
metadata['X-Grpc-Web'] = '1';
}
var requestUri = uri.resolve(path);
if (callOptions is WebCallOptions &&
callOptions.bypassCorsPreflight == true) {
requestUri = cors.moveHttpHeadersToQueryParam(metadata, requestUri);
}
final request = createRequest();
request.open('POST', requestUri.toString());
if (callOptions is WebCallOptions && callOptions.withCredentials == true) {
request.withCredentials = true;
}
// Must set headers after calling open().
_initializeRequest(request, metadata);
final transportStream =
FetchTransportStream(request, onError: onError, onDone: _removeStream);
_requests.add(transportStream);
return transportStream;
}
void _removeStream(FetchTransportStream stream) {
_requests.remove(stream);
}
@override
Future<void> terminate() async {
for (var request in List.of(_requests)) {
request.terminate();
}
}
@override
void dispatchCall(ClientCall call) {
call.onConnectionReady(this);
}
@override
Future<void> shutdown() async {}
}
MapEntry<String, String>? _getContentTypeHeader(Map<String, String> metadata) {
for (var entry in metadata.entries) {
if (entry.key.toLowerCase() == _contentTypeKey.toLowerCase()) {
return entry;
}
}
return null;
}

View File

@ -15,7 +15,8 @@
import 'channel.dart';
import 'connection.dart';
import 'transport/xhr_transport.dart';
import 'transport/fetch_transport.dart';
//import 'transport/xhr_transport.dart';
/// A channel to a grpc-web endpoint.
class GrpcWebClientChannel extends ClientChannelBase {
@ -25,6 +26,7 @@ class GrpcWebClientChannel extends ClientChannelBase {
@override
ClientConnection createConnection() {
return XhrClientConnection(uri);
//return XhrClientConnection(uri);
return FetchClientConnection(uri);
}
}

View File

@ -0,0 +1,430 @@
// Copyright (c) 2017, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
@TestOn('browser')
import 'dart:async';
import 'dart:html';
import 'dart:typed_data';
import 'package:async/async.dart';
import 'package:grpc/src/client/call.dart';
import 'package:grpc/src/client/transport/fetch_transport.dart';
import 'package:grpc/src/shared/message.dart';
import 'package:grpc/src/shared/status.dart';
import 'package:mockito/mockito.dart';
import 'package:stream_transform/stream_transform.dart';
import 'package:test/test.dart';
class MockFetchRequest extends Mock implements FetchHttpRequest {
MockFetchRequest({int? code}) : status = code ?? 200;
// ignore: close_sinks
final readyStateChangeController = StreamController<int>();
// ignore: close_sinks
final progressController = StreamController<Uint8List>();
// ignore: close_sinks
final errorController = StreamController<int>();
@override
Stream<int> get onReadyStateChange => readyStateChangeController.stream;
@override
Stream<Uint8List> get onProgress => progressController.stream;
@override
Stream<int> get onError => errorController.stream;
@override
final int status;
@override
int get readyState =>
super.noSuchMethod(Invocation.getter(#readyState), returnValue: -1);
@override
String? get response =>
super.noSuchMethod(Invocation.getter(#response), returnValue: null);
@override
Map<String, String> get responseHeaders =>
super.noSuchMethod(Invocation.getter(#responseHeaders),
returnValue: <String, String>{});
}
class MockFetchClientConnection extends FetchClientConnection {
MockFetchClientConnection({int? code})
: _statusCode = code ?? 200,
super(Uri.parse('test:8080'));
late MockFetchRequest latestRequest;
final int _statusCode;
@override
FetchHttpRequest createRequest() {
final request = MockFetchRequest(code: _statusCode);
latestRequest = request;
return request;
}
}
void main() {
test('Make request sends correct headers', () async {
final metadata = <String, String>{
'parameter_1': 'value_1',
'parameter_2': 'value_2'
};
final connection = MockFetchClientConnection();
connection.makeRequest('path', Duration(seconds: 10), metadata,
(error, _) => fail(error.toString()));
verify(connection.latestRequest
.setRequestHeader('Content-Type', 'application/grpc-web+proto'));
verify(connection.latestRequest
.setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1'));
verify(connection.latestRequest.setRequestHeader('X-Grpc-Web', '1'));
verify(connection.latestRequest
.overrideMimeType('text/plain; charset=x-user-defined'));
verify(connection.latestRequest.responseType = 'text');
});
test(
'Make request sends correct headers and path if bypassCorsPreflight=true',
() async {
final metadata = {'header_1': 'value_1', 'header_2': 'value_2'};
final connection = MockFetchClientConnection();
connection.makeRequest('path', Duration(seconds: 10), metadata,
(error, _) => fail(error.toString()),
callOptions: WebCallOptions(bypassCorsPreflight: true));
expect(metadata, isEmpty);
verify(connection.latestRequest.open('POST',
'test:path?%24httpHeaders=header_1%3Avalue_1%0D%0Aheader_2%3Avalue_2%0D%0AContent-Type%3Aapplication%2Fgrpc-web%2Bproto%0D%0AX-User-Agent%3Agrpc-web-dart%2F0.1%0D%0AX-Grpc-Web%3A1%0D%0A'));
verify(connection.latestRequest
.overrideMimeType('text/plain; charset=x-user-defined'));
verify(connection.latestRequest.responseType = 'text');
});
test(
'Make request sends correct headers if call options already have '
'Content-Type header', () async {
final metadata = {
'header_1': 'value_1',
'header_2': 'value_2',
'Content-Type': 'application/json+protobuf'
};
final connection = MockFetchClientConnection();
connection.makeRequest('/path', Duration(seconds: 10), metadata,
(error, _) => fail(error.toString()));
expect(metadata, {
'header_1': 'value_1',
'header_2': 'value_2',
'Content-Type': 'application/json+protobuf',
});
});
test('Content-Type header case insensitivity', () async {
final metadata = {
'header_1': 'value_1',
'CONTENT-TYPE': 'application/json+protobuf'
};
final connection = MockFetchClientConnection();
connection.makeRequest('/path', Duration(seconds: 10), metadata,
(error, _) => fail(error.toString()));
expect(metadata, {
'header_1': 'value_1',
'CONTENT-TYPE': 'application/json+protobuf',
});
final lowerMetadata = {
'header_1': 'value_1',
'content-type': 'application/json+protobuf'
};
connection.makeRequest('/path', Duration(seconds: 10), lowerMetadata,
(error, _) => fail(error.toString()));
expect(lowerMetadata, {
'header_1': 'value_1',
'content-type': 'application/json+protobuf',
});
});
test('Make request sends correct headers path if only withCredentials=true',
() async {
final metadata = {'header_1': 'value_1', 'header_2': 'value_2'};
final connection = MockFetchClientConnection();
connection.makeRequest('path', Duration(seconds: 10), metadata,
(error, _) => fail(error.toString()),
callOptions: WebCallOptions(withCredentials: true));
expect(metadata, {
'header_1': 'value_1',
'header_2': 'value_2',
'Content-Type': 'application/grpc-web+proto',
'X-User-Agent': 'grpc-web-dart/0.1',
'X-Grpc-Web': '1'
});
verify(connection.latestRequest
.setRequestHeader('Content-Type', 'application/grpc-web+proto'));
verify(connection.latestRequest
.setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1'));
verify(connection.latestRequest.setRequestHeader('X-Grpc-Web', '1'));
verify(connection.latestRequest.open('POST', 'test:path'));
verify(connection.latestRequest.withCredentials = true);
verify(connection.latestRequest
.overrideMimeType('text/plain; charset=x-user-defined'));
verify(connection.latestRequest.responseType = 'text');
});
test('Sent data converted to stream properly', () async {
final metadata = <String, String>{
'parameter_1': 'value_1',
'parameter_2': 'value_2'
};
final connection = MockFetchClientConnection();
final stream = connection.makeRequest('path', Duration(seconds: 10),
metadata, (error, _) => fail(error.toString()));
final data = List.filled(10, 0);
stream.outgoingMessages.add(data);
await stream.terminate();
final expectedData = frame(data);
expect(verify(connection.latestRequest.send(captureAny)).captured.single,
expectedData);
});
test('Stream handles headers properly', () async {
final responseHeaders = {
'parameter_1': 'value_1',
'parameter_2': 'value_2',
'content-type': 'application/grpc+proto',
};
final transport = MockFetchClientConnection();
final stream = transport.makeRequest('test_path', Duration(seconds: 10), {},
(error, _) => fail(error.toString()));
when(transport.latestRequest.responseHeaders).thenReturn(responseHeaders);
when(transport.latestRequest.response)
.thenReturn(String.fromCharCodes(frame(<int>[])));
// Set expectation for request readyState and generate two readyStateChange
// events, so that incomingMessages stream completes.
final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE];
when(transport.latestRequest.readyState)
.thenAnswer((_) => readyStates.removeAt(0));
transport.latestRequest.readyStateChangeController.add(readyStates.first);
transport.latestRequest.readyStateChangeController.add(readyStates.first);
// Should be only one metadata message with headers augmented with :status
// field.
final message = await stream.incomingMessages.single as GrpcMetadata;
expect(message.metadata, responseHeaders);
});
test('Stream handles trailers properly', () async {
final requestHeaders = {
'parameter_1': 'value_1',
'content-type': 'application/grpc+proto',
};
final responseTrailers = <String, String>{
'trailer_1': 'value_1',
'trailer_2': 'value_2',
};
final connection = MockFetchClientConnection();
final stream = connection.makeRequest('test_path', Duration(seconds: 10),
requestHeaders, (error, _) => fail(error.toString()));
final encodedTrailers = frame(responseTrailers.entries
.map((e) => '${e.key}:${e.value}')
.join('\r\n')
.codeUnits);
encodedTrailers[0] = 0x80; // Mark this frame as trailers.
final encodedString = String.fromCharCodes(encodedTrailers);
when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders);
when(connection.latestRequest.response).thenReturn(encodedString);
// Set expectation for request readyState and generate events so that
// incomingMessages stream completes.
final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE];
when(connection.latestRequest.readyState)
.thenAnswer((_) => readyStates.removeAt(0));
connection.latestRequest.readyStateChangeController.add(readyStates.first);
connection.latestRequest.progressController
.add(Uint8List.fromList(encodedTrailers));
connection.latestRequest.readyStateChangeController.add(readyStates.first);
// Should be two metadata messages: headers and trailers.
final messages =
await stream.incomingMessages.whereType<GrpcMetadata>().toList();
expect(messages.length, 2);
expect(messages.first.metadata, requestHeaders);
expect(messages.last.metadata, responseTrailers);
});
test('Stream handles empty trailers properly', () async {
final requestHeaders = {
'content-type': 'application/grpc+proto',
};
final connection = MockFetchClientConnection();
final stream = connection.makeRequest('test_path', Duration(seconds: 10),
{}, (error, _) => fail(error.toString()));
final encoded = frame(''.codeUnits);
encoded[0] = 0x80; // Mark this frame as trailers.
final encodedString = String.fromCharCodes(encoded);
when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders);
when(connection.latestRequest.response).thenReturn(encodedString);
// Set expectation for request readyState and generate events so that
// incomingMessages stream completes.
final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE];
when(connection.latestRequest.readyState)
.thenAnswer((_) => readyStates.removeAt(0));
connection.latestRequest.readyStateChangeController.add(readyStates.first);
connection.latestRequest.progressController
.add(Uint8List.fromList(encoded));
connection.latestRequest.readyStateChangeController.add(readyStates.first);
// Should be two metadata messages: headers and trailers.
final messages =
await stream.incomingMessages.whereType<GrpcMetadata>().toList();
expect(messages.length, 2);
expect(messages.first.metadata, requestHeaders);
expect(messages.last.metadata, isEmpty);
});
test('Stream deserializes data properly', () async {
final requestHeaders = <String, String>{
'parameter_1': 'value_1',
'parameter_2': 'value_2',
'content-type': 'application/grpc+proto',
};
final connection = MockFetchClientConnection();
final stream = connection.makeRequest('test_path', Duration(seconds: 10),
requestHeaders, (error, _) => fail(error.toString()));
final data = List<int>.filled(10, 224);
when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders);
when(connection.latestRequest.response)
.thenReturn(String.fromCharCodes(frame(data)));
// Set expectation for request readyState and generate events, so that
// incomingMessages stream completes.
final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE];
when(connection.latestRequest.readyState)
.thenAnswer((_) => readyStates.removeAt(0));
connection.latestRequest.readyStateChangeController.add(readyStates.first);
connection.latestRequest.progressController
.add(Uint8List.fromList(frame(data)));
connection.latestRequest.readyStateChangeController.add(readyStates.first);
// Expect a single data message.
final message = await stream.incomingMessages.whereType<GrpcData>().single;
expect(message.data, data);
});
test('GrpcError with error details in response', () async {
final connection = MockFetchClientConnection(code: 400);
final errors = <GrpcError>[];
// The incoming messages stream never completes when there's an error, so
// using completer.
final errorReceived = Completer<void>();
connection.makeRequest('test_path', Duration(seconds: 10), {}, (e, _) {
errorReceived.complete();
errors.add(e as GrpcError);
});
const errorDetails = 'error details';
when(connection.latestRequest.responseHeaders)
.thenReturn({'content-type': 'application/grpc+proto'});
when(connection.latestRequest.readyState).thenReturn(HttpRequest.DONE);
when(connection.latestRequest.responseText).thenReturn(errorDetails);
connection.latestRequest.readyStateChangeController.add(HttpRequest.DONE);
await errorReceived;
expect(errors.single.rawResponse, errorDetails);
});
test('Stream receives multiple messages', () async {
final metadata = <String, String>{
'parameter_1': 'value_1',
'parameter_2': 'value_2',
'content-type': 'application/grpc+proto',
};
final connection = MockFetchClientConnection();
final stream = connection.makeRequest('test_path', Duration(seconds: 10),
metadata, (error, _) => fail(error.toString()));
final data = <List<int>>[
List<int>.filled(10, 224),
List<int>.filled(5, 124)
];
final encodedStrings =
data.map((d) => String.fromCharCodes(frame(d))).toList();
when(connection.latestRequest.responseHeaders).thenReturn(metadata);
when(connection.latestRequest.readyState)
.thenReturn(HttpRequest.HEADERS_RECEIVED);
// At first invocation the response should be the the first message, after
// that first + last messages.
var first = true;
when(connection.latestRequest.response).thenAnswer((_) {
if (first) {
first = false;
return encodedStrings[0];
}
return encodedStrings[0] + encodedStrings[1];
});
final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE];
when(connection.latestRequest.readyState)
.thenAnswer((_) => readyStates.removeAt(0));
final queue = StreamQueue(stream.incomingMessages);
// Headers.
connection.latestRequest.readyStateChangeController.add(readyStates.first);
expect(((await queue.next) as GrpcMetadata).metadata, metadata);
// Data 1.
connection.latestRequest.progressController
.add(Uint8List.fromList(frame(data[0])));
expect(((await queue.next) as GrpcData).data, data[0]);
// Data 2.
connection.latestRequest.progressController
.add(Uint8List.fromList(frame(data[1])));
expect(((await queue.next) as GrpcData).data, data[1]);
// Done.
connection.latestRequest.readyStateChangeController.add(readyStates.first);
expect(await queue.hasNext, isFalse);
});
}