diff --git a/example/metadata/pubspec.yaml b/example/metadata/pubspec.yaml index 9ea516e..1c6e7c8 100644 --- a/example/metadata/pubspec.yaml +++ b/example/metadata/pubspec.yaml @@ -11,7 +11,6 @@ dependencies: grpc: path: ../../ protobuf: ^0.5.4 - http2: ^0.1.2 dev_dependencies: test: ^0.12.0 diff --git a/example/route_guide/pubspec.yaml b/example/route_guide/pubspec.yaml index a8d25c4..5d4a5db 100644 --- a/example/route_guide/pubspec.yaml +++ b/example/route_guide/pubspec.yaml @@ -11,7 +11,6 @@ dependencies: grpc: path: ../../ protobuf: ^0.5.4 - http2: ^0.1.2 dev_dependencies: test: ^0.12.0 diff --git a/interop/pubspec.yaml b/interop/pubspec.yaml index 6b50d98..16baafa 100644 --- a/interop/pubspec.yaml +++ b/interop/pubspec.yaml @@ -12,7 +12,6 @@ dependencies: grpc: path: ../ protobuf: ^0.5.4 - http2: ^0.1.2 dev_dependencies: test: ^0.12.0 diff --git a/lib/src/client.dart b/lib/src/client.dart index 308f0d4..eeaf4f7 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -139,18 +139,17 @@ class ClientCall implements Response { } } - Future _initiateCall() async { - final connection = await _channel.connect(); - final timeout = options?.timeout ?? _channel.options?.timeout; + static List
createCallHeaders(String path, String authority, + {String timeout, Map metadata}) { // TODO(jakobr): Populate HTTP-specific headers in connection? final headers =
[ _methodPost, _schemeHttp, - new Header.ascii(':path', _method.path), - new Header.ascii(':authority', _channel.host), + new Header.ascii(':path', path), + new Header.ascii(':authority', authority), ]; if (timeout != null) { - headers.add(new Header.ascii('grpc-timeout', toTimeoutString(timeout))); + headers.add(new Header.ascii('grpc-timeout', timeout)); } headers.addAll([ _contentTypeGrpc, @@ -158,14 +157,23 @@ class ClientCall implements Response { _grpcAcceptEncoding, _userAgent, ]); + metadata?.forEach((key, value) { + headers.add(new Header.ascii(key, value)); + }); + return headers; + } + + Future _initiateCall() async { + final connection = await _channel.connect(); + final timeout = options?.timeout ?? _channel.options?.timeout; + final timeoutString = timeout != null ? toTimeoutString(timeout) : null; // TODO(jakobr): Flip this around, and have the Channel create the call // object and apply options (including the above TODO). final customMetadata = {}; customMetadata.addAll(_channel.options?.metadata ?? {}); customMetadata.addAll(options?.metadata ?? {}); - customMetadata.forEach((key, value) { - headers.add(new Header.ascii(key, value)); - }); + final headers = createCallHeaders(_method.path, _channel.host, + timeout: timeoutString, metadata: customMetadata); _stream = connection.makeRequest(headers); _requests.stream .map(_method.requestSerializer) diff --git a/lib/src/server.dart b/lib/src/server.dart index 24cbf71..ade4f16 100644 --- a/lib/src/server.dart +++ b/lib/src/server.dart @@ -76,9 +76,7 @@ class Server { _server.listen((socket) { final connection = new ServerTransportConnection.viaSocket(socket); _connections.add(connection); - connection.incomingStreams.listen((stream) { - new ServerHandler(lookupService, stream).handle(); - }, onError: (error) { + connection.incomingStreams.listen(serveStream, onError: (error) { print('Connection error: $error'); }, onDone: () { _connections.remove(connection); @@ -88,6 +86,10 @@ class Server { }); } + void serveStream(ServerTransportStream stream) { + new ServerHandler(lookupService, stream).handle(); + } + Future shutdown() { final done = _connections.map((connection) => connection.finish()).toList(); if (_server != null) { @@ -198,6 +200,7 @@ class ServerHandler { void _onDataIdle(GrpcMessage message) { if (message is! GrpcMetadata) { _sendError(new GrpcError.unimplemented('Expected header frame')); + _sinkIncoming(); return; } final headerMessage = message @@ -222,6 +225,28 @@ class ServerHandler { _startStreamingRequest(); } + Future _toSingleFuture(Stream stream) { + T _ensureOnlyOneRequest(T previous, T element) { + if (previous != null) { + throw new GrpcError.unimplemented('More than one request received'); + } + return element; + } + + T _ensureOneRequest(T value) { + if (value == null) + throw new GrpcError.unimplemented('No requests received'); + return value; + } + + final future = + stream.fold(null, _ensureOnlyOneRequest).then(_ensureOneRequest); + // Make sure errors on the future aren't unhandled, but return the original + // future so the request handler can also get the error. + future.catchError((_) {}); + return future; + } + void _startStreamingRequest() { _incomingSubscription.pause(); _requests = new StreamController( @@ -236,17 +261,20 @@ class ServerHandler { if (_descriptor.streamingRequest) { _responses = _descriptor.handler(context, _requests.stream); } else { - _responses = _descriptor.handler(context, _requests.stream.single); + _responses = + _descriptor.handler(context, _toSingleFuture(_requests.stream)); } } else { Future response; if (_descriptor.streamingRequest) { response = _descriptor.handler(context, _requests.stream); } else { - response = _descriptor.handler(context, _requests.stream.single); + response = + _descriptor.handler(context, _toSingleFuture(_requests.stream)); } _responses = response.asStream(); } + _responseSubscription = _responses.listen(_onResponse, onError: _onResponseError, onDone: _onResponseDone, @@ -259,9 +287,10 @@ class ServerHandler { void _onDataActive(GrpcMessage message) { if (message is! GrpcData) { - _sendError(new GrpcError.unimplemented('Expected data frame')); + final error = new GrpcError.unimplemented('Expected request'); + _sendError(error); _requests - ..addError(new GrpcError.unimplemented('No request received')) + ..addError(error) ..close(); return; } @@ -272,6 +301,7 @@ class ServerHandler { _requests ..addError(error) ..close(); + return; } // TODO(jakobr): Cast should not be necessary here. @@ -296,10 +326,10 @@ class ServerHandler { void _onResponse(response) { try { + final bytes = _descriptor.responseSerializer(response); if (!_headersSent) { _sendHeaders(); } - final bytes = _descriptor.responseSerializer(response); _stream.sendData(GrpcHttpEncoder.frame(bytes)); } catch (error) { final grpcError = @@ -312,7 +342,6 @@ class ServerHandler { } _sendError(grpcError); _cancelResponseSubscription(); - _sinkIncoming(); } } @@ -330,6 +359,7 @@ class ServerHandler { void _sendHeaders() { if (_headersSent) throw new GrpcError.internal('Headers already sent'); + final headersMap = {}; headersMap.addAll(_customHeaders); _customHeaders = null; @@ -369,6 +399,7 @@ class ServerHandler { _stream.sendHeaders(trailers, endStream: true); // We're done! _cancelResponseSubscription(); + _sinkIncoming(); } // -- All states, incoming error / stream closed -- @@ -381,6 +412,8 @@ class ServerHandler { _requests.addError(new GrpcError.cancelled('Cancelled')); } _cancelResponseSubscription(); + _incomingSubscription.cancel(); + _stream.terminate(); } void _onDoneError() { @@ -390,7 +423,7 @@ class ServerHandler { void _onDoneExpected() { if (!(_hasReceivedRequest || _descriptor.streamingRequest)) { - final error = new GrpcError.unimplemented('Expected request message'); + final error = new GrpcError.unimplemented('No request received'); _sendError(error); _requests.addError(error); } diff --git a/pubspec.yaml b/pubspec.yaml index bcf1766..5f84b4e 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -9,7 +9,7 @@ environment: dependencies: async: ^1.13.3 meta: ^1.0.5 - http2: ^0.1.2 + http2: ^0.1.3 dev_dependencies: mockito: ^2.0.2 diff --git a/test/client_test.dart b/test/client_test.dart index dc0cec9..9ea682d 100644 --- a/test/client_test.dart +++ b/test/client_test.dart @@ -5,12 +5,12 @@ import 'dart:async'; import 'package:grpc/src/status.dart'; -import 'package:grpc/src/streams.dart'; import 'package:http2/transport.dart'; import 'package:test/test.dart'; import 'package:mockito/mockito.dart'; import 'src/client_utils.dart'; +import 'src/utils.dart'; void main() { const dummyValue = 0; @@ -25,14 +25,12 @@ void main() { harness.tearDown(); }); - test('Unary calls work end-to-end', () async { + test('Unary calls work on the client', () async { const requestValue = 17; const responseValue = 19; - void _handleRequest(StreamMessage message) { - expect(message, new isInstanceOf()); - expect(message.endStream, false); - final data = new GrpcHttpDecoder().convert(message) as GrpcData; + void handleRequest(StreamMessage message) { + final data = validateDataMessage(message); expect(mockDecode(data.data), requestValue); harness @@ -45,20 +43,18 @@ void main() { clientCall: harness.client.unary(requestValue), expectedResult: responseValue, expectedPath: '/Test/Unary', - serverHandlers: [_handleRequest], + serverHandlers: [handleRequest], ); }); - test('Client-streaming calls work end-to-end', () async { + test('Client-streaming calls work on the client', () async { const requests = const [17, 3]; const response = 12; var index = 0; void handleRequest(StreamMessage message) { - expect(message, new isInstanceOf()); - expect(message.endStream, false); - final data = new GrpcHttpDecoder().convert(message) as GrpcData; + final data = validateDataMessage(message); expect(mockDecode(data.data), requests[index++]); } @@ -79,14 +75,12 @@ void main() { ); }); - test('Server-streaming calls work end-to-end', () async { + test('Server-streaming calls work on the client', () async { const request = 4; const responses = const [3, 17, 9]; void handleRequest(StreamMessage message) { - expect(message, new isInstanceOf()); - expect(message.endStream, false); - final data = new GrpcHttpDecoder().convert(message) as GrpcData; + final data = validateDataMessage(message); expect(mockDecode(data.data), request); harness.sendResponseHeader(); @@ -102,16 +96,14 @@ void main() { ); }); - test('Bidirectional calls work end-to-end', () async { + test('Bidirectional calls work on the client', () async { const requests = const [1, 15, 7]; const responses = const [3, 17, 9]; var index = 0; void handleRequest(StreamMessage message) { - expect(message, new isInstanceOf()); - expect(message.endStream, false); - final data = new GrpcHttpDecoder().convert(message) as GrpcData; + final data = validateDataMessage(message); expect(mockDecode(data.data), requests[index]); if (index == 0) { diff --git a/test/server_test.dart b/test/server_test.dart new file mode 100644 index 0000000..81cff54 --- /dev/null +++ b/test/server_test.dart @@ -0,0 +1,270 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:grpc/grpc.dart'; +import 'package:grpc/src/status.dart'; +import 'package:http2/transport.dart'; +import 'package:test/test.dart'; + +import 'src/server_utils.dart'; + +void main() { + const dummyValue = 17; + + ServerHarness harness; + + setUp(() { + harness = new ServerHarness()..setUp(); + }); + + tearDown(() { + harness.tearDown(); + }); + + test('Unary calls work on the server', () async { + const expectedRequest = 5; + const expectedResponse = 7; + Future methodHandler(ServiceCall call, Future request) async { + expect(await request, expectedRequest); + return expectedResponse; + } + + harness + ..service.unaryHandler = methodHandler + ..runTest('/Test/Unary', [expectedRequest], [expectedResponse]); + await harness.fromServer.done; + }); + + test('Client-streaming calls work on the server', () async { + const expectedRequests = const [5, 3, 17]; + const expectedResponse = 12; + Future methodHandler(ServiceCall call, Stream request) async { + expect(await request.toList(), expectedRequests); + return expectedResponse; + } + + harness + ..service.clientStreamingHandler = methodHandler + ..runTest('/Test/ClientStreaming', expectedRequests, [expectedResponse]); + await harness.fromServer.done; + }); + + test('Server-streaming calls work on the server', () async { + const expectedRequest = 5; + const expectedResponses = const [7, 9, 1]; + + Stream methodHandler(ServiceCall call, Future request) async* { + expect(await request, expectedRequest); + for (final value in expectedResponses) { + yield value; + } + } + + harness + ..service.serverStreamingHandler = methodHandler + ..runTest('/Test/ServerStreaming', [expectedRequest], expectedResponses); + await harness.fromServer.done; + }); + + test('Bidirectional calls work on the server', () async { + const expectedRequests = const [3, 1, 7]; + final expectedResponses = expectedRequests.map((v) => v + 5).toList(); + + Stream methodHandler(ServiceCall call, Stream request) async* { + yield* request.map((value) => value + 5); + } + + harness + ..service.bidirectionalHandler = methodHandler + ..runTest('/Test/Bidirectional', expectedRequests, expectedResponses); + await harness.fromServer.done; + }); + + test('Server returns error on missing call header', () async { + harness + ..expectErrorResponse(StatusCode.unimplemented, 'Expected header frame') + ..sendData(dummyValue); + await harness.fromServer.done; + }); + + test('Server returns error on invalid path', () async { + harness + ..expectErrorResponse(StatusCode.unimplemented, 'Invalid path') + ..sendRequestHeader('InvalidPath'); + await harness.fromServer.done; + }); + + test('Server returns error on unimplemented path', () async { + harness + ..expectErrorResponse( + StatusCode.unimplemented, 'Path /Test/NotFound not found') + ..sendRequestHeader('/Test/NotFound'); + await harness.fromServer.done; + }); + + /// Returns a service method handler that verifies that awaiting the request + /// throws a specific error. + Future Function(ServiceCall call, Future request) expectError( + expectedError) { + return expectAsync2((ServiceCall call, Future request) async { + try { + final result = await request; + registerException('Did not throw'); + return result; + } catch (caughtError) { + try { + expect(caughtError, expectedError); + } catch (failure, stack) { + registerException(failure, stack); + } + rethrow; + } + }, count: 1); + } + + /// Returns a service method handler that verifies that awaiting the request + /// stream throws a specific error. + Stream Function(ServiceCall call, Stream request) + expectErrorStreaming(expectedError) { + return (ServiceCall call, Stream request) async* { + try { + await for (var entry in request) { + yield entry; + } + registerException('Did not throw'); + } catch (caughtError) { + try { + expect(caughtError, expectedError); + } catch (failure, stack) { + registerException(failure, stack); + } + rethrow; + } + }; + } + + test('Server returns error on missing request for unary call', () async { + harness + ..service.unaryHandler = + expectError(new GrpcError.unimplemented('No request received')) + ..expectErrorResponse(StatusCode.unimplemented, 'No request received') + ..sendRequestHeader('/Test/Unary') + ..toServer.close(); + await harness.fromServer.done; + }); + + test('Server returns error if multiple headers are received for unary call', + () async { + harness + ..service.unaryHandler = + expectError(new GrpcError.unimplemented('Expected request')) + ..expectErrorResponse(StatusCode.unimplemented, 'Expected request') + ..sendRequestHeader('/Test/Unary') + ..toServer.add(new HeadersStreamMessage([])) + ..toServer.close(); + await harness.fromServer.done; + }); + + test('Server returns error on too many requests for unary call', () async { + harness + ..service.unaryHandler = + expectError(new GrpcError.unimplemented('Too many requests')) + ..expectErrorResponse(StatusCode.unimplemented, 'Too many requests') + ..sendRequestHeader('/Test/Unary') + ..sendData(dummyValue) + ..sendData(dummyValue) + ..toServer.close(); + await harness.fromServer.done; + }); + + test('Server returns request deserialization errors', () async { + harness + ..service.bidirectionalHandler = expectErrorStreaming( + new GrpcError.internal('Error deserializing request: Failed')) + ..expectErrorResponse( + StatusCode.internal, 'Error deserializing request: Failed') + ..sendRequestHeader('/Test/RequestError') + ..sendData(dummyValue) + ..toServer.close(); + await harness.fromServer.done; + }); + + test('Server returns response serialization errors', () async { + harness + ..service.bidirectionalHandler = expectErrorStreaming( + new GrpcError.internal('Error sending response: Failed')) + ..expectErrorResponse( + StatusCode.internal, 'Error sending response: Failed') + ..sendRequestHeader('/Test/ResponseError') + ..sendData(dummyValue) + ..sendData(dummyValue) + ..toServer.close(); + await harness.fromServer.done; + }); + + test('Header can only be sent once', () async { + Future methodHandler(ServiceCall call, Future request) async { + call.sendHeaders(); + call.sendHeaders(); + return await request; + } + + harness + ..service.unaryHandler = methodHandler + ..expectTrailingErrorResponse(StatusCode.internal, 'Headers already sent') + ..sendRequestHeader('/Test/Unary'); + await harness.fromServer.done; + }); + + test('Server receives cancel', () async { + final success = new Completer(); + + Future methodHandler(ServiceCall call, Future request) async { + try { + final result = await request; + registerException('Did not throw'); + return result; + } catch (caughtError) { + try { + expect(caughtError, new GrpcError.cancelled('Cancelled')); + expect(call.isCanceled, isTrue); + success.complete(true); + } catch (failure, stack) { + registerException(failure, stack); + } + } finally { + if (!success.isCompleted) { + success.complete(false); + } + } + return dummyValue; + } + + harness + ..service.unaryHandler = methodHandler + ..fromServer.stream.listen(expectAsync1((_) {}, count: 0), + onError: expectAsync1((error) { + expect(error, 'TERMINATED'); + }, count: 1), + onDone: expectAsync0(() {}, count: 1)) + ..sendRequestHeader('/Test/Unary') + ..toServer.addError('CANCEL'); + + expect(await success.future, isTrue); + await harness.toServer.close(); + await harness.fromServer.done; + }); + + test( + 'Server returns error if request stream is closed before sending anything', + () async { + harness + ..expectErrorResponse( + StatusCode.unavailable, 'Request stream closed unexpectedly') + ..toServer.close(); + await harness.fromServer.done; + }); +} diff --git a/test/src/client_utils.dart b/test/src/client_utils.dart index 5378cb9..cc23dca 100644 --- a/test/src/client_utils.dart +++ b/test/src/client_utils.dart @@ -3,7 +3,6 @@ // BSD-style license that can be found in the LICENSE file. import 'dart:async'; -import 'dart:convert'; import 'package:grpc/src/streams.dart'; import 'package:http2/transport.dart'; @@ -12,6 +11,8 @@ import 'package:mockito/mockito.dart'; import 'package:grpc/grpc.dart'; +import 'utils.dart'; + class MockConnection extends Mock implements ClientTransportConnection {} class MockStream extends Mock implements ClientTransportStream {} @@ -20,9 +21,6 @@ class MockChannel extends Mock implements ClientChannel {} typedef ServerMessageHandler = void Function(StreamMessage message); -List mockEncode(int value) => new List.filled(value, 0); -int mockDecode(List value) => value.length; - class TestClient { final ClientChannel _channel; @@ -73,8 +71,8 @@ class ClientHarness { MockChannel channel; MockStream stream; - StreamController fromClient; - StreamController toClient; + StreamController fromClient; + StreamController toClient; TestClient client; @@ -112,33 +110,12 @@ class ClientHarness { if (closeStream) toClient.close(); } - void validateHeaders(List
headers, - {String path, Map customHeaders}) { - final headerMap = new Map.fromIterable(headers, - key: (h) => ASCII.decode(h.name), value: (h) => ASCII.decode(h.value)); - expect(headerMap[':method'], 'POST'); - expect(headerMap[':scheme'], 'http'); - if (path != null) { - expect(headerMap[':path'], path); - } - expect(headerMap[':authority'], 'test'); - expect(headerMap['grpc-timeout'], isNull); - expect(headerMap['content-type'], 'application/grpc'); - expect(headerMap['te'], 'trailers'); - expect(headerMap['grpc-accept-encoding'], 'identity'); - expect(headerMap['user-agent'], startsWith('dart-grpc/')); - - customHeaders?.forEach((key, value) { - expect(headerMap[key], value); - }); - } - Future runTest( {Future clientCall, dynamic expectedResult, String expectedPath, Map expectedCustomHeaders, - List serverHandlers = const [], + List serverHandlers = const [], Function doneHandler, bool expectDone = true}) async { int serverHandlerIndex = 0; @@ -160,7 +137,7 @@ class ClientHarness { final List
capturedHeaders = verify(connection.makeRequest(captureAny)).captured.single; - validateHeaders(capturedHeaders, + validateRequestHeaders(capturedHeaders, path: expectedPath, customHeaders: expectedCustomHeaders); await clientSubscription.cancel(); @@ -180,7 +157,7 @@ class ClientHarness { dynamic expectedException, String expectedPath, Map expectedCustomHeaders, - List serverHandlers = const [], + List serverHandlers = const [], bool expectDone = true}) async { return runTest( clientCall: expectThrows(clientCall, expectedException), diff --git a/test/src/server_utils.dart b/test/src/server_utils.dart new file mode 100644 index 0000000..918c714 --- /dev/null +++ b/test/src/server_utils.dart @@ -0,0 +1,183 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:grpc/src/streams.dart'; +import 'package:http2/transport.dart'; +import 'package:test/test.dart'; + +import 'package:grpc/grpc.dart'; + +import 'utils.dart'; + +class TestService extends Service { + @override + String get $name => 'Test'; + + Future Function(ServiceCall call, Future request) unaryHandler; + Future Function(ServiceCall call, Stream request) + clientStreamingHandler; + Stream Function(ServiceCall call, Future request) + serverStreamingHandler; + Stream Function(ServiceCall call, Stream request) + bidirectionalHandler; + + TestService() { + $addMethod(ServerHarness.createMethod('Unary', _unary, false, false)); + $addMethod(ServerHarness.createMethod( + 'ClientStreaming', _clientStreaming, true, false)); + $addMethod(ServerHarness.createMethod( + 'ServerStreaming', _serverStreaming, false, true)); + $addMethod(ServerHarness.createMethod( + 'Bidirectional', _bidirectional, true, true)); + $addMethod(new ServiceMethod('RequestError', _bidirectional, true, true, + (List value) => throw 'Failed', mockEncode)); + $addMethod(new ServiceMethod('ResponseError', _bidirectional, true, true, + mockDecode, (int value) => throw 'Failed')); + } + + Future _unary(ServiceCall call, Future request) async { + if (unaryHandler == null) { + fail('Should not invoke Unary'); + } + return unaryHandler(call, request); + } + + Future _clientStreaming(ServiceCall call, Stream request) async { + if (clientStreamingHandler == null) { + fail('Should not invoke ClientStreaming'); + } + return clientStreamingHandler(call, request); + } + + Stream _serverStreaming(ServiceCall call, Future request) async* { + if (serverStreamingHandler == null) { + fail('Should not invoke ServerStreaming'); + } + yield* serverStreamingHandler(call, request); + } + + Stream _bidirectional(ServiceCall call, Stream request) async* { + if (bidirectionalHandler == null) { + fail('Should not invoke Bidirectional'); + } + yield* bidirectionalHandler(call, request); + } +} + +class TestServerStream extends ServerTransportStream { + final Stream incomingMessages; + final StreamSink outgoingMessages; + + TestServerStream(this.incomingMessages, this.outgoingMessages); + + @override + int get id => -1; + + @override + void terminate() { + outgoingMessages.addError('TERMINATED'); + outgoingMessages.close(); + } + + @override + set onTerminated(void value(int)) {} + + @override + bool get canPush => true; + + @override + ServerTransportStream push(List
requestHeaders) => null; +} + +class ServerHarness { + final toServer = new StreamController(); + final fromServer = new StreamController(); + final service = new TestService(); + final server = new Server(); + + static ServiceMethod createMethod(String name, + Function methodHandler, bool clientStreaming, bool serverStreaming) { + return new ServiceMethod(name, methodHandler, clientStreaming, + serverStreaming, mockDecode, mockEncode); + } + + void setUp() { + server.addService(service); + final stream = new TestServerStream(toServer.stream, fromServer.sink); + server.serveStream(stream); + } + + void tearDown() { + fromServer.close(); + toServer.close(); + } + + void setupTest(List handlers) { + int handlerIndex = 0; + void handleMessages(StreamMessage message) { + handlers[handlerIndex++](message); + } + + fromServer.stream.listen( + expectAsync1(handleMessages, count: handlers.length), + onError: expectAsync1((_) {}, count: 0), + onDone: expectAsync0(() {}, count: 1)); + } + + void expectErrorResponse(int status, String message) { + setupTest([errorTrailerValidator(status, message, validateHeader: true)]); + } + + void expectTrailingErrorResponse(int status, String message) { + setupTest([ + headerValidator(), + errorTrailerValidator(status, message, validateHeader: false) + ]); + } + + void sendRequestHeader(String path, + {String authority = 'test', + String timeout, + Map metadata}) { + final headers = ClientCall.createCallHeaders(path, authority, + timeout: timeout, metadata: metadata); + toServer.add(new HeadersStreamMessage(headers)); + } + + void sendData(int value) { + toServer + .add(new DataStreamMessage(GrpcHttpEncoder.frame(mockEncode(value)))); + } + + void runTest(String path, List requests, List expectedResponses) { + void handleHeader(StreamMessage message) { + final header = validateMetadataMessage(message); + validateResponseHeaders(header.metadata); + } + + int responseIndex = 0; + void handleResponse(StreamMessage message) { + final response = validateDataMessage(message); + expect(mockDecode(response.data), expectedResponses[responseIndex++]); + } + + void handleTrailer(StreamMessage message) { + final trailer = validateMetadataMessage(message, endStream: true); + validateResponseTrailers(trailer.metadata); + } + + final handlers = [handleHeader]; + for (var i = 0; i < expectedResponses.length; i++) { + handlers.add(handleResponse); + } + handlers.add(handleTrailer); + + setupTest(handlers); + sendRequestHeader(path); + requests.forEach(sendData); + toServer.close(); + } +} diff --git a/test/src/utils.dart b/test/src/utils.dart new file mode 100644 index 0000000..54a8f08 --- /dev/null +++ b/test/src/utils.dart @@ -0,0 +1,107 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:convert'; + +import 'package:grpc/src/streams.dart'; +import 'package:http2/transport.dart'; +import 'package:test/test.dart'; + +typedef MessageHandler = void Function(StreamMessage message); + +List mockEncode(int value) => new List.filled(value, 0); + +int mockDecode(List value) => value.length; + +Map headersToMap(List
headers) => + new Map.fromIterable(headers, + key: (h) => ASCII.decode(h.name), value: (h) => ASCII.decode(h.value)); + +void validateRequestHeaders(List
headers, + {String path, + String authority = 'test', + String timeout, + Map customHeaders}) { + final headerMap = headersToMap(headers); + expect(headerMap[':method'], 'POST'); + expect(headerMap[':scheme'], 'http'); + if (path != null) { + expect(headerMap[':path'], path); + } + expect(headerMap[':authority'], authority); + expect(headerMap['grpc-timeout'], timeout); + expect(headerMap['content-type'], 'application/grpc'); + expect(headerMap['te'], 'trailers'); + expect(headerMap['grpc-accept-encoding'], 'identity'); + expect(headerMap['user-agent'], startsWith('dart-grpc/')); + + customHeaders?.forEach((key, value) { + expect(headerMap[key], value); + }); +} + +void validateResponseHeaders(Map headers, + {int status = 200, + bool allowTrailers = false, + Map customHeaders}) { + expect(headers[':status'], '200'); + expect(headers['content-type'], startsWith('application/grpc')); + if (!allowTrailers) { + expect(headers.containsKey('grpc-status'), isFalse); + expect(headers.containsKey('grpc-message'), isFalse); + } + customHeaders?.forEach((key, value) { + expect(headers[key], value); + }); +} + +void validateResponseTrailers(Map trailers, + {int status = 0, String message, Map customTrailers}) { + expect(trailers['grpc-status'], '$status'); + if (message != null) { + expect(trailers['grpc-message'], message); + } + customTrailers?.forEach((key, value) { + expect(trailers[key], value); + }); +} + +GrpcMetadata validateMetadataMessage(StreamMessage message, + {bool endStream = false}) { + expect(message, new isInstanceOf()); + expect(message.endStream, endStream); + + final decoded = new GrpcHttpDecoder().convert(message); + expect(decoded, new isInstanceOf()); + return decoded; +} + +GrpcData validateDataMessage(StreamMessage message, {bool endStream = false}) { + expect(message, new isInstanceOf()); + expect(message.endStream, endStream); + + final decoded = new GrpcHttpDecoder().convert(message); + expect(decoded, new isInstanceOf()); + return decoded; +} + +void Function(StreamMessage message) headerValidator() { + return (StreamMessage message) { + final header = validateMetadataMessage(message, endStream: false); + validateResponseHeaders(header.metadata, allowTrailers: true); + }; +} + +void Function(StreamMessage message) errorTrailerValidator( + int status, String statusMessage, + {bool validateHeader = false}) { + return (StreamMessage message) { + final trailer = validateMetadataMessage(message, endStream: true); + if (validateHeader) { + validateResponseHeaders(trailer.metadata, allowTrailers: true); + } + validateResponseTrailers(trailer.metadata, + status: status, message: statusMessage); + }; +}