/*
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

import * as grpc from '@grpc/grpc-js';
import * as grpc_xds from '../src';
import { ProtoGrpcType } from './generated/test';

import * as protoLoader from '@grpc/proto-loader';

import * as yargs from 'yargs';
import * as os from 'os';
import { HealthImplementation } from 'grpc-health-check';
import { Empty__Output, Empty } from './generated/grpc/testing/Empty';
import { SimpleRequest__Output } from './generated/grpc/testing/SimpleRequest';
import { SimpleResponse } from './generated/grpc/testing/SimpleResponse';
import { ReflectionService } from '@grpc/reflection';

grpc_xds.register();

const packageDefinition = protoLoader.loadSync('grpc/testing/test.proto', {
  keepCase: true,
  defaults: true,
  oneofs: true,
  json: true,
  longs: String,
  enums: String,
  includeDirs: [__dirname + '/../../proto']
});

const loadedProto = grpc.loadPackageDefinition(packageDefinition) as unknown as ProtoGrpcType;

/**
 * Returns a promise that resolves once `delayMs` milliseconds have elapsed.
 *
 * @param delayMs Delay passed to `setTimeout`, in milliseconds.
 */
function setAsyncTimeout(delayMs: number): Promise<void> {
  return new Promise<void>(resolve => {
    setTimeout(() => {
      resolve();
    }, delayMs);
  });
}

/** Host name of this machine, as reported by `os.hostname()`. */
const HOSTNAME = os.hostname();

/** Path prefix shared by every method of `grpc.testing.TestService`. */
const TEST_SERVICE_NAME = '/grpc.testing.TestService/';

/**
 * Server interceptor that applies the behaviors requested through the
 * `rpc-behavior` request header and adds a `hostname` entry to the response
 * metadata.
 *
 * The `rpc-behavior` values are joined and split on `,`. Each entry may be
 * prefixed with `hostname=<name> `, in which case it is skipped unless
 * `<name>` equals this server's host name. Recognized entries:
 *
 * - `sleep-<seconds>`: wait that long before processing further entries.
 * - `keep-open`: stop processing without forwarding the metadata.
 * - `error-code-<code>`: end the call with the given status code.
 * - `succeed-on-retry-attempt-<n>`: if the `grpc-previous-rpc-attempts`
 *   header equals `<n>`, forward the metadata immediately and ignore the
 *   remaining entries.
 *
 * Malformed numeric values end the call with `INVALID_ARGUMENT`.
 *
 * @param methodDescriptor Definition of the method being called (unused here).
 * @param call The intercepted server call.
 * @returns The intercepting call wrapping `call`.
 */
// The interceptor is installed for arbitrary methods, so the request and
// response types are not known here.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function testInfoInterceptor(methodDescriptor: grpc.ServerMethodDefinition<any, any>, call: grpc.ServerInterceptingCallInterface): grpc.ServerInterceptingCall {
  /* Ends the call with INVALID_ARGUMENT for a malformed rpc-behavior entry. */
  const rejectInvalidBehavior = (value: string): void => {
    call.sendStatus({
      code: grpc.status.INVALID_ARGUMENT,
      details: 'Invalid format for rpc-behavior header: ' + value
    });
  };

  const listener: grpc.ServerListener = {
    onReceiveMetadata: async (metadata, next) => {
      let attemptNum = 0;
      const attemptNumHeader = metadata.get('grpc-previous-rpc-attempts');
      if (attemptNumHeader.length > 0) {
        attemptNum = Number(attemptNumHeader[0]);
        if (Number.isNaN(attemptNum)) {
          call.sendStatus({
            code: grpc.status.INVALID_ARGUMENT,
            details: 'Invalid format for grpc-previous-rpc-attempts header: ' + attemptNumHeader[0]
          });
          return;
        }
      }
      // Binary (Buffer) metadata values are ignored; only string values carry
      // behavior entries.
      const rpcBehavior = metadata.get('rpc-behavior').filter(v => typeof v === 'string').join(',');
      for (const value of rpcBehavior.split(',')) {
        let behaviorEntry: string;
        if (value.startsWith('hostname=')) {
          const splitValue = value.split(' ');
          if (splitValue.length > 1) {
            if (splitValue[0].substring('hostname='.length) !== HOSTNAME) {
              // Entry is addressed to a different host.
              continue;
            }
            behaviorEntry = splitValue[1];
          } else {
            rejectInvalidBehavior(value);
            return;
          }
        } else {
          behaviorEntry = value;
        }
        if (behaviorEntry.startsWith('sleep-')) {
          const delaySec = Number(behaviorEntry.substring('sleep-'.length));
          if (Number.isNaN(delaySec)) {
            rejectInvalidBehavior(value);
            return;
          }
          await setAsyncTimeout(delaySec * 1000);
        }
        if (behaviorEntry === 'keep-open') {
          // Deliberately neither forwards the metadata nor sends a status.
          return;
        }
        if (behaviorEntry.startsWith('error-code-')) {
          const errorCode = Number(behaviorEntry.substring('error-code-'.length));
          if (Number.isNaN(errorCode)) {
            rejectInvalidBehavior(value);
            return;
          }
          call.sendStatus({
            code: errorCode,
            details: 'RPC failed as directed by rpc-behavior header value ' + value
          });
          return;
        }
        if (behaviorEntry.startsWith('succeed-on-retry-attempt-')) {
          const targetAttempt = Number(behaviorEntry.substring('succeed-on-retry-attempt-'.length));
          if (Number.isNaN(targetAttempt)) {
            rejectInvalidBehavior(value);
            return;
          }
          if (attemptNum === targetAttempt) {
            next(metadata);
            return;
          }
        }
      }
      next(metadata);
    }
  };
  const responder: grpc.Responder = {
    start: next => {
      next(listener);
    },
    sendMetadata: (metadata, next) => {
      metadata.add('hostname', HOSTNAME);
      next(metadata);
    }
  };
  return new grpc.ServerInterceptingCall(call, responder);
}

