mirror of https://github.com/grpc/grpc-node.git
Refactored server.js and added a test to increase coverage
This commit is contained in:
parent
1529ea2921
commit
12aafc237f
139
src/server.js
139
src/server.js
|
@ -100,28 +100,6 @@ function handleError(call, error) {
|
||||||
call.startBatch(error_batch, function(){});
|
call.startBatch(error_batch, function(){});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Wait for the client to close, then emit a cancelled event if the client
|
|
||||||
* cancelled.
|
|
||||||
* @access private
|
|
||||||
* @param {grpc.Call} call The call object to wait on
|
|
||||||
* @param {EventEmitter} emitter The event emitter to emit the cancelled event
|
|
||||||
* on
|
|
||||||
*/
|
|
||||||
function waitForCancel(call, emitter) {
|
|
||||||
var cancel_batch = {};
|
|
||||||
cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
|
|
||||||
call.startBatch(cancel_batch, function(err, result) {
|
|
||||||
if (err) {
|
|
||||||
emitter.emit('error', err);
|
|
||||||
}
|
|
||||||
if (result.cancelled) {
|
|
||||||
emitter.cancelled = true;
|
|
||||||
emitter.emit('cancelled');
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a response to a unary or client streaming call.
|
* Send a response to a unary or client streaming call.
|
||||||
* @access private
|
* @access private
|
||||||
|
@ -258,6 +236,13 @@ function setUpReadable(stream, deserialize) {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
util.inherits(ServerUnaryCall, EventEmitter);
|
||||||
|
|
||||||
|
function ServerUnaryCall(call) {
|
||||||
|
EventEmitter.call(this);
|
||||||
|
this.call = call;
|
||||||
|
}
|
||||||
|
|
||||||
util.inherits(ServerWritableStream, Writable);
|
util.inherits(ServerWritableStream, Writable);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -311,33 +296,6 @@ function _write(chunk, encoding, callback) {
|
||||||
|
|
||||||
ServerWritableStream.prototype._write = _write;
|
ServerWritableStream.prototype._write = _write;
|
||||||
|
|
||||||
/**
|
|
||||||
* Send the initial metadata for a writable stream.
|
|
||||||
* @param {Metadata} responseMetadata Metadata to send
|
|
||||||
*/
|
|
||||||
function sendMetadata(responseMetadata) {
|
|
||||||
/* jshint validthis: true */
|
|
||||||
var self = this;
|
|
||||||
if (!this.call.metadataSent) {
|
|
||||||
this.call.metadataSent = true;
|
|
||||||
var batch = [];
|
|
||||||
batch[grpc.opType.SEND_INITIAL_METADATA] =
|
|
||||||
responseMetadata._getCoreRepresentation();
|
|
||||||
this.call.startBatch(batch, function(err) {
|
|
||||||
if (err) {
|
|
||||||
self.emit('error', err);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @inheritdoc
|
|
||||||
* @alias module:src/server~ServerWritableStream#sendMetadata
|
|
||||||
*/
|
|
||||||
ServerWritableStream.prototype.sendMetadata = sendMetadata;
|
|
||||||
|
|
||||||
util.inherits(ServerReadableStream, Readable);
|
util.inherits(ServerReadableStream, Readable);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -427,6 +385,31 @@ function ServerDuplexStream(call, serialize, deserialize) {
|
||||||
|
|
||||||
ServerDuplexStream.prototype._read = _read;
|
ServerDuplexStream.prototype._read = _read;
|
||||||
ServerDuplexStream.prototype._write = _write;
|
ServerDuplexStream.prototype._write = _write;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send the initial metadata for a writable stream.
|
||||||
|
* @param {Metadata} responseMetadata Metadata to send
|
||||||
|
*/
|
||||||
|
function sendMetadata(responseMetadata) {
|
||||||
|
/* jshint validthis: true */
|
||||||
|
var self = this;
|
||||||
|
if (!this.call.metadataSent) {
|
||||||
|
this.call.metadataSent = true;
|
||||||
|
var batch = {};
|
||||||
|
batch[grpc.opType.SEND_INITIAL_METADATA] =
|
||||||
|
responseMetadata._getCoreRepresentation();
|
||||||
|
this.call.startBatch(batch, function(err) {
|
||||||
|
if (err) {
|
||||||
|
self.emit('error', err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ServerUnaryCall.prototype.sendMetadata = sendMetadata;
|
||||||
|
ServerWritableStream.prototype.sendMetadata = sendMetadata;
|
||||||
|
ServerReadableStream.prototype.sendMetadata = sendMetadata;
|
||||||
ServerDuplexStream.prototype.sendMetadata = sendMetadata;
|
ServerDuplexStream.prototype.sendMetadata = sendMetadata;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -438,10 +421,36 @@ function getPeer() {
|
||||||
return this.call.getPeer();
|
return this.call.getPeer();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ServerUnaryCall.prototype.getPeer = getPeer;
|
||||||
ServerReadableStream.prototype.getPeer = getPeer;
|
ServerReadableStream.prototype.getPeer = getPeer;
|
||||||
ServerWritableStream.prototype.getPeer = getPeer;
|
ServerWritableStream.prototype.getPeer = getPeer;
|
||||||
ServerDuplexStream.prototype.getPeer = getPeer;
|
ServerDuplexStream.prototype.getPeer = getPeer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for the client to close, then emit a cancelled event if the client
|
||||||
|
* cancelled.
|
||||||
|
*/
|
||||||
|
function waitForCancel() {
|
||||||
|
/* jshint validthis: true */
|
||||||
|
var self = this;
|
||||||
|
var cancel_batch = {};
|
||||||
|
cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
|
||||||
|
self.call.startBatch(cancel_batch, function(err, result) {
|
||||||
|
if (err) {
|
||||||
|
self.emit('error', err);
|
||||||
|
}
|
||||||
|
if (result.cancelled) {
|
||||||
|
self.cancelled = true;
|
||||||
|
self.emit('cancelled');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
ServerUnaryCall.prototype.waitForCancel = waitForCancel;
|
||||||
|
ServerReadableStream.prototype.waitForCancel = waitForCancel;
|
||||||
|
ServerWritableStream.prototype.waitForCancel = waitForCancel;
|
||||||
|
ServerDuplexStream.prototype.waitForCancel = waitForCancel;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fully handle a unary call
|
* Fully handle a unary call
|
||||||
* @access private
|
* @access private
|
||||||
|
@ -450,25 +459,12 @@ ServerDuplexStream.prototype.getPeer = getPeer;
|
||||||
* @param {Metadata} metadata Metadata from the client
|
* @param {Metadata} metadata Metadata from the client
|
||||||
*/
|
*/
|
||||||
function handleUnary(call, handler, metadata) {
|
function handleUnary(call, handler, metadata) {
|
||||||
var emitter = new EventEmitter();
|
var emitter = new ServerUnaryCall(call);
|
||||||
emitter.sendMetadata = function(responseMetadata) {
|
|
||||||
if (!call.metadataSent) {
|
|
||||||
call.metadataSent = true;
|
|
||||||
var batch = {};
|
|
||||||
batch[grpc.opType.SEND_INITIAL_METADATA] =
|
|
||||||
responseMetadata._getCoreRepresentation();
|
|
||||||
call.startBatch(batch, function() {});
|
|
||||||
}
|
|
||||||
};
|
|
||||||
emitter.getPeer = function() {
|
|
||||||
return call.getPeer();
|
|
||||||
};
|
|
||||||
emitter.on('error', function(error) {
|
emitter.on('error', function(error) {
|
||||||
handleError(call, error);
|
handleError(call, error);
|
||||||
});
|
});
|
||||||
emitter.metadata = metadata;
|
emitter.metadata = metadata;
|
||||||
waitForCancel(call, emitter);
|
emitter.waitForCancel();
|
||||||
emitter.call = call;
|
|
||||||
var batch = {};
|
var batch = {};
|
||||||
batch[grpc.opType.RECV_MESSAGE] = true;
|
batch[grpc.opType.RECV_MESSAGE] = true;
|
||||||
call.startBatch(batch, function(err, result) {
|
call.startBatch(batch, function(err, result) {
|
||||||
|
@ -508,7 +504,7 @@ function handleUnary(call, handler, metadata) {
|
||||||
*/
|
*/
|
||||||
function handleServerStreaming(call, handler, metadata) {
|
function handleServerStreaming(call, handler, metadata) {
|
||||||
var stream = new ServerWritableStream(call, handler.serialize);
|
var stream = new ServerWritableStream(call, handler.serialize);
|
||||||
waitForCancel(call, stream);
|
stream.waitForCancel();
|
||||||
stream.metadata = metadata;
|
stream.metadata = metadata;
|
||||||
var batch = {};
|
var batch = {};
|
||||||
batch[grpc.opType.RECV_MESSAGE] = true;
|
batch[grpc.opType.RECV_MESSAGE] = true;
|
||||||
|
@ -537,19 +533,10 @@ function handleServerStreaming(call, handler, metadata) {
|
||||||
*/
|
*/
|
||||||
function handleClientStreaming(call, handler, metadata) {
|
function handleClientStreaming(call, handler, metadata) {
|
||||||
var stream = new ServerReadableStream(call, handler.deserialize);
|
var stream = new ServerReadableStream(call, handler.deserialize);
|
||||||
stream.sendMetadata = function(responseMetadata) {
|
|
||||||
if (!call.metadataSent) {
|
|
||||||
call.metadataSent = true;
|
|
||||||
var batch = {};
|
|
||||||
batch[grpc.opType.SEND_INITIAL_METADATA] =
|
|
||||||
responseMetadata._getCoreRepresentation();
|
|
||||||
call.startBatch(batch, function() {});
|
|
||||||
}
|
|
||||||
};
|
|
||||||
stream.on('error', function(error) {
|
stream.on('error', function(error) {
|
||||||
handleError(call, error);
|
handleError(call, error);
|
||||||
});
|
});
|
||||||
waitForCancel(call, stream);
|
stream.waitForCancel();
|
||||||
stream.metadata = metadata;
|
stream.metadata = metadata;
|
||||||
handler.func(stream, function(err, value, trailer, flags) {
|
handler.func(stream, function(err, value, trailer, flags) {
|
||||||
stream.terminate();
|
stream.terminate();
|
||||||
|
@ -574,7 +561,7 @@ function handleClientStreaming(call, handler, metadata) {
|
||||||
function handleBidiStreaming(call, handler, metadata) {
|
function handleBidiStreaming(call, handler, metadata) {
|
||||||
var stream = new ServerDuplexStream(call, handler.serialize,
|
var stream = new ServerDuplexStream(call, handler.serialize,
|
||||||
handler.deserialize);
|
handler.deserialize);
|
||||||
waitForCancel(call, stream);
|
stream.waitForCancel();
|
||||||
stream.metadata = metadata;
|
stream.metadata = metadata;
|
||||||
handler.func(stream);
|
handler.func(stream);
|
||||||
}
|
}
|
||||||
|
|
|
@ -312,6 +312,54 @@ describe('Generic client and server', function() {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
describe('Server-side getPeer', function() {
|
||||||
|
function toString(val) {
|
||||||
|
return val.toString();
|
||||||
|
}
|
||||||
|
function toBuffer(str) {
|
||||||
|
return new Buffer(str);
|
||||||
|
}
|
||||||
|
var string_service_attrs = {
|
||||||
|
'getPeer' : {
|
||||||
|
path: '/string/getPeer',
|
||||||
|
requestStream: false,
|
||||||
|
responseStream: false,
|
||||||
|
requestSerialize: toBuffer,
|
||||||
|
requestDeserialize: toString,
|
||||||
|
responseSerialize: toBuffer,
|
||||||
|
responseDeserialize: toString
|
||||||
|
}
|
||||||
|
};
|
||||||
|
var client;
|
||||||
|
var server;
|
||||||
|
before(function() {
|
||||||
|
server = new grpc.Server();
|
||||||
|
server.addService(string_service_attrs, {
|
||||||
|
getPeer: function(call, callback) {
|
||||||
|
try {
|
||||||
|
callback(null, call.getPeer());
|
||||||
|
} catch (e) {
|
||||||
|
call.emit('error', e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
var port = server.bind('localhost:0', server_insecure_creds);
|
||||||
|
server.start();
|
||||||
|
var Client = grpc.makeGenericClientConstructor(string_service_attrs);
|
||||||
|
client = new Client('localhost:' + port,
|
||||||
|
grpc.credentials.createInsecure());
|
||||||
|
});
|
||||||
|
after(function() {
|
||||||
|
server.forceShutdown();
|
||||||
|
});
|
||||||
|
it('should respond with a string representing the client', function(done) {
|
||||||
|
client.getPeer('', function(err, response) {
|
||||||
|
assert.ifError(err);
|
||||||
|
// We don't expect a specific value, just that it worked without error
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
describe('Echo metadata', function() {
|
describe('Echo metadata', function() {
|
||||||
var client;
|
var client;
|
||||||
var server;
|
var server;
|
||||||
|
|
Loading…
Reference in New Issue