mirror of https://github.com/grpc/grpc-node.git
grpc-js-xds: Update code to handle modified experimental APIs
This commit is contained in:
parent
67bec19b4e
commit
8a312e63b7
|
@ -231,7 +231,7 @@ const NUMBER_REGEX = /\d+/;
|
||||||
let totalActiveFaults = 0;
|
let totalActiveFaults = 0;
|
||||||
|
|
||||||
class FaultInjectionFilter extends BaseFilter implements Filter {
|
class FaultInjectionFilter extends BaseFilter implements Filter {
|
||||||
constructor(private callStream: CallStream, private config: FaultInjectionConfig) {
|
constructor(private config: FaultInjectionConfig) {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -316,7 +316,7 @@ class FaultInjectionFilter extends BaseFilter implements Filter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (abortStatus !== null && rollRandomPercentage(numerator, denominator)) {
|
if (abortStatus !== null && rollRandomPercentage(numerator, denominator)) {
|
||||||
this.callStream.cancelWithStatus(abortStatus, 'Fault injected');
|
return Promise.reject({code: abortStatus, details: 'Fault injected', metadata: new Metadata()});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return metadata;
|
return metadata;
|
||||||
|
@ -333,8 +333,8 @@ class FaultInjectionFilterFactory implements FilterFactory<FaultInjectionFilter>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
createFilter(callStream: experimental.CallStream): FaultInjectionFilter {
|
createFilter(): FaultInjectionFilter {
|
||||||
return new FaultInjectionFilter(callStream, this.config);
|
return new FaultInjectionFilter(this.config);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,7 @@ class RouterFilter extends BaseFilter implements Filter {}
|
||||||
class RouterFilterFactory implements FilterFactory<RouterFilter> {
|
class RouterFilterFactory implements FilterFactory<RouterFilter> {
|
||||||
constructor(config: HttpFilterConfig, overrideConfig?: HttpFilterConfig) {}
|
constructor(config: HttpFilterConfig, overrideConfig?: HttpFilterConfig) {}
|
||||||
|
|
||||||
createFilter(callStream: experimental.CallStream): RouterFilter {
|
createFilter(): RouterFilter {
|
||||||
return new RouterFilter();
|
return new RouterFilter();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -146,24 +146,6 @@ export class EdsLoadBalancingConfig implements LoadBalancingConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class CallEndTrackingFilter extends BaseFilter implements Filter {
|
|
||||||
constructor(private onCallEnd: () => void) {
|
|
||||||
super();
|
|
||||||
}
|
|
||||||
receiveTrailers(status: StatusObject) {
|
|
||||||
this.onCallEnd();
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class CallTrackingFilterFactory implements FilterFactory<CallEndTrackingFilter> {
|
|
||||||
constructor(private onCallEnd: () => void) {}
|
|
||||||
|
|
||||||
createFilter(callStream: CallStream) {
|
|
||||||
return new CallEndTrackingFilter(this.onCallEnd);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class load balances over a cluster by making an EDS request and then
|
* This class load balances over a cluster by making an EDS request and then
|
||||||
* transforming the result into a configuration for another load balancing
|
* transforming the result into a configuration for another load balancing
|
||||||
|
@ -217,9 +199,6 @@ export class EdsLoadBalancer implements LoadBalancer {
|
||||||
* balancer. */
|
* balancer. */
|
||||||
if (dropCategory === null) {
|
if (dropCategory === null) {
|
||||||
const originalPick = originalPicker.pick(pickArgs);
|
const originalPick = originalPicker.pick(pickArgs);
|
||||||
const trackingFilterFactory: FilterFactory<Filter> = new CallTrackingFilterFactory(() => {
|
|
||||||
this.concurrentRequests -= 1;
|
|
||||||
});
|
|
||||||
return {
|
return {
|
||||||
pickResultType: originalPick.pickResultType,
|
pickResultType: originalPick.pickResultType,
|
||||||
status: originalPick.status,
|
status: originalPick.status,
|
||||||
|
@ -228,7 +207,10 @@ export class EdsLoadBalancer implements LoadBalancer {
|
||||||
originalPick.onCallStarted?.();
|
originalPick.onCallStarted?.();
|
||||||
this.concurrentRequests += 1;
|
this.concurrentRequests += 1;
|
||||||
},
|
},
|
||||||
extraFilterFactories: originalPick.extraFilterFactories.concat(trackingFilterFactory)
|
onCallEnded: status => {
|
||||||
|
originalPick.onCallEnded?.(status);
|
||||||
|
this.concurrentRequests -= 1;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
let details: string;
|
let details: string;
|
||||||
|
@ -247,7 +229,7 @@ export class EdsLoadBalancer implements LoadBalancer {
|
||||||
metadata: new Metadata(),
|
metadata: new Metadata(),
|
||||||
},
|
},
|
||||||
subchannel: null,
|
subchannel: null,
|
||||||
extraFilterFactories: [],
|
onCallEnded: null,
|
||||||
onCallStarted: null
|
onCallStarted: null
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -108,29 +108,6 @@ export class LrsLoadBalancingConfig implements LoadBalancingConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Filter class that reports when the call ends.
|
|
||||||
*/
|
|
||||||
class CallEndTrackingFilter extends BaseFilter implements Filter {
|
|
||||||
constructor(private localityStatsReporter: XdsClusterLocalityStats) {
|
|
||||||
super();
|
|
||||||
}
|
|
||||||
|
|
||||||
receiveTrailers(status: StatusObject) {
|
|
||||||
this.localityStatsReporter.addCallFinished(status.code !== Status.OK);
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class CallEndTrackingFilterFactory
|
|
||||||
implements FilterFactory<CallEndTrackingFilter> {
|
|
||||||
constructor(private localityStatsReporter: XdsClusterLocalityStats) {}
|
|
||||||
|
|
||||||
createFilter(callStream: Call): CallEndTrackingFilter {
|
|
||||||
return new CallEndTrackingFilter(this.localityStatsReporter);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Picker that delegates picking to another picker, and reports when calls
|
* Picker that delegates picking to another picker, and reports when calls
|
||||||
* created using those picks start and end.
|
* created using those picks start and end.
|
||||||
|
@ -144,9 +121,6 @@ class LoadReportingPicker implements Picker {
|
||||||
pick(pickArgs: PickArgs): PickResult {
|
pick(pickArgs: PickArgs): PickResult {
|
||||||
const wrappedPick = this.wrappedPicker.pick(pickArgs);
|
const wrappedPick = this.wrappedPicker.pick(pickArgs);
|
||||||
if (wrappedPick.pickResultType === PickResultType.COMPLETE) {
|
if (wrappedPick.pickResultType === PickResultType.COMPLETE) {
|
||||||
const trackingFilterFactory = new CallEndTrackingFilterFactory(
|
|
||||||
this.localityStatsReporter
|
|
||||||
);
|
|
||||||
return {
|
return {
|
||||||
pickResultType: PickResultType.COMPLETE,
|
pickResultType: PickResultType.COMPLETE,
|
||||||
subchannel: wrappedPick.subchannel,
|
subchannel: wrappedPick.subchannel,
|
||||||
|
@ -155,7 +129,10 @@ class LoadReportingPicker implements Picker {
|
||||||
wrappedPick.onCallStarted?.();
|
wrappedPick.onCallStarted?.();
|
||||||
this.localityStatsReporter.addCallStarted();
|
this.localityStatsReporter.addCallStarted();
|
||||||
},
|
},
|
||||||
extraFilterFactories: wrappedPick.extraFilterFactories.concat(trackingFilterFactory),
|
onCallEnded: status => {
|
||||||
|
wrappedPick.onCallEnded?.(status);
|
||||||
|
this.localityStatsReporter.addCallFinished(status !== Status.OK);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
return wrappedPick;
|
return wrappedPick;
|
||||||
|
|
|
@ -107,8 +107,8 @@ class XdsClusterManagerPicker implements Picker {
|
||||||
metadata: new Metadata(),
|
metadata: new Metadata(),
|
||||||
},
|
},
|
||||||
subchannel: null,
|
subchannel: null,
|
||||||
extraFilterFactories: [],
|
onCallStarted: null,
|
||||||
onCallStarted: null
|
onCallEnded: null
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue