feat: support client interceptors (#338)

* Added support for client interceptors, which can be configured through
  Client constructor's interceptors parameter. Interceptors will be
  executed by Client.$createStreamingCall and Client.$createUnaryCall.
  Using interceptors requires regenerating client stubs using version 19.2.0 or
  newer of protobuf compiler plugin.
* Client.$createCall is deprecated because it does not invoke client
  interceptors.
This commit is contained in:
Zbigniew Mandziejewicz 2020-11-02 14:13:56 +04:00 committed by GitHub
parent 0589503800
commit 9f83e124e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 304 additions and 22 deletions

View File

@ -31,6 +31,10 @@ export 'src/client/common.dart' show Response, ResponseStream, ResponseFuture;
export 'src/client/connection.dart' show ConnectionState; export 'src/client/connection.dart' show ConnectionState;
export 'src/client/http2_channel.dart' export 'src/client/http2_channel.dart'
show ClientChannel, ClientTransportConnectorChannel; show ClientChannel, ClientTransportConnectorChannel;
export 'src/client/interceptor.dart'
show ClientInterceptor, ClientUnaryInvoker, ClientStreamingInvoker;
export 'src/client/method.dart' show ClientMethod; export 'src/client/method.dart' show ClientMethod;
export 'src/client/options.dart' export 'src/client/options.dart'
show show

View File

@ -22,6 +22,8 @@ export 'src/client/call.dart' show CallOptions, MetadataProvider;
export 'src/client/channel.dart' show ClientChannel; export 'src/client/channel.dart' show ClientChannel;
export 'src/client/client.dart' show Client; export 'src/client/client.dart' show Client;
export 'src/client/common.dart' show ResponseFuture, ResponseStream; export 'src/client/common.dart' show ResponseFuture, ResponseStream;
export 'src/client/interceptor.dart'
show ClientInterceptor, ClientUnaryInvoker, ClientStreamingInvoker;
export 'src/client/method.dart' show ClientMethod; export 'src/client/method.dart' show ClientMethod;
export 'src/server/call.dart' show ServiceCall; export 'src/server/call.dart' show ServiceCall;
export 'src/server/service.dart' show Service, ServiceMethod; export 'src/server/service.dart' show Service, ServiceMethod;

View File

@ -15,19 +15,56 @@
import 'call.dart'; import 'call.dart';
import 'channel.dart'; import 'channel.dart';
import 'common.dart';
import 'interceptor.dart';
import 'method.dart'; import 'method.dart';
/// Base class for client stubs. /// Base class for client stubs.
class Client { class Client {
final ClientChannel _channel; final ClientChannel _channel;
final CallOptions _options; final CallOptions _options;
final List<ClientInterceptor> _interceptors;
Client(this._channel, {CallOptions options}) /// Interceptors will be applied in direct order before making a request.
: _options = options ?? CallOptions(); Client(this._channel,
{CallOptions options, Iterable<ClientInterceptor> interceptors})
: _options = options ?? CallOptions(),
_interceptors = List.unmodifiable(interceptors ?? Iterable.empty());
@Deprecated('createCall will be removed as it does not support interceptors')
ClientCall<Q, R> $createCall<Q, R>( ClientCall<Q, R> $createCall<Q, R>(
ClientMethod<Q, R> method, Stream<Q> requests, ClientMethod<Q, R> method, Stream<Q> requests,
{CallOptions options}) { {CallOptions options}) {
return _channel.createCall(method, requests, _options.mergedWith(options)); return _channel.createCall(method, requests, _options.mergedWith(options));
} }
ResponseFuture<R> $createUnaryCall<Q, R>(ClientMethod<Q, R> method, Q request,
{CallOptions options}) {
ClientUnaryInvoker<Q, R> invoker = (method, request, options) =>
ResponseFuture<R>(
_channel.createCall<Q, R>(method, Stream.value(request), options));
for (final interceptor in _interceptors.reversed) {
final delegate = invoker;
invoker = (method, request, options) =>
interceptor.interceptUnary<Q, R>(method, request, options, delegate);
}
return invoker(method, request, _options.mergedWith(options));
}
ResponseStream<R> $createStreamingCall<Q, R>(
ClientMethod<Q, R> method, Stream<Q> requests,
{CallOptions options}) {
ClientStreamingInvoker<Q, R> invoker = (method, request, options) =>
ResponseStream<R>(_channel.createCall<Q, R>(method, requests, options));
for (final interceptor in _interceptors.reversed) {
final delegate = invoker;
invoker = (method, requests, options) => interceptor
.interceptStreaming<Q, R>(method, requests, options, delegate);
}
return invoker(method, requests, _options.mergedWith(options));
}
} }

View File

@ -70,6 +70,8 @@ class ResponseStream<R> extends DelegatingStream<R>
final ClientCall<dynamic, R> _call; final ClientCall<dynamic, R> _call;
ResponseStream(this._call) : super(_call.response); ResponseStream(this._call) : super(_call.response);
ResponseFuture<R> get single => ResponseFuture(this._call);
} }
abstract class _ResponseMixin<Q, R> implements Response { abstract class _ResponseMixin<Q, R> implements Response {

View File

@ -0,0 +1,32 @@
import 'call.dart';
import 'common.dart';
import 'method.dart';
typedef ClientUnaryInvoker<Q, R> = ResponseFuture<R> Function(
ClientMethod method, Q request, CallOptions options);
typedef ClientStreamingInvoker<Q, R> = ResponseStream<R> Function(
ClientMethod method, Stream<Q> requests, CallOptions options);
/// ClientInterceptors intercepts client calls before they are executed.
///
/// Invoker either calls next interceptor in the chain or performs the call if it is last in chain.
/// To modify [CallOptions] make a clone using [CallOptions.mergedWith].
abstract class ClientInterceptor {
// Intercept unary call.
// This method is called when client sends single request and receives single response.
ResponseFuture<R> interceptUnary<Q, R>(ClientMethod<Q, R> method, Q request,
CallOptions options, ClientUnaryInvoker<Q, R> invoker) {
return invoker(method, request, options);
}
// Intercept streaming call.
// This method is called when client sends either request or response stream.
ResponseStream<R> interceptStreaming<Q, R>(
ClientMethod<Q, R> method,
Stream<Q> requests,
CallOptions options,
ClientStreamingInvoker<Q, R> invoker) {
return invoker(method, requests, options);
}
}

View File

@ -16,9 +16,8 @@ class TestClient extends grpc.Client {
TestClient(ClientChannel channel) : super(channel); TestClient(ClientChannel channel) : super(channel);
grpc.ResponseStream<int> stream(int request, {grpc.CallOptions options}) { grpc.ResponseStream<int> stream(int request, {grpc.CallOptions options}) {
final call = return $createStreamingCall(_$stream, Stream.value(request),
$createCall(_$stream, Stream.fromIterable([request]), options: options); options: options);
return grpc.ResponseStream(call);
} }
} }

View File

@ -0,0 +1,208 @@
import 'package:grpc/grpc.dart';
import 'package:grpc/src/client/interceptor.dart';
import 'package:test/test.dart';
import 'package:http2/transport.dart';
import '../src/client_utils.dart';
import '../src/utils.dart';
class InterceptorInvocation {
final int id;
final int unary;
final int streaming;
InterceptorInvocation(this.id, this.unary, this.streaming);
String toString() {
return '{id: ${id}, unary: ${unary}, streaming: ${streaming}}';
}
}
class FakeInterceptor implements ClientInterceptor {
final int _id;
int _unary = 0;
int _streaming = 0;
static final List<InterceptorInvocation> _invocations = new List();
FakeInterceptor(this._id);
@override
ResponseFuture<R> interceptUnary<Q, R>(ClientMethod<Q, R> method, Q request,
CallOptions options, ClientUnaryInvoker<Q, R> invoker) {
_invocations.add(InterceptorInvocation(_id, ++_unary, _streaming));
return invoker(method, request, _inject(options));
}
@override
ResponseStream<R> interceptStreaming<Q, R>(
ClientMethod<Q, R> method,
Stream<Q> requests,
CallOptions options,
ClientStreamingInvoker<Q, R> invoker) {
_invocations.add(InterceptorInvocation(_id, _unary, ++_streaming));
return invoker(method, requests, _inject(options));
}
CallOptions _inject(CallOptions options) {
return options.mergedWith(CallOptions(metadata: {
"x-interceptor": _invocations.map((i) => i.toString()).join(', '),
}));
}
static void tearDown() {
_invocations.clear();
}
}
main() {
test('single unary interceptor', () async {
final harness = ClientHarness()
..interceptors = [FakeInterceptor(1)]
..setUp();
const requestValue = 17;
const responseValue = 19;
void handleRequest(StreamMessage message) {
final data = validateDataMessage(message);
expect(mockDecode(data.data), requestValue);
harness
..sendResponseHeader()
..sendResponseValue(responseValue)
..sendResponseTrailer();
}
await harness.runTest(
clientCall: harness.client.unary(requestValue),
expectedResult: responseValue,
expectedPath: '/Test/Unary',
expectedCustomHeaders: {
"x-interceptor": "{id: 1, unary: 1, streaming: 0}"
},
serverHandlers: [handleRequest],
);
harness.tearDown();
FakeInterceptor.tearDown();
});
test('multiple unary interceptors', () async {
final harness = ClientHarness()
..interceptors = [FakeInterceptor(1), FakeInterceptor(2)]
..setUp();
const requestValue = 17;
const responseValue = 19;
void handleRequest(StreamMessage message) {
final data = validateDataMessage(message);
expect(mockDecode(data.data), requestValue);
harness
..sendResponseHeader()
..sendResponseValue(responseValue)
..sendResponseTrailer();
}
await harness.runTest(
clientCall: harness.client.unary(requestValue),
expectedResult: responseValue,
expectedPath: '/Test/Unary',
expectedCustomHeaders: {
"x-interceptor":
"{id: 1, unary: 1, streaming: 0}, {id: 2, unary: 1, streaming: 0}"
},
serverHandlers: [handleRequest],
);
harness.tearDown();
FakeInterceptor.tearDown();
});
test('single streaming interceptor', () async {
final harness = ClientHarness()
..interceptors = [FakeInterceptor(1)]
..setUp();
const requests = [1, 15, 7];
const responses = [3, 17, 9];
var index = 0;
void handleRequest(StreamMessage message) {
final data = validateDataMessage(message);
expect(mockDecode(data.data), requests[index]);
if (index == 0) {
harness.sendResponseHeader();
}
harness.sendResponseValue(responses[index]);
index++;
}
void handleDone() {
harness.sendResponseTrailer();
}
await harness.runTest(
clientCall:
harness.client.bidirectional(Stream.fromIterable(requests)).toList(),
expectedResult: responses,
expectedPath: '/Test/Bidirectional',
expectedCustomHeaders: {
"x-interceptor": "{id: 1, unary: 0, streaming: 1}"
},
serverHandlers: [handleRequest, handleRequest, handleRequest],
doneHandler: handleDone,
);
harness.tearDown();
FakeInterceptor.tearDown();
});
test('multiple streaming interceptors', () async {
final harness = ClientHarness()
..interceptors = [FakeInterceptor(1), FakeInterceptor(2)]
..setUp();
const requests = [1, 15, 7];
const responses = [3, 17, 9];
var index = 0;
void handleRequest(StreamMessage message) {
final data = validateDataMessage(message);
expect(mockDecode(data.data), requests[index]);
if (index == 0) {
harness.sendResponseHeader();
}
harness.sendResponseValue(responses[index]);
index++;
}
void handleDone() {
harness.sendResponseTrailer();
}
await harness.runTest(
clientCall:
harness.client.bidirectional(Stream.fromIterable(requests)).toList(),
expectedResult: responses,
expectedPath: '/Test/Bidirectional',
expectedCustomHeaders: {
"x-interceptor":
"{id: 1, unary: 0, streaming: 1}, {id: 2, unary: 0, streaming: 1}"
},
serverHandlers: [handleRequest, handleRequest, handleRequest],
doneHandler: handleDone,
);
harness.tearDown();
FakeInterceptor.tearDown();
});
}

View File

@ -14,9 +14,8 @@ class TestClient extends Client {
TestClient(api.ClientChannel channel) : super(channel); TestClient(api.ClientChannel channel) : super(channel);
ResponseStream<int> stream(int request, {CallOptions options}) { ResponseStream<int> stream(int request, {CallOptions options}) {
final call = return $createStreamingCall(_$stream, Stream.value(request),
$createCall(_$stream, Stream.fromIterable([request]), options: options); options: options);
return ResponseStream(call);
} }
} }

