Expose onConnectionStateChanged for channels (#565)

Co-authored-by: Vyacheslav Egorov <vegorov@google.com>
This commit is contained in:
Cobinja 2022-08-15 15:03:51 +02:00 committed by GitHub
parent 94b71a3e1c
commit af965f15f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 106 additions and 57 deletions

View File

@ -1,3 +1,9 @@
## 3.1.0-dev
* Expose a stream for connection state changes on ClientChannel to address
[#428](https://github.com/grpc/grpc-dart/issues/428).
This allows users to react to state changes in the connection.
## 3.0.2 ## 3.0.2
* Fix compilation on the Web with DDC. * Fix compilation on the Web with DDC.

View File

@ -39,6 +39,12 @@ abstract class ClientChannel {
/// Initiates a new RPC on this connection. /// Initiates a new RPC on this connection.
ClientCall<Q, R> createCall<Q, R>( ClientCall<Q, R> createCall<Q, R>(
ClientMethod<Q, R> method, Stream<Q> requests, CallOptions options); ClientMethod<Q, R> method, Stream<Q> requests, CallOptions options);
/// Stream of connection state changes
///
/// This returns a broadcast stream that can be listened to for connection changes.
/// Note: on web channels, this will not yield any values.
Stream<ConnectionState> get onConnectionStateChanged;
} }
/// Auxiliary base class implementing much of ClientChannel. /// Auxiliary base class implementing much of ClientChannel.
@ -47,6 +53,8 @@ abstract class ClientChannelBase implements ClientChannel {
late ClientConnection _connection; late ClientConnection _connection;
var _connected = false; var _connected = false;
bool _isShutdown = false; bool _isShutdown = false;
final StreamController<ConnectionState> _connectionStateStreamController =
StreamController.broadcast();
ClientChannelBase(); ClientChannelBase();
@ -56,6 +64,7 @@ abstract class ClientChannelBase implements ClientChannel {
_isShutdown = true; _isShutdown = true;
if (_connected) { if (_connected) {
await _connection.shutdown(); await _connection.shutdown();
await _connectionStateStreamController.close();
} }
} }
@ -64,6 +73,7 @@ abstract class ClientChannelBase implements ClientChannel {
_isShutdown = true; _isShutdown = true;
if (_connected) { if (_connected) {
await _connection.terminate(); await _connection.terminate();
await _connectionStateStreamController.close();
} }
} }
@ -76,6 +86,12 @@ abstract class ClientChannelBase implements ClientChannel {
if (_isShutdown) throw GrpcError.unavailable('Channel shutting down.'); if (_isShutdown) throw GrpcError.unavailable('Channel shutting down.');
if (!_connected) { if (!_connected) {
_connection = createConnection(); _connection = createConnection();
_connection.onStateChanged = (state) {
if (_connectionStateStreamController.isClosed) {
return;
}
_connectionStateStreamController.add(state);
};
_connected = true; _connected = true;
} }
return _connection; return _connection;
@ -97,4 +113,8 @@ abstract class ClientChannelBase implements ClientChannel {
}, onError: call.onConnectionError); }, onError: call.onConnectionError);
return call; return call;
} }
@override
Stream<ConnectionState> get onConnectionStateChanged =>
_connectionStateStreamController.stream;
} }

View File

@ -57,4 +57,10 @@ abstract class ClientConnection {
/// All open calls are terminated immediately, and no further calls may be /// All open calls are terminated immediately, and no further calls may be
/// made on this connection. /// made on this connection.
Future<void> terminate(); Future<void> terminate();
/// Set state change listener for this connection. The given callback will be
/// invoked when the state of this connection changes.
// no need for this to be public,
// but needed in the actual implementations
set onStateChanged(void Function(ConnectionState) cb);
} }

View File

@ -37,9 +37,8 @@ class ClientChannel extends ClientChannelBase {
: super(); : super();
@override @override
ClientConnection createConnection() { ClientConnection createConnection() =>
return Http2ClientConnection(host, port, options); Http2ClientConnection(host, port, options);
}
} }
class ClientTransportConnectorChannel extends ClientChannelBase { class ClientTransportConnectorChannel extends ClientChannelBase {
@ -50,8 +49,7 @@ class ClientTransportConnectorChannel extends ClientChannelBase {
{this.options = const ChannelOptions()}); {this.options = const ChannelOptions()});
@override @override
ClientConnection createConnection() { ClientConnection createConnection() =>
return Http2ClientConnection.fromClientTransportConnector( Http2ClientConnection.fromClientTransportConnector(
transportConnector, options); transportConnector, options);
}
} }

View File

@ -18,7 +18,6 @@ import 'dart:convert';
import 'dart:io'; import 'dart:io';
import 'package:http2/transport.dart'; import 'package:http2/transport.dart';
import 'package:meta/meta.dart';
import '../shared/codec.dart'; import '../shared/codec.dart';
import '../shared/timeout.dart'; import '../shared/timeout.dart';
@ -43,8 +42,8 @@ class Http2ClientConnection implements connection.ClientConnection {
connection.ConnectionState _state = ConnectionState.idle; connection.ConnectionState _state = ConnectionState.idle;
@visibleForTesting void Function(connection.ConnectionState)? onStateChanged;
void Function(Http2ClientConnection connection)? onStateChanged;
final _pendingCalls = <ClientCall>[]; final _pendingCalls = <ClientCall>[];
final ClientTransportConnector _transportConnector; final ClientTransportConnector _transportConnector;
@ -214,7 +213,7 @@ class Http2ClientConnection implements connection.ClientConnection {
void _setState(ConnectionState state) { void _setState(ConnectionState state) {
_state = state; _state = state;
onStateChanged?.call(this); onStateChanged?.call(state);
} }
void _handleIdleTimeout() { void _handleIdleTimeout() {

View File

@ -144,7 +144,7 @@ class XhrTransportStream implements GrpcTransportStream {
} }
} }
class XhrClientConnection extends ClientConnection { class XhrClientConnection implements ClientConnection {
final Uri uri; final Uri uri;
final _requests = <XhrTransportStream>{}; final _requests = <XhrTransportStream>{};
@ -217,6 +217,11 @@ class XhrClientConnection extends ClientConnection {
@override @override
Future<void> shutdown() async {} Future<void> shutdown() async {}
@override
set onStateChanged(void Function(ConnectionState) cb) {
// Do nothing.
}
} }
MapEntry<String, String>? _getContentTypeHeader(Map<String, String> metadata) { MapEntry<String, String>? _getContentTypeHeader(Map<String, String> metadata) {

View File

@ -1,7 +1,7 @@
name: grpc name: grpc
description: Dart implementation of gRPC, a high performance, open-source universal RPC framework. description: Dart implementation of gRPC, a high performance, open-source universal RPC framework.
version: 3.0.2 version: 3.1.0-dev
repository: https://github.com/grpc/grpc-dart repository: https://github.com/grpc/grpc-dart

View File

@ -43,7 +43,9 @@ class FixedConnectionClientChannel extends ClientChannelBase {
final Http2ClientConnection clientConnection; final Http2ClientConnection clientConnection;
List<grpc.ConnectionState> states = <grpc.ConnectionState>[]; List<grpc.ConnectionState> states = <grpc.ConnectionState>[];
FixedConnectionClientChannel(this.clientConnection) { FixedConnectionClientChannel(this.clientConnection) {
clientConnection.onStateChanged = (c) => states.add(c.state); onConnectionStateChanged.listen((state) {
states.add(state);
});
} }
@override @override
ClientConnection createConnection() => clientConnection; ClientConnection createConnection() => clientConnection;
@ -89,8 +91,9 @@ Future<void> main() async {
server.port!, server.port!,
grpc.ChannelOptions(credentials: grpc.ChannelCredentials.insecure()))); grpc.ChannelOptions(credentials: grpc.ChannelCredentials.insecure())));
final states = <grpc.ConnectionState>[]; final states = <grpc.ConnectionState>[];
channel.clientConnection.onStateChanged = channel.onConnectionStateChanged.listen((state) {
(Http2ClientConnection connection) => states.add(connection.state); states.add(state);
});
final testClient = TestClient(channel); final testClient = TestClient(channel);
await Future.wait(<Future>[ await Future.wait(<Future>[

View File

@ -472,45 +472,59 @@ void main() {
); );
} }
test('Connection states are reported', () async {
final connectionStates = <ConnectionState>[];
harness.channel.onConnectionStateChanged.listen((state) {
connectionStates.add(state);
}, onDone: () {
expect(connectionStates, [
ConnectionState.connecting,
ConnectionState.ready,
ConnectionState.shutdown
]);
});
await makeUnaryCall();
});
test('Connection errors are reported', () async { test('Connection errors are reported', () async {
final connectionStates = <ConnectionState>[]; final connectionStates = <ConnectionState>[];
harness.connection!.connectionError = 'Connection error'; harness.connection!.connectionError = 'Connection error';
harness.connection!.onStateChanged = (connection) { harness.channel.onConnectionStateChanged.listen((state) {
final state = connection.state;
connectionStates.add(state); connectionStates.add(state);
}; }, onDone: () {
expect(
connectionStates, [ConnectionState.connecting, ConnectionState.idle]);
});
final expectedException = final expectedException =
GrpcError.unavailable('Error connecting: Connection error'); GrpcError.unavailable('Error connecting: Connection error');
await harness.expectThrows( await harness.expectThrows(
harness.client.unary(dummyValue), expectedException); harness.client.unary(dummyValue), expectedException);
expect(
connectionStates, [ConnectionState.connecting, ConnectionState.idle]);
}); });
test('Connections time out if idle', () async { test('Connections time out if idle', () async {
final done = Completer(); final done = Completer();
final connectionStates = <ConnectionState>[]; final connectionStates = <ConnectionState>[];
harness.connection!.onStateChanged = (connection) { harness.channel.onConnectionStateChanged.listen((state) {
final state = connection.state;
connectionStates.add(state); connectionStates.add(state);
if (state == ConnectionState.idle) done.complete(); if (state == ConnectionState.idle) done.complete();
}; }, onDone: () async {
expect(connectionStates,
[ConnectionState.connecting, ConnectionState.ready]);
await done.future;
expect(connectionStates, [
ConnectionState.connecting,
ConnectionState.ready,
ConnectionState.idle
]);
});
harness.channelOptions.idleTimeout = const Duration(microseconds: 10); harness.channelOptions.idleTimeout = const Duration(microseconds: 10);
await makeUnaryCall(); await makeUnaryCall();
harness.signalIdle(); harness.signalIdle();
expect(
connectionStates, [ConnectionState.connecting, ConnectionState.ready]);
await done.future;
expect(connectionStates, [
ConnectionState.connecting,
ConnectionState.ready,
ConnectionState.idle
]);
}); });
test('Default reconnect backoff backs off', () { test('Default reconnect backoff backs off', () {

View File

@ -351,43 +351,41 @@ void main() {
test('Connection errors are reported', () async { test('Connection errors are reported', () async {
final connectionStates = <ConnectionState>[]; final connectionStates = <ConnectionState>[];
harness.connection!.connectionError = 'Connection error';
harness.connection!.onStateChanged = (connection) {
final state = connection.state;
connectionStates.add(state);
};
final expectedException = final expectedException =
GrpcError.unavailable('Error connecting: Connection error'); GrpcError.unavailable('Error connecting: Connection error');
harness.connection!.connectionError = 'Connection error';
harness.channel.onConnectionStateChanged.listen((state) {
connectionStates.add(state);
}, onDone: () async {
await harness.expectThrows(
harness.client.unary(dummyValue), expectedException);
await harness.expectThrows( expect(
harness.client.unary(dummyValue), expectedException); connectionStates, [ConnectionState.connecting, ConnectionState.idle]);
});
expect(
connectionStates, [ConnectionState.connecting, ConnectionState.idle]);
}); });
test('Connections time out if idle', () async { test('Connections time out if idle', () async {
final done = Completer(); final done = Completer();
final connectionStates = <ConnectionState>[]; final connectionStates = <ConnectionState>[];
harness.connection!.onStateChanged = (connection) { harness.channel.onConnectionStateChanged.listen((state) {
final state = connection.state;
connectionStates.add(state); connectionStates.add(state);
if (state == ConnectionState.idle) done.complete(); if (state == ConnectionState.idle) done.complete();
}; }, onDone: () async {
expect(connectionStates,
[ConnectionState.connecting, ConnectionState.ready]);
await done.future;
expect(connectionStates, [
ConnectionState.connecting,
ConnectionState.ready,
ConnectionState.idle
]);
});
harness.channelOptions.idleTimeout = const Duration(microseconds: 10); harness.channelOptions.idleTimeout = const Duration(microseconds: 10);
await makeUnaryCall(); await makeUnaryCall();
harness.signalIdle(); harness.signalIdle();
expect(
connectionStates, [ConnectionState.connecting, ConnectionState.ready]);
await done.future;
expect(connectionStates, [
ConnectionState.connecting,
ConnectionState.ready,
ConnectionState.idle
]);
}); });
test('Default reconnect backoff backs off', () { test('Default reconnect backoff backs off', () {

View File

@ -54,7 +54,7 @@ class FixedConnectionClientChannel extends ClientChannelBase {
final Http2ClientConnection clientConnection; final Http2ClientConnection clientConnection;
List<ConnectionState> states = <ConnectionState>[]; List<ConnectionState> states = <ConnectionState>[];
FixedConnectionClientChannel(this.clientConnection) { FixedConnectionClientChannel(this.clientConnection) {
clientConnection.onStateChanged = (c) => states.add(c.state); onConnectionStateChanged.listen((state) => states.add(state));
} }
@override @override
ClientConnection createConnection() => clientConnection; ClientConnection createConnection() => clientConnection;

View File

@ -59,7 +59,7 @@ class FixedConnectionClientChannel extends ClientChannelBase {
final Http2ClientConnection clientConnection; final Http2ClientConnection clientConnection;
List<ConnectionState> states = <ConnectionState>[]; List<ConnectionState> states = <ConnectionState>[];
FixedConnectionClientChannel(this.clientConnection) { FixedConnectionClientChannel(this.clientConnection) {
clientConnection.onStateChanged = (c) => states.add(c.state); onConnectionStateChanged.listen((state) => states.add(state));
} }
@override @override
ClientConnection createConnection() => clientConnection; ClientConnection createConnection() => clientConnection;