/* * Copyright 2020 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ import * as grpc from '@grpc/grpc-js'; import * as grpc_xds from '../src'; import { ProtoGrpcType } from './generated/test'; import * as protoLoader from '@grpc/proto-loader'; import * as yargs from 'yargs'; import * as os from 'os'; import { HealthImplementation } from 'grpc-health-check'; import { Empty__Output, Empty } from './generated/grpc/testing/Empty'; import { SimpleRequest__Output } from './generated/grpc/testing/SimpleRequest'; import { SimpleResponse } from './generated/grpc/testing/SimpleResponse'; import { ReflectionService } from '@grpc/reflection'; const packageDefinition = protoLoader.loadSync('grpc/testing/test.proto', { keepCase: true, defaults: true, oneofs: true, json: true, longs: String, enums: String, includeDirs: [__dirname + '/../../proto'] }); const loadedProto = grpc.loadPackageDefinition(packageDefinition) as unknown as ProtoGrpcType; function setAsyncTimeout(delayMs: number): Promise { return new Promise(resolve => { setTimeout(() => { resolve(); }, delayMs); }); } const HOSTNAME = os.hostname(); function testInfoInterceptor(methodDescriptor: grpc.ServerMethodDefinition, call: grpc.ServerInterceptingCallInterface) { const listener: grpc.ServerListener = { onReceiveMetadata: async (metadata, next) => { let attemptNum = 0; const attemptNumHeader = metadata.get('grpc-previous-rpc-attempts'); if (attemptNumHeader.length > 0) { attemptNum = Number(attemptNumHeader[0]); if (Number.isNaN(attemptNum)) { call.sendStatus({ code: grpc.status.INVALID_ARGUMENT, details: 'Invalid format for grpc-previous-rpc-attempts header: ' + attemptNumHeader[0] }); return; } } const rpcBehavior = metadata.get('rpc-behavior').filter(v => typeof v === 'string').join(','); for (const value of rpcBehavior.split(',')) { let behaviorEntry: string; if (value.startsWith('hostname=')) { const splitValue = value.split(' '); if (splitValue.length > 1) { if (splitValue[0].substring('hostname='.length) !== HOSTNAME) { continue; } behaviorEntry = splitValue[1]; } else { call.sendStatus({ code: grpc.status.INVALID_ARGUMENT, details: 'Invalid format for rpc-behavior header: ' + value }); return; } } else { behaviorEntry = value; } if (behaviorEntry.startsWith('sleep-')) { const delaySec = Number(behaviorEntry.substring('sleep-'.length)); if (Number.isNaN(delaySec)) { call.sendStatus({ code: grpc.status.INVALID_ARGUMENT, details: 'Invalid format for rpc-behavior header: ' + value }); return; } await setAsyncTimeout(delaySec * 1000); } if (behaviorEntry === 'keep-open') { return; } if (behaviorEntry.startsWith('error-code-')) { const errorCode = Number(behaviorEntry.substring('error-code-'.length)); if (Number.isNaN(errorCode)) { call.sendStatus({ code: grpc.status.INVALID_ARGUMENT, details: 'Invalid format for rpc-behavior header: ' + value }); return; } call.sendStatus({ code: errorCode, details: 'RPC failed as directed by rpc-behavior header value ' + value }); return; } if (behaviorEntry.startsWith('succeed-on-retry-attempt-')) { const targetAttempt = Number(behaviorEntry.substring('succeed-on-retry-attempt-'.length)); if (Number.isNaN(targetAttempt)) { call.sendStatus({ code: grpc.status.INVALID_ARGUMENT, details: 'Invalid format for rpc-behavior header: ' + value }); return; } if (attemptNum === targetAttempt) { next(metadata); return; } } } next(metadata); } }; const responder: grpc.Responder = { start: next => { next(listener); }, sendMetadata: (metadata, next) => { metadata.add('hostname', HOSTNAME); next(metadata); } } return new grpc.ServerInterceptingCall(call, responder); }; const testServiceHandler = { EmptyCall: (call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData) => { callback(null, {}); }, UnaryCall: (call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData) => { callback(null, { hostname: HOSTNAME, payload: { body: Buffer.from('0'.repeat(call.request.response_size)) } }); } }; function serverBindPromise(server: grpc.Server, port: string, credentials: grpc.ServerCredentials): Promise { return new Promise((resolve, reject) => { server.bindAsync(port, credentials, (error, port) => { if (error) { reject(error); } else { resolve(port); } }) }) } function getIPv4Address(): string | null { for (const [name, addressList] of Object.entries(os.networkInterfaces())) { if (name === 'lo' || !addressList) { continue; } for (const address of addressList) { if (address.family === 'IPv4') { return address.address; } } } return null; } function getIPv6Addresses(): string[] { const ipv6Addresses: string[] = []; for (const [name, addressList] of Object.entries(os.networkInterfaces())) { if (name === 'lo' || !addressList) { continue; } for (const address of addressList) { if (address.family === 'IPv6') { ipv6Addresses.push(address.address); } } } return ipv6Addresses; } async function main() { const argv = yargs .string(['port', 'maintenance_port', 'address_type']) .boolean(['secure_mode']) .choices('address_type', ['IPV4', 'IPV6', 'IPV4_IPV6']) .demandOption(['port']) .default('address_type', 'IPV4_IPV6') .default('secure_mode', false) .parse() console.log('Starting xDS interop server. Args: ', argv); const healthImpl = new HealthImplementation({'': 'NOT_SERVING'}); const xdsUpdateHealthServiceImpl = { SetServing(call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData) { console.log('SetServing called'); healthImpl.setStatus('', 'SERVING'); callback(null, {}); }, SetNotServing(call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData) { console.log('SetNotServing called'); healthImpl.setStatus('', 'NOT_SERVING'); callback(null, {}); } } const reflection = new ReflectionService(packageDefinition, { services: ['grpc.testing.TestService'] }) if (argv.secure_mode) { if (argv.address_type !== 'IPV4_IPV6') { throw new Error('Secure mode only supports IPV4_IPV6 address type'); } const maintenanceServer = new grpc.Server(); maintenanceServer.addService(loadedProto.grpc.testing.XdsUpdateHealthService.service, xdsUpdateHealthServiceImpl) healthImpl.addToServer(maintenanceServer); reflection.addToServer(maintenanceServer); grpc.addAdminServicesToServer(maintenanceServer); const server = new grpc_xds.XdsServer({interceptors: [testInfoInterceptor]}); server.addService(loadedProto.grpc.testing.TestService.service, testServiceHandler); const xdsCreds = new grpc_xds.XdsServerCredentials(grpc.ServerCredentials.createInsecure()); await Promise.all([ serverBindPromise(maintenanceServer, `[::]:${argv.maintenance_port}`, grpc.ServerCredentials.createInsecure()), serverBindPromise(server, `[::]:${argv.port}`, xdsCreds) ]); } else { const server = new grpc.Server({interceptors: [testInfoInterceptor]}); server.addService(loadedProto.grpc.testing.XdsUpdateHealthService.service, xdsUpdateHealthServiceImpl); healthImpl.addToServer(server); reflection.addToServer(server); grpc.addAdminServicesToServer(server); server.addService(loadedProto.grpc.testing.TestService.service, testServiceHandler); const creds = grpc.ServerCredentials.createInsecure(); switch (argv.address_type) { case 'IPV4_IPV6': await serverBindPromise(server, `[::]:${argv.port}`, creds); break; case 'IPV4': await serverBindPromise(server, `127.0.0.1:${argv.port}`, creds); const address = getIPv4Address(); if (address) { await serverBindPromise(server, `${address}:${argv.port}`, creds); } break; case 'IPV6': await serverBindPromise(server, `[::1]:${argv.port}`, creds); for (const address of getIPv6Addresses()) { await serverBindPromise(server, `${address}:${argv.port}`, creds); } break; default: throw new Error(`Unknown address type: ${argv.address_type}`); } } healthImpl.setStatus('', 'SERVING'); } if (require.main === module) { main(); }