Added interceptors. (#86)

* Add draft implementation of interceptors.

* Fix review comment.

* Revert example files.

* Revert interop files.

* Revert interop files.

* Format.

* Fix typos.
This commit is contained in:
German Saprykin 2018-07-09 22:57:07 +08:00 committed by Michael Thomsen
parent 694332921a
commit 847a3625e1
9 changed files with 129 additions and 5 deletions

View File

@ -4,3 +4,4 @@
# Name/Organization <email address>
Google Inc.
German Saprykin <saprykin.h@gmail.com>

View File

@ -1,3 +1,7 @@
## 0.5.0 - 2018-06-29
* Implemented interceptors.
## 0.4.1 - 2018-04-04
* Fixes for supporting Dart 2.

View File

@ -25,6 +25,7 @@ export 'src/client/options.dart';
export 'src/server/call.dart';
export 'src/server/handler.dart';
export 'src/server/interceptor.dart';
export 'src/server/server.dart';
export 'src/server/service.dart';

View File

@ -22,12 +22,14 @@ import '../shared/streams.dart';
import '../shared/timeout.dart';
import 'call.dart';
import 'interceptor.dart';
import 'service.dart';
/// Handles an incoming gRPC call.
class ServerHandler extends ServiceCall {
final ServerTransportStream _stream;
final Service Function(String service) _serviceLookup;
final List<Interceptor> _interceptors;
StreamSubscription<GrpcMessage> _incomingSubscription;
@ -51,14 +53,19 @@ class ServerHandler extends ServiceCall {
bool _isTimedOut = false;
Timer _timeoutTimer;
ServerHandler(this._serviceLookup, this._stream);
ServerHandler(this._serviceLookup, this._stream,
[this._interceptors = const <Interceptor>[]]);
DateTime get deadline => _deadline;
bool get isCanceled => _isCanceled;
bool get isTimedOut => _isTimedOut;
Map<String, String> get clientMetadata => _clientMetadata;
Map<String, String> get headers => _customHeaders;
Map<String, String> get trailers => _customTrailers;
void handle() {
@ -103,6 +110,7 @@ class ServerHandler extends ServiceCall {
}
final serviceName = pathSegments[1];
final methodName = pathSegments[2];
_service = _serviceLookup(serviceName);
_descriptor = _service?.$lookupMethod(methodName);
if (_descriptor == null) {
@ -110,9 +118,32 @@ class ServerHandler extends ServiceCall {
_sinkIncoming();
return;
}
final error = _applyInterceptors();
if (error != null) {
_sendError(error);
_sinkIncoming();
return;
}
_startStreamingRequest();
}
GrpcError _applyInterceptors() {
try {
for (final interceptor in _interceptors) {
final error = interceptor(this, this._descriptor);
if (error != null) {
return error;
}
}
} catch (error) {
final grpcError = new GrpcError.internal(error.toString());
return grpcError;
}
return null;
}
void _startStreamingRequest() {
_incomingSubscription.pause();
_requests = _descriptor.createRequestStream(_incomingSubscription);

View File

@ -0,0 +1,13 @@
import '../shared/status.dart';
import 'call.dart';
import 'service.dart';
/// A gRPC Interceptor.
///
/// An interceptor is called before the corresponding [ServiceMethod] invocation.
/// If the interceptor returns a [GrpcError], the error will be returned as a response and [ServiceMethod] wouldn't be called.
/// If the interceptor throws [Exception], [GrpcError.internal] with exception.toString() will be returned.
/// If the interceptor returns null, the corresponding [ServiceMethod] of [Service] will be called.
typedef Interceptor = GrpcError Function(
ServiceCall call, ServiceMethod method);

View File

@ -21,6 +21,7 @@ import 'package:http2/transport.dart';
import '../shared/security.dart';
import 'handler.dart';
import 'interceptor.dart';
import 'service.dart';
class ServerTlsCredentials {
@ -57,13 +58,16 @@ class ServerTlsCredentials {
/// Listens for incoming RPCs, dispatching them to the right [Service] handler.
class Server {
final Map<String, Service> _services = {};
final List<Interceptor> _interceptors;
ServerSocket _insecureServer;
SecureServerSocket _secureServer;
final _connections = <ServerTransportConnection>[];
/// Create a server for the given [services].
Server(List<Service> services) {
Server(List<Service> services,
[List<Interceptor> interceptors = const <Interceptor>[]])
: _interceptors = interceptors {
for (final service in services) {
_services[service.$name] = service;
}
@ -110,7 +114,7 @@ class Server {
}
void serveStream(ServerTransportStream stream) {
new ServerHandler(lookupService, stream).handle();
new ServerHandler(lookupService, stream, _interceptors).handle();
}
Future<Null> shutdown() async {

View File

@ -1,6 +1,6 @@
name: grpc
description: Dart implementation of gRPC.
version: 0.4.1
version: 0.5.0
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/grpc-dart

View File

@ -277,4 +277,60 @@ void main() {
..toServer.close();
await harness.fromServer.done;
});
group('Server with interceptor', () {
test('processes calls if interceptor allows request', () async {
const expectedRequest = 5;
const expectedResponse = 7;
Future<int> methodHandler(ServiceCall call, Future<int> request) async {
expect(await request, expectedRequest);
return expectedResponse;
}
GrpcError interceptorHandler(ServiceCall call, ServiceMethod method) {
if (method.name == "Unary") {
return null;
}
return new GrpcError.unauthenticated('Request is unauthenticated');
}
harness
..interceptor.handler = interceptorHandler
..service.unaryHandler = methodHandler
..runTest('/Test/Unary', [expectedRequest], [expectedResponse]);
await harness.fromServer.done;
});
test('returns error if interceptor blocks request', () async {
GrpcError interceptorHandler(ServiceCall call, ServiceMethod method) {
if (method.name == "Unary") {
return new GrpcError.unauthenticated('Request is unauthenticated');
}
return null;
}
harness
..interceptor.handler = interceptorHandler
..expectErrorResponse(
StatusCode.unauthenticated, 'Request is unauthenticated')
..sendRequestHeader('/Test/Unary');
await harness.fromServer.done;
});
test('returns internal error if interceptor throws exception', () async {
GrpcError interceptorHandler(ServiceCall call, ServiceMethod method) {
throw new Exception('Reason is unknown');
}
harness
..interceptor.handler = interceptorHandler
..expectErrorResponse(
StatusCode.internal, 'Exception: Reason is unknown')
..sendRequestHeader('/Test/Unary');
await harness.fromServer.done;
});
});
}

View File

@ -78,6 +78,18 @@ class TestService extends Service {
}
}
class TestInterceptor {
Interceptor handler;
GrpcError call(ServiceCall call, ServiceMethod method) {
if (handler == null) {
return null;
}
return handler(call, method);
}
}
class TestServerStream extends ServerTransportStream {
final Stream<StreamMessage> incomingMessages;
final StreamSink<StreamMessage> outgoingMessages;
@ -107,10 +119,12 @@ class ServerHarness {
final toServer = new StreamController<StreamMessage>();
final fromServer = new StreamController<StreamMessage>();
final service = new TestService();
final interceptor = new TestInterceptor();
Server server;
ServerHarness() {
server = new Server([service]);
server = new Server(<Service>[service], <Interceptor>[interceptor]);
}
static ServiceMethod<int, int> createMethod(String name,