mirror of https://github.com/grpc/grpc-node.git
388 lines
14 KiB
TypeScript
388 lines
14 KiB
TypeScript
/*
|
|
* Copyright 2023 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
*/
|
|
|
|
import { ServerDuplexStream, Server, UntypedServiceImplementation, ServerCredentials, loadPackageDefinition, experimental, logVerbosity } from "@grpc/grpc-js";
|
|
import { AnyExtension, loadSync } from "@grpc/proto-loader";
|
|
import { EventEmitter } from "stream";
|
|
import { Cluster } from "../src/generated/envoy/config/cluster/v3/Cluster";
|
|
import { ClusterLoadAssignment } from "../src/generated/envoy/config/endpoint/v3/ClusterLoadAssignment";
|
|
import { Listener } from "../src/generated/envoy/config/listener/v3/Listener";
|
|
import { RouteConfiguration } from "../src/generated/envoy/config/route/v3/RouteConfiguration";
|
|
import { AggregatedDiscoveryServiceHandlers } from "../src/generated/envoy/service/discovery/v3/AggregatedDiscoveryService";
|
|
import { DiscoveryRequest__Output } from "../src/generated/envoy/service/discovery/v3/DiscoveryRequest";
|
|
import { DiscoveryResponse } from "../src/generated/envoy/service/discovery/v3/DiscoveryResponse";
|
|
import { Any } from "../src/generated/google/protobuf/Any";
|
|
import { LDS_TYPE_URL, RDS_TYPE_URL, CDS_TYPE_URL, EDS_TYPE_URL, LdsTypeUrl, RdsTypeUrl, CdsTypeUrl, EdsTypeUrl, AdsTypeUrl } from "../src/resources"
|
|
import * as adsTypes from '../src/generated/ads';
|
|
import * as lrsTypes from '../src/generated/lrs';
|
|
import { LoadStatsRequest__Output } from "../src/generated/envoy/service/load_stats/v3/LoadStatsRequest";
|
|
import { LoadStatsResponse } from "../src/generated/envoy/service/load_stats/v3/LoadStatsResponse";
|
|
import * as path from 'path';
|
|
|
|
const TRACER_NAME = 'control_plane_server';
|
|
|
|
function trace(text: string) {
|
|
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
|
|
}
|
|
|
|
const loadedProtos = loadPackageDefinition(loadSync(
|
|
[
|
|
'envoy/service/discovery/v3/ads.proto',
|
|
'envoy/service/load_stats/v3/lrs.proto',
|
|
'envoy/config/listener/v3/listener.proto',
|
|
'envoy/config/route/v3/route.proto',
|
|
'envoy/config/cluster/v3/cluster.proto',
|
|
'envoy/config/endpoint/v3/endpoint.proto',
|
|
'envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto',
|
|
'envoy/extensions/clusters/aggregate/v3/cluster.proto',
|
|
'envoy/extensions/load_balancing_policies/round_robin/v3/round_robin.proto',
|
|
'envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto',
|
|
'envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto',
|
|
'envoy/extensions/load_balancing_policies/pick_first/v3/pick_first.proto',
|
|
'envoy/extensions/transport_sockets/tls/v3/tls.proto',
|
|
'xds/type/v3/typed_struct.proto'
|
|
],
|
|
{
|
|
keepCase: true,
|
|
longs: String,
|
|
enums: String,
|
|
defaults: true,
|
|
oneofs: true,
|
|
json: true,
|
|
includeDirs: [
|
|
// Paths are relative to src/build
|
|
__dirname + '/../../deps/envoy-api/',
|
|
__dirname + '/../../deps/xds/',
|
|
__dirname + '/../../deps/googleapis/',
|
|
__dirname + '/../../deps/protoc-gen-validate/',
|
|
],
|
|
})) as unknown as adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType;
|
|
|
|
type AdsInputType<T extends AdsTypeUrl> = T extends EdsTypeUrl
|
|
? ClusterLoadAssignment
|
|
: T extends CdsTypeUrl
|
|
? Cluster
|
|
: T extends RdsTypeUrl
|
|
? RouteConfiguration
|
|
: Listener;
|
|
|
|
const ADS_TYPE_URLS = new Set([LDS_TYPE_URL, RDS_TYPE_URL, CDS_TYPE_URL, EDS_TYPE_URL]);
|
|
|
|
interface ResponseState {
|
|
state: 'ACKED' | 'NACKED';
|
|
errorMessage?: string;
|
|
}
|
|
|
|
interface ResponseListener {
|
|
(typeUrl: AdsTypeUrl, responseState: ResponseState): void;
|
|
}
|
|
|
|
type ResourceAny<T extends AdsTypeUrl> = AdsInputType<T> & {'@type': T};
|
|
|
|
interface ResourceState<T extends AdsTypeUrl> {
|
|
resource?: ResourceAny<T>;
|
|
resourceTypeVersion: number;
|
|
subscriptions: Set<string>;
|
|
}
|
|
|
|
interface ResourceTypeState<T extends AdsTypeUrl> {
|
|
resourceTypeVersion: number;
|
|
/**
|
|
* Key type is type URL
|
|
*/
|
|
resourceNameMap: Map<string, ResourceState<T>>;
|
|
}
|
|
|
|
interface ResourceMap {
|
|
[EDS_TYPE_URL]: ResourceTypeState<EdsTypeUrl>;
|
|
[CDS_TYPE_URL]: ResourceTypeState<CdsTypeUrl>;
|
|
[RDS_TYPE_URL]: ResourceTypeState<RdsTypeUrl>;
|
|
[LDS_TYPE_URL]: ResourceTypeState<LdsTypeUrl>;
|
|
}
|
|
|
|
function isAdsTypeUrl(value: string): value is AdsTypeUrl {
|
|
return ADS_TYPE_URLS.has(value);
|
|
}
|
|
|
|
export class ControlPlaneServer {
|
|
private resourceMap: ResourceMap = {
|
|
[EDS_TYPE_URL]: {
|
|
resourceTypeVersion: 0,
|
|
resourceNameMap: new Map()
|
|
},
|
|
[CDS_TYPE_URL]: {
|
|
resourceTypeVersion: 0,
|
|
resourceNameMap: new Map()
|
|
},
|
|
[RDS_TYPE_URL]: {
|
|
resourceTypeVersion: 0,
|
|
resourceNameMap: new Map()
|
|
},
|
|
[LDS_TYPE_URL]: {
|
|
resourceTypeVersion: 0,
|
|
resourceNameMap: new Map()
|
|
},
|
|
};
|
|
private responseListeners = new Set<ResponseListener>();
|
|
private resourceTypesToIgnore = new Set<AdsTypeUrl>();
|
|
private clients = new Map<string, ServerDuplexStream<DiscoveryRequest__Output, DiscoveryResponse>>();
|
|
private server: Server | null = null;
|
|
private port: number | null = null;
|
|
private nextStreamId: number = 0;
|
|
|
|
addResponseListener(listener: ResponseListener) {
|
|
this.responseListeners.add(listener);
|
|
}
|
|
|
|
removeResponseListener(listener: ResponseListener) {
|
|
this.responseListeners.delete(listener);
|
|
}
|
|
|
|
setResource<T extends AdsTypeUrl>(resource: ResourceAny<T>, name: string) {
|
|
trace(`Set resource type_url=${resource['@type']} name=${name}`);
|
|
const resourceTypeState = this.resourceMap[resource["@type"]] as ResourceTypeState<T>;
|
|
resourceTypeState.resourceTypeVersion += 1;
|
|
let resourceState: ResourceState<T> | undefined = resourceTypeState.resourceNameMap.get(name);
|
|
if (!resourceState) {
|
|
resourceState = {
|
|
resourceTypeVersion: 0,
|
|
subscriptions: new Set()
|
|
};
|
|
resourceTypeState.resourceNameMap.set(name, resourceState);
|
|
}
|
|
resourceState.resourceTypeVersion = resourceTypeState.resourceTypeVersion;
|
|
resourceState.resource = resource;
|
|
this.sendResourceUpdates(resource['@type'], resourceState.subscriptions, new Set([name]));
|
|
}
|
|
|
|
setLdsResource(resource: Listener) {
|
|
trace(`setLdsResource(${resource.name!})`);
|
|
this.setResource({...resource, '@type': LDS_TYPE_URL}, resource.name!);
|
|
}
|
|
|
|
setRdsResource(resource: RouteConfiguration) {
|
|
trace(`setRdsResource(${resource.name!})`);
|
|
this.setResource({...resource, '@type': RDS_TYPE_URL}, resource.name!);
|
|
}
|
|
|
|
setCdsResource(resource: Cluster) {
|
|
trace(`setCdsResource(${resource.name!})`);
|
|
this.setResource({...resource, '@type': CDS_TYPE_URL}, resource.name!);
|
|
}
|
|
|
|
setEdsResource(resource: ClusterLoadAssignment) {
|
|
trace(`setEdsResource(${resource.cluster_name!})`);
|
|
this.setResource({...resource, '@type': EDS_TYPE_URL}, resource.cluster_name!);
|
|
}
|
|
|
|
unsetResource<T extends AdsTypeUrl>(typeUrl: T, name: string) {
|
|
const resourceTypeState = this.resourceMap[typeUrl] as ResourceTypeState<T>;
|
|
resourceTypeState.resourceTypeVersion += 1;
|
|
let resourceState: ResourceState<T> | undefined = resourceTypeState.resourceNameMap.get(name);
|
|
if (resourceState) {
|
|
resourceState.resourceTypeVersion = resourceTypeState.resourceTypeVersion;
|
|
delete resourceState.resource;
|
|
this.sendResourceUpdates(typeUrl, resourceState.subscriptions, new Set([name]));
|
|
}
|
|
}
|
|
|
|
ignoreResourceType(typeUrl: AdsTypeUrl) {
|
|
this.resourceTypesToIgnore.add(typeUrl);
|
|
}
|
|
|
|
private sendResourceUpdates<T extends AdsTypeUrl>(typeUrl: T, clients: Set<string>, includeResources: Set<string>) {
|
|
const resourceTypeState = this.resourceMap[typeUrl] as ResourceTypeState<T>;
|
|
const clientResources = new Map<string, Any[]>();
|
|
for (const [resourceName, resourceState] of resourceTypeState.resourceNameMap) {
|
|
/* For RDS and EDS, only send updates for the listed updated resources.
|
|
* Otherwise include all resources. */
|
|
if ((typeUrl === RDS_TYPE_URL || typeUrl === EDS_TYPE_URL) && !includeResources.has(resourceName)) {
|
|
continue;
|
|
}
|
|
if (!resourceState.resource) {
|
|
continue;
|
|
}
|
|
for (const clientName of clients) {
|
|
if (!resourceState.subscriptions.has(clientName)) {
|
|
continue;
|
|
}
|
|
let resourcesList = clientResources.get(clientName);
|
|
if (!resourcesList) {
|
|
resourcesList = [];
|
|
clientResources.set(clientName, resourcesList);
|
|
}
|
|
resourcesList.push(resourceState.resource);
|
|
}
|
|
}
|
|
for (const [clientName, resourceList] of clientResources) {
|
|
this.clients.get(clientName)?.write({
|
|
resources: resourceList,
|
|
version_info: resourceTypeState.resourceTypeVersion.toString(),
|
|
nonce: resourceTypeState.resourceTypeVersion.toString(),
|
|
type_url: typeUrl
|
|
});
|
|
}
|
|
}
|
|
|
|
private updateResponseListeners(typeUrl: AdsTypeUrl, responseState: ResponseState) {
|
|
for (const listener of this.responseListeners) {
|
|
listener(typeUrl, responseState);
|
|
}
|
|
}
|
|
|
|
private maybeSubscribe<T extends AdsTypeUrl>(typeUrl: T, client: string, resourceName: string): boolean {
|
|
const resourceTypeState = this.resourceMap[typeUrl] as ResourceTypeState<T>;
|
|
let resourceState = resourceTypeState.resourceNameMap.get(resourceName);
|
|
if (!resourceState) {
|
|
resourceState = {
|
|
resourceTypeVersion: 0,
|
|
subscriptions: new Set()
|
|
};
|
|
resourceTypeState.resourceNameMap.set(resourceName, resourceState);
|
|
}
|
|
const newlySubscribed = !resourceState.subscriptions.has(client);
|
|
resourceState.subscriptions.add(client);
|
|
return newlySubscribed;
|
|
}
|
|
|
|
private handleUnsubscriptions(typeUrl: AdsTypeUrl, client: string, requestedResourceNames?: Set<string>) {
|
|
const resourceTypeState = this.resourceMap[typeUrl];
|
|
for (const [resourceName, resourceState] of resourceTypeState.resourceNameMap) {
|
|
if (!requestedResourceNames || !requestedResourceNames.has(resourceName)) {
|
|
resourceState.subscriptions.delete(client);
|
|
if (!resourceState.resource && resourceState.subscriptions.size === 0) {
|
|
resourceTypeState.resourceNameMap.delete(resourceName)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private handleRequest(clientName: string, request: DiscoveryRequest__Output) {
|
|
if (!isAdsTypeUrl(request.type_url)) {
|
|
console.error(`Received ADS request with unsupported type_url ${request.type_url}`);
|
|
return;
|
|
}
|
|
const clientResourceVersion = request.version_info === '' ? 0 : Number.parseInt(request.version_info);
|
|
if (request.error_detail) {
|
|
this.updateResponseListeners(request.type_url, {state: 'NACKED', errorMessage: request.error_detail.message});
|
|
} else {
|
|
this.updateResponseListeners(request.type_url, {state: 'ACKED'});
|
|
}
|
|
const requestedResourceNames = new Set(request.resource_names);
|
|
const resourceTypeState = this.resourceMap[request.type_url];
|
|
const updatedResources = new Set<string>();
|
|
trace(`Received request type_url=${request.type_url} names=[${Array.from(requestedResourceNames)}]`);
|
|
for (const resourceName of requestedResourceNames) {
|
|
if (this.maybeSubscribe(request.type_url, clientName, resourceName) || resourceTypeState.resourceNameMap.get(resourceName)!.resourceTypeVersion > clientResourceVersion) {
|
|
updatedResources.add(resourceName);
|
|
}
|
|
}
|
|
this.handleUnsubscriptions(request.type_url, clientName, requestedResourceNames);
|
|
if (updatedResources.size > 0) {
|
|
this.sendResourceUpdates(request.type_url, new Set([clientName]), updatedResources);
|
|
}
|
|
}
|
|
|
|
private getStreamId(): number {
|
|
const id = this.nextStreamId;
|
|
this.nextStreamId += 1;
|
|
return id;
|
|
}
|
|
|
|
StreamAggregatedResources(call: ServerDuplexStream<DiscoveryRequest__Output, DiscoveryResponse>) {
|
|
const clientName = `${call.getPeer()}(${this.getStreamId()})`;
|
|
this.clients.set(clientName, call);
|
|
call.on('data', (request: DiscoveryRequest__Output) => {
|
|
this.handleRequest(clientName, request);
|
|
});
|
|
call.on('end', () => {
|
|
this.clients.delete(clientName);
|
|
for (const typeUrl of ADS_TYPE_URLS) {
|
|
this.handleUnsubscriptions(typeUrl as AdsTypeUrl, clientName);
|
|
}
|
|
call.end();
|
|
});
|
|
}
|
|
|
|
StreamLoadStats(call: ServerDuplexStream<LoadStatsRequest__Output, LoadStatsResponse>) {
|
|
const statsResponse = {load_reporting_interval: {seconds: 30}};
|
|
call.write(statsResponse);
|
|
call.on('data', (request: LoadStatsRequest__Output) => {
|
|
call.write(statsResponse);
|
|
});
|
|
call.on('end', () => {
|
|
call.end();
|
|
});
|
|
}
|
|
|
|
startServer(callback: (error: Error | null, port: number) => void) {
|
|
if (this.server) {
|
|
return;
|
|
}
|
|
const server = new Server();
|
|
server.addService(loadedProtos.envoy.service.discovery.v3.AggregatedDiscoveryService.service, this as unknown as UntypedServiceImplementation);
|
|
server.addService(loadedProtos.envoy.service.load_stats.v3.LoadReportingService.service, this as unknown as UntypedServiceImplementation);
|
|
server.bindAsync('localhost:0', ServerCredentials.createInsecure(), (error, port) => {
|
|
if (!error) {
|
|
this.server = server;
|
|
this.port = port;
|
|
}
|
|
callback(error, port);
|
|
});
|
|
}
|
|
|
|
shutdownServer() {
|
|
this.server?.forceShutdown();
|
|
}
|
|
|
|
getBootstrapServerConfig() {
|
|
if (this.port === null) {
|
|
throw new Error('Bootstrap info unavailable; server not started');
|
|
}
|
|
return {
|
|
server_uri: `localhost:${this.port}`,
|
|
channel_creds: [{type: 'insecure'}]
|
|
};
|
|
}
|
|
|
|
getBootstrapInfoString(): string {
|
|
if (this.port === null) {
|
|
throw new Error('Bootstrap info unavailable; server not started');
|
|
}
|
|
const bootstrapInfo = {
|
|
xds_servers: [this.getBootstrapServerConfig()],
|
|
node: {
|
|
id: 'test',
|
|
locality: {}
|
|
},
|
|
server_listener_resource_name_template: '%s',
|
|
certificate_providers: {
|
|
test_certificates: {
|
|
plugin_name: 'file_watcher',
|
|
config: {
|
|
certificate_file: path.join(__dirname, 'fixtures', 'server1.pem'),
|
|
private_key_file: path.join(__dirname, 'fixtures', 'server1.key'),
|
|
ca_certificate_file: path.join(__dirname, 'fixtures', 'ca.pem'),
|
|
refresh_interval: '60s'
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return JSON.stringify(bootstrapInfo);
|
|
}
|
|
}
|