/**
 * Server interceptor that logs every received request message and every sent
 * response message, together with the method path, to the console.
 *
 * @param methodDescriptor Definition of the method being called; its `path` is logged.
 * @param call The intercepted server call.
 * @returns The intercepting call wrapping `call`.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function adminServiceInterceptor(methodDescriptor: grpc.ServerMethodDefinition<any, any>, call: grpc.ServerInterceptingCallInterface): grpc.ServerInterceptingCall {
  const listener: grpc.ServerListener = {
    onReceiveMessage: (message, next) => {
      console.log(`Received request to method ${methodDescriptor.path}: ${JSON.stringify(message)}`);
      next(message);
    }
  };
  const responder: grpc.Responder = {
    start: next => {
      next(listener);
    },
    sendMessage: (message, next) => {
      console.log(`Responded to request to method ${methodDescriptor.path}: ${JSON.stringify(message)}`);
      next(message);
    }
  };
  return new grpc.ServerInterceptingCall(call, responder);
}

/**
 * Interceptor for a server hosting both the test service and the admin
 * services: TestService methods go through `testInfoInterceptor`, everything
 * else through `adminServiceInterceptor`.
 *
 * @param methodDescriptor Definition of the method being called.
 * @param call The intercepted server call.
 * @returns The intercepting call produced by the selected interceptor.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function unifiedInterceptor(methodDescriptor: grpc.ServerMethodDefinition<any, any>, call: grpc.ServerInterceptingCallInterface): grpc.ServerInterceptingCall {
  if (methodDescriptor.path.startsWith(TEST_SERVICE_NAME)) {
    return testInfoInterceptor(methodDescriptor, call);
  } else {
    return adminServiceInterceptor(methodDescriptor, call);
  }
}

/** Handlers for the `grpc.testing.TestService` methods served here. */
const testServiceHandler = {
  /** Responds with an empty message. */
  EmptyCall: (call: grpc.ServerUnaryCall<Empty__Output, Empty>, callback: grpc.sendUnaryData<Empty>) => {
    callback(null, {});
  },
  /**
   * Responds with this server's host name and a payload body consisting of
   * `response_size` ASCII '0' characters.
   */
  UnaryCall: (call: grpc.ServerUnaryCall<SimpleRequest__Output, SimpleResponse>, callback: grpc.sendUnaryData<SimpleResponse>) => {
    callback(null, {
      hostname: HOSTNAME,
      payload: {
        body: Buffer.from('0'.repeat(call.request.response_size))
      }
    });
  }
};

/**
 * Promise wrapper around `server.bindAsync`.
 *
 * @param server Server to bind.
 * @param port Address string passed to `bindAsync` (e.g. `[::]:8080`).
 * @param credentials Server credentials for the bound port.
 * @returns The port number reported by `bindAsync`; rejects with its error on failure.
 */
function serverBindPromise(server: grpc.Server, port: string, credentials: grpc.ServerCredentials): Promise<number> {
  return new Promise<number>((resolve, reject) => {
    server.bindAsync(port, credentials, (error, boundPort) => {
      if (error) {
        reject(error);
      } else {
        resolve(boundPort);
      }
    });
  });
}

/**
 * Finds the first non-loopback IPv4 address of this machine.
 *
 * Interfaces named `lo` and addresses flagged `internal` are skipped, because
 * the caller binds the loopback address separately.
 *
 * @returns The address, or `null` when no such address exists.
 */
function getIPv4Address(): string | null {
  for (const [name, addressList] of Object.entries(os.networkInterfaces())) {
    if (name === 'lo' || !addressList) {
      continue;
    }
    for (const address of addressList) {
      if (address.family === 'IPv4' && !address.internal) {
        return address.address;
      }
    }
  }
  return null;
}

/**
 * Collects every non-loopback IPv6 address of this machine.
 *
 * Interfaces named `lo` and addresses flagged `internal` are skipped, because
 * the caller binds the loopback address separately.
 *
 * @returns The addresses in interface enumeration order; empty when none exist.
 */
