Migrate to ES6 classes

This commit is contained in:
Stanley Cheung 2020-06-17 10:58:32 -07:00 committed by Stanley Cheung
parent 8c96b8cee3
commit 670326124a
7 changed files with 1249 additions and 1263 deletions

View File

@ -7,53 +7,60 @@ goog.module.declareLegacyNamespace();
/**
* The collection of runtime options for a new RPC call.
* @param {!Object<string, !Object>=} options
* @constructor
* @unrestricted
*/
const CallOptions = function(options) {
class CallOptions {
/**
* @const {!Object<string, !Object>}
* @private
* @param {!Object<string, !Object>=} options
*/
this.properties_ = options || {};
};
constructor(options) {
/**
* @const {!Object<string, !Object>}
* @private
*/
this.properties_ = options || {};
}
/**
* Add a new CallOption or override an existing one.
*
* @param {string} name name of the CallOption that should be added/overridden.
* @param {VALUE} value value of the CallOption
* @template VALUE
*/
CallOptions.prototype.setOption = function(name, value) {
this.properties_[name] = value;
};
/**
* Add a new CallOption or override an existing one.
*
* @param {string} name name of the CallOption that should be
* added/overridden.
* @param {VALUE} value value of the CallOption
* @template VALUE
*/
setOption(name, value) {
this.properties_[name] = value;
}
/**
* Get the value of one CallOption.
*
* @param {string} name name of the CallOption.
* @return {!Object} value of the CallOption. If name doesn't exist, will return
* 'undefined'.
*/
CallOptions.prototype.get = function(name) {
return this.properties_[name];
};
/**
* Get the value of one CallOption.
*
* @param {string} name name of the CallOption.
* @return {!Object} value of the CallOption. If name doesn't exist, will
* return 'undefined'.
*/
get(name) {
return this.properties_[name];
}
/**
* Remove a CallOption.
*
* @param {string} name name of the CallOption that shoud be removed.
*/
removeOption(name) {
delete this.properties_[name];
}
/**
* @return {!Array<string>}
*/
getKeys() {
return Object.keys(this.properties_);
}
}
/**
* Remove a CallOption.
*
* @param {string} name name of the CallOption that shoud be removed.
*/
CallOptions.prototype.removeOption = function(name) {
delete this.properties_[name];
};
/**
* @return {!Array<string>}
*/
CallOptions.prototype.getKeys = function() {
return Object.keys(this.properties_);
};
exports = CallOptions;

View File

