mirror of https://github.com/grpc/grpc-dart.git
Add server interceptor acting as a middleware
This commit is contained in:
parent
5ba3745779
commit
1bacbe6101
|
@ -42,7 +42,8 @@ export 'src/client/proxy.dart' show Proxy;
|
||||||
export 'src/client/transport/http2_credentials.dart'
|
export 'src/client/transport/http2_credentials.dart'
|
||||||
show BadCertificateHandler, allowBadCertificates, ChannelCredentials;
|
show BadCertificateHandler, allowBadCertificates, ChannelCredentials;
|
||||||
export 'src/server/call.dart' show ServiceCall;
|
export 'src/server/call.dart' show ServiceCall;
|
||||||
export 'src/server/interceptor.dart' show Interceptor;
|
export 'src/server/interceptor.dart'
|
||||||
|
show Interceptor, ServerInterceptor, ServerStreamingInvoker;
|
||||||
export 'src/server/server.dart'
|
export 'src/server/server.dart'
|
||||||
show
|
show
|
||||||
ServerCredentials,
|
ServerCredentials,
|
||||||
|
|
|
@ -37,6 +37,7 @@ class ServerHandler extends ServiceCall {
|
||||||
final ServerTransportStream _stream;
|
final ServerTransportStream _stream;
|
||||||
final ServiceLookup _serviceLookup;
|
final ServiceLookup _serviceLookup;
|
||||||
final List<Interceptor> _interceptors;
|
final List<Interceptor> _interceptors;
|
||||||
|
final List<ServerInterceptor> _serverInterceptors;
|
||||||
final CodecRegistry? _codecRegistry;
|
final CodecRegistry? _codecRegistry;
|
||||||
final GrpcErrorHandler? _errorHandler;
|
final GrpcErrorHandler? _errorHandler;
|
||||||
|
|
||||||
|
@ -83,6 +84,7 @@ class ServerHandler extends ServiceCall {
|
||||||
required ServerTransportStream stream,
|
required ServerTransportStream stream,
|
||||||
required ServiceLookup serviceLookup,
|
required ServiceLookup serviceLookup,
|
||||||
required List<Interceptor> interceptors,
|
required List<Interceptor> interceptors,
|
||||||
|
required List<ServerInterceptor> serverInterceptors,
|
||||||
required CodecRegistry? codecRegistry,
|
required CodecRegistry? codecRegistry,
|
||||||
X509Certificate? clientCertificate,
|
X509Certificate? clientCertificate,
|
||||||
InternetAddress? remoteAddress,
|
InternetAddress? remoteAddress,
|
||||||
|
@ -94,7 +96,8 @@ class ServerHandler extends ServiceCall {
|
||||||
_codecRegistry = codecRegistry,
|
_codecRegistry = codecRegistry,
|
||||||
_clientCertificate = clientCertificate,
|
_clientCertificate = clientCertificate,
|
||||||
_remoteAddress = remoteAddress,
|
_remoteAddress = remoteAddress,
|
||||||
_errorHandler = errorHandler;
|
_errorHandler = errorHandler,
|
||||||
|
_serverInterceptors = serverInterceptors;
|
||||||
|
|
||||||
@override
|
@override
|
||||||
DateTime? get deadline => _deadline;
|
DateTime? get deadline => _deadline;
|
||||||
|
@ -239,7 +242,7 @@ class ServerHandler extends ServiceCall {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
_responses = _descriptor.handle(this, requests.stream);
|
_responses = _descriptor.handle(this, requests.stream, _serverInterceptors);
|
||||||
|
|
||||||
_responseSubscription = _responses.listen(_onResponse,
|
_responseSubscription = _responses.listen(_onResponse,
|
||||||
onError: _onResponseError,
|
onError: _onResponseError,
|
||||||
|
|
|
@ -27,3 +27,18 @@ import 'service.dart';
|
||||||
/// If the interceptor returns null, the corresponding [ServiceMethod] of [Service] will be called.
|
/// If the interceptor returns null, the corresponding [ServiceMethod] of [Service] will be called.
|
||||||
typedef Interceptor = FutureOr<GrpcError?> Function(
|
typedef Interceptor = FutureOr<GrpcError?> Function(
|
||||||
ServiceCall call, ServiceMethod method);
|
ServiceCall call, ServiceMethod method);
|
||||||
|
|
||||||
|
typedef ServerStreamingInvoker<Q, R> = Stream<R> Function(
|
||||||
|
ServiceCall call, ServiceMethod<Q, R> method, Stream<Q> requests);
|
||||||
|
|
||||||
|
/// A gRPC Interceptor.
|
||||||
|
///
|
||||||
|
/// An interceptor is called around the corresponding [ServiceMethod] invocation.
|
||||||
|
/// If the interceptor throws [GrpcError], the error will be returned as a response. [ServiceMethod] wouldn't be called if the error is thrown before calling the invoker.
|
||||||
|
/// If the interceptor modifies the provided stream, the invocation will continue with the provided stream.
|
||||||
|
abstract class ServerInterceptor {
|
||||||
|
Stream<R> intercept<Q, R>(ServiceCall call, ServiceMethod<Q, R> method,
|
||||||
|
Stream<Q> requests, ServerStreamingInvoker<Q, R> invoker) {
|
||||||
|
return invoker(call, method, requests);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -87,6 +87,7 @@ class ServerTlsCredentials extends ServerCredentials {
|
||||||
class ConnectionServer {
|
class ConnectionServer {
|
||||||
final Map<String, Service> _services = {};
|
final Map<String, Service> _services = {};
|
||||||
final List<Interceptor> _interceptors;
|
final List<Interceptor> _interceptors;
|
||||||
|
final List<ServerInterceptor> _serverInterceptors;
|
||||||
final CodecRegistry? _codecRegistry;
|
final CodecRegistry? _codecRegistry;
|
||||||
final GrpcErrorHandler? _errorHandler;
|
final GrpcErrorHandler? _errorHandler;
|
||||||
final ServerKeepAliveOptions _keepAliveOptions;
|
final ServerKeepAliveOptions _keepAliveOptions;
|
||||||
|
@ -100,11 +101,13 @@ class ConnectionServer {
|
||||||
ConnectionServer(
|
ConnectionServer(
|
||||||
List<Service> services, [
|
List<Service> services, [
|
||||||
List<Interceptor> interceptors = const <Interceptor>[],
|
List<Interceptor> interceptors = const <Interceptor>[],
|
||||||
|
List<ServerInterceptor> serverInterceptors = const <ServerInterceptor>[],
|
||||||
CodecRegistry? codecRegistry,
|
CodecRegistry? codecRegistry,
|
||||||
GrpcErrorHandler? errorHandler,
|
GrpcErrorHandler? errorHandler,
|
||||||
this._keepAliveOptions = const ServerKeepAliveOptions(),
|
this._keepAliveOptions = const ServerKeepAliveOptions(),
|
||||||
]) : _codecRegistry = codecRegistry,
|
]) : _codecRegistry = codecRegistry,
|
||||||
_interceptors = interceptors,
|
_interceptors = interceptors,
|
||||||
|
_serverInterceptors = serverInterceptors,
|
||||||
_errorHandler = errorHandler {
|
_errorHandler = errorHandler {
|
||||||
for (final service in services) {
|
for (final service in services) {
|
||||||
_services[service.$name] = service;
|
_services[service.$name] = service;
|
||||||
|
@ -168,6 +171,7 @@ class ConnectionServer {
|
||||||
stream: stream,
|
stream: stream,
|
||||||
serviceLookup: lookupService,
|
serviceLookup: lookupService,
|
||||||
interceptors: _interceptors,
|
interceptors: _interceptors,
|
||||||
|
serverInterceptors: _serverInterceptors,
|
||||||
codecRegistry: _codecRegistry,
|
codecRegistry: _codecRegistry,
|
||||||
// ignore: unnecessary_cast
|
// ignore: unnecessary_cast
|
||||||
clientCertificate: clientCertificate as io_bits.X509Certificate?,
|
clientCertificate: clientCertificate as io_bits.X509Certificate?,
|
||||||
|
@ -201,11 +205,13 @@ class Server extends ConnectionServer {
|
||||||
required List<Service> services,
|
required List<Service> services,
|
||||||
ServerKeepAliveOptions keepAliveOptions = const ServerKeepAliveOptions(),
|
ServerKeepAliveOptions keepAliveOptions = const ServerKeepAliveOptions(),
|
||||||
List<Interceptor> interceptors = const <Interceptor>[],
|
List<Interceptor> interceptors = const <Interceptor>[],
|
||||||
|
List<ServerInterceptor> serverInterceptors = const <ServerInterceptor>[],
|
||||||
CodecRegistry? codecRegistry,
|
CodecRegistry? codecRegistry,
|
||||||
GrpcErrorHandler? errorHandler,
|
GrpcErrorHandler? errorHandler,
|
||||||
}) : super(
|
}) : super(
|
||||||
services,
|
services,
|
||||||
interceptors,
|
interceptors,
|
||||||
|
serverInterceptors,
|
||||||
codecRegistry,
|
codecRegistry,
|
||||||
errorHandler,
|
errorHandler,
|
||||||
keepAliveOptions,
|
keepAliveOptions,
|
||||||
|
@ -308,6 +314,7 @@ class Server extends ConnectionServer {
|
||||||
stream: stream,
|
stream: stream,
|
||||||
serviceLookup: lookupService,
|
serviceLookup: lookupService,
|
||||||
interceptors: _interceptors,
|
interceptors: _interceptors,
|
||||||
|
serverInterceptors: _serverInterceptors,
|
||||||
codecRegistry: _codecRegistry,
|
codecRegistry: _codecRegistry,
|
||||||
// ignore: unnecessary_cast
|
// ignore: unnecessary_cast
|
||||||
clientCertificate: clientCertificate as io_bits.X509Certificate?,
|
clientCertificate: clientCertificate as io_bits.X509Certificate?,
|
||||||
|
|
|
@ -17,6 +17,7 @@ import 'dart:async';
|
||||||
|
|
||||||
import '../shared/status.dart';
|
import '../shared/status.dart';
|
||||||
import 'call.dart';
|
import 'call.dart';
|
||||||
|
import 'interceptor.dart';
|
||||||
|
|
||||||
/// Definition of a gRPC service method.
|
/// Definition of a gRPC service method.
|
||||||
class ServiceMethod<Q, R> {
|
class ServiceMethod<Q, R> {
|
||||||
|
@ -48,19 +49,39 @@ class ServiceMethod<Q, R> {
|
||||||
|
|
||||||
List<int> serialize(dynamic response) => responseSerializer(response as R);
|
List<int> serialize(dynamic response) => responseSerializer(response as R);
|
||||||
|
|
||||||
Stream<R> handle(ServiceCall call, Stream<Q> requests) {
|
ServerStreamingInvoker<Q, R> _createCall() => ((
|
||||||
if (streamingResponse) {
|
ServiceCall call,
|
||||||
if (streamingRequest) {
|
ServiceMethod<Q, R> method,
|
||||||
return handler(call, requests);
|
Stream<Q> requests,
|
||||||
} else {
|
) {
|
||||||
return handler(call, _toSingleFuture(requests));
|
if (streamingResponse) {
|
||||||
}
|
if (streamingRequest) {
|
||||||
} else {
|
return handler(call, requests);
|
||||||
final response = streamingRequest
|
} else {
|
||||||
? handler(call, requests)
|
return handler(call, _toSingleFuture(requests));
|
||||||
: handler(call, _toSingleFuture(requests));
|
}
|
||||||
return response.asStream();
|
} else {
|
||||||
|
final response = streamingRequest
|
||||||
|
? handler(call, requests)
|
||||||
|
: handler(call, _toSingleFuture(requests));
|
||||||
|
return response.asStream();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Stream<R> handle(
|
||||||
|
ServiceCall call,
|
||||||
|
Stream<Q> requests,
|
||||||
|
List<ServerInterceptor> interceptors,
|
||||||
|
) {
|
||||||
|
var invoker = _createCall();
|
||||||
|
|
||||||
|
for (final interceptor in interceptors.reversed) {
|
||||||
|
final delegate = invoker;
|
||||||
|
invoker = (call, method, requests) =>
|
||||||
|
interceptor.intercept<Q, R>(call, method, requests, delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return invoker(call, this, requests);
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<Q> _toSingleFuture(Stream<Q> stream) {
|
Future<Q> _toSingleFuture(Stream<Q> stream) {
|
||||||
|
|
|
@ -384,4 +384,221 @@ void main() {
|
||||||
await harness.fromServer.done;
|
await harness.fromServer.done;
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
group('Server with server interceptor', () {
|
||||||
|
group('processes calls if interceptor allows request', () {
|
||||||
|
const expectedRequest = 5;
|
||||||
|
const expectedResponse = 7;
|
||||||
|
Future<int> methodHandler(ServiceCall call, Future<int> request) async {
|
||||||
|
expect(await request, expectedRequest);
|
||||||
|
return expectedResponse;
|
||||||
|
}
|
||||||
|
|
||||||
|
Null interceptor(call, method, requests) {
|
||||||
|
if (method.name == 'Unary') {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
throw GrpcError.unauthenticated('Request is unauthenticated');
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> doTest(TestServerInterceptorOnStart? handler) async {
|
||||||
|
harness
|
||||||
|
..serverInterceptor.onStart = handler
|
||||||
|
..service.unaryHandler = methodHandler
|
||||||
|
..runTest('/Test/Unary', [expectedRequest], [expectedResponse]);
|
||||||
|
|
||||||
|
await harness.fromServer.done;
|
||||||
|
}
|
||||||
|
|
||||||
|
test('with sync interceptor', () => doTest(interceptor));
|
||||||
|
test(
|
||||||
|
'with async interceptor',
|
||||||
|
() => doTest((call, method, requests) async =>
|
||||||
|
interceptor(call, method, requests)));
|
||||||
|
});
|
||||||
|
|
||||||
|
group('returns error if interceptor blocks request', () {
|
||||||
|
Null interceptor(call, method, requests) {
|
||||||
|
if (method.name == 'Unary') {
|
||||||
|
throw GrpcError.unauthenticated('Request is unauthenticated');
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> doTest(TestServerInterceptorOnStart handler) async {
|
||||||
|
harness
|
||||||
|
..serverInterceptor.onStart = handler
|
||||||
|
..expectErrorResponse(
|
||||||
|
StatusCode.unauthenticated, 'Request is unauthenticated')
|
||||||
|
..sendRequestHeader('/Test/Unary');
|
||||||
|
|
||||||
|
await harness.fromServer.done;
|
||||||
|
}
|
||||||
|
|
||||||
|
test('with sync interceptor', () => doTest(interceptor));
|
||||||
|
test(
|
||||||
|
'with async interceptor',
|
||||||
|
() => doTest((call, method, request) async =>
|
||||||
|
interceptor(call, method, request)));
|
||||||
|
});
|
||||||
|
|
||||||
|
test("don't fail if interceptor await 2 times", () async {
|
||||||
|
Future<Null> interceptor(call, method, requests) async {
|
||||||
|
await Future.value();
|
||||||
|
await Future.value();
|
||||||
|
throw GrpcError.internal('Reason is unknown');
|
||||||
|
}
|
||||||
|
|
||||||
|
harness
|
||||||
|
..serverInterceptor.onStart = interceptor
|
||||||
|
..expectErrorResponse(StatusCode.internal, 'Reason is unknown')
|
||||||
|
..sendRequestHeader('/Test/Unary')
|
||||||
|
..sendData(1);
|
||||||
|
|
||||||
|
await harness.fromServer.done;
|
||||||
|
});
|
||||||
|
|
||||||
|
group('serviceInterceptors are invoked', () {
|
||||||
|
const expectedRequest = 5;
|
||||||
|
const expectedResponse = 7;
|
||||||
|
Future<int> methodHandler(ServiceCall call, Future<int> request) async {
|
||||||
|
expect(await request, expectedRequest);
|
||||||
|
return expectedResponse;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> doTest(List<TestServerInterceptor> interceptors) async {
|
||||||
|
harness
|
||||||
|
// ↓ mutation: Server is already built
|
||||||
|
..serverInterceptors.addAll(interceptors)
|
||||||
|
..service.unaryHandler = methodHandler
|
||||||
|
..runTest('/Test/Unary', [expectedRequest], [expectedResponse]);
|
||||||
|
|
||||||
|
await harness.fromServer.done;
|
||||||
|
}
|
||||||
|
|
||||||
|
test('single serviceInterceptor is invoked', () async {
|
||||||
|
final invocationsOrderRecords = [];
|
||||||
|
|
||||||
|
await doTest([
|
||||||
|
TestServerInterceptor(
|
||||||
|
onStart: (call, method, requests) {
|
||||||
|
invocationsOrderRecords.add('Start');
|
||||||
|
},
|
||||||
|
onData: (call, method, requests, data) {
|
||||||
|
invocationsOrderRecords.add('Data [$data]');
|
||||||
|
},
|
||||||
|
onFinish: (call, method, requests) {
|
||||||
|
invocationsOrderRecords.add('Done');
|
||||||
|
},
|
||||||
|
)
|
||||||
|
]);
|
||||||
|
|
||||||
|
expect(invocationsOrderRecords, equals(['Start', 'Data [7]', 'Done']));
|
||||||
|
});
|
||||||
|
|
||||||
|
test('multiple serviceInterceptors are invoked', () async {
|
||||||
|
final invocationsOrderRecords = [];
|
||||||
|
|
||||||
|
await doTest([
|
||||||
|
TestServerInterceptor(
|
||||||
|
onStart: (call, method, requests) {
|
||||||
|
invocationsOrderRecords.add('Start 1');
|
||||||
|
},
|
||||||
|
onData: (call, method, requests, data) {
|
||||||
|
invocationsOrderRecords.add('Data 1 [$data]');
|
||||||
|
},
|
||||||
|
onFinish: (call, method, requests) {
|
||||||
|
invocationsOrderRecords.add('Done 1');
|
||||||
|
},
|
||||||
|
),
|
||||||
|
TestServerInterceptor(
|
||||||
|
onStart: (call, method, requests) {
|
||||||
|
invocationsOrderRecords.add('Start 2');
|
||||||
|
},
|
||||||
|
onData: (call, method, requests, data) {
|
||||||
|
invocationsOrderRecords.add('Data 2 [$data]');
|
||||||
|
},
|
||||||
|
onFinish: (call, method, requests) {
|
||||||
|
invocationsOrderRecords.add('Done 2');
|
||||||
|
},
|
||||||
|
)
|
||||||
|
]);
|
||||||
|
|
||||||
|
expect(
|
||||||
|
invocationsOrderRecords,
|
||||||
|
equals([
|
||||||
|
'Start 1',
|
||||||
|
'Start 2',
|
||||||
|
'Data 2 [7]',
|
||||||
|
'Data 1 [7]',
|
||||||
|
'Done 2',
|
||||||
|
'Done 1',
|
||||||
|
]));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
test('can modify response', () async {
|
||||||
|
const expectedRequest = 5;
|
||||||
|
const baseResponse = 7;
|
||||||
|
const expectedResponse = 14;
|
||||||
|
|
||||||
|
final invocationsOrderRecords = [];
|
||||||
|
|
||||||
|
final interceptors = [
|
||||||
|
TestServerInterceptor(
|
||||||
|
onStart: (call, method, requests) {
|
||||||
|
invocationsOrderRecords.add('Start 1');
|
||||||
|
},
|
||||||
|
onData: (call, method, requests, data) {
|
||||||
|
invocationsOrderRecords.add('Data 1 [$data]');
|
||||||
|
},
|
||||||
|
onFinish: (call, method, requests) {
|
||||||
|
invocationsOrderRecords.add('Done 1');
|
||||||
|
},
|
||||||
|
),
|
||||||
|
TestServerInterruptingInterceptor(transform: <R>(value) {
|
||||||
|
if (value is int) {
|
||||||
|
return value * 2 as R;
|
||||||
|
}
|
||||||
|
|
||||||
|
return value;
|
||||||
|
}),
|
||||||
|
TestServerInterceptor(
|
||||||
|
onStart: (call, method, requests) {
|
||||||
|
invocationsOrderRecords.add('Start 2');
|
||||||
|
},
|
||||||
|
onData: (call, method, requests, data) {
|
||||||
|
invocationsOrderRecords.add('Data 2 [$data]');
|
||||||
|
},
|
||||||
|
onFinish: (call, method, requests) {
|
||||||
|
invocationsOrderRecords.add('Done 2');
|
||||||
|
},
|
||||||
|
)
|
||||||
|
];
|
||||||
|
|
||||||
|
Future<int> methodHandler(ServiceCall call, Future<int> request) async {
|
||||||
|
expect(await request, expectedRequest);
|
||||||
|
return baseResponse;
|
||||||
|
}
|
||||||
|
|
||||||
|
harness
|
||||||
|
// ↓ mutation: Server is already built
|
||||||
|
..serverInterceptors.addAll(interceptors)
|
||||||
|
..service.unaryHandler = methodHandler
|
||||||
|
..runTest('/Test/Unary', [expectedRequest], [expectedResponse]);
|
||||||
|
|
||||||
|
await harness.fromServer.done;
|
||||||
|
|
||||||
|
expect(
|
||||||
|
invocationsOrderRecords,
|
||||||
|
equals([
|
||||||
|
'Start 1',
|
||||||
|
'Start 2',
|
||||||
|
'Data 2 [7]',
|
||||||
|
'Data 1 [14]',
|
||||||
|
'Done 2',
|
||||||
|
'Done 1',
|
||||||
|
]));
|
||||||
|
});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ import 'dart:async';
|
||||||
|
|
||||||
import 'package:grpc/grpc.dart';
|
import 'package:grpc/grpc.dart';
|
||||||
import 'package:grpc/src/client/http2_connection.dart';
|
import 'package:grpc/src/client/http2_connection.dart';
|
||||||
|
import 'package:grpc/src/server/interceptor.dart';
|
||||||
import 'package:grpc/src/shared/message.dart';
|
import 'package:grpc/src/shared/message.dart';
|
||||||
import 'package:http2/transport.dart';
|
import 'package:http2/transport.dart';
|
||||||
import 'package:test/test.dart';
|
import 'package:test/test.dart';
|
||||||
|
@ -90,6 +91,47 @@ class TestInterceptor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef TestServerInterceptorOnStart = Function(
|
||||||
|
ServiceCall call, ServiceMethod method, Stream requests);
|
||||||
|
typedef TestServerInterceptorOnData = Function(
|
||||||
|
ServiceCall call, ServiceMethod method, Stream requests, dynamic data);
|
||||||
|
typedef TestServerInterceptorOnFinish = Function(
|
||||||
|
ServiceCall call, ServiceMethod method, Stream requests);
|
||||||
|
|
||||||
|
class TestServerInterceptor extends ServerInterceptor {
|
||||||
|
TestServerInterceptorOnStart? onStart;
|
||||||
|
TestServerInterceptorOnData? onData;
|
||||||
|
TestServerInterceptorOnFinish? onFinish;
|
||||||
|
|
||||||
|
TestServerInterceptor({this.onStart, this.onData, this.onFinish});
|
||||||
|
|
||||||
|
@override
|
||||||
|
Stream<R> intercept<Q, R>(ServiceCall call, ServiceMethod<Q, R> method,
|
||||||
|
Stream<Q> requests, ServerStreamingInvoker<Q, R> invoker) async* {
|
||||||
|
await onStart?.call(call, method, requests);
|
||||||
|
|
||||||
|
await for (final chunk
|
||||||
|
in super.intercept(call, method, requests, invoker)) {
|
||||||
|
await onData?.call(call, method, requests, chunk);
|
||||||
|
yield chunk;
|
||||||
|
}
|
||||||
|
|
||||||
|
await onFinish?.call(call, method, requests);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class TestServerInterruptingInterceptor extends ServerInterceptor {
|
||||||
|
final R Function<R>(R) transform;
|
||||||
|
|
||||||
|
TestServerInterruptingInterceptor({required this.transform});
|
||||||
|
|
||||||
|
@override
|
||||||
|
Stream<R> intercept<Q, R>(ServiceCall call, ServiceMethod<Q, R> method,
|
||||||
|
Stream<Q> requests, ServerStreamingInvoker<Q, R> invoker) async* {
|
||||||
|
yield* super.intercept(call, method, requests, invoker).map(transform);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
class TestServerStream extends ServerTransportStream {
|
class TestServerStream extends ServerTransportStream {
|
||||||
@override
|
@override
|
||||||
final Stream<StreamMessage> incomingMessages;
|
final Stream<StreamMessage> incomingMessages;
|
||||||
|
@ -123,6 +165,7 @@ class ServerHarness extends _Harness {
|
||||||
ConnectionServer createServer() => Server.create(
|
ConnectionServer createServer() => Server.create(
|
||||||
services: <Service>[service],
|
services: <Service>[service],
|
||||||
interceptors: <Interceptor>[interceptor.call],
|
interceptors: <Interceptor>[interceptor.call],
|
||||||
|
serverInterceptors: serverInterceptors..insert(0, serverInterceptor),
|
||||||
);
|
);
|
||||||
|
|
||||||
static ServiceMethod<int, int> createMethod(String name,
|
static ServiceMethod<int, int> createMethod(String name,
|
||||||
|
@ -161,6 +204,10 @@ abstract class _Harness {
|
||||||
final fromServer = StreamController<StreamMessage>();
|
final fromServer = StreamController<StreamMessage>();
|
||||||
final service = TestService();
|
final service = TestService();
|
||||||
final interceptor = TestInterceptor();
|
final interceptor = TestInterceptor();
|
||||||
|
final serverInterceptor = TestServerInterceptor();
|
||||||
|
|
||||||
|
final serverInterceptors = <ServerInterceptor>[];
|
||||||
|
|
||||||
ConnectionServer? _server;
|
ConnectionServer? _server;
|
||||||
|
|
||||||
ConnectionServer createServer();
|
ConnectionServer createServer();
|
||||||
|
|
Loading…
Reference in New Issue