mirror of https://github.com/grpc/grpc-node.git
315 lines
11 KiB
TypeScript
315 lines
11 KiB
TypeScript
/*
|
|
* Copyright 2020 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
*/
|
|
|
|
import * as grpc from '@grpc/grpc-js';
|
|
|
|
import * as grpc_xds from '../src';
|
|
|
|
import { ProtoGrpcType } from './generated/test';
|
|
|
|
import * as protoLoader from '@grpc/proto-loader';
|
|
import * as yargs from 'yargs';
|
|
import * as os from 'os';
|
|
import { HealthImplementation } from 'grpc-health-check';
|
|
import { Empty__Output, Empty } from './generated/grpc/testing/Empty';
|
|
import { SimpleRequest__Output } from './generated/grpc/testing/SimpleRequest';
|
|
import { SimpleResponse } from './generated/grpc/testing/SimpleResponse';
|
|
import { ReflectionService } from '@grpc/reflection';
|
|
|
|
// Invoke the xDS package's registration hook before any server is created.
grpc_xds.register();

/** Options handed to proto-loader when loading the interop test service definition. */
const protoLoaderOptions: protoLoader.Options = {
  keepCase: true,
  defaults: true,
  oneofs: true,
  json: true,
  longs: String,
  enums: String,
  // Proto files are resolved relative to the `proto` directory two levels above this file.
  includeDirs: [`${__dirname}/../../proto`]
};

/** Package definition for `grpc/testing/test.proto`, loaded synchronously at module load. */
const packageDefinition = protoLoader.loadSync('grpc/testing/test.proto', protoLoaderOptions);

/** The loaded package definition, viewed through the generated `ProtoGrpcType` typings. */
const loadedProto = grpc.loadPackageDefinition(packageDefinition) as unknown as ProtoGrpcType;
|
|
|
|
/**
 * Promise-based wrapper around `setTimeout`.
 *
 * @param delayMs Delay, in milliseconds, passed straight to `setTimeout`.
 * @returns A promise that resolves (with no value) once the timer fires.
 */
function setAsyncTimeout(delayMs: number): Promise<void> {
  return new Promise<void>(done => {
    const onTimerFired = (): void => done();
    setTimeout(onTimerFired, delayMs);
  });
}
|
|
|
|
/** Host name of the machine running this server, read once at module load. */
const HOSTNAME: string = os.hostname();

/** Path prefix shared by every method of `grpc.testing.TestService`. */
const TEST_SERVICE_NAME: string = '/grpc.testing.TestService/';
|
|
|
|
/**
 * Parses the numeric suffix of an `rpc-behavior` entry.
 *
 * `Number('')` (and `Number('  ')`) evaluates to 0 rather than NaN, so an entry
 * such as `error-code-` would otherwise be accepted as the value 0. An empty or
 * whitespace-only suffix is therefore reported as NaN so callers reject it.
 *
 * @param text The text following the behavior prefix.
 * @returns The parsed number, or NaN when the text is empty or not numeric.
 */
function parseBehaviorNumber(text: string): number {
  return text.trim() === '' ? NaN : Number(text);
}

/**
 * Server interceptor for the test service.
 *
 * On incoming metadata it reads the `grpc-previous-rpc-attempts` header and the
 * comma-separated `rpc-behavior` header, then applies each behavior entry in order:
 *   - `hostname=<host> <behavior>`: apply `<behavior>` only when `<host>` equals
 *     this machine's host name; otherwise skip the entry.
 *   - `sleep-<seconds>`: wait that many seconds before continuing.
 *   - `keep-open`: stop processing without forwarding the metadata or sending a status.
 *   - `error-code-<code>`: end the call with that status code.
 *   - `succeed-on-retry-attempt-<n>`: forward the metadata immediately when the
 *     previous-attempt count equals `<n>`, skipping any remaining entries.
 * Malformed values end the call with INVALID_ARGUMENT. If no entry ends the
 * call, the metadata is forwarded unchanged.
 *
 * Outgoing response metadata always gets a `hostname` entry added.
 *
 * @param methodDescriptor Definition of the invoked method (not read here).
 * @param call The intercepting call to wrap.
 * @returns The wrapped intercepting call.
 */
