diff --git a/xds/src/main/java/io/grpc/xds/XdsClientLoadRecorder.java b/xds/src/main/java/io/grpc/xds/XdsClientLoadRecorder.java new file mode 100644 index 0000000000..edf531ca7f --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/XdsClientLoadRecorder.java @@ -0,0 +1,118 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkNotNull; + +import io.grpc.ClientStreamTracer; +import io.grpc.ClientStreamTracer.StreamInfo; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.util.ForwardingClientStreamTracer; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.concurrent.ThreadSafe; + +/** + * An {@link XdsClientLoadRecorder} instance records and aggregates client-side load data into an + * {@link ClientLoadCounter} object. + */ +@ThreadSafe +final class XdsClientLoadRecorder extends ClientStreamTracer.Factory { + + private final ClientStreamTracer.Factory delegate; + private final ClientLoadCounter counter; + + XdsClientLoadRecorder(ClientLoadCounter counter, ClientStreamTracer.Factory delegate) { + this.counter = checkNotNull(counter, "counter"); + this.delegate = checkNotNull(delegate, "delegate"); + } + + @Override + public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { + counter.callsInProgress.getAndIncrement(); + final ClientStreamTracer delegateTracer = delegate.newClientStreamTracer(info, headers); + return new StreamTracer(delegateTracer); + } + + /** + * A {@link ClientLoadSnapshot} represents a snapshot of {@link ClientLoadCounter} to be sent as + * part of {@link io.envoyproxy.envoy.api.v2.endpoint.ClusterStats} to the balancer. + */ + static final class ClientLoadSnapshot { + + final long callsSucceed; + final long callsInProgress; + final long callsFailed; + + ClientLoadSnapshot(long callsSucceed, long callsInProgress, long callsFailed) { + this.callsSucceed = callsSucceed; + this.callsInProgress = callsInProgress; + this.callsFailed = callsFailed; + } + } + + static final class ClientLoadCounter { + + private final AtomicLong callsInProgress = new AtomicLong(); + private final AtomicLong callsFinished = new AtomicLong(); + private final AtomicLong callsFailed = new AtomicLong(); + private boolean active = true; + + /** + * Generate a query count snapshot and reset counts for next snapshot. + */ + ClientLoadSnapshot snapshot() { + long numFailed = callsFailed.getAndSet(0); + return new ClientLoadSnapshot( + callsFinished.getAndSet(0) - numFailed, + callsInProgress.get(), + numFailed); + } + + boolean isActive() { + return active; + } + + void setActive(boolean value) { + active = value; + } + } + + private class StreamTracer extends ForwardingClientStreamTracer { + + private final ClientStreamTracer delegate; + + private StreamTracer(ClientStreamTracer delegate) { + this.delegate = checkNotNull(delegate, "delegate"); + } + + @Override + protected ClientStreamTracer delegate() { + return delegate; + } + + @Override + public void streamClosed(Status status) { + counter.callsFinished.getAndIncrement(); + counter.callsInProgress.getAndDecrement(); + if (!status.isOk()) { + counter.callsFailed.getAndIncrement(); + } + delegate().streamClosed(status); + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadReportStore.java b/xds/src/main/java/io/grpc/xds/XdsLoadReportStore.java new file mode 100644 index 0000000000..103b8fc1fd --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/XdsLoadReportStore.java @@ -0,0 +1,169 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Duration; +import io.envoyproxy.envoy.api.v2.core.Locality; +import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats; +import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.DroppedRequests; +import io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats; +import io.grpc.ClientStreamTracer; +import io.grpc.ClientStreamTracer.StreamInfo; +import io.grpc.LoadBalancer.PickResult; +import io.grpc.Metadata; +import io.grpc.xds.XdsClientLoadRecorder.ClientLoadCounter; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * An {@link XdsLoadReportStore} instance holds the client side load stats for a cluster. + */ +final class XdsLoadReportStore { + + private static final ClientStreamTracer NOOP_CLIENT_STREAM_TRACER = + new ClientStreamTracer() { + }; + private static final ClientStreamTracer.Factory NOOP_CLIENT_STREAM_TRACER_FACTORY = + new ClientStreamTracer.Factory() { + @Override + public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { + return NOOP_CLIENT_STREAM_TRACER; + } + }; + private final String clusterName; + private final ConcurrentMap localityLoadCounters; + // Cluster level dropped request counts for each category specified in the DropOverload policy. + private final ConcurrentMap dropCounters; + + XdsLoadReportStore(String clusterName) { + this(clusterName, new ConcurrentHashMap(), + new ConcurrentHashMap()); + } + + @VisibleForTesting + XdsLoadReportStore(String clusterName, + ConcurrentMap localityLoadCounters, + ConcurrentMap dropCounters) { + this.clusterName = checkNotNull(clusterName, "clusterName"); + this.localityLoadCounters = checkNotNull(localityLoadCounters, "localityLoadCounters"); + this.dropCounters = checkNotNull(dropCounters, "dropCounters"); + } + + /** + * Generates a {@link ClusterStats} containing load stats in locality granularity. + * This method should be called in the same synchronized context that + * {@link XdsLoadBalancer#helper#getSynchronizationContext} returns. + */ + ClusterStats generateLoadReport(Duration interval) { + ClusterStats.Builder statsBuilder = ClusterStats.newBuilder().setClusterName(clusterName) + .setLoadReportInterval(interval); + for (Map.Entry entry : localityLoadCounters + .entrySet()) { + XdsClientLoadRecorder.ClientLoadSnapshot snapshot = entry.getValue().snapshot(); + statsBuilder + .addUpstreamLocalityStats(UpstreamLocalityStats.newBuilder() + .setLocality(entry.getKey()) + .setTotalSuccessfulRequests(snapshot.callsSucceed) + .setTotalErrorRequests(snapshot.callsFailed) + .setTotalRequestsInProgress(snapshot.callsInProgress)); + // Discard counters for localities that are no longer exposed by the remote balancer and + // no RPCs ongoing. + if (!entry.getValue().isActive() && snapshot.callsInProgress == 0) { + localityLoadCounters.remove(entry.getKey()); + } + } + for (Map.Entry entry : dropCounters.entrySet()) { + statsBuilder.addDroppedRequests(DroppedRequests.newBuilder() + .setCategory(entry.getKey()) + .setDroppedCount(entry.getValue().getAndSet(0))); + } + return statsBuilder.build(); + } + + /** + * Create a {@link ClientLoadCounter} for the provided locality or make it active if already in + * this {@link XdsLoadReportStore}. This method needs to be called at locality updates only for + * newly assigned localities in balancer discovery responses. + * This method should be called in the same synchronized context that + * {@link XdsLoadBalancer#helper#getSynchronizationContext} returns. + */ + void addLocality(final Locality locality) { + ClientLoadCounter counter = localityLoadCounters.get(locality); + checkState(counter == null || !counter.isActive(), + "An active ClientLoadCounter for locality %s already exists", locality); + if (counter == null) { + localityLoadCounters.put(locality, new ClientLoadCounter()); + } else { + counter.setActive(true); + } + } + + /** + * Deactivate the {@link ClientLoadCounter} for the provided locality in by this + * {@link XdsLoadReportStore}. Inactive {@link ClientLoadCounter}s are for localities + * no longer exposed by the remote balancer. This method needs to be called at + * locality updates only for localities newly removed from balancer discovery responses. + * This method should be called in the same synchronized context that + * {@link XdsLoadBalancer#helper#getSynchronizationContext} returns. + */ + void removeLocality(final Locality locality) { + ClientLoadCounter counter = localityLoadCounters.get(locality); + checkState(counter != null && counter.isActive(), + "No active ClientLoadCounter for locality %s exists", locality); + counter.setActive(false); + } + + /** + * Intercepts a in-locality PickResult with load recording {@link ClientStreamTracer.Factory}. + */ + PickResult interceptPickResult(PickResult pickResult, Locality locality) { + if (!pickResult.getStatus().isOk()) { + return pickResult; + } + XdsClientLoadRecorder.ClientLoadCounter counter = localityLoadCounters.get(locality); + if (counter == null) { + return pickResult; + } + ClientStreamTracer.Factory originFactory = pickResult.getStreamTracerFactory(); + if (originFactory == null) { + originFactory = NOOP_CLIENT_STREAM_TRACER_FACTORY; + } + XdsClientLoadRecorder recorder = new XdsClientLoadRecorder(counter, originFactory); + return PickResult.withSubchannel(pickResult.getSubchannel(), recorder); + } + + /** + * Record that a request has been dropped by drop overload policy with the provided category + * instructed by the remote balancer. + */ + void recordDroppedRequest(String category) { + AtomicLong counter = dropCounters.get(category); + if (counter == null) { + counter = dropCounters.putIfAbsent(category, new AtomicLong()); + if (counter == null) { + counter = dropCounters.get(category); + } + } + counter.getAndIncrement(); + } +} diff --git a/xds/src/test/java/io/grpc/xds/XdsLoadReportStoreTest.java b/xds/src/test/java/io/grpc/xds/XdsLoadReportStoreTest.java new file mode 100644 index 0000000000..5c3ee0beda --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/XdsLoadReportStoreTest.java @@ -0,0 +1,308 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.protobuf.Duration; +import io.envoyproxy.envoy.api.v2.core.Locality; +import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats; +import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.DroppedRequests; +import io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats; +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.ClientStreamTracer; +import io.grpc.LoadBalancer.PickResult; +import io.grpc.LoadBalancer.Subchannel; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.xds.XdsClientLoadRecorder.ClientLoadCounter; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Unit tests for {@link XdsLoadReportStore}. */ +public class XdsLoadReportStoreTest { + private static final String SERVICE_NAME = "api.google.com"; + private static final ClientStreamTracer.StreamInfo STREAM_INFO = + new ClientStreamTracer.StreamInfo() { + @Override + public Attributes getTransportAttrs() { + return Attributes.EMPTY; + } + + @Override + public CallOptions getCallOptions() { + return CallOptions.DEFAULT; + } + }; + private static final Locality TEST_LOCALITY = Locality.newBuilder() + .setRegion("test_region") + .setZone("test_zone") + .setSubZone("test_subzone") + .build(); + @Mock Subchannel fakeSubchannel; + private ConcurrentMap localityLoadCounters; + private ConcurrentMap dropCounters; + private XdsLoadReportStore loadStore; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + localityLoadCounters = new ConcurrentHashMap<>(); + dropCounters = new ConcurrentHashMap<>(); + loadStore = new XdsLoadReportStore(SERVICE_NAME, localityLoadCounters, dropCounters); + } + + @Test + public void loadNotRecordedForUntrackedLocality() { + PickResult pickResult = PickResult.withSubchannel(fakeSubchannel); + // XdsClientLoadStore does not record loads for untracked localities. + PickResult interceptedPickResult = loadStore.interceptPickResult(pickResult, TEST_LOCALITY); + assertThat(localityLoadCounters).hasSize(0); + assertThat(interceptedPickResult.getStreamTracerFactory()).isNull(); + } + + @Test + public void invalidPickResultNotIntercepted() { + PickResult errorResult = PickResult.withError(Status.UNAVAILABLE.withDescription("Error")); + PickResult emptyResult = PickResult.withNoResult(); + PickResult droppedResult = PickResult.withDrop(Status.UNAVAILABLE.withDescription("Dropped")); + PickResult interceptedErrorResult = loadStore.interceptPickResult(errorResult, TEST_LOCALITY); + PickResult interceptedEmptyResult = loadStore.interceptPickResult(emptyResult, TEST_LOCALITY); + PickResult interceptedDroppedResult = loadStore + .interceptPickResult(droppedResult, TEST_LOCALITY); + assertThat(localityLoadCounters).hasSize(0); + assertThat(interceptedErrorResult.getStreamTracerFactory()).isNull(); + assertThat(interceptedEmptyResult.getStreamTracerFactory()).isNull(); + assertThat(interceptedDroppedResult.getStreamTracerFactory()).isNull(); + } + + @Test + public void interceptPreservesOriginStreamTracer() { + loadStore.addLocality(TEST_LOCALITY); + ClientStreamTracer.Factory mockFactory = mock(ClientStreamTracer.Factory.class); + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + when(mockFactory + .newClientStreamTracer(any(ClientStreamTracer.StreamInfo.class), any(Metadata.class))) + .thenReturn(mockTracer); + PickResult pickResult = PickResult.withSubchannel(fakeSubchannel, mockFactory); + PickResult interceptedPickResult = loadStore.interceptPickResult(pickResult, TEST_LOCALITY); + Metadata metadata = new Metadata(); + interceptedPickResult.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, metadata) + .streamClosed(Status.OK); + ArgumentCaptor streamInfoArgumentCaptor = ArgumentCaptor + .forClass(null); + ArgumentCaptor metadataArgumentCaptor = ArgumentCaptor.forClass(null); + verify(mockFactory).newClientStreamTracer(streamInfoArgumentCaptor.capture(), + metadataArgumentCaptor.capture()); + assertThat(streamInfoArgumentCaptor.getValue()).isSameAs(STREAM_INFO); + assertThat(metadataArgumentCaptor.getValue()).isSameAs(metadata); + verify(mockTracer).streamClosed(Status.OK); + } + + @Test + public void loadStatsRecording() { + Locality locality1 = + Locality.newBuilder() + .setRegion("test_region1") + .setZone("test_zone") + .setSubZone("test_subzone") + .build(); + loadStore.addLocality(locality1); + PickResult pickResult1 = PickResult.withSubchannel(fakeSubchannel); + PickResult interceptedPickResult1 = loadStore.interceptPickResult(pickResult1, locality1); + assertThat(interceptedPickResult1.getSubchannel()).isSameAs(fakeSubchannel); + assertThat(localityLoadCounters).containsKey(locality1); + ClientStreamTracer tracer = + interceptedPickResult1 + .getStreamTracerFactory() + .newClientStreamTracer(STREAM_INFO, new Metadata()); + Duration interval = Duration.newBuilder().setNanos(342).build(); + ClusterStats expectedLoadReport = buildClusterStats(interval, + Collections.singletonList(buildUpstreamLocalityStats(locality1, 0, 0, 1)), null); + assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval)); + + // Make another load report should not reset count for calls in progress. + assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval)); + + tracer.streamClosed(Status.OK); + expectedLoadReport = buildClusterStats(interval, + Collections.singletonList(buildUpstreamLocalityStats(locality1, 1, 0, 0)), null); + assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval)); + + // Make another load report should reset finished calls count for calls finished. + expectedLoadReport = buildClusterStats(interval, + Collections.singletonList(buildUpstreamLocalityStats(locality1, 0, 0, 0)), null); + assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval)); + + // PickResult within the same locality should aggregate to the same counter. + PickResult pickResult2 = PickResult.withSubchannel(fakeSubchannel); + PickResult interceptedPickResult2 = loadStore.interceptPickResult(pickResult2, locality1); + assertThat(localityLoadCounters).hasSize(1); + interceptedPickResult1 + .getStreamTracerFactory() + .newClientStreamTracer(STREAM_INFO, new Metadata()) + .streamClosed(Status.ABORTED); + interceptedPickResult2 + .getStreamTracerFactory() + .newClientStreamTracer(STREAM_INFO, new Metadata()) + .streamClosed(Status.CANCELLED); + expectedLoadReport = buildClusterStats(interval, + Collections.singletonList(buildUpstreamLocalityStats(locality1, 0, 2, 0)), null); + assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval)); + + expectedLoadReport = buildClusterStats(interval, + Collections.singletonList(buildUpstreamLocalityStats(locality1, 0, 0, 0)), null); + assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval)); + + Locality locality2 = + Locality.newBuilder() + .setRegion("test_region2") + .setZone("test_zone") + .setSubZone("test_subzone") + .build(); + loadStore.addLocality(locality2); + PickResult pickResult3 = PickResult.withSubchannel(fakeSubchannel); + PickResult interceptedPickResult3 = loadStore.interceptPickResult(pickResult3, locality2); + assertThat(localityLoadCounters).containsKey(locality2); + assertThat(localityLoadCounters).hasSize(2); + interceptedPickResult3 + .getStreamTracerFactory() + .newClientStreamTracer(STREAM_INFO, new Metadata()); + List upstreamLocalityStatsList = + Arrays.asList(buildUpstreamLocalityStats(locality1, 0, 0, 0), + buildUpstreamLocalityStats(locality2, 0, 0, 1)); + expectedLoadReport = buildClusterStats(interval, upstreamLocalityStatsList, null); + assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval)); + } + + @Test + public void loadRecordingForRemovedLocality() { + loadStore.addLocality(TEST_LOCALITY); + assertThat(localityLoadCounters).containsKey(TEST_LOCALITY); + PickResult pickResult = PickResult.withSubchannel(fakeSubchannel); + PickResult interceptedPickResult = loadStore.interceptPickResult(pickResult, TEST_LOCALITY); + ClientStreamTracer tracer = interceptedPickResult + .getStreamTracerFactory() + .newClientStreamTracer(STREAM_INFO, new Metadata()); + + Duration interval = Duration.newBuilder().setNanos(342).build(); + ClusterStats expectedLoadReport = buildClusterStats(interval, + Collections.singletonList(buildUpstreamLocalityStats(TEST_LOCALITY, 0, 0, 1)), null); + assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval)); + // Remote balancer instructs to remove the locality while client has in-progress calls + // to backends in the locality, the XdsClientLoadStore continues tracking its load stats. + loadStore.removeLocality(TEST_LOCALITY); + assertThat(localityLoadCounters).containsKey(TEST_LOCALITY); + expectedLoadReport = buildClusterStats(interval, + Collections.singletonList(buildUpstreamLocalityStats(TEST_LOCALITY, 0, 0, 1)), null); + assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval)); + + tracer.streamClosed(Status.OK); + expectedLoadReport = buildClusterStats(interval, + Collections.singletonList(buildUpstreamLocalityStats(TEST_LOCALITY, 1, 0, 0)), null); + assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval)); + assertThat(localityLoadCounters).doesNotContainKey(TEST_LOCALITY); + } + + @Test + public void recordingDroppedRequests() { + Random rand = new Random(); + int numLbDrop = rand.nextInt(1000); + int numThrottleDrop = rand.nextInt(1000); + for (int i = 0; i < numLbDrop; i++) { + loadStore.recordDroppedRequest("lb"); + } + for (int i = 0; i < numThrottleDrop; i++) { + loadStore.recordDroppedRequest("throttle"); + } + Duration interval = Duration.newBuilder().setNanos(342).build(); + ClusterStats expectedLoadReport = buildClusterStats(interval, + null, + Arrays.asList( + DroppedRequests.newBuilder() + .setCategory("lb") + .setDroppedCount(numLbDrop) + .build(), + DroppedRequests.newBuilder() + .setCategory("throttle") + .setDroppedCount(numThrottleDrop) + .build() + )); + assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval)); + assertEquals(0, dropCounters.get("lb").get()); + assertEquals(0, dropCounters.get("throttle").get()); + } + + private UpstreamLocalityStats buildUpstreamLocalityStats(Locality locality, long callsSucceed, + long callsFailed, long callsInProgress) { + return UpstreamLocalityStats.newBuilder() + .setLocality(locality) + .setTotalSuccessfulRequests(callsSucceed) + .setTotalErrorRequests(callsFailed) + .setTotalRequestsInProgress(callsInProgress) + .build(); + } + + private ClusterStats buildClusterStats(Duration interval, + @Nullable List upstreamLocalityStatsList, + @Nullable List droppedRequestsList) { + ClusterStats.Builder clusterStatsBuilder = ClusterStats.newBuilder() + .setClusterName(SERVICE_NAME) + .setLoadReportInterval(interval); + if (upstreamLocalityStatsList != null) { + clusterStatsBuilder.addAllUpstreamLocalityStats(upstreamLocalityStatsList); + } + if (droppedRequestsList != null) { + long dropCount = 0; + for (DroppedRequests drop : droppedRequestsList) { + dropCount += drop.getDroppedCount(); + clusterStatsBuilder.addDroppedRequests(drop); + } + clusterStatsBuilder.setTotalDroppedRequests(dropCount); + } + return clusterStatsBuilder.build(); + } + + private void assertClusterStatsEqual(ClusterStats stats1, ClusterStats stats2) { + assertEquals(stats1.getClusterName(), stats2.getClusterName()); + assertEquals(stats1.getLoadReportInterval(), stats2.getLoadReportInterval()); + assertEquals(stats1.getUpstreamLocalityStatsCount(), stats2.getUpstreamLocalityStatsCount()); + assertEquals(stats1.getDroppedRequestsCount(), stats2.getDroppedRequestsCount()); + assertEquals(new HashSet<>(stats1.getUpstreamLocalityStatsList()), + new HashSet<>(stats2.getUpstreamLocalityStatsList())); + assertEquals(new HashSet<>(stats1.getDroppedRequestsList()), + new HashSet<>(stats2.getDroppedRequestsList())); + } +}