import {EventEmitter} from 'events'; import {EmitterAugmentation1} from './events'; import {Duplex, Readable, Writable} from 'stream'; import {CallStream, StatusObject, WriteObject} from './call-stream'; import {Status} from './constants'; import {Metadata} from './metadata'; import {ObjectReadable, ObjectWritable} from './object-stream'; export interface ServiceError extends Error { code?: number; metadata?: Metadata; } export class ServiceErrorImpl extends Error implements ServiceError { code?: number; metadata?: Metadata; } export type Call = { cancel(): void; getPeer(): string; } & EmitterAugmentation1<'metadata', Metadata> & EmitterAugmentation1<'status', StatusObject> & EventEmitter; export type ClientUnaryCall = Call; export type ClientReadableStream = { deserialize: (chunk: Buffer) => ResponseType; } & Call & ObjectReadable; export type ClientWritableStream = { serialize: (value: RequestType) => Buffer; } & Call & ObjectWritable; export type ClientDuplexStream = ClientWritableStream & ClientReadableStream; export class ClientUnaryCallImpl extends EventEmitter implements ClientUnaryCall { constructor(private readonly call: CallStream) { super(); call.on('metadata', (metadata: Metadata) => { this.emit('metadata', metadata); }); call.on('status', (status: StatusObject) => { this.emit('status', status); }); } cancel(): void { this.call.cancelWithStatus(Status.CANCELLED, 'Cancelled on client'); } getPeer(): string { return this.call.getPeer(); } } function setUpReadableStream( stream: ClientReadableStream, call: CallStream, deserialize: (chunk: Buffer) => ResponseType): void { call.on('data', (data: Buffer) => { let deserialized: ResponseType; try { deserialized = deserialize(data); } catch (e) { call.cancelWithStatus(Status.INTERNAL, 'Failed to parse server response'); return; } if (!stream.push(deserialized)) { call.pause(); } }); call.on('end', () => { stream.push(null); }); call.on('status', (status: StatusObject) => { stream.emit('status', status); if (status.code !== Status.OK) { const error = new ServiceErrorImpl(status.details); error.code = status.code; error.metadata = status.metadata; stream.emit('error', error); } }); call.pause(); } export class ClientReadableStreamImpl extends Readable implements ClientReadableStream { constructor( private readonly call: CallStream, public readonly deserialize: (chunk: Buffer) => ResponseType) { super({objectMode: true}); call.on('metadata', (metadata: Metadata) => { this.emit('metadata', metadata); }); setUpReadableStream(this, call, deserialize); } cancel(): void { this.call.cancelWithStatus(Status.CANCELLED, 'Cancelled on client'); } getPeer(): string { return this.call.getPeer(); } _read(_size: number): void { this.call.resume(); } } function tryWrite( call: CallStream, serialize: (value: RequestType) => Buffer, chunk: RequestType, encoding: string, cb: Function) { let message: Buffer; const flags: number = Number(encoding); try { message = serialize(chunk); } catch (e) { call.cancelWithStatus(Status.INTERNAL, 'Serialization failure'); cb(e); return; } const writeObj: WriteObject = {message: message}; if (!Number.isNaN(flags)) { writeObj.flags = flags; } call.write(writeObj, cb); } export class ClientWritableStreamImpl extends Writable implements ClientWritableStream { constructor( private readonly call: CallStream, public readonly serialize: (value: RequestType) => Buffer) { super({objectMode: true}); call.on('metadata', (metadata: Metadata) => { this.emit('metadata', metadata); }); call.on('status', (status: StatusObject) => { this.emit('status', status); }); } cancel(): void { this.call.cancelWithStatus(Status.CANCELLED, 'Cancelled on client'); } getPeer(): string { return this.call.getPeer(); } _write(chunk: RequestType, encoding: string, cb: Function) { tryWrite(this.call, this.serialize, chunk, encoding, cb); } _final(cb: Function) { this.call.end(); cb(); } } export class ClientDuplexStreamImpl extends Duplex implements ClientDuplexStream { constructor( private readonly call: CallStream, public readonly serialize: (value: RequestType) => Buffer, public readonly deserialize: (chunk: Buffer) => ResponseType) { super({objectMode: true}); call.on('metadata', (metadata: Metadata) => { this.emit('metadata', metadata); }); setUpReadableStream(this, call, deserialize); } cancel(): void { this.call.cancelWithStatus(Status.CANCELLED, 'Cancelled on client'); } getPeer(): string { return this.call.getPeer(); } _read(_size: number): void { this.call.resume(); } _write(chunk: RequestType, encoding: string, cb: Function) { tryWrite(this.call, this.serialize, chunk, encoding, cb); } _final(cb: Function) { this.call.end(); cb(); } }