Merge pull request #2241 from grpc/@grpc/grpc-js@1.7.x

Upmerge v1.7.x branch into master
This commit is contained in:
Michael Lumish 2022-10-11 14:43:19 -07:00 committed by GitHub
commit fd7655805b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 395 additions and 23 deletions

View File

@ -16,4 +16,4 @@
*/
export const EXPERIMENTAL_FAULT_INJECTION = (process.env.GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION ?? 'true') === 'true';
export const EXPERIMENTAL_OUTLIER_DETECTION = process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION === 'true';
export const EXPERIMENTAL_OUTLIER_DETECTION = (process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION ?? 'true') === 'true';

View File

@ -18,6 +18,7 @@
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { isIPv4, isIPv6 } from "net";
import { Locality__Output } from "../generated/envoy/config/core/v3/Locality";
import { SocketAddress__Output } from "../generated/envoy/config/core/v3/SocketAddress";
import { ClusterLoadAssignment__Output } from "../generated/envoy/config/endpoint/v3/ClusterLoadAssignment";
import { Any__Output } from "../generated/google/protobuf/Any";
import { BaseXdsStreamState, HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state";
@ -32,6 +33,10 @@ function localitiesEqual(a: Locality__Output, b: Locality__Output) {
return a.region === b.region && a.sub_zone === b.sub_zone && a.zone === b.zone;
}
function addressesEqual(a: SocketAddress__Output, b: SocketAddress__Output) {
return a.address === b.address && a.port_value === b.port_value;
}
export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output> implements XdsStreamState<ClusterLoadAssignment__Output> {
protected getResourceName(resource: ClusterLoadAssignment__Output): string {
return resource.cluster_name;
@ -50,6 +55,8 @@ export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output>
*/
public validateResponse(message: ClusterLoadAssignment__Output) {
const seenLocalities: {locality: Locality__Output, priority: number}[] = [];
const seenAddresses: SocketAddress__Output[] = [];
const priorityTotalWeights: Map<number, number> = new Map();
for (const endpoint of message.endpoints) {
if (!endpoint.locality) {
return false;
@ -71,6 +78,23 @@ export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output>
if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) {
return false;
}
for (const address of seenAddresses) {
if (addressesEqual(socketAddress, address)) {
return false;
}
}
seenAddresses.push(socketAddress);
}
priorityTotalWeights.set(endpoint.priority, (priorityTotalWeights.get(endpoint.priority) ?? 0) + (endpoint.load_balancing_weight?.value ?? 0));
}
for (const totalWeight of priorityTotalWeights.values()) {
if (totalWeight >= 1<<32) {
return false;
}
}
for (const priority of priorityTotalWeights.keys()) {
if (priority > 0 && !priorityTotalWeights.has(priority - 1)) {
return false;
}
}
return true;

View File

@ -89,6 +89,9 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
}
}
if (route.route!.cluster_specifier === 'weighted_clusters') {
if (route.route.weighted_clusters!.total_weight?.value === 0) {
return false;
}
let weightSum = 0;
for (const clusterWeight of route.route.weighted_clusters!.clusters) {
weightSum += clusterWeight.weight?.value ?? 0;

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.7.0",
"version": "1.7.1",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

View File

@ -68,6 +68,28 @@ function getNewCallNumber(): number {
return callNumber;
}
const INAPPROPRIATE_CONTROL_PLANE_CODES: Status[] = [
Status.OK,
Status.INVALID_ARGUMENT,
Status.NOT_FOUND,
Status.ALREADY_EXISTS,
Status.FAILED_PRECONDITION,
Status.ABORTED,
Status.OUT_OF_RANGE,
Status.DATA_LOSS
]
function restrictControlPlaneStatusCode(code: Status, details: string): {code: Status, details: string} {
if (INAPPROPRIATE_CONTROL_PLANE_CODES.includes(code)) {
return {
code: Status.INTERNAL,
details: `Invalid status from control plane: ${code} ${Status[code]} ${details}`
}
} else {
return {code, details};
}
}
/**
* An interface that represents a communication channel to a server specified
* by a given address.
@ -320,7 +342,7 @@ export class ChannelImplementation implements Channel {
this.trace('Name resolution failed with calls queued for config selection');
}
if (this.configSelector === null) {
this.currentResolutionError = status;
this.currentResolutionError = {...restrictControlPlaneStatusCode(status.code, status.details), metadata: status.metadata};
}
const localQueue = this.configSelectionQueue;
this.configSelectionQueue = [];
@ -534,10 +556,11 @@ export class ChannelImplementation implements Channel {
},
(error: Error & { code: number }) => {
// We assume the error code isn't 0 (Status.OK)
callStream.cancelWithStatus(
const {code, details} = restrictControlPlaneStatusCode(
typeof error.code === 'number' ? error.code : Status.UNKNOWN,
`Getting metadata from plugin failed with error: ${error.message}`
);
)
callStream.cancelWithStatus(code, details);
}
);
}
@ -549,17 +572,13 @@ export class ChannelImplementation implements Channel {
if (callMetadata.getOptions().waitForReady) {
this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
} else {
callStream.cancelWithStatus(
pickResult.status!.code,
pickResult.status!.details
);
const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details);
callStream.cancelWithStatus(code, details);
}
break;
case PickResultType.DROP:
callStream.cancelWithStatus(
pickResult.status!.code,
pickResult.status!.details
);
const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details);
callStream.cancelWithStatus(code, details);
break;
default:
throw new Error(
@ -668,10 +687,8 @@ export class ChannelImplementation implements Channel {
this.tryPick(stream, metadata, callConfig, []);
}
} else {
stream.cancelWithStatus(
callConfig.status,
'Failed to route call to method ' + stream.getMethod()
);
const {code, details} = restrictControlPlaneStatusCode(callConfig.status, 'Failed to route call to method ' + stream.getMethod());
stream.cancelWithStatus(code, details);
}
}
}

View File

@ -108,6 +108,10 @@ export type ClientOptions = Partial<ChannelOptions> & {
callInvocationTransformer?: CallInvocationTransformer;
};
function getErrorStackString(error: Error): string {
return error.stack!.split('\n').slice(1).join('\n');
}
/**
* A generic gRPC client. Primarily useful as a base class for all generated
* clients.
@ -321,7 +325,7 @@ export class Client {
}
let responseMessage: ResponseType | null = null;
let receivedStatus = false;
const callerStack = (new Error().stack!).split('\n').slice(1).join('\n');
const callerStackError = new Error();
call.start(callProperties.metadata, {
onReceiveMetadata: (metadata) => {
emitter.emit('metadata', metadata);
@ -340,6 +344,7 @@ export class Client {
receivedStatus = true;
if (status.code === Status.OK) {
if (responseMessage === null) {
const callerStack = getErrorStackString(callerStackError);
callProperties.callback!(callErrorFromStatus({
code: Status.INTERNAL,
details: 'No message received',
@ -349,6 +354,7 @@ export class Client {
callProperties.callback!(null, responseMessage);
}
} else {
const callerStack = getErrorStackString(callerStackError);
callProperties.callback!(callErrorFromStatus(status, callerStack));
}
emitter.emit('status', status);
@ -447,7 +453,7 @@ export class Client {
}
let responseMessage: ResponseType | null = null;
let receivedStatus = false;
const callerStack = (new Error().stack!).split('\n').slice(1).join('\n');
const callerStackError = new Error();
call.start(callProperties.metadata, {
onReceiveMetadata: (metadata) => {
emitter.emit('metadata', metadata);
@ -466,6 +472,7 @@ export class Client {
receivedStatus = true;
if (status.code === Status.OK) {
if (responseMessage === null) {
const callerStack = getErrorStackString(callerStackError);
callProperties.callback!(callErrorFromStatus({
code: Status.INTERNAL,
details: 'No message received',
@ -475,6 +482,7 @@ export class Client {
callProperties.callback!(null, responseMessage);
}
} else {
const callerStack = getErrorStackString(callerStackError);
callProperties.callback!(callErrorFromStatus(status, callerStack));
}
emitter.emit('status', status);
@ -577,7 +585,7 @@ export class Client {
call.setCredentials(callProperties.callOptions.credentials);
}
let receivedStatus = false;
const callerStack = (new Error().stack!).split('\n').slice(1).join('\n');
const callerStackError = new Error();
call.start(callProperties.metadata, {
onReceiveMetadata(metadata: Metadata) {
stream.emit('metadata', metadata);
@ -593,6 +601,7 @@ export class Client {
receivedStatus = true;
stream.push(null);
if (status.code !== Status.OK) {
const callerStack = getErrorStackString(callerStackError);
stream.emit('error', callErrorFromStatus(status, callerStack));
}
stream.emit('status', status);
@ -675,7 +684,7 @@ export class Client {
call.setCredentials(callProperties.callOptions.credentials);
}
let receivedStatus = false;
const callerStack = (new Error().stack!).split('\n').slice(1).join('\n');
const callerStackError = new Error();
call.start(callProperties.metadata, {
onReceiveMetadata(metadata: Metadata) {
stream.emit('metadata', metadata);
@ -690,6 +699,7 @@ export class Client {
receivedStatus = true;
stream.push(null);
if (status.code !== Status.OK) {
const callerStack = getErrorStackString(callerStackError);
stream.emit('error', callErrorFromStatus(status, callerStack));
}
stream.emit('status', status);

View File

@ -38,7 +38,7 @@ function trace(text: string): void {
const TYPE_NAME = 'outlier_detection';
const OUTLIER_DETECTION_ENABLED = process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION !== 'false';
const OUTLIER_DETECTION_ENABLED = (process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION ?? 'true') === 'true';
export interface SuccessRateEjectionConfig {
readonly stdev_factor: number;

View File

@ -17,12 +17,22 @@
import * as assert from 'assert';
import * as fs from 'fs';
import * as path from 'path';
import { promisify } from 'util';
import * as protoLoader from '@grpc/proto-loader';
import { CallCredentials } from '../src/call-credentials';
import { ChannelCredentials } from '../src/channel-credentials';
import * as grpc from '../src';
import { ServiceClient, ServiceClientConstructor } from '../src/make-client';
import { TestServiceClient, TestServiceHandlers } from './generated/TestService';
import { ProtoGrpcType as TestServiceGrpcType } from './generated/test_service';
import { assert2, mockFunction } from './common';
import { assert2, loadProtoFile, mockFunction } from './common';
import { sendUnaryData, ServerUnaryCall, ServiceError } from '../src';
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService = loadProtoFile(protoFile).EchoService as ServiceClientConstructor;
class CallCredentialsMock implements CallCredentials {
child: CallCredentialsMock | null = null;
@ -138,3 +148,65 @@ describe('ChannelCredentials Implementation', () => {
});
});
});
describe('ChannelCredentials usage', () => {
let client: ServiceClient;
let server: grpc.Server;
before(async () => {
const {ca, key, cert} = await pFixtures;
const serverCreds = grpc.ServerCredentials.createSsl(null, [{private_key: key, cert_chain: cert}]);
const channelCreds = ChannelCredentials.createSsl(ca);
const callCreds = CallCredentials.createFromMetadataGenerator((options, cb) => {
const metadata = new grpc.Metadata();
metadata.set('test-key', 'test-value');
cb(null, metadata);
});
const combinedCreds = channelCreds.compose(callCreds);
return new Promise<void>((resolve, reject) => {
server = new grpc.Server();
server.addService(echoService.service, {
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
call.sendMetadata(call.metadata);
callback(null, call.request);
},
});
server.bindAsync(
'localhost:0',
serverCreds,
(err, port) => {
if (err) {
reject(err);
return;
}
client = new echoService(
`localhost:${port}`,
combinedCreds,
{'grpc.ssl_target_name_override': 'foo.test.google.fr', 'grpc.default_authority': 'foo.test.google.fr'}
);
server.start();
resolve();
}
);
});
});
after(() => {
server.forceShutdown();
});
it('Should send the metadata from call credentials attached to channel credentials', (done) => {
const call = client.echo(
{ value: 'test value', value2: 3 },
assert2.mustCall((error: ServiceError, response: any) => {
assert.ifError(error);
assert.deepStrictEqual(response, { value: 'test value', value2: 3 });
})
);
call.on('metadata', assert2.mustCall((metadata: grpc.Metadata) => {
assert.deepStrictEqual(metadata.get('test-key'), ['test-value']);
}));
assert2.afterMustCallsSatisfied(done);
});
});

View File

@ -19,6 +19,7 @@ import * as assert from 'assert';
import * as path from 'path';
import * as grpc from '../src';
import { loadProtoFile } from './common';
import { OutlierDetectionLoadBalancingConfig } from '../src/load-balancer-outlier-detection'
function multiDone(done: Mocha.Done, target: number) {
let count = 0;
@ -67,6 +68,251 @@ const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const EchoService = loadProtoFile(protoFile)
.EchoService as grpc.ServiceClientConstructor;
describe('Outlier detection config validation', () => {
describe('interval', () => {
it('Should reject a negative interval', () => {
const loadBalancingConfig = {
interval: {
seconds: -1,
nanos: 0
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /interval parse error: values out of range for non-negative Duaration/);
});
it('Should reject a large interval', () => {
const loadBalancingConfig = {
interval: {
seconds: 1e12,
nanos: 0
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /interval parse error: values out of range for non-negative Duaration/);
});
it('Should reject a negative interval.nanos', () => {
const loadBalancingConfig = {
interval: {
seconds: 0,
nanos: -1
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /interval parse error: values out of range for non-negative Duaration/);
});
it('Should reject a large interval.nanos', () => {
const loadBalancingConfig = {
interval: {
seconds: 0,
nanos: 1e12
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /interval parse error: values out of range for non-negative Duaration/);
});
});
describe('base_ejection_time', () => {
it('Should reject a negative base_ejection_time', () => {
const loadBalancingConfig = {
base_ejection_time: {
seconds: -1,
nanos: 0
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /base_ejection_time parse error: values out of range for non-negative Duaration/);
});
it('Should reject a large base_ejection_time', () => {
const loadBalancingConfig = {
base_ejection_time: {
seconds: 1e12,
nanos: 0
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /base_ejection_time parse error: values out of range for non-negative Duaration/);
});
it('Should reject a negative base_ejection_time.nanos', () => {
const loadBalancingConfig = {
base_ejection_time: {
seconds: 0,
nanos: -1
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /base_ejection_time parse error: values out of range for non-negative Duaration/);
});
it('Should reject a large base_ejection_time.nanos', () => {
const loadBalancingConfig = {
base_ejection_time: {
seconds: 0,
nanos: 1e12
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /base_ejection_time parse error: values out of range for non-negative Duaration/);
});
});
describe('max_ejection_time', () => {
it('Should reject a negative max_ejection_time', () => {
const loadBalancingConfig = {
max_ejection_time: {
seconds: -1,
nanos: 0
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /max_ejection_time parse error: values out of range for non-negative Duaration/);
});
it('Should reject a large max_ejection_time', () => {
const loadBalancingConfig = {
max_ejection_time: {
seconds: 1e12,
nanos: 0
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /max_ejection_time parse error: values out of range for non-negative Duaration/);
});
it('Should reject a negative max_ejection_time.nanos', () => {
const loadBalancingConfig = {
max_ejection_time: {
seconds: 0,
nanos: -1
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /max_ejection_time parse error: values out of range for non-negative Duaration/);
});
it('Should reject a large max_ejection_time.nanos', () => {
const loadBalancingConfig = {
max_ejection_time: {
seconds: 0,
nanos: 1e12
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /max_ejection_time parse error: values out of range for non-negative Duaration/);
});
});
describe('max_ejection_percent', () => {
it('Should reject a value above 100', () => {
const loadBalancingConfig = {
max_ejection_percent: 101,
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /max_ejection_percent parse error: value out of range for percentage/);
});
it('Should reject a negative value', () => {
const loadBalancingConfig = {
max_ejection_percent: -1,
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /max_ejection_percent parse error: value out of range for percentage/);
});
});
describe('success_rate_ejection.enforcement_percentage', () => {
it('Should reject a value above 100', () => {
const loadBalancingConfig = {
success_rate_ejection: {
enforcement_percentage: 101
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /success_rate_ejection\.enforcement_percentage parse error: value out of range for percentage/);
});
it('Should reject a negative value', () => {
const loadBalancingConfig = {
success_rate_ejection: {
enforcement_percentage: -1
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /success_rate_ejection\.enforcement_percentage parse error: value out of range for percentage/);
});
});
describe('failure_percentage_ejection.threshold', () => {
it('Should reject a value above 100', () => {
const loadBalancingConfig = {
failure_percentage_ejection: {
threshold: 101
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /failure_percentage_ejection\.threshold parse error: value out of range for percentage/);
});
it('Should reject a negative value', () => {
const loadBalancingConfig = {
failure_percentage_ejection: {
threshold: -1
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /failure_percentage_ejection\.threshold parse error: value out of range for percentage/);
});
});
describe('failure_percentage_ejection.enforcement_percentage', () => {
it('Should reject a value above 100', () => {
const loadBalancingConfig = {
failure_percentage_ejection: {
enforcement_percentage: 101
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /failure_percentage_ejection\.enforcement_percentage parse error: value out of range for percentage/);
});
it('Should reject a negative value', () => {
const loadBalancingConfig = {
failure_percentage_ejection: {
enforcement_percentage: -1
},
child_policy: [{round_robin: {}}]
};
assert.throws(() => {
OutlierDetectionLoadBalancingConfig.createFromJson(loadBalancingConfig);
}, /failure_percentage_ejection\.enforcement_percentage parse error: value out of range for percentage/);
});
});
});
describe('Outlier detection', () => {
const GOOD_PORTS = 4;
let goodServer: grpc.Server;