Merge pull request #2897 from murgatroid99/grpc-js-xds_interop_server

grpc-js-xds: Add interop server implementation
This commit is contained in:
Michael Lumish 2025-02-05 13:51:08 -08:00 committed by GitHub
commit 06a05a4200
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 391 additions and 7 deletions

View File

@ -23,7 +23,7 @@ import * as reflection from './packages/grpc-reflection/gulpfile';
import * as protobuf from './packages/proto-loader/gulpfile';
import * as internalTest from './test/gulpfile';
const installAll = gulp.series(protobuf.install, jsCore.install, healthCheck.install, internalTest.install, jsXds.install, reflection.install);
const installAll = gulp.series(protobuf.install, jsCore.install, healthCheck.install, reflection.install, internalTest.install, jsXds.install);
const lint = gulp.parallel(jsCore.lint);

View File

@ -35,6 +35,7 @@
"license": "Apache-2.0",
"devDependencies": {
"@grpc/grpc-js": "file:../grpc-js",
"@types/mocha": "^10.0.10",
"typescript": "^5.2.2"
}
}

View File

@ -14,7 +14,7 @@
# Dockerfile for building the xDS interop client. To build the image, run the
# following command from grpc-node directory:
# docker build -t <TAG> -f packages/grpc-js-xds/interop/Dockerfile .
# docker build -t <TAG> -f packages/grpc-js-xds/interop/test-client.Dockerfile .
FROM node:18-slim as build
@ -26,16 +26,23 @@ WORKDIR /node/src/grpc-node/packages/proto-loader
RUN npm install
WORKDIR /node/src/grpc-node/packages/grpc-js
RUN npm install
WORKDIR /node/src/grpc-node/packages/grpc-health-check
RUN npm install
WORKDIR /node/src/grpc-node/packages/grpc-reflection
RUN npm install
WORKDIR /node/src/grpc-node/packages/grpc-js-xds
RUN npm install
FROM gcr.io/distroless/nodejs18-debian11:latest
WORKDIR /node/src/grpc-node
COPY --from=build /node/src/grpc-node/packages/proto-loader ./packages/proto-loader/
COPY --from=build /node/src/grpc-node/packages/grpc-health-check ./packages/grpc-health-check/
COPY --from=build /node/src/grpc-node/packages/grpc-reflection ./packages/grpc-reflection/
COPY --from=build /node/src/grpc-node/packages/grpc-js ./packages/grpc-js/
COPY --from=build /node/src/grpc-node/packages/grpc-js-xds ./packages/grpc-js-xds/
ENV GRPC_VERBOSITY="DEBUG"
ENV GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager,cds_balancer,xds_cluster_resolver,xds_cluster_impl,priority,weighted_target,round_robin,resolving_load_balancer,subchannel,keepalive,dns_resolver,fault_injection,http_filter,csds,outlier_detection,server,server_call,ring_hash
ENV NODE_XDS_INTEROP_VERBOSITY=1
ENTRYPOINT [ "/nodejs/bin/node", "/node/src/grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client" ]

View File

@ -0,0 +1,54 @@
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Dockerfile for building the xDS interop client. To build the image, run the
# following command from grpc-node directory:
# docker build -t <TAG> -f packages/grpc-js-xds/interop/test-server.Dockerfile .
FROM node:18-slim as build
# Make a grpc-node directory and copy the repo into it.
WORKDIR /node/src/grpc-node
COPY . .
WORKDIR /node/src/grpc-node/packages/proto-loader
RUN npm install
WORKDIR /node/src/grpc-node/packages/grpc-js
RUN npm install
WORKDIR /node/src/grpc-node/packages/grpc-health-check
RUN npm install
WORKDIR /node/src/grpc-node/packages/grpc-reflection
RUN npm install
WORKDIR /node/src/grpc-node/packages/grpc-js-xds
RUN npm install
ENV TINI_VERSION v0.19.0
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini
RUN chmod +x /tini
FROM gcr.io/distroless/nodejs18-debian11:latest
WORKDIR /node/src/grpc-node
COPY --from=build /node/src/grpc-node/packages/proto-loader ./packages/proto-loader/
COPY --from=build /node/src/grpc-node/packages/grpc-health-check ./packages/grpc-health-check/
COPY --from=build /node/src/grpc-node/packages/grpc-reflection ./packages/grpc-reflection/
COPY --from=build /node/src/grpc-node/packages/grpc-js ./packages/grpc-js/
COPY --from=build /node/src/grpc-node/packages/grpc-js-xds ./packages/grpc-js-xds/
ENV GRPC_VERBOSITY="DEBUG"
ENV GRPC_TRACE=xds_client,server,xds_server
# tini serves as PID 1 and enables the server to properly respond to signals.
COPY --from=build /tini /tini
ENTRYPOINT [ "/tini", "-g", "-vv", "--", "/nodejs/bin/node", "/node/src/grpc-node/packages/grpc-js-xds/build/interop/xds-interop-server" ]

