From 43d42dcf3f2409c53a623b46ab72c89af353e4e8 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 11 Apr 2023 14:24:25 -0700 Subject: [PATCH 01/10] grpc-js: Fix connectivity state change event sequencing --- .../grpc-js/src/load-balancer-pick-first.ts | 2 +- packages/grpc-js/src/subchannel.ts | 65 ++++----- .../test/test-global-subchannel-pool.ts | 130 ++++++++++++++++++ 3 files changed, 165 insertions(+), 32 deletions(-) create mode 100644 packages/grpc-js/test/test-global-subchannel-pool.ts diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index a501b1f7..41d21a2e 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -322,12 +322,12 @@ export class PickFirstLoadBalancer implements LoadBalancer { ); } this.currentPick = subchannel; - this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel)); subchannel.addConnectivityStateListener(this.pickedSubchannelStateListener); subchannel.ref(); this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef()); this.resetSubchannelList(); clearTimeout(this.connectionDelayTimeout); + this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel)); } private updateState(newState: ConnectivityState, picker: Picker) { diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index c93e0c45..de420cc9 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -60,7 +60,7 @@ export class Subchannel { * state changes. Will be modified by `addConnectivityStateListener` and * `removeConnectivityStateListener` */ - private stateListeners: ConnectivityStateListener[] = []; + private stateListeners: Set = new Set(); private backoffTimeout: BackoffTimeout; @@ -227,6 +227,8 @@ export class Subchannel { } const previousState = this.connectivityState; this.connectivityState = newState; + process.nextTick(() => { + }); switch (newState) { case ConnectivityState.READY: this.stopBackoff(); @@ -261,9 +263,7 @@ export class Subchannel { default: throw new Error(`Invalid state: unknown ConnectivityState ${newState}`); } - /* We use a shallow copy of the stateListeners array in case a listener - * is removed during this iteration */ - for (const listener of [...this.stateListeners]) { + for (const listener of this.stateListeners) { listener(this, previousState, newState, this.keepaliveTime); } return true; @@ -291,13 +291,15 @@ export class Subchannel { if (this.channelzEnabled) { this.channelzTrace.addTrace('CT_INFO', 'Shutting down'); } - this.transitionToState( - [ConnectivityState.CONNECTING, ConnectivityState.READY], - ConnectivityState.IDLE - ); if (this.channelzEnabled) { unregisterChannelzRef(this.channelzRef); } + process.nextTick(() => { + this.transitionToState( + [ConnectivityState.CONNECTING, ConnectivityState.READY], + ConnectivityState.IDLE + ); + }); } } @@ -339,20 +341,22 @@ export class Subchannel { * Otherwise, do nothing. */ startConnecting() { - /* First, try to transition from IDLE to connecting. If that doesn't happen - * because the state is not currently IDLE, check if it is - * TRANSIENT_FAILURE, and if so indicate that it should go back to - * connecting after the backoff timer ends. Otherwise do nothing */ - if ( - !this.transitionToState( - [ConnectivityState.IDLE], - ConnectivityState.CONNECTING - ) - ) { - if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) { - this.continueConnecting = true; + process.nextTick(() => { + /* First, try to transition from IDLE to connecting. If that doesn't happen + * because the state is not currently IDLE, check if it is + * TRANSIENT_FAILURE, and if so indicate that it should go back to + * connecting after the backoff timer ends. Otherwise do nothing */ + if ( + !this.transitionToState( + [ConnectivityState.IDLE], + ConnectivityState.CONNECTING + ) + ) { + if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) { + this.continueConnecting = true; + } } - } + }); } /** @@ -368,7 +372,7 @@ export class Subchannel { * @param listener */ addConnectivityStateListener(listener: ConnectivityStateListener) { - this.stateListeners.push(listener); + this.stateListeners.add(listener); } /** @@ -377,21 +381,20 @@ export class Subchannel { * `addConnectivityStateListener` */ removeConnectivityStateListener(listener: ConnectivityStateListener) { - const listenerIndex = this.stateListeners.indexOf(listener); - if (listenerIndex > -1) { - this.stateListeners.splice(listenerIndex, 1); - } + this.stateListeners.delete(listener); } /** * Reset the backoff timeout, and immediately start connecting if in backoff. */ resetBackoff() { - this.backoffTimeout.reset(); - this.transitionToState( - [ConnectivityState.TRANSIENT_FAILURE], - ConnectivityState.CONNECTING - ); + process.nextTick(() => { + this.backoffTimeout.reset(); + this.transitionToState( + [ConnectivityState.TRANSIENT_FAILURE], + ConnectivityState.CONNECTING + ); + }); } getAddress(): string { diff --git a/packages/grpc-js/test/test-global-subchannel-pool.ts b/packages/grpc-js/test/test-global-subchannel-pool.ts new file mode 100644 index 00000000..c1912568 --- /dev/null +++ b/packages/grpc-js/test/test-global-subchannel-pool.ts @@ -0,0 +1,130 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as assert from 'assert'; +import * as path from 'path'; + +import * as grpc from '../src'; +import {sendUnaryData, Server, ServerCredentials, ServerUnaryCall, ServiceClientConstructor, ServiceError} from '../src'; + +import {loadProtoFile} from './common'; + +const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); +const echoService = + loadProtoFile(protoFile).EchoService as ServiceClientConstructor; + +describe.only('Global subchannel pool', () => { + let server: Server; + let serverPort: number; + + let client1: InstanceType; + let client2: InstanceType; + + let promises: Promise[]; + + before(done => { + server = new Server(); + server.addService(echoService.service, { + echo(call: ServerUnaryCall, callback: sendUnaryData) { + callback(null, call.request); + }, + }); + + server.bindAsync( + 'localhost:0', ServerCredentials.createInsecure(), (err, port) => { + assert.ifError(err); + serverPort = port; + server.start(); + done(); + }); + }); + + beforeEach(() => { + promises = []; + }) + + after(done => { + server.tryShutdown(done); + }); + + function callService(client: InstanceType) { + return new Promise((resolve) => { + const request = {value: 'test value', value2: 3}; + + client.echo(request, (error: ServiceError, response: any) => { + assert.ifError(error); + assert.deepStrictEqual(response, request); + resolve(); + }); + }) + } + + function connect() { + const grpcOptions = { + 'grpc.use_local_subchannel_pool': 0, + } + + client1 = new echoService( + `127.0.0.1:${serverPort}`, grpc.credentials.createInsecure(), + grpcOptions); + + client2 = new echoService( + `127.0.0.1:${serverPort}`, grpc.credentials.createInsecure(), + grpcOptions); + } + + /* This is a regression test for a bug where client1.close in the + * waitForReady callback would cause the subchannel to transition to IDLE + * even though client2 is also using it. */ + it('Should handle client.close calls in waitForReady', + done => { + connect(); + + promises.push(new Promise((resolve) => { + client1.waitForReady(Date.now() + 50, (error) => { + assert.ifError(error); + client1.close(); + resolve(); + }); + })) + + promises.push(new Promise((resolve) => { + client2.waitForReady(Date.now() + 50, (error) => { + assert.ifError(error); + resolve(); + }); + })) + + Promise.all(promises).then(() => {done()}); + }) + + it('Call the service', done => { + promises.push(callService(client2)); + + Promise.all(promises).then(() => { + done(); + }); + }) + + it('Should complete the client lifecycle without error', done => { + setTimeout(() => { + client1.close(); + client2.close(); + done() + }, 500); + }); +}); From 6bc85716cd65bea8e532341efaf1e0331ec9328c Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 12 Apr 2023 14:46:27 -0700 Subject: [PATCH 02/10] grpc-js: Bump version to 1.8.14 --- packages/grpc-js/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 815dabeb..dd341ef0 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.8.13", + "version": "1.8.14", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", From 37099980127e05a83c9f3d609879c0af6ca4e66b Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 13 Apr 2023 09:25:38 -0700 Subject: [PATCH 03/10] grpc-js: Fix a couple of errors from a previous PR --- packages/grpc-js/src/subchannel.ts | 2 -- packages/grpc-js/test/test-global-subchannel-pool.ts | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index de420cc9..307f6b81 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -227,8 +227,6 @@ export class Subchannel { } const previousState = this.connectivityState; this.connectivityState = newState; - process.nextTick(() => { - }); switch (newState) { case ConnectivityState.READY: this.stopBackoff(); diff --git a/packages/grpc-js/test/test-global-subchannel-pool.ts b/packages/grpc-js/test/test-global-subchannel-pool.ts index c1912568..999a11bf 100644 --- a/packages/grpc-js/test/test-global-subchannel-pool.ts +++ b/packages/grpc-js/test/test-global-subchannel-pool.ts @@ -27,7 +27,7 @@ const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); const echoService = loadProtoFile(protoFile).EchoService as ServiceClientConstructor; -describe.only('Global subchannel pool', () => { +describe('Global subchannel pool', () => { let server: Server; let serverPort: number; From 2cb6ef86d4debde64e440d53ddb0f5ae10f228c7 Mon Sep 17 00:00:00 2001 From: Sergii Tkachenko Date: Fri, 7 Apr 2023 14:45:21 -0700 Subject: [PATCH 04/10] PSM Interop: experiment with qps affect on circuit_breaking ref b/232859415 --- packages/grpc-js-xds/scripts/xds.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/grpc-js-xds/scripts/xds.sh b/packages/grpc-js-xds/scripts/xds.sh index d4490c5e..af22f584 100755 --- a/packages/grpc-js-xds/scripts/xds.sh +++ b/packages/grpc-js-xds/scripts/xds.sh @@ -60,6 +60,7 @@ GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weigh --path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \ --gcp_suffix=$(date '+%s') \ --verbose \ + --qps=50 \ ${XDS_V3_OPT-} \ --client_cmd="$(which node) --enable-source-maps --prof --logfile=${KOKORO_ARTIFACTS_DIR}/github/grpc/reports/prof.log grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client \ --server=xds:///{server_uri} \ From 856559cce1d27771d50dc97d130f5dc423dd4723 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 20 Apr 2023 14:34:06 -0700 Subject: [PATCH 05/10] grpc-js-xds: Fix handling of resource validation errors --- .../src/xds-stream-state/xds-stream-state.ts | 46 +++--- packages/grpc-js-xds/test/test-nack.ts | 156 ++++++++++++++++++ 2 files changed, 182 insertions(+), 20 deletions(-) create mode 100644 packages/grpc-js-xds/test/test-nack.ts diff --git a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts index e20bc7e9..b2d7b227 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts @@ -15,7 +15,7 @@ * */ -import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; +import { experimental, logVerbosity, Metadata, status, StatusObject } from "@grpc/grpc-js"; import { Any__Output } from "../generated/google/protobuf/Any"; const TRACER_NAME = 'xds_client'; @@ -157,19 +157,32 @@ export abstract class BaseXdsStreamState implements XdsStreamState return Array.from(this.subscriptions.keys()); } handleResponses(responses: ResourcePair[]): HandleResponseResult { - const validResponses: ResponseType[] = []; let result: HandleResponseResult = { accepted: [], rejected: [], missing: [] } + const allResourceNames = new Set(); for (const {resource, raw} of responses) { const resourceName = this.getResourceName(resource); + allResourceNames.add(resourceName); + const subscriptionEntry = this.subscriptions.get(resourceName); if (this.validateResponse(resource)) { - validResponses.push(resource); result.accepted.push({ name: resourceName, raw: raw}); + if (subscriptionEntry) { + const watchers = subscriptionEntry.watchers; + for (const watcher of watchers) { + watcher.onValidUpdate(resource); + } + clearTimeout(subscriptionEntry.resourceTimer); + subscriptionEntry.cachedResponse = resource; + if (subscriptionEntry.deletionIgnored) { + experimental.log(logVerbosity.INFO, 'Received resource with previously ignored deletion: ' + resourceName); + subscriptionEntry.deletionIgnored = false; + } + } } else { this.trace('Validation failed for message ' + JSON.stringify(resource)); result.rejected.push({ @@ -177,23 +190,16 @@ export abstract class BaseXdsStreamState implements XdsStreamState raw: raw, error: `Validation failed for resource ${resourceName}` }); - } - } - const allResourceNames = new Set(); - for (const resource of validResponses) { - const resourceName = this.getResourceName(resource); - allResourceNames.add(resourceName); - const subscriptionEntry = this.subscriptions.get(resourceName); - if (subscriptionEntry) { - const watchers = subscriptionEntry.watchers; - for (const watcher of watchers) { - watcher.onValidUpdate(resource); - } - clearTimeout(subscriptionEntry.resourceTimer); - subscriptionEntry.cachedResponse = resource; - if (subscriptionEntry.deletionIgnored) { - experimental.log(logVerbosity.INFO, 'Received resource with previously ignored deletion: ' + resourceName); - subscriptionEntry.deletionIgnored = false; + if (subscriptionEntry) { + const watchers = subscriptionEntry.watchers; + for (const watcher of watchers) { + watcher.onTransientError({ + code: status.UNAVAILABLE, + details: `Validation failed for resource ${resourceName}`, + metadata: new Metadata() + }); + } + clearTimeout(subscriptionEntry.resourceTimer); } } } diff --git a/packages/grpc-js-xds/test/test-nack.ts b/packages/grpc-js-xds/test/test-nack.ts new file mode 100644 index 00000000..ad1aad44 --- /dev/null +++ b/packages/grpc-js-xds/test/test-nack.ts @@ -0,0 +1,156 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as assert from 'assert'; +import { register } from "../src"; +import { Backend } from "./backend"; +import { XdsTestClient } from "./client"; +import { FakeCluster, FakeRouteGroup } from "./framework"; +import { XdsServer } from "./xds-server"; + +register(); + +describe('Validation errors', () => { + let xdsServer: XdsServer; + let client: XdsTestClient; + beforeEach(done => { + xdsServer = new XdsServer(); + xdsServer.startServer(error => { + done(error); + }); + }); + afterEach(() => { + client?.close(); + xdsServer?.shutdownServer(); + }); + it('Should continue to use a valid resource after receiving an invalid EDS update', done => { + const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality:{region: 'region1'}}]); + const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); + routeGroup.startAllBackends().then(() => { + xdsServer.setEdsResource(cluster.getEndpointConfig()); + xdsServer.setCdsResource(cluster.getClusterConfig()); + xdsServer.setRdsResource(routeGroup.getRouteConfiguration()); + xdsServer.setLdsResource(routeGroup.getListener()); + client = new XdsTestClient('route1', xdsServer); + client.startCalls(100); + routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { + // After backends receive calls, set invalid EDS resource + xdsServer.setEdsResource({cluster_name: cluster.getEndpointConfig().cluster_name, endpoints: [{}]}); + let seenNack = false; + xdsServer.addResponseListener((typeUrl, responseState) => { + if (responseState.state === 'NACKED') { + if (seenNack) { + return; + } + seenNack = true; + routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { + client.stopCalls(); + done(); + }); + } + }); + }, reason => done(reason)); + }, reason => done(reason)); + }); + it('Should continue to use a valid resource after receiving an invalid CDS update', done => { + const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality:{region: 'region1'}}]); + const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); + routeGroup.startAllBackends().then(() => { + xdsServer.setEdsResource(cluster.getEndpointConfig()); + xdsServer.setCdsResource(cluster.getClusterConfig()); + xdsServer.setRdsResource(routeGroup.getRouteConfiguration()); + xdsServer.setLdsResource(routeGroup.getListener()); + client = new XdsTestClient('route1', xdsServer); + client.startCalls(100); + routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { + // After backends receive calls, set invalid CDS resource + xdsServer.setCdsResource({name: cluster.getClusterConfig().name}); + let seenNack = false; + xdsServer.addResponseListener((typeUrl, responseState) => { + if (responseState.state === 'NACKED') { + if (seenNack) { + return; + } + seenNack = true; + routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { + client.stopCalls(); + done(); + }); + } + }); + }, reason => done(reason)); + }, reason => done(reason)); + }); + it('Should continue to use a valid resource after receiving an invalid RDS update', done => { + const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality:{region: 'region1'}}]); + const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); + routeGroup.startAllBackends().then(() => { + xdsServer.setEdsResource(cluster.getEndpointConfig()); + xdsServer.setCdsResource(cluster.getClusterConfig()); + xdsServer.setRdsResource(routeGroup.getRouteConfiguration()); + xdsServer.setLdsResource(routeGroup.getListener()); + client = new XdsTestClient('route1', xdsServer); + client.startCalls(100); + routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { + // After backends receive calls, set invalid RDS resource + xdsServer.setRdsResource({name: routeGroup.getRouteConfiguration().name, virtual_hosts: [{domains: ['**']}]}); + let seenNack = false; + xdsServer.addResponseListener((typeUrl, responseState) => { + if (responseState.state === 'NACKED') { + if (seenNack) { + return; + } + seenNack = true; + routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { + client.stopCalls(); + done(); + }); + } + }); + }, reason => done(reason)); + }, reason => done(reason)); + }); + it('Should continue to use a valid resource after receiving an invalid LDS update', done => { + const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality:{region: 'region1'}}]); + const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); + routeGroup.startAllBackends().then(() => { + xdsServer.setEdsResource(cluster.getEndpointConfig()); + xdsServer.setCdsResource(cluster.getClusterConfig()); + xdsServer.setRdsResource(routeGroup.getRouteConfiguration()); + xdsServer.setLdsResource(routeGroup.getListener()); + client = new XdsTestClient('route1', xdsServer); + client.startCalls(100); + routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { + // After backends receive calls, set invalid LDS resource + xdsServer.setLdsResource({name: routeGroup.getListener().name}); + let seenNack = false; + xdsServer.addResponseListener((typeUrl, responseState) => { + if (responseState.state === 'NACKED') { + if (seenNack) { + return; + } + seenNack = true; + routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { + client.stopCalls(); + done(); + }); + } + }); + }, reason => done(reason)); + }, reason => done(reason)); + }); +}); \ No newline at end of file From 48ef1ed202e19c36e934015b6b1ed039cf7d2f00 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 20 Apr 2023 14:35:39 -0700 Subject: [PATCH 06/10] grpc-js-xds: Bump version to 1.8.2 --- packages/grpc-js-xds/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js-xds/package.json b/packages/grpc-js-xds/package.json index 9511776c..7c735b65 100644 --- a/packages/grpc-js-xds/package.json +++ b/packages/grpc-js-xds/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js-xds", - "version": "1.8.1", + "version": "1.8.2", "description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.", "main": "build/src/index.js", "scripts": { From dfccd687f0bc1df7d8d7dd599249b098a772cf53 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 24 Apr 2023 16:21:12 -0700 Subject: [PATCH 07/10] Address review comments --- .../src/xds-stream-state/xds-stream-state.ts | 8 +++----- packages/grpc-js-xds/test/test-nack.ts | 20 +++++++++++-------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts index b2d7b227..b04adb79 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts @@ -172,14 +172,13 @@ export abstract class BaseXdsStreamState implements XdsStreamState name: resourceName, raw: raw}); if (subscriptionEntry) { - const watchers = subscriptionEntry.watchers; - for (const watcher of watchers) { + for (const watcher of subscriptionEntry.watchers) { watcher.onValidUpdate(resource); } clearTimeout(subscriptionEntry.resourceTimer); subscriptionEntry.cachedResponse = resource; if (subscriptionEntry.deletionIgnored) { - experimental.log(logVerbosity.INFO, 'Received resource with previously ignored deletion: ' + resourceName); + experimental.log(logVerbosity.INFO, `Received resource with previously ignored deletion: ${resourceName}`); subscriptionEntry.deletionIgnored = false; } } @@ -191,8 +190,7 @@ export abstract class BaseXdsStreamState implements XdsStreamState error: `Validation failed for resource ${resourceName}` }); if (subscriptionEntry) { - const watchers = subscriptionEntry.watchers; - for (const watcher of watchers) { + for (const watcher of subscriptionEntry.watchers) { watcher.onTransientError({ code: status.UNAVAILABLE, details: `Validation failed for resource ${resourceName}`, diff --git a/packages/grpc-js-xds/test/test-nack.ts b/packages/grpc-js-xds/test/test-nack.ts index ad1aad44..b5bfb773 100644 --- a/packages/grpc-js-xds/test/test-nack.ts +++ b/packages/grpc-js-xds/test/test-nack.ts @@ -38,7 +38,7 @@ describe('Validation errors', () => { xdsServer?.shutdownServer(); }); it('Should continue to use a valid resource after receiving an invalid EDS update', done => { - const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality:{region: 'region1'}}]); + const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]); const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); routeGroup.startAllBackends().then(() => { xdsServer.setEdsResource(cluster.getEndpointConfig()); @@ -49,7 +49,8 @@ describe('Validation errors', () => { client.startCalls(100); routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { // After backends receive calls, set invalid EDS resource - xdsServer.setEdsResource({cluster_name: cluster.getEndpointConfig().cluster_name, endpoints: [{}]}); + const invalidEdsResource = {cluster_name: cluster.getEndpointConfig().cluster_name, endpoints: [{}]}; + xdsServer.setEdsResource(invalidEdsResource); let seenNack = false; xdsServer.addResponseListener((typeUrl, responseState) => { if (responseState.state === 'NACKED') { @@ -67,7 +68,7 @@ describe('Validation errors', () => { }, reason => done(reason)); }); it('Should continue to use a valid resource after receiving an invalid CDS update', done => { - const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality:{region: 'region1'}}]); + const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]); const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); routeGroup.startAllBackends().then(() => { xdsServer.setEdsResource(cluster.getEndpointConfig()); @@ -78,7 +79,8 @@ describe('Validation errors', () => { client.startCalls(100); routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { // After backends receive calls, set invalid CDS resource - xdsServer.setCdsResource({name: cluster.getClusterConfig().name}); + const invalidCdsResource = {name: cluster.getClusterConfig().name}; + xdsServer.setCdsResource(invalidCdsResource); let seenNack = false; xdsServer.addResponseListener((typeUrl, responseState) => { if (responseState.state === 'NACKED') { @@ -96,7 +98,7 @@ describe('Validation errors', () => { }, reason => done(reason)); }); it('Should continue to use a valid resource after receiving an invalid RDS update', done => { - const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality:{region: 'region1'}}]); + const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]); const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); routeGroup.startAllBackends().then(() => { xdsServer.setEdsResource(cluster.getEndpointConfig()); @@ -107,7 +109,8 @@ describe('Validation errors', () => { client.startCalls(100); routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { // After backends receive calls, set invalid RDS resource - xdsServer.setRdsResource({name: routeGroup.getRouteConfiguration().name, virtual_hosts: [{domains: ['**']}]}); + const invalidRdsResource = {name: routeGroup.getRouteConfiguration().name, virtual_hosts: [{domains: ['**']}]}; + xdsServer.setRdsResource(invalidRdsResource); let seenNack = false; xdsServer.addResponseListener((typeUrl, responseState) => { if (responseState.state === 'NACKED') { @@ -125,7 +128,7 @@ describe('Validation errors', () => { }, reason => done(reason)); }); it('Should continue to use a valid resource after receiving an invalid LDS update', done => { - const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality:{region: 'region1'}}]); + const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]); const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); routeGroup.startAllBackends().then(() => { xdsServer.setEdsResource(cluster.getEndpointConfig()); @@ -136,7 +139,8 @@ describe('Validation errors', () => { client.startCalls(100); routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { // After backends receive calls, set invalid LDS resource - xdsServer.setLdsResource({name: routeGroup.getListener().name}); + const invalidLdsResource = {name: routeGroup.getListener().name}; + xdsServer.setLdsResource(invalidLdsResource); let seenNack = false; xdsServer.addResponseListener((typeUrl, responseState) => { if (responseState.state === 'NACKED') { From edeeda6424d568b80eb4478b63afba05c51904a5 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 24 Apr 2023 16:22:49 -0700 Subject: [PATCH 08/10] Add trailing newline in packages/grpc-js-xds/test/test-nack.ts Co-authored-by: Sergii Tkachenko --- packages/grpc-js-xds/test/test-nack.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js-xds/test/test-nack.ts b/packages/grpc-js-xds/test/test-nack.ts index b5bfb773..9395628a 100644 --- a/packages/grpc-js-xds/test/test-nack.ts +++ b/packages/grpc-js-xds/test/test-nack.ts @@ -157,4 +157,4 @@ describe('Validation errors', () => { }, reason => done(reason)); }, reason => done(reason)); }); -}); \ No newline at end of file +}); From 2f869495cc59fa24d56ea270baecfca683a7febf Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 24 Apr 2023 17:05:28 -0700 Subject: [PATCH 09/10] Update tests with master test framework changes --- packages/grpc-js-xds/test/test-nack.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/grpc-js-xds/test/test-nack.ts b/packages/grpc-js-xds/test/test-nack.ts index 9395628a..9e69e0dc 100644 --- a/packages/grpc-js-xds/test/test-nack.ts +++ b/packages/grpc-js-xds/test/test-nack.ts @@ -19,7 +19,7 @@ import * as assert from 'assert'; import { register } from "../src"; import { Backend } from "./backend"; import { XdsTestClient } from "./client"; -import { FakeCluster, FakeRouteGroup } from "./framework"; +import { FakeEdsCluster, FakeRouteGroup } from "./framework"; import { XdsServer } from "./xds-server"; register(); @@ -38,7 +38,7 @@ describe('Validation errors', () => { xdsServer?.shutdownServer(); }); it('Should continue to use a valid resource after receiving an invalid EDS update', done => { - const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]); + const cluster = new FakeEdsCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]); const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); routeGroup.startAllBackends().then(() => { xdsServer.setEdsResource(cluster.getEndpointConfig()); @@ -68,7 +68,7 @@ describe('Validation errors', () => { }, reason => done(reason)); }); it('Should continue to use a valid resource after receiving an invalid CDS update', done => { - const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]); + const cluster = new FakeEdsCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]); const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); routeGroup.startAllBackends().then(() => { xdsServer.setEdsResource(cluster.getEndpointConfig()); @@ -98,7 +98,7 @@ describe('Validation errors', () => { }, reason => done(reason)); }); it('Should continue to use a valid resource after receiving an invalid RDS update', done => { - const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]); + const cluster = new FakeEdsCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]); const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); routeGroup.startAllBackends().then(() => { xdsServer.setEdsResource(cluster.getEndpointConfig()); @@ -128,7 +128,7 @@ describe('Validation errors', () => { }, reason => done(reason)); }); it('Should continue to use a valid resource after receiving an invalid LDS update', done => { - const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]); + const cluster = new FakeEdsCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]); const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]); routeGroup.startAllBackends().then(() => { xdsServer.setEdsResource(cluster.getEndpointConfig()); From 85d227b1d3a8c6cea31db06f64ef21b1316cd595 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 24 Apr 2023 17:27:44 -0700 Subject: [PATCH 10/10] Update test logic to account for recent validation changes --- packages/grpc-js-xds/test/test-nack.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/grpc-js-xds/test/test-nack.ts b/packages/grpc-js-xds/test/test-nack.ts index 9e69e0dc..ae2a9b3e 100644 --- a/packages/grpc-js-xds/test/test-nack.ts +++ b/packages/grpc-js-xds/test/test-nack.ts @@ -17,6 +17,7 @@ import * as assert from 'assert'; import { register } from "../src"; +import { Cluster } from '../src/generated/envoy/config/cluster/v3/Cluster'; import { Backend } from "./backend"; import { XdsTestClient } from "./client"; import { FakeEdsCluster, FakeRouteGroup } from "./framework"; @@ -79,7 +80,7 @@ describe('Validation errors', () => { client.startCalls(100); routeGroup.waitForAllBackendsToReceiveTraffic().then(() => { // After backends receive calls, set invalid CDS resource - const invalidCdsResource = {name: cluster.getClusterConfig().name}; + const invalidCdsResource: Cluster = {name: cluster.getClusterConfig().name, type: 'EDS'}; xdsServer.setCdsResource(invalidCdsResource); let seenNack = false; xdsServer.addResponseListener((typeUrl, responseState) => {