Merge branch 'master' into grpc-js-xds_resource_timer

This commit is contained in:
Michael Lumish 2022-08-08 13:52:38 -07:00
commit 6ab1abccff
28 changed files with 846 additions and 198 deletions

View File

@ -0,0 +1,38 @@
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Dockerfile for building the xDS interop client. To build the image, run the
# following command from grpc-node directory:
# docker build -t <TAG> -f packages/grpc-js-xds/interop/Dockerfile .
FROM node:16-alpine as build
# Make a grpc-node directory and copy the repo into it.
WORKDIR /node/src/grpc-node
COPY . .
WORKDIR /node/src/grpc-node/packages/grpc-js
RUN npm install
WORKDIR /node/src/grpc-node/packages/grpc-js-xds
RUN npm install
FROM node:16-alpine
WORKDIR /node/src/grpc-node
COPY --from=build /node/src/grpc-node/packages/grpc-js ./packages/grpc-js/
COPY --from=build /node/src/grpc-node/packages/grpc-js-xds ./packages/grpc-js-xds/
ENV GRPC_VERBOSITY="DEBUG"
ENV GRPC_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weighted_target,round_robin,resolving_load_balancer,subchannel,keepalive,dns_resolver,fault_injection,http_filter,csds,outlier_detection
ENTRYPOINT [ "node", "/node/src/grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client" ]

View File

@ -0,0 +1,172 @@
#!/usr/bin/env bash
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -eo pipefail
# Constants
readonly GITHUB_REPOSITORY_NAME="grpc-node"
readonly TEST_DRIVER_INSTALL_SCRIPT_URL="https://raw.githubusercontent.com/${TEST_DRIVER_REPO_OWNER:-grpc}/grpc/${TEST_DRIVER_BRANCH:-master}/tools/internal_ci/linux/grpc_xds_k8s_install_test_driver.sh"
## xDS test client Docker images
readonly SERVER_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/java-server:v1.46.x"
readonly CLIENT_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/node-client"
readonly FORCE_IMAGE_BUILD="${FORCE_IMAGE_BUILD:-0}"
readonly BUILD_APP_PATH="packages/grpc-js-xds/interop/Dockerfile"
readonly LANGUAGE_NAME="Node"
#######################################
# Builds test app Docker images and pushes them to GCR
# Globals:
# BUILD_APP_PATH
# CLIENT_IMAGE_NAME: Test client Docker image name
# GIT_COMMIT: SHA-1 of git commit being built
# TESTING_VERSION: version branch under test, f.e. v1.42.x, master
# Arguments:
# None
# Outputs:
# Writes the output of `gcloud builds submit` to stdout, stderr
#######################################
build_test_app_docker_images() {
echo "Building ${LANGUAGE_NAME} xDS interop test app Docker images"
pushd "${SRC_DIR}"
docker build \
-f "${BUILD_APP_PATH}" \
-t "${CLIENT_IMAGE_NAME}:${GIT_COMMIT}" \
.
gcloud -q auth configure-docker
docker push "${CLIENT_IMAGE_NAME}:${GIT_COMMIT}"
if is_version_branch "${TESTING_VERSION}"; then
tag_and_push_docker_image "${CLIENT_IMAGE_NAME}" "${GIT_COMMIT}" "${TESTING_VERSION}"
fi
popd
}
#######################################
# Builds test app and its docker images unless they already exist
# Globals:
# CLIENT_IMAGE_NAME: Test client Docker image name
# GIT_COMMIT: SHA-1 of git commit being built
# FORCE_IMAGE_BUILD
# Arguments:
# None
# Outputs:
# Writes the output to stdout, stderr
#######################################
build_docker_images_if_needed() {
# Check if images already exist
client_tags="$(gcloud_gcr_list_image_tags "${CLIENT_IMAGE_NAME}" "${GIT_COMMIT}")"
printf "Client image: %s:%s\n" "${CLIENT_IMAGE_NAME}" "${GIT_COMMIT}"
echo "${client_tags:-Client image not found}"
# Build if any of the images are missing, or FORCE_IMAGE_BUILD=1
if [[ "${FORCE_IMAGE_BUILD}" == "1" || -z "${client_tags}" ]]; then
build_test_app_docker_images
else
echo "Skipping ${LANGUAGE_NAME} test app build"
fi
}
#######################################
# Executes the test case
# Globals:
# TEST_DRIVER_FLAGFILE: Relative path to test driver flagfile
# KUBE_CONTEXT: The name of kubectl context with GKE cluster access
# SECONDARY_KUBE_CONTEXT: The name of kubectl context with secondary GKE cluster access, if any
# TEST_XML_OUTPUT_DIR: Output directory for the test xUnit XML report
# CLIENT_IMAGE_NAME: Test client Docker image name
# GIT_COMMIT: SHA-1 of git commit being built
# Arguments:
# Test case name
# Outputs:
# Writes the output of test execution to stdout, stderr
# Test xUnit report to ${TEST_XML_OUTPUT_DIR}/${test_name}/sponge_log.xml
#######################################
run_test() {
# Test driver usage:
# https://github.com/grpc/grpc/tree/master/tools/run_tests/xds_k8s_test_driver#basic-usage
local test_name="${1:?Usage: run_test test_name}"
# testing_version is used by the framework to determine the supported PSM
# features. It's captured from Kokoro job name of the Node repo, which takes
# the form:
# grpc/node/<branch name>/<job name>
python3 -m "tests.${test_name}" \
--flagfile="${TEST_DRIVER_FLAGFILE}" \
--kube_context="${KUBE_CONTEXT}" \
--secondary_kube_context="${SECONDARY_KUBE_CONTEXT}" \
--client_image="${CLIENT_IMAGE_NAME}:${GIT_COMMIT}" \
--server_image="${SERVER_IMAGE_NAME}" \
--testing_version="${TESTING_VERSION}" \
--xml_output_file="${TEST_XML_OUTPUT_DIR}/${test_name}/sponge_log.xml"
}
#######################################
# Main function: provision software necessary to execute tests, and run them
# Globals:
# KOKORO_ARTIFACTS_DIR
# GITHUB_REPOSITORY_NAME
# SRC_DIR: Populated with absolute path to the source repo
# TEST_DRIVER_REPO_DIR: Populated with the path to the repo containing
# the test driver
# TEST_DRIVER_FULL_DIR: Populated with the path to the test driver source code
# TEST_DRIVER_FLAGFILE: Populated with relative path to test driver flagfile
# TEST_XML_OUTPUT_DIR: Populated with the path to test xUnit XML report
# GIT_ORIGIN_URL: Populated with the origin URL of git repo used for the build
# GIT_COMMIT: Populated with the SHA-1 of git commit being built
# GIT_COMMIT_SHORT: Populated with the short SHA-1 of git commit being built
# KUBE_CONTEXT: Populated with name of kubectl context with GKE cluster access
# SECONDARY_KUBE_CONTEXT: Populated with name of kubectl context with secondary GKE cluster access, if any
# Arguments:
# None
# Outputs:
# Writes the output of test execution to stdout, stderr
#######################################
main() {
local script_dir
script_dir="$(dirname "$0")"
cd "${script_dir}"
git submodule update --init --recursive
# Source the test driver from the master branch.
echo "Sourcing test driver install script from: ${TEST_DRIVER_INSTALL_SCRIPT_URL}"
source /dev/stdin <<< "$(curl -s "${TEST_DRIVER_INSTALL_SCRIPT_URL}")"
activate_gke_cluster GKE_CLUSTER_PSM_LB
activate_secondary_gke_cluster GKE_CLUSTER_PSM_LB
set -x
if [[ -n "${KOKORO_ARTIFACTS_DIR}" ]]; then
kokoro_setup_test_driver "${GITHUB_REPOSITORY_NAME}"
else
local_setup_test_driver "${script_dir}"
fi
build_docker_images_if_needed
# Run tests
cd "${TEST_DRIVER_FULL_DIR}"
local failed_tests=0
test_suites=("baseline_test" "api_listener_test" "change_backend_service_test" "failover_test" "remove_neg_test" "round_robin_test")
for test in "${test_suites[@]}"; do
run_test $test || (( failed_tests++ ))
done
echo "Failed test suites: ${failed_tests}"
if (( failed_tests > 0 )); then
exit 1
fi
}
main "$@"

View File

@ -0,0 +1,158 @@
#!/usr/bin/env bash
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -eo pipefail
# Constants
readonly GITHUB_REPOSITORY_NAME="grpc-node"
readonly TEST_DRIVER_INSTALL_SCRIPT_URL="https://raw.githubusercontent.com/${TEST_DRIVER_REPO_OWNER:-grpc}/grpc/${TEST_DRIVER_BRANCH:-master}/tools/internal_ci/linux/grpc_xds_k8s_install_test_driver.sh"
## xDS test client Docker images
readonly CLIENT_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/node-client"
readonly FORCE_IMAGE_BUILD="${FORCE_IMAGE_BUILD:-0}"
readonly BUILD_APP_PATH="packages/grpc-js-xds/interop/Dockerfile"
readonly LANGUAGE_NAME="Node"
#######################################
# Builds test app Docker images and pushes them to GCR
# Globals:
# BUILD_APP_PATH
# CLIENT_IMAGE_NAME: Test client Docker image name
# GIT_COMMIT: SHA-1 of git commit being built
# TESTING_VERSION: version branch under test, f.e. v1.42.x, master
# Arguments:
# None
# Outputs:
# Writes the output of `gcloud builds submit` to stdout, stderr
#######################################
build_test_app_docker_images() {
echo "Building ${LANGUAGE_NAME} xDS interop test app Docker images"
pushd "${SRC_DIR}"
docker build \
-f "${BUILD_APP_PATH}" \
-t "${CLIENT_IMAGE_NAME}:${GIT_COMMIT}" \
.
gcloud -q auth configure-docker
docker push "${CLIENT_IMAGE_NAME}:${GIT_COMMIT}"
if is_version_branch "${TESTING_VERSION}"; then
tag_and_push_docker_image "${CLIENT_IMAGE_NAME}" "${GIT_COMMIT}" "${TESTING_VERSION}"
fi
popd
}
#######################################
# Builds test app and its docker images unless they already exist
# Globals:
# CLIENT_IMAGE_NAME: Test client Docker image name
# GIT_COMMIT: SHA-1 of git commit being built
# FORCE_IMAGE_BUILD
# Arguments:
# None
# Outputs:
# Writes the output to stdout, stderr
#######################################
build_docker_images_if_needed() {
# Check if images already exist
client_tags="$(gcloud_gcr_list_image_tags "${CLIENT_IMAGE_NAME}" "${GIT_COMMIT}")"
printf "Client image: %s:%s\n" "${CLIENT_IMAGE_NAME}" "${GIT_COMMIT}"
echo "${client_tags:-Client image not found}"
# Build if any of the images are missing, or FORCE_IMAGE_BUILD=1
if [[ "${FORCE_IMAGE_BUILD}" == "1" || -z "${client_tags}" ]]; then
build_test_app_docker_images
else
echo "Skipping ${LANGUAGE_NAME} test app build"
fi
}
#######################################
# Executes the test case
# Globals:
# TEST_DRIVER_FLAGFILE: Relative path to test driver flagfile
# KUBE_CONTEXT: The name of kubectl context with GKE cluster access
# TEST_XML_OUTPUT_DIR: Output directory for the test xUnit XML report
# CLIENT_IMAGE_NAME: Test client Docker image name
# GIT_COMMIT: SHA-1 of git commit being built
# TESTING_VERSION: version branch under test: used by the framework to determine the supported PSM
# features.
# Arguments:
# Test case name
# Outputs:
# Writes the output of test execution to stdout, stderr
# Test xUnit report to ${TEST_XML_OUTPUT_DIR}/${test_name}/sponge_log.xml
#######################################
run_test() {
# Test driver usage:
# https://github.com/grpc/grpc/tree/master/tools/run_tests/xds_k8s_test_driver#basic-usage
local test_name="${1:?Usage: run_test test_name}"
set -x
python3 -m "tests.${test_name}" \
--flagfile="${TEST_DRIVER_FLAGFILE}" \
--kube_context="${KUBE_CONTEXT}" \
--client_image="${CLIENT_IMAGE_NAME}:${GIT_COMMIT}" \
--testing_version="${TESTING_VERSION}" \
--xml_output_file="${TEST_XML_OUTPUT_DIR}/${test_name}/sponge_log.xml" \
--flagfile="config/url-map.cfg"
set +x
}
#######################################
# Main function: provision software necessary to execute tests, and run them
# Globals:
# KOKORO_ARTIFACTS_DIR
# GITHUB_REPOSITORY_NAME
# SRC_DIR: Populated with absolute path to the source repo
# TEST_DRIVER_REPO_DIR: Populated with the path to the repo containing
# the test driver
# TEST_DRIVER_FULL_DIR: Populated with the path to the test driver source code
# TEST_DRIVER_FLAGFILE: Populated with relative path to test driver flagfile
# TEST_XML_OUTPUT_DIR: Populated with the path to test xUnit XML report
# GIT_ORIGIN_URL: Populated with the origin URL of git repo used for the build
# GIT_COMMIT: Populated with the SHA-1 of git commit being built
# GIT_COMMIT_SHORT: Populated with the short SHA-1 of git commit being built
# KUBE_CONTEXT: Populated with name of kubectl context with GKE cluster access
# Arguments:
# None
# Outputs:
# Writes the output of test execution to stdout, stderr
#######################################
main() {
local script_dir
script_dir="$(dirname "$0")"
cd "${script_dir}"
git submodule update --init --recursive
# Source the test driver from the master branch.
echo "Sourcing test driver install script from: ${TEST_DRIVER_INSTALL_SCRIPT_URL}"
source /dev/stdin <<< "$(curl -s "${TEST_DRIVER_INSTALL_SCRIPT_URL}")"
activate_gke_cluster GKE_CLUSTER_PSM_BASIC
set -x
if [[ -n "${KOKORO_ARTIFACTS_DIR}" ]]; then
kokoro_setup_test_driver "${GITHUB_REPOSITORY_NAME}"
else
local_setup_test_driver "${script_dir}"
fi
build_docker_images_if_needed
# Run tests
cd "${TEST_DRIVER_FULL_DIR}"
run_test url_map
}
main "$@"

View File

