mirror of https://github.com/grpc/grpc-node.git
Merge branch 'master' into proto-loader_type_generator
This commit is contained in:
commit
4cc4a17910
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"name": "@grpc/grpc-js",
|
"name": "@grpc/grpc-js",
|
||||||
"version": "1.0.5",
|
"version": "1.1.2",
|
||||||
"description": "gRPC Library for Node - pure JS implementation",
|
"description": "gRPC Library for Node - pure JS implementation",
|
||||||
"homepage": "https://grpc.io/",
|
"homepage": "https://grpc.io/",
|
||||||
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
|
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
|
||||||
|
@ -59,9 +59,6 @@
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"semver": "^6.2.0"
|
"semver": "^6.2.0"
|
||||||
},
|
},
|
||||||
"peerDependencies": {
|
|
||||||
"google-auth-library": "5.x || 6.x"
|
|
||||||
},
|
|
||||||
"files": [
|
"files": [
|
||||||
"src/*.ts",
|
"src/*.ts",
|
||||||
"build/src/*.{js,d.ts,js.map}",
|
"build/src/*.{js,d.ts,js.map}",
|
||||||
|
|
|
@ -20,6 +20,7 @@ import { Channel } from './channel';
|
||||||
import { BaseFilter, Filter, FilterFactory } from './filter';
|
import { BaseFilter, Filter, FilterFactory } from './filter';
|
||||||
import { Metadata } from './metadata';
|
import { Metadata } from './metadata';
|
||||||
import { Status } from './constants';
|
import { Status } from './constants';
|
||||||
|
import { splitHostPort } from './uri-parser';
|
||||||
|
|
||||||
export class CallCredentialsFilter extends BaseFilter implements Filter {
|
export class CallCredentialsFilter extends BaseFilter implements Filter {
|
||||||
private serviceUrl: string;
|
private serviceUrl: string;
|
||||||
|
@ -38,9 +39,10 @@ export class CallCredentialsFilter extends BaseFilter implements Filter {
|
||||||
if (splitPath.length >= 2) {
|
if (splitPath.length >= 2) {
|
||||||
serviceName = splitPath[1];
|
serviceName = splitPath[1];
|
||||||
}
|
}
|
||||||
|
const hostname = splitHostPort(stream.getHost())?.host ?? 'localhost';
|
||||||
/* Currently, call credentials are only allowed on HTTPS connections, so we
|
/* Currently, call credentials are only allowed on HTTPS connections, so we
|
||||||
* can assume that the scheme is "https" */
|
* can assume that the scheme is "https" */
|
||||||
this.serviceUrl = `https://${stream.getHost()}/${serviceName}`;
|
this.serviceUrl = `https://${hostname}/${serviceName}`;
|
||||||
}
|
}
|
||||||
|
|
||||||
async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
|
async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
|
||||||
|
|
|
@ -541,6 +541,15 @@ export class Http2CallStream implements Call {
|
||||||
code = Status.PERMISSION_DENIED;
|
code = Status.PERMISSION_DENIED;
|
||||||
details = 'Protocol not secure enough';
|
details = 'Protocol not secure enough';
|
||||||
break;
|
break;
|
||||||
|
case http2.constants.NGHTTP2_INTERNAL_ERROR:
|
||||||
|
code = Status.INTERNAL;
|
||||||
|
/* This error code was previously handled in the default case, and
|
||||||
|
* there are several instances of it online, so I wanted to
|
||||||
|
* preserve the original error message so that people find existing
|
||||||
|
* information in searches, but also include the more recognizable
|
||||||
|
* "Internal server error" message. */
|
||||||
|
details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`;
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
code = Status.INTERNAL;
|
code = Status.INTERNAL;
|
||||||
details = `Received RST_STREAM with code ${stream.rstCode}`;
|
details = `Received RST_STREAM with code ${stream.rstCode}`;
|
||||||
|
|
|
@ -558,9 +558,7 @@ export class Client {
|
||||||
},
|
},
|
||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
onReceiveMessage(message: any) {
|
onReceiveMessage(message: any) {
|
||||||
if (stream.push(message)) {
|
stream.push(message);
|
||||||
call.startRead();
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
onReceiveStatus(status: StatusObject) {
|
onReceiveStatus(status: StatusObject) {
|
||||||
if (receivedStatus) {
|
if (receivedStatus) {
|
||||||
|
@ -656,9 +654,7 @@ export class Client {
|
||||||
stream.emit('metadata', metadata);
|
stream.emit('metadata', metadata);
|
||||||
},
|
},
|
||||||
onReceiveMessage(message: Buffer) {
|
onReceiveMessage(message: Buffer) {
|
||||||
if (stream.push(message)) {
|
stream.push(message)
|
||||||
call.startRead();
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
onReceiveStatus(status: StatusObject) {
|
onReceiveStatus(status: StatusObject) {
|
||||||
if (receivedStatus) {
|
if (receivedStatus) {
|
||||||
|
|
|
@ -41,6 +41,18 @@ export enum LogVerbosity {
|
||||||
ERROR,
|
ERROR,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* NOTE: This enum is not currently used in any implemented API in this
|
||||||
|
* library. It is included only for type parity with the other implementation.
|
||||||
|
*/
|
||||||
|
export enum Propagate {
|
||||||
|
DEADLINE = 1,
|
||||||
|
CENSUS_STATS_CONTEXT = 2,
|
||||||
|
CENSUS_TRACING_CONTEXT = 4,
|
||||||
|
CANCELLATION = 8,
|
||||||
|
DEFAULTS = 65536,
|
||||||
|
}
|
||||||
|
|
||||||
// -1 means unlimited
|
// -1 means unlimited
|
||||||
export const DEFAULT_MAX_SEND_MESSAGE_LENGTH = -1;
|
export const DEFAULT_MAX_SEND_MESSAGE_LENGTH = -1;
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ import {
|
||||||
CallProperties,
|
CallProperties,
|
||||||
UnaryCallback,
|
UnaryCallback,
|
||||||
} from './client';
|
} from './client';
|
||||||
import { LogVerbosity, Status } from './constants';
|
import { LogVerbosity, Status, Propagate } from './constants';
|
||||||
import * as logging from './logging';
|
import * as logging from './logging';
|
||||||
import {
|
import {
|
||||||
Deserialize,
|
Deserialize,
|
||||||
|
@ -127,6 +127,7 @@ export {
|
||||||
LogVerbosity as logVerbosity,
|
LogVerbosity as logVerbosity,
|
||||||
Status as status,
|
Status as status,
|
||||||
ConnectivityState as connectivityState,
|
ConnectivityState as connectivityState,
|
||||||
|
Propagate as propagate,
|
||||||
// TODO: Other constants as well
|
// TODO: Other constants as well
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,322 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2020 gRPC authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { LoadBalancer, ChannelControlHelper, getFirstUsableConfig, registerLoadBalancerType } from "./load-balancer";
|
||||||
|
import { SubchannelAddress } from "./subchannel";
|
||||||
|
import { LoadBalancingConfig, WeightedTarget, isWeightedTargetLoadBalancingConfig } from "./load-balancing-config";
|
||||||
|
import { Picker, PickResult, PickArgs, QueuePicker, UnavailablePicker } from "./picker";
|
||||||
|
import { ConnectivityState } from "./channel";
|
||||||
|
import { ChildLoadBalancerHandler } from "./load-balancer-child-handler";
|
||||||
|
import { Status } from "./constants";
|
||||||
|
import { Metadata } from "./metadata";
|
||||||
|
import { isLocalitySubchannelAddress, LocalitySubchannelAddress } from "./load-balancer-priority";
|
||||||
|
|
||||||
|
const TYPE_NAME = 'weighted_target';
|
||||||
|
|
||||||
|
const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents a picker and a subinterval of a larger interval used for randomly
|
||||||
|
* selecting an element of a list of these objects.
|
||||||
|
*/
|
||||||
|
interface WeightedPicker {
|
||||||
|
picker: Picker;
|
||||||
|
/**
|
||||||
|
* The exclusive end of the interval associated with this element. The start
|
||||||
|
* of the interval is implicitly the rangeEnd of the previous element in the
|
||||||
|
* list, or 0 for the first element in the list.
|
||||||
|
*/
|
||||||
|
rangeEnd: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
class WeightedTargetPicker implements Picker {
|
||||||
|
private rangeTotal: number;
|
||||||
|
constructor(private readonly pickerList: WeightedPicker[]) {
|
||||||
|
this.rangeTotal = pickerList[pickerList.length - 1].rangeEnd;
|
||||||
|
}
|
||||||
|
pick(pickArgs: PickArgs): PickResult {
|
||||||
|
// num | 0 is equivalent to floor(num)
|
||||||
|
const selection = (Math.random() * this.rangeTotal) | 0;
|
||||||
|
|
||||||
|
/* Binary search for the element of the list such that
|
||||||
|
* pickerList[index - 1].rangeEnd <= selection < pickerList[index].rangeEnd
|
||||||
|
*/
|
||||||
|
let mid = 0;
|
||||||
|
let startIndex = 0;
|
||||||
|
let endIndex = this.pickerList.length - 1;
|
||||||
|
let index = 0;
|
||||||
|
while (endIndex > startIndex) {
|
||||||
|
mid = ((startIndex + endIndex) / 2) | 0;
|
||||||
|
if (this.pickerList[mid].rangeEnd > selection) {
|
||||||
|
endIndex = mid;
|
||||||
|
} else if (this.pickerList[mid].rangeEnd < selection) {
|
||||||
|
startIndex = mid + 1;
|
||||||
|
} else {
|
||||||
|
// + 1 here because the range is exclusive at the top end
|
||||||
|
index = mid + 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (index === 0) {
|
||||||
|
index = startIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
return this.pickerList[index].picker.pick(pickArgs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interface WeightedChild {
|
||||||
|
updateAddressList(addressList: SubchannelAddress[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void;
|
||||||
|
exitIdle(): void;
|
||||||
|
resetBackoff(): void;
|
||||||
|
destroy(): void;
|
||||||
|
deactivate(): void;
|
||||||
|
maybeReactivate(): void;
|
||||||
|
getConnectivityState(): ConnectivityState;
|
||||||
|
getPicker(): Picker;
|
||||||
|
getWeight(): number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class WeightedTargetLoadBalancer implements LoadBalancer {
|
||||||
|
private WeightedChildImpl = class implements WeightedChild {
|
||||||
|
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
|
||||||
|
private picker: Picker;
|
||||||
|
private childBalancer: ChildLoadBalancerHandler;
|
||||||
|
private deactivationTimer: NodeJS.Timer | null = null;
|
||||||
|
private weight: number = 0;
|
||||||
|
|
||||||
|
constructor(private parent: WeightedTargetLoadBalancer, private name: string) {
|
||||||
|
this.childBalancer = new ChildLoadBalancerHandler({
|
||||||
|
createSubchannel: (subchannelAddress, subchannelOptions) => {
|
||||||
|
return this.parent.channelControlHelper.createSubchannel(subchannelAddress, subchannelOptions);
|
||||||
|
},
|
||||||
|
updateState: (connectivityState, picker) => {
|
||||||
|
this.updateState(connectivityState, picker);
|
||||||
|
},
|
||||||
|
requestReresolution: () => {
|
||||||
|
this.parent.channelControlHelper.requestReresolution();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
this.picker = new QueuePicker(this.childBalancer);
|
||||||
|
}
|
||||||
|
|
||||||
|
private updateState(connectivityState: ConnectivityState, picker: Picker) {
|
||||||
|
this.connectivityState = connectivityState;
|
||||||
|
this.picker = picker;
|
||||||
|
this.parent.updateState();
|
||||||
|
}
|
||||||
|
|
||||||
|
updateAddressList(addressList: SubchannelAddress[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void {
|
||||||
|
this.weight = lbConfig.weight;
|
||||||
|
const childConfig = getFirstUsableConfig(lbConfig.child_policy);
|
||||||
|
if (childConfig !== null) {
|
||||||
|
this.childBalancer.updateAddressList(addressList, childConfig, attributes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
exitIdle(): void {
|
||||||
|
this.childBalancer.exitIdle();
|
||||||
|
}
|
||||||
|
resetBackoff(): void {
|
||||||
|
this.childBalancer.resetBackoff();
|
||||||
|
}
|
||||||
|
destroy(): void {
|
||||||
|
this.childBalancer.destroy();
|
||||||
|
if (this.deactivationTimer !== null) {
|
||||||
|
clearTimeout(this.deactivationTimer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
deactivate(): void {
|
||||||
|
if (this.deactivationTimer === null) {
|
||||||
|
this.deactivationTimer = setTimeout(() => {
|
||||||
|
this.parent.targets.delete(this.name);
|
||||||
|
this.deactivationTimer = null;
|
||||||
|
}, DEFAULT_RETENTION_INTERVAL_MS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
maybeReactivate(): void {
|
||||||
|
if (this.deactivationTimer !== null) {
|
||||||
|
clearTimeout(this.deactivationTimer);
|
||||||
|
this.deactivationTimer = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
getConnectivityState(): ConnectivityState {
|
||||||
|
return this.connectivityState;
|
||||||
|
}
|
||||||
|
getPicker(): Picker {
|
||||||
|
return this.picker;
|
||||||
|
}
|
||||||
|
getWeight(): number {
|
||||||
|
return this.weight;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// end of WeightedChildImpl
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Map of target names to target children. Includes current targets and
|
||||||
|
* previous targets with deactivation timers that have not yet triggered.
|
||||||
|
*/
|
||||||
|
private targets: Map<string, WeightedChild> = new Map<string, WeightedChild>();
|
||||||
|
/**
|
||||||
|
* List of current target names.
|
||||||
|
*/
|
||||||
|
private targetList: string[] = [];
|
||||||
|
|
||||||
|
constructor(private channelControlHelper: ChannelControlHelper) {}
|
||||||
|
|
||||||
|
private updateState() {
|
||||||
|
const pickerList: WeightedPicker[] = [];
|
||||||
|
let end = 0;
|
||||||
|
|
||||||
|
let connectingCount = 0;
|
||||||
|
let idleCount = 0;
|
||||||
|
let transientFailureCount = 0;
|
||||||
|
for (const targetName of this.targetList) {
|
||||||
|
const target = this.targets.get(targetName);
|
||||||
|
if (target === undefined) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
switch (target.getConnectivityState()) {
|
||||||
|
case ConnectivityState.READY:
|
||||||
|
end += target.getWeight();
|
||||||
|
pickerList.push({
|
||||||
|
picker: target.getPicker(),
|
||||||
|
rangeEnd: end
|
||||||
|
});
|
||||||
|
break;
|
||||||
|
case ConnectivityState.CONNECTING:
|
||||||
|
connectingCount += 1;
|
||||||
|
break;
|
||||||
|
case ConnectivityState.IDLE:
|
||||||
|
idleCount += 1;
|
||||||
|
break;
|
||||||
|
case ConnectivityState.TRANSIENT_FAILURE:
|
||||||
|
transientFailureCount += 1;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
// Ignore the other possiblity, SHUTDOWN
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let connectivityState: ConnectivityState;
|
||||||
|
if (pickerList.length > 0) {
|
||||||
|
connectivityState = ConnectivityState.READY;
|
||||||
|
} else if (connectingCount > 0) {
|
||||||
|
connectivityState = ConnectivityState.CONNECTING;
|
||||||
|
} else if (idleCount > 0) {
|
||||||
|
connectivityState = ConnectivityState.IDLE;
|
||||||
|
} else {
|
||||||
|
connectivityState = ConnectivityState.TRANSIENT_FAILURE;
|
||||||
|
}
|
||||||
|
|
||||||
|
let picker: Picker;
|
||||||
|
switch (connectivityState) {
|
||||||
|
case ConnectivityState.READY:
|
||||||
|
picker = new WeightedTargetPicker(pickerList);
|
||||||
|
break;
|
||||||
|
case ConnectivityState.CONNECTING:
|
||||||
|
case ConnectivityState.READY:
|
||||||
|
picker = new QueuePicker(this);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
picker = new UnavailablePicker({
|
||||||
|
code: Status.UNAVAILABLE,
|
||||||
|
details: 'weighted_target: all children report state TRANSIENT_FAILURE',
|
||||||
|
metadata: new Metadata()
|
||||||
|
});
|
||||||
|
}
|
||||||
|
this.channelControlHelper.updateState(connectivityState, picker);
|
||||||
|
}
|
||||||
|
|
||||||
|
updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||||
|
if (!isWeightedTargetLoadBalancingConfig(lbConfig)) {
|
||||||
|
// Reject a config of the wrong type
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* For each address, the first element of its localityPath array determines
|
||||||
|
* which child it belongs to. So we bucket those addresses by that first
|
||||||
|
* element, and pass along the rest of the localityPath for that child
|
||||||
|
* to use. */
|
||||||
|
const childAddressMap = new Map<string, SubchannelAddress[]>();
|
||||||
|
for (const address of addressList) {
|
||||||
|
if (!isLocalitySubchannelAddress(address)) {
|
||||||
|
// Reject address that cannot be associated with targets
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (address.localityPath.length < 1) {
|
||||||
|
// Reject address that cannot be associated with targets
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const childName = address.localityPath[0];
|
||||||
|
const childAddress: LocalitySubchannelAddress = {
|
||||||
|
...address,
|
||||||
|
localityPath: address.localityPath.slice(1),
|
||||||
|
};
|
||||||
|
let childAddressList = childAddressMap.get(childName);
|
||||||
|
if (childAddressList === undefined) {
|
||||||
|
childAddressList = [];
|
||||||
|
childAddressMap.set(childName, childAddressList);
|
||||||
|
}
|
||||||
|
childAddressList.push(childAddress);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.targetList = Array.from(lbConfig.weighted_target.targets.keys());
|
||||||
|
for (const [targetName, targetConfig] of lbConfig.weighted_target.targets) {
|
||||||
|
let target = this.targets.get(targetName);
|
||||||
|
if (target === undefined) {
|
||||||
|
target = new this.WeightedChildImpl(this, targetName);
|
||||||
|
this.targets.set(targetName, target);
|
||||||
|
} else {
|
||||||
|
target.maybeReactivate();
|
||||||
|
}
|
||||||
|
target.updateAddressList(childAddressMap.get(targetName) ?? [], targetConfig, attributes);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deactivate targets that are not in the new config
|
||||||
|
for (const [targetName, target] of this.targets) {
|
||||||
|
if (this.targetList.indexOf(targetName) < 0) {
|
||||||
|
target.deactivate();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.updateState();
|
||||||
|
}
|
||||||
|
exitIdle(): void {
|
||||||
|
for (const targetName of this.targetList) {
|
||||||
|
this.targets.get(targetName)?.exitIdle();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resetBackoff(): void {
|
||||||
|
for (const targetName of this.targetList) {
|
||||||
|
this.targets.get(targetName)?.resetBackoff();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
destroy(): void {
|
||||||
|
for (const target of this.targets.values()) {
|
||||||
|
target.destroy();
|
||||||
|
}
|
||||||
|
this.targets.clear();
|
||||||
|
}
|
||||||
|
getTypeName(): string {
|
||||||
|
return TYPE_NAME;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function setup() {
|
||||||
|
registerLoadBalancerType(TYPE_NAME, WeightedTargetLoadBalancer);
|
||||||
|
}
|
|
@ -23,6 +23,7 @@ import { LoadBalancingConfig } from './load-balancing-config';
|
||||||
import * as load_balancer_pick_first from './load-balancer-pick-first';
|
import * as load_balancer_pick_first from './load-balancer-pick-first';
|
||||||
import * as load_balancer_round_robin from './load-balancer-round-robin';
|
import * as load_balancer_round_robin from './load-balancer-round-robin';
|
||||||
import * as load_balancer_priority from './load-balancer-priority';
|
import * as load_balancer_priority from './load-balancer-priority';
|
||||||
|
import * as load_balancer_weighted_target from './load-balancer-weighted-target';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A collection of functions associated with a channel that a load balancer
|
* A collection of functions associated with a channel that a load balancer
|
||||||
|
@ -139,4 +140,5 @@ export function registerAll() {
|
||||||
load_balancer_pick_first.setup();
|
load_balancer_pick_first.setup();
|
||||||
load_balancer_round_robin.setup();
|
load_balancer_round_robin.setup();
|
||||||
load_balancer_priority.setup();
|
load_balancer_priority.setup();
|
||||||
|
load_balancer_weighted_target.setup();
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,6 +48,15 @@ export interface PriorityLbConfig {
|
||||||
priorities: string[];
|
priorities: string[];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface WeightedTarget {
|
||||||
|
weight: number;
|
||||||
|
child_policy: LoadBalancingConfig[];
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface WeightedTargetLbConfig {
|
||||||
|
targets: Map<string, WeightedTarget>;
|
||||||
|
}
|
||||||
|
|
||||||
export interface PickFirstLoadBalancingConfig {
|
export interface PickFirstLoadBalancingConfig {
|
||||||
name: 'pick_first';
|
name: 'pick_first';
|
||||||
pick_first: PickFirstConfig;
|
pick_first: PickFirstConfig;
|
||||||
|
@ -73,12 +82,18 @@ export interface PriorityLoadBalancingConfig {
|
||||||
priority: PriorityLbConfig;
|
priority: PriorityLbConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface WeightedTargetLoadBalancingConfig {
|
||||||
|
name: 'weighted_target';
|
||||||
|
weighted_target: WeightedTargetLbConfig;
|
||||||
|
}
|
||||||
|
|
||||||
export type LoadBalancingConfig =
|
export type LoadBalancingConfig =
|
||||||
| PickFirstLoadBalancingConfig
|
| PickFirstLoadBalancingConfig
|
||||||
| RoundRobinLoadBalancingConfig
|
| RoundRobinLoadBalancingConfig
|
||||||
| XdsLoadBalancingConfig
|
| XdsLoadBalancingConfig
|
||||||
| GrpcLbLoadBalancingConfig
|
| GrpcLbLoadBalancingConfig
|
||||||
| PriorityLoadBalancingConfig;
|
| PriorityLoadBalancingConfig
|
||||||
|
| WeightedTargetLoadBalancingConfig;
|
||||||
|
|
||||||
export function isRoundRobinLoadBalancingConfig(
|
export function isRoundRobinLoadBalancingConfig(
|
||||||
lbconfig: LoadBalancingConfig
|
lbconfig: LoadBalancingConfig
|
||||||
|
@ -104,6 +119,12 @@ export function isPriorityLoadBalancingConfig(
|
||||||
return lbconfig.name === 'priority';
|
return lbconfig.name === 'priority';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function isWeightedTargetLoadBalancingConfig(
|
||||||
|
lbconfig: LoadBalancingConfig
|
||||||
|
): lbconfig is WeightedTargetLoadBalancingConfig {
|
||||||
|
return lbconfig.name === 'weighted_target';
|
||||||
|
}
|
||||||
|
|
||||||
/* In these functions we assume the input came from a JSON object. Therefore we
|
/* In these functions we assume the input came from a JSON object. Therefore we
|
||||||
* expect that the prototype is uninteresting and that `in` can be used
|
* expect that the prototype is uninteresting and that `in` can be used
|
||||||
* effectively */
|
* effectively */
|
||||||
|
|
|
@ -269,12 +269,7 @@ class DnsResolver implements Resolver {
|
||||||
* @param target
|
* @param target
|
||||||
*/
|
*/
|
||||||
static getDefaultAuthority(target: GrpcUri): string {
|
static getDefaultAuthority(target: GrpcUri): string {
|
||||||
const hostPort = splitHostPort(target.path);
|
return target.path;
|
||||||
if (hostPort !== null) {
|
|
||||||
return hostPort.host;
|
|
||||||
} else {
|
|
||||||
throw new Error(`Failed to parse target ${uriToString(target)}`);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -500,10 +500,11 @@ export class Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If any sessions are active, close them gracefully.
|
|
||||||
pendingChecks += this.sessions.size;
|
|
||||||
this.sessions.forEach((session) => {
|
this.sessions.forEach((session) => {
|
||||||
|
if (!session.closed) {
|
||||||
|
pendingChecks += 1;
|
||||||
session.close(maybeCallback);
|
session.close(maybeCallback);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
if (pendingChecks === 0) {
|
if (pendingChecks === 0) {
|
||||||
callback();
|
callback();
|
||||||
|
@ -608,6 +609,10 @@ export class Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
this.sessions.add(session);
|
this.sessions.add(session);
|
||||||
|
|
||||||
|
session.on('close', () => {
|
||||||
|
this.sessions.delete(session);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,7 @@ import * as logging from './logging';
|
||||||
import { LogVerbosity } from './constants';
|
import { LogVerbosity } from './constants';
|
||||||
import { getProxiedConnection, ProxyConnectionResult } from './http_proxy';
|
import { getProxiedConnection, ProxyConnectionResult } from './http_proxy';
|
||||||
import * as net from 'net';
|
import * as net from 'net';
|
||||||
import { GrpcUri } from './uri-parser';
|
import { GrpcUri, parseUri, splitHostPort } from './uri-parser';
|
||||||
import { ConnectionOptions } from 'tls';
|
import { ConnectionOptions } from 'tls';
|
||||||
import { FilterFactory, Filter } from './filter';
|
import { FilterFactory, Filter } from './filter';
|
||||||
|
|
||||||
|
@ -286,6 +286,9 @@ export class Subchannel {
|
||||||
}
|
}
|
||||||
|
|
||||||
private createSession(proxyConnectionResult: ProxyConnectionResult) {
|
private createSession(proxyConnectionResult: ProxyConnectionResult) {
|
||||||
|
const targetAuthority = getDefaultAuthority(
|
||||||
|
proxyConnectionResult.realTarget ?? this.channelTarget
|
||||||
|
);
|
||||||
let connectionOptions: http2.SecureClientSessionOptions =
|
let connectionOptions: http2.SecureClientSessionOptions =
|
||||||
this.credentials._getConnectionOptions() || {};
|
this.credentials._getConnectionOptions() || {};
|
||||||
let addressScheme = 'http://';
|
let addressScheme = 'http://';
|
||||||
|
@ -305,8 +308,18 @@ export class Subchannel {
|
||||||
return checkServerIdentity(sslTargetNameOverride, cert);
|
return checkServerIdentity(sslTargetNameOverride, cert);
|
||||||
};
|
};
|
||||||
connectionOptions.servername = sslTargetNameOverride;
|
connectionOptions.servername = sslTargetNameOverride;
|
||||||
|
} else {
|
||||||
|
const authorityHostname =
|
||||||
|
splitHostPort(targetAuthority)?.host ?? 'localhost';
|
||||||
|
// We want to always set servername to support SNI
|
||||||
|
connectionOptions.servername = authorityHostname;
|
||||||
}
|
}
|
||||||
if (proxyConnectionResult.socket) {
|
if (proxyConnectionResult.socket) {
|
||||||
|
/* This is part of the workaround for
|
||||||
|
* https://github.com/nodejs/node/issues/32922. Without that bug,
|
||||||
|
* proxyConnectionResult.socket would always be a plaintext socket and
|
||||||
|
* this would say
|
||||||
|
* connectionOptions.socket = proxyConnectionResult.socket; */
|
||||||
connectionOptions.createConnection = (authority, option) => {
|
connectionOptions.createConnection = (authority, option) => {
|
||||||
return proxyConnectionResult.socket!;
|
return proxyConnectionResult.socket!;
|
||||||
};
|
};
|
||||||
|
@ -350,10 +363,7 @@ export class Subchannel {
|
||||||
* determines whether the connection will be established over TLS or not.
|
* determines whether the connection will be established over TLS or not.
|
||||||
*/
|
*/
|
||||||
const session = http2.connect(
|
const session = http2.connect(
|
||||||
addressScheme +
|
addressScheme + targetAuthority,
|
||||||
getDefaultAuthority(
|
|
||||||
proxyConnectionResult.realTarget ?? this.channelTarget
|
|
||||||
),
|
|
||||||
connectionOptions
|
connectionOptions
|
||||||
);
|
);
|
||||||
this.session = session;
|
this.session = session;
|
||||||
|
@ -404,6 +414,11 @@ export class Subchannel {
|
||||||
KEEPALIVE_MAX_TIME_MS
|
KEEPALIVE_MAX_TIME_MS
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
trace(
|
||||||
|
this.subchannelAddress +
|
||||||
|
' connection closed by GOAWAY with code ' +
|
||||||
|
errorCode
|
||||||
|
);
|
||||||
this.transitionToState(
|
this.transitionToState(
|
||||||
[ConnectivityState.CONNECTING, ConnectivityState.READY],
|
[ConnectivityState.CONNECTING, ConnectivityState.READY],
|
||||||
ConnectivityState.IDLE
|
ConnectivityState.IDLE
|
||||||
|
@ -446,6 +461,18 @@ export class Subchannel {
|
||||||
return checkServerIdentity(sslTargetNameOverride, cert);
|
return checkServerIdentity(sslTargetNameOverride, cert);
|
||||||
};
|
};
|
||||||
connectionOptions.servername = sslTargetNameOverride;
|
connectionOptions.servername = sslTargetNameOverride;
|
||||||
|
} else {
|
||||||
|
if ('grpc.http_connect_target' in this.options) {
|
||||||
|
/* This is more or less how servername will be set in createSession
|
||||||
|
* if a connection is successfully established through the proxy.
|
||||||
|
* If the proxy is not used, these connectionOptions are discarded
|
||||||
|
* anyway */
|
||||||
|
connectionOptions.servername = getDefaultAuthority(
|
||||||
|
parseUri(this.options['grpc.http_connect_target'] as string) ?? {
|
||||||
|
path: 'localhost',
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -640,7 +667,24 @@ export class Subchannel {
|
||||||
headers[HTTP2_HEADER_METHOD] = 'POST';
|
headers[HTTP2_HEADER_METHOD] = 'POST';
|
||||||
headers[HTTP2_HEADER_PATH] = callStream.getMethod();
|
headers[HTTP2_HEADER_PATH] = callStream.getMethod();
|
||||||
headers[HTTP2_HEADER_TE] = 'trailers';
|
headers[HTTP2_HEADER_TE] = 'trailers';
|
||||||
const http2Stream = this.session!.request(headers);
|
let http2Stream: http2.ClientHttp2Stream;
|
||||||
|
/* In theory, if an error is thrown by session.request because session has
|
||||||
|
* become unusable (e.g. because it has received a goaway), this subchannel
|
||||||
|
* should soon see the corresponding close or goaway event anyway and leave
|
||||||
|
* READY. But we have seen reports that this does not happen
|
||||||
|
* (https://github.com/googleapis/nodejs-firestore/issues/1023#issuecomment-653204096)
|
||||||
|
* so for defense in depth, we just discard the session when we see an
|
||||||
|
* error here.
|
||||||
|
*/
|
||||||
|
try {
|
||||||
|
http2Stream = this.session!.request(headers);
|
||||||
|
} catch (e) {
|
||||||
|
this.transitionToState(
|
||||||
|
[ConnectivityState.READY],
|
||||||
|
ConnectivityState.TRANSIENT_FAILURE
|
||||||
|
);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
let headersString = '';
|
let headersString = '';
|
||||||
for (const header of Object.keys(headers)) {
|
for (const header of Object.keys(headers)) {
|
||||||
headersString += '\t\t' + header + ': ' + headers[header] + '\n';
|
headersString += '\t\t' + header + ': ' + headers[header] + '\n';
|
||||||
|
|
|
@ -24,7 +24,7 @@
|
||||||
"prepublishOnly": "git submodule update --init --recursive && node copy_well_known_protos.js"
|
"prepublishOnly": "git submodule update --init --recursive && node copy_well_known_protos.js"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"node-pre-gyp": "^0.12.0"
|
"node-pre-gyp": "^0.15.0"
|
||||||
},
|
},
|
||||||
"binary": {
|
"binary": {
|
||||||
"module_name": "grpc_tools",
|
"module_name": "grpc_tools",
|
||||||
|
|
|
@ -406,23 +406,29 @@ export function loadSync(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load Google's well-known proto files that aren't exposed by Protobuf.js.
|
// Load Google's well-known proto files that aren't exposed by Protobuf.js.
|
||||||
{
|
|
||||||
// Protobuf.js exposes: any, duration, empty, field_mask, struct, timestamp,
|
// Protobuf.js exposes: any, duration, empty, field_mask, struct, timestamp,
|
||||||
// and wrappers. compiler/plugin is excluded in Protobuf.js and here.
|
// and wrappers. compiler/plugin is excluded in Protobuf.js and here.
|
||||||
const wellKnownProtos = ['api', 'descriptor', 'source_context', 'type'];
|
|
||||||
const sourceDir = path.join(
|
|
||||||
path.dirname(require.resolve('protobufjs')),
|
|
||||||
'google',
|
|
||||||
'protobuf'
|
|
||||||
);
|
|
||||||
|
|
||||||
for (const proto of wellKnownProtos) {
|
// Using constant strings for compatibility with tools like Webpack
|
||||||
const file = path.join(sourceDir, `${proto}.proto`);
|
const apiDescriptor = require('protobufjs/google/protobuf/api.json');
|
||||||
const descriptor = Protobuf.loadSync(file).toJSON();
|
const descriptorDescriptor = require('protobufjs/google/protobuf/descriptor.json');
|
||||||
|
const sourceContextDescriptor = require('protobufjs/google/protobuf/source_context.json');
|
||||||
|
const typeDescriptor = require('protobufjs/google/protobuf/type.json');
|
||||||
|
|
||||||
Protobuf.common(
|
Protobuf.common(
|
||||||
proto,
|
'api',
|
||||||
(descriptor.nested!.google as Protobuf.INamespace).nested!
|
apiDescriptor.nested.google.nested.protobuf.nested
|
||||||
|
);
|
||||||
|
Protobuf.common(
|
||||||
|
'descriptor',
|
||||||
|
descriptorDescriptor.nested.google.nested.protobuf.nested
|
||||||
|
);
|
||||||
|
Protobuf.common(
|
||||||
|
'source_context',
|
||||||
|
sourceContextDescriptor.nested.google.nested.protobuf.nested
|
||||||
|
);
|
||||||
|
Protobuf.common(
|
||||||
|
'type',
|
||||||
|
typeDescriptor.nested.google.nested.protobuf.nested
|
||||||
);
|
);
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -19,3 +19,7 @@ import "google/protobuf/descriptor.proto";
|
||||||
extend google.protobuf.FieldOptions {
|
extend google.protobuf.FieldOptions {
|
||||||
bool redact = 52000;
|
bool redact = 52000;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message DescriptorHolder {
|
||||||
|
google.protobuf.DescriptorProto descriptor = 1;
|
||||||
|
}
|
||||||
|
|
|
@ -144,6 +144,30 @@ describe(`${anyGrpc.clientName} client -> ${anyGrpc.serverName} server`, functio
|
||||||
done();
|
done();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
it('should receive all messages in a long stream', function(done) {
|
||||||
|
this.timeout(20000);
|
||||||
|
var arg = {
|
||||||
|
response_type: 'COMPRESSABLE',
|
||||||
|
response_parameters: [
|
||||||
|
]
|
||||||
|
};
|
||||||
|
for (let i = 0; i < 100000; i++) {
|
||||||
|
arg.response_parameters.push({size: 0});
|
||||||
|
}
|
||||||
|
var call = client.streamingOutputCall(arg);
|
||||||
|
let responseCount = 0;
|
||||||
|
call.on('data', (value) => {
|
||||||
|
responseCount++;
|
||||||
|
});
|
||||||
|
call.on('status', (status) => {
|
||||||
|
assert.strictEqual(status.code, grpc.status.OK);
|
||||||
|
assert.strictEqual(responseCount, arg.response_parameters.length);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
call.on('error', (error) => {
|
||||||
|
assert.ifError(error);
|
||||||
|
});
|
||||||
|
});
|
||||||
describe('max message size', function() {
|
describe('max message size', function() {
|
||||||
// A size that is larger than the default limit
|
// A size that is larger than the default limit
|
||||||
const largeMessageSize = 8 * 1024 * 1024;
|
const largeMessageSize = 8 * 1024 * 1024;
|
||||||
|
|
|
@ -39,9 +39,13 @@ AsyncDelayQueue.prototype.runNext = function() {
|
||||||
var continueCallback = _.bind(this.runNext, this);
|
var continueCallback = _.bind(this.runNext, this);
|
||||||
if (next) {
|
if (next) {
|
||||||
this.callback_pending = true;
|
this.callback_pending = true;
|
||||||
|
if (next.delay === 0) {
|
||||||
|
next.callback(continueCallback);
|
||||||
|
} else {
|
||||||
setTimeout(function() {
|
setTimeout(function() {
|
||||||
next.callback(continueCallback);
|
next.callback(continueCallback);
|
||||||
}, next.delay);
|
}, next.delay);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
this.callback_pending = false;
|
this.callback_pending = false;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue