grpc-js: define Server API contract

This commit defines the Server API contract, and implements
the Server functionality, minus the actual handling of requests.
This commit is contained in:
cjihrig 2019-04-06 00:59:48 -04:00
parent 167625719d
commit 62e7f0c85a
No known key found for this signature in database
GPG Key ID: 7434390BDBE9B9C5
6 changed files with 676 additions and 0 deletions

View File

@ -15,6 +15,7 @@
"types": "build/src/index.d.ts",
"license": "Apache-2.0",
"devDependencies": {
"@grpc/proto-loader": "^0.4.0",
"@types/lodash": "^4.14.108",
"@types/mocha": "^5.2.6",
"@types/node": "^11.13.2",

View File

@ -0,0 +1,148 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import {EventEmitter} from 'events';
import {Duplex, Readable, Writable} from 'stream';
import {ServiceError} from './call';
import {Deserialize, Serialize} from './make-client';
import {Metadata} from './metadata';
export class ServerUnaryCall<RequestType> extends EventEmitter {
cancelled: boolean;
request: RequestType|null;
constructor(private call: ServerCall, public metadata: Metadata) {
super();
this.cancelled = false;
this.request = null; // TODO(cjihrig): Read the unary request here.
}
getPeer(): string {
throw new Error('not implemented yet');
}
sendMetadata(responseMetadata: Metadata): void {
throw new Error('not implemented yet');
}
}
export class ServerReadableStream<RequestType> extends Readable {
cancelled: boolean;
constructor(
private call: ServerCall, public metadata: Metadata,
private deserialize: Deserialize<RequestType>) {
super();
this.cancelled = false;
}
getPeer(): string {
throw new Error('not implemented yet');
}
sendMetadata(responseMetadata: Metadata): void {
throw new Error('not implemented yet');
}
}
export class ServerWritableStream<RequestType, ResponseType> extends Writable {
cancelled: boolean;
request: RequestType|null;
constructor(
private call: ServerCall, public metadata: Metadata,
private serialize: Serialize<ResponseType>) {
super();
this.cancelled = false;
this.request = null; // TODO(cjihrig): Read the unary request here.
}
getPeer(): string {
throw new Error('not implemented yet');
}
sendMetadata(responseMetadata: Metadata): void {
throw new Error('not implemented yet');
}
}
export class ServerDuplexStream<RequestType, ResponseType> extends Duplex {
cancelled: boolean;
constructor(
private call: ServerCall, public metadata: Metadata,
private serialize: Serialize<ResponseType>,
private deserialize: Deserialize<RequestType>) {
super();
this.cancelled = false;
}
getPeer(): string {
throw new Error('not implemented yet');
}
sendMetadata(responseMetadata: Metadata): void {
throw new Error('not implemented yet');
}
}
// Internal class that wraps the HTTP2 request.
export class ServerCall {}
// Unary response callback signature.
export type sendUnaryData<ResponseType> =
(error: ServiceError|null, value: ResponseType|null, trailer?: Metadata,
flags?: number) => void;
// User provided handler for unary calls.
export type handleUnaryCall<RequestType, ResponseType> =
(call: ServerUnaryCall<RequestType>,
callback: sendUnaryData<ResponseType>) => void;
// User provided handler for client streaming calls.
export type handleClientStreamingCall<RequestType, ResponseType> =
(call: ServerReadableStream<RequestType>,
callback: sendUnaryData<ResponseType>) => void;
// User provided handler for server streaming calls.
export type handleServerStreamingCall<RequestType, ResponseType> =
(call: ServerWritableStream<RequestType, ResponseType>) => void;
// User provided handler for bidirectional streaming calls.
export type handleBidiStreamingCall<RequestType, ResponseType> =
(call: ServerDuplexStream<RequestType, ResponseType>) => void;
export type HandleCall<RequestType, ResponseType> =
handleUnaryCall<RequestType, ResponseType>|
handleClientStreamingCall<RequestType, ResponseType>|
handleServerStreamingCall<RequestType, ResponseType>|
handleBidiStreamingCall<RequestType, ResponseType>;
export type Handler<RequestType, ResponseType> = {
func: HandleCall<RequestType, ResponseType>;
serialize: Serialize<ResponseType>;
deserialize: Deserialize<RequestType>;
type: HandlerType;
};
export type HandlerType = 'bidi'|'clientStream'|'serverStream'|'unary';

View File

@ -0,0 +1,230 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as http2 from 'http2';
import {AddressInfo, ListenOptions} from 'net';
import {URL} from 'url';
import {ServiceError} from './call';
import {Status} from './constants';
import {Deserialize, Serialize, ServiceDefinition} from './make-client';
import {HandleCall, Handler, HandlerType, sendUnaryData, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServerWritableStream} from './server-call';
import {ServerCredentials} from './server-credentials';
function noop(): void {}
type PartialServiceError = Partial<ServiceError>;
const unimplementedStatusResponse: PartialServiceError = {
code: Status.UNIMPLEMENTED,
details: 'The server does not implement this method',
};
// tslint:disable:no-any
type UntypedHandleCall = HandleCall<any, any>;
type UntypedHandler = Handler<any, any>;
type UntypedServiceImplementation = {
[name: string]: UntypedHandleCall
};
const defaultHandler = {
unary(call: ServerUnaryCall<any>, callback: sendUnaryData<any>): void {
callback(unimplementedStatusResponse as ServiceError, null);
},
clientStream(call: ServerReadableStream<any>, callback: sendUnaryData<any>):
void {
callback(unimplementedStatusResponse as ServiceError, null);
},
serverStream(call: ServerWritableStream<any, any>): void {
call.emit('error', unimplementedStatusResponse);
},
bidi(call: ServerDuplexStream<any, any>): void {
call.emit('error', unimplementedStatusResponse);
}
};
// tslint:enable:no-any
export class Server {
private http2Server: http2.Http2Server|http2.Http2SecureServer|null = null;
private handlers: Map<string, UntypedHandler> =
new Map<string, UntypedHandler>();
private started = false;
constructor(options?: object) {}
addProtoService(): void {
throw new Error('Not implemented. Use addService() instead');
}
addService(service: ServiceDefinition, implementation: object): void {
if (this.started === true) {
throw new Error('Can\'t add a service to a started server.');
}
if (service === null || typeof service !== 'object' ||
implementation === null || typeof implementation !== 'object') {
throw new Error('addService() requires two objects as arguments');
}
const serviceKeys = Object.keys(service);
if (serviceKeys.length === 0) {
throw new Error('Cannot add an empty service to a server');
}
const implMap: UntypedServiceImplementation =
implementation as UntypedServiceImplementation;
serviceKeys.forEach((name) => {
const attrs = service[name];
let methodType: HandlerType;
if (attrs.requestStream) {
if (attrs.responseStream) {
methodType = 'bidi';
} else {
methodType = 'clientStream';
}
} else {
if (attrs.responseStream) {
methodType = 'serverStream';
} else {
methodType = 'unary';
}
}
let implFn = implMap[name];
let impl;
if (implFn === undefined && typeof attrs.originalName === 'string') {
implFn = implMap[attrs.originalName];
}
if (implFn !== undefined) {
impl = implFn.bind(implementation);
} else {
impl = defaultHandler[methodType];
}
const success = this.register(
attrs.path, impl, attrs.responseSerialize, attrs.requestDeserialize,
methodType);
if (success === false) {
throw new Error(`Method handler for ${attrs.path} already provided.`);
}
});
}
bind(port: string, creds: ServerCredentials): void {
throw new Error('Not implemented. Use bindAsync() instead');
}
bindAsync(
port: string, creds: ServerCredentials,
callback: (error: Error|null, port: number) => void): void {
if (this.started === true) {
throw new Error('server is already started');
}
if (typeof port !== 'string') {
throw new TypeError('port must be a string');
}
if (creds === null || typeof creds !== 'object') {
throw new TypeError('creds must be an object');
}
if (typeof callback !== 'function') {
throw new TypeError('callback must be a function');
}
const url = new URL(`http://${port}`);
const options: ListenOptions = {host: url.hostname, port: +url.port};
if (creds._isSecure()) {
this.http2Server = http2.createSecureServer(
creds._getSettings() as http2.SecureServerOptions);
} else {
this.http2Server = http2.createServer();
}
// TODO(cjihrig): Set up the handlers, to allow requests to be processed.
function onError(err: Error): void {
callback(err, -1);
}
this.http2Server.once('error', onError);
this.http2Server.listen(options, () => {
const server =
this.http2Server as http2.Http2Server | http2.Http2SecureServer;
const port = (server.address() as AddressInfo).port;
server.removeListener('error', onError);
callback(null, port);
});
}
forceShutdown(): void {
throw new Error('Not yet implemented');
}
register<RequestType, ResponseType>(
name: string, handler: HandleCall<RequestType, ResponseType>,
serialize: Serialize<ResponseType>, deserialize: Deserialize<RequestType>,
type: string): boolean {
if (this.handlers.has(name)) {
return false;
}
this.handlers.set(
name,
{func: handler, serialize, deserialize, type: type as HandlerType});
return true;
}
start(): void {
if (this.http2Server === null || this.http2Server.listening !== true) {
throw new Error('server must be bound in order to start');
}
if (this.started === true) {
throw new Error('server is already started');
}
this.started = true;
}
tryShutdown(callback: (error?: Error) => void): void {
callback = typeof callback === 'function' ? callback : noop;
if (this.http2Server === null) {
callback(new Error('server is not running'));
return;
}
this.http2Server.close((err?: Error) => {
this.started = false;
callback(err);
});
}
addHttp2Port(): void {
throw new Error('Not yet implemented');
}
}

View File

@ -15,8 +15,19 @@
*
*/
import * as loader from '@grpc/proto-loader';
import * as assert from 'assert';
import {GrpcObject, loadPackageDefinition} from '../src/make-client';
const protoLoaderOptions = {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
};
export function mockFunction(): never {
throw new Error('Not implemented');
}
@ -100,3 +111,8 @@ export namespace assert2 {
}
}
}
export function loadProtoFile(file: string): GrpcObject {
const packageDefinition = loader.loadSync(file, protoLoaderOptions);
return loadPackageDefinition(packageDefinition);
}

