allows async interceptors (#120)

* allows async interceptors

* prevent chunks to come before starting streaming request
This commit is contained in:
Alexandre Ardhuin 2018-09-18 08:17:39 +02:00 committed by Sigurd Meldgaard
parent 97e47bd3e7
commit 3e3ba6682f
7 changed files with 74 additions and 32 deletions

View File

@ -5,3 +5,4 @@
Google Inc.
German Saprykin <saprykin.h@gmail.com>
Alexandre Ardhuin <alexandre.ardhuin@gmail.com>

View File

@ -1,3 +1,7 @@
## 0.6.5
* Interceptors are now async.
## 0.6.4
* Update dependencies to be compatible with Dart 2.

View File

@ -93,12 +93,14 @@ class ServerHandler extends ServiceCall {
// -- Idle state, incoming data --
void _onDataIdle(GrpcMessage message) {
void _onDataIdle(GrpcMessage message) async {
if (message is! GrpcMetadata) {
_sendError(new GrpcError.unimplemented('Expected header frame'));
_sinkIncoming();
return;
}
_incomingSubscription.pause();
final headerMessage = message
as GrpcMetadata; // TODO(jakobr): Cast should not be necessary here.
_clientMetadata = headerMessage.metadata;
@ -120,7 +122,7 @@ class ServerHandler extends ServiceCall {
return;
}
final error = _applyInterceptors();
final error = await _applyInterceptors();
if (error != null) {
_sendError(error);
_sinkIncoming();
@ -130,10 +132,10 @@ class ServerHandler extends ServiceCall {
_startStreamingRequest();
}
GrpcError _applyInterceptors() {
Future<GrpcError> _applyInterceptors() async {
try {
for (final interceptor in _interceptors) {
final error = interceptor(this, this._descriptor);
final error = await interceptor(this, this._descriptor);
if (error != null) {
return error;
}
@ -146,7 +148,6 @@ class ServerHandler extends ServiceCall {
}
void _startStreamingRequest() {
_incomingSubscription.pause();
_requests = _descriptor.createRequestStream(_incomingSubscription);
_incomingSubscription.onData(_onDataActive);
@ -297,8 +298,8 @@ class ServerHandler extends ServiceCall {
}
final outgoingTrailers = <Header>[];
outgoingTrailersMap.forEach((key, value) =>
outgoingTrailers.add(new Header(ascii.encode(key), utf8.encode(value))));
outgoingTrailersMap.forEach((key, value) => outgoingTrailers
.add(new Header(ascii.encode(key), utf8.encode(value))));
_stream.sendHeaders(outgoingTrailers, endStream: true);
// We're done!
_cancelResponseSubscription();

View File

@ -1,5 +1,6 @@
import '../shared/status.dart';
import 'dart:async';
import '../shared/status.dart';
import 'call.dart';
import 'service.dart';
@ -9,5 +10,5 @@ import 'service.dart';
/// If the interceptor returns a [GrpcError], the error will be returned as a response and [ServiceMethod] wouldn't be called.
/// If the interceptor throws [Exception], [GrpcError.internal] with exception.toString() will be returned.
/// If the interceptor returns null, the corresponding [ServiceMethod] of [Service] will be called.
typedef Interceptor = GrpcError Function(
typedef Interceptor = FutureOr<GrpcError> Function(
ServiceCall call, ServiceMethod method);

View File

@ -1,6 +1,6 @@
name: grpc
description: Dart implementation of gRPC.
version: 0.6.4
version: 0.6.5
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/grpc-dart

View File

@ -279,7 +279,7 @@ void main() {
});
group('Server with interceptor', () {
test('processes calls if interceptor allows request', () async {
group('processes calls if interceptor allows request', () {
const expectedRequest = 5;
const expectedResponse = 7;
Future<int> methodHandler(ServiceCall call, Future<int> request) async {
@ -287,48 +287,83 @@ void main() {
return expectedResponse;
}
GrpcError interceptorHandler(ServiceCall call, ServiceMethod method) {
final Interceptor interceptor = (call, method) {
if (method.name == "Unary") {
return null;
}
return new GrpcError.unauthenticated('Request is unauthenticated');
};
Future<void> doTest(Interceptor handler) async {
harness
..interceptor.handler = handler
..service.unaryHandler = methodHandler
..runTest('/Test/Unary', [expectedRequest], [expectedResponse]);
await harness.fromServer.done;
}
harness
..interceptor.handler = interceptorHandler
..service.unaryHandler = methodHandler
..runTest('/Test/Unary', [expectedRequest], [expectedResponse]);
await harness.fromServer.done;
test('with sync interceptor', () => doTest(interceptor));
test('with async interceptor',
() => doTest((call, method) async => interceptor(call, method)));
});
test('returns error if interceptor blocks request', () async {
GrpcError interceptorHandler(ServiceCall call, ServiceMethod method) {
group('returns error if interceptor blocks request', () {
final Interceptor interceptor = (call, method) {
if (method.name == "Unary") {
return new GrpcError.unauthenticated('Request is unauthenticated');
}
return null;
};
Future<void> doTest(Interceptor handler) async {
harness
..interceptor.handler = handler
..expectErrorResponse(
StatusCode.unauthenticated, 'Request is unauthenticated')
..sendRequestHeader('/Test/Unary');
await harness.fromServer.done;
}
harness
..interceptor.handler = interceptorHandler
..expectErrorResponse(
StatusCode.unauthenticated, 'Request is unauthenticated')
..sendRequestHeader('/Test/Unary');
await harness.fromServer.done;
test('with sync interceptor', () => doTest(interceptor));
test('with async interceptor',
() => doTest((call, method) async => interceptor(call, method)));
});
test('returns internal error if interceptor throws exception', () async {
GrpcError interceptorHandler(ServiceCall call, ServiceMethod method) {
group('returns internal error if interceptor throws exception', () {
final Interceptor interceptor = (call, method) {
throw new Exception('Reason is unknown');
};
Future<void> doTest(Interceptor handler) async {
harness
..interceptor.handler = handler
..expectErrorResponse(
StatusCode.internal, 'Exception: Reason is unknown')
..sendRequestHeader('/Test/Unary');
await harness.fromServer.done;
}
test('with sync interceptor', () => doTest(interceptor));
test('with async interceptor',
() => doTest((call, method) async => interceptor(call, method)));
});
test("don't fail if interceptor await 2 times", () async {
final Interceptor interceptor = (call, method) async {
await Future.value();
await Future.value();
throw new Exception('Reason is unknown');
};
harness
..interceptor.handler = interceptorHandler
..interceptor.handler = interceptor
..expectErrorResponse(
StatusCode.internal, 'Exception: Reason is unknown')
..sendRequestHeader('/Test/Unary');
..sendRequestHeader('/Test/Unary')
..sendData(1);
await harness.fromServer.done;
});

View File

@ -81,7 +81,7 @@ class TestService extends Service {
class TestInterceptor {
Interceptor handler;
GrpcError call(ServiceCall call, ServiceMethod method) {
FutureOr<GrpcError> call(ServiceCall call, ServiceMethod method) {
if (handler == null) {
return null;
}