Added cancel to client APIs and cancelled event to server APIs

This commit is contained in:
murgatroid99 2015-01-26 14:11:18 -08:00
parent d85ad3ecdb
commit ed731519c6
6 changed files with 102 additions and 1 deletions

View File

@ -165,7 +165,7 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
* with status CANCELLED. * with status CANCELLED.
*/ */
GrpcClientStream.prototype.cancel = function() { GrpcClientStream.prototype.cancel = function() {
self._call.cancel(); this._call.cancel();
}; };
/** /**

View File

@ -246,6 +246,7 @@ function Server(options) {
call.serverAccept(function(event) { call.serverAccept(function(event) {
if (event.data.code === grpc.status.CANCELLED) { if (event.data.code === grpc.status.CANCELLED) {
cancelled = true; cancelled = true;
stream.emit('cancelled');
} }
}, 0); }, 0);
call.serverEndInitialMetadata(0); call.serverEndInitialMetadata(0);

View File

@ -179,6 +179,11 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
callback(e); callback(e);
} }
}); });
stream.on('status', function forwardStatus(status) {
if (status.code !== client.status.OK) {
callback(status);
}
});
return emitter; return emitter;
} }
return makeUnaryRequest; return makeUnaryRequest;
@ -216,6 +221,11 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
callback(e); callback(e);
} }
}); });
stream.on('status', function forwardStatus(status) {
if (status.code !== client.status.OK) {
callback(status);
}
});
return obj_stream; return obj_stream;
} }
return makeClientStreamRequest; return makeClientStreamRequest;

View File

@ -63,6 +63,9 @@ function ServerReadableObjectStream(stream) {
get: function() { return stream.cancelled; } get: function() { return stream.cancelled; }
}); });
var self = this; var self = this;
this._stream.on('cancelled', function() {
self.emit('cancelled');
});
this._stream.on('data', function forwardData(chunk) { this._stream.on('data', function forwardData(chunk) {
if (!self.push(chunk)) { if (!self.push(chunk)) {
self._stream.pause(); self._stream.pause();
@ -100,6 +103,9 @@ function ServerWritableObjectStream(stream) {
var options = {objectMode: true}; var options = {objectMode: true};
Writable.call(this, options); Writable.call(this, options);
this._stream = stream; this._stream = stream;
this._stream.on('cancelled', function() {
self.emit('cancelled');
});
this.on('finish', function() { this.on('finish', function() {
this._stream.end(); this._stream.end();
}); });
@ -138,6 +144,9 @@ function makeUnaryHandler(handler) {
Object.defineProperty(call, 'cancelled', { Object.defineProperty(call, 'cancelled', {
get: function() { return stream.cancelled;} get: function() { return stream.cancelled;}
}); });
stream.on('cancelled', function() {
call.emit('cancelled');
});
handler(call, function sendUnaryData(err, value) { handler(call, function sendUnaryData(err, value) {
if (err) { if (err) {
stream.emit('error', err); stream.emit('error', err);

View File

@ -77,6 +77,14 @@ function errorHandler(stream) {
}; };
} }
/**
* Wait for a cancellation instead of responding
* @param {Stream} stream
*/
function cancelHandler(stream) {
// do nothing
}
describe('echo client', function() { describe('echo client', function() {
it('should receive echo responses', function(done) { it('should receive echo responses', function(done) {
var server = new Server(); var server = new Server();
@ -125,6 +133,26 @@ describe('echo client', function() {
done(); done();
}); });
}); });
it('should be able to cancel a call', function(done) {
var server = new Server();
var port_num = server.bind('0.0.0.0:0');
server.register('cancellation', cancelHandler);
server.start();
var channel = new grpc.Channel('localhost:' + port_num);
var stream = client.makeRequest(
channel,
'cancellation',
null,
getDeadline(1));
stream.cancel();
stream.on('status', function(status) {
assert.equal(status.code, grpc.status.CANCELLED);
server.shutdown();
done();
});
});
}); });
/* TODO(mlumish): explore options for reducing duplication between this test /* TODO(mlumish): explore options for reducing duplication between this test
* and the insecure echo client test */ * and the insecure echo client test */

View File

@ -35,6 +35,8 @@ var assert = require('assert');
var surface_server = require('../surface_server.js'); var surface_server = require('../surface_server.js');
var surface_client = require('../surface_client.js');
var ProtoBuf = require('protobufjs'); var ProtoBuf = require('protobufjs');
var grpc = require('..'); var grpc = require('..');
@ -73,3 +75,54 @@ describe('Surface server constructor', function() {
}, /math.Math/); }, /math.Math/);
}); });
}); });
describe('Surface client', function() {
var client;
var server;
before(function() {
var Server = grpc.buildServer([mathService]);
server = new Server({
'math.Math': {
'div': function(stream) {},
'divMany': function(stream) {},
'fib': function(stream) {},
'sum': function(stream) {}
}
});
var port = server.bind('localhost:0');
var Client = surface_client.makeClientConstructor(mathService);
client = new Client('localhost:' + port);
});
after(function() {
server.shutdown();
});
it('Should correctly cancel a unary call', function(done) {
var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) {
assert.strictEqual(err.code, surface_client.status.CANCELLED);
done();
});
call.cancel();
});
it('Should correctly cancel a client stream call', function(done) {
var call = client.sum(function(err, resp) {
assert.strictEqual(err.code, surface_client.status.CANCELLED);
done();
});
call.cancel();
});
it('Should correctly cancel a server stream call', function(done) {
var call = client.fib({'limit': 5});
call.on('status', function(status) {
assert.strictEqual(status.code, surface_client.status.CANCELLED);
done();
});
call.cancel();
});
it('Should correctly cancel a bidi stream call', function(done) {
var call = client.divMany();
call.on('status', function(status) {
assert.strictEqual(status.code, surface_client.status.CANCELLED);
done();
});
call.cancel();
});
});