Merge pull request #8 from murgatroid99/client_impl

Add simple implementations for client and call classes
This commit is contained in:
Michael Lumish 2017-08-15 15:43:16 -07:00 committed by GitHub
commit 54f90fa7d1
6 changed files with 626 additions and 41 deletions

2
.gitignore vendored
View File

@ -5,3 +5,5 @@ node_modules/
npm-debug.log
yarn-error.log
yarn.lock
*~

View File

@ -1,14 +1,72 @@
import * as stream from 'stream';
import { Status } from './constants';
import {CallCredentials} from './call-credentials';
import {Status} from './constants';
import {Metadata} from './metadata';
import {ObjectDuplex} from './object-stream';
export interface CallOptions {
deadline?: Date|number;
host?: string;
credentials?: CallCredentials;
flags?: number;
}
export interface StatusObject {
code: Status;
details: string;
metadata: Metadata;
}
export interface WriteObject {
message: Buffer;
flags?: number;
}
export interface CallStream extends ObjectDuplex<WriteObject, Buffer> {
cancelWithStatus(status: Status, details: string): void;
getPeer(): string;
addListener(event: string, listener: Function): this;
emit(event: string|symbol, ...args: any[]): boolean;
on(event: string, listener: Function): this;
once(event: string, listener: Function): this;
prependListener(event: string, listener: Function): this;
prependOnceListener(event: string, listener: Function): this;
removeListener(event: string, listener: Function): this;
addListener(event: 'metadata', listener: (metadata: Metadata) => void): this;
emit(event: 'metadata', metadata: Metadata): boolean;
on(event: 'metadata', listener: (metadata: Metadata) => void): this;
once(event: 'metadata', listener: (metadata: Metadata) => void): this;
prependListener(event: 'metadata', listener: (metadata: Metadata) => void):
this;
prependOnceListener(
event: 'metadata', listener: (metadata: Metadata) => void): this;
removeListener(event: 'metadata', listener: (metadata: Metadata) => void):
this;
addListener(event: 'status', listener: (status: StatusObject) => void): this;
emit(event: 'status', status: StatusObject): boolean;
on(event: 'status', listener: (status: StatusObject) => void): this;
once(event: 'status', listener: (status: StatusObject) => void): this;
prependListener(event: 'status', listener: (status: StatusObject) => void):
this;
prependOnceListener(
event: 'status', listener: (status: StatusObject) => void): this;
removeListener(event: 'status', listener: (status: StatusObject) => void):
this;
}
/**
* This class represents a duplex stream associated with a single gRPC call.
*/
export class CallStream extends stream.Duplex {
/**
* Cancels the call associated with this stream with a given status.
*/
cancelWithStatus(status: Status) {
throw new Error();
export class CallStreamImpl extends stream.Duplex implements CallStream {
cancelWithStatus(status: Status, details: string): void {
throw new Error('Not yet implemented');
}
getPeer(): string {
throw new Error('Not yet implemented');
}
}

View File

@ -1,17 +1,246 @@
import * as events from 'events';
import * as stream from 'stream';
const { EventEmitter } = events;
const { Readable, Writable, Duplex } = stream;
import {EventEmitter} from 'events';
import {Duplex, Readable, Writable} from 'stream';
export interface Call {
// cancel();
// getPeer();
import {CallStream, StatusObject, WriteObject} from './call-stream';
import {Status} from './constants';
import {Metadata} from './metadata';
import {ObjectReadable, ObjectWritable} from './object-stream';
export interface ServiceError extends Error {
code?: number;
metadata?: Metadata;
}
export class ClientUnaryCall extends EventEmitter implements Call {}
export class ServiceErrorImpl extends Error implements ServiceError {
code?: number;
metadata?: Metadata;
}
export class ClientReadableStream extends Readable implements Call {}
export interface Call extends EventEmitter {
cancel(): void;
getPeer(): string;
export class ClientWritableStream extends Writable implements Call {}
addListener(event: string, listener: Function): this;
emit(event: string|symbol, ...args: any[]): boolean;
on(event: string, listener: Function): this;
once(event: string, listener: Function): this;
prependListener(event: string, listener: Function): this;
prependOnceListener(event: string, listener: Function): this;
removeListener(event: string, listener: Function): this;
export class ClientDuplexStream extends Duplex implements Call {}
addListener(event: 'metadata', listener: (metadata: Metadata) => void): this;
emit(event: 'metadata', metadata: Metadata): boolean;
on(event: 'metadata', listener: (metadata: Metadata) => void): this;
once(event: 'metadata', listener: (metadata: Metadata) => void): this;
prependListener(event: 'metadata', listener: (metadata: Metadata) => void):
this;
prependOnceListener(
event: 'metadata', listener: (metadata: Metadata) => void): this;
removeListener(event: 'metadata', listener: (metadata: Metadata) => void):
this;
}
export interface ClientUnaryCall extends Call {}
export class ClientUnaryCallImpl extends EventEmitter implements Call {
constructor(private readonly call: CallStream) {
super();
call.on('metadata', (metadata: Metadata) => {
this.emit('metadata', metadata);
});
}
cancel(): void {
this.call.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
}
getPeer(): string {
return this.call.getPeer();
}
}
export interface ClientReadableStream<ResponseType> extends
Call, ObjectReadable<ResponseType> {
deserialize: (chunk: Buffer) => ResponseType;
addListener(event: string, listener: Function): this;
emit(event: string|symbol, ...args: any[]): boolean;
on(event: string, listener: Function): this;
once(event: string, listener: Function): this;
prependListener(event: string, listener: Function): this;
prependOnceListener(event: string, listener: Function): this;
removeListener(event: string, listener: Function): this;
addListener(event: 'status', listener: (status: StatusObject) => void): this;
emit(event: 'status', status: StatusObject): boolean;
on(event: 'status', listener: (status: StatusObject) => void): this;
once(event: 'status', listener: (status: StatusObject) => void): this;
prependListener(event: 'status', listener: (status: StatusObject) => void):
this;
prependOnceListener(
event: 'status', listener: (status: StatusObject) => void): this;
removeListener(event: 'status', listener: (status: StatusObject) => void):
this;
}
export interface ClientWritableStream<RequestType> extends
Call, ObjectWritable<RequestType> {
serialize: (value: RequestType) => Buffer;
addListener(event: string, listener: Function): this;
emit(event: string|symbol, ...args: any[]): boolean;
on(event: string, listener: Function): this;
once(event: string, listener: Function): this;
prependListener(event: string, listener: Function): this;
prependOnceListener(event: string, listener: Function): this;
removeListener(event: string, listener: Function): this;
}
export interface ClientDuplexStream<RequestType, ResponseType> extends
ClientWritableStream<RequestType>, ClientReadableStream<ResponseType> {
addListener(event: string, listener: Function): this;
emit(event: string|symbol, ...args: any[]): boolean;
on(event: string, listener: Function): this;
once(event: string, listener: Function): this;
prependListener(event: string, listener: Function): this;
prependOnceListener(event: string, listener: Function): this;
removeListener(event: string, listener: Function): this;
}
function setUpReadableStream<ResponseType>(
stream: ClientReadableStream<ResponseType>, call: CallStream,
deserialize: (chunk: Buffer) => ResponseType): void {
call.on('data', (data: Buffer) => {
let deserialized: ResponseType;
try {
deserialized = deserialize(data);
} catch (e) {
call.cancelWithStatus(Status.INTERNAL, 'Failed to parse server response');
return;
}
if (!stream.push(deserialized)) {
call.pause();
}
});
call.on('end', () => {
stream.push(null);
});
call.on('status', (status: StatusObject) => {
stream.emit('status', status);
if (status.code !== Status.OK) {
const error = new ServiceErrorImpl(status.details);
error.code = status.code;
error.metadata = status.metadata;
stream.emit('error', error);
}
});
call.pause();
}
export class ClientReadableStreamImpl<ResponseType> extends Readable implements
ClientReadableStream<ResponseType> {
constructor(
private readonly call: CallStream,
public readonly deserialize: (chunk: Buffer) => ResponseType) {
super({objectMode: true});
call.on('metadata', (metadata: Metadata) => {
this.emit('metadata', metadata);
});
setUpReadableStream<ResponseType>(this, call, deserialize);
}
cancel(): void {
this.call.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
}
getPeer(): string {
return this.call.getPeer();
}
_read(_size: number): void {
this.call.resume();
}
}
function tryWrite<RequestType>(
call: CallStream, serialize: (value: RequestType) => Buffer,
chunk: RequestType, encoding: string, cb: Function) {
let message: Buffer;
const flags: number = Number(encoding);
try {
message = serialize(chunk);
} catch (e) {
call.cancelWithStatus(Status.INTERNAL, 'Serialization failure');
cb(e);
return;
}
const writeObj: WriteObject = {message: message};
if (!Number.isNaN(flags)) {
writeObj.flags = flags;
}
call.write(writeObj, cb);
}
export class ClientWritableStreamImpl<RequestType> extends Writable implements
ClientWritableStream<RequestType> {
constructor(
private readonly call: CallStream,
public readonly serialize: (value: RequestType) => Buffer) {
super({objectMode: true});
call.on('metadata', (metadata: Metadata) => {
this.emit('metadata', metadata);
});
}
cancel(): void {
this.call.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
}
getPeer(): string {
return this.call.getPeer();
}
_write(chunk: RequestType, encoding: string, cb: Function) {
tryWrite<RequestType>(this.call, this.serialize, chunk, encoding, cb);
}
_final(cb: Function) {
this.call.end();
cb();
}
}
export class ClientDuplexStreamImpl<RequestType, ResponseType> extends Duplex
implements ClientDuplexStream<RequestType, ResponseType> {
constructor(
private readonly call: CallStream,
public readonly serialize: (value: RequestType) => Buffer,
public readonly deserialize: (chunk: Buffer) => ResponseType) {
super({objectMode: true});
call.on('metadata', (metadata: Metadata) => {
this.emit('metadata', metadata);
});
setUpReadableStream<ResponseType>(this, call, deserialize);
}
cancel(): void {
this.call.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
}
getPeer(): string {
return this.call.getPeer();
}
_read(_size: number): void {
this.call.resume();
}
_write(chunk: RequestType, encoding: string, cb: Function) {
tryWrite<RequestType>(this.call, this.serialize, chunk, encoding, cb);
}
_final(cb: Function) {
this.call.end();
cb();
}
}

View File

@ -1,25 +1,30 @@
import { CallStream } from './call-stream';
import { ChannelCredentials } from './channel-credentials';
import { Metadata } from './metadata';
import {CallOptions, CallStream} from './call-stream';
import {ChannelCredentials} from './channel-credentials';
import {Metadata} from './metadata';
/**
* An interface that contains options used when initializing a Channel instance.
*/
export interface ChannelOptions {}
export interface ChannelOptions { [index: string]: string|number; }
export class SubChannel {}
// todo: maybe we want an interface for load balancing, but no implementation for anything complicated
// todo: maybe we want an interface for load balancing, but no implementation
// for anything complicated
/**
* A class that represents a communication channel to a server specified by a given address.
* A class that represents a communication channel to a server specified by a
* given address.
*/
export class Channel {
constructor(address: string, credentials?: ChannelCredentials, options?: ChannelOptions) {
constructor(
address: string, credentials?: ChannelCredentials,
options?: ChannelOptions) {
throw new Error();
}
createStream(methodName: string, metadata: Metadata) : CallStream {
createStream(methodName: string, metadata: Metadata, options: CallOptions):
CallStream {
throw new Error();
}

View File

@ -1,19 +1,245 @@
import { ClientUnaryCall } from './call'
import { Metadata } from './metadata'
import {ClientDuplexStream, ClientDuplexStreamImpl, ClientReadableStream, ClientReadableStreamImpl, ClientUnaryCall, ClientUnaryCallImpl, ClientWritableStream, ClientWritableStreamImpl, ServiceError, ServiceErrorImpl} from './call';
import {CallOptions, CallStream, StatusObject, WriteObject} from './call-stream';
import {Channel, ChannelOptions} from './channel';
import {ChannelCredentials} from './channel-credentials';
import {Status} from './constants';
import {Metadata} from './metadata';
export interface CallOptions {}
export interface UnaryCallback<ResponseType> {
(err: ServiceError|null, value?: ResponseType): void;
}
export class Client {
makeUnaryRequest<RequestType, ResponseType>(
method: string,
serialize: (value: RequestType) => Buffer,
deserialize: (value: Buffer) => ResponseType,
argument: RequestType,
metadata?: Metadata,
options?: CallOptions,
callback?: (err?: Error, value?: ResponseType) => void
): ClientUnaryCall {
throw new Error();
private readonly channel: Channel;
constructor(
address: string, credentials: ChannelCredentials,
options: ChannelOptions = {}) {
if (options['grpc.primary_user_agent']) {
options['grpc.primary_user_agent'] += ' ';
} else {
options['grpc.primary_user_agent'] = '';
}
// TODO(murgatroid99): Figure out how to get version number
// options['grpc.primary_user_agent'] += 'grpc-node/' + version;
this.channel = new Channel(address, credentials, options);
}
// TODO: Do for other method types and overloads
}
close(): void {
this.channel.close();
}
waitForReady(deadline: Date|number, callback: (error: Error|null) => void):
void {
throw new Error('waitForReady is not yet implemented');
}
private handleUnaryResponse<ResponseType>(
call: CallStream, deserialize: (value: Buffer) => ResponseType,
callback: UnaryCallback<ResponseType>): void {
let responseMessage: ResponseType|null = null;
call.on('data', (data: Buffer) => {
if (responseMessage != null) {
call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
}
try {
responseMessage = deserialize(data);
} catch (e) {
call.cancelWithStatus(
Status.INTERNAL, 'Failed to parse server response');
}
});
call.on('end', () => {
if (responseMessage == null) {
call.cancelWithStatus(Status.INTERNAL, 'Not enough responses received');
}
});
call.on('status', (status: StatusObject) => {
/* We assume that call emits status after it emits end, and that it
* accounts for any cancelWithStatus calls up until it emits status.
* Therefore, considering the above event handlers, status.code should be
* OK if and only if we have a non-null responseMessage */
if (status.code === Status.OK) {
callback(null, responseMessage as ResponseType);
} else {
const error = new ServiceErrorImpl(status.details);
error.code = status.code;
error.metadata = status.metadata;
callback(error);
}
});
}
private checkOptionalUnaryResponseArguments<ResponseType>(
arg1: Metadata|CallOptions|UnaryCallback<ResponseType>,
arg2?: CallOptions|UnaryCallback<ResponseType>,
arg3?: UnaryCallback<ResponseType>): {
metadata: Metadata,
options: CallOptions,
callback: UnaryCallback<ResponseType>
} {
if (arg1 instanceof Function) {
return {metadata: new Metadata(), options: {}, callback: arg1};
} else if (arg2 instanceof Function) {
if (arg1 instanceof Metadata) {
return {metadata: arg1, options: {}, callback: arg2};
} else {
return {metadata: new Metadata(), options: arg1, callback: arg2};
}
} else {
if (!((arg1 instanceof Metadata) && (arg2 instanceof Object) &&
(arg3 instanceof Function))) {
throw new Error('Incorrect arguments passed');
}
return {metadata: arg1, options: arg2, callback: arg3};
}
}
makeUnaryRequest<RequestType, ResponseType>(
method: string, serialize: (value: RequestType) => Buffer,
deserialize: (value: Buffer) => ResponseType, argument: RequestType,
metadata: Metadata, options: CallOptions,
callback: UnaryCallback<ResponseType>): ClientUnaryCall;
makeUnaryRequest<RequestType, ResponseType>(
method: string, serialize: (value: RequestType) => Buffer,
deserialize: (value: Buffer) => ResponseType, argument: RequestType,
metadata: Metadata,
callback: UnaryCallback<ResponseType>): ClientUnaryCall;
makeUnaryRequest<RequestType, ResponseType>(
method: string, serialize: (value: RequestType) => Buffer,
deserialize: (value: Buffer) => ResponseType, argument: RequestType,
options: CallOptions,
callback: UnaryCallback<ResponseType>): ClientUnaryCall;
makeUnaryRequest<RequestType, ResponseType>(
method: string, serialize: (value: RequestType) => Buffer,
deserialize: (value: Buffer) => ResponseType, argument: RequestType,
callback: UnaryCallback<ResponseType>): ClientUnaryCall;
makeUnaryRequest<RequestType, ResponseType>(
method: string, serialize: (value: RequestType) => Buffer,
deserialize: (value: Buffer) => ResponseType, argument: RequestType,
metadata: Metadata|CallOptions|UnaryCallback<ResponseType>,
options?: CallOptions|UnaryCallback<ResponseType>,
callback?: UnaryCallback<ResponseType>): ClientUnaryCall {
({metadata, options, callback} =
this.checkOptionalUnaryResponseArguments<ResponseType>(
metadata, options, callback));
const call: CallStream =
this.channel.createStream(method, metadata, options);
const emitter: ClientUnaryCall = new ClientUnaryCallImpl(call);
const message: Buffer = serialize(argument);
const writeObj: WriteObject = {message: message};
writeObj.flags = options.flags;
call.write(writeObj);
call.end();
this.handleUnaryResponse<ResponseType>(call, deserialize, callback);
return emitter;
}
makeClientStreamRequest<RequestType, ResponseType>(
method: string, serialize: (value: RequestType) => Buffer,
deserialize: (value: Buffer) => ResponseType, metadata: Metadata,
options: CallOptions,
callback: UnaryCallback<ResponseType>): ClientWritableStream<RequestType>;
makeClientStreamRequest<RequestType, ResponseType>(
method: string, serialize: (value: RequestType) => Buffer,
deserialize: (value: Buffer) => ResponseType, metadata: Metadata,
callback: UnaryCallback<ResponseType>): ClientWritableStream<RequestType>;
makeClientStreamRequest<RequestType, ResponseType>(
method: string, serialize: (value: RequestType) => Buffer,
deserialize: (value: Buffer) => ResponseType, options: CallOptions,
callback: UnaryCallback<ResponseType>): ClientWritableStream<RequestType>;
makeClientStreamRequest<RequestType, ResponseType>(
method: string, serialize: (value: RequestType) => Buffer,
deserialize: (value: Buffer) => ResponseType,
callback: UnaryCallback<ResponseType>): ClientWritableStream<RequestType>;
makeClientStreamRequest<RequestType, ResponseType>(
method: string, serialize: (value: RequestType) => Buffer,
deserialize: (value: Buffer) => ResponseType,
metadata: Metadata|CallOptions|UnaryCallback<ResponseType>,
options?: CallOptions|UnaryCallback<ResponseType>,
callback?: UnaryCallback<ResponseType>):
ClientWritableStream<RequestType> {
({metadata, options, callback} =
this.checkOptionalUnaryResponseArguments<ResponseType>(
metadata, options, callback));
const call: CallStream =
this.channel.createStream(method, metadata, options);
const stream: ClientWritableStream<RequestType> =
new ClientWritableStreamImpl<RequestType>(call, serialize);
this.handleUnaryResponse<ResponseType>(call, deserialize, callback);
return stream;
}
private checkMetadataAndOptions(
arg1?: Metadata|CallOptions,
arg2?: CallOptions): {metadata: Metadata, options: CallOptions} {
let metadata: Metadata;
let options: CallOptions;
if (arg1 instanceof Metadata) {
metadata = arg1;
if (arg2) {
options = arg2;
} else {
options = {};
}
} else {
if (arg1) {
options = arg1;
} else {
options = {};
}
metadata = new Metadata();
}
return {metadata, options};
}
makeServerStreamRequest<RequestType, ResponseType>(
method: string, serialize: (value: RequestType) => Buffer,
deserialize: (value: Buffer) => ResponseType, argument: RequestType,
metadata: Metadata,
options?: CallOptions): ClientReadableStream<ResponseType>;
makeServerStreamRequest<RequestType, ResponseType>(
method: string, serialize: (value: RequestType) => Buffer,
deserialize: (value: Buffer) => ResponseType, argument: RequestType,
options?: CallOptions): ClientReadableStream<ResponseType>;
makeServerStreamRequest<RequestType, ResponseType>(
method: string, serialize: (value: RequestType) => Buffer,
deserialize: (value: Buffer) => ResponseType, argument: RequestType,
metadata?: Metadata|CallOptions,
options?: CallOptions): ClientReadableStream<ResponseType> {
({metadata, options} = this.checkMetadataAndOptions(metadata, options));
const call: CallStream =
this.channel.createStream(method, metadata, options);
const stream: ClientReadableStream<ResponseType> =
new ClientReadableStreamImpl<ResponseType>(call, deserialize);
const message: Buffer = serialize(argument);
const writeObj: WriteObject = {message: message};
writeObj.flags = options.flags;
call.write(writeObj);
call.end();
return stream;
}
makeBidiStreamRequest<RequestType, ResponseType>(
method: string, serialize: (value: RequestType) => Buffer,
deserialize: (value: Buffer) => ResponseType, metadata: Metadata,
options?: CallOptions): ClientDuplexStream<RequestType, ResponseType>;
makeBidiStreamRequest<RequestType, ResponseType>(
method: string, serialize: (value: RequestType) => Buffer,
deserialize: (value: Buffer) => ResponseType,
options?: CallOptions): ClientDuplexStream<RequestType, ResponseType>;
makeBidiStreamRequest<RequestType, ResponseType>(
method: string, serialize: (value: RequestType) => Buffer,
deserialize: (value: Buffer) => ResponseType,
metadata?: Metadata|CallOptions,
options?: CallOptions): ClientDuplexStream<RequestType, ResponseType> {
({metadata, options} = this.checkMetadataAndOptions(metadata, options));
const call: CallStream =
this.channel.createStream(method, metadata, options);
const stream: ClientDuplexStream<RequestType, ResponseType> =
new ClientDuplexStreamImpl<RequestType, ResponseType>(
call, serialize, deserialize);
return stream;
}
}

65
src/object-stream.ts Normal file
View File

@ -0,0 +1,65 @@
import { Readable, Writable, Duplex } from 'stream';
export interface IntermediateObjectReadable<T> extends Readable {
read(size?: number): any & T;
}
export interface ObjectReadable<T> extends IntermediateObjectReadable<T> {
read(size?: number): T;
addListener(event: string, listener: Function): this;
emit(event: string | symbol, ...args: any[]): boolean;
on(event: string, listener: Function): this;
once(event: string, listener: Function): this;
prependListener(event: string, listener: Function): this;
prependOnceListener(event: string, listener: Function): this;
removeListener(event: string, listener: Function): this;
addListener(event: 'data', listener: (chunk: T) => void): this;
emit(event: 'data', chunk: T): boolean;
on(event: 'data', listener: (chunk: T) => void): this;
once(event: 'data', listener: (chunk: T) => void): this;
prependListener(event: 'data', listener: (chunk: T) => void): this;
prependOnceListener(event: 'data', listener: (chunk: T) => void): this;
removeListener(event: 'data', listener: (chunk: T) => void): this;
}
export interface IntermediateObjectWritable<T> extends Writable {
_write(chunk: any & T, encoding: string, callback: Function): void;
write(chunk: any & T, cb?: Function): boolean;
write(chunk: any & T, encoding?: any, cb?: Function): boolean;
setDefaultEncoding(encoding: string): this;
end(): void;
end(chunk: any & T, cb?: Function): void;
end(chunk: any & T, encoding?: any, cb?: Function): void;
}
export interface ObjectWritable<T> extends IntermediateObjectWritable<T> {
_write(chunk: T, encoding: string, callback: Function): void;
write(chunk: T, cb?: Function): boolean;
write(chunk: T, encoding?: any, cb?: Function): boolean;
setDefaultEncoding(encoding: string): this;
end(): void;
end(chunk: T, cb?: Function): void;
end(chunk: T, encoding?: any, cb?: Function): void;
}
export interface ObjectDuplex<T, U> extends Duplex, ObjectWritable<T>, ObjectReadable<U> {
read(size?: number): U;
_write(chunk: T, encoding: string, callback: Function): void;
write(chunk: T, cb?: Function): boolean;
write(chunk: T, encoding?: any, cb?: Function): boolean;
end(): void;
end(chunk: T, cb?: Function): void;
end(chunk: T, encoding?: any, cb?: Function): void;
addListener(event: string, listener: Function): this;
emit(event: string | symbol, ...args: any[]): boolean;
on(event: string, listener: Function): this;
once(event: string, listener: Function): this;
prependListener(event: string, listener: Function): this;
prependOnceListener(event: string, listener: Function): this;
removeListener(event: string, listener: Function): this;
}