grpc-web interceptors implementation

This commit is contained in:
Stanley Cheung 2020-04-20 16:09:44 -07:00 committed by Stanley Cheung
parent 319c2c6836
commit b62d3629c2
8 changed files with 260 additions and 17 deletions

View File

@ -98,6 +98,7 @@ closure_js_library(
":clientreadablestream",
":error",
":grpcwebclientreadablestream",
":interceptor",
":methodtype",
":requester",
":status",
@ -149,6 +150,19 @@ closure_js_library(
],
)
closure_js_library(
name = "interceptor",
srcs = [
"interceptor.js",
],
visibility = ["//visibility:public"],
deps = [
":clientreadablestream",
":requester",
":unaryresponse",
],
)
closure_js_library(
name = "metadata",
srcs = [
@ -230,7 +244,9 @@ closure_js_test(
"strictCheckTypes",
],
deps = [
":clientreadablestream",
":grpcwebclientbase",
":interceptor",
"@io_bazel_rules_closure//closure/library/crypt:base64",
"@io_bazel_rules_closure//closure/library/events",
"@io_bazel_rules_closure//closure/library/structs:map",

View File

@ -7,22 +7,22 @@ goog.module.declareLegacyNamespace();
/**
* The collection of runtime options for a new RPC call.
*
* @param {!Object<string, !Object>=} options
* @constructor
*/
const CallOptions = function() {
const CallOptions = function(options) {
/**
* @const {!Object<string, ?>}
* @const {!Object<string, !Object>}
* @private
*/
this.properties_ = {};
this.properties_ = options || {};
};
/**
* Add a new CallOption or override an existing one.
*
* @param {string} name name of the CallOption that should be added/overridden.
* @param {?} value value of the CallOption
* @param {!Object} value value of the CallOption
*/
CallOptions.prototype.setOption = function(name, value) {
this.properties_[name] = value;
@ -32,7 +32,7 @@ CallOptions.prototype.setOption = function(name, value) {
* Get the value of one CallOption.
*
* @param {string} name name of the CallOption.
* @return {?} value of the CallOption. If name doesn't exist, will return
* @return {!Object} value of the CallOption. If name doesn't exist, will return
* 'undefined'.
*/
CallOptions.prototype.get = function(name) {

View File

@ -40,6 +40,8 @@ const UnaryResponse = goog.require('grpc.web.UnaryResponse');
const XhrIo = goog.require('goog.net.XhrIo');
const googCrypt = goog.require('goog.crypt.base64');
const {Status} = goog.require('grpc.web.Status');
const {StreamInterceptor, UnaryInterceptor} = goog.require('grpc.web.Interceptor');
/**
@ -70,6 +72,20 @@ const GrpcWebClientBase = function(opt_options) {
*/
this.withCredentials_ =
goog.getObjectByName('withCredentials', opt_options) || false;
/**
* @const {!Array<!StreamInterceptor>}
* @private
*/
this.streamInterceptors_ =
goog.getObjectByName('streamInterceptors', opt_options) || [];
/**
* @const {!Array<!UnaryInterceptor>}
* @private
*/
this.unaryInterceptors_ =
goog.getObjectByName('unaryInterceptors', opt_options) || [];
};
@ -82,8 +98,11 @@ GrpcWebClientBase.prototype.rpcCall = function(
methodDescriptor = AbstractClientBase.ensureMethodDescriptor(
method, requestMessage, MethodType.UNARY, methodDescriptor);
var hostname = AbstractClientBase.getHostname(method, methodDescriptor);
var stream = this.startStream_(
methodDescriptor.createRequest(requestMessage, metadata), hostname);
var invoker = GrpcWebClientBase.runInterceptors_(
(request) => this.startStream_(request, hostname),
this.streamInterceptors_);
var stream = /** @type {!ClientReadableStream<?>} */ (invoker.call(
this, methodDescriptor.createRequest(requestMessage, metadata)));
GrpcWebClientBase.setCallback_(stream, callback, false);
return stream;
};
@ -98,8 +117,7 @@ GrpcWebClientBase.prototype.unaryCall = function(
methodDescriptor = AbstractClientBase.ensureMethodDescriptor(
method, requestMessage, MethodType.UNARY, methodDescriptor);
var hostname = AbstractClientBase.getHostname(method, methodDescriptor);
var request = methodDescriptor.createRequest(requestMessage, metadata);
var unaryResponse = new Promise((resolve, reject) => {
var initialInvoker = (request) => new Promise((resolve, reject) => {
var stream = this.startStream_(request, hostname);
var unaryMetadata;
var unaryStatus;
@ -119,6 +137,10 @@ GrpcWebClientBase.prototype.unaryCall = function(
}
}, true);
});
var invoker = GrpcWebClientBase.runInterceptors_(
initialInvoker, this.unaryInterceptors_);
var unaryResponse = /** @type {!Promise<?>} */ (invoker.call(
this, methodDescriptor.createRequest(requestMessage, metadata)));
return unaryResponse.then((response) => response.getResponseMessage());
};
@ -132,8 +154,11 @@ GrpcWebClientBase.prototype.serverStreaming = function(
methodDescriptor = AbstractClientBase.ensureMethodDescriptor(
method, requestMessage, MethodType.SERVER_STREAMING, methodDescriptor);
var hostname = AbstractClientBase.getHostname(method, methodDescriptor);
return this.startStream_(
methodDescriptor.createRequest(requestMessage, metadata), hostname);
var invoker = GrpcWebClientBase.runInterceptors_(
(request) => this.startStream_(request, hostname),
this.streamInterceptors_);
return /** @type {!ClientReadableStream<?>} */ (invoker.call(
this, methodDescriptor.createRequest(requestMessage, metadata)));
};
@ -293,5 +318,23 @@ GrpcWebClientBase.setCorsOverride_ = function(method, headerObject) {
method, HttpCors.HTTP_HEADERS_PARAM_NAME, headerObject));
};
/**
* @private
* @static
* @template REQUEST, RESPONSE
* @param {function(!Request<REQUEST,RESPONSE>):
* (!Promise<RESPONSE>|!ClientReadableStream<RESPONSE>)} invoker
* @param {!Array<!UnaryInterceptor|!StreamInterceptor>}
* interceptors
* @return {function(!Request<REQUEST,RESPONSE>):
* (!Promise<RESPONSE>|!ClientReadableStream<RESPONSE>)}
*/
GrpcWebClientBase.runInterceptors_ = function(invoker, interceptors) {
let curInvoker = invoker;
interceptors.forEach((interceptor) => {
const lastInvoker = curInvoker;
curInvoker = (request) => interceptor.intercept(request, lastInvoker);
});
return curInvoker;
};
exports = GrpcWebClientBase;

View File

@ -18,11 +18,13 @@
goog.module('grpc.web.GrpcWebClientBaseTest');
goog.setTestOnly('grpc.web.GrpcWebClientBaseTest');
const ClientReadableStream = goog.require('grpc.web.ClientReadableStream');
var GrpcWebClientBase = goog.require('grpc.web.GrpcWebClientBase');
var Map = goog.require('goog.structs.Map');
var googCrypt = goog.require('goog.crypt.base64');
var googEvents = goog.require('goog.events');
var testSuite = goog.require('goog.testing.testSuite');
const {StreamInterceptor} = goog.require('grpc.web.Interceptor');
goog.require('goog.testing.jsunit');
var REQUEST_BYTES = [1,2,3];
@ -79,6 +81,35 @@ testSuite({
dataCallback();
},
testStreamInterceptor: function() {
var interceptor = new StreamResponseInterceptor();
var client = new GrpcWebClientBase({streamInterceptors: [interceptor]});
client.newXhr_ = function() {
return new MockXhr({
// This parses to [ { DATA: [4,5,6] }, { TRAILER: "a: b" } ]
response: googCrypt.encodeByteArray(new Uint8Array(
[0, 0, 0, 0, 3, 4, 5, 6, 128, 0, 0, 0, 4, 97, 58, 32, 98])),
});
};
expectUnaryHeaders();
client.rpcCall(
FAKE_METHOD, {}, {}, {
requestSerializeFn: function(request) {
return REQUEST_BYTES;
},
responseDeserializeFn: function(bytes) {
assertElementsEquals([4, 5, 6], [].slice.call(bytes));
return {'field1': PROTO_FIELD_VALUE};
}
},
function(error, response) {
assertNull(error);
assertEquals('field2', response.field2);
});
dataCallback();
},
testRpcError: function() {
var client = new GrpcWebClientBase();
client.newXhr_ = function() {
@ -226,3 +257,44 @@ MockXhr.prototype.getLastError = function() {
MockXhr.prototype.setResponseType = function(responseType) {
return;
};
/**
* @constructor
* @implements {StreamInterceptor}
*/
const StreamResponseInterceptor = function() {};
/** @override */
StreamResponseInterceptor.prototype.intercept = function(request, invoker) {
/**
* @implements {ClientReadableStream}
* @constructor
* @param {!ClientReadableStream<RESPONSE>} stream
* @template RESPONSE
*/
const InterceptedStream = function(stream) {
this.stream = stream;
};
/** @override */
InterceptedStream.prototype.on = function(eventType, callback) {
if (eventType == 'data') {
const newCallback = (response) => {
response.field2 = 'field2';
callback(response);
};
this.stream.on(eventType, newCallback);
} else {
this.stream.on(eventType, callback);
}
return this;
};
/** @override */
InterceptedStream.prototype.cancel = function() {
this.stream.cancel();
return this;
};
return new InterceptedStream(invoker(request));
};

View File

@ -0,0 +1,50 @@
/**
* @fileoverview grpc-web Interceptor.
*/
goog.module('grpc.web.Interceptor');
goog.module.declareLegacyNamespace();
const ClientReadableStream = goog.require('grpc.web.ClientReadableStream');
const Request = goog.require('grpc.web.Request');
const UnaryResponse = goog.require('grpc.web.UnaryResponse');
/**
* @interface
*/
const UnaryInterceptor = function() {};
/**
* @export
* @abstract
* @template REQUEST, RESPONSE
* @param {!Request<REQUEST, RESPONSE>} request
* @param {function(!Request<REQUEST,RESPONSE>):!Promise<!UnaryResponse<RESPONSE>>}
* invoker
* @return {!Promise<!UnaryResponse<RESPONSE>>}
*/
UnaryInterceptor.prototype.intercept = function(request, invoker) {};
/**
* @interface
*/
const StreamInterceptor = function() {};
/**
* @export
* @abstract
* @template REQUEST, RESPONSE
* @param {!Request<REQUEST, RESPONSE>} request
* @param {function(!Request<REQUEST,RESPONSE>):!ClientReadableStream<RESPONSE>}
* invoker
* @return {!ClientReadableStream<RESPONSE>}
*/
StreamInterceptor.prototype.intercept = function(request, invoker) {};
exports = {
UnaryInterceptor,
StreamInterceptor
};

View File

@ -14,19 +14,29 @@ const MethodDescriptor = goog.requireType('grpc.web.MethodDescriptor');
* @template REQUEST, RESPONSE
*/
class Request {
/** @return {REQUEST} */
/**
* @export
* @return {REQUEST}
*/
getRequestMessage() {}
/** @return {!MethodDescriptor<REQUEST, RESPONSE>}*/
/**
* @export
* @return {!MethodDescriptor<REQUEST, RESPONSE>}
*/
getMethodDescriptor() {}
/** @return {!Metadata} */
/**
* @export
* @return {!Metadata}
*/
getMetadata() {}
/**
* Client CallOptions. Note that CallOptions has not been implemented in
* grpc.web.AbstractClientbase yet, but will be used in
* grpc.web.GenericClient.
* @export
* @return {!CallOptions|undefined}
*/
getCallOptions() {}

View File

@ -23,7 +23,34 @@ const {EchoApp} = require('../echoapp.js');
const grpc = {};
grpc.web = require('grpc-web');
var echoService = new EchoServiceClient('http://'+window.location.hostname+':8080', null, null);
/** Sample interceptor implementation */
const StreamResponseInterceptor = function() {};
StreamResponseInterceptor.prototype.intercept = function(request, invoker) {
const InterceptedStream = function(stream) {
this.stream = stream;
};
InterceptedStream.prototype.on = function(eventType, callback) {
if (eventType == 'data') {
const newCallback = (response) => {
response.setMessage('[Intcpt Resp1]'+response.getMessage());
callback(response);
};
this.stream.on(eventType, newCallback);
} else {
this.stream.on(eventType, callback);
}
return this;
};
var reqMsg = request.getRequestMessage();
reqMsg.setMessage('[Intcpt Req1]'+reqMsg.getMessage());
return new InterceptedStream(invoker(request));
};
var opts = {'streamInterceptors' : [new StreamResponseInterceptor()]};
var echoService = new EchoServiceClient('http://'+window.location.hostname+':8080', null,
null);
// opts);
var echoApp = new EchoApp(
echoService,

View File

@ -1 +1,26 @@
var module;
/**
* List of functions we want to preserve when running the closure compiler
* with --compilation_level=ADVANCED_OPTIMIZATIONS.
*/
module.ClientReadableStream = function() {};
module.ClientReadableStream.prototype.on = function(eventType, callback) {};
module.ClientReadableStream.prototype.cancel = function() {};
module.GenericClient = function() {};
module.GenericClient.prototype.unaryCall = function(request) {};
module.GenericClient.prototype.call = function(requestMessage,
methodDescriptor) {};
module.UnaryInterceptor = function() {};
module.UnaryInterceptor.prototype.intercept = function(request, invoker) {};
module.StreamInterceptor = function() {};
module.StreamInterceptor.prototype.intercept = function(request, invoker) {};
module.Request = function() {};
module.Request.prototype.getRequestMessage = function() {};
module.Request.prototype.getMethodDescriptor = function() {};
module.Request.prototype.getMetadata = function() {};
module.Request.prototype.getCallOptions = function() {};