mirror of https://github.com/grpc/grpc-node.git
Fix race between parsing messages and receiving status in Node client
This commit is contained in:
parent
bb1a0b2334
commit
c85428c74b
|
@ -290,6 +290,7 @@ function timeoutOnSleepingServer(client, done) {
|
|||
call.write({
|
||||
payload: {body: zeroBuffer(27182)}
|
||||
});
|
||||
call.on('data', function() {});
|
||||
call.on('error', function(error) {
|
||||
|
||||
assert(error.code === grpc.status.DEADLINE_EXCEEDED ||
|
||||
|
@ -336,6 +337,7 @@ function customMetadata(client, done) {
|
|||
['test_initial_metadata_value']);
|
||||
done();
|
||||
});
|
||||
stream.on('data', function() {});
|
||||
stream.on('status', function(status) {
|
||||
var echo_trailer = status.metadata.get(ECHO_TRAILING_KEY);
|
||||
assert(echo_trailer.length > 0);
|
||||
|
@ -361,6 +363,7 @@ function statusCodeAndMessage(client, done) {
|
|||
done();
|
||||
});
|
||||
var duplex = client.fullDuplexCall();
|
||||
duplex.on('data', function() {});
|
||||
duplex.on('status', function(status) {
|
||||
assert(status);
|
||||
assert.strictEqual(status.code, 2);
|
||||
|
|
106
src/client.js
106
src/client.js
|
@ -131,8 +131,68 @@ function ClientReadableStream(call, deserialize) {
|
|||
this.finished = false;
|
||||
this.reading = false;
|
||||
this.deserialize = common.wrapIgnoreNull(deserialize);
|
||||
/* Status generated from reading messages from the server. Overrides the
|
||||
* status from the server if not OK */
|
||||
this.read_status = null;
|
||||
/* Status received from the server. */
|
||||
this.received_status = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when all messages from the server have been processed. The status
|
||||
* parameter indicates that the call should end with that status. status
|
||||
* defaults to OK if not provided.
|
||||
* @param {Object!} status The status that the call should end with
|
||||
*/
|
||||
function _readsDone(status) {
|
||||
/* jshint validthis: true */
|
||||
if (!status) {
|
||||
status = {code: grpc.status.OK, details: 'OK'};
|
||||
}
|
||||
this.finished = true;
|
||||
this.read_status = status;
|
||||
this._emitStatusIfDone();
|
||||
}
|
||||
|
||||
ClientReadableStream.prototype._readsDone = _readsDone;
|
||||
|
||||
/**
|
||||
* Called to indicate that we have received a status from the server.
|
||||
*/
|
||||
function _receiveStatus(status) {
|
||||
/* jshint validthis: true */
|
||||
this.received_status = status;
|
||||
this._emitStatusIfDone();
|
||||
}
|
||||
|
||||
ClientReadableStream.prototype._receiveStatus = _receiveStatus;
|
||||
|
||||
/**
|
||||
* If we have both processed all incoming messages and received the status from
|
||||
* the server, emit the status. Otherwise, do nothing.
|
||||
*/
|
||||
function _emitStatusIfDone() {
|
||||
/* jshint validthis: true */
|
||||
var status;
|
||||
if (this.read_status && this.received_status) {
|
||||
if (this.read_status.code !== grpc.status.OK) {
|
||||
status = this.read_status;
|
||||
} else {
|
||||
status = this.received_status;
|
||||
}
|
||||
this.emit('status', status);
|
||||
if (status.code !== grpc.status.OK) {
|
||||
var error = new Error(status.details);
|
||||
error.code = status.code;
|
||||
error.metadata = status.metadata;
|
||||
this.emit('error', error);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ClientReadableStream.prototype._emitStatusIfDone = _emitStatusIfDone;
|
||||
|
||||
/**
|
||||
* Read the next object from the stream.
|
||||
* @access private
|
||||
|
@ -150,6 +210,7 @@ function _read(size) {
|
|||
if (err) {
|
||||
// Something has gone wrong. Stop reading and wait for status
|
||||
self.finished = true;
|
||||
self._readsDone();
|
||||
return;
|
||||
}
|
||||
var data = event.read;
|
||||
|
@ -157,8 +218,11 @@ function _read(size) {
|
|||
try {
|
||||
deserialized = self.deserialize(data);
|
||||
} catch (e) {
|
||||
self.call.cancelWithStatus(grpc.status.INTERNAL,
|
||||
'Failed to parse server response');
|
||||
self._readsDone({code: grpc.status.INTERNAL,
|
||||
details: 'Failed to parse server response'});
|
||||
}
|
||||
if (data === null) {
|
||||
self._readsDone();
|
||||
}
|
||||
if (self.push(deserialized) && data !== null) {
|
||||
var read_batch = {};
|
||||
|
@ -198,6 +262,11 @@ function ClientDuplexStream(call, serialize, deserialize) {
|
|||
this.serialize = common.wrapIgnoreNull(serialize);
|
||||
this.deserialize = common.wrapIgnoreNull(deserialize);
|
||||
this.call = call;
|
||||
/* Status generated from reading messages from the server. Overrides the
|
||||
* status from the server if not OK */
|
||||
this.read_status = null;
|
||||
/* Status received from the server. */
|
||||
this.received_status = null;
|
||||
this.on('finish', function() {
|
||||
var batch = {};
|
||||
batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
|
||||
|
@ -205,6 +274,9 @@ function ClientDuplexStream(call, serialize, deserialize) {
|
|||
});
|
||||
}
|
||||
|
||||
ClientDuplexStream.prototype._readsDone = _readsDone;
|
||||
ClientDuplexStream.prototype._receiveStatus = _receiveStatus;
|
||||
ClientDuplexStream.prototype._emitStatusIfDone = _emitStatusIfDone;
|
||||
ClientDuplexStream.prototype._read = _read;
|
||||
ClientDuplexStream.prototype._write = _write;
|
||||
|
||||
|
@ -487,22 +559,13 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
|
|||
var status_batch = {};
|
||||
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
|
||||
call.startBatch(status_batch, function(err, response) {
|
||||
response.status.metadata = Metadata._fromCoreRepresentation(
|
||||
response.status.metadata);
|
||||
stream.emit('status', response.status);
|
||||
if (response.status.code !== grpc.status.OK) {
|
||||
var error = new Error(response.status.details);
|
||||
error.code = response.status.code;
|
||||
error.metadata = response.status.metadata;
|
||||
stream.emit('error', error);
|
||||
return;
|
||||
} else {
|
||||
if (err) {
|
||||
// Got a batch error, but OK status. Something went wrong
|
||||
stream.emit('error', err);
|
||||
return;
|
||||
}
|
||||
}
|
||||
response.status.metadata = Metadata._fromCoreRepresentation(
|
||||
response.status.metadata);
|
||||
stream._receiveStatus(response.status);
|
||||
});
|
||||
return stream;
|
||||
}
|
||||
|
@ -552,22 +615,13 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
|
|||
var status_batch = {};
|
||||
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
|
||||
call.startBatch(status_batch, function(err, response) {
|
||||
response.status.metadata = Metadata._fromCoreRepresentation(
|
||||
response.status.metadata);
|
||||
stream.emit('status', response.status);
|
||||
if (response.status.code !== grpc.status.OK) {
|
||||
var error = new Error(response.status.details);
|
||||
error.code = response.status.code;
|
||||
error.metadata = response.status.metadata;
|
||||
stream.emit('error', error);
|
||||
return;
|
||||
} else {
|
||||
if (err) {
|
||||
// Got a batch error, but OK status. Something went wrong
|
||||
stream.emit('error', err);
|
||||
return;
|
||||
}
|
||||
}
|
||||
response.status.metadata = Metadata._fromCoreRepresentation(
|
||||
response.status.metadata);
|
||||
stream._receiveStatus(response.status);
|
||||
});
|
||||
return stream;
|
||||
}
|
||||
|
|
|
@ -1000,6 +1000,7 @@ describe('Call propagation', function() {
|
|||
proxy_impl.serverStream = function(parent) {
|
||||
var child = client.serverStream(parent.request, null,
|
||||
{parent: parent});
|
||||
child.on('data', function() {});
|
||||
child.on('error', function(err) {
|
||||
assert(err);
|
||||
assert.strictEqual(err.code, grpc.status.CANCELLED);
|
||||
|
@ -1013,6 +1014,7 @@ describe('Call propagation', function() {
|
|||
var proxy_client = new Client('localhost:' + proxy_port,
|
||||
grpc.credentials.createInsecure());
|
||||
call = proxy_client.serverStream({});
|
||||
call.on('data', function() {});
|
||||
call.on('error', function(err) {
|
||||
done();
|
||||
});
|
||||
|
@ -1022,6 +1024,7 @@ describe('Call propagation', function() {
|
|||
var call;
|
||||
proxy_impl.bidiStream = function(parent) {
|
||||
var child = client.bidiStream(null, {parent: parent});
|
||||
child.on('data', function() {});
|
||||
child.on('error', function(err) {
|
||||
assert(err);
|
||||
assert.strictEqual(err.code, grpc.status.CANCELLED);
|
||||
|
@ -1035,6 +1038,7 @@ describe('Call propagation', function() {
|
|||
var proxy_client = new Client('localhost:' + proxy_port,
|
||||
grpc.credentials.createInsecure());
|
||||
call = proxy_client.bidiStream();
|
||||
call.on('data', function() {});
|
||||
call.on('error', function(err) {
|
||||
done();
|
||||
});
|
||||
|
@ -1074,6 +1078,7 @@ describe('Call propagation', function() {
|
|||
proxy_impl.bidiStream = function(parent) {
|
||||
var child = client.bidiStream(
|
||||
null, {parent: parent, propagate_flags: deadline_flags});
|
||||
child.on('data', function() {});
|
||||
child.on('error', function(err) {
|
||||
assert(err);
|
||||
assert(err.code === grpc.status.DEADLINE_EXCEEDED ||
|
||||
|
@ -1089,6 +1094,7 @@ describe('Call propagation', function() {
|
|||
var deadline = new Date();
|
||||
deadline.setSeconds(deadline.getSeconds() + 1);
|
||||
var call = proxy_client.bidiStream(null, {deadline: deadline});
|
||||
call.on('data', function() {});
|
||||
call.on('error', function(err) {
|
||||
done();
|
||||
});
|
||||
|
@ -1130,6 +1136,7 @@ describe('Cancelling surface client', function() {
|
|||
});
|
||||
it('Should correctly cancel a server stream call', function(done) {
|
||||
var call = client.fib({'limit': 5});
|
||||
call.on('data', function() {});
|
||||
call.on('error', function(error) {
|
||||
assert.strictEqual(error.code, surface_client.status.CANCELLED);
|
||||
done();
|
||||
|
@ -1138,6 +1145,7 @@ describe('Cancelling surface client', function() {
|
|||
});
|
||||
it('Should correctly cancel a bidi stream call', function(done) {
|
||||
var call = client.divMany();
|
||||
call.on('data', function() {});
|
||||
call.on('error', function(error) {
|
||||
assert.strictEqual(error.code, surface_client.status.CANCELLED);
|
||||
done();
|
||||
|
|
Loading…
Reference in New Issue