Merge branch 'master' into grpc-js_weighted_target_lb

This commit is contained in:
Michael Lumish 2020-06-09 10:38:30 -07:00
commit 29d7b4dc16
10 changed files with 556 additions and 73 deletions

View File

@ -43,5 +43,6 @@ In addition, all channel arguments defined in [this header file](https://github.
- `grpc.use_local_subchannel_pool`
- `grpc.max_send_message_length`
- `grpc.max_receive_message_length`
- `grpc.enable_http_proxy`
- `channelOverride`
- `channelFactoryOverride`

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.0.4",
"version": "1.0.5",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
@ -26,6 +26,7 @@
"@types/semver": "^6.0.1",
"clang-format": "^1.0.55",
"execa": "^2.0.3",
"google-auth-library": "^6.0.0",
"gts": "^2.0.0",
"gulp": "^4.0.2",
"gulp-mocha": "^6.0.0",
@ -56,9 +57,11 @@
"posttest": "npm run check"
},
"dependencies": {
"google-auth-library": "^6.0.0",
"semver": "^6.2.0"
},
"peerDependencies": {
"google-auth-library": "5.x || 6.x"
},
"files": [
"src/*.ts",
"build/src/*.{js,d.ts,js.map}",

View File

@ -289,6 +289,13 @@ export class Http2CallStream implements Call {
);
this.canPush = false;
process.nextTick(() => {
/* If we have already output the status any later messages should be
* ignored, and can cause out-of-order operation errors higher up in the
* stack. Checking as late as possible here to avoid any race conditions.
*/
if (this.statusOutput) {
return;
}
this.listener?.onReceiveMessage(message);
this.maybeOutputStatus();
});

View File

@ -32,6 +32,7 @@ export interface ChannelOptions {
'grpc.use_local_subchannel_pool'?: number;
'grpc.max_send_message_length'?: number;
'grpc.max_receive_message_length'?: number;
'grpc.enable_http_proxy'?: number;
[key: string]: string | number | undefined;
}
@ -53,6 +54,7 @@ export const recognizedOptions = {
'grpc.use_local_subchannel_pool': true,
'grpc.max_send_message_length': true,
'grpc.max_receive_message_length': true,
'grpc.enable_http_proxy': true,
};
export function channelOptionsEqual(

View File

@ -125,6 +125,9 @@ export function mapProxyName(
target: target,
extraOptions: {},
};
if ((options['grpc.enable_http_proxy'] ?? 1) === 0) {
return noProxyResult;
}
const proxyInfo = getProxyInfo();
if (!proxyInfo.address) {
return noProxyResult;

View File

@ -31,6 +31,7 @@ import { ChannelCredentials } from './channel-credentials';
import {
CallOptions,
Client,
ClientOptions,
CallInvocationTransformer,
CallProperties,
UnaryCallback,
@ -69,65 +70,52 @@ if (!semver.satisfies(process.version, supportedNodeVersions)) {
throw new Error(`@grpc/grpc-js only works on Node ${supportedNodeVersions}`);
}
interface IndexedObject {
[key: string]: any; // eslint-disable-line @typescript-eslint/no-explicit-any
[key: number]: any; // eslint-disable-line @typescript-eslint/no-explicit-any
}
function mixin(...sources: IndexedObject[]) {
const result: { [key: string]: Function } = {};
for (const source of sources) {
for (const propName of Object.getOwnPropertyNames(source)) {
const property: any = source[propName]; // eslint-disable-line @typescript-eslint/no-explicit-any
if (typeof property === 'function') {
result[propName] = property;
}
}
}
return result;
}
export { OAuth2Client };
/**** Client Credentials ****/
// Using assign only copies enumerable properties, which is what we want
export const credentials = mixin(
{
/**
* Combine a ChannelCredentials with any number of CallCredentials into a
* single ChannelCredentials object.
* @param channelCredentials The ChannelCredentials object.
* @param callCredentials Any number of CallCredentials objects.
* @return The resulting ChannelCredentials object.
*/
combineChannelCredentials: (
channelCredentials: ChannelCredentials,
...callCredentials: CallCredentials[]
): ChannelCredentials => {
return callCredentials.reduce(
(acc, other) => acc.compose(other),
channelCredentials
);
},
/**
* Combine any number of CallCredentials into a single CallCredentials
* object.
* @param first The first CallCredentials object.
* @param additional Any number of additional CallCredentials objects.
* @return The resulting CallCredentials object.
*/
combineCallCredentials: (
first: CallCredentials,
...additional: CallCredentials[]
): CallCredentials => {
return additional.reduce((acc, other) => acc.compose(other), first);
},
export const credentials = {
/**
* Combine a ChannelCredentials with any number of CallCredentials into a
* single ChannelCredentials object.
* @param channelCredentials The ChannelCredentials object.
* @param callCredentials Any number of CallCredentials objects.
* @return The resulting ChannelCredentials object.
*/
combineChannelCredentials: (
channelCredentials: ChannelCredentials,
...callCredentials: CallCredentials[]
): ChannelCredentials => {
return callCredentials.reduce(
(acc, other) => acc.compose(other),
channelCredentials
);
},
ChannelCredentials,
CallCredentials
);
/**
* Combine any number of CallCredentials into a single CallCredentials
* object.
* @param first The first CallCredentials object.
* @param additional Any number of additional CallCredentials objects.
* @return The resulting CallCredentials object.
*/
combineCallCredentials: (
first: CallCredentials,
...additional: CallCredentials[]
): CallCredentials => {
return additional.reduce((acc, other) => acc.compose(other), first);
},
// from channel-credentials.ts
createInsecure: ChannelCredentials.createInsecure,
createSsl: ChannelCredentials.createSsl,
// from call-credentials.ts
createFromMetadataGenerator: CallCredentials.createFromMetadataGenerator,
createFromGoogleCredential: CallCredentials.createFromGoogleCredential,
createEmpty: CallCredentials.createEmpty,
};
/**** Metadata ****/
@ -146,6 +134,7 @@ export {
export {
Client,
ClientOptions,
loadPackageDefinition,
makeClientConstructor,
makeClientConstructor as makeGenericClientConstructor,

View File

@ -0,0 +1,467 @@
/*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import {
LoadBalancer,
ChannelControlHelper,
getFirstUsableConfig,
registerLoadBalancerType,
} from './load-balancer';
import { SubchannelAddress } from './subchannel';
import {
LoadBalancingConfig,
isPriorityLoadBalancingConfig,
} from './load-balancing-config';
import { ConnectivityState } from './channel';
import { Picker, QueuePicker, UnavailablePicker } from './picker';
import { ChildLoadBalancerHandler } from './load-balancer-child-handler';
import { ChannelOptions } from './channel-options';
import { Status } from './constants';
import { Metadata } from './metadata';
const TYPE_NAME = 'priority';
const DEFAULT_FAILOVER_TIME_MS = 10_000;
const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000;
export type LocalitySubchannelAddress = SubchannelAddress & {
localityPath: string[];
};
export function isLocalitySubchannelAddress(
address: SubchannelAddress
): address is LocalitySubchannelAddress {
return Array.isArray((address as LocalitySubchannelAddress).localityPath);
}
interface PriorityChildBalancer {
updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig,
attributes: { [key: string]: unknown }
): void;
exitIdle(): void;
resetBackoff(): void;
deactivate(): void;
maybeReactivate(): void;
cancelFailoverTimer(): void;
isFailoverTimerPending(): boolean;
getConnectivityState(): ConnectivityState;
getPicker(): Picker;
getName(): string;
destroy(): void;
}
interface UpdateArgs {
subchannelAddress: SubchannelAddress[];
lbConfig: LoadBalancingConfig;
}
export class PriorityLoadBalancer implements LoadBalancer {
/**
* Inner class for holding a child priority and managing associated timers.
*/
private PriorityChildImpl = class implements PriorityChildBalancer {
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
private picker: Picker;
private childBalancer: ChildLoadBalancerHandler;
private failoverTimer: NodeJS.Timer | null = null;
private deactivationTimer: NodeJS.Timer | null = null;
constructor(private parent: PriorityLoadBalancer, private name: string) {
this.childBalancer = new ChildLoadBalancerHandler({
createSubchannel: (
subchannelAddress: SubchannelAddress,
subchannelArgs: ChannelOptions
) => {
return this.parent.channelControlHelper.createSubchannel(
subchannelAddress,
subchannelArgs
);
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.updateState(connectivityState, picker);
},
requestReresolution: () => {
this.parent.channelControlHelper.requestReresolution();
},
});
this.picker = new QueuePicker(this.childBalancer);
}
private updateState(connectivityState: ConnectivityState, picker: Picker) {
this.connectivityState = connectivityState;
this.picker = picker;
this.parent.onChildStateChange(this);
}
private startFailoverTimer() {
if (this.failoverTimer === null) {
this.failoverTimer = setTimeout(() => {
this.failoverTimer = null;
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker()
);
}, DEFAULT_FAILOVER_TIME_MS);
}
}
updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig,
attributes: { [key: string]: unknown }
): void {
this.childBalancer.updateAddressList(addressList, lbConfig, attributes);
this.startFailoverTimer();
}
exitIdle() {
if (this.connectivityState === ConnectivityState.IDLE) {
this.startFailoverTimer();
}
this.childBalancer.exitIdle();
}
resetBackoff() {
this.childBalancer.resetBackoff();
}
deactivate() {
if (this.deactivationTimer === null) {
this.deactivationTimer = setTimeout(() => {
this.parent.deleteChild(this);
this.childBalancer.destroy();
}, DEFAULT_RETENTION_INTERVAL_MS);
}
}
maybeReactivate() {
if (this.deactivationTimer !== null) {
clearTimeout(this.deactivationTimer);
this.deactivationTimer = null;
}
}
cancelFailoverTimer() {
if (this.failoverTimer !== null) {
clearTimeout(this.failoverTimer);
this.failoverTimer = null;
}
}
isFailoverTimerPending() {
return this.failoverTimer !== null;
}
getConnectivityState() {
return this.connectivityState;
}
getPicker() {
return this.picker;
}
getName() {
return this.name;
}
destroy() {
this.childBalancer.destroy();
}
};
// End of inner class PriorityChildImpl
private children: Map<string, PriorityChildBalancer> = new Map<
string,
PriorityChildBalancer
>();
/**
* The priority order of child names from the latest config update.
*/
private priorities: string[] = [];
/**
* The attributes object from the latest update, saved to be passed along to
* each new child as they are created
*/
private latestAttributes: { [key: string]: unknown } = {};
/**
* The latest load balancing policies and address lists for each child from
* the latest update
*/
private latestUpdates: Map<string, UpdateArgs> = new Map<
string,
UpdateArgs
>();
/**
* Current chosen priority that requests are sent to
*/
private currentPriority: number | null = null;
/**
* After an update, this preserves the currently selected child from before
* the update. We continue to use that child until it disconnects, or
* another higher-priority child connects, or it is deleted because it is not
* in the new priority list at all and its retention interval has expired, or
* we try and fail to connect to every child in the new priority list.
*/
private currentChildFromBeforeUpdate: PriorityChildBalancer | null = null;
constructor(private channelControlHelper: ChannelControlHelper) {}
private updateState(state: ConnectivityState, picker: Picker) {
/* If switching to IDLE, use a QueuePicker attached to this load balancer
* so that when the picker calls exitIdle, that in turn calls exitIdle on
* the PriorityChildImpl, which will start the failover timer. */
if (state === ConnectivityState.IDLE) {
picker = new QueuePicker(this);
}
this.channelControlHelper.updateState(state, picker);
}
private onChildStateChange(child: PriorityChildBalancer) {
const childState = child.getConnectivityState();
if (child === this.currentChildFromBeforeUpdate) {
if (
childState === ConnectivityState.READY ||
childState === ConnectivityState.IDLE
) {
this.updateState(childState, child.getPicker());
} else {
this.currentChildFromBeforeUpdate = null;
this.tryNextPriority(true);
}
return;
}
const childPriority = this.priorities.indexOf(child.getName());
if (childPriority < 0) {
// child is not in the priority list, ignore updates
return;
}
if (this.currentPriority !== null && childPriority > this.currentPriority) {
// child is lower priority than the currently selected child, ignore updates
return;
}
if (childState === ConnectivityState.TRANSIENT_FAILURE) {
/* Report connecting if and only if the currently selected child is the
* one entering TRANSIENT_FAILURE */
this.tryNextPriority(childPriority === this.currentPriority);
return;
}
if (this.currentPriority === null || childPriority < this.currentPriority) {
/* In this case, either there is no currently selected child or this
* child is higher priority than the currently selected child, so we want
* to switch to it if it is READY or IDLE. */
if (
childState === ConnectivityState.READY ||
childState === ConnectivityState.IDLE
) {
this.selectPriority(childPriority);
}
return;
}
/* The currently selected child has updated state to something other than
* TRANSIENT_FAILURE, so we pass that update along */
this.updateState(childState, child.getPicker());
}
private deleteChild(child: PriorityChildBalancer) {
if (child === this.currentChildFromBeforeUpdate) {
this.currentChildFromBeforeUpdate = null;
/* If we get to this point, the currentChildFromBeforeUpdate was still in
* use, so we are still trying to connect to the specified priorities */
this.tryNextPriority(true);
}
}
/**
* Select the child at the specified priority, and report that child's state
* as this balancer's state until that child disconnects or a higher-priority
* child connects.
* @param priority
*/
private selectPriority(priority: number) {
this.currentPriority = priority;
const chosenChild = this.children.get(this.priorities[priority])!;
this.updateState(
chosenChild.getConnectivityState(),
chosenChild.getPicker()
);
this.currentChildFromBeforeUpdate = null;
// Deactivate each child of lower priority than the chosen child
for (let i = priority + 1; i < this.priorities.length; i++) {
this.children.get(this.priorities[i])?.deactivate();
}
}
/**
* Check each child in priority order until we find one to use
* @param reportConnecting Whether we should report a CONNECTING state if we
* stop before picking a specific child. This should be true when we have
* not already selected a child.
*/
private tryNextPriority(reportConnecting: boolean) {
for (const [index, childName] of this.priorities.entries()) {
let child = this.children.get(childName);
/* If the child doesn't already exist, create it and update it. */
if (child === undefined) {
if (reportConnecting) {
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
}
child = new this.PriorityChildImpl(this, childName);
this.children.set(childName, child);
const childUpdate = this.latestUpdates.get(childName);
if (childUpdate !== undefined) {
child.updateAddressList(
childUpdate.subchannelAddress,
childUpdate.lbConfig,
this.latestAttributes
);
}
}
/* We're going to try to use this child, so reactivate it if it has been
* deactivated */
child.maybeReactivate();
if (
child.getConnectivityState() === ConnectivityState.READY ||
child.getConnectivityState() === ConnectivityState.IDLE
) {
this.selectPriority(index);
return;
}
if (child.isFailoverTimerPending()) {
/* This child is still trying to connect. Wait until its failover timer
* has ended to continue to the next one */
if (reportConnecting) {
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
}
return;
}
}
this.currentPriority = null;
this.currentChildFromBeforeUpdate = null;
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker({
code: Status.UNAVAILABLE,
details: 'No ready priority',
metadata: new Metadata(),
})
);
}
updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig,
attributes: { [key: string]: unknown }
): void {
if (!isPriorityLoadBalancingConfig(lbConfig)) {
// Reject a config of the wrong type
return;
}
const priorityConfig = lbConfig.priority;
/* For each address, the first element of its localityPath array determines
* which child it belongs to. So we bucket those addresses by that first
* element, and pass along the rest of the localityPath for that child
* to use. */
const childAddressMap: Map<string, LocalitySubchannelAddress[]> = new Map<
string,
LocalitySubchannelAddress[]
>();
for (const address of addressList) {
if (!isLocalitySubchannelAddress(address)) {
// Reject address that cannot be prioritized
return;
}
if (address.localityPath.length < 1) {
// Reject address that cannot be prioritized
return;
}
const childName = address.localityPath[0];
const childAddress: LocalitySubchannelAddress = {
...address,
localityPath: address.localityPath.slice(1),
};
let childAddressList = childAddressMap.get(childName);
if (childAddressList === undefined) {
childAddressList = [];
childAddressMap.set(childName, childAddressList);
}
childAddressList.push(childAddress);
}
if (this.currentPriority !== null) {
this.currentChildFromBeforeUpdate = this.children.get(
this.priorities[this.currentPriority]
)!;
this.currentPriority = null;
}
this.latestAttributes = attributes;
this.latestUpdates.clear();
this.priorities = priorityConfig.priorities;
/* Pair up the new child configs with the corresponding address lists, and
* update all existing children with their new configs */
for (const [childName, childConfig] of priorityConfig.children) {
const chosenChildConfig = getFirstUsableConfig(childConfig.config);
if (chosenChildConfig !== null) {
const childAddresses = childAddressMap.get(childName) ?? [];
this.latestUpdates.set(childName, {
subchannelAddress: childAddresses,
lbConfig: chosenChildConfig,
});
const existingChild = this.children.get(childName);
if (existingChild !== undefined) {
existingChild.updateAddressList(
childAddresses,
chosenChildConfig,
attributes
);
}
}
}
// Deactivate all children that are no longer in the priority list
for (const [childName, child] of this.children) {
if (this.priorities.indexOf(childName) < 0) {
child.deactivate();
}
}
// Only report connecting if there are no existing children
this.tryNextPriority(this.children.size === 0);
}
exitIdle(): void {
if (this.currentPriority !== null) {
this.children.get(this.priorities[this.currentPriority])?.exitIdle();
}
}
resetBackoff(): void {
for (const child of this.children.values()) {
child.resetBackoff();
}
}
destroy(): void {
for (const child of this.children.values()) {
child.destroy();
}
this.children.clear();
this.currentChildFromBeforeUpdate?.destroy();
this.currentChildFromBeforeUpdate = null;
}
getTypeName(): string {
return TYPE_NAME;
}
}
export function setup() {
registerLoadBalancerType(TYPE_NAME, PriorityLoadBalancer);
}

View File

@ -22,6 +22,7 @@ import { Picker } from './picker';
import { LoadBalancingConfig } from './load-balancing-config';
import * as load_balancer_pick_first from './load-balancer-pick-first';
import * as load_balancer_round_robin from './load-balancer-round-robin';
import * as load_balancer_priority from './load-balancer-priority';
import * as load_balancer_weighted_target from './load-balancer-weighted-target';
/**
@ -138,5 +139,6 @@ export function getFirstUsableConfig(
export function registerAll() {
load_balancer_pick_first.setup();
load_balancer_round_robin.setup();
load_balancer_priority.setup();
load_balancer_weighted_target.setup();
}

View File

@ -341,15 +341,13 @@ export type Handler<RequestType, ResponseType> =
export type HandlerType = 'bidi' | 'clientStream' | 'serverStream' | 'unary';
const noopTimer: NodeJS.Timer = setTimeout(() => {}, 0);
// Internal class that wraps the HTTP2 request.
export class Http2ServerCallStream<
RequestType,
ResponseType
> extends EventEmitter {
cancelled = false;
deadline: NodeJS.Timer = noopTimer;
deadline: NodeJS.Timer = setTimeout(() => {}, 0);
private wantTrailers = false;
private metadataSent = false;
private canPush = false;
@ -389,6 +387,9 @@ export class Http2ServerCallStream<
if ('grpc.max_receive_message_length' in options) {
this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
}
// Clear noop timer
clearTimeout(this.deadline);
}
private checkCancelled(): boolean {

View File

@ -231,21 +231,25 @@ export class Subchannel {
maxDelay: options['grpc.max_reconnect_backoff_ms'],
};
this.backoffTimeout = new BackoffTimeout(() => {
if (this.continueConnecting) {
this.transitionToState(
[ConnectivityState.TRANSIENT_FAILURE],
ConnectivityState.CONNECTING
);
} else {
this.transitionToState(
[ConnectivityState.TRANSIENT_FAILURE],
ConnectivityState.IDLE
);
}
this.handleBackoffTimer();
}, backoffOptions);
this.subchannelAddressString = subchannelAddressToString(subchannelAddress);
}
private handleBackoffTimer() {
if (this.continueConnecting) {
this.transitionToState(
[ConnectivityState.TRANSIENT_FAILURE],
ConnectivityState.CONNECTING
);
} else {
this.transitionToState(
[ConnectivityState.TRANSIENT_FAILURE],
ConnectivityState.IDLE
);
}
}
/**
* Start a backoff timer with the current nextBackoff timeout
*/
@ -505,12 +509,16 @@ export class Subchannel {
}
this.session = null;
this.stopKeepalivePings();
/* If the backoff timer has already ended by the time we get to the
* TRANSIENT_FAILURE state, we want to immediately transition out of
* TRANSIENT_FAILURE as though the backoff timer is ending right now */
if (!this.backoffTimeout.isRunning()) {
process.nextTick(() => {
this.handleBackoffTimer();
});
}
break;
case ConnectivityState.IDLE:
/* Stopping the backoff timer here is probably redundant because we
* should only transition to the IDLE state as a result of the timer
* ending, but we still want to reset the backoff timeout. */
this.stopBackoff();
if (this.session) {
this.session.close();
}