From ac317e6e4d42c74d349f66cbf597407c011806a3 Mon Sep 17 00:00:00 2001 From: Jakob Andersen Date: Mon, 17 Jul 2017 15:11:45 +0200 Subject: [PATCH] Add basic client tests. (#26) --- lib/src/client.dart | 2 +- lib/src/shared.dart | 19 ++- lib/src/status.dart | 9 ++ lib/src/streams.dart | 15 -- pubspec.yaml | 1 + test/client_test.dart | 322 +++++++++++++++++++++++++++++++++++++ test/src/client_utils.dart | 193 ++++++++++++++++++++++ test/stream_test.dart | 27 ---- 8 files changed, 544 insertions(+), 44 deletions(-) create mode 100644 test/client_test.dart create mode 100644 test/src/client_utils.dart diff --git a/lib/src/client.dart b/lib/src/client.dart index 4a1bed6..308f0d4 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -101,7 +101,7 @@ class ClientCall implements Response { Map _headerMetadata; TransportStream _stream; - final StreamController _requests = new StreamController(); + final _requests = new StreamController(); StreamController _responses; StreamSubscription _responseSubscription; diff --git a/lib/src/shared.dart b/lib/src/shared.dart index e104008..fabc5c3 100644 --- a/lib/src/shared.dart +++ b/lib/src/shared.dart @@ -6,6 +6,7 @@ import 'dart:async'; import 'package:async/async.dart'; import 'package:grpc/src/client.dart'; +import 'package:grpc/src/status.dart'; /// A gRPC response. abstract class Response { @@ -33,7 +34,23 @@ class ResponseFuture extends DelegatingFuture with _ResponseMixin { final ClientCall _call; - ResponseFuture(this._call) : super(_call.response.single); + static R _ensureOnlyOneResponse(R previous, R element) { + if (previous != null) { + throw new GrpcError.unimplemented('More than one response received'); + } + return element; + } + + static R _ensureOneResponse(R value) { + if (value == null) + throw new GrpcError.unimplemented('No responses received'); + return value; + } + + ResponseFuture(this._call) + : super(_call.response + .fold(null, _ensureOnlyOneResponse) + .then(_ensureOneResponse)); } /// A gRPC response producing a stream of values. diff --git a/lib/src/status.dart b/lib/src/status.dart index 4d28431..5acae34 100644 --- a/lib/src/status.dart +++ b/lib/src/status.dart @@ -219,6 +219,15 @@ class GrpcError { /// operation. GrpcError.unauthenticated([this.message]) : code = StatusCode.unauthenticated; + @override + bool operator ==(other) { + if (other is! GrpcError) return false; + return code == other.code && message == other.message; + } + + @override + int get hashCode => code.hashCode ^ (message?.hashCode ?? 17); + @override String toString() => 'gRPC Error ($code, $message)'; } diff --git a/lib/src/streams.dart b/lib/src/streams.dart index 3d48e83..526b9cc 100644 --- a/lib/src/streams.dart +++ b/lib/src/streams.dart @@ -92,9 +92,6 @@ class _GrpcMessageConversionSink extends ChunkedConversionSink { Uint8List _data; int _dataOffset = 0; - bool _headerReceived = false; - bool _trailerReceived = false; - _GrpcMessageConversionSink(this._out); void _addData(DataStreamMessage chunk) { @@ -152,23 +149,11 @@ class _GrpcMessageConversionSink extends ChunkedConversionSink { headers[ASCII.decode(header.name)] = ASCII.decode(header.value); } _out.add(new GrpcMetadata(headers)); - if (_headerReceived) { - _trailerReceived = true; - } else { - _headerReceived = true; - } } @override void add(StreamMessage chunk) { - if (_trailerReceived) { - throw new GrpcError.unimplemented('Received data after trailer metadata'); - } if (chunk is DataStreamMessage) { - if (!_headerReceived) { - throw new GrpcError.unimplemented( - 'Received data before header metadata'); - } _addData(chunk); } else if (chunk is HeadersStreamMessage) { _addHeaders(chunk); diff --git a/pubspec.yaml b/pubspec.yaml index 2d1e665..bcf1766 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -12,4 +12,5 @@ dependencies: http2: ^0.1.2 dev_dependencies: + mockito: ^2.0.2 test: ^0.12.0 diff --git a/test/client_test.dart b/test/client_test.dart new file mode 100644 index 0000000..dc0cec9 --- /dev/null +++ b/test/client_test.dart @@ -0,0 +1,322 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:grpc/src/status.dart'; +import 'package:grpc/src/streams.dart'; +import 'package:http2/transport.dart'; +import 'package:test/test.dart'; +import 'package:mockito/mockito.dart'; + +import 'src/client_utils.dart'; + +void main() { + const dummyValue = 0; + + ClientHarness harness; + + setUp(() { + harness = new ClientHarness()..setUp(); + }); + + tearDown(() { + harness.tearDown(); + }); + + test('Unary calls work end-to-end', () async { + const requestValue = 17; + const responseValue = 19; + + void _handleRequest(StreamMessage message) { + expect(message, new isInstanceOf()); + expect(message.endStream, false); + final data = new GrpcHttpDecoder().convert(message) as GrpcData; + expect(mockDecode(data.data), requestValue); + + harness + ..sendResponseHeader() + ..sendResponseValue(responseValue) + ..sendResponseTrailer(); + } + + await harness.runTest( + clientCall: harness.client.unary(requestValue), + expectedResult: responseValue, + expectedPath: '/Test/Unary', + serverHandlers: [_handleRequest], + ); + }); + + test('Client-streaming calls work end-to-end', () async { + const requests = const [17, 3]; + const response = 12; + + var index = 0; + + void handleRequest(StreamMessage message) { + expect(message, new isInstanceOf()); + expect(message.endStream, false); + final data = new GrpcHttpDecoder().convert(message) as GrpcData; + expect(mockDecode(data.data), requests[index++]); + } + + void handleDone() { + harness + ..sendResponseHeader() + ..sendResponseValue(response) + ..sendResponseTrailer(); + } + + await harness.runTest( + clientCall: + harness.client.clientStreaming(new Stream.fromIterable(requests)), + expectedResult: response, + expectedPath: '/Test/ClientStreaming', + serverHandlers: [handleRequest, handleRequest], + doneHandler: handleDone, + ); + }); + + test('Server-streaming calls work end-to-end', () async { + const request = 4; + const responses = const [3, 17, 9]; + + void handleRequest(StreamMessage message) { + expect(message, new isInstanceOf()); + expect(message.endStream, false); + final data = new GrpcHttpDecoder().convert(message) as GrpcData; + expect(mockDecode(data.data), request); + + harness.sendResponseHeader(); + responses.forEach(harness.sendResponseValue); + harness.sendResponseTrailer(); + } + + await harness.runTest( + clientCall: harness.client.serverStreaming(request).toList(), + expectedResult: responses, + expectedPath: '/Test/ServerStreaming', + serverHandlers: [handleRequest], + ); + }); + + test('Bidirectional calls work end-to-end', () async { + const requests = const [1, 15, 7]; + const responses = const [3, 17, 9]; + + var index = 0; + + void handleRequest(StreamMessage message) { + expect(message, new isInstanceOf()); + expect(message.endStream, false); + final data = new GrpcHttpDecoder().convert(message) as GrpcData; + expect(mockDecode(data.data), requests[index]); + + if (index == 0) { + harness.sendResponseHeader(); + } + harness.sendResponseValue(responses[index]); + index++; + } + + void handleDone() { + harness.sendResponseTrailer(); + } + + await harness.runTest( + clientCall: harness.client + .bidirectional(new Stream.fromIterable(requests)) + .toList(), + expectedResult: responses, + expectedPath: '/Test/Bidirectional', + serverHandlers: [handleRequest, handleRequest, handleRequest], + doneHandler: handleDone, + ); + }); + + test('Unary call with no response throws error', () async { + void handleRequest(_) { + harness.sendResponseTrailer(); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: new GrpcError.unimplemented('No responses received'), + serverHandlers: [handleRequest], + ); + }); + + test('Unary call with more than one response throws error', () async { + void handleRequest(_) { + harness + ..sendResponseHeader() + ..sendResponseValue(dummyValue) + ..sendResponseValue(dummyValue) + ..sendResponseTrailer(); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: + new GrpcError.unimplemented('More than one response received'), + serverHandlers: [handleRequest], + ); + }); + + test('Call throws if nothing is received', () async { + void handleRequest(_) { + harness.toClient.close(); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: new GrpcError.unavailable('Did not receive anything'), + serverHandlers: [handleRequest], + ); + }); + + test('Call throws if trailers are missing', () async { + void handleRequest(_) { + harness + ..sendResponseHeader() + ..sendResponseValue(dummyValue) + ..toClient.close(); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: new GrpcError.unavailable('Missing trailers'), + serverHandlers: [handleRequest], + ); + }); + + test('Call throws if data is received before headers', () async { + void handleRequest(_) { + harness.sendResponseValue(dummyValue); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: + new GrpcError.unimplemented('Received data before headers'), + serverHandlers: [handleRequest], + ); + }); + + test('Call throws if data is received after trailers', () async { + void handleRequest(_) { + harness + ..sendResponseHeader() + ..sendResponseTrailer(closeStream: false) + ..sendResponseValue(dummyValue); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: + new GrpcError.unimplemented('Received data after trailers'), + serverHandlers: [handleRequest], + ); + }); + + test('Call throws if multiple trailers are received', () async { + void handleRequest(_) { + harness + ..sendResponseHeader() + ..sendResponseTrailer(closeStream: false) + ..sendResponseTrailer(); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: + new GrpcError.unimplemented('Received multiple trailers'), + serverHandlers: [handleRequest], + ); + }); + + test('Call throws if non-zero status is received', () async { + const customStatusCode = 17; + const customStatusMessage = 'Custom message'; + + void handleRequest(_) { + harness.toClient.add(new HeadersStreamMessage([ + new Header.ascii('grpc-status', '$customStatusCode'), + new Header.ascii('grpc-message', customStatusMessage) + ], endStream: true)); + harness.toClient.close(); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: + new GrpcError.custom(customStatusCode, customStatusMessage), + serverHandlers: [handleRequest], + ); + }); + + test('Call throws on response stream errors', () async { + void handleRequest(_) { + harness.toClient.addError('Test error'); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: new GrpcError.unknown('Test error'), + serverHandlers: [handleRequest], + ); + }); + + test('Call forwards known response stream errors', () async { + final expectedException = new GrpcError.dataLoss('Oops!'); + + void handleRequest(_) { + harness.toClient.addError(expectedException); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: expectedException, + serverHandlers: [handleRequest], + ); + }); + + test('Connection errors are reported', () async { + reset(harness.channel); + when(harness.channel.connect()).thenThrow('Connection error'); + final expectedError = + new GrpcError.unavailable('Error connecting: Connection error'); + harness.expectThrows(harness.client.unary(dummyValue), expectedError); + harness.expectThrows( + harness.client.serverStreaming(dummyValue).toList(), expectedError); + }); + + test('Known request errors are reported', () async { + final expectedException = new GrpcError.deadlineExceeded('Too late!'); + + Stream requests() async* { + throw expectedException; + } + + await harness.runFailureTest( + clientCall: harness.client.clientStreaming(requests()), + expectedException: expectedException, + expectDone: false, + ); + }); + + test('Custom request errors are reported', () async { + Stream requests() async* { + throw 'Error'; + } + + final expectedException = new GrpcError.unknown('Error'); + await harness.runFailureTest( + clientCall: harness.client.clientStreaming(requests()), + expectedException: expectedException, + expectDone: false, + ); + }); +} diff --git a/test/src/client_utils.dart b/test/src/client_utils.dart new file mode 100644 index 0000000..5378cb9 --- /dev/null +++ b/test/src/client_utils.dart @@ -0,0 +1,193 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:convert'; + +import 'package:grpc/src/streams.dart'; +import 'package:http2/transport.dart'; +import 'package:test/test.dart'; +import 'package:mockito/mockito.dart'; + +import 'package:grpc/grpc.dart'; + +class MockConnection extends Mock implements ClientTransportConnection {} + +class MockStream extends Mock implements ClientTransportStream {} + +class MockChannel extends Mock implements ClientChannel {} + +typedef ServerMessageHandler = void Function(StreamMessage message); + +List mockEncode(int value) => new List.filled(value, 0); +int mockDecode(List value) => value.length; + +class TestClient { + final ClientChannel _channel; + + static final _$unary = + new ClientMethod('/Test/Unary', mockEncode, mockDecode); + static final _$clientStreaming = new ClientMethod( + '/Test/ClientStreaming', mockEncode, mockDecode); + static final _$serverStreaming = new ClientMethod( + '/Test/ServerStreaming', mockEncode, mockDecode); + static final _$bidirectional = + new ClientMethod('/Test/Bidirectional', mockEncode, mockDecode); + + TestClient(this._channel); + + ResponseFuture unary(int request, {CallOptions options}) { + final call = new ClientCall(_channel, _$unary, options: options); + call.request + ..add(request) + ..close(); + return new ResponseFuture(call); + } + + ResponseFuture clientStreaming(Stream request, + {CallOptions options}) { + final call = new ClientCall(_channel, _$clientStreaming, options: options); + request.pipe(call.request); + return new ResponseFuture(call); + } + + ResponseStream serverStreaming(int request, {CallOptions options}) { + final call = new ClientCall(_channel, _$serverStreaming, options: options); + call.request + ..add(request) + ..close(); + return new ResponseStream(call); + } + + ResponseStream bidirectional(Stream request, + {CallOptions options}) { + final call = new ClientCall(_channel, _$bidirectional, options: options); + request.pipe(call.request); + return new ResponseStream(call); + } +} + +class ClientHarness { + MockConnection connection; + MockChannel channel; + MockStream stream; + + StreamController fromClient; + StreamController toClient; + + TestClient client; + + void setUp() { + connection = new MockConnection(); + channel = new MockChannel(); + stream = new MockStream(); + fromClient = new StreamController(); + toClient = new StreamController(); + when(channel.host).thenReturn('test'); + when(channel.connect()).thenReturn(connection); + when(connection.makeRequest(any)).thenReturn(stream); + when(stream.outgoingMessages).thenReturn(fromClient.sink); + when(stream.incomingMessages).thenReturn(toClient.stream); + client = new TestClient(channel); + } + + void tearDown() { + fromClient.close(); + toClient.close(); + } + + void sendResponseHeader({List
headers = const []}) { + toClient.add(new HeadersStreamMessage(headers)); + } + + void sendResponseValue(int value) { + toClient + .add(new DataStreamMessage(GrpcHttpEncoder.frame(mockEncode(value)))); + } + + void sendResponseTrailer( + {List
headers = const [], bool closeStream = true}) { + toClient.add(new HeadersStreamMessage(headers, endStream: true)); + if (closeStream) toClient.close(); + } + + void validateHeaders(List
headers, + {String path, Map customHeaders}) { + final headerMap = new Map.fromIterable(headers, + key: (h) => ASCII.decode(h.name), value: (h) => ASCII.decode(h.value)); + expect(headerMap[':method'], 'POST'); + expect(headerMap[':scheme'], 'http'); + if (path != null) { + expect(headerMap[':path'], path); + } + expect(headerMap[':authority'], 'test'); + expect(headerMap['grpc-timeout'], isNull); + expect(headerMap['content-type'], 'application/grpc'); + expect(headerMap['te'], 'trailers'); + expect(headerMap['grpc-accept-encoding'], 'identity'); + expect(headerMap['user-agent'], startsWith('dart-grpc/')); + + customHeaders?.forEach((key, value) { + expect(headerMap[key], value); + }); + } + + Future runTest( + {Future clientCall, + dynamic expectedResult, + String expectedPath, + Map expectedCustomHeaders, + List serverHandlers = const [], + Function doneHandler, + bool expectDone = true}) async { + int serverHandlerIndex = 0; + void handleServerMessage(StreamMessage message) { + serverHandlers[serverHandlerIndex++](message); + } + + final clientSubscription = fromClient.stream.listen( + expectAsync1(handleServerMessage, count: serverHandlers.length), + onError: expectAsync1((_) {}, count: 0), + onDone: expectAsync0(doneHandler ?? () {}, count: expectDone ? 1 : 0)); + + final result = await clientCall; + if (expectedResult != null) { + expect(result, expectedResult); + } + + verify(channel.connect()).called(1); + + final List
capturedHeaders = + verify(connection.makeRequest(captureAny)).captured.single; + validateHeaders(capturedHeaders, + path: expectedPath, customHeaders: expectedCustomHeaders); + + await clientSubscription.cancel(); + } + + Future expectThrows(Future future, dynamic exception) async { + try { + await future; + fail('Did not throw'); + } catch (e) { + expect(e, exception); + } + } + + Future runFailureTest( + {Future clientCall, + dynamic expectedException, + String expectedPath, + Map expectedCustomHeaders, + List serverHandlers = const [], + bool expectDone = true}) async { + return runTest( + clientCall: expectThrows(clientCall, expectedException), + expectedPath: expectedPath, + expectedCustomHeaders: expectedCustomHeaders, + serverHandlers: serverHandlers, + expectDone: expectDone, + ); + } +} diff --git a/test/stream_test.dart b/test/stream_test.dart index 991a9b3..63b8c0c 100644 --- a/test/stream_test.dart +++ b/test/stream_test.dart @@ -20,18 +20,6 @@ void main() { output = input.stream.transform(new GrpcHttpDecoder()); }); - test('throws error if data is received before headers', () async { - input.add(new DataStreamMessage([0])); - input.close(); - try { - await output.toList(); - fail('Did not throw exception'); - } on GrpcError catch (e) { - expect(e.code, StatusCode.unimplemented); - expect(e.message, 'Received data before header metadata'); - } - }); - test('converts chunked data correctly', () async { final result = output.toList(); input @@ -60,21 +48,6 @@ void main() { verify(converted[5], new List.filled(256, 90)); }); - test('throws error if data is received after trailers', () async { - final result = output.toList(); - input - ..add(new HeadersStreamMessage([])) - ..add(new HeadersStreamMessage([])) - ..add(new DataStreamMessage([])); - try { - await result; - fail('Did not throw'); - } on GrpcError catch (e) { - expect(e.code, StatusCode.unimplemented); - expect(e.message, 'Received data after trailer metadata'); - } - }); - test('throws error if input is closed while receiving data header', () async { final result = output.toList();