mirror of https://github.com/grpc/grpc-node.git
Enforce order of operations for synchronous requests
This commit is contained in:
parent
6bfb5de337
commit
4c502ed6ae
|
@ -652,16 +652,22 @@ EndListener.prototype.onReceiveMessage = function(){};
|
|||
EndListener.prototype.onReceiveStatus = function(){};
|
||||
EndListener.prototype.recvMessageWithContext = function(){};
|
||||
|
||||
var OP_DEPENDENCIES = {
|
||||
[grpc.opType.SEND_MESSAGE]: [grpc.opType.SEND_INITIAL_METADATA],
|
||||
[grpc.opType.SEND_CLOSE_FROM_CLIENT]: [grpc.opType.SEND_MESSAGE],
|
||||
[grpc.opType.RECV_MESSAGE]: [grpc.opType.SEND_INITIAL_METADATA]
|
||||
};
|
||||
|
||||
/**
|
||||
* Produces a callback triggered by streaming response messages.
|
||||
* @private
|
||||
* @param {EventEmitter} emitter
|
||||
* @param {grpc.internal~Call} call
|
||||
* @param {grpc~Listener} listener
|
||||
* @param {function} get_listener Returns a grpc~Listener.
|
||||
* @param {grpc~deserialize} deserialize
|
||||
* @return {Function}
|
||||
*/
|
||||
function _getStreamReadCallback(emitter, call, listener, deserialize) {
|
||||
function _getStreamReadCallback(emitter, call, get_listener, deserialize) {
|
||||
return function (err, response) {
|
||||
if (err) {
|
||||
// Something has gone wrong. Stop reading and wait for status
|
||||
|
@ -684,6 +690,7 @@ function _getStreamReadCallback(emitter, call, listener, deserialize) {
|
|||
emitter._readsDone();
|
||||
return;
|
||||
}
|
||||
var listener = get_listener();
|
||||
var context = {
|
||||
call: call,
|
||||
listener: listener
|
||||
|
@ -692,6 +699,66 @@ function _getStreamReadCallback(emitter, call, listener, deserialize) {
|
|||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests whether a batch can be started.
|
||||
* @private
|
||||
* @param {number[]} batch_ops The operations in the batch we are checking.
|
||||
* @param {number[]} completed_ops Previously completed operations.
|
||||
* @return {boolean}
|
||||
*/
|
||||
function _areBatchRequirementsMet(batch_ops, completed_ops) {
|
||||
var dependencies = _.flatMap(batch_ops, function(op) {
|
||||
return OP_DEPENDENCIES[op] || [];
|
||||
});
|
||||
var dependencies_met = _.intersection(dependencies,
|
||||
batch_ops.concat(completed_ops));
|
||||
return _.isEqual(dependencies_met.sort(), dependencies.sort());
|
||||
}
|
||||
|
||||
/**
|
||||
* Enforces the order of operations for synchronous requests. If a batch's
|
||||
* operations cannot be started because required operations have not started
|
||||
* yet, the batch is deferred until requirements are met.
|
||||
* @private
|
||||
* @param {grpc.Client~Call} call
|
||||
* @param {object} batch
|
||||
* @param {object} batch_state
|
||||
* @param {number[]} [batch_state.completed_ops] The ops already sent.
|
||||
* @param {object} [batch_state.deferred_batches] Batches to be sent after
|
||||
* their dependencies are fulfilled.
|
||||
* @param {function} callback
|
||||
* @return {object}
|
||||
*/
|
||||
function _startBatchIfReady(call, batch, batch_state, callback) {
|
||||
var completed_ops = batch_state.completed_ops;
|
||||
var deferred_batches = batch_state.deferred_batches;
|
||||
var batch_ops = _.map(_.keys(batch), Number);
|
||||
if (_areBatchRequirementsMet(batch_ops, completed_ops)) {
|
||||
// Dependencies are met, start the batch and any deferred batches whose
|
||||
// dependencies are met as a result.
|
||||
call.startBatch(batch, callback);
|
||||
completed_ops = _.union(completed_ops, batch_ops);
|
||||
deferred_batches = _.flatMap(deferred_batches, function(deferred_batch) {
|
||||
var deferred_batch_ops = _.map(_.keys(deferred_batch), Number);
|
||||
if (_areBatchRequirementsMet(deferred_batch_ops, completed_ops)) {
|
||||
call.startBatch(deferred_batch.batch, deferred_batch.callback);
|
||||
return [];
|
||||
}
|
||||
return [deferred_batch];
|
||||
});
|
||||
} else {
|
||||
// Dependencies are not met, defer the batch
|
||||
deferred_batches = deferred_batches.concat({
|
||||
batch: batch,
|
||||
callback: callback
|
||||
});
|
||||
}
|
||||
return {
|
||||
completed_ops: completed_ops,
|
||||
deferred_batches: deferred_batches
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Produces an interceptor which will start gRPC batches for unary calls.
|
||||
* @private
|
||||
|
@ -708,19 +775,25 @@ function _getUnaryInterceptor(method_definition, channel, emitter, callback) {
|
|||
var call = common.getCall(channel, method_definition.path, options);
|
||||
var first_listener;
|
||||
var final_requester = {};
|
||||
var batch_state = {
|
||||
completed_ops: [],
|
||||
deferred_batches: []
|
||||
};
|
||||
final_requester.start = function (metadata, listener) {
|
||||
var batch = {
|
||||
[grpc.opType.SEND_INITIAL_METADATA]:
|
||||
metadata._getCoreRepresentation(),
|
||||
};
|
||||
first_listener = listener;
|
||||
call.startBatch(batch, function () { });
|
||||
batch_state = _startBatchIfReady(call, batch, batch_state,
|
||||
function() {});
|
||||
};
|
||||
final_requester.sendMessage = function (message) {
|
||||
var batch = {
|
||||
[grpc.opType.SEND_MESSAGE]: serialize(message),
|
||||
};
|
||||
call.startBatch(batch, function () { });
|
||||
batch_state = _startBatchIfReady(call, batch, batch_state,
|
||||
function() {});
|
||||
};
|
||||
final_requester.halfClose = function () {
|
||||
var batch = {
|
||||
|
@ -729,7 +802,7 @@ function _getUnaryInterceptor(method_definition, channel, emitter, callback) {
|
|||
[grpc.opType.RECV_MESSAGE]: true,
|
||||
[grpc.opType.RECV_STATUS_ON_CLIENT]: true
|
||||
};
|
||||
call.startBatch(batch, function (err, response) {
|
||||
var callback = function (err, response) {
|
||||
response.status.metadata = Metadata._fromCoreRepresentation(
|
||||
response.status.metadata);
|
||||
var status = response.status;
|
||||
|
@ -757,7 +830,8 @@ function _getUnaryInterceptor(method_definition, channel, emitter, callback) {
|
|||
first_listener.onReceiveMetadata(response.metadata);
|
||||
first_listener.onReceiveMessage(deserialized);
|
||||
first_listener.onReceiveStatus(status);
|
||||
});
|
||||
};
|
||||
batch_state = _startBatchIfReady(call, batch, batch_state, callback);
|
||||
};
|
||||
final_requester.cancel = function () {
|
||||
call.cancel();
|
||||
|
@ -895,17 +969,24 @@ function _getServerStreamingInterceptor(method_definition, channel, emitter) {
|
|||
method_definition.responseDeserialize);
|
||||
var serialize = method_definition.requestSerialize;
|
||||
return function (options) {
|
||||
var first_listener;
|
||||
var batch_state = {
|
||||
completed_ops: [],
|
||||
deferred_batches: []
|
||||
};
|
||||
var call = common.getCall(channel, method_definition.path, options);
|
||||
var final_requester = {};
|
||||
var first_listener;
|
||||
var get_listener = function() {
|
||||
return first_listener;
|
||||
};
|
||||
final_requester.start = function(metadata, listener) {
|
||||
first_listener = listener;
|
||||
metadata = metadata.clone();
|
||||
var metadata_batch = {
|
||||
[grpc.opType.SEND_INITIAL_METADATA]: metadata._getCoreRepresentation(),
|
||||
[grpc.opType.RECV_INITIAL_METADATA]: true,
|
||||
[grpc.opType.RECV_INITIAL_METADATA]: true
|
||||
};
|
||||
call.startBatch(metadata_batch, function(err, response) {
|
||||
var callback = function(err, response) {
|
||||
if (err) {
|
||||
// The call has stopped for some reason. A non-OK status will arrive
|
||||
// in the other batch.
|
||||
|
@ -913,7 +994,9 @@ function _getServerStreamingInterceptor(method_definition, channel, emitter) {
|
|||
}
|
||||
first_listener.onReceiveMetadata(
|
||||
Metadata._fromCoreRepresentation(response.metadata));
|
||||
});
|
||||
};
|
||||
batch_state = _startBatchIfReady(call, metadata_batch, batch_state,
|
||||
callback);
|
||||
var status_batch = {
|
||||
[grpc.opType.RECV_STATUS_ON_CLIENT]: true
|
||||
};
|
||||
|
@ -935,26 +1018,28 @@ function _getServerStreamingInterceptor(method_definition, channel, emitter) {
|
|||
var send_batch = {
|
||||
[grpc.opType.SEND_MESSAGE]: message
|
||||
};
|
||||
call.startBatch(send_batch, function(err, response) {
|
||||
var callback = function(err, response) {
|
||||
if (err) {
|
||||
// The call has stopped for some reason. A non-OK status will arrive
|
||||
// in the other batch.
|
||||
return;
|
||||
}
|
||||
});
|
||||
};
|
||||
batch_state = _startBatchIfReady(call, send_batch, batch_state, callback);
|
||||
};
|
||||
final_requester.halfClose = function() {
|
||||
var batch = {
|
||||
[grpc.opType.SEND_CLOSE_FROM_CLIENT]: true
|
||||
};
|
||||
call.startBatch(batch, function() {});
|
||||
batch_state = _startBatchIfReady(call, batch, batch_state, function() {});
|
||||
};
|
||||
final_requester.recvMessageWithContext = function(context) {
|
||||
var recv_batch = {
|
||||
[grpc.opType.RECV_MESSAGE]: true
|
||||
};
|
||||
call.startBatch(recv_batch, _getStreamReadCallback(emitter, call,
|
||||
first_listener, deserialize));
|
||||
var callback = _getStreamReadCallback(emitter, call,
|
||||
get_listener, deserialize);
|
||||
batch_state = _startBatchIfReady(call, recv_batch, batch_state, callback);
|
||||
};
|
||||
final_requester.cancel = function() {
|
||||
call.cancel();
|
||||
|
@ -981,6 +1066,9 @@ function _getBidiStreamingInterceptor(method_definition, channel, emitter) {
|
|||
method_definition.responseDeserialize);
|
||||
return function (options) {
|
||||
var first_listener;
|
||||
var get_listener = function() {
|
||||
return first_listener;
|
||||
};
|
||||
var call = common.getCall(channel, method_definition.path, options);
|
||||
var final_requester = {};
|
||||
final_requester.start = function (metadata, listener) {
|
||||
|
@ -1057,7 +1145,7 @@ function _getBidiStreamingInterceptor(method_definition, channel, emitter) {
|
|||
[grpc.opType.RECV_MESSAGE]: true
|
||||
};
|
||||
call.startBatch(recv_batch, _getStreamReadCallback(emitter, call,
|
||||
first_listener, deserialize));
|
||||
get_listener, deserialize));
|
||||
};
|
||||
final_requester.cancel = function() {
|
||||
call.cancel();
|
||||
|
@ -1144,11 +1232,13 @@ function _getServerStreamingListener(method_definition, emitter) {
|
|||
onReceiveMessage: function(message, next, context) {
|
||||
if (emitter.push(message) && message !== null) {
|
||||
var call = context.call;
|
||||
var listener = context.listener;
|
||||
var get_listener = function() {
|
||||
return context.listener;
|
||||
};
|
||||
var read_batch = {};
|
||||
read_batch[grpc.opType.RECV_MESSAGE] = true;
|
||||
call.startBatch(read_batch, _getStreamReadCallback(emitter, call,
|
||||
listener, deserialize));
|
||||
get_listener, deserialize));
|
||||
} else {
|
||||
emitter.reading = false;
|
||||
}
|
||||
|
@ -1176,11 +1266,13 @@ function _getBidiStreamingListener(method_definition, emitter) {
|
|||
onReceiveMessage: function(message, next, context) {
|
||||
if (emitter.push(message) && message !== null) {
|
||||
var call = context.call;
|
||||
var listener = context.listener;
|
||||
var get_listener = function() {
|
||||
return context.listener;
|
||||
};
|
||||
var read_batch = {};
|
||||
read_batch[grpc.opType.RECV_MESSAGE] = true;
|
||||
call.startBatch(read_batch, _getStreamReadCallback(emitter, call,
|
||||
listener, deserialize));
|
||||
get_listener, deserialize));
|
||||
} else {
|
||||
emitter.reading = false;
|
||||
}
|
||||
|
|
|
@ -1702,4 +1702,94 @@ describe('Client interceptors', function() {
|
|||
bidi_stream.end();
|
||||
});
|
||||
});
|
||||
|
||||
describe('order of operations enforced for async interceptors', function() {
|
||||
it('with unary call', function(done) {
|
||||
var expected_calls = [
|
||||
'close_b',
|
||||
'message_b',
|
||||
'start_b',
|
||||
'done'
|
||||
];
|
||||
var registry = new CallRegistry(done, expected_calls, true);
|
||||
var message = {value: 'foo'};
|
||||
var interceptor_a = function(options, nextCall) {
|
||||
return new InterceptingCall(nextCall(options), {
|
||||
start: function(metadata, listener, next) {
|
||||
setTimeout(function() { next(metadata, listener); }, 50);
|
||||
},
|
||||
sendMessage: function(message, next) {
|
||||
setTimeout(function () { next(message); }, 10);
|
||||
}
|
||||
});
|
||||
};
|
||||
var interceptor_b = function(options, nextCall) {
|
||||
return new InterceptingCall(nextCall(options), {
|
||||
start: function(metadata, listener, next) {
|
||||
registry.addCall('start_b');
|
||||
next(metadata, listener);
|
||||
},
|
||||
sendMessage: function(message, next) {
|
||||
registry.addCall('message_b');
|
||||
next(message);
|
||||
},
|
||||
halfClose: function(next) {
|
||||
registry.addCall('close_b');
|
||||
next();
|
||||
}
|
||||
});
|
||||
};
|
||||
var options = {
|
||||
interceptors: [interceptor_a, interceptor_b]
|
||||
};
|
||||
client.echo(message, options, function(err, response) {
|
||||
assert.strictEqual(err, null);
|
||||
registry.addCall('done');
|
||||
});
|
||||
});
|
||||
it('with serverStreaming call', function(done) {
|
||||
var expected_calls = [
|
||||
'close_b',
|
||||
'message_b',
|
||||
'start_b',
|
||||
'done'
|
||||
];
|
||||
var registry = new CallRegistry(done, expected_calls, true);
|
||||
var message = {value: 'foo'};
|
||||
var interceptor_a = function(options, nextCall) {
|
||||
return new InterceptingCall(nextCall(options), {
|
||||
start: function(metadata, listener, next) {
|
||||
setTimeout(function() { next(metadata, listener); }, 50);
|
||||
},
|
||||
sendMessage: function(message, next) {
|
||||
setTimeout(function () { next(message); }, 10);
|
||||
}
|
||||
});
|
||||
};
|
||||
var interceptor_b = function(options, nextCall) {
|
||||
return new InterceptingCall(nextCall(options), {
|
||||
start: function(metadata, listener, next) {
|
||||
registry.addCall('start_b');
|
||||
next(metadata, listener);
|
||||
},
|
||||
sendMessage: function(message, next) {
|
||||
registry.addCall('message_b');
|
||||
next(message);
|
||||
},
|
||||
halfClose: function(next) {
|
||||
registry.addCall('close_b');
|
||||
next();
|
||||
}
|
||||
});
|
||||
};
|
||||
var options = {
|
||||
interceptors: [interceptor_a, interceptor_b]
|
||||
};
|
||||
var stream = client.echoServerStream(message, options);
|
||||
stream.on('data', function(response) {
|
||||
assert.strictEqual(response.value, 'foo');
|
||||
registry.addCall('done');
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
Loading…
Reference in New Issue