grpc-js-xds: Refactor xDS stream state and add resource timer

This commit is contained in:
Michael Lumish 2022-05-12 17:18:55 -07:00
parent 0679e3e492
commit 067bb13f27
6 changed files with 237 additions and 507 deletions

View File

@ -309,7 +309,7 @@ export class XdsClient {
const edsState = new EdsState(() => {
this.updateNames('eds');
});
const cdsState = new CdsState(edsState, () => {
const cdsState = new CdsState(() => {
this.updateNames('cds');
});
const rdsState = new RdsState(() => {
@ -630,6 +630,7 @@ export class XdsClient {
this.updateNames(service);
}
}
this.reportAdsStreamStarted();
}
}
@ -777,6 +778,13 @@ export class XdsClient {
this.adsState.lds.reportStreamError(status);
}
private reportAdsStreamStarted() {
this.adsState.eds.reportAdsStreamStart();
this.adsState.cds.reportAdsStreamStart();
this.adsState.rds.reportAdsStreamStart();
this.adsState.lds.reportAdsStreamStart();
}
private handleLrsResponse(message: LoadStatsResponse__Output) {
trace('Received LRS response');
/* Once we get any response from the server, we assume that the stream is

View File

@ -15,94 +15,21 @@
*
*/
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { EXPERIMENTAL_OUTLIER_DETECTION } from "../environment";
import { Cluster__Output } from "../generated/envoy/config/cluster/v3/Cluster";
import { Any__Output } from "../generated/google/protobuf/Any";
import { Duration__Output } from "../generated/google/protobuf/Duration";
import { UInt32Value__Output } from "../generated/google/protobuf/UInt32Value";
import { EdsState } from "./eds-state";
import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state";
import { BaseXdsStreamState, XdsStreamState } from "./xds-stream-state";
const TRACER_NAME = 'xds_client';
function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
export class CdsState implements XdsStreamState<Cluster__Output> {
versionInfo = '';
nonce = '';
private watchers: Map<string, Watcher<Cluster__Output>[]> = new Map<
string,
Watcher<Cluster__Output>[]
>();
private latestResponses: Cluster__Output[] = [];
private latestIsV2 = false;
constructor(
private edsState: EdsState,
private updateResourceNames: () => void
) {}
/**
* Add the watcher to the watcher list. Returns true if the list of resource
* names has changed, and false otherwise.
* @param clusterName
* @param watcher
*/
addWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
trace('Adding CDS watcher for clusterName ' + clusterName);
let watchersEntry = this.watchers.get(clusterName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(clusterName, watchersEntry);
}
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
const isV2 = this.latestIsV2;
for (const message of this.latestResponses) {
if (message.name === clusterName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing CDS update for new watcher for clusterName ' + clusterName);
watcher.onValidUpdate(message, isV2);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
export class CdsState extends BaseXdsStreamState<Cluster__Output> implements XdsStreamState<Cluster__Output> {
protected isStateOfTheWorld(): boolean {
return true;
}
removeWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
trace('Removing CDS watcher for clusterName ' + clusterName);
const watchersEntry = this.watchers.get(clusterName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(clusterName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
protected getResourceName(resource: Cluster__Output): string {
return resource.name;
}
getResourceNames(): string[] {
return Array.from(this.watchers.keys());
protected getProtocolName(): string {
return 'CDS';
}
private validateNonnegativeDuration(duration: Duration__Output | null): boolean {
@ -125,7 +52,7 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
return percentage.value >=0 && percentage.value <= 100;
}
private validateResponse(message: Cluster__Output): boolean {
public validateResponse(message: Cluster__Output): boolean {
if (message.type !== 'EDS') {
return false;
}
@ -167,69 +94,4 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
}
return true;
}
/**
* Given a list of clusterNames (which may actually be the cluster name),
* for each watcher watching a name not on the list, call that watcher's
* onResourceDoesNotExist method.
* @param allClusterNames
*/
private handleMissingNames(allClusterNames: Set<string>): string[] {
const missingNames: string[] = [];
for (const [clusterName, watcherList] of this.watchers.entries()) {
if (!allClusterNames.has(clusterName)) {
trace('Reporting CDS resource does not exist for clusterName ' + clusterName);
missingNames.push(clusterName);
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
return missingNames;
}
handleResponses(responses: ResourcePair<Cluster__Output>[], isV2: boolean): HandleResponseResult {
const validResponses: Cluster__Output[] = [];
const result: HandleResponseResult = {
accepted: [],
rejected: [],
missing: []
}
for (const {resource, raw} of responses) {
if (this.validateResponse(resource)) {
validResponses.push(resource);
result.accepted.push({
name: resource.name,
raw: raw});
} else {
trace('CDS validation failed for message ' + JSON.stringify(resource));
result.rejected.push({
name: resource.name,
raw: raw,
error: `Cluster validation failed for resource ${resource.name}`
});
}
}
this.latestResponses = validResponses;
this.latestIsV2 = isV2;
const allClusterNames: Set<string> = new Set<string>();
for (const message of validResponses) {
allClusterNames.add(message.name);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message, isV2);
}
}
trace('Received CDS updates for cluster names [' + Array.from(allClusterNames) + ']');
result.missing = this.handleMissingNames(allClusterNames);
return result;
}
reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}

View File

@ -19,7 +19,7 @@ import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { isIPv4, isIPv6 } from "net";
import { ClusterLoadAssignment__Output } from "../generated/envoy/config/endpoint/v3/ClusterLoadAssignment";
import { Any__Output } from "../generated/google/protobuf/Any";
import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state";
import { BaseXdsStreamState, HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state";
const TRACER_NAME = 'xds_client';
@ -27,83 +27,15 @@ function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
public versionInfo = '';
public nonce = '';
private watchers: Map<
string,
Watcher<ClusterLoadAssignment__Output>[]
> = new Map<string, Watcher<ClusterLoadAssignment__Output>[]>();
private latestResponses: ClusterLoadAssignment__Output[] = [];
private latestIsV2 = false;
constructor(private updateResourceNames: () => void) {}
/**
* Add the watcher to the watcher list. Returns true if the list of resource
* names has changed, and false otherwise.
* @param edsServiceName
* @param watcher
*/
addWatcher(
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
let watchersEntry = this.watchers.get(edsServiceName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(edsServiceName, watchersEntry);
}
trace('Adding EDS watcher (' + watchersEntry.length + ' ->' + (watchersEntry.length + 1) + ') for edsServiceName ' + edsServiceName);
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
const isV2 = this.latestIsV2;
for (const message of this.latestResponses) {
if (message.cluster_name === edsServiceName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing EDS update for new watcher for edsServiceName ' + edsServiceName);
watcher.onValidUpdate(message, isV2);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output> implements XdsStreamState<ClusterLoadAssignment__Output> {
protected getResourceName(resource: ClusterLoadAssignment__Output): string {
return resource.cluster_name;
}
removeWatcher(
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
trace('Removing EDS watcher for edsServiceName ' + edsServiceName);
const watchersEntry = this.watchers.get(edsServiceName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
trace('Removed EDS watcher (' + watchersEntry.length + ' -> ' + (watchersEntry.length - 1) + ') for edsServiceName ' + edsServiceName);
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(edsServiceName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
protected getProtocolName(): string {
return 'EDS';
}
getResourceNames(): string[] {
return Array.from(this.watchers.keys());
protected isStateOfTheWorld(): boolean {
return false;
}
/**
@ -111,7 +43,7 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
* https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto
* @param message
*/
private validateResponse(message: ClusterLoadAssignment__Output) {
public validateResponse(message: ClusterLoadAssignment__Output) {
for (const endpoint of message.endpoints) {
for (const lb of endpoint.lb_endpoints) {
const socketAddress = lb.endpoint?.address?.socket_address;
@ -128,48 +60,4 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
}
return true;
}
handleResponses(responses: ResourcePair<ClusterLoadAssignment__Output>[], isV2: boolean): HandleResponseResult {
const validResponses: ClusterLoadAssignment__Output[] = [];
let result: HandleResponseResult = {
accepted: [],
rejected: [],
missing: []
}
for (const {resource, raw} of responses) {
if (this.validateResponse(resource)) {
validResponses.push(resource);
result.accepted.push({
name: resource.cluster_name,
raw: raw});
} else {
trace('EDS validation failed for message ' + JSON.stringify(resource));
result.rejected.push({
name: resource.cluster_name,
raw: raw,
error: `ClusterLoadAssignment validation failed for resource ${resource.cluster_name}`
});
}
}
this.latestResponses = validResponses;
this.latestIsV2 = isV2;
const allClusterNames: Set<string> = new Set<string>();
for (const message of validResponses) {
allClusterNames.add(message.cluster_name);
const watchers = this.watchers.get(message.cluster_name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message, isV2);
}
}
trace('Received EDS updates for cluster names [' + Array.from(allClusterNames) + ']');
return result;
}
reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}

View File

@ -15,16 +15,13 @@
*
*/
import * as protoLoader from '@grpc/proto-loader';
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { experimental, logVerbosity } from "@grpc/grpc-js";
import { Listener__Output } from '../generated/envoy/config/listener/v3/Listener';
import { RdsState } from "./rds-state";
import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state";
import { HttpConnectionManager__Output } from '../generated/envoy/extensions/filters/network/http_connection_manager/v3/HttpConnectionManager';
import { BaseXdsStreamState, XdsStreamState } from "./xds-stream-state";
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL_V2, HTTP_CONNECTION_MANGER_TYPE_URL_V3 } from '../resources';
import { getTopLevelFilterUrl, validateTopLevelFilter } from '../http-filter';
import { EXPERIMENTAL_FAULT_INJECTION } from '../environment';
import { Any__Output } from '../generated/google/protobuf/Any';
const TRACER_NAME = 'xds_client';
@ -34,69 +31,22 @@ function trace(text: string): void {
const ROUTER_FILTER_URL = 'type.googleapis.com/envoy.extensions.filters.http.router.v3.Router';
export class LdsState implements XdsStreamState<Listener__Output> {
versionInfo = '';
nonce = '';
private watchers: Map<string, Watcher<Listener__Output>[]> = new Map<string, Watcher<Listener__Output>[]>();
private latestResponses: Listener__Output[] = [];
private latestIsV2 = false;
constructor(private rdsState: RdsState, private updateResourceNames: () => void) {}
addWatcher(targetName: string, watcher: Watcher<Listener__Output>) {
trace('Adding RDS watcher for targetName ' + targetName);
let watchersEntry = this.watchers.get(targetName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(targetName, watchersEntry);
}
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
const isV2 = this.latestIsV2;
for (const message of this.latestResponses) {
if (message.name === targetName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing RDS update for new watcher for targetName ' + targetName);
watcher.onValidUpdate(message, isV2);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
export class LdsState extends BaseXdsStreamState<Listener__Output> implements XdsStreamState<Listener__Output> {
protected getResourceName(resource: Listener__Output): string {
return resource.name;
}
protected getProtocolName(): string {
return 'LDS';
}
protected isStateOfTheWorld(): boolean {
return true;
}
removeWatcher(targetName: string, watcher: Watcher<Listener__Output>): void {
trace('Removing RDS watcher for targetName ' + targetName);
const watchersEntry = this.watchers.get(targetName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(targetName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
constructor(private rdsState: RdsState, updateResourceNames: () => void) {
super(updateResourceNames);
}
getResourceNames(): string[] {
return Array.from(this.watchers.keys());
}
private validateResponse(message: Listener__Output, isV2: boolean): boolean {
public validateResponse(message: Listener__Output, isV2: boolean): boolean {
if (
!(
message.api_listener?.api_listener &&
@ -143,63 +93,4 @@ export class LdsState implements XdsStreamState<Listener__Output> {
}
return false;
}
private handleMissingNames(allTargetNames: Set<string>): string[] {
const missingNames: string[] = [];
for (const [targetName, watcherList] of this.watchers.entries()) {
if (!allTargetNames.has(targetName)) {
missingNames.push(targetName);
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
return missingNames;
}
handleResponses(responses: ResourcePair<Listener__Output>[], isV2: boolean): HandleResponseResult {
const validResponses: Listener__Output[] = [];
let result: HandleResponseResult = {
accepted: [],
rejected: [],
missing: []
}
for (const {resource, raw} of responses) {
if (this.validateResponse(resource, isV2)) {
validResponses.push(resource);
result.accepted.push({
name: resource.name,
raw: raw
});
} else {
trace('LDS validation failed for message ' + JSON.stringify(resource));
result.rejected.push({
name: resource.name,
raw: raw,
error: `Listener validation failed for resource ${resource.name}`
});
}
}
this.latestResponses = validResponses;
this.latestIsV2 = isV2;
const allTargetNames = new Set<string>();
for (const message of validResponses) {
allTargetNames.add(message.name);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message, isV2);
}
}
trace('Received LDS response with listener names [' + Array.from(allTargetNames) + ']');
result.missing = this.handleMissingNames(allTargetNames);
return result;
}
reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}

View File

@ -15,20 +15,10 @@
*
*/
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { EXPERIMENTAL_FAULT_INJECTION } from "../environment";
import { RouteConfiguration__Output } from "../generated/envoy/config/route/v3/RouteConfiguration";
import { Any__Output } from "../generated/google/protobuf/Any";
import { validateOverrideFilter } from "../http-filter";
import { CdsLoadBalancingConfig } from "../load-balancer-cds";
import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state";
import ServiceConfig = experimental.ServiceConfig;
const TRACER_NAME = 'xds_client';
function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
import { BaseXdsStreamState, XdsStreamState } from "./xds-stream-state";
const SUPPORTED_PATH_SPECIFIERS = ['prefix', 'path', 'safe_regex'];
const SUPPPORTED_HEADER_MATCH_SPECIFIERS = [
@ -40,68 +30,16 @@ const SUPPPORTED_HEADER_MATCH_SPECIFIERS = [
'suffix_match'];
const SUPPORTED_CLUSTER_SPECIFIERS = ['cluster', 'weighted_clusters', 'cluster_header'];
export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
versionInfo = '';
nonce = '';
private watchers: Map<string, Watcher<RouteConfiguration__Output>[]> = new Map<string, Watcher<RouteConfiguration__Output>[]>();
private latestResponses: RouteConfiguration__Output[] = [];
private latestIsV2 = false;
constructor(private updateResourceNames: () => void) {}
addWatcher(routeConfigName: string, watcher: Watcher<RouteConfiguration__Output>) {
trace('Adding RDS watcher for routeConfigName ' + routeConfigName);
let watchersEntry = this.watchers.get(routeConfigName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(routeConfigName, watchersEntry);
}
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
const isV2 = this.latestIsV2;
for (const message of this.latestResponses) {
if (message.name === routeConfigName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing RDS update for new watcher for routeConfigName ' + routeConfigName);
watcher.onValidUpdate(message, isV2);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> implements XdsStreamState<RouteConfiguration__Output> {
protected isStateOfTheWorld(): boolean {
return false;
}
removeWatcher(routeConfigName: string, watcher: Watcher<RouteConfiguration__Output>): void {
trace('Removing RDS watcher for routeConfigName ' + routeConfigName);
const watchersEntry = this.watchers.get(routeConfigName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(routeConfigName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
protected getResourceName(resource: RouteConfiguration__Output): string {
return resource.name;
}
getResourceNames(): string[] {
return Array.from(this.watchers.keys());
protected getProtocolName(): string {
return 'RDS';
}
validateResponse(message: RouteConfiguration__Output, isV2: boolean): boolean {
// https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#response-validation
for (const virtualHost of message.virtual_hosts) {
@ -172,48 +110,4 @@ export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
}
return true;
}
handleResponses(responses: ResourcePair<RouteConfiguration__Output>[], isV2: boolean): HandleResponseResult {
const validResponses: RouteConfiguration__Output[] = [];
let result: HandleResponseResult = {
accepted: [],
rejected: [],
missing: []
}
for (const {resource, raw} of responses) {
if (this.validateResponse(resource, isV2)) {
validResponses.push(resource);
result.accepted.push({
name: resource.name,
raw: raw});
} else {
trace('RDS validation failed for message ' + JSON.stringify(resource));
result.rejected.push({
name: resource.name,
raw: raw,
error: `Route validation failed for resource ${resource.name}`
});
}
}
this.latestResponses = validResponses;
this.latestIsV2 = isV2;
const allRouteConfigNames = new Set<string>();
for (const message of validResponses) {
allRouteConfigNames.add(message.name);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message, isV2);
}
}
trace('Received RDS response with route config names [' + Array.from(allRouteConfigNames) + ']');
return result;
}
reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}

View File

@ -15,9 +15,11 @@
*
*/
import { StatusObject } from "@grpc/grpc-js";
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { Any__Output } from "../generated/google/protobuf/Any";
const TRACER_NAME = 'xds_client';
export interface Watcher<UpdateType> {
/* Including the isV2 flag here is a bit of a kludge. It would probably be
* better for XdsStreamState#handleResponses to transform the protobuf
@ -63,4 +65,189 @@ export interface XdsStreamState<ResponseType> {
handleResponses(responses: ResourcePair<ResponseType>[], isV2: boolean): HandleResponseResult;
reportStreamError(status: StatusObject): void;
reportAdsStreamStart(): void;
addWatcher(name: string, watcher: Watcher<ResponseType>): void;
removeWatcher(resourceName: string, watcher: Watcher<ResponseType>): void;
}
interface SubscriptionEntry<ResponseType> {
watchers: Watcher<ResponseType>[];
cachedResponse: ResponseType | null;
resourceTimer: NodeJS.Timer;
}
const RESOURCE_TIMEOUT_MS = 15_000;
export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState<ResponseType> {
versionInfo = '';
nonce = '';
private subscriptions: Map<string, SubscriptionEntry<ResponseType>> = new Map<string, SubscriptionEntry<ResponseType>>();
private latestIsV2 = false;
private isAdsStreamRunning = false;
constructor(private updateResourceNames: () => void) {}
protected trace(text: string) {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, this.getProtocolName() + ' | ' + text);
}
private startResourceTimer(subscriptionEntry: SubscriptionEntry<ResponseType>) {
clearTimeout(subscriptionEntry.resourceTimer);
subscriptionEntry.resourceTimer = setTimeout(() => {
for (const watcher of subscriptionEntry.watchers) {
watcher.onResourceDoesNotExist();
}
}, RESOURCE_TIMEOUT_MS);
}
addWatcher(name: string, watcher: Watcher<ResponseType>): void {
this.trace('Adding watcher for name ' + name);
let subscriptionEntry = this.subscriptions.get(name);
let addedName = false;
if (subscriptionEntry === undefined) {
addedName = true;
subscriptionEntry = {
watchers: [],
cachedResponse: null,
resourceTimer: setTimeout(() => {}, 0)
};
this.startResourceTimer(subscriptionEntry);
this.subscriptions.set(name, subscriptionEntry);
}
subscriptionEntry.watchers.push(watcher);
if (subscriptionEntry.cachedResponse !== null) {
const cachedResponse = subscriptionEntry.cachedResponse;
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
this.trace('Reporting existing update for new watcher for name ' + name);
watcher.onValidUpdate(cachedResponse, this.latestIsV2);
});
}
if (addedName) {
this.updateResourceNames();
}
}
removeWatcher(resourceName: string, watcher: Watcher<ResponseType>): void {
this.trace('Removing watcher for name ' + resourceName);
const subscriptionEntry = this.subscriptions.get(resourceName);
if (subscriptionEntry !== undefined) {
const entryIndex = subscriptionEntry.watchers.indexOf(watcher);
if (entryIndex >= 0) {
subscriptionEntry.watchers.splice(entryIndex, 1);
}
if (subscriptionEntry.watchers.length === 0) {
clearTimeout(subscriptionEntry.resourceTimer);
this.subscriptions.delete(resourceName);
this.updateResourceNames();
}
}
}
getResourceNames(): string[] {
return Array.from(this.subscriptions.keys());
}
handleResponses(responses: ResourcePair<ResponseType>[], isV2: boolean): HandleResponseResult {
const validResponses: ResponseType[] = [];
let result: HandleResponseResult = {
accepted: [],
rejected: [],
missing: []
}
for (const {resource, raw} of responses) {
const resourceName = this.getResourceName(resource);
if (this.validateResponse(resource, isV2)) {
validResponses.push(resource);
result.accepted.push({
name: resourceName,
raw: raw});
} else {
this.trace('Validation failed for message ' + JSON.stringify(resource));
result.rejected.push({
name: resourceName,
raw: raw,
error: `Validation failed for resource ${resourceName}`
});
}
}
this.latestIsV2 = isV2;
const allResourceNames = new Set<string>();
for (const resource of validResponses) {
const resourceName = this.getResourceName(resource);
allResourceNames.add(resourceName);
const subscriptionEntry = this.subscriptions.get(resourceName);
if (subscriptionEntry) {
const watchers = subscriptionEntry.watchers;
for (const watcher of watchers) {
watcher.onValidUpdate(resource, isV2);
}
clearTimeout(subscriptionEntry.resourceTimer);
subscriptionEntry.cachedResponse = resource;
}
}
result.missing = this.handleMissingNames(allResourceNames);
this.trace('Received response with resource names [' + Array.from(allResourceNames) + ']');
return result;
}
reportStreamError(status: StatusObject): void {
for (const subscriptionEntry of this.subscriptions.values()) {
for (const watcher of subscriptionEntry.watchers) {
watcher.onTransientError(status);
}
clearTimeout(subscriptionEntry.resourceTimer);
}
this.isAdsStreamRunning = false;
}
reportAdsStreamStart() {
this.isAdsStreamRunning = true;
for (const subscriptionEntry of this.subscriptions.values()) {
if (subscriptionEntry.cachedResponse === null) {
this.startResourceTimer(subscriptionEntry);
}
}
}
private handleMissingNames(allResponseNames: Set<String>): string[] {
if (this.isStateOfTheWorld()) {
const missingNames: string[] = [];
for (const [resourceName, subscriptionEntry] of this.subscriptions.entries()) {
if (!allResponseNames.has(resourceName) && subscriptionEntry.cachedResponse !== null) {
this.trace('Reporting resource does not exist named ' + resourceName);
missingNames.push(resourceName);
for (const watcher of subscriptionEntry.watchers) {
watcher.onResourceDoesNotExist();
}
}
}
return missingNames;
} else {
return [];
}
}
/**
* Apply the validation rules for this resource type to this resource
* instance.
* This function is public so that the LDS validateResponse can call into
* the RDS validateResponse.
* @param resource The resource object sent by the xDS server
* @param isV2 If true, the resource is an xDS V2 resource instead of xDS V3
*/
public abstract validateResponse(resource: ResponseType, isV2: boolean): boolean;
/**
* Get the name of a resource object. The name is some field of the object, so
* getting it depends on the specific type.
* @param resource
*/
protected abstract getResourceName(resource: ResponseType): string;
protected abstract getProtocolName(): string;
/**
* Indicates whether responses are "state of the world", i.e. that they
* contain all resources and that omitted previously-seen resources should
* be treated as removed.
*/
protected abstract isStateOfTheWorld(): boolean;
}