Merge branch 'master' into cq_create_api_changes

This commit is contained in:
Sree Kuchibhotla 2017-04-06 13:09:17 -07:00
commit 76130a5088
6 changed files with 56 additions and 34 deletions

View File

@ -45,6 +45,7 @@
namespace grpc { namespace grpc {
namespace node { namespace node {
using Nan::Callback;
using Nan::MaybeLocal; using Nan::MaybeLocal;
using v8::Function; using v8::Function;
@ -62,7 +63,11 @@ grpc_byte_buffer *BufferToByteBuffer(Local<Value> buffer) {
} }
namespace { namespace {
void delete_buffer(char *data, void *hint) { delete[] data; } void delete_buffer(char *data, void *hint) {
grpc_slice *slice = static_cast<grpc_slice *>(hint);
grpc_slice_unref(*slice);
delete slice;
}
} }
Local<Value> ByteBufferToBuffer(grpc_byte_buffer *buffer) { Local<Value> ByteBufferToBuffer(grpc_byte_buffer *buffer) {
@ -75,31 +80,15 @@ Local<Value> ByteBufferToBuffer(grpc_byte_buffer *buffer) {
Nan::ThrowError("Error initializing byte buffer reader."); Nan::ThrowError("Error initializing byte buffer reader.");
return scope.Escape(Nan::Undefined()); return scope.Escape(Nan::Undefined());
} }
grpc_slice slice = grpc_byte_buffer_reader_readall(&reader); grpc_slice *slice = new grpc_slice;
size_t length = GRPC_SLICE_LENGTH(slice); *slice = grpc_byte_buffer_reader_readall(&reader);
char *result = new char[length]; grpc_byte_buffer_reader_destroy(&reader);
memcpy(result, GRPC_SLICE_START_PTR(slice), length); char *result = reinterpret_cast<char *>(GRPC_SLICE_START_PTR(*slice));
grpc_slice_unref(slice); size_t length = GRPC_SLICE_LENGTH(*slice);
return scope.Escape(MakeFastBuffer( Local<Value> buf =
Nan::NewBuffer(result, length, delete_buffer, NULL).ToLocalChecked())); Nan::NewBuffer(result, length, delete_buffer, slice).ToLocalChecked();
return scope.Escape(buf);
} }
Local<Value> MakeFastBuffer(Local<Value> slowBuffer) {
Nan::EscapableHandleScope scope;
Local<Object> globalObj = Nan::GetCurrentContext()->Global();
MaybeLocal<Value> constructorValue = Nan::Get(
globalObj, Nan::New("Buffer").ToLocalChecked());
Local<Function> bufferConstructor = Local<Function>::Cast(
constructorValue.ToLocalChecked());
const int argc = 3;
Local<Value> consArgs[argc] = {
slowBuffer,
Nan::New<Number>(::node::Buffer::Length(slowBuffer)),
Nan::New<Number>(0)
};
MaybeLocal<Object> fastBuffer = Nan::NewInstance(bufferConstructor,
argc, consArgs);
return scope.Escape(fastBuffer.ToLocalChecked());
}
} // namespace node } // namespace node
} // namespace grpc } // namespace grpc

View File

@ -50,10 +50,6 @@ grpc_byte_buffer *BufferToByteBuffer(v8::Local<v8::Value> buffer);
/* Convert a grpc_byte_buffer to a Node.js Buffer */ /* Convert a grpc_byte_buffer to a Node.js Buffer */
v8::Local<v8::Value> ByteBufferToBuffer(grpc_byte_buffer *buffer); v8::Local<v8::Value> ByteBufferToBuffer(grpc_byte_buffer *buffer);
/* Convert a ::node::Buffer to a fast Buffer, as defined in the Node
Buffer documentation */
v8::Local<v8::Value> MakeFastBuffer(v8::Local<v8::Value> slowBuffer);
} // namespace node } // namespace node
} // namespace grpc } // namespace grpc

View File

