From 3f05c37367c2d88f581d24734e625c10052aea39 Mon Sep 17 00:00:00 2001 From: Moritz Date: Thu, 24 Aug 2023 10:42:21 +0200 Subject: [PATCH] Remove canceled streams (#661) * Remove canceled streams * Inline variabel * Add changelog * Add test * Add description to test * More messages for less flakiness --- CHANGELOG.md | 1 + lib/src/server/handler.dart | 21 +++++--- lib/src/server/server.dart | 16 +++--- test/server_cancellation_test.dart | 83 ++++++++++++++++++++++++++++++ 4 files changed, 109 insertions(+), 12 deletions(-) create mode 100644 test/server_cancellation_test.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index bb62e8e..6f88c54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ * Forward internal `GrpcError` on when throwing while sending a request. * Add support for proxies, see [#33](https://github.com/grpc/grpc-dart/issues/33). +* Remove canceled `ServerHandler`s from tracking list. ## 3.2.3 diff --git a/lib/src/server/handler.dart b/lib/src/server/handler.dart index 04dca8f..ecbebe1 100644 --- a/lib/src/server/handler.dart +++ b/lib/src/server/handler.dart @@ -60,7 +60,6 @@ class ServerHandler extends ServiceCall { Map? _customTrailers = {}; DateTime? _deadline; - bool _isCanceled = false; bool _isTimedOut = false; Timer? _timeoutTimer; @@ -70,6 +69,16 @@ class ServerHandler extends ServiceCall { /// Emits a ping everytime data is received final Sink? onDataReceived; + final Completer _isCanceledCompleter = Completer(); + + Future get onCanceled => _isCanceledCompleter.future; + + set isCanceled(bool value) { + if (!isCanceled) { + _isCanceledCompleter.complete(); + } + } + ServerHandler({ required ServerTransportStream stream, required ServiceLookup serviceLookup, @@ -91,7 +100,7 @@ class ServerHandler extends ServiceCall { DateTime? get deadline => _deadline; @override - bool get isCanceled => _isCanceled; + bool get isCanceled => _isCanceledCompleter.isCompleted; @override bool get isTimedOut => _isTimedOut; @@ -247,9 +256,9 @@ class ServerHandler extends ServiceCall { } void _onTimedOut() { - if (_isCanceled) return; + if (isCanceled) return; _isTimedOut = true; - _isCanceled = true; + isCanceled = true; final error = GrpcError.deadlineExceeded('Deadline exceeded'); _sendError(error); if (!_requests!.isClosed) { @@ -408,7 +417,7 @@ class ServerHandler extends ServiceCall { // Exception from the incoming stream. Most likely a cancel request from the // client, so we treat it as such. _timeoutTimer?.cancel(); - _isCanceled = true; + isCanceled = true; if (_requests != null && !_requests!.isClosed) { _requests!.addError(GrpcError.cancelled('Cancelled')); } @@ -455,7 +464,7 @@ class ServerHandler extends ServiceCall { } void cancel() { - _isCanceled = true; + isCanceled = true; _timeoutTimer?.cancel(); _cancelResponseSubscription(); } diff --git a/lib/src/server/server.dart b/lib/src/server/server.dart index 30b7596..f571230 100644 --- a/lib/src/server/server.dart +++ b/lib/src/server/server.dart @@ -90,7 +90,9 @@ class ConnectionServer { final CodecRegistry? _codecRegistry; final GrpcErrorHandler? _errorHandler; final ServerKeepAliveOptions _keepAliveOptions; - final Map> _handlers = {}; + + @visibleForTesting + final Map> handlers = {}; final _connections = []; @@ -117,7 +119,7 @@ class ConnectionServer { InternetAddress? remoteAddress, }) async { _connections.add(connection); - _handlers[connection] = []; + handlers[connection] = []; // TODO(jakobr): Set active state handlers, close connection after idle // timeout. final onDataReceivedController = StreamController(); @@ -129,12 +131,14 @@ class ConnectionServer { dataNotifier: onDataReceivedController.stream, ).handle(); connection.incomingStreams.listen((stream) { - _handlers[connection]!.add(serveStream_( + final handler = serveStream_( stream: stream, clientCertificate: clientCertificate, remoteAddress: remoteAddress, onDataReceived: onDataReceivedController.sink, - )); + ); + handler.onCanceled.then((_) => handlers[connection]?.remove(handler)); + handlers[connection]!.add(handler); }, onError: (error, stackTrace) { if (error is Error) { Zone.current.handleUncaughtError(error, stackTrace); @@ -144,11 +148,11 @@ class ConnectionServer { // half-closed tcp streams. // Half-closed streams seems to not be fully supported by package:http2. // https://github.com/dart-lang/http2/issues/42 - for (var handler in _handlers[connection]!) { + for (var handler in handlers[connection]!) { handler.cancel(); } _connections.remove(connection); - _handlers.remove(connection); + handlers.remove(connection); await onDataReceivedController.close(); }); } diff --git a/test/server_cancellation_test.dart b/test/server_cancellation_test.dart new file mode 100644 index 0000000..ff4cd85 --- /dev/null +++ b/test/server_cancellation_test.dart @@ -0,0 +1,83 @@ +// Copyright (c) 2023, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +@TestOn('vm') +import 'package:grpc/grpc.dart'; +import 'package:test/test.dart'; + +import 'src/generated/echo.pbgrpc.dart'; + +class EchoService extends EchoServiceBase { + @override + Future echo(ServiceCall call, EchoRequest request) => + throw UnimplementedError(); + + @override + Stream serverStreamingEcho( + ServiceCall call, ServerStreamingEchoRequest request) async* { + for (var i = 0; i < request.messageCount; i++) { + yield ServerStreamingEchoResponse(message: '$i'); + await Future.delayed(Duration(milliseconds: request.messageInterval)); + } + } +} + +void main() { + late Server server; + late ClientChannel channel; + + int numberHandlers() => + server.handlers.entries.firstOrNull?.value.length ?? 0; + + setUp(() async { + server = Server.create( + services: [EchoService()], + ); + await server.serve(address: 'localhost', port: 8081); + channel = ClientChannel( + 'localhost', + port: server.port!, + options: ChannelOptions(credentials: ChannelCredentials.insecure()), + ); + }); + + tearDown(() async { + await channel.shutdown(); + await server.shutdown(); + }); + + test('Handlers get removed from map after stream is done.', () async { + final request = ServerStreamingEchoRequest( + messageCount: 5, + messageInterval: 5, + ); + final stream1 = EchoServiceClient(channel) + .serverStreamingEcho(request) + .asBroadcastStream(); + final stream2 = EchoServiceClient(channel) + .serverStreamingEcho(request) + .asBroadcastStream(); + + expect(numberHandlers(), 0); + + await stream1.take(1).toList(); + await stream2.take(1).toList(); + expect(numberHandlers(), 2); + + await stream1.drain(); + await stream2.drain(); + expect(numberHandlers(), 0); + }); +}