Merge branch 'slice_with_exec_ctx' into metadata_filter

This commit is contained in:
Craig Tiller 2016-12-27 08:59:00 -08:00
commit ce3b1befa9
3 changed files with 40 additions and 12 deletions

View File

@ -99,7 +99,18 @@ function ClientWritableStream(call, serialize) {
function _write(chunk, encoding, callback) { function _write(chunk, encoding, callback) {
/* jshint validthis: true */ /* jshint validthis: true */
var batch = {}; var batch = {};
var message = this.serialize(chunk); var message;
try {
message = this.serialize(chunk);
} catch (e) {
/* Sending this error to the server and emitting it immediately on the
client may put the call in a slightly weird state on the client side,
but passing an object that causes a serialization failure is a misuse
of the API anyway, so that's OK. The primary purpose here is to give the
programmer a useful error and to stop the stream properly */
this.call.cancelWithStatus(grpc.status.INTERNAL, "Serialization failure");
callback(e);
}
if (_.isFinite(encoding)) { if (_.isFinite(encoding)) {
/* Attach the encoding if it is a finite number. This is the closest we /* Attach the encoding if it is a finite number. This is the closest we
* can get to checking that it is valid flags */ * can get to checking that it is valid flags */
@ -184,14 +195,15 @@ function _emitStatusIfDone() {
} else { } else {
status = this.received_status; status = this.received_status;
} }
this.emit('status', status); if (status.code === grpc.status.OK) {
if (status.code !== grpc.status.OK) { this.push(null);
} else {
var error = new Error(status.details); var error = new Error(status.details);
error.code = status.code; error.code = status.code;
error.metadata = status.metadata; error.metadata = status.metadata;
this.emit('error', error); this.emit('error', error);
return;
} }
this.emit('status', status);
} }
} }
@ -224,9 +236,11 @@ function _read(size) {
} catch (e) { } catch (e) {
self._readsDone({code: grpc.status.INTERNAL, self._readsDone({code: grpc.status.INTERNAL,
details: 'Failed to parse server response'}); details: 'Failed to parse server response'});
return;
} }
if (data === null) { if (data === null) {
self._readsDone(); self._readsDone();
return;
} }
if (self.push(deserialized) && data !== null) { if (self.push(deserialized) && data !== null) {
var read_batch = {}; var read_batch = {};
@ -396,6 +410,8 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
var status = response.status; var status = response.status;
var error; var error;
var deserialized; var deserialized;
emitter.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
if (status.code === grpc.status.OK) { if (status.code === grpc.status.OK) {
if (err) { if (err) {
// Got a batch error, but OK status. Something went wrong // Got a batch error, but OK status. Something went wrong
@ -423,8 +439,6 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
args.callback(null, deserialized); args.callback(null, deserialized);
} }
emitter.emit('status', status); emitter.emit('status', status);
emitter.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
}); });
return emitter; return emitter;
} }

View File

@ -127,7 +127,14 @@ function sendUnaryResponse(call, value, serialize, metadata, flags) {
(new Metadata())._getCoreRepresentation(); (new Metadata())._getCoreRepresentation();
call.metadataSent = true; call.metadataSent = true;
} }
var message = serialize(value); var message;
try {
message = serialize(value);
} catch (e) {
e.code = grpc.status.INTERNAL;
handleError(e);
return;
}
message.grpcWriteFlags = flags; message.grpcWriteFlags = flags;
end_batch[grpc.opType.SEND_MESSAGE] = message; end_batch[grpc.opType.SEND_MESSAGE] = message;
end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
@ -278,7 +285,14 @@ function _write(chunk, encoding, callback) {
(new Metadata())._getCoreRepresentation(); (new Metadata())._getCoreRepresentation();
this.call.metadataSent = true; this.call.metadataSent = true;
} }
var message = this.serialize(chunk); var message;
try {
message = this.serialize(chunk);
} catch (e) {
e.code = grpc.status.INTERNAL;
callback(e);
return;
}
if (_.isFinite(encoding)) { if (_.isFinite(encoding)) {
/* Attach the encoding if it is a finite number. This is the closest we /* Attach the encoding if it is a finite number. This is the closest we
* can get to checking that it is valid flags */ * can get to checking that it is valid flags */

View File

@ -179,8 +179,8 @@ describe('Server.prototype.addProtoService', function() {
call.on('data', function(value) { call.on('data', function(value) {
assert.fail('No messages expected'); assert.fail('No messages expected');
}); });
call.on('status', function(status) { call.on('error', function(err) {
assert.strictEqual(status.code, grpc.status.UNIMPLEMENTED); assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED);
done(); done();
}); });
}); });
@ -189,8 +189,8 @@ describe('Server.prototype.addProtoService', function() {
call.on('data', function(value) { call.on('data', function(value) {
assert.fail('No messages expected'); assert.fail('No messages expected');
}); });
call.on('status', function(status) { call.on('error', function(err) {
assert.strictEqual(status.code, grpc.status.UNIMPLEMENTED); assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED);
done(); done();
}); });
call.end(); call.end();