function testInfoInterceptor(methodDescriptor: grpc.ServerMethodDefinition<any, any>, call: grpc.ServerInterceptingCallInterface): grpc.ServerInterceptingCall {
  // Ends the call with INVALID_ARGUMENT and the given explanation.
  const rejectInvalid = (details: string): void => {
    call.sendStatus({
      code: grpc.status.INVALID_ARGUMENT,
      details
    });
  };
  const listener: grpc.ServerListener = {
    onReceiveMetadata: async (metadata, next) => {
      let attemptNum = 0;
      const attemptNumHeader = metadata.get('grpc-previous-rpc-attempts');
      if (attemptNumHeader.length > 0) {
        attemptNum = Number(attemptNumHeader[0]);
        if (Number.isNaN(attemptNum)) {
          rejectInvalid('Invalid format for grpc-previous-rpc-attempts header: ' + attemptNumHeader[0]);
          return;
        }
      }
      // Multiple rpc-behavior header values are merged into one comma-separated list;
      // non-string (binary) values are ignored.
      const rpcBehavior = metadata.get('rpc-behavior').filter(v => typeof v === 'string').join(',');
      for (const value of rpcBehavior.split(',')) {
        let behaviorEntry: string;
        if (value.startsWith('hostname=')) {
          const splitValue = value.split(' ');
          if (splitValue.length > 1) {
            if (splitValue[0].substring('hostname='.length) !== HOSTNAME) {
              // Entry is addressed to a different host.
              continue;
            }
            behaviorEntry = splitValue[1];
          } else {
            rejectInvalid('Invalid format for rpc-behavior header: ' + value);
            return;
          }
        } else {
          behaviorEntry = value;
        }
        if (behaviorEntry.startsWith('sleep-')) {
          const delaySec = parseBehaviorNumber(behaviorEntry.substring('sleep-'.length));
          if (Number.isNaN(delaySec)) {
            rejectInvalid('Invalid format for rpc-behavior header: ' + value);
            return;
          }
          await setAsyncTimeout(delaySec * 1000);
        }
        if (behaviorEntry === 'keep-open') {
          // Deliberately neither forwards the metadata nor sends a status.
          return;
        }
        if (behaviorEntry.startsWith('error-code-')) {
          const errorCode = parseBehaviorNumber(behaviorEntry.substring('error-code-'.length));
          if (Number.isNaN(errorCode)) {
            rejectInvalid('Invalid format for rpc-behavior header: ' + value);
            return;
          }
          call.sendStatus({
            code: errorCode,
            details: 'RPC failed as directed by rpc-behavior header value ' + value
          });
          return;
        }
        if (behaviorEntry.startsWith('succeed-on-retry-attempt-')) {
          const targetAttempt = parseBehaviorNumber(behaviorEntry.substring('succeed-on-retry-attempt-'.length));
          if (Number.isNaN(targetAttempt)) {
            rejectInvalid('Invalid format for rpc-behavior header: ' + value);
            return;
          }
          if (attemptNum === targetAttempt) {
            // Let the call proceed and skip any remaining behavior entries.
            next(metadata);
            return;
          }
        }
      }
      next(metadata);
    }
  };
  const responder: grpc.Responder = {
    start: next => {
      next(listener);
    },
    sendMetadata: (metadata, next) => {
      metadata.add('hostname', HOSTNAME);
      next(metadata);
    }
  };
  return new grpc.ServerInterceptingCall(call, responder);
}
|
|
|
|
/**
 * Server interceptor that logs traffic for non-test-service methods.
 *
 * Every received request message and every sent response message is written
 * to the console as JSON together with the method path, then passed along
 * unchanged.
 *
 * @param methodDescriptor Definition of the invoked method; its `path` appears in the log lines.
 * @param call The intercepting call to wrap.
 * @returns The wrapped intercepting call.
 */
function adminServiceInterceptor(methodDescriptor: grpc.ServerMethodDefinition<any, any>, call: grpc.ServerInterceptingCallInterface): grpc.ServerInterceptingCall {
  const requestLogger: grpc.ServerListener = {
    onReceiveMessage(incoming, forward) {
      console.log(`Received request to method ${methodDescriptor.path}: ${JSON.stringify(incoming)}`);
      forward(incoming);
    }
  };
  const responseLogger: grpc.Responder = {
    start(forward) {
      forward(requestLogger);
    },
    sendMessage(outgoing, forward) {
      console.log(`Responded to request to method ${methodDescriptor.path}: ${JSON.stringify(outgoing)}`);
      forward(outgoing);
    }
  };
  return new grpc.ServerInterceptingCall(call, responseLogger);
}
|
|
|
|
/**
 * Interceptor for a server that hosts both the test service and the other services.
 *
 * Methods whose path starts with `TEST_SERVICE_NAME` are handled by
 * `testInfoInterceptor`; every other method is handled by
 * `adminServiceInterceptor`.
 *
 * @param methodDescriptor Definition of the invoked method; its `path` selects the interceptor.
 * @param call The intercepting call to wrap.
 * @returns The intercepting call produced by the selected interceptor.
 */
