Some refactoring prework before interceptors

This commit is contained in:
Stanley Cheung 2020-04-07 23:04:01 -07:00 committed by Stanley Cheung
parent 6e2a1877da
commit b0ea6a1e55
3 changed files with 194 additions and 83 deletions

View File

@ -29,10 +29,14 @@ closure_js_library(
srcs = [ srcs = [
"abstractclientbase.js", "abstractclientbase.js",
], ],
suppress = [
"reportUnknownTypes",
],
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
":clientreadablestream", ":clientreadablestream",
":error", ":error",
":methodtype",
":requester", ":requester",
], ],
) )
@ -91,8 +95,14 @@ closure_js_library(
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
":abstractclientbase", ":abstractclientbase",
":clientreadablestream",
":error",
":grpcwebclientreadablestream", ":grpcwebclientreadablestream",
":methodtype",
":requester",
":status",
":statuscode", ":statuscode",
":unaryresponse",
"@io_bazel_rules_closure//closure/library/crypt:base64", "@io_bazel_rules_closure//closure/library/crypt:base64",
"@io_bazel_rules_closure//closure/library/net:xhrio", "@io_bazel_rules_closure//closure/library/net:xhrio",
"@io_bazel_rules_closure//closure/library/net/rpc:httpcors", "@io_bazel_rules_closure//closure/library/net/rpc:httpcors",
@ -189,6 +199,21 @@ closure_js_library(
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
) )
closure_js_library(
name = "unaryresponse",
srcs = [
"unaryresponse.js",
],
suppress = [
"reportUnknownTypes",
],
visibility = ["//visibility:public"],
deps = [
":metadata",
":status",
],
)
closure_js_test( closure_js_test(
name = "grpcwebclientbase_test", name = "grpcwebclientbase_test",
srcs = [ srcs = [

View File

@ -30,6 +30,7 @@ goog.module.declareLegacyNamespace();
const ClientReadableStream = goog.require('grpc.web.ClientReadableStream'); const ClientReadableStream = goog.require('grpc.web.ClientReadableStream');
const Error = goog.require('grpc.web.Error'); const Error = goog.require('grpc.web.Error');
const MethodDescriptor = goog.require('grpc.web.MethodDescriptor'); const MethodDescriptor = goog.require('grpc.web.MethodDescriptor');
const MethodType = goog.require('grpc.web.MethodType');
/** /**
@ -67,6 +68,7 @@ AbstractClientBase.MethodInfo = function(
/** /**
* @abstract
* @template REQUEST, RESPONSE * @template REQUEST, RESPONSE
* Even with ?RESPONSE the RESPONSE will still be inferred as * Even with ?RESPONSE the RESPONSE will still be inferred as
* "FooResponse|Null". Use RESPONSE_LEAN to extract out the "FooResponse" * "FooResponse|Null". Use RESPONSE_LEAN to extract out the "FooResponse"
@ -79,44 +81,85 @@ AbstractClientBase.MethodInfo = function(
* X)))) * X))))
* =: * =:
* @param {string} method The method to invoke * @param {string} method The method to invoke
* @param {REQUEST} request The request proto * @param {REQUEST} requestMessage The request proto
* @param {!Object<string, string>} metadata User defined call metadata * @param {!Object<string, string>} metadata User defined call metadata
* @param {!AbstractClientBase.MethodInfo<REQUEST, * @param {!MethodDescriptor<REQUEST, RESPONSE_LEAN>|
* RESPONSE_LEAN>|!MethodDescriptor<REQUEST, RESPONSE_LEAN>} * !AbstractClientBase.MethodInfo<REQUEST, RESPONSE_LEAN>}
* methodInfo Information of this RPC method * methodDescriptor Information of this RPC method
* @param {function(?Error, ?RESPONSE)} * @param {function(?Error, ?RESPONSE)}
* callback A callback function which takes (error, response) * callback A callback function which takes (error, response)
* @return {!ClientReadableStream<RESPONSE_LEAN>|undefined} * @return {!ClientReadableStream<RESPONSE_LEAN>|undefined}
* The Client Readable Stream * The Client Readable Stream
*/ */
AbstractClientBase.prototype.rpcCall = goog.abstractMethod; AbstractClientBase.prototype.rpcCall = function(
method, requestMessage, metadata, methodDescriptor, callback) {};
/** /**
* @abstract
* @template REQUEST, RESPONSE * @template REQUEST, RESPONSE
* @param {string} method The method to invoke * @param {string} method The method to invoke
* @param {REQUEST} request The request proto * @param {REQUEST} requestMessage The request proto
* @param {!Object<string, string>} metadata User defined call metadata * @param {!Object<string, string>} metadata User defined call metadata
* @param {!AbstractClientBase.MethodInfo<REQUEST, * @param {!MethodDescriptor<REQUEST, RESPONSE>|
* RESPONSE>|!MethodDescriptor<REQUEST, RESPONSE>} * !AbstractClientBase.MethodInfo<REQUEST,RESPONSE>}
* methodInfo Information of this RPC method * methodDescriptor Information of this RPC method
* @return {!Promise<!RESPONSE>} * @return {!Promise<!RESPONSE>}
* A promise that resolves to the response message * A promise that resolves to the response message
*/ */
AbstractClientBase.prototype.unaryCall = goog.abstractMethod; AbstractClientBase.prototype.unaryCall = function(
method, requestMessage, metadata, methodDescriptor) {};
/** /**
* @abstract
* @template REQUEST, RESPONSE * @template REQUEST, RESPONSE
* @param {string} method The method to invoke * @param {string} method The method to invoke
* @param {REQUEST} request The request proto * @param {REQUEST} requestMessage The request proto
* @param {!Object<string, string>} metadata User defined call metadata * @param {!Object<string, string>} metadata User defined call metadata
* @param {!AbstractClientBase.MethodInfo<REQUEST, * @param {!MethodDescriptor<REQUEST, RESPONSE>|
* RESPONSE>|!MethodDescriptor<REQUEST, RESPONSE>} * !AbstractClientBase.MethodInfo<REQUEST,RESPONSE>}
* methodInfo Information of this RPC method * methodDescriptor Information of this RPC method
* @return {!ClientReadableStream<RESPONSE>} The Client Readable Stream * @return {!ClientReadableStream<RESPONSE>} The Client Readable Stream
*/ */
AbstractClientBase.prototype.serverStreaming = goog.abstractMethod; AbstractClientBase.prototype.serverStreaming = function(
method, requestMessage, metadata, methodDescriptor) {};
/**
* As MethodType is being deprecated, for now we need to convert MethodType to
* MethodDescriptor.
* @static
* @template REQUEST, RESPONSE
* @param {string} method
* @param {REQUEST} requestMessage
* @param {!MethodType} methodType
* @param {!AbstractClientBase.MethodInfo<REQUEST,RESPONSE>|!MethodDescriptor<REQUEST,RESPONSE>}
* methodInfo
* @return {!MethodDescriptor<REQUEST,RESPONSE>}
*/
AbstractClientBase.ensureMethodDescriptor = function(
method, requestMessage, methodType, methodInfo) {
if (methodInfo instanceof MethodDescriptor) {
return methodInfo;
}
const requestType = methodInfo.requestType || requestMessage.constructor;
return new MethodDescriptor(
method, methodType, requestType, methodInfo.responseType,
methodInfo.requestSerializeFn, methodInfo.responseDeserializeFn);
};
/**
* Get the hostname of the current request.
* @static
* @template REQUEST, RESPONSE
* @param {string} method
* @param {!MethodDescriptor<REQUEST,RESPONSE>} methodDescriptor
* @return {string}
*/
AbstractClientBase.getHostname = function(method, methodDescriptor) {
// method = hostname + methodDescriptor.name(relative path of this method)
return method.substr(0, method.length - methodDescriptor.name.length);
};

View File

@ -29,11 +29,17 @@ goog.module.declareLegacyNamespace();
const AbstractClientBase = goog.require('grpc.web.AbstractClientBase'); const AbstractClientBase = goog.require('grpc.web.AbstractClientBase');
const ClientReadableStream = goog.require('grpc.web.ClientReadableStream');
const Error = goog.require('grpc.web.Error');
const GrpcWebClientReadableStream = goog.require('grpc.web.GrpcWebClientReadableStream'); const GrpcWebClientReadableStream = goog.require('grpc.web.GrpcWebClientReadableStream');
const HttpCors = goog.require('goog.net.rpc.HttpCors'); const HttpCors = goog.require('goog.net.rpc.HttpCors');
const MethodType = goog.require('grpc.web.MethodType');
const Request = goog.require('grpc.web.Request');
const StatusCode = goog.require('grpc.web.StatusCode'); const StatusCode = goog.require('grpc.web.StatusCode');
const UnaryResponse = goog.require('grpc.web.UnaryResponse');
const XhrIo = goog.require('goog.net.XhrIo'); const XhrIo = goog.require('goog.net.XhrIo');
const googCrypt = goog.require('goog.crypt.base64'); const googCrypt = goog.require('goog.crypt.base64');
const {Status} = goog.require('grpc.web.Status');
/** /**
@ -72,56 +78,13 @@ const GrpcWebClientBase = function(opt_options) {
* @export * @export
*/ */
GrpcWebClientBase.prototype.rpcCall = function( GrpcWebClientBase.prototype.rpcCall = function(
method, request, metadata, methodInfo, callback) { method, requestMessage, metadata, methodDescriptor, callback) {
var xhr = this.newXhr_(); methodDescriptor = AbstractClientBase.ensureMethodDescriptor(
xhr.setWithCredentials(this.withCredentials_); method, requestMessage, MethodType.UNARY, methodDescriptor);
var hostname = AbstractClientBase.getHostname(method, methodDescriptor);
var genericTransportInterface = { var stream = this.startStream_(
xhr: xhr, methodDescriptor.createRequest(requestMessage, metadata), hostname);
}; GrpcWebClientBase.setCallback_(stream, callback, false);
var stream = new GrpcWebClientReadableStream(genericTransportInterface);
stream.setResponseDeserializeFn(methodInfo.responseDeserializeFn);
stream.on('data', function(response) {
callback(null, response);
});
stream.on('status', function(status) {
if (status.code != StatusCode.OK) {
callback({
code: status.code,
message: status.details,
metadata: status.metadata
}, null);
}
});
stream.on('error', function(error) {
if (error.code != StatusCode.OK) {
callback({
code: error.code,
message: error.message,
metadata: error.metadata
}, null);
}
});
xhr.headers.addAll(metadata);
this.processHeaders_(xhr);
if (this.suppressCorsPreflight_) {
var headerObject = xhr.headers.toObject();
xhr.headers.clear();
method = GrpcWebClientBase.setCorsOverride_(method, headerObject);
}
var serialized = methodInfo.requestSerializeFn(request);
var payload = this.encodeRequest_(serialized);
if (this.format_ == "text") {
payload = googCrypt.encodeByteArray(payload);
} else if (this.format_ == "binary") {
xhr.setResponseType(XhrIo.ResponseType.ARRAY_BUFFER);
}
xhr.send(method, 'POST', payload);
return stream; return stream;
}; };
@ -131,12 +94,32 @@ GrpcWebClientBase.prototype.rpcCall = function(
* @export * @export
*/ */
GrpcWebClientBase.prototype.unaryCall = function( GrpcWebClientBase.prototype.unaryCall = function(
method, request, metadata, methodInfo) { method, requestMessage, metadata, methodDescriptor) {
return new Promise((resolve, reject) => { methodDescriptor = AbstractClientBase.ensureMethodDescriptor(
this.rpcCall(method, request, metadata, methodInfo, (error, response) => { method, requestMessage, MethodType.UNARY, methodDescriptor);
error ? reject(error) : resolve(response); var hostname = AbstractClientBase.getHostname(method, methodDescriptor);
}); var request = methodDescriptor.createRequest(requestMessage, metadata);
var unaryResponse = new Promise((resolve, reject) => {
var stream = this.startStream_(request, hostname);
var unaryMetadata;
var unaryStatus;
var unaryMsg;
GrpcWebClientBase.setCallback_(
stream, (error, response, status, metadata) => {
if (error) {
reject(error);
} else if (response) {
unaryMsg = response;
} else if (status) {
unaryStatus = status;
} else if (metadata) {
unaryMetadata = metadata;
} else {
resolve(new UnaryResponse(unaryMsg, unaryMetadata, unaryStatus));
}
}, true);
}); });
return unaryResponse.then((response) => response.getResponseMessage());
}; };
@ -145,7 +128,26 @@ GrpcWebClientBase.prototype.unaryCall = function(
* @export * @export
*/ */
GrpcWebClientBase.prototype.serverStreaming = function( GrpcWebClientBase.prototype.serverStreaming = function(
method, request, metadata, methodInfo) { method, requestMessage, metadata, methodDescriptor) {
methodDescriptor = AbstractClientBase.ensureMethodDescriptor(
method, requestMessage, MethodType.SERVER_STREAMING, methodDescriptor);
var hostname = AbstractClientBase.getHostname(method, methodDescriptor);
return this.startStream_(
methodDescriptor.createRequest(requestMessage, metadata), hostname);
};
/**
* @private
* @template REQUEST, RESPONSE
* @param {!Request<REQUEST, RESPONSE>} request
* @param {string} hostname
* @return {!ClientReadableStream<RESPONSE>}
*/
GrpcWebClientBase.prototype.startStream_ = function(request, hostname) {
var methodDescriptor = request.getMethodDescriptor();
var path = hostname + methodDescriptor.name;
var xhr = this.newXhr_(); var xhr = this.newXhr_();
xhr.setWithCredentials(this.withCredentials_); xhr.setWithCredentials(this.withCredentials_);
@ -153,29 +155,74 @@ GrpcWebClientBase.prototype.serverStreaming = function(
xhr: xhr, xhr: xhr,
}; };
var stream = new GrpcWebClientReadableStream(genericTransportInterface); var stream = new GrpcWebClientReadableStream(genericTransportInterface);
stream.setResponseDeserializeFn(methodInfo.responseDeserializeFn); stream.setResponseDeserializeFn(methodDescriptor.responseDeserializeFn);
xhr.headers.addAll(metadata); xhr.headers.addAll(request.getMetadata());
this.processHeaders_(xhr); this.processHeaders_(xhr);
if (this.suppressCorsPreflight_) { if (this.suppressCorsPreflight_) {
var headerObject = xhr.headers.toObject(); var headerObject = xhr.headers.toObject();
xhr.headers.clear(); xhr.headers.clear();
method = GrpcWebClientBase.setCorsOverride_(method, headerObject); path = GrpcWebClientBase.setCorsOverride_(path, headerObject);
} }
var serialized = methodInfo.requestSerializeFn(request); var serialized =
methodDescriptor.requestSerializeFn(request.getRequestMessage());
var payload = this.encodeRequest_(serialized); var payload = this.encodeRequest_(serialized);
if (this.format_ == "text") { if (this.format_ == 'text') {
payload = googCrypt.encodeByteArray(payload); payload = googCrypt.encodeByteArray(payload);
} else if (this.format_ == "binary") { } else if (this.format_ == 'binary') {
xhr.setResponseType(XhrIo.ResponseType.ARRAY_BUFFER); xhr.setResponseType(XhrIo.ResponseType.ARRAY_BUFFER);
} }
xhr.send(method, 'POST', payload); xhr.send(path, 'POST', payload);
return stream; return stream;
}; };
/**
* @private
* @static
* @template RESPONSE
* @param {!ClientReadableStream<RESPONSE>} stream
* @param {function(?Error, ?RESPONSE, ?Status=, ?Metadata=)|
* function(?Error,?RESPONSE)} callback
* @param {boolean} useUnaryResponse
*/
GrpcWebClientBase.setCallback_ = function(stream, callback, useUnaryResponse) {
stream.on('data', function(response) {
callback(null, response);
});
stream.on('error', function(error) {
if (error.code != StatusCode.OK) {
callback(error, null);
}
});
stream.on('status', function(status) {
if (status.code != StatusCode.OK) {
callback(
{
code: status.code,
message: status.details,
metadata: status.metadata
},
null);
} else if (useUnaryResponse) {
callback(null, null, status);
}
});
if (useUnaryResponse) {
stream.on('metadata', function(metadata) {
callback(null, null, null, metadata);
});
stream.on('end', function() {
callback(null, null);
});
}
};
/** /**
* Create a new XhrIo object * Create a new XhrIo object
* *
@ -186,8 +233,6 @@ GrpcWebClientBase.prototype.newXhr_ = function() {
return new XhrIo(); return new XhrIo();
}; };
/** /**
* Encode the grpc-web request * Encode the grpc-web request
* *
@ -208,8 +253,6 @@ GrpcWebClientBase.prototype.encodeRequest_ = function(serialized) {
return payload; return payload;
}; };
/** /**
* @private * @private
* @param {!XhrIo} xhr The xhr object * @param {!XhrIo} xhr The xhr object