function getIPv6Addresses(): string[] {
  const ipv6Addresses: string[] = [];
  for (const [name, addressList] of Object.entries(os.networkInterfaces())) {
    if (name === 'lo' || !addressList) {
      continue;
    }
    for (const address of addressList) {
      if (address.family === 'IPv6' && !address.internal) {
        ipv6Addresses.push(address.address);
      }
    }
  }
  return ipv6Addresses;
}

/**
 * Parses the command line and starts the interop server(s).
 *
 * Options: `--port` (required), `--maintenance_port` (required in secure
 * mode), `--address_type` (`IPV4_IPV6` by default, or `IPV4` / `IPV6`) and
 * `--secure_mode` (`false` by default).
 *
 * In secure mode the health, reflection and admin services run on a separate
 * maintenance server and the test service runs on an `XdsServer`; otherwise a
 * single server hosts everything. The health status is switched to `SERVING`
 * once all binds have completed.
 *
 * @throws Error on an unsupported option combination; rejects when a required bind fails.
 */
async function main(): Promise<void> {
  const argv = yargs
    .string(['port', 'maintenance_port', 'address_type', 'secure_mode'])
    .demandOption(['port'])
    .default('address_type', 'IPV4_IPV6')
    .default('secure_mode', 'false')
    .parse();
  console.log('Starting xDS interop server. Args: ', argv);
  const healthImpl = new HealthImplementation({'': 'NOT_SERVING'});
  const xdsUpdateHealthServiceImpl = {
    SetServing(call: grpc.ServerUnaryCall<Empty__Output, Empty>, callback: grpc.sendUnaryData<Empty>) {
      healthImpl.setStatus('', 'SERVING');
      callback(null, {});
    },
    SetNotServing(call: grpc.ServerUnaryCall<Empty__Output, Empty>, callback: grpc.sendUnaryData<Empty>) {
      healthImpl.setStatus('', 'NOT_SERVING');
      callback(null, {});
    }
  };
  const reflection = new ReflectionService(packageDefinition, {
    services: ['grpc.testing.TestService']
  });
  const addressType = argv.address_type.toUpperCase();
  const secureMode = argv.secure_mode.toLowerCase() === 'true';
  if (secureMode) {
    if (addressType !== 'IPV4_IPV6') {
      throw new Error('Secure mode only supports IPV4_IPV6 address type');
    }
    // Without this check the maintenance server would try to bind "[::]:undefined".
    if (!argv.maintenance_port) {
      throw new Error('Secure mode requires the maintenance_port option');
    }
    const maintenanceServer = new grpc.Server({interceptors: [adminServiceInterceptor]});
    maintenanceServer.addService(loadedProto.grpc.testing.XdsUpdateHealthService.service, xdsUpdateHealthServiceImpl);
    healthImpl.addToServer(maintenanceServer);
    reflection.addToServer(maintenanceServer);
    grpc.addAdminServicesToServer(maintenanceServer);

    const server = new grpc_xds.XdsServer({interceptors: [testInfoInterceptor]});
    server.addService(loadedProto.grpc.testing.TestService.service, testServiceHandler);
    const xdsCreds = new grpc_xds.XdsServerCredentials(grpc.ServerCredentials.createInsecure());
    await Promise.all([
      serverBindPromise(maintenanceServer, `[::]:${argv.maintenance_port}`, grpc.ServerCredentials.createInsecure()),
      serverBindPromise(server, `0.0.0.0:${argv.port}`, xdsCreds)
    ]);
  } else {
    const server = new grpc.Server({interceptors: [unifiedInterceptor]});
    server.addService(loadedProto.grpc.testing.XdsUpdateHealthService.service, xdsUpdateHealthServiceImpl);
    healthImpl.addToServer(server);
    reflection.addToServer(server);
    grpc.addAdminServicesToServer(server);
    server.addService(loadedProto.grpc.testing.TestService.service, testServiceHandler);
    const creds = grpc.ServerCredentials.createInsecure();
    switch (addressType) {
      case 'IPV4_IPV6':
        await serverBindPromise(server, `[::]:${argv.port}`, creds);
        break;
      case 'IPV4': {
        await serverBindPromise(server, `127.0.0.1:${argv.port}`, creds);
        const address = getIPv4Address();
        if (address) {
          await serverBindPromise(server, `${address}:${argv.port}`, creds);
        }
        break;
      }
      case 'IPV6':
        await serverBindPromise(server, `[::1]:${argv.port}`, creds);
        for (const address of getIPv6Addresses()) {
          // Best effort: an address that cannot be bound is logged and skipped.
          try {
            await serverBindPromise(server, `[${address}]:${argv.port}`, creds);
          } catch (e) {
            console.log(`Binding ${address} failed with error ${(e as Error).message}`);
          }
        }
        break;
      default:
        throw new Error(`Unknown address type: ${argv.address_type}`);
    }
  }
  healthImpl.setStatus('', 'SERVING');
}

if (require.main === module) {
  // Report startup failures explicitly instead of leaving the rejection unhandled.
  main().catch(error => {
    console.error('xDS interop server failed to start:', error);
    process.exit(1);
  });
}