Merge branch 'master' into grpc-js_xds_interop_client

This commit is contained in:
Michael Lumish 2020-08-20 11:05:00 -07:00
commit ffef02c943
7 changed files with 72 additions and 18 deletions

View File

@ -8,6 +8,10 @@ Node 12 is recommended. The exact set of compatible Node versions can be found i
npm install @grpc/grpc-js npm install @grpc/grpc-js
``` ```
## Documentation
Documentation specifically for the `@grpc/grpc-js` package is currently not available. However, [documentation is available for the `grpc` package](https://grpc.github.io/grpc/node/grpc.html), and the two packages contain mostly the same interface. There are a few notable differences, however, and these differences are noted in the "Migrating from grpc" section below.
## Features ## Features
- Clients - Clients
@ -28,7 +32,7 @@ This library does not directly handle `.proto` files. To use `.proto` files with
`@grpc/grpc-js` is almost a drop-in replacement for `grpc`, but you may need to make a few code changes to use it: `@grpc/grpc-js` is almost a drop-in replacement for `grpc`, but you may need to make a few code changes to use it:
- If you are currently loading `.proto` files using `grpc.load`, that function is not available in this library. You should instead load your `.proto` files using `@grpc/proto-loader` and load the resulting package definition objects into `@grpc/grpc-js` using `grpc.loadPackageDefinition`. - If you are currently loading `.proto` files using `grpc.load`, that function is not available in this library. You should instead load your `.proto` files using `@grpc/proto-loader` and load the resulting package definition objects into `@grpc/grpc-js` using `grpc.loadPackageDefinition`.
- If you are currently loading packages generated by `grpc-tools`, you should instead generate your files using the `--generate_package_definitions` option in `grpc-tools`, then load the object exported by the generated file into `@grpc/grpc-js` using `grpc.loadPackageDefinition`. - If you are currently loading packages generated by `grpc-tools`, you should instead generate your files using the `generate_package_definition` option in `grpc-tools`, then load the object exported by the generated file into `@grpc/grpc-js` using `grpc.loadPackageDefinition`.
- If you have a server and you are using `Server#bind` to bind ports, you will need to use `Server#bindAsync` instead. - If you have a server and you are using `Server#bind` to bind ports, you will need to use `Server#bindAsync` instead.
## Some Notes on API Guarantees ## Some Notes on API Guarantees

View File

@ -1,6 +1,6 @@
{ {
"name": "@grpc/grpc-js", "name": "@grpc/grpc-js",
"version": "1.1.3", "version": "1.1.5",
"description": "gRPC Library for Node - pure JS implementation", "description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/", "homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
@ -20,7 +20,6 @@
"@types/lodash": "^4.14.108", "@types/lodash": "^4.14.108",
"@types/mocha": "^5.2.6", "@types/mocha": "^5.2.6",
"@types/ncp": "^2.0.1", "@types/ncp": "^2.0.1",
"@types/node": "^12.7.5",
"@types/pify": "^3.0.2", "@types/pify": "^3.0.2",
"@types/semver": "^6.0.1", "@types/semver": "^6.0.1",
"@types/yargs": "^15.0.5", "@types/yargs": "^15.0.5",
@ -60,6 +59,7 @@
}, },
"dependencies": { "dependencies": {
"@grpc/proto-loader": "^0.6.0-pre14", "@grpc/proto-loader": "^0.6.0-pre14",
"@types/node": "^12.12.47",
"google-auth-library": "^5.10.1", "google-auth-library": "^5.10.1",
"semver": "^6.2.0" "semver": "^6.2.0"
}, },

View File

@ -227,7 +227,15 @@ export class Http2CallStream implements Call {
const filteredStatus = this.filterStack.receiveTrailers( const filteredStatus = this.filterStack.receiveTrailers(
this.finalStatus! this.finalStatus!
); );
this.listener?.onReceiveStatus(filteredStatus); /* We delay the actual action of bubbling up the status to insulate the
* cleanup code in this class from any errors that may be thrown in the
* upper layers as a result of bubbling up the status. In particular,
* if the status is not OK, the "error" event may be emitted
* synchronously at the top level, which will result in a thrown error if
* the user does not handle that event. */
process.nextTick(() => {
this.listener?.onReceiveStatus(filteredStatus);
});
if (this.subchannel) { if (this.subchannel) {
this.subchannel.callUnref(); this.subchannel.callUnref();
this.subchannel.removeDisconnectListener(this.disconnectListener); this.subchannel.removeDisconnectListener(this.disconnectListener);
@ -602,6 +610,7 @@ export class Http2CallStream implements Call {
} else { } else {
code = http2.constants.NGHTTP2_CANCEL; code = http2.constants.NGHTTP2_CANCEL;
} }
this.trace('close http2 stream with code ' + code);
this.http2Stream.close(code); this.http2Stream.close(code);
} }
} }
@ -630,7 +639,7 @@ export class Http2CallStream implements Call {
} }
getPeer(): string { getPeer(): string {
throw new Error('Not yet implemented'); return this.subchannel?.getAddress() ?? this.channel.getTarget();
} }
getMethod(): string { getMethod(): string {

View File

@ -93,7 +93,7 @@ export class ClientUnaryCallImpl extends EventEmitter
} }
getPeer(): string { getPeer(): string {
return this.call?.getPeer() ?? ''; return this.call?.getPeer() ?? 'unknown';
} }
} }
@ -109,7 +109,7 @@ export class ClientReadableStreamImpl<ResponseType> extends Readable
} }
getPeer(): string { getPeer(): string {
return this.call?.getPeer() ?? ''; return this.call?.getPeer() ?? 'unknown';
} }
_read(_size: number): void { _read(_size: number): void {
@ -129,7 +129,7 @@ export class ClientWritableStreamImpl<RequestType> extends Writable
} }
getPeer(): string { getPeer(): string {
return this.call?.getPeer() ?? ''; return this.call?.getPeer() ?? 'unknown';
} }
_write(chunk: RequestType, encoding: string, cb: WriteCallback) { _write(chunk: RequestType, encoding: string, cb: WriteCallback) {
@ -164,7 +164,7 @@ export class ClientDuplexStreamImpl<RequestType, ResponseType> extends Duplex
} }
getPeer(): string { getPeer(): string {
return this.call?.getPeer() ?? ''; return this.call?.getPeer() ?? 'unknown';
} }
_read(_size: number): void { _read(_size: number): void {

View File

@ -91,10 +91,13 @@ export type ServerWritableStream<
RequestType, RequestType,
ResponseType ResponseType
> = ServerSurfaceCall & > = ServerSurfaceCall &
ObjectWritable<ResponseType> & { request: RequestType }; ObjectWritable<ResponseType> & {
request: RequestType;
end: (metadata?: Metadata) => void;
};
export type ServerDuplexStream<RequestType, ResponseType> = ServerSurfaceCall & export type ServerDuplexStream<RequestType, ResponseType> = ServerSurfaceCall &
ObjectReadable<RequestType> & ObjectReadable<RequestType> &
ObjectWritable<ResponseType>; ObjectWritable<ResponseType> & { end: (metadata?: Metadata) => void };
export class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter export class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter
implements ServerUnaryCall<RequestType, ResponseType> { implements ServerUnaryCall<RequestType, ResponseType> {
@ -111,7 +114,7 @@ export class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter
} }
getPeer(): string { getPeer(): string {
throw new Error('not implemented yet'); return this.call.getPeer();
} }
sendMetadata(responseMetadata: Metadata): void { sendMetadata(responseMetadata: Metadata): void {
@ -144,7 +147,7 @@ export class ServerReadableStreamImpl<RequestType, ResponseType>
} }
getPeer(): string { getPeer(): string {
throw new Error('not implemented yet'); return this.call.getPeer();
} }
sendMetadata(responseMetadata: Metadata): void { sendMetadata(responseMetadata: Metadata): void {
@ -176,7 +179,7 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
} }
getPeer(): string { getPeer(): string {
throw new Error('not implemented yet'); return this.call.getPeer();
} }
sendMetadata(responseMetadata: Metadata): void { sendMetadata(responseMetadata: Metadata): void {
@ -247,12 +250,21 @@ export class ServerDuplexStreamImpl<RequestType, ResponseType> extends Duplex
} }
getPeer(): string { getPeer(): string {
throw new Error('not implemented yet'); return this.call.getPeer();
} }
sendMetadata(responseMetadata: Metadata): void { sendMetadata(responseMetadata: Metadata): void {
this.call.sendMetadata(responseMetadata); this.call.sendMetadata(responseMetadata);
} }
// eslint-disable-next-line @typescript-eslint/no-explicit-any
end(metadata?: any) {
if (metadata) {
this.trailingMetadata = metadata;
}
super.end();
}
} }
ServerDuplexStreamImpl.prototype._read = ServerDuplexStreamImpl.prototype._read =
@ -371,6 +383,12 @@ export class Http2ServerCallStream<
}); });
this.stream.once('close', () => { this.stream.once('close', () => {
trace(
'Request to method ' +
this.handler?.path +
' stream closed with rstCode ' +
this.stream.rstCode
);
this.cancelled = true; this.cancelled = true;
this.emit('cancelled', 'cancelled'); this.emit('cancelled', 'cancelled');
}); });
@ -411,7 +429,7 @@ export class Http2ServerCallStream<
this.metadataSent = true; this.metadataSent = true;
const custom = customMetadata ? customMetadata.toHttp2Headers() : null; const custom = customMetadata ? customMetadata.toHttp2Headers() : null;
// TODO(cjihrig): Include compression headers. // TODO(cjihrig): Include compression headers.
const headers = Object.assign(defaultResponseHeaders, custom); const headers = Object.assign({}, defaultResponseHeaders, custom);
this.stream.respond(headers, defaultResponseOptions); this.stream.respond(headers, defaultResponseOptions);
} }
@ -736,6 +754,19 @@ export class Http2ServerCallStream<
); );
} }
} }
getPeer(): string {
const socket = this.stream.session.socket;
if (socket.remoteAddress) {
if (socket.remotePort) {
return `${socket.remoteAddress}:${socket.remotePort}`;
} else {
return socket.remoteAddress;
}
} else {
return 'unknown';
}
}
} }
/* eslint-disable @typescript-eslint/no-explicit-any */ /* eslint-disable @typescript-eslint/no-explicit-any */

View File

@ -543,11 +543,21 @@ export class Server {
try { try {
const path = headers[http2.constants.HTTP2_HEADER_PATH] as string; const path = headers[http2.constants.HTTP2_HEADER_PATH] as string;
const serverAddress = http2Server.address();
let serverAddressString = 'null';
if (serverAddress) {
if (typeof serverAddress === 'string') {
serverAddressString = serverAddress;
} else {
serverAddressString =
serverAddress.address + ':' + serverAddress.port;
}
}
trace( trace(
'Received call to method ' + 'Received call to method ' +
path + path +
' at address ' + ' at address ' +
http2Server.address()?.toString() serverAddressString
); );
const handler = this.handlers.get(path); const handler = this.handlers.get(path);

View File

@ -415,7 +415,7 @@ export class Subchannel {
); );
} }
trace( trace(
this.subchannelAddress + this.subchannelAddressString +
' connection closed by GOAWAY with code ' + ' connection closed by GOAWAY with code ' +
errorCode errorCode
); );