diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index e6e3195e..78e33f27 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -15,6 +15,7 @@ "types": "build/src/index.d.ts", "license": "Apache-2.0", "devDependencies": { + "@grpc/proto-loader": "^0.4.0", "@types/lodash": "^4.14.108", "@types/mocha": "^5.2.6", "@types/node": "^11.13.2", diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts new file mode 100644 index 00000000..57c3d20d --- /dev/null +++ b/packages/grpc-js/src/server-call.ts @@ -0,0 +1,148 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import {EventEmitter} from 'events'; +import {Duplex, Readable, Writable} from 'stream'; +import {ServiceError} from './call'; +import {Deserialize, Serialize} from './make-client'; +import {Metadata} from './metadata'; + + +export class ServerUnaryCall extends EventEmitter { + cancelled: boolean; + request: RequestType|null; + + constructor(private call: ServerCall, public metadata: Metadata) { + super(); + this.cancelled = false; + this.request = null; // TODO(cjihrig): Read the unary request here. + } + + getPeer(): string { + throw new Error('not implemented yet'); + } + + sendMetadata(responseMetadata: Metadata): void { + throw new Error('not implemented yet'); + } +} + + +export class ServerReadableStream extends Readable { + cancelled: boolean; + + constructor( + private call: ServerCall, public metadata: Metadata, + private deserialize: Deserialize) { + super(); + this.cancelled = false; + } + + getPeer(): string { + throw new Error('not implemented yet'); + } + + sendMetadata(responseMetadata: Metadata): void { + throw new Error('not implemented yet'); + } +} + + +export class ServerWritableStream extends Writable { + cancelled: boolean; + request: RequestType|null; + + constructor( + private call: ServerCall, public metadata: Metadata, + private serialize: Serialize) { + super(); + this.cancelled = false; + this.request = null; // TODO(cjihrig): Read the unary request here. + } + + getPeer(): string { + throw new Error('not implemented yet'); + } + + sendMetadata(responseMetadata: Metadata): void { + throw new Error('not implemented yet'); + } +} + + +export class ServerDuplexStream extends Duplex { + cancelled: boolean; + + constructor( + private call: ServerCall, public metadata: Metadata, + private serialize: Serialize, + private deserialize: Deserialize) { + super(); + this.cancelled = false; + } + + getPeer(): string { + throw new Error('not implemented yet'); + } + + sendMetadata(responseMetadata: Metadata): void { + throw new Error('not implemented yet'); + } +} + + +// Internal class that wraps the HTTP2 request. +export class ServerCall {} + + +// Unary response callback signature. +export type sendUnaryData = + (error: ServiceError|null, value: ResponseType|null, trailer?: Metadata, + flags?: number) => void; + +// User provided handler for unary calls. +export type handleUnaryCall = + (call: ServerUnaryCall, + callback: sendUnaryData) => void; + +// User provided handler for client streaming calls. +export type handleClientStreamingCall = + (call: ServerReadableStream, + callback: sendUnaryData) => void; + +// User provided handler for server streaming calls. +export type handleServerStreamingCall = + (call: ServerWritableStream) => void; + +// User provided handler for bidirectional streaming calls. +export type handleBidiStreamingCall = + (call: ServerDuplexStream) => void; + +export type HandleCall = + handleUnaryCall| + handleClientStreamingCall| + handleServerStreamingCall| + handleBidiStreamingCall; + +export type Handler = { + func: HandleCall; + serialize: Serialize; + deserialize: Deserialize; + type: HandlerType; +}; + +export type HandlerType = 'bidi'|'clientStream'|'serverStream'|'unary'; diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts new file mode 100644 index 00000000..decdbe5a --- /dev/null +++ b/packages/grpc-js/src/server.ts @@ -0,0 +1,230 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as http2 from 'http2'; +import {AddressInfo, ListenOptions} from 'net'; +import {URL} from 'url'; + +import {ServiceError} from './call'; +import {Status} from './constants'; +import {Deserialize, Serialize, ServiceDefinition} from './make-client'; +import {HandleCall, Handler, HandlerType, sendUnaryData, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServerWritableStream} from './server-call'; +import {ServerCredentials} from './server-credentials'; + +function noop(): void {} + +type PartialServiceError = Partial; +const unimplementedStatusResponse: PartialServiceError = { + code: Status.UNIMPLEMENTED, + details: 'The server does not implement this method', +}; + +// tslint:disable:no-any +type UntypedHandleCall = HandleCall; +type UntypedHandler = Handler; +type UntypedServiceImplementation = { + [name: string]: UntypedHandleCall +}; + +const defaultHandler = { + unary(call: ServerUnaryCall, callback: sendUnaryData): void { + callback(unimplementedStatusResponse as ServiceError, null); + }, + clientStream(call: ServerReadableStream, callback: sendUnaryData): + void { + callback(unimplementedStatusResponse as ServiceError, null); + }, + serverStream(call: ServerWritableStream): void { + call.emit('error', unimplementedStatusResponse); + }, + bidi(call: ServerDuplexStream): void { + call.emit('error', unimplementedStatusResponse); + } +}; +// tslint:enable:no-any + +export class Server { + private http2Server: http2.Http2Server|http2.Http2SecureServer|null = null; + private handlers: Map = + new Map(); + private started = false; + + constructor(options?: object) {} + + addProtoService(): void { + throw new Error('Not implemented. Use addService() instead'); + } + + addService(service: ServiceDefinition, implementation: object): void { + if (this.started === true) { + throw new Error('Can\'t add a service to a started server.'); + } + + if (service === null || typeof service !== 'object' || + implementation === null || typeof implementation !== 'object') { + throw new Error('addService() requires two objects as arguments'); + } + + const serviceKeys = Object.keys(service); + + if (serviceKeys.length === 0) { + throw new Error('Cannot add an empty service to a server'); + } + + const implMap: UntypedServiceImplementation = + implementation as UntypedServiceImplementation; + + serviceKeys.forEach((name) => { + const attrs = service[name]; + let methodType: HandlerType; + + if (attrs.requestStream) { + if (attrs.responseStream) { + methodType = 'bidi'; + } else { + methodType = 'clientStream'; + } + } else { + if (attrs.responseStream) { + methodType = 'serverStream'; + } else { + methodType = 'unary'; + } + } + + let implFn = implMap[name]; + let impl; + + if (implFn === undefined && typeof attrs.originalName === 'string') { + implFn = implMap[attrs.originalName]; + } + + if (implFn !== undefined) { + impl = implFn.bind(implementation); + } else { + impl = defaultHandler[methodType]; + } + + const success = this.register( + attrs.path, impl, attrs.responseSerialize, attrs.requestDeserialize, + methodType); + + if (success === false) { + throw new Error(`Method handler for ${attrs.path} already provided.`); + } + }); + } + + bind(port: string, creds: ServerCredentials): void { + throw new Error('Not implemented. Use bindAsync() instead'); + } + + bindAsync( + port: string, creds: ServerCredentials, + callback: (error: Error|null, port: number) => void): void { + if (this.started === true) { + throw new Error('server is already started'); + } + + if (typeof port !== 'string') { + throw new TypeError('port must be a string'); + } + + if (creds === null || typeof creds !== 'object') { + throw new TypeError('creds must be an object'); + } + + if (typeof callback !== 'function') { + throw new TypeError('callback must be a function'); + } + + const url = new URL(`http://${port}`); + const options: ListenOptions = {host: url.hostname, port: +url.port}; + + if (creds._isSecure()) { + this.http2Server = http2.createSecureServer( + creds._getSettings() as http2.SecureServerOptions); + } else { + this.http2Server = http2.createServer(); + } + + // TODO(cjihrig): Set up the handlers, to allow requests to be processed. + + function onError(err: Error): void { + callback(err, -1); + } + + this.http2Server.once('error', onError); + + this.http2Server.listen(options, () => { + const server = + this.http2Server as http2.Http2Server | http2.Http2SecureServer; + const port = (server.address() as AddressInfo).port; + + server.removeListener('error', onError); + callback(null, port); + }); + } + + forceShutdown(): void { + throw new Error('Not yet implemented'); + } + + register( + name: string, handler: HandleCall, + serialize: Serialize, deserialize: Deserialize, + type: string): boolean { + if (this.handlers.has(name)) { + return false; + } + + this.handlers.set( + name, + {func: handler, serialize, deserialize, type: type as HandlerType}); + return true; + } + + start(): void { + if (this.http2Server === null || this.http2Server.listening !== true) { + throw new Error('server must be bound in order to start'); + } + + if (this.started === true) { + throw new Error('server is already started'); + } + + this.started = true; + } + + tryShutdown(callback: (error?: Error) => void): void { + callback = typeof callback === 'function' ? callback : noop; + + if (this.http2Server === null) { + callback(new Error('server is not running')); + return; + } + + this.http2Server.close((err?: Error) => { + this.started = false; + callback(err); + }); + } + + addHttp2Port(): void { + throw new Error('Not yet implemented'); + } +} diff --git a/packages/grpc-js/test/common.ts b/packages/grpc-js/test/common.ts index 1a1908e7..17976596 100644 --- a/packages/grpc-js/test/common.ts +++ b/packages/grpc-js/test/common.ts @@ -15,8 +15,19 @@ * */ +import * as loader from '@grpc/proto-loader'; import * as assert from 'assert'; +import {GrpcObject, loadPackageDefinition} from '../src/make-client'; + +const protoLoaderOptions = { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true +}; + export function mockFunction(): never { throw new Error('Not implemented'); } @@ -100,3 +111,8 @@ export namespace assert2 { } } } + +export function loadProtoFile(file: string): GrpcObject { + const packageDefinition = loader.loadSync(file, protoLoaderOptions); + return loadPackageDefinition(packageDefinition); +} diff --git a/packages/grpc-js/test/fixtures/math.proto b/packages/grpc-js/test/fixtures/math.proto new file mode 100644 index 00000000..ca59a7af --- /dev/null +++ b/packages/grpc-js/test/fixtures/math.proto @@ -0,0 +1,50 @@ +syntax = "proto3"; + +package math; + +message DivArgs { + int64 dividend = 1; + int64 divisor = 2; +} + +message DivReply { + int64 quotient = 1; + int64 remainder = 2; +} + +message FibArgs { + int64 limit = 1; +} + +message Num { + int64 num = 1; +} + +message FibReply { + int64 count = 1; +} + +service Math { + // Div divides DivArgs.dividend by DivArgs.divisor and returns the quotient + // and remainder. + rpc Div (DivArgs) returns (DivReply) { + } + + // DivMany accepts an arbitrary number of division args from the client stream + // and sends back the results in the reply stream. The stream continues until + // the client closes its end; the server does the same after sending all the + // replies. The stream ends immediately if either end aborts. + rpc DivMany (stream DivArgs) returns (stream DivReply) { + } + + // Fib generates numbers in the Fibonacci sequence. If FibArgs.limit > 0, Fib + // generates up to limit numbers; otherwise it continues until the call is + // canceled. Unlike Fib above, Fib has no final FibReply. + rpc Fib (FibArgs) returns (stream Num) { + } + + // Sum sums a stream of numbers, returning the final result once the stream + // is closed. + rpc Sum (stream Num) returns (Num) { + } +} diff --git a/packages/grpc-js/test/test-server.ts b/packages/grpc-js/test/test-server.ts new file mode 100644 index 00000000..0b2f0524 --- /dev/null +++ b/packages/grpc-js/test/test-server.ts @@ -0,0 +1,231 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Allow `any` data type for testing runtime type checking. +// tslint:disable no-any +import * as assert from 'assert'; +import * as fs from 'fs'; +import * as path from 'path'; + +import {ServerCredentials} from '../src'; +import {Server} from '../src/server'; + +import {loadProtoFile} from './common'; + +const ca = fs.readFileSync(path.join(__dirname, 'fixtures', 'ca.pem')); +const key = fs.readFileSync(path.join(__dirname, 'fixtures', 'server1.key')); +const cert = fs.readFileSync(path.join(__dirname, 'fixtures', 'server1.pem')); +function noop(): void {} + + +describe('Server', () => { + describe('constructor', () => { + it('should work with no arguments', () => { + assert.doesNotThrow(() => { + new Server(); // tslint:disable-line:no-unused-expression + }); + }); + + it('should work with an empty object argument', () => { + assert.doesNotThrow(() => { + new Server({}); // tslint:disable-line:no-unused-expression + }); + }); + + it('should be an instance of Server', () => { + const server = new Server(); + + assert(server instanceof Server); + }); + }); + + describe('bindAsync', () => { + it('binds with insecure credentials', (done) => { + const server = new Server(); + + server.bindAsync( + 'localhost:0', ServerCredentials.createInsecure(), (err, port) => { + assert.ifError(err); + assert(typeof port === 'number' && port > 0); + server.tryShutdown(done); + }); + }); + + it('binds with secure credentials', (done) => { + const server = new Server(); + const creds = ServerCredentials.createSsl( + ca, [{private_key: key, cert_chain: cert}], true); + + server.bindAsync('localhost:0', creds, (err, port) => { + assert.ifError(err); + assert(typeof port === 'number' && port > 0); + server.tryShutdown(done); + }); + }); + + it('throws if bind is called after the server is started', () => { + const server = new Server(); + + server.bindAsync( + 'localhost:0', ServerCredentials.createInsecure(), (err, port) => { + assert.ifError(err); + server.start(); + assert.throws(() => { + server.bindAsync( + 'localhost:0', ServerCredentials.createInsecure(), noop); + }, /server is already started/); + }); + }); + + it('throws on invalid inputs', () => { + const server = new Server(); + + assert.throws(() => { + server.bindAsync(null as any, ServerCredentials.createInsecure(), noop); + }, /port must be a string/); + + assert.throws(() => { + server.bindAsync('localhost:0', null as any, noop); + }, /creds must be an object/); + + assert.throws(() => { + server.bindAsync( + 'localhost:0', ServerCredentials.createInsecure(), null as any); + }, /callback must be a function/); + }); + }); + + describe('tryShutdown', () => { + it('calls back with an error if the server is not running', (done) => { + const server = new Server(); + + server.tryShutdown((err) => { + assert(err !== undefined && err.message === 'server is not running'); + done(); + }); + }); + }); + + describe('start', () => { + let server: Server; + + beforeEach((done) => { + server = new Server(); + server.bindAsync('localhost:0', ServerCredentials.createInsecure(), done); + }); + + afterEach((done) => { + server.tryShutdown(done); + }); + + it('starts without error', () => { + assert.doesNotThrow(() => { + server.start(); + }); + }); + + it('throws if started twice', () => { + server.start(); + assert.throws(() => { + server.start(); + }, /server is already started/); + }); + + it('throws if the server is not bound', () => { + const server = new Server(); + + assert.throws(() => { + server.start(); + }, /server must be bound in order to start/); + }); + }); + + describe('addService', () => { + const mathProtoFile = path.join(__dirname, 'fixtures', 'math.proto'); + const mathClient = (loadProtoFile(mathProtoFile).math as any).Math; + const mathServiceAttrs = mathClient.service; + const dummyImpls = {div() {}, divMany() {}, fib() {}, sum() {}}; + const altDummyImpls = {Div() {}, DivMany() {}, Fib() {}, Sum() {}}; + + it('succeeds with a single service', () => { + const server = new Server(); + + assert.doesNotThrow(() => { + server.addService(mathServiceAttrs, dummyImpls); + }); + }); + + it('fails to add an empty service', () => { + const server = new Server(); + + assert.throws(() => { + server.addService({}, dummyImpls); + }, /Cannot add an empty service to a server/); + }); + + it('fails with conflicting method names', () => { + const server = new Server(); + + server.addService(mathServiceAttrs, dummyImpls); + assert.throws(() => { + server.addService(mathServiceAttrs, dummyImpls); + }, /Method handler for .+ already provided/); + }); + + it('supports method names as originally written', () => { + const server = new Server(); + + assert.doesNotThrow(() => { + server.addService(mathServiceAttrs, altDummyImpls); + }); + }); + + it('fails if the server has been started', (done) => { + const server = new Server(); + + server.bindAsync( + 'localhost:0', ServerCredentials.createInsecure(), (err, port) => { + assert.ifError(err); + server.start(); + assert.throws(() => { + server.addService(mathServiceAttrs, dummyImpls); + }, /Can't add a service to a started server\./); + server.tryShutdown(done); + }); + }); + }); + + it('throws when unimplemented methods are called', () => { + const server = new Server(); + + assert.throws(() => { + server.addProtoService(); + }, /Not implemented. Use addService\(\) instead/); + + assert.throws(() => { + server.forceShutdown(); + }, /Not yet implemented/); + + assert.throws(() => { + server.addHttp2Port(); + }, /Not yet implemented/); + + assert.throws(() => { + server.bind('localhost:0', ServerCredentials.createInsecure()); + }, /Not implemented. Use bindAsync\(\) instead/); + }); +});