Merge pull request #1789 from murgatroid99/grpc-js-xds_timeout_support

grpc-js-xds: Propagate timeouts from xDS responses to method config
This commit is contained in:
Michael Lumish 2021-05-20 10:23:09 -07:00 committed by GitHub
commit 47af80643d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 221 additions and 19 deletions

View File

@ -197,29 +197,44 @@ let anyCallSucceeded = false;
const accumulatedStats: LoadBalancerAccumulatedStatsResponse = {
stats_per_method: {
EmptyCall: {
EMPTY_CALL: {
rpcs_started: 0,
result: {}
},
UnaryCall: {
UNARY_CALL: {
rpcs_started: 0,
result: {}
}
}
};
const callTimeHistogram: {[callType: string]: {[status: number]: number[]}} = {
UnaryCall: {},
EmptyCall: {}
}
function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker) {
const callTypeStats = accumulatedStats.stats_per_method![type];
const callTypeStats = accumulatedStats.stats_per_method![callTypeEnumMapReverse[type]];
callTypeStats.rpcs_started! += 1;
const notifier = callStatsTracker.startCall();
let gotMetadata: boolean = false;
let hostname: string | null = null;
let completed: boolean = false;
let completedWithError: boolean = false;
const startTime = process.hrtime();
const deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + currentConfig.timeoutSec);
const callback = (error: grpc.ServiceError | undefined, value: Empty__Output | undefined) => {
const statusCode = error?.code ?? grpc.status.OK;
const duration = process.hrtime(startTime);
if (!callTimeHistogram[type][statusCode]) {
callTimeHistogram[type][statusCode] = [];
}
if (callTimeHistogram[type][statusCode][duration[0]]) {
callTimeHistogram[type][statusCode][duration[0]] += 1;
} else {
callTimeHistogram[type][statusCode][duration[0]] = 1;
}
callTypeStats.result![statusCode] = (callTypeStats.result![statusCode] ?? 0) + 1;
if (error) {
if (failOnFailedRpcs && anyCallSucceeded) {
@ -269,18 +284,27 @@ const callTypeEnumMap = {
'UNARY_CALL': 'UnaryCall' as CallType
};
const callTypeEnumMapReverse = {
'EmptyCall': 'EMPTY_CALL',
'UnaryCall': 'UNARY_CALL'
}
const DEFAULT_TIMEOUT_SEC = 20;
function main() {
const argv = yargs
.string(['fail_on_failed_rpcs', 'server', 'stats_port', 'rpc', 'metadata'])
.number(['num_channels', 'qps'])
.number(['num_channels', 'qps', 'rpc_timeout_sec'])
.demandOption(['server', 'stats_port'])
.default('num_channels', 1)
.default('qps', 1)
.default('rpc', 'UnaryCall')
.default('metadata', '')
.default('rpc_timeout_sec', DEFAULT_TIMEOUT_SEC)
.argv;
console.log('Starting xDS interop client. Args: ', argv);
currentConfig.callTypes = argv.rpc.split(',').filter(value => value === 'EmptyCall' || value === 'UnaryCall') as CallType[];
currentConfig.timeoutSec = argv.rpc_timeout_sec;
for (const item of argv.metadata.split(',')) {
const [method, key, value] = item.split(':');
if (value === undefined) {
@ -316,23 +340,30 @@ function main() {
});
},
GetClientAccumulatedStats: (call, callback) => {
console.log(`Sending accumulated stats response: ${JSON.stringify(accumulatedStats)}`);
console.log(`Call durations: ${JSON.stringify(callTimeHistogram, undefined, 2)}`);
callback(null, accumulatedStats);
}
}
const xdsUpdateClientConfigureServiceImpl: XdsUpdateClientConfigureServiceHandlers = {
Configure: (call, callback) => {
console.log('Received new client configuration: ' + JSON.stringify(call.request, undefined, 2));
const callMetadata = {
EmptyCall: new grpc.Metadata(),
UnaryCall: new grpc.Metadata()
}
};
for (const metadataItem of call.request.metadata) {
callMetadata[callTypeEnumMap[metadataItem.type]].add(metadataItem.key, metadataItem.value);
}
currentConfig.callTypes = call.request.types.map(value => callTypeEnumMap[value]);
currentConfig.metadata = callMetadata;
currentConfig.timeoutSec = call.request.timeout_sec
console.log('Received new client configuration: ' + JSON.stringify(currentConfig, undefined, 2));
if (call.request.timeout_sec > 0) {
currentConfig.timeoutSec = call.request.timeout_sec;
} else {
currentConfig.timeoutSec = DEFAULT_TIMEOUT_SEC;
}
console.log('Updated to new client configuration: ' + JSON.stringify(currentConfig, undefined, 2));
callback(null, {});
}
}

View File

@ -52,9 +52,9 @@ GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weigh
GRPC_NODE_VERBOSITY=DEBUG \
NODE_XDS_INTEROP_VERBOSITY=1 \
python3 grpc/tools/run_tests/run_xds_tests.py \
--test_case="all,path_matching,header_matching" \
--test_case="all,timeout" \
--project_id=grpc-testing \
--source_image=projects/grpc-testing/global/images/xds-test-server-2 \
--source_image=projects/grpc-testing/global/images/xds-test-server-4 \
--path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \
--gcp_suffix=$(date '+%s') \
--verbose \

View File

@ -40,6 +40,8 @@ import { XdsClusterManagerLoadBalancingConfig } from './load-balancer-xds-cluste
import { ExactValueMatcher, Fraction, FullMatcher, HeaderMatcher, Matcher, PathExactValueMatcher, PathPrefixValueMatcher, PathSafeRegexValueMatcher, PrefixValueMatcher, PresentValueMatcher, RangeValueMatcher, RejectValueMatcher, SafeRegexValueMatcher, SuffixValueMatcher, ValueMatcher } from './matcher';
import { RouteAction, SingleClusterRouteAction, WeightedCluster, WeightedClusterRouteAction } from './route-action';
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL_V3 } from './resources';
import Duration = experimental.Duration;
import { Duration__Output } from './generated/google/protobuf/Duration';
const TRACER_NAME = 'xds_resolver';
@ -187,6 +189,20 @@ function getPredicateForMatcher(routeMatch: RouteMatch__Output): Matcher {
return new FullMatcher(pathMatcher, headerMatchers, runtimeFraction);
}
/**
* Convert a Duration protobuf message object to a Duration object as used in
* the ServiceConfig definition. The difference is that the protobuf message
* defines seconds as a long, which is represented as a string in JavaScript,
* and the one used in the service config defines it as a number.
* @param duration
*/
function protoDurationToDuration(duration: Duration__Output): Duration {
return {
seconds: Number.parseInt(duration.seconds),
nanos: duration.nanos
}
}
class XdsResolver implements Resolver {
private hasReportedSuccess = false;
@ -203,6 +219,8 @@ class XdsResolver implements Resolver {
private clusterRefcounts = new Map<string, {inLastConfig: boolean, refCount: number}>();
private latestDefaultTimeout: Duration | undefined = undefined;
constructor(
private target: GrpcUri,
private listener: ResolverListener,
@ -211,6 +229,12 @@ class XdsResolver implements Resolver {
this.ldsWatcher = {
onValidUpdate: (update: Listener__Output) => {
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, update.api_listener!.api_listener!.value);
const defaultTimeout = httpConnectionManager.common_http_protocol_options?.idle_timeout;
if (defaultTimeout === undefined) {
this.latestDefaultTimeout = undefined;
} else {
this.latestDefaultTimeout = protoDurationToDuration(defaultTimeout);
}
switch (httpConnectionManager.route_specifier) {
case 'rds': {
const routeConfigName = httpConnectionManager.rds!.route_config_name;
@ -295,13 +319,28 @@ class XdsResolver implements Resolver {
const matchList: {matcher: Matcher, action: RouteAction}[] = [];
for (const route of virtualHost.routes) {
let routeAction: RouteAction;
let timeout: Duration | undefined;
/* For field prioritization see
* https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md#supported-fields
*/
if (route.route?.max_stream_duration?.grpc_timeout_header_max) {
timeout = protoDurationToDuration(route.route.max_stream_duration.grpc_timeout_header_max);
} else if (route.route?.max_stream_duration?.max_stream_duration) {
timeout = protoDurationToDuration(route.route.max_stream_duration.max_stream_duration);
} else {
timeout = this.latestDefaultTimeout;
}
// "A value of 0 indicates the application's deadline is used without modification."
if (timeout?.seconds === 0 && timeout.nanos === 0) {
timeout = undefined;
}
switch (route.route!.cluster_specifier) {
case 'cluster_header':
continue;
case 'cluster':{
const cluster = route.route!.cluster!;
allConfigClusters.add(cluster);
routeAction = new SingleClusterRouteAction(cluster);
routeAction = new SingleClusterRouteAction(cluster, timeout);
break;
}
case 'weighted_clusters': {
@ -310,7 +349,7 @@ class XdsResolver implements Resolver {
allConfigClusters.add(clusterWeight.name);
weightedClusters.push({name: clusterWeight.name, weight: clusterWeight.weight?.value ?? 0});
}
routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100);
routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, timeout);
}
}
const routeMatcher = getPredicateForMatcher(route.match!);
@ -343,7 +382,7 @@ class XdsResolver implements Resolver {
this.unrefCluster(clusterName);
}
return {
methodConfig: {name: []},
methodConfig: {name: [], timeout: action.getTimeout()},
onCommitted: onCommitted,
pickInformation: {cluster: clusterName},
status: status.OK

View File

@ -14,20 +14,41 @@
* limitations under the License.
*/
import { experimental } from '@grpc/grpc-js';
import Duration = experimental.Duration;
export interface RouteAction {
toString(): string;
getCluster(): string;
getTimeout(): Duration | undefined;
}
function durationToLogString(duration: Duration) {
const millis = Math.floor(duration.nanos / 1_000_000);
if (millis > 0) {
return duration.seconds + '.' + millis;
} else {
return '' + duration.seconds;
}
}
export class SingleClusterRouteAction implements RouteAction {
constructor(private cluster: string) {}
constructor(private cluster: string, private timeout: Duration | undefined) {}
getCluster() {
return this.cluster;
}
toString() {
return 'SingleCluster(' + this.cluster + ')';
if (this.timeout) {
return 'SingleCluster(' + this.cluster + ', ' + 'timeout=' + durationToLogString(this.timeout) + 's)';
} else {
return 'SingleCluster(' + this.cluster + ')';
}
}
getTimeout() {
return this.timeout;
}
}
@ -46,7 +67,7 @@ export class WeightedClusterRouteAction implements RouteAction {
* The weighted cluster choices represented as a CDF
*/
private clusterChoices: ClusterChoice[];
constructor(private clusters: WeightedCluster[], private totalWeight: number) {
constructor(private clusters: WeightedCluster[], private totalWeight: number, private timeout: Duration | undefined) {
this.clusterChoices = [];
let lastNumerator = 0;
for (const clusterWeight of clusters) {
@ -67,6 +88,15 @@ export class WeightedClusterRouteAction implements RouteAction {
}
toString() {
return 'WeightedCluster(' + this.clusters.map(({name, weight}) => '(' + name + ':' + weight + ')').join(', ') + ')';
const clusterListString = this.clusters.map(({name, weight}) => '(' + name + ':' + weight + ')').join(', ')
if (this.timeout) {
return 'WeightedCluster(' + clusterListString + ', ' + 'timeout=' + durationToLogString(this.timeout) + 's)';
} else {
return 'WeightedCluster(' + clusterListString + ')';
}
}
getTimeout() {
return this.timeout;
}
}

View File

@ -1,7 +1,7 @@
export { trace } from './logging';
export { Resolver, ResolverListener, registerResolver, ConfigSelector } from './resolver';
export { GrpcUri, uriToString } from './uri-parser';
export { ServiceConfig } from './service-config';
export { ServiceConfig, Duration } from './service-config';
export { BackoffTimeout } from './backoff-timeout';
export { LoadBalancer, LoadBalancingConfig, ChannelControlHelper, registerLoadBalancerType, getFirstUsableConfig, validateLoadBalancingConfig } from './load-balancer';
export { SubchannelAddress, subchannelAddressToString } from './subchannel';

View File

@ -242,6 +242,18 @@ export class Metadata {
return this.internalRepr;
}
/**
* This modifies the behavior of JSON.stringify to show an object
* representation of the metadata map.
*/
toJSON() {
const result: {[key: string]: MetadataValue[]} = {};
for (const [key, values] of this.internalRepr.entries()) {
result[key] = values;
}
return result;
}
/**
* Returns a new Metadata object based fields in a given IncomingHttpHeaders
* object.

View File

@ -0,0 +1,90 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as assert from 'assert';
import * as grpc from '../src';
import { experimental } from '../src';
import { ServerCredentials } from '../src';
import { ServiceClient, ServiceClientConstructor } from '../src/make-client';
import { loadProtoFile } from './common';
import ServiceConfig = experimental.ServiceConfig;
const clientInsecureCreds = grpc.credentials.createInsecure();
const serverInsecureCreds = ServerCredentials.createInsecure();
const TIMEOUT_SERVICE_CONFIG: ServiceConfig = {
loadBalancingConfig: [],
methodConfig: [{
name: [
{service: 'TestService'}
],
timeout: {
seconds: 1,
nanos: 0
}
}]
};
describe('Client with configured timeout', () => {
let server: grpc.Server;
let Client: ServiceClientConstructor;
let client: ServiceClient;
before(done => {
Client = loadProtoFile(__dirname + '/fixtures/test_service.proto').TestService as ServiceClientConstructor;
server = new grpc.Server();
server.addService(Client.service, {
unary: () => {},
clientStream: () => {},
serverStream: () => {},
bidiStream: () => {}
});
server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
if (error) {
done(error);
return;
}
server.start();
client = new Client(`localhost:${port}`, grpc.credentials.createInsecure(), {'grpc.service_config': JSON.stringify(TIMEOUT_SERVICE_CONFIG)});
done();
});
});
after(done => {
client.close();
server.tryShutdown(done);
});
it('Should end calls without explicit deadline with DEADLINE_EXCEEDED', done => {
client.unary({}, (error: grpc.ServiceError, value: unknown) =>{
assert(error);
assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED);
done();
});
});
it('Should end calls with a long explicit deadline with DEADLINE_EXCEEDED', done => {
const deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 20);
client.unary({}, (error: grpc.ServiceError, value: unknown) =>{
assert(error);
assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED);
done();
});
});
});

View File

@ -16,7 +16,7 @@
# Location of the continuous shell script in repository.
build_file: "grpc-node/packages/grpc-js-xds/scripts/xds.sh"
timeout_mins: 120
timeout_mins: 180
action {
define_artifacts {
regex: "github/grpc/reports/**"

View File

@ -16,7 +16,7 @@
# Location of the continuous shell script in repository.
build_file: "grpc-node/packages/grpc-js-xds/scripts/xds-v3.sh"
timeout_mins: 120
timeout_mins: 180
action {
define_artifacts {
regex: "github/grpc/reports/**"