Merge branch 'master' into grpc-js_config_selector

This commit is contained in:
Michael Lumish 2021-02-01 14:20:36 -08:00
commit e35a7d0a25
16 changed files with 173 additions and 89 deletions

3
SECURITY.md Normal file
View File

@ -0,0 +1,3 @@
# Security Policy
For information on gRPC Security Policy and reporting potentional security issues, please see [gRPC CVE Process](https://github.com/grpc/proposal/blob/master/P4-grpc-cve-process.md).

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js-xds",
"version": "1.0.0",
"version": "1.2.1",
"description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.",
"main": "build/src/index.js",
"scripts": {
@ -32,22 +32,36 @@
"homepage": "https://github.com/grpc/grpc-node#readme",
"devDependencies": {
"@grpc/grpc-js": "file:../grpc-js",
"gts": "^2.0.2",
"typescript": "^3.8.3",
"@types/gulp": "^4.0.6",
"@types/gulp-mocha": "0.0.32",
"@types/mocha": "^5.2.6",
"@types/node": "^13.11.1",
"@types/yargs": "^15.0.5",
"gts": "^2.0.2",
"typescript": "^3.8.3",
"yargs": "^15.4.1"
},
"dependencies": {
"@grpc/proto-loader": "^0.6.0-pre14"
},
"peerDependencies": {
"@grpc/grpc-js": "~1.2.0"
"@grpc/grpc-js": "~1.2.2"
},
"engines": {
"node": ">=10.10.0"
}
},
"files": [
"src/**/*.ts",
"build/src/**/*.{js,d.ts,js.map}",
"deps/envoy-api/envoy/api/v2/**/*.proto",
"deps/envoy-api/envoy/config/**/*.proto",
"deps/envoy-api/envoy/service/**/*.proto",
"deps/envoy-api/envoy/type/**/*.proto",
"deps/envoy-api/envoy/annotations/**/*.proto",
"deps/googleapis/google/api/**/*.proto",
"deps/googleapis/google/protobuf/**/*.proto",
"deps/googleapis/google/rpc/**/*.proto",
"deps/udpa/udpa/annotations/**/*.proto",
"deps/protoc-gen-validate/validate/**/*.proto"
]
}

View File

