mirror of https://github.com/grpc/grpc-node.git
Merge pull request #2942 from murgatroid99/node_parent_call
Add parent call propagation API to Node library
This commit is contained in:
commit
2cbaf1e9ac
20
ext/call.cc
20
ext/call.cc
|
@ -502,6 +502,22 @@ NAN_METHOD(Call::New) {
|
||||||
return NanThrowTypeError(
|
return NanThrowTypeError(
|
||||||
"Call's third argument must be a date or a number");
|
"Call's third argument must be a date or a number");
|
||||||
}
|
}
|
||||||
|
// These arguments are at the end because they are optional
|
||||||
|
grpc_call *parent_call = NULL;
|
||||||
|
if (Call::HasInstance(args[4])) {
|
||||||
|
Call *parent_obj = ObjectWrap::Unwrap<Call>(args[4]->ToObject());
|
||||||
|
parent_call = parent_obj->wrapped_call;
|
||||||
|
} else if (!(args[4]->IsUndefined() || args[4]->IsNull())) {
|
||||||
|
return NanThrowTypeError(
|
||||||
|
"Call's fifth argument must be another call, if provided");
|
||||||
|
}
|
||||||
|
gpr_uint32 propagate_flags = GRPC_PROPAGATE_DEFAULTS;
|
||||||
|
if (args[5]->IsUint32()) {
|
||||||
|
propagate_flags = args[5]->Uint32Value();
|
||||||
|
} else if (!(args[5]->IsUndefined() || args[5]->IsNull())) {
|
||||||
|
return NanThrowTypeError(
|
||||||
|
"Call's sixth argument must be propagate flags, if provided");
|
||||||
|
}
|
||||||
Handle<Object> channel_object = args[0]->ToObject();
|
Handle<Object> channel_object = args[0]->ToObject();
|
||||||
Channel *channel = ObjectWrap::Unwrap<Channel>(channel_object);
|
Channel *channel = ObjectWrap::Unwrap<Channel>(channel_object);
|
||||||
if (channel->GetWrappedChannel() == NULL) {
|
if (channel->GetWrappedChannel() == NULL) {
|
||||||
|
@ -514,12 +530,12 @@ NAN_METHOD(Call::New) {
|
||||||
if (args[3]->IsString()) {
|
if (args[3]->IsString()) {
|
||||||
NanUtf8String host_override(args[3]);
|
NanUtf8String host_override(args[3]);
|
||||||
wrapped_call = grpc_channel_create_call(
|
wrapped_call = grpc_channel_create_call(
|
||||||
wrapped_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
|
wrapped_channel, parent_call, propagate_flags,
|
||||||
CompletionQueueAsyncWorker::GetQueue(), *method,
|
CompletionQueueAsyncWorker::GetQueue(), *method,
|
||||||
*host_override, MillisecondsToTimespec(deadline), NULL);
|
*host_override, MillisecondsToTimespec(deadline), NULL);
|
||||||
} else if (args[3]->IsUndefined() || args[3]->IsNull()) {
|
} else if (args[3]->IsUndefined() || args[3]->IsNull()) {
|
||||||
wrapped_call = grpc_channel_create_call(
|
wrapped_call = grpc_channel_create_call(
|
||||||
wrapped_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
|
wrapped_channel, parent_call, propagate_flags,
|
||||||
CompletionQueueAsyncWorker::GetQueue(), *method,
|
CompletionQueueAsyncWorker::GetQueue(), *method,
|
||||||
NULL, MillisecondsToTimespec(deadline), NULL);
|
NULL, MillisecondsToTimespec(deadline), NULL);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -159,6 +159,25 @@ void InitOpTypeConstants(Handle<Object> exports) {
|
||||||
op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER);
|
op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void InitPropagateConstants(Handle<Object> exports) {
|
||||||
|
NanScope();
|
||||||
|
Handle<Object> propagate = NanNew<Object>();
|
||||||
|
exports->Set(NanNew("propagate"), propagate);
|
||||||
|
Handle<Value> DEADLINE(NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_DEADLINE));
|
||||||
|
propagate->Set(NanNew("DEADLINE"), DEADLINE);
|
||||||
|
Handle<Value> CENSUS_STATS_CONTEXT(
|
||||||
|
NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_CENSUS_STATS_CONTEXT));
|
||||||
|
propagate->Set(NanNew("CENSUS_STATS_CONTEXT"), CENSUS_STATS_CONTEXT);
|
||||||
|
Handle<Value> CENSUS_TRACING_CONTEXT(
|
||||||
|
NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT));
|
||||||
|
propagate->Set(NanNew("CENSUS_TRACING_CONTEXT"), CENSUS_TRACING_CONTEXT);
|
||||||
|
Handle<Value> CANCELLATION(
|
||||||
|
NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_CANCELLATION));
|
||||||
|
propagate->Set(NanNew("CANCELLATION"), CANCELLATION);
|
||||||
|
Handle<Value> DEFAULTS(NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_DEFAULTS));
|
||||||
|
propagate->Set(NanNew("DEFAULTS"), DEFAULTS);
|
||||||
|
}
|
||||||
|
|
||||||
void InitConnectivityStateConstants(Handle<Object> exports) {
|
void InitConnectivityStateConstants(Handle<Object> exports) {
|
||||||
NanScope();
|
NanScope();
|
||||||
Handle<Object> channel_state = NanNew<Object>();
|
Handle<Object> channel_state = NanNew<Object>();
|
||||||
|
@ -183,6 +202,7 @@ void init(Handle<Object> exports) {
|
||||||
InitStatusConstants(exports);
|
InitStatusConstants(exports);
|
||||||
InitCallErrorConstants(exports);
|
InitCallErrorConstants(exports);
|
||||||
InitOpTypeConstants(exports);
|
InitOpTypeConstants(exports);
|
||||||
|
InitPropagateConstants(exports);
|
||||||
InitConnectivityStateConstants(exports);
|
InitConnectivityStateConstants(exports);
|
||||||
|
|
||||||
grpc::node::Call::Init(exports);
|
grpc::node::Call::Init(exports);
|
||||||
|
|
5
index.js
5
index.js
|
@ -134,6 +134,11 @@ exports.Server = server.Server;
|
||||||
*/
|
*/
|
||||||
exports.status = grpc.status;
|
exports.status = grpc.status;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Propagate flag name to number mapping
|
||||||
|
*/
|
||||||
|
exports.propagate = grpc.propagate;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Call error name to code number mapping
|
* Call error name to code number mapping
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -216,14 +216,19 @@ ClientDuplexStream.prototype.getPeer = getPeer;
|
||||||
function getCall(channel, method, options) {
|
function getCall(channel, method, options) {
|
||||||
var deadline;
|
var deadline;
|
||||||
var host;
|
var host;
|
||||||
|
var parent;
|
||||||
|
var propagate_flags;
|
||||||
if (options) {
|
if (options) {
|
||||||
deadline = options.deadline;
|
deadline = options.deadline;
|
||||||
host = options.host;
|
host = options.host;
|
||||||
|
parent = _.get(options, 'parent.call');
|
||||||
|
propagate_flags = options.propagate_flags;
|
||||||
}
|
}
|
||||||
if (deadline === undefined) {
|
if (deadline === undefined) {
|
||||||
deadline = Infinity;
|
deadline = Infinity;
|
||||||
}
|
}
|
||||||
return new grpc.Call(channel, method, deadline, host);
|
return new grpc.Call(channel, method, deadline, host,
|
||||||
|
parent, propagate_flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -432,6 +432,7 @@ function handleUnary(call, handler, metadata) {
|
||||||
});
|
});
|
||||||
emitter.metadata = metadata;
|
emitter.metadata = metadata;
|
||||||
waitForCancel(call, emitter);
|
waitForCancel(call, emitter);
|
||||||
|
emitter.call = call;
|
||||||
var batch = {};
|
var batch = {};
|
||||||
batch[grpc.opType.RECV_MESSAGE] = true;
|
batch[grpc.opType.RECV_MESSAGE] = true;
|
||||||
call.startBatch(batch, function(err, result) {
|
call.startBatch(batch, function(err, result) {
|
||||||
|
|
|
@ -79,6 +79,18 @@ var callErrorNames = [
|
||||||
];
|
];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* List of all propagate flag names
|
||||||
|
* @const
|
||||||
|
* @type {Array.<string>}
|
||||||
|
*/
|
||||||
|
var propagateFlagNames = [
|
||||||
|
'DEADLINE',
|
||||||
|
'CENSUS_STATS_CONTEXT',
|
||||||
|
'CENSUS_TRACING_CONTEXT',
|
||||||
|
'CANCELLATION',
|
||||||
|
'DEFAULTS'
|
||||||
|
];
|
||||||
|
/*
|
||||||
* List of all connectivity state names
|
* List of all connectivity state names
|
||||||
* @const
|
* @const
|
||||||
* @type {Array.<string>}
|
* @type {Array.<string>}
|
||||||
|
@ -104,6 +116,12 @@ describe('constants', function() {
|
||||||
'call error missing: ' + callErrorNames[i]);
|
'call error missing: ' + callErrorNames[i]);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
it('should have all of the propagate flags', function() {
|
||||||
|
for (var i = 0; i < propagateFlagNames.length; i++) {
|
||||||
|
assert(grpc.propagate.hasOwnProperty(propagateFlagNames[i]),
|
||||||
|
'call error missing: ' + propagateFlagNames[i]);
|
||||||
|
}
|
||||||
|
});
|
||||||
it('should have all of the connectivity states', function() {
|
it('should have all of the connectivity states', function() {
|
||||||
for (var i = 0; i < connectivityStateNames.length; i++) {
|
for (var i = 0; i < connectivityStateNames.length; i++) {
|
||||||
assert(grpc.connectivityState.hasOwnProperty(connectivityStateNames[i]),
|
assert(grpc.connectivityState.hasOwnProperty(connectivityStateNames[i]),
|
||||||
|
|
|
@ -67,6 +67,7 @@ function multiDone(done, count) {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
var server_insecure_creds = grpc.ServerCredentials.createInsecure();
|
var server_insecure_creds = grpc.ServerCredentials.createInsecure();
|
||||||
|
|
||||||
describe('File loader', function() {
|
describe('File loader', function() {
|
||||||
|
@ -344,12 +345,14 @@ describe('Echo metadata', function() {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
describe('Other conditions', function() {
|
describe('Other conditions', function() {
|
||||||
|
var test_service;
|
||||||
|
var Client;
|
||||||
var client;
|
var client;
|
||||||
var server;
|
var server;
|
||||||
var port;
|
var port;
|
||||||
before(function() {
|
before(function() {
|
||||||
var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
|
var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
|
||||||
var test_service = test_proto.lookup('TestService');
|
test_service = test_proto.lookup('TestService');
|
||||||
server = new grpc.Server();
|
server = new grpc.Server();
|
||||||
server.addProtoService(test_service, {
|
server.addProtoService(test_service, {
|
||||||
unary: function(call, cb) {
|
unary: function(call, cb) {
|
||||||
|
@ -411,7 +414,7 @@ describe('Other conditions', function() {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
port = server.bind('localhost:0', server_insecure_creds);
|
port = server.bind('localhost:0', server_insecure_creds);
|
||||||
var Client = surface_client.makeProtobufClientConstructor(test_service);
|
Client = surface_client.makeProtobufClientConstructor(test_service);
|
||||||
client = new Client('localhost:' + port, grpc.Credentials.createInsecure());
|
client = new Client('localhost:' + port, grpc.Credentials.createInsecure());
|
||||||
server.start();
|
server.start();
|
||||||
});
|
});
|
||||||
|
@ -664,6 +667,166 @@ describe('Other conditions', function() {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
describe('Call propagation', function() {
|
||||||
|
var proxy;
|
||||||
|
var proxy_impl;
|
||||||
|
beforeEach(function() {
|
||||||
|
proxy = new grpc.Server();
|
||||||
|
proxy_impl = {
|
||||||
|
unary: function(call) {},
|
||||||
|
clientStream: function(stream) {},
|
||||||
|
serverStream: function(stream) {},
|
||||||
|
bidiStream: function(stream) {}
|
||||||
|
};
|
||||||
|
});
|
||||||
|
afterEach(function() {
|
||||||
|
console.log('Shutting down server');
|
||||||
|
proxy.shutdown();
|
||||||
|
});
|
||||||
|
describe('Cancellation', function() {
|
||||||
|
it('With a unary call', function(done) {
|
||||||
|
done = multiDone(done, 2);
|
||||||
|
proxy_impl.unary = function(parent, callback) {
|
||||||
|
client.unary(parent.request, function(err, value) {
|
||||||
|
try {
|
||||||
|
assert(err);
|
||||||
|
assert.strictEqual(err.code, grpc.status.CANCELLED);
|
||||||
|
} finally {
|
||||||
|
callback(err, value);
|
||||||
|
done();
|
||||||
|
}
|
||||||
|
}, null, {parent: parent});
|
||||||
|
call.cancel();
|
||||||
|
};
|
||||||
|
proxy.addProtoService(test_service, proxy_impl);
|
||||||
|
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
|
||||||
|
proxy.start();
|
||||||
|
var proxy_client = new Client('localhost:' + proxy_port,
|
||||||
|
grpc.Credentials.createInsecure());
|
||||||
|
var call = proxy_client.unary({}, function(err, value) {
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
it('With a client stream call', function(done) {
|
||||||
|
done = multiDone(done, 2);
|
||||||
|
proxy_impl.clientStream = function(parent, callback) {
|
||||||
|
client.clientStream(function(err, value) {
|
||||||
|
try {
|
||||||
|
assert(err);
|
||||||
|
assert.strictEqual(err.code, grpc.status.CANCELLED);
|
||||||
|
} finally {
|
||||||
|
callback(err, value);
|
||||||
|
done();
|
||||||
|
}
|
||||||
|
}, null, {parent: parent});
|
||||||
|
call.cancel();
|
||||||
|
};
|
||||||
|
proxy.addProtoService(test_service, proxy_impl);
|
||||||
|
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
|
||||||
|
proxy.start();
|
||||||
|
var proxy_client = new Client('localhost:' + proxy_port,
|
||||||
|
grpc.Credentials.createInsecure());
|
||||||
|
var call = proxy_client.clientStream(function(err, value) {
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
it('With a server stream call', function(done) {
|
||||||
|
done = multiDone(done, 2);
|
||||||
|
proxy_impl.serverStream = function(parent) {
|
||||||
|
var child = client.serverStream(parent.request, null,
|
||||||
|
{parent: parent});
|
||||||
|
child.on('error', function(err) {
|
||||||
|
assert(err);
|
||||||
|
assert.strictEqual(err.code, grpc.status.CANCELLED);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
call.cancel();
|
||||||
|
};
|
||||||
|
proxy.addProtoService(test_service, proxy_impl);
|
||||||
|
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
|
||||||
|
proxy.start();
|
||||||
|
var proxy_client = new Client('localhost:' + proxy_port,
|
||||||
|
grpc.Credentials.createInsecure());
|
||||||
|
var call = proxy_client.serverStream({});
|
||||||
|
call.on('error', function(err) {
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
it('With a bidi stream call', function(done) {
|
||||||
|
done = multiDone(done, 2);
|
||||||
|
proxy_impl.bidiStream = function(parent) {
|
||||||
|
var child = client.bidiStream(null, {parent: parent});
|
||||||
|
child.on('error', function(err) {
|
||||||
|
assert(err);
|
||||||
|
assert.strictEqual(err.code, grpc.status.CANCELLED);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
call.cancel();
|
||||||
|
};
|
||||||
|
proxy.addProtoService(test_service, proxy_impl);
|
||||||
|
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
|
||||||
|
proxy.start();
|
||||||
|
var proxy_client = new Client('localhost:' + proxy_port,
|
||||||
|
grpc.Credentials.createInsecure());
|
||||||
|
var call = proxy_client.bidiStream();
|
||||||
|
call.on('error', function(err) {
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
describe('Deadline', function() {
|
||||||
|
/* jshint bitwise:false */
|
||||||
|
var deadline_flags = (grpc.propagate.DEFAULTS &
|
||||||
|
~grpc.propagate.CANCELLATION);
|
||||||
|
it('With a client stream call', function(done) {
|
||||||
|
done = multiDone(done, 2);
|
||||||
|
proxy_impl.clientStream = function(parent, callback) {
|
||||||
|
client.clientStream(function(err, value) {
|
||||||
|
try {
|
||||||
|
assert(err);
|
||||||
|
assert.strictEqual(err.code, grpc.status.DEADLINE_EXCEEDED);
|
||||||
|
} finally {
|
||||||
|
callback(err, value);
|
||||||
|
done();
|
||||||
|
}
|
||||||
|
}, null, {parent: parent, propagate_flags: deadline_flags});
|
||||||
|
};
|
||||||
|
proxy.addProtoService(test_service, proxy_impl);
|
||||||
|
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
|
||||||
|
proxy.start();
|
||||||
|
var proxy_client = new Client('localhost:' + proxy_port,
|
||||||
|
grpc.Credentials.createInsecure());
|
||||||
|
var deadline = new Date();
|
||||||
|
deadline.setSeconds(deadline.getSeconds() + 1);
|
||||||
|
proxy_client.clientStream(function(err, value) {
|
||||||
|
done();
|
||||||
|
}, null, {deadline: deadline});
|
||||||
|
});
|
||||||
|
it('With a bidi stream call', function(done) {
|
||||||
|
done = multiDone(done, 2);
|
||||||
|
proxy_impl.bidiStream = function(parent) {
|
||||||
|
var child = client.bidiStream(
|
||||||
|
null, {parent: parent, propagate_flags: deadline_flags});
|
||||||
|
child.on('error', function(err) {
|
||||||
|
assert(err);
|
||||||
|
assert.strictEqual(err.code, grpc.status.DEADLINE_EXCEEDED);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
};
|
||||||
|
proxy.addProtoService(test_service, proxy_impl);
|
||||||
|
var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
|
||||||
|
proxy.start();
|
||||||
|
var proxy_client = new Client('localhost:' + proxy_port,
|
||||||
|
grpc.Credentials.createInsecure());
|
||||||
|
var deadline = new Date();
|
||||||
|
deadline.setSeconds(deadline.getSeconds() + 1);
|
||||||
|
var call = proxy_client.bidiStream(null, {deadline: deadline});
|
||||||
|
call.on('error', function(err) {
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
describe('Cancelling surface client', function() {
|
describe('Cancelling surface client', function() {
|
||||||
var client;
|
var client;
|
||||||
|
|
Loading…
Reference in New Issue