Propagate cancellation from responses stream to the call (#376)

It is not enough to just cancel the subscription.

Fixes #368
This commit is contained in:
Vyacheslav Egorov 2020-10-29 12:56:49 +01:00 committed by GitHub
parent b6e40c34e3
commit 7ea15a8160
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 7 deletions

View File

@ -265,7 +265,7 @@ class ClientCall<Q, R> implements Response {
} }
_responses.onPause = _responseSubscription.pause; _responses.onPause = _responseSubscription.pause;
_responses.onResume = _responseSubscription.resume; _responses.onResume = _responseSubscription.resume;
_responses.onCancel = _responseSubscription.cancel; _responses.onCancel = cancel;
} }
} }

View File

@ -28,10 +28,15 @@ class TestService extends Service {
(List<int> value) => value[0], (int value) => [value])); (List<int> value) => value[0], (int value) => [value]));
} }
static const requestFiniteStream = 1;
static const requestInfiniteStream = 2;
Stream<int> stream(ServiceCall call, Future request) async* { Stream<int> stream(ServiceCall call, Future request) async* {
yield 1; final isInfinite = 2 == await request;
yield 2; for (var i = 1; i <= 3 || isInfinite; i++) {
yield 3; yield i;
await Future.delayed(Duration(milliseconds: 100));
}
} }
} }
@ -62,7 +67,8 @@ main() async {
ChannelOptions(credentials: ChannelCredentials.insecure()), ChannelOptions(credentials: ChannelCredentials.insecure()),
)); ));
final testClient = TestClient(channel); final testClient = TestClient(channel);
expect(await testClient.stream(1).toList(), [1, 2, 3]); expect(await testClient.stream(TestService.requestFiniteStream).toList(),
[1, 2, 3]);
server.shutdown(); server.shutdown();
}); });
@ -84,7 +90,8 @@ main() async {
authority: 'localhost')), authority: 'localhost')),
)); ));
final testClient = TestClient(channel); final testClient = TestClient(channel);
expect(await testClient.stream(1).toList(), [1, 2, 3]); expect(await testClient.stream(TestService.requestFiniteStream).toList(),
[1, 2, 3]);
server.shutdown(); server.shutdown();
}); });
@ -98,7 +105,24 @@ main() async {
ChannelOptions(credentials: ChannelCredentials.insecure()), ChannelOptions(credentials: ChannelCredentials.insecure()),
)); ));
final testClient = TestClient(channel); final testClient = TestClient(channel);
await expectLater(testClient.stream(1).toList(), throwsA(isA<GrpcError>())); await expectLater(
testClient.stream(TestService.requestFiniteStream).toList(),
throwsA(isA<GrpcError>()));
await server.shutdown();
});
test('cancellation of streaming subscription propagates properly', () async {
final Server server = Server([TestService()]);
await server.serve(address: 'localhost', port: 0);
final channel = FixedConnectionClientChannel(Http2ClientConnection(
'localhost',
server.port,
ChannelOptions(credentials: ChannelCredentials.insecure()),
));
final testClient = TestClient(channel);
expect(await testClient.stream(TestService.requestInfiniteStream).first, 1);
await channel.shutdown();
await server.shutdown(); await server.shutdown();
}); });
} }