Merge branch 'master' into grpc-js_xds_interop_client

This commit is contained in:
Michael Lumish 2020-09-02 16:06:02 -07:00
commit a45fb43091
5 changed files with 47 additions and 23 deletions

View File

@ -33,6 +33,8 @@ export interface ChannelOptions {
'grpc.max_send_message_length'?: number;
'grpc.max_receive_message_length'?: number;
'grpc.enable_http_proxy'?: number;
'grpc.http_connect_target'?: string;
'grpc.http_connect_creds'?: string;
[key: string]: any;
}

View File

@ -120,7 +120,7 @@ export interface Channel {
interface ConnectivityStateWatcher {
currentState: ConnectivityState;
timer: NodeJS.Timeout;
timer: NodeJS.Timeout | null;
callback: (error?: Error) => void;
}
@ -417,7 +417,9 @@ export class ChannelImplementation implements Channel {
const watchersCopy = this.connectivityStateWatchers.slice();
for (const watcherObject of watchersCopy) {
if (newState !== watcherObject.currentState) {
clearTimeout(watcherObject.timer);
if(watcherObject.timer) {
clearTimeout(watcherObject.timer);
}
this.removeConnectivityStateWatcher(watcherObject);
watcherObject.callback();
}
@ -452,25 +454,29 @@ export class ChannelImplementation implements Channel {
deadline: Date | number,
callback: (error?: Error) => void
): void {
const deadlineDate: Date =
deadline instanceof Date ? deadline : new Date(deadline);
const now = new Date();
if (deadlineDate <= now) {
process.nextTick(
callback,
new Error('Deadline passed without connectivity state change')
);
return;
}
const watcherObject = {
currentState,
callback,
timer: setTimeout(() => {
let timer = null;
if(deadline !== Infinity) {
const deadlineDate: Date =
deadline instanceof Date ? deadline : new Date(deadline);
const now = new Date();
if (deadline === -Infinity || deadlineDate <= now) {
process.nextTick(
callback,
new Error('Deadline passed without connectivity state change')
);
return;
}
timer = setTimeout(() => {
this.removeConnectivityStateWatcher(watcherObject);
callback(
new Error('Deadline passed without connectivity state change')
);
}, deadlineDate.getTime() - now.getTime()),
}, deadlineDate.getTime() - now.getTime())
}
const watcherObject = {
currentState,
callback,
timer
};
this.connectivityStateWatchers.push(watcherObject);
}

View File

@ -215,8 +215,10 @@ export function getProxiedConnection(
* connection to a TLS connection.
* This is a workaround for https://github.com/nodejs/node/issues/32922
* See https://github.com/grpc/grpc-node/pull/1369 for more info. */
const remoteHost = getDefaultAuthority(parsedTarget);
const targetPath = getDefaultAuthority(parsedTarget);
const hostPort = splitHostPort(targetPath);
const remoteHost = hostPort?.host ?? targetPath;
const cts = tls.connect(
{
host: remoteHost,

View File

@ -737,12 +737,24 @@ export class Http2ServerCallStream<
} else {
this.messagesToPush.push(deserialized);
}
} catch (err) {
} catch (error) {
// Ignore any remaining messages when errors occur.
this.bufferedMessages.length = 0;
err.code = Status.INTERNAL;
readable.emit('error', err);
if (
!(
'code' in error &&
typeof error.code === 'number' &&
Number.isInteger(error.code) &&
error.code >= Status.OK &&
error.code <= Status.UNAUTHENTICATED
)
) {
// The error code is not a valid gRPC code so its being overwritten.
error.code = Status.INTERNAL;
}
readable.emit('error', error);
}
this.isPushPending = false;

View File

@ -473,11 +473,13 @@ export class Subchannel {
* if a connection is successfully established through the proxy.
* If the proxy is not used, these connectionOptions are discarded
* anyway */
connectionOptions.servername = getDefaultAuthority(
const targetPath = getDefaultAuthority(
parseUri(this.options['grpc.http_connect_target'] as string) ?? {
path: 'localhost',
}
);
const hostPort = splitHostPort(targetPath);
connectionOptions.servername = hostPort?.host ?? targetPath;
}
}
}