Fix concurrent modification error in GrpcWebClientChannel.terminate

Fixes #331

Co-authored-by: Vyacheslav Egorov <vegorov@google.com>
This commit is contained in:
Isaac Saldana 2020-11-12 03:52:04 -08:00 committed by GitHub
parent f1c475603f
commit 275cc544c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 80 additions and 31 deletions

View File

@ -7,10 +7,12 @@
newer of protobuf compiler plugin. newer of protobuf compiler plugin.
* `Client.$createCall` is deprecated because it does not invoke client * `Client.$createCall` is deprecated because it does not invoke client
interceptors. interceptors.
* Fix an issue [#380](https://github.com/grpc/grpc-dart/issues/380) causing * Fix issue [#380](https://github.com/grpc/grpc-dart/issues/380) causing
incorrect duplicated headers in gRPC-Web requests. incorrect duplicated headers in gRPC-Web requests.
* Change minimum required Dart SDK to 2.8 to enable access to Unix domain sockets. * Change minimum required Dart SDK to 2.8 to enable access to Unix domain sockets.
* Add support for Unix domain sockets in `Socket.serve` and `ClientChannel`. * Add support for Unix domain sockets in `Socket.serve` and `ClientChannel`.
* Fix issue [#331](https://github.com/grpc/grpc-dart/issues/331) causing
an exception in `GrpcWebClientChannel.terminate()`.
## 2.7.0 ## 2.7.0

View File

@ -214,7 +214,7 @@ class XhrClientConnection extends ClientConnection {
@override @override
Future<void> terminate() async { Future<void> terminate() async {
for (XhrTransportStream request in _requests) { for (var request in List.of(_requests)) {
request.terminate(); request.terminate();
} }
} }

View File

@ -11,42 +11,88 @@ import 'package:grpc/grpc_web.dart';
import 'src/generated/echo.pbgrpc.dart'; import 'src/generated/echo.pbgrpc.dart';
void main() { void main() {
GrpcWebServer server;
setUpAll(() async {
server = await GrpcWebServer.start();
});
tearDownAll(() async {
await server.shutdown();
});
// Test verifies that gRPC-web echo example works by talking to a gRPC // Test verifies that gRPC-web echo example works by talking to a gRPC
// server (written in Dart) via gRPC-web protocol through a third party // server (written in Dart) via gRPC-web protocol through a third party
// gRPC-web proxy. // gRPC-web proxy.
test('gRPC-web echo test', () async { test('gRPC-web echo test', () async {
final server = await GrpcWebServer.start(); final channel = GrpcWebClientChannel.xhr(server.uri);
try { final service = EchoServiceClient(channel);
final channel = GrpcWebClientChannel.xhr(server.uri);
final service = EchoServiceClient(channel);
const testMessage = 'hello from gRPC-web'; const testMessage = 'hello from gRPC-web';
// First test a simple echo request. // First test a simple echo request.
final response = await service.echo(EchoRequest()..message = testMessage); final response = await service.echo(EchoRequest()..message = testMessage);
expect(response.message, equals(testMessage));
// Now test that streaming requests also works by asking echo server
// to send us a number of messages every 100 ms. Check that we receive
// them fast enough (if streaming is broken we will receive all of them
// in one go).
final sw = Stopwatch()..start();
final timings = await service
.serverStreamingEcho(ServerStreamingEchoRequest()
..message = testMessage
..messageCount = 20
..messageInterval = 100)
.map((response) {
expect(response.message, equals(testMessage)); expect(response.message, equals(testMessage));
final timing = sw.elapsedMilliseconds;
sw.reset();
return timing;
}).toList();
final maxDelay = timings.reduce(math.max);
expect(maxDelay, lessThan(500));
});
// Now test that streaming requests also works by asking echo server // Verify that terminate does not cause an exception when terminating
// to send us a number of messages every 100 ms. Check that we receive // channel with multiple active requests.
// them fast enough (if streaming is broken we will receive all of them test("terminate works", () async {
// in one go). final channel = GrpcWebClientChannel.xhr(server.uri);
final sw = Stopwatch()..start(); final service = EchoServiceClient(channel);
final timings = await service
.serverStreamingEcho(ServerStreamingEchoRequest() const testMessage = 'hello from gRPC-web';
..message = testMessage
..messageCount = 20 // First test a simple echo request.
..messageInterval = 100) final response = await service.echo(EchoRequest()..message = testMessage);
.map((response) { expect(response.message, equals(testMessage));
expect(response.message, equals(testMessage));
final timing = sw.elapsedMilliseconds; var terminated = false;
sw.reset();
return timing; service
}).toList(); .serverStreamingEcho(ServerStreamingEchoRequest()
final maxDelay = timings.reduce(math.max); ..message = testMessage
expect(maxDelay, lessThan(500)); ..messageCount = 20
} finally { ..messageInterval = 100)
await server.shutdown(); .listen((response) {
} expect(response.message, equals(testMessage));
}, onError: (e) {
expect(terminated, isTrue);
});
service
.serverStreamingEcho(ServerStreamingEchoRequest()
..message = testMessage
..messageCount = 20
..messageInterval = 100)
.listen((response) {
expect(response.message, equals(testMessage));
}, onError: (e) {
expect(terminated, isTrue);
});
await Future.delayed(Duration(milliseconds: 500));
terminated = true;
await channel.terminate();
}); });
} }
@ -72,7 +118,8 @@ class GrpcWebServer {
static Future<GrpcWebServer> start() async { static Future<GrpcWebServer> start() async {
// Spawn the server code on the server side, it will send us back port // Spawn the server code on the server side, it will send us back port
// number we should be talking to. // number we should be talking to.
final serverChannel = spawnHybridUri('grpc_web_server.dart'); final serverChannel =
spawnHybridUri('grpc_web_server.dart', stayAlive: true);
final portCompleter = Completer<int>(); final portCompleter = Completer<int>();
final exitCompleter = Completer<void>(); final exitCompleter = Completer<void>();
serverChannel.stream.listen((event) { serverChannel.stream.listen((event) {