@ -79,9 +79,8 @@ function translateOutlierDetectionConfig(outlierDetection: OutlierDetection__Out
return undefined;
}
if (!outlierDetection) {
/* No-op outlier detection config, with max possible interval and no
* ejection criteria configured. */
return new OutlierDetectionLoadBalancingConfig(~(1<<31), null, null, null, null, null, []);
/* No-op outlier detection config, with all fields unset. */
return new OutlierDetectionLoadBalancingConfig(null, null, null, null, null, null, []);
}
let successRateConfig: Partial<SuccessRateEjectionConfig> | null = null;
/* Success rate ejection is enabled by default, so we only disable it if

View File

@ -727,7 +727,7 @@ export class XdsClient {
if (serviceKind) {
this.adsState[serviceKind].reportStreamError({
code: status.UNAVAILABLE,
details: message,
details: message + ' Node ID=' + this.adsNodeV3!.id,
metadata: new Metadata()
});
resourceNames = this.adsState[serviceKind].getResourceNames();
@ -772,6 +772,7 @@ export class XdsClient {
}
private reportStreamError(status: StatusObject) {
status = {...status, details: status.details + ' Node ID=' + this.adsNodeV3!.id};
this.adsState.eds.reportStreamError(status);
this.adsState.cds.reportStreamError(status);
this.adsState.rds.reportStreamError(status);
@ -789,7 +790,6 @@ export class XdsClient {
trace('Received LRS response');
/* Once we get any response from the server, we assume that the stream is
* in a good state, so we can reset the backoff timer. */
this.lrsBackoff.stop();
this.lrsBackoff.reset();
if (
!this.receivedLrsSettingsForCurrentStream ||

View File

@ -17,6 +17,7 @@
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { isIPv4, isIPv6 } from "net";
import { Locality__Output } from "../generated/envoy/config/core/v3/Locality";
import { ClusterLoadAssignment__Output } from "../generated/envoy/config/endpoint/v3/ClusterLoadAssignment";
import { Any__Output } from "../generated/google/protobuf/Any";
import { BaseXdsStreamState, HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state";
@ -27,6 +28,10 @@ function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
function localitiesEqual(a: Locality__Output, b: Locality__Output) {
return a.region === b.region && a.sub_zone === b.sub_zone && a.zone === b.zone;
}
export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output> implements XdsStreamState<ClusterLoadAssignment__Output> {
protected getResourceName(resource: ClusterLoadAssignment__Output): string {
return resource.cluster_name;
@ -44,7 +49,17 @@ export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output>
* @param message
*/
public validateResponse(message: ClusterLoadAssignment__Output) {
const seenLocalities: {locality: Locality__Output, priority: number}[] = [];
for (const endpoint of message.endpoints) {
if (!endpoint.locality) {
return false;
}
for (const {locality, priority} of seenLocalities) {
if (localitiesEqual(endpoint.locality, locality) && endpoint.priority === priority) {
return false;
}
}
seenLocalities.push({locality: endpoint.locality, priority: endpoint.priority});
for (const lb of endpoint.lb_endpoints) {
const socketAddress = lb.endpoint?.address?.socket_address;
if (!socketAddress) {

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.6.2",
"version": "1.6.8",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
@ -58,7 +58,7 @@
"generate-test-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --include-dirs test/fixtures/ -O test/generated/ --grpcLib ../../src/index test_service.proto"
},
"dependencies": {
"@grpc/proto-loader": "^0.6.4",
"@grpc/proto-loader": "^0.7.0",
"@types/node": ">=12.12.47"
},
"files": [

View File

@ -100,6 +100,7 @@ export class BackoffTimeout {
}
private runTimer(delay: number) {
clearTimeout(this.timerId);
this.timerId = setTimeout(() => {
this.callback();
this.running = false;

View File

@ -76,9 +76,11 @@ export type ClientDuplexStream<
* error is not necessarily a problem in gRPC itself.
* @param status
*/
export function callErrorFromStatus(status: StatusObject): ServiceError {
export function callErrorFromStatus(status: StatusObject, callerStack: string): ServiceError {
const message = `${status.code} ${Status[status.code]}: ${status.details}`;
return Object.assign(new Error(message), status);
const error = new Error(message);
const stack = `${error.stack}\nfor call at\n${callerStack}`;
return Object.assign(new Error(message), status, {stack});
}
export class ClientUnaryCallImpl

View File

@ -20,6 +20,7 @@ import {
Call,
Http2CallStream,
CallStreamOptions,
StatusObject,
} from './call-stream';
import { ChannelCredentials } from './channel-credentials';
import { ChannelOptions } from './channel-options';
@ -170,6 +171,14 @@ export class ChannelImplementation implements Channel {
*/
private callRefTimer: NodeJS.Timer;
private configSelector: ConfigSelector | null = null;
/**
* This is the error from the name resolver if it failed most recently. It
* is only used to end calls that start while there is no config selector
* and the name resolver is in backoff, so it should be nulled if
* configSelector becomes set or the channel state becomes anything other
* than TRANSIENT_FAILURE.
*/
private currentResolutionError: StatusObject | null = null;
// Channelz info
private readonly channelzEnabled: boolean = true;
@ -219,16 +228,9 @@ export class ChannelImplementation implements Channel {
}
this.channelzTrace = new ChannelzTrace();
this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo(), this.channelzEnabled);
if (this.channelzEnabled) {
this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo());
this.channelzTrace.addTrace('CT_INFO', 'Channel created');
} else {
// Dummy channelz ref that will never be used
this.channelzRef = {
kind: 'channel',
id: -1,
name: ''
};
}
if (this.options['grpc.default_authority']) {
@ -297,6 +299,7 @@ export class ChannelImplementation implements Channel {
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
}
this.configSelector = configSelector;
this.currentResolutionError = null;
/* We process the queue asynchronously to ensure that the corresponding
* load balancer update has completed. */
process.nextTick(() => {
@ -316,6 +319,9 @@ export class ChannelImplementation implements Channel {
if (this.configSelectionQueue.length > 0) {
this.trace('Name resolution failed with calls queued for config selection');
}
if (this.configSelector === null) {
this.currentResolutionError = status;
}
const localQueue = this.configSelectionQueue;
this.configSelectionQueue = [];
this.callRefTimerUnref();
@ -598,6 +604,9 @@ export class ChannelImplementation implements Channel {
watcherObject.callback();
}
}
if (newState !== ConnectivityState.TRANSIENT_FAILURE) {
this.currentResolutionError = null;
}
}
private tryGetConfig(stream: Http2CallStream, metadata: Metadata) {
@ -612,11 +621,15 @@ export class ChannelImplementation implements Channel {
* ResolvingLoadBalancer may be idle and if so it needs to be kicked
* because it now has a pending request. */
this.resolvingLoadBalancer.exitIdle();
this.configSelectionQueue.push({
callStream: stream,
callMetadata: metadata,
});
this.callRefTimerRef();
if (this.currentResolutionError && !metadata.getOptions().waitForReady) {
stream.cancelWithStatus(this.currentResolutionError.code, this.currentResolutionError.details);
} else {
this.configSelectionQueue.push({
callStream: stream,
callMetadata: metadata,
});
this.callRefTimerRef();
}
} else {
const callConfig = this.configSelector(stream.getMethod(), metadata);
if (callConfig.status === Status.OK) {

View File

@ -347,31 +347,39 @@ const subchannels: (SubchannelEntry | undefined)[] = [];
const servers: (ServerEntry | undefined)[] = [];
const sockets: (SocketEntry | undefined)[] = [];
export function registerChannelzChannel(name: string, getInfo: () => ChannelInfo): ChannelRef {
export function registerChannelzChannel(name: string, getInfo: () => ChannelInfo, channelzEnabled: boolean): ChannelRef {
const id = getNextId();
const ref: ChannelRef = {id, name, kind: 'channel'};
channels[id] = { ref, getInfo };
if (channelzEnabled) {
channels[id] = { ref, getInfo };
}
return ref;
}
export function registerChannelzSubchannel(name: string, getInfo:() => SubchannelInfo): SubchannelRef {
export function registerChannelzSubchannel(name: string, getInfo:() => SubchannelInfo, channelzEnabled: boolean): SubchannelRef {
const id = getNextId();
const ref: SubchannelRef = {id, name, kind: 'subchannel'};
subchannels[id] = { ref, getInfo };
if (channelzEnabled) {
subchannels[id] = { ref, getInfo };
}
return ref;
}
export function registerChannelzServer(getInfo: () => ServerInfo): ServerRef {
export function registerChannelzServer(getInfo: () => ServerInfo, channelzEnabled: boolean): ServerRef {
const id = getNextId();
const ref: ServerRef = {id, kind: 'server'};
servers[id] = { ref, getInfo };
if (channelzEnabled) {
servers[id] = { ref, getInfo };
}
return ref;
}
export function registerChannelzSocket(name: string, getInfo: () => SocketInfo): SocketRef {
export function registerChannelzSocket(name: string, getInfo: () => SocketInfo, channelzEnabled: boolean): SocketRef {
const id = getNextId();
const ref: SocketRef = {id, name, kind: 'socket'};
sockets[id] = { ref, getInfo};
if (channelzEnabled) {
sockets[id] = { ref, getInfo};
}
return ref;
}

View File

@ -321,6 +321,7 @@ export class Client {
}
let responseMessage: ResponseType | null = null;
let receivedStatus = false;
const callerStack = (new Error().stack!).split('\n').slice(1).join('\n');
call.start(callProperties.metadata, {
onReceiveMetadata: (metadata) => {
emitter.emit('metadata', metadata);
@ -338,9 +339,17 @@ export class Client {
}
receivedStatus = true;
if (status.code === Status.OK) {
callProperties.callback!(null, responseMessage!);
if (responseMessage === null) {
callProperties.callback!(callErrorFromStatus({
code: Status.INTERNAL,
details: 'No message received',
metadata: status.metadata
}, callerStack));
} else {
callProperties.callback!(null, responseMessage);
}
} else {
callProperties.callback!(callErrorFromStatus(status));
callProperties.callback!(callErrorFromStatus(status, callerStack));
}
emitter.emit('status', status);
},
@ -438,6 +447,7 @@ export class Client {
}
let responseMessage: ResponseType | null = null;
let receivedStatus = false;
const callerStack = (new Error().stack!).split('\n').slice(1).join('\n');
call.start(callProperties.metadata, {
onReceiveMetadata: (metadata) => {
emitter.emit('metadata', metadata);
@ -455,9 +465,17 @@ export class Client {
}
receivedStatus = true;
if (status.code === Status.OK) {
callProperties.callback!(null, responseMessage!);
if (responseMessage === null) {
callProperties.callback!(callErrorFromStatus({
code: Status.INTERNAL,
details: 'No message received',
metadata: status.metadata
}, callerStack));
} else {
callProperties.callback!(null, responseMessage);
}
} else {
callProperties.callback!(callErrorFromStatus(status));
callProperties.callback!(callErrorFromStatus(status, callerStack));
}
emitter.emit('status', status);
},
@ -559,6 +577,7 @@ export class Client {
call.setCredentials(callProperties.callOptions.credentials);
}
let receivedStatus = false;
const callerStack = (new Error().stack!).split('\n').slice(1).join('\n');
call.start(callProperties.metadata, {
onReceiveMetadata(metadata: Metadata) {
stream.emit('metadata', metadata);
@ -574,7 +593,7 @@ export class Client {
receivedStatus = true;
stream.push(null);
if (status.code !== Status.OK) {
stream.emit('error', callErrorFromStatus(status));
stream.emit('error', callErrorFromStatus(status, callerStack));
}
stream.emit('status', status);
},
@ -656,6 +675,7 @@ export class Client {
call.setCredentials(callProperties.callOptions.credentials);
}
let receivedStatus = false;
const callerStack = (new Error().stack!).split('\n').slice(1).join('\n');
call.start(callProperties.metadata, {
onReceiveMetadata(metadata: Metadata) {
stream.emit('metadata', metadata);
@ -670,7 +690,7 @@ export class Client {
receivedStatus = true;
stream.push(null);
if (status.code !== Status.OK) {
stream.emit('error', callErrorFromStatus(status));
stream.emit('error', callErrorFromStatus(status, callerStack));
}
stream.emit('status', status);
},

View File

@ -18,7 +18,7 @@
import { ChannelOptions, connectivityState, StatusObject } from ".";
import { Call } from "./call-stream";
import { ConnectivityState } from "./connectivity-state";
import { Status } from "./constants";
import { LogVerbosity, Status } from "./constants";
import { durationToMs, isDuration, msToDuration } from "./duration";
import { ChannelControlHelper, createChildChannelControlHelper, registerLoadBalancerType } from "./experimental";
import { BaseFilter, Filter, FilterFactory } from "./filter";
@ -28,7 +28,13 @@ import { PickArgs, Picker, PickResult, PickResultType, QueuePicker, UnavailableP
import { Subchannel } from "./subchannel";
import { SubchannelAddress, subchannelAddressToString } from "./subchannel-address";
import { BaseSubchannelWrapper, ConnectivityStateListener, SubchannelInterface } from "./subchannel-interface";
import * as logging from './logging';
const TRACER_NAME = 'outlier_detection';
function trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
const TYPE_NAME = 'outlier_detection';
@ -193,12 +199,13 @@ export class OutlierDetectionLoadBalancingConfig implements LoadBalancingConfig
}
class OutlierDetectionSubchannelWrapper extends BaseSubchannelWrapper implements SubchannelInterface {
private childSubchannelState: ConnectivityState = ConnectivityState.IDLE;
private childSubchannelState: ConnectivityState;
private stateListeners: ConnectivityStateListener[] = [];
private ejected: boolean = false;
private refCount: number = 0;
constructor(childSubchannel: SubchannelInterface, private mapEntry?: MapEntry) {
super(childSubchannel);
this.childSubchannelState = childSubchannel.getConnectivityState();
childSubchannel.addConnectivityStateListener((subchannel, previousState, newState) => {
this.childSubchannelState = newState;
if (!this.ejected) {
@ -209,6 +216,14 @@ class OutlierDetectionSubchannelWrapper extends BaseSubchannelWrapper implements
});
}
getConnectivityState(): connectivityState {
if (this.ejected) {
return ConnectivityState.TRANSIENT_FAILURE;
} else {
return this.childSubchannelState;
}
}
/**
* Add a listener function to be called whenever the wrapper's
* connectivity state changes.
@ -334,20 +349,27 @@ class OutlierDetectionCounterFilterFactory implements FilterFactory<OutlierDetec
}
class OutlierDetectionPicker implements Picker {
constructor(private wrappedPicker: Picker) {}
constructor(private wrappedPicker: Picker, private countCalls: boolean) {}
pick(pickArgs: PickArgs): PickResult {
const wrappedPick = this.wrappedPicker.pick(pickArgs);
if (wrappedPick.pickResultType === PickResultType.COMPLETE) {
const subchannelWrapper = wrappedPick.subchannel as OutlierDetectionSubchannelWrapper;
const mapEntry = subchannelWrapper.getMapEntry();
if (mapEntry) {
const extraFilterFactories = [...wrappedPick.extraFilterFactories];
if (this.countCalls) {
extraFilterFactories.push(new OutlierDetectionCounterFilterFactory(mapEntry.counter));
}
return {
...wrappedPick,
subchannel: subchannelWrapper.getWrappedSubchannel(),
extraFilterFactories: [...wrappedPick.extraFilterFactories, new OutlierDetectionCounterFilterFactory(mapEntry.counter)]
extraFilterFactories: extraFilterFactories
};
} else {
return wrappedPick;
return {
...wrappedPick,
subchannel: subchannelWrapper.getWrappedSubchannel()
}
}
} else {
return wrappedPick;
@ -361,6 +383,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
private addressMap: Map<string, MapEntry> = new Map<string, MapEntry>();
private latestConfig: OutlierDetectionLoadBalancingConfig | null = null;
private ejectionTimer: NodeJS.Timer;
private timerStartTime: Date | null = null;
constructor(channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
@ -368,12 +391,16 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
const originalSubchannel = channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs);
const mapEntry = this.addressMap.get(subchannelAddressToString(subchannelAddress));
const subchannelWrapper = new OutlierDetectionSubchannelWrapper(originalSubchannel, mapEntry);
if (mapEntry?.currentEjectionTimestamp !== null) {
// If the address is ejected, propagate that to the new subchannel wrapper
subchannelWrapper.eject();
}
mapEntry?.subchannelWrappers.push(subchannelWrapper);
return subchannelWrapper;
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
if (connectivityState === ConnectivityState.READY) {
channelControlHelper.updateState(connectivityState, new OutlierDetectionPicker(picker));
channelControlHelper.updateState(connectivityState, new OutlierDetectionPicker(picker, this.isCountingEnabled()));
} else {
channelControlHelper.updateState(connectivityState, picker);
}
@ -383,6 +410,12 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
clearInterval(this.ejectionTimer);
}
private isCountingEnabled(): boolean {
return this.latestConfig !== null &&
(this.latestConfig.getSuccessRateEjectionConfig() !== null ||
this.latestConfig.getFailurePercentageEjectionConfig() !== null);
}
private getCurrentEjectionPercent() {
let ejectionCount = 0;
for (const mapEntry of this.addressMap.values()) {
@ -401,6 +434,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
if (!successRateConfig) {
return;
}
trace('Running success rate check');
// Step 1
const targetRequestVolume = successRateConfig.request_volume;
let addresesWithTargetVolume = 0;
@ -413,22 +447,25 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
successRates.push(successes/(successes + failures));
}
}
trace('Found ' + addresesWithTargetVolume + ' success rate candidates; currentEjectionPercent=' + this.getCurrentEjectionPercent() + ' successRates=[' + successRates + ']');
if (addresesWithTargetVolume < successRateConfig.minimum_hosts) {
return;
}
// Step 2
const successRateMean = successRates.reduce((a, b) => a + b);
let successRateVariance = 0;
const successRateMean = successRates.reduce((a, b) => a + b) / successRates.length;
let successRateDeviationSum = 0;
for (const rate of successRates) {
const deviation = rate - successRateMean;
successRateVariance += deviation * deviation;
successRateDeviationSum += deviation * deviation;
}
const successRateVariance = successRateDeviationSum / successRates.length;
const successRateStdev = Math.sqrt(successRateVariance);
const ejectionThreshold = successRateMean - successRateStdev * (successRateConfig.stdev_factor / 1000);
trace('stdev=' + successRateStdev + ' ejectionThreshold=' + ejectionThreshold);
// Step 3
for (const mapEntry of this.addressMap.values()) {
for (const [address, mapEntry] of this.addressMap.entries()) {
// Step 3.i
if (this.getCurrentEjectionPercent() > this.latestConfig.getMaxEjectionPercent()) {
break;
@ -441,9 +478,12 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
}
// Step 3.iii
const successRate = successes / (successes + failures);
trace('Checking candidate ' + address + ' successRate=' + successRate);
if (successRate < ejectionThreshold) {
const randomNumber = Math.random() * 100;
trace('Candidate ' + address + ' randomNumber=' + randomNumber + ' enforcement_percentage=' + successRateConfig.enforcement_percentage);
if (randomNumber < successRateConfig.enforcement_percentage) {
trace('Ejecting candidate ' + address);
this.eject(mapEntry, ejectionTimestamp);
}
}
@ -458,13 +498,14 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
if (!failurePercentageConfig) {
return;
}
trace('Running failure percentage check. threshold=' + failurePercentageConfig.threshold + ' request volume threshold=' + failurePercentageConfig.request_volume);
// Step 1
if (this.addressMap.size < failurePercentageConfig.minimum_hosts) {
return;
}
// Step 2
for (const mapEntry of this.addressMap.values()) {
for (const [address, mapEntry] of this.addressMap.entries()) {
// Step 2.i
if (this.getCurrentEjectionPercent() > this.latestConfig.getMaxEjectionPercent()) {
break;
@ -472,6 +513,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
// Step 2.ii
const successes = mapEntry.counter.getLastSuccesses();
const failures = mapEntry.counter.getLastFailures();
trace('Candidate successes=' + successes + ' failures=' + failures);
if (successes + failures < failurePercentageConfig.request_volume) {
continue;
}
@ -479,7 +521,9 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
const failurePercentage = (failures * 100) / (failures + successes);
if (failurePercentage > failurePercentageConfig.threshold) {
const randomNumber = Math.random() * 100;
trace('Candidate ' + address + ' randomNumber=' + randomNumber + ' enforcement_percentage=' + failurePercentageConfig.enforcement_percentage);
if (randomNumber < failurePercentageConfig.enforcement_percentage) {
trace('Ejecting candidate ' + address);
this.eject(mapEntry, ejectionTimestamp);
}
}
@ -501,21 +545,32 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
}
}
private runChecks() {
const ejectionTimestamp = new Date();
private switchAllBuckets() {
for (const mapEntry of this.addressMap.values()) {
mapEntry.counter.switchBuckets();
}
}
private startTimer(delayMs: number) {
this.ejectionTimer = setTimeout(() => this.runChecks(), delayMs);
}
private runChecks() {
const ejectionTimestamp = new Date();
trace('Ejection timer running');
this.switchAllBuckets();
if (!this.latestConfig) {
return;
}
this.timerStartTime = ejectionTimestamp;
this.startTimer(this.latestConfig.getIntervalMs());
this.runSuccessRateCheck(ejectionTimestamp);
this.runFailurePercentageCheck(ejectionTimestamp);
for (const mapEntry of this.addressMap.values()) {
for (const [address, mapEntry] of this.addressMap.entries()) {
if (mapEntry.currentEjectionTimestamp === null) {
if (mapEntry.ejectionTimeMultiplier > 0) {
mapEntry.ejectionTimeMultiplier -= 1;
@ -526,6 +581,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
const returnTime = new Date(mapEntry.currentEjectionTimestamp.getTime());
returnTime.setMilliseconds(returnTime.getMilliseconds() + Math.min(baseEjectionTimeMs * mapEntry.ejectionTimeMultiplier, Math.max(baseEjectionTimeMs, maxEjectionTimeMs)));
if (returnTime < new Date()) {
trace('Unejecting ' + address);
this.uneject(mapEntry);
}
}
@ -542,6 +598,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
}
for (const address of subchannelAddresses) {
if (!this.addressMap.has(address)) {
trace('Adding map entry for ' + address);
this.addressMap.set(address, {
counter: new CallCounter(),
currentEjectionTimestamp: null,
@ -552,6 +609,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
}
for (const key of this.addressMap.keys()) {
if (!subchannelAddresses.has(key)) {
trace('Removing map entry for ' + key);
this.addressMap.delete(key);
}
}
@ -561,10 +619,28 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
);
this.childBalancer.updateAddressList(addressList, childPolicy, attributes);
if (this.latestConfig === null || this.latestConfig.getIntervalMs() !== lbConfig.getIntervalMs()) {
clearInterval(this.ejectionTimer);
this.ejectionTimer = setInterval(() => this.runChecks(), lbConfig.getIntervalMs());
if (lbConfig.getSuccessRateEjectionConfig() || lbConfig.getFailurePercentageEjectionConfig()) {
if (this.timerStartTime) {
trace('Previous timer existed. Replacing timer');
clearTimeout(this.ejectionTimer);
const remainingDelay = lbConfig.getIntervalMs() - ((new Date()).getTime() - this.timerStartTime.getTime());
this.startTimer(remainingDelay);
} else {
trace('Starting new timer');
this.timerStartTime = new Date();
this.startTimer(lbConfig.getIntervalMs());
this.switchAllBuckets();
}
} else {
trace('Counting disabled. Cancelling timer.');
this.timerStartTime = null;
clearTimeout(this.ejectionTimer);
for (const mapEntry of this.addressMap.values()) {
this.uneject(mapEntry);
mapEntry.ejectionTimeMultiplier = 0;
}
}
this.latestConfig = lbConfig;
}
exitIdle(): void {
@ -574,6 +650,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
this.childBalancer.resetBackoff();
}
destroy(): void {
clearTimeout(this.ejectionTimer);
this.childBalancer.destroy();
}
getTypeName(): string {

View File

@ -184,8 +184,10 @@ export class PickFirstLoadBalancer implements LoadBalancer {
) {
/* If all of the subchannels are IDLE we should go back to a
* basic IDLE state where there is no subchannel list to avoid
* holding unused resources */
this.resetSubchannelList();
* holding unused resources. We do not reset triedAllSubchannels
* because that is a reminder to request reresolution the next time
* this LB policy needs to connect. */
this.resetSubchannelList(false);
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
return;
}
@ -337,7 +339,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.channelControlHelper.updateState(newState, picker);
}
private resetSubchannelList() {
private resetSubchannelList(resetTriedAllSubchannels = true) {
for (const subchannel of this.subchannels) {
subchannel.removeConnectivityStateListener(this.subchannelStateListener);
subchannel.unref();
@ -352,7 +354,9 @@ export class PickFirstLoadBalancer implements LoadBalancer {
[ConnectivityState.TRANSIENT_FAILURE]: 0,
};
this.subchannels = [];
this.triedAllSubchannels = false;
if (resetTriedAllSubchannels) {
this.triedAllSubchannels = false;
}
}
/**
@ -425,6 +429,12 @@ export class PickFirstLoadBalancer implements LoadBalancer {
}
exitIdle() {
if (
this.currentState === ConnectivityState.IDLE ||
this.triedAllSubchannels
) {
this.channelControlHelper.requestReresolution();
}
for (const subchannel of this.subchannels) {
subchannel.startConnecting();
}
@ -433,12 +443,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.connectToAddressList();
}
}
if (
this.currentState === ConnectivityState.IDLE ||
this.triedAllSubchannels
) {
this.channelControlHelper.requestReresolution();
}
}
resetBackoff() {

View File

@ -158,7 +158,6 @@ class DnsResolver implements Resolver {
if (this.ipResult !== null) {
trace('Returning IP address for target ' + uriToString(this.target));
setImmediate(() => {
this.backoff.reset();
this.listener.onSuccessfulResolution(
this.ipResult!,
null,
@ -167,6 +166,8 @@ class DnsResolver implements Resolver {
{}
);
});
this.backoff.stop();
this.backoff.reset();
return;
}
if (this.dnsHostname === null) {
@ -178,7 +179,11 @@ class DnsResolver implements Resolver {
metadata: new Metadata(),
});
});
this.stopNextResolutionTimer();
} else {
if (this.pendingLookupPromise !== null) {
return;
}
trace('Looking up DNS hostname ' + this.dnsHostname);
/* We clear out latestLookupResult here to ensure that it contains the
* latest result since the last time we started resolving. That way, the
@ -299,6 +304,7 @@ class DnsResolver implements Resolver {
}
private startNextResolutionTimer() {
clearTimeout(this.nextResolutionTimer);
this.nextResolutionTimer = setTimeout(() => {
this.stopNextResolutionTimer();
if (this.continueResolving) {
@ -314,9 +320,12 @@ class DnsResolver implements Resolver {
}
private startResolutionWithBackoff() {
if (this.pendingLookupPromise === null) {
this.continueResolving = false;
this.startResolution();
this.backoff.runOnce();
this.startNextResolutionTimer();
}
}
updateResolution() {

View File

@ -268,6 +268,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
if (this.currentState === ConnectivityState.IDLE) {
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
}
this.backoffTimeout.runOnce();
}
private updateState(connectivityState: ConnectivityState, picker: Picker) {
@ -294,19 +295,17 @@ export class ResolvingLoadBalancer implements LoadBalancer {
);
this.onFailedResolution(error);
}
this.backoffTimeout.runOnce();
}
exitIdle() {
this.childLoadBalancer.exitIdle();
if (this.currentState === ConnectivityState.IDLE) {
if (this.currentState === ConnectivityState.IDLE || this.currentState === ConnectivityState.TRANSIENT_FAILURE) {
if (this.backoffTimeout.isRunning()) {
this.continueResolving = true;
} else {
this.updateResolution();
}
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
}
this.childLoadBalancer.exitIdle();
}
updateAddressList(

View File

@ -161,17 +161,11 @@ export class Server {
if (this.options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}
this.channelzRef = registerChannelzServer(() => this.getChannelzInfo(), this.channelzEnabled);
if (this.channelzEnabled) {
this.channelzRef = registerChannelzServer(() => this.getChannelzInfo());
this.channelzTrace.addTrace('CT_INFO', 'Server created');
this.trace('Server constructed');
} else {
// Dummy channelz ref that will never be used
this.channelzRef = {
kind: 'server',
id: -1
};
}
this.trace('Server constructed');
}
private getChannelzInfo(): ServerInfo {
@ -431,34 +425,28 @@ export class Server {
}
}
let channelzRef: SocketRef;
if (this.channelzEnabled) {
channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => {
return {
localAddress: boundSubchannelAddress,
remoteAddress: null,
security: null,
remoteName: null,
streamsStarted: 0,
streamsSucceeded: 0,
streamsFailed: 0,
messagesSent: 0,
messagesReceived: 0,
keepAlivesSent: 0,
lastLocalStreamCreatedTimestamp: null,
lastRemoteStreamCreatedTimestamp: null,
lastMessageSentTimestamp: null,
lastMessageReceivedTimestamp: null,
localFlowControlWindow: null,
remoteFlowControlWindow: null
};
});
this.listenerChildrenTracker.refChild(channelzRef);
} else {
channelzRef = {
kind: 'socket',
id: -1,
name: ''
channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => {
return {
localAddress: boundSubchannelAddress,
remoteAddress: null,
security: null,
remoteName: null,
streamsStarted: 0,
streamsSucceeded: 0,
streamsFailed: 0,
messagesSent: 0,
messagesReceived: 0,
keepAlivesSent: 0,
lastLocalStreamCreatedTimestamp: null,
lastRemoteStreamCreatedTimestamp: null,
lastMessageSentTimestamp: null,
lastMessageReceivedTimestamp: null,
localFlowControlWindow: null,
remoteFlowControlWindow: null
};
}, this.channelzEnabled);
if (this.channelzEnabled) {
this.listenerChildrenTracker.refChild(channelzRef);
}
this.http2ServerList.push({server: http2Server, channelzRef: channelzRef});
this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress));
@ -509,34 +497,28 @@ export class Server {
port: boundAddress.port
};
let channelzRef: SocketRef;
if (this.channelzEnabled) {
channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => {
return {
localAddress: boundSubchannelAddress,
remoteAddress: null,
security: null,
remoteName: null,
streamsStarted: 0,
streamsSucceeded: 0,
streamsFailed: 0,
messagesSent: 0,
messagesReceived: 0,
keepAlivesSent: 0,
lastLocalStreamCreatedTimestamp: null,
lastRemoteStreamCreatedTimestamp: null,
lastMessageSentTimestamp: null,
lastMessageReceivedTimestamp: null,
localFlowControlWindow: null,
remoteFlowControlWindow: null
};
});
this.listenerChildrenTracker.refChild(channelzRef);
} else {
channelzRef = {
kind: 'socket',
id: -1,
name: ''
channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => {
return {
localAddress: boundSubchannelAddress,
remoteAddress: null,
security: null,
remoteName: null,
streamsStarted: 0,
streamsSucceeded: 0,
streamsFailed: 0,
messagesSent: 0,
messagesReceived: 0,
keepAlivesSent: 0,
lastLocalStreamCreatedTimestamp: null,
lastRemoteStreamCreatedTimestamp: null,
lastMessageSentTimestamp: null,
lastMessageReceivedTimestamp: null,
localFlowControlWindow: null,
remoteFlowControlWindow: null
};
}, this.channelzEnabled);
if (this.channelzEnabled) {
this.listenerChildrenTracker.refChild(channelzRef);
}
this.http2ServerList.push({server: http2Server, channelzRef: channelzRef});
this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress));
@ -893,15 +875,7 @@ export class Server {
}
let channelzRef: SocketRef;
if (this.channelzEnabled) {
channelzRef = registerChannelzSocket(session.socket.remoteAddress ?? 'unknown', this.getChannelzSessionInfoGetter(session));
} else {
channelzRef = {
kind: 'socket',
id: -1,
name: ''
}
}
channelzRef = registerChannelzSocket(session.socket.remoteAddress ?? 'unknown', this.getChannelzSessionInfoGetter(session), this.channelzEnabled);
const channelzSessionInfo: ChannelzSessionInfo = {
ref: channelzRef,

View File

@ -49,10 +49,8 @@ export class SubchannelPool {
/**
* A pool of subchannels use for making connections. Subchannels with the
* exact same parameters will be reused.
* @param global If true, this is the global subchannel pool. Otherwise, it
* is the pool for a single channel.
*/
constructor(private global: boolean) {}
constructor() {}
/**
* Unrefs all unused subchannels and cancels the cleanup task if all
@ -95,7 +93,7 @@ export class SubchannelPool {
* Ensures that the cleanup task is spawned.
*/
ensureCleanupTask(): void {
if (this.global && this.cleanupTimer === null) {
if (this.cleanupTimer === null) {
this.cleanupTimer = setInterval(() => {
this.unrefUnusedSubchannels();
}, REF_CHECK_INTERVAL);
@ -156,14 +154,12 @@ export class SubchannelPool {
channelCredentials,
subchannel,
});
if (this.global) {
subchannel.ref();
}
subchannel.ref();
return subchannel;
}
}
const globalSubchannelPool = new SubchannelPool(true);
const globalSubchannelPool = new SubchannelPool();
/**
* Get either the global subchannel pool, or a new subchannel pool.
@ -173,6 +169,6 @@ export function getSubchannelPool(global: boolean): SubchannelPool {
if (global) {
return globalSubchannelPool;
} else {
return new SubchannelPool(false);
return new SubchannelPool();
}
}

View File

@ -108,7 +108,7 @@ export class Subchannel {
* socket disconnects. Used for ending active calls with an UNAVAILABLE
* status.
*/
private disconnectListeners: Array<() => void> = [];
private disconnectListeners: Set<() => void> = new Set();
private backoffTimeout: BackoffTimeout;
@ -227,16 +227,9 @@ export class Subchannel {
this.channelzEnabled = false;
}
this.channelzTrace = new ChannelzTrace();
this.channelzRef = registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled);
if (this.channelzEnabled) {
this.channelzRef = registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo());
this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
} else {
// Dummy channelz ref that will never be used
this.channelzRef = {
kind: 'subchannel',
id: -1,
name: ''
};
}
this.trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2));
}
@ -328,6 +321,10 @@ export class Subchannel {
logging.trace(LogVerbosity.DEBUG, 'subchannel_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}
private keepaliveTrace(text: string): void {
logging.trace(LogVerbosity.DEBUG, 'keepalive', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}
private handleBackoffTimer() {
if (this.continueConnecting) {
this.transitionToState(
@ -358,18 +355,15 @@ export class Subchannel {
if (this.channelzEnabled) {
this.keepalivesSent += 1;
}
logging.trace(
LogVerbosity.DEBUG,
'keepalive',
'(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' +
'Sending ping'
);
this.keepaliveTrace('Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms');
this.keepaliveTimeoutId = setTimeout(() => {
this.transitionToState([ConnectivityState.READY], ConnectivityState.IDLE);
this.keepaliveTrace('Ping timeout passed without response');
this.handleDisconnect();
}, this.keepaliveTimeoutMs);
this.keepaliveTimeoutId.unref?.();
this.session!.ping(
(err: Error | null, duration: number, payload: Buffer) => {
this.keepaliveTrace('Received ping response');
clearTimeout(this.keepaliveTimeoutId);
}
);
@ -384,6 +378,11 @@ export class Subchannel {
* sending pings should also involve some network activity. */
}
/**
* Stop keepalive pings when terminating a connection. This discards the
* outstanding ping timeout, so it should not be called if the same
* connection will still be used.
*/
private stopKeepalivePings() {
clearInterval(this.keepaliveIntervalId);
clearTimeout(this.keepaliveTimeoutId);
@ -407,6 +406,12 @@ export class Subchannel {
connectionOptions.maxSessionMemory = this.options[
'grpc-node.max_session_memory'
];
} else {
/* By default, set a very large max session memory limit, to effectively
* disable enforcement of the limit. Some testing indicates that Node's
* behavior degrades badly when this limit is reached, so we solve that
* by disabling the check entirely. */
connectionOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
}
let addressScheme = 'http://';
if ('secureContext' in connectionOptions) {
@ -484,8 +489,8 @@ export class Subchannel {
connectionOptions
);
this.session = session;
this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!, this.channelzEnabled);
if (this.channelzEnabled) {
this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!);
this.childrenTracker.refChild(this.channelzSocketRef);
}
session.unref();
@ -637,6 +642,15 @@ export class Subchannel {
);
}
private handleDisconnect() {
this.transitionToState(
[ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE);
for (const listener of this.disconnectListeners.values()) {
listener();
}
}
/**
* Initiate a state transition from any element of oldStates to the new
* state. If the current connectivityState is not in oldStates, do nothing.
@ -667,12 +681,7 @@ export class Subchannel {
const session = this.session!;
session.socket.once('close', () => {
if (this.session === session) {
this.transitionToState(
[ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE);
for (const listener of this.disconnectListeners) {
listener();
}
this.handleDisconnect();
}
});
if (this.keepaliveWithoutCalls) {
@ -732,7 +741,7 @@ export class Subchannel {
}
this.transitionToState(
[ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE
ConnectivityState.IDLE
);
if (this.channelzEnabled) {
unregisterChannelzRef(this.channelzRef);
@ -773,7 +782,7 @@ export class Subchannel {
}
this.backoffTimeout.unref();
if (!this.keepaliveWithoutCalls) {
this.stopKeepalivePings();
clearInterval(this.keepaliveIntervalId);
}
this.checkBothRefcounts();
}
@ -962,14 +971,11 @@ export class Subchannel {
}
addDisconnectListener(listener: () => void) {
this.disconnectListeners.push(listener);
this.disconnectListeners.add(listener);
}
removeDisconnectListener(listener: () => void) {
const listenerIndex = this.disconnectListeners.indexOf(listener);
if (listenerIndex > -1) {
this.disconnectListeners.splice(listenerIndex, 1);
}
this.disconnectListeners.delete(listener);
}
/**

View File

@ -25,6 +25,7 @@ message Request {
message Response {
int32 count = 1;
string message = 2;
}
service TestService {

View File

@ -3,8 +3,10 @@
export interface Response {
'count'?: (number);
'message'?: (string);
}
export interface Response__Output {
'count': (number);
'message': (string);
}

View File

@ -11,28 +11,28 @@ export interface TestServiceClient extends grpc.Client {
bidiStream(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_Request, _Response__Output>;
bidiStream(options?: grpc.CallOptions): grpc.ClientDuplexStream<_Request, _Response__Output>;
ClientStream(metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientWritableStream<_Request>;
ClientStream(metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientWritableStream<_Request>;
ClientStream(options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientWritableStream<_Request>;
ClientStream(callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientWritableStream<_Request>;
clientStream(metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientWritableStream<_Request>;
clientStream(metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientWritableStream<_Request>;
clientStream(options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientWritableStream<_Request>;
clientStream(callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientWritableStream<_Request>;
ClientStream(metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_Response__Output>): grpc.ClientWritableStream<_Request>;
ClientStream(metadata: grpc.Metadata, callback: grpc.requestCallback<_Response__Output>): grpc.ClientWritableStream<_Request>;
ClientStream(options: grpc.CallOptions, callback: grpc.requestCallback<_Response__Output>): grpc.ClientWritableStream<_Request>;
ClientStream(callback: grpc.requestCallback<_Response__Output>): grpc.ClientWritableStream<_Request>;
clientStream(metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_Response__Output>): grpc.ClientWritableStream<_Request>;
clientStream(metadata: grpc.Metadata, callback: grpc.requestCallback<_Response__Output>): grpc.ClientWritableStream<_Request>;
clientStream(options: grpc.CallOptions, callback: grpc.requestCallback<_Response__Output>): grpc.ClientWritableStream<_Request>;
clientStream(callback: grpc.requestCallback<_Response__Output>): grpc.ClientWritableStream<_Request>;
ServerStream(argument: _Request, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_Response__Output>;
ServerStream(argument: _Request, options?: grpc.CallOptions): grpc.ClientReadableStream<_Response__Output>;
serverStream(argument: _Request, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_Response__Output>;
serverStream(argument: _Request, options?: grpc.CallOptions): grpc.ClientReadableStream<_Response__Output>;
Unary(argument: _Request, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientUnaryCall;
Unary(argument: _Request, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientUnaryCall;
Unary(argument: _Request, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientUnaryCall;
Unary(argument: _Request, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientUnaryCall;
unary(argument: _Request, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientUnaryCall;
unary(argument: _Request, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientUnaryCall;
unary(argument: _Request, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientUnaryCall;
unary(argument: _Request, callback: (error?: grpc.ServiceError, result?: _Response__Output) => void): grpc.ClientUnaryCall;
Unary(argument: _Request, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_Response__Output>): grpc.ClientUnaryCall;
Unary(argument: _Request, metadata: grpc.Metadata, callback: grpc.requestCallback<_Response__Output>): grpc.ClientUnaryCall;
Unary(argument: _Request, options: grpc.CallOptions, callback: grpc.requestCallback<_Response__Output>): grpc.ClientUnaryCall;
Unary(argument: _Request, callback: grpc.requestCallback<_Response__Output>): grpc.ClientUnaryCall;
unary(argument: _Request, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_Response__Output>): grpc.ClientUnaryCall;
unary(argument: _Request, metadata: grpc.Metadata, callback: grpc.requestCallback<_Response__Output>): grpc.ClientUnaryCall;
unary(argument: _Request, options: grpc.CallOptions, callback: grpc.requestCallback<_Response__Output>): grpc.ClientUnaryCall;
unary(argument: _Request, callback: grpc.requestCallback<_Response__Output>): grpc.ClientUnaryCall;
}

View File

@ -92,4 +92,28 @@ describe('Client without a server', () => {
});
});
});
});
describe('Client with a nonexistent target domain', () => {
let client: Client;
before(() => {
// DNS name that does not exist per RFC 6761 section 6.4
client = new Client('host.invalid', clientInsecureCreds);
});
after(() => {
client.close();
});
it('should fail multiple calls', function(done) {
this.timeout(5000);
// Regression test for https://github.com/grpc/grpc-node/issues/1411
client.makeUnaryRequest('/service/method', x => x, x => x, Buffer.from([]), (error, value) => {
assert(error);
assert.strictEqual(error?.code, grpc.status.UNAVAILABLE);
client.makeUnaryRequest('/service/method', x => x, x => x, Buffer.from([]), (error, value) => {
assert(error);
assert.strictEqual(error?.code, grpc.status.UNAVAILABLE);
done();
});
});
});
});

View File

@ -356,6 +356,70 @@ describe('Name Resolver', () => {
const resolver2 = resolverManager.createResolver(target2, listener, {});
resolver2.updateResolution();
});
it('should not keep repeating successful resolutions', done => {
const target = resolverManager.mapUriDefaultScheme(parseUri('localhost')!)!;
let resultCount = 0;
const resolver = resolverManager.createResolver(target, {
onSuccessfulResolution: (
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
assert(
addressList.some(
addr =>
isTcpSubchannelAddress(addr) &&
addr.host === '127.0.0.1' &&
addr.port === 443
)
);
assert(
addressList.some(
addr =>
isTcpSubchannelAddress(addr) &&
addr.host === '::1' &&
addr.port === 443
)
);
resultCount += 1;
if (resultCount === 1) {
process.nextTick(() => resolver.updateResolution());
}
},
onError: (error: StatusObject) => {
assert.ifError(error);
},
}, {'grpc.dns_min_time_between_resolutions_ms': 2000});
resolver.updateResolution();
setTimeout(() => {
assert.strictEqual(resultCount, 2, `resultCount ${resultCount} !== 2`);
done();
}, 10_000);
}).timeout(15_000);
it('should not keep repeating failed resolutions', done => {
const target = resolverManager.mapUriDefaultScheme(parseUri('host.invalid')!)!;
let resultCount = 0;
const resolver = resolverManager.createResolver(target, {
onSuccessfulResolution: (
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
assert.fail('Resolution succeeded unexpectedly');
},
onError: (error: StatusObject) => {
resultCount += 1;
if (resultCount === 1) {
process.nextTick(() => resolver.updateResolution());
}
},
}, {});
resolver.updateResolution();
setTimeout(() => {
assert.strictEqual(resultCount, 2, `resultCount ${resultCount} !== 2`);
done();
}, 10_000);
}).timeout(15_000);
});
describe('UDS Names', () => {
it('Should handle a relative Unix Domain Socket name', done => {

View File

@ -623,7 +623,7 @@ describe('Generic client and server', () => {
describe('Compressed requests', () => {
const testServiceHandlers: TestServiceHandlers = {
Unary(call, callback) {
callback(null, { count: 500000 });
callback(null, { count: 500000, message: call.request.message });
},
ClientStream(call, callback) {
@ -847,6 +847,20 @@ describe('Compressed requests', () => {
})
});
});
it('Should handle large messages', done => {
let longMessage = '';
for (let i = 0; i < 400000; i++) {
const letter = 'abcdefghijklmnopqrstuvwxyz'[Math.floor(Math.random() * 26)];
longMessage = longMessage + letter.repeat(10);
}
client.unary({message: longMessage}, (err, response) => {
assert.ifError(err);
assert.strictEqual(response?.message, longMessage);
done();
})
})
/* As of Node 16, Writable and Duplex streams validate the encoding
* argument to write, and the flags values we are passing there are not

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/proto-loader",
"version": "0.6.12",
"version": "0.7.0",
"author": "Google Inc.",
"contributors": [
{
@ -48,7 +48,7 @@
"@types/long": "^4.0.1",
"lodash.camelcase": "^4.3.0",
"long": "^4.0.0",
"protobufjs": "^6.10.0",
"protobufjs": "^7.0.0",
"yargs": "^16.2.0"
},
"devDependencies": {

View File

@ -0,0 +1,26 @@
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Config file for Kokoro (in protobuf text format)
# Location of the continuous shell script in repository.
build_file: "grpc-node/packages/grpc-js-xds/scripts/xds_k8s_lb.sh"
timeout_mins: 180
action {
define_artifacts {
regex: "artifacts/**/*sponge_log.xml"
regex: "artifacts/**/*sponge_log.log"
strip_prefix: "artifacts"
}
}

View File

@ -0,0 +1,26 @@
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Config file for Kokoro (in protobuf text format)
# Location of the continuous shell script in repository.
build_file: "grpc-node/packages/grpc-js-xds/scripts/xds_k8s_url_map.sh"
timeout_mins: 180
action {
define_artifacts {
regex: "artifacts/**/*sponge_log.xml"
regex: "artifacts/**/*sponge_log.log"
strip_prefix: "artifacts"
}
}