Add resolver and service config handling code

This commit is contained in:
murgatroid99 2019-07-16 14:35:06 -07:00
parent a996adaade
commit acdd2abfc3
4 changed files with 567 additions and 0 deletions

View File

@ -0,0 +1,109 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/* This file is an implementation of gRFC A24:
* https://github.com/grpc/proposal/blob/master/A24-lb-policy-config.md */
import { isString, isArray } from "util";
export interface RoundRobinConfig {
}
export interface XdsConfig {
balancerName: string;
childPolicy: LoadBalancingConfig[];
fallbackPolicy: LoadBalancingConfig[];
}
export interface GrpcLbConfig {
childPolicy: LoadBalancingConfig[];
}
export interface LoadBalancingConfig {
/* Exactly one of these must be set for a config to be valid */
round_robin?: RoundRobinConfig;
xds?: XdsConfig;
grpclb?: GrpcLbConfig;
}
/* In these functions we assume the input came from a JSON object. Therefore we
* expect that the prototype is uninteresting and that `in` can be used
* effectively */
function validateXdsConfig(xds: any): XdsConfig {
if (!('balancerName' in xds) || !isString(xds.balancerName)) {
throw new Error('Invalid xds config: invalid balancerName');
}
const xdsConfig: XdsConfig = {
balancerName: xds.balancerName,
childPolicy: [],
fallbackPolicy: []
};
if ('childPolicy' in xds) {
if (!isArray(xds.childPolicy)) {
throw new Error('Invalid xds config: invalid childPolicy');
}
for (const policy of xds.childPolicy) {
xdsConfig.childPolicy.push(validateConfig(policy));
}
}
if ('fallbackPolicy' in xds) {
if (!isArray(xds.fallbackPolicy)) {
throw new Error('Invalid xds config: invalid fallbackPolicy');
}
for (const policy of xds.fallbackPolicy) {
xdsConfig.fallbackPolicy.push(validateConfig(policy));
}
}
return xdsConfig;
}
function validateGrpcLbConfig(grpclb: any): GrpcLbConfig {
const grpcLbConfig: GrpcLbConfig = {
childPolicy: []
};
if ('childPolicy' in grpclb) {
if (!isArray(grpclb.childPolicy)) {
throw new Error('Invalid xds config: invalid childPolicy');
}
for (const policy of grpclb.childPolicy) {
grpcLbConfig.childPolicy.push(validateConfig(policy));
}
}
return grpcLbConfig;
}
export function validateConfig(obj: any): LoadBalancingConfig {
if ('round_robin' in obj) {
if ('xds' in obj || 'grpclb' in obj) {
throw new Error('Multiple load balancing policies configured');
}
if (obj['round_robin'] instanceof Object) {
return { round_robin: {} }
}
}
if ('xds' in obj) {
if ('grpclb' in obj) {
throw new Error('Multiple load balancing policies configured');
}
return {xds: validateXdsConfig(obj.xds)};
}
if ('grpclb' in obj) {
return {grpclb: validateGrpcLbConfig(obj.grpclb)};
}
throw new Error('No recognized load balancing policy configured');
}

View File

@ -0,0 +1,144 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Resolver, ResolverListener, registerResolver, registerDefaultResolver } from './resolver';
import * as dns from 'dns';
import * as util from 'util';
import { extractAndSelectServiceConfig, ServiceConfig } from './service-config';
/* These regular expressions match IP addresses with optional ports in different
* formats. In each case, capture group 1 contains the address, and capture
* group 2 contains the port number, if present */
const IPv4_REGEX = /^(\d{1,3}(?:\.\d{1,3}){3})(?::(\d+))?$/;
const IPv6_REGEX = /^([0-9a-f]{0,4}(?::{1,2}[0-9a-f]{0,4})+)$/i;
const IPv6_BRACKET_REGEX = /^\[([0-9a-f]{0,4}(?::{1,2}[0-9a-f]{0,4})+)\](?::(\d+))?$/i;
const DNS_REGEX = /^(?:dns:)?(?:\/\/\w+\/)?(\w+)(?::(\d+))?$/;
const DEFAULT_PORT = '443';
const resolve4Promise = util.promisify(dns.resolve4);
const resolve6Promise = util.promisify(dns.resolve6);
function parseIP(target: string): string | null {
/* These three regular expressions are all mutually exclusive, so we just
* want the first one that matches the target string, if any do. */
const match = IPv4_REGEX.exec(target) || IPv6_REGEX.exec(target) || IPv6_BRACKET_REGEX.exec(target);
if (match === null) {
return null;
}
const addr = match[1];
let port: string;
if (match[2]) {
port = match[2];
} else {
port = DEFAULT_PORT;
}
return `${addr}:${port}`;
}
function mergeArrays<T>(...arrays: T[][]): T[] {
const result: T[] = [];
for(let i = 0; i<Math.max.apply(null, arrays.map((array)=> array.length)); i++) {
for(let array of arrays) {
if(i < array.length) {
result.push(array[i]);
}
}
}
return result;
}
class DnsResolver implements Resolver {
ipResult: string | null;
dnsHostname: string | null;
port: string | null;
/* The promise results here contain, in order, the A record, the AAAA record,
* and either the TXT record or an error if TXT resolution failed */
pendingResultPromise: Promise<[string[], string[], string[][] | Error]> | null = null;
percentage: number;
constructor(private target: string, private listener: ResolverListener) {
this.ipResult = parseIP(target);
const dnsMatch = DNS_REGEX.exec(target);
if (dnsMatch === null) {
this.dnsHostname = null;
this.port = null;
} else {
this.dnsHostname = dnsMatch[1];
if (dnsMatch[2]) {
this.port = dnsMatch[2];
} else {
this.port = DEFAULT_PORT;
}
}
this.percentage = Math.random() * 100;
this.startResolution();
}
private startResolution() {
if (this.ipResult !== null) {
setImmediate(() => {
this.listener.onSuccessfulResolution([this.ipResult!], null, null);
});
return;
}
if (this.dnsHostname !== null) {
const hostname: string = this.dnsHostname;
const Aresult = resolve4Promise(hostname);
const AAAAresult = resolve6Promise(hostname);
const TXTresult = new Promise<string[][] | Error>((resolve, reject) => {
dns.resolveTxt(hostname, (err, records) => {
if (err) {
resolve(err);
} else {
resolve(records);
}
});
});
this.pendingResultPromise = Promise.all([Aresult, AAAAresult, TXTresult]);
this.pendingResultPromise.then(([Arecord, AAAArecord, TXTrecord]) => {
this.pendingResultPromise = null;
const allAddresses: string[] = mergeArrays(AAAArecord, Arecord);
let serviceConfig: ServiceConfig | null = null;
let serviceConfigError: Error | null = null;
if (TXTrecord instanceof Error) {
serviceConfigError = TXTrecord;
} else {
try {
serviceConfig = extractAndSelectServiceConfig(TXTrecord, this.percentage);
} catch (err) {
serviceConfigError = err;
}
}
this.listener.onSuccessfulResolution(allAddresses, serviceConfig, serviceConfigError);
}, (err) => {
this.pendingResultPromise = null;
this.listener.onError(err);
});
}
}
updateResolution() {
if (this.pendingResultPromise === null) {
this.startResolution();
}
}
}
export function setup(): void {
registerResolver('dns:', DnsResolver);
registerDefaultResolver(DnsResolver);
}

View File

@ -0,0 +1,55 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { ServiceError } from "./call";
import { ServiceConfig } from "./service-config";
export interface ResolverListener {
onSuccessfulResolution(addressList: string[], serviceConfig: ServiceConfig | null, serviceConfigError: Error | null): void;
onError(error: ServiceError): void;
}
export interface Resolver {
updateResolution(): void;
}
export interface ResolverConstructor {
new(target: string, listener: ResolverListener): Resolver;
}
const registeredResolvers: {[prefix: string]: ResolverConstructor} = {};
let defaultResolver: ResolverConstructor | null = null;
export function registerResolver(prefix: string, resolverClass: ResolverConstructor) {
registeredResolvers[prefix] = resolverClass;
}
export function registerDefaultResolver(resolverClass: ResolverConstructor) {
defaultResolver = resolverClass;
}
export function createResolver(target: string, listener: ResolverListener): Resolver {
for (const prefix of Object.keys(registeredResolvers)) {
if (target.startsWith(prefix)) {
return new registeredResolvers[prefix](target, listener);
}
}
if (defaultResolver !== null) {
return new defaultResolver(target, listener);
}
throw new Error('No resolver could be created for the provided target');
}

