Some more internal refactor

This commit is contained in:
Stanley Cheung 2020-08-12 14:56:13 -07:00 committed by Stanley Cheung
parent 91702e7461
commit 6f5ecb0ad1
2 changed files with 87 additions and 54 deletions

View File

@ -51,7 +51,7 @@ const AbstractClientBase = class {
* methodDescriptor Information of this RPC method
* @param {function(?Error, ?)}
* callback A callback function which takes (error, RESPONSE or null)
* @return {!ClientReadableStream<RESPONSE>|undefined}
* @return {!ClientReadableStream<RESPONSE>}
*/
rpcCall(method, requestMessage, metadata, methodDescriptor, callback) {}

View File

@ -34,7 +34,6 @@ goog.module.declareLegacyNamespace();
const ClientReadableStream = goog.require('grpc.web.ClientReadableStream');
const ErrorCode = goog.require('goog.net.ErrorCode');
const EventType = goog.require('goog.net.EventType');
const GoogleRpcStatus = goog.require('proto.google.rpc.Status');
const GrpcWebError = goog.requireType('grpc.web.Error');
const NodeReadableStream = goog.require('goog.net.streams.NodeReadableStream');
const StatusCode = goog.require('grpc.web.StatusCode');
@ -110,7 +109,7 @@ class StreamBodyClientReadableStream {
/**
* @private
* @type {function(?):!Status|null}
* @type {function(?, !XhrIo=):!Status|null}
* A function to parse the Rpc Status response
*/
this.rpcStatusParseFn_ = null;
@ -124,66 +123,58 @@ class StreamBodyClientReadableStream {
/**
* Set up the callback functions for unary calls.
* @param {function(?GrpcWebError, ?)} callback
* @param {boolean} binaryResponse True if the client is using 'binary' mode
* @param {boolean} base64Encoded True if
* 'X-Goog-Encode-Response-If-Executable' is 'base64' in request headers
*/
setUnaryCallback(callback) {
setUnaryCallback(callback, binaryResponse, base64Encoded) {
this.onDataCallbacks_.push((response) => callback(null, response));
this.onErrorCallbacks_.push((error) => callback(error, null));
events.listen(/** @type {!XhrIo}*/ (this.xhr_), EventType.COMPLETE, (e) => {
if (this.xhr_.isSuccess()) {
// If the response is serialized as Base64 (for example if the
// X-Goog-Encode-Response-If-Executable header is in effect), decode it
// before passing it to the deserializer.
var responseText = this.xhr_.getResponseText();
if (this.xhr_.headers.get('X-Goog-Encode-Response-If-Executable') ==
'base64' &&
this.xhr_.getResponseHeader(XhrIo.CONTENT_TYPE_HEADER) ===
'text/plain') {
if (!atob) {
throw new Error('Cannot decode Base64 response');
}
responseText = atob(responseText);
let response;
if (binaryResponse) {
response = this.decodeBinaryResponse_(base64Encoded);
} else {
response = this.decodeJspbResponse_(base64Encoded);
}
var response = this.grpcResponseDeserializeFn_(responseText);
var grpcStatus = StatusCode.fromHttpStatus(this.xhr_.getStatus());
const responseMessage = this.grpcResponseDeserializeFn_(response);
const grpcStatus = StatusCode.fromHttpStatus(this.xhr_.getStatus());
if (grpcStatus == StatusCode.OK) {
this.sendDataCallbacks_(response);
this.sendDataCallbacks_(responseMessage);
} else {
callback(
/** @type {!GrpcWebError} */ ({
code: grpcStatus,
}),
response);
responseMessage);
}
} else {
var rawResponse = this.xhr_.getResponseText();
var code;
var message;
var metadata = {};
if (rawResponse) {
try {
var status = GoogleRpcStatus.deserialize(rawResponse);
code = status.getCode();
message = status.getMessage();
if (status.getDetailsList().length) {
metadata['grpc-web-status-details-bin'] = rawResponse;
}
} catch (e) {
// 404s may be accompanied by a GoogleRpcStatus. If they are not,
// the generic message will fail to deserialize because it is not a
// status.
if (this.xhr_.getStatus() == 404) {
code = StatusCode.NOT_FOUND;
message = 'Not Found: ' + this.xhr_.getLastUri();
} else {
code = StatusCode.UNAVAILABLE;
message = 'Unable to parse RpcStatus: ' + e;
}
let rawResponse;
if (binaryResponse) {
const xhrResponse = this.xhr_.getResponse();
if (xhrResponse) {
rawResponse =
new Uint8Array(/** @type {!ArrayBuffer} */ (xhrResponse));
}
} else {
code = StatusCode.UNAVAILABLE;
message = ErrorCode.getDebugMessage(this.xhr_.getLastErrorCode());
rawResponse = this.xhr_.getResponseText();
}
let code;
let message;
let metadata = {};
if (rawResponse) {
const status = this.rpcStatusParseFn_(rawResponse, this.xhr_);
code = status.code;
message = status.details;
metadata = status.metadata;
} else {
code = StatusCode.UNKNOWN;
message = 'Rpc failed due to xhr error. error code: ' +
this.xhr_.getLastErrorCode() +
', error: ' + this.xhr_.getLastError();
}
this.sendErrorCallbacks_({
code: code,
@ -201,11 +192,11 @@ class StreamBodyClientReadableStream {
// Add the callback to the underlying stream
this.xhrNodeReadableStream_.on('data', (data) => {
if ('1' in data) {
var response = this.grpcResponseDeserializeFn_(data['1']);
const response = this.grpcResponseDeserializeFn_(data['1']);
this.sendDataCallbacks_(response);
}
if ('2' in data) {
var status = this.rpcStatusParseFn_(data['2']);
const status = this.rpcStatusParseFn_(data['2']);
this.sendStatusCallbacks_(status);
}
});
@ -214,7 +205,7 @@ class StreamBodyClientReadableStream {
});
this.xhrNodeReadableStream_.on('error', () => {
if (this.onErrorCallbacks_.length == 0) return;
var lastErrorCode = this.xhr_.getLastErrorCode();
let lastErrorCode = this.xhr_.getLastErrorCode();
if (lastErrorCode === ErrorCode.NO_ERROR && !this.xhr_.isSuccess()) {
// The lastErrorCode on the XHR isn't useful in this case, but the XHR
// status is. Full details about the failure should be available in the
@ -222,7 +213,7 @@ class StreamBodyClientReadableStream {
lastErrorCode = ErrorCode.HTTP_ERROR;
}
var grpcStatusCode;
let grpcStatusCode;
switch (lastErrorCode) {
case ErrorCode.NO_ERROR:
grpcStatusCode = StatusCode.UNKNOWN;
@ -250,6 +241,48 @@ class StreamBodyClientReadableStream {
});
}
/**
* @private
* @param {boolean} base64Encoded
* @return {string}
*/
decodeJspbResponse_(base64Encoded) {
// If the response is serialized as Base64 (for example if the
// X-Goog-Encode-Response-If-Executable header is in effect), decode it
// before passing it to the deserializer.
let responseText = this.xhr_.getResponseText();
if (base64Encoded &&
this.xhr_.getResponseHeader(XhrIo.CONTENT_TYPE_HEADER) ===
'text/plain') {
if (!atob) {
throw new Error('Cannot decode Base64 response');
}
responseText = atob(responseText);
}
return responseText;
}
/**
* @private
* @param {boolean} base64Encoded
* @return {*}
*/
decodeBinaryResponse_(base64Encoded) {
if (base64Encoded &&
this.xhr_.getResponseHeader('X-Goog-Safety-Encoding') == 'base64') {
// Convert the response's ArrayBuffer to a string, which should
// be a base64 encoded string.
const bytes = new Uint8Array(
/** @type {!ArrayBuffer} */ (this.xhr_.getResponse()));
let byteSource = '';
for (let i = 0; i < bytes.length; i++) {
byteSource += String.fromCharCode(bytes[i]);
}
} else {
return this.xhr_.getResponse();
}
}
/**
* @override
* @export
@ -330,7 +363,7 @@ class StreamBodyClientReadableStream {
* @param {!RESPONSE} data The data to send back
*/
sendDataCallbacks_(data) {
for (var i = 0; i < this.onDataCallbacks_.length; i++) {
for (let i = 0; i < this.onDataCallbacks_.length; i++) {
this.onDataCallbacks_[i](data);
}
}
@ -340,7 +373,7 @@ class StreamBodyClientReadableStream {
* @param {!Status} status The status to send back
*/
sendStatusCallbacks_(status) {
for (var i = 0; i < this.onStatusCallbacks_.length; i++) {
for (let i = 0; i < this.onStatusCallbacks_.length; i++) {
this.onStatusCallbacks_[i](status);
}
}
@ -350,7 +383,7 @@ class StreamBodyClientReadableStream {
* @param {?} error The error to send back
*/
sendErrorCallbacks_(error) {
for (var i = 0; i < this.onErrorCallbacks_.length; i++) {
for (let i = 0; i < this.onErrorCallbacks_.length; i++) {
this.onErrorCallbacks_[i](error);
}
}
@ -359,7 +392,7 @@ class StreamBodyClientReadableStream {
* @private
*/
sendEndCallbacks_() {
for (var i = 0; i < this.onEndCallbacks_.length; i++) {
for (let i = 0; i < this.onEndCallbacks_.length; i++) {
this.onEndCallbacks_[i]();
}
}