mirror of https://github.com/grpc/grpc-node.git
grpc-js: Add Server#createConnectionInjector API
This commit is contained in:
parent
0ba7d70fb9
commit
321b6603b0
|
@ -74,6 +74,7 @@ import { ServerInterceptingCallInterface, ServerInterceptor, getServerIntercepti
|
|||
import { PartialStatusObject } from './call-interface';
|
||||
import { CallEventTracker } from './transport';
|
||||
import { Socket } from 'net';
|
||||
import { Duplex } from 'stream';
|
||||
|
||||
const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31);
|
||||
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
|
||||
|
@ -225,6 +226,12 @@ export interface ServerOptions extends ChannelOptions {
|
|||
interceptors?: ServerInterceptor[]
|
||||
}
|
||||
|
||||
export interface ConnectionInjector {
|
||||
injectConnection(connection: Duplex): void;
|
||||
drain(graceTimeMs: number): void;
|
||||
destroy(): void;
|
||||
}
|
||||
|
||||
export class Server {
|
||||
private boundPorts: Map<string, BoundPort>= new Map();
|
||||
private http2Servers: Map<AnyHttp2Server, Http2ServerInfo> = new Map();
|
||||
|
@ -808,6 +815,70 @@ export class Server {
|
|||
}
|
||||
}
|
||||
|
||||
private registerInjectorToChannelz() {
|
||||
return registerChannelzSocket(
|
||||
'injector',
|
||||
() => {
|
||||
return {
|
||||
localAddress: null,
|
||||
remoteAddress: null,
|
||||
security: null,
|
||||
remoteName: null,
|
||||
streamsStarted: 0,
|
||||
streamsSucceeded: 0,
|
||||
streamsFailed: 0,
|
||||
messagesSent: 0,
|
||||
messagesReceived: 0,
|
||||
keepAlivesSent: 0,
|
||||
lastLocalStreamCreatedTimestamp: null,
|
||||
lastRemoteStreamCreatedTimestamp: null,
|
||||
lastMessageSentTimestamp: null,
|
||||
lastMessageReceivedTimestamp: null,
|
||||
localFlowControlWindow: null,
|
||||
remoteFlowControlWindow: null,
|
||||
};
|
||||
},
|
||||
this.channelzEnabled
|
||||
);
|
||||
}
|
||||
|
||||
createConnectionInjector(credentials: ServerCredentials): ConnectionInjector {
|
||||
if (credentials === null || !(credentials instanceof ServerCredentials)) {
|
||||
throw new TypeError('creds must be a ServerCredentials object');
|
||||
}
|
||||
const server = this.createHttp2Server(credentials);
|
||||
const channelzRef = this.registerInjectorToChannelz();
|
||||
if (this.channelzEnabled) {
|
||||
this.listenerChildrenTracker.refChild(channelzRef);
|
||||
}
|
||||
const sessionsSet: Set<http2.ServerHttp2Session> = new Set();
|
||||
this.http2Servers.set(server, {
|
||||
channelzRef: channelzRef,
|
||||
sessions: sessionsSet
|
||||
});
|
||||
return {
|
||||
injectConnection: (connection: Duplex) => {
|
||||
server.emit('connection', connection);
|
||||
},
|
||||
drain: (graceTimeMs: number) => {
|
||||
for (const session of sessionsSet) {
|
||||
this.closeSession(session);
|
||||
}
|
||||
setTimeout(() => {
|
||||
for (const session of sessionsSet) {
|
||||
session.destroy(http2.constants.NGHTTP2_CANCEL as any);
|
||||
}
|
||||
}, graceTimeMs).unref?.();
|
||||
},
|
||||
destroy: () => {
|
||||
this.closeServer(server)
|
||||
for (const session of sessionsSet) {
|
||||
this.closeSession(session);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private closeServer(server: AnyHttp2Server, callback?: () => void) {
|
||||
this.trace('Closing server with address ' + JSON.stringify(server.address()));
|
||||
const serverInfo = this.http2Servers.get(server);
|
||||
|
|
|
@ -21,6 +21,7 @@ import * as assert from 'assert';
|
|||
import * as fs from 'fs';
|
||||
import * as http2 from 'http2';
|
||||
import * as path from 'path';
|
||||
import * as net from 'net';
|
||||
import * as protoLoader from '@grpc/proto-loader';
|
||||
|
||||
import * as grpc from '../src';
|
||||
|
@ -905,6 +906,72 @@ describe('Echo service', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('Connection injector', () => {
|
||||
let tcpServer: net.Server;
|
||||
let server: Server;
|
||||
let client: ServiceClient;
|
||||
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
|
||||
const echoService = loadProtoFile(protoFile)
|
||||
.EchoService as ServiceClientConstructor;
|
||||
|
||||
const serviceImplementation = {
|
||||
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
|
||||
callback(null, call.request);
|
||||
},
|
||||
echoBidiStream(call: ServerDuplexStream<any, any>) {
|
||||
call.on('data', data => {
|
||||
call.write(data);
|
||||
});
|
||||
call.on('end', () => {
|
||||
call.end();
|
||||
});
|
||||
},
|
||||
};
|
||||
|
||||
before(done => {
|
||||
server = new Server();
|
||||
const creds = ServerCredentials.createSsl(
|
||||
null,
|
||||
[{ private_key: key, cert_chain: cert }]
|
||||
);
|
||||
const connectionInjector = server.createConnectionInjector(creds);
|
||||
tcpServer = net.createServer(socket => {
|
||||
connectionInjector.injectConnection(socket);
|
||||
});
|
||||
server.addService(echoService.service, serviceImplementation);
|
||||
tcpServer.listen(0, 'localhost', () => {
|
||||
const port = (tcpServer.address() as net.AddressInfo).port;
|
||||
client = new echoService(
|
||||
`localhost:${port}`,
|
||||
grpc.credentials.createSsl(ca),
|
||||
{
|
||||
'grpc.ssl_target_name_override': 'foo.test.google.fr',
|
||||
'grpc.default_authority': 'foo.test.google.fr'
|
||||
}
|
||||
);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
after(done => {
|
||||
client.close();
|
||||
tcpServer.close();
|
||||
server.tryShutdown(done);
|
||||
});
|
||||
|
||||
it('should respond to a request', done => {
|
||||
client.echo(
|
||||
{ value: 'test value', value2: 3 },
|
||||
(error: ServiceError, response: any) => {
|
||||
assert.ifError(error);
|
||||
assert.deepStrictEqual(response, { value: 'test value', value2: 3 });
|
||||
done();
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
})
|
||||
|
||||
describe('Generic client and server', () => {
|
||||
function toString(val: any) {
|
||||
return val.toString();
|
||||
|
|
Loading…
Reference in New Issue