Add XdsClient tracers, and stream start backoff, and fix some bugs

This commit is contained in:
Michael Lumish 2020-08-25 13:22:30 -07:00
parent 8580204a73
commit af949674da
1 changed files with 66 additions and 18 deletions

View File

@ -54,6 +54,7 @@ import { Listener__Output } from './generated/envoy/api/v2/Listener';
import { HttpConnectionManager__Output } from './generated/envoy/config/filter/network/http_connection_manager/v2/HttpConnectionManager';
import { RouteConfiguration__Output } from './generated/envoy/api/v2/RouteConfiguration';
import { Any__Output } from './generated/google/protobuf/Any';
import { BackoffTimeout } from './backoff-timeout';
const TRACER_NAME = 'xds_client';
@ -260,6 +261,7 @@ class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
trace('Adding EDS watcher for edsServiceName ' + edsServiceName);
let watchersEntry = this.watchers.get(edsServiceName);
let addedServiceName = false;
if (watchersEntry === undefined) {
@ -276,6 +278,7 @@ class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing EDS update for new watcher for edsServiceName ' + edsServiceName);
watcher.onValidUpdate(message);
});
}
@ -289,6 +292,7 @@ class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
trace('Removing EDS watcher for edsServiceName ' + edsServiceName);
const watchersEntry = this.watchers.get(edsServiceName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
@ -342,6 +346,7 @@ class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
handleMissingNames(allEdsServiceNames: Set<string>) {
for (const [edsServiceName, watcherList] of this.watchers.entries()) {
if (!allEdsServiceNames.has(edsServiceName)) {
trace('Reporting EDS resource does not exist for edsServiceName ' + edsServiceName);
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
@ -352,7 +357,7 @@ class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
handleResponses(responses: ClusterLoadAssignment__Output[]) {
for (const message of responses) {
if (!this.validateResponse(message)) {
return 'ClusterLoadAssignment validation failed';
return 'EDS Error: ClusterLoadAssignment validation failed';
}
}
this.latestResponses = responses;
@ -364,6 +369,7 @@ class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
watcher.onValidUpdate(message);
}
}
trace('Received EDS updates for cluster names ' + Array.from(allClusterNames));
this.handleMissingNames(allClusterNames);
return null;
}
@ -400,6 +406,7 @@ class CdsState implements XdsStreamState<Cluster__Output> {
* @param watcher
*/
addWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
trace('Adding CDS watcher for clusterName ' + clusterName);
let watchersEntry = this.watchers.get(clusterName);
let addedServiceName = false;
if (watchersEntry === undefined) {
@ -416,6 +423,7 @@ class CdsState implements XdsStreamState<Cluster__Output> {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing CDS update for new watcher for clusterName ' + clusterName);
watcher.onValidUpdate(message);
});
}
@ -426,6 +434,7 @@ class CdsState implements XdsStreamState<Cluster__Output> {
}
removeWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
trace('Removing CDS watcher for clusterName ' + clusterName);
const watchersEntry = this.watchers.get(clusterName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
@ -466,14 +475,15 @@ class CdsState implements XdsStreamState<Cluster__Output> {
}
/**
* Given a list of edsServiceNames (which may actually be the cluster name),
* Given a list of clusterNames (which may actually be the cluster name),
* for each watcher watching a name not on the list, call that watcher's
* onResourceDoesNotExist method.
* @param allClusterNames
*/
private handleMissingNames(allClusterNames: Set<string>) {
for (const [edsServiceName, watcherList] of this.watchers.entries()) {
if (!allClusterNames.has(edsServiceName)) {
for (const [clusterName, watcherList] of this.watchers.entries()) {
if (!allClusterNames.has(clusterName)) {
trace('Reporting CDS resource does not exist for clusterName ' + clusterName);
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
@ -484,7 +494,7 @@ class CdsState implements XdsStreamState<Cluster__Output> {
handleResponses(responses: Cluster__Output[]): string | null {
for (const message of responses) {
if (!this.validateResponse(message)) {
return 'Cluster validation failed';
return 'CDS Error: Cluster validation failed';
}
}
this.latestResponses = responses;
@ -501,6 +511,7 @@ class CdsState implements XdsStreamState<Cluster__Output> {
watcher.onValidUpdate(message);
}
}
trace('Received CDS updates for cluster names ' + Array.from(allClusterNames));
this.handleMissingNames(allClusterNames);
this.edsState.handleMissingNames(allEdsServiceNames);
return null;
@ -535,6 +546,7 @@ class RdsState implements XdsStreamState<RouteConfiguration__Output> {
if (virtualHost.domains.indexOf(this.routeConfigName!) >= 0) {
const route = virtualHost.routes[virtualHost.routes.length - 1];
if (route.match?.prefix === '' && route.route?.cluster) {
trace('Reporting RDS update for host ' + this.routeConfigName + ' with cluster ' + route.route.cluster);
this.watcher.onValidUpdate({
methodConfig: [],
loadBalancingConfig: [
@ -546,10 +558,11 @@ class RdsState implements XdsStreamState<RouteConfiguration__Output> {
},
],
});
break;
return;
}
}
}
trace('Reporting RDS resource does not exist');
/* If none of the routes match the one we are looking for, bubble up an
* error. */
this.watcher.onResourceDoesNotExist();
@ -623,11 +636,13 @@ class LdsState implements XdsStreamState<Listener__Output> {
HttpConnectionManager__Output;
switch (httpConnectionManager.route_specifier) {
case 'rds':
trace('Received LDS update with RDS route config name ' + httpConnectionManager.rds!.route_config_name);
this.rdsState.setRouteConfigName(
httpConnectionManager.rds!.route_config_name
);
break;
case 'route_config':
trace('Received LDS update with route configuration');
this.rdsState.setRouteConfigName(null);
this.rdsState.handleSingleMessage(
httpConnectionManager.route_config!
@ -637,7 +652,7 @@ class LdsState implements XdsStreamState<Listener__Output> {
// The validation rules should prevent this
}
} else {
return 'Listener validation failed';
return 'LRS Error: Listener validation failed';
}
}
}
@ -677,11 +692,11 @@ function getResponseMessages<T extends AdsTypeUrl>(
result.push(resource as protoLoader.AnyExtension & OutputType<T>);
} else {
throw new Error(
`Invalid resource type ${
`ADS Error: Invalid resource type ${
protoLoader.isAnyExtension(resource)
? resource['@type']
: resource.type_url
}`
}, expected ${typeUrl}`
);
}
}
@ -711,6 +726,9 @@ export class XdsClient {
private adsState: AdsState;
private adsBackoff: BackoffTimeout;
private lrsBackoff: BackoffTimeout;
constructor(
targetName: string,
serviceConfigWatcher: Watcher<ServiceConfig>,
@ -752,6 +770,14 @@ export class XdsClient {
delete channelArgs[arg];
}
channelArgs['grpc.keepalive_time_ms'] = 5000;
this.adsBackoff = new BackoffTimeout(() => {
this.maybeStartAdsStream();
});
this.lrsBackoff = new BackoffTimeout(() => {
this.maybeStartLrsStream();
})
Promise.all([loadBootstrapInfo(), loadAdsProtos()]).then(
([bootstrapInfo, protoDefinitions]) => {
if (this.hasShutdown) {
@ -770,6 +796,7 @@ export class XdsClient {
...node,
client_features: ['envoy.lrs.supports_send_all_clusters'],
};
trace('Starting xDS client connected to server URI ' + bootstrapInfo.xdsServers[0].serverUri);
this.adsClient = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(
bootstrapInfo.xdsServers[0].serverUri,
createGoogleDefaultCredentials(),
@ -828,6 +855,7 @@ export class XdsClient {
errorString = `Unknown type_url ${message.type_url}`;
}
if (errorString === null) {
trace('Acking message with type URL ' + message.type_url);
/* errorString can only be null in one of the first 4 cases, which
* implies that message.type_url is one of the 4 known type URLs, which
* means that this type assertion is valid. */
@ -836,6 +864,7 @@ export class XdsClient {
this.adsState[typeUrl].versionInfo = message.version_info;
this.ack(typeUrl);
} else {
trace('Nacking message with type URL ' + message.type_url + ': "' + errorString + '"');
this.nack(message.type_url, errorString);
}
}
@ -854,6 +883,9 @@ export class XdsClient {
if (this.hasShutdown) {
return;
}
trace('Starting ADS stream');
// Backoff relative to when we start the request
this.adsBackoff.runOnce();
this.adsCall = this.adsClient.StreamAggregatedResources();
this.adsCall.on('data', (message: DiscoveryResponse__Output) => {
this.handleAdsResponse(message);
@ -864,10 +896,11 @@ export class XdsClient {
);
this.adsCall = null;
this.reportStreamError(error);
/* Connection backoff is handled by the client object, so we can
* immediately start a new request to indicate that it should try to
* reconnect */
this.maybeStartAdsStream();
/* If the backoff timer is no longer running, we do not need to wait any
* more to start the new call. */
if (!this.adsBackoff.isRunning()) {
this.maybeStartAdsStream();
}
});
const allTypeUrls: AdsTypeUrl[] = [
@ -889,6 +922,11 @@ export class XdsClient {
* version info are updated so that it sends the post-update values.
*/
ack(typeUrl: AdsTypeUrl) {
/* An ack is the best indication of a successful interaction between the
* client and the server, so we can reset the backoff timer here. */
this.adsBackoff.stop();
this.adsBackoff.reset();
this.updateNames(typeUrl);
}
@ -953,8 +991,17 @@ export class XdsClient {
if (this.hasShutdown) {
return;
}
trace('Starting LRS stream');
this.lrsBackoff.runOnce();
this.lrsCall = this.lrsClient.streamLoadStats();
this.lrsCall.on('metadata', () => {
/* Once we get any response from the server, we assume that the stream is
* in a good state, so we can reset the backoff timer. */
this.lrsBackoff.stop();
this.lrsBackoff.reset();
});
this.lrsCall.on('data', (message: LoadStatsResponse__Output) => {
if (
message.load_reporting_interval?.seconds !==
@ -970,7 +1017,7 @@ export class XdsClient {
const loadReportingIntervalMs =
Number.parseInt(message.load_reporting_interval!.seconds) * 1000 +
message.load_reporting_interval!.nanos / 1_000_000;
setInterval(() => {
this.statsTimer = setInterval(() => {
this.sendStats();
}, loadReportingIntervalMs);
}
@ -982,10 +1029,11 @@ export class XdsClient {
);
this.lrsCall = null;
clearInterval(this.statsTimer);
/* Connection backoff is handled by the client object, so we can
* immediately start a new request to indicate that it should try to
* reconnect */
this.maybeStartAdsStream();
/* If the backoff timer is no longer running, we do not need to wait any
* more to start the new call. */
if (!this.lrsBackoff.isRunning()) {
this.maybeStartLrsStream();
}
});
this.lrsCall.write({
node: this.lrsNode!,