diff --git a/CHANGELOG.md b/CHANGELOG.md index 846485d..66e307f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 2.6.0 + +* Create gRPC servers and clients with [Server|Client]TransportConnnection. + This allows callers to propvide their own transport configuration, such + as their own implementation of streams and sinks instead of sockets. + ## 2.5.0 * Expose a `validateClient` method for server credentials so gRPC server diff --git a/lib/grpc.dart b/lib/grpc.dart index ffe8e99..8c678fc 100644 --- a/lib/grpc.dart +++ b/lib/grpc.dart @@ -27,9 +27,12 @@ export 'src/auth/auth_io.dart' export 'src/client/call.dart' show CallOptions, ClientCall, MetadataProvider; export 'src/client/client.dart' show Client; +export 'src/client/client_transport_connector.dart' + show ClientTransportConnector; export 'src/client/common.dart' show Response, ResponseStream, ResponseFuture; export 'src/client/connection.dart' show ConnectionState; -export 'src/client/http2_channel.dart' show ClientChannel; +export 'src/client/http2_channel.dart' + show ClientChannel, ClientTransportConnectorChannel; export 'src/client/method.dart' show ClientMethod; export 'src/client/options.dart' show @@ -48,6 +51,7 @@ export 'src/server/server.dart' ServerCredentials, ServerLocalCredentials, ServerTlsCredentials, + ConnectionServer, Server; export 'src/server/service.dart' show ServiceMethod, Service; diff --git a/lib/src/client/client_transport_connector.dart b/lib/src/client/client_transport_connector.dart new file mode 100644 index 0000000..08bfa03 --- /dev/null +++ b/lib/src/client/client_transport_connector.dart @@ -0,0 +1,18 @@ +import 'dart:async'; + +import 'package:http2/transport.dart'; + +/// A transport-specific configuration used by gRPC clients to connect. +abstract class ClientTransportConnector { + /// Creates a HTTP/2 client connection. + Future connect(); + + /// A future that completes when the client closes or when an error occurs. + Future get done; + + /// Shuts down the connection, which should at least close the client. + void shutdown(); + + /// Header populated from any credentials or hostname information. + String get authority; +} diff --git a/lib/src/client/http2_channel.dart b/lib/src/client/http2_channel.dart index d947f02..d6db16e 100644 --- a/lib/src/client/http2_channel.dart +++ b/lib/src/client/http2_channel.dart @@ -14,6 +14,7 @@ // limitations under the License. import 'channel.dart'; +import 'client_transport_connector.dart'; import 'connection.dart'; import 'http2_connection.dart' show Http2ClientConnection; import 'options.dart'; @@ -37,3 +38,17 @@ class ClientChannel extends ClientChannelBase { return Http2ClientConnection(host, port, options); } } + +class ClientTransportConnectorChannel extends ClientChannelBase { + final ClientTransportConnector transportConnector; + final ChannelOptions options; + + ClientTransportConnectorChannel(this.transportConnector, + {this.options = const ChannelOptions()}); + + @override + ClientConnection createConnection() { + return Http2ClientConnection.fromClientTransportConnector( + transportConnector, options); + } +} diff --git a/lib/src/client/http2_connection.dart b/lib/src/client/http2_connection.dart index 235419a..c82b9de 100644 --- a/lib/src/client/http2_connection.dart +++ b/lib/src/client/http2_connection.dart @@ -23,6 +23,7 @@ import 'package:meta/meta.dart'; import '../shared/timeout.dart'; import 'call.dart'; +import 'client_transport_connector.dart'; import 'connection.dart' hide ClientConnection; import 'connection.dart' as connection; @@ -49,6 +50,7 @@ class Http2ClientConnection implements connection.ClientConnection { void Function(Http2ClientConnection connection) onStateChanged; final _pendingCalls = []; + final ClientTransportConnector _transportConnector; ClientTransportConnection _transportConnection; /// Used for idle and reconnect timeout, depending on [_state]. @@ -59,15 +61,15 @@ class Http2ClientConnection implements connection.ClientConnection { Duration _currentReconnectDelay; - final String host; - final int port; + Http2ClientConnection(String host, int port, this.options) + : _transportConnector = _SocketTransportConnector(host, port, options); - Http2ClientConnection(this.host, this.port, this.options); + Http2ClientConnection.fromClientTransportConnector( + this._transportConnector, this.options); ChannelCredentials get credentials => options.credentials; - String get authority => - options.credentials.authority ?? (port == 443 ? host : "$host:$port"); + String get authority => _transportConnector.authority; String get scheme => options.credentials.isSecure ? 'https' : 'http'; @@ -75,36 +77,9 @@ class Http2ClientConnection implements connection.ClientConnection { static const _estimatedRoundTripTime = const Duration(milliseconds: 20); - Future _createSocket() async { - final securityContext = credentials.securityContext; - if (securityContext == null) { - return Socket.connect(host, port); - } else { - if (options.credentials.authority == null) { - return SecureSocket.connect(host, port, - context: securityContext, - onBadCertificate: _validateBadCertificate); - } else { - // Todo(sigurdm): We want to pass supportedProtocols: ['h2']. http://dartbug.com/37950 - return SecureSocket.secure(await Socket.connect(host, port), - // This is not really the host, but the authority to verify the TLC - // connection against. - // - // We don't use `this.authority` here, as that includes the port. - host: options.credentials.authority, - context: securityContext, - onBadCertificate: _validateBadCertificate); - } - } - } - Future connectTransport() async { - final Socket socket = await _createSocket(); - // Don't wait for io buffers to fill up before sending requests. - socket.setOption(SocketOption.tcpNoDelay, true); - - final connection = ClientTransportConnection.viaSocket(socket); - socket.done.then((_) => _abandonConnection()); + final connection = await _transportConnector.connect(); + _transportConnector.done.then((_) => _abandonConnection()); // Give the settings settings-frame a bit of time to arrive. // TODO(sigurdm): This is a hack. The http2 package should expose a way of @@ -112,7 +87,7 @@ class Http2ClientConnection implements connection.ClientConnection { await new Future.delayed(_estimatedRoundTripTime); if (_state == ConnectionState.shutdown) { - socket.destroy(); + _transportConnector.shutdown(); throw _ShutdownException(); } return connection; @@ -180,8 +155,8 @@ class Http2ClientConnection implements connection.ClientConnection { GrpcTransportStream makeRequest(String path, Duration timeout, Map metadata, ErrorHandler onRequestFailure, {CallOptions callOptions}) { - final headers = createCallHeaders( - credentials.isSecure, authority, path, timeout, metadata, + final headers = createCallHeaders(credentials.isSecure, + _transportConnector.authority, path, timeout, metadata, userAgent: options.userAgent); final stream = _transportConnection.makeRequest(headers); return Http2TransportStream(stream, onRequestFailure); @@ -320,9 +295,57 @@ class Http2ClientConnection implements connection.ClientConnection { }); return headers; } +} + +class _SocketTransportConnector implements ClientTransportConnector { + final String _host; + final int _port; + final ChannelOptions _options; + Socket _socket; + + _SocketTransportConnector(this._host, this._port, this._options); + + @override + Future connect() async { + final securityContext = _options.credentials.securityContext; + _socket = await Socket.connect(_host, _port); + // Don't wait for io buffers to fill up before sending requests. + _socket.setOption(SocketOption.tcpNoDelay, true); + if (securityContext != null) { + // Todo(sigurdm): We want to pass supportedProtocols: ['h2']. + // http://dartbug.com/37950 + _socket = await SecureSocket.secure(_socket, + // This is not really the host, but the authority to verify the TLC + // connection against. + // + // We don't use `this.authority` here, as that includes the port. + host: _options.credentials.authority ?? _host, + context: securityContext, + onBadCertificate: _validateBadCertificate); + } + + return ClientTransportConnection.viaSocket(_socket); + } + + @override + String get authority => + _options.credentials.authority ?? + (_port == 443 ? _host : "$_host:$_port"); + + @override + Future get done { + assert(_socket != null); + return _socket.done; + } + + @override + void shutdown() { + assert(_socket != null); + _socket.destroy(); + } bool _validateBadCertificate(X509Certificate certificate) { - final credentials = this.credentials; + final credentials = _options.credentials; final validator = credentials.onBadCertificate; if (validator == null) return false; diff --git a/lib/src/server/server.dart b/lib/src/server/server.dart index 48f151e..3f13a31 100644 --- a/lib/src/server/server.dart +++ b/lib/src/server/server.dart @@ -77,19 +77,18 @@ class ServerTlsCredentials extends ServerCredentials { bool validateClient(Socket socket) => true; } -/// A gRPC server. +/// A gRPC server that serves via provided [ServerTransportConnection]s. /// -/// Listens for incoming RPCs, dispatching them to the right [Service] handler. -class Server { +/// Unlike [Server], the caller has the responsibility of configuring and +/// managing the connection from a client. +class ConnectionServer { final Map _services = {}; final List _interceptors; - ServerSocket _insecureServer; - SecureServerSocket _secureServer; final _connections = []; /// Create a server for the given [services]. - Server(List services, + ConnectionServer(List services, [List interceptors = const []]) : _interceptors = interceptors { for (final service in services) { @@ -97,6 +96,45 @@ class Server { } } + Service lookupService(String service) => _services[service]; + + Future serveConnection(ServerTransportConnection connection) async { + _connections.add(connection); + ServerHandler_ handler; + // TODO(jakobr): Set active state handlers, close connection after idle + // timeout. + connection.incomingStreams.listen((stream) { + handler = serveStream_(stream); + }, onError: (error) { + print('Connection error: $error'); + }, onDone: () { + // TODO(sigurdm): This is not correct behavior in the presence of + // half-closed tcp streams. + // Half-closed streams seems to not be fully supported by package:http2. + // https://github.com/dart-lang/http2/issues/42 + handler?.cancel(); + _connections.remove(connection); + }); + } + + @visibleForTesting + ServerHandler_ serveStream_(ServerTransportStream stream) { + return ServerHandler_(lookupService, stream, _interceptors)..handle(); + } +} + +/// A gRPC server. +/// +/// Listens for incoming RPCs, dispatching them to the right [Service] handler. +class Server extends ConnectionServer { + ServerSocket _insecureServer; + SecureServerSocket _secureServer; + + /// Create a server for the given [services]. + Server(List services, + [List interceptors = const []]) + : super(services, interceptors); + /// The port that the server is listening on, or `null` if the server is not /// active. int get port { @@ -105,8 +143,6 @@ class Server { return null; } - Service lookupService(String service) => _services[service]; - Future serve( {dynamic address, int port, @@ -138,27 +174,7 @@ class Server { socket.setOption(SocketOption.tcpNoDelay, true); final connection = ServerTransportConnection.viaSocket(socket, settings: http2ServerSettings); - _connections.add(connection); - if (security != null && !security.validateClient(socket)) { - _printSocketError( - 'cannot serve $address:$port - unable to validate client socket'); - return socket.close(); - } - ServerHandler_ handler; - // TODO(jakobr): Set active state handlers, close connection after idle - // timeout. - connection.incomingStreams.listen((stream) { - handler = serveStream_(stream); - }, onError: (error) { - print('Connection error: $error'); - }, onDone: () { - // TODO(sigurdm): This is not correct behavior in the presence of - // half-closed tcp streams. - // Half-closed streams seems to not be fully supported by package:http2. - // https://github.com/dart-lang/http2/issues/42 - handler?.cancel(); - _connections.remove(connection); - }); + serveConnection(connection); }, onError: _printSocketError); } diff --git a/pubspec.yaml b/pubspec.yaml index 9f01a00..8e6a18c 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,7 +1,7 @@ name: grpc description: Dart implementation of gRPC, a high performance, open-source universal RPC framework. -version: 2.5.0 +version: 2.6.0 author: Dart Team homepage: https://github.com/dart-lang/grpc-dart diff --git a/test/client_tests/client_transport_connector_test.dart b/test/client_tests/client_transport_connector_test.dart new file mode 100644 index 0000000..c0dd437 --- /dev/null +++ b/test/client_tests/client_transport_connector_test.dart @@ -0,0 +1,420 @@ +// Copyright (c) 2020, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; + +import 'package:grpc/grpc.dart'; +import 'package:grpc/src/client/http2_connection.dart'; +import 'package:http2/transport.dart'; +import 'package:test/test.dart'; + +import '../src/client_utils.dart'; +import '../src/utils.dart'; + +void main() { + const dummyValue = 0; + + ClientTransportConnectorHarness harness; + + setUp(() { + harness = ClientTransportConnectorHarness()..setUp(); + }); + + tearDown(() { + harness.tearDown(); + }); + + test('Unary calls work on the client', () async { + const requestValue = 17; + const responseValue = 19; + + void handleRequest(StreamMessage message) { + final data = validateDataMessage(message); + expect(mockDecode(data.data), requestValue); + + harness + ..sendResponseHeader() + ..sendResponseValue(responseValue) + ..sendResponseTrailer(); + } + + await harness.runTest( + clientCall: harness.client.unary(requestValue), + expectedResult: responseValue, + expectedPath: '/Test/Unary', + serverHandlers: [handleRequest], + ); + }); + + test('Client-streaming calls work on the client', () async { + const requests = [17, 3]; + const response = 12; + + var index = 0; + + void handleRequest(StreamMessage message) { + final data = validateDataMessage(message); + expect(mockDecode(data.data), requests[index++]); + } + + void handleDone() { + harness + ..sendResponseHeader() + ..sendResponseValue(response) + ..sendResponseTrailer(); + } + + await harness.runTest( + clientCall: harness.client.clientStreaming(Stream.fromIterable(requests)), + expectedResult: response, + expectedPath: '/Test/ClientStreaming', + serverHandlers: [handleRequest, handleRequest], + doneHandler: handleDone, + ); + }); + + test('Server-streaming calls work on the client', () async { + const request = 4; + const responses = [3, 17, 9]; + + void handleRequest(StreamMessage message) { + final data = validateDataMessage(message); + expect(mockDecode(data.data), request); + + harness.sendResponseHeader(); + responses.forEach(harness.sendResponseValue); + harness.sendResponseTrailer(); + } + + await harness.runTest( + clientCall: harness.client.serverStreaming(request).toList(), + expectedResult: responses, + expectedPath: '/Test/ServerStreaming', + serverHandlers: [handleRequest], + ); + }); + + test('Bidirectional calls work on the client', () async { + const requests = [1, 15, 7]; + const responses = [3, 17, 9]; + + var index = 0; + + void handleRequest(StreamMessage message) { + final data = validateDataMessage(message); + expect(mockDecode(data.data), requests[index]); + + if (index == 0) { + harness.sendResponseHeader(); + } + harness.sendResponseValue(responses[index]); + index++; + } + + void handleDone() { + harness.sendResponseTrailer(); + } + + await harness.runTest( + clientCall: + harness.client.bidirectional(Stream.fromIterable(requests)).toList(), + expectedResult: responses, + expectedPath: '/Test/Bidirectional', + serverHandlers: [handleRequest, handleRequest, handleRequest], + doneHandler: handleDone, + ); + }); + + test('Unary call with no response throws error', () async { + void handleRequest(_) { + harness.sendResponseTrailer(); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: GrpcError.unimplemented('No responses received'), + serverHandlers: [handleRequest], + ); + }); + + test('Unary call with more than one response throws error', () async { + void handleRequest(_) { + harness + ..sendResponseHeader() + ..sendResponseValue(dummyValue) + ..sendResponseValue(dummyValue) + ..sendResponseTrailer(); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: + GrpcError.unimplemented('More than one response received'), + serverHandlers: [handleRequest], + ); + }); + + test('Call throws if nothing is received', () async { + void handleRequest(_) { + harness.toClient.close(); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: GrpcError.unavailable('Did not receive anything'), + serverHandlers: [handleRequest], + ); + }); + + test('Call throws if trailers are missing', () async { + void handleRequest(_) { + harness + ..sendResponseHeader() + ..sendResponseValue(dummyValue) + ..toClient.close(); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: GrpcError.unavailable('Missing trailers'), + serverHandlers: [handleRequest], + ); + }); + + test('Call throws if data is received before headers', () async { + void handleRequest(_) { + harness.sendResponseValue(dummyValue); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: + GrpcError.unimplemented('Received data before headers'), + serverHandlers: [handleRequest], + ); + }); + + test('Call throws if data is received after trailers', () async { + void handleRequest(_) { + harness + ..sendResponseHeader() + ..sendResponseTrailer(closeStream: false) + ..sendResponseValue(dummyValue); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: + GrpcError.unimplemented('Received data after trailers'), + serverHandlers: [handleRequest], + ); + }); + + test('Call throws if multiple trailers are received', () async { + void handleRequest(_) { + harness + ..sendResponseHeader() + ..sendResponseTrailer(closeStream: false) + ..sendResponseTrailer(); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: GrpcError.unimplemented('Received multiple trailers'), + serverHandlers: [handleRequest], + ); + }); + + test('Call throws if non-zero status is received', () async { + const customStatusCode = 17; + const customStatusMessage = 'Custom message'; + + void handleRequest(_) { + harness.toClient.add(HeadersStreamMessage([ + Header.ascii('grpc-status', '$customStatusCode'), + Header.ascii('grpc-message', customStatusMessage) + ], endStream: true)); + harness.toClient.close(); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: + GrpcError.custom(customStatusCode, customStatusMessage), + serverHandlers: [handleRequest], + ); + }); + + test('Call throws on response stream errors', () async { + void handleRequest(_) { + harness.toClient.addError('Test error'); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: GrpcError.unknown('Test error'), + serverHandlers: [handleRequest], + ); + }); + + test('Call throws if unable to decode response', () async { + const responseValue = 19; + + void handleRequest(StreamMessage message) { + harness + ..sendResponseHeader() + ..sendResponseValue(responseValue) + ..sendResponseTrailer(); + } + + harness.client = TestClient(harness.channel, decode: (bytes) { + throw "error decoding"; + }); + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: GrpcError.dataLoss('Error parsing response'), + serverHandlers: [handleRequest], + ); + }); + + test('Call forwards known response stream errors', () async { + final expectedException = GrpcError.dataLoss('Oops!'); + + void handleRequest(_) { + harness.toClient.addError(expectedException); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: expectedException, + serverHandlers: [handleRequest], + ); + }); + + test('Known request errors are reported', () async { + final expectedException = GrpcError.deadlineExceeded('Too late!'); + + Stream requests() async* { + throw expectedException; + } + + await harness.runFailureTest( + clientCall: harness.client.clientStreaming(requests()), + expectedException: expectedException, + expectDone: false, + ); + }); + + test('Custom request errors are reported', () async { + Stream requests() async* { + throw 'Error'; + } + + final expectedException = GrpcError.unknown('Error'); + await harness.runFailureTest( + clientCall: harness.client.clientStreaming(requests()), + expectedException: expectedException, + expectDone: false, + ); + }); + + Future makeUnaryCall() async { + void handleRequest(StreamMessage message) { + harness + ..sendResponseHeader() + ..sendResponseValue(1) + ..sendResponseTrailer(); + } + + await harness.runTest( + clientCall: harness.client.unary(1), + expectedResult: 1, + expectedPath: '/Test/Unary', + serverHandlers: [handleRequest], + ); + } + + test('Connection errors are reported', () async { + final connectionStates = []; + harness.connection.connectionError = 'Connection error'; + harness.connection.onStateChanged = (connection) { + final state = connection.state; + connectionStates.add(state); + }; + + final expectedException = + GrpcError.unavailable('Error connecting: Connection error'); + + await harness.expectThrows( + harness.client.unary(dummyValue), expectedException); + + expect( + connectionStates, [ConnectionState.connecting, ConnectionState.idle]); + }); + + test('Connections time out if idle', () async { + final done = Completer(); + final connectionStates = []; + harness.connection.onStateChanged = (connection) { + final state = connection.state; + connectionStates.add(state); + if (state == ConnectionState.idle) done.complete(); + }; + + harness.channelOptions.idleTimeout = const Duration(microseconds: 10); + + await makeUnaryCall(); + harness.signalIdle(); + expect( + connectionStates, [ConnectionState.connecting, ConnectionState.ready]); + await done.future; + expect(connectionStates, [ + ConnectionState.connecting, + ConnectionState.ready, + ConnectionState.idle + ]); + }); + + test('Default reconnect backoff backs off', () { + Duration lastBackoff = defaultBackoffStrategy(null); + expect(lastBackoff, const Duration(seconds: 1)); + for (int i = 0; i < 12; i++) { + final minNext = lastBackoff * (1.6 - 0.2); + final maxNext = lastBackoff * (1.6 + 0.2); + lastBackoff = defaultBackoffStrategy(lastBackoff); + if (lastBackoff != const Duration(minutes: 2)) { + expect(lastBackoff, greaterThanOrEqualTo(minNext)); + expect(lastBackoff, lessThanOrEqualTo(maxNext)); + } + } + expect(lastBackoff, const Duration(minutes: 2)); + expect(defaultBackoffStrategy(lastBackoff), const Duration(minutes: 2)); + }); + + test('authority is computed correctly', () { + final emptyOptions = ChannelOptions(); + expect(Http2ClientConnection('localhost', 8080, emptyOptions).authority, + 'localhost:8080'); + expect(Http2ClientConnection('localhost', 443, emptyOptions).authority, + 'localhost'); + final channelOptions = ChannelOptions( + credentials: ChannelCredentials.insecure(authority: 'myauthority.com')); + expect(Http2ClientConnection('localhost', 8080, channelOptions).authority, + 'myauthority.com'); + expect(Http2ClientConnection('localhost', null, channelOptions).authority, + 'myauthority.com'); + }); +} diff --git a/test/connection_server_test.dart b/test/connection_server_test.dart new file mode 100644 index 0000000..0073bbb --- /dev/null +++ b/test/connection_server_test.dart @@ -0,0 +1,372 @@ +// Copyright (c) 2020, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +@TestOn('vm') + +import 'dart:async'; + +import 'package:grpc/grpc.dart'; +import 'package:http2/transport.dart'; +import 'package:test/test.dart'; + +import 'src/server_utils.dart'; + +void main() { + const dummyValue = 17; + + ConnectionServerHarness harness; + + setUp(() { + harness = ConnectionServerHarness()..setUp(); + }); + + tearDown(() { + harness.tearDown(); + }); + + test('Unary calls work on the server', () async { + const expectedRequest = 5; + const expectedResponse = 7; + Future methodHandler(ServiceCall call, Future request) async { + expect(await request, expectedRequest); + return expectedResponse; + } + + harness + ..service.unaryHandler = methodHandler + ..runTest('/Test/Unary', [expectedRequest], [expectedResponse]); + await harness.fromServer.done; + }); + + test('Client-streaming calls work on the server', () async { + const expectedRequests = [5, 3, 17]; + const expectedResponse = 12; + Future methodHandler(ServiceCall call, Stream request) async { + expect(await request.toList(), expectedRequests); + return expectedResponse; + } + + harness + ..service.clientStreamingHandler = methodHandler + ..runTest('/Test/ClientStreaming', expectedRequests, [expectedResponse]); + await harness.fromServer.done; + }); + + test('Server-streaming calls work on the server', () async { + const expectedRequest = 5; + const expectedResponses = [7, 9, 1]; + + Stream methodHandler(ServiceCall call, Future request) async* { + expect(await request, expectedRequest); + for (final value in expectedResponses) { + yield value; + } + } + + harness + ..service.serverStreamingHandler = methodHandler + ..runTest('/Test/ServerStreaming', [expectedRequest], expectedResponses); + await harness.fromServer.done; + }); + + test('Bidirectional calls work on the server', () async { + const expectedRequests = [3, 1, 7]; + final expectedResponses = expectedRequests.map((v) => v + 5).toList(); + + Stream methodHandler(ServiceCall call, Stream request) async* { + yield* request.map((value) => value + 5); + } + + harness + ..service.bidirectionalHandler = methodHandler + ..runTest('/Test/Bidirectional', expectedRequests, expectedResponses); + await harness.fromServer.done; + }); + + test('Server returns error on missing call header', () async { + harness + ..expectErrorResponse(StatusCode.unimplemented, 'Expected header frame') + ..sendData(dummyValue); + await harness.fromServer.done; + }); + + test('Server returns error on invalid path', () async { + harness + ..expectErrorResponse(StatusCode.unimplemented, 'Invalid path') + ..sendRequestHeader('InvalidPath'); + await harness.fromServer.done; + }); + + test('Server returns error on unimplemented path', () async { + harness + ..expectErrorResponse( + StatusCode.unimplemented, 'Path /Test/NotFound not found') + ..sendRequestHeader('/Test/NotFound'); + await harness.fromServer.done; + }); + + /// Returns a service method handler that verifies that awaiting the request + /// throws a specific error. + Future Function(ServiceCall call, Future request) expectError( + expectedError) { + return expectAsync2((ServiceCall call, Future request) async { + try { + final result = await request; + registerException('Did not throw'); + return result; + } catch (caughtError) { + try { + expect(caughtError, expectedError); + } catch (failure, stack) { + registerException(failure, stack); + } + rethrow; + } + }, count: 1); + } + + /// Returns a service method handler that verifies that awaiting the request + /// stream throws a specific error. + Stream Function(ServiceCall call, Stream request) + expectErrorStreaming(expectedError) { + return (ServiceCall call, Stream request) async* { + try { + await for (var entry in request) { + yield entry; + } + registerException('Did not throw'); + } catch (caughtError) { + try { + expect(caughtError, expectedError); + } catch (failure, stack) { + registerException(failure, stack); + } + rethrow; + } + }; + } + + test('Server returns error on missing request for unary call', () async { + harness + ..service.unaryHandler = + expectError(GrpcError.unimplemented('No request received')) + ..expectErrorResponse(StatusCode.unimplemented, 'No request received') + ..sendRequestHeader('/Test/Unary') + ..toServer.close(); + await harness.fromServer.done; + }); + + test('Server returns error if multiple headers are received for unary call', + () async { + harness + ..service.unaryHandler = + expectError(GrpcError.unimplemented('Expected request')) + ..expectErrorResponse(StatusCode.unimplemented, 'Expected request') + ..sendRequestHeader('/Test/Unary') + ..toServer.add(HeadersStreamMessage([])) + ..toServer.close(); + await harness.fromServer.done; + }); + + test('Server returns error on too many requests for unary call', () async { + harness + ..service.unaryHandler = + expectError(GrpcError.unimplemented('Too many requests')) + ..expectErrorResponse(StatusCode.unimplemented, 'Too many requests') + ..sendRequestHeader('/Test/Unary') + ..sendData(dummyValue) + ..sendData(dummyValue) + ..toServer.close(); + await harness.fromServer.done; + }); + + test('Server returns request deserialization errors', () async { + harness + ..service.bidirectionalHandler = expectErrorStreaming( + GrpcError.internal('Error deserializing request: Failed')) + ..expectErrorResponse( + StatusCode.internal, 'Error deserializing request: Failed') + ..sendRequestHeader('/Test/RequestError') + ..sendData(dummyValue) + ..toServer.close(); + await harness.fromServer.done; + }); + + test('Server returns response serialization errors', () async { + harness + ..service.bidirectionalHandler = expectErrorStreaming( + GrpcError.internal('Error sending response: Failed')) + ..expectErrorResponse( + StatusCode.internal, 'Error sending response: Failed') + ..sendRequestHeader('/Test/ResponseError') + ..sendData(dummyValue) + ..sendData(dummyValue) + ..toServer.close(); + await harness.fromServer.done; + }); + + test('Header can only be sent once', () async { + Future methodHandler(ServiceCall call, Future request) async { + call.sendHeaders(); + call.sendHeaders(); + return await request; + } + + harness + ..service.unaryHandler = methodHandler + ..expectTrailingErrorResponse(StatusCode.internal, 'Headers already sent') + ..sendRequestHeader('/Test/Unary'); + await harness.fromServer.done; + }); + + test('Server receives cancel', () async { + final success = Completer(); + + Future methodHandler(ServiceCall call, Future request) async { + try { + final result = await request; + registerException('Did not throw'); + return result; + } catch (caughtError) { + try { + expect(caughtError, GrpcError.cancelled('Cancelled')); + expect(call.isCanceled, isTrue); + success.complete(true); + } catch (failure, stack) { + registerException(failure, stack); + } + } finally { + if (!success.isCompleted) { + success.complete(false); + } + } + return dummyValue; + } + + harness + ..service.unaryHandler = methodHandler + ..fromServer.stream.listen(expectAsync1((_) {}, count: 0), + onError: expectAsync1((error) { + expect(error, 'TERMINATED'); + }, count: 1), + onDone: expectAsync0(() {}, count: 1)) + ..sendRequestHeader('/Test/Unary') + ..toServer.addError('CANCEL'); + + expect(await success.future, isTrue); + await harness.toServer.close(); + await harness.fromServer.done; + }); + + test( + 'Server returns error if request stream is closed before sending anything', + () async { + harness + ..expectErrorResponse( + StatusCode.unavailable, 'Request stream closed unexpectedly') + ..toServer.close(); + await harness.fromServer.done; + }); + + group('Server with interceptor', () { + group('processes calls if interceptor allows request', () { + const expectedRequest = 5; + const expectedResponse = 7; + Future methodHandler(ServiceCall call, Future request) async { + expect(await request, expectedRequest); + return expectedResponse; + } + + final Interceptor interceptor = (call, method) { + if (method.name == "Unary") { + return null; + } + return GrpcError.unauthenticated('Request is unauthenticated'); + }; + + Future doTest(Interceptor handler) async { + harness + ..interceptor.handler = handler + ..service.unaryHandler = methodHandler + ..runTest('/Test/Unary', [expectedRequest], [expectedResponse]); + + await harness.fromServer.done; + } + + test('with sync interceptor', () => doTest(interceptor)); + test('with async interceptor', + () => doTest((call, method) async => interceptor(call, method))); + }); + + group('returns error if interceptor blocks request', () { + final Interceptor interceptor = (call, method) { + if (method.name == "Unary") { + return GrpcError.unauthenticated('Request is unauthenticated'); + } + return null; + }; + + Future doTest(Interceptor handler) async { + harness + ..interceptor.handler = handler + ..expectErrorResponse( + StatusCode.unauthenticated, 'Request is unauthenticated') + ..sendRequestHeader('/Test/Unary'); + + await harness.fromServer.done; + } + + test('with sync interceptor', () => doTest(interceptor)); + test('with async interceptor', + () => doTest((call, method) async => interceptor(call, method))); + }); + + group('returns internal error if interceptor throws exception', () { + final Interceptor interceptor = (call, method) { + throw Exception('Reason is unknown'); + }; + + Future doTest(Interceptor handler) async { + harness + ..interceptor.handler = handler + ..expectErrorResponse( + StatusCode.internal, 'Exception: Reason is unknown') + ..sendRequestHeader('/Test/Unary'); + + await harness.fromServer.done; + } + + test('with sync interceptor', () => doTest(interceptor)); + test('with async interceptor', + () => doTest((call, method) async => interceptor(call, method))); + }); + + test("don't fail if interceptor await 2 times", () async { + final Interceptor interceptor = (call, method) async { + await Future.value(); + await Future.value(); + throw Exception('Reason is unknown'); + }; + + harness + ..interceptor.handler = interceptor + ..expectErrorResponse( + StatusCode.internal, 'Exception: Reason is unknown') + ..sendRequestHeader('/Test/Unary') + ..sendData(1); + + await harness.fromServer.done; + }); + }); +} diff --git a/test/src/client_utils.dart b/test/src/client_utils.dart index c5a6c20..a882b6c 100644 --- a/test/src/client_utils.dart +++ b/test/src/client_utils.dart @@ -16,6 +16,7 @@ import 'dart:async'; import 'dart:convert'; +import 'package:grpc/src/client/channel.dart' as base; import 'package:grpc/src/client/http2_connection.dart'; import 'package:grpc/src/shared/message.dart'; import 'package:http2/transport.dart'; @@ -45,6 +46,21 @@ class FakeConnection extends Http2ClientConnection { } } +class FakeClientTransportConnection extends Http2ClientConnection { + final ClientTransportConnector connector; + + var connectionError; + + FakeClientTransportConnection(this.connector, ChannelOptions options) + : super.fromClientTransportConnector(connector, options); + + @override + Future connectTransport() async { + if (connectionError != null) throw connectionError; + return await connector.connect(); + } +} + Duration testBackoff(Duration lastBackoff) => const Duration(milliseconds: 1); class FakeChannelOptions implements ChannelOptions { @@ -66,6 +82,18 @@ class FakeChannel extends ClientChannel { Future getConnection() async => connection; } +class FakeClientConnectorChannel extends ClientTransportConnectorChannel { + final Http2ClientConnection connection; + final FakeChannelOptions options; + + FakeClientConnectorChannel( + ClientTransportConnector connector, this.connection, this.options) + : super(connector, options: options); + + @override + Future getConnection() async => connection; +} + typedef ServerMessageHandler = void Function(StreamMessage message); class TestClient extends Client { @@ -76,7 +104,7 @@ class TestClient extends Client { final int Function(List value) decode; - TestClient(ClientChannel channel, + TestClient(base.ClientChannel channel, {CallOptions options, this.decode: mockDecode}) : super(channel, options: options) { _$unary = ClientMethod('/Test/Unary', mockEncode, decode); @@ -113,10 +141,56 @@ class TestClient extends Client { } } -class ClientHarness { - MockTransport transport; +class ClientHarness extends _Harness { FakeConnection connection; - FakeChannel channel; + + @override + FakeChannel createChannel() { + connection = FakeConnection('test', transport, channelOptions); + return FakeChannel('test', connection, channelOptions); + } + + @override + String get expectedAuthority => 'test'; +} + +class ClientTransportConnectorHarness extends _Harness { + FakeClientTransportConnection connection; + ClientTransportConnector connector; + + @override + FakeClientConnectorChannel createChannel() { + connector = FakeClientTransportConnector(transport); + connection = FakeClientTransportConnection(connector, channelOptions); + return FakeClientConnectorChannel(connector, connection, channelOptions); + } + + @override + String get expectedAuthority => 'test'; +} + +class FakeClientTransportConnector extends ClientTransportConnector { + final ClientTransportConnection _transportConnection; + final completer = Completer(); + + FakeClientTransportConnector(this._transportConnection); + + @override + Future connect() async => _transportConnection; + + @override + String get authority => 'test'; + + @override + Future get done => completer.future; + + @override + void shutdown() => completer.complete(); +} + +abstract class _Harness { + MockTransport transport; + base.ClientChannel channel; FakeChannelOptions channelOptions; MockStream stream; @@ -125,11 +199,14 @@ class ClientHarness { TestClient client; + base.ClientChannel createChannel(); + + String get expectedAuthority; + void setUp() { transport = MockTransport(); channelOptions = FakeChannelOptions(); - connection = FakeConnection('test', transport, channelOptions); - channel = FakeChannel('test', connection, channelOptions); + channel = createChannel(); stream = MockStream(); fromClient = StreamController(); toClient = StreamController(); @@ -198,6 +275,7 @@ class ClientHarness { Map.fromEntries(capturedHeaders.map((header) => MapEntry(utf8.decode(header.name), utf8.decode(header.value)))), path: expectedPath, + authority: expectedAuthority, timeout: toTimeoutString(expectedTimeout), customHeaders: expectedCustomHeaders); diff --git a/test/src/server_utils.dart b/test/src/server_utils.dart index eb37a64..691d85b 100644 --- a/test/src/server_utils.dart +++ b/test/src/server_utils.dart @@ -116,23 +116,40 @@ class TestServerStream extends ServerTransportStream { ServerTransportStream push(List
requestHeaders) => null; } -class ServerHarness { - final toServer = StreamController(); - final fromServer = StreamController(); - final service = TestService(); - final interceptor = TestInterceptor(); - - Server server; - - ServerHarness() { - server = Server([service], [interceptor]); - } +class ServerHarness extends _Harness { + @override + ConnectionServer createServer() => + Server([service], [interceptor]); static ServiceMethod createMethod(String name, Function methodHandler, bool clientStreaming, bool serverStreaming) { return ServiceMethod(name, methodHandler, clientStreaming, serverStreaming, mockDecode, mockEncode); } +} + +class ConnectionServerHarness extends _Harness { + @override + ConnectionServer createServer() => + ConnectionServer([service], [interceptor]); + + static ServiceMethod createMethod(String name, + Function methodHandler, bool clientStreaming, bool serverStreaming) { + return ServiceMethod(name, methodHandler, clientStreaming, + serverStreaming, mockDecode, mockEncode); + } +} + +abstract class _Harness { + final toServer = StreamController(); + final fromServer = StreamController(); + final service = TestService(); + final interceptor = TestInterceptor(); + ConnectionServer _server; + + ConnectionServer createServer(); + + ConnectionServer get server => _server ??= createServer(); void setUp() { final stream = TestServerStream(toServer.stream, fromServer.sink);