Handle request errors (#160)

This commit is contained in:
Sigurd Meldgaard 2019-03-22 14:06:33 +01:00 committed by GitHub
parent d58659507c
commit 2b7e261f3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 115 additions and 79 deletions

View File

@ -11,8 +11,13 @@ class EchoApp {
Future<void> echo(String message) async {
_addLeftMessage(message);
final response = await _service.echo(new EchoRequest()..message = message);
_addRightMessage(response.message);
try {
final response =
await _service.echo(new EchoRequest()..message = message);
_addRightMessage(response.message);
} catch (error) {
_addRightMessage(error.toString());
}
}
void repeatEcho(String message, int count) {
@ -23,6 +28,8 @@ class EchoApp {
..messageInterval = 500;
_service.serverStreamingEcho(request).listen((response) {
_addRightMessage(response.message);
}, onError: (error) {
_addRightMessage(error.toString());
}, onDone: () => print('Closed connection to server.'));
}

View File

@ -112,7 +112,8 @@ class ClientCall<Q, R> implements Response {
void _sendRequest(ClientConnection connection, Map<String, String> metadata) {
try {
_stream = connection.makeRequest(path, options.timeout, metadata);
_stream = connection.makeRequest(
path, options.timeout, metadata, _onRequestError);
} catch (e) {
_terminateWithError(new GrpcError.unavailable('Error making call: $e'));
return;
@ -249,7 +250,7 @@ class ClientCall<Q, R> implements Response {
/// Error handler for the requests stream. Something went wrong while trying
/// to send the request to the server. Abort the request, and forward the
/// error to the user code on the [_responses] stream.
void _onRequestError(error) {
void _onRequestError(error, [StackTrace stackTrace]) {
if (error is! GrpcError) {
error = new GrpcError.unknown(error.toString());
}

View File

@ -69,7 +69,8 @@ abstract class ClientChannel {
/// The connection may be shared between multiple RPCs.
Future<ClientConnection> getConnection() async {
if (_isShutdown) throw new GrpcError.unavailable('Channel shutting down.');
return _connection ??= new ClientConnection(host, port, options, connectTransport);
return _connection ??=
new ClientConnection(host, port, options, connectTransport);
}
/// Initiates a new RPC on this connection.

View File

@ -16,9 +16,7 @@
import 'dart:async';
import 'package:grpc/src/client/channel.dart';
import 'package:meta/meta.dart';
import '../shared/status.dart';
import 'call.dart';
import 'options.dart';
@ -99,9 +97,9 @@ class ClientConnection {
}
}
GrpcTransportStream makeRequest(
String path, Duration timeout, Map<String, String> metadata) {
return _transport.makeRequest(path, timeout, metadata);
GrpcTransportStream makeRequest(String path, Duration timeout,
Map<String, String> metadata, ErrorHandler onRequestFailure) {
return _transport.makeRequest(path, timeout, metadata, onRequestFailure);
}
void _startCall(ClientCall call) {

View File

@ -30,17 +30,15 @@ import 'http2_credentials.dart';
import 'transport.dart';
class Http2TransportStream extends GrpcTransportStream {
TransportStream _transportStream;
StreamController<GrpcMessage> _incomingMessages;
StreamController<List<int>> _outgoingMessages;
final TransportStream _transportStream;
final StreamController<GrpcMessage> _incomingMessages = StreamController();
final StreamController<List<int>> _outgoingMessages = StreamController();
final ErrorHandler _onError;
Stream<GrpcMessage> get incomingMessages => _incomingMessages.stream;
StreamSink<List<int>> get outgoingMessages => _outgoingMessages.sink;
Http2TransportStream(this._transportStream) {
_incomingMessages = new StreamController();
_outgoingMessages = new StreamController();
Http2TransportStream(this._transportStream, this._onError) {
_transportStream.incomingMessages
.transform(new GrpcHttpDecoder())
.transform(grpcDecompressor())
@ -56,8 +54,8 @@ class Http2TransportStream extends GrpcTransportStream {
cancelOnError: true);
}
void _onRequestError() {
// TODO: Implement errors on requests
void _onRequestError(error) {
_onError(error);
}
@override
@ -132,12 +130,12 @@ class Http2Transport extends Transport {
}
@override
GrpcTransportStream makeRequest(
String path, Duration timeout, Map<String, String> metadata) {
GrpcTransportStream makeRequest(String path, Duration timeout,
Map<String, String> metadata, ErrorHandler onError) {
final headers = createCallHeaders(
options.credentials.isSecure, authority, path, timeout, metadata);
final stream = transportConnection.makeRequest(headers);
return new Http2TransportStream(stream);
return new Http2TransportStream(stream, onError);
}
@override

View File

@ -19,6 +19,7 @@ import '../../shared/message.dart';
typedef void SocketClosedHandler();
typedef void ActiveStateHandler(bool isActive);
typedef void ErrorHandler(error);
abstract class GrpcTransportStream {
Stream<GrpcMessage> get incomingMessages;
@ -32,8 +33,8 @@ abstract class Transport {
SocketClosedHandler onSocketClosed;
Future<void> connect();
GrpcTransportStream makeRequest(
String path, Duration timeout, Map<String, String> metadata);
GrpcTransportStream makeRequest(String path, Duration timeout,
Map<String, String> metadata, ErrorHandler onRequestFailure);
Future<void> finish();
Future<void> terminate();
}

View File

@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:convert';
import 'dart:math';
import 'dart:typed_data';
@ -157,4 +156,3 @@ class _GrpcWebConversionSink extends ChunkedConversionSink<ByteBuffer> {
_out.close();
}
}

View File

@ -17,6 +17,7 @@ import 'dart:async';
import 'dart:html';
import 'dart:typed_data';
import 'package:grpc/src/shared/status.dart';
import 'package:meta/meta.dart';
import '../../shared/message.dart';
@ -24,11 +25,12 @@ import 'transport.dart';
import 'web_streams.dart';
class XhrTransportStream implements GrpcTransportStream {
HttpRequest _request;
final HttpRequest _request;
final ErrorHandler _onError;
int _requestBytesRead = 0;
StreamController<ByteBuffer> _incomingProcessor;
StreamController<GrpcMessage> _incomingMessages;
StreamController<List<int>> _outgoingMessages;
final StreamController<ByteBuffer> _incomingProcessor = StreamController();
final StreamController<GrpcMessage> _incomingMessages = StreamController();
final StreamController<List<int>> _outgoingMessages = StreamController();
@override
Stream<GrpcMessage> get incomingMessages => _incomingMessages.stream;
@ -36,48 +38,43 @@ class XhrTransportStream implements GrpcTransportStream {
@override
StreamSink<List<int>> get outgoingMessages => _outgoingMessages.sink;
XhrTransportStream(this._request) {
_incomingProcessor = StreamController();
_incomingMessages = StreamController();
_outgoingMessages = StreamController();
_incomingProcessor.stream
.transform(GrpcWebDecoder())
.transform(grpcDecompressor())
.listen(_incomingMessages.add,
onError: _incomingMessages.addError,
onDone: _incomingMessages.close);
XhrTransportStream(this._request, this._onError) {
_outgoingMessages.stream
.map(frame)
.listen((data) => _request.send(data));
.listen((data) => _request.send(data), cancelOnError: true);
_request.onReadyStateChange.listen((data) {
final contentType = _request.getResponseHeader('Content-Type');
if (contentType == null) {
if (_incomingMessages.isClosed) {
return;
}
if (_request.readyState == HttpRequest.HEADERS_RECEIVED) {
if (contentType.startsWith('application/grpc')) {
if (_request.response == null) {
return;
switch (_request.readyState) {
case HttpRequest.HEADERS_RECEIVED:
_onHeadersReceived();
break;
case HttpRequest.DONE:
if (_request.status != 200) {
_onError(GrpcError.unavailable(
'XhrConnection status ${_request.status}'));
} else {
_close();
}
// Force a metadata message with headers
final headers = GrpcMetadata(_request.responseHeaders);
_incomingMessages.add(headers);
}
}
if (_request.readyState == HttpRequest.DONE) {
_incomingProcessor.close();
_outgoingMessages.close();
break;
}
});
_request.onError.listen((ProgressEvent event) {
if (_incomingMessages.isClosed) {
return;
}
_onError(GrpcError.unavailable('XhrConnection connection-error'));
terminate();
});
_request.onProgress.listen((_) {
// use response over responseText as most browsers don't support
if (_incomingMessages.isClosed) {
return;
}
// Use response over responseText as most browsers don't support
// using responseText during an onProgress event.
final responseString = _request.response as String;
final bytes = Uint8List.fromList(
@ -86,12 +83,48 @@ class XhrTransportStream implements GrpcTransportStream {
_requestBytesRead = responseString.length;
_incomingProcessor.add(bytes);
});
_incomingProcessor.stream
.transform(GrpcWebDecoder())
.transform(grpcDecompressor())
.listen(_incomingMessages.add,
onError: _onError, onDone: _incomingMessages.close);
}
_onHeadersReceived() {
final contentType = _request.getResponseHeader('Content-Type');
if (_request.status != 200) {
_onError(
GrpcError.unavailable('XhrConnection status ${_request.status}'));
return;
}
if (contentType == null) {
_onError(GrpcError.unavailable('XhrConnection missing Content-Type'));
return;
}
if (!contentType.startsWith('application/grpc')) {
_onError(
GrpcError.unavailable('XhrConnection bad Content-Type $contentType'));
return;
}
if (_request.response == null) {
_onError(GrpcError.unavailable('XhrConnection request null response'));
return;
}
// Force a metadata message with headers.
final headers = GrpcMetadata(_request.responseHeaders);
_incomingMessages.add(headers);
}
_close() {
_incomingProcessor.close();
_outgoingMessages.close();
}
@override
Future<void> terminate() async {
await _incomingProcessor.close();
await _outgoingMessages.close();
_close();
_request.abort();
}
}
@ -124,17 +157,16 @@ class XhrTransport extends Transport {
}
@override
GrpcTransportStream makeRequest(
String path, Duration timeout, Map<String, String> metadata) {
GrpcTransportStream makeRequest(String path, Duration timeout,
Map<String, String> metadata, ErrorHandler onError) {
_request = HttpRequest();
_request.open('POST', '${host}:${port}${path}');
initializeRequest(_request, metadata);
return XhrTransportStream(_request);
return XhrTransportStream(_request, onError);
}
@override
Future<void> terminate() async {}
}

View File

@ -27,6 +27,6 @@ Future<Transport> connectXhrTransport(
}
class GrpcWebClientChannel extends ClientChannel {
GrpcWebClientChannel.xhr(String host,
{int port = 443}) : super(host, connectXhrTransport, port: port);
}
GrpcWebClientChannel.xhr(String host, {int port = 443})
: super(host, connectXhrTransport, port: port);
}

View File

@ -52,12 +52,12 @@ List<int> frame(List<int> payload) {
StreamTransformer<GrpcMessage, GrpcMessage> grpcDecompressor() =>
new StreamTransformer<GrpcMessage, GrpcMessage>.fromHandlers(
handleData: (GrpcMessage value, EventSink<GrpcMessage> sink) {
if (value is GrpcData) {
if (value.isCompressed) {
// TODO(dart-lang/grpc-dart#6): Actually handle decompression.
sink.add(new GrpcData(value.data, isCompressed: false));
return;
}
}
sink.add(value);
});
if (value is GrpcData) {
if (value.isCompressed) {
// TODO(dart-lang/grpc-dart#6): Actually handle decompression.
sink.add(new GrpcData(value.data, isCompressed: false));
return;
}
}
sink.add(value);
});