From 0599c38922f49e70d898f84c91d3cc07e56bdda6 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Fri, 7 Aug 2020 15:36:30 -0700 Subject: [PATCH] Roll forward of #853 --- .../web/streambodyclientreadablestream.js | 109 +++++++++++++++--- 1 file changed, 94 insertions(+), 15 deletions(-) diff --git a/javascript/net/grpc/web/streambodyclientreadablestream.js b/javascript/net/grpc/web/streambodyclientreadablestream.js index 578a132..b98406c 100644 --- a/javascript/net/grpc/web/streambodyclientreadablestream.js +++ b/javascript/net/grpc/web/streambodyclientreadablestream.js @@ -33,9 +33,13 @@ goog.module.declareLegacyNamespace(); const ClientReadableStream = goog.require('grpc.web.ClientReadableStream'); const ErrorCode = goog.require('goog.net.ErrorCode'); +const EventType = goog.require('goog.net.EventType'); +const GoogleRpcStatus = goog.require('proto.google.rpc.Status'); +const GrpcWebError = goog.requireType('grpc.web.Error'); const NodeReadableStream = goog.require('goog.net.streams.NodeReadableStream'); const StatusCode = goog.require('grpc.web.StatusCode'); const XhrIo = goog.require('goog.net.XhrIo'); +const events = goog.require('goog.events'); const {GenericTransportInterface} = goog.require('grpc.web.GenericTransportInterface'); const {Status} = goog.require('grpc.web.Status'); @@ -111,7 +115,83 @@ class StreamBodyClientReadableStream { */ this.rpcStatusParseFn_ = null; - this.setStreamCallback_(); + if (this.xhrNodeReadableStream_) { + this.setStreamCallback_(); + } + } + + + /** + * Set up the callback functions for unary calls. + * @param {function(?GrpcWebError, ?)} callback + */ + setUnaryCallback(callback) { + this.onDataCallbacks_.push((response) => callback(null, response)); + this.onErrorCallbacks_.push((error) => callback(error, null)); + + events.listen(/** @type {!XhrIo}*/ (this.xhr_), EventType.COMPLETE, (e) => { + if (this.xhr_.isSuccess()) { + // If the response is serialized as Base64 (for example if the + // X-Goog-Encode-Response-If-Executable header is in effect), decode it + // before passing it to the deserializer. + var responseText = this.xhr_.getResponseText(); + if (this.xhr_.headers.get('X-Goog-Encode-Response-If-Executable') == + 'base64' && + this.xhr_.getResponseHeader(XhrIo.CONTENT_TYPE_HEADER) === + 'text/plain') { + if (!atob) { + throw new Error('Cannot decode Base64 response'); + } + responseText = atob(responseText); + } + + var response = this.grpcResponseDeserializeFn_(responseText); + var grpcStatus = StatusCode.fromHttpStatus(this.xhr_.getStatus()); + if (grpcStatus == StatusCode.OK) { + this.sendDataCallbacks_(response); + } else { + callback( + /** @type {!GrpcWebError} */ ({ + code: grpcStatus, + }), + response); + } + } else { + var rawResponse = this.xhr_.getResponseText(); + var code; + var message; + var metadata = {}; + if (rawResponse) { + try { + var status = GoogleRpcStatus.deserialize(rawResponse); + code = status.getCode(); + message = status.getMessage(); + if (status.getDetailsList().length) { + metadata['grpc-web-status-details-bin'] = rawResponse; + } + } catch (e) { + // 404s may be accompanied by a GoogleRpcStatus. If they are not, + // the generic message will fail to deserialize because it is not a + // status. + if (this.xhr_.getStatus() == 404) { + code = StatusCode.NOT_FOUND; + message = 'Not Found: ' + this.xhr_.getLastUri(); + } else { + code = StatusCode.UNAVAILABLE; + message = 'Unable to parse RpcStatus: ' + e; + } + } + } else { + code = StatusCode.UNAVAILABLE; + message = ErrorCode.getDebugMessage(this.xhr_.getLastErrorCode()); + } + this.sendErrorCallbacks_({ + code: code, + message: message, + metadata: metadata, + }); + } + }); } /** @@ -119,24 +199,23 @@ class StreamBodyClientReadableStream { */ setStreamCallback_() { // Add the callback to the underlying stream - var self = this; - this.xhrNodeReadableStream_.on('data', function(data) { + this.xhrNodeReadableStream_.on('data', (data) => { if ('1' in data) { - var response = self.grpcResponseDeserializeFn_(data['1']); - self.sendDataCallbacks_(response); + var response = this.grpcResponseDeserializeFn_(data['1']); + this.sendDataCallbacks_(response); } if ('2' in data) { - var status = self.rpcStatusParseFn_(data['2']); - self.sendStatusCallbacks_(status); + var status = this.rpcStatusParseFn_(data['2']); + this.sendStatusCallbacks_(status); } }); - this.xhrNodeReadableStream_.on('end', function() { - self.sendEndCallbacks_(); + this.xhrNodeReadableStream_.on('end', () => { + this.sendEndCallbacks_(); }); - this.xhrNodeReadableStream_.on('error', function() { - if (self.onErrorCallbacks_.length == 0) return; - var lastErrorCode = self.xhr_.getLastErrorCode(); - if (lastErrorCode === ErrorCode.NO_ERROR && !self.xhr_.isSuccess()) { + this.xhrNodeReadableStream_.on('error', () => { + if (this.onErrorCallbacks_.length == 0) return; + var lastErrorCode = this.xhr_.getLastErrorCode(); + if (lastErrorCode === ErrorCode.NO_ERROR && !this.xhr_.isSuccess()) { // The lastErrorCode on the XHR isn't useful in this case, but the XHR // status is. Full details about the failure should be available in the // status handler. @@ -155,13 +234,13 @@ class StreamBodyClientReadableStream { grpcStatusCode = StatusCode.DEADLINE_EXCEEDED; break; case ErrorCode.HTTP_ERROR: - grpcStatusCode = StatusCode.fromHttpStatus(self.xhr_.getStatus()); + grpcStatusCode = StatusCode.fromHttpStatus(this.xhr_.getStatus()); break; default: grpcStatusCode = StatusCode.UNAVAILABLE; } - self.sendErrorCallbacks_({ + this.sendErrorCallbacks_({ code: grpcStatusCode, // TODO(armiller): get the message from the response? // GoogleRpcStatus.deserialize(rawResponse).getMessage()?