@ -47,322 +47,319 @@ const {StreamInterceptor, UnaryInterceptor} = goog.require('grpc.web.Interceptor
/**
* Base class for gRPC web client using the application/grpc-web wire format
* @param {?Object=} opt_options
* @constructor
* @implements {AbstractClientBase}
* @unrestricted
*/
const GrpcWebClientBase = function(opt_options) {
class GrpcWebClientBase {
/**
* @const
* @private {string}
* @param {?Object=} opt_options
*/
this.format_ =
goog.getObjectByName('format', opt_options) || "text";
constructor(opt_options) {
/**
* @const
* @private {string}
*/
this.format_ = goog.getObjectByName('format', opt_options) || 'text';
/**
* @const
* @private {boolean}
*/
this.suppressCorsPreflight_ =
goog.getObjectByName('suppressCorsPreflight', opt_options) || false;
/**
* @const
* @private {boolean}
*/
this.suppressCorsPreflight_ =
goog.getObjectByName('suppressCorsPreflight', opt_options) || false;
/**
* @const
* @private {boolean}
*/
this.withCredentials_ =
goog.getObjectByName('withCredentials', opt_options) || false;
/**
* @const {!Array<!StreamInterceptor>}
* @private
*/
this.streamInterceptors_ =
goog.getObjectByName('streamInterceptors', opt_options) || [];
/**
* @const
* @private {boolean}
*/
this.withCredentials_ =
goog.getObjectByName('withCredentials', opt_options) || false;
/**
* @const {!Array<!StreamInterceptor>}
* @private
*/
this.streamInterceptors_ =
goog.getObjectByName('streamInterceptors', opt_options) || [];
/**
* @const {!Array<!UnaryInterceptor>}
* @private
*/
this.unaryInterceptors_ =
goog.getObjectByName('unaryInterceptors', opt_options) || [];
};
/**
* @override
* @export
*/
GrpcWebClientBase.prototype.rpcCall = function(
method, requestMessage, metadata, methodDescriptor, callback) {
methodDescriptor = AbstractClientBase.ensureMethodDescriptor(
method, requestMessage, MethodType.UNARY, methodDescriptor);
var hostname = AbstractClientBase.getHostname(method, methodDescriptor);
var invoker = GrpcWebClientBase.runInterceptors_(
(request) => this.startStream_(request, hostname),
this.streamInterceptors_);
var stream = /** @type {!ClientReadableStream<?>} */ (invoker.call(
this, methodDescriptor.createRequest(requestMessage, metadata)));
GrpcWebClientBase.setCallback_(stream, callback, false);
return stream;
};
/**
* @override
* @export
*/
GrpcWebClientBase.prototype.thenableCall = function(
method, requestMessage, metadata, methodDescriptor) {
methodDescriptor = AbstractClientBase.ensureMethodDescriptor(
method, requestMessage, MethodType.UNARY, methodDescriptor);
var hostname = AbstractClientBase.getHostname(method, methodDescriptor);
var initialInvoker = (request) => new Promise((resolve, reject) => {
var stream = this.startStream_(request, hostname);
var unaryMetadata;
var unaryStatus;
var unaryMsg;
GrpcWebClientBase.setCallback_(
stream, (error, response, status, metadata) => {
if (error) {
reject(error);
} else if (response) {
unaryMsg = response;
} else if (status) {
unaryStatus = status;
} else if (metadata) {
unaryMetadata = metadata;
} else {
resolve(new UnaryResponse(unaryMsg, unaryMetadata, unaryStatus));
}
}, true);
});
var invoker = GrpcWebClientBase.runInterceptors_(
initialInvoker, this.unaryInterceptors_);
var unaryResponse = /** @type {!Promise<?>} */ (invoker.call(
this, methodDescriptor.createRequest(requestMessage, metadata)));
return unaryResponse.then((response) => response.getResponseMessage());
};
/**
* @export
* @param {string} method The method to invoke
* @param {REQUEST} requestMessage The request proto
* @param {!Object<string, string>} metadata User defined call metadata
* @param {!MethodDescriptor<REQUEST, RESPONSE>|
* !AbstractClientBase.MethodInfo<REQUEST,RESPONSE>}
* methodDescriptor Information of this RPC method
* @return {!Promise<RESPONSE>}
* @template REQUEST, RESPONSE
*/
GrpcWebClientBase.prototype.unaryCall = function(
method, requestMessage, metadata, methodDescriptor) {
return /** @type {!Promise<RESPONSE>}*/ (
this.thenableCall(method, requestMessage, metadata, methodDescriptor));
};
/**
* @override
* @export
*/
GrpcWebClientBase.prototype.serverStreaming = function(
method, requestMessage, metadata, methodDescriptor) {
methodDescriptor = AbstractClientBase.ensureMethodDescriptor(
method, requestMessage, MethodType.SERVER_STREAMING, methodDescriptor);
var hostname = AbstractClientBase.getHostname(method, methodDescriptor);
var invoker = GrpcWebClientBase.runInterceptors_(
(request) => this.startStream_(request, hostname),
this.streamInterceptors_);
return /** @type {!ClientReadableStream<?>} */ (invoker.call(
this, methodDescriptor.createRequest(requestMessage, metadata)));
};
/**
* @private
* @template REQUEST, RESPONSE
* @param {!Request<REQUEST, RESPONSE>} request
* @param {string} hostname
* @return {!ClientReadableStream<RESPONSE>}
*/
GrpcWebClientBase.prototype.startStream_ = function(request, hostname) {
var methodDescriptor = request.getMethodDescriptor();
var path = hostname + methodDescriptor.name;
var xhr = this.newXhr_();
xhr.setWithCredentials(this.withCredentials_);
var genericTransportInterface = {
xhr: xhr,
};
var stream = new GrpcWebClientReadableStream(genericTransportInterface);
stream.setResponseDeserializeFn(methodDescriptor.responseDeserializeFn);
xhr.headers.addAll(request.getMetadata());
this.processHeaders_(xhr);
if (this.suppressCorsPreflight_) {
var headerObject = xhr.headers.toObject();
xhr.headers.clear();
path = GrpcWebClientBase.setCorsOverride_(path, headerObject);
/**
* @const {!Array<!UnaryInterceptor>}
* @private
*/
this.unaryInterceptors_ =
goog.getObjectByName('unaryInterceptors', opt_options) || [];
}
var serialized =
methodDescriptor.requestSerializeFn(request.getRequestMessage());
var payload = this.encodeRequest_(serialized);
if (this.format_ == 'text') {
payload = googCrypt.encodeByteArray(payload);
} else if (this.format_ == 'binary') {
xhr.setResponseType(XhrIo.ResponseType.ARRAY_BUFFER);
/**
* @override
* @export
*/
rpcCall(method, requestMessage, metadata, methodDescriptor, callback) {
methodDescriptor = AbstractClientBase.ensureMethodDescriptor(
method, requestMessage, MethodType.UNARY, methodDescriptor);
var hostname = AbstractClientBase.getHostname(method, methodDescriptor);
var invoker = GrpcWebClientBase.runInterceptors_(
(request) => this.startStream_(request, hostname),
this.streamInterceptors_);
var stream = /** @type {!ClientReadableStream<?>} */ (invoker.call(
this, methodDescriptor.createRequest(requestMessage, metadata)));
GrpcWebClientBase.setCallback_(stream, callback, false);
return stream;
}
xhr.send(path, 'POST', payload);
return stream;
};
/**
* @override
* @export
*/
thenableCall(method, requestMessage, metadata, methodDescriptor) {
methodDescriptor = AbstractClientBase.ensureMethodDescriptor(
method, requestMessage, MethodType.UNARY, methodDescriptor);
var hostname = AbstractClientBase.getHostname(method, methodDescriptor);
var initialInvoker = (request) => new Promise((resolve, reject) => {
var stream = this.startStream_(request, hostname);
var unaryMetadata;
var unaryStatus;
var unaryMsg;
GrpcWebClientBase.setCallback_(
stream, (error, response, status, metadata) => {
if (error) {
reject(error);
} else if (response) {
unaryMsg = response;
} else if (status) {
unaryStatus = status;
} else if (metadata) {
unaryMetadata = metadata;
} else {
resolve(new UnaryResponse(unaryMsg, unaryMetadata, unaryStatus));
}
}, true);
});
var invoker = GrpcWebClientBase.runInterceptors_(
initialInvoker, this.unaryInterceptors_);
var unaryResponse = /** @type {!Promise<?>} */ (invoker.call(
this, methodDescriptor.createRequest(requestMessage, metadata)));
return unaryResponse.then((response) => response.getResponseMessage());
}
/**
* @private
* @static
* @template RESPONSE
* @param {!ClientReadableStream<RESPONSE>} stream
* @param {function(?Error, ?RESPONSE, ?Status=, ?Metadata=)|
* function(?Error,?RESPONSE)} callback
* @param {boolean} useUnaryResponse
*/
GrpcWebClientBase.setCallback_ = function(stream, callback, useUnaryResponse) {
var responseReceived = null;
var errorEmitted = false;
/**
* @export
* @param {string} method The method to invoke
* @param {REQUEST} requestMessage The request proto
* @param {!Object<string, string>} metadata User defined call metadata
* @param {!MethodDescriptor<REQUEST, RESPONSE>|
* !AbstractClientBase.MethodInfo<REQUEST,RESPONSE>}
* methodDescriptor Information of this RPC method
* @return {!Promise<RESPONSE>}
* @template REQUEST, RESPONSE
*/
unaryCall(method, requestMessage, metadata, methodDescriptor) {
return /** @type {!Promise<RESPONSE>}*/ (
this.thenableCall(method, requestMessage, metadata, methodDescriptor));
}
stream.on('data', function(response) {
responseReceived = response;
});
/**
* @override
* @export
*/
serverStreaming(method, requestMessage, metadata, methodDescriptor) {
methodDescriptor = AbstractClientBase.ensureMethodDescriptor(
method, requestMessage, MethodType.SERVER_STREAMING, methodDescriptor);
var hostname = AbstractClientBase.getHostname(method, methodDescriptor);
var invoker = GrpcWebClientBase.runInterceptors_(
(request) => this.startStream_(request, hostname),
this.streamInterceptors_);
return /** @type {!ClientReadableStream<?>} */ (invoker.call(
this, methodDescriptor.createRequest(requestMessage, metadata)));
}
stream.on('error', function(error) {
if (error.code != StatusCode.OK && !errorEmitted) {
errorEmitted = true;
callback(error, null);
/**
* @private
* @template REQUEST, RESPONSE
* @param {!Request<REQUEST, RESPONSE>} request
* @param {string} hostname
* @return {!ClientReadableStream<RESPONSE>}
*/
startStream_(request, hostname) {
var methodDescriptor = request.getMethodDescriptor();
var path = hostname + methodDescriptor.name;
var xhr = this.newXhr_();
xhr.setWithCredentials(this.withCredentials_);
var genericTransportInterface = {
xhr: xhr,
};
var stream = new GrpcWebClientReadableStream(genericTransportInterface);
stream.setResponseDeserializeFn(methodDescriptor.responseDeserializeFn);
xhr.headers.addAll(request.getMetadata());
this.processHeaders_(xhr);
if (this.suppressCorsPreflight_) {
var headerObject = xhr.headers.toObject();
xhr.headers.clear();
path = GrpcWebClientBase.setCorsOverride_(path, headerObject);
}
});
stream.on('status', function(status) {
if (status.code != StatusCode.OK && !errorEmitted) {
errorEmitted = true;
callback(
{
code: status.code,
message: status.details,
metadata: status.metadata
},
null);
} else if (useUnaryResponse) {
callback(null, null, status);
var serialized =
methodDescriptor.requestSerializeFn(request.getRequestMessage());
var payload = this.encodeRequest_(serialized);
if (this.format_ == 'text') {
payload = googCrypt.encodeByteArray(payload);
} else if (this.format_ == 'binary') {
xhr.setResponseType(XhrIo.ResponseType.ARRAY_BUFFER);
}
});
xhr.send(path, 'POST', payload);
return stream;
}
if (useUnaryResponse) {
stream.on('metadata', function(metadata) {
callback(null, null, null, metadata);
/**
* @private
* @static
* @template RESPONSE
* @param {!ClientReadableStream<RESPONSE>} stream
* @param {function(?Error, ?RESPONSE, ?Status=, ?Metadata=)|
* function(?Error,?RESPONSE)} callback
* @param {boolean} useUnaryResponse
*/
static setCallback_(stream, callback, useUnaryResponse) {
var responseReceived = null;
var errorEmitted = false;
stream.on('data', function(response) {
responseReceived = response;
});
stream.on('error', function(error) {
if (error.code != StatusCode.OK && !errorEmitted) {
errorEmitted = true;
callback(error, null);
}
});
stream.on('status', function(status) {
if (status.code != StatusCode.OK && !errorEmitted) {
errorEmitted = true;
callback(
{
code: status.code,
message: status.details,
metadata: status.metadata
},
null);
} else if (useUnaryResponse) {
callback(null, null, status);
}
});
if (useUnaryResponse) {
stream.on('metadata', function(metadata) {
callback(null, null, null, metadata);
});
}
stream.on('end', function() {
if (!errorEmitted) {
callback(null, responseReceived);
}
if (useUnaryResponse) {
callback(null, null); // trigger unaryResponse
}
});
}
stream.on('end', function() {
if (!errorEmitted) {
callback(null, responseReceived);
}
if (useUnaryResponse) {
callback(null, null); // trigger unaryResponse
}
});
};
/**
* Create a new XhrIo object
*
* @private
* @return {!XhrIo} The created XhrIo object
*/
GrpcWebClientBase.prototype.newXhr_ = function() {
return new XhrIo();
};
/**
* Encode the grpc-web request
*
* @private
* @param {!Uint8Array} serialized The serialized proto payload
* @return {!Uint8Array} The application/grpc-web padded request
*/
GrpcWebClientBase.prototype.encodeRequest_ = function(serialized) {
var len = serialized.length;
var bytesArray = [0, 0, 0, 0];
var payload = new Uint8Array(5 + len);
for (var i = 3; i >= 0; i--) {
bytesArray[i] = (len % 256);
len = len >>> 8;
/**
* Create a new XhrIo object
*
* @private
* @return {!XhrIo} The created XhrIo object
*/
newXhr_() {
return new XhrIo();
}
payload.set(new Uint8Array(bytesArray), 1);
payload.set(serialized, 5);
return payload;
};
/**
* @private
* @param {!XhrIo} xhr The xhr object
*/
GrpcWebClientBase.prototype.processHeaders_ = function(xhr) {
if (this.format_ == "text") {
xhr.headers.set('Content-Type', 'application/grpc-web-text');
xhr.headers.set('Accept', 'application/grpc-web-text');
} else {
xhr.headers.set('Content-Type', 'application/grpc-web+proto');
}
xhr.headers.set('X-User-Agent', 'grpc-web-javascript/0.1');
xhr.headers.set('X-Grpc-Web', '1');
if (xhr.headers.containsKey('deadline')) {
var deadline = xhr.headers.get('deadline'); // in ms
var currentTime = (new Date()).getTime();
var timeout = Math.round(deadline - currentTime);
xhr.headers.remove('deadline');
if (timeout === Infinity) {
// grpc-timeout header defaults to infinity if not set.
timeout = 0;
/**
* Encode the grpc-web request
*
* @private
* @param {!Uint8Array} serialized The serialized proto payload
* @return {!Uint8Array} The application/grpc-web padded request
*/
encodeRequest_(serialized) {
var len = serialized.length;
var bytesArray = [0, 0, 0, 0];
var payload = new Uint8Array(5 + len);
for (var i = 3; i >= 0; i--) {
bytesArray[i] = (len % 256);
len = len >>> 8;
}
if (timeout > 0) {
xhr.headers.set('grpc-timeout', timeout + 'm');
payload.set(new Uint8Array(bytesArray), 1);
payload.set(serialized, 5);
return payload;
}
/**
* @private
* @param {!XhrIo} xhr The xhr object
*/
processHeaders_(xhr) {
if (this.format_ == 'text') {
xhr.headers.set('Content-Type', 'application/grpc-web-text');
xhr.headers.set('Accept', 'application/grpc-web-text');
} else {
xhr.headers.set('Content-Type', 'application/grpc-web+proto');
}
xhr.headers.set('X-User-Agent', 'grpc-web-javascript/0.1');
xhr.headers.set('X-Grpc-Web', '1');
if (xhr.headers.containsKey('deadline')) {
var deadline = xhr.headers.get('deadline'); // in ms
var currentTime = (new Date()).getTime();
var timeout = Math.round(deadline - currentTime);
xhr.headers.remove('deadline');
if (timeout === Infinity) {
// grpc-timeout header defaults to infinity if not set.
timeout = 0;
}
if (timeout > 0) {
xhr.headers.set('grpc-timeout', timeout + 'm');
}
}
}
};
/**
* @private
* @static
* @param {string} method The method to invoke
* @param {!Object<string,string>} headerObject The xhr headers
* @return {string} The URI object or a string path with headers
*/
GrpcWebClientBase.setCorsOverride_ = function(method, headerObject) {
return /** @type {string} */ (HttpCors.setHttpHeadersWithOverwriteParam(
method, HttpCors.HTTP_HEADERS_PARAM_NAME, headerObject));
};
/**
* @private
* @static
* @param {string} method The method to invoke
* @param {!Object<string,string>} headerObject The xhr headers
* @return {string} The URI object or a string path with headers
*/
static setCorsOverride_(method, headerObject) {
return /** @type {string} */ (HttpCors.setHttpHeadersWithOverwriteParam(
method, HttpCors.HTTP_HEADERS_PARAM_NAME, headerObject));
}
/**
* @private
* @static
* @template REQUEST, RESPONSE
* @param {function(!Request<REQUEST,RESPONSE>):
* (!Promise<RESPONSE>|!ClientReadableStream<RESPONSE>)} invoker
* @param {!Array<!UnaryInterceptor|!StreamInterceptor>}
* interceptors
* @return {function(!Request<REQUEST,RESPONSE>):
* (!Promise<RESPONSE>|!ClientReadableStream<RESPONSE>)}
*/
static runInterceptors_(invoker, interceptors) {
let curInvoker = invoker;
interceptors.forEach((interceptor) => {
const lastInvoker = curInvoker;
curInvoker = (request) => interceptor.intercept(request, lastInvoker);
});
return curInvoker;
}
}
/**
* @private
* @static
* @template REQUEST, RESPONSE
* @param {function(!Request<REQUEST,RESPONSE>):
* (!Promise<RESPONSE>|!ClientReadableStream<RESPONSE>)} invoker
* @param {!Array<!UnaryInterceptor|!StreamInterceptor>}
* interceptors
* @return {function(!Request<REQUEST,RESPONSE>):
* (!Promise<RESPONSE>|!ClientReadableStream<RESPONSE>)}
*/
GrpcWebClientBase.runInterceptors_ = function(invoker, interceptors) {
let curInvoker = invoker;
interceptors.forEach((interceptor) => {
const lastInvoker = curInvoker;
curInvoker = (request) => interceptor.intercept(request, lastInvoker);
});
return curInvoker;
};
exports = GrpcWebClientBase;

View File

@ -28,17 +28,17 @@ var testSuite = goog.require('goog.testing.testSuite');
const {StreamInterceptor} = goog.require('grpc.web.Interceptor');
goog.require('goog.testing.jsunit');
var REQUEST_BYTES = [1,2,3];
var FAKE_METHOD = "fake-method";
var PROTO_FIELD_VALUE = "meow";
var REQUEST_BYTES = [1, 2, 3];
var FAKE_METHOD = 'fake-method';
var PROTO_FIELD_VALUE = 'meow';
var EXPECTED_HEADERS;
var EXPECTED_HEADER_VALUES;
var EXPECTED_UNARY_HEADERS = ['Content-Type', 'Accept',
'X-User-Agent', 'X-Grpc-Web'];
var EXPECTED_UNARY_HEADER_VALUES = ['application/grpc-web-text',
'application/grpc-web-text',
'grpc-web-javascript/0.1',
'1'];
var EXPECTED_UNARY_HEADERS =
['Content-Type', 'Accept', 'X-User-Agent', 'X-Grpc-Web'];
var EXPECTED_UNARY_HEADER_VALUES = [
'application/grpc-web-text', 'application/grpc-web-text',
'grpc-web-javascript/0.1', '1'
];
var dataCallback;
@ -62,25 +62,26 @@ testSuite({
client.newXhr_ = function() {
return new MockXhr({
// This parses to [ { DATA: [4,5,6] }, { TRAILER: "a: b" } ]
response: googCrypt.encodeByteArray(new Uint8Array([
0, 0, 0, 0, 3, 4, 5, 6, 128, 0, 0, 0, 4, 97, 58, 32, 98
])),
response: googCrypt.encodeByteArray(new Uint8Array(
[0, 0, 0, 0, 3, 4, 5, 6, 128, 0, 0, 0, 4, 97, 58, 32, 98])),
});
};
expectUnaryHeaders();
client.rpcCall(FAKE_METHOD, {}, {}, {
requestSerializeFn : function(request) {
return REQUEST_BYTES;
},
responseDeserializeFn : function(bytes) {
assertElementsEquals([4,5,6], [].slice.call(bytes));
return {"field1": PROTO_FIELD_VALUE};
}
}, function(error, response) {
assertNull(error);
assertEquals(PROTO_FIELD_VALUE, response['field1']);
});
client.rpcCall(
FAKE_METHOD, {}, {}, {
requestSerializeFn: function(request) {
return REQUEST_BYTES;
},
responseDeserializeFn: function(bytes) {
assertElementsEquals([4, 5, 6], [].slice.call(bytes));
return {'field1': PROTO_FIELD_VALUE};
}
},
function(error, response) {
assertNull(error);
assertEquals(PROTO_FIELD_VALUE, response['field1']);
});
dataCallback();
},
@ -119,23 +120,26 @@ testSuite({
return new MockXhr({
// This decodes to "grpc-status: 3"
response: googCrypt.encodeByteArray(new Uint8Array([
128, 0, 0, 0, 14, 103, 114, 112, 99, 45, 115, 116, 97, 116, 117, 115, 58, 32, 51
128, 0, 0, 0, 14, 103, 114, 112, 99, 45, 115, 116, 97, 116, 117, 115,
58, 32, 51
])),
});
};
expectUnaryHeaders();
client.rpcCall(FAKE_METHOD, {}, {}, {
requestSerializeFn : function(request) {
return REQUEST_BYTES;
},
responseDeserializeFn : function(bytes) {
return {};
}
}, function(error, response) {
assertNull(response);
assertEquals(3, error.code);
});
client.rpcCall(
FAKE_METHOD, {}, {}, {
requestSerializeFn: function(request) {
return REQUEST_BYTES;
},
responseDeserializeFn: function(bytes) {
return {};
}
},
function(error, response) {
assertNull(response);
assertEquals(3, error.code);
});
dataCallback();
},
@ -144,28 +148,29 @@ testSuite({
client.newXhr_ = function() {
return new MockXhr({
// This parses to [ { DATA: [4,5,6] }, { TRAILER: "a: b" } ]
response: googCrypt.encodeByteArray(new Uint8Array([
0, 0, 0, 0, 3, 4, 5, 6, 128, 0, 0, 0, 4, 97, 58, 32, 98
])),
response: googCrypt.encodeByteArray(new Uint8Array(
[0, 0, 0, 0, 3, 4, 5, 6, 128, 0, 0, 0, 4, 97, 58, 32, 98])),
});
};
expectUnaryHeaders();
var call = client.rpcCall(FAKE_METHOD, {}, {}, {
requestSerializeFn : function(request) {
return REQUEST_BYTES;
},
responseDeserializeFn : function(bytes) {
assertElementsEquals([4,5,6], [].slice.call(bytes));
return {"field1": PROTO_FIELD_VALUE};
}
}, function(error, response) {
assertNull(error);
assertEquals(PROTO_FIELD_VALUE, response['field1']);
});
var call = client.rpcCall(
FAKE_METHOD, {}, {}, {
requestSerializeFn: function(request) {
return REQUEST_BYTES;
},
responseDeserializeFn: function(bytes) {
assertElementsEquals([4, 5, 6], [].slice.call(bytes));
return {'field1': PROTO_FIELD_VALUE};
}
},
function(error, response) {
assertNull(error);
assertEquals(PROTO_FIELD_VALUE, response['field1']);
});
call.on('metadata', (metadata) => {
assertEquals(metadata['sample-initial-metadata-1'],
'sample-initial-metadata-val');
assertEquals(
metadata['sample-initial-metadata-1'], 'sample-initial-metadata-val');
});
dataCallback();
}
@ -180,133 +185,132 @@ function expectUnaryHeaders() {
}
/**
* @constructor
* @param {?Object} mockValues
* Mock XhrIO object to test the outgoing values
*/
function MockXhr(mockValues) {
this.mockValues = mockValues;
this.headers = new Map();
/** @unrestricted */
class MockXhr {
/**
* @param {?Object} mockValues
* Mock XhrIO object to test the outgoing values
*/
constructor(mockValues) {
this.mockValues = mockValues;
this.headers = new Map();
}
/**
* @param {string} url
* @param {string=} opt_method
* @param {string=} opt_content
* @param {string=} opt_headers
*/
send(url, opt_method, opt_content, opt_headers) {
assertEquals(FAKE_METHOD, url);
assertEquals('POST', opt_method);
assertElementsEquals(
googCrypt.encodeByteArray(new Uint8Array([0, 0, 0, 0, 3, 1, 2, 3])),
opt_content);
assertElementsEquals(EXPECTED_HEADERS, this.headers.getKeys());
assertElementsEquals(EXPECTED_HEADER_VALUES, this.headers.getValues());
}
/**
* @param {boolean} withCredentials
*/
setWithCredentials(withCredentials) {
return;
}
/**
* @return {string} response
*/
getResponseText() {
return this.mockValues.response;
}
/**
* @param {string} key header key
* @return {string} content-type
*/
getStreamingResponseHeader(key) {
return 'application/grpc-web-text';
}
/**
* @return {string} response
*/
getResponseHeaders() {
return {'sample-initial-metadata-1': 'sample-initial-metadata-val'};
}
/**
* @return {number} xhr state
*/
getReadyState() {
return 0;
}
/**
* @return {number} lastErrorCode
*/
getLastErrorCode() {
return 0;
}
/**
* @return {string} lastError
*/
getLastError() {
return 'server not responding';
}
/**
* @param {string} responseType
*/
setResponseType(responseType) {
return;
}
}
/**
* @param {string} url
* @param {string=} opt_method
* @param {string=} opt_content
* @param {string=} opt_headers
*/
MockXhr.prototype.send = function(url, opt_method, opt_content, opt_headers) {
assertEquals(FAKE_METHOD, url);
assertEquals("POST", opt_method);
assertElementsEquals(googCrypt.encodeByteArray(new Uint8Array([0, 0, 0, 0, 3, 1, 2, 3])), opt_content);
assertElementsEquals(EXPECTED_HEADERS, this.headers.getKeys());
assertElementsEquals(EXPECTED_HEADER_VALUES, this.headers.getValues());
};
/**
* @param {boolean} withCredentials
*/
MockXhr.prototype.setWithCredentials = function(withCredentials) {
return;
};
/**
* @return {string} response
*/
MockXhr.prototype.getResponseText = function() {
return this.mockValues.response;
};
/**
* @param {string} key header key
* @return {string} content-type
*/
MockXhr.prototype.getStreamingResponseHeader = function(key) {
return 'application/grpc-web-text';
};
/**
* @return {string} response
*/
MockXhr.prototype.getResponseHeaders = function() {
return {'sample-initial-metadata-1': 'sample-initial-metadata-val'};
};
/**
* @return {number} xhr state
*/
MockXhr.prototype.getReadyState = function() {
return 0;
};
/**
* @return {number} lastErrorCode
*/
MockXhr.prototype.getLastErrorCode = function() {
return 0;
};
/**
* @return {string} lastError
*/
MockXhr.prototype.getLastError = function() {
return 'server not responding';
};
/**
* @param {string} responseType
*/
MockXhr.prototype.setResponseType = function(responseType) {
return;
};
/**
* @constructor
* @implements {StreamInterceptor}
* @unrestricted
*/
const StreamResponseInterceptor = function() {};
/** @override */
StreamResponseInterceptor.prototype.intercept = function(request, invoker) {
/**
* @implements {ClientReadableStream}
* @constructor
* @param {!ClientReadableStream<RESPONSE>} stream
* @template RESPONSE
*/
const InterceptedStream = function(stream) {
this.stream = stream;
};
class StreamResponseInterceptor {
constructor() {}
/** @override */
InterceptedStream.prototype.on = function(eventType, callback) {
if (eventType == 'data') {
const newCallback = (response) => {
response['field2'] = 'field2';
callback(response);
};
this.stream.on(eventType, newCallback);
} else {
this.stream.on(eventType, callback);
}
return this;
};
intercept(request, invoker) {
/**
* @implements {ClientReadableStream}
* @constructor
* @param {!ClientReadableStream<RESPONSE>} stream
* @template RESPONSE
*/
const InterceptedStream = function(stream) {
this.stream = stream;
};
/** @override */
InterceptedStream.prototype.cancel = function() {
this.stream.cancel();
return this;
};
/** @override */
InterceptedStream.prototype.on = function(eventType, callback) {
if (eventType == 'data') {
const newCallback = (response) => {
response['field2'] = 'field2';
callback(response);
};
this.stream.on(eventType, newCallback);
} else {
this.stream.on(eventType, callback);
}
return this;
};
return new InterceptedStream(invoker(request));
};
/** @override */
InterceptedStream.prototype.cancel = function() {
this.stream.cancel();
return this;
};
return new InterceptedStream(invoker(request));
}
}

View File

@ -45,381 +45,364 @@ const {Status} = goog.require('grpc.web.Status');
const GRPC_STATUS = "grpc-status";
const GRPC_STATUS_MESSAGE = "grpc-message";
const GRPC_STATUS = 'grpc-status';
const GRPC_STATUS_MESSAGE = 'grpc-message';
/** @type {!Array<string>} */
const EXCLUDED_RESPONSE_HEADERS = [
'content-type',
GRPC_STATUS,
GRPC_STATUS_MESSAGE
];
const EXCLUDED_RESPONSE_HEADERS =
['content-type', GRPC_STATUS, GRPC_STATUS_MESSAGE];
/**
* A stream that the client can read from. Used for calls that are streaming
* from the server side.
*
* @template RESPONSE
* @constructor
* @implements {ClientReadableStream}
* @final
* @param {!GenericTransportInterface} genericTransportInterface The
* GenericTransportInterface
* @unrestricted
*/
const GrpcWebClientReadableStream = function(genericTransportInterface) {
class GrpcWebClientReadableStream {
/**
* @const
* @private
* @type {?XhrIo} The XhrIo object
* @param {!GenericTransportInterface} genericTransportInterface The
* GenericTransportInterface
*/
this.xhr_ = /** @type {?XhrIo} */ (genericTransportInterface.xhr);
constructor(genericTransportInterface) {
/**
* @const
* @private
* @type {?XhrIo} The XhrIo object
*/
this.xhr_ = /** @type {?XhrIo} */ (genericTransportInterface.xhr);
/**
* @private
* @type {function(?):!RESPONSE|null} The deserialize function for the proto
*/
this.responseDeserializeFn_ = null;
/**
* @private
* @type {function(?):!RESPONSE|null} The deserialize function for the proto
*/
this.responseDeserializeFn_ = null;
/**
* @const
* @private
* @type {!Array<function(!RESPONSE)>} The list of data callbacks
*/
this.onDataCallbacks_ = [];
/**
* @const
* @private
* @type {!Array<function(!RESPONSE)>} The list of data callbacks
*/
this.onDataCallbacks_ = [];
/**
* @const
* @private
* @type {!Array<function(!Status)>} The list of status callbacks
*/
this.onStatusCallbacks_ = [];
/**
* @const
* @private
* @type {!Array<function(!Status)>} The list of status callbacks
*/
this.onStatusCallbacks_ = [];
/**
* @const
* @private
* @type {!Array<function(!Metadata)>} The list of metadata callbacks
*/
this.onMetadataCallbacks_ = [];
/**
* @const
* @private
* @type {!Array<function(!Metadata)>} The list of metadata callbacks
*/
this.onMetadataCallbacks_ = [];
/**
* @const
* @private
* @type {!Array<function(...):?>} The list of error callbacks
*/
this.onErrorCallbacks_ = [];
/**
* @const
* @private
* @type {!Array<function(...):?>} The list of error callbacks
*/
this.onErrorCallbacks_ = [];
/**
* @const
* @private
* @type {!Array<function(...):?>} The list of stream end callbacks
*/
this.onEndCallbacks_ = [];
/**
* @const
* @private
* @type {!Array<function(...):?>} The list of stream end callbacks
*/
this.onEndCallbacks_ = [];
/**
* @private
* @type {boolean} Whether the stream has been aborted
*/
this.aborted_ = false;
/**
* @private
* @type {boolean} Whether the stream has been aborted
*/
this.aborted_ = false;
/**
* @private
* @type {number} The stream parser position
*/
this.pos_ = 0;
/**
* @private
* @type {number} The stream parser position
*/
this.pos_ = 0;
/**
* @private
* @type {!GrpcWebStreamParser} The grpc-web stream parser
* @const
*/
this.parser_ = new GrpcWebStreamParser();
/**
* @private
* @type {!GrpcWebStreamParser} The grpc-web stream parser
* @const
*/
this.parser_ = new GrpcWebStreamParser();
var self = this;
events.listen(this.xhr_, EventType.READY_STATE_CHANGE,
function(e) {
var contentType = self.xhr_.getStreamingResponseHeader('Content-Type');
if (!contentType) return;
contentType = contentType.toLowerCase();
var self = this;
events.listen(this.xhr_, EventType.READY_STATE_CHANGE, function(e) {
var contentType = self.xhr_.getStreamingResponseHeader('Content-Type');
if (!contentType) return;
contentType = contentType.toLowerCase();
if (googString.startsWith(contentType, 'application/grpc-web-text')) {
var responseText = self.xhr_.getResponseText();
var newPos = responseText.length - responseText.length % 4;
var newData = responseText.substr(self.pos_, newPos - self.pos_);
if (newData.length == 0) return;
self.pos_ = newPos;
var byteSource = googCrypt.decodeStringToUint8Array(newData);
} else if (googString.startsWith(contentType, 'application/grpc')) {
var byteSource = new Uint8Array(
/** @type {!ArrayBuffer} */ (self.xhr_.getResponse()));
} else {
return;
}
var messages = self.parser_.parse(byteSource);
if (messages) {
var FrameType = GrpcWebStreamParser.FrameType;
for (var i = 0; i < messages.length; i++) {
if (FrameType.DATA in messages[i]) {
var data = messages[i][FrameType.DATA];
if (data) {
var response = self.responseDeserializeFn_(data);
if (response) {
self.sendDataCallbacks_(response);
}
}
}
if (FrameType.TRAILER in messages[i]) {
if (messages[i][FrameType.TRAILER].length > 0) {
var trailerString = '';
for (var pos = 0; pos < messages[i][FrameType.TRAILER].length;
pos++) {
trailerString += String.fromCharCode(
messages[i][FrameType.TRAILER][pos]);
}
var trailers = self.parseHttp1Headers_(trailerString);
var grpcStatusCode = StatusCode.OK;
var grpcStatusMessage = '';
if (GRPC_STATUS in trailers) {
grpcStatusCode = trailers[GRPC_STATUS];
delete trailers[GRPC_STATUS];
}
if (GRPC_STATUS_MESSAGE in trailers) {
grpcStatusMessage = trailers[GRPC_STATUS_MESSAGE];
delete trailers[GRPC_STATUS_MESSAGE];
}
self.sendStatusCallbacks_(/** @type {!Status} */({
code: Number(grpcStatusCode),
details: grpcStatusMessage,
metadata: trailers,
}));
}
}
}
}
});
events.listen(this.xhr_, EventType.COMPLETE, function(e) {
var lastErrorCode = self.xhr_.getLastErrorCode();
var grpcStatusCode;
var grpcStatusMessage = '';
var initialMetadata = /** @type {!Metadata} */ ({});
var responseHeaders = self.xhr_.getResponseHeaders();
Object.keys(responseHeaders).forEach((header_) => {
if (!(EXCLUDED_RESPONSE_HEADERS.includes(header_))) {
initialMetadata[header_] = responseHeaders[header_];
}
});
self.sendMetadataCallbacks_(initialMetadata);
// There's an XHR level error
if (lastErrorCode != ErrorCode.NO_ERROR) {
switch (lastErrorCode) {
case ErrorCode.ABORT:
grpcStatusCode = StatusCode.ABORTED;
break;
case ErrorCode.TIMEOUT:
grpcStatusCode = StatusCode.DEADLINE_EXCEEDED;
break;
case ErrorCode.HTTP_ERROR:
grpcStatusCode = StatusCode.fromHttpStatus(self.xhr_.getStatus());
break;
default:
grpcStatusCode = StatusCode.UNAVAILABLE;
}
if (grpcStatusCode == StatusCode.ABORTED && self.aborted_) {
if (googString.startsWith(contentType, 'application/grpc-web-text')) {
var responseText = self.xhr_.getResponseText();
var newPos = responseText.length - responseText.length % 4;
var newData = responseText.substr(self.pos_, newPos - self.pos_);
if (newData.length == 0) return;
self.pos_ = newPos;
var byteSource = googCrypt.decodeStringToUint8Array(newData);
} else if (googString.startsWith(contentType, 'application/grpc')) {
var byteSource = new Uint8Array(
/** @type {!ArrayBuffer} */ (self.xhr_.getResponse()));
} else {
return;
}
self.sendErrorCallbacks_({
code: grpcStatusCode,
message: ErrorCode.getDebugMessage(lastErrorCode)
var messages = self.parser_.parse(byteSource);
if (messages) {
var FrameType = GrpcWebStreamParser.FrameType;
for (var i = 0; i < messages.length; i++) {
if (FrameType.DATA in messages[i]) {
var data = messages[i][FrameType.DATA];
if (data) {
var response = self.responseDeserializeFn_(data);
if (response) {
self.sendDataCallbacks_(response);
}
}
}
if (FrameType.TRAILER in messages[i]) {
if (messages[i][FrameType.TRAILER].length > 0) {
var trailerString = '';
for (var pos = 0; pos < messages[i][FrameType.TRAILER].length;
pos++) {
trailerString +=
String.fromCharCode(messages[i][FrameType.TRAILER][pos]);
}
var trailers = self.parseHttp1Headers_(trailerString);
var grpcStatusCode = StatusCode.OK;
var grpcStatusMessage = '';
if (GRPC_STATUS in trailers) {
grpcStatusCode = trailers[GRPC_STATUS];
delete trailers[GRPC_STATUS];
}
if (GRPC_STATUS_MESSAGE in trailers) {
grpcStatusMessage = trailers[GRPC_STATUS_MESSAGE];
delete trailers[GRPC_STATUS_MESSAGE];
}
self.sendStatusCallbacks_(/** @type {!Status} */ ({
code: Number(grpcStatusCode),
details: grpcStatusMessage,
metadata: trailers,
}));
}
}
}
}
});
events.listen(this.xhr_, EventType.COMPLETE, function(e) {
var lastErrorCode = self.xhr_.getLastErrorCode();
var grpcStatusCode;
var grpcStatusMessage = '';
var initialMetadata = /** @type {!Metadata} */ ({});
var responseHeaders = self.xhr_.getResponseHeaders();
Object.keys(responseHeaders).forEach((header_) => {
if (!(EXCLUDED_RESPONSE_HEADERS.includes(header_))) {
initialMetadata[header_] = responseHeaders[header_];
}
});
return;
}
self.sendMetadataCallbacks_(initialMetadata);
var errorEmitted = false;
// Check whethere there are grpc specific response headers
if (GRPC_STATUS in responseHeaders) {
grpcStatusCode = self.xhr_.getResponseHeader(GRPC_STATUS);
if (GRPC_STATUS_MESSAGE in responseHeaders) {
grpcStatusMessage = self.xhr_.getResponseHeader(GRPC_STATUS_MESSAGE);
}
if (Number(grpcStatusCode) != StatusCode.OK) {
// There's an XHR level error
if (lastErrorCode != ErrorCode.NO_ERROR) {
switch (lastErrorCode) {
case ErrorCode.ABORT:
grpcStatusCode = StatusCode.ABORTED;
break;
case ErrorCode.TIMEOUT:
grpcStatusCode = StatusCode.DEADLINE_EXCEEDED;
break;
case ErrorCode.HTTP_ERROR:
grpcStatusCode = StatusCode.fromHttpStatus(self.xhr_.getStatus());
break;
default:
grpcStatusCode = StatusCode.UNAVAILABLE;
}
if (grpcStatusCode == StatusCode.ABORTED && self.aborted_) {
return;
}
self.sendErrorCallbacks_({
code: Number(grpcStatusCode),
message: grpcStatusMessage,
metadata: responseHeaders
code: grpcStatusCode,
message: ErrorCode.getDebugMessage(lastErrorCode)
});
errorEmitted = true;
return;
}
var errorEmitted = false;
// Check whethere there are grpc specific response headers
if (GRPC_STATUS in responseHeaders) {
grpcStatusCode = self.xhr_.getResponseHeader(GRPC_STATUS);
if (GRPC_STATUS_MESSAGE in responseHeaders) {
grpcStatusMessage = self.xhr_.getResponseHeader(GRPC_STATUS_MESSAGE);
}
if (Number(grpcStatusCode) != StatusCode.OK) {
self.sendErrorCallbacks_({
code: Number(grpcStatusCode),
message: grpcStatusMessage,
metadata: responseHeaders
});
errorEmitted = true;
}
if (!errorEmitted) {
self.sendStatusCallbacks_(/** @type {!Status} */ ({
code: Number(grpcStatusCode),
details: grpcStatusMessage,
metadata: responseHeaders
}));
}
}
if (!errorEmitted) {
self.sendStatusCallbacks_(/** @type {!Status} */ ({
code: Number(grpcStatusCode),
details: grpcStatusMessage,
metadata: responseHeaders
}));
self.sendEndCallbacks_();
}
});
}
/**
* @override
* @export
*/
on(eventType, callback) {
// TODO(stanleycheung): change eventType to @enum type
if (eventType == 'data') {
this.onDataCallbacks_.push(callback);
} else if (eventType == 'status') {
this.onStatusCallbacks_.push(callback);
} else if (eventType == 'metadata') {
this.onMetadataCallbacks_.push(callback);
} else if (eventType == 'end') {
this.onEndCallbacks_.push(callback);
} else if (eventType == 'error') {
this.onErrorCallbacks_.push(callback);
}
return this;
}
if (!errorEmitted) {
self.sendEndCallbacks_();
/**
* @private
* @param {!Array<function(?)>} callbacks the internal list of callbacks
* @param {function(?)} callback the callback to remove
*/
removeListenerFromCallbacks_(callbacks, callback) {
const index = callbacks.indexOf(callback);
if (index > -1) {
callbacks.splice(index, 1);
}
});
};
/**
* @override
* @export
*/
GrpcWebClientReadableStream.prototype.on = function(
eventType, callback) {
// TODO(stanleycheung): change eventType to @enum type
if (eventType == 'data') {
this.onDataCallbacks_.push(callback);
} else if (eventType == 'status') {
this.onStatusCallbacks_.push(callback);
} else if (eventType == 'metadata') {
this.onMetadataCallbacks_.push(callback);
} else if (eventType == 'end') {
this.onEndCallbacks_.push(callback);
} else if (eventType == 'error') {
this.onErrorCallbacks_.push(callback);
}
return this;
};
/**
* @private
* @param {!Array<function(?)>} callbacks the internal list of callbacks
* @param {function(?)} callback the callback to remove
*/
GrpcWebClientReadableStream.prototype.removeListenerFromCallbacks_ = function(
callbacks, callback) {
const index = callbacks.indexOf(callback);
if (index > -1) {
callbacks.splice(index, 1);
/**
* @export
* @override
*/
removeListener(eventType, callback) {
if (eventType == 'data') {
this.removeListenerFromCallbacks_(this.onDataCallbacks_, callback);
} else if (eventType == 'status') {
this.removeListenerFromCallbacks_(this.onStatusCallbacks_, callback);
} else if (eventType == 'metadata') {
this.removeListenerFromCallbacks_(this.onMetadataCallbacks_, callback);
} else if (eventType == 'end') {
this.removeListenerFromCallbacks_(this.onEndCallbacks_, callback);
} else if (eventType == 'error') {
this.removeListenerFromCallbacks_(this.onErrorCallbacks_, callback);
}
return this;
}
};
/**
* @export
* @override
*/
GrpcWebClientReadableStream.prototype.removeListener = function(
eventType, callback) {
if (eventType == 'data') {
this.removeListenerFromCallbacks_(this.onDataCallbacks_, callback);
} else if (eventType == 'status') {
this.removeListenerFromCallbacks_(this.onStatusCallbacks_, callback);
} else if (eventType == 'metadata') {
this.removeListenerFromCallbacks_(this.onMetadataCallbacks_, callback);
} else if (eventType == 'end') {
this.removeListenerFromCallbacks_(this.onEndCallbacks_, callback);
} else if (eventType == 'error') {
this.removeListenerFromCallbacks_(this.onErrorCallbacks_, callback);
/**
* Register a callbackl to parse the response
*
* @param {function(?):!RESPONSE} responseDeserializeFn The deserialize
* function for the proto
*/
setResponseDeserializeFn(responseDeserializeFn) {
this.responseDeserializeFn_ = responseDeserializeFn;
}
return this;
};
/**
* Register a callbackl to parse the response
*
* @param {function(?):!RESPONSE} responseDeserializeFn The deserialize
* function for the proto
*/
GrpcWebClientReadableStream.prototype.setResponseDeserializeFn =
function(responseDeserializeFn) {
this.responseDeserializeFn_ = responseDeserializeFn;
};
/**
* @override
* @export
*/
GrpcWebClientReadableStream.prototype.cancel = function() {
this.aborted_ = true;
this.xhr_.abort();
};
/**
* Parse HTTP headers
*
* @private
* @param {string} str The raw http header string
* @return {!Object} The header:value pairs
*/
GrpcWebClientReadableStream.prototype.parseHttp1Headers_ =
function(str) {
var chunks = str.trim().split("\r\n");
var headers = {};
for (var i = 0; i < chunks.length; i++) {
var pos = chunks[i].indexOf(":");
headers[chunks[i].substring(0, pos).trim()] =
chunks[i].substring(pos+1).trim();
/**
* @override
* @export
*/
cancel() {
this.aborted_ = true;
this.xhr_.abort();
}
return headers;
};
/**
* @private
* @param {!RESPONSE} data The data to send back
*/
GrpcWebClientReadableStream.prototype.sendDataCallbacks_ = function(data) {
for (var i = 0; i < this.onDataCallbacks_.length; i++) {
this.onDataCallbacks_[i](data);
/**
* Parse HTTP headers
*
* @private
* @param {string} str The raw http header string
* @return {!Object} The header:value pairs
*/
parseHttp1Headers_(str) {
var chunks = str.trim().split('\r\n');
var headers = {};
for (var i = 0; i < chunks.length; i++) {
var pos = chunks[i].indexOf(':');
headers[chunks[i].substring(0, pos).trim()] =
chunks[i].substring(pos + 1).trim();
}
return headers;
}
};
/**
* @private
* @param {!Status} status The status to send back
*/
GrpcWebClientReadableStream.prototype.sendStatusCallbacks_ = function(status) {
for (var i = 0; i < this.onStatusCallbacks_.length; i++) {
this.onStatusCallbacks_[i](status);
/**
* @private
* @param {!RESPONSE} data The data to send back
*/
sendDataCallbacks_(data) {
for (var i = 0; i < this.onDataCallbacks_.length; i++) {
this.onDataCallbacks_[i](data);
}
}
};
/**
* @private
* @param {!Metadata} metadata The metadata to send back
*/
GrpcWebClientReadableStream.prototype.sendMetadataCallbacks_ =
function(metadata) {
for (var i = 0; i < this.onMetadataCallbacks_.length; i++) {
this.onMetadataCallbacks_[i](metadata);
/**
* @private
* @param {!Status} status The status to send back
*/
sendStatusCallbacks_(status) {
for (var i = 0; i < this.onStatusCallbacks_.length; i++) {
this.onStatusCallbacks_[i](status);
}
}
};
/**
* @private
* @param {?} error The error to send back
*/
GrpcWebClientReadableStream.prototype.sendErrorCallbacks_ = function(error) {
for (var i = 0; i < this.onErrorCallbacks_.length; i++) {
this.onErrorCallbacks_[i](error);
/**
* @private
* @param {!Metadata} metadata The metadata to send back
*/
sendMetadataCallbacks_(metadata) {
for (var i = 0; i < this.onMetadataCallbacks_.length; i++) {
this.onMetadataCallbacks_[i](metadata);
}
}
};
/**
* @private
*/
GrpcWebClientReadableStream.prototype.sendEndCallbacks_ = function() {
for (var i = 0; i < this.onEndCallbacks_.length; i++) {
this.onEndCallbacks_[i]();
/**
* @private
* @param {?} error The error to send back
*/
sendErrorCallbacks_(error) {
for (var i = 0; i < this.onErrorCallbacks_.length; i++) {
this.onErrorCallbacks_[i](error);
}
}
};
/**
* @private
*/
sendEndCallbacks_() {
for (var i = 0; i < this.onEndCallbacks_.length; i++) {
this.onEndCallbacks_[i]();
}
}
}
exports = GrpcWebClientReadableStream;

View File

@ -49,68 +49,199 @@ const asserts = goog.require('goog.asserts');
/**
* The default grpc-web stream parser.
*
* @constructor
* @struct
* @implements {StreamParser}
* @final
*/
const GrpcWebStreamParser = function() {
/**
* The current error message, if any.
* @private {?string}
*/
this.errorMessage_ = null;
class GrpcWebStreamParser {
constructor() {
/**
* The current error message, if any.
* @private {?string}
*/
this.errorMessage_ = null;
/**
* The currently buffered result (parsed messages).
* @private {!Array<!Object>}
*/
this.result_ = [];
/**
* The current position in the streamed data.
* @private {number}
*/
this.streamPos_ = 0;
/**
* The current parser state.
* @private {number}
*/
this.state_ = Parser.State_.INIT;
/**
* The current frame byte being parsed
* @private {number}
*/
this.frame_ = 0;
/**
* The length of the proto message being parsed.
* @private {number}
*/
this.length_ = 0;
/**
* Count of processed length bytes.
* @private {number}
*/
this.countLengthBytes_ = 0;
/**
* Raw bytes of the current message. Uses Uint8Array by default. Falls back
* to native array when Uint8Array is unsupported.
* @private {?Uint8Array|?Array<number>}
*/
this.messageBuffer_ = null;
/**
* Count of processed message bytes.
* @private {number}
*/
this.countMessageBytes_ = 0;
}
/**
* The currently buffered result (parsed messages).
* @private {!Array<!Object>}
* @override
*/
this.result_ = [];
isInputValid() {
return this.state_ != Parser.State_.INVALID;
}
/**
* The current position in the streamed data.
* @private {number}
* @override
*/
this.streamPos_ = 0;
getErrorMessage() {
return this.errorMessage_;
}
/**
* The current parser state.
* @private {number}
* Parse the new input.
*
* Note that there is no Parser state to indicate the end of a stream.
*
* @param {string|!ArrayBuffer|!Uint8Array|!Array<number>} input The input
* data
* @throws {!Error} Throws an error message if the input is invalid.
* @return {?Array<string|!Object>} any parsed objects (atomic messages)
* in an array, or null if more data needs be read to parse any new object.
* @override
*/
this.state_ = Parser.State_.INIT;
parse(input) {
asserts.assert(
input instanceof Array || input instanceof ArrayBuffer ||
input instanceof Uint8Array);
/**
* The current frame byte being parsed
* @private {number}
*/
this.frame_ = 0;
var parser = this;
var inputBytes;
var pos = 0;
/**
* The length of the proto message being parsed.
* @private {number}
*/
this.length_ = 0;
if (input instanceof Uint8Array || input instanceof Array) {
inputBytes = input;
} else {
inputBytes = new Uint8Array(input);
}
/**
* Count of processed length bytes.
* @private {number}
*/
this.countLengthBytes_ = 0;
while (pos < inputBytes.length) {
switch (parser.state_) {
case Parser.State_.INVALID: {
parser.error_(inputBytes, pos, 'stream already broken');
break;
}
case Parser.State_.INIT: {
processFrameByte(inputBytes[pos]);
break;
}
case Parser.State_.LENGTH: {
processLengthByte(inputBytes[pos]);
break;
}
case Parser.State_.MESSAGE: {
processMessageByte(inputBytes[pos]);
break;
}
default: {
throw new Error('unexpected parser state: ' + parser.state_);
}
}
/**
* Raw bytes of the current message. Uses Uint8Array by default. Falls back to
* native array when Uint8Array is unsupported.
* @private {?Uint8Array|?Array<number>}
*/
this.messageBuffer_ = null;
parser.streamPos_++;
pos++;
}
/**
* Count of processed message bytes.
* @private {number}
*/
this.countMessageBytes_ = 0;
};
var msgs = parser.result_;
parser.result_ = [];
return msgs.length > 0 ? msgs : null;
/**
* @param {number} b A frame byte to process
*/
function processFrameByte(b) {
if (b == FrameType.DATA) {
parser.frame_ = b;
} else if (b == FrameType.TRAILER) {
parser.frame_ = b;
} else {
parser.error_(inputBytes, pos, 'invalid frame byte');
}
parser.state_ = Parser.State_.LENGTH;
parser.length_ = 0;
parser.countLengthBytes_ = 0;
}
/**
* @param {number} b A length byte to process
*/
function processLengthByte(b) {
parser.countLengthBytes_++;
parser.length_ = (parser.length_ << 8) + b;
if (parser.countLengthBytes_ == 4) { // no more length byte
parser.state_ = Parser.State_.MESSAGE;
parser.countMessageBytes_ = 0;
if (typeof Uint8Array !== 'undefined') {
parser.messageBuffer_ = new Uint8Array(parser.length_);
} else {
parser.messageBuffer_ = new Array(parser.length_);
}
if (parser.length_ == 0) { // empty message
finishMessage();
}
}
}
/**
* @param {number} b A message byte to process
*/
function processMessageByte(b) {
parser.messageBuffer_[parser.countMessageBytes_++] = b;
if (parser.countMessageBytes_ == parser.length_) {
finishMessage();
}
}
/**
* Finishes up building the current message and resets parser state
*/
function finishMessage() {
var message = {};
message[parser.frame_] = parser.messageBuffer_;
parser.result_.push(message);
parser.state_ = Parser.State_.INIT;
}
}
}
const Parser = GrpcWebStreamParser;
@ -133,29 +264,14 @@ Parser.State_ = {
* @enum {number}
*/
GrpcWebStreamParser.FrameType = {
DATA: 0x00, // expecting a data frame
TRAILER: 0x80, // expecting a trailer frame
DATA: 0x00, // expecting a data frame
TRAILER: 0x80, // expecting a trailer frame
};
var FrameType = GrpcWebStreamParser.FrameType;
/**
* @override
*/
GrpcWebStreamParser.prototype.isInputValid = function() {
return this.state_ != Parser.State_.INVALID;
};
/**
* @override
*/
GrpcWebStreamParser.prototype.getErrorMessage = function() {
return this.errorMessage_;
};
/**
* @param {!Uint8Array|!Array<number>} inputBytes The current input buffer
@ -174,119 +290,5 @@ Parser.prototype.error_ = function(inputBytes, pos, errorMsg) {
};
/**
* Parse the new input.
*
* Note that there is no Parser state to indicate the end of a stream.
*
* @param {string|!ArrayBuffer|!Uint8Array|!Array<number>} input The input data
* @throws {!Error} Throws an error message if the input is invalid.
* @return {?Array<string|!Object>} any parsed objects (atomic messages)
* in an array, or null if more data needs be read to parse any new object.
* @override
*/
GrpcWebStreamParser.prototype.parse = function(input) {
asserts.assert(input instanceof Array || input instanceof ArrayBuffer || input instanceof Uint8Array);
var parser = this;
var inputBytes;
var pos = 0;
if (input instanceof Uint8Array || input instanceof Array) {
inputBytes = input;
} else {
inputBytes = new Uint8Array(input);
}
while (pos < inputBytes.length) {
switch (parser.state_) {
case Parser.State_.INVALID: {
parser.error_(inputBytes, pos, 'stream already broken');
break;
}
case Parser.State_.INIT: {
processFrameByte(inputBytes[pos]);
break;
}
case Parser.State_.LENGTH: {
processLengthByte(inputBytes[pos]);
break;
}
case Parser.State_.MESSAGE: {
processMessageByte(inputBytes[pos]);
break;
}
default: { throw new Error('unexpected parser state: ' + parser.state_); }
}
parser.streamPos_++;
pos++;
}
var msgs = parser.result_;
parser.result_ = [];
return msgs.length > 0 ? msgs : null;
/**
* @param {number} b A frame byte to process
*/
function processFrameByte(b) {
if (b == FrameType.DATA) {
parser.frame_ = b;
} else if (b == FrameType.TRAILER) {
parser.frame_ = b;
} else {
parser.error_(inputBytes, pos, 'invalid frame byte');
}
parser.state_ = Parser.State_.LENGTH;
parser.length_ = 0;
parser.countLengthBytes_ = 0;
}
/**
* @param {number} b A length byte to process
*/
function processLengthByte(b) {
parser.countLengthBytes_++;
parser.length_ = (parser.length_ << 8) + b;
if (parser.countLengthBytes_ == 4) { // no more length byte
parser.state_ = Parser.State_.MESSAGE;
parser.countMessageBytes_ = 0;
if (typeof Uint8Array !== 'undefined') {
parser.messageBuffer_ = new Uint8Array(parser.length_);
} else {
parser.messageBuffer_ = new Array(parser.length_);
}
if (parser.length_ == 0) { // empty message
finishMessage();
}
}
}
/**
* @param {number} b A message byte to process
*/
function processMessageByte(b) {
parser.messageBuffer_[parser.countMessageBytes_++] = b;
if (parser.countMessageBytes_ == parser.length_) {
finishMessage();
}
}
/**
* Finishes up building the current message and resets parser state
*/
function finishMessage() {
var message = {};
message[parser.frame_] = parser.messageBuffer_;
parser.result_.push(message);
parser.state_ = Parser.State_.INIT;
}
};
exports = GrpcWebStreamParser;

View File

@ -13,44 +13,46 @@ const MethodType = goog.require('grpc.web.MethodType');
const Request = goog.require('grpc.web.Request');
const RequestInternal = goog.require('grpc.web.RequestInternal');
/**
* @constructor
* @struct
* @template REQUEST, RESPONSE
* @param {string} name
* @param {!MethodType} methodType
* @param {function(new: REQUEST, ...)} requestType
* @param {function(new: RESPONSE, ...)} responseType
* @param {function(REQUEST): ?} requestSerializeFn
* @param {function(?): RESPONSE} responseDeserializeFn
*/
const MethodDescriptor = function(
name, methodType, requestType, responseType, requestSerializeFn,
responseDeserializeFn) {
/** @const */
this.name = name;
/** @const */
this.methodType = methodType;
/** @const */
this.requestType = requestType;
/** @const */
this.responseType = responseType;
/** @const */
this.requestSerializeFn = requestSerializeFn;
/** @const */
this.responseDeserializeFn = responseDeserializeFn;
};
/** @template REQUEST, RESPONSE */
class MethodDescriptor {
/**
* @param {string} name
* @param {!MethodType} methodType
* @param {function(new: REQUEST, ...)} requestType
* @param {function(new: RESPONSE, ...)} responseType
* @param {function(REQUEST): ?} requestSerializeFn
* @param {function(?): RESPONSE} responseDeserializeFn
*/
constructor(
name, methodType, requestType, responseType, requestSerializeFn,
responseDeserializeFn) {
/** @const */
this.name = name;
/** @const */
this.methodType = methodType;
/** @const */
this.requestType = requestType;
/** @const */
this.responseType = responseType;
/** @const */
this.requestSerializeFn = requestSerializeFn;
/** @const */
this.responseDeserializeFn = responseDeserializeFn;
}
/**
* @template REQUEST, RESPONSE
* @param {REQUEST} requestMessage
* @param {!Metadata=} metadata
* @param {!CallOptions=} callOptions
* @return {!Request<REQUEST, RESPONSE>}
*/
createRequest(
requestMessage, metadata = {}, callOptions = new CallOptions()) {
return new RequestInternal(requestMessage, this, metadata, callOptions);
}
}
/**
* @template REQUEST, RESPONSE
* @param {REQUEST} requestMessage
* @param {!Metadata=} metadata
* @param {!CallOptions=} callOptions
* @return {!Request<REQUEST, RESPONSE>}
*/
MethodDescriptor.prototype.createRequest = function(
requestMessage, metadata = {}, callOptions = new CallOptions()) {
return new RequestInternal(requestMessage, this, metadata, callOptions);
};
exports = MethodDescriptor;

View File

@ -44,256 +44,247 @@ const {Status} = goog.require('grpc.web.Status');
/**
* A stream that the client can read from. Used for calls that are streaming
* from the server side.
*
* @template RESPONSE
* @constructor
* @implements {ClientReadableStream}
* @final
* @param {!GenericTransportInterface} genericTransportInterface The
* GenericTransportInterface
* @unrestricted
*/
const StreamBodyClientReadableStream = function(genericTransportInterface) {
class StreamBodyClientReadableStream {
/**
* @const
* @private
* @type {?NodeReadableStream|undefined} The XHR Node Readable Stream
* @param {!GenericTransportInterface} genericTransportInterface The
* GenericTransportInterface
*/
this.xhrNodeReadableStream_ = genericTransportInterface.nodeReadableStream;
constructor(genericTransportInterface) {
/**
* @const
* @private
* @type {?NodeReadableStream|undefined} The XHR Node Readable Stream
*/
this.xhrNodeReadableStream_ = genericTransportInterface.nodeReadableStream;
/**
* @private
* @type {function(?): RESPONSE|null} The deserialize function for the proto
*/
this.responseDeserializeFn_ = null;
/**
* @const
* @private
* @type {?XhrIo|undefined} The XhrIo object
*/
this.xhr_ = genericTransportInterface.xhr;
/**
* @const
* @private
* @type {!Array<function(RESPONSE)>} The list of data callback
*/
this.onDataCallbacks_ = [];
/**
* @const
* @private
* @type {!Array<function(!Status)>} The list of status callback
*/
this.onStatusCallbacks_ = [];
/**
* @const
* @private
* @type {!Array<function(...):?>} The list of stream end callback
*/
this.onEndCallbacks_ = [];
/**
* @const
* @private
* @type {!Array<function(...):?>} The list of error callback
*/
this.onErrorCallbacks_ = [];
/**
* @private
* @type {function(?):!Status|null}
* A function to parse the Rpc Status response
*/
this.rpcStatusParseFn_ = null;
this.setStreamCallback_();
}
/**
* @private
* @type {function(?): RESPONSE|null} The deserialize function for the proto
*/
this.responseDeserializeFn_ = null;
/**
* @const
* @private
* @type {?XhrIo|undefined} The XhrIo object
*/
this.xhr_ = genericTransportInterface.xhr;
/**
* @const
* @private
* @type {!Array<function(RESPONSE)>} The list of data callback
*/
this.onDataCallbacks_ = [];
/**
* @const
* @private
* @type {!Array<function(!Status)>} The list of status callback
*/
this.onStatusCallbacks_ = [];
/**
* @const
* @private
* @type {!Array<function(...):?>} The list of stream end callback
*/
this.onEndCallbacks_ = [];
/**
* @const
* @private
* @type {!Array<function(...):?>} The list of error callback
*/
this.onErrorCallbacks_ = [];
/**
* @private
* @type {function(?):!Status|null}
* A function to parse the Rpc Status response
*/
this.rpcStatusParseFn_ = null;
this.setStreamCallback_();
};
/**
* @private
*/
StreamBodyClientReadableStream.prototype.setStreamCallback_ = function() {
// Add the callback to the underlying stream
var self = this;
this.xhrNodeReadableStream_.on('data', function(data) {
if ('1' in data) {
var response = self.responseDeserializeFn_(data['1']);
self.sendDataCallbacks_(response);
}
if ('2' in data) {
var status = self.rpcStatusParseFn_(data['2']);
self.sendStatusCallbacks_(status);
}
});
this.xhrNodeReadableStream_.on('end', function() {
self.sendEndCallbacks_();
});
this.xhrNodeReadableStream_.on('error', function() {
if (self.onErrorCallbacks_.length == 0) return;
var lastErrorCode = self.xhr_.getLastErrorCode();
if (lastErrorCode === ErrorCode.NO_ERROR && !self.xhr_.isSuccess()) {
// The lastErrorCode on the XHR isn't useful in this case, but the XHR
// status is. Full details about the failure should be available in the
// status handler.
lastErrorCode = ErrorCode.HTTP_ERROR;
}
var grpcStatusCode;
switch (lastErrorCode) {
case ErrorCode.NO_ERROR:
grpcStatusCode = StatusCode.UNKNOWN;
break;
case ErrorCode.ABORT:
grpcStatusCode = StatusCode.ABORTED;
break;
case ErrorCode.TIMEOUT:
grpcStatusCode = StatusCode.DEADLINE_EXCEEDED;
break;
case ErrorCode.HTTP_ERROR:
grpcStatusCode = StatusCode.fromHttpStatus(self.xhr_.getStatus());
break;
default:
grpcStatusCode = StatusCode.UNAVAILABLE;
}
self.sendErrorCallbacks_({
code: grpcStatusCode,
// TODO(armiller): get the message from the response?
// GoogleRpcStatus.deserialize(rawResponse).getMessage()?
// Perhaps do the same status logic as in on('data') above?
message: ErrorCode.getDebugMessage(lastErrorCode)
setStreamCallback_() {
// Add the callback to the underlying stream
var self = this;
this.xhrNodeReadableStream_.on('data', function(data) {
if ('1' in data) {
var response = self.responseDeserializeFn_(data['1']);
self.sendDataCallbacks_(response);
}
if ('2' in data) {
var status = self.rpcStatusParseFn_(data['2']);
self.sendStatusCallbacks_(status);
}
});
});
};
this.xhrNodeReadableStream_.on('end', function() {
self.sendEndCallbacks_();
});
this.xhrNodeReadableStream_.on('error', function() {
if (self.onErrorCallbacks_.length == 0) return;
var lastErrorCode = self.xhr_.getLastErrorCode();
if (lastErrorCode === ErrorCode.NO_ERROR && !self.xhr_.isSuccess()) {
// The lastErrorCode on the XHR isn't useful in this case, but the XHR
// status is. Full details about the failure should be available in the
// status handler.
lastErrorCode = ErrorCode.HTTP_ERROR;
}
/**
* @override
* @export
*/
StreamBodyClientReadableStream.prototype.on = function(
eventType, callback) {
// TODO(stanleycheung): change eventType to @enum type
if (eventType == 'data') {
this.onDataCallbacks_.push(callback);
} else if (eventType == 'status') {
this.onStatusCallbacks_.push(callback);
} else if (eventType == 'end') {
this.onEndCallbacks_.push(callback);
} else if (eventType == 'error') {
this.onErrorCallbacks_.push(callback);
var grpcStatusCode;
switch (lastErrorCode) {
case ErrorCode.NO_ERROR:
grpcStatusCode = StatusCode.UNKNOWN;
break;
case ErrorCode.ABORT:
grpcStatusCode = StatusCode.ABORTED;
break;
case ErrorCode.TIMEOUT:
grpcStatusCode = StatusCode.DEADLINE_EXCEEDED;
break;
case ErrorCode.HTTP_ERROR:
grpcStatusCode = StatusCode.fromHttpStatus(self.xhr_.getStatus());
break;
default:
grpcStatusCode = StatusCode.UNAVAILABLE;
}
self.sendErrorCallbacks_({
code: grpcStatusCode,
// TODO(armiller): get the message from the response?
// GoogleRpcStatus.deserialize(rawResponse).getMessage()?
// Perhaps do the same status logic as in on('data') above?
message: ErrorCode.getDebugMessage(lastErrorCode)
});
});
}
return this;
};
/**
* @private
* @param {!Array<function(?)>} callbacks the internal list of callbacks
* @param {function(?)} callback the callback to remove
*/
StreamBodyClientReadableStream.prototype.removeListenerFromCallbacks_ = function(
callbacks, callback) {
const index = callbacks.indexOf(callback);
if (index > -1) {
callbacks.splice(index, 1);
/**
* @override
* @export
*/
on(eventType, callback) {
// TODO(stanleycheung): change eventType to @enum type
if (eventType == 'data') {
this.onDataCallbacks_.push(callback);
} else if (eventType == 'status') {
this.onStatusCallbacks_.push(callback);
} else if (eventType == 'end') {
this.onEndCallbacks_.push(callback);
} else if (eventType == 'error') {
this.onErrorCallbacks_.push(callback);
}
return this;
}
};
/**
* @export
* @override
*/
StreamBodyClientReadableStream.prototype.removeListener = function(
eventType, callback) {
if (eventType == 'data') {
this.removeListenerFromCallbacks_(this.onDataCallbacks_, callback);
} else if (eventType == 'status') {
this.removeListenerFromCallbacks_(this.onStatusCallbacks_, callback);
} else if (eventType == 'end') {
this.removeListenerFromCallbacks_(this.onEndCallbacks_, callback);
} else if (eventType == 'error') {
this.removeListenerFromCallbacks_(this.onErrorCallbacks_, callback);
/**
* @private
* @param {!Array<function(?)>} callbacks the internal list of callbacks
* @param {function(?)} callback the callback to remove
*/
removeListenerFromCallbacks_(callbacks, callback) {
const index = callbacks.indexOf(callback);
if (index > -1) {
callbacks.splice(index, 1);
}
}
return this;
};
/**
* Register a callbackl to parse the response
*
* @param {function(?): RESPONSE} responseDeserializeFn The deserialize
* function for the proto
*/
StreamBodyClientReadableStream.prototype.setResponseDeserializeFn =
function(responseDeserializeFn) {
this.responseDeserializeFn_ = responseDeserializeFn;
};
/**
* Register a function to parse RPC status response
*
* @param {function(?):!Status} rpcStatusParseFn A function to parse
* the RPC status response
*/
StreamBodyClientReadableStream.prototype.setRpcStatusParseFn = function(rpcStatusParseFn) {
this.rpcStatusParseFn_ = rpcStatusParseFn;
};
/**
* @override
* @export
*/
StreamBodyClientReadableStream.prototype.cancel = function() {
this.xhr_.abort();
};
/**
* @private
* @param {!RESPONSE} data The data to send back
*/
StreamBodyClientReadableStream.prototype.sendDataCallbacks_ = function(data) {
for (var i = 0; i < this.onDataCallbacks_.length; i++) {
this.onDataCallbacks_[i](data);
/**
* @export
* @override
*/
removeListener(eventType, callback) {
if (eventType == 'data') {
this.removeListenerFromCallbacks_(this.onDataCallbacks_, callback);
} else if (eventType == 'status') {
this.removeListenerFromCallbacks_(this.onStatusCallbacks_, callback);
} else if (eventType == 'end') {
this.removeListenerFromCallbacks_(this.onEndCallbacks_, callback);
} else if (eventType == 'error') {
this.removeListenerFromCallbacks_(this.onErrorCallbacks_, callback);
}
return this;
}
};
/**
* @private
* @param {!Status} status The status to send back
*/
StreamBodyClientReadableStream.prototype.sendStatusCallbacks_ = function(status) {
for (var i = 0; i < this.onStatusCallbacks_.length; i++) {
this.onStatusCallbacks_[i](status);
/**
* Register a callbackl to parse the response
*
* @param {function(?): RESPONSE} responseDeserializeFn The deserialize
* function for the proto
*/
setResponseDeserializeFn(responseDeserializeFn) {
this.responseDeserializeFn_ = responseDeserializeFn;
}
};
/**
* @private
* @param {?} error The error to send back
*/
StreamBodyClientReadableStream.prototype.sendErrorCallbacks_ = function(error) {
for (var i = 0; i < this.onErrorCallbacks_.length; i++) {
this.onErrorCallbacks_[i](error);
/**
* Register a function to parse RPC status response
*
* @param {function(?):!Status} rpcStatusParseFn A function to parse
* the RPC status response
*/
setRpcStatusParseFn(rpcStatusParseFn) {
this.rpcStatusParseFn_ = rpcStatusParseFn;
}
};
/**
* @private
*/
StreamBodyClientReadableStream.prototype.sendEndCallbacks_ = function() {
for (var i = 0; i < this.onEndCallbacks_.length; i++) {
this.onEndCallbacks_[i]();
/**
* @override
* @export
*/
cancel() {
this.xhr_.abort();
}
};
/**
* @private
* @param {!RESPONSE} data The data to send back
*/
sendDataCallbacks_(data) {
for (var i = 0; i < this.onDataCallbacks_.length; i++) {
this.onDataCallbacks_[i](data);
}
}
/**
* @private
* @param {!Status} status The status to send back
*/
sendStatusCallbacks_(status) {
for (var i = 0; i < this.onStatusCallbacks_.length; i++) {
this.onStatusCallbacks_[i](status);
}
}
/**
* @private
* @param {?} error The error to send back
*/
sendErrorCallbacks_(error) {
for (var i = 0; i < this.onErrorCallbacks_.length; i++) {
this.onErrorCallbacks_[i](error);
}
}
/**
* @private
*/
sendEndCallbacks_() {
for (var i = 0; i < this.onEndCallbacks_.length; i++) {
this.onEndCallbacks_[i]();
}
}
}
exports = StreamBodyClientReadableStream;