View File

@ -0,0 +1,50 @@
syntax = "proto3";
package math;
message DivArgs {
int64 dividend = 1;
int64 divisor = 2;
}
message DivReply {
int64 quotient = 1;
int64 remainder = 2;
}
message FibArgs {
int64 limit = 1;
}
message Num {
int64 num = 1;
}
message FibReply {
int64 count = 1;
}
service Math {
// Div divides DivArgs.dividend by DivArgs.divisor and returns the quotient
// and remainder.
rpc Div (DivArgs) returns (DivReply) {
}
// DivMany accepts an arbitrary number of division args from the client stream
// and sends back the results in the reply stream. The stream continues until
// the client closes its end; the server does the same after sending all the
// replies. The stream ends immediately if either end aborts.
rpc DivMany (stream DivArgs) returns (stream DivReply) {
}
// Fib generates numbers in the Fibonacci sequence. If FibArgs.limit > 0, Fib
// generates up to limit numbers; otherwise it continues until the call is
// canceled. Unlike Fib above, Fib has no final FibReply.
rpc Fib (FibArgs) returns (stream Num) {
}
// Sum sums a stream of numbers, returning the final result once the stream
// is closed.
rpc Sum (stream Num) returns (Num) {
}
}

View File

@ -0,0 +1,231 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Allow `any` data type for testing runtime type checking.
// tslint:disable no-any
import * as assert from 'assert';
import * as fs from 'fs';
import * as path from 'path';
import {ServerCredentials} from '../src';
import {Server} from '../src/server';
import {loadProtoFile} from './common';
const ca = fs.readFileSync(path.join(__dirname, 'fixtures', 'ca.pem'));
const key = fs.readFileSync(path.join(__dirname, 'fixtures', 'server1.key'));
const cert = fs.readFileSync(path.join(__dirname, 'fixtures', 'server1.pem'));
function noop(): void {}
describe('Server', () => {
describe('constructor', () => {
it('should work with no arguments', () => {
assert.doesNotThrow(() => {
new Server(); // tslint:disable-line:no-unused-expression
});
});
it('should work with an empty object argument', () => {
assert.doesNotThrow(() => {
new Server({}); // tslint:disable-line:no-unused-expression
});
});
it('should be an instance of Server', () => {
const server = new Server();
assert(server instanceof Server);
});
});
describe('bindAsync', () => {
it('binds with insecure credentials', (done) => {
const server = new Server();
server.bindAsync(
'localhost:0', ServerCredentials.createInsecure(), (err, port) => {
assert.ifError(err);
assert(typeof port === 'number' && port > 0);
server.tryShutdown(done);
});
});
it('binds with secure credentials', (done) => {
const server = new Server();
const creds = ServerCredentials.createSsl(
ca, [{private_key: key, cert_chain: cert}], true);
server.bindAsync('localhost:0', creds, (err, port) => {
assert.ifError(err);
assert(typeof port === 'number' && port > 0);
server.tryShutdown(done);
});
});
it('throws if bind is called after the server is started', () => {
const server = new Server();
server.bindAsync(
'localhost:0', ServerCredentials.createInsecure(), (err, port) => {
assert.ifError(err);
server.start();
assert.throws(() => {
server.bindAsync(
'localhost:0', ServerCredentials.createInsecure(), noop);
}, /server is already started/);
});
});
it('throws on invalid inputs', () => {
const server = new Server();
assert.throws(() => {
server.bindAsync(null as any, ServerCredentials.createInsecure(), noop);
}, /port must be a string/);
assert.throws(() => {
server.bindAsync('localhost:0', null as any, noop);
}, /creds must be an object/);
assert.throws(() => {
server.bindAsync(
'localhost:0', ServerCredentials.createInsecure(), null as any);
}, /callback must be a function/);
});
});
describe('tryShutdown', () => {
it('calls back with an error if the server is not running', (done) => {
const server = new Server();
server.tryShutdown((err) => {
assert(err !== undefined && err.message === 'server is not running');
done();
});
});
});
describe('start', () => {
let server: Server;
beforeEach((done) => {
server = new Server();
server.bindAsync('localhost:0', ServerCredentials.createInsecure(), done);
});
afterEach((done) => {
server.tryShutdown(done);
});
it('starts without error', () => {
assert.doesNotThrow(() => {
server.start();
});
});
it('throws if started twice', () => {
server.start();
assert.throws(() => {
server.start();
}, /server is already started/);
});
it('throws if the server is not bound', () => {
const server = new Server();
assert.throws(() => {
server.start();
}, /server must be bound in order to start/);
});
});
describe('addService', () => {
const mathProtoFile = path.join(__dirname, 'fixtures', 'math.proto');
const mathClient = (loadProtoFile(mathProtoFile).math as any).Math;
const mathServiceAttrs = mathClient.service;
const dummyImpls = {div() {}, divMany() {}, fib() {}, sum() {}};
const altDummyImpls = {Div() {}, DivMany() {}, Fib() {}, Sum() {}};
it('succeeds with a single service', () => {
const server = new Server();
assert.doesNotThrow(() => {
server.addService(mathServiceAttrs, dummyImpls);
});
});
it('fails to add an empty service', () => {
const server = new Server();
assert.throws(() => {
server.addService({}, dummyImpls);
}, /Cannot add an empty service to a server/);
});
it('fails with conflicting method names', () => {
const server = new Server();
server.addService(mathServiceAttrs, dummyImpls);
assert.throws(() => {
server.addService(mathServiceAttrs, dummyImpls);
}, /Method handler for .+ already provided/);
});
it('supports method names as originally written', () => {
const server = new Server();
assert.doesNotThrow(() => {
server.addService(mathServiceAttrs, altDummyImpls);
});
});
it('fails if the server has been started', (done) => {
const server = new Server();
server.bindAsync(
'localhost:0', ServerCredentials.createInsecure(), (err, port) => {
assert.ifError(err);
server.start();
assert.throws(() => {
server.addService(mathServiceAttrs, dummyImpls);
}, /Can't add a service to a started server\./);
server.tryShutdown(done);
});
});
});
it('throws when unimplemented methods are called', () => {
const server = new Server();
assert.throws(() => {
server.addProtoService();
}, /Not implemented. Use addService\(\) instead/);
assert.throws(() => {
server.forceShutdown();
}, /Not yet implemented/);
assert.throws(() => {
server.addHttp2Port();
}, /Not yet implemented/);
assert.throws(() => {
server.bind('localhost:0', ServerCredentials.createInsecure());
}, /Not implemented. Use bindAsync\(\) instead/);
});
});