mirror of https://github.com/grpc/grpc-node.git
345 lines
11 KiB
TypeScript
345 lines
11 KiB
TypeScript
/*
|
|
* Copyright 2022 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
*/
|
|
|
|
import { CallCredentials } from './call-credentials';
|
|
import {
|
|
Call,
|
|
InterceptingListener,
|
|
MessageContext,
|
|
StatusObject,
|
|
} from './call-interface';
|
|
import { SubchannelCall } from './subchannel-call';
|
|
import { ConnectivityState } from './connectivity-state';
|
|
import { LogVerbosity, Status } from './constants';
|
|
import { Deadline, getDeadlineTimeoutString } from './deadline';
|
|
import { InternalChannel } from './internal-channel';
|
|
import { Metadata } from './metadata';
|
|
import { PickResultType } from './picker';
|
|
import { CallConfig } from './resolver';
|
|
import { splitHostPort } from './uri-parser';
|
|
import * as logging from './logging';
|
|
import { restrictControlPlaneStatusCode } from './control-plane-status';
|
|
import * as http2 from 'http2';
|
|
|
|
/* Tracer name passed to logging.trace for every trace line emitted by
 * LoadBalancingCall in this file. */
const TRACER_NAME = 'load_balancing_call';
|
|
|
|
/**
 * How far a call got before it ended, as reported alongside the final status.
 *
 * As used in this file:
 * - `NOT_STARTED`: creating the call on the picked subchannel threw.
 * - `DROP`: the pick result was of type DROP.
 * - `REFUSED`: the child call ended with the HTTP/2 REFUSED_STREAM reset code.
 * - `PROCESSED`: every other way the call ends here.
 */
export type RpcProgress = 'NOT_STARTED' | 'DROP' | 'REFUSED' | 'PROCESSED';
|
|
|
|
/**
 * A call status annotated with how far the RPC progressed before it ended.
 */
export interface StatusObjectWithProgress extends StatusObject {
  /** Progress marker attached to the final status of the call. */
  progress: RpcProgress;
}
|
|
|
|
/**
 * Listener variant whose status callback receives the status together with
 * its RpcProgress marker.
 */
export interface LoadBalancingCallInterceptingListener extends InterceptingListener {
  /** Called with the final status of the call, including its progress. */
  onReceiveStatus(status: StatusObjectWithProgress): void;
}
|
|
|
|
/**
 * A call that asks the channel to pick a subchannel and, once a usable pick
 * is available, creates a child SubchannelCall on it. Reads, a single pending
 * message and a half-close requested before the child exists are remembered
 * and replayed on the child when it is created.
 */
export class LoadBalancingCall implements Call {
  /** The call on the picked subchannel, once one has been created. */
  private child: SubchannelCall | null = null;
  /** True if startRead was requested before the child call existed. */
  private readPending = false;
  /** Message passed to sendMessageWithContext before the child call existed. */
  private pendingMessage: { context: MessageContext; message: Buffer } | null =
    null;
  /** True if halfClose was requested before the child call existed. */
  private pendingHalfClose = false;
  /** Set once a final status has been output; later statuses are ignored. */
  private ended = false;
  /** URL handed to the call credentials as `service_url`. */
  private serviceUrl: string;
  private metadata: Metadata | null = null;
  private listener: LoadBalancingCallInterceptingListener | null = null;
  /** Callback taken from the pick result, invoked with the final status code. */
  private onCallEnded: ((statusCode: Status) => void) | null = null;

  constructor(
    private readonly channel: InternalChannel,
    private readonly callConfig: CallConfig,
    private readonly methodName: string,
    private readonly host: string,
    private readonly credentials: CallCredentials,
    private readonly deadline: Deadline,
    private readonly callNumber: number
  ) {
    const splitPath: string[] = this.methodName.split('/');
    let serviceName = '';
    /* The standard path format is "/{serviceName}/{methodName}", so if we split
     * by '/', the first item should be empty and the second should be the
     * service name */
    if (splitPath.length >= 2) {
      serviceName = splitPath[1];
    }
    const hostname = splitHostPort(this.host)?.host ?? 'localhost';
    /* Currently, call credentials are only allowed on HTTPS connections, so we
     * can assume that the scheme is "https" */
    this.serviceUrl = `https://${hostname}/${serviceName}`;
  }

