From 03f07e95357515e2d531e3c8730e5e14deb6e49e Mon Sep 17 00:00:00 2001 From: Moritz Date: Wed, 21 Jun 2023 11:14:43 -0400 Subject: [PATCH] Keepalive (#634) * Keepalive tests run! * Renaming * Some refactorings * Find a place where to handle the keepalive manager * Fix bug * Make KeepAliveManager independent of transport * Fix call sites in client * Add server keepalive handler * Wire through onDataReceived * Add ServerKeepAliveManager test * Refactorings * Tests kind of run now * Add shutdown test * Remove unneeded override * Remove unneeded mocks * Send correct error codes and cleanup * Small changes * Rename * Add documentation * Add test for !_enforcesMaxBadPings * Refactor tests * Switch to http2 master branch * Renaming * Null shutdownTimer * Refactor to event-state model * Smaller refactorings * Works now * Switch tests to isA * Shifting things around * Split into server and client * Format * rename * Tweaks * Switch order of optional parameters to make change non-breaking * Add some leeway to the durations in tests * Make keepalive tests vm only * Switch back to onEvent in state * Switch to published http2 --- CHANGELOG.md | 1 + lib/src/client/client_keepalive.dart | 242 ++++++++++++++++ lib/src/client/http2_connection.dart | 34 ++- lib/src/client/options.dart | 3 + lib/src/server/handler.dart | 6 + lib/src/server/server.dart | 59 ++-- lib/src/server/server_keepalive.dart | 82 ++++++ pubspec.yaml | 5 +- .../client_keepalive_manager_test.dart | 270 ++++++++++++++++++ .../client_keepalive_manager_test.mocks.dart | 43 +++ test/keepalive_test.dart | 176 ++++++++++++ test/server_keepalive_manager_test.dart | 107 +++++++ test/src/client_utils.dart | 4 + test/src/client_utils.mocks.dart | 221 ++++++++++---- test/src/server_utils.dart | 24 +- 15 files changed, 1190 insertions(+), 87 deletions(-) create mode 100644 lib/src/client/client_keepalive.dart create mode 100644 lib/src/server/server_keepalive.dart create mode 100644 test/client_tests/client_keepalive_manager_test.dart create mode 100644 test/client_tests/client_keepalive_manager_test.mocks.dart create mode 100644 test/keepalive_test.dart create mode 100644 test/server_keepalive_manager_test.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index cd28d20..2e7f37a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ## 3.2.2 * Remove `base` qualifier on `ResponseStream`. +* Add support for clients to send KEEPALIVE pings. ## 3.2.1 diff --git a/lib/src/client/client_keepalive.dart b/lib/src/client/client_keepalive.dart new file mode 100644 index 0000000..0f021e2 --- /dev/null +++ b/lib/src/client/client_keepalive.dart @@ -0,0 +1,242 @@ +import 'dart:async'; + +import 'package:clock/clock.dart'; +import 'package:meta/meta.dart'; + +/// KeepAlive support for gRPC, see +/// https://github.com/grpc/grpc/blob/master/doc/keepalive.md. + +/// Options to configure a gRPC client for sending keepalive signals. +class ClientKeepAliveOptions { + /// How often a ping should be sent to keep the connection alive. + /// + /// `GRPC_ARG_KEEPALIVE_TIME_MS` in the docs. + final Duration? pingInterval; + + /// How long the connection should wait before shutting down after no response + /// to a ping. + /// + /// `GRPC_ARG_KEEPALIVE_TIMEOUT_MS` in the docs. + final Duration timeout; + + /// If a connection with no active calls should be kept alive by sending + /// pings. + /// + /// `GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS` in the docs. + final bool permitWithoutCalls; + + const ClientKeepAliveOptions({ + this.pingInterval, + this.timeout = const Duration(milliseconds: 20000), + this.permitWithoutCalls = false, + }); + + bool get shouldSendPings => pingInterval != null; +} + +sealed class KeepAliveState { + KeepAliveState? onEvent(KeepAliveEvent event, ClientKeepAlive manager); + + void disconnect(); +} + +/// Transport has no active rpcs. We don't need to do any keepalives. +final class Idle extends KeepAliveState { + final Timer? pingTimer; + final Stopwatch timeSinceFrame; + + Idle([this.pingTimer, Stopwatch? stopwatch]) + : timeSinceFrame = stopwatch ?? clock.stopwatch() + ..start(); + + @override + KeepAliveState? onEvent(KeepAliveEvent event, ClientKeepAlive manager) { + switch (event) { + case KeepAliveEvent.onTransportActive: + // When the transport goes active, we do not reset the nextKeepaliveTime. + // This allows us to quickly check whether the connection is still + // working. + final timer = pingTimer ?? + Timer(manager._pingInterval - timeSinceFrame.elapsed, + manager.sendPing); + return PingScheduled(timer, timeSinceFrame); + default: + return null; + } + } + + @override + void disconnect() => pingTimer?.cancel(); +} + +/// We have scheduled a ping to be sent in the future. We may decide to delay +/// it if we receive some data. +final class PingScheduled extends KeepAliveState { + final Timer pingTimer; + final Stopwatch timeSinceFrame; + + PingScheduled(this.pingTimer, [Stopwatch? stopwatch]) + : timeSinceFrame = stopwatch ?? clock.stopwatch() + ..start(); + + @override + KeepAliveState? onEvent(KeepAliveEvent event, ClientKeepAlive manager) { + switch (event) { + case KeepAliveEvent.onFrameReceived: + // We do not cancel the ping future here. This avoids constantly scheduling + // and cancellation in a busy transport. Instead, we update the status here + // and reschedule later. So we actually keep one sendPing task always in + // flight when there're active rpcs. + return PingDelayed(pingTimer); + case KeepAliveEvent.onTransportIdle: + return Idle(pingTimer, timeSinceFrame); + case KeepAliveEvent.sendPing: + // Schedule a shutdown. It fires if we don't receive the ping response + // within the timeout. + manager.ping(); + final shutdown = Timer(manager._options.timeout, manager._shutdown); + return ShutdownScheduled(shutdown, false); + default: + return null; + } + } + + @override + void disconnect() => pingTimer.cancel(); +} + +/// We need to delay the scheduled keepalive ping. +final class PingDelayed extends KeepAliveState { + final Timer pingTimer; + final Stopwatch timeSinceFrame; + + PingDelayed(this.pingTimer) : timeSinceFrame = clock.stopwatch()..start(); + + @override + KeepAliveState? onEvent(KeepAliveEvent event, ClientKeepAlive manager) { + switch (event) { + case KeepAliveEvent.onTransportIdle: + return Idle(pingTimer, timeSinceFrame); + case KeepAliveEvent.sendPing: + final pingTimer = Timer( + manager._pingInterval - timeSinceFrame.elapsed, + manager.sendPing, + ); + return PingScheduled(pingTimer, timeSinceFrame); + default: + return null; + } + } + + @override + void disconnect() => pingTimer.cancel(); +} + +/// The ping has been sent out. Waiting for a ping response. +final class ShutdownScheduled extends KeepAliveState { + final bool isIdle; + final Timer shutdownTimer; + + ShutdownScheduled(this.shutdownTimer, this.isIdle); + + @override + KeepAliveState? onEvent(KeepAliveEvent event, ClientKeepAlive manager) { + switch (event) { + case KeepAliveEvent.onFrameReceived: + // Ping acked or effectively ping acked. Cancel shutdown, and then if not + // idle, schedule a new keep-alive ping. + shutdownTimer.cancel(); + // schedule a new ping + return isIdle + ? Idle() + : PingScheduled(Timer(manager._pingInterval, manager.sendPing)); + case KeepAliveEvent.onTransportIdle: + return ShutdownScheduled(shutdownTimer, true); + case KeepAliveEvent.onTransportActive: + return ShutdownScheduled(shutdownTimer, false); + default: + return null; + } + } + + @override + void disconnect() => shutdownTimer.cancel(); +} + +final class Disconnected extends KeepAliveState { + @override + void disconnect() {} + + @override + KeepAliveState? onEvent(KeepAliveEvent event, ClientKeepAlive manager) => + null; +} + +enum KeepAliveEvent { + onTransportActive, + onFrameReceived, + onTransportIdle, + sendPing, +} + +/// A keep alive "manager", deciding when to send pings or shutdown based on the +/// [ClientKeepAliveOptions]. +class ClientKeepAlive { + @visibleForTesting + KeepAliveState state = Idle(); + + final void Function() onPingTimeout; + final void Function() ping; + + final ClientKeepAliveOptions _options; + Duration get _pingInterval => _options.pingInterval ?? Duration(days: 365); + + ClientKeepAlive({ + required ClientKeepAliveOptions options, + required this.ping, + required this.onPingTimeout, + }) : _options = options; + + void onTransportStarted() { + if (_options.permitWithoutCalls) { + onTransportActive(); + } + } + + /// If we receive any kind of frame from the server, that means the connection + /// is still open, so we reset the ping timer. + void onFrameReceived() => _setState(KeepAliveEvent.onFrameReceived); + + void sendPing() => _setState(KeepAliveEvent.sendPing); + + /// When the transport becomes active, we start sending pings every + /// [_pingInterval]. + void onTransportActive() => _setState(KeepAliveEvent.onTransportActive); + + /// If the transport has become idle and [_options.permitWithoutCalls] is + /// set, nothing changes, we still send pings and shutdown on no response. + /// + /// Otherwise, we stop sending pings. + void onTransportIdle() { + if (!_options.permitWithoutCalls) { + _setState(KeepAliveEvent.onTransportIdle); + } + } + + void onTransportTermination() => _disconnect(); + + void _shutdown() { + onPingTimeout(); + _disconnect(); + } + + void _disconnect() { + state.disconnect(); + state = Disconnected(); + } + + void _setState(KeepAliveEvent event) { + final newState = state.onEvent(event, this); + if (newState != null) state = newState; + } +} diff --git a/lib/src/client/http2_connection.dart b/lib/src/client/http2_connection.dart index 2824594..0f1a40d 100644 --- a/lib/src/client/http2_connection.dart +++ b/lib/src/client/http2_connection.dart @@ -22,6 +22,7 @@ import 'package:http2/transport.dart'; import '../shared/codec.dart'; import '../shared/timeout.dart'; import 'call.dart'; +import 'client_keepalive.dart'; import 'client_transport_connector.dart'; import 'connection.dart' hide ClientConnection; import 'connection.dart' as connection; @@ -57,6 +58,8 @@ class Http2ClientConnection implements connection.ClientConnection { Duration? _currentReconnectDelay; + ClientKeepAlive? keepAliveManager; + Http2ClientConnection(Object host, int port, this.options) : _transportConnector = _SocketTransportConnector(host, port, options); @@ -100,6 +103,15 @@ class Http2ClientConnection implements connection.ClientConnection { connectTransport().then((transport) async { _currentReconnectDelay = null; _transportConnection = transport; + if (options.keepAlive.shouldSendPings) { + keepAliveManager = ClientKeepAlive( + options: options.keepAlive, + ping: () => transport.ping(), + onPingTimeout: () => shutdown(), + ); + transport.onFrameReceived + .listen((_) => keepAliveManager?.onFrameReceived()); + } _connectionLifeTimer ..reset() ..start(); @@ -125,6 +137,7 @@ class Http2ClientConnection implements connection.ClientConnection { _connectionLifeTimer.elapsed > options.connectionTimeout; if (shouldRefresh) { _transportConnection!.finish(); + keepAliveManager?.onTransportTermination(); } if (!isHealthy || shouldRefresh) { _abandonConnection(); @@ -196,12 +209,14 @@ class Http2ClientConnection implements connection.ClientConnection { if (_state == ConnectionState.shutdown) return; _setShutdownState(); await _transportConnection?.finish(); + keepAliveManager?.onTransportTermination(); } @override Future terminate() async { _setShutdownState(); await _transportConnection?.terminate(); + keepAliveManager?.onTransportTermination(); } void _setShutdownState() { @@ -222,7 +237,8 @@ class Http2ClientConnection implements connection.ClientConnection { _transportConnection ?.finish() .catchError((_) {}); // TODO(jakobr): Log error. - _transportConnection = null; + keepAliveManager?.onTransportTermination(); + _disconnect(); _setState(ConnectionState.idle); } @@ -234,10 +250,12 @@ class Http2ClientConnection implements connection.ClientConnection { void _handleActiveStateChanged(bool isActive) { if (isActive) { _cancelTimer(); + keepAliveManager?.onTransportActive(); } else { if (options.idleTimeout != null) { _timer ??= Timer(options.idleTimeout!, _handleIdleTimeout); } + keepAliveManager?.onTransportIdle(); } } @@ -248,7 +266,7 @@ class Http2ClientConnection implements connection.ClientConnection { } void _handleConnectionFailure(error) { - _transportConnection = null; + _disconnect(); if (_state == ConnectionState.shutdown || _state == ConnectionState.idle) { return; } @@ -258,6 +276,7 @@ class Http2ClientConnection implements connection.ClientConnection { _failCall(call, error); } _pendingCalls.clear(); + keepAliveManager?.onTransportIdle(); _setState(ConnectionState.idle); } @@ -267,11 +286,17 @@ class Http2ClientConnection implements connection.ClientConnection { _connect(); } + void _disconnect() { + _transportConnection = null; + keepAliveManager?.onTransportTermination(); + keepAliveManager = null; + } + void _abandonConnection() { _cancelTimer(); - _transportConnection = null; + _disconnect(); - if (_state == ConnectionState.idle && _state == ConnectionState.shutdown) { + if (_state == ConnectionState.idle || _state == ConnectionState.shutdown) { // All good. return; } @@ -279,6 +304,7 @@ class Http2ClientConnection implements connection.ClientConnection { // We were not planning to close the socket. if (!_hasPendingCalls()) { // No pending calls. Just hop to idle, and wait for a new RPC. + keepAliveManager?.onTransportIdle(); _setState(ConnectionState.idle); return; } diff --git a/lib/src/client/options.dart b/lib/src/client/options.dart index 03ef818..fdd9c27 100644 --- a/lib/src/client/options.dart +++ b/lib/src/client/options.dart @@ -16,6 +16,7 @@ import 'dart:math'; import '../shared/codec_registry.dart'; +import 'client_keepalive.dart'; import 'transport/http2_credentials.dart'; const defaultIdleTimeout = Duration(minutes: 5); @@ -57,6 +58,7 @@ class ChannelOptions { final Duration? connectTimeout; final BackoffStrategy backoffStrategy; final String userAgent; + final ClientKeepAliveOptions keepAlive; const ChannelOptions({ this.credentials = const ChannelCredentials.secure(), @@ -66,5 +68,6 @@ class ChannelOptions { this.connectTimeout, this.connectionTimeout = defaultConnectionTimeOut, this.codecRegistry, + this.keepAlive = const ClientKeepAliveOptions(), }); } diff --git a/lib/src/server/handler.dart b/lib/src/server/handler.dart index 2a370bc..04dca8f 100644 --- a/lib/src/server/handler.dart +++ b/lib/src/server/handler.dart @@ -67,6 +67,9 @@ class ServerHandler extends ServiceCall { final X509Certificate? _clientCertificate; final InternetAddress? _remoteAddress; + /// Emits a ping everytime data is received + final Sink? onDataReceived; + ServerHandler({ required ServerTransportStream stream, required ServiceLookup serviceLookup, @@ -75,6 +78,7 @@ class ServerHandler extends ServiceCall { X509Certificate? clientCertificate, InternetAddress? remoteAddress, GrpcErrorHandler? errorHandler, + this.onDataReceived, }) : _stream = stream, _serviceLookup = serviceLookup, _interceptors = interceptors, @@ -135,6 +139,7 @@ class ServerHandler extends ServiceCall { // -- Idle state, incoming data -- void _onDataIdle(GrpcMessage headerMessage) async { + onDataReceived?.add(null); if (headerMessage is! GrpcMetadata) { _sendError(GrpcError.unimplemented('Expected header frame')); _sinkIncoming(); @@ -275,6 +280,7 @@ class ServerHandler extends ServiceCall { return; } + onDataReceived?.add(null); final data = message; Object? request; try { diff --git a/lib/src/server/server.dart b/lib/src/server/server.dart index f6c5596..a93f456 100644 --- a/lib/src/server/server.dart +++ b/lib/src/server/server.dart @@ -24,6 +24,7 @@ import '../shared/io_bits/io_bits.dart' as io_bits; import '../shared/security.dart'; import 'handler.dart'; import 'interceptor.dart'; +import 'server_keepalive.dart'; import 'service.dart'; /// Wrapper around grpc_server_credentials, a way to authenticate a server. @@ -88,6 +89,8 @@ class ConnectionServer { final List _interceptors; final CodecRegistry? _codecRegistry; final GrpcErrorHandler? _errorHandler; + final ServerKeepAliveOptions _keepAliveOptions; + final List _handlers = []; final _connections = []; @@ -97,6 +100,7 @@ class ConnectionServer { List interceptors = const [], CodecRegistry? codecRegistry, GrpcErrorHandler? errorHandler, + this._keepAliveOptions = const ServerKeepAliveOptions(), ]) : _codecRegistry = codecRegistry, _interceptors = interceptors, _errorHandler = errorHandler { @@ -113,26 +117,37 @@ class ConnectionServer { InternetAddress? remoteAddress, }) async { _connections.add(connection); - ServerHandler? handler; // TODO(jakobr): Set active state handlers, close connection after idle // timeout. + final onDataReceivedController = StreamController(); + ServerKeepAlive( + options: _keepAliveOptions, + tooManyBadPings: () async => + await connection.terminate(ErrorCode.ENHANCE_YOUR_CALM), + pingNotifier: connection.onPingReceived, + dataNotifier: onDataReceivedController.stream, + ).handle(); connection.incomingStreams.listen((stream) { - handler = serveStream_( + _handlers.add(serveStream_( stream: stream, clientCertificate: clientCertificate, remoteAddress: remoteAddress, - ); + onDataReceived: onDataReceivedController.sink, + )); }, onError: (error, stackTrace) { if (error is Error) { Zone.current.handleUncaughtError(error, stackTrace); } - }, onDone: () { + }, onDone: () async { // TODO(sigurdm): This is not correct behavior in the presence of // half-closed tcp streams. // Half-closed streams seems to not be fully supported by package:http2. // https://github.com/dart-lang/http2/issues/42 - handler?.cancel(); + for (var handler in _handlers) { + handler.cancel(); + } _connections.remove(connection); + await onDataReceivedController.close(); }); } @@ -141,18 +156,20 @@ class ConnectionServer { required ServerTransportStream stream, X509Certificate? clientCertificate, InternetAddress? remoteAddress, + Sink? onDataReceived, }) { return ServerHandler( - stream: stream, - serviceLookup: lookupService, - interceptors: _interceptors, - codecRegistry: _codecRegistry, - // ignore: unnecessary_cast - clientCertificate: clientCertificate as io_bits.X509Certificate?, - // ignore: unnecessary_cast - remoteAddress: remoteAddress as io_bits.InternetAddress?, - errorHandler: _errorHandler, - )..handle(); + stream: stream, + serviceLookup: lookupService, + interceptors: _interceptors, + codecRegistry: _codecRegistry, + // ignore: unnecessary_cast + clientCertificate: clientCertificate as io_bits.X509Certificate?, + // ignore: unnecessary_cast + remoteAddress: remoteAddress as io_bits.InternetAddress?, + errorHandler: _errorHandler, + onDataReceived: onDataReceived) + ..handle(); } } @@ -170,15 +187,23 @@ class Server extends ConnectionServer { super.interceptors, super.codecRegistry, super.errorHandler, + super.keepAlive, ]); /// Create a server for the given [services]. Server.create({ required List services, + ServerKeepAliveOptions keepAliveOptions = const ServerKeepAliveOptions(), List interceptors = const [], CodecRegistry? codecRegistry, GrpcErrorHandler? errorHandler, - }) : super(services, interceptors, codecRegistry, errorHandler); + }) : super( + services, + interceptors, + codecRegistry, + errorHandler, + keepAliveOptions, + ); /// The port that the server is listening on, or `null` if the server is not /// active. @@ -266,6 +291,7 @@ class Server extends ConnectionServer { required ServerTransportStream stream, X509Certificate? clientCertificate, InternetAddress? remoteAddress, + Sink? onDataReceived, }) { return ServerHandler( stream: stream, @@ -277,6 +303,7 @@ class Server extends ConnectionServer { // ignore: unnecessary_cast remoteAddress: remoteAddress as io_bits.InternetAddress?, errorHandler: _errorHandler, + onDataReceived: onDataReceived, )..handle(); } diff --git a/lib/src/server/server_keepalive.dart b/lib/src/server/server_keepalive.dart new file mode 100644 index 0000000..d196641 --- /dev/null +++ b/lib/src/server/server_keepalive.dart @@ -0,0 +1,82 @@ +import 'package:clock/clock.dart'; + +/// Options to configure a gRPC server for receiving keepalive signals. +class ServerKeepAliveOptions { + /// The maximum number of bad pings that the server will tolerate before + /// sending an HTTP2 GOAWAY frame and closing the transport. + /// + /// `GRPC_ARG_HTTP2_MAX_PING_STRIKES` in the docs. + final int? maxBadPings; + + /// The minimum time that is expected between receiving successive pings. + /// + /// `GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS` in the docs. + final Duration minIntervalBetweenPingsWithoutData; + + const ServerKeepAliveOptions({ + this.minIntervalBetweenPingsWithoutData = + const Duration(milliseconds: 300000), + this.maxBadPings = 2, + }); +} + +/// A keep alive "manager", deciding what do to when receiving pings from a +/// client trying to keep the connection alive, based on the set +/// [ServerKeepAliveOptions]. +class ServerKeepAlive { + /// What to do after receiving too many bad pings, probably shut down the + /// connection to not be DDoSed. + final Future Function()? tooManyBadPings; + + final ServerKeepAliveOptions options; + + /// A stream of events for every time the server gets pinged. + final Stream pingNotifier; + + /// A stream of events for every time the server receives data. + final Stream dataNotifier; + + int _badPings = 0; + Stopwatch? _timeOfLastReceivedPing; + + ServerKeepAlive({ + this.tooManyBadPings, + required this.options, + required this.pingNotifier, + required this.dataNotifier, + }); + + void handle() { + // If we don't care about bad pings, there is not point in listening to + // events. + if (_enforcesMaxBadPings) { + pingNotifier.listen((_) => _onPingReceived()); + dataNotifier.listen((_) => _onDataReceived()); + } + } + + bool get _enforcesMaxBadPings => (options.maxBadPings ?? 0) > 0; + + Future _onPingReceived() async { + if (_enforcesMaxBadPings) { + if (_timeOfLastReceivedPing == null) { + _timeOfLastReceivedPing = clock.stopwatch() + ..reset() + ..start(); + } else if (_timeOfLastReceivedPing!.elapsed > + options.minIntervalBetweenPingsWithoutData) { + _badPings++; + } + if (_badPings > options.maxBadPings!) { + await tooManyBadPings?.call(); + } + } + } + + void _onDataReceived() { + if (_enforcesMaxBadPings) { + _badPings = 0; + _timeOfLastReceivedPing = null; + } + } +} diff --git a/pubspec.yaml b/pubspec.yaml index 88523a7..ced0274 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,5 @@ name: grpc description: Dart implementation of gRPC, a high performance, open-source universal RPC framework. - version: 3.2.3-wip repository: https://github.com/grpc/grpc-dart @@ -16,8 +15,9 @@ dependencies: googleapis_auth: ^1.1.0 meta: ^1.3.0 http: '>=0.13.0 <2.0.0' - http2: ^2.0.0 + http2: ^2.2.0 protobuf: '>=2.0.0 <4.0.0' + clock: ^1.1.1 dev_dependencies: build_runner: ^2.0.0 @@ -29,6 +29,7 @@ dev_dependencies: stream_channel: ^2.1.0 stream_transform: ^2.0.0 vm_service: ^11.6.0 + fake_async: ^1.3.1 false_secrets: - interop/server1.key diff --git a/test/client_tests/client_keepalive_manager_test.dart b/test/client_tests/client_keepalive_manager_test.dart new file mode 100644 index 0000000..74889d5 --- /dev/null +++ b/test/client_tests/client_keepalive_manager_test.dart @@ -0,0 +1,270 @@ +import 'package:fake_async/fake_async.dart'; +import 'package:grpc/src/client/client_keepalive.dart'; +import 'package:mockito/annotations.dart'; +import 'package:mockito/mockito.dart'; +import 'package:test/test.dart'; + +import 'client_keepalive_manager_test.mocks.dart'; + +@GenerateNiceMocks([MockSpec()]) +abstract class Pinger { + void ping(); + void onPingTimeout(); +} + +void main() { + late ClientKeepAlive keepAliveManager; + + final pinger = MockPinger(); + + var transportOpen = true; + + final shortTime = Duration(milliseconds: 150); + + final epsilon = Duration(milliseconds: 50); + + final timeout = Duration(milliseconds: 2000); + + /// Add some epsilon to make sure tests pass + final timeoutLeeway = timeout + epsilon; + + final pingInterval = Duration(milliseconds: 1000); + + /// Add some epsilon to make sure tests pass + final pingIntervalLeeway = pingInterval + epsilon; + + void initKeepAliveManager([ClientKeepAliveOptions? opt]) { + reset(pinger); + final options = opt ?? + ClientKeepAliveOptions( + pingInterval: pingInterval, + timeout: timeout, + permitWithoutCalls: false, + ); + + when(pinger.ping()).thenAnswer((_) async => transportOpen = true); + when(pinger.onPingTimeout()).thenAnswer((_) async => transportOpen = false); + + keepAliveManager = ClientKeepAlive( + options: options, + ping: pinger.ping, + onPingTimeout: pinger.onPingTimeout, + ); + transportOpen = true; + } + + setUp(() => initKeepAliveManager()); + test('sendKeepAlivePings', () { + FakeAsync().run((async) { + // Transport becomes active. We should schedule keepalive pings. + keepAliveManager.onTransportActive(); + async.elapse(pingIntervalLeeway); + // Forward clock to keepAliveTimeInNanos will send the ping. Shutdown task should be scheduled. + verify(pinger.ping()).called(1); + // Ping succeeds. Reschedule another ping. + + async.elapse(shortTime); + keepAliveManager.onFrameReceived(); + // Shutdown task has been cancelled. + // Next ping should be exactly 1000 milliseconds later. + async.elapse(pingIntervalLeeway); + verify(pinger.ping()).called(1); + }); + }); + + test('keepAlivePingDelayedByIncomingData', () { + FakeAsync().run((async) { + // Transport becomes active. We should schedule keepalive pings. + keepAliveManager.onTransportActive(); + + // We receive some data. We may need to delay the ping. + async.elapse(pingInterval - shortTime); + keepAliveManager.onFrameReceived(); + async.elapse(shortTime + epsilon); + + // We didn't send the ping. + verifyNever(pinger.ping()); + }); + }); + + test('clienttransport.ping()_pingTimeout', () { + FakeAsync().run((async) { + pinger.onPingTimeout(); + expect(transportOpen, false); + }); + }); + + test('onTransportTerminationCancelsShutdownFuture', () { + FakeAsync().run((async) { + // Transport becomes active. We should schedule keepalive pings. + keepAliveManager.onTransportActive(); + async.elapse(pingIntervalLeeway); + // Shutdown task has become active. + expect(keepAliveManager.state, isA()); + + keepAliveManager.onTransportTermination(); + + // Shutdown task has been cancelled. + expect(keepAliveManager.state, isA()); + }); + }); + test('keepAlivePingTimesOut', () { + FakeAsync().run((async) { + // Transport becomes active. We should schedule keepalive pings. + keepAliveManager.onTransportActive(); + expect(keepAliveManager.state, isA()); + + // Forward clock to keepAliveTimeInNanos will send the ping. Shutdown task should be scheduled. + async.elapse(pingIntervalLeeway); + verify(pinger.ping()).called(1); + expect(keepAliveManager.state, isA()); + + // We do not receive the ping response. Shutdown runnable runs. + async.elapse(timeoutLeeway); + verify(pinger.onPingTimeout()).called(1); + expect(keepAliveManager.state, isA()); + + // We receive the ping response too late. + keepAliveManager.onFrameReceived(); + // No more ping should be scheduled. + expect(keepAliveManager.state, isA()); + }); + }); + test('transportGoesIdle', () { + FakeAsync().run((async) async { + // Transport becomes active. We should schedule keepalive pings. + keepAliveManager.onTransportActive(); + expect(keepAliveManager.state, isA()); + + // Transport becomes idle. Nothing should happen when ping runnable runs. + keepAliveManager.onTransportIdle(); + expect(keepAliveManager.state, isA()); + async.elapse(pingIntervalLeeway); + // Ping was not sent. + verifyNever(pinger.ping()); + // No new ping got scheduled. + expect(keepAliveManager.state, isA()); + + // But when transport goes back to active + keepAliveManager.onTransportActive(); + expect(keepAliveManager.state, isA()); + // Ping is now sent. + async.elapse(pingIntervalLeeway); + verify(pinger.ping()).called(1); + expect(keepAliveManager.state, isA()); + }); + }); + test('transportGoesIdle_doesntCauseIdleWhenEnabled', () { + FakeAsync().run((async) { + keepAliveManager.onTransportTermination(); + initKeepAliveManager(ClientKeepAliveOptions( + pingInterval: pingInterval, + timeout: timeout, + permitWithoutCalls: true, + )); + keepAliveManager.onTransportStarted(); + + // Keepalive scheduling should have started immediately. + expect(keepAliveManager.state, isA()); + + keepAliveManager.onTransportActive(); + + // Transport becomes idle. Should not impact the sending of the ping. + keepAliveManager.onTransportIdle(); + async.elapse(pingIntervalLeeway); + // Ping was sent. + verify(pinger.ping()).called(1); + // Shutdown is scheduled. + expect(keepAliveManager.state, isA()); + // Shutdown is triggered. + async.elapse(timeoutLeeway); + expect(keepAliveManager.state, isA()); + verify(pinger.onPingTimeout()).called(1); + }); + }); + test('transportGoesIdleAfterPingSent', () { + FakeAsync().run((async) { + // Transport becomes active. We should schedule keepalive pings. + keepAliveManager.onTransportActive(); + + // Forward clock to keepAliveTimeInNanos will send the ping. Shutdown task should be scheduled. + async.elapse(pingIntervalLeeway); + verify(pinger.ping()).called(1); + expect(keepAliveManager.state, isA()); + + // Transport becomes idle. No more ping should be scheduled after we receive a ping response. + keepAliveManager.onTransportIdle(); + async.elapse(shortTime); + keepAliveManager.onFrameReceived(); + expect(keepAliveManager.state, isA()); + // Transport becomes active again. Another ping is scheduled. + keepAliveManager.onTransportActive(); + expect(keepAliveManager.state, isA()); + }); + }); + test('transportGoesIdleBeforePingSent', () { + FakeAsync().run((async) { + // Transport becomes active. We should schedule keepalive pings. + keepAliveManager.onTransportActive(); + final pingFuture = (keepAliveManager.state as PingScheduled).pingTimer; + expect(pingFuture, isNotNull); + + // Data is received, and we go to ping delayed + keepAliveManager.onFrameReceived(); + + // Transport becomes idle while the 1st ping is still scheduled + keepAliveManager.onTransportIdle(); + + // Transport becomes active again, we don't need to reschedule another ping + keepAliveManager.onTransportActive(); + expect((keepAliveManager.state as PingScheduled).pingTimer, pingFuture); + }); + }); + test('transportShutsdownAfterPingScheduled', () { + FakeAsync().run((async) { + // Ping will be scheduled. + keepAliveManager.onTransportActive(); + expect(keepAliveManager.state, isA()); + // Transport is shutting down. + keepAliveManager.onTransportTermination(); + // Ping future should have been cancelled. + expect(keepAliveManager.state, isA()); + }); + }); + test('transportShutsdownAfterPingSent', () { + FakeAsync().run((async) { + keepAliveManager.onTransportActive(); + // Forward clock to keepAliveTimeInNanos will send the ping. Shutdown task should be scheduled. + async.elapse(pingIntervalLeeway); + verify(pinger.ping()).called(1); + expect(keepAliveManager.state, isA()); + + // Transport is shutting down. + keepAliveManager.onTransportTermination(); + // Shutdown task has been cancelled. + expect(keepAliveManager.state, isA()); + }); + }); + test('pingSentThenIdleThenActiveThenAck', () { + FakeAsync().run((async) { + keepAliveManager.onTransportActive(); + // Forward clock to keepAliveTimeInNanos will send the ping. Shutdown task should be scheduled. + async.elapse(pingIntervalLeeway); + verify(pinger.ping()).called(1); + + // shutdown scheduled + expect(keepAliveManager.state, isA()); + + keepAliveManager.onTransportIdle(); + + keepAliveManager.onTransportActive(); + + keepAliveManager.onFrameReceived(); + + // another ping scheduled + expect(keepAliveManager.state, isA()); + async.elapse(pingIntervalLeeway); + verify(pinger.ping()).called(1); + }); + }); +} diff --git a/test/client_tests/client_keepalive_manager_test.mocks.dart b/test/client_tests/client_keepalive_manager_test.mocks.dart new file mode 100644 index 0000000..941de6a --- /dev/null +++ b/test/client_tests/client_keepalive_manager_test.mocks.dart @@ -0,0 +1,43 @@ +// Mocks generated by Mockito 5.4.1 from annotations +// in grpc/test/client_tests/client_keepalive_manager_test.dart. +// Do not manually edit this file. + +// @dart=2.19 + +// ignore_for_file: no_leading_underscores_for_library_prefixes +import 'package:mockito/mockito.dart' as _i1; + +import 'client_keepalive_manager_test.dart' as _i2; + +// ignore_for_file: type=lint +// ignore_for_file: avoid_redundant_argument_values +// ignore_for_file: avoid_setters_without_getters +// ignore_for_file: comment_references +// ignore_for_file: implementation_imports +// ignore_for_file: invalid_use_of_visible_for_testing_member +// ignore_for_file: prefer_const_constructors +// ignore_for_file: unnecessary_parenthesis +// ignore_for_file: camel_case_types +// ignore_for_file: subtype_of_sealed_class + +/// A class which mocks [Pinger]. +/// +/// See the documentation for Mockito's code generation for more information. +class MockPinger extends _i1.Mock implements _i2.Pinger { + @override + void ping() => super.noSuchMethod( + Invocation.method( + #ping, + [], + ), + returnValueForMissingStub: null, + ); + @override + void onPingTimeout() => super.noSuchMethod( + Invocation.method( + #onPingTimeout, + [], + ), + returnValueForMissingStub: null, + ); +} diff --git a/test/keepalive_test.dart b/test/keepalive_test.dart new file mode 100644 index 0000000..faeefba --- /dev/null +++ b/test/keepalive_test.dart @@ -0,0 +1,176 @@ +@TestOn('vm') +import 'dart:async'; + +import 'package:grpc/grpc.dart'; +import 'package:grpc/src/client/client_keepalive.dart'; +import 'package:grpc/src/client/connection.dart'; +import 'package:grpc/src/client/http2_connection.dart'; +import 'package:grpc/src/server/server_keepalive.dart'; +import 'package:http2/transport.dart'; +import 'package:test/test.dart'; + +import 'src/generated/echo.pbgrpc.dart'; + +void main() { + late Server server; + late EchoServiceClient fakeClient; + late FakeClientChannel fakeChannel; + late EchoServiceClient unresponsiveClient; + late ClientChannel unresponsiveChannel; + + setUp(() async { + final serverOptions = ServerKeepAliveOptions( + maxBadPings: 5, + minIntervalBetweenPingsWithoutData: Duration(milliseconds: 10), + ); + final clientOptions = ClientKeepAliveOptions( + pingInterval: Duration(milliseconds: 10), + timeout: Duration(milliseconds: 30), + permitWithoutCalls: true, + ); + + server = Server.create( + services: [FakeEchoService()], + keepAliveOptions: serverOptions, + ); + await server.serve(address: 'localhost', port: 8081); + fakeChannel = FakeClientChannel( + 'localhost', + port: server.port!, + options: ChannelOptions( + credentials: ChannelCredentials.insecure(), + keepAlive: clientOptions, + ), + ); + fakeClient = EchoServiceClient(fakeChannel); + + unresponsiveChannel = UnresponsiveClientChannel( + 'localhost', + port: server.port!, + options: ChannelOptions( + credentials: ChannelCredentials.insecure(), + keepAlive: clientOptions, + ), + ); + unresponsiveClient = EchoServiceClient(unresponsiveChannel); + }); + + tearDown(() async { + await fakeChannel.terminate(); + await server.shutdown(); + }); + + test('Server terminates connection after too many pings without data', + () async { + await fakeClient.echo(EchoRequest()); + await Future.delayed(Duration(milliseconds: 300)); + await fakeClient.echo(EchoRequest()); + // Check that the server closed the connection, the next request then has + // to build a new one. + expect(fakeChannel.newConnectionCounter, 2); + }); + + test('Server doesnt terminate connection after pings, as data is sent', + () async { + final timer = Timer.periodic( + Duration(milliseconds: 30), (timer) => fakeClient.echo(EchoRequest())); + await Future.delayed(Duration(milliseconds: 200), () => timer.cancel()); + // Check that the server never closed the connection + expect(fakeChannel.newConnectionCounter, 1); + }); + + test('Server doesnt ack the ping, making the client shutdown the connection', + () async { + await unresponsiveClient.echo(EchoRequest()); + await Future.delayed(Duration(milliseconds: 200)); + await expectLater( + unresponsiveClient.echo(EchoRequest()), throwsA(isA())); + }); +} + +/// A wrapper around a [FakeHttp2ClientConnection] +class FakeClientChannel extends ClientChannel { + late FakeHttp2ClientConnection fakeHttp2ClientConnection; + FakeClientChannel( + super.host, { + super.port = 443, + super.options = const ChannelOptions(), + super.channelShutdownHandler, + }); + + @override + ClientConnection createConnection() { + fakeHttp2ClientConnection = FakeHttp2ClientConnection(host, port, options); + return fakeHttp2ClientConnection; + } + + int get newConnectionCounter => + fakeHttp2ClientConnection.newConnectionCounter; +} + +/// A [Http2ClientConnection] exposing a counter for new connections +class FakeHttp2ClientConnection extends Http2ClientConnection { + int newConnectionCounter = 0; + + FakeHttp2ClientConnection(super.host, super.port, super.options); + + @override + Future connectTransport() { + newConnectionCounter++; + return super.connectTransport(); + } +} + +/// A wrapper around a [FakeHttp2ClientConnection] +class UnresponsiveClientChannel extends ClientChannel { + UnresponsiveClientChannel( + super.host, { + super.port = 443, + super.options = const ChannelOptions(), + super.channelShutdownHandler, + }); + + @override + ClientConnection createConnection() => + UnresponsiveHttp2ClientConnection(host, port, options); +} + +class UnresponsiveHttp2ClientConnection extends Http2ClientConnection { + UnresponsiveHttp2ClientConnection(super.host, super.port, super.options); + + @override + set keepAliveManager(ClientKeepAlive? value) { + if (value != null) { + super.keepAliveManager = FakeClientKeepAlive( + options: super.options.keepAlive, + ping: value.ping, + onPingTimeout: value.onPingTimeout, + ); + } + } +} + +class FakeClientKeepAlive extends ClientKeepAlive { + FakeClientKeepAlive( + {required super.options, + required super.ping, + required super.onPingTimeout}); + + @override + void onFrameReceived() { + // Do nothing here, to simulate a server not responding to pings. + } +} + +class FakeEchoService extends EchoServiceBase { + @override + Future echo(ServiceCall call, EchoRequest request) async => + EchoResponse(message: 'Echo messsage'); + + @override + Stream serverStreamingEcho( + ServiceCall call, ServerStreamingEchoRequest request) { + // TODO: implement serverStreamingEcho + throw UnimplementedError(); + } +} diff --git a/test/server_keepalive_manager_test.dart b/test/server_keepalive_manager_test.dart new file mode 100644 index 0000000..34f0afb --- /dev/null +++ b/test/server_keepalive_manager_test.dart @@ -0,0 +1,107 @@ +import 'dart:async'; + +import 'package:grpc/src/server/server_keepalive.dart'; +import 'package:test/test.dart'; + +void main() { + late StreamController pingStream; + late StreamController dataStream; + late int maxBadPings; + + var goAway = false; + + void initServer([ServerKeepAliveOptions? options]) => ServerKeepAlive( + options: options ?? + ServerKeepAliveOptions( + maxBadPings: maxBadPings, + minIntervalBetweenPingsWithoutData: Duration(milliseconds: 5), + ), + pingNotifier: pingStream.stream, + dataNotifier: dataStream.stream, + tooManyBadPings: () async => goAway = true, + ).handle(); + + setUp(() { + pingStream = StreamController(); + dataStream = StreamController(); + maxBadPings = 10; + goAway = false; + }); + + tearDown(() { + pingStream.close(); + dataStream.close(); + }); + + test('Sending too many pings without data kills connection', () async { + initServer(); + // Send good ping + pingStream.sink.add(null); + await Future.delayed(Duration(milliseconds: 10)); + + // Send [maxBadPings] bad pings, that's still ok + for (var i = 0; i < maxBadPings; i++) { + pingStream.sink.add(null); + } + await Future.delayed(Duration(milliseconds: 10)); + expect(goAway, false); + + // Send another bad ping; that's one too many! + pingStream.sink.add(null); + await Future.delayed(Duration(milliseconds: 10)); + expect(goAway, true); + }); + test( + 'Sending too many pings without data doesn`t kill connection if the server doesn`t care', + () async { + initServer(ServerKeepAliveOptions( + maxBadPings: null, + minIntervalBetweenPingsWithoutData: Duration(milliseconds: 5), + )); + // Send good ping + pingStream.sink.add(null); + await Future.delayed(Duration(milliseconds: 10)); + + // Send a lot of bad pings, that's still ok. + for (var i = 0; i < 50; i++) { + pingStream.sink.add(null); + } + await Future.delayed(Duration(milliseconds: 10)); + expect(goAway, false); + }); + + test('Sending many pings with data doesn`t kill connection', () async { + initServer(); + + // Send good ping + pingStream.sink.add(null); + await Future.delayed(Duration(milliseconds: 10)); + + // Send [maxBadPings] bad pings, that's still ok + for (var i = 0; i < maxBadPings; i++) { + pingStream.sink.add(null); + } + await Future.delayed(Duration(milliseconds: 10)); + expect(goAway, false); + + // Sending data resets the bad ping count + dataStream.add(null); + await Future.delayed(Duration(milliseconds: 10)); + + // Send good ping + pingStream.sink.add(null); + await Future.delayed(Duration(milliseconds: 10)); + + // Send [maxBadPings] bad pings, that's still ok + for (var i = 0; i < maxBadPings; i++) { + pingStream.sink.add(null); + } + await Future.delayed(Duration(milliseconds: 10)); + expect(goAway, false); + + // Send another bad ping; that's one too many! + pingStream.sink.add(null); + await Future.delayed(Duration(milliseconds: 10)); + expect(goAway, true); + }); +} diff --git a/test/src/client_utils.dart b/test/src/client_utils.dart index b40450f..852ebd0 100644 --- a/test/src/client_utils.dart +++ b/test/src/client_utils.dart @@ -18,6 +18,7 @@ import 'dart:convert'; import 'package:grpc/grpc.dart'; import 'package:grpc/src/client/channel.dart' as base; +import 'package:grpc/src/client/client_keepalive.dart'; import 'package:grpc/src/client/http2_connection.dart'; import 'package:grpc/src/shared/message.dart'; import 'package:http2/transport.dart'; @@ -76,6 +77,9 @@ class FakeChannelOptions implements ChannelOptions { BackoffStrategy backoffStrategy = testBackoff; @override CodecRegistry codecRegistry = CodecRegistry.empty(); + + @override + ClientKeepAliveOptions get keepAlive => const ClientKeepAliveOptions(); } class FakeChannel extends ClientChannel { diff --git a/test/src/client_utils.mocks.dart b/test/src/client_utils.mocks.dart index 9d39b58..98702df 100644 --- a/test/src/client_utils.mocks.dart +++ b/test/src/client_utils.mocks.dart @@ -1,101 +1,204 @@ -// Mocks generated by Mockito 5.0.0-nullsafety.6 from annotations +// Mocks generated by Mockito 5.4.1 from annotations // in grpc/test/src/client_utils.dart. // Do not manually edit this file. -import 'dart:async' as i3; +// @dart=2.19 -import 'package:http2/src/hpack/hpack.dart' as i4; -import 'package:http2/transport.dart' as i2; -import 'package:mockito/mockito.dart' as i1; +// ignore_for_file: no_leading_underscores_for_library_prefixes +import 'dart:async' as _i3; +import 'package:http2/src/hpack/hpack.dart' as _i4; +import 'package:http2/transport.dart' as _i2; +import 'package:mockito/mockito.dart' as _i1; + +// ignore_for_file: type=lint +// ignore_for_file: avoid_redundant_argument_values +// ignore_for_file: avoid_setters_without_getters // ignore_for_file: comment_references +// ignore_for_file: implementation_imports +// ignore_for_file: invalid_use_of_visible_for_testing_member +// ignore_for_file: prefer_const_constructors // ignore_for_file: unnecessary_parenthesis +// ignore_for_file: camel_case_types +// ignore_for_file: subtype_of_sealed_class -class _FakeClientTransportStream extends i1.Fake - implements i2.ClientTransportStream {} +class _FakeClientTransportStream_0 extends _i1.SmartFake + implements _i2.ClientTransportStream { + _FakeClientTransportStream_0( + Object parent, + Invocation parentInvocation, + ) : super( + parent, + parentInvocation, + ); +} -class _FakeStreamSink extends i1.Fake implements i3.StreamSink {} +class _FakeStreamSink_1 extends _i1.SmartFake implements _i3.StreamSink { + _FakeStreamSink_1( + Object parent, + Invocation parentInvocation, + ) : super( + parent, + parentInvocation, + ); +} /// A class which mocks [ClientTransportConnection]. /// /// See the documentation for Mockito's code generation for more information. -class MockClientTransportConnection extends i1.Mock - implements i2.ClientTransportConnection { +class MockClientTransportConnection extends _i1.Mock + implements _i2.ClientTransportConnection { MockClientTransportConnection() { - i1.throwOnMissingStub(this); + _i1.throwOnMissingStub(this); } @override - bool get isOpen => - (super.noSuchMethod(Invocation.getter(#isOpen), returnValue: false) - as bool); + bool get isOpen => (super.noSuchMethod( + Invocation.getter(#isOpen), + returnValue: false, + ) as bool); @override - set onActiveStateChanged(i2.ActiveStateHandler? callback) => - super.noSuchMethod(Invocation.setter(#onActiveStateChanged, callback), - returnValueForMissingStub: null); + set onActiveStateChanged(_i2.ActiveStateHandler? callback) => + super.noSuchMethod( + Invocation.setter( + #onActiveStateChanged, + callback, + ), + returnValueForMissingStub: null, + ); @override - i3.Future get onInitialPeerSettingsReceived => - (super.noSuchMethod(Invocation.getter(#onInitialPeerSettingsReceived), - returnValue: Future.value(null)) as i3.Future); + _i3.Future get onInitialPeerSettingsReceived => (super.noSuchMethod( + Invocation.getter(#onInitialPeerSettingsReceived), + returnValue: _i3.Future.value(), + ) as _i3.Future); @override - i2.ClientTransportStream makeRequest(List? headers, - {bool? endStream = false}) => + _i3.Stream get onPingReceived => (super.noSuchMethod( + Invocation.getter(#onPingReceived), + returnValue: _i3.Stream.empty(), + ) as _i3.Stream); + @override + _i3.Stream get onFrameReceived => (super.noSuchMethod( + Invocation.getter(#onFrameReceived), + returnValue: _i3.Stream.empty(), + ) as _i3.Stream); + @override + _i2.ClientTransportStream makeRequest( + List<_i4.Header>? headers, { + bool? endStream = false, + }) => (super.noSuchMethod( - Invocation.method(#makeRequest, [headers], {#endStream: endStream}), - returnValue: - _FakeClientTransportStream()) as i2.ClientTransportStream); + Invocation.method( + #makeRequest, + [headers], + {#endStream: endStream}, + ), + returnValue: _FakeClientTransportStream_0( + this, + Invocation.method( + #makeRequest, + [headers], + {#endStream: endStream}, + ), + ), + ) as _i2.ClientTransportStream); @override - i3.Future ping() => (super.noSuchMethod(Invocation.method(#ping, []), - returnValue: Future.value(null)) as i3.Future); + _i3.Future ping() => (super.noSuchMethod( + Invocation.method( + #ping, + [], + ), + returnValue: _i3.Future.value(), + ) as _i3.Future); @override - i3.Future finish() => - (super.noSuchMethod(Invocation.method(#finish, []), - returnValue: Future.value(null)) as i3.Future); + _i3.Future finish() => (super.noSuchMethod( + Invocation.method( + #finish, + [], + ), + returnValue: _i3.Future.value(), + ) as _i3.Future); @override - i3.Future terminate() => - (super.noSuchMethod(Invocation.method(#terminate, []), - returnValue: Future.value(null)) as i3.Future); + _i3.Future terminate([int? errorCode]) => (super.noSuchMethod( + Invocation.method( + #terminate, + [errorCode], + ), + returnValue: _i3.Future.value(), + ) as _i3.Future); } /// A class which mocks [ClientTransportStream]. /// /// See the documentation for Mockito's code generation for more information. -class MockClientTransportStream extends i1.Mock - implements i2.ClientTransportStream { +class MockClientTransportStream extends _i1.Mock + implements _i2.ClientTransportStream { MockClientTransportStream() { - i1.throwOnMissingStub(this); + _i1.throwOnMissingStub(this); } @override - i3.Stream get peerPushes => - (super.noSuchMethod(Invocation.getter(#peerPushes), - returnValue: Stream.empty()) - as i3.Stream); + _i3.Stream<_i2.TransportStreamPush> get peerPushes => (super.noSuchMethod( + Invocation.getter(#peerPushes), + returnValue: _i3.Stream<_i2.TransportStreamPush>.empty(), + ) as _i3.Stream<_i2.TransportStreamPush>); @override - int get id => - (super.noSuchMethod(Invocation.getter(#id), returnValue: 0) as int); + int get id => (super.noSuchMethod( + Invocation.getter(#id), + returnValue: 0, + ) as int); @override - i3.Stream get incomingMessages => - (super.noSuchMethod(Invocation.getter(#incomingMessages), - returnValue: Stream.empty()) - as i3.Stream); + _i3.Stream<_i2.StreamMessage> get incomingMessages => (super.noSuchMethod( + Invocation.getter(#incomingMessages), + returnValue: _i3.Stream<_i2.StreamMessage>.empty(), + ) as _i3.Stream<_i2.StreamMessage>); @override - i3.StreamSink get outgoingMessages => - (super.noSuchMethod(Invocation.getter(#outgoingMessages), - returnValue: _FakeStreamSink()) - as i3.StreamSink); + _i3.StreamSink<_i2.StreamMessage> get outgoingMessages => (super.noSuchMethod( + Invocation.getter(#outgoingMessages), + returnValue: _FakeStreamSink_1<_i2.StreamMessage>( + this, + Invocation.getter(#outgoingMessages), + ), + ) as _i3.StreamSink<_i2.StreamMessage>); @override - set onTerminated(void Function(int?)? value) => - super.noSuchMethod(Invocation.setter(#onTerminated, value), - returnValueForMissingStub: null); + set onTerminated(void Function(int?)? value) => super.noSuchMethod( + Invocation.setter( + #onTerminated, + value, + ), + returnValueForMissingStub: null, + ); @override - void sendHeaders(List? headers, {bool? endStream = false}) => + void terminate() => super.noSuchMethod( + Invocation.method( + #terminate, + [], + ), + returnValueForMissingStub: null, + ); + @override + void sendHeaders( + List<_i4.Header>? headers, { + bool? endStream = false, + }) => super.noSuchMethod( - Invocation.method(#sendHeaders, [headers], {#endStream: endStream}), - returnValueForMissingStub: null); + Invocation.method( + #sendHeaders, + [headers], + {#endStream: endStream}, + ), + returnValueForMissingStub: null, + ); @override - void sendData(List? bytes, {bool? endStream = false}) => + void sendData( + List? bytes, { + bool? endStream = false, + }) => super.noSuchMethod( - Invocation.method(#sendData, [bytes], {#endStream: endStream}), - returnValueForMissingStub: null); + Invocation.method( + #sendData, + [bytes], + {#endStream: endStream}, + ), + returnValueForMissingStub: null, + ); } diff --git a/test/src/server_utils.dart b/test/src/server_utils.dart index e6f3906..24a9410 100644 --- a/test/src/server_utils.dart +++ b/test/src/server_utils.dart @@ -134,13 +134,25 @@ class ServerHarness extends _Harness { class ConnectionServerHarness extends _Harness { @override - ConnectionServer createServer() => - ConnectionServer([service], [interceptor]); + ConnectionServer createServer() => ConnectionServer( + [service], + [interceptor], + ); - static ServiceMethod createMethod(String name, - Function methodHandler, bool clientStreaming, bool serverStreaming) { - return ServiceMethod(name, methodHandler, clientStreaming, - serverStreaming, mockDecode, mockEncode); + static ServiceMethod createMethod( + String name, + Function methodHandler, + bool clientStreaming, + bool serverStreaming, + ) { + return ServiceMethod( + name, + methodHandler, + clientStreaming, + serverStreaming, + mockDecode, + mockEncode, + ); } }