@ -37,7 +37,6 @@
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include "slice.h" #include "slice.h"
#include "byte_buffer.h"
namespace grpc { namespace grpc {
namespace node { namespace node {
@ -93,9 +92,9 @@ Local<Value> CreateBufferFromSlice(const grpc_slice slice) {
Nan::EscapableHandleScope scope; Nan::EscapableHandleScope scope;
grpc_slice *slice_ptr = new grpc_slice; grpc_slice *slice_ptr = new grpc_slice;
*slice_ptr = grpc_slice_ref(slice); *slice_ptr = grpc_slice_ref(slice);
return scope.Escape(MakeFastBuffer(Nan::NewBuffer( return scope.Escape(Nan::NewBuffer(
const_cast<char *>(reinterpret_cast<const char *>(GRPC_SLICE_START_PTR(*slice_ptr))), const_cast<char *>(reinterpret_cast<const char *>(GRPC_SLICE_START_PTR(*slice_ptr))),
GRPC_SLICE_LENGTH(*slice_ptr), SliceFreeCallback, slice_ptr).ToLocalChecked())); GRPC_SLICE_LENGTH(*slice_ptr), SliceFreeCallback, slice_ptr).ToLocalChecked());
} }
} // namespace node } // namespace node

View File

@ -88,6 +88,13 @@ function streamingCall(call) {
}); });
} }
function makeUnaryGenericCall(response_size) {
var response = zeroBuffer(response_size);
return function unaryGenericCall(call, callback) {
callback(null, response);
};
}
function makeStreamingGenericCall(response_size) { function makeStreamingGenericCall(response_size) {
var response = zeroBuffer(response_size); var response = zeroBuffer(response_size);
return function streamingGenericCall(call) { return function streamingGenericCall(call) {
@ -129,6 +136,7 @@ function BenchmarkServer(host, port, tls, generic, response_size) {
this.port = server.bind(host + ':' + port, server_creds); this.port = server.bind(host + ':' + port, server_creds);
if (generic) { if (generic) {
server.addService(genericService, { server.addService(genericService, {
unaryCall: makeUnaryGenericCall(response_size),
streamingCall: makeStreamingGenericCall(response_size) streamingCall: makeStreamingGenericCall(response_size)
}); });
} else { } else {

View File

@ -34,8 +34,17 @@
var _ = require('lodash'); var _ = require('lodash');
module.exports = { module.exports = {
'unaryCall' : {
path: '/grpc.testing.BenchmarkService/UnaryCall',
requestStream: false,
responseStream: false,
requestSerialize: _.identity,
requestDeserialize: _.identity,
responseSerialize: _.identity,
responseDeserialize: _.identity
},
'streamingCall' : { 'streamingCall' : {
path: '/grpc.testing/BenchmarkService', path: '/grpc.testing.BenchmarkService/StreamingCall',
requestStream: true, requestStream: true,
responseStream: true, responseStream: true,
requestSerialize: _.identity, requestSerialize: _.identity,

View File

@ -89,6 +89,7 @@ module.exports = function WorkerServiceImpl(benchmark_impl, server) {
default: default:
call.emit('error', new Error('Unsupported PayloadConfig type' + call.emit('error', new Error('Unsupported PayloadConfig type' +
setup.payload_config.payload)); setup.payload_config.payload));
return;
} }
switch (setup.load_params.load) { switch (setup.load_params.load) {
case 'closed_loop': case 'closed_loop':
@ -103,6 +104,7 @@ module.exports = function WorkerServiceImpl(benchmark_impl, server) {
default: default:
call.emit('error', new Error('Unsupported LoadParams type' + call.emit('error', new Error('Unsupported LoadParams type' +
setup.load_params.load)); setup.load_params.load));
return;
} }
stats = client.mark(); stats = client.mark();
call.write({ call.write({
@ -137,8 +139,27 @@ module.exports = function WorkerServiceImpl(benchmark_impl, server) {
switch (request.argtype) { switch (request.argtype) {
case 'setup': case 'setup':
console.log('ServerConfig %j', request.setup); console.log('ServerConfig %j', request.setup);
var setup = request.setup;
var resp_size, generic;
if (setup.payload_config) {
switch (setup.payload_config.payload) {
case 'bytebuf_params':
resp_size = setup.payload_config.bytebuf_params.resp_size;
generic = true;
break;
case 'simple_params':
resp_size = setup.payload_config.simple_params.resp_size;
generic = false;
break;
default:
call.emit('error', new Error('Unsupported PayloadConfig type' +
setup.payload_config.payload));
return;
}
}
server = new BenchmarkServer('[::]', request.setup.port, server = new BenchmarkServer('[::]', request.setup.port,
request.setup.security_params); request.setup.security_params,
generic, resp_size);
server.on('started', function() { server.on('started', function() {
stats = server.mark(); stats = server.mark();
call.write({ call.write({