View File

@ -0,0 +1,308 @@
/*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as grpc from '@grpc/grpc-js';
import * as grpc_xds from '../src';
import { ProtoGrpcType } from './generated/test';
import * as protoLoader from '@grpc/proto-loader';
import * as yargs from 'yargs';
import * as os from 'os';
import { HealthImplementation } from 'grpc-health-check';
import { Empty__Output, Empty } from './generated/grpc/testing/Empty';
import { SimpleRequest__Output } from './generated/grpc/testing/SimpleRequest';
import { SimpleResponse } from './generated/grpc/testing/SimpleResponse';
import { ReflectionService } from '@grpc/reflection';
const packageDefinition = protoLoader.loadSync('grpc/testing/test.proto', {
keepCase: true,
defaults: true,
oneofs: true,
json: true,
longs: String,
enums: String,
includeDirs: [__dirname + '/../../proto']
});
const loadedProto = grpc.loadPackageDefinition(packageDefinition) as unknown as ProtoGrpcType;
function setAsyncTimeout(delayMs: number): Promise<void> {
return new Promise<void>(resolve => {
setTimeout(() => {
resolve();
}, delayMs);
});
}
const HOSTNAME = os.hostname();
const TEST_SERVICE_NAME = '/grpc.testing.TestService/';
function testInfoInterceptor(methodDescriptor: grpc.ServerMethodDefinition<any, any>, call: grpc.ServerInterceptingCallInterface) {
const listener: grpc.ServerListener = {
onReceiveMetadata: async (metadata, next) => {
let attemptNum = 0;
const attemptNumHeader = metadata.get('grpc-previous-rpc-attempts');
if (attemptNumHeader.length > 0) {
attemptNum = Number(attemptNumHeader[0]);
if (Number.isNaN(attemptNum)) {
call.sendStatus({
code: grpc.status.INVALID_ARGUMENT,
details: 'Invalid format for grpc-previous-rpc-attempts header: ' + attemptNumHeader[0]
});
return;
}
}
const rpcBehavior = metadata.get('rpc-behavior').filter(v => typeof v === 'string').join(',');
for (const value of rpcBehavior.split(',')) {
let behaviorEntry: string;
if (value.startsWith('hostname=')) {
const splitValue = value.split(' ');
if (splitValue.length > 1) {
if (splitValue[0].substring('hostname='.length) !== HOSTNAME) {
continue;
}
behaviorEntry = splitValue[1];
} else {
call.sendStatus({
code: grpc.status.INVALID_ARGUMENT,
details: 'Invalid format for rpc-behavior header: ' + value
});
return;
}
} else {
behaviorEntry = value;
}
if (behaviorEntry.startsWith('sleep-')) {
const delaySec = Number(behaviorEntry.substring('sleep-'.length));
if (Number.isNaN(delaySec)) {
call.sendStatus({
code: grpc.status.INVALID_ARGUMENT,
details: 'Invalid format for rpc-behavior header: ' + value
});
return;
}
await setAsyncTimeout(delaySec * 1000);
}
if (behaviorEntry === 'keep-open') {
return;
}
if (behaviorEntry.startsWith('error-code-')) {
const errorCode = Number(behaviorEntry.substring('error-code-'.length));
if (Number.isNaN(errorCode)) {
call.sendStatus({
code: grpc.status.INVALID_ARGUMENT,
details: 'Invalid format for rpc-behavior header: ' + value
});
return;
}
call.sendStatus({
code: errorCode,
details: 'RPC failed as directed by rpc-behavior header value ' + value
});
return;
}
if (behaviorEntry.startsWith('succeed-on-retry-attempt-')) {
const targetAttempt = Number(behaviorEntry.substring('succeed-on-retry-attempt-'.length));
if (Number.isNaN(targetAttempt)) {
call.sendStatus({
code: grpc.status.INVALID_ARGUMENT,
details: 'Invalid format for rpc-behavior header: ' + value
});
return;
}
if (attemptNum === targetAttempt) {
next(metadata);
return;
}
}
}
next(metadata);
}
};
const responder: grpc.Responder = {
start: next => {
next(listener);
},
sendMetadata: (metadata, next) => {
metadata.add('hostname', HOSTNAME);
next(metadata);
}
}
return new grpc.ServerInterceptingCall(call, responder);
};
function adminServiceInterceptor(methodDescriptor: grpc.ServerMethodDefinition<any, any>, call: grpc.ServerInterceptingCallInterface): grpc.ServerInterceptingCall {
const listener: grpc.ServerListener = {
onReceiveMessage: (message, next) => {
console.log(`Received request to method ${methodDescriptor.path}: ${JSON.stringify(message)}`);
next(message);
}
}
const responder: grpc.Responder = {
start: next => {
next(listener);
}
};
return new grpc.ServerInterceptingCall(call, responder);
}
function unifiedInterceptor(methodDescriptor: grpc.ServerMethodDefinition<any, any>, call: grpc.ServerInterceptingCallInterface): grpc.ServerInterceptingCall {
if (methodDescriptor.path.startsWith(TEST_SERVICE_NAME)) {
return testInfoInterceptor(methodDescriptor, call);
} else {
return adminServiceInterceptor(methodDescriptor, call);
}
}
const testServiceHandler = {
EmptyCall: (call: grpc.ServerUnaryCall<Empty__Output, Empty>, callback: grpc.sendUnaryData<Empty>) => {
callback(null, {});
},
UnaryCall: (call: grpc.ServerUnaryCall<SimpleRequest__Output, SimpleResponse>, callback: grpc.sendUnaryData<SimpleResponse>) => {
callback(null, {
hostname: HOSTNAME,
payload: {
body: Buffer.from('0'.repeat(call.request.response_size))
}
});
}
};
function serverBindPromise(server: grpc.Server, port: string, credentials: grpc.ServerCredentials): Promise<number> {
return new Promise((resolve, reject) => {
server.bindAsync(port, credentials, (error, port) => {
if (error) {
reject(error);
} else {
resolve(port);
}
})
})
}
function getIPv4Address(): string | null {
for (const [name, addressList] of Object.entries(os.networkInterfaces())) {
if (name === 'lo' || !addressList) {
continue;
}
for (const address of addressList) {
if (address.family === 'IPv4') {
return address.address;
}
}
}
return null;
}
function getIPv6Addresses(): string[] {
const ipv6Addresses: string[] = [];
for (const [name, addressList] of Object.entries(os.networkInterfaces())) {
if (name === 'lo' || !addressList) {
continue;
}
for (const address of addressList) {
if (address.family === 'IPv6') {
ipv6Addresses.push(address.address);
}
}
}
return ipv6Addresses;
}
async function main() {
const argv = yargs
.string(['port', 'maintenance_port', 'address_type'])
.boolean(['secure_mode'])
.demandOption(['port'])
.default('address_type', 'IPV4_IPV6')
.default('secure_mode', false)
.parse()
console.log('Starting xDS interop server. Args: ', argv);
const healthImpl = new HealthImplementation({'': 'NOT_SERVING'});
const xdsUpdateHealthServiceImpl = {
SetServing(call: grpc.ServerUnaryCall<Empty, Empty__Output>, callback: grpc.sendUnaryData<Empty__Output>) {
healthImpl.setStatus('', 'SERVING');
callback(null, {});
},
SetNotServing(call: grpc.ServerUnaryCall<Empty, Empty__Output>, callback: grpc.sendUnaryData<Empty__Output>) {
healthImpl.setStatus('', 'NOT_SERVING');
callback(null, {});
}
}
const reflection = new ReflectionService(packageDefinition, {
services: ['grpc.testing.TestService']
})
const addressType = argv.address_type.toUpperCase();
if (argv.secure_mode) {
if (addressType !== 'IPV4_IPV6') {
throw new Error('Secure mode only supports IPV4_IPV6 address type');
}
const maintenanceServer = new grpc.Server({interceptors: [adminServiceInterceptor]});
maintenanceServer.addService(loadedProto.grpc.testing.XdsUpdateHealthService.service, xdsUpdateHealthServiceImpl)
healthImpl.addToServer(maintenanceServer);
reflection.addToServer(maintenanceServer);
grpc.addAdminServicesToServer(maintenanceServer);
const server = new grpc_xds.XdsServer({interceptors: [testInfoInterceptor]});
server.addService(loadedProto.grpc.testing.TestService.service, testServiceHandler);
const xdsCreds = new grpc_xds.XdsServerCredentials(grpc.ServerCredentials.createInsecure());
await Promise.all([
serverBindPromise(maintenanceServer, `[::]:${argv.maintenance_port}`, grpc.ServerCredentials.createInsecure()),
serverBindPromise(server, `[::]:${argv.port}`, xdsCreds)
]);
} else {
const server = new grpc.Server({interceptors: [unifiedInterceptor]});
server.addService(loadedProto.grpc.testing.XdsUpdateHealthService.service, xdsUpdateHealthServiceImpl);
healthImpl.addToServer(server);
reflection.addToServer(server);
grpc.addAdminServicesToServer(server);
server.addService(loadedProto.grpc.testing.TestService.service, testServiceHandler);
const creds = grpc.ServerCredentials.createInsecure();
switch (addressType) {
case 'IPV4_IPV6':
await serverBindPromise(server, `[::]:${argv.port}`, creds);
break;
case 'IPV4':
await serverBindPromise(server, `127.0.0.1:${argv.port}`, creds);
const address = getIPv4Address();
if (address) {
await serverBindPromise(server, `${address}:${argv.port}`, creds);
}
break;
case 'IPV6':
await serverBindPromise(server, `[::1]:${argv.port}`, creds);
for (const address of getIPv6Addresses()) {
try {
await serverBindPromise(server, `[${address}]:${argv.port}`, creds);
} catch (e) {
console.log(`Binding ${address} failed with error ${(e as Error).message}`);
}
}
break;
default:
throw new Error(`Unknown address type: ${argv.address_type}`);
}
}
healthImpl.setStatus('', 'SERVING');
}
if (require.main === module) {
main();
}

View File

@ -38,8 +38,10 @@
"@types/gulp-mocha": "0.0.32",
"@types/mocha": "^5.2.6",
"@types/node": ">=20.11.20",
"@grpc/reflection": "file:../grpc-reflection",
"@types/yargs": "^15.0.5",
"find-free-ports": "^3.1.1",
"grpc-health-check": "file:../grpc-health-check",
"gts": "^5.0.1",
"ncp": "^2.0.0",
"typescript": "^5.1.3",

View File

@ -28,11 +28,12 @@ set -eo pipefail
# Writes the output of docker image build stdout, stderr
#######################################
psm::lang::build_docker_images() {
local client_dockerfile="packages/grpc-js-xds/interop/Dockerfile"
local client_dockerfile="packages/grpc-js-xds/interop/test-client.Dockerfile"
local server_dockerfile="packages/grpc-js-xds/interop/test-server.Dockerfile"
cd "${SRC_DIR}"
psm::tools::run_verbose git submodule update --init --recursive
psm::tools::run_verbose git submodule status
psm::build::docker_images_generic "${client_dockerfile}"
psm::build::docker_images_generic "${client_dockerfile}" "${server_dockerfile}"
}

View File

@ -44,6 +44,7 @@ import {
makeClientConstructor,
MethodDefinition,
Serialize,
ServerMethodDefinition,
ServiceDefinition,
} from './make-client';
import { Metadata, MetadataOptions, MetadataValue } from './metadata';
@ -181,6 +182,7 @@ export {
ServerWritableStream,
ServerDuplexStream,
ServerErrorResponse,
ServerMethodDefinition,
ServiceDefinition,
UntypedHandleCall,
UntypedServiceImplementation,

View File

@ -337,6 +337,7 @@ export interface ServerInterceptingCallInterface {
export class ServerInterceptingCall implements ServerInterceptingCallInterface {
private responder: FullResponder;
private processingMetadata = false;
private sentMetadata = false;
private processingMessage = false;
private pendingMessage: any = null;
private pendingMessageCallback: (() => void) | null = null;
@ -395,6 +396,7 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface {
}
sendMetadata(metadata: Metadata): void {
this.processingMetadata = true;
this.sentMetadata = true;
this.responder.sendMetadata(metadata, interceptedMetadata => {
this.processingMetadata = false;
this.nextCall.sendMetadata(interceptedMetadata);
@ -404,6 +406,9 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface {
}
sendMessage(message: any, callback: () => void): void {
this.processingMessage = true;
if (!this.sentMetadata) {
this.sendMetadata(new Metadata());
}
this.responder.sendMessage(message, interceptedMessage => {
this.processingMessage = false;
if (this.processingMetadata) {

View File

@ -99,6 +99,10 @@ const { HTTP2_HEADER_PATH } = http2.constants;
const TRACER_NAME = 'server';
const kMaxAge = Buffer.from('max_age');
function serverCallTrace(text: string) {
logging.trace(LogVerbosity.DEBUG, 'server_call', text);
}
type AnyHttp2Server = http2.Http2Server | http2.Http2SecureServer;
interface BindResult {
@ -1248,7 +1252,7 @@ export class Server {
}
private _retrieveHandler(path: string): Handler<any, any> | null {
this.trace(
serverCallTrace(
'Received call to method ' +
path +
' at address ' +
@ -1258,7 +1262,7 @@ export class Server {
const handler = this.handlers.get(path);
if (handler === undefined) {
this.trace(
serverCallTrace(
'No handler registered for method ' +
path +
'. Sending UNIMPLEMENTED status.'

View File

@ -186,7 +186,6 @@ describe('Server interceptors', () => {
call: grpc.ServerUnaryCall<any, any>,
callback: grpc.sendUnaryData<any>
) => {
call.sendMetadata(new grpc.Metadata());
callback(null, call.request);
},
});

View File

@ -39,6 +39,7 @@
},
"devDependencies": {
"@grpc/grpc-js": "file:../grpc-js",
"@types/mocha": "^10.0.10",
"copyfiles": "^2.4.1",
"typescript": "^5.2.2"
}