@ -30,8 +30,8 @@ function trace(text: string): void {
}
class XdsResolver implements Resolver {
private resolutionStarted = false;
private hasReportedSuccess = false;
private xdsClient: XdsClient | null = null;
constructor(
private target: GrpcUri,
@ -39,29 +39,28 @@ class XdsResolver implements Resolver {
private channelOptions: ChannelOptions
) {}
private reportResolutionError() {
private reportResolutionError(reason: string) {
this.listener.onError({
code: status.UNAVAILABLE,
details: `xDS name resolution failed for target ${uriToString(
this.target
)}`,
)}: ${reason}`,
metadata: new Metadata(),
});
}
updateResolution(): void {
// Wait until updateResolution is called once to start the xDS requests
if (!this.resolutionStarted) {
this.resolutionStarted = true;
if (this.xdsClient === null) {
trace('Starting resolution for target ' + uriToString(this.target));
const xdsClient = new XdsClient(
this.xdsClient = new XdsClient(
this.target.path,
{
onValidUpdate: (update: ServiceConfig) => {
trace('Resolved service config for target ' + uriToString(this.target) + ': ' + JSON.stringify(update));
this.hasReportedSuccess = true;
this.listener.onSuccessfulResolution([], update, null, null, {
xdsClient: xdsClient,
xdsClient: this.xdsClient,
});
},
onTransientError: (error: StatusObject) => {
@ -69,12 +68,12 @@ class XdsResolver implements Resolver {
* not already provided a ServiceConfig for the upper layer to use */
if (!this.hasReportedSuccess) {
trace('Resolution error for target ' + uriToString(this.target) + ' due to xDS client transient error ' + error.details);
this.reportResolutionError();
this.reportResolutionError(error.details);
}
},
onResourceDoesNotExist: () => {
trace('Resolution error for target ' + uriToString(this.target) + ': resource does not exist');
this.reportResolutionError();
this.reportResolutionError("Resource does not exist");
},
},
this.channelOptions
@ -82,6 +81,10 @@ class XdsResolver implements Resolver {
}
}
destroy() {
this.xdsClient?.shutdown();
}
static getDefaultAuthority(target: GrpcUri) {
return target.path;
}

View File

@ -1018,13 +1018,11 @@ export class XdsClient {
this.lrsBackoff.runOnce();
this.lrsCall = this.lrsClient.streamLoadStats();
this.lrsCall.on('metadata', () => {
this.lrsCall.on('data', (message: LoadStatsResponse__Output) => {
/* Once we get any response from the server, we assume that the stream is
* in a good state, so we can reset the backoff timer. */
this.lrsBackoff.stop();
this.lrsBackoff.reset();
});
this.lrsCall.on('data', (message: LoadStatsResponse__Output) => {
if (
message.load_reporting_interval?.seconds !==
this.latestLrsSettings?.load_reporting_interval?.seconds ||
@ -1157,7 +1155,7 @@ export class XdsClient {
}
removeClusterWatcher(clusterName: string, watcher: Watcher<Cluster__Output>) {
trace('Watcher removed for endpoint ' + clusterName);
trace('Watcher removed for cluster ' + clusterName);
this.adsState[CDS_TYPE_URL].removeWatcher(clusterName, watcher);
}

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.2.0",
"version": "1.2.5",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

View File

@ -37,6 +37,10 @@ const {
NGHTTP2_CANCEL,
} = http2.constants;
interface NodeError extends Error {
code: string;
}
export type Deadline = Date | number;
export interface CallStreamOptions {
@ -202,6 +206,8 @@ export class Http2CallStream implements Call {
private listener: InterceptingListener | null = null;
private internalErrorMessage: string | null = null;
constructor(
private readonly methodName: string,
private readonly channel: ChannelImplementation,
@ -518,66 +524,86 @@ export class Http2CallStream implements Call {
this.maybeOutputStatus();
});
stream.on('close', () => {
this.trace('HTTP/2 stream closed with code ' + stream.rstCode);
/* If we have a final status with an OK status code, that means that
* we have received all of the messages and we have processed the
* trailers and the call completed successfully, so it doesn't matter
* how the stream ends after that */
if (this.finalStatus?.code === Status.OK) {
return;
}
let code: Status;
let details = '';
switch (stream.rstCode) {
case http2.constants.NGHTTP2_NO_ERROR:
/* If we get a NO_ERROR code and we already have a status, the
* stream completed properly and we just haven't fully processed
* it yet */
if (this.finalStatus !== null) {
return;
}
code = Status.INTERNAL;
details = `Received RST_STREAM with code ${stream.rstCode}`;
break;
case http2.constants.NGHTTP2_REFUSED_STREAM:
code = Status.UNAVAILABLE;
details = 'Stream refused by server';
break;
case http2.constants.NGHTTP2_CANCEL:
code = Status.CANCELLED;
details = 'Call cancelled';
break;
case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
code = Status.RESOURCE_EXHAUSTED;
details = 'Bandwidth exhausted';
break;
case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
code = Status.PERMISSION_DENIED;
details = 'Protocol not secure enough';
break;
case http2.constants.NGHTTP2_INTERNAL_ERROR:
code = Status.INTERNAL;
/* This error code was previously handled in the default case, and
* there are several instances of it online, so I wanted to
* preserve the original error message so that people find existing
* information in searches, but also include the more recognizable
* "Internal server error" message. */
details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`;
break;
default:
code = Status.INTERNAL;
details = `Received RST_STREAM with code ${stream.rstCode}`;
}
// This is a no-op if trailers were received at all.
// This is OK, because status codes emitted here correspond to more
// catastrophic issues that prevent us from receiving trailers in the
// first place.
this.endCall({ code, details, metadata: new Metadata() });
/* Use process.next tick to ensure that this code happens after any
* "error" event that may be emitted at about the same time, so that
* we can bubble up the error message from that event. */
process.nextTick(() => {
this.trace('HTTP/2 stream closed with code ' + stream.rstCode);
/* If we have a final status with an OK status code, that means that
* we have received all of the messages and we have processed the
* trailers and the call completed successfully, so it doesn't matter
* how the stream ends after that */
if (this.finalStatus?.code === Status.OK) {
return;
}
let code: Status;
let details = '';
switch (stream.rstCode) {
case http2.constants.NGHTTP2_NO_ERROR:
/* If we get a NO_ERROR code and we already have a status, the
* stream completed properly and we just haven't fully processed
* it yet */
if (this.finalStatus !== null) {
return;
}
code = Status.INTERNAL;
details = `Received RST_STREAM with code ${stream.rstCode}`;
break;
case http2.constants.NGHTTP2_REFUSED_STREAM:
code = Status.UNAVAILABLE;
details = 'Stream refused by server';
break;
case http2.constants.NGHTTP2_CANCEL:
code = Status.CANCELLED;
details = 'Call cancelled';
break;
case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
code = Status.RESOURCE_EXHAUSTED;
details = 'Bandwidth exhausted';
break;
case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
code = Status.PERMISSION_DENIED;
details = 'Protocol not secure enough';
break;
case http2.constants.NGHTTP2_INTERNAL_ERROR:
code = Status.INTERNAL;
if (this.internalErrorMessage === null) {
/* This error code was previously handled in the default case, and
* there are several instances of it online, so I wanted to
* preserve the original error message so that people find existing
* information in searches, but also include the more recognizable
* "Internal server error" message. */
details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`;
} else {
/* The "Received RST_STREAM with code ..." error is preserved
* here for continuity with errors reported online, but the
* error message at the end will probably be more relevant in
* most cases. */
details = `Received RST_STREAM with code ${stream.rstCode} triggered by internal client error: ${this.internalErrorMessage}`;
}
break;
default:
code = Status.INTERNAL;
details = `Received RST_STREAM with code ${stream.rstCode}`;
}
// This is a no-op if trailers were received at all.
// This is OK, because status codes emitted here correspond to more
// catastrophic issues that prevent us from receiving trailers in the
// first place.
this.endCall({ code, details, metadata: new Metadata() });
});
});
stream.on('error', (err: Error) => {
stream.on('error', (err: NodeError) => {
/* We need an error handler here to stop "Uncaught Error" exceptions
* from bubbling up. However, errors here should all correspond to
* "close" events, where we will handle the error more granularly */
/* Specifically looking for stream errors that were *not* constructed
* from a RST_STREAM response here:
* https://github.com/nodejs/node/blob/8b8620d580314050175983402dfddf2674e8e22a/lib/internal/http2/core.js#L2267
*/
if (err.code !== 'ERR_HTTP2_STREAM_ERROR') {
this.internalErrorMessage = err.message;
}
});
if (!this.pendingRead) {
stream.pause();
@ -630,7 +656,11 @@ export class Http2CallStream implements Call {
getDeadline(): Deadline {
if (this.options.parentCall && this.options.flags & Propagate.DEADLINE) {
return this.options.parentCall.getDeadline();
const parentDeadline = this.options.parentCall.getDeadline();
const selfDeadline = this.options.deadline;
const parentDeadlineMsecs = parentDeadline instanceof Date ? parentDeadline.getTime() : parentDeadline;
const selfDeadlineMsecs = selfDeadline instanceof Date ? selfDeadline.getTime() : selfDeadline;
return Math.min(parentDeadlineMsecs, selfDeadlineMsecs);
} else {
return this.options.deadline;
}

View File

@ -347,10 +347,11 @@ class BaseInterceptingCall implements InterceptingCallInterface {
let serialized: Buffer;
try {
serialized = this.methodDefinition.requestSerialize(message);
this.call.sendMessageWithContext(context, serialized);
} catch (e) {
this.call.cancelWithStatus(Status.INTERNAL, `Request message serialization failure: ${e.message}`);
return;
}
this.call.sendMessageWithContext(context, serialized);
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sendMessage(message: any) {
@ -370,7 +371,6 @@ class BaseInterceptingCall implements InterceptingCallInterface {
let deserialized: any;
try {
deserialized = this.methodDefinition.responseDeserialize(message);
interceptingListener?.onReceiveMessage?.(deserialized);
} catch (e) {
readError = {
code: Status.INTERNAL,
@ -378,7 +378,9 @@ class BaseInterceptingCall implements InterceptingCallInterface {
metadata: new Metadata(),
};
this.call.cancelWithStatus(readError.code, readError.details);
return;
}
interceptingListener?.onReceiveMessage?.(deserialized);
},
onReceiveStatus: (status) => {
if (readError) {

View File

@ -56,10 +56,14 @@ export class DeadlineFilter extends BaseFilter implements Filter {
}
const now: number = new Date().getTime();
let timeout = this.deadline - now;
if (timeout < 0) {
timeout = 0;
}
if (this.deadline !== Infinity) {
if (timeout <= 0) {
process.nextTick(() => {
callStream.cancelWithStatus(
Status.DEADLINE_EXCEEDED,
'Deadline exceeded'
);
});
} else if (this.deadline !== Infinity) {
this.timer = setTimeout(() => {
callStream.cancelWithStatus(
Status.DEADLINE_EXCEEDED,

View File

@ -128,14 +128,12 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
this.subchannelStateCounts[previousState] -= 1;
this.subchannelStateCounts[newState] += 1;
this.calculateAndUpdateState();
if (newState === ConnectivityState.TRANSIENT_FAILURE) {
this.channelControlHelper.requestReresolution();
}
if (
newState === ConnectivityState.TRANSIENT_FAILURE ||
newState === ConnectivityState.IDLE
) {
this.channelControlHelper.requestReresolution();
subchannel.startConnecting();
}
};

View File

@ -93,6 +93,15 @@ export interface ServiceClientConstructor {
service: ServiceDefinition;
}
/**
* Returns true, if given key is included in the blacklisted
* keys.
* @param key key for check, string.
*/
function isPrototypePolluted(key: string): Boolean {
return ['__proto__', 'prototype', 'constructor'].includes(key);
}
/**
* Creates a constructor for a client with the given methods, as specified in
* the methods argument. The resulting class will have an instance method for
@ -122,7 +131,7 @@ export function makeClientConstructor(
}
Object.keys(methods).forEach((name) => {
if (name === '__proto__') {
if (isPrototypePolluted(name)) {
return;
}
const attrs = methods[name];
@ -155,7 +164,7 @@ export function makeClientConstructor(
ServiceClientImpl.prototype[name] = methodFunc;
// Associate all provided attributes with the method
Object.assign(ServiceClientImpl.prototype[name], attrs);
if (attrs.originalName && attrs.originalName !== '__proto__') {
if (attrs.originalName && !isPrototypePolluted(attrs.originalName)) {
ServiceClientImpl.prototype[attrs.originalName] =
ServiceClientImpl.prototype[name];
}
@ -204,7 +213,7 @@ export function loadPackageDefinition(
if (Object.prototype.hasOwnProperty.call(packageDef, serviceFqn)) {
const service = packageDef[serviceFqn];
const nameComponents = serviceFqn.split('.');
if (nameComponents.some(comp => comp === '__proto__')) {
if (nameComponents.some((comp: string) => isPrototypePolluted(comp))) {
continue;
}
const serviceName = nameComponents[nameComponents.length - 1];

View File

@ -264,6 +264,12 @@ class DnsResolver implements Resolver {
}
}
destroy() {
/* Do nothing. There is not a practical way to cancel in-flight DNS
* requests, and after this function is called we can expect that
* updateResolution will not be called again. */
}
/**
* Get the default authority for the given target. For IP targets, that is
* the IP address. For DNS targets, it is the hostname.

View File

@ -45,6 +45,10 @@ class UdsResolver implements Resolver {
);
}
destroy() {
// This resolver owns no resources, so we do nothing here.
}
static getDefaultAuthority(target: GrpcUri): string {
return 'localhost';
}

View File

@ -80,6 +80,11 @@ export interface Resolver {
* called synchronously with the constructor or updateResolution.
*/
updateResolution(): void;
/**
* Destroy the resolver. Should be called when the owning channel shuts down.
*/
destroy(): void;
}
export interface ResolverConstructor {

View File

@ -291,6 +291,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
destroy() {
this.childLoadBalancer.destroy();
this.innerResolver.destroy();
this.updateState(ConnectivityState.SHUTDOWN, new UnavailablePicker());
}

View File

@ -24,4 +24,8 @@ describe('loadPackageDefinition', () => {
loadPackageDefinition({'__proto__.polluted': true} as any);
assert.notStrictEqual(({} as any).polluted, true);
});
it('Should not allow prototype pollution #2', () => {
loadPackageDefinition({'constructor.prototype.polluted': true} as any);
assert.notStrictEqual(({} as any).polluted, true);
});
});

View File

@ -394,6 +394,9 @@ describe('Name Resolver', () => {
return [];
}
destroy() {
}
static getDefaultAuthority(target: GrpcUri): string {
return 'other';
}