mirror of https://github.com/grpc/grpc-node.git
Pure JS: Implement public Channel API
This commit is contained in:
parent
bbddf3d103
commit
89e47c84f7
|
@ -34,4 +34,6 @@ In addition, all channel arguments defined in [this header file](https://github.
|
|||
- `grpc.secondary_user_agent`
|
||||
- `grpc.default_authority`
|
||||
- `grpc.keepalive_time_ms`
|
||||
- `grpc.keepalive_timeout_ms`
|
||||
- `grpc.keepalive_timeout_ms`
|
||||
- `channelOverride`
|
||||
- `channelFactoryOverride`
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
## Implementations
|
||||
|
||||
For a comparison of the features available in these two libraries, see [this document](https://github.com/grpc/grpc-node/tree/master/PACKGE-COMPARISON.md)
|
||||
For a comparison of the features available in these two libraries, see [this document](https://github.com/grpc/grpc-node/tree/master/PACKAGE-COMPARISON.md)
|
||||
|
||||
### C-based Client and Server
|
||||
|
||||
|
|
|
@ -18,5 +18,14 @@ npm install @grpc/grpc-js
|
|||
- TLS channel credentials
|
||||
- Call credentials (for auth)
|
||||
- Simple reconnection
|
||||
- Channel API
|
||||
|
||||
This library does not directly handle `.proto` files. To use `.proto` files with this library we recommend using the `@grpc/proto-loader` package.
|
||||
|
||||
## Some Notes on API Guarantees
|
||||
|
||||
The public API of this library follows semantic versioning, with some caveats:
|
||||
|
||||
- Some methods are prefixed with an underscore. These methods are internal and should not be considered part of the public API.
|
||||
- The class `Call` is only exposed due to limitations of TypeScript. It should not be considered part of the public API.
|
||||
- In general, any API that is exposed by this library but is not exposed by the `grpc` library is likely an error and should not be considered part of the public API.
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import {promisify} from 'util';
|
||||
|
||||
import {CallCredentials} from './call-credentials';
|
||||
import {CallStream} from './call-stream';
|
||||
import {Call} from './call-stream';
|
||||
import {Http2Channel} from './channel';
|
||||
import {BaseFilter, Filter, FilterFactory} from './filter';
|
||||
import {Metadata} from './metadata';
|
||||
|
@ -41,7 +41,7 @@ export class CallCredentialsFilterFactory implements
|
|||
this.credentials = channel.credentials.getCallCredentials();
|
||||
}
|
||||
|
||||
createFilter(callStream: CallStream): CallCredentialsFilter {
|
||||
createFilter(callStream: Call): CallCredentialsFilter {
|
||||
return new CallCredentialsFilter(
|
||||
this.credentials.compose(callStream.getCredentials()),
|
||||
callStream.getHost(), callStream.getMethod());
|
||||
|
|
|
@ -8,6 +8,8 @@ import {Filter} from './filter';
|
|||
import {FilterStackFactory} from './filter-stack';
|
||||
import {Metadata} from './metadata';
|
||||
import {ObjectDuplex, WriteCallback} from './object-stream';
|
||||
import { Meta } from 'orchestrator';
|
||||
import { Channel, Http2Channel } from './channel';
|
||||
|
||||
const {HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE, NGHTTP2_CANCEL} =
|
||||
http2.constants;
|
||||
|
@ -16,12 +18,12 @@ export type Deadline = Date|number;
|
|||
|
||||
export interface CallStreamOptions {
|
||||
deadline: Deadline;
|
||||
credentials: CallCredentials;
|
||||
flags: number;
|
||||
host: string;
|
||||
parentCall: Call | null;
|
||||
}
|
||||
|
||||
export type CallOptions = Partial<CallStreamOptions>;
|
||||
export type PartialCallStreamOptions = Partial<CallStreamOptions>;
|
||||
|
||||
export interface StatusObject {
|
||||
code: Status;
|
||||
|
@ -43,11 +45,13 @@ export interface WriteObject {
|
|||
/**
|
||||
* This interface represents a duplex stream associated with a single gRPC call.
|
||||
*/
|
||||
export type CallStream = {
|
||||
export type Call = {
|
||||
cancelWithStatus(status: Status, details: string): void; getPeer(): string;
|
||||
sendMetadata(metadata: Metadata): void;
|
||||
|
||||
getDeadline(): Deadline;
|
||||
getCredentials(): CallCredentials;
|
||||
setCredentials(credentials: CallCredentials): void;
|
||||
/* If the return value is null, the call has not ended yet. Otherwise, it has
|
||||
* ended with the specified status */
|
||||
getStatus(): StatusObject | null;
|
||||
|
@ -65,7 +69,8 @@ enum ReadState {
|
|||
|
||||
const emptyBuffer = Buffer.alloc(0);
|
||||
|
||||
export class Http2CallStream extends Duplex implements CallStream {
|
||||
export class Http2CallStream extends Duplex implements Call {
|
||||
credentials: CallCredentials = CallCredentials.createEmpty();
|
||||
filterStack: Filter;
|
||||
private statusEmitted = false;
|
||||
private http2Stream: http2.ClientHttp2Stream|null = null;
|
||||
|
@ -103,6 +108,7 @@ export class Http2CallStream extends Duplex implements CallStream {
|
|||
|
||||
constructor(
|
||||
private readonly methodName: string,
|
||||
private readonly channel: Http2Channel,
|
||||
private readonly options: CallStreamOptions,
|
||||
filterStackFactory: FilterStackFactory) {
|
||||
super({objectMode: true});
|
||||
|
@ -377,6 +383,10 @@ export class Http2CallStream extends Duplex implements CallStream {
|
|||
}
|
||||
}
|
||||
|
||||
sendMetadata(metadata: Metadata): void {
|
||||
this.channel._startHttp2Stream(this.options.host, this.methodName, this, metadata);
|
||||
}
|
||||
|
||||
private destroyHttp2Stream() {
|
||||
// The http2 stream could already have been destroyed if cancelWithStatus
|
||||
// is called in response to an internal http2 error.
|
||||
|
@ -402,7 +412,11 @@ export class Http2CallStream extends Duplex implements CallStream {
|
|||
}
|
||||
|
||||
getCredentials(): CallCredentials {
|
||||
return this.options.credentials;
|
||||
return this.credentials;
|
||||
}
|
||||
|
||||
setCredentials(credentials: CallCredentials): void {
|
||||
this.credentials = credentials;
|
||||
}
|
||||
|
||||
getStatus(): StatusObject|null {
|
||||
|
|
|
@ -2,7 +2,7 @@ import {EventEmitter} from 'events';
|
|||
import * as _ from 'lodash';
|
||||
import {Duplex, Readable, Writable} from 'stream';
|
||||
|
||||
import {CallStream, StatusObject, WriteObject} from './call-stream';
|
||||
import {Call, StatusObject, WriteObject} from './call-stream';
|
||||
import {Status} from './constants';
|
||||
import {EmitterAugmentation1} from './events';
|
||||
import {Metadata} from './metadata';
|
||||
|
@ -16,7 +16,7 @@ export type ServiceError = StatusObject&Error;
|
|||
/**
|
||||
* A base type for all user-facing values returned by client-side method calls.
|
||||
*/
|
||||
export type Call = {
|
||||
export type SurfaceCall = {
|
||||
cancel(): void; getPeer(): string;
|
||||
}&EmitterAugmentation1<'metadata', Metadata>&
|
||||
EmitterAugmentation1<'status', StatusObject>&EventEmitter;
|
||||
|
@ -24,21 +24,21 @@ export type Call = {
|
|||
/**
|
||||
* A type representing the return value of a unary method call.
|
||||
*/
|
||||
export type ClientUnaryCall = Call;
|
||||
export type ClientUnaryCall = SurfaceCall;
|
||||
|
||||
/**
|
||||
* A type representing the return value of a server stream method call.
|
||||
*/
|
||||
export type ClientReadableStream<ResponseType> = {
|
||||
deserialize: (chunk: Buffer) => ResponseType;
|
||||
}&Call&ObjectReadable<ResponseType>;
|
||||
}&SurfaceCall&ObjectReadable<ResponseType>;
|
||||
|
||||
/**
|
||||
* A type representing the return value of a client stream method call.
|
||||
*/
|
||||
export type ClientWritableStream<RequestType> = {
|
||||
serialize: (value: RequestType) => Buffer;
|
||||
}&Call&ObjectWritable<RequestType>;
|
||||
}&SurfaceCall&ObjectWritable<RequestType>;
|
||||
|
||||
/**
|
||||
* A type representing the return value of a bidirectional stream method call.
|
||||
|
@ -48,7 +48,7 @@ export type ClientDuplexStream<RequestType, ResponseType> =
|
|||
|
||||
export class ClientUnaryCallImpl extends EventEmitter implements
|
||||
ClientUnaryCall {
|
||||
constructor(private readonly call: CallStream) {
|
||||
constructor(private readonly call: Call) {
|
||||
super();
|
||||
call.on('metadata', (metadata: Metadata) => {
|
||||
this.emit('metadata', metadata);
|
||||
|
@ -68,7 +68,7 @@ export class ClientUnaryCallImpl extends EventEmitter implements
|
|||
}
|
||||
|
||||
function setUpReadableStream<ResponseType>(
|
||||
stream: ClientReadableStream<ResponseType>, call: CallStream,
|
||||
stream: ClientReadableStream<ResponseType>, call: Call,
|
||||
deserialize: (chunk: Buffer) => ResponseType): void {
|
||||
call.on('data', (data: Buffer) => {
|
||||
let deserialized: ResponseType;
|
||||
|
@ -101,7 +101,7 @@ function setUpReadableStream<ResponseType>(
|
|||
export class ClientReadableStreamImpl<ResponseType> extends Readable implements
|
||||
ClientReadableStream<ResponseType> {
|
||||
constructor(
|
||||
private readonly call: CallStream,
|
||||
private readonly call: Call,
|
||||
readonly deserialize: (chunk: Buffer) => ResponseType) {
|
||||
super({objectMode: true});
|
||||
call.on('metadata', (metadata: Metadata) => {
|
||||
|
@ -124,7 +124,7 @@ export class ClientReadableStreamImpl<ResponseType> extends Readable implements
|
|||
}
|
||||
|
||||
function tryWrite<RequestType>(
|
||||
call: CallStream, serialize: (value: RequestType) => Buffer,
|
||||
call: Call, serialize: (value: RequestType) => Buffer,
|
||||
chunk: RequestType, encoding: string, cb: Function) {
|
||||
let message: Buffer;
|
||||
const flags: number = Number(encoding);
|
||||
|
@ -145,7 +145,7 @@ function tryWrite<RequestType>(
|
|||
export class ClientWritableStreamImpl<RequestType> extends Writable implements
|
||||
ClientWritableStream<RequestType> {
|
||||
constructor(
|
||||
private readonly call: CallStream,
|
||||
private readonly call: Call,
|
||||
readonly serialize: (value: RequestType) => Buffer) {
|
||||
super({objectMode: true});
|
||||
call.on('metadata', (metadata: Metadata) => {
|
||||
|
@ -177,7 +177,7 @@ export class ClientWritableStreamImpl<RequestType> extends Writable implements
|
|||
export class ClientDuplexStreamImpl<RequestType, ResponseType> extends Duplex
|
||||
implements ClientDuplexStream<RequestType, ResponseType> {
|
||||
constructor(
|
||||
private readonly call: CallStream,
|
||||
private readonly call: Call,
|
||||
readonly serialize: (value: RequestType) => Buffer,
|
||||
readonly deserialize: (chunk: Buffer) => ResponseType) {
|
||||
super({objectMode: true});
|
||||
|
|
|
@ -5,7 +5,7 @@ import * as url from 'url';
|
|||
|
||||
import {CallCredentials} from './call-credentials';
|
||||
import {CallCredentialsFilterFactory} from './call-credentials-filter';
|
||||
import {CallOptions, CallStream, CallStreamOptions, Http2CallStream} from './call-stream';
|
||||
import {PartialCallStreamOptions, Call, CallStreamOptions, Http2CallStream, Deadline} from './call-stream';
|
||||
import {ChannelCredentials} from './channel-credentials';
|
||||
import {CompressionFilterFactory} from './compression-filter';
|
||||
import {Status} from './constants';
|
||||
|
@ -55,22 +55,48 @@ function uniformRandom(min: number, max: number) {
|
|||
* An interface that represents a communication channel to a server specified
|
||||
* by a given address.
|
||||
*/
|
||||
export interface Channel extends EventEmitter {
|
||||
createStream(methodName: string, metadata: Metadata, options: CallOptions):
|
||||
CallStream;
|
||||
connect(): Promise<void>;
|
||||
getConnectivityState(): ConnectivityState;
|
||||
export interface Channel {
|
||||
/**
|
||||
* Close the channel. This has the same functionality as the existing grpc.Client.prototype.close
|
||||
*/
|
||||
close(): void;
|
||||
|
||||
/* tslint:disable:no-any */
|
||||
addListener(event: string, listener: Function): this;
|
||||
emit(event: string|symbol, ...args: any[]): boolean;
|
||||
on(event: string, listener: Function): this;
|
||||
once(event: string, listener: Function): this;
|
||||
prependListener(event: string, listener: Function): this;
|
||||
prependOnceListener(event: string, listener: Function): this;
|
||||
removeListener(event: string, listener: Function): this;
|
||||
/* tslint:enable:no-any */
|
||||
/**
|
||||
* Return the target that this channel connects to
|
||||
*/
|
||||
getTarget(): string;
|
||||
/**
|
||||
* Get the channel's current connectivity state. This method is here mainly
|
||||
* because it is in the existing internal Channel class, and there isn't
|
||||
* another good place to put it.
|
||||
* @param tryToConnect If true, the channel will start connecting if it is
|
||||
* idle. Otherwise, idle channels will only start connecting when a
|
||||
* call starts.
|
||||
*/
|
||||
getConnectivityState(tryToConnect: boolean): ConnectivityState;
|
||||
/**
|
||||
* Watch for connectivity state changes. This is also here mainly because
|
||||
* it is in the existing external Channel class.
|
||||
* @param currentState The state to watch for transitions from. This should
|
||||
* always be populated by calling getConnectivityState immediately
|
||||
* before.
|
||||
* @param deadline A deadline for waiting for a state change
|
||||
* @param callback Called with no error when a state change, or with an
|
||||
* error if the deadline passes without a state change.
|
||||
*/
|
||||
watchConnectivityState(currentState: ConnectivityState, deadline: Date|number, callback: (error?: Error) => void): void;
|
||||
/**
|
||||
* Create a call object. Call is an opaque type that is used by the Client
|
||||
* class. This function is called by the gRPC library when starting a
|
||||
* request. Implementers should return an instance of Call that is returned
|
||||
* from calling createCall on an instance of the provided Channel class.
|
||||
* @param method The full method string to request.
|
||||
* @param deadline The call deadline
|
||||
* @param host A host string override for making the request
|
||||
* @param parentCall A server call to propagate some information from
|
||||
* @param propagateFlags A bitwise combination of elements of grpc.propagate
|
||||
* that indicates what information to propagate from parentCall.
|
||||
*/
|
||||
createCall(method: string, deadline: Deadline|null|undefined, host: string|null|undefined, parentCall: Call|null|undefined, propagateFlags: number|null|undefined): Call;
|
||||
}
|
||||
|
||||
export class Http2Channel extends EventEmitter implements Channel {
|
||||
|
@ -234,7 +260,7 @@ export class Http2Channel extends EventEmitter implements Channel {
|
|||
].filter(e => e).join(' '); // remove falsey values first
|
||||
}
|
||||
|
||||
private startHttp2Stream(
|
||||
_startHttp2Stream(
|
||||
authority: string, methodName: string, stream: Http2CallStream,
|
||||
metadata: Metadata) {
|
||||
const finalMetadata: Promise<Metadata> =
|
||||
|
@ -255,7 +281,7 @@ export class Http2Channel extends EventEmitter implements Channel {
|
|||
/* In this case, we lost the connection while finalizing
|
||||
* metadata. That should be very unusual */
|
||||
setImmediate(() => {
|
||||
this.startHttp2Stream(authority, methodName, stream, metadata);
|
||||
this._startHttp2Stream(authority, methodName, stream, metadata);
|
||||
});
|
||||
}
|
||||
})
|
||||
|
@ -268,20 +294,19 @@ export class Http2Channel extends EventEmitter implements Channel {
|
|||
});
|
||||
}
|
||||
|
||||
createStream(methodName: string, metadata: Metadata, options: CallOptions):
|
||||
CallStream {
|
||||
createCall(method: string, deadline: Deadline|null|undefined, host: string|null|undefined, parentCall: Call|null|undefined, propagateFlags: number|null|undefined):
|
||||
Call {
|
||||
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
|
||||
throw new Error('Channel has been shut down');
|
||||
}
|
||||
const finalOptions: CallStreamOptions = {
|
||||
deadline: options.deadline === undefined ? Infinity : options.deadline,
|
||||
credentials: options.credentials || CallCredentials.createEmpty(),
|
||||
flags: options.flags || 0,
|
||||
host: options.host || this.defaultAuthority
|
||||
deadline: (deadline === null || deadline == undefined) ? Infinity : deadline,
|
||||
flags: propagateFlags || 0,
|
||||
host: host || this.defaultAuthority,
|
||||
parentCall: parentCall || null
|
||||
};
|
||||
const stream: Http2CallStream =
|
||||
new Http2CallStream(methodName, finalOptions, this.filterStackFactory);
|
||||
this.startHttp2Stream(finalOptions.host, methodName, stream, metadata);
|
||||
new Http2CallStream(method, this, finalOptions, this.filterStackFactory);
|
||||
return stream;
|
||||
}
|
||||
|
||||
|
@ -289,7 +314,7 @@ export class Http2Channel extends EventEmitter implements Channel {
|
|||
* Attempts to connect, returning a Promise that resolves when the connection
|
||||
* is successful, or rejects if the channel is shut down.
|
||||
*/
|
||||
connect(): Promise<void> {
|
||||
private connect(): Promise<void> {
|
||||
if (this.connectivityState === ConnectivityState.READY) {
|
||||
return Promise.resolve();
|
||||
} else if (this.connectivityState === ConnectivityState.SHUTDOWN) {
|
||||
|
@ -320,10 +345,45 @@ export class Http2Channel extends EventEmitter implements Channel {
|
|||
}
|
||||
}
|
||||
|
||||
getConnectivityState(): ConnectivityState {
|
||||
getConnectivityState(tryToConnect: boolean): ConnectivityState {
|
||||
if (tryToConnect) {
|
||||
this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING);
|
||||
}
|
||||
return this.connectivityState;
|
||||
}
|
||||
|
||||
watchConnectivityState(currentState: ConnectivityState, deadline: Date|number, callback: (error?: Error)=>void) {
|
||||
if (this.connectivityState !== currentState) {
|
||||
/* If the connectivity state is different from the provided currentState,
|
||||
* we assume that a state change has successfully occurred */
|
||||
setImmediate(callback);
|
||||
} else {
|
||||
let deadlineMs: number = 0;
|
||||
if (deadline instanceof Date) {
|
||||
deadlineMs = deadline.getTime();
|
||||
} else {
|
||||
deadlineMs = deadline;
|
||||
}
|
||||
let timeout: number = deadlineMs - Date.now();
|
||||
if (timeout < 0) {
|
||||
timeout = 0;
|
||||
}
|
||||
let timeoutId = setTimeout(() => {
|
||||
this.removeListener('connectivityStateChanged', eventCb);
|
||||
callback(new Error('Channel state did not change before deadline'));
|
||||
}, timeout);
|
||||
let eventCb = () => {
|
||||
clearTimeout(timeoutId);
|
||||
callback();
|
||||
};
|
||||
this.once('connectivityStateChanged', eventCb);
|
||||
}
|
||||
}
|
||||
|
||||
getTarget() {
|
||||
return this.target.toString();
|
||||
}
|
||||
|
||||
close() {
|
||||
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
|
||||
throw new Error('Channel has been shut down');
|
||||
|
|
|
@ -2,12 +2,13 @@ import {once} from 'lodash';
|
|||
import {URL} from 'url';
|
||||
|
||||
import {ClientDuplexStream, ClientDuplexStreamImpl, ClientReadableStream, ClientReadableStreamImpl, ClientUnaryCall, ClientUnaryCallImpl, ClientWritableStream, ClientWritableStreamImpl, ServiceError} from './call';
|
||||
import {CallOptions, CallStream, StatusObject, WriteObject} from './call-stream';
|
||||
import {Channel, Http2Channel} from './channel';
|
||||
import {PartialCallStreamOptions, Call, StatusObject, WriteObject, Deadline} from './call-stream';
|
||||
import {Channel, Http2Channel, ConnectivityState} from './channel';
|
||||
import {ChannelCredentials} from './channel-credentials';
|
||||
import {Status} from './constants';
|
||||
import {Metadata} from './metadata';
|
||||
import {ChannelOptions} from './channel-options';
|
||||
import { CallCredentials } from './call-credentials';
|
||||
|
||||
// This symbol must be exported (for now).
|
||||
// See: https://github.com/Microsoft/TypeScript/issues/20080
|
||||
|
@ -17,6 +18,19 @@ export interface UnaryCallback<ResponseType> {
|
|||
(err: ServiceError|null, value?: ResponseType): void;
|
||||
}
|
||||
|
||||
export interface CallOptions {
|
||||
deadline?: Deadline,
|
||||
host?: string,
|
||||
parent?: Call,
|
||||
propagate_flags?: number,
|
||||
credentials?: CallCredentials
|
||||
}
|
||||
|
||||
export type ClientOptions = Partial<ChannelOptions> & {
|
||||
channelOverride?: Channel,
|
||||
channelFactoryOverride?: (address: string, credentials: ChannelCredentials, options: ClientOptions) => Channel
|
||||
};
|
||||
|
||||
/**
|
||||
* A generic gRPC client. Primarily useful as a base class for all generated
|
||||
* clients.
|
||||
|
@ -25,52 +39,53 @@ export class Client {
|
|||
private readonly[kChannel]: Channel;
|
||||
constructor(
|
||||
address: string, credentials: ChannelCredentials,
|
||||
options: Partial<ChannelOptions> = {}) {
|
||||
this[kChannel] = new Http2Channel(address, credentials, options);
|
||||
options: ClientOptions = {}) {
|
||||
if (options.channelOverride) {
|
||||
this[kChannel] = options.channelOverride;
|
||||
} else if (options.channelFactoryOverride) {
|
||||
this[kChannel] = options.channelFactoryOverride(address, credentials, options);
|
||||
} else {
|
||||
this[kChannel] = new Http2Channel(address, credentials, options);
|
||||
}
|
||||
}
|
||||
|
||||
close(): void {
|
||||
this[kChannel].close();
|
||||
}
|
||||
|
||||
waitForReady(deadline: Date|number, callback: (error: Error|null) => void):
|
||||
void {
|
||||
const cb: (error: Error|null) => void = once(callback);
|
||||
const callbackCalled = false;
|
||||
let timer: NodeJS.Timer|null = null;
|
||||
this[kChannel].connect().then(
|
||||
() => {
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
getChannel(): Channel {
|
||||
return this[kChannel];
|
||||
}
|
||||
|
||||
waitForReady(deadline: Deadline, callback: (error?: Error) => void):
|
||||
void {
|
||||
const checkState = (err?: Error) => {
|
||||
if (err) {
|
||||
callback(new Error('Failed to connect before the deadline'));
|
||||
return;
|
||||
}
|
||||
var new_state;
|
||||
try {
|
||||
new_state = this[kChannel].getConnectivityState(true);
|
||||
} catch (e) {
|
||||
callback(new Error('The channel has been closed'));
|
||||
return;
|
||||
}
|
||||
if (new_state === ConnectivityState.READY) {
|
||||
callback();
|
||||
} else {
|
||||
try {
|
||||
this[kChannel].watchConnectivityState(new_state, deadline, checkState);
|
||||
} catch (e) {
|
||||
callback(new Error('The channel has been closed'));
|
||||
}
|
||||
cb(null);
|
||||
},
|
||||
(err: Error) => {
|
||||
// Rejection occurs if channel is shut down first.
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
cb(err);
|
||||
});
|
||||
if (deadline !== Infinity) {
|
||||
let timeout: number;
|
||||
const now: number = (new Date()).getTime();
|
||||
if (deadline instanceof Date) {
|
||||
timeout = deadline.getTime() - now;
|
||||
} else {
|
||||
timeout = deadline - now;
|
||||
}
|
||||
if (timeout < 0) {
|
||||
timeout = 0;
|
||||
}
|
||||
timer = setTimeout(() => {
|
||||
cb(new Error('Failed to connect before the deadline'));
|
||||
}, timeout);
|
||||
}
|
||||
}
|
||||
};
|
||||
setImmediate(checkState);
|
||||
}
|
||||
|
||||
private handleUnaryResponse<ResponseType>(
|
||||
call: CallStream, deserialize: (value: Buffer) => ResponseType,
|
||||
call: Call, deserialize: (value: Buffer) => ResponseType,
|
||||
callback: UnaryCallback<ResponseType>): void {
|
||||
let responseMessage: ResponseType|null = null;
|
||||
call.on('data', (data: Buffer) => {
|
||||
|
@ -157,11 +172,14 @@ export class Client {
|
|||
({metadata, options, callback} =
|
||||
this.checkOptionalUnaryResponseArguments<ResponseType>(
|
||||
metadata, options, callback));
|
||||
const call: CallStream =
|
||||
this[kChannel].createStream(method, metadata, options);
|
||||
const call: Call =
|
||||
this[kChannel].createCall(method, options.deadline, options.host, options.parent, options.propagate_flags);
|
||||
if (options.credentials) {
|
||||
call.setCredentials(options.credentials);
|
||||
}
|
||||
const message: Buffer = serialize(argument);
|
||||
const writeObj: WriteObject = {message};
|
||||
writeObj.flags = options.flags;
|
||||
call.sendMetadata(metadata);
|
||||
call.write(writeObj);
|
||||
call.end();
|
||||
this.handleUnaryResponse<ResponseType>(call, deserialize, callback);
|
||||
|
@ -195,8 +213,12 @@ export class Client {
|
|||
({metadata, options, callback} =
|
||||
this.checkOptionalUnaryResponseArguments<ResponseType>(
|
||||
metadata, options, callback));
|
||||
const call: CallStream =
|
||||
this[kChannel].createStream(method, metadata, options);
|
||||
const call: Call =
|
||||
this[kChannel].createCall(method, options.deadline, options.host, options.parent, options.propagate_flags);
|
||||
if (options.credentials) {
|
||||
call.setCredentials(options.credentials);
|
||||
}
|
||||
call.sendMetadata(metadata);
|
||||
this.handleUnaryResponse<ResponseType>(call, deserialize, callback);
|
||||
return new ClientWritableStreamImpl<RequestType>(call, serialize);
|
||||
}
|
||||
|
@ -239,11 +261,14 @@ export class Client {
|
|||
metadata?: Metadata|CallOptions,
|
||||
options?: CallOptions): ClientReadableStream<ResponseType> {
|
||||
({metadata, options} = this.checkMetadataAndOptions(metadata, options));
|
||||
const call: CallStream =
|
||||
this[kChannel].createStream(method, metadata, options);
|
||||
const call: Call =
|
||||
this[kChannel].createCall(method, options.deadline, options.host, options.parent, options.propagate_flags);
|
||||
if (options.credentials) {
|
||||
call.setCredentials(options.credentials);
|
||||
}
|
||||
const message: Buffer = serialize(argument);
|
||||
const writeObj: WriteObject = {message};
|
||||
writeObj.flags = options.flags;
|
||||
call.sendMetadata(metadata);
|
||||
call.write(writeObj);
|
||||
call.end();
|
||||
return new ClientReadableStreamImpl<ResponseType>(call, deserialize);
|
||||
|
@ -263,8 +288,12 @@ export class Client {
|
|||
metadata?: Metadata|CallOptions,
|
||||
options?: CallOptions): ClientDuplexStream<RequestType, ResponseType> {
|
||||
({metadata, options} = this.checkMetadataAndOptions(metadata, options));
|
||||
const call: CallStream =
|
||||
this[kChannel].createStream(method, metadata, options);
|
||||
const call: Call =
|
||||
this[kChannel].createCall(method, options.deadline, options.host, options.parent, options.propagate_flags);
|
||||
if (options.credentials) {
|
||||
call.setCredentials(options.credentials);
|
||||
}
|
||||
call.sendMetadata(metadata);
|
||||
return new ClientDuplexStreamImpl<RequestType, ResponseType>(
|
||||
call, serialize, deserialize);
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import * as zlib from 'zlib';
|
||||
|
||||
import {CallStream, WriteFlags, WriteObject} from './call-stream';
|
||||
import {Call, WriteFlags, WriteObject} from './call-stream';
|
||||
import {Channel} from './channel';
|
||||
import {Status} from './constants';
|
||||
import {BaseFilter, Filter, FilterFactory} from './filter';
|
||||
|
@ -189,7 +189,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
|
|||
export class CompressionFilterFactory implements
|
||||
FilterFactory<CompressionFilter> {
|
||||
constructor(private readonly channel: Channel) {}
|
||||
createFilter(callStream: CallStream): CompressionFilter {
|
||||
createFilter(callStream: Call): CompressionFilter {
|
||||
return new CompressionFilter();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import {CallStream} from './call-stream';
|
||||
import {Channel, Http2Channel} from './channel';
|
||||
import {Call} from './call-stream';
|
||||
import {Channel, Http2Channel, ConnectivityState} from './channel';
|
||||
import {Status} from './constants';
|
||||
import {BaseFilter, Filter, FilterFactory} from './filter';
|
||||
import {Metadata} from './metadata';
|
||||
|
@ -24,7 +24,7 @@ export class DeadlineFilter extends BaseFilter implements Filter {
|
|||
private deadline: number;
|
||||
constructor(
|
||||
private readonly channel: Http2Channel,
|
||||
private readonly callStream: CallStream) {
|
||||
private readonly callStream: Call) {
|
||||
super();
|
||||
const callDeadline = callStream.getDeadline();
|
||||
if (callDeadline instanceof Date) {
|
||||
|
@ -46,22 +46,34 @@ export class DeadlineFilter extends BaseFilter implements Filter {
|
|||
}
|
||||
}
|
||||
|
||||
async sendMetadata(metadata: Promise<Metadata>) {
|
||||
sendMetadata(metadata: Promise<Metadata>) {
|
||||
if (this.deadline === Infinity) {
|
||||
return await metadata;
|
||||
return metadata;
|
||||
}
|
||||
await this.channel.connect();
|
||||
const timeoutString = getDeadline(this.deadline);
|
||||
const finalMetadata = await metadata;
|
||||
finalMetadata.set('grpc-timeout', timeoutString);
|
||||
return finalMetadata;
|
||||
return new Promise<Metadata>((resolve, reject) => {
|
||||
if (this.channel.getConnectivityState(false) === ConnectivityState.READY) {
|
||||
resolve(metadata);
|
||||
} else {
|
||||
const handleStateChange = (newState: ConnectivityState) => {
|
||||
if (newState === ConnectivityState.READY) {
|
||||
resolve(metadata);
|
||||
this.channel.removeListener('connectivityStateChanged', handleStateChange);
|
||||
}
|
||||
};
|
||||
this.channel.on('connectivityStateChanged', handleStateChange);
|
||||
}
|
||||
}).then((finalMetadata: Metadata) => {
|
||||
const timeoutString = getDeadline(this.deadline);
|
||||
finalMetadata.set('grpc-timeout', timeoutString);
|
||||
return finalMetadata;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export class DeadlineFilterFactory implements FilterFactory<DeadlineFilter> {
|
||||
constructor(private readonly channel: Http2Channel) {}
|
||||
|
||||
createFilter(callStream: CallStream): DeadlineFilter {
|
||||
createFilter(callStream: Call): DeadlineFilter {
|
||||
return new DeadlineFilter(this.channel, callStream);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import {flow, flowRight, map} from 'lodash';
|
||||
|
||||
import {CallStream, StatusObject, WriteObject} from './call-stream';
|
||||
import {Call, StatusObject, WriteObject} from './call-stream';
|
||||
import {Filter, FilterFactory} from './filter';
|
||||
import {Metadata} from './metadata';
|
||||
|
||||
|
@ -37,7 +37,7 @@ export class FilterStack implements Filter {
|
|||
export class FilterStackFactory implements FilterFactory<FilterStack> {
|
||||
constructor(private readonly factories: Array<FilterFactory<Filter>>) {}
|
||||
|
||||
createFilter(callStream: CallStream): FilterStack {
|
||||
createFilter(callStream: Call): FilterStack {
|
||||
return new FilterStack(
|
||||
map(this.factories, (factory) => factory.createFilter(callStream)));
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import {CallStream, StatusObject, WriteObject} from './call-stream';
|
||||
import {Call, StatusObject, WriteObject} from './call-stream';
|
||||
import {Metadata} from './metadata';
|
||||
|
||||
/**
|
||||
|
@ -40,5 +40,5 @@ export abstract class BaseFilter {
|
|||
}
|
||||
|
||||
export interface FilterFactory<T extends Filter> {
|
||||
createFilter(callStream: CallStream): T;
|
||||
createFilter(callStream: Call): T;
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import {Client} from './client';
|
|||
import {Status} from './constants';
|
||||
import {loadPackageDefinition, makeClientConstructor} from './make-client';
|
||||
import {Metadata} from './metadata';
|
||||
import { Channel } from './channel';
|
||||
|
||||
interface IndexedObject {
|
||||
[key: string]: any;
|
||||
|
@ -105,7 +106,8 @@ export {
|
|||
Client,
|
||||
loadPackageDefinition,
|
||||
makeClientConstructor,
|
||||
makeClientConstructor as makeGenericClientConstructor
|
||||
makeClientConstructor as makeGenericClientConstructor,
|
||||
Channel
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -116,7 +118,7 @@ export const closeClient = (client: Client) => client.close();
|
|||
|
||||
export const waitForClientReady =
|
||||
(client: Client, deadline: Date|number,
|
||||
callback: (error: Error|null) => void) =>
|
||||
callback: (error?: Error) => void) =>
|
||||
client.waitForReady(deadline, callback);
|
||||
|
||||
/**** Unimplemented function stubs ****/
|
||||
|
@ -155,8 +157,8 @@ export const ServerCredentials = {
|
|||
}
|
||||
};
|
||||
|
||||
export const getClientChannel = (client: any) => {
|
||||
throw new Error('Not available in this library');
|
||||
export const getClientChannel = (client: Client) => {
|
||||
return Client.prototype.getChannel.call(client);
|
||||
};
|
||||
|
||||
export const StatusBuilder = () => {
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import * as _ from 'lodash';
|
||||
|
||||
import {CallOptions} from './call-stream';
|
||||
import {PartialCallStreamOptions} from './call-stream';
|
||||
import {ChannelOptions} from './channel-options';
|
||||
import {ChannelCredentials} from './channel-credentials';
|
||||
import {Client, UnaryCallback} from './client';
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import {CallStream} from './call-stream';
|
||||
import {Call} from './call-stream';
|
||||
import {StatusObject} from './call-stream';
|
||||
import {Channel} from './channel';
|
||||
import {Status} from './constants';
|
||||
|
@ -31,7 +31,7 @@ export class MetadataStatusFilter extends BaseFilter implements Filter {
|
|||
export class MetadataStatusFilterFactory implements
|
||||
FilterFactory<MetadataStatusFilter> {
|
||||
constructor(private readonly channel: Channel) {}
|
||||
createFilter(callStream: CallStream): MetadataStatusFilter {
|
||||
createFilter(callStream: Call): MetadataStatusFilter {
|
||||
return new MetadataStatusFilter();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ import * as url from 'url';
|
|||
|
||||
import { EventEmitter } from "events";
|
||||
import { Metadata } from "./metadata";
|
||||
import { CallStream, CallOptions, Http2CallStream } from "./call-stream";
|
||||
import { Call, PartialCallStreamOptions, Http2CallStream } from "./call-stream";
|
||||
import { EmitterAugmentation1, EmitterAugmentation0 } from "./events";
|
||||
import { ChannelOptions } from './channel-options';
|
||||
|
||||
|
@ -29,7 +29,7 @@ export interface SubChannel extends EventEmitter {
|
|||
* @param headers The headers to start the stream with
|
||||
* @param callStream The stream to start
|
||||
*/
|
||||
startCallStream(metadata: Metadata, callStream: CallStream): void;
|
||||
startCallStream(metadata: Metadata, callStream: Call): void;
|
||||
close(): void;
|
||||
}
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@ import * as stream from 'stream';
|
|||
|
||||
import {CallCredentials} from '../src/call-credentials';
|
||||
import {Http2CallStream} from '../src/call-stream';
|
||||
import {Channel} from '../src/channel';
|
||||
import {Channel, Http2Channel} from '../src/channel';
|
||||
import {CompressionFilterFactory} from '../src/compression-filter';
|
||||
import {Status} from '../src/constants';
|
||||
import {FilterStackFactory} from '../src/filter-stack';
|
||||
|
@ -81,9 +81,9 @@ class ClientHttp2StreamMock extends stream.Duplex implements
|
|||
describe('CallStream', () => {
|
||||
const callStreamArgs = {
|
||||
deadline: Infinity,
|
||||
credentials: CallCredentials.createEmpty(),
|
||||
flags: 0,
|
||||
host: ''
|
||||
host: '',
|
||||
parentCall: null
|
||||
};
|
||||
/* A CompressionFilter is now necessary to frame and deframe messages.
|
||||
* Currently the channel is unused, so we can replace it with an empty object,
|
||||
|
@ -102,7 +102,7 @@ describe('CallStream', () => {
|
|||
const responseMetadata = new Metadata();
|
||||
responseMetadata.add('key', 'value');
|
||||
const callStream =
|
||||
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
|
||||
new Http2CallStream('foo', <Http2Channel>{}, callStreamArgs, filterStackFactory);
|
||||
|
||||
const http2Stream = new ClientHttp2StreamMock(
|
||||
{payload: Buffer.alloc(0), frameLengths: []});
|
||||
|
@ -140,7 +140,7 @@ describe('CallStream', () => {
|
|||
maybeSkip(it)(`for error code ${key}`, () => {
|
||||
return new Promise((resolve, reject) => {
|
||||
const callStream =
|
||||
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
|
||||
new Http2CallStream('foo', <Http2Channel>{}, callStreamArgs, filterStackFactory);
|
||||
const http2Stream = new ClientHttp2StreamMock(
|
||||
{payload: Buffer.alloc(0), frameLengths: []});
|
||||
callStream.attachHttp2Stream(http2Stream);
|
||||
|
@ -160,10 +160,12 @@ describe('CallStream', () => {
|
|||
|
||||
it('should have functioning getters', (done) => {
|
||||
const callStream =
|
||||
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
|
||||
new Http2CallStream('foo', <Http2Channel>{}, callStreamArgs, filterStackFactory);
|
||||
assert.strictEqual(callStream.getDeadline(), callStreamArgs.deadline);
|
||||
assert.strictEqual(callStream.getCredentials(), callStreamArgs.credentials);
|
||||
assert.strictEqual(callStream.getStatus(), null);
|
||||
const credentials = CallCredentials.createEmpty();
|
||||
callStream.setCredentials(credentials);
|
||||
assert.strictEqual(callStream.getCredentials(), credentials);
|
||||
callStream.on('status', assert2.mustCall((status) => {
|
||||
assert.strictEqual(status.code, Status.CANCELLED);
|
||||
assert.strictEqual(status.details, ';)');
|
||||
|
@ -177,7 +179,7 @@ describe('CallStream', () => {
|
|||
describe('attachHttp2Stream', () => {
|
||||
it('should handle an empty message', (done) => {
|
||||
const callStream =
|
||||
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
|
||||
new Http2CallStream('foo', <Http2Channel>{}, callStreamArgs, filterStackFactory);
|
||||
const http2Stream =
|
||||
new ClientHttp2StreamMock({payload: serialize(''), frameLengths: []});
|
||||
callStream.once('data', assert2.mustCall((buffer) => {
|
||||
|
@ -204,7 +206,7 @@ describe('CallStream', () => {
|
|||
it(`should handle a short message where ${testCase.description}`,
|
||||
(done) => {
|
||||
const callStream =
|
||||
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
|
||||
new Http2CallStream('foo', <Http2Channel>{}, callStreamArgs, filterStackFactory);
|
||||
const http2Stream = new ClientHttp2StreamMock({
|
||||
payload: serialize(message), // 21 bytes
|
||||
frameLengths: testCase.frameLengths
|
||||
|
@ -234,7 +236,7 @@ describe('CallStream', () => {
|
|||
}].forEach((testCase: {description: string, frameLengths: number[]}) => {
|
||||
it(`should handle two messages where ${testCase.description}`, (done) => {
|
||||
const callStream =
|
||||
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
|
||||
new Http2CallStream('foo', <Http2Channel>{}, callStreamArgs, filterStackFactory);
|
||||
const http2Stream = new ClientHttp2StreamMock({
|
||||
payload: Buffer.concat(
|
||||
[serialize(message), serialize(message)]), // 42 bytes
|
||||
|
@ -254,7 +256,7 @@ describe('CallStream', () => {
|
|||
|
||||
it('should send buffered writes', (done) => {
|
||||
const callStream =
|
||||
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
|
||||
new Http2CallStream('foo', <Http2Channel>{}, callStreamArgs, filterStackFactory);
|
||||
const http2Stream = new ClientHttp2StreamMock(
|
||||
{payload: Buffer.alloc(0), frameLengths: []});
|
||||
let streamFlushed = false;
|
||||
|
@ -277,7 +279,7 @@ describe('CallStream', () => {
|
|||
it('should cause data chunks in write calls afterward to be written to the given stream',
|
||||
(done) => {
|
||||
const callStream =
|
||||
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
|
||||
new Http2CallStream('foo', <Http2Channel>{}, callStreamArgs, filterStackFactory);
|
||||
const http2Stream = new ClientHttp2StreamMock(
|
||||
{payload: Buffer.alloc(0), frameLengths: []});
|
||||
http2Stream.once('write', assert2.mustCall((chunk: Buffer) => {
|
||||
|
@ -295,7 +297,7 @@ describe('CallStream', () => {
|
|||
|
||||
it('should handle underlying stream errors', () => {
|
||||
const callStream =
|
||||
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
|
||||
new Http2CallStream('foo', <Http2Channel>{}, callStreamArgs, filterStackFactory);
|
||||
const http2Stream = new ClientHttp2StreamMock(
|
||||
{payload: Buffer.alloc(0), frameLengths: []});
|
||||
callStream.once('status', assert2.mustCall((status) => {
|
||||
|
|
Loading…
Reference in New Issue