mirror of https://github.com/grpc/grpc-node.git
Add more tracers, fix onResourceDoesNotExist handling
This commit is contained in:
parent
2404446234
commit
197cc84e7a
|
|
@ -200,7 +200,9 @@ function main() {
|
|||
|
||||
const loadBalancerStatsServiceImpl: LoadBalancerStatsServiceHandlers = {
|
||||
GetClientStats: (call, callback) => {
|
||||
console.log(`Received stats request with num_rpcs=${call.request.num_rpcs} and timeout_sec=${call.request.num_rpcs}`);
|
||||
callStatsTracker.getCallStats(call.request.num_rpcs, call.request.timeout_sec).then((value) => {
|
||||
console.log(`Sending stats response: ${JSON.stringify(value)}`);
|
||||
callback(null, value);
|
||||
}, (error) => {
|
||||
callback({code: grpc.status.ABORTED, details: 'Call stats collection failed'});
|
||||
|
|
|
|||
|
|
@ -34,6 +34,14 @@ import { ConnectivityState } from './channel';
|
|||
import { UnavailablePicker } from './picker';
|
||||
import { Status } from './constants';
|
||||
import { Metadata } from './metadata';
|
||||
import * as logging from './logging';
|
||||
import { LogVerbosity } from './constants';
|
||||
|
||||
const TRACER_NAME = 'cds_balancer';
|
||||
|
||||
function trace(text: string): void {
|
||||
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
|
||||
}
|
||||
|
||||
const TYPE_NAME = 'cds';
|
||||
|
||||
|
|
@ -70,6 +78,7 @@ export class CdsLoadBalancer implements LoadBalancer {
|
|||
* Otherwise, if the field is omitted, load reporting is disabled. */
|
||||
edsConfig.lrsLoadReportingServerName = '';
|
||||
}
|
||||
trace('Child update EDS config: ' + JSON.stringify(edsConfig));
|
||||
this.childBalancer.updateAddressList(
|
||||
[],
|
||||
{ name: 'eds', eds: edsConfig },
|
||||
|
|
@ -82,6 +91,8 @@ export class CdsLoadBalancer implements LoadBalancer {
|
|||
this.watcher
|
||||
);
|
||||
this.isWatcherActive = false;
|
||||
this.channelControlHelper.updateState(ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: Status.UNAVAILABLE, details: 'CDS resource does not exist', metadata: new Metadata()}));
|
||||
this.childBalancer.destroy();
|
||||
},
|
||||
onTransientError: (status) => {
|
||||
if (this.latestCdsUpdate === null) {
|
||||
|
|
@ -104,11 +115,14 @@ export class CdsLoadBalancer implements LoadBalancer {
|
|||
attributes: { [key: string]: unknown }
|
||||
): void {
|
||||
if (!isCdsLoadBalancingConfig(lbConfig)) {
|
||||
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig));
|
||||
return;
|
||||
}
|
||||
if (!(attributes.xdsClient instanceof XdsClient)) {
|
||||
trace('Discarding address list update missing xdsClient attribute');
|
||||
return;
|
||||
}
|
||||
trace('Received update with config ' + JSON.stringify(lbConfig));
|
||||
this.xdsClient = attributes.xdsClient;
|
||||
this.latestAttributes = attributes;
|
||||
|
||||
|
|
|
|||
|
|
@ -133,9 +133,11 @@ export class ChildLoadBalancerHandler implements LoadBalancer {
|
|||
destroy(): void {
|
||||
if (this.currentChild) {
|
||||
this.currentChild.destroy();
|
||||
this.currentChild = null;
|
||||
}
|
||||
if (this.pendingChild) {
|
||||
this.pendingChild.destroy();
|
||||
this.pendingChild = null;
|
||||
}
|
||||
}
|
||||
getTypeName(): string {
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ import {
|
|||
registerLoadBalancerType,
|
||||
getFirstUsableConfig,
|
||||
} from './load-balancer';
|
||||
import { SubchannelAddress } from './subchannel';
|
||||
import { SubchannelAddress, subchannelAddressToString } from './subchannel';
|
||||
import {
|
||||
LoadBalancingConfig,
|
||||
isEdsLoadBalancingConfig,
|
||||
|
|
@ -40,6 +40,14 @@ import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
|
|||
import { LocalitySubchannelAddress } from './load-balancer-priority';
|
||||
import { Status } from './constants';
|
||||
import { Metadata } from './metadata';
|
||||
import * as logging from './logging';
|
||||
import { LogVerbosity } from './constants';
|
||||
|
||||
const TRACER_NAME = 'eds_balancer';
|
||||
|
||||
function trace(text: string): void {
|
||||
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
|
||||
}
|
||||
|
||||
const TYPE_NAME = 'eds';
|
||||
|
||||
|
|
@ -136,6 +144,8 @@ export class EdsLoadBalancer implements LoadBalancer {
|
|||
this.watcher
|
||||
);
|
||||
this.isWatcherActive = false;
|
||||
this.channelControlHelper.updateState(ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: Status.UNAVAILABLE, details: 'EDS resource does not exist', metadata: new Metadata()}));
|
||||
this.childBalancer.destroy();
|
||||
},
|
||||
onTransientError: (status) => {
|
||||
if (this.latestEdsUpdate === null) {
|
||||
|
|
@ -356,6 +366,8 @@ export class EdsLoadBalancer implements LoadBalancer {
|
|||
priorities: newPriorityNames.filter((value) => value !== undefined),
|
||||
},
|
||||
};
|
||||
trace('Child update addresses: ' + addressList.map(address => '(' + subchannelAddressToString(address) + ' path=' + address.localityPath + ')'));
|
||||
trace('Child update service config: ' + JSON.stringify(childConfig));
|
||||
this.childBalancer.updateAddressList(
|
||||
addressList,
|
||||
childConfig,
|
||||
|
|
@ -372,11 +384,14 @@ export class EdsLoadBalancer implements LoadBalancer {
|
|||
attributes: { [key: string]: unknown }
|
||||
): void {
|
||||
if (!isEdsLoadBalancingConfig(lbConfig)) {
|
||||
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig));
|
||||
return;
|
||||
}
|
||||
if (!(attributes.xdsClient instanceof XdsClient)) {
|
||||
trace('Discarding address list update missing xdsClient attribute');
|
||||
return;
|
||||
}
|
||||
trace('Received update with config: ' + JSON.stringify(lbConfig));
|
||||
this.lastestConfig = lbConfig;
|
||||
this.latestAttributes = attributes;
|
||||
this.xdsClient = attributes.xdsClient;
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ import {
|
|||
getFirstUsableConfig,
|
||||
registerLoadBalancerType,
|
||||
} from './load-balancer';
|
||||
import { SubchannelAddress } from './subchannel';
|
||||
import { SubchannelAddress, subchannelAddressToString } from './subchannel';
|
||||
import {
|
||||
LoadBalancingConfig,
|
||||
isPriorityLoadBalancingConfig,
|
||||
|
|
@ -32,6 +32,14 @@ import { ChildLoadBalancerHandler } from './load-balancer-child-handler';
|
|||
import { ChannelOptions } from './channel-options';
|
||||
import { Status } from './constants';
|
||||
import { Metadata } from './metadata';
|
||||
import * as logging from './logging';
|
||||
import { LogVerbosity } from './constants';
|
||||
|
||||
const TRACER_NAME = 'priority';
|
||||
|
||||
function trace(text: string): void {
|
||||
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
|
||||
}
|
||||
|
||||
const TYPE_NAME = 'priority';
|
||||
|
||||
|
|
@ -222,6 +230,10 @@ export class PriorityLoadBalancer implements LoadBalancer {
|
|||
constructor(private channelControlHelper: ChannelControlHelper) {}
|
||||
|
||||
private updateState(state: ConnectivityState, picker: Picker) {
|
||||
trace(
|
||||
'Transitioning to ' +
|
||||
ConnectivityState[state]
|
||||
);
|
||||
/* If switching to IDLE, use a QueuePicker attached to this load balancer
|
||||
* so that when the picker calls exitIdle, that in turn calls exitIdle on
|
||||
* the PriorityChildImpl, which will start the failover timer. */
|
||||
|
|
@ -233,6 +245,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
|
|||
|
||||
private onChildStateChange(child: PriorityChildBalancer) {
|
||||
const childState = child.getConnectivityState();
|
||||
trace('Child ' + child.getName() + ' transitioning to ' + ConnectivityState[childState]);
|
||||
if (child === this.currentChildFromBeforeUpdate) {
|
||||
if (
|
||||
childState === ConnectivityState.READY ||
|
||||
|
|
@ -369,6 +382,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
|
|||
): void {
|
||||
if (!isPriorityLoadBalancingConfig(lbConfig)) {
|
||||
// Reject a config of the wrong type
|
||||
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig));
|
||||
return;
|
||||
}
|
||||
const priorityConfig = lbConfig.priority;
|
||||
|
|
@ -416,6 +430,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
|
|||
const chosenChildConfig = getFirstUsableConfig(childConfig.config);
|
||||
if (chosenChildConfig !== null) {
|
||||
const childAddresses = childAddressMap.get(childName) ?? [];
|
||||
trace('Assigning child ' + childName + ' address list ' + childAddresses.map(address => '(' + subchannelAddressToString(address) + ' path=' + address.localityPath + ')'))
|
||||
this.latestUpdates.set(childName, {
|
||||
subchannelAddress: childAddresses,
|
||||
lbConfig: chosenChildConfig,
|
||||
|
|
@ -433,6 +448,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
|
|||
// Deactivate all children that are no longer in the priority list
|
||||
for (const [childName, child] of this.children) {
|
||||
if (this.priorities.indexOf(childName) < 0) {
|
||||
trace('Deactivating child ' + childName);
|
||||
child.deactivate();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@
|
|||
*/
|
||||
|
||||
import { LoadBalancer, ChannelControlHelper, getFirstUsableConfig, registerLoadBalancerType } from "./load-balancer";
|
||||
import { SubchannelAddress } from "./subchannel";
|
||||
import { SubchannelAddress, subchannelAddressToString } from "./subchannel";
|
||||
import { LoadBalancingConfig, WeightedTarget, isWeightedTargetLoadBalancingConfig } from "./load-balancing-config";
|
||||
import { Picker, PickResult, PickArgs, QueuePicker, UnavailablePicker } from "./picker";
|
||||
import { ConnectivityState } from "./channel";
|
||||
|
|
@ -24,6 +24,14 @@ import { ChildLoadBalancerHandler } from "./load-balancer-child-handler";
|
|||
import { Status } from "./constants";
|
||||
import { Metadata } from "./metadata";
|
||||
import { isLocalitySubchannelAddress, LocalitySubchannelAddress } from "./load-balancer-priority";
|
||||
import * as logging from './logging';
|
||||
import { LogVerbosity } from './constants';
|
||||
|
||||
const TRACER_NAME = 'weighted_target';
|
||||
|
||||
function trace(text: string): void {
|
||||
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
|
||||
}
|
||||
|
||||
const TYPE_NAME = 'weighted_target';
|
||||
|
||||
|
|
@ -116,6 +124,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
|
||||
private updateState(connectivityState: ConnectivityState, picker: Picker) {
|
||||
trace('Target ' + this.name + ' ' + ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[connectivityState]);
|
||||
this.connectivityState = connectivityState;
|
||||
this.picker = picker;
|
||||
this.parent.updateState();
|
||||
|
|
@ -239,12 +248,17 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
|
|||
metadata: new Metadata()
|
||||
});
|
||||
}
|
||||
trace(
|
||||
'Transitioning to ' +
|
||||
ConnectivityState[connectivityState]
|
||||
);
|
||||
this.channelControlHelper.updateState(connectivityState, picker);
|
||||
}
|
||||
|
||||
updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||
if (!isWeightedTargetLoadBalancingConfig(lbConfig)) {
|
||||
// Reject a config of the wrong type
|
||||
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig));
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -252,7 +266,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
|
|||
* which child it belongs to. So we bucket those addresses by that first
|
||||
* element, and pass along the rest of the localityPath for that child
|
||||
* to use. */
|
||||
const childAddressMap = new Map<string, SubchannelAddress[]>();
|
||||
const childAddressMap = new Map<string, LocalitySubchannelAddress[]>();
|
||||
for (const address of addressList) {
|
||||
if (!isLocalitySubchannelAddress(address)) {
|
||||
// Reject address that cannot be associated with targets
|
||||
|
|
@ -284,12 +298,15 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
|
|||
} else {
|
||||
target.maybeReactivate();
|
||||
}
|
||||
target.updateAddressList(childAddressMap.get(targetName) ?? [], targetConfig, attributes);
|
||||
const targetAddresses = childAddressMap.get(targetName) ?? [];
|
||||
trace('Assigning target ' + targetName + ' address list ' + targetAddresses.map(address => '(' + subchannelAddressToString(address) + ' path=' + address.localityPath + ')'));
|
||||
target.updateAddressList(targetAddresses, targetConfig, attributes);
|
||||
}
|
||||
|
||||
// Deactivate targets that are not in the new config
|
||||
for (const [targetName, target] of this.targets) {
|
||||
if (this.targetList.indexOf(targetName) < 0) {
|
||||
trace('Deactivating target ' + targetName);
|
||||
target.deactivate();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,9 +19,16 @@ import { GrpcUri, uriToString } from './uri-parser';
|
|||
import { XdsClient } from './xds-client';
|
||||
import { ServiceConfig } from './service-config';
|
||||
import { StatusObject } from './call-stream';
|
||||
import { Status } from './constants';
|
||||
import { Status, LogVerbosity } from './constants';
|
||||
import { Metadata } from './metadata';
|
||||
import { ChannelOptions } from './channel-options';
|
||||
import * as logging from './logging';
|
||||
|
||||
const TRACER_NAME = 'xds_resolver';
|
||||
|
||||
function trace(text: string): void {
|
||||
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
|
||||
}
|
||||
|
||||
class XdsResolver implements Resolver {
|
||||
private resolutionStarted = false;
|
||||
|
|
@ -47,10 +54,12 @@ class XdsResolver implements Resolver {
|
|||
// Wait until updateResolution is called once to start the xDS requests
|
||||
if (!this.resolutionStarted) {
|
||||
this.resolutionStarted = true;
|
||||
trace('Starting resolution for target ' + this.target);
|
||||
const xdsClient = new XdsClient(
|
||||
this.target.path,
|
||||
{
|
||||
onValidUpdate: (update: ServiceConfig) => {
|
||||
trace('Resolved service config for target ' + this.target + ': ' + JSON.stringify(update));
|
||||
this.hasReportedSuccess = true;
|
||||
this.listener.onSuccessfulResolution([], update, null, {
|
||||
xdsClient: xdsClient,
|
||||
|
|
@ -60,10 +69,12 @@ class XdsResolver implements Resolver {
|
|||
/* A transient error only needs to bubble up as a failure if we have
|
||||
* not already provided a ServiceConfig for the upper layer to use */
|
||||
if (!this.hasReportedSuccess) {
|
||||
trace('Resolution error for target ' + this.target + ' due to xDS client transient error ' + error.details);
|
||||
this.reportResolutionError();
|
||||
}
|
||||
},
|
||||
onResourceDoesNotExist: () => {
|
||||
trace('Resolution error for target ' + this.target + ': resource does not exist');
|
||||
this.reportResolutionError();
|
||||
},
|
||||
},
|
||||
|
|
|
|||
Loading…
Reference in New Issue