function unifiedInterceptor(methodDescriptor: grpc.ServerMethodDefinition<any, any>, call: grpc.ServerInterceptingCallInterface): grpc.ServerInterceptingCall {
  const isTestServiceMethod = methodDescriptor.path.startsWith(TEST_SERVICE_NAME);
  const selectedInterceptor = isTestServiceMethod ? testInfoInterceptor : adminServiceInterceptor;
  return selectedInterceptor(methodDescriptor, call);
}
|
|
|
|
/** Handlers for the two TestService methods served by this process. */
const testServiceHandler = {
  /** Replies with an empty message. */
  EmptyCall: (_call: grpc.ServerUnaryCall<Empty__Output, Empty>, callback: grpc.sendUnaryData<Empty>) => {
    const emptyReply: Empty = {};
    callback(null, emptyReply);
  },
  /**
   * Replies with this machine's host name and a payload body made of
   * `response_size` ASCII '0' characters.
   */
  UnaryCall: (call: grpc.ServerUnaryCall<SimpleRequest__Output, SimpleResponse>, callback: grpc.sendUnaryData<SimpleResponse>) => {
    const bodyText = '0'.repeat(call.request.response_size);
    const reply: SimpleResponse = {
      hostname: HOSTNAME,
      payload: {
        body: Buffer.from(bodyText)
      }
    };
    callback(null, reply);
  }
};
|
|
|
|
/**
 * Promise-based wrapper around `server.bindAsync`.
 *
 * @param server The server to bind.
 * @param port The address string handed to `bindAsync` (e.g. `[::]:8080`).
 * @param credentials Server credentials to bind with.
 * @returns A promise resolving to the port number reported by `bindAsync`,
 *     or rejecting with the error it reports.
 */
function serverBindPromise(server: grpc.Server, port: string, credentials: grpc.ServerCredentials): Promise<number> {
  return new Promise<number>((onBound, onFailed) => {
    server.bindAsync(port, credentials, (bindError, boundPort) => {
      if (!bindError) {
        onBound(boundPort);
        return;
      }
      onFailed(bindError);
    });
  });
}
|
|
|
|
/**
 * Finds an IPv4 address of this machine that is not a loopback address.
 *
 * Interfaces are scanned in the order `os.networkInterfaces()` reports them.
 * The interface named `lo` is skipped, and so is any address flagged
 * `internal`, so loopback interfaces with other names (e.g. `lo0`) are
 * excluded too.
 *
 * @returns The first matching IPv4 address, or `null` if there is none.
 */
function getIPv4Address(): string | null {
  for (const [name, addressList] of Object.entries(os.networkInterfaces())) {
    if (name === 'lo' || !addressList) {
      continue;
    }
    const candidate = addressList.find(address => address.family === 'IPv4' && !address.internal);
    if (candidate) {
      return candidate.address;
    }
  }
  return null;
}
|
|
|
|
/**
 * Collects the IPv6 addresses of this machine that are not loopback addresses.
 *
 * The interface named `lo` is skipped, and so is any address flagged
 * `internal`, so loopback interfaces with other names (e.g. `lo0`) are
 * excluded too. Addresses are returned exactly as `os.networkInterfaces()`
 * reports them, in the same order.
 *
 * @returns The matching IPv6 addresses; empty if there are none.
 */
function getIPv6Addresses(): string[] {
  const ipv6Addresses: string[] = [];
  for (const [name, addressList] of Object.entries(os.networkInterfaces())) {
    if (name === 'lo' || !addressList) {
      continue;
    }
    for (const address of addressList) {
      if (address.family === 'IPv6' && !address.internal) {
        ipv6Addresses.push(address.address);
      }
    }
  }
  return ipv6Addresses;
}
|
|
|
|
/**
 * Starts the xDS interop test server.
 *
 * Command-line flags (all parsed as strings):
 *   --port              required; port for the TestService listener.
 *   --maintenance_port  port for the maintenance server; required in secure mode,
 *                       unused otherwise.
 *   --address_type      IPV4_IPV6 (default), IPV4 or IPV6; compared case-insensitively.
 *   --secure_mode       'true' (case-insensitive) enables secure mode; default 'false'.
 *
 * Secure mode creates two servers: a maintenance server (health update, health,
 * reflection and admin services) bound to `[::]:<maintenance_port>` with
 * insecure credentials, and an `XdsServer` hosting TestService bound to
 * `0.0.0.0:<port>` with xDS credentials that fall back to insecure.
 *
 * Otherwise a single insecure server hosts all services and is bound according
 * to the address type.
 *
 * The health status for service '' starts as NOT_SERVING and is switched to
 * SERVING once all binds have completed.
 *
 * @throws Error if secure mode is combined with an address type other than
 *     IPV4_IPV6, if secure mode is requested without a maintenance port, or if
 *     the address type is unknown. Bind failures reject the returned promise,
 *     except for the per-address IPv6 binds, which are logged and skipped.
 */
