Implement client interceptors for grpc-native-core

A NodeJS implementation of client-side interceptors, as described in
the proposal: https://github.com/grpc/proposal/pull/14
This commit is contained in:
David Vroom Duke 2018-02-12 17:16:24 -08:00
parent 5674367e36
commit 5bae250077
7 changed files with 3496 additions and 346 deletions

View File

@ -223,6 +223,8 @@ exports.writeFlags = constants.writeFlags;
exports.logVerbosity = constants.logVerbosity;
exports.methodTypes = constants.methodTypes;
exports.credentials = require('./src/credentials.js');
/**
@ -266,6 +268,11 @@ exports.getClientChannel = client.getClientChannel;
exports.waitForClientReady = client.waitForClientReady;
exports.StatusBuilder = client.StatusBuilder;
exports.ListenerBuilder = client.ListenerBuilder;
exports.RequesterBuilder = client.RequesterBuilder;
exports.InterceptingCall = client.InterceptingCall;
/**
* @memberof grpc
* @alias grpc.closeClient

View File

@ -34,6 +34,7 @@
var _ = require('lodash');
var client_interceptors = require('./client_interceptors');
var grpc = require('./grpc_extension');
var common = require('./common');
@ -49,25 +50,10 @@ var stream = require('stream');
var Readable = stream.Readable;
var Writable = stream.Writable;
var Duplex = stream.Duplex;
var methodTypes = constants.methodTypes;
var util = require('util');
var version = require('../package.json').version;
/**
* Create an Error object from a status object
* @private
* @param {grpc~StatusObject} status The status object
* @return {Error} The resulting Error
*/
function createStatusError(status) {
let statusName = _.invert(constants.status)[status.code];
let message = `${status.code} ${statusName}: ${status.details}`;
let error = new Error(message);
error.code = status.code;
error.metadata = status.metadata;
error.details = status.details;
return error;
}
/**
* Initial response metadata sent by the server when it starts processing the
* call
@ -107,18 +93,15 @@ util.inherits(ClientWritableStream, Writable);
* grpc~ClientWritableStream#metadata
* @borrows grpc~ClientUnaryCall#event:status as
* grpc~ClientWritableStream#status
* @param {grpc.internal~Call} call The call object to send data with
* @param {grpc~serialize=} [serialize=identity] Serialization
* function for writes.
* @param {InterceptingCall} call Exposes gRPC request operations, processed by
* an interceptor stack.
*/
function ClientWritableStream(call, serialize) {
function ClientWritableStream(call) {
Writable.call(this, {objectMode: true});
this.call = call;
this.serialize = common.wrapIgnoreNull(serialize);
var self = this;
this.on('finish', function() {
var batch = {};
batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
call.startBatch(batch, function() {});
self.call.halfClose();
});
}
@ -145,8 +128,6 @@ function ClientWritableStream(call, serialize) {
*/
function _write(chunk, encoding, callback) {
/* jshint validthis: true */
var batch = {};
var message;
var self = this;
if (this.writeFailed) {
/* Once a write fails, just call the callback immediately to let the caller
@ -154,26 +135,7 @@ function _write(chunk, encoding, callback) {
setImmediate(callback);
return;
}
try {
message = this.serialize(chunk);
} catch (e) {
/* Sending this error to the server and emitting it immediately on the
client may put the call in a slightly weird state on the client side,
but passing an object that causes a serialization failure is a misuse
of the API anyway, so that's OK. The primary purpose here is to give the
programmer a useful error and to stop the stream properly */
this.call.cancelWithStatus(constants.status.INTERNAL,
'Serialization failure');
callback(e);
return;
}
if (_.isFinite(encoding)) {
/* Attach the encoding if it is a finite number. This is the closest we
* can get to checking that it is valid flags */
message.grpcWriteFlags = encoding;
}
batch[grpc.opType.SEND_MESSAGE] = message;
this.call.startBatch(batch, function(err, event) {
var outerCallback = function(err, event) {
if (err) {
/* Assume that the call is complete and that writing failed because a
status was received. In that case, set a flag to discard all future
@ -181,7 +143,12 @@ function _write(chunk, encoding, callback) {
self.writeFailed = true;
}
callback();
});
};
var context = {
encoding: encoding,
callback: outerCallback
};
this.call.sendMessageWithContext(context, chunk);
}
ClientWritableStream.prototype._write = _write;
@ -199,16 +166,14 @@ util.inherits(ClientReadableStream, Readable);
* grpc~ClientReadableStream#metadata
* @borrows grpc~ClientUnaryCall#event:status as
* grpc~ClientReadableStream#status
* @param {grpc.internal~Call} call The call object to read data with
* @param {grpc~deserialize=} [deserialize=identity]
* Deserialization function for reads
* @param {InterceptingCall} call Exposes gRPC request operations, processed by
* an interceptor stack.
*/
function ClientReadableStream(call, deserialize) {
function ClientReadableStream(call) {
Readable.call(this, {objectMode: true});
this.call = call;
this.finished = false;
this.reading = false;
this.deserialize = common.wrapIgnoreNull(deserialize);
/* Status generated from reading messages from the server. Overrides the
* status from the server if not OK */
this.read_status = null;
@ -267,7 +232,7 @@ function _emitStatusIfDone() {
if (status.code === constants.status.OK) {
this.push(null);
} else {
var error = createStatusError(status);
var error = common.createStatusError(status);
this.emit('error', error);
}
this.emit('status', status);
@ -283,48 +248,15 @@ ClientReadableStream.prototype._emitStatusIfDone = _emitStatusIfDone;
*/
function _read(size) {
/* jshint validthis: true */
var self = this;
/**
* Callback to be called when a READ event is received. Pushes the data onto
* the read queue and starts reading again if applicable
* @param {grpc.Event} event READ event object
*/
function readCallback(err, event) {
if (err) {
// Something has gone wrong. Stop reading and wait for status
self.finished = true;
self._readsDone();
return;
}
var data = event.read;
var deserialized;
try {
deserialized = self.deserialize(data);
} catch (e) {
self._readsDone({code: constants.status.INTERNAL,
details: 'Failed to parse server response'});
return;
}
if (data === null) {
self._readsDone();
return;
}
if (self.push(deserialized) && data !== null) {
var read_batch = {};
read_batch[grpc.opType.RECV_MESSAGE] = true;
self.call.startBatch(read_batch, readCallback);
} else {
self.reading = false;
}
}
if (self.finished) {
self.push(null);
if (this.finished) {
this.push(null);
} else {
if (!self.reading) {
self.reading = true;
var read_batch = {};
read_batch[grpc.opType.RECV_MESSAGE] = true;
self.call.startBatch(read_batch, readCallback);
if (!this.reading) {
this.reading = true;
var context = {
stream: this
};
this.call.recvMessageWithContext(context);
}
}
}
@ -345,26 +277,20 @@ util.inherits(ClientDuplexStream, Duplex);
* grpc~ClientDuplexStream#metadata
* @borrows grpc~ClientUnaryCall#event:status as
* grpc~ClientDuplexStream#status
* @param {grpc.internal~Call} call Call object to proxy
* @param {grpc~serialize=} [serialize=identity] Serialization
* function for requests
* @param {grpc~deserialize=} [deserialize=identity]
* Deserialization function for responses
* @param {InterceptingCall} call Exposes gRPC request operations, processed by
* an interceptor stack.
*/
function ClientDuplexStream(call, serialize, deserialize) {
function ClientDuplexStream(call) {
Duplex.call(this, {objectMode: true});
this.serialize = common.wrapIgnoreNull(serialize);
this.deserialize = common.wrapIgnoreNull(deserialize);
this.call = call;
/* Status generated from reading messages from the server. Overrides the
* status from the server if not OK */
this.read_status = null;
/* Status received from the server. */
this.received_status = null;
var self = this;
this.on('finish', function() {
var batch = {};
batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
call.startBatch(batch, function() {});
self.call.halfClose();
});
}
@ -429,45 +355,17 @@ ClientDuplexStream.prototype.getPeer = getPeer;
* should be used to make this particular call.
*/
/**
* Get a call object built with the provided options.
* @access private
* @param {grpc.Client~CallOptions=} options Options object.
*/
function getCall(channel, method, options) {
var deadline;
var host;
var parent;
var propagate_flags;
var credentials;
if (options) {
deadline = options.deadline;
host = options.host;
parent = _.get(options, 'parent.call');
propagate_flags = options.propagate_flags;
credentials = options.credentials;
}
if (deadline === undefined) {
deadline = Infinity;
}
var call = new grpc.Call(channel, method, deadline, host,
parent, propagate_flags);
if (credentials) {
call.setCredentials(credentials);
}
return call;
}
/**
* A generic gRPC client. Primarily useful as a base class for generated clients
* @memberof grpc
* @constructor
* @param {string} address Server address to connect to
* @param {grpc.credentials~ChannelCredentials} credentials Credentials to use to connect to
* the server
* @param {grpc.credentials~ChannelCredentials} credentials Credentials to use
* to connect to the server
* @param {Object} options Options to apply to channel creation
*/
function Client(address, credentials, options) {
var self = this;
if (!options) {
options = {};
}
@ -480,9 +378,27 @@ function Client(address, credentials, options) {
options['grpc.primary_user_agent'] = '';
}
options['grpc.primary_user_agent'] += 'grpc-node/' + version;
// Resolve interceptor options and assign interceptors to each method
var interceptor_providers = options.interceptor_providers || [];
var interceptors = options.interceptors || [];
if (interceptor_providers.length && interceptors.length) {
throw new client_interceptors.InterceptorConfigurationError(
'Both interceptors and interceptor_providers were passed as options ' +
'to the client constructor. Only one of these is allowed.');
}
_.each(self.$method_definitions, function(method_definition, method_name) {
self[method_name].interceptors = client_interceptors
.resolveInterceptorProviders(interceptor_providers, method_definition)
.concat(interceptors);
});
// Exclude interceptor options which have already been consumed
var channel_options = _.omit(options,
['interceptors', 'interceptor_providers']);
/* Private fields use $ as a prefix instead of _ because it is an invalid
* prefix of a method name */
this.$channel = new grpc.Channel(address, credentials, options);
this.$channel = new grpc.Channel(address, credentials, channel_options);
}
exports.Client = Client;
@ -497,7 +413,7 @@ exports.Client = Client;
/**
* Make a unary request to the given method, using the given serialize
* and deserialize functions, with the given argument.
* @param {string} method The name of the method to request
* @param {string} path The path of the method to request
* @param {grpc~serialize} serialize The serialization function for
* inputs
* @param {grpc~deserialize} deserialize The deserialization
@ -506,11 +422,11 @@ exports.Client = Client;
* serialize
* @param {grpc.Metadata=} metadata Metadata to add to the call
* @param {grpc.Client~CallOptions=} options Options map
* @param {grpc.Client~requestCallback} callback The callback to
* @param {grpc.Client~requestCallback} callback The callback
* for when the response is received
* @return {grpc~ClientUnaryCall} An event emitter for stream related events
*/
Client.prototype.makeUnaryRequest = function(method, serialize, deserialize,
Client.prototype.makeUnaryRequest = function(path, serialize, deserialize,
argument, metadata, options,
callback) {
if (options instanceof Function) {
@ -533,67 +449,50 @@ Client.prototype.makeUnaryRequest = function(method, serialize, deserialize,
options = {};
}
if (!((metadata instanceof Metadata) &&
(options instanceof Object) &&
(callback instanceof Function))) {
throw new Error("Argument mismatch in makeUnaryRequest");
}
var call = getCall(this.$channel, method, options);
var emitter = new ClientUnaryCall(call);
metadata = metadata.clone();
var client_batch = {};
var message = serialize(argument);
if (options) {
message.grpcWriteFlags = options.flags;
(options instanceof Object) &&
(callback instanceof Function))) {
throw new Error('Argument mismatch in makeUnaryRequest');
}
client_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
client_batch[grpc.opType.SEND_MESSAGE] = message;
client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
var status = response.status;
var error;
var deserialized;
emitter.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
if (status.code === constants.status.OK) {
if (err) {
// Got a batch error, but OK status. Something went wrong
callback(err);
return;
} else {
try {
deserialized = deserialize(response.read);
} catch (e) {
/* Change status to indicate bad server response. This will result
* in passing an error to the callback */
status = {
code: constants.status.INTERNAL,
details: 'Failed to parse server response'
};
}
}
}
if (status.code !== constants.status.OK) {
error = new createStatusError(status);
callback(error);
} else {
callback(null, deserialized);
}
emitter.emit('status', status);
});
var method_name = this.$method_names[path];
var constructor_interceptors = this[method_name] ?
this[method_name].interceptors :
null;
var method_definition = options.method_definition = {
path: path,
requestStream: false,
responseStream: false,
requestSerialize: serialize,
responseDeserialize: deserialize
};
metadata = metadata.clone();
var intercepting_call = client_interceptors.getInterceptingCall(
method_definition,
options,
constructor_interceptors,
this.$channel,
callback
);
var emitter = new ClientUnaryCall(intercepting_call);
var last_listener = client_interceptors.getLastListener(
method_definition,
emitter,
callback
);
intercepting_call.start(metadata, last_listener);
intercepting_call.sendMessage(argument);
intercepting_call.halfClose();
return emitter;
};
/**
* Make a client stream request to the given method, using the given serialize
* and deserialize functions, with the given argument.
* @param {string} method The name of the method to request
* @param {string} path The path of the method to request
* @param {grpc~serialize} serialize The serialization function for
* inputs
* @param {grpc~deserialize} deserialize The deserialization
@ -601,14 +500,14 @@ Client.prototype.makeUnaryRequest = function(method, serialize, deserialize,
* @param {grpc.Metadata=} metadata Array of metadata key/value pairs to add to
* the call
* @param {grpc.Client~CallOptions=} options Options map
* @param {grpc.Client~requestCallback} callback The callback to for when the
* @param {grpc.Client~requestCallback} callback The callback for when the
* response is received
* @return {grpc~ClientWritableStream} An event emitter for stream related
* events
*/
Client.prototype.makeClientStreamRequest = function(method, serialize,
deserialize, metadata,
options, callback) {
Client.prototype.makeClientStreamRequest = function(path, serialize,
deserialize, metadata,
options, callback) {
if (options instanceof Function) {
callback = options;
if (metadata instanceof Metadata) {
@ -629,68 +528,48 @@ Client.prototype.makeClientStreamRequest = function(method, serialize,
options = {};
}
if (!((metadata instanceof Metadata) &&
(options instanceof Object) &&
(callback instanceof Function))) {
throw new Error("Argument mismatch in makeClientStreamRequest");
(options instanceof Object) &&
(callback instanceof Function))) {
throw new Error('Argument mismatch in makeClientStreamRequest');
}
var call = getCall(this.$channel, method, options);
var method_name = this.$method_names[path];
var constructor_interceptors = this[method_name] ?
this[method_name].interceptors :
null;
var method_definition = options.method_definition = {
path: path,
requestStream: true,
responseStream: false,
requestSerialize: serialize,
responseDeserialize: deserialize
};
metadata = metadata.clone();
var stream = new ClientWritableStream(call, serialize);
var metadata_batch = {};
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(metadata_batch, function(err, response) {
if (err) {
// The call has stopped for some reason. A non-OK status will arrive
// in the other batch.
return;
}
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var client_batch = {};
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
var status = response.status;
var error;
var deserialized;
if (status.code === constants.status.OK) {
if (err) {
// Got a batch error, but OK status. Something went wrong
callback(err);
return;
} else {
try {
deserialized = deserialize(response.read);
} catch (e) {
/* Change status to indicate bad server response. This will result
* in passing an error to the callback */
status = {
code: constants.status.INTERNAL,
details: 'Failed to parse server response'
};
}
}
}
if (status.code !== constants.status.OK) {
error = createStatusError(status);
callback(error);
} else {
callback(null, deserialized);
}
stream.emit('status', status);
});
return stream;
var intercepting_call = client_interceptors.getInterceptingCall(
method_definition,
options,
constructor_interceptors,
this.$channel,
callback
);
var emitter = new ClientWritableStream(intercepting_call);
var last_listener = client_interceptors.getLastListener(
method_definition,
emitter,
callback
);
intercepting_call.start(metadata, last_listener);
return emitter;
};
/**
* Make a server stream request to the given method, with the given serialize
* and deserialize function, using the given argument
* @param {string} method The name of the method to request
* @param {string} path The path of the method to request
* @param {grpc~serialize} serialize The serialization function for inputs
* @param {grpc~deserialize} deserialize The deserialization
* function for outputs
@ -702,7 +581,7 @@ Client.prototype.makeClientStreamRequest = function(method, serialize,
* @return {grpc~ClientReadableStream} An event emitter for stream related
* events
*/
Client.prototype.makeServerStreamRequest = function(method, serialize,
Client.prototype.makeServerStreamRequest = function(path, serialize,
deserialize, argument,
metadata, options) {
if (!(metadata instanceof Metadata)) {
@ -713,48 +592,47 @@ Client.prototype.makeServerStreamRequest = function(method, serialize,
options = {};
}
if (!((metadata instanceof Metadata) && (options instanceof Object))) {
throw new Error("Argument mismatch in makeServerStreamRequest");
throw new Error('Argument mismatch in makeServerStreamRequest');
}
var call = getCall(this.$channel, method, options);
metadata = metadata.clone();
var stream = new ClientReadableStream(call, deserialize);
var start_batch = {};
var message = serialize(argument);
if (options) {
message.grpcWriteFlags = options.flags;
}
start_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
start_batch[grpc.opType.SEND_MESSAGE] = message;
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
// The call has stopped for some reason. A non-OK status will arrive
// in the other batch.
return;
}
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
if (err) {
stream.emit('error', err);
return;
}
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream._receiveStatus(response.status);
});
return stream;
};
var method_name = this.$method_names[path];
var constructor_interceptors = this[method_name] ?
this[method_name].interceptors :
null;
var method_definition = options.method_definition = {
path: path,
requestStream: false,
responseStream: true,
requestSerialize: serialize,
responseDeserialize: deserialize
};
metadata = metadata.clone();
var emitter = new ClientReadableStream();
var intercepting_call = client_interceptors.getInterceptingCall(
method_definition,
options,
constructor_interceptors,
this.$channel,
emitter
);
emitter.call = intercepting_call;
var last_listener = client_interceptors.getLastListener(
method_definition,
emitter
);
intercepting_call.start(metadata, last_listener);
intercepting_call.sendMessage(argument);
intercepting_call.halfClose();
return emitter;
};
/**
* Make a bidirectional stream request with this method on the given channel.
* @param {string} method The name of the method to request
* @param {string} path The path of the method to request
* @param {grpc~serialize} serialize The serialization function for inputs
* @param {grpc~deserialize} deserialize The deserialization
* function for outputs
@ -763,7 +641,7 @@ Client.prototype.makeServerStreamRequest = function(method, serialize,
* @param {grpc.Client~CallOptions=} options Options map
* @return {grpc~ClientDuplexStream} An event emitter for stream related events
*/
Client.prototype.makeBidiStreamRequest = function(method, serialize,
Client.prototype.makeBidiStreamRequest = function(path, serialize,
deserialize, metadata,
options) {
if (!(metadata instanceof Metadata)) {
@ -774,36 +652,40 @@ Client.prototype.makeBidiStreamRequest = function(method, serialize,
options = {};
}
if (!((metadata instanceof Metadata) && (options instanceof Object))) {
throw new Error("Argument mismatch in makeBidiStreamRequest");
throw new Error('Argument mismatch in makeBidiStreamRequest');
}
var call = getCall(this.$channel, method, options);
var method_name = this.$method_names[path];
var constructor_interceptors = this[method_name] ?
this[method_name].interceptors :
null;
var method_definition = options.method_definition = {
path: path,
requestStream: true,
responseStream: true,
requestSerialize: serialize,
responseDeserialize: deserialize
};
metadata = metadata.clone();
var stream = new ClientDuplexStream(call, serialize, deserialize);
var start_batch = {};
start_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
// The call has stopped for some reason. A non-OK status will arrive
// in the other batch.
return;
}
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
if (err) {
stream.emit('error', err);
return;
}
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream._receiveStatus(response.status);
});
return stream;
var emitter = new ClientDuplexStream();
var intercepting_call = client_interceptors.getInterceptingCall(
method_definition,
options,
constructor_interceptors,
this.$channel,
emitter
);
emitter.call = intercepting_call;
var last_listener = client_interceptors.getLastListener(
method_definition,
emitter
);
intercepting_call.start(metadata, last_listener);
return emitter;
};
/**
@ -859,10 +741,10 @@ Client.prototype.waitForReady = function(deadline, callback) {
* @private
*/
var requester_funcs = {
unary: Client.prototype.makeUnaryRequest,
server_stream: Client.prototype.makeServerStreamRequest,
client_stream: Client.prototype.makeClientStreamRequest,
bidi: Client.prototype.makeBidiStreamRequest
[methodTypes.UNARY]: Client.prototype.makeUnaryRequest,
[methodTypes.CLIENT_STREAMING]: Client.prototype.makeClientStreamRequest,
[methodTypes.SERVER_STREAMING]: Client.prototype.makeServerStreamRequest,
[methodTypes.BIDI_STREAMING]: Client.prototype.makeBidiStreamRequest
};
function getDefaultValues(metadata, options) {
@ -878,7 +760,7 @@ function getDefaultValues(metadata, options) {
* @access private
*/
var deprecated_request_wrap = {
unary: function(makeUnaryRequest) {
[methodTypes.UNARY]: function(makeUnaryRequest) {
return function makeWrappedUnaryRequest(argument, callback,
metadata, options) {
/* jshint validthis: true */
@ -887,7 +769,7 @@ var deprecated_request_wrap = {
opt_args.options, callback);
};
},
client_stream: function(makeServerStreamRequest) {
[methodTypes.CLIENT_STREAMING]: function(makeServerStreamRequest) {
return function makeWrappedClientStreamRequest(callback, metadata,
options) {
/* jshint validthis: true */
@ -896,8 +778,8 @@ var deprecated_request_wrap = {
opt_args.options, callback);
};
},
server_stream: _.identity,
bidi: _.identity
[methodTypes.SERVER_STREAMING]: _.identity,
[methodTypes.BIDI_STREAMING]: _.identity
};
/**
@ -932,38 +814,29 @@ exports.makeClientConstructor = function(methods, serviceName,
}
util.inherits(ServiceClient, Client);
ServiceClient.prototype.$method_definitions = methods;
ServiceClient.prototype.$method_names = {};
_.each(methods, function(attrs, name) {
var method_type;
if (_.startsWith(name, '$')) {
throw new Error('Method names cannot start with $');
}
if (attrs.requestStream) {
if (attrs.responseStream) {
method_type = 'bidi';
} else {
method_type = 'client_stream';
}
} else {
if (attrs.responseStream) {
method_type = 'server_stream';
} else {
method_type = 'unary';
}
}
var serialize = attrs.requestSerialize;
var deserialize = attrs.responseDeserialize;
var method_type = common.getMethodType(attrs);
var method_func = _.partial(requester_funcs[method_type], attrs.path,
serialize, deserialize);
attrs.requestSerialize,
attrs.responseDeserialize);
if (class_options.deprecatedArgumentOrder) {
ServiceClient.prototype[name] = deprecated_request_wrap(method_func);
ServiceClient.prototype[name] =
deprecated_request_wrap[method_type](method_func);
} else {
ServiceClient.prototype[name] = method_func;
}
ServiceClient.prototype.$method_names[attrs.path] = name;
// Associate all provided attributes with the method
_.assign(ServiceClient.prototype[name], attrs);
if (attrs.originalName) {
ServiceClient.prototype[attrs.originalName] = ServiceClient.prototype[name];
ServiceClient.prototype[attrs.originalName] =
ServiceClient.prototype[name];
}
});
@ -984,6 +857,17 @@ exports.getClientChannel = function(client) {
return Client.prototype.getChannel.call(client);
};
/**
* Gets a map of client method names to interceptor stacks.
* @param {grpc.Client} client
* @returns {Object.<string, Interceptor[]>}
*/
exports.getClientInterceptors = function(client) {
return _.mapValues(client.$method_definitions, function(def, name) {
return client[name].interceptors;
});
};
/**
* Wait for the client to be ready. The callback will be called when the
* client has successfully connected to the server, and it will be called
@ -1002,3 +886,8 @@ exports.getClientChannel = function(client) {
exports.waitForClientReady = function(client, deadline, callback) {
Client.prototype.waitForReady.call(client, deadline, callback);
};
exports.StatusBuilder = client_interceptors.StatusBuilder;
exports.ListenerBuilder = client_interceptors.ListenerBuilder;
exports.RequesterBuilder = client_interceptors.RequesterBuilder;
exports.InterceptingCall = client_interceptors.InterceptingCall;

File diff suppressed because it is too large Load Diff

View File

@ -19,6 +19,8 @@
'use strict';
var _ = require('lodash');
var constants = require('./constants');
var grpc = require('./grpc_extension');
/**
* Wrap a function to pass null-like values through without calling it. If no
@ -75,6 +77,73 @@ exports.defaultGrpcOptions = {
deprecatedArgumentOrder: false
};
/**
* Create an Error object from a status object
* @param {grpc~StatusObject} status The status object
* @return {Error} The resulting Error
*/
exports.createStatusError = function(status) {
let statusName = _.invert(constants.status)[status.code];
let message = `${status.code} ${statusName}: ${status.details}`;
let error = new Error(message);
error.code = status.code;
error.metadata = status.metadata;
error.details = status.details;
return error;
};
/**
* Get a method's type from its definition
* @param {grpc~MethodDefinition} method_definition
* @return {number}
*/
exports.getMethodType = function(method_definition) {
if (method_definition.requestStream) {
if (method_definition.responseStream) {
return constants.methodTypes.BIDI_STREAMING;
} else {
return constants.methodTypes.CLIENT_STREAMING;
}
} else {
if (method_definition.responseStream) {
return constants.methodTypes.SERVER_STREAMING;
} else {
return constants.methodTypes.UNARY;
}
}
};
/**
* Get a call object built with the provided options.
* @param {grpc.Channel} channel
* @param {string} path
* @param {grpc.Client~CallOptions=} options Options object.
*/
exports.getCall = function(channel, path, options) {
var deadline;
var host;
var parent;
var propagate_flags;
var credentials;
if (options) {
deadline = options.deadline;
host = options.host;
parent = _.get(options, 'parent.call');
propagate_flags = options.propagate_flags;
credentials = options.credentials;
}
if (deadline === undefined) {
deadline = Infinity;
}
var call = new grpc.Call(channel, path, deadline, host,
parent, propagate_flags);
if (credentials) {
call.setCredentials(credentials);
}
return call;
};
// JSDoc definitions that are used in multiple other modules
/**
@ -166,6 +235,71 @@ exports.defaultGrpcOptions = {
* function for repsonse data
*/
/**
* @function MetadataListener
* @param {grpc.Metadata} metadata The response metadata.
* @param {function} next Passes metadata to the next interceptor.
*/
/**
* @function MessageListener
* @param {jspb.Message} message The response message.
* @param {function} next Passes a message to the next interceptor.
*/
/**
* @function StatusListener
* @param {grpc~StatusObject} status The response status.
* @param {function} next Passes a status to the next interceptor.
*/
/**
* A set of interceptor functions triggered by responses
* @typedef {object} grpc~Listener
* @property {MetadataListener=} onReceiveMetadata A function triggered by
* response metadata.
* @property {MessageListener=} onReceiveMessage A function triggered by a
* response message.
* @property {StatusListener=} onReceiveStatus A function triggered by a
* response status.
*/
/**
* @function MetadataRequester
* @param {grpc.Metadata} metadata The request metadata.
* @param {grpc~Listener} listener A listener wired to the previous layers
* in the interceptor stack.
* @param {function} next Passes metadata and a listener to the next
* interceptor.
*/
/**
* @function MessageRequester
* @param {jspb.Message} message The request message.
* @param {function} next Passes a message to the next interceptor.
*/
/**
* @function CloseRequester
* @param {function} next Calls the next interceptor.
*/
/**
* @function CancelRequester
* @param {function} next Calls the next interceptor.
*/
/**
* @typedef {object} grpc~Requester
* @param {MetadataRequester=} start A function triggered when the call begins.
* @param {MessageRequester=} sendMessage A function triggered by the request
* message.
* @param {CloseRequester=} halfClose A function triggered when the client
* closes the call.
* @param {CancelRequester=} cancel A function triggered when the call is
* cancelled.
*/
/**
* An object that completely defines a service.
* @typedef {Object.<string, grpc~MethodDefinition>} grpc~ServiceDefinition
@ -175,3 +309,27 @@ exports.defaultGrpcOptions = {
* An object that defines a package hierarchy with multiple services
* @typedef {Object.<string, grpc~ServiceDefinition>} grpc~PackageDefinition
*/
/**
* A function for dynamically assigning an interceptor to a call.
* @function InterceptorProvider
* @param {grpc~MethodDefinition} method_definition The method to provide
* an interceptor for.
* @return {Interceptor|null} The interceptor to provide or nothing
*/
/**
* A function which can modify call options and produce methods to intercept
* RPC operations.
* @function Interceptor
* @param {object} options The grpc call options
* @param {NextCall} nextCall
* @return {InterceptingCall}
*/
/**
* A function which produces the next InterceptingCall.
* @function NextCall
* @param {object} options The grpc call options
* @return {InterceptingCall|null}
*/

View File

@ -235,3 +235,17 @@ exports.logVerbosity = {
INFO: 1,
ERROR: 2
};
/**
* Method types: the supported RPC types
* @memberof grpc
* @alias grpc.methodTypes
* @readonly
* @enum {number}
*/
exports.methodTypes = {
UNARY: 0,
CLIENT_STREAMING: 1,
SERVER_STREAMING: 2,
BIDI_STREAMING: 3
};

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,34 @@
/**
* @license
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
syntax = "proto3";
message EchoMessage {
string value = 1;
int32 value2 = 2;
}
service EchoService {
rpc Echo (EchoMessage) returns (EchoMessage);
rpc EchoClientStream (stream EchoMessage) returns (EchoMessage);
rpc EchoServerStream (EchoMessage) returns (stream EchoMessage);
rpc EchoBidiStream (stream EchoMessage) returns (stream EchoMessage);
}