grpc-js: add bidirectional streaming RPC support

This commit adds bidi streaming RPC support to the server.
This commit is contained in:
cjihrig 2019-05-16 11:23:54 -04:00
parent 3bebc2230a
commit 1aa11525fd
No known key found for this signature in database
GPG Key ID: 7434390BDBE9B9C5
4 changed files with 114 additions and 3 deletions

View File

@ -204,13 +204,21 @@ export class ServerWritableStreamImpl<RequestType, ResponseType> extends
export class ServerDuplexStreamImpl<RequestType, ResponseType> extends Duplex
implements ServerDuplexStream<RequestType, ResponseType> {
cancelled: boolean;
private trailingMetadata: Metadata;
constructor(
private call: Http2ServerCallStream<RequestType, ResponseType>,
public metadata: Metadata, private _serialize: Serialize<ResponseType>,
private _deserialize: Deserialize<RequestType>) {
public metadata: Metadata, public serialize: Serialize<ResponseType>,
public deserialize: Deserialize<RequestType>) {
super({objectMode: true});
this.cancelled = false;
this.trailingMetadata = new Metadata();
this.call.setupReadable(this);
this.on('error', (err) => {
this.call.sendError(err as ServiceError);
this.end();
});
}
getPeer(): string {
@ -222,6 +230,14 @@ export class ServerDuplexStreamImpl<RequestType, ResponseType> extends Duplex
}
}
ServerDuplexStreamImpl.prototype._read =
ServerReadableStreamImpl.prototype._read;
ServerDuplexStreamImpl.prototype._write =
ServerWritableStreamImpl.prototype._write;
ServerDuplexStreamImpl.prototype._final =
ServerWritableStreamImpl.prototype._final;
ServerDuplexStreamImpl.prototype.end = ServerWritableStreamImpl.prototype.end;
// Unary response callback signature.
export type sendUnaryData<ResponseType> =

View File

@ -355,5 +355,12 @@ function handleBidiStreaming<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>,
handler: BidiStreamingHandler<RequestType, ResponseType>,
metadata: Metadata): void {
throw new Error('not implemented yet');
const stream = new ServerDuplexStreamImpl<RequestType, ResponseType>(
call, metadata, handler.serialize, handler.deserialize);
if (call.cancelled) {
return;
}
handler.func(stream);
}

View File

@ -147,6 +147,20 @@ describe('Client malformed response handling', () => {
done();
});
});
it('should get an INTERNAL status with a bidi stream call', (done) => {
const call = client.bidiStream();
call.on('data', noop);
call.on('error', (err: ServiceError) => {
assert(err);
assert.strictEqual(err.code, grpc.status.INTERNAL);
done();
});
call.write({});
call.end();
});
});
describe('Server serialization failure handling', () => {
@ -444,6 +458,23 @@ describe('Other conditions', () => {
done();
});
});
it('should respond correctly to a bidi stream', (done) => {
const call = misbehavingClient.bidiStream();
call.on('data', (data: any) => {
assert.fail(data);
});
call.on('error', (err: ServiceError) => {
assert(err);
assert.strictEqual(err.code, grpc.status.INTERNAL);
done();
});
call.write(badArg);
call.end();
});
});
describe('Trailing metadata', () => {
@ -561,6 +592,33 @@ describe('Other conditions', () => {
done();
});
});
it('should be present when a bidi stream succeeds', (done) => {
const call = client.bidiStream();
call.write({error: false});
call.write({error: false});
call.end();
call.on('data', noop);
call.on('status', (status: grpc.StatusObject) => {
assert.strictEqual(status.code, grpc.status.OK);
assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']);
done();
});
});
it('should be present when a bidi stream fails', (done) => {
const call = client.bidiStream();
call.write({error: false});
call.write({error: true});
call.end();
call.on('data', noop);
call.on('error', (error: ServiceError) => {
assert.deepStrictEqual(error.metadata.get('trailer-present'), ['yes']);
done();
});
});
});
describe('Error object should contain the status', () => {
@ -597,6 +655,20 @@ describe('Other conditions', () => {
});
});
it('for a bidi stream call', (done) => {
const call = client.bidiStream();
call.write({error: false});
call.write({error: true});
call.end();
call.on('data', noop);
call.on('error', (error: ServiceError) => {
assert.strictEqual(error.code, grpc.status.UNKNOWN);
assert.strictEqual(error.details, 'Requested error');
done();
});
});
it('for a UTF-8 error message', (done) => {
client.unary(
{error: true, message: '測試字符串'},

View File

@ -286,6 +286,22 @@ describe('Server', () => {
done();
});
});
it('should respond to a bidi call with UNIMPLEMENTED', (done) => {
const call = client.divMany();
call.on('data', (value: any) => {
assert.fail('No messages expected');
});
call.on('error', (err: ServiceError) => {
assert(err);
assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED);
done();
});
call.end();
});
});
});