async function main(): Promise<void> {
  const argv = yargs
    .string(['port', 'maintenance_port', 'address_type', 'secure_mode'])
    .demandOption(['port'])
    .default('address_type', 'IPV4_IPV6')
    .default('secure_mode', 'false')
    .parse();
  console.log('Starting xDS interop server. Args: ', argv);
  const healthImpl = new HealthImplementation({'': 'NOT_SERVING'});
  const xdsUpdateHealthServiceImpl = {
    SetServing(call: grpc.ServerUnaryCall<Empty, Empty__Output>, callback: grpc.sendUnaryData<Empty__Output>) {
      healthImpl.setStatus('', 'SERVING');
      callback(null, {});
    },
    SetNotServing(call: grpc.ServerUnaryCall<Empty, Empty__Output>, callback: grpc.sendUnaryData<Empty__Output>) {
      healthImpl.setStatus('', 'NOT_SERVING');
      callback(null, {});
    }
  };
  const reflection = new ReflectionService(packageDefinition, {
    services: ['grpc.testing.TestService']
  });
  const addressType = argv.address_type.toUpperCase();
  const secureMode = argv.secure_mode.toLowerCase() === 'true';
  if (secureMode) {
    if (addressType !== 'IPV4_IPV6') {
      throw new Error('Secure mode only supports IPV4_IPV6 address type');
    }
    // Without this check the maintenance server would try to bind "[::]:undefined".
    if (!argv.maintenance_port) {
      throw new Error('Secure mode requires the maintenance_port option');
    }
    const maintenanceServer = new grpc.Server({interceptors: [adminServiceInterceptor]});
    maintenanceServer.addService(loadedProto.grpc.testing.XdsUpdateHealthService.service, xdsUpdateHealthServiceImpl);
    healthImpl.addToServer(maintenanceServer);
    reflection.addToServer(maintenanceServer);
    grpc.addAdminServicesToServer(maintenanceServer);

    const server = new grpc_xds.XdsServer({interceptors: [testInfoInterceptor]});
    server.addService(loadedProto.grpc.testing.TestService.service, testServiceHandler);
    const xdsCreds = new grpc_xds.XdsServerCredentials(grpc.ServerCredentials.createInsecure());
    await Promise.all([
      serverBindPromise(maintenanceServer, `[::]:${argv.maintenance_port}`, grpc.ServerCredentials.createInsecure()),
      serverBindPromise(server, `0.0.0.0:${argv.port}`, xdsCreds)
    ]);
  } else {
    const server = new grpc.Server({interceptors: [unifiedInterceptor]});
    server.addService(loadedProto.grpc.testing.XdsUpdateHealthService.service, xdsUpdateHealthServiceImpl);
    healthImpl.addToServer(server);
    reflection.addToServer(server);
    grpc.addAdminServicesToServer(server);
    server.addService(loadedProto.grpc.testing.TestService.service, testServiceHandler);
    const creds = grpc.ServerCredentials.createInsecure();
    switch (addressType) {
      case 'IPV4_IPV6':
        await serverBindPromise(server, `[::]:${argv.port}`, creds);
        break;
      case 'IPV4': {
        // Bind loopback first, then one additional IPv4 address if one is found.
        await serverBindPromise(server, `127.0.0.1:${argv.port}`, creds);
        const address = getIPv4Address();
        if (address) {
          await serverBindPromise(server, `${address}:${argv.port}`, creds);
        }
        break;
      }
      case 'IPV6': {
        // Bind loopback first, then every other IPv6 address on a best-effort
        // basis: a failure on one address is logged and does not abort startup.
        await serverBindPromise(server, `[::1]:${argv.port}`, creds);
        for (const address of getIPv6Addresses()) {
          try {
            await serverBindPromise(server, `[${address}]:${argv.port}`, creds);
          } catch (e: unknown) {
            const reason = e instanceof Error ? e.message : String(e);
            console.log(`Binding ${address} failed with error ${reason}`);
          }
        }
        break;
      }
      default:
        throw new Error(`Unknown address type: ${argv.address_type}`);
    }
  }
  healthImpl.setStatus('', 'SERVING');
}
|
|
|
|
// Run the server only when this file is executed directly, not when it is imported.
if (require.main === module) {
  // main() is async: report a startup failure explicitly and exit non-zero
  // instead of leaving a floating promise with an unhandled rejection.
  main().catch((error: unknown) => {
    console.error('xDS interop server failed to start:', error);
    process.exit(1);
  });
}
|