diff --git a/.gitignore b/.gitignore index caa17e01..aec475c0 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ node_modules/ npm-debug.log yarn-error.log yarn.lock + +*~ \ No newline at end of file diff --git a/src/call-stream.ts b/src/call-stream.ts index 16c5348d..37e98314 100644 --- a/src/call-stream.ts +++ b/src/call-stream.ts @@ -1,14 +1,72 @@ import * as stream from 'stream'; -import { Status } from './constants'; + +import {CallCredentials} from './call-credentials'; +import {Status} from './constants'; +import {Metadata} from './metadata'; +import {ObjectDuplex} from './object-stream'; + +export interface CallOptions { + deadline?: Date|number; + host?: string; + credentials?: CallCredentials; + flags?: number; +} + +export interface StatusObject { + code: Status; + details: string; + metadata: Metadata; +} + +export interface WriteObject { + message: Buffer; + flags?: number; +} + +export interface CallStream extends ObjectDuplex { + cancelWithStatus(status: Status, details: string): void; + getPeer(): string; + + addListener(event: string, listener: Function): this; + emit(event: string|symbol, ...args: any[]): boolean; + on(event: string, listener: Function): this; + once(event: string, listener: Function): this; + prependListener(event: string, listener: Function): this; + prependOnceListener(event: string, listener: Function): this; + removeListener(event: string, listener: Function): this; + + addListener(event: 'metadata', listener: (metadata: Metadata) => void): this; + emit(event: 'metadata', metadata: Metadata): boolean; + on(event: 'metadata', listener: (metadata: Metadata) => void): this; + once(event: 'metadata', listener: (metadata: Metadata) => void): this; + prependListener(event: 'metadata', listener: (metadata: Metadata) => void): + this; + prependOnceListener( + event: 'metadata', listener: (metadata: Metadata) => void): this; + removeListener(event: 'metadata', listener: (metadata: Metadata) => void): + this; + + addListener(event: 'status', listener: (status: StatusObject) => void): this; + emit(event: 'status', status: StatusObject): boolean; + on(event: 'status', listener: (status: StatusObject) => void): this; + once(event: 'status', listener: (status: StatusObject) => void): this; + prependListener(event: 'status', listener: (status: StatusObject) => void): + this; + prependOnceListener( + event: 'status', listener: (status: StatusObject) => void): this; + removeListener(event: 'status', listener: (status: StatusObject) => void): + this; +} /** * This class represents a duplex stream associated with a single gRPC call. */ -export class CallStream extends stream.Duplex { - /** - * Cancels the call associated with this stream with a given status. - */ - cancelWithStatus(status: Status) { - throw new Error(); +export class CallStreamImpl extends stream.Duplex implements CallStream { + cancelWithStatus(status: Status, details: string): void { + throw new Error('Not yet implemented'); + } + + getPeer(): string { + throw new Error('Not yet implemented'); } } diff --git a/src/call.ts b/src/call.ts index da26bf19..ba38a41c 100644 --- a/src/call.ts +++ b/src/call.ts @@ -1,17 +1,246 @@ -import * as events from 'events'; -import * as stream from 'stream'; -const { EventEmitter } = events; -const { Readable, Writable, Duplex } = stream; +import {EventEmitter} from 'events'; +import {Duplex, Readable, Writable} from 'stream'; -export interface Call { - // cancel(); - // getPeer(); +import {CallStream, StatusObject, WriteObject} from './call-stream'; +import {Status} from './constants'; +import {Metadata} from './metadata'; +import {ObjectReadable, ObjectWritable} from './object-stream'; + +export interface ServiceError extends Error { + code?: number; + metadata?: Metadata; } -export class ClientUnaryCall extends EventEmitter implements Call {} +export class ServiceErrorImpl extends Error implements ServiceError { + code?: number; + metadata?: Metadata; +} -export class ClientReadableStream extends Readable implements Call {} +export interface Call extends EventEmitter { + cancel(): void; + getPeer(): string; -export class ClientWritableStream extends Writable implements Call {} + addListener(event: string, listener: Function): this; + emit(event: string|symbol, ...args: any[]): boolean; + on(event: string, listener: Function): this; + once(event: string, listener: Function): this; + prependListener(event: string, listener: Function): this; + prependOnceListener(event: string, listener: Function): this; + removeListener(event: string, listener: Function): this; -export class ClientDuplexStream extends Duplex implements Call {} + addListener(event: 'metadata', listener: (metadata: Metadata) => void): this; + emit(event: 'metadata', metadata: Metadata): boolean; + on(event: 'metadata', listener: (metadata: Metadata) => void): this; + once(event: 'metadata', listener: (metadata: Metadata) => void): this; + prependListener(event: 'metadata', listener: (metadata: Metadata) => void): + this; + prependOnceListener( + event: 'metadata', listener: (metadata: Metadata) => void): this; + removeListener(event: 'metadata', listener: (metadata: Metadata) => void): + this; +} + +export interface ClientUnaryCall extends Call {} + +export class ClientUnaryCallImpl extends EventEmitter implements Call { + constructor(private readonly call: CallStream) { + super(); + call.on('metadata', (metadata: Metadata) => { + this.emit('metadata', metadata); + }); + } + + cancel(): void { + this.call.cancelWithStatus(Status.CANCELLED, 'Cancelled on client'); + } + + getPeer(): string { + return this.call.getPeer(); + } +} + +export interface ClientReadableStream extends + Call, ObjectReadable { + deserialize: (chunk: Buffer) => ResponseType; + + addListener(event: string, listener: Function): this; + emit(event: string|symbol, ...args: any[]): boolean; + on(event: string, listener: Function): this; + once(event: string, listener: Function): this; + prependListener(event: string, listener: Function): this; + prependOnceListener(event: string, listener: Function): this; + removeListener(event: string, listener: Function): this; + + addListener(event: 'status', listener: (status: StatusObject) => void): this; + emit(event: 'status', status: StatusObject): boolean; + on(event: 'status', listener: (status: StatusObject) => void): this; + once(event: 'status', listener: (status: StatusObject) => void): this; + prependListener(event: 'status', listener: (status: StatusObject) => void): + this; + prependOnceListener( + event: 'status', listener: (status: StatusObject) => void): this; + removeListener(event: 'status', listener: (status: StatusObject) => void): + this; +} + +export interface ClientWritableStream extends + Call, ObjectWritable { + serialize: (value: RequestType) => Buffer; + + addListener(event: string, listener: Function): this; + emit(event: string|symbol, ...args: any[]): boolean; + on(event: string, listener: Function): this; + once(event: string, listener: Function): this; + prependListener(event: string, listener: Function): this; + prependOnceListener(event: string, listener: Function): this; + removeListener(event: string, listener: Function): this; +} + +export interface ClientDuplexStream extends + ClientWritableStream, ClientReadableStream { + addListener(event: string, listener: Function): this; + emit(event: string|symbol, ...args: any[]): boolean; + on(event: string, listener: Function): this; + once(event: string, listener: Function): this; + prependListener(event: string, listener: Function): this; + prependOnceListener(event: string, listener: Function): this; + removeListener(event: string, listener: Function): this; +} + +function setUpReadableStream( + stream: ClientReadableStream, call: CallStream, + deserialize: (chunk: Buffer) => ResponseType): void { + call.on('data', (data: Buffer) => { + let deserialized: ResponseType; + try { + deserialized = deserialize(data); + } catch (e) { + call.cancelWithStatus(Status.INTERNAL, 'Failed to parse server response'); + return; + } + if (!stream.push(deserialized)) { + call.pause(); + } + }); + call.on('end', () => { + stream.push(null); + }); + call.on('status', (status: StatusObject) => { + stream.emit('status', status); + if (status.code !== Status.OK) { + const error = new ServiceErrorImpl(status.details); + error.code = status.code; + error.metadata = status.metadata; + stream.emit('error', error); + } + }); + call.pause(); +} + +export class ClientReadableStreamImpl extends Readable implements + ClientReadableStream { + constructor( + private readonly call: CallStream, + public readonly deserialize: (chunk: Buffer) => ResponseType) { + super({objectMode: true}); + call.on('metadata', (metadata: Metadata) => { + this.emit('metadata', metadata); + }); + setUpReadableStream(this, call, deserialize); + } + + cancel(): void { + this.call.cancelWithStatus(Status.CANCELLED, 'Cancelled on client'); + } + + getPeer(): string { + return this.call.getPeer(); + } + + _read(_size: number): void { + this.call.resume(); + } +} + +function tryWrite( + call: CallStream, serialize: (value: RequestType) => Buffer, + chunk: RequestType, encoding: string, cb: Function) { + let message: Buffer; + const flags: number = Number(encoding); + try { + message = serialize(chunk); + } catch (e) { + call.cancelWithStatus(Status.INTERNAL, 'Serialization failure'); + cb(e); + return; + } + const writeObj: WriteObject = {message: message}; + if (!Number.isNaN(flags)) { + writeObj.flags = flags; + } + call.write(writeObj, cb); +} + +export class ClientWritableStreamImpl extends Writable implements + ClientWritableStream { + constructor( + private readonly call: CallStream, + public readonly serialize: (value: RequestType) => Buffer) { + super({objectMode: true}); + call.on('metadata', (metadata: Metadata) => { + this.emit('metadata', metadata); + }); + } + + cancel(): void { + this.call.cancelWithStatus(Status.CANCELLED, 'Cancelled on client'); + } + + getPeer(): string { + return this.call.getPeer(); + } + + _write(chunk: RequestType, encoding: string, cb: Function) { + tryWrite(this.call, this.serialize, chunk, encoding, cb); + } + + _final(cb: Function) { + this.call.end(); + cb(); + } +} + +export class ClientDuplexStreamImpl extends Duplex + implements ClientDuplexStream { + constructor( + private readonly call: CallStream, + public readonly serialize: (value: RequestType) => Buffer, + public readonly deserialize: (chunk: Buffer) => ResponseType) { + super({objectMode: true}); + call.on('metadata', (metadata: Metadata) => { + this.emit('metadata', metadata); + }); + setUpReadableStream(this, call, deserialize); + } + + cancel(): void { + this.call.cancelWithStatus(Status.CANCELLED, 'Cancelled on client'); + } + + getPeer(): string { + return this.call.getPeer(); + } + + _read(_size: number): void { + this.call.resume(); + } + + _write(chunk: RequestType, encoding: string, cb: Function) { + tryWrite(this.call, this.serialize, chunk, encoding, cb); + } + + _final(cb: Function) { + this.call.end(); + cb(); + } +} diff --git a/src/channel.ts b/src/channel.ts index 45e5dceb..4abbdf5a 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -1,25 +1,30 @@ -import { CallStream } from './call-stream'; -import { ChannelCredentials } from './channel-credentials'; -import { Metadata } from './metadata'; +import {CallOptions, CallStream} from './call-stream'; +import {ChannelCredentials} from './channel-credentials'; +import {Metadata} from './metadata'; /** * An interface that contains options used when initializing a Channel instance. */ -export interface ChannelOptions {} +export interface ChannelOptions { [index: string]: string|number; } export class SubChannel {} -// todo: maybe we want an interface for load balancing, but no implementation for anything complicated +// todo: maybe we want an interface for load balancing, but no implementation +// for anything complicated /** - * A class that represents a communication channel to a server specified by a given address. + * A class that represents a communication channel to a server specified by a + * given address. */ export class Channel { - constructor(address: string, credentials?: ChannelCredentials, options?: ChannelOptions) { + constructor( + address: string, credentials?: ChannelCredentials, + options?: ChannelOptions) { throw new Error(); } - createStream(methodName: string, metadata: Metadata) : CallStream { + createStream(methodName: string, metadata: Metadata, options: CallOptions): + CallStream { throw new Error(); } diff --git a/src/client.ts b/src/client.ts index f5134d51..e289284c 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,19 +1,245 @@ -import { ClientUnaryCall } from './call' -import { Metadata } from './metadata' +import {ClientDuplexStream, ClientDuplexStreamImpl, ClientReadableStream, ClientReadableStreamImpl, ClientUnaryCall, ClientUnaryCallImpl, ClientWritableStream, ClientWritableStreamImpl, ServiceError, ServiceErrorImpl} from './call'; +import {CallOptions, CallStream, StatusObject, WriteObject} from './call-stream'; +import {Channel, ChannelOptions} from './channel'; +import {ChannelCredentials} from './channel-credentials'; +import {Status} from './constants'; +import {Metadata} from './metadata'; -export interface CallOptions {} +export interface UnaryCallback { + (err: ServiceError|null, value?: ResponseType): void; +} export class Client { - makeUnaryRequest( - method: string, - serialize: (value: RequestType) => Buffer, - deserialize: (value: Buffer) => ResponseType, - argument: RequestType, - metadata?: Metadata, - options?: CallOptions, - callback?: (err?: Error, value?: ResponseType) => void - ): ClientUnaryCall { - throw new Error(); + private readonly channel: Channel; + constructor( + address: string, credentials: ChannelCredentials, + options: ChannelOptions = {}) { + if (options['grpc.primary_user_agent']) { + options['grpc.primary_user_agent'] += ' '; + } else { + options['grpc.primary_user_agent'] = ''; + } + // TODO(murgatroid99): Figure out how to get version number + // options['grpc.primary_user_agent'] += 'grpc-node/' + version; + this.channel = new Channel(address, credentials, options); } - // TODO: Do for other method types and overloads -} \ No newline at end of file + + close(): void { + this.channel.close(); + } + + waitForReady(deadline: Date|number, callback: (error: Error|null) => void): + void { + throw new Error('waitForReady is not yet implemented'); + } + + private handleUnaryResponse( + call: CallStream, deserialize: (value: Buffer) => ResponseType, + callback: UnaryCallback): void { + let responseMessage: ResponseType|null = null; + call.on('data', (data: Buffer) => { + if (responseMessage != null) { + call.cancelWithStatus(Status.INTERNAL, 'Too many responses received'); + } + try { + responseMessage = deserialize(data); + } catch (e) { + call.cancelWithStatus( + Status.INTERNAL, 'Failed to parse server response'); + } + }); + call.on('end', () => { + if (responseMessage == null) { + call.cancelWithStatus(Status.INTERNAL, 'Not enough responses received'); + } + }); + call.on('status', (status: StatusObject) => { + /* We assume that call emits status after it emits end, and that it + * accounts for any cancelWithStatus calls up until it emits status. + * Therefore, considering the above event handlers, status.code should be + * OK if and only if we have a non-null responseMessage */ + if (status.code === Status.OK) { + callback(null, responseMessage as ResponseType); + } else { + const error = new ServiceErrorImpl(status.details); + error.code = status.code; + error.metadata = status.metadata; + callback(error); + } + }); + } + + private checkOptionalUnaryResponseArguments( + arg1: Metadata|CallOptions|UnaryCallback, + arg2?: CallOptions|UnaryCallback, + arg3?: UnaryCallback): { + metadata: Metadata, + options: CallOptions, + callback: UnaryCallback + } { + if (arg1 instanceof Function) { + return {metadata: new Metadata(), options: {}, callback: arg1}; + } else if (arg2 instanceof Function) { + if (arg1 instanceof Metadata) { + return {metadata: arg1, options: {}, callback: arg2}; + } else { + return {metadata: new Metadata(), options: arg1, callback: arg2}; + } + } else { + if (!((arg1 instanceof Metadata) && (arg2 instanceof Object) && + (arg3 instanceof Function))) { + throw new Error('Incorrect arguments passed'); + } + return {metadata: arg1, options: arg2, callback: arg3}; + } + } + + makeUnaryRequest( + method: string, serialize: (value: RequestType) => Buffer, + deserialize: (value: Buffer) => ResponseType, argument: RequestType, + metadata: Metadata, options: CallOptions, + callback: UnaryCallback): ClientUnaryCall; + makeUnaryRequest( + method: string, serialize: (value: RequestType) => Buffer, + deserialize: (value: Buffer) => ResponseType, argument: RequestType, + metadata: Metadata, + callback: UnaryCallback): ClientUnaryCall; + makeUnaryRequest( + method: string, serialize: (value: RequestType) => Buffer, + deserialize: (value: Buffer) => ResponseType, argument: RequestType, + options: CallOptions, + callback: UnaryCallback): ClientUnaryCall; + makeUnaryRequest( + method: string, serialize: (value: RequestType) => Buffer, + deserialize: (value: Buffer) => ResponseType, argument: RequestType, + callback: UnaryCallback): ClientUnaryCall; + + makeUnaryRequest( + method: string, serialize: (value: RequestType) => Buffer, + deserialize: (value: Buffer) => ResponseType, argument: RequestType, + metadata: Metadata|CallOptions|UnaryCallback, + options?: CallOptions|UnaryCallback, + callback?: UnaryCallback): ClientUnaryCall { + ({metadata, options, callback} = + this.checkOptionalUnaryResponseArguments( + metadata, options, callback)); + const call: CallStream = + this.channel.createStream(method, metadata, options); + const emitter: ClientUnaryCall = new ClientUnaryCallImpl(call); + const message: Buffer = serialize(argument); + const writeObj: WriteObject = {message: message}; + writeObj.flags = options.flags; + call.write(writeObj); + call.end(); + this.handleUnaryResponse(call, deserialize, callback); + return emitter; + } + + makeClientStreamRequest( + method: string, serialize: (value: RequestType) => Buffer, + deserialize: (value: Buffer) => ResponseType, metadata: Metadata, + options: CallOptions, + callback: UnaryCallback): ClientWritableStream; + makeClientStreamRequest( + method: string, serialize: (value: RequestType) => Buffer, + deserialize: (value: Buffer) => ResponseType, metadata: Metadata, + callback: UnaryCallback): ClientWritableStream; + makeClientStreamRequest( + method: string, serialize: (value: RequestType) => Buffer, + deserialize: (value: Buffer) => ResponseType, options: CallOptions, + callback: UnaryCallback): ClientWritableStream; + makeClientStreamRequest( + method: string, serialize: (value: RequestType) => Buffer, + deserialize: (value: Buffer) => ResponseType, + callback: UnaryCallback): ClientWritableStream; + + makeClientStreamRequest( + method: string, serialize: (value: RequestType) => Buffer, + deserialize: (value: Buffer) => ResponseType, + metadata: Metadata|CallOptions|UnaryCallback, + options?: CallOptions|UnaryCallback, + callback?: UnaryCallback): + ClientWritableStream { + ({metadata, options, callback} = + this.checkOptionalUnaryResponseArguments( + metadata, options, callback)); + const call: CallStream = + this.channel.createStream(method, metadata, options); + const stream: ClientWritableStream = + new ClientWritableStreamImpl(call, serialize); + this.handleUnaryResponse(call, deserialize, callback); + return stream; + } + + private checkMetadataAndOptions( + arg1?: Metadata|CallOptions, + arg2?: CallOptions): {metadata: Metadata, options: CallOptions} { + let metadata: Metadata; + let options: CallOptions; + if (arg1 instanceof Metadata) { + metadata = arg1; + if (arg2) { + options = arg2; + } else { + options = {}; + } + } else { + if (arg1) { + options = arg1; + } else { + options = {}; + } + metadata = new Metadata(); + } + return {metadata, options}; + } + + makeServerStreamRequest( + method: string, serialize: (value: RequestType) => Buffer, + deserialize: (value: Buffer) => ResponseType, argument: RequestType, + metadata: Metadata, + options?: CallOptions): ClientReadableStream; + makeServerStreamRequest( + method: string, serialize: (value: RequestType) => Buffer, + deserialize: (value: Buffer) => ResponseType, argument: RequestType, + options?: CallOptions): ClientReadableStream; + makeServerStreamRequest( + method: string, serialize: (value: RequestType) => Buffer, + deserialize: (value: Buffer) => ResponseType, argument: RequestType, + metadata?: Metadata|CallOptions, + options?: CallOptions): ClientReadableStream { + ({metadata, options} = this.checkMetadataAndOptions(metadata, options)); + const call: CallStream = + this.channel.createStream(method, metadata, options); + const stream: ClientReadableStream = + new ClientReadableStreamImpl(call, deserialize); + const message: Buffer = serialize(argument); + const writeObj: WriteObject = {message: message}; + writeObj.flags = options.flags; + call.write(writeObj); + call.end(); + return stream; + } + + makeBidiStreamRequest( + method: string, serialize: (value: RequestType) => Buffer, + deserialize: (value: Buffer) => ResponseType, metadata: Metadata, + options?: CallOptions): ClientDuplexStream; + makeBidiStreamRequest( + method: string, serialize: (value: RequestType) => Buffer, + deserialize: (value: Buffer) => ResponseType, + options?: CallOptions): ClientDuplexStream; + makeBidiStreamRequest( + method: string, serialize: (value: RequestType) => Buffer, + deserialize: (value: Buffer) => ResponseType, + metadata?: Metadata|CallOptions, + options?: CallOptions): ClientDuplexStream { + ({metadata, options} = this.checkMetadataAndOptions(metadata, options)); + const call: CallStream = + this.channel.createStream(method, metadata, options); + const stream: ClientDuplexStream = + new ClientDuplexStreamImpl( + call, serialize, deserialize); + return stream; + } +} diff --git a/src/object-stream.ts b/src/object-stream.ts new file mode 100644 index 00000000..356dd6da --- /dev/null +++ b/src/object-stream.ts @@ -0,0 +1,65 @@ +import { Readable, Writable, Duplex } from 'stream'; + +export interface IntermediateObjectReadable extends Readable { + read(size?: number): any & T; +} + +export interface ObjectReadable extends IntermediateObjectReadable { + read(size?: number): T; + + addListener(event: string, listener: Function): this; + emit(event: string | symbol, ...args: any[]): boolean; + on(event: string, listener: Function): this; + once(event: string, listener: Function): this; + prependListener(event: string, listener: Function): this; + prependOnceListener(event: string, listener: Function): this; + removeListener(event: string, listener: Function): this; + + addListener(event: 'data', listener: (chunk: T) => void): this; + emit(event: 'data', chunk: T): boolean; + on(event: 'data', listener: (chunk: T) => void): this; + once(event: 'data', listener: (chunk: T) => void): this; + prependListener(event: 'data', listener: (chunk: T) => void): this; + prependOnceListener(event: 'data', listener: (chunk: T) => void): this; + removeListener(event: 'data', listener: (chunk: T) => void): this; +} + +export interface IntermediateObjectWritable extends Writable { + _write(chunk: any & T, encoding: string, callback: Function): void; + write(chunk: any & T, cb?: Function): boolean; + write(chunk: any & T, encoding?: any, cb?: Function): boolean; + setDefaultEncoding(encoding: string): this; + end(): void; + end(chunk: any & T, cb?: Function): void; + end(chunk: any & T, encoding?: any, cb?: Function): void; +} + +export interface ObjectWritable extends IntermediateObjectWritable { + _write(chunk: T, encoding: string, callback: Function): void; + write(chunk: T, cb?: Function): boolean; + write(chunk: T, encoding?: any, cb?: Function): boolean; + setDefaultEncoding(encoding: string): this; + end(): void; + end(chunk: T, cb?: Function): void; + end(chunk: T, encoding?: any, cb?: Function): void; +} + +export interface ObjectDuplex extends Duplex, ObjectWritable, ObjectReadable { + read(size?: number): U; + + _write(chunk: T, encoding: string, callback: Function): void; + write(chunk: T, cb?: Function): boolean; + write(chunk: T, encoding?: any, cb?: Function): boolean; + end(): void; + end(chunk: T, cb?: Function): void; + end(chunk: T, encoding?: any, cb?: Function): void; + + + addListener(event: string, listener: Function): this; + emit(event: string | symbol, ...args: any[]): boolean; + on(event: string, listener: Function): this; + once(event: string, listener: Function): this; + prependListener(event: string, listener: Function): this; + prependOnceListener(event: string, listener: Function): this; + removeListener(event: string, listener: Function): this; +}