mirror of https://github.com/grpc/grpc-node.git
Merge remote-tracking branch 'upstream/@grpc/grpc-js@1.6.x' into v1.6.x_upmerge
This commit is contained in:
commit
4aa8ecf414
|
@ -726,7 +726,7 @@ export class XdsClient {
|
|||
if (serviceKind) {
|
||||
this.adsState[serviceKind].reportStreamError({
|
||||
code: status.UNAVAILABLE,
|
||||
details: message,
|
||||
details: message + ' Node ID=' + this.adsNodeV3!.id,
|
||||
metadata: new Metadata()
|
||||
});
|
||||
resourceNames = this.adsState[serviceKind].getResourceNames();
|
||||
|
@ -771,6 +771,7 @@ export class XdsClient {
|
|||
}
|
||||
|
||||
private reportStreamError(status: StatusObject) {
|
||||
status = {...status, details: status.details + ' Node ID=' + this.adsNodeV3!.id};
|
||||
this.adsState.eds.reportStreamError(status);
|
||||
this.adsState.cds.reportStreamError(status);
|
||||
this.adsState.rds.reportStreamError(status);
|
||||
|
@ -781,7 +782,6 @@ export class XdsClient {
|
|||
trace('Received LRS response');
|
||||
/* Once we get any response from the server, we assume that the stream is
|
||||
* in a good state, so we can reset the backoff timer. */
|
||||
this.lrsBackoff.stop();
|
||||
this.lrsBackoff.reset();
|
||||
if (
|
||||
!this.receivedLrsSettingsForCurrentStream ||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
|
||||
import { isIPv4, isIPv6 } from "net";
|
||||
import { Locality__Output } from "../generated/envoy/config/core/v3/Locality";
|
||||
import { ClusterLoadAssignment__Output } from "../generated/envoy/config/endpoint/v3/ClusterLoadAssignment";
|
||||
import { Any__Output } from "../generated/google/protobuf/Any";
|
||||
import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state";
|
||||
|
@ -27,6 +28,10 @@ function trace(text: string): void {
|
|||
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
|
||||
}
|
||||
|
||||
function localitiesEqual(a: Locality__Output, b: Locality__Output) {
|
||||
return a.region === b.region && a.sub_zone === b.sub_zone && a.zone === b.zone;
|
||||
}
|
||||
|
||||
export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
|
||||
public versionInfo = '';
|
||||
public nonce = '';
|
||||
|
@ -112,7 +117,17 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
|
|||
* @param message
|
||||
*/
|
||||
private validateResponse(message: ClusterLoadAssignment__Output) {
|
||||
const seenLocalities: {locality: Locality__Output, priority: number}[] = [];
|
||||
for (const endpoint of message.endpoints) {
|
||||
if (!endpoint.locality) {
|
||||
return false;
|
||||
}
|
||||
for (const {locality, priority} of seenLocalities) {
|
||||
if (localitiesEqual(endpoint.locality, locality) && endpoint.priority === priority) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
seenLocalities.push({locality: endpoint.locality, priority: endpoint.priority});
|
||||
for (const lb of endpoint.lb_endpoints) {
|
||||
const socketAddress = lb.endpoint?.address?.socket_address;
|
||||
if (!socketAddress) {
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "@grpc/grpc-js",
|
||||
"version": "1.6.2",
|
||||
"version": "1.6.7",
|
||||
"description": "gRPC Library for Node - pure JS implementation",
|
||||
"homepage": "https://grpc.io/",
|
||||
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
|
||||
|
|
|
@ -100,6 +100,7 @@ export class BackoffTimeout {
|
|||
}
|
||||
|
||||
private runTimer(delay: number) {
|
||||
clearTimeout(this.timerId);
|
||||
this.timerId = setTimeout(() => {
|
||||
this.callback();
|
||||
this.running = false;
|
||||
|
|
|
@ -76,9 +76,11 @@ export type ClientDuplexStream<
|
|||
* error is not necessarily a problem in gRPC itself.
|
||||
* @param status
|
||||
*/
|
||||
export function callErrorFromStatus(status: StatusObject): ServiceError {
|
||||
export function callErrorFromStatus(status: StatusObject, callerStack: string): ServiceError {
|
||||
const message = `${status.code} ${Status[status.code]}: ${status.details}`;
|
||||
return Object.assign(new Error(message), status);
|
||||
const error = new Error(message);
|
||||
const stack = `${error.stack}\nfor call at\n${callerStack}`;
|
||||
return Object.assign(new Error(message), status, {stack});
|
||||
}
|
||||
|
||||
export class ClientUnaryCallImpl
|
||||
|
|
|
@ -20,6 +20,7 @@ import {
|
|||
Call,
|
||||
Http2CallStream,
|
||||
CallStreamOptions,
|
||||
StatusObject,
|
||||
} from './call-stream';
|
||||
import { ChannelCredentials } from './channel-credentials';
|
||||
import { ChannelOptions } from './channel-options';
|
||||
|
@ -170,6 +171,14 @@ export class ChannelImplementation implements Channel {
|
|||
*/
|
||||
private callRefTimer: NodeJS.Timer;
|
||||
private configSelector: ConfigSelector | null = null;
|
||||
/**
|
||||
* This is the error from the name resolver if it failed most recently. It
|
||||
* is only used to end calls that start while there is no config selector
|
||||
* and the name resolver is in backoff, so it should be nulled if
|
||||
* configSelector becomes set or the channel state becomes anything other
|
||||
* than TRANSIENT_FAILURE.
|
||||
*/
|
||||
private currentResolutionError: StatusObject | null = null;
|
||||
|
||||
// Channelz info
|
||||
private readonly channelzEnabled: boolean = true;
|
||||
|
@ -219,16 +228,9 @@ export class ChannelImplementation implements Channel {
|
|||
}
|
||||
|
||||
this.channelzTrace = new ChannelzTrace();
|
||||
this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo(), this.channelzEnabled);
|
||||
if (this.channelzEnabled) {
|
||||
this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo());
|
||||
this.channelzTrace.addTrace('CT_INFO', 'Channel created');
|
||||
} else {
|
||||
// Dummy channelz ref that will never be used
|
||||
this.channelzRef = {
|
||||
kind: 'channel',
|
||||
id: -1,
|
||||
name: ''
|
||||
};
|
||||
}
|
||||
|
||||
if (this.options['grpc.default_authority']) {
|
||||
|
@ -297,6 +299,7 @@ export class ChannelImplementation implements Channel {
|
|||
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
|
||||
}
|
||||
this.configSelector = configSelector;
|
||||
this.currentResolutionError = null;
|
||||
/* We process the queue asynchronously to ensure that the corresponding
|
||||
* load balancer update has completed. */
|
||||
process.nextTick(() => {
|
||||
|
@ -316,6 +319,9 @@ export class ChannelImplementation implements Channel {
|
|||
if (this.configSelectionQueue.length > 0) {
|
||||
this.trace('Name resolution failed with calls queued for config selection');
|
||||
}
|
||||
if (this.configSelector === null) {
|
||||
this.currentResolutionError = status;
|
||||
}
|
||||
const localQueue = this.configSelectionQueue;
|
||||
this.configSelectionQueue = [];
|
||||
this.callRefTimerUnref();
|
||||
|
@ -598,6 +604,9 @@ export class ChannelImplementation implements Channel {
|
|||
watcherObject.callback();
|
||||
}
|
||||
}
|
||||
if (newState !== ConnectivityState.TRANSIENT_FAILURE) {
|
||||
this.currentResolutionError = null;
|
||||
}
|
||||
}
|
||||
|
||||
private tryGetConfig(stream: Http2CallStream, metadata: Metadata) {
|
||||
|
@ -612,11 +621,15 @@ export class ChannelImplementation implements Channel {
|
|||
* ResolvingLoadBalancer may be idle and if so it needs to be kicked
|
||||
* because it now has a pending request. */
|
||||
this.resolvingLoadBalancer.exitIdle();
|
||||
this.configSelectionQueue.push({
|
||||
callStream: stream,
|
||||
callMetadata: metadata,
|
||||
});
|
||||
this.callRefTimerRef();
|
||||
if (this.currentResolutionError && !metadata.getOptions().waitForReady) {
|
||||
stream.cancelWithStatus(this.currentResolutionError.code, this.currentResolutionError.details);
|
||||
} else {
|
||||
this.configSelectionQueue.push({
|
||||
callStream: stream,
|
||||
callMetadata: metadata,
|
||||
});
|
||||
this.callRefTimerRef();
|
||||
}
|
||||
} else {
|
||||
const callConfig = this.configSelector(stream.getMethod(), metadata);
|
||||
if (callConfig.status === Status.OK) {
|
||||
|
|
|
@ -347,31 +347,39 @@ const subchannels: (SubchannelEntry | undefined)[] = [];
|
|||
const servers: (ServerEntry | undefined)[] = [];
|
||||
const sockets: (SocketEntry | undefined)[] = [];
|
||||
|
||||
export function registerChannelzChannel(name: string, getInfo: () => ChannelInfo): ChannelRef {
|
||||
export function registerChannelzChannel(name: string, getInfo: () => ChannelInfo, channelzEnabled: boolean): ChannelRef {
|
||||
const id = getNextId();
|
||||
const ref: ChannelRef = {id, name, kind: 'channel'};
|
||||
channels[id] = { ref, getInfo };
|
||||
if (channelzEnabled) {
|
||||
channels[id] = { ref, getInfo };
|
||||
}
|
||||
return ref;
|
||||
}
|
||||
|
||||
export function registerChannelzSubchannel(name: string, getInfo:() => SubchannelInfo): SubchannelRef {
|
||||
export function registerChannelzSubchannel(name: string, getInfo:() => SubchannelInfo, channelzEnabled: boolean): SubchannelRef {
|
||||
const id = getNextId();
|
||||
const ref: SubchannelRef = {id, name, kind: 'subchannel'};
|
||||
subchannels[id] = { ref, getInfo };
|
||||
if (channelzEnabled) {
|
||||
subchannels[id] = { ref, getInfo };
|
||||
}
|
||||
return ref;
|
||||
}
|
||||
|
||||
export function registerChannelzServer(getInfo: () => ServerInfo): ServerRef {
|
||||
export function registerChannelzServer(getInfo: () => ServerInfo, channelzEnabled: boolean): ServerRef {
|
||||
const id = getNextId();
|
||||
const ref: ServerRef = {id, kind: 'server'};
|
||||
servers[id] = { ref, getInfo };
|
||||
if (channelzEnabled) {
|
||||
servers[id] = { ref, getInfo };
|
||||
}
|
||||
return ref;
|
||||
}
|
||||
|
||||
export function registerChannelzSocket(name: string, getInfo: () => SocketInfo): SocketRef {
|
||||
export function registerChannelzSocket(name: string, getInfo: () => SocketInfo, channelzEnabled: boolean): SocketRef {
|
||||
const id = getNextId();
|
||||
const ref: SocketRef = {id, name, kind: 'socket'};
|
||||
sockets[id] = { ref, getInfo};
|
||||
if (channelzEnabled) {
|
||||
sockets[id] = { ref, getInfo};
|
||||
}
|
||||
return ref;
|
||||
}
|
||||
|
||||
|
|
|
@ -321,6 +321,7 @@ export class Client {
|
|||
}
|
||||
let responseMessage: ResponseType | null = null;
|
||||
let receivedStatus = false;
|
||||
const callerStack = (new Error().stack!).split('\n').slice(1).join('\n');
|
||||
call.start(callProperties.metadata, {
|
||||
onReceiveMetadata: (metadata) => {
|
||||
emitter.emit('metadata', metadata);
|
||||
|
@ -338,9 +339,17 @@ export class Client {
|
|||
}
|
||||
receivedStatus = true;
|
||||
if (status.code === Status.OK) {
|
||||
callProperties.callback!(null, responseMessage!);
|
||||
if (responseMessage === null) {
|
||||
callProperties.callback!(callErrorFromStatus({
|
||||
code: Status.INTERNAL,
|
||||
details: 'No message received',
|
||||
metadata: status.metadata
|
||||
}, callerStack));
|
||||
} else {
|
||||
callProperties.callback!(null, responseMessage);
|
||||
}
|
||||
} else {
|
||||
callProperties.callback!(callErrorFromStatus(status));
|
||||
callProperties.callback!(callErrorFromStatus(status, callerStack));
|
||||
}
|
||||
emitter.emit('status', status);
|
||||
},
|
||||
|
@ -438,6 +447,7 @@ export class Client {
|
|||
}
|
||||
let responseMessage: ResponseType | null = null;
|
||||
let receivedStatus = false;
|
||||
const callerStack = (new Error().stack!).split('\n').slice(1).join('\n');
|
||||
call.start(callProperties.metadata, {
|
||||
onReceiveMetadata: (metadata) => {
|
||||
emitter.emit('metadata', metadata);
|
||||
|
@ -455,9 +465,17 @@ export class Client {
|
|||
}
|
||||
receivedStatus = true;
|
||||
if (status.code === Status.OK) {
|
||||
callProperties.callback!(null, responseMessage!);
|
||||
if (responseMessage === null) {
|
||||
callProperties.callback!(callErrorFromStatus({
|
||||
code: Status.INTERNAL,
|
||||
details: 'No message received',
|
||||
metadata: status.metadata
|
||||
}, callerStack));
|
||||
} else {
|
||||
callProperties.callback!(null, responseMessage);
|
||||
}
|
||||
} else {
|
||||
callProperties.callback!(callErrorFromStatus(status));
|
||||
callProperties.callback!(callErrorFromStatus(status, callerStack));
|
||||
}
|
||||
emitter.emit('status', status);
|
||||
},
|
||||
|
@ -559,6 +577,7 @@ export class Client {
|
|||
call.setCredentials(callProperties.callOptions.credentials);
|
||||
}
|
||||
let receivedStatus = false;
|
||||
const callerStack = (new Error().stack!).split('\n').slice(1).join('\n');
|
||||
call.start(callProperties.metadata, {
|
||||
onReceiveMetadata(metadata: Metadata) {
|
||||
stream.emit('metadata', metadata);
|
||||
|
@ -574,7 +593,7 @@ export class Client {
|
|||
receivedStatus = true;
|
||||
stream.push(null);
|
||||
if (status.code !== Status.OK) {
|
||||
stream.emit('error', callErrorFromStatus(status));
|
||||
stream.emit('error', callErrorFromStatus(status, callerStack));
|
||||
}
|
||||
stream.emit('status', status);
|
||||
},
|
||||
|
@ -656,6 +675,7 @@ export class Client {
|
|||
call.setCredentials(callProperties.callOptions.credentials);
|
||||
}
|
||||
let receivedStatus = false;
|
||||
const callerStack = (new Error().stack!).split('\n').slice(1).join('\n');
|
||||
call.start(callProperties.metadata, {
|
||||
onReceiveMetadata(metadata: Metadata) {
|
||||
stream.emit('metadata', metadata);
|
||||
|
@ -670,7 +690,7 @@ export class Client {
|
|||
receivedStatus = true;
|
||||
stream.push(null);
|
||||
if (status.code !== Status.OK) {
|
||||
stream.emit('error', callErrorFromStatus(status));
|
||||
stream.emit('error', callErrorFromStatus(status, callerStack));
|
||||
}
|
||||
stream.emit('status', status);
|
||||
},
|
||||
|
|
|
@ -334,17 +334,21 @@ class OutlierDetectionCounterFilterFactory implements FilterFactory<OutlierDetec
|
|||
}
|
||||
|
||||
class OutlierDetectionPicker implements Picker {
|
||||
constructor(private wrappedPicker: Picker) {}
|
||||
constructor(private wrappedPicker: Picker, private countCalls: boolean) {}
|
||||
pick(pickArgs: PickArgs): PickResult {
|
||||
const wrappedPick = this.wrappedPicker.pick(pickArgs);
|
||||
if (wrappedPick.pickResultType === PickResultType.COMPLETE) {
|
||||
const subchannelWrapper = wrappedPick.subchannel as OutlierDetectionSubchannelWrapper;
|
||||
const mapEntry = subchannelWrapper.getMapEntry();
|
||||
if (mapEntry) {
|
||||
const extraFilterFactories = [...wrappedPick.extraFilterFactories];
|
||||
if (this.countCalls) {
|
||||
extraFilterFactories.push(new OutlierDetectionCounterFilterFactory(mapEntry.counter));
|
||||
}
|
||||
return {
|
||||
...wrappedPick,
|
||||
subchannel: subchannelWrapper.getWrappedSubchannel(),
|
||||
extraFilterFactories: [...wrappedPick.extraFilterFactories, new OutlierDetectionCounterFilterFactory(mapEntry.counter)]
|
||||
extraFilterFactories: extraFilterFactories
|
||||
};
|
||||
} else {
|
||||
return wrappedPick;
|
||||
|
@ -361,6 +365,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
private addressMap: Map<string, MapEntry> = new Map<string, MapEntry>();
|
||||
private latestConfig: OutlierDetectionLoadBalancingConfig | null = null;
|
||||
private ejectionTimer: NodeJS.Timer;
|
||||
private timerStartTime: Date | null = null;
|
||||
|
||||
constructor(channelControlHelper: ChannelControlHelper) {
|
||||
this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
|
||||
|
@ -373,7 +378,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
},
|
||||
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
|
||||
if (connectivityState === ConnectivityState.READY) {
|
||||
channelControlHelper.updateState(connectivityState, new OutlierDetectionPicker(picker));
|
||||
channelControlHelper.updateState(connectivityState, new OutlierDetectionPicker(picker, this.isCountingEnabled()));
|
||||
} else {
|
||||
channelControlHelper.updateState(connectivityState, picker);
|
||||
}
|
||||
|
@ -383,6 +388,12 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
clearInterval(this.ejectionTimer);
|
||||
}
|
||||
|
||||
private isCountingEnabled(): boolean {
|
||||
return this.latestConfig !== null &&
|
||||
(this.latestConfig.getSuccessRateEjectionConfig() !== null ||
|
||||
this.latestConfig.getFailurePercentageEjectionConfig() !== null);
|
||||
}
|
||||
|
||||
private getCurrentEjectionPercent() {
|
||||
let ejectionCount = 0;
|
||||
for (const mapEntry of this.addressMap.values()) {
|
||||
|
@ -418,7 +429,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
|
||||
// Step 2
|
||||
const successRateMean = successRates.reduce((a, b) => a + b);
|
||||
const successRateMean = successRates.reduce((a, b) => a + b) / successRates.length;
|
||||
let successRateVariance = 0;
|
||||
for (const rate of successRates) {
|
||||
const deviation = rate - successRateMean;
|
||||
|
@ -501,16 +512,26 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
private runChecks() {
|
||||
const ejectionTimestamp = new Date();
|
||||
|
||||
private switchAllBuckets() {
|
||||
for (const mapEntry of this.addressMap.values()) {
|
||||
mapEntry.counter.switchBuckets();
|
||||
}
|
||||
}
|
||||
|
||||
private startTimer(delayMs: number) {
|
||||
this.ejectionTimer = setTimeout(() => this.runChecks(), delayMs);
|
||||
}
|
||||
|
||||
private runChecks() {
|
||||
const ejectionTimestamp = new Date();
|
||||
|
||||
this.switchAllBuckets();
|
||||
|
||||
if (!this.latestConfig) {
|
||||
return;
|
||||
}
|
||||
this.timerStartTime = ejectionTimestamp;
|
||||
this.startTimer(this.latestConfig.getIntervalMs());
|
||||
|
||||
this.runSuccessRateCheck(ejectionTimestamp);
|
||||
this.runFailurePercentageCheck(ejectionTimestamp);
|
||||
|
@ -561,10 +582,21 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
);
|
||||
this.childBalancer.updateAddressList(addressList, childPolicy, attributes);
|
||||
|
||||
if (this.latestConfig === null || this.latestConfig.getIntervalMs() !== lbConfig.getIntervalMs()) {
|
||||
clearInterval(this.ejectionTimer);
|
||||
this.ejectionTimer = setInterval(() => this.runChecks(), lbConfig.getIntervalMs());
|
||||
if (lbConfig.getSuccessRateEjectionConfig() || lbConfig.getFailurePercentageEjectionConfig()) {
|
||||
if (this.timerStartTime) {
|
||||
clearTimeout(this.ejectionTimer);
|
||||
const remainingDelay = lbConfig.getIntervalMs() - ((new Date()).getTime() - this.timerStartTime.getTime());
|
||||
this.startTimer(remainingDelay);
|
||||
} else {
|
||||
this.timerStartTime = new Date();
|
||||
this.startTimer(lbConfig.getIntervalMs());
|
||||
this.switchAllBuckets();
|
||||
}
|
||||
} else {
|
||||
this.timerStartTime = null;
|
||||
clearTimeout(this.ejectionTimer);
|
||||
}
|
||||
|
||||
this.latestConfig = lbConfig;
|
||||
}
|
||||
exitIdle(): void {
|
||||
|
@ -574,6 +606,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
this.childBalancer.resetBackoff();
|
||||
}
|
||||
destroy(): void {
|
||||
clearTimeout(this.ejectionTimer);
|
||||
this.childBalancer.destroy();
|
||||
}
|
||||
getTypeName(): string {
|
||||
|
|
|
@ -184,8 +184,10 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
|||
) {
|
||||
/* If all of the subchannels are IDLE we should go back to a
|
||||
* basic IDLE state where there is no subchannel list to avoid
|
||||
* holding unused resources */
|
||||
this.resetSubchannelList();
|
||||
* holding unused resources. We do not reset triedAllSubchannels
|
||||
* because that is a reminder to request reresolution the next time
|
||||
* this LB policy needs to connect. */
|
||||
this.resetSubchannelList(false);
|
||||
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
|
||||
return;
|
||||
}
|
||||
|
@ -337,7 +339,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
|||
this.channelControlHelper.updateState(newState, picker);
|
||||
}
|
||||
|
||||
private resetSubchannelList() {
|
||||
private resetSubchannelList(resetTriedAllSubchannels = true) {
|
||||
for (const subchannel of this.subchannels) {
|
||||
subchannel.removeConnectivityStateListener(this.subchannelStateListener);
|
||||
subchannel.unref();
|
||||
|
@ -352,7 +354,9 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
|||
[ConnectivityState.TRANSIENT_FAILURE]: 0,
|
||||
};
|
||||
this.subchannels = [];
|
||||
this.triedAllSubchannels = false;
|
||||
if (resetTriedAllSubchannels) {
|
||||
this.triedAllSubchannels = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -425,6 +429,12 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
|
||||
exitIdle() {
|
||||
if (
|
||||
this.currentState === ConnectivityState.IDLE ||
|
||||
this.triedAllSubchannels
|
||||
) {
|
||||
this.channelControlHelper.requestReresolution();
|
||||
}
|
||||
for (const subchannel of this.subchannels) {
|
||||
subchannel.startConnecting();
|
||||
}
|
||||
|
@ -433,12 +443,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
|||
this.connectToAddressList();
|
||||
}
|
||||
}
|
||||
if (
|
||||
this.currentState === ConnectivityState.IDLE ||
|
||||
this.triedAllSubchannels
|
||||
) {
|
||||
this.channelControlHelper.requestReresolution();
|
||||
}
|
||||
}
|
||||
|
||||
resetBackoff() {
|
||||
|
|
|
@ -158,7 +158,6 @@ class DnsResolver implements Resolver {
|
|||
if (this.ipResult !== null) {
|
||||
trace('Returning IP address for target ' + uriToString(this.target));
|
||||
setImmediate(() => {
|
||||
this.backoff.reset();
|
||||
this.listener.onSuccessfulResolution(
|
||||
this.ipResult!,
|
||||
null,
|
||||
|
@ -167,6 +166,8 @@ class DnsResolver implements Resolver {
|
|||
{}
|
||||
);
|
||||
});
|
||||
this.backoff.stop();
|
||||
this.backoff.reset();
|
||||
return;
|
||||
}
|
||||
if (this.dnsHostname === null) {
|
||||
|
@ -178,7 +179,11 @@ class DnsResolver implements Resolver {
|
|||
metadata: new Metadata(),
|
||||
});
|
||||
});
|
||||
this.stopNextResolutionTimer();
|
||||
} else {
|
||||
if (this.pendingLookupPromise !== null) {
|
||||
return;
|
||||
}
|
||||
trace('Looking up DNS hostname ' + this.dnsHostname);
|
||||
/* We clear out latestLookupResult here to ensure that it contains the
|
||||
* latest result since the last time we started resolving. That way, the
|
||||
|
@ -299,6 +304,7 @@ class DnsResolver implements Resolver {
|
|||
}
|
||||
|
||||
private startNextResolutionTimer() {
|
||||
clearTimeout(this.nextResolutionTimer);
|
||||
this.nextResolutionTimer = setTimeout(() => {
|
||||
this.stopNextResolutionTimer();
|
||||
if (this.continueResolving) {
|
||||
|
@ -314,9 +320,12 @@ class DnsResolver implements Resolver {
|
|||
}
|
||||
|
||||
private startResolutionWithBackoff() {
|
||||
if (this.pendingLookupPromise === null) {
|
||||
this.continueResolving = false;
|
||||
this.startResolution();
|
||||
this.backoff.runOnce();
|
||||
this.startNextResolutionTimer();
|
||||
}
|
||||
}
|
||||
|
||||
updateResolution() {
|
||||
|
|
|
@ -268,6 +268,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
|||
if (this.currentState === ConnectivityState.IDLE) {
|
||||
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
|
||||
}
|
||||
this.backoffTimeout.runOnce();
|
||||
}
|
||||
|
||||
private updateState(connectivityState: ConnectivityState, picker: Picker) {
|
||||
|
@ -294,19 +295,17 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
|||
);
|
||||
this.onFailedResolution(error);
|
||||
}
|
||||
this.backoffTimeout.runOnce();
|
||||
}
|
||||
|
||||
exitIdle() {
|
||||
this.childLoadBalancer.exitIdle();
|
||||
if (this.currentState === ConnectivityState.IDLE) {
|
||||
if (this.currentState === ConnectivityState.IDLE || this.currentState === ConnectivityState.TRANSIENT_FAILURE) {
|
||||
if (this.backoffTimeout.isRunning()) {
|
||||
this.continueResolving = true;
|
||||
} else {
|
||||
this.updateResolution();
|
||||
}
|
||||
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
|
||||
}
|
||||
this.childLoadBalancer.exitIdle();
|
||||
}
|
||||
|
||||
updateAddressList(
|
||||
|
|
|
@ -161,17 +161,11 @@ export class Server {
|
|||
if (this.options['grpc.enable_channelz'] === 0) {
|
||||
this.channelzEnabled = false;
|
||||
}
|
||||
this.channelzRef = registerChannelzServer(() => this.getChannelzInfo(), this.channelzEnabled);
|
||||
if (this.channelzEnabled) {
|
||||
this.channelzRef = registerChannelzServer(() => this.getChannelzInfo());
|
||||
this.channelzTrace.addTrace('CT_INFO', 'Server created');
|
||||
this.trace('Server constructed');
|
||||
} else {
|
||||
// Dummy channelz ref that will never be used
|
||||
this.channelzRef = {
|
||||
kind: 'server',
|
||||
id: -1
|
||||
};
|
||||
}
|
||||
this.trace('Server constructed');
|
||||
}
|
||||
|
||||
private getChannelzInfo(): ServerInfo {
|
||||
|
@ -431,34 +425,28 @@ export class Server {
|
|||
}
|
||||
}
|
||||
let channelzRef: SocketRef;
|
||||
if (this.channelzEnabled) {
|
||||
channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => {
|
||||
return {
|
||||
localAddress: boundSubchannelAddress,
|
||||
remoteAddress: null,
|
||||
security: null,
|
||||
remoteName: null,
|
||||
streamsStarted: 0,
|
||||
streamsSucceeded: 0,
|
||||
streamsFailed: 0,
|
||||
messagesSent: 0,
|
||||
messagesReceived: 0,
|
||||
keepAlivesSent: 0,
|
||||
lastLocalStreamCreatedTimestamp: null,
|
||||
lastRemoteStreamCreatedTimestamp: null,
|
||||
lastMessageSentTimestamp: null,
|
||||
lastMessageReceivedTimestamp: null,
|
||||
localFlowControlWindow: null,
|
||||
remoteFlowControlWindow: null
|
||||
};
|
||||
});
|
||||
this.listenerChildrenTracker.refChild(channelzRef);
|
||||
} else {
|
||||
channelzRef = {
|
||||
kind: 'socket',
|
||||
id: -1,
|
||||
name: ''
|
||||
channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => {
|
||||
return {
|
||||
localAddress: boundSubchannelAddress,
|
||||
remoteAddress: null,
|
||||
security: null,
|
||||
remoteName: null,
|
||||
streamsStarted: 0,
|
||||
streamsSucceeded: 0,
|
||||
streamsFailed: 0,
|
||||
messagesSent: 0,
|
||||
messagesReceived: 0,
|
||||
keepAlivesSent: 0,
|
||||
lastLocalStreamCreatedTimestamp: null,
|
||||
lastRemoteStreamCreatedTimestamp: null,
|
||||
lastMessageSentTimestamp: null,
|
||||
lastMessageReceivedTimestamp: null,
|
||||
localFlowControlWindow: null,
|
||||
remoteFlowControlWindow: null
|
||||
};
|
||||
}, this.channelzEnabled);
|
||||
if (this.channelzEnabled) {
|
||||
this.listenerChildrenTracker.refChild(channelzRef);
|
||||
}
|
||||
this.http2ServerList.push({server: http2Server, channelzRef: channelzRef});
|
||||
this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress));
|
||||
|
@ -509,34 +497,28 @@ export class Server {
|
|||
port: boundAddress.port
|
||||
};
|
||||
let channelzRef: SocketRef;
|
||||
if (this.channelzEnabled) {
|
||||
channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => {
|
||||
return {
|
||||
localAddress: boundSubchannelAddress,
|
||||
remoteAddress: null,
|
||||
security: null,
|
||||
remoteName: null,
|
||||
streamsStarted: 0,
|
||||
streamsSucceeded: 0,
|
||||
streamsFailed: 0,
|
||||
messagesSent: 0,
|
||||
messagesReceived: 0,
|
||||
keepAlivesSent: 0,
|
||||
lastLocalStreamCreatedTimestamp: null,
|
||||
lastRemoteStreamCreatedTimestamp: null,
|
||||
lastMessageSentTimestamp: null,
|
||||
lastMessageReceivedTimestamp: null,
|
||||
localFlowControlWindow: null,
|
||||
remoteFlowControlWindow: null
|
||||
};
|
||||
});
|
||||
this.listenerChildrenTracker.refChild(channelzRef);
|
||||
} else {
|
||||
channelzRef = {
|
||||
kind: 'socket',
|
||||
id: -1,
|
||||
name: ''
|
||||
channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => {
|
||||
return {
|
||||
localAddress: boundSubchannelAddress,
|
||||
remoteAddress: null,
|
||||
security: null,
|
||||
remoteName: null,
|
||||
streamsStarted: 0,
|
||||
streamsSucceeded: 0,
|
||||
streamsFailed: 0,
|
||||
messagesSent: 0,
|
||||
messagesReceived: 0,
|
||||
keepAlivesSent: 0,
|
||||
lastLocalStreamCreatedTimestamp: null,
|
||||
lastRemoteStreamCreatedTimestamp: null,
|
||||
lastMessageSentTimestamp: null,
|
||||
lastMessageReceivedTimestamp: null,
|
||||
localFlowControlWindow: null,
|
||||
remoteFlowControlWindow: null
|
||||
};
|
||||
}, this.channelzEnabled);
|
||||
if (this.channelzEnabled) {
|
||||
this.listenerChildrenTracker.refChild(channelzRef);
|
||||
}
|
||||
this.http2ServerList.push({server: http2Server, channelzRef: channelzRef});
|
||||
this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress));
|
||||
|
@ -893,15 +875,7 @@ export class Server {
|
|||
}
|
||||
|
||||
let channelzRef: SocketRef;
|
||||
if (this.channelzEnabled) {
|
||||
channelzRef = registerChannelzSocket(session.socket.remoteAddress ?? 'unknown', this.getChannelzSessionInfoGetter(session));
|
||||
} else {
|
||||
channelzRef = {
|
||||
kind: 'socket',
|
||||
id: -1,
|
||||
name: ''
|
||||
}
|
||||
}
|
||||
channelzRef = registerChannelzSocket(session.socket.remoteAddress ?? 'unknown', this.getChannelzSessionInfoGetter(session), this.channelzEnabled);
|
||||
|
||||
const channelzSessionInfo: ChannelzSessionInfo = {
|
||||
ref: channelzRef,
|
||||
|
|
|
@ -49,10 +49,8 @@ export class SubchannelPool {
|
|||
/**
|
||||
* A pool of subchannels use for making connections. Subchannels with the
|
||||
* exact same parameters will be reused.
|
||||
* @param global If true, this is the global subchannel pool. Otherwise, it
|
||||
* is the pool for a single channel.
|
||||
*/
|
||||
constructor(private global: boolean) {}
|
||||
constructor() {}
|
||||
|
||||
/**
|
||||
* Unrefs all unused subchannels and cancels the cleanup task if all
|
||||
|
@ -95,7 +93,7 @@ export class SubchannelPool {
|
|||
* Ensures that the cleanup task is spawned.
|
||||
*/
|
||||
ensureCleanupTask(): void {
|
||||
if (this.global && this.cleanupTimer === null) {
|
||||
if (this.cleanupTimer === null) {
|
||||
this.cleanupTimer = setInterval(() => {
|
||||
this.unrefUnusedSubchannels();
|
||||
}, REF_CHECK_INTERVAL);
|
||||
|
@ -156,14 +154,12 @@ export class SubchannelPool {
|
|||
channelCredentials,
|
||||
subchannel,
|
||||
});
|
||||
if (this.global) {
|
||||
subchannel.ref();
|
||||
}
|
||||
subchannel.ref();
|
||||
return subchannel;
|
||||
}
|
||||
}
|
||||
|
||||
const globalSubchannelPool = new SubchannelPool(true);
|
||||
const globalSubchannelPool = new SubchannelPool();
|
||||
|
||||
/**
|
||||
* Get either the global subchannel pool, or a new subchannel pool.
|
||||
|
@ -173,6 +169,6 @@ export function getSubchannelPool(global: boolean): SubchannelPool {
|
|||
if (global) {
|
||||
return globalSubchannelPool;
|
||||
} else {
|
||||
return new SubchannelPool(false);
|
||||
return new SubchannelPool();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -108,7 +108,7 @@ export class Subchannel {
|
|||
* socket disconnects. Used for ending active calls with an UNAVAILABLE
|
||||
* status.
|
||||
*/
|
||||
private disconnectListeners: Array<() => void> = [];
|
||||
private disconnectListeners: Set<() => void> = new Set();
|
||||
|
||||
private backoffTimeout: BackoffTimeout;
|
||||
|
||||
|
@ -227,16 +227,9 @@ export class Subchannel {
|
|||
this.channelzEnabled = false;
|
||||
}
|
||||
this.channelzTrace = new ChannelzTrace();
|
||||
this.channelzRef = registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled);
|
||||
if (this.channelzEnabled) {
|
||||
this.channelzRef = registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo());
|
||||
this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
|
||||
} else {
|
||||
// Dummy channelz ref that will never be used
|
||||
this.channelzRef = {
|
||||
kind: 'subchannel',
|
||||
id: -1,
|
||||
name: ''
|
||||
};
|
||||
}
|
||||
this.trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2));
|
||||
}
|
||||
|
@ -328,6 +321,10 @@ export class Subchannel {
|
|||
logging.trace(LogVerbosity.DEBUG, 'subchannel_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
|
||||
}
|
||||
|
||||
private keepaliveTrace(text: string): void {
|
||||
logging.trace(LogVerbosity.DEBUG, 'keepalive', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
|
||||
}
|
||||
|
||||
private handleBackoffTimer() {
|
||||
if (this.continueConnecting) {
|
||||
this.transitionToState(
|
||||
|
@ -358,18 +355,15 @@ export class Subchannel {
|
|||
if (this.channelzEnabled) {
|
||||
this.keepalivesSent += 1;
|
||||
}
|
||||
logging.trace(
|
||||
LogVerbosity.DEBUG,
|
||||
'keepalive',
|
||||
'(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' +
|
||||
'Sending ping'
|
||||
);
|
||||
this.keepaliveTrace('Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms');
|
||||
this.keepaliveTimeoutId = setTimeout(() => {
|
||||
this.transitionToState([ConnectivityState.READY], ConnectivityState.IDLE);
|
||||
this.keepaliveTrace('Ping timeout passed without response');
|
||||
this.handleDisconnect();
|
||||
}, this.keepaliveTimeoutMs);
|
||||
this.keepaliveTimeoutId.unref?.();
|
||||
this.session!.ping(
|
||||
(err: Error | null, duration: number, payload: Buffer) => {
|
||||
this.keepaliveTrace('Received ping response');
|
||||
clearTimeout(this.keepaliveTimeoutId);
|
||||
}
|
||||
);
|
||||
|
@ -384,6 +378,11 @@ export class Subchannel {
|
|||
* sending pings should also involve some network activity. */
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop keepalive pings when terminating a connection. This discards the
|
||||
* outstanding ping timeout, so it should not be called if the same
|
||||
* connection will still be used.
|
||||
*/
|
||||
private stopKeepalivePings() {
|
||||
clearInterval(this.keepaliveIntervalId);
|
||||
clearTimeout(this.keepaliveTimeoutId);
|
||||
|
@ -407,6 +406,12 @@ export class Subchannel {
|
|||
connectionOptions.maxSessionMemory = this.options[
|
||||
'grpc-node.max_session_memory'
|
||||
];
|
||||
} else {
|
||||
/* By default, set a very large max session memory limit, to effectively
|
||||
* disable enforcement of the limit. Some testing indicates that Node's
|
||||
* behavior degrades badly when this limit is reached, so we solve that
|
||||
* by disabling the check entirely. */
|
||||
connectionOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
|
||||
}
|
||||
let addressScheme = 'http://';
|
||||
if ('secureContext' in connectionOptions) {
|
||||
|
@ -484,8 +489,8 @@ export class Subchannel {
|
|||
connectionOptions
|
||||
);
|
||||
this.session = session;
|
||||
this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!, this.channelzEnabled);
|
||||
if (this.channelzEnabled) {
|
||||
this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!);
|
||||
this.childrenTracker.refChild(this.channelzSocketRef);
|
||||
}
|
||||
session.unref();
|
||||
|
@ -637,6 +642,15 @@ export class Subchannel {
|
|||
);
|
||||
}
|
||||
|
||||
private handleDisconnect() {
|
||||
this.transitionToState(
|
||||
[ConnectivityState.READY],
|
||||
ConnectivityState.TRANSIENT_FAILURE);
|
||||
for (const listener of this.disconnectListeners.values()) {
|
||||
listener();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initiate a state transition from any element of oldStates to the new
|
||||
* state. If the current connectivityState is not in oldStates, do nothing.
|
||||
|
@ -667,12 +681,7 @@ export class Subchannel {
|
|||
const session = this.session!;
|
||||
session.socket.once('close', () => {
|
||||
if (this.session === session) {
|
||||
this.transitionToState(
|
||||
[ConnectivityState.READY],
|
||||
ConnectivityState.TRANSIENT_FAILURE);
|
||||
for (const listener of this.disconnectListeners) {
|
||||
listener();
|
||||
}
|
||||
this.handleDisconnect();
|
||||
}
|
||||
});
|
||||
if (this.keepaliveWithoutCalls) {
|
||||
|
@ -732,7 +741,7 @@ export class Subchannel {
|
|||
}
|
||||
this.transitionToState(
|
||||
[ConnectivityState.CONNECTING, ConnectivityState.READY],
|
||||
ConnectivityState.TRANSIENT_FAILURE
|
||||
ConnectivityState.IDLE
|
||||
);
|
||||
if (this.channelzEnabled) {
|
||||
unregisterChannelzRef(this.channelzRef);
|
||||
|
@ -773,7 +782,7 @@ export class Subchannel {
|
|||
}
|
||||
this.backoffTimeout.unref();
|
||||
if (!this.keepaliveWithoutCalls) {
|
||||
this.stopKeepalivePings();
|
||||
clearInterval(this.keepaliveIntervalId);
|
||||
}
|
||||
this.checkBothRefcounts();
|
||||
}
|
||||
|
@ -962,14 +971,11 @@ export class Subchannel {
|
|||
}
|
||||
|
||||
addDisconnectListener(listener: () => void) {
|
||||
this.disconnectListeners.push(listener);
|
||||
this.disconnectListeners.add(listener);
|
||||
}
|
||||
|
||||
removeDisconnectListener(listener: () => void) {
|
||||
const listenerIndex = this.disconnectListeners.indexOf(listener);
|
||||
if (listenerIndex > -1) {
|
||||
this.disconnectListeners.splice(listenerIndex, 1);
|
||||
}
|
||||
this.disconnectListeners.delete(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -25,6 +25,7 @@ message Request {
|
|||
|
||||
message Response {
|
||||
int32 count = 1;
|
||||
string message = 2;
|
||||
}
|
||||
|
||||
service TestService {
|
||||
|
|
|
@ -3,8 +3,10 @@
|
|||
|
||||
export interface Response {
|
||||
'count'?: (number);
|
||||
'message'?: (string);
|
||||
}
|
||||
|
||||
export interface Response__Output {
|
||||
'count': (number);
|
||||
'message': (string);
|
||||
}
|
||||
|
|
|
@ -11,28 +11,28 @@ export interface TestServiceClient extends grpc.Client {
|
|||
bidiStream(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_Request, _Response__Output>;
|
||||
bidiStream(options?: grpc.CallOptions): grpc.ClientDuplexStream<_Request, _Response__Output>;
|
||||
|
||||
ClientStream(metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientWritableStream<_Request>;
|
||||
ClientStream(metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientWritableStream<_Request>;
|
||||
ClientStream(options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientWritableStream<_Request>;
|
||||
ClientStream(callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientWritableStream<_Request>;
|
||||
clientStream(metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientWritableStream<_Request>;
|
||||
clientStream(metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientWritableStream<_Request>;
|
||||
clientStream(options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientWritableStream<_Request>;
|
||||
clientStream(callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientWritableStream<_Request>;
|
||||
ClientStream(metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_Response__Output>): grpc.ClientWritableStream<_Request>;
|
||||
ClientStream(metadata: grpc.Metadata, callback: grpc.requestCallback<_Response__Output>): grpc.ClientWritableStream<_Request>;
|
||||
ClientStream(options: grpc.CallOptions, callback: grpc.requestCallback<_Response__Output>): grpc.ClientWritableStream<_Request>;
|
||||
ClientStream(callback: grpc.requestCallback<_Response__Output>): grpc.ClientWritableStream<_Request>;
|
||||
clientStream(metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_Response__Output>): grpc.ClientWritableStream<_Request>;
|
||||
clientStream(metadata: grpc.Metadata, callback: grpc.requestCallback<_Response__Output>): grpc.ClientWritableStream<_Request>;
|
||||
clientStream(options: grpc.CallOptions, callback: grpc.requestCallback<_Response__Output>): grpc.ClientWritableStream<_Request>;
|
||||
clientStream(callback: grpc.requestCallback<_Response__Output>): grpc.ClientWritableStream<_Request>;
|
||||
|
||||
ServerStream(argument: _Request, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_Response__Output>;
|
||||
ServerStream(argument: _Request, options?: grpc.CallOptions): grpc.ClientReadableStream<_Response__Output>;
|
||||
serverStream(argument: _Request, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_Response__Output>;
|
||||
serverStream(argument: _Request, options?: grpc.CallOptions): grpc.ClientReadableStream<_Response__Output>;
|
||||
|
||||
Unary(argument: _Request, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientUnaryCall;
|
||||
Unary(argument: _Request, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientUnaryCall;
|
||||
Unary(argument: _Request, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientUnaryCall;
|
||||
Unary(argument: _Request, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientUnaryCall;
|
||||
unary(argument: _Request, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientUnaryCall;
|
||||
unary(argument: _Request, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientUnaryCall;
|
||||
unary(argument: _Request, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientUnaryCall;
|
||||
unary(argument: _Request, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientUnaryCall;
|
||||
Unary(argument: _Request, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_Response__Output>): grpc.ClientUnaryCall;
|
||||
Unary(argument: _Request, metadata: grpc.Metadata, callback: grpc.requestCallback<_Response__Output>): grpc.ClientUnaryCall;
|
||||
Unary(argument: _Request, options: grpc.CallOptions, callback: grpc.requestCallback<_Response__Output>): grpc.ClientUnaryCall;
|
||||
Unary(argument: _Request, callback: grpc.requestCallback<_Response__Output>): grpc.ClientUnaryCall;
|
||||
unary(argument: _Request, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_Response__Output>): grpc.ClientUnaryCall;
|
||||
unary(argument: _Request, metadata: grpc.Metadata, callback: grpc.requestCallback<_Response__Output>): grpc.ClientUnaryCall;
|
||||
unary(argument: _Request, options: grpc.CallOptions, callback: grpc.requestCallback<_Response__Output>): grpc.ClientUnaryCall;
|
||||
unary(argument: _Request, callback: grpc.requestCallback<_Response__Output>): grpc.ClientUnaryCall;
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -92,4 +92,28 @@ describe('Client without a server', () => {
|
|||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('Client with a nonexistent target domain', () => {
|
||||
let client: Client;
|
||||
before(() => {
|
||||
// DNS name that does not exist per RFC 6761 section 6.4
|
||||
client = new Client('host.invalid', clientInsecureCreds);
|
||||
});
|
||||
after(() => {
|
||||
client.close();
|
||||
});
|
||||
it('should fail multiple calls', function(done) {
|
||||
this.timeout(5000);
|
||||
// Regression test for https://github.com/grpc/grpc-node/issues/1411
|
||||
client.makeUnaryRequest('/service/method', x => x, x => x, Buffer.from([]), (error, value) => {
|
||||
assert(error);
|
||||
assert.strictEqual(error?.code, grpc.status.UNAVAILABLE);
|
||||
client.makeUnaryRequest('/service/method', x => x, x => x, Buffer.from([]), (error, value) => {
|
||||
assert(error);
|
||||
assert.strictEqual(error?.code, grpc.status.UNAVAILABLE);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
|
@ -356,6 +356,70 @@ describe('Name Resolver', () => {
|
|||
const resolver2 = resolverManager.createResolver(target2, listener, {});
|
||||
resolver2.updateResolution();
|
||||
});
|
||||
it('should not keep repeating successful resolutions', done => {
|
||||
const target = resolverManager.mapUriDefaultScheme(parseUri('localhost')!)!;
|
||||
let resultCount = 0;
|
||||
const resolver = resolverManager.createResolver(target, {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '127.0.0.1' &&
|
||||
addr.port === 443
|
||||
)
|
||||
);
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '::1' &&
|
||||
addr.port === 443
|
||||
)
|
||||
);
|
||||
resultCount += 1;
|
||||
if (resultCount === 1) {
|
||||
process.nextTick(() => resolver.updateResolution());
|
||||
}
|
||||
},
|
||||
onError: (error: StatusObject) => {
|
||||
assert.ifError(error);
|
||||
},
|
||||
}, {'grpc.dns_min_time_between_resolutions_ms': 2000});
|
||||
resolver.updateResolution();
|
||||
setTimeout(() => {
|
||||
assert.strictEqual(resultCount, 2, `resultCount ${resultCount} !== 2`);
|
||||
done();
|
||||
}, 10_000);
|
||||
}).timeout(15_000);
|
||||
it('should not keep repeating failed resolutions', done => {
|
||||
const target = resolverManager.mapUriDefaultScheme(parseUri('host.invalid')!)!;
|
||||
let resultCount = 0;
|
||||
const resolver = resolverManager.createResolver(target, {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
assert.fail('Resolution succeeded unexpectedly');
|
||||
},
|
||||
onError: (error: StatusObject) => {
|
||||
resultCount += 1;
|
||||
if (resultCount === 1) {
|
||||
process.nextTick(() => resolver.updateResolution());
|
||||
}
|
||||
},
|
||||
}, {});
|
||||
resolver.updateResolution();
|
||||
setTimeout(() => {
|
||||
assert.strictEqual(resultCount, 2, `resultCount ${resultCount} !== 2`);
|
||||
done();
|
||||
}, 10_000);
|
||||
}).timeout(15_000);
|
||||
});
|
||||
describe('UDS Names', () => {
|
||||
it('Should handle a relative Unix Domain Socket name', done => {
|
||||
|
|
|
@ -623,7 +623,7 @@ describe('Generic client and server', () => {
|
|||
describe('Compressed requests', () => {
|
||||
const testServiceHandlers: TestServiceHandlers = {
|
||||
Unary(call, callback) {
|
||||
callback(null, { count: 500000 });
|
||||
callback(null, { count: 500000, message: call.request.message });
|
||||
},
|
||||
|
||||
ClientStream(call, callback) {
|
||||
|
@ -847,6 +847,20 @@ describe('Compressed requests', () => {
|
|||
})
|
||||
});
|
||||
});
|
||||
|
||||
it('Should handle large messages', done => {
|
||||
let longMessage = '';
|
||||
for (let i = 0; i < 400000; i++) {
|
||||
const letter = 'abcdefghijklmnopqrstuvwxyz'[Math.floor(Math.random() * 26)];
|
||||
longMessage = longMessage + letter.repeat(10);
|
||||
}
|
||||
|
||||
client.unary({message: longMessage}, (err, response) => {
|
||||
assert.ifError(err);
|
||||
assert.strictEqual(response?.message, longMessage);
|
||||
done();
|
||||
})
|
||||
})
|
||||
|
||||
/* As of Node 16, Writable and Duplex streams validate the encoding
|
||||
* argument to write, and the flags values we are passing there are not
|
||||
|
|
Loading…
Reference in New Issue