diff --git a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts index fe9f3c62..a050b44c 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts @@ -17,6 +17,7 @@ import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; import { isIPv4, isIPv6 } from "net"; +import { Locality__Output } from "../generated/envoy/config/core/v3/Locality"; import { ClusterLoadAssignment__Output } from "../generated/envoy/config/endpoint/v3/ClusterLoadAssignment"; import { Any__Output } from "../generated/google/protobuf/Any"; import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state"; @@ -27,6 +28,10 @@ function trace(text: string): void { experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); } +function localitiesEqual(a: Locality__Output, b: Locality__Output) { + return a.region === b.region && a.sub_zone === b.sub_zone && a.zone === b.zone; +} + export class EdsState implements XdsStreamState { public versionInfo = ''; public nonce = ''; @@ -112,7 +117,17 @@ export class EdsState implements XdsStreamState { * @param message */ private validateResponse(message: ClusterLoadAssignment__Output) { + const seenLocalities: {locality: Locality__Output, priority: number}[] = []; for (const endpoint of message.endpoints) { + if (!endpoint.locality) { + return false; + } + for (const {locality, priority} of seenLocalities) { + if (localitiesEqual(endpoint.locality, locality) && endpoint.priority === priority) { + return false; + } + } + seenLocalities.push({locality: endpoint.locality, priority: endpoint.priority}); for (const lb of endpoint.lb_endpoints) { const socketAddress = lb.endpoint?.address?.socket_address; if (!socketAddress) {