View File

@ -13,9 +13,8 @@ class TestClient extends grpc.Client {
TestClient(grpc.ClientChannel channel) : super(channel); TestClient(grpc.ClientChannel channel) : super(channel);
grpc.ResponseStream<int> infiniteStream(int request, grpc.ResponseStream<int> infiniteStream(int request,
{grpc.CallOptions options}) { {grpc.CallOptions options}) {
final call = $createCall(_$infiniteStream, Stream.fromIterable([request]), return $createStreamingCall(_$infiniteStream, Stream.value(request),
options: options); options: options);
return grpc.ResponseStream(call);
} }
} }

View File

@ -105,8 +105,10 @@ class TestClient extends Client {
final int Function(List<int> value) decode; final int Function(List<int> value) decode;
TestClient(base.ClientChannel channel, TestClient(base.ClientChannel channel,
{CallOptions options, this.decode: mockDecode}) {CallOptions options,
: super(channel, options: options) { Iterable<ClientInterceptor> interceptors,
this.decode: mockDecode})
: super(channel, options: options, interceptors: interceptors) {
_$unary = ClientMethod<int, int>('/Test/Unary', mockEncode, decode); _$unary = ClientMethod<int, int>('/Test/Unary', mockEncode, decode);
_$clientStreaming = _$clientStreaming =
ClientMethod<int, int>('/Test/ClientStreaming', mockEncode, decode); ClientMethod<int, int>('/Test/ClientStreaming', mockEncode, decode);
@ -117,27 +119,23 @@ class TestClient extends Client {
} }
ResponseFuture<int> unary(int request, {CallOptions options}) { ResponseFuture<int> unary(int request, {CallOptions options}) {
final call = return $createUnaryCall(_$unary, request, options: options);
$createCall(_$unary, Stream.fromIterable([request]), options: options);
return ResponseFuture(call);
} }
ResponseFuture<int> clientStreaming(Stream<int> request, ResponseFuture<int> clientStreaming(Stream<int> request,
{CallOptions options}) { {CallOptions options}) {
final call = $createCall(_$clientStreaming, request, options: options); return $createStreamingCall(_$clientStreaming, request, options: options)
return ResponseFuture(call); .single;
} }
ResponseStream<int> serverStreaming(int request, {CallOptions options}) { ResponseStream<int> serverStreaming(int request, {CallOptions options}) {
final call = $createCall(_$serverStreaming, Stream.fromIterable([request]), return $createStreamingCall(_$serverStreaming, Stream.value(request),
options: options); options: options);
return ResponseStream(call);
} }
ResponseStream<int> bidirectional(Stream<int> request, ResponseStream<int> bidirectional(Stream<int> request,
{CallOptions options}) { {CallOptions options}) {
final call = $createCall(_$bidirectional, request, options: options); return $createStreamingCall(_$bidirectional, request, options: options);
return ResponseStream(call);
} }
} }
@ -197,6 +195,8 @@ abstract class _Harness {
StreamController<StreamMessage> fromClient; StreamController<StreamMessage> fromClient;
StreamController<StreamMessage> toClient; StreamController<StreamMessage> toClient;
Iterable<ClientInterceptor> interceptors;
TestClient client; TestClient client;
base.ClientChannel createChannel(); base.ClientChannel createChannel();
@ -216,7 +216,7 @@ abstract class _Harness {
when(transport.isOpen).thenReturn(true); when(transport.isOpen).thenReturn(true);
when(stream.outgoingMessages).thenReturn(fromClient.sink); when(stream.outgoingMessages).thenReturn(fromClient.sink);
when(stream.incomingMessages).thenAnswer((_) => toClient.stream); when(stream.incomingMessages).thenAnswer((_) => toClient.stream);
client = TestClient(channel); client = TestClient(channel, interceptors: interceptors);
} }
void tearDown() { void tearDown() {