mirror of https://github.com/grpc/grpc-node.git
313 lines
9.5 KiB
JavaScript
313 lines
9.5 KiB
JavaScript
/*
|
|
*
|
|
* Copyright 2014, Google Inc.
|
|
* All rights reserved.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions are
|
|
* met:
|
|
*
|
|
* * Redistributions of source code must retain the above copyright
|
|
* notice, this list of conditions and the following disclaimer.
|
|
* * Redistributions in binary form must reproduce the above
|
|
* copyright notice, this list of conditions and the following disclaimer
|
|
* in the documentation and/or other materials provided with the
|
|
* distribution.
|
|
* * Neither the name of Google Inc. nor the names of its
|
|
* contributors may be used to endorse or promote products derived from
|
|
* this software without specific prior written permission.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
|
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
|
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
|
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
|
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
|
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
|
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
|
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
|
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
|
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
|
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
*
|
|
*/
|
|
|
|
var _ = require('underscore');
|
|
|
|
var grpc = require('bindings')('grpc.node');
|
|
|
|
var common = require('./common');
|
|
|
|
var Duplex = require('stream').Duplex;
|
|
var util = require('util');
|
|
|
|
util.inherits(GrpcServerStream, Duplex);
|
|
|
|
/**
|
|
* Class for representing a gRPC server side stream as a Node stream. Extends
|
|
* from stream.Duplex.
|
|
* @constructor
|
|
* @param {grpc.Call} call Call object to proxy
|
|
* @param {function(*):Buffer} serialize Serialization function for responses
|
|
* @param {function(Buffer):*} deserialize Deserialization function for requests
|
|
*/
|
|
function GrpcServerStream(call, serialize, deserialize) {
|
|
Duplex.call(this, {objectMode: true});
|
|
if (!serialize) {
|
|
serialize = function(value) {
|
|
return value;
|
|
};
|
|
}
|
|
if (!deserialize) {
|
|
deserialize = function(value) {
|
|
return value;
|
|
};
|
|
}
|
|
this._call = call;
|
|
// Indicate that a status has been sent
|
|
var finished = false;
|
|
var self = this;
|
|
var status = {
|
|
'code' : grpc.status.OK,
|
|
'details' : 'OK'
|
|
};
|
|
|
|
/**
|
|
* Serialize a response value to a buffer. Always maps null to null. Otherwise
|
|
* uses the provided serialize function
|
|
* @param {*} value The value to serialize
|
|
* @return {Buffer} The serialized value
|
|
*/
|
|
this.serialize = function(value) {
|
|
if (value === null || value === undefined) {
|
|
return null;
|
|
}
|
|
return serialize(value);
|
|
};
|
|
|
|
/**
|
|
* Deserialize a request buffer to a value. Always maps null to null.
|
|
* Otherwise uses the provided deserialize function.
|
|
* @param {Buffer} buffer The buffer to deserialize
|
|
* @return {*} The deserialized value
|
|
*/
|
|
this.deserialize = function(buffer) {
|
|
if (buffer === null) {
|
|
return null;
|
|
}
|
|
return deserialize(buffer);
|
|
};
|
|
|
|
/**
|
|
* Send the pending status
|
|
*/
|
|
function sendStatus() {
|
|
call.startWriteStatus(status.code, status.details, function() {
|
|
});
|
|
finished = true;
|
|
}
|
|
this.on('finish', sendStatus);
|
|
/**
|
|
* Set the pending status to a given error status. If the error does not have
|
|
* code or details properties, the code will be set to grpc.status.INTERNAL
|
|
* and the details will be set to 'Unknown Error'.
|
|
* @param {Error} err The error object
|
|
*/
|
|
function setStatus(err) {
|
|
var code = grpc.status.INTERNAL;
|
|
var details = 'Unknown Error';
|
|
|
|
if (err.hasOwnProperty('code')) {
|
|
code = err.code;
|
|
if (err.hasOwnProperty('details')) {
|
|
details = err.details;
|
|
}
|
|
}
|
|
status = {'code': code, 'details': details};
|
|
}
|
|
/**
|
|
* Terminate the call. This includes indicating that reads are done, draining
|
|
* all pending writes, and sending the given error as a status
|
|
* @param {Error} err The error object
|
|
* @this GrpcServerStream
|
|
*/
|
|
function terminateCall(err) {
|
|
// Drain readable data
|
|
this.on('data', function() {});
|
|
setStatus(err);
|
|
this.end();
|
|
}
|
|
this.on('error', terminateCall);
|
|
// Indicates that a read is pending
|
|
var reading = false;
|
|
/**
|
|
* Callback to be called when a READ event is received. Pushes the data onto
|
|
* the read queue and starts reading again if applicable
|
|
* @param {grpc.Event} event READ event object
|
|
*/
|
|
function readCallback(event) {
|
|
if (finished) {
|
|
self.push(null);
|
|
return;
|
|
}
|
|
var data = event.data;
|
|
if (self.push(deserialize(data)) && data != null) {
|
|
self._call.startRead(readCallback);
|
|
} else {
|
|
reading = false;
|
|
}
|
|
}
|
|
/**
|
|
* Start reading if there is not already a pending read. Reading will
|
|
* continue until self.push returns false (indicating reads should slow
|
|
* down) or the read data is null (indicating that there is no more data).
|
|
*/
|
|
this.startReading = function() {
|
|
if (finished) {
|
|
self.push(null);
|
|
} else {
|
|
if (!reading) {
|
|
reading = true;
|
|
self._call.startRead(readCallback);
|
|
}
|
|
}
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Start reading from the gRPC data source. This is an implementation of a
|
|
* method required for implementing stream.Readable
|
|
* @param {number} size Ignored
|
|
*/
|
|
GrpcServerStream.prototype._read = function(size) {
|
|
this.startReading();
|
|
};
|
|
|
|
/**
|
|
* Start writing a chunk of data. This is an implementation of a method required
|
|
* for implementing stream.Writable.
|
|
* @param {Buffer} chunk The chunk of data to write
|
|
* @param {string} encoding Ignored
|
|
* @param {function(Error=)} callback Callback to indicate that the write is
|
|
* complete
|
|
*/
|
|
GrpcServerStream.prototype._write = function(chunk, encoding, callback) {
|
|
var self = this;
|
|
self._call.startWrite(self.serialize(chunk), function(event) {
|
|
callback();
|
|
}, 0);
|
|
};
|
|
|
|
/**
|
|
* Constructs a server object that stores request handlers and delegates
|
|
* incoming requests to those handlers
|
|
* @constructor
|
|
* @param {Array} options Options that should be passed to the internal server
|
|
* implementation
|
|
*/
|
|
function Server(options) {
|
|
this.handlers = {};
|
|
var handlers = this.handlers;
|
|
var server = new grpc.Server(options);
|
|
this._server = server;
|
|
var started = false;
|
|
/**
|
|
* Start the server and begin handling requests
|
|
* @this Server
|
|
*/
|
|
this.start = function() {
|
|
console.log('Server starting');
|
|
_.each(handlers, function(handler, handler_name) {
|
|
console.log('Serving', handler_name);
|
|
});
|
|
if (this.started) {
|
|
throw 'Server is already running';
|
|
}
|
|
server.start();
|
|
/**
|
|
* Handles the SERVER_RPC_NEW event. If there is a handler associated with
|
|
* the requested method, use that handler to respond to the request. Then
|
|
* wait for the next request
|
|
* @param {grpc.Event} event The event to handle with tag SERVER_RPC_NEW
|
|
*/
|
|
function handleNewCall(event) {
|
|
var call = event.call;
|
|
var data = event.data;
|
|
if (data == null) {
|
|
return;
|
|
}
|
|
server.requestCall(handleNewCall);
|
|
var handler = undefined;
|
|
var deadline = data.absolute_deadline;
|
|
var cancelled = false;
|
|
if (handlers.hasOwnProperty(data.method)) {
|
|
handler = handlers[data.method];
|
|
}
|
|
call.serverAccept(function(event) {
|
|
if (event.data.code === grpc.status.CANCELLED) {
|
|
cancelled = true;
|
|
}
|
|
}, 0);
|
|
call.serverEndInitialMetadata(0);
|
|
var stream = new GrpcServerStream(call, handler.serialize,
|
|
handler.deserialize);
|
|
Object.defineProperty(stream, 'cancelled', {
|
|
get: function() { return cancelled;}
|
|
});
|
|
try {
|
|
handler.func(stream, data.metadata);
|
|
} catch (e) {
|
|
stream.emit('error', e);
|
|
}
|
|
}
|
|
server.requestCall(handleNewCall);
|
|
};
|
|
/** Shuts down the server.
|
|
*/
|
|
this.shutdown = function() {
|
|
server.shutdown();
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Registers a handler to handle the named method. Fails if there already is
|
|
* a handler for the given method. Returns true on success
|
|
* @param {string} name The name of the method that the provided function should
|
|
* handle/respond to.
|
|
* @param {function} handler Function that takes a stream of request values and
|
|
* returns a stream of response values
|
|
* @param {function(*):Buffer} serialize Serialization function for responses
|
|
* @param {function(Buffer):*} deserialize Deserialization function for requests
|
|
* @return {boolean} True if the handler was set. False if a handler was already
|
|
* set for that name.
|
|
*/
|
|
Server.prototype.register = function(name, handler, serialize, deserialize) {
|
|
if (this.handlers.hasOwnProperty(name)) {
|
|
return false;
|
|
}
|
|
this.handlers[name] = {
|
|
func: handler,
|
|
serialize: serialize,
|
|
deserialize: deserialize
|
|
};
|
|
return true;
|
|
};
|
|
|
|
/**
|
|
* Binds the server to the given port, with SSL enabled if secure is specified
|
|
* @param {string} port The port that the server should bind on, in the format
|
|
* "address:port"
|
|
* @param {boolean=} secure Whether the server should open a secure port
|
|
*/
|
|
Server.prototype.bind = function(port, secure) {
|
|
if (secure) {
|
|
this._server.addSecureHttp2Port(port);
|
|
} else {
|
|
this._server.addHttp2Port(port);
|
|
}
|
|
};
|
|
|
|
/**
|
|
* See documentation for Server
|
|
*/
|
|
module.exports = Server;
|