  /** Emits a debug trace line prefixed with this call's number. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callNumber + '] ' + text
    );
  }

  /**
   * Ends the call with the given status. Only the first invocation has any
   * effect: it marks the call as ended, notifies the listener and then the
   * onCallEnded callback (if one was set).
   */
  private outputStatus(status: StatusObject, progress: RpcProgress) {
    if (!this.ended) {
      this.ended = true;
      this.trace(
        'ended with status: code=' +
          status.code +
          ' details="' +
          status.details +
          '"'
      );
      const finalStatus: StatusObjectWithProgress = { ...status, progress };
      this.listener?.onReceiveStatus(finalStatus);
      this.onCallEnded?.(finalStatus.code);
    }
  }

  /**
   * Asks the channel for a pick and acts on the result: starts the child call
   * (COMPLETE), ends the call (DROP, or TRANSIENT_FAILURE without
   * waitForReady), or queues the call with the channel for a later pick.
   *
   * @throws Error if called before start() has provided the metadata.
   */
  doPick() {
    if (this.ended) {
      return;
    }
    if (!this.metadata) {
      throw new Error('doPick called before start');
    }
    this.trace('Pick called');
    const pickResult = this.channel.doPick(
      this.metadata,
      this.callConfig.pickInformation
    );
    const subchannelString = pickResult.subchannel
      ? '(' +
        pickResult.subchannel.getChannelzRef().id +
        ') ' +
        pickResult.subchannel.getAddress()
      : '' + pickResult.subchannel;
    this.trace(
      'Pick result: ' +
        PickResultType[pickResult.pickResultType] +
        ' subchannel: ' +
        subchannelString +
        ' status: ' +
        pickResult.status?.code +
        ' ' +
        pickResult.status?.details
    );
    switch (pickResult.pickResultType) {
      case PickResultType.COMPLETE:
        this.credentials
          .generateMetadata({ service_url: this.serviceUrl })
          .then(
            credsMetadata => {
              /* Metadata generation is asynchronous, so the call may have
               * ended (e.g. been cancelled) in the meantime. In that case no
               * stream must be created, because nothing would ever cancel
               * it. */
              if (this.ended) {
                this.trace(
                  'Credentials metadata generation finished after call ended'
                );
                return;
              }
              const finalMetadata = this.metadata!.clone();
              finalMetadata.merge(credsMetadata);
              if (finalMetadata.get('authorization').length > 1) {
                this.outputStatus(
                  {
                    code: Status.INTERNAL,
                    details:
                      '"authorization" metadata cannot have multiple values',
                    metadata: new Metadata(),
                  },
                  'PROCESSED'
                );
                /* The call has ended; do not go on to start a child call. */
                return;
              }
              if (
                pickResult.subchannel!.getConnectivityState() !==
                ConnectivityState.READY
              ) {
                this.trace(
                  'Picked subchannel ' +
                    subchannelString +
                    ' has state ' +
                    ConnectivityState[
                      pickResult.subchannel!.getConnectivityState()
                    ] +
                    ' after getting credentials metadata. Retrying pick'
                );
                this.doPick();
                return;
              }

              if (this.deadline !== Infinity) {
                finalMetadata.set(
                  'grpc-timeout',
                  getDeadlineTimeoutString(this.deadline)
                );
              }
              try {
                this.child = pickResult
                  .subchannel!.getRealSubchannel()
                  .createCall(finalMetadata, this.host, this.methodName, {
                    onReceiveMetadata: metadata => {
                      this.trace('Received metadata');
                      this.listener!.onReceiveMetadata(metadata);
                    },
                    onReceiveMessage: message => {
                      this.trace('Received message');
                      this.listener!.onReceiveMessage(message);
                    },
                    onReceiveStatus: status => {
                      this.trace('Received status');
                      if (
                        status.rstCode ===
                        http2.constants.NGHTTP2_REFUSED_STREAM
                      ) {
                        this.outputStatus(status, 'REFUSED');
                      } else {
                        this.outputStatus(status, 'PROCESSED');
                      }
                    },
                  });
              } catch (error) {
                this.trace(
                  'Failed to start call on picked subchannel ' +
                    subchannelString +
                    ' with error ' +
                    (error as Error).message
                );
                this.outputStatus(
                  {
                    code: Status.INTERNAL,
                    details:
                      'Failed to start HTTP/2 stream with error ' +
                      (error as Error).message,
                    metadata: new Metadata(),
                  },
                  'NOT_STARTED'
                );
                return;
              }
              this.callConfig.onCommitted?.();
              pickResult.onCallStarted?.();
              this.onCallEnded = pickResult.onCallEnded;
              this.trace(
                'Created child call [' + this.child.getCallNumber() + ']'
              );
              /* Replay operations requested before the child call existed. */
              if (this.readPending) {
                this.child.startRead();
              }
              if (this.pendingMessage) {
                this.child.sendMessageWithContext(
                  this.pendingMessage.context,
                  this.pendingMessage.message
                );
              }
              if (this.pendingHalfClose) {
                this.child.halfClose();
              }
            },
            (error: Error & { code: number }) => {
              // We assume the error code isn't 0 (Status.OK)
              const { code, details } = restrictControlPlaneStatusCode(
                typeof error.code === 'number' ? error.code : Status.UNKNOWN,
                `Getting metadata from plugin failed with error: ${error.message}`
              );
              this.outputStatus(
                {
                  code: code,
                  details: details,
                  metadata: new Metadata(),
                },
                'PROCESSED'
              );
            }
          );
        break;
      case PickResultType.DROP: {
        const { code, details } = restrictControlPlaneStatusCode(
          pickResult.status!.code,
          pickResult.status!.details
        );
        /* The status is delivered on a later event-loop turn rather than
         * synchronously from within doPick. */
        setImmediate(() => {
          this.outputStatus(
            { code, details, metadata: pickResult.status!.metadata },
            'DROP'
          );
        });
        break;
      }
      case PickResultType.TRANSIENT_FAILURE:
        if (this.metadata.getOptions().waitForReady) {
          this.channel.queueCallForPick(this);
        } else {
          const { code, details } = restrictControlPlaneStatusCode(
            pickResult.status!.code,
            pickResult.status!.details
          );
          setImmediate(() => {
            this.outputStatus(
              { code, details, metadata: pickResult.status!.metadata },
              'PROCESSED'
            );
          });
        }
        break;
      case PickResultType.QUEUE:
        this.channel.queueCallForPick(this);
    }
  }

