mirror of https://github.com/grpc/grpc-node.git
				
				
				
			grpc-js: add client streaming RPC support
This commit adds client streaming RPC support.
This commit is contained in:
		
							parent
							
								
									b36b285f4c
								
							
						
					
					
						commit
						b8af8c9474
					
				| 
						 | 
				
			
			@ -24,6 +24,7 @@ import {StatusObject} from './call-stream';
 | 
			
		|||
import {Status} from './constants';
 | 
			
		||||
import {Deserialize, Serialize} from './make-client';
 | 
			
		||||
import {Metadata} from './metadata';
 | 
			
		||||
import {StreamDecoder} from './stream-decoder';
 | 
			
		||||
 | 
			
		||||
function noop(): void {}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -100,6 +101,7 @@ export class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter
 | 
			
		|||
export class ServerReadableStreamImpl<RequestType, ResponseType> extends
 | 
			
		||||
    Readable implements ServerReadableStream<RequestType, ResponseType> {
 | 
			
		||||
  cancelled: boolean;
 | 
			
		||||
  private decoder: StreamDecoder;
 | 
			
		||||
 | 
			
		||||
  constructor(
 | 
			
		||||
      private call: Http2ServerCallStream<RequestType, ResponseType>,
 | 
			
		||||
| 
						 | 
				
			
			@ -107,6 +109,44 @@ export class ServerReadableStreamImpl<RequestType, ResponseType> extends
 | 
			
		|||
      private _deserialize: Deserialize<RequestType>) {
 | 
			
		||||
    super({objectMode: true});
 | 
			
		||||
    this.cancelled = false;
 | 
			
		||||
    this.decoder = new StreamDecoder();
 | 
			
		||||
 | 
			
		||||
    const http2Stream = this.call._getHttp2Stream();
 | 
			
		||||
 | 
			
		||||
    http2Stream.on('data', async (data: Buffer) => {
 | 
			
		||||
      const message = this.decoder.write(data);
 | 
			
		||||
 | 
			
		||||
      if (message === null) {
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      try {
 | 
			
		||||
        const deserialized = await this.call.deserializeMessage(message);
 | 
			
		||||
 | 
			
		||||
        if (!this.push(deserialized)) {
 | 
			
		||||
          http2Stream.pause();
 | 
			
		||||
        }
 | 
			
		||||
      } catch (err) {
 | 
			
		||||
        err.code = Status.INTERNAL;
 | 
			
		||||
        this.emit('error', err);
 | 
			
		||||
      }
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    http2Stream.once('end', () => {
 | 
			
		||||
      this.push(null);
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  _read(size: number) {
 | 
			
		||||
    this.call._getHttp2Stream().resume();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  deserialize(input: Buffer): RequestType|null {
 | 
			
		||||
    if (input === null || input === undefined) {
 | 
			
		||||
      return null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return this._deserialize(input);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  getPeer(): string {
 | 
			
		||||
| 
						 | 
				
			
			@ -468,6 +508,10 @@ export class Http2ServerCallStream<RequestType, ResponseType> extends
 | 
			
		|||
    this.sendMetadata();
 | 
			
		||||
    return this.stream.write(chunk);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  _getHttp2Stream(): http2.ServerHttp2Stream {
 | 
			
		||||
    return this.stream;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// tslint:disable:no-any
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -314,7 +314,22 @@ function handleClientStreaming<RequestType, ResponseType>(
 | 
			
		|||
    call: Http2ServerCallStream<RequestType, ResponseType>,
 | 
			
		||||
    handler: ClientStreamingHandler<RequestType, ResponseType>,
 | 
			
		||||
    metadata: Metadata): void {
 | 
			
		||||
  throw new Error('not implemented yet');
 | 
			
		||||
  const stream = new ServerReadableStreamImpl<RequestType, ResponseType>(
 | 
			
		||||
      call, metadata, handler.deserialize);
 | 
			
		||||
 | 
			
		||||
  function respond(
 | 
			
		||||
      err: ServiceError|null, value: ResponseType|null, trailer?: Metadata,
 | 
			
		||||
      flags?: number) {
 | 
			
		||||
    stream.destroy();
 | 
			
		||||
    call.sendUnaryMessage(err, value, trailer, flags);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  if (call.cancelled) {
 | 
			
		||||
    return;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  stream.on('error', respond);
 | 
			
		||||
  handler.func(stream, respond);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -126,6 +126,17 @@ describe('Client malformed response handling', () => {
 | 
			
		|||
    });
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  it('should get an INTERNAL status with a client stream call', (done) => {
 | 
			
		||||
    const call = client.clientStream((err: ServiceError, data: any) => {
 | 
			
		||||
      assert(err);
 | 
			
		||||
      assert.strictEqual(err.code, grpc.status.INTERNAL);
 | 
			
		||||
      done();
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    call.write({});
 | 
			
		||||
    call.end();
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  it('should get an INTERNAL status with a server stream call', (done) => {
 | 
			
		||||
    const call = client.serverStream({});
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -229,6 +240,17 @@ describe('Server serialization failure handling', () => {
 | 
			
		|||
    });
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  it('should get an INTERNAL status with a client stream call', (done) => {
 | 
			
		||||
    const call = client.clientStream((err: ServiceError, data: any) => {
 | 
			
		||||
      assert(err);
 | 
			
		||||
      assert.strictEqual(err.code, grpc.status.INTERNAL);
 | 
			
		||||
      done();
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    call.write({});
 | 
			
		||||
    call.end();
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  it('should get an INTERNAL status with a server stream call', (done) => {
 | 
			
		||||
    const call = client.serverStream({});
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -397,6 +419,18 @@ describe('Other conditions', () => {
 | 
			
		|||
      });
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('should respond correctly to a client stream', (done) => {
 | 
			
		||||
      const call =
 | 
			
		||||
          misbehavingClient.clientStream((err: ServiceError, data: any) => {
 | 
			
		||||
            assert(err);
 | 
			
		||||
            assert.strictEqual(err.code, grpc.status.INTERNAL);
 | 
			
		||||
            done();
 | 
			
		||||
          });
 | 
			
		||||
 | 
			
		||||
      call.write(badArg);
 | 
			
		||||
      call.end();
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('should respond correctly to a server stream', (done) => {
 | 
			
		||||
      const call = misbehavingClient.serverStream(badArg);
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -457,6 +491,56 @@ describe('Other conditions', () => {
 | 
			
		|||
      });
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('should be present when a client stream call succeeds', (done) => {
 | 
			
		||||
      let count = 0;
 | 
			
		||||
      const call = client.clientStream((err: ServiceError, data: any) => {
 | 
			
		||||
        assert.ifError(err);
 | 
			
		||||
 | 
			
		||||
        count++;
 | 
			
		||||
        if (count === 2) {
 | 
			
		||||
          done();
 | 
			
		||||
        }
 | 
			
		||||
      });
 | 
			
		||||
 | 
			
		||||
      call.write({error: false});
 | 
			
		||||
      call.write({error: false});
 | 
			
		||||
      call.end();
 | 
			
		||||
 | 
			
		||||
      call.on('status', (status: grpc.StatusObject) => {
 | 
			
		||||
        assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']);
 | 
			
		||||
 | 
			
		||||
        count++;
 | 
			
		||||
        if (count === 2) {
 | 
			
		||||
          done();
 | 
			
		||||
        }
 | 
			
		||||
      });
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('should be present when a client stream call fails', (done) => {
 | 
			
		||||
      let count = 0;
 | 
			
		||||
      const call = client.clientStream((err: ServiceError, data: any) => {
 | 
			
		||||
        assert(err);
 | 
			
		||||
 | 
			
		||||
        count++;
 | 
			
		||||
        if (count === 2) {
 | 
			
		||||
          done();
 | 
			
		||||
        }
 | 
			
		||||
      });
 | 
			
		||||
 | 
			
		||||
      call.write({error: false});
 | 
			
		||||
      call.write({error: true});
 | 
			
		||||
      call.end();
 | 
			
		||||
 | 
			
		||||
      call.on('status', (status: grpc.StatusObject) => {
 | 
			
		||||
        assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']);
 | 
			
		||||
 | 
			
		||||
        count++;
 | 
			
		||||
        if (count === 2) {
 | 
			
		||||
          done();
 | 
			
		||||
        }
 | 
			
		||||
      });
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('should be present when a server stream call succeeds', (done) => {
 | 
			
		||||
      const call = client.serverStream({error: false});
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -489,6 +573,19 @@ describe('Other conditions', () => {
 | 
			
		|||
      });
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('for a client stream call', (done) => {
 | 
			
		||||
      const call = client.clientStream((err: ServiceError, data: any) => {
 | 
			
		||||
        assert(err);
 | 
			
		||||
        assert.strictEqual(err.code, grpc.status.UNKNOWN);
 | 
			
		||||
        assert.strictEqual(err.details, 'Requested error');
 | 
			
		||||
        done();
 | 
			
		||||
      });
 | 
			
		||||
 | 
			
		||||
      call.write({error: false});
 | 
			
		||||
      call.write({error: true});
 | 
			
		||||
      call.end();
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('for a server stream call', (done) => {
 | 
			
		||||
      const call = client.serverStream({error: true});
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -263,6 +263,16 @@ describe('Server', () => {
 | 
			
		|||
          });
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('should respond to a client stream with UNIMPLEMENTED', (done) => {
 | 
			
		||||
      const call = client.sum((error: ServiceError, response: any) => {
 | 
			
		||||
        assert(error);
 | 
			
		||||
        assert.strictEqual(error.code, grpc.status.UNIMPLEMENTED);
 | 
			
		||||
        done();
 | 
			
		||||
      });
 | 
			
		||||
 | 
			
		||||
      call.end();
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('should respond to a server stream with UNIMPLEMENTED', (done) => {
 | 
			
		||||
      const call = client.fib({limit: 5});
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue