Revert "Support grpc-web in pure dart (#287)" (#351)

This reverts commit c513e1467f.

The original commit has broken streaming due to limitations of package:http.
This commit is contained in:
Vyacheslav Egorov 2020-09-17 19:59:15 +02:00 committed by GitHub
parent 6fa4616bac
commit 3414356950
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 165 additions and 142 deletions

View File

@ -14,9 +14,9 @@
// limitations under the License. // limitations under the License.
import 'dart:async'; import 'dart:async';
import 'dart:html';
import 'dart:typed_data'; import 'dart:typed_data';
import 'package:http/http.dart';
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';
import '../../client/call.dart'; import '../../client/call.dart';
@ -27,10 +27,10 @@ import 'transport.dart';
import 'web_streams.dart'; import 'web_streams.dart';
class XhrTransportStream implements GrpcTransportStream { class XhrTransportStream implements GrpcTransportStream {
final Client _client; final HttpRequest _request;
final Request _request;
final ErrorHandler _onError; final ErrorHandler _onError;
final Function(XhrTransportStream stream) _onDone; final Function(XhrTransportStream stream) _onDone;
int _requestBytesRead = 0;
final StreamController<ByteBuffer> _incomingProcessor = StreamController(); final StreamController<ByteBuffer> _incomingProcessor = StreamController();
final StreamController<GrpcMessage> _incomingMessages = StreamController(); final StreamController<GrpcMessage> _incomingMessages = StreamController();
final StreamController<List<int>> _outgoingMessages = StreamController(); final StreamController<List<int>> _outgoingMessages = StreamController();
@ -41,34 +41,53 @@ class XhrTransportStream implements GrpcTransportStream {
@override @override
StreamSink<List<int>> get outgoingMessages => _outgoingMessages.sink; StreamSink<List<int>> get outgoingMessages => _outgoingMessages.sink;
XhrTransportStream(this._client, this._request, {onError, onDone}) XhrTransportStream(this._request, {onError, onDone})
: _onError = onError, : _onError = onError,
_onDone = onDone { _onDone = onDone {
final asyncOnError = (e, st) { _outgoingMessages.stream
.map(frame)
.listen((data) => _request.send(data), cancelOnError: true);
_request.onReadyStateChange.listen((data) {
if (_incomingMessages.isClosed) {
return;
}
switch (_request.readyState) {
case HttpRequest.HEADERS_RECEIVED:
_onHeadersReceived();
break;
case HttpRequest.DONE:
if (_request.status != 200) {
_onError(GrpcError.unavailable(
'XhrConnection status ${_request.status}'));
} else {
_close();
}
break;
}
});
_request.onError.listen((ProgressEvent event) {
if (_incomingMessages.isClosed) { if (_incomingMessages.isClosed) {
return; return;
} }
_onError(GrpcError.unavailable('XhrConnection connection-error')); _onError(GrpcError.unavailable('XhrConnection connection-error'));
terminate(); terminate();
}; });
_outgoingMessages.stream.map(frame).listen((data) {
_request.bodyBytes = data; _request.onProgress.listen((_) {
var firstMessage = true;
_client.send(_request).then((response) {
if (_incomingMessages.isClosed) { if (_incomingMessages.isClosed) {
return; return;
} }
if (firstMessage) { // Use response over responseText as most browsers don't support
if (!_onHeadersReceived(response)) { // using responseText during an onProgress event.
return; final responseString = _request.response as String;
} final bytes = Uint8List.fromList(
} responseString.substring(_requestBytesRead).codeUnits)
firstMessage = false; .buffer;
response.stream.listen((data) { _requestBytesRead = responseString.length;
_incomingProcessor.add(Uint8List.fromList(data).buffer); _incomingProcessor.add(bytes);
}, onDone: _close); });
}).catchError(asyncOnError);
}, cancelOnError: true, onError: asyncOnError);
_incomingProcessor.stream _incomingProcessor.stream
.transform(GrpcWebDecoder()) .transform(GrpcWebDecoder())
@ -77,27 +96,30 @@ class XhrTransportStream implements GrpcTransportStream {
onError: _onError, onDone: _incomingMessages.close); onError: _onError, onDone: _incomingMessages.close);
} }
bool _onHeadersReceived(StreamedResponse response) { _onHeadersReceived() {
final contentType = response.headers['content-type']; final contentType = _request.getResponseHeader('Content-Type');
if (response.statusCode != 200) { if (_request.status != 200) {
_onError( _onError(
GrpcError.unavailable('XhrConnection status ${response.statusCode}')); GrpcError.unavailable('XhrConnection status ${_request.status}'));
return false; return;
} }
if (contentType == null) { if (contentType == null) {
_onError(GrpcError.unavailable('XhrConnection missing Content-Type')); _onError(GrpcError.unavailable('XhrConnection missing Content-Type'));
return false; return;
} }
if (!contentType.startsWith('application/grpc')) { if (!contentType.startsWith('application/grpc')) {
_onError( _onError(
GrpcError.unavailable('XhrConnection bad Content-Type $contentType')); GrpcError.unavailable('XhrConnection bad Content-Type $contentType'));
return false; return;
}
if (_request.response == null) {
_onError(GrpcError.unavailable('XhrConnection request null response'));
return;
} }
// Force a metadata message with headers. // Force a metadata message with headers.
final headers = GrpcMetadata(response.headers); final headers = GrpcMetadata(_request.responseHeaders);
_incomingMessages.add(headers); _incomingMessages.add(headers);
return true;
} }
_close() { _close() {
@ -109,47 +131,45 @@ class XhrTransportStream implements GrpcTransportStream {
@override @override
Future<void> terminate() async { Future<void> terminate() async {
_close(); _close();
_request.abort();
} }
} }
class XhrClientConnection extends ClientConnection { class XhrClientConnection extends ClientConnection {
final Uri uri; final Uri uri;
Client _client;
final Set<XhrTransportStream> _requests = Set<XhrTransportStream>(); final Set<XhrTransportStream> _requests = Set<XhrTransportStream>();
XhrClientConnection(this.uri) { XhrClientConnection(this.uri);
_client = createClient();
}
String get authority => uri.authority; String get authority => uri.authority;
String get scheme => uri.scheme; String get scheme => uri.scheme;
void _initializeRequest(Request request, Map<String, String> metadata) { void _initializeRequest(HttpRequest request, Map<String, String> metadata) {
for (final header in metadata.keys) { for (final header in metadata.keys) {
request.headers[header] = metadata[header]; request.setRequestHeader(header, metadata[header]);
} }
request.headers['Content-Type'] = 'application/grpc-web+proto'; request.setRequestHeader('Content-Type', 'application/grpc-web+proto');
request.headers['X-User-Agent'] = 'grpc-web-dart/0.1'; request.setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1');
request.headers['X-Grpc-Web'] = '1'; request.setRequestHeader('X-Grpc-Web', '1');
// Overriding the mimetype allows us to stream and parse the data
request.overrideMimeType('text/plain; charset=x-user-defined');
request.responseType = 'text';
} }
@visibleForTesting @visibleForTesting
Request createHttpRequest(String path) => Request('POST', uri.resolve(path)); HttpRequest createHttpRequest() => HttpRequest();
@visibleForTesting
Client createClient() => Client();
@override @override
GrpcTransportStream makeRequest(String path, Duration timeout, GrpcTransportStream makeRequest(String path, Duration timeout,
Map<String, String> metadata, ErrorHandler onError) { Map<String, String> metadata, ErrorHandler onError) {
final Request request = createHttpRequest(path); final HttpRequest request = createHttpRequest();
request.open('POST', uri.resolve(path).toString());
_initializeRequest(request, metadata); _initializeRequest(request, metadata);
final XhrTransportStream transportStream = XhrTransportStream( final XhrTransportStream transportStream =
_client, request, XhrTransportStream(request, onError: onError, onDone: _removeStream);
onError: onError, onDone: _removeStream);
_requests.add(transportStream); _requests.add(transportStream);
return transportStream; return transportStream;
} }

View File

@ -12,32 +12,49 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
@TestOn('browser')
import 'dart:async'; import 'dart:async';
import 'dart:html';
import 'package:grpc/src/client/transport/xhr_transport.dart'; import 'package:grpc/src/client/transport/xhr_transport.dart';
import 'package:grpc/src/shared/message.dart'; import 'package:grpc/src/shared/message.dart';
import 'package:http/http.dart';
import 'package:mockito/mockito.dart'; import 'package:mockito/mockito.dart';
import 'package:test/test.dart'; import 'package:test/test.dart';
class MockClient extends Mock implements Client {} class MockHttpRequest extends Mock implements HttpRequest {
// ignore: close_sinks
StreamController<Event> readyStateChangeController =
StreamController<Event>();
// ignore: close_sinks
StreamController<ProgressEvent> progressController =
StreamController<ProgressEvent>();
class MockRequest extends Mock implements Request {} @override
Stream<Event> get onReadyStateChange => readyStateChangeController.stream;
@override
Stream<ProgressEvent> get onProgress => progressController.stream;
@override
Stream<ProgressEvent> get onError => StreamController<ProgressEvent>().stream;
@override
int status = 200;
}
class MockXhrClientConnection extends XhrClientConnection { class MockXhrClientConnection extends XhrClientConnection {
MockXhrClientConnection() : super(Uri.parse('test:8080')); MockXhrClientConnection() : super(Uri.parse('test:8080'));
MockRequest latestRequest = MockRequest(); MockHttpRequest latestRequest;
final client = MockClient();
@override @override
createHttpRequest(String path) { createHttpRequest() {
return latestRequest; final request = MockHttpRequest();
} latestRequest = request;
return request;
@override
createClient() {
return client;
} }
} }
@ -49,16 +66,18 @@ void main() {
}; };
final connection = MockXhrClientConnection(); final connection = MockXhrClientConnection();
when(connection.latestRequest.headers).thenReturn({});
connection.makeRequest('path', Duration(seconds: 10), metadata, connection.makeRequest('path', Duration(seconds: 10), metadata,
(error) => fail(error.toString())); (error) => fail(error.toString()));
expect(connection.latestRequest.headers['Content-Type'], verify(connection.latestRequest
'application/grpc-web+proto'); .setRequestHeader('Content-Type', 'application/grpc-web+proto'));
expect( verify(connection.latestRequest
connection.latestRequest.headers['X-User-Agent'], 'grpc-web-dart/0.1'); .setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1'));
expect(connection.latestRequest.headers['X-Grpc-Web'], '1'); verify(connection.latestRequest.setRequestHeader('X-Grpc-Web', '1'));
verify(connection.latestRequest
.overrideMimeType('text/plain; charset=x-user-defined'));
verify(connection.latestRequest.responseType = 'text');
}); });
test('Sent data converted to stream properly', () async { test('Sent data converted to stream properly', () async {
@ -68,41 +87,29 @@ void main() {
}; };
final connection = MockXhrClientConnection(); final connection = MockXhrClientConnection();
when(connection.latestRequest.headers).thenReturn({});
final stream = connection.makeRequest('path', Duration(seconds: 10), final stream = connection.makeRequest('path', Duration(seconds: 10),
metadata, (error) => fail(error.toString())); metadata, (error) => fail(error.toString()));
when(connection.client.send(captureAny)).thenAnswer(
(_) => Future.value(StreamedResponse(Stream.fromIterable([]), 200)));
final data = List.filled(10, 0); final data = List.filled(10, 0);
final expectedData = frame(data);
stream.outgoingMessages.add(data); stream.outgoingMessages.add(data);
await stream.terminate(); await stream.terminate();
verify(connection.latestRequest.bodyBytes = expectedData); final expectedData = frame(data);
expect(verify(connection.latestRequest.send(captureAny)).captured.single,
expectedData);
}); });
test('Stream handles headers properly', () async { test('Stream handles headers properly', () async {
final metadata = <String, String>{ final metadata = <String, String>{
'parameter_1': 'value_1', 'parameter_1': 'value_1',
'parameter_2': 'value_2', 'parameter_2': 'value_2'
'content-type': 'application/grpc+proto',
}; };
final transport = MockXhrClientConnection(); final transport = MockXhrClientConnection();
when(transport.latestRequest.headers).thenReturn({});
final stream = transport.makeRequest('test_path', Duration(seconds: 10), final stream = transport.makeRequest('test_path', Duration(seconds: 10),
metadata, (error) => fail(error.toString())); metadata, (error) => fail(error.toString()));
when(transport.client.send(captureAny)).thenAnswer((_) {
return Future.value(
StreamedResponse(Stream.fromIterable([]), 200, headers: metadata));
});
final data = List.filled(10, 0);
stream.outgoingMessages.add(data);
stream.incomingMessages.listen((message) { stream.incomingMessages.listen((message) {
expect(message, TypeMatcher<GrpcMetadata>()); expect(message, TypeMatcher<GrpcMetadata>());
@ -117,76 +124,63 @@ void main() {
test('Stream handles trailers properly', () async { test('Stream handles trailers properly', () async {
final trailers = <String, String>{ final trailers = <String, String>{
'trailer_1': 'value_1', 'trailer_1': 'value_1',
'trailer_2': 'value_2', 'trailer_2': 'value_2'
}; };
final connection = MockXhrClientConnection(); final connection = MockXhrClientConnection();
when(connection.latestRequest.headers).thenReturn({});
final stream = connection.makeRequest('test_path', Duration(seconds: 10),
{}, (error) => fail(error.toString()));
final encodedTrailers = frame(trailers.entries final encodedTrailers = frame(trailers.entries
.map((e) => '${e.key}:${e.value}') .map((e) => '${e.key}:${e.value}')
.join('\r\n') .join('\r\n')
.codeUnits); .codeUnits);
encodedTrailers[0] = 0x80; // Mark this frame as trailers. encodedTrailers[0] = 0x80; // Mark this frame as trailers.
final response = StreamedResponse( final encodedString = String.fromCharCodes(encodedTrailers);
Future.value(encodedTrailers).asStream(), 200,
headers: {'content-type': 'application/grpc+proto'});
when(connection.client.send(connection.latestRequest))
.thenAnswer((_) => Future.value(response));
final stream = connection.makeRequest('test_path', Duration(seconds: 10),
{}, (error) => fail(error.toString()));
final data = List.filled(10, 0);
stream.outgoingMessages.add(data);
bool first = true;
stream.incomingMessages.listen((message) { stream.incomingMessages.listen((message) {
expect(message, TypeMatcher<GrpcMetadata>()); expect(message, TypeMatcher<GrpcMetadata>());
if (message is GrpcMetadata) { if (message is GrpcMetadata) {
if (first) {
expect(message.metadata.length, 1);
expect(message.metadata.entries.first.key, 'content-type');
expect(
message.metadata.entries.first.value, 'application/grpc+proto');
first = false;
} else {
message.metadata.forEach((key, value) { message.metadata.forEach((key, value) {
expect(value, trailers[key]); expect(value, trailers[key]);
}); });
} }
}
}); });
when(connection.latestRequest.getResponseHeader('Content-Type'))
.thenReturn('application/grpc+proto');
when(connection.latestRequest.responseHeaders).thenReturn({});
when(connection.latestRequest.readyState)
.thenReturn(HttpRequest.HEADERS_RECEIVED);
when(connection.latestRequest.response).thenReturn(encodedString);
connection.latestRequest.readyStateChangeController.add(null);
connection.latestRequest.progressController.add(null);
}); });
test('Stream handles empty trailers properly', () async { test('Stream handles empty trailers properly', () async {
final connection = MockXhrClientConnection(); final connection = MockXhrClientConnection();
when(connection.latestRequest.headers).thenReturn({});
final stream = connection.makeRequest('test_path', Duration(seconds: 10), final stream = connection.makeRequest('test_path', Duration(seconds: 10),
{}, (error) => fail(error.toString())); {}, (error) => fail(error.toString()));
final encoded = frame(''.codeUnits); final encoded = frame(''.codeUnits);
encoded[0] = 0x80; // Mark this frame as trailers. encoded[0] = 0x80; // Mark this frame as trailers.
final response = StreamedResponse(Future.value(encoded).asStream(), 200, final encodedString = String.fromCharCodes(encoded);
headers: {'content-type': 'application/grpc+proto'});
when(connection.client.send(connection.latestRequest))
.thenAnswer((_) => Future.value(response));
final data = List.filled(10, 0);
stream.outgoingMessages.add(data);
bool trailer = false;
stream.incomingMessages.listen((message) { stream.incomingMessages.listen((message) {
expect(message, TypeMatcher<GrpcMetadata>()); expect(message, TypeMatcher<GrpcMetadata>());
if (message is GrpcMetadata) { if (message is GrpcMetadata) {
if (trailer) { message.metadata.isEmpty;
expect(message.metadata.isEmpty, true);
}
trailer = true;
} }
}); });
when(connection.latestRequest.getResponseHeader('Content-Type'))
.thenReturn('application/grpc+proto');
when(connection.latestRequest.responseHeaders).thenReturn({});
when(connection.latestRequest.readyState)
.thenReturn(HttpRequest.HEADERS_RECEIVED);
when(connection.latestRequest.response).thenReturn(encodedString);
connection.latestRequest.readyStateChangeController.add(null);
connection.latestRequest.progressController.add(null);
}); });
test('Stream deserializes data properly', () async { test('Stream deserializes data properly', () async {
@ -196,36 +190,36 @@ void main() {
}; };
final connection = MockXhrClientConnection(); final connection = MockXhrClientConnection();
when(connection.latestRequest.headers).thenReturn({});
final stream = connection.makeRequest('test_path', Duration(seconds: 10), final stream = connection.makeRequest('test_path', Duration(seconds: 10),
metadata, (error) => fail(error.toString())); metadata, (error) => fail(error.toString()));
final data = List<int>.filled(10, 224); final data = List<int>.filled(10, 224);
final encoded = frame(data); final encoded = frame(data);
final response = StreamedResponse(Future.value(encoded).asStream(), 200, final encodedString = String.fromCharCodes(encoded);
headers: {'content-type': 'application/grpc+proto'});
stream.outgoingMessages.add(data);
when(connection.client.send(connection.latestRequest))
.thenAnswer((_) => Future.value(response));
stream.incomingMessages.listen(expectAsync1((message) { stream.incomingMessages.listen(expectAsync1((message) {
if (message is GrpcData) { if (message is GrpcData) {
expect(message.data, equals(data)); expect(message.data, equals(data));
} }
}, count: 2)); }, count: 2));
when(connection.latestRequest.getResponseHeader('Content-Type'))
.thenReturn('application/grpc+proto');
when(connection.latestRequest.responseHeaders).thenReturn(metadata);
when(connection.latestRequest.readyState)
.thenReturn(HttpRequest.HEADERS_RECEIVED);
when(connection.latestRequest.response).thenReturn(encodedString);
connection.latestRequest.readyStateChangeController.add(null);
connection.latestRequest.progressController.add(null);
}); });
test('Stream recieves multiple messages', () async { test('Stream recieves multiple messages', () async {
final metadata = <String, String>{ final metadata = <String, String>{
'parameter_1': 'value_1', 'parameter_1': 'value_1',
'parameter_2': 'value_2', 'parameter_2': 'value_2'
'content-type': 'application/grpc+proto',
}; };
final connection = MockXhrClientConnection(); final connection = MockXhrClientConnection();
when(connection.latestRequest.headers).thenReturn({});
final stream = connection.makeRequest('test_path', Duration(seconds: 10), final stream = connection.makeRequest('test_path', Duration(seconds: 10),
metadata, (error) => fail(error.toString())); metadata, (error) => fail(error.toString()));
@ -235,14 +229,7 @@ void main() {
List<int>.filled(5, 124) List<int>.filled(5, 124)
]; ];
final encoded = data.map((d) => frame(d)); final encoded = data.map((d) => frame(d));
final response = final encodedStrings = encoded.map((e) => String.fromCharCodes(e)).toList();
StreamedResponse(Stream.fromIterable(encoded), 200, headers: metadata);
when(connection.client.send(connection.latestRequest))
.thenAnswer((_) => Future.value(response));
final outData = List.filled(10, 0);
stream.outgoingMessages.add(outData);
final expectedMessages = <GrpcMessage>[ final expectedMessages = <GrpcMessage>[
GrpcMetadata(metadata), GrpcMetadata(metadata),
@ -260,5 +247,21 @@ void main() {
expect(message.data, (expectedMessage as GrpcData).data); expect(message.data, (expectedMessage as GrpcData).data);
} }
}, count: expectedMessages.length)); }, count: expectedMessages.length));
when(connection.latestRequest.getResponseHeader('Content-Type'))
.thenReturn('application/grpc+proto');
when(connection.latestRequest.responseHeaders).thenReturn(metadata);
when(connection.latestRequest.readyState)
.thenReturn(HttpRequest.HEADERS_RECEIVED);
// At first - expected response is the first message
when(connection.latestRequest.response)
.thenAnswer((_) => encodedStrings[0]);
connection.latestRequest.readyStateChangeController.add(null);
connection.latestRequest.progressController.add(null);
// After the first call, expected response should now be both responses together
when(connection.latestRequest.response)
.thenAnswer((_) => encodedStrings[0] + encodedStrings[1]);
connection.latestRequest.progressController.add(null);
}); });
} }