diff --git a/javascript/net/grpc/web/calloptions.js b/javascript/net/grpc/web/calloptions.js index 845db7c..2a9692e 100644 --- a/javascript/net/grpc/web/calloptions.js +++ b/javascript/net/grpc/web/calloptions.js @@ -7,53 +7,60 @@ goog.module.declareLegacyNamespace(); /** * The collection of runtime options for a new RPC call. - * @param {!Object=} options - * @constructor + * @unrestricted */ -const CallOptions = function(options) { +class CallOptions { /** - * @const {!Object} - * @private + * @param {!Object=} options */ - this.properties_ = options || {}; -}; + constructor(options) { + /** + * @const {!Object} + * @private + */ + this.properties_ = options || {}; + } -/** - * Add a new CallOption or override an existing one. - * - * @param {string} name name of the CallOption that should be added/overridden. - * @param {VALUE} value value of the CallOption - * @template VALUE - */ -CallOptions.prototype.setOption = function(name, value) { - this.properties_[name] = value; -}; + /** + * Add a new CallOption or override an existing one. + * + * @param {string} name name of the CallOption that should be + * added/overridden. + * @param {VALUE} value value of the CallOption + * @template VALUE + */ + setOption(name, value) { + this.properties_[name] = value; + } -/** - * Get the value of one CallOption. - * - * @param {string} name name of the CallOption. - * @return {!Object} value of the CallOption. If name doesn't exist, will return - * 'undefined'. - */ -CallOptions.prototype.get = function(name) { - return this.properties_[name]; -}; + /** + * Get the value of one CallOption. + * + * @param {string} name name of the CallOption. + * @return {!Object} value of the CallOption. If name doesn't exist, will + * return 'undefined'. + */ + get(name) { + return this.properties_[name]; + } + + /** + * Remove a CallOption. + * + * @param {string} name name of the CallOption that shoud be removed. + */ + removeOption(name) { + delete this.properties_[name]; + } + + /** + * @return {!Array} + */ + getKeys() { + return Object.keys(this.properties_); + } +} -/** - * Remove a CallOption. - * - * @param {string} name name of the CallOption that shoud be removed. - */ -CallOptions.prototype.removeOption = function(name) { - delete this.properties_[name]; -}; -/** - * @return {!Array} - */ -CallOptions.prototype.getKeys = function() { - return Object.keys(this.properties_); -}; exports = CallOptions; diff --git a/javascript/net/grpc/web/grpcwebclientbase.js b/javascript/net/grpc/web/grpcwebclientbase.js index e66ef56..2128fb6 100644 --- a/javascript/net/grpc/web/grpcwebclientbase.js +++ b/javascript/net/grpc/web/grpcwebclientbase.js @@ -47,322 +47,319 @@ const {StreamInterceptor, UnaryInterceptor} = goog.require('grpc.web.Interceptor /** * Base class for gRPC web client using the application/grpc-web wire format - * @param {?Object=} opt_options - * @constructor * @implements {AbstractClientBase} + * @unrestricted */ -const GrpcWebClientBase = function(opt_options) { +class GrpcWebClientBase { /** - * @const - * @private {string} + * @param {?Object=} opt_options */ - this.format_ = - goog.getObjectByName('format', opt_options) || "text"; + constructor(opt_options) { + /** + * @const + * @private {string} + */ + this.format_ = goog.getObjectByName('format', opt_options) || 'text'; - /** - * @const - * @private {boolean} - */ - this.suppressCorsPreflight_ = - goog.getObjectByName('suppressCorsPreflight', opt_options) || false; + /** + * @const + * @private {boolean} + */ + this.suppressCorsPreflight_ = + goog.getObjectByName('suppressCorsPreflight', opt_options) || false; - /** - * @const - * @private {boolean} - */ - this.withCredentials_ = - goog.getObjectByName('withCredentials', opt_options) || false; - /** - * @const {!Array} - * @private - */ - this.streamInterceptors_ = - goog.getObjectByName('streamInterceptors', opt_options) || []; + /** + * @const + * @private {boolean} + */ + this.withCredentials_ = + goog.getObjectByName('withCredentials', opt_options) || false; + /** + * @const {!Array} + * @private + */ + this.streamInterceptors_ = + goog.getObjectByName('streamInterceptors', opt_options) || []; - /** - * @const {!Array} - * @private - */ - this.unaryInterceptors_ = - goog.getObjectByName('unaryInterceptors', opt_options) || []; -}; - - -/** - * @override - * @export - */ -GrpcWebClientBase.prototype.rpcCall = function( - method, requestMessage, metadata, methodDescriptor, callback) { - methodDescriptor = AbstractClientBase.ensureMethodDescriptor( - method, requestMessage, MethodType.UNARY, methodDescriptor); - var hostname = AbstractClientBase.getHostname(method, methodDescriptor); - var invoker = GrpcWebClientBase.runInterceptors_( - (request) => this.startStream_(request, hostname), - this.streamInterceptors_); - var stream = /** @type {!ClientReadableStream} */ (invoker.call( - this, methodDescriptor.createRequest(requestMessage, metadata))); - GrpcWebClientBase.setCallback_(stream, callback, false); - return stream; -}; - - -/** - * @override - * @export - */ -GrpcWebClientBase.prototype.thenableCall = function( - method, requestMessage, metadata, methodDescriptor) { - methodDescriptor = AbstractClientBase.ensureMethodDescriptor( - method, requestMessage, MethodType.UNARY, methodDescriptor); - var hostname = AbstractClientBase.getHostname(method, methodDescriptor); - var initialInvoker = (request) => new Promise((resolve, reject) => { - var stream = this.startStream_(request, hostname); - var unaryMetadata; - var unaryStatus; - var unaryMsg; - GrpcWebClientBase.setCallback_( - stream, (error, response, status, metadata) => { - if (error) { - reject(error); - } else if (response) { - unaryMsg = response; - } else if (status) { - unaryStatus = status; - } else if (metadata) { - unaryMetadata = metadata; - } else { - resolve(new UnaryResponse(unaryMsg, unaryMetadata, unaryStatus)); - } - }, true); - }); - var invoker = GrpcWebClientBase.runInterceptors_( - initialInvoker, this.unaryInterceptors_); - var unaryResponse = /** @type {!Promise} */ (invoker.call( - this, methodDescriptor.createRequest(requestMessage, metadata))); - return unaryResponse.then((response) => response.getResponseMessage()); -}; - -/** - * @export - * @param {string} method The method to invoke - * @param {REQUEST} requestMessage The request proto - * @param {!Object} metadata User defined call metadata - * @param {!MethodDescriptor| - * !AbstractClientBase.MethodInfo} - * methodDescriptor Information of this RPC method - * @return {!Promise} - * @template REQUEST, RESPONSE - */ -GrpcWebClientBase.prototype.unaryCall = function( - method, requestMessage, metadata, methodDescriptor) { - return /** @type {!Promise}*/ ( - this.thenableCall(method, requestMessage, metadata, methodDescriptor)); -}; - - -/** - * @override - * @export - */ -GrpcWebClientBase.prototype.serverStreaming = function( - method, requestMessage, metadata, methodDescriptor) { - methodDescriptor = AbstractClientBase.ensureMethodDescriptor( - method, requestMessage, MethodType.SERVER_STREAMING, methodDescriptor); - var hostname = AbstractClientBase.getHostname(method, methodDescriptor); - var invoker = GrpcWebClientBase.runInterceptors_( - (request) => this.startStream_(request, hostname), - this.streamInterceptors_); - return /** @type {!ClientReadableStream} */ (invoker.call( - this, methodDescriptor.createRequest(requestMessage, metadata))); -}; - - -/** - * @private - * @template REQUEST, RESPONSE - * @param {!Request} request - * @param {string} hostname - * @return {!ClientReadableStream} - */ -GrpcWebClientBase.prototype.startStream_ = function(request, hostname) { - var methodDescriptor = request.getMethodDescriptor(); - var path = hostname + methodDescriptor.name; - - var xhr = this.newXhr_(); - xhr.setWithCredentials(this.withCredentials_); - - var genericTransportInterface = { - xhr: xhr, - }; - var stream = new GrpcWebClientReadableStream(genericTransportInterface); - stream.setResponseDeserializeFn(methodDescriptor.responseDeserializeFn); - - xhr.headers.addAll(request.getMetadata()); - this.processHeaders_(xhr); - if (this.suppressCorsPreflight_) { - var headerObject = xhr.headers.toObject(); - xhr.headers.clear(); - path = GrpcWebClientBase.setCorsOverride_(path, headerObject); + /** + * @const {!Array} + * @private + */ + this.unaryInterceptors_ = + goog.getObjectByName('unaryInterceptors', opt_options) || []; } - var serialized = - methodDescriptor.requestSerializeFn(request.getRequestMessage()); - var payload = this.encodeRequest_(serialized); - if (this.format_ == 'text') { - payload = googCrypt.encodeByteArray(payload); - } else if (this.format_ == 'binary') { - xhr.setResponseType(XhrIo.ResponseType.ARRAY_BUFFER); + /** + * @override + * @export + */ + rpcCall(method, requestMessage, metadata, methodDescriptor, callback) { + methodDescriptor = AbstractClientBase.ensureMethodDescriptor( + method, requestMessage, MethodType.UNARY, methodDescriptor); + var hostname = AbstractClientBase.getHostname(method, methodDescriptor); + var invoker = GrpcWebClientBase.runInterceptors_( + (request) => this.startStream_(request, hostname), + this.streamInterceptors_); + var stream = /** @type {!ClientReadableStream} */ (invoker.call( + this, methodDescriptor.createRequest(requestMessage, metadata))); + GrpcWebClientBase.setCallback_(stream, callback, false); + return stream; } - xhr.send(path, 'POST', payload); - return stream; -}; + /** + * @override + * @export + */ + thenableCall(method, requestMessage, metadata, methodDescriptor) { + methodDescriptor = AbstractClientBase.ensureMethodDescriptor( + method, requestMessage, MethodType.UNARY, methodDescriptor); + var hostname = AbstractClientBase.getHostname(method, methodDescriptor); + var initialInvoker = (request) => new Promise((resolve, reject) => { + var stream = this.startStream_(request, hostname); + var unaryMetadata; + var unaryStatus; + var unaryMsg; + GrpcWebClientBase.setCallback_( + stream, (error, response, status, metadata) => { + if (error) { + reject(error); + } else if (response) { + unaryMsg = response; + } else if (status) { + unaryStatus = status; + } else if (metadata) { + unaryMetadata = metadata; + } else { + resolve(new UnaryResponse(unaryMsg, unaryMetadata, unaryStatus)); + } + }, true); + }); + var invoker = GrpcWebClientBase.runInterceptors_( + initialInvoker, this.unaryInterceptors_); + var unaryResponse = /** @type {!Promise} */ (invoker.call( + this, methodDescriptor.createRequest(requestMessage, metadata))); + return unaryResponse.then((response) => response.getResponseMessage()); + } -/** - * @private - * @static - * @template RESPONSE - * @param {!ClientReadableStream} stream - * @param {function(?Error, ?RESPONSE, ?Status=, ?Metadata=)| - * function(?Error,?RESPONSE)} callback - * @param {boolean} useUnaryResponse - */ -GrpcWebClientBase.setCallback_ = function(stream, callback, useUnaryResponse) { - var responseReceived = null; - var errorEmitted = false; + /** + * @export + * @param {string} method The method to invoke + * @param {REQUEST} requestMessage The request proto + * @param {!Object} metadata User defined call metadata + * @param {!MethodDescriptor| + * !AbstractClientBase.MethodInfo} + * methodDescriptor Information of this RPC method + * @return {!Promise} + * @template REQUEST, RESPONSE + */ + unaryCall(method, requestMessage, metadata, methodDescriptor) { + return /** @type {!Promise}*/ ( + this.thenableCall(method, requestMessage, metadata, methodDescriptor)); + } - stream.on('data', function(response) { - responseReceived = response; - }); + /** + * @override + * @export + */ + serverStreaming(method, requestMessage, metadata, methodDescriptor) { + methodDescriptor = AbstractClientBase.ensureMethodDescriptor( + method, requestMessage, MethodType.SERVER_STREAMING, methodDescriptor); + var hostname = AbstractClientBase.getHostname(method, methodDescriptor); + var invoker = GrpcWebClientBase.runInterceptors_( + (request) => this.startStream_(request, hostname), + this.streamInterceptors_); + return /** @type {!ClientReadableStream} */ (invoker.call( + this, methodDescriptor.createRequest(requestMessage, metadata))); + } - stream.on('error', function(error) { - if (error.code != StatusCode.OK && !errorEmitted) { - errorEmitted = true; - callback(error, null); + /** + * @private + * @template REQUEST, RESPONSE + * @param {!Request} request + * @param {string} hostname + * @return {!ClientReadableStream} + */ + startStream_(request, hostname) { + var methodDescriptor = request.getMethodDescriptor(); + var path = hostname + methodDescriptor.name; + + var xhr = this.newXhr_(); + xhr.setWithCredentials(this.withCredentials_); + + var genericTransportInterface = { + xhr: xhr, + }; + var stream = new GrpcWebClientReadableStream(genericTransportInterface); + stream.setResponseDeserializeFn(methodDescriptor.responseDeserializeFn); + + xhr.headers.addAll(request.getMetadata()); + this.processHeaders_(xhr); + if (this.suppressCorsPreflight_) { + var headerObject = xhr.headers.toObject(); + xhr.headers.clear(); + path = GrpcWebClientBase.setCorsOverride_(path, headerObject); } - }); - stream.on('status', function(status) { - if (status.code != StatusCode.OK && !errorEmitted) { - errorEmitted = true; - callback( - { - code: status.code, - message: status.details, - metadata: status.metadata - }, - null); - } else if (useUnaryResponse) { - callback(null, null, status); + var serialized = + methodDescriptor.requestSerializeFn(request.getRequestMessage()); + var payload = this.encodeRequest_(serialized); + if (this.format_ == 'text') { + payload = googCrypt.encodeByteArray(payload); + } else if (this.format_ == 'binary') { + xhr.setResponseType(XhrIo.ResponseType.ARRAY_BUFFER); } - }); + xhr.send(path, 'POST', payload); + return stream; + } - if (useUnaryResponse) { - stream.on('metadata', function(metadata) { - callback(null, null, null, metadata); + /** + * @private + * @static + * @template RESPONSE + * @param {!ClientReadableStream} stream + * @param {function(?Error, ?RESPONSE, ?Status=, ?Metadata=)| + * function(?Error,?RESPONSE)} callback + * @param {boolean} useUnaryResponse + */ + static setCallback_(stream, callback, useUnaryResponse) { + var responseReceived = null; + var errorEmitted = false; + + stream.on('data', function(response) { + responseReceived = response; + }); + + stream.on('error', function(error) { + if (error.code != StatusCode.OK && !errorEmitted) { + errorEmitted = true; + callback(error, null); + } + }); + + stream.on('status', function(status) { + if (status.code != StatusCode.OK && !errorEmitted) { + errorEmitted = true; + callback( + { + code: status.code, + message: status.details, + metadata: status.metadata + }, + null); + } else if (useUnaryResponse) { + callback(null, null, status); + } + }); + + if (useUnaryResponse) { + stream.on('metadata', function(metadata) { + callback(null, null, null, metadata); + }); + } + + stream.on('end', function() { + if (!errorEmitted) { + callback(null, responseReceived); + } + if (useUnaryResponse) { + callback(null, null); // trigger unaryResponse + } }); } - stream.on('end', function() { - if (!errorEmitted) { - callback(null, responseReceived); - } - if (useUnaryResponse) { - callback(null, null); // trigger unaryResponse - } - }); -}; - -/** - * Create a new XhrIo object - * - * @private - * @return {!XhrIo} The created XhrIo object - */ -GrpcWebClientBase.prototype.newXhr_ = function() { - return new XhrIo(); -}; - -/** - * Encode the grpc-web request - * - * @private - * @param {!Uint8Array} serialized The serialized proto payload - * @return {!Uint8Array} The application/grpc-web padded request - */ -GrpcWebClientBase.prototype.encodeRequest_ = function(serialized) { - var len = serialized.length; - var bytesArray = [0, 0, 0, 0]; - var payload = new Uint8Array(5 + len); - for (var i = 3; i >= 0; i--) { - bytesArray[i] = (len % 256); - len = len >>> 8; + /** + * Create a new XhrIo object + * + * @private + * @return {!XhrIo} The created XhrIo object + */ + newXhr_() { + return new XhrIo(); } - payload.set(new Uint8Array(bytesArray), 1); - payload.set(serialized, 5); - return payload; -}; -/** - * @private - * @param {!XhrIo} xhr The xhr object - */ -GrpcWebClientBase.prototype.processHeaders_ = function(xhr) { - if (this.format_ == "text") { - xhr.headers.set('Content-Type', 'application/grpc-web-text'); - xhr.headers.set('Accept', 'application/grpc-web-text'); - } else { - xhr.headers.set('Content-Type', 'application/grpc-web+proto'); - } - xhr.headers.set('X-User-Agent', 'grpc-web-javascript/0.1'); - xhr.headers.set('X-Grpc-Web', '1'); - if (xhr.headers.containsKey('deadline')) { - var deadline = xhr.headers.get('deadline'); // in ms - var currentTime = (new Date()).getTime(); - var timeout = Math.round(deadline - currentTime); - xhr.headers.remove('deadline'); - if (timeout === Infinity) { - // grpc-timeout header defaults to infinity if not set. - timeout = 0; + /** + * Encode the grpc-web request + * + * @private + * @param {!Uint8Array} serialized The serialized proto payload + * @return {!Uint8Array} The application/grpc-web padded request + */ + encodeRequest_(serialized) { + var len = serialized.length; + var bytesArray = [0, 0, 0, 0]; + var payload = new Uint8Array(5 + len); + for (var i = 3; i >= 0; i--) { + bytesArray[i] = (len % 256); + len = len >>> 8; } - if (timeout > 0) { - xhr.headers.set('grpc-timeout', timeout + 'm'); + payload.set(new Uint8Array(bytesArray), 1); + payload.set(serialized, 5); + return payload; + } + + /** + * @private + * @param {!XhrIo} xhr The xhr object + */ + processHeaders_(xhr) { + if (this.format_ == 'text') { + xhr.headers.set('Content-Type', 'application/grpc-web-text'); + xhr.headers.set('Accept', 'application/grpc-web-text'); + } else { + xhr.headers.set('Content-Type', 'application/grpc-web+proto'); + } + xhr.headers.set('X-User-Agent', 'grpc-web-javascript/0.1'); + xhr.headers.set('X-Grpc-Web', '1'); + if (xhr.headers.containsKey('deadline')) { + var deadline = xhr.headers.get('deadline'); // in ms + var currentTime = (new Date()).getTime(); + var timeout = Math.round(deadline - currentTime); + xhr.headers.remove('deadline'); + if (timeout === Infinity) { + // grpc-timeout header defaults to infinity if not set. + timeout = 0; + } + if (timeout > 0) { + xhr.headers.set('grpc-timeout', timeout + 'm'); + } } } -}; -/** - * @private - * @static - * @param {string} method The method to invoke - * @param {!Object} headerObject The xhr headers - * @return {string} The URI object or a string path with headers - */ -GrpcWebClientBase.setCorsOverride_ = function(method, headerObject) { - return /** @type {string} */ (HttpCors.setHttpHeadersWithOverwriteParam( - method, HttpCors.HTTP_HEADERS_PARAM_NAME, headerObject)); -}; + /** + * @private + * @static + * @param {string} method The method to invoke + * @param {!Object} headerObject The xhr headers + * @return {string} The URI object or a string path with headers + */ + static setCorsOverride_(method, headerObject) { + return /** @type {string} */ (HttpCors.setHttpHeadersWithOverwriteParam( + method, HttpCors.HTTP_HEADERS_PARAM_NAME, headerObject)); + } + + /** + * @private + * @static + * @template REQUEST, RESPONSE + * @param {function(!Request): + * (!Promise|!ClientReadableStream)} invoker + * @param {!Array} + * interceptors + * @return {function(!Request): + * (!Promise|!ClientReadableStream)} + */ + static runInterceptors_(invoker, interceptors) { + let curInvoker = invoker; + interceptors.forEach((interceptor) => { + const lastInvoker = curInvoker; + curInvoker = (request) => interceptor.intercept(request, lastInvoker); + }); + return curInvoker; + } +} + + -/** - * @private - * @static - * @template REQUEST, RESPONSE - * @param {function(!Request): - * (!Promise|!ClientReadableStream)} invoker - * @param {!Array} - * interceptors - * @return {function(!Request): - * (!Promise|!ClientReadableStream)} - */ -GrpcWebClientBase.runInterceptors_ = function(invoker, interceptors) { - let curInvoker = invoker; - interceptors.forEach((interceptor) => { - const lastInvoker = curInvoker; - curInvoker = (request) => interceptor.intercept(request, lastInvoker); - }); - return curInvoker; -}; exports = GrpcWebClientBase; diff --git a/javascript/net/grpc/web/grpcwebclientbase_test.js b/javascript/net/grpc/web/grpcwebclientbase_test.js index 177d80a..77c8b14 100644 --- a/javascript/net/grpc/web/grpcwebclientbase_test.js +++ b/javascript/net/grpc/web/grpcwebclientbase_test.js @@ -28,17 +28,17 @@ var testSuite = goog.require('goog.testing.testSuite'); const {StreamInterceptor} = goog.require('grpc.web.Interceptor'); goog.require('goog.testing.jsunit'); -var REQUEST_BYTES = [1,2,3]; -var FAKE_METHOD = "fake-method"; -var PROTO_FIELD_VALUE = "meow"; +var REQUEST_BYTES = [1, 2, 3]; +var FAKE_METHOD = 'fake-method'; +var PROTO_FIELD_VALUE = 'meow'; var EXPECTED_HEADERS; var EXPECTED_HEADER_VALUES; -var EXPECTED_UNARY_HEADERS = ['Content-Type', 'Accept', - 'X-User-Agent', 'X-Grpc-Web']; -var EXPECTED_UNARY_HEADER_VALUES = ['application/grpc-web-text', - 'application/grpc-web-text', - 'grpc-web-javascript/0.1', - '1']; +var EXPECTED_UNARY_HEADERS = + ['Content-Type', 'Accept', 'X-User-Agent', 'X-Grpc-Web']; +var EXPECTED_UNARY_HEADER_VALUES = [ + 'application/grpc-web-text', 'application/grpc-web-text', + 'grpc-web-javascript/0.1', '1' +]; var dataCallback; @@ -62,25 +62,26 @@ testSuite({ client.newXhr_ = function() { return new MockXhr({ // This parses to [ { DATA: [4,5,6] }, { TRAILER: "a: b" } ] - response: googCrypt.encodeByteArray(new Uint8Array([ - 0, 0, 0, 0, 3, 4, 5, 6, 128, 0, 0, 0, 4, 97, 58, 32, 98 - ])), + response: googCrypt.encodeByteArray(new Uint8Array( + [0, 0, 0, 0, 3, 4, 5, 6, 128, 0, 0, 0, 4, 97, 58, 32, 98])), }); }; expectUnaryHeaders(); - client.rpcCall(FAKE_METHOD, {}, {}, { - requestSerializeFn : function(request) { - return REQUEST_BYTES; - }, - responseDeserializeFn : function(bytes) { - assertElementsEquals([4,5,6], [].slice.call(bytes)); - return {"field1": PROTO_FIELD_VALUE}; - } - }, function(error, response) { - assertNull(error); - assertEquals(PROTO_FIELD_VALUE, response['field1']); - }); + client.rpcCall( + FAKE_METHOD, {}, {}, { + requestSerializeFn: function(request) { + return REQUEST_BYTES; + }, + responseDeserializeFn: function(bytes) { + assertElementsEquals([4, 5, 6], [].slice.call(bytes)); + return {'field1': PROTO_FIELD_VALUE}; + } + }, + function(error, response) { + assertNull(error); + assertEquals(PROTO_FIELD_VALUE, response['field1']); + }); dataCallback(); }, @@ -119,23 +120,26 @@ testSuite({ return new MockXhr({ // This decodes to "grpc-status: 3" response: googCrypt.encodeByteArray(new Uint8Array([ - 128, 0, 0, 0, 14, 103, 114, 112, 99, 45, 115, 116, 97, 116, 117, 115, 58, 32, 51 + 128, 0, 0, 0, 14, 103, 114, 112, 99, 45, 115, 116, 97, 116, 117, 115, + 58, 32, 51 ])), }); }; expectUnaryHeaders(); - client.rpcCall(FAKE_METHOD, {}, {}, { - requestSerializeFn : function(request) { - return REQUEST_BYTES; - }, - responseDeserializeFn : function(bytes) { - return {}; - } - }, function(error, response) { - assertNull(response); - assertEquals(3, error.code); - }); + client.rpcCall( + FAKE_METHOD, {}, {}, { + requestSerializeFn: function(request) { + return REQUEST_BYTES; + }, + responseDeserializeFn: function(bytes) { + return {}; + } + }, + function(error, response) { + assertNull(response); + assertEquals(3, error.code); + }); dataCallback(); }, @@ -144,28 +148,29 @@ testSuite({ client.newXhr_ = function() { return new MockXhr({ // This parses to [ { DATA: [4,5,6] }, { TRAILER: "a: b" } ] - response: googCrypt.encodeByteArray(new Uint8Array([ - 0, 0, 0, 0, 3, 4, 5, 6, 128, 0, 0, 0, 4, 97, 58, 32, 98 - ])), + response: googCrypt.encodeByteArray(new Uint8Array( + [0, 0, 0, 0, 3, 4, 5, 6, 128, 0, 0, 0, 4, 97, 58, 32, 98])), }); }; expectUnaryHeaders(); - var call = client.rpcCall(FAKE_METHOD, {}, {}, { - requestSerializeFn : function(request) { - return REQUEST_BYTES; - }, - responseDeserializeFn : function(bytes) { - assertElementsEquals([4,5,6], [].slice.call(bytes)); - return {"field1": PROTO_FIELD_VALUE}; - } - }, function(error, response) { - assertNull(error); - assertEquals(PROTO_FIELD_VALUE, response['field1']); - }); + var call = client.rpcCall( + FAKE_METHOD, {}, {}, { + requestSerializeFn: function(request) { + return REQUEST_BYTES; + }, + responseDeserializeFn: function(bytes) { + assertElementsEquals([4, 5, 6], [].slice.call(bytes)); + return {'field1': PROTO_FIELD_VALUE}; + } + }, + function(error, response) { + assertNull(error); + assertEquals(PROTO_FIELD_VALUE, response['field1']); + }); call.on('metadata', (metadata) => { - assertEquals(metadata['sample-initial-metadata-1'], - 'sample-initial-metadata-val'); + assertEquals( + metadata['sample-initial-metadata-1'], 'sample-initial-metadata-val'); }); dataCallback(); } @@ -180,133 +185,132 @@ function expectUnaryHeaders() { } -/** - * @constructor - * @param {?Object} mockValues - * Mock XhrIO object to test the outgoing values - */ -function MockXhr(mockValues) { - this.mockValues = mockValues; - this.headers = new Map(); +/** @unrestricted */ +class MockXhr { + /** + * @param {?Object} mockValues + * Mock XhrIO object to test the outgoing values + */ + constructor(mockValues) { + this.mockValues = mockValues; + this.headers = new Map(); + } + + /** + * @param {string} url + * @param {string=} opt_method + * @param {string=} opt_content + * @param {string=} opt_headers + */ + send(url, opt_method, opt_content, opt_headers) { + assertEquals(FAKE_METHOD, url); + assertEquals('POST', opt_method); + assertElementsEquals( + googCrypt.encodeByteArray(new Uint8Array([0, 0, 0, 0, 3, 1, 2, 3])), + opt_content); + assertElementsEquals(EXPECTED_HEADERS, this.headers.getKeys()); + assertElementsEquals(EXPECTED_HEADER_VALUES, this.headers.getValues()); + } + + /** + * @param {boolean} withCredentials + */ + setWithCredentials(withCredentials) { + return; + } + + /** + * @return {string} response + */ + getResponseText() { + return this.mockValues.response; + } + + /** + * @param {string} key header key + * @return {string} content-type + */ + getStreamingResponseHeader(key) { + return 'application/grpc-web-text'; + } + + /** + * @return {string} response + */ + getResponseHeaders() { + return {'sample-initial-metadata-1': 'sample-initial-metadata-val'}; + } + + /** + * @return {number} xhr state + */ + getReadyState() { + return 0; + } + + /** + * @return {number} lastErrorCode + */ + getLastErrorCode() { + return 0; + } + + /** + * @return {string} lastError + */ + getLastError() { + return 'server not responding'; + } + + /** + * @param {string} responseType + */ + setResponseType(responseType) { + return; + } } -/** - * @param {string} url - * @param {string=} opt_method - * @param {string=} opt_content - * @param {string=} opt_headers - */ -MockXhr.prototype.send = function(url, opt_method, opt_content, opt_headers) { - assertEquals(FAKE_METHOD, url); - assertEquals("POST", opt_method); - assertElementsEquals(googCrypt.encodeByteArray(new Uint8Array([0, 0, 0, 0, 3, 1, 2, 3])), opt_content); - assertElementsEquals(EXPECTED_HEADERS, this.headers.getKeys()); - assertElementsEquals(EXPECTED_HEADER_VALUES, this.headers.getValues()); -}; - /** - * @param {boolean} withCredentials - */ -MockXhr.prototype.setWithCredentials = function(withCredentials) { - return; -}; - - -/** - * @return {string} response - */ -MockXhr.prototype.getResponseText = function() { - return this.mockValues.response; -}; - - -/** - * @param {string} key header key - * @return {string} content-type - */ -MockXhr.prototype.getStreamingResponseHeader = function(key) { - return 'application/grpc-web-text'; -}; - - -/** - * @return {string} response - */ -MockXhr.prototype.getResponseHeaders = function() { - return {'sample-initial-metadata-1': 'sample-initial-metadata-val'}; -}; - - -/** - * @return {number} xhr state - */ -MockXhr.prototype.getReadyState = function() { - return 0; -}; - - -/** - * @return {number} lastErrorCode - */ -MockXhr.prototype.getLastErrorCode = function() { - return 0; -}; - - -/** - * @return {string} lastError - */ -MockXhr.prototype.getLastError = function() { - return 'server not responding'; -}; - - -/** - * @param {string} responseType - */ -MockXhr.prototype.setResponseType = function(responseType) { - return; -}; - -/** - * @constructor * @implements {StreamInterceptor} + * @unrestricted */ -const StreamResponseInterceptor = function() {}; - -/** @override */ -StreamResponseInterceptor.prototype.intercept = function(request, invoker) { - /** - * @implements {ClientReadableStream} - * @constructor - * @param {!ClientReadableStream} stream - * @template RESPONSE - */ - const InterceptedStream = function(stream) { - this.stream = stream; - }; +class StreamResponseInterceptor { + constructor() {} /** @override */ - InterceptedStream.prototype.on = function(eventType, callback) { - if (eventType == 'data') { - const newCallback = (response) => { - response['field2'] = 'field2'; - callback(response); - }; - this.stream.on(eventType, newCallback); - } else { - this.stream.on(eventType, callback); - } - return this; - }; + intercept(request, invoker) { + /** + * @implements {ClientReadableStream} + * @constructor + * @param {!ClientReadableStream} stream + * @template RESPONSE + */ + const InterceptedStream = function(stream) { + this.stream = stream; + }; - /** @override */ - InterceptedStream.prototype.cancel = function() { - this.stream.cancel(); - return this; - }; + /** @override */ + InterceptedStream.prototype.on = function(eventType, callback) { + if (eventType == 'data') { + const newCallback = (response) => { + response['field2'] = 'field2'; + callback(response); + }; + this.stream.on(eventType, newCallback); + } else { + this.stream.on(eventType, callback); + } + return this; + }; - return new InterceptedStream(invoker(request)); -}; + /** @override */ + InterceptedStream.prototype.cancel = function() { + this.stream.cancel(); + return this; + }; + + return new InterceptedStream(invoker(request)); + } +} diff --git a/javascript/net/grpc/web/grpcwebclientreadablestream.js b/javascript/net/grpc/web/grpcwebclientreadablestream.js index ed555f5..2807706 100644 --- a/javascript/net/grpc/web/grpcwebclientreadablestream.js +++ b/javascript/net/grpc/web/grpcwebclientreadablestream.js @@ -45,381 +45,364 @@ const {Status} = goog.require('grpc.web.Status'); -const GRPC_STATUS = "grpc-status"; -const GRPC_STATUS_MESSAGE = "grpc-message"; +const GRPC_STATUS = 'grpc-status'; +const GRPC_STATUS_MESSAGE = 'grpc-message'; /** @type {!Array} */ -const EXCLUDED_RESPONSE_HEADERS = [ - 'content-type', - GRPC_STATUS, - GRPC_STATUS_MESSAGE -]; +const EXCLUDED_RESPONSE_HEADERS = + ['content-type', GRPC_STATUS, GRPC_STATUS_MESSAGE]; /** * A stream that the client can read from. Used for calls that are streaming * from the server side. - * * @template RESPONSE - * @constructor * @implements {ClientReadableStream} * @final - * @param {!GenericTransportInterface} genericTransportInterface The - * GenericTransportInterface + * @unrestricted */ -const GrpcWebClientReadableStream = function(genericTransportInterface) { +class GrpcWebClientReadableStream { /** - * @const - * @private - * @type {?XhrIo} The XhrIo object + * @param {!GenericTransportInterface} genericTransportInterface The + * GenericTransportInterface */ - this.xhr_ = /** @type {?XhrIo} */ (genericTransportInterface.xhr); + constructor(genericTransportInterface) { + /** + * @const + * @private + * @type {?XhrIo} The XhrIo object + */ + this.xhr_ = /** @type {?XhrIo} */ (genericTransportInterface.xhr); - /** - * @private - * @type {function(?):!RESPONSE|null} The deserialize function for the proto - */ - this.responseDeserializeFn_ = null; + /** + * @private + * @type {function(?):!RESPONSE|null} The deserialize function for the proto + */ + this.responseDeserializeFn_ = null; - /** - * @const - * @private - * @type {!Array} The list of data callbacks - */ - this.onDataCallbacks_ = []; + /** + * @const + * @private + * @type {!Array} The list of data callbacks + */ + this.onDataCallbacks_ = []; - /** - * @const - * @private - * @type {!Array} The list of status callbacks - */ - this.onStatusCallbacks_ = []; + /** + * @const + * @private + * @type {!Array} The list of status callbacks + */ + this.onStatusCallbacks_ = []; - /** - * @const - * @private - * @type {!Array} The list of metadata callbacks - */ - this.onMetadataCallbacks_ = []; + /** + * @const + * @private + * @type {!Array} The list of metadata callbacks + */ + this.onMetadataCallbacks_ = []; - /** - * @const - * @private - * @type {!Array} The list of error callbacks - */ - this.onErrorCallbacks_ = []; + /** + * @const + * @private + * @type {!Array} The list of error callbacks + */ + this.onErrorCallbacks_ = []; - /** - * @const - * @private - * @type {!Array} The list of stream end callbacks - */ - this.onEndCallbacks_ = []; + /** + * @const + * @private + * @type {!Array} The list of stream end callbacks + */ + this.onEndCallbacks_ = []; - /** - * @private - * @type {boolean} Whether the stream has been aborted - */ - this.aborted_ = false; + /** + * @private + * @type {boolean} Whether the stream has been aborted + */ + this.aborted_ = false; - /** - * @private - * @type {number} The stream parser position - */ - this.pos_ = 0; + /** + * @private + * @type {number} The stream parser position + */ + this.pos_ = 0; - /** - * @private - * @type {!GrpcWebStreamParser} The grpc-web stream parser - * @const - */ - this.parser_ = new GrpcWebStreamParser(); + /** + * @private + * @type {!GrpcWebStreamParser} The grpc-web stream parser + * @const + */ + this.parser_ = new GrpcWebStreamParser(); - var self = this; - events.listen(this.xhr_, EventType.READY_STATE_CHANGE, - function(e) { - var contentType = self.xhr_.getStreamingResponseHeader('Content-Type'); - if (!contentType) return; - contentType = contentType.toLowerCase(); + var self = this; + events.listen(this.xhr_, EventType.READY_STATE_CHANGE, function(e) { + var contentType = self.xhr_.getStreamingResponseHeader('Content-Type'); + if (!contentType) return; + contentType = contentType.toLowerCase(); - if (googString.startsWith(contentType, 'application/grpc-web-text')) { - var responseText = self.xhr_.getResponseText(); - var newPos = responseText.length - responseText.length % 4; - var newData = responseText.substr(self.pos_, newPos - self.pos_); - if (newData.length == 0) return; - self.pos_ = newPos; - var byteSource = googCrypt.decodeStringToUint8Array(newData); - } else if (googString.startsWith(contentType, 'application/grpc')) { - var byteSource = new Uint8Array( - /** @type {!ArrayBuffer} */ (self.xhr_.getResponse())); - } else { - return; - } - var messages = self.parser_.parse(byteSource); - if (messages) { - var FrameType = GrpcWebStreamParser.FrameType; - for (var i = 0; i < messages.length; i++) { - if (FrameType.DATA in messages[i]) { - var data = messages[i][FrameType.DATA]; - if (data) { - var response = self.responseDeserializeFn_(data); - if (response) { - self.sendDataCallbacks_(response); - } - } - } - if (FrameType.TRAILER in messages[i]) { - if (messages[i][FrameType.TRAILER].length > 0) { - var trailerString = ''; - for (var pos = 0; pos < messages[i][FrameType.TRAILER].length; - pos++) { - trailerString += String.fromCharCode( - messages[i][FrameType.TRAILER][pos]); - } - var trailers = self.parseHttp1Headers_(trailerString); - var grpcStatusCode = StatusCode.OK; - var grpcStatusMessage = ''; - if (GRPC_STATUS in trailers) { - grpcStatusCode = trailers[GRPC_STATUS]; - delete trailers[GRPC_STATUS]; - } - if (GRPC_STATUS_MESSAGE in trailers) { - grpcStatusMessage = trailers[GRPC_STATUS_MESSAGE]; - delete trailers[GRPC_STATUS_MESSAGE]; - } - self.sendStatusCallbacks_(/** @type {!Status} */({ - code: Number(grpcStatusCode), - details: grpcStatusMessage, - metadata: trailers, - })); - } - } - } - } - }); - - events.listen(this.xhr_, EventType.COMPLETE, function(e) { - var lastErrorCode = self.xhr_.getLastErrorCode(); - var grpcStatusCode; - var grpcStatusMessage = ''; - var initialMetadata = /** @type {!Metadata} */ ({}); - - var responseHeaders = self.xhr_.getResponseHeaders(); - Object.keys(responseHeaders).forEach((header_) => { - if (!(EXCLUDED_RESPONSE_HEADERS.includes(header_))) { - initialMetadata[header_] = responseHeaders[header_]; - } - }); - self.sendMetadataCallbacks_(initialMetadata); - - // There's an XHR level error - if (lastErrorCode != ErrorCode.NO_ERROR) { - switch (lastErrorCode) { - case ErrorCode.ABORT: - grpcStatusCode = StatusCode.ABORTED; - break; - case ErrorCode.TIMEOUT: - grpcStatusCode = StatusCode.DEADLINE_EXCEEDED; - break; - case ErrorCode.HTTP_ERROR: - grpcStatusCode = StatusCode.fromHttpStatus(self.xhr_.getStatus()); - break; - default: - grpcStatusCode = StatusCode.UNAVAILABLE; - } - if (grpcStatusCode == StatusCode.ABORTED && self.aborted_) { + if (googString.startsWith(contentType, 'application/grpc-web-text')) { + var responseText = self.xhr_.getResponseText(); + var newPos = responseText.length - responseText.length % 4; + var newData = responseText.substr(self.pos_, newPos - self.pos_); + if (newData.length == 0) return; + self.pos_ = newPos; + var byteSource = googCrypt.decodeStringToUint8Array(newData); + } else if (googString.startsWith(contentType, 'application/grpc')) { + var byteSource = new Uint8Array( + /** @type {!ArrayBuffer} */ (self.xhr_.getResponse())); + } else { return; } - self.sendErrorCallbacks_({ - code: grpcStatusCode, - message: ErrorCode.getDebugMessage(lastErrorCode) + var messages = self.parser_.parse(byteSource); + if (messages) { + var FrameType = GrpcWebStreamParser.FrameType; + for (var i = 0; i < messages.length; i++) { + if (FrameType.DATA in messages[i]) { + var data = messages[i][FrameType.DATA]; + if (data) { + var response = self.responseDeserializeFn_(data); + if (response) { + self.sendDataCallbacks_(response); + } + } + } + if (FrameType.TRAILER in messages[i]) { + if (messages[i][FrameType.TRAILER].length > 0) { + var trailerString = ''; + for (var pos = 0; pos < messages[i][FrameType.TRAILER].length; + pos++) { + trailerString += + String.fromCharCode(messages[i][FrameType.TRAILER][pos]); + } + var trailers = self.parseHttp1Headers_(trailerString); + var grpcStatusCode = StatusCode.OK; + var grpcStatusMessage = ''; + if (GRPC_STATUS in trailers) { + grpcStatusCode = trailers[GRPC_STATUS]; + delete trailers[GRPC_STATUS]; + } + if (GRPC_STATUS_MESSAGE in trailers) { + grpcStatusMessage = trailers[GRPC_STATUS_MESSAGE]; + delete trailers[GRPC_STATUS_MESSAGE]; + } + self.sendStatusCallbacks_(/** @type {!Status} */ ({ + code: Number(grpcStatusCode), + details: grpcStatusMessage, + metadata: trailers, + })); + } + } + } + } + }); + + events.listen(this.xhr_, EventType.COMPLETE, function(e) { + var lastErrorCode = self.xhr_.getLastErrorCode(); + var grpcStatusCode; + var grpcStatusMessage = ''; + var initialMetadata = /** @type {!Metadata} */ ({}); + + var responseHeaders = self.xhr_.getResponseHeaders(); + Object.keys(responseHeaders).forEach((header_) => { + if (!(EXCLUDED_RESPONSE_HEADERS.includes(header_))) { + initialMetadata[header_] = responseHeaders[header_]; + } }); - return; - } + self.sendMetadataCallbacks_(initialMetadata); - var errorEmitted = false; - - // Check whethere there are grpc specific response headers - if (GRPC_STATUS in responseHeaders) { - grpcStatusCode = self.xhr_.getResponseHeader(GRPC_STATUS); - if (GRPC_STATUS_MESSAGE in responseHeaders) { - grpcStatusMessage = self.xhr_.getResponseHeader(GRPC_STATUS_MESSAGE); - } - if (Number(grpcStatusCode) != StatusCode.OK) { + // There's an XHR level error + if (lastErrorCode != ErrorCode.NO_ERROR) { + switch (lastErrorCode) { + case ErrorCode.ABORT: + grpcStatusCode = StatusCode.ABORTED; + break; + case ErrorCode.TIMEOUT: + grpcStatusCode = StatusCode.DEADLINE_EXCEEDED; + break; + case ErrorCode.HTTP_ERROR: + grpcStatusCode = StatusCode.fromHttpStatus(self.xhr_.getStatus()); + break; + default: + grpcStatusCode = StatusCode.UNAVAILABLE; + } + if (grpcStatusCode == StatusCode.ABORTED && self.aborted_) { + return; + } self.sendErrorCallbacks_({ - code: Number(grpcStatusCode), - message: grpcStatusMessage, - metadata: responseHeaders + code: grpcStatusCode, + message: ErrorCode.getDebugMessage(lastErrorCode) }); - errorEmitted = true; + return; } + + var errorEmitted = false; + + // Check whethere there are grpc specific response headers + if (GRPC_STATUS in responseHeaders) { + grpcStatusCode = self.xhr_.getResponseHeader(GRPC_STATUS); + if (GRPC_STATUS_MESSAGE in responseHeaders) { + grpcStatusMessage = self.xhr_.getResponseHeader(GRPC_STATUS_MESSAGE); + } + if (Number(grpcStatusCode) != StatusCode.OK) { + self.sendErrorCallbacks_({ + code: Number(grpcStatusCode), + message: grpcStatusMessage, + metadata: responseHeaders + }); + errorEmitted = true; + } + if (!errorEmitted) { + self.sendStatusCallbacks_(/** @type {!Status} */ ({ + code: Number(grpcStatusCode), + details: grpcStatusMessage, + metadata: responseHeaders + })); + } + } + if (!errorEmitted) { - self.sendStatusCallbacks_(/** @type {!Status} */ ({ - code: Number(grpcStatusCode), - details: grpcStatusMessage, - metadata: responseHeaders - })); + self.sendEndCallbacks_(); } + }); + } + + /** + * @override + * @export + */ + on(eventType, callback) { + // TODO(stanleycheung): change eventType to @enum type + if (eventType == 'data') { + this.onDataCallbacks_.push(callback); + } else if (eventType == 'status') { + this.onStatusCallbacks_.push(callback); + } else if (eventType == 'metadata') { + this.onMetadataCallbacks_.push(callback); + } else if (eventType == 'end') { + this.onEndCallbacks_.push(callback); + } else if (eventType == 'error') { + this.onErrorCallbacks_.push(callback); } + return this; + } - if (!errorEmitted) { - self.sendEndCallbacks_(); + /** + * @private + * @param {!Array} callbacks the internal list of callbacks + * @param {function(?)} callback the callback to remove + */ + removeListenerFromCallbacks_(callbacks, callback) { + const index = callbacks.indexOf(callback); + if (index > -1) { + callbacks.splice(index, 1); } - }); -}; - - -/** - * @override - * @export - */ -GrpcWebClientReadableStream.prototype.on = function( - eventType, callback) { - // TODO(stanleycheung): change eventType to @enum type - if (eventType == 'data') { - this.onDataCallbacks_.push(callback); - } else if (eventType == 'status') { - this.onStatusCallbacks_.push(callback); - } else if (eventType == 'metadata') { - this.onMetadataCallbacks_.push(callback); - } else if (eventType == 'end') { - this.onEndCallbacks_.push(callback); - } else if (eventType == 'error') { - this.onErrorCallbacks_.push(callback); } - return this; -}; - -/** - * @private - * @param {!Array} callbacks the internal list of callbacks - * @param {function(?)} callback the callback to remove - */ -GrpcWebClientReadableStream.prototype.removeListenerFromCallbacks_ = function( - callbacks, callback) { - const index = callbacks.indexOf(callback); - if (index > -1) { - callbacks.splice(index, 1); + /** + * @export + * @override + */ + removeListener(eventType, callback) { + if (eventType == 'data') { + this.removeListenerFromCallbacks_(this.onDataCallbacks_, callback); + } else if (eventType == 'status') { + this.removeListenerFromCallbacks_(this.onStatusCallbacks_, callback); + } else if (eventType == 'metadata') { + this.removeListenerFromCallbacks_(this.onMetadataCallbacks_, callback); + } else if (eventType == 'end') { + this.removeListenerFromCallbacks_(this.onEndCallbacks_, callback); + } else if (eventType == 'error') { + this.removeListenerFromCallbacks_(this.onErrorCallbacks_, callback); + } + return this; } -}; - -/** - * @export - * @override - */ -GrpcWebClientReadableStream.prototype.removeListener = function( - eventType, callback) { - if (eventType == 'data') { - this.removeListenerFromCallbacks_(this.onDataCallbacks_, callback); - } else if (eventType == 'status') { - this.removeListenerFromCallbacks_(this.onStatusCallbacks_, callback); - } else if (eventType == 'metadata') { - this.removeListenerFromCallbacks_(this.onMetadataCallbacks_, callback); - } else if (eventType == 'end') { - this.removeListenerFromCallbacks_(this.onEndCallbacks_, callback); - } else if (eventType == 'error') { - this.removeListenerFromCallbacks_(this.onErrorCallbacks_, callback); + /** + * Register a callbackl to parse the response + * + * @param {function(?):!RESPONSE} responseDeserializeFn The deserialize + * function for the proto + */ + setResponseDeserializeFn(responseDeserializeFn) { + this.responseDeserializeFn_ = responseDeserializeFn; } - return this; -}; - -/** - * Register a callbackl to parse the response - * - * @param {function(?):!RESPONSE} responseDeserializeFn The deserialize - * function for the proto - */ -GrpcWebClientReadableStream.prototype.setResponseDeserializeFn = - function(responseDeserializeFn) { - this.responseDeserializeFn_ = responseDeserializeFn; -}; - - -/** - * @override - * @export - */ -GrpcWebClientReadableStream.prototype.cancel = function() { - this.aborted_ = true; - this.xhr_.abort(); -}; - - -/** - * Parse HTTP headers - * - * @private - * @param {string} str The raw http header string - * @return {!Object} The header:value pairs - */ -GrpcWebClientReadableStream.prototype.parseHttp1Headers_ = - function(str) { - var chunks = str.trim().split("\r\n"); - var headers = {}; - for (var i = 0; i < chunks.length; i++) { - var pos = chunks[i].indexOf(":"); - headers[chunks[i].substring(0, pos).trim()] = - chunks[i].substring(pos+1).trim(); + /** + * @override + * @export + */ + cancel() { + this.aborted_ = true; + this.xhr_.abort(); } - return headers; -}; - -/** - * @private - * @param {!RESPONSE} data The data to send back - */ -GrpcWebClientReadableStream.prototype.sendDataCallbacks_ = function(data) { - for (var i = 0; i < this.onDataCallbacks_.length; i++) { - this.onDataCallbacks_[i](data); + /** + * Parse HTTP headers + * + * @private + * @param {string} str The raw http header string + * @return {!Object} The header:value pairs + */ + parseHttp1Headers_(str) { + var chunks = str.trim().split('\r\n'); + var headers = {}; + for (var i = 0; i < chunks.length; i++) { + var pos = chunks[i].indexOf(':'); + headers[chunks[i].substring(0, pos).trim()] = + chunks[i].substring(pos + 1).trim(); + } + return headers; } -}; - -/** - * @private - * @param {!Status} status The status to send back - */ -GrpcWebClientReadableStream.prototype.sendStatusCallbacks_ = function(status) { - for (var i = 0; i < this.onStatusCallbacks_.length; i++) { - this.onStatusCallbacks_[i](status); + /** + * @private + * @param {!RESPONSE} data The data to send back + */ + sendDataCallbacks_(data) { + for (var i = 0; i < this.onDataCallbacks_.length; i++) { + this.onDataCallbacks_[i](data); + } } -}; - -/** - * @private - * @param {!Metadata} metadata The metadata to send back - */ -GrpcWebClientReadableStream.prototype.sendMetadataCallbacks_ = - function(metadata) { - for (var i = 0; i < this.onMetadataCallbacks_.length; i++) { - this.onMetadataCallbacks_[i](metadata); + /** + * @private + * @param {!Status} status The status to send back + */ + sendStatusCallbacks_(status) { + for (var i = 0; i < this.onStatusCallbacks_.length; i++) { + this.onStatusCallbacks_[i](status); + } } -}; - -/** - * @private - * @param {?} error The error to send back - */ -GrpcWebClientReadableStream.prototype.sendErrorCallbacks_ = function(error) { - for (var i = 0; i < this.onErrorCallbacks_.length; i++) { - this.onErrorCallbacks_[i](error); + /** + * @private + * @param {!Metadata} metadata The metadata to send back + */ + sendMetadataCallbacks_(metadata) { + for (var i = 0; i < this.onMetadataCallbacks_.length; i++) { + this.onMetadataCallbacks_[i](metadata); + } } -}; - -/** - * @private - */ -GrpcWebClientReadableStream.prototype.sendEndCallbacks_ = function() { - for (var i = 0; i < this.onEndCallbacks_.length; i++) { - this.onEndCallbacks_[i](); + /** + * @private + * @param {?} error The error to send back + */ + sendErrorCallbacks_(error) { + for (var i = 0; i < this.onErrorCallbacks_.length; i++) { + this.onErrorCallbacks_[i](error); + } } -}; + + /** + * @private + */ + sendEndCallbacks_() { + for (var i = 0; i < this.onEndCallbacks_.length; i++) { + this.onEndCallbacks_[i](); + } + } +} + exports = GrpcWebClientReadableStream; diff --git a/javascript/net/grpc/web/grpcwebstreamparser.js b/javascript/net/grpc/web/grpcwebstreamparser.js index 8ed2180..76c4c5c 100644 --- a/javascript/net/grpc/web/grpcwebstreamparser.js +++ b/javascript/net/grpc/web/grpcwebstreamparser.js @@ -49,68 +49,199 @@ const asserts = goog.require('goog.asserts'); /** * The default grpc-web stream parser. - * - * @constructor - * @struct * @implements {StreamParser} * @final */ -const GrpcWebStreamParser = function() { - /** - * The current error message, if any. - * @private {?string} - */ - this.errorMessage_ = null; +class GrpcWebStreamParser { + constructor() { + /** + * The current error message, if any. + * @private {?string} + */ + this.errorMessage_ = null; + + /** + * The currently buffered result (parsed messages). + * @private {!Array} + */ + this.result_ = []; + + /** + * The current position in the streamed data. + * @private {number} + */ + this.streamPos_ = 0; + + /** + * The current parser state. + * @private {number} + */ + this.state_ = Parser.State_.INIT; + + /** + * The current frame byte being parsed + * @private {number} + */ + this.frame_ = 0; + + /** + * The length of the proto message being parsed. + * @private {number} + */ + this.length_ = 0; + + /** + * Count of processed length bytes. + * @private {number} + */ + this.countLengthBytes_ = 0; + + /** + * Raw bytes of the current message. Uses Uint8Array by default. Falls back + * to native array when Uint8Array is unsupported. + * @private {?Uint8Array|?Array} + */ + this.messageBuffer_ = null; + + /** + * Count of processed message bytes. + * @private {number} + */ + this.countMessageBytes_ = 0; + } /** - * The currently buffered result (parsed messages). - * @private {!Array} + * @override */ - this.result_ = []; + isInputValid() { + return this.state_ != Parser.State_.INVALID; + } /** - * The current position in the streamed data. - * @private {number} + * @override */ - this.streamPos_ = 0; + getErrorMessage() { + return this.errorMessage_; + } /** - * The current parser state. - * @private {number} + * Parse the new input. + * + * Note that there is no Parser state to indicate the end of a stream. + * + * @param {string|!ArrayBuffer|!Uint8Array|!Array} input The input + * data + * @throws {!Error} Throws an error message if the input is invalid. + * @return {?Array} any parsed objects (atomic messages) + * in an array, or null if more data needs be read to parse any new object. + * @override */ - this.state_ = Parser.State_.INIT; + parse(input) { + asserts.assert( + input instanceof Array || input instanceof ArrayBuffer || + input instanceof Uint8Array); - /** - * The current frame byte being parsed - * @private {number} - */ - this.frame_ = 0; + var parser = this; + var inputBytes; + var pos = 0; - /** - * The length of the proto message being parsed. - * @private {number} - */ - this.length_ = 0; + if (input instanceof Uint8Array || input instanceof Array) { + inputBytes = input; + } else { + inputBytes = new Uint8Array(input); + } - /** - * Count of processed length bytes. - * @private {number} - */ - this.countLengthBytes_ = 0; + while (pos < inputBytes.length) { + switch (parser.state_) { + case Parser.State_.INVALID: { + parser.error_(inputBytes, pos, 'stream already broken'); + break; + } + case Parser.State_.INIT: { + processFrameByte(inputBytes[pos]); + break; + } + case Parser.State_.LENGTH: { + processLengthByte(inputBytes[pos]); + break; + } + case Parser.State_.MESSAGE: { + processMessageByte(inputBytes[pos]); + break; + } + default: { + throw new Error('unexpected parser state: ' + parser.state_); + } + } - /** - * Raw bytes of the current message. Uses Uint8Array by default. Falls back to - * native array when Uint8Array is unsupported. - * @private {?Uint8Array|?Array} - */ - this.messageBuffer_ = null; + parser.streamPos_++; + pos++; + } - /** - * Count of processed message bytes. - * @private {number} - */ - this.countMessageBytes_ = 0; -}; + var msgs = parser.result_; + parser.result_ = []; + return msgs.length > 0 ? msgs : null; + + /** + * @param {number} b A frame byte to process + */ + function processFrameByte(b) { + if (b == FrameType.DATA) { + parser.frame_ = b; + } else if (b == FrameType.TRAILER) { + parser.frame_ = b; + } else { + parser.error_(inputBytes, pos, 'invalid frame byte'); + } + + parser.state_ = Parser.State_.LENGTH; + parser.length_ = 0; + parser.countLengthBytes_ = 0; + } + + /** + * @param {number} b A length byte to process + */ + function processLengthByte(b) { + parser.countLengthBytes_++; + parser.length_ = (parser.length_ << 8) + b; + + if (parser.countLengthBytes_ == 4) { // no more length byte + parser.state_ = Parser.State_.MESSAGE; + parser.countMessageBytes_ = 0; + if (typeof Uint8Array !== 'undefined') { + parser.messageBuffer_ = new Uint8Array(parser.length_); + } else { + parser.messageBuffer_ = new Array(parser.length_); + } + + if (parser.length_ == 0) { // empty message + finishMessage(); + } + } + } + + /** + * @param {number} b A message byte to process + */ + function processMessageByte(b) { + parser.messageBuffer_[parser.countMessageBytes_++] = b; + if (parser.countMessageBytes_ == parser.length_) { + finishMessage(); + } + } + + /** + * Finishes up building the current message and resets parser state + */ + function finishMessage() { + var message = {}; + message[parser.frame_] = parser.messageBuffer_; + parser.result_.push(message); + parser.state_ = Parser.State_.INIT; + } + } +} const Parser = GrpcWebStreamParser; @@ -133,29 +264,14 @@ Parser.State_ = { * @enum {number} */ GrpcWebStreamParser.FrameType = { - DATA: 0x00, // expecting a data frame - TRAILER: 0x80, // expecting a trailer frame + DATA: 0x00, // expecting a data frame + TRAILER: 0x80, // expecting a trailer frame }; var FrameType = GrpcWebStreamParser.FrameType; -/** - * @override - */ -GrpcWebStreamParser.prototype.isInputValid = function() { - return this.state_ != Parser.State_.INVALID; -}; - - -/** - * @override - */ -GrpcWebStreamParser.prototype.getErrorMessage = function() { - return this.errorMessage_; -}; - /** * @param {!Uint8Array|!Array} inputBytes The current input buffer @@ -174,119 +290,5 @@ Parser.prototype.error_ = function(inputBytes, pos, errorMsg) { }; -/** - * Parse the new input. - * - * Note that there is no Parser state to indicate the end of a stream. - * - * @param {string|!ArrayBuffer|!Uint8Array|!Array} input The input data - * @throws {!Error} Throws an error message if the input is invalid. - * @return {?Array} any parsed objects (atomic messages) - * in an array, or null if more data needs be read to parse any new object. - * @override - */ -GrpcWebStreamParser.prototype.parse = function(input) { - asserts.assert(input instanceof Array || input instanceof ArrayBuffer || input instanceof Uint8Array); - - var parser = this; - var inputBytes; - var pos = 0; - - if (input instanceof Uint8Array || input instanceof Array) { - inputBytes = input; - } else { - inputBytes = new Uint8Array(input); - } - - while (pos < inputBytes.length) { - switch (parser.state_) { - case Parser.State_.INVALID: { - parser.error_(inputBytes, pos, 'stream already broken'); - break; - } - case Parser.State_.INIT: { - processFrameByte(inputBytes[pos]); - break; - } - case Parser.State_.LENGTH: { - processLengthByte(inputBytes[pos]); - break; - } - case Parser.State_.MESSAGE: { - processMessageByte(inputBytes[pos]); - break; - } - default: { throw new Error('unexpected parser state: ' + parser.state_); } - } - - parser.streamPos_++; - pos++; - } - - var msgs = parser.result_; - parser.result_ = []; - return msgs.length > 0 ? msgs : null; - - /** - * @param {number} b A frame byte to process - */ - function processFrameByte(b) { - if (b == FrameType.DATA) { - parser.frame_ = b; - } else if (b == FrameType.TRAILER) { - parser.frame_ = b; - } else { - parser.error_(inputBytes, pos, 'invalid frame byte'); - } - - parser.state_ = Parser.State_.LENGTH; - parser.length_ = 0; - parser.countLengthBytes_ = 0; - } - - /** - * @param {number} b A length byte to process - */ - function processLengthByte(b) { - parser.countLengthBytes_++; - parser.length_ = (parser.length_ << 8) + b; - - if (parser.countLengthBytes_ == 4) { // no more length byte - parser.state_ = Parser.State_.MESSAGE; - parser.countMessageBytes_ = 0; - if (typeof Uint8Array !== 'undefined') { - parser.messageBuffer_ = new Uint8Array(parser.length_); - } else { - parser.messageBuffer_ = new Array(parser.length_); - } - - if (parser.length_ == 0) { // empty message - finishMessage(); - } - } - } - - /** - * @param {number} b A message byte to process - */ - function processMessageByte(b) { - parser.messageBuffer_[parser.countMessageBytes_++] = b; - if (parser.countMessageBytes_ == parser.length_) { - finishMessage(); - } - } - - /** - * Finishes up building the current message and resets parser state - */ - function finishMessage() { - var message = {}; - message[parser.frame_] = parser.messageBuffer_; - parser.result_.push(message); - parser.state_ = Parser.State_.INIT; - } -}; - - exports = GrpcWebStreamParser; diff --git a/javascript/net/grpc/web/methoddescriptor.js b/javascript/net/grpc/web/methoddescriptor.js index bfdd318..f16e069 100644 --- a/javascript/net/grpc/web/methoddescriptor.js +++ b/javascript/net/grpc/web/methoddescriptor.js @@ -13,44 +13,46 @@ const MethodType = goog.require('grpc.web.MethodType'); const Request = goog.require('grpc.web.Request'); const RequestInternal = goog.require('grpc.web.RequestInternal'); -/** - * @constructor - * @struct - * @template REQUEST, RESPONSE - * @param {string} name - * @param {!MethodType} methodType - * @param {function(new: REQUEST, ...)} requestType - * @param {function(new: RESPONSE, ...)} responseType - * @param {function(REQUEST): ?} requestSerializeFn - * @param {function(?): RESPONSE} responseDeserializeFn - */ -const MethodDescriptor = function( - name, methodType, requestType, responseType, requestSerializeFn, - responseDeserializeFn) { - /** @const */ - this.name = name; - /** @const */ - this.methodType = methodType; - /** @const */ - this.requestType = requestType; - /** @const */ - this.responseType = responseType; - /** @const */ - this.requestSerializeFn = requestSerializeFn; - /** @const */ - this.responseDeserializeFn = responseDeserializeFn; -}; +/** @template REQUEST, RESPONSE */ +class MethodDescriptor { + /** + * @param {string} name + * @param {!MethodType} methodType + * @param {function(new: REQUEST, ...)} requestType + * @param {function(new: RESPONSE, ...)} responseType + * @param {function(REQUEST): ?} requestSerializeFn + * @param {function(?): RESPONSE} responseDeserializeFn + */ + constructor( + name, methodType, requestType, responseType, requestSerializeFn, + responseDeserializeFn) { + /** @const */ + this.name = name; + /** @const */ + this.methodType = methodType; + /** @const */ + this.requestType = requestType; + /** @const */ + this.responseType = responseType; + /** @const */ + this.requestSerializeFn = requestSerializeFn; + /** @const */ + this.responseDeserializeFn = responseDeserializeFn; + } + + /** + * @template REQUEST, RESPONSE + * @param {REQUEST} requestMessage + * @param {!Metadata=} metadata + * @param {!CallOptions=} callOptions + * @return {!Request} + */ + createRequest( + requestMessage, metadata = {}, callOptions = new CallOptions()) { + return new RequestInternal(requestMessage, this, metadata, callOptions); + } +} + -/** - * @template REQUEST, RESPONSE - * @param {REQUEST} requestMessage - * @param {!Metadata=} metadata - * @param {!CallOptions=} callOptions - * @return {!Request} - */ -MethodDescriptor.prototype.createRequest = function( - requestMessage, metadata = {}, callOptions = new CallOptions()) { - return new RequestInternal(requestMessage, this, metadata, callOptions); -}; exports = MethodDescriptor; diff --git a/javascript/net/grpc/web/streambodyclientreadablestream.js b/javascript/net/grpc/web/streambodyclientreadablestream.js index 6842801..0513e86 100644 --- a/javascript/net/grpc/web/streambodyclientreadablestream.js +++ b/javascript/net/grpc/web/streambodyclientreadablestream.js @@ -44,256 +44,247 @@ const {Status} = goog.require('grpc.web.Status'); /** * A stream that the client can read from. Used for calls that are streaming * from the server side. - * * @template RESPONSE - * @constructor * @implements {ClientReadableStream} * @final - * @param {!GenericTransportInterface} genericTransportInterface The - * GenericTransportInterface + * @unrestricted */ -const StreamBodyClientReadableStream = function(genericTransportInterface) { +class StreamBodyClientReadableStream { /** - * @const - * @private - * @type {?NodeReadableStream|undefined} The XHR Node Readable Stream + * @param {!GenericTransportInterface} genericTransportInterface The + * GenericTransportInterface */ - this.xhrNodeReadableStream_ = genericTransportInterface.nodeReadableStream; + constructor(genericTransportInterface) { + /** + * @const + * @private + * @type {?NodeReadableStream|undefined} The XHR Node Readable Stream + */ + this.xhrNodeReadableStream_ = genericTransportInterface.nodeReadableStream; + + /** + * @private + * @type {function(?): RESPONSE|null} The deserialize function for the proto + */ + this.responseDeserializeFn_ = null; + + /** + * @const + * @private + * @type {?XhrIo|undefined} The XhrIo object + */ + this.xhr_ = genericTransportInterface.xhr; + + /** + * @const + * @private + * @type {!Array} The list of data callback + */ + this.onDataCallbacks_ = []; + + /** + * @const + * @private + * @type {!Array} The list of status callback + */ + this.onStatusCallbacks_ = []; + + /** + * @const + * @private + * @type {!Array} The list of stream end callback + */ + this.onEndCallbacks_ = []; + + /** + * @const + * @private + * @type {!Array} The list of error callback + */ + this.onErrorCallbacks_ = []; + + /** + * @private + * @type {function(?):!Status|null} + * A function to parse the Rpc Status response + */ + this.rpcStatusParseFn_ = null; + + this.setStreamCallback_(); + } /** * @private - * @type {function(?): RESPONSE|null} The deserialize function for the proto */ - this.responseDeserializeFn_ = null; - - /** - * @const - * @private - * @type {?XhrIo|undefined} The XhrIo object - */ - this.xhr_ = genericTransportInterface.xhr; - - /** - * @const - * @private - * @type {!Array} The list of data callback - */ - this.onDataCallbacks_ = []; - - /** - * @const - * @private - * @type {!Array} The list of status callback - */ - this.onStatusCallbacks_ = []; - - /** - * @const - * @private - * @type {!Array} The list of stream end callback - */ - this.onEndCallbacks_ = []; - - /** - * @const - * @private - * @type {!Array} The list of error callback - */ - this.onErrorCallbacks_ = []; - - /** - * @private - * @type {function(?):!Status|null} - * A function to parse the Rpc Status response - */ - this.rpcStatusParseFn_ = null; - - this.setStreamCallback_(); - -}; - -/** - * @private - */ -StreamBodyClientReadableStream.prototype.setStreamCallback_ = function() { - // Add the callback to the underlying stream - var self = this; - this.xhrNodeReadableStream_.on('data', function(data) { - if ('1' in data) { - var response = self.responseDeserializeFn_(data['1']); - self.sendDataCallbacks_(response); - } - if ('2' in data) { - var status = self.rpcStatusParseFn_(data['2']); - self.sendStatusCallbacks_(status); - } - }); - this.xhrNodeReadableStream_.on('end', function() { - self.sendEndCallbacks_(); - }); - this.xhrNodeReadableStream_.on('error', function() { - if (self.onErrorCallbacks_.length == 0) return; - var lastErrorCode = self.xhr_.getLastErrorCode(); - if (lastErrorCode === ErrorCode.NO_ERROR && !self.xhr_.isSuccess()) { - // The lastErrorCode on the XHR isn't useful in this case, but the XHR - // status is. Full details about the failure should be available in the - // status handler. - lastErrorCode = ErrorCode.HTTP_ERROR; - } - - var grpcStatusCode; - switch (lastErrorCode) { - case ErrorCode.NO_ERROR: - grpcStatusCode = StatusCode.UNKNOWN; - break; - case ErrorCode.ABORT: - grpcStatusCode = StatusCode.ABORTED; - break; - case ErrorCode.TIMEOUT: - grpcStatusCode = StatusCode.DEADLINE_EXCEEDED; - break; - case ErrorCode.HTTP_ERROR: - grpcStatusCode = StatusCode.fromHttpStatus(self.xhr_.getStatus()); - break; - default: - grpcStatusCode = StatusCode.UNAVAILABLE; - } - - self.sendErrorCallbacks_({ - code: grpcStatusCode, - // TODO(armiller): get the message from the response? - // GoogleRpcStatus.deserialize(rawResponse).getMessage()? - // Perhaps do the same status logic as in on('data') above? - message: ErrorCode.getDebugMessage(lastErrorCode) + setStreamCallback_() { + // Add the callback to the underlying stream + var self = this; + this.xhrNodeReadableStream_.on('data', function(data) { + if ('1' in data) { + var response = self.responseDeserializeFn_(data['1']); + self.sendDataCallbacks_(response); + } + if ('2' in data) { + var status = self.rpcStatusParseFn_(data['2']); + self.sendStatusCallbacks_(status); + } }); - }); -}; + this.xhrNodeReadableStream_.on('end', function() { + self.sendEndCallbacks_(); + }); + this.xhrNodeReadableStream_.on('error', function() { + if (self.onErrorCallbacks_.length == 0) return; + var lastErrorCode = self.xhr_.getLastErrorCode(); + if (lastErrorCode === ErrorCode.NO_ERROR && !self.xhr_.isSuccess()) { + // The lastErrorCode on the XHR isn't useful in this case, but the XHR + // status is. Full details about the failure should be available in the + // status handler. + lastErrorCode = ErrorCode.HTTP_ERROR; + } -/** - * @override - * @export - */ -StreamBodyClientReadableStream.prototype.on = function( - eventType, callback) { - // TODO(stanleycheung): change eventType to @enum type - if (eventType == 'data') { - this.onDataCallbacks_.push(callback); - } else if (eventType == 'status') { - this.onStatusCallbacks_.push(callback); - } else if (eventType == 'end') { - this.onEndCallbacks_.push(callback); - } else if (eventType == 'error') { - this.onErrorCallbacks_.push(callback); + var grpcStatusCode; + switch (lastErrorCode) { + case ErrorCode.NO_ERROR: + grpcStatusCode = StatusCode.UNKNOWN; + break; + case ErrorCode.ABORT: + grpcStatusCode = StatusCode.ABORTED; + break; + case ErrorCode.TIMEOUT: + grpcStatusCode = StatusCode.DEADLINE_EXCEEDED; + break; + case ErrorCode.HTTP_ERROR: + grpcStatusCode = StatusCode.fromHttpStatus(self.xhr_.getStatus()); + break; + default: + grpcStatusCode = StatusCode.UNAVAILABLE; + } + + self.sendErrorCallbacks_({ + code: grpcStatusCode, + // TODO(armiller): get the message from the response? + // GoogleRpcStatus.deserialize(rawResponse).getMessage()? + // Perhaps do the same status logic as in on('data') above? + message: ErrorCode.getDebugMessage(lastErrorCode) + }); + }); } - return this; -}; - -/** - * @private - * @param {!Array} callbacks the internal list of callbacks - * @param {function(?)} callback the callback to remove - */ -StreamBodyClientReadableStream.prototype.removeListenerFromCallbacks_ = function( - callbacks, callback) { - const index = callbacks.indexOf(callback); - if (index > -1) { - callbacks.splice(index, 1); + /** + * @override + * @export + */ + on(eventType, callback) { + // TODO(stanleycheung): change eventType to @enum type + if (eventType == 'data') { + this.onDataCallbacks_.push(callback); + } else if (eventType == 'status') { + this.onStatusCallbacks_.push(callback); + } else if (eventType == 'end') { + this.onEndCallbacks_.push(callback); + } else if (eventType == 'error') { + this.onErrorCallbacks_.push(callback); + } + return this; } -}; - -/** - * @export - * @override - */ -StreamBodyClientReadableStream.prototype.removeListener = function( - eventType, callback) { - if (eventType == 'data') { - this.removeListenerFromCallbacks_(this.onDataCallbacks_, callback); - } else if (eventType == 'status') { - this.removeListenerFromCallbacks_(this.onStatusCallbacks_, callback); - } else if (eventType == 'end') { - this.removeListenerFromCallbacks_(this.onEndCallbacks_, callback); - } else if (eventType == 'error') { - this.removeListenerFromCallbacks_(this.onErrorCallbacks_, callback); + /** + * @private + * @param {!Array} callbacks the internal list of callbacks + * @param {function(?)} callback the callback to remove + */ + removeListenerFromCallbacks_(callbacks, callback) { + const index = callbacks.indexOf(callback); + if (index > -1) { + callbacks.splice(index, 1); + } } - return this; -}; - -/** - * Register a callbackl to parse the response - * - * @param {function(?): RESPONSE} responseDeserializeFn The deserialize - * function for the proto - */ -StreamBodyClientReadableStream.prototype.setResponseDeserializeFn = - function(responseDeserializeFn) { - this.responseDeserializeFn_ = responseDeserializeFn; -}; - -/** - * Register a function to parse RPC status response - * - * @param {function(?):!Status} rpcStatusParseFn A function to parse - * the RPC status response - */ -StreamBodyClientReadableStream.prototype.setRpcStatusParseFn = function(rpcStatusParseFn) { - this.rpcStatusParseFn_ = rpcStatusParseFn; -}; - - -/** - * @override - * @export - */ -StreamBodyClientReadableStream.prototype.cancel = function() { - this.xhr_.abort(); -}; - - -/** - * @private - * @param {!RESPONSE} data The data to send back - */ -StreamBodyClientReadableStream.prototype.sendDataCallbacks_ = function(data) { - for (var i = 0; i < this.onDataCallbacks_.length; i++) { - this.onDataCallbacks_[i](data); + /** + * @export + * @override + */ + removeListener(eventType, callback) { + if (eventType == 'data') { + this.removeListenerFromCallbacks_(this.onDataCallbacks_, callback); + } else if (eventType == 'status') { + this.removeListenerFromCallbacks_(this.onStatusCallbacks_, callback); + } else if (eventType == 'end') { + this.removeListenerFromCallbacks_(this.onEndCallbacks_, callback); + } else if (eventType == 'error') { + this.removeListenerFromCallbacks_(this.onErrorCallbacks_, callback); + } + return this; } -}; - -/** - * @private - * @param {!Status} status The status to send back - */ -StreamBodyClientReadableStream.prototype.sendStatusCallbacks_ = function(status) { - for (var i = 0; i < this.onStatusCallbacks_.length; i++) { - this.onStatusCallbacks_[i](status); + /** + * Register a callbackl to parse the response + * + * @param {function(?): RESPONSE} responseDeserializeFn The deserialize + * function for the proto + */ + setResponseDeserializeFn(responseDeserializeFn) { + this.responseDeserializeFn_ = responseDeserializeFn; } -}; - -/** - * @private - * @param {?} error The error to send back - */ -StreamBodyClientReadableStream.prototype.sendErrorCallbacks_ = function(error) { - for (var i = 0; i < this.onErrorCallbacks_.length; i++) { - this.onErrorCallbacks_[i](error); + /** + * Register a function to parse RPC status response + * + * @param {function(?):!Status} rpcStatusParseFn A function to parse + * the RPC status response + */ + setRpcStatusParseFn(rpcStatusParseFn) { + this.rpcStatusParseFn_ = rpcStatusParseFn; } -}; - -/** - * @private - */ -StreamBodyClientReadableStream.prototype.sendEndCallbacks_ = function() { - for (var i = 0; i < this.onEndCallbacks_.length; i++) { - this.onEndCallbacks_[i](); + /** + * @override + * @export + */ + cancel() { + this.xhr_.abort(); } -}; + + /** + * @private + * @param {!RESPONSE} data The data to send back + */ + sendDataCallbacks_(data) { + for (var i = 0; i < this.onDataCallbacks_.length; i++) { + this.onDataCallbacks_[i](data); + } + } + + /** + * @private + * @param {!Status} status The status to send back + */ + sendStatusCallbacks_(status) { + for (var i = 0; i < this.onStatusCallbacks_.length; i++) { + this.onStatusCallbacks_[i](status); + } + } + + /** + * @private + * @param {?} error The error to send back + */ + sendErrorCallbacks_(error) { + for (var i = 0; i < this.onErrorCallbacks_.length; i++) { + this.onErrorCallbacks_[i](error); + } + } + + /** + * @private + */ + sendEndCallbacks_() { + for (var i = 0; i < this.onEndCallbacks_.length; i++) { + this.onEndCallbacks_[i](); + } + } +} + exports = StreamBodyClientReadableStream;