mirror of https://github.com/grpc/grpc-dart.git
Create gRPC servers and clients with TransportConnections. (#364)
* Create gRPC servers and clients with TransportConnections. * switch class ordering to put base class first.
This commit is contained in:
parent
831f5d8cfb
commit
fa6b127dca
|
@ -1,3 +1,9 @@
|
||||||
|
## 2.6.0
|
||||||
|
|
||||||
|
* Create gRPC servers and clients with [Server|Client]TransportConnnection.
|
||||||
|
This allows callers to propvide their own transport configuration, such
|
||||||
|
as their own implementation of streams and sinks instead of sockets.
|
||||||
|
|
||||||
## 2.5.0
|
## 2.5.0
|
||||||
|
|
||||||
* Expose a `validateClient` method for server credentials so gRPC server
|
* Expose a `validateClient` method for server credentials so gRPC server
|
||||||
|
|
|
@ -27,9 +27,12 @@ export 'src/auth/auth_io.dart'
|
||||||
|
|
||||||
export 'src/client/call.dart' show CallOptions, ClientCall, MetadataProvider;
|
export 'src/client/call.dart' show CallOptions, ClientCall, MetadataProvider;
|
||||||
export 'src/client/client.dart' show Client;
|
export 'src/client/client.dart' show Client;
|
||||||
|
export 'src/client/client_transport_connector.dart'
|
||||||
|
show ClientTransportConnector;
|
||||||
export 'src/client/common.dart' show Response, ResponseStream, ResponseFuture;
|
export 'src/client/common.dart' show Response, ResponseStream, ResponseFuture;
|
||||||
export 'src/client/connection.dart' show ConnectionState;
|
export 'src/client/connection.dart' show ConnectionState;
|
||||||
export 'src/client/http2_channel.dart' show ClientChannel;
|
export 'src/client/http2_channel.dart'
|
||||||
|
show ClientChannel, ClientTransportConnectorChannel;
|
||||||
export 'src/client/method.dart' show ClientMethod;
|
export 'src/client/method.dart' show ClientMethod;
|
||||||
export 'src/client/options.dart'
|
export 'src/client/options.dart'
|
||||||
show
|
show
|
||||||
|
@ -48,6 +51,7 @@ export 'src/server/server.dart'
|
||||||
ServerCredentials,
|
ServerCredentials,
|
||||||
ServerLocalCredentials,
|
ServerLocalCredentials,
|
||||||
ServerTlsCredentials,
|
ServerTlsCredentials,
|
||||||
|
ConnectionServer,
|
||||||
Server;
|
Server;
|
||||||
export 'src/server/service.dart' show ServiceMethod, Service;
|
export 'src/server/service.dart' show ServiceMethod, Service;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,18 @@
|
||||||
|
import 'dart:async';
|
||||||
|
|
||||||
|
import 'package:http2/transport.dart';
|
||||||
|
|
||||||
|
/// A transport-specific configuration used by gRPC clients to connect.
|
||||||
|
abstract class ClientTransportConnector {
|
||||||
|
/// Creates a HTTP/2 client connection.
|
||||||
|
Future<ClientTransportConnection> connect();
|
||||||
|
|
||||||
|
/// A future that completes when the client closes or when an error occurs.
|
||||||
|
Future get done;
|
||||||
|
|
||||||
|
/// Shuts down the connection, which should at least close the client.
|
||||||
|
void shutdown();
|
||||||
|
|
||||||
|
/// Header populated from any credentials or hostname information.
|
||||||
|
String get authority;
|
||||||
|
}
|
|
@ -14,6 +14,7 @@
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
import 'channel.dart';
|
import 'channel.dart';
|
||||||
|
import 'client_transport_connector.dart';
|
||||||
import 'connection.dart';
|
import 'connection.dart';
|
||||||
import 'http2_connection.dart' show Http2ClientConnection;
|
import 'http2_connection.dart' show Http2ClientConnection;
|
||||||
import 'options.dart';
|
import 'options.dart';
|
||||||
|
@ -37,3 +38,17 @@ class ClientChannel extends ClientChannelBase {
|
||||||
return Http2ClientConnection(host, port, options);
|
return Http2ClientConnection(host, port, options);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class ClientTransportConnectorChannel extends ClientChannelBase {
|
||||||
|
final ClientTransportConnector transportConnector;
|
||||||
|
final ChannelOptions options;
|
||||||
|
|
||||||
|
ClientTransportConnectorChannel(this.transportConnector,
|
||||||
|
{this.options = const ChannelOptions()});
|
||||||
|
|
||||||
|
@override
|
||||||
|
ClientConnection createConnection() {
|
||||||
|
return Http2ClientConnection.fromClientTransportConnector(
|
||||||
|
transportConnector, options);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import 'package:meta/meta.dart';
|
||||||
import '../shared/timeout.dart';
|
import '../shared/timeout.dart';
|
||||||
|
|
||||||
import 'call.dart';
|
import 'call.dart';
|
||||||
|
import 'client_transport_connector.dart';
|
||||||
import 'connection.dart' hide ClientConnection;
|
import 'connection.dart' hide ClientConnection;
|
||||||
import 'connection.dart' as connection;
|
import 'connection.dart' as connection;
|
||||||
|
|
||||||
|
@ -49,6 +50,7 @@ class Http2ClientConnection implements connection.ClientConnection {
|
||||||
void Function(Http2ClientConnection connection) onStateChanged;
|
void Function(Http2ClientConnection connection) onStateChanged;
|
||||||
final _pendingCalls = <ClientCall>[];
|
final _pendingCalls = <ClientCall>[];
|
||||||
|
|
||||||
|
final ClientTransportConnector _transportConnector;
|
||||||
ClientTransportConnection _transportConnection;
|
ClientTransportConnection _transportConnection;
|
||||||
|
|
||||||
/// Used for idle and reconnect timeout, depending on [_state].
|
/// Used for idle and reconnect timeout, depending on [_state].
|
||||||
|
@ -59,15 +61,15 @@ class Http2ClientConnection implements connection.ClientConnection {
|
||||||
|
|
||||||
Duration _currentReconnectDelay;
|
Duration _currentReconnectDelay;
|
||||||
|
|
||||||
final String host;
|
Http2ClientConnection(String host, int port, this.options)
|
||||||
final int port;
|
: _transportConnector = _SocketTransportConnector(host, port, options);
|
||||||
|
|
||||||
Http2ClientConnection(this.host, this.port, this.options);
|
Http2ClientConnection.fromClientTransportConnector(
|
||||||
|
this._transportConnector, this.options);
|
||||||
|
|
||||||
ChannelCredentials get credentials => options.credentials;
|
ChannelCredentials get credentials => options.credentials;
|
||||||
|
|
||||||
String get authority =>
|
String get authority => _transportConnector.authority;
|
||||||
options.credentials.authority ?? (port == 443 ? host : "$host:$port");
|
|
||||||
|
|
||||||
String get scheme => options.credentials.isSecure ? 'https' : 'http';
|
String get scheme => options.credentials.isSecure ? 'https' : 'http';
|
||||||
|
|
||||||
|
@ -75,36 +77,9 @@ class Http2ClientConnection implements connection.ClientConnection {
|
||||||
|
|
||||||
static const _estimatedRoundTripTime = const Duration(milliseconds: 20);
|
static const _estimatedRoundTripTime = const Duration(milliseconds: 20);
|
||||||
|
|
||||||
Future<Socket> _createSocket() async {
|
|
||||||
final securityContext = credentials.securityContext;
|
|
||||||
if (securityContext == null) {
|
|
||||||
return Socket.connect(host, port);
|
|
||||||
} else {
|
|
||||||
if (options.credentials.authority == null) {
|
|
||||||
return SecureSocket.connect(host, port,
|
|
||||||
context: securityContext,
|
|
||||||
onBadCertificate: _validateBadCertificate);
|
|
||||||
} else {
|
|
||||||
// Todo(sigurdm): We want to pass supportedProtocols: ['h2']. http://dartbug.com/37950
|
|
||||||
return SecureSocket.secure(await Socket.connect(host, port),
|
|
||||||
// This is not really the host, but the authority to verify the TLC
|
|
||||||
// connection against.
|
|
||||||
//
|
|
||||||
// We don't use `this.authority` here, as that includes the port.
|
|
||||||
host: options.credentials.authority,
|
|
||||||
context: securityContext,
|
|
||||||
onBadCertificate: _validateBadCertificate);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Future<ClientTransportConnection> connectTransport() async {
|
Future<ClientTransportConnection> connectTransport() async {
|
||||||
final Socket socket = await _createSocket();
|
final connection = await _transportConnector.connect();
|
||||||
// Don't wait for io buffers to fill up before sending requests.
|
_transportConnector.done.then((_) => _abandonConnection());
|
||||||
socket.setOption(SocketOption.tcpNoDelay, true);
|
|
||||||
|
|
||||||
final connection = ClientTransportConnection.viaSocket(socket);
|
|
||||||
socket.done.then((_) => _abandonConnection());
|
|
||||||
|
|
||||||
// Give the settings settings-frame a bit of time to arrive.
|
// Give the settings settings-frame a bit of time to arrive.
|
||||||
// TODO(sigurdm): This is a hack. The http2 package should expose a way of
|
// TODO(sigurdm): This is a hack. The http2 package should expose a way of
|
||||||
|
@ -112,7 +87,7 @@ class Http2ClientConnection implements connection.ClientConnection {
|
||||||
await new Future.delayed(_estimatedRoundTripTime);
|
await new Future.delayed(_estimatedRoundTripTime);
|
||||||
|
|
||||||
if (_state == ConnectionState.shutdown) {
|
if (_state == ConnectionState.shutdown) {
|
||||||
socket.destroy();
|
_transportConnector.shutdown();
|
||||||
throw _ShutdownException();
|
throw _ShutdownException();
|
||||||
}
|
}
|
||||||
return connection;
|
return connection;
|
||||||
|
@ -180,8 +155,8 @@ class Http2ClientConnection implements connection.ClientConnection {
|
||||||
GrpcTransportStream makeRequest(String path, Duration timeout,
|
GrpcTransportStream makeRequest(String path, Duration timeout,
|
||||||
Map<String, String> metadata, ErrorHandler onRequestFailure,
|
Map<String, String> metadata, ErrorHandler onRequestFailure,
|
||||||
{CallOptions callOptions}) {
|
{CallOptions callOptions}) {
|
||||||
final headers = createCallHeaders(
|
final headers = createCallHeaders(credentials.isSecure,
|
||||||
credentials.isSecure, authority, path, timeout, metadata,
|
_transportConnector.authority, path, timeout, metadata,
|
||||||
userAgent: options.userAgent);
|
userAgent: options.userAgent);
|
||||||
final stream = _transportConnection.makeRequest(headers);
|
final stream = _transportConnection.makeRequest(headers);
|
||||||
return Http2TransportStream(stream, onRequestFailure);
|
return Http2TransportStream(stream, onRequestFailure);
|
||||||
|
@ -320,9 +295,57 @@ class Http2ClientConnection implements connection.ClientConnection {
|
||||||
});
|
});
|
||||||
return headers;
|
return headers;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class _SocketTransportConnector implements ClientTransportConnector {
|
||||||
|
final String _host;
|
||||||
|
final int _port;
|
||||||
|
final ChannelOptions _options;
|
||||||
|
Socket _socket;
|
||||||
|
|
||||||
|
_SocketTransportConnector(this._host, this._port, this._options);
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<ClientTransportConnection> connect() async {
|
||||||
|
final securityContext = _options.credentials.securityContext;
|
||||||
|
_socket = await Socket.connect(_host, _port);
|
||||||
|
// Don't wait for io buffers to fill up before sending requests.
|
||||||
|
_socket.setOption(SocketOption.tcpNoDelay, true);
|
||||||
|
if (securityContext != null) {
|
||||||
|
// Todo(sigurdm): We want to pass supportedProtocols: ['h2'].
|
||||||
|
// http://dartbug.com/37950
|
||||||
|
_socket = await SecureSocket.secure(_socket,
|
||||||
|
// This is not really the host, but the authority to verify the TLC
|
||||||
|
// connection against.
|
||||||
|
//
|
||||||
|
// We don't use `this.authority` here, as that includes the port.
|
||||||
|
host: _options.credentials.authority ?? _host,
|
||||||
|
context: securityContext,
|
||||||
|
onBadCertificate: _validateBadCertificate);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ClientTransportConnection.viaSocket(_socket);
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
String get authority =>
|
||||||
|
_options.credentials.authority ??
|
||||||
|
(_port == 443 ? _host : "$_host:$_port");
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future get done {
|
||||||
|
assert(_socket != null);
|
||||||
|
return _socket.done;
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void shutdown() {
|
||||||
|
assert(_socket != null);
|
||||||
|
_socket.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
bool _validateBadCertificate(X509Certificate certificate) {
|
bool _validateBadCertificate(X509Certificate certificate) {
|
||||||
final credentials = this.credentials;
|
final credentials = _options.credentials;
|
||||||
final validator = credentials.onBadCertificate;
|
final validator = credentials.onBadCertificate;
|
||||||
|
|
||||||
if (validator == null) return false;
|
if (validator == null) return false;
|
||||||
|
|
|
@ -77,19 +77,18 @@ class ServerTlsCredentials extends ServerCredentials {
|
||||||
bool validateClient(Socket socket) => true;
|
bool validateClient(Socket socket) => true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A gRPC server.
|
/// A gRPC server that serves via provided [ServerTransportConnection]s.
|
||||||
///
|
///
|
||||||
/// Listens for incoming RPCs, dispatching them to the right [Service] handler.
|
/// Unlike [Server], the caller has the responsibility of configuring and
|
||||||
class Server {
|
/// managing the connection from a client.
|
||||||
|
class ConnectionServer {
|
||||||
final Map<String, Service> _services = {};
|
final Map<String, Service> _services = {};
|
||||||
final List<Interceptor> _interceptors;
|
final List<Interceptor> _interceptors;
|
||||||
|
|
||||||
ServerSocket _insecureServer;
|
|
||||||
SecureServerSocket _secureServer;
|
|
||||||
final _connections = <ServerTransportConnection>[];
|
final _connections = <ServerTransportConnection>[];
|
||||||
|
|
||||||
/// Create a server for the given [services].
|
/// Create a server for the given [services].
|
||||||
Server(List<Service> services,
|
ConnectionServer(List<Service> services,
|
||||||
[List<Interceptor> interceptors = const <Interceptor>[]])
|
[List<Interceptor> interceptors = const <Interceptor>[]])
|
||||||
: _interceptors = interceptors {
|
: _interceptors = interceptors {
|
||||||
for (final service in services) {
|
for (final service in services) {
|
||||||
|
@ -97,6 +96,45 @@ class Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Service lookupService(String service) => _services[service];
|
||||||
|
|
||||||
|
Future<void> serveConnection(ServerTransportConnection connection) async {
|
||||||
|
_connections.add(connection);
|
||||||
|
ServerHandler_ handler;
|
||||||
|
// TODO(jakobr): Set active state handlers, close connection after idle
|
||||||
|
// timeout.
|
||||||
|
connection.incomingStreams.listen((stream) {
|
||||||
|
handler = serveStream_(stream);
|
||||||
|
}, onError: (error) {
|
||||||
|
print('Connection error: $error');
|
||||||
|
}, onDone: () {
|
||||||
|
// TODO(sigurdm): This is not correct behavior in the presence of
|
||||||
|
// half-closed tcp streams.
|
||||||
|
// Half-closed streams seems to not be fully supported by package:http2.
|
||||||
|
// https://github.com/dart-lang/http2/issues/42
|
||||||
|
handler?.cancel();
|
||||||
|
_connections.remove(connection);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@visibleForTesting
|
||||||
|
ServerHandler_ serveStream_(ServerTransportStream stream) {
|
||||||
|
return ServerHandler_(lookupService, stream, _interceptors)..handle();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A gRPC server.
|
||||||
|
///
|
||||||
|
/// Listens for incoming RPCs, dispatching them to the right [Service] handler.
|
||||||
|
class Server extends ConnectionServer {
|
||||||
|
ServerSocket _insecureServer;
|
||||||
|
SecureServerSocket _secureServer;
|
||||||
|
|
||||||
|
/// Create a server for the given [services].
|
||||||
|
Server(List<Service> services,
|
||||||
|
[List<Interceptor> interceptors = const <Interceptor>[]])
|
||||||
|
: super(services, interceptors);
|
||||||
|
|
||||||
/// The port that the server is listening on, or `null` if the server is not
|
/// The port that the server is listening on, or `null` if the server is not
|
||||||
/// active.
|
/// active.
|
||||||
int get port {
|
int get port {
|
||||||
|
@ -105,8 +143,6 @@ class Server {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
Service lookupService(String service) => _services[service];
|
|
||||||
|
|
||||||
Future<void> serve(
|
Future<void> serve(
|
||||||
{dynamic address,
|
{dynamic address,
|
||||||
int port,
|
int port,
|
||||||
|
@ -138,27 +174,7 @@ class Server {
|
||||||
socket.setOption(SocketOption.tcpNoDelay, true);
|
socket.setOption(SocketOption.tcpNoDelay, true);
|
||||||
final connection = ServerTransportConnection.viaSocket(socket,
|
final connection = ServerTransportConnection.viaSocket(socket,
|
||||||
settings: http2ServerSettings);
|
settings: http2ServerSettings);
|
||||||
_connections.add(connection);
|
serveConnection(connection);
|
||||||
if (security != null && !security.validateClient(socket)) {
|
|
||||||
_printSocketError(
|
|
||||||
'cannot serve $address:$port - unable to validate client socket');
|
|
||||||
return socket.close();
|
|
||||||
}
|
|
||||||
ServerHandler_ handler;
|
|
||||||
// TODO(jakobr): Set active state handlers, close connection after idle
|
|
||||||
// timeout.
|
|
||||||
connection.incomingStreams.listen((stream) {
|
|
||||||
handler = serveStream_(stream);
|
|
||||||
}, onError: (error) {
|
|
||||||
print('Connection error: $error');
|
|
||||||
}, onDone: () {
|
|
||||||
// TODO(sigurdm): This is not correct behavior in the presence of
|
|
||||||
// half-closed tcp streams.
|
|
||||||
// Half-closed streams seems to not be fully supported by package:http2.
|
|
||||||
// https://github.com/dart-lang/http2/issues/42
|
|
||||||
handler?.cancel();
|
|
||||||
_connections.remove(connection);
|
|
||||||
});
|
|
||||||
}, onError: _printSocketError);
|
}, onError: _printSocketError);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
name: grpc
|
name: grpc
|
||||||
description: Dart implementation of gRPC, a high performance, open-source universal RPC framework.
|
description: Dart implementation of gRPC, a high performance, open-source universal RPC framework.
|
||||||
|
|
||||||
version: 2.5.0
|
version: 2.6.0
|
||||||
|
|
||||||
author: Dart Team <misc@dartlang.org>
|
author: Dart Team <misc@dartlang.org>
|
||||||
homepage: https://github.com/dart-lang/grpc-dart
|
homepage: https://github.com/dart-lang/grpc-dart
|
||||||
|
|
|
@ -0,0 +1,420 @@
|
||||||
|
// Copyright (c) 2020, the gRPC project authors. Please see the AUTHORS file
|
||||||
|
// for details. All rights reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
import 'dart:async';
|
||||||
|
|
||||||
|
import 'package:grpc/grpc.dart';
|
||||||
|
import 'package:grpc/src/client/http2_connection.dart';
|
||||||
|
import 'package:http2/transport.dart';
|
||||||
|
import 'package:test/test.dart';
|
||||||
|
|
||||||
|
import '../src/client_utils.dart';
|
||||||
|
import '../src/utils.dart';
|
||||||
|
|
||||||
|
void main() {
|
||||||
|
const dummyValue = 0;
|
||||||
|
|
||||||
|
ClientTransportConnectorHarness harness;
|
||||||
|
|
||||||
|
setUp(() {
|
||||||
|
harness = ClientTransportConnectorHarness()..setUp();
|
||||||
|
});
|
||||||
|
|
||||||
|
tearDown(() {
|
||||||
|
harness.tearDown();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Unary calls work on the client', () async {
|
||||||
|
const requestValue = 17;
|
||||||
|
const responseValue = 19;
|
||||||
|
|
||||||
|
void handleRequest(StreamMessage message) {
|
||||||
|
final data = validateDataMessage(message);
|
||||||
|
expect(mockDecode(data.data), requestValue);
|
||||||
|
|
||||||
|
harness
|
||||||
|
..sendResponseHeader()
|
||||||
|
..sendResponseValue(responseValue)
|
||||||
|
..sendResponseTrailer();
|
||||||
|
}
|
||||||
|
|
||||||
|
await harness.runTest(
|
||||||
|
clientCall: harness.client.unary(requestValue),
|
||||||
|
expectedResult: responseValue,
|
||||||
|
expectedPath: '/Test/Unary',
|
||||||
|
serverHandlers: [handleRequest],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Client-streaming calls work on the client', () async {
|
||||||
|
const requests = [17, 3];
|
||||||
|
const response = 12;
|
||||||
|
|
||||||
|
var index = 0;
|
||||||
|
|
||||||
|
void handleRequest(StreamMessage message) {
|
||||||
|
final data = validateDataMessage(message);
|
||||||
|
expect(mockDecode(data.data), requests[index++]);
|
||||||
|
}
|
||||||
|
|
||||||
|
void handleDone() {
|
||||||
|
harness
|
||||||
|
..sendResponseHeader()
|
||||||
|
..sendResponseValue(response)
|
||||||
|
..sendResponseTrailer();
|
||||||
|
}
|
||||||
|
|
||||||
|
await harness.runTest(
|
||||||
|
clientCall: harness.client.clientStreaming(Stream.fromIterable(requests)),
|
||||||
|
expectedResult: response,
|
||||||
|
expectedPath: '/Test/ClientStreaming',
|
||||||
|
serverHandlers: [handleRequest, handleRequest],
|
||||||
|
doneHandler: handleDone,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Server-streaming calls work on the client', () async {
|
||||||
|
const request = 4;
|
||||||
|
const responses = [3, 17, 9];
|
||||||
|
|
||||||
|
void handleRequest(StreamMessage message) {
|
||||||
|
final data = validateDataMessage(message);
|
||||||
|
expect(mockDecode(data.data), request);
|
||||||
|
|
||||||
|
harness.sendResponseHeader();
|
||||||
|
responses.forEach(harness.sendResponseValue);
|
||||||
|
harness.sendResponseTrailer();
|
||||||
|
}
|
||||||
|
|
||||||
|
await harness.runTest(
|
||||||
|
clientCall: harness.client.serverStreaming(request).toList(),
|
||||||
|
expectedResult: responses,
|
||||||
|
expectedPath: '/Test/ServerStreaming',
|
||||||
|
serverHandlers: [handleRequest],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Bidirectional calls work on the client', () async {
|
||||||
|
const requests = [1, 15, 7];
|
||||||
|
const responses = [3, 17, 9];
|
||||||
|
|
||||||
|
var index = 0;
|
||||||
|
|
||||||
|
void handleRequest(StreamMessage message) {
|
||||||
|
final data = validateDataMessage(message);
|
||||||
|
expect(mockDecode(data.data), requests[index]);
|
||||||
|
|
||||||
|
if (index == 0) {
|
||||||
|
harness.sendResponseHeader();
|
||||||
|
}
|
||||||
|
harness.sendResponseValue(responses[index]);
|
||||||
|
index++;
|
||||||
|
}
|
||||||
|
|
||||||
|
void handleDone() {
|
||||||
|
harness.sendResponseTrailer();
|
||||||
|
}
|
||||||
|
|
||||||
|
await harness.runTest(
|
||||||
|
clientCall:
|
||||||
|
harness.client.bidirectional(Stream.fromIterable(requests)).toList(),
|
||||||
|
expectedResult: responses,
|
||||||
|
expectedPath: '/Test/Bidirectional',
|
||||||
|
serverHandlers: [handleRequest, handleRequest, handleRequest],
|
||||||
|
doneHandler: handleDone,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Unary call with no response throws error', () async {
|
||||||
|
void handleRequest(_) {
|
||||||
|
harness.sendResponseTrailer();
|
||||||
|
}
|
||||||
|
|
||||||
|
await harness.runFailureTest(
|
||||||
|
clientCall: harness.client.unary(dummyValue),
|
||||||
|
expectedException: GrpcError.unimplemented('No responses received'),
|
||||||
|
serverHandlers: [handleRequest],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Unary call with more than one response throws error', () async {
|
||||||
|
void handleRequest(_) {
|
||||||
|
harness
|
||||||
|
..sendResponseHeader()
|
||||||
|
..sendResponseValue(dummyValue)
|
||||||
|
..sendResponseValue(dummyValue)
|
||||||
|
..sendResponseTrailer();
|
||||||
|
}
|
||||||
|
|
||||||
|
await harness.runFailureTest(
|
||||||
|
clientCall: harness.client.unary(dummyValue),
|
||||||
|
expectedException:
|
||||||
|
GrpcError.unimplemented('More than one response received'),
|
||||||
|
serverHandlers: [handleRequest],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Call throws if nothing is received', () async {
|
||||||
|
void handleRequest(_) {
|
||||||
|
harness.toClient.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
await harness.runFailureTest(
|
||||||
|
clientCall: harness.client.unary(dummyValue),
|
||||||
|
expectedException: GrpcError.unavailable('Did not receive anything'),
|
||||||
|
serverHandlers: [handleRequest],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Call throws if trailers are missing', () async {
|
||||||
|
void handleRequest(_) {
|
||||||
|
harness
|
||||||
|
..sendResponseHeader()
|
||||||
|
..sendResponseValue(dummyValue)
|
||||||
|
..toClient.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
await harness.runFailureTest(
|
||||||
|
clientCall: harness.client.unary(dummyValue),
|
||||||
|
expectedException: GrpcError.unavailable('Missing trailers'),
|
||||||
|
serverHandlers: [handleRequest],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Call throws if data is received before headers', () async {
|
||||||
|
void handleRequest(_) {
|
||||||
|
harness.sendResponseValue(dummyValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
await harness.runFailureTest(
|
||||||
|
clientCall: harness.client.unary(dummyValue),
|
||||||
|
expectedException:
|
||||||
|
GrpcError.unimplemented('Received data before headers'),
|
||||||
|
serverHandlers: [handleRequest],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Call throws if data is received after trailers', () async {
|
||||||
|
void handleRequest(_) {
|
||||||
|
harness
|
||||||
|
..sendResponseHeader()
|
||||||
|
..sendResponseTrailer(closeStream: false)
|
||||||
|
..sendResponseValue(dummyValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
await harness.runFailureTest(
|
||||||
|
clientCall: harness.client.unary(dummyValue),
|
||||||
|
expectedException:
|
||||||
|
GrpcError.unimplemented('Received data after trailers'),
|
||||||
|
serverHandlers: [handleRequest],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Call throws if multiple trailers are received', () async {
|
||||||
|
void handleRequest(_) {
|
||||||
|
harness
|
||||||
|
..sendResponseHeader()
|
||||||
|
..sendResponseTrailer(closeStream: false)
|
||||||
|
..sendResponseTrailer();
|
||||||
|
}
|
||||||
|
|
||||||
|
await harness.runFailureTest(
|
||||||
|
clientCall: harness.client.unary(dummyValue),
|
||||||
|
expectedException: GrpcError.unimplemented('Received multiple trailers'),
|
||||||
|
serverHandlers: [handleRequest],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Call throws if non-zero status is received', () async {
|
||||||
|
const customStatusCode = 17;
|
||||||
|
const customStatusMessage = 'Custom message';
|
||||||
|
|
||||||
|
void handleRequest(_) {
|
||||||
|
harness.toClient.add(HeadersStreamMessage([
|
||||||
|
Header.ascii('grpc-status', '$customStatusCode'),
|
||||||
|
Header.ascii('grpc-message', customStatusMessage)
|
||||||
|
], endStream: true));
|
||||||
|
harness.toClient.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
await harness.runFailureTest(
|
||||||
|
clientCall: harness.client.unary(dummyValue),
|
||||||
|
expectedException:
|
||||||
|
GrpcError.custom(customStatusCode, customStatusMessage),
|
||||||
|
serverHandlers: [handleRequest],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Call throws on response stream errors', () async {
|
||||||
|
void handleRequest(_) {
|
||||||
|
harness.toClient.addError('Test error');
|
||||||
|
}
|
||||||
|
|
||||||
|
await harness.runFailureTest(
|
||||||
|
clientCall: harness.client.unary(dummyValue),
|
||||||
|
expectedException: GrpcError.unknown('Test error'),
|
||||||
|
serverHandlers: [handleRequest],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Call throws if unable to decode response', () async {
|
||||||
|
const responseValue = 19;
|
||||||
|
|
||||||
|
void handleRequest(StreamMessage message) {
|
||||||
|
harness
|
||||||
|
..sendResponseHeader()
|
||||||
|
..sendResponseValue(responseValue)
|
||||||
|
..sendResponseTrailer();
|
||||||
|
}
|
||||||
|
|
||||||
|
harness.client = TestClient(harness.channel, decode: (bytes) {
|
||||||
|
throw "error decoding";
|
||||||
|
});
|
||||||
|
|
||||||
|
await harness.runFailureTest(
|
||||||
|
clientCall: harness.client.unary(dummyValue),
|
||||||
|
expectedException: GrpcError.dataLoss('Error parsing response'),
|
||||||
|
serverHandlers: [handleRequest],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Call forwards known response stream errors', () async {
|
||||||
|
final expectedException = GrpcError.dataLoss('Oops!');
|
||||||
|
|
||||||
|
void handleRequest(_) {
|
||||||
|
harness.toClient.addError(expectedException);
|
||||||
|
}
|
||||||
|
|
||||||
|
await harness.runFailureTest(
|
||||||
|
clientCall: harness.client.unary(dummyValue),
|
||||||
|
expectedException: expectedException,
|
||||||
|
serverHandlers: [handleRequest],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Known request errors are reported', () async {
|
||||||
|
final expectedException = GrpcError.deadlineExceeded('Too late!');
|
||||||
|
|
||||||
|
Stream<int> requests() async* {
|
||||||
|
throw expectedException;
|
||||||
|
}
|
||||||
|
|
||||||
|
await harness.runFailureTest(
|
||||||
|
clientCall: harness.client.clientStreaming(requests()),
|
||||||
|
expectedException: expectedException,
|
||||||
|
expectDone: false,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Custom request errors are reported', () async {
|
||||||
|
Stream<int> requests() async* {
|
||||||
|
throw 'Error';
|
||||||
|
}
|
||||||
|
|
||||||
|
final expectedException = GrpcError.unknown('Error');
|
||||||
|
await harness.runFailureTest(
|
||||||
|
clientCall: harness.client.clientStreaming(requests()),
|
||||||
|
expectedException: expectedException,
|
||||||
|
expectDone: false,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
Future<void> makeUnaryCall() async {
|
||||||
|
void handleRequest(StreamMessage message) {
|
||||||
|
harness
|
||||||
|
..sendResponseHeader()
|
||||||
|
..sendResponseValue(1)
|
||||||
|
..sendResponseTrailer();
|
||||||
|
}
|
||||||
|
|
||||||
|
await harness.runTest(
|
||||||
|
clientCall: harness.client.unary(1),
|
||||||
|
expectedResult: 1,
|
||||||
|
expectedPath: '/Test/Unary',
|
||||||
|
serverHandlers: [handleRequest],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
test('Connection errors are reported', () async {
|
||||||
|
final connectionStates = <ConnectionState>[];
|
||||||
|
harness.connection.connectionError = 'Connection error';
|
||||||
|
harness.connection.onStateChanged = (connection) {
|
||||||
|
final state = connection.state;
|
||||||
|
connectionStates.add(state);
|
||||||
|
};
|
||||||
|
|
||||||
|
final expectedException =
|
||||||
|
GrpcError.unavailable('Error connecting: Connection error');
|
||||||
|
|
||||||
|
await harness.expectThrows(
|
||||||
|
harness.client.unary(dummyValue), expectedException);
|
||||||
|
|
||||||
|
expect(
|
||||||
|
connectionStates, [ConnectionState.connecting, ConnectionState.idle]);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Connections time out if idle', () async {
|
||||||
|
final done = Completer();
|
||||||
|
final connectionStates = <ConnectionState>[];
|
||||||
|
harness.connection.onStateChanged = (connection) {
|
||||||
|
final state = connection.state;
|
||||||
|
connectionStates.add(state);
|
||||||
|
if (state == ConnectionState.idle) done.complete();
|
||||||
|
};
|
||||||
|
|
||||||
|
harness.channelOptions.idleTimeout = const Duration(microseconds: 10);
|
||||||
|
|
||||||
|
await makeUnaryCall();
|
||||||
|
harness.signalIdle();
|
||||||
|
expect(
|
||||||
|
connectionStates, [ConnectionState.connecting, ConnectionState.ready]);
|
||||||
|
await done.future;
|
||||||
|
expect(connectionStates, [
|
||||||
|
ConnectionState.connecting,
|
||||||
|
ConnectionState.ready,
|
||||||
|
ConnectionState.idle
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Default reconnect backoff backs off', () {
|
||||||
|
Duration lastBackoff = defaultBackoffStrategy(null);
|
||||||
|
expect(lastBackoff, const Duration(seconds: 1));
|
||||||
|
for (int i = 0; i < 12; i++) {
|
||||||
|
final minNext = lastBackoff * (1.6 - 0.2);
|
||||||
|
final maxNext = lastBackoff * (1.6 + 0.2);
|
||||||
|
lastBackoff = defaultBackoffStrategy(lastBackoff);
|
||||||
|
if (lastBackoff != const Duration(minutes: 2)) {
|
||||||
|
expect(lastBackoff, greaterThanOrEqualTo(minNext));
|
||||||
|
expect(lastBackoff, lessThanOrEqualTo(maxNext));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
expect(lastBackoff, const Duration(minutes: 2));
|
||||||
|
expect(defaultBackoffStrategy(lastBackoff), const Duration(minutes: 2));
|
||||||
|
});
|
||||||
|
|
||||||
|
test('authority is computed correctly', () {
|
||||||
|
final emptyOptions = ChannelOptions();
|
||||||
|
expect(Http2ClientConnection('localhost', 8080, emptyOptions).authority,
|
||||||
|
'localhost:8080');
|
||||||
|
expect(Http2ClientConnection('localhost', 443, emptyOptions).authority,
|
||||||
|
'localhost');
|
||||||
|
final channelOptions = ChannelOptions(
|
||||||
|
credentials: ChannelCredentials.insecure(authority: 'myauthority.com'));
|
||||||
|
expect(Http2ClientConnection('localhost', 8080, channelOptions).authority,
|
||||||
|
'myauthority.com');
|
||||||
|
expect(Http2ClientConnection('localhost', null, channelOptions).authority,
|
||||||
|
'myauthority.com');
|
||||||
|
});
|
||||||
|
}
|
|
@ -0,0 +1,372 @@
|
||||||
|
// Copyright (c) 2020, the gRPC project authors. Please see the AUTHORS file
|
||||||
|
// for details. All rights reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
@TestOn('vm')
|
||||||
|
|
||||||
|
import 'dart:async';
|
||||||
|
|
||||||
|
import 'package:grpc/grpc.dart';
|
||||||
|
import 'package:http2/transport.dart';
|
||||||
|
import 'package:test/test.dart';
|
||||||
|
|
||||||
|
import 'src/server_utils.dart';
|
||||||
|
|
||||||
|
void main() {
|
||||||
|
const dummyValue = 17;
|
||||||
|
|
||||||
|
ConnectionServerHarness harness;
|
||||||
|
|
||||||
|
setUp(() {
|
||||||
|
harness = ConnectionServerHarness()..setUp();
|
||||||
|
});
|
||||||
|
|
||||||
|
tearDown(() {
|
||||||
|
harness.tearDown();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Unary calls work on the server', () async {
|
||||||
|
const expectedRequest = 5;
|
||||||
|
const expectedResponse = 7;
|
||||||
|
Future<int> methodHandler(ServiceCall call, Future<int> request) async {
|
||||||
|
expect(await request, expectedRequest);
|
||||||
|
return expectedResponse;
|
||||||
|
}
|
||||||
|
|
||||||
|
harness
|
||||||
|
..service.unaryHandler = methodHandler
|
||||||
|
..runTest('/Test/Unary', [expectedRequest], [expectedResponse]);
|
||||||
|
await harness.fromServer.done;
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Client-streaming calls work on the server', () async {
|
||||||
|
const expectedRequests = [5, 3, 17];
|
||||||
|
const expectedResponse = 12;
|
||||||
|
Future<int> methodHandler(ServiceCall call, Stream<int> request) async {
|
||||||
|
expect(await request.toList(), expectedRequests);
|
||||||
|
return expectedResponse;
|
||||||
|
}
|
||||||
|
|
||||||
|
harness
|
||||||
|
..service.clientStreamingHandler = methodHandler
|
||||||
|
..runTest('/Test/ClientStreaming', expectedRequests, [expectedResponse]);
|
||||||
|
await harness.fromServer.done;
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Server-streaming calls work on the server', () async {
|
||||||
|
const expectedRequest = 5;
|
||||||
|
const expectedResponses = [7, 9, 1];
|
||||||
|
|
||||||
|
Stream<int> methodHandler(ServiceCall call, Future<int> request) async* {
|
||||||
|
expect(await request, expectedRequest);
|
||||||
|
for (final value in expectedResponses) {
|
||||||
|
yield value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
harness
|
||||||
|
..service.serverStreamingHandler = methodHandler
|
||||||
|
..runTest('/Test/ServerStreaming', [expectedRequest], expectedResponses);
|
||||||
|
await harness.fromServer.done;
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Bidirectional calls work on the server', () async {
|
||||||
|
const expectedRequests = [3, 1, 7];
|
||||||
|
final expectedResponses = expectedRequests.map((v) => v + 5).toList();
|
||||||
|
|
||||||
|
Stream<int> methodHandler(ServiceCall call, Stream<int> request) async* {
|
||||||
|
yield* request.map((value) => value + 5);
|
||||||
|
}
|
||||||
|
|
||||||
|
harness
|
||||||
|
..service.bidirectionalHandler = methodHandler
|
||||||
|
..runTest('/Test/Bidirectional', expectedRequests, expectedResponses);
|
||||||
|
await harness.fromServer.done;
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Server returns error on missing call header', () async {
|
||||||
|
harness
|
||||||
|
..expectErrorResponse(StatusCode.unimplemented, 'Expected header frame')
|
||||||
|
..sendData(dummyValue);
|
||||||
|
await harness.fromServer.done;
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Server returns error on invalid path', () async {
|
||||||
|
harness
|
||||||
|
..expectErrorResponse(StatusCode.unimplemented, 'Invalid path')
|
||||||
|
..sendRequestHeader('InvalidPath');
|
||||||
|
await harness.fromServer.done;
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Server returns error on unimplemented path', () async {
|
||||||
|
harness
|
||||||
|
..expectErrorResponse(
|
||||||
|
StatusCode.unimplemented, 'Path /Test/NotFound not found')
|
||||||
|
..sendRequestHeader('/Test/NotFound');
|
||||||
|
await harness.fromServer.done;
|
||||||
|
});
|
||||||
|
|
||||||
|
/// Returns a service method handler that verifies that awaiting the request
|
||||||
|
/// throws a specific error.
|
||||||
|
Future<int> Function(ServiceCall call, Future<int> request) expectError(
|
||||||
|
expectedError) {
|
||||||
|
return expectAsync2((ServiceCall call, Future<int> request) async {
|
||||||
|
try {
|
||||||
|
final result = await request;
|
||||||
|
registerException('Did not throw');
|
||||||
|
return result;
|
||||||
|
} catch (caughtError) {
|
||||||
|
try {
|
||||||
|
expect(caughtError, expectedError);
|
||||||
|
} catch (failure, stack) {
|
||||||
|
registerException(failure, stack);
|
||||||
|
}
|
||||||
|
rethrow;
|
||||||
|
}
|
||||||
|
}, count: 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a service method handler that verifies that awaiting the request
|
||||||
|
/// stream throws a specific error.
|
||||||
|
Stream<int> Function(ServiceCall call, Stream<int> request)
|
||||||
|
expectErrorStreaming(expectedError) {
|
||||||
|
return (ServiceCall call, Stream<int> request) async* {
|
||||||
|
try {
|
||||||
|
await for (var entry in request) {
|
||||||
|
yield entry;
|
||||||
|
}
|
||||||
|
registerException('Did not throw');
|
||||||
|
} catch (caughtError) {
|
||||||
|
try {
|
||||||
|
expect(caughtError, expectedError);
|
||||||
|
} catch (failure, stack) {
|
||||||
|
registerException(failure, stack);
|
||||||
|
}
|
||||||
|
rethrow;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
test('Server returns error on missing request for unary call', () async {
|
||||||
|
harness
|
||||||
|
..service.unaryHandler =
|
||||||
|
expectError(GrpcError.unimplemented('No request received'))
|
||||||
|
..expectErrorResponse(StatusCode.unimplemented, 'No request received')
|
||||||
|
..sendRequestHeader('/Test/Unary')
|
||||||
|
..toServer.close();
|
||||||
|
await harness.fromServer.done;
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Server returns error if multiple headers are received for unary call',
|
||||||
|
() async {
|
||||||
|
harness
|
||||||
|
..service.unaryHandler =
|
||||||
|
expectError(GrpcError.unimplemented('Expected request'))
|
||||||
|
..expectErrorResponse(StatusCode.unimplemented, 'Expected request')
|
||||||
|
..sendRequestHeader('/Test/Unary')
|
||||||
|
..toServer.add(HeadersStreamMessage([]))
|
||||||
|
..toServer.close();
|
||||||
|
await harness.fromServer.done;
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Server returns error on too many requests for unary call', () async {
|
||||||
|
harness
|
||||||
|
..service.unaryHandler =
|
||||||
|
expectError(GrpcError.unimplemented('Too many requests'))
|
||||||
|
..expectErrorResponse(StatusCode.unimplemented, 'Too many requests')
|
||||||
|
..sendRequestHeader('/Test/Unary')
|
||||||
|
..sendData(dummyValue)
|
||||||
|
..sendData(dummyValue)
|
||||||
|
..toServer.close();
|
||||||
|
await harness.fromServer.done;
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Server returns request deserialization errors', () async {
|
||||||
|
harness
|
||||||
|
..service.bidirectionalHandler = expectErrorStreaming(
|
||||||
|
GrpcError.internal('Error deserializing request: Failed'))
|
||||||
|
..expectErrorResponse(
|
||||||
|
StatusCode.internal, 'Error deserializing request: Failed')
|
||||||
|
..sendRequestHeader('/Test/RequestError')
|
||||||
|
..sendData(dummyValue)
|
||||||
|
..toServer.close();
|
||||||
|
await harness.fromServer.done;
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Server returns response serialization errors', () async {
|
||||||
|
harness
|
||||||
|
..service.bidirectionalHandler = expectErrorStreaming(
|
||||||
|
GrpcError.internal('Error sending response: Failed'))
|
||||||
|
..expectErrorResponse(
|
||||||
|
StatusCode.internal, 'Error sending response: Failed')
|
||||||
|
..sendRequestHeader('/Test/ResponseError')
|
||||||
|
..sendData(dummyValue)
|
||||||
|
..sendData(dummyValue)
|
||||||
|
..toServer.close();
|
||||||
|
await harness.fromServer.done;
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Header can only be sent once', () async {
|
||||||
|
Future<int> methodHandler(ServiceCall call, Future<int> request) async {
|
||||||
|
call.sendHeaders();
|
||||||
|
call.sendHeaders();
|
||||||
|
return await request;
|
||||||
|
}
|
||||||
|
|
||||||
|
harness
|
||||||
|
..service.unaryHandler = methodHandler
|
||||||
|
..expectTrailingErrorResponse(StatusCode.internal, 'Headers already sent')
|
||||||
|
..sendRequestHeader('/Test/Unary');
|
||||||
|
await harness.fromServer.done;
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Server receives cancel', () async {
|
||||||
|
final success = Completer<bool>();
|
||||||
|
|
||||||
|
Future<int> methodHandler(ServiceCall call, Future<int> request) async {
|
||||||
|
try {
|
||||||
|
final result = await request;
|
||||||
|
registerException('Did not throw');
|
||||||
|
return result;
|
||||||
|
} catch (caughtError) {
|
||||||
|
try {
|
||||||
|
expect(caughtError, GrpcError.cancelled('Cancelled'));
|
||||||
|
expect(call.isCanceled, isTrue);
|
||||||
|
success.complete(true);
|
||||||
|
} catch (failure, stack) {
|
||||||
|
registerException(failure, stack);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (!success.isCompleted) {
|
||||||
|
success.complete(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return dummyValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
harness
|
||||||
|
..service.unaryHandler = methodHandler
|
||||||
|
..fromServer.stream.listen(expectAsync1((_) {}, count: 0),
|
||||||
|
onError: expectAsync1((error) {
|
||||||
|
expect(error, 'TERMINATED');
|
||||||
|
}, count: 1),
|
||||||
|
onDone: expectAsync0(() {}, count: 1))
|
||||||
|
..sendRequestHeader('/Test/Unary')
|
||||||
|
..toServer.addError('CANCEL');
|
||||||
|
|
||||||
|
expect(await success.future, isTrue);
|
||||||
|
await harness.toServer.close();
|
||||||
|
await harness.fromServer.done;
|
||||||
|
});
|
||||||
|
|
||||||
|
test(
|
||||||
|
'Server returns error if request stream is closed before sending anything',
|
||||||
|
() async {
|
||||||
|
harness
|
||||||
|
..expectErrorResponse(
|
||||||
|
StatusCode.unavailable, 'Request stream closed unexpectedly')
|
||||||
|
..toServer.close();
|
||||||
|
await harness.fromServer.done;
|
||||||
|
});
|
||||||
|
|
||||||
|
group('Server with interceptor', () {
|
||||||
|
group('processes calls if interceptor allows request', () {
|
||||||
|
const expectedRequest = 5;
|
||||||
|
const expectedResponse = 7;
|
||||||
|
Future<int> methodHandler(ServiceCall call, Future<int> request) async {
|
||||||
|
expect(await request, expectedRequest);
|
||||||
|
return expectedResponse;
|
||||||
|
}
|
||||||
|
|
||||||
|
final Interceptor interceptor = (call, method) {
|
||||||
|
if (method.name == "Unary") {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return GrpcError.unauthenticated('Request is unauthenticated');
|
||||||
|
};
|
||||||
|
|
||||||
|
Future<void> doTest(Interceptor handler) async {
|
||||||
|
harness
|
||||||
|
..interceptor.handler = handler
|
||||||
|
..service.unaryHandler = methodHandler
|
||||||
|
..runTest('/Test/Unary', [expectedRequest], [expectedResponse]);
|
||||||
|
|
||||||
|
await harness.fromServer.done;
|
||||||
|
}
|
||||||
|
|
||||||
|
test('with sync interceptor', () => doTest(interceptor));
|
||||||
|
test('with async interceptor',
|
||||||
|
() => doTest((call, method) async => interceptor(call, method)));
|
||||||
|
});
|
||||||
|
|
||||||
|
group('returns error if interceptor blocks request', () {
|
||||||
|
final Interceptor interceptor = (call, method) {
|
||||||
|
if (method.name == "Unary") {
|
||||||
|
return GrpcError.unauthenticated('Request is unauthenticated');
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
};
|
||||||
|
|
||||||
|
Future<void> doTest(Interceptor handler) async {
|
||||||
|
harness
|
||||||
|
..interceptor.handler = handler
|
||||||
|
..expectErrorResponse(
|
||||||
|
StatusCode.unauthenticated, 'Request is unauthenticated')
|
||||||
|
..sendRequestHeader('/Test/Unary');
|
||||||
|
|
||||||
|
await harness.fromServer.done;
|
||||||
|
}
|
||||||
|
|
||||||
|
test('with sync interceptor', () => doTest(interceptor));
|
||||||
|
test('with async interceptor',
|
||||||
|
() => doTest((call, method) async => interceptor(call, method)));
|
||||||
|
});
|
||||||
|
|
||||||
|
group('returns internal error if interceptor throws exception', () {
|
||||||
|
final Interceptor interceptor = (call, method) {
|
||||||
|
throw Exception('Reason is unknown');
|
||||||
|
};
|
||||||
|
|
||||||
|
Future<void> doTest(Interceptor handler) async {
|
||||||
|
harness
|
||||||
|
..interceptor.handler = handler
|
||||||
|
..expectErrorResponse(
|
||||||
|
StatusCode.internal, 'Exception: Reason is unknown')
|
||||||
|
..sendRequestHeader('/Test/Unary');
|
||||||
|
|
||||||
|
await harness.fromServer.done;
|
||||||
|
}
|
||||||
|
|
||||||
|
test('with sync interceptor', () => doTest(interceptor));
|
||||||
|
test('with async interceptor',
|
||||||
|
() => doTest((call, method) async => interceptor(call, method)));
|
||||||
|
});
|
||||||
|
|
||||||
|
test("don't fail if interceptor await 2 times", () async {
|
||||||
|
final Interceptor interceptor = (call, method) async {
|
||||||
|
await Future.value();
|
||||||
|
await Future.value();
|
||||||
|
throw Exception('Reason is unknown');
|
||||||
|
};
|
||||||
|
|
||||||
|
harness
|
||||||
|
..interceptor.handler = interceptor
|
||||||
|
..expectErrorResponse(
|
||||||
|
StatusCode.internal, 'Exception: Reason is unknown')
|
||||||
|
..sendRequestHeader('/Test/Unary')
|
||||||
|
..sendData(1);
|
||||||
|
|
||||||
|
await harness.fromServer.done;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
|
@ -16,6 +16,7 @@
|
||||||
import 'dart:async';
|
import 'dart:async';
|
||||||
import 'dart:convert';
|
import 'dart:convert';
|
||||||
|
|
||||||
|
import 'package:grpc/src/client/channel.dart' as base;
|
||||||
import 'package:grpc/src/client/http2_connection.dart';
|
import 'package:grpc/src/client/http2_connection.dart';
|
||||||
import 'package:grpc/src/shared/message.dart';
|
import 'package:grpc/src/shared/message.dart';
|
||||||
import 'package:http2/transport.dart';
|
import 'package:http2/transport.dart';
|
||||||
|
@ -45,6 +46,21 @@ class FakeConnection extends Http2ClientConnection {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class FakeClientTransportConnection extends Http2ClientConnection {
|
||||||
|
final ClientTransportConnector connector;
|
||||||
|
|
||||||
|
var connectionError;
|
||||||
|
|
||||||
|
FakeClientTransportConnection(this.connector, ChannelOptions options)
|
||||||
|
: super.fromClientTransportConnector(connector, options);
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<ClientTransportConnection> connectTransport() async {
|
||||||
|
if (connectionError != null) throw connectionError;
|
||||||
|
return await connector.connect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Duration testBackoff(Duration lastBackoff) => const Duration(milliseconds: 1);
|
Duration testBackoff(Duration lastBackoff) => const Duration(milliseconds: 1);
|
||||||
|
|
||||||
class FakeChannelOptions implements ChannelOptions {
|
class FakeChannelOptions implements ChannelOptions {
|
||||||
|
@ -66,6 +82,18 @@ class FakeChannel extends ClientChannel {
|
||||||
Future<Http2ClientConnection> getConnection() async => connection;
|
Future<Http2ClientConnection> getConnection() async => connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class FakeClientConnectorChannel extends ClientTransportConnectorChannel {
|
||||||
|
final Http2ClientConnection connection;
|
||||||
|
final FakeChannelOptions options;
|
||||||
|
|
||||||
|
FakeClientConnectorChannel(
|
||||||
|
ClientTransportConnector connector, this.connection, this.options)
|
||||||
|
: super(connector, options: options);
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<Http2ClientConnection> getConnection() async => connection;
|
||||||
|
}
|
||||||
|
|
||||||
typedef ServerMessageHandler = void Function(StreamMessage message);
|
typedef ServerMessageHandler = void Function(StreamMessage message);
|
||||||
|
|
||||||
class TestClient extends Client {
|
class TestClient extends Client {
|
||||||
|
@ -76,7 +104,7 @@ class TestClient extends Client {
|
||||||
|
|
||||||
final int Function(List<int> value) decode;
|
final int Function(List<int> value) decode;
|
||||||
|
|
||||||
TestClient(ClientChannel channel,
|
TestClient(base.ClientChannel channel,
|
||||||
{CallOptions options, this.decode: mockDecode})
|
{CallOptions options, this.decode: mockDecode})
|
||||||
: super(channel, options: options) {
|
: super(channel, options: options) {
|
||||||
_$unary = ClientMethod<int, int>('/Test/Unary', mockEncode, decode);
|
_$unary = ClientMethod<int, int>('/Test/Unary', mockEncode, decode);
|
||||||
|
@ -113,10 +141,56 @@ class TestClient extends Client {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class ClientHarness {
|
class ClientHarness extends _Harness {
|
||||||
MockTransport transport;
|
|
||||||
FakeConnection connection;
|
FakeConnection connection;
|
||||||
FakeChannel channel;
|
|
||||||
|
@override
|
||||||
|
FakeChannel createChannel() {
|
||||||
|
connection = FakeConnection('test', transport, channelOptions);
|
||||||
|
return FakeChannel('test', connection, channelOptions);
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
String get expectedAuthority => 'test';
|
||||||
|
}
|
||||||
|
|
||||||
|
class ClientTransportConnectorHarness extends _Harness {
|
||||||
|
FakeClientTransportConnection connection;
|
||||||
|
ClientTransportConnector connector;
|
||||||
|
|
||||||
|
@override
|
||||||
|
FakeClientConnectorChannel createChannel() {
|
||||||
|
connector = FakeClientTransportConnector(transport);
|
||||||
|
connection = FakeClientTransportConnection(connector, channelOptions);
|
||||||
|
return FakeClientConnectorChannel(connector, connection, channelOptions);
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
String get expectedAuthority => 'test';
|
||||||
|
}
|
||||||
|
|
||||||
|
class FakeClientTransportConnector extends ClientTransportConnector {
|
||||||
|
final ClientTransportConnection _transportConnection;
|
||||||
|
final completer = Completer();
|
||||||
|
|
||||||
|
FakeClientTransportConnector(this._transportConnection);
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<ClientTransportConnection> connect() async => _transportConnection;
|
||||||
|
|
||||||
|
@override
|
||||||
|
String get authority => 'test';
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future get done => completer.future;
|
||||||
|
|
||||||
|
@override
|
||||||
|
void shutdown() => completer.complete();
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract class _Harness {
|
||||||
|
MockTransport transport;
|
||||||
|
base.ClientChannel channel;
|
||||||
FakeChannelOptions channelOptions;
|
FakeChannelOptions channelOptions;
|
||||||
MockStream stream;
|
MockStream stream;
|
||||||
|
|
||||||
|
@ -125,11 +199,14 @@ class ClientHarness {
|
||||||
|
|
||||||
TestClient client;
|
TestClient client;
|
||||||
|
|
||||||
|
base.ClientChannel createChannel();
|
||||||
|
|
||||||
|
String get expectedAuthority;
|
||||||
|
|
||||||
void setUp() {
|
void setUp() {
|
||||||
transport = MockTransport();
|
transport = MockTransport();
|
||||||
channelOptions = FakeChannelOptions();
|
channelOptions = FakeChannelOptions();
|
||||||
connection = FakeConnection('test', transport, channelOptions);
|
channel = createChannel();
|
||||||
channel = FakeChannel('test', connection, channelOptions);
|
|
||||||
stream = MockStream();
|
stream = MockStream();
|
||||||
fromClient = StreamController();
|
fromClient = StreamController();
|
||||||
toClient = StreamController();
|
toClient = StreamController();
|
||||||
|
@ -198,6 +275,7 @@ class ClientHarness {
|
||||||
Map.fromEntries(capturedHeaders.map((header) =>
|
Map.fromEntries(capturedHeaders.map((header) =>
|
||||||
MapEntry(utf8.decode(header.name), utf8.decode(header.value)))),
|
MapEntry(utf8.decode(header.name), utf8.decode(header.value)))),
|
||||||
path: expectedPath,
|
path: expectedPath,
|
||||||
|
authority: expectedAuthority,
|
||||||
timeout: toTimeoutString(expectedTimeout),
|
timeout: toTimeoutString(expectedTimeout),
|
||||||
customHeaders: expectedCustomHeaders);
|
customHeaders: expectedCustomHeaders);
|
||||||
|
|
||||||
|
|
|
@ -116,23 +116,40 @@ class TestServerStream extends ServerTransportStream {
|
||||||
ServerTransportStream push(List<Header> requestHeaders) => null;
|
ServerTransportStream push(List<Header> requestHeaders) => null;
|
||||||
}
|
}
|
||||||
|
|
||||||
class ServerHarness {
|
class ServerHarness extends _Harness {
|
||||||
final toServer = StreamController<StreamMessage>();
|
@override
|
||||||
final fromServer = StreamController<StreamMessage>();
|
ConnectionServer createServer() =>
|
||||||
final service = TestService();
|
Server(<Service>[service], <Interceptor>[interceptor]);
|
||||||
final interceptor = TestInterceptor();
|
|
||||||
|
|
||||||
Server server;
|
|
||||||
|
|
||||||
ServerHarness() {
|
|
||||||
server = Server(<Service>[service], <Interceptor>[interceptor]);
|
|
||||||
}
|
|
||||||
|
|
||||||
static ServiceMethod<int, int> createMethod(String name,
|
static ServiceMethod<int, int> createMethod(String name,
|
||||||
Function methodHandler, bool clientStreaming, bool serverStreaming) {
|
Function methodHandler, bool clientStreaming, bool serverStreaming) {
|
||||||
return ServiceMethod<int, int>(name, methodHandler, clientStreaming,
|
return ServiceMethod<int, int>(name, methodHandler, clientStreaming,
|
||||||
serverStreaming, mockDecode, mockEncode);
|
serverStreaming, mockDecode, mockEncode);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class ConnectionServerHarness extends _Harness {
|
||||||
|
@override
|
||||||
|
ConnectionServer createServer() =>
|
||||||
|
ConnectionServer(<Service>[service], <Interceptor>[interceptor]);
|
||||||
|
|
||||||
|
static ServiceMethod<int, int> createMethod(String name,
|
||||||
|
Function methodHandler, bool clientStreaming, bool serverStreaming) {
|
||||||
|
return ServiceMethod<int, int>(name, methodHandler, clientStreaming,
|
||||||
|
serverStreaming, mockDecode, mockEncode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract class _Harness {
|
||||||
|
final toServer = StreamController<StreamMessage>();
|
||||||
|
final fromServer = StreamController<StreamMessage>();
|
||||||
|
final service = TestService();
|
||||||
|
final interceptor = TestInterceptor();
|
||||||
|
ConnectionServer _server;
|
||||||
|
|
||||||
|
ConnectionServer createServer();
|
||||||
|
|
||||||
|
ConnectionServer get server => _server ??= createServer();
|
||||||
|
|
||||||
void setUp() {
|
void setUp() {
|
||||||
final stream = TestServerStream(toServer.stream, fromServer.sink);
|
final stream = TestServerStream(toServer.stream, fromServer.sink);
|
||||||
|
|
Loading…
Reference in New Issue