  /**
   * Cancels the child call, if there is one, and ends this call with the
   * given status code and details.
   */
  cancelWithStatus(status: Status, details: string): void {
    this.trace(
      'cancelWithStatus code: ' + status + ' details: "' + details + '"'
    );
    this.child?.cancelWithStatus(status, details);
    this.outputStatus(
      { code: status, details: details, metadata: new Metadata() },
      'PROCESSED'
    );
  }

  /** Returns the child call's peer, or the channel target if there is none. */
  getPeer(): string {
    return this.child?.getPeer() ?? this.channel.getTarget();
  }

  /**
   * Records the metadata and listener for this call and performs the first
   * pick.
   */
  start(
    metadata: Metadata,
    listener: LoadBalancingCallInterceptingListener
  ): void {
    this.trace('start called');
    this.listener = listener;
    this.metadata = metadata;
    this.doPick();
  }

  /**
   * Sends the message on the child call, or stores it as the pending message
   * if the child call does not exist yet.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void {
    this.trace('write() called with message of length ' + message.length);
    if (this.child) {
      this.child.sendMessageWithContext(context, message);
    } else {
      this.pendingMessage = { context, message };
    }
  }

  /**
   * Starts a read on the child call, or remembers the request if the child
   * call does not exist yet.
   */
  startRead(): void {
    this.trace('startRead called');
    if (this.child) {
      this.child.startRead();
    } else {
      this.readPending = true;
    }
  }

  /**
   * Half-closes the child call, or remembers the request if the child call
   * does not exist yet.
   */
  halfClose(): void {
    this.trace('halfClose called');
    if (this.child) {
      this.child.halfClose();
    } else {
      this.pendingHalfClose = true;
    }
  }

  /**
   * Not supported on this call type.
   *
   * @throws Error always.
   */
  setCredentials(credentials: CallCredentials): void {
    throw new Error('Method not implemented.');
  }

  /** Returns the call number this call was constructed with. */
  getCallNumber(): number {
    return this.callNumber;
  }
}
|