grpc-js-xds: Implement CSDS

This commit is contained in:
Michael Lumish 2021-12-02 16:15:09 -05:00
parent dca36701fc
commit 858d1b66ad
2 changed files with 206 additions and 0 deletions

View File

@ -0,0 +1,204 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { Node } from "./generated/envoy/config/core/v3/Node";
import { ClientConfig, _envoy_service_status_v3_ClientConfig_GenericXdsConfig as GenericXdsConfig } from "./generated/envoy/service/status/v3/ClientConfig";
import { ClientStatusDiscoveryServiceHandlers } from "./generated/envoy/service/status/v3/ClientStatusDiscoveryService";
import { ClientStatusRequest__Output } from "./generated/envoy/service/status/v3/ClientStatusRequest";
import { ClientStatusResponse } from "./generated/envoy/service/status/v3/ClientStatusResponse";
import { Timestamp } from "./generated/google/protobuf/Timestamp";
import { AdsTypeUrl, CDS_TYPE_URL_V2, CDS_TYPE_URL_V3, EDS_TYPE_URL_V2, EDS_TYPE_URL_V3, LDS_TYPE_URL_V2, LDS_TYPE_URL_V3, RDS_TYPE_URL_V2, RDS_TYPE_URL_V3 } from "./resources";
import { HandleResponseResult } from "./xds-stream-state/xds-stream-state";
import { sendUnaryData, ServerDuplexStream, ServerUnaryCall, status, experimental, loadPackageDefinition } from '@grpc/grpc-js';
import { loadSync } from "@grpc/proto-loader";
import { ProtoGrpcType as CsdsProtoGrpcType } from "./generated/csds";
import registerAdminService = experimental.registerAdminService;
function dateToProtoTimestamp(date?: Date | null): Timestamp | null {
if (!date) {
return null;
}
const millisSinceEpoch = date.getTime();
return {
seconds: (millisSinceEpoch / 1000) | 0,
nanos: (millisSinceEpoch % 1000) * 1_000_000
}
}
let clientNode: Node | null = null;
const configStatus = {
[EDS_TYPE_URL_V2]: new Map<string, GenericXdsConfig>(),
[EDS_TYPE_URL_V3]: new Map<string, GenericXdsConfig>(),
[CDS_TYPE_URL_V2]: new Map<string, GenericXdsConfig>(),
[CDS_TYPE_URL_V3]: new Map<string, GenericXdsConfig>(),
[RDS_TYPE_URL_V2]: new Map<string, GenericXdsConfig>(),
[RDS_TYPE_URL_V3]: new Map<string, GenericXdsConfig>(),
[LDS_TYPE_URL_V2]: new Map<string, GenericXdsConfig>(),
[LDS_TYPE_URL_V3]: new Map<string, GenericXdsConfig>()
};
/**
* This function only accepts a v3 Node message, because we are only supporting
* v3 CSDS and it only handles v3 Nodes. If the client is actually using v2 xDS
* APIs, it should just provide the equivalent v3 Node message.
* @param node The Node message for the client that is requesting resources
*/
export function setCsdsClientNode(node: Node) {
clientNode = node;
}
/**
* Update the config status maps from the list of names of requested resources
* for a specific type URL. These lists are the source of truth for determining
* what resources will be listed in the CSDS response. Any resource that is not
* in this list will never actually be applied anywhere.
* @param typeUrl The resource type URL
* @param names The list of resource names that are being requested
*/
export function updateRequestedNameList(typeUrl: AdsTypeUrl, names: string[]) {
const currentTime = dateToProtoTimestamp(new Date());
const configMap = configStatus[typeUrl];
for (const name of names) {
if (!configMap.has(name)) {
configMap.set(name, {
type_url: typeUrl,
name: name,
last_updated: currentTime,
client_status: 'REQUESTED'
});
}
}
for (const name of configMap.keys()) {
if (!names.includes(name)) {
configMap.delete(name);
}
}
}
/**
* Update the config status maps from the result of parsing a single ADS
* response. All resources that validated are considered "ACKED", and all
* resources that failed validation are considered "NACKED".
* @param typeUrl The type URL of resources in this response
* @param versionInfo The version info field from this response
* @param updates The lists of resources that passed and failed validation
*/
export function updateResourceResponse(typeUrl: AdsTypeUrl, versionInfo: string, updates: HandleResponseResult) {
const currentTime = dateToProtoTimestamp(new Date());
const configMap = configStatus[typeUrl];
for (const {name, raw} of updates.accepted) {
const mapEntry = configMap.get(name);
if (mapEntry) {
mapEntry.client_status = 'ACKED';
mapEntry.version_info = versionInfo;
mapEntry.xds_config = raw;
mapEntry.error_state = null;
mapEntry.last_updated = currentTime;
}
}
for (const {name, error, raw} of updates.rejected) {
const mapEntry = configMap.get(name);
if (mapEntry) {
mapEntry.client_status = 'NACKED';
mapEntry.error_state = {
failed_configuration: raw,
last_update_attempt: currentTime,
details: error,
version_info: versionInfo
};
}
}
for (const name of updates.missing) {
const mapEntry = configMap.get(name);
if (mapEntry) {
mapEntry.client_status = 'DOES_NOT_EXIST';
mapEntry.version_info = versionInfo;
mapEntry.xds_config = null;
mapEntry.error_state = null;
mapEntry.last_updated = currentTime;
}
}
}
function getCurrentConfig(): ClientConfig {
const genericConfigList: GenericXdsConfig[] = [];
for (const configMap of Object.values(configStatus)) {
for (const configValue of configMap.values()) {
genericConfigList.push(configValue);
}
}
return {
node: clientNode,
generic_xds_configs: genericConfigList
};
}
const csdsImplementation: ClientStatusDiscoveryServiceHandlers = {
FetchClientStatus(call: ServerUnaryCall<ClientStatusRequest__Output, ClientStatusResponse>, callback: sendUnaryData<ClientStatusResponse>) {
const request = call.request;
if (request.node_matchers !== null) {
callback({
code: status.INVALID_ARGUMENT,
details: 'Node matchers not supported'
});
return;
}
callback(null, {
config: [getCurrentConfig()]
});
},
StreamClientStatus(call: ServerDuplexStream<ClientStatusRequest__Output, ClientStatusResponse>) {
call.on('data', (request: ClientStatusRequest__Output) => {
if (request.node_matchers !== null) {
call.emit('error', {
code: status.INVALID_ARGUMENT,
details: 'Node matchers not supported'
});
return;
}
call.write({
config: [getCurrentConfig()]
});
});
call.on('end', () => {
call.end();
});
}
}
const loadedProto = loadSync('envoy/service/status/v3/csds.proto', {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true,
includeDirs: [
// Paths are relative to src/build
__dirname + '/../../deps/envoy-api/',
__dirname + '/../../deps/xds/',
],
});
const csdsGrpcObject = loadPackageDefinition(loadedProto) as unknown as CsdsProtoGrpcType;
const csdsServiceDefinition = csdsGrpcObject.envoy.service.status.v3.ClientStatusDiscoveryService.service;
export function setup() {
registerAdminService(() => csdsServiceDefinition, () => csdsImplementation);
}

View File

@ -24,6 +24,7 @@ import * as load_balancer_weighted_target from './load-balancer-weighted-target'
import * as load_balancer_xds_cluster_manager from './load-balancer-xds-cluster-manager';
import * as router_filter from './http-filter/router-filter';
import * as fault_injection_filter from './http-filter/fault-injection-filter';
import * as csds from './csds';
/**
* Register the "xds:" name scheme with the @grpc/grpc-js library.
@ -38,4 +39,5 @@ export function register() {
load_balancer_xds_cluster_manager.setup();
router_filter.setup();
fault_injection_filter.setup();
csds.setup();
}