Remove canceled streams (#661)

* Remove canceled streams

* Inline variabel

* Add changelog

* Add test

* Add description to test

* More messages for less flakiness
This commit is contained in:
Moritz 2023-08-24 10:42:21 +02:00 committed by GitHub
parent 4ccd8a0e3d
commit 3f05c37367
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 109 additions and 12 deletions

View File

@ -2,6 +2,7 @@
* Forward internal `GrpcError` on when throwing while sending a request.
* Add support for proxies, see [#33](https://github.com/grpc/grpc-dart/issues/33).
* Remove canceled `ServerHandler`s from tracking list.
## 3.2.3

View File

@ -60,7 +60,6 @@ class ServerHandler extends ServiceCall {
Map<String, String>? _customTrailers = {};
DateTime? _deadline;
bool _isCanceled = false;
bool _isTimedOut = false;
Timer? _timeoutTimer;
@ -70,6 +69,16 @@ class ServerHandler extends ServiceCall {
/// Emits a ping everytime data is received
final Sink<void>? onDataReceived;
final Completer<void> _isCanceledCompleter = Completer<void>();
Future<void> get onCanceled => _isCanceledCompleter.future;
set isCanceled(bool value) {
if (!isCanceled) {
_isCanceledCompleter.complete();
}
}
ServerHandler({
required ServerTransportStream stream,
required ServiceLookup serviceLookup,
@ -91,7 +100,7 @@ class ServerHandler extends ServiceCall {
DateTime? get deadline => _deadline;
@override
bool get isCanceled => _isCanceled;
bool get isCanceled => _isCanceledCompleter.isCompleted;
@override
bool get isTimedOut => _isTimedOut;
@ -247,9 +256,9 @@ class ServerHandler extends ServiceCall {
}
void _onTimedOut() {
if (_isCanceled) return;
if (isCanceled) return;
_isTimedOut = true;
_isCanceled = true;
isCanceled = true;
final error = GrpcError.deadlineExceeded('Deadline exceeded');
_sendError(error);
if (!_requests!.isClosed) {
@ -408,7 +417,7 @@ class ServerHandler extends ServiceCall {
// Exception from the incoming stream. Most likely a cancel request from the
// client, so we treat it as such.
_timeoutTimer?.cancel();
_isCanceled = true;
isCanceled = true;
if (_requests != null && !_requests!.isClosed) {
_requests!.addError(GrpcError.cancelled('Cancelled'));
}
@ -455,7 +464,7 @@ class ServerHandler extends ServiceCall {
}
void cancel() {
_isCanceled = true;
isCanceled = true;
_timeoutTimer?.cancel();
_cancelResponseSubscription();
}

View File

@ -90,7 +90,9 @@ class ConnectionServer {
final CodecRegistry? _codecRegistry;
final GrpcErrorHandler? _errorHandler;
final ServerKeepAliveOptions _keepAliveOptions;
final Map<ServerTransportConnection, List<ServerHandler>> _handlers = {};
@visibleForTesting
final Map<ServerTransportConnection, List<ServerHandler>> handlers = {};
final _connections = <ServerTransportConnection>[];
@ -117,7 +119,7 @@ class ConnectionServer {
InternetAddress? remoteAddress,
}) async {
_connections.add(connection);
_handlers[connection] = [];
handlers[connection] = [];
// TODO(jakobr): Set active state handlers, close connection after idle
// timeout.
final onDataReceivedController = StreamController<void>();
@ -129,12 +131,14 @@ class ConnectionServer {
dataNotifier: onDataReceivedController.stream,
).handle();
connection.incomingStreams.listen((stream) {
_handlers[connection]!.add(serveStream_(
final handler = serveStream_(
stream: stream,
clientCertificate: clientCertificate,
remoteAddress: remoteAddress,
onDataReceived: onDataReceivedController.sink,
));
);
handler.onCanceled.then((_) => handlers[connection]?.remove(handler));
handlers[connection]!.add(handler);
}, onError: (error, stackTrace) {
if (error is Error) {
Zone.current.handleUncaughtError(error, stackTrace);
@ -144,11 +148,11 @@ class ConnectionServer {
// half-closed tcp streams.
// Half-closed streams seems to not be fully supported by package:http2.
// https://github.com/dart-lang/http2/issues/42
for (var handler in _handlers[connection]!) {
for (var handler in handlers[connection]!) {
handler.cancel();
}
_connections.remove(connection);
_handlers.remove(connection);
handlers.remove(connection);
await onDataReceivedController.close();
});
}

View File

@ -0,0 +1,83 @@
// Copyright (c) 2023, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
@TestOn('vm')
import 'package:grpc/grpc.dart';
import 'package:test/test.dart';
import 'src/generated/echo.pbgrpc.dart';
class EchoService extends EchoServiceBase {
@override
Future<EchoResponse> echo(ServiceCall call, EchoRequest request) =>
throw UnimplementedError();
@override
Stream<ServerStreamingEchoResponse> serverStreamingEcho(
ServiceCall call, ServerStreamingEchoRequest request) async* {
for (var i = 0; i < request.messageCount; i++) {
yield ServerStreamingEchoResponse(message: '$i');
await Future.delayed(Duration(milliseconds: request.messageInterval));
}
}
}
void main() {
late Server server;
late ClientChannel channel;
int numberHandlers() =>
server.handlers.entries.firstOrNull?.value.length ?? 0;
setUp(() async {
server = Server.create(
services: [EchoService()],
);
await server.serve(address: 'localhost', port: 8081);
channel = ClientChannel(
'localhost',
port: server.port!,
options: ChannelOptions(credentials: ChannelCredentials.insecure()),
);
});
tearDown(() async {
await channel.shutdown();
await server.shutdown();
});
test('Handlers get removed from map after stream is done.', () async {
final request = ServerStreamingEchoRequest(
messageCount: 5,
messageInterval: 5,
);
final stream1 = EchoServiceClient(channel)
.serverStreamingEcho(request)
.asBroadcastStream();
final stream2 = EchoServiceClient(channel)
.serverStreamingEcho(request)
.asBroadcastStream();
expect(numberHandlers(), 0);
await stream1.take(1).toList();
await stream2.take(1).toList();
expect(numberHandlers(), 2);
await stream1.drain();
await stream2.drain();
expect(numberHandlers(), 0);
});
}