Merge pull request #275 from kjin/grpc-js-connect

grpc-js-core: only listen for channel connect event once
This commit is contained in:
Michael Lumish 2018-04-17 11:24:40 -07:00 committed by GitHub
commit 8176c70a94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 8 deletions

View File

@ -87,13 +87,13 @@ function setUpReadableStream<ResponseType>(
stream.push(null);
});
call.on('status', (status: StatusObject) => {
stream.emit('status', status);
if (status.code !== Status.OK) {
const statusName = _.invert(Status)[status.code];
const message: string = `${status.code} ${statusName}: ${status.details}`;
const error: ServiceError = Object.assign(new Error(status.details), status);
stream.emit('error', error);
}
stream.emit('status', status);
});
call.pause();
}

View File

@ -85,6 +85,8 @@ export class Http2Channel extends EventEmitter implements Channel {
private readonly target: url.URL;
private readonly defaultAuthority: string;
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
// Helper Promise object only used in the implementation of connect().
private connecting: Promise<void>|null = null;
/* For now, we have up to one subchannel, which will exist as long as we are
* connecting or trying to connect */
private subChannel: http2.ClientHttp2Session|null = null;
@ -127,6 +129,7 @@ export class Http2Channel extends EventEmitter implements Channel {
this.subChannel.removeListener('connect', this.subChannelConnectCallback);
this.subChannel.removeListener('close', this.subChannelCloseCallback);
this.subChannel = null;
this.emit('shutdown');
clearTimeout(this.backoffTimerId);
}
break;
@ -279,15 +282,38 @@ export class Http2Channel extends EventEmitter implements Channel {
return stream;
}
/**
* Attempts to connect, returning a Promise that resolves when the connection
* is successful, or rejects if the channel is shut down.
*/
connect(): Promise<void> {
return new Promise((resolve) => {
this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING);
if (this.connectivityState === ConnectivityState.READY) {
setImmediate(resolve);
} else {
this.once('connect', resolve);
if (this.connectivityState === ConnectivityState.READY) {
return Promise.resolve();
} else if (this.connectivityState === ConnectivityState.SHUTDOWN) {
return Promise.reject(new Error('Channel has been shut down'));
} else {
// In effect, this.connecting is only assigned upon the first attempt to
// transition from IDLE to CONNECTING, so this condition could have also
// been (connectivityState === IDLE).
if (!this.connecting) {
this.connecting = new Promise((resolve, reject) => {
this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING);
const onConnect = () => {
this.connecting = null;
this.removeListener('shutdown', onShutdown);
resolve();
};
const onShutdown = () => {
this.connecting = null;
this.removeListener('connect', onConnect);
reject(new Error('Channel has been shut down'));
};
this.once('connect', onConnect);
this.once('shutdown', onShutdown);
});
}
});
return this.connecting;
}
}
getConnectivityState(): ConnectivityState {

View File

@ -42,6 +42,12 @@ export class Client {
clearTimeout(timer);
}
cb(null);
}, (err: Error) => {
// Rejection occurs if channel is shut down first.
if (timer) {
clearTimeout(timer);
}
cb(err);
});
if (deadline !== Infinity) {
let timeout: number;