mirror of https://github.com/grpc/grpc-dart.git
Be more resilient to broken deployments (#460)
Require 200 HTTP status and a supported Content-Type header to be present in a response. When handling malformed responses make effort to translate HTTP statuses into gRPC statuses as gRPC protocol specification recommends. Fixes #421 Fixes #458 Co-authored-by: Vyacheslav Egorov <vegorov@google.com>
This commit is contained in:
parent
fb0c27a78a
commit
6c16fceb2a
|
|
@ -1,6 +1,12 @@
|
|||
## 3.0.1-dev
|
||||
|
||||
* Require `package:googleapis_auth` `^1.1.0`
|
||||
* Fix issues [#421](https://github.com/grpc/grpc-dart/issues/421) and
|
||||
[#458](https://github.com/grpc/grpc-dart/issues/458). Validate
|
||||
responses according to gRPC/gRPC-Web protocol specifications: require
|
||||
200 HTTP status and a supported `Content-Type` header to be present, as well
|
||||
as `grpc-status: 0` header. When handling malformed responses make effort
|
||||
to translate HTTP statuses into gRPC statuses.
|
||||
|
||||
## 3.0.0
|
||||
|
||||
|
|
|
|||
|
|
@ -14,13 +14,8 @@
|
|||
// limitations under the License.
|
||||
|
||||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
import 'dart:developer';
|
||||
|
||||
import 'package:grpc/src/generated/google/rpc/status.pb.dart';
|
||||
import 'package:meta/meta.dart';
|
||||
import 'package:protobuf/protobuf.dart';
|
||||
|
||||
import '../shared/codec.dart';
|
||||
import '../shared/message.dart';
|
||||
import '../shared/profiler.dart';
|
||||
|
|
@ -38,7 +33,6 @@ const _reservedHeaders = [
|
|||
'grpc-encoding',
|
||||
'user-agent',
|
||||
];
|
||||
const _statusDetailsHeader = 'grpc-status-details-bin';
|
||||
|
||||
/// Provides per-RPC metadata.
|
||||
///
|
||||
|
|
@ -343,23 +337,11 @@ class ClientCall<Q, R> implements Response {
|
|||
_stream!.terminate();
|
||||
}
|
||||
|
||||
/// If there's an error status then process it as a response error
|
||||
void _checkForErrorStatus(Map<String, String> metadata) {
|
||||
final status = metadata['grpc-status'];
|
||||
final statusCode = int.parse(status ?? '0');
|
||||
|
||||
if (statusCode != 0) {
|
||||
final messageMetadata = metadata['grpc-message'];
|
||||
final message =
|
||||
messageMetadata == null ? null : Uri.decodeFull(messageMetadata);
|
||||
|
||||
final statusDetails = metadata[_statusDetailsHeader];
|
||||
_responseError(GrpcError.custom(
|
||||
statusCode,
|
||||
message,
|
||||
statusDetails == null
|
||||
? const <GeneratedMessage>[]
|
||||
: decodeStatusDetails(statusDetails)));
|
||||
/// If there's an error status then process it as a response error.
|
||||
void _checkForErrorStatus(Map<String, String> trailers) {
|
||||
final error = grpcErrorFromTrailers(trailers);
|
||||
if (error != null) {
|
||||
_responseError(error);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -512,23 +494,3 @@ class ClientCall<Q, R> implements Response {
|
|||
} catch (_) {}
|
||||
}
|
||||
}
|
||||
|
||||
/// Given a string of base64url data, attempt to parse a Status object from it.
|
||||
/// Once parsed, it will then map each detail item and attempt to parse it into
|
||||
/// its respective GeneratedMessage type, returning the list of parsed detail items
|
||||
/// as a `List<GeneratedMessage>`.
|
||||
///
|
||||
/// Prior to creating the Status object we pad the data to ensure its length is
|
||||
/// an even multiple of 4, which is a requirement in Dart when decoding base64url data.
|
||||
///
|
||||
/// If any errors are thrown during decoding/parsing, it will return an empty list.
|
||||
@visibleForTesting
|
||||
List<GeneratedMessage> decodeStatusDetails(String data) {
|
||||
try {
|
||||
final parsedStatus = Status.fromBuffer(
|
||||
base64Url.decode(data.padRight((data.length + 3) & ~3, '=')));
|
||||
return parsedStatus.details.map(parseErrorDetailsFromAny).toList();
|
||||
} catch (e) {
|
||||
return <GeneratedMessage>[];
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ class Http2TransportStream extends GrpcTransportStream {
|
|||
CodecRegistry? codecRegistry,
|
||||
Codec? compression,
|
||||
) : incomingMessages = _transportStream.incomingMessages
|
||||
.transform(GrpcHttpDecoder())
|
||||
.transform(GrpcHttpDecoder(forResponse: true))
|
||||
.transform(grpcDecompressor(codecRegistry: codecRegistry)) {
|
||||
_outgoingMessages.stream
|
||||
.map((payload) => frame(payload, compression))
|
||||
|
|
|
|||
|
|
@ -29,17 +29,11 @@ import 'web_streams.dart';
|
|||
|
||||
const _contentTypeKey = 'Content-Type';
|
||||
|
||||
/// All accepted content-type header's prefix.
|
||||
const _validContentTypePrefix = [
|
||||
'application/grpc',
|
||||
'application/json+protobuf',
|
||||
'application/x-protobuf'
|
||||
];
|
||||
|
||||
class XhrTransportStream implements GrpcTransportStream {
|
||||
final HttpRequest _request;
|
||||
final ErrorHandler _onError;
|
||||
final Function(XhrTransportStream stream) _onDone;
|
||||
bool _headersReceived = false;
|
||||
int _requestBytesRead = 0;
|
||||
final StreamController<ByteBuffer> _incomingProcessor = StreamController();
|
||||
final StreamController<GrpcMessage> _incomingMessages = StreamController();
|
||||
|
|
@ -104,37 +98,28 @@ class XhrTransportStream implements GrpcTransportStream {
|
|||
onError: _onError, onDone: _incomingMessages.close);
|
||||
}
|
||||
|
||||
bool _checkContentType(String contentType) {
|
||||
return _validContentTypePrefix.any(contentType.startsWith);
|
||||
bool _validateResponseState() {
|
||||
try {
|
||||
validateHttpStatusAndContentType(
|
||||
_request.status, _request.responseHeaders,
|
||||
rawResponse: _request.responseText);
|
||||
return true;
|
||||
} catch (e, st) {
|
||||
_onError(e, st);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
void _onHeadersReceived() {
|
||||
// Force a metadata message with headers.
|
||||
final headers = GrpcMetadata(_request.responseHeaders);
|
||||
_incomingMessages.add(headers);
|
||||
_headersReceived = true;
|
||||
if (!_validateResponseState()) {
|
||||
return;
|
||||
}
|
||||
_incomingMessages.add(GrpcMetadata(_request.responseHeaders));
|
||||
}
|
||||
|
||||
void _onRequestDone() {
|
||||
final contentType = _request.getResponseHeader(_contentTypeKey);
|
||||
if (_request.status != 200) {
|
||||
_onError(
|
||||
GrpcError.unavailable('XhrConnection status ${_request.status}', null,
|
||||
_request.responseText),
|
||||
StackTrace.current);
|
||||
return;
|
||||
}
|
||||
if (contentType == null) {
|
||||
_onError(
|
||||
GrpcError.unavailable('XhrConnection missing Content-Type', null,
|
||||
_request.responseText),
|
||||
StackTrace.current);
|
||||
return;
|
||||
}
|
||||
if (!_checkContentType(contentType)) {
|
||||
_onError(
|
||||
GrpcError.unavailable('XhrConnection bad Content-Type $contentType',
|
||||
null, _request.responseText),
|
||||
StackTrace.current);
|
||||
if (!_headersReceived && !_validateResponseState()) {
|
||||
return;
|
||||
}
|
||||
if (_request.response == null) {
|
||||
|
|
|
|||
|
|
@ -13,10 +13,16 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
import 'dart:convert';
|
||||
import 'dart:io' show HttpStatus;
|
||||
|
||||
import 'package:meta/meta.dart';
|
||||
import 'package:protobuf/protobuf.dart';
|
||||
|
||||
import 'package:grpc/src/generated/google/protobuf/any.pb.dart';
|
||||
import 'package:grpc/src/generated/google/rpc/code.pbenum.dart';
|
||||
import 'package:grpc/src/generated/google/rpc/error_details.pb.dart';
|
||||
import 'package:protobuf/protobuf.dart';
|
||||
import 'package:grpc/src/generated/google/rpc/status.pb.dart';
|
||||
|
||||
class StatusCode {
|
||||
/// The operation completed successfully.
|
||||
|
|
@ -120,6 +126,29 @@ class StatusCode {
|
|||
/// The request does not have valid authentication credentials for the
|
||||
/// operation.
|
||||
static const unauthenticated = 16;
|
||||
|
||||
/// Mapping taken from gRPC-Web JS implementation:
|
||||
/// https://github.com/grpc/grpc-web/blob/master/javascript/net/grpc/web/statuscode.js
|
||||
static const _httpStatusToGrpcStatus = <int, int>{
|
||||
HttpStatus.ok: StatusCode.ok,
|
||||
HttpStatus.badRequest: StatusCode.invalidArgument,
|
||||
HttpStatus.unauthorized: StatusCode.unauthenticated,
|
||||
HttpStatus.forbidden: StatusCode.permissionDenied,
|
||||
HttpStatus.notFound: StatusCode.notFound,
|
||||
HttpStatus.conflict: StatusCode.aborted,
|
||||
HttpStatus.preconditionFailed: StatusCode.failedPrecondition,
|
||||
HttpStatus.tooManyRequests: StatusCode.resourceExhausted,
|
||||
HttpStatus.clientClosedRequest: StatusCode.cancelled,
|
||||
HttpStatus.internalServerError: StatusCode.unknown,
|
||||
HttpStatus.notImplemented: StatusCode.unimplemented,
|
||||
HttpStatus.serviceUnavailable: StatusCode.unavailable,
|
||||
HttpStatus.gatewayTimeout: StatusCode.deadlineExceeded,
|
||||
};
|
||||
|
||||
/// Creates a gRPC Status code from a HTTP Status code
|
||||
static int fromHttpStatus(int status) {
|
||||
return _httpStatusToGrpcStatus[status] ?? StatusCode.unknown;
|
||||
}
|
||||
}
|
||||
|
||||
class GrpcError implements Exception {
|
||||
|
|
@ -309,3 +338,132 @@ GeneratedMessage parseErrorDetailsFromAny(Any any) {
|
|||
return any;
|
||||
}
|
||||
}
|
||||
|
||||
/// Validate HTTP status and Content-Type which arrived with the response:
|
||||
/// reject reponses with non-ok (200) status or unsupported Content-Type.
|
||||
///
|
||||
/// Note that grpc-status arrives in trailers and will be handled by
|
||||
/// [ClientCall._onResponseData].
|
||||
///
|
||||
/// gRPC over HTTP2 protocol specification mandates the following:
|
||||
///
|
||||
/// Implementations should expect broken deployments to send non-200 HTTP
|
||||
/// status codes in responses as well as a variety of non-GRPC content-types
|
||||
/// and to omit Status & Status-Message. Implementations must synthesize a
|
||||
/// Status & Status-Message to propagate to the application layer when this
|
||||
/// occurs.
|
||||
///
|
||||
void validateHttpStatusAndContentType(
|
||||
int? httpStatus, Map<String, String> headers,
|
||||
{Object? rawResponse}) {
|
||||
if (httpStatus == null) {
|
||||
throw GrpcError.unknown(
|
||||
'HTTP response status is unknown', null, rawResponse);
|
||||
}
|
||||
|
||||
if (httpStatus == 0) {
|
||||
throw GrpcError.unknown(
|
||||
'HTTP request completed without a status (potential CORS issue)',
|
||||
null,
|
||||
rawResponse);
|
||||
}
|
||||
|
||||
final status = StatusCode.fromHttpStatus(httpStatus);
|
||||
if (status != StatusCode.ok) {
|
||||
// [httpStatus] itself already indicates an error. Check if we also
|
||||
// received grpc-status/message (i.e. this is a Trailers-Only response)
|
||||
// and use this information to report a better error to the application
|
||||
// layer. However prefer to use status code derived from HTTP status
|
||||
// if grpc-status itself does not provide an informative error.
|
||||
final error = grpcErrorFromTrailers(headers);
|
||||
if (error == null || error.code == StatusCode.unknown) {
|
||||
throw GrpcError.custom(
|
||||
status,
|
||||
error?.message ??
|
||||
'HTTP connection completed with ${httpStatus} instead of 200',
|
||||
error?.details,
|
||||
rawResponse);
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
|
||||
final contentType = headers['content-type'];
|
||||
if (contentType == null) {
|
||||
throw GrpcError.unknown('missing content-type header', null, rawResponse);
|
||||
}
|
||||
|
||||
// Check if content-type header indicates a supported format.
|
||||
if (!_validContentTypePrefix.any(contentType.startsWith)) {
|
||||
throw GrpcError.unknown(
|
||||
'unsupported content-type (${contentType})', null, rawResponse);
|
||||
}
|
||||
}
|
||||
|
||||
GrpcError? grpcErrorFromTrailers(Map<String, String> trailers) {
|
||||
final status = trailers['grpc-status'];
|
||||
final statusCode = status != null ? int.parse(status) : StatusCode.unknown;
|
||||
|
||||
if (statusCode != StatusCode.ok) {
|
||||
final message = _tryDecodeStatusMessage(trailers['grpc-message']);
|
||||
final statusDetails = trailers[_statusDetailsHeader];
|
||||
return GrpcError.custom(
|
||||
statusCode,
|
||||
message,
|
||||
statusDetails == null
|
||||
? const <GeneratedMessage>[]
|
||||
: decodeStatusDetails(statusDetails));
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
const _statusDetailsHeader = 'grpc-status-details-bin';
|
||||
|
||||
/// All accepted content-type header's prefix. We are being more permissive
|
||||
/// then gRPC and gRPC-Web specifications because some of the services
|
||||
/// return slightly different content-types.
|
||||
const _validContentTypePrefix = [
|
||||
'application/grpc',
|
||||
'application/json+protobuf',
|
||||
'application/x-protobuf'
|
||||
];
|
||||
|
||||
/// Given a string of base64url data, attempt to parse a Status object from it.
|
||||
/// Once parsed, it will then map each detail item and attempt to parse it into
|
||||
/// its respective GeneratedMessage type, returning the list of parsed detail items
|
||||
/// as a `List<GeneratedMessage>`.
|
||||
///
|
||||
/// Prior to creating the Status object we pad the data to ensure its length is
|
||||
/// an even multiple of 4, which is a requirement in Dart when decoding base64url data.
|
||||
///
|
||||
/// If any errors are thrown during decoding/parsing, it will return an empty list.
|
||||
@visibleForTesting
|
||||
List<GeneratedMessage> decodeStatusDetails(String data) {
|
||||
try {
|
||||
final parsedStatus = Status.fromBuffer(
|
||||
base64Url.decode(data.padRight((data.length + 3) & ~3, '=')));
|
||||
return parsedStatus.details.map(parseErrorDetailsFromAny).toList();
|
||||
} catch (e) {
|
||||
return <GeneratedMessage>[];
|
||||
}
|
||||
}
|
||||
|
||||
/// Decode percent encoded status message contained in 'grpc-message' trailer.
|
||||
String? _tryDecodeStatusMessage(String? statusMessage) {
|
||||
if (statusMessage == null) {
|
||||
return statusMessage;
|
||||
}
|
||||
|
||||
try {
|
||||
return Uri.decodeFull(statusMessage);
|
||||
} catch (_) {
|
||||
// gRPC over HTTP2 protocol specification mandates:
|
||||
//
|
||||
// When decoding invalid values, implementations MUST NOT error or throw
|
||||
// away the message. At worst, the implementation can abort decoding the
|
||||
// status message altogether such that the user would received the raw
|
||||
// percent-encoded form.
|
||||
//
|
||||
return statusMessage;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,6 +39,11 @@ class GrpcHttpEncoder extends Converter<GrpcMessage, StreamMessage> {
|
|||
}
|
||||
|
||||
class GrpcHttpDecoder extends Converter<StreamMessage, GrpcMessage> {
|
||||
/// [true] if this decoder is used for decoding responses.
|
||||
final bool forResponse;
|
||||
|
||||
GrpcHttpDecoder({this.forResponse = false});
|
||||
|
||||
@override
|
||||
GrpcMessage convert(StreamMessage input) {
|
||||
final sink = GrpcMessageSink();
|
||||
|
|
@ -50,18 +55,21 @@ class GrpcHttpDecoder extends Converter<StreamMessage, GrpcMessage> {
|
|||
|
||||
@override
|
||||
Sink<StreamMessage> startChunkedConversion(Sink<GrpcMessage> sink) {
|
||||
return _GrpcMessageConversionSink(sink);
|
||||
return _GrpcMessageConversionSink(sink, forResponse);
|
||||
}
|
||||
}
|
||||
|
||||
class _GrpcMessageConversionSink extends ChunkedConversionSink<StreamMessage> {
|
||||
final Sink<GrpcMessage> _out;
|
||||
final bool _forResponse;
|
||||
|
||||
final _dataHeader = Uint8List(5);
|
||||
Uint8List? _data;
|
||||
int _dataOffset = 0;
|
||||
|
||||
_GrpcMessageConversionSink(this._out);
|
||||
bool _headersReceived = false;
|
||||
|
||||
_GrpcMessageConversionSink(this._out, this._forResponse);
|
||||
|
||||
void _addData(DataStreamMessage chunk) {
|
||||
final chunkData = chunk.bytes;
|
||||
|
|
@ -117,7 +125,22 @@ class _GrpcMessageConversionSink extends ChunkedConversionSink<StreamMessage> {
|
|||
// TODO(jakobr): Handle duplicate header names correctly.
|
||||
headers[ascii.decode(header.name)] = ascii.decode(header.value);
|
||||
}
|
||||
// TODO(jakobr): Check :status, go to error mode if not 2xx.
|
||||
if (!_headersReceived) {
|
||||
if (_forResponse) {
|
||||
// Validate :status and content-type header here synchronously before
|
||||
// attempting to parse subsequent DataStreamMessage.
|
||||
final httpStatus = headers.containsKey(':status')
|
||||
? int.tryParse(headers[':status']!)
|
||||
: null;
|
||||
|
||||
// Validation might throw an exception. When [GrpcHttpDecoder] is
|
||||
// used as a [StreamTransformer] the underlying implementation of
|
||||
// [StreamTransformer.bind] will take care of forwarding this
|
||||
// exception into the stream as an error.
|
||||
validateHttpStatusAndContentType(httpStatus, headers);
|
||||
}
|
||||
_headersReceived = true;
|
||||
}
|
||||
_out.add(GrpcMetadata(headers));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
import 'dart:io' show HttpStatus;
|
||||
|
||||
import 'package:grpc/grpc.dart';
|
||||
import 'package:grpc/src/client/call.dart';
|
||||
|
|
@ -272,6 +273,8 @@ void main() {
|
|||
|
||||
void handleRequest(_) {
|
||||
harness.toClient.add(HeadersStreamMessage([
|
||||
Header.ascii(':status', '200'),
|
||||
Header.ascii('content-type', 'application/grpc'),
|
||||
Header.ascii('grpc-status', '$customStatusCode'),
|
||||
Header.ascii('grpc-message', customStatusMessage)
|
||||
], endStream: true));
|
||||
|
|
@ -286,6 +289,77 @@ void main() {
|
|||
);
|
||||
});
|
||||
|
||||
test('Call throws if HTTP status indicates an error', () async {
|
||||
void handleRequest(_) {
|
||||
harness.toClient.add(HeadersStreamMessage([
|
||||
Header.ascii(':status', HttpStatus.serviceUnavailable.toString()),
|
||||
Header.ascii('content-type', 'application/grpc'),
|
||||
]));
|
||||
// Send a frame that might be misinterpreted as a length-prefixed proto
|
||||
// message and cause OOM.
|
||||
harness.toClient
|
||||
.add(DataStreamMessage([0, 0xFF, 0xFF, 0xFF, 0xFF], endStream: true));
|
||||
harness.toClient.close();
|
||||
}
|
||||
|
||||
await harness.runFailureTest(
|
||||
clientCall: harness.client.unary(dummyValue),
|
||||
expectedException: GrpcError.unavailable(
|
||||
'HTTP connection completed with 503 instead of 200'),
|
||||
serverHandlers: [handleRequest],
|
||||
);
|
||||
});
|
||||
|
||||
test('Call throws if content-type indicates an error', () async {
|
||||
void handleRequest(_) {
|
||||
harness.toClient.add(HeadersStreamMessage([
|
||||
Header.ascii(':status', '200'),
|
||||
Header.ascii('content-type', 'text/html'),
|
||||
]));
|
||||
// Send a frame that might be misinterpreted as a length-prefixed proto
|
||||
// message and cause OOM.
|
||||
harness.toClient.add(DataStreamMessage([0, 0xFF, 0xFF, 0xFF, 0xFF]));
|
||||
harness.sendResponseTrailer();
|
||||
}
|
||||
|
||||
await harness.runFailureTest(
|
||||
clientCall: harness.client.unary(dummyValue),
|
||||
expectedException:
|
||||
GrpcError.unknown('unsupported content-type (text/html)'),
|
||||
serverHandlers: [handleRequest],
|
||||
);
|
||||
});
|
||||
|
||||
for (var contentType in [
|
||||
'application/json+protobuf',
|
||||
'application/x-protobuf'
|
||||
]) {
|
||||
test('$contentType content type is accepted', () async {
|
||||
const requestValue = 17;
|
||||
const responseValue = 19;
|
||||
|
||||
void handleRequest(StreamMessage message) {
|
||||
final data = validateDataMessage(message);
|
||||
expect(mockDecode(data.data), requestValue);
|
||||
|
||||
harness
|
||||
..toClient.add(HeadersStreamMessage([
|
||||
Header.ascii(':status', '200'),
|
||||
Header.ascii('content-type', contentType),
|
||||
]))
|
||||
..sendResponseValue(responseValue)
|
||||
..sendResponseTrailer();
|
||||
}
|
||||
|
||||
await harness.runTest(
|
||||
clientCall: harness.client.unary(requestValue),
|
||||
expectedResult: responseValue,
|
||||
expectedPath: '/Test/Unary',
|
||||
serverHandlers: [handleRequest],
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
test('Call throws decoded message', () async {
|
||||
const customStatusCode = 17;
|
||||
const customStatusMessage = 'エラー';
|
||||
|
|
@ -293,6 +367,8 @@ void main() {
|
|||
|
||||
void handleRequest(_) {
|
||||
harness.toClient.add(HeadersStreamMessage([
|
||||
Header.ascii(':status', '200'),
|
||||
Header.ascii('content-type', 'application/grpc'),
|
||||
Header.ascii('grpc-status', '$customStatusCode'),
|
||||
Header.ascii('grpc-message', encodedCustomStatusMessage)
|
||||
], endStream: true));
|
||||
|
|
@ -509,6 +585,8 @@ void main() {
|
|||
|
||||
void handleRequest(_) {
|
||||
harness.toClient.add(HeadersStreamMessage([
|
||||
Header.ascii(':status', '200'),
|
||||
Header.ascii('content-type', 'application/grpc'),
|
||||
Header.ascii('grpc-status', code.toString()),
|
||||
Header.ascii('grpc-message', message),
|
||||
Header.ascii('grpc-status-details-bin', details),
|
||||
|
|
|
|||
|
|
@ -243,6 +243,8 @@ void main() {
|
|||
|
||||
void handleRequest(_) {
|
||||
harness.toClient.add(HeadersStreamMessage([
|
||||
Header.ascii(':status', '200'),
|
||||
Header.ascii('content-type', 'application/grpc'),
|
||||
Header.ascii('grpc-status', '$customStatusCode'),
|
||||
Header.ascii('grpc-message', customStatusMessage)
|
||||
], endStream: true));
|
||||
|
|
|
|||
|
|
@ -216,7 +216,8 @@ void main() {
|
|||
test('Stream handles headers properly', () async {
|
||||
final responseHeaders = {
|
||||
'parameter_1': 'value_1',
|
||||
'parameter_2': 'value_2'
|
||||
'parameter_2': 'value_2',
|
||||
'content-type': 'application/grpc+proto',
|
||||
};
|
||||
|
||||
final transport = MockXhrClientConnection();
|
||||
|
|
@ -225,8 +226,6 @@ void main() {
|
|||
(error, _) => fail(error.toString()));
|
||||
|
||||
when(transport.latestRequest.responseHeaders).thenReturn(responseHeaders);
|
||||
when(transport.latestRequest.getResponseHeader('Content-Type'))
|
||||
.thenReturn('application/grpc+proto');
|
||||
when(transport.latestRequest.response)
|
||||
.thenReturn(String.fromCharCodes(frame(<int>[])));
|
||||
|
||||
|
|
@ -240,16 +239,20 @@ void main() {
|
|||
transport.latestRequest.readyStateChangeController
|
||||
.add(readyStateChangeEvent);
|
||||
|
||||
// Should be only one metadata message with headers.
|
||||
// Should be only one metadata message with headers augmented with :status
|
||||
// field.
|
||||
final message = await stream.incomingMessages.single as GrpcMetadata;
|
||||
expect(message.metadata, responseHeaders);
|
||||
});
|
||||
|
||||
test('Stream handles trailers properly', () async {
|
||||
final requestHeaders = {'parameter_1': 'value_1'};
|
||||
final requestHeaders = {
|
||||
'parameter_1': 'value_1',
|
||||
'content-type': 'application/grpc+proto',
|
||||
};
|
||||
final responseTrailers = <String, String>{
|
||||
'trailer_1': 'value_1',
|
||||
'trailer_2': 'value_2'
|
||||
'trailer_2': 'value_2',
|
||||
};
|
||||
|
||||
final connection = MockXhrClientConnection();
|
||||
|
|
@ -264,9 +267,7 @@ void main() {
|
|||
encodedTrailers[0] = 0x80; // Mark this frame as trailers.
|
||||
final encodedString = String.fromCharCodes(encodedTrailers);
|
||||
|
||||
when(connection.latestRequest.getResponseHeader('Content-Type'))
|
||||
.thenReturn('application/grpc+proto');
|
||||
when(connection.latestRequest.responseHeaders).thenReturn({});
|
||||
when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders);
|
||||
when(connection.latestRequest.response).thenReturn(encodedString);
|
||||
|
||||
// Set expectation for request readyState and generate events so that
|
||||
|
|
@ -284,11 +285,15 @@ void main() {
|
|||
final messages =
|
||||
await stream.incomingMessages.whereType<GrpcMetadata>().toList();
|
||||
expect(messages.length, 2);
|
||||
expect(messages.first.metadata, isEmpty);
|
||||
expect(messages.first.metadata, requestHeaders);
|
||||
expect(messages.last.metadata, responseTrailers);
|
||||
});
|
||||
|
||||
test('Stream handles empty trailers properly', () async {
|
||||
final requestHeaders = {
|
||||
'content-type': 'application/grpc+proto',
|
||||
};
|
||||
|
||||
final connection = MockXhrClientConnection();
|
||||
|
||||
final stream = connection.makeRequest('test_path', Duration(seconds: 10),
|
||||
|
|
@ -298,9 +303,7 @@ void main() {
|
|||
encoded[0] = 0x80; // Mark this frame as trailers.
|
||||
final encodedString = String.fromCharCodes(encoded);
|
||||
|
||||
when(connection.latestRequest.getResponseHeader('Content-Type'))
|
||||
.thenReturn('application/grpc+proto');
|
||||
when(connection.latestRequest.responseHeaders).thenReturn({});
|
||||
when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders);
|
||||
when(connection.latestRequest.response).thenReturn(encodedString);
|
||||
|
||||
// Set expectation for request readyState and generate events so that
|
||||
|
|
@ -318,14 +321,15 @@ void main() {
|
|||
final messages =
|
||||
await stream.incomingMessages.whereType<GrpcMetadata>().toList();
|
||||
expect(messages.length, 2);
|
||||
expect(messages.first.metadata, isEmpty);
|
||||
expect(messages.first.metadata, requestHeaders);
|
||||
expect(messages.last.metadata, isEmpty);
|
||||
});
|
||||
|
||||
test('Stream deserializes data properly', () async {
|
||||
final requestHeaders = <String, String>{
|
||||
'parameter_1': 'value_1',
|
||||
'parameter_2': 'value_2'
|
||||
'parameter_2': 'value_2',
|
||||
'content-type': 'application/grpc+proto',
|
||||
};
|
||||
|
||||
final connection = MockXhrClientConnection();
|
||||
|
|
@ -333,9 +337,7 @@ void main() {
|
|||
final stream = connection.makeRequest('test_path', Duration(seconds: 10),
|
||||
requestHeaders, (error, _) => fail(error.toString()));
|
||||
final data = List<int>.filled(10, 224);
|
||||
when(connection.latestRequest.getResponseHeader('Content-Type'))
|
||||
.thenReturn('application/grpc+proto');
|
||||
when(connection.latestRequest.responseHeaders).thenReturn({});
|
||||
when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders);
|
||||
when(connection.latestRequest.response)
|
||||
.thenReturn(String.fromCharCodes(frame(data)));
|
||||
|
||||
|
|
@ -366,9 +368,8 @@ void main() {
|
|||
errors.add(e as GrpcError);
|
||||
});
|
||||
const errorDetails = 'error details';
|
||||
when(connection.latestRequest.getResponseHeader('Content-Type'))
|
||||
.thenReturn('application/grpc+proto');
|
||||
when(connection.latestRequest.responseHeaders).thenReturn({});
|
||||
when(connection.latestRequest.responseHeaders)
|
||||
.thenReturn({'content-type': 'application/grpc+proto'});
|
||||
when(connection.latestRequest.readyState).thenReturn(HttpRequest.DONE);
|
||||
when(connection.latestRequest.responseText).thenReturn(errorDetails);
|
||||
connection.latestRequest.readyStateChangeController
|
||||
|
|
@ -377,10 +378,11 @@ void main() {
|
|||
expect(errors.single.rawResponse, errorDetails);
|
||||
});
|
||||
|
||||
test('Stream recieves multiple messages', () async {
|
||||
test('Stream receives multiple messages', () async {
|
||||
final metadata = <String, String>{
|
||||
'parameter_1': 'value_1',
|
||||
'parameter_2': 'value_2'
|
||||
'parameter_2': 'value_2',
|
||||
'content-type': 'application/grpc+proto',
|
||||
};
|
||||
|
||||
final connection = MockXhrClientConnection();
|
||||
|
|
@ -395,8 +397,6 @@ void main() {
|
|||
final encodedStrings =
|
||||
data.map((d) => String.fromCharCodes(frame(d))).toList();
|
||||
|
||||
when(connection.latestRequest.getResponseHeader('Content-Type'))
|
||||
.thenReturn('application/grpc+proto');
|
||||
when(connection.latestRequest.responseHeaders).thenReturn(metadata);
|
||||
when(connection.latestRequest.readyState)
|
||||
.thenReturn(HttpRequest.HEADERS_RECEIVED);
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
// @dart = 2.3
|
||||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
import 'dart:io';
|
||||
|
||||
import 'package:grpc/grpc.dart';
|
||||
import 'package:http2/transport.dart';
|
||||
import 'package:path/path.dart' as p;
|
||||
import 'package:stream_channel/stream_channel.dart';
|
||||
|
||||
|
|
@ -85,24 +85,27 @@ static_resources:
|
|||
port_value: %TARGET_PORT%
|
||||
''';
|
||||
|
||||
Future<void> hybridMain(StreamChannel channel) async {
|
||||
// Envoy output will be collected and dumped to stdout if envoy exits
|
||||
// with an error. Otherwise if verbose is specified it will be dumped
|
||||
// to stdout unconditionally.
|
||||
final output = <String>[];
|
||||
void _info(String line) {
|
||||
if (!verbose) {
|
||||
output.add(line);
|
||||
} else {
|
||||
print(line);
|
||||
}
|
||||
// Envoy output will be collected and dumped to stdout if envoy exits
|
||||
// with an error. Otherwise if verbose is specified it will be dumped
|
||||
// to stdout unconditionally.
|
||||
final output = <String>[];
|
||||
void _info(String line) {
|
||||
if (!verbose) {
|
||||
output.add(line);
|
||||
} else {
|
||||
print(line);
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> hybridMain(StreamChannel channel) async {
|
||||
// Spawn a gRPC server.
|
||||
final server = Server([EchoService()]);
|
||||
await server.serve(port: 0);
|
||||
_info('grpc server listening on ${server.port}');
|
||||
|
||||
final httpServer = await startHttpServer();
|
||||
_info('HTTP server listening on ${httpServer.port}');
|
||||
|
||||
// Create Envoy configuration.
|
||||
final tempDir = await Directory.systemTemp.createTemp();
|
||||
final config = p.join(tempDir.path, 'config.yaml');
|
||||
|
|
@ -136,7 +139,8 @@ if you are running tests locally.
|
|||
_info('envoy|stderr] $line');
|
||||
final m = portRe.firstMatch(line);
|
||||
if (m != null) {
|
||||
channel.sink.add(int.parse(m[1]));
|
||||
channel.sink
|
||||
.add({'grpcPort': int.parse(m[1]!), 'httpPort': httpServer.port});
|
||||
}
|
||||
});
|
||||
|
||||
|
|
@ -164,4 +168,54 @@ if you are running tests locally.
|
|||
tempDir.deleteSync(recursive: true);
|
||||
}
|
||||
channel.sink.add('EXITED');
|
||||
|
||||
await server.shutdown();
|
||||
await httpServer.close();
|
||||
}
|
||||
|
||||
final testCases = <String, void Function(HttpResponse)>{
|
||||
'test:cors': (rs) {
|
||||
rs.headers.removeAll('Access-Control-Allow-Origin');
|
||||
rs.headers.add(HttpHeaders.contentTypeHeader, 'text/html');
|
||||
rs.write('some body');
|
||||
rs.close();
|
||||
},
|
||||
'test:status-503': (rs) {
|
||||
rs.headers.add(HttpHeaders.contentTypeHeader, 'text/html');
|
||||
rs.statusCode = HttpStatus.serviceUnavailable;
|
||||
rs.write('some body');
|
||||
rs.close();
|
||||
},
|
||||
'test:bad-content-type': (rs) {
|
||||
rs.headers.add(HttpHeaders.contentTypeHeader, 'text/html');
|
||||
rs.statusCode = HttpStatus.ok;
|
||||
rs.write('some body');
|
||||
rs.close();
|
||||
},
|
||||
};
|
||||
|
||||
void defaultHandler(HttpResponse rs) {
|
||||
rs.close();
|
||||
}
|
||||
|
||||
Future<HttpServer> startHttpServer() async {
|
||||
final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0);
|
||||
server.defaultResponseHeaders.removeAll('x-frame-options');
|
||||
server.defaultResponseHeaders.removeAll('x-xss-protection');
|
||||
server.defaultResponseHeaders.removeAll('x-content-type-options');
|
||||
server.defaultResponseHeaders.add('Access-Control-Allow-Origin', '*');
|
||||
server.listen((request) async {
|
||||
_info('${request.method} ${request.requestedUri} ${request.headers}');
|
||||
final message = await GrpcHttpDecoder()
|
||||
.bind(request.map((list) => DataStreamMessage(list)))
|
||||
.first as GrpcData;
|
||||
final echoRequest = EchoRequest.fromBuffer(message.data);
|
||||
(testCases[echoRequest.message] ?? defaultHandler)(request.response);
|
||||
});
|
||||
return server;
|
||||
}
|
||||
|
||||
Future<void> main() async {
|
||||
final controller = StreamChannelController();
|
||||
await hybridMain(controller.local);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ void main() {
|
|||
// server (written in Dart) via gRPC-web protocol through a third party
|
||||
// gRPC-web proxy.
|
||||
test('gRPC-web echo test', () async {
|
||||
final channel = GrpcWebClientChannel.xhr(server.uri);
|
||||
final channel = GrpcWebClientChannel.xhr(server.grpcUri);
|
||||
final service = EchoServiceClient(channel);
|
||||
|
||||
const testMessage = 'hello from gRPC-web';
|
||||
|
|
@ -57,7 +57,7 @@ void main() {
|
|||
// Verify that terminate does not cause an exception when terminating
|
||||
// channel with multiple active requests.
|
||||
test('terminate works', () async {
|
||||
final channel = GrpcWebClientChannel.xhr(server.uri);
|
||||
final channel = GrpcWebClientChannel.xhr(server.grpcUri);
|
||||
final service = EchoServiceClient(channel);
|
||||
|
||||
const testMessage = 'hello from gRPC-web';
|
||||
|
|
@ -97,7 +97,7 @@ void main() {
|
|||
|
||||
// Verify that stream cancellation does not cause an exception
|
||||
test('stream cancellation works', () async {
|
||||
final channel = GrpcWebClientChannel.xhr(server.uri);
|
||||
final channel = GrpcWebClientChannel.xhr(server.grpcUri);
|
||||
final service = EchoServiceClient(channel);
|
||||
|
||||
const testMessage = 'hello from gRPC-web';
|
||||
|
|
@ -116,14 +116,40 @@ void main() {
|
|||
|
||||
await channel.terminate();
|
||||
});
|
||||
|
||||
final invalidResponseTests = {
|
||||
'cors': GrpcError.unknown(
|
||||
'HTTP request completed without a status (potential CORS issue)'),
|
||||
'status-503': GrpcError.unavailable(
|
||||
'HTTP connection completed with 503 instead of 200'),
|
||||
'bad-content-type':
|
||||
GrpcError.unknown('unsupported content-type (text/html)'),
|
||||
};
|
||||
|
||||
for (var entry in invalidResponseTests.entries) {
|
||||
// We test a bunch of boundary conditions by starting a simple HTTP server
|
||||
// we sends various erroneous responses back. The kind of response is
|
||||
// selected based on the payload of the request (i.e. the server expects
|
||||
// to get valid gRPC request with an [EchoRequest] payload and responds
|
||||
// with different errors based on the [EchoRequest.message] value.
|
||||
// See [startHttpServer] in [grpc_web_server.dart] for the server part.
|
||||
test('invalid response: ${entry.key}', () async {
|
||||
final channel = GrpcWebClientChannel.xhr(server.httpUri);
|
||||
final service = EchoServiceClient(channel,
|
||||
options: WebCallOptions(bypassCorsPreflight: true));
|
||||
expect(() => service.echo(EchoRequest()..message = 'test:${entry.key}'),
|
||||
throwsA(entry.value));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
class GrpcWebServer {
|
||||
final StreamChannel channel;
|
||||
final Future<void> whenExited;
|
||||
final Uri uri;
|
||||
final Uri grpcUri;
|
||||
final Uri httpUri;
|
||||
|
||||
GrpcWebServer(this.channel, this.whenExited, this.uri);
|
||||
GrpcWebServer(this.channel, this.whenExited, this.grpcUri, this.httpUri);
|
||||
|
||||
Future<void> shutdown() async {
|
||||
channel.sink.add('shutdown');
|
||||
|
|
@ -142,7 +168,7 @@ class GrpcWebServer {
|
|||
// number we should be talking to.
|
||||
final serverChannel =
|
||||
spawnHybridUri('grpc_web_server.dart', stayAlive: true);
|
||||
final portCompleter = Completer<int>();
|
||||
final portCompleter = Completer<Map>();
|
||||
final exitCompleter = Completer<void>();
|
||||
serverChannel.stream.listen((event) {
|
||||
if (!portCompleter.isCompleted) {
|
||||
|
|
@ -158,12 +184,18 @@ class GrpcWebServer {
|
|||
}
|
||||
});
|
||||
|
||||
final port = await portCompleter.future;
|
||||
final ports = await portCompleter.future;
|
||||
|
||||
final grpcPort = ports['grpcPort'];
|
||||
final httpPort = ports['httpPort'];
|
||||
|
||||
// Note: we would like to test https as well, but we can't easily do it
|
||||
// because browsers like chrome don't trust self-signed certificates by
|
||||
// default.
|
||||
return GrpcWebServer(serverChannel, exitCompleter.future,
|
||||
Uri.parse('http://localhost:$port'));
|
||||
return GrpcWebServer(
|
||||
serverChannel,
|
||||
exitCompleter.future,
|
||||
Uri.parse('http://localhost:${grpcPort}'),
|
||||
Uri.parse('http://localhost:${httpPort}'));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -204,6 +204,8 @@ abstract class _Harness {
|
|||
|
||||
Iterable<ClientInterceptor>? interceptors;
|
||||
|
||||
bool headersWereSent = false;
|
||||
|
||||
late TestClient client;
|
||||
|
||||
base.ClientChannel createChannel();
|
||||
|
|
@ -233,17 +235,30 @@ abstract class _Harness {
|
|||
toClient.close();
|
||||
}
|
||||
|
||||
void sendResponseHeader({List<Header> headers = const []}) {
|
||||
toClient.add(HeadersStreamMessage(headers));
|
||||
static final _defaultHeaders = [
|
||||
Header.ascii(':status', '200'),
|
||||
Header.ascii('content-type', 'application/grpc'),
|
||||
];
|
||||
|
||||
static final _defaultTrailers = [
|
||||
Header.ascii('grpc-status', '0'),
|
||||
];
|
||||
|
||||
void sendResponseHeader() {
|
||||
assert(!headersWereSent);
|
||||
headersWereSent = true;
|
||||
toClient.add(HeadersStreamMessage(_defaultHeaders));
|
||||
}
|
||||
|
||||
void sendResponseValue(int value) {
|
||||
toClient.add(DataStreamMessage(frame(mockEncode(value))));
|
||||
}
|
||||
|
||||
void sendResponseTrailer(
|
||||
{List<Header> headers = const [], bool closeStream = true}) {
|
||||
toClient.add(HeadersStreamMessage(headers, endStream: true));
|
||||
void sendResponseTrailer({bool closeStream = true}) {
|
||||
toClient.add(HeadersStreamMessage([
|
||||
if (!headersWereSent) ..._defaultHeaders,
|
||||
..._defaultTrailers,
|
||||
], endStream: true));
|
||||
if (closeStream) toClient.close();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ void main() {
|
|||
|
||||
setUp(() {
|
||||
input = StreamController();
|
||||
output = input.stream.transform(GrpcHttpDecoder());
|
||||
output = input.stream.transform(GrpcHttpDecoder(forResponse: false));
|
||||
});
|
||||
|
||||
test('converts chunked data correctly', () async {
|
||||
|
|
|
|||
Loading…
Reference in New Issue