View File

@ -0,0 +1,259 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/* This file implements gRFC A2 and the service config spec:
* https://github.com/grpc/proposal/blob/master/A2-service-configs-in-dns.md
* https://github.com/grpc/grpc/blob/master/doc/service_config.md */
import * as lbconfig from './load-balancing-config';
import { isString, isArray, isBoolean, isNumber } from 'util';
import * as os from 'os';
export interface MethodConfigName {
service: string;
method?: string;
}
export interface MethodConfig {
name: MethodConfigName[];
waitForReady?: boolean;
timeout?: string;
maxRequestBytes?: number;
maxResponseBytes?: number;
}
export interface ServiceConfig {
loadBalancingPolicy?: string;
loadBalancingConfig: lbconfig.LoadBalancingConfig[]
methodConfig: MethodConfig[];
}
export interface ServiceConfigCanaryConfig {
clientLanguage?: string[];
percentage?: number;
clientHostname?: string[];
serviceConfig: ServiceConfig;
}
const TIMEOUT_REGEX = /^\d+(\.\d{1,9})?s$/;
const CLIENT_LANGUAGE_STRING = 'node';
function validateName(obj: any): MethodConfigName {
if (!('service' in obj) || !isString(obj.service)) {
throw new Error('Invalid method config name: invalid service');
}
const result: MethodConfigName = {
service: obj.service
};
if ('method' in obj) {
if (isString(obj.method)) {
result.method = obj.method;
} else {
throw new Error('Invalid method config name: invalid method');
}
}
return result;
}
function validateMethodConfig(obj: any): MethodConfig {
const result: MethodConfig = {
name: []
};
if (!('name' in obj) || !isArray(obj.name)) {
throw new Error('Invalid method config: invalid name array');
}
for (const name of obj.name) {
result.name.push(validateName(name));
}
if ('waitForReady' in obj) {
if (!isBoolean(obj.waitForReady)) {
throw new Error('Invalid method config: invalid waitForReady');
}
result.waitForReady = obj.waitForReady;
}
if ('timeout' in obj) {
if (!isString(obj.timeout) || !TIMEOUT_REGEX.test(obj.timeout)) {
throw new Error('Invalid method config: invalid timeout');
}
result.timeout = obj.timeout;
}
if ('maxRequestBytes' in obj) {
if (!isNumber(obj.maxRequestBytes)) {
throw new Error('Invalid method config: invalid maxRequestBytes');
}
result.maxRequestBytes = obj.maxRequestBytes;
}
if ('maxResponseBytes' in obj) {
if (!isNumber(obj.maxResponseBytes)) {
throw new Error('Invalid method config: invalid maxRequestBytes');
}
result.maxResponseBytes = obj.maxResponseBytes;
}
return result;
}
function validateServiceConfig(obj: any): ServiceConfig {
const result: ServiceConfig = {
loadBalancingConfig: [],
methodConfig: []
};
if ('loadBalancingPolicy' in obj) {
if (isString(obj.loadBalancingPolicy)) {
result.loadBalancingPolicy = obj.loadBalancingPolicy;
} else {
throw new Error('Invalid service config: invalid loadBalancingPolicy');
}
}
if ('loadBalancingConfig' in obj) {
if (isArray(obj.loadBalancingConfig)) {
for (const config of obj.loadBalancingConfig) {
result.loadBalancingConfig.push(lbconfig.validateConfig(config));
}
} else {
throw new Error('Invalid service config: invalid loadBalancingConfig');
}
}
if ('methodConfig' in obj) {
if (isArray(obj.methodConfig)) {
for (const methodConfig of obj.methodConfig) {
result.methodConfig.push(validateMethodConfig(methodConfig));
}
}
}
// Validate method name uniqueness
const seenMethodNames: MethodConfigName[] = [];
for (const methodConfig of result.methodConfig) {
for (const name of methodConfig.name) {
for (const seenName of seenMethodNames) {
if (name.service === seenName.service && name.method === seenName.method) {
throw new Error(`Invalid service config: duplicate name ${name.service}/${name.method}`);
}
}
seenMethodNames.push(name);
}
}
return result;
}
function validateCanaryConfig(obj: any): ServiceConfigCanaryConfig {
if (!('serviceConfig' in obj)) {
throw new Error('Invalid service config choice: missing service config');
}
const result: ServiceConfigCanaryConfig = {
serviceConfig: validateServiceConfig(obj.serviceConfig)
}
if ('clientLanguage' in obj) {
if (isArray(obj.clientLanguage)) {
result.clientLanguage = [];
for (const lang of obj.clientLanguage) {
if (isString(lang)) {
result.clientLanguage.push(lang);
} else {
throw new Error('Invalid service config choice: invalid clientLanguage');
}
}
} else {
throw new Error('Invalid service config choice: invalid clientLanguage');
}
}
if ('clientHostname' in obj) {
if (isArray(obj.clientHostname)) {
result.clientHostname = [];
for (const lang of obj.clientHostname) {
if (isString(lang)) {
result.clientHostname.push(lang);
} else {
throw new Error('Invalid service config choice: invalid clientHostname');
}
}
} else {
throw new Error('Invalid service config choice: invalid clientHostname');
}
}
if ('percentage' in obj) {
if (isNumber(obj.percentage) && 0 <= obj.percentage && obj.percentage <= 100) {
result.percentage = obj.percentage;
} else {
throw new Error('Invalid service config choice: invalid percentage');
}
}
// Validate that no unexpected fields are present
const allowedFields = ['clientLanguage', 'percentage', 'clientHostname', 'serviceConfig'];
for (const field in obj) {
if (!allowedFields.includes(field)) {
throw new Error(`Invalid service config choice: unexpected field ${field}`);
}
}
return result;
}
function validateAndSelectCanaryConfig(obj: any, percentage: number): ServiceConfig {
if (!isArray(obj)) {
throw new Error('Invalid service config list');
}
for (const config of obj) {
const validatedConfig = validateCanaryConfig(config);
/* For each field, we check if it is present, then only discard the
* config if the field value does not match the current client */
if (isNumber(validatedConfig.percentage) && percentage > validatedConfig.percentage) {
continue;
}
if (isArray(validatedConfig.clientHostname)) {
let hostnameMatched = false;
for (const hostname of validatedConfig.clientHostname) {
if (hostname === os.hostname()) {
hostnameMatched = true;
}
}
if (!hostnameMatched) {
continue;
}
}
if (isArray(validatedConfig.clientLanguage)) {
let languageMatched = false;
for (const language of validatedConfig.clientLanguage) {
if (language === CLIENT_LANGUAGE_STRING) {
languageMatched = true;
}
}
if (!languageMatched) {
continue;
}
}
return validatedConfig.serviceConfig;
}
throw new Error('No matching service config found');
}
/**
* Find the "grpc_config" record among the TXT records, parse its value as JSON, validate its contents,
* and select a service config with selection fields that all match this client. Most of these steps
* can fail with an error; the caller must handle any errors thrown this way.
* @param txtRecord The TXT record array that is output from a successful call to dns.resolveTxt
* @param percentage A number chosen from the range [0, 100) that is used to select which config to use
*/
export function extractAndSelectServiceConfig(txtRecord: string[][], percentage: number): ServiceConfig | null {
for (const record of txtRecord) {
if (record.length > 0 && record[0].startsWith('grpc_config=')) {
const recordString = [record[0].substring('grpc_config='.length)].concat(record.slice(1)).join('');
const recordJson: any = JSON.parse(recordString);
return validateAndSelectCanaryConfig(recordJson, percentage);
}
}
return null;
}