mirror of https://github.com/grpc/grpc-dart.git
Keepalive (#634)
* Keepalive tests run! * Renaming * Some refactorings * Find a place where to handle the keepalive manager * Fix bug * Make KeepAliveManager independent of transport * Fix call sites in client * Add server keepalive handler * Wire through onDataReceived * Add ServerKeepAliveManager test * Refactorings * Tests kind of run now * Add shutdown test * Remove unneeded override * Remove unneeded mocks * Send correct error codes and cleanup * Small changes * Rename * Add documentation * Add test for !_enforcesMaxBadPings * Refactor tests * Switch to http2 master branch * Renaming * Null shutdownTimer * Refactor to event-state model * Smaller refactorings * Works now * Switch tests to isA * Shifting things around * Split into server and client * Format * rename * Tweaks * Switch order of optional parameters to make change non-breaking * Add some leeway to the durations in tests * Make keepalive tests vm only * Switch back to onEvent in state * Switch to published http2
This commit is contained in:
parent
aae487d12f
commit
03f07e9535
|
@ -5,6 +5,7 @@
|
|||
## 3.2.2
|
||||
|
||||
* Remove `base` qualifier on `ResponseStream`.
|
||||
* Add support for clients to send KEEPALIVE pings.
|
||||
|
||||
## 3.2.1
|
||||
|
||||
|
|
|
@ -0,0 +1,242 @@
|
|||
import 'dart:async';
|
||||
|
||||
import 'package:clock/clock.dart';
|
||||
import 'package:meta/meta.dart';
|
||||
|
||||
/// KeepAlive support for gRPC, see
|
||||
/// https://github.com/grpc/grpc/blob/master/doc/keepalive.md.
|
||||
|
||||
/// Options to configure a gRPC client for sending keepalive signals.
|
||||
class ClientKeepAliveOptions {
|
||||
/// How often a ping should be sent to keep the connection alive.
|
||||
///
|
||||
/// `GRPC_ARG_KEEPALIVE_TIME_MS` in the docs.
|
||||
final Duration? pingInterval;
|
||||
|
||||
/// How long the connection should wait before shutting down after no response
|
||||
/// to a ping.
|
||||
///
|
||||
/// `GRPC_ARG_KEEPALIVE_TIMEOUT_MS` in the docs.
|
||||
final Duration timeout;
|
||||
|
||||
/// If a connection with no active calls should be kept alive by sending
|
||||
/// pings.
|
||||
///
|
||||
/// `GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS` in the docs.
|
||||
final bool permitWithoutCalls;
|
||||
|
||||
const ClientKeepAliveOptions({
|
||||
this.pingInterval,
|
||||
this.timeout = const Duration(milliseconds: 20000),
|
||||
this.permitWithoutCalls = false,
|
||||
});
|
||||
|
||||
bool get shouldSendPings => pingInterval != null;
|
||||
}
|
||||
|
||||
sealed class KeepAliveState {
|
||||
KeepAliveState? onEvent(KeepAliveEvent event, ClientKeepAlive manager);
|
||||
|
||||
void disconnect();
|
||||
}
|
||||
|
||||
/// Transport has no active rpcs. We don't need to do any keepalives.
|
||||
final class Idle extends KeepAliveState {
|
||||
final Timer? pingTimer;
|
||||
final Stopwatch timeSinceFrame;
|
||||
|
||||
Idle([this.pingTimer, Stopwatch? stopwatch])
|
||||
: timeSinceFrame = stopwatch ?? clock.stopwatch()
|
||||
..start();
|
||||
|
||||
@override
|
||||
KeepAliveState? onEvent(KeepAliveEvent event, ClientKeepAlive manager) {
|
||||
switch (event) {
|
||||
case KeepAliveEvent.onTransportActive:
|
||||
// When the transport goes active, we do not reset the nextKeepaliveTime.
|
||||
// This allows us to quickly check whether the connection is still
|
||||
// working.
|
||||
final timer = pingTimer ??
|
||||
Timer(manager._pingInterval - timeSinceFrame.elapsed,
|
||||
manager.sendPing);
|
||||
return PingScheduled(timer, timeSinceFrame);
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@override
|
||||
void disconnect() => pingTimer?.cancel();
|
||||
}
|
||||
|
||||
/// We have scheduled a ping to be sent in the future. We may decide to delay
|
||||
/// it if we receive some data.
|
||||
final class PingScheduled extends KeepAliveState {
|
||||
final Timer pingTimer;
|
||||
final Stopwatch timeSinceFrame;
|
||||
|
||||
PingScheduled(this.pingTimer, [Stopwatch? stopwatch])
|
||||
: timeSinceFrame = stopwatch ?? clock.stopwatch()
|
||||
..start();
|
||||
|
||||
@override
|
||||
KeepAliveState? onEvent(KeepAliveEvent event, ClientKeepAlive manager) {
|
||||
switch (event) {
|
||||
case KeepAliveEvent.onFrameReceived:
|
||||
// We do not cancel the ping future here. This avoids constantly scheduling
|
||||
// and cancellation in a busy transport. Instead, we update the status here
|
||||
// and reschedule later. So we actually keep one sendPing task always in
|
||||
// flight when there're active rpcs.
|
||||
return PingDelayed(pingTimer);
|
||||
case KeepAliveEvent.onTransportIdle:
|
||||
return Idle(pingTimer, timeSinceFrame);
|
||||
case KeepAliveEvent.sendPing:
|
||||
// Schedule a shutdown. It fires if we don't receive the ping response
|
||||
// within the timeout.
|
||||
manager.ping();
|
||||
final shutdown = Timer(manager._options.timeout, manager._shutdown);
|
||||
return ShutdownScheduled(shutdown, false);
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@override
|
||||
void disconnect() => pingTimer.cancel();
|
||||
}
|
||||
|
||||
/// We need to delay the scheduled keepalive ping.
|
||||
final class PingDelayed extends KeepAliveState {
|
||||
final Timer pingTimer;
|
||||
final Stopwatch timeSinceFrame;
|
||||
|
||||
PingDelayed(this.pingTimer) : timeSinceFrame = clock.stopwatch()..start();
|
||||
|
||||
@override
|
||||
KeepAliveState? onEvent(KeepAliveEvent event, ClientKeepAlive manager) {
|
||||
switch (event) {
|
||||
case KeepAliveEvent.onTransportIdle:
|
||||
return Idle(pingTimer, timeSinceFrame);
|
||||
case KeepAliveEvent.sendPing:
|
||||
final pingTimer = Timer(
|
||||
manager._pingInterval - timeSinceFrame.elapsed,
|
||||
manager.sendPing,
|
||||
);
|
||||
return PingScheduled(pingTimer, timeSinceFrame);
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@override
|
||||
void disconnect() => pingTimer.cancel();
|
||||
}
|
||||
|
||||
/// The ping has been sent out. Waiting for a ping response.
|
||||
final class ShutdownScheduled extends KeepAliveState {
|
||||
final bool isIdle;
|
||||
final Timer shutdownTimer;
|
||||
|
||||
ShutdownScheduled(this.shutdownTimer, this.isIdle);
|
||||
|
||||
@override
|
||||
KeepAliveState? onEvent(KeepAliveEvent event, ClientKeepAlive manager) {
|
||||
switch (event) {
|
||||
case KeepAliveEvent.onFrameReceived:
|
||||
// Ping acked or effectively ping acked. Cancel shutdown, and then if not
|
||||
// idle, schedule a new keep-alive ping.
|
||||
shutdownTimer.cancel();
|
||||
// schedule a new ping
|
||||
return isIdle
|
||||
? Idle()
|
||||
: PingScheduled(Timer(manager._pingInterval, manager.sendPing));
|
||||
case KeepAliveEvent.onTransportIdle:
|
||||
return ShutdownScheduled(shutdownTimer, true);
|
||||
case KeepAliveEvent.onTransportActive:
|
||||
return ShutdownScheduled(shutdownTimer, false);
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@override
|
||||
void disconnect() => shutdownTimer.cancel();
|
||||
}
|
||||
|
||||
final class Disconnected extends KeepAliveState {
|
||||
@override
|
||||
void disconnect() {}
|
||||
|
||||
@override
|
||||
KeepAliveState? onEvent(KeepAliveEvent event, ClientKeepAlive manager) =>
|
||||
null;
|
||||
}
|
||||
|
||||
enum KeepAliveEvent {
|
||||
onTransportActive,
|
||||
onFrameReceived,
|
||||
onTransportIdle,
|
||||
sendPing,
|
||||
}
|
||||
|
||||
/// A keep alive "manager", deciding when to send pings or shutdown based on the
|
||||
/// [ClientKeepAliveOptions].
|
||||
class ClientKeepAlive {
|
||||
@visibleForTesting
|
||||
KeepAliveState state = Idle();
|
||||
|
||||
final void Function() onPingTimeout;
|
||||
final void Function() ping;
|
||||
|
||||
final ClientKeepAliveOptions _options;
|
||||
Duration get _pingInterval => _options.pingInterval ?? Duration(days: 365);
|
||||
|
||||
ClientKeepAlive({
|
||||
required ClientKeepAliveOptions options,
|
||||
required this.ping,
|
||||
required this.onPingTimeout,
|
||||
}) : _options = options;
|
||||
|
||||
void onTransportStarted() {
|
||||
if (_options.permitWithoutCalls) {
|
||||
onTransportActive();
|
||||
}
|
||||
}
|
||||
|
||||
/// If we receive any kind of frame from the server, that means the connection
|
||||
/// is still open, so we reset the ping timer.
|
||||
void onFrameReceived() => _setState(KeepAliveEvent.onFrameReceived);
|
||||
|
||||
void sendPing() => _setState(KeepAliveEvent.sendPing);
|
||||
|
||||
/// When the transport becomes active, we start sending pings every
|
||||
/// [_pingInterval].
|
||||
void onTransportActive() => _setState(KeepAliveEvent.onTransportActive);
|
||||
|
||||
/// If the transport has become idle and [_options.permitWithoutCalls] is
|
||||
/// set, nothing changes, we still send pings and shutdown on no response.
|
||||
///
|
||||
/// Otherwise, we stop sending pings.
|
||||
void onTransportIdle() {
|
||||
if (!_options.permitWithoutCalls) {
|
||||
_setState(KeepAliveEvent.onTransportIdle);
|
||||
}
|
||||
}
|
||||
|
||||
void onTransportTermination() => _disconnect();
|
||||
|
||||
void _shutdown() {
|
||||
onPingTimeout();
|
||||
_disconnect();
|
||||
}
|
||||
|
||||
void _disconnect() {
|
||||
state.disconnect();
|
||||
state = Disconnected();
|
||||
}
|
||||
|
||||
void _setState(KeepAliveEvent event) {
|
||||
final newState = state.onEvent(event, this);
|
||||
if (newState != null) state = newState;
|
||||
}
|
||||
}
|
|
@ -22,6 +22,7 @@ import 'package:http2/transport.dart';
|
|||
import '../shared/codec.dart';
|
||||
import '../shared/timeout.dart';
|
||||
import 'call.dart';
|
||||
import 'client_keepalive.dart';
|
||||
import 'client_transport_connector.dart';
|
||||
import 'connection.dart' hide ClientConnection;
|
||||
import 'connection.dart' as connection;
|
||||
|
@ -57,6 +58,8 @@ class Http2ClientConnection implements connection.ClientConnection {
|
|||
|
||||
Duration? _currentReconnectDelay;
|
||||
|
||||
ClientKeepAlive? keepAliveManager;
|
||||
|
||||
Http2ClientConnection(Object host, int port, this.options)
|
||||
: _transportConnector = _SocketTransportConnector(host, port, options);
|
||||
|
||||
|
@ -100,6 +103,15 @@ class Http2ClientConnection implements connection.ClientConnection {
|
|||
connectTransport().then<void>((transport) async {
|
||||
_currentReconnectDelay = null;
|
||||
_transportConnection = transport;
|
||||
if (options.keepAlive.shouldSendPings) {
|
||||
keepAliveManager = ClientKeepAlive(
|
||||
options: options.keepAlive,
|
||||
ping: () => transport.ping(),
|
||||
onPingTimeout: () => shutdown(),
|
||||
);
|
||||
transport.onFrameReceived
|
||||
.listen((_) => keepAliveManager?.onFrameReceived());
|
||||
}
|
||||
_connectionLifeTimer
|
||||
..reset()
|
||||
..start();
|
||||
|
@ -125,6 +137,7 @@ class Http2ClientConnection implements connection.ClientConnection {
|
|||
_connectionLifeTimer.elapsed > options.connectionTimeout;
|
||||
if (shouldRefresh) {
|
||||
_transportConnection!.finish();
|
||||
keepAliveManager?.onTransportTermination();
|
||||
}
|
||||
if (!isHealthy || shouldRefresh) {
|
||||
_abandonConnection();
|
||||
|
@ -196,12 +209,14 @@ class Http2ClientConnection implements connection.ClientConnection {
|
|||
if (_state == ConnectionState.shutdown) return;
|
||||
_setShutdownState();
|
||||
await _transportConnection?.finish();
|
||||
keepAliveManager?.onTransportTermination();
|
||||
}
|
||||
|
||||
@override
|
||||
Future<void> terminate() async {
|
||||
_setShutdownState();
|
||||
await _transportConnection?.terminate();
|
||||
keepAliveManager?.onTransportTermination();
|
||||
}
|
||||
|
||||
void _setShutdownState() {
|
||||
|
@ -222,7 +237,8 @@ class Http2ClientConnection implements connection.ClientConnection {
|
|||
_transportConnection
|
||||
?.finish()
|
||||
.catchError((_) {}); // TODO(jakobr): Log error.
|
||||
_transportConnection = null;
|
||||
keepAliveManager?.onTransportTermination();
|
||||
_disconnect();
|
||||
_setState(ConnectionState.idle);
|
||||
}
|
||||
|
||||
|
@ -234,10 +250,12 @@ class Http2ClientConnection implements connection.ClientConnection {
|
|||
void _handleActiveStateChanged(bool isActive) {
|
||||
if (isActive) {
|
||||
_cancelTimer();
|
||||
keepAliveManager?.onTransportActive();
|
||||
} else {
|
||||
if (options.idleTimeout != null) {
|
||||
_timer ??= Timer(options.idleTimeout!, _handleIdleTimeout);
|
||||
}
|
||||
keepAliveManager?.onTransportIdle();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -248,7 +266,7 @@ class Http2ClientConnection implements connection.ClientConnection {
|
|||
}
|
||||
|
||||
void _handleConnectionFailure(error) {
|
||||
_transportConnection = null;
|
||||
_disconnect();
|
||||
if (_state == ConnectionState.shutdown || _state == ConnectionState.idle) {
|
||||
return;
|
||||
}
|
||||
|
@ -258,6 +276,7 @@ class Http2ClientConnection implements connection.ClientConnection {
|
|||
_failCall(call, error);
|
||||
}
|
||||
_pendingCalls.clear();
|
||||
keepAliveManager?.onTransportIdle();
|
||||
_setState(ConnectionState.idle);
|
||||
}
|
||||
|
||||
|
@ -267,11 +286,17 @@ class Http2ClientConnection implements connection.ClientConnection {
|
|||
_connect();
|
||||
}
|
||||
|
||||
void _disconnect() {
|
||||
_transportConnection = null;
|
||||
keepAliveManager?.onTransportTermination();
|
||||
keepAliveManager = null;
|
||||
}
|
||||
|
||||
void _abandonConnection() {
|
||||
_cancelTimer();
|
||||
_transportConnection = null;
|
||||
_disconnect();
|
||||
|
||||
if (_state == ConnectionState.idle && _state == ConnectionState.shutdown) {
|
||||
if (_state == ConnectionState.idle || _state == ConnectionState.shutdown) {
|
||||
// All good.
|
||||
return;
|
||||
}
|
||||
|
@ -279,6 +304,7 @@ class Http2ClientConnection implements connection.ClientConnection {
|
|||
// We were not planning to close the socket.
|
||||
if (!_hasPendingCalls()) {
|
||||
// No pending calls. Just hop to idle, and wait for a new RPC.
|
||||
keepAliveManager?.onTransportIdle();
|
||||
_setState(ConnectionState.idle);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
import 'dart:math';
|
||||
|
||||
import '../shared/codec_registry.dart';
|
||||
import 'client_keepalive.dart';
|
||||
import 'transport/http2_credentials.dart';
|
||||
|
||||
const defaultIdleTimeout = Duration(minutes: 5);
|
||||
|
@ -57,6 +58,7 @@ class ChannelOptions {
|
|||
final Duration? connectTimeout;
|
||||
final BackoffStrategy backoffStrategy;
|
||||
final String userAgent;
|
||||
final ClientKeepAliveOptions keepAlive;
|
||||
|
||||
const ChannelOptions({
|
||||
this.credentials = const ChannelCredentials.secure(),
|
||||
|
@ -66,5 +68,6 @@ class ChannelOptions {
|
|||
this.connectTimeout,
|
||||
this.connectionTimeout = defaultConnectionTimeOut,
|
||||
this.codecRegistry,
|
||||
this.keepAlive = const ClientKeepAliveOptions(),
|
||||
});
|
||||
}
|
||||
|
|
|
@ -67,6 +67,9 @@ class ServerHandler extends ServiceCall {
|
|||
final X509Certificate? _clientCertificate;
|
||||
final InternetAddress? _remoteAddress;
|
||||
|
||||
/// Emits a ping everytime data is received
|
||||
final Sink<void>? onDataReceived;
|
||||
|
||||
ServerHandler({
|
||||
required ServerTransportStream stream,
|
||||
required ServiceLookup serviceLookup,
|
||||
|
@ -75,6 +78,7 @@ class ServerHandler extends ServiceCall {
|
|||
X509Certificate? clientCertificate,
|
||||
InternetAddress? remoteAddress,
|
||||
GrpcErrorHandler? errorHandler,
|
||||
this.onDataReceived,
|
||||
}) : _stream = stream,
|
||||
_serviceLookup = serviceLookup,
|
||||
_interceptors = interceptors,
|
||||
|
@ -135,6 +139,7 @@ class ServerHandler extends ServiceCall {
|
|||
// -- Idle state, incoming data --
|
||||
|
||||
void _onDataIdle(GrpcMessage headerMessage) async {
|
||||
onDataReceived?.add(null);
|
||||
if (headerMessage is! GrpcMetadata) {
|
||||
_sendError(GrpcError.unimplemented('Expected header frame'));
|
||||
_sinkIncoming();
|
||||
|
@ -275,6 +280,7 @@ class ServerHandler extends ServiceCall {
|
|||
return;
|
||||
}
|
||||
|
||||
onDataReceived?.add(null);
|
||||
final data = message;
|
||||
Object? request;
|
||||
try {
|
||||
|
|
|
@ -24,6 +24,7 @@ import '../shared/io_bits/io_bits.dart' as io_bits;
|
|||
import '../shared/security.dart';
|
||||
import 'handler.dart';
|
||||
import 'interceptor.dart';
|
||||
import 'server_keepalive.dart';
|
||||
import 'service.dart';
|
||||
|
||||
/// Wrapper around grpc_server_credentials, a way to authenticate a server.
|
||||
|
@ -88,6 +89,8 @@ class ConnectionServer {
|
|||
final List<Interceptor> _interceptors;
|
||||
final CodecRegistry? _codecRegistry;
|
||||
final GrpcErrorHandler? _errorHandler;
|
||||
final ServerKeepAliveOptions _keepAliveOptions;
|
||||
final List<ServerHandler> _handlers = [];
|
||||
|
||||
final _connections = <ServerTransportConnection>[];
|
||||
|
||||
|
@ -97,6 +100,7 @@ class ConnectionServer {
|
|||
List<Interceptor> interceptors = const <Interceptor>[],
|
||||
CodecRegistry? codecRegistry,
|
||||
GrpcErrorHandler? errorHandler,
|
||||
this._keepAliveOptions = const ServerKeepAliveOptions(),
|
||||
]) : _codecRegistry = codecRegistry,
|
||||
_interceptors = interceptors,
|
||||
_errorHandler = errorHandler {
|
||||
|
@ -113,26 +117,37 @@ class ConnectionServer {
|
|||
InternetAddress? remoteAddress,
|
||||
}) async {
|
||||
_connections.add(connection);
|
||||
ServerHandler? handler;
|
||||
// TODO(jakobr): Set active state handlers, close connection after idle
|
||||
// timeout.
|
||||
final onDataReceivedController = StreamController<void>();
|
||||
ServerKeepAlive(
|
||||
options: _keepAliveOptions,
|
||||
tooManyBadPings: () async =>
|
||||
await connection.terminate(ErrorCode.ENHANCE_YOUR_CALM),
|
||||
pingNotifier: connection.onPingReceived,
|
||||
dataNotifier: onDataReceivedController.stream,
|
||||
).handle();
|
||||
connection.incomingStreams.listen((stream) {
|
||||
handler = serveStream_(
|
||||
_handlers.add(serveStream_(
|
||||
stream: stream,
|
||||
clientCertificate: clientCertificate,
|
||||
remoteAddress: remoteAddress,
|
||||
);
|
||||
onDataReceived: onDataReceivedController.sink,
|
||||
));
|
||||
}, onError: (error, stackTrace) {
|
||||
if (error is Error) {
|
||||
Zone.current.handleUncaughtError(error, stackTrace);
|
||||
}
|
||||
}, onDone: () {
|
||||
}, onDone: () async {
|
||||
// TODO(sigurdm): This is not correct behavior in the presence of
|
||||
// half-closed tcp streams.
|
||||
// Half-closed streams seems to not be fully supported by package:http2.
|
||||
// https://github.com/dart-lang/http2/issues/42
|
||||
handler?.cancel();
|
||||
for (var handler in _handlers) {
|
||||
handler.cancel();
|
||||
}
|
||||
_connections.remove(connection);
|
||||
await onDataReceivedController.close();
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -141,18 +156,20 @@ class ConnectionServer {
|
|||
required ServerTransportStream stream,
|
||||
X509Certificate? clientCertificate,
|
||||
InternetAddress? remoteAddress,
|
||||
Sink<void>? onDataReceived,
|
||||
}) {
|
||||
return ServerHandler(
|
||||
stream: stream,
|
||||
serviceLookup: lookupService,
|
||||
interceptors: _interceptors,
|
||||
codecRegistry: _codecRegistry,
|
||||
// ignore: unnecessary_cast
|
||||
clientCertificate: clientCertificate as io_bits.X509Certificate?,
|
||||
// ignore: unnecessary_cast
|
||||
remoteAddress: remoteAddress as io_bits.InternetAddress?,
|
||||
errorHandler: _errorHandler,
|
||||
)..handle();
|
||||
stream: stream,
|
||||
serviceLookup: lookupService,
|
||||
interceptors: _interceptors,
|
||||
codecRegistry: _codecRegistry,
|
||||
// ignore: unnecessary_cast
|
||||
clientCertificate: clientCertificate as io_bits.X509Certificate?,
|
||||
// ignore: unnecessary_cast
|
||||
remoteAddress: remoteAddress as io_bits.InternetAddress?,
|
||||
errorHandler: _errorHandler,
|
||||
onDataReceived: onDataReceived)
|
||||
..handle();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -170,15 +187,23 @@ class Server extends ConnectionServer {
|
|||
super.interceptors,
|
||||
super.codecRegistry,
|
||||
super.errorHandler,
|
||||
super.keepAlive,
|
||||
]);
|
||||
|
||||
/// Create a server for the given [services].
|
||||
Server.create({
|
||||
required List<Service> services,
|
||||
ServerKeepAliveOptions keepAliveOptions = const ServerKeepAliveOptions(),
|
||||
List<Interceptor> interceptors = const <Interceptor>[],
|
||||
CodecRegistry? codecRegistry,
|
||||
GrpcErrorHandler? errorHandler,
|
||||
}) : super(services, interceptors, codecRegistry, errorHandler);
|
||||
}) : super(
|
||||
services,
|
||||
interceptors,
|
||||
codecRegistry,
|
||||
errorHandler,
|
||||
keepAliveOptions,
|
||||
);
|
||||
|
||||
/// The port that the server is listening on, or `null` if the server is not
|
||||
/// active.
|
||||
|
@ -266,6 +291,7 @@ class Server extends ConnectionServer {
|
|||
required ServerTransportStream stream,
|
||||
X509Certificate? clientCertificate,
|
||||
InternetAddress? remoteAddress,
|
||||
Sink<void>? onDataReceived,
|
||||
}) {
|
||||
return ServerHandler(
|
||||
stream: stream,
|
||||
|
@ -277,6 +303,7 @@ class Server extends ConnectionServer {
|
|||
// ignore: unnecessary_cast
|
||||
remoteAddress: remoteAddress as io_bits.InternetAddress?,
|
||||
errorHandler: _errorHandler,
|
||||
onDataReceived: onDataReceived,
|
||||
)..handle();
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
import 'package:clock/clock.dart';
|
||||
|
||||
/// Options to configure a gRPC server for receiving keepalive signals.
|
||||
class ServerKeepAliveOptions {
|
||||
/// The maximum number of bad pings that the server will tolerate before
|
||||
/// sending an HTTP2 GOAWAY frame and closing the transport.
|
||||
///
|
||||
/// `GRPC_ARG_HTTP2_MAX_PING_STRIKES` in the docs.
|
||||
final int? maxBadPings;
|
||||
|
||||
/// The minimum time that is expected between receiving successive pings.
|
||||
///
|
||||
/// `GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS` in the docs.
|
||||
final Duration minIntervalBetweenPingsWithoutData;
|
||||
|
||||
const ServerKeepAliveOptions({
|
||||
this.minIntervalBetweenPingsWithoutData =
|
||||
const Duration(milliseconds: 300000),
|
||||
this.maxBadPings = 2,
|
||||
});
|
||||
}
|
||||
|
||||
/// A keep alive "manager", deciding what do to when receiving pings from a
|
||||
/// client trying to keep the connection alive, based on the set
|
||||
/// [ServerKeepAliveOptions].
|
||||
class ServerKeepAlive {
|
||||
/// What to do after receiving too many bad pings, probably shut down the
|
||||
/// connection to not be DDoSed.
|
||||
final Future<void> Function()? tooManyBadPings;
|
||||
|
||||
final ServerKeepAliveOptions options;
|
||||
|
||||
/// A stream of events for every time the server gets pinged.
|
||||
final Stream<void> pingNotifier;
|
||||
|
||||
/// A stream of events for every time the server receives data.
|
||||
final Stream<void> dataNotifier;
|
||||
|
||||
int _badPings = 0;
|
||||
Stopwatch? _timeOfLastReceivedPing;
|
||||
|
||||
ServerKeepAlive({
|
||||
this.tooManyBadPings,
|
||||
required this.options,
|
||||
required this.pingNotifier,
|
||||
required this.dataNotifier,
|
||||
});
|
||||
|
||||
void handle() {
|
||||
// If we don't care about bad pings, there is not point in listening to
|
||||
// events.
|
||||
if (_enforcesMaxBadPings) {
|
||||
pingNotifier.listen((_) => _onPingReceived());
|
||||
dataNotifier.listen((_) => _onDataReceived());
|
||||
}
|
||||
}
|
||||
|
||||
bool get _enforcesMaxBadPings => (options.maxBadPings ?? 0) > 0;
|
||||
|
||||
Future<void> _onPingReceived() async {
|
||||
if (_enforcesMaxBadPings) {
|
||||
if (_timeOfLastReceivedPing == null) {
|
||||
_timeOfLastReceivedPing = clock.stopwatch()
|
||||
..reset()
|
||||
..start();
|
||||
} else if (_timeOfLastReceivedPing!.elapsed >
|
||||
options.minIntervalBetweenPingsWithoutData) {
|
||||
_badPings++;
|
||||
}
|
||||
if (_badPings > options.maxBadPings!) {
|
||||
await tooManyBadPings?.call();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void _onDataReceived() {
|
||||
if (_enforcesMaxBadPings) {
|
||||
_badPings = 0;
|
||||
_timeOfLastReceivedPing = null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +1,5 @@
|
|||
name: grpc
|
||||
description: Dart implementation of gRPC, a high performance, open-source universal RPC framework.
|
||||
|
||||
version: 3.2.3-wip
|
||||
|
||||
repository: https://github.com/grpc/grpc-dart
|
||||
|
@ -16,8 +15,9 @@ dependencies:
|
|||
googleapis_auth: ^1.1.0
|
||||
meta: ^1.3.0
|
||||
http: '>=0.13.0 <2.0.0'
|
||||
http2: ^2.0.0
|
||||
http2: ^2.2.0
|
||||
protobuf: '>=2.0.0 <4.0.0'
|
||||
clock: ^1.1.1
|
||||
|
||||
dev_dependencies:
|
||||
build_runner: ^2.0.0
|
||||
|
@ -29,6 +29,7 @@ dev_dependencies:
|
|||
stream_channel: ^2.1.0
|
||||
stream_transform: ^2.0.0
|
||||
vm_service: ^11.6.0
|
||||
fake_async: ^1.3.1
|
||||
|
||||
false_secrets:
|
||||
- interop/server1.key
|
||||
|
|
|
@ -0,0 +1,270 @@
|
|||
import 'package:fake_async/fake_async.dart';
|
||||
import 'package:grpc/src/client/client_keepalive.dart';
|
||||
import 'package:mockito/annotations.dart';
|
||||
import 'package:mockito/mockito.dart';
|
||||
import 'package:test/test.dart';
|
||||
|
||||
import 'client_keepalive_manager_test.mocks.dart';
|
||||
|
||||
@GenerateNiceMocks([MockSpec<Pinger>()])
|
||||
abstract class Pinger {
|
||||
void ping();
|
||||
void onPingTimeout();
|
||||
}
|
||||
|
||||
void main() {
|
||||
late ClientKeepAlive keepAliveManager;
|
||||
|
||||
final pinger = MockPinger();
|
||||
|
||||
var transportOpen = true;
|
||||
|
||||
final shortTime = Duration(milliseconds: 150);
|
||||
|
||||
final epsilon = Duration(milliseconds: 50);
|
||||
|
||||
final timeout = Duration(milliseconds: 2000);
|
||||
|
||||
/// Add some epsilon to make sure tests pass
|
||||
final timeoutLeeway = timeout + epsilon;
|
||||
|
||||
final pingInterval = Duration(milliseconds: 1000);
|
||||
|
||||
/// Add some epsilon to make sure tests pass
|
||||
final pingIntervalLeeway = pingInterval + epsilon;
|
||||
|
||||
void initKeepAliveManager([ClientKeepAliveOptions? opt]) {
|
||||
reset(pinger);
|
||||
final options = opt ??
|
||||
ClientKeepAliveOptions(
|
||||
pingInterval: pingInterval,
|
||||
timeout: timeout,
|
||||
permitWithoutCalls: false,
|
||||
);
|
||||
|
||||
when(pinger.ping()).thenAnswer((_) async => transportOpen = true);
|
||||
when(pinger.onPingTimeout()).thenAnswer((_) async => transportOpen = false);
|
||||
|
||||
keepAliveManager = ClientKeepAlive(
|
||||
options: options,
|
||||
ping: pinger.ping,
|
||||
onPingTimeout: pinger.onPingTimeout,
|
||||
);
|
||||
transportOpen = true;
|
||||
}
|
||||
|
||||
setUp(() => initKeepAliveManager());
|
||||
test('sendKeepAlivePings', () {
|
||||
FakeAsync().run((async) {
|
||||
// Transport becomes active. We should schedule keepalive pings.
|
||||
keepAliveManager.onTransportActive();
|
||||
async.elapse(pingIntervalLeeway);
|
||||
// Forward clock to keepAliveTimeInNanos will send the ping. Shutdown task should be scheduled.
|
||||
verify(pinger.ping()).called(1);
|
||||
// Ping succeeds. Reschedule another ping.
|
||||
|
||||
async.elapse(shortTime);
|
||||
keepAliveManager.onFrameReceived();
|
||||
// Shutdown task has been cancelled.
|
||||
// Next ping should be exactly 1000 milliseconds later.
|
||||
async.elapse(pingIntervalLeeway);
|
||||
verify(pinger.ping()).called(1);
|
||||
});
|
||||
});
|
||||
|
||||
test('keepAlivePingDelayedByIncomingData', () {
|
||||
FakeAsync().run((async) {
|
||||
// Transport becomes active. We should schedule keepalive pings.
|
||||
keepAliveManager.onTransportActive();
|
||||
|
||||
// We receive some data. We may need to delay the ping.
|
||||
async.elapse(pingInterval - shortTime);
|
||||
keepAliveManager.onFrameReceived();
|
||||
async.elapse(shortTime + epsilon);
|
||||
|
||||
// We didn't send the ping.
|
||||
verifyNever(pinger.ping());
|
||||
});
|
||||
});
|
||||
|
||||
test('clienttransport.ping()_pingTimeout', () {
|
||||
FakeAsync().run((async) {
|
||||
pinger.onPingTimeout();
|
||||
expect(transportOpen, false);
|
||||
});
|
||||
});
|
||||
|
||||
test('onTransportTerminationCancelsShutdownFuture', () {
|
||||
FakeAsync().run((async) {
|
||||
// Transport becomes active. We should schedule keepalive pings.
|
||||
keepAliveManager.onTransportActive();
|
||||
async.elapse(pingIntervalLeeway);
|
||||
// Shutdown task has become active.
|
||||
expect(keepAliveManager.state, isA<ShutdownScheduled>());
|
||||
|
||||
keepAliveManager.onTransportTermination();
|
||||
|
||||
// Shutdown task has been cancelled.
|
||||
expect(keepAliveManager.state, isA<Disconnected>());
|
||||
});
|
||||
});
|
||||
test('keepAlivePingTimesOut', () {
|
||||
FakeAsync().run((async) {
|
||||
// Transport becomes active. We should schedule keepalive pings.
|
||||
keepAliveManager.onTransportActive();
|
||||
expect(keepAliveManager.state, isA<PingScheduled>());
|
||||
|
||||
// Forward clock to keepAliveTimeInNanos will send the ping. Shutdown task should be scheduled.
|
||||
async.elapse(pingIntervalLeeway);
|
||||
verify(pinger.ping()).called(1);
|
||||
expect(keepAliveManager.state, isA<ShutdownScheduled>());
|
||||
|
||||
// We do not receive the ping response. Shutdown runnable runs.
|
||||
async.elapse(timeoutLeeway);
|
||||
verify(pinger.onPingTimeout()).called(1);
|
||||
expect(keepAliveManager.state, isA<Disconnected>());
|
||||
|
||||
// We receive the ping response too late.
|
||||
keepAliveManager.onFrameReceived();
|
||||
// No more ping should be scheduled.
|
||||
expect(keepAliveManager.state, isA<Disconnected>());
|
||||
});
|
||||
});
|
||||
test('transportGoesIdle', () {
|
||||
FakeAsync().run((async) async {
|
||||
// Transport becomes active. We should schedule keepalive pings.
|
||||
keepAliveManager.onTransportActive();
|
||||
expect(keepAliveManager.state, isA<PingScheduled>());
|
||||
|
||||
// Transport becomes idle. Nothing should happen when ping runnable runs.
|
||||
keepAliveManager.onTransportIdle();
|
||||
expect(keepAliveManager.state, isA<Idle>());
|
||||
async.elapse(pingIntervalLeeway);
|
||||
// Ping was not sent.
|
||||
verifyNever(pinger.ping());
|
||||
// No new ping got scheduled.
|
||||
expect(keepAliveManager.state, isA<Idle>());
|
||||
|
||||
// But when transport goes back to active
|
||||
keepAliveManager.onTransportActive();
|
||||
expect(keepAliveManager.state, isA<PingScheduled>());
|
||||
// Ping is now sent.
|
||||
async.elapse(pingIntervalLeeway);
|
||||
verify(pinger.ping()).called(1);
|
||||
expect(keepAliveManager.state, isA<ShutdownScheduled>());
|
||||
});
|
||||
});
|
||||
test('transportGoesIdle_doesntCauseIdleWhenEnabled', () {
|
||||
FakeAsync().run((async) {
|
||||
keepAliveManager.onTransportTermination();
|
||||
initKeepAliveManager(ClientKeepAliveOptions(
|
||||
pingInterval: pingInterval,
|
||||
timeout: timeout,
|
||||
permitWithoutCalls: true,
|
||||
));
|
||||
keepAliveManager.onTransportStarted();
|
||||
|
||||
// Keepalive scheduling should have started immediately.
|
||||
expect(keepAliveManager.state, isA<PingScheduled>());
|
||||
|
||||
keepAliveManager.onTransportActive();
|
||||
|
||||
// Transport becomes idle. Should not impact the sending of the ping.
|
||||
keepAliveManager.onTransportIdle();
|
||||
async.elapse(pingIntervalLeeway);
|
||||
// Ping was sent.
|
||||
verify(pinger.ping()).called(1);
|
||||
// Shutdown is scheduled.
|
||||
expect(keepAliveManager.state, isA<ShutdownScheduled>());
|
||||
// Shutdown is triggered.
|
||||
async.elapse(timeoutLeeway);
|
||||
expect(keepAliveManager.state, isA<Disconnected>());
|
||||
verify(pinger.onPingTimeout()).called(1);
|
||||
});
|
||||
});
|
||||
test('transportGoesIdleAfterPingSent', () {
|
||||
FakeAsync().run((async) {
|
||||
// Transport becomes active. We should schedule keepalive pings.
|
||||
keepAliveManager.onTransportActive();
|
||||
|
||||
// Forward clock to keepAliveTimeInNanos will send the ping. Shutdown task should be scheduled.
|
||||
async.elapse(pingIntervalLeeway);
|
||||
verify(pinger.ping()).called(1);
|
||||
expect(keepAliveManager.state, isA<ShutdownScheduled>());
|
||||
|
||||
// Transport becomes idle. No more ping should be scheduled after we receive a ping response.
|
||||
keepAliveManager.onTransportIdle();
|
||||
async.elapse(shortTime);
|
||||
keepAliveManager.onFrameReceived();
|
||||
expect(keepAliveManager.state, isA<Idle>());
|
||||
// Transport becomes active again. Another ping is scheduled.
|
||||
keepAliveManager.onTransportActive();
|
||||
expect(keepAliveManager.state, isA<PingScheduled>());
|
||||
});
|
||||
});
|
||||
test('transportGoesIdleBeforePingSent', () {
|
||||
FakeAsync().run((async) {
|
||||
// Transport becomes active. We should schedule keepalive pings.
|
||||
keepAliveManager.onTransportActive();
|
||||
final pingFuture = (keepAliveManager.state as PingScheduled).pingTimer;
|
||||
expect(pingFuture, isNotNull);
|
||||
|
||||
// Data is received, and we go to ping delayed
|
||||
keepAliveManager.onFrameReceived();
|
||||
|
||||
// Transport becomes idle while the 1st ping is still scheduled
|
||||
keepAliveManager.onTransportIdle();
|
||||
|
||||
// Transport becomes active again, we don't need to reschedule another ping
|
||||
keepAliveManager.onTransportActive();
|
||||
expect((keepAliveManager.state as PingScheduled).pingTimer, pingFuture);
|
||||
});
|
||||
});
|
||||
test('transportShutsdownAfterPingScheduled', () {
|
||||
FakeAsync().run((async) {
|
||||
// Ping will be scheduled.
|
||||
keepAliveManager.onTransportActive();
|
||||
expect(keepAliveManager.state, isA<PingScheduled>());
|
||||
// Transport is shutting down.
|
||||
keepAliveManager.onTransportTermination();
|
||||
// Ping future should have been cancelled.
|
||||
expect(keepAliveManager.state, isA<Disconnected>());
|
||||
});
|
||||
});
|
||||
test('transportShutsdownAfterPingSent', () {
|
||||
FakeAsync().run((async) {
|
||||
keepAliveManager.onTransportActive();
|
||||
// Forward clock to keepAliveTimeInNanos will send the ping. Shutdown task should be scheduled.
|
||||
async.elapse(pingIntervalLeeway);
|
||||
verify(pinger.ping()).called(1);
|
||||
expect(keepAliveManager.state, isA<ShutdownScheduled>());
|
||||
|
||||
// Transport is shutting down.
|
||||
keepAliveManager.onTransportTermination();
|
||||
// Shutdown task has been cancelled.
|
||||
expect(keepAliveManager.state, isA<Disconnected>());
|
||||
});
|
||||
});
|
||||
test('pingSentThenIdleThenActiveThenAck', () {
|
||||
FakeAsync().run((async) {
|
||||
keepAliveManager.onTransportActive();
|
||||
// Forward clock to keepAliveTimeInNanos will send the ping. Shutdown task should be scheduled.
|
||||
async.elapse(pingIntervalLeeway);
|
||||
verify(pinger.ping()).called(1);
|
||||
|
||||
// shutdown scheduled
|
||||
expect(keepAliveManager.state, isA<ShutdownScheduled>());
|
||||
|
||||
keepAliveManager.onTransportIdle();
|
||||
|
||||
keepAliveManager.onTransportActive();
|
||||
|
||||
keepAliveManager.onFrameReceived();
|
||||
|
||||
// another ping scheduled
|
||||
expect(keepAliveManager.state, isA<PingScheduled>());
|
||||
async.elapse(pingIntervalLeeway);
|
||||
verify(pinger.ping()).called(1);
|
||||
});
|
||||
});
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
// Mocks generated by Mockito 5.4.1 from annotations
|
||||
// in grpc/test/client_tests/client_keepalive_manager_test.dart.
|
||||
// Do not manually edit this file.
|
||||
|
||||
// @dart=2.19
|
||||
|
||||
// ignore_for_file: no_leading_underscores_for_library_prefixes
|
||||
import 'package:mockito/mockito.dart' as _i1;
|
||||
|
||||
import 'client_keepalive_manager_test.dart' as _i2;
|
||||
|
||||
// ignore_for_file: type=lint
|
||||
// ignore_for_file: avoid_redundant_argument_values
|
||||
// ignore_for_file: avoid_setters_without_getters
|
||||
// ignore_for_file: comment_references
|
||||
// ignore_for_file: implementation_imports
|
||||
// ignore_for_file: invalid_use_of_visible_for_testing_member
|
||||
// ignore_for_file: prefer_const_constructors
|
||||
// ignore_for_file: unnecessary_parenthesis
|
||||
// ignore_for_file: camel_case_types
|
||||
// ignore_for_file: subtype_of_sealed_class
|
||||
|
||||
/// A class which mocks [Pinger].
|
||||
///
|
||||
/// See the documentation for Mockito's code generation for more information.
|
||||
class MockPinger extends _i1.Mock implements _i2.Pinger {
|
||||
@override
|
||||
void ping() => super.noSuchMethod(
|
||||
Invocation.method(
|
||||
#ping,
|
||||
[],
|
||||
),
|
||||
returnValueForMissingStub: null,
|
||||
);
|
||||
@override
|
||||
void onPingTimeout() => super.noSuchMethod(
|
||||
Invocation.method(
|
||||
#onPingTimeout,
|
||||
[],
|
||||
),
|
||||
returnValueForMissingStub: null,
|
||||
);
|
||||
}
|
|
@ -0,0 +1,176 @@
|
|||
@TestOn('vm')
|
||||
import 'dart:async';
|
||||
|
||||
import 'package:grpc/grpc.dart';
|
||||
import 'package:grpc/src/client/client_keepalive.dart';
|
||||
import 'package:grpc/src/client/connection.dart';
|
||||
import 'package:grpc/src/client/http2_connection.dart';
|
||||
import 'package:grpc/src/server/server_keepalive.dart';
|
||||
import 'package:http2/transport.dart';
|
||||
import 'package:test/test.dart';
|
||||
|
||||
import 'src/generated/echo.pbgrpc.dart';
|
||||
|
||||
void main() {
|
||||
late Server server;
|
||||
late EchoServiceClient fakeClient;
|
||||
late FakeClientChannel fakeChannel;
|
||||
late EchoServiceClient unresponsiveClient;
|
||||
late ClientChannel unresponsiveChannel;
|
||||
|
||||
setUp(() async {
|
||||
final serverOptions = ServerKeepAliveOptions(
|
||||
maxBadPings: 5,
|
||||
minIntervalBetweenPingsWithoutData: Duration(milliseconds: 10),
|
||||
);
|
||||
final clientOptions = ClientKeepAliveOptions(
|
||||
pingInterval: Duration(milliseconds: 10),
|
||||
timeout: Duration(milliseconds: 30),
|
||||
permitWithoutCalls: true,
|
||||
);
|
||||
|
||||
server = Server.create(
|
||||
services: [FakeEchoService()],
|
||||
keepAliveOptions: serverOptions,
|
||||
);
|
||||
await server.serve(address: 'localhost', port: 8081);
|
||||
fakeChannel = FakeClientChannel(
|
||||
'localhost',
|
||||
port: server.port!,
|
||||
options: ChannelOptions(
|
||||
credentials: ChannelCredentials.insecure(),
|
||||
keepAlive: clientOptions,
|
||||
),
|
||||
);
|
||||
fakeClient = EchoServiceClient(fakeChannel);
|
||||
|
||||
unresponsiveChannel = UnresponsiveClientChannel(
|
||||
'localhost',
|
||||
port: server.port!,
|
||||
options: ChannelOptions(
|
||||
credentials: ChannelCredentials.insecure(),
|
||||
keepAlive: clientOptions,
|
||||
),
|
||||
);
|
||||
unresponsiveClient = EchoServiceClient(unresponsiveChannel);
|
||||
});
|
||||
|
||||
tearDown(() async {
|
||||
await fakeChannel.terminate();
|
||||
await server.shutdown();
|
||||
});
|
||||
|
||||
test('Server terminates connection after too many pings without data',
|
||||
() async {
|
||||
await fakeClient.echo(EchoRequest());
|
||||
await Future.delayed(Duration(milliseconds: 300));
|
||||
await fakeClient.echo(EchoRequest());
|
||||
// Check that the server closed the connection, the next request then has
|
||||
// to build a new one.
|
||||
expect(fakeChannel.newConnectionCounter, 2);
|
||||
});
|
||||
|
||||
test('Server doesnt terminate connection after pings, as data is sent',
|
||||
() async {
|
||||
final timer = Timer.periodic(
|
||||
Duration(milliseconds: 30), (timer) => fakeClient.echo(EchoRequest()));
|
||||
await Future.delayed(Duration(milliseconds: 200), () => timer.cancel());
|
||||
// Check that the server never closed the connection
|
||||
expect(fakeChannel.newConnectionCounter, 1);
|
||||
});
|
||||
|
||||
test('Server doesnt ack the ping, making the client shutdown the connection',
|
||||
() async {
|
||||
await unresponsiveClient.echo(EchoRequest());
|
||||
await Future.delayed(Duration(milliseconds: 200));
|
||||
await expectLater(
|
||||
unresponsiveClient.echo(EchoRequest()), throwsA(isA<GrpcError>()));
|
||||
});
|
||||
}
|
||||
|
||||
/// A wrapper around a [FakeHttp2ClientConnection]
|
||||
class FakeClientChannel extends ClientChannel {
|
||||
late FakeHttp2ClientConnection fakeHttp2ClientConnection;
|
||||
FakeClientChannel(
|
||||
super.host, {
|
||||
super.port = 443,
|
||||
super.options = const ChannelOptions(),
|
||||
super.channelShutdownHandler,
|
||||
});
|
||||
|
||||
@override
|
||||
ClientConnection createConnection() {
|
||||
fakeHttp2ClientConnection = FakeHttp2ClientConnection(host, port, options);
|
||||
return fakeHttp2ClientConnection;
|
||||
}
|
||||
|
||||
int get newConnectionCounter =>
|
||||
fakeHttp2ClientConnection.newConnectionCounter;
|
||||
}
|
||||
|
||||
/// A [Http2ClientConnection] exposing a counter for new connections
|
||||
class FakeHttp2ClientConnection extends Http2ClientConnection {
|
||||
int newConnectionCounter = 0;
|
||||
|
||||
FakeHttp2ClientConnection(super.host, super.port, super.options);
|
||||
|
||||
@override
|
||||
Future<ClientTransportConnection> connectTransport() {
|
||||
newConnectionCounter++;
|
||||
return super.connectTransport();
|
||||
}
|
||||
}
|
||||
|
||||
/// A wrapper around a [FakeHttp2ClientConnection]
|
||||
class UnresponsiveClientChannel extends ClientChannel {
|
||||
UnresponsiveClientChannel(
|
||||
super.host, {
|
||||
super.port = 443,
|
||||
super.options = const ChannelOptions(),
|
||||
super.channelShutdownHandler,
|
||||
});
|
||||
|
||||
@override
|
||||
ClientConnection createConnection() =>
|
||||
UnresponsiveHttp2ClientConnection(host, port, options);
|
||||
}
|
||||
|
||||
class UnresponsiveHttp2ClientConnection extends Http2ClientConnection {
|
||||
UnresponsiveHttp2ClientConnection(super.host, super.port, super.options);
|
||||
|
||||
@override
|
||||
set keepAliveManager(ClientKeepAlive? value) {
|
||||
if (value != null) {
|
||||
super.keepAliveManager = FakeClientKeepAlive(
|
||||
options: super.options.keepAlive,
|
||||
ping: value.ping,
|
||||
onPingTimeout: value.onPingTimeout,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class FakeClientKeepAlive extends ClientKeepAlive {
|
||||
FakeClientKeepAlive(
|
||||
{required super.options,
|
||||
required super.ping,
|
||||
required super.onPingTimeout});
|
||||
|
||||
@override
|
||||
void onFrameReceived() {
|
||||
// Do nothing here, to simulate a server not responding to pings.
|
||||
}
|
||||
}
|
||||
|
||||
class FakeEchoService extends EchoServiceBase {
|
||||
@override
|
||||
Future<EchoResponse> echo(ServiceCall call, EchoRequest request) async =>
|
||||
EchoResponse(message: 'Echo messsage');
|
||||
|
||||
@override
|
||||
Stream<ServerStreamingEchoResponse> serverStreamingEcho(
|
||||
ServiceCall call, ServerStreamingEchoRequest request) {
|
||||
// TODO: implement serverStreamingEcho
|
||||
throw UnimplementedError();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,107 @@
|
|||
import 'dart:async';
|
||||
|
||||
import 'package:grpc/src/server/server_keepalive.dart';
|
||||
import 'package:test/test.dart';
|
||||
|
||||
void main() {
|
||||
late StreamController pingStream;
|
||||
late StreamController dataStream;
|
||||
late int maxBadPings;
|
||||
|
||||
var goAway = false;
|
||||
|
||||
void initServer([ServerKeepAliveOptions? options]) => ServerKeepAlive(
|
||||
options: options ??
|
||||
ServerKeepAliveOptions(
|
||||
maxBadPings: maxBadPings,
|
||||
minIntervalBetweenPingsWithoutData: Duration(milliseconds: 5),
|
||||
),
|
||||
pingNotifier: pingStream.stream,
|
||||
dataNotifier: dataStream.stream,
|
||||
tooManyBadPings: () async => goAway = true,
|
||||
).handle();
|
||||
|
||||
setUp(() {
|
||||
pingStream = StreamController();
|
||||
dataStream = StreamController();
|
||||
maxBadPings = 10;
|
||||
goAway = false;
|
||||
});
|
||||
|
||||
tearDown(() {
|
||||
pingStream.close();
|
||||
dataStream.close();
|
||||
});
|
||||
|
||||
test('Sending too many pings without data kills connection', () async {
|
||||
initServer();
|
||||
// Send good ping
|
||||
pingStream.sink.add(null);
|
||||
await Future.delayed(Duration(milliseconds: 10));
|
||||
|
||||
// Send [maxBadPings] bad pings, that's still ok
|
||||
for (var i = 0; i < maxBadPings; i++) {
|
||||
pingStream.sink.add(null);
|
||||
}
|
||||
await Future.delayed(Duration(milliseconds: 10));
|
||||
expect(goAway, false);
|
||||
|
||||
// Send another bad ping; that's one too many!
|
||||
pingStream.sink.add(null);
|
||||
await Future.delayed(Duration(milliseconds: 10));
|
||||
expect(goAway, true);
|
||||
});
|
||||
test(
|
||||
'Sending too many pings without data doesn`t kill connection if the server doesn`t care',
|
||||
() async {
|
||||
initServer(ServerKeepAliveOptions(
|
||||
maxBadPings: null,
|
||||
minIntervalBetweenPingsWithoutData: Duration(milliseconds: 5),
|
||||
));
|
||||
// Send good ping
|
||||
pingStream.sink.add(null);
|
||||
await Future.delayed(Duration(milliseconds: 10));
|
||||
|
||||
// Send a lot of bad pings, that's still ok.
|
||||
for (var i = 0; i < 50; i++) {
|
||||
pingStream.sink.add(null);
|
||||
}
|
||||
await Future.delayed(Duration(milliseconds: 10));
|
||||
expect(goAway, false);
|
||||
});
|
||||
|
||||
test('Sending many pings with data doesn`t kill connection', () async {
|
||||
initServer();
|
||||
|
||||
// Send good ping
|
||||
pingStream.sink.add(null);
|
||||
await Future.delayed(Duration(milliseconds: 10));
|
||||
|
||||
// Send [maxBadPings] bad pings, that's still ok
|
||||
for (var i = 0; i < maxBadPings; i++) {
|
||||
pingStream.sink.add(null);
|
||||
}
|
||||
await Future.delayed(Duration(milliseconds: 10));
|
||||
expect(goAway, false);
|
||||
|
||||
// Sending data resets the bad ping count
|
||||
dataStream.add(null);
|
||||
await Future.delayed(Duration(milliseconds: 10));
|
||||
|
||||
// Send good ping
|
||||
pingStream.sink.add(null);
|
||||
await Future.delayed(Duration(milliseconds: 10));
|
||||
|
||||
// Send [maxBadPings] bad pings, that's still ok
|
||||
for (var i = 0; i < maxBadPings; i++) {
|
||||
pingStream.sink.add(null);
|
||||
}
|
||||
await Future.delayed(Duration(milliseconds: 10));
|
||||
expect(goAway, false);
|
||||
|
||||
// Send another bad ping; that's one too many!
|
||||
pingStream.sink.add(null);
|
||||
await Future.delayed(Duration(milliseconds: 10));
|
||||
expect(goAway, true);
|
||||
});
|
||||
}
|
|
@ -18,6 +18,7 @@ import 'dart:convert';
|
|||
|
||||
import 'package:grpc/grpc.dart';
|
||||
import 'package:grpc/src/client/channel.dart' as base;
|
||||
import 'package:grpc/src/client/client_keepalive.dart';
|
||||
import 'package:grpc/src/client/http2_connection.dart';
|
||||
import 'package:grpc/src/shared/message.dart';
|
||||
import 'package:http2/transport.dart';
|
||||
|
@ -76,6 +77,9 @@ class FakeChannelOptions implements ChannelOptions {
|
|||
BackoffStrategy backoffStrategy = testBackoff;
|
||||
@override
|
||||
CodecRegistry codecRegistry = CodecRegistry.empty();
|
||||
|
||||
@override
|
||||
ClientKeepAliveOptions get keepAlive => const ClientKeepAliveOptions();
|
||||
}
|
||||
|
||||
class FakeChannel extends ClientChannel {
|
||||
|
|
|
@ -1,101 +1,204 @@
|
|||
// Mocks generated by Mockito 5.0.0-nullsafety.6 from annotations
|
||||
// Mocks generated by Mockito 5.4.1 from annotations
|
||||
// in grpc/test/src/client_utils.dart.
|
||||
// Do not manually edit this file.
|
||||
|
||||
import 'dart:async' as i3;
|
||||
// @dart=2.19
|
||||
|
||||
import 'package:http2/src/hpack/hpack.dart' as i4;
|
||||
import 'package:http2/transport.dart' as i2;
|
||||
import 'package:mockito/mockito.dart' as i1;
|
||||
// ignore_for_file: no_leading_underscores_for_library_prefixes
|
||||
import 'dart:async' as _i3;
|
||||
|
||||
import 'package:http2/src/hpack/hpack.dart' as _i4;
|
||||
import 'package:http2/transport.dart' as _i2;
|
||||
import 'package:mockito/mockito.dart' as _i1;
|
||||
|
||||
// ignore_for_file: type=lint
|
||||
// ignore_for_file: avoid_redundant_argument_values
|
||||
// ignore_for_file: avoid_setters_without_getters
|
||||
// ignore_for_file: comment_references
|
||||
// ignore_for_file: implementation_imports
|
||||
// ignore_for_file: invalid_use_of_visible_for_testing_member
|
||||
// ignore_for_file: prefer_const_constructors
|
||||
// ignore_for_file: unnecessary_parenthesis
|
||||
// ignore_for_file: camel_case_types
|
||||
// ignore_for_file: subtype_of_sealed_class
|
||||
|
||||
class _FakeClientTransportStream extends i1.Fake
|
||||
implements i2.ClientTransportStream {}
|
||||
class _FakeClientTransportStream_0 extends _i1.SmartFake
|
||||
implements _i2.ClientTransportStream {
|
||||
_FakeClientTransportStream_0(
|
||||
Object parent,
|
||||
Invocation parentInvocation,
|
||||
) : super(
|
||||
parent,
|
||||
parentInvocation,
|
||||
);
|
||||
}
|
||||
|
||||
class _FakeStreamSink<S> extends i1.Fake implements i3.StreamSink<S> {}
|
||||
class _FakeStreamSink_1<S> extends _i1.SmartFake implements _i3.StreamSink<S> {
|
||||
_FakeStreamSink_1(
|
||||
Object parent,
|
||||
Invocation parentInvocation,
|
||||
) : super(
|
||||
parent,
|
||||
parentInvocation,
|
||||
);
|
||||
}
|
||||
|
||||
/// A class which mocks [ClientTransportConnection].
|
||||
///
|
||||
/// See the documentation for Mockito's code generation for more information.
|
||||
class MockClientTransportConnection extends i1.Mock
|
||||
implements i2.ClientTransportConnection {
|
||||
class MockClientTransportConnection extends _i1.Mock
|
||||
implements _i2.ClientTransportConnection {
|
||||
MockClientTransportConnection() {
|
||||
i1.throwOnMissingStub(this);
|
||||
_i1.throwOnMissingStub(this);
|
||||
}
|
||||
|
||||
@override
|
||||
bool get isOpen =>
|
||||
(super.noSuchMethod(Invocation.getter(#isOpen), returnValue: false)
|
||||
as bool);
|
||||
bool get isOpen => (super.noSuchMethod(
|
||||
Invocation.getter(#isOpen),
|
||||
returnValue: false,
|
||||
) as bool);
|
||||
@override
|
||||
set onActiveStateChanged(i2.ActiveStateHandler? callback) =>
|
||||
super.noSuchMethod(Invocation.setter(#onActiveStateChanged, callback),
|
||||
returnValueForMissingStub: null);
|
||||
set onActiveStateChanged(_i2.ActiveStateHandler? callback) =>
|
||||
super.noSuchMethod(
|
||||
Invocation.setter(
|
||||
#onActiveStateChanged,
|
||||
callback,
|
||||
),
|
||||
returnValueForMissingStub: null,
|
||||
);
|
||||
@override
|
||||
i3.Future<void> get onInitialPeerSettingsReceived =>
|
||||
(super.noSuchMethod(Invocation.getter(#onInitialPeerSettingsReceived),
|
||||
returnValue: Future.value(null)) as i3.Future<void>);
|
||||
_i3.Future<void> get onInitialPeerSettingsReceived => (super.noSuchMethod(
|
||||
Invocation.getter(#onInitialPeerSettingsReceived),
|
||||
returnValue: _i3.Future<void>.value(),
|
||||
) as _i3.Future<void>);
|
||||
@override
|
||||
i2.ClientTransportStream makeRequest(List<i4.Header>? headers,
|
||||
{bool? endStream = false}) =>
|
||||
_i3.Stream<int> get onPingReceived => (super.noSuchMethod(
|
||||
Invocation.getter(#onPingReceived),
|
||||
returnValue: _i3.Stream<int>.empty(),
|
||||
) as _i3.Stream<int>);
|
||||
@override
|
||||
_i3.Stream<void> get onFrameReceived => (super.noSuchMethod(
|
||||
Invocation.getter(#onFrameReceived),
|
||||
returnValue: _i3.Stream<void>.empty(),
|
||||
) as _i3.Stream<void>);
|
||||
@override
|
||||
_i2.ClientTransportStream makeRequest(
|
||||
List<_i4.Header>? headers, {
|
||||
bool? endStream = false,
|
||||
}) =>
|
||||
(super.noSuchMethod(
|
||||
Invocation.method(#makeRequest, [headers], {#endStream: endStream}),
|
||||
returnValue:
|
||||
_FakeClientTransportStream()) as i2.ClientTransportStream);
|
||||
Invocation.method(
|
||||
#makeRequest,
|
||||
[headers],
|
||||
{#endStream: endStream},
|
||||
),
|
||||
returnValue: _FakeClientTransportStream_0(
|
||||
this,
|
||||
Invocation.method(
|
||||
#makeRequest,
|
||||
[headers],
|
||||
{#endStream: endStream},
|
||||
),
|
||||
),
|
||||
) as _i2.ClientTransportStream);
|
||||
@override
|
||||
i3.Future<dynamic> ping() => (super.noSuchMethod(Invocation.method(#ping, []),
|
||||
returnValue: Future.value(null)) as i3.Future<dynamic>);
|
||||
_i3.Future<dynamic> ping() => (super.noSuchMethod(
|
||||
Invocation.method(
|
||||
#ping,
|
||||
[],
|
||||
),
|
||||
returnValue: _i3.Future<dynamic>.value(),
|
||||
) as _i3.Future<dynamic>);
|
||||
@override
|
||||
i3.Future<dynamic> finish() =>
|
||||
(super.noSuchMethod(Invocation.method(#finish, []),
|
||||
returnValue: Future.value(null)) as i3.Future<dynamic>);
|
||||
_i3.Future<dynamic> finish() => (super.noSuchMethod(
|
||||
Invocation.method(
|
||||
#finish,
|
||||
[],
|
||||
),
|
||||
returnValue: _i3.Future<dynamic>.value(),
|
||||
) as _i3.Future<dynamic>);
|
||||
@override
|
||||
i3.Future<dynamic> terminate() =>
|
||||
(super.noSuchMethod(Invocation.method(#terminate, []),
|
||||
returnValue: Future.value(null)) as i3.Future<dynamic>);
|
||||
_i3.Future<dynamic> terminate([int? errorCode]) => (super.noSuchMethod(
|
||||
Invocation.method(
|
||||
#terminate,
|
||||
[errorCode],
|
||||
),
|
||||
returnValue: _i3.Future<dynamic>.value(),
|
||||
) as _i3.Future<dynamic>);
|
||||
}
|
||||
|
||||
/// A class which mocks [ClientTransportStream].
|
||||
///
|
||||
/// See the documentation for Mockito's code generation for more information.
|
||||
class MockClientTransportStream extends i1.Mock
|
||||
implements i2.ClientTransportStream {
|
||||
class MockClientTransportStream extends _i1.Mock
|
||||
implements _i2.ClientTransportStream {
|
||||
MockClientTransportStream() {
|
||||
i1.throwOnMissingStub(this);
|
||||
_i1.throwOnMissingStub(this);
|
||||
}
|
||||
|
||||
@override
|
||||
i3.Stream<i2.TransportStreamPush> get peerPushes =>
|
||||
(super.noSuchMethod(Invocation.getter(#peerPushes),
|
||||
returnValue: Stream<i2.TransportStreamPush>.empty())
|
||||
as i3.Stream<i2.TransportStreamPush>);
|
||||
_i3.Stream<_i2.TransportStreamPush> get peerPushes => (super.noSuchMethod(
|
||||
Invocation.getter(#peerPushes),
|
||||
returnValue: _i3.Stream<_i2.TransportStreamPush>.empty(),
|
||||
) as _i3.Stream<_i2.TransportStreamPush>);
|
||||
@override
|
||||
int get id =>
|
||||
(super.noSuchMethod(Invocation.getter(#id), returnValue: 0) as int);
|
||||
int get id => (super.noSuchMethod(
|
||||
Invocation.getter(#id),
|
||||
returnValue: 0,
|
||||
) as int);
|
||||
@override
|
||||
i3.Stream<i2.StreamMessage> get incomingMessages =>
|
||||
(super.noSuchMethod(Invocation.getter(#incomingMessages),
|
||||
returnValue: Stream<i2.StreamMessage>.empty())
|
||||
as i3.Stream<i2.StreamMessage>);
|
||||
_i3.Stream<_i2.StreamMessage> get incomingMessages => (super.noSuchMethod(
|
||||
Invocation.getter(#incomingMessages),
|
||||
returnValue: _i3.Stream<_i2.StreamMessage>.empty(),
|
||||
) as _i3.Stream<_i2.StreamMessage>);
|
||||
@override
|
||||
i3.StreamSink<i2.StreamMessage> get outgoingMessages =>
|
||||
(super.noSuchMethod(Invocation.getter(#outgoingMessages),
|
||||
returnValue: _FakeStreamSink<i2.StreamMessage>())
|
||||
as i3.StreamSink<i2.StreamMessage>);
|
||||
_i3.StreamSink<_i2.StreamMessage> get outgoingMessages => (super.noSuchMethod(
|
||||
Invocation.getter(#outgoingMessages),
|
||||
returnValue: _FakeStreamSink_1<_i2.StreamMessage>(
|
||||
this,
|
||||
Invocation.getter(#outgoingMessages),
|
||||
),
|
||||
) as _i3.StreamSink<_i2.StreamMessage>);
|
||||
@override
|
||||
set onTerminated(void Function(int?)? value) =>
|
||||
super.noSuchMethod(Invocation.setter(#onTerminated, value),
|
||||
returnValueForMissingStub: null);
|
||||
set onTerminated(void Function(int?)? value) => super.noSuchMethod(
|
||||
Invocation.setter(
|
||||
#onTerminated,
|
||||
value,
|
||||
),
|
||||
returnValueForMissingStub: null,
|
||||
);
|
||||
@override
|
||||
void sendHeaders(List<i4.Header>? headers, {bool? endStream = false}) =>
|
||||
void terminate() => super.noSuchMethod(
|
||||
Invocation.method(
|
||||
#terminate,
|
||||
[],
|
||||
),
|
||||
returnValueForMissingStub: null,
|
||||
);
|
||||
@override
|
||||
void sendHeaders(
|
||||
List<_i4.Header>? headers, {
|
||||
bool? endStream = false,
|
||||
}) =>
|
||||
super.noSuchMethod(
|
||||
Invocation.method(#sendHeaders, [headers], {#endStream: endStream}),
|
||||
returnValueForMissingStub: null);
|
||||
Invocation.method(
|
||||
#sendHeaders,
|
||||
[headers],
|
||||
{#endStream: endStream},
|
||||
),
|
||||
returnValueForMissingStub: null,
|
||||
);
|
||||
@override
|
||||
void sendData(List<int>? bytes, {bool? endStream = false}) =>
|
||||
void sendData(
|
||||
List<int>? bytes, {
|
||||
bool? endStream = false,
|
||||
}) =>
|
||||
super.noSuchMethod(
|
||||
Invocation.method(#sendData, [bytes], {#endStream: endStream}),
|
||||
returnValueForMissingStub: null);
|
||||
Invocation.method(
|
||||
#sendData,
|
||||
[bytes],
|
||||
{#endStream: endStream},
|
||||
),
|
||||
returnValueForMissingStub: null,
|
||||
);
|
||||
}
|
||||
|
|
|
@ -134,13 +134,25 @@ class ServerHarness extends _Harness {
|
|||
|
||||
class ConnectionServerHarness extends _Harness {
|
||||
@override
|
||||
ConnectionServer createServer() =>
|
||||
ConnectionServer(<Service>[service], <Interceptor>[interceptor]);
|
||||
ConnectionServer createServer() => ConnectionServer(
|
||||
<Service>[service],
|
||||
<Interceptor>[interceptor],
|
||||
);
|
||||
|
||||
static ServiceMethod<int, int> createMethod(String name,
|
||||
Function methodHandler, bool clientStreaming, bool serverStreaming) {
|
||||
return ServiceMethod<int, int>(name, methodHandler, clientStreaming,
|
||||
serverStreaming, mockDecode, mockEncode);
|
||||
static ServiceMethod<int, int> createMethod(
|
||||
String name,
|
||||
Function methodHandler,
|
||||
bool clientStreaming,
|
||||
bool serverStreaming,
|
||||
) {
|
||||
return ServiceMethod<int, int>(
|
||||
name,
|
||||
methodHandler,
|
||||
clientStreaming,
|
||||
serverStreaming,
|
||||
mockDecode